#!/usr/bin/env python3
"""
Test script for PocketFlow tracing functionality.

This script tests the tracing implementation to ensure it works correctly
with Langfuse integration.
"""

import sys
import os
import asyncio

from dotenv import load_dotenv

# Load environment variables before anything reads them.
load_dotenv()

# Add paths for imports. These inserts must run before the project imports
# below, which is why those imports are not at the very top of the file.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, os.path.dirname(__file__))

from pocketflow import Node, Flow, AsyncNode, AsyncFlow  # noqa: E402
from tracing import trace_flow, TracingConfig  # noqa: E402
from utils import setup_tracing  # noqa: E402


class TestNode(Node):
    """Simple test node for tracing verification."""

    def prep(self, shared):
        """Return ``shared["input"]``, or ``"test_input"`` when it is absent."""
        return shared.get("input", "test_input")

    def exec(self, prep_res):
        """Return the prep result prefixed with ``processed_``."""
        return f"processed_{prep_res}"

    def post(self, shared, prep_res, exec_res):
        """Store the exec result under ``shared["output"]``; return ``"default"``."""
        shared["output"] = exec_res
        return "default"


class TestAsyncNode(AsyncNode):
    """Simple async test node for tracing verification."""

    async def prep_async(self, shared):
        """Return ``shared["input"]``, or ``"async_test_input"`` when absent."""
        await asyncio.sleep(0.1)  # Simulate async work
        return shared.get("input", "async_test_input")

    async def exec_async(self, prep_res):
        """Return the prep result prefixed with ``async_processed_``."""
        await asyncio.sleep(0.1)  # Simulate async work
        return f"async_processed_{prep_res}"

    async def post_async(self, shared, prep_res, exec_res):
        """Store the exec result under ``shared["output"]``; return ``"default"``."""
        shared["output"] = exec_res
        return "default"


@trace_flow(flow_name="TestSyncFlow")
class TestSyncFlow(Flow):
    """Test synchronous flow with tracing."""

    def __init__(self):
        super().__init__(start=TestNode())


@trace_flow(flow_name="TestAsyncFlow")
class TestAsyncFlow(AsyncFlow):
    """Test asynchronous flow with tracing."""

    def __init__(self):
        super().__init__(start=TestAsyncNode())


def test_sync_flow() -> None:
    """Run the traced synchronous flow and check what it wrote to ``shared``.

    Raises:
        AssertionError: if ``shared["output"]`` is missing or is not
            ``"processed_sync_test_data"``.
    """
    print("🧪 Testing synchronous flow tracing...")

    flow = TestSyncFlow()
    shared = {"input": "sync_test_data"}

    print(f" Input: {shared}")
    result = flow.run(shared)
    print(f" Output: {shared}")
    print(f" Result: {result}")

    # Verify the flow worked
    assert "output" in shared
    assert shared["output"] == "processed_sync_test_data"
    print(" ✅ Sync flow test passed")


async def test_async_flow() -> None:
    """Run the traced asynchronous flow and check what it wrote to ``shared``.

    Raises:
        AssertionError: if ``shared["output"]`` is missing or is not
            ``"async_processed_async_test_data"``.
    """
    print("🧪 Testing asynchronous flow tracing...")

    flow = TestAsyncFlow()
    shared = {"input": "async_test_data"}

    print(f" Input: {shared}")
    result = await flow.run_async(shared)
    print(f" Output: {shared}")
    print(f" Result: {result}")

    # Verify the flow worked
    assert "output" in shared
    assert shared["output"] == "async_processed_async_test_data"
    print(" ✅ Async flow test passed")


def test_configuration() -> None:
    """Load the tracing configuration from the environment and validate it.

    An invalid configuration is reported but not treated as a failure,
    because the required environment variables may simply be unset.
    """
    print("🧪 Testing configuration...")

    # Test loading from environment
    config = TracingConfig.from_env()
    print(f" Loaded config: debug={config.debug}")

    # Test validation
    is_valid = config.validate()
    print(f" Config valid: {is_valid}")

    if is_valid:
        print(" ✅ Configuration test passed")
    else:
        print(
            " ⚠️ Configuration test failed (this may be expected if env vars not set)"
        )


def test_error_handling() -> None:
    """Run a traced flow whose only node raises, and report what propagates.

    The outcome is printed rather than asserted: a ``ValueError`` is the
    expected result, while any other exception or a clean run is reported
    as unexpected.
    """
    print("🧪 Testing error handling...")

    class ErrorNode(Node):
        """Node whose exec phase always raises ``ValueError``."""

        def exec(self, prep_res):
            raise ValueError("Test error for tracing")

    @trace_flow(flow_name="TestErrorFlow")
    class ErrorFlow(Flow):
        """Traced flow that starts at ``ErrorNode``."""

        def __init__(self):
            super().__init__(start=ErrorNode())

    flow = ErrorFlow()
    shared = {"input": "error_test"}

    try:
        flow.run(shared)
    except ValueError as e:
        print(f" ✅ Error correctly caught and traced: {e}")
    except Exception as e:
        # Deliberately broad: this is a smoke test that reports, not raises.
        print(f" ⚠️ Unexpected error type: {e}")
    else:
        print(" ❌ Expected error but flow succeeded")


async def main() -> None:
    """Run all tests in order: configuration, setup, sync, async, errors."""
    print("🚀 Starting PocketFlow Tracing Tests")
    print("=" * 50)

    # Test configuration first
    test_configuration()
    print()

    # Test setup (optional - only if environment is configured)
    print("🔧 Testing setup...")
    try:
        setup_tracing()
    except Exception as e:
        # Top-level boundary: a setup failure is reported, and the remaining
        # tests still run.
        print(f" ⚠️ Setup test failed: {e}")
        print(" (This is expected if Langfuse is not configured)")
    else:
        print(" ✅ Setup test passed")
    print()

    # Test sync flow
    test_sync_flow()
    print()

    # Test async flow
    await test_async_flow()
    print()

    # Test error handling
    test_error_handling()
    print()

    print("🎉 All tests completed!")
    print("\n📊 If Langfuse is configured, check your dashboard for traces:")
    langfuse_host = os.getenv("LANGFUSE_HOST", "your-langfuse-host")
    print(f" Dashboard URL: {langfuse_host}")


if __name__ == "__main__":
    asyncio.run(main())