diff --git a/docs/core_abstraction/batch.md b/docs/core_abstraction/batch.md
index 1ddd7dd..35063bd 100644
--- a/docs/core_abstraction/batch.md
+++ b/docs/core_abstraction/batch.md
@@ -52,30 +52,75 @@ flow.run(shared)
 
 A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
 
+### Key Differences from BatchNode
+
+**Important**: Unlike BatchNode, which processes items and modifies the shared store, a BatchFlow works differently:
+
+1. A BatchFlow's `prep()` returns **parameters to pass to the child Flow**, not data to process
+2. These parameters are accessed in child nodes via `self.params`, not from the shared store
+3. Each child Flow runs independently with a different set of parameters
+4. Child nodes can be regular Nodes, not BatchNodes (the batching happens at the Flow level)
 
 ### Example: Summarize Many Files
 
 ```python
 class SummarizeAllFiles(BatchFlow):
     def prep(self, shared):
-        # Return a list of param dicts (one per file)
+        # IMPORTANT: Return a list of param dictionaries (not data for processing)
         filenames = list(shared["data"].keys())  # e.g., ["file1.txt", "file2.txt", ...]
         return [{"filename": fn} for fn in filenames]
 
-# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
-summarize_file = SummarizeFile(start=load_file)
+# Child node that accesses the filename from params, not the shared store
+class LoadFile(Node):
+    def prep(self, shared):
+        # Access the filename from params (not from shared)
+        filename = self.params["filename"]  # Important! Use self.params, not shared
+        return filename
+
+    def exec(self, filename):
+        with open(filename, 'r') as f:
+            return f.read()
+
+    def post(self, shared, prep_res, exec_res):
+        # Store the file content in the shared store
+        shared["current_file_content"] = exec_res
+        return "default"
 
-# Wrap that flow into a BatchFlow:
+# Summarize node that works on the currently loaded file
+class Summarize(Node):
+    def prep(self, shared):
+        return shared["current_file_content"]
+
+    def exec(self, content):
+        prompt = f"Summarize this file in 50 words: {content}"
+        return call_llm(prompt)
+
+    def post(self, shared, prep_res, exec_res):
+        # Store the summary in shared, indexed by the current filename
+        filename = self.params["filename"]  # Again, using params
+        if "summaries" not in shared:
+            shared["summaries"] = {}
+        shared["summaries"][filename] = exec_res
+        return "default"
+
+# Create a per-file flow
+load_file = LoadFile()
+summarize = Summarize()
+load_file >> summarize
+summarize_file = Flow(start=load_file)
+
+# Wrap in a BatchFlow to process all files
 summarize_all_files = SummarizeAllFiles(start=summarize_file)
 summarize_all_files.run(shared)
 ```
 
 ### Under the Hood
-1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`.
+1. `prep(shared)` in the BatchFlow returns a list of param dicts—e.g., `[{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...]`.
 2. The **BatchFlow** loops through each dict. For each one:
-   - It merges the dict with the BatchFlow’s own `params`.
-   - It calls `flow.run(shared)` using the merged result.
-3. This means the sub-Flow is run **repeatedly**, once for every param dict.
+   - It merges the dict with the BatchFlow's own `params` (if any): `{**batch_flow.params, **dict_from_prep}`
+   - It calls `flow.run(shared)` using the merged parameters
+   - **IMPORTANT**: These parameters are passed to the child Flow's nodes via `self.params`, NOT via the shared store
+3. This means the sub-Flow is run **repeatedly**, once for every param dict, with each node in the flow accessing the parameters via `self.params` (see the sketch below).
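+
+To make step 2 concrete, here is a minimal sketch of the loop a BatchFlow performs. This is illustrative only: it assumes the child Flow is driven through an internal `_orch(shared, params)` helper, which may not match the library's actual internals.
+
+```python
+class BatchFlow(Flow):
+    def _run(self, shared):
+        # prep() returns the list of param dicts (treat None as an empty batch)
+        param_dicts = self.prep(shared) or []
+        for batch_params in param_dicts:
+            # Merge this dict over the BatchFlow's own params, then run the
+            # child Flow once; its nodes read the values via self.params.
+            self._orch(shared, {**self.params, **batch_params})
+        return self.post(shared, param_dicts, None)
+```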
 
 ---
 
@@ -91,6 +136,7 @@ At each level, **BatchFlow** merges its own param dict with the parent’s. By t
 
 class FileBatchFlow(BatchFlow):
     def prep(self, shared):
+        # Access the directory from params (set by the parent flow)
         directory = self.params["directory"]
         # e.g., files = ["file1.txt", "file2.txt", ...]
         files = [f for f in os.listdir(directory) if f.endswith(".txt")]
@@ -101,7 +147,31 @@ class DirectoryBatchFlow(BatchFlow):
         directories = [ "/path/to/dirA", "/path/to/dirB"]
         return [{"directory": d} for d in directories]
 
-# MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
-inner_flow = FileBatchFlow(start=MapSummaries())
+# The actual processing node
+class ProcessFile(Node):
+    def prep(self, shared):
+        # Access both directory and filename from params
+        directory = self.params["directory"]  # From the outer batch
+        filename = self.params["filename"]    # From the inner batch
+        full_path = os.path.join(directory, filename)
+        return full_path
+
+    def exec(self, full_path):
+        # Process the file...
+        return f"Processed {full_path}"
+
+    def post(self, shared, prep_res, exec_res):
+        # Store results, indexed by the full path
+        if "results" not in shared:
+            shared["results"] = {}
+        shared["results"][prep_res] = exec_res
+        return "default"
+
+# Set up the nested batch structure
+process_node = ProcessFile()
+inner_flow = FileBatchFlow(start=process_node)
 outer_flow = DirectoryBatchFlow(start=inner_flow)
+
+# Run it
+outer_flow.run(shared)
 ```
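+
+After `outer_flow.run(shared)` finishes, each (directory, filename) pair has been processed exactly once with the merged params. A sketch of what `shared["results"]` could then hold, assuming (hypothetically) that each directory contains `file1.txt` and `file2.txt`:
+
+```python
+# Hypothetical output; the actual paths depend on the real directory contents.
+for path, result in shared["results"].items():
+    print(path, "->", result)
+# /path/to/dirA/file1.txt -> Processed /path/to/dirA/file1.txt
+# /path/to/dirA/file2.txt -> Processed /path/to/dirA/file2.txt
+# /path/to/dirB/file1.txt -> Processed /path/to/dirB/file1.txt
+# /path/to/dirB/file2.txt -> Processed /path/to/dirB/file2.txt
+```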