pocketflow/cookbook/pocketflow-multi-agent/main.py

101 lines
3.3 KiB
Python

import asyncio
from pocketflow import AsyncNode, AsyncFlow
from utils import call_llm
class AsyncHinter(AsyncNode):
async def prep_async(self, shared):
# Wait for message from guesser (or empty string at start)
guess = await shared["hinter_queue"].get()
if guess == "GAME_OVER":
return None
return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", [])
async def exec_async(self, inputs):
if inputs is None:
return None
target, forbidden, past_guesses = inputs
prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}"
if past_guesses:
prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific."
prompt += "\nUse at most 5 words."
hint = call_llm(prompt)
print(f"\nHinter: Here's your hint - {hint}")
return hint
async def post_async(self, shared, prep_res, exec_res):
if exec_res is None:
return "end"
# Send hint to guesser
await shared["guesser_queue"].put(exec_res)
return "continue"
class AsyncGuesser(AsyncNode):
async def prep_async(self, shared):
# Wait for hint from hinter
hint = await shared["guesser_queue"].get()
return hint, shared.get("past_guesses", [])
async def exec_async(self, inputs):
hint, past_guesses = inputs
prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. Directly reply a single word:"
guess = call_llm(prompt)
print(f"Guesser: I guess it's - {guess}")
return guess
async def post_async(self, shared, prep_res, exec_res):
# Check if guess is correct
if exec_res.lower() == shared["target_word"].lower():
print("Game Over - Correct guess!")
await shared["hinter_queue"].put("GAME_OVER")
return "end"
# Store the guess in shared state
if "past_guesses" not in shared:
shared["past_guesses"] = []
shared["past_guesses"].append(exec_res)
# Send guess to hinter
await shared["hinter_queue"].put(exec_res)
return "continue"
async def main():
# Set up game
shared = {
"target_word": "nostalgic",
"forbidden_words": ["memory", "past", "remember", "feeling", "longing"],
"hinter_queue": asyncio.Queue(),
"guesser_queue": asyncio.Queue()
}
print("=========== Taboo Game Starting! ===========")
print(f"Target word: {shared['target_word']}")
print(f"Forbidden words: {shared['forbidden_words']}")
print("============================================")
# Initialize by sending empty guess to hinter
await shared["hinter_queue"].put("")
# Create nodes and flows
hinter = AsyncHinter()
guesser = AsyncGuesser()
# Set up flows
hinter_flow = AsyncFlow(start=hinter)
guesser_flow = AsyncFlow(start=guesser)
# Connect nodes to themselves for looping
hinter - "continue" >> hinter
guesser - "continue" >> guesser
# Run both agents concurrently
await asyncio.gather(
hinter_flow.run_async(shared),
guesser_flow.run_async(shared)
)
print("=========== Game Complete! ===========")
if __name__ == "__main__":
asyncio.run(main())