From d5e58f0a45eaafe55cc88c5abd9a3cb8fbdd6564 Mon Sep 17 00:00:00 2001 From: zachary62 Date: Tue, 13 May 2025 00:07:47 -0400 Subject: [PATCH] streamlit example --- cookbook/pocketflow-streamlit-hitl/README.md | 34 ++++ cookbook/pocketflow-streamlit-hitl/app.py | 156 ++++++++++++++++++ cookbook/pocketflow-streamlit-hitl/flow.py | 24 +++ cookbook/pocketflow-streamlit-hitl/nodes.py | 57 +++++++ .../requirements.txt | 3 + .../utils/__init__.py | 0 .../utils/process_task.py | 24 +++ 7 files changed, 298 insertions(+) create mode 100644 cookbook/pocketflow-streamlit-hitl/README.md create mode 100644 cookbook/pocketflow-streamlit-hitl/app.py create mode 100644 cookbook/pocketflow-streamlit-hitl/flow.py create mode 100644 cookbook/pocketflow-streamlit-hitl/nodes.py create mode 100644 cookbook/pocketflow-streamlit-hitl/requirements.txt create mode 100644 cookbook/pocketflow-streamlit-hitl/utils/__init__.py create mode 100644 cookbook/pocketflow-streamlit-hitl/utils/process_task.py diff --git a/cookbook/pocketflow-streamlit-hitl/README.md b/cookbook/pocketflow-streamlit-hitl/README.md new file mode 100644 index 0000000..53b4c7a --- /dev/null +++ b/cookbook/pocketflow-streamlit-hitl/README.md @@ -0,0 +1,34 @@ +# PocketFlow Streamlit Human-in-the-Loop (HITL) Application + +Minimal Human-in-the-Loop (HITL) web application using PocketFlow and Streamlit. Submit text, review processed output, and approve/reject. + +## Features + +- **Streamlit UI:** Simple, interactive interface for submitting tasks and providing feedback, built entirely in Python. +- **PocketFlow Workflow:** Manages distinct processing stages (initial processing, finalization) using synchronous PocketFlow `Flow`s. +- **Session State Management:** Utilizes Streamlit's `st.session_state` to manage the current stage of the workflow and to act as the `shared` data store for PocketFlow. +- **Iterative Feedback Loop:** Allows users to reject processed output and resubmit, facilitating refinement. + +## How to Run + +1. **Install Dependencies:** + ```bash + pip install -r requirements.txt + ``` + +2. **Run the Streamlit Application:** + ```bash + streamlit run app.py + ``` + +3. **Access the Web UI:** + Open the URL provided by Streamlit (usually `http://localhost:8501`). + +## Files + +- [`app.py`](./app.py): Main Streamlit application logic and UI. +- [`nodes.py`](./nodes.py): PocketFlow `Node` definitions. +- [`flows.py`](./flows.py): PocketFlow `Flow` construction. +- [`utils/process_task.py`](./utils/process_task.py): Simulated task processing utility. +- [`requirements.txt`](./requirements.txt): Project dependencies. +- [`README.md`](./README.md): This file. diff --git a/cookbook/pocketflow-streamlit-hitl/app.py b/cookbook/pocketflow-streamlit-hitl/app.py new file mode 100644 index 0000000..0e82097 --- /dev/null +++ b/cookbook/pocketflow-streamlit-hitl/app.py @@ -0,0 +1,156 @@ +import streamlit as st +from flow import create_initial_processing_flow, create_finalization_flow + +st.title("PocketFlow HITL with Streamlit") + +# Initialize session state variables if they don't exist +if 'stage' not in st.session_state: + st.session_state.stage = "initial" + st.session_state.error_message = None + st.session_state.task_input = "" + # Flow-related data will be added directly as needed + print("Initialized session state.") + +# --- Helper Function to Reset State --- +def reset_state(): + # Keep essential Streamlit state keys if necessary, or clear selectively + keys_to_clear = [k for k in st.session_state.keys() if k not in ['stage', 'error_message', 'task_input']] + for key in keys_to_clear: + del st.session_state[key] + + st.session_state.stage = "initial" + st.session_state.error_message = None + st.session_state.task_input = "" + print("Reset session state (keeping core stage/error keys).") + +# --- Display Area for Shared Data (now the entire session state) --- +with st.expander("Show Session State (Shared Data)"): + # Convert to dict for clean JSON display, excluding internal Streamlit keys if desired + display_state = {k: v for k, v in st.session_state.items() if not k.startswith("_")} + st.json(display_state) + +# --- Stage: Initial Input --- +if st.session_state.stage == "initial": + st.header("1. Submit Data for Processing") + # Use st.session_state.task_input directly for the text area's value + task_input_value = st.text_area("Enter data to process:", value=st.session_state.task_input, height=150) + + if st.button("Submit"): + if not task_input_value.strip(): + st.error("Please enter some data to process.") + else: + print(f"Submit button clicked. Input: '{task_input_value[:50]}...'") + # Store input directly in session state + st.session_state.task_input = task_input_value + st.session_state.error_message = None + # Clear previous results if any + if "processed_output" in st.session_state: del st.session_state.processed_output + if "final_result" in st.session_state: del st.session_state.final_result + if "input_used_by_process" in st.session_state: del st.session_state.input_used_by_process + + try: + with st.spinner("Processing initial task..."): + initial_flow = create_initial_processing_flow() + # Pass the entire session state as shared data + initial_flow.run(st.session_state) + + # Check if processing was successful (output exists directly in session state) + if "processed_output" in st.session_state: + st.session_state.stage = "awaiting_review" + print("Initial processing complete. Moving to 'awaiting_review' stage.") + st.rerun() + else: + st.session_state.error_message = "Processing failed to produce an output." + print("Error: Processing failed, no output found.") + # Keep stage as initial to allow retry/correction + + except Exception as e: + st.session_state.error_message = f"An error occurred during initial processing: {e}" + print(f"Exception during initial processing: {e}") + # Keep stage as initial + +# --- Stage: Awaiting Review --- +elif st.session_state.stage == "awaiting_review": + st.header("2. Review Processed Output") + # Get processed output directly from session state + processed_output = st.session_state.get("processed_output", "Error: Processed output not found!") + + st.subheader("Output to Review:") + st.markdown(f"```\n{str(processed_output)}\n```") # Display as markdown code block + + col1, col2, _ = st.columns([1, 1, 5]) # Layout buttons + with col1: + if st.button("Approve"): + print("Approve button clicked.") + st.session_state.error_message = None + try: + with st.spinner("Finalizing result..."): + finalization_flow = create_finalization_flow() + # Pass the entire session state + finalization_flow.run(st.session_state) + + # Check for final result directly in session state + if "final_result" in st.session_state: + st.session_state.stage = "completed" + print("Approval processed. Moving to 'completed' stage.") + st.rerun() + else: + st.session_state.error_message = "Finalization failed to produce a result." + print("Error: Finalization failed, no final_result found.") + # Stay in review stage and show error. + + except Exception as e: + st.session_state.error_message = f"An error occurred during finalization: {e}" + print(f"Exception during finalization: {e}") + # Stay in review stage + st.rerun() # Rerun to show error message + + with col2: + if st.button("Reject"): + print("Reject button clicked.") + st.session_state.error_message = None # Clear previous errors + # Go back to initial stage to allow modification/resubmission + st.session_state.stage = "initial" + # Keep the rejected output visible in the input field for modification + st.session_state.task_input = st.session_state.get("processed_output", st.session_state.task_input) + # Clear the processed output so it doesn't linger + if "processed_output" in st.session_state: del st.session_state.processed_output + if "final_result" in st.session_state: del st.session_state.final_result + st.info("Task rejected. Modify the input below and resubmit.") + print("Task rejected. Moving back to 'initial' stage.") + st.rerun() + +# --- Stage: Completed --- +elif st.session_state.stage == "completed": + st.header("3. Task Completed") + # Get final result directly from session state + final_result = st.session_state.get("final_result", "Error: Final result not found!") + st.subheader("Final Result:") + st.success("Task approved and completed successfully!") + st.text_area("", value=str(final_result), height=200, disabled=True) + + if st.button("Start Over"): + print("Start Over button clicked.") + reset_state() + st.rerun() + +# --- Stage: Rejected --- +elif st.session_state.stage == "rejected_final": + st.header("3. Task Rejected") + st.error("The processed output was rejected.") + # Get rejected output directly from session state + rejected_output = st.session_state.get("processed_output", "") + if rejected_output: + st.text_area("Rejected Output:", value=str(rejected_output), height=150, disabled=True) + + if st.button("Start Over"): + print("Start Over button clicked.") + reset_state() + st.rerun() + +# --- Display Error Messages --- +if st.session_state.error_message: + st.error(st.session_state.error_message) + +# --- Add a button to reset state anytime (for debugging) --- +# st.sidebar.button("Reset State", on_click=reset_state) # Removed sidebar diff --git a/cookbook/pocketflow-streamlit-hitl/flow.py b/cookbook/pocketflow-streamlit-hitl/flow.py new file mode 100644 index 0000000..d204c28 --- /dev/null +++ b/cookbook/pocketflow-streamlit-hitl/flow.py @@ -0,0 +1,24 @@ +from pocketflow import Flow +from nodes import InitialInputNode, ProcessDataNode, PrepareFinalResultNode + +def create_initial_processing_flow(): + """Creates a flow for the initial data processing stage.""" + initial_input_node = InitialInputNode() + process_data_node = ProcessDataNode() + + # Define transitions: Input -> Process + initial_input_node >> process_data_node + + # Create the Flow, starting with the input node + flow = Flow(start=initial_input_node) + print("Initial processing flow created.") + return flow + +def create_finalization_flow(): + """Creates a flow to finalize the result after approval.""" + prepare_final_result_node = PrepareFinalResultNode() + + # This flow only has one node + flow = Flow(start=prepare_final_result_node) + print("Finalization flow created.") + return flow diff --git a/cookbook/pocketflow-streamlit-hitl/nodes.py b/cookbook/pocketflow-streamlit-hitl/nodes.py new file mode 100644 index 0000000..5a265be --- /dev/null +++ b/cookbook/pocketflow-streamlit-hitl/nodes.py @@ -0,0 +1,57 @@ +from pocketflow import Node +from utils.process_task import process_task + +class InitialInputNode(Node): + """Reads the initial task input from shared_data.""" + def prep(self, shared): + print("InitialInputNode: Prep") + return shared.get("task_input", "Default Task Input") + + def exec(self, prep_res): + print(f"InitialInputNode: Executing with input: '{prep_res[:50]}...'") + # No real computation needed here, just passing the input along + return prep_res + + def post(self, shared, prep_res, exec_res): + # Ensure the input used is stored, although it might already be there + shared["input_used_by_process"] = exec_res + print(f"InitialInputNode: Post - Stored input '{exec_res[:50]}...' in shared_data.") + return "default" # Proceed to next node in the flow + +class ProcessDataNode(Node): + """Processes the data using the utility function.""" + def prep(self, shared): + task_input = shared.get("input_used_by_process", "No input found") + print(f"ProcessDataNode: Prep - Input: '{task_input[:50]}...'") + return task_input + + def exec(self, prep_res): + print("ProcessDataNode: Exec - Calling process_task utility") + # Call the actual processing logic + processed_output = process_task(prep_res) + return processed_output + + def post(self, shared, prep_res, exec_res): + # Store the result for review + shared["processed_output"] = exec_res + print(f"ProcessDataNode: Post - Stored processed output: '{str(exec_res)[:50]}...'") + # This node ends the initial processing subflow + return None + +class PrepareFinalResultNode(Node): + """Takes the approved processed output and sets it as the final result.""" + def prep(self, shared): + approved_output = shared.get("processed_output", "No processed output found") + print(f"PrepareFinalResultNode: Prep - Approved output: '{str(approved_output)[:50]}...'") + return approved_output + + def exec(self, prep_res): + print("PrepareFinalResultNode: Exec - Finalizing result.") + # Could potentially do final formatting here if needed + return prep_res + + def post(self, shared, prep_res, exec_res): + shared["final_result"] = exec_res + print(f"PrepareFinalResultNode: Post - Stored final result: '{str(exec_res)[:50]}...'") + # This node ends the finalization subflow + return None \ No newline at end of file diff --git a/cookbook/pocketflow-streamlit-hitl/requirements.txt b/cookbook/pocketflow-streamlit-hitl/requirements.txt new file mode 100644 index 0000000..d63f8e3 --- /dev/null +++ b/cookbook/pocketflow-streamlit-hitl/requirements.txt @@ -0,0 +1,3 @@ +streamlit +pocketflow +openai diff --git a/cookbook/pocketflow-streamlit-hitl/utils/__init__.py b/cookbook/pocketflow-streamlit-hitl/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cookbook/pocketflow-streamlit-hitl/utils/process_task.py b/cookbook/pocketflow-streamlit-hitl/utils/process_task.py new file mode 100644 index 0000000..5bb7a13 --- /dev/null +++ b/cookbook/pocketflow-streamlit-hitl/utils/process_task.py @@ -0,0 +1,24 @@ +import time + +def process_task(task_input: str) -> str: + """ + Simulates processing the input, potentially calling an LLM. + Replace this with your actual task logic. + """ + print(f"Processing task: {task_input[:50]}...") + + result = f"Rephrased text for the following input: {task_input}" + + # Simulate some work + time.sleep(2) + return result + +if __name__ == "__main__": + test_input = "This is a test input for the processing task." + print(f"Input: {test_input}") + output = process_task(test_input) + print(f"Output: {output}") + +# We don't need a separate utils/call_llm.py for this minimal example, +# but you would add it here if ProcessNode used an LLM. +