update doc

This commit is contained in:
zachary62 2025-02-28 18:15:17 -05:00
parent 3dc17d3741
commit 4a9dd5bca0
8 changed files with 114 additions and 104 deletions

View File

@ -52,7 +52,6 @@ nav_order: 1
- **Test Cases**: Develop clear, reproducible tests for each part of the flow.
- **Self-Evaluation**: Introduce an additional node (powered by LLMs) to review outputs when results are uncertain.
## Example LLM Project File Structure
```
@ -63,38 +62,29 @@ my_project/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── tests/
│ ├── __init__.py
│ ├── test_flow.py
│ └── test_nodes.py
├── requirements.txt
└── docs/
└── design.md
```
### `docs/`
Store the documentation of the project.
It should include a `design.md` file, which describes
Holds all project documentation. Include a `design.md` file covering:
- Project requirements
- Required utility functions
- High-level flow with a mermaid diagram
- Utility functions
- High-level flow (with a Mermaid diagram)
- Shared memory data structure
- For each node, discuss
- Node purpose and design (e.g., should it be a batch or async node?)
- How the data shall be read (for `prep`) and written (for `post`)
- How the data shall be processed (for `exec`)
- Node designs:
- Purpose and design (e.g., batch or async)
- Data read (prep) and write (post)
- Data processing (exec)
### `utils/`
Houses functions for external API calls (e.g., LLMs, web searches, etc.).
Its recommended to dedicate one Python file per API call, with names like `call_llm.py` or `search_web.py`. Each file should include:
Houses functions for external API calls (e.g., LLMs, web searches, etc.). Its recommended to dedicate one Python file per API call, with names like `call_llm.py` or `search_web.py`. Each file should include:
- The function to call the API
- A main function to run that API call
- A main function to run that API call for testing
For instance, heres a simplified `call_llm.py` example:
@ -109,12 +99,9 @@ def call_llm(prompt):
)
return response.choices[0].message.content
def main():
if __name__ == "__main__":
prompt = "Hello, how are you?"
print(call_llm(prompt))
if __name__ == "__main__":
main()
```
### `main.py`
@ -124,17 +111,3 @@ Serves as the projects entry point.
### `flow.py`
Implements the applications flow, starting with node followed by the flow structure.
### `tests/`
Optionally contains all tests. Use `pytest` for testing flows, nodes, and utility functions.
For example, `test_call_llm.py` might look like:
```python
from utils.call_llm import call_llm
def test_call_llm():
prompt = "Hello, how are you?"
assert call_llm(prompt) is not None
```

View File

@ -7,7 +7,13 @@ nav_order: 3
# Map Reduce
Process large inputs by splitting them into chunks using [BatchNode](./batch.md), then combining results.
MapReduce is a design pattern suitable when you have either:
- Large input data (e.g., multiple files to process), or
- Large output data (e.g., multiple forms to fill)
and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using [BatchNode](./batch.md) in the map phase, followed by aggregation in the reduce phase.
### Example: Document Summarization

View File

@ -47,57 +47,79 @@ We can:
2. Use [vector search](./tool.md) to retrieve relevant exchanges beyond the last 4.
```python
class ChatWithMemory(Node):
################################
# Node A: Retrieve user input & relevant messages
################################
class ChatRetrieve(Node):
def prep(self, s):
# Initialize shared dict
s.setdefault("history", [])
s.setdefault("memory_index", None)
user_input = input("You: ")
return user_input
# Retrieve relevant past if we have enough history and an index
def exec(self, user_input):
emb = get_embedding(user_input)
relevant = []
if len(s["history"]) > 8 and s["memory_index"]:
idx, _ = search_index(s["memory_index"], get_embedding(user_input), top_k=2)
relevant = [s["history"][i[0]] for i in idx]
if len(shared["history"]) > 8 and shared["memory_index"]:
idx, _ = search_index(shared["memory_index"], emb, top_k=2)
relevant = [shared["history"][i[0]] for i in idx]
return (user_input, relevant)
return {"user_input": user_input, "recent": s["history"][-8:], "relevant": relevant}
def post(self, s, p, r):
user_input, relevant = r
s["user_input"] = user_input
s["relevant"] = relevant
return "continue"
def exec(self, c):
messages = [{"role": "system", "content": "You are a helpful assistant."}]
# Include relevant history if any
if c["relevant"]:
messages.append({"role": "system", "content": f"Relevant: {c['relevant']}"})
# Add recent history and the current user input
messages += c["recent"] + [{"role": "user", "content": c["user_input"]}]
return call_llm(messages)
################################
# Node B: Call LLM, update history + index
################################
class ChatReply(Node):
def prep(self, s):
user_input = s["user_input"]
recent = s["history"][-8:]
relevant = s.get("relevant", [])
return user_input, recent, relevant
def exec(self, inputs):
user_input, recent, relevant = inputs
msgs = [{"role":"system","content":"You are a helpful assistant."}]
if relevant:
msgs.append({"role":"system","content":f"Relevant: {relevant}"})
msgs.extend(recent)
msgs.append({"role":"user","content":user_input})
ans = call_llm(msgs)
return ans
def post(self, s, pre, ans):
# Update chat history
s["history"] += [
{"role": "user", "content": pre["user_input"]},
{"role": "assistant", "content": ans}
]
user_input, _, _ = pre
s["history"].append({"role":"user","content":user_input})
s["history"].append({"role":"assistant","content":ans})
# When first reaching 8 messages, create index
# Manage memory index
if len(s["history"]) == 8:
embeddings = []
embs = []
for i in range(0, 8, 2):
e = s["history"][i]["content"] + " " + s["history"][i+1]["content"]
embeddings.append(get_embedding(e))
s["memory_index"] = create_index(embeddings)
# Embed older exchanges once we exceed 8 messages
text = s["history"][i]["content"] + " " + s["history"][i+1]["content"]
embs.append(get_embedding(text))
s["memory_index"] = create_index(embs)
elif len(s["history"]) > 8:
pair = s["history"][-10:-8]
embedding = get_embedding(pair[0]["content"] + " " + pair[1]["content"])
s["memory_index"].add(np.array([embedding]).astype('float32'))
text = s["history"][-2]["content"] + " " + s["history"][-1]["content"]
new_emb = np.array([get_embedding(text)]).astype('float32')
s["memory_index"].add(new_emb)
print(f"Assistant: {ans}")
return "continue"
chat = ChatWithMemory()
chat - "continue" >> chat
flow = Flow(start=chat)
flow.run({})
################################
# Flow wiring
################################
retrieve = ChatRetrieve()
reply = ChatReply()
retrieve - "continue" >> reply
reply - "continue" >> retrieve
flow = Flow(start=retrieve)
shared = {}
flow.run(shared)
```

View File

@ -12,6 +12,14 @@ nav_order: 6
> Because of Pythons GIL, parallel nodes and flows cant truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O.
{: .warning }
> - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize.
>
> - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals).
>
> - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits.
{: .best-practice }
## AsyncParallelBatchNode
Like **AsyncBatchNode**, but run `exec_async()` in **parallel**:
@ -47,12 +55,3 @@ sub_flow = AsyncFlow(start=LoadAndSummarizeFile())
parallel_flow = SummarizeMultipleFiles(start=sub_flow)
await parallel_flow.run_async(shared)
```
## Best Practices
- **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize.
- **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals).
- **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits.

View File

@ -15,32 +15,42 @@ Use [vector search](./tool.md) to find relevant context for LLM responses.
```python
class PrepareEmbeddings(Node):
def prep(self, shared):
texts = shared["texts"]
embeddings = [get_embedding(text) for text in texts]
shared["search_index"] = create_index(embeddings)
return shared["texts"]
def exec(self, texts):
# Embed each text chunk
embs = [get_embedding(t) for t in texts]
return embs
def post(self, shared, prep_res, exec_res):
shared["search_index"] = create_index(exec_res)
# no action string means "default"
class AnswerQuestion(Node):
def prep(self, shared):
question = input("Enter question: ")
query_embedding = get_embedding(question)
indices, _ = search_index(shared["search_index"], query_embedding, top_k=1)
relevant_text = shared["texts"][indices[0][0]]
return question, relevant_text
return question
def exec(self, inputs):
question, context = inputs
prompt = f"Question: {question}\nContext: {context}\nAnswer: "
def exec(self, question):
q_emb = get_embedding(question)
idx, _ = search_index(shared["search_index"], q_emb, top_k=1)
best_id = idx[0][0]
relevant_text = shared["texts"][best_id]
prompt = f"Question: {question}\nContext: {relevant_text}\nAnswer:"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
print(f"Answer: {exec_res}")
def post(self, shared, p, answer):
print("Answer:", answer)
# Connect nodes
############################################
# Wire up the flow
prep = PrepareEmbeddings()
qa = AnswerQuestion()
prep >> qa
# Create flow
qa_flow = Flow(start=prep)
qa_flow.run(shared)
flow = Flow(start=prep)
# Example usage
shared = {"texts": ["I love apples", "Cats are great", "The sky is blue"]}
flow.run(shared)
```

View File

@ -83,6 +83,9 @@ summary:
return structured_result
```
> Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic)
{: .note }
### Why YAML instead of JSON?
Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes.

View File

@ -11,7 +11,6 @@ Similar to LLM wrappers, we **don't** provide built-in tools. Here, we recommend
---
## 1. Embedding Calls
```python

View File

@ -140,5 +140,3 @@ data_science_flow.run({})
The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']`