diff --git a/assets/mapreduce.png b/assets/mapreduce.png new file mode 100644 index 0000000..5d84753 Binary files /dev/null and b/assets/mapreduce.png differ diff --git a/assets/workflow.png b/assets/workflow.png new file mode 100644 index 0000000..27239d0 Binary files /dev/null and b/assets/workflow.png differ diff --git a/docs/design_pattern/mapreduce.md b/docs/design_pattern/mapreduce.md index fb767a1..61dce0c 100644 --- a/docs/design_pattern/mapreduce.md +++ b/docs/design_pattern/mapreduce.md @@ -12,27 +12,59 @@ MapReduce is a design pattern suitable when you have either: - Large output data (e.g., multiple forms to fill) and there is a logical way to break the task into smaller, ideally independent parts. + +
+ +
+ You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase. ### Example: Document Summarization ```python -class MapSummaries(BatchNode): - def prep(self, shared): return [shared["text"][i:i+10000] for i in range(0, len(shared["text"]), 10000)] - def exec(self, chunk): return call_llm(f"Summarize this chunk: {chunk}") - def post(self, shared, prep_res, exec_res_list): shared["summaries"] = exec_res_list +class SummarizeAllFiles(BatchNode): + def prep(self, shared): + files_dict = shared["files"] # e.g. 10 files + return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...] -class ReduceSummaries(Node): - def prep(self, shared): return shared["summaries"] - def exec(self, summaries): return call_llm(f"Combine these summaries: {summaries}") - def post(self, shared, prep_res, exec_res): shared["final_summary"] = exec_res + def exec(self, one_file): + filename, file_content = one_file + summary_text = call_llm(f"Summarize the following file:\n{file_content}") + return (filename, summary_text) -# Connect nodes -map_node = MapSummaries() -reduce_node = ReduceSummaries() -map_node >> reduce_node + def post(self, shared, prep_res, exec_res_list): + shared["file_summaries"] = dict(exec_res_list) -# Create flow -summarize_flow = Flow(start=map_node) -summarize_flow.run(shared) +class CombineSummaries(Node): + def prep(self, shared): + return shared["file_summaries"] + + def exec(self, file_summaries): + # format as: "File1: summary\nFile2: summary...\n" + text_list = [] + for fname, summ in file_summaries.items(): + text_list.append(f"{fname} summary:\n{summ}\n") + big_text = "\n---\n".join(text_list) + + return call_llm(f"Combine these file summaries into one final summary:\n{big_text}") + + def post(self, shared, prep_res, final_summary): + shared["all_files_summary"] = final_summary + +batch_node = SummarizeAllFiles() +combine_node = CombineSummaries() +batch_node >> combine_node + +flow = Flow(start=batch_node) + +shared = { + "files": { + "file1.txt": "Alice was beginning to get very tired of sitting by her sister...", + "file2.txt": "Some other interesting text ...", + # ... + } +} +flow.run(shared) +print("Individual Summaries:", shared["file_summaries"]) +print("\nFinal Summary:\n", shared["all_files_summary"]) ``` \ No newline at end of file diff --git a/docs/design_pattern/workflow.md b/docs/design_pattern/workflow.md index 5601867..62b4436 100644 --- a/docs/design_pattern/workflow.md +++ b/docs/design_pattern/workflow.md @@ -9,6 +9,10 @@ nav_order: 2 Many real-world tasks are too complex for one LLM call. The solution is to decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes. +
+ +
+ > - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*. > - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*. >