From 7db8c049e8fac81d471fb99c502a86f9b1062e83 Mon Sep 17 00:00:00 2001
From: zachary62
Date: Sun, 12 Jan 2025 00:17:29 +0000
Subject: [PATCH] update docs

---
 docs/mapreduce.md |  24 +++------
 docs/memory.md    | 122 +++++++++++++++++++---------------------------
 docs/node.md      |  15 +++---
 docs/rag.md       |   1 -
 docs/viz.md       |   2 +
 5 files changed, 64 insertions(+), 100 deletions(-)

diff --git a/docs/mapreduce.md b/docs/mapreduce.md
index ac3607b..9f5a526 100644
--- a/docs/mapreduce.md
+++ b/docs/mapreduce.md
@@ -13,30 +13,18 @@ Process large inputs by splitting them into chunks using [BatchNode](./batch.md)
 
 ```python
 class MapSummaries(BatchNode):
-    def prep(self, shared):
-        text = shared["text"]
-        return [text[i:i+10000] for i in range(0, len(text), 10000)]
-
-    def exec(self, chunk):
-        return call_llm(f"Summarize this chunk: {chunk}")
-
-    def post(self, shared, prep_res, exec_res_list):
-        shared["summaries"] = exec_res_list
+    def prep(self, shared): return [shared["text"][i:i+10000] for i in range(0, len(shared["text"]), 10000)]
+    def exec(self, chunk): return call_llm(f"Summarize this chunk: {chunk}")
+    def post(self, shared, prep_res, exec_res_list): shared["summaries"] = exec_res_list
 
 class ReduceSummaries(Node):
-    def prep(self, shared):
-        return shared["summaries"]
-
-    def exec(self, summaries):
-        return call_llm(f"Combine these summaries: {summaries}")
-
-    def post(self, shared, prep_res, exec_res):
-        shared["final_summary"] = exec_res
+    def prep(self, shared): return shared["summaries"]
+    def exec(self, summaries): return call_llm(f"Combine these summaries: {summaries}")
+    def post(self, shared, prep_res, exec_res): shared["final_summary"] = exec_res
 
 # Connect nodes
 map_node = MapSummaries()
 reduce_node = ReduceSummaries()
-
 map_node >> reduce_node
 
 # Create flow
diff --git a/docs/memory.md b/docs/memory.md
index 5e01324..47968dc 100644
--- a/docs/memory.md
+++ b/docs/memory.md
@@ -43,85 +43,61 @@ flow = Flow(start=chat)
 
 ### 2. Improved Memory Management
 
 We can:
-1. Recursively summarize conversations for overview.
-2. Use [vector search](./tool.md) to retrieve relevant past exchanges for details
+1. Limit the chat history passed to the LLM to the 4 most recent exchanges (8 messages).
+2. Use [vector search](./tool.md) to retrieve relevant exchanges beyond the most recent 4.
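+
+The sketch below assumes `call_llm` accepts a chat-style message list, and that `get_embedding`, `create_index`, and `search_index` are FAISS-style vector helpers, with `numpy` imported as `np`.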
 ```python
-class HandleInput(Node):
-    def prep(self, shared):
-        if "history" not in shared:
-            shared["history"] = []
-            shared["summary"] = ""
-            shared["memory_index"] = None
-            shared["memories"] = []
+class ChatWithMemory(Node):
+    def prep(self, s):
+        # Initialize history and memory index if missing
+        s.setdefault("history", [])
+        s.setdefault("memory_index", None)
 
         user_input = input("You: ")
-        query_embedding = get_embedding(user_input)
-        relevant_memories = []
-        if shared["memory_index"] is not None:
-            indices, _ = search_index(shared["memory_index"], query_embedding, top_k=2)
-            relevant_memories = [shared["memories"][i[0]] for i in indices]
+        # Retrieve relevant older exchanges once some have been indexed
+        relevant = []
+        if len(s["history"]) > 8 and s["memory_index"] is not None:
+            idx, _ = search_index(s["memory_index"], get_embedding(user_input), top_k=2)
+            # Index entry j corresponds to the exchange at history[2*j : 2*j+2]
+            relevant = [s["history"][i[0]*2 : i[0]*2+2] for i in idx]
+
+        return {"user_input": user_input, "recent": s["history"][-8:], "relevant": relevant}
+
+    def exec(self, c):
+        messages = [{"role": "system", "content": "You are a helpful assistant."}]
+        # Include relevant older history, if any
+        if c["relevant"]:
+            messages.append({"role": "system", "content": f"Relevant: {c['relevant']}"})
+        # Add recent history and the current user input
+        messages += c["recent"] + [{"role": "user", "content": c["user_input"]}]
+        return call_llm(messages)
+
+    def post(self, s, pre, ans):
+        # Update chat history
+        s["history"] += [
+            {"role": "user", "content": pre["user_input"]},
+            {"role": "assistant", "content": ans}
+        ]
 
-        shared["current_input"] = {
-            "summary": shared["summary"],
-            "relevant": relevant_memories,
-            "input": user_input
-        }
-
-class GenerateResponse(Node):
-    def prep(self, shared):
-        return shared["current_input"]
-
-    def exec(self, context):
-        prompt = f"""Context:
-Summary: {context['summary']}
-Relevant past: {context['relevant']}
-User: {context['input']}
-
-Response:"""
-        return call_llm(prompt)
+        # Once an exchange falls out of the 8-message recent window,
+        # embed it and add it to the index (created on first use)
+        if len(s["history"]) > 8:
+            pair = s["history"][-10:-8]
+            embedding = get_embedding(pair[0]["content"] + " " + pair[1]["content"])
+            if s["memory_index"] is None:
+                s["memory_index"] = create_index([embedding])
+            else:
+                s["memory_index"].add(np.array([embedding]).astype('float32'))
 
-    def post(self, shared, prep_res, exec_res):
-        shared["history"].append({"role": "user", "content": prep_res["input"]})
-        shared["history"].append({"role": "assistant", "content": exec_res})
+        print(f"Assistant: {ans}")
+        return "continue"
 
-class UpdateMemory(Node):
-    def prep(self, shared):
-        return shared["current_input"]["input"]
-
-    def exec(self, user_input):
-        return get_embedding(user_input)
-
-    def post(self, shared, prep_res, exec_res):
-        shared["memories"].append(prep_res)
-        if shared["memory_index"] is None:
-            shared["memory_index"] = create_index([exec_res])
-        else:
-            shared["memory_index"].add(np.array([exec_res]))
-
-class UpdateSummary(Node):
-    def prep(self, shared):
-        if shared["history"]:
-            return shared["history"][-10:]
-        return None
-
-    def exec(self, recent_history):
-        if recent_history:
-            return call_llm(f"Summarize this conversation:\n{recent_history}")
-        return ""
-
-    def post(self, shared, prep_res, exec_res):
-        if exec_res:
-            shared["summary"] = exec_res
-
-# Connect nodes
-input_node = HandleInput()
-response_node = GenerateResponse()
-memory_node = UpdateMemory()
-summary_node = UpdateSummary()
-
-input_node >> response_node >> memory_node >> summary_node >> input_node
-
-chat_flow = Flow(start=input_node)
+chat = ChatWithMemory()
+chat - "continue" >> chat
+flow = Flow(start=chat)
+flow.run({})
 ```
diff --git a/docs/node.md b/docs/node.md
index 1398bfd..5be6c08 100644
--- a/docs/node.md
+++ b/docs/node.md
@@ -18,7 +18,7 @@ A **Node** is the smallest building block. Each Node has 3 steps:
   - The **main execution** step, with optional retries and error handling (below).
   - Examples: *primarily for LLM calls, but can also be used for remote APIs*.
   - ⚠️ If retries enabled, ensure idempotent implementation.
-  - ⚠️ This step must not write to the `shared` store. If reading from `shared` is necessary, retrieve and pass it along in `prep()`.
+  - ⚠️ This must **NOT** write to `shared`. If reads are necessary, extract them in `prep()` and pass them along via `prep_res`.
   - Returns `exec_res`, which is passed to `post()`.
 
 3. `post(shared, prep_res, exec_res)`
@@ -30,7 +30,7 @@ A **Node** is the smallest building block. Each Node has 3 steps:
 
 {: .note }
 
-## Fault Tolerance & Retries
+### Fault Tolerance & Retries
 
 Nodes can **retry** execution if `exec()` raises an exception. You control this via two parameters when you create the Node:
 
@@ -55,10 +55,9 @@ def exec_fallback(self, shared, prep_res, exc):
     raise exc
 ```
 
-By default, it just re-raises `exc`. But you can return a fallback result instead.
-That fallback result becomes the `exec_res` passed to `post()`.
+By default, it just re-raises `exc`. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.
 
-## Example
+### Example: Summarize file
 
 ```python
 class SummarizeFile(Node):
@@ -83,10 +82,10 @@ class SummarizeFile(Node):
         # Return "default" by not returning anything
 
 summarize_node = SummarizeFile(max_retries=3)
-
-# Run the node standalone for testing (calls prep->exec->post).
-# If exec() fails, it retries up to 3 times before calling exec_fallback().
 summarize_node.set_params({"filename": "test_file.txt"})
+
+# node.run() calls prep->exec->post
+# If exec() fails, it retries up to 3 times before calling exec_fallback()
 action_result = summarize_node.run(shared)
 
 print("Action returned:", action_result)  # Usually "default"
diff --git a/docs/rag.md b/docs/rag.md
index 2bd5f19..147f687 100644
--- a/docs/rag.md
+++ b/docs/rag.md
@@ -38,7 +38,6 @@ class AnswerQuestion(Node):
 # Connect nodes
 prep = PrepareEmbeddings()
 qa = AnswerQuestion()
-
 prep >> qa
 
 # Create flow
diff --git a/docs/viz.md b/docs/viz.md
index c78862d..623a725 100644
--- a/docs/viz.md
+++ b/docs/viz.md
@@ -11,6 +11,8 @@ Visualizing the nested graph can help understanding. While we **don’t** includ
 
 ### Example: Visualization of Node with Mermaid
 
+This code recursively traverses the nested graph, assigns a unique ID to each node, and treats Flow nodes as subgraphs, generating Mermaid syntax for a hierarchical visualization.
+
 {% raw %}
 ```python
 def build_mermaid(start):