zachary62 2025-01-01 21:06:05 +00:00
parent 27a86e8568
commit 118293a641
3 changed files with 15 additions and 10 deletions


@@ -11,7 +11,7 @@ A **Node** is the smallest building block of Mini LLM Flow. Each Node has 3 steps
 1. **`prep(shared)`**
    - Reads and preprocesses data from the `shared` store for LLMs.
-   - Examples: query DB, read files, or serialize data into a string.
+   - Examples: *query DB, read files, or serialize data into a string*.
    - Returns `prep_res`, which will be passed to both `exec()` and `post()`.
 2. **`exec(prep_res)`**
@@ -22,17 +22,20 @@ A **Node** is the smallest building block of Mini LLM Flow. Each Node has 3 steps
 3. **`post(shared, prep_res, exec_res)`**
    - Writes results back to the `shared` store or decides the next action.
-   - Examples: finalize outputs, trigger next steps, or log results.
+   - Examples: *finalize outputs, trigger next steps, or log results*.
    - Returns a **string** to specify the next action (`"default"` if nothing or `None` is returned).

 All 3 steps are optional. For example, you might only need to run the Prep without calling the LLM.

 ## Fault Tolerance & Retries

-Nodes in Mini LLM Flow can **retry** execution if `exec()` raises an exception. You control this via a `max_retries` parameter when you create the Node. By default, `max_retries = 1` (meaning no retry).
+Nodes in Mini LLM Flow can **retry** execution if `exec()` raises an exception. You control this via two parameters when you create the Node:
+
+- `max_retries` (int): How many times to try running `exec()`. The default is `1`, which means **no** retry.
+- `wait` (int): The time to wait (in **seconds**) before each retry attempt. By default, `wait=0` (i.e., no waiting). Increasing this is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off.

 ```python
-my_node = SummarizeFile(max_retries=3)
+my_node = SummarizeFile(max_retries=3, wait=10)
 ```

 When an exception occurs in `exec()`, the Node automatically retries until:
@@ -40,6 +43,8 @@ When an exception occurs in `exec()`, the Node automatically retries until:
 - It either succeeds, or
 - The Node has retried `max_retries - 1` times already and fails on the last attempt.

+### Graceful Fallback
+
 If you want to **gracefully handle** the error rather than raising it, you can override:

 ```python
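As a usage note (not part of this commit): the snippet below sketches how the new `wait` parameter combines with the documented `prep`/`exec`/`post` lifecycle and the `exec_fallback` override. It assumes `Node` can be imported from the `minillmflow` package; the `call_llm` helper and the `shared` keys are illustrative stand-ins, and `SummarizeFile` is the example name used in the docs above.

```python
# Sketch only (see assumptions above): a Node combining max_retries, wait,
# and exec_fallback.
from minillmflow import Node  # assumes the package exports Node

def call_llm(prompt):
    # Stand-in for a real LLM call; swap in your provider's client here.
    return f"(summary of a {len(prompt)}-character prompt)"

class SummarizeFile(Node):
    def prep(self, shared):
        return shared["file_content"]               # read input from the shared store
    def exec(self, prep_res):
        return call_llm(f"Summarize:\n{prep_res}")  # may raise, e.g. on rate limits
    def exec_fallback(self, prep_res, exc):
        return "Summary unavailable"                # graceful default after retries run out
    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res                # write the result back

# Retry exec() up to 3 times, waiting 10 seconds between attempts.
my_node = SummarizeFile(max_retries=3, wait=10)
```

Because `wait` sleeps between attempts, a rate-limited `call_llm` would get three tries spaced 10 seconds apart before `exec_fallback` supplies the graceful default.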


@@ -1,4 +1,4 @@
-import asyncio, warnings, copy
+import asyncio, warnings, copy, time

 class BaseNode:
     def __init__(self): self.params,self.successors={},{}
@@ -24,13 +24,14 @@ class _ConditionalTransition:
     def __rshift__(self,tgt): return self.src.add_successor(tgt,self.action)

 class Node(BaseNode):
-    def __init__(self,max_retries=1): super().__init__();self.max_retries=max_retries
+    def __init__(self,max_retries=1,wait=0): super().__init__();self.max_retries,self.wait=max_retries,wait
     def exec_fallback(self,prep_res,exc): raise exc
     def _exec(self,prep_res):
         for i in range(self.max_retries):
             try: return self.exec(prep_res)
             except Exception as e:
                 if i==self.max_retries-1: return self.exec_fallback(prep_res,e)
+                if self.wait>0: time.sleep(self.wait)

 class BatchNode(Node):
     def _exec(self,items): return [super(BatchNode,self)._exec(i) for i in items]
@@ -68,6 +69,7 @@ class AsyncNode(Node):
             try: return await self.exec_async(prep_res)
             except Exception as e:
                 if i==self.max_retries-1: return await self.exec_fallback_async(prep_res,e)
+                if self.wait>0: await asyncio.sleep(self.wait)
     async def run_async(self,shared):
         if self.successors: warnings.warn("Node won't run successors. Use AsyncFlow.")
         return await self._run_async(shared)
@@ -82,9 +84,7 @@ class AsyncParallelBatchNode(AsyncNode):
 class AsyncFlow(Flow,AsyncNode):
     async def _orch_async(self,shared,params=None):
         curr,p=copy.copy(self.start),(params or {**self.params})
-        while curr:
-            curr.set_params(p);c=await curr._run_async(shared) if isinstance(curr,AsyncNode) else curr._run(shared)
-            curr=copy.copy(self.get_next_node(curr,c))
+        while curr:curr.set_params(p);c=await curr._run_async(shared) if isinstance(curr,AsyncNode) else curr._run(shared);curr=copy.copy(self.get_next_node(curr,c))
     async def _run_async(self,shared): p=await self.prep_async(shared);await self._orch_async(shared);return await self.post_async(shared,p,None)

 class AsyncBatchFlow(AsyncFlow):
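For readability, the retry loop this commit extends is roughly equivalent to the standalone sketch below (hypothetical `RetryingNode` name; the real `Node` additionally inherits the `BaseNode` successor/params wiring, which is omitted here):

```python
import time

class RetryingNode:
    """Standalone sketch of the retry-with-wait behavior from the diff above."""
    def __init__(self, max_retries=1, wait=0):
        self.max_retries, self.wait = max_retries, wait
    def exec(self, prep_res):
        raise NotImplementedError  # subclasses do the real work (e.g., call an LLM)
    def exec_fallback(self, prep_res, exc):
        raise exc  # default: re-raise once every attempt has failed
    def _exec(self, prep_res):
        for i in range(self.max_retries):
            try:
                return self.exec(prep_res)
            except Exception as e:
                if i == self.max_retries - 1:
                    # final attempt failed: delegate to the graceful fallback
                    return self.exec_fallback(prep_res, e)
                if self.wait > 0:
                    time.sleep(self.wait)  # back off before the next attempt
```

The `AsyncNode` hunk mirrors this logic but calls `await asyncio.sleep(self.wait)` instead of `time.sleep`, so the backoff doesn't block the event loop.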


@@ -2,7 +2,7 @@ from setuptools import setup, find_packages

 setup(
     name="minillmflow",
-    version="0.0.4",
+    version="0.0.5",
     packages=find_packages(),
     author="Zachary Huang",
     author_email="zh2408@columbia.edu",