update nodes

zachary62 2025-05-26 16:43:54 -04:00
parent 9f84788063
commit b8817f08d1
5 changed files with 46 additions and 166 deletions

README.md

@@ -27,8 +27,6 @@ For real LLM responses, set your OpenAI API key:
 export OPENAI_API_KEY="your-api-key-here"
 ```
-**Note**: The app works without an API key using fake streaming responses for testing.
 ### 3. Run the Application
 ```bash

docs/design.md

@@ -10,13 +10,13 @@
 **User Story**: As a user, I want to interact with an AI chatbot through a web interface where:
 1. I can send messages and receive real-time streaming responses
 2. The connection stays persistent (WebSocket)
-3. I can see the AI response being typed out in real-time
+3. I can see the AI response being typed out in real-time as the LLM generates it
 4. The interface is minimal and easy to use
 **Technical Requirements**:
 - FastAPI backend with WebSocket support
 - Real-time bidirectional communication
-- LLM streaming integration using PocketFlow
+- True LLM streaming integration using PocketFlow AsyncNode
 - Simple HTML/JavaScript frontend
 - Minimal dependencies
@@ -28,19 +28,19 @@
 ### Applicable Design Pattern:
-**Single Node Pattern**: One PocketFlow node handles the entire LLM streaming process
+**Single Async Node Pattern**: One PocketFlow AsyncNode handles the entire LLM streaming process with real-time WebSocket streaming
 ### Flow high-level Design:
-**PocketFlow Flow**: Just one node
+**PocketFlow AsyncFlow**: Just one async node
-1. **Streaming Chat Node**: Processes message, calls LLM, streams response
+1. **Streaming Chat Node**: Processes message, calls LLM with real streaming, sends chunks immediately to WebSocket
-**Integration**: FastAPI WebSocket endpoint calls the PocketFlow flow
+**Integration**: FastAPI WebSocket endpoint calls the PocketFlow AsyncFlow
 ```mermaid
 flowchart TD
     user((User Browser)) --> websocket(FastAPI WebSocket)
-    websocket --> flow[Streaming Chat Node]
+    websocket --> flow[Streaming Chat AsyncNode]
     flow --> websocket
     websocket --> user
@@ -56,9 +56,9 @@ flowchart TD
 > 2. Include only the necessary utility functions, based on nodes in the flow.
 1. **Stream LLM** (`utils/stream_llm.py`)
-   - *Input*: prompt (str)
+   - *Input*: messages (list of chat history)
-   - *Output*: streaming response chunks
+   - *Output*: generator yielding real-time response chunks from OpenAI API
-   - Used by streaming chat node to get LLM chunks
+   - Used by streaming chat node to get LLM chunks as they're generated
 ## Node Design
@@ -72,7 +72,7 @@ The shared store structure is organized as follows:
 shared = {
     "websocket": None,            # WebSocket connection object
     "user_message": "",           # Current user message
-    "conversation_history": []    # List of message history
+    "conversation_history": []    # List of message history with roles
 }
 ```
@@ -81,9 +81,9 @@ shared = {
 > Notes for AI: Carefully decide whether to use Batch/Async Node/Flow.
 1. **Streaming Chat Node**
-   - *Purpose*: Process user message, call LLM with streaming, and send chunks via WebSocket
+   - *Purpose*: Process user message, call LLM with real streaming, and send chunks immediately via WebSocket
-   - *Type*: Regular Node
+   - *Type*: AsyncNode (for real-time streaming)
    - *Steps*:
-     - *prep*: Read user message and conversation history, format prompt
+     - *prep*: Read user message, build conversation history with new message
-     - *exec*: Call streaming LLM utility
+     - *exec_async*: Call streaming LLM utility, stream each chunk immediately to WebSocket as received
-     - *post*: Stream chunks via WebSocket and update conversation history
+     - *post*: Update conversation history with complete assistant response
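
The design above calls for a `utils/stream_llm.py` generator, but that file is not touched by this commit. As a reference only, a minimal sketch of such a utility, assuming the OpenAI Python SDK (v1 client) and a placeholder model name:

```python
# utils/stream_llm.py (illustrative sketch, not part of this commit)
from openai import OpenAI

def stream_llm(messages):
    """Yield response text chunks as the OpenAI API streams them."""
    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    stream = client.chat.completions.create(
        model="gpt-4o-mini",  # hypothetical model choice
        messages=messages,
        stream=True,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:  # skip role-only / empty deltas
            yield delta
```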

flow.py

@@ -1,6 +1,6 @@
-from pocketflow import Flow
+from pocketflow import AsyncFlow
 from nodes import StreamingChatNode
 def create_streaming_chat_flow():
     chat_node = StreamingChatNode()
-    return Flow(start=chat_node)
+    return AsyncFlow(start=chat_node)

main.py

@@ -1,102 +1,36 @@
-import asyncio
-import logging
 import json
 from fastapi import FastAPI, WebSocket, WebSocketDisconnect
 from fastapi.staticfiles import StaticFiles
 from fastapi.responses import FileResponse
 from flow import create_streaming_chat_flow
-from nodes import StreamingChatNode
-# Set up logging
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-app = FastAPI(title="PocketFlow Chat Interface", version="1.0.0")
+app = FastAPI()
-# Mount static files
 app.mount("/static", StaticFiles(directory="static"), name="static")
-# Store active connections (in production, use Redis or similar)
-active_connections: dict = {}
 @app.get("/")
 async def get_chat_interface():
     return FileResponse("static/index.html")
 @app.websocket("/ws")
 async def websocket_endpoint(websocket: WebSocket):
-    """
-    WebSocket endpoint for chat functionality
-    """
     await websocket.accept()
-    connection_id = id(websocket)
-    # Initialize shared store for this connection
-    shared_store = {
-        "websocket": websocket,
-        "user_message": "",
-        "conversation_history": []
-    }
-    active_connections[connection_id] = shared_store
-    logger.info(f"New WebSocket connection: {connection_id}")
     try:
         while True:
-            # Receive message from client
             data = await websocket.receive_text()
-            logger.info(f"Received message: {data}")
-            # Parse the message
-            try:
-                parsed_message = json.loads(data)
-                message_type = parsed_message.get("type", "message")
-                content = parsed_message.get("content", "")
-            except json.JSONDecodeError:
-                # If not JSON, treat as plain text message
-                message_type = "message"
-                content = data
-            if message_type == "message":
-                # Store user message in shared store
-                shared_store["user_message"] = content
-                # Process message through PocketFlow
-                try:
-                    flow = create_streaming_chat_flow()
-                    action = flow.run(shared_store)
-                    # Handle streaming if chunks are available
-                    if "response_chunks" in shared_store:
-                        chunks = shared_store["response_chunks"]
-                        chat_node = StreamingChatNode()
-                        full_response = await chat_node.stream_response(chunks, websocket)
-                        # Add AI response to conversation history
-                        shared_store["conversation_history"].append({
-                            "role": "assistant",
-                            "content": full_response
-                        })
-                        logger.info(f"Processed message, response length: {len(full_response)}")
-                except Exception as e:
-                    logger.error(f"Error processing message: {e}")
-                    await websocket.send_text(json.dumps({
-                        "type": "error",
-                        "content": f"Processing error: {str(e)}"
-                    }))
+            message = json.loads(data)
+            shared_store = {
+                "websocket": websocket,
+                "user_message": message.get("content", "")
+            }
+            flow = create_streaming_chat_flow()
+            await flow.run_async(shared_store)
     except WebSocketDisconnect:
-        logger.info(f"WebSocket disconnected: {connection_id}")
-        if connection_id in active_connections:
-            del active_connections[connection_id]
-    except Exception as e:
-        logger.error(f"WebSocket error: {e}")
-        if connection_id in active_connections:
-            del active_connections[connection_id]
+        pass
 if __name__ == "__main__":
     import uvicorn
-    print("🚀 Starting PocketFlow Chat Interface...")
-    print("📱 Open http://localhost:8000 in your browser")
     uvicorn.run(app, host="0.0.0.0", port=8000)

nodes.py

@@ -1,92 +1,40 @@
 import asyncio
 import json
-from pocketflow import Node
+from pocketflow import AsyncNode
 from utils.stream_llm import stream_llm
-class StreamingChatNode(Node):
-    """
-    Single node that processes user message and streams LLM response via WebSocket
-    """
+class StreamingChatNode(AsyncNode):
     def prep(self, shared):
         user_message = shared.get("user_message", "")
-        conversation_history = shared.get("conversation_history", [])
         websocket = shared.get("websocket")
-        # Build messages for OpenAI format
-        messages = []
-        # Add system message
-        messages.append({
-            "role": "system",
-            "content": "You are a helpful AI assistant. Please respond naturally and helpfully to user queries."
-        })
-        # Add conversation history (keep last 10 messages)
-        for msg in conversation_history[-10:]:
-            messages.append(msg)
-        # Add current user message
-        messages.append({
-            "role": "user",
-            "content": user_message
-        })
-        return messages, websocket, user_message
+        conversation_history = shared.get("conversation_history", [])
+        conversation_history.append({"role": "user", "content": user_message})
+        return conversation_history, websocket
-    def exec(self, prep_res):
-        messages, websocket, user_message = prep_res
-        # Get streaming response from LLM
-        response_chunks = stream_llm(messages)
-        return response_chunks, websocket, user_message
-    async def stream_response(self, chunks, websocket):
-        """
-        Stream LLM response chunks to WebSocket
-        """
-        full_response = ""
-        try:
-            # Send start indicator
-            await websocket.send_text(json.dumps({"type": "start", "content": ""}))
-            # Stream each chunk
-            for chunk_content in chunks:
-                full_response += chunk_content
-                # Send chunk via WebSocket
-                await websocket.send_text(json.dumps({
-                    "type": "chunk",
-                    "content": chunk_content
-                }))
-                # Add small delay to simulate real streaming
-                await asyncio.sleep(0.05)
-            # Send end indicator
-            await websocket.send_text(json.dumps({"type": "end", "content": ""}))
-        except Exception as e:
-            await websocket.send_text(json.dumps({
-                "type": "error",
-                "content": f"Streaming error: {str(e)}"
-            }))
-            print(f"Streaming error: {e}")
-        return full_response
+    async def exec_async(self, prep_res):
+        messages, websocket = prep_res
+        await websocket.send_text(json.dumps({"type": "start", "content": ""}))
+        full_response = ""
+        for chunk_content in stream_llm(messages):
+            full_response += chunk_content
+            await websocket.send_text(json.dumps({
+                "type": "chunk",
+                "content": chunk_content
+            }))
+        await websocket.send_text(json.dumps({"type": "end", "content": ""}))
+        return full_response, websocket
     def post(self, shared, prep_res, exec_res):
-        chunks, websocket, user_message = exec_res
-        # Store the chunks and websocket for async processing
-        shared["response_chunks"] = chunks
-        shared["websocket"] = websocket
-        # Add user message to conversation history
-        shared["conversation_history"].append({
-            "role": "user",
-            "content": user_message
-        })
+        full_response, websocket = exec_res
+        conversation_history = shared.get("conversation_history", [])
+        conversation_history.append({"role": "assistant", "content": full_response})
+        shared["conversation_history"] = conversation_history
         return "stream"