pocketflow/cookbook/pocketflow-parallel-batch/main.py

import asyncio
import time

from pocketflow import AsyncBatchNode, AsyncParallelBatchNode, AsyncFlow


####################################
# Dummy async function (1s delay)
####################################
async def dummy_llm_summarize(text):
    """Simulates an async LLM call that takes 1 second."""
    await asyncio.sleep(1)
    return f"Summarized({len(text)} chars)"


###############################################
# 1) AsyncBatchNode (sequential) version
###############################################
class SummariesAsyncNode(AsyncBatchNode):
    """
    Processes items sequentially in an async manner.
    The next item won't start until the previous item has finished.
    """

    async def prep_async(self, shared):
        # Return a list of items to process.
        # Each item is (filename, content).
        return list(shared["data"].items())

    async def exec_async(self, item):
        filename, content = item
        print(f"[Sequential] Summarizing {filename}...")
        summary = await dummy_llm_summarize(content)
        return (filename, summary)

    async def post_async(self, shared, prep_res, exec_res_list):
        # exec_res_list is a list of (filename, summary)
        shared["sequential_summaries"] = dict(exec_res_list)
        return "done_sequential"


###############################################
# 2) AsyncParallelBatchNode (concurrent) version
###############################################
class SummariesAsyncParallelNode(AsyncParallelBatchNode):
    """
    Processes items in parallel. Many LLM calls start at once.
    """

    async def prep_async(self, shared):
        return list(shared["data"].items())

    async def exec_async(self, item):
        filename, content = item
        print(f"[Parallel] Summarizing {filename}...")
        summary = await dummy_llm_summarize(content)
        return (filename, summary)

    async def post_async(self, shared, prep_res, exec_res_list):
        shared["parallel_summaries"] = dict(exec_res_list)
        return "done_parallel"


###############################################
# Demo comparing the two approaches
###############################################
async def main():
    # We'll use the same data for both flows
    shared_data = {
        "data": {
            "file1.txt": "Hello world 1",
            "file2.txt": "Hello world 2",
            "file3.txt": "Hello world 3",
        }
    }

    # 1) Run the sequential version
    seq_node = SummariesAsyncNode()
    seq_flow = AsyncFlow(start=seq_node)
    print("\n=== Running Sequential (AsyncBatchNode) ===")
    t0 = time.time()
    await seq_flow.run_async(shared_data)
    t1 = time.time()

    # 2) Run the parallel version
    par_node = SummariesAsyncParallelNode()
    par_flow = AsyncFlow(start=par_node)
    print("\n=== Running Parallel (AsyncParallelBatchNode) ===")
    t2 = time.time()
    await par_flow.run_async(shared_data)
    t3 = time.time()

    # Show summaries and times
    print("\n--- Results ---")
    print(f"Sequential Summaries: {shared_data.get('sequential_summaries')}")
    print(f"Parallel Summaries: {shared_data.get('parallel_summaries')}")
    print(f"Sequential took: {t1 - t0:.2f} seconds")
    print(f"Parallel took: {t3 - t2:.2f} seconds")


if __name__ == "__main__":
    asyncio.run(main())