From dbbdbba8c1f49f889d2b80ec3db210d342ad0cb7 Mon Sep 17 00:00:00 2001
From: zachary62
Date: Sat, 28 Dec 2024 23:02:36 +0000
Subject: [PATCH] async

---
 README.md     |  24 +++++----
 docs/async.md | 143 ++++++++++++--------------------------------------
 docs/node.md  |   3 +-
 3 files changed, 48 insertions(+), 122 deletions(-)

diff --git a/README.md b/README.md
index f1c7d31..deffd76 100644
--- a/README.md
+++ b/README.md
@@ -13,21 +13,23 @@ Documentation: https://zachary62.github.io/miniLLMFlow/
 
 ## Why Mini LLM Flow?
 
-In the future, **LLM apps will be developed by LLMs**: users specify requirements, and LLMs design, build, and maintain on their own. Current LLMs:
+Mini LLM Flow is designed to **be the framework used by LLM assistants**. In the future, LLM app development will be heavily **LLM-assisted**: users specify requirements, and LLM assistants design, build, and maintain the apps themselves. Current LLM assistants:
 
-1. **👍 Shine at Low-level Implementation**:
-With proper docs, LLMs can handle APIs, tools, chunking, prompt wrapping, etc.
-These are hard to maintain and optimize for a general-purpose framework.
-2. **👎 Struggle with High-level Paradigms**:
-Paradigms like MapReduce, task decomposition, and agents are powerful for development.
-However, designing these elegantly remains challenging for LLMs.
+1. **👍 Shine at Low-level Implementation**
+LLMs excel at APIs, tools, chunking, prompting, etc. These don't belong in a general-purpose framework; they're too specialized to maintain and optimize.
 
-To enable LLMs to develop LLM app, a framework should
-(1) remove specialized low-level implementations, and
-(2) keep high-level paradigms to program against.
-Hence, I built this framework that lets LLMs focus on what matters. It turns out 100 lines is all you need.
+2. **👎 Struggle with High-level Paradigms**
+Paradigms like MapReduce, task decomposition, and agents are powerful. However, designing these elegantly remains challenging for LLMs.
+
+The ideal framework for LLM assistants should:
+(1) remove specialized low-level implementations, and
+(2) keep high-level paradigms to program against.
+Hence, I built this minimal (100-line) framework so LLMs can focus on what matters.
+
+Mini LLM Flow is also a great learning resource, as many frameworks abstract too much away.
diff --git a/docs/async.md b/docs/async.md
index 5f2afcc..3f4efad 100644
--- a/docs/async.md
+++ b/docs/async.md
@@ -7,135 +7,58 @@ nav_order: 5
 
 # Async
 
-**Mini LLM Flow** supports **async/await** paradigms for concurrency or parallel workloads. This is particularly useful for:
-- Making **concurrent LLM calls** (e.g., if your LLM client library supports async).
-- Handling **network I/O** or **external APIs** in an event loop.
-- Minimizing **idle** time while waiting for responses, especially in batch operations.
+The **Async** pattern allows the `post()` step to be asynchronous (`post_async()`). This is especially helpful when you need to **await** something in `post_async()`, for example user feedback or an external async request.
+
+**Warning**: Only the post step is async (`post_async()`); `prep()` and `exec()` must stay sync (they are typically where the LLM is called).
+
+---
 
 ## 1. AsyncNode
 
-An **AsyncNode** is like a normal `Node`, except `exec()` (and optionally `prep()`) can be declared **async**. You can `await` inside these methods. For example:
+Below is a minimal **AsyncNode** that calls an LLM in `exec()` (sync) and then awaits user feedback in `post_async()`:
 
 ```python
-class AsyncSummarizeFile(AsyncNode):
-    async def prep(self, shared):
-        # Possibly do async file reads or small concurrency tasks
-        filename = self.params["filename"]
-        return shared["data"].get(filename, "")
+class SummarizeThenVerify(AsyncNode):
+    def exec(self, shared, prep_res):
+        doc = shared.get("doc", "")
+        return call_llm(f"Summarize: {doc}")
 
-    async def exec(self, shared, prep_res):
-        # Use an async LLM client or other I/O
-        if not prep_res:
-            raise ValueError("File content is empty (async).")
-
-        prompt = f"Summarize asynchronously: {prep_res}"
-        # Suppose call_llm_async is an async function
-        summary = await call_llm_async(prompt)
-        return summary
-
-    def post(self, shared, prep_res, exec_res):
-        # post can remain sync
-        filename = self.params["filename"]
-        shared["summary"][filename] = exec_res
-        return "default"
+    async def post_async(self, shared, prep_res, exec_res):
+        user_decision = await gather_user_feedback(exec_res)
+        if user_decision == "approve":
+            shared["summary"] = exec_res
+            return "approve"
+        else:
+            return "deny"
 ```
 
-- **`prep(shared)`** can be `async def` if you want to do asynchronous pre-processing.
-- **`exec(shared, prep_res)`** is typically the main place for async logic.
-- **`post`** can stay sync or be async; it's optional to mark it `async`.
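+
+The example above assumes two helpers you provide yourself: `call_llm` (your synchronous LLM wrapper, not shown here) and `gather_user_feedback` (an awaitable that returns the user's decision). A minimal sketch of the feedback helper, assuming a simple console prompt, might look like:
+
+```python
+import asyncio
+
+async def gather_user_feedback(summary):
+    # Hypothetical helper: show the summary and collect a decision
+    # without blocking the event loop.
+    print("Proposed summary:\n", summary)
+    answer = await asyncio.to_thread(input, "Approve this summary? (yes/no): ")
+    return "approve" if answer.strip().lower().startswith("y") else "deny"
+```
+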
+---
 
 ## 2. AsyncFlow
 
-An **AsyncFlow** is a Flow where nodes can be **AsyncNode**s or normal `Node`s. You run it in an event loop with `await async_flow.run(shared)`.
-
-### Minimal Example
+We can build an **AsyncFlow** around this node.
+If the user denies, we loop back for another attempt; if approved, we pass to a final node:
 
 ```python
-class MyAsyncFlow(AsyncFlow):
-    pass  # Usually, you just instantiate AsyncFlow with a start node
+import asyncio
+
+summarize_node = SummarizeThenVerify()
+final_node = Finalize()
 
-# Build your nodes
-load_data_node = LoadData()  # normal Node is OK, too
-async_summarize = AsyncSummarizeFile()
+# Chain conditions
+summarize_node - "approve" >> final_node
+summarize_node - "deny" >> summarize_node  # retry loop
 
-# Connect them
-load_data_node >> async_summarize
-
-my_flow = MyAsyncFlow(start=load_data_node)
-
-# Running the flow (in an async context):
-import asyncio
+flow = AsyncFlow(start=summarize_node)
 
 async def main():
-    shared = {"data": {}, "summary": {}}
-    await my_flow.run(shared)
+    shared = {"doc": "Mini LLM Flow is a lightweight LLM framework."}
+    await flow.run_async(shared)
+    print("Final stored summary:", shared.get("final_summary"))
 
 asyncio.run(main())
 ```
 
-- If the start node or any subsequent node is an `AsyncNode`, the Flow automatically calls its `prep()`, `exec()`, `post()` as async functions.
-- You can mix normal `Node`s and `AsyncNode`s in the same flow. **AsyncFlow** will handle the difference seamlessly.
+- **SummarizeThenVerify**:
+  - `exec()`: Summarizes text (sync LLM call).
+  - `post_async()`: Waits for user approval.
+- **Finalize**: Makes a final LLM call and prints the summary (a sketch of this node follows below).
+- If the user denies, the flow loops back to **SummarizeThenVerify**.
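+
+`Finalize` is not defined in this patch. A minimal sketch, assuming the same `Node` base class and `call_llm` helper used elsewhere in these docs, and assuming it stores its result under the `final_summary` key read back in `main()`, might look like:
+
+```python
+class Finalize(Node):
+    def exec(self, shared, prep_res):
+        # Hypothetical final step: refine the approved summary with one more LLM call
+        summary = shared.get("summary", "")
+        return call_llm(f"Rewrite this summary in one crisp sentence: {summary}")
+
+    def post(self, shared, prep_res, exec_res):
+        shared["final_summary"] = exec_res
+        print("Finalized summary:", exec_res)
+        return "default"
+```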
 
-## 3. BatchAsyncFlow
-
-If you want to run a batch of flows **concurrently**, you can use `BatchAsyncFlow`. Like `BatchFlow`, it generates a list of parameter sets in `prep()`, but each iteration runs the sub-flow asynchronously.
-
-```python
-class SummarizeAllFilesAsync(BatchAsyncFlow):
-    async def prep(self, shared):
-        # Return a list of param dicts (like in BatchFlow),
-        # but you can do async logic here if needed.
-        filenames = list(shared["data"].keys())
-        return [{"filename": fn} for fn in filenames]
-
-# Usage:
-# Suppose async_summarize_flow is an AsyncFlow that processes a single file.
-
-all_files_flow = SummarizeAllFilesAsync(start=async_summarize_flow)
-
-# Then in your async context:
-await all_files_flow.run(shared)
-```
-
-Under the hood:
-1. `prep()` returns a list of param sets.
-2. `BatchAsyncFlow` processes each **in sequence** by default, but each iteration is still an async run of the sub-flow.
-3. If you want **true concurrency** (e.g., launch sub-flows in parallel), you can override methods or manage concurrency at a higher level.
-
-## 4. Combining Async with Retries & Fault Tolerance
-
-Just like normal Nodes, an `AsyncNode` can have `max_retries` and a `process_after_fail(...)` method:
-
-```python
-class RetryAsyncNode(AsyncNode):
-    def __init__(self, max_retries=3):
-        super().__init__(max_retries=max_retries)
-
-    async def exec(self, shared, prep_res):
-        # Potentially failing async call
-        response = await async_api_call(...)
-        return response
-
-    def process_after_fail(self, shared, prep_res, exc):
-        # Provide fallback response
-        return "Unable to complete async call due to error."
-```
-
-## 5. Best Practices
-
-1. **Ensure Your LLM Client or I/O Library is Async-Aware**
-   - For truly concurrent calls, your LLM or HTTP client must support async. Otherwise, you gain little from using `AsyncNode`.
-2. **Manage Rate Limits**
-   - Parallelizing many calls can hit LLM rate limits. Consider adding semaphores or concurrency checks.
-3. **Use `asyncio.gather` If Needed**
-   - If you want multiple async calls in the same node, you can await them concurrently with `asyncio.gather`.
-4. **Check Exceptions**
-   - If one call fails, how does it affect your flow? Decide if you want to retry or fallback.
-
-## 6. Summary
-
-- **AsyncNode**: A Node that supports `async def prep()/exec()/post()`.
-- **AsyncFlow**: Orchestrates normal + async nodes. Run via `await flow.run(shared)`.
-- **BatchAsyncFlow**: Repeats an AsyncFlow for multiple parameter sets, each iteration in an async manner.
-
-By taking advantage of Python's `asyncio` and the same minimal design, you can scale up your LLM or I/O tasks without blocking, making **Mini LLM Flow** suitable for high-throughput or high-latency scenarios.
diff --git a/docs/node.md b/docs/node.md
index 6784ac1..1468a4e 100644
--- a/docs/node.md
+++ b/docs/node.md
@@ -15,7 +15,8 @@ A **Node** is the smallest building block of Mini LLM Flow. Each Node has three
    - Returns `prep_res`, which will be passed to both `exec()` and `post()`.
 
 2. **`exec(shared, prep_res)`**
-   - The main execution step, typically where you call your LLM or any external APIs.
+   - The main execution step where the LLM is called.
+   - Has a built-in retry feature to handle errors and ensure reliable results.
    - Returns `exec_res`, which is passed to `post()`.
 
 3. **`post(shared, prep_res, exec_res)`**