From f631aaa4d33d8b34eabff66a2570145d10a06188 Mon Sep 17 00:00:00 2001 From: zachary62 Date: Wed, 25 Dec 2024 23:02:32 +0000 Subject: [PATCH] tests --- tests/test_async_flow.py | 154 ++++++++++++++++++++++++++++++++ tests/test_flow_basic.py | 156 +++++++++++++++++++++++++++++++++ tests/test_flow_composition.py | 135 ++++++++++++++++++++++++++++ 3 files changed, 445 insertions(+) create mode 100644 tests/test_async_flow.py create mode 100644 tests/test_flow_basic.py create mode 100644 tests/test_flow_composition.py diff --git a/tests/test_async_flow.py b/tests/test_async_flow.py new file mode 100644 index 0000000..a93c6a1 --- /dev/null +++ b/tests/test_async_flow.py @@ -0,0 +1,154 @@ +import unittest +import asyncio +import sys +from pathlib import Path + +sys.path.append(str(Path(__file__).parent.parent)) +from minillmflow import Node, AsyncNode, AsyncFlow + + +class AsyncNumberNode(AsyncNode): + """ + Simple async node that sets 'current' to a given number. + Demonstrates overriding .process() (sync) and using + postprocess_async() for the async portion. + """ + def __init__(self, number): + super().__init__() + self.number = number + + def process(self, shared_storage, data): + # Synchronous work is allowed inside an AsyncNode, + # but final 'condition' is determined by postprocess_async(). + shared_storage['current'] = self.number + return "set_number" + + async def postprocess_async(self, shared_storage, prep_result, proc_result): + # Possibly do asynchronous tasks here + await asyncio.sleep(0.01) + # Return a condition for the flow + return "number_set" + + +class AsyncIncrementNode(AsyncNode): + """ + Demonstrates incrementing the 'current' value asynchronously. + """ + def process(self, shared_storage, data): + shared_storage['current'] = shared_storage.get('current', 0) + 1 + return "incremented" + + async def postprocess_async(self, shared_storage, prep_result, proc_result): + await asyncio.sleep(0.01) # simulate async I/O + return "done" + + +class TestAsyncNode(unittest.TestCase): + """ + Test the AsyncNode (and descendants) in isolation (not in a flow). + """ + def test_async_number_node_direct_call(self): + """ + Even though AsyncNumberNode is designed for an async flow, + we can still test it directly by calling run_async(). + """ + async def run_node(): + node = AsyncNumberNode(42) + shared_storage = {} + condition = await node.run_async(shared_storage) + return shared_storage, condition + + shared_storage, condition = asyncio.run(run_node()) + self.assertEqual(shared_storage['current'], 42) + self.assertEqual(condition, "number_set") + + def test_async_increment_node_direct_call(self): + async def run_node(): + node = AsyncIncrementNode() + shared_storage = {'current': 10} + condition = await node.run_async(shared_storage) + return shared_storage, condition + + shared_storage, condition = asyncio.run(run_node()) + self.assertEqual(shared_storage['current'], 11) + self.assertEqual(condition, "done") + + +class TestAsyncFlow(unittest.TestCase): + """ + Test how AsyncFlow orchestrates multiple async nodes. + """ + def test_simple_async_flow(self): + """ + Flow: + 1) AsyncNumberNode(5) -> sets 'current' to 5 + 2) AsyncIncrementNode() -> increments 'current' to 6 + """ + + # Create our nodes + start_node = AsyncNumberNode(5) + inc_node = AsyncIncrementNode() + + # Chain them: start_node >> inc_node + start_node - "number_set" >> inc_node + + # Create an AsyncFlow with start_node + flow = AsyncFlow(start_node) + + # We'll run the flow synchronously (which under the hood is asyncio.run()) + shared_storage = {} + flow.run(shared_storage) + + self.assertEqual(shared_storage['current'], 6) + + def test_async_flow_branching(self): + """ + Demonstrate a branching scenario where we return different + conditions. For example, you could have an async node that + returns "go_left" or "go_right" in postprocess_async, but here + we'll keep it simpler for demonstration. + """ + + class BranchingAsyncNode(AsyncNode): + def process(self, shared_storage, data): + value = shared_storage.get("value", 0) + shared_storage["value"] = value + # We'll decide branch based on whether 'value' is positive + return None + + async def postprocess_async(self, shared_storage, prep_result, proc_result): + await asyncio.sleep(0.01) + if shared_storage["value"] >= 0: + return "positive_branch" + else: + return "negative_branch" + + class PositiveNode(Node): + def process(self, shared_storage, data): + shared_storage["path"] = "positive" + return None + + class NegativeNode(Node): + def process(self, shared_storage, data): + shared_storage["path"] = "negative" + return None + + shared_storage = {"value": 10} + + start_node = BranchingAsyncNode() + positive_node = PositiveNode() + negative_node = NegativeNode() + + # Condition-based chaining + start_node - "positive_branch" >> positive_node + start_node - "negative_branch" >> negative_node + + flow = AsyncFlow(start_node) + flow.run(shared_storage) + + self.assertEqual(shared_storage["path"], "positive", + "Should have taken the positive branch") + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_flow_basic.py b/tests/test_flow_basic.py new file mode 100644 index 0000000..f852613 --- /dev/null +++ b/tests/test_flow_basic.py @@ -0,0 +1,156 @@ +import unittest +import sys +from pathlib import Path + +sys.path.append(str(Path(__file__).parent.parent)) +from minillmflow import Node, Flow + +class NumberNode(Node): + def __init__(self, number): + super().__init__() + self.number = number + + def process(self, shared_storage, data): + shared_storage['current'] = self.number + +class AddNode(Node): + def __init__(self, number): + super().__init__() + self.number = number + + def process(self, shared_storage, data): + shared_storage['current'] += self.number + +class MultiplyNode(Node): + def __init__(self, number): + super().__init__() + self.number = number + + def process(self, shared_storage, data): + shared_storage['current'] *= self.number + +class CheckPositiveNode(Node): + def postprocess(self, shared_storage, prep_result, proc_result): + if shared_storage['current'] >= 0: + return 'positive' + else: + return 'negative' + +class NoOpNode(Node): + def process(self, shared_storage, data): + # Do nothing, just pass + pass + +class TestNode(unittest.TestCase): + def test_single_number(self): + shared_storage = {} + start = NumberNode(5) + pipeline = Flow(start_node=start) + pipeline.run(shared_storage) + self.assertEqual(shared_storage['current'], 5) + + def test_sequence(self): + """ + Test a simple linear pipeline: + NumberNode(5) -> AddNode(3) -> MultiplyNode(2) + + Expected result: + (5 + 3) * 2 = 16 + """ + shared_storage = {} + n1 = NumberNode(5) + n2 = AddNode(3) + n3 = MultiplyNode(2) + + # Chain them in sequence using the >> operator + n1 >> n2 >> n3 + + pipeline = Flow(start_node=n1) + pipeline.run(shared_storage) + + self.assertEqual(shared_storage['current'], 16) + + def test_branching_positive(self): + """ + Test a branching pipeline with positive route: + start = NumberNode(5) + check = CheckPositiveNode() + if 'positive' -> AddNode(10) + if 'negative' -> AddNode(-20) + + Since we start with 5, + check returns 'positive', + so we add 10. Final result = 15. + """ + shared_storage = {} + start = NumberNode(5) + check = CheckPositiveNode() + add_if_positive = AddNode(10) + add_if_negative = AddNode(-20) + + start >> check + + # Use the new dash operator for condition + check - "positive" >> add_if_positive + check - "negative" >> add_if_negative + + pipeline = Flow(start_node=start) + pipeline.run(shared_storage) + + self.assertEqual(shared_storage['current'], 15) + + def test_negative_branch(self): + """ + Same branching pipeline, but starting with -5. + That should return 'negative' from CheckPositiveNode + and proceed to add_if_negative, i.e. add -20. + + Final result: (-5) + (-20) = -25. + """ + shared_storage = {} + start = NumberNode(-5) + check = CheckPositiveNode() + add_if_positive = AddNode(10) + add_if_negative = AddNode(-20) + + # Build the flow + start >> check + check - "positive" >> add_if_positive + check - "negative" >> add_if_negative + + pipeline = Flow(start_node=start) + pipeline.run(shared_storage) + + # Should have gone down the 'negative' branch + self.assertEqual(shared_storage['current'], -25) + + def test_cycle_until_negative(self): + """ + Demonstrate a cyclical pipeline: + Start with 10, check if positive -> subtract 3, then go back to check. + Repeat until the number becomes negative, at which point pipeline ends. + """ + shared_storage = {} + n1 = NumberNode(10) + check = CheckPositiveNode() + subtract3 = AddNode(-3) + no_op = NoOpNode() # Dummy node for the 'negative' branch + + # Build the cycle: + # n1 -> check -> if 'positive': subtract3 -> back to check + n1 >> check + check - 'positive' >> subtract3 + subtract3 >> check + + # Attach a no-op node on the negative branch to avoid warning + check - 'negative' >> no_op + + pipeline = Flow(start_node=n1) + pipeline.run(shared_storage) + + # final result should be -2: (10 -> 7 -> 4 -> 1 -> -2) + self.assertEqual(shared_storage['current'], -2) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_flow_composition.py b/tests/test_flow_composition.py new file mode 100644 index 0000000..4381987 --- /dev/null +++ b/tests/test_flow_composition.py @@ -0,0 +1,135 @@ +import unittest +import asyncio +import sys +from pathlib import Path + +sys.path.append(str(Path(__file__).parent.parent)) +from minillmflow import Node, Flow + +class NumberNode(Node): + def __init__(self, number): + super().__init__() + self.number = number + + def process(self, shared_storage, prep_result): + shared_storage['current'] = self.number + +class AddNode(Node): + def __init__(self, number): + super().__init__() + self.number = number + + def process(self, shared_storage, prep_result): + shared_storage['current'] += self.number + +class MultiplyNode(Node): + def __init__(self, number): + super().__init__() + self.number = number + + def process(self, shared_storage, prep_result): + shared_storage['current'] *= self.number + + +class TestFlowComposition(unittest.TestCase): + + def test_flow_as_node(self): + """ + Demonstrates that a Flow can itself be chained like a Node. + We create a flow (f1) that starts with NumberNode(5) -> AddNode(10). + Then we chain f1 >> MultiplyNode(2). + + Expected result after running from f1: + start = 5 + 5 + 10 = 15 + 15 * 2 = 30 + """ + shared_storage = {} + + # Inner flow f1 + f1 = Flow(start_node=NumberNode(5)) + f1 >> AddNode(10) + + # Then chain a node after the flow + f1 >> MultiplyNode(2) + + # Run from f1 + f1.run(shared_storage) + + self.assertEqual(shared_storage['current'], 30) + + def test_nested_flow(self): + """ + Demonstrates embedding one Flow inside another Flow. + inner_flow: NumberNode(5) -> AddNode(3) + outer_flow: starts with inner_flow -> MultiplyNode(4) + + Expected result: + (5 + 3) * 4 = 32 + """ + shared_storage = {} + + # Define an inner flow + inner_flow = Flow(start_node=NumberNode(5)) + inner_flow >> AddNode(3) + + # Define an outer flow, whose start node is inner_flow + outer_flow = Flow(start_node=inner_flow) + outer_flow >> MultiplyNode(4) + + # Run outer_flow + outer_flow.run(shared_storage) + + self.assertEqual(shared_storage['current'], 32) # (5+3)*4=32 + + def test_flow_chaining_flows(self): + """ + Demonstrates chaining one flow to another flow. + flow1: NumberNode(10) -> AddNode(10) # final shared_storage['current'] = 20 + flow2: MultiplyNode(2) # final shared_storage['current'] = 40 + + flow1 >> flow2 means once flow1 finishes, flow2 starts. + + Expected result: (10 + 10) * 2 = 40 + """ + shared_storage = {} + + # flow1 + flow1 = Flow(start_node=NumberNode(10)) + flow1 >> AddNode(10) + + # flow2 + flow2 = Flow(start_node=MultiplyNode(2)) + + # Chain them: flow1 >> flow2 + flow1 >> flow2 + + # Start running from flow1 + flow1.run(shared_storage) + + self.assertEqual(shared_storage['current'], 40) + + def test_flow_with_parameters(self): + """ + Demonstrates passing parameters into a Flow (and retrieved by a Node). + """ + + class ParamNode(Node): + def process(self, shared_storage, prep_result): + # Reads 'level' from the node's (or flow's) parameters + shared_storage['param'] = self.parameters.get('level', 'no param') + + shared_storage = {} + + # Create a flow with a ParamNode + f = Flow(start_node=ParamNode()) + # Set parameters on the flow + f.parameters = {'level': 'Level 1'} + + f.run(shared_storage) + + self.assertEqual(shared_storage['param'], 'Level 1') + + +if __name__ == '__main__': + unittest.main()