{ "cells": [ { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "No relevant file found: the question has no relevant file because while some files discuss startups, none specifically address how to find or generate startup ideas\n", "No question asked\n" ] }, { "data": { "text/plain": [ "'default'" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Example App for text summarization & QA using minillmflow\n", "from minillmflow import Node, BatchNode, Flow, BatchFlow, AsyncNode, AsyncFlow, BatchAsyncFlow\n", "import os\n", "\n", "# 1) Implement a simple LLM helper (OpenAI in this example).\n", "def call_LLM(prompt):\n", " # Users must set an OpenAI API key; can also load from env var, etc.\n", " openai.api_key = \"YOUR_API_KEY_HERE\"\n", " r = openai.ChatCompletion.create(\n", " model=\"gpt-4\",\n", " messages=[{\"role\": \"user\", \"content\": prompt}]\n", " )\n", " return r.choices[0].message.content\n", "\n", "# 2) Create a shared store (dict) for Node/Flow data exchange.\n", "# This can be replaced with a DB or other storage.\n", "# Design the structure / schema based on the app requirements.\n", "shared = {\"data\": {}, \"summary\": {}}\n", "\n", "# 3) Create a Node that loads data from disk into shared['data'].\n", "class LoadData(Node):\n", " # For compute-intensive operations, do them in prep().\n", " def prep(self, shared):\n", " path = \"../data/PaulGrahamEssaysLarge\"\n", " for filename in os.listdir(path):\n", " with open(os.path.join(path, filename), 'r') as f:\n", " shared['data'][filename] = f.read()\n", " # If LLM was needed, we'd handle it in exec(). Not needed here.\n", " # (idempotent so it can be retried if needed)\n", " def exec(self,shared,prep_res): pass \n", " # post() can update shared again or decide the next node (by return the action).\n", " def post(self,shared,prep_res,exec_res): pass \n", "\n", "load_data = LoadData()\n", "# Run the data-loading node once\n", "load_data.run(shared)\n", "\n", "# 4) Create a Node that summarizes a single file using the LLM.\n", "class SummarizeFile(Node):\n", " def prep(self, shared):\n", " # Use self.params (which must remain immutable during prep/exec/post).\n", " # Typically, we only store identifying info in params (e.g., filename).\n", " content = shared['data'][self.params['filename']]\n", " return content\n", " def exec(self, shared, prep_res):\n", " content = prep_res\n", " prompt = f\"{content} Respond a summary of above in 10 words\"\n", " summary = call_llm(prompt)\n", " return summary\n", " def post(self, shared, prep_res, exec_res):\n", " shared[\"summary\"][self.params['filename']] = exec_res\n", "\n", "summarize_file = SummarizeFile()\n", "# For testing, we set params directly on the node.\n", "# In real usage, you'd set them in a Flow or BatchFlow.\n", "summarize_file.set_params({\"filename\":\"addiction.txt\"})\n", "summarize_file.run(shared)\n", "\n", "# 5) If data is large, we can apply a map-reduce pattern:\n", "# - MapSummaries(BatchNode) => chunk the file and summarize each chunk\n", "# - ReduceSummaries(Node) => combine those chunk-level summaries\n", "class MapSummaries(BatchNode):\n", " def prep(self, shared):\n", " content = shared['data'][self.params['filename']]\n", " chunk_size = 10000\n", " chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]\n", " # Must return an iterable (list or generator) for a BatchNode.\n", " return chunks\n", " def exec(self, 
shared, prep_res):\n", " # Each iteration of prep_res corresponds to a single chunk.\n", " chunk = prep_res\n", " prompt = f\"{chunk} Respond a summary of above in 10 words\"\n", " summary = call_llm(prompt)\n", " return summary\n", " def post(self, shared, prep_res, exec_res):\n", " # exec_res is a list of exec() results (summaries for each chunk).\n", " combined_summary = [f\"{i}. {summary}\" for i, summary in enumerate(exec_res)]\n", " shared[\"summary\"][self.params['filename']] = combined_summary\n", "\n", "class ReduceSummaries(Node):\n", " def prep(self, shared):\n", " # Retrieve the list of chunk summaries from shared storage\n", " return shared[\"summary\"][self.params['filename']]\n", " def exec(self, shared, prep_res):\n", " combined_summary = prep_res\n", " prompt = f\"{combined_summary} Respond a summary of above in 10 words\"\n", " summary = call_llm(prompt)\n", " return summary\n", " def post(self, shared, prep_res, exec_res):\n", " # Store the combined summary as the final summary for this file.\n", " shared[\"summary\"][self.params['filename']] = exec_res\n", " \n", "map_summaries = MapSummaries()\n", "reduce_summaries = ReduceSummaries()\n", "# Link map_summaries to reduce_summaries with an action\n", "# By default, the action is \"default\" (when post returns None, it takes \"default\" action)\n", "# This is the same as map_summaries - \"default\" >> reduce_summaries\n", "map_summaries >> reduce_summaries\n", "\n", "# We don't directly call map_summaries.run(shared), \n", "# because that alone would process only the map step without reduce.\n", "\n", "# 6) Instead, create a Flow that starts from map_summaries (a Node) \n", "# and automatically includes reduce_summaries. \n", "# Note: A Flow can also start from any other Flow or BatchFlow.\n", "\n", "\n", "file_summary_flow = Flow(start=map_summaries)\n", "# When a flow params is set, it will recursively set its params to all nodes in the flow\n", "file_summary_flow.set_params({\"filename\":\"before.txt\"})\n", "file_summary_flow.run(shared)\n", "\n", "# 7) Summarize all files using a BatchFlow that reruns file_summary_flow for each file\n", "class SummarizeAllFiles(BatchFlow):\n", " def prep(self, shared):\n", " # Return a list of parameters to apply in each flow iteration.\n", " # Each individual param will be merged with this node's own params \n", " # Allowing nesting of multi-level BatchFlow. 
\n", " # E.g., first level diretcory, second level file.\n", " return [{\"filename\":filename} for filename in shared['data']]\n", "\n", "summarize_all_files = SummarizeAllFiles(start=file_summary_flow)\n", "summarize_all_files.run(shared)\n", "\n", "\n", "# 8) QA Agent: Find the most relevant file based on summary with actions\n", "# if no question is asked:\n", "# (a) end: terminate the flow \n", "# if question is asked:\n", "# if relevant file is found:\n", "# (b) answer: move to answer node and read the whole file to answer the question\n", "# if no relevant file is found:\n", "# (c) retry: retry the process to find the relevant file\n", "class FindRelevantFile(Node):\n", " def prep(self, shared):\n", " question = input(\"Enter a question: \")\n", " formatted_list = [f\"- '{filename}': {shared['summary'][filename]}\" \n", " for filename in shared['summary']]\n", " return question, formatted_list\n", " def exec(self, shared, prep_res):\n", " question, formatted_list = prep_res\n", " if not question:\n", " return {\"think\":\"no question\", \"has_relevant\":False}\n", " # Provide a structured YAML output that includes:\n", " # - The chain of thought\n", " # - Whether any relevant file was found\n", " # - The most relevant file if found\n", " prompt = f\"\"\"Question: {question} \n", "Find the most relevant file from: \n", "{formatted_list}\n", "If no relevant file, explain why\n", "Respond in yaml without additional information:\n", "think: the question has/has no relevant file ...\n", "has_relevant: true/false\n", "most_relevant: filename\"\"\"\n", " response = call_llm(prompt)\n", " import yaml\n", " result = yaml.safe_load(response)\n", " # Ensure required fields are present\n", " assert \"think\" in result\n", " assert \"has_relevant\" in result\n", " assert \"most_relevant\" in result if result[\"has_relevant\"] else True\n", " return result\n", " # handle errors by returning a default response in case of exception after retries\n", " def process_after_fail(self,shared,prep_res,exc):\n", " # if not overridden, the default is to throw the exception\n", " return {\"think\":\"error finding the file\", \"has_relevant\":False}\n", " def post(self, shared, prep_res, exec_res):\n", " question, _ = prep_res\n", " # Decide what to do next based on the results\n", " if not question:\n", " print(f\"No question asked\")\n", " return \"end\"\n", " if exec_res[\"has_relevant\"]:\n", " # Store the question and most relevant file in shared\n", " shared[\"question\"] = question\n", " shared[\"relevant_file\"] = exec_res['most_relevant']\n", " print(f\"Relevant file found: {exec_res['most_relevant']}\")\n", " return \"answer\"\n", " else:\n", " print(f\"No relevant file found: {exec_res['think']}\")\n", " return \"retry\"\n", "\n", "class AnswerQuestion(Node):\n", " def prep(self, shared):\n", " question = shared['question']\n", " relevant_file = shared['relevant_file']\n", " # Read the whole file content\n", " file_content = shared['data'][relevant_file]\n", " return question, file_content\n", " def exec(self, shared, prep_res):\n", " question, file_content = prep_res\n", " prompt = f\"\"\"Question: {question}\n", "File: {file_content}\n", "Answer the question in 50 words\"\"\"\n", " response = call_llm(prompt)\n", " return response\n", " def post(self, shared, prep_res, exec_res):\n", " print(f\"Answer: {exec_res}\")\n", "\n", "class NoOp(Node):\n", " pass\n", "\n", "# Configure the QA agent with appropriate transitions and retries\n", "find_relevant_file = FindRelevantFile(max_retries=3)\n", 
"answer_question = AnswerQuestion()\n", "no_op = NoOp()\n", "\n", "# Connect the nodes based on the actions they return\n", "find_relevant_file - \"answer\" >> answer_question >> find_relevant_file\n", "find_relevant_file - \"retry\" >> find_relevant_file\n", "find_relevant_file - \"end\" >> no_op\n", "\n", "qa_agent = Flow(start=find_relevant_file)\n", "qa_agent.run(shared)" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.2" } }, "nbformat": 4, "nbformat_minor": 2 }