================================================ File: docs/guide.md ================================================ --- layout: default title: "Design Guidance" parent: "Apps" nav_order: 1 --- # LLM System Design Guidance ## Example LLM Project File Structure ``` my_project/ ├── main.py ├── flow.py ├── utils/ │ ├── __init__.py │ ├── call_llm.py │ └── search_web.py ├── tests/ │ ├── __init__.py │ ├── test_flow.py │ └── test_nodes.py ├── requirements.txt └── docs/ └── design.md ``` ### `docs/` Store the documentation of the project. It should include a `design.md` file, which describes - Project requirements - Required utility functions - High-level flow with a mermaid diagram - Shared memory data structure - For each node, discuss - Node purpose and design (e.g., should it be a batch or async node?) - How the data shall be read (for `prep`) and written (for `post`) - How the data shall be processed (for `exec`) ### `utils/` Houses functions for external API calls (e.g., LLMs, web searches, etc.). It’s recommended to dedicate one Python file per API call, with names like `call_llm.py` or `search_web.py`. Each file should include: - The function to call the API - A main function to run that API call For instance, here’s a simplified `call_llm.py` example: ```python from openai import OpenAI def call_llm(prompt): client = OpenAI(api_key="YOUR_API_KEY_HERE") response = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}] ) return response.choices[0].message.content def main(): prompt = "Hello, how are you?" print(call_llm(prompt)) if __name__ == "__main__": main() ``` ### `main.py` Serves as the project’s entry point. ### `flow.py` Implements the application’s flow, starting with node followed by the flow structure. ### `tests/` Optionally contains all tests. Use `pytest` for testing flows, nodes, and utility functions. For example, `test_call_llm.py` might look like: ```python from utils.call_llm import call_llm def test_call_llm(): prompt = "Hello, how are you?" assert call_llm(prompt) is not None ``` ## System Design Steps 1. **Project Requirements** - Identify the project's core entities. - Define each functional requirement and map out how these entities interact step by step. 2. **Utility Functions** - Determine the low-level utility functions you’ll need (e.g., for LLM calls, web searches, file handling). - Implement these functions and write basic tests to confirm they work correctly. 3. **Flow Design** - Develop a high-level process flow that meets the project’s requirements. - Specify which utility functions are used at each step. - Identify possible decision points for *Node Actions* and data-intensive operations for *Batch* tasks. - Illustrate the flow with a Mermaid diagram. 4. **Data Structure** - Decide how to store and update state, whether in memory (for smaller applications) or a database (for larger or persistent needs). - Define data schemas or models that detail how information is stored, accessed, and updated. 5. **Implementation** - Start coding with a simple, direct approach (avoid over-engineering at first). - For each node in your flow: - **prep**: Determine how data is accessed or retrieved. - **exec**: Outline the actual processing or logic needed. - **post**: Handle any final updates or data persistence tasks. 6. **Optimization** - **Prompt Engineering**: Use clear and specific instructions with illustrative examples to reduce ambiguity. - **Task Decomposition**: Break large, complex tasks into manageable, logical steps. 7. 
**Reliability** - **Structured Output**: Verify outputs conform to the required format. Consider increasing `max_retries` if needed. - **Test Cases**: Develop clear, reproducible tests for each part of the flow. - **Self-Evaluation**: Introduce an additional Node (powered by LLMs) to review outputs when the results are uncertain. ================================================ File: docs/agent.md ================================================ --- layout: default title: "Agent" parent: "Paradigm" nav_order: 6 --- # Agent For many tasks, we need agents that take dynamic and recursive actions based on the inputs they receive. You can create these agents as **Nodes** connected by *Actions* in a directed graph using [Flow](./flow.md). ### Example: Search Agent This agent: 1. Decides whether to search or answer 2. If searches, loops back to decide if more search needed 3. Answers when enough context gathered ```python class DecideAction(Node): def prep(self, shared): context = shared.get("context", "No previous search") query = shared["query"] return query, context def exec(self, inputs): query, context = inputs prompt = f""" Given input: {query} Previous search results: {context} Should I: 1) Search web for more info 2) Answer with current knowledge Output in yaml: ```yaml action: search/answer reason: why this action search_term: search phrase if action is search ```""" resp = call_llm(prompt) yaml_str = resp.split("```yaml")[1].split("```")[0].strip() result = yaml.safe_load(yaml_str) assert isinstance(result, dict) assert "action" in result assert "reason" in result assert result["action"] in ["search", "answer"] if result["action"] == "search": assert "search_term" in result return result def post(self, shared, prep_res, exec_res): if exec_res["action"] == "search": shared["search_term"] = exec_res["search_term"] return exec_res["action"] class SearchWeb(Node): def prep(self, shared): return shared["search_term"] def exec(self, search_term): return search_web(search_term) def post(self, shared, prep_res, exec_res): prev_searches = shared.get("context", []) shared["context"] = prev_searches + [ {"term": shared["search_term"], "result": exec_res} ] return "decide" class DirectAnswer(Node): def prep(self, shared): return shared["query"], shared.get("context", "") def exec(self, inputs): query, context = inputs return call_llm(f"Context: {context}\nAnswer: {query}") def post(self, shared, prep_res, exec_res): print(f"Answer: {exec_res}") shared["answer"] = exec_res # Connect nodes decide = DecideAction() search = SearchWeb() answer = DirectAnswer() decide - "search" >> search decide - "answer" >> answer search - "decide" >> decide # Loop back flow = Flow(start=decide) flow.run({"query": "Who won the Nobel Prize in Physics 2024?"}) ``` ================================================ File: docs/async.md ================================================ --- layout: default title: "(Advanced) Async" parent: "Core Abstraction" nav_order: 5 --- # (Advanced) Async **Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for: 1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way. 2. **exec_async()**: Typically used for async LLM calls. 3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`. **Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes. 
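
For instance, a regular Node can feed an AsyncNode inside the same `AsyncFlow` (a minimal sketch; `CleanText` and `SummarizeAsync` are hypothetical node classes):

```python
clean = CleanText()           # a regular (sync) Node
summarize = SummarizeAsync()  # an AsyncNode

clean >> summarize            # default transition from the sync node to the async node

flow = AsyncFlow(start=clean) # the whole chain must be wrapped in AsyncFlow
```
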
### Example ```python class SummarizeThenVerify(AsyncNode): async def prep_async(self, shared): # Example: read a file asynchronously doc_text = await read_file_async(shared["doc_path"]) return doc_text async def exec_async(self, prep_res): # Example: async LLM call summary = await call_llm_async(f"Summarize: {prep_res}") return summary async def post_async(self, shared, prep_res, exec_res): # Example: wait for user feedback decision = await gather_user_feedback(exec_res) if decision == "approve": shared["summary"] = exec_res return "approve" return "deny" summarize_node = SummarizeThenVerify() final_node = Finalize() # Define transitions summarize_node - "approve" >> final_node summarize_node - "deny" >> summarize_node # retry flow = AsyncFlow(start=summarize_node) async def main(): shared = {"doc_path": "document.txt"} await flow.run_async(shared) print("Final Summary:", shared.get("summary")) asyncio.run(main()) ``` ================================================ File: docs/batch.md ================================================ --- layout: default title: "Batch" parent: "Core Abstraction" nav_order: 4 --- # Batch **Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases: - **Chunk-based** processing (e.g., splitting large texts). - **Iterative** processing over lists of input items (e.g., user queries, files, URLs). ## 1. BatchNode A **BatchNode** extends `Node` but changes `prep()` and `exec()`: - **`prep(shared)`**: returns an **iterable** (e.g., list, generator). - **`exec(item)`**: called **once** per item in that iterable. - **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**. ### Example: Summarize a Large File ```python class MapSummaries(BatchNode): def prep(self, shared): # Suppose we have a big file; chunk it content = shared["data"] chunk_size = 10000 chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] return chunks def exec(self, chunk): prompt = f"Summarize this chunk in 10 words: {chunk}" summary = call_llm(prompt) return summary def post(self, shared, prep_res, exec_res_list): combined = "\n".join(exec_res_list) shared["summary"] = combined return "default" map_summaries = MapSummaries() flow = Flow(start=map_summaries) flow.run(shared) ``` --- ## 2. BatchFlow A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set. ### Example: Summarize Many Files ```python class SummarizeAllFiles(BatchFlow): def prep(self, shared): # Return a list of param dicts (one per file) filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...] return [{"filename": fn} for fn in filenames] # Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce): summarize_file = SummarizeFile(start=load_file) # Wrap that flow into a BatchFlow: summarize_all_files = SummarizeAllFiles(start=summarize_file) summarize_all_files.run(shared) ``` ### Under the Hood 1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`. 2. The **BatchFlow** loops through each dict. For each one: - It merges the dict with the BatchFlow’s own `params`. - It calls `flow.run(shared)` using the merged result. 3. This means the sub-Flow is run **repeatedly**, once for every param dict. --- ## 3. 
Nested or Multi-Level Batches

You can nest a **BatchFlow** in another **BatchFlow**. For instance:
- **Outer** batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- **Inner** batch: returns a list of per-file param dicts.

At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.

```python
import os

class FileBatchFlow(BatchFlow):
    def prep(self, shared):
        directory = self.params["directory"]
        # e.g., files = ["file1.txt", "file2.txt", ...]
        files = [f for f in os.listdir(directory) if f.endswith(".txt")]
        return [{"filename": f} for f in files]

class DirectoryBatchFlow(BatchFlow):
    def prep(self, shared):
        directories = ["/path/to/dirA", "/path/to/dirB"]
        return [{"directory": d} for d in directories]

# MapSummaries has params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
```

================================================ File: docs/communication.md ================================================

---
layout: default
title: "Communication"
parent: "Core Abstraction"
nav_order: 3
---

# Communication

Nodes and Flows **communicate** in two ways:

1. **Shared Store (recommended)**
   - A global data structure (often an in-mem dict) that all nodes can read and write via `prep()` and `post()`.
   - Great for data results, large content, or anything multiple nodes need.
   - You should design the data structure and populate it ahead of time.

2. **Params (only for [Batch](./batch.md))**
   - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**.
   - Good for identifiers like filenames or numeric IDs in Batch mode.

If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller).

> **Best Practice:** Use `Shared Store` for almost all cases. It's flexible and easy to manage. It separates data storage from data processing, making the code more readable and easier to maintain.
>
> `Params` is more of a syntactic sugar for [Batch](./batch.md).
{: .note }

---

## 1. Shared Store

### Overview

A shared store is typically an in-mem dictionary, like:

```python
shared = {"data": {}, "summary": {}, "config": {...}, ...}
```

It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements.

### Example

```python
class LoadData(Node):
    def post(self, shared, prep_res, exec_res):
        # We write data to shared store
        shared["data"] = "Some text content"
        return None

class Summarize(Node):
    def prep(self, shared):
        # We read data from shared store
        return shared["data"]

    def exec(self, prep_res):
        # Call LLM to summarize
        prompt = f"Summarize: {prep_res}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        # We write summary to shared store
        shared["summary"] = exec_res
        return "default"

load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)

shared = {}
flow.run(shared)
```

Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`. --- ## 2. Params **Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are: - **Immutable** during a Node’s run cycle (i.e., they don’t change mid-`prep->exec->post`). - **Set** via `set_params()`. - **Cleared** and updated each time a parent Flow calls it. > Only set the uppermost Flow params because others will be overwritten by the parent Flow. > > If you need to set child node params, see [Batch](./batch.md). {: .warning } Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store. ### Example ```python # 1) Create a Node that uses params class SummarizeFile(Node): def prep(self, shared): # Access the node's param filename = self.params["filename"] return shared["data"].get(filename, "") def exec(self, prep_res): prompt = f"Summarize: {prep_res}" return call_llm(prompt) def post(self, shared, prep_res, exec_res): filename = self.params["filename"] shared["summary"][filename] = exec_res return "default" # 2) Set params node = SummarizeFile() # 3) Set Node params directly (for testing) node.set_params({"filename": "doc1.txt"}) node.run(shared) # 4) Create Flow flow = Flow(start=node) # 5) Set Flow params (overwrites node params) flow.set_params({"filename": "doc2.txt"}) flow.run(shared) # The node summarizes doc2, not doc1 ``` --- ================================================ File: docs/decomp.md ================================================ --- layout: default title: "Task Decomposition" parent: "Paradigm" nav_order: 2 --- # Task Decomposition Many real-world tasks are too complex for one LLM call. The solution is to decompose them into multiple calls as a [Flow](./flow.md) of Nodes. 
### Example: Article Writing ```python class GenerateOutline(Node): def prep(self, shared): return shared["topic"] def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}") def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res class WriteSection(Node): def prep(self, shared): return shared["outline"] def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}") def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res class ReviewAndRefine(Node): def prep(self, shared): return shared["draft"] def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}") def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res # Connect nodes outline = GenerateOutline() write = WriteSection() review = ReviewAndRefine() outline >> write >> review # Create and run flow writing_flow = Flow(start=outline) shared = {"topic": "AI Safety"} writing_flow.run(shared) ``` ================================================ File: docs/essay.md ================================================ --- layout: default title: "Essay" parent: "Apps" nav_order: 2 --- # Summarization + QA agent for Paul Graham Essay ```python from pocketflow import * import openai, os, yaml # Minimal LLM wrapper def call_llm(prompt): openai.api_key = "YOUR_API_KEY_HERE" r = openai.ChatCompletion.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}] ) return r.choices[0].message.content shared = {"data": {}, "summary": {}} # Load data into shared['data'] class LoadData(Node): def prep(self, shared): path = "./PocketFlow/data/PaulGrahamEssaysLarge" for fn in os.listdir(path): with open(os.path.join(path, fn), 'r') as f: shared['data'][fn] = f.read() def exec(self, res): pass def post(self, s, pr, er): pass LoadData().run(shared) # Summarize one file class SummarizeFile(Node): def prep(self, s): return s['data'][self.params['filename']] def exec(self, content): return call_llm(f"{content} Summarize in 10 words.") def post(self, s, pr, sr): s["summary"][self.params['filename']] = sr node_summ = SummarizeFile() node_summ.set_params({"filename":"addiction.txt"}) node_summ.run(shared) # Map-Reduce summarization class MapSummaries(BatchNode): def prep(self, s): text = s['data'][self.params['filename']] return [text[i:i+10000] for i in range(0, len(text), 10000)] def exec(self, chunk): return call_llm(f"{chunk} Summarize in 10 words.") def post(self, s, pr, er): s["summary"][self.params['filename']] = [f"{i}. 
{r}" for i,r in enumerate(er)] class ReduceSummaries(Node): def prep(self, s): return s["summary"][self.params['filename']] def exec(self, chunks): return call_llm(f"{chunks} Combine into 10 words summary.") def post(self, s, pr, sr): s["summary"][self.params['filename']] = sr map_summ = MapSummaries() reduce_summ = ReduceSummaries() map_summ >> reduce_summ flow = Flow(start=map_summ) flow.set_params({"filename":"before.txt"}) flow.run(shared) # Summarize all files class SummarizeAllFiles(BatchFlow): def prep(self, s): return [{"filename":fn} for fn in s['data']] SummarizeAllFiles(start=flow).run(shared) # QA agent class FindRelevantFile(Node): def prep(self, s): q = input("Enter a question: ") filenames = list(s['summary'].keys()) file_summaries = [f"- '{fn}': {s['summary'][fn]}" for fn in filenames] return q, filenames, file_summaries def exec(self, p): q, filenames, file_summaries = p if not q: return {"think":"no question", "has_relevant":False} resp = call_llm(f""" Question: {q} Find the most relevant file from: {file_summaries} If none, explain why Output in code fence: ```yaml think: > reasoning about relevance has_relevant: true/false most_relevant: filename if relevant ```""") yaml_str = resp.split("```yaml")[1].split("```")[0].strip() result = yaml.safe_load(yaml_str) # Validate response assert isinstance(result, dict) assert "think" in result assert "has_relevant" in result assert isinstance(result["has_relevant"], bool) if result["has_relevant"]: assert "most_relevant" in result assert result["most_relevant"] in filenames return result def exec_fallback(self, p, exc): return {"think":"error","has_relevant":False} def post(self, s, pr, res): q, _ = pr if not q: print("No question asked"); return "end" if res["has_relevant"]: s["question"], s["relevant_file"] = q, res["most_relevant"] print("Relevant file:", res["most_relevant"]) return "answer" else: print("No relevant file:", res["think"]) return "retry" class AnswerQuestion(Node): def prep(self, s): return s['question'], s['data'][s['relevant_file']] def exec(self, p): q, txt = p return call_llm(f"Question: {q}\nText: {txt}\nAnswer in 50 words.") def post(self, s, pr, ex): print("Answer:", ex) class NoOp(Node): pass frf = FindRelevantFile(max_retries=3) aq = AnswerQuestion() noop = NoOp() frf - "answer" >> aq >> frf frf - "retry" >> frf frf - "end" >> noop qa = Flow(start=frf) qa.run(shared) ``` ================================================ File: docs/flow.md ================================================ --- layout: default title: "Flow" parent: "Core Abstraction" nav_order: 2 --- # Flow A **Flow** orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`. ## 1. Action-based Transitions Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`. You define transitions with the syntax: 1. **Basic default transition**: `node_a >> node_b` This means if `node_a.post()` returns `"default"`, go to `node_b`. (Equivalent to `node_a - "default" >> node_b`) 2. **Named action transition**: `node_a - "action_name" >> node_b` This means if `node_a.post()` returns `"action_name"`, go to `node_b`. It's possible to create loops, branching, or multi-step flows. ## 2. Creating a Flow A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. 
When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node. ### Example: Simple Sequence Here's a minimal flow of two nodes in a chain: ```python node_a >> node_b flow = Flow(start=node_a) flow.run(shared) ``` - When you run the flow, it executes `node_a`. - Suppose `node_a.post()` returns `"default"`. - The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`. - `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there. ### Example: Branching & Looping Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions: - `"approved"`: expense is approved, move to payment processing - `"needs_revision"`: expense needs changes, send back for revision - `"rejected"`: expense is denied, finish the process We can wire them like this: ```python # Define the flow connections review - "approved" >> payment # If approved, process payment review - "needs_revision" >> revise # If needs changes, go to revision review - "rejected" >> finish # If rejected, finish the process revise >> review # After revision, go back for another review payment >> finish # After payment, finish the process flow = Flow(start=review) ``` Let's see how it flows: 1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node 2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review` 3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops ```mermaid flowchart TD review[Review Expense] -->|approved| payment[Process Payment] review -->|needs_revision| revise[Revise Report] review -->|rejected| finish[Finish Process] revise --> review payment --> finish ``` ### Running Individual Nodes vs. Running a Flow - `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action. - `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue. > `node.run(shared)` **does not** proceed to the successor. > This is mainly for debugging or testing a single node. > > Always use `flow.run(...)` in production to ensure the full pipeline runs correctly. {: .warning } ## 3. Nested Flows A **Flow** can act like a Node, which enables powerful composition patterns. This means you can: 1. Use a Flow as a Node within another Flow's transitions. 2. Combine multiple smaller Flows into a larger Flow for reuse. 3. Node `params` will be a merging of **all** parents' `params`. ### Flow's Node Methods A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However: - It **won't** run `exec()`, as its main logic is to orchestrate its nodes. - `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store. ### Basic Flow Nesting Here's how to connect a flow to another node: ```python # Create a sub-flow node_a >> node_b subflow = Flow(start=node_a) # Connect it to another node subflow >> node_c # Create the parent flow parent_flow = Flow(start=subflow) ``` When `parent_flow.run()` executes: 1. It starts `subflow` 2. `subflow` runs through its nodes (`node_a->node_b`) 3. 
After `subflow` completes, execution continues to `node_c` ### Example: Order Processing Pipeline Here's a practical example that breaks down order processing into nested flows: ```python # Payment processing sub-flow validate_payment >> process_payment >> payment_confirmation payment_flow = Flow(start=validate_payment) # Inventory sub-flow check_stock >> reserve_items >> update_inventory inventory_flow = Flow(start=check_stock) # Shipping sub-flow create_label >> assign_carrier >> schedule_pickup shipping_flow = Flow(start=create_label) # Connect the flows into a main order pipeline payment_flow >> inventory_flow >> shipping_flow # Create the master flow order_pipeline = Flow(start=payment_flow) # Run the entire pipeline order_pipeline.run(shared_data) ``` This creates a clean separation of concerns while maintaining a clear execution path: ```mermaid flowchart LR subgraph order_pipeline[Order Pipeline] subgraph paymentFlow["Payment Flow"] A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation] end subgraph inventoryFlow["Inventory Flow"] D[Check Stock] --> E[Reserve Items] --> F[Update Inventory] end subgraph shippingFlow["Shipping Flow"] G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup] end paymentFlow --> inventoryFlow inventoryFlow --> shippingFlow end ``` ================================================ File: docs/index.md ================================================ --- layout: default title: "Home" nav_order: 1 --- # Pocket Flow A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*. We model the LLM workflow as a **Nested Directed Graph**: - **Nodes** handle simple (LLM) tasks. - Nodes connect through **Actions** (labeled edges) for *Agents*. - **Flows** orchestrate a directed graph of Nodes for *Task Decomposition*. - A Flow can be used as a Node (for **Nesting**). - **Batch** Nodes/Flows for data-intensive tasks. - **Async** Nodes/Flows allow waits or **Parallel** execution
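
To give a taste of the API, here is a minimal sketch of the wiring (the node classes are hypothetical; see [Flow](./flow.md) for the syntax):

```python
decide = DecideNode()        # hypothetical Node subclasses
search = SearchNode()
answer = AnswerNode()

decide - "search" >> search  # labeled Action edges for branching
decide - "answer" >> answer
search >> decide             # loop back on the default Action

flow = Flow(start=decide)    # a Flow orchestrates the directed graph
flow.run({"query": "example question"})  # nodes communicate via the shared dict
```
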
> Have questions? Chat with [AI Assistant](https://chatgpt.com/g/g-677464af36588191b9eba4901946557b-mini-llm-flow-assistant)
{: .note }

## Core Abstraction

- [Node](./node.md)
- [Flow](./flow.md)
- [Communication](./communication.md)
- [Batch](./batch.md)
- [(Advanced) Async](./async.md)
- [(Advanced) Parallel](./parallel.md)

## Low-Level Details

- [LLM Wrapper](./llm.md)
- [Tool](./tool.md)
- [Viz and Debug](./viz.md)
- Chunking

> We do not provide built-in implementations.
>
> Example implementations are provided as reference.
{: .warning }

## High-Level Paradigm

- [Structured Output](./structure.md)
- [Task Decomposition](./decomp.md)
- [Map Reduce](./mapreduce.md)
- [RAG](./rag.md)
- [Chat Memory](./memory.md)
- [Agent](./agent.md)
- [(Advanced) Multi-Agents](./multi_agent.md)
- Evaluation

## Example LLM Apps

- [LLM System Design Guidance](./guide.md)
- [Summarization + QA agent for Paul Graham Essay](./essay.md)
- More coming soon...

================================================ File: docs/llm.md ================================================

---
layout: default
title: "LLM Wrapper"
parent: "Details"
nav_order: 1
---

# LLM Wrappers

We **don't** provide built-in LLM wrappers. Instead, please implement your own, for example by asking an assistant like ChatGPT or Claude. If you ask ChatGPT to "implement a `call_llm` function that takes a prompt and returns the LLM response," you should get something like:

```python
def call_llm(prompt):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content

# Example usage
call_llm("How are you?")
```

> Store the API key in an environment variable like OPENAI_API_KEY for security.
{: .note }

## Improvements

Feel free to enhance your `call_llm` function as needed. Here are examples:

- Handle chat history:

```python
def call_llm(messages):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return r.choices[0].message.content
```

- Add in-memory caching:

```python
from functools import lru_cache

@lru_cache(maxsize=1000)
def call_llm(prompt):
    # Your implementation here
    pass
```

> ⚠️ Caching conflicts with Node retries, as retries yield the same result.
>
> To address this, you could use cached results only if not retried.
{: .warning }

```python
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_call(prompt):
    pass

def call_llm(prompt, use_cache):
    if use_cache:
        return cached_call(prompt)
    # Call the underlying function directly
    return cached_call.__wrapped__(prompt)

class SummarizeNode(Node):
    def exec(self, text):
        return call_llm(f"Summarize: {text}", self.cur_retry==0)
```

- Enable logging:

```python
def call_llm(prompt):
    import logging
    logging.info(f"Prompt: {prompt}")
    response = ... # Your implementation here
    logging.info(f"Response: {response}")
    return response
```

## Why Not Provide Built-in LLM Wrappers?

I believe it is a **bad practice** to provide LLM-specific implementations in a general framework:

- **LLM APIs change frequently**. Hardcoding them makes maintenance a nightmare.
- You may need **flexibility** to switch vendors, use fine-tuned models, or deploy local LLMs.
- You may need **optimizations** like prompt caching, request batching, or response streaming.
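
As an illustration of the last point, here is a minimal sketch of a streaming variant (assuming the OpenAI Python SDK; adapt it to your provider):

```python
def call_llm_stream(prompt):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    chunks = []
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)  # print tokens as they arrive
            chunks.append(delta)
    print()
    return "".join(chunks)
```
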
================================================ File: docs/mapreduce.md ================================================ --- layout: default title: "Map Reduce" parent: "Paradigm" nav_order: 3 --- # Map Reduce Process large inputs by splitting them into chunks using [BatchNode](./batch.md), then combining results. ### Example: Document Summarization ```python class MapSummaries(BatchNode): def prep(self, shared): return [shared["text"][i:i+10000] for i in range(0, len(shared["text"]), 10000)] def exec(self, chunk): return call_llm(f"Summarize this chunk: {chunk}") def post(self, shared, prep_res, exec_res_list): shared["summaries"] = exec_res_list class ReduceSummaries(Node): def prep(self, shared): return shared["summaries"] def exec(self, summaries): return call_llm(f"Combine these summaries: {summaries}") def post(self, shared, prep_res, exec_res): shared["final_summary"] = exec_res # Connect nodes map_node = MapSummaries() reduce_node = ReduceSummaries() map_node >> reduce_node # Create flow summarize_flow = Flow(start=map_node) summarize_flow.run(shared) ``` ================================================ File: docs/memory.md ================================================ --- layout: default title: "Chat Memory" parent: "Paradigm" nav_order: 5 --- # Chat Memory Multi-turn conversations require memory management to maintain context while avoiding overwhelming the LLM. ### 1. Naive Approach: Full History Sending the full chat history may overwhelm LLMs. ```python class ChatNode(Node): def prep(self, shared): if "history" not in shared: shared["history"] = [] user_input = input("You: ") return shared["history"], user_input def exec(self, inputs): history, user_input = inputs messages = [{"role": "system", "content": "You are a helpful assistant"}] for h in history: messages.append(h) messages.append({"role": "user", "content": user_input}) response = call_llm(messages) return response def post(self, shared, prep_res, exec_res): shared["history"].append({"role": "user", "content": prep_res[1]}) shared["history"].append({"role": "assistant", "content": exec_res}) return "continue" chat = ChatNode() chat - "continue" >> chat flow = Flow(start=chat) ``` ### 2. Improved Memory Management We can: 1. Limit the chat history to the most recent 4. 2. Use [vector search](./tool.md) to retrieve relevant exchanges beyond the last 4. 
```python class ChatWithMemory(Node): def prep(self, s): # Initialize shared dict s.setdefault("history", []) s.setdefault("memory_index", None) user_input = input("You: ") # Retrieve relevant past if we have enough history and an index relevant = [] if len(s["history"]) > 8 and s["memory_index"]: idx, _ = search_index(s["memory_index"], get_embedding(user_input), top_k=2) relevant = [s["history"][i[0]] for i in idx] return {"user_input": user_input, "recent": s["history"][-8:], "relevant": relevant} def exec(self, c): messages = [{"role": "system", "content": "You are a helpful assistant."}] # Include relevant history if any if c["relevant"]: messages.append({"role": "system", "content": f"Relevant: {c['relevant']}"}) # Add recent history and the current user input messages += c["recent"] + [{"role": "user", "content": c["user_input"]}] return call_llm(messages) def post(self, s, pre, ans): # Update chat history s["history"] += [ {"role": "user", "content": pre["user_input"]}, {"role": "assistant", "content": ans} ] # When first reaching 8 messages, create index if len(s["history"]) == 8: embeddings = [] for i in range(0, 8, 2): e = s["history"][i]["content"] + " " + s["history"][i+1]["content"] embeddings.append(get_embedding(e)) s["memory_index"] = create_index(embeddings) # Embed older exchanges once we exceed 8 messages elif len(s["history"]) > 8: pair = s["history"][-10:-8] embedding = get_embedding(pair[0]["content"] + " " + pair[1]["content"]) s["memory_index"].add(np.array([embedding]).astype('float32')) print(f"Assistant: {ans}") return "continue" chat = ChatWithMemory() chat - "continue" >> chat flow = Flow(start=chat) flow.run({}) ``` ================================================ File: docs/multi_agent.md ================================================ --- layout: default title: "(Advanced) Multi-Agents" parent: "Paradigm" nav_order: 7 --- # (Advanced) Multi-Agents Multiple [Agents](./flow.md) can work together by handling subtasks and communicating the progress. Communication between agents is typically implemented using message queues in shared storage. ### Example Agent Communication: Message Queue Here's a simple example showing how to implement agent communication using `asyncio.Queue`. 
The agent listens for messages, processes them, and continues listening: ```python class AgentNode(AsyncNode): async def prep_async(self, _): message_queue = self.params["messages"] message = await message_queue.get() print(f"Agent received: {message}") return message # Create node and flow agent = AgentNode() agent >> agent # connect to self flow = AsyncFlow(start=agent) # Create heartbeat sender async def send_system_messages(message_queue): counter = 0 messages = [ "System status: all systems operational", "Memory usage: normal", "Network connectivity: stable", "Processing load: optimal" ] while True: message = f"{messages[counter % len(messages)]} | timestamp_{counter}" await message_queue.put(message) counter += 1 await asyncio.sleep(1) async def main(): message_queue = asyncio.Queue() shared = {} flow.set_params({"messages": message_queue}) # Run both coroutines await asyncio.gather( flow.run_async(shared), send_system_messages(message_queue) ) asyncio.run(main()) ``` The output: ``` Agent received: System status: all systems operational | timestamp_0 Agent received: Memory usage: normal | timestamp_1 Agent received: Network connectivity: stable | timestamp_2 Agent received: Processing load: optimal | timestamp_3 ``` ### Interactive Multi-Agent Example: Taboo Game Here's a more complex example where two agents play the word-guessing game Taboo. One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word: ```python class AsyncHinter(AsyncNode): async def prep_async(self, shared): guess = await shared["hinter_queue"].get() if guess == "GAME_OVER": return None return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", []) async def exec_async(self, inputs): if inputs is None: return None target, forbidden, past_guesses = inputs prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}" if past_guesses: prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific." prompt += "\nUse at most 5 words." hint = call_llm(prompt) print(f"\nHinter: Here's your hint - {hint}") return hint async def post_async(self, shared, prep_res, exec_res): if exec_res is None: return "end" await shared["guesser_queue"].put(exec_res) return "continue" class AsyncGuesser(AsyncNode): async def prep_async(self, shared): hint = await shared["guesser_queue"].get() return hint, shared.get("past_guesses", []) async def exec_async(self, inputs): hint, past_guesses = inputs prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. 
Directly reply a single word:" guess = call_llm(prompt) print(f"Guesser: I guess it's - {guess}") return guess async def post_async(self, shared, prep_res, exec_res): if exec_res.lower() == shared["target_word"].lower(): print("Game Over - Correct guess!") await shared["hinter_queue"].put("GAME_OVER") return "end" if "past_guesses" not in shared: shared["past_guesses"] = [] shared["past_guesses"].append(exec_res) await shared["hinter_queue"].put(exec_res) return "continue" async def main(): # Set up game shared = { "target_word": "nostalgia", "forbidden_words": ["memory", "past", "remember", "feeling", "longing"], "hinter_queue": asyncio.Queue(), "guesser_queue": asyncio.Queue() } print("Game starting!") print(f"Target word: {shared['target_word']}") print(f"Forbidden words: {shared['forbidden_words']}") # Initialize by sending empty guess to hinter await shared["hinter_queue"].put("") # Create nodes and flows hinter = AsyncHinter() guesser = AsyncGuesser() # Set up flows hinter_flow = AsyncFlow(start=hinter) guesser_flow = AsyncFlow(start=guesser) # Connect nodes to themselves hinter - "continue" >> hinter guesser - "continue" >> guesser # Run both agents concurrently await asyncio.gather( hinter_flow.run_async(shared), guesser_flow.run_async(shared) ) asyncio.run(main()) ``` The Output: ``` Game starting! Target word: nostalgia Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing'] Hinter: Here's your hint - Thinking of childhood summer days Guesser: I guess it's - popsicle Hinter: Here's your hint - When childhood cartoons make you emotional Guesser: I guess it's - nostalgic Hinter: Here's your hint - When old songs move you Guesser: I guess it's - memories Hinter: Here's your hint - That warm emotion about childhood Guesser: I guess it's - nostalgia Game Over - Correct guess! ``` ================================================ File: docs/node.md ================================================ --- layout: default title: "Node" parent: "Core Abstraction" nav_order: 1 --- # Node A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`: 1. `prep(shared)` - **Read and preprocess data** from `shared` store. - Examples: *query DB, read files, or serialize data into a string*. - Return `prep_res`, which is used by `exec()` and `post()`. 2. `exec(prep_res)` - **Execute compute logic**, with optional retries and error handling (below). - Examples: *(mostly) LLM calls, remote APIs, tool use*. - ⚠️ This shall be only for compute and **NOT** access `shared`. - ⚠️ If retries enabled, ensure idempotent implementation. - Return `exec_res`, which is passed to `post()`. 3. `post(shared, prep_res, exec_res)` - **Postprocess and write data** back to `shared`. - Examples: *update DB, change states, log results*. - **Decide the next action** by returning a *string* (`action = "default"` if *None*). > **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately. > > All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data. {: .note } ### Fault Tolerance & Retries You can **retry** `exec()` if it raises an exception via two parameters when define the Node: - `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry). - `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting). 
`wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off. ```python my_node = SummarizeFile(max_retries=3, wait=10) ``` When an exception occurs in `exec()`, the Node automatically retries until: - It either succeeds, or - The Node has retried `max_retries - 1` times already and fails on the last attempt. You can get the current retry times (0-based) from `self.cur_retry`. ```python class RetryNode(Node): def exec(self, prep_res): print(f"Retry {self.cur_retry} times") raise Exception("Failed") ``` ### Graceful Fallback To **gracefully handle** the exception (after all retries) rather than raising it, override: ```python def exec_fallback(self, shared, prep_res, exc): raise exc ``` By default, it just re-raises exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`. ### Example: Summarize file ```python class SummarizeFile(Node): def prep(self, shared): return shared["data"] def exec(self, prep_res): if not prep_res: return "Empty file content" prompt = f"Summarize this text in 10 words: {prep_res}" summary = call_llm(prompt) # might fail return summary def exec_fallback(self, shared, prep_res, exc): # Provide a simple fallback instead of crashing return "There was an error processing your request." def post(self, shared, prep_res, exec_res): shared["summary"] = exec_res # Return "default" by not returning summarize_node = SummarizeFile(max_retries=3) # node.run() calls prep->exec->post # If exec() fails, it retries up to 3 times before calling exec_fallback() action_result = summarize_node.run(shared) print("Action returned:", action_result) # "default" print("Summary stored:", shared["summary"]) ``` ================================================ File: docs/paradigm.md ================================================ --- layout: default title: "Paradigm" nav_order: 4 has_children: true --- ================================================ File: docs/parallel.md ================================================ --- layout: default title: "(Advanced) Parallel" parent: "Core Abstraction" nav_order: 6 --- # (Advanced) Parallel **Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute. > Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. {: .warning } ## AsyncParallelBatchNode Like **AsyncBatchNode**, but run `exec_async()` in **parallel**: ```python class ParallelSummaries(AsyncParallelBatchNode): async def prep_async(self, shared): # e.g., multiple texts return shared["texts"] async def exec_async(self, text): prompt = f"Summarize: {text}" return await call_llm_async(prompt) async def post_async(self, shared, prep_res, exec_res_list): shared["summary"] = "\n\n".join(exec_res_list) return "default" node = ParallelSummaries() flow = AsyncFlow(start=node) ``` ## AsyncParallelBatchFlow Parallel version of **BatchFlow**. 
Each iteration of the sub-flow runs **concurrently** using different parameters: ```python class SummarizeMultipleFiles(AsyncParallelBatchFlow): async def prep_async(self, shared): return [{"filename": f} for f in shared["files"]] sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) parallel_flow = SummarizeMultipleFiles(start=sub_flow) await parallel_flow.run_async(shared) ``` ## Best Practices - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize. - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals). - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. ================================================ File: docs/preparation.md ================================================ --- layout: default title: "Details" nav_order: 3 has_children: true --- ================================================ File: docs/rag.md ================================================ --- layout: default title: "RAG" parent: "Paradigm" nav_order: 4 --- # RAG (Retrieval Augmented Generation) For certain LLM tasks like answering questions, providing context is essential. Use [vector search](./tool.md) to find relevant context for LLM responses. ### Example: Question Answering ```python class PrepareEmbeddings(Node): def prep(self, shared): texts = shared["texts"] embeddings = [get_embedding(text) for text in texts] shared["search_index"] = create_index(embeddings) class AnswerQuestion(Node): def prep(self, shared): question = input("Enter question: ") query_embedding = get_embedding(question) indices, _ = search_index(shared["search_index"], query_embedding, top_k=1) relevant_text = shared["texts"][indices[0][0]] return question, relevant_text def exec(self, inputs): question, context = inputs prompt = f"Question: {question}\nContext: {context}\nAnswer: " return call_llm(prompt) def post(self, shared, prep_res, exec_res): print(f"Answer: {exec_res}") # Connect nodes prep = PrepareEmbeddings() qa = AnswerQuestion() prep >> qa # Create flow qa_flow = Flow(start=prep) qa_flow.run(shared) ``` ================================================ File: docs/structure.md ================================================ --- layout: default title: "Structured Output" parent: "Paradigm" nav_order: 1 --- # Structured Output In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys. There are several approaches to achieve a structured output: - **Prompting** the LLM to strictly return a defined structure. - Using LLMs that natively support **schema enforcement**. - **Post-processing** the LLM's response to extract structured content. In practice, **Prompting** is simple and reliable for modern LLMs. ### Example Use Cases - Extracting Key Information ```yaml product: name: Widget Pro price: 199.99 description: | A high-quality widget designed for professionals. Recommended for advanced users. ``` - Summarizing Documents into Bullet Points ```yaml summary: - This product is easy to use. - It is cost-effective. - Suitable for all skill levels. 
``` - Generating Configuration Files ```yaml server: host: 127.0.0.1 port: 8080 ssl: true ``` ## Prompt Engineering When prompting the LLM to produce **structured** output: 1. **Wrap** the structure in code fences (e.g., `yaml`). 2. **Validate** that all required fields exist (and let `Node` handles retry). ### Example Text Summarization ```python class SummarizeNode(Node): def exec(self, prep_res): # Suppose `prep_res` is the text to summarize. prompt = f""" Please summarize the following text as YAML, with exactly 3 bullet points {prep_res} Now, output: ```yaml summary: - bullet 1 - bullet 2 - bullet 3 ```""" response = call_llm(prompt) yaml_str = response.split("```yaml")[1].split("```")[0].strip() import yaml structured_result = yaml.safe_load(yaml_str) assert "summary" in structured_result assert isinstance(structured_result["summary"], list) return structured_result ``` ### Why YAML instead of JSON? Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes. **In JSON** ```json { "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\"" } ``` - Every double quote inside the string must be escaped with `\"`. - Each newline in the dialogue must be represented as `\n`. **In YAML** ```yaml dialogue: | Alice said: "Hello Bob. How are you? I am good." ``` - No need to escape interior quotes—just place the entire text under a block literal (`|`). - Newlines are naturally preserved without needing `\n`. ================================================ File: docs/tool.md ================================================ --- layout: default title: "Tool" parent: "Details" nav_order: 2 --- # Tool Similar to LLM wrappers, we **don't** provide built-in tools. Here, we recommend some *minimal* (and incomplete) implementations of commonly used tools. These examples can serve as a starting point for your own tooling. --- ## 1. Embedding Calls ```python def get_embedding(text): from openai import OpenAI client = OpenAI(api_key="YOUR_API_KEY_HERE") r = client.embeddings.create( model="text-embedding-ada-002", input=text ) return r.data[0].embedding get_embedding("What's the meaning of life?") ``` --- ## 2. Vector Database (Faiss) ```python import faiss import numpy as np def create_index(embeddings): dim = len(embeddings[0]) index = faiss.IndexFlatL2(dim) index.add(np.array(embeddings).astype('float32')) return index def search_index(index, query_embedding, top_k=5): D, I = index.search( np.array([query_embedding]).astype('float32'), top_k ) return I, D index = create_index(embeddings) search_index(index, query_embedding) ``` --- ## 3. Local Database ```python import sqlite3 def execute_sql(query): conn = sqlite3.connect("mydb.db") cursor = conn.cursor() cursor.execute(query) result = cursor.fetchall() conn.commit() conn.close() return result ``` > ⚠️ Beware of SQL injection risk {: .warning } --- ## 4. Python Function Execution ```python def run_code(code_str): env = {} exec(code_str, env) return env run_code("print('Hello, world!')") ``` > ⚠️ exec() is dangerous with untrusted input {: .warning } --- ## 5. PDF Extraction If your PDFs are text-based, use PyMuPDF: ```python import fitz # PyMuPDF def extract_text(pdf_path): doc = fitz.open(pdf_path) text = "" for page in doc: text += page.get_text() doc.close() return text extract_text("document.pdf") ``` For image-based PDFs (e.g., scanned), OCR is needed. 
An easy and fast option is to use an LLM with vision capabilities:

```python
from openai import OpenAI
import base64
import fitz  # PyMuPDF, used below to render a PDF page to an image

def call_llm_vision(prompt, image_data):
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    img_base64 = base64.b64encode(image_data).decode('utf-8')

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {"type": "image_url",
                 "image_url": {"url": f"data:image/png;base64,{img_base64}"}}
            ]
        }]
    )

    return response.choices[0].message.content

pdf_document = fitz.open("document.pdf")
page_num = 0
page = pdf_document[page_num]
pix = page.get_pixmap()
img_data = pix.tobytes("png")

call_llm_vision("Extract text from this image", img_data)
```

---

## 6. Web Crawling

```python
def crawl_web(url):
    import requests
    from bs4 import BeautifulSoup
    html = requests.get(url).text
    soup = BeautifulSoup(html, "html.parser")
    return soup.title.string, soup.get_text()
```

---

## 7. Basic Search (SerpAPI example)

```python
def search_google(query):
    import requests
    params = {
        "engine": "google",
        "q": query,
        "api_key": "YOUR_API_KEY"
    }
    r = requests.get("https://serpapi.com/search", params=params)
    return r.json()
```

---

## 8. Audio Transcription (OpenAI Whisper)

```python
def transcribe_audio(file_path):
    import openai
    audio_file = open(file_path, "rb")
    transcript = openai.Audio.transcribe("whisper-1", audio_file)
    return transcript["text"]
```

---

## 9. Text-to-Speech (TTS)

```python
def text_to_speech(text):
    import pyttsx3
    engine = pyttsx3.init()
    engine.say(text)
    engine.runAndWait()
```

---

## 10. Sending Email

```python
def send_email(to_address, subject, body, from_address, password):
    import smtplib
    from email.mime.text import MIMEText

    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = from_address
    msg["To"] = to_address

    with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
        server.login(from_address, password)
        server.sendmail(from_address, [to_address], msg.as_string())
```

================================================ File: docs/viz.md ================================================

---
layout: default
title: "Viz and Debug"
parent: "Details"
nav_order: 3
---

# Visualization and Debugging

Similar to LLM wrappers, we **don't** provide built-in visualization and debugging. Here, we recommend some *minimal* (and incomplete) implementations. These examples can serve as a starting point for your own tooling.

## 1. Visualization with Mermaid

This code recursively traverses the nested graph, assigns unique IDs to each node, and treats Flow nodes as subgraphs to generate Mermaid syntax for a hierarchical visualization.
{% raw %} ```python def build_mermaid(start): ids, visited, lines = {}, set(), ["graph LR"] ctr = 1 def get_id(n): nonlocal ctr return ids[n] if n in ids else (ids.setdefault(n, f"N{ctr}"), (ctr := ctr + 1))[0] def link(a, b): lines.append(f" {a} --> {b}") def walk(node, parent=None): if node in visited: return parent and link(parent, get_id(node)) visited.add(node) if isinstance(node, Flow): node.start and parent and link(parent, get_id(node.start)) lines.append(f"\n subgraph sub_flow_{get_id(node)}[{type(node).__name__}]") node.start and walk(node.start) for nxt in node.successors.values(): node.start and walk(nxt, get_id(node.start)) or (parent and link(parent, get_id(nxt))) or walk(nxt) lines.append(" end\n") else: lines.append(f" {(nid := get_id(node))}['{type(node).__name__}']") parent and link(parent, nid) [walk(nxt, nid) for nxt in node.successors.values()] walk(start) return "\n".join(lines) ``` {% endraw %} For example, suppose we have a complex Flow for data science: ```python class DataPrepBatchNode(BatchNode): def prep(self,shared): return [] class ValidateDataNode(Node): pass class FeatureExtractionNode(Node): pass class TrainModelNode(Node): pass class EvaluateModelNode(Node): pass class ModelFlow(Flow): pass class DataScienceFlow(Flow):pass feature_node = FeatureExtractionNode() train_node = TrainModelNode() evaluate_node = EvaluateModelNode() feature_node >> train_node >> evaluate_node model_flow = ModelFlow(start=feature_node) data_prep_node = DataPrepBatchNode() validate_node = ValidateDataNode() data_prep_node >> validate_node >> model_flow data_science_flow = DataScienceFlow(start=data_prep_node) result = build_mermaid(start=data_science_flow) ``` The code generates a Mermaid diagram: ```mermaid graph LR subgraph sub_flow_N1[DataScienceFlow] N2['DataPrepBatchNode'] N3['ValidateDataNode'] N2 --> N3 N3 --> N4 subgraph sub_flow_N5[ModelFlow] N4['FeatureExtractionNode'] N6['TrainModelNode'] N4 --> N6 N7['EvaluateModelNode'] N6 --> N7 end end ``` ## 2. Call Stack Debugging It would be useful to print the Node call stacks for debugging. This can be achieved by inspecting the runtime call stack: ```python import inspect def get_node_call_stack(): stack = inspect.stack() node_names = [] seen_ids = set() for frame_info in stack[1:]: local_vars = frame_info.frame.f_locals if 'self' in local_vars: caller_self = local_vars['self'] if isinstance(caller_self, BaseNode) and id(caller_self) not in seen_ids: seen_ids.add(id(caller_self)) node_names.append(type(caller_self).__name__) return node_names ``` For example, suppose we have a complex Flow for data science: ```python class DataPrepBatchNode(BatchNode): def prep(self, shared): return [] class ValidateDataNode(Node): pass class FeatureExtractionNode(Node): pass class TrainModelNode(Node): pass class EvaluateModelNode(Node): def prep(self, shared): stack = get_node_call_stack() print("Call stack:", stack) class ModelFlow(Flow): pass class DataScienceFlow(Flow):pass feature_node = FeatureExtractionNode() train_node = TrainModelNode() evaluate_node = EvaluateModelNode() feature_node >> train_node >> evaluate_node model_flow = ModelFlow(start=feature_node) data_prep_node = DataPrepBatchNode() validate_node = ValidateDataNode() data_prep_node >> validate_node >> model_flow data_science_flow = DataScienceFlow(start=data_prep_node) data_science_flow.run({}) ``` The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']`