diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..004e932
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,71 @@
+# OS generated files
+.DS_Store
+.DS_Store?
+._*
+.Spotlight-V100
+.Trashes
+ehthumbs.db
+Thumbs.db
+
+# IDE specific files
+.idea/
+.vscode/
+*.swp
+*.swo
+*~
+
+# Node
+node_modules/
+npm-debug.log
+yarn-debug.log
+yarn-error.log
+.env
+.env.local
+.env.development.local
+.env.test.local
+.env.production.local
+
+# Python
+__pycache__/
+*.py[cod]
+*$py.class
+*.so
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+venv/
+ENV/
+
+# Logs and databases
+*.log
+*.sql
+*.sqlite
+
+# Build output
+dist/
+build/
+out/
+
+# Coverage reports
+coverage/
+.coverage
+.coverage.*
+htmlcov/
+
+# Misc
+*.bak
+*.tmp
+*.temp
\ No newline at end of file
diff --git a/minillmflow/__init__.py b/minillmflow/__init__.py
index 2a7ea3d..9d560b1 100644
--- a/minillmflow/__init__.py
+++ b/minillmflow/__init__.py
@@ -1,93 +1,56 @@
-import asyncio
-
-def _wrap_async(fn):
+class BaseNode:
     """
-    Given a synchronous function fn, return a coroutine (async function) that
-    simply awaits the (synchronous) call.
+    A base node that provides:
+    - preprocess()
+    - process()
+    - postprocess()
+    - run() -- just runs itself (no chaining)
     """
-    async def _async_wrapper(self, *args, **kwargs):
-        return fn(self, *args, **kwargs)
-    return _async_wrapper
-
-
-class NodeMeta(type):
-    """
-    Metaclass that converts certain methods into async if they are not already.
-    """
-    def __new__(mcs, name, bases, attrs):
-        # Add ANY method names you want to auto-wrap here:
-        methods_to_wrap = (
-            "preprocess",
-            "process",
-            "postprocess",
-            "process_after_fail",
-            "process_one",
-            "process_one_after_fail",
-        )
-
-        for attr_name in methods_to_wrap:
-            if attr_name in attrs:
-                # If it's not already a coroutine function, wrap it
-                if not asyncio.iscoroutinefunction(attrs[attr_name]):
-                    old_fn = attrs[attr_name]
-                    attrs[attr_name] = _wrap_async(old_fn)
-
-        return super().__new__(mcs, name, bases, attrs)
-
-
-async def hello(text):
-    print("Start")
-    await asyncio.sleep(1)  # Simulate some async work
-    print(text)
-
-class BaseNode(metaclass=NodeMeta):
     def __init__(self):
         self.parameters = {}
         self.successors = {}
-        # Syntactic sugar for chaining
 
+    def set_parameters(self, params):
+        self.parameters.update(params)
+
     def add_successor(self, node, condition="default"):
-        # warn if we're overwriting an existing successor
         if condition in self.successors:
             print(f"Warning: overwriting existing successor for condition '{condition}'")
         self.successors[condition] = node
         return node
-
-    # By default these are already async. If a subclass overrides them
-    # with non-async definitions, they'll get wrapped automatically.
+
     def preprocess(self, shared_storage):
         return None
 
-    def process(self, shared_storage, data):
+    def process(self, shared_storage, prep_result):
         return None
 
-    def postprocess(self, shared_storage, preprocess_result, process_result):
+    def _process(self, shared_storage, prep_result):
+        # Could have retry logic or other wrap logic
+        return self.process(shared_storage, prep_result)
+
+    def postprocess(self, shared_storage, prep_result, proc_result):
         return "default"
 
-    async def _process(self, shared_storage, data):
-        return await self.process(shared_storage, data)
-
-    async def _run_one(self, shared_storage):
-        preprocess_result = await self.preprocess(shared_storage)
-        process_result = await self._process(shared_storage, preprocess_result)
-        condition = await self.postprocess(shared_storage, preprocess_result, process_result) or "default"
-        return self.successors.get(condition)
-
     def run(self, shared_storage=None):
-        asyncio.run(self._run_async(shared_storage))
+        if not shared_storage:
+            shared_storage = {}
 
-    async def run_in_jupyter(self, shared_storage=None):
-        await self._run_async(shared_storage)
-
-    async def _run_async(self, shared_storage):
-        current_node = self
-        while current_node:
-            current_node = await current_node._run_one(shared_storage)
+        prep = self.preprocess(shared_storage)
+        proc = self._process(shared_storage, prep)
+        return self.postprocess(shared_storage, prep, proc)
 
     def __rshift__(self, other):
+        """
+        For chaining with the >> operator, e.g. node1 >> node2
+        """
        return self.add_successor(other)
 
     def __gt__(self, other):
+        """
+        For chaining with the > operator, e.g. (node1 > "some_condition") >> node2
+        (parentheses are needed because >> binds tighter than >)
+        """
         if isinstance(other, str):
             return _ConditionalTransition(self, other)
         elif isinstance(other, BaseNode):
@@ -95,49 +58,153 @@ class BaseNode(metaclass=NodeMeta):
         raise TypeError("Unsupported operand type")
 
     def __call__(self, condition):
+        """
+        For node("condition") >> next_node syntax
+        """
         return _ConditionalTransition(self, condition)
 
+
 class _ConditionalTransition:
+    """
+    Helper for the (Node > 'condition') >> AnotherNode style
+    """
     def __init__(self, source_node, condition):
         self.source_node = source_node
         self.condition = condition
 
-    def __gt__(self, target_node):
-        if not isinstance(target_node, BaseNode):
-            raise TypeError("Target must be a BaseNode")
+    def __rshift__(self, target_node):
         return self.source_node.add_successor(target_node, self.condition)
 
+# robust running process
 class Node(BaseNode):
-    def __init__(self, max_retries=5, delay_s=0.1):
+    def __init__(self, max_retries=1):
         super().__init__()
         self.max_retries = max_retries
-        self.delay_s = delay_s
 
     def process_after_fail(self, shared_storage, data, exc):
         raise exc # return "fail"
 
-    async def _process(self, shared_storage, data):
+    def _process(self, shared_storage, data):
         for attempt in range(self.max_retries):
             try:
-                return await super()._process(shared_storage, data)
+                return super()._process(shared_storage, data)
             except Exception as e:
                 if attempt == self.max_retries - 1:
-                    return await self.process_after_fail(shared_storage, data, e)
-                await asyncio.sleep(self.delay_s)
+                    return self.process_after_fail(shared_storage, data, e)
 
-class Flow(BaseNode):
+class InteractiveNode(BaseNode):
+    """
+    Interactive node. Instead of returning a condition,
+    we 'signal' the condition via a callback provided by the Flow.
+    """
+
+    def postprocess(self, shared_storage, prep_result, proc_result, next_node_callback):
+        """
+        We do NOT return anything.
+        We call 'next_node_callback("some_condition")'
+        to tell the Flow which successor to pick.
+        """
+        # e.g. here we pick "default", but in real usage you'd do logic or rely on user input
+        next_node_callback("default")
+
+    def run(self, shared_storage=None):
+        """
+        Run just THIS node (no chain).
+        """
+        if not shared_storage:
+            shared_storage = {}
+
+        # 1) Preprocess
+        prep = self.preprocess(shared_storage)
+
+        # 2) Process
+        proc = self._process(shared_storage, prep)
+
+        # 3) Postprocess with a dummy callback
+        def dummy_callback(condition="default"):
+            print("[Dummy callback] To run the flow, pass this node into a Flow instance.")
+
+        self.postprocess(shared_storage, prep, proc, dummy_callback)
+
+    def is_interactive(self):
+        return True
+
+class Flow:
+    """
+    A Flow that runs through a chain of nodes, from a start node onward.
+    Each iteration:
+      - preprocess
+      - process
+      - postprocess
+    The postprocess is given a callback to choose the next node.
+    We'll 'yield' the current node each time, so the caller can see progress.
+    """
     def __init__(self, start_node=None):
-        super().__init__()
         self.start_node = start_node
 
-    async def _process(self, shared_storage, _):
-        if self.start_node:
-            current_node = self.start_node
-            while current_node:
-                current_node = await current_node._run_one(shared_storage or {})
-        return "Flow done"
+    def run(self, shared_storage=None):
+        if shared_storage is None:
+            shared_storage = {}
+        current_node = self.start_node
+        print("hihihi")
+
+        while current_node:
+            # 1) Preprocess
+            prep_result = current_node.preprocess(shared_storage)
+            print("prep")
+            # 2) Process
+            proc_result = current_node._process(shared_storage, prep_result)
+
+            # Prepare next_node variable
+            next_node = [None]
+
+            # We'll define a callback only if this is an interactive node.
+            # The callback sets next_node[0] based on condition.
+            def next_node_callback(condition="default"):
+                nxt = current_node.successors.get(condition)
+                next_node[0] = nxt
+
+            # 3) Check if it's an interactive node
+            is_interactive = (
+                hasattr(current_node, 'is_interactive')
+                and current_node.is_interactive()
+            )
+
+            if is_interactive:
+                print("interactive")
+                #
+                # ---- INTERACTIVE CASE ----
+                #
+                # a) yield so that external code can do UI, etc.
+                # yield current_node, prep_result, proc_result, next_node_callback
+
+                # # b) Now we do postprocess WITH the callback:
+                # current_node.postprocess(
+                #     shared_storage,
+                #     prep_result,
+                #     proc_result,
+                #     next_node_callback
+                # )
+                # # once postprocess is done, next_node[0] should be set
+
+            else:
+                #
+                # ---- NON-INTERACTIVE CASE ----
+                #
+                # We just call postprocess WITHOUT callback,
+                # and let it return the condition string:
+                condition = current_node.postprocess(
+                    shared_storage,
+                    prep_result,
+                    proc_result
+                )
+                # Then we figure out the next node:
+                next_node[0] = current_node.successors.get(condition, None)
+
+            # 5) Move on to the next node
+            current_node = next_node[0]
+
 class BatchNode(BaseNode):
     def __init__(self, max_retries=5, delay_s=0.1):
         super().__init__()
diff --git a/minillmflow/__pycache__/__init__.cpython-39.pyc b/minillmflow/__pycache__/__init__.cpython-39.pyc
index 57d50b0..1400bbb 100644
Binary files a/minillmflow/__pycache__/__init__.cpython-39.pyc and b/minillmflow/__pycache__/__init__.cpython-39.pyc differ
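For reference (not part of the diff): a minimal usage sketch of the synchronous API introduced above. It assumes the package is importable as "minillmflow"; the node classes, condition names, and shared_storage keys are illustrative only, not part of the library.

from minillmflow import Node, Flow

class CountWords(Node):
    def preprocess(self, shared_storage):
        # "text" is a hypothetical key the caller puts into shared storage
        return shared_storage.get("text", "")

    def process(self, shared_storage, prep_result):
        return len(prep_result.split())

    def postprocess(self, shared_storage, prep_result, proc_result):
        shared_storage["word_count"] = proc_result
        # the returned string picks the successor registered for that condition
        return "long" if proc_result > 3 else "short"

class Report(Node):
    def __init__(self, label):
        super().__init__()
        self.label = label

    def process(self, shared_storage, prep_result):
        print(f"{self.label}: {shared_storage['word_count']} words")

count = CountWords()
count("long") >> Report("long text")    # conditional successor: node("condition") >> next_node
count("short") >> Report("short text")

shared = {"text": "a small example sentence"}
Flow(start_node=count).run(shared)      # runs count, then the Report node matching the condition

The plain node1 >> node2 form registers the "default" transition, which is what BaseNode.postprocess returns when a subclass does not override it.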