diff --git a/.cursor/rules/utility_function/viz.mdc b/.cursor/rules/utility_function/viz.mdc index 0c0b669..560099b 100644 --- a/.cursor/rules/utility_function/viz.mdc +++ b/.cursor/rules/utility_function/viz.mdc @@ -1,4 +1,9 @@ --- +description: +globs: +alwaysApply: false +--- +--- description: Guidelines for using PocketFlow, Utility Function, Viz and Debug globs: alwaysApply: false @@ -87,7 +92,146 @@ graph LR end ``` -## 2. Call Stack Debugging +## 2. Interactive D3.js Visualization + +For more complex flows, a static diagram may not be sufficient. We provide a D3.js-based interactive visualization that allows for dragging nodes, showing group boundaries for flows, and connecting flows at their boundaries. + +### Converting Flow to JSON + +First, we convert the PocketFlow graph to JSON format suitable for D3.js: + +```python +def flow_to_json(start): + """Convert a flow to JSON format suitable for D3.js visualization. + + This function walks through the flow graph and builds a structure with: + - nodes: All non-Flow nodes with their group memberships + - links: Connections between nodes within the same group + - group_links: Connections between different groups (for inter-flow connections) + - flows: Flow information for group labeling + """ + nodes = [] + links = [] + group_links = [] # For connections between groups (Flow to Flow) + ids = {} + node_types = {} + flow_nodes = {} # Keep track of flow nodes + ctr = 1 + + # Implementation details... + + # Post-processing: Generate group links based on node connections between different groups + node_groups = {n["id"]: n["group"] for n in nodes} + filtered_links = [] + + # Filter out direct node-to-node connections between different groups + for link in links: + source_id = link["source"] + target_id = link["target"] + source_group = node_groups.get(source_id, 0) + target_group = node_groups.get(target_id, 0) + + if source_group != target_group and source_group > 0 and target_group > 0: + # Create group-to-group links instead of node-to-node links across groups + if not any(gl["source"] == source_group and gl["target"] == target_group + for gl in group_links): + group_links.append({ + "source": source_group, + "target": target_group, + "action": link["action"] + }) + # Skip adding this link to filtered_links - we don't want direct node connections across groups + else: + # Keep links within the same group + filtered_links.append(link) + + return { + "nodes": nodes, + "links": filtered_links, + "group_links": group_links, + "flows": {str(k): v.__class__.__name__ for k, v in flow_nodes.items()}, + } +``` + +### Creating the Visualization + +Then, we generate an HTML file with D3.js visualization: + +```python +def create_d3_visualization(json_data, output_dir="./viz", filename="flow_viz"): + """Create a D3.js visualization from JSON data.""" + # Create output directory + os.makedirs(output_dir, exist_ok=True) + + # Save JSON data to file + json_path = os.path.join(output_dir, f"{filename}.json") + with open(json_path, "w") as f: + json.dump(json_data, f, indent=2) + + # Generate HTML with D3.js visualization + # ...HTML template with D3.js code... + + # Key features implemented in the visualization: + # 1. Nodes can be dragged to reorganize the layout + # 2. Flows are shown as dashed rectangles (groups) + # 3. Inter-group connections shown as dashed lines connecting at group boundaries + # 4. Edge labels show transition actions + + # Write HTML to file + html_path = os.path.join(output_dir, f"{filename}.html") + with open(html_path, "w") as f: + f.write(html_content) + + print(f"Visualization created at {html_path}") + return html_path +``` + +### Convenience Function + +A convenience function to visualize flows: + +```python +def visualize_flow(flow, flow_name): + """Helper function to visualize a flow with both mermaid and D3.js""" + print(f"\n--- {flow_name} Mermaid Diagram ---") + print(build_mermaid(start=flow)) + + print(f"\n--- {flow_name} D3.js Visualization ---") + json_data = flow_to_json(flow) + create_d3_visualization( + json_data, filename=f"{flow_name.lower().replace(' ', '_')}" + ) +``` + +### Usage Example + +```python +from visualize import visualize_flow + +# Create a complex flow with nested subflows +# ...flow definition... + +# Generate visualization +visualize_flow(data_science_flow, "Data Science Flow") +``` + +### Customizing the Visualization + +You can customize the visualization by adjusting the force simulation parameters: + +```javascript +const simulation = d3.forceSimulation(data.nodes) + // Controls the distance between connected nodes + .force("link", d3.forceLink(data.links).id(d => d.id).distance(100)) + // Controls how nodes repel each other - lower values bring nodes closer + .force("charge", d3.forceManyBody().strength(-30)) + // Centers the entire graph in the SVG + .force("center", d3.forceCenter(width / 2, height / 2)) + // Prevents nodes from overlapping - acts like a minimum distance + .force("collide", d3.forceCollide().radius(50)); +``` + +## 3. Call Stack Debugging It would be useful to print the Node call stacks for debugging. This can be achieved by inspecting the runtime call stack: diff --git a/cookbook/pocketflow-visualization-d3/README.md b/cookbook/pocketflow-visualization-d3/README.md new file mode 100644 index 0000000..2788a8d --- /dev/null +++ b/cookbook/pocketflow-visualization-d3/README.md @@ -0,0 +1,131 @@ +# PocketFlow Visualization + +This directory contains tools for visualizing PocketFlow workflow graphs using interactive D3.js visualizations. + +## Overview + +The visualization tools allow you to: + +1. View PocketFlow nodes and flows as an interactive graph +2. See how different flows connect to each other +3. Understand the relationships between nodes within flows + +## Features + +- **Interactive Graph**: Nodes can be dragged to reorganize the layout +- **Group Visualization**: Flows are displayed as groups with dashed borders +- **Inter-Group Links**: Connections between flows are shown as dashed lines connecting group boundaries +- **Action Labels**: Edge labels show the actions that trigger transitions between nodes + +## Requirements + +- Python 3.6 or higher +- Modern web browser (Chrome, Firefox, Edge) for viewing the visualizations + +## Usage + +### 1. Basic Visualization + +To visualize a PocketFlow graph, you can use the `visualize_flow` function in `visualize.py`: + +```python +from visualize import visualize_flow +from your_flow_module import your_flow + +# Generate visualization +visualize_flow(your_flow, "Your Flow Name") +``` + +This will: +1. Print a Mermaid diagram to the console +2. Generate a D3.js visualization in the `./viz` directory + +### 2. Running the Example + +The included example shows an order processing pipeline with payment, inventory, and shipping flows: + +```bash +# Navigate to the directory +cd cookbook/pocketflow-minimal-flow2flow + +# Run the visualization script +python visualize.py +``` + +This will generate visualization files in the `./viz` directory. + +### 3. Viewing the Visualization + +After running the script: + +1. Host with + ``` + cd ./viz/ + ``` + +2. Interact with the visualization: + - **Drag nodes** to reorganize + - **Hover over nodes** to see node names + - **Observe connections** between nodes and flows + +## Customizing the Visualization + +### Adjusting Layout Parameters + +You can adjust the force simulation parameters in `visualize.py` to change how nodes and groups are positioned: + +```javascript +// Create a force simulation +const simulation = d3.forceSimulation(data.nodes) + // Controls the distance between connected nodes + .force("link", d3.forceLink(data.links).id(d => d.id).distance(100)) + // Controls how nodes repel each other - lower values bring nodes closer + .force("charge", d3.forceManyBody().strength(-30)) + // Centers the entire graph in the SVG + .force("center", d3.forceCenter(width / 2, height / 2)) + // Prevents nodes from overlapping - acts like a minimum distance + .force("collide", d3.forceCollide().radius(50)); +``` + +### Styling + +Adjust the CSS styles in the HTML template inside `create_d3_visualization` function to change colors, shapes, and other visual properties. + +## How It Works + +The visualization process consists of three main steps: + +1. **Flow to JSON Conversion**: The `flow_to_json` function traverses the PocketFlow graph and converts it to a structure with nodes, links, and group information. + +2. **D3.js Visualization**: The JSON data is used to create an interactive D3.js visualization with: + - Nodes represented as circles + - Flows represented as dashed rectangles containing nodes + - Links showing connections within and between flows + +3. **Group Boundary Connections**: The visualization calculates intersection points with group boundaries to ensure inter-group links connect at the borders rather than centers. + +## Extending the Visualization + +You can extend the visualization tools by: + +1. Adding new node shapes +2. Implementing additional layout algorithms +3. Adding tooltips with more detailed information +4. Creating animation for flow execution + +## Troubleshooting + +If you encounter any issues: + +- Make sure your flow objects are properly constructed with nodes connected correctly +- Check the browser console for any JavaScript errors +- Verify that the generated JSON data structure matches what you expect + +## Example Output + +The visualization displays: +- Payment processing flow nodes +- Inventory management flow nodes +- Shipping flow nodes +- Group boundaries around each flow +- Connections between flows (Payment → Inventory → Shipping) diff --git a/cookbook/pocketflow-visualization-d3/async_flow.py b/cookbook/pocketflow-visualization-d3/async_flow.py new file mode 100644 index 0000000..19c89be --- /dev/null +++ b/cookbook/pocketflow-visualization-d3/async_flow.py @@ -0,0 +1,165 @@ +from pocketflow import AsyncNode, AsyncFlow +import asyncio + + +# Define Payment Nodes +class ValidatePayment(AsyncNode): + async def exec_async(self, prep_res): + print("1.1.Validating payment...") + return "Payment validated successfully" + + async def post_async(self, shared, prep_res, exec_res): + shared["payment_status"] = exec_res + return "default" + + +class ProcessPayment(AsyncNode): + async def exec_async(self, prep_res): + print("1.2.Processing payment...") + return "Payment processed successfully" + + async def post_async(self, shared, prep_res, exec_res): + shared["payment_result"] = exec_res + return "default" + + +class PaymentConfirmation(AsyncNode): + async def exec_async(self, prep_res): + print("1.3.Confirming payment...") + return "Payment confirmed" + + async def post_async(self, shared, prep_res, exec_res): + shared["payment_confirmation"] = exec_res + return "default" + + +# Define Inventory Nodes +class CheckStock(AsyncNode): + async def exec_async(self, prep_res): + print("2.1.Checking inventory stock...") + return "Stock available" + + async def post_async(self, shared, prep_res, exec_res): + shared["stock_status"] = exec_res + return "default" + + +class ReserveItems(AsyncNode): + async def exec_async(self, prep_res): + print("2.2.Reserving items...") + return "Items reserved" + + async def post_async(self, shared, prep_res, exec_res): + shared["reservation_status"] = exec_res + return "default" + + +class UpdateInventory(AsyncNode): + async def exec_async(self, prep_res): + print("2.3. Updating inventory...") + return "Inventory updated" + + async def post_async(self, shared, prep_res, exec_res): + shared["inventory_update"] = exec_res + return "default" + + +# Define Shipping Nodes +class CreateLabel(AsyncNode): + async def exec_async(self, prep_res): + print("3.1 Creating shipping label...") + return "Shipping label created" + + async def post_async(self, shared, prep_res, exec_res): + shared["shipping_label"] = exec_res + return "default" + + +class AssignCarrier(AsyncNode): + async def exec_async(self, prep_res): + print("3.2 Assigning carrier...") + return "Carrier assigned" + + async def post_async(self, shared, prep_res, exec_res): + shared["carrier"] = exec_res + return "default" + + +class SchedulePickup(AsyncNode): + async def exec_async(self, prep_res): + print("3.3 Scheduling pickup...") + return "Pickup scheduled" + + async def post_async(self, shared, prep_res, exec_res): + shared["pickup_status"] = exec_res + return "default" + + +# Create node instances +validate_payment = ValidatePayment() +process_payment = ProcessPayment() +payment_confirmation = PaymentConfirmation() + +check_stock = CheckStock() +reserve_items = ReserveItems() +update_inventory = UpdateInventory() + +create_label = CreateLabel() +assign_carrier = AssignCarrier() +schedule_pickup = SchedulePickup() + +# Payment processing sub-flow +validate_payment >> process_payment >> payment_confirmation +payment_flow = AsyncFlow(start=validate_payment) + +# Inventory sub-flow +check_stock >> reserve_items >> update_inventory +inventory_flow = AsyncFlow(start=check_stock) + +# Shipping sub-flow +create_label >> assign_carrier >> schedule_pickup +shipping_flow = AsyncFlow(start=create_label) + +# Connect the flows into a main order pipeline +payment_flow >> inventory_flow >> shipping_flow +# payment_flow >> inventory_flow >> create_label +# payment_flow >> inventory_flow >> assign_carrier + + +# Create the master flow +class OrderFlow(AsyncFlow): + pass + + +order_pipeline = OrderFlow(start=payment_flow) + +# Create shared data structure +shared_data = { + "order_id": "ORD-12345", + "customer": "John Doe", + "items": [ + {"id": "ITEM-001", "name": "Smartphone", "price": 999.99, "quantity": 1}, + {"id": "ITEM-002", "name": "Phone case", "price": 29.99, "quantity": 1}, + ], + "shipping_address": { + "street": "123 Main St", + "city": "Anytown", + "state": "CA", + "zip": "12345", + }, +} + + +# Run the entire pipeline asynchronously +async def main(): + await order_pipeline.run_async(shared_data) + + # Print final status + print("\nOrder processing completed!") + print(f"Payment: {shared_data.get('payment_confirmation')}") + print(f"Inventory: {shared_data.get('inventory_update')}") + print(f"Shipping: {shared_data.get('pickup_status')}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/cookbook/pocketflow-visualization-d3/visualize.py b/cookbook/pocketflow-visualization-d3/visualize.py new file mode 100644 index 0000000..937a36b --- /dev/null +++ b/cookbook/pocketflow-visualization-d3/visualize.py @@ -0,0 +1,932 @@ +# %% + +import json +import os +import http.server +import socketserver +import threading +import webbrowser +import time +import socket +import importlib +import sys +from pathlib import Path +from typing import Any, Optional, Tuple, Union + +from pocketflow import Flow + +from async_flow import order_pipeline + + +def build_mermaid(start): + ids, visited, lines = {}, set(), ["graph LR"] + ctr = 1 + + def get_id(n): + nonlocal ctr + return ( + ids[n] if n in ids else (ids.setdefault(n, f"N{ctr}"), (ctr := ctr + 1))[0] + ) + + def link(a, b): + lines.append(f" {a} --> {b}") + + def walk(node, parent=None): + if node in visited: + return parent and link(parent, get_id(node)) + visited.add(node) + if isinstance(node, Flow): + node.start_node and parent and link(parent, get_id(node.start_node)) + lines.append( + f"\n subgraph sub_flow_{get_id(node)}[{type(node).__name__}]" + ) + node.start_node and walk(node.start_node) + for nxt in node.successors.values(): + node.start_node and walk(nxt, get_id(node.start_node)) or ( + parent and link(parent, get_id(nxt)) + ) or walk(nxt) + lines.append(" end\n") + else: + lines.append(f" {(nid := get_id(node))}['{type(node).__name__}']") + parent and link(parent, nid) + [walk(nxt, nid) for nxt in node.successors.values()] + + walk(start) + return "\n".join(lines) + + +def flow_to_json(start): + """Convert a flow to JSON format suitable for D3.js visualization. + + This function walks through the flow graph and builds a structure with: + - nodes: All non-Flow nodes with their group memberships + - links: Connections between nodes within the same group + - group_links: Connections between different groups (for inter-flow connections) + - flows: Flow information for group labeling + + Returns: + dict: A JSON-serializable dictionary with 'nodes' and 'links' arrays. + """ + nodes = [] + links = [] + group_links = [] # For connections between groups (Flow to Flow) + ids = {} + node_types = {} + flow_nodes = {} # Keep track of flow nodes + ctr = 1 + + def get_id(n): + nonlocal ctr + if n not in ids: + ids[n] = ctr + node_types[ctr] = type(n).__name__ + if isinstance(n, Flow): + flow_nodes[ctr] = n # Store flow reference + ctr += 1 + return ids[n] + + def walk(node, parent=None, group=None, parent_group=None, action=None): + """Recursively walk the flow graph to build the visualization data. + + Args: + node: Current node being processed + parent: ID of the parent node that connects to this node + group: Group (Flow) ID this node belongs to + parent_group: Group ID of the parent node + action: Action label on the edge from parent to this node + """ + node_id = get_id(node) + + # Add node if not already in nodes list and not a Flow + if not any(n["id"] == node_id for n in nodes) and not isinstance(node, Flow): + node_data = { + "id": node_id, + "name": node_types[node_id], + "group": group or 0, # Default group + } + nodes.append(node_data) + + # Add link from parent if exists + if parent and not isinstance(node, Flow): + links.append( + {"source": parent, "target": node_id, "action": action or "default"} + ) + + # Process different types of nodes + if isinstance(node, Flow): + # This is a Flow node - it becomes a group container + flow_group = node_id # Use flow's ID as group for contained nodes + + # Add a group-to-group link if this flow has a parent group + # This creates connections between nested flows + if parent_group is not None and parent_group != flow_group: + # Check if this link already exists + if not any( + l["source"] == parent_group and l["target"] == flow_group + for l in group_links + ): + group_links.append( + { + "source": parent_group, + "target": flow_group, + "action": action or "default", + } + ) + + if node.start_node: + # Process the start node of this flow + walk(node.start_node, parent, flow_group, parent_group, action) + + # Process successors of the flow's start node + for next_action, nxt in node.successors.items(): + walk( + nxt, + get_id(node.start_node), + flow_group, + parent_group, + next_action, + ) + else: + # Process successors for regular nodes + for next_action, nxt in node.successors.items(): + if isinstance(nxt, Flow): + # This node connects to a flow - track the group relationship + flow_group_id = get_id(nxt) + walk(nxt, node_id, None, group, next_action) + else: + # Regular node-to-node connection + walk(nxt, node_id, group, parent_group, next_action) + + # Start the traversal + walk(start) + + # Post-processing: Generate group links based on node connections between different groups + # This ensures that when nodes in different groups are connected, we show a group-to-group + # link rather than a direct node-to-node link + node_groups = {n["id"]: n["group"] for n in nodes} + filtered_links = [] + + for link in links: + source_id = link["source"] + target_id = link["target"] + source_group = node_groups.get(source_id, 0) + target_group = node_groups.get(target_id, 0) + + # If source and target are in different groups and both groups are valid + if source_group != target_group and source_group > 0 and target_group > 0: + # Add to group links if not already there + # This creates the dashed lines connecting group boxes + if not any( + gl["source"] == source_group and gl["target"] == target_group + for gl in group_links + ): + group_links.append( + { + "source": source_group, + "target": target_group, + "action": link["action"], + } + ) + # Skip adding this link to filtered_links - we don't want direct node connections across groups + else: + # Keep links within the same group + filtered_links.append(link) + + return { + "nodes": nodes, + "links": filtered_links, # Use filtered links instead of all links + "group_links": group_links, + "flows": {str(k): v.__class__.__name__ for k, v in flow_nodes.items()}, + } + + +def create_d3_visualization( + json_data, + output_dir="./viz", + filename="flow_viz", + html_title="PocketFlow Visualization", +): + """Create a D3.js visualization from JSON data. + + Args: + json_data: The JSON data for the visualization + output_dir: Directory to save the files + filename: Base filename (without extension) + html_title: Title for the HTML page + + Returns: + str: Path to the HTML file + """ + # Create output directory if it doesn't exist + os.makedirs(output_dir, exist_ok=True) + + # Save JSON data to file + json_path = os.path.join(output_dir, f"{filename}.json") + with open(json_path, "w") as f: + json.dump(json_data, f, indent=2) + + # Create HTML file with D3.js visualization + html_content = r""" + +
+ +