multi-agent example
This commit is contained in:
parent
028c242a1f
commit
3fc3b8a503
|
|
@ -15,8 +15,7 @@ Nodes and Flows **communicate** in two ways:
|
|||
If you know memory management, **Shared Store** is like a **heap** shared across function calls, while **Params** is like a **stack** assigned by parent function calls.
|
||||
|
||||
|
||||
|
||||
> **Why not use other communication models like Message Passing?** *Message passing* works well for simple DAGs, but with *nested graphs* (Flows containing Flows, repeated or cyclic calls), routing messages becomes hard to maintain. A shared store keeps the design simple and easy.
|
||||
> **Why not use other communication models like Message Passing?** *Message passing* works well for simple DAGs, but with *nested graphs* (Flows containing Flows, repeated or cyclic calls), routing messages becomes hard to maintain. A shared store keeps the design simple and easy. However, a high-level message-passing abstraction among agents can be achieved using queues in shared storage (more in [Multi-Agents]./multi_agent.md).
|
||||
{: .note }
|
||||
|
||||
---
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ We model the LLM workflow as a **Nested Directed Graph**:
|
|||
- [RAG](./rag.md)
|
||||
- [Chat Memory](./memory.md)
|
||||
- [Agent](./agent.md)
|
||||
- Multi-Agent
|
||||
- [(Advanced) Multi-Agents](./multi_agent.md)
|
||||
- Evaluation
|
||||
|
||||
## Example Projects
|
||||
|
|
|
|||
|
|
@ -0,0 +1,185 @@
|
|||
---
|
||||
layout: default
|
||||
title: "(Advanced) Multi-Agents"
|
||||
parent: "Paradigm"
|
||||
nav_order: 7
|
||||
---
|
||||
|
||||
# (Advanced) Multi-Agents
|
||||
|
||||
Multiple [Agents](./flow.md) can work together by handling subtasks and communicating the progress.
|
||||
Communication between agents is typically implemented using message queues in shared storage.
|
||||
|
||||
|
||||
### Example Agent Communication: Message Queue
|
||||
|
||||
Here's a simple example showing how to implement agent communication using `asyncio.Queue`.
|
||||
The agent listens for messages, processes them, and continues listening:
|
||||
|
||||
```python
|
||||
class AgentNode(AsyncNode):
|
||||
async def prep_async(self, _):
|
||||
message_queue = self.params["messages"]
|
||||
message = await message_queue.get()
|
||||
print(f"Agent received: {message}")
|
||||
return message
|
||||
|
||||
# Create node and flow
|
||||
agent = AgentNode()
|
||||
agent >> agent # connect to self
|
||||
flow = AsyncFlow(start=agent)
|
||||
|
||||
# Create heartbeat sender
|
||||
async def send_system_messages(message_queue):
|
||||
counter = 0
|
||||
messages = [
|
||||
"System status: all systems operational",
|
||||
"Memory usage: normal",
|
||||
"Network connectivity: stable",
|
||||
"Processing load: optimal"
|
||||
]
|
||||
|
||||
while True:
|
||||
message = f"{messages[counter % len(messages)]} | timestamp_{counter}"
|
||||
await message_queue.put(message)
|
||||
counter += 1
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def main():
|
||||
message_queue = asyncio.Queue()
|
||||
shared = {}
|
||||
flow.set_params({"messages": message_queue})
|
||||
|
||||
# Run both coroutines
|
||||
await asyncio.gather(
|
||||
flow.run_async(shared),
|
||||
send_system_messages(message_queue)
|
||||
)
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
The output:
|
||||
|
||||
```
|
||||
Agent received: System status: all systems operational | timestamp_0
|
||||
Agent received: Memory usage: normal | timestamp_1
|
||||
Agent received: Network connectivity: stable | timestamp_2
|
||||
Agent received: Processing load: optimal | timestamp_3
|
||||
```
|
||||
|
||||
|
||||
### Interactive Multi-Agent Example: Taboo Game
|
||||
|
||||
Here's a more complex example where two agents play the word-guessing game Taboo.
|
||||
One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word:
|
||||
|
||||
```python
|
||||
class AsyncHinter(AsyncNode):
|
||||
async def prep_async(self, shared):
|
||||
guess = await shared["hinter_queue"].get()
|
||||
if guess == "GAME_OVER":
|
||||
return None
|
||||
return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", [])
|
||||
|
||||
async def exec_async(self, inputs):
|
||||
if inputs is None:
|
||||
return None
|
||||
target, forbidden, past_guesses = inputs
|
||||
prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}"
|
||||
if past_guesses:
|
||||
prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific."
|
||||
prompt += "\nUse at most 5 words."
|
||||
|
||||
hint = call_llm(prompt)
|
||||
print(f"\nHinter: Here's your hint - {hint}")
|
||||
return hint
|
||||
|
||||
async def post_async(self, shared, prep_res, exec_res):
|
||||
if exec_res is None:
|
||||
return "end"
|
||||
await shared["guesser_queue"].put(exec_res)
|
||||
return "continue"
|
||||
|
||||
class AsyncGuesser(AsyncNode):
|
||||
async def prep_async(self, shared):
|
||||
hint = await shared["guesser_queue"].get()
|
||||
return hint, shared.get("past_guesses", [])
|
||||
|
||||
async def exec_async(self, inputs):
|
||||
hint, past_guesses = inputs
|
||||
prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. Directly reply a single word:"
|
||||
guess = call_llm(prompt)
|
||||
print(f"Guesser: I guess it's - {guess}")
|
||||
return guess
|
||||
|
||||
async def post_async(self, shared, prep_res, exec_res):
|
||||
if exec_res.lower() == shared["target_word"].lower():
|
||||
print("Game Over - Correct guess!")
|
||||
await shared["hinter_queue"].put("GAME_OVER")
|
||||
return "end"
|
||||
|
||||
if "past_guesses" not in shared:
|
||||
shared["past_guesses"] = []
|
||||
shared["past_guesses"].append(exec_res)
|
||||
|
||||
await shared["hinter_queue"].put(exec_res)
|
||||
return "continue"
|
||||
|
||||
async def main():
|
||||
# Set up game
|
||||
shared = {
|
||||
"target_word": "nostalgia",
|
||||
"forbidden_words": ["memory", "past", "remember", "feeling", "longing"],
|
||||
"hinter_queue": asyncio.Queue(),
|
||||
"guesser_queue": asyncio.Queue()
|
||||
}
|
||||
|
||||
print("Game starting!")
|
||||
print(f"Target word: {shared['target_word']}")
|
||||
print(f"Forbidden words: {shared['forbidden_words']}")
|
||||
|
||||
# Initialize by sending empty guess to hinter
|
||||
await shared["hinter_queue"].put("")
|
||||
|
||||
# Create nodes and flows
|
||||
hinter = AsyncHinter()
|
||||
guesser = AsyncGuesser()
|
||||
|
||||
# Set up flows
|
||||
hinter_flow = AsyncFlow(start=hinter)
|
||||
guesser_flow = AsyncFlow(start=guesser)
|
||||
|
||||
# Connect nodes to themselves
|
||||
hinter - "continue" >> hinter
|
||||
guesser - "continue" >> guesser
|
||||
|
||||
# Run both agents concurrently
|
||||
await asyncio.gather(
|
||||
hinter_flow.run_async(shared),
|
||||
guesser_flow.run_async(shared)
|
||||
)
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
The Output:
|
||||
|
||||
```
|
||||
Game starting!
|
||||
Target word: nostalgia
|
||||
Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing']
|
||||
|
||||
Hinter: Here's your hint - Thinking of childhood summer days
|
||||
Guesser: I guess it's - popsicle
|
||||
|
||||
Hinter: Here's your hint - When childhood cartoons make you emotional
|
||||
Guesser: I guess it's - nostalgic
|
||||
|
||||
Hinter: Here's your hint - When old songs move you
|
||||
Guesser: I guess it's - memories
|
||||
|
||||
Hinter: Here's your hint - That warm emotion about childhood
|
||||
Guesser: I guess it's - nostalgia
|
||||
Game Over - Correct guess!
|
||||
```
|
||||
Loading…
Reference in New Issue