update docs

zachary62 2025-01-12 00:17:29 +00:00
parent 1d6f7a3e78
commit 7db8c049e8
5 changed files with 64 additions and 100 deletions

@@ -13,30 +13,18 @@ Process large inputs by splitting them into chunks using [BatchNode](./batch.md)
 ```python
 class MapSummaries(BatchNode):
-    def prep(self, shared):
-        text = shared["text"]
-        return [text[i:i+10000] for i in range(0, len(text), 10000)]
-    def exec(self, chunk):
-        return call_llm(f"Summarize this chunk: {chunk}")
-    def post(self, shared, prep_res, exec_res_list):
-        shared["summaries"] = exec_res_list
+    def prep(self, shared): return [shared["text"][i:i+10000] for i in range(0, len(shared["text"]), 10000)]
+    def exec(self, chunk): return call_llm(f"Summarize this chunk: {chunk}")
+    def post(self, shared, prep_res, exec_res_list): shared["summaries"] = exec_res_list
 
 class ReduceSummaries(Node):
-    def prep(self, shared):
-        return shared["summaries"]
-    def exec(self, summaries):
-        return call_llm(f"Combine these summaries: {summaries}")
-    def post(self, shared, prep_res, exec_res):
-        shared["final_summary"] = exec_res
+    def prep(self, shared): return shared["summaries"]
+    def exec(self, summaries): return call_llm(f"Combine these summaries: {summaries}")
+    def post(self, shared, prep_res, exec_res): shared["final_summary"] = exec_res
 
 # Connect nodes
 map_node = MapSummaries()
 reduce_node = ReduceSummaries()
 map_node >> reduce_node
 
 # Create flow
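
For reference, a minimal sketch of running the flattened MapReduce flow above end to end; the `pocketflow` import path and the `call_llm` stub are assumptions for illustration:

```python
from pocketflow import BatchNode, Node, Flow  # assumed import path

def call_llm(prompt):
    # Stub for illustration; replace with a real LLM client call.
    return f"[LLM output for: {prompt[:40]}...]"

# MapSummaries and ReduceSummaries as defined above
map_node = MapSummaries()
reduce_node = ReduceSummaries()
map_node >> reduce_node

flow = Flow(start=map_node)
shared = {"text": "a very long document " * 2000}
flow.run(shared)
print(shared["final_summary"])
```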

@@ -43,85 +43,61 @@ flow = Flow(start=chat)
 ### 2. Improved Memory Management
 We can:
-1. Recursively summarize conversations for overview.
-2. Use [vector search](./tool.md) to retrieve relevant past exchanges for details
+1. Limit the chat history to the 4 most recent exchanges (8 messages).
+2. Use [vector search](./tool.md) to retrieve relevant exchanges beyond the last 4.
 ```python
-class HandleInput(Node):
-    def prep(self, shared):
-        if "history" not in shared:
-            shared["history"] = []
-            shared["summary"] = ""
-            shared["memory_index"] = None
-            shared["memories"] = []
+class ChatWithMemory(Node):
+    def prep(self, s):
+        # Initialize the shared store
+        s.setdefault("history", [])
+        s.setdefault("memory_index", None)
         user_input = input("You: ")
-        query_embedding = get_embedding(user_input)
-        relevant_memories = []
-        if shared["memory_index"] is not None:
-            indices, _ = search_index(shared["memory_index"], query_embedding, top_k=2)
-            relevant_memories = [shared["memories"][i[0]] for i in indices]
+        # Retrieve relevant past exchanges if we have enough history and an index
+        relevant = []
+        if len(s["history"]) > 8 and s["memory_index"]:
+            idx, _ = search_index(s["memory_index"], get_embedding(user_input), top_k=2)
+            relevant = [s["history"][i[0]] for i in idx]
+        return {"user_input": user_input, "recent": s["history"][-8:], "relevant": relevant}
+    def exec(self, c):
+        messages = [{"role": "system", "content": "You are a helpful assistant."}]
+        # Include relevant history if any
+        if c["relevant"]:
+            messages.append({"role": "system", "content": f"Relevant: {c['relevant']}"})
+        # Add recent history and the current user input
+        messages += c["recent"] + [{"role": "user", "content": c["user_input"]}]
+        return call_llm(messages)
+    def post(self, s, pre, ans):
+        # Update chat history
+        s["history"] += [
+            {"role": "user", "content": pre["user_input"]},
+            {"role": "assistant", "content": ans}
+        ]
-        shared["current_input"] = {
-            "summary": shared["summary"],
-            "relevant": relevant_memories,
-            "input": user_input
-        }
-class GenerateResponse(Node):
-    def prep(self, shared):
-        return shared["current_input"]
-    def exec(self, context):
-        prompt = f"""Context:
-Summary: {context['summary']}
-Relevant past: {context['relevant']}
-User: {context['input']}
-Response:"""
-        return call_llm(prompt)
+        # When the history first reaches 8 messages, build the index
+        if len(s["history"]) == 8:
+            embeddings = []
+            for i in range(0, 8, 2):
+                e = s["history"][i]["content"] + " " + s["history"][i+1]["content"]
+                embeddings.append(get_embedding(e))
+            s["memory_index"] = create_index(embeddings)
+        # Past 8 messages, embed the exchange that just left the recent window
+        elif len(s["history"]) > 8:
+            pair = s["history"][-10:-8]
+            embedding = get_embedding(pair[0]["content"] + " " + pair[1]["content"])
+            s["memory_index"].add(np.array([embedding]).astype('float32'))
-    def post(self, shared, prep_res, exec_res):
-        shared["history"].append({"role": "user", "content": prep_res["input"]})
-        shared["history"].append({"role": "assistant", "content": exec_res})
+        print(f"Assistant: {ans}")
+        return "continue"
-class UpdateMemory(Node):
-    def prep(self, shared):
-        return shared["current_input"]["input"]
-    def exec(self, user_input):
-        return get_embedding(user_input)
-    def post(self, shared, prep_res, exec_res):
-        shared["memories"].append(prep_res)
-        if shared["memory_index"] is None:
-            shared["memory_index"] = create_index([exec_res])
-        else:
-            shared["memory_index"].add(np.array([exec_res]))
-class UpdateSummary(Node):
-    def prep(self, shared):
-        if shared["history"]:
-            return shared["history"][-10:]
-        return None
-    def exec(self, recent_history):
-        if recent_history:
-            return call_llm(f"Summarize this conversation:\n{recent_history}")
-        return ""
-    def post(self, shared, prep_res, exec_res):
-        if exec_res:
-            shared["summary"] = exec_res
 
 # Connect nodes
-input_node = HandleInput()
-response_node = GenerateResponse()
-memory_node = UpdateMemory()
-summary_node = UpdateSummary()
-input_node >> response_node >> memory_node >> summary_node >> input_node
-chat_flow = Flow(start=input_node)
+chat = ChatWithMemory()
+chat - "continue" >> chat
+flow = Flow(start=chat)
+flow.run({})
 ```
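
The rewritten node assumes three vector helpers (`get_embedding`, `create_index`, `search_index`) plus `numpy` as `np`. A minimal sketch of what they might look like, using faiss as the index and a toy deterministic embedding in place of a real model:

```python
import numpy as np
import faiss  # assumed vector-search backend

EMB_DIM = 128

def get_embedding(text):
    # Toy deterministic embedding for illustration; use a real embedding model in practice.
    rng = np.random.default_rng(abs(hash(text)) % (2**32))
    return rng.random(EMB_DIM, dtype=np.float32)

def create_index(embeddings):
    index = faiss.IndexFlatL2(EMB_DIM)
    index.add(np.array(embeddings, dtype=np.float32))
    return index

def search_index(index, query_embedding, top_k=2):
    # faiss returns (distances, indices); reorder to match the caller above,
    # which unpacks `idx, _`. Note that indices has shape (1, top_k).
    distances, indices = index.search(np.array([query_embedding], dtype=np.float32), top_k)
    return indices, distances
```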

@@ -18,7 +18,7 @@ A **Node** is the smallest building block. Each Node has 3 steps:
 - The **main execution** step, with optional retries and error handling (below).
 - Examples: *primarily for LLMs, but can also be used for remote APIs*.
 - ⚠️ If retries are enabled, ensure the implementation is idempotent.
-- ⚠️ This step must not write to the `shared` store. If reading from `shared` is necessary, retrieve and pass it along in `prep()`.
+- ⚠️ This must **NOT** write to `shared`. If reads are necessary, extract them in `prep()` and pass them in `prep_res`.
 - Returns `exec_res`, which is passed to `post()`.
 3. `post(shared, prep_res, exec_res)`
@@ -30,7 +30,7 @@ A **Node** is the smallest building block. Each Node has 3 steps:
 {: .note }
 
-## Fault Tolerance & Retries
+### Fault Tolerance & Retries
 
 Nodes can **retry** execution if `exec()` raises an exception. You control this via two parameters when you create the Node:
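
For instance, a sketch of passing them at construction time; `max_retries` also appears in the example below, while `wait` (seconds to pause between attempts) is an assumed name for the second parameter:

```python
# Retry exec() up to 3 times, pausing between attempts
# (`wait` is an assumed parameter name).
my_node = SummarizeFile(max_retries=3, wait=10)
```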
@@ -55,10 +55,9 @@ def exec_fallback(self, shared, prep_res, exc):
     raise exc
 ```
-By default, it just re-raises `exc`. But you can return a fallback result instead.
-That fallback result becomes the `exec_res` passed to `post()`.
+By default, it just re-raises `exc`. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.
 
-## Example
+### Example: Summarize file
 ```python
 class SummarizeFile(Node):
@@ -83,10 +82,10 @@ class SummarizeFile(Node):
         # Return "default" by not returning anything
 
 summarize_node = SummarizeFile(max_retries=3)
-# Run the node standalone for testing (calls prep->exec->post).
-# If exec() fails, it retries up to 3 times before calling exec_fallback().
 summarize_node.set_params({"filename": "test_file.txt"})
+# node.run() calls prep->exec->post
+# If exec() fails, it retries up to 3 times before calling exec_fallback()
 action_result = summarize_node.run(shared)
 print("Action returned:", action_result)  # Usually "default"

@@ -38,7 +38,6 @@ class AnswerQuestion(Node):
 # Connect nodes
 prep = PrepareEmbeddings()
 qa = AnswerQuestion()
 prep >> qa
 
 # Create flow
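
A hypothetical usage sketch for the two-node flow above; the shared-store keys (`texts`, `question`, `answer`) are assumptions and depend on how `PrepareEmbeddings` and `AnswerQuestion` are implemented:

```python
rag_flow = Flow(start=prep)
shared = {
    "texts": ["PocketFlow is a minimalist LLM framework."],  # assumed input key
    "question": "What is PocketFlow?",  # assumed input key
}
rag_flow.run(shared)
print(shared.get("answer"))  # assumed output key
```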

@@ -11,6 +11,8 @@ Visualizing the nested graph can help understanding. While we **don't** includ
+### Example: Visualization of Node with Mermaid
 This code recursively traverses the nested graph, assigns unique IDs to each node, and treats Flow nodes as subgraphs to generate Mermaid syntax for a hierarchical visualization.
+{% raw %}
 ```python
 def build_mermaid(start):