From 118293a641da6f9c77f9c61dd63a1c0b7144af5d Mon Sep 17 00:00:00 2001 From: zachary62 Date: Wed, 1 Jan 2025 21:06:05 +0000 Subject: [PATCH] wait --- docs/node.md | 13 +++++++++---- minillmflow/__init__.py | 10 +++++----- setup.py | 2 +- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/docs/node.md b/docs/node.md index 39d96b0..581388c 100644 --- a/docs/node.md +++ b/docs/node.md @@ -11,7 +11,7 @@ A **Node** is the smallest building block of Mini LLM Flow. Each Node has 3 step 1. **`prep(shared)`** - Reads and preprocesses data from the `shared` store for LLMs. - - Examples: query DB, read files, or serialize data into a string. + - Examples: *query DB, read files, or serialize data into a string*. - Returns `prep_res`, which will be passed to both `exec()` and `post()`. 2. **`exec(prep_res)`** @@ -22,17 +22,20 @@ A **Node** is the smallest building block of Mini LLM Flow. Each Node has 3 step 3. **`post(shared, prep_res, exec_res)`** - Writes results back to the `shared` store or decides the next action. - - Examples: finalize outputs, trigger next steps, or log results. + - Examples: *finalize outputs, trigger next steps, or log results*. - Returns a **string** to specify the next action (`"default"` if nothing or `None` is returned). All 3 steps are optional. For example, you might only need to run the Prep without calling the LLM. ## Fault Tolerance & Retries -Nodes in Mini LLM Flow can **retry** execution if `exec()` raises an exception. You control this via a `max_retries` parameter when you create the Node. By default, `max_retries = 1` (meaning no retry). +Nodes in Mini LLM Flow can **retry** execution if `exec()` raises an exception. You control this via two parameters when you create the Node: + +- `max_retries` (int): How many times to try running `exec()`. The default is `1`, which means **no** retry. +- `wait` (int): The time to wait (in **seconds**) before each retry attempt. By default, `wait=0` (i.e., no waiting). Increasing this is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off. ```python -my_node = SummarizeFile(max_retries=3) +my_node = SummarizeFile(max_retries=3, wait=10) ``` When an exception occurs in `exec()`, the Node automatically retries until: @@ -40,6 +43,8 @@ When an exception occurs in `exec()`, the Node automatically retries until: - It either succeeds, or - The Node has retried `max_retries - 1` times already and fails on the last attempt. +### Graceful Fallback + If you want to **gracefully handle** the error rather than raising it, you can override: ```python diff --git a/minillmflow/__init__.py b/minillmflow/__init__.py index a2b5fb3..00506ae 100644 --- a/minillmflow/__init__.py +++ b/minillmflow/__init__.py @@ -1,4 +1,4 @@ -import asyncio, warnings, copy +import asyncio, warnings, copy, time class BaseNode: def __init__(self): self.params,self.successors={},{} @@ -24,13 +24,14 @@ class _ConditionalTransition: def __rshift__(self,tgt): return self.src.add_successor(tgt,self.action) class Node(BaseNode): - def __init__(self,max_retries=1): super().__init__();self.max_retries=max_retries + def __init__(self,max_retries=1,wait=0): super().__init__();self.max_retries,self.wait =max_retries,wait def exec_fallback(self,prep_res,exc): raise exc def _exec(self,prep_res): for i in range(self.max_retries): try: return self.exec(prep_res) except Exception as e: if i==self.max_retries-1: return self.exec_fallback(prep_res,e) + if self.wait>0: time.sleep(self.wait) class BatchNode(Node): def _exec(self,items): return [super(BatchNode,self)._exec(i) for i in items] @@ -68,6 +69,7 @@ class AsyncNode(Node): try: return await self.exec_async(prep_res) except Exception as e: if i==self.max_retries-1: return await self.exec_fallback_async(prep_res,e) + if self.wait>0: await asyncio.sleep(self.wait) async def run_async(self,shared): if self.successors: warnings.warn("Node won't run successors. Use AsyncFlow.") return await self._run_async(shared) @@ -82,9 +84,7 @@ class AsyncParallelBatchNode(AsyncNode): class AsyncFlow(Flow,AsyncNode): async def _orch_async(self,shared,params=None): curr,p=copy.copy(self.start),(params or {**self.params}) - while curr: - curr.set_params(p);c=await curr._run_async(shared) if isinstance(curr,AsyncNode) else curr._run(shared) - curr=copy.copy(self.get_next_node(curr,c)) + while curr:curr.set_params(p);c=await curr._run_async(shared) if isinstance(curr,AsyncNode) else curr._run(shared);curr=copy.copy(self.get_next_node(curr,c)) async def _run_async(self,shared): p=await self.prep_async(shared);await self._orch_async(shared);return await self.post_async(shared,p,None) class AsyncBatchFlow(AsyncFlow): diff --git a/setup.py b/setup.py index 47ffcdb..4c1a3b6 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name="minillmflow", - version="0.0.4", + version="0.0.5", packages=find_packages(), author="Zachary Huang", author_email="zh2408@columbia.edu",