198 lines
5.1 KiB
Python
198 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script for PocketFlow tracing functionality.
|
|
|
|
This script tests the tracing implementation to ensure it works correctly
|
|
with Langfuse integration.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import asyncio
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Add paths for imports
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
|
sys.path.insert(0, os.path.dirname(__file__))
|
|
|
|
from pocketflow import Node, Flow, AsyncNode, AsyncFlow
|
|
from tracing import trace_flow, TracingConfig
|
|
from utils import setup_tracing
|
|
|
|
|
|
class TestNode(Node):
|
|
"""Simple test node for tracing verification."""
|
|
|
|
def prep(self, shared):
|
|
"""Test prep phase."""
|
|
return shared.get("input", "test_input")
|
|
|
|
def exec(self, prep_res):
|
|
"""Test exec phase."""
|
|
return f"processed_{prep_res}"
|
|
|
|
def post(self, shared, prep_res, exec_res):
|
|
"""Test post phase."""
|
|
shared["output"] = exec_res
|
|
return "default"
|
|
|
|
|
|
class TestAsyncNode(AsyncNode):
|
|
"""Simple async test node for tracing verification."""
|
|
|
|
async def prep_async(self, shared):
|
|
"""Test async prep phase."""
|
|
await asyncio.sleep(0.1) # Simulate async work
|
|
return shared.get("input", "async_test_input")
|
|
|
|
async def exec_async(self, prep_res):
|
|
"""Test async exec phase."""
|
|
await asyncio.sleep(0.1) # Simulate async work
|
|
return f"async_processed_{prep_res}"
|
|
|
|
async def post_async(self, shared, prep_res, exec_res):
|
|
"""Test async post phase."""
|
|
shared["output"] = exec_res
|
|
return "default"
|
|
|
|
|
|
@trace_flow(flow_name="TestSyncFlow")
|
|
class TestSyncFlow(Flow):
|
|
"""Test synchronous flow with tracing."""
|
|
|
|
def __init__(self):
|
|
super().__init__(start=TestNode())
|
|
|
|
|
|
@trace_flow(flow_name="TestAsyncFlow")
|
|
class TestAsyncFlow(AsyncFlow):
|
|
"""Test asynchronous flow with tracing."""
|
|
|
|
def __init__(self):
|
|
super().__init__(start=TestAsyncNode())
|
|
|
|
|
|
def test_sync_flow():
|
|
"""Test synchronous flow tracing."""
|
|
print("🧪 Testing synchronous flow tracing...")
|
|
|
|
flow = TestSyncFlow()
|
|
shared = {"input": "sync_test_data"}
|
|
|
|
print(f" Input: {shared}")
|
|
result = flow.run(shared)
|
|
print(f" Output: {shared}")
|
|
print(f" Result: {result}")
|
|
|
|
# Verify the flow worked
|
|
assert "output" in shared
|
|
assert shared["output"] == "processed_sync_test_data"
|
|
print(" ✅ Sync flow test passed")
|
|
|
|
|
|
async def test_async_flow():
|
|
"""Test asynchronous flow tracing."""
|
|
print("🧪 Testing asynchronous flow tracing...")
|
|
|
|
flow = TestAsyncFlow()
|
|
shared = {"input": "async_test_data"}
|
|
|
|
print(f" Input: {shared}")
|
|
result = await flow.run_async(shared)
|
|
print(f" Output: {shared}")
|
|
print(f" Result: {result}")
|
|
|
|
# Verify the flow worked
|
|
assert "output" in shared
|
|
assert shared["output"] == "async_processed_async_test_data"
|
|
print(" ✅ Async flow test passed")
|
|
|
|
|
|
def test_configuration():
|
|
"""Test configuration loading and validation."""
|
|
print("🧪 Testing configuration...")
|
|
|
|
# Test loading from environment
|
|
config = TracingConfig.from_env()
|
|
print(f" Loaded config: debug={config.debug}")
|
|
|
|
# Test validation
|
|
is_valid = config.validate()
|
|
print(f" Config valid: {is_valid}")
|
|
|
|
if is_valid:
|
|
print(" ✅ Configuration test passed")
|
|
else:
|
|
print(
|
|
" ⚠️ Configuration test failed (this may be expected if env vars not set)"
|
|
)
|
|
|
|
|
|
def test_error_handling():
|
|
"""Test error handling in traced flows."""
|
|
print("🧪 Testing error handling...")
|
|
|
|
class ErrorNode(Node):
|
|
def exec(self, prep_res):
|
|
raise ValueError("Test error for tracing")
|
|
|
|
@trace_flow(flow_name="TestErrorFlow")
|
|
class ErrorFlow(Flow):
|
|
def __init__(self):
|
|
super().__init__(start=ErrorNode())
|
|
|
|
flow = ErrorFlow()
|
|
shared = {"input": "error_test"}
|
|
|
|
try:
|
|
flow.run(shared)
|
|
print(" ❌ Expected error but flow succeeded")
|
|
except ValueError as e:
|
|
print(f" ✅ Error correctly caught and traced: {e}")
|
|
except Exception as e:
|
|
print(f" ⚠️ Unexpected error type: {e}")
|
|
|
|
|
|
async def main():
|
|
"""Run all tests."""
|
|
print("🚀 Starting PocketFlow Tracing Tests")
|
|
print("=" * 50)
|
|
|
|
# Test configuration first
|
|
test_configuration()
|
|
print()
|
|
|
|
# Test setup (optional - only if environment is configured)
|
|
try:
|
|
print("🔧 Testing setup...")
|
|
config = setup_tracing()
|
|
print(" ✅ Setup test passed")
|
|
except Exception as e:
|
|
print(f" ⚠️ Setup test failed: {e}")
|
|
print(" (This is expected if Langfuse is not configured)")
|
|
print()
|
|
|
|
# Test sync flow
|
|
test_sync_flow()
|
|
print()
|
|
|
|
# Test async flow
|
|
await test_async_flow()
|
|
print()
|
|
|
|
# Test error handling
|
|
test_error_handling()
|
|
print()
|
|
|
|
print("🎉 All tests completed!")
|
|
print("\n📊 If Langfuse is configured, check your dashboard for traces:")
|
|
langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host")
|
|
print(f" Dashboard URL: {langfuse_host}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|