---
layout: default
title: "Agentic Coding"
---

# Agentic Coding: Humans Design, Agents code!

> If you are an AI agent involved in building LLM Systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification.
{: .warning }

## Agentic Coding Steps

Agentic Coding should be a collaboration between Human System Design and Agent Implementation:

| Steps             | Human      | AI         | Comment                                                                 |
|:------------------|:----------:|:----------:|:------------------------------------------------------------------------|
| 1. Requirements   | ★★★ High   | ★☆☆ Low    | Humans understand the requirements and context.                         |
| 2. Flow           | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details.  |
| 3. Utilities      | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. |
| 4. Node           | ★☆☆ Low    | ★★★ High   | The AI helps design the node types and data handling based on the flow. |
| 5. Implementation | ★☆☆ Low    | ★★★ High   | The AI implements the flow based on the design.                         |
| 6. Optimization   | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize.                 |
| 7. Reliability    | ★☆☆ Low    | ★★★ High   | The AI writes test cases and addresses corner cases.                    |

1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit.
    - Understand AI systems' strengths and limitations:
        - **Good for**: Routine tasks requiring common sense (filling forms, replying to emails)
        - **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL)
        - **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning)
    - **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features.
    - **Balance complexity vs. impact**: Aim to deliver the highest-value features with minimal complexity early.

2. **Flow Design**: At a high level, describe how your AI system orchestrates nodes.
    - Identify applicable design patterns (e.g., [Map Reduce](./design_pattern/mapreduce.md), [Agent](./design_pattern/agent.md), [RAG](./design_pattern/rag.md)).
    - For each node in the flow, start with a high-level one-line description of what it does.
        - If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine).
        - If using **Agent**, specify the inputs (context) and the possible actions.
        - If using **RAG**, specify what to embed, noting that there are usually both offline (indexing) and online (retrieval) workflows.
    - Outline the flow and draw it in a mermaid diagram. For example:
      ```mermaid
      flowchart LR
          start[Start] --> batch[Batch]
          batch --> check[Check]
          check -->|OK| process
          check -->|Error| fix[Fix]
          fix --> check

          subgraph process[Process]
            step1[Step 1] --> step2[Step 2]
          end

          process --> endNode[End]
      ```
    - > **If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition.
      {: .best-practice }

3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions.
    - Think of your AI system as the brain.
It needs a body—these *external utility functions*—to interact with the real world:
        - Reading inputs (e.g., retrieving Slack messages, reading emails)
        - Writing outputs (e.g., generating reports, sending emails)
        - Using external tools (e.g., calling LLMs, searching the web)
        - **NOTE**: *LLM-based tasks* (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are *core functions* internal to the AI system.
    - For each utility function, implement it and write a simple test.
    - Document their input/output, as well as why they are necessary. For example:
        - *Name*: Embedding (`utils/get_embedding.py`)
        - *Input*: `str`
        - *Output*: a vector of 3072 floats
        - *Necessity:* Used by the second node to embed text
    - > **Sometimes, design Utilities before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them.
      {: .best-practice }

4. **Node Design**: Plan how each node will read and write data, and use utility functions.
    - One core design principle for PocketFlow is to use a shared store, so start with a shared store design:
        - For simple systems, use an in-memory dictionary.
        - For more complex systems or when persistence is required, use a database.
        - **Don't Repeat Yourself**: Use in-memory references or foreign keys.
    - Example shared store design:
      ```python
      shared = {
          "user": {
              "id": "user123",
              "context": {                # Another nested dict
                  "weather": {"temp": 72, "condition": "sunny"},
                  "location": "San Francisco"
              }
          },
          "results": {}                   # Empty dict to store outputs
      }
      ```
    - For each node, describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level without code (a concrete sketch follows after this list). For example:
        - `type`: Regular (or Batch, or Async)
        - `prep`: Read "text" from the shared store
        - `exec`: Call the embedding utility function
        - `post`: Write "embedding" to the shared store

5. **Implementation**: Implement the initial nodes and flows based on the design.
    - 🎉 If you've reached this step, humans have finished the design. Now *Agentic Coding* begins!
    - **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking.
    - **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system.
    - Add logging throughout the code to facilitate debugging.

6. **Optimization**:
    - **Use Intuition**: For a quick initial evaluation, human intuition is often a good start.
    - **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts.
    - If your flow design is already solid, move on to micro-optimizations:
        - **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity.
        - **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone.
    - > **You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times.
      >
      {: .best-practice }

7. **Reliability**
    - **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times.
    - **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging.
    - **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain.
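As a concrete illustration of Steps 3, 4, and 7 (the sketch referenced in the list above), here is one possible, deliberately minimal implementation of the embedding utility and node from the examples. This is a sketch, not part of your `docs/design.md`: it assumes the framework is installed as `pocketflow` and an OpenAI-style embedding API whose model returns 3072 floats; swap in whatever client and model your project actually uses.

```python
# utils/get_embedding.py -- sketch of the Step 3 utility, with a simple main() test
from openai import OpenAI

def get_embedding(text: str) -> list[float]:
    client = OpenAI()  # expects OPENAI_API_KEY in the environment
    resp = client.embeddings.create(model="text-embedding-3-large", input=text)
    return resp.data[0].embedding  # 3072 floats for this (assumed) model

def main():
    # Simple test, as recommended for every utility function
    print("Embedding length:", len(get_embedding("Hello world")))

if __name__ == "__main__":
    main()
```

```python
# flow.py -- sketch of the Step 4 node, with the Step 7 reliability settings
from pocketflow import Node, Flow
from utils.get_embedding import get_embedding

class EmbedText(Node):
    def prep(self, shared):
        return shared["text"]                  # read "text" from the shared store

    def exec(self, prep_res):
        embedding = get_embedding(prep_res)    # call the embedding utility
        assert len(embedding) == 3072          # check the output meets requirements
        return embedding

    def post(self, shared, prep_res, exec_res):
        shared["embedding"] = exec_res         # write "embedding" to the shared store
        return "default"

embed_node = EmbedText(max_retries=3, wait=10)  # retry with back-off on failure
flow = Flow(start=embed_node)
# flow.run({"text": "Pocket Flow is a 100-line LLM framework."})
```

The `assert` inside `exec` plus `max_retries`/`wait` is the Reliability pattern from Step 7: fail fast inside `exec`, and let the Node's retry loop absorb transient errors.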
## Example LLM Project File Structure

```
my_project/
├── main.py
├── flow.py
├── utils/
│   ├── __init__.py
│   ├── call_llm.py
│   └── search_web.py
├── requirements.txt
└── docs/
    └── design.md
```

- **`docs/design.md`**: Contains project documentation for each step above. This should be high-level and no-code.
- **`utils/`**: Contains all utility functions.
    - It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
    - Each file should also include a `main()` function to try that API call.
- **`flow.py`**: Implements the system's flow, starting with node definitions followed by the overall structure.
- **`main.py`**: Serves as the project's entry point.

================================================
File: docs/index.md
================================================

---
layout: default
title: "Home"
nav_order: 1
---

# Pocket Flow

A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*.

- **Lightweight**: Just the core graph abstraction in 100 lines. ZERO dependencies and no vendor lock-in.
- **Expressive**: Everything you love from larger frameworks—([Multi-](./design_pattern/multi_agent.html))[Agents](./design_pattern/agent.html), [Workflow](./design_pattern/workflow.html), [RAG](./design_pattern/rag.html), and more.
- **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications.
## Core Abstraction

We model the LLM workflow as a **Graph + Shared Store**:

- [Node](./core_abstraction/node.md) handles simple (LLM) tasks.
- [Flow](./core_abstraction/flow.md) connects nodes through **Actions** (labeled edges).
- [Shared Store](./core_abstraction/communication.md) enables communication between nodes within flows.
- [Batch](./core_abstraction/batch.md) nodes/flows allow for data-intensive tasks.
- [Async](./core_abstraction/async.md) nodes/flows allow waiting for asynchronous tasks.
- [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks.
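To make the abstraction concrete before following the links above, here is a minimal sketch of a two-node flow. It assumes the package is installed as `pocketflow`; `call_llm` is only a stand-in for your own LLM utility (see Utility Function below).

```python
from pocketflow import Node, Flow

def call_llm(prompt):
    # Stand-in for your own LLM wrapper utility.
    return f"LLM response to: {prompt}"

class Summarize(Node):
    def prep(self, shared):
        return shared["text"]                  # read from the shared store

    def exec(self, prep_res):
        return call_llm(f"Summarize: {prep_res}")

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res           # write back to the shared store
        return "default"                       # Action label -> next node

class ShowSummary(Node):
    def post(self, shared, prep_res, exec_res):
        print(shared["summary"])

summarize = Summarize()
show = ShowSummary()
summarize >> show                              # wire the "default" Action
flow = Flow(start=summarize)
flow.run({"text": "Pocket Flow is a 100-line LLM framework."})
```

The dictionary passed to `flow.run()` is the Shared Store; the `>>` operator wires the `"default"` Action between the two nodes.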
## Design Pattern

From there, it's easy to implement popular design patterns:

- [Agent](./design_pattern/agent.md) autonomously makes decisions.
- [Workflow](./design_pattern/workflow.md) chains multiple tasks into pipelines.
- [RAG](./design_pattern/rag.md) integrates data retrieval with generation.
- [Map Reduce](./design_pattern/mapreduce.md) splits data tasks into Map and Reduce steps.
- [Structured Output](./design_pattern/structure.md) formats outputs consistently.
- [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
## Utility Function

We **do not** provide built-in utilities. Instead, we offer *examples*—please *implement your own*:

- [LLM Wrapper](./utility_function/llm.md)
- [Viz and Debug](./utility_function/viz.md)
- [Web Search](./utility_function/websearch.md)
- [Chunking](./utility_function/chunking.md)
- [Embedding](./utility_function/embedding.md)
- [Vector Databases](./utility_function/vector.md)
- [Text-to-Speech](./utility_function/text_to_speech.md)

**Why not built-in?**: I believe hardcoding vendor-specific APIs in a general framework is *bad practice*:

- *API Volatility*: Frequent changes lead to heavy maintenance for hardcoded APIs.
- *Flexibility*: You may want to switch vendors, use fine-tuned models, or run them locally.
- *Optimizations*: Prompt caching, batching, and streaming are easier without vendor lock-in.

## Ready to build your Apps?

Check out [Agentic Coding Guidance](./guide.md), the fastest way to develop LLM projects with Pocket Flow!

================================================
File: docs/core_abstraction/async.md
================================================

---
layout: default
title: "(Advanced) Async"
parent: "Core Abstraction"
nav_order: 5
---

# (Advanced) Async

**Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for:

1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way.
2. **exec_async()**: Typically used for async LLM calls.
3. **post_async()**: For *awaiting user feedback*, *coordinating across multiple agents*, or any additional async steps after `exec_async()`.

**Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes.

### Example

```python
import asyncio

class SummarizeThenVerify(AsyncNode):
    async def prep_async(self, shared):
        # Example: read a file asynchronously
        doc_text = await read_file_async(shared["doc_path"])
        return doc_text

    async def exec_async(self, prep_res):
        # Example: async LLM call
        summary = await call_llm_async(f"Summarize: {prep_res}")
        return summary

    async def post_async(self, shared, prep_res, exec_res):
        # Example: wait for user feedback
        decision = await gather_user_feedback(exec_res)
        if decision == "approve":
            shared["summary"] = exec_res
            return "approve"
        return "deny"

summarize_node = SummarizeThenVerify()
final_node = Finalize()

# Define transitions
summarize_node - "approve" >> final_node
summarize_node - "deny" >> summarize_node  # retry

flow = AsyncFlow(start=summarize_node)

async def main():
    shared = {"doc_path": "document.txt"}
    await flow.run_async(shared)
    print("Final Summary:", shared.get("summary"))

asyncio.run(main())
```

================================================
File: docs/core_abstraction/batch.md
================================================

---
layout: default
title: "Batch"
parent: "Core Abstraction"
nav_order: 4
---

# Batch

**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases:

- **Chunk-based** processing (e.g., splitting large texts).
- **Iterative** processing over lists of input items (e.g., user queries, files, URLs).

## 1. BatchNode

A **BatchNode** extends `Node` but changes `prep()` and `exec()`:

- **`prep(shared)`**: returns an **iterable** (e.g., list, generator).
- **`exec(item)`**: called **once** per item in that iterable.
- **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**.
### Example: Summarize a Large File ```python class MapSummaries(BatchNode): def prep(self, shared): # Suppose we have a big file; chunk it content = shared["data"] chunk_size = 10000 chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] return chunks def exec(self, chunk): prompt = f"Summarize this chunk in 10 words: {chunk}" summary = call_llm(prompt) return summary def post(self, shared, prep_res, exec_res_list): combined = "\n".join(exec_res_list) shared["summary"] = combined return "default" map_summaries = MapSummaries() flow = Flow(start=map_summaries) flow.run(shared) ``` --- ## 2. BatchFlow A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set. ### Example: Summarize Many Files ```python class SummarizeAllFiles(BatchFlow): def prep(self, shared): # Return a list of param dicts (one per file) filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...] return [{"filename": fn} for fn in filenames] # Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce): summarize_file = SummarizeFile(start=load_file) # Wrap that flow into a BatchFlow: summarize_all_files = SummarizeAllFiles(start=summarize_file) summarize_all_files.run(shared) ``` ### Under the Hood 1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`. 2. The **BatchFlow** loops through each dict. For each one: - It merges the dict with the BatchFlow’s own `params`. - It calls `flow.run(shared)` using the merged result. 3. This means the sub-Flow is run **repeatedly**, once for every param dict. --- ## 3. Nested or Multi-Level Batches You can nest a **BatchFlow** in another **BatchFlow**. For instance: - **Outer** batch: returns a list of diretory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...). - **Inner** batch: returning a list of per-file param dicts. At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once. ```python class FileBatchFlow(BatchFlow): def prep(self, shared): directory = self.params["directory"] # e.g., files = ["file1.txt", "file2.txt", ...] files = [f for f in os.listdir(directory) if f.endswith(".txt")] return [{"filename": f} for f in files] class DirectoryBatchFlow(BatchFlow): def prep(self, shared): directories = [ "/path/to/dirA", "/path/to/dirB"] return [{"directory": d} for d in directories] # MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"} inner_flow = FileBatchFlow(start=MapSummaries()) outer_flow = DirectoryBatchFlow(start=inner_flow) ``` ================================================ File: docs/core_abstraction/communication.md ================================================ --- layout: default title: "Communication" parent: "Core Abstraction" nav_order: 3 --- # Communication Nodes and Flows **communicate** in 2 ways: 1. **Shared Store (for almost all the cases)** - A global data structure (often an in-mem dict) that all nodes can read ( `prep()`) and write (`post()`). - Great for data results, large content, or anything multiple nodes need. - You shall design the data structure and populate it ahead. 
- > **Separation of Concerns:** Use `Shared Store` for almost all cases to separate *Data Schema* from *Compute Logic*! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` is more a syntax sugar for [Batch](./batch.md). {: .best-practice } 2. **Params (only for [Batch](./batch.md))** - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**. - Good for identifiers like filenames or numeric IDs, in Batch mode. If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller). --- ## 1. Shared Store ### Overview A shared store is typically an in-mem dictionary, like: ```python shared = {"data": {}, "summary": {}, "config": {...}, ...} ``` It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements. ### Example ```python class LoadData(Node): def post(self, shared, prep_res, exec_res): # We write data to shared store shared["data"] = "Some text content" return None class Summarize(Node): def prep(self, shared): # We read data from shared store return shared["data"] def exec(self, prep_res): # Call LLM to summarize prompt = f"Summarize: {prep_res}" summary = call_llm(prompt) return summary def post(self, shared, prep_res, exec_res): # We write summary to shared store shared["summary"] = exec_res return "default" load_data = LoadData() summarize = Summarize() load_data >> summarize flow = Flow(start=load_data) shared = {} flow.run(shared) ``` Here: - `LoadData` writes to `shared["data"]`. - `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`. --- ## 2. Params **Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are: - **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`). - **Set** via `set_params()`. - **Cleared** and updated each time a parent Flow calls it. > Only set the uppermost Flow params because others will be overwritten by the parent Flow. > > If you need to set child node params, see [Batch](./batch.md). {: .warning } Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store. ### Example ```python # 1) Create a Node that uses params class SummarizeFile(Node): def prep(self, shared): # Access the node's param filename = self.params["filename"] return shared["data"].get(filename, "") def exec(self, prep_res): prompt = f"Summarize: {prep_res}" return call_llm(prompt) def post(self, shared, prep_res, exec_res): filename = self.params["filename"] shared["summary"][filename] = exec_res return "default" # 2) Set params node = SummarizeFile() # 3) Set Node params directly (for testing) node.set_params({"filename": "doc1.txt"}) node.run(shared) # 4) Create Flow flow = Flow(start=node) # 5) Set Flow params (overwrites node params) flow.set_params({"filename": "doc2.txt"}) flow.run(shared) # The node summarizes doc2, not doc1 ``` ================================================ File: docs/core_abstraction/flow.md ================================================ --- layout: default title: "Flow" parent: "Core Abstraction" nav_order: 2 --- # Flow A **Flow** orchestrates a graph of Nodes. 
You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`. ## 1. Action-based Transitions Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`. You define transitions with the syntax: 1. **Basic default transition**: `node_a >> node_b` This means if `node_a.post()` returns `"default"`, go to `node_b`. (Equivalent to `node_a - "default" >> node_b`) 2. **Named action transition**: `node_a - "action_name" >> node_b` This means if `node_a.post()` returns `"action_name"`, go to `node_b`. It's possible to create loops, branching, or multi-step flows. ## 2. Creating a Flow A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node. ### Example: Simple Sequence Here's a minimal flow of two nodes in a chain: ```python node_a >> node_b flow = Flow(start=node_a) flow.run(shared) ``` - When you run the flow, it executes `node_a`. - Suppose `node_a.post()` returns `"default"`. - The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`. - `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there. ### Example: Branching & Looping Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions: - `"approved"`: expense is approved, move to payment processing - `"needs_revision"`: expense needs changes, send back for revision - `"rejected"`: expense is denied, finish the process We can wire them like this: ```python # Define the flow connections review - "approved" >> payment # If approved, process payment review - "needs_revision" >> revise # If needs changes, go to revision review - "rejected" >> finish # If rejected, finish the process revise >> review # After revision, go back for another review payment >> finish # After payment, finish the process flow = Flow(start=review) ``` Let's see how it flows: 1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node 2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review` 3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops ```mermaid flowchart TD review[Review Expense] -->|approved| payment[Process Payment] review -->|needs_revision| revise[Revise Report] review -->|rejected| finish[Finish Process] revise --> review payment --> finish ``` ### Running Individual Nodes vs. Running a Flow - `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action. - `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue. > `node.run(shared)` **does not** proceed to the successor. > This is mainly for debugging or testing a single node. > > Always use `flow.run(...)` in production to ensure the full pipeline runs correctly. {: .warning } ## 3. Nested Flows A **Flow** can act like a Node, which enables powerful composition patterns. This means you can: 1. Use a Flow as a Node within another Flow's transitions. 2. Combine multiple smaller Flows into a larger Flow for reuse. 3. Node `params` will be a merging of **all** parents' `params`. 
### Flow's Node Methods A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However: - It **won't** run `exec()`, as its main logic is to orchestrate its nodes. - `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store. ### Basic Flow Nesting Here's how to connect a flow to another node: ```python # Create a sub-flow node_a >> node_b subflow = Flow(start=node_a) # Connect it to another node subflow >> node_c # Create the parent flow parent_flow = Flow(start=subflow) ``` When `parent_flow.run()` executes: 1. It starts `subflow` 2. `subflow` runs through its nodes (`node_a->node_b`) 3. After `subflow` completes, execution continues to `node_c` ### Example: Order Processing Pipeline Here's a practical example that breaks down order processing into nested flows: ```python # Payment processing sub-flow validate_payment >> process_payment >> payment_confirmation payment_flow = Flow(start=validate_payment) # Inventory sub-flow check_stock >> reserve_items >> update_inventory inventory_flow = Flow(start=check_stock) # Shipping sub-flow create_label >> assign_carrier >> schedule_pickup shipping_flow = Flow(start=create_label) # Connect the flows into a main order pipeline payment_flow >> inventory_flow >> shipping_flow # Create the master flow order_pipeline = Flow(start=payment_flow) # Run the entire pipeline order_pipeline.run(shared_data) ``` This creates a clean separation of concerns while maintaining a clear execution path: ```mermaid flowchart LR subgraph order_pipeline[Order Pipeline] subgraph paymentFlow["Payment Flow"] A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation] end subgraph inventoryFlow["Inventory Flow"] D[Check Stock] --> E[Reserve Items] --> F[Update Inventory] end subgraph shippingFlow["Shipping Flow"] G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup] end paymentFlow --> inventoryFlow inventoryFlow --> shippingFlow end ``` ================================================ File: docs/core_abstraction/node.md ================================================ --- layout: default title: "Node" parent: "Core Abstraction" nav_order: 1 --- # Node A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`:
1. `prep(shared)`
   - **Read and preprocess data** from the `shared` store.
   - Examples: *query DB, read files, or serialize data into a string*.
   - Return `prep_res`, which is used by `exec()` and `post()`.

2. `exec(prep_res)`
   - **Execute compute logic**, with optional retries and error handling (below).
   - Examples: *(mostly) LLM calls, remote APIs, tool use*.
   - ⚠️ This shall be compute-only and must **NOT** access `shared`.
   - ⚠️ If retries are enabled, ensure the implementation is idempotent.
   - Return `exec_res`, which is passed to `post()`.

3. `post(shared, prep_res, exec_res)`
   - **Postprocess and write data** back to `shared`.
   - Examples: *update DB, change states, log results*.
   - **Decide the next action** by returning a *string* (`action = "default"` if *None*).

> **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately.
>
> All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data.
{: .note }

### Fault Tolerance & Retries

You can **retry** `exec()` if it raises an exception via two parameters when you define the Node:

- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry).
- `wait` (int): The time to wait (in **seconds**) before the next retry. By default, `wait=0` (no waiting). `wait` is helpful when you encounter rate limits or quota errors from your LLM provider and need to back off.

```python
my_node = SummarizeFile(max_retries=3, wait=10)
```

When an exception occurs in `exec()`, the Node automatically retries until:

- It either succeeds, or
- The Node has already retried `max_retries - 1` times and fails on the last attempt.

You can get the current retry count (0-based) from `self.cur_retry`.

```python
class RetryNode(Node):
    def exec(self, prep_res):
        print(f"Retry {self.cur_retry} times")
        raise Exception("Failed")
```

### Graceful Fallback

To **gracefully handle** the exception (after all retries) rather than raising it, override:

```python
def exec_fallback(self, prep_res, exc):
    raise exc
```

By default, it just re-raises the exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.

### Example: Summarize file

```python
class SummarizeFile(Node):
    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        if not prep_res:
            return "Empty file content"
        prompt = f"Summarize this text in 10 words: {prep_res}"
        summary = call_llm(prompt)  # might fail
        return summary

    def exec_fallback(self, prep_res, exc):
        # Provide a simple fallback instead of crashing
        return "There was an error processing your request."

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        # Return "default" by not returning

summarize_node = SummarizeFile(max_retries=3)

# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)

print("Action returned:", action_result)  # "default"
print("Summary stored:", shared["summary"])
```

================================================
File: docs/core_abstraction/parallel.md
================================================

---
layout: default
title: "(Advanced) Parallel"
parent: "Core Abstraction"
nav_order: 6
---

# (Advanced) Parallel

**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute.
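Because parallel nodes typically fan out many LLM requests at once, they can hit provider rate limits quickly (see the warnings below), so a throttling utility is often needed. Here is a minimal sketch using `asyncio.Semaphore`; `call_llm_async` is only a stand-in for your own async LLM wrapper.

```python
import asyncio

async def call_llm_async(prompt: str) -> str:
    # Placeholder for your own async LLM utility (see the LLM Wrapper docs).
    await asyncio.sleep(0.1)
    return f"summary of: {prompt[:30]}"

async def call_llm_throttled(sem: asyncio.Semaphore, prompt: str) -> str:
    # Acquire the semaphore so only a bounded number of calls run at once.
    async with sem:
        return await call_llm_async(prompt)

async def main():
    sem = asyncio.Semaphore(5)  # cap concurrent LLM calls at 5
    prompts = [f"Summarize text #{i}" for i in range(20)]
    results = await asyncio.gather(*(call_llm_throttled(sem, p) for p in prompts))
    print(f"Got {len(results)} results")

asyncio.run(main())
```

The same semaphore can be shared by every `exec_async()` call in a parallel batch node to cap concurrency across the whole batch.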
> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. {: .warning } > - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize. > > - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals). > > - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. {: .best-practice } ## AsyncParallelBatchNode Like **AsyncBatchNode**, but run `exec_async()` in **parallel**: ```python class ParallelSummaries(AsyncParallelBatchNode): async def prep_async(self, shared): # e.g., multiple texts return shared["texts"] async def exec_async(self, text): prompt = f"Summarize: {text}" return await call_llm_async(prompt) async def post_async(self, shared, prep_res, exec_res_list): shared["summary"] = "\n\n".join(exec_res_list) return "default" node = ParallelSummaries() flow = AsyncFlow(start=node) ``` ## AsyncParallelBatchFlow Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters: ```python class SummarizeMultipleFiles(AsyncParallelBatchFlow): async def prep_async(self, shared): return [{"filename": f} for f in shared["files"]] sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) parallel_flow = SummarizeMultipleFiles(start=sub_flow) await parallel_flow.run_async(shared) ``` ================================================ File: docs/design_pattern/agent.md ================================================ --- layout: default title: "Agent" parent: "Design Pattern" nav_order: 1 --- # Agent Agent is a powerful design pattern in which nodes can take dynamic actions based on the context.
## Implement Agent with Graph

1. **Context and Action:** Implement nodes that supply context and perform actions.
2. **Branching:** Use branching to connect each action node to an agent node. Use actions to allow the agent to direct the [flow](../core_abstraction/flow.md) between nodes—and potentially loop back for multi-step reasoning.
3. **Agent Node:** Provide a prompt to decide the next action—for example:

```python
f"""
### CONTEXT
Task: {task_description}
Previous Actions: {previous_actions}
Current State: {current_state}

### ACTION SPACE
[1] search
  Description: Use web search to get results
  Parameters:
    - query (str): What to search for

[2] answer
  Description: Conclude based on the results
  Parameters:
    - result (str): Final answer to provide

### NEXT ACTION
Decide the next action based on the current context and available action space.
Return your response in the following format:

```yaml
thinking: |
    <your reasoning here>
action: <action_name>
parameters:
    <parameter_name>: <parameter_value>
```"""
```

The core of building **high-performance** and **reliable** agents boils down to:

1. **Context Management:** Provide *relevant, minimal context.* For example, rather than including an entire chat history, retrieve the most relevant parts via [RAG](./rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content.
2. **Action Space:** Provide *a well-structured and unambiguous* set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database.

## Example Good Action Design

- **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once.
- **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts).
- **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files.
- **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends.

## Example: Search Agent

This agent:
1. Decides whether to search or answer
2. If it searches, loops back to decide if more search is needed
3.
Answers when enough context gathered ```python class DecideAction(Node): def prep(self, shared): context = shared.get("context", "No previous search") query = shared["query"] return query, context def exec(self, inputs): query, context = inputs prompt = f""" Given input: {query} Previous search results: {context} Should I: 1) Search web for more info 2) Answer with current knowledge Output in yaml: ```yaml action: search/answer reason: why this action search_term: search phrase if action is search ```""" resp = call_llm(prompt) yaml_str = resp.split("```yaml")[1].split("```")[0].strip() result = yaml.safe_load(yaml_str) assert isinstance(result, dict) assert "action" in result assert "reason" in result assert result["action"] in ["search", "answer"] if result["action"] == "search": assert "search_term" in result return result def post(self, shared, prep_res, exec_res): if exec_res["action"] == "search": shared["search_term"] = exec_res["search_term"] return exec_res["action"] class SearchWeb(Node): def prep(self, shared): return shared["search_term"] def exec(self, search_term): return search_web(search_term) def post(self, shared, prep_res, exec_res): prev_searches = shared.get("context", []) shared["context"] = prev_searches + [ {"term": shared["search_term"], "result": exec_res} ] return "decide" class DirectAnswer(Node): def prep(self, shared): return shared["query"], shared.get("context", "") def exec(self, inputs): query, context = inputs return call_llm(f"Context: {context}\nAnswer: {query}") def post(self, shared, prep_res, exec_res): print(f"Answer: {exec_res}") shared["answer"] = exec_res # Connect nodes decide = DecideAction() search = SearchWeb() answer = DirectAnswer() decide - "search" >> search decide - "answer" >> answer search - "decide" >> decide # Loop back flow = Flow(start=decide) flow.run({"query": "Who won the Nobel Prize in Physics 2024?"}) ``` ================================================ File: docs/design_pattern/mapreduce.md ================================================ --- layout: default title: "Map Reduce" parent: "Design Pattern" nav_order: 4 --- # Map Reduce MapReduce is a design pattern suitable when you have either: - Large input data (e.g., multiple files to process), or - Large output data (e.g., multiple forms to fill) and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase. ### Example: Document Summarization ```python class SummarizeAllFiles(BatchNode): def prep(self, shared): files_dict = shared["files"] # e.g. 10 files return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...] def exec(self, one_file): filename, file_content = one_file summary_text = call_llm(f"Summarize the following file:\n{file_content}") return (filename, summary_text) def post(self, shared, prep_res, exec_res_list): shared["file_summaries"] = dict(exec_res_list) class CombineSummaries(Node): def prep(self, shared): return shared["file_summaries"] def exec(self, file_summaries): # format as: "File1: summary\nFile2: summary...\n" text_list = [] for fname, summ in file_summaries.items(): text_list.append(f"{fname} summary:\n{summ}\n") big_text = "\n---\n".join(text_list) return call_llm(f"Combine these file summaries into one final summary:\n{big_text}") def post(self, shared, prep_res, final_summary): shared["all_files_summary"] = final_summary batch_node = SummarizeAllFiles() combine_node = CombineSummaries() batch_node >> combine_node flow = Flow(start=batch_node) shared = { "files": { "file1.txt": "Alice was beginning to get very tired of sitting by her sister...", "file2.txt": "Some other interesting text ...", # ... } } flow.run(shared) print("Individual Summaries:", shared["file_summaries"]) print("\nFinal Summary:\n", shared["all_files_summary"]) ``` ================================================ File: docs/design_pattern/multi_agent.md ================================================ --- layout: default title: "(Advanced) Multi-Agents" parent: "Design Pattern" nav_order: 6 --- # (Advanced) Multi-Agents Multiple [Agents](./flow.md) can work together by handling subtasks and communicating the progress. Communication between agents is typically implemented using message queues in shared storage. > Most of time, you don't need Multi-Agents. Start with a simple solution first. {: .best-practice } ### Example Agent Communication: Message Queue Here's a simple example showing how to implement agent communication using `asyncio.Queue`. 
The agent listens for messages, processes them, and continues listening: ```python class AgentNode(AsyncNode): async def prep_async(self, _): message_queue = self.params["messages"] message = await message_queue.get() print(f"Agent received: {message}") return message # Create node and flow agent = AgentNode() agent >> agent # connect to self flow = AsyncFlow(start=agent) # Create heartbeat sender async def send_system_messages(message_queue): counter = 0 messages = [ "System status: all systems operational", "Memory usage: normal", "Network connectivity: stable", "Processing load: optimal" ] while True: message = f"{messages[counter % len(messages)]} | timestamp_{counter}" await message_queue.put(message) counter += 1 await asyncio.sleep(1) async def main(): message_queue = asyncio.Queue() shared = {} flow.set_params({"messages": message_queue}) # Run both coroutines await asyncio.gather( flow.run_async(shared), send_system_messages(message_queue) ) asyncio.run(main()) ``` The output: ``` Agent received: System status: all systems operational | timestamp_0 Agent received: Memory usage: normal | timestamp_1 Agent received: Network connectivity: stable | timestamp_2 Agent received: Processing load: optimal | timestamp_3 ``` ### Interactive Multi-Agent Example: Taboo Game Here's a more complex example where two agents play the word-guessing game Taboo. One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word: ```python class AsyncHinter(AsyncNode): async def prep_async(self, shared): guess = await shared["hinter_queue"].get() if guess == "GAME_OVER": return None return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", []) async def exec_async(self, inputs): if inputs is None: return None target, forbidden, past_guesses = inputs prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}" if past_guesses: prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific." prompt += "\nUse at most 5 words." hint = call_llm(prompt) print(f"\nHinter: Here's your hint - {hint}") return hint async def post_async(self, shared, prep_res, exec_res): if exec_res is None: return "end" await shared["guesser_queue"].put(exec_res) return "continue" class AsyncGuesser(AsyncNode): async def prep_async(self, shared): hint = await shared["guesser_queue"].get() return hint, shared.get("past_guesses", []) async def exec_async(self, inputs): hint, past_guesses = inputs prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. 
Directly reply a single word:" guess = call_llm(prompt) print(f"Guesser: I guess it's - {guess}") return guess async def post_async(self, shared, prep_res, exec_res): if exec_res.lower() == shared["target_word"].lower(): print("Game Over - Correct guess!") await shared["hinter_queue"].put("GAME_OVER") return "end" if "past_guesses" not in shared: shared["past_guesses"] = [] shared["past_guesses"].append(exec_res) await shared["hinter_queue"].put(exec_res) return "continue" async def main(): # Set up game shared = { "target_word": "nostalgia", "forbidden_words": ["memory", "past", "remember", "feeling", "longing"], "hinter_queue": asyncio.Queue(), "guesser_queue": asyncio.Queue() } print("Game starting!") print(f"Target word: {shared['target_word']}") print(f"Forbidden words: {shared['forbidden_words']}") # Initialize by sending empty guess to hinter await shared["hinter_queue"].put("") # Create nodes and flows hinter = AsyncHinter() guesser = AsyncGuesser() # Set up flows hinter_flow = AsyncFlow(start=hinter) guesser_flow = AsyncFlow(start=guesser) # Connect nodes to themselves hinter - "continue" >> hinter guesser - "continue" >> guesser # Run both agents concurrently await asyncio.gather( hinter_flow.run_async(shared), guesser_flow.run_async(shared) ) asyncio.run(main()) ``` The Output: ``` Game starting! Target word: nostalgia Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing'] Hinter: Here's your hint - Thinking of childhood summer days Guesser: I guess it's - popsicle Hinter: Here's your hint - When childhood cartoons make you emotional Guesser: I guess it's - nostalgic Hinter: Here's your hint - When old songs move you Guesser: I guess it's - memories Hinter: Here's your hint - That warm emotion about childhood Guesser: I guess it's - nostalgia Game Over - Correct guess! ``` ================================================ File: docs/design_pattern/rag.md ================================================ --- layout: default title: "RAG" parent: "Design Pattern" nav_order: 3 --- # RAG (Retrieval Augmented Generation) For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
1. **Offline stage**: Preprocess and index documents ("building the index"). 2. **Online stage**: Given a question, generate answers by retrieving the most relevant context. --- ## Stage 1: Offline Indexing We create three Nodes: 1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text. 2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk. 3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md). ```python class ChunkDocs(BatchNode): def prep(self, shared): # A list of file paths in shared["files"]. We process each file. return shared["files"] def exec(self, filepath): # read file content. In real usage, do error handling. with open(filepath, "r", encoding="utf-8") as f: text = f.read() # chunk by 100 chars each chunks = [] size = 100 for i in range(0, len(text), size): chunks.append(text[i : i + size]) return chunks def post(self, shared, prep_res, exec_res_list): # exec_res_list is a list of chunk-lists, one per file. # flatten them all into a single list of chunks. all_chunks = [] for chunk_list in exec_res_list: all_chunks.extend(chunk_list) shared["all_chunks"] = all_chunks class EmbedDocs(BatchNode): def prep(self, shared): return shared["all_chunks"] def exec(self, chunk): return get_embedding(chunk) def post(self, shared, prep_res, exec_res_list): # Store the list of embeddings. shared["all_embeds"] = exec_res_list print(f"Total embeddings: {len(exec_res_list)}") class StoreIndex(Node): def prep(self, shared): # We'll read all embeds from shared. return shared["all_embeds"] def exec(self, all_embeds): # Create a vector index (faiss or other DB in real usage). index = create_index(all_embeds) return index def post(self, shared, prep_res, index): shared["index"] = index # Wire them in sequence chunk_node = ChunkDocs() embed_node = EmbedDocs() store_node = StoreIndex() chunk_node >> embed_node >> store_node OfflineFlow = Flow(start=chunk_node) ``` Usage example: ```python shared = { "files": ["doc1.txt", "doc2.txt"], # any text files } OfflineFlow.run(shared) ``` --- ## Stage 2: Online Query & Answer We have 3 nodes: 1. `EmbedQuery` – embeds the user’s question. 2. `RetrieveDocs` – retrieves top chunk from the index. 3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer. 
```python class EmbedQuery(Node): def prep(self, shared): return shared["question"] def exec(self, question): return get_embedding(question) def post(self, shared, prep_res, q_emb): shared["q_emb"] = q_emb class RetrieveDocs(Node): def prep(self, shared): # We'll need the query embedding, plus the offline index/chunks return shared["q_emb"], shared["index"], shared["all_chunks"] def exec(self, inputs): q_emb, index, chunks = inputs I, D = search_index(index, q_emb, top_k=1) best_id = I[0][0] relevant_chunk = chunks[best_id] return relevant_chunk def post(self, shared, prep_res, relevant_chunk): shared["retrieved_chunk"] = relevant_chunk print("Retrieved chunk:", relevant_chunk[:60], "...") class GenerateAnswer(Node): def prep(self, shared): return shared["question"], shared["retrieved_chunk"] def exec(self, inputs): question, chunk = inputs prompt = f"Question: {question}\nContext: {chunk}\nAnswer:" return call_llm(prompt) def post(self, shared, prep_res, answer): shared["answer"] = answer print("Answer:", answer) embed_qnode = EmbedQuery() retrieve_node = RetrieveDocs() generate_node = GenerateAnswer() embed_qnode >> retrieve_node >> generate_node OnlineFlow = Flow(start=embed_qnode) ``` Usage example: ```python # Suppose we already ran OfflineFlow and have: # shared["all_chunks"], shared["index"], etc. shared["question"] = "Why do people like cats?" OnlineFlow.run(shared) # final answer in shared["answer"] ``` ================================================ File: docs/design_pattern/structure.md ================================================ --- layout: default title: "Structured Output" parent: "Design Pattern" nav_order: 5 --- # Structured Output In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys. There are several approaches to achieve a structured output: - **Prompting** the LLM to strictly return a defined structure. - Using LLMs that natively support **schema enforcement**. - **Post-processing** the LLM's response to extract structured content. In practice, **Prompting** is simple and reliable for modern LLMs. ### Example Use Cases - Extracting Key Information ```yaml product: name: Widget Pro price: 199.99 description: | A high-quality widget designed for professionals. Recommended for advanced users. ``` - Summarizing Documents into Bullet Points ```yaml summary: - This product is easy to use. - It is cost-effective. - Suitable for all skill levels. ``` - Generating Configuration Files ```yaml server: host: 127.0.0.1 port: 8080 ssl: true ``` ## Prompt Engineering When prompting the LLM to produce **structured** output: 1. **Wrap** the structure in code fences (e.g., `yaml`). 2. **Validate** that all required fields exist (and let `Node` handles retry). ### Example Text Summarization ```python class SummarizeNode(Node): def exec(self, prep_res): # Suppose `prep_res` is the text to summarize. prompt = f""" Please summarize the following text as YAML, with exactly 3 bullet points {prep_res} Now, output: ```yaml summary: - bullet 1 - bullet 2 - bullet 3 ```""" response = call_llm(prompt) yaml_str = response.split("```yaml")[1].split("```")[0].strip() import yaml structured_result = yaml.safe_load(yaml_str) assert "summary" in structured_result assert isinstance(structured_result["summary"], list) return structured_result ``` > Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic) {: .note } ### Why YAML instead of JSON? 
Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes.

**In JSON**

```json
{
  "dialogue": "Alice said: \"Hello Bob.\nHow are you?\nI am good.\""
}
```

- Every double quote inside the string must be escaped with `\"`.
- Each newline in the dialogue must be represented as `\n`.

**In YAML**

```yaml
dialogue: |
  Alice said: "Hello Bob.
  How are you?
  I am good."
```

- No need to escape interior quotes—just place the entire text under a block literal (`|`).
- Newlines are naturally preserved without needing `\n`.

================================================
File: docs/design_pattern/workflow.md
================================================

---
layout: default
title: "Workflow"
parent: "Design Pattern"
nav_order: 2
---

# Workflow

Many real-world tasks are too complex for one LLM call. The solution is **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes.
> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*. > - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*. > > You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](./agent.md). {: .best-practice } ### Example: Article Writing ```python class GenerateOutline(Node): def prep(self, shared): return shared["topic"] def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}") def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res class WriteSection(Node): def prep(self, shared): return shared["outline"] def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}") def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res class ReviewAndRefine(Node): def prep(self, shared): return shared["draft"] def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}") def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res # Connect nodes outline = GenerateOutline() write = WriteSection() review = ReviewAndRefine() outline >> write >> review # Create and run flow writing_flow = Flow(start=outline) shared = {"topic": "AI Safety"} writing_flow.run(shared) ``` For *dynamic cases*, consider using [Agents](./agent.md). ================================================ File: docs/utility_function/chunking.md ================================================ --- layout: default title: "Text Chunking" parent: "Utility Function" nav_order: 4 --- # Text Chunking We recommend some implementations of commonly used text chunking approaches. > Text Chunking is more a micro optimization, compared to the Flow Design. > > It's recommended to start with the Naive Chunking and optimize later. {: .best-practice } --- ## Example Python Code Samples ### 1. Naive (Fixed-Size) Chunking Splits text by a fixed number of words, ignoring sentence or semantic boundaries. ```python def fixed_size_chunk(text, chunk_size=100): chunks = [] for i in range(0, len(text), chunk_size): chunks.append(text[i : i + chunk_size]) return chunks ``` However, sentences are often cut awkwardly, losing coherence. ### 2. Sentence-Based Chunking ```python import nltk def sentence_based_chunk(text, max_sentences=2): sentences = nltk.sent_tokenize(text) chunks = [] for i in range(0, len(sentences), max_sentences): chunks.append(" ".join(sentences[i : i + max_sentences])) return chunks ``` However, might not handle very long sentences or paragraphs well. ### 3. Other Chunking - **Paragraph-Based**: Split text by paragraphs (e.g., newlines). Large paragraphs can create big chunks. - **Semantic**: Use embeddings or topic modeling to chunk by semantic boundaries. - **Agentic**: Use an LLM to decide chunk boundaries based on context or meaning. ================================================ File: docs/utility_function/embedding.md ================================================ --- layout: default title: "Embedding" parent: "Utility Function" nav_order: 5 --- # Embedding Below you will find an overview table of various text embedding APIs, along with example Python code. > Embedding is more a micro optimization, compared to the Flow Design. > > It's recommended to start with the most convenient one and optimize later. 
{: .best-practice } | **API** | **Free Tier** | **Pricing Model** | **Docs** | | --- | --- | --- | --- | | **OpenAI** | ~$5 credit | ~$0.0001/1K tokens | [OpenAI Embeddings](https://platform.openai.com/docs/api-reference/embeddings) | | **Azure OpenAI** | $200 credit | Same as OpenAI (~$0.0001/1K tokens) | [Azure OpenAI Embeddings](https://learn.microsoft.com/azure/cognitive-services/openai/how-to/create-resource?tabs=portal) | | **Google Vertex AI** | $300 credit | ~$0.025 / million chars | [Vertex AI Embeddings](https://cloud.google.com/vertex-ai/docs/generative-ai/embeddings/get-text-embeddings) | | **AWS Bedrock** | No free tier, but AWS credits may apply | ~$0.00002/1K tokens (Titan V2) | [Amazon Bedrock](https://docs.aws.amazon.com/bedrock/) | | **Cohere** | Limited free tier | ~$0.0001/1K tokens | [Cohere Embeddings](https://docs.cohere.com/docs/cohere-embed) | | **Hugging Face** | ~$0.10 free compute monthly | Pay per second of compute | [HF Inference API](https://huggingface.co/docs/api-inference) | | **Jina** | 1M tokens free | Pay per token after | [Jina Embeddings](https://jina.ai/embeddings/) | ## Example Python Code ### 1. OpenAI ```python import openai openai.api_key = "YOUR_API_KEY" resp = openai.Embedding.create(model="text-embedding-ada-002", input="Hello world") vec = resp["data"][0]["embedding"] print(vec) ``` ### 2. Azure OpenAI ```python import openai openai.api_type = "azure" openai.api_base = "https://YOUR_RESOURCE_NAME.openai.azure.com" openai.api_version = "2023-03-15-preview" openai.api_key = "YOUR_AZURE_API_KEY" resp = openai.Embedding.create(engine="ada-embedding", input="Hello world") vec = resp["data"][0]["embedding"] print(vec) ``` ### 3. Google Vertex AI ```python from vertexai.preview.language_models import TextEmbeddingModel import vertexai vertexai.init(project="YOUR_GCP_PROJECT_ID", location="us-central1") model = TextEmbeddingModel.from_pretrained("textembedding-gecko@001") emb = model.get_embeddings(["Hello world"]) print(emb[0]) ``` ### 4. AWS Bedrock ```python import boto3, json client = boto3.client("bedrock-runtime", region_name="us-east-1") body = {"inputText": "Hello world"} resp = client.invoke_model(modelId="amazon.titan-embed-text-v2:0", contentType="application/json", body=json.dumps(body)) resp_body = json.loads(resp["body"].read()) vec = resp_body["embedding"] print(vec) ``` ### 5. Cohere ```python import cohere co = cohere.Client("YOUR_API_KEY") resp = co.embed(texts=["Hello world"]) vec = resp.embeddings[0] print(vec) ``` ### 6. Hugging Face ```python import requests API_URL = "https://api-inference.huggingface.co/models/sentence-transformers/all-MiniLM-L6-v2" HEADERS = {"Authorization": "Bearer YOUR_HF_TOKEN"} res = requests.post(API_URL, headers=HEADERS, json={"inputs": "Hello world"}) vec = res.json()[0] print(vec) ``` ### 7. Jina ```python import requests url = "https://api.jina.ai/v2/embed" headers = {"Authorization": "Bearer YOUR_JINA_TOKEN"} payload = {"data": ["Hello world"], "model": "jina-embeddings-v3"} res = requests.post(url, headers=headers, json=payload) vec = res.json()["data"][0]["embedding"] print(vec) ``` ================================================ File: docs/utility_function/llm.md ================================================ --- layout: default title: "LLM Wrapper" parent: "Utility Function" nav_order: 1 --- # LLM Wrappers Check out libraries like [litellm](https://github.com/BerriAI/litellm). Here, we provide some minimal example implementations: 1. 
OpenAI

```python
def call_llm(prompt):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content

# Example usage
call_llm("How are you?")
```

> Store the API key in an environment variable like OPENAI_API_KEY for security; the OpenAI client reads it automatically if you omit `api_key`.
{: .best-practice }

2. Claude (Anthropic)

```python
def call_llm(prompt):
    from anthropic import Anthropic
    client = Anthropic(api_key="YOUR_API_KEY_HERE")
    response = client.messages.create(
        model="claude-2",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=100
    )
    # The Messages API returns a list of content blocks; take the text of the first one
    return response.content[0].text
```

3. Google (Generative AI Studio / PaLM API)

```python
def call_llm(prompt):
    import google.generativeai as genai
    genai.configure(api_key="YOUR_API_KEY_HERE")
    response = genai.generate_text(
        model="models/text-bison-001",
        prompt=prompt
    )
    return response.result
```

4. Azure (Azure OpenAI)

```python
def call_llm(prompt):
    from openai import AzureOpenAI
    client = AzureOpenAI(
        azure_endpoint="https://YOUR_RESOURCE_NAME.openai.azure.com/",
        api_key="YOUR_API_KEY_HERE",
        api_version="2023-05-15"
    )
    r = client.chat.completions.create(
        model="YOUR_DEPLOYMENT_NAME",  # the name of your Azure model deployment
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content
```

5. Ollama (Local LLM)

```python
def call_llm(prompt):
    from ollama import chat
    response = chat(
        model="llama2",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.message.content
```

## Improvements
Feel free to enhance your `call_llm` function as needed. Here are examples:

- Handle chat history:

```python
def call_llm(messages):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return r.choices[0].message.content
```

- Add in-memory caching:

```python
from functools import lru_cache

@lru_cache(maxsize=1000)
def call_llm(prompt):
    # Your implementation here
    pass
```

> ⚠️ Caching conflicts with Node retries, as retries yield the same result.
>
> To address this, you could use cached results only if not retried.
{: .warning }

```python
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_call(prompt):
    pass

def call_llm(prompt, use_cache):
    if use_cache:
        return cached_call(prompt)
    # Call the underlying function directly
    return cached_call.__wrapped__(prompt)

class SummarizeNode(Node):
    def exec(self, text):
        # Use the cache only on the first attempt; bypass it on retries
        return call_llm(f"Summarize: {text}", self.cur_retry == 0)
```

- Enable logging:

```python
def call_llm(prompt):
    import logging
    logging.info(f"Prompt: {prompt}")
    response = ...
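    # Note: logging.info output is hidden by default (the root logger level is WARNING);
    # call logging.basicConfig(level=logging.INFO) once at startup to see these logs.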
# Your implementation here logging.info(f"Response: {response}") return response ``` ================================================ File: docs/utility_function/text_to_speech.md ================================================ --- layout: default title: "Text-to-Speech" parent: "Utility Function" nav_order: 7 --- # Text-to-Speech | **Service** | **Free Tier** | **Pricing Model** | **Docs** | |----------------------|-----------------------|--------------------------------------------------------------|---------------------------------------------------------------------| | **Amazon Polly** | 5M std + 1M neural | ~$4 /M (std), ~$16 /M (neural) after free tier | [Polly Docs](https://aws.amazon.com/polly/) | | **Google Cloud TTS** | 4M std + 1M WaveNet | ~$4 /M (std), ~$16 /M (WaveNet) pay-as-you-go | [Cloud TTS Docs](https://cloud.google.com/text-to-speech) | | **Azure TTS** | 500K neural ongoing | ~$15 /M (neural), discount at higher volumes | [Azure TTS Docs](https://azure.microsoft.com/products/cognitive-services/text-to-speech/) | | **IBM Watson TTS** | 10K chars Lite plan | ~$0.02 /1K (i.e. ~$20 /M). Enterprise options available | [IBM Watson Docs](https://www.ibm.com/cloud/watson-text-to-speech) | | **ElevenLabs** | 10K chars monthly | From ~$5/mo (30K chars) up to $330/mo (2M chars). Enterprise | [ElevenLabs Docs](https://elevenlabs.io) | ## Example Python Code ### Amazon Polly ```python import boto3 polly = boto3.client("polly", region_name="us-east-1", aws_access_key_id="YOUR_AWS_ACCESS_KEY_ID", aws_secret_access_key="YOUR_AWS_SECRET_ACCESS_KEY") resp = polly.synthesize_speech( Text="Hello from Polly!", OutputFormat="mp3", VoiceId="Joanna" ) with open("polly.mp3", "wb") as f: f.write(resp["AudioStream"].read()) ``` ### Google Cloud TTS ```python from google.cloud import texttospeech client = texttospeech.TextToSpeechClient() input_text = texttospeech.SynthesisInput(text="Hello from Google Cloud TTS!") voice = texttospeech.VoiceSelectionParams(language_code="en-US") audio_cfg = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3) resp = client.synthesize_speech(input=input_text, voice=voice, audio_config=audio_cfg) with open("gcloud_tts.mp3", "wb") as f: f.write(resp.audio_content) ``` ### Azure TTS ```python import azure.cognitiveservices.speech as speechsdk speech_config = speechsdk.SpeechConfig( subscription="AZURE_KEY", region="AZURE_REGION") audio_cfg = speechsdk.audio.AudioConfig(filename="azure_tts.wav") synthesizer = speechsdk.SpeechSynthesizer( speech_config=speech_config, audio_config=audio_cfg ) synthesizer.speak_text_async("Hello from Azure TTS!").get() ``` ### IBM Watson TTS ```python from ibm_watson import TextToSpeechV1 from ibm_cloud_sdk_core.authenticators import IAMAuthenticator auth = IAMAuthenticator("IBM_API_KEY") service = TextToSpeechV1(authenticator=auth) service.set_service_url("IBM_SERVICE_URL") resp = service.synthesize( "Hello from IBM Watson!", voice="en-US_AllisonV3Voice", accept="audio/mp3" ).get_result() with open("ibm_tts.mp3", "wb") as f: f.write(resp.content) ``` ### ElevenLabs ```python import requests api_key = "ELEVENLABS_KEY" voice_id = "ELEVENLABS_VOICE" url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}" headers = {"xi-api-key": api_key, "Content-Type": "application/json"} json_data = { "text": "Hello from ElevenLabs!", "voice_settings": {"stability": 0.75, "similarity_boost": 0.75} } resp = requests.post(url, headers=headers, json=json_data) with open("elevenlabs.mp3", "wb") as f: f.write(resp.content) ``` 
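In practice, it can help to expose one of the snippets above as a single reusable utility function that a Node's `exec` step can call. Below is a minimal sketch using Amazon Polly; the function name, defaults, and the assumption that AWS credentials are already configured in your environment are all illustrative.

```python
import boto3

def text_to_speech(text, output_path="speech.mp3", voice_id="Joanna"):
    # Synthesize speech with Amazon Polly; any provider above can be swapped in
    polly = boto3.client("polly", region_name="us-east-1")
    resp = polly.synthesize_speech(Text=text, OutputFormat="mp3", VoiceId=voice_id)
    # Write the returned audio stream to disk and hand the path back to the caller
    with open(output_path, "wb") as f:
        f.write(resp["AudioStream"].read())
    return output_path

# Example usage
# text_to_speech("Hello from Polly!", "hello.mp3")
```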
================================================ File: docs/utility_function/vector.md ================================================ --- layout: default title: "Vector Databases" parent: "Utility Function" nav_order: 6 --- # Vector Databases Below is a table of the popular vector search solutions: | **Tool** | **Free Tier** | **Pricing Model** | **Docs** | | --- | --- | --- | --- | | **FAISS** | N/A, self-host | Open-source | [Faiss.ai](https://faiss.ai) | | **Pinecone** | 2GB free | From $25/mo | [pinecone.io](https://pinecone.io) | | **Qdrant** | 1GB free cloud | Pay-as-you-go | [qdrant.tech](https://qdrant.tech) | | **Weaviate** | 14-day sandbox | From $25/mo | [weaviate.io](https://weaviate.io) | | **Milvus** | 5GB free cloud | PAYG or $99/mo dedicated | [milvus.io](https://milvus.io) | | **Chroma** | N/A, self-host | Free (Apache 2.0) | [trychroma.com](https://trychroma.com) | | **Redis** | 30MB free | From $5/mo | [redis.io](https://redis.io) | --- ## Example Python Code Below are basic usage snippets for each tool. ### FAISS ```python import faiss import numpy as np # Dimensionality of embeddings d = 128 # Create a flat L2 index index = faiss.IndexFlatL2(d) # Random vectors data = np.random.random((1000, d)).astype('float32') index.add(data) # Query query = np.random.random((1, d)).astype('float32') D, I = index.search(query, k=5) print("Distances:", D) print("Neighbors:", I) ``` ### Pinecone ```python import pinecone pinecone.init(api_key="YOUR_API_KEY", environment="YOUR_ENV") index_name = "my-index" # Create the index if it doesn't exist if index_name not in pinecone.list_indexes(): pinecone.create_index(name=index_name, dimension=128) # Connect index = pinecone.Index(index_name) # Upsert vectors = [ ("id1", [0.1]*128), ("id2", [0.2]*128) ] index.upsert(vectors) # Query response = index.query([[0.15]*128], top_k=3) print(response) ``` ### Qdrant ```python import qdrant_client from qdrant_client.models import Distance, VectorParams, PointStruct client = qdrant_client.QdrantClient( url="https://YOUR-QDRANT-CLOUD-ENDPOINT", api_key="YOUR_API_KEY" ) collection = "my_collection" client.recreate_collection( collection_name=collection, vectors_config=VectorParams(size=128, distance=Distance.COSINE) ) points = [ PointStruct(id=1, vector=[0.1]*128, payload={"type": "doc1"}), PointStruct(id=2, vector=[0.2]*128, payload={"type": "doc2"}), ] client.upsert(collection_name=collection, points=points) results = client.search( collection_name=collection, query_vector=[0.15]*128, limit=2 ) print(results) ``` ### Weaviate ```python import weaviate client = weaviate.Client("https://YOUR-WEAVIATE-CLOUD-ENDPOINT") schema = { "classes": [ { "class": "Article", "vectorizer": "none" } ] } client.schema.create(schema) obj = { "title": "Hello World", "content": "Weaviate vector search" } client.data_object.create(obj, "Article", vector=[0.1]*128) resp = ( client.query .get("Article", ["title", "content"]) .with_near_vector({"vector": [0.15]*128}) .with_limit(3) .do() ) print(resp) ``` ### Milvus ```python from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection import numpy as np connections.connect(alias="default", host="localhost", port="19530") fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128) ] schema = CollectionSchema(fields) collection = Collection("MyCollection", schema) emb = np.random.rand(10, 128).astype('float32') ids = list(range(10)) collection.insert([ids, emb]) 
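# Build an ANN index on the embedding field, then load the collection into memory before searching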
index_params = {
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128},
    "metric_type": "L2"
}
collection.create_index("embedding", index_params)
collection.load()

query_emb = np.random.rand(1, 128).astype('float32')
results = collection.search(query_emb, "embedding", param={"nprobe": 10}, limit=3)
print(results)
```

### Chroma
```python
import chromadb
from chromadb.config import Settings

client = chromadb.Client(Settings(
    chroma_db_impl="duckdb+parquet",
    persist_directory="./chroma_data"
))

coll = client.create_collection("my_collection")

vectors = [[0.1, 0.2, 0.3], [0.2, 0.2, 0.2]]
metas = [{"doc": "text1"}, {"doc": "text2"}]
ids = ["id1", "id2"]
coll.add(embeddings=vectors, metadatas=metas, ids=ids)

res = coll.query(query_embeddings=[[0.15, 0.25, 0.3]], n_results=2)
print(res)
```

### Redis
```python
import redis
import struct

r = redis.Redis(host="localhost", port=6379)

# Create index
r.execute_command(
    "FT.CREATE", "my_idx", "ON", "HASH",
    "SCHEMA", "embedding", "VECTOR", "FLAT", "6",
    "TYPE", "FLOAT32", "DIM", "128", "DISTANCE_METRIC", "L2"
)

# Insert
vec = struct.pack('128f', *[0.1]*128)
r.hset("doc1", mapping={"embedding": vec})

# Search
qvec = struct.pack('128f', *[0.15]*128)
q = "*=>[KNN 3 @embedding $BLOB AS dist]"
res = r.ft("my_idx").search(q, query_params={"BLOB": qvec})
print(res.docs)
```

================================================
File: docs/utility_function/viz.md
================================================
---
layout: default
title: "Viz and Debug"
parent: "Utility Function"
nav_order: 2
---

# Visualization and Debugging

Similar to LLM wrappers, we **don't** provide built-in visualization and debugging. Here, we recommend some *minimal* (and incomplete) implementations. These examples can serve as a starting point for your own tooling.

## 1. Visualization with Mermaid

This code recursively traverses the nested graph, assigns unique IDs to each node, and treats Flow nodes as subgraphs to generate Mermaid syntax for a hierarchical visualization.
{% raw %} ```python def build_mermaid(start): ids, visited, lines = {}, set(), ["graph LR"] ctr = 1 def get_id(n): nonlocal ctr return ids[n] if n in ids else (ids.setdefault(n, f"N{ctr}"), (ctr := ctr + 1))[0] def link(a, b): lines.append(f" {a} --> {b}") def walk(node, parent=None): if node in visited: return parent and link(parent, get_id(node)) visited.add(node) if isinstance(node, Flow): node.start and parent and link(parent, get_id(node.start)) lines.append(f"\n subgraph sub_flow_{get_id(node)}[{type(node).__name__}]") node.start and walk(node.start) for nxt in node.successors.values(): node.start and walk(nxt, get_id(node.start)) or (parent and link(parent, get_id(nxt))) or walk(nxt) lines.append(" end\n") else: lines.append(f" {(nid := get_id(node))}['{type(node).__name__}']") parent and link(parent, nid) [walk(nxt, nid) for nxt in node.successors.values()] walk(start) return "\n".join(lines) ``` {% endraw %} For example, suppose we have a complex Flow for data science: ```python class DataPrepBatchNode(BatchNode): def prep(self,shared): return [] class ValidateDataNode(Node): pass class FeatureExtractionNode(Node): pass class TrainModelNode(Node): pass class EvaluateModelNode(Node): pass class ModelFlow(Flow): pass class DataScienceFlow(Flow):pass feature_node = FeatureExtractionNode() train_node = TrainModelNode() evaluate_node = EvaluateModelNode() feature_node >> train_node >> evaluate_node model_flow = ModelFlow(start=feature_node) data_prep_node = DataPrepBatchNode() validate_node = ValidateDataNode() data_prep_node >> validate_node >> model_flow data_science_flow = DataScienceFlow(start=data_prep_node) result = build_mermaid(start=data_science_flow) ``` The code generates a Mermaid diagram: ```mermaid graph LR subgraph sub_flow_N1[DataScienceFlow] N2['DataPrepBatchNode'] N3['ValidateDataNode'] N2 --> N3 N3 --> N4 subgraph sub_flow_N5[ModelFlow] N4['FeatureExtractionNode'] N6['TrainModelNode'] N4 --> N6 N7['EvaluateModelNode'] N6 --> N7 end end ``` ## 2. Call Stack Debugging It would be useful to print the Node call stacks for debugging. 
This can be achieved by inspecting the runtime call stack: ```python import inspect def get_node_call_stack(): stack = inspect.stack() node_names = [] seen_ids = set() for frame_info in stack[1:]: local_vars = frame_info.frame.f_locals if 'self' in local_vars: caller_self = local_vars['self'] if isinstance(caller_self, BaseNode) and id(caller_self) not in seen_ids: seen_ids.add(id(caller_self)) node_names.append(type(caller_self).__name__) return node_names ``` For example, suppose we have a complex Flow for data science: ```python class DataPrepBatchNode(BatchNode): def prep(self, shared): return [] class ValidateDataNode(Node): pass class FeatureExtractionNode(Node): pass class TrainModelNode(Node): pass class EvaluateModelNode(Node): def prep(self, shared): stack = get_node_call_stack() print("Call stack:", stack) class ModelFlow(Flow): pass class DataScienceFlow(Flow):pass feature_node = FeatureExtractionNode() train_node = TrainModelNode() evaluate_node = EvaluateModelNode() feature_node >> train_node >> evaluate_node model_flow = ModelFlow(start=feature_node) data_prep_node = DataPrepBatchNode() validate_node = ValidateDataNode() data_prep_node >> validate_node >> model_flow data_science_flow = DataScienceFlow(start=data_prep_node) data_science_flow.run({}) ``` The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']` ================================================ File: docs/utility_function/websearch.md ================================================ --- layout: default title: "Web Search" parent: "Utility Function" nav_order: 3 --- # Web Search We recommend some implementations of commonly used web search tools. | **API** | **Free Tier** | **Pricing Model** | **Docs** | |---------------------------------|-----------------------------------------------|-----------------------------------------------------------------|------------------------------------------------------------------------| | **Google Custom Search JSON API** | 100 queries/day free | $5 per 1000 queries. | [Link](https://developers.google.com/custom-search/v1/overview) | | **Bing Web Search API** | 1,000 queries/month | $15–$25 per 1,000 queries. | [Link](https://azure.microsoft.com/en-us/services/cognitive-services/bing-web-search-api/) | | **DuckDuckGo Instant Answer** | Completely free (Instant Answers only, **no URLs**) | No paid plans; usage unlimited, but data is limited | [Link](https://duckduckgo.com/api) | | **Brave Search API** | 2,000 queries/month free | $3 per 1k queries for Base, $5 per 1k for Pro | [Link](https://brave.com/search/api/) | | **SerpApi** | 100 searches/month free | Start at $75/month for 5,000 searches| [Link](https://serpapi.com/) | | **RapidAPI** | Many options | Many options | [Link](https://rapidapi.com/search?term=search&sortBy=ByRelevance) | ## Example Python Code ### 1. Google Custom Search JSON API ```python import requests API_KEY = "YOUR_API_KEY" CX_ID = "YOUR_CX_ID" query = "example" url = "https://www.googleapis.com/customsearch/v1" params = { "key": API_KEY, "cx": CX_ID, "q": query } response = requests.get(url, params=params) results = response.json() print(results) ``` ### 2. Bing Web Search API ```python import requests SUBSCRIPTION_KEY = "YOUR_BING_API_KEY" query = "example" url = "https://api.bing.microsoft.com/v7.0/search" headers = {"Ocp-Apim-Subscription-Key": SUBSCRIPTION_KEY} params = {"q": query} response = requests.get(url, headers=headers, params=params) results = response.json() print(results) ``` ### 3. 
DuckDuckGo Instant Answer ```python import requests query = "example" url = "https://api.duckduckgo.com/" params = { "q": query, "format": "json" } response = requests.get(url, params=params) results = response.json() print(results) ``` ### 4. Brave Search API ```python import requests SUBSCRIPTION_TOKEN = "YOUR_BRAVE_API_TOKEN" query = "example" url = "https://api.search.brave.com/res/v1/web/search" headers = { "X-Subscription-Token": SUBSCRIPTION_TOKEN } params = { "q": query } response = requests.get(url, headers=headers, params=params) results = response.json() print(results) ``` ### 5. SerpApi ```python import requests API_KEY = "YOUR_SERPAPI_KEY" query = "example" url = "https://serpapi.com/search" params = { "engine": "google", "q": query, "api_key": API_KEY } response = requests.get(url, params=params) results = response.json() print(results) ``` ================================================ File: docs/_config.yml ================================================ # Basic site settings title: Pocket Flow tagline: A 100-line LLM framework description: Minimalist LLM Framework in 100 Lines, Enabling LLMs to Program Themselves # Theme settings remote_theme: just-the-docs/just-the-docs # Navigation nav_sort: case_sensitive # Aux links (shown in upper right) aux_links: "View on GitHub": - "//github.com/the-pocket/PocketFlow" # Color scheme color_scheme: light # Author settings author: name: Zachary Huang url: https://www.columbia.edu/~zh2408/ twitter: ZacharyHuang12 # Mermaid settings mermaid: version: "9.1.3" # Pick the version you want # Default configuration config: | directionLR # Callouts settings callouts: warning: title: Warning color: red note: title: Note color: blue best-practice: title: Best Practice color: green # The custom navigation nav: - Home: index.md # Link to your main docs index - GitHub: "https://github.com/the-pocket/PocketFlow" - Discord: "https://discord.gg/hUHHE9Sa6T"