diff --git a/minillmflow/__init__.py b/minillmflow/__init__.py index 9d560b1..f3a8146 100644 --- a/minillmflow/__init__.py +++ b/minillmflow/__init__.py @@ -1,3 +1,5 @@ +import asyncio + class BaseNode: """ A base node that provides: @@ -33,9 +35,6 @@ class BaseNode: return "default" def run(self, shared_storage=None): - if not shared_storage: - shared_storage = {} - prep = self.preprocess(shared_storage) proc = self._process(shared_storage, prep) return self.postprocess(shared_storage, prep, proc) @@ -93,118 +92,80 @@ class Node(BaseNode): if attempt == self.max_retries - 1: return self.process_after_fail(shared_storage, data, e) -class InteractiveNode(BaseNode): - """ - Interactive node. Instead of returning a condition, - we 'signal' the condition via a callback provided by the Flow. - """ - - def postprocess(self, shared_storage, prep_result, proc_result, next_node_callback): - """ - We do NOT return anything. We call 'next_node_callback("some_condition")' - to tell the Flow which successor to pick. - """ - # e.g. here we pick "default", but in real usage you'd do logic or rely on user input - next_node_callback("default") - - def run(self, shared_storage=None): - """ - Run just THIS node (no chain). - """ - if not shared_storage: - shared_storage = {} - - # 1) Preprocess - prep = self.preprocess(shared_storage) - - # 2) Process - proc = self._process(shared_storage, prep) - - # 3) Postprocess with a dummy callback - def dummy_callback(condition="default"): - print("[Dummy callback] To run the flow, pass this node into a Flow instance.") - - self.postprocess(shared_storage, prep, proc, dummy_callback) - - def is_interactive(self): - return True - -class Flow: - """ - A Flow that runs through a chain of nodes, from a start node onward. - Each iteration: - - preprocess - - process - - postprocess - The postprocess is given a callback to choose the next node. - We'll 'yield' the current node each time, so the caller can see progress. - """ +class Flow(BaseNode): def __init__(self, start_node=None): self.start_node = start_node - - def run(self, shared_storage=None): - if shared_storage is None: - shared_storage = {} - - current_node = self.start_node - print("hihihi") + def _process(self, shared_storage, _): + current_node = self.start_node while current_node: - # 1) Preprocess - prep_result = current_node.preprocess(shared_storage) - print("prep") - # 2) Process - proc_result = current_node._process(shared_storage, prep_result) - - # Prepare next_node variable - next_node = [None] - - # We'll define a callback only if this is an interactive node. - # The callback sets next_node[0] based on condition. - def next_node_callback(condition="default"): - nxt = current_node.successors.get(condition) - next_node[0] = nxt - - # 3) Check if it's an interactive node - is_interactive = ( - hasattr(current_node, 'is_interactive') - and current_node.is_interactive() - ) - - if is_interactive: - print("ineractive") - # - # ---- INTERACTIVE CASE ---- - # - # a) yield so that external code can do UI, etc. - # yield current_node, prep_result, proc_result, next_node_callback - - # # b) Now we do postprocess WITH the callback: - # current_node.postprocess( - # shared_storage, - # prep_result, - # proc_result, - # next_node_callback - # ) - # # once postprocess is done, next_node[0] should be set - - else: - # - # ---- NON-INTERACTIVE CASE ---- - # - # We just call postprocess WITHOUT callback, - # and let it return the condition string: - condition = current_node.postprocess( - shared_storage, - prep_result, - proc_result - ) - # Then we figure out the next node: - next_node[0] = current_node.successors.get(condition, None) - - # 5) Move on to the next node - current_node = next_node[0] + condition = current_node.run(shared_storage) + current_node = current_node.successors.get(condition, None) + def postprocess(self, shared_storage, prep_result, proc_result): + return None + + + +class AsyncNode(Node): + """ + A Node whose postprocess step is async. + You can also override process() to be async if needed. + """ + + async def postprocess_async(self, shared_storage, prep_result, proc_result): + """ + Async version of postprocess. By default, returns "default". + Override as needed. + """ + await asyncio.sleep(0) # trivial async pause (no-op) + return "default" + + async def run_async(self, shared_storage=None): + """ + Async version of run. + If your process method is also async, you'll need to adapt accordingly. + """ + # We can keep preprocess synchronous or make it async as well, + # depending on your usage. Here it's left as sync for simplicity. + prep = self.preprocess(shared_storage) + + # process can remain sync if you prefer, or you can define an async process. + proc = self._process(shared_storage, prep) + + # postprocess is async + return await self.postprocess_async(shared_storage, prep, proc) + + +class AsyncFlow(Flow): + """ + A Flow that can handle a mixture of sync and async nodes. + If the node is an AsyncNode, calls `run_async`. + Otherwise, calls `run`. + """ + async def _process(self, shared_storage, _): + current_node = self.start_node + while current_node: + if hasattr(current_node, "run_async") and callable(current_node.run_async): + # If it's an async node, await its run_async + condition = await current_node.run_async(shared_storage) + else: + # Otherwise, assume it's a sync node + condition = current_node.run(shared_storage) + + current_node = current_node.successors.get(condition, None) + + async def run_async(self, shared_storage=None): + """ + Kicks off the async flow. Similar to Flow.run, + but uses our async _process method. + """ + prep = self.preprocess(shared_storage) + # Note: flows typically don't need a meaningful process step + # because the "process" is the iteration through the nodes. + await self._process(shared_storage, prep) + return self.postprocess(shared_storage, prep, None) + class BatchNode(BaseNode): def __init__(self, max_retries=5, delay_s=0.1): super().__init__() diff --git a/minillmflow/__pycache__/__init__.cpython-39.pyc b/minillmflow/__pycache__/__init__.cpython-39.pyc index 1400bbb..912c60f 100644 Binary files a/minillmflow/__pycache__/__init__.cpython-39.pyc and b/minillmflow/__pycache__/__init__.cpython-39.pyc differ