diff --git a/cookbook/pocketflow-fastapi-websocket/README.md b/cookbook/pocketflow-fastapi-websocket/README.md
new file mode 100644
index 0000000..8ec9ad0
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-websocket/README.md
@@ -0,0 +1,137 @@
+# PocketFlow FastAPI WebSocket Chat Interface
+
+A minimal real-time chat interface built with FastAPI, WebSocket, and PocketFlow that supports streaming LLM responses.
+
+## Features
+
+- 🚀 **Real-time Communication**: WebSocket-based bidirectional communication
+- 📡 **Streaming Responses**: See AI responses typed out in real time
+- 🔄 **Persistent Connection**: Stay connected throughout the conversation
+- 💬 **Conversation History**: Maintains context across messages
+- 🎨 **Modern UI**: Clean, responsive chat interface
+- 🛠️ **Minimal Dependencies**: Just FastAPI, Uvicorn, OpenAI, and PocketFlow (see `requirements.txt`)
+
+## Quick Start
+
+### 1. Install Dependencies
+
+```bash
+pip install -r requirements.txt
+```
+
+### 2. Set Up Your OpenAI API Key
+
+Set your OpenAI API key:
+
+```bash
+export OPENAI_API_KEY="your-api-key-here"
+```
+
+**Note**: `utils/stream_llm.py` calls the OpenAI API directly, so a valid key is required to get responses.
+
+### 3. Run the Application
+
+```bash
+python main.py
+```
+
+### 4. Open in Browser
+
+Navigate to: `http://localhost:8000`
+
+## Architecture
+
+This application uses a **simplified single-node pattern** with PocketFlow:
+
+```mermaid
+flowchart TD
+    websocket[FastAPI WebSocket] --> stream[Streaming Chat Node]
+    stream --> websocket
+```
+
+### Components
+
+- **FastAPI**: Web framework with WebSocket support
+- **PocketFlow**: A single node handles message processing and LLM streaming
+- **Streaming LLM**: Real-time response generation
+
+### File Structure
+
+```
+cookbook/pocketflow-fastapi-websocket/
+├── main.py              # FastAPI application with WebSocket endpoint
+├── nodes.py             # Single PocketFlow node for chat processing
+├── flow.py              # Simple flow with one node
+├── static/
+│   └── index.html       # Chat UI (HTML/CSS/JavaScript WebSocket client)
+├── utils/
+│   ├── __init__.py
+│   └── stream_llm.py    # LLM streaming utility
+├── requirements.txt     # Dependencies
+├── README.md            # This file
+└── docs/
+    └── design.md        # Detailed design documentation
+```
+
+## Usage
+
+1. **Start a Conversation**: Type a message and press Enter or click Send
+2. **Watch Streaming**: See the AI response appear in real time
+3. **Continue Chatting**: The conversation maintains context automatically
+4. **Multiple Users**: Each WebSocket connection has its own conversation
+
+## Development
+
+### OpenAI API Configuration
+
+`StreamingChatNode` in `nodes.py` calls `stream_llm` from `utils/stream_llm.py`, which reads the `OPENAI_API_KEY` environment variable and streams completions from the `gpt-4o-mini` model. To change the model or provider, edit `utils/stream_llm.py`.
+
+### Testing
+
+Test the streaming utility on its own:
+
+```bash
+cd utils
+python stream_llm.py
+```
+
+### Customization
+
+- **Modify System Prompt**: Edit the system prompt in `StreamingChatNode` (`nodes.py`)
+- **Change UI**: Edit the chat page in `static/index.html` (served by `main.py`)
+- **Add Features**: Extend the single node or add new nodes to the flow
+
+## Why This Simple Design?
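+
+The entire PocketFlow wiring for this example is just one start node handed to a `Flow` (reproduced from `flow.py` in this PR):
+
+```python
+from pocketflow import Flow
+from nodes import StreamingChatNode
+
+def create_streaming_chat_flow():
+    chat_node = StreamingChatNode()
+    return Flow(start=chat_node)
+```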
+
+This implementation demonstrates PocketFlow's philosophy of **minimal complexity**:
+
+- **Single Node**: One node handles message processing, LLM calls, and streaming
+- **No Utility Bloat**: Direct JSON handling instead of wrapper functions
+- **Clear Separation**: FastAPI handles the WebSocket, PocketFlow handles the LLM logic
+- **Easy to Extend**: Simple to add features like RAG, agents, or multi-step workflows
+
+## Production Considerations
+
+- **Connection Management**: Use Redis or a database for connection storage
+- **Rate Limiting**: Add rate limiting for API calls
+- **Error Handling**: Enhance error handling and user feedback
+- **Authentication**: Add user authentication if needed
+- **Scaling**: Use multiple workers with proper session management
+
+## Technology Stack
+
+- **Backend**: FastAPI + WebSocket
+- **Frontend**: Pure HTML/CSS/JavaScript
+- **AI Framework**: PocketFlow (single node)
+- **LLM**: OpenAI `gpt-4o-mini` (streaming)
+- **Real-time**: WebSocket with streaming
+
+## License
+
+MIT License
\ No newline at end of file
diff --git a/cookbook/pocketflow-fastapi-websocket/docs/design.md b/cookbook/pocketflow-fastapi-websocket/docs/design.md
new file mode 100644
index 0000000..0b1b0e4
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-websocket/docs/design.md
@@ -0,0 +1,89 @@
+# Design Doc: FastAPI WebSocket Chat Interface
+
+> Please DON'T remove notes for AI
+
+## Requirements
+
+> Notes for AI: Keep it simple and clear.
+> If the requirements are abstract, write concrete user stories
+
+**User Story**: As a user, I want to interact with an AI chatbot through a web interface where:
+1. I can send messages and receive real-time streaming responses
+2. The connection stays persistent (WebSocket)
+3. I can see the AI response being typed out in real time
+4. The interface is minimal and easy to use
+
+**Technical Requirements**:
+- FastAPI backend with WebSocket support
+- Real-time bidirectional communication
+- LLM streaming integration using PocketFlow
+- Simple HTML/JavaScript frontend
+- Minimal dependencies
+
+## Flow Design
+
+> Notes for AI:
+> 1. Consider the design patterns of agent, map-reduce, rag, and workflow. Apply them if they fit.
+> 2. Present a concise, high-level description of the workflow.
+
+### Applicable Design Pattern:
+
+**Single Node Pattern**: One PocketFlow node handles the entire LLM streaming process
+
+### Flow High-Level Design:
+
+**PocketFlow Flow**: Just one node
+1. **Streaming Chat Node**: Processes message, calls LLM, streams response
+
+**Integration**: FastAPI WebSocket endpoint calls the PocketFlow flow
+
+```mermaid
+flowchart TD
+    user((User Browser)) --> websocket(FastAPI WebSocket)
+    websocket --> flow[Streaming Chat Node]
+    flow --> websocket
+    websocket --> user
+
+    style user fill:#e1f5fe
+    style websocket fill:#f3e5f5
+    style flow fill:#e8f5e8,stroke:#4caf50,stroke-width:3px
+```
+
+## Utility Functions
+
+> Notes for AI:
+> 1. Understand the utility function definition thoroughly by reviewing the doc.
+> 2. Include only the necessary utility functions, based on nodes in the flow.
+
+1. 
**Stream LLM** (`utils/stream_llm.py`) + - *Input*: prompt (str) + - *Output*: streaming response chunks + - Used by streaming chat node to get LLM chunks + +## Node Design + +### Shared Store + +> Notes for AI: Try to minimize data redundancy + +The shared store structure is organized as follows: + +```python +shared = { + "websocket": None, # WebSocket connection object + "user_message": "", # Current user message + "conversation_history": [] # List of message history +} +``` + +### Node Steps + +> Notes for AI: Carefully decide whether to use Batch/Async Node/Flow. + +1. **Streaming Chat Node** + - *Purpose*: Process user message, call LLM with streaming, and send chunks via WebSocket + - *Type*: Regular Node + - *Steps*: + - *prep*: Read user message and conversation history, format prompt + - *exec*: Call streaming LLM utility + - *post*: Stream chunks via WebSocket and update conversation history diff --git a/cookbook/pocketflow-fastapi-websocket/flow.py b/cookbook/pocketflow-fastapi-websocket/flow.py new file mode 100644 index 0000000..5cb4dd6 --- /dev/null +++ b/cookbook/pocketflow-fastapi-websocket/flow.py @@ -0,0 +1,6 @@ +from pocketflow import Flow +from nodes import StreamingChatNode + +def create_streaming_chat_flow(): + chat_node = StreamingChatNode() + return Flow(start=chat_node) \ No newline at end of file diff --git a/cookbook/pocketflow-fastapi-websocket/main.py b/cookbook/pocketflow-fastapi-websocket/main.py new file mode 100644 index 0000000..ffef702 --- /dev/null +++ b/cookbook/pocketflow-fastapi-websocket/main.py @@ -0,0 +1,102 @@ +import asyncio +import logging +import json +from fastapi import FastAPI, WebSocket, WebSocketDisconnect +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse +from flow import create_streaming_chat_flow +from nodes import StreamingChatNode + +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +app = FastAPI(title="PocketFlow Chat Interface", version="1.0.0") + +# Mount static files +app.mount("/static", StaticFiles(directory="static"), name="static") + +# Store active connections (in production, use Redis or similar) +active_connections: dict = {} + +@app.get("/") +async def get_chat_interface(): + return FileResponse("static/index.html") + +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + """ + WebSocket endpoint for chat functionality + """ + await websocket.accept() + connection_id = id(websocket) + + # Initialize shared store for this connection + shared_store = { + "websocket": websocket, + "user_message": "", + "conversation_history": [] + } + + active_connections[connection_id] = shared_store + logger.info(f"New WebSocket connection: {connection_id}") + + try: + while True: + # Receive message from client + data = await websocket.receive_text() + logger.info(f"Received message: {data}") + + # Parse the message + try: + parsed_message = json.loads(data) + message_type = parsed_message.get("type", "message") + content = parsed_message.get("content", "") + except json.JSONDecodeError: + # If not JSON, treat as plain text message + message_type = "message" + content = data + + if message_type == "message": + # Store user message in shared store + shared_store["user_message"] = content + + # Process message through PocketFlow + try: + flow = create_streaming_chat_flow() + action = flow.run(shared_store) + + # Handle streaming if chunks are available + if "response_chunks" in shared_store: + chunks = 
shared_store["response_chunks"] + chat_node = StreamingChatNode() + full_response = await chat_node.stream_response(chunks, websocket) + + # Add AI response to conversation history + shared_store["conversation_history"].append({ + "role": "assistant", + "content": full_response + }) + + logger.info(f"Processed message, response length: {len(full_response)}") + except Exception as e: + logger.error(f"Error processing message: {e}") + await websocket.send_text(json.dumps({ + "type": "error", + "content": f"Processing error: {str(e)}" + })) + + except WebSocketDisconnect: + logger.info(f"WebSocket disconnected: {connection_id}") + if connection_id in active_connections: + del active_connections[connection_id] + except Exception as e: + logger.error(f"WebSocket error: {e}") + if connection_id in active_connections: + del active_connections[connection_id] + +if __name__ == "__main__": + import uvicorn + print("🚀 Starting PocketFlow Chat Interface...") + print("📱 Open http://localhost:8000 in your browser") + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/cookbook/pocketflow-fastapi-websocket/nodes.py b/cookbook/pocketflow-fastapi-websocket/nodes.py new file mode 100644 index 0000000..74f270f --- /dev/null +++ b/cookbook/pocketflow-fastapi-websocket/nodes.py @@ -0,0 +1,92 @@ +import asyncio +import json +from pocketflow import Node +from utils.stream_llm import stream_llm + +class StreamingChatNode(Node): + """ + Single node that processes user message and streams LLM response via WebSocket + """ + def prep(self, shared): + user_message = shared.get("user_message", "") + conversation_history = shared.get("conversation_history", []) + websocket = shared.get("websocket") + + # Build messages for OpenAI format + messages = [] + + # Add system message + messages.append({ + "role": "system", + "content": "You are a helpful AI assistant. Please respond naturally and helpfully to user queries." 
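+            # Note: edit this system prompt to change the assistant's behavior (see "Customization" in the README).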
+ }) + + # Add conversation history (keep last 10 messages) + for msg in conversation_history[-10:]: + messages.append(msg) + + # Add current user message + messages.append({ + "role": "user", + "content": user_message + }) + + return messages, websocket, user_message + + def exec(self, prep_res): + messages, websocket, user_message = prep_res + + # Get streaming response from LLM + response_chunks = stream_llm(messages) + + return response_chunks, websocket, user_message + + async def stream_response(self, chunks, websocket): + """ + Stream LLM response chunks to WebSocket + """ + full_response = "" + + try: + # Send start indicator + await websocket.send_text(json.dumps({"type": "start", "content": ""})) + + # Stream each chunk + for chunk_content in chunks: + full_response += chunk_content + + # Send chunk via WebSocket + await websocket.send_text(json.dumps({ + "type": "chunk", + "content": chunk_content + })) + + # Add small delay to simulate real streaming + await asyncio.sleep(0.05) + + # Send end indicator + await websocket.send_text(json.dumps({"type": "end", "content": ""})) + + except Exception as e: + await websocket.send_text(json.dumps({ + "type": "error", + "content": f"Streaming error: {str(e)}" + })) + print(f"Streaming error: {e}") + + return full_response + + def post(self, shared, prep_res, exec_res): + chunks, websocket, user_message = exec_res + + # Store the chunks and websocket for async processing + shared["response_chunks"] = chunks + shared["websocket"] = websocket + + # Add user message to conversation history + shared["conversation_history"].append({ + "role": "user", + "content": user_message + }) + + return "stream" \ No newline at end of file diff --git a/cookbook/pocketflow-fastapi-websocket/requirements.txt b/cookbook/pocketflow-fastapi-websocket/requirements.txt new file mode 100644 index 0000000..e8980a8 --- /dev/null +++ b/cookbook/pocketflow-fastapi-websocket/requirements.txt @@ -0,0 +1,4 @@ +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +openai==1.3.8 +pocketflow \ No newline at end of file diff --git a/cookbook/pocketflow-fastapi-websocket/static/index.html b/cookbook/pocketflow-fastapi-websocket/static/index.html new file mode 100644 index 0000000..e010b20 --- /dev/null +++ b/cookbook/pocketflow-fastapi-websocket/static/index.html @@ -0,0 +1,184 @@ + + + + PocketFlow Chat + + + +
*(The body of the 184-line `static/index.html` addition is not captured in this view. It adds the chat page markup, namely the "🤖 PocketFlow Chat Interface" header, a "Connecting..." connection-status indicator, the message area, and the text input with its Send button, plus the page CSS and the JavaScript WebSocket client that renders the streamed chunks.)*
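Because that client script is not reproduced here, the sketch below shows one way to exercise the same `/ws` protocol from Python. It is not part of this PR: the file name is hypothetical and it assumes the third-party `websockets` package is installed. The frame types (`message` in; `start`, `chunk`, `end`, `error` out) match what `main.py` and `nodes.py` send.

```python
# test_ws_client.py (hypothetical helper, not included in this PR)
import asyncio
import json

import websockets  # pip install websockets


async def chat_once(text: str, uri: str = "ws://localhost:8000/ws") -> str:
    """Send one message and print the streamed reply, mirroring the browser client."""
    async with websockets.connect(uri) as ws:
        # The server accepts JSON of the form {"type": "message", "content": ...}
        await ws.send(json.dumps({"type": "message", "content": text}))

        reply = ""
        while True:
            frame = json.loads(await ws.recv())
            if frame["type"] == "chunk":
                reply += frame["content"]
                print(frame["content"], end="", flush=True)
            elif frame["type"] in ("end", "error"):
                print()
                break
        return reply


if __name__ == "__main__":
    asyncio.run(chat_once("Hello!"))
```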
diff --git a/cookbook/pocketflow-fastapi-websocket/utils/__init__.py b/cookbook/pocketflow-fastapi-websocket/utils/__init__.py
new file mode 100644
index 0000000..ddf2269
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-websocket/utils/__init__.py
@@ -0,0 +1 @@
+# Utils package for FastAPI WebSocket Chat Interface
\ No newline at end of file
diff --git a/cookbook/pocketflow-fastapi-websocket/utils/stream_llm.py b/cookbook/pocketflow-fastapi-websocket/utils/stream_llm.py
new file mode 100644
index 0000000..61717c4
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-websocket/utils/stream_llm.py
@@ -0,0 +1,22 @@
+import os
+from openai import OpenAI
+
+def stream_llm(messages):
+    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "your-api-key"))
+
+    stream = client.chat.completions.create(
+        model="gpt-4o-mini",
+        messages=messages,
+        stream=True,
+        temperature=0.7
+    )
+
+    for chunk in stream:
+        if chunk.choices[0].delta.content is not None:
+            yield chunk.choices[0].delta.content
+
+if __name__ == "__main__":
+    messages = [{"role": "user", "content": "Hello!"}]
+    for chunk in stream_llm(messages):
+        print(chunk, end="", flush=True)
+    print()
\ No newline at end of file
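A closing note on testing without an API key: `stream_llm` above requires a real OpenAI key, but since `StreamingChatNode.exec` only needs some generator of text chunks, a fake generator can be swapped in for offline UI testing. The sketch below is not part of this PR; the module and function names are hypothetical.

```python
# utils/fake_stream_llm.py (hypothetical, offline testing only)
import time

def fake_stream_llm(messages):
    """Yield a canned reply word by word, mimicking the generator interface of stream_llm."""
    last_user = next(
        (m["content"] for m in reversed(messages) if m["role"] == "user"),
        "",
    )
    canned = f"(fake response) You said: {last_user}"
    for word in canned.split():
        time.sleep(0.02)  # crude stand-in for token-by-token latency
        yield word + " "
```

To use it, `exec` in `nodes.py` would call `fake_stream_llm(messages)` instead of `stream_llm(messages)`.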