add emphasis for batch mode that data is passed via `params`, not the shared store.

BO WEN 2025-04-30 11:38:07 -04:00
parent 000dc61ef5
commit b561a10c76
1 changed file with 80 additions and 10 deletions


@@ -52,30 +52,75 @@ flow.run(shared)
A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
### Key Differences from BatchNode
**Important**: Unlike a BatchNode, which processes data items and writes its results to the shared store (see the contrast sketch after this list):
1. BatchFlow returns **parameters to pass to the child Flow**, not data to process
2. These parameters are accessed in child nodes via `self.params`, not from the shared store
3. Each child Flow runs independently with a different set of parameters
4. Child nodes can be regular Nodes, not BatchNodes (the batching happens at the Flow level)
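To make the contrast concrete, here is a minimal sketch, assuming the `Node`/`BatchNode`/`BatchFlow` API used elsewhere in these docs and a `call_llm` helper; the class and key names are illustrative:

```python
# BatchNode: prep() returns the DATA items themselves; exec() runs once
# per item, and the results land back in the shared store.
class SummarizeChunks(BatchNode):
    def prep(self, shared):
        return shared["chunks"]  # data to process

    def exec(self, chunk):
        return call_llm(f"Summarize: {chunk}")

    def post(self, shared, prep_res, exec_res_list):
        shared["chunk_summaries"] = exec_res_list  # results go to shared
        return "default"

# BatchFlow: prep() returns PARAM dicts; the child Flow is replayed once
# per dict, and its nodes read the values via self.params, not shared.
class PerFileFlow(BatchFlow):
    def prep(self, shared):
        return [{"filename": fn} for fn in shared["data"]]
```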
### Example: Summarize Many Files
```python
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # IMPORTANT: Return a list of param dictionaries (not data for processing)
        filenames = list(shared["data"].keys())  # e.g., ["file1.txt", "file2.txt", ...]
        return [{"filename": fn} for fn in filenames]

# Child node that accesses the filename from params, not the shared store
class LoadFile(Node):
    def prep(self, shared):
        # Access the filename from params (not from shared)
        filename = self.params["filename"]  # Important! Use self.params, not shared
        return filename

    def exec(self, filename):
        with open(filename, "r") as f:
            return f.read()

    def post(self, shared, prep_res, exec_res):
        # Store the file content in shared
        shared["current_file_content"] = exec_res
        return "default"

# Summarize node that works on the currently loaded file
class Summarize(Node):
    def prep(self, shared):
        return shared["current_file_content"]

    def exec(self, content):
        prompt = f"Summarize this file in 50 words: {content}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        # Store the summary in shared, indexed by the current filename
        filename = self.params["filename"]  # Again, using params
        if "summaries" not in shared:
            shared["summaries"] = {}
        shared["summaries"][filename] = exec_res
        return "default"

# Create a per-file flow
load_file = LoadFile()
summarize = Summarize()
load_file >> summarize
summarize_file = Flow(start=load_file)

# Wrap in a BatchFlow to process all files
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
```
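For orientation, a hypothetical run. The exact `shared` layout here is an assumption; `SummarizeAllFiles.prep` only reads the keys of `shared["data"]`:

```python
shared = {"data": {"file1.txt": None, "file2.txt": None}}  # hypothetical input
summarize_all_files.run(shared)
# After the run (assuming call_llm returns a summary string):
# shared["summaries"] == {"file1.txt": "...", "file2.txt": "..."}
```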
### Under the Hood
1. `prep(shared)` in the BatchFlow returns a list of param dicts, e.g., `[{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...]`.
2. The **BatchFlow** loops through each dict. For each one:
   - It merges the dict with the BatchFlow's own `params` (if any): `{**batch_flow.params, **dict_from_prep}`
   - It calls `flow.run(shared)` using the merged parameters
   - **IMPORTANT**: These parameters are passed to the child Flow's nodes via `self.params`, NOT via the shared store
3. This means the sub-Flow is run **repeatedly**, once for every param dict, with each node in the flow accessing the parameters via `self.params` (see the sketch below).
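Conceptually, the loop looks something like this. This is a simplified sketch, not the framework's actual source; it assumes the child flow exposes a `set_params` setter:

```python
def run_batch_flow(batch_flow, child_flow, shared):
    """Simplified sketch of a BatchFlow run (illustrative only)."""
    param_dicts = batch_flow.prep(shared) or []  # list of param dicts
    for batch_params in param_dicts:
        # Merge the BatchFlow's own params with this per-batch dict...
        merged = {**batch_flow.params, **batch_params}
        # ...and replay the child flow once. Its nodes see the merged
        # values via self.params, while data still travels through shared.
        child_flow.set_params(merged)  # assumed helper
        child_flow.run(shared)
    batch_flow.post(shared, param_dicts, None)
```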
---
@@ -91,6 +136,7 @@ At each level, **BatchFlow** merges its own param dict with the parent's. By t
class FileBatchFlow(BatchFlow):
    def prep(self, shared):
        # Access the directory from params (set by the parent)
        directory = self.params["directory"]
        # e.g., files = ["file1.txt", "file2.txt", ...]
        files = [f for f in os.listdir(directory) if f.endswith(".txt")]
@@ -101,7 +147,31 @@ class DirectoryBatchFlow(BatchFlow):
        directories = ["/path/to/dirA", "/path/to/dirB"]
        return [{"directory": d} for d in directories]

# The actual processing node
class ProcessFile(Node):
    def prep(self, shared):
        # Access both the directory and the filename from params
        directory = self.params["directory"]  # From the outer batch
        filename = self.params["filename"]    # From the inner batch
        full_path = os.path.join(directory, filename)
        return full_path

    def exec(self, full_path):
        # Process the file...
        return f"Processed {full_path}"

    def post(self, shared, prep_res, exec_res):
        # Store results, perhaps indexed by path
        if "results" not in shared:
            shared["results"] = {}
        shared["results"][prep_res] = exec_res
        return "default"

# Set up the nested batch structure
process_node = ProcessFile()
inner_flow = FileBatchFlow(start=process_node)
outer_flow = DirectoryBatchFlow(start=inner_flow)

# Run it
outer_flow.run(shared)
```
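For reference, each `ProcessFile` run receives the merged params from both levels; a quick sketch of the merge rule (the directory follows the example above, and the filename is hypothetical):

```python
outer = {"directory": "/path/to/dirA"}  # from DirectoryBatchFlow
inner = {"filename": "file1.txt"}       # from FileBatchFlow
merged = {**outer, **inner}             # what ProcessFile sees in self.params
assert merged == {"directory": "/path/to/dirA", "filename": "file1.txt"}
```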