================================================
File: docs/guide.md
================================================
---
layout: default
title: "Design Guidance"
parent: "Apps"
nav_order: 1
---
# LLM System Design Guidance
## Example LLM Project File Structure
```
my_project/
├── main.py
├── flow.py
├── utils/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── tests/
│ ├── __init__.py
│ ├── test_flow.py
│ └── test_nodes.py
├── requirements.txt
└── docs/
└── design.md
```
### `docs/`
Stores the project's documentation.
It should include a `design.md` file, which describes:
- Project requirements
- Required utility functions
- High-level flow with a mermaid diagram
- Shared memory data structure
- For each node, discuss
  - Node purpose and design (e.g., should it be a batch or async node?)
  - How data is read (in `prep`) and written (in `post`)
  - How data is processed (in `exec`)
### `utils/`
Houses functions for external API calls (e.g., LLMs, web searches, etc.).
It’s recommended to dedicate one Python file per API call, with names like `call_llm.py` or `search_web.py`. Each file should include:
- The function to call the API
- A main function to run that API call
For instance, here’s a simplified `call_llm.py` example:
```python
from openai import OpenAI
def call_llm(prompt):
client = OpenAI(api_key="YOUR_API_KEY_HERE")
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
def main():
prompt = "Hello, how are you?"
print(call_llm(prompt))
if __name__ == "__main__":
main()
```
### `main.py`
Serves as the project’s entry point.
### `flow.py`
Implements the application's flow: it first defines the nodes, then wires them together into the flow structure.
### `tests/`
Optionally contains all tests. Use `pytest` for testing flows, nodes, and utility functions.
For example, `test_call_llm.py` might look like:
```python
from utils.call_llm import call_llm
def test_call_llm():
prompt = "Hello, how are you?"
assert call_llm(prompt) is not None
```
## System Design Steps
1. **Project Requirements**
- Identify the project's core entities.
- Define each functional requirement and map out how these entities interact step by step.
2. **Utility Functions**
- Determine the low-level utility functions you’ll need (e.g., for LLM calls, web searches, file handling).
- Implement these functions and write basic tests to confirm they work correctly.
3. **Flow Design**
- Develop a high-level process flow that meets the project’s requirements.
- Specify which utility functions are used at each step.
- Identify possible decision points for *Node Actions* and data-intensive operations for *Batch* tasks.
- Illustrate the flow with a Mermaid diagram.
4. **Data Structure**
- Decide how to store and update state, whether in memory (for smaller applications) or a database (for larger or persistent needs).
- Define data schemas or models that detail how information is stored, accessed, and updated.
5. **Implementation**
   - Start coding with a simple, direct approach (avoid over-engineering at first); a minimal `flow.py` sketch follows this list.
- For each node in your flow:
- **prep**: Determine how data is accessed or retrieved.
- **exec**: Outline the actual processing or logic needed.
- **post**: Handle any final updates or data persistence tasks.
6. **Optimization**
- **Prompt Engineering**: Use clear and specific instructions with illustrative examples to reduce ambiguity.
- **Task Decomposition**: Break large, complex tasks into manageable, logical steps.
7. **Reliability**
- **Structured Output**: Verify outputs conform to the required format. Consider increasing `max_retries` if needed.
- **Test Cases**: Develop clear, reproducible tests for each part of the flow.
- **Self-Evaluation**: Introduce an additional Node (powered by LLMs) to review outputs when the results are uncertain.
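To tie the steps together, here is a minimal sketch of what `flow.py` might look like after steps 3–5. The node names, utility functions, and shared-store keys below are illustrative assumptions, not part of any specific project:
```python
# flow.py -- illustrative sketch; node names and shared-store keys are assumptions
from pocketflow import Node, Flow
from utils.call_llm import call_llm
from utils.search_web import search_web

class FetchContext(Node):
    def prep(self, shared):
        # prep: read the query from the shared store
        return shared["query"]
    def exec(self, query):
        # exec: call a utility function (no shared access here)
        return search_web(query)
    def post(self, shared, prep_res, exec_res):
        # post: write the result back for the next node
        shared["context"] = exec_res
        return "default"

class DraftAnswer(Node):
    def prep(self, shared):
        return shared["query"], shared["context"]
    def exec(self, inputs):
        query, context = inputs
        return call_llm(f"Context: {context}\nAnswer: {query}")
    def post(self, shared, prep_res, exec_res):
        shared["answer"] = exec_res

# Wire the nodes into the flow (step 3)
fetch = FetchContext()
draft = DraftAnswer()
fetch >> draft
qa_flow = Flow(start=fetch)
```
`main.py` would then only need to build the initial `shared` dict and call `qa_flow.run(shared)`.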
================================================
File: docs/agent.md
================================================
---
layout: default
title: "Agent"
parent: "Paradigm"
nav_order: 6
---
# Agent
For many tasks, we need agents that take dynamic and recursive actions based on the inputs they receive.
You can create these agents as **Nodes** connected by *Actions* in a directed graph using [Flow](./flow.md).
### Example: Search Agent
This agent:
1. Decides whether to search or answer
2. If it searches, it loops back to decide whether more searching is needed
3. Answers once enough context has been gathered
```python
class DecideAction(Node):
def prep(self, shared):
context = shared.get("context", "No previous search")
query = shared["query"]
return query, context
def exec(self, inputs):
query, context = inputs
prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
```yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
```"""
resp = call_llm(prompt)
yaml_str = resp.split("```yaml")[1].split("```")[0].strip()
result = yaml.safe_load(yaml_str)
assert isinstance(result, dict)
assert "action" in result
assert "reason" in result
assert result["action"] in ["search", "answer"]
if result["action"] == "search":
assert "search_term" in result
return result
def post(self, shared, prep_res, exec_res):
if exec_res["action"] == "search":
shared["search_term"] = exec_res["search_term"]
return exec_res["action"]
class SearchWeb(Node):
def prep(self, shared):
return shared["search_term"]
def exec(self, search_term):
return search_web(search_term)
def post(self, shared, prep_res, exec_res):
prev_searches = shared.get("context", [])
shared["context"] = prev_searches + [
{"term": shared["search_term"], "result": exec_res}
]
return "decide"
class DirectAnswer(Node):
def prep(self, shared):
return shared["query"], shared.get("context", "")
def exec(self, inputs):
query, context = inputs
return call_llm(f"Context: {context}\nAnswer: {query}")
def post(self, shared, prep_res, exec_res):
print(f"Answer: {exec_res}")
shared["answer"] = exec_res
# Connect nodes
decide = DecideAction()
search = SearchWeb()
answer = DirectAnswer()
decide - "search" >> search
decide - "answer" >> answer
search - "decide" >> decide # Loop back
flow = Flow(start=decide)
flow.run({"query": "Who won the Nobel Prize in Physics 2024?"})
```
================================================
File: docs/async.md
================================================
---
layout: default
title: "(Advanced) Async"
parent: "Core Abstraction"
nav_order: 5
---
# (Advanced) Async
**Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for:
1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way.
2. **exec_async()**: Typically used for async LLM calls.
3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`.
**Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes.
### Example
```python
class SummarizeThenVerify(AsyncNode):
async def prep_async(self, shared):
# Example: read a file asynchronously
doc_text = await read_file_async(shared["doc_path"])
return doc_text
async def exec_async(self, prep_res):
# Example: async LLM call
summary = await call_llm_async(f"Summarize: {prep_res}")
return summary
async def post_async(self, shared, prep_res, exec_res):
# Example: wait for user feedback
decision = await gather_user_feedback(exec_res)
if decision == "approve":
shared["summary"] = exec_res
return "approve"
return "deny"
summarize_node = SummarizeThenVerify()
final_node = Finalize()
# Define transitions
summarize_node - "approve" >> final_node
summarize_node - "deny" >> summarize_node # retry
flow = AsyncFlow(start=summarize_node)
async def main():
shared = {"doc_path": "document.txt"}
await flow.run_async(shared)
print("Final Summary:", shared.get("summary"))
asyncio.run(main())
```
================================================
File: docs/batch.md
================================================
---
layout: default
title: "Batch"
parent: "Core Abstraction"
nav_order: 4
---
# Batch
**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases:
- **Chunk-based** processing (e.g., splitting large texts).
- **Iterative** processing over lists of input items (e.g., user queries, files, URLs).
## 1. BatchNode
A **BatchNode** extends `Node` but changes `prep()` and `exec()`:
- **`prep(shared)`**: returns an **iterable** (e.g., list, generator).
- **`exec(item)`**: called **once** per item in that iterable.
- **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**.
### Example: Summarize a Large File
```python
class MapSummaries(BatchNode):
def prep(self, shared):
# Suppose we have a big file; chunk it
content = shared["data"]
chunk_size = 10000
chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
return chunks
def exec(self, chunk):
prompt = f"Summarize this chunk in 10 words: {chunk}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res_list):
combined = "\n".join(exec_res_list)
shared["summary"] = combined
return "default"
map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
```
---
## 2. BatchFlow
A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
### Example: Summarize Many Files
```python
class SummarizeAllFiles(BatchFlow):
def prep(self, shared):
# Return a list of param dicts (one per file)
filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...]
return [{"filename": fn} for fn in filenames]
# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
summarize_file = SummarizeFile(start=load_file)
# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
```
### Under the Hood
1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`.
2. The **BatchFlow** loops through each dict. For each one:
- It merges the dict with the BatchFlow’s own `params`.
- It calls `flow.run(shared)` using the merged result.
3. This means the sub-Flow is run **repeatedly**, once for every param dict, as the sketch below illustrates.
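Conceptually, each iteration is roughly equivalent to the following sketch (this illustrates the behavior, not the library's actual internals; it reuses `summarize_file`, `summarize_all_files`, and `shared` from the example above):
```python
# Behavioral sketch only -- not the real BatchFlow implementation
param_dicts = [{"filename": "file1.txt"}, {"filename": "file2.txt"}]
for param_dict in param_dicts:
    # merge the BatchFlow's own params with this iteration's dict
    merged = {**summarize_all_files.params, **param_dict}
    summarize_file.set_params(merged)   # set_params is described in Communication
    summarize_file.run(shared)          # rerun the wrapped Flow for this param dict
```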
---
## 3. Nested or Multi-Level Batches
You can nest a **BatchFlow** in another **BatchFlow**. For instance:
- **Outer** batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- **Inner** batch: returns a list of per-file param dicts.
At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.
```python
class FileBatchFlow(BatchFlow):
def prep(self, shared):
directory = self.params["directory"]
# e.g., files = ["file1.txt", "file2.txt", ...]
files = [f for f in os.listdir(directory) if f.endswith(".txt")]
return [{"filename": f} for f in files]
class DirectoryBatchFlow(BatchFlow):
def prep(self, shared):
directories = [ "/path/to/dirA", "/path/to/dirB"]
return [{"directory": d} for d in directories]
# The innermost MapSummaries node ends up with params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
```
================================================
File: docs/communication.md
================================================
---
layout: default
title: "Communication"
parent: "Core Abstraction"
nav_order: 3
---
# Communication
Nodes and Flows **communicate** in two ways:
1. **Shared Store (recommended)**
   - A global data structure (often an in-mem dict) that all nodes can read from and write to via `prep()` and `post()`.
   - Great for data results, large content, or anything multiple nodes need.
   - You should design the data structure up front and populate it before the flow runs.
2. **Params (only for [Batch](./batch.md))**
   - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Treat parameter keys and values as **immutable**.
- Good for identifiers like filenames or numeric IDs, in Batch mode.
If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller).
> **Best Practice:** Use `Shared Store` for almost all cases. It's flexible and easy to manage. It separates data storage from data processing, making the code more readable and easier to maintain.
>
> `Params` is mostly syntactic sugar for [Batch](./batch.md).
{: .note }
---
## 1. Shared Store
### Overview
A shared store is typically an in-mem dictionary, like:
```python
shared = {"data": {}, "summary": {}, "config": {...}, ...}
```
It can also hold local file handles, DB connections, or a combination of these for persistence. We recommend deciding the data structure or DB schema first, based on your app's requirements.
### Example
```python
class LoadData(Node):
def post(self, shared, prep_res, exec_res):
# We write data to shared store
shared["data"] = "Some text content"
return None
class Summarize(Node):
def prep(self, shared):
# We read data from shared store
return shared["data"]
def exec(self, prep_res):
# Call LLM to summarize
prompt = f"Summarize: {prep_res}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res):
# We write summary to shared store
shared["summary"] = exec_res
return "default"
load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)
shared = {}
flow.run(shared)
```
Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`.
---
## 2. Params
**Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are:
- **Immutable** during a Node’s run cycle (i.e., they don’t change mid-`prep->exec->post`).
- **Set** via `set_params()`.
- **Cleared** and re-set each time a parent Flow runs the node.
> Only set params on the uppermost Flow, because params set on child nodes will be overwritten by the parent Flow.
>
> If you need to set per-child node params, see [Batch](./batch.md).
{: .warning }
Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
### Example
```python
# 1) Create a Node that uses params
class SummarizeFile(Node):
def prep(self, shared):
# Access the node's param
filename = self.params["filename"]
return shared["data"].get(filename, "")
def exec(self, prep_res):
prompt = f"Summarize: {prep_res}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
filename = self.params["filename"]
shared["summary"][filename] = exec_res
return "default"
# 2) Set params
node = SummarizeFile()
# 3) Set Node params directly (for testing)
node.set_params({"filename": "doc1.txt"})
node.run(shared)
# 4) Create Flow
flow = Flow(start=node)
# 5) Set Flow params (overwrites node params)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared) # The node summarizes doc2, not doc1
```
---
================================================
File: docs/decomp.md
================================================
---
layout: default
title: "Task Decomposition"
parent: "Paradigm"
nav_order: 2
---
# Task Decomposition
Many real-world tasks are too complex for one LLM call. The solution is to decompose them into multiple calls as a [Flow](./flow.md) of Nodes.
### Example: Article Writing
```python
class GenerateOutline(Node):
def prep(self, shared): return shared["topic"]
def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}")
def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res
class WriteSection(Node):
def prep(self, shared): return shared["outline"]
def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}")
def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res
class ReviewAndRefine(Node):
def prep(self, shared): return shared["draft"]
def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}")
def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res
# Connect nodes
outline = GenerateOutline()
write = WriteSection()
review = ReviewAndRefine()
outline >> write >> review
# Create and run flow
writing_flow = Flow(start=outline)
shared = {"topic": "AI Safety"}
writing_flow.run(shared)
```
================================================
File: docs/essay.md
================================================
---
layout: default
title: "Essay"
parent: "Apps"
nav_order: 2
---
# Summarization + QA agent for Paul Graham Essay
```python
from pocketflow import *
from openai import OpenAI
import os, yaml
# Minimal LLM wrapper
def call_llm(prompt):
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content
shared = {"data": {}, "summary": {}}
# Load data into shared['data']
class LoadData(Node):
def prep(self, shared):
path = "./PocketFlow/data/PaulGrahamEssaysLarge"
for fn in os.listdir(path):
with open(os.path.join(path, fn), 'r') as f:
shared['data'][fn] = f.read()
def exec(self, res): pass
def post(self, s, pr, er): pass
LoadData().run(shared)
# Summarize one file
class SummarizeFile(Node):
def prep(self, s): return s['data'][self.params['filename']]
def exec(self, content): return call_llm(f"{content} Summarize in 10 words.")
def post(self, s, pr, sr): s["summary"][self.params['filename']] = sr
node_summ = SummarizeFile()
node_summ.set_params({"filename":"addiction.txt"})
node_summ.run(shared)
# Map-Reduce summarization
class MapSummaries(BatchNode):
def prep(self, s):
text = s['data'][self.params['filename']]
return [text[i:i+10000] for i in range(0, len(text), 10000)]
def exec(self, chunk):
return call_llm(f"{chunk} Summarize in 10 words.")
def post(self, s, pr, er):
s["summary"][self.params['filename']] = [f"{i}. {r}" for i,r in enumerate(er)]
class ReduceSummaries(Node):
def prep(self, s): return s["summary"][self.params['filename']]
def exec(self, chunks): return call_llm(f"{chunks} Combine into 10 words summary.")
def post(self, s, pr, sr): s["summary"][self.params['filename']] = sr
map_summ = MapSummaries()
reduce_summ = ReduceSummaries()
map_summ >> reduce_summ
flow = Flow(start=map_summ)
flow.set_params({"filename":"before.txt"})
flow.run(shared)
# Summarize all files
class SummarizeAllFiles(BatchFlow):
def prep(self, s): return [{"filename":fn} for fn in s['data']]
SummarizeAllFiles(start=flow).run(shared)
# QA agent
class FindRelevantFile(Node):
def prep(self, s):
q = input("Enter a question: ")
filenames = list(s['summary'].keys())
file_summaries = [f"- '{fn}': {s['summary'][fn]}" for fn in filenames]
return q, filenames, file_summaries
def exec(self, p):
q, filenames, file_summaries = p
if not q:
return {"think":"no question", "has_relevant":False}
resp = call_llm(f"""
Question: {q}
Find the most relevant file from: {file_summaries}
If none, explain why
Output in code fence:
```yaml
think: >
reasoning about relevance
has_relevant: true/false
most_relevant: filename if relevant
```""")
yaml_str = resp.split("```yaml")[1].split("```")[0].strip()
result = yaml.safe_load(yaml_str)
# Validate response
assert isinstance(result, dict)
assert "think" in result
assert "has_relevant" in result
assert isinstance(result["has_relevant"], bool)
if result["has_relevant"]:
assert "most_relevant" in result
assert result["most_relevant"] in filenames
return result
def exec_fallback(self, p, exc): return {"think":"error","has_relevant":False}
def post(self, s, pr, res):
        q, _, _ = pr
if not q:
print("No question asked"); return "end"
if res["has_relevant"]:
s["question"], s["relevant_file"] = q, res["most_relevant"]
print("Relevant file:", res["most_relevant"])
return "answer"
else:
print("No relevant file:", res["think"])
return "retry"
class AnswerQuestion(Node):
def prep(self, s):
return s['question'], s['data'][s['relevant_file']]
def exec(self, p):
q, txt = p
return call_llm(f"Question: {q}\nText: {txt}\nAnswer in 50 words.")
def post(self, s, pr, ex):
print("Answer:", ex)
class NoOp(Node): pass
frf = FindRelevantFile(max_retries=3)
aq = AnswerQuestion()
noop = NoOp()
frf - "answer" >> aq >> frf
frf - "retry" >> frf
frf - "end" >> noop
qa = Flow(start=frf)
qa.run(shared)
```
================================================
File: docs/flow.md
================================================
---
layout: default
title: "Flow"
parent: "Core Abstraction"
nav_order: 2
---
# Flow
A **Flow** orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`.
## 1. Action-based Transitions
Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`.
You define transitions with the syntax:
1. **Basic default transition**: `node_a >> node_b`
This means if `node_a.post()` returns `"default"`, go to `node_b`.
(Equivalent to `node_a - "default" >> node_b`)
2. **Named action transition**: `node_a - "action_name" >> node_b`
This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
It's possible to create loops, branching, or multi-step flows.
## 2. Creating a Flow
A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node.
### Example: Simple Sequence
Here's a minimal flow of two nodes in a chain:
```python
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)
```
- When you run the flow, it executes `node_a`.
- Suppose `node_a.post()` returns `"default"`.
- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`.
- `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there.
### Example: Branching & Looping
Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions:
- `"approved"`: expense is approved, move to payment processing
- `"needs_revision"`: expense needs changes, send back for revision
- `"rejected"`: expense is denied, finish the process
We can wire them like this:
```python
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> finish # If rejected, finish the process
revise >> review # After revision, go back for another review
payment >> finish # After payment, finish the process
flow = Flow(start=review)
```
Let's see how it flows:
1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node
2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review`
3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops
```mermaid
flowchart TD
review[Review Expense] -->|approved| payment[Process Payment]
review -->|needs_revision| revise[Revise Report]
review -->|rejected| finish[Finish Process]
revise --> review
payment --> finish
```
### Running Individual Nodes vs. Running a Flow
- `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action.
- `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue.
> `node.run(shared)` **does not** proceed to the successor.
> This is mainly for debugging or testing a single node.
>
> Always use `flow.run(...)` in production to ensure the full pipeline runs correctly.
{: .warning }
## 3. Nested Flows
A **Flow** can act like a Node, which enables powerful composition patterns. This means you can:
1. Use a Flow as a Node within another Flow's transitions.
2. Combine multiple smaller Flows into a larger Flow for reuse.
3. A Node's `params` will be the merge of **all** of its parents' `params`.
### Flow's Node Methods
A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However:
- It **won't** run `exec()`, as its main logic is to orchestrate its nodes.
- `post()` always receives `None` for `exec_res`; read the flow's results from the shared store instead (see the sketch below).
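For example, a minimal sketch of a Flow subclass whose `post()` reads its result from the shared store (the class name and key are illustrative):
```python
class OrderFlow(Flow):
    def post(self, shared, prep_res, exec_res):
        # exec_res is always None for a Flow; read results from the shared store instead
        print("Order status:", shared.get("order_status"))
        return "default"
```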
### Basic Flow Nesting
Here's how to connect a flow to another node:
```python
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)
# Connect it to another node
subflow >> node_c
# Create the parent flow
parent_flow = Flow(start=subflow)
```
When `parent_flow.run()` executes:
1. It starts `subflow`
2. `subflow` runs through its nodes (`node_a->node_b`)
3. After `subflow` completes, execution continues to `node_c`
### Example: Order Processing Pipeline
Here's a practical example that breaks down order processing into nested flows:
```python
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# Create the master flow
order_pipeline = Flow(start=payment_flow)
# Run the entire pipeline
order_pipeline.run(shared_data)
```
This creates a clean separation of concerns while maintaining a clear execution path:
```mermaid
flowchart LR
subgraph order_pipeline[Order Pipeline]
subgraph paymentFlow["Payment Flow"]
A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
end
subgraph inventoryFlow["Inventory Flow"]
D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
end
subgraph shippingFlow["Shipping Flow"]
G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
end
paymentFlow --> inventoryFlow
inventoryFlow --> shippingFlow
end
```
================================================
File: docs/index.md
================================================
---
layout: default
title: "Home"
nav_order: 1
---
# Pocket Flow
A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*.
We model the LLM workflow as a **Nested Directed Graph**:
- **Nodes** handle simple (LLM) tasks.
- Nodes connect through **Actions** (labeled edges) for *Agents*.
- **Flows** orchestrate a directed graph of Nodes for *Task Decomposition*.
- A Flow can be used as a Node (for **Nesting**).
- **Batch** Nodes/Flows for data-intensive tasks.
- **Async** Nodes/Flows allow waiting (e.g., for I/O or user feedback) or **Parallel** execution.
> Have questions? Chat with [AI Assistant](https://chatgpt.com/g/g-677464af36588191b9eba4901946557b-mini-llm-flow-assistant)
{: .note }
## Core Abstraction
- [Node](./node.md)
- [Flow](./flow.md)
- [Communication](./communication.md)
- [Batch](./batch.md)
- [(Advanced) Async](./async.md)
- [(Advanced) Parallel](./parallel.md)
## Low-Level Details
- [LLM Wrapper](./llm.md)
- [Tool](./tool.md)
- [Viz and Debug](./viz.md)
- Chunking
> We do not provide built-in implementations.
>
> Example implementations are provided as reference.
{: .warning }
## High-Level Paradigm
- [Structured Output](./structure.md)
- [Task Decomposition](./decomp.md)
- [Map Reduce](./mapreduce.md)
- [RAG](./rag.md)
- [Chat Memory](./memory.md)
- [Agent](./agent.md)
- [(Advanced) Multi-Agents](./multi_agent.md)
- Evaluation
## Example LLM Apps
- [LLM System Design Guidance](./guide.md)
- [Summarization + QA agent for Paul Graham Essay](./essay.md)
- More coming soon...
================================================
File: docs/llm.md
================================================
---
layout: default
title: "LLM Wrapper"
parent: "Details"
nav_order: 1
---
# LLM Wrappers
We **don't** provide built-in LLM wrappers. Instead, please implement your own, for example by asking an assistant like ChatGPT or Claude. If you ask ChatGPT to "implement a `call_llm` function that takes a prompt and returns the LLM response," you should get something like:
```python
def call_llm(prompt):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
# Example usage
call_llm("How are you?")
```
> Store the API key in an environment variable like OPENAI_API_KEY for security.
{: .note }
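For example, a sketch of `call_llm` that reads the key from the environment instead of hardcoding it:
```python
import os
from openai import OpenAI

def call_llm(prompt):
    # Read the key from the OPENAI_API_KEY environment variable
    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content
```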
## Improvements
Feel free to enhance your `call_llm` function as needed. Here are examples:
- Handle chat history:
```python
def call_llm(messages):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=messages
)
return r.choices[0].message.content
```
- Add in-memory caching
```python
from functools import lru_cache
@lru_cache(maxsize=1000)
def call_llm(prompt):
# Your implementation here
pass
```
> ⚠️ Caching conflicts with Node retries: a retried call would just return the same cached result.
>
> To address this, you could use cached results only when not retrying.
{: .warning }
```python
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_call(prompt):
pass
def call_llm(prompt, use_cache):
if use_cache:
return cached_call(prompt)
# Call the underlying function directly
return cached_call.__wrapped__(prompt)
class SummarizeNode(Node):
def exec(self, text):
return call_llm(f"Summarize: {text}", self.cur_retry==0)
```
- Enable logging:
```python
def call_llm(prompt):
import logging
logging.info(f"Prompt: {prompt}")
response = ... # Your implementation here
logging.info(f"Response: {response}")
return response
```
## Why Not Provide Built-in LLM Wrappers?
I believe it is a **bad practice** to provide LLM-specific implementations in a general framework:
- **LLM APIs change frequently**. Hardcoding them makes maintenance a nightmare.
- You may need **flexibility** to switch vendors, use fine-tuned models, or deploy local LLMs.
- You may need **optimizations** like prompt caching, request batching, or response streaming.
================================================
File: docs/mapreduce.md
================================================
---
layout: default
title: "Map Reduce"
parent: "Paradigm"
nav_order: 3
---
# Map Reduce
Process large inputs by splitting them into chunks using [BatchNode](./batch.md), then combining results.
### Example: Document Summarization
```python
class MapSummaries(BatchNode):
def prep(self, shared): return [shared["text"][i:i+10000] for i in range(0, len(shared["text"]), 10000)]
def exec(self, chunk): return call_llm(f"Summarize this chunk: {chunk}")
def post(self, shared, prep_res, exec_res_list): shared["summaries"] = exec_res_list
class ReduceSummaries(Node):
def prep(self, shared): return shared["summaries"]
def exec(self, summaries): return call_llm(f"Combine these summaries: {summaries}")
def post(self, shared, prep_res, exec_res): shared["final_summary"] = exec_res
# Connect nodes
map_node = MapSummaries()
reduce_node = ReduceSummaries()
map_node >> reduce_node
# Create flow
summarize_flow = Flow(start=map_node)
summarize_flow.run(shared)
```
================================================
File: docs/memory.md
================================================
---
layout: default
title: "Chat Memory"
parent: "Paradigm"
nav_order: 5
---
# Chat Memory
Multi-turn conversations require memory management to maintain context while avoiding overwhelming the LLM.
### 1. Naive Approach: Full History
Sending the full chat history may overwhelm LLMs.
```python
class ChatNode(Node):
def prep(self, shared):
if "history" not in shared:
shared["history"] = []
user_input = input("You: ")
return shared["history"], user_input
def exec(self, inputs):
history, user_input = inputs
messages = [{"role": "system", "content": "You are a helpful assistant"}]
for h in history:
messages.append(h)
messages.append({"role": "user", "content": user_input})
response = call_llm(messages)
return response
def post(self, shared, prep_res, exec_res):
shared["history"].append({"role": "user", "content": prep_res[1]})
shared["history"].append({"role": "assistant", "content": exec_res})
return "continue"
chat = ChatNode()
chat - "continue" >> chat
flow = Flow(start=chat)
```
### 2. Improved Memory Management
We can:
1. Limit the chat history sent to the LLM to the most recent 4 exchanges (8 messages).
2. Use [vector search](./tool.md) to retrieve relevant exchanges beyond the last 4.
```python
class ChatWithMemory(Node):
def prep(self, s):
# Initialize shared dict
s.setdefault("history", [])
s.setdefault("memory_index", None)
user_input = input("You: ")
# Retrieve relevant past if we have enough history and an index
relevant = []
if len(s["history"]) > 8 and s["memory_index"]:
idx, _ = search_index(s["memory_index"], get_embedding(user_input), top_k=2)
relevant = [s["history"][i[0]] for i in idx]
return {"user_input": user_input, "recent": s["history"][-8:], "relevant": relevant}
def exec(self, c):
messages = [{"role": "system", "content": "You are a helpful assistant."}]
# Include relevant history if any
if c["relevant"]:
messages.append({"role": "system", "content": f"Relevant: {c['relevant']}"})
# Add recent history and the current user input
messages += c["recent"] + [{"role": "user", "content": c["user_input"]}]
return call_llm(messages)
def post(self, s, pre, ans):
# Update chat history
s["history"] += [
{"role": "user", "content": pre["user_input"]},
{"role": "assistant", "content": ans}
]
# When first reaching 8 messages, create index
if len(s["history"]) == 8:
embeddings = []
for i in range(0, 8, 2):
e = s["history"][i]["content"] + " " + s["history"][i+1]["content"]
embeddings.append(get_embedding(e))
s["memory_index"] = create_index(embeddings)
# Embed older exchanges once we exceed 8 messages
elif len(s["history"]) > 8:
pair = s["history"][-10:-8]
embedding = get_embedding(pair[0]["content"] + " " + pair[1]["content"])
s["memory_index"].add(np.array([embedding]).astype('float32'))
print(f"Assistant: {ans}")
return "continue"
chat = ChatWithMemory()
chat - "continue" >> chat
flow = Flow(start=chat)
flow.run({})
```
================================================
File: docs/multi_agent.md
================================================
---
layout: default
title: "(Advanced) Multi-Agents"
parent: "Paradigm"
nav_order: 7
---
# (Advanced) Multi-Agents
Multiple [Agents](./flow.md) can work together by handling subtasks and communicating their progress.
Communication between agents is typically implemented using message queues in shared storage.
### Example Agent Communication: Message Queue
Here's a simple example showing how to implement agent communication using `asyncio.Queue`.
The agent listens for messages, processes them, and continues listening:
```python
class AgentNode(AsyncNode):
async def prep_async(self, _):
message_queue = self.params["messages"]
message = await message_queue.get()
print(f"Agent received: {message}")
return message
# Create node and flow
agent = AgentNode()
agent >> agent # connect to self
flow = AsyncFlow(start=agent)
# Create heartbeat sender
async def send_system_messages(message_queue):
counter = 0
messages = [
"System status: all systems operational",
"Memory usage: normal",
"Network connectivity: stable",
"Processing load: optimal"
]
while True:
message = f"{messages[counter % len(messages)]} | timestamp_{counter}"
await message_queue.put(message)
counter += 1
await asyncio.sleep(1)
async def main():
message_queue = asyncio.Queue()
shared = {}
flow.set_params({"messages": message_queue})
# Run both coroutines
await asyncio.gather(
flow.run_async(shared),
send_system_messages(message_queue)
)
asyncio.run(main())
```
The output:
```
Agent received: System status: all systems operational | timestamp_0
Agent received: Memory usage: normal | timestamp_1
Agent received: Network connectivity: stable | timestamp_2
Agent received: Processing load: optimal | timestamp_3
```
### Interactive Multi-Agent Example: Taboo Game
Here's a more complex example where two agents play the word-guessing game Taboo.
One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word:
```python
class AsyncHinter(AsyncNode):
async def prep_async(self, shared):
guess = await shared["hinter_queue"].get()
if guess == "GAME_OVER":
return None
return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", [])
async def exec_async(self, inputs):
if inputs is None:
return None
target, forbidden, past_guesses = inputs
prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}"
if past_guesses:
prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific."
prompt += "\nUse at most 5 words."
hint = call_llm(prompt)
print(f"\nHinter: Here's your hint - {hint}")
return hint
async def post_async(self, shared, prep_res, exec_res):
if exec_res is None:
return "end"
await shared["guesser_queue"].put(exec_res)
return "continue"
class AsyncGuesser(AsyncNode):
async def prep_async(self, shared):
hint = await shared["guesser_queue"].get()
return hint, shared.get("past_guesses", [])
async def exec_async(self, inputs):
hint, past_guesses = inputs
prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. Directly reply a single word:"
guess = call_llm(prompt)
print(f"Guesser: I guess it's - {guess}")
return guess
async def post_async(self, shared, prep_res, exec_res):
if exec_res.lower() == shared["target_word"].lower():
print("Game Over - Correct guess!")
await shared["hinter_queue"].put("GAME_OVER")
return "end"
if "past_guesses" not in shared:
shared["past_guesses"] = []
shared["past_guesses"].append(exec_res)
await shared["hinter_queue"].put(exec_res)
return "continue"
async def main():
# Set up game
shared = {
"target_word": "nostalgia",
"forbidden_words": ["memory", "past", "remember", "feeling", "longing"],
"hinter_queue": asyncio.Queue(),
"guesser_queue": asyncio.Queue()
}
print("Game starting!")
print(f"Target word: {shared['target_word']}")
print(f"Forbidden words: {shared['forbidden_words']}")
# Initialize by sending empty guess to hinter
await shared["hinter_queue"].put("")
# Create nodes and flows
hinter = AsyncHinter()
guesser = AsyncGuesser()
# Set up flows
hinter_flow = AsyncFlow(start=hinter)
guesser_flow = AsyncFlow(start=guesser)
# Connect nodes to themselves
hinter - "continue" >> hinter
guesser - "continue" >> guesser
# Run both agents concurrently
await asyncio.gather(
hinter_flow.run_async(shared),
guesser_flow.run_async(shared)
)
asyncio.run(main())
```
The output:
```
Game starting!
Target word: nostalgia
Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing']
Hinter: Here's your hint - Thinking of childhood summer days
Guesser: I guess it's - popsicle
Hinter: Here's your hint - When childhood cartoons make you emotional
Guesser: I guess it's - nostalgic
Hinter: Here's your hint - When old songs move you
Guesser: I guess it's - memories
Hinter: Here's your hint - That warm emotion about childhood
Guesser: I guess it's - nostalgia
Game Over - Correct guess!
```
================================================
File: docs/node.md
================================================
---
layout: default
title: "Node"
parent: "Core Abstraction"
nav_order: 1
---
# Node
A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`:
1. `prep(shared)`
- **Read and preprocess data** from `shared` store.
- Examples: *query DB, read files, or serialize data into a string*.
- Return `prep_res`, which is used by `exec()` and `post()`.
2. `exec(prep_res)`
- **Execute compute logic**, with optional retries and error handling (below).
- Examples: *(mostly) LLM calls, remote APIs, tool use*.
   - ⚠️ This should be compute-only and must **NOT** access `shared`.
   - ⚠️ If retries are enabled, make sure the implementation is idempotent.
- Return `exec_res`, which is passed to `post()`.
3. `post(shared, prep_res, exec_res)`
- **Postprocess and write data** back to `shared`.
- Examples: *update DB, change states, log results*.
- **Decide the next action** by returning a *string* (`action = "default"` if *None*).
> **Why 3 steps?** To enforce the principle of *separation of concerns*: data storage and data processing are kept separate.
>
> All steps are *optional*. For example, you can implement only `prep` and `post` if you just need to move data around (see the sketch below).
{: .note }
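For instance, a minimal sketch of a node that only moves data around and skips `exec()` entirely (the key names are illustrative):
```python
class LoadConfig(Node):
    def prep(self, shared):
        # read raw config text from the shared store
        return shared["raw_config"]
    def post(self, shared, prep_res, exec_res):
        # exec() was not overridden, so we work with prep_res directly
        shared["config"] = prep_res.strip()
        return "default"
```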
### Fault Tolerance & Retries
You can **retry** `exec()` if it raises an exception via two parameters when you create the Node:
- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry).
- `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting).
`wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off.
```python
my_node = SummarizeFile(max_retries=3, wait=10)
```
When an exception occurs in `exec()`, the Node automatically retries until:
- It either succeeds, or
- The Node has retried `max_retries - 1` times already and fails on the last attempt.
You can read the current retry count (0-based) from `self.cur_retry`.
```python
class RetryNode(Node):
def exec(self, prep_res):
print(f"Retry {self.cur_retry} times")
raise Exception("Failed")
```
### Graceful Fallback
To **gracefully handle** the exception (after all retries) rather than raising it, override:
```python
def exec_fallback(self, shared, prep_res, exc):
raise exc
```
By default, it just re-raises the exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.
### Example: Summarize file
```python
class SummarizeFile(Node):
def prep(self, shared):
return shared["data"]
def exec(self, prep_res):
if not prep_res:
return "Empty file content"
prompt = f"Summarize this text in 10 words: {prep_res}"
summary = call_llm(prompt) # might fail
return summary
def exec_fallback(self, shared, prep_res, exc):
# Provide a simple fallback instead of crashing
return "There was an error processing your request."
def post(self, shared, prep_res, exec_res):
shared["summary"] = exec_res
# Return "default" by not returning
summarize_node = SummarizeFile(max_retries=3)
# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)
print("Action returned:", action_result) # "default"
print("Summary stored:", shared["summary"])
```
================================================
File: docs/paradigm.md
================================================
---
layout: default
title: "Paradigm"
nav_order: 4
has_children: true
---
================================================
File: docs/parallel.md
================================================
---
layout: default
title: "(Advanced) Parallel"
parent: "Core Abstraction"
nav_order: 6
---
# (Advanced) Parallel
**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute.
> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O.
{: .warning }
## AsyncParallelBatchNode
Like **AsyncBatchNode**, but it runs `exec_async()` in **parallel**:
```python
class ParallelSummaries(AsyncParallelBatchNode):
async def prep_async(self, shared):
# e.g., multiple texts
return shared["texts"]
async def exec_async(self, text):
prompt = f"Summarize: {text}"
return await call_llm_async(prompt)
async def post_async(self, shared, prep_res, exec_res_list):
shared["summary"] = "\n\n".join(exec_res_list)
return "default"
node = ParallelSummaries()
flow = AsyncFlow(start=node)
```
## AsyncParallelBatchFlow
Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters:
```python
class SummarizeMultipleFiles(AsyncParallelBatchFlow):
async def prep_async(self, shared):
return [{"filename": f} for f in shared["files"]]
sub_flow = AsyncFlow(start=LoadAndSummarizeFile())
parallel_flow = SummarizeMultipleFiles(start=sub_flow)
await parallel_flow.run_async(shared)
```
## Best Practices
- **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize.
- **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals); see the sketch after this list.
- **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits.
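As a sketch of the throttling idea (the limit of 5 and the wrapper name are arbitrary assumptions), an `asyncio.Semaphore` can cap the number of in-flight LLM calls:
```python
import asyncio

llm_semaphore = asyncio.Semaphore(5)  # at most 5 concurrent LLM calls (arbitrary limit)

async def call_llm_async_throttled(prompt):
    # acquire a slot before calling the async LLM wrapper used throughout these docs
    async with llm_semaphore:
        return await call_llm_async(prompt)
```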
================================================
File: docs/preparation.md
================================================
---
layout: default
title: "Details"
nav_order: 3
has_children: true
---
================================================
File: docs/rag.md
================================================
---
layout: default
title: "RAG"
parent: "Paradigm"
nav_order: 4
---
# RAG (Retrieval Augmented Generation)
For certain LLM tasks like answering questions, providing context is essential.
Use [vector search](./tool.md) to find relevant context for LLM responses.
### Example: Question Answering
```python
class PrepareEmbeddings(Node):
def prep(self, shared):
texts = shared["texts"]
embeddings = [get_embedding(text) for text in texts]
shared["search_index"] = create_index(embeddings)
class AnswerQuestion(Node):
def prep(self, shared):
question = input("Enter question: ")
query_embedding = get_embedding(question)
indices, _ = search_index(shared["search_index"], query_embedding, top_k=1)
relevant_text = shared["texts"][indices[0][0]]
return question, relevant_text
def exec(self, inputs):
question, context = inputs
prompt = f"Question: {question}\nContext: {context}\nAnswer: "
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
print(f"Answer: {exec_res}")
# Connect nodes
prep = PrepareEmbeddings()
qa = AnswerQuestion()
prep >> qa
# Create flow
qa_flow = Flow(start=prep)
qa_flow.run(shared)
```
================================================
File: docs/structure.md
================================================
---
layout: default
title: "Structured Output"
parent: "Paradigm"
nav_order: 1
---
# Structured Output
In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys.
There are several approaches to achieve a structured output:
- **Prompting** the LLM to strictly return a defined structure.
- Using LLMs that natively support **schema enforcement**.
- **Post-processing** the LLM's response to extract structured content.
In practice, **Prompting** is simple and reliable for modern LLMs.
### Example Use Cases
- Extracting Key Information
```yaml
product:
name: Widget Pro
price: 199.99
description: |
A high-quality widget designed for professionals.
Recommended for advanced users.
```
- Summarizing Documents into Bullet Points
```yaml
summary:
- This product is easy to use.
- It is cost-effective.
- Suitable for all skill levels.
```
- Generating Configuration Files
```yaml
server:
host: 127.0.0.1
port: 8080
ssl: true
```
## Prompt Engineering
When prompting the LLM to produce **structured** output:
1. **Wrap** the structure in code fences (e.g., `yaml`).
2. **Validate** that all required fields exist (and let the `Node`'s retry mechanism handle failures).
### Example Text Summarization
```python
class SummarizeNode(Node):
def exec(self, prep_res):
# Suppose `prep_res` is the text to summarize.
prompt = f"""
Please summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Now, output:
```yaml
summary:
- bullet 1
- bullet 2
- bullet 3
```"""
response = call_llm(prompt)
yaml_str = response.split("```yaml")[1].split("```")[0].strip()
import yaml
structured_result = yaml.safe_load(yaml_str)
assert "summary" in structured_result
assert isinstance(structured_result["summary"], list)
return structured_result
```
### Why YAML instead of JSON?
Current LLMs struggle with escaping. YAML is easier for strings because they rarely need quoting or escaping.
**In JSON**
```json
{
"dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\""
}
```
- Every double quote inside the string must be escaped with `\"`.
- Each newline in the dialogue must be represented as `\n`.
**In YAML**
```yaml
dialogue: |
Alice said: "Hello Bob.
How are you?
I am good."
```
- No need to escape interior quotes—just place the entire text under a block literal (`|`).
- Newlines are naturally preserved without needing `\n`.
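As a quick sanity check of the claim above (a sketch using PyYAML, which the structured-output examples already rely on), both snippets parse to the same Python string:
```python
import json
import yaml

yaml_text = '''
dialogue: |
  Alice said: "Hello Bob.
  How are you?
  I am good."
'''
json_text = '{"dialogue": "Alice said: \\"Hello Bob.\\nHow are you?\\nI am good.\\""}'

# Same content, but the YAML version needed no escaping
assert yaml.safe_load(yaml_text)["dialogue"].rstrip("\n") == json.loads(json_text)["dialogue"]
```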
================================================
File: docs/tool.md
================================================
---
layout: default
title: "Tool"
parent: "Details"
nav_order: 2
---
# Tool
Similar to LLM wrappers, we **don't** provide built-in tools. Here, we recommend some *minimal* (and incomplete) implementations of commonly used tools. These examples can serve as a starting point for your own tooling.
---
## 1. Embedding Calls
```python
def get_embedding(text):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.embeddings.create(
model="text-embedding-ada-002",
input=text
)
return r.data[0].embedding
get_embedding("What's the meaning of life?")
```
---
## 2. Vector Database (Faiss)
```python
import faiss
import numpy as np
def create_index(embeddings):
dim = len(embeddings[0])
index = faiss.IndexFlatL2(dim)
index.add(np.array(embeddings).astype('float32'))
return index
def search_index(index, query_embedding, top_k=5):
D, I = index.search(
np.array([query_embedding]).astype('float32'),
top_k
)
return I, D
index = create_index(embeddings)
search_index(index, query_embedding)
```
---
## 3. Local Database
```python
import sqlite3
def execute_sql(query):
conn = sqlite3.connect("mydb.db")
cursor = conn.cursor()
cursor.execute(query)
result = cursor.fetchall()
conn.commit()
conn.close()
return result
```
> ⚠️ Beware of SQL injection risk; prefer parameterized queries for untrusted input (see the sketch below).
{: .warning }
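To reduce that risk, here is a sketch using parameterized queries (the table and column names are illustrative):
```python
import sqlite3

def execute_sql_safe(query, params=()):
    conn = sqlite3.connect("mydb.db")
    cursor = conn.cursor()
    cursor.execute(query, params)  # "?" placeholders keep values out of the SQL string
    result = cursor.fetchall()
    conn.commit()
    conn.close()
    return result

# Usage: user input is bound as a parameter rather than formatted into the query
execute_sql_safe("SELECT * FROM users WHERE name = ?", ("alice",))
```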
---
## 4. Python Function Execution
```python
def run_code(code_str):
env = {}
exec(code_str, env)
return env
run_code("print('Hello, world!')")
```
> ⚠️ exec() is dangerous with untrusted input
{: .warning }
---
## 5. PDF Extraction
If your PDFs are text-based, use PyMuPDF:
```python
import fitz # PyMuPDF
def extract_text(pdf_path):
doc = fitz.open(pdf_path)
text = ""
for page in doc:
text += page.get_text()
doc.close()
return text
extract_text("document.pdf")
```
For image-based PDFs (e.g., scanned), OCR is needed. An easy and fast option is to use an LLM with vision capabilities:
```python
from openai import OpenAI
import base64
def call_llm_vision(prompt, image_data):
client = OpenAI(api_key="YOUR_API_KEY_HERE")
img_base64 = base64.b64encode(image_data).decode('utf-8')
response = client.chat.completions.create(
model="gpt-4o",
messages=[{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{img_base64}"}}
]
}]
)
return response.choices[0].message.content
pdf_document = fitz.open("document.pdf")
page_num = 0
page = pdf_document[page_num]
pix = page.get_pixmap()
img_data = pix.tobytes("png")
call_llm_vision("Extract text from this image", img_data)
```
---
## 6. Web Crawling
```python
def crawl_web(url):
import requests
from bs4 import BeautifulSoup
html = requests.get(url).text
soup = BeautifulSoup(html, "html.parser")
return soup.title.string, soup.get_text()
```
---
## 7. Basic Search (SerpAPI example)
```python
def search_google(query):
import requests
params = {
"engine": "google",
"q": query,
"api_key": "YOUR_API_KEY"
}
r = requests.get("https://serpapi.com/search", params=params)
return r.json()
```
---
## 8. Audio Transcription (OpenAI Whisper)
```python
def transcribe_audio(file_path):
import openai
audio_file = open(file_path, "rb")
transcript = openai.Audio.transcribe("whisper-1", audio_file)
return transcript["text"]
```
---
## 9. Text-to-Speech (TTS)
```python
def text_to_speech(text):
import pyttsx3
engine = pyttsx3.init()
engine.say(text)
engine.runAndWait()
```
---
## 10. Sending Email
```python
def send_email(to_address, subject, body, from_address, password):
import smtplib
from email.mime.text import MIMEText
msg = MIMEText(body)
msg["Subject"] = subject
msg["From"] = from_address
msg["To"] = to_address
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(from_address, password)
server.sendmail(from_address, [to_address], msg.as_string())
```
================================================
File: docs/viz.md
================================================
---
layout: default
title: "Viz and Debug"
parent: "Details"
nav_order: 3
---
# Visualization and Debugging
Similar to LLM wrappers, we **don't** provide built-in visualization and debugging. Here, we recommend some *minimal* (and incomplete) implementations. These examples can serve as a starting point for your own tooling.
## 1. Visualization with Mermaid
This code recursively traverses the nested graph, assigns unique IDs to each node, and treats Flow nodes as subgraphs to generate Mermaid syntax for a hierarchical visualization.
{% raw %}
```python
def build_mermaid(start):
ids, visited, lines = {}, set(), ["graph LR"]
ctr = 1
def get_id(n):
nonlocal ctr
return ids[n] if n in ids else (ids.setdefault(n, f"N{ctr}"), (ctr := ctr + 1))[0]
def link(a, b):
lines.append(f" {a} --> {b}")
def walk(node, parent=None):
if node in visited:
return parent and link(parent, get_id(node))
visited.add(node)
if isinstance(node, Flow):
node.start and parent and link(parent, get_id(node.start))
lines.append(f"\n subgraph sub_flow_{get_id(node)}[{type(node).__name__}]")
node.start and walk(node.start)
for nxt in node.successors.values():
node.start and walk(nxt, get_id(node.start)) or (parent and link(parent, get_id(nxt))) or walk(nxt)
lines.append(" end\n")
else:
lines.append(f" {(nid := get_id(node))}['{type(node).__name__}']")
parent and link(parent, nid)
[walk(nxt, nid) for nxt in node.successors.values()]
walk(start)
return "\n".join(lines)
```
{% endraw %}
For example, suppose we have a complex Flow for data science:
```python
class DataPrepBatchNode(BatchNode):
def prep(self,shared): return []
class ValidateDataNode(Node): pass
class FeatureExtractionNode(Node): pass
class TrainModelNode(Node): pass
class EvaluateModelNode(Node): pass
class ModelFlow(Flow): pass
class DataScienceFlow(Flow):pass
feature_node = FeatureExtractionNode()
train_node = TrainModelNode()
evaluate_node = EvaluateModelNode()
feature_node >> train_node >> evaluate_node
model_flow = ModelFlow(start=feature_node)
data_prep_node = DataPrepBatchNode()
validate_node = ValidateDataNode()
data_prep_node >> validate_node >> model_flow
data_science_flow = DataScienceFlow(start=data_prep_node)
result = build_mermaid(start=data_science_flow)
```
The code generates a Mermaid diagram:
```mermaid
graph LR
subgraph sub_flow_N1[DataScienceFlow]
N2['DataPrepBatchNode']
N3['ValidateDataNode']
N2 --> N3
N3 --> N4
subgraph sub_flow_N5[ModelFlow]
N4['FeatureExtractionNode']
N6['TrainModelNode']
N4 --> N6
N7['EvaluateModelNode']
N6 --> N7
end
end
```
## 2. Call Stack Debugging
It would be useful to print the Node call stacks for debugging. This can be achieved by inspecting the runtime call stack:
```python
import inspect
def get_node_call_stack():
stack = inspect.stack()
node_names = []
seen_ids = set()
for frame_info in stack[1:]:
local_vars = frame_info.frame.f_locals
if 'self' in local_vars:
caller_self = local_vars['self']
if isinstance(caller_self, BaseNode) and id(caller_self) not in seen_ids:
seen_ids.add(id(caller_self))
node_names.append(type(caller_self).__name__)
return node_names
```
For example, suppose we have a complex Flow for data science:
```python
class DataPrepBatchNode(BatchNode):
def prep(self, shared): return []
class ValidateDataNode(Node): pass
class FeatureExtractionNode(Node): pass
class TrainModelNode(Node): pass
class EvaluateModelNode(Node):
def prep(self, shared):
stack = get_node_call_stack()
print("Call stack:", stack)
class ModelFlow(Flow): pass
class DataScienceFlow(Flow):pass
feature_node = FeatureExtractionNode()
train_node = TrainModelNode()
evaluate_node = EvaluateModelNode()
feature_node >> train_node >> evaluate_node
model_flow = ModelFlow(start=feature_node)
data_prep_node = DataPrepBatchNode()
validate_node = ValidateDataNode()
data_prep_node >> validate_node >> model_flow
data_science_flow = DataScienceFlow(start=data_prep_node)
data_science_flow.run({})
```
The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']`