3.4 KiB
| layout | title | parent | nav_order |
|---|---|---|---|
| default | Node | Core Abstraction | 1 |
Node
A Node is the smallest building block. Each Node has 3 steps prep->exec->post:
-
prep(shared)- Read and preprocess data from
sharedstore. - Examples: query DB, read files, or serialize data into a string.
- Return
prep_res, which is used byexec()andpost().
- Read and preprocess data from
-
exec(prep_res)- Execute compute logic, with optional retries and error handling (below).
- Examples: (mostly) LLM calls, remote APIs, tool use.
- ⚠️ This shall be only for compute and NOT access
shared. - ⚠️ If retries enabled, ensure idempotent implementation.
- Return
exec_res, which is passed topost().
-
post(shared, prep_res, exec_res)- Postprocess and write data back to
shared. - Examples: update DB, change states, log results.
- Decide the next action by returning a string (
action = "default"if None).
- Postprocess and write data back to
Why 3 steps? To enforce the principle of separation of concerns. The data model are operated separately from the business logic on them.
All steps are optional. E.g., you can only implement
prepandpostif you just need to process data. {: .note }
Fault Tolerance & Retries
You can retry exec() if it raises an exception via two parameters when define the Node:
max_retries(int): Max times to runexec(). The default is1(no retry).wait(int): The time to wait (in seconds) before next retry. By default,wait=0(no waiting).waitis helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off.
my_node = SummarizeFile(max_retries=3, wait=10)
When an exception occurs in exec(), the Node automatically retries until:
- It either succeeds, or
- The Node has retried
max_retries - 1times already and fails on the last attempt.
You can get the current retry times (0-based) from self.cur_retry.
class RetryNode(Node):
def exec(self, prep_res):
print(f"Retry {self.cur_retry} times")
raise Exception("Failed")
Graceful Fallback
To gracefully handle the exception (after all retries) rather than raising it, override:
def exec_fallback(self, shared, prep_res, exc):
raise exc
By default, it just re-raises exception. But you can return a fallback result instead, which becomes the exec_res passed to post().
Example: Summarize file
class SummarizeFile(Node):
def prep(self, shared):
return shared["data"]
def exec(self, prep_res):
if not prep_res:
return "Empty file content"
prompt = f"Summarize this text in 10 words: {prep_res}"
summary = call_llm(prompt) # might fail
return summary
def exec_fallback(self, shared, prep_res, exc):
# Provide a simple fallback instead of crashing
return "There was an error processing your request."
def post(self, shared, prep_res, exec_res):
shared["summary"] = exec_res
# Return "default" by not returning
summarize_node = SummarizeFile(max_retries=3)
# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)
print("Action returned:", action_result) # "default"
print("Summary stored:", shared["summary"])