From 4a9dd5bca05791fce3bd4db7c1c3cd5c32f7af44 Mon Sep 17 00:00:00 2001
From: zachary62
Date: Fri, 28 Feb 2025 18:15:17 -0500
Subject: [PATCH] update doc

---
 docs/guide.md     | 49 ++++++------------------
 docs/mapreduce.md |  8 +++-
 docs/memory.md    | 96 +++++++++++++++++++++++++++++------------------
 docs/parallel.md  | 17 ++++-----
 docs/rag.md       | 42 +++++++++++++--------
 docs/structure.md |  3 ++
 docs/tool.md      |  1 -
 docs/viz.md       |  2 -
 8 files changed, 114 insertions(+), 104 deletions(-)

diff --git a/docs/guide.md b/docs/guide.md
index 64134fb..98e4c16 100644
--- a/docs/guide.md
+++ b/docs/guide.md
@@ -52,7 +52,6 @@ nav_order: 1
 - **Test Cases**: Develop clear, reproducible tests for each part of the flow.
 - **Self-Evaluation**: Introduce an additional node (powered by LLMs) to review outputs when results are uncertain.

-
 ## Example LLM Project File Structure

 ```
 my_project/
 │   ├── __init__.py
 │   ├── call_llm.py
 │   └── search_web.py
-├── tests/
-│   ├── __init__.py
-│   ├── test_flow.py
-│   └── test_nodes.py
 ├── requirements.txt
 └── docs/
     └── design.md
 ```
-
 ### `docs/`

-Store the documentation of the project.
-
-It should include a `design.md` file, which describes
+Holds all project documentation. Include a `design.md` file covering:
 - Project requirements
-- Required utility functions
-- High-level flow with a mermaid diagram
+- Utility functions
+- High-level flow (with a Mermaid diagram)
 - Shared memory data structure
-- For each node, discuss
-  - Node purpose and design (e.g., should it be a batch or async node?)
-  - How the data shall be read (for `prep`) and written (for `post`)
-  - How the data shall be processed (for `exec`)
+- Node designs:
+  - Purpose and design (e.g., batch or async)
+  - Data read (prep) and write (post)
+  - Data processing (exec)

 ### `utils/`

-Houses functions for external API calls (e.g., LLMs, web searches, etc.).
-
-It’s recommended to dedicate one Python file per API call, with names like `call_llm.py` or `search_web.py`. Each file should include:
+Houses functions for external API calls (e.g., LLMs, web searches, etc.). It’s recommended to dedicate one Python file per API call, with names like `call_llm.py` or `search_web.py`. Each file should include:
 - The function to call the API
-- A main function to run that API call
+- A main function to run that API call for testing

 For instance, here’s a simplified `call_llm.py` example:

@@ -109,12 +99,9 @@ def call_llm(prompt):
     )
     return response.choices[0].message.content

-def main():
+if __name__ == "__main__":
     prompt = "Hello, how are you?"
     print(call_llm(prompt))
-
-if __name__ == "__main__":
-    main()
 ```

 ### `main.py`
@@ -123,18 +110,4 @@ Serves as the project’s entry point.

 ### `flow.py`

-Implements the application’s flow, starting with node followed by the flow structure.
-
-
-### `tests/`
-
-Optionally contains all tests. Use `pytest` for testing flows, nodes, and utility functions.
-For example, `test_call_llm.py` might look like:
-
-```python
-from utils.call_llm import call_llm
-
-def test_call_llm():
-    prompt = "Hello, how are you?"
-    assert call_llm(prompt) is not None
-```
+Implements the application’s flow, starting with node definitions followed by the flow structure.
\ No newline at end of file
diff --git a/docs/mapreduce.md b/docs/mapreduce.md
index 186f83b..8698192 100644
--- a/docs/mapreduce.md
+++ b/docs/mapreduce.md
@@ -7,7 +7,13 @@ nav_order: 3

 # Map Reduce

-Process large inputs by splitting them into chunks using [BatchNode](./batch.md), then combining results.
+MapReduce is a design pattern suitable when you have either:
+- Large input data (e.g., multiple files to process), or
+- Large output data (e.g., multiple forms to fill)
+
+and there is a logical way to break the task into smaller, ideally independent parts.
+You first break down the task using [BatchNode](./batch.md) in the map phase, followed by aggregation in the reduce phase.
+

 ### Example: Document Summarization

diff --git a/docs/memory.md b/docs/memory.md
index 683b5a8..09dc538 100644
--- a/docs/memory.md
+++ b/docs/memory.md
@@ -47,57 +47,79 @@ We can:
 2. Use [vector search](./tool.md) to retrieve relevant exchanges beyond the last 4.

 ```python
-class ChatWithMemory(Node):
+################################
+# Node A: Retrieve user input & relevant messages
+################################
+class ChatRetrieve(Node):
     def prep(self, s):
-        # Initialize shared dict
         s.setdefault("history", [])
         s.setdefault("memory_index", None)
-
         user_input = input("You: ")
-
-        # Retrieve relevant past if we have enough history and an index
+        return user_input
+
+    def exec(self, user_input):
+        emb = get_embedding(user_input)
         relevant = []
-        if len(s["history"]) > 8 and s["memory_index"]:
-            idx, _ = search_index(s["memory_index"], get_embedding(user_input), top_k=2)
-            relevant = [s["history"][i[0]] for i in idx]
+        # Note: reads the module-level `shared` dict defined in the flow wiring below
+        if len(shared["history"]) > 8 and shared["memory_index"]:
+            idx, _ = search_index(shared["memory_index"], emb, top_k=2)
+            relevant = [shared["history"][i[0]] for i in idx]
+        return (user_input, relevant)

-        return {"user_input": user_input, "recent": s["history"][-8:], "relevant": relevant}
+    def post(self, s, p, r):
+        user_input, relevant = r
+        s["user_input"] = user_input
+        s["relevant"] = relevant
+        return "continue"

-    def exec(self, c):
-        messages = [{"role": "system", "content": "You are a helpful assistant."}]
-        # Include relevant history if any
-        if c["relevant"]:
-            messages.append({"role": "system", "content": f"Relevant: {c['relevant']}"})
-        # Add recent history and the current user input
-        messages += c["recent"] + [{"role": "user", "content": c["user_input"]}]
-        return call_llm(messages)
+################################
+# Node B: Call LLM, update history + index
+################################
+class ChatReply(Node):
+    def prep(self, s):
+        user_input = s["user_input"]
+        recent = s["history"][-8:]
+        relevant = s.get("relevant", [])
+        return user_input, recent, relevant
+
+    def exec(self, inputs):
+        user_input, recent, relevant = inputs
+        msgs = [{"role":"system","content":"You are a helpful assistant."}]
+        if relevant:
+            msgs.append({"role":"system","content":f"Relevant: {relevant}"})
+        msgs.extend(recent)
+        msgs.append({"role":"user","content":user_input})
+        ans = call_llm(msgs)
+        return ans

     def post(self, s, pre, ans):
-        # Update chat history
-        s["history"] += [
-            {"role": "user", "content": pre["user_input"]},
-            {"role": "assistant", "content": ans}
-        ]
+        user_input, _, _ = pre
+        s["history"].append({"role":"user","content":user_input})
+        s["history"].append({"role":"assistant","content":ans})

-        # When first reaching 8 messages, create index
+        # Manage memory index
         if len(s["history"]) == 8:
-            embeddings = []
+            embs = []
             for i in range(0, 8, 2):
-                e = s["history"][i]["content"] + " " + s["history"][i+1]["content"]
-                embeddings.append(get_embedding(e))
-            s["memory_index"] = create_index(embeddings)
-
-        # Embed older exchanges once we exceed 8 messages
+                text = s["history"][i]["content"] + " " + s["history"][i+1]["content"]
+                embs.append(get_embedding(text))
+            s["memory_index"] = create_index(embs)
         elif len(s["history"]) > 8:
-            pair = s["history"][-10:-8]
-            embedding = get_embedding(pair[0]["content"] + " " + pair[1]["content"])
-            s["memory_index"].add(np.array([embedding]).astype('float32'))
-
+            text = s["history"][-2]["content"] + " " + s["history"][-1]["content"]
+            new_emb = np.array([get_embedding(text)]).astype('float32')
+            s["memory_index"].add(new_emb)
+
         print(f"Assistant: {ans}")
         return "continue"

-chat = ChatWithMemory()
-chat - "continue" >> chat
-flow = Flow(start=chat)
-flow.run({})
+################################
+# Flow wiring
+################################
+retrieve = ChatRetrieve()
+reply = ChatReply()
+retrieve - "continue" >> reply
+reply - "continue" >> retrieve
+
+flow = Flow(start=retrieve)
+shared = {}
+flow.run(shared)
 ```
diff --git a/docs/parallel.md b/docs/parallel.md
index 067b0fe..747dfbd 100644
--- a/docs/parallel.md
+++ b/docs/parallel.md
@@ -12,6 +12,14 @@ nav_order: 6
 > Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O.
 {: .warning }

+> - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize.
+>
+> - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals); see the sketch below.
+>
+> - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits.
+{: .best-practice }
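+
+For illustration, a minimal throttling sketch using `asyncio.Semaphore`. It assumes an async variant of the `call_llm` utility, here called `call_llm_async`; adjust the name and the concurrency limit to your setup:
+
+```python
+import asyncio
+
+# Allow at most 4 LLM calls in flight at any time
+llm_semaphore = asyncio.Semaphore(4)
+
+async def call_llm_throttled(prompt):
+    # call_llm_async is an assumed async wrapper around your LLM provider
+    async with llm_semaphore:
+        return await call_llm_async(prompt)
+```
+
+Each `exec_async()` can then await `call_llm_throttled(...)` instead of calling the LLM directly.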
+
+
 ## AsyncParallelBatchNode

 Like **AsyncBatchNode**, but run `exec_async()` in **parallel**:
@@ -47,12 +55,3 @@ sub_flow = AsyncFlow(start=LoadAndSummarizeFile())
 parallel_flow = SummarizeMultipleFiles(start=sub_flow)
 await parallel_flow.run_async(shared)
 ```
-
-
-## Best Practices
-
-- **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize.
-
-- **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals).
-
-- **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits.
diff --git a/docs/rag.md b/docs/rag.md
index 7308c8a..35d5c8b 100644
--- a/docs/rag.md
+++ b/docs/rag.md
@@ -15,32 +15,42 @@ Use [vector search](./tool.md) to find relevant context for LLM responses.
 ```python
 class PrepareEmbeddings(Node):
     def prep(self, shared):
-        texts = shared["texts"]
-        embeddings = [get_embedding(text) for text in texts]
-        shared["search_index"] = create_index(embeddings)
+        return shared["texts"]
+
+    def exec(self, texts):
+        # Embed each text chunk
+        embs = [get_embedding(t) for t in texts]
+        return embs
+
+    def post(self, shared, prep_res, exec_res):
+        shared["search_index"] = create_index(exec_res)
+        # no action string means "default"

 class AnswerQuestion(Node):
     def prep(self, shared):
         question = input("Enter question: ")
-        query_embedding = get_embedding(question)
-        indices, _ = search_index(shared["search_index"], query_embedding, top_k=1)
-        relevant_text = shared["texts"][indices[0][0]]
-        return question, relevant_text
+        return question

-    def exec(self, inputs):
-        question, context = inputs
-        prompt = f"Question: {question}\nContext: {context}\nAnswer: "
+    def exec(self, question):
+        # Note: reads the module-level `shared` dict created in the example usage below
+        q_emb = get_embedding(question)
+        idx, _ = search_index(shared["search_index"], q_emb, top_k=1)
+        best_id = idx[0][0]
+        relevant_text = shared["texts"][best_id]
+        prompt = f"Question: {question}\nContext: {relevant_text}\nAnswer:"
         return call_llm(prompt)

-    def post(self, shared, prep_res, exec_res):
-        print(f"Answer: {exec_res}")
+    def post(self, shared, p, answer):
+        print("Answer:", answer)

-# Connect nodes
+############################################
+# Wire up the flow
 prep = PrepareEmbeddings()
 qa = AnswerQuestion()
 prep >> qa

-# Create flow
-qa_flow = Flow(start=prep)
-qa_flow.run(shared)
+flow = Flow(start=prep)
+
+# Example usage
+shared = {"texts": ["I love apples", "Cats are great", "The sky is blue"]}
+flow.run(shared)
 ```
\ No newline at end of file
diff --git a/docs/structure.md b/docs/structure.md
index 9062171..d379083 100644
--- a/docs/structure.md
+++ b/docs/structure.md
@@ -83,6 +83,9 @@ summary:
 return structured_result
 ```

+> Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic).
+{: .note }
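+
+For illustration, a minimal Pydantic sketch, reusing `structured_result` from the snippet above and assuming its `summary` field should be a list of strings:
+
+```python
+from pydantic import BaseModel
+
+class SummaryModel(BaseModel):
+    summary: list[str]
+
+# Raises pydantic.ValidationError (instead of AssertionError) if the shape is wrong
+validated = SummaryModel(**structured_result)
+```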
+
 ### Why YAML instead of JSON?

 Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes.
diff --git a/docs/tool.md b/docs/tool.md
index 818ebec..262c7d8 100644
--- a/docs/tool.md
+++ b/docs/tool.md
@@ -11,7 +11,6 @@ Similar to LLM wrappers, we **don't** provide built-in tools. Here, we recommend

 ---

-
 ## 1. Embedding Calls

 ```python
diff --git a/docs/viz.md b/docs/viz.md
index 35ef076..0841079 100644
--- a/docs/viz.md
+++ b/docs/viz.md
@@ -140,5 +140,3 @@ data_science_flow.run({})
 The output would be:

 `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']`
-
-