diff --git a/.cursorrules b/.cursorrules index 0aba9a3..f260627 100644 --- a/.cursorrules +++ b/.cursorrules @@ -1,110 +1,169 @@ ================================================ -File: docs/agent.md +File: docs/guide.md ================================================ --- layout: default -title: "Agent" -parent: "Design Pattern" -nav_order: 6 +title: "Build your LLM App" --- -# Agent +# LLM Application Development Playbook -Agent is a powerful design pattern, where node can take dynamic actions based on the context it receives. -To express an agent, create a Node (the agent) with [branching](./flow.md) to other nodes (Actions). +> If you are an AI assistant involved in building LLM Apps, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification. +{: .warning } -> The core of build **performant** and **reliable** agents boils down to: -> -> 1. **Context Management:** Provide *clear, relevant context* so agents can understand the problem.E.g., Rather than dumping an entire chat history or entire files, use a [Workflow](./decomp.md) that filters out and includes only the most relevant information. -> -> 2. **Action Space:** Define *a well-structured, unambiguous, and easy-to-use* set of actions. For instance, avoid creating overlapping actions like `read_databases` and `read_csvs`. Instead, unify data sources (e.g., move CSVs into a database) and design a single action. The action can be parameterized (e.g., string for search) or programmable (e.g., SQL queries). -{: .best-practice } +## System Design Steps -### Example: Search Agent +These system designs should be a collaboration between humans and AI assistants: -This agent: -1. Decides whether to search or answer -2. If searches, loops back to decide if more search needed -3. Answers when enough context gathered +| Stage | Human | AI | Comment | +|:-----------------------|:----------:|:---------:|:------------------------------------------------------------------------| +| 1. Project Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context best. | +| 2. Utility Functions | ★★☆ Medium | ★★☆ Medium | The human is familiar with external APIs and integrations, and the AI assists with implementation. | +| 3. Flow Design | ★★☆ Medium | ★★☆ Medium | The human identifies complex and ambiguous parts, and the AI helps with redesign. | +| 4. Data Schema | ★☆☆ Low | ★★★ High | The AI assists in designing the data schema based on the flow. | +| 5. Implementation | ★☆☆ Low | ★★★ High | The human identifies complex and ambiguous parts, and the AI helps with redesign. | +| 6. Optimization | ★★☆ Medium | ★★☆ Medium | The human reviews the code and evaluates the results, while the AI helps optimize. | +| 7. Reliability | ★☆☆ Low | ★★★ High | The AI helps write test cases and address corner cases. 
| -```python -class DecideAction(Node): - def prep(self, shared): - context = shared.get("context", "No previous search") - query = shared["query"] - return query, context - - def exec(self, inputs): - query, context = inputs - prompt = f""" -Given input: {query} -Previous search results: {context} -Should I: 1) Search web for more info 2) Answer with current knowledge -Output in yaml: -```yaml -action: search/answer -reason: why this action -search_term: search phrase if action is search -```""" - resp = call_llm(prompt) - yaml_str = resp.split("```yaml")[1].split("```")[0].strip() - result = yaml.safe_load(yaml_str) - - assert isinstance(result, dict) - assert "action" in result - assert "reason" in result - assert result["action"] in ["search", "answer"] - if result["action"] == "search": - assert "search_term" in result - - return result +1. **Project Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit. An AI systems are: + - suitable for routine tasks that require common sense (e.g., filling out forms, replying to emails). + - suitable for creative tasks where all inputs are provided (e.g., building slides, writing SQL). + - **NOT** suitable for tasks that are highly ambiguous and require complex information (e.g., building a startup). + - > **If a human can’t solve it, an LLM can’t automate it!** Before building an LLM system, thoroughly understand the problem by manually solving example inputs to develop intuition. + {: .best-practice } - def post(self, shared, prep_res, exec_res): - if exec_res["action"] == "search": - shared["search_term"] = exec_res["search_term"] - return exec_res["action"] +2. **Utility Functions**: AI system is the decision-maker and relies on *external utility functions* to: -class SearchWeb(Node): - def prep(self, shared): - return shared["search_term"] - - def exec(self, search_term): - return search_web(search_term) - - def post(self, shared, prep_res, exec_res): - prev_searches = shared.get("context", []) - shared["context"] = prev_searches + [ - {"term": shared["search_term"], "result": exec_res} - ] - return "decide" - -class DirectAnswer(Node): - def prep(self, shared): - return shared["query"], shared.get("context", "") - - def exec(self, inputs): - query, context = inputs - return call_llm(f"Context: {context}\nAnswer: {query}") +
- def post(self, shared, prep_res, exec_res): - print(f"Answer: {exec_res}") - shared["answer"] = exec_res + - Read inputs (e.g., retrieving Slack messages, reading emails) + - Write outputs (e.g., generating reports, sending emails) + - Use external tools (e.g., calling LLMs, searching the web) + - In contrast, *LLM-based tasks* (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions. Instead, they are *internal core functions* within the AI system—designed in step 3—and are built on top of the utility functions. + - > **Start small!** Only include the most important ones to begin with! + {: .best-practice } -# Connect nodes -decide = DecideAction() -search = SearchWeb() -answer = DirectAnswer() +3. **Flow Design (Compute)**: Create a high-level outline for your application’s flow. + - Identify potential design patterns (e.g., Batch, Agent, RAG). + - For each node, specify: + - **Purpose**: The high-level compute logic + - **Type**: Regular node, Batch node, async node, or another type + - `exec`: The specific utility function to call (ideally, one function per node) -decide - "search" >> search -decide - "answer" >> answer -search - "decide" >> decide # Loop back +4. **Data Schema (Data)**: Plan how data will be stored and updated. + - For simple apps, use an in-memory dictionary. + - For more complex apps or when persistence is required, use a database. + - For each node, specify: + - `prep`: How the node reads data + - `post`: How the node writes data -flow = Flow(start=decide) -flow.run({"query": "Who won the Nobel Prize in Physics 2024?"}) +5. **Implementation**: Implement nodes and flows based on the design. + - Start with a simple, direct approach (avoid over-engineering and full-scale type checking or testing). Let it fail fast to identify weaknesses. + - Add logging throughout the code to facilitate debugging. + +6. **Optimization**: + - **Use Intuition**: For a quick initial evaluation, human intuition is often a good start. + - **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts. + - If your flow design is already solid, move on to micro-optimizations: + - **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity. + - **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone. + + - > **You’ll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times. + > + >
+ {: .best-practice } + +7. **Reliability** + - **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times. + - **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging. + - **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain. + +## Example LLM Project File Structure + +``` +my_project/ +├── main.py +├── flow.py +├── utils/ +│ ├── __init__.py +│ ├── call_llm.py +│ └── search_web.py +├── requirements.txt +└── docs/ + └── design.md ``` +- **`docs/design.md`**: Contains project documentation for each step above. This should be high-level and no-code. +- **`utils/`**: Contains all utility functions. + - It’s recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`. + - Each file should also include a `main()` function to try that API call +- **`flow.py`**: Implements the application’s flow, starting with node definitions followed by the overall structure. +- **`main.py`**: Serves as the project’s entry point. + ================================================ -File: docs/async.md +File: docs/index.md +================================================ +--- +layout: default +title: "Home" +nav_order: 1 +--- + +# Pocket Flow + +A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*. + +We model the LLM workflow as a **Graph + Shared Store**: +- **Nodes** handle simple (LLM) tasks. +- Nodes connect through **Actions** (labeled edges) for *Agents*. +- **Flows** orchestrate a directed graph of Nodes for *Task Decomposition*. +- Nodes within Flows communicate through a **Shared Store**. +- A Flow can be used as a Node (for **Nesting**). +- **Batch** Nodes/Flows for data-intensive tasks. +- **Async** Nodes/Flows allow waits for asynchronous tasks. + +
+ +
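To make the Graph + Shared Store model above concrete, here is a minimal sketch assuming the `Node`/`Flow` API described in the Core Abstraction pages below (the node names and the `call_llm` stub are illustrative, not part of the framework):

```python
from pocketflow import Node, Flow

def call_llm(prompt):
    # Placeholder for a user-provided LLM wrapper (see utility_function/llm.md)
    return f"(LLM reply to: {prompt})"

class LoadQuestion(Node):
    def prep(self, shared):
        return shared["question"]              # read from the Shared Store
    def post(self, shared, prep_res, exec_res):
        shared["prompt"] = f"Answer briefly: {prep_res}"
        # returning nothing means the "default" Action

class AnswerQuestion(Node):
    def prep(self, shared):
        return shared["prompt"]
    def exec(self, prep_res):
        return call_llm(prep_res)              # compute only, no access to shared
    def post(self, shared, prep_res, exec_res):
        shared["answer"] = exec_res

load, answer = LoadQuestion(), AnswerQuestion()
load >> answer                                 # default Action edge

flow = Flow(start=load)                        # a Flow orchestrates the graph
shared = {"question": "What is a Node?"}
flow.run(shared)
print(shared["answer"])
```

The `>>` operator wires the default Action edge; labeled edges (`node - "action" >> other`) and nesting work the same way, as shown in the Flow and Agent pages.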
+ +> If you are an AI assistant reading this document, please read all the non-optional and non-advanced parts. If you are helping a human build an LLM app, please carefully review the [Development Playbook](./guide.md). +{: .warning } + +## Core Abstraction + +- [Node](./core_abstraction/node.md) +- [Flow](./core_abstraction/flow.md) +- [Communication](./core_abstraction/communication.md) +- [Batch](./core_abstraction/batch.md) +- [(Advanced) Async](./core_abstraction/async.md) +- [(Advanced) Parallel](./core_abstraction/parallel.md) + +## Utility Function + +- [LLM Wrapper](./utility_function/llm.md) +- [Tool](./utility_function/tool.md) +- [(Optional) Viz and Debug](./utility_function/viz.md) +- Chunking + +> We do not provide built-in utility functions. Example implementations are provided as reference. +{: .warning } + + +## Design Pattern + +- [Structured Output](./design_pattern/structure.md) +- [Workflow](./design_pattern/workflow.md) +- [Map Reduce](./design_pattern/mapreduce.md) +- [RAG](./design_pattern/rag.md) +- [Agent](./design_pattern/agent.md) +- [(Optional) Chat Memory](./design_pattern/memory.md) +- [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) +- Evaluation + +## [Develop your LLM Apps](./guide.md) + +================================================ +File: docs/core_abstraction/async.md ================================================ --- layout: default @@ -118,12 +177,9 @@ nav_order: 5 **Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for: 1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way. - 2. **exec_async()**: Typically used for async LLM calls. - 3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`. - **Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes. ### Example @@ -166,7 +222,7 @@ asyncio.run(main()) ``` ================================================ -File: docs/batch.md +File: docs/core_abstraction/batch.md ================================================ --- layout: default @@ -277,7 +333,7 @@ outer_flow = DirectoryBatchFlow(start=inner_flow) ``` ================================================ -File: docs/communication.md +File: docs/core_abstraction/communication.md ================================================ --- layout: default @@ -296,7 +352,6 @@ Nodes and Flows **communicate** in two ways: - Great for data results, large content, or anything multiple nodes need. - You shall design the data structure and populate it ahead. - 2. **Params (only for [Batch](./batch.md))** - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**. - Good for identifiers like filenames or numeric IDs, in Batch mode. @@ -362,11 +417,10 @@ Here: ## 2. Params **Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are: -- **Immutable** during a Node’s run cycle (i.e., they don’t change mid-`prep->exec->post`). +- **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`). - **Set** via `set_params()`. - **Cleared** and updated each time a parent Flow calls it. - > Only set the uppermost Flow params because others will be overwritten by the parent Flow. > > If you need to set child node params, see [Batch](./batch.md). 
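The original example in this hunk is partially elided, so here is a rough sketch of the pattern it describes, assuming a hypothetical `SummarizeDoc` node and a stubbed `call_llm`: params carry a small, immutable identifier, while bulk data stays in the shared store.

```python
from pocketflow import Node

def call_llm(prompt):
    # Placeholder for a user-provided LLM wrapper
    return f"(summary of: {prompt[:40]}...)"

class SummarizeDoc(Node):
    def prep(self, shared):
        # params hold an identifier; the shared store holds the data
        return shared["data"][self.params["filename"]]

    def exec(self, content):
        return call_llm(f"Summarize this text in 10 words: {content}")

    def post(self, shared, prep_res, exec_res):
        shared["summary"][self.params["filename"]] = exec_res

shared = {"data": {"doc1.txt": "some long text..."}, "summary": {}}
node = SummarizeDoc()
node.set_params({"filename": "doc1.txt"})
node.run(shared)
print(shared["summary"]["doc1.txt"])
```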
@@ -408,217 +462,8 @@ flow.set_params({"filename": "doc2.txt"}) flow.run(shared) # The node summarizes doc2, not doc1 ``` ---- - ================================================ -File: docs/decomp.md -================================================ ---- -layout: default -title: "Workflow" -parent: "Design Pattern" -nav_order: 2 ---- - -# Workflow - -Many real-world tasks are too complex for one LLM call. The solution is to decompose them into a [chain](./flow.md) of multiple Nodes. - - -> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*. -> - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*. -> -> You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](./agent.md). -{: .best-practice } - -### Example: Article Writing - -```python -class GenerateOutline(Node): - def prep(self, shared): return shared["topic"] - def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}") - def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res - -class WriteSection(Node): - def prep(self, shared): return shared["outline"] - def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}") - def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res - -class ReviewAndRefine(Node): - def prep(self, shared): return shared["draft"] - def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}") - def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res - -# Connect nodes -outline = GenerateOutline() -write = WriteSection() -review = ReviewAndRefine() - -outline >> write >> review - -# Create and run flow -writing_flow = Flow(start=outline) -shared = {"topic": "AI Safety"} -writing_flow.run(shared) -``` - - -================================================ -File: docs/essay.md -================================================ ---- -layout: default -title: "Essay" -parent: "Apps" -nav_order: 2 ---- - -# Summarization + QA agent for Paul Graham Essay - -```python -from pocketflow import * -import openai, os, yaml - -# Minimal LLM wrapper -def call_llm(prompt): - openai.api_key = "YOUR_API_KEY_HERE" - r = openai.ChatCompletion.create( - model="gpt-4o", - messages=[{"role": "user", "content": prompt}] - ) - return r.choices[0].message.content - -shared = {"data": {}, "summary": {}} - -# Load data into shared['data'] -class LoadData(Node): - def prep(self, shared): - path = "./PocketFlow/data/PaulGrahamEssaysLarge" - for fn in os.listdir(path): - with open(os.path.join(path, fn), 'r') as f: - shared['data'][fn] = f.read() - def exec(self, res): pass - def post(self, s, pr, er): pass - -LoadData().run(shared) - -# Summarize one file -class SummarizeFile(Node): - def prep(self, s): return s['data'][self.params['filename']] - def exec(self, content): return call_llm(f"{content} Summarize in 10 words.") - def post(self, s, pr, sr): s["summary"][self.params['filename']] = sr - -node_summ = SummarizeFile() -node_summ.set_params({"filename":"addiction.txt"}) -node_summ.run(shared) - -# Map-Reduce summarization -class MapSummaries(BatchNode): - def prep(self, s): - text = s['data'][self.params['filename']] - return [text[i:i+10000] for i in range(0, len(text), 10000)] - def exec(self, chunk): - return call_llm(f"{chunk} Summarize in 10 words.") - def post(self, s, pr, 
er): - s["summary"][self.params['filename']] = [f"{i}. {r}" for i,r in enumerate(er)] - -class ReduceSummaries(Node): - def prep(self, s): return s["summary"][self.params['filename']] - def exec(self, chunks): return call_llm(f"{chunks} Combine into 10 words summary.") - def post(self, s, pr, sr): s["summary"][self.params['filename']] = sr - -map_summ = MapSummaries() -reduce_summ = ReduceSummaries() -map_summ >> reduce_summ - -flow = Flow(start=map_summ) -flow.set_params({"filename":"before.txt"}) -flow.run(shared) - -# Summarize all files -class SummarizeAllFiles(BatchFlow): - def prep(self, s): return [{"filename":fn} for fn in s['data']] - -SummarizeAllFiles(start=flow).run(shared) - -# QA agent -class FindRelevantFile(Node): - def prep(self, s): - q = input("Enter a question: ") - filenames = list(s['summary'].keys()) - file_summaries = [f"- '{fn}': {s['summary'][fn]}" for fn in filenames] - return q, filenames, file_summaries - - def exec(self, p): - q, filenames, file_summaries = p - if not q: - return {"think":"no question", "has_relevant":False} - - resp = call_llm(f""" -Question: {q} -Find the most relevant file from: {file_summaries} -If none, explain why - -Output in code fence: -```yaml -think: > - reasoning about relevance -has_relevant: true/false -most_relevant: filename if relevant -```""") - yaml_str = resp.split("```yaml")[1].split("```")[0].strip() - result = yaml.safe_load(yaml_str) - - # Validate response - assert isinstance(result, dict) - assert "think" in result - assert "has_relevant" in result - assert isinstance(result["has_relevant"], bool) - - if result["has_relevant"]: - assert "most_relevant" in result - assert result["most_relevant"] in filenames - - return result - - def exec_fallback(self, p, exc): return {"think":"error","has_relevant":False} - def post(self, s, pr, res): - q, _ = pr - if not q: - print("No question asked"); return "end" - if res["has_relevant"]: - s["question"], s["relevant_file"] = q, res["most_relevant"] - print("Relevant file:", res["most_relevant"]) - return "answer" - else: - print("No relevant file:", res["think"]) - return "retry" - -class AnswerQuestion(Node): - def prep(self, s): - return s['question'], s['data'][s['relevant_file']] - def exec(self, p): - q, txt = p - return call_llm(f"Question: {q}\nText: {txt}\nAnswer in 50 words.") - def post(self, s, pr, ex): - print("Answer:", ex) - -class NoOp(Node): pass - -frf = FindRelevantFile(max_retries=3) -aq = AnswerQuestion() -noop = NoOp() - -frf - "answer" >> aq >> frf -frf - "retry" >> frf -frf - "end" >> noop - -qa = Flow(start=frf) -qa.run(shared) -``` - -================================================ -File: docs/flow.md +File: docs/core_abstraction/flow.md ================================================ --- layout: default @@ -708,7 +553,6 @@ flowchart TD - `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action. - `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue. - > `node.run(shared)` **does not** proceed to the successor. > This is mainly for debugging or testing a single node. > @@ -730,7 +574,6 @@ A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However: - It **won't** run `exec()`, as its main logic is to orchestrate its nodes. - `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store. 
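As a minimal sketch of that last point (the `Answer`/`QAFlow` names are illustrative), a Flow subclass can override `post()` and read results from the shared store, since its `exec_res` is always `None`:

```python
from pocketflow import Node, Flow

class Answer(Node):
    def exec(self, prep_res):
        return "42"
    def post(self, shared, prep_res, exec_res):
        shared["answer"] = exec_res

class QAFlow(Flow):
    def post(self, shared, prep_res, exec_res):
        # exec_res is always None for a Flow; results come from the shared store
        print("Flow finished, answer:", shared.get("answer"))

shared = {}
QAFlow(start=Answer()).run(shared)
```

Overriding `exec()` on a Flow has no effect, since a Flow's job is orchestration, not compute.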
- ### Basic Flow Nesting Here's how to connect a flow to another node: @@ -801,258 +644,282 @@ flowchart LR end ``` - - ================================================ -File: docs/guide.md +File: docs/core_abstraction/node.md ================================================ --- layout: default -title: "Development Playbook" -parent: "Apps" +title: "Node" +parent: "Core Abstraction" nav_order: 1 --- -# LLM Application Development Playbook - -## System Design Steps - -1. **Project Requirements**: Clearify the requirements for your project. - -2. **Utility Functions**: Although the system acts as the main decision-maker, it depends on utility functions for routine tasks and real-world interactions: - - `call_llm` (of course) - - Routine tasks (e.g., chunking text, formatting strings) - - External inputs (e.g., searching the web, reading emails) - - Output generation (e.g., producing reports, sending emails) - - - > **If a human can’t solve it, an LLM can’t automate it!** Before building an LLM system, thoroughly understand the problem by manually solving example inputs to develop intuition. - {: .best-practice } - -3. **Flow Design (Compute)**: Create a high-level design for the application’s flow. - - Identify potential design patterns, such as Batch, Agent, or RAG. - - For each node, specify: - - **Purpose**: The high-level compute logic - - `exec`: The specific utility function to call (ideally, one function per node) - -4. **Data Schema (Data)**: Plan how data will be stored and updated. - - For simple apps, use an in-memory dictionary. - - For more complex apps or when persistence is required, use a database. - - For each node, specify: - - `prep`: How the node reads data - - `post`: How the node writes data - -5. **Implementation**: Implement nodes and flows based on the design. - - Start with a simple, direct approach (avoid over-engineering and full-scale type checking or testing). Let it fail fast to identify weaknesses. - - Add logging throughout the code to facilitate debugging. - -6. **Optimization**: - - **Use Intuition**: For a quick initial evaluation, human intuition is often a good start. - - **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts. - - If your flow design is already solid, move on to micro-optimizations: - - **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity. - - **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone. - - - > **You’ll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times. - > - >
- {: .best-practice } - -7. **Reliability** - - **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times. - - **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging. - - **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain. - -## Example LLM Project File Structure - -``` -my_project/ -├── main.py -├── flow.py -├── utils/ -│ ├── __init__.py -│ ├── call_llm.py -│ └── search_web.py -├── requirements.txt -└── docs/ - └── design.md -``` - -- **`docs/design.md`**: Contains project documentation and the details of each step above. -- **`utils/`**: Contains all utility functions. - - It’s recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`. - - Each file should also include a `main()` function to try that API call -- **`flow.py`**: Implements the application’s flow, starting with node definitions followed by the overall structure. -- **`main.py`**: Serves as the project’s entry point. - -================================================ -File: docs/index.md -================================================ ---- -layout: default -title: "Home" -nav_order: 1 ---- - -# Pocket Flow - -A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*. - - -We model the LLM workflow as a **Nested Directed Graph**: -- **Nodes** handle simple (LLM) tasks. -- Nodes connect through **Actions** (labeled edges) for *Agents*. -- **Flows** orchestrate a directed graph of Nodes for *Task Decomposition*. -- A Flow can be used as a Node (for **Nesting**). -- **Batch** Nodes/Flows for data-intensive tasks. -- **Async** Nodes/Flows allow waits or **Parallel** execution +# Node +A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`:
- +
+1. `prep(shared)` + - **Read and preprocess data** from `shared` store. + - Examples: *query DB, read files, or serialize data into a string*. + - Return `prep_res`, which is used by `exec()` and `post()`. +2. `exec(prep_res)` + - **Execute compute logic**, with optional retries and error handling (below). + - Examples: *(mostly) LLM calls, remote APIs, tool use*. + - ⚠️ This shall be only for compute and **NOT** access `shared`. + - ⚠️ If retries enabled, ensure idempotent implementation. + - Return `exec_res`, which is passed to `post()`. -> Have questions? Chat with [AI Assistant](https://chatgpt.com/g/g-677464af36588191b9eba4901946557b-mini-llm-flow-assistant) +3. `post(shared, prep_res, exec_res)` + - **Postprocess and write data** back to `shared`. + - Examples: *update DB, change states, log results*. + - **Decide the next action** by returning a *string* (`action = "default"` if *None*). + +> **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately. +> +> All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data. {: .note } +### Fault Tolerance & Retries -## Core Abstraction +You can **retry** `exec()` if it raises an exception via two parameters when define the Node: -- [Node](./node.md) -- [Flow](./flow.md) -- [Communication](./communication.md) -- [Batch](./batch.md) -- [(Advanced) Async](./async.md) -- [(Advanced) Parallel](./parallel.md) +- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry). +- `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting). +`wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off. -## Utility Function +```python +my_node = SummarizeFile(max_retries=3, wait=10) +``` -- [LLM Wrapper](./llm.md) -- [Tool](./tool.md) -- [Viz and Debug](./viz.md) -- Chunking +When an exception occurs in `exec()`, the Node automatically retries until: -> We do not provide built-in utility functions. Example implementations are provided as reference. -{: .warning } +- It either succeeds, or +- The Node has retried `max_retries - 1` times already and fails on the last attempt. +You can get the current retry times (0-based) from `self.cur_retry`. -## Design Pattern +```python +class RetryNode(Node): + def exec(self, prep_res): + print(f"Retry {self.cur_retry} times") + raise Exception("Failed") +``` -- [Structured Output](./structure.md) -- [Workflow](./decomp.md) -- [Map Reduce](./mapreduce.md) -- [RAG](./rag.md) -- [Chat Memory](./memory.md) -- [Agent](./agent.md) -- [(Advanced) Multi-Agents](./multi_agent.md) -- Evaluation +### Graceful Fallback -## [LLM Application Development Playbook](./guide.md) +To **gracefully handle** the exception (after all retries) rather than raising it, override: + +```python +def exec_fallback(self, shared, prep_res, exc): + raise exc +``` + +By default, it just re-raises exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`. 
+ +### Example: Summarize file + +```python +class SummarizeFile(Node): + def prep(self, shared): + return shared["data"] + + def exec(self, prep_res): + if not prep_res: + return "Empty file content" + prompt = f"Summarize this text in 10 words: {prep_res}" + summary = call_llm(prompt) # might fail + return summary + + def exec_fallback(self, shared, prep_res, exc): + # Provide a simple fallback instead of crashing + return "There was an error processing your request." + + def post(self, shared, prep_res, exec_res): + shared["summary"] = exec_res + # Return "default" by not returning + +summarize_node = SummarizeFile(max_retries=3) + +# node.run() calls prep->exec->post +# If exec() fails, it retries up to 3 times before calling exec_fallback() +action_result = summarize_node.run(shared) + +print("Action returned:", action_result) # "default" +print("Summary stored:", shared["summary"]) +``` ================================================ -File: docs/llm.md +File: docs/core_abstraction/parallel.md ================================================ --- layout: default -title: "LLM Wrapper" -parent: "Utility Function" -nav_order: 1 +title: "(Advanced) Parallel" +parent: "Core Abstraction" +nav_order: 6 --- -# LLM Wrappers +# (Advanced) Parallel -We **don't** provide built-in LLM wrappers. Instead, please implement your own, for example by asking an assistant like ChatGPT or Claude. If you ask ChatGPT to "implement a `call_llm` function that takes a prompt and returns the LLM response," you shall get something like: +**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute. -```python -def call_llm(prompt): - from openai import OpenAI - client = OpenAI(api_key="YOUR_API_KEY_HERE") - r = client.chat.completions.create( - model="gpt-4o", - messages=[{"role": "user", "content": prompt}] - ) - return r.choices[0].message.content - -# Example usage -call_llm("How are you?") -``` - -> Store the API key in an environment variable like OPENAI_API_KEY for security. -{: .note } - -## Improvements -Feel free to enhance your `call_llm` function as needed. Here are examples: - -- Handle chat history: - -```python -def call_llm(messages): - from openai import OpenAI - client = OpenAI(api_key="YOUR_API_KEY_HERE") - r = client.chat.completions.create( - model="gpt-4o", - messages=messages - ) - return r.choices[0].message.content -``` - -- Add in-memory caching - -```python -from functools import lru_cache - -@lru_cache(maxsize=1000) -def call_llm(prompt): - # Your implementation here - pass -``` - -> ⚠️ Caching conflicts with Node retries, as retries yield the same result. -> -> To address this, you could use cached results only if not retried. +> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. {: .warning } +> - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize. +> +> - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals). +> +> - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. 
This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. +{: .best-practice } + +## AsyncParallelBatchNode + +Like **AsyncBatchNode**, but run `exec_async()` in **parallel**: ```python -from functools import lru_cache +class ParallelSummaries(AsyncParallelBatchNode): + async def prep_async(self, shared): + # e.g., multiple texts + return shared["texts"] -@lru_cache(maxsize=1000) -def cached_call(prompt): - pass + async def exec_async(self, text): + prompt = f"Summarize: {text}" + return await call_llm_async(prompt) -def call_llm(prompt, use_cache): - if use_cache: - return cached_call(prompt) - # Call the underlying function directly - return cached_call.__wrapped__(prompt) + async def post_async(self, shared, prep_res, exec_res_list): + shared["summary"] = "\n\n".join(exec_res_list) + return "default" -class SummarizeNode(Node): - def exec(self, text): - return call_llm(f"Summarize: {text}", self.cur_retry==0) +node = ParallelSummaries() +flow = AsyncFlow(start=node) ``` +## AsyncParallelBatchFlow -- Enable logging: +Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters: ```python -def call_llm(prompt): - import logging - logging.info(f"Prompt: {prompt}") - response = ... # Your implementation here - logging.info(f"Response: {response}") - return response +class SummarizeMultipleFiles(AsyncParallelBatchFlow): + async def prep_async(self, shared): + return [{"filename": f} for f in shared["files"]] + +sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) +parallel_flow = SummarizeMultipleFiles(start=sub_flow) +await parallel_flow.run_async(shared) ``` -## Why Not Provide Built-in LLM Wrappers? -I believe it is a **bad practice** to provide LLM-specific implementations in a general framework: -- **LLM APIs change frequently**. Hardcoding them makes maintenance a nightmare. -- You may need **flexibility** to switch vendors, use fine-tuned models, or deploy local LLMs. -- You may need **optimizations** like prompt caching, request batching, or response streaming. - - ================================================ -File: docs/mapreduce.md +File: docs/design_pattern/agent.md +================================================ +--- +layout: default +title: "Agent" +parent: "Design Pattern" +nav_order: 6 +--- + +# Agent + +Agent is a powerful design pattern, where node can take dynamic actions based on the context it receives. +To express an agent, create a Node (the agent) with [branching](../core_abstraction/flow.md) to other nodes (Actions). + +> The core of build **performant** and **reliable** agents boils down to: +> +> 1. **Context Management:** Provide *clear, relevant context* so agents can understand the problem.E.g., Rather than dumping an entire chat history or entire files, use a [Workflow](./workflow.md) that filters out and includes only the most relevant information. +> +> 2. **Action Space:** Define *a well-structured, unambiguous, and easy-to-use* set of actions. For instance, avoid creating overlapping actions like `read_databases` and `read_csvs`. Instead, unify data sources (e.g., move CSVs into a database) and design a single action. The action can be parameterized (e.g., string for search) or programmable (e.g., SQL queries). +{: .best-practice } + +### Example: Search Agent + +This agent: +1. Decides whether to search or answer +2. If searches, loops back to decide if more search needed +3. 
Answers when enough context gathered + +```python +class DecideAction(Node): + def prep(self, shared): + context = shared.get("context", "No previous search") + query = shared["query"] + return query, context + + def exec(self, inputs): + query, context = inputs + prompt = f""" +Given input: {query} +Previous search results: {context} +Should I: 1) Search web for more info 2) Answer with current knowledge +Output in yaml: +```yaml +action: search/answer +reason: why this action +search_term: search phrase if action is search +```""" + resp = call_llm(prompt) + yaml_str = resp.split("```yaml")[1].split("```")[0].strip() + result = yaml.safe_load(yaml_str) + + assert isinstance(result, dict) + assert "action" in result + assert "reason" in result + assert result["action"] in ["search", "answer"] + if result["action"] == "search": + assert "search_term" in result + + return result + + def post(self, shared, prep_res, exec_res): + if exec_res["action"] == "search": + shared["search_term"] = exec_res["search_term"] + return exec_res["action"] + +class SearchWeb(Node): + def prep(self, shared): + return shared["search_term"] + + def exec(self, search_term): + return search_web(search_term) + + def post(self, shared, prep_res, exec_res): + prev_searches = shared.get("context", []) + shared["context"] = prev_searches + [ + {"term": shared["search_term"], "result": exec_res} + ] + return "decide" + +class DirectAnswer(Node): + def prep(self, shared): + return shared["query"], shared.get("context", "") + + def exec(self, inputs): + query, context = inputs + return call_llm(f"Context: {context}\nAnswer: {query}") + + def post(self, shared, prep_res, exec_res): + print(f"Answer: {exec_res}") + shared["answer"] = exec_res + +# Connect nodes +decide = DecideAction() +search = SearchWeb() +answer = DirectAnswer() + +decide - "search" >> search +decide - "answer" >> answer +search - "decide" >> decide # Loop back + +flow = Flow(start=decide) +flow.run({"query": "Who won the Nobel Prize in Physics 2024?"}) +``` + +================================================ +File: docs/design_pattern/mapreduce.md ================================================ --- layout: default @@ -1068,8 +935,7 @@ MapReduce is a design pattern suitable when you have either: - Large output data (e.g., multiple forms to fill) and there is a logical way to break the task into smaller, ideally independent parts. -You first break down the task using [BatchNode](./batch.md) in the map phase, followed by aggregation in the reduce phase. - +You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase. ### Example: Document Summarization @@ -1094,9 +960,8 @@ summarize_flow = Flow(start=map_node) summarize_flow.run(shared) ``` - ================================================ -File: docs/memory.md +File: docs/design_pattern/memory.md ================================================ --- layout: default @@ -1224,9 +1089,8 @@ shared = {} flow.run(shared) ``` - ================================================ -File: docs/multi_agent.md +File: docs/design_pattern/multi_agent.md ================================================ --- layout: default @@ -1300,7 +1164,6 @@ Agent received: Network connectivity: stable | timestamp_2 Agent received: Processing load: optimal | timestamp_3 ``` - ### Interactive Multi-Agent Example: Taboo Game Here's a more complex example where two agents play the word-guessing game Taboo. @@ -1417,181 +1280,7 @@ Game Over - Correct guess! 
``` ================================================ -File: docs/node.md -================================================ ---- -layout: default -title: "Node" -parent: "Core Abstraction" -nav_order: 1 ---- - -# Node - -A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`: - -
- -
- - -1. `prep(shared)` - - **Read and preprocess data** from `shared` store. - - Examples: *query DB, read files, or serialize data into a string*. - - Return `prep_res`, which is used by `exec()` and `post()`. - -2. `exec(prep_res)` - - **Execute compute logic**, with optional retries and error handling (below). - - Examples: *(mostly) LLM calls, remote APIs, tool use*. - - ⚠️ This shall be only for compute and **NOT** access `shared`. - - ⚠️ If retries enabled, ensure idempotent implementation. - - Return `exec_res`, which is passed to `post()`. - -3. `post(shared, prep_res, exec_res)` - - **Postprocess and write data** back to `shared`. - - Examples: *update DB, change states, log results*. - - **Decide the next action** by returning a *string* (`action = "default"` if *None*). - - - -> **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately. -> -> All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data. -{: .note } - - -### Fault Tolerance & Retries - -You can **retry** `exec()` if it raises an exception via two parameters when define the Node: - -- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry). -- `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting). -`wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off. - -```python -my_node = SummarizeFile(max_retries=3, wait=10) -``` - -When an exception occurs in `exec()`, the Node automatically retries until: - -- It either succeeds, or -- The Node has retried `max_retries - 1` times already and fails on the last attempt. - -You can get the current retry times (0-based) from `self.cur_retry`. - -```python -class RetryNode(Node): - def exec(self, prep_res): - print(f"Retry {self.cur_retry} times") - raise Exception("Failed") -``` - -### Graceful Fallback - -To **gracefully handle** the exception (after all retries) rather than raising it, override: - -```python -def exec_fallback(self, shared, prep_res, exc): - raise exc -``` - -By default, it just re-raises exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`. - -### Example: Summarize file - -```python -class SummarizeFile(Node): - def prep(self, shared): - return shared["data"] - - def exec(self, prep_res): - if not prep_res: - return "Empty file content" - prompt = f"Summarize this text in 10 words: {prep_res}" - summary = call_llm(prompt) # might fail - return summary - - def exec_fallback(self, shared, prep_res, exc): - # Provide a simple fallback instead of crashing - return "There was an error processing your request." 
- - def post(self, shared, prep_res, exec_res): - shared["summary"] = exec_res - # Return "default" by not returning - -summarize_node = SummarizeFile(max_retries=3) - -# node.run() calls prep->exec->post -# If exec() fails, it retries up to 3 times before calling exec_fallback() -action_result = summarize_node.run(shared) - -print("Action returned:", action_result) # "default" -print("Summary stored:", shared["summary"]) -``` - -================================================ -File: docs/parallel.md -================================================ ---- -layout: default -title: "(Advanced) Parallel" -parent: "Core Abstraction" -nav_order: 6 ---- - -# (Advanced) Parallel - -**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute. - -> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. -{: .warning } - -> - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize. -> -> - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals). -> -> - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. -{: .best-practice } - - -## AsyncParallelBatchNode - -Like **AsyncBatchNode**, but run `exec_async()` in **parallel**: - -```python -class ParallelSummaries(AsyncParallelBatchNode): - async def prep_async(self, shared): - # e.g., multiple texts - return shared["texts"] - - async def exec_async(self, text): - prompt = f"Summarize: {text}" - return await call_llm_async(prompt) - - async def post_async(self, shared, prep_res, exec_res_list): - shared["summary"] = "\n\n".join(exec_res_list) - return "default" - -node = ParallelSummaries() -flow = AsyncFlow(start=node) -``` - -## AsyncParallelBatchFlow - -Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters: - -```python -class SummarizeMultipleFiles(AsyncParallelBatchFlow): - async def prep_async(self, shared): - return [{"filename": f} for f in shared["files"]] - -sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) -parallel_flow = SummarizeMultipleFiles(start=sub_flow) -await parallel_flow.run_async(shared) -``` - -================================================ -File: docs/rag.md +File: docs/design_pattern/rag.md ================================================ --- layout: default @@ -1603,7 +1292,7 @@ nav_order: 4 # RAG (Retrieval Augmented Generation) For certain LLM tasks like answering questions, providing context is essential. -Use [vector search](./tool.md) to find relevant context for LLM responses. +Use [vector search](../utility_function/tool.md) to find relevant context for LLM responses. 
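The question-answering example below is largely elided in this hunk. As a rough sketch of the retrieval utility such a flow relies on (the embedding model name and helper functions are assumptions, not part of the framework), a cosine-similarity lookup over pre-computed embeddings could look like:

```python
import numpy as np
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def get_embedding(text):
    r = client.embeddings.create(model="text-embedding-3-small", input=text)
    return np.array(r.data[0].embedding)

def retrieve(query, chunks, chunk_embeddings, top_k=3):
    """Return the top_k chunks most similar to the query (cosine similarity)."""
    q = get_embedding(query)
    sims = chunk_embeddings @ q / (
        np.linalg.norm(chunk_embeddings, axis=1) * np.linalg.norm(q) + 1e-8
    )
    best = np.argsort(-sims)[:top_k]
    return [chunks[i] for i in best]

# Usage sketch: embed the corpus once, then retrieve context per question
# chunks = [...]                                   # pre-split documents
# chunk_embeddings = np.stack([get_embedding(c) for c in chunks])
# context = "\n".join(retrieve("Who won?", chunks, chunk_embeddings))
```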
### Example: Question Answering @@ -1651,7 +1340,7 @@ flow.run(shared) ``` ================================================ -File: docs/structure.md +File: docs/design_pattern/structure.md ================================================ --- layout: default @@ -1769,7 +1458,159 @@ dialogue: | - Newlines are naturally preserved without needing `\n`. ================================================ -File: docs/tool.md +File: docs/design_pattern/workflow.md +================================================ +--- +layout: default +title: "Workflow" +parent: "Design Pattern" +nav_order: 2 +--- + +# Workflow + +Many real-world tasks are too complex for one LLM call. The solution is to decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes. + +> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*. +> - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*. +> +> You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](./agent.md). +{: .best-practice } + +### Example: Article Writing + +```python +class GenerateOutline(Node): + def prep(self, shared): return shared["topic"] + def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}") + def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res + +class WriteSection(Node): + def prep(self, shared): return shared["outline"] + def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}") + def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res + +class ReviewAndRefine(Node): + def prep(self, shared): return shared["draft"] + def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}") + def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res + +# Connect nodes +outline = GenerateOutline() +write = WriteSection() +review = ReviewAndRefine() + +outline >> write >> review + +# Create and run flow +writing_flow = Flow(start=outline) +shared = {"topic": "AI Safety"} +writing_flow.run(shared) +``` + +For *dynamic cases*, consider using [Agents](./agent.md). + +================================================ +File: docs/utility_function/llm.md +================================================ +--- +layout: default +title: "LLM Wrapper" +parent: "Utility Function" +nav_order: 1 +--- + +# LLM Wrappers + +We **don't** provide built-in LLM wrappers. Instead, please implement your own, for example by asking an assistant like ChatGPT or Claude. If you ask ChatGPT to "implement a `call_llm` function that takes a prompt and returns the LLM response," you shall get something like: + +```python +def call_llm(prompt): + from openai import OpenAI + client = OpenAI(api_key="YOUR_API_KEY_HERE") + r = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": prompt}] + ) + return r.choices[0].message.content + +# Example usage +call_llm("How are you?") +``` + +> Store the API key in an environment variable like OPENAI_API_KEY for security. +{: .note } + +## Improvements +Feel free to enhance your `call_llm` function as needed. 
Here are examples: + +- Handle chat history: + +```python +def call_llm(messages): + from openai import OpenAI + client = OpenAI(api_key="YOUR_API_KEY_HERE") + r = client.chat.completions.create( + model="gpt-4o", + messages=messages + ) + return r.choices[0].message.content +``` + +- Add in-memory caching + +```python +from functools import lru_cache + +@lru_cache(maxsize=1000) +def call_llm(prompt): + # Your implementation here + pass +``` + +> ⚠️ Caching conflicts with Node retries, as retries yield the same result. +> +> To address this, you could use cached results only if not retried. +{: .warning } + + +```python +from functools import lru_cache + +@lru_cache(maxsize=1000) +def cached_call(prompt): + pass + +def call_llm(prompt, use_cache): + if use_cache: + return cached_call(prompt) + # Call the underlying function directly + return cached_call.__wrapped__(prompt) + +class SummarizeNode(Node): + def exec(self, text): + return call_llm(f"Summarize: {text}", self.cur_retry==0) +``` + +- Enable logging: + +```python +def call_llm(prompt): + import logging + logging.info(f"Prompt: {prompt}") + response = ... # Your implementation here + logging.info(f"Response: {response}") + return response +``` + +## Why Not Provide Built-in LLM Wrappers? +I believe it is a **bad practice** to provide LLM-specific implementations in a general framework: +- **LLM APIs change frequently**. Hardcoding them makes maintenance a nightmare. +- You may need **flexibility** to switch vendors, use fine-tuned models, or deploy local LLMs. +- You may need **optimizations** like prompt caching, request batching, or response streaming. + +================================================ +File: docs/utility_function/tool.md ================================================ --- layout: default @@ -1841,7 +1682,6 @@ def execute_sql(query): return result ``` - > ⚠️ Beware of SQL injection risk {: .warning } @@ -1989,7 +1829,7 @@ def send_email(to_address, subject, body, from_address, password): ``` ================================================ -File: docs/viz.md +File: docs/utility_function/viz.md ================================================ --- layout: default