pocketflow/cookbook/pocketflow-web-hitl/nodes.py

78 lines
2.7 KiB
Python

from pocketflow import Node, AsyncNode
from utils.process_task import process_task
class ProcessNode(Node):
def prep(self, shared):
task_input = shared.get("task_input", "No input")
print("ProcessNode Prep")
return task_input
def exec(self, prep_res):
return process_task(prep_res)
def post(self, shared, prep_res, exec_res):
shared["processed_output"] = exec_res
print("ProcessNode Post: Output stored.")
return "default" # Go to ReviewNode
class ReviewNode(AsyncNode):
async def prep_async(self, shared):
review_event = shared.get("review_event")
queue = shared.get("sse_queue") # Expect queue in shared
processed_output = shared.get("processed_output", "N/A")
if not review_event or not queue:
print("ERROR: ReviewNode Prep - Missing review_event or sse_queue in shared store!")
return None # Signal failure
# Push status update to SSE queue
status_update = {
"status": "waiting_for_review",
"output_to_review": processed_output
}
await queue.put(status_update)
print("ReviewNode Prep: Put 'waiting_for_review' on SSE queue.")
return review_event # Return event for exec_async
async def exec_async(self, prep_res):
review_event = prep_res
if not review_event:
print("ReviewNode Exec: Skipping wait (no event from prep).")
return
print("ReviewNode Exec: Waiting on review_event...")
await review_event.wait()
print("ReviewNode Exec: review_event set.")
async def post_async(self, shared, prep_res, exec_res):
feedback = shared.get("feedback")
print(f"ReviewNode Post: Processing feedback '{feedback}'")
# Clear the event for potential loops
review_event = shared.get("review_event")
if review_event:
review_event.clear()
shared["feedback"] = None # Reset feedback
if feedback == "approved":
shared["final_result"] = shared.get("processed_output")
print("ReviewNode Post: Action=approved")
return "approved"
else:
print("ReviewNode Post: Action=rejected")
return "rejected"
class ResultNode(Node):
def prep(self, shared):
print("ResultNode Prep")
return shared.get("final_result", "No final result.")
def exec(self, prep_res):
print(f"--- FINAL RESULT ---")
print(prep_res)
print(f"--------------------")
return prep_res
def post(self, shared, prep_res, exec_res):
print("ResultNode Post: Flow finished.")
return None # End flow