From 9de04109d8218f060172faf5ae3f85cc0aa1528a Mon Sep 17 00:00:00 2001 From: zachary62 Date: Fri, 11 Apr 2025 12:05:14 -0400 Subject: [PATCH] experiment with sse --- cookbook/pocketflow-web-hitl/README.md | 42 ++++ cookbook/pocketflow-web-hitl/docs/design.md | 55 +++++ cookbook/pocketflow-web-hitl/flask_server.py | 176 +++++++++++++++ cookbook/pocketflow-web-hitl/flow.py | 18 ++ cookbook/pocketflow-web-hitl/main.py | 16 ++ cookbook/pocketflow-web-hitl/nodes.py | 79 +++++++ cookbook/pocketflow-web-hitl/requirements.txt | 3 + cookbook/pocketflow-web-hitl/static/style.css | 13 ++ .../pocketflow-web-hitl/templates/index.html | 200 ++++++++++++++++++ .../pocketflow-web-hitl/utils/__init__.py | 0 .../pocketflow-web-hitl/utils/process_task.py | 16 ++ 11 files changed, 618 insertions(+) create mode 100644 cookbook/pocketflow-web-hitl/README.md create mode 100644 cookbook/pocketflow-web-hitl/docs/design.md create mode 100644 cookbook/pocketflow-web-hitl/flask_server.py create mode 100644 cookbook/pocketflow-web-hitl/flow.py create mode 100644 cookbook/pocketflow-web-hitl/main.py create mode 100644 cookbook/pocketflow-web-hitl/nodes.py create mode 100644 cookbook/pocketflow-web-hitl/requirements.txt create mode 100644 cookbook/pocketflow-web-hitl/static/style.css create mode 100644 cookbook/pocketflow-web-hitl/templates/index.html create mode 100644 cookbook/pocketflow-web-hitl/utils/__init__.py create mode 100644 cookbook/pocketflow-web-hitl/utils/process_task.py diff --git a/cookbook/pocketflow-web-hitl/README.md b/cookbook/pocketflow-web-hitl/README.md new file mode 100644 index 0000000..acfc7ec --- /dev/null +++ b/cookbook/pocketflow-web-hitl/README.md @@ -0,0 +1,42 @@ +# PocketFlow Hello World + +Your first PocketFlow application! This simple example demonstrates how to create a basic PocketFlow app from scratch. + +## Project Structure + +``` +. +├── docs/ # Documentation files +├── utils/ # Utility functions +├── flow.py # PocketFlow implementation +├── main.py # Main application entry point +└── README.md # Project documentation +``` + +## Setup + +1. Create a virtual environment: +```bash +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate +``` + +2. Install dependencies: +```bash +pip install -r requirements.txt +``` + +3. Run the example: +```bash +python main.py +``` + +## What This Example Demonstrates + +- How to create your first PocketFlow application +- Basic PocketFlow concepts and usage +- Simple example of PocketFlow's capabilities + +## Additional Resources + +- [PocketFlow Documentation](https://the-pocket.github.io/PocketFlow/) \ No newline at end of file diff --git a/cookbook/pocketflow-web-hitl/docs/design.md b/cookbook/pocketflow-web-hitl/docs/design.md new file mode 100644 index 0000000..2a44a36 --- /dev/null +++ b/cookbook/pocketflow-web-hitl/docs/design.md @@ -0,0 +1,55 @@ +# Human-in-the-Loop Web Service + +## 1. Requirements + +* **Goal:** Create a web service for task submission, processing, human review (Approve/Reject loop via UI), and finalization. +* **Interface:** Simple web UI (HTML/JS) for input, status display, and feedback buttons. +* **Backend:** Flask application using PocketFlow for workflow management. +* **Real-time Updates:** Use Server-Sent Events (SSE) to push status changes (pending, running, waiting_for_review, completed, failed) and intermediate results to the client without page reloads. +* **State:** Use in-memory storage for task state (Warning: Not suitable for production). + +## 2. Flow Design + +* **Core Pattern:** Workflow with a conditional loop based on human feedback. SSE for asynchronous status communication. +* **Nodes:** + 1. `ProcessNode` (Regular): Takes input, executes the (simulated) task processing. + 2. `ReviewNode` (Async): Waits for human feedback signaled via an `asyncio.Event`. Pushes "waiting\_for\_review" status to the SSE queue. + 3. `ResultNode` (Regular): Marks the task as complete and logs the final result. +* **Shared Store (`shared` dict per task):** + * `task_input`: Initial data from user. + * `processed_output`: Result from `ProcessNode`. + * `feedback`: 'approved' or 'rejected' set by the `/feedback` endpoint. + * `review_event`: `asyncio.Event` used by `ReviewNode` to wait and `/feedback` to signal. + * `final_result`: The approved output. + * `current_attempt`: Tracks reprocessing count. + * `task_id`: Unique identifier for the task. +* **SSE Communication:** An `asyncio.Queue` (stored alongside the `shared` store in the server's global `tasks` dict, *not directly in PocketFlow's shared store*) is used per task. Nodes (or wrapper code) put status updates onto this queue. The `/stream` endpoint reads from the queue and sends SSE messages. +* **Mermaid Diagram:** + +```mermaid +flowchart TD + Process[Process Task] -- "default" --> Review{Wait for Feedback} + Review -- "approved" --> Result[Final Result] + Review -- "rejected" --> Process +``` + +## 3. Utilities + +For this specific example, the core "utility" is the processing logic itself. Let's simulate it with a simple function. The Flask server acts as the external interface. + +* `process_task(input_data)`: A placeholder function. In a real scenario, this might call an LLM (`utils/call_llm.py`). + +## 4. Node Design (Detailed) + +* **`ProcessNode` (Node):** + * `prep`: Reads `task_input`, `current_attempt` from `shared`. + * `exec`: Calls `utils.process_task.process_task`. + * `post`: Writes `processed_output` to `shared`, increments `current_attempt`. Returns "default". +* **`ReviewNode` (AsyncNode):** + * `prep_async`: (As modified/wrapped by server.py) Reads `review_event`, `processed_output` from `shared`. **Puts "waiting\_for\_review" status onto the task's SSE queue.** + * `exec_async`: `await shared["review_event"].wait()`. + * `post_async`: Reads `feedback` from `shared`. Clears the event. Returns "approved" or "rejected". If approved, stores `processed_output` into `final_result`. +* **`ResultNode` (Node):** + * `prep`: Reads `final_result` from `shared`. + * `exec`: Prints/logs the final result. + * `post`: Returns `None` (ends flow). \ No newline at end of file diff --git a/cookbook/pocketflow-web-hitl/flask_server.py b/cookbook/pocketflow-web-hitl/flask_server.py new file mode 100644 index 0000000..dc472a2 --- /dev/null +++ b/cookbook/pocketflow-web-hitl/flask_server.py @@ -0,0 +1,176 @@ +import asyncio +import uuid +import json +import os +from flask import Flask, request, jsonify, render_template, send_from_directory, Response +from flow import create_feedback_flow + +# --- Configuration --- +template_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'templates')) +static_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'static')) +app = Flask(__name__, template_folder=template_dir, static_folder=static_dir) + +# --- State Management (In-Memory - NOT FOR PRODUCTION) --- +tasks = {} # task_id -> {"shared": dict, "status": str, "task_obj": asyncio.Task} + +# --- Background Flow Runner --- +async def run_flow_background(task_id, flow, shared): + """Runs the flow in background, uses queue in shared for SSE.""" + if task_id not in tasks: return # Should not happen + queue = shared.get("sse_queue") + if not queue: + print(f"ERROR: Task {task_id} missing sse_queue in shared store!") + tasks[task_id]["status"] = "failed" + # Cannot easily report via SSE if queue is missing + return + + tasks[task_id]["status"] = "running" + await queue.put({"status": "running"}) + print(f"Task {task_id}: Flow starting.") + + final_status = "unknown" + error_message = None + try: + await flow.run_async(shared) + # Check final state + if shared.get("final_result") is not None: + final_status = "completed" + else: + # If flow ends without setting final_result (e.g., error before ResultNode) + final_status = "finished_incomplete" + print(f"Task {task_id}: Flow finished with status: {final_status}") + + except Exception as e: + final_status = "failed" + error_message = str(e) + print(f"Task {task_id}: Flow failed: {e}") + finally: + if task_id in tasks: + tasks[task_id]["status"] = final_status + final_update = {"status": final_status} + if final_status == "completed": + final_update["final_result"] = shared.get("final_result") + elif error_message: + final_update["error"] = error_message + await queue.put(final_update) + # Signal end of stream + await queue.put(None) + print(f"Task {task_id}: Background task ended. Final update put on queue.") + +# --- Flask Routes --- +@app.route('/') +async def index(): + return render_template('index.html') + +@app.route('/static/') +async def static_files(filename): + return send_from_directory(app.static_folder, filename) + +@app.route('/submit', methods=['POST']) +async def submit_task(): + if not request.is_json or 'data' not in request.json: + return jsonify({"error": "Requires JSON with 'data' field"}), 400 + + task_id = str(uuid.uuid4()) + feedback_event = asyncio.Event() + status_queue = asyncio.Queue() # Queue for SSE + + # Initial shared state for the flow + shared = { + "task_input": request.json['data'], + "processed_output": None, + "feedback": None, + "review_event": feedback_event, + "sse_queue": status_queue, # Make queue accessible to nodes + "final_result": None, + "task_id": task_id + } + + flow = create_feedback_flow() + + # Store task state + tasks[task_id] = { + "shared": shared, + "status": "pending", + # "flow": flow, # Not strictly needed if we don't re-use it + "task_obj": None # Will hold the background task + } + + await status_queue.put({"status": "pending", "task_id": task_id}) + + # Start flow execution in background + task_obj = asyncio.create_task(run_flow_background(task_id, flow, shared)) + tasks[task_id]["task_obj"] = task_obj + + print(f"Task {task_id}: Submitted.") + return jsonify({"message": "Task submitted", "task_id": task_id}), 202 + +@app.route('/feedback/', methods=['POST']) +async def provide_feedback(task_id): + if task_id not in tasks: + return jsonify({"error": "Task not found"}), 404 + + task_info = tasks[task_id] + shared = task_info["shared"] + queue = shared.get("sse_queue") + + async def report_error(message, status_code=400): + print(f"Task {task_id}: Feedback error - {message}") + if queue: await queue.put({"status": "feedback_error", "error": message}) + return jsonify({"error": message}), status_code + + if not request.is_json or 'feedback' not in request.json: + return await report_error("Requires JSON with 'feedback' field") + feedback = request.json.get('feedback') + if feedback not in ["approved", "rejected"]: + return await report_error("Invalid feedback value") + + review_event = shared.get("review_event") + if not review_event or review_event.is_set(): + return await report_error("Task not awaiting feedback or feedback already sent", 409) + + print(f"Task {task_id}: Received feedback: {feedback}") + if queue: await queue.put({"status": "processing_feedback"}) + tasks[task_id]["status"] = "processing_feedback" + + shared["feedback"] = feedback + review_event.set() # Signal the waiting ReviewNode + + return jsonify({"message": f"Feedback '{feedback}' received"}), 200 + +# --- SSE Endpoint --- +@app.route('/stream/') +async def stream(task_id): + if task_id not in tasks or "sse_queue" not in tasks[task_id]["shared"]: + return Response("data: {\"status\": \"error\", \"error\": \"Task or queue not found\"}\n\n", + mimetype='text/event-stream', status=404) + + queue = tasks[task_id]["shared"]["sse_queue"] + + async def event_generator(): + print(f"SSE Stream: Client connected for {task_id}") + try: + while True: + update = await queue.get() + if update is None: # Sentinel for end of stream + print(f"SSE Stream: Sentinel received for {task_id}, closing.") + yield f"data: {json.dumps({'status': 'stream_closed'})}\n\n" + break + sse_data = json.dumps(update) + print(f"SSE Stream: Sending for {task_id}: {sse_data}") + yield f"data: {sse_data}\n\n" + queue.task_done() + except asyncio.CancelledError: + print(f"SSE Stream: Client disconnected for {task_id}.") + finally: + print(f"SSE Stream: Generator finished for {task_id}.") + # Optional: Cleanup task entry after stream ends? + # if task_id in tasks: del tasks[task_id] # Careful if task state needed elsewhere + + headers = {'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache'} + return Response(event_generator(), mimetype='text/event-stream', headers=headers) + +# --- Main --- +# Use an ASGI server like Hypercorn: `hypercorn server:app` +if __name__ == '__main__': + print("Run using an ASGI server, e.g., 'hypercorn flask_server:app'") \ No newline at end of file diff --git a/cookbook/pocketflow-web-hitl/flow.py b/cookbook/pocketflow-web-hitl/flow.py new file mode 100644 index 0000000..f0d5747 --- /dev/null +++ b/cookbook/pocketflow-web-hitl/flow.py @@ -0,0 +1,18 @@ +from pocketflow import AsyncFlow +from nodes import ProcessNode, ReviewNode, ResultNode + +def create_feedback_flow(): + """Creates the minimal feedback workflow.""" + process_node = ProcessNode() + review_node = ReviewNode() + result_node = ResultNode() + + # Define transitions + process_node >> review_node + review_node - "approved" >> result_node + review_node - "rejected" >> process_node # Loop back + + # Create the AsyncFlow + flow = AsyncFlow(start=process_node) + print("Minimal feedback flow created.") + return flow \ No newline at end of file diff --git a/cookbook/pocketflow-web-hitl/main.py b/cookbook/pocketflow-web-hitl/main.py new file mode 100644 index 0000000..05805c5 --- /dev/null +++ b/cookbook/pocketflow-web-hitl/main.py @@ -0,0 +1,16 @@ +from flow import qa_flow + +# Example main function +# Please replace this with your own main function +def main(): + shared = { + "question": "In one sentence, what's the end of universe?", + "answer": None + } + + qa_flow.run(shared) + print("Question:", shared["question"]) + print("Answer:", shared["answer"]) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cookbook/pocketflow-web-hitl/nodes.py b/cookbook/pocketflow-web-hitl/nodes.py new file mode 100644 index 0000000..aaac399 --- /dev/null +++ b/cookbook/pocketflow-web-hitl/nodes.py @@ -0,0 +1,79 @@ +import asyncio +from pocketflow import Node, AsyncNode +from utils.process_task import process_task + +class ProcessNode(Node): + def prep(self, shared): + task_input = shared.get("task_input", "No input") + print("ProcessNode Prep") + return task_input + + def exec(self, prep_res): + return process_task(prep_res) + + def post(self, shared, prep_res, exec_res): + shared["processed_output"] = exec_res + print("ProcessNode Post: Output stored.") + return "default" # Go to ReviewNode + +class ReviewNode(AsyncNode): + async def prep_async(self, shared): + review_event = shared.get("review_event") + queue = shared.get("sse_queue") # Expect queue in shared + processed_output = shared.get("processed_output", "N/A") + + if not review_event or not queue: + print("ERROR: ReviewNode Prep - Missing review_event or sse_queue in shared store!") + return None # Signal failure + + # Push status update to SSE queue + status_update = { + "status": "waiting_for_review", + "output_to_review": processed_output + } + await queue.put(status_update) + print("ReviewNode Prep: Put 'waiting_for_review' on SSE queue.") + + return review_event # Return event for exec_async + + async def exec_async(self, prep_res): + review_event = prep_res + if not review_event: + print("ReviewNode Exec: Skipping wait (no event from prep).") + return + print("ReviewNode Exec: Waiting on review_event...") + await review_event.wait() + print("ReviewNode Exec: review_event set.") + + async def post_async(self, shared, prep_res, exec_res): + feedback = shared.get("feedback") + print(f"ReviewNode Post: Processing feedback '{feedback}'") + + # Clear the event for potential loops + review_event = shared.get("review_event") + if review_event: + review_event.clear() + shared["feedback"] = None # Reset feedback + + if feedback == "approved": + shared["final_result"] = shared.get("processed_output") + print("ReviewNode Post: Action=approved") + return "approved" + else: + print("ReviewNode Post: Action=rejected") + return "rejected" + +class ResultNode(Node): + def prep(self, shared): + print("ResultNode Prep") + return shared.get("final_result", "No final result.") + + def exec(self, prep_res): + print(f"--- FINAL RESULT ---") + print(prep_res) + print(f"--------------------") + return prep_res + + def post(self, shared, prep_res, exec_res): + print("ResultNode Post: Flow finished.") + return None # End flow \ No newline at end of file diff --git a/cookbook/pocketflow-web-hitl/requirements.txt b/cookbook/pocketflow-web-hitl/requirements.txt new file mode 100644 index 0000000..aaad2df --- /dev/null +++ b/cookbook/pocketflow-web-hitl/requirements.txt @@ -0,0 +1,3 @@ +pocketflow>=0.0.1 +flask +hypercorn \ No newline at end of file diff --git a/cookbook/pocketflow-web-hitl/static/style.css b/cookbook/pocketflow-web-hitl/static/style.css new file mode 100644 index 0000000..c57fe7d --- /dev/null +++ b/cookbook/pocketflow-web-hitl/static/style.css @@ -0,0 +1,13 @@ +/* Minimal functional styling */ +body { font-family: sans-serif; margin: 15px; background-color: #fdfdfd; } +.container, .status-container { background: #fff; padding: 15px; border: 1px solid #eee; margin-bottom: 15px; border-radius: 4px; max-width: 600px;} +textarea { width: 95%; padding: 8px; margin-bottom: 10px; border: 1px solid #ccc; font-size: 1em; min-height: 50px; } +button { padding: 8px 12px; margin-right: 5px; cursor: pointer; border: 1px solid #ccc; border-radius: 3px;} +button:disabled { cursor: not-allowed; opacity: 0.6; } +#task-id-display { font-size: 0.85em; color: #666; margin-bottom: 5px; } +#status-display { font-weight: bold; margin-bottom: 10px; padding: 8px; background-color: #f0f0f0; border-radius: 3px;} +.hidden { display: none; } +.review-box, .result-box { border: 1px solid #ddd; padding: 10px; margin-top: 10px; background-color: #f9f9f9; } +pre { background-color: #eee; padding: 10px; border: 1px solid #ddd; white-space: pre-wrap; word-wrap: break-word; max-height: 200px; overflow-y: auto;} +.approve { background-color: #d4edda; border-color: #c3e6cb; } +.reject { background-color: #f8d7da; border-color: #f5c6cb; } \ No newline at end of file diff --git a/cookbook/pocketflow-web-hitl/templates/index.html b/cookbook/pocketflow-web-hitl/templates/index.html new file mode 100644 index 0000000..0ed293b --- /dev/null +++ b/cookbook/pocketflow-web-hitl/templates/index.html @@ -0,0 +1,200 @@ + + + + + + Minimal Feedback Task (SSE) + + + +

Minimal Task Submitter (SSE)

+ +
+ + +
+ +
+

Status

+
Task ID: N/A
+
Submit a task.
+ + + + +
+ + + + \ No newline at end of file diff --git a/cookbook/pocketflow-web-hitl/utils/__init__.py b/cookbook/pocketflow-web-hitl/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cookbook/pocketflow-web-hitl/utils/process_task.py b/cookbook/pocketflow-web-hitl/utils/process_task.py new file mode 100644 index 0000000..b52d520 --- /dev/null +++ b/cookbook/pocketflow-web-hitl/utils/process_task.py @@ -0,0 +1,16 @@ +import time + +def process_task(input_data): + """Minimal simulation of processing the input data.""" + print(f"Processing: '{input_data[:50]}...'") + + # Simulate work + time.sleep(2) + + processed_result = f"Processed: {input_data}" + print(f"Finished processing.") + return processed_result + +# We don't need a separate utils/call_llm.py for this minimal example, +# but you would add it here if ProcessNode used an LLM. +