diff --git a/cookbook/README.md b/cookbook/README.md index d77fb5d..1a75946 100644 --- a/cookbook/README.md +++ b/cookbook/README.md @@ -20,6 +20,7 @@ | [Thinking](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-thinking) | ★☆☆
*Beginner* | Solve complex reasoning problems through Chain-of-Thought | | [Memory](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-memory) | ★☆☆
*Beginner* | A chat bot with short-term and long-term memory | | [MCP](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-mcp) | ★☆☆
*Beginner* | Agent using Model Context Protocol for numerical operations | +| [Tracing](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-tracing) | ★☆☆
*Beginner* | Trace and visualize the execution of your flow | diff --git a/cookbook/pocketflow-tracing/.env.example b/cookbook/pocketflow-tracing/.env.example new file mode 100644 index 0000000..21f8686 --- /dev/null +++ b/cookbook/pocketflow-tracing/.env.example @@ -0,0 +1,20 @@ +# PocketFlow Tracing Configuration Template +# Copy this file to .env and replace the placeholder values with your actual Langfuse credentials + +# Required Langfuse configuration +LANGFUSE_SECRET_KEY=your-langfuse-secret-key +LANGFUSE_PUBLIC_KEY=your-langfuse-public-key +LANGFUSE_HOST=your-langfuse-host-url + +# Optional tracing configuration +POCKETFLOW_TRACING_DEBUG=true +POCKETFLOW_TRACE_INPUTS=true +POCKETFLOW_TRACE_OUTPUTS=true +POCKETFLOW_TRACE_PREP=true +POCKETFLOW_TRACE_EXEC=true +POCKETFLOW_TRACE_POST=true +POCKETFLOW_TRACE_ERRORS=true + +# Optional session/user tracking +POCKETFLOW_SESSION_ID=your-session-id +POCKETFLOW_USER_ID=your-user-id diff --git a/cookbook/pocketflow-tracing/README.md b/cookbook/pocketflow-tracing/README.md new file mode 100644 index 0000000..e5c4a79 --- /dev/null +++ b/cookbook/pocketflow-tracing/README.md @@ -0,0 +1,290 @@ +# PocketFlow Tracing with Langfuse + +This cookbook provides comprehensive observability for PocketFlow workflows using [Langfuse](https://langfuse.com/) as the tracing backend. With minimal code changes (just adding a decorator), you can automatically trace all node executions, inputs, outputs, and errors in your PocketFlow workflows. + +## 🎯 Features + +- **Automatic Tracing**: Trace entire flows with a single decorator +- **Node-Level Observability**: Automatically trace `prep`, `exec`, and `post` phases of each node +- **Input/Output Tracking**: Capture all data flowing through your workflow +- **Error Tracking**: Automatically capture and trace exceptions +- **Async Support**: Full support for AsyncFlow and AsyncNode +- **Minimal Code Changes**: Just add `@trace_flow()` to your flow classes +- **Langfuse Integration**: Leverage Langfuse's powerful observability platform + +## 🚀 Quick Start + +### 1. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +### 2. Environment Setup + +Copy the example environment file and configure your Langfuse credentials: + +```bash +cp .env.example .env +``` + +Then edit the `.env` file with your actual Langfuse configuration: + +```env +LANGFUSE_SECRET_KEY=your-langfuse-secret-key +LANGFUSE_PUBLIC_KEY=your-langfuse-public-key +LANGFUSE_HOST=your-langfuse-host-url +POCKETFLOW_TRACING_DEBUG=true +``` + +**Note**: Replace the placeholder values with your actual Langfuse credentials and host URL. + +### 3. Basic Usage + +```python +from pocketflow import Node, Flow +from tracing import trace_flow + +class MyNode(Node): + def prep(self, shared): + return shared["input"] + + def exec(self, data): + return f"Processed: {data}" + + def post(self, shared, prep_res, exec_res): + shared["output"] = exec_res + return "default" + +@trace_flow() # 🎉 That's it! Your flow is now traced +class MyFlow(Flow): + def __init__(self): + super().__init__(start=MyNode()) + +# Run your flow - tracing happens automatically +flow = MyFlow() +shared = {"input": "Hello World"} +flow.run(shared) +``` + +## 📊 What Gets Traced + +When you apply the `@trace_flow()` decorator, the system automatically traces: + +### Flow Level +- **Flow Start/End**: Overall execution time and status +- **Input Data**: Initial shared state when flow starts +- **Output Data**: Final shared state when flow completes +- **Errors**: Any exceptions that occur during flow execution + +### Node Level +For each node in your flow, the system traces: + +- **prep() Phase**: + - Input: `shared` data + - Output: `prep_res` returned by prep method + - Execution time and any errors + +- **exec() Phase**: + - Input: `prep_res` from prep phase + - Output: `exec_res` returned by exec method + - Execution time and any errors + - Retry attempts (if configured) + +- **post() Phase**: + - Input: `shared`, `prep_res`, `exec_res` + - Output: Action string returned + - Execution time and any errors + +## 🔧 Configuration Options + +### Basic Configuration + +```python +from tracing import trace_flow, TracingConfig + +# Use environment variables (default) +@trace_flow() +class MyFlow(Flow): + pass + +# Custom flow name +@trace_flow(flow_name="CustomFlowName") +class MyFlow(Flow): + pass + +# Custom session and user IDs +@trace_flow(session_id="session-123", user_id="user-456") +class MyFlow(Flow): + pass +``` + +### Advanced Configuration + +```python +from tracing import TracingConfig + +# Create custom configuration +config = TracingConfig( + langfuse_secret_key="your-secret-key", + langfuse_public_key="your-public-key", + langfuse_host="https://your-langfuse-instance.com", + debug=True, + trace_inputs=True, + trace_outputs=True, + trace_errors=True +) + +@trace_flow(config=config) +class MyFlow(Flow): + pass +``` + +## 📁 Examples + +### Basic Synchronous Flow +See `examples/basic_example.py` for a complete example of tracing a simple synchronous flow. + +```bash +cd examples +python basic_example.py +``` + +### Asynchronous Flow +See `examples/async_example.py` for tracing AsyncFlow and AsyncNode. + +```bash +cd examples +python async_example.py +``` + +## 🔍 Viewing Traces + +After running your traced flows, visit your Langfuse dashboard to view the traces: + +**Dashboard URL**: Use the URL you configured in `LANGFUSE_HOST` environment variable + +In the dashboard you'll see: +- **Traces**: One trace per flow execution +- **Spans**: Individual node phases (prep, exec, post) +- **Input/Output Data**: All data flowing through your workflow +- **Performance Metrics**: Execution times for each phase +- **Error Details**: Stack traces and error messages + +The tracings in examples. +![alt text](screenshots/chrome_2025-06-27_12-05-28.png) + +Detailed tracing for a node. +![langfuse](screenshots/chrome_2025-06-27_12-07-56.png) + +## 🛠️ Advanced Usage + +### Custom Tracer Configuration + +```python +from tracing import TracingConfig, LangfuseTracer + +# Create custom configuration +config = TracingConfig.from_env() +config.debug = True + +# Use tracer directly (for advanced use cases) +tracer = LangfuseTracer(config) +``` + +### Environment Variables + +You can customize tracing behavior with these environment variables: + +```env +# Required Langfuse configuration +LANGFUSE_SECRET_KEY=your-secret-key +LANGFUSE_PUBLIC_KEY=your-public-key +LANGFUSE_HOST=your-langfuse-host + +# Optional tracing configuration +POCKETFLOW_TRACING_DEBUG=true +POCKETFLOW_TRACE_INPUTS=true +POCKETFLOW_TRACE_OUTPUTS=true +POCKETFLOW_TRACE_PREP=true +POCKETFLOW_TRACE_EXEC=true +POCKETFLOW_TRACE_POST=true +POCKETFLOW_TRACE_ERRORS=true + +# Optional session/user tracking +POCKETFLOW_SESSION_ID=your-session-id +POCKETFLOW_USER_ID=your-user-id +``` + +## 🐛 Troubleshooting + +### Common Issues + +1. **"langfuse package not installed"** + ```bash + pip install langfuse + ``` + +2. **"Langfuse client initialization failed"** + - Check your `.env` file configuration + - Verify Langfuse server is running at the specified host + - Check network connectivity + +3. **"No traces appearing in dashboard"** + - Ensure `POCKETFLOW_TRACING_DEBUG=true` to see debug output + - Check that your flow is actually being executed + - Verify Langfuse credentials are correct + +### Debug Mode + +Enable debug mode to see detailed tracing information: + +```env +POCKETFLOW_TRACING_DEBUG=true +``` + +This will print detailed information about: +- Langfuse client initialization +- Trace and span creation +- Data serialization +- Error messages + +## 📚 API Reference + +### `@trace_flow()` + +Decorator to add Langfuse tracing to PocketFlow flows. + +**Parameters:** +- `config` (TracingConfig, optional): Custom configuration. If None, loads from environment. +- `flow_name` (str, optional): Custom name for the flow. If None, uses class name. +- `session_id` (str, optional): Session ID for grouping related traces. +- `user_id` (str, optional): User ID for the trace. + +### `TracingConfig` + +Configuration class for tracing settings. + +**Methods:** +- `TracingConfig.from_env()`: Create config from environment variables +- `validate()`: Check if configuration is valid +- `to_langfuse_kwargs()`: Convert to Langfuse client kwargs + +### `LangfuseTracer` + +Core tracer class for Langfuse integration. + +**Methods:** +- `start_trace()`: Start a new trace +- `end_trace()`: End the current trace +- `start_node_span()`: Start a span for node execution +- `end_node_span()`: End a node execution span +- `flush()`: Flush pending traces to Langfuse + +## 🤝 Contributing + +This cookbook is designed to be a starting point for PocketFlow observability. Feel free to extend and customize it for your specific needs! + +## 📄 License + +This cookbook follows the same license as PocketFlow. diff --git a/cookbook/pocketflow-tracing/examples/async_example.py b/cookbook/pocketflow-tracing/examples/async_example.py new file mode 100644 index 0000000..5d26962 --- /dev/null +++ b/cookbook/pocketflow-tracing/examples/async_example.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +""" +Async example demonstrating PocketFlow tracing with Langfuse. + +This example shows how to use the @trace_flow decorator with AsyncFlow +and AsyncNode to trace asynchronous workflows. +""" + +import asyncio +import sys +import os +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + +# Add parent directory to path to import pocketflow and tracing +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..")) +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +from pocketflow import AsyncNode, AsyncFlow +from tracing import trace_flow, TracingConfig + + +class AsyncDataFetchNode(AsyncNode): + """An async node that simulates fetching data.""" + + async def prep_async(self, shared): + """Extract the query from shared data.""" + query = shared.get("query", "default") + return query + + async def exec_async(self, query): + """Simulate async data fetching.""" + print(f"🔍 Fetching data for query: {query}") + + # Simulate async operation + await asyncio.sleep(1) + + # Return mock data + data = { + "query": query, + "results": [f"Result {i} for {query}" for i in range(3)], + "timestamp": "2024-01-01T00:00:00Z", + } + return data + + async def post_async(self, shared, prep_res, exec_res): + """Store the fetched data.""" + shared["fetched_data"] = exec_res + return "process" + + +class AsyncDataProcessNode(AsyncNode): + """An async node that processes the fetched data.""" + + async def prep_async(self, shared): + """Get the fetched data.""" + return shared.get("fetched_data", {}) + + async def exec_async(self, data): + """Process the data asynchronously.""" + print("⚙️ Processing fetched data...") + + # Simulate async processing + await asyncio.sleep(0.5) + + # Process the results + processed_results = [] + for result in data.get("results", []): + processed_results.append(f"PROCESSED: {result}") + + return { + "original_query": data.get("query"), + "processed_results": processed_results, + "result_count": len(processed_results), + } + + async def post_async(self, shared, prep_res, exec_res): + """Store the processed data.""" + shared["processed_data"] = exec_res + return "default" + + +@trace_flow(flow_name="AsyncDataProcessingFlow") +class AsyncDataProcessingFlow(AsyncFlow): + """An async flow that fetches and processes data.""" + + def __init__(self): + # Create async nodes + fetch_node = AsyncDataFetchNode() + process_node = AsyncDataProcessNode() + + # Connect nodes + fetch_node - "process" >> process_node + + # Initialize async flow + super().__init__(start=fetch_node) + + +async def main(): + """Run the async tracing example.""" + print("🚀 Starting PocketFlow Async Tracing Example") + print("=" * 50) + + # Create the async flow + flow = AsyncDataProcessingFlow() + + # Prepare shared data + shared = {"query": "machine learning tutorials"} + + print(f"📥 Input: {shared}") + + # Run the async flow (this will be automatically traced) + try: + result = await flow.run_async(shared) + print(f"📤 Output: {shared}") + print(f"🎯 Result: {result}") + print("✅ Async flow completed successfully!") + + # Print the processed data + if "processed_data" in shared: + processed = shared["processed_data"] + print( + f"🎉 Processed {processed['result_count']} results for query: {processed['original_query']}" + ) + for result in processed["processed_results"]: + print(f" - {result}") + + except Exception as e: + print(f"❌ Async flow failed with error: {e}") + raise + + print("\n📊 Check your Langfuse dashboard to see the async trace!") + langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host") + print(f" Dashboard URL: {langfuse_host}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/cookbook/pocketflow-tracing/examples/basic_example.py b/cookbook/pocketflow-tracing/examples/basic_example.py new file mode 100644 index 0000000..587f76d --- /dev/null +++ b/cookbook/pocketflow-tracing/examples/basic_example.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +""" +Basic example demonstrating PocketFlow tracing with Langfuse. + +This example shows how to use the @trace_flow decorator to automatically +trace a simple PocketFlow workflow. +""" + +import sys +import os +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + +# Add parent directory to path to import pocketflow and tracing +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..")) +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +from pocketflow import Node, Flow +from tracing import trace_flow, TracingConfig + + +class GreetingNode(Node): + """A simple node that creates a greeting message.""" + + def prep(self, shared): + """Extract the name from shared data.""" + name = shared.get("name", "World") + return name + + def exec(self, name): + """Create a greeting message.""" + greeting = f"Hello, {name}!" + return greeting + + def post(self, shared, prep_res, exec_res): + """Store the greeting in shared data.""" + shared["greeting"] = exec_res + return "default" + + +class UppercaseNode(Node): + """A node that converts the greeting to uppercase.""" + + def prep(self, shared): + """Get the greeting from shared data.""" + return shared.get("greeting", "") + + def exec(self, greeting): + """Convert to uppercase.""" + return greeting.upper() + + def post(self, shared, prep_res, exec_res): + """Store the uppercase greeting.""" + shared["uppercase_greeting"] = exec_res + return "default" + + +@trace_flow(flow_name="BasicGreetingFlow") +class BasicGreetingFlow(Flow): + """A simple flow that creates and processes a greeting.""" + + def __init__(self): + # Create nodes + greeting_node = GreetingNode() + uppercase_node = UppercaseNode() + + # Connect nodes + greeting_node >> uppercase_node + + # Initialize flow + super().__init__(start=greeting_node) + + +def main(): + """Run the basic tracing example.""" + print("🚀 Starting PocketFlow Tracing Basic Example") + print("=" * 50) + + # Create the flow + flow = BasicGreetingFlow() + + # Prepare shared data + shared = {"name": "PocketFlow User"} + + print(f"📥 Input: {shared}") + + # Run the flow (this will be automatically traced) + try: + result = flow.run(shared) + print(f"📤 Output: {shared}") + print(f"🎯 Result: {result}") + print("✅ Flow completed successfully!") + + # Print the final greeting + if "uppercase_greeting" in shared: + print(f"🎉 Final greeting: {shared['uppercase_greeting']}") + + except Exception as e: + print(f"❌ Flow failed with error: {e}") + raise + + print("\n📊 Check your Langfuse dashboard to see the trace!") + langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host") + print(f" Dashboard URL: {langfuse_host}") + + +if __name__ == "__main__": + main() diff --git a/cookbook/pocketflow-tracing/requirements.txt b/cookbook/pocketflow-tracing/requirements.txt new file mode 100644 index 0000000..d4684e7 --- /dev/null +++ b/cookbook/pocketflow-tracing/requirements.txt @@ -0,0 +1,6 @@ +# Core dependencies for PocketFlow tracing +langfuse>=2.0.0,<3.0.0 # v2 low level SDK compatible with Langfuse servers +python-dotenv>=1.0.0 + +# Optional dependencies for enhanced functionality +pydantic>=2.0.0 # For data validation and serialization diff --git a/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-05-28.png b/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-05-28.png new file mode 100644 index 0000000..842f183 Binary files /dev/null and b/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-05-28.png differ diff --git a/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-07-56.png b/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-07-56.png new file mode 100644 index 0000000..7e26de2 Binary files /dev/null and b/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-07-56.png differ diff --git a/cookbook/pocketflow-tracing/setup.py b/cookbook/pocketflow-tracing/setup.py new file mode 100644 index 0000000..6f61cc4 --- /dev/null +++ b/cookbook/pocketflow-tracing/setup.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +""" +Setup script for PocketFlow Tracing cookbook. + +This script helps install dependencies and verify the setup. +""" + +import subprocess +import sys +import os + + +def install_dependencies(): + """Install required dependencies.""" + print("📦 Installing dependencies...") + try: + subprocess.check_call( + [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"] + ) + print("✅ Dependencies installed successfully!") + return True + except subprocess.CalledProcessError as e: + print(f"❌ Failed to install dependencies: {e}") + return False + + +def verify_setup(): + """Verify that the setup is working.""" + print("🔍 Verifying setup...") + try: + # Try to import the tracing module + from tracing import trace_flow, TracingConfig + + print("✅ Tracing module imported successfully!") + + # Try to load configuration + config = TracingConfig.from_env() + if config.validate(): + print("✅ Configuration is valid!") + else: + print("⚠️ Configuration validation failed - check your .env file") + + return True + except ImportError as e: + print(f"❌ Failed to import tracing module: {e}") + return False + except Exception as e: + print(f"❌ Setup verification failed: {e}") + return False + + +def main(): + """Main setup function.""" + print("🚀 PocketFlow Tracing Setup") + print("=" * 40) + + # Check if we're in the right directory + if not os.path.exists("requirements.txt"): + print( + "❌ requirements.txt not found. Please run this script from the pocketflow-tracing directory." + ) + sys.exit(1) + + # Install dependencies + if not install_dependencies(): + sys.exit(1) + + # Verify setup + if not verify_setup(): + sys.exit(1) + + print("\n🎉 Setup completed successfully!") + print("\n📚 Next steps:") + print("1. Check the README.md for usage instructions") + print("2. Run the examples: python examples/basic_example.py") + print("3. Run the test suite: python test_tracing.py") + print("4. Check your Langfuse dashboard (URL configured in LANGFUSE_HOST)") + + +if __name__ == "__main__": + main() diff --git a/cookbook/pocketflow-tracing/test_tracing.py b/cookbook/pocketflow-tracing/test_tracing.py new file mode 100644 index 0000000..562bd2e --- /dev/null +++ b/cookbook/pocketflow-tracing/test_tracing.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +""" +Test script for PocketFlow tracing functionality. + +This script tests the tracing implementation to ensure it works correctly +with Langfuse integration. +""" + +import sys +import os +import asyncio +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + +# Add paths for imports +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) +sys.path.insert(0, os.path.dirname(__file__)) + +from pocketflow import Node, Flow, AsyncNode, AsyncFlow +from tracing import trace_flow, TracingConfig +from utils import setup_tracing + + +class TestNode(Node): + """Simple test node for tracing verification.""" + + def prep(self, shared): + """Test prep phase.""" + return shared.get("input", "test_input") + + def exec(self, prep_res): + """Test exec phase.""" + return f"processed_{prep_res}" + + def post(self, shared, prep_res, exec_res): + """Test post phase.""" + shared["output"] = exec_res + return "default" + + +class TestAsyncNode(AsyncNode): + """Simple async test node for tracing verification.""" + + async def prep_async(self, shared): + """Test async prep phase.""" + await asyncio.sleep(0.1) # Simulate async work + return shared.get("input", "async_test_input") + + async def exec_async(self, prep_res): + """Test async exec phase.""" + await asyncio.sleep(0.1) # Simulate async work + return f"async_processed_{prep_res}" + + async def post_async(self, shared, prep_res, exec_res): + """Test async post phase.""" + shared["output"] = exec_res + return "default" + + +@trace_flow(flow_name="TestSyncFlow") +class TestSyncFlow(Flow): + """Test synchronous flow with tracing.""" + + def __init__(self): + super().__init__(start=TestNode()) + + +@trace_flow(flow_name="TestAsyncFlow") +class TestAsyncFlow(AsyncFlow): + """Test asynchronous flow with tracing.""" + + def __init__(self): + super().__init__(start=TestAsyncNode()) + + +def test_sync_flow(): + """Test synchronous flow tracing.""" + print("🧪 Testing synchronous flow tracing...") + + flow = TestSyncFlow() + shared = {"input": "sync_test_data"} + + print(f" Input: {shared}") + result = flow.run(shared) + print(f" Output: {shared}") + print(f" Result: {result}") + + # Verify the flow worked + assert "output" in shared + assert shared["output"] == "processed_sync_test_data" + print(" ✅ Sync flow test passed") + + +async def test_async_flow(): + """Test asynchronous flow tracing.""" + print("🧪 Testing asynchronous flow tracing...") + + flow = TestAsyncFlow() + shared = {"input": "async_test_data"} + + print(f" Input: {shared}") + result = await flow.run_async(shared) + print(f" Output: {shared}") + print(f" Result: {result}") + + # Verify the flow worked + assert "output" in shared + assert shared["output"] == "async_processed_async_test_data" + print(" ✅ Async flow test passed") + + +def test_configuration(): + """Test configuration loading and validation.""" + print("🧪 Testing configuration...") + + # Test loading from environment + config = TracingConfig.from_env() + print(f" Loaded config: debug={config.debug}") + + # Test validation + is_valid = config.validate() + print(f" Config valid: {is_valid}") + + if is_valid: + print(" ✅ Configuration test passed") + else: + print( + " ⚠️ Configuration test failed (this may be expected if env vars not set)" + ) + + +def test_error_handling(): + """Test error handling in traced flows.""" + print("🧪 Testing error handling...") + + class ErrorNode(Node): + def exec(self, prep_res): + raise ValueError("Test error for tracing") + + @trace_flow(flow_name="TestErrorFlow") + class ErrorFlow(Flow): + def __init__(self): + super().__init__(start=ErrorNode()) + + flow = ErrorFlow() + shared = {"input": "error_test"} + + try: + flow.run(shared) + print(" ❌ Expected error but flow succeeded") + except ValueError as e: + print(f" ✅ Error correctly caught and traced: {e}") + except Exception as e: + print(f" ⚠️ Unexpected error type: {e}") + + +async def main(): + """Run all tests.""" + print("🚀 Starting PocketFlow Tracing Tests") + print("=" * 50) + + # Test configuration first + test_configuration() + print() + + # Test setup (optional - only if environment is configured) + try: + print("🔧 Testing setup...") + config = setup_tracing() + print(" ✅ Setup test passed") + except Exception as e: + print(f" ⚠️ Setup test failed: {e}") + print(" (This is expected if Langfuse is not configured)") + print() + + # Test sync flow + test_sync_flow() + print() + + # Test async flow + await test_async_flow() + print() + + # Test error handling + test_error_handling() + print() + + print("🎉 All tests completed!") + print("\n📊 If Langfuse is configured, check your dashboard for traces:") + langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host") + print(f" Dashboard URL: {langfuse_host}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/cookbook/pocketflow-tracing/tracing/__init__.py b/cookbook/pocketflow-tracing/tracing/__init__.py new file mode 100644 index 0000000..ace8069 --- /dev/null +++ b/cookbook/pocketflow-tracing/tracing/__init__.py @@ -0,0 +1,13 @@ +""" +PocketFlow Tracing Module + +This module provides observability and tracing capabilities for PocketFlow workflows +using Langfuse as the backend. It includes decorators and utilities to automatically +trace node execution, inputs, and outputs. +""" + +from .config import TracingConfig +from .core import LangfuseTracer +from .decorator import trace_flow + +__all__ = ["trace_flow", "TracingConfig", "LangfuseTracer"] diff --git a/cookbook/pocketflow-tracing/tracing/config.py b/cookbook/pocketflow-tracing/tracing/config.py new file mode 100644 index 0000000..f8d6f2a --- /dev/null +++ b/cookbook/pocketflow-tracing/tracing/config.py @@ -0,0 +1,111 @@ +""" +Configuration module for PocketFlow tracing with Langfuse. +""" + +import os +from dataclasses import dataclass +from typing import Optional +from dotenv import load_dotenv + + +@dataclass +class TracingConfig: + """Configuration class for PocketFlow tracing with Langfuse.""" + + # Langfuse configuration + langfuse_secret_key: Optional[str] = None + langfuse_public_key: Optional[str] = None + langfuse_host: Optional[str] = None + + # PocketFlow tracing configuration + debug: bool = False + trace_inputs: bool = True + trace_outputs: bool = True + trace_prep: bool = True + trace_exec: bool = True + trace_post: bool = True + trace_errors: bool = True + + # Session configuration + session_id: Optional[str] = None + user_id: Optional[str] = None + + @classmethod + def from_env(cls, env_file: Optional[str] = None) -> "TracingConfig": + """ + Create TracingConfig from environment variables. + + Args: + env_file: Optional path to .env file. If None, looks for .env in current directory. + + Returns: + TracingConfig instance with values from environment variables. + """ + # Load environment variables from .env file if it exists + if env_file: + load_dotenv(env_file) + else: + # Try to find .env file in current directory or parent directories + load_dotenv() + + return cls( + langfuse_secret_key=os.getenv("LANGFUSE_SECRET_KEY"), + langfuse_public_key=os.getenv("LANGFUSE_PUBLIC_KEY"), + langfuse_host=os.getenv("LANGFUSE_HOST"), + debug=os.getenv("POCKETFLOW_TRACING_DEBUG", "false").lower() == "true", + trace_inputs=os.getenv("POCKETFLOW_TRACE_INPUTS", "true").lower() == "true", + trace_outputs=os.getenv("POCKETFLOW_TRACE_OUTPUTS", "true").lower() == "true", + trace_prep=os.getenv("POCKETFLOW_TRACE_PREP", "true").lower() == "true", + trace_exec=os.getenv("POCKETFLOW_TRACE_EXEC", "true").lower() == "true", + trace_post=os.getenv("POCKETFLOW_TRACE_POST", "true").lower() == "true", + trace_errors=os.getenv("POCKETFLOW_TRACE_ERRORS", "true").lower() == "true", + session_id=os.getenv("POCKETFLOW_SESSION_ID"), + user_id=os.getenv("POCKETFLOW_USER_ID"), + ) + + def validate(self) -> bool: + """ + Validate that required configuration is present. + + Returns: + True if configuration is valid, False otherwise. + """ + if not self.langfuse_secret_key: + if self.debug: + print("Warning: LANGFUSE_SECRET_KEY not set") + return False + + if not self.langfuse_public_key: + if self.debug: + print("Warning: LANGFUSE_PUBLIC_KEY not set") + return False + + if not self.langfuse_host: + if self.debug: + print("Warning: LANGFUSE_HOST not set") + return False + + return True + + def to_langfuse_kwargs(self) -> dict: + """ + Convert configuration to kwargs for Langfuse client initialization. + + Returns: + Dictionary of kwargs for Langfuse client. + """ + kwargs = {} + + if self.langfuse_secret_key: + kwargs["secret_key"] = self.langfuse_secret_key + + if self.langfuse_public_key: + kwargs["public_key"] = self.langfuse_public_key + + if self.langfuse_host: + kwargs["host"] = self.langfuse_host + + if self.debug: + kwargs["debug"] = True + + return kwargs diff --git a/cookbook/pocketflow-tracing/tracing/core.py b/cookbook/pocketflow-tracing/tracing/core.py new file mode 100644 index 0000000..1e72599 --- /dev/null +++ b/cookbook/pocketflow-tracing/tracing/core.py @@ -0,0 +1,287 @@ +""" +Core tracing functionality for PocketFlow with Langfuse integration. +""" + +import json +import time +import uuid +from typing import Any, Dict, Optional, Union +from datetime import datetime + +try: + from langfuse import Langfuse + + LANGFUSE_AVAILABLE = True +except ImportError: + LANGFUSE_AVAILABLE = False + print("Warning: langfuse package not installed. Install with: pip install langfuse") + +from .config import TracingConfig + + +class LangfuseTracer: + """ + Core tracer class that handles Langfuse integration for PocketFlow. + """ + + def __init__(self, config: TracingConfig): + """ + Initialize the LangfuseTracer. + + Args: + config: TracingConfig instance with Langfuse settings. + """ + self.config = config + self.client = None + self.current_trace = None + self.spans = {} # Store spans by node ID + + if LANGFUSE_AVAILABLE and config.validate(): + try: + # Initialize Langfuse client with proper parameters + kwargs = {} + if config.langfuse_secret_key: + kwargs["secret_key"] = config.langfuse_secret_key + if config.langfuse_public_key: + kwargs["public_key"] = config.langfuse_public_key + if config.langfuse_host: + kwargs["host"] = config.langfuse_host + if config.debug: + kwargs["debug"] = True + + self.client = Langfuse(**kwargs) + if config.debug: + print( + f"✓ Langfuse client initialized with host: {config.langfuse_host}" + ) + except Exception as e: + if config.debug: + print(f"✗ Failed to initialize Langfuse client: {e}") + self.client = None + else: + if config.debug: + print("✗ Langfuse not available or configuration invalid") + + def start_trace(self, flow_name: str, input_data: Dict[str, Any]) -> Optional[str]: + """ + Start a new trace for a flow execution. + + Args: + flow_name: Name of the flow being traced. + input_data: Input data for the flow. + + Returns: + Trace ID if successful, None otherwise. + """ + if not self.client: + return None + + try: + # Serialize input data safely + serialized_input = self._serialize_data(input_data) + + # Use Langfuse v2 API to create a trace + self.current_trace = self.client.trace( + name=flow_name, + input=serialized_input, + metadata={ + "framework": "PocketFlow", + "trace_type": "flow_execution", + "timestamp": datetime.now().isoformat(), + }, + session_id=self.config.session_id, + user_id=self.config.user_id, + ) + + # Get the trace ID + trace_id = self.current_trace.id + + if self.config.debug: + print(f"✓ Started trace: {trace_id} for flow: {flow_name}") + + return trace_id + + except Exception as e: + if self.config.debug: + print(f"✗ Failed to start trace: {e}") + return None + + def end_trace(self, output_data: Dict[str, Any], status: str = "success") -> None: + """ + End the current trace. + + Args: + output_data: Output data from the flow. + status: Status of the trace execution. + """ + if not self.current_trace: + return + + try: + # Serialize output data safely + serialized_output = self._serialize_data(output_data) + + # Update the trace with output data using v2 API + self.current_trace.update( + output=serialized_output, + metadata={ + "status": status, + "end_timestamp": datetime.now().isoformat(), + }, + ) + + if self.config.debug: + print(f"✓ Ended trace with status: {status}") + + except Exception as e: + if self.config.debug: + print(f"✗ Failed to end trace: {e}") + finally: + self.current_trace = None + self.spans.clear() + + def start_node_span( + self, node_name: str, node_id: str, phase: str + ) -> Optional[str]: + """ + Start a span for a node execution phase. + + Args: + node_name: Name/type of the node. + node_id: Unique identifier for the node instance. + phase: Execution phase (prep, exec, post). + + Returns: + Span ID if successful, None otherwise. + """ + if not self.current_trace: + return None + + try: + span_id = f"{node_id}_{phase}" + + # Create a child span using v2 API + span = self.current_trace.span( + name=f"{node_name}.{phase}", + metadata={ + "node_type": node_name, + "node_id": node_id, + "phase": phase, + "start_timestamp": datetime.now().isoformat(), + }, + ) + + self.spans[span_id] = span + + if self.config.debug: + print(f"✓ Started span: {span_id}") + + return span_id + + except Exception as e: + if self.config.debug: + print(f"✗ Failed to start span: {e}") + return None + + def end_node_span( + self, + span_id: str, + input_data: Any = None, + output_data: Any = None, + error: Exception = None, + ) -> None: + """ + End a node execution span. + + Args: + span_id: ID of the span to end. + input_data: Input data for the phase. + output_data: Output data from the phase. + error: Exception if the phase failed. + """ + if span_id not in self.spans: + return + + try: + span = self.spans[span_id] + + # Prepare update data + update_data = {} + + if input_data is not None and self.config.trace_inputs: + update_data["input"] = self._serialize_data(input_data) + if output_data is not None and self.config.trace_outputs: + update_data["output"] = self._serialize_data(output_data) + + if error and self.config.trace_errors: + update_data.update( + { + "level": "ERROR", + "status_message": str(error), + "metadata": { + "error_type": type(error).__name__, + "error_message": str(error), + "end_timestamp": datetime.now().isoformat(), + }, + } + ) + else: + update_data.update( + { + "level": "DEFAULT", + "metadata": {"end_timestamp": datetime.now().isoformat()}, + } + ) + + # Update the span with all data at once + span.update(**update_data) + + # End the span + span.end() + + if self.config.debug: + status = "ERROR" if error else "SUCCESS" + print(f"✓ Ended span: {span_id} with status: {status}") + + except Exception as e: + if self.config.debug: + print(f"✗ Failed to end span: {e}") + finally: + if span_id in self.spans: + del self.spans[span_id] + + def _serialize_data(self, data: Any) -> Any: + """ + Safely serialize data for Langfuse. + + Args: + data: Data to serialize. + + Returns: + Serialized data that can be sent to Langfuse. + """ + try: + # Handle common PocketFlow data types + if hasattr(data, "__dict__"): + # Convert objects to dict representation + return {"_type": type(data).__name__, "_data": str(data)} + elif isinstance(data, (dict, list, str, int, float, bool, type(None))): + # JSON-serializable types + return data + else: + # Fallback to string representation + return {"_type": type(data).__name__, "_data": str(data)} + except Exception: + # Ultimate fallback + return {"_type": "unknown", "_data": ""} + + def flush(self) -> None: + """Flush any pending traces to Langfuse.""" + if self.client: + try: + self.client.flush() + if self.config.debug: + print("✓ Flushed traces to Langfuse") + except Exception as e: + if self.config.debug: + print(f"✗ Failed to flush traces: {e}") diff --git a/cookbook/pocketflow-tracing/tracing/decorator.py b/cookbook/pocketflow-tracing/tracing/decorator.py new file mode 100644 index 0000000..6eac297 --- /dev/null +++ b/cookbook/pocketflow-tracing/tracing/decorator.py @@ -0,0 +1,293 @@ +""" +Decorator for tracing PocketFlow workflows with Langfuse. +""" + +import functools +import inspect +import uuid +from typing import Any, Callable, Dict, Optional, Union + +from .config import TracingConfig +from .core import LangfuseTracer + + +def trace_flow( + config: Optional[TracingConfig] = None, + flow_name: Optional[str] = None, + session_id: Optional[str] = None, + user_id: Optional[str] = None +): + """ + Decorator to add Langfuse tracing to PocketFlow flows. + + This decorator automatically traces: + - Flow execution start/end + - Each node's prep, exec, and post phases + - Input and output data for each phase + - Errors and exceptions + + Args: + config: TracingConfig instance. If None, loads from environment. + flow_name: Custom name for the flow. If None, uses the flow class name. + session_id: Session ID for grouping related traces. + user_id: User ID for the trace. + + Returns: + Decorated flow class or function. + + Example: + ```python + from tracing import trace_flow + + @trace_flow() + class MyFlow(Flow): + def __init__(self): + super().__init__(start=MyNode()) + + # Or with custom configuration + config = TracingConfig.from_env() + + @trace_flow(config=config, flow_name="CustomFlow") + class MyFlow(Flow): + pass + ``` + """ + def decorator(flow_class_or_func): + # Handle both class and function decoration + if inspect.isclass(flow_class_or_func): + return _trace_flow_class(flow_class_or_func, config, flow_name, session_id, user_id) + else: + return _trace_flow_function(flow_class_or_func, config, flow_name, session_id, user_id) + + return decorator + + +def _trace_flow_class(flow_class, config, flow_name, session_id, user_id): + """Trace a Flow class by wrapping its methods.""" + + # Get or create config + if config is None: + config = TracingConfig.from_env() + + # Override session/user if provided + if session_id: + config.session_id = session_id + if user_id: + config.user_id = user_id + + # Get flow name + if flow_name is None: + flow_name = flow_class.__name__ + + # Store original methods + original_init = flow_class.__init__ + original_run = getattr(flow_class, 'run', None) + original_run_async = getattr(flow_class, 'run_async', None) + + def traced_init(self, *args, **kwargs): + """Initialize the flow with tracing capabilities.""" + # Call original init + original_init(self, *args, **kwargs) + + # Add tracing attributes + self._tracer = LangfuseTracer(config) + self._flow_name = flow_name + self._trace_id = None + + # Patch all nodes in the flow + self._patch_nodes() + + def traced_run(self, shared): + """Traced version of the run method.""" + if not hasattr(self, '_tracer'): + # Fallback if not properly initialized + return original_run(self, shared) if original_run else None + + # Start trace + self._trace_id = self._tracer.start_trace(self._flow_name, shared) + + try: + # Run the original flow + result = original_run(self, shared) if original_run else None + + # End trace successfully + self._tracer.end_trace(shared, "success") + + return result + + except Exception as e: + # End trace with error + self._tracer.end_trace(shared, "error") + raise + finally: + # Ensure cleanup + self._tracer.flush() + + async def traced_run_async(self, shared): + """Traced version of the async run method.""" + if not hasattr(self, '_tracer'): + # Fallback if not properly initialized + return await original_run_async(self, shared) if original_run_async else None + + # Start trace + self._trace_id = self._tracer.start_trace(self._flow_name, shared) + + try: + # Run the original flow + result = await original_run_async(self, shared) if original_run_async else None + + # End trace successfully + self._tracer.end_trace(shared, "success") + + return result + + except Exception as e: + # End trace with error + self._tracer.end_trace(shared, "error") + raise + finally: + # Ensure cleanup + self._tracer.flush() + + def patch_nodes(self): + """Patch all nodes in the flow to add tracing.""" + if not hasattr(self, 'start_node') or not self.start_node: + return + + visited = set() + nodes_to_patch = [self.start_node] + + while nodes_to_patch: + node = nodes_to_patch.pop(0) + if id(node) in visited: + continue + + visited.add(id(node)) + + # Patch this node + self._patch_node(node) + + # Add successors to patch list + if hasattr(node, 'successors'): + for successor in node.successors.values(): + if successor and id(successor) not in visited: + nodes_to_patch.append(successor) + + def patch_node(self, node): + """Patch a single node to add tracing.""" + if hasattr(node, '_pocketflow_traced'): + return # Already patched + + node_id = str(uuid.uuid4()) + node_name = type(node).__name__ + + # Store original methods + original_prep = getattr(node, 'prep', None) + original_exec = getattr(node, 'exec', None) + original_post = getattr(node, 'post', None) + original_prep_async = getattr(node, 'prep_async', None) + original_exec_async = getattr(node, 'exec_async', None) + original_post_async = getattr(node, 'post_async', None) + + # Create traced versions + if original_prep: + node.prep = self._create_traced_method(original_prep, node_id, node_name, 'prep') + if original_exec: + node.exec = self._create_traced_method(original_exec, node_id, node_name, 'exec') + if original_post: + node.post = self._create_traced_method(original_post, node_id, node_name, 'post') + if original_prep_async: + node.prep_async = self._create_traced_async_method(original_prep_async, node_id, node_name, 'prep') + if original_exec_async: + node.exec_async = self._create_traced_async_method(original_exec_async, node_id, node_name, 'exec') + if original_post_async: + node.post_async = self._create_traced_async_method(original_post_async, node_id, node_name, 'post') + + # Mark as traced + node._pocketflow_traced = True + + def create_traced_method(self, original_method, node_id, node_name, phase): + """Create a traced version of a synchronous method.""" + @functools.wraps(original_method) + def traced_method(*args, **kwargs): + span_id = self._tracer.start_node_span(node_name, node_id, phase) + + try: + result = original_method(*args, **kwargs) + self._tracer.end_node_span(span_id, input_data=args, output_data=result) + return result + except Exception as e: + self._tracer.end_node_span(span_id, input_data=args, error=e) + raise + + return traced_method + + def create_traced_async_method(self, original_method, node_id, node_name, phase): + """Create a traced version of an asynchronous method.""" + @functools.wraps(original_method) + async def traced_async_method(*args, **kwargs): + span_id = self._tracer.start_node_span(node_name, node_id, phase) + + try: + result = await original_method(*args, **kwargs) + self._tracer.end_node_span(span_id, input_data=args, output_data=result) + return result + except Exception as e: + self._tracer.end_node_span(span_id, input_data=args, error=e) + raise + + return traced_async_method + + # Replace methods on the class + flow_class.__init__ = traced_init + flow_class._patch_nodes = patch_nodes + flow_class._patch_node = patch_node + flow_class._create_traced_method = create_traced_method + flow_class._create_traced_async_method = create_traced_async_method + + if original_run: + flow_class.run = traced_run + if original_run_async: + flow_class.run_async = traced_run_async + + return flow_class + + +def _trace_flow_function(flow_func, config, flow_name, session_id, user_id): + """Trace a flow function (for functional-style flows).""" + + # Get or create config + if config is None: + config = TracingConfig.from_env() + + # Override session/user if provided + if session_id: + config.session_id = session_id + if user_id: + config.user_id = user_id + + # Get flow name + if flow_name is None: + flow_name = flow_func.__name__ + + tracer = LangfuseTracer(config) + + @functools.wraps(flow_func) + def traced_flow_func(*args, **kwargs): + # Assume first argument is shared data + shared = args[0] if args else {} + + # Start trace + trace_id = tracer.start_trace(flow_name, shared) + + try: + result = flow_func(*args, **kwargs) + tracer.end_trace(shared, "success") + return result + except Exception as e: + tracer.end_trace(shared, "error") + raise + finally: + tracer.flush() + + return traced_flow_func diff --git a/cookbook/pocketflow-tracing/utils/__init__.py b/cookbook/pocketflow-tracing/utils/__init__.py new file mode 100644 index 0000000..f5d9c1e --- /dev/null +++ b/cookbook/pocketflow-tracing/utils/__init__.py @@ -0,0 +1,7 @@ +""" +Utility functions for PocketFlow tracing. +""" + +from .setup import setup_tracing, test_langfuse_connection + +__all__ = ['setup_tracing', 'test_langfuse_connection'] diff --git a/cookbook/pocketflow-tracing/utils/setup.py b/cookbook/pocketflow-tracing/utils/setup.py new file mode 100644 index 0000000..56d3a14 --- /dev/null +++ b/cookbook/pocketflow-tracing/utils/setup.py @@ -0,0 +1,163 @@ +""" +Setup and testing utilities for PocketFlow tracing. +""" + +import os +import sys +from typing import Optional + +# Add parent directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +try: + from langfuse import Langfuse + LANGFUSE_AVAILABLE = True +except ImportError: + LANGFUSE_AVAILABLE = False + +from tracing import TracingConfig, LangfuseTracer + + +def setup_tracing(env_file: Optional[str] = None) -> TracingConfig: + """ + Set up tracing configuration and validate the setup. + + Args: + env_file: Optional path to .env file. If None, uses default location. + + Returns: + TracingConfig instance. + + Raises: + RuntimeError: If setup fails. + """ + print("🔧 Setting up PocketFlow tracing...") + + # Check if langfuse is installed + if not LANGFUSE_AVAILABLE: + raise RuntimeError( + "Langfuse package not installed. Install with: pip install langfuse" + ) + + # Load configuration + if env_file: + config = TracingConfig.from_env(env_file) + print(f"✓ Loaded configuration from: {env_file}") + else: + config = TracingConfig.from_env() + print("✓ Loaded configuration from environment") + + # Validate configuration + if not config.validate(): + raise RuntimeError( + "Invalid tracing configuration. Please check your environment variables:\n" + "- LANGFUSE_SECRET_KEY\n" + "- LANGFUSE_PUBLIC_KEY\n" + "- LANGFUSE_HOST" + ) + + print("✓ Configuration validated") + + # Test connection + if test_langfuse_connection(config): + print("✓ Langfuse connection successful") + else: + raise RuntimeError("Failed to connect to Langfuse. Check your configuration and network.") + + print("🎉 PocketFlow tracing setup complete!") + return config + + +def test_langfuse_connection(config: TracingConfig) -> bool: + """ + Test connection to Langfuse. + + Args: + config: TracingConfig instance. + + Returns: + True if connection successful, False otherwise. + """ + try: + # Create a test tracer + tracer = LangfuseTracer(config) + + if not tracer.client: + return False + + # Try to start and end a test trace + trace_id = tracer.start_trace("test_connection", {"test": True}) + if trace_id: + tracer.end_trace({"test": "completed"}, "success") + tracer.flush() + return True + + return False + + except Exception as e: + if config.debug: + print(f"Connection test failed: {e}") + return False + + +def print_configuration_help(): + """Print help information for configuring tracing.""" + print(""" +🔧 PocketFlow Tracing Configuration Help + +To use PocketFlow tracing, you need to configure Langfuse credentials. + +1. Create or update your .env file with: + +LANGFUSE_SECRET_KEY=your-secret-key +LANGFUSE_PUBLIC_KEY=your-public-key +LANGFUSE_HOST=your-langfuse-host +POCKETFLOW_TRACING_DEBUG=true + +2. Optional configuration: + +POCKETFLOW_TRACE_INPUTS=true +POCKETFLOW_TRACE_OUTPUTS=true +POCKETFLOW_TRACE_PREP=true +POCKETFLOW_TRACE_EXEC=true +POCKETFLOW_TRACE_POST=true +POCKETFLOW_TRACE_ERRORS=true +POCKETFLOW_SESSION_ID=your-session-id +POCKETFLOW_USER_ID=your-user-id + +3. Install required packages: + +pip install -r requirements.txt + +4. Test your setup: + +python -c "from utils import setup_tracing; setup_tracing()" +""") + + +if __name__ == "__main__": + """Command-line interface for setup and testing.""" + import argparse + + parser = argparse.ArgumentParser(description="PocketFlow Tracing Setup") + parser.add_argument("--test", action="store_true", help="Test Langfuse connection") + parser.add_argument("--help-config", action="store_true", help="Show configuration help") + parser.add_argument("--env-file", type=str, help="Path to .env file") + + args = parser.parse_args() + + if args.help_config: + print_configuration_help() + sys.exit(0) + + if args.test: + try: + config = setup_tracing(args.env_file) + print("\n✅ All tests passed! Your tracing setup is ready.") + except Exception as e: + print(f"\n❌ Setup failed: {e}") + print("\nFor help with configuration, run:") + print("python utils/setup.py --help-config") + sys.exit(1) + else: + print_configuration_help() diff --git a/docs/utility_function/viz.md b/docs/utility_function/viz.md index a518be7..065fdaa 100644 --- a/docs/utility_function/viz.md +++ b/docs/utility_function/viz.md @@ -139,4 +139,6 @@ data_science_flow = DataScienceFlow(start=data_prep_node) data_science_flow.run({}) ``` -The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']` \ No newline at end of file +The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']` + +For a more complete implementation, check out [the cookbook](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-tracing). \ No newline at end of file