From 83dbc13054c26307ac7e573f6e347f222f0ed803 Mon Sep 17 00:00:00 2001 From: zachary62 Date: Wed, 25 Dec 2024 20:19:21 +0000 Subject: [PATCH] asyncnode --- minillmflow/__init__.py | 181 +++++++----------- .../__pycache__/__init__.cpython-39.pyc | Bin 8594 -> 6738 bytes 2 files changed, 71 insertions(+), 110 deletions(-) diff --git a/minillmflow/__init__.py b/minillmflow/__init__.py index 9d560b1..f3a8146 100644 --- a/minillmflow/__init__.py +++ b/minillmflow/__init__.py @@ -1,3 +1,5 @@ +import asyncio + class BaseNode: """ A base node that provides: @@ -33,9 +35,6 @@ class BaseNode: return "default" def run(self, shared_storage=None): - if not shared_storage: - shared_storage = {} - prep = self.preprocess(shared_storage) proc = self._process(shared_storage, prep) return self.postprocess(shared_storage, prep, proc) @@ -93,118 +92,80 @@ class Node(BaseNode): if attempt == self.max_retries - 1: return self.process_after_fail(shared_storage, data, e) -class InteractiveNode(BaseNode): - """ - Interactive node. Instead of returning a condition, - we 'signal' the condition via a callback provided by the Flow. - """ - - def postprocess(self, shared_storage, prep_result, proc_result, next_node_callback): - """ - We do NOT return anything. We call 'next_node_callback("some_condition")' - to tell the Flow which successor to pick. - """ - # e.g. here we pick "default", but in real usage you'd do logic or rely on user input - next_node_callback("default") - - def run(self, shared_storage=None): - """ - Run just THIS node (no chain). - """ - if not shared_storage: - shared_storage = {} - - # 1) Preprocess - prep = self.preprocess(shared_storage) - - # 2) Process - proc = self._process(shared_storage, prep) - - # 3) Postprocess with a dummy callback - def dummy_callback(condition="default"): - print("[Dummy callback] To run the flow, pass this node into a Flow instance.") - - self.postprocess(shared_storage, prep, proc, dummy_callback) - - def is_interactive(self): - return True - -class Flow: - """ - A Flow that runs through a chain of nodes, from a start node onward. - Each iteration: - - preprocess - - process - - postprocess - The postprocess is given a callback to choose the next node. - We'll 'yield' the current node each time, so the caller can see progress. - """ +class Flow(BaseNode): def __init__(self, start_node=None): self.start_node = start_node - - def run(self, shared_storage=None): - if shared_storage is None: - shared_storage = {} - - current_node = self.start_node - print("hihihi") + def _process(self, shared_storage, _): + current_node = self.start_node while current_node: - # 1) Preprocess - prep_result = current_node.preprocess(shared_storage) - print("prep") - # 2) Process - proc_result = current_node._process(shared_storage, prep_result) - - # Prepare next_node variable - next_node = [None] - - # We'll define a callback only if this is an interactive node. - # The callback sets next_node[0] based on condition. - def next_node_callback(condition="default"): - nxt = current_node.successors.get(condition) - next_node[0] = nxt - - # 3) Check if it's an interactive node - is_interactive = ( - hasattr(current_node, 'is_interactive') - and current_node.is_interactive() - ) - - if is_interactive: - print("ineractive") - # - # ---- INTERACTIVE CASE ---- - # - # a) yield so that external code can do UI, etc. - # yield current_node, prep_result, proc_result, next_node_callback - - # # b) Now we do postprocess WITH the callback: - # current_node.postprocess( - # shared_storage, - # prep_result, - # proc_result, - # next_node_callback - # ) - # # once postprocess is done, next_node[0] should be set - - else: - # - # ---- NON-INTERACTIVE CASE ---- - # - # We just call postprocess WITHOUT callback, - # and let it return the condition string: - condition = current_node.postprocess( - shared_storage, - prep_result, - proc_result - ) - # Then we figure out the next node: - next_node[0] = current_node.successors.get(condition, None) - - # 5) Move on to the next node - current_node = next_node[0] + condition = current_node.run(shared_storage) + current_node = current_node.successors.get(condition, None) + def postprocess(self, shared_storage, prep_result, proc_result): + return None + + + +class AsyncNode(Node): + """ + A Node whose postprocess step is async. + You can also override process() to be async if needed. + """ + + async def postprocess_async(self, shared_storage, prep_result, proc_result): + """ + Async version of postprocess. By default, returns "default". + Override as needed. + """ + await asyncio.sleep(0) # trivial async pause (no-op) + return "default" + + async def run_async(self, shared_storage=None): + """ + Async version of run. + If your process method is also async, you'll need to adapt accordingly. + """ + # We can keep preprocess synchronous or make it async as well, + # depending on your usage. Here it's left as sync for simplicity. + prep = self.preprocess(shared_storage) + + # process can remain sync if you prefer, or you can define an async process. + proc = self._process(shared_storage, prep) + + # postprocess is async + return await self.postprocess_async(shared_storage, prep, proc) + + +class AsyncFlow(Flow): + """ + A Flow that can handle a mixture of sync and async nodes. + If the node is an AsyncNode, calls `run_async`. + Otherwise, calls `run`. + """ + async def _process(self, shared_storage, _): + current_node = self.start_node + while current_node: + if hasattr(current_node, "run_async") and callable(current_node.run_async): + # If it's an async node, await its run_async + condition = await current_node.run_async(shared_storage) + else: + # Otherwise, assume it's a sync node + condition = current_node.run(shared_storage) + + current_node = current_node.successors.get(condition, None) + + async def run_async(self, shared_storage=None): + """ + Kicks off the async flow. Similar to Flow.run, + but uses our async _process method. + """ + prep = self.preprocess(shared_storage) + # Note: flows typically don't need a meaningful process step + # because the "process" is the iteration through the nodes. + await self._process(shared_storage, prep) + return self.postprocess(shared_storage, prep, None) + class BatchNode(BaseNode): def __init__(self, max_retries=5, delay_s=0.1): super().__init__() diff --git a/minillmflow/__pycache__/__init__.cpython-39.pyc b/minillmflow/__pycache__/__init__.cpython-39.pyc index 1400bbbd090d158bca1d472f5a6cfca4e4a66302..912c60f9adb78562f9db0da84fe3b270c18781f1 100644 GIT binary patch delta 2499 zcma)7+fN-u7~eU2IeV_5m(v0T3W&f$MJsKwSS__hus5yslAPMEo3jH2&t?0~VlUE= zD2*@n0ml~;V`*bzFeaw)#WxfG0izGbtntD4?3?fUeRF!)!wE4SGCyW#zU%M%X6ECQ zKOf3_xm;Sq-}+p6exk6J?_zJSjlR^ywGZ^7F{AluKeMX&8TD=Y*^8Q!d-!?CvJQ(3 z!OL?S8d$O?@kP6dMj|Wlz^e$b@S_e%86|#UrhKRuGR><$>7^n1-bjGb%!{iYaNiYS z4ZL~2#lj`yO-IS)VZ_E@ z53Ux`Vy(2D#UIVH#sEslAgQAKMYo+F(P%P46srndB1Vy7U z`I&WRYzNL744Ad>Du-6BwCjSnx-b{I?(nxSCkNO8(uq>9-3<6NKnyY5l>Myi)(<@MB&8Jh$%1mv%tvSM{JZ|I_p{RBh~ zJWimPjv*=kP7NeY+p;sjz96db%a7eGO7A7La*(uoh02TQ}J$Kw}i03MT6ck+}L1I3K=bXLSXG;lcaP`{*#?iec#+Qk>Pux z7wP=QmZp)F++KD;-pPG`pVu`HQUF9AnR^rqzP1ROr6?3B~d=|$e z=JEzIH%g^n`zAF;*11+>zE({5wx96zRaVq}!#7v8qTyS9691b%!)qoL8M3#hQs7Q<0K9-jIgXNYPa=e9R`;5b+*jzv)jh9zL;3o zmknRLpSZ1GvwZEM1_>0#$|zT=y~#|ZV}~`f8ZkG@mX-i`^-<@wsgkq=N8EZi1VyPa ziy17aSlWs_*`G8v{N+h%FqwqOzq^uu$irBuZ!oha$^vUk%JjYFz(Bu-nKh*aK@LC5JH z|#I99W(3Kg;Upr=x(?-Lhy2yMPO|aE{w5Q3O>&=B6+&OQd{I1-!@6VM(WX6WWq^EM6rr8mK83a5&;%2q9 zP>W1a<~#tRpt&9DZYN0o8G;tyS+~jgdnh0%ZjI$xg0&;crLJK5G_FLJ7X-ZOV_8+h zD|Sp`M?}f`LnoB4h_yZw>U7?bBi&Br1X_X@5S;Y3-Ju>Yl_4bu?udwim& zaAOgNptsXzdDGVM&$QDprEbLEyWby@m)w;f_RRiwm#oRZdOqz-kpX3j z-k_rG;^RrZu*tjpa(`dPagvHdR-Hx(eIozZ|KRW$oNdaJ5o0y@DDa;ZUa@*)ou;4A3f_S4Q^w3Fw{#I*c$yKmv`3@O`RJ dDWS+dR;l?*W&S+riZnw0cs|>Wzm)(0 literal 8594 zcmbVRON<=Xb**1_SND84{D`7HvCFiQVaMc%5|mhxDVh{N5@TxYnBy>MM=2M(YG#`1 z>1w{JAvuknz%hl0Uc`Vm25iHC2FQk4WS_N{+1Q%^DP$955eV5OivR)=z~dV#=o;seH=8dpdInjfJ&u{vy7 zp^cV}mWh@V*3ha&#uK|^p^b$=Xk9@``luqKV}?e@3eAqKDd{+xhE6SfF>HkHM`ouUHpA9OMyC-jgp2s= zhD+fx{x-v7;R^n?!k5Am;Yo~K2v3F2p=U8X9X^l0OW~RD1^itOkAs$%Gxwb!iPqyV z8vVHC;koMFLx(p&kC$!-sW(*d!(JFAZ^>uRyMUeuo!ux&&b2F@b3JjA&W}^OgL7@~ z!UgaCZjyRvC0;L0qW-3LZV-Fj?Vva44Yu^UT@(B$9==;B*HDrbR6S#C___we?ikO^ z^|qC@hJgxpqBK%T)=GA}Br{gYGc$9z%QHhYF!!^IuWrXX(W|5F*WbAK=Bqo{^!Dxd zZ}#Iy^4{<7Z1Uar@gep7wc%do7L6tepa2- zo55~B?Q);u;X4l^#GS*-*dlok4fSGbhUVD%IdtI{_7jjgHu+_2UBVmW#$F$N^=_be zRd0Fm!$>_+y%Y^EdfZForDVOg87r?F4?+xx2j1#u<|@v|ZL``?^#*C%)~9tSb6K+- zI(zJ;APoI-SxP`cb{J;MTtOL~F3&zSaPB%NNOEG*P+o!T_AF=oaQIE^(5=ASGHn0%k(_vh`d8KF*L&1`{d_$o?pX|ww`ae`@6%_c+*s?1;wl@o9l3Wo zu6iE5JagPV;>-)f37mKsTrmCArF4n>HWRmJ>_eo05)smf0K*9sU5`Wq@3Bm!xQVlEubnj4sV zL#bFPkDW9%x)R{7`~DXA`5Jn)K-5|QF8&)GD+NN2Ihv=G3Gg)MBxK@imk?dRys#Z^b#Y)ud zHjMBw+R6cpuzf}#pGj>?)4*?HV4ooK`@H8jRw~jgY1iUlluwT zLK5$)ZX~xU_m!JZbpewOyFlUZyZAWqS-yC=Nplj!Hf%~zs30Oi<(1c4a;XuccK86Brr}i|ji?&KXq-|$4U=jeCdXX*c1dm}6Q`L)-LQ9Ds@MoJ) z%zlt0P@#ziaMHI>^2-LYERmwg)fPv@81el|8%<7)ZQyU=l*+2ph7QhYklE4WIXibE zcLDrh6P~xf8T9%aq$0oA=Y@v(F;gl)6TN^UJR0GJD0d$=;kxm`{e61``vIQW+DzX? zEMS6C$4H$$X!O2g8lyMIFy8m!xt=jTwA5e1w$#LsI|ue%qi*~;yuE$ruN*z|ZrOg& z()|~Wiw55jO2AKf6%@b<)PVEw0LgQ8krh!XYuVhi=w*!{O{1M* znwgQJFjcOI3;Y>MLd-MFWz#ZGp+D2Y~pWd{ntzBvUT@2_wm^{Z5XY9!hWYhfL6WlbVch<)fl@LSojTLavD&`o;} zbvXHb0l7-An5H%F7Hn!1z}#+nkn3HEA-rG`A)J?ak0Nh1>1_>y{wl)IX!6;6*b6uu zCh1<#{jdm6L+{?63}g(qrty^mRXlvBFkV9u#*ZDk8;HTQyYaU&1@W(UH}b;RTmR-p zPM#MG_R?*{Hf#LEeZAE|VNb<0l8$q;u19;dTrrKk6n9gS=RMl)b+^4Tl<_cN*z10% z$tDeAO4qdlRz-_mvU(9!c4D4{%FWBA(@)*xaYYI1qn9doq4n`7hndWxqzJQU_CMcY zhC~n?&3$L$I**+LXr#zSeRQ#8#kY3{I`H3k@75g=h)LMro;m-a(f=JDh>DXkU1RDP z8ezrg=6A2}?(FQ9g7Ja35i`9YTxT$U-Wvie(d;F9V}zbDB#8p!6^`E8GgBSI>d?Kz ze4*u#dz6?~?Vmvj49lwJP^#xW}Sk2fmx zY25&eE1%U1otr3HP>yORP61dOg%PQS5T5yP)(s#BZQ#_N;$~R z6?jEOhk@pO2j{>G5z4Fj{w33e8>4)vcz0_Xf+5irVQK&anw7EpCSnhMy1=ytc`7@0bv zBf&sO(zM)-R*6!3y{I4N@VTp0G|0)0NNU>KiOzcopo!>;W#PeggMpVs*j&Y1$UG!^ zwdr7*62Nked}l189o5onXq!m8kwxP1=o$GE|K>+rB^RnCf$l@IXp|0$=$ASC zA5e|qrtBhW&{6z-Xs+hWh)b62UXBN3W%O3&&yK4vj7Ihi2IP*!bAow8Wol z?mMaT6sg@OR(ub$YX`=DZCq2|$NYw#&wRkx*>wICZpXmGn7X(Y$Pw(l1d7T72!Q6zZ5l1xb$I0O2rx?SCjar=9@U8Jque)?aIf(1f1UaX zd*l|UBtqf;=L8cHa!&Aff=};Y*N?cr_NAO=%U0~`qD(!!A3h0>TB6f22^@${3?_(8 z22hGweFHv~&g$3mtbQZU>LUO_fYF2?tz^x2g0#C$`~4|3;y-@&Q$GLr^(j}HqEv`f zLVsa4++d>Dhb9!sKS%qv;#t0i@yr+ZgFQbfqDywF_gT$&vz+p46k!K*vB-&^C~2I; z9VDlC_znQp7(zB}6@+LRr54Ph1@mwJj8dQl@_soV)_%KeYjHA0W0?cw-kI4_ry~J_ zemsc&rLy&N)vX1}IbtsR$?LhsX-r1YUD5gUvHf%77mm0tNH4aX_0jR~-n@G2wts8m z#`_=8gzUUMa*@Gf8tLuKVJxM-jwPz0m|91N`U6%I8Krt;r;8{HBvqoZf5o?4+fP{| z@6Z!ehEGWo55mCgmt5l~s49Wk-ykqMQ@DKx)-8b4dqyQVv-f_K2WQ`!3eL85;tz6& z;H;Q68=|E(pe(TUz|XDoAU)>P`I7@ft%O^+&A!1QlX3WWTz-7y))a z!cSNf=PB7Yg#O0tIj z@gNad$;{aduLO_(iAiN{+BHuwH(k}U;Z}T<=uN1 zHW@+kS5Em&R*v$v`VLmE9#q{$hhpAG{ROLOa1m9h!6r^A;aOv^aYix6v|7dQ?(G5_yS(BWfw5D#Vw5QRZIKiT=x+FFL*a&hp~Q M@s(GWZ?ByFe?QvYkpKVy