diff --git a/cookbook/pocketflow-fastapi-background/README.md b/cookbook/pocketflow-fastapi-background/README.md
new file mode 100644
index 0000000..db38513
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-background/README.md
@@ -0,0 +1,74 @@
+# PocketFlow FastAPI Background Job
+
+A minimal example of running PocketFlow workflows as background jobs with real-time progress updates via Server-Sent Events (SSE).
+
+## Features
+
+- Start article generation jobs via REST API
+- Real-time granular progress updates via SSE (shows progress for each section)
+- Background processing with FastAPI
+- Simple three-step workflow: Outline → Content → Style
+- Web interface for easy job submission and monitoring
+
+## Getting Started
+
+1. Install dependencies:
+```bash
+pip install -r requirements.txt
+```
+
+2. Set your OpenAI API key:
+```bash
+export OPENAI_API_KEY=your_api_key_here
+```
+
+3. Run the server:
+```bash
+python main.py
+```
+
+## Usage
+
+### Web Interface (Recommended)
+
+1. Open your browser and go to `http://localhost:8000`
+2. Enter an article topic (e.g., "AI Safety", "Climate Change")
+3. Click "Generate Article"
+4. You'll be redirected to a progress page showing real-time updates
+5. The final article will appear when generation is complete
+
+### API Usage
+
+#### Start a Job
+```bash
+curl -X POST "http://localhost:8000/start-job" -d "topic=AI Safety" -H "Content-Type: application/x-www-form-urlencoded"
+```
+
+Response:
+```json
+{"job_id": "123e4567-e89b-12d3-a456-426614174000", "topic": "AI Safety", "status": "started"}
+```
+
+#### Monitor Progress
+```bash
+curl "http://localhost:8000/progress/123e4567-e89b-12d3-a456-426614174000"
+```
+
+SSE Stream:
+```
+data: {"step": "outline", "progress": 33, "data": {"sections": ["Introduction", "Challenges", "Solutions"]}}
+data: {"step": "content", "progress": 44, "data": {"section": "Introduction", "completed_sections": 1, "total_sections": 3}}
+data: {"step": "content", "progress": 55, "data": {"section": "Challenges", "completed_sections": 2, "total_sections": 3}}
+data: {"step": "content", "progress": 66, "data": {"section": "Solutions", "completed_sections": 3, "total_sections": 3}}
+data: {"step": "content", "progress": 66, "data": {"draft_length": 1234, "status": "complete"}}
+data: {"step": "complete", "progress": 100, "data": {"final_article": "..."}}
+```
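+
+If you prefer a scripted client over `curl`, the stream can also be read line by line. The snippet below is only an illustration and assumes the `requests` package, which is not in `requirements.txt`:
+
+```python
+import json
+import requests  # extra dependency for this snippet only
+
+job_id = "123e4567-e89b-12d3-a456-426614174000"  # value returned by /start-job
+url = f"http://localhost:8000/progress/{job_id}"
+
+with requests.get(url, stream=True) as resp:
+    for raw in resp.iter_lines():
+        if not raw.startswith(b"data: "):
+            continue  # skip blank lines between SSE events
+        update = json.loads(raw[len(b"data: "):])
+        if update.get("heartbeat"):
+            continue  # keep-alive ping, nothing to show
+        print(update.get("step"), update.get("progress"))
+        if update.get("step") == "complete":
+            break
+```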
+
+## Files
+
+- `main.py` - FastAPI app with background jobs and SSE
+- `flow.py` - PocketFlow workflow definition
+- `nodes.py` - Workflow nodes (Outline, Content, Style)
+- `utils/call_llm.py` - LLM utility function
+- `static/index.html` - Main page for starting jobs
+- `static/progress.html` - Progress monitoring page with real-time updates
\ No newline at end of file
diff --git a/cookbook/pocketflow-fastapi-background/docs/design.md b/cookbook/pocketflow-fastapi-background/docs/design.md
new file mode 100644
index 0000000..9ef1506
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-background/docs/design.md
@@ -0,0 +1,104 @@
+# Design Doc: PocketFlow FastAPI Background Job with SSE Progress
+
+> Please DON'T remove notes for AI
+
+## Requirements
+
+> Notes for AI: Keep it simple and clear.
+> If the requirements are abstract, write concrete user stories
+
+**User Story**: As a user, I want to submit an article topic via a web API and receive real-time progress updates while the article is being generated in the background, so I can see the workflow progress without blocking the UI.
+
+**Core Requirements**:
+1. Submit article topic via REST API endpoint
+2. Start background job for article generation workflow
+3. Receive real-time progress updates via Server-Sent Events (SSE)
+4. Get final article result when workflow completes
+5. Handle multiple concurrent requests
+
+**Technical Requirements**:
+- FastAPI web server with REST endpoints
+- Background task processing using asyncio
+- Server-Sent Events for progress streaming
+- Simple web interface to test the functionality
+
+## Flow Design
+
+> Notes for AI:
+> 1. Consider the design patterns of agent, map-reduce, rag, and workflow. Apply them if they fit.
+> 2. Present a concise, high-level description of the workflow.
+
+### Applicable Design Pattern:
+
+**Workflow Pattern**: Sequential processing of article generation steps with progress reporting at each stage.
+
+### Flow High-level Design:
+
+1. **Generate Outline Node**: Creates a structured outline for the article topic
+2. **Write Content Node**: Writes content for each section in the outline
+3. **Apply Style Node**: Applies conversational styling to the final article
+
+Each node puts progress updates into an asyncio.Queue for SSE streaming.
+
+```mermaid
+flowchart LR
+    outline[Generate Outline] --> content[Write Content]
+    content --> styling[Apply Style]
+```
+
+## Utility Functions
+
+> Notes for AI:
+> 1. Understand the utility function definition thoroughly by reviewing the doc.
+> 2. Include only the necessary utility functions, based on nodes in the flow.
+
+1. **Call LLM** (`utils/call_llm.py`)
+   - *Input*: prompt (str)
+   - *Output*: response (str)
+   - Used by all workflow nodes for LLM tasks
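+
+The utility module itself is not part of this diff. A minimal sketch of what `utils/call_llm.py` is expected to look like (the OpenAI client and model name are assumptions; adapt to your provider):
+
+```python
+# utils/call_llm.py -- illustrative sketch only
+import os
+from openai import OpenAI
+
+def call_llm(prompt: str) -> str:
+    # Uses the OPENAI_API_KEY environment variable described in the README
+    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
+    response = client.chat.completions.create(
+        model="gpt-4o",  # assumed model; use whatever you have access to
+        messages=[{"role": "user", "content": prompt}]
+    )
+    return response.choices[0].message.content
+```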
+
+## Node Design
+
+### Shared Store
+
+> Notes for AI: Try to minimize data redundancy
+
+The shared store structure is organized as follows:
+
+```python
+shared = {
+    "topic": "user-provided-topic",
+    "sse_queue": asyncio.Queue(),  # For sending SSE updates
+    "sections": ["section1", "section2", "section3"],
+    "draft": "combined-section-content",
+    "final_article": "styled-final-article"
+}
+```
+
+### Node Steps
+
+> Notes for AI: Carefully decide whether to use Batch/Async Node/Flow.
+
+1. **Generate Outline Node**
+   - *Purpose*: Create a structured outline with 3 main sections using YAML output
+   - *Type*: Regular Node (synchronous LLM call)
+   - *Steps*:
+     - *prep*: Read "topic" from shared store
+     - *exec*: Call LLM to generate YAML outline, parse and validate structure
+     - *post*: Write "sections" to shared store, put progress update in sse_queue
+
+2. **Write Content Node**
+   - *Purpose*: Generate concise content for each outline section
+   - *Type*: BatchNode (processes each section independently)
+   - *Steps*:
+     - *prep*: Read "sections" from shared store (returns list of sections)
+     - *exec*: For one section, call LLM to write 100-word content
+     - *post*: Combine all section content into "draft", put progress update in sse_queue
+
+3. **Apply Style Node**
+   - *Purpose*: Apply conversational, engaging style to the combined content
+   - *Type*: Regular Node (single LLM call for styling)
+   - *Steps*:
+     - *prep*: Read "draft" from shared store
+     - *exec*: Call LLM to rewrite in conversational style
+     - *post*: Write "final_article" to shared store, put completion update in sse_queue
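+
+For a quick sanity check of this design without the web server, the flow can be driven directly with a plain shared dict. This is only an illustrative sketch (it assumes `OPENAI_API_KEY` is exported and uses an arbitrary topic); the real entry point is `main.py`:
+
+```python
+import asyncio
+from flow import create_article_flow
+
+shared = {
+    "topic": "AI Safety",          # any topic works
+    "sse_queue": asyncio.Queue(),  # progress messages accumulate here
+    "sections": [],
+    "draft": "",
+    "final_article": ""
+}
+
+create_article_flow().run(shared)  # blocks until all three nodes finish
+
+# Drain the progress messages the nodes pushed along the way
+queue = shared["sse_queue"]
+while not queue.empty():
+    print(queue.get_nowait())
+
+print(shared["final_article"][:200])
+```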
diff --git a/cookbook/pocketflow-fastapi-background/flow.py b/cookbook/pocketflow-fastapi-background/flow.py
new file mode 100644
index 0000000..9a62a9a
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-background/flow.py
@@ -0,0 +1,19 @@
+from pocketflow import Flow
+from nodes import GenerateOutline, WriteContent, ApplyStyle
+
+def create_article_flow():
+    """
+    Create and configure the article writing workflow
+    """
+    # Create node instances
+    outline_node = GenerateOutline()
+    content_node = WriteContent()
+    style_node = ApplyStyle()
+
+    # Connect nodes in sequence
+    outline_node >> content_node >> style_node
+
+    # Create flow starting with outline node
+    article_flow = Flow(start=outline_node)
+
+    return article_flow
\ No newline at end of file
diff --git a/cookbook/pocketflow-fastapi-background/main.py b/cookbook/pocketflow-fastapi-background/main.py
new file mode 100644
index 0000000..70dd43a
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-background/main.py
@@ -0,0 +1,107 @@
+import asyncio
+import json
+import uuid
+from fastapi import FastAPI, BackgroundTasks, Form
+from fastapi.responses import StreamingResponse
+from fastapi.staticfiles import StaticFiles
+from fastapi.responses import FileResponse
+from flow import create_article_flow
+
+app = FastAPI()
+
+# Mount static files
+app.mount("/static", StaticFiles(directory="static"), name="static")
+
+# Store active jobs and their SSE queues
+active_jobs = {}
+
+def run_article_workflow(job_id: str, topic: str):
+    """Run the article workflow in background"""
+    # Note: BackgroundTasks runs this sync function in a worker thread; the SSE
+    # endpoint polls the queue with a short timeout, so cross-thread put_nowait
+    # is good enough for this demo.
+    try:
+        # Create shared store with SSE queue
+        sse_queue = asyncio.Queue()
+        shared = {
+            "topic": topic,
+            "sse_queue": sse_queue,
+            "sections": [],
+            "draft": "",
+            "final_article": ""
+        }
+
+        # Store the queue for SSE access
+        active_jobs[job_id] = sse_queue
+
+        # Run the workflow
+        flow = create_article_flow()
+        flow.run(shared)
+
+    except Exception as e:
+        # Send error message
+        error_msg = {"step": "error", "progress": 0, "data": {"error": str(e)}}
+        if job_id in active_jobs:
+            active_jobs[job_id].put_nowait(error_msg)
+
+@app.post("/start-job")
+async def start_job(background_tasks: BackgroundTasks, topic: str = Form(...)):
+    """Start a new article generation job"""
+    job_id = str(uuid.uuid4())
+
+    # Start background task
+    background_tasks.add_task(run_article_workflow, job_id, topic)
+
+    return {"job_id": job_id, "topic": topic, "status": "started"}
+
+@app.get("/progress/{job_id}")
+async def get_progress(job_id: str):
+    """Stream progress updates via SSE"""
+
+    async def event_stream():
+        if job_id not in active_jobs:
+            yield f"data: {json.dumps({'error': 'Job not found'})}\n\n"
+            return
+
+        sse_queue = active_jobs[job_id]
+
+        try:
+            while True:
+                # Wait for next progress update
+                try:
+                    # Use asyncio.wait_for to avoid blocking forever
+                    progress_msg = await asyncio.wait_for(sse_queue.get(), timeout=1.0)
+                    yield f"data: {json.dumps(progress_msg)}\n\n"
+
+                    # If the job finished (or failed), clean up and exit
+                    if progress_msg.get("step") in ("complete", "error"):
+                        del active_jobs[job_id]
+                        break
+
+                except asyncio.TimeoutError:
+                    # Send heartbeat to keep connection alive
+                    yield f"data: {json.dumps({'heartbeat': True})}\n\n"
+
+        except Exception as e:
+            yield f"data: {json.dumps({'error': str(e)})}\n\n"
+
+    return StreamingResponse(
+        event_stream(),
+        media_type="text/event-stream",
+        headers={
+            "Cache-Control": "no-cache",
+            "Connection": "keep-alive",
+            "Content-Type": "text/event-stream"
+        }
+    )
+
+@app.get("/")
+async def get_index():
+    """Serve the main page"""
+    return FileResponse("static/index.html")
+
+@app.get("/progress.html")
+async def get_progress_page():
+    """Serve the progress page"""
+    return FileResponse("static/progress.html")
+
+if __name__ == "__main__":
+    import uvicorn
+    uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
diff --git a/cookbook/pocketflow-fastapi-background/nodes.py b/cookbook/pocketflow-fastapi-background/nodes.py
new file mode 100644
index 0000000..a7f4cca
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-background/nodes.py
@@ -0,0 +1,109 @@
+import yaml
+from pocketflow import Node, BatchNode
+from utils.call_llm import call_llm
+
+class GenerateOutline(Node):
+    def prep(self, shared):
+        return shared["topic"]
+
+    def exec(self, topic):
+        prompt = f"""
+Create a simple outline for an article about {topic}.
+Include at most 3 main sections (no subsections).
+
+Output the sections in YAML format as shown below:
+
+```yaml
+sections:
+    - First section title
+    - Second section title
+    - Third section title
+```"""
+        response = call_llm(prompt)
+        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
+        structured_result = yaml.safe_load(yaml_str)
+        return structured_result
+
+    def post(self, shared, prep_res, exec_res):
+        sections = exec_res["sections"]
+        shared["sections"] = sections
+
+        # Send progress update via SSE queue
+        progress_msg = {"step": "outline", "progress": 33, "data": {"sections": sections}}
+        shared["sse_queue"].put_nowait(progress_msg)
+
+        return "default"
+
+class WriteContent(BatchNode):
+    def prep(self, shared):
+        # Store sections and sse_queue for use in exec
+        self.sections = shared.get("sections", [])
+        self.sse_queue = shared["sse_queue"]
+        return self.sections
+
+    def exec(self, section):
+        prompt = f"""
+Write a short paragraph (MAXIMUM 100 WORDS) about this section:
+
+{section}
+
+Requirements:
+- Explain the idea in simple, easy-to-understand terms
+- Use everyday language, avoiding jargon
+- Keep it very concise (no more than 100 words)
+- Include one brief example or analogy
+"""
+        content = call_llm(prompt)
+
+        # Send progress update for this section
+        current_section_index = self.sections.index(section) if section in self.sections else 0
+        total_sections = len(self.sections)
+
+        # Progress from 33% (after outline) to 66% (before styling)
+        # Each section contributes (66-33)/total_sections = 33/total_sections percent
+        section_progress = 33 + ((current_section_index + 1) * 33 // total_sections)
+
+        progress_msg = {
+            "step": "content",
+            "progress": section_progress,
+            "data": {
+                "section": section,
+                "completed_sections": current_section_index + 1,
+                "total_sections": total_sections
+            }
+        }
+        self.sse_queue.put_nowait(progress_msg)
+
+        return f"## {section}\n\n{content}\n"
+
+    def post(self, shared, prep_res, exec_res_list):
+        draft = "\n".join(exec_res_list)
+        shared["draft"] = draft
+
+        # Report the combined draft (this is the message shown in the README's sample stream)
+        progress_msg = {"step": "content", "progress": 66, "data": {"draft_length": len(draft), "status": "complete"}}
+        shared["sse_queue"].put_nowait(progress_msg)
+
+        return "default"
+
+class ApplyStyle(Node):
+    def prep(self, shared):
+        return shared["draft"]
+
+    def exec(self, draft):
+        prompt = f"""
+Rewrite the following draft in a conversational, engaging style:
+
+{draft}
+
+Make it:
+- Conversational and warm in tone
+- Include rhetorical questions that engage the reader
+- Add analogies and metaphors where appropriate
+- Include a strong opening and conclusion
+"""
+        return call_llm(prompt)
+
+    def post(self, shared, prep_res, exec_res):
+        shared["final_article"] = exec_res
+
+        # Send completion update via SSE queue
+        progress_msg = {"step": "complete", "progress": 100, "data": {"final_article": exec_res}}
+        shared["sse_queue"].put_nowait(progress_msg)
+
+        return "default"
\ No newline at end of file
diff --git a/cookbook/pocketflow-fastapi-background/requirements.txt b/cookbook/pocketflow-fastapi-background/requirements.txt
new file mode 100644
index 0000000..16c3405
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-background/requirements.txt
@@ -0,0 +1,5 @@
+fastapi
+uvicorn
+openai
+pyyaml
+python-multipart
\ No newline at end of file
diff --git a/cookbook/pocketflow-fastapi-background/static/index.html b/cookbook/pocketflow-fastapi-background/static/index.html
new file mode 100644
index 0000000..84fd8e2
--- /dev/null
+++ b/cookbook/pocketflow-fastapi-background/static/index.html
@@ -0,0 +1,124 @@