From dcf72251310b39da20297d7d45ecaebe07d6d8fc Mon Sep 17 00:00:00 2001 From: zachary62 Date: Wed, 25 Dec 2024 19:49:54 +0000 Subject: [PATCH] bad attempt for async --- .gitignore | 71 ++++++ minillmflow/__init__.py | 231 +++++++++++------- .../__pycache__/__init__.cpython-39.pyc | Bin 7862 -> 8594 bytes 3 files changed, 220 insertions(+), 82 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..004e932 --- /dev/null +++ b/.gitignore @@ -0,0 +1,71 @@ +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# IDE specific files +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# Node +node_modules/ +npm-debug.log +yarn-debug.log +yarn-error.log +.env +.env.local +.env.development.local +.env.test.local +.env.production.local + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +venv/ +ENV/ + +# Logs and databases +*.log +*.sql +*.sqlite + +# Build output +dist/ +build/ +out/ + +# Coverage reports +coverage/ +.coverage +.coverage.* +htmlcov/ + +# Misc +*.bak +*.tmp +*.temp \ No newline at end of file diff --git a/minillmflow/__init__.py b/minillmflow/__init__.py index 2a7ea3d..9d560b1 100644 --- a/minillmflow/__init__.py +++ b/minillmflow/__init__.py @@ -1,93 +1,56 @@ -import asyncio - -def _wrap_async(fn): +class BaseNode: """ - Given a synchronous function fn, return a coroutine (async function) that - simply awaits the (synchronous) call. + A base node that provides: + - preprocess() + - process() + - postprocess() + - run() -- just runs itself (no chaining) """ - async def _async_wrapper(self, *args, **kwargs): - return fn(self, *args, **kwargs) - return _async_wrapper - - -class NodeMeta(type): - """ - Metaclass that converts certain methods into async if they are not already. - """ - def __new__(mcs, name, bases, attrs): - # Add ANY method names you want to auto-wrap here: - methods_to_wrap = ( - "preprocess", - "process", - "postprocess", - "process_after_fail", - "process_one", - "process_one_after_fail", - ) - - for attr_name in methods_to_wrap: - if attr_name in attrs: - # If it's not already a coroutine function, wrap it - if not asyncio.iscoroutinefunction(attrs[attr_name]): - old_fn = attrs[attr_name] - attrs[attr_name] = _wrap_async(old_fn) - - return super().__new__(mcs, name, bases, attrs) - - -async def hello(text): - print("Start") - await asyncio.sleep(1) # Simulate some async work - print(text) - -class BaseNode(metaclass=NodeMeta): def __init__(self): self.parameters = {} self.successors = {} - # Syntactic sugar for chaining + def set_parameters(self, params): + self.parameters.update(params) + def add_successor(self, node, condition="default"): - # warn if we're overwriting an existing successor if condition in self.successors: print(f"Warning: overwriting existing successor for condition '{condition}'") self.successors[condition] = node return node - - # By default these are already async. If a subclass overrides them - # with non-async definitions, they'll get wrapped automatically. + def preprocess(self, shared_storage): return None - def process(self, shared_storage, data): + def process(self, shared_storage, prep_result): return None - def postprocess(self, shared_storage, preprocess_result, process_result): + def _process(self, shared_storage, prep_result): + # Could have retry logic or other wrap logic + return self.process(shared_storage, prep_result) + + def postprocess(self, shared_storage, prep_result, proc_result): return "default" - async def _process(self, shared_storage, data): - return await self.process(shared_storage, data) - - async def _run_one(self, shared_storage): - preprocess_result = await self.preprocess(shared_storage) - process_result = await self._process(shared_storage, preprocess_result) - condition = await self.postprocess(shared_storage, preprocess_result, process_result) or "default" - return self.successors.get(condition) - def run(self, shared_storage=None): - asyncio.run(self._run_async(shared_storage)) + if not shared_storage: + shared_storage = {} - async def run_in_jupyter(self, shared_storage=None): - await self._run_async(shared_storage) - - async def _run_async(self, shared_storage): - current_node = self - while current_node: - current_node = await current_node._run_one(shared_storage) + prep = self.preprocess(shared_storage) + proc = self._process(shared_storage, prep) + return self.postprocess(shared_storage, prep, proc) def __rshift__(self, other): + """ + For chaining with >> operator, e.g. node1 >> node2 + """ return self.add_successor(other) def __gt__(self, other): + """ + For chaining with > operator, e.g. node1 > "some_condition" + then >> node2 + """ if isinstance(other, str): return _ConditionalTransition(self, other) elif isinstance(other, BaseNode): @@ -95,49 +58,153 @@ class BaseNode(metaclass=NodeMeta): raise TypeError("Unsupported operand type") def __call__(self, condition): + """ + For node("condition") >> next_node syntax + """ return _ConditionalTransition(self, condition) + class _ConditionalTransition: + """ + Helper for Node > 'condition' >> AnotherNode style + """ def __init__(self, source_node, condition): self.source_node = source_node self.condition = condition - def __gt__(self, target_node): - if not isinstance(target_node, BaseNode): - raise TypeError("Target must be a BaseNode") + def __rshift__(self, target_node): return self.source_node.add_successor(target_node, self.condition) +# robust running process class Node(BaseNode): - def __init__(self, max_retries=5, delay_s=0.1): + def __init__(self, max_retries=1): super().__init__() self.max_retries = max_retries - self.delay_s = delay_s def process_after_fail(self, shared_storage, data, exc): raise exc # return "fail" - async def _process(self, shared_storage, data): + def _process(self, shared_storage, data): for attempt in range(self.max_retries): try: - return await super()._process(shared_storage, data) + return super()._process(shared_storage, data) except Exception as e: if attempt == self.max_retries - 1: - return await self.process_after_fail(shared_storage, data, e) - await asyncio.sleep(self.delay_s) + return self.process_after_fail(shared_storage, data, e) -class Flow(BaseNode): +class InteractiveNode(BaseNode): + """ + Interactive node. Instead of returning a condition, + we 'signal' the condition via a callback provided by the Flow. + """ + + def postprocess(self, shared_storage, prep_result, proc_result, next_node_callback): + """ + We do NOT return anything. We call 'next_node_callback("some_condition")' + to tell the Flow which successor to pick. + """ + # e.g. here we pick "default", but in real usage you'd do logic or rely on user input + next_node_callback("default") + + def run(self, shared_storage=None): + """ + Run just THIS node (no chain). + """ + if not shared_storage: + shared_storage = {} + + # 1) Preprocess + prep = self.preprocess(shared_storage) + + # 2) Process + proc = self._process(shared_storage, prep) + + # 3) Postprocess with a dummy callback + def dummy_callback(condition="default"): + print("[Dummy callback] To run the flow, pass this node into a Flow instance.") + + self.postprocess(shared_storage, prep, proc, dummy_callback) + + def is_interactive(self): + return True + +class Flow: + """ + A Flow that runs through a chain of nodes, from a start node onward. + Each iteration: + - preprocess + - process + - postprocess + The postprocess is given a callback to choose the next node. + We'll 'yield' the current node each time, so the caller can see progress. + """ def __init__(self, start_node=None): - super().__init__() self.start_node = start_node - async def _process(self, shared_storage, _): - if self.start_node: - current_node = self.start_node - while current_node: - current_node = await current_node._run_one(shared_storage or {}) - return "Flow done" + def run(self, shared_storage=None): + if shared_storage is None: + shared_storage = {} + current_node = self.start_node + print("hihihi") + + while current_node: + # 1) Preprocess + prep_result = current_node.preprocess(shared_storage) + print("prep") + # 2) Process + proc_result = current_node._process(shared_storage, prep_result) + + # Prepare next_node variable + next_node = [None] + + # We'll define a callback only if this is an interactive node. + # The callback sets next_node[0] based on condition. + def next_node_callback(condition="default"): + nxt = current_node.successors.get(condition) + next_node[0] = nxt + + # 3) Check if it's an interactive node + is_interactive = ( + hasattr(current_node, 'is_interactive') + and current_node.is_interactive() + ) + + if is_interactive: + print("ineractive") + # + # ---- INTERACTIVE CASE ---- + # + # a) yield so that external code can do UI, etc. + # yield current_node, prep_result, proc_result, next_node_callback + + # # b) Now we do postprocess WITH the callback: + # current_node.postprocess( + # shared_storage, + # prep_result, + # proc_result, + # next_node_callback + # ) + # # once postprocess is done, next_node[0] should be set + + else: + # + # ---- NON-INTERACTIVE CASE ---- + # + # We just call postprocess WITHOUT callback, + # and let it return the condition string: + condition = current_node.postprocess( + shared_storage, + prep_result, + proc_result + ) + # Then we figure out the next node: + next_node[0] = current_node.successors.get(condition, None) + + # 5) Move on to the next node + current_node = next_node[0] + class BatchNode(BaseNode): def __init__(self, max_retries=5, delay_s=0.1): super().__init__() diff --git a/minillmflow/__pycache__/__init__.cpython-39.pyc b/minillmflow/__pycache__/__init__.cpython-39.pyc index 57d50b08411c435ab18b53be7c516393834bf612..1400bbbd090d158bca1d472f5a6cfca4e4a66302 100644 GIT binary patch literal 8594 zcmbVRON<=Xb**1_SND84{D`7HvCFiQVaMc%5|mhxDVh{N5@TxYnBy>MM=2M(YG#`1 z>1w{JAvuknz%hl0Uc`Vm25iHC2FQk4WS_N{+1Q%^DP$955eV5OivR)=z~dV#=o;seH=8dpdInjfJ&u{vy7 zp^cV}mWh@V*3ha&#uK|^p^b$=Xk9@``luqKV}?e@3eAqKDd{+xhE6SfF>HkHM`ouUHpA9OMyC-jgp2s= zhD+fx{x-v7;R^n?!k5Am;Yo~K2v3F2p=U8X9X^l0OW~RD1^itOkAs$%Gxwb!iPqyV z8vVHC;koMFLx(p&kC$!-sW(*d!(JFAZ^>uRyMUeuo!ux&&b2F@b3JjA&W}^OgL7@~ z!UgaCZjyRvC0;L0qW-3LZV-Fj?Vva44Yu^UT@(B$9==;B*HDrbR6S#C___we?ikO^ z^|qC@hJgxpqBK%T)=GA}Br{gYGc$9z%QHhYF!!^IuWrXX(W|5F*WbAK=Bqo{^!Dxd zZ}#Iy^4{<7Z1Uar@gep7wc%do7L6tepa2- zo55~B?Q);u;X4l^#GS*-*dlok4fSGbhUVD%IdtI{_7jjgHu+_2UBVmW#$F$N^=_be zRd0Fm!$>_+y%Y^EdfZForDVOg87r?F4?+xx2j1#u<|@v|ZL``?^#*C%)~9tSb6K+- zI(zJ;APoI-SxP`cb{J;MTtOL~F3&zSaPB%NNOEG*P+o!T_AF=oaQIE^(5=ASGHn0%k(_vh`d8KF*L&1`{d_$o?pX|ww`ae`@6%_c+*s?1;wl@o9l3Wo zu6iE5JagPV;>-)f37mKsTrmCArF4n>HWRmJ>_eo05)smf0K*9sU5`Wq@3Bm!xQVlEubnj4sV zL#bFPkDW9%x)R{7`~DXA`5Jn)K-5|QF8&)GD+NN2Ihv=G3Gg)MBxK@imk?dRys#Z^b#Y)ud zHjMBw+R6cpuzf}#pGj>?)4*?HV4ooK`@H8jRw~jgY1iUlluwT zLK5$)ZX~xU_m!JZbpewOyFlUZyZAWqS-yC=Nplj!Hf%~zs30Oi<(1c4a;XuccK86Brr}i|ji?&KXq-|$4U=jeCdXX*c1dm}6Q`L)-LQ9Ds@MoJ) z%zlt0P@#ziaMHI>^2-LYERmwg)fPv@81el|8%<7)ZQyU=l*+2ph7QhYklE4WIXibE zcLDrh6P~xf8T9%aq$0oA=Y@v(F;gl)6TN^UJR0GJD0d$=;kxm`{e61``vIQW+DzX? zEMS6C$4H$$X!O2g8lyMIFy8m!xt=jTwA5e1w$#LsI|ue%qi*~;yuE$ruN*z|ZrOg& z()|~Wiw55jO2AKf6%@b<)PVEw0LgQ8krh!XYuVhi=w*!{O{1M* znwgQJFjcOI3;Y>MLd-MFWz#ZGp+D2Y~pWd{ntzBvUT@2_wm^{Z5XY9!hWYhfL6WlbVch<)fl@LSojTLavD&`o;} zbvXHb0l7-An5H%F7Hn!1z}#+nkn3HEA-rG`A)J?ak0Nh1>1_>y{wl)IX!6;6*b6uu zCh1<#{jdm6L+{?63}g(qrty^mRXlvBFkV9u#*ZDk8;HTQyYaU&1@W(UH}b;RTmR-p zPM#MG_R?*{Hf#LEeZAE|VNb<0l8$q;u19;dTrrKk6n9gS=RMl)b+^4Tl<_cN*z10% z$tDeAO4qdlRz-_mvU(9!c4D4{%FWBA(@)*xaYYI1qn9doq4n`7hndWxqzJQU_CMcY zhC~n?&3$L$I**+LXr#zSeRQ#8#kY3{I`H3k@75g=h)LMro;m-a(f=JDh>DXkU1RDP z8ezrg=6A2}?(FQ9g7Ja35i`9YTxT$U-Wvie(d;F9V}zbDB#8p!6^`E8GgBSI>d?Kz ze4*u#dz6?~?Vmvj49lwJP^#xW}Sk2fmx zY25&eE1%U1otr3HP>yORP61dOg%PQS5T5yP)(s#BZQ#_N;$~R z6?jEOhk@pO2j{>G5z4Fj{w33e8>4)vcz0_Xf+5irVQK&anw7EpCSnhMy1=ytc`7@0bv zBf&sO(zM)-R*6!3y{I4N@VTp0G|0)0NNU>KiOzcopo!>;W#PeggMpVs*j&Y1$UG!^ zwdr7*62Nked}l189o5onXq!m8kwxP1=o$GE|K>+rB^RnCf$l@IXp|0$=$ASC zA5e|qrtBhW&{6z-Xs+hWh)b62UXBN3W%O3&&yK4vj7Ihi2IP*!bAow8Wol z?mMaT6sg@OR(ub$YX`=DZCq2|$NYw#&wRkx*>wICZpXmGn7X(Y$Pw(l1d7T72!Q6zZ5l1xb$I0O2rx?SCjar=9@U8Jque)?aIf(1f1UaX zd*l|UBtqf;=L8cHa!&Aff=};Y*N?cr_NAO=%U0~`qD(!!A3h0>TB6f22^@${3?_(8 z22hGweFHv~&g$3mtbQZU>LUO_fYF2?tz^x2g0#C$`~4|3;y-@&Q$GLr^(j}HqEv`f zLVsa4++d>Dhb9!sKS%qv;#t0i@yr+ZgFQbfqDywF_gT$&vz+p46k!K*vB-&^C~2I; z9VDlC_znQp7(zB}6@+LRr54Ph1@mwJj8dQl@_soV)_%KeYjHA0W0?cw-kI4_ry~J_ zemsc&rLy&N)vX1}IbtsR$?LhsX-r1YUD5gUvHf%77mm0tNH4aX_0jR~-n@G2wts8m z#`_=8gzUUMa*@Gf8tLuKVJxM-jwPz0m|91N`U6%I8Krt;r;8{HBvqoZf5o?4+fP{| z@6Z!ehEGWo55mCgmt5l~s49Wk-ykqMQ@DKx)-8b4dqyQVv-f_K2WQ`!3eL85;tz6& z;H;Q68=|E(pe(TUz|XDoAU)>P`I7@ft%O^+&A!1QlX3WWTz-7y))a z!cSNf=PB7Yg#O0tIj z@gNad$;{aduLO_(iAiN{+BHuwH(k}U;Z}T<=uN1 zHW@+kS5Em&R*v$v`VLmE9#q{$hhpAG{ROLOa1m9h!6r^A;aOv^aYix6v|7dQ?(G5_yS(BWfw5D#Vw5QRZIKiT=x+FFL*a&hp~Q M@s(GWZ?ByFe?QvYkpKVy literal 7862 zcmb_hTaO$^74GZY_Tu&0aU7G7hJ+*o$tHvV0Xe}TS3|M{+rcm)wAxd>yW`AGkE?rQ z?`ReY96=r+LPA1ZMDQXW@Dn^BAztyn=vN+4{sIyb-*@^l-MeE433|7xtE%gA>Ri5a zswP-k>KJ(b{lVLtCqHHw|6ry1xTu^#ihqJk8Oj_QJwsW_e%I)kFB{5H?z@I^L+iTT zvsFVi(e9wVrP^q3hqmhc)b6=xSx}2;S-fub8mKL)Wz?2K7vq|!A5kl)uSmUx`cbut z`l{61s2@|uQ9myAjxx_1-Mc;nMaFvH`=9HuC$zI39 z^YY-GaO4GEygTY|>1Y(~#NNivsGkg?k+(5=)YD1Zbzj6&~`fW6CZT`$=R z5}7R?Y>$V#UT`%SBr%%Nw=!|p>j%T(n)IxZOHv)A-yy{(kiBP2jZF~xd-I$zFs9ZY z%=ba`O>_MwC{LX@9B!mepf}^R@%B}|ZW!)PqvWQZx-yVzx zufF=)OT*}@l!wFZ4VHZ$9}?eR8}FvezU;`qs)O-3)Gd-h78>T{0cp&>ubm!7*g^d4 zTD`w4+qGnzrW#3uZnD%ymM&B#A&D_JYq*P+UckH0O7&qEKCy=^gnlWbXDPF1ixjDI zt)BB^aIEJ-l1|!Mk5u?tm;{qIgtsjAhe1pU1PKH;dMDJ7M<01G7{aQQ+H~=W9U|5 z+%Tv1)V;^pH{(`T+G`}%v~gY9Pa9WSiGAI;;Y^LGIkl$tzJsk;>)l4$8S8MYqkb62 zX|pISjH5UyDyw

poe`y0Vvm@X7;(I`w$R4SEDT{msYb_Y>*=(81)?}rS=UPESBW%jV3>aa$tvn&8BU%q<;(WN;jmbmLeuv#6CxH-T0#=wt>y9!I3?7EzxND|P z5?)I(o(N%LEAAHPjUM81R5JrCSuu4Nr9P|Ghl9=_#S>&RI|0fWz~K7?lg*x!qp_P| zaj&5|Y5_f(YEdoW-BQcy2;Ob=aY0+N*HQNi&YHaib(cDUZ;R?~brSC-b&ooQ_p625%!!id6Rga(m^D{0RFxyiJ ze8klH4gA_~ZORg+i7!*@G)f}X$rmpMdNdeqe%XtFDOdFX4q?*^uMJ`;l(c&rktXmc zbbupy_+RG307w$7DcY_rJA+YyXKB0acyOyrfl_`s>1Q!ACO8{b2TIW~Cnw51*E;qE zyx9>C%hW>IFE?ItURJwlWuz>}TL3?-mP9((3=dGEX(nGrD*VE+a^)FZ9b)sHmd!Yz zpTeA}qXHNU1(RiBoCJnvDl4v3RupvXi|EmV56kuC=rFByy@n4mYuCxNNme{MquSSD z3>`_A3((B!2gx~tdCnje3^+a`X9mrOI6$&)anvfZsc}Qle&5PKqc8wdbFcdI%<4=4 z?B!I6^hB&RP0zp91kO;6N!EP{g>2mds`jAn@J>+lf{r~KEmrP_?2NYY)8f~R%Qk$J zvkzwKFJh{$E1ICG3Yshtolfn|Fu^D8RwSyQX6Z}JsI-T7rFTX?JQbu(AqjU$K%Kjp1IP$4^|SasZ@qMx-%fNwgNQ<3@<8{0>*g$-Gk@Mt=Us=5Te*48~=&ux&3y@YX%|NCJPOug~9WPWzIK6pNA6P54iYYfXTl?cL=0ibDjSy z`hHZj$Ltf#9%IIxX6PVrCg>nIL9G)D(_djpjDqT+UtxBZ*^D7l5(hx`eFiB$g$&`L z+iAD#{IAimJ9f)Otto6qG}T4`k%j$z^shbK))}PuUSx*?C0jWNWpIFni*IN$>=2A| z0#Ynd45I>Hq6WVf#Vo|pj_!wh?VsaAFJ-Z+jMg)qV}`k)L75Jaa?t1?ex2NR+g(xZ*Wo$l>62)OO)6%YCDb03k2C5K{k8kE-(d+E? zO=fah5eCTKX=xEs$zCYU63LvYNU?A9s)t1k9jA5%a)J-S*lJv)zkn{O9bW4nm${kLT}=S&|KO}qr7yXy~s6Rbi&XN zF-O#EqhvD{9-kWErY4Jg_v|Z+a*0=b~=wFDL0K*g1`{na4pZ_)l|>RB^;GQ5=kDO5fJ7A5@E=g@JUtlvj_ z7425boDyqLi z%B}9WH9^!c<55X)<+`!Pufi4F;vC#}#TEPruHYm2S2aa$T=M4P