From c1ba9dd0d49b2efa1fe6604934c8e46d1642b264 Mon Sep 17 00:00:00 2001
From: zachary62
Date: Tue, 31 Dec 2024 01:26:52 +0000
Subject: [PATCH] parallel

---
 docs/async.md                  |  59 +++++++++---------
 minillmflow/__init__.py        | 108 ++++++++++++++++-----------------
 tests/test_async_batch_flow.py |  24 ++++----
 tests/test_async_flow.py       |   4 +-
 4 files changed, 99 insertions(+), 96 deletions(-)

diff --git a/docs/async.md b/docs/async.md
index 3e91c8f..bbb5c95 100644
--- a/docs/async.md
+++ b/docs/async.md
@@ -7,57 +7,60 @@ nav_order: 5
 ---
 
 # Async
 
-**Async** pattern allows the `post()` step to be asynchronous: `post_async()`. This is especially helpful if you need to `await` something—for example, user feedback or external async requests.
+**Mini LLM Flow** allows fully asynchronous nodes by implementing `prep_async()`, `exec_async()`, and/or `post_async()`. This is useful whenever a step needs to `await` something, such as file I/O, async LLM calls, or user feedback.
 
-**⚠️ Warning**: Only `post_async()` is async. `prep()` and `exec()` must be sync.
+## Implementation
 
----
+1. **prep_async()**
+   - For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way.
 
-## 1. AsyncNode
+2. **exec_async()**
+   - Typically used for async LLM calls.
 
-Below is a minimal **AsyncNode** that calls an LLM in `exec()` to summarize texts, and then awaits user feedback in `post_async()`:
+3. **post_async()**
+   - For *awaiting user feedback*, *coordinating across multiple agents*, or any additional async steps after `exec_async()`.
+
+Inside an `AsyncNode`, each step must be implemented as its async variant; calling the sync `prep()`, `exec()`, or `post()` on an `AsyncNode` raises a `RuntimeError`.
+
+**Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes; it detects whether each node is sync or async and awaits it accordingly.
+
+### Example
 
 ```python
+import asyncio
+
 class SummarizeThenVerify(AsyncNode):
-    def prep(self, shared):
-        return shared.get("doc", "")
+    async def prep_async(self, shared):
+        # Example: read a file asynchronously
+        doc_text = await read_file_async(shared["doc_path"])
+        return doc_text
 
-    def exec(self, prep_res):
-        return call_llm(f"Summarize: {prep_res}")
+    async def exec_async(self, prep_res):
+        # Example: async LLM call
+        summary = await call_llm_async(f"Summarize: {prep_res}")
+        return summary
 
     async def post_async(self, shared, prep_res, exec_res):
-        user_decision = await gather_user_feedback(exec_res)
-        if user_decision == "approve":
+        # Example: wait for user feedback
+        decision = await gather_user_feedback(exec_res)
+        if decision == "approve":
             shared["summary"] = exec_res
             return "approve"
-        else:
-            return "deny"
-```
+        return "deny"
 
-- `exec()`: Summarizes text (sync LLM call).
-- `post_async()`: Waits for user approval (async).
-
----
-
-## 2. AsyncFlow
-
-We can build an **AsyncFlow** around this node. If the user denies, we loop back for another attempt; if approved, we pass to a final node:
-
-```python
 summarize_node = SummarizeThenVerify()
 final_node = Finalize()
 
-# Chain conditions
+# Define transitions
 summarize_node - "approve" >> final_node
-summarize_node - "deny" >> summarize_node  # retry loop
+summarize_node - "deny" >> summarize_node  # retry
 
 flow = AsyncFlow(start=summarize_node)
 
 async def main():
-    shared = {"doc": "Mini LLM Flow is a lightweight LLM framework."}
+    shared = {"doc_path": "document.txt"}
     await flow.run_async(shared)
-    print("Final stored summary:", shared.get("final_summary"))
+    print("Final Summary:", shared.get("summary"))
 
 asyncio.run(main())
 ```
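+
+### Parallel Batch Example
+
+For batch workloads, `AsyncBatchNode` and `AsyncBatchFlow` process items sequentially, while `AsyncParallelBatchNode` and `AsyncParallelBatchFlow` fan out with `asyncio.gather` so all items run concurrently. The sketch below is illustrative only; it assumes the same hypothetical `call_llm_async` helper as above.
+
+```python
+class SummarizeChunks(AsyncParallelBatchNode):
+    async def prep_async(self, shared):
+        # Return one item per chunk; exec_async runs once per item, concurrently
+        return shared["chunks"]
+
+    async def exec_async(self, chunk):
+        return await call_llm_async(f"Summarize: {chunk}")
+
+    async def post_async(self, shared, prep_res, exec_res):
+        # exec_res is the list of per-chunk results, in input order
+        shared["chunk_summaries"] = exec_res
+        return "default"
+```
+
+Prefer the sequential variants when calls must be throttled (e.g., rate-limited APIs); use the parallel variants when items are independent.
+
+Keep it simple: go async only when needed, handle errors gracefully, and leverage Python’s `asyncio`.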
diff --git a/minillmflow/__init__.py b/minillmflow/__init__.py
index 0d4fa43..ac1a4a0 100644
--- a/minillmflow/__init__.py
+++ b/minillmflow/__init__.py
@@ -6,16 +6,13 @@ class BaseNode:
     def add_successor(self,node,action="default"):
         if action in self.successors: warnings.warn(f"Overwriting successor for action '{action}'")
         self.successors[action]=node;return node
-    def prep(self,shared): return None
-    def exec(self,prep_res): return None
+    def prep(self,shared): pass
+    def exec(self,prep_res): pass
+    def post(self,shared,prep_res,exec_res): pass
     def _exec(self,prep_res): return self.exec(prep_res)
-    def post(self,shared,prep_res,exec_res): return "default"
-    def _run(self,shared):
-        prep_res=self.prep(shared)
-        exec_res=self._exec(prep_res)
-        return self.post(shared,prep_res,exec_res)
-    def run(self,shared):
-        if self.successors: warnings.warn("Node won't run successors. Use a parent Flow instead.")
+    def _run(self,shared): p=self.prep(shared);e=self._exec(p);return self.post(shared,p,e)
+    def run(self,shared):
+        if self.successors: warnings.warn("Node won't run successors. Use Flow.")
         return self._run(shared)
     def __rshift__(self,other): return self.add_successor(other)
     def __sub__(self,action):
@@ -27,74 +24,77 @@ class _ConditionalTransition:
     def __rshift__(self,tgt): return self.src.add_successor(tgt,self.action)
 
 class Node(BaseNode):
-    def __init__(self,max_retries=1):
-        super().__init__()
-        self.max_retries=max_retries
+    def __init__(self,max_retries=1): super().__init__();self.max_retries=max_retries
     def process_after_fail(self,prep_res,exc): raise exc
     def _exec(self,prep_res):
         for i in range(self.max_retries):
-            try:return super()._exec(prep_res)
+            try: return super()._exec(prep_res)
             except Exception as e:
-                if i==self.max_retries-1:return self.process_after_fail(prep_res,e)
+                if i==self.max_retries-1: return self.process_after_fail(prep_res,e)
 
 class BatchNode(Node):
-    def prep(self,shared): return []
     def _exec(self,items): return [super(Node,self)._exec(i) for i in items]
 
 class Flow(BaseNode):
-    def __init__(self,start):
-        super().__init__()
-        self.start=start
+    def __init__(self,start): super().__init__();self.start=start
     def get_next_node(self,curr,action):
-        nxt=curr.successors.get(action if action is not None else "default")
-        if not nxt and curr.successors: warnings.warn(f"Flow ends: action '{action}' not found in {list(curr.successors)}")
+        nxt=curr.successors.get(action or "default")
+        if not nxt and curr.successors:
+            warnings.warn(f"Flow ends: '{action}' not found in {list(curr.successors)}")
         return nxt
-    def _orchestrate(self,shared,params=None):
-        curr,p=self.start,(params if params else {**self.params})
-        while curr:
-            curr.set_params(p)
-            curr=self.get_next_node(curr,curr._run(shared))
-    def _run(self,shared):
-        self._orchestrate(shared)
-        return self.post(shared,self.prep(shared),None)
-    def exec(self,prep_res):
-        raise RuntimeError("Flow should not exec directly. Create a child Node instead.")
Create a child Node instead.") + def _orch(self,shared,params=None): + curr,p=self.start,(params or {**self.params}) + while curr: curr.set_params(p);c=curr._run(shared);curr=self.get_next_node(curr,c) + def _run(self,shared): pr=self.prep(shared);self._orch(shared);return self.post(shared,pr,None) + def exec(self,prep_res): raise RuntimeError("Flow can't exec.") class BatchFlow(Flow): - def prep(self,shared): return [] def _run(self,shared): - prep_res=self.prep(shared) - for batch_params in prep_res:self._orchestrate(shared,{**self.params,**batch_params}) - return self.post(shared,prep_res,None) + pr=self.prep(shared) or [] + for bp in pr: self._orch(shared,{**self.params,**bp}) + return self.post(shared,pr,None) class AsyncNode(Node): - def post(self,shared,prep_res,exec_res): - raise RuntimeError("AsyncNode should post using post_async instead.") - async def post_async(self,shared,prep_res,exec_res): - await asyncio.sleep(0);return "default" - async def run_async(self,shared): - if self.successors: - warnings.warn("Node won't run successors. Use a parent AsyncFlow instead.") + def prep(self,shared): raise RuntimeError("Use prep_async.") + def exec(self,prep_res): raise RuntimeError("Use exec_async.") + def post(self,shared,prep_res,exec_res): raise RuntimeError("Use post_async.") + def _run(self,shared): raise RuntimeError("Use run_async.") + async def prep_async(self,shared): pass + async def exec_async(self,prep_res): pass + async def post_async(self,shared,prep_res,exec_res): pass + async def _exec(self,prep_res): return await self.exec_async(prep_res) + async def run_async(self,shared): + if self.successors: warnings.warn("Node won't run successors. Use AsyncFlow.") return await self._run_async(shared) async def _run_async(self,shared): - prep_res=self.prep(shared) - exec_res=self._exec(prep_res) - return await self.post_async(shared,prep_res,exec_res) - def _run(self,shared): raise RuntimeError("AsyncNode should run using run_async instead.") + p=await self.prep_async(shared);e=await self._exec(p) + return await self.post_async(shared,p,e) + +class AsyncBatchNode(AsyncNode): + async def _exec(self,items): return [await super()._exec(i) for i in items] + +class AsyncParallelBatchNode(AsyncNode): + async def _exec(self,items): return await asyncio.gather(*(super()._exec(i) for i in items)) class AsyncFlow(Flow,AsyncNode): - async def _orchestrate_async(self,shared,params=None): - curr,p=self.start,(params if params else {**self.params}) + async def _orch_async(self,shared,params=None): + curr,p=self.start,(params or {**self.params}) while curr: curr.set_params(p) - c=await curr._run_async(shared) if hasattr(curr,"run_async") else curr._run(shared) + c=await curr._run_async(shared) if isinstance(curr,AsyncNode) else curr._run(shared) curr=self.get_next_node(curr,c) async def _run_async(self,shared): - await self._orchestrate_async(shared) - return await self.post_async(shared,self.prep(shared),None) + pr=await self.prep_async(shared);await self._orch_async(shared) + return await self.post_async(shared,pr,None) -class BatchAsyncFlow(BatchFlow,AsyncFlow): +class AsyncBatchFlow(AsyncFlow): async def _run_async(self,shared): - prep_res=self.prep(shared) - for batch_params in prep_res:await self._orchestrate_async(shared,{**self.params,**batch_params}) - return await self.post_async(shared,prep_res,None) \ No newline at end of file + pr=await self.prep_async(shared) or [] + for bp in pr: await self._orch_async(shared,{**self.params,**bp}) + return await self.post_async(shared,pr,None) + 
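+# The parallel flow below launches one orchestration per param dict with
+# asyncio.gather; they all mutate the same `shared` store concurrently, so
+# per-item results should be written under distinct keys.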
+class AsyncParallelBatchFlow(AsyncFlow):
+    async def _run_async(self,shared):
+        pr=await self.prep_async(shared) or []
+        await asyncio.gather(*(self._orch_async(shared,{**self.params,**bp}) for bp in pr))
+        return await self.post_async(shared,pr,None)
\ No newline at end of file
diff --git a/tests/test_async_batch_flow.py b/tests/test_async_batch_flow.py
index 28e3b20..2a73eab 100644
--- a/tests/test_async_batch_flow.py
+++ b/tests/test_async_batch_flow.py
@@ -4,10 +4,10 @@ import sys
 from pathlib import Path
 sys.path.insert(0, str(Path(__file__).parent.parent))
 
-from minillmflow import AsyncNode, BatchAsyncFlow
+from minillmflow import AsyncNode, AsyncBatchFlow
 
 class AsyncDataProcessNode(AsyncNode):
-    def prep(self, shared_storage):
+    async def prep_async(self, shared_storage):
         key = self.params.get('key')
         data = shared_storage['input_data'][key]
         if 'results' not in shared_storage:
@@ -34,8 +34,8 @@ class TestAsyncBatchFlow(unittest.TestCase):
     def test_basic_async_batch_processing(self):
         """Test basic async batch processing with multiple keys"""
-        class SimpleTestAsyncBatchFlow(BatchAsyncFlow):
-            def prep(self, shared_storage):
+        class SimpleTestAsyncBatchFlow(AsyncBatchFlow):
+            async def prep_async(self, shared_storage):
                 return [{'key': k} for k in shared_storage['input_data'].keys()]
 
         shared_storage = {
@@ -58,8 +58,8 @@ class TestAsyncBatchFlow(unittest.TestCase):
     def test_empty_async_batch(self):
         """Test async batch processing with empty input"""
-        class EmptyTestAsyncBatchFlow(BatchAsyncFlow):
-            def prep(self, shared_storage):
+        class EmptyTestAsyncBatchFlow(AsyncBatchFlow):
+            async def prep_async(self, shared_storage):
                 return [{'key': k} for k in shared_storage['input_data'].keys()]
 
         shared_storage = {
@@ -73,8 +73,8 @@ class TestAsyncBatchFlow(unittest.TestCase):
     def test_async_error_handling(self):
         """Test error handling during async batch processing"""
-        class ErrorTestAsyncBatchFlow(BatchAsyncFlow):
-            def prep(self, shared_storage):
+        class ErrorTestAsyncBatchFlow(AsyncBatchFlow):
+            async def prep_async(self, shared_storage):
                 return [{'key': k} for k in shared_storage['input_data'].keys()]
 
         shared_storage = {
@@ -110,8 +110,8 @@ class TestAsyncBatchFlow(unittest.TestCase):
                 await asyncio.sleep(0.01)
                 return "done"
 
-        class NestedAsyncBatchFlow(BatchAsyncFlow):
-            def prep(self, shared_storage):
+        class NestedAsyncBatchFlow(AsyncBatchFlow):
+            async def prep_async(self, shared_storage):
                 return [{'key': k} for k in shared_storage['input_data'].keys()]
 
         # Create inner flow
@@ -147,8 +147,8 @@ class TestAsyncBatchFlow(unittest.TestCase):
                 shared_storage['results'][key] = shared_storage['input_data'][key] * multiplier
                 return "done"
 
-        class CustomParamAsyncBatchFlow(BatchAsyncFlow):
-            def prep(self, shared_storage):
+        class CustomParamAsyncBatchFlow(AsyncBatchFlow):
+            async def prep_async(self, shared_storage):
                 return [{
                     'key': k,
                     'multiplier': i + 1
diff --git a/tests/test_async_flow.py b/tests/test_async_flow.py
index 2cf7d2b..9334c03 100644
--- a/tests/test_async_flow.py
+++ b/tests/test_async_flow.py
@@ -17,7 +17,7 @@ class AsyncNumberNode(AsyncNode):
         super().__init__()
         self.number = number
 
-    def prep(self, shared_storage):
+    async def prep_async(self, shared_storage):
         # Synchronous work is allowed inside an AsyncNode,
         # but final 'condition' is determined by post_async().
         shared_storage['current'] = self.number
@@ -34,7 +34,7 @@ class AsyncIncrementNode(AsyncNode):
     """
     Demonstrates incrementing the 'current' value asynchronously.
""" - def prep(self, shared_storage): + async def prep_async(self, shared_storage): shared_storage['current'] = shared_storage.get('current', 0) + 1 return "incremented"