From c4c78b1939ace6ded234fdac70e10f736459c917 Mon Sep 17 00:00:00 2001
From: zachary62
Date: Wed, 25 Dec 2024 21:54:34 +0000
Subject: [PATCH] debug empty shared memory

---
 minillmflow/__init__.py                 | 257 ++++++++++--------
 .../__pycache__/__init__.cpython-39.pyc | Bin 6738 -> 9893 bytes
 2 files changed, 151 insertions(+), 106 deletions(-)

diff --git a/minillmflow/__init__.py b/minillmflow/__init__.py
index f3a8146..dee6f93 100644
--- a/minillmflow/__init__.py
+++ b/minillmflow/__init__.py
@@ -1,5 +1,6 @@
 import asyncio
-
+import warnings
+
 class BaseNode:
     """
     A base node that provides:
@@ -62,10 +63,19 @@ class BaseNode:
         """
         return _ConditionalTransition(self, condition)
 
+    def __sub__(self, condition):
+        """
+        For chaining with the - operator, e.g. node - "some_condition" >> next_node
+        """
+        if isinstance(condition, str):
+            return _ConditionalTransition(self, condition)
+        raise TypeError("Condition must be a string")
+
 
 class _ConditionalTransition:
     """
     Helper for Node > 'condition' >> AnotherNode style
+    (and also Node - 'condition' >> AnotherNode).
     """
     def __init__(self, source_node, condition):
         self.source_node = source_node
@@ -74,7 +84,6 @@ class _ConditionalTransition:
     def __rshift__(self, target_node):
         return self.source_node.add_successor(target_node, self.condition)
 
-# robust running process
 class Node(BaseNode):
     def __init__(self, max_retries=1):
         super().__init__()
@@ -82,7 +91,6 @@ class Node(BaseNode):
 
     def process_after_fail(self, shared_storage, data, exc):
         raise exc
-        # return "fail"
 
     def _process(self, shared_storage, data):
         for attempt in range(self.max_retries):
@@ -91,28 +99,16 @@ class Node(BaseNode):
             except Exception as e:
                 if attempt == self.max_retries - 1:
                     return self.process_after_fail(shared_storage, data, e)
 
-
-class Flow(BaseNode):
-    def __init__(self, start_node=None):
-        self.start_node = start_node
-
-    def _process(self, shared_storage, _):
-        current_node = self.start_node
-        while current_node:
-            condition = current_node.run(shared_storage)
-            current_node = current_node.successors.get(condition, None)
-
-    def postprocess(self, shared_storage, prep_result, proc_result):
-        return None
-
-
 class AsyncNode(Node):
     """
     A Node whose postprocess step is async.
     You can also override process() to be async if needed.
     """
-
+    def postprocess(self, shared_storage, prep_result, proc_result):
+        # Not used in async workflow; define postprocess_async() instead.
+        raise NotImplementedError("AsyncNode requires postprocess_async, and should be run in an AsyncFlow")
+
     async def postprocess_async(self, shared_storage, prep_result, proc_result):
         """
         Async version of postprocess. By default, returns "default".
@@ -122,106 +118,155 @@ class AsyncNode(Node):
         return "default"
 
     async def run_async(self, shared_storage=None):
-        """
-        Async version of run.
-        If your process method is also async, you'll need to adapt accordingly.
-        """
-        # We can keep preprocess synchronous or make it async as well,
-        # depending on your usage. Here it's left as sync for simplicity.
         prep = self.preprocess(shared_storage)
-
-        # process can remain sync if you prefer, or you can define an async process.
         proc = self._process(shared_storage, prep)
-
-        # postprocess is async
         return await self.postprocess_async(shared_storage, prep, proc)
 
 
-class AsyncFlow(Flow):
+class BaseFlow(BaseNode):
     """
-    A Flow that can handle a mixture of sync and async nodes.
-    If the node is an AsyncNode, calls `run_async`.
-    Otherwise, calls `run`.
+    Abstract base flow that provides the main logic of:
+      - Starting from self.start_node
+      - Looping until no more successors
+    Subclasses must define how they *call* each node (sync or async).
     """
-    async def _process(self, shared_storage, _):
-        current_node = self.start_node
-        while current_node:
-            if hasattr(current_node, "run_async") and callable(current_node.run_async):
-                # If it's an async node, await its run_async
-                condition = await current_node.run_async(shared_storage)
-            else:
-                # Otherwise, assume it's a sync node
-                condition = current_node.run(shared_storage)
-
-            current_node = current_node.successors.get(condition, None)
-
-    async def run_async(self, shared_storage=None):
-        """
-        Kicks off the async flow. Similar to Flow.run,
-        but uses our async _process method.
-        """
-        prep = self.preprocess(shared_storage)
-        # Note: flows typically don't need a meaningful process step
-        # because the "process" is the iteration through the nodes.
-        await self._process(shared_storage, prep)
-        return self.postprocess(shared_storage, prep, None)
-
-class BatchNode(BaseNode):
-    def __init__(self, max_retries=5, delay_s=0.1):
-        super().__init__()
-        self.max_retries = max_retries
-        self.delay_s = delay_s
-
-    def preprocess(self, shared_storage):
-        return []
-
-    def process_one(self, shared_storage, item):
-        return None
-
-    def process_one_after_fail(self, shared_storage, item, exc):
-        print(f"[FAIL_ITEM] item={item}, error={exc}")
-        # By default, just return a "fail" marker. Could be anything you want.
-        return "fail"
-
-    async def _process_one(self, shared_storage, item):
-        for attempt in range(self.max_retries):
-            try:
-                return await self.process_one(shared_storage, item)
-            except Exception as e:
-                if attempt == self.max_retries - 1:
-                    # If out of retries, let a subclass handle what to do next
-                    return await self.process_one_after_fail(shared_storage, item, e)
-            await asyncio.sleep(self.delay_s)
-
-    async def _process(self, shared_storage, items):
-        results = []
-        for item in items:
-            r = await self._process_one(shared_storage, item)
-            results.append(r)
-        return results
-
-class BatchFlow(BaseNode):
     def __init__(self, start_node=None):
         super().__init__()
         self.start_node = start_node
 
+    def get_next_node(self, current_node, condition):
+        next_node = current_node.successors.get(condition, None)
+
+        if next_node is None and current_node.successors:
+            warnings.warn(f"Flow will end. Condition '{condition}' not found among possible conditions: {list(current_node.successors.keys())}")
+
+        return next_node
+
+    def run(self, shared_storage=None):
+        """
+        By default, do nothing (or raise).
+        Subclasses (Flow, AsyncFlow) will implement.
+        """
+        raise NotImplementedError("BaseFlow.run must be implemented by subclasses")
+
+    async def run_async(self, shared_storage=None):
+        """
+        By default, do nothing (or raise).
+        Subclasses (Flow, AsyncFlow) will implement.
+        """
+        raise NotImplementedError("BaseFlow.run_async must be implemented by subclasses")
+
+class Flow(BaseFlow):
+    """
+    Synchronous flow: each node is called with .run(shared_storage).
+    """
+    def _process_flow(self, shared_storage):
+        current_node = self.start_node
+        while current_node:
+            # Pass down the Flow's parameters to the current node
+            current_node.set_parameters(self.parameters)
+            # Synchronous run
+            condition = current_node.run(shared_storage)
+            # Decide next node
+            current_node = self.get_next_node(current_node, condition)
+
+    def run(self, shared_storage=None):
+        prep_result = self.preprocess(shared_storage)
+        self._process_flow(shared_storage)
+        return self.postprocess(shared_storage, prep_result, None)
+
+class AsyncFlow(BaseFlow):
+    """
+    Asynchronous flow: if a node has .run_async, we await it.
+    Otherwise, we fall back to .run.
+    """
+    async def _process_flow_async(self, shared_storage):
+        current_node = self.start_node
+        while current_node:
+            current_node.set_parameters(self.parameters)
+
+            # If node is async-capable, call run_async; otherwise run sync
+            if hasattr(current_node, "run_async") and callable(current_node.run_async):
+                condition = await current_node.run_async(shared_storage)
+            else:
+                condition = current_node.run(shared_storage)
+
+            current_node = self.get_next_node(current_node, condition)
+
+    async def run_async(self, shared_storage=None):
+        prep_result = self.preprocess(shared_storage)
+        await self._process_flow_async(shared_storage)
+        return self.postprocess(shared_storage, prep_result, None)
+
+    def run(self, shared_storage=None):
+        return asyncio.run(self.run_async(shared_storage))
+
+class BaseBatchFlow(BaseFlow):
+    """
+    Abstract base for a flow that runs multiple times (a batch),
+    once for each set of parameters or items from preprocess().
+    """
     def preprocess(self, shared_storage):
+        """
+        By default, returns an iterable of parameter-dicts or items
+        for the flow to process in a batch.
+        """
        return []
 
-    async def _process_one(self, shared_storage, param_dict):
-        node_parameters = self.parameters.copy()
-        node_parameters.update(param_dict)
+    def post_batch_run(self, all_results):
+        """
+        Hook for after the entire batch is done, to combine results, etc.
+        """
+        return all_results
 
-        if self.start_node:
-            current_node = self.start_node
-            while current_node:
-                # set the combined parameters
-                current_node.set_parameters(node_parameters)
-                current_node = await current_node._run_one(shared_storage or {})
+class BatchFlow(BaseBatchFlow, Flow):
+    """
+    Synchronous batch flow: calls the flow repeatedly
+    for each set of parameters/items in preprocess().
+    """
+    def run(self, shared_storage=None):
+        prep_result = self.preprocess(shared_storage)
+        all_results = []
 
-    async def _process(self, shared_storage, items):
-        results = []
-        for param_dict in items:
-            await self._process_one(shared_storage, param_dict)
-            results.append(f"Ran sub-flow for param_dict={param_dict}")
-        return results
\ No newline at end of file
+        # For each set of parameters (or items) we got from preprocess
+        for param_dict in prep_result:
+            # Merge param_dict into the Flow's parameters
+            original_params = self.parameters.copy()
+            self.parameters.update(param_dict)
+
+            # Run from the start node to the end
+            self._process_flow(shared_storage)
+
+            # Optionally collect results from shared_storage or a custom method
+            all_results.append(f"Finished run with parameters: {param_dict}")
+
+            # Reset the parameters if needed
+            self.parameters = original_params
+
+        # Postprocess the entire batch
+        result = self.post_batch_run(all_results)
+        return self.postprocess(shared_storage, prep_result, result)
+
+class BatchAsyncFlow(BaseBatchFlow, AsyncFlow):
+    """
+    Asynchronous batch flow: calls the flow repeatedly in an async manner
+    for each set of parameters/items in preprocess().
+    """
+    async def run_async(self, shared_storage=None):
+        prep_result = self.preprocess(shared_storage)
+        all_results = []
+
+        for param_dict in prep_result:
+            original_params = self.parameters.copy()
+            self.parameters.update(param_dict)
+
+            await self._process_flow_async(shared_storage)
+
+            all_results.append(f"Finished async run with parameters: {param_dict}")
+
+            # Reset back to original parameters if needed
+            self.parameters = original_params
+
+        # Combine or process results at the end
+        result = self.post_batch_run(all_results)
+        return self.postprocess(shared_storage, prep_result, result)
\ No newline at end of file
diff --git a/minillmflow/__pycache__/__init__.cpython-39.pyc b/minillmflow/__pycache__/__init__.cpython-39.pyc
index 912c60f9adb78562f9db0da84fe3b270c18781f1..12b1cf933cc1098859fda47a847d544cb78f75d3 100644
GIT binary patch
[base85 delta for the compiled bytecode omitted]