This commit is contained in:
zachary62 2024-12-28 04:05:08 +00:00
parent cf017707ef
commit fd54f325bc
8 changed files with 220 additions and 128 deletions

View File

@ -30,7 +30,8 @@ Hence, I built this framework that lets LLMs focus on what matters. It turns out
<img src="/assets/minillmflow.jpg" width="400"/>
</div>
## Example
## Example LLM apps
- Beginner Tutorial on [Text summarization + QA agent](https://colab.research.google.com/github/zachary62/miniLLMFlow/blob/main/cookbook/demo.ipynb)
- Have questions for this tutorial? Try [this prompt](https://chatgpt.com/share/676f16d2-7064-8000-b9d7-f6874346a6b5)
- Beginner Tutorial: [Text summarization for Paul Graham Essay + QA agent](https://colab.research.google.com/github/zachary62/miniLLMFlow/blob/main/cookbook/demo.ipynb)
- Have questions for this tutorial? Ask LLM assistants through [this prompt](https://chatgpt.com/share/676f16d2-7064-8000-b9d7-f6874346a6b5)

View File

@ -1,7 +1,7 @@
---
layout: default
title: "Async"
nav_order: 6
nav_order: 7
---
# Async
@ -15,7 +15,7 @@ nav_order: 6
An **AsyncNode** is like a normal `Node`, except `exec()` (and optionally `prep()`) can be declared **async**. You can `await` inside these methods. For example:
``
```python
class AsyncSummarizeFile(AsyncNode):
async def prep(self, shared):
# Possibly do async file reads or small concurrency tasks
@ -37,7 +37,7 @@ class AsyncSummarizeFile(AsyncNode):
filename = self.params["filename"]
shared["summary"][filename] = exec_res
return "default"
``
```
- **`prep(shared)`** can be `async def` if you want to do asynchronous pre-processing.
- **`exec(shared, prep_res)`** is typically the main place for async logic.
@ -49,7 +49,7 @@ An **AsyncFlow** is a Flow where nodes can be **AsyncNode**s or normal `Node`s.
### Minimal Example
``
```python
class MyAsyncFlow(AsyncFlow):
pass # Usually, you just instantiate AsyncFlow with a start node
@ -70,7 +70,7 @@ async def main():
await my_flow.run(shared)
asyncio.run(main())
``
```
- If the start node or any subsequent node is an `AsyncNode`, the Flow automatically calls its `prep()`, `exec()`, `post()` as async functions.
- You can mix normal `Node`s and `AsyncNode`s in the same flow. **AsyncFlow** will handle the difference seamlessly.
@ -79,7 +79,7 @@ asyncio.run(main())
If you want to run a batch of flows **concurrently**, you can use `BatchAsyncFlow`. Like `BatchFlow`, it generates a list of parameter sets in `prep()`, but each iteration runs the sub-flow asynchronously.
``
```python
class SummarizeAllFilesAsync(BatchAsyncFlow):
async def prep(self, shared):
# Return a list of param dicts (like in BatchFlow),
@ -94,7 +94,7 @@ all_files_flow = SummarizeAllFilesAsync(start=async_summarize_flow)
# Then in your async context:
await all_files_flow.run(shared)
``
```
Under the hood:
1. `prep()` returns a list of param sets.
@ -105,7 +105,7 @@ Under the hood:
Just like normal Nodes, an `AsyncNode` can have `max_retries` and a `process_after_fail(...)` method:
``
```python
class RetryAsyncNode(AsyncNode):
def __init__(self, max_retries=3):
super().__init__(max_retries=max_retries)
@ -118,7 +118,7 @@ class RetryAsyncNode(AsyncNode):
def process_after_fail(self, shared, prep_res, exc):
# Provide fallback response
return "Unable to complete async call due to error."
``
```
## 5. Best Practices

View File

@ -1,7 +1,7 @@
---
layout: default
title: "Batch"
nav_order: 5
nav_order: 6
---
# Batch
@ -22,7 +22,7 @@ A **BatchNode** extends `Node` but changes how `prep()` and `exec()` behave:
### Example: Map Summaries
``
```python
class MapSummaries(BatchNode):
def prep(self, shared):
# Suppose we have a big file; we want to chunk it
@ -43,14 +43,14 @@ class MapSummaries(BatchNode):
combined = "\n".join(exec_res_list)
shared["summary"]["large_text.txt"] = combined
return "default"
``
```
**Flow** usage:
``
```python
map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
``
```
- After `prep()` returns multiple chunks, `exec()` is called for each chunk.
- The aggregated `exec_res_list` is passed to `post()`, where you can do final processing.
@ -69,7 +69,7 @@ A **BatchFlow** runs a **Flow** multiple times, each time with a different set o
### Example: Summarize Many Files
``
```python
class SummarizeAllFiles(BatchFlow):
def prep(self, shared):
# Return a list of parameter dicts (one per file)
@ -78,11 +78,11 @@ class SummarizeAllFiles(BatchFlow):
return params_list
# No custom exec() or post(), so we rely on BatchFlows default
``
```
Then define a **Flow** that handles **one** file. Suppose we have `Flow(start=summarize_file)`.
``
```python
# Example "per-file" flow (just one node):
summarize_file = SummarizeFile()
@ -94,7 +94,7 @@ summarize_all_files = SummarizeAllFiles(start=summarize_file)
# Running it:
summarize_all_files.run(shared)
``
```
**Under the hood**:
1. `prep(shared)` in `SummarizeAllFiles` returns a list of param dicts, e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`.
@ -125,7 +125,7 @@ This can be done by making the **outer** BatchFlows `exec()` return a list of
## 4. Putting It All Together
``
```python
# We'll combine the ideas:
class MapSummaries(BatchNode):
def prep(self, shared):
@ -157,7 +157,7 @@ class SummarizeAllFiles(BatchFlow):
# For now, let's just show usage:
summarize_all = SummarizeAllFiles(start=map_flow)
summarize_all.run(shared)
``
```
In this snippet:

View File

@ -1,7 +1,7 @@
---
layout: default
title: "Communication"
nav_order: 4
nav_order: 5
---
# Communication
@ -20,9 +20,9 @@ This design avoids complex message-passing or data routing. It also lets you **n
### Overview
A shared store is typically a Python dictionary, like:
``
```python
shared = {"data": {}, "summary": {}, "config": { ... }, ...}
``
```
Every Nodes `prep()`, `exec()`, and `post()` methods receive the **same** `shared` object. This makes it easy to:
- Read data that another Node loaded, such as a text file or database record.
@ -31,7 +31,7 @@ Every Nodes `prep()`, `exec()`, and `post()` methods receive the **same** `sh
### Example
``
```python
class LoadData(Node):
def prep(self, shared):
# Suppose we read from disk or an API
@ -59,7 +59,7 @@ class Summarize(Node):
def post(self, shared, prep_res, exec_res):
shared["summary"]["my_file.txt"] = exec_res
return "default"
``
```
Here,
- `LoadData` writes to `shared["data"]`.
@ -86,7 +86,7 @@ Common examples:
### Example
``
```python
# 1) Create a Node that uses params
class SummarizeFile(Node):
def prep(self, shared):
@ -109,7 +109,7 @@ node.set_params({"filename": "doc1.txt"})
# 3) Run
node.run(shared)
``
```
Because **params** are only for that Node, you dont pollute the global `shared` with fields that might only matter to one operation.
@ -147,7 +147,7 @@ Because **params** are only for that Node, you dont pollute the global `share
## Putting It All Together
``
```python
# Suppose you have a flow:
load_data >> summarize_file
my_flow = Flow(start=load_data)
@ -164,7 +164,7 @@ shared = {
my_flow.run(shared)
# After run, shared["summary"]["my_text.txt"] might have the LLM summary
``
```
- `load_data` uses its param (`"path"`) to load some data into `shared["data"]`.
- `summarize_file` uses its param (`"filename"`) to pick which file from `shared["data"]` to summarize.

View File

@ -1,74 +1,33 @@
---
layout: default
title: "Flow"
nav_order: 2
nav_order: 4
---
# Flow
In **Mini LLM Flow**, a **Flow** orchestrates how Nodes connect and run, based on **Actions** returned from each Nodes `post()` method. You can chain Nodes in a sequence or create branching logic depending on the **Action** string.
A **Flow** orchestrates how Nodes connect and run, based on **Actions** returned from each Nodes `post()` method. You can chain Nodes in a sequence or create branching logic depending on the **Action** string.
## Action-based Transitions
Each Nodes `post(shared, prep_res, exec_res)` returns a string called **Action**. By default, if `post()` doesnt explicitly return anything, we treat that as `"default"`.
Each Node's `post(shared, prep_res, exec_res)` method returns an **Action** string. By default, if `post()` doesn't explicitly return anything, we treat that as `"default"`.
You define transitions with the syntax:
```python
node_a >> node_b
```
- This means if `node_a.post()` returns `"default"` (or `None`), go to `node_b`.
1. Basic default transition: `node_a >> node_b`
This means if `node_a.post()` returns `"default"` (or `None`), go to `node_b`.
(Equivalent to `node_a - "default" >> node_b`)
```python
node_a - "action_name" >> node_b
```
- This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
2. Named action transition: `node_a - "action_name" >> node_b`
This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
Its possible to create loops, branching, or multi-step flows. You can also chain with multiple Actions from a single node to different successors:
```python
# Define nodes for order processing
validate_order = ValidateOrderNode()
check_inventory = CheckInventoryNode()
process_payment = ProcessPaymentNode()
send_confirmation = SendConfirmationNode()
notify_backorder = NotifyBackorderNode()
# Define the flow
validate_order - "valid" >> check_inventory
validate_order - "invalid" >> send_confirmation # Send rejection confirmation
check_inventory - "in_stock" >> process_payment
check_inventory - "out_of_stock" >> notify_backorder
process_payment - "success" >> send_confirmation
process_payment - "failure" >> send_confirmation # Send payment failure notice
```
```mermaid
flowchart TD
validate[Validate Order] -->|valid| inventory[Check Inventory]
validate -->|invalid| confirm[Send Confirmation]
inventory -->|in_stock| payment[Process Payment]
inventory -->|out_of_stock| backorder[Notify Backorder]
payment -->|success| confirm
payment -->|failure| confirm
style validate fill:#d4f1f9
style confirm fill:#d4f1f9
```
Its possible to create loops, branching, or multi-step flows. You can also chain with multiple Actions from a single node to different successors.
## Creating a Flow
A **Flow** begins with a **start** node (or flow). You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the first node, looks at its `post()` return Action, follows the corresponding transition, and continues until theres no next node or you explicitly stop.
```flow = Flow(start=node_a)```
## Example: Simple Sequence
### Example: Simple Sequence
Heres a minimal flow of two nodes in a chain:
@ -83,69 +42,123 @@ flow.run(shared)
- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`.
- If `node_b.post()` returns `"default"` but we didnt define `node_b >> something_else`, the flow ends there.
## Example: Branching & Looping
### Example: Branching & Looping
Suppose `FindRelevantFile` can return three possible Actions in its `post()`:
Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions:
- `"end"`: means no question, so stop.
- `"answer"`: means we have a relevant file, move to `AnswerQuestion`.
- `"retry"`: means no relevant file found, try again.
- `"approved"`: expense is approved, move to payment processing
- `"needs_revision"`: expense needs changes, send back for revision
- `"rejected"`: expense is denied, end the process
We can wire them:
We can wire them like this:
```
find_relevant_file - "end" >> no_op_node
find_relevant_file - "answer" >> answer_question
find_relevant_file - "retry" >> find_relevant_file
flow = Flow(start=find_relevant_file)
```python
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> end # If rejected, end the process
revise >> review # After revision, go back for another review
payment >> end # After payment, end the process
flow = Flow(start=review)
```
1. If `FindRelevantFile.post()` returns `"answer"`, the flow calls `answer_question`.
2. If `FindRelevantFile.post()` returns `"retry"`, it loops back to itself.
3. If `"end"`, it goes to `no_op_node`. If `no_op_node` has no further transitions, the flow stops.
Let's see how it flows:
1. If `review.post()` returns `"approved"`, the expense moves to `payment` node
2. If `review.post()` returns `"needs_revision"`, it goes to `revise` node, which then loops back to `review`
3. If `review.post()` returns `"rejected"`, it moves to `end` node and stops
```mermaid
flowchart TD
review[Review Expense] -->|approved| payment[Process Payment]
review -->|needs_revision| revise[Revise Report]
review -->|rejected| end[End Process]
revise --> review
payment --> end
```
## Running Individual Nodes vs. Running a Flow
- **`node.run(shared)`**: Just runs that node alone (calls `prep()`, `exec()`, `post()`), returns an Action. **Does not** proceed automatically to the successor. This is mainly for debugging or testing a single node.
- **`node.run(shared)`**: Just runs that node alone (calls `prep()`, `exec()`, `post()`), returns an Action. ⚠️ **Does not** proceed automatically to the successor and may use incorrect parameters. This is mainly for debugging or testing a single node.
- **`flow.run(shared)`**: Executes from the start node, follows Actions to the next node, and so on until the flow cant continue (no next node or no next Action).
Always use `flow.run(...)` in production to ensure the full pipeline runs correctly.
## Nested Flows
A **Flow** can act like a Node. That means you can do:
A **Flow** can act like a Node, which enables powerful composition patterns. This means you can:
```some_flow >> another_node```
or treat `some_flow` as a node inside a larger flow. This helps you compose complex pipelines by nesting smaller flows.
1. Use a flow as a node within another flow's transitions
2. Combine multiple smaller flows into a larger pipeline
3. Create reusable flow components
## Example Code
### Basic Flow Nesting
Below is a short snippet combining these ideas:
Here's how to connect a flow to another node:
```
# Define nodes
find_file = FindRelevantFile()
answer = AnswerQuestion()
no_op = NoOp()
```python
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)
# Define transitions
find_file - "answer" >> answer
find_file - "retry" >> find_file
find_file - "end" >> no_op
# Connect it to another node
subflow >> node_c
# Build the Flow
qa_flow = Flow(start=find_file)
# Run
qa_flow.run(shared)
# Create the parent flow
parent_flow = Flow(start=subflow)
```
When `find_file`s `post()` returns `"answer"`, we proceed to `answer`. If it returns `"retry"`, we loop back. If `"end"`, we move on to `no_op`.
When `parent_flow.run()` executes:
1. It starts `subflow`
2. `subflow` runs through its nodes (`node_a` then `node_b`)
3. After `subflow` completes, execution continues to `node_c`
---
### Example: Order Processing Pipeline
**Thats Flow in a nutshell:**
- **Actions** determine which node runs next.
- **Flow** runs the pipeline from the start node to completion.
- You can chain nodes in a linear sequence or build loops and branches.
- Nodes can themselves be entire flows, allowing nested graph structures.
Here's a practical example that breaks down order processing into nested flows:
```python
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# Create the master flow
order_pipeline = Flow(start=payment_flow)
# Run the entire pipeline
order_pipeline.run(shared_data)
```
This creates a clean separation of concerns while maintaining a clear execution path:
```mermaid
flowchart TD
subgraph "Payment Flow"
A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
end
subgraph "Inventory Flow"
D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
end
subgraph "Shipping Flow"
G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
end
Payment Flow --> Inventory Flow
Inventory Flow --> Shipping Flow
```

View File

@ -10,16 +10,20 @@ A [100-line](https://github.com/zachary62/miniLLMFlow/blob/main/minillmflow/__in
We model the LLM workflow as a **Nested Flow**:
- Each **Node** handles a simple LLM task.
- Nodes are chained together to form a **Flow** for more complex tasks.
- One Node can be chained to multiple Nodes based on **Actions**.
- Nodes are chained together to form a **Flow** for compute-intensive tasks.
- One Node can be chained to multiple Nodes through **Actions** as an agent.
- A Flow can be treated as a Node for **Nested Flows**.
- Both Nodes and Flows can be **Batched** for data-intensive tasks.
- Nodes and Flows can be **Async**.
- Nodes and Flows can be **Async** for user inputs.
<div align="center">
<img src="https://github.com/zachary62/miniLLMFlow/blob/main/assets/minillmflow.jpg?raw=true" width="400"/>
</div>
## Preparation
- [LLM Integration](./llm.md)
## Core Abstraction
- [Node](./node.md)

74
docs/llm.md Normal file
View File

@ -0,0 +1,74 @@
---
layout: default
title: "LLM Integration"
nav_order: 3
---
# Call LLM
For your LLM application, implement a function to call LLMs yourself.
You can ask an assistant like ChatGPT or Claude to generate an example.
For instance, asking ChatGPT to "implement a `call_llm` function that takes a prompt and returns the LLM response" gives:
```python
def call_llm(prompt):
from openai import OpenAI
# Set the OpenAI API key (use environment variables, etc.)
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
# Example usage
call_llm("How are you?")
```
## Improvements
You can enhance the function as needed. Examples:
1. Handle chat history:
```python
def call_llm(messages):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4",
messages=messages
)
return r.choices[0].message.content
```
2. Add in-memory caching:
```python
from functools import lru_cache
@lru_cache(maxsize=1000)
def call_llm(prompt):
# Your implementation here
pass
```
3. Enable logging:
```python
def call_llm(prompt):
import logging
logging.info(f"Prompt: {prompt}")
response = ...
logging.info(f"Response: {response}")
return response
```
You can also try libraries like `litellm`
## Why not provide an LLM call function?
I believe it is a bad practice to provide LLM-specific implementations in a general framework:
- LLM APIs change frequently. Hardcoding them makes maintenance difficult.
- You may need flexibility to switch vendors, use fine-tuned models, or deploy local LLMs.
- Custom optimizations like prompt caching, request batching, or response streaming may be required.

View File

@ -1,7 +1,7 @@
---
layout: default
title: "Node"
nav_order: 2
nav_order: 3
---
# Node