Commit dbbdbba8c1 ("async"), parent 215ae8e089

README.md (24 lines changed)
Documentation: https://zachary62.github.io/miniLLMFlow/
## Why Mini LLM Flow?
Mini LLM Flow is designed to **be the framework used by LLM assistants**. In the future, LLM app development will be heavily **LLM-assisted**: users specify requirements, and LLM assistants design, build, and maintain the apps themselves. Current LLM assistants:
1. **👍 Shine at Low-level Implementation**

    LLMs excel at APIs, tools, chunking, prompting, etc. These don't belong in a general-purpose framework; they're too specialized to maintain and optimize.

2. **👎 Struggle with High-level Paradigms**

    Paradigms like MapReduce, task decomposition, and agents are powerful. However, designing these elegantly remains challenging for LLMs.

The ideal framework for LLM assistants should:

(1) Remove specialized low-level implementations.

(2) Keep high-level paradigms to program against.

Hence, I built this minimal (100-line) framework so LLMs can focus on what matters.

Mini LLM Flow is also a great learning resource, as many frameworks abstract too much away.
<div align="center">
  <img src="/assets/minillmflow.jpg" width="400"/>
docs/async.md (143 lines changed)
# Async
The **Async** pattern allows the `post()` step to be asynchronous (`post_async()`). This is especially helpful if you need to **await** something in `post_async()`—for example, user feedback or external async requests.

**Warning**: Only `post()` is async. `prep()` and `exec()` must stay sync (they are often where LLM calls happen).

---
## 1. AsyncNode
Below is a minimal **AsyncNode** that calls an LLM in `exec()` (sync) and then awaits user feedback in `post_async()`:
```python
class SummarizeThenVerify(AsyncNode):
    def exec(self, shared, prep_res):
        # Sync step: summarize the document with one LLM call
        doc = shared.get("doc", "")
        return call_llm(f"Summarize: {doc}")

    async def post_async(self, shared, prep_res, exec_res):
        # Async step: wait for the user's verdict before routing the flow
        user_decision = await gather_user_feedback(exec_res)
        if user_decision == "approve":
            shared["summary"] = exec_res
            return "approve"
        else:
            return "deny"
```
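The `gather_user_feedback` helper above is not defined anywhere on this page. As a rough sketch (plain `asyncio`, not part of Mini LLM Flow), it could collect console input without blocking the event loop:

```python
import asyncio

# Hypothetical helper: ask the user to approve or deny the proposed summary.
# asyncio.to_thread keeps the blocking input() call off the event loop.
async def gather_user_feedback(summary):
    print(f"Proposed summary:\n{summary}")
    answer = await asyncio.to_thread(input, "approve/deny? ")
    return answer.strip().lower()
```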
---
## 2. AsyncFlow
We can build an **AsyncFlow** around this node. If the user denies, we loop back for another attempt; if approved, we pass to a final node:
```python
import asyncio

summarize_node = SummarizeThenVerify()
final_node = Finalize()

# Chain conditions
summarize_node - "approve" >> final_node
summarize_node - "deny" >> summarize_node  # retry loop

flow = AsyncFlow(start=summarize_node)

async def main():
    shared = {"doc": "Mini LLM Flow is a lightweight LLM framework."}
    await flow.run_async(shared)
    print("Final stored summary:", shared.get("final_summary"))

asyncio.run(main())
```

- **SummarizeThenVerify**:
  - `exec()`: Summarizes text (sync LLM call).
  - `post_async()`: Waits for user approval.
- **Finalize**: Makes a final LLM call and prints the summary (a sketch follows below).
- If the user denies, the flow loops back to **SummarizeThenVerify**.
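The `Finalize` node is only referenced above; here is a minimal sketch of what it might look like. The prompt wording and the `final_summary` key are assumptions inferred from the `main()` function above:

```python
class Finalize(Node):
    def exec(self, shared, prep_res):
        # Hypothetical final LLM pass over the approved summary
        return call_llm(f"Polish this summary: {shared.get('summary', '')}")

    def post(self, shared, prep_res, exec_res):
        # Store under the key that main() prints, then finish
        shared["final_summary"] = exec_res
        print(exec_res)
        return "default"
```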
## 3. BatchAsyncFlow
To run the same sub-flow over a whole batch of inputs, you can use `BatchAsyncFlow`. Like `BatchFlow`, it generates a list of parameter sets in `prep()`, but each iteration runs the sub-flow asynchronously.
```python
class SummarizeAllFilesAsync(BatchAsyncFlow):
    async def prep(self, shared):
        # Return a list of param dicts (like in BatchFlow),
        # but you can do async logic here if needed.
        filenames = list(shared["data"].keys())
        return [{"filename": fn} for fn in filenames]

# Usage:
# Suppose async_summarize_flow is an AsyncFlow that processes a single file.
all_files_flow = SummarizeAllFilesAsync(start=async_summarize_flow)

# Then in your async context:
await all_files_flow.run(shared)
```
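For illustration, one possible shape for that single-file `async_summarize_flow`. The class name, prompt, and `call_llm_async` helper are assumptions; it follows the `params`/`shared` conventions used elsewhere on this page:

```python
class SummarizeOneFile(AsyncNode):
    async def exec(self, shared, prep_res):
        # Summarize the file selected by this iteration's params
        filename = self.params["filename"]
        return await call_llm_async(f"Summarize: {shared['data'].get(filename, '')}")

    def post(self, shared, prep_res, exec_res):
        shared["summary"][self.params["filename"]] = exec_res
        return "default"

async_summarize_flow = AsyncFlow(start=SummarizeOneFile())
```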
Under the hood:

1. `prep()` returns a list of param sets.
2. `BatchAsyncFlow` processes each **in sequence** by default, but each iteration is still an async run of the sub-flow.
3. If you want **true concurrency** (e.g., launch sub-flows in parallel), you can override methods or manage concurrency at a higher level (see the sketch below).
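As a rough illustration of "managing concurrency at a higher level" (plain `asyncio`, not a Mini LLM Flow API), you could fan out coroutines yourself and cap parallelism with a semaphore to respect rate limits:

```python
import asyncio

async def run_with_limit(coros, limit=5):
    # Run many coroutines concurrently, but never more than `limit` at a time.
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))
```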
## 4. Combining Async with Retries & Fault Tolerance
Just like normal Nodes, an `AsyncNode` can have `max_retries` and a `process_after_fail(...)` method:
```python
class RetryAsyncNode(AsyncNode):
    def __init__(self, max_retries=3):
        super().__init__(max_retries=max_retries)

    async def exec(self, shared, prep_res):
        # Potentially failing async call
        response = await async_api_call(...)
        return response

    def process_after_fail(self, shared, prep_res, exc):
        # Provide a fallback response
        return "Unable to complete async call due to error."
```
## 5. Best Practices
1. **Ensure Your LLM Client or I/O Library is Async-Aware**
   - For truly concurrent calls, your LLM or HTTP client must support async. Otherwise, you gain little from using `AsyncNode`.
2. **Manage Rate Limits**
   - Parallelizing many calls can hit LLM rate limits. Consider adding semaphores or concurrency checks.
3. **Use `asyncio.gather` If Needed**
   - If you want multiple async calls in the same node, you can await them concurrently with `asyncio.gather` (see the sketch after this list).
4. **Check Exceptions**
   - If one call fails, how does it affect your flow? Decide whether to retry or fall back.
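A minimal sketch of that pattern inside a single node, assuming an async LLM helper such as `call_llm_async`; the class name and prompts are made up:

```python
import asyncio

class MultiCallNode(AsyncNode):
    async def exec(self, shared, prep_res):
        # Fire two independent LLM calls concurrently and await both
        summary, keywords = await asyncio.gather(
            call_llm_async(f"Summarize: {prep_res}"),
            call_llm_async(f"List keywords for: {prep_res}"),
        )
        return {"summary": summary, "keywords": keywords}
```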
## 6. Summary
- **AsyncNode**: A Node that supports `async def prep()/exec()/post()`.
- **AsyncFlow**: Orchestrates normal + async nodes. Run via `await flow.run(shared)`.
- **BatchAsyncFlow**: Repeats an AsyncFlow for multiple parameter sets, each iteration run asynchronously.

By taking advantage of Python’s `asyncio` and the same minimal design, you can scale up your LLM or I/O tasks without blocking, making **Mini LLM Flow** suitable for high-throughput or high-latency scenarios.
---

A **Node** is the smallest building block of Mini LLM Flow. Each Node has three steps:

1. **`prep(shared)`**
   - Returns `prep_res`, which will be passed to both `exec()` and `post()`.

2. **`exec(shared, prep_res)`**
   - The main execution step where the LLM is called.
   - Has a built-in retry feature to handle errors and ensure reliable results.
   - Returns `exec_res`, which is passed to `post()`.

3. **`post(shared, prep_res, exec_res)`**
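As a rough illustration of the three steps (the class name and prompt are invented; `call_llm` is the helper used in the async examples above):

```python
class Summarize(Node):
    def prep(self, shared):
        # Gather the input text from the shared store
        return shared.get("doc", "")

    def exec(self, shared, prep_res):
        # Main step: one LLM call (retried automatically on failure)
        return call_llm(f"Summarize: {prep_res}")

    def post(self, shared, prep_res, exec_res):
        # Store the result and choose the next action
        shared["summary"] = exec_res
        return "default"
```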