change the syntax of exec

This commit is contained in:
zachary62 2024-12-29 02:40:27 +00:00
parent 96cecb9086
commit 2550decdc5
11 changed files with 103 additions and 101 deletions

File diff suppressed because one or more lines are too long

View File

@ -19,9 +19,11 @@ Below is a minimal **AsyncNode** that calls an LLM in `exec()` to summarize text
```python ```python
class SummarizeThenVerify(AsyncNode): class SummarizeThenVerify(AsyncNode):
def exec(self, shared, prep_res): def prep(self, shared):
doc = shared.get("doc", "") return shared.get("doc", "")
return call_llm(f"Summarize: {doc}")
def exec(self, prep_res):
return call_llm(f"Summarize: {prep_res}")
async def post_async(self, shared, prep_res, exec_res): async def post_async(self, shared, prep_res, exec_res):
user_decision = await gather_user_feedback(exec_res) user_decision = await gather_user_feedback(exec_res)

View File

@ -9,7 +9,7 @@ nav_order: 3
Nodes and Flows **communicate** in two ways: Nodes and Flows **communicate** in two ways:
1. **Shared Store** – A global data structure (often an in-mem dict) that all nodes can read from and write to. Every Node's `prep()`, `exec()`, and `post()` methods receive the **same** `shared` store. 1. **Shared Store** – A global data structure (often an in-mem dict) that all nodes can read from and write to. Every Node's `prep()` and `post()` methods receive the **same** `shared` store.
2. **Params** – Each node and Flow has a `params` dict assigned by the **parent Flow**. Params mostly serve as identifiers, letting each node/flow know what task it's assigned. 2. **Params** – Each node and Flow has a `params` dict assigned by the **parent Flow**. Params mostly serve as identifiers, letting each node/flow know what task it's assigned.
If you know memory management, **Shared Store** is like a **heap** shared across function calls, while **Params** is like a **stack** assigned by parent function calls. If you know memory management, **Shared Store** is like a **heap** shared across function calls, while **Params** is like a **stack** assigned by parent function calls.
@ -47,7 +47,7 @@ class Summarize(Node):
content = shared["data"].get("my_file.txt", "") content = shared["data"].get("my_file.txt", "")
return content return content
def exec(self, shared, prep_res): def exec(self, prep_res):
prompt = f"Summarize: {prep_res}" prompt = f"Summarize: {prep_res}"
summary = call_llm(prompt) summary = call_llm(prompt)
return summary return summary
@ -91,7 +91,7 @@ class SummarizeFile(Node):
filename = self.params["filename"] filename = self.params["filename"]
return shared["data"].get(filename, "") return shared["data"].get(filename, "")
def exec(self, shared, prep_res): def exec(self, prep_res):
prompt = f"Summarize: {prep_res}" prompt = f"Summarize: {prep_res}"
return call_llm(prompt) return call_llm(prompt)

View File

@ -14,9 +14,10 @@ A **Node** is the smallest building block of Mini LLM Flow. Each Node has three
- Often used for tasks like reading files, chunking text, or validation. - Often used for tasks like reading files, chunking text, or validation.
- Returns `prep_res`, which will be passed to both `exec()` and `post()`. - Returns `prep_res`, which will be passed to both `exec()` and `post()`.
2. **`exec(shared, prep_res)`** 2. **`exec(prep_res)`**
- The main execution step where the LLM is called. - The main execution step where the LLM is called.
- Has a built-in retry feature to handle errors and ensure reliable results. - Optionally has built-in retry and error handling (below).
- ⚠️ If retry is enabled, ensure the implementation is idempotent.
- Returns `exec_res`, which is passed to `post()`. - Returns `exec_res`, which is passed to `post()`.
3. **`post(shared, prep_res, exec_res)`** 3. **`post(shared, prep_res, exec_res)`**
@ -55,7 +56,7 @@ class SummarizeFile(Node):
filename = self.params["filename"] filename = self.params["filename"]
return shared["data"][filename] return shared["data"][filename]
def exec(self, shared, prep_res): def exec(self, prep_res):
if not prep_res: if not prep_res:
raise ValueError("Empty file content!") raise ValueError("Empty file content!")
prompt = f"Summarize this text in 10 words: {prep_res}" prompt = f"Summarize this text in 10 words: {prep_res}"

View File

@ -7,12 +7,12 @@ class BaseNode:
if action in self.successors: warnings.warn(f"Overwriting successor for action '{action}'") if action in self.successors: warnings.warn(f"Overwriting successor for action '{action}'")
self.successors[action]=node;return node self.successors[action]=node;return node
def prep(self,shared): return None def prep(self,shared): return None
def exec(self,shared,prep_res): return None def exec(self,prep_res): return None
def _exec(self,shared,prep_res): return self.exec(shared,prep_res) def _exec(self,prep_res): return self.exec(prep_res)
def post(self,shared,prep_res,exec_res): return "default" def post(self,shared,prep_res,exec_res): return "default"
def _run(self,shared): def _run(self,shared):
prep_res=self.prep(shared) prep_res=self.prep(shared)
exec_res=self._exec(shared,prep_res) exec_res=self._exec(prep_res)
return self.post(shared,prep_res,exec_res) return self.post(shared,prep_res,exec_res)
def run(self,shared): def run(self,shared):
if self.successors: warnings.warn("Node won't run successors. Use a parent Flow instead.") if self.successors: warnings.warn("Node won't run successors. Use a parent Flow instead.")
@ -30,16 +30,16 @@ class Node(BaseNode):
def __init__(self,max_retries=1): def __init__(self,max_retries=1):
super().__init__() super().__init__()
self.max_retries=max_retries self.max_retries=max_retries
def process_after_fail(self,shared,prep_res,exc): raise exc def process_after_fail(self,prep_res,exc): raise exc
def _exec(self,shared,prep_res): def _exec(self,prep_res):
for i in range(self.max_retries): for i in range(self.max_retries):
try:return super()._exec(shared,prep_res) try:return super()._exec(prep_res)
except Exception as e: except Exception as e:
if i==self.max_retries-1:return self.process_after_fail(shared,prep_res,e) if i==self.max_retries-1:return self.process_after_fail(prep_res,e)
class BatchNode(Node): class BatchNode(Node):
def prep(self,shared): return [] def prep(self,shared): return []
def _exec(self,shared,items): return [super(Node,self)._exec(shared,i) for i in items] def _exec(self,items): return [super(Node,self)._exec(i) for i in items]
class Flow(BaseNode): class Flow(BaseNode):
def __init__(self,start): def __init__(self,start):
@ -47,23 +47,24 @@ class Flow(BaseNode):
self.start=start self.start=start
def get_next_node(self,curr,action): def get_next_node(self,curr,action):
nxt=curr.successors.get(action if action is not None else "default") nxt=curr.successors.get(action if action is not None else "default")
if not nxt and curr.successors: if not nxt and curr.successors: warnings.warn(f"Flow ends: action '{action}' not found in {list(curr.successors)}")
warnings.warn(f"Flow ends: action '{action}' not found in {list(curr.successors)}")
return nxt return nxt
def _exec(self,shared,params=None): def _orchestrate(self,shared,params=None):
curr,p=self.start,(params if params else {**self.params}) curr,p=self.start,(params if params else {**self.params})
while curr: while curr:
curr.set_params(p) curr.set_params(p)
c=curr._run(shared) curr=self.get_next_node(curr,curr._run(shared))
curr=self.get_next_node(curr,c) def _run(self,shared):
def exec(self,shared,prep_res): self._orchestrate(shared)
return self.post(shared,self.prep(shared),None)
def exec(self,prep_res):
raise RuntimeError("Flow should not exec directly. Create a child Node instead.") raise RuntimeError("Flow should not exec directly. Create a child Node instead.")
class BatchFlow(Flow): class BatchFlow(Flow):
def prep(self,shared): return [] def prep(self,shared): return []
def _run(self,shared): def _run(self,shared):
prep_res=self.prep(shared) prep_res=self.prep(shared)
for batch_params in prep_res:self._exec(shared,{**self.params,**batch_params}) for batch_params in prep_res:self._orchestrate(shared,{**self.params,**batch_params})
return self.post(shared,prep_res,None) return self.post(shared,prep_res,None)
class AsyncNode(Node): class AsyncNode(Node):
@ -77,24 +78,23 @@ class AsyncNode(Node):
return await self._run_async(shared) return await self._run_async(shared)
async def _run_async(self,shared): async def _run_async(self,shared):
prep_res=self.prep(shared) prep_res=self.prep(shared)
exec_res=self._exec(shared,prep_res) exec_res=self._exec(prep_res)
return await self.post_async(shared,prep_res,exec_res) return await self.post_async(shared,prep_res,exec_res)
def _run(self,shared): raise RuntimeError("AsyncNode should run using run_async instead.") def _run(self,shared): raise RuntimeError("AsyncNode should run using run_async instead.")
class AsyncFlow(Flow,AsyncNode): class AsyncFlow(Flow,AsyncNode):
async def _exec_async(self,shared,params=None): async def _orchestrate_async(self,shared,params=None):
curr,p=self.start,(params if params else {**self.params}) curr,p=self.start,(params if params else {**self.params})
while curr: while curr:
curr.set_params(p) curr.set_params(p)
c=await curr._run_async(shared) if hasattr(curr,"run_async") else curr._run(shared) c=await curr._run_async(shared) if hasattr(curr,"run_async") else curr._run(shared)
curr=self.get_next_node(curr,c) curr=self.get_next_node(curr,c)
async def _run_async(self,shared): async def _run_async(self,shared):
prep_res=self.prep(shared) await self._orchestrate_async(shared)
await self._exec_async(shared) return await self.post_async(shared,self.prep(shared),None)
return await self.post_async(shared,prep_res,None)
class BatchAsyncFlow(BatchFlow,AsyncFlow): class BatchAsyncFlow(BatchFlow,AsyncFlow):
async def _run_async(self,shared): async def _run_async(self,shared):
prep_res=self.prep(shared) prep_res=self.prep(shared)
for batch_params in prep_res:await self._exec_async(shared,{**self.params,**batch_params}) for batch_params in prep_res:await self._orchestrate_async(shared,{**self.params,**batch_params})
return await self.post_async(shared,prep_res,None) return await self.post_async(shared,prep_res,None)

View File

@ -3,11 +3,11 @@ import asyncio
import sys import sys
from pathlib import Path from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
from minillmflow import AsyncNode, BatchAsyncFlow from minillmflow import AsyncNode, BatchAsyncFlow
class AsyncDataProcessNode(AsyncNode): class AsyncDataProcessNode(AsyncNode):
def exec(self, shared_storage, prep_result): def prep(self, shared_storage):
key = self.params.get('key') key = self.params.get('key')
data = shared_storage['input_data'][key] data = shared_storage['input_data'][key]
if 'results' not in shared_storage: if 'results' not in shared_storage:
@ -18,7 +18,7 @@ class AsyncDataProcessNode(AsyncNode):
async def post_async(self, shared_storage, prep_result, proc_result): async def post_async(self, shared_storage, prep_result, proc_result):
await asyncio.sleep(0.01) # Simulate async work await asyncio.sleep(0.01) # Simulate async work
key = self.params.get('key') key = self.params.get('key')
shared_storage['results'][key] = proc_result * 2 # Double the value shared_storage['results'][key] = prep_result * 2 # Double the value
return "processed" return "processed"
class AsyncErrorNode(AsyncNode): class AsyncErrorNode(AsyncNode):

View File

@ -3,7 +3,7 @@ import asyncio
import sys import sys
from pathlib import Path from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
from minillmflow import Node, AsyncNode, AsyncFlow from minillmflow import Node, AsyncNode, AsyncFlow
@ -17,7 +17,7 @@ class AsyncNumberNode(AsyncNode):
super().__init__() super().__init__()
self.number = number self.number = number
def exec(self, shared_storage, data): def prep(self, shared_storage):
# Synchronous work is allowed inside an AsyncNode, # Synchronous work is allowed inside an AsyncNode,
# but final 'condition' is determined by post_async(). # but final 'condition' is determined by post_async().
shared_storage['current'] = self.number shared_storage['current'] = self.number
@ -34,7 +34,7 @@ class AsyncIncrementNode(AsyncNode):
""" """
Demonstrates incrementing the 'current' value asynchronously. Demonstrates incrementing the 'current' value asynchronously.
""" """
def exec(self, shared_storage, data): def prep(self, shared_storage):
shared_storage['current'] = shared_storage.get('current', 0) + 1 shared_storage['current'] = shared_storage.get('current', 0) + 1
return "incremented" return "incremented"
@ -110,7 +110,7 @@ class TestAsyncFlow(unittest.TestCase):
""" """
class BranchingAsyncNode(AsyncNode): class BranchingAsyncNode(AsyncNode):
def exec(self, shared_storage, data): def exec(self, data):
value = shared_storage.get("value", 0) value = shared_storage.get("value", 0)
shared_storage["value"] = value shared_storage["value"] = value
# We'll decide branch based on whether 'value' is positive # We'll decide branch based on whether 'value' is positive
@ -124,12 +124,12 @@ class TestAsyncFlow(unittest.TestCase):
return "negative_branch" return "negative_branch"
class PositiveNode(Node): class PositiveNode(Node):
def exec(self, shared_storage, data): def exec(self, data):
shared_storage["path"] = "positive" shared_storage["path"] = "positive"
return None return None
class NegativeNode(Node): class NegativeNode(Node):
def exec(self, shared_storage, data): def exec(self, data):
shared_storage["path"] = "negative" shared_storage["path"] = "negative"
return None return None

View File

@ -2,11 +2,11 @@ import unittest
import sys import sys
from pathlib import Path from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
from minillmflow import Node, BatchFlow, Flow from minillmflow import Node, BatchFlow, Flow
class DataProcessNode(Node): class DataProcessNode(Node):
def exec(self, shared_storage, prep_result): def prep(self, shared_storage):
key = self.params.get('key') key = self.params.get('key')
data = shared_storage['input_data'][key] data = shared_storage['input_data'][key]
if 'results' not in shared_storage: if 'results' not in shared_storage:
@ -14,7 +14,7 @@ class DataProcessNode(Node):
shared_storage['results'][key] = data * 2 shared_storage['results'][key] = data * 2
class ErrorProcessNode(Node): class ErrorProcessNode(Node):
def exec(self, shared_storage, prep_result): def prep(self, shared_storage):
key = self.params.get('key') key = self.params.get('key')
if key == 'error_key': if key == 'error_key':
raise ValueError(f"Error processing key: {key}") raise ValueError(f"Error processing key: {key}")
@ -107,14 +107,14 @@ class TestBatchFlow(unittest.TestCase):
def test_nested_flow(self): def test_nested_flow(self):
"""Test batch processing with nested flows""" """Test batch processing with nested flows"""
class InnerNode(Node): class InnerNode(Node):
def exec(self, shared_storage, prep_result): def exec(self, prep_result):
key = self.params.get('key') key = self.params.get('key')
if 'intermediate_results' not in shared_storage: if 'intermediate_results' not in shared_storage:
shared_storage['intermediate_results'] = {} shared_storage['intermediate_results'] = {}
shared_storage['intermediate_results'][key] = shared_storage['input_data'][key] + 1 shared_storage['intermediate_results'][key] = shared_storage['input_data'][key] + 1
class OuterNode(Node): class OuterNode(Node):
def exec(self, shared_storage, prep_result): def exec(self, prep_result):
key = self.params.get('key') key = self.params.get('key')
if 'results' not in shared_storage: if 'results' not in shared_storage:
shared_storage['results'] = {} shared_storage['results'] = {}
@ -148,7 +148,7 @@ class TestBatchFlow(unittest.TestCase):
def test_custom_parameters(self): def test_custom_parameters(self):
"""Test batch processing with additional custom parameters""" """Test batch processing with additional custom parameters"""
class CustomParamNode(Node): class CustomParamNode(Node):
def exec(self, shared_storage, prep_result): def exec(self, prep_result):
key = self.params.get('key') key = self.params.get('key')
multiplier = self.params.get('multiplier', 1) multiplier = self.params.get('multiplier', 1)
if 'results' not in shared_storage: if 'results' not in shared_storage:

View File

@ -2,7 +2,7 @@ import unittest
import sys import sys
from pathlib import Path from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
from minillmflow import Node, BatchNode, Flow from minillmflow import Node, BatchNode, Flow
class ArrayChunkNode(BatchNode): class ArrayChunkNode(BatchNode):
@ -14,16 +14,14 @@ class ArrayChunkNode(BatchNode):
# Get array from shared storage and split into chunks # Get array from shared storage and split into chunks
array = shared_storage.get('input_array', []) array = shared_storage.get('input_array', [])
chunks = [] chunks = []
for i in range(0, len(array), self.chunk_size): for start in range(0, len(array), self.chunk_size):
end = min(i + self.chunk_size, len(array)) end = min(start + self.chunk_size, len(array))
chunks.append((i, end)) chunks.append(array[start: end])
return chunks return chunks
def exec(self, shared_storage, chunk_indices): def exec(self, chunk):
start, end = chunk_indices
array = shared_storage['input_array']
# Process the chunk and return its sum # Process the chunk and return its sum
chunk_sum = sum(array[start:end]) chunk_sum = sum(chunk)
return chunk_sum return chunk_sum
def post(self, shared_storage, prep_result, proc_result): def post(self, shared_storage, prep_result, proc_result):
@ -32,7 +30,7 @@ class ArrayChunkNode(BatchNode):
return "default" return "default"
class SumReduceNode(Node): class SumReduceNode(Node):
def exec(self, shared_storage, data): def prep(self, shared_storage):
# Get chunk results from shared storage and sum them # Get chunk results from shared storage and sum them
chunk_results = shared_storage.get('chunk_results', []) chunk_results = shared_storage.get('chunk_results', [])
total = sum(chunk_results) total = sum(chunk_results)
@ -48,9 +46,9 @@ class TestBatchNode(unittest.TestCase):
} }
chunk_node = ArrayChunkNode(chunk_size=10) chunk_node = ArrayChunkNode(chunk_size=10)
chunks = chunk_node.prep(shared_storage) chunk_node.run(shared_storage)
results = shared_storage['chunk_results']
self.assertEqual(chunks, [(0, 10), (10, 20), (20, 25)]) self.assertEqual(results, [45, 145, 110])
def test_map_reduce_sum(self): def test_map_reduce_sum(self):
""" """

View File

@ -2,7 +2,7 @@ import unittest
import sys import sys
from pathlib import Path from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
from minillmflow import Node, Flow from minillmflow import Node, Flow
class NumberNode(Node): class NumberNode(Node):
@ -10,7 +10,7 @@ class NumberNode(Node):
super().__init__() super().__init__()
self.number = number self.number = number
def exec(self, shared_storage, data): def prep(self, shared_storage):
shared_storage['current'] = self.number shared_storage['current'] = self.number
class AddNode(Node): class AddNode(Node):
@ -18,7 +18,7 @@ class AddNode(Node):
super().__init__() super().__init__()
self.number = number self.number = number
def exec(self, shared_storage, data): def prep(self, shared_storage):
shared_storage['current'] += self.number shared_storage['current'] += self.number
class MultiplyNode(Node): class MultiplyNode(Node):
@ -26,7 +26,7 @@ class MultiplyNode(Node):
super().__init__() super().__init__()
self.number = number self.number = number
def exec(self, shared_storage, data): def prep(self, shared_storage):
shared_storage['current'] *= self.number shared_storage['current'] *= self.number
class CheckPositiveNode(Node): class CheckPositiveNode(Node):
@ -37,7 +37,7 @@ class CheckPositiveNode(Node):
return 'negative' return 'negative'
class NoOpNode(Node): class NoOpNode(Node):
def exec(self, shared_storage, data): def prep(self, shared_storage):
# Do nothing, just pass # Do nothing, just pass
pass pass

View File

@ -2,8 +2,9 @@ import unittest
import asyncio import asyncio
import sys import sys
from pathlib import Path from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))
sys.path.insert(0, str(Path(__file__).parent.parent))
from minillmflow import Node, Flow from minillmflow import Node, Flow
# Simple example Nodes # Simple example Nodes
@ -12,7 +13,7 @@ class NumberNode(Node):
super().__init__() super().__init__()
self.number = number self.number = number
def exec(self, shared_storage, prep_result): def prep(self, shared_storage):
shared_storage['current'] = self.number shared_storage['current'] = self.number
class AddNode(Node): class AddNode(Node):
@ -20,7 +21,7 @@ class AddNode(Node):
super().__init__() super().__init__()
self.number = number self.number = number
def exec(self, shared_storage, prep_result): def prep(self, shared_storage):
shared_storage['current'] += self.number shared_storage['current'] += self.number
class MultiplyNode(Node): class MultiplyNode(Node):
@ -28,7 +29,7 @@ class MultiplyNode(Node):
super().__init__() super().__init__()
self.number = number self.number = number
def exec(self, shared_storage, prep_result): def prep(self, shared_storage):
shared_storage['current'] *= self.number shared_storage['current'] *= self.number
class TestFlowComposition(unittest.TestCase): class TestFlowComposition(unittest.TestCase):