diff --git a/cookbook/pocketflow-fastapi-websocket/README.md b/cookbook/pocketflow-fastapi-websocket/README.md
index 8ec9ad0..8058e82 100644
--- a/cookbook/pocketflow-fastapi-websocket/README.md
+++ b/cookbook/pocketflow-fastapi-websocket/README.md
@@ -27,8 +27,6 @@ For real LLM responses, set your OpenAI API key:
 export OPENAI_API_KEY="your-api-key-here"
 ```

-**Note**: The app works without an API key using fake streaming responses for testing.
-
 ### 3. Run the Application

 ```bash
diff --git a/cookbook/pocketflow-fastapi-websocket/docs/design.md b/cookbook/pocketflow-fastapi-websocket/docs/design.md
index 0b1b0e4..88964d6 100644
--- a/cookbook/pocketflow-fastapi-websocket/docs/design.md
+++ b/cookbook/pocketflow-fastapi-websocket/docs/design.md
@@ -10,13 +10,13 @@
 **User Story**: As a user, I want to interact with an AI chatbot through a web interface where:
 1. I can send messages and receive real-time streaming responses
 2. The connection stays persistent (WebSocket)
-3. I can see the AI response being typed out in real-time
+3. I can see the AI response being typed out in real-time as the LLM generates it
 4. The interface is minimal and easy to use

 **Technical Requirements**:
 - FastAPI backend with WebSocket support
 - Real-time bidirectional communication
-- LLM streaming integration using PocketFlow
+- True LLM streaming integration using PocketFlow AsyncNode
 - Simple HTML/JavaScript frontend
 - Minimal dependencies

@@ -28,19 +28,19 @@

 ### Applicable Design Pattern:

-**Single Node Pattern**: One PocketFlow node handles the entire LLM streaming process
+**Single Async Node Pattern**: One PocketFlow AsyncNode handles the entire LLM streaming process with real-time WebSocket streaming

 ### Flow high-level Design:

-**PocketFlow Flow**: Just one node
-1. **Streaming Chat Node**: Processes message, calls LLM, streams response
+**PocketFlow AsyncFlow**: Just one async node
+1. **Streaming Chat Node**: Processes message, calls LLM with real streaming, sends chunks immediately to WebSocket

-**Integration**: FastAPI WebSocket endpoint calls the PocketFlow flow
+**Integration**: FastAPI WebSocket endpoint calls the PocketFlow AsyncFlow

 ```mermaid
 flowchart TD
     user((User Browser)) --> websocket(FastAPI WebSocket)
-    websocket --> flow[Streaming Chat Node]
+    websocket --> flow[Streaming Chat AsyncNode]
     flow --> websocket
     websocket --> user

@@ -56,9 +56,9 @@ flowchart TD
 > 2. Include only the necessary utility functions, based on nodes in the flow.

 1. **Stream LLM** (`utils/stream_llm.py`)
-   - *Input*: prompt (str)
-   - *Output*: streaming response chunks
-   - Used by streaming chat node to get LLM chunks
+   - *Input*: messages (list of chat history)
+   - *Output*: generator yielding real-time response chunks from OpenAI API
+   - Used by streaming chat node to get LLM chunks as they're generated

 ## Node Design

@@ -72,7 +72,7 @@ The shared store structure is organized as follows:
 shared = {
     "websocket": None,           # WebSocket connection object
     "user_message": "",          # Current user message
-    "conversation_history": []   # List of message history
+    "conversation_history": []   # List of message history with roles
 }
 ```

@@ -81,9 +81,9 @@
 > Notes for AI: Carefully decide whether to use Batch/Async Node/Flow.

 1. **Streaming Chat Node**
-   - *Purpose*: Process user message, call LLM with streaming, and send chunks via WebSocket
-   - *Type*: Regular Node
+   - *Purpose*: Process user message, call LLM with real streaming, and send chunks immediately via WebSocket
+   - *Type*: AsyncNode (for real-time streaming)
   - *Steps*:
-     - *prep*: Read user message and conversation history, format prompt
-     - *exec*: Call streaming LLM utility
-     - *post*: Stream chunks via WebSocket and update conversation history
+     - *prep*: Read user message, build conversation history with new message
+     - *exec_async*: Call streaming LLM utility, stream each chunk immediately to WebSocket as received
+     - *post*: Update conversation history with complete assistant response
diff --git a/cookbook/pocketflow-fastapi-websocket/flow.py b/cookbook/pocketflow-fastapi-websocket/flow.py
index 5cb4dd6..08a33a8 100644
--- a/cookbook/pocketflow-fastapi-websocket/flow.py
+++ b/cookbook/pocketflow-fastapi-websocket/flow.py
@@ -1,6 +1,6 @@
-from pocketflow import Flow
+from pocketflow import AsyncFlow
 from nodes import StreamingChatNode

 def create_streaming_chat_flow():
     chat_node = StreamingChatNode()
-    return Flow(start=chat_node)
\ No newline at end of file
+    return AsyncFlow(start=chat_node)
\ No newline at end of file
diff --git a/cookbook/pocketflow-fastapi-websocket/main.py b/cookbook/pocketflow-fastapi-websocket/main.py
index ffef702..5c18f4e 100644
--- a/cookbook/pocketflow-fastapi-websocket/main.py
+++ b/cookbook/pocketflow-fastapi-websocket/main.py
@@ -1,102 +1,36 @@
-import asyncio
-import logging
 import json
 from fastapi import FastAPI, WebSocket, WebSocketDisconnect
 from fastapi.staticfiles import StaticFiles
 from fastapi.responses import FileResponse
 from flow import create_streaming_chat_flow
-from nodes import StreamingChatNode

-# Set up logging
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-app = FastAPI(title="PocketFlow Chat Interface", version="1.0.0")
-
-# Mount static files
+app = FastAPI()
 app.mount("/static", StaticFiles(directory="static"), name="static")

-# Store active connections (in production, use Redis or similar)
-active_connections: dict = {}
-
 @app.get("/")
 async def get_chat_interface():
     return FileResponse("static/index.html")

 @app.websocket("/ws")
 async def websocket_endpoint(websocket: WebSocket):
-    """
-    WebSocket endpoint for chat functionality
-    """
     await websocket.accept()
-    connection_id = id(websocket)
-
-    # Initialize shared store for this connection
-    shared_store = {
-        "websocket": websocket,
-        "user_message": "",
-        "conversation_history": []
-    }
-
-    active_connections[connection_id] = shared_store
-    logger.info(f"New WebSocket connection: {connection_id}")

     try:
         while True:
-            # Receive message from client
             data = await websocket.receive_text()
-            logger.info(f"Received message: {data}")
+            message = json.loads(data)

-            # Parse the message
-            try:
-                parsed_message = json.loads(data)
-                message_type = parsed_message.get("type", "message")
-                content = parsed_message.get("content", "")
-            except json.JSONDecodeError:
-                # If not JSON, treat as plain text message
-                message_type = "message"
-                content = data
+            shared_store = {
+                "websocket": websocket,
+                "user_message": message.get("content", "")
+            }

-            if message_type == "message":
-                # Store user message in shared store
-                shared_store["user_message"] = content
-
-                # Process message through PocketFlow
-                try:
-                    flow = create_streaming_chat_flow()
-                    action = flow.run(shared_store)
-
-                    # Handle streaming if chunks are available
-                    if "response_chunks" in shared_store:
-                        chunks = shared_store["response_chunks"]
-                        chat_node = StreamingChatNode()
-                        full_response = await chat_node.stream_response(chunks, websocket)
-
-                        # Add AI response to conversation history
-                        shared_store["conversation_history"].append({
-                            "role": "assistant",
-                            "content": full_response
-                        })
-
-                        logger.info(f"Processed message, response length: {len(full_response)}")
-                except Exception as e:
-                    logger.error(f"Error processing message: {e}")
-                    await websocket.send_text(json.dumps({
-                        "type": "error",
-                        "content": f"Processing error: {str(e)}"
-                    }))
+            flow = create_streaming_chat_flow()
+            await flow.run_async(shared_store)

     except WebSocketDisconnect:
-        logger.info(f"WebSocket disconnected: {connection_id}")
-        if connection_id in active_connections:
-            del active_connections[connection_id]
-    except Exception as e:
-        logger.error(f"WebSocket error: {e}")
-        if connection_id in active_connections:
-            del active_connections[connection_id]
+        pass

 if __name__ == "__main__":
     import uvicorn
-    print("🚀 Starting PocketFlow Chat Interface...")
-    print("📱 Open http://localhost:8000 in your browser")
     uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
diff --git a/cookbook/pocketflow-fastapi-websocket/nodes.py b/cookbook/pocketflow-fastapi-websocket/nodes.py
index 74f270f..040b6fc 100644
--- a/cookbook/pocketflow-fastapi-websocket/nodes.py
+++ b/cookbook/pocketflow-fastapi-websocket/nodes.py
@@ -1,92 +1,40 @@
 import asyncio
 import json
-from pocketflow import Node
+from pocketflow import AsyncNode
 from utils.stream_llm import stream_llm

-class StreamingChatNode(Node):
-    """
-    Single node that processes user message and streams LLM response via WebSocket
-    """
+class StreamingChatNode(AsyncNode):
     def prep(self, shared):
         user_message = shared.get("user_message", "")
-        conversation_history = shared.get("conversation_history", [])
         websocket = shared.get("websocket")

-        # Build messages for OpenAI format
-        messages = []
+        conversation_history = shared.get("conversation_history", [])
+        conversation_history.append({"role": "user", "content": user_message})

-        # Add system message
-        messages.append({
-            "role": "system",
-            "content": "You are a helpful AI assistant. Please respond naturally and helpfully to user queries."
-        })
-
-        # Add conversation history (keep last 10 messages)
-        for msg in conversation_history[-10:]:
-            messages.append(msg)
-
-        # Add current user message
-        messages.append({
-            "role": "user",
-            "content": user_message
-        })
-
-        return messages, websocket, user_message
+        return conversation_history, websocket

-    def exec(self, prep_res):
-        messages, websocket, user_message = prep_res
+    async def exec_async(self, prep_res):
+        messages, websocket = prep_res

-        # Get streaming response from LLM
-        response_chunks = stream_llm(messages)
+        await websocket.send_text(json.dumps({"type": "start", "content": ""}))

-        return response_chunks, websocket, user_message
-
-    async def stream_response(self, chunks, websocket):
-        """
-        Stream LLM response chunks to WebSocket
-        """
         full_response = ""
-
-        try:
-            # Send start indicator
-            await websocket.send_text(json.dumps({"type": "start", "content": ""}))
-
-            # Stream each chunk
-            for chunk_content in chunks:
-                full_response += chunk_content
-
-                # Send chunk via WebSocket
-                await websocket.send_text(json.dumps({
-                    "type": "chunk",
-                    "content": chunk_content
-                }))
-
-                # Add small delay to simulate real streaming
-                await asyncio.sleep(0.05)
-
-            # Send end indicator
-            await websocket.send_text(json.dumps({"type": "end", "content": ""}))
-
-        except Exception as e:
+        for chunk_content in stream_llm(messages):
+            full_response += chunk_content
             await websocket.send_text(json.dumps({
-                "type": "error",
-                "content": f"Streaming error: {str(e)}"
+                "type": "chunk",
+                "content": chunk_content
             }))
-            print(f"Streaming error: {e}")

-        return full_response
+        await websocket.send_text(json.dumps({"type": "end", "content": ""}))
+
+        return full_response, websocket

     def post(self, shared, prep_res, exec_res):
-        chunks, websocket, user_message = exec_res
+        full_response, websocket = exec_res

-        # Store the chunks and websocket for async processing
-        shared["response_chunks"] = chunks
-        shared["websocket"] = websocket
-
-        # Add user message to conversation history
-        shared["conversation_history"].append({
-            "role": "user",
-            "content": user_message
-        })
+        conversation_history = shared.get("conversation_history", [])
+        conversation_history.append({"role": "assistant", "content": full_response})
+        shared["conversation_history"] = conversation_history

         return "stream"
\ No newline at end of file
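The rewritten `nodes.py` and `main.py` both rely on `utils/stream_llm.py`, which this diff imports but does not show. As a point of reference, a minimal sketch of such a generator, assuming the official `openai` (v1+) client and the design note above ("messages in, chunks out"), might look like the following; the model name and the empty-delta guard are assumptions, not taken from the repository:

```python
# utils/stream_llm.py -- hypothetical sketch, not part of this diff.
# Assumes the openai>=1.0 client and OPENAI_API_KEY set in the environment.
import os
from openai import OpenAI

client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

def stream_llm(messages):
    """Yield response text chunks for a chat-history list as the API produces them."""
    response = client.chat.completions.create(
        model="gpt-4o",   # assumed model; use whatever the project configures
        messages=messages,
        stream=True,
    )
    for chunk in response:
        # Each streamed chunk carries a partial delta; skip empty role/stop deltas.
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```

Because `stream_llm` is an ordinary synchronous generator, `exec_async` in the new `StreamingChatNode` can simply iterate it and `await websocket.send_text(...)` per chunk, which is what lets the patch drop the artificial `asyncio.sleep(0.05)` delay from the old `stream_response` helper.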