---
layout: default
title: "Node"
parent: "Core Abstraction"
nav_order: 1
---

# Node

A **Node** is the smallest building block. Each Node has three steps, run in order: `prep->exec->post`.

1. `prep(shared)`
   - **Read and preprocess data** from the `shared` store.
   - Examples: *query a DB, read files, or serialize data into a string*.
   - Return `prep_res`, which is used by `exec()` and `post()`.

2. `exec(prep_res)`
   - **Execute compute logic**, with optional retries and error handling (below).
   - Examples: *(mostly) LLM calls, remote APIs, tool use*.
   - ⚠️ This step is for compute only and must **NOT** access `shared`.
   - ⚠️ If retries are enabled, make sure the implementation is idempotent.
   - Return `exec_res`, which is passed to `post()`.

3. `post(shared, prep_res, exec_res)`
   - **Postprocess and write data** back to `shared`.
   - Examples: *update a DB, change states, log results*.
   - **Decide the next action** by returning a *string* (`action = "default"` if `None` is returned).

> **Why 3 steps?** To enforce the principle of *separation of concerns*: data storage and data processing are handled separately.
>
> All steps are *optional*. E.g., you can implement only `prep` and `post` if you just need to process data.
{: .note }

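The three steps can be pictured with a minimal sketch. `SketchNode` and `WordCount` are hypothetical names made up for illustration; `SketchNode` is a stand-in, not the actual `Node` base class, and it assumes that `run()` maps a `None` result from `post()` to `"default"`.

```python
class SketchNode:
    """Hypothetical stand-in for the Node base class (illustration only)."""

    def prep(self, shared):
        return None

    def exec(self, prep_res):
        return None

    def post(self, shared, prep_res, exec_res):
        return None

    def run(self, shared):
        prep_res = self.prep(shared)                    # read from shared
        exec_res = self.exec(prep_res)                  # compute only, no shared access
        action = self.post(shared, prep_res, exec_res)  # write back to shared
        # Assumption: a missing return value means the "default" action
        return action if action is not None else "default"


class WordCount(SketchNode):
    def prep(self, shared):
        return shared["text"]

    def exec(self, prep_res):
        return len(prep_res.split())

    def post(self, shared, prep_res, exec_res):
        shared["word_count"] = exec_res
        # Only long texts pick a non-default action
        return "long" if exec_res > 100 else None


shared = {"text": "nodes read compute write"}
action = WordCount().run(shared)
print(action, shared["word_count"])
```

Note that `exec()` only sees what `prep()` returned, so it can be retried or tested without touching the `shared` store.
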
### Fault Tolerance & Retries

You can **retry** `exec()` when it raises an exception by setting two parameters when defining the Node:

- `max_retries` (int): Maximum number of times to run `exec()`. The default is `1` (**no** retry).
- `wait` (int): The time to wait (in **seconds**) before the next retry. By default, `wait=0` (no waiting).
`wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off.

```python
my_node = SummarizeFile(max_retries=3, wait=10)
```

When an exception occurs in `exec()`, the Node automatically retries until:

- It either succeeds, or
- The Node has already retried `max_retries - 1` times and fails on the last attempt.

You can get the current retry count (0-based) from `self.cur_retry`.

```python
class RetryNode(Node):
    def exec(self, prep_res):
        # cur_retry is 0 on the first attempt, 1 on the first retry, and so on
        print(f"Retry {self.cur_retry} times")
        raise Exception("Failed")
```

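The retry rules above can be approximated by a simple loop. This is a sketch under stated assumptions, not the library's implementation: `RetrySketch`, `run_exec`, and `Flaky` are hypothetical names, while `max_retries`, `wait`, and `cur_retry` follow the description in this section.

```python
import time


class RetrySketch:
    """Hypothetical illustration of the retry loop (not the library's code)."""

    def __init__(self, max_retries=1, wait=0):
        self.max_retries = max_retries
        self.wait = wait
        self.cur_retry = 0

    def exec(self, prep_res):
        raise NotImplementedError

    def run_exec(self, prep_res):
        # exec() runs at most max_retries times in total
        for self.cur_retry in range(self.max_retries):
            try:
                return self.exec(prep_res)
            except Exception:
                if self.cur_retry == self.max_retries - 1:
                    raise  # the last attempt failed, give up
                if self.wait > 0:
                    time.sleep(self.wait)  # back off before the next attempt


class Flaky(RetrySketch):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.attempts = []

    def exec(self, prep_res):
        self.attempts.append(self.cur_retry)
        if self.cur_retry < 2:
            raise RuntimeError("rate limited")  # fail on the first two attempts
        return f"ok: {prep_res}"


node = Flaky(max_retries=3, wait=0)
result = node.run_exec("payload")
print(result, node.attempts)
```

With `max_retries=3` the third attempt succeeds; with `max_retries=2` the same node would raise `RuntimeError` after its second attempt.
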
### Graceful Fallback

To **gracefully handle** the exception (after all retries) rather than raising it, override:

```python
def exec_fallback(self, shared, prep_res, exc):
    raise exc
```

By default, it just re-raises the exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.

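To make the hand-off concrete, here is a small sketch of how a fallback value could flow into `post()`. `FallbackSketch` and `AlwaysFails` are hypothetical classes written for this illustration, and the `shared["input"]` key is made up; only the `exec_fallback(self, shared, prep_res, exc)` signature is taken from the text above.

```python
class FallbackSketch:
    """Hypothetical stand-in showing where exec_fallback() fits (illustration only)."""

    def __init__(self, max_retries=1):
        self.max_retries = max_retries

    def exec(self, prep_res):
        return None

    def exec_fallback(self, shared, prep_res, exc):
        raise exc  # default behaviour: re-raise

    def post(self, shared, prep_res, exec_res):
        return None

    def run(self, shared):
        prep_res = shared.get("input")
        exec_res = None
        for attempt in range(self.max_retries):
            try:
                exec_res = self.exec(prep_res)
                break
            except Exception as exc:
                if attempt == self.max_retries - 1:
                    # All attempts used up: the fallback's return value
                    # takes the place of exec_res
                    exec_res = self.exec_fallback(shared, prep_res, exc)
        return self.post(shared, prep_res, exec_res)


class AlwaysFails(FallbackSketch):
    def exec(self, prep_res):
        raise ValueError("boom")

    def exec_fallback(self, shared, prep_res, exc):
        return f"fallback after {type(exc).__name__}"

    def post(self, shared, prep_res, exec_res):
        shared["result"] = exec_res


shared = {"input": "x"}
AlwaysFails(max_retries=2).run(shared)
print(shared["result"])
```

Without the `exec_fallback` override, the same node would propagate the `ValueError` to the caller instead of writing a result.
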
### Example: Summarize file

```python
class SummarizeFile(Node):
    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        if not prep_res:
            return "Empty file content"
        prompt = f"Summarize this text in 10 words: {prep_res}"
        summary = call_llm(prompt)  # might fail
        return summary

    def exec_fallback(self, shared, prep_res, exc):
        # Provide a simple fallback instead of crashing
        return "There was an error processing your request."

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        # Return "default" by not returning

summarize_node = SummarizeFile(max_retries=3)

# node.run() calls prep->exec->post
# If exec() fails, it is attempted up to 3 times in total before exec_fallback() is called
action_result = summarize_node.run(shared)

print("Action returned:", action_result)  # "default"
print("Summary stored:", shared["summary"])
```