clarify nested flow

This commit is contained in:
zachary62 2024-12-29 04:15:27 +00:00
parent f649650d1e
commit 67bf973455
3 changed files with 34 additions and 29 deletions

View File

@ -7,23 +7,23 @@ nav_order: 4
# Batch # Batch
**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Useful for: **Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Handy for:
- **Chunk-based** processing (e.g., large texts in parts). - **Chunk-based** processing (e.g., splitting large texts).
- **Multi-file** processing. - **Multi-file** processing.
- **Iterating** over lists of params (e.g., user queries, documents, URLs). - **Iterating** over lists of params (e.g., user queries, documents, URLs).
## 1. BatchNode ## 1. BatchNode
A **BatchNode** extends `Node` but changes `prep()` and `exec()`: A **BatchNode** extends `Node` but changes `prep()` and `exec()`:
- **`prep(shared)`**: returns an **iterable** (list, generator, etc.). - **`prep(shared)`**: returns an **iterable** (e.g., list, generator).
- **`exec(shared, item)`**: called **once** per item in that iterable. - **`exec(item)`**: called **once** per item in that iterable.
- **`post(shared, prep_res, exec_res_list)`**: receives a **list** of all `exec()` results, can combine or store them, and returns an **Action**. - **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**.
### Example: Summarize a Large File ### Example: Summarize a Large File
```python ``python
class MapSummaries(BatchNode): class MapSummaries(BatchNode):
def prep(self, shared): def prep(self, shared):
# Suppose we have a big file; chunk it # Suppose we have a big file; chunk it
@ -32,7 +32,7 @@ class MapSummaries(BatchNode):
chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
return chunks return chunks
def exec(self, shared, chunk): def exec(self, chunk):
prompt = f"Summarize this chunk in 10 words: {chunk}" prompt = f"Summarize this chunk in 10 words: {chunk}"
summary = call_llm(prompt) summary = call_llm(prompt)
return summary return summary
@ -45,40 +45,44 @@ class MapSummaries(BatchNode):
map_summaries = MapSummaries() map_summaries = MapSummaries()
flow = Flow(start=map_summaries) flow = Flow(start=map_summaries)
flow.run(shared) flow.run(shared)
``` ``python
--- ---
## 2. BatchFlow ## 2. BatchFlow
.
A **BatchFlow** runs a **Flow** multiple times, each with different `params`. Think of it as a loop that replays the Flow for each param set. A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
### Example: Summarize Many Files ### Example: Summarize Many Files
```python ``python
class SummarizeAllFiles(BatchFlow): class SummarizeAllFiles(BatchFlow):
def prep(self, shared): def prep(self, shared):
# Return a list of param dicts (one per file)
filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...] filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...]
return [{"filename": fn} for fn in filenames] return [{"filename": fn} for fn in filenames]
# Suppose we have a per-file flow: # Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
# load_file >> summarize >> reduce etc.
summarize_file = SummarizeFile(start=load_file) summarize_file = SummarizeFile(start=load_file)
# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file) summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared) summarize_all_files.run(shared)
``` ``python
**Under the hood**: **Under the Hood**:
1. `prep(shared)` returns a list of param dicts (e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`). 1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`.
2. The BatchFlow **iterates** over them, sets params on the sub-Flow, and calls `flow.run(shared)` each time. 2. The **BatchFlow** loops through each dict. For each one:
3. The Flow is run repeatedly, once per item. - It merges the dict with the BatchFlows own `params`.
- It calls `flow.run(shared)` using the merged result.
3. This means the sub-Flow is run **repeatedly**, once for every param dict.
---
### Nested or Multi-level Batches ### Nested or Multi-Level Batches
You can nest a BatchFlow in another BatchFlow. For example: You can nest a **BatchFlow** in another **BatchFlow**. For instance:
- Outer batch: iterate over directories. - **Outer** batch: returns a list of diretory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- Inner batch: summarize each file in a directory. - **Inner** batch: returning a list of per-file param dicts.
The **outer** BatchFlows `exec()` can return a list of directories; the **inner** BatchFlow then processes each file in those dirs. At each level, **BatchFlow** merges its own param dict with the parents. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.

View File

@ -92,9 +92,9 @@ Always use `flow.run(...)` in production to ensure the full pipeline runs correc
A **Flow** can act like a Node, which enables powerful composition patterns. This means you can: A **Flow** can act like a Node, which enables powerful composition patterns. This means you can:
1. Use a flow as a node within another flow's transitions 1. Use a Flow as a Node within another Flow's transitions.
2. Combine multiple smaller flows into a larger pipeline 2. Combine multiple smaller Flows into a larger Flow for reuse.
3. Create reusable flow components 3. Node `params` will be a merging of **all** parents' `params`.
### Basic Flow Nesting ### Basic Flow Nesting

View File

@ -36,7 +36,7 @@ my_node = SummarizeFile(max_retries=3)
When an exception occurs in `exec()`, the Node automatically retries until: When an exception occurs in `exec()`, the Node automatically retries until:
- It either succeeds, **or** - It either succeeds, or
- The Node has retried `max_retries - 1` times already and fails on the last attempt. - The Node has retried `max_retries - 1` times already and fails on the last attempt.
If you want to **gracefully handle** the error rather than raising it, you can override: If you want to **gracefully handle** the error rather than raising it, you can override:
@ -46,7 +46,8 @@ def process_after_fail(self, shared, prep_res, exc):
raise exc raise exc
``` ```
By **default**, it just re-raises `exc`. But you can return a fallback result instead. That fallback result becomes the `exec_res` passed to `post()`. By default, it just re-raises `exc`. But you can return a fallback result instead.
That fallback result becomes the `exec_res` passed to `post()`.
## Example ## Example