From 4f60d32e686abeecb693be3d50bcff6cf7bff562 Mon Sep 17 00:00:00 2001 From: zachary62 Date: Wed, 25 Dec 2024 06:02:59 +0000 Subject: [PATCH] adapt for jupyter notebook --- minillmflow/__init__.py | 64 ++++++++++-------- .../__pycache__/__init__.cpython-39.pyc | Bin 2113 -> 7862 bytes 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/minillmflow/__init__.py b/minillmflow/__init__.py index af0286a..2a7ea3d 100644 --- a/minillmflow/__init__.py +++ b/minillmflow/__init__.py @@ -33,12 +33,26 @@ class NodeMeta(type): attrs[attr_name] = _wrap_async(old_fn) return super().__new__(mcs, name, bases, attrs) + + +async def hello(text): + print("Start") + await asyncio.sleep(1) # Simulate some async work + print(text) class BaseNode(metaclass=NodeMeta): def __init__(self): self.parameters = {} self.successors = {} + # Syntactic sugar for chaining + def add_successor(self, node, condition="default"): + # warn if we're overwriting an existing successor + if condition in self.successors: + print(f"Warning: overwriting existing successor for condition '{condition}'") + self.successors[condition] = node + return node + # By default these are already async. If a subclass overrides them # with non-async definitions, they'll get wrapped automatically. def preprocess(self, shared_storage): @@ -56,28 +70,20 @@ class BaseNode(metaclass=NodeMeta): async def _run_one(self, shared_storage): preprocess_result = await self.preprocess(shared_storage) process_result = await self._process(shared_storage, preprocess_result) - condition = await self.postprocess(shared_storage, preprocess_result, process_result) - - if not self.successors: - return None - if len(self.successors) == 1: - return next(iter(self.successors.values())) + condition = await self.postprocess(shared_storage, preprocess_result, process_result) or "default" return self.successors.get(condition) def run(self, shared_storage=None): - return asyncio.run(self.run_async(shared_storage)) + asyncio.run(self._run_async(shared_storage)) - async def run_async(self, shared_storage=None): - shared_storage = shared_storage or {} + async def run_in_jupyter(self, shared_storage=None): + await self._run_async(shared_storage) + + async def _run_async(self, shared_storage): current_node = self while current_node: current_node = await current_node._run_one(shared_storage) - # Syntactic sugar for chaining - def add_successor(self, node, condition="default"): - self.successors[condition] = node - return node - def __rshift__(self, other): return self.add_successor(other) @@ -91,7 +97,6 @@ class BaseNode(metaclass=NodeMeta): def __call__(self, condition): return _ConditionalTransition(self, condition) - class _ConditionalTransition: def __init__(self, source_node, condition): self.source_node = source_node @@ -102,7 +107,6 @@ class _ConditionalTransition: raise TypeError("Target must be a BaseNode") return self.source_node.add_successor(target_node, self.condition) - class Node(BaseNode): def __init__(self, max_retries=5, delay_s=0.1): super().__init__() @@ -110,8 +114,8 @@ class Node(BaseNode): self.delay_s = delay_s def process_after_fail(self, shared_storage, data, exc): - print(f"[FAIL_ITEM] data={data}, error={exc}") - return "fail" + raise exc + # return "fail" async def _process(self, shared_storage, data): for attempt in range(self.max_retries): @@ -121,6 +125,18 @@ class Node(BaseNode): if attempt == self.max_retries - 1: return await self.process_after_fail(shared_storage, data, e) await asyncio.sleep(self.delay_s) + +class Flow(BaseNode): + def __init__(self, start_node=None): + super().__init__() + self.start_node = start_node + + async def _process(self, shared_storage, _): + if self.start_node: + current_node = self.start_node + while current_node: + current_node = await current_node._run_one(shared_storage or {}) + return "Flow done" class BatchNode(BaseNode): def __init__(self, max_retries=5, delay_s=0.1): @@ -156,18 +172,6 @@ class BatchNode(BaseNode): results.append(r) return results -class Flow(BaseNode): - def __init__(self, start_node=None): - super().__init__() - self.start_node = start_node - - async def _process(self, shared_storage, _): - if self.start_node: - current_node = self.start_node - while current_node: - current_node = await current_node._run_one(shared_storage or {}) - return "Flow done" - class BatchFlow(BaseNode): def __init__(self, start_node=None): super().__init__() diff --git a/minillmflow/__pycache__/__init__.cpython-39.pyc b/minillmflow/__pycache__/__init__.cpython-39.pyc index 549c726f48c718f75fe2e076ad03e5243998367d..57d50b08411c435ab18b53be7c516393834bf612 100644 GIT binary patch literal 7862 zcmb_hTaO$^74GZY_Tu&0aU7G7hJ+*o$tHvV0Xe}TS3|M{+rcm)wAxd>yW`AGkE?rQ z?`ReY96=r+LPA1ZMDQXW@Dn^BAztyn=vN+4{sIyb-*@^l-MeE433|7xtE%gA>Ri5a zswP-k>KJ(b{lVLtCqHHw|6ry1xTu^#ihqJk8Oj_QJwsW_e%I)kFB{5H?z@I^L+iTT zvsFVi(e9wVrP^q3hqmhc)b6=xSx}2;S-fub8mKL)Wz?2K7vq|!A5kl)uSmUx`cbut z`l{61s2@|uQ9myAjxx_1-Mc;nMaFvH`=9HuC$zI39 z^YY-GaO4GEygTY|>1Y(~#NNivsGkg?k+(5=)YD1Zbzj6&~`fW6CZT`$=R z5}7R?Y>$V#UT`%SBr%%Nw=!|p>j%T(n)IxZOHv)A-yy{(kiBP2jZF~xd-I$zFs9ZY z%=ba`O>_MwC{LX@9B!mepf}^R@%B}|ZW!)PqvWQZx-yVzx zufF=)OT*}@l!wFZ4VHZ$9}?eR8}FvezU;`qs)O-3)Gd-h78>T{0cp&>ubm!7*g^d4 zTD`w4+qGnzrW#3uZnD%ymM&B#A&D_JYq*P+UckH0O7&qEKCy=^gnlWbXDPF1ixjDI zt)BB^aIEJ-l1|!Mk5u?tm;{qIgtsjAhe1pU1PKH;dMDJ7M<01G7{aQQ+H~=W9U|5 z+%Tv1)V;^pH{(`T+G`}%v~gY9Pa9WSiGAI;;Y^LGIkl$tzJsk;>)l4$8S8MYqkb62 zX|pISjH5UyDyw

poe`y0Vvm@X7;(I`w$R4SEDT{msYb_Y>*=(81)?}rS=UPESBW%jV3>aa$tvn&8BU%q<;(WN;jmbmLeuv#6CxH-T0#=wt>y9!I3?7EzxND|P z5?)I(o(N%LEAAHPjUM81R5JrCSuu4Nr9P|Ghl9=_#S>&RI|0fWz~K7?lg*x!qp_P| zaj&5|Y5_f(YEdoW-BQcy2;Ob=aY0+N*HQNi&YHaib(cDUZ;R?~brSC-b&ooQ_p625%!!id6Rga(m^D{0RFxyiJ ze8klH4gA_~ZORg+i7!*@G)f}X$rmpMdNdeqe%XtFDOdFX4q?*^uMJ`;l(c&rktXmc zbbupy_+RG307w$7DcY_rJA+YyXKB0acyOyrfl_`s>1Q!ACO8{b2TIW~Cnw51*E;qE zyx9>C%hW>IFE?ItURJwlWuz>}TL3?-mP9((3=dGEX(nGrD*VE+a^)FZ9b)sHmd!Yz zpTeA}qXHNU1(RiBoCJnvDl4v3RupvXi|EmV56kuC=rFByy@n4mYuCxNNme{MquSSD z3>`_A3((B!2gx~tdCnje3^+a`X9mrOI6$&)anvfZsc}Qle&5PKqc8wdbFcdI%<4=4 z?B!I6^hB&RP0zp91kO;6N!EP{g>2mds`jAn@J>+lf{r~KEmrP_?2NYY)8f~R%Qk$J zvkzwKFJh{$E1ICG3Yshtolfn|Fu^D8RwSyQX6Z}JsI-T7rFTX?JQbu(AqjU$K%Kjp1IP$4^|SasZ@qMx-%fNwgNQ<3@<8{0>*g$-Gk@Mt=Us=5Te*48~=&ux&3y@YX%|NCJPOug~9WPWzIK6pNA6P54iYYfXTl?cL=0ibDjSy z`hHZj$Ltf#9%IIxX6PVrCg>nIL9G)D(_djpjDqT+UtxBZ*^D7l5(hx`eFiB$g$&`L z+iAD#{IAimJ9f)Otto6qG}T4`k%j$z^shbK))}PuUSx*?C0jWNWpIFni*IN$>=2A| z0#Ynd45I>Hq6WVf#Vo|pj_!wh?VsaAFJ-Z+jMg)qV}`k)L75Jaa?t1?ex2NR+g(xZ*Wo$l>62)OO)6%YCDb03k2C5K{k8kE-(d+E? zO=fah5eCTKX=xEs$zCYU63LvYNU?A9s)t1k9jA5%a)J-S*lJv)zkn{O9bW4nm${kLT}=S&|KO}qr7yXy~s6Rbi&XN zF-O#EqhvD{9-kWErY4Jg_v|Z+a*0=b~=wFDL0K*g1`{na4pZ_)l|>RB^;GQ5=kDO5fJ7A5@E=g@JUtlvj_ z7425boDyqLi z%B}9WH9^!c<55X)<+`!Pufi4F;vC#}#TEPruHYm2S2aa$T=M4P-Gz&`WdQi7RmzXYasJ6uo*A? zA)B#;%|y9J5+^PsJ7u@L-*s$MraB#|N@-)`yvWi#F;$_{b7dnl$ueb3p$%Mgjmpn7 z+R&JdTc!he{C#wIQH<2#^x}&zkG?t_4adV5FTVdKFMc68&qrq{CkY%>NzyBSwPCYC zUkzlNF&@Eh8e>Sk=F|pKou!k!dMifsfnw}{Fyk}!65>0CqMR~~I=ftZ_;55Tx9%9! z+E!MK<**tSWAeLZA)d#nl*u|&49cJu#;5z_(4FIqj_?>3${dKC!KAERAoaJRcA#S} zQmw#7o!9&Jqf)C<7no@TVPkxkjBn2Vi)D?Oxo;;}8Hu-yv{y#R`v11DQx>LLS=rmN zvR7N#9KK}7dR2oyUCw^AvA>bU#0m(8S9*@>R(gp z=*jpU_^Uy6HHstQbC-AcKA(mZHeBh7ejh+tn@0SPq2rRoMoBU%W)O|-kz;}qzErux87#xym0}FZNMNUXRI?3SWorMXD~}fXpLc6`Nz_#LQU=s3NpzLL;YcPse&@QYuYR z!j!2}T_s~^(ymJlqo06^tx3aeD@n3EH6}^wuqk595#BW93Z#4*qigE{atCDS8Wx#c zT!4U6!|Yca93@~8T-ddk3H^K~07kBaOe|^;c}w;jKt+%g_5Evc^$6hUuR$4@9|@Ep zXbu3B4uJA?)v6-g-|TFGGPr3=mq6+DGpt#&&^Zj|6v(^>42Sj4ou9-t2ge|vzlkgO zx@<}hV_Ec$EWfn;)#k0Hu@)W6VOC|us66R{s=t8P^nfx1knA1PtT!jU%{7fsu2CA- zHNcOECdi>{)}NFzt(5hkJ4R;HIr<^qdlwmct=~t6I9QEfy$kZ7olUe>30Qn3~jb@x^~ru$92=5P`lwbp!4`24Q`RU