commit 2334db7e71

README.md
@@ -0,0 +1,94 @@
# PocketFlow Gradio HITL Example

A web-based application that demonstrates Human-in-the-Loop (HITL) workflow orchestration using PocketFlow and Gradio. This example provides an interactive interface for users to engage with AI-powered tasks while maintaining human oversight and feedback.

## Features

- **Web-based Interface**: Built with Gradio for an accessible and user-friendly experience
- **Human-in-the-Loop Integration**: Seamless integration of human feedback into the AI workflow
- **Modern UI**: Clean and intuitive interface for better user interaction
- **Powered by LLMs**: Utilizes OpenAI's models for intelligent task processing
- **Flow Visualization**: Real-time visualization of node execution sequence and workflow progress
- **Interactive Debugging**: Monitor and understand the decision-making process through visual feedback

## Getting Started

This project is part of the PocketFlow cookbook examples. It's assumed you have already cloned the [PocketFlow repository](https://github.com/the-pocket/PocketFlow) and are in the `cookbook/pocketflow-gradio-hitl` directory.

1. **Install required dependencies**:

   ```bash
   pip install -r requirements.txt
   ```

2. **Set up your OpenAI API key**:

   The application uses OpenAI models for processing. You need to set your API key as an environment variable:

   ```bash
   export OPENAI_API_KEY="your-openai-api-key-here"
   ```

3. **Run the application**:

   ```bash
   python main.py
   ```

   This will start the Gradio web interface, typically accessible at `http://localhost:7860`. If you prefer to exercise the flow without the web UI, see the sketch below.
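Optionally, you can drive the flow once from a script instead of the browser. The sketch below is a minimal, illustrative run that mirrors the shared store `main.py` builds; it assumes you run it from this example directory, that `OPENAI_API_KEY` is set, and that the LLM returns the YAML format the prompt asks for:

```python
# Minimal sketch: run the flow once outside Gradio. The shared-dict keys
# mirror those built in main.py; the query string is just an example.
from queue import Queue

from flow import create_flow

chat_queue, flow_queue = Queue(), Queue()
shared = {
    "conversation_id": "demo-session",
    "query": "What's the weather in Beijing tomorrow?",
    "history": [],
    "queue": chat_queue,        # final replies to the user land here
    "flow_queue": flow_queue,   # step-by-step flow log entries land here
}

create_flow().run(shared)  # runs synchronously; nodes terminate each queue with None

while (thought := flow_queue.get()) is not None:
    print("flow:", thought)
while (reply := chat_queue.get()) is not None:
    print("reply:", reply)
```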
## How It Works

The system implements a PocketFlow workflow with a web interface:

```mermaid
flowchart TD
    DecideAction[Decide Action Node] --> |"check-weather"| CheckWeather[Check Weather Node]
    CheckWeather --> DecideAction
    DecideAction --> |"book-hotel"| BookHotel[Book Hotel Node]
    BookHotel --> DecideAction
    DecideAction --> |"follow-up"| FollowUp[Follow Up Node]
    DecideAction --> |"result-notification"| ResultNotification[Result Notification Node]
```

The workflow consists of the following nodes:

1. **Decide Action Node**: The central decision-making node that determines the next action based on user input and context (see the example decision right after this list)
2. **Check Weather Node**: Provides weather information for specified cities and dates
3. **Book Hotel Node**: Handles hotel reservation requests with check-in and check-out dates
4. **Follow Up Node**: Manages user interactions by asking clarifying questions or handling out-of-scope requests
5. **Result Notification Node**: Delivers action results and offers additional assistance
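The Decide Action node asks the LLM for a YAML reply and parses it with `yaml.safe_load` (see `nodes.py`). For a turn where information is still missing, the parsed decision is a plain dictionary along these lines; the field values here are hypothetical, only the keys follow the prompt's format:

```python
# Illustrative parsed decision returned by DecideAction.exec(); values are made up.
decision = {
    "thinking": "The user wants a hotel but has not given any dates.",
    "action": "follow-up",
    "reason": "Check-in and check-out dates are required before book-hotel can run.",
    "question": "Which dates would you like to check in and check out? 😊",
}
```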
The flow is orchestrated through a series of directed connections (mirrored one-to-one in [`flow.py`](./flow.py), as shown below):

- The Decide Action node can trigger weather checks, hotel bookings, follow-ups, or result notifications
- Weather checks and hotel bookings feed back into the Decide Action node for further processing
- The Follow Up and Result Notification nodes provide the final steps in the workflow
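These connections correspond directly to the wiring in [`flow.py`](./flow.py) included in this example:

```python
from pocketflow import Flow
from nodes import BookHotel, CheckWeather, DecideAction, FollowUp, ResultNotification

decide_action = DecideAction()
check_weather = CheckWeather()
book_hotel = BookHotel()
follow_up = FollowUp()
result_notification = ResultNotification()

# Named actions branch out of the decision node...
decide_action - "check-weather" >> check_weather
decide_action - "book-hotel" >> book_hotel
decide_action - "follow-up" >> follow_up
decide_action - "result-notification" >> result_notification
# ...and tool results loop back so the agent can decide the next step.
check_weather >> decide_action
book_hotel >> decide_action

flow = Flow(start=decide_action)
```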
### Flow Visualization

The application provides real-time visualization of the workflow execution (sample log entries follow this list):

- The sequence of node activations is displayed chronologically
- Users can see which decision paths are being taken
- The visualization helps in understanding the AI's decision-making process
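During a weather request, for example, the flow log shown in the chat window accumulates entries like the following (the emoji prefixes and formats come from `nodes.py`; the city, reasoning, and result text are illustrative):

- 🤔 The user asked for tomorrow's weather in Beijing
- ➡️ Agent decided to check weather for: Beijing
- ⬅️ Check weather result: The weather in Beijing on 2025-05-28 is sunny, and the temperature is 23°C.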
![Flow Log](./assets/flow_log.png)

## Sample Output

Here's an example of booking a hotel:

![Book Hotel](./assets/book_hotel.png)

Here's an example of changing intention mid-conversation:

![Change Intention](./assets/change_intention.png)

## Files

- [`main.py`](./main.py): Entry point for the application and Gradio interface setup
- [`flow.py`](./flow.py): Defines the PocketFlow graph and node connections
- [`nodes.py`](./nodes.py): Contains the node definitions for the workflow
- [`utils/`](./utils/): Contains utility functions and helper modules
- [`requirements.txt`](./requirements.txt): Lists project dependencies

## Requirements

- Python 3.10+ (the utility code uses `X | None` union annotations)
- PocketFlow >= 0.0.2
- Gradio >= 5.29.1
- OpenAI >= 1.78.1
Three binary image assets (the screenshots referenced above) are added but not shown: 131 KiB, 136 KiB, and 105 KiB.
flow.py
@@ -0,0 +1,29 @@
from pocketflow import Flow

from nodes import (
    DecideAction,
    CheckWeather,
    BookHotel,
    FollowUp,
    ResultNotification,
)


def create_flow():
    """
    Create and connect the nodes to form a complete agent flow.
    """
    decide_action = DecideAction()
    check_weather = CheckWeather()
    book_hotel = BookHotel()
    follow_up = FollowUp()
    result_notification = ResultNotification()

    decide_action - "check-weather" >> check_weather
    check_weather >> decide_action
    decide_action - "book-hotel" >> book_hotel
    book_hotel >> decide_action
    decide_action - "follow-up" >> follow_up
    decide_action - "result-notification" >> result_notification

    return Flow(start=decide_action)
main.py
@@ -0,0 +1,104 @@
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

import gradio as gr
from gradio import ChatMessage

from flow import create_flow

# create global thread pool
chatflow_thread_pool = ThreadPoolExecutor(
    max_workers=5,
    thread_name_prefix="chatflow_worker",
)


def chat_fn(message, history, uuid):
    """
    Main chat function that handles the conversation flow and message processing.

    Args:
        message (str): The current user message
        history (list): Previous conversation history
        uuid (UUID): Unique identifier for the conversation

    Yields:
        ChatMessage: Streams of thought process and chat responses
    """
    # Log conversation details
    print(f"Conversation ID: {str(uuid)}\nHistory: {history}\nQuery: {message}\n---")

    # Initialize queues for chat messages and flow thoughts
    chat_queue = Queue()
    flow_queue = Queue()

    # Create shared context for the flow
    shared = {
        "conversation_id": str(uuid),
        "query": message,
        "history": history,
        "queue": chat_queue,
        "flow_queue": flow_queue,
    }

    # Create and run the chat flow in a separate thread
    chat_flow = create_flow()
    chatflow_thread_pool.submit(chat_flow.run, shared)

    # Initialize thought response tracking
    start_time = time.time()
    thought_response = ChatMessage(
        content="", metadata={"title": "Flow Log", "id": 0, "status": "pending"}
    )
    yield thought_response

    # Process and accumulate thoughts from the flow queue
    accumulated_thoughts = ""
    while True:
        thought = flow_queue.get()
        if thought is None:
            break
        accumulated_thoughts += f"- {thought}\n\n"
        thought_response.content = accumulated_thoughts.strip()
        yield thought_response
        flow_queue.task_done()

    # Mark thought processing as complete and record duration
    thought_response.metadata["status"] = "done"
    thought_response.metadata["duration"] = time.time() - start_time
    yield thought_response

    # Process and yield chat messages from the chat queue
    while True:
        msg = chat_queue.get()
        if msg is None:
            break
        chat_response = [thought_response, ChatMessage(content=msg)]
        yield chat_response
        chat_queue.task_done()


def clear_fn():
    print("Clearing conversation")
    return uuid.uuid4()


with gr.Blocks(fill_height=True, theme="ocean") as demo:
    uuid_state = gr.State(uuid.uuid4())
    demo.load(clear_fn, outputs=[uuid_state])

    chatbot = gr.Chatbot(type="messages", scale=1)
    chatbot.clear(clear_fn, outputs=[uuid_state])

    gr.ChatInterface(
        fn=chat_fn,
        type="messages",
        additional_inputs=[uuid_state],
        chatbot=chatbot,
        title="PocketFlow Gradio Demo",
    )

demo.launch()
nodes.py
@@ -0,0 +1,241 @@
from datetime import datetime
from queue import Queue

import yaml
from pocketflow import Node

from utils.call_llm import call_llm
from utils.call_mock_api import call_book_hotel_api, call_check_weather_api
from utils.conversation import load_conversation, save_conversation
from utils.format_chat_history import format_chat_history


class DecideAction(Node):
    def prep(self, shared):
        conversation_id = shared["conversation_id"]
        session = load_conversation(conversation_id)
        return session, shared["history"], shared["query"]

    def exec(self, prep_res):
        session, history, query = prep_res
        prompt = f"""
### INSTRUCTIONS
You are a lifestyle assistant capable of helping users book hotels and check weather conditions.
You need to decide the next action based on your last action, action execution result, chat history, and current user question.

### CHAT HISTORY
{format_chat_history(history)}

### CURRENT USER QUESTION
user: {query}

### CONTEXT
Last Action: {session.get("last_action", None)}
Last Action Result: {session.get("action_result", None)}
Current Date: {datetime.now().date()}

### ACTION SPACE
[1] check-weather
Description: When the user asks about the weather, use this tool.
Parameters:
- name: city
  description: The city to check the weather
  required: true
  example: Beijing
- name: date
  description: The date to check the weather; if not provided, use the current date
  required: false
  example: 2025-05-28

[2] book-hotel
Description: When the user wants to book a hotel, use this tool.
Parameters:
- name: hotel
  description: The name of the hotel to be booked
  required: true
  example: Shanghai Hilton Hotel
- name: checkin_date
  description: The check-in date
  required: true
  example: 2025-05-28
- name: checkout_date
  description: The check-out date
  required: true
  example: 2025-05-29

[3] follow-up
Description: 1. When the user's question is outside the scope of booking hotels and checking the weather, use this tool to guide the user; 2. When the available information does not satisfy the parameter requirements of the corresponding tool, use this tool to ask the user.
Parameters:
- name: question
  description: Your guidance or follow-up question to the user; maintain an enthusiastic and lively language style, and use the same language as the user's question.
  required: true
  example: Which hotel would you like to book?😊

[4] result-notification
Description: When booking a hotel or checking the weather is completed, use this tool to notify the user of the result and ask if they need any other help. If you find that the user's request was not completed in the conversation history, you can guide the user to complete that intention in the last step.
Parameters:
- name: result
  description: Notify the user of the result based on the Last Action Result. Maintain an enthusiastic and lively language style, and use the same language as the user's question.
  required: true
  example: The hotel has been successfully booked for you. 😉\n\nThe check-in date is XX, and the check-out date is XX. Thank you for using it. Would you like any other help?😀

## NEXT ACTION
Decide the next action based on the context and available actions.
Return your response in this format:

```yaml
thinking: |
    <your step-by-step reasoning process>
action: check-weather OR book-hotel OR follow-up OR result-notification
reason: <why you chose this action>
question: <if action is follow-up>
city: <if action is check-weather>
date: <if action is check-weather and the user gave a date>
hotel: <if action is book-hotel>
checkin_date: <if action is book-hotel>
checkout_date: <if action is book-hotel>
result: <if action is result-notification>
```

IMPORTANT: Make sure to:
1. Use proper indentation (4 spaces) for all multi-line fields
2. Use the | character for multi-line text fields
3. Keep single-line fields without the | character
"""

        response = call_llm(prompt.strip())
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        print(f"🤖 Agent response: \n{yaml_str}")
        # Note: yaml.safe_load parses unquoted ISO dates (e.g. 2025-05-28) into
        # datetime.date objects, which the mock APIs expect.
        decision = yaml.safe_load(yaml_str)
        return decision

    def post(self, shared, prep_res, exec_res):
        """Save the decision and determine the next step in the flow."""
        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        # Remember the chosen action so the next turn's prompt can reference it
        session["last_action"] = exec_res["action"]
        flow_log: Queue = shared["flow_queue"]

        for line in exec_res["thinking"].split("\n"):
            line = line.lstrip(" -").strip()  # drop leading list markers only
            if line:
                flow_log.put(f"🤔 {line}")

        if exec_res["action"] == "check-weather":
            session["check_weather_params"] = {
                "city": exec_res["city"],
                "date": exec_res.get("date", None),
            }
            flow_log.put(f"➡️ Agent decided to check weather for: {exec_res['city']}")
        elif exec_res["action"] == "book-hotel":
            session["book_hotel_params"] = {
                "hotel": exec_res["hotel"],
                "checkin_date": exec_res["checkin_date"],
                "checkout_date": exec_res["checkout_date"],
            }
            flow_log.put(f"➡️ Agent decided to book hotel: {exec_res['hotel']}")
        elif exec_res["action"] == "follow-up":
            session["follow_up_params"] = {"question": exec_res["question"]}
            flow_log.put(f"➡️ Agent decided to follow up: {exec_res['question']}")
        elif exec_res["action"] == "result-notification":
            session["result_notification_params"] = {"result": exec_res["result"]}
            flow_log.put(f"➡️ Agent decided to notify the result: {exec_res['result']}")
        save_conversation(conversation_id, session)
        # Return the action to determine the next node in the flow
        return exec_res["action"]


class CheckWeather(Node):
    def prep(self, shared):
        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        city = session["check_weather_params"]["city"]
        date = session["check_weather_params"].get("date", None)
        return city, date

    def exec(self, prep_res):
        city, date = prep_res
        return call_check_weather_api(city, date)

    def post(self, shared, prep_res, exec_res):
        flow_log: Queue = shared["flow_queue"]
        flow_log.put(f"⬅️ Check weather result: {exec_res}")

        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        session["action_result"] = exec_res
        save_conversation(conversation_id, session)
        return "default"


class BookHotel(Node):
    def prep(self, shared):
        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)

        hotel = session["book_hotel_params"]["hotel"]
        checkin_date = session["book_hotel_params"]["checkin_date"]
        checkout_date = session["book_hotel_params"]["checkout_date"]
        return hotel, checkin_date, checkout_date

    def exec(self, prep_res):
        hotel, checkin_date, checkout_date = prep_res
        return call_book_hotel_api(hotel, checkin_date, checkout_date)

    def post(self, shared, prep_res, exec_res):
        flow_log: Queue = shared["flow_queue"]
        flow_log.put(f"⬅️ Book hotel result: {exec_res}")

        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        session["action_result"] = exec_res
        save_conversation(conversation_id, session)
        return "default"


class FollowUp(Node):
    def prep(self, shared):
        flow_log: Queue = shared["flow_queue"]
        flow_log.put(None)

        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        question = session["follow_up_params"]["question"]
        return question, shared["queue"]

    def exec(self, prep_res):
        question, queue = prep_res
        queue.put(question)
        queue.put(None)
        return question

    def post(self, shared, prep_res, exec_res):
        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        session["action_result"] = exec_res
        # Persist the question so the next turn's prompt sees it as the last result
        save_conversation(conversation_id, session)
        return "done"


class ResultNotification(Node):
    def prep(self, shared):
        flow_log: Queue = shared["flow_queue"]
        flow_log.put(None)

        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        result = session["result_notification_params"]["result"]
        return result, shared["queue"]

    def exec(self, prep_res):
        result, queue = prep_res
        queue.put(result)
        queue.put(None)
        return result

    def post(self, shared, prep_res, exec_res):
        conversation_id = shared["conversation_id"]
        session: dict = load_conversation(conversation_id)
        session["action_result"] = None
        session["last_action"] = None
        save_conversation(conversation_id, session)
        return "done"
requirements.txt
@@ -0,0 +1,3 @@
pocketflow>=0.0.2
gradio>=5.29.1
openai>=1.78.1
utils/call_llm.py
@@ -0,0 +1,21 @@
import os

from openai import OpenAI
from openai.types.chat.chat_completion import ChatCompletion

api_key = os.getenv("OPENAI_API_KEY")
base_url = "https://api.openai.com/v1"
model = "gpt-4o"


def call_llm(message: str):
    print(f"Calling LLM with message: \n{message}")
    client = OpenAI(api_key=api_key, base_url=base_url)
    response: ChatCompletion = client.chat.completions.create(
        model=model, messages=[{"role": "user", "content": message}]
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    print(call_llm("Hello, how are you?"))
utils/call_mock_api.py
@@ -0,0 +1,39 @@
import random
from datetime import date, datetime


def call_check_weather_api(city: str, date: date | None):
    if date is None:
        date = datetime.now().date()

    current_date = datetime.now().date()

    # calculate date difference
    date_diff = (date - current_date).days

    # check if the date is within the allowed range
    if abs(date_diff) > 7:
        return f"Failed to check weather: Date {date} is more than 7 days away from current date."

    return f"The weather in {city} on {date} is {random.choice(['sunny', 'cloudy', 'rainy', 'snowy'])}, and the temperature is {random.randint(10, 30)}°C."


def call_book_hotel_api(hotel: str, checkin_date: date, checkout_date: date):
    current_date = datetime.now().date()

    # check if the check-in date is after the current date
    if checkin_date <= current_date:
        return (
            f"Failed to book hotel {hotel}: Check-in date must be after current date."
        )

    # check if the check-in date is before the check-out date
    if checkin_date >= checkout_date:
        return f"Failed to book hotel {hotel}: Check-in date must be before the check-out date."

    # check if the stay is longer than 7 days
    date_diff = (checkout_date - checkin_date).days
    if date_diff > 7:
        return f"Failed to book hotel {hotel}: Stay duration cannot exceed 7 days."

    return f"Booked hotel {hotel} from {checkin_date.strftime('%Y-%m-%d')} to {checkout_date.strftime('%Y-%m-%d')} successfully."
utils/conversation.py
@@ -0,0 +1,11 @@
# Simple in-memory conversation store keyed by conversation id
# (state is lost when the process restarts).
conversation_cache = {}


def load_conversation(conversation_id: str):
    print(f"Loading conversation {conversation_id}")
    return conversation_cache.get(conversation_id, {})


def save_conversation(conversation_id: str, session: dict):
    print(f"Saving conversation {session}")
    conversation_cache[conversation_id] = session
utils/format_chat_history.py
@@ -0,0 +1,28 @@
def format_chat_history(history):
    """
    Format the chat history for the LLM.

    Args:
        history (list): The chat history list; each element contains role and content

    Returns:
        str: The formatted chat history string
    """
    if not history:
        return "No history"

    formatted_history = []
    for message in history:
        role = "user" if message["role"] == "user" else "assistant"
        content = message["content"]
        # filter out the thinking content (flow-log messages start with these markers)
        if role == "assistant":
            if (
                content.startswith("- 🤔")
                or content.startswith("- ➡️")
                or content.startswith("- ⬅️")
            ):
                continue
        formatted_history.append(f"{role}: {content}")

    return "\n".join(formatted_history)