commit
98d0c3833c
|
|
@ -74,3 +74,13 @@ htmlcov/
|
||||||
|
|
||||||
test.ipynb
|
test.ipynb
|
||||||
.pytest_cache/
|
.pytest_cache/
|
||||||
|
cookbook/pocketflow-multi-agent/.python-version
|
||||||
|
|
||||||
|
|
||||||
|
# local
|
||||||
|
uv.lock
|
||||||
|
.python-version
|
||||||
|
pyproject.toml
|
||||||
|
usage.md
|
||||||
|
cookbook/pocketflow-minimal-example/viz/flow_visualization.html
|
||||||
|
cookbook/pocketflow-minimal-example/viz/flow_visualization.json
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@
|
||||||
| [Thinking](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-thinking) | ★☆☆ <br> *Beginner* | Solve complex reasoning problems through Chain-of-Thought |
|
| [Thinking](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-thinking) | ★☆☆ <br> *Beginner* | Solve complex reasoning problems through Chain-of-Thought |
|
||||||
| [Memory](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-memory) | ★☆☆ <br> *Beginner* | A chat bot with short-term and long-term memory |
|
| [Memory](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-memory) | ★☆☆ <br> *Beginner* | A chat bot with short-term and long-term memory |
|
||||||
| [MCP](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-mcp) | ★☆☆ <br> *Beginner* | Agent using Model Context Protocol for numerical operations |
|
| [MCP](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-mcp) | ★☆☆ <br> *Beginner* | Agent using Model Context Protocol for numerical operations |
|
||||||
|
| [Tracing](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-tracing) | ★☆☆ <br> *Beginner* | Trace and visualize the execution of your flow |
|
||||||
|
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
# PocketFlow Tracing Configuration Template
|
||||||
|
# Copy this file to .env and replace the placeholder values with your actual Langfuse credentials
|
||||||
|
|
||||||
|
# Required Langfuse configuration
|
||||||
|
LANGFUSE_SECRET_KEY=your-langfuse-secret-key
|
||||||
|
LANGFUSE_PUBLIC_KEY=your-langfuse-public-key
|
||||||
|
LANGFUSE_HOST=your-langfuse-host-url
|
||||||
|
|
||||||
|
# Optional tracing configuration
|
||||||
|
POCKETFLOW_TRACING_DEBUG=true
|
||||||
|
POCKETFLOW_TRACE_INPUTS=true
|
||||||
|
POCKETFLOW_TRACE_OUTPUTS=true
|
||||||
|
POCKETFLOW_TRACE_PREP=true
|
||||||
|
POCKETFLOW_TRACE_EXEC=true
|
||||||
|
POCKETFLOW_TRACE_POST=true
|
||||||
|
POCKETFLOW_TRACE_ERRORS=true
|
||||||
|
|
||||||
|
# Optional session/user tracking
|
||||||
|
POCKETFLOW_SESSION_ID=your-session-id
|
||||||
|
POCKETFLOW_USER_ID=your-user-id
|
||||||
|
|
@ -0,0 +1,290 @@
|
||||||
|
# PocketFlow Tracing with Langfuse
|
||||||
|
|
||||||
|
This cookbook provides comprehensive observability for PocketFlow workflows using [Langfuse](https://langfuse.com/) as the tracing backend. With minimal code changes (just adding a decorator), you can automatically trace all node executions, inputs, outputs, and errors in your PocketFlow workflows.
|
||||||
|
|
||||||
|
## 🎯 Features
|
||||||
|
|
||||||
|
- **Automatic Tracing**: Trace entire flows with a single decorator
|
||||||
|
- **Node-Level Observability**: Automatically trace `prep`, `exec`, and `post` phases of each node
|
||||||
|
- **Input/Output Tracking**: Capture all data flowing through your workflow
|
||||||
|
- **Error Tracking**: Automatically capture and trace exceptions
|
||||||
|
- **Async Support**: Full support for AsyncFlow and AsyncNode
|
||||||
|
- **Minimal Code Changes**: Just add `@trace_flow()` to your flow classes
|
||||||
|
- **Langfuse Integration**: Leverage Langfuse's powerful observability platform
|
||||||
|
|
||||||
|
## 🚀 Quick Start
|
||||||
|
|
||||||
|
### 1. Install Dependencies
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2. Environment Setup
|
||||||
|
|
||||||
|
Copy the example environment file and configure your Langfuse credentials:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cp .env.example .env
|
||||||
|
```
|
||||||
|
|
||||||
|
Then edit the `.env` file with your actual Langfuse configuration:
|
||||||
|
|
||||||
|
```env
|
||||||
|
LANGFUSE_SECRET_KEY=your-langfuse-secret-key
|
||||||
|
LANGFUSE_PUBLIC_KEY=your-langfuse-public-key
|
||||||
|
LANGFUSE_HOST=your-langfuse-host-url
|
||||||
|
POCKETFLOW_TRACING_DEBUG=true
|
||||||
|
```
|
||||||
|
|
||||||
|
**Note**: Replace the placeholder values with your actual Langfuse credentials and host URL.
|
||||||
|
|
||||||
|
### 3. Basic Usage
|
||||||
|
|
||||||
|
```python
|
||||||
|
from pocketflow import Node, Flow
|
||||||
|
from tracing import trace_flow
|
||||||
|
|
||||||
|
class MyNode(Node):
|
||||||
|
def prep(self, shared):
|
||||||
|
return shared["input"]
|
||||||
|
|
||||||
|
def exec(self, data):
|
||||||
|
return f"Processed: {data}"
|
||||||
|
|
||||||
|
def post(self, shared, prep_res, exec_res):
|
||||||
|
shared["output"] = exec_res
|
||||||
|
return "default"
|
||||||
|
|
||||||
|
@trace_flow() # 🎉 That's it! Your flow is now traced
|
||||||
|
class MyFlow(Flow):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(start=MyNode())
|
||||||
|
|
||||||
|
# Run your flow - tracing happens automatically
|
||||||
|
flow = MyFlow()
|
||||||
|
shared = {"input": "Hello World"}
|
||||||
|
flow.run(shared)
|
||||||
|
```
|
||||||
|
|
||||||
|
## 📊 What Gets Traced
|
||||||
|
|
||||||
|
When you apply the `@trace_flow()` decorator, the system automatically traces:
|
||||||
|
|
||||||
|
### Flow Level
|
||||||
|
- **Flow Start/End**: Overall execution time and status
|
||||||
|
- **Input Data**: Initial shared state when flow starts
|
||||||
|
- **Output Data**: Final shared state when flow completes
|
||||||
|
- **Errors**: Any exceptions that occur during flow execution
|
||||||
|
|
||||||
|
### Node Level
|
||||||
|
For each node in your flow, the system traces:
|
||||||
|
|
||||||
|
- **prep() Phase**:
|
||||||
|
- Input: `shared` data
|
||||||
|
- Output: `prep_res` returned by prep method
|
||||||
|
- Execution time and any errors
|
||||||
|
|
||||||
|
- **exec() Phase**:
|
||||||
|
- Input: `prep_res` from prep phase
|
||||||
|
- Output: `exec_res` returned by exec method
|
||||||
|
- Execution time and any errors
|
||||||
|
- Retry attempts (if configured)
|
||||||
|
|
||||||
|
- **post() Phase**:
|
||||||
|
- Input: `shared`, `prep_res`, `exec_res`
|
||||||
|
- Output: Action string returned
|
||||||
|
- Execution time and any errors
|
||||||
|
|
||||||
|
## 🔧 Configuration Options
|
||||||
|
|
||||||
|
### Basic Configuration
|
||||||
|
|
||||||
|
```python
|
||||||
|
from tracing import trace_flow, TracingConfig
|
||||||
|
|
||||||
|
# Use environment variables (default)
|
||||||
|
@trace_flow()
|
||||||
|
class MyFlow(Flow):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Custom flow name
|
||||||
|
@trace_flow(flow_name="CustomFlowName")
|
||||||
|
class MyFlow(Flow):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Custom session and user IDs
|
||||||
|
@trace_flow(session_id="session-123", user_id="user-456")
|
||||||
|
class MyFlow(Flow):
|
||||||
|
pass
|
||||||
|
```
|
||||||
|
|
||||||
|
### Advanced Configuration
|
||||||
|
|
||||||
|
```python
|
||||||
|
from tracing import TracingConfig
|
||||||
|
|
||||||
|
# Create custom configuration
|
||||||
|
config = TracingConfig(
|
||||||
|
langfuse_secret_key="your-secret-key",
|
||||||
|
langfuse_public_key="your-public-key",
|
||||||
|
langfuse_host="https://your-langfuse-instance.com",
|
||||||
|
debug=True,
|
||||||
|
trace_inputs=True,
|
||||||
|
trace_outputs=True,
|
||||||
|
trace_errors=True
|
||||||
|
)
|
||||||
|
|
||||||
|
@trace_flow(config=config)
|
||||||
|
class MyFlow(Flow):
|
||||||
|
pass
|
||||||
|
```
|
||||||
|
|
||||||
|
## 📁 Examples
|
||||||
|
|
||||||
|
### Basic Synchronous Flow
|
||||||
|
See `examples/basic_example.py` for a complete example of tracing a simple synchronous flow.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd examples
|
||||||
|
python basic_example.py
|
||||||
|
```
|
||||||
|
|
||||||
|
### Asynchronous Flow
|
||||||
|
See `examples/async_example.py` for tracing AsyncFlow and AsyncNode.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd examples
|
||||||
|
python async_example.py
|
||||||
|
```
|
||||||
|
|
||||||
|
## 🔍 Viewing Traces
|
||||||
|
|
||||||
|
After running your traced flows, visit your Langfuse dashboard to view the traces:
|
||||||
|
|
||||||
|
**Dashboard URL**: Use the URL you configured in `LANGFUSE_HOST` environment variable
|
||||||
|
|
||||||
|
In the dashboard you'll see:
|
||||||
|
- **Traces**: One trace per flow execution
|
||||||
|
- **Spans**: Individual node phases (prep, exec, post)
|
||||||
|
- **Input/Output Data**: All data flowing through your workflow
|
||||||
|
- **Performance Metrics**: Execution times for each phase
|
||||||
|
- **Error Details**: Stack traces and error messages
|
||||||
|
|
||||||
|
The tracings in examples.
|
||||||
|

|
||||||
|
|
||||||
|
Detailed tracing for a node.
|
||||||
|

|
||||||
|
|
||||||
|
## 🛠️ Advanced Usage
|
||||||
|
|
||||||
|
### Custom Tracer Configuration
|
||||||
|
|
||||||
|
```python
|
||||||
|
from tracing import TracingConfig, LangfuseTracer
|
||||||
|
|
||||||
|
# Create custom configuration
|
||||||
|
config = TracingConfig.from_env()
|
||||||
|
config.debug = True
|
||||||
|
|
||||||
|
# Use tracer directly (for advanced use cases)
|
||||||
|
tracer = LangfuseTracer(config)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Environment Variables
|
||||||
|
|
||||||
|
You can customize tracing behavior with these environment variables:
|
||||||
|
|
||||||
|
```env
|
||||||
|
# Required Langfuse configuration
|
||||||
|
LANGFUSE_SECRET_KEY=your-secret-key
|
||||||
|
LANGFUSE_PUBLIC_KEY=your-public-key
|
||||||
|
LANGFUSE_HOST=your-langfuse-host
|
||||||
|
|
||||||
|
# Optional tracing configuration
|
||||||
|
POCKETFLOW_TRACING_DEBUG=true
|
||||||
|
POCKETFLOW_TRACE_INPUTS=true
|
||||||
|
POCKETFLOW_TRACE_OUTPUTS=true
|
||||||
|
POCKETFLOW_TRACE_PREP=true
|
||||||
|
POCKETFLOW_TRACE_EXEC=true
|
||||||
|
POCKETFLOW_TRACE_POST=true
|
||||||
|
POCKETFLOW_TRACE_ERRORS=true
|
||||||
|
|
||||||
|
# Optional session/user tracking
|
||||||
|
POCKETFLOW_SESSION_ID=your-session-id
|
||||||
|
POCKETFLOW_USER_ID=your-user-id
|
||||||
|
```
|
||||||
|
|
||||||
|
## 🐛 Troubleshooting
|
||||||
|
|
||||||
|
### Common Issues
|
||||||
|
|
||||||
|
1. **"langfuse package not installed"**
|
||||||
|
```bash
|
||||||
|
pip install langfuse
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **"Langfuse client initialization failed"**
|
||||||
|
- Check your `.env` file configuration
|
||||||
|
- Verify Langfuse server is running at the specified host
|
||||||
|
- Check network connectivity
|
||||||
|
|
||||||
|
3. **"No traces appearing in dashboard"**
|
||||||
|
- Ensure `POCKETFLOW_TRACING_DEBUG=true` to see debug output
|
||||||
|
- Check that your flow is actually being executed
|
||||||
|
- Verify Langfuse credentials are correct
|
||||||
|
|
||||||
|
### Debug Mode
|
||||||
|
|
||||||
|
Enable debug mode to see detailed tracing information:
|
||||||
|
|
||||||
|
```env
|
||||||
|
POCKETFLOW_TRACING_DEBUG=true
|
||||||
|
```
|
||||||
|
|
||||||
|
This will print detailed information about:
|
||||||
|
- Langfuse client initialization
|
||||||
|
- Trace and span creation
|
||||||
|
- Data serialization
|
||||||
|
- Error messages
|
||||||
|
|
||||||
|
## 📚 API Reference
|
||||||
|
|
||||||
|
### `@trace_flow()`
|
||||||
|
|
||||||
|
Decorator to add Langfuse tracing to PocketFlow flows.
|
||||||
|
|
||||||
|
**Parameters:**
|
||||||
|
- `config` (TracingConfig, optional): Custom configuration. If None, loads from environment.
|
||||||
|
- `flow_name` (str, optional): Custom name for the flow. If None, uses class name.
|
||||||
|
- `session_id` (str, optional): Session ID for grouping related traces.
|
||||||
|
- `user_id` (str, optional): User ID for the trace.
|
||||||
|
|
||||||
|
### `TracingConfig`
|
||||||
|
|
||||||
|
Configuration class for tracing settings.
|
||||||
|
|
||||||
|
**Methods:**
|
||||||
|
- `TracingConfig.from_env()`: Create config from environment variables
|
||||||
|
- `validate()`: Check if configuration is valid
|
||||||
|
- `to_langfuse_kwargs()`: Convert to Langfuse client kwargs
|
||||||
|
|
||||||
|
### `LangfuseTracer`
|
||||||
|
|
||||||
|
Core tracer class for Langfuse integration.
|
||||||
|
|
||||||
|
**Methods:**
|
||||||
|
- `start_trace()`: Start a new trace
|
||||||
|
- `end_trace()`: End the current trace
|
||||||
|
- `start_node_span()`: Start a span for node execution
|
||||||
|
- `end_node_span()`: End a node execution span
|
||||||
|
- `flush()`: Flush pending traces to Langfuse
|
||||||
|
|
||||||
|
## 🤝 Contributing
|
||||||
|
|
||||||
|
This cookbook is designed to be a starting point for PocketFlow observability. Feel free to extend and customize it for your specific needs!
|
||||||
|
|
||||||
|
## 📄 License
|
||||||
|
|
||||||
|
This cookbook follows the same license as PocketFlow.
|
||||||
|
|
@ -0,0 +1,140 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Async example demonstrating PocketFlow tracing with Langfuse.
|
||||||
|
|
||||||
|
This example shows how to use the @trace_flow decorator with AsyncFlow
|
||||||
|
and AsyncNode to trace asynchronous workflows.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
# Load environment variables
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
# Add parent directory to path to import pocketflow and tracing
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||||
|
|
||||||
|
from pocketflow import AsyncNode, AsyncFlow
|
||||||
|
from tracing import trace_flow, TracingConfig
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncDataFetchNode(AsyncNode):
|
||||||
|
"""An async node that simulates fetching data."""
|
||||||
|
|
||||||
|
async def prep_async(self, shared):
|
||||||
|
"""Extract the query from shared data."""
|
||||||
|
query = shared.get("query", "default")
|
||||||
|
return query
|
||||||
|
|
||||||
|
async def exec_async(self, query):
|
||||||
|
"""Simulate async data fetching."""
|
||||||
|
print(f"🔍 Fetching data for query: {query}")
|
||||||
|
|
||||||
|
# Simulate async operation
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
# Return mock data
|
||||||
|
data = {
|
||||||
|
"query": query,
|
||||||
|
"results": [f"Result {i} for {query}" for i in range(3)],
|
||||||
|
"timestamp": "2024-01-01T00:00:00Z",
|
||||||
|
}
|
||||||
|
return data
|
||||||
|
|
||||||
|
async def post_async(self, shared, prep_res, exec_res):
|
||||||
|
"""Store the fetched data."""
|
||||||
|
shared["fetched_data"] = exec_res
|
||||||
|
return "process"
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncDataProcessNode(AsyncNode):
|
||||||
|
"""An async node that processes the fetched data."""
|
||||||
|
|
||||||
|
async def prep_async(self, shared):
|
||||||
|
"""Get the fetched data."""
|
||||||
|
return shared.get("fetched_data", {})
|
||||||
|
|
||||||
|
async def exec_async(self, data):
|
||||||
|
"""Process the data asynchronously."""
|
||||||
|
print("⚙️ Processing fetched data...")
|
||||||
|
|
||||||
|
# Simulate async processing
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
|
# Process the results
|
||||||
|
processed_results = []
|
||||||
|
for result in data.get("results", []):
|
||||||
|
processed_results.append(f"PROCESSED: {result}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"original_query": data.get("query"),
|
||||||
|
"processed_results": processed_results,
|
||||||
|
"result_count": len(processed_results),
|
||||||
|
}
|
||||||
|
|
||||||
|
async def post_async(self, shared, prep_res, exec_res):
|
||||||
|
"""Store the processed data."""
|
||||||
|
shared["processed_data"] = exec_res
|
||||||
|
return "default"
|
||||||
|
|
||||||
|
|
||||||
|
@trace_flow(flow_name="AsyncDataProcessingFlow")
|
||||||
|
class AsyncDataProcessingFlow(AsyncFlow):
|
||||||
|
"""An async flow that fetches and processes data."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
# Create async nodes
|
||||||
|
fetch_node = AsyncDataFetchNode()
|
||||||
|
process_node = AsyncDataProcessNode()
|
||||||
|
|
||||||
|
# Connect nodes
|
||||||
|
fetch_node - "process" >> process_node
|
||||||
|
|
||||||
|
# Initialize async flow
|
||||||
|
super().__init__(start=fetch_node)
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
"""Run the async tracing example."""
|
||||||
|
print("🚀 Starting PocketFlow Async Tracing Example")
|
||||||
|
print("=" * 50)
|
||||||
|
|
||||||
|
# Create the async flow
|
||||||
|
flow = AsyncDataProcessingFlow()
|
||||||
|
|
||||||
|
# Prepare shared data
|
||||||
|
shared = {"query": "machine learning tutorials"}
|
||||||
|
|
||||||
|
print(f"📥 Input: {shared}")
|
||||||
|
|
||||||
|
# Run the async flow (this will be automatically traced)
|
||||||
|
try:
|
||||||
|
result = await flow.run_async(shared)
|
||||||
|
print(f"📤 Output: {shared}")
|
||||||
|
print(f"🎯 Result: {result}")
|
||||||
|
print("✅ Async flow completed successfully!")
|
||||||
|
|
||||||
|
# Print the processed data
|
||||||
|
if "processed_data" in shared:
|
||||||
|
processed = shared["processed_data"]
|
||||||
|
print(
|
||||||
|
f"🎉 Processed {processed['result_count']} results for query: {processed['original_query']}"
|
||||||
|
)
|
||||||
|
for result in processed["processed_results"]:
|
||||||
|
print(f" - {result}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Async flow failed with error: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
print("\n📊 Check your Langfuse dashboard to see the async trace!")
|
||||||
|
langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host")
|
||||||
|
print(f" Dashboard URL: {langfuse_host}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
|
|
@ -0,0 +1,110 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Basic example demonstrating PocketFlow tracing with Langfuse.
|
||||||
|
|
||||||
|
This example shows how to use the @trace_flow decorator to automatically
|
||||||
|
trace a simple PocketFlow workflow.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
# Load environment variables
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
# Add parent directory to path to import pocketflow and tracing
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||||
|
|
||||||
|
from pocketflow import Node, Flow
|
||||||
|
from tracing import trace_flow, TracingConfig
|
||||||
|
|
||||||
|
|
||||||
|
class GreetingNode(Node):
|
||||||
|
"""A simple node that creates a greeting message."""
|
||||||
|
|
||||||
|
def prep(self, shared):
|
||||||
|
"""Extract the name from shared data."""
|
||||||
|
name = shared.get("name", "World")
|
||||||
|
return name
|
||||||
|
|
||||||
|
def exec(self, name):
|
||||||
|
"""Create a greeting message."""
|
||||||
|
greeting = f"Hello, {name}!"
|
||||||
|
return greeting
|
||||||
|
|
||||||
|
def post(self, shared, prep_res, exec_res):
|
||||||
|
"""Store the greeting in shared data."""
|
||||||
|
shared["greeting"] = exec_res
|
||||||
|
return "default"
|
||||||
|
|
||||||
|
|
||||||
|
class UppercaseNode(Node):
|
||||||
|
"""A node that converts the greeting to uppercase."""
|
||||||
|
|
||||||
|
def prep(self, shared):
|
||||||
|
"""Get the greeting from shared data."""
|
||||||
|
return shared.get("greeting", "")
|
||||||
|
|
||||||
|
def exec(self, greeting):
|
||||||
|
"""Convert to uppercase."""
|
||||||
|
return greeting.upper()
|
||||||
|
|
||||||
|
def post(self, shared, prep_res, exec_res):
|
||||||
|
"""Store the uppercase greeting."""
|
||||||
|
shared["uppercase_greeting"] = exec_res
|
||||||
|
return "default"
|
||||||
|
|
||||||
|
|
||||||
|
@trace_flow(flow_name="BasicGreetingFlow")
|
||||||
|
class BasicGreetingFlow(Flow):
|
||||||
|
"""A simple flow that creates and processes a greeting."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
# Create nodes
|
||||||
|
greeting_node = GreetingNode()
|
||||||
|
uppercase_node = UppercaseNode()
|
||||||
|
|
||||||
|
# Connect nodes
|
||||||
|
greeting_node >> uppercase_node
|
||||||
|
|
||||||
|
# Initialize flow
|
||||||
|
super().__init__(start=greeting_node)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Run the basic tracing example."""
|
||||||
|
print("🚀 Starting PocketFlow Tracing Basic Example")
|
||||||
|
print("=" * 50)
|
||||||
|
|
||||||
|
# Create the flow
|
||||||
|
flow = BasicGreetingFlow()
|
||||||
|
|
||||||
|
# Prepare shared data
|
||||||
|
shared = {"name": "PocketFlow User"}
|
||||||
|
|
||||||
|
print(f"📥 Input: {shared}")
|
||||||
|
|
||||||
|
# Run the flow (this will be automatically traced)
|
||||||
|
try:
|
||||||
|
result = flow.run(shared)
|
||||||
|
print(f"📤 Output: {shared}")
|
||||||
|
print(f"🎯 Result: {result}")
|
||||||
|
print("✅ Flow completed successfully!")
|
||||||
|
|
||||||
|
# Print the final greeting
|
||||||
|
if "uppercase_greeting" in shared:
|
||||||
|
print(f"🎉 Final greeting: {shared['uppercase_greeting']}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Flow failed with error: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
print("\n📊 Check your Langfuse dashboard to see the trace!")
|
||||||
|
langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host")
|
||||||
|
print(f" Dashboard URL: {langfuse_host}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
# Core dependencies for PocketFlow tracing
|
||||||
|
langfuse>=2.0.0,<3.0.0 # v2 low level SDK compatible with Langfuse servers
|
||||||
|
python-dotenv>=1.0.0
|
||||||
|
|
||||||
|
# Optional dependencies for enhanced functionality
|
||||||
|
pydantic>=2.0.0 # For data validation and serialization
|
||||||
Binary file not shown.
|
After Width: | Height: | Size: 32 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 36 KiB |
|
|
@ -0,0 +1,81 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Setup script for PocketFlow Tracing cookbook.
|
||||||
|
|
||||||
|
This script helps install dependencies and verify the setup.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def install_dependencies():
|
||||||
|
"""Install required dependencies."""
|
||||||
|
print("📦 Installing dependencies...")
|
||||||
|
try:
|
||||||
|
subprocess.check_call(
|
||||||
|
[sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]
|
||||||
|
)
|
||||||
|
print("✅ Dependencies installed successfully!")
|
||||||
|
return True
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
print(f"❌ Failed to install dependencies: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def verify_setup():
|
||||||
|
"""Verify that the setup is working."""
|
||||||
|
print("🔍 Verifying setup...")
|
||||||
|
try:
|
||||||
|
# Try to import the tracing module
|
||||||
|
from tracing import trace_flow, TracingConfig
|
||||||
|
|
||||||
|
print("✅ Tracing module imported successfully!")
|
||||||
|
|
||||||
|
# Try to load configuration
|
||||||
|
config = TracingConfig.from_env()
|
||||||
|
if config.validate():
|
||||||
|
print("✅ Configuration is valid!")
|
||||||
|
else:
|
||||||
|
print("⚠️ Configuration validation failed - check your .env file")
|
||||||
|
|
||||||
|
return True
|
||||||
|
except ImportError as e:
|
||||||
|
print(f"❌ Failed to import tracing module: {e}")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Setup verification failed: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main setup function."""
|
||||||
|
print("🚀 PocketFlow Tracing Setup")
|
||||||
|
print("=" * 40)
|
||||||
|
|
||||||
|
# Check if we're in the right directory
|
||||||
|
if not os.path.exists("requirements.txt"):
|
||||||
|
print(
|
||||||
|
"❌ requirements.txt not found. Please run this script from the pocketflow-tracing directory."
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Install dependencies
|
||||||
|
if not install_dependencies():
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Verify setup
|
||||||
|
if not verify_setup():
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
print("\n🎉 Setup completed successfully!")
|
||||||
|
print("\n📚 Next steps:")
|
||||||
|
print("1. Check the README.md for usage instructions")
|
||||||
|
print("2. Run the examples: python examples/basic_example.py")
|
||||||
|
print("3. Run the test suite: python test_tracing.py")
|
||||||
|
print("4. Check your Langfuse dashboard (URL configured in LANGFUSE_HOST)")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -0,0 +1,197 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Test script for PocketFlow tracing functionality.
|
||||||
|
|
||||||
|
This script tests the tracing implementation to ensure it works correctly
|
||||||
|
with Langfuse integration.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import asyncio
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
# Load environment variables
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
# Add paths for imports
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||||
|
sys.path.insert(0, os.path.dirname(__file__))
|
||||||
|
|
||||||
|
from pocketflow import Node, Flow, AsyncNode, AsyncFlow
|
||||||
|
from tracing import trace_flow, TracingConfig
|
||||||
|
from utils import setup_tracing
|
||||||
|
|
||||||
|
|
||||||
|
class TestNode(Node):
|
||||||
|
"""Simple test node for tracing verification."""
|
||||||
|
|
||||||
|
def prep(self, shared):
|
||||||
|
"""Test prep phase."""
|
||||||
|
return shared.get("input", "test_input")
|
||||||
|
|
||||||
|
def exec(self, prep_res):
|
||||||
|
"""Test exec phase."""
|
||||||
|
return f"processed_{prep_res}"
|
||||||
|
|
||||||
|
def post(self, shared, prep_res, exec_res):
|
||||||
|
"""Test post phase."""
|
||||||
|
shared["output"] = exec_res
|
||||||
|
return "default"
|
||||||
|
|
||||||
|
|
||||||
|
class TestAsyncNode(AsyncNode):
|
||||||
|
"""Simple async test node for tracing verification."""
|
||||||
|
|
||||||
|
async def prep_async(self, shared):
|
||||||
|
"""Test async prep phase."""
|
||||||
|
await asyncio.sleep(0.1) # Simulate async work
|
||||||
|
return shared.get("input", "async_test_input")
|
||||||
|
|
||||||
|
async def exec_async(self, prep_res):
|
||||||
|
"""Test async exec phase."""
|
||||||
|
await asyncio.sleep(0.1) # Simulate async work
|
||||||
|
return f"async_processed_{prep_res}"
|
||||||
|
|
||||||
|
async def post_async(self, shared, prep_res, exec_res):
|
||||||
|
"""Test async post phase."""
|
||||||
|
shared["output"] = exec_res
|
||||||
|
return "default"
|
||||||
|
|
||||||
|
|
||||||
|
@trace_flow(flow_name="TestSyncFlow")
|
||||||
|
class TestSyncFlow(Flow):
|
||||||
|
"""Test synchronous flow with tracing."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(start=TestNode())
|
||||||
|
|
||||||
|
|
||||||
|
@trace_flow(flow_name="TestAsyncFlow")
|
||||||
|
class TestAsyncFlow(AsyncFlow):
|
||||||
|
"""Test asynchronous flow with tracing."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(start=TestAsyncNode())
|
||||||
|
|
||||||
|
|
||||||
|
def test_sync_flow():
|
||||||
|
"""Test synchronous flow tracing."""
|
||||||
|
print("🧪 Testing synchronous flow tracing...")
|
||||||
|
|
||||||
|
flow = TestSyncFlow()
|
||||||
|
shared = {"input": "sync_test_data"}
|
||||||
|
|
||||||
|
print(f" Input: {shared}")
|
||||||
|
result = flow.run(shared)
|
||||||
|
print(f" Output: {shared}")
|
||||||
|
print(f" Result: {result}")
|
||||||
|
|
||||||
|
# Verify the flow worked
|
||||||
|
assert "output" in shared
|
||||||
|
assert shared["output"] == "processed_sync_test_data"
|
||||||
|
print(" ✅ Sync flow test passed")
|
||||||
|
|
||||||
|
|
||||||
|
async def test_async_flow():
|
||||||
|
"""Test asynchronous flow tracing."""
|
||||||
|
print("🧪 Testing asynchronous flow tracing...")
|
||||||
|
|
||||||
|
flow = TestAsyncFlow()
|
||||||
|
shared = {"input": "async_test_data"}
|
||||||
|
|
||||||
|
print(f" Input: {shared}")
|
||||||
|
result = await flow.run_async(shared)
|
||||||
|
print(f" Output: {shared}")
|
||||||
|
print(f" Result: {result}")
|
||||||
|
|
||||||
|
# Verify the flow worked
|
||||||
|
assert "output" in shared
|
||||||
|
assert shared["output"] == "async_processed_async_test_data"
|
||||||
|
print(" ✅ Async flow test passed")
|
||||||
|
|
||||||
|
|
||||||
|
def test_configuration():
|
||||||
|
"""Test configuration loading and validation."""
|
||||||
|
print("🧪 Testing configuration...")
|
||||||
|
|
||||||
|
# Test loading from environment
|
||||||
|
config = TracingConfig.from_env()
|
||||||
|
print(f" Loaded config: debug={config.debug}")
|
||||||
|
|
||||||
|
# Test validation
|
||||||
|
is_valid = config.validate()
|
||||||
|
print(f" Config valid: {is_valid}")
|
||||||
|
|
||||||
|
if is_valid:
|
||||||
|
print(" ✅ Configuration test passed")
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
" ⚠️ Configuration test failed (this may be expected if env vars not set)"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_error_handling():
|
||||||
|
"""Test error handling in traced flows."""
|
||||||
|
print("🧪 Testing error handling...")
|
||||||
|
|
||||||
|
class ErrorNode(Node):
|
||||||
|
def exec(self, prep_res):
|
||||||
|
raise ValueError("Test error for tracing")
|
||||||
|
|
||||||
|
@trace_flow(flow_name="TestErrorFlow")
|
||||||
|
class ErrorFlow(Flow):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(start=ErrorNode())
|
||||||
|
|
||||||
|
flow = ErrorFlow()
|
||||||
|
shared = {"input": "error_test"}
|
||||||
|
|
||||||
|
try:
|
||||||
|
flow.run(shared)
|
||||||
|
print(" ❌ Expected error but flow succeeded")
|
||||||
|
except ValueError as e:
|
||||||
|
print(f" ✅ Error correctly caught and traced: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠️ Unexpected error type: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
"""Run all tests."""
|
||||||
|
print("🚀 Starting PocketFlow Tracing Tests")
|
||||||
|
print("=" * 50)
|
||||||
|
|
||||||
|
# Test configuration first
|
||||||
|
test_configuration()
|
||||||
|
print()
|
||||||
|
|
||||||
|
# Test setup (optional - only if environment is configured)
|
||||||
|
try:
|
||||||
|
print("🔧 Testing setup...")
|
||||||
|
config = setup_tracing()
|
||||||
|
print(" ✅ Setup test passed")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ⚠️ Setup test failed: {e}")
|
||||||
|
print(" (This is expected if Langfuse is not configured)")
|
||||||
|
print()
|
||||||
|
|
||||||
|
# Test sync flow
|
||||||
|
test_sync_flow()
|
||||||
|
print()
|
||||||
|
|
||||||
|
# Test async flow
|
||||||
|
await test_async_flow()
|
||||||
|
print()
|
||||||
|
|
||||||
|
# Test error handling
|
||||||
|
test_error_handling()
|
||||||
|
print()
|
||||||
|
|
||||||
|
print("🎉 All tests completed!")
|
||||||
|
print("\n📊 If Langfuse is configured, check your dashboard for traces:")
|
||||||
|
langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host")
|
||||||
|
print(f" Dashboard URL: {langfuse_host}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
"""
|
||||||
|
PocketFlow Tracing Module
|
||||||
|
|
||||||
|
This module provides observability and tracing capabilities for PocketFlow workflows
|
||||||
|
using Langfuse as the backend. It includes decorators and utilities to automatically
|
||||||
|
trace node execution, inputs, and outputs.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from .config import TracingConfig
|
||||||
|
from .core import LangfuseTracer
|
||||||
|
from .decorator import trace_flow
|
||||||
|
|
||||||
|
__all__ = ["trace_flow", "TracingConfig", "LangfuseTracer"]
|
||||||
|
|
@ -0,0 +1,111 @@
|
||||||
|
"""
|
||||||
|
Configuration module for PocketFlow tracing with Langfuse.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TracingConfig:
|
||||||
|
"""Configuration class for PocketFlow tracing with Langfuse."""
|
||||||
|
|
||||||
|
# Langfuse configuration
|
||||||
|
langfuse_secret_key: Optional[str] = None
|
||||||
|
langfuse_public_key: Optional[str] = None
|
||||||
|
langfuse_host: Optional[str] = None
|
||||||
|
|
||||||
|
# PocketFlow tracing configuration
|
||||||
|
debug: bool = False
|
||||||
|
trace_inputs: bool = True
|
||||||
|
trace_outputs: bool = True
|
||||||
|
trace_prep: bool = True
|
||||||
|
trace_exec: bool = True
|
||||||
|
trace_post: bool = True
|
||||||
|
trace_errors: bool = True
|
||||||
|
|
||||||
|
# Session configuration
|
||||||
|
session_id: Optional[str] = None
|
||||||
|
user_id: Optional[str] = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_env(cls, env_file: Optional[str] = None) -> "TracingConfig":
|
||||||
|
"""
|
||||||
|
Create TracingConfig from environment variables.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
env_file: Optional path to .env file. If None, looks for .env in current directory.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
TracingConfig instance with values from environment variables.
|
||||||
|
"""
|
||||||
|
# Load environment variables from .env file if it exists
|
||||||
|
if env_file:
|
||||||
|
load_dotenv(env_file)
|
||||||
|
else:
|
||||||
|
# Try to find .env file in current directory or parent directories
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
return cls(
|
||||||
|
langfuse_secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
|
||||||
|
langfuse_public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
|
||||||
|
langfuse_host=os.getenv("LANGFUSE_HOST"),
|
||||||
|
debug=os.getenv("POCKETFLOW_TRACING_DEBUG", "false").lower() == "true",
|
||||||
|
trace_inputs=os.getenv("POCKETFLOW_TRACE_INPUTS", "true").lower() == "true",
|
||||||
|
trace_outputs=os.getenv("POCKETFLOW_TRACE_OUTPUTS", "true").lower() == "true",
|
||||||
|
trace_prep=os.getenv("POCKETFLOW_TRACE_PREP", "true").lower() == "true",
|
||||||
|
trace_exec=os.getenv("POCKETFLOW_TRACE_EXEC", "true").lower() == "true",
|
||||||
|
trace_post=os.getenv("POCKETFLOW_TRACE_POST", "true").lower() == "true",
|
||||||
|
trace_errors=os.getenv("POCKETFLOW_TRACE_ERRORS", "true").lower() == "true",
|
||||||
|
session_id=os.getenv("POCKETFLOW_SESSION_ID"),
|
||||||
|
user_id=os.getenv("POCKETFLOW_USER_ID"),
|
||||||
|
)
|
||||||
|
|
||||||
|
def validate(self) -> bool:
|
||||||
|
"""
|
||||||
|
Validate that required configuration is present.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if configuration is valid, False otherwise.
|
||||||
|
"""
|
||||||
|
if not self.langfuse_secret_key:
|
||||||
|
if self.debug:
|
||||||
|
print("Warning: LANGFUSE_SECRET_KEY not set")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not self.langfuse_public_key:
|
||||||
|
if self.debug:
|
||||||
|
print("Warning: LANGFUSE_PUBLIC_KEY not set")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not self.langfuse_host:
|
||||||
|
if self.debug:
|
||||||
|
print("Warning: LANGFUSE_HOST not set")
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def to_langfuse_kwargs(self) -> dict:
|
||||||
|
"""
|
||||||
|
Convert configuration to kwargs for Langfuse client initialization.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary of kwargs for Langfuse client.
|
||||||
|
"""
|
||||||
|
kwargs = {}
|
||||||
|
|
||||||
|
if self.langfuse_secret_key:
|
||||||
|
kwargs["secret_key"] = self.langfuse_secret_key
|
||||||
|
|
||||||
|
if self.langfuse_public_key:
|
||||||
|
kwargs["public_key"] = self.langfuse_public_key
|
||||||
|
|
||||||
|
if self.langfuse_host:
|
||||||
|
kwargs["host"] = self.langfuse_host
|
||||||
|
|
||||||
|
if self.debug:
|
||||||
|
kwargs["debug"] = True
|
||||||
|
|
||||||
|
return kwargs
|
||||||
|
|
@ -0,0 +1,287 @@
|
||||||
|
"""
|
||||||
|
Core tracing functionality for PocketFlow with Langfuse integration.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
from typing import Any, Dict, Optional, Union
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
try:
|
||||||
|
from langfuse import Langfuse
|
||||||
|
|
||||||
|
LANGFUSE_AVAILABLE = True
|
||||||
|
except ImportError:
|
||||||
|
LANGFUSE_AVAILABLE = False
|
||||||
|
print("Warning: langfuse package not installed. Install with: pip install langfuse")
|
||||||
|
|
||||||
|
from .config import TracingConfig
|
||||||
|
|
||||||
|
|
||||||
|
class LangfuseTracer:
|
||||||
|
"""
|
||||||
|
Core tracer class that handles Langfuse integration for PocketFlow.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, config: TracingConfig):
|
||||||
|
"""
|
||||||
|
Initialize the LangfuseTracer.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config: TracingConfig instance with Langfuse settings.
|
||||||
|
"""
|
||||||
|
self.config = config
|
||||||
|
self.client = None
|
||||||
|
self.current_trace = None
|
||||||
|
self.spans = {} # Store spans by node ID
|
||||||
|
|
||||||
|
if LANGFUSE_AVAILABLE and config.validate():
|
||||||
|
try:
|
||||||
|
# Initialize Langfuse client with proper parameters
|
||||||
|
kwargs = {}
|
||||||
|
if config.langfuse_secret_key:
|
||||||
|
kwargs["secret_key"] = config.langfuse_secret_key
|
||||||
|
if config.langfuse_public_key:
|
||||||
|
kwargs["public_key"] = config.langfuse_public_key
|
||||||
|
if config.langfuse_host:
|
||||||
|
kwargs["host"] = config.langfuse_host
|
||||||
|
if config.debug:
|
||||||
|
kwargs["debug"] = True
|
||||||
|
|
||||||
|
self.client = Langfuse(**kwargs)
|
||||||
|
if config.debug:
|
||||||
|
print(
|
||||||
|
f"✓ Langfuse client initialized with host: {config.langfuse_host}"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
if config.debug:
|
||||||
|
print(f"✗ Failed to initialize Langfuse client: {e}")
|
||||||
|
self.client = None
|
||||||
|
else:
|
||||||
|
if config.debug:
|
||||||
|
print("✗ Langfuse not available or configuration invalid")
|
||||||
|
|
||||||
|
def start_trace(self, flow_name: str, input_data: Dict[str, Any]) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Start a new trace for a flow execution.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
flow_name: Name of the flow being traced.
|
||||||
|
input_data: Input data for the flow.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Trace ID if successful, None otherwise.
|
||||||
|
"""
|
||||||
|
if not self.client:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Serialize input data safely
|
||||||
|
serialized_input = self._serialize_data(input_data)
|
||||||
|
|
||||||
|
# Use Langfuse v2 API to create a trace
|
||||||
|
self.current_trace = self.client.trace(
|
||||||
|
name=flow_name,
|
||||||
|
input=serialized_input,
|
||||||
|
metadata={
|
||||||
|
"framework": "PocketFlow",
|
||||||
|
"trace_type": "flow_execution",
|
||||||
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
},
|
||||||
|
session_id=self.config.session_id,
|
||||||
|
user_id=self.config.user_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get the trace ID
|
||||||
|
trace_id = self.current_trace.id
|
||||||
|
|
||||||
|
if self.config.debug:
|
||||||
|
print(f"✓ Started trace: {trace_id} for flow: {flow_name}")
|
||||||
|
|
||||||
|
return trace_id
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if self.config.debug:
|
||||||
|
print(f"✗ Failed to start trace: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def end_trace(self, output_data: Dict[str, Any], status: str = "success") -> None:
|
||||||
|
"""
|
||||||
|
End the current trace.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
output_data: Output data from the flow.
|
||||||
|
status: Status of the trace execution.
|
||||||
|
"""
|
||||||
|
if not self.current_trace:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Serialize output data safely
|
||||||
|
serialized_output = self._serialize_data(output_data)
|
||||||
|
|
||||||
|
# Update the trace with output data using v2 API
|
||||||
|
self.current_trace.update(
|
||||||
|
output=serialized_output,
|
||||||
|
metadata={
|
||||||
|
"status": status,
|
||||||
|
"end_timestamp": datetime.now().isoformat(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
if self.config.debug:
|
||||||
|
print(f"✓ Ended trace with status: {status}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if self.config.debug:
|
||||||
|
print(f"✗ Failed to end trace: {e}")
|
||||||
|
finally:
|
||||||
|
self.current_trace = None
|
||||||
|
self.spans.clear()
|
||||||
|
|
||||||
|
def start_node_span(
|
||||||
|
self, node_name: str, node_id: str, phase: str
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
Start a span for a node execution phase.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
node_name: Name/type of the node.
|
||||||
|
node_id: Unique identifier for the node instance.
|
||||||
|
phase: Execution phase (prep, exec, post).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Span ID if successful, None otherwise.
|
||||||
|
"""
|
||||||
|
if not self.current_trace:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
span_id = f"{node_id}_{phase}"
|
||||||
|
|
||||||
|
# Create a child span using v2 API
|
||||||
|
span = self.current_trace.span(
|
||||||
|
name=f"{node_name}.{phase}",
|
||||||
|
metadata={
|
||||||
|
"node_type": node_name,
|
||||||
|
"node_id": node_id,
|
||||||
|
"phase": phase,
|
||||||
|
"start_timestamp": datetime.now().isoformat(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
self.spans[span_id] = span
|
||||||
|
|
||||||
|
if self.config.debug:
|
||||||
|
print(f"✓ Started span: {span_id}")
|
||||||
|
|
||||||
|
return span_id
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if self.config.debug:
|
||||||
|
print(f"✗ Failed to start span: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def end_node_span(
|
||||||
|
self,
|
||||||
|
span_id: str,
|
||||||
|
input_data: Any = None,
|
||||||
|
output_data: Any = None,
|
||||||
|
error: Exception = None,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
End a node execution span.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
span_id: ID of the span to end.
|
||||||
|
input_data: Input data for the phase.
|
||||||
|
output_data: Output data from the phase.
|
||||||
|
error: Exception if the phase failed.
|
||||||
|
"""
|
||||||
|
if span_id not in self.spans:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
span = self.spans[span_id]
|
||||||
|
|
||||||
|
# Prepare update data
|
||||||
|
update_data = {}
|
||||||
|
|
||||||
|
if input_data is not None and self.config.trace_inputs:
|
||||||
|
update_data["input"] = self._serialize_data(input_data)
|
||||||
|
if output_data is not None and self.config.trace_outputs:
|
||||||
|
update_data["output"] = self._serialize_data(output_data)
|
||||||
|
|
||||||
|
if error and self.config.trace_errors:
|
||||||
|
update_data.update(
|
||||||
|
{
|
||||||
|
"level": "ERROR",
|
||||||
|
"status_message": str(error),
|
||||||
|
"metadata": {
|
||||||
|
"error_type": type(error).__name__,
|
||||||
|
"error_message": str(error),
|
||||||
|
"end_timestamp": datetime.now().isoformat(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
update_data.update(
|
||||||
|
{
|
||||||
|
"level": "DEFAULT",
|
||||||
|
"metadata": {"end_timestamp": datetime.now().isoformat()},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update the span with all data at once
|
||||||
|
span.update(**update_data)
|
||||||
|
|
||||||
|
# End the span
|
||||||
|
span.end()
|
||||||
|
|
||||||
|
if self.config.debug:
|
||||||
|
status = "ERROR" if error else "SUCCESS"
|
||||||
|
print(f"✓ Ended span: {span_id} with status: {status}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if self.config.debug:
|
||||||
|
print(f"✗ Failed to end span: {e}")
|
||||||
|
finally:
|
||||||
|
if span_id in self.spans:
|
||||||
|
del self.spans[span_id]
|
||||||
|
|
||||||
|
def _serialize_data(self, data: Any) -> Any:
|
||||||
|
"""
|
||||||
|
Safely serialize data for Langfuse.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: Data to serialize.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Serialized data that can be sent to Langfuse.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Handle common PocketFlow data types
|
||||||
|
if hasattr(data, "__dict__"):
|
||||||
|
# Convert objects to dict representation
|
||||||
|
return {"_type": type(data).__name__, "_data": str(data)}
|
||||||
|
elif isinstance(data, (dict, list, str, int, float, bool, type(None))):
|
||||||
|
# JSON-serializable types
|
||||||
|
return data
|
||||||
|
else:
|
||||||
|
# Fallback to string representation
|
||||||
|
return {"_type": type(data).__name__, "_data": str(data)}
|
||||||
|
except Exception:
|
||||||
|
# Ultimate fallback
|
||||||
|
return {"_type": "unknown", "_data": "<serialization_failed>"}
|
||||||
|
|
||||||
|
def flush(self) -> None:
|
||||||
|
"""Flush any pending traces to Langfuse."""
|
||||||
|
if self.client:
|
||||||
|
try:
|
||||||
|
self.client.flush()
|
||||||
|
if self.config.debug:
|
||||||
|
print("✓ Flushed traces to Langfuse")
|
||||||
|
except Exception as e:
|
||||||
|
if self.config.debug:
|
||||||
|
print(f"✗ Failed to flush traces: {e}")
|
||||||
|
|
@ -0,0 +1,293 @@
|
||||||
|
"""
|
||||||
|
Decorator for tracing PocketFlow workflows with Langfuse.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import inspect
|
||||||
|
import uuid
|
||||||
|
from typing import Any, Callable, Dict, Optional, Union
|
||||||
|
|
||||||
|
from .config import TracingConfig
|
||||||
|
from .core import LangfuseTracer
|
||||||
|
|
||||||
|
|
||||||
|
def trace_flow(
|
||||||
|
config: Optional[TracingConfig] = None,
|
||||||
|
flow_name: Optional[str] = None,
|
||||||
|
session_id: Optional[str] = None,
|
||||||
|
user_id: Optional[str] = None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Decorator to add Langfuse tracing to PocketFlow flows.
|
||||||
|
|
||||||
|
This decorator automatically traces:
|
||||||
|
- Flow execution start/end
|
||||||
|
- Each node's prep, exec, and post phases
|
||||||
|
- Input and output data for each phase
|
||||||
|
- Errors and exceptions
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config: TracingConfig instance. If None, loads from environment.
|
||||||
|
flow_name: Custom name for the flow. If None, uses the flow class name.
|
||||||
|
session_id: Session ID for grouping related traces.
|
||||||
|
user_id: User ID for the trace.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Decorated flow class or function.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
```python
|
||||||
|
from tracing import trace_flow
|
||||||
|
|
||||||
|
@trace_flow()
|
||||||
|
class MyFlow(Flow):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(start=MyNode())
|
||||||
|
|
||||||
|
# Or with custom configuration
|
||||||
|
config = TracingConfig.from_env()
|
||||||
|
|
||||||
|
@trace_flow(config=config, flow_name="CustomFlow")
|
||||||
|
class MyFlow(Flow):
|
||||||
|
pass
|
||||||
|
```
|
||||||
|
"""
|
||||||
|
def decorator(flow_class_or_func):
|
||||||
|
# Handle both class and function decoration
|
||||||
|
if inspect.isclass(flow_class_or_func):
|
||||||
|
return _trace_flow_class(flow_class_or_func, config, flow_name, session_id, user_id)
|
||||||
|
else:
|
||||||
|
return _trace_flow_function(flow_class_or_func, config, flow_name, session_id, user_id)
|
||||||
|
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
|
def _trace_flow_class(flow_class, config, flow_name, session_id, user_id):
|
||||||
|
"""Trace a Flow class by wrapping its methods."""
|
||||||
|
|
||||||
|
# Get or create config
|
||||||
|
if config is None:
|
||||||
|
config = TracingConfig.from_env()
|
||||||
|
|
||||||
|
# Override session/user if provided
|
||||||
|
if session_id:
|
||||||
|
config.session_id = session_id
|
||||||
|
if user_id:
|
||||||
|
config.user_id = user_id
|
||||||
|
|
||||||
|
# Get flow name
|
||||||
|
if flow_name is None:
|
||||||
|
flow_name = flow_class.__name__
|
||||||
|
|
||||||
|
# Store original methods
|
||||||
|
original_init = flow_class.__init__
|
||||||
|
original_run = getattr(flow_class, 'run', None)
|
||||||
|
original_run_async = getattr(flow_class, 'run_async', None)
|
||||||
|
|
||||||
|
def traced_init(self, *args, **kwargs):
|
||||||
|
"""Initialize the flow with tracing capabilities."""
|
||||||
|
# Call original init
|
||||||
|
original_init(self, *args, **kwargs)
|
||||||
|
|
||||||
|
# Add tracing attributes
|
||||||
|
self._tracer = LangfuseTracer(config)
|
||||||
|
self._flow_name = flow_name
|
||||||
|
self._trace_id = None
|
||||||
|
|
||||||
|
# Patch all nodes in the flow
|
||||||
|
self._patch_nodes()
|
||||||
|
|
||||||
|
def traced_run(self, shared):
|
||||||
|
"""Traced version of the run method."""
|
||||||
|
if not hasattr(self, '_tracer'):
|
||||||
|
# Fallback if not properly initialized
|
||||||
|
return original_run(self, shared) if original_run else None
|
||||||
|
|
||||||
|
# Start trace
|
||||||
|
self._trace_id = self._tracer.start_trace(self._flow_name, shared)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Run the original flow
|
||||||
|
result = original_run(self, shared) if original_run else None
|
||||||
|
|
||||||
|
# End trace successfully
|
||||||
|
self._tracer.end_trace(shared, "success")
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
# End trace with error
|
||||||
|
self._tracer.end_trace(shared, "error")
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
# Ensure cleanup
|
||||||
|
self._tracer.flush()
|
||||||
|
|
||||||
|
async def traced_run_async(self, shared):
|
||||||
|
"""Traced version of the async run method."""
|
||||||
|
if not hasattr(self, '_tracer'):
|
||||||
|
# Fallback if not properly initialized
|
||||||
|
return await original_run_async(self, shared) if original_run_async else None
|
||||||
|
|
||||||
|
# Start trace
|
||||||
|
self._trace_id = self._tracer.start_trace(self._flow_name, shared)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Run the original flow
|
||||||
|
result = await original_run_async(self, shared) if original_run_async else None
|
||||||
|
|
||||||
|
# End trace successfully
|
||||||
|
self._tracer.end_trace(shared, "success")
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
# End trace with error
|
||||||
|
self._tracer.end_trace(shared, "error")
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
# Ensure cleanup
|
||||||
|
self._tracer.flush()
|
||||||
|
|
||||||
|
def patch_nodes(self):
|
||||||
|
"""Patch all nodes in the flow to add tracing."""
|
||||||
|
if not hasattr(self, 'start_node') or not self.start_node:
|
||||||
|
return
|
||||||
|
|
||||||
|
visited = set()
|
||||||
|
nodes_to_patch = [self.start_node]
|
||||||
|
|
||||||
|
while nodes_to_patch:
|
||||||
|
node = nodes_to_patch.pop(0)
|
||||||
|
if id(node) in visited:
|
||||||
|
continue
|
||||||
|
|
||||||
|
visited.add(id(node))
|
||||||
|
|
||||||
|
# Patch this node
|
||||||
|
self._patch_node(node)
|
||||||
|
|
||||||
|
# Add successors to patch list
|
||||||
|
if hasattr(node, 'successors'):
|
||||||
|
for successor in node.successors.values():
|
||||||
|
if successor and id(successor) not in visited:
|
||||||
|
nodes_to_patch.append(successor)
|
||||||
|
|
||||||
|
def patch_node(self, node):
|
||||||
|
"""Patch a single node to add tracing."""
|
||||||
|
if hasattr(node, '_pocketflow_traced'):
|
||||||
|
return # Already patched
|
||||||
|
|
||||||
|
node_id = str(uuid.uuid4())
|
||||||
|
node_name = type(node).__name__
|
||||||
|
|
||||||
|
# Store original methods
|
||||||
|
original_prep = getattr(node, 'prep', None)
|
||||||
|
original_exec = getattr(node, 'exec', None)
|
||||||
|
original_post = getattr(node, 'post', None)
|
||||||
|
original_prep_async = getattr(node, 'prep_async', None)
|
||||||
|
original_exec_async = getattr(node, 'exec_async', None)
|
||||||
|
original_post_async = getattr(node, 'post_async', None)
|
||||||
|
|
||||||
|
# Create traced versions
|
||||||
|
if original_prep:
|
||||||
|
node.prep = self._create_traced_method(original_prep, node_id, node_name, 'prep')
|
||||||
|
if original_exec:
|
||||||
|
node.exec = self._create_traced_method(original_exec, node_id, node_name, 'exec')
|
||||||
|
if original_post:
|
||||||
|
node.post = self._create_traced_method(original_post, node_id, node_name, 'post')
|
||||||
|
if original_prep_async:
|
||||||
|
node.prep_async = self._create_traced_async_method(original_prep_async, node_id, node_name, 'prep')
|
||||||
|
if original_exec_async:
|
||||||
|
node.exec_async = self._create_traced_async_method(original_exec_async, node_id, node_name, 'exec')
|
||||||
|
if original_post_async:
|
||||||
|
node.post_async = self._create_traced_async_method(original_post_async, node_id, node_name, 'post')
|
||||||
|
|
||||||
|
# Mark as traced
|
||||||
|
node._pocketflow_traced = True
|
||||||
|
|
||||||
|
def create_traced_method(self, original_method, node_id, node_name, phase):
|
||||||
|
"""Create a traced version of a synchronous method."""
|
||||||
|
@functools.wraps(original_method)
|
||||||
|
def traced_method(*args, **kwargs):
|
||||||
|
span_id = self._tracer.start_node_span(node_name, node_id, phase)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = original_method(*args, **kwargs)
|
||||||
|
self._tracer.end_node_span(span_id, input_data=args, output_data=result)
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
self._tracer.end_node_span(span_id, input_data=args, error=e)
|
||||||
|
raise
|
||||||
|
|
||||||
|
return traced_method
|
||||||
|
|
||||||
|
def create_traced_async_method(self, original_method, node_id, node_name, phase):
|
||||||
|
"""Create a traced version of an asynchronous method."""
|
||||||
|
@functools.wraps(original_method)
|
||||||
|
async def traced_async_method(*args, **kwargs):
|
||||||
|
span_id = self._tracer.start_node_span(node_name, node_id, phase)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = await original_method(*args, **kwargs)
|
||||||
|
self._tracer.end_node_span(span_id, input_data=args, output_data=result)
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
self._tracer.end_node_span(span_id, input_data=args, error=e)
|
||||||
|
raise
|
||||||
|
|
||||||
|
return traced_async_method
|
||||||
|
|
||||||
|
# Replace methods on the class
|
||||||
|
flow_class.__init__ = traced_init
|
||||||
|
flow_class._patch_nodes = patch_nodes
|
||||||
|
flow_class._patch_node = patch_node
|
||||||
|
flow_class._create_traced_method = create_traced_method
|
||||||
|
flow_class._create_traced_async_method = create_traced_async_method
|
||||||
|
|
||||||
|
if original_run:
|
||||||
|
flow_class.run = traced_run
|
||||||
|
if original_run_async:
|
||||||
|
flow_class.run_async = traced_run_async
|
||||||
|
|
||||||
|
return flow_class
|
||||||
|
|
||||||
|
|
||||||
|
def _trace_flow_function(flow_func, config, flow_name, session_id, user_id):
|
||||||
|
"""Trace a flow function (for functional-style flows)."""
|
||||||
|
|
||||||
|
# Get or create config
|
||||||
|
if config is None:
|
||||||
|
config = TracingConfig.from_env()
|
||||||
|
|
||||||
|
# Override session/user if provided
|
||||||
|
if session_id:
|
||||||
|
config.session_id = session_id
|
||||||
|
if user_id:
|
||||||
|
config.user_id = user_id
|
||||||
|
|
||||||
|
# Get flow name
|
||||||
|
if flow_name is None:
|
||||||
|
flow_name = flow_func.__name__
|
||||||
|
|
||||||
|
tracer = LangfuseTracer(config)
|
||||||
|
|
||||||
|
@functools.wraps(flow_func)
|
||||||
|
def traced_flow_func(*args, **kwargs):
|
||||||
|
# Assume first argument is shared data
|
||||||
|
shared = args[0] if args else {}
|
||||||
|
|
||||||
|
# Start trace
|
||||||
|
trace_id = tracer.start_trace(flow_name, shared)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = flow_func(*args, **kwargs)
|
||||||
|
tracer.end_trace(shared, "success")
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
tracer.end_trace(shared, "error")
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
tracer.flush()
|
||||||
|
|
||||||
|
return traced_flow_func
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
"""
|
||||||
|
Utility functions for PocketFlow tracing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from .setup import setup_tracing, test_langfuse_connection
|
||||||
|
|
||||||
|
__all__ = ['setup_tracing', 'test_langfuse_connection']
|
||||||
|
|
@ -0,0 +1,163 @@
|
||||||
|
"""
|
||||||
|
Setup and testing utilities for PocketFlow tracing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
# Add parent directory to path for imports
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||||
|
|
||||||
|
try:
|
||||||
|
from langfuse import Langfuse
|
||||||
|
LANGFUSE_AVAILABLE = True
|
||||||
|
except ImportError:
|
||||||
|
LANGFUSE_AVAILABLE = False
|
||||||
|
|
||||||
|
from tracing import TracingConfig, LangfuseTracer
|
||||||
|
|
||||||
|
|
||||||
|
def setup_tracing(env_file: Optional[str] = None) -> TracingConfig:
|
||||||
|
"""
|
||||||
|
Set up tracing configuration and validate the setup.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
env_file: Optional path to .env file. If None, uses default location.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
TracingConfig instance.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
RuntimeError: If setup fails.
|
||||||
|
"""
|
||||||
|
print("🔧 Setting up PocketFlow tracing...")
|
||||||
|
|
||||||
|
# Check if langfuse is installed
|
||||||
|
if not LANGFUSE_AVAILABLE:
|
||||||
|
raise RuntimeError(
|
||||||
|
"Langfuse package not installed. Install with: pip install langfuse"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Load configuration
|
||||||
|
if env_file:
|
||||||
|
config = TracingConfig.from_env(env_file)
|
||||||
|
print(f"✓ Loaded configuration from: {env_file}")
|
||||||
|
else:
|
||||||
|
config = TracingConfig.from_env()
|
||||||
|
print("✓ Loaded configuration from environment")
|
||||||
|
|
||||||
|
# Validate configuration
|
||||||
|
if not config.validate():
|
||||||
|
raise RuntimeError(
|
||||||
|
"Invalid tracing configuration. Please check your environment variables:\n"
|
||||||
|
"- LANGFUSE_SECRET_KEY\n"
|
||||||
|
"- LANGFUSE_PUBLIC_KEY\n"
|
||||||
|
"- LANGFUSE_HOST"
|
||||||
|
)
|
||||||
|
|
||||||
|
print("✓ Configuration validated")
|
||||||
|
|
||||||
|
# Test connection
|
||||||
|
if test_langfuse_connection(config):
|
||||||
|
print("✓ Langfuse connection successful")
|
||||||
|
else:
|
||||||
|
raise RuntimeError("Failed to connect to Langfuse. Check your configuration and network.")
|
||||||
|
|
||||||
|
print("🎉 PocketFlow tracing setup complete!")
|
||||||
|
return config
|
||||||
|
|
||||||
|
|
||||||
|
def test_langfuse_connection(config: TracingConfig) -> bool:
|
||||||
|
"""
|
||||||
|
Test connection to Langfuse.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config: TracingConfig instance.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if connection successful, False otherwise.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Create a test tracer
|
||||||
|
tracer = LangfuseTracer(config)
|
||||||
|
|
||||||
|
if not tracer.client:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Try to start and end a test trace
|
||||||
|
trace_id = tracer.start_trace("test_connection", {"test": True})
|
||||||
|
if trace_id:
|
||||||
|
tracer.end_trace({"test": "completed"}, "success")
|
||||||
|
tracer.flush()
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if config.debug:
|
||||||
|
print(f"Connection test failed: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def print_configuration_help():
|
||||||
|
"""Print help information for configuring tracing."""
|
||||||
|
print("""
|
||||||
|
🔧 PocketFlow Tracing Configuration Help
|
||||||
|
|
||||||
|
To use PocketFlow tracing, you need to configure Langfuse credentials.
|
||||||
|
|
||||||
|
1. Create or update your .env file with:
|
||||||
|
|
||||||
|
LANGFUSE_SECRET_KEY=your-secret-key
|
||||||
|
LANGFUSE_PUBLIC_KEY=your-public-key
|
||||||
|
LANGFUSE_HOST=your-langfuse-host
|
||||||
|
POCKETFLOW_TRACING_DEBUG=true
|
||||||
|
|
||||||
|
2. Optional configuration:
|
||||||
|
|
||||||
|
POCKETFLOW_TRACE_INPUTS=true
|
||||||
|
POCKETFLOW_TRACE_OUTPUTS=true
|
||||||
|
POCKETFLOW_TRACE_PREP=true
|
||||||
|
POCKETFLOW_TRACE_EXEC=true
|
||||||
|
POCKETFLOW_TRACE_POST=true
|
||||||
|
POCKETFLOW_TRACE_ERRORS=true
|
||||||
|
POCKETFLOW_SESSION_ID=your-session-id
|
||||||
|
POCKETFLOW_USER_ID=your-user-id
|
||||||
|
|
||||||
|
3. Install required packages:
|
||||||
|
|
||||||
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
4. Test your setup:
|
||||||
|
|
||||||
|
python -c "from utils import setup_tracing; setup_tracing()"
|
||||||
|
""")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
"""Command-line interface for setup and testing."""
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="PocketFlow Tracing Setup")
|
||||||
|
parser.add_argument("--test", action="store_true", help="Test Langfuse connection")
|
||||||
|
parser.add_argument("--help-config", action="store_true", help="Show configuration help")
|
||||||
|
parser.add_argument("--env-file", type=str, help="Path to .env file")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.help_config:
|
||||||
|
print_configuration_help()
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
if args.test:
|
||||||
|
try:
|
||||||
|
config = setup_tracing(args.env_file)
|
||||||
|
print("\n✅ All tests passed! Your tracing setup is ready.")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\n❌ Setup failed: {e}")
|
||||||
|
print("\nFor help with configuration, run:")
|
||||||
|
print("python utils/setup.py --help-config")
|
||||||
|
sys.exit(1)
|
||||||
|
else:
|
||||||
|
print_configuration_help()
|
||||||
|
|
@ -140,3 +140,5 @@ data_science_flow.run({})
|
||||||
```
|
```
|
||||||
|
|
||||||
The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']`
|
The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']`
|
||||||
|
|
||||||
|
For a more complete implementation, check out [the cookbook](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-tracing).
|
||||||
Loading…
Reference in New Issue