callouts dont work...

zachary62 2025-01-02 05:19:25 +00:00
parent b88c319778
commit ec8d5b0c48
14 changed files with 1045 additions and 13 deletions


@@ -5,6 +5,9 @@ description: Minimalist LLM Framework in 100 Lines, Enabling LLMs to Program The
 # Theme settings
 remote_theme: just-the-docs/just-the-docs
+# Navigation
+nav_sort: case_sensitive
 # Aux links (shown in upper right)
 aux_links:
   "View on GitHub":
@@ -18,9 +21,3 @@ mermaid:
   # Default configuration
   config: |
     directionLR
-callouts:
-  warning:
-    title: Warning
-    color: red

docs/async.md Normal file

@@ -0,0 +1,61 @@
---
layout: default
title: "(Advanced) Async"
parent: "Core Abstraction"
nav_order: 5
---
# (Advanced) Async
**Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for:
1. **`prep_async()`**
   - For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way.
2. **`exec_async()`**
   - Typically used for async LLM calls.
3. **`post_async()`**
   - For *awaiting user feedback*, *coordinating across multiple agents*, or any additional async steps after `exec_async()`.
**Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes.
### Example
```python
import asyncio

class SummarizeThenVerify(AsyncNode):
    async def prep_async(self, shared):
        # Example: read a file asynchronously
        doc_text = await read_file_async(shared["doc_path"])
        return doc_text

    async def exec_async(self, prep_res):
        # Example: async LLM call
        summary = await call_llm_async(f"Summarize: {prep_res}")
        return summary

    async def post_async(self, shared, prep_res, exec_res):
        # Example: wait for user feedback
        decision = await gather_user_feedback(exec_res)
        if decision == "approve":
            shared["summary"] = exec_res
            return "approve"
        return "deny"

summarize_node = SummarizeThenVerify()
final_node = Finalize()

# Define transitions
summarize_node - "approve" >> final_node
summarize_node - "deny" >> summarize_node  # retry

flow = AsyncFlow(start=summarize_node)

async def main():
    shared = {"doc_path": "document.txt"}
    await flow.run_async(shared)
    print("Final Summary:", shared.get("summary"))

asyncio.run(main())
```

docs/batch.md Normal file

@@ -0,0 +1,107 @@
---
layout: default
title: "Batch"
parent: "Core Abstraction"
nav_order: 4
---
# Batch
**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Handy for:
- **Chunk-based** processing (e.g., splitting large texts).
- **Multi-file** processing.
- **Iterating** over lists of params (e.g., user queries, documents, URLs).
## 1. BatchNode
A **BatchNode** extends `Node` but changes `prep()` and `exec()`:
- **`prep(shared)`**: returns an **iterable** (e.g., list, generator).
- **`exec(item)`**: called **once** per item in that iterable.
- **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**.
### Example: Summarize a Large File
```python
class MapSummaries(BatchNode):
    def prep(self, shared):
        # Suppose we have a big file; chunk it
        content = shared["data"].get("large_text.txt", "")
        chunk_size = 10000
        chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
        return chunks

    def exec(self, chunk):
        prompt = f"Summarize this chunk in 10 words: {chunk}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res_list):
        combined = "\n".join(exec_res_list)
        shared["summary"]["large_text.txt"] = combined
        return "default"

map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
```
---
## 2. BatchFlow
A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
### Example: Summarize Many Files
```python
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # Return a list of param dicts (one per file)
        filenames = list(shared["data"].keys())  # e.g., ["file1.txt", "file2.txt", ...]
        return [{"filename": fn} for fn in filenames]

# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
summarize_file = SummarizeFile(start=load_file)

# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
```
### Under the Hood
1. `prep(shared)` returns a list of param dicts—e.g., `[{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...]`.
2. The **BatchFlow** loops through each dict. For each one:
   - It merges the dict with the BatchFlow's own `params`.
   - It calls `flow.run(shared)` using the merged result.
3. This means the sub-Flow is run **repeatedly**, once for every param dict (see the sketch below).
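Conceptually, the loop looks roughly like this (a simplified sketch, not the framework's actual code; `sub_flow` is a hypothetical attribute standing in for the wrapped Flow):
```python
# Simplified sketch of what a BatchFlow does under the hood (illustration only).
def run_batch_flow(batch_flow, shared):
    param_dicts = batch_flow.prep(shared)             # e.g., [{"filename": "file1.txt"}, ...]
    for param_dict in param_dicts:
        merged = {**batch_flow.params, **param_dict}  # parent params + per-item params
        batch_flow.sub_flow.set_params(merged)        # hypothetical attribute for the wrapped Flow
        batch_flow.sub_flow.run(shared)               # rerun the whole sub-Flow for this item
```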
---
## 3. Nested or Multi-Level Batches
You can nest a **BatchFlow** inside another **BatchFlow**. For instance:
- **Outer** batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- **Inner** batch: returns a list of per-file param dicts.
At each level, **BatchFlow** merges its own param dict with the parent's. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain, so a nested structure can keep track of the entire context (e.g., directory + file name) at once, as the example below illustrates.
```python
import os

class FileBatchFlow(BatchFlow):
    def prep(self, shared):
        directory = self.params["directory"]
        files = [f for f in os.listdir(directory) if f.endswith(".txt")]
        return [{"filename": f} for f in files]

class DirectoryBatchFlow(BatchFlow):
    def prep(self, shared):
        directories = ["/path/to/dirA", "/path/to/dirB"]
        return [{"directory": d} for d in directories]

inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
```
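With this nesting, the innermost node ends up seeing the merged params from both levels. As an illustrative sketch (the `SummarizeDirFile` node below is hypothetical and not part of the example above):
```python
import os

class SummarizeDirFile(Node):  # hypothetical node, for illustration only
    def prep(self, shared):
        # self.params is the merge of both levels, e.g.
        # {"directory": "/path/to/dirA", "filename": "report.txt"}
        path = os.path.join(self.params["directory"], self.params["filename"])
        with open(path) as f:
            return f.read()
```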

docs/communication.md Normal file

@@ -0,0 +1,138 @@
---
layout: default
title: "Communication"
parent: "Core Abstraction"
nav_order: 3
---
# Communication
Nodes and Flows **communicate** in two ways:
1. **Shared Store**: A global data structure (often an in-memory dict) that all nodes can read from and write to. Every Node's `prep()` and `post()` methods receive the **same** `shared` store.
2. **Params**: Each node and Flow has a `params` dict assigned by the **parent Flow**. Params mostly serve as identifiers, letting each node/flow know what task it's assigned.
If you know memory management, the **Shared Store** is like a **heap** shared across function calls, while **Params** are like a **stack** assigned by parent function calls.
> **Why not use other communication models like Message Passing?** *Message passing* works well for simple DAGs, but with *nested graphs* (Flows containing Flows, repeated or cyclic calls), routing messages becomes hard to maintain. A shared store keeps the design simple and easy to maintain.
{: .note }
---
## 1. Shared Store
### Overview
A shared store is typically an in-memory dictionary, like:
```python
shared = {"data": {}, "summary": {}, "config": {...}, ...}
```
It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements.
### Example
```python
class LoadData(Node):
    def prep(self, shared):
        # Suppose we read from disk or an API
        shared["data"]["my_file.txt"] = "Some text content"
        return None

class Summarize(Node):
    def prep(self, shared):
        # We can read what LoadData wrote
        content = shared["data"].get("my_file.txt", "")
        return content

    def exec(self, prep_res):
        prompt = f"Summarize: {prep_res}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        shared["summary"]["my_file.txt"] = exec_res
        return "default"

load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)

# Pre-create the top-level keys the nodes expect
shared = {"data": {}, "summary": {}}
flow.run(shared)
```
Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from the same location.
No special data-passing—just the same `shared` object.
---
## 2. Params
**Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are:
- **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep`, `exec`, `post`).
- **Set** via `set_params()`.
- **Cleared** and updated each time a parent Flow runs the node or sub-Flow.
> Only set params on the uppermost Flow; params set on child nodes will be overwritten by the parent Flow. If you need to set per-child params, see [Batch](./batch.md).
{: .warning }
Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
### Example
```python
# 1) Create a Node that uses params
class SummarizeFile(Node):
    def prep(self, shared):
        # Access the node's param
        filename = self.params["filename"]
        return shared["data"].get(filename, "")

    def exec(self, prep_res):
        prompt = f"Summarize: {prep_res}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        filename = self.params["filename"]
        shared["summary"][filename] = exec_res
        return "default"

# 2) Create the Node instance
node = SummarizeFile()

# 3) Set Node params directly (for testing)
node.set_params({"filename": "doc1.txt"})
node.run(shared)  # assumes `shared` is populated as in the example above

# 4) Create Flow
flow = Flow(start=node)

# 5) Set Flow params (overwrites node params)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared)  # The node summarizes doc2, not doc1
```
---
## 3. Shared Store vs. Params
Think of the **Shared Store** like a heap and **Params** like a stack.
- **Shared Store**:
  - Public, global.
  - You can design and populate it ahead of time, e.g., with the input to process.
  - Great for data results, large content, or anything multiple nodes need.
  - Keep it tidy—structure it carefully (like a mini schema).
- **Params**:
  - Local, ephemeral.
  - Passed in by parent Flows. You should only set them on the uppermost Flow.
  - Perfect for small values like filenames or numeric IDs.
  - Do **not** persist across different nodes and are reset each time a parent Flow runs.

docs/core_abstraction.md Normal file

@@ -0,0 +1,6 @@
---
layout: default
title: "Core Abstraction"
nav_order: 2
has_children: true
---

docs/flow.md Normal file

@@ -0,0 +1,171 @@
---
layout: default
title: "Flow"
parent: "Core Abstraction"
nav_order: 2
---
# Flow
A **Flow** orchestrates how Nodes connect and run, based on **Actions** returned from each Node's `post()` method. You can chain Nodes in a sequence or create branching logic depending on the **Action** string.
## 1. Action-based Transitions
Each Node's `post(shared, prep_res, exec_res)` method returns an **Action** string. By default, if `post()` doesn't explicitly return anything, we treat that as `"default"`.
You define transitions with the syntax:
1. Basic default transition: `node_a >> node_b`
   This means if `node_a.post()` returns `"default"` (or `None`), go to `node_b`.
   (Equivalent to `node_a - "default" >> node_b`)
2. Named action transition: `node_a - "action_name" >> node_b`
   This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
It's possible to create loops, branching, or multi-step flows; a minimal sketch follows below.
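For instance, here is a minimal sketch of a node whose `post()` drives both a default transition and a loop (the `Check` node and the `finish` successor are made up for illustration):
```python
class Check(Node):
    def post(self, shared, prep_res, exec_res):
        # The returned string selects the outgoing edge
        return "retry" if exec_res is None else "default"

check = Check()
check >> finish            # followed when post() returns "default" (or None)
check - "retry" >> check   # followed when post() returns "retry" (a simple loop)
```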
## 2. Creating a Flow
A **Flow** begins with a **start** node (or flow). You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the first node, looks at the Action returned by its `post()`, follows the corresponding transition, and continues until there's no next node or you explicitly stop.
### Example: Simple Sequence
Here's a minimal flow of two nodes in a chain:
```python
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)
```
- When you run the flow, it executes `node_a`.
- Suppose `node_a.post()` returns `"default"`.
- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`.
- If `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`, the flow ends there.
### Example: Branching & Looping
Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions:
- `"approved"`: expense is approved, move to payment processing
- `"needs_revision"`: expense needs changes, send back for revision
- `"rejected"`: expense is denied, finish the process
We can wire them like this:
```python
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> finish # If rejected, finish the process
revise >> review # After revision, go back for another review
payment >> finish # After payment, finish the process
flow = Flow(start=review)
```
Let's see how it flows:
1. If `review.post()` returns `"approved"`, the expense moves to `payment` node
2. If `review.post()` returns `"needs_revision"`, it goes to `revise` node, which then loops back to `review`
3. If `review.post()` returns `"rejected"`, it moves to `finish` node and stops
```mermaid
flowchart TD
    review[Review Expense] -->|approved| payment[Process Payment]
    review -->|needs_revision| revise[Revise Report]
    review -->|rejected| finish[Finish Process]
    revise --> review
    payment --> finish
```
### Running Individual Nodes vs. Running a Flow
- `node.run(shared)`: Just runs that node alone (calls `prep()`, `exec()`, `post()`), returns an Action.
- `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue (no next node or no next Action). See the sketch below.
> `node.run(shared)` **does not** proceed automatically to the successor and may use incorrect parameters.
> This is mainly for debugging or testing a single node.
> Always use `flow.run(...)` in production to ensure the full pipeline runs correctly.
{: .warning }
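For example, reusing the `review` node from the expense flow above:
```python
# Debug/test a single node in isolation (no transitions are followed)
action = review.run(shared)   # runs prep -> exec -> post, returns the Action string

# Run the whole pipeline (follows Actions until there is no next node)
flow = Flow(start=review)
flow.run(shared)
```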
## 3. Nested Flows
A **Flow** can act like a Node, which enables powerful composition patterns. This means you can:
1. Use a Flow as a Node within another Flow's transitions.
2. Combine multiple smaller Flows into a larger Flow for reuse.
3. Node `params` will be the merged result of **all** parents' `params`.
### Basic Flow Nesting
Here's how to connect a flow to another node:
```python
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)
# Connect it to another node
subflow >> node_c
# Create the parent flow
parent_flow = Flow(start=subflow)
```
When `parent_flow.run()` executes:
1. It starts `subflow`
2. `subflow` runs through its nodes (`node_a` then `node_b`)
3. After `subflow` completes, execution continues to `node_c`
### Example: Order Processing Pipeline
Here's a practical example that breaks down order processing into nested flows:
```python
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# Create the master flow
order_pipeline = Flow(start=payment_flow)
# Run the entire pipeline
order_pipeline.run(shared_data)
```
This creates a clean separation of concerns while maintaining a clear execution path:
```mermaid
flowchart LR
    subgraph order_pipeline[Order Pipeline]
        subgraph paymentFlow["Payment Flow"]
            A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
        end
        subgraph inventoryFlow["Inventory Flow"]
            D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
        end
        subgraph shippingFlow["Shipping Flow"]
            G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
        end
        paymentFlow --> inventoryFlow
        inventoryFlow --> shippingFlow
    end
```

@@ -4,14 +4,60 @@ title: "Home"
 nav_order: 1
 ---
-> **Heads up!** This is a note callout.
+# Mini LLM Flow
+A [100-line](https://github.com/zachary62/miniLLMFlow/blob/main/minillmflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc.*
+We model the LLM workflow as a **Nested Directed Graph**:
+- **Nodes** handle simple (LLM) tasks.
+- Nodes connect through **Actions** (labeled edges) for *Agents*.
+- **Flows** orchestrate a directed graph of Nodes for *Task Decomposition*.
+- A Flow can be used as a Node (for **Nesting**).
+- **Batch** Nodes/Flows for data-intensive tasks.
+- **Async** Nodes/Flows allow waits or **Parallel** execution.
+<div align="center">
+  <img src="https://github.com/zachary62/miniLLMFlow/blob/main/assets/minillmflow.jpg?raw=true" width="400"/>
+</div>
 {: .note }
-> This is a tip callout.
-{: .tip }
-> This is a warning callout.
+> Have questions? Chat with [AI Assistant](https://chatgpt.com/g/g-677464af36588191b9eba4901946557b-mini-llm-flow-assistant)
+## Core Abstraction
+- [Node](./node.md)
+- [Flow](./flow.md)
+- [Communication](./communication.md)
+- [Batch](./batch.md)
+- [(Advanced) Async](./async.md)
+- [(Advanced) Parallel](./parallel.md)
+## Low-Level Details
+- [LLM Wrapper](./llm.md)
+- [Tool](./tool.md)
 {: .warning }
-> This is an important callout.
-{: .important }
+> We do not provide built-in implementations for low-level details. Example implementations are provided as reference.
+## High-Level Paradigm
+- [Structured Output](./structure.md)
+- Task Decomposition
+- Map Reduce
+- RAG
+- Chat Memory
+- Agent
+- Multi-Agent
+- Evaluation
+## Example Projects
+- Coming soon ...

docs/llm.md Normal file

@@ -0,0 +1,69 @@
---
layout: default
title: "LLM Wrapper"
parent: "Preparation"
nav_order: 1
---
# LLM Wrappers
We **don't** provide built-in LLM wrappers. Instead, please implement your own, for example by asking an assistant like ChatGPT or Claude. If you ask ChatGPT to "implement a `call_llm` function that takes a prompt and returns the LLM response," you should get something like:
```python
def call_llm(prompt):
    from openai import OpenAI
    # Set the OpenAI API key (use environment variables, etc.)
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content

# Example usage
call_llm("How are you?")
```
## Improvements
Feel free to enhance your `call_llm` function as needed. Here are examples:
- Handle chat history:
```python
def call_llm(messages):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4",
        messages=messages
    )
    return r.choices[0].message.content
```
- Add in-memory caching:
```python
from functools import lru_cache
from openai import OpenAI

@lru_cache(maxsize=1000)
def call_llm(prompt):
    # Same basic call as above; identical prompts now return the cached response
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(model="gpt-4", messages=[{"role": "user", "content": prompt}])
    return r.choices[0].message.content
```
- Enable logging:
```python
def call_llm(prompt):
    import logging
    logging.info(f"Prompt: {prompt}")
    response = ...  # Your implementation here
    logging.info(f"Response: {response}")
    return response
```
## Why Not Provide Built-in LLM Wrappers?
I believe it is a **bad practice** to provide LLM-specific implementations in a general framework:
- **LLM APIs change frequently**. Hardcoding them makes maintenance a nightmare.
- You may need **flexibility** to switch vendors, use fine-tuned models, or deploy local LLMs.
- You may need **optimizations** like prompt caching, request batching, or response streaming.

docs/node.md Normal file

@@ -0,0 +1,95 @@
---
layout: default
title: "Node"
parent: "Core Abstraction"
nav_order: 1
---
# Node
A **Node** is the smallest building block of Mini LLM Flow. Each Node has 3 steps:
1. **`prep(shared)`**
   - Reads and preprocesses data from the `shared` store for the LLM.
   - Examples: *query a DB, read files, or serialize data into a string*.
   - Returns `prep_res`, which is passed to both `exec()` and `post()`.
2. **`exec(prep_res)`**
   - The main execution step, where the LLM is called.
   - Optionally has built-in retry and error handling (see below).
   - ⚠️ If retries are enabled, ensure the implementation is idempotent.
   - Returns `exec_res`, which is passed to `post()`.
3. **`post(shared, prep_res, exec_res)`**
   - Writes results back to the `shared` store or decides the next action.
   - Examples: *finalize outputs, trigger next steps, or log results*.
   - Returns a **string** specifying the next action (if nothing or `None` is returned, it is treated as `"default"`).
> All 3 steps are optional. For example, you might only need to run the Prep without calling the LLM.
{: .note }
## Fault Tolerance & Retries
Nodes in Mini LLM Flow can **retry** execution if `exec()` raises an exception. You control this via two parameters when you create the Node:
- `max_retries` (int): How many times to try running `exec()`. The default is `1`, which means **no** retry.
- `wait` (int): The time to wait (in **seconds**) before each retry attempt. By default, `wait=0` (i.e., no waiting). Increasing this is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off.
```python
my_node = SummarizeFile(max_retries=3, wait=10)
```
When an exception occurs in `exec()`, the Node automatically retries until:
- it succeeds, or
- it has made `max_retries` attempts in total and the last one also fails (at which point the exception propagates, or the fallback below is used). A rough sketch of this loop follows below.
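Conceptually, the retry behavior looks roughly like this (a simplified sketch, not the framework's actual code):
```python
import time

def run_exec_with_retries(node, shared, prep_res, max_retries=1, wait=0):
    # Simplified sketch of the retry loop; the real Node handles this internally.
    for attempt in range(max_retries):
        try:
            return node.exec(prep_res)
        except Exception as exc:
            if attempt == max_retries - 1:
                # Final attempt failed: delegate to the fallback (default re-raises)
                return node.exec_fallback(shared, prep_res, exc)
            time.sleep(wait)  # back off before the next attempt
```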
### Graceful Fallback
If you want to **gracefully handle** the error rather than raising it, you can override:
```python
def exec_fallback(self, shared, prep_res, exc):
    raise exc
```
By default, it just re-raises `exc`. But you can return a fallback result instead.
That fallback result becomes the `exec_res` passed to `post()`.
## Example
```python
class SummarizeFile(Node):
    def prep(self, shared):
        filename = self.params["filename"]
        return shared["data"][filename]

    def exec(self, prep_res):
        if not prep_res:
            raise ValueError("Empty file content!")
        prompt = f"Summarize this text in 10 words: {prep_res}"
        summary = call_llm(prompt)  # might fail
        return summary

    def exec_fallback(self, shared, prep_res, exc):
        # Provide a simple fallback instead of crashing
        return "There was an error processing your request."

    def post(self, shared, prep_res, exec_res):
        filename = self.params["filename"]
        shared["summary"][filename] = exec_res
        # Return "default" by not returning anything

summarize_node = SummarizeFile(max_retries=3)

# Run the node standalone for testing (calls prep->exec->post).
# If exec() fails, it retries up to 3 times before calling exec_fallback().
summarize_node.set_params({"filename": "test_file.txt"})
action_result = summarize_node.run(shared)

print("Action returned:", action_result)  # Usually "default"
print("Summary stored:", shared["summary"].get("test_file.txt"))
```

docs/paradigm.md Normal file

@@ -0,0 +1,6 @@
---
layout: default
title: "Paradigm"
nav_order: 4
has_children: true
---

docs/parallel.md Normal file

@@ -0,0 +1,54 @@
---
layout: default
title: "(Advanced) Parallel"
parent: "Core Abstraction"
nav_order: 6
---
# (Advanced) Parallel
**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute.
## AsyncParallelBatchNode
Like **AsyncBatchNode**, but runs its `exec_async()` calls in **parallel**:
```python
class ParallelSummaries(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        # e.g., multiple texts
        return shared["texts"]

    async def exec_async(self, text):
        prompt = f"Summarize: {text}"
        return await call_llm_async(prompt)

    async def post_async(self, shared, prep_res, exec_res_list):
        shared["summary"] = "\n\n".join(exec_res_list)
        return "default"

node = ParallelSummaries()
flow = AsyncFlow(start=node)
```
## AsyncParallelBatchFlow
Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters:
```python
class SummarizeMultipleFiles(AsyncParallelBatchFlow):
    async def prep_async(self, shared):
        return [{"filename": f} for f in shared["files"]]

sub_flow = AsyncFlow(start=LoadAndSummarizeFile())
parallel_flow = SummarizeMultipleFiles(start=sub_flow)
await parallel_flow.run_async(shared)  # call from inside an async function (e.g., via asyncio.run)
```
## Best Practices
- **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize.
- **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals); see the sketch after this list.
- **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits.
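For example, one minimal way to cap concurrency is an `asyncio.Semaphore` wrapper around your async LLM call (a sketch; the limit of 5 and `call_llm_async` are placeholders):
```python
import asyncio

sem = asyncio.Semaphore(5)  # allow at most 5 concurrent LLM calls

async def throttled_call_llm(prompt):
    async with sem:
        return await call_llm_async(prompt)
```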

docs/preparation.md Normal file

@@ -0,0 +1,6 @@
---
layout: default
title: "Preparation"
nav_order: 3
has_children: true
---

docs/structure.md Normal file

@@ -0,0 +1,111 @@
---
layout: default
title: "Structured Output"
parent: "Paradigm"
nav_order: 1
---
# Structured Output
In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys.
There are several approaches to achieve a structured output:
- **Prompting** the LLM to strictly return a defined structure.
- Using LLMs that natively support **schema enforcement**.
- **Post-processing** the LLM's response to extract structured content.
In practice, **Prompting** is simple and reliable for modern LLMs.
### Example Use Cases
- Extracting Key Information
```yaml
product:
  name: Widget Pro
  price: 199.99
  description: |
    A high-quality widget designed for professionals.
    Recommended for advanced users.
```
- Summarizing Documents into Bullet Points
```yaml
summary:
  - This product is easy to use.
  - It is cost-effective.
  - Suitable for all skill levels.
```
- Generating Configuration Files
```yaml
server:
  host: 127.0.0.1
  port: 8080
  ssl: true
```
## Prompt Engineering
When prompting the LLM to produce **structured** output:
1. **Wrap** the structure in code fences (e.g., ` ```yaml`).
2. **Validate** that all required fields exist (and let the `Node`'s retry mechanism handle failures).
### Example: Text Summarization
```python
class SummarizeNode(Node):
    def exec(self, prep_res):
        # Suppose `prep_res` is the text to summarize.
        prompt = f"""
Please summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Now, output:
```yaml
summary:
- bullet 1
- bullet 2
- bullet 3
```"""
        response = call_llm(prompt)
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()

        import yaml
        structured_result = yaml.safe_load(yaml_str)

        assert "summary" in structured_result
        assert isinstance(structured_result["summary"], list)
        return structured_result
```
### Why YAML instead of JSON?
Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes.
**In JSON**
```json
{
  "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\""
}
```
- Every double quote inside the string must be escaped with `\"`.
- Each newline in the dialogue must be represented as `\n`.
**In YAML**
```yaml
dialogue: |
  Alice said: "Hello Bob.
  How are you?
  I am good."
```
- No need to escape interior quotes—just place the entire text under a block literal (`|`).
- Newlines are naturally preserved without needing `\n`.

docs/tool.md Normal file

@@ -0,0 +1,165 @@
---
layout: default
title: "Tool"
parent: "Preparation"
nav_order: 2
---
# Tool
Similar to LLM wrappers, we **don't** provide built-in tools. Here, we recommend some *minimal* (and incomplete) implementations of commonly used tools. These examples can serve as a starting point for your own tooling.
---
## 1. Embedding Calls
```python
def get_embedding(text):
    import openai
    # Set your API key elsewhere, e.g., environment variables
    r = openai.Embedding.create(
        model="text-embedding-ada-002",
        input=text
    )
    return r["data"][0]["embedding"]
```
---
## 2. Vector Database (Faiss)
```python
import faiss
import numpy as np

def create_index(embeddings):
    dim = len(embeddings[0])
    index = faiss.IndexFlatL2(dim)
    index.add(np.array(embeddings).astype('float32'))
    return index

def search_index(index, query_embedding, top_k=5):
    D, I = index.search(
        np.array([query_embedding]).astype('float32'),
        top_k
    )
    return I, D
```
---
## 3. Local Database
```python
import sqlite3

def execute_sql(query):
    conn = sqlite3.connect("mydb.db")
    cursor = conn.cursor()
    cursor.execute(query)
    result = cursor.fetchall()
    conn.commit()
    conn.close()
    return result
```
---
## 4. Python Function Execution
```python
def run_code(code_str):
    # Warning: exec() runs arbitrary code—only use with trusted input.
    env = {}
    exec(code_str, env)
    return env
```
---
## 5. PDF Extraction
```python
def extract_text_from_pdf(file_path):
    import PyPDF2
    pdfFileObj = open(file_path, "rb")
    reader = PyPDF2.PdfReader(pdfFileObj)
    text = ""
    for page in reader.pages:
        text += page.extract_text()
    pdfFileObj.close()
    return text
```
---
## 6. Web Crawling
```python
def crawl_web(url):
    import requests
    from bs4 import BeautifulSoup
    html = requests.get(url).text
    soup = BeautifulSoup(html, "html.parser")
    return soup.title.string, soup.get_text()
```
---
## 7. Basic Search (SerpAPI example)
```python
def search_google(query):
    import requests
    params = {
        "engine": "google",
        "q": query,
        "api_key": "YOUR_API_KEY"
    }
    r = requests.get("https://serpapi.com/search", params=params)
    return r.json()
```
---
## 8. Audio Transcription (OpenAI Whisper)
```python
def transcribe_audio(file_path):
    import openai
    audio_file = open(file_path, "rb")
    transcript = openai.Audio.transcribe("whisper-1", audio_file)
    return transcript["text"]
```
---
## 9. Text-to-Speech (TTS)
```python
def text_to_speech(text):
    import pyttsx3
    engine = pyttsx3.init()
    engine.say(text)
    engine.runAndWait()
```
---
## 10. Sending Email
```python
def send_email(to_address, subject, body, from_address, password):
    import smtplib
    from email.mime.text import MIMEText

    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = from_address
    msg["To"] = to_address

    with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
        server.login(from_address, password)
        server.sendmail(from_address, [to_address], msg.as_string())
```