diff --git a/docs/_config.yml b/docs/_config.yml
index 814a489..1ba1cdc 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -15,4 +15,10 @@ aux_links:
   - "//github.com/zachary62/miniLLMFlow"
 
 # Color scheme
-color_scheme: light
\ No newline at end of file
+color_scheme: light
+
+mermaid:
+  version: "9.1.3" # Pick the version you want
+  # Default configuration
+  config: |
+    directionLR
\ No newline at end of file

diff --git a/docs/async.md b/docs/async.md
index e69de29..9d8e98e 100644
--- a/docs/async.md
+++ b/docs/async.md
@@ -0,0 +1,140 @@
---
layout: default
title: "Async"
nav_order: 6
---

# Async

**Mini LLM Flow** supports Python's **async/await** for concurrent, I/O-bound workloads. This is particularly useful for:
- Making **concurrent LLM calls** (e.g., if your LLM client library supports async).
- Handling **network I/O** or **external APIs** in an event loop.
- Minimizing **idle** time while waiting for responses, especially in batch operations.

## 1. AsyncNode

An **AsyncNode** is like a normal `Node`, except `exec()` (and optionally `prep()`) can be declared **async**. You can `await` inside these methods. For example:

```python
class AsyncSummarizeFile(AsyncNode):
    async def prep(self, shared):
        # Possibly do async file reads or small concurrency tasks
        filename = self.params["filename"]
        return shared["data"].get(filename, "")

    async def exec(self, shared, prep_res):
        # Use an async LLM client or other I/O
        if not prep_res:
            raise ValueError("File content is empty (async).")

        prompt = f"Summarize asynchronously: {prep_res}"
        # Suppose call_llm_async is an async function
        summary = await call_llm_async(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        # post can remain sync
        filename = self.params["filename"]
        shared["summary"][filename] = exec_res
        return "default"
```

- **`prep(shared)`** can be `async def` if you want to do asynchronous pre-processing.
- **`exec(shared, prep_res)`** is typically the main place for async logic.
- **`post`** can stay sync or be async; marking it `async` is optional.

## 2. AsyncFlow

An **AsyncFlow** is a Flow whose nodes can be **AsyncNode**s or normal `Node`s. You run it in an event loop with `await async_flow.run(shared)`.

### Minimal Example

```python
import asyncio

# Build your nodes
load_data_node = LoadData()            # a normal Node is OK, too
async_summarize = AsyncSummarizeFile()

# Connect them
load_data_node >> async_summarize

# Usually you can instantiate AsyncFlow directly with a start node;
# subclass it only if you need custom behavior.
my_flow = AsyncFlow(start=load_data_node)

# Running the flow (in an async context):
async def main():
    shared = {"data": {}, "summary": {}}
    await my_flow.run(shared)

asyncio.run(main())
```

- If the start node or any subsequent node is an `AsyncNode`, the Flow automatically awaits its `prep()`, `exec()`, and `post()` as async functions.
- You can mix normal `Node`s and `AsyncNode`s in the same flow; **AsyncFlow** handles the difference seamlessly.
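
Because `exec()` is a coroutine, a single node can also fan out several awaitables at once with `asyncio.gather` (see Best Practice 3 below). Here is a minimal sketch; `MultiPromptNode` is a hypothetical node, and it assumes the same `call_llm_async` helper used above:

```python
import asyncio

class MultiPromptNode(AsyncNode):
    async def prep(self, shared):
        # Build one prompt per loaded file (illustrative layout).
        return [f"Summarize: {text}" for text in shared["data"].values()]

    async def exec(self, shared, prep_res):
        # Launch all LLM calls at once and wait for every result.
        return await asyncio.gather(*(call_llm_async(p) for p in prep_res))

    def post(self, shared, prep_res, exec_res):
        shared["summary"]["all_files"] = "\n".join(exec_res)
        return "default"
```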
## 3. BatchAsyncFlow

If you want to run the same sub-flow for a **batch** of parameter sets, use `BatchAsyncFlow`. Like `BatchFlow`, it generates a list of parameter sets in `prep()`, but each iteration runs the sub-flow asynchronously.

```python
class SummarizeAllFilesAsync(BatchAsyncFlow):
    async def prep(self, shared):
        # Return a list of param dicts (like in BatchFlow),
        # but you can do async logic here if needed.
        filenames = list(shared["data"].keys())
        return [{"filename": fn} for fn in filenames]

# Usage:
# Suppose async_summarize_flow is an AsyncFlow that processes a single file.

all_files_flow = SummarizeAllFilesAsync(start=async_summarize_flow)

# Then in your async context:
await all_files_flow.run(shared)
```

Under the hood:
1. `prep()` returns a list of param sets.
2. `BatchAsyncFlow` processes each **in sequence** by default, but each iteration is still an async run of the sub-flow.
3. If you want **true concurrency** (e.g., launching sub-flows in parallel), you can override methods or manage concurrency at a higher level.

## 4. Combining Async with Retries & Fault Tolerance

Just like normal Nodes, an `AsyncNode` can have `max_retries` and a `process_after_fail(...)` method:

```python
class RetryAsyncNode(AsyncNode):
    def __init__(self, max_retries=3):
        super().__init__(max_retries=max_retries)

    async def exec(self, shared, prep_res):
        # Potentially failing async call
        response = await async_api_call(...)
        return response

    def process_after_fail(self, shared, prep_res, exc):
        # Provide fallback response
        return "Unable to complete async call due to error."
```

## 5. Best Practices

1. **Ensure Your LLM Client or I/O Library Is Async-Aware**
   - For truly concurrent calls, your LLM or HTTP client must support async. Otherwise, you gain little from using `AsyncNode`.
2. **Manage Rate Limits**
   - Parallelizing many calls can hit LLM rate limits. Consider adding semaphores or concurrency checks (see the sketch below).
3. **Use `asyncio.gather` If Needed**
   - If you want multiple async calls in the same node, you can await them concurrently with `asyncio.gather`, as sketched earlier.
4. **Check Exceptions**
   - If one call fails, how does it affect your flow? Decide whether you want to retry or fall back.
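
As a sketch of Best Practice 2, a module-level `asyncio.Semaphore` can cap the number of in-flight LLM calls. `call_llm_limited` is a hypothetical wrapper around the `call_llm_async` helper assumed above:

```python
import asyncio

# Assumption: one semaphore shared by every async node in the process.
LLM_SEMAPHORE = asyncio.Semaphore(5)  # at most 5 concurrent LLM calls

async def call_llm_limited(prompt):
    # Waits here whenever 5 calls are already in flight.
    async with LLM_SEMAPHORE:
        return await call_llm_async(prompt)
```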
## 6. Summary

- **AsyncNode**: A Node that supports `async def prep()/exec()/post()`.
- **AsyncFlow**: Orchestrates normal + async nodes. Run via `await flow.run(shared)`.
- **BatchAsyncFlow**: Repeats an AsyncFlow for multiple parameter sets, each iteration running asynchronously.

By taking advantage of Python's `asyncio` and the same minimal design, you can scale up your LLM or I/O tasks without blocking, making **Mini LLM Flow** suitable for high-throughput or high-latency scenarios.

diff --git a/docs/batch.md b/docs/batch.md
index e69de29..12b70a4 100644
--- a/docs/batch.md
+++ b/docs/batch.md
@@ -0,0 +1,177 @@
---
layout: default
title: "Batch"
nav_order: 5
---

# Batch

**Batch** functionality in Mini LLM Flow makes it easy to handle a **list** of items in one Node or to **rerun** a Flow multiple times. This is particularly useful for:

- **Chunk-based processing** (e.g., summarizing large texts in parts).
- **Multi-file** processing.
- **Iterating** over lists of parameters (e.g., user queries, documents, or URLs).

## 1. BatchNode

A **BatchNode** extends `Node` but changes how `prep()` and `exec()` behave:

- **`prep(shared)`**: should return an **iterable** (list, generator, etc.) of items.
- **`exec(shared, item)`**: is called **once per item** in that iterable.
- **`post(shared, prep_res, exec_res_list)`**: receives a **list** of results from all the `exec()` calls. You can combine or store them.

### Example: Map Summaries

```python
class MapSummaries(BatchNode):
    def prep(self, shared):
        # Suppose we have a big file; we want to chunk it
        content = shared["data"].get("large_text.txt", "")
        chunk_size = 10000
        chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
        # Return this list. The exec() method will be called once per chunk
        return chunks

    def exec(self, shared, chunk):
        prompt = f"Summarize this chunk in 10 words: {chunk}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res_list):
        # prep_res is the list of chunks
        # exec_res_list is the list of summaries from each chunk
        combined = "\n".join(exec_res_list)
        shared["summary"]["large_text.txt"] = combined
        return "default"
```

**Flow** usage:

```python
map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
```

- After `prep()` returns multiple chunks, `exec()` is called for each chunk.
- The aggregated `exec_res_list` is passed to `post()`, where you can do final processing.

### Key Differences from a Normal Node

1. **`exec()`** is called once per item returned by `prep()`.
2. The outputs of the `exec()` calls are collected into a list and given to `post()`.
3. `post()` still returns an **action**, just like a regular Node.

---

## 2. BatchFlow

A **BatchFlow** runs a **Flow** multiple times, each time with a different set of `params`. You can think of it as a loop that replays the Flow for each parameter set.

### Example: Summarize Many Files

```python
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # Return a list of parameter dicts (one per file)
        filenames = list(shared["data"].keys())  # e.g., ["file1.txt", "file2.txt", ...]
        params_list = [{"filename": fn} for fn in filenames]
        return params_list

    # No custom exec() or post(), so we rely on BatchFlow's default
```

Then define the Flow that handles **one** file; here it is just a single node:

```python
# Example "per-file" flow (just one node):
summarize_file = SummarizeFile()

# Or possibly something more elaborate:
# load_file >> summarize >> reduce, etc.

# Then we wrap it into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)

# Running it:
summarize_all_files.run(shared)
```

**Under the hood**:
1. `prep(shared)` in `SummarizeAllFiles` returns a list of param dicts, e.g., `[{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...]`.
2. The BatchFlow **iterates** over these param dicts. For each one, it sets the params on the sub-Flow (in this case, `summarize_file` or a bigger flow) and calls `flow.run(shared)`.
3. Once done, you have run the same Flow for each item.

### Nested or Multi-level Batches

You can nest a BatchFlow inside another BatchFlow. For instance, you might want to:

- Outer batch: iterate over directories.
- Inner batch: summarize each file in that directory.

This works by having the **outer** BatchFlow's `prep()` return one param dict per directory, while the **inner** BatchFlow's `prep()` expands each directory into per-file params, as shown in the sketch below. For most simpler use cases, a single BatchFlow is enough.
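
Here is a hedged sketch of that nesting, reusing `summarize_file` from the example above. It assumes a made-up `shared["dirs"]` layout that maps directory names to lists of filenames, and that params set by the outer flow are visible to the inner flow via `self.params`, as in the single-level examples:

```python
class SummarizeAllDirectories(BatchFlow):
    def prep(self, shared):
        # One param dict per directory.
        return [{"dirname": dn} for dn in shared["dirs"]]

class SummarizeDirectory(BatchFlow):
    def prep(self, shared):
        # Expand the directory chosen by the outer batch into per-file params.
        dirname = self.params["dirname"]
        return [{"dirname": dirname, "filename": fn} for fn in shared["dirs"][dirname]]

# summarize_file handles one file; wrap it twice:
inner_batch = SummarizeDirectory(start=summarize_file)
outer_batch = SummarizeAllDirectories(start=inner_batch)
outer_batch.run(shared)
```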
---

## 3. Best Practices & Tips

1. **Plan Your Input**: For a BatchNode, design `prep()` to yield only the minimal necessary data (e.g., text chunks).
2. **Aggregating Results**: `post()` is the place to combine partial results from `exec_res_list`.
3. **Large Batches**: If you have **thousands of items**, consider producing them lazily (e.g., yielding 100 items at a time; see the generator sketch before the Summary below) or using an **Async** approach for concurrency.
4. **Hierarchy**:
   - **BatchNode** is good for a single-step repeated operation (e.g., chunk-based summarization).
   - **BatchFlow** is good if you have a **multi-step** process you want to repeat for a list of parameters.

---

## 4. Putting It All Together

```python
# We'll combine the ideas:
class MapSummaries(BatchNode):
    def prep(self, shared):
        # The enclosing BatchFlow sets "filename" in params for each run.
        filename = self.params["filename"]
        content = shared["data"].get(filename, "")
        chunk_size = 10000
        return [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]

    def exec(self, shared, chunk):
        return call_llm(f"Summarize chunk: {chunk}")

    def post(self, shared, prep_res, exec_res_list):
        filename = self.params["filename"]
        shared["summary"][filename] = "\n".join(exec_res_list)
        return "default"

map_summaries_node = MapSummaries()
map_flow = Flow(start=map_summaries_node)

# To do the above for every big file in shared["data"]:
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # Generate param dicts, each specifying a file
        return [{"filename": fn} for fn in shared["data"]]

summarize_all = SummarizeAllFiles(start=map_flow)
summarize_all.run(shared)
```

In this snippet:

- `MapSummaries` is a `BatchNode` that chunk-summarizes the file named by its `"filename"` param.
- `map_flow` is a `Flow` with that single BatchNode.
- `SummarizeAllFiles` is a `BatchFlow` that runs `map_flow` for every file in `shared["data"]`.

**Result**: Each file is chunked by `MapSummaries`, and you get a summary for each.
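
For the "large batches" tip above, `prep()` can return a generator instead of a full list, since a BatchNode only needs an iterable (the §1 description allows "list, generator, etc."). A minimal sketch, reusing the chunking logic from `MapSummaries`:

```python
class MapSummariesLazy(MapSummaries):
    def prep(self, shared):
        filename = self.params["filename"]
        content = shared["data"].get(filename, "")
        chunk_size = 10000
        # A generator expression: chunks are produced one at a time
        # instead of materializing thousands of strings up front.
        return (content[i:i+chunk_size]
                for i in range(0, len(content), chunk_size))
```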
---

## Summary

- **BatchNode**: Single-step repetition. `prep()` returns a list, `exec()` is called once per item, `post()` aggregates results.
- **BatchFlow**: Repeatedly runs a Flow with different params. Great for multi-step or nested processes.

By mixing these two patterns, you can easily handle **large data** or **multiple inputs** in a streamlined, scalable way.

diff --git a/docs/communication.md b/docs/communication.md
index e69de29..cefe077 100644
--- a/docs/communication.md
+++ b/docs/communication.md
@@ -0,0 +1,177 @@
---
layout: default
title: "Communication"
nav_order: 4
---

# Communication

In **Mini LLM Flow**, Nodes and Flows **communicate** with each other in two ways:

1. **Shared Store**: a global data structure (often a Python dict) that every Node can read from and write to.
2. **Params**: small pieces of metadata or configuration, set on each Node or Flow, typically used to identify items or tweak behavior.

This design avoids complex message passing or data routing. It also lets you **nest** Flows easily without having to manage multiple channels.

---

## 1. Shared Store

### Overview

A shared store is typically a Python dictionary, like:

```python
shared = {"data": {}, "summary": {}, "config": {...}}
```

Every Node's `prep()`, `exec()`, and `post()` methods receive the **same** `shared` object. This makes it easy to:
- Read data that another Node loaded, such as a text file or database record.
- Write results for later Nodes to consume.
- Maintain consistent state across the entire Flow.

### Example

```python
class LoadData(Node):
    def prep(self, shared):
        # Suppose we read from disk or an API
        shared["data"]["my_file.txt"] = "Some text content"
        return None

    def exec(self, shared, prep_res):
        # Not doing anything special here
        return None

    def post(self, shared, prep_res, exec_res):
        return "default"

class Summarize(Node):
    def prep(self, shared):
        # We can read what LoadData wrote
        content = shared["data"].get("my_file.txt", "")
        return content

    def exec(self, shared, prep_res):
        prompt = f"Summarize: {prep_res}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        shared["summary"]["my_file.txt"] = exec_res
        return "default"
```

Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from the same location.

There is no special data-passing code, just the same `shared` object.

### Why Not Message Passing?

**Message passing** can be great for simple DAGs, but with **nested graphs** (Flows containing Flows, repeated or cyclic calls), routing messages becomes complicated. A shared store keeps the design simpler and easier to debug.

---

## 2. Params

**Params** let you store **per-Node** or **per-Flow** configuration that does **not** need to live in the global store. They are:
- **Immutable** during a Node's run cycle (i.e., they don't change mid-run).
- **Set** via `set_params()`.
- **Cleared** or updated each time you call the Flow or Node again.

Common examples:
- **File names** to process.
- **Model hyperparameters** for an LLM call.
- **API credentials** or specialized flags.

### Example

```python
# 1) Create a Node that uses params
class SummarizeFile(Node):
    def prep(self, shared):
        # Access the node's param
        filename = self.params["filename"]
        return shared["data"].get(filename, "")

    def exec(self, shared, prep_res):
        prompt = f"Summarize: {prep_res}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        filename = self.params["filename"]
        shared["summary"][filename] = exec_res
        return "default"

# 2) Set params
node = SummarizeFile()
node.set_params({"filename": "doc1.txt"})

# 3) Run
node.run(shared)
```

Because **params** belong to that Node alone, you don't pollute the global `shared` with fields that only matter to one operation.

---

## 3. Shared Store vs. Params

- **Shared Store**:
  - Public, global.
  - Great for data results, large content, or anything multiple nodes need.
  - Must be carefully structured (like designing a mini schema).

- **Params**:
  - Local, ephemeral config for a single node or flow execution.
  - Perfect for small values such as filenames or numeric IDs.
  - Does **not** persist across different nodes unless explicitly copied into `shared`.

---

## 4. Best Practices

1. **Design a Clear `shared` Schema**
   - Decide on keys upfront, e.g., `shared["data"]` for raw data and `shared["summary"]` for results (see the sketch below).

2. **Use Params for Identifiers / Config**
   - If you need to pass a single ID or filename to a Node, **params** are usually best.

3. **Don't Overuse the Shared Store**
   - Keep it tidy. If a piece of data only matters to one Node, consider using `params` or discarding it after use.

4. **Ensure `shared` Is Accessible**
   - If you switch from an in-memory dict to a database or file-based approach, the Node code can remain the same as long as your `shared` interface stays consistent.
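
As a sketch of tip 1, the schema for a summarization pipeline might be written down once, up front. The `"config"` keys here are hypothetical; only `"data"` and `"summary"` are used by the examples on this page:

```python
# One place that documents what lives where in the shared store.
shared = {
    "data": {},     # filename -> raw text, written by loader nodes
    "summary": {},  # filename -> LLM summary, written by summarizer nodes
    "config": {     # static settings that many nodes read
        "chunk_size": 10000,
        "max_retries": 3,
    },
}
```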
---

## Putting It All Together

```python
# Suppose you have a flow:
load_data >> summarize_file
my_flow = Flow(start=load_data)

# Example usage:
load_data.set_params({"path": "path/to/data/folder"})    # local param for load_data
summarize_file.set_params({"filename": "my_text.txt"})   # local param for summarize_file

# Shared store
shared = {
    "data": {},
    "summary": {}
}

my_flow.run(shared)
# After the run, shared["summary"]["my_text.txt"] should hold the LLM summary
```

- `load_data` uses its param (`"path"`) to load data into `shared["data"]`.
- `summarize_file` uses its param (`"filename"`) to pick which file from `shared["data"]` to summarize.
- They share results via `shared["summary"]`.

That's the **Mini LLM Flow** approach to communication:
- **A single shared store** to handle large data or results for multiple Nodes.
- **Per-node params** for minimal configuration and identification.

Use these patterns to build powerful, modular LLM pipelines with minimal overhead.

diff --git a/docs/flow.md b/docs/flow.md
index b1eebde..328bbcf 100644
--- a/docs/flow.md
+++ b/docs/flow.md
@@ -1,7 +1,151 @@
---
layout: default
title: "Flow"
nav_order: 2
---

# Flow

In **Mini LLM Flow**, a **Flow** orchestrates how Nodes connect and run, based on the **Actions** returned from each Node's `post()` method. You can chain Nodes in a sequence or create branching logic depending on the **Action** string.

## Action-based Transitions

Each Node's `post(shared, prep_res, exec_res)` returns a string called an **Action**. If `post()` doesn't explicitly return anything, we treat that as `"default"`.

You define transitions with the syntax:

```python
node_a >> node_b
```
- This means: if `node_a.post()` returns `"default"` (or `None`), go to `node_b`.

```python
node_a - "action_name" >> node_b
```
- This means: if `node_a.post()` returns `"action_name"`, go to `node_b`.

You can create loops, branches, or multi-step flows, and route multiple Actions from a single node to different successors:

```python
# Define nodes for order processing
validate_order = ValidateOrderNode()
check_inventory = CheckInventoryNode()
process_payment = ProcessPaymentNode()
send_confirmation = SendConfirmationNode()
notify_backorder = NotifyBackorderNode()

# Define the flow
validate_order - "valid" >> check_inventory
validate_order - "invalid" >> send_confirmation  # Send rejection confirmation

check_inventory - "in_stock" >> process_payment
check_inventory - "out_of_stock" >> notify_backorder

process_payment - "success" >> send_confirmation
process_payment - "failure" >> send_confirmation  # Send payment failure notice
```

```mermaid
flowchart TD
    validate[Validate Order] -->|valid| inventory[Check Inventory]
    validate -->|invalid| confirm[Send Confirmation]

    inventory -->|in_stock| payment[Process Payment]
    inventory -->|out_of_stock| backorder[Notify Backorder]

    payment -->|success| confirm
    payment -->|failure| confirm

    style validate fill:#d4f1f9
    style confirm fill:#d4f1f9
```
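
Where do those Action strings come from? Each node simply returns one from `post()`. A hedged sketch of `ValidateOrderNode`, assuming a hypothetical `is_valid_order` helper and an order stored under `shared["order"]`:

```python
class ValidateOrderNode(Node):
    def prep(self, shared):
        return shared["order"]            # assumed location of the order

    def exec(self, shared, prep_res):
        return is_valid_order(prep_res)   # hypothetical validation helper

    def post(self, shared, prep_res, exec_res):
        # The returned string is the Action the Flow branches on.
        return "valid" if exec_res else "invalid"
```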
## Creating a Flow

A **Flow** begins with a **start** node (or flow). You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the first node, looks at the Action returned by its `post()`, follows the corresponding transition, and continues until there is no next node or you explicitly stop.

```python
flow = Flow(start=node_a)
```

## Example: Simple Sequence

Here's a minimal flow of two nodes in a chain:

```python
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)
```

- When you run the flow, it executes `node_a`.
- Suppose `node_a.post()` returns `"default"`.
- The flow sees that the `"default"` Action is linked to `node_b` and runs `node_b`.
- If `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`, the flow ends there.

## Example: Branching & Looping

Suppose `FindRelevantFile` can return three possible Actions from its `post()`:

- `"end"`: no question was asked, so stop.
- `"answer"`: a relevant file was found, move on to `AnswerQuestion`.
- `"retry"`: no relevant file was found, try again.

We can wire them:

```python
find_relevant_file - "end" >> no_op_node
find_relevant_file - "answer" >> answer_question
find_relevant_file - "retry" >> find_relevant_file
flow = Flow(start=find_relevant_file)
```

1. If `FindRelevantFile.post()` returns `"answer"`, the flow calls `answer_question`.
2. If `FindRelevantFile.post()` returns `"retry"`, it loops back to itself.
3. If it returns `"end"`, the flow goes to `no_op_node`. If `no_op_node` has no further transitions, the flow stops.

## Running Individual Nodes vs. Running a Flow

- **`node.run(shared)`**: runs that node alone (calls `prep()`, `exec()`, `post()`) and returns an Action. It does **not** proceed automatically to the successor. This is mainly for debugging or testing a single node.
- **`flow.run(shared)`**: executes from the start node, follows Actions to the next node, and so on until the flow can't continue (no next node or no matching Action).

Always use `flow.run(...)` in production to ensure the full pipeline runs correctly.

## Nested Flows

A **Flow** can act like a Node. That means you can write:

```python
some_flow >> another_node
```

or treat `some_flow` as a node inside a larger flow. This helps you compose complex pipelines by nesting smaller flows.
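
A minimal sketch of such nesting, where `payment_flow` and `shipping_flow` are sub-flows and their start nodes (`validate_payment`, `pack_items`) are hypothetical:

```python
# Two sub-flows built elsewhere in the codebase.
payment_flow = Flow(start=validate_payment)
shipping_flow = Flow(start=pack_items)

# A Flow can be wired exactly like a Node inside a bigger pipeline.
payment_flow >> shipping_flow
order_pipeline = Flow(start=payment_flow)
order_pipeline.run(shared)
```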
## Example Code

Below is a short snippet combining these ideas:

```python
# Define nodes
find_file = FindRelevantFile()
answer = AnswerQuestion()
no_op = NoOp()

# Define transitions
find_file - "answer" >> answer
find_file - "retry" >> find_file
find_file - "end" >> no_op

# Build the Flow
qa_flow = Flow(start=find_file)

# Run
qa_flow.run(shared)
```

When `find_file`'s `post()` returns `"answer"`, we proceed to `answer`. If it returns `"retry"`, we loop back. If it returns `"end"`, we move on to `no_op`.

---

**That's Flow in a nutshell:**
- **Actions** determine which node runs next.
- **Flow** runs the pipeline from the start node to completion.
- You can chain nodes in a linear sequence or build loops and branches.
- Nodes can themselves be entire flows, allowing nested graph structures.

diff --git a/docs/index.md b/docs/index.md
index 9ace43d..9cd7db0 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -6,7 +6,7 @@
 
 # Mini LLM Flow
 
-A 100-line minimalist LLM framework for agents, task decomposition, RAG, etc.
+A [100-line](https://github.com/zachary62/miniLLMFlow/blob/main/minillmflow/__init__.py) minimalist LLM framework for agents, task decomposition, RAG, etc.
 
 We model the LLM workflow as a **Nested Flow**:
 - Each **Node** handles a simple LLM task.

diff --git a/docs/node.md b/docs/node.md
index 614cbca..30d9e1b 100644
--- a/docs/node.md
+++ b/docs/node.md
@@ -68,5 +68,15 @@ class SummarizeFile(Node):
         filename = self.params["filename"]
         shared["summary"][filename] = exec_res
         # Return "default" by not returning anything
+
+summarize_node = SummarizeFile(max_retries=3)
+
+# Run the node standalone for testing (calls prep -> exec -> post).
+# If exec() fails, it retries up to 3 times before calling process_after_fail().
+summarize_node.set_params({"filename": "test_file.txt"})
+action_result = summarize_node.run(shared)
+
+print("Action returned:", action_result)  # Usually "default"
+print("Summary stored:", shared["summary"].get("test_file.txt"))
 ```
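
For completeness: once `max_retries` is exhausted, the `process_after_fail(shared, prep_res, exc)` hook (shown on the Async page) can supply a fallback instead of letting the error propagate. A sketch, subclassing the node above:

```python
class SummarizeFileSafe(SummarizeFile):
    def process_after_fail(self, shared, prep_res, exc):
        # Provide a fallback result after the final retry fails,
        # as in the RetryAsyncNode example on the Async page.
        return f"Summary unavailable: {exc}"
```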