From 84fbfdec3db21265739d0384f9fe7b38738316b2 Mon Sep 17 00:00:00 2001
From: zachary62
Date: Sun, 16 Feb 2025 14:26:17 -0500
Subject: [PATCH] update batch doc

---
 docs/batch.md         | 12 ++++-----
 docs/communication.md | 59 ++++++++++++++++++-------------------------
 docs/flow.md          | 15 ++++++-----
 docs/node.md          |  2 +-
 4 files changed, 40 insertions(+), 48 deletions(-)

diff --git a/docs/batch.md b/docs/batch.md
index b263845..6d7e0c2 100644
--- a/docs/batch.md
+++ b/docs/batch.md
@@ -7,10 +7,9 @@ nav_order: 4
 
 # Batch
 
-**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Handy for:
+**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases:
 - **Chunk-based** processing (e.g., splitting large texts).
-- **Multi-file** processing.
-- **Iterating** over lists of params (e.g., user queries, documents, URLs).
+- **Iterative** processing over lists of input items (e.g., user queries, files, URLs).
 
 ## 1. BatchNode
 
@@ -27,7 +26,7 @@ A **BatchNode** extends `Node` but changes `prep()` and `exec()`:
 class MapSummaries(BatchNode):
     def prep(self, shared):
         # Suppose we have a big file; chunk it
-        content = shared["data"].get("large_text.txt", "")
+        content = shared["data"]
         chunk_size = 10000
         chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
         return chunks
@@ -39,7 +38,7 @@ class MapSummaries(BatchNode):
 
     def post(self, shared, prep_res, exec_res_list):
         combined = "\n".join(exec_res_list)
-        shared["summary"]["large_text.txt"] = combined
+        shared["summary"] = combined
         return "default"
 
 map_summaries = MapSummaries()
@@ -93,6 +92,7 @@ At each level, **BatchFlow** merges its own param dict with the parent’s. By t
 class FileBatchFlow(BatchFlow):
     def prep(self, shared):
         directory = self.params["directory"]
+        # e.g., files = ["file1.txt", "file2.txt", ...]
         files = [f for f in os.listdir(directory) if f.endswith(".txt")]
         return [{"filename": f} for f in files]
 
@@ -101,7 +101,7 @@ class DirectoryBatchFlow(BatchFlow):
         directories = [ "/path/to/dirA", "/path/to/dirB"]
         return [{"directory": d} for d in directories]
 
-
+# MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
 inner_flow = FileBatchFlow(start=MapSummaries())
 outer_flow = DirectoryBatchFlow(start=inner_flow)
 ```
\ No newline at end of file
diff --git a/docs/communication.md b/docs/communication.md
index bbb0648..5e48faa 100644
--- a/docs/communication.md
+++ b/docs/communication.md
@@ -9,17 +9,22 @@ nav_order: 3
 
 Nodes and Flows **communicate** in two ways:
 
-1. **Shared Store** – A global data structure (often an in-mem dict) that all nodes can read and write. Every Node's `prep()` and `post()` methods receive the **same** `shared` store.
-2. **Params** – Each node and Flow has a unique `params` dict assigned by the **parent Flow**, typically used as an identifier for tasks. It’s strongly recommended to keep parameter keys and values **immutable**.
+1. **Shared Store (recommended)**
+
+   - A global data structure (often an in-mem dict) that all nodes can read and write by `prep()` and `post()`.
+   - Great for data results, large content, or anything multiple nodes need.
+   - You shall design the data structure and populate it ahead.
+
+
+2. **Params (only for [Batch](./batch.md))**
+   - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**.
+   - Good for identifiers like filenames or numeric IDs, in Batch mode.
 
 If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller).
 
-
-> **Why not use other communication models like Message Passing?**
+> **Best Practice:** Use `Shared Store` for almost all cases. It's flexible and easy to manage. It separates data storage from data processing, making the code more readable and easier to maintain.
 >
-> At a *low-level* between nodes, *Message Passing* works fine for simple DAGs, but in nested or cyclic Flows it gets unwieldy. A shared store keeps things straightforward.
->
-> That said, *high-level* multi-agent patterns like *Message Passing* and *Event-Driven Design* can still be layered on top via *Async Queues or Pub/Sub* in a shared store (see [Multi-Agents](./multi_agent.md)).
+> `Params` is more a syntax sugar for [Batch](./batch.md).
 {: .note }
 
 ---
@@ -39,24 +44,25 @@ It can also contain local file handlers, DB connections, or a combination for pe
 
 ```python
 class LoadData(Node):
-    def prep(self, shared):
-        # Suppose we read from disk or an API
-        shared["data"]["my_file.txt"] = "Some text content"
+    def post(self, shared, prep_res, exec_res):
+        # We write data to shared store
+        shared["data"] = "Some text content"
         return None
 
 class Summarize(Node):
     def prep(self, shared):
-        # We can read what LoadData wrote
-        content = shared["data"].get("my_file.txt", "")
-        return content
+        # We read data from shared store
+        return shared["data"]
 
     def exec(self, prep_res):
+        # Call LLM to summarize
         prompt = f"Summarize: {prep_res}"
         summary = call_llm(prompt)
         return summary
 
     def post(self, shared, prep_res, exec_res):
-        shared["summary"]["my_file.txt"] = exec_res
+        # We write summary to shared store
+        shared["summary"] = exec_res
         return "default"
 
 load_data = LoadData()
@@ -70,20 +76,21 @@ flow.run(shared)
 
 Here:
 - `LoadData` writes to `shared["data"]`.
-- `Summarize` reads from the same location.
-No special data-passing—just the same `shared` object.
+- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`.
 
 ---
 
 ## 2. Params
 
 **Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are:
-- **Immutable** during a Node’s run cycle (i.e., they don’t change mid-`prep`, `exec`, `post`).
+- **Immutable** during a Node’s run cycle (i.e., they don’t change mid-`prep->exec->post`).
 - **Set** via `set_params()`.
 - **Cleared** and updated each time a parent Flow calls it.
 
-> Only set the uppermost Flow params because others will be overwritten by the parent Flow. If you need to set child node params, see [Batch](./batch.md).
+> Only set the uppermost Flow params because others will be overwritten by the parent Flow.
+>
+> If you need to set child node params, see [Batch](./batch.md).
 {: .warning }
 
 Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
@@ -123,19 +130,3 @@ flow.run(shared) # The node summarizes doc2, not doc1
 ```
 
 ---
-
-## 3. Shared Store vs. Params
-
-Think of the **Shared Store** like a heap and **Params** like a stack.
-
-- **Shared Store**:
-  - Public, global.
-  - You can design and populate ahead, e.g., for the input to process.
-  - Great for data results, large content, or anything multiple nodes need.
-  - Keep it tidy—structure it carefully (like a mini schema).
-
-- **Params**:
-  - Local, ephemeral.
-  - Passed in by parent Flows. You should only set it for the uppermost flow.
-  - Perfect for small values like filenames or numeric IDs.
-  - Do **not** persist across different nodes and are reset.
\ No newline at end of file
diff --git a/docs/flow.md b/docs/flow.md
index 8112303..70dad46 100644
--- a/docs/flow.md
+++ b/docs/flow.md
@@ -101,6 +101,14 @@ A **Flow** can act like a Node, which enables powerful composition patterns. Thi
 2. Combine multiple smaller Flows into a larger Flow for reuse.
 3. Node `params` will be a merging of **all** parents' `params`.
 
+### Flow's Node Methods
+
+A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However:
+
+- It **won't** run `exec()`, as its main logic is to orchestrate its nodes.
+- `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store.
+
+
 ### Basic Flow Nesting
 
 Here's how to connect a flow to another node:
@@ -171,10 +179,3 @@ flowchart LR
     end
 ```
 
-### Flow's Node Methods
-
-A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However:
-
-- It **won't** run `exec()`, as its main logic is to orchestrate other nodes.
-- `post()` always receives None for exec_res and should instead get the flow execution results from the shared store.
-
diff --git a/docs/node.md b/docs/node.md
index 7bb4b28..9554cf0 100644
--- a/docs/node.md
+++ b/docs/node.md
@@ -26,7 +26,7 @@ A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->po
    - Examples: *update DB, change states, log results*.
    - **Decide the next action** by returning a *string* (`action = "default"` if *None*).
 
-> **Why 3 steps?** To enforce the principle of *separation of concerns*. The data model are operated separately from the business logic on them.
+> **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately.
 >
 > All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data.
 {: .note }