From 129b9b07c7bd9b87fe55c909930311b33983b733 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A3=AB=E6=9C=AC?= Date: Wed, 13 Aug 2025 23:52:23 +0800 Subject: [PATCH] FIX: RecursionError when loop flow --- .../async_loop_flow.py | 73 +++++++++++++++ .../pocketflow-visualization/visualize.py | 93 +++++++++++++++++-- 2 files changed, 160 insertions(+), 6 deletions(-) create mode 100644 cookbook/pocketflow-visualization/async_loop_flow.py diff --git a/cookbook/pocketflow-visualization/async_loop_flow.py b/cookbook/pocketflow-visualization/async_loop_flow.py new file mode 100644 index 0000000..23a0ea3 --- /dev/null +++ b/cookbook/pocketflow-visualization/async_loop_flow.py @@ -0,0 +1,73 @@ +from async_flow import * +from pocketflow import Flow, AsyncParallelBatchNode, Node + +# Create node instances +validate_payment = ValidatePayment() +process_payment = ProcessPayment() +payment_confirmation = PaymentConfirmation() + +check_stock = CheckStock() +reserve_items = ReserveItems() +update_inventory = UpdateInventory() + +create_label = CreateLabel() +assign_carrier = AssignCarrier() +schedule_pickup = SchedulePickup() + +# Payment processing sub-flow +validate_payment >> process_payment +validate_payment - "out_of_stock" >> validate_payment # 循环重试 +process_payment - 'something fail' >> validate_payment +process_payment - 'pass' >> payment_confirmation +payment_flow = AsyncFlow(start=validate_payment) + +# Inventory sub-flow +check_stock >> reserve_items >> update_inventory +inventory_flow = AsyncFlow(start=check_stock) + +# Shipping sub-flow +create_label >> assign_carrier >> schedule_pickup +shipping_flow = AsyncFlow(start=create_label) + +# Connect the flows into a main order pipeline +payment_flow >> inventory_flow >> shipping_flow +# payment_flow >> inventory_flow >> create_label +# payment_flow >> inventory_flow >> assign_carrier + + +# Create the master flow +class OrderFlow(AsyncFlow): + pass + +order_pipeline = OrderFlow(start=payment_flow) + +# Create shared data structure +shared_data = { + "order_id": "ORD-12345", + "customer": "John Doe", + "items": [ + {"id": "ITEM-001", "name": "Smartphone", "price": 999.99, "quantity": 1}, + {"id": "ITEM-002", "name": "Phone case", "price": 29.99, "quantity": 1}, + ], + "shipping_address": { + "street": "123 Main St", + "city": "Anytown", + "state": "CA", + "zip": "12345", + }, +} + + +# Run the entire pipeline asynchronously +async def main(): + await order_pipeline.run_async(shared_data) + + # Print final status + print("\nOrder processing completed!") + print(f"Payment: {shared_data.get('payment_confirmation')}") + print(f"Inventory: {shared_data.get('inventory_update')}") + print(f"Shipping: {shared_data.get('pickup_status')}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/cookbook/pocketflow-visualization/visualize.py b/cookbook/pocketflow-visualization/visualize.py index 6caf215..fe7fbb2 100644 --- a/cookbook/pocketflow-visualization/visualize.py +++ b/cookbook/pocketflow-visualization/visualize.py @@ -77,6 +77,7 @@ def flow_to_json(start): node_types = {} flow_nodes = {} # Keep track of flow nodes ctr = 1 + visited = set() def get_id(n): nonlocal ctr @@ -99,6 +100,9 @@ def flow_to_json(start): action: Action label on the edge from parent to this node """ node_id = get_id(node) + if (node_id, action) in visited: + return + visited.add((node_id, action)) # Add node if not already in nodes list and not a Flow if not any(n["id"] == node_id for n in nodes) and not isinstance(node, Flow): @@ -552,8 +556,38 @@ def create_d3_visualization( // Update positions on each tick simulation.on("tick", () => { - // Update links with straight lines + // Update links with curved paths for bidirectional connections link.attr("d", d => { + // Handle self-referencing links with a water-drop shape + if (d.source === d.target) { + const nodeX = d.source.x; + const nodeY = d.source.y; + const offsetX = 40; + const offsetY = 10; + const controlOffset = 50; + + // Create a water-drop shaped path + return `M ${nodeX},${nodeY - 5} + C ${nodeX + controlOffset},${nodeY - 30} + ${nodeX + offsetX},${nodeY + offsetY} + ${nodeX},${nodeY}`; + } + + // Check if there's a reverse connection + const isReverse = data.links.some(l => + l.source === d.target && l.target === d.source + ); + + // If it's part of a bidirectional connection, curve the path + if (isReverse) { + const dx = d.target.x - d.source.x; + const dy = d.target.y - d.source.y; + const dr = Math.sqrt(dx * dx + dy * dy) * 0.9; + + return `M${d.source.x},${d.source.y}A${dr},${dr} 0 0,1 ${d.target.x},${d.target.y}`; + } + + // For unidirectional connections, use straight lines return `M${d.source.x},${d.source.y} L${d.target.x},${d.target.y}`; }); @@ -567,10 +601,57 @@ def create_d3_visualization( .attr("x", d => d.x) .attr("y", d => d.y); - // Position link labels at midpoint - linkLabel - .attr("x", d => (d.source.x + d.target.x) / 2) - .attr("y", d => (d.source.y + d.target.y) / 2); + // Position link labels with offset for bidirectional connections + linkLabel.attr("x", d => { + // Handle self-referencing links + if (d.source === d.target) { + return d.source.x + 30; + } + + // Check if there's a reverse connection + const reverseLink = data.links.find(l => + l.source === d.target && l.target === d.source + ); + + // If it's part of a bidirectional connection, offset the label + if (reverseLink) { + const dx = d.target.x - d.source.x; + const dy = d.target.y - d.source.y; + // Calculate perpendicular offset + const length = Math.sqrt(dx * dx + dy * dy); + const offsetX = -dy / length * 10; // Perpendicular offset + + return (d.source.x + d.target.x) / 2 + offsetX; + } + + // For unidirectional connections, use midpoint + return (d.source.x + d.target.x) / 2; + }) + .attr("y", d => { + // Handle self-referencing links + if (d.source === d.target) { + return d.source.y; + } + + // Check if there's a reverse connection + const reverseLink = data.links.find(l => + l.source === d.target && l.target === d.source + ); + + // If it's part of a bidirectional connection, offset the label + if (reverseLink) { + const dx = d.target.x - d.source.x; + const dy = d.target.y - d.source.y; + // Calculate perpendicular offset + const length = Math.sqrt(dx * dx + dy * dy); + const offsetY = dx / length * 10; // Perpendicular offset + + return (d.source.y + d.target.y) / 2 + offsetY; + } + + // For unidirectional connections, use midpoint + return (d.source.y + d.target.y) / 2; + }); // Update group containers groupContainers.each(function(d) { @@ -893,7 +974,7 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="Visualize a PocketFlow flow") parser.add_argument( - "--module", default="async_flow", help="Module containing the flow" + "--module", default="async_loop_flow", help="Module containing the flow" ) parser.add_argument( "--flow", default="order_pipeline", help="Flow variable name in the module"