diff --git a/README.md b/README.md
index f8260f6..3bdbd82 100644
--- a/README.md
+++ b/README.md
@@ -83,6 +83,7 @@ From there, it's easy to implement popular design patterns like ([Multi-](https:
| [Thinking](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-thinking) | ★☆☆ *Beginner* | Solve complex reasoning problems through Chain-of-Thought |
| [Memory](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-memory) | ★☆☆ *Beginner* | A chat bot with short-term and long-term memory |
| [MCP](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-mcp) | ★☆☆ *Beginner* | Agent using Model Context Protocol for numerical operations |
+| [Web HITL](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-web-hitl) | ★☆☆ *Beginner* | A minimal web service for a human review loop with SSE updates |
diff --git a/cookbook/pocketflow-rag/requirements.txt b/cookbook/pocketflow-rag/requirements.txt
index abb8e35..dc90e11 100644
--- a/cookbook/pocketflow-rag/requirements.txt
+++ b/cookbook/pocketflow-rag/requirements.txt
@@ -1,4 +1,4 @@
-pocketflow>=0.0.5
+pocketflow>=0.0.1
numpy>=1.20.0
faiss-cpu>=1.7.0
openai>=1.0.0
\ No newline at end of file
diff --git a/cookbook/pocketflow-web-hitl/README.md b/cookbook/pocketflow-web-hitl/README.md
index acfc7ec..3f3a231 100644
--- a/cookbook/pocketflow-web-hitl/README.md
+++ b/cookbook/pocketflow-web-hitl/README.md
@@ -1,42 +1,82 @@
-# PocketFlow Hello World
+# PocketFlow Web Human-in-the-Loop (HITL) Feedback Service
-Your first PocketFlow application! This simple example demonstrates how to create a basic PocketFlow app from scratch.
+This project demonstrates a minimal web application for human-in-the-loop workflows using PocketFlow, FastAPI, and Server-Sent Events (SSE). Users can submit text, have it processed (simulated), review the output, and approve or reject it, potentially triggering reprocessing until approved.
-## Project Structure
+
+
+
-```
-.
-├── docs/ # Documentation files
-├── utils/ # Utility functions
-├── flow.py # PocketFlow implementation
-├── main.py # Main application entry point
-└── README.md # Project documentation
+## Features
+
+- **Web UI:** Simple interface for submitting tasks and providing feedback.
+- **PocketFlow Workflow:** Manages the process -> review -> result/reprocess logic.
+- **FastAPI Backend:** Serves the UI and handles API requests asynchronously.
+- **Server-Sent Events (SSE):** Provides real-time status updates to the client without polling.
+
+## How to Run
+
+1. Install Dependencies:
+ ```bash
+ pip install -r requirements.txt
+ ```
+
+2. Run the FastAPI Server:
+ Use Uvicorn (or another ASGI server):
+ ```bash
+ uvicorn server:app --reload --port 8000
+ ```
+ *(The `--reload` flag is useful for development.)*
+
+3. Access the Web UI:
+ Open your web browser and navigate to `http://127.0.0.1:8000`.
+
+4. Use the Application:
+ * Enter text into the textarea and click "Submit".
+ * Observe the status updates pushed via SSE.
+ * When prompted ("waiting_for_review"), use the "Approve" or "Reject" buttons.
+ * If rejected, the process loops back. If approved, the final result is displayed.
+
+## How It Works
+
+The application uses PocketFlow to define and execute the feedback loop workflow. FastAPI handles web requests and manages the real-time SSE communication.
+
+**PocketFlow Workflow:**
+
+The core logic is orchestrated by an `AsyncFlow` defined in `flow.py`:
+
+```mermaid
+flowchart TD
+ subgraph FeedbackFlow[MinimalFeedbackFlow]
+ Process[ProcessNode] -- default --> Review[ReviewNode]
+ Review -- approved --> Result[ResultNode]
+ Review -- rejected --> Process
+ end
```
-## Setup
+1. **`ProcessNode`**: Receives input text, calls the minimal `process_task` utility, and stores the output.
+2. **`ReviewNode` (Async)**:
+ * Pushes a "waiting_for_review" status with the processed output to the SSE queue.
+ * Waits asynchronously for an external signal (triggered by the `/feedback` API endpoint).
+ * Based on the received feedback ("approved" or "rejected"), determines the next step in the flow. Stores the result if approved.
+3. **`ResultNode`**: Logs the final approved result.
-1. Create a virtual environment:
-```bash
-python -m venv venv
-source venv/bin/activate # On Windows: venv\Scripts\activate
-```
+**FastAPI & SSE Integration:**
-2. Install dependencies:
-```bash
-pip install -r requirements.txt
-```
+* The `/submit` endpoint creates a unique task, initializes the PocketFlow `shared` state (including an `asyncio.Event` for review and an `asyncio.Queue` for SSE), and schedules the flow execution using `BackgroundTasks`.
+* Nodes within the flow (specifically `ReviewNode`'s prep logic) put status updates onto the task-specific `sse_queue`.
+* The `/stream/{task_id}` endpoint uses `StreamingResponse` to read from the task's `sse_queue` and push formatted status updates to the connected client via Server-Sent Events.
+* The `/feedback/{task_id}` endpoint receives the human's decision, updates the `shared` state, and sets the `asyncio.Event` to unblock the waiting `ReviewNode`.
-3. Run the example:
-```bash
-python main.py
-```
+This setup allows for a decoupled workflow logic (PocketFlow) and web interaction layer (FastAPI), with efficient real-time updates pushed to the user.
-## What This Example Demonstrates
+## Files
-- How to create your first PocketFlow application
-- Basic PocketFlow concepts and usage
-- Simple example of PocketFlow's capabilities
-
-## Additional Resources
-
-- [PocketFlow Documentation](https://the-pocket.github.io/PocketFlow/)
\ No newline at end of file
+- [`server.py`](./server.py): The main FastAPI application handling HTTP requests, SSE, state management, and background task scheduling.
+- [`nodes.py`](./nodes.py): Defines the PocketFlow `Node` classes (`ProcessNode`, `ReviewNode`, `ResultNode`) for the workflow steps.
+- [`flow.py`](./flow.py): Defines the PocketFlow `AsyncFlow` that connects the nodes into the feedback loop.
+- [`utils/process_task.py`](./utils/process_task.py): Contains the minimal simulation function for task processing.
+- [`templates/index.html`](./templates/index.html): The HTML structure for the frontend user interface.
+- [`static/style.css`](./static/style.css): Basic CSS for styling the frontend.
+- [`requirements.txt`](./requirements.txt): Project dependencies (FastAPI, Uvicorn, Jinja2, PocketFlow).
diff --git a/cookbook/pocketflow-web-hitl/assets/banner.png b/cookbook/pocketflow-web-hitl/assets/banner.png
new file mode 100644
index 0000000..f23581c
Binary files /dev/null and b/cookbook/pocketflow-web-hitl/assets/banner.png differ
diff --git a/cookbook/pocketflow-web-hitl/docs/design.md b/cookbook/pocketflow-web-hitl/docs/design.md
index 2a44a36..f95f79d 100644
--- a/cookbook/pocketflow-web-hitl/docs/design.md
+++ b/cookbook/pocketflow-web-hitl/docs/design.md
@@ -4,7 +4,7 @@
* **Goal:** Create a web service for task submission, processing, human review (Approve/Reject loop via UI), and finalization.
* **Interface:** Simple web UI (HTML/JS) for input, status display, and feedback buttons.
-* **Backend:** Flask application using PocketFlow for workflow management.
+* **Backend:** FastAPI using PocketFlow for workflow management.
* **Real-time Updates:** Use Server-Sent Events (SSE) to push status changes (pending, running, waiting_for_review, completed, failed) and intermediate results to the client without page reloads.
* **State:** Use in-memory storage for task state (Warning: Not suitable for production).
@@ -35,7 +35,7 @@ flowchart TD
## 3. Utilities
-For this specific example, the core "utility" is the processing logic itself. Let's simulate it with a simple function. The Flask server acts as the external interface.
+For this specific example, the core "utility" is the processing logic itself. Let's simulate it with a simple function. The FastAPI server acts as the external interface.
* `process_task(input_data)`: A placeholder function. In a real scenario, this might call an LLM (`utils/call_llm.py`).
diff --git a/cookbook/pocketflow-web-hitl/flask_server.py b/cookbook/pocketflow-web-hitl/flask_server.py
deleted file mode 100644
index dc472a2..0000000
--- a/cookbook/pocketflow-web-hitl/flask_server.py
+++ /dev/null
@@ -1,176 +0,0 @@
-import asyncio
-import uuid
-import json
-import os
-from flask import Flask, request, jsonify, render_template, send_from_directory, Response
-from flow import create_feedback_flow
-
-# --- Configuration ---
-template_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'templates'))
-static_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'static'))
-app = Flask(__name__, template_folder=template_dir, static_folder=static_dir)
-
-# --- State Management (In-Memory - NOT FOR PRODUCTION) ---
-tasks = {} # task_id -> {"shared": dict, "status": str, "task_obj": asyncio.Task}
-
-# --- Background Flow Runner ---
-async def run_flow_background(task_id, flow, shared):
- """Runs the flow in background, uses queue in shared for SSE."""
- if task_id not in tasks: return # Should not happen
- queue = shared.get("sse_queue")
- if not queue:
- print(f"ERROR: Task {task_id} missing sse_queue in shared store!")
- tasks[task_id]["status"] = "failed"
- # Cannot easily report via SSE if queue is missing
- return
-
- tasks[task_id]["status"] = "running"
- await queue.put({"status": "running"})
- print(f"Task {task_id}: Flow starting.")
-
- final_status = "unknown"
- error_message = None
- try:
- await flow.run_async(shared)
- # Check final state
- if shared.get("final_result") is not None:
- final_status = "completed"
- else:
- # If flow ends without setting final_result (e.g., error before ResultNode)
- final_status = "finished_incomplete"
- print(f"Task {task_id}: Flow finished with status: {final_status}")
-
- except Exception as e:
- final_status = "failed"
- error_message = str(e)
- print(f"Task {task_id}: Flow failed: {e}")
- finally:
- if task_id in tasks:
- tasks[task_id]["status"] = final_status
- final_update = {"status": final_status}
- if final_status == "completed":
- final_update["final_result"] = shared.get("final_result")
- elif error_message:
- final_update["error"] = error_message
- await queue.put(final_update)
- # Signal end of stream
- await queue.put(None)
- print(f"Task {task_id}: Background task ended. Final update put on queue.")
-
-# --- Flask Routes ---
-@app.route('/')
-async def index():
- return render_template('index.html')
-
-@app.route('/static/')
-async def static_files(filename):
- return send_from_directory(app.static_folder, filename)
-
-@app.route('/submit', methods=['POST'])
-async def submit_task():
- if not request.is_json or 'data' not in request.json:
- return jsonify({"error": "Requires JSON with 'data' field"}), 400
-
- task_id = str(uuid.uuid4())
- feedback_event = asyncio.Event()
- status_queue = asyncio.Queue() # Queue for SSE
-
- # Initial shared state for the flow
- shared = {
- "task_input": request.json['data'],
- "processed_output": None,
- "feedback": None,
- "review_event": feedback_event,
- "sse_queue": status_queue, # Make queue accessible to nodes
- "final_result": None,
- "task_id": task_id
- }
-
- flow = create_feedback_flow()
-
- # Store task state
- tasks[task_id] = {
- "shared": shared,
- "status": "pending",
- # "flow": flow, # Not strictly needed if we don't re-use it
- "task_obj": None # Will hold the background task
- }
-
- await status_queue.put({"status": "pending", "task_id": task_id})
-
- # Start flow execution in background
- task_obj = asyncio.create_task(run_flow_background(task_id, flow, shared))
- tasks[task_id]["task_obj"] = task_obj
-
- print(f"Task {task_id}: Submitted.")
- return jsonify({"message": "Task submitted", "task_id": task_id}), 202
-
-@app.route('/feedback/', methods=['POST'])
-async def provide_feedback(task_id):
- if task_id not in tasks:
- return jsonify({"error": "Task not found"}), 404
-
- task_info = tasks[task_id]
- shared = task_info["shared"]
- queue = shared.get("sse_queue")
-
- async def report_error(message, status_code=400):
- print(f"Task {task_id}: Feedback error - {message}")
- if queue: await queue.put({"status": "feedback_error", "error": message})
- return jsonify({"error": message}), status_code
-
- if not request.is_json or 'feedback' not in request.json:
- return await report_error("Requires JSON with 'feedback' field")
- feedback = request.json.get('feedback')
- if feedback not in ["approved", "rejected"]:
- return await report_error("Invalid feedback value")
-
- review_event = shared.get("review_event")
- if not review_event or review_event.is_set():
- return await report_error("Task not awaiting feedback or feedback already sent", 409)
-
- print(f"Task {task_id}: Received feedback: {feedback}")
- if queue: await queue.put({"status": "processing_feedback"})
- tasks[task_id]["status"] = "processing_feedback"
-
- shared["feedback"] = feedback
- review_event.set() # Signal the waiting ReviewNode
-
- return jsonify({"message": f"Feedback '{feedback}' received"}), 200
-
-# --- SSE Endpoint ---
-@app.route('/stream/')
-async def stream(task_id):
- if task_id not in tasks or "sse_queue" not in tasks[task_id]["shared"]:
- return Response("data: {\"status\": \"error\", \"error\": \"Task or queue not found\"}\n\n",
- mimetype='text/event-stream', status=404)
-
- queue = tasks[task_id]["shared"]["sse_queue"]
-
- async def event_generator():
- print(f"SSE Stream: Client connected for {task_id}")
- try:
- while True:
- update = await queue.get()
- if update is None: # Sentinel for end of stream
- print(f"SSE Stream: Sentinel received for {task_id}, closing.")
- yield f"data: {json.dumps({'status': 'stream_closed'})}\n\n"
- break
- sse_data = json.dumps(update)
- print(f"SSE Stream: Sending for {task_id}: {sse_data}")
- yield f"data: {sse_data}\n\n"
- queue.task_done()
- except asyncio.CancelledError:
- print(f"SSE Stream: Client disconnected for {task_id}.")
- finally:
- print(f"SSE Stream: Generator finished for {task_id}.")
- # Optional: Cleanup task entry after stream ends?
- # if task_id in tasks: del tasks[task_id] # Careful if task state needed elsewhere
-
- headers = {'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache'}
- return Response(event_generator(), mimetype='text/event-stream', headers=headers)
-
-# --- Main ---
-# Use an ASGI server like Hypercorn: `hypercorn server:app`
-if __name__ == '__main__':
- print("Run using an ASGI server, e.g., 'hypercorn flask_server:app'")
\ No newline at end of file
diff --git a/cookbook/pocketflow-web-hitl/nodes.py b/cookbook/pocketflow-web-hitl/nodes.py
index aaac399..5b6309d 100644
--- a/cookbook/pocketflow-web-hitl/nodes.py
+++ b/cookbook/pocketflow-web-hitl/nodes.py
@@ -1,4 +1,3 @@
-import asyncio
from pocketflow import Node, AsyncNode
from utils.process_task import process_task
diff --git a/cookbook/pocketflow-web-hitl/requirements.txt b/cookbook/pocketflow-web-hitl/requirements.txt
index aaad2df..9d1ea7b 100644
--- a/cookbook/pocketflow-web-hitl/requirements.txt
+++ b/cookbook/pocketflow-web-hitl/requirements.txt
@@ -1,3 +1,4 @@
pocketflow>=0.0.1
-flask
-hypercorn
\ No newline at end of file
+fastapi
+uvicorn[standard] # ASGI server for FastAPI
+jinja2 # For HTML templating
\ No newline at end of file
diff --git a/cookbook/pocketflow-web-hitl/server.py b/cookbook/pocketflow-web-hitl/server.py
new file mode 100644
index 0000000..e51b611
--- /dev/null
+++ b/cookbook/pocketflow-web-hitl/server.py
@@ -0,0 +1,253 @@
+import asyncio
+import uuid
+import json
+import os
+from fastapi import FastAPI, Request, HTTPException, status, BackgroundTasks # Import BackgroundTasks
+from fastapi.responses import HTMLResponse, StreamingResponse
+from fastapi.staticfiles import StaticFiles
+from fastapi.templating import Jinja2Templates
+from pydantic import BaseModel, Field # Import Pydantic for request/response models
+from typing import Dict, Any, Literal # For type hinting
+
+from flow import create_feedback_flow # PocketFlow imports
+
+# --- Configuration ---
+app = FastAPI(title="Minimal Feedback Loop API")
+
+static_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'static'))
+if os.path.isdir(static_dir):
+ app.mount("/static", StaticFiles(directory=static_dir), name="static")
+else:
+ print(f"Warning: Static directory '{static_dir}' not found.")
+
+template_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'templates'))
+if os.path.isdir(template_dir):
+ templates = Jinja2Templates(directory=template_dir)
+else:
+ print(f"Warning: Template directory '{template_dir}' not found.")
+ templates = None
+
+# --- State Management (In-Memory - NOT FOR PRODUCTION) ---
+# Global dictionary to store task state. In production, use Redis, DB, etc.
+tasks: Dict[str, Dict[str, Any]] = {}
+# Structure: task_id -> {"shared": dict, "status": str, "task_obj": asyncio.Task | None}
+
+
+# --- Background Flow Runner ---
+# This function remains mostly the same, as it defines the work to be done.
+# It will be scheduled by FastAPI's BackgroundTasks now.
+async def run_flow_background(task_id: str, flow, shared: Dict[str, Any]):
+ """Runs the flow in background, uses queue in shared for SSE."""
+ # Check if task exists (might have been cancelled/deleted)
+ if task_id not in tasks:
+ print(f"Background task {task_id}: Task not found, aborting.")
+ return
+ queue = shared.get("sse_queue")
+ if not queue:
+ print(f"ERROR: Task {task_id} missing sse_queue in shared store!")
+ tasks[task_id]["status"] = "failed"
+ # Cannot report failure via SSE if queue is missing
+ return
+
+ tasks[task_id]["status"] = "running"
+ await queue.put({"status": "running"})
+ print(f"Task {task_id}: Background flow starting.")
+
+ final_status = "unknown"
+ error_message = None
+ try:
+ # Execute the potentially long-running PocketFlow
+ await flow.run_async(shared)
+
+ # Determine final status based on shared state after flow completion
+ if shared.get("final_result") is not None:
+ final_status = "completed"
+ else:
+ # If flow ends without setting final_result
+ final_status = "finished_incomplete"
+ print(f"Task {task_id}: Flow finished with status: {final_status}")
+
+ except Exception as e:
+ final_status = "failed"
+ error_message = str(e)
+ print(f"Task {task_id}: Flow execution failed: {e}")
+ # Consider logging traceback here in production
+ finally:
+ # Ensure task still exists before updating state
+ if task_id in tasks:
+ tasks[task_id]["status"] = final_status
+ final_update = {"status": final_status}
+ if final_status == "completed":
+ final_update["final_result"] = shared.get("final_result")
+ elif error_message:
+ final_update["error"] = error_message
+ # Put final status update onto the queue
+ await queue.put(final_update)
+
+ # Signal the end of the SSE stream by putting None
+ # Must happen regardless of whether task was deleted mid-run
+ if queue:
+ await queue.put(None)
+ print(f"Task {task_id}: Background task ended. Final update sentinel put on queue.")
+ # Remove the reference to the completed/failed asyncio Task object
+ if task_id in tasks:
+ tasks[task_id]["task_obj"] = None
+
+# --- Pydantic Models for Request/Response Validation ---
+class SubmitRequest(BaseModel):
+ data: str = Field(..., min_length=1, description="Input data for the task")
+
+class SubmitResponse(BaseModel):
+ message: str = "Task submitted"
+ task_id: str
+
+class FeedbackRequest(BaseModel):
+ feedback: Literal["approved", "rejected"] # Use Literal for specific choices
+
+class FeedbackResponse(BaseModel):
+ message: str
+
+# --- FastAPI Routes ---
+@app.get("/", response_class=HTMLResponse, include_in_schema=False)
+async def get_index(request: Request):
+ """Serves the main HTML frontend."""
+ if templates is None:
+ raise HTTPException(status_code=500, detail="Templates directory not configured.")
+ return templates.TemplateResponse("index.html", {"request": request})
+
+@app.post("/submit", response_model=SubmitResponse, status_code=status.HTTP_202_ACCEPTED)
+async def submit_task(
+ submit_request: SubmitRequest, # Use Pydantic model for validation
+ background_tasks: BackgroundTasks # Inject BackgroundTasks instance
+):
+ """
+ Submits a new task. The actual processing runs in the background.
+ Returns immediately with the task ID.
+ """
+ task_id = str(uuid.uuid4())
+ feedback_event = asyncio.Event()
+ status_queue = asyncio.Queue()
+
+ shared = {
+ "task_input": submit_request.data,
+ "processed_output": None,
+ "feedback": None,
+ "review_event": feedback_event,
+ "sse_queue": status_queue,
+ "final_result": None,
+ "task_id": task_id
+ }
+
+ flow = create_feedback_flow()
+
+ # Store task state BEFORE scheduling background task
+ tasks[task_id] = {
+ "shared": shared,
+ "status": "pending",
+ "task_obj": None # Placeholder for the asyncio Task created by BackgroundTasks
+ }
+
+ await status_queue.put({"status": "pending", "task_id": task_id})
+
+ # Schedule the flow execution using FastAPI's BackgroundTasks
+ # This runs AFTER the response has been sent
+ background_tasks.add_task(run_flow_background, task_id, flow, shared)
+ # Note: We don't get a direct reference to the asyncio Task object this way,
+ # which is fine for this minimal example. If cancellation were needed,
+ # managing asyncio.create_task manually would be necessary.
+
+ print(f"Task {task_id}: Submitted, scheduled for background execution.")
+ return SubmitResponse(task_id=task_id)
+
+
+@app.post("/feedback/{task_id}", response_model=FeedbackResponse)
+async def provide_feedback(task_id: str, feedback_request: FeedbackRequest):
+ """Provides feedback (approved/rejected) to potentially unblock a waiting task."""
+ if task_id not in tasks:
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
+
+ task_info = tasks[task_id]
+ shared = task_info["shared"]
+ queue = shared.get("sse_queue")
+ review_event = shared.get("review_event")
+
+ async def report_error(message, status_code=status.HTTP_400_BAD_REQUEST):
+ # Helper to log, put status on queue, and raise HTTP exception
+ print(f"Task {task_id}: Feedback error - {message}")
+ if queue: await queue.put({"status": "feedback_error", "error": message})
+ raise HTTPException(status_code=status_code, detail=message)
+
+ if not review_event:
+ # This indicates an internal setup error if the task exists but has no event
+ await report_error("Task not configured for feedback", status.HTTP_500_INTERNAL_SERVER_ERROR)
+ if review_event.is_set():
+ # Prevent processing feedback multiple times or if the task isn't waiting
+ await report_error("Task not awaiting feedback or feedback already sent", status.HTTP_409_CONFLICT)
+
+ feedback = feedback_request.feedback # Already validated by Pydantic
+ print(f"Task {task_id}: Received feedback via POST: {feedback}")
+
+ # Update status *before* setting the event, so client sees 'processing' first
+ if queue: await queue.put({"status": "processing_feedback", "feedback_value": feedback})
+ tasks[task_id]["status"] = "processing_feedback" # Update central status tracker
+
+ # Store feedback and signal the waiting ReviewNode
+ shared["feedback"] = feedback
+ review_event.set()
+
+ return FeedbackResponse(message=f"Feedback '{feedback}' received")
+
+
+# --- SSE Endpoint ---
+@app.get("/stream/{task_id}")
+async def stream_status(task_id: str):
+ """Streams status updates for a given task using Server-Sent Events."""
+ if task_id not in tasks or "sse_queue" not in tasks[task_id]["shared"]:
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task or queue not found")
+
+ queue = tasks[task_id]["shared"]["sse_queue"]
+
+ async def event_generator():
+ """Yields SSE messages from the task's queue."""
+ print(f"SSE Stream: Client connected for {task_id}")
+ try:
+ while True:
+ # Wait for the next status update from the queue
+ update = await queue.get()
+ if update is None: # Sentinel value indicates end of stream
+ print(f"SSE Stream: Sentinel received for {task_id}, closing stream.")
+ yield f"data: {json.dumps({'status': 'stream_closed'})}\n\n"
+ break
+
+ sse_data = json.dumps(update)
+ print(f"SSE Stream: Sending for {task_id}: {sse_data}")
+ yield f"data: {sse_data}\n\n" # SSE format: "data: \n\n"
+ queue.task_done() # Acknowledge processing the queue item
+
+ except asyncio.CancelledError:
+ # This happens if the client disconnects
+ print(f"SSE Stream: Client disconnected for {task_id}.")
+ except Exception as e:
+ # Log unexpected errors during streaming
+ print(f"SSE Stream: Error in generator for {task_id}: {e}")
+ # Optionally send an error message to the client if possible
+ try:
+ yield f"data: {json.dumps({'status': 'stream_error', 'error': str(e)})}\n\n"
+ except Exception: # Catch errors if yield fails (e.g., connection already closed)
+ pass
+ finally:
+ print(f"SSE Stream: Generator finished for {task_id}.")
+ # Consider cleanup here (e.g., removing task if no longer needed)
+ # if task_id in tasks: del tasks[task_id]
+
+ # Use FastAPI/Starlette's StreamingResponse for SSE
+ headers = {'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}
+ return StreamingResponse(event_generator(), media_type="text/event-stream", headers=headers)
+
+# --- Main Execution Guard (for running with uvicorn) ---
+if __name__ == "__main__":
+ print("Starting FastAPI server using Uvicorn is recommended:")
+ print("uvicorn server:app --reload --host 0.0.0.0 --port 8000")
+ # Example using uvicorn programmatically (less common than CLI)
+ # import uvicorn
+ # uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
diff --git a/cookbook/pocketflow-web-hitl/static/style.css b/cookbook/pocketflow-web-hitl/static/style.css
index c57fe7d..a3a8fc1 100644
--- a/cookbook/pocketflow-web-hitl/static/style.css
+++ b/cookbook/pocketflow-web-hitl/static/style.css
@@ -1,13 +1,137 @@
-/* Minimal functional styling */
-body { font-family: sans-serif; margin: 15px; background-color: #fdfdfd; }
-.container, .status-container { background: #fff; padding: 15px; border: 1px solid #eee; margin-bottom: 15px; border-radius: 4px; max-width: 600px;}
-textarea { width: 95%; padding: 8px; margin-bottom: 10px; border: 1px solid #ccc; font-size: 1em; min-height: 50px; }
-button { padding: 8px 12px; margin-right: 5px; cursor: pointer; border: 1px solid #ccc; border-radius: 3px;}
-button:disabled { cursor: not-allowed; opacity: 0.6; }
-#task-id-display { font-size: 0.85em; color: #666; margin-bottom: 5px; }
-#status-display { font-weight: bold; margin-bottom: 10px; padding: 8px; background-color: #f0f0f0; border-radius: 3px;}
-.hidden { display: none; }
-.review-box, .result-box { border: 1px solid #ddd; padding: 10px; margin-top: 10px; background-color: #f9f9f9; }
-pre { background-color: #eee; padding: 10px; border: 1px solid #ddd; white-space: pre-wrap; word-wrap: break-word; max-height: 200px; overflow-y: auto;}
-.approve { background-color: #d4edda; border-color: #c3e6cb; }
-.reject { background-color: #f8d7da; border-color: #f5c6cb; }
\ No newline at end of file
+body {
+ font-family: sans-serif;
+ margin: 0; /* Remove default body margin */
+ padding: 20px; /* Add some padding around the content */
+ background-color: #f8f9fa; /* Lighter grey background */
+ display: flex; /* Enable Flexbox */
+ flex-direction: column; /* Stack children vertically */
+ align-items: center; /* Center children horizontally */
+ min-height: 100vh; /* Ensure body takes at least full viewport height */
+ box-sizing: border-box; /* Include padding in height calculation */
+}
+
+h1 {
+ text-align: center; /* Center the main title */
+ color: #343a40;
+ margin-bottom: 25px;
+}
+
+/* Style the main containers */
+.container, .status-container {
+ background: #ffffff;
+ padding: 20px 25px; /* More padding */
+ border: 1px solid #dee2e6; /* Softer border */
+ margin-bottom: 20px;
+ border-radius: 6px; /* Slightly rounder corners */
+ width: 90%; /* Responsive width */
+ max-width: 650px; /* Max width for readability */
+ box-shadow: 0 2px 5px rgba(0,0,0,0.05); /* Subtle shadow */
+ box-sizing: border-box; /* Include padding/border in width */
+}
+
+textarea {
+ width: 100%; /* Take full width of parent container */
+ padding: 10px;
+ margin-bottom: 10px;
+ border: 1px solid #ced4da;
+ border-radius: 4px;
+ font-size: 1em;
+ min-height: 60px;
+ box-sizing: border-box;
+}
+
+button {
+ padding: 9px 15px; /* Slightly adjusted padding */
+ margin-right: 8px;
+ cursor: pointer;
+ border: none; /* Remove default border */
+ border-radius: 4px;
+ font-weight: 500;
+ transition: background-color 0.2s ease;
+}
+
+button:disabled {
+ cursor: not-allowed;
+ opacity: 0.6;
+}
+
+/* Specific button styling */
+#submit-button {
+ background-color: #0d6efd; /* Bootstrap primary blue */
+ color: white;
+}
+#submit-button:hover:not(:disabled) {
+ background-color: #0b5ed7;
+}
+
+.approve {
+ background-color: #198754; /* Bootstrap success green */
+ color: white;
+}
+.approve:hover:not(:disabled) {
+ background-color: #157347;
+}
+
+.reject {
+ background-color: #dc3545; /* Bootstrap danger red */
+ color: white;
+}
+.reject:hover:not(:disabled) {
+ background-color: #bb2d3b;
+}
+
+
+#task-id-display {
+ font-size: 0.9em;
+ color: #6c757d; /* Bootstrap secondary text color */
+ margin-bottom: 8px;
+ word-wrap: break-word;
+}
+
+#status-display {
+ font-weight: bold;
+ margin-bottom: 15px;
+ padding: 10px;
+ background-color: #e9ecef; /* Light grey background */
+ border: 1px solid #dee2e6;
+ border-radius: 4px;
+ color: #495057;
+}
+
+.hidden {
+ display: none;
+}
+
+/* Review/Result Box Styling */
+.review-box, .result-box {
+ border: 1px solid #dee2e6;
+ padding: 15px;
+ margin-top: 15px;
+ border-radius: 4px;
+ background-color: #f8f9fa; /* Very light background */
+}
+
+h2, h3 {
+ margin-top: 0; /* Remove default top margin */
+ margin-bottom: 15px;
+ color: #495057;
+}
+
+h3 {
+ border-bottom: 1px solid #eee;
+ padding-bottom: 8px;
+}
+
+pre {
+ background-color: #e9ecef;
+ padding: 12px;
+ border: 1px solid #ced4da;
+ border-radius: 4px;
+ white-space: pre-wrap;
+ word-wrap: break-word;
+ max-height: 250px; /* Adjusted height */
+ overflow-y: auto;
+ font-family: monospace;
+ font-size: 0.95em;
+ color: #212529;
+}
\ No newline at end of file
diff --git a/cookbook/pocketflow-web-hitl/templates/index.html b/cookbook/pocketflow-web-hitl/templates/index.html
index 0ed293b..591ce80 100644
--- a/cookbook/pocketflow-web-hitl/templates/index.html
+++ b/cookbook/pocketflow-web-hitl/templates/index.html
@@ -3,11 +3,11 @@
- Minimal Feedback Task (SSE)
-
+ Pocket Flow Web Feedback
+
-