{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "No relevant file found: the question has no relevant file because while some files discuss startups, none specifically address how to find or generate startup ideas\n",
      "No question asked\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "'default'"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Example App for text summarization & QA using minillmflow\n",
"from minillmflow import Node, BatchNode, Flow, BatchFlow, AsyncNode, AsyncFlow, BatchAsyncFlow\n",
|
|
"import os\n",
|
|
"\n",
|
|
"# 1) Implement a simple LLM helper (OpenAI in this example).\n",
|
|
"def call_LLM(prompt):\n",
|
|
" # Users must set an OpenAI API key; can also load from env var, etc.\n",
|
|
" openai.api_key = \"YOUR_API_KEY_HERE\"\n",
|
|
" r = openai.ChatCompletion.create(\n",
|
|
" model=\"gpt-4\",\n",
|
|
" messages=[{\"role\": \"user\", \"content\": prompt}]\n",
|
|
" )\n",
|
|
" return r.choices[0].message.content\n",
|
|
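    "\n",
    "# A minimal sketch of the env-var option mentioned above (assumes the\n",
    "# standard OPENAI_API_KEY variable is set; would replace the hardcoded key):\n",
    "# openai.api_key = os.environ[\"OPENAI_API_KEY\"]\n",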
"\n",
|
|
"# 2) Create a shared store (dict) for Node/Flow data exchange.\n",
|
|
"# This can be replaced with a DB or other storage.\n",
|
|
"# Design the structure / schema based on the app requirements.\n",
|
|
"shared = {\"data\": {}, \"summary\": {}}\n",
|
|
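    "# For illustration, after the flows below run, shared might look like\n",
    "# (hypothetical filename):\n",
    "# {\"data\": {\"essay.txt\": \"<full text>\"}, \"summary\": {\"essay.txt\": \"<summary>\"}}\n",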
"\n",
|
|
"# 3) Create a Node that loads data from disk into shared['data'].\n",
|
|
"class LoadData(Node):\n",
|
|
" # For compute-intensive operations, do them in prep().\n",
|
|
" def prep(self, shared):\n",
|
|
" path = \"../data/PaulGrahamEssaysLarge\"\n",
|
|
" for filename in os.listdir(path):\n",
|
|
" with open(os.path.join(path, filename), 'r') as f:\n",
|
|
" shared['data'][filename] = f.read()\n",
|
|
" # If LLM was needed, we'd handle it in exec(). Not needed here.\n",
|
|
" # (idempotent so it can be retried if needed)\n",
|
|
" def exec(self,shared,prep_res): pass \n",
|
|
" # post() can update shared again or decide the next node (by return the action).\n",
|
|
" def post(self,shared,prep_res,exec_res): pass \n",
|
|
"\n",
|
|
"load_data = LoadData()\n",
|
|
"# Run the data-loading node once\n",
|
|
"load_data.run(shared)\n",
|
|
"\n",
|
|
"# 4) Create a Node that summarizes a single file using the LLM.\n",
|
|
"class SummarizeFile(Node):\n",
|
|
" def prep(self, shared):\n",
|
|
" # Use self.params (which must remain immutable during prep/exec/post).\n",
|
|
" # Typically, we only store identifying info in params (e.g., filename).\n",
|
|
" content = shared['data'][self.params['filename']]\n",
|
|
" return content\n",
|
|
" def exec(self, shared, prep_res):\n",
|
|
" content = prep_res\n",
|
|
" prompt = f\"{content} Respond a summary of above in 10 words\"\n",
|
|
" summary = call_llm(prompt)\n",
|
|
" return summary\n",
|
|
" def post(self, shared, prep_res, exec_res):\n",
|
|
" shared[\"summary\"][self.params['filename']] = exec_res\n",
|
|
"\n",
|
|
"summarize_file = SummarizeFile()\n",
|
|
"# For testing, we set params directly on the node.\n",
|
|
"# In real usage, you'd set them in a Flow or BatchFlow.\n",
|
|
"summarize_file.set_params({\"filename\":\"addiction.txt\"})\n",
|
|
"summarize_file.run(shared)\n",
|
|
"\n",
|
|
"# 5) If data is large, we can apply a map-reduce pattern:\n",
|
|
"# - MapSummaries(BatchNode) => chunk the file and summarize each chunk\n",
|
|
"# - ReduceSummaries(Node) => combine those chunk-level summaries\n",
|
|
"class MapSummaries(BatchNode):\n",
|
|
" def prep(self, shared):\n",
|
|
" content = shared['data'][self.params['filename']]\n",
|
|
" chunk_size = 10000\n",
|
|
" chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]\n",
|
|
" # Must return an iterable (list or generator) for a BatchNode.\n",
|
|
" return chunks\n",
|
|
" def exec(self, shared, prep_res):\n",
|
|
" # Each iteration of prep_res corresponds to a single chunk.\n",
|
|
" chunk = prep_res\n",
|
|
" prompt = f\"{chunk} Respond a summary of above in 10 words\"\n",
|
|
" summary = call_llm(prompt)\n",
|
|
" return summary\n",
|
|
" def post(self, shared, prep_res, exec_res):\n",
|
|
" # exec_res is a list of exec() results (summaries for each chunk).\n",
|
|
" combined_summary = [f\"{i}. {summary}\" for i, summary in enumerate(exec_res)]\n",
|
|
" shared[\"summary\"][self.params['filename']] = combined_summary\n",
|
|
"\n",
|
|
"class ReduceSummaries(Node):\n",
|
|
" def prep(self, shared):\n",
|
|
" # Retrieve the list of chunk summaries from shared storage\n",
|
|
" return shared[\"summary\"][self.params['filename']]\n",
|
|
" def exec(self, shared, prep_res):\n",
|
|
" combined_summary = prep_res\n",
|
|
" prompt = f\"{combined_summary} Respond a summary of above in 10 words\"\n",
|
|
" summary = call_llm(prompt)\n",
|
|
" return summary\n",
|
|
" def post(self, shared, prep_res, exec_res):\n",
|
|
" # Store the combined summary as the final summary for this file.\n",
|
|
" shared[\"summary\"][self.params['filename']] = exec_res\n",
|
|
" \n",
|
|
"map_summaries = MapSummaries()\n",
|
|
"reduce_summaries = ReduceSummaries()\n",
|
|
"# Link map_summaries to reduce_summaries with an action\n",
|
|
"# By default, the action is \"default\" (when post returns None, it takes \"default\" action)\n",
|
|
"# This is the same as map_summaries - \"default\" >> reduce_summaries\n",
|
|
"map_summaries >> reduce_summaries\n",
|
|
"\n",
|
|
"# We don't directly call map_summaries.run(shared), \n",
|
|
"# because that alone would process only the map step without reduce.\n",
|
|
"\n",
|
|
"# 6) Instead, create a Flow that starts from map_summaries (a Node) \n",
|
|
"# and automatically includes reduce_summaries. \n",
|
|
"# Note: A Flow can also start from any other Flow or BatchFlow.\n",
|
|
"\n",
|
|
"\n",
|
|
"file_summary_flow = Flow(start=map_summaries)\n",
|
|
"# When a flow params is set, it will recursively set its params to all nodes in the flow\n",
|
|
"file_summary_flow.set_params({\"filename\":\"before.txt\"})\n",
|
|
"file_summary_flow.run(shared)\n",
|
|
"\n",
|
|
"# 7) Summarize all files using a BatchFlow that reruns file_summary_flow for each file\n",
|
|
"class SummarizeAllFiles(BatchFlow):\n",
|
|
" def prep(self, shared):\n",
|
|
" # Return a list of parameters to apply in each flow iteration.\n",
|
|
" # Each individual param will be merged with this node's own params \n",
|
|
" # Allowing nesting of multi-level BatchFlow. \n",
|
|
" # E.g., first level diretcory, second level file.\n",
|
|
" return [{\"filename\":filename} for filename in shared['data']]\n",
|
|
"\n",
|
|
"summarize_all_files = SummarizeAllFiles(start=file_summary_flow)\n",
|
|
"summarize_all_files.run(shared)\n",
|
|
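    "\n",
    "# A minimal sketch (not run here) of the multi-level nesting mentioned above:\n",
    "# an outer directory-level BatchFlow whose per-directory params merge with the\n",
    "# inner file-level params. Class and param names are hypothetical.\n",
    "# class SummarizeAllDirectories(BatchFlow):\n",
    "#     def prep(self, shared):\n",
    "#         # One param dict per directory; inner nodes then see both\n",
    "#         # 'dirname' (from here) and 'filename' (from the inner flow).\n",
    "#         return [{\"dirname\": d} for d in shared.get('directories', [])]\n",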
"\n",
|
|
"\n",
|
|
"# 8) QA Agent: Find the most relevant file based on summary with actions\n",
|
|
"# if no question is asked:\n",
|
|
"# (a) end: terminate the flow \n",
|
|
"# if question is asked:\n",
|
|
"# if relevant file is found:\n",
|
|
"# (b) answer: move to answer node and read the whole file to answer the question\n",
|
|
"# if no relevant file is found:\n",
|
|
"# (c) retry: retry the process to find the relevant file\n",
|
|
"class FindRelevantFile(Node):\n",
|
|
" def prep(self, shared):\n",
|
|
" question = input(\"Enter a question: \")\n",
|
|
" formatted_list = [f\"- '{filename}': {shared['summary'][filename]}\" \n",
|
|
" for filename in shared['summary']]\n",
|
|
" return question, formatted_list\n",
|
|
" def exec(self, shared, prep_res):\n",
|
|
" question, formatted_list = prep_res\n",
|
|
" if not question:\n",
|
|
" return {\"think\":\"no question\", \"has_relevant\":False}\n",
|
|
" # Provide a structured YAML output that includes:\n",
|
|
" # - The chain of thought\n",
|
|
" # - Whether any relevant file was found\n",
|
|
" # - The most relevant file if found\n",
|
|
" prompt = f\"\"\"Question: {question} \n",
|
|
"Find the most relevant file from: \n",
|
|
"{formatted_list}\n",
|
|
"If no relevant file, explain why\n",
|
|
"Respond in yaml without additional information:\n",
|
|
"think: the question has/has no relevant file ...\n",
|
|
"has_relevant: true/false\n",
|
|
"most_relevant: filename\"\"\"\n",
|
|
" response = call_llm(prompt)\n",
|
|
" import yaml\n",
|
|
" result = yaml.safe_load(response)\n",
|
|
" # Ensure required fields are present\n",
|
|
" assert \"think\" in result\n",
|
|
" assert \"has_relevant\" in result\n",
|
|
" assert \"most_relevant\" in result if result[\"has_relevant\"] else True\n",
|
|
" return result\n",
|
|
" # handle errors by returning a default response in case of exception after retries\n",
|
|
" def process_after_fail(self,shared,prep_res,exc):\n",
|
|
" # if not overridden, the default is to throw the exception\n",
|
|
" return {\"think\":\"error finding the file\", \"has_relevant\":False}\n",
|
|
" def post(self, shared, prep_res, exec_res):\n",
|
|
" question, _ = prep_res\n",
|
|
" # Decide what to do next based on the results\n",
|
|
" if not question:\n",
|
|
" print(f\"No question asked\")\n",
|
|
" return \"end\"\n",
|
|
" if exec_res[\"has_relevant\"]:\n",
|
|
" # Store the question and most relevant file in shared\n",
|
|
" shared[\"question\"] = question\n",
|
|
" shared[\"relevant_file\"] = exec_res['most_relevant']\n",
|
|
" print(f\"Relevant file found: {exec_res['most_relevant']}\")\n",
|
|
" return \"answer\"\n",
|
|
" else:\n",
|
|
" print(f\"No relevant file found: {exec_res['think']}\")\n",
|
|
" return \"retry\"\n",
|
|
"\n",
|
|
"class AnswerQuestion(Node):\n",
|
|
" def prep(self, shared):\n",
|
|
" question = shared['question']\n",
|
|
" relevant_file = shared['relevant_file']\n",
|
|
" # Read the whole file content\n",
|
|
" file_content = shared['data'][relevant_file]\n",
|
|
" return question, file_content\n",
|
|
" def exec(self, shared, prep_res):\n",
|
|
" question, file_content = prep_res\n",
|
|
" prompt = f\"\"\"Question: {question}\n",
|
|
"File: {file_content}\n",
|
|
"Answer the question in 50 words\"\"\"\n",
|
|
" response = call_llm(prompt)\n",
|
|
" return response\n",
|
|
" def post(self, shared, prep_res, exec_res):\n",
|
|
" print(f\"Answer: {exec_res}\")\n",
|
|
"\n",
|
|
"class NoOp(Node):\n",
|
|
" pass\n",
|
|
"\n",
|
|
"# Configure the QA agent with appropriate transitions and retries\n",
|
|
"find_relevant_file = FindRelevantFile(max_retries=3)\n",
|
|
"answer_question = AnswerQuestion()\n",
|
|
"no_op = NoOp()\n",
|
|
"\n",
|
|
"# Connect the nodes based on the actions they return\n",
|
|
"find_relevant_file - \"answer\" >> answer_question >> find_relevant_file\n",
|
|
"find_relevant_file - \"retry\" >> find_relevant_file\n",
|
|
"find_relevant_file - \"end\" >> no_op\n",
|
|
"\n",
|
|
"qa_agent = Flow(start=find_relevant_file)\n",
|
|
"qa_agent.run(shared)"
|
|
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}