diff --git a/docs/_config.yml b/docs/_config.yml index ee6af41..5c1855a 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -5,6 +5,9 @@ description: Minimalist LLM Framework in 100 Lines, Enabling LLMs to Program The # Theme settings remote_theme: just-the-docs/just-the-docs +# Navigation +nav_sort: case_sensitive + # Aux links (shown in upper right) aux_links: "View on GitHub": @@ -17,10 +20,4 @@ mermaid: version: "9.1.3" # Pick the version you want # Default configuration config: | - directionLR - - -callouts: - warning: - title: Warning - color: red + directionLR \ No newline at end of file diff --git a/docs/async.md b/docs/async.md new file mode 100644 index 0000000..c9d265a --- /dev/null +++ b/docs/async.md @@ -0,0 +1,61 @@ +--- +layout: default +title: "(Advanced) Async" +parent: "Core Abstraction" +nav_order: 5 +--- + +# (Advanced) Async + +**Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for: + +1. **prep_async()** + - For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way. + +2. **exec_async()** + - Typically used for async LLM calls. + +3. **post_async()** + - For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`. + + +**Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes. + +### Example + +```python +class SummarizeThenVerify(AsyncNode): + async def prep_async(self, shared): + # Example: read a file asynchronously + doc_text = await read_file_async(shared["doc_path"]) + return doc_text + + async def exec_async(self, prep_res): + # Example: async LLM call + summary = await call_llm_async(f"Summarize: {prep_res}") + return summary + + async def post_async(self, shared, prep_res, exec_res): + # Example: wait for user feedback + decision = await gather_user_feedback(exec_res) + if decision == "approve": + shared["summary"] = exec_res + return "approve" + return "deny" + +summarize_node = SummarizeThenVerify() +final_node = Finalize() + +# Define transitions +summarize_node - "approve" >> final_node +summarize_node - "deny" >> summarize_node # retry + +flow = AsyncFlow(start=summarize_node) + +async def main(): + shared = {"doc_path": "document.txt"} + await flow.run_async(shared) + print("Final Summary:", shared.get("summary")) + +asyncio.run(main()) +``` \ No newline at end of file diff --git a/docs/batch.md b/docs/batch.md new file mode 100644 index 0000000..b263845 --- /dev/null +++ b/docs/batch.md @@ -0,0 +1,107 @@ +--- +layout: default +title: "Batch" +parent: "Core Abstraction" +nav_order: 4 +--- + +# Batch + +**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Handy for: +- **Chunk-based** processing (e.g., splitting large texts). +- **Multi-file** processing. +- **Iterating** over lists of params (e.g., user queries, documents, URLs). + +## 1. BatchNode + +A **BatchNode** extends `Node` but changes `prep()` and `exec()`: + +- **`prep(shared)`**: returns an **iterable** (e.g., list, generator). +- **`exec(item)`**: called **once** per item in that iterable. +- **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**. 
+
+
+### Example: Summarize a Large File
+
+```python
+class MapSummaries(BatchNode):
+    def prep(self, shared):
+        # Suppose we have a big file; chunk it
+        content = shared["data"].get("large_text.txt", "")
+        chunk_size = 10000
+        chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
+        return chunks
+
+    def exec(self, chunk):
+        prompt = f"Summarize this chunk in 10 words: {chunk}"
+        summary = call_llm(prompt)
+        return summary
+
+    def post(self, shared, prep_res, exec_res_list):
+        combined = "\n".join(exec_res_list)
+        shared["summary"]["large_text.txt"] = combined
+        return "default"
+
+map_summaries = MapSummaries()
+flow = Flow(start=map_summaries)
+flow.run(shared)
+```
+
+---
+
+## 2. BatchFlow
+
+A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
+
+
+### Example: Summarize Many Files
+
+```python
+class SummarizeAllFiles(BatchFlow):
+    def prep(self, shared):
+        # Return a list of param dicts (one per file)
+        filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...]
+        return [{"filename": fn} for fn in filenames]
+
+# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
+summarize_file = SummarizeFile(start=load_file)
+
+# Wrap that flow into a BatchFlow:
+summarize_all_files = SummarizeAllFiles(start=summarize_file)
+summarize_all_files.run(shared)
+```
+
+### Under the Hood
+1. `prep(shared)` returns a list of param dicts—e.g., `[{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...]`.
+2. The **BatchFlow** loops through each dict. For each one:
+   - It merges the dict with the BatchFlow's own `params`.
+   - It calls `flow.run(shared)` using the merged result.
+3. This means the sub-Flow is run **repeatedly**, once for every param dict.
+
+---
+
+## 3. Nested or Multi-Level Batches
+
+You can nest a **BatchFlow** in another **BatchFlow**. For instance:
+- **Outer** batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
+- **Inner** batch: returns a list of per-file param dicts.
+
+At each level, **BatchFlow** merges its own param dict with the parent's. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.
+
+```python
+import os
+
+class FileBatchFlow(BatchFlow):
+    def prep(self, shared):
+        directory = self.params["directory"]
+        files = [f for f in os.listdir(directory) if f.endswith(".txt")]
+        return [{"filename": f} for f in files]
+
+class DirectoryBatchFlow(BatchFlow):
+    def prep(self, shared):
+        directories = ["/path/to/dirA", "/path/to/dirB"]
+        return [{"directory": d} for d in directories]
+
+
+inner_flow = FileBatchFlow(start=MapSummaries())
+outer_flow = DirectoryBatchFlow(start=inner_flow)
+```
\ No newline at end of file
diff --git a/docs/communication.md b/docs/communication.md
new file mode 100644
index 0000000..ebba2e2
--- /dev/null
+++ b/docs/communication.md
@@ -0,0 +1,138 @@
+---
+layout: default
+title: "Communication"
+parent: "Core Abstraction"
+nav_order: 3
+---
+
+# Communication
+
+Nodes and Flows **communicate** in two ways:
+
+1. **Shared Store** – A global data structure (often an in-memory dict) that all nodes can read from and write to. Every Node's `prep()` and `post()` methods receive the **same** `shared` store.
+2. 
**Params** – Each node and Flow has a `params` dict assigned by the **parent Flow**. Params mostly serve as identifiers, letting each node/flow know what task it’s assigned. + +If you know memory management, **Shared Store** is like a **heap** shared across function calls, while **Params** is like a **stack** assigned by parent function calls. + + + +> **Why not use other communication models like Message Passing?** *Message passing* works well for simple DAGs, but with *nested graphs* (Flows containing Flows, repeated or cyclic calls), routing messages becomes hard to maintain. A shared store keeps the design simple and easy. +{: .note } + +--- + +## 1. Shared Store + +### Overview + +A shared store is typically an in-mem dictionary, like: +```python +shared = {"data": {}, "summary": {}, "config": {...}, ...} +``` + +It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements. + +### Example + +```python +class LoadData(Node): + def prep(self, shared): + # Suppose we read from disk or an API + shared["data"]["my_file.txt"] = "Some text content" + return None + +class Summarize(Node): + def prep(self, shared): + # We can read what LoadData wrote + content = shared["data"].get("my_file.txt", "") + return content + + def exec(self, prep_res): + prompt = f"Summarize: {prep_res}" + summary = call_llm(prompt) + return summary + + def post(self, shared, prep_res, exec_res): + shared["summary"]["my_file.txt"] = exec_res + return "default" + +load_data = LoadData() +summarize = Summarize() +load_data >> summarize +flow = Flow(start=load_data) + +shared = {} +flow.run(shared) +``` + +Here: +- `LoadData` writes to `shared["data"]`. +- `Summarize` reads from the same location. +No special data-passing—just the same `shared` object. + +--- + +## 2. Params + +**Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are: +- **Immutable** during a Node’s run cycle (i.e., they don’t change mid-`prep`, `exec`, `post`). +- **Set** via `set_params()`. +- **Cleared** and updated each time a parent Flow calls it. + + +> Only set the uppermost Flow params because others will be overwritten by the parent Flow. If you need to set child node params, see [Batch](./batch.md). +{: .warning } + +Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store. + +### Example + +```python +# 1) Create a Node that uses params +class SummarizeFile(Node): + def prep(self, shared): + # Access the node's param + filename = self.params["filename"] + return shared["data"].get(filename, "") + + def exec(self, prep_res): + prompt = f"Summarize: {prep_res}" + return call_llm(prompt) + + def post(self, shared, prep_res, exec_res): + filename = self.params["filename"] + shared["summary"][filename] = exec_res + return "default" + +# 2) Set params +node = SummarizeFile() + +# 3) Set Node params directly (for testing) +node.set_params({"filename": "doc1.txt"}) +node.run(shared) + +# 4) Create Flow +flow = Flow(start=node) + +# 5) Set Flow params (overwrites node params) +flow.set_params({"filename": "doc2.txt"}) +flow.run(shared) # The node summarizes doc2, not doc1 +``` + +--- + +## 3. Shared Store vs. Params + +Think of the **Shared Store** like a heap and **Params** like a stack. + +- **Shared Store**: + - Public, global. 
+ - You can design and populate ahead, e.g., for the input to process. + - Great for data results, large content, or anything multiple nodes need. + - Keep it tidy—structure it carefully (like a mini schema). + +- **Params**: + - Local, ephemeral. + - Passed in by parent Flows. You should only set it for the uppermost flow. + - Perfect for small values like filenames or numeric IDs. + - Do **not** persist across different nodes and are reset. \ No newline at end of file diff --git a/docs/core_abstraction.md b/docs/core_abstraction.md new file mode 100644 index 0000000..6043574 --- /dev/null +++ b/docs/core_abstraction.md @@ -0,0 +1,6 @@ +--- +layout: default +title: "Core Abstraction" +nav_order: 2 +has_children: true +--- \ No newline at end of file diff --git a/docs/flow.md b/docs/flow.md new file mode 100644 index 0000000..6ac6208 --- /dev/null +++ b/docs/flow.md @@ -0,0 +1,171 @@ +--- +layout: default +title: "Flow" +parent: "Core Abstraction" +nav_order: 2 +--- + +# Flow + +A **Flow** orchestrates how Nodes connect and run, based on **Actions** returned from each Node’s `post()` method. You can chain Nodes in a sequence or create branching logic depending on the **Action** string. + +## 1. Action-based Transitions + +Each Node's `post(shared, prep_res, exec_res)` method returns an **Action** string. By default, if `post()` doesn't explicitly return anything, we treat that as `"default"`. + +You define transitions with the syntax: + +1. Basic default transition: `node_a >> node_b` + This means if `node_a.post()` returns `"default"` (or `None`), go to `node_b`. + (Equivalent to `node_a - "default" >> node_b`) + +2. Named action transition: `node_a - "action_name" >> node_b` + This means if `node_a.post()` returns `"action_name"`, go to `node_b`. + +It’s possible to create loops, branching, or multi-step flows. + +## 2. Creating a Flow + +A **Flow** begins with a **start** node (or flow). You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the first node, looks at its `post()` return Action, follows the corresponding transition, and continues until there’s no next node or you explicitly stop. + +### Example: Simple Sequence + +Here’s a minimal flow of two nodes in a chain: + +```python +node_a >> node_b +flow = Flow(start=node_a) +flow.run(shared) +``` + +- When you run the flow, it executes `node_a`. +- Suppose `node_a.post()` returns `"default"`. +- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`. +- If `node_b.post()` returns `"default"` but we didn’t define `node_b >> something_else`, the flow ends there. + +### Example: Branching & Looping + +Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions: + +- `"approved"`: expense is approved, move to payment processing +- `"needs_revision"`: expense needs changes, send back for revision +- `"rejected"`: expense is denied, finish the process + +We can wire them like this: + +```python +# Define the flow connections +review - "approved" >> payment # If approved, process payment +review - "needs_revision" >> revise # If needs changes, go to revision +review - "rejected" >> finish # If rejected, finish the process + +revise >> review # After revision, go back for another review +payment >> finish # After payment, finish the process + +flow = Flow(start=review) +``` + +Let's see how it flows: + +1. If `review.post()` returns `"approved"`, the expense moves to `payment` node +2. 
If `review.post()` returns `"needs_revision"`, it goes to `revise` node, which then loops back to `review` +3. If `review.post()` returns `"rejected"`, it moves to `finish` node and stops + +```mermaid +flowchart TD + review[Review Expense] -->|approved| payment[Process Payment] + review -->|needs_revision| revise[Revise Report] + review -->|rejected| finish[Finish Process] + + revise --> review + payment --> finish +``` + +### Running Individual Nodes vs. Running a Flow + +- `node.run(shared)`: Just runs that node alone (calls `prep()`, `exec()`, `post()`), returns an Action. +- `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can’t continue (no next node or no next Action). + + +> node.run(shared) **does not** proceed automatically to the successor and may use incorrect parameters. +> This is mainly for debugging or testing a single node. +> Always use `flow.run(...)` in production to ensure the full pipeline runs correctly. +{: .warning } + +## 3. Nested Flows + +A **Flow** can act like a Node, which enables powerful composition patterns. This means you can: + +1. Use a Flow as a Node within another Flow's transitions. +2. Combine multiple smaller Flows into a larger Flow for reuse. +3. Node `params` will be a merging of **all** parents' `params`. + +### Basic Flow Nesting + +Here's how to connect a flow to another node: + +```python +# Create a sub-flow +node_a >> node_b +subflow = Flow(start=node_a) + +# Connect it to another node +subflow >> node_c + +# Create the parent flow +parent_flow = Flow(start=subflow) +``` + +When `parent_flow.run()` executes: +1. It starts `subflow` +2. `subflow` runs through its nodes (`node_a` then `node_b`) +3. After `subflow` completes, execution continues to `node_c` + +### Example: Order Processing Pipeline + +Here's a practical example that breaks down order processing into nested flows: + +```python +# Payment processing sub-flow +validate_payment >> process_payment >> payment_confirmation +payment_flow = Flow(start=validate_payment) + +# Inventory sub-flow +check_stock >> reserve_items >> update_inventory +inventory_flow = Flow(start=check_stock) + +# Shipping sub-flow +create_label >> assign_carrier >> schedule_pickup +shipping_flow = Flow(start=create_label) + +# Connect the flows into a main order pipeline +payment_flow >> inventory_flow >> shipping_flow + +# Create the master flow +order_pipeline = Flow(start=payment_flow) + +# Run the entire pipeline +order_pipeline.run(shared_data) +``` + +This creates a clean separation of concerns while maintaining a clear execution path: + +```mermaid +flowchart LR + subgraph order_pipeline[Order Pipeline] + subgraph paymentFlow["Payment Flow"] + A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation] + end + + subgraph inventoryFlow["Inventory Flow"] + D[Check Stock] --> E[Reserve Items] --> F[Update Inventory] + end + + subgraph shippingFlow["Shipping Flow"] + G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup] + end + + paymentFlow --> inventoryFlow + inventoryFlow --> shippingFlow + end +``` diff --git a/docs/index.md b/docs/index.md index bc50f0a..7099c1f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,14 +4,60 @@ title: "Home" nav_order: 1 --- -> **Heads up!** This is a note callout. +# Mini LLM Flow + +A [100-line](https://github.com/zachary62/miniLLMFlow/blob/main/minillmflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*. 
+
+
+We model the LLM workflow as a **Nested Directed Graph**:
+- **Nodes** handle simple (LLM) tasks.
+- Nodes connect through **Actions** (labeled edges) for *Agents*.
+- **Flows** orchestrate a directed graph of Nodes for *Task Decomposition*.
+- A Flow can be used as a Node (for **Nesting**).
+- **Batch** Nodes/Flows handle data-intensive tasks.
+- **Async** Nodes/Flows allow waiting or **Parallel** execution.
+
+
+
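+As a quick taste, here is a minimal sketch of two Nodes chained into a Flow. The `Summarize` and `Show` nodes are hypothetical, and the sketch assumes a user-provided `call_llm` helper (see [LLM Wrapper](./llm.md)); the `prep`/`exec`/`post` steps and Action-based transitions are covered in [Node](./node.md) and [Flow](./flow.md).
+
+```python
+class Summarize(Node):
+    def prep(self, shared):
+        return shared["text"]                      # read input from the shared store
+
+    def exec(self, prep_res):
+        return call_llm(f"Summarize: {prep_res}")  # the LLM call lives in exec()
+
+    def post(self, shared, prep_res, exec_res):
+        shared["summary"] = exec_res               # write the result back
+        return "default"                           # the returned Action picks the next Node
+
+class Show(Node):
+    def post(self, shared, prep_res, exec_res):
+        print(shared["summary"])                   # all 3 steps are optional
+
+summarize, show = Summarize(), Show()
+summarize >> show                                  # the "default" Action leads to show
+flow = Flow(start=summarize)                       # a Flow orchestrates the graph
+flow.run({"text": "Mini LLM Flow is a 100-line framework."})
+```
+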
+ +
+
+
 {: .note }
+> Have questions? Chat with [AI Assistant](https://chatgpt.com/g/g-677464af36588191b9eba4901946557b-mini-llm-flow-assistant)
+
+
+
+## Core Abstraction
+
+- [Node](./node.md)
+- [Flow](./flow.md)
+- [Communication](./communication.md)
+- [Batch](./batch.md)
+- [(Advanced) Async](./async.md)
+- [(Advanced) Parallel](./parallel.md)
+
+## Low-Level Details
+
+- [LLM Wrapper](./llm.md)
+- [Tool](./tool.md)
-> This is a tip callout.
-{: .tip }
-> This is a warning callout.
 {: .warning }
+> We do not provide built-in implementations for low-level details. Example implementations are provided for reference.
-
-> This is an important callout.
-{: .important }
\ No newline at end of file
+
+## High-Level Paradigm
+
+- [Structured Output](./structure.md)
+- Task Decomposition
+- Map Reduce
+- RAG
+- Chat Memory
+- Agent
+- Multi-Agent
+- Evaluation
+
+## Example Projects
+
+- Coming soon ...
diff --git a/docs/llm.md b/docs/llm.md
new file mode 100644
index 0000000..789cd06
--- /dev/null
+++ b/docs/llm.md
@@ -0,0 +1,69 @@
+---
+layout: default
+title: "LLM Wrapper"
+parent: "Preparation"
+nav_order: 1
+---
+
+# LLM Wrappers
+
+We **don't** provide built-in LLM wrappers. Instead, please implement your own, for example by asking an assistant like ChatGPT or Claude. If you ask ChatGPT to "implement a `call_llm` function that takes a prompt and returns the LLM response," you should get something like:
+
+```python
+def call_llm(prompt):
+    from openai import OpenAI
+    # Set the OpenAI API key (use environment variables, etc.)
+    client = OpenAI(api_key="YOUR_API_KEY_HERE")
+    r = client.chat.completions.create(
+        model="gpt-4",
+        messages=[{"role": "user", "content": prompt}]
+    )
+    return r.choices[0].message.content
+
+# Example usage
+call_llm("How are you?")
+```
+
+## Improvements
+Feel free to enhance your `call_llm` function as needed. Here are examples:
+
+- Handle chat history:
+
+```python
+def call_llm(messages):
+    from openai import OpenAI
+    client = OpenAI(api_key="YOUR_API_KEY_HERE")
+    r = client.chat.completions.create(
+        model="gpt-4",
+        messages=messages
+    )
+    return r.choices[0].message.content
+```
+
+- Add in-memory caching:
+
+```python
+from functools import lru_cache
+
+@lru_cache(maxsize=1000)
+def call_llm(prompt):
+    # Your implementation here
+    pass
+```
+
+- Enable logging:
+
+```python
+def call_llm(prompt):
+    import logging
+    logging.info(f"Prompt: {prompt}")
+    response = ... # Your implementation here
+    logging.info(f"Response: {response}")
+    return response
+```
+
+## Why Not Provide Built-in LLM Wrappers?
+I believe it is a **bad practice** to provide LLM-specific implementations in a general framework:
+- **LLM APIs change frequently**. Hardcoding them makes maintenance a nightmare.
+- You may need **flexibility** to switch vendors, use fine-tuned models, or deploy local LLMs.
+- You may need **optimizations** like prompt caching, request batching, or response streaming.
diff --git a/docs/node.md b/docs/node.md
new file mode 100644
index 0000000..aeaa61e
--- /dev/null
+++ b/docs/node.md
@@ -0,0 +1,95 @@
+---
+layout: default
+title: "Node"
+parent: "Core Abstraction"
+nav_order: 1
+---
+
+# Node
+
+A **Node** is the smallest building block of Mini LLM Flow. Each Node has 3 steps:
+
+1. **`prep(shared)`**
+   - Reads and preprocesses data from the `shared` store for LLMs.
+   - Examples: *query DB, read files, or serialize data into a string*.
+   - Returns `prep_res`, which will be passed to both `exec()` and `post()`.
+
+2. 
**`exec(prep_res)`** + - The main execution step where the LLM is called. + - Optionally has built-in retry and error handling (below). + - ⚠️ If retry enabled, ensure implementation is idempotent. + - Returns `exec_res`, which is passed to `post()`. + +3. **`post(shared, prep_res, exec_res)`** + - Writes results back to the `shared` store or decides the next action. + - Examples: *finalize outputs, trigger next steps, or log results*. + - Returns a **string** to specify the next action (`"default"` if nothing or `None` is returned). + + +> All 3 steps are optional. For example, you might only need to run the Prep without calling the LLM. +{: .note } + + +## Fault Tolerance & Retries + +Nodes in Mini LLM Flow can **retry** execution if `exec()` raises an exception. You control this via two parameters when you create the Node: + +- `max_retries` (int): How many times to try running `exec()`. The default is `1`, which means **no** retry. +- `wait` (int): The time to wait (in **seconds**) before each retry attempt. By default, `wait=0` (i.e., no waiting). Increasing this is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off. + +```python +my_node = SummarizeFile(max_retries=3, wait=10) +``` + +When an exception occurs in `exec()`, the Node automatically retries until: + +- It either succeeds, or +- The Node has retried `max_retries - 1` times already and fails on the last attempt. + +### Graceful Fallback + +If you want to **gracefully handle** the error rather than raising it, you can override: + +```python +def exec_fallback(self, shared, prep_res, exc): + raise exc +``` + +By default, it just re-raises `exc`. But you can return a fallback result instead. +That fallback result becomes the `exec_res` passed to `post()`. + +## Example + +```python +class SummarizeFile(Node): + def prep(self, shared): + filename = self.params["filename"] + return shared["data"][filename] + + def exec(self, prep_res): + if not prep_res: + raise ValueError("Empty file content!") + prompt = f"Summarize this text in 10 words: {prep_res}" + summary = call_llm(prompt) # might fail + return summary + + def exec_fallback(self, shared, prep_res, exc): + # Provide a simple fallback instead of crashing + return "There was an error processing your request." + + def post(self, shared, prep_res, exec_res): + filename = self.params["filename"] + shared["summary"][filename] = exec_res + # Return "default" by not returning anything + +summarize_node = SummarizeFile(max_retries=3) + +# Run the node standalone for testing (calls prep->exec->post). +# If exec() fails, it retries up to 3 times before calling exec_fallback(). 
+summarize_node.set_params({"filename": "test_file.txt"}) +action_result = summarize_node.run(shared) + +print("Action returned:", action_result) # Usually "default" +print("Summary stored:", shared["summary"].get("test_file.txt")) +``` + diff --git a/docs/paradigm.md b/docs/paradigm.md new file mode 100644 index 0000000..ec5072f --- /dev/null +++ b/docs/paradigm.md @@ -0,0 +1,6 @@ +--- +layout: default +title: "Paradigm" +nav_order: 4 +has_children: true +--- \ No newline at end of file diff --git a/docs/parallel.md b/docs/parallel.md new file mode 100644 index 0000000..f822fbe --- /dev/null +++ b/docs/parallel.md @@ -0,0 +1,54 @@ +--- +layout: default +title: "(Advanced) Parallel" +parent: "Core Abstraction" +nav_order: 6 +--- + +# (Advanced) Parallel + +**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute. + +## AsyncParallelBatchNode + +Like **AsyncBatchNode**, but run `exec_async()` in **parallel**: + +```python +class ParallelSummaries(AsyncParallelBatchNode): + async def prep_async(self, shared): + # e.g., multiple texts + return shared["texts"] + + async def exec_async(self, text): + prompt = f"Summarize: {text}" + return await call_llm_async(prompt) + + async def post_async(self, shared, prep_res, exec_res_list): + shared["summary"] = "\n\n".join(exec_res_list) + return "default" + +node = ParallelSummaries() +flow = AsyncFlow(start=node) +``` + +## AsyncParallelBatchFlow + +Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters: + +```python +class SummarizeMultipleFiles(AsyncParallelBatchFlow): + async def prep_async(self, shared): + return [{"filename": f} for f in shared["files"]] + +sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) +parallel_flow = SummarizeMultipleFiles(start=sub_flow) +await parallel_flow.run_async(shared) +``` + +## Best Practices + +- **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize. + +- **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals). + +- **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. diff --git a/docs/preparation.md b/docs/preparation.md new file mode 100644 index 0000000..9e8cbf9 --- /dev/null +++ b/docs/preparation.md @@ -0,0 +1,6 @@ +--- +layout: default +title: "Preparation" +nav_order: 3 +has_children: true +--- \ No newline at end of file diff --git a/docs/structure.md b/docs/structure.md new file mode 100644 index 0000000..cc2b123 --- /dev/null +++ b/docs/structure.md @@ -0,0 +1,111 @@ +--- +layout: default +title: "Structured Output" +parent: "Paradigm" +nav_order: 1 +--- + +# Structured Output + +In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys. + +There are several approaches to achieve a structured output: +- **Prompting** the LLM to strictly return a defined structure. +- Using LLMs that natively support **schema enforcement**. +- **Post-processing** the LLM’s response to extract structured content. + +In practice, **Prompting** is simple and reliable for modern LLMs. 
+
+
+### Example Use Cases
+
+- Extracting Key Information
+
+```yaml
+product:
+  name: Widget Pro
+  price: 199.99
+  description: |
+    A high-quality widget designed for professionals.
+    Recommended for advanced users.
+```
+
+- Summarizing Documents into Bullet Points
+
+```yaml
+summary:
+  - This product is easy to use.
+  - It is cost-effective.
+  - Suitable for all skill levels.
+```
+
+- Generating Configuration Files
+
+```yaml
+server:
+  host: 127.0.0.1
+  port: 8080
+  ssl: true
+```
+
+## Prompt Engineering
+
+When prompting the LLM to produce **structured** output:
+1. **Wrap** the structure in code fences (e.g., ` ```yaml`).
+2. **Validate** that all required fields exist (and let `Node` handle retries).
+
+### Example: Text Summarization
+
+```python
+class SummarizeNode(Node):
+    def exec(self, prep_res):
+        # Suppose `prep_res` is the text to summarize.
+        prompt = f"""
+Please summarize the following text as YAML, with exactly 3 bullet points
+
+{prep_res}
+
+Now, output:
+```yaml
+summary:
+  - bullet 1
+  - bullet 2
+  - bullet 3
+```"""
+        response = call_llm(prompt)
+        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
+
+        import yaml
+        structured_result = yaml.safe_load(yaml_str)
+
+        assert "summary" in structured_result
+        assert isinstance(structured_result["summary"], list)
+
+        return structured_result
+```
+
+### Why YAML instead of JSON?
+
+Current LLMs struggle with escaping. YAML is easier for strings since they don't always need quotes.
+
+**In JSON**
+
+```json
+{
+  "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\""
+}
+```
+
+- Every double quote inside the string must be escaped with `\"`.
+- Each newline in the dialogue must be represented as `\n`.
+
+**In YAML**
+
+```yaml
+dialogue: |
+  Alice said: "Hello Bob.
+  How are you?
+  I am good."
+```
+
+- No need to escape interior quotes—just place the entire text under a block literal (`|`).
+- Newlines are naturally preserved without needing `\n`.
\ No newline at end of file
diff --git a/docs/tool.md b/docs/tool.md
new file mode 100644
index 0000000..a0256b0
--- /dev/null
+++ b/docs/tool.md
@@ -0,0 +1,165 @@
+---
+layout: default
+title: "Tool"
+parent: "Preparation"
+nav_order: 2
+---
+
+# Tool
+
+Similar to LLM wrappers, we **don't** provide built-in tools. Here, we recommend some *minimal* (and incomplete) implementations of commonly used tools. These examples can serve as a starting point for your own tooling.
+
+---
+
+
+## 1. Embedding Calls
+
+```python
+def get_embedding(text):
+    import openai
+    # Set your API key elsewhere, e.g., environment variables
+    r = openai.Embedding.create(
+        model="text-embedding-ada-002",
+        input=text
+    )
+    return r["data"][0]["embedding"]
+```
+
+---
+
+## 2. Vector Database (Faiss)
+
+```python
+import faiss
+import numpy as np
+
+def create_index(embeddings):
+    dim = len(embeddings[0])
+    index = faiss.IndexFlatL2(dim)
+    index.add(np.array(embeddings).astype('float32'))
+    return index
+
+def search_index(index, query_embedding, top_k=5):
+    D, I = index.search(
+        np.array([query_embedding]).astype('float32'),
+        top_k
+    )
+    return I, D
+```
+
+---
+
+## 3. Local Database
+
+```python
+import sqlite3
+
+def execute_sql(query):
+    conn = sqlite3.connect("mydb.db")
+    cursor = conn.cursor()
+    cursor.execute(query)
+    result = cursor.fetchall()
+    conn.commit()
+    conn.close()
+    return result
+```
+
+---
+
+## 4. Python Function Execution
+
+```python
+def run_code(code_str):
+    env = {}
+    exec(code_str, env)
+    return env
+```
+
+---
+
+## 5. 
PDF Extraction + +```python +def extract_text_from_pdf(file_path): + import PyPDF2 + pdfFileObj = open(file_path, "rb") + reader = PyPDF2.PdfReader(pdfFileObj) + text = "" + for page in reader.pages: + text += page.extract_text() + pdfFileObj.close() + return text +``` + +--- + +## 6. Web Crawling + +```python +def crawl_web(url): + import requests + from bs4 import BeautifulSoup + html = requests.get(url).text + soup = BeautifulSoup(html, "html.parser") + return soup.title.string, soup.get_text() +``` + +--- + +## 7. Basic Search (SerpAPI example) + +```python +def search_google(query): + import requests + params = { + "engine": "google", + "q": query, + "api_key": "YOUR_API_KEY" + } + r = requests.get("https://serpapi.com/search", params=params) + return r.json() +``` + +--- + + +## 8. Audio Transcription (OpenAI Whisper) + +```python +def transcribe_audio(file_path): + import openai + audio_file = open(file_path, "rb") + transcript = openai.Audio.transcribe("whisper-1", audio_file) + return transcript["text"] +``` + +--- + +## 9. Text-to-Speech (TTS) + +```python +def text_to_speech(text): + import pyttsx3 + engine = pyttsx3.init() + engine.say(text) + engine.runAndWait() +``` + +--- + +## 10. Sending Email + +```python +def send_email(to_address, subject, body, from_address, password): + import smtplib + from email.mime.text import MIMEText + + msg = MIMEText(body) + msg["Subject"] = subject + msg["From"] = from_address + msg["To"] = to_address + + with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server: + server.login(from_address, password) + server.sendmail(from_address, [to_address], msg.as_string()) +``` \ No newline at end of file