streamlit example
This commit is contained in:
parent
cc63514f6d
commit
d5e58f0a45
|
|
@ -0,0 +1,34 @@
|
||||||
|
# PocketFlow Streamlit Human-in-the-Loop (HITL) Application
|
||||||
|
|
||||||
|
Minimal Human-in-the-Loop (HITL) web application using PocketFlow and Streamlit. Submit text, review processed output, and approve/reject.
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
- **Streamlit UI:** Simple, interactive interface for submitting tasks and providing feedback, built entirely in Python.
|
||||||
|
- **PocketFlow Workflow:** Manages distinct processing stages (initial processing, finalization) using synchronous PocketFlow `Flow`s.
|
||||||
|
- **Session State Management:** Utilizes Streamlit's `st.session_state` to manage the current stage of the workflow and to act as the `shared` data store for PocketFlow.
|
||||||
|
- **Iterative Feedback Loop:** Allows users to reject processed output and resubmit, facilitating refinement.
|
||||||
|
|
||||||
|
## How to Run
|
||||||
|
|
||||||
|
1. **Install Dependencies:**
|
||||||
|
```bash
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **Run the Streamlit Application:**
|
||||||
|
```bash
|
||||||
|
streamlit run app.py
|
||||||
|
```
|
||||||
|
|
||||||
|
3. **Access the Web UI:**
|
||||||
|
Open the URL provided by Streamlit (usually `http://localhost:8501`).
|
||||||
|
|
||||||
|
## Files
|
||||||
|
|
||||||
|
- [`app.py`](./app.py): Main Streamlit application logic and UI.
|
||||||
|
- [`nodes.py`](./nodes.py): PocketFlow `Node` definitions.
|
||||||
|
- [`flows.py`](./flows.py): PocketFlow `Flow` construction.
|
||||||
|
- [`utils/process_task.py`](./utils/process_task.py): Simulated task processing utility.
|
||||||
|
- [`requirements.txt`](./requirements.txt): Project dependencies.
|
||||||
|
- [`README.md`](./README.md): This file.
|
||||||
|
|
@ -0,0 +1,156 @@
|
||||||
|
import streamlit as st
|
||||||
|
from flow import create_initial_processing_flow, create_finalization_flow
|
||||||
|
|
||||||
|
st.title("PocketFlow HITL with Streamlit")
|
||||||
|
|
||||||
|
# Initialize session state variables if they don't exist
|
||||||
|
if 'stage' not in st.session_state:
|
||||||
|
st.session_state.stage = "initial"
|
||||||
|
st.session_state.error_message = None
|
||||||
|
st.session_state.task_input = ""
|
||||||
|
# Flow-related data will be added directly as needed
|
||||||
|
print("Initialized session state.")
|
||||||
|
|
||||||
|
# --- Helper Function to Reset State ---
|
||||||
|
def reset_state():
|
||||||
|
# Keep essential Streamlit state keys if necessary, or clear selectively
|
||||||
|
keys_to_clear = [k for k in st.session_state.keys() if k not in ['stage', 'error_message', 'task_input']]
|
||||||
|
for key in keys_to_clear:
|
||||||
|
del st.session_state[key]
|
||||||
|
|
||||||
|
st.session_state.stage = "initial"
|
||||||
|
st.session_state.error_message = None
|
||||||
|
st.session_state.task_input = ""
|
||||||
|
print("Reset session state (keeping core stage/error keys).")
|
||||||
|
|
||||||
|
# --- Display Area for Shared Data (now the entire session state) ---
|
||||||
|
with st.expander("Show Session State (Shared Data)"):
|
||||||
|
# Convert to dict for clean JSON display, excluding internal Streamlit keys if desired
|
||||||
|
display_state = {k: v for k, v in st.session_state.items() if not k.startswith("_")}
|
||||||
|
st.json(display_state)
|
||||||
|
|
||||||
|
# --- Stage: Initial Input ---
|
||||||
|
if st.session_state.stage == "initial":
|
||||||
|
st.header("1. Submit Data for Processing")
|
||||||
|
# Use st.session_state.task_input directly for the text area's value
|
||||||
|
task_input_value = st.text_area("Enter data to process:", value=st.session_state.task_input, height=150)
|
||||||
|
|
||||||
|
if st.button("Submit"):
|
||||||
|
if not task_input_value.strip():
|
||||||
|
st.error("Please enter some data to process.")
|
||||||
|
else:
|
||||||
|
print(f"Submit button clicked. Input: '{task_input_value[:50]}...'")
|
||||||
|
# Store input directly in session state
|
||||||
|
st.session_state.task_input = task_input_value
|
||||||
|
st.session_state.error_message = None
|
||||||
|
# Clear previous results if any
|
||||||
|
if "processed_output" in st.session_state: del st.session_state.processed_output
|
||||||
|
if "final_result" in st.session_state: del st.session_state.final_result
|
||||||
|
if "input_used_by_process" in st.session_state: del st.session_state.input_used_by_process
|
||||||
|
|
||||||
|
try:
|
||||||
|
with st.spinner("Processing initial task..."):
|
||||||
|
initial_flow = create_initial_processing_flow()
|
||||||
|
# Pass the entire session state as shared data
|
||||||
|
initial_flow.run(st.session_state)
|
||||||
|
|
||||||
|
# Check if processing was successful (output exists directly in session state)
|
||||||
|
if "processed_output" in st.session_state:
|
||||||
|
st.session_state.stage = "awaiting_review"
|
||||||
|
print("Initial processing complete. Moving to 'awaiting_review' stage.")
|
||||||
|
st.rerun()
|
||||||
|
else:
|
||||||
|
st.session_state.error_message = "Processing failed to produce an output."
|
||||||
|
print("Error: Processing failed, no output found.")
|
||||||
|
# Keep stage as initial to allow retry/correction
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
st.session_state.error_message = f"An error occurred during initial processing: {e}"
|
||||||
|
print(f"Exception during initial processing: {e}")
|
||||||
|
# Keep stage as initial
|
||||||
|
|
||||||
|
# --- Stage: Awaiting Review ---
|
||||||
|
elif st.session_state.stage == "awaiting_review":
|
||||||
|
st.header("2. Review Processed Output")
|
||||||
|
# Get processed output directly from session state
|
||||||
|
processed_output = st.session_state.get("processed_output", "Error: Processed output not found!")
|
||||||
|
|
||||||
|
st.subheader("Output to Review:")
|
||||||
|
st.markdown(f"```\n{str(processed_output)}\n```") # Display as markdown code block
|
||||||
|
|
||||||
|
col1, col2, _ = st.columns([1, 1, 5]) # Layout buttons
|
||||||
|
with col1:
|
||||||
|
if st.button("Approve"):
|
||||||
|
print("Approve button clicked.")
|
||||||
|
st.session_state.error_message = None
|
||||||
|
try:
|
||||||
|
with st.spinner("Finalizing result..."):
|
||||||
|
finalization_flow = create_finalization_flow()
|
||||||
|
# Pass the entire session state
|
||||||
|
finalization_flow.run(st.session_state)
|
||||||
|
|
||||||
|
# Check for final result directly in session state
|
||||||
|
if "final_result" in st.session_state:
|
||||||
|
st.session_state.stage = "completed"
|
||||||
|
print("Approval processed. Moving to 'completed' stage.")
|
||||||
|
st.rerun()
|
||||||
|
else:
|
||||||
|
st.session_state.error_message = "Finalization failed to produce a result."
|
||||||
|
print("Error: Finalization failed, no final_result found.")
|
||||||
|
# Stay in review stage and show error.
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
st.session_state.error_message = f"An error occurred during finalization: {e}"
|
||||||
|
print(f"Exception during finalization: {e}")
|
||||||
|
# Stay in review stage
|
||||||
|
st.rerun() # Rerun to show error message
|
||||||
|
|
||||||
|
with col2:
|
||||||
|
if st.button("Reject"):
|
||||||
|
print("Reject button clicked.")
|
||||||
|
st.session_state.error_message = None # Clear previous errors
|
||||||
|
# Go back to initial stage to allow modification/resubmission
|
||||||
|
st.session_state.stage = "initial"
|
||||||
|
# Keep the rejected output visible in the input field for modification
|
||||||
|
st.session_state.task_input = st.session_state.get("processed_output", st.session_state.task_input)
|
||||||
|
# Clear the processed output so it doesn't linger
|
||||||
|
if "processed_output" in st.session_state: del st.session_state.processed_output
|
||||||
|
if "final_result" in st.session_state: del st.session_state.final_result
|
||||||
|
st.info("Task rejected. Modify the input below and resubmit.")
|
||||||
|
print("Task rejected. Moving back to 'initial' stage.")
|
||||||
|
st.rerun()
|
||||||
|
|
||||||
|
# --- Stage: Completed ---
|
||||||
|
elif st.session_state.stage == "completed":
|
||||||
|
st.header("3. Task Completed")
|
||||||
|
# Get final result directly from session state
|
||||||
|
final_result = st.session_state.get("final_result", "Error: Final result not found!")
|
||||||
|
st.subheader("Final Result:")
|
||||||
|
st.success("Task approved and completed successfully!")
|
||||||
|
st.text_area("", value=str(final_result), height=200, disabled=True)
|
||||||
|
|
||||||
|
if st.button("Start Over"):
|
||||||
|
print("Start Over button clicked.")
|
||||||
|
reset_state()
|
||||||
|
st.rerun()
|
||||||
|
|
||||||
|
# --- Stage: Rejected ---
|
||||||
|
elif st.session_state.stage == "rejected_final":
|
||||||
|
st.header("3. Task Rejected")
|
||||||
|
st.error("The processed output was rejected.")
|
||||||
|
# Get rejected output directly from session state
|
||||||
|
rejected_output = st.session_state.get("processed_output", "")
|
||||||
|
if rejected_output:
|
||||||
|
st.text_area("Rejected Output:", value=str(rejected_output), height=150, disabled=True)
|
||||||
|
|
||||||
|
if st.button("Start Over"):
|
||||||
|
print("Start Over button clicked.")
|
||||||
|
reset_state()
|
||||||
|
st.rerun()
|
||||||
|
|
||||||
|
# --- Display Error Messages ---
|
||||||
|
if st.session_state.error_message:
|
||||||
|
st.error(st.session_state.error_message)
|
||||||
|
|
||||||
|
# --- Add a button to reset state anytime (for debugging) ---
|
||||||
|
# st.sidebar.button("Reset State", on_click=reset_state) # Removed sidebar
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
from pocketflow import Flow
|
||||||
|
from nodes import InitialInputNode, ProcessDataNode, PrepareFinalResultNode
|
||||||
|
|
||||||
|
def create_initial_processing_flow():
|
||||||
|
"""Creates a flow for the initial data processing stage."""
|
||||||
|
initial_input_node = InitialInputNode()
|
||||||
|
process_data_node = ProcessDataNode()
|
||||||
|
|
||||||
|
# Define transitions: Input -> Process
|
||||||
|
initial_input_node >> process_data_node
|
||||||
|
|
||||||
|
# Create the Flow, starting with the input node
|
||||||
|
flow = Flow(start=initial_input_node)
|
||||||
|
print("Initial processing flow created.")
|
||||||
|
return flow
|
||||||
|
|
||||||
|
def create_finalization_flow():
|
||||||
|
"""Creates a flow to finalize the result after approval."""
|
||||||
|
prepare_final_result_node = PrepareFinalResultNode()
|
||||||
|
|
||||||
|
# This flow only has one node
|
||||||
|
flow = Flow(start=prepare_final_result_node)
|
||||||
|
print("Finalization flow created.")
|
||||||
|
return flow
|
||||||
|
|
@ -0,0 +1,57 @@
|
||||||
|
from pocketflow import Node
|
||||||
|
from utils.process_task import process_task
|
||||||
|
|
||||||
|
class InitialInputNode(Node):
|
||||||
|
"""Reads the initial task input from shared_data."""
|
||||||
|
def prep(self, shared):
|
||||||
|
print("InitialInputNode: Prep")
|
||||||
|
return shared.get("task_input", "Default Task Input")
|
||||||
|
|
||||||
|
def exec(self, prep_res):
|
||||||
|
print(f"InitialInputNode: Executing with input: '{prep_res[:50]}...'")
|
||||||
|
# No real computation needed here, just passing the input along
|
||||||
|
return prep_res
|
||||||
|
|
||||||
|
def post(self, shared, prep_res, exec_res):
|
||||||
|
# Ensure the input used is stored, although it might already be there
|
||||||
|
shared["input_used_by_process"] = exec_res
|
||||||
|
print(f"InitialInputNode: Post - Stored input '{exec_res[:50]}...' in shared_data.")
|
||||||
|
return "default" # Proceed to next node in the flow
|
||||||
|
|
||||||
|
class ProcessDataNode(Node):
|
||||||
|
"""Processes the data using the utility function."""
|
||||||
|
def prep(self, shared):
|
||||||
|
task_input = shared.get("input_used_by_process", "No input found")
|
||||||
|
print(f"ProcessDataNode: Prep - Input: '{task_input[:50]}...'")
|
||||||
|
return task_input
|
||||||
|
|
||||||
|
def exec(self, prep_res):
|
||||||
|
print("ProcessDataNode: Exec - Calling process_task utility")
|
||||||
|
# Call the actual processing logic
|
||||||
|
processed_output = process_task(prep_res)
|
||||||
|
return processed_output
|
||||||
|
|
||||||
|
def post(self, shared, prep_res, exec_res):
|
||||||
|
# Store the result for review
|
||||||
|
shared["processed_output"] = exec_res
|
||||||
|
print(f"ProcessDataNode: Post - Stored processed output: '{str(exec_res)[:50]}...'")
|
||||||
|
# This node ends the initial processing subflow
|
||||||
|
return None
|
||||||
|
|
||||||
|
class PrepareFinalResultNode(Node):
|
||||||
|
"""Takes the approved processed output and sets it as the final result."""
|
||||||
|
def prep(self, shared):
|
||||||
|
approved_output = shared.get("processed_output", "No processed output found")
|
||||||
|
print(f"PrepareFinalResultNode: Prep - Approved output: '{str(approved_output)[:50]}...'")
|
||||||
|
return approved_output
|
||||||
|
|
||||||
|
def exec(self, prep_res):
|
||||||
|
print("PrepareFinalResultNode: Exec - Finalizing result.")
|
||||||
|
# Could potentially do final formatting here if needed
|
||||||
|
return prep_res
|
||||||
|
|
||||||
|
def post(self, shared, prep_res, exec_res):
|
||||||
|
shared["final_result"] = exec_res
|
||||||
|
print(f"PrepareFinalResultNode: Post - Stored final result: '{str(exec_res)[:50]}...'")
|
||||||
|
# This node ends the finalization subflow
|
||||||
|
return None
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
streamlit
|
||||||
|
pocketflow
|
||||||
|
openai
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
import time
|
||||||
|
|
||||||
|
def process_task(task_input: str) -> str:
|
||||||
|
"""
|
||||||
|
Simulates processing the input, potentially calling an LLM.
|
||||||
|
Replace this with your actual task logic.
|
||||||
|
"""
|
||||||
|
print(f"Processing task: {task_input[:50]}...")
|
||||||
|
|
||||||
|
result = f"Rephrased text for the following input: {task_input}"
|
||||||
|
|
||||||
|
# Simulate some work
|
||||||
|
time.sleep(2)
|
||||||
|
return result
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
test_input = "This is a test input for the processing task."
|
||||||
|
print(f"Input: {test_input}")
|
||||||
|
output = process_task(test_input)
|
||||||
|
print(f"Output: {output}")
|
||||||
|
|
||||||
|
# We don't need a separate utils/call_llm.py for this minimal example,
|
||||||
|
# but you would add it here if ProcessNode used an LLM.
|
||||||
|
|
||||||
Loading…
Reference in New Issue