update batch doc

parent 3f91b0adbd
commit 84fbfdec3d

@@ -7,10 +7,9 @@ nav_order: 4

# Batch

**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases:

- **Chunk-based** processing (e.g., splitting large texts).
- **Iterative** processing over lists of input items (e.g., user queries, files, URLs).

## 1. BatchNode

@@ -27,7 +26,7 @@ A **BatchNode** extends `Node` but changes `prep()` and `exec()`:

```python
class MapSummaries(BatchNode):
    def prep(self, shared):
        # Suppose we have a big file; chunk it
        content = shared["data"]
        chunk_size = 10000
        chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
        return chunks
```

@@ -39,7 +38,7 @@ class MapSummaries(BatchNode):

```python
    def post(self, shared, prep_res, exec_res_list):
        combined = "\n".join(exec_res_list)
        shared["summary"] = combined
        return "default"

map_summaries = MapSummaries()
```
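
The `exec()` method falls between these two hunks and is not shown in the diff. As a rough sketch of what it typically looks like for a BatchNode (called once per chunk returned by `prep()`; `call_llm` is the helper assumed elsewhere in these docs):

```python
class MapSummaries(BatchNode):
    # ... prep() and post() as shown above ...

    def exec(self, chunk):
        # Runs once per chunk returned by prep(); call_llm is an assumed helper, not defined here.
        return call_llm(f"Summarize this chunk:\n{chunk}")
```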

@@ -93,6 +92,7 @@ At each level, **BatchFlow** merges its own param dict with the parent’s. By t

```python
class FileBatchFlow(BatchFlow):
    def prep(self, shared):
        directory = self.params["directory"]
        # e.g., files = ["file1.txt", "file2.txt", ...]
        files = [f for f in os.listdir(directory) if f.endswith(".txt")]
        return [{"filename": f} for f in files]
```

@@ -101,7 +101,7 @@ class DirectoryBatchFlow(BatchFlow):

```python
        directories = ["/path/to/dirA", "/path/to/dirB"]
        return [{"directory": d} for d in directories]

# MapSummaries has params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
```
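
To actually run the nested batch you would kick off the outermost flow. A minimal, hypothetical driver (the `shared` setup here is assumed for illustration and is not part of this diff):

```python
shared = {}            # assumed: structure and populate it however your nodes expect
outer_flow.run(shared)
# Each {"directory": ..., "filename": ...} combination produced by the two prep()
# methods becomes the params for one run of the inner flow.
```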

@@ -9,17 +9,22 @@ nav_order: 3

Nodes and Flows **communicate** in two ways:

1. **Shared Store (recommended)**

   - A global data structure (often an in-mem dict) that all nodes can read and write via `prep()` and `post()`.
   - Great for data results, large content, or anything multiple nodes need.
   - You should design the data structure and populate it ahead of time.

2. **Params (only for [Batch](./batch.md))**

   - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values should be **immutable**.
   - Good for identifiers like filenames or numeric IDs in Batch mode.

If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller).

> **Best Practice:** Use `Shared Store` for almost all cases. It's flexible and easy to manage. It separates data storage from data processing, making the code more readable and easier to maintain.
>
> `Params` is mostly syntactic sugar for [Batch](./batch.md).
{: .note }

---

@@ -39,24 +44,25 @@ It can also contain local file handlers, DB connections, or a combination for pe

```python
class LoadData(Node):
    def post(self, shared, prep_res, exec_res):
        # We write data to shared store
        shared["data"] = "Some text content"
        return None

class Summarize(Node):
    def prep(self, shared):
        # We read data from shared store
        return shared["data"]

    def exec(self, prep_res):
        # Call LLM to summarize
        prompt = f"Summarize: {prep_res}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        # We write summary to shared store
        shared["summary"] = exec_res
        return "default"

load_data = LoadData()
```
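
`call_llm` is not defined in this diff; it is assumed to be a small user-provided helper. A hedged sketch using the OpenAI Python client (the model name and client choice are illustrative, not prescribed by these docs):

```python
from openai import OpenAI

def call_llm(prompt: str) -> str:
    # Illustrative helper; any LLM client works here. Assumes OPENAI_API_KEY is set.
    client = OpenAI()
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.choices[0].message.content
```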

@@ -70,20 +76,21 @@ flow.run(shared)

Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`.

---

## 2. Params

**Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are:
- **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`).
- **Set** via `set_params()`.
- **Cleared** and updated each time a parent Flow runs the node.

> Only set the uppermost Flow params because others will be overwritten by the parent Flow.
>
> If you need to set child node params, see [Batch](./batch.md).
{: .warning }

Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
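
The params example in this file is largely unchanged by the diff (only its tail is visible below). A minimal sketch of the pattern it describes, with names assumed for illustration rather than copied from the docs:

```python
# Hypothetical sketch: set params on a node, then let the parent Flow override them.
summarize = Summarize()
summarize.set_params({"filename": "doc1.txt"})

flow = Flow(start=summarize)
flow.set_params({"filename": "doc2.txt"})  # the uppermost Flow's params win
flow.run(shared)                           # the node summarizes doc2, not doc1
```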

@@ -123,19 +130,3 @@ flow.run(shared) # The node summarizes doc2, not doc1

---

docs/flow.md

@@ -101,6 +101,14 @@ A **Flow** can act like a Node, which enables powerful composition patterns. Thi

2. Combine multiple smaller Flows into a larger Flow for reuse.
3. Node `params` will be a merging of **all** parents' `params`.

### Flow's Node Methods

A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However:

- It **won't** run `exec()`, as its main logic is to orchestrate its nodes.
- `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store.
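
As a hedged sketch of what overriding these methods might look like (the subclass name and shared-store key are assumptions for illustration, not from the docs):

```python
class SummaryFlow(Flow):  # hypothetical Flow subclass
    def prep(self, shared):
        # Runs before orchestration, just like a regular Node's prep().
        return None

    def post(self, shared, prep_res, exec_res):
        # exec_res is always None for a Flow; read results from the shared store instead.
        print("Flow finished. Summary:", shared.get("summary"))
        return "default"
```
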
### Basic Flow Nesting

Here's how to connect a flow to another node:

@@ -171,10 +179,3 @@ flowchart LR

@@ -26,7 +26,7 @@ A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->po

- Examples: *update DB, change states, log results*.
- **Decide the next action** by returning a *string* (`action = "default"` if *None*).

> **Why 3 steps?** To enforce the principle of *separation of concerns*: data storage and data processing are kept separate.
>
> All steps are *optional*. E.g., you can implement only `prep` and `post` if you just need to process data.
{: .note }
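
For instance, a prep/post-only Node might look like this rough sketch (the class name and shared-store key are made up for illustration):

```python
class TrimWhitespace(Node):  # hypothetical node, not from the docs above
    def prep(self, shared):
        # Read the raw input from the shared store.
        return shared["data"]

    # exec() is omitted; this node only needs prep and post.

    def post(self, shared, prep_res, exec_res):
        # Write the processed result back and choose the next action.
        shared["data"] = prep_res.strip()
        return "default"
```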