From a20827ec58b7c168181b8b1624d96423350e68c2 Mon Sep 17 00:00:00 2001 From: BO WEN Date: Sun, 20 Apr 2025 16:18:34 -0400 Subject: [PATCH] replace .cursorrules with the new cursor mdc files; create a script to generate/update the mdc files from the doc folder --- .cursor/rules/core_abstraction/async.mdc | 53 + .cursor/rules/core_abstraction/batch.mdc | 105 ++ .../rules/core_abstraction/communication.mdc | 124 ++ .cursor/rules/core_abstraction/flow.mdc | 176 ++ .cursor/rules/core_abstraction/node.mdc | 101 + .cursor/rules/core_abstraction/parallel.mdc | 54 + .cursor/rules/design_pattern/agent.mdc | 146 ++ .cursor/rules/design_pattern/mapreduce.mdc | 69 + .cursor/rules/design_pattern/multi_agent.mdc | 184 ++ .cursor/rules/design_pattern/rag.mdc | 157 ++ .cursor/rules/design_pattern/structure.mdc | 112 ++ .cursor/rules/design_pattern/workflow.mdc | 49 + .cursor/rules/guide.mdc | 227 +++ .cursor/rules/index.mdc | 62 + .cursor/rules/utility_function/chunking.mdc | 52 + .cursor/rules/utility_function/embedding.mdc | 115 ++ .cursor/rules/utility_function/llm.mdc | 156 ++ .../rules/utility_function/text_to_speech.mdc | 105 ++ .cursor/rules/utility_function/vector.mdc | 216 +++ .cursor/rules/utility_function/viz.mdc | 138 ++ .cursor/rules/utility_function/websearch.mdc | 112 ++ .cursorrules | 1664 ----------------- utils/update_pocketflow_mdc.py | 319 ++++ 23 files changed, 2832 insertions(+), 1664 deletions(-) create mode 100644 .cursor/rules/core_abstraction/async.mdc create mode 100644 .cursor/rules/core_abstraction/batch.mdc create mode 100644 .cursor/rules/core_abstraction/communication.mdc create mode 100644 .cursor/rules/core_abstraction/flow.mdc create mode 100644 .cursor/rules/core_abstraction/node.mdc create mode 100644 .cursor/rules/core_abstraction/parallel.mdc create mode 100644 .cursor/rules/design_pattern/agent.mdc create mode 100644 .cursor/rules/design_pattern/mapreduce.mdc create mode 100644 .cursor/rules/design_pattern/multi_agent.mdc create mode 100644 .cursor/rules/design_pattern/rag.mdc create mode 100644 .cursor/rules/design_pattern/structure.mdc create mode 100644 .cursor/rules/design_pattern/workflow.mdc create mode 100644 .cursor/rules/guide.mdc create mode 100644 .cursor/rules/index.mdc create mode 100644 .cursor/rules/utility_function/chunking.mdc create mode 100644 .cursor/rules/utility_function/embedding.mdc create mode 100644 .cursor/rules/utility_function/llm.mdc create mode 100644 .cursor/rules/utility_function/text_to_speech.mdc create mode 100644 .cursor/rules/utility_function/vector.mdc create mode 100644 .cursor/rules/utility_function/viz.mdc create mode 100644 .cursor/rules/utility_function/websearch.mdc delete mode 100644 .cursorrules create mode 100644 utils/update_pocketflow_mdc.py diff --git a/.cursor/rules/core_abstraction/async.mdc b/.cursor/rules/core_abstraction/async.mdc new file mode 100644 index 0000000..3e6b523 --- /dev/null +++ b/.cursor/rules/core_abstraction/async.mdc @@ -0,0 +1,53 @@ +--- +description: Guidelines for using PocketFlow, Core Abstraction, (Advanced) Async +globs: +alwaysApply: false +--- +# (Advanced) Async + +**Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for: + +1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way. +2. **exec_async()**: Typically used for async LLM calls. +3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`. 
+ +**Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes. + +### Example + +```python +class SummarizeThenVerify(AsyncNode): + async def prep_async(self, shared): + # Example: read a file asynchronously + doc_text = await read_file_async(shared["doc_path"]) + return doc_text + + async def exec_async(self, prep_res): + # Example: async LLM call + summary = await call_llm_async(f"Summarize: {prep_res}") + return summary + + async def post_async(self, shared, prep_res, exec_res): + # Example: wait for user feedback + decision = await gather_user_feedback(exec_res) + if decision == "approve": + shared["summary"] = exec_res + return "approve" + return "deny" + +summarize_node = SummarizeThenVerify() +final_node = Finalize() + +# Define transitions +summarize_node - "approve" >> final_node +summarize_node - "deny" >> summarize_node # retry + +flow = AsyncFlow(start=summarize_node) + +async def main(): + shared = {"doc_path": "document.txt"} + await flow.run_async(shared) + print("Final Summary:", shared.get("summary")) + +asyncio.run(main()) +``` \ No newline at end of file diff --git a/.cursor/rules/core_abstraction/batch.mdc b/.cursor/rules/core_abstraction/batch.mdc new file mode 100644 index 0000000..5d8eeff --- /dev/null +++ b/.cursor/rules/core_abstraction/batch.mdc @@ -0,0 +1,105 @@ +--- +description: Guidelines for using PocketFlow, Core Abstraction, Batch +globs: +alwaysApply: false +--- +# Batch + +**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases: +- **Chunk-based** processing (e.g., splitting large texts). +- **Iterative** processing over lists of input items (e.g., user queries, files, URLs). + +## 1. BatchNode + +A **BatchNode** extends `Node` but changes `prep()` and `exec()`: + +- **`prep(shared)`**: returns an **iterable** (e.g., list, generator). +- **`exec(item)`**: called **once** per item in that iterable. +- **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**. + + +### Example: Summarize a Large File + +```python +class MapSummaries(BatchNode): + def prep(self, shared): + # Suppose we have a big file; chunk it + content = shared["data"] + chunk_size = 10000 + chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] + return chunks + + def exec(self, chunk): + prompt = f"Summarize this chunk in 10 words: {chunk}" + summary = call_llm(prompt) + return summary + + def post(self, shared, prep_res, exec_res_list): + combined = "\n".join(exec_res_list) + shared["summary"] = combined + return "default" + +map_summaries = MapSummaries() +flow = Flow(start=map_summaries) +flow.run(shared) +``` + +--- + +## 2. BatchFlow + +A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set. + + +### Example: Summarize Many Files + +```python +class SummarizeAllFiles(BatchFlow): + def prep(self, shared): + # Return a list of param dicts (one per file) + filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...] + return [{"filename": fn} for fn in filenames] + +# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce): +summarize_file = SummarizeFile(start=load_file) + +# Wrap that flow into a BatchFlow: +summarize_all_files = SummarizeAllFiles(start=summarize_file) +summarize_all_files.run(shared) +``` + +### Under the Hood +1. 
`prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`. +2. The **BatchFlow** loops through each dict. For each one: + - It merges the dict with the BatchFlow’s own `params`. + - It calls `flow.run(shared)` using the merged result. +3. This means the sub-Flow is run **repeatedly**, once for every param dict. + +--- + +## 3. Nested or Multi-Level Batches + +You can nest a **BatchFlow** in another **BatchFlow**. For instance: +- **Outer** batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...). +- **Inner** batch: returning a list of per-file param dicts. + +At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once. + +```python + +class FileBatchFlow(BatchFlow): + def prep(self, shared): + directory = self.params["directory"] + # e.g., files = ["file1.txt", "file2.txt", ...] + files = [f for f in os.listdir(directory) if f.endswith(".txt")] + return [{"filename": f} for f in files] + +class DirectoryBatchFlow(BatchFlow): + def prep(self, shared): + directories = [ "/path/to/dirA", "/path/to/dirB"] + return [{"directory": d} for d in directories] + +# MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"} +inner_flow = FileBatchFlow(start=MapSummaries()) +outer_flow = DirectoryBatchFlow(start=inner_flow) +``` diff --git a/.cursor/rules/core_abstraction/communication.mdc b/.cursor/rules/core_abstraction/communication.mdc new file mode 100644 index 0000000..bfb6f99 --- /dev/null +++ b/.cursor/rules/core_abstraction/communication.mdc @@ -0,0 +1,124 @@ +--- +description: Guidelines for using PocketFlow, Core Abstraction, Communication +globs: +alwaysApply: false +--- +# Communication + +Nodes and Flows **communicate** in 2 ways: + +1. **Shared Store (for almost all the cases)** + + - A global data structure (often an in-mem dict) that all nodes can read ( `prep()`) and write (`post()`). + - Great for data results, large content, or anything multiple nodes need. + - You shall design the data structure and populate it ahead. + + - > **Separation of Concerns:** Use `Shared Store` for almost all cases to separate *Data Schema* from *Compute Logic*! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` is more a syntax sugar for [Batch](mdc:batch.md). + {: .best-practice } + +2. **Params (only for [Batch](mdc:batch.md))** + - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**. + - Good for identifiers like filenames or numeric IDs, in Batch mode. + +If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller). + +--- + +## 1. Shared Store + +### Overview + +A shared store is typically an in-mem dictionary, like: +```python +shared = {"data": {}, "summary": {}, "config": {...}, ...} +``` + +It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements. 
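+
+For instance, a minimal sketch of deciding the schema up front might look like this (the keys and the SQLite file here are hypothetical placeholders; adjust them to your app):
+
+```python
+import sqlite3
+
+# Shared store designed before writing any nodes: plain data fields plus an
+# optional persistent handle. Nodes read these keys in prep() and write
+# results back in post().
+shared = {
+    "config": {"chunk_size": 100},          # static settings
+    "data": {},                             # raw inputs, e.g., keyed by filename
+    "summary": {},                          # outputs produced by nodes
+    "db": sqlite3.connect("app_state.db"),  # optional persistence (hypothetical)
+}
+```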
+ +### Example + +```python +class LoadData(Node): + def post(self, shared, prep_res, exec_res): + # We write data to shared store + shared["data"] = "Some text content" + return None + +class Summarize(Node): + def prep(self, shared): + # We read data from shared store + return shared["data"] + + def exec(self, prep_res): + # Call LLM to summarize + prompt = f"Summarize: {prep_res}" + summary = call_llm(prompt) + return summary + + def post(self, shared, prep_res, exec_res): + # We write summary to shared store + shared["summary"] = exec_res + return "default" + +load_data = LoadData() +summarize = Summarize() +load_data >> summarize +flow = Flow(start=load_data) + +shared = {} +flow.run(shared) +``` + +Here: +- `LoadData` writes to `shared["data"]`. +- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`. + +--- + +## 2. Params + +**Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are: +- **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`). +- **Set** via `set_params()`. +- **Cleared** and updated each time a parent Flow calls it. + +> Only set the uppermost Flow params because others will be overwritten by the parent Flow. +> +> If you need to set child node params, see [Batch](mdc:batch.md). +{: .warning } + +Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store. + +### Example + +```python +# 1) Create a Node that uses params +class SummarizeFile(Node): + def prep(self, shared): + # Access the node's param + filename = self.params["filename"] + return shared["data"].get(filename, "") + + def exec(self, prep_res): + prompt = f"Summarize: {prep_res}" + return call_llm(prompt) + + def post(self, shared, prep_res, exec_res): + filename = self.params["filename"] + shared["summary"][filename] = exec_res + return "default" + +# 2) Set params +node = SummarizeFile() + +# 3) Set Node params directly (for testing) +node.set_params({"filename": "doc1.txt"}) +node.run(shared) + +# 4) Create Flow +flow = Flow(start=node) + +# 5) Set Flow params (overwrites node params) +flow.set_params({"filename": "doc2.txt"}) +flow.run(shared) # The node summarizes doc2, not doc1 +``` diff --git a/.cursor/rules/core_abstraction/flow.mdc b/.cursor/rules/core_abstraction/flow.mdc new file mode 100644 index 0000000..b834e91 --- /dev/null +++ b/.cursor/rules/core_abstraction/flow.mdc @@ -0,0 +1,176 @@ +--- +description: Guidelines for using PocketFlow, Core Abstraction, Flow +globs: +alwaysApply: false +--- +# Flow + +A **Flow** orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`. + +## 1. Action-based Transitions + +Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`. + +You define transitions with the syntax: + +1. **Basic default transition**: `node_a >> node_b` + This means if `node_a.post()` returns `"default"`, go to `node_b`. + (Equivalent to `node_a - "default" >> node_b`) + +2. **Named action transition**: `node_a - "action_name" >> node_b` + This means if `node_a.post()` returns `"action_name"`, go to `node_b`. + +It's possible to create loops, branching, or multi-step flows. + +## 2. Creating a Flow + +A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. 
When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node. + +### Example: Simple Sequence + +Here's a minimal flow of two nodes in a chain: + +```python +node_a >> node_b +flow = Flow(start=node_a) +flow.run(shared) +``` + +- When you run the flow, it executes `node_a`. +- Suppose `node_a.post()` returns `"default"`. +- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`. +- `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there. + +### Example: Branching & Looping + +Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions: + +- `"approved"`: expense is approved, move to payment processing +- `"needs_revision"`: expense needs changes, send back for revision +- `"rejected"`: expense is denied, finish the process + +We can wire them like this: + +```python +# Define the flow connections +review - "approved" >> payment # If approved, process payment +review - "needs_revision" >> revise # If needs changes, go to revision +review - "rejected" >> finish # If rejected, finish the process + +revise >> review # After revision, go back for another review +payment >> finish # After payment, finish the process + +flow = Flow(start=review) +``` + +Let's see how it flows: + +1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node +2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review` +3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops + +```mermaid +flowchart TD + review[Review Expense] -->|approved| payment[Process Payment] + review -->|needs_revision| revise[Revise Report] + review -->|rejected| finish[Finish Process] + + revise --> review + payment --> finish +``` + +### Running Individual Nodes vs. Running a Flow + +- `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action. +- `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue. + +> `node.run(shared)` **does not** proceed to the successor. +> This is mainly for debugging or testing a single node. +> +> Always use `flow.run(...)` in production to ensure the full pipeline runs correctly. +{: .warning } + +## 3. Nested Flows + +A **Flow** can act like a Node, which enables powerful composition patterns. This means you can: + +1. Use a Flow as a Node within another Flow's transitions. +2. Combine multiple smaller Flows into a larger Flow for reuse. +3. Node `params` will be a merging of **all** parents' `params`. + +### Flow's Node Methods + +A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However: + +- It **won't** run `exec()`, as its main logic is to orchestrate its nodes. +- `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store. + +### Basic Flow Nesting + +Here's how to connect a flow to another node: + +```python +# Create a sub-flow +node_a >> node_b +subflow = Flow(start=node_a) + +# Connect it to another node +subflow >> node_c + +# Create the parent flow +parent_flow = Flow(start=subflow) +``` + +When `parent_flow.run()` executes: +1. It starts `subflow` +2. `subflow` runs through its nodes (`node_a->node_b`) +3. 
After `subflow` completes, execution continues to `node_c` + +### Example: Order Processing Pipeline + +Here's a practical example that breaks down order processing into nested flows: + +```python +# Payment processing sub-flow +validate_payment >> process_payment >> payment_confirmation +payment_flow = Flow(start=validate_payment) + +# Inventory sub-flow +check_stock >> reserve_items >> update_inventory +inventory_flow = Flow(start=check_stock) + +# Shipping sub-flow +create_label >> assign_carrier >> schedule_pickup +shipping_flow = Flow(start=create_label) + +# Connect the flows into a main order pipeline +payment_flow >> inventory_flow >> shipping_flow + +# Create the master flow +order_pipeline = Flow(start=payment_flow) + +# Run the entire pipeline +order_pipeline.run(shared_data) +``` + +This creates a clean separation of concerns while maintaining a clear execution path: + +```mermaid +flowchart LR + subgraph order_pipeline[Order Pipeline] + subgraph paymentFlow["Payment Flow"] + A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation] + end + + subgraph inventoryFlow["Inventory Flow"] + D[Check Stock] --> E[Reserve Items] --> F[Update Inventory] + end + + subgraph shippingFlow["Shipping Flow"] + G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup] + end + + paymentFlow --> inventoryFlow + inventoryFlow --> shippingFlow + end +``` \ No newline at end of file diff --git a/.cursor/rules/core_abstraction/node.mdc b/.cursor/rules/core_abstraction/node.mdc new file mode 100644 index 0000000..235915c --- /dev/null +++ b/.cursor/rules/core_abstraction/node.mdc @@ -0,0 +1,101 @@ +--- +description: Guidelines for using PocketFlow, Core Abstraction, Node +globs: +alwaysApply: false +--- +# Node + +A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`: + + + +1. `prep(shared)` + - **Read and preprocess data** from `shared` store. + - Examples: *query DB, read files, or serialize data into a string*. + - Return `prep_res`, which is used by `exec()` and `post()`. + +2. `exec(prep_res)` + - **Execute compute logic**, with optional retries and error handling (below). + - Examples: *(mostly) LLM calls, remote APIs, tool use*. + - ⚠️ This shall be only for compute and **NOT** access `shared`. + - ⚠️ If retries enabled, ensure idempotent implementation. + - Return `exec_res`, which is passed to `post()`. + +3. `post(shared, prep_res, exec_res)` + - **Postprocess and write data** back to `shared`. + - Examples: *update DB, change states, log results*. + - **Decide the next action** by returning a *string* (`action = "default"` if *None*). + +> **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately. +> +> All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data. +{: .note } + +### Fault Tolerance & Retries + +You can **retry** `exec()` if it raises an exception via two parameters when define the Node: + +- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry). +- `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting). +`wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off. 
+ +```python +my_node = SummarizeFile(max_retries=3, wait=10) +``` + +When an exception occurs in `exec()`, the Node automatically retries until: + +- It either succeeds, or +- The Node has retried `max_retries - 1` times already and fails on the last attempt. + +You can get the current retry times (0-based) from `self.cur_retry`. + +```python +class RetryNode(Node): + def exec(self, prep_res): + print(f"Retry {self.cur_retry} times") + raise Exception("Failed") +``` + +### Graceful Fallback + +To **gracefully handle** the exception (after all retries) rather than raising it, override: + +```python +def exec_fallback(self, prep_res, exc): + raise exc +``` + +By default, it just re-raises exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`. + +### Example: Summarize file + +```python +class SummarizeFile(Node): + def prep(self, shared): + return shared["data"] + + def exec(self, prep_res): + if not prep_res: + return "Empty file content" + prompt = f"Summarize this text in 10 words: {prep_res}" + summary = call_llm(prompt) # might fail + return summary + + def exec_fallback(self, prep_res, exc): + # Provide a simple fallback instead of crashing + return "There was an error processing your request." + + def post(self, shared, prep_res, exec_res): + shared["summary"] = exec_res + # Return "default" by not returning + +summarize_node = SummarizeFile(max_retries=3) + +# node.run() calls prep->exec->post +# If exec() fails, it retries up to 3 times before calling exec_fallback() +action_result = summarize_node.run(shared) + +print("Action returned:", action_result) # "default" +print("Summary stored:", shared["summary"]) +``` \ No newline at end of file diff --git a/.cursor/rules/core_abstraction/parallel.mdc b/.cursor/rules/core_abstraction/parallel.mdc new file mode 100644 index 0000000..88a0b42 --- /dev/null +++ b/.cursor/rules/core_abstraction/parallel.mdc @@ -0,0 +1,54 @@ +--- +description: Guidelines for using PocketFlow, Core Abstraction, (Advanced) Parallel +globs: +alwaysApply: false +--- +# (Advanced) Parallel + +**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute. + +> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. +{: .warning } + +> - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize. +> +> - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals). +> +> - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. 
+{: .best-practice } + +## AsyncParallelBatchNode + +Like **AsyncBatchNode**, but run `exec_async()` in **parallel**: + +```python +class ParallelSummaries(AsyncParallelBatchNode): + async def prep_async(self, shared): + # e.g., multiple texts + return shared["texts"] + + async def exec_async(self, text): + prompt = f"Summarize: {text}" + return await call_llm_async(prompt) + + async def post_async(self, shared, prep_res, exec_res_list): + shared["summary"] = "\n\n".join(exec_res_list) + return "default" + +node = ParallelSummaries() +flow = AsyncFlow(start=node) +``` + +## AsyncParallelBatchFlow + +Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters: + +```python +class SummarizeMultipleFiles(AsyncParallelBatchFlow): + async def prep_async(self, shared): + return [{"filename": f} for f in shared["files"]] + +sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) +parallel_flow = SummarizeMultipleFiles(start=sub_flow) +await parallel_flow.run_async(shared) +``` \ No newline at end of file diff --git a/.cursor/rules/design_pattern/agent.mdc b/.cursor/rules/design_pattern/agent.mdc new file mode 100644 index 0000000..9ad2b4d --- /dev/null +++ b/.cursor/rules/design_pattern/agent.mdc @@ -0,0 +1,146 @@ +--- +description: Guidelines for using PocketFlow, Design Pattern, Agent +globs: +alwaysApply: false +--- +# Agent + +Agent is a powerful design pattern in which nodes can take dynamic actions based on the context. + + + +## Implement Agent with Graph + +1. **Context and Action:** Implement nodes that supply context and perform actions. +2. **Branching:** Use branching to connect each action node to an agent node. Use action to allow the agent to direct the [flow](mdc:../core_abstraction/flow.md) between nodes—and potentially loop back for multi-step. +3. **Agent Node:** Provide a prompt to decide action—for example: + +```python +f""" +### CONTEXT +Task: {task_description} +Previous Actions: {previous_actions} +Current State: {current_state} + +### ACTION SPACE +[1] search + Description: Use web search to get results + Parameters: + - query (str): What to search for + +[2] answer + Description: Conclude based on the results + Parameters: + - result (str): Final answer to provide + +### NEXT ACTION +Decide the next action based on the current context and available action space. +Return your response in the following format: + +```yaml +thinking: | + +action: +parameters: + : +```""" +``` + +The core of building **high-performance** and **reliable** agents boils down to: + +1. **Context Management:** Provide *relevant, minimal context.* For example, rather than including an entire chat history, retrieve the most relevant via [RAG](mdc:rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](mdc:https:/arxiv.org/abs/2307.03172), overlooking mid-prompt content. + +2. **Action Space:** Provide *a well-structured and unambiguous* set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database. + +## Example Good Action Design + +- **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once. + +- **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts). + +- **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files. 
+ +- **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends. + +## Example: Search Agent + +This agent: +1. Decides whether to search or answer +2. If searches, loops back to decide if more search needed +3. Answers when enough context gathered + +```python +class DecideAction(Node): + def prep(self, shared): + context = shared.get("context", "No previous search") + query = shared["query"] + return query, context + + def exec(self, inputs): + query, context = inputs + prompt = f""" +Given input: {query} +Previous search results: {context} +Should I: 1) Search web for more info 2) Answer with current knowledge +Output in yaml: +```yaml +action: search/answer +reason: why this action +search_term: search phrase if action is search +```""" + resp = call_llm(prompt) + yaml_str = resp.split("```yaml")[1].split("```")[0].strip() + result = yaml.safe_load(yaml_str) + + assert isinstance(result, dict) + assert "action" in result + assert "reason" in result + assert result["action"] in ["search", "answer"] + if result["action"] == "search": + assert "search_term" in result + + return result + + def post(self, shared, prep_res, exec_res): + if exec_res["action"] == "search": + shared["search_term"] = exec_res["search_term"] + return exec_res["action"] + +class SearchWeb(Node): + def prep(self, shared): + return shared["search_term"] + + def exec(self, search_term): + return search_web(search_term) + + def post(self, shared, prep_res, exec_res): + prev_searches = shared.get("context", []) + shared["context"] = prev_searches + [ + {"term": shared["search_term"], "result": exec_res} + ] + return "decide" + +class DirectAnswer(Node): + def prep(self, shared): + return shared["query"], shared.get("context", "") + + def exec(self, inputs): + query, context = inputs + return call_llm(f"Context: {context}\nAnswer: {query}") + + def post(self, shared, prep_res, exec_res): + print(f"Answer: {exec_res}") + shared["answer"] = exec_res + +# Connect nodes +decide = DecideAction() +search = SearchWeb() +answer = DirectAnswer() + +decide - "search" >> search +decide - "answer" >> answer +search - "decide" >> decide # Loop back + +flow = Flow(start=decide) +flow.run({"query": "Who won the Nobel Prize in Physics 2024?"}) +``` diff --git a/.cursor/rules/design_pattern/mapreduce.mdc b/.cursor/rules/design_pattern/mapreduce.mdc new file mode 100644 index 0000000..e1ae927 --- /dev/null +++ b/.cursor/rules/design_pattern/mapreduce.mdc @@ -0,0 +1,69 @@ +--- +description: Guidelines for using PocketFlow, Design Pattern, Map Reduce +globs: +alwaysApply: false +--- +# Map Reduce + +MapReduce is a design pattern suitable when you have either: +- Large input data (e.g., multiple files to process), or +- Large output data (e.g., multiple forms to fill) + +and there is a logical way to break the task into smaller, ideally independent parts. + + + +You first break down the task using [BatchNode](mdc:../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase. + +### Example: Document Summarization + +```python +class SummarizeAllFiles(BatchNode): + def prep(self, shared): + files_dict = shared["files"] # e.g. 10 files + return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...] 
+ + def exec(self, one_file): + filename, file_content = one_file + summary_text = call_llm(f"Summarize the following file:\n{file_content}") + return (filename, summary_text) + + def post(self, shared, prep_res, exec_res_list): + shared["file_summaries"] = dict(exec_res_list) + +class CombineSummaries(Node): + def prep(self, shared): + return shared["file_summaries"] + + def exec(self, file_summaries): + # format as: "File1: summary\nFile2: summary...\n" + text_list = [] + for fname, summ in file_summaries.items(): + text_list.append(f"{fname} summary:\n{summ}\n") + big_text = "\n---\n".join(text_list) + + return call_llm(f"Combine these file summaries into one final summary:\n{big_text}") + + def post(self, shared, prep_res, final_summary): + shared["all_files_summary"] = final_summary + +batch_node = SummarizeAllFiles() +combine_node = CombineSummaries() +batch_node >> combine_node + +flow = Flow(start=batch_node) + +shared = { + "files": { + "file1.txt": "Alice was beginning to get very tired of sitting by her sister...", + "file2.txt": "Some other interesting text ...", + # ... + } +} +flow.run(shared) +print("Individual Summaries:", shared["file_summaries"]) +print("\nFinal Summary:\n", shared["all_files_summary"]) +``` + +> **Performance Tip**: The example above works sequentially. You can speed up the map phase by running it in parallel. See [(Advanced) Parallel](mdc:../core_abstraction/parallel.md) for more details. +{: .note } \ No newline at end of file diff --git a/.cursor/rules/design_pattern/multi_agent.mdc b/.cursor/rules/design_pattern/multi_agent.mdc new file mode 100644 index 0000000..2ee0216 --- /dev/null +++ b/.cursor/rules/design_pattern/multi_agent.mdc @@ -0,0 +1,184 @@ +--- +description: Guidelines for using PocketFlow, Design Pattern, (Advanced) Multi-Agents +globs: +alwaysApply: false +--- +# (Advanced) Multi-Agents + +Multiple [Agents](mdc:flow.md) can work together by handling subtasks and communicating the progress. +Communication between agents is typically implemented using message queues in shared storage. + +> Most of time, you don't need Multi-Agents. Start with a simple solution first. +{: .best-practice } + +### Example Agent Communication: Message Queue + +Here's a simple example showing how to implement agent communication using `asyncio.Queue`. 
+The agent listens for messages, processes them, and continues listening: + +```python +class AgentNode(AsyncNode): + async def prep_async(self, _): + message_queue = self.params["messages"] + message = await message_queue.get() + print(f"Agent received: {message}") + return message + +# Create node and flow +agent = AgentNode() +agent >> agent # connect to self +flow = AsyncFlow(start=agent) + +# Create heartbeat sender +async def send_system_messages(message_queue): + counter = 0 + messages = [ + "System status: all systems operational", + "Memory usage: normal", + "Network connectivity: stable", + "Processing load: optimal" + ] + + while True: + message = f"{messages[counter % len(messages)]} | timestamp_{counter}" + await message_queue.put(message) + counter += 1 + await asyncio.sleep(1) + +async def main(): + message_queue = asyncio.Queue() + shared = {} + flow.set_params({"messages": message_queue}) + + # Run both coroutines + await asyncio.gather( + flow.run_async(shared), + send_system_messages(message_queue) + ) + +asyncio.run(main()) +``` + +The output: + +``` +Agent received: System status: all systems operational | timestamp_0 +Agent received: Memory usage: normal | timestamp_1 +Agent received: Network connectivity: stable | timestamp_2 +Agent received: Processing load: optimal | timestamp_3 +``` + +### Interactive Multi-Agent Example: Taboo Game + +Here's a more complex example where two agents play the word-guessing game Taboo. +One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word: + +```python +class AsyncHinter(AsyncNode): + async def prep_async(self, shared): + guess = await shared["hinter_queue"].get() + if guess == "GAME_OVER": + return None + return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", []) + + async def exec_async(self, inputs): + if inputs is None: + return None + target, forbidden, past_guesses = inputs + prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}" + if past_guesses: + prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific." + prompt += "\nUse at most 5 words." + + hint = call_llm(prompt) + print(f"\nHinter: Here's your hint - {hint}") + return hint + + async def post_async(self, shared, prep_res, exec_res): + if exec_res is None: + return "end" + await shared["guesser_queue"].put(exec_res) + return "continue" + +class AsyncGuesser(AsyncNode): + async def prep_async(self, shared): + hint = await shared["guesser_queue"].get() + return hint, shared.get("past_guesses", []) + + async def exec_async(self, inputs): + hint, past_guesses = inputs + prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. 
Directly reply a single word:" + guess = call_llm(prompt) + print(f"Guesser: I guess it's - {guess}") + return guess + + async def post_async(self, shared, prep_res, exec_res): + if exec_res.lower() == shared["target_word"].lower(): + print("Game Over - Correct guess!") + await shared["hinter_queue"].put("GAME_OVER") + return "end" + + if "past_guesses" not in shared: + shared["past_guesses"] = [] + shared["past_guesses"].append(exec_res) + + await shared["hinter_queue"].put(exec_res) + return "continue" + +async def main(): + # Set up game + shared = { + "target_word": "nostalgia", + "forbidden_words": ["memory", "past", "remember", "feeling", "longing"], + "hinter_queue": asyncio.Queue(), + "guesser_queue": asyncio.Queue() + } + + print("Game starting!") + print(f"Target word: {shared['target_word']}") + print(f"Forbidden words: {shared['forbidden_words']}") + + # Initialize by sending empty guess to hinter + await shared["hinter_queue"].put("") + + # Create nodes and flows + hinter = AsyncHinter() + guesser = AsyncGuesser() + + # Set up flows + hinter_flow = AsyncFlow(start=hinter) + guesser_flow = AsyncFlow(start=guesser) + + # Connect nodes to themselves + hinter - "continue" >> hinter + guesser - "continue" >> guesser + + # Run both agents concurrently + await asyncio.gather( + hinter_flow.run_async(shared), + guesser_flow.run_async(shared) + ) + +asyncio.run(main()) +``` + +The Output: + +``` +Game starting! +Target word: nostalgia +Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing'] + +Hinter: Here's your hint - Thinking of childhood summer days +Guesser: I guess it's - popsicle + +Hinter: Here's your hint - When childhood cartoons make you emotional +Guesser: I guess it's - nostalgic + +Hinter: Here's your hint - When old songs move you +Guesser: I guess it's - memories + +Hinter: Here's your hint - That warm emotion about childhood +Guesser: I guess it's - nostalgia +Game Over - Correct guess! +``` \ No newline at end of file diff --git a/.cursor/rules/design_pattern/rag.mdc b/.cursor/rules/design_pattern/rag.mdc new file mode 100644 index 0000000..6acea43 --- /dev/null +++ b/.cursor/rules/design_pattern/rag.mdc @@ -0,0 +1,157 @@ +--- +description: Guidelines for using PocketFlow, Design Pattern, RAG +globs: +alwaysApply: false +--- +# RAG (Retrieval Augmented Generation) + +For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline: + + + +1. **Offline stage**: Preprocess and index documents ("building the index"). +2. **Online stage**: Given a question, generate answers by retrieving the most relevant context. + +--- +## Stage 1: Offline Indexing + +We create three Nodes: +1. `ChunkDocs` – [chunks](mdc:../utility_function/chunking.md) raw text. +2. `EmbedDocs` – [embeds](mdc:../utility_function/embedding.md) each chunk. +3. `StoreIndex` – stores embeddings into a [vector database](mdc:../utility_function/vector.md). + +```python +class ChunkDocs(BatchNode): + def prep(self, shared): + # A list of file paths in shared["files"]. We process each file. + return shared["files"] + + def exec(self, filepath): + # read file content. In real usage, do error handling. 
+ with open(filepath, "r", encoding="utf-8") as f: + text = f.read() + # chunk by 100 chars each + chunks = [] + size = 100 + for i in range(0, len(text), size): + chunks.append(text[i : i + size]) + return chunks + + def post(self, shared, prep_res, exec_res_list): + # exec_res_list is a list of chunk-lists, one per file. + # flatten them all into a single list of chunks. + all_chunks = [] + for chunk_list in exec_res_list: + all_chunks.extend(chunk_list) + shared["all_chunks"] = all_chunks + +class EmbedDocs(BatchNode): + def prep(self, shared): + return shared["all_chunks"] + + def exec(self, chunk): + return get_embedding(chunk) + + def post(self, shared, prep_res, exec_res_list): + # Store the list of embeddings. + shared["all_embeds"] = exec_res_list + print(f"Total embeddings: {len(exec_res_list)}") + +class StoreIndex(Node): + def prep(self, shared): + # We'll read all embeds from shared. + return shared["all_embeds"] + + def exec(self, all_embeds): + # Create a vector index (faiss or other DB in real usage). + index = create_index(all_embeds) + return index + + def post(self, shared, prep_res, index): + shared["index"] = index + +# Wire them in sequence +chunk_node = ChunkDocs() +embed_node = EmbedDocs() +store_node = StoreIndex() + +chunk_node >> embed_node >> store_node + +OfflineFlow = Flow(start=chunk_node) +``` + +Usage example: + +```python +shared = { + "files": ["doc1.txt", "doc2.txt"], # any text files +} +OfflineFlow.run(shared) +``` + +--- +## Stage 2: Online Query & Answer + +We have 3 nodes: +1. `EmbedQuery` – embeds the user’s question. +2. `RetrieveDocs` – retrieves top chunk from the index. +3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer. + +```python +class EmbedQuery(Node): + def prep(self, shared): + return shared["question"] + + def exec(self, question): + return get_embedding(question) + + def post(self, shared, prep_res, q_emb): + shared["q_emb"] = q_emb + +class RetrieveDocs(Node): + def prep(self, shared): + # We'll need the query embedding, plus the offline index/chunks + return shared["q_emb"], shared["index"], shared["all_chunks"] + + def exec(self, inputs): + q_emb, index, chunks = inputs + I, D = search_index(index, q_emb, top_k=1) + best_id = I[0][0] + relevant_chunk = chunks[best_id] + return relevant_chunk + + def post(self, shared, prep_res, relevant_chunk): + shared["retrieved_chunk"] = relevant_chunk + print("Retrieved chunk:", relevant_chunk[:60], "...") + +class GenerateAnswer(Node): + def prep(self, shared): + return shared["question"], shared["retrieved_chunk"] + + def exec(self, inputs): + question, chunk = inputs + prompt = f"Question: {question}\nContext: {chunk}\nAnswer:" + return call_llm(prompt) + + def post(self, shared, prep_res, answer): + shared["answer"] = answer + print("Answer:", answer) + +embed_qnode = EmbedQuery() +retrieve_node = RetrieveDocs() +generate_node = GenerateAnswer() + +embed_qnode >> retrieve_node >> generate_node +OnlineFlow = Flow(start=embed_qnode) +``` + +Usage example: + +```python +# Suppose we already ran OfflineFlow and have: +# shared["all_chunks"], shared["index"], etc. +shared["question"] = "Why do people like cats?" 
+ +OnlineFlow.run(shared) +# final answer in shared["answer"] +``` \ No newline at end of file diff --git a/.cursor/rules/design_pattern/structure.mdc b/.cursor/rules/design_pattern/structure.mdc new file mode 100644 index 0000000..d580ac6 --- /dev/null +++ b/.cursor/rules/design_pattern/structure.mdc @@ -0,0 +1,112 @@ +--- +description: Guidelines for using PocketFlow, Design Pattern, Structured Output +globs: +alwaysApply: false +--- +# Structured Output + +In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys. + +There are several approaches to achieve a structured output: +- **Prompting** the LLM to strictly return a defined structure. +- Using LLMs that natively support **schema enforcement**. +- **Post-processing** the LLM's response to extract structured content. + +In practice, **Prompting** is simple and reliable for modern LLMs. + +### Example Use Cases + +- Extracting Key Information + +```yaml +product: + name: Widget Pro + price: 199.99 + description: | + A high-quality widget designed for professionals. + Recommended for advanced users. +``` + +- Summarizing Documents into Bullet Points + +```yaml +summary: + - This product is easy to use. + - It is cost-effective. + - Suitable for all skill levels. +``` + +- Generating Configuration Files + +```yaml +server: + host: 127.0.0.1 + port: 8080 + ssl: true +``` + +## Prompt Engineering + +When prompting the LLM to produce **structured** output: +1. **Wrap** the structure in code fences (e.g., `yaml`). +2. **Validate** that all required fields exist (and let `Node` handles retry). + +### Example Text Summarization + +```python +class SummarizeNode(Node): + def exec(self, prep_res): + # Suppose `prep_res` is the text to summarize. + prompt = f""" +Please summarize the following text as YAML, with exactly 3 bullet points + +{prep_res} + +Now, output: +```yaml +summary: + - bullet 1 + - bullet 2 + - bullet 3 +```""" + response = call_llm(prompt) + yaml_str = response.split("```yaml")[1].split("```")[0].strip() + + import yaml + structured_result = yaml.safe_load(yaml_str) + + assert "summary" in structured_result + assert isinstance(structured_result["summary"], list) + + return structured_result +``` + +> Besides using `assert` statements, another popular way to validate schemas is [Pydantic](mdc:https:/github.com/pydantic/pydantic) +{: .note } + +### Why YAML instead of JSON? + +Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes. + +**In JSON** + +```json +{ + "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\"" +} +``` + +- Every double quote inside the string must be escaped with `\"`. +- Each newline in the dialogue must be represented as `\n`. + +**In YAML** + +```yaml +dialogue: | + Alice said: "Hello Bob. + How are you? + I am good." +``` + +- No need to escape interior quotes—just place the entire text under a block literal (`|`). +- Newlines are naturally preserved without needing `\n`. \ No newline at end of file diff --git a/.cursor/rules/design_pattern/workflow.mdc b/.cursor/rules/design_pattern/workflow.mdc new file mode 100644 index 0000000..1d903d2 --- /dev/null +++ b/.cursor/rules/design_pattern/workflow.mdc @@ -0,0 +1,49 @@ +--- +description: Guidelines for using PocketFlow, Design Pattern, Workflow +globs: +alwaysApply: false +--- +# Workflow + +Many real-world tasks are too complex for one LLM call. 
The solution is **Task Decomposition**: decompose them into a [chain](mdc:../core_abstraction/flow.md) of multiple Nodes.
+
+
+
+> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*.
+> - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*.
+>
+> You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](mdc:agent.md).
+{: .best-practice }
+
+### Example: Article Writing
+
+```python
+class GenerateOutline(Node):
+    def prep(self, shared): return shared["topic"]
+    def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}")
+    def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res
+
+class WriteSection(Node):
+    def prep(self, shared): return shared["outline"]
+    def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}")
+    def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res
+
+class ReviewAndRefine(Node):
+    def prep(self, shared): return shared["draft"]
+    def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}")
+    def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res
+
+# Connect nodes
+outline = GenerateOutline()
+write = WriteSection()
+review = ReviewAndRefine()
+
+outline >> write >> review
+
+# Create and run flow
+writing_flow = Flow(start=outline)
+shared = {"topic": "AI Safety"}
+writing_flow.run(shared)
+```
+
+For *dynamic cases*, consider using [Agents](mdc:agent.md).
\ No newline at end of file
diff --git a/.cursor/rules/guide.mdc b/.cursor/rules/guide.mdc
new file mode 100644
index 0000000..a84c25e
--- /dev/null
+++ b/.cursor/rules/guide.mdc
@@ -0,0 +1,227 @@
+---
+description: Guidelines for using PocketFlow, Agentic Coding
+globs: **/*.py
+alwaysApply: true
+---
+# Agentic Coding: Humans Design, Agents code!
+
+> If you are an AI agent involved in building LLM Systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification.
+{: .warning }
+
+## Agentic Coding Steps
+
+Agentic Coding should be a collaboration between Human System Design and Agent Implementation:
+
+| Steps                  | Human      | AI        | Comment                                                                  |
+|:-----------------------|:----------:|:---------:|:------------------------------------------------------------------------|
+| 1. Requirements        | ★★★ High   | ★☆☆ Low   | Humans understand the requirements and context.                         |
+| 2. Flow                | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details.  |
+| 3. Utilities           | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. |
+| 4. Node                | ★☆☆ Low    | ★★★ High  | The AI helps design the node types and data handling based on the flow. |
+| 5. Implementation      | ★☆☆ Low    | ★★★ High  | The AI implements the flow based on the design.                         |
+| 6. Optimization        | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize.                 |
+| 7. Reliability         | ★☆☆ Low    | ★★★ High  | The AI writes test cases and addresses corner cases.                    |
+
+1.
**Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit. + - Understand AI systems' strengths and limitations: + - **Good for**: Routine tasks requiring common sense (filling forms, replying to emails) + - **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL) + - **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning) + - **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features. + - **Balance complexity vs. impact**: Aim to deliver the highest value features with minimal complexity early. + +2. **Flow Design**: Outline at a high level, describe how your AI system orchestrates nodes. + - Identify applicable design patterns (e.g., [Map Reduce], [Agent], [RAG]). + - For each node in the flow, start with a high-level one-line description of what it does. + - If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine). + - If using **Agent**, specify what are the inputs (context) and what are the possible actions. + - If using **RAG**, specify what to embed, noting that there's usually both offline (indexing) and online (retrieval) workflows. + - Outline the flow and draw it in a mermaid diagram. For example: + ```mermaid + flowchart LR + start[Start] --> batch[Batch] + batch --> check[Check] + check -->|OK| process + check -->|Error| fix[Fix] + fix --> check + + subgraph process[Process] + step1[Step 1] --> step2[Step 2] + end + + process --> endNode[End] + ``` + - > **If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition. + {: .best-practice } + +3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions. + - Think of your AI system as the brain. It needs a body—these *external utility functions*—to interact with the real world: + + + - Reading inputs (e.g., retrieving Slack messages, reading emails) + - Writing outputs (e.g., generating reports, sending emails) + - Using external tools (e.g., calling LLMs, searching the web) + - **NOTE**: *LLM-based tasks* (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are *core functions* internal in the AI system. + - For each utility function, implement it and write a simple test. + - Document their input/output, as well as why they are necessary. For example: + - `name`: `get_embedding` (`utils/get_embedding.py`) + - `input`: `str` + - `output`: a vector of 3072 floats + - `necessity`: Used by the second node to embed text + - Example utility implementation: + ```python + # utils/call_llm.py + from openai import OpenAI + + def call_llm(prompt): + client = OpenAI(api_key="YOUR_API_KEY_HERE") + r = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": prompt}] + ) + return r.choices[0].message.content + + if __name__ == "__main__": + prompt = "What is the meaning of life?" + print(call_llm(prompt)) + ``` + - > **Sometimes, design Utilities before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them. + {: .best-practice } + +4. 
**Node Design**: Plan how each node will read and write data, and use utility functions.
+   - One core design principle for PocketFlow is to use a [shared store], so start with a shared store design:
+     - For simple systems, use an in-memory dictionary.
+     - For more complex systems or when persistence is required, use a database.
+     - **Don't Repeat Yourself**: Use in-memory references or foreign keys.
+   - Example shared store design:
+     ```python
+     shared = {
+         "user": {
+             "id": "user123",
+             "context": {                # Another nested dict
+                 "weather": {"temp": 72, "condition": "sunny"},
+                 "location": "San Francisco"
+             }
+         },
+         "results": {}                   # Empty dict to store outputs
+     }
+     ```
+   - For each [Node], describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level, without code. For example:
+     - `type`: Regular (or Batch, or Async)
+     - `prep`: Read "text" from the shared store
+     - `exec`: Call the embedding utility function
+     - `post`: Write "embedding" to the shared store
+
+5. **Implementation**: Implement the initial nodes and flows based on the design.
+   - 🎉 If you've reached this step, humans have finished the design. Now *Agentic Coding* begins!
+   - **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking.
+   - **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system.
+   - Add logging throughout the code to facilitate debugging.
+
+6. **Optimization**:
+   - **Use Intuition**: For a quick initial evaluation, human intuition is often a good start.
+   - **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts.
+   - If your flow design is already solid, move on to micro-optimizations:
+     - **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity.
+     - **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone.
+
+   - > **You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times.
+     >
+     >
+     {: .best-practice }
+
+7. **Reliability**
+   - **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times.
+   - **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging.
+   - **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain.
+
+## Example LLM Project File Structure
+
+```
+my_project/
+├── main.py
+├── nodes.py
+├── flow.py
+├── utils/
+│   ├── __init__.py
+│   ├── call_llm.py
+│   └── search_web.py
+├── requirements.txt
+└── docs/
+    └── design.md
+```
+
+- **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*.
+- **`utils/`**: Contains all utility functions.
+  - It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
+  - Each file should also include a `main()` function to try that API call.
+- **`nodes.py`**: Contains all the node definitions.
+ ```python + # nodes.py + from pocketflow import Node + from utils.call_llm import call_llm + + class GetQuestionNode(Node): + def exec(self, _): + # Get question directly from user input + user_question = input("Enter your question: ") + return user_question + + def post(self, shared, prep_res, exec_res): + # Store the user's question + shared["question"] = exec_res + return "default" # Go to the next node + + class AnswerNode(Node): + def prep(self, shared): + # Read question from shared + return shared["question"] + + def exec(self, question): + # Call LLM to get the answer + return call_llm(question) + + def post(self, shared, prep_res, exec_res): + # Store the answer in shared + shared["answer"] = exec_res + ``` +- **`flow.py`**: Implements functions that create flows by importing node definitions and connecting them. + ```python + # flow.py + from pocketflow import Flow + from nodes import GetQuestionNode, AnswerNode + + def create_qa_flow(): + """Create and return a question-answering flow.""" + # Create nodes + get_question_node = GetQuestionNode() + answer_node = AnswerNode() + + # Connect nodes in sequence + get_question_node >> answer_node + + # Create flow starting with input node + return Flow(start=get_question_node) + ``` +- **`main.py`**: Serves as the project's entry point. + ```python + # main.py + from flow import create_qa_flow + + # Example main function + # Please replace this with your own main function + def main(): + shared = { + "question": None, # Will be populated by GetQuestionNode from user input + "answer": None # Will be populated by AnswerNode + } + + # Create the flow and run it + qa_flow = create_qa_flow() + qa_flow.run(shared) + print(f"Question: {shared['question']}") + print(f"Answer: {shared['answer']}") + + if __name__ == "__main__": + main() + ``` diff --git a/.cursor/rules/index.mdc b/.cursor/rules/index.mdc new file mode 100644 index 0000000..1cd3cb2 --- /dev/null +++ b/.cursor/rules/index.mdc @@ -0,0 +1,62 @@ +--- +description: Guidelines for using PocketFlow, a minimalist LLM framework +globs: **/*.py +alwaysApply: true +--- +# Pocket Flow + +A [100-line](mdc:https:/github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*. + +- **Lightweight**: Just the core graph abstraction in 100 lines. ZERO dependencies, and vendor lock-in. +- **Expressive**: Everything you love from larger frameworks—([Multi-])[Agents], [Workflow], [RAG], and more. +- **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications. + + + + +## Core Abstraction + +We model the LLM workflow as a **Graph + Shared Store**: + +- [Node] handles simple (LLM) tasks. +- [Flow] connects nodes through **Actions** (labeled edges). +- [Shared Store] enables communication between nodes within flows. +- [Batch] nodes/flows allow for data-intensive tasks. +- [Async] nodes/flows allow waiting for asynchronous tasks. +- [(Advanced) Parallel] nodes/flows handle I/O-bound tasks. + + + +## Design Pattern + +From there, it’s easy to implement popular design patterns: + +- [Agent] autonomously makes decisions. +- [Workflow] chains multiple tasks into pipelines. +- [RAG] integrates data retrieval with generation. +- [Map Reduce] splits data tasks into Map and Reduce steps. +- [Structured Output] formats outputs consistently. +- [(Advanced) Multi-Agents] coordinate multiple agents. + + + +## Utility Function + +We **do not** provide built-in utilities. 
Instead, we offer *examples*—please *implement your own*: + +- [LLM Wrapper] +- [Viz and Debug] +- [Web Search] +- [Chunking] +- [Embedding] +- [Vector Databases] +- [Text-to-Speech] + +**Why not built-in?**: I believe it's a *bad practice* for vendor-specific APIs in a general framework: +- *API Volatility*: Frequent changes lead to heavy maintenance for hardcoded APIs. +- *Flexibility*: You may want to switch vendors, use fine-tuned models, or run them locally. +- *Optimizations*: Prompt caching, batching, and streaming are easier without vendor lock-in. + +## Ready to build your Apps? + +Check out [Agentic Coding Guidance], the fastest way to develop LLM projects with Pocket Flow! diff --git a/.cursor/rules/utility_function/chunking.mdc b/.cursor/rules/utility_function/chunking.mdc new file mode 100644 index 0000000..c3ac3d4 --- /dev/null +++ b/.cursor/rules/utility_function/chunking.mdc @@ -0,0 +1,52 @@ +--- +description: Guidelines for using PocketFlow, Utility Function, Text Chunking +globs: +alwaysApply: false +--- +# Text Chunking + +We recommend some implementations of commonly used text chunking approaches. + + +> Text Chunking is more a micro optimization, compared to the Flow Design. +> +> It's recommended to start with the Naive Chunking and optimize later. +{: .best-practice } + +--- + +## Example Python Code Samples + +### 1. Naive (Fixed-Size) Chunking +Splits text by a fixed number of words, ignoring sentence or semantic boundaries. + +```python +def fixed_size_chunk(text, chunk_size=100): + chunks = [] + for i in range(0, len(text), chunk_size): + chunks.append(text[i : i + chunk_size]) + return chunks +``` + +However, sentences are often cut awkwardly, losing coherence. + +### 2. Sentence-Based Chunking + +```python +import nltk + +def sentence_based_chunk(text, max_sentences=2): + sentences = nltk.sent_tokenize(text) + chunks = [] + for i in range(0, len(sentences), max_sentences): + chunks.append(" ".join(sentences[i : i + max_sentences])) + return chunks +``` + +However, might not handle very long sentences or paragraphs well. + +### 3. Other Chunking + +- **Paragraph-Based**: Split text by paragraphs (e.g., newlines). Large paragraphs can create big chunks. +- **Semantic**: Use embeddings or topic modeling to chunk by semantic boundaries. +- **Agentic**: Use an LLM to decide chunk boundaries based on context or meaning. \ No newline at end of file diff --git a/.cursor/rules/utility_function/embedding.mdc b/.cursor/rules/utility_function/embedding.mdc new file mode 100644 index 0000000..eb42efe --- /dev/null +++ b/.cursor/rules/utility_function/embedding.mdc @@ -0,0 +1,115 @@ +--- +description: Guidelines for using PocketFlow, Utility Function, Embedding +globs: +alwaysApply: false +--- +# Embedding + +Below you will find an overview table of various text embedding APIs, along with example Python code. + +> Embedding is more a micro optimization, compared to the Flow Design. +> +> It's recommended to start with the most convenient one and optimize later. 
+{: .best-practice } + + +| **API** | **Free Tier** | **Pricing Model** | **Docs** | +| --- | --- | --- | --- | +| **OpenAI** | ~$5 credit | ~$0.0001/1K tokens | [OpenAI Embeddings](https://platform.openai.com/docs/api-reference/embeddings) | +| **Azure OpenAI** | $200 credit | Same as OpenAI (~$0.0001/1K tokens) | [Azure OpenAI Embeddings](https://learn.microsoft.com/azure/cognitive-services/openai/how-to/create-resource?tabs=portal) | +| **Google Vertex AI** | $300 credit | ~$0.025 / million chars | [Vertex AI Embeddings](https://cloud.google.com/vertex-ai/docs/generative-ai/embeddings/get-text-embeddings) | +| **AWS Bedrock** | No free tier, but AWS credits may apply | ~$0.00002/1K tokens (Titan V2) | [Amazon Bedrock](https://docs.aws.amazon.com/bedrock/) | +| **Cohere** | Limited free tier | ~$0.0001/1K tokens | [Cohere Embeddings](https://docs.cohere.com/docs/cohere-embed) | +| **Hugging Face** | ~$0.10 free compute monthly | Pay per second of compute | [HF Inference API](https://huggingface.co/docs/api-inference) | +| **Jina** | 1M tokens free | Pay per token after | [Jina Embeddings](https://jina.ai/embeddings/) | + +## Example Python Code + +### 1. OpenAI +```python +from openai import OpenAI + +client = OpenAI(api_key="YOUR_API_KEY") +response = client.embeddings.create( + model="text-embedding-ada-002", + input=text +) + +# Extract the embedding vector from the response +embedding = response.data[0].embedding +embedding = np.array(embedding, dtype=np.float32) +print(embedding) +``` + +### 2. Azure OpenAI +```python +import openai + +openai.api_type = "azure" +openai.api_base = "https://YOUR_RESOURCE_NAME.openai.azure.com" +openai.api_version = "2023-03-15-preview" +openai.api_key = "YOUR_AZURE_API_KEY" + +resp = openai.Embedding.create(engine="ada-embedding", input="Hello world") +vec = resp["data"][0]["embedding"] +print(vec) +``` + +### 3. Google Vertex AI +```python +from vertexai.preview.language_models import TextEmbeddingModel +import vertexai + +vertexai.init(project="YOUR_GCP_PROJECT_ID", location="us-central1") +model = TextEmbeddingModel.from_pretrained("textembedding-gecko@001") + +emb = model.get_embeddings(["Hello world"]) +print(emb[0]) +``` + +### 4. AWS Bedrock +```python +import boto3, json + +client = boto3.client("bedrock-runtime", region_name="us-east-1") +body = {"inputText": "Hello world"} +resp = client.invoke_model(modelId="amazon.titan-embed-text-v2:0", contentType="application/json", body=json.dumps(body)) +resp_body = json.loads(resp["body"].read()) +vec = resp_body["embedding"] +print(vec) +``` + +### 5. Cohere +```python +import cohere + +co = cohere.Client("YOUR_API_KEY") +resp = co.embed(texts=["Hello world"]) +vec = resp.embeddings[0] +print(vec) +``` + +### 6. Hugging Face +```python +import requests + +API_URL = "https://api-inference.huggingface.co/models/sentence-transformers/all-MiniLM-L6-v2" +HEADERS = {"Authorization": "Bearer YOUR_HF_TOKEN"} + +res = requests.post(API_URL, headers=HEADERS, json={"inputs": "Hello world"}) +vec = res.json()[0] +print(vec) +``` + +### 7. 
Jina +```python +import requests + +url = "https://api.jina.ai/v2/embed" +headers = {"Authorization": "Bearer YOUR_JINA_TOKEN"} +payload = {"data": ["Hello world"], "model": "jina-embeddings-v3"} +res = requests.post(url, headers=headers, json=payload) +vec = res.json()["data"][0]["embedding"] +print(vec) +``` + diff --git a/.cursor/rules/utility_function/llm.mdc b/.cursor/rules/utility_function/llm.mdc new file mode 100644 index 0000000..5a1ee20 --- /dev/null +++ b/.cursor/rules/utility_function/llm.mdc @@ -0,0 +1,156 @@ +--- +description: Guidelines for using PocketFlow, Utility Function, LLM Wrapper +globs: +alwaysApply: false +--- +# LLM Wrappers + +Check out libraries like [litellm](https://github.com/BerriAI/litellm). +Here, we provide some minimal example implementations: + +1. OpenAI + ```python + def call_llm(prompt): + from openai import OpenAI + client = OpenAI(api_key="YOUR_API_KEY_HERE") + r = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": prompt}] + ) + return r.choices[0].message.content + + # Example usage + call_llm("How are you?") + ``` + > Store the API key in an environment variable like OPENAI_API_KEY for security. + {: .best-practice } + +2. Claude (Anthropic) + ```python + def call_llm(prompt): + from anthropic import Anthropic + client = Anthropic(api_key="YOUR_API_KEY_HERE") + r = client.messages.create( + model="claude-3-7-sonnet-20250219", + max_tokens=3000, + messages=[ + {"role": "user", "content": prompt} + ] + ) + return r.content[0].text + ``` + +3. Google (Generative AI Studio / PaLM API) + ```python + def call_llm(prompt): + import google.generativeai as genai + genai.configure(api_key="YOUR_API_KEY_HERE") + r = genai.generate_text( + model="models/text-bison-001", + prompt=prompt + ) + return r.result + ``` + +4. Azure (Azure OpenAI) + ```python + def call_llm(prompt): + from openai import AzureOpenAI + client = AzureOpenAI( + azure_endpoint="https://.openai.azure.com/", + api_key="YOUR_API_KEY_HERE", + api_version="2023-05-15" + ) + r = client.chat.completions.create( + model="", + messages=[{"role": "user", "content": prompt}] + ) + return r.choices[0].message.content + ``` + +5. Ollama (Local LLM) + ```python + def call_llm(prompt): + from ollama import chat + response = chat( + model="llama2", + messages=[{"role": "user", "content": prompt}] + ) + return response.message.content + ``` + +6. DeepSeek + ```python + def call_llm(prompt): + from openai import OpenAI + client = OpenAI(api_key="YOUR_DEEPSEEK_API_KEY", base_url="https://api.deepseek.com") + r = client.chat.completions.create( + model="deepseek-chat", + messages=[{"role": "user", "content": prompt}] + ) + return r.choices[0].message.content + ``` + + +## Improvements +Feel free to enhance your `call_llm` function as needed. Here are examples: + +- Handle chat history: + +```python +def call_llm(messages): + from openai import OpenAI + client = OpenAI(api_key="YOUR_API_KEY_HERE") + r = client.chat.completions.create( + model="gpt-4o", + messages=messages + ) + return r.choices[0].message.content +``` + +- Add in-memory caching + +```python +from functools import lru_cache + +@lru_cache(maxsize=1000) +def call_llm(prompt): + # Your implementation here + pass +``` + +> ⚠️ Caching conflicts with Node retries, as retries yield the same result. +> +> To address this, you could use cached results only if not retried. 
+{: .warning } + + +```python +from functools import lru_cache + +@lru_cache(maxsize=1000) +def cached_call(prompt): + pass + +def call_llm(prompt, use_cache): + if use_cache: + return cached_call(prompt) + # Call the underlying function directly + return cached_call.__wrapped__(prompt) + +class SummarizeNode(Node): + def exec(self, text): + return call_llm(f"Summarize: {text}", self.cur_retry==0) +``` + +- Enable logging: + +```python +def call_llm(prompt): + import logging + logging.info(f"Prompt: {prompt}") + response = ... # Your implementation here + logging.info(f"Response: {response}") + return response +``` + diff --git a/.cursor/rules/utility_function/text_to_speech.mdc b/.cursor/rules/utility_function/text_to_speech.mdc new file mode 100644 index 0000000..a032642 --- /dev/null +++ b/.cursor/rules/utility_function/text_to_speech.mdc @@ -0,0 +1,105 @@ +--- +description: Guidelines for using PocketFlow, Utility Function, Text-to-Speech +globs: +alwaysApply: false +--- +# Text-to-Speech + +| **Service** | **Free Tier** | **Pricing Model** | **Docs** | +|----------------------|-----------------------|--------------------------------------------------------------|---------------------------------------------------------------------| +| **Amazon Polly** | 5M std + 1M neural | ~$4 /M (std), ~$16 /M (neural) after free tier | [Polly Docs](https://aws.amazon.com/polly/) | +| **Google Cloud TTS** | 4M std + 1M WaveNet | ~$4 /M (std), ~$16 /M (WaveNet) pay-as-you-go | [Cloud TTS Docs](https://cloud.google.com/text-to-speech) | +| **Azure TTS** | 500K neural ongoing | ~$15 /M (neural), discount at higher volumes | [Azure TTS Docs](https://azure.microsoft.com/products/cognitive-services/text-to-speech/) | +| **IBM Watson TTS** | 10K chars Lite plan | ~$0.02 /1K (i.e. ~$20 /M). Enterprise options available | [IBM Watson Docs](https://www.ibm.com/cloud/watson-text-to-speech) | +| **ElevenLabs** | 10K chars monthly | From ~$5/mo (30K chars) up to $330/mo (2M chars). 
Enterprise | [ElevenLabs Docs](https://elevenlabs.io) | + +## Example Python Code + +### Amazon Polly +```python +import boto3 + +polly = boto3.client("polly", region_name="us-east-1", + aws_access_key_id="YOUR_AWS_ACCESS_KEY_ID", + aws_secret_access_key="YOUR_AWS_SECRET_ACCESS_KEY") + +resp = polly.synthesize_speech( + Text="Hello from Polly!", + OutputFormat="mp3", + VoiceId="Joanna" +) + +with open("polly.mp3", "wb") as f: + f.write(resp["AudioStream"].read()) +``` + +### Google Cloud TTS +```python +from google.cloud import texttospeech + +client = texttospeech.TextToSpeechClient() +input_text = texttospeech.SynthesisInput(text="Hello from Google Cloud TTS!") +voice = texttospeech.VoiceSelectionParams(language_code="en-US") +audio_cfg = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3) + +resp = client.synthesize_speech(input=input_text, voice=voice, audio_config=audio_cfg) + +with open("gcloud_tts.mp3", "wb") as f: + f.write(resp.audio_content) +``` + +### Azure TTS +```python +import azure.cognitiveservices.speech as speechsdk + +speech_config = speechsdk.SpeechConfig( + subscription="AZURE_KEY", region="AZURE_REGION") +audio_cfg = speechsdk.audio.AudioConfig(filename="azure_tts.wav") + +synthesizer = speechsdk.SpeechSynthesizer( + speech_config=speech_config, + audio_config=audio_cfg +) + +synthesizer.speak_text_async("Hello from Azure TTS!").get() +``` + +### IBM Watson TTS +```python +from ibm_watson import TextToSpeechV1 +from ibm_cloud_sdk_core.authenticators import IAMAuthenticator + +auth = IAMAuthenticator("IBM_API_KEY") +service = TextToSpeechV1(authenticator=auth) +service.set_service_url("IBM_SERVICE_URL") + +resp = service.synthesize( + "Hello from IBM Watson!", + voice="en-US_AllisonV3Voice", + accept="audio/mp3" +).get_result() + +with open("ibm_tts.mp3", "wb") as f: + f.write(resp.content) +``` + +### ElevenLabs +```python +import requests + +api_key = "ELEVENLABS_KEY" +voice_id = "ELEVENLABS_VOICE" +url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}" +headers = {"xi-api-key": api_key, "Content-Type": "application/json"} + +json_data = { + "text": "Hello from ElevenLabs!", + "voice_settings": {"stability": 0.75, "similarity_boost": 0.75} +} + +resp = requests.post(url, headers=headers, json=json_data) + +with open("elevenlabs.mp3", "wb") as f: + f.write(resp.content) +``` + diff --git a/.cursor/rules/utility_function/vector.mdc b/.cursor/rules/utility_function/vector.mdc new file mode 100644 index 0000000..d0db6e5 --- /dev/null +++ b/.cursor/rules/utility_function/vector.mdc @@ -0,0 +1,216 @@ +--- +description: Guidelines for using PocketFlow, Utility Function, Vector Databases +globs: +alwaysApply: false +--- +# Vector Databases + + +Below is a table of the popular vector search solutions: + +| **Tool** | **Free Tier** | **Pricing Model** | **Docs** | +| --- | --- | --- | --- | +| **FAISS** | N/A, self-host | Open-source | [Faiss.ai](https://faiss.ai) | +| **Pinecone** | 2GB free | From $25/mo | [pinecone.io](https://pinecone.io) | +| **Qdrant** | 1GB free cloud | Pay-as-you-go | [qdrant.tech](https://qdrant.tech) | +| **Weaviate** | 14-day sandbox | From $25/mo | [weaviate.io](https://weaviate.io) | +| **Milvus** | 5GB free cloud | PAYG or $99/mo dedicated | [milvus.io](https://milvus.io) | +| **Chroma** | N/A, self-host | Free (Apache 2.0) | [trychroma.com](https://trychroma.com) | +| **Redis** | 30MB free | From $5/mo | [redis.io](https://redis.io) | + +--- +## Example Python Code + +Below are basic usage snippets for each tool. 
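+
+If you expect to swap providers later, consider hiding the client behind a small utility of your own so your nodes never call a vendor SDK directly. A minimal sketch backed by FAISS follows; the file name and helper functions are illustrative, not part of PocketFlow:
+
+```python
+# utils/vector_store.py (illustrative wrapper; adapt to your chosen backend)
+import faiss
+import numpy as np
+
+def create_index(dim):
+    # Exact (flat) L2 index; a hosted service client could be swapped in here
+    return faiss.IndexFlatL2(dim)
+
+def add_vectors(index, vectors):
+    index.add(np.asarray(vectors, dtype="float32"))
+
+def search(index, query, top_k=5):
+    # Returns (id, distance) pairs for the top_k nearest neighbors
+    D, I = index.search(np.asarray([query], dtype="float32"), top_k)
+    return list(zip(I[0].tolist(), D[0].tolist()))
+
+if __name__ == "__main__":
+    idx = create_index(4)
+    add_vectors(idx, np.random.rand(10, 4))
+    print(search(idx, np.random.rand(4), top_k=3))
+```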
+ +### FAISS +```python +import faiss +import numpy as np + +# Dimensionality of embeddings +d = 128 + +# Create a flat L2 index +index = faiss.IndexFlatL2(d) + +# Random vectors +data = np.random.random((1000, d)).astype('float32') +index.add(data) + +# Query +query = np.random.random((1, d)).astype('float32') +D, I = index.search(query, k=5) + +print("Distances:", D) +print("Neighbors:", I) +``` + +### Pinecone +```python +import pinecone + +pinecone.init(api_key="YOUR_API_KEY", environment="YOUR_ENV") + +index_name = "my-index" + +# Create the index if it doesn't exist +if index_name not in pinecone.list_indexes(): + pinecone.create_index(name=index_name, dimension=128) + +# Connect +index = pinecone.Index(index_name) + +# Upsert +vectors = [ + ("id1", [0.1]*128), + ("id2", [0.2]*128) +] +index.upsert(vectors) + +# Query +response = index.query([[0.15]*128], top_k=3) +print(response) +``` + +### Qdrant +```python +import qdrant_client +from qdrant_client.models import Distance, VectorParams, PointStruct + +client = qdrant_client.QdrantClient( + url="https://YOUR-QDRANT-CLOUD-ENDPOINT", + api_key="YOUR_API_KEY" +) + +collection = "my_collection" +client.recreate_collection( + collection_name=collection, + vectors_config=VectorParams(size=128, distance=Distance.COSINE) +) + +points = [ + PointStruct(id=1, vector=[0.1]*128, payload={"type": "doc1"}), + PointStruct(id=2, vector=[0.2]*128, payload={"type": "doc2"}), +] + +client.upsert(collection_name=collection, points=points) + +results = client.search( + collection_name=collection, + query_vector=[0.15]*128, + limit=2 +) +print(results) +``` + +### Weaviate +```python +import weaviate + +client = weaviate.Client("https://YOUR-WEAVIATE-CLOUD-ENDPOINT") + +schema = { + "classes": [ + { + "class": "Article", + "vectorizer": "none" + } + ] +} +client.schema.create(schema) + +obj = { + "title": "Hello World", + "content": "Weaviate vector search" +} +client.data_object.create(obj, "Article", vector=[0.1]*128) + +resp = ( + client.query + .get("Article", ["title", "content"]) + .with_near_vector({"vector": [0.15]*128}) + .with_limit(3) + .do() +) +print(resp) +``` + +### Milvus +```python +from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection +import numpy as np + +connections.connect(alias="default", host="localhost", port="19530") + +fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128) +] +schema = CollectionSchema(fields) +collection = Collection("MyCollection", schema) + +emb = np.random.rand(10, 128).astype('float32') +ids = list(range(10)) +collection.insert([ids, emb]) + +index_params = { + "index_type": "IVF_FLAT", + "params": {"nlist": 128}, + "metric_type": "L2" +} +collection.create_index("embedding", index_params) +collection.load() + +query_emb = np.random.rand(1, 128).astype('float32') +results = collection.search(query_emb, "embedding", param={"nprobe": 10}, limit=3) +print(results) +``` + +### Chroma +```python +import chromadb +from chromadb.config import Settings + +client = chromadb.Client(Settings( + chroma_db_impl="duckdb+parquet", + persist_directory="./chroma_data" +)) + +coll = client.create_collection("my_collection") + +vectors = [[0.1, 0.2, 0.3], [0.2, 0.2, 0.2]] +metas = [{"doc": "text1"}, {"doc": "text2"}] +ids = ["id1", "id2"] +coll.add(embeddings=vectors, metadatas=metas, ids=ids) + +res = coll.query(query_embeddings=[[0.15, 0.25, 0.3]], n_results=2) +print(res) +``` + +### Redis +```python 
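+# NOTE: FT.CREATE / FT.SEARCH require the RediSearch module (bundled with Redis Stack).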
+import redis +import struct + +r = redis.Redis(host="localhost", port=6379) + +# Create index +r.execute_command( + "FT.CREATE", "my_idx", "ON", "HASH", + "SCHEMA", "embedding", "VECTOR", "FLAT", "6", + "TYPE", "FLOAT32", "DIM", "128", + "DISTANCE_METRIC", "L2" +) + +# Insert +vec = struct.pack('128f', *[0.1]*128) +r.hset("doc1", mapping={"embedding": vec}) + +# Search +qvec = struct.pack('128f', *[0.15]*128) +q = "*=>[KNN 3 @embedding $BLOB AS dist]" +res = r.ft("my_idx").search(q, query_params={"BLOB": qvec}) +print(res.docs) +``` + diff --git a/.cursor/rules/utility_function/viz.mdc b/.cursor/rules/utility_function/viz.mdc new file mode 100644 index 0000000..254362c --- /dev/null +++ b/.cursor/rules/utility_function/viz.mdc @@ -0,0 +1,138 @@ +--- +description: Guidelines for using PocketFlow, Utility Function, Viz and Debug +globs: +alwaysApply: false +--- +# Visualization and Debugging + +Similar to LLM wrappers, we **don't** provide built-in visualization and debugging. Here, we recommend some *minimal* (and incomplete) implementations These examples can serve as a starting point for your own tooling. + +## 1. Visualization with Mermaid + +This code recursively traverses the nested graph, assigns unique IDs to each node, and treats Flow nodes as subgraphs to generate Mermaid syntax for a hierarchical visualization. + +{% raw %} +```python +def build_mermaid(start): + ids, visited, lines = {}, set(), ["graph LR"] + ctr = 1 + def get_id(n): + nonlocal ctr + return ids[n] if n in ids else (ids.setdefault(n, f"N{ctr}"), (ctr := ctr + 1))[0] + def link(a, b): + lines.append(f" {a} --> {b}") + def walk(node, parent=None): + if node in visited: + return parent and link(parent, get_id(node)) + visited.add(node) + if isinstance(node, Flow): + node.start and parent and link(parent, get_id(node.start)) + lines.append(f"\n subgraph sub_flow_{get_id(node)}[{type(node).__name__}]") + node.start and walk(node.start) + for nxt in node.successors.values(): + node.start and walk(nxt, get_id(node.start)) or (parent and link(parent, get_id(nxt))) or walk(nxt) + lines.append(" end\n") + else: + lines.append(f" {(nid := get_id(node))}['{type(node).__name__}']") + parent and link(parent, nid) + [walk(nxt, nid) for nxt in node.successors.values()] + walk(start) + return "\n".join(lines) +``` +{% endraw %} + + +For example, suppose we have a complex Flow for data science: + +```python +class DataPrepBatchNode(BatchNode): + def prep(self,shared): return [] +class ValidateDataNode(Node): pass +class FeatureExtractionNode(Node): pass +class TrainModelNode(Node): pass +class EvaluateModelNode(Node): pass +class ModelFlow(Flow): pass +class DataScienceFlow(Flow):pass + +feature_node = FeatureExtractionNode() +train_node = TrainModelNode() +evaluate_node = EvaluateModelNode() +feature_node >> train_node >> evaluate_node +model_flow = ModelFlow(start=feature_node) +data_prep_node = DataPrepBatchNode() +validate_node = ValidateDataNode() +data_prep_node >> validate_node >> model_flow +data_science_flow = DataScienceFlow(start=data_prep_node) +result = build_mermaid(start=data_science_flow) +``` + +The code generates a Mermaid diagram: + +```mermaid +graph LR + subgraph sub_flow_N1[DataScienceFlow] + N2['DataPrepBatchNode'] + N3['ValidateDataNode'] + N2 --> N3 + N3 --> N4 + + subgraph sub_flow_N5[ModelFlow] + N4['FeatureExtractionNode'] + N6['TrainModelNode'] + N4 --> N6 + N7['EvaluateModelNode'] + N6 --> N7 + end + + end +``` + +## 2. 
Call Stack Debugging + +It would be useful to print the Node call stacks for debugging. This can be achieved by inspecting the runtime call stack: + +```python +import inspect + +def get_node_call_stack(): + stack = inspect.stack() + node_names = [] + seen_ids = set() + for frame_info in stack[1:]: + local_vars = frame_info.frame.f_locals + if 'self' in local_vars: + caller_self = local_vars['self'] + if isinstance(caller_self, BaseNode) and id(caller_self) not in seen_ids: + seen_ids.add(id(caller_self)) + node_names.append(type(caller_self).__name__) + return node_names +``` + +For example, suppose we have a complex Flow for data science: + +```python +class DataPrepBatchNode(BatchNode): + def prep(self, shared): return [] +class ValidateDataNode(Node): pass +class FeatureExtractionNode(Node): pass +class TrainModelNode(Node): pass +class EvaluateModelNode(Node): + def prep(self, shared): + stack = get_node_call_stack() + print("Call stack:", stack) +class ModelFlow(Flow): pass +class DataScienceFlow(Flow):pass + +feature_node = FeatureExtractionNode() +train_node = TrainModelNode() +evaluate_node = EvaluateModelNode() +feature_node >> train_node >> evaluate_node +model_flow = ModelFlow(start=feature_node) +data_prep_node = DataPrepBatchNode() +validate_node = ValidateDataNode() +data_prep_node >> validate_node >> model_flow +data_science_flow = DataScienceFlow(start=data_prep_node) +data_science_flow.run({}) +``` + +The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']` \ No newline at end of file diff --git a/.cursor/rules/utility_function/websearch.mdc b/.cursor/rules/utility_function/websearch.mdc new file mode 100644 index 0000000..2af4951 --- /dev/null +++ b/.cursor/rules/utility_function/websearch.mdc @@ -0,0 +1,112 @@ +--- +description: Guidelines for using PocketFlow, Utility Function, Web Search +globs: +alwaysApply: false +--- +# Web Search + +We recommend some implementations of commonly used web search tools. + +| **API** | **Free Tier** | **Pricing Model** | **Docs** | +|---------------------------------|-----------------------------------------------|-----------------------------------------------------------------|------------------------------------------------------------------------| +| **Google Custom Search JSON API** | 100 queries/day free | $5 per 1000 queries. | [Link](https://developers.google.com/custom-search/v1/overview) | +| **Bing Web Search API** | 1,000 queries/month | $15–$25 per 1,000 queries. | [Link](https://azure.microsoft.com/en-us/services/cognitive-services/bing-web-search-api/) | +| **DuckDuckGo Instant Answer** | Completely free (Instant Answers only, **no URLs**) | No paid plans; usage unlimited, but data is limited | [Link](https://duckduckgo.com/api) | +| **Brave Search API** | 2,000 queries/month free | $3 per 1k queries for Base, $5 per 1k for Pro | [Link](https://brave.com/search/api/) | +| **SerpApi** | 100 searches/month free | Start at $75/month for 5,000 searches| [Link](https://serpapi.com/) | +| **RapidAPI** | Many options | Many options | [Link](https://rapidapi.com/search?term=search&sortBy=ByRelevance) | + +## Example Python Code + +### 1. Google Custom Search JSON API +```python +import requests + +API_KEY = "YOUR_API_KEY" +CX_ID = "YOUR_CX_ID" +query = "example" + +url = "https://www.googleapis.com/customsearch/v1" +params = { + "key": API_KEY, + "cx": CX_ID, + "q": query +} + +response = requests.get(url, params=params) +results = response.json() +print(results) +``` + +### 2. 
Bing Web Search API +```python +import requests + +SUBSCRIPTION_KEY = "YOUR_BING_API_KEY" +query = "example" + +url = "https://api.bing.microsoft.com/v7.0/search" +headers = {"Ocp-Apim-Subscription-Key": SUBSCRIPTION_KEY} +params = {"q": query} + +response = requests.get(url, headers=headers, params=params) +results = response.json() +print(results) +``` + +### 3. DuckDuckGo Instant Answer +```python +import requests + +query = "example" +url = "https://api.duckduckgo.com/" +params = { + "q": query, + "format": "json" +} + +response = requests.get(url, params=params) +results = response.json() +print(results) +``` + +### 4. Brave Search API +```python +import requests + +SUBSCRIPTION_TOKEN = "YOUR_BRAVE_API_TOKEN" +query = "example" + +url = "https://api.search.brave.com/res/v1/web/search" +headers = { + "X-Subscription-Token": SUBSCRIPTION_TOKEN +} +params = { + "q": query +} + +response = requests.get(url, headers=headers, params=params) +results = response.json() +print(results) +``` + +### 5. SerpApi +```python +import requests + +API_KEY = "YOUR_SERPAPI_KEY" +query = "example" + +url = "https://serpapi.com/search" +params = { + "engine": "google", + "q": query, + "api_key": API_KEY +} + +response = requests.get(url, params=params) +results = response.json() +print(results) +``` + + diff --git a/.cursorrules b/.cursorrules deleted file mode 100644 index 64bdd49..0000000 --- a/.cursorrules +++ /dev/null @@ -1,1664 +0,0 @@ ---- -layout: default -title: "Agentic Coding" ---- - -# Agentic Coding: Humans Design, Agents code! - -> If you are an AI agent involved in building LLM Systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification. -{: .warning } - -## Agentic Coding Steps - -Agentic Coding should be a collaboration between Human System Design and Agent Implementation: - -| Steps | Human | AI | Comment | -|:-----------------------|:----------:|:---------:|:------------------------------------------------------------------------| -| 1. Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context. | -| 2. Flow | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details. | -| 3. Utilities | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. | -| 4. Node | ★☆☆ Low | ★★★ High | The AI helps design the node types and data handling based on the flow. | -| 5. Implementation | ★☆☆ Low | ★★★ High | The AI implements the flow based on the design. | -| 6. Optimization | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize. | -| 7. Reliability | ★☆☆ Low | ★★★ High | The AI writes test cases and addresses corner cases. | - -1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit. 
- - Understand AI systems' strengths and limitations: - - **Good for**: Routine tasks requiring common sense (filling forms, replying to emails) - - **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL) - - **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning) - - **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features. - - **Balance complexity vs. impact**: Aim to deliver the highest value features with minimal complexity early. - -2. **Flow Design**: Outline at a high level, describe how your AI system orchestrates nodes. - - Identify applicable design patterns (e.g., [Map Reduce](./design_pattern/mapreduce.md), [Agent](./design_pattern/agent.md), [RAG](./design_pattern/rag.md)). - - For each node in the flow, start with a high-level one-line description of what it does. - - If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine). - - If using **Agent**, specify what are the inputs (context) and what are the possible actions. - - If using **RAG**, specify what to embed, noting that there's usually both offline (indexing) and online (retrieval) workflows. - - Outline the flow and draw it in a mermaid diagram. For example: - ```mermaid - flowchart LR - start[Start] --> batch[Batch] - batch --> check[Check] - check -->|OK| process - check -->|Error| fix[Fix] - fix --> check - - subgraph process[Process] - step1[Step 1] --> step2[Step 2] - end - - process --> endNode[End] - ``` - - > **If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition. - {: .best-practice } - -3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions. - - Think of your AI system as the brain. It needs a body—these *external utility functions*—to interact with the real world: -
- - - Reading inputs (e.g., retrieving Slack messages, reading emails) - - Writing outputs (e.g., generating reports, sending emails) - - Using external tools (e.g., calling LLMs, searching the web) - - **NOTE**: *LLM-based tasks* (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are *core functions* internal in the AI system. - - For each utility function, implement it and write a simple test. - - Document their input/output, as well as why they are necessary. For example: - - `name`: `get_embedding` (`utils/get_embedding.py`) - - `input`: `str` - - `output`: a vector of 3072 floats - - `necessity`: Used by the second node to embed text - - Example utility implementation: - ```python - # utils/call_llm.py - from openai import OpenAI - - def call_llm(prompt): - client = OpenAI(api_key="YOUR_API_KEY_HERE") - r = client.chat.completions.create( - model="gpt-4o", - messages=[{"role": "user", "content": prompt}] - ) - return r.choices[0].message.content - - if __name__ == "__main__": - prompt = "What is the meaning of life?" - print(call_llm(prompt)) - ``` - - > **Sometimes, design Utilies before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them. - {: .best-practice } - -4. **Node Design**: Plan how each node will read and write data, and use utility functions. - - One core design principle for PocketFlow is to use a [shared store](./core_abstraction/communication.md), so start with a shared store design: - - For simple systems, use an in-memory dictionary. - - For more complex systems or when persistence is required, use a database. - - **Don't Repeat Yourself**: Use in-memory references or foreign keys. - - Example shared store design: - ```python - shared = { - "user": { - "id": "user123", - "context": { # Another nested dict - "weather": {"temp": 72, "condition": "sunny"}, - "location": "San Francisco" - } - }, - "results": {} # Empty dict to store outputs - } - ``` - - For each [Node](./core_abstraction/node.md), describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level without codes. For example: - - `type`: Regular (or Batch, or Async) - - `prep`: Read "text" from the shared store - - `exec`: Call the embedding utility function - - `post`: Write "embedding" to the shared store - -5. **Implementation**: Implement the initial nodes and flows based on the design. - - 🎉 If you've reached this step, humans have finished the design. Now *Agentic Coding* begins! - - **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking. - - **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system. - - Add logging throughout the code to facilitate debugging. - -7. **Optimization**: - - **Use Intuition**: For a quick initial evaluation, human intuition is often a good start. - - **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts. - - If your flow design is already solid, move on to micro-optimizations: - - **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity. - - **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone. 
- - - > **You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times. - > - >
- {: .best-practice } - -8. **Reliability** - - **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times. - - **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging. - - **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain. - -## Example LLM Project File Structure - -``` -my_project/ -├── main.py -├── nodes.py -├── flow.py -├── utils/ -│ ├── __init__.py -│ ├── call_llm.py -│ └── search_web.py -├── requirements.txt -└── docs/ - └── design.md -``` - -- **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*. -- **`utils/`**: Contains all utility functions. - - It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`. - - Each file should also include a `main()` function to try that API call -- **`nodes.py`**: Contains all the node definitions. - ```python - # nodes.py - from pocketflow import Node - from utils.call_llm import call_llm - - class GetQuestionNode(Node): - def exec(self, _): - # Get question directly from user input - user_question = input("Enter your question: ") - return user_question - - def post(self, shared, prep_res, exec_res): - # Store the user's question - shared["question"] = exec_res - return "default" # Go to the next node - - class AnswerNode(Node): - def prep(self, shared): - # Read question from shared - return shared["question"] - - def exec(self, question): - # Call LLM to get the answer - return call_llm(question) - - def post(self, shared, prep_res, exec_res): - # Store the answer in shared - shared["answer"] = exec_res - ``` -- **`flow.py`**: Implements functions that create flows by importing node definitions and connecting them. - ```python - # flow.py - from pocketflow import Flow - from nodes import GetQuestionNode, AnswerNode - - def create_qa_flow(): - """Create and return a question-answering flow.""" - # Create nodes - get_question_node = GetQuestionNode() - answer_node = AnswerNode() - - # Connect nodes in sequence - get_question_node >> answer_node - - # Create flow starting with input node - return Flow(start=get_question_node) - ``` -- **`main.py`**: Serves as the project's entry point. - ```python - # main.py - from flow import create_qa_flow - - # Example main function - # Please replace this with your own main function - def main(): - shared = { - "question": None, # Will be populated by GetQuestionNode from user input - "answer": None # Will be populated by AnswerNode - } - - # Create the flow and run it - qa_flow = create_qa_flow() - qa_flow.run(shared) - print(f"Question: {shared['question']}") - print(f"Answer: {shared['answer']}") - - if __name__ == "__main__": - main() - ``` - -================================================ -File: docs/index.md -================================================ ---- -layout: default -title: "Home" -nav_order: 1 ---- - -# Pocket Flow - -A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*. - -- **Lightweight**: Just the core graph abstraction in 100 lines. ZERO dependencies, and vendor lock-in. 
-- **Expressive**: Everything you love from larger frameworks—([Multi-](./design_pattern/multi_agent.html))[Agents](./design_pattern/agent.html), [Workflow](./design_pattern/workflow.html), [RAG](./design_pattern/rag.html), and more. -- **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications. - -
- -
- -## Core Abstraction - -We model the LLM workflow as a **Graph + Shared Store**: - -- [Node](./core_abstraction/node.md) handles simple (LLM) tasks. -- [Flow](./core_abstraction/flow.md) connects nodes through **Actions** (labeled edges). -- [Shared Store](./core_abstraction/communication.md) enables communication between nodes within flows. -- [Batch](./core_abstraction/batch.md) nodes/flows allow for data-intensive tasks. -- [Async](./core_abstraction/async.md) nodes/flows allow waiting for asynchronous tasks. -- [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks. - -
- -
- -## Design Pattern - -From there, it’s easy to implement popular design patterns: - -- [Agent](./design_pattern/agent.md) autonomously makes decisions. -- [Workflow](./design_pattern/workflow.md) chains multiple tasks into pipelines. -- [RAG](./design_pattern/rag.md) integrates data retrieval with generation. -- [Map Reduce](./design_pattern/mapreduce.md) splits data tasks into Map and Reduce steps. -- [Structured Output](./design_pattern/structure.md) formats outputs consistently. -- [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents. - -
- -
- -## Utility Function - -We **do not** provide built-in utilities. Instead, we offer *examples*—please *implement your own*: - -- [LLM Wrapper](./utility_function/llm.md) -- [Viz and Debug](./utility_function/viz.md) -- [Web Search](./utility_function/websearch.md) -- [Chunking](./utility_function/chunking.md) -- [Embedding](./utility_function/embedding.md) -- [Vector Databases](./utility_function/vector.md) -- [Text-to-Speech](./utility_function/text_to_speech.md) - -**Why not built-in?**: I believe it's a *bad practice* for vendor-specific APIs in a general framework: -- *API Volatility*: Frequent changes lead to heavy maintenance for hardcoded APIs. -- *Flexibility*: You may want to switch vendors, use fine-tuned models, or run them locally. -- *Optimizations*: Prompt caching, batching, and streaming are easier without vendor lock-in. - -## Ready to build your Apps? - -Check out [Agentic Coding Guidance](./guide.md), the fastest way to develop LLM projects with Pocket Flow! - -================================================ -File: docs/core_abstraction/async.md -================================================ ---- -layout: default -title: "(Advanced) Async" -parent: "Core Abstraction" -nav_order: 5 ---- - -# (Advanced) Async - -**Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for: - -1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way. -2. **exec_async()**: Typically used for async LLM calls. -3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`. - -**Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes. - -### Example - -```python -class SummarizeThenVerify(AsyncNode): - async def prep_async(self, shared): - # Example: read a file asynchronously - doc_text = await read_file_async(shared["doc_path"]) - return doc_text - - async def exec_async(self, prep_res): - # Example: async LLM call - summary = await call_llm_async(f"Summarize: {prep_res}") - return summary - - async def post_async(self, shared, prep_res, exec_res): - # Example: wait for user feedback - decision = await gather_user_feedback(exec_res) - if decision == "approve": - shared["summary"] = exec_res - return "approve" - return "deny" - -summarize_node = SummarizeThenVerify() -final_node = Finalize() - -# Define transitions -summarize_node - "approve" >> final_node -summarize_node - "deny" >> summarize_node # retry - -flow = AsyncFlow(start=summarize_node) - -async def main(): - shared = {"doc_path": "document.txt"} - await flow.run_async(shared) - print("Final Summary:", shared.get("summary")) - -asyncio.run(main()) -``` - -================================================ -File: docs/core_abstraction/batch.md -================================================ ---- -layout: default -title: "Batch" -parent: "Core Abstraction" -nav_order: 4 ---- - -# Batch - -**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases: -- **Chunk-based** processing (e.g., splitting large texts). -- **Iterative** processing over lists of input items (e.g., user queries, files, URLs). - -## 1. BatchNode - -A **BatchNode** extends `Node` but changes `prep()` and `exec()`: - -- **`prep(shared)`**: returns an **iterable** (e.g., list, generator). -- **`exec(item)`**: called **once** per item in that iterable. 
-- **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**. - - -### Example: Summarize a Large File - -```python -class MapSummaries(BatchNode): - def prep(self, shared): - # Suppose we have a big file; chunk it - content = shared["data"] - chunk_size = 10000 - chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] - return chunks - - def exec(self, chunk): - prompt = f"Summarize this chunk in 10 words: {chunk}" - summary = call_llm(prompt) - return summary - - def post(self, shared, prep_res, exec_res_list): - combined = "\n".join(exec_res_list) - shared["summary"] = combined - return "default" - -map_summaries = MapSummaries() -flow = Flow(start=map_summaries) -flow.run(shared) -``` - ---- - -## 2. BatchFlow - -A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set. - -### Example: Summarize Many Files - -```python -class SummarizeAllFiles(BatchFlow): - def prep(self, shared): - # Return a list of param dicts (one per file) - filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...] - return [{"filename": fn} for fn in filenames] - -# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce): -summarize_file = SummarizeFile(start=load_file) - -# Wrap that flow into a BatchFlow: -summarize_all_files = SummarizeAllFiles(start=summarize_file) -summarize_all_files.run(shared) -``` - -### Under the Hood -1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`. -2. The **BatchFlow** loops through each dict. For each one: - - It merges the dict with the BatchFlow’s own `params`. - - It calls `flow.run(shared)` using the merged result. -3. This means the sub-Flow is run **repeatedly**, once for every param dict. - ---- - -## 3. Nested or Multi-Level Batches - -You can nest a **BatchFlow** in another **BatchFlow**. For instance: -- **Outer** batch: returns a list of diretory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...). -- **Inner** batch: returning a list of per-file param dicts. - -At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once. - -```python - -class FileBatchFlow(BatchFlow): - def prep(self, shared): - directory = self.params["directory"] - # e.g., files = ["file1.txt", "file2.txt", ...] - files = [f for f in os.listdir(directory) if f.endswith(".txt")] - return [{"filename": f} for f in files] - -class DirectoryBatchFlow(BatchFlow): - def prep(self, shared): - directories = [ "/path/to/dirA", "/path/to/dirB"] - return [{"directory": d} for d in directories] - -# MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"} -inner_flow = FileBatchFlow(start=MapSummaries()) -outer_flow = DirectoryBatchFlow(start=inner_flow) -``` - -================================================ -File: docs/core_abstraction/communication.md -================================================ ---- -layout: default -title: "Communication" -parent: "Core Abstraction" -nav_order: 3 ---- - -# Communication - -Nodes and Flows **communicate** in 2 ways: - -1. 
**Shared Store (for almost all the cases)** - - - A global data structure (often an in-mem dict) that all nodes can read ( `prep()`) and write (`post()`). - - Great for data results, large content, or anything multiple nodes need. - - You shall design the data structure and populate it ahead. - - - > **Separation of Concerns:** Use `Shared Store` for almost all cases to separate *Data Schema* from *Compute Logic*! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` is more a syntax sugar for [Batch](./batch.md). - {: .best-practice } - -2. **Params (only for [Batch](./batch.md))** - - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**. - - Good for identifiers like filenames or numeric IDs, in Batch mode. - -If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller). - ---- - -## 1. Shared Store - -### Overview - -A shared store is typically an in-mem dictionary, like: -```python -shared = {"data": {}, "summary": {}, "config": {...}, ...} -``` - -It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements. - -### Example - -```python -class LoadData(Node): - def post(self, shared, prep_res, exec_res): - # We write data to shared store - shared["data"] = "Some text content" - return None - -class Summarize(Node): - def prep(self, shared): - # We read data from shared store - return shared["data"] - - def exec(self, prep_res): - # Call LLM to summarize - prompt = f"Summarize: {prep_res}" - summary = call_llm(prompt) - return summary - - def post(self, shared, prep_res, exec_res): - # We write summary to shared store - shared["summary"] = exec_res - return "default" - -load_data = LoadData() -summarize = Summarize() -load_data >> summarize -flow = Flow(start=load_data) - -shared = {} -flow.run(shared) -``` - -Here: -- `LoadData` writes to `shared["data"]`. -- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`. - ---- - -## 2. Params - -**Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are: -- **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`). -- **Set** via `set_params()`. -- **Cleared** and updated each time a parent Flow calls it. - -> Only set the uppermost Flow params because others will be overwritten by the parent Flow. -> -> If you need to set child node params, see [Batch](./batch.md). -{: .warning } - -Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store. 
- -### Example - -```python -# 1) Create a Node that uses params -class SummarizeFile(Node): - def prep(self, shared): - # Access the node's param - filename = self.params["filename"] - return shared["data"].get(filename, "") - - def exec(self, prep_res): - prompt = f"Summarize: {prep_res}" - return call_llm(prompt) - - def post(self, shared, prep_res, exec_res): - filename = self.params["filename"] - shared["summary"][filename] = exec_res - return "default" - -# 2) Set params -node = SummarizeFile() - -# 3) Set Node params directly (for testing) -node.set_params({"filename": "doc1.txt"}) -node.run(shared) - -# 4) Create Flow -flow = Flow(start=node) - -# 5) Set Flow params (overwrites node params) -flow.set_params({"filename": "doc2.txt"}) -flow.run(shared) # The node summarizes doc2, not doc1 -``` - -================================================ -File: docs/core_abstraction/flow.md -================================================ ---- -layout: default -title: "Flow" -parent: "Core Abstraction" -nav_order: 2 ---- - -# Flow - -A **Flow** orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`. - -## 1. Action-based Transitions - -Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`. - -You define transitions with the syntax: - -1. **Basic default transition**: `node_a >> node_b` - This means if `node_a.post()` returns `"default"`, go to `node_b`. - (Equivalent to `node_a - "default" >> node_b`) - -2. **Named action transition**: `node_a - "action_name" >> node_b` - This means if `node_a.post()` returns `"action_name"`, go to `node_b`. - -It's possible to create loops, branching, or multi-step flows. - -## 2. Creating a Flow - -A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node. - -### Example: Simple Sequence - -Here's a minimal flow of two nodes in a chain: - -```python -node_a >> node_b -flow = Flow(start=node_a) -flow.run(shared) -``` - -- When you run the flow, it executes `node_a`. -- Suppose `node_a.post()` returns `"default"`. -- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`. -- `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there. - -### Example: Branching & Looping - -Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions: - -- `"approved"`: expense is approved, move to payment processing -- `"needs_revision"`: expense needs changes, send back for revision -- `"rejected"`: expense is denied, finish the process - -We can wire them like this: - -```python -# Define the flow connections -review - "approved" >> payment # If approved, process payment -review - "needs_revision" >> revise # If needs changes, go to revision -review - "rejected" >> finish # If rejected, finish the process - -revise >> review # After revision, go back for another review -payment >> finish # After payment, finish the process - -flow = Flow(start=review) -``` - -Let's see how it flows: - -1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node -2. 
If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review` -3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops - -```mermaid -flowchart TD - review[Review Expense] -->|approved| payment[Process Payment] - review -->|needs_revision| revise[Revise Report] - review -->|rejected| finish[Finish Process] - - revise --> review - payment --> finish -``` - -### Running Individual Nodes vs. Running a Flow - -- `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action. -- `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue. - -> `node.run(shared)` **does not** proceed to the successor. -> This is mainly for debugging or testing a single node. -> -> Always use `flow.run(...)` in production to ensure the full pipeline runs correctly. -{: .warning } - -## 3. Nested Flows - -A **Flow** can act like a Node, which enables powerful composition patterns. This means you can: - -1. Use a Flow as a Node within another Flow's transitions. -2. Combine multiple smaller Flows into a larger Flow for reuse. -3. Node `params` will be a merging of **all** parents' `params`. - -### Flow's Node Methods - -A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However: - -- It **won't** run `exec()`, as its main logic is to orchestrate its nodes. -- `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store. - -### Basic Flow Nesting - -Here's how to connect a flow to another node: - -```python -# Create a sub-flow -node_a >> node_b -subflow = Flow(start=node_a) - -# Connect it to another node -subflow >> node_c - -# Create the parent flow -parent_flow = Flow(start=subflow) -``` - -When `parent_flow.run()` executes: -1. It starts `subflow` -2. `subflow` runs through its nodes (`node_a->node_b`) -3. 
After `subflow` completes, execution continues to `node_c` - -### Example: Order Processing Pipeline - -Here's a practical example that breaks down order processing into nested flows: - -```python -# Payment processing sub-flow -validate_payment >> process_payment >> payment_confirmation -payment_flow = Flow(start=validate_payment) - -# Inventory sub-flow -check_stock >> reserve_items >> update_inventory -inventory_flow = Flow(start=check_stock) - -# Shipping sub-flow -create_label >> assign_carrier >> schedule_pickup -shipping_flow = Flow(start=create_label) - -# Connect the flows into a main order pipeline -payment_flow >> inventory_flow >> shipping_flow - -# Create the master flow -order_pipeline = Flow(start=payment_flow) - -# Run the entire pipeline -order_pipeline.run(shared_data) -``` - -This creates a clean separation of concerns while maintaining a clear execution path: - -```mermaid -flowchart LR - subgraph order_pipeline[Order Pipeline] - subgraph paymentFlow["Payment Flow"] - A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation] - end - - subgraph inventoryFlow["Inventory Flow"] - D[Check Stock] --> E[Reserve Items] --> F[Update Inventory] - end - - subgraph shippingFlow["Shipping Flow"] - G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup] - end - - paymentFlow --> inventoryFlow - inventoryFlow --> shippingFlow - end -``` - -================================================ -File: docs/core_abstraction/node.md -================================================ ---- -layout: default -title: "Node" -parent: "Core Abstraction" -nav_order: 1 ---- - -# Node - -A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`: - -
- -
- -1. `prep(shared)` - - **Read and preprocess data** from `shared` store. - - Examples: *query DB, read files, or serialize data into a string*. - - Return `prep_res`, which is used by `exec()` and `post()`. - -2. `exec(prep_res)` - - **Execute compute logic**, with optional retries and error handling (below). - - Examples: *(mostly) LLM calls, remote APIs, tool use*. - - ⚠️ This shall be only for compute and **NOT** access `shared`. - - ⚠️ If retries enabled, ensure idempotent implementation. - - Return `exec_res`, which is passed to `post()`. - -3. `post(shared, prep_res, exec_res)` - - **Postprocess and write data** back to `shared`. - - Examples: *update DB, change states, log results*. - - **Decide the next action** by returning a *string* (`action = "default"` if *None*). - -> **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately. -> -> All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data. -{: .note } - -### Fault Tolerance & Retries - -You can **retry** `exec()` if it raises an exception via two parameters when define the Node: - -- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry). -- `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting). -`wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off. - -```python -my_node = SummarizeFile(max_retries=3, wait=10) -``` - -When an exception occurs in `exec()`, the Node automatically retries until: - -- It either succeeds, or -- The Node has retried `max_retries - 1` times already and fails on the last attempt. - -You can get the current retry times (0-based) from `self.cur_retry`. - -```python -class RetryNode(Node): - def exec(self, prep_res): - print(f"Retry {self.cur_retry} times") - raise Exception("Failed") -``` - -### Graceful Fallback - -To **gracefully handle** the exception (after all retries) rather than raising it, override: - -```python -def exec_fallback(self, prep_res, exc): - raise exc -``` - -By default, it just re-raises exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`. - -### Example: Summarize file - -```python -class SummarizeFile(Node): - def prep(self, shared): - return shared["data"] - - def exec(self, prep_res): - if not prep_res: - return "Empty file content" - prompt = f"Summarize this text in 10 words: {prep_res}" - summary = call_llm(prompt) # might fail - return summary - - def exec_fallback(self, prep_res, exc): - # Provide a simple fallback instead of crashing - return "There was an error processing your request." 
- - def post(self, shared, prep_res, exec_res): - shared["summary"] = exec_res - # Return "default" by not returning - -summarize_node = SummarizeFile(max_retries=3) - -# node.run() calls prep->exec->post -# If exec() fails, it retries up to 3 times before calling exec_fallback() -action_result = summarize_node.run(shared) - -print("Action returned:", action_result) # "default" -print("Summary stored:", shared["summary"]) -``` - - -================================================ -File: docs/core_abstraction/parallel.md -================================================ ---- -layout: default -title: "(Advanced) Parallel" -parent: "Core Abstraction" -nav_order: 6 ---- - -# (Advanced) Parallel - -**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute. - -> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. -{: .warning } - -> - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize. -> -> - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals). -> -> - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. -{: .best-practice } - -## AsyncParallelBatchNode - -Like **AsyncBatchNode**, but run `exec_async()` in **parallel**: - -```python -class ParallelSummaries(AsyncParallelBatchNode): - async def prep_async(self, shared): - # e.g., multiple texts - return shared["texts"] - - async def exec_async(self, text): - prompt = f"Summarize: {text}" - return await call_llm_async(prompt) - - async def post_async(self, shared, prep_res, exec_res_list): - shared["summary"] = "\n\n".join(exec_res_list) - return "default" - -node = ParallelSummaries() -flow = AsyncFlow(start=node) -``` - -## AsyncParallelBatchFlow - -Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters: - -```python -class SummarizeMultipleFiles(AsyncParallelBatchFlow): - async def prep_async(self, shared): - return [{"filename": f} for f in shared["files"]] - -sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) -parallel_flow = SummarizeMultipleFiles(start=sub_flow) -await parallel_flow.run_async(shared) -``` - -================================================ -File: docs/design_pattern/agent.md -================================================ ---- -layout: default -title: "Agent" -parent: "Design Pattern" -nav_order: 1 ---- - -# Agent - -Agent is a powerful design pattern in which nodes can take dynamic actions based on the context. - -
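-
-For a quick feel of the pattern, here is a minimal sketch using the same `Node`/`Flow` API as the rest of this guide (the `SearchTool` and `AnswerNode` classes are hypothetical placeholders):
-
-```python
-class Decide(Node):
-    def post(self, shared, prep_res, exec_res):
-        # exec_res is assumed to hold the LLM's chosen action, e.g. "search" or "answer"
-        return exec_res  # the returned string selects which outgoing edge to follow
-
-decide, search, answer = Decide(), SearchTool(), AnswerNode()
-decide - "search" >> search   # if the agent chooses "search"
-decide - "answer" >> answer   # if the agent chooses "answer"
-search >> decide              # loop back so the agent can decide again
-agent_flow = Flow(start=decide)
-```
-
-The full search-agent example below fills in the actual prompt and tool logic.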
- -
- -## Implement Agent with Graph - -1. **Context and Action:** Implement nodes that supply context and perform actions. -2. **Branching:** Use branching to connect each action node to an agent node. Use action to allow the agent to direct the [flow](../core_abstraction/flow.md) between nodes—and potentially loop back for multi-step. -3. **Agent Node:** Provide a prompt to decide action—for example: - -```python -f""" -### CONTEXT -Task: {task_description} -Previous Actions: {previous_actions} -Current State: {current_state} - -### ACTION SPACE -[1] search - Description: Use web search to get results - Parameters: - - query (str): What to search for - -[2] answer - Description: Conclude based on the results - Parameters: - - result (str): Final answer to provide - -### NEXT ACTION -Decide the next action based on the current context and available action space. -Return your response in the following format: - -```yaml -thinking: | - -action: -parameters: - : -```""" -``` - -The core of building **high-performance** and **reliable** agents boils down to: - -1. **Context Management:** Provide *relevant, minimal context.* For example, rather than including an entire chat history, retrieve the most relevant via [RAG](./rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content. - -2. **Action Space:** Provide *a well-structured and unambiguous* set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database. - -## Example Good Action Design - -- **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once. - -- **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts). - -- **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files. - -- **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends. - -## Example: Search Agent - -This agent: -1. Decides whether to search or answer -2. If searches, loops back to decide if more search needed -3. 
Answers when enough context gathered - -```python -class DecideAction(Node): - def prep(self, shared): - context = shared.get("context", "No previous search") - query = shared["query"] - return query, context - - def exec(self, inputs): - query, context = inputs - prompt = f""" -Given input: {query} -Previous search results: {context} -Should I: 1) Search web for more info 2) Answer with current knowledge -Output in yaml: -```yaml -action: search/answer -reason: why this action -search_term: search phrase if action is search -```""" - resp = call_llm(prompt) - yaml_str = resp.split("```yaml")[1].split("```")[0].strip() - result = yaml.safe_load(yaml_str) - - assert isinstance(result, dict) - assert "action" in result - assert "reason" in result - assert result["action"] in ["search", "answer"] - if result["action"] == "search": - assert "search_term" in result - - return result - - def post(self, shared, prep_res, exec_res): - if exec_res["action"] == "search": - shared["search_term"] = exec_res["search_term"] - return exec_res["action"] - -class SearchWeb(Node): - def prep(self, shared): - return shared["search_term"] - - def exec(self, search_term): - return search_web(search_term) - - def post(self, shared, prep_res, exec_res): - prev_searches = shared.get("context", []) - shared["context"] = prev_searches + [ - {"term": shared["search_term"], "result": exec_res} - ] - return "decide" - -class DirectAnswer(Node): - def prep(self, shared): - return shared["query"], shared.get("context", "") - - def exec(self, inputs): - query, context = inputs - return call_llm(f"Context: {context}\nAnswer: {query}") - - def post(self, shared, prep_res, exec_res): - print(f"Answer: {exec_res}") - shared["answer"] = exec_res - -# Connect nodes -decide = DecideAction() -search = SearchWeb() -answer = DirectAnswer() - -decide - "search" >> search -decide - "answer" >> answer -search - "decide" >> decide # Loop back - -flow = Flow(start=decide) -flow.run({"query": "Who won the Nobel Prize in Physics 2024?"}) -``` - -================================================ -File: docs/design_pattern/mapreduce.md -================================================ ---- -layout: default -title: "Map Reduce" -parent: "Design Pattern" -nav_order: 4 ---- - -# Map Reduce - -MapReduce is a design pattern suitable when you have either: -- Large input data (e.g., multiple files to process), or -- Large output data (e.g., multiple forms to fill) - -and there is a logical way to break the task into smaller, ideally independent parts. - -
- -
- -You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase. - -### Example: Document Summarization - -```python -class SummarizeAllFiles(BatchNode): - def prep(self, shared): - files_dict = shared["files"] # e.g. 10 files - return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...] - - def exec(self, one_file): - filename, file_content = one_file - summary_text = call_llm(f"Summarize the following file:\n{file_content}") - return (filename, summary_text) - - def post(self, shared, prep_res, exec_res_list): - shared["file_summaries"] = dict(exec_res_list) - -class CombineSummaries(Node): - def prep(self, shared): - return shared["file_summaries"] - - def exec(self, file_summaries): - # format as: "File1: summary\nFile2: summary...\n" - text_list = [] - for fname, summ in file_summaries.items(): - text_list.append(f"{fname} summary:\n{summ}\n") - big_text = "\n---\n".join(text_list) - - return call_llm(f"Combine these file summaries into one final summary:\n{big_text}") - - def post(self, shared, prep_res, final_summary): - shared["all_files_summary"] = final_summary - -batch_node = SummarizeAllFiles() -combine_node = CombineSummaries() -batch_node >> combine_node - -flow = Flow(start=batch_node) - -shared = { - "files": { - "file1.txt": "Alice was beginning to get very tired of sitting by her sister...", - "file2.txt": "Some other interesting text ...", - # ... - } -} -flow.run(shared) -print("Individual Summaries:", shared["file_summaries"]) -print("\nFinal Summary:\n", shared["all_files_summary"]) -``` - -================================================ -File: docs/design_pattern/rag.md -================================================ ---- -layout: default -title: "RAG" -parent: "Design Pattern" -nav_order: 3 ---- - -# RAG (Retrieval Augmented Generation) - -For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline: - -
- -
- -1. **Offline stage**: Preprocess and index documents ("building the index"). -2. **Online stage**: Given a question, generate answers by retrieving the most relevant context. - ---- -## Stage 1: Offline Indexing - -We create three Nodes: -1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text. -2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk. -3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md). - -```python -class ChunkDocs(BatchNode): - def prep(self, shared): - # A list of file paths in shared["files"]. We process each file. - return shared["files"] - - def exec(self, filepath): - # read file content. In real usage, do error handling. - with open(filepath, "r", encoding="utf-8") as f: - text = f.read() - # chunk by 100 chars each - chunks = [] - size = 100 - for i in range(0, len(text), size): - chunks.append(text[i : i + size]) - return chunks - - def post(self, shared, prep_res, exec_res_list): - # exec_res_list is a list of chunk-lists, one per file. - # flatten them all into a single list of chunks. - all_chunks = [] - for chunk_list in exec_res_list: - all_chunks.extend(chunk_list) - shared["all_chunks"] = all_chunks - -class EmbedDocs(BatchNode): - def prep(self, shared): - return shared["all_chunks"] - - def exec(self, chunk): - return get_embedding(chunk) - - def post(self, shared, prep_res, exec_res_list): - # Store the list of embeddings. - shared["all_embeds"] = exec_res_list - print(f"Total embeddings: {len(exec_res_list)}") - -class StoreIndex(Node): - def prep(self, shared): - # We'll read all embeds from shared. - return shared["all_embeds"] - - def exec(self, all_embeds): - # Create a vector index (faiss or other DB in real usage). - index = create_index(all_embeds) - return index - - def post(self, shared, prep_res, index): - shared["index"] = index - -# Wire them in sequence -chunk_node = ChunkDocs() -embed_node = EmbedDocs() -store_node = StoreIndex() - -chunk_node >> embed_node >> store_node - -OfflineFlow = Flow(start=chunk_node) -``` - -Usage example: - -```python -shared = { - "files": ["doc1.txt", "doc2.txt"], # any text files -} -OfflineFlow.run(shared) -``` - ---- -## Stage 2: Online Query & Answer - -We have 3 nodes: -1. `EmbedQuery` – embeds the user’s question. -2. `RetrieveDocs` – retrieves top chunk from the index. -3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer. 
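-
-Both stages treat `create_index` (used above) and `search_index` (used below) as small vector-index utilities that are not defined here. A minimal sketch—assuming FAISS (`faiss-cpu`) purely for illustration; any vector store works—might look like:
-
-```python
-import numpy as np
-import faiss  # assumption: faiss-cpu is installed; any vector store works
-
-def create_index(all_embeds):
-    embs = np.array(all_embeds, dtype="float32")
-    index = faiss.IndexFlatL2(embs.shape[1])  # exact L2 index over the embeddings
-    index.add(embs)
-    return index
-
-def search_index(index, query_embed, top_k=1):
-    query = np.array([query_embed], dtype="float32")
-    D, I = index.search(query, top_k)  # FAISS returns (distances, ids)
-    return I, D  # flipped to match the `I, D = search_index(...)` call below
-```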
- -```python -class EmbedQuery(Node): - def prep(self, shared): - return shared["question"] - - def exec(self, question): - return get_embedding(question) - - def post(self, shared, prep_res, q_emb): - shared["q_emb"] = q_emb - -class RetrieveDocs(Node): - def prep(self, shared): - # We'll need the query embedding, plus the offline index/chunks - return shared["q_emb"], shared["index"], shared["all_chunks"] - - def exec(self, inputs): - q_emb, index, chunks = inputs - I, D = search_index(index, q_emb, top_k=1) - best_id = I[0][0] - relevant_chunk = chunks[best_id] - return relevant_chunk - - def post(self, shared, prep_res, relevant_chunk): - shared["retrieved_chunk"] = relevant_chunk - print("Retrieved chunk:", relevant_chunk[:60], "...") - -class GenerateAnswer(Node): - def prep(self, shared): - return shared["question"], shared["retrieved_chunk"] - - def exec(self, inputs): - question, chunk = inputs - prompt = f"Question: {question}\nContext: {chunk}\nAnswer:" - return call_llm(prompt) - - def post(self, shared, prep_res, answer): - shared["answer"] = answer - print("Answer:", answer) - -embed_qnode = EmbedQuery() -retrieve_node = RetrieveDocs() -generate_node = GenerateAnswer() - -embed_qnode >> retrieve_node >> generate_node -OnlineFlow = Flow(start=embed_qnode) -``` - -Usage example: - -```python -# Suppose we already ran OfflineFlow and have: -# shared["all_chunks"], shared["index"], etc. -shared["question"] = "Why do people like cats?" - -OnlineFlow.run(shared) -# final answer in shared["answer"] -``` - -================================================ -File: docs/design_pattern/structure.md -================================================ ---- -layout: default -title: "Structured Output" -parent: "Design Pattern" -nav_order: 5 ---- - -# Structured Output - -In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys. - -There are several approaches to achieve a structured output: -- **Prompting** the LLM to strictly return a defined structure. -- Using LLMs that natively support **schema enforcement**. -- **Post-processing** the LLM's response to extract structured content. - -In practice, **Prompting** is simple and reliable for modern LLMs. - -### Example Use Cases - -- Extracting Key Information - -```yaml -product: - name: Widget Pro - price: 199.99 - description: | - A high-quality widget designed for professionals. - Recommended for advanced users. -``` - -- Summarizing Documents into Bullet Points - -```yaml -summary: - - This product is easy to use. - - It is cost-effective. - - Suitable for all skill levels. -``` - -- Generating Configuration Files - -```yaml -server: - host: 127.0.0.1 - port: 8080 - ssl: true -``` - -## Prompt Engineering - -When prompting the LLM to produce **structured** output: -1. **Wrap** the structure in code fences (e.g., `yaml`). -2. **Validate** that all required fields exist (and let `Node` handles retry). - -### Example Text Summarization - -```python -class SummarizeNode(Node): - def exec(self, prep_res): - # Suppose `prep_res` is the text to summarize. 
- prompt = f""" -Please summarize the following text as YAML, with exactly 3 bullet points - -{prep_res} - -Now, output: -```yaml -summary: - - bullet 1 - - bullet 2 - - bullet 3 -```""" - response = call_llm(prompt) - yaml_str = response.split("```yaml")[1].split("```")[0].strip() - - import yaml - structured_result = yaml.safe_load(yaml_str) - - assert "summary" in structured_result - assert isinstance(structured_result["summary"], list) - - return structured_result -``` - -> Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic) -{: .note } - -### Why YAML instead of JSON? - -Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes. - -**In JSON** - -```json -{ - "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\"" -} -``` - -- Every double quote inside the string must be escaped with `\"`. -- Each newline in the dialogue must be represented as `\n`. - -**In YAML** - -```yaml -dialogue: | - Alice said: "Hello Bob. - How are you? - I am good." -``` - -- No need to escape interior quotes—just place the entire text under a block literal (`|`). -- Newlines are naturally preserved without needing `\n`. - -================================================ -File: docs/design_pattern/workflow.md -================================================ ---- -layout: default -title: "Workflow" -parent: "Design Pattern" -nav_order: 2 ---- - -# Workflow - -Many real-world tasks are too complex for one LLM call. The solution is to **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes. - -
- -
- -> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*. -> - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*. -> -> You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](./agent.md). -{: .best-practice } - -### Example: Article Writing - -```python -class GenerateOutline(Node): - def prep(self, shared): return shared["topic"] - def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}") - def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res - -class WriteSection(Node): - def prep(self, shared): return shared["outline"] - def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}") - def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res - -class ReviewAndRefine(Node): - def prep(self, shared): return shared["draft"] - def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}") - def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res - -# Connect nodes -outline = GenerateOutline() -write = WriteSection() -review = ReviewAndRefine() - -outline >> write >> review - -# Create and run flow -writing_flow = Flow(start=outline) -shared = {"topic": "AI Safety"} -writing_flow.run(shared) -``` - -For *dynamic cases*, consider using [Agents](./agent.md). - -================================================ -File: docs/utility_function/llm.md -================================================ ---- -layout: default -title: "LLM Wrapper" -parent: "Utility Function" -nav_order: 1 ---- - -# LLM Wrappers - -Check out libraries like [litellm](https://github.com/BerriAI/litellm). -Here, we provide some minimal example implementations: - -1. OpenAI - ```python - def call_llm(prompt): - from openai import OpenAI - client = OpenAI(api_key="YOUR_API_KEY_HERE") - r = client.chat.completions.create( - model="gpt-4o", - messages=[{"role": "user", "content": prompt}] - ) - return r.choices[0].message.content - - # Example usage - call_llm("How are you?") - ``` - > Store the API key in an environment variable like OPENAI_API_KEY for security. - {: .best-practice } - -2. Claude (Anthropic) - ```python - def call_llm(prompt): - from anthropic import Anthropic - client = Anthropic(api_key="YOUR_API_KEY_HERE") - response = client.messages.create( - model="claude-2", - messages=[{"role": "user", "content": prompt}], - max_tokens=100 - ) - return response.content - ``` - -3. Google (Generative AI Studio / PaLM API) - ```python - def call_llm(prompt): - import google.generativeai as genai - genai.configure(api_key="YOUR_API_KEY_HERE") - response = genai.generate_text( - model="models/text-bison-001", - prompt=prompt - ) - return response.result - ``` - -4. Azure (Azure OpenAI) - ```python - def call_llm(prompt): - from openai import AzureOpenAI - client = AzureOpenAI( - azure_endpoint="https://.openai.azure.com/", - api_key="YOUR_API_KEY_HERE", - api_version="2023-05-15" - ) - r = client.chat.completions.create( - model="", - messages=[{"role": "user", "content": prompt}] - ) - return r.choices[0].message.content - ``` - -5. 
Ollama (Local LLM) - ```python - def call_llm(prompt): - from ollama import chat - response = chat( - model="llama2", - messages=[{"role": "user", "content": prompt}] - ) - return response.message.content - ``` - -## Improvements -Feel free to enhance your `call_llm` function as needed. Here are examples: - -- Handle chat history: - -```python -def call_llm(messages): - from openai import OpenAI - client = OpenAI(api_key="YOUR_API_KEY_HERE") - r = client.chat.completions.create( - model="gpt-4o", - messages=messages - ) - return r.choices[0].message.content -``` - -- Add in-memory caching - -```python -from functools import lru_cache - -@lru_cache(maxsize=1000) -def call_llm(prompt): - # Your implementation here - pass -``` - -> ⚠️ Caching conflicts with Node retries, as retries yield the same result. -> -> To address this, you could use cached results only if not retried. -{: .warning } - - -```python -from functools import lru_cache - -@lru_cache(maxsize=1000) -def cached_call(prompt): - pass - -def call_llm(prompt, use_cache): - if use_cache: - return cached_call(prompt) - # Call the underlying function directly - return cached_call.__wrapped__(prompt) - -class SummarizeNode(Node): - def exec(self, text): - return call_llm(f"Summarize: {text}", self.cur_retry==0) -``` - -- Enable logging: - -```python -def call_llm(prompt): - import logging - logging.info(f"Prompt: {prompt}") - response = ... # Your implementation here - logging.info(f"Response: {response}") - return response -``` \ No newline at end of file diff --git a/utils/update_pocketflow_mdc.py b/utils/update_pocketflow_mdc.py new file mode 100644 index 0000000..bb4e404 --- /dev/null +++ b/utils/update_pocketflow_mdc.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python3 +""" +Script to generate MDC files from the PocketFlow docs folder, creating one MDC file per MD file. 
+
+Usage:
+    python update_pocketflow_mdc.py [--docs-dir PATH] [--rules-dir PATH]
+"""
+
+import os
+import re
+import shutil
+from pathlib import Path
+import sys
+import html.parser
+
+class HTMLTagStripper(html.parser.HTMLParser):
+    """HTML Parser subclass to strip HTML tags from content"""
+    def __init__(self):
+        super().__init__()
+        self.reset()
+        self.strict = False
+        self.convert_charrefs = True
+        self.text = []
+
+    def handle_data(self, data):
+        self.text.append(data)
+
+    def get_text(self):
+        return ''.join(self.text)
+
+def strip_html_tags(html_content):
+    """Remove HTML tags from content"""
+    stripper = HTMLTagStripper()
+    stripper.feed(html_content)
+    return stripper.get_text()
+
+def extract_frontmatter(file_path):
+    """Extract title, parent, and nav_order from markdown frontmatter"""
+    frontmatter = {}
+    try:
+        with open(file_path, 'r', encoding='utf-8') as f:
+            content = f.read()
+
+        # Extract frontmatter between --- markers
+        fm_match = re.search(r'^---\s*(.+?)\s*---', content, re.DOTALL)
+        if fm_match:
+            frontmatter_text = fm_match.group(1)
+
+            # Extract fields
+            title_match = re.search(r'title:\s*"?([^"\n]+)"?', frontmatter_text)
+            parent_match = re.search(r'parent:\s*"?([^"\n]+)"?', frontmatter_text)
+            nav_order_match = re.search(r'nav_order:\s*(\d+)', frontmatter_text)
+
+            if title_match:
+                frontmatter['title'] = title_match.group(1)
+            if parent_match:
+                frontmatter['parent'] = parent_match.group(1)
+            if nav_order_match:
+                frontmatter['nav_order'] = int(nav_order_match.group(1))
+    except Exception as e:
+        print(f"Error reading frontmatter from {file_path}: {e}")
+
+    return frontmatter
+
+def extract_first_heading(file_path):
+    """Extract the first heading from markdown content"""
+    try:
+        with open(file_path, 'r', encoding='utf-8') as f:
+            content = f.read()
+
+        # Remove frontmatter
+        content = re.sub(r'^---.*?---\s*', '', content, flags=re.DOTALL)
+
+        # Find first heading
+        heading_match = re.search(r'#\s+(.+)', content)
+        if heading_match:
+            return heading_match.group(1).strip()
+    except Exception as e:
+        print(f"Error extracting heading from {file_path}: {e}")
+
+    # Fallback to filename if no heading found
+    return Path(file_path).stem.replace('_', ' ').title()
+
+def get_mdc_description(md_file, frontmatter, heading):
+    """Generate a description for the MDC file based on file metadata"""
+    section = ""
+    subsection = ""
+
+    # Determine section from path
+    path_parts = Path(md_file).parts
+    if 'core_abstraction' in path_parts:
+        section = "Core Abstraction"
+    elif 'design_pattern' in path_parts:
+        section = "Design Pattern"
+    elif 'utility_function' in path_parts:
+        section = "Utility Function"
+
+    # Use frontmatter title or heading as subsection
+    if 'title' in frontmatter:
+        subsection = frontmatter['title']
+    else:
+        subsection = heading
+
+    # For index.md at root level, use a different format
+    if Path(md_file).name == "index.md" and section == "":
+        return "Guidelines for using PocketFlow, a minimalist LLM framework"
+
+    # For other files, create a more specific description
+    if section:
+        return f"Guidelines for using PocketFlow, {section}, {subsection}"
+    else:
+        return f"Guidelines for using PocketFlow, {subsection}"
+
+def process_markdown_content(content, remove_local_refs=False):
+    """Process markdown content to make it suitable for MDC file"""
+    # Remove frontmatter
+    content = re.sub(r'^---.*?---\s*', '', content, flags=re.DOTALL)
+
+    # Replace HTML div tags and their content
+    content = re.sub(r'<div.*?</div>', '', content, flags=re.DOTALL)
+
+    if 
remove_local_refs: + # Replace markdown links to local documentation with just the text in brackets + # This prevents automatically including all docs when the file is loaded + # Keep the brackets around the text for better discoverability + content = re.sub(r'\[([^\]]+)\]\(\./[^)]+\)', r'[\1]', content) + else: + # Adjust relative links to maintain references within the docs structure + content = re.sub(r'\]\(\./([^)]+)\)', r'](mdc:./\1)', content) + + # Ensure links to md files work correctly + content = re.sub(r'\]\(mdc:\./(.+?)\.md\)', r'](mdc:./\1.md)', content) + content = re.sub(r'\]\(mdc:\./(.+?)\.html\)', r'](mdc:./\1.md)', content) + + # Strip remaining HTML tags + content = strip_html_tags(content) + + return content + +def generate_mdc_header(md_file, description, always_apply=False): + """Generate MDC file header with appropriate frontmatter""" + # Determine if we should include globs + # For index.md and guide.md, we include **/*.py to provide high-level context for Python files + # For other files, leave it empty to be less intrusive + globs = "**/*.py" if always_apply else "" + + return f"""--- +description: {description} +globs: {globs} +alwaysApply: {"true" if always_apply else "false"} +--- +""" + +def has_substantive_content(content): + """Check if the processed content has substantive content beyond the frontmatter""" + # Remove frontmatter + content_without_frontmatter = re.sub(r'^---.*?---\s*', '', content, flags=re.DOTALL) + + # Remove whitespace and common HTML/markdown formatting + cleaned_content = re.sub(r'\s+', '', content_without_frontmatter) + cleaned_content = re.sub(r'{:.*?}', '', cleaned_content) + + # If there's almost nothing left after cleaning, consider it empty + return len(cleaned_content) > 20 # Arbitrary threshold, adjust as needed + +def convert_md_to_mdc(md_file, output_dir, docs_dir, special_treatment=False): + """Convert a markdown file to MDC format and save to the output directory""" + try: + print(f"Processing: {md_file}") + + # Skip empty index.md files in subfolders + file_name = Path(md_file).name + parent_dir = Path(md_file).parent.name + + # Check if this is an index.md in a subfolder (not the main index.md) + if (file_name == "index.md" and parent_dir != "docs" and + parent_dir in ["core_abstraction", "design_pattern", "utility_function"]): + + # Read the content + with open(md_file, 'r', encoding='utf-8') as f: + content = f.read() + + # Skip if it doesn't have substantive content + if not has_substantive_content(content): + print(f"Skipping empty subfolder index: {md_file}") + return True + + # Extract metadata from file + frontmatter = extract_frontmatter(md_file) + heading = extract_first_heading(md_file) + description = get_mdc_description(md_file, frontmatter, heading) + + # Read the content + with open(md_file, 'r', encoding='utf-8') as f: + content = f.read() + + # Check if this file should have special treatment (index.md or guide.md) + is_special = special_treatment or Path(md_file).name == "guide.md" + + # Process the content + processed_content = process_markdown_content(content, remove_local_refs=is_special) + + # Generate the MDC header + mdc_header = generate_mdc_header(md_file, description, always_apply=is_special) + + # Combine header and processed content + mdc_content = mdc_header + processed_content + + # Perform a final check to ensure the processed content is substantive + if not has_substantive_content(processed_content): + print(f"Skipping file with no substantive content after processing: {md_file}") + return 
True + + # Get the path relative to the docs directory + rel_path = os.path.relpath(md_file, start=Path(docs_dir)) + + # Extract just the filename and directory structure without the 'docs/' prefix + path_parts = Path(rel_path).parts + if len(path_parts) > 1 and path_parts[0] == 'docs': + # Remove the 'docs/' prefix from the path + rel_path = os.path.join(*path_parts[1:]) + + # Create the output path + output_path = Path(output_dir) / rel_path + + # Create output directory if it doesn't exist + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Change extension from .md to .mdc + output_path = output_path.with_suffix('.mdc') + + # Write the MDC file + with open(output_path, 'w', encoding='utf-8') as f: + f.write(mdc_content) + + print(f"Created MDC file: {output_path}") + return True + + except Exception as e: + print(f"Error converting {md_file} to MDC: {e}") + return False + +def generate_mdc_files(docs_dir, rules_dir): + """Generate MDC files from all markdown files in the docs directory""" + docs_path = Path(docs_dir) + rules_path = Path(rules_dir) + + # Make sure the docs directory exists + if not docs_path.exists() or not docs_path.is_dir(): + raise ValueError(f"Directory not found: {docs_dir}") + + print(f"Generating MDC files from docs in: {docs_dir}") + print(f"Output will be written to: {rules_dir}") + + # Create the rules directory if it doesn't exist + rules_path.mkdir(parents=True, exist_ok=True) + + # Process the main index.md file first + index_file = docs_path / "index.md" + if index_file.exists(): + convert_md_to_mdc(index_file, rules_path, docs_dir, special_treatment=True) + + # Process guide.md file with special treatment (if it exists) + guide_file = docs_path / "guide.md" + if guide_file.exists(): + convert_md_to_mdc(guide_file, rules_path, docs_dir, special_treatment=True) + + # Process all other markdown files + success_count = 0 + failure_count = 0 + + # Find all markdown files + md_files = list(docs_path.glob("**/*.md")) + + # Skip the main index.md and guide.md files as we've already processed them + md_files = [f for f in md_files if f != index_file and f != guide_file] + + # Process each markdown file + for md_file in md_files: + if convert_md_to_mdc(md_file, rules_path, docs_dir): + success_count += 1 + else: + failure_count += 1 + + print(f"\nProcessed {len(md_files) + 2} markdown files:") + print(f" - Successfully converted: {success_count + 2}") + print(f" - Failed conversions: {failure_count}") + + return success_count > 0 and failure_count == 0 + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Generate MDC files from PocketFlow docs") + + # Get script directory + script_dir = Path(__file__).parent.absolute() + + # Default to PocketFlow/docs directory relative to script location + default_docs_dir = (script_dir.parent / "docs").as_posix() + + # Default rules directory - changed to .cursor/rules + default_rules_dir = (script_dir.parent / ".cursor" / "rules").as_posix() + + parser.add_argument("--docs-dir", + default=default_docs_dir, + help="Path to PocketFlow docs directory") + parser.add_argument("--rules-dir", + default=default_rules_dir, + help="Output directory for MDC files") + + args = parser.parse_args() + + try: + success = generate_mdc_files(args.docs_dir, args.rules_dir) + sys.exit(0 if success else 1) + except Exception as e: + print(f"Error: {e}") + sys.exit(1) \ No newline at end of file