|
|
||
|---|---|---|
| .. | ||
| assets | ||
| docs | ||
| static | ||
| templates | ||
| utils | ||
| README.md | ||
| flow.py | ||
| nodes.py | ||
| requirements.txt | ||
| server.py | ||
README.md
PocketFlow Web Human-in-the-Loop (HITL) Feedback Service
This project demonstrates a minimal web application for human-in-the-loop workflows using PocketFlow, FastAPI, and Server-Sent Events (SSE). Users can submit text, have it processed (simulated), review the output, and approve or reject it, potentially triggering reprocessing until approved.
Features
- Web UI: Simple interface for submitting tasks and providing feedback.
- PocketFlow Workflow: Manages the process -> review -> result/reprocess logic.
- FastAPI Backend: Serves the UI and handles API requests asynchronously.
- Server-Sent Events (SSE): Provides real-time status updates to the client without polling.
How to Run
-
Install Dependencies:
pip install -r requirements.txt -
Run the FastAPI Server: Use Uvicorn (or another ASGI server):
uvicorn server:app --reload --port 8000(The
--reloadflag is useful for development.) -
Access the Web UI: Open your web browser and navigate to
http://127.0.0.1:8000. -
Use the Application:
- Enter text into the textarea and click "Submit".
- Observe the status updates pushed via SSE.
- When prompted ("waiting_for_review"), use the "Approve" or "Reject" buttons.
- If rejected, the process loops back. If approved, the final result is displayed.
How It Works
The application uses PocketFlow to define and execute the feedback loop workflow. FastAPI handles web requests and manages the real-time SSE communication.
PocketFlow Workflow:
The core logic is orchestrated by an AsyncFlow defined in flow.py:
flowchart TD
subgraph FeedbackFlow[MinimalFeedbackFlow]
Process[ProcessNode] -- default --> Review[ReviewNode]
Review -- approved --> Result[ResultNode]
Review -- rejected --> Process
end
ProcessNode: Receives input text, calls the minimalprocess_taskutility, and stores the output.ReviewNode(Async):- Pushes a "waiting_for_review" status with the processed output to the SSE queue.
- Waits asynchronously for an external signal (triggered by the
/feedbackAPI endpoint). - Based on the received feedback ("approved" or "rejected"), determines the next step in the flow. Stores the result if approved.
ResultNode: Logs the final approved result.
FastAPI & SSE Integration:
- The
/submitendpoint creates a unique task, initializes the PocketFlowsharedstate (including anasyncio.Eventfor review and anasyncio.Queuefor SSE), and schedules the flow execution usingBackgroundTasks. - Nodes within the flow (specifically
ReviewNode's prep logic) put status updates onto the task-specificsse_queue. - The
/stream/{task_id}endpoint usesStreamingResponseto read from the task'ssse_queueand push formatted status updates to the connected client via Server-Sent Events. - The
/feedback/{task_id}endpoint receives the human's decision, updates thesharedstate, and sets theasyncio.Eventto unblock the waitingReviewNode.
This setup allows for a decoupled workflow logic (PocketFlow) and web interaction layer (FastAPI), with efficient real-time updates pushed to the user.
Files
server.py: The main FastAPI application handling HTTP requests, SSE, state management, and background task scheduling.nodes.py: Defines the PocketFlowNodeclasses (ProcessNode,ReviewNode,ResultNode) for the workflow steps.flow.py: Defines the PocketFlowAsyncFlowthat connects the nodes into the feedback loop.utils/process_task.py: Contains the minimal simulation function for task processing.templates/index.html: The HTML structure for the frontend user interface.static/style.css: Basic CSS for styling the frontend.requirements.txt: Project dependencies (FastAPI, Uvicorn, Jinja2, PocketFlow).