diff --git a/README.md b/README.md
index 54e2369..f7307f1 100644
--- a/README.md
+++ b/README.md
@@ -30,7 +30,8 @@ Hence, I built this framework that lets LLMs focus on what matters. It turns out
-## Example
+## Example LLM apps
-- Beginner Tutorial on [Text summarization + QA agent](https://colab.research.google.com/github/zachary62/miniLLMFlow/blob/main/cookbook/demo.ipynb)
-- Have questions for this tutorial? Try [this prompt](https://chatgpt.com/share/676f16d2-7064-8000-b9d7-f6874346a6b5)
+- Beginner Tutorial: [Text summarization for a Paul Graham essay + QA agent](https://colab.research.google.com/github/zachary62/miniLLMFlow/blob/main/cookbook/demo.ipynb)
+
+  - Have questions about this tutorial? Ask LLM assistants via [this prompt](https://chatgpt.com/share/676f16d2-7064-8000-b9d7-f6874346a6b5)
diff --git a/docs/async.md b/docs/async.md
index 9d8e98e..e366327 100644
--- a/docs/async.md
+++ b/docs/async.md
@@ -1,7 +1,7 @@
---
layout: default
title: "Async"
-nav_order: 6
+nav_order: 7
---

# Async
@@ -15,7 +15,7 @@ nav_order: 6
An **AsyncNode** is like a normal `Node`, except `exec()` (and optionally `prep()`) can be declared **async**. You can `await` inside these methods. For example:

-``
+```python
class AsyncSummarizeFile(AsyncNode):
    async def prep(self, shared):
        # Possibly do async file reads or small concurrency tasks
@@ -37,7 +37,7 @@ class AsyncSummarizeFile(AsyncNode):
        filename = self.params["filename"]
        shared["summary"][filename] = exec_res
        return "default"
-``
+```

- **`prep(shared)`** can be `async def` if you want to do asynchronous pre-processing.
- **`exec(shared, prep_res)`** is typically the main place for async logic.
@@ -49,7 +49,7 @@ An **AsyncFlow** is a Flow where nodes can be **AsyncNode**s or normal `Node`s.

### Minimal Example

-``
+```python
class MyAsyncFlow(AsyncFlow):
    pass  # Usually, you just instantiate AsyncFlow with a start node
@@ -70,7 +70,7 @@ async def main():
    await my_flow.run(shared)

asyncio.run(main())
-``
+```

- If the start node or any subsequent node is an `AsyncNode`, the Flow automatically calls its `prep()`, `exec()`, `post()` as async functions.
- You can mix normal `Node`s and `AsyncNode`s in the same flow. **AsyncFlow** will handle the difference seamlessly.
@@ -79,7 +79,7 @@ asyncio.run(main())

If you want to run a batch of flows **concurrently**, you can use `BatchAsyncFlow`. Like `BatchFlow`, it generates a list of parameter sets in `prep()`, but each iteration runs the sub-flow asynchronously.

-``
+```python
class SummarizeAllFilesAsync(BatchAsyncFlow):
    async def prep(self, shared):
        # Return a list of param dicts (like in BatchFlow),
@@ -94,7 +94,7 @@ all_files_flow = SummarizeAllFilesAsync(start=async_summarize_flow)

# Then in your async context:
await all_files_flow.run(shared)
-``
+```

Under the hood:
1. `prep()` returns a list of param sets.
@@ -105,7 +105,7 @@ Under the hood:

Just like normal Nodes, an `AsyncNode` can have `max_retries` and a `process_after_fail(...)` method:

-``
+```python
class RetryAsyncNode(AsyncNode):
    def __init__(self, max_retries=3):
        super().__init__(max_retries=max_retries)
@@ -118,7 +118,7 @@ class RetryAsyncNode(AsyncNode):
    def process_after_fail(self, shared, prep_res, exc):
        # Provide fallback response
        return "Unable to complete async call due to error."
-``
+```

## 5. Best Practices
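
One practice worth spelling out here: when `exec()` must make several independent LLM calls, `await` them concurrently instead of one at a time. A minimal sketch, assuming the `AsyncNode` API above and a hypothetical `async_call_llm` helper:

```python
import asyncio

class SummarizeChunksAsync(AsyncNode):
    async def prep(self, shared):
        # Split one large document into chunks
        return shared["data"]["large_text.txt"].split("\n\n")

    async def exec(self, shared, prep_res):
        # asyncio.gather fires all LLM calls concurrently and
        # preserves the input order in its results
        prompts = [f"Summarize this chunk:\n{chunk}" for chunk in prep_res]
        return await asyncio.gather(*(async_call_llm(p) for p in prompts))

    def post(self, shared, prep_res, exec_res):
        shared["summary"]["large_text.txt"] = "\n".join(exec_res)
        return "default"
```
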
diff --git a/docs/batch.md b/docs/batch.md
index 12b70a4..540de5a 100644
--- a/docs/batch.md
+++ b/docs/batch.md
@@ -1,7 +1,7 @@
---
layout: default
title: "Batch"
-nav_order: 5
+nav_order: 6
---

# Batch
@@ -22,7 +22,7 @@ A **BatchNode** extends `Node` but changes how `prep()` and `exec()` behave:

### Example: Map Summaries

-``
+```python
class MapSummaries(BatchNode):
    def prep(self, shared):
        # Suppose we have a big file; we want to chunk it
@@ -43,14 +43,14 @@ class MapSummaries(BatchNode):
        combined = "\n".join(exec_res_list)
        shared["summary"]["large_text.txt"] = combined
        return "default"
-``
+```

**Flow** usage:

-``
+```python
map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
-``
+```

- After `prep()` returns multiple chunks, `exec()` is called for each chunk.
- The aggregated `exec_res_list` is passed to `post()`, where you can do final processing.
@@ -69,7 +69,7 @@ A **BatchFlow** runs a **Flow** multiple times, each time with a different set o

### Example: Summarize Many Files

-``
+```python
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # Return a list of parameter dicts (one per file)
@@ -78,11 +78,11 @@ class SummarizeAllFiles(BatchFlow):
        return params_list

# No custom exec() or post(), so we rely on BatchFlow’s default
-``
+```

Then define a **Flow** that handles **one** file. Suppose we have `Flow(start=summarize_file)`.

-``
+```python
# Example "per-file" flow (just one node):
summarize_file = SummarizeFile()
@@ -94,7 +94,7 @@ summarize_all_files = SummarizeAllFiles(start=summarize_file)

# Running it:
summarize_all_files.run(shared)
-``
+```

**Under the hood**:
1. `prep(shared)` in `SummarizeAllFiles` returns a list of param dicts, e.g., `[{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...]`.
@@ -125,7 +125,7 @@ This can be done by making the **outer** BatchFlow’s `exec()` return a list of

## 4. Putting It All Together

-``
+```python
# We'll combine the ideas:
class MapSummaries(BatchNode):
    def prep(self, shared):
@@ -157,7 +157,7 @@ class SummarizeAllFiles(BatchFlow):

# For now, let's just show usage:
summarize_all = SummarizeAllFiles(start=map_flow)
summarize_all.run(shared)
-``
+```

In this snippet:
diff --git a/docs/communication.md b/docs/communication.md
index cefe077..b4115f6 100644
--- a/docs/communication.md
+++ b/docs/communication.md
@@ -1,7 +1,7 @@
---
layout: default
title: "Communication"
-nav_order: 4
+nav_order: 5
---

# Communication
@@ -20,9 +20,9 @@ This design avoids complex message-passing or data routing. It also lets you **n

### Overview

A shared store is typically a Python dictionary, like:
-``
+```python
shared = {"data": {}, "summary": {}, "config": { ... }, ...}
-``
+```

Every Node’s `prep()`, `exec()`, and `post()` methods receive the **same** `shared` object. This makes it easy to:
- Read data that another Node loaded, such as a text file or database record.
@@ -31,7 +31,7 @@ Every Node’s `prep()`, `exec()`, and `post()` methods receive the **same** `sh

### Example

-``
+```python
class LoadData(Node):
    def prep(self, shared):
        # Suppose we read from disk or an API
@@ -59,7 +59,7 @@ class Summarize(Node):
    def post(self, shared, prep_res, exec_res):
        shared["summary"]["my_file.txt"] = exec_res
        return "default"
-``
+```

Here,
- `LoadData` writes to `shared["data"]`.
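
To see the pattern end to end, here is a minimal sketch that wires these two nodes together and runs them (assuming the `Flow` API described in flow.md):

```python
load_data = LoadData()
summarize = Summarize()
load_data >> summarize  # default transition: Summarize runs after LoadData

flow = Flow(start=load_data)
shared = {"data": {}, "summary": {}}
flow.run(shared)

# Both nodes mutated the same dict, so the result is simply:
print(shared["summary"]["my_file.txt"])
```
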
@@ -86,7 +86,7 @@ Common examples:

### Example

-``
+```python
# 1) Create a Node that uses params
class SummarizeFile(Node):
    def prep(self, shared):
@@ -109,7 +109,7 @@ node.set_params({"filename": "doc1.txt"})

# 3) Run
node.run(shared)
-``
+```

Because **params** are only for that Node, you don’t pollute the global `shared` with fields that might only matter to one operation.
@@ -147,7 +147,7 @@ Because **params** are only for that Node, you don’t pollute the global `share

## Putting It All Together

-``
+```python
# Suppose you have a flow: load_data >> summarize_file
my_flow = Flow(start=load_data)
@@ -164,7 +164,7 @@ shared = {

my_flow.run(shared)
# After run, shared["summary"]["my_text.txt"] might have the LLM summary
-``
+```

- `load_data` uses its param (`"path"`) to load some data into `shared["data"]`.
- `summarize_file` uses its param (`"filename"`) to pick which file from `shared["data"]` to summarize.
diff --git a/docs/flow.md b/docs/flow.md
index 328bbcf..a4008c4 100644
--- a/docs/flow.md
+++ b/docs/flow.md
@@ -1,74 +1,33 @@
---
layout: default
title: "Flow"
-nav_order: 2
+nav_order: 4
---

# Flow

-In **Mini LLM Flow**, a **Flow** orchestrates how Nodes connect and run, based on **Actions** returned from each Node’s `post()` method. You can chain Nodes in a sequence or create branching logic depending on the **Action** string.
+A **Flow** orchestrates how Nodes connect and run, based on **Actions** returned from each Node’s `post()` method. You can chain Nodes in a sequence or create branching logic depending on the **Action** string.

## Action-based Transitions

-Each Node’s `post(shared, prep_res, exec_res)` returns a string called **Action**. By default, if `post()` doesn’t explicitly return anything, we treat that as `"default"`.
+Each Node's `post(shared, prep_res, exec_res)` method returns an **Action** string. By default, if `post()` doesn't explicitly return anything, we treat that as `"default"`.

You define transitions with the syntax:

-```python
-node_a >> node_b
-```
-- This means if `node_a.post()` returns `"default"` (or `None`), go to `node_b`.
+1. Basic default transition: `node_a >> node_b`
+   This means if `node_a.post()` returns `"default"` (or `None`), go to `node_b`.
+   (Equivalent to `node_a - "default" >> node_b`)

-```python
-node_a - "action_name" >> node_b
-```
-- This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
+2. Named action transition: `node_a - "action_name" >> node_b`
+   This means if `node_a.post()` returns `"action_name"`, go to `node_b`.

-It’s possible to create loops, branching, or multi-step flows. You can also chain with multiple Actions from a single node to different successors:
-
-```python
-# Define nodes for order processing
-validate_order = ValidateOrderNode()
-check_inventory = CheckInventoryNode()
-process_payment = ProcessPaymentNode()
-send_confirmation = SendConfirmationNode()
-notify_backorder = NotifyBackorderNode()
-
-# Define the flow
-validate_order - "valid" >> check_inventory
-validate_order - "invalid" >> send_confirmation  # Send rejection confirmation
-
-check_inventory - "in_stock" >> process_payment
-check_inventory - "out_of_stock" >> notify_backorder
-
-process_payment - "success" >> send_confirmation
-process_payment - "failure" >> send_confirmation  # Send payment failure notice
-```
-
-```mermaid
-flowchart TD
-    validate[Validate Order] -->|valid| inventory[Check Inventory]
-    validate -->|invalid| confirm[Send Confirmation]
-
-    inventory -->|in_stock| payment[Process Payment]
-    inventory -->|out_of_stock| backorder[Notify Backorder]
-
-    payment -->|success| confirm
-    payment -->|failure| confirm
-
-    style validate fill:#d4f1f9
-    style confirm fill:#d4f1f9
-```
+It’s possible to create loops, branching, or multi-step flows. You can also chain with multiple Actions from a single node to different successors.
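+
+For instance, a single router node might fan out by content type and loop back when unsure. A quick sketch with hypothetical node names:
+
+```python
+classify - "image" >> image_pipeline   # post() returned "image"
+classify - "text" >> text_pipeline     # post() returned "text"
+classify - "unknown" >> classify       # loop: try classifying again
+```
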
## Creating a Flow

A **Flow** begins with a **start** node (or flow). You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the first node, looks at its `post()` return Action, follows the corresponding transition, and continues until there’s no next node or you explicitly stop.

-```flow = Flow(start=node_a)```
-
-
-
-## Example: Simple Sequence
+### Example: Simple Sequence

Here’s a minimal flow of two nodes in a chain:

@@ -83,69 +42,123 @@ flow.run(shared)
- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`.
- If `node_b.post()` returns `"default"` but we didn’t define `node_b >> something_else`, the flow ends there.

-## Example: Branching & Looping
+### Example: Branching & Looping

-Suppose `FindRelevantFile` can return three possible Actions in its `post()`:
+Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions:

-- `"end"`: means no question, so stop.
-- `"answer"`: means we have a relevant file, move to `AnswerQuestion`.
-- `"retry"`: means no relevant file found, try again.
+- `"approved"`: expense is approved, move to payment processing
+- `"needs_revision"`: expense needs changes, send back for revision
+- `"rejected"`: expense is denied, end the process

-We can wire them:
+We can wire them like this:

-```
-find_relevant_file - "end" >> no_op_node
-find_relevant_file - "answer" >> answer_question
-find_relevant_file - "retry" >> find_relevant_file
-flow = Flow(start=find_relevant_file)
+```python
+# Define the flow connections
+review - "approved" >> payment        # If approved, process payment
+review - "needs_revision" >> revise   # If needs changes, go to revision
+review - "rejected" >> end            # If rejected, end the process
+
+revise >> review   # After revision, go back for another review
+payment >> end     # After payment, end the process
+
+flow = Flow(start=review)
```
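+
+The wiring above assumes the four nodes were instantiated first, e.g. (class names are hypothetical):
+
+```python
+review = ReviewExpenseNode()
+payment = ProcessPaymentNode()
+revise = ReviseExpenseNode()
+end = EndProcessNode()  # terminal node: defines no outgoing transitions
+```
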
-1. If `FindRelevantFile.post()` returns `"answer"`, the flow calls `answer_question`.
-2. If `FindRelevantFile.post()` returns `"retry"`, it loops back to itself.
-3. If `"end"`, it goes to `no_op_node`. If `no_op_node` has no further transitions, the flow stops.
+Let's see how it flows:
+
+1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node
+2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review`
+3. If `review.post()` returns `"rejected"`, it moves to the `end` node and stops
+
+```mermaid
+flowchart TD
+    review[Review Expense] -->|approved| payment[Process Payment]
+    review -->|needs_revision| revise[Revise Report]
+    review -->|rejected| done[End Process]
+
+    revise --> review
+    payment --> done
+```

## Running Individual Nodes vs. Running a Flow

-- **`node.run(shared)`**: Just runs that node alone (calls `prep()`, `exec()`, `post()`), returns an Action. **Does not** proceed automatically to the successor. This is mainly for debugging or testing a single node.
+- **`node.run(shared)`**: Just runs that node alone (calls `prep()`, `exec()`, `post()`), returns an Action. ⚠️ **Does not** proceed automatically to the successor and may use incorrect parameters. This is mainly for debugging or testing a single node.
- **`flow.run(shared)`**: Executes from the start node, follows Actions to the next node, and so on until the flow can’t continue (no next node or no next Action).

Always use `flow.run(...)` in production to ensure the full pipeline runs correctly.

## Nested Flows

-A **Flow** can act like a Node. That means you can do:
+A **Flow** can act like a Node, which enables powerful composition patterns. This means you can:

-```some_flow >> another_node```
-or treat `some_flow` as a node inside a larger flow. This helps you compose complex pipelines by nesting smaller flows.
+1. Use a flow as a node within another flow's transitions
+2. Combine multiple smaller flows into a larger pipeline
+3. Create reusable flow components

-## Example Code
+### Basic Flow Nesting

-Below is a short snippet combining these ideas:
+Here's how to connect a flow to another node:

-``
-# Define nodes
-find_file = FindRelevantFile()
-answer = AnswerQuestion()
-no_op = NoOp()
+```python
+# Create a sub-flow
+node_a >> node_b
+subflow = Flow(start=node_a)

-# Define transitions
-find_file - "answer" >> answer
-find_file - "retry" >> find_file
-find_file - "end" >> no_op
+# Connect it to another node
+subflow >> node_c

-# Build the Flow
-qa_flow = Flow(start=find_file)
-
-# Run
-qa_flow.run(shared)
+# Create the parent flow
+parent_flow = Flow(start=subflow)
```

-When `find_file`’s `post()` returns `"answer"`, we proceed to `answer`. If it returns `"retry"`, we loop back. If `"end"`, we move on to `no_op`.
+When `parent_flow.run()` executes:
+1. It starts `subflow`
+2. `subflow` runs through its nodes (`node_a` then `node_b`)
+3. After `subflow` completes, execution continues to `node_c`
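+
+To try nesting end to end, here is a self-contained sketch in which trivial nodes record their execution order in the shared store (assuming the `Node` and `Flow` APIs from these docs):
+
+```python
+class StepA(Node):
+    def exec(self, shared, prep_res):
+        shared.setdefault("trace", []).append("a")
+
+class StepB(Node):
+    def exec(self, shared, prep_res):
+        shared.setdefault("trace", []).append("b")
+
+class StepC(Node):
+    def exec(self, shared, prep_res):
+        shared.setdefault("trace", []).append("c")
+
+node_a, node_b, node_c = StepA(), StepB(), StepC()
+node_a >> node_b
+subflow = Flow(start=node_a)
+subflow >> node_c
+parent_flow = Flow(start=subflow)
+
+shared = {}
+parent_flow.run(shared)
+print(shared["trace"])  # ['a', 'b', 'c']: the sub-flow ran before node_c
+```
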
----
+### Example: Order Processing Pipeline

-**That’s Flow in a nutshell:**
-- **Actions** determine which node runs next.
-- **Flow** runs the pipeline from the start node to completion.
-- You can chain nodes in a linear sequence or build loops and branches.
-- Nodes can themselves be entire flows, allowing nested graph structures.
\ No newline at end of file
+Here's a practical example that breaks down order processing into nested flows:
+
+```python
+# Payment processing sub-flow
+validate_payment >> process_payment >> payment_confirmation
+payment_flow = Flow(start=validate_payment)
+
+# Inventory sub-flow
+check_stock >> reserve_items >> update_inventory
+inventory_flow = Flow(start=check_stock)
+
+# Shipping sub-flow
+create_label >> assign_carrier >> schedule_pickup
+shipping_flow = Flow(start=create_label)
+
+# Connect the flows into a main order pipeline
+payment_flow >> inventory_flow >> shipping_flow
+
+# Create the master flow
+order_pipeline = Flow(start=payment_flow)
+
+# Run the entire pipeline
+order_pipeline.run(shared_data)
+```
+
+This creates a clean separation of concerns while maintaining a clear execution path:
+
+```mermaid
+flowchart TD
+    subgraph paymentFlow["Payment Flow"]
+        A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
+    end
+
+    subgraph inventoryFlow["Inventory Flow"]
+        D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
+    end
+
+    subgraph shippingFlow["Shipping Flow"]
+        G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
+    end
+
+    paymentFlow --> inventoryFlow
+    inventoryFlow --> shippingFlow
+```
diff --git a/docs/index.md b/docs/index.md
index 9cd7db0..b2f94ee 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -10,16 +10,20 @@ A [100-line](https://github.com/zachary62/miniLLMFlow/blob/main/minillmflow/__in

We model the LLM workflow as a **Nested Flow**:

- Each **Node** handles a simple LLM task.
-- Nodes are chained together to form a **Flow** for more complex tasks.
-- One Node can be chained to multiple Nodes based on **Actions**.
+- Nodes are chained together to form a **Flow** for compute-intensive tasks.
+- One Node can be chained to multiple Nodes through **Actions**, acting as an agent.
- A Flow can be treated as a Node for **Nested Flows**.
- Both Nodes and Flows can be **Batched** for data-intensive tasks.
-- Nodes and Flows can be **Async**.
+- Nodes and Flows can be **Async**, e.g., to wait for user inputs.

+## Preparation
+
+- [LLM Integration](./llm.md)
+
## Core Abstraction

- [Node](./node.md)
diff --git a/docs/llm.md b/docs/llm.md
new file mode 100644
index 0000000..fd49f9a
--- /dev/null
+++ b/docs/llm.md
@@ -0,0 +1,74 @@
+---
+layout: default
+title: "LLM Integration"
+nav_order: 2
+---
+
+# Call LLM
+
+For your LLM application, implement the function that calls LLMs yourself.
+You can ask an assistant like ChatGPT or Claude to generate an example.
+For instance, asking ChatGPT to "implement a `call_llm` function that takes a prompt and returns the LLM response" gives:
+
+```python
+def call_llm(prompt):
+    from openai import OpenAI
+    # Set the OpenAI API key (use environment variables, etc.)
+    client = OpenAI(api_key="YOUR_API_KEY_HERE")
+    r = client.chat.completions.create(
+        model="gpt-4",
+        messages=[{"role": "user", "content": prompt}]
+    )
+    return r.choices[0].message.content
+
+# Example usage
+call_llm("How are you?")
+```
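+
+The same shape carries over to other vendors. For example, a hypothetical Anthropic variant (the SDK calls and model name here are assumptions; check your provider's docs):
+
+```python
+def call_llm(prompt):
+    from anthropic import Anthropic
+    client = Anthropic(api_key="YOUR_API_KEY_HERE")
+    r = client.messages.create(
+        model="claude-3-5-sonnet-20241022",
+        max_tokens=1024,
+        messages=[{"role": "user", "content": prompt}]
+    )
+    return r.content[0].text
+```
+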
+## Improvements
+You can enhance the function as needed. Examples:
+
+1. Handle chat history:
+
+```python
+def call_llm(messages):
+    from openai import OpenAI
+    client = OpenAI(api_key="YOUR_API_KEY_HERE")
+    r = client.chat.completions.create(
+        model="gpt-4",
+        messages=messages
+    )
+    return r.choices[0].message.content
+```
+
+2. Add in-memory caching (note that `lru_cache` only helps when the exact same prompt repeats):
+
+```python
+from functools import lru_cache
+
+@lru_cache(maxsize=1000)
+def call_llm(prompt):
+    # Your implementation here
+    pass
+```
+
+3. Enable logging:
+
+```python
+def call_llm(prompt):
+    import logging
+    logging.info(f"Prompt: {prompt}")
+    response = ...
+    logging.info(f"Response: {response}")
+    return response
+```
+
+You can also try libraries like `litellm`.
+
+## Why not provide an LLM call function?
+I believe it is bad practice to provide LLM-specific implementations in a general framework:
+- LLM APIs change frequently. Hardcoding them makes maintenance difficult.
+- You may need flexibility to switch vendors, use fine-tuned models, or deploy local LLMs.
+- Custom optimizations like prompt caching, request batching, or response streaming may be required.
diff --git a/docs/node.md b/docs/node.md
index 30d9e1b..22bc81b 100644
--- a/docs/node.md
+++ b/docs/node.md
@@ -1,7 +1,7 @@
---
layout: default
title: "Node"
-nav_order: 2
+nav_order: 3
---

# Node
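
For orientation, a typical node subclass follows the three-step shape used throughout these docs. A sketch mirroring the `SummarizeFile` example from communication.md, with `call_llm` as defined in llm.md:

```python
class SummarizeFile(Node):
    def prep(self, shared):
        # Pick the file this node was parameterized with
        return shared["data"][self.params["filename"]]

    def exec(self, shared, prep_res):
        return call_llm(f"Summarize this text:\n{prep_res}")

    def post(self, shared, prep_res, exec_res):
        shared["summary"][self.params["filename"]] = exec_res
        return "default"  # the Action that picks the next transition
```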