From 91dc7bbc9930f792bbd9c8c51fda3d67dc33fc91 Mon Sep 17 00:00:00 2001 From: zachary62 Date: Sat, 12 Apr 2025 16:30:19 -0400 Subject: [PATCH] update a2a --- cookbook/pocketflow_a2a/a2a_client.py | 17 +++++++++++------ cookbook/pocketflow_a2a/a2a_server.py | 2 +- cookbook/pocketflow_a2a/requirements.txt | 1 + cookbook/pocketflow_a2a/task_manager.py | 11 ++++++----- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/cookbook/pocketflow_a2a/a2a_client.py b/cookbook/pocketflow_a2a/a2a_client.py index 04c295e..356027c 100644 --- a/cookbook/pocketflow_a2a/a2a_client.py +++ b/cookbook/pocketflow_a2a/a2a_client.py @@ -3,6 +3,8 @@ import asyncio import asyncclick as click # Using asyncclick for async main from uuid import uuid4 import json # For potentially inspecting raw errors +import anyio +import functools # Import from the common directory placed alongside this script from common.client import A2AClient @@ -51,17 +53,20 @@ async def cli(agent_url: str): while True: taskId = uuid4().hex # Generate a new task ID for each interaction try: - prompt = await click.prompt( + # Use functools.partial to prepare the prompt function call + prompt_func = functools.partial( + click.prompt, colorize(C_CYAN, "\nEnter your question (:q or quit to exit)"), prompt_suffix=" > ", - type=str # Ensure prompt returns string + type=str ) - except RuntimeError: - # This can happen if stdin is closed, e.g., in some test runners - print(colorize(C_RED, "Failed to read input. Exiting.")) + # Run the synchronous prompt function in a worker thread + prompt = await anyio.to_thread.run_sync(prompt_func) + except (EOFError, RuntimeError, KeyboardInterrupt): + # Catch potential errors during input or if stdin closes + print(colorize(C_RED, "\nInput closed or interrupted. Exiting.")) break - if prompt.lower() in [":q", "quit"]: print(colorize(C_YELLOW, "Exiting client.")) break diff --git a/cookbook/pocketflow_a2a/a2a_server.py b/cookbook/pocketflow_a2a/a2a_server.py index 9ef6127..d00569b 100644 --- a/cookbook/pocketflow_a2a/a2a_server.py +++ b/cookbook/pocketflow_a2a/a2a_server.py @@ -8,7 +8,7 @@ from common.server import A2AServer from common.types import AgentCard, AgentCapabilities, AgentSkill, MissingAPIKeyError # Import your custom TaskManager (which now imports from your original files) -from .task_manager import PocketFlowTaskManager +from task_manager import PocketFlowTaskManager logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) diff --git a/cookbook/pocketflow_a2a/requirements.txt b/cookbook/pocketflow_a2a/requirements.txt index 5134cff..9ddb196 100644 --- a/cookbook/pocketflow_a2a/requirements.txt +++ b/cookbook/pocketflow_a2a/requirements.txt @@ -17,5 +17,6 @@ click>=8.0.0,<9.0.0 # For A2A Client httpx>=0.27.0,<0.28.0 +httpx-sse>=0.4.0 asyncclick>=8.1.8 # Or just 'click' if you prefer asyncio.run pydantic>=2.0.0,<3.0.0 # For common.types \ No newline at end of file diff --git a/cookbook/pocketflow_a2a/task_manager.py b/cookbook/pocketflow_a2a/task_manager.py index 039f0f8..202ecdd 100644 --- a/cookbook/pocketflow_a2a/task_manager.py +++ b/cookbook/pocketflow_a2a/task_manager.py @@ -9,13 +9,13 @@ from common.types import ( JSONRPCResponse, SendTaskRequest, SendTaskResponse, SendTaskStreamingRequest, SendTaskStreamingResponse, Task, TaskSendParams, TaskState, TaskStatus, TextPart, Artifact, UnsupportedOperationError, - InternalError, InvalidParamsError + InternalError, InvalidParamsError, + Message ) import common.server.utils as server_utils # Import directly from your original PocketFlow files -from .flow import create_agent_flow # Assumes flow.py is in the same directory -from .utils import call_llm, search_web # Make utils functions available if needed elsewhere +from flow import create_agent_flow logger = logging.getLogger(__name__) @@ -62,9 +62,10 @@ class PocketFlowTaskManager(InMemoryTaskManager): # executor to avoid blocking the event loop. For simplicity here, we run it directly. # Consider adding a timeout if flows can hang. logger.info(f"Running PocketFlow for task {task_params.id}...") - final_state_dict = agent_flow.run(shared_data) + agent_flow.run(shared_data) # Run the flow, modifying shared_data in place logger.info(f"PocketFlow completed for task {task_params.id}") - answer_text = final_state_dict.get("answer", "Agent did not produce a final answer text.") + # Access the original shared_data dictionary, which was modified by the flow + answer_text = shared_data.get("answer", "Agent did not produce a final answer text.") # --- Package result into A2A Task --- final_task_status = TaskStatus(state=TaskState.COMPLETED)