add gradion-hitl example

This commit is contained in:
Wetter 2025-05-30 16:55:33 +00:00
parent d13180f835
commit ecbb7e6e4d
12 changed files with 570 additions and 0 deletions

View File

@ -0,0 +1,94 @@
# PocketFlow Gradio HITL Example
A web-based application that demonstrates Human-in-the-Loop (HITL) workflow orchestration using PocketFlow and Gradio. This example provides an interactive interface for users to engage with AI-powered tasks while maintaining human oversight and feedback.
## Features
- **Web-based Interface**: Built with Gradio for an accessible and user-friendly experience
- **Human-in-the-Loop Integration**: Seamless integration of human feedback into the AI workflow
- **Modern UI**: Clean and intuitive interface for better user interaction
- **Powered by LLMs**: Utilizes OpenAI's models for intelligent task processing
- **Flow Visualization**: Real-time visualization of node execution sequence and workflow progress
- **Interactive Debugging**: Monitor and understand the decision-making process through visual feedback
## Getting Started
This project is part of the PocketFlow cookbook examples. It's assumed you have already cloned the [PocketFlow repository](https://github.com/the-pocket/PocketFlow) and are in the `cookbook/pocketflow-gradio-hitl` directory.
1. **Install required dependencies**:
```bash
pip install -r requirements.txt
```
2. **Set up your OpenAI API key**:
The application uses OpenAI models for processing. You need to set your API key as an environment variable:
```bash
export OPENAI_API_KEY="your-openai-api-key-here"
```
3. **Run the Application**:
```bash
python main.py
```
This will start the Gradio web interface, typically accessible at `http://localhost:7860`
## How It Works
The system implements a PocketFlow workflow with a web interface:
```mermaid
flowchart TD
DecideAction[Decide Action Node] --> |"check-weather"| CheckWeather[Check Weather Node]
CheckWeather --> DecideAction
DecideAction --> |"book-hotel"| BookHotel[Book Hotel Node]
BookHotel --> DecideAction
DecideAction --> |"follow-up"| FollowUp[Follow Up Node]
DecideAction --> |"result-notification"| ResultNotification[Result Notification Node]
```
The workflow consists of the following nodes:
1. **Decide Action Node**: The central decision-making node that determines the next action based on user input and context
2. **Check Weather Node**: Provides weather information for specified cities and dates
3. **Book Hotel Node**: Handles hotel reservation requests with check-in and check-out dates
4. **Follow Up Node**: Manages user interactions by asking clarifying questions or handling out-of-scope requests
5. **Result Notification Node**: Delivers action results and offers additional assistance
The flow is orchestrated through a series of directed connections:
- The Decide Action node can trigger weather checks, hotel bookings, follow-ups, or result notifications
- Weather checks and hotel bookings can feed back to the Decide Action node for further processing
- Follow-up and result notification nodes provide the final steps in the workflow
### Flow Visualization
The application provides real-time visualization of the workflow execution:
- The sequence of node activations is displayed chronologically
- Users can see which decision paths are being taken
- The visualization helps in understanding the AI's decision-making process
![flow visualization](./assets/flow_visualization.png)
## Sample Output
Here's an example of booking a hotel:
![book hotel](./assets/book_hotel.png)
Here's an example of changing intention mid-conversation:
![change intention](./assets/change_intention.png)
## Files
- [`main.py`](./main.py): Entry point for the application and Gradio interface setup
- [`flow.py`](./flow.py): Defines the PocketFlow graph and node connections
- [`nodes.py`](./nodes.py): Contains the node definitions for the workflow
- [`utils/`](./utils/): Contains utility functions and helper modules
- [`requirements.txt`](./requirements.txt): Lists project dependencies
## Requirements
- Python 3.8+
- PocketFlow >= 0.0.2
- Gradio >= 5.29.1
- OpenAI >= 1.78.1

Binary file not shown.

After

Width:  |  Height:  |  Size: 131 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 136 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 105 KiB

View File

@ -0,0 +1,29 @@
from pocketflow import Flow
from nodes import (
DecideAction,
CheckWeather,
BookHotel,
FollowUp,
ResultNotification,
)
def create_flow():
    """
    Build the agent graph and return a Flow starting at the decision node.

    The decision node routes to the tool / terminal nodes via named
    actions; the two tool nodes (weather, hotel) loop back to it so it
    can decide the next step from their results.
    """
    # One node instance per role in the workflow.
    decider = DecideAction()
    weather = CheckWeather()
    hotel = BookHotel()
    followup = FollowUp()
    notifier = ResultNotification()

    # Tool branches: each tool hands control back to the decision node.
    decider - "check-weather" >> weather
    weather >> decider
    decider - "book-hotel" >> hotel
    hotel >> decider

    # Terminal branches: these nodes end the current turn.
    decider - "follow-up" >> followup
    decider - "result-notification" >> notifier

    return Flow(start=decider)

View File

@ -0,0 +1,104 @@
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import gradio as gr
from gradio import ChatMessage
from flow import create_flow
# create global thread pool
# Shared executor that runs each conversation's PocketFlow graph off the
# Gradio request thread, so chat_fn can stream results while the flow runs.
# max_workers bounds how many conversations are processed concurrently.
chatflow_thread_pool = ThreadPoolExecutor(
    max_workers=5,
    thread_name_prefix="chatflow_worker",
)
def chat_fn(message, history, uuid):
    """
    Main chat function that handles the conversation flow and message processing.

    The PocketFlow graph runs on a worker thread and communicates back through
    two queues: `flow_queue` streams intermediate "thought" log lines and
    `chat_queue` streams the final chat message(s). Each stream is terminated
    by a `None` sentinel put by the terminal flow nodes.

    Args:
        message (str): The current user message
        history (list): Previous conversation history
        uuid (UUID): Unique identifier for the conversation

    Yields:
        ChatMessage: Streams of thought process and chat responses
    """
    # Log conversation details
    print(f"Conversation ID: {str(uuid)}\nHistory: {history}\nQuery: {message}\n---")
    # Initialize queues for chat messages and flow thoughts
    chat_queue = Queue()
    flow_queue = Queue()
    # Create shared context for the flow
    shared = {
        "conversation_id": str(uuid),
        "query": message,
        "history": history,
        "queue": chat_queue,
        "flow_queue": flow_queue,
    }
    # Create and run the chat flow in a separate thread
    chat_flow = create_flow()
    flow_future = chatflow_thread_pool.submit(chat_flow.run, shared)

    def _unblock_on_failure(future):
        # BUGFIX: if the flow crashes, no node ever enqueues the None
        # sentinels, which left the consumer loops below blocked forever.
        # Push the sentinels ourselves so this generator can finish.
        if future.exception() is not None:
            print(f"Flow failed: {future.exception()!r}")
            flow_queue.put(None)
            chat_queue.put(None)

    flow_future.add_done_callback(_unblock_on_failure)
    # Initialize thought response tracking
    start_time = time.time()
    thought_response = ChatMessage(
        content="", metadata={"title": "Flow Log", "id": 0, "status": "pending"}
    )
    yield thought_response
    # Process and accumulate thoughts from the flow queue until the sentinel.
    accumulated_thoughts = ""
    while True:
        thought = flow_queue.get()
        if thought is None:
            break
        accumulated_thoughts += f"- {thought}\n\n"
        thought_response.content = accumulated_thoughts.strip()
        yield thought_response
        flow_queue.task_done()
    # Mark thought processing as complete and record duration
    thought_response.metadata["status"] = "done"
    thought_response.metadata["duration"] = time.time() - start_time
    yield thought_response
    # Process and yield chat messages from the chat queue until the sentinel.
    while True:
        msg = chat_queue.get()
        if msg is None:
            break
        chat_response = [thought_response, ChatMessage(content=msg)]
        yield chat_response
        chat_queue.task_done()
def clear_fn():
    """Reset the conversation by issuing a brand-new conversation id."""
    print("Clearing conversation")
    fresh_id = uuid.uuid4()
    return fresh_id
# Top-level Gradio UI. Components created inside the Blocks context are
# registered on `demo`; registration order matters, so the wiring below
# is left exactly as written.
with gr.Blocks(fill_height=True, theme="ocean") as demo:
    # Per-session conversation id; seeded once, refreshed on load/clear.
    uuid_state = gr.State(uuid.uuid4())
    # Issue a fresh conversation id whenever the page is (re)loaded.
    demo.load(clear_fn, outputs=[uuid_state])
    chatbot = gr.Chatbot(type="messages", scale=1)
    # Also rotate the conversation id when the user clears the chat.
    chatbot.clear(clear_fn, outputs=[uuid_state])
    gr.ChatInterface(
        fn=chat_fn,
        type="messages",
        # uuid_state is passed to chat_fn as its third argument (`uuid`).
        additional_inputs=[uuid_state],
        chatbot=chatbot,
        title="PocketFlow Gradio Demo",
    )

demo.launch()

View File

@ -0,0 +1,241 @@
from datetime import datetime
from queue import Queue
import yaml
from pocketflow import Node
from utils.call_llm import call_llm
from utils.call_mock_api import call_book_hotel_api, call_check_weather_api
from utils.conversation import load_conversation, save_conversation
from utils.format_chat_history import format_chat_history
class DecideAction(Node):
    """Central router node: asks the LLM which action to take next.

    Reads the per-conversation session (last action and its result),
    prompts the LLM with the chat history and current question, parses
    its fenced YAML decision, records the chosen action's parameters in
    the session, and returns the action name to route the flow.
    """

    def prep(self, shared):
        # Load per-conversation state saved by previous node runs.
        conversation_id = shared["conversation_id"]
        session = load_conversation(conversation_id)
        return session, shared["history"], shared["query"]

    def exec(self, prep_res):
        session, history, query = prep_res
        prompt = f"""
### INSTRUCTIONS
You are a lifestyle assistant capable of helping users book hotels and check weather conditions.
You need to decide the next action based on your last action, action execution result, chat history, and current user question.
### CHAT HISTORY
{format_chat_history(history)}
### CURRENT USER QUESTION
user: {query}
### CONTEXT
Last Action: {session.get('last_action', None)}
Last Action Result: {session.get('action_result', None)}
Current Date: {datetime.now().date()}
### ACTION SPACE
[1] check-weather
Description: When the user asks about the weather, use this tool.
Parameters:
- name: city
description: The city to check the weather
required: true
example: Beijing
- name: date
description: The date to check the weather, if not provided, use the current date
required: false
example: 2025-05-28
[2] book-hotel
Description: When the user wants to book a hotel, use this tool.
Parameters:
- name: hotel
description: The name of the hotel to be booked
required: true
example: ShanghaiHilton Hotel
- name: checkin_date
description: The check-in date
required: true
example: 2025-05-28
- name: checkout_date
description: The check-out date
required: true
example: 2025-05-29
[3] follow-up
Description: 1. When the user's question is out of the scope of booking hotels and checking weather, use this tool to guide the user; 2. When the current information cannot meet the parameter requirements of the corresponding tool, use this tool to ask the user.
Parameters:
- name: question
description: Your guidance or follow-up to the user, maintain an enthusiastic and lively language style, and use the same language as the user's question.
required: true
example: Which hotel would you like to book?😊
[4] result-notification
Description: When the booking of a hotel or checking the weather is completed, use this tool to notify the user of the result and ask if they need any other help. If you find that the user's question is not completed in the history conversation, you can guide the user to complete the intention in the last step.
Parameters:
- name: result
description: Notify the user of the result based on the Last Action Result. Maintain an enthusiastic and lively language style, and use the same language as the user's question.
required: true
example: The hotel has been successfully booked for you. 😉\n\nThe check-in date is XX, and the check-out date is XX. Thank you for using it. Would you like any other help?😀
## NEXT ACTION
Decide the next action based on the context and available actions.
Return your response in this format:
```yaml
thinking: |
<your step-by-step reasoning process>
action: check-weather OR book-hotel OR follow-up OR result-notification
reason: <why you chose this action>
question: <if action is follow-up>
city: <if action is check-weather>
hotel: <if action is book-hotel>
checkin_date: <if action is book-hotel>
checkout_date: <if action is book-hotel>
result: <if action is result-notification>
```
IMPORTANT: Make sure to:
1. Use proper indentation (4 spaces) for all multi-line fields
2. Use the | character for multi-line text fields
3. Keep single-line fields without the | character
"""
        response = call_llm(prompt.strip())
        # Extract the fenced YAML block from the LLM response.
        # NOTE(review): this raises IndexError if the LLM omits the fence;
        # pocketflow's Node retry (if configured) is the intended recovery.
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        print(f"🤖 Agent response: \n{yaml_str}")
        decision = yaml.safe_load(yaml_str)
        return decision

    def post(self, shared, prep_res, exec_res):
        """Save the decision and determine the next step in the flow."""
        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        # Remember the chosen action so the next DecideAction pass sees it.
        session["last_action"] = exec_res["action"]
        flow_log: Queue = shared["flow_queue"]
        for line in exec_res["thinking"].split("\n"):
            # BUGFIX: strip only the leading bullet marker; the previous
            # `replace("-", "")` also deleted hyphens inside words
            # (e.g. "check-weather" was logged as "checkweather").
            line = line.lstrip("- ").strip()
            if line:
                flow_log.put(f"🤔 {line}")
        # Stash the chosen action's parameters for the corresponding node.
        if exec_res["action"] == "check-weather":
            session["check_weather_params"] = {
                "city": exec_res["city"],
                "date": exec_res.get("date", None),
            }
            flow_log.put(f"➡️ Agent decided to check weather for: {exec_res['city']}")
        elif exec_res["action"] == "book-hotel":
            session["book_hotel_params"] = {
                "hotel": exec_res["hotel"],
                "checkin_date": exec_res["checkin_date"],
                "checkout_date": exec_res["checkout_date"],
            }
            flow_log.put(f"➡️ Agent decided to book hotel: {exec_res['hotel']}")
        elif exec_res["action"] == "follow-up":
            session["follow_up_params"] = {"question": exec_res["question"]}
            flow_log.put(f"➡️ Agent decided to follow up: {exec_res['question']}")
        elif exec_res["action"] == "result-notification":
            session["result_notification_params"] = {"result": exec_res["result"]}
            flow_log.put(f"➡️ Agent decided to notify the result: {exec_res['result']}")
        save_conversation(conversation_id, session)
        # Return the action to determine the next node in the flow
        return exec_res["action"]
class CheckWeather(Node):
    """Tool node: look up the weather using parameters chosen by DecideAction."""

    def prep(self, shared):
        # Parameters were stashed in the session by DecideAction.post.
        session: dict = load_conversation(shared["conversation_id"])
        params = session["check_weather_params"]
        return params["city"], params.get("date", None)

    def exec(self, prep_res):
        city, query_date = prep_res
        return call_check_weather_api(city, query_date)

    def post(self, shared, prep_res, exec_res):
        # Publish the tool result to the flow log, persist it for the next
        # DecideAction pass, then loop back to the decision node.
        shared["flow_queue"].put(f"⬅️ Check weather result: {exec_res}")
        cid = shared["conversation_id"]
        session: dict = load_conversation(cid)
        session["action_result"] = exec_res
        save_conversation(cid, session)
        return "default"
class BookHotel(Node):
    """Tool node: book a hotel using parameters chosen by DecideAction."""

    def prep(self, shared):
        # Parameters were stashed in the session by DecideAction.post.
        session: dict = load_conversation(shared["conversation_id"])
        params = session["book_hotel_params"]
        return params["hotel"], params["checkin_date"], params["checkout_date"]

    def exec(self, prep_res):
        hotel_name, arrival, departure = prep_res
        return call_book_hotel_api(hotel_name, arrival, departure)

    def post(self, shared, prep_res, exec_res):
        # Publish the tool result to the flow log, persist it for the next
        # DecideAction pass, then loop back to the decision node.
        shared["flow_queue"].put(f"⬅️ Book hotel result: {exec_res}")
        cid = shared["conversation_id"]
        session: dict = load_conversation(cid)
        session["action_result"] = exec_res
        save_conversation(cid, session)
        return "default"
class FollowUp(Node):
    """Terminal node for this turn: ask the user a clarifying question.

    Closes the flow-log stream (None sentinel), delivers the question to
    the chat queue, and ends the flow with the "done" action.
    """

    def prep(self, shared):
        # Close the flow-log stream: chat_fn stops reading on None.
        flow_log: Queue = shared["flow_queue"]
        flow_log.put(None)
        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        question = session["follow_up_params"]["question"]
        return question, shared["queue"]

    def exec(self, prep_res):
        question, queue = prep_res
        # Deliver the question, then close the chat stream.
        queue.put(question)
        queue.put(None)
        return question

    def post(self, shared, prep_res, exec_res):
        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        session["action_result"] = exec_res
        # BUGFIX: persist the session like every other node does; without
        # this the recorded action_result could be lost for conversations
        # not yet present in the cache.
        save_conversation(conversation_id, session)
        return "done"
class ResultNotification(Node):
    """Terminal node for this turn: report the action result to the user."""

    def prep(self, shared):
        # Close the flow-log stream before emitting the final message.
        shared["flow_queue"].put(None)
        session: dict = load_conversation(shared["conversation_id"])
        return session["result_notification_params"]["result"], shared["queue"]

    def exec(self, prep_res):
        result, out_queue = prep_res
        # Deliver the result, then close the chat stream.
        out_queue.put(result)
        out_queue.put(None)
        return result

    def post(self, shared, prep_res, exec_res):
        cid = shared["conversation_id"]
        session: dict = load_conversation(cid)
        # Reset per-turn state so the next turn starts from a clean slate.
        session["action_result"] = None
        session["last_action"] = None
        save_conversation(cid, session)
        return "done"

View File

@ -0,0 +1,3 @@
pocketflow>=0.0.2
gradio>=5.29.1
openai>=1.78.1

View File

@ -0,0 +1,21 @@
import os
from openai import OpenAI
from openai.types.chat.chat_completion import ChatCompletion
# OpenAI configuration: the API key is taken from the environment
# (OPENAI_API_KEY); the endpoint and model are fixed for this example.
api_key = os.getenv("OPENAI_API_KEY")
base_url = "https://api.openai.com/v1"
model = "gpt-4o"
def call_llm(message: str):
    """Send a single user message to the configured OpenAI model.

    Args:
        message: The prompt text, sent as one user turn.

    Returns:
        The assistant's reply text (first choice of the completion).
    """
    print(f"Calling LLM with message: \n{message}")
    llm_client = OpenAI(api_key=api_key, base_url=base_url)
    completion: ChatCompletion = llm_client.chat.completions.create(
        model=model, messages=[{"role": "user", "content": message}]
    )
    return completion.choices[0].message.content
# Smoke test: run this module directly to verify API key and connectivity.
if __name__ == "__main__":
    print(call_llm("Hello, how are you?"))

View File

@ -0,0 +1,39 @@
import random
from datetime import date, datetime
def call_check_weather_api(city: str, date: date | None):
if date is None:
date = datetime.now().date()
current_date = datetime.now().date()
# calculate date difference
date_diff = (date - current_date).days
# check if the date is within the allowed range
if abs(date_diff) > 7:
return f"Failed to check weather: Date {date} is more than 7 days away from current date."
return f"The weather in {city} on {date} is {random.choice(['sunny', 'cloudy', 'rainy', 'snowy'])}, and the temperature is {random.randint(10, 30)}°C."
def call_book_hotel_api(hotel: str, checkin_date: date, checkout_date: date):
current_date = datetime.now().date()
# check if the checkin date is after the current date
if checkin_date <= current_date:
return (
f"Failed to book hotel {hotel}: Check-in date must be after current date."
)
# check if the checkin date is before the checkout date
if checkin_date >= checkout_date:
return f"Failed to book hotel {hotel}, because the checkin date is after the checkout date."
# check if the date difference is more than 7 days
date_diff = (checkout_date - checkin_date).days
if date_diff > 7:
return f"Failed to book hotel {hotel}: Stay duration cannot exceed 7 days."
return f"Booked hotel {hotel} from {checkin_date.strftime('%Y-%m-%d')} to {checkout_date.strftime('%Y-%m-%d')} successfully."

View File

@ -0,0 +1,11 @@
# In-memory, process-local store of per-conversation session dicts,
# keyed by conversation id. Not persisted across restarts.
conversation_cache = {}


def load_conversation(conversation_id: str):
    """Return the session dict for *conversation_id*, creating it if absent.

    Uses setdefault so the dict is inserted into the cache on first
    access: callers that mutate the returned session now see their
    changes persist. BUGFIX: previously a fresh, uncached {} was returned
    for unknown ids, silently dropping any mutations made to it.
    """
    print(f"Loading conversation {conversation_id}")
    return conversation_cache.setdefault(conversation_id, {})


def save_conversation(conversation_id: str, session: dict):
    """Store *session* in the cache under *conversation_id*."""
    print(f"Saving conversation {session}")
    conversation_cache[conversation_id] = session

View File

@ -0,0 +1,28 @@
def format_chat_history(history):
    """
    Render the chat history as plain "role: content" lines for the LLM.

    Assistant messages that are flow-log bullets (starting with "- 🤔",
    "- ➡️" or "- ⬅️") are dropped: they are internal thinking output,
    not part of the conversation.

    Args:
        history (list): Chat messages, each a dict with role and content

    Returns:
        str: The formatted chat history string, or "No history" when empty
    """
    if not history:
        return "No history"
    thought_prefixes = ("- 🤔", "- ➡️", "- ⬅️")
    lines = []
    for entry in history:
        speaker = "user" if entry["role"] == "user" else "assistant"
        text = entry["content"]
        # Skip internal flow-log bullets emitted by the assistant.
        if speaker == "assistant" and text.startswith(thought_prefixes):
            continue
        lines.append(f"{speaker}: {text}")
    return "\n".join(lines)