diff --git a/.gitignore b/.gitignore
index 4c70e7b..659af27 100644
--- a/.gitignore
+++ b/.gitignore
@@ -73,4 +73,14 @@ htmlcov/
test.ipynb
-.pytest_cache/
\ No newline at end of file
+.pytest_cache/
+cookbook/pocketflow-multi-agent/.python-version
+
+
+# local
+uv.lock
+.python-version
+pyproject.toml
+usage.md
+cookbook/pocketflow-minimal-example/viz/flow_visualization.html
+cookbook/pocketflow-minimal-example/viz/flow_visualization.json
diff --git a/cookbook/README.md b/cookbook/README.md
index d77fb5d..1a75946 100644
--- a/cookbook/README.md
+++ b/cookbook/README.md
@@ -20,6 +20,7 @@
| [Thinking](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-thinking) | ★☆☆
*Beginner* | Solve complex reasoning problems through Chain-of-Thought |
| [Memory](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-memory) | ★☆☆
*Beginner* | A chat bot with short-term and long-term memory |
| [MCP](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-mcp) | ★☆☆
*Beginner* | Agent using Model Context Protocol for numerical operations |
+| [Tracing](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-tracing) | ★☆☆
*Beginner* | Trace and visualize the execution of your flow |
diff --git a/cookbook/pocketflow-tracing/.env.example b/cookbook/pocketflow-tracing/.env.example
new file mode 100644
index 0000000..21f8686
--- /dev/null
+++ b/cookbook/pocketflow-tracing/.env.example
@@ -0,0 +1,20 @@
+# PocketFlow Tracing Configuration Template
+# Copy this file to .env and replace the placeholder values with your actual Langfuse credentials
+
+# Required Langfuse configuration
+LANGFUSE_SECRET_KEY=your-langfuse-secret-key
+LANGFUSE_PUBLIC_KEY=your-langfuse-public-key
+LANGFUSE_HOST=your-langfuse-host-url
+
+# Optional tracing configuration
+POCKETFLOW_TRACING_DEBUG=true
+POCKETFLOW_TRACE_INPUTS=true
+POCKETFLOW_TRACE_OUTPUTS=true
+POCKETFLOW_TRACE_PREP=true
+POCKETFLOW_TRACE_EXEC=true
+POCKETFLOW_TRACE_POST=true
+POCKETFLOW_TRACE_ERRORS=true
+
+# Optional session/user tracking
+POCKETFLOW_SESSION_ID=your-session-id
+POCKETFLOW_USER_ID=your-user-id
diff --git a/cookbook/pocketflow-tracing/README.md b/cookbook/pocketflow-tracing/README.md
new file mode 100644
index 0000000..e5c4a79
--- /dev/null
+++ b/cookbook/pocketflow-tracing/README.md
@@ -0,0 +1,290 @@
+# PocketFlow Tracing with Langfuse
+
+This cookbook provides comprehensive observability for PocketFlow workflows using [Langfuse](https://langfuse.com/) as the tracing backend. With minimal code changes (just adding a decorator), you can automatically trace all node executions, inputs, outputs, and errors in your PocketFlow workflows.
+
+## 🎯 Features
+
+- **Automatic Tracing**: Trace entire flows with a single decorator
+- **Node-Level Observability**: Automatically trace `prep`, `exec`, and `post` phases of each node
+- **Input/Output Tracking**: Capture all data flowing through your workflow
+- **Error Tracking**: Automatically capture and trace exceptions
+- **Async Support**: Full support for AsyncFlow and AsyncNode
+- **Minimal Code Changes**: Just add `@trace_flow()` to your flow classes
+- **Langfuse Integration**: Leverage Langfuse's powerful observability platform
+
+## 🚀 Quick Start
+
+### 1. Install Dependencies
+
+```bash
+pip install -r requirements.txt
+```
+
+### 2. Environment Setup
+
+Copy the example environment file and configure your Langfuse credentials:
+
+```bash
+cp .env.example .env
+```
+
+Then edit the `.env` file with your actual Langfuse configuration:
+
+```env
+LANGFUSE_SECRET_KEY=your-langfuse-secret-key
+LANGFUSE_PUBLIC_KEY=your-langfuse-public-key
+LANGFUSE_HOST=your-langfuse-host-url
+POCKETFLOW_TRACING_DEBUG=true
+```
+
+**Note**: Replace the placeholder values with your actual Langfuse credentials and host URL.
+
+### 3. Basic Usage
+
+```python
+from pocketflow import Node, Flow
+from tracing import trace_flow
+
+class MyNode(Node):
+ def prep(self, shared):
+ return shared["input"]
+
+ def exec(self, data):
+ return f"Processed: {data}"
+
+ def post(self, shared, prep_res, exec_res):
+ shared["output"] = exec_res
+ return "default"
+
+@trace_flow() # 🎉 That's it! Your flow is now traced
+class MyFlow(Flow):
+ def __init__(self):
+ super().__init__(start=MyNode())
+
+# Run your flow - tracing happens automatically
+flow = MyFlow()
+shared = {"input": "Hello World"}
+flow.run(shared)
+```
+
+## 📊 What Gets Traced
+
+When you apply the `@trace_flow()` decorator, the system automatically traces:
+
+### Flow Level
+- **Flow Start/End**: Overall execution time and status
+- **Input Data**: Initial shared state when flow starts
+- **Output Data**: Final shared state when flow completes
+- **Errors**: Any exceptions that occur during flow execution
+
+### Node Level
+For each node in your flow, the system traces:
+
+- **prep() Phase**:
+ - Input: `shared` data
+ - Output: `prep_res` returned by prep method
+ - Execution time and any errors
+
+- **exec() Phase**:
+ - Input: `prep_res` from prep phase
+ - Output: `exec_res` returned by exec method
+ - Execution time and any errors
+ - Retry attempts (if configured)
+
+- **post() Phase**:
+ - Input: `shared`, `prep_res`, `exec_res`
+ - Output: Action string returned
+ - Execution time and any errors
+
+## 🔧 Configuration Options
+
+### Basic Configuration
+
+```python
+from tracing import trace_flow, TracingConfig
+
+# Use environment variables (default)
+@trace_flow()
+class MyFlow(Flow):
+ pass
+
+# Custom flow name
+@trace_flow(flow_name="CustomFlowName")
+class MyFlow(Flow):
+ pass
+
+# Custom session and user IDs
+@trace_flow(session_id="session-123", user_id="user-456")
+class MyFlow(Flow):
+ pass
+```
+
+### Advanced Configuration
+
+```python
+from tracing import TracingConfig
+
+# Create custom configuration
+config = TracingConfig(
+ langfuse_secret_key="your-secret-key",
+ langfuse_public_key="your-public-key",
+ langfuse_host="https://your-langfuse-instance.com",
+ debug=True,
+ trace_inputs=True,
+ trace_outputs=True,
+ trace_errors=True
+)
+
+@trace_flow(config=config)
+class MyFlow(Flow):
+ pass
+```
+
+## 📁 Examples
+
+### Basic Synchronous Flow
+See `examples/basic_example.py` for a complete example of tracing a simple synchronous flow.
+
+```bash
+cd examples
+python basic_example.py
+```
+
+### Asynchronous Flow
+See `examples/async_example.py` for tracing AsyncFlow and AsyncNode.
+
+```bash
+cd examples
+python async_example.py
+```
+
+## 🔍 Viewing Traces
+
+After running your traced flows, visit your Langfuse dashboard to view the traces:
+
+**Dashboard URL**: Use the URL you configured in `LANGFUSE_HOST` environment variable
+
+In the dashboard you'll see:
+- **Traces**: One trace per flow execution
+- **Spans**: Individual node phases (prep, exec, post)
+- **Input/Output Data**: All data flowing through your workflow
+- **Performance Metrics**: Execution times for each phase
+- **Error Details**: Stack traces and error messages
+
+The tracings in examples.
+
+
+Detailed tracing for a node.
+
+
+## 🛠️ Advanced Usage
+
+### Custom Tracer Configuration
+
+```python
+from tracing import TracingConfig, LangfuseTracer
+
+# Create custom configuration
+config = TracingConfig.from_env()
+config.debug = True
+
+# Use tracer directly (for advanced use cases)
+tracer = LangfuseTracer(config)
+```
+
+### Environment Variables
+
+You can customize tracing behavior with these environment variables:
+
+```env
+# Required Langfuse configuration
+LANGFUSE_SECRET_KEY=your-secret-key
+LANGFUSE_PUBLIC_KEY=your-public-key
+LANGFUSE_HOST=your-langfuse-host
+
+# Optional tracing configuration
+POCKETFLOW_TRACING_DEBUG=true
+POCKETFLOW_TRACE_INPUTS=true
+POCKETFLOW_TRACE_OUTPUTS=true
+POCKETFLOW_TRACE_PREP=true
+POCKETFLOW_TRACE_EXEC=true
+POCKETFLOW_TRACE_POST=true
+POCKETFLOW_TRACE_ERRORS=true
+
+# Optional session/user tracking
+POCKETFLOW_SESSION_ID=your-session-id
+POCKETFLOW_USER_ID=your-user-id
+```
+
+## 🐛 Troubleshooting
+
+### Common Issues
+
+1. **"langfuse package not installed"**
+ ```bash
+ pip install langfuse
+ ```
+
+2. **"Langfuse client initialization failed"**
+ - Check your `.env` file configuration
+ - Verify Langfuse server is running at the specified host
+ - Check network connectivity
+
+3. **"No traces appearing in dashboard"**
+ - Ensure `POCKETFLOW_TRACING_DEBUG=true` to see debug output
+ - Check that your flow is actually being executed
+ - Verify Langfuse credentials are correct
+
+### Debug Mode
+
+Enable debug mode to see detailed tracing information:
+
+```env
+POCKETFLOW_TRACING_DEBUG=true
+```
+
+This will print detailed information about:
+- Langfuse client initialization
+- Trace and span creation
+- Data serialization
+- Error messages
+
+## 📚 API Reference
+
+### `@trace_flow()`
+
+Decorator to add Langfuse tracing to PocketFlow flows.
+
+**Parameters:**
+- `config` (TracingConfig, optional): Custom configuration. If None, loads from environment.
+- `flow_name` (str, optional): Custom name for the flow. If None, uses class name.
+- `session_id` (str, optional): Session ID for grouping related traces.
+- `user_id` (str, optional): User ID for the trace.
+
+### `TracingConfig`
+
+Configuration class for tracing settings.
+
+**Methods:**
+- `TracingConfig.from_env()`: Create config from environment variables
+- `validate()`: Check if configuration is valid
+- `to_langfuse_kwargs()`: Convert to Langfuse client kwargs
+
+### `LangfuseTracer`
+
+Core tracer class for Langfuse integration.
+
+**Methods:**
+- `start_trace()`: Start a new trace
+- `end_trace()`: End the current trace
+- `start_node_span()`: Start a span for node execution
+- `end_node_span()`: End a node execution span
+- `flush()`: Flush pending traces to Langfuse
+
+## 🤝 Contributing
+
+This cookbook is designed to be a starting point for PocketFlow observability. Feel free to extend and customize it for your specific needs!
+
+## 📄 License
+
+This cookbook follows the same license as PocketFlow.
diff --git a/cookbook/pocketflow-tracing/examples/async_example.py b/cookbook/pocketflow-tracing/examples/async_example.py
new file mode 100644
index 0000000..5d26962
--- /dev/null
+++ b/cookbook/pocketflow-tracing/examples/async_example.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python3
+"""
+Async example demonstrating PocketFlow tracing with Langfuse.
+
+This example shows how to use the @trace_flow decorator with AsyncFlow
+and AsyncNode to trace asynchronous workflows.
+"""
+
+import asyncio
+import sys
+import os
+from dotenv import load_dotenv
+
+# Load environment variables
+load_dotenv()
+
+# Add parent directory to path to import pocketflow and tracing
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+
+from pocketflow import AsyncNode, AsyncFlow
+from tracing import trace_flow, TracingConfig
+
+
+class AsyncDataFetchNode(AsyncNode):
+ """An async node that simulates fetching data."""
+
+ async def prep_async(self, shared):
+ """Extract the query from shared data."""
+ query = shared.get("query", "default")
+ return query
+
+ async def exec_async(self, query):
+ """Simulate async data fetching."""
+ print(f"🔍 Fetching data for query: {query}")
+
+ # Simulate async operation
+ await asyncio.sleep(1)
+
+ # Return mock data
+ data = {
+ "query": query,
+ "results": [f"Result {i} for {query}" for i in range(3)],
+ "timestamp": "2024-01-01T00:00:00Z",
+ }
+ return data
+
+ async def post_async(self, shared, prep_res, exec_res):
+ """Store the fetched data."""
+ shared["fetched_data"] = exec_res
+ return "process"
+
+
+class AsyncDataProcessNode(AsyncNode):
+ """An async node that processes the fetched data."""
+
+ async def prep_async(self, shared):
+ """Get the fetched data."""
+ return shared.get("fetched_data", {})
+
+ async def exec_async(self, data):
+ """Process the data asynchronously."""
+ print("⚙️ Processing fetched data...")
+
+ # Simulate async processing
+ await asyncio.sleep(0.5)
+
+ # Process the results
+ processed_results = []
+ for result in data.get("results", []):
+ processed_results.append(f"PROCESSED: {result}")
+
+ return {
+ "original_query": data.get("query"),
+ "processed_results": processed_results,
+ "result_count": len(processed_results),
+ }
+
+ async def post_async(self, shared, prep_res, exec_res):
+ """Store the processed data."""
+ shared["processed_data"] = exec_res
+ return "default"
+
+
+@trace_flow(flow_name="AsyncDataProcessingFlow")
+class AsyncDataProcessingFlow(AsyncFlow):
+ """An async flow that fetches and processes data."""
+
+ def __init__(self):
+ # Create async nodes
+ fetch_node = AsyncDataFetchNode()
+ process_node = AsyncDataProcessNode()
+
+ # Connect nodes
+ fetch_node - "process" >> process_node
+
+ # Initialize async flow
+ super().__init__(start=fetch_node)
+
+
+async def main():
+ """Run the async tracing example."""
+ print("🚀 Starting PocketFlow Async Tracing Example")
+ print("=" * 50)
+
+ # Create the async flow
+ flow = AsyncDataProcessingFlow()
+
+ # Prepare shared data
+ shared = {"query": "machine learning tutorials"}
+
+ print(f"📥 Input: {shared}")
+
+ # Run the async flow (this will be automatically traced)
+ try:
+ result = await flow.run_async(shared)
+ print(f"📤 Output: {shared}")
+ print(f"🎯 Result: {result}")
+ print("✅ Async flow completed successfully!")
+
+ # Print the processed data
+ if "processed_data" in shared:
+ processed = shared["processed_data"]
+ print(
+ f"🎉 Processed {processed['result_count']} results for query: {processed['original_query']}"
+ )
+ for result in processed["processed_results"]:
+ print(f" - {result}")
+
+ except Exception as e:
+ print(f"❌ Async flow failed with error: {e}")
+ raise
+
+ print("\n📊 Check your Langfuse dashboard to see the async trace!")
+ langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host")
+ print(f" Dashboard URL: {langfuse_host}")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/cookbook/pocketflow-tracing/examples/basic_example.py b/cookbook/pocketflow-tracing/examples/basic_example.py
new file mode 100644
index 0000000..587f76d
--- /dev/null
+++ b/cookbook/pocketflow-tracing/examples/basic_example.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python3
+"""
+Basic example demonstrating PocketFlow tracing with Langfuse.
+
+This example shows how to use the @trace_flow decorator to automatically
+trace a simple PocketFlow workflow.
+"""
+
+import sys
+import os
+from dotenv import load_dotenv
+
+# Load environment variables
+load_dotenv()
+
+# Add parent directory to path to import pocketflow and tracing
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+
+from pocketflow import Node, Flow
+from tracing import trace_flow, TracingConfig
+
+
+class GreetingNode(Node):
+ """A simple node that creates a greeting message."""
+
+ def prep(self, shared):
+ """Extract the name from shared data."""
+ name = shared.get("name", "World")
+ return name
+
+ def exec(self, name):
+ """Create a greeting message."""
+ greeting = f"Hello, {name}!"
+ return greeting
+
+ def post(self, shared, prep_res, exec_res):
+ """Store the greeting in shared data."""
+ shared["greeting"] = exec_res
+ return "default"
+
+
+class UppercaseNode(Node):
+ """A node that converts the greeting to uppercase."""
+
+ def prep(self, shared):
+ """Get the greeting from shared data."""
+ return shared.get("greeting", "")
+
+ def exec(self, greeting):
+ """Convert to uppercase."""
+ return greeting.upper()
+
+ def post(self, shared, prep_res, exec_res):
+ """Store the uppercase greeting."""
+ shared["uppercase_greeting"] = exec_res
+ return "default"
+
+
+@trace_flow(flow_name="BasicGreetingFlow")
+class BasicGreetingFlow(Flow):
+ """A simple flow that creates and processes a greeting."""
+
+ def __init__(self):
+ # Create nodes
+ greeting_node = GreetingNode()
+ uppercase_node = UppercaseNode()
+
+ # Connect nodes
+ greeting_node >> uppercase_node
+
+ # Initialize flow
+ super().__init__(start=greeting_node)
+
+
+def main():
+ """Run the basic tracing example."""
+ print("🚀 Starting PocketFlow Tracing Basic Example")
+ print("=" * 50)
+
+ # Create the flow
+ flow = BasicGreetingFlow()
+
+ # Prepare shared data
+ shared = {"name": "PocketFlow User"}
+
+ print(f"📥 Input: {shared}")
+
+ # Run the flow (this will be automatically traced)
+ try:
+ result = flow.run(shared)
+ print(f"📤 Output: {shared}")
+ print(f"🎯 Result: {result}")
+ print("✅ Flow completed successfully!")
+
+ # Print the final greeting
+ if "uppercase_greeting" in shared:
+ print(f"🎉 Final greeting: {shared['uppercase_greeting']}")
+
+ except Exception as e:
+ print(f"❌ Flow failed with error: {e}")
+ raise
+
+ print("\n📊 Check your Langfuse dashboard to see the trace!")
+ langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host")
+ print(f" Dashboard URL: {langfuse_host}")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/cookbook/pocketflow-tracing/requirements.txt b/cookbook/pocketflow-tracing/requirements.txt
new file mode 100644
index 0000000..d4684e7
--- /dev/null
+++ b/cookbook/pocketflow-tracing/requirements.txt
@@ -0,0 +1,6 @@
+# Core dependencies for PocketFlow tracing
+langfuse>=2.0.0,<3.0.0 # v2 low level SDK compatible with Langfuse servers
+python-dotenv>=1.0.0
+
+# Optional dependencies for enhanced functionality
+pydantic>=2.0.0 # For data validation and serialization
diff --git a/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-05-28.png b/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-05-28.png
new file mode 100644
index 0000000..842f183
Binary files /dev/null and b/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-05-28.png differ
diff --git a/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-07-56.png b/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-07-56.png
new file mode 100644
index 0000000..7e26de2
Binary files /dev/null and b/cookbook/pocketflow-tracing/screenshots/chrome_2025-06-27_12-07-56.png differ
diff --git a/cookbook/pocketflow-tracing/setup.py b/cookbook/pocketflow-tracing/setup.py
new file mode 100644
index 0000000..6f61cc4
--- /dev/null
+++ b/cookbook/pocketflow-tracing/setup.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+"""
+Setup script for PocketFlow Tracing cookbook.
+
+This script helps install dependencies and verify the setup.
+"""
+
+import subprocess
+import sys
+import os
+
+
+def install_dependencies():
+ """Install required dependencies."""
+ print("📦 Installing dependencies...")
+ try:
+ subprocess.check_call(
+ [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]
+ )
+ print("✅ Dependencies installed successfully!")
+ return True
+ except subprocess.CalledProcessError as e:
+ print(f"❌ Failed to install dependencies: {e}")
+ return False
+
+
+def verify_setup():
+ """Verify that the setup is working."""
+ print("🔍 Verifying setup...")
+ try:
+ # Try to import the tracing module
+ from tracing import trace_flow, TracingConfig
+
+ print("✅ Tracing module imported successfully!")
+
+ # Try to load configuration
+ config = TracingConfig.from_env()
+ if config.validate():
+ print("✅ Configuration is valid!")
+ else:
+ print("⚠️ Configuration validation failed - check your .env file")
+
+ return True
+ except ImportError as e:
+ print(f"❌ Failed to import tracing module: {e}")
+ return False
+ except Exception as e:
+ print(f"❌ Setup verification failed: {e}")
+ return False
+
+
+def main():
+ """Main setup function."""
+ print("🚀 PocketFlow Tracing Setup")
+ print("=" * 40)
+
+ # Check if we're in the right directory
+ if not os.path.exists("requirements.txt"):
+ print(
+ "❌ requirements.txt not found. Please run this script from the pocketflow-tracing directory."
+ )
+ sys.exit(1)
+
+ # Install dependencies
+ if not install_dependencies():
+ sys.exit(1)
+
+ # Verify setup
+ if not verify_setup():
+ sys.exit(1)
+
+ print("\n🎉 Setup completed successfully!")
+ print("\n📚 Next steps:")
+ print("1. Check the README.md for usage instructions")
+ print("2. Run the examples: python examples/basic_example.py")
+ print("3. Run the test suite: python test_tracing.py")
+ print("4. Check your Langfuse dashboard (URL configured in LANGFUSE_HOST)")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/cookbook/pocketflow-tracing/test_tracing.py b/cookbook/pocketflow-tracing/test_tracing.py
new file mode 100644
index 0000000..562bd2e
--- /dev/null
+++ b/cookbook/pocketflow-tracing/test_tracing.py
@@ -0,0 +1,197 @@
+#!/usr/bin/env python3
+"""
+Test script for PocketFlow tracing functionality.
+
+This script tests the tracing implementation to ensure it works correctly
+with Langfuse integration.
+"""
+
+import sys
+import os
+import asyncio
+from dotenv import load_dotenv
+
+# Load environment variables
+load_dotenv()
+
+# Add paths for imports
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
+sys.path.insert(0, os.path.dirname(__file__))
+
+from pocketflow import Node, Flow, AsyncNode, AsyncFlow
+from tracing import trace_flow, TracingConfig
+from utils import setup_tracing
+
+
+class TestNode(Node):
+ """Simple test node for tracing verification."""
+
+ def prep(self, shared):
+ """Test prep phase."""
+ return shared.get("input", "test_input")
+
+ def exec(self, prep_res):
+ """Test exec phase."""
+ return f"processed_{prep_res}"
+
+ def post(self, shared, prep_res, exec_res):
+ """Test post phase."""
+ shared["output"] = exec_res
+ return "default"
+
+
+class TestAsyncNode(AsyncNode):
+ """Simple async test node for tracing verification."""
+
+ async def prep_async(self, shared):
+ """Test async prep phase."""
+ await asyncio.sleep(0.1) # Simulate async work
+ return shared.get("input", "async_test_input")
+
+ async def exec_async(self, prep_res):
+ """Test async exec phase."""
+ await asyncio.sleep(0.1) # Simulate async work
+ return f"async_processed_{prep_res}"
+
+ async def post_async(self, shared, prep_res, exec_res):
+ """Test async post phase."""
+ shared["output"] = exec_res
+ return "default"
+
+
+@trace_flow(flow_name="TestSyncFlow")
+class TestSyncFlow(Flow):
+ """Test synchronous flow with tracing."""
+
+ def __init__(self):
+ super().__init__(start=TestNode())
+
+
+@trace_flow(flow_name="TestAsyncFlow")
+class TestAsyncFlow(AsyncFlow):
+ """Test asynchronous flow with tracing."""
+
+ def __init__(self):
+ super().__init__(start=TestAsyncNode())
+
+
+def test_sync_flow():
+ """Test synchronous flow tracing."""
+ print("🧪 Testing synchronous flow tracing...")
+
+ flow = TestSyncFlow()
+ shared = {"input": "sync_test_data"}
+
+ print(f" Input: {shared}")
+ result = flow.run(shared)
+ print(f" Output: {shared}")
+ print(f" Result: {result}")
+
+ # Verify the flow worked
+ assert "output" in shared
+ assert shared["output"] == "processed_sync_test_data"
+ print(" ✅ Sync flow test passed")
+
+
+async def test_async_flow():
+ """Test asynchronous flow tracing."""
+ print("🧪 Testing asynchronous flow tracing...")
+
+ flow = TestAsyncFlow()
+ shared = {"input": "async_test_data"}
+
+ print(f" Input: {shared}")
+ result = await flow.run_async(shared)
+ print(f" Output: {shared}")
+ print(f" Result: {result}")
+
+ # Verify the flow worked
+ assert "output" in shared
+ assert shared["output"] == "async_processed_async_test_data"
+ print(" ✅ Async flow test passed")
+
+
+def test_configuration():
+ """Test configuration loading and validation."""
+ print("🧪 Testing configuration...")
+
+ # Test loading from environment
+ config = TracingConfig.from_env()
+ print(f" Loaded config: debug={config.debug}")
+
+ # Test validation
+ is_valid = config.validate()
+ print(f" Config valid: {is_valid}")
+
+ if is_valid:
+ print(" ✅ Configuration test passed")
+ else:
+ print(
+ " ⚠️ Configuration test failed (this may be expected if env vars not set)"
+ )
+
+
+def test_error_handling():
+ """Test error handling in traced flows."""
+ print("🧪 Testing error handling...")
+
+ class ErrorNode(Node):
+ def exec(self, prep_res):
+ raise ValueError("Test error for tracing")
+
+ @trace_flow(flow_name="TestErrorFlow")
+ class ErrorFlow(Flow):
+ def __init__(self):
+ super().__init__(start=ErrorNode())
+
+ flow = ErrorFlow()
+ shared = {"input": "error_test"}
+
+ try:
+ flow.run(shared)
+ print(" ❌ Expected error but flow succeeded")
+ except ValueError as e:
+ print(f" ✅ Error correctly caught and traced: {e}")
+ except Exception as e:
+ print(f" ⚠️ Unexpected error type: {e}")
+
+
+async def main():
+ """Run all tests."""
+ print("🚀 Starting PocketFlow Tracing Tests")
+ print("=" * 50)
+
+ # Test configuration first
+ test_configuration()
+ print()
+
+ # Test setup (optional - only if environment is configured)
+ try:
+ print("🔧 Testing setup...")
+ config = setup_tracing()
+ print(" ✅ Setup test passed")
+ except Exception as e:
+ print(f" ⚠️ Setup test failed: {e}")
+ print(" (This is expected if Langfuse is not configured)")
+ print()
+
+ # Test sync flow
+ test_sync_flow()
+ print()
+
+ # Test async flow
+ await test_async_flow()
+ print()
+
+ # Test error handling
+ test_error_handling()
+ print()
+
+ print("🎉 All tests completed!")
+ print("\n📊 If Langfuse is configured, check your dashboard for traces:")
+ langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host")
+ print(f" Dashboard URL: {langfuse_host}")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/cookbook/pocketflow-tracing/tracing/__init__.py b/cookbook/pocketflow-tracing/tracing/__init__.py
new file mode 100644
index 0000000..ace8069
--- /dev/null
+++ b/cookbook/pocketflow-tracing/tracing/__init__.py
@@ -0,0 +1,13 @@
+"""
+PocketFlow Tracing Module
+
+This module provides observability and tracing capabilities for PocketFlow workflows
+using Langfuse as the backend. It includes decorators and utilities to automatically
+trace node execution, inputs, and outputs.
+"""
+
+from .config import TracingConfig
+from .core import LangfuseTracer
+from .decorator import trace_flow
+
+__all__ = ["trace_flow", "TracingConfig", "LangfuseTracer"]
diff --git a/cookbook/pocketflow-tracing/tracing/config.py b/cookbook/pocketflow-tracing/tracing/config.py
new file mode 100644
index 0000000..f8d6f2a
--- /dev/null
+++ b/cookbook/pocketflow-tracing/tracing/config.py
@@ -0,0 +1,111 @@
+"""
+Configuration module for PocketFlow tracing with Langfuse.
+"""
+
+import os
+from dataclasses import dataclass
+from typing import Optional
+from dotenv import load_dotenv
+
+
+@dataclass
+class TracingConfig:
+ """Configuration class for PocketFlow tracing with Langfuse."""
+
+ # Langfuse configuration
+ langfuse_secret_key: Optional[str] = None
+ langfuse_public_key: Optional[str] = None
+ langfuse_host: Optional[str] = None
+
+ # PocketFlow tracing configuration
+ debug: bool = False
+ trace_inputs: bool = True
+ trace_outputs: bool = True
+ trace_prep: bool = True
+ trace_exec: bool = True
+ trace_post: bool = True
+ trace_errors: bool = True
+
+ # Session configuration
+ session_id: Optional[str] = None
+ user_id: Optional[str] = None
+
+ @classmethod
+ def from_env(cls, env_file: Optional[str] = None) -> "TracingConfig":
+ """
+ Create TracingConfig from environment variables.
+
+ Args:
+ env_file: Optional path to .env file. If None, looks for .env in current directory.
+
+ Returns:
+ TracingConfig instance with values from environment variables.
+ """
+ # Load environment variables from .env file if it exists
+ if env_file:
+ load_dotenv(env_file)
+ else:
+ # Try to find .env file in current directory or parent directories
+ load_dotenv()
+
+ return cls(
+ langfuse_secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
+ langfuse_public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
+ langfuse_host=os.getenv("LANGFUSE_HOST"),
+ debug=os.getenv("POCKETFLOW_TRACING_DEBUG", "false").lower() == "true",
+ trace_inputs=os.getenv("POCKETFLOW_TRACE_INPUTS", "true").lower() == "true",
+ trace_outputs=os.getenv("POCKETFLOW_TRACE_OUTPUTS", "true").lower() == "true",
+ trace_prep=os.getenv("POCKETFLOW_TRACE_PREP", "true").lower() == "true",
+ trace_exec=os.getenv("POCKETFLOW_TRACE_EXEC", "true").lower() == "true",
+ trace_post=os.getenv("POCKETFLOW_TRACE_POST", "true").lower() == "true",
+ trace_errors=os.getenv("POCKETFLOW_TRACE_ERRORS", "true").lower() == "true",
+ session_id=os.getenv("POCKETFLOW_SESSION_ID"),
+ user_id=os.getenv("POCKETFLOW_USER_ID"),
+ )
+
+ def validate(self) -> bool:
+ """
+ Validate that required configuration is present.
+
+ Returns:
+ True if configuration is valid, False otherwise.
+ """
+ if not self.langfuse_secret_key:
+ if self.debug:
+ print("Warning: LANGFUSE_SECRET_KEY not set")
+ return False
+
+ if not self.langfuse_public_key:
+ if self.debug:
+ print("Warning: LANGFUSE_PUBLIC_KEY not set")
+ return False
+
+ if not self.langfuse_host:
+ if self.debug:
+ print("Warning: LANGFUSE_HOST not set")
+ return False
+
+ return True
+
+ def to_langfuse_kwargs(self) -> dict:
+ """
+ Convert configuration to kwargs for Langfuse client initialization.
+
+ Returns:
+ Dictionary of kwargs for Langfuse client.
+ """
+ kwargs = {}
+
+ if self.langfuse_secret_key:
+ kwargs["secret_key"] = self.langfuse_secret_key
+
+ if self.langfuse_public_key:
+ kwargs["public_key"] = self.langfuse_public_key
+
+ if self.langfuse_host:
+ kwargs["host"] = self.langfuse_host
+
+ if self.debug:
+ kwargs["debug"] = True
+
+ return kwargs
diff --git a/cookbook/pocketflow-tracing/tracing/core.py b/cookbook/pocketflow-tracing/tracing/core.py
new file mode 100644
index 0000000..1e72599
--- /dev/null
+++ b/cookbook/pocketflow-tracing/tracing/core.py
@@ -0,0 +1,287 @@
+"""
+Core tracing functionality for PocketFlow with Langfuse integration.
+"""
+
+import json
+import time
+import uuid
+from typing import Any, Dict, Optional, Union
+from datetime import datetime
+
+try:
+ from langfuse import Langfuse
+
+ LANGFUSE_AVAILABLE = True
+except ImportError:
+ LANGFUSE_AVAILABLE = False
+ print("Warning: langfuse package not installed. Install with: pip install langfuse")
+
+from .config import TracingConfig
+
+
+class LangfuseTracer:
+ """
+ Core tracer class that handles Langfuse integration for PocketFlow.
+ """
+
+ def __init__(self, config: TracingConfig):
+ """
+ Initialize the LangfuseTracer.
+
+ Args:
+ config: TracingConfig instance with Langfuse settings.
+ """
+ self.config = config
+ self.client = None
+ self.current_trace = None
+ self.spans = {} # Store spans by node ID
+
+ if LANGFUSE_AVAILABLE and config.validate():
+ try:
+ # Initialize Langfuse client with proper parameters
+ kwargs = {}
+ if config.langfuse_secret_key:
+ kwargs["secret_key"] = config.langfuse_secret_key
+ if config.langfuse_public_key:
+ kwargs["public_key"] = config.langfuse_public_key
+ if config.langfuse_host:
+ kwargs["host"] = config.langfuse_host
+ if config.debug:
+ kwargs["debug"] = True
+
+ self.client = Langfuse(**kwargs)
+ if config.debug:
+ print(
+ f"✓ Langfuse client initialized with host: {config.langfuse_host}"
+ )
+ except Exception as e:
+ if config.debug:
+ print(f"✗ Failed to initialize Langfuse client: {e}")
+ self.client = None
+ else:
+ if config.debug:
+ print("✗ Langfuse not available or configuration invalid")
+
+ def start_trace(self, flow_name: str, input_data: Dict[str, Any]) -> Optional[str]:
+ """
+ Start a new trace for a flow execution.
+
+ Args:
+ flow_name: Name of the flow being traced.
+ input_data: Input data for the flow.
+
+ Returns:
+ Trace ID if successful, None otherwise.
+ """
+ if not self.client:
+ return None
+
+ try:
+ # Serialize input data safely
+ serialized_input = self._serialize_data(input_data)
+
+ # Use Langfuse v2 API to create a trace
+ self.current_trace = self.client.trace(
+ name=flow_name,
+ input=serialized_input,
+ metadata={
+ "framework": "PocketFlow",
+ "trace_type": "flow_execution",
+ "timestamp": datetime.now().isoformat(),
+ },
+ session_id=self.config.session_id,
+ user_id=self.config.user_id,
+ )
+
+ # Get the trace ID
+ trace_id = self.current_trace.id
+
+ if self.config.debug:
+ print(f"✓ Started trace: {trace_id} for flow: {flow_name}")
+
+ return trace_id
+
+ except Exception as e:
+ if self.config.debug:
+ print(f"✗ Failed to start trace: {e}")
+ return None
+
+ def end_trace(self, output_data: Dict[str, Any], status: str = "success") -> None:
+ """
+ End the current trace.
+
+ Args:
+ output_data: Output data from the flow.
+ status: Status of the trace execution.
+ """
+ if not self.current_trace:
+ return
+
+ try:
+ # Serialize output data safely
+ serialized_output = self._serialize_data(output_data)
+
+ # Update the trace with output data using v2 API
+ self.current_trace.update(
+ output=serialized_output,
+ metadata={
+ "status": status,
+ "end_timestamp": datetime.now().isoformat(),
+ },
+ )
+
+ if self.config.debug:
+ print(f"✓ Ended trace with status: {status}")
+
+ except Exception as e:
+ if self.config.debug:
+ print(f"✗ Failed to end trace: {e}")
+ finally:
+ self.current_trace = None
+ self.spans.clear()
+
+ def start_node_span(
+ self, node_name: str, node_id: str, phase: str
+ ) -> Optional[str]:
+ """
+ Start a span for a node execution phase.
+
+ Args:
+ node_name: Name/type of the node.
+ node_id: Unique identifier for the node instance.
+ phase: Execution phase (prep, exec, post).
+
+ Returns:
+ Span ID if successful, None otherwise.
+ """
+ if not self.current_trace:
+ return None
+
+ try:
+ span_id = f"{node_id}_{phase}"
+
+ # Create a child span using v2 API
+ span = self.current_trace.span(
+ name=f"{node_name}.{phase}",
+ metadata={
+ "node_type": node_name,
+ "node_id": node_id,
+ "phase": phase,
+ "start_timestamp": datetime.now().isoformat(),
+ },
+ )
+
+ self.spans[span_id] = span
+
+ if self.config.debug:
+ print(f"✓ Started span: {span_id}")
+
+ return span_id
+
+ except Exception as e:
+ if self.config.debug:
+ print(f"✗ Failed to start span: {e}")
+ return None
+
+ def end_node_span(
+ self,
+ span_id: str,
+ input_data: Any = None,
+ output_data: Any = None,
+ error: Exception = None,
+ ) -> None:
+ """
+ End a node execution span.
+
+ Args:
+ span_id: ID of the span to end.
+ input_data: Input data for the phase.
+ output_data: Output data from the phase.
+ error: Exception if the phase failed.
+ """
+ if span_id not in self.spans:
+ return
+
+ try:
+ span = self.spans[span_id]
+
+ # Prepare update data
+ update_data = {}
+
+ if input_data is not None and self.config.trace_inputs:
+ update_data["input"] = self._serialize_data(input_data)
+ if output_data is not None and self.config.trace_outputs:
+ update_data["output"] = self._serialize_data(output_data)
+
+ if error and self.config.trace_errors:
+ update_data.update(
+ {
+ "level": "ERROR",
+ "status_message": str(error),
+ "metadata": {
+ "error_type": type(error).__name__,
+ "error_message": str(error),
+ "end_timestamp": datetime.now().isoformat(),
+ },
+ }
+ )
+ else:
+ update_data.update(
+ {
+ "level": "DEFAULT",
+ "metadata": {"end_timestamp": datetime.now().isoformat()},
+ }
+ )
+
+ # Update the span with all data at once
+ span.update(**update_data)
+
+ # End the span
+ span.end()
+
+ if self.config.debug:
+ status = "ERROR" if error else "SUCCESS"
+ print(f"✓ Ended span: {span_id} with status: {status}")
+
+ except Exception as e:
+ if self.config.debug:
+ print(f"✗ Failed to end span: {e}")
+ finally:
+ if span_id in self.spans:
+ del self.spans[span_id]
+
+ def _serialize_data(self, data: Any) -> Any:
+ """
+ Safely serialize data for Langfuse.
+
+ Args:
+ data: Data to serialize.
+
+ Returns:
+ Serialized data that can be sent to Langfuse.
+ """
+ try:
+ # Handle common PocketFlow data types
+ if hasattr(data, "__dict__"):
+ # Convert objects to dict representation
+ return {"_type": type(data).__name__, "_data": str(data)}
+ elif isinstance(data, (dict, list, str, int, float, bool, type(None))):
+ # JSON-serializable types
+ return data
+ else:
+ # Fallback to string representation
+ return {"_type": type(data).__name__, "_data": str(data)}
+ except Exception:
+ # Ultimate fallback
+ return {"_type": "unknown", "_data": ""}
+
+ def flush(self) -> None:
+ """Flush any pending traces to Langfuse."""
+ if self.client:
+ try:
+ self.client.flush()
+ if self.config.debug:
+ print("✓ Flushed traces to Langfuse")
+ except Exception as e:
+ if self.config.debug:
+ print(f"✗ Failed to flush traces: {e}")
diff --git a/cookbook/pocketflow-tracing/tracing/decorator.py b/cookbook/pocketflow-tracing/tracing/decorator.py
new file mode 100644
index 0000000..6eac297
--- /dev/null
+++ b/cookbook/pocketflow-tracing/tracing/decorator.py
@@ -0,0 +1,293 @@
+"""
+Decorator for tracing PocketFlow workflows with Langfuse.
+"""
+
+import functools
+import inspect
+import uuid
+from typing import Any, Callable, Dict, Optional, Union
+
+from .config import TracingConfig
+from .core import LangfuseTracer
+
+
+def trace_flow(
+ config: Optional[TracingConfig] = None,
+ flow_name: Optional[str] = None,
+ session_id: Optional[str] = None,
+ user_id: Optional[str] = None
+):
+ """
+ Decorator to add Langfuse tracing to PocketFlow flows.
+
+ This decorator automatically traces:
+ - Flow execution start/end
+ - Each node's prep, exec, and post phases
+ - Input and output data for each phase
+ - Errors and exceptions
+
+ Args:
+ config: TracingConfig instance. If None, loads from environment.
+ flow_name: Custom name for the flow. If None, uses the flow class name.
+ session_id: Session ID for grouping related traces.
+ user_id: User ID for the trace.
+
+ Returns:
+ Decorated flow class or function.
+
+ Example:
+ ```python
+ from tracing import trace_flow
+
+ @trace_flow()
+ class MyFlow(Flow):
+ def __init__(self):
+ super().__init__(start=MyNode())
+
+ # Or with custom configuration
+ config = TracingConfig.from_env()
+
+ @trace_flow(config=config, flow_name="CustomFlow")
+ class MyFlow(Flow):
+ pass
+ ```
+ """
+ def decorator(flow_class_or_func):
+ # Handle both class and function decoration
+ if inspect.isclass(flow_class_or_func):
+ return _trace_flow_class(flow_class_or_func, config, flow_name, session_id, user_id)
+ else:
+ return _trace_flow_function(flow_class_or_func, config, flow_name, session_id, user_id)
+
+ return decorator
+
+
+def _trace_flow_class(flow_class, config, flow_name, session_id, user_id):
+ """Trace a Flow class by wrapping its methods."""
+
+ # Get or create config
+ if config is None:
+ config = TracingConfig.from_env()
+
+ # Override session/user if provided
+ if session_id:
+ config.session_id = session_id
+ if user_id:
+ config.user_id = user_id
+
+ # Get flow name
+ if flow_name is None:
+ flow_name = flow_class.__name__
+
+ # Store original methods
+ original_init = flow_class.__init__
+ original_run = getattr(flow_class, 'run', None)
+ original_run_async = getattr(flow_class, 'run_async', None)
+
+ def traced_init(self, *args, **kwargs):
+ """Initialize the flow with tracing capabilities."""
+ # Call original init
+ original_init(self, *args, **kwargs)
+
+ # Add tracing attributes
+ self._tracer = LangfuseTracer(config)
+ self._flow_name = flow_name
+ self._trace_id = None
+
+ # Patch all nodes in the flow
+ self._patch_nodes()
+
+ def traced_run(self, shared):
+ """Traced version of the run method."""
+ if not hasattr(self, '_tracer'):
+ # Fallback if not properly initialized
+ return original_run(self, shared) if original_run else None
+
+ # Start trace
+ self._trace_id = self._tracer.start_trace(self._flow_name, shared)
+
+ try:
+ # Run the original flow
+ result = original_run(self, shared) if original_run else None
+
+ # End trace successfully
+ self._tracer.end_trace(shared, "success")
+
+ return result
+
+ except Exception as e:
+ # End trace with error
+ self._tracer.end_trace(shared, "error")
+ raise
+ finally:
+ # Ensure cleanup
+ self._tracer.flush()
+
+ async def traced_run_async(self, shared):
+ """Traced version of the async run method."""
+ if not hasattr(self, '_tracer'):
+ # Fallback if not properly initialized
+ return await original_run_async(self, shared) if original_run_async else None
+
+ # Start trace
+ self._trace_id = self._tracer.start_trace(self._flow_name, shared)
+
+ try:
+ # Run the original flow
+ result = await original_run_async(self, shared) if original_run_async else None
+
+ # End trace successfully
+ self._tracer.end_trace(shared, "success")
+
+ return result
+
+ except Exception as e:
+ # End trace with error
+ self._tracer.end_trace(shared, "error")
+ raise
+ finally:
+ # Ensure cleanup
+ self._tracer.flush()
+
+ def patch_nodes(self):
+ """Patch all nodes in the flow to add tracing."""
+ if not hasattr(self, 'start_node') or not self.start_node:
+ return
+
+ visited = set()
+ nodes_to_patch = [self.start_node]
+
+ while nodes_to_patch:
+ node = nodes_to_patch.pop(0)
+ if id(node) in visited:
+ continue
+
+ visited.add(id(node))
+
+ # Patch this node
+ self._patch_node(node)
+
+ # Add successors to patch list
+ if hasattr(node, 'successors'):
+ for successor in node.successors.values():
+ if successor and id(successor) not in visited:
+ nodes_to_patch.append(successor)
+
+ def patch_node(self, node):
+ """Patch a single node to add tracing."""
+ if hasattr(node, '_pocketflow_traced'):
+ return # Already patched
+
+ node_id = str(uuid.uuid4())
+ node_name = type(node).__name__
+
+ # Store original methods
+ original_prep = getattr(node, 'prep', None)
+ original_exec = getattr(node, 'exec', None)
+ original_post = getattr(node, 'post', None)
+ original_prep_async = getattr(node, 'prep_async', None)
+ original_exec_async = getattr(node, 'exec_async', None)
+ original_post_async = getattr(node, 'post_async', None)
+
+ # Create traced versions
+ if original_prep:
+ node.prep = self._create_traced_method(original_prep, node_id, node_name, 'prep')
+ if original_exec:
+ node.exec = self._create_traced_method(original_exec, node_id, node_name, 'exec')
+ if original_post:
+ node.post = self._create_traced_method(original_post, node_id, node_name, 'post')
+ if original_prep_async:
+ node.prep_async = self._create_traced_async_method(original_prep_async, node_id, node_name, 'prep')
+ if original_exec_async:
+ node.exec_async = self._create_traced_async_method(original_exec_async, node_id, node_name, 'exec')
+ if original_post_async:
+ node.post_async = self._create_traced_async_method(original_post_async, node_id, node_name, 'post')
+
+ # Mark as traced
+ node._pocketflow_traced = True
+
+ def create_traced_method(self, original_method, node_id, node_name, phase):
+ """Create a traced version of a synchronous method."""
+ @functools.wraps(original_method)
+ def traced_method(*args, **kwargs):
+ span_id = self._tracer.start_node_span(node_name, node_id, phase)
+
+ try:
+ result = original_method(*args, **kwargs)
+ self._tracer.end_node_span(span_id, input_data=args, output_data=result)
+ return result
+ except Exception as e:
+ self._tracer.end_node_span(span_id, input_data=args, error=e)
+ raise
+
+ return traced_method
+
+ def create_traced_async_method(self, original_method, node_id, node_name, phase):
+ """Create a traced version of an asynchronous method."""
+ @functools.wraps(original_method)
+ async def traced_async_method(*args, **kwargs):
+ span_id = self._tracer.start_node_span(node_name, node_id, phase)
+
+ try:
+ result = await original_method(*args, **kwargs)
+ self._tracer.end_node_span(span_id, input_data=args, output_data=result)
+ return result
+ except Exception as e:
+ self._tracer.end_node_span(span_id, input_data=args, error=e)
+ raise
+
+ return traced_async_method
+
+ # Replace methods on the class
+ flow_class.__init__ = traced_init
+ flow_class._patch_nodes = patch_nodes
+ flow_class._patch_node = patch_node
+ flow_class._create_traced_method = create_traced_method
+ flow_class._create_traced_async_method = create_traced_async_method
+
+ if original_run:
+ flow_class.run = traced_run
+ if original_run_async:
+ flow_class.run_async = traced_run_async
+
+ return flow_class
+
+
+def _trace_flow_function(flow_func, config, flow_name, session_id, user_id):
+ """Trace a flow function (for functional-style flows)."""
+
+ # Get or create config
+ if config is None:
+ config = TracingConfig.from_env()
+
+ # Override session/user if provided
+ if session_id:
+ config.session_id = session_id
+ if user_id:
+ config.user_id = user_id
+
+ # Get flow name
+ if flow_name is None:
+ flow_name = flow_func.__name__
+
+ tracer = LangfuseTracer(config)
+
+ @functools.wraps(flow_func)
+ def traced_flow_func(*args, **kwargs):
+ # Assume first argument is shared data
+ shared = args[0] if args else {}
+
+ # Start trace
+ trace_id = tracer.start_trace(flow_name, shared)
+
+ try:
+ result = flow_func(*args, **kwargs)
+ tracer.end_trace(shared, "success")
+ return result
+ except Exception as e:
+ tracer.end_trace(shared, "error")
+ raise
+ finally:
+ tracer.flush()
+
+ return traced_flow_func
diff --git a/cookbook/pocketflow-tracing/utils/__init__.py b/cookbook/pocketflow-tracing/utils/__init__.py
new file mode 100644
index 0000000..f5d9c1e
--- /dev/null
+++ b/cookbook/pocketflow-tracing/utils/__init__.py
@@ -0,0 +1,7 @@
+"""
+Utility functions for PocketFlow tracing.
+"""
+
+from .setup import setup_tracing, test_langfuse_connection
+
+__all__ = ['setup_tracing', 'test_langfuse_connection']
diff --git a/cookbook/pocketflow-tracing/utils/setup.py b/cookbook/pocketflow-tracing/utils/setup.py
new file mode 100644
index 0000000..56d3a14
--- /dev/null
+++ b/cookbook/pocketflow-tracing/utils/setup.py
@@ -0,0 +1,163 @@
+"""
+Setup and testing utilities for PocketFlow tracing.
+"""
+
+import os
+import sys
+from typing import Optional
+
+# Add parent directory to path for imports
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+
+try:
+ from langfuse import Langfuse
+ LANGFUSE_AVAILABLE = True
+except ImportError:
+ LANGFUSE_AVAILABLE = False
+
+from tracing import TracingConfig, LangfuseTracer
+
+
+def setup_tracing(env_file: Optional[str] = None) -> TracingConfig:
+ """
+ Set up tracing configuration and validate the setup.
+
+ Args:
+ env_file: Optional path to .env file. If None, uses default location.
+
+ Returns:
+ TracingConfig instance.
+
+ Raises:
+ RuntimeError: If setup fails.
+ """
+ print("🔧 Setting up PocketFlow tracing...")
+
+ # Check if langfuse is installed
+ if not LANGFUSE_AVAILABLE:
+ raise RuntimeError(
+ "Langfuse package not installed. Install with: pip install langfuse"
+ )
+
+ # Load configuration
+ if env_file:
+ config = TracingConfig.from_env(env_file)
+ print(f"✓ Loaded configuration from: {env_file}")
+ else:
+ config = TracingConfig.from_env()
+ print("✓ Loaded configuration from environment")
+
+ # Validate configuration
+ if not config.validate():
+ raise RuntimeError(
+ "Invalid tracing configuration. Please check your environment variables:\n"
+ "- LANGFUSE_SECRET_KEY\n"
+ "- LANGFUSE_PUBLIC_KEY\n"
+ "- LANGFUSE_HOST"
+ )
+
+ print("✓ Configuration validated")
+
+ # Test connection
+ if test_langfuse_connection(config):
+ print("✓ Langfuse connection successful")
+ else:
+ raise RuntimeError("Failed to connect to Langfuse. Check your configuration and network.")
+
+ print("🎉 PocketFlow tracing setup complete!")
+ return config
+
+
+def test_langfuse_connection(config: TracingConfig) -> bool:
+ """
+ Test connection to Langfuse.
+
+ Args:
+ config: TracingConfig instance.
+
+ Returns:
+ True if connection successful, False otherwise.
+ """
+ try:
+ # Create a test tracer
+ tracer = LangfuseTracer(config)
+
+ if not tracer.client:
+ return False
+
+ # Try to start and end a test trace
+ trace_id = tracer.start_trace("test_connection", {"test": True})
+ if trace_id:
+ tracer.end_trace({"test": "completed"}, "success")
+ tracer.flush()
+ return True
+
+ return False
+
+ except Exception as e:
+ if config.debug:
+ print(f"Connection test failed: {e}")
+ return False
+
+
+def print_configuration_help():
+ """Print help information for configuring tracing."""
+ print("""
+🔧 PocketFlow Tracing Configuration Help
+
+To use PocketFlow tracing, you need to configure Langfuse credentials.
+
+1. Create or update your .env file with:
+
+LANGFUSE_SECRET_KEY=your-secret-key
+LANGFUSE_PUBLIC_KEY=your-public-key
+LANGFUSE_HOST=your-langfuse-host
+POCKETFLOW_TRACING_DEBUG=true
+
+2. Optional configuration:
+
+POCKETFLOW_TRACE_INPUTS=true
+POCKETFLOW_TRACE_OUTPUTS=true
+POCKETFLOW_TRACE_PREP=true
+POCKETFLOW_TRACE_EXEC=true
+POCKETFLOW_TRACE_POST=true
+POCKETFLOW_TRACE_ERRORS=true
+POCKETFLOW_SESSION_ID=your-session-id
+POCKETFLOW_USER_ID=your-user-id
+
+3. Install required packages:
+
+pip install -r requirements.txt
+
+4. Test your setup:
+
+python -c "from utils import setup_tracing; setup_tracing()"
+""")
+
+
+if __name__ == "__main__":
+ """Command-line interface for setup and testing."""
+ import argparse
+
+ parser = argparse.ArgumentParser(description="PocketFlow Tracing Setup")
+ parser.add_argument("--test", action="store_true", help="Test Langfuse connection")
+ parser.add_argument("--help-config", action="store_true", help="Show configuration help")
+ parser.add_argument("--env-file", type=str, help="Path to .env file")
+
+ args = parser.parse_args()
+
+ if args.help_config:
+ print_configuration_help()
+ sys.exit(0)
+
+ if args.test:
+ try:
+ config = setup_tracing(args.env_file)
+ print("\n✅ All tests passed! Your tracing setup is ready.")
+ except Exception as e:
+ print(f"\n❌ Setup failed: {e}")
+ print("\nFor help with configuration, run:")
+ print("python utils/setup.py --help-config")
+ sys.exit(1)
+ else:
+ print_configuration_help()
diff --git a/docs/utility_function/viz.md b/docs/utility_function/viz.md
index a518be7..065fdaa 100644
--- a/docs/utility_function/viz.md
+++ b/docs/utility_function/viz.md
@@ -139,4 +139,6 @@ data_science_flow = DataScienceFlow(start=data_prep_node)
data_science_flow.run({})
```
-The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']`
\ No newline at end of file
+The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']`
+
+For a more complete implementation, check out [the cookbook](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-tracing).
\ No newline at end of file