diff --git a/docs/communication.md b/docs/communication.md index ebba2e2..dfcf67f 100644 --- a/docs/communication.md +++ b/docs/communication.md @@ -15,8 +15,7 @@ Nodes and Flows **communicate** in two ways: If you know memory management, **Shared Store** is like a **heap** shared across function calls, while **Params** is like a **stack** assigned by parent function calls. - -> **Why not use other communication models like Message Passing?** *Message passing* works well for simple DAGs, but with *nested graphs* (Flows containing Flows, repeated or cyclic calls), routing messages becomes hard to maintain. A shared store keeps the design simple and easy. +> **Why not use other communication models like Message Passing?** *Message passing* works well for simple DAGs, but with *nested graphs* (Flows containing Flows, repeated or cyclic calls), routing messages becomes hard to maintain. A shared store keeps the design simple and easy. However, a high-level message-passing abstraction among agents can be achieved using queues in shared storage (more in [Multi-Agents]./multi_agent.md). {: .note } --- diff --git a/docs/index.md b/docs/index.md index a65f3bf..8845c90 100644 --- a/docs/index.md +++ b/docs/index.md @@ -56,7 +56,7 @@ We model the LLM workflow as a **Nested Directed Graph**: - [RAG](./rag.md) - [Chat Memory](./memory.md) - [Agent](./agent.md) -- Multi-Agent +- [(Advanced) Multi-Agents](./multi_agent.md) - Evaluation ## Example Projects diff --git a/docs/multi_agent.md b/docs/multi_agent.md new file mode 100644 index 0000000..22e53e8 --- /dev/null +++ b/docs/multi_agent.md @@ -0,0 +1,185 @@ +--- +layout: default +title: "(Advanced) Multi-Agents" +parent: "Paradigm" +nav_order: 7 +--- + +# (Advanced) Multi-Agents + +Multiple [Agents](./flow.md) can work together by handling subtasks and communicating the progress. +Communication between agents is typically implemented using message queues in shared storage. + + +### Example Agent Communication: Message Queue + +Here's a simple example showing how to implement agent communication using `asyncio.Queue`. +The agent listens for messages, processes them, and continues listening: + +```python +class AgentNode(AsyncNode): + async def prep_async(self, _): + message_queue = self.params["messages"] + message = await message_queue.get() + print(f"Agent received: {message}") + return message + +# Create node and flow +agent = AgentNode() +agent >> agent # connect to self +flow = AsyncFlow(start=agent) + +# Create heartbeat sender +async def send_system_messages(message_queue): + counter = 0 + messages = [ + "System status: all systems operational", + "Memory usage: normal", + "Network connectivity: stable", + "Processing load: optimal" + ] + + while True: + message = f"{messages[counter % len(messages)]} | timestamp_{counter}" + await message_queue.put(message) + counter += 1 + await asyncio.sleep(1) + +async def main(): + message_queue = asyncio.Queue() + shared = {} + flow.set_params({"messages": message_queue}) + + # Run both coroutines + await asyncio.gather( + flow.run_async(shared), + send_system_messages(message_queue) + ) + +asyncio.run(main()) +``` + +The output: + +``` +Agent received: System status: all systems operational | timestamp_0 +Agent received: Memory usage: normal | timestamp_1 +Agent received: Network connectivity: stable | timestamp_2 +Agent received: Processing load: optimal | timestamp_3 +``` + + +### Interactive Multi-Agent Example: Taboo Game + +Here's a more complex example where two agents play the word-guessing game Taboo. +One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word: + +```python +class AsyncHinter(AsyncNode): + async def prep_async(self, shared): + guess = await shared["hinter_queue"].get() + if guess == "GAME_OVER": + return None + return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", []) + + async def exec_async(self, inputs): + if inputs is None: + return None + target, forbidden, past_guesses = inputs + prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}" + if past_guesses: + prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific." + prompt += "\nUse at most 5 words." + + hint = call_llm(prompt) + print(f"\nHinter: Here's your hint - {hint}") + return hint + + async def post_async(self, shared, prep_res, exec_res): + if exec_res is None: + return "end" + await shared["guesser_queue"].put(exec_res) + return "continue" + +class AsyncGuesser(AsyncNode): + async def prep_async(self, shared): + hint = await shared["guesser_queue"].get() + return hint, shared.get("past_guesses", []) + + async def exec_async(self, inputs): + hint, past_guesses = inputs + prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. Directly reply a single word:" + guess = call_llm(prompt) + print(f"Guesser: I guess it's - {guess}") + return guess + + async def post_async(self, shared, prep_res, exec_res): + if exec_res.lower() == shared["target_word"].lower(): + print("Game Over - Correct guess!") + await shared["hinter_queue"].put("GAME_OVER") + return "end" + + if "past_guesses" not in shared: + shared["past_guesses"] = [] + shared["past_guesses"].append(exec_res) + + await shared["hinter_queue"].put(exec_res) + return "continue" + +async def main(): + # Set up game + shared = { + "target_word": "nostalgia", + "forbidden_words": ["memory", "past", "remember", "feeling", "longing"], + "hinter_queue": asyncio.Queue(), + "guesser_queue": asyncio.Queue() + } + + print("Game starting!") + print(f"Target word: {shared['target_word']}") + print(f"Forbidden words: {shared['forbidden_words']}") + + # Initialize by sending empty guess to hinter + await shared["hinter_queue"].put("") + + # Create nodes and flows + hinter = AsyncHinter() + guesser = AsyncGuesser() + + # Set up flows + hinter_flow = AsyncFlow(start=hinter) + guesser_flow = AsyncFlow(start=guesser) + + # Connect nodes to themselves + hinter - "continue" >> hinter + guesser - "continue" >> guesser + + # Run both agents concurrently + await asyncio.gather( + hinter_flow.run_async(shared), + guesser_flow.run_async(shared) + ) + +asyncio.run(main()) +``` + +The Output: + +``` +Game starting! +Target word: nostalgia +Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing'] + +Hinter: Here's your hint - Thinking of childhood summer days +Guesser: I guess it's - popsicle + +Hinter: Here's your hint - When childhood cartoons make you emotional +Guesser: I guess it's - nostalgic + +Hinter: Here's your hint - When old songs move you +Guesser: I guess it's - memories + +Hinter: Here's your hint - That warm emotion about childhood +Guesser: I guess it's - nostalgia +Game Over - Correct guess! +``` \ No newline at end of file