From 7f107f60e65a1fff0e1616deb6a3da7d586ef7a8 Mon Sep 17 00:00:00 2001 From: zachary62 Date: Wed, 25 Dec 2024 01:06:38 +0000 Subject: [PATCH] first lib --- minillmflow/__init__.py | 117 ++++++++++++++++++ .../__pycache__/__init__.cpython-39.pyc | Bin 0 -> 2113 bytes 2 files changed, 117 insertions(+) create mode 100644 minillmflow/__init__.py create mode 100644 minillmflow/__pycache__/__init__.cpython-39.pyc diff --git a/minillmflow/__init__.py b/minillmflow/__init__.py new file mode 100644 index 0000000..a76402d --- /dev/null +++ b/minillmflow/__init__.py @@ -0,0 +1,117 @@ +import asyncio + +class BaseNode: + def __init__(self): + self.set_parameters({}) # immutable during processing; could be overwritten as node can be reused + self.successors = {} + + def set_parameters(self, parameters): + self.parameters = parameters.copy() if parameters else {} + + def add_successor(self, node, condition="default"): + self.successors[condition] = node + return node + + async def preprocess(self, shared_storage): + return None + + async def process_one(self, shared_storage, item): + return None + + async def process(self, shared_storage, preprocess_result): + return await self.process_one(shared_storage, preprocess_result) + + async def postprocess(self, shared_storage, preprocess_result, process_result): + return "default" + + async def run_one(self, shared_storage): + preprocess_result = await self.preprocess(shared_storage) + process_result = await self.process(shared_storage, preprocess_result) + condition = await self.postprocess(shared_storage, preprocess_result, process_result) + + if not self.successors: + return None + elif len(self.successors) == 1: + return next(iter(self.successors.values())) + return self.successors.get(condition) + + def run(self, shared_storage=None): + return asyncio.run(self.run_async(shared_storage)) + + async def run_async(self, shared_storage=None): + shared_storage = shared_storage or {} + current_node = self + while current_node: + current_node = await current_node.run_one(shared_storage) + + def __rshift__(self, other): + return self.add_successor(other) + + def __gt__(self, other): + if isinstance(other, str): + return _ConditionalTransition(self, other) + elif isinstance(other, BaseNode): + return self.add_successor(other) + raise TypeError("Unsupported operand type") + + def __call__(self, condition): + return _ConditionalTransition(self, condition) + +class _ConditionalTransition: + def __init__(self, source_node, condition): + self.source_node = source_node + self.condition = condition + + def __gt__(self, target_node): + if not isinstance(target_node, BaseNode): + raise TypeError("Target must be a BaseNode") + return self.source_node.add_successor(target_node, self.condition) + +class BaseSuperNode(BaseNode): + def __init__(self, start_node=None): + super().__init__() + self.start_node = start_node + + async def process_one(self, shared_storage, item): + if self.start_node: + current_node = self.start_node + while current_node: + current_node.set_parameters(self.parameters) + current_node = await current_node.run_one(shared_storage or {}) + +class BatchMixin: + async def process(self, shared_storage, items): + partial_results = [] + for item in items: + r = await self.process_one(shared_storage, item) + partial_results.append(r) + + return self.merge(shared_storage, partial_results) + + def merge(self, shared_storage, partial_results): + return partial_results + + async def preprocess(self, shared_storage): + return [] + +class BatchBaseNode(BatchMixin, BaseNode): + async def preprocess(self, shared_storage): + return [] + + async def process_one(self, shared_storage, item): + return None + +class BatchSuperNode(BatchMixin, BaseSuperNode): + async def preprocess(self, shared_storage): + return [] + + async def process_one(self, shared_storage, param_dict): + node_parameters = self.parameters.copy() + node_parameters.update(param_dict) + + if self.start_node: + current_node = self.start_node + while current_node: + current_node.set_parameters(node_parameters) + current_node = await current_node.run_one(shared_storage or {}) + diff --git a/minillmflow/__pycache__/__init__.cpython-39.pyc b/minillmflow/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..549c726f48c718f75fe2e076ad03e5243998367d GIT binary patch literal 2113 zcmbtV&2A$_5bo~z^~B?3!vYIr*%e$c2PZ-tI7HD(tboJfOE@44G#X8J5^u(yN%weJ zEazlP`vAN^e9U9;8hz!&nHwjnYCN`+<$`FdYO1H(_4`##8pjcX_4~ui^Qq6+AGq0G zE^M9wnNNUy%OsP00W=o_+zIKx&QWac47hYpS=Y-Gz&`WdQi7RmzXYasJ6uo*A? zA)B#;%|y9J5+^PsJ7u@L-*s$MraB#|N@-)`yvWi#F;$_{b7dnl$ueb3p$%Mgjmpn7 z+R&JdTc!he{C#wIQH<2#^x}&zkG?t_4adV5FTVdKFMc68&qrq{CkY%>NzyBSwPCYC zUkzlNF&@Eh8e>Sk=F|pKou!k!dMifsfnw}{Fyk}!65>0CqMR~~I=ftZ_;55Tx9%9! z+E!MK<**tSWAeLZA)d#nl*u|&49cJu#;5z_(4FIqj_?>3${dKC!KAERAoaJRcA#S} zQmw#7o!9&Jqf)C<7no@TVPkxkjBn2Vi)D?Oxo;;}8Hu-yv{y#R`v11DQx>LLS=rmN zvR7N#9KK}7dR2oyUCw^AvA>bU#0m(8S9*@>R(gp z=*jpU_^Uy6HHstQbC-AcKA(mZHeBh7ejh+tn@0SPq2rRoMoBU%W)O|-kz;}qzErux87#xym0}FZNMNUXRI?3SWorMXD~}fXpLc6`Nz_#LQU=s3NpzLL;YcPse&@QYuYR z!j!2}T_s~^(ymJlqo06^tx3aeD@n3EH6}^wuqk595#BW93Z#4*qigE{atCDS8Wx#c zT!4U6!|Yca93@~8T-ddk3H^K~07kBaOe|^;c}w;jKt+%g_5Evc^$6hUuR$4@9|@Ep zXbu3B4uJA?)v6-g-|TFGGPr3=mq6+DGpt#&&^Zj|6v(^>42Sj4ou9-t2ge|vzlkgO zx@<}hV_Ec$EWfn;)#k0Hu@)W6VOC|us66R{s=t8P^nfx1knA1PtT!jU%{7fsu2CA- zHNcOECdi>{)}NFzt(5hkJ4R;HIr<^qdlwmct=~t6I9QEfy$kZ7olUe>30Qn3~jb@x^~ru$92=5P`lwbp!4`24Q`RU literal 0 HcmV?d00001