{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example App for text summarization & QA using minillmflow\n",
"from minillmflow import Node, BatchNode, Flow, BatchFlow, AsyncNode, AsyncFlow, BatchAsyncFlow\n",
"import os\n",
"\n",
"# 1) Implement a simple LLM helper (OpenAI in this example).\n",
"def call_LLM(prompt):\n",
" # Users must set an OpenAI API key; can also load from env var, etc.\n",
" openai.api_key = \"YOUR_API_KEY_HERE\"\n",
" r = openai.ChatCompletion.create(\n",
" model=\"gpt-4\",\n",
" messages=[{\"role\": \"user\", \"content\": prompt}]\n",
" )\n",
" return r.choices[0].message.content"
]
},
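{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional smoke test (illustrative, not part of the original demo): call the\n",
"# helper once to verify connectivity. It is skipped when no API key is\n",
"# configured, so the rest of the notebook can still be read offline.\n",
"if os.environ.get(\"OPENAI_API_KEY\"):\n",
"    print(call_llm(\"Reply with the single word: pong\"))"
]
},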
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 2) Create a shared store (dict) for Node/Flow data exchange.\n",
"# This can be replaced with a DB or other storage.\n",
"# Design the structure / schema based on the app requirements.\n",
"shared = {\"data\": {}, \"summary\": {}}\n",
"\n",
"# 3) Create a Node that loads data from disk into shared['data'].\n",
"class LoadData(Node):\n",
" # For compute-intensive operations, do them in prep().\n",
" def prep(self, shared):\n",
" path = \"../data/PaulGrahamEssaysLarge\"\n",
" for filename in os.listdir(path):\n",
" with open(os.path.join(path, filename), 'r') as f:\n",
" shared['data'][filename] = f.read()\n",
" # If LLM was needed, we'd handle it in exec(). Not needed here.\n",
" # (idempotent so it can be retried if needed)\n",
" def exec(self,shared,prep_res): pass \n",
" # post() can update shared again or decide the next node (by return the action).\n",
" def post(self,shared,prep_res,exec_res): pass \n",
"\n",
"load_data = LoadData()\n",
"# Run the data-loading node once\n",
"load_data.run(shared)"
]
},
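{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sanity check (illustrative): LoadData wrote every file into the shared store,\n",
"# so it can be inspected directly between node runs.\n",
"print(f\"Loaded {len(shared['data'])} files\")\n",
"print(sorted(shared['data'])[:3])"
]
},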
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 4) Create a Node that summarizes a single file using the LLM.\n",
"class SummarizeFile(Node):\n",
" def prep(self, shared):\n",
" # Use self.params (which must remain immutable during prep/exec/post).\n",
" # Typically, we only store identifying info in params (e.g., filename).\n",
" content = shared['data'][self.params['filename']]\n",
" return content\n",
" def exec(self, shared, prep_res):\n",
" content = prep_res\n",
" prompt = f\"{content} Respond a summary of above in 10 words\"\n",
" summary = call_llm(prompt)\n",
" return summary\n",
" def post(self, shared, prep_res, exec_res):\n",
" shared[\"summary\"][self.params['filename']] = exec_res\n",
"\n",
"summarize_file = SummarizeFile()\n",
"# For testing, we set params directly on the node.\n",
"# In real usage, you'd set them in a Flow or BatchFlow.\n",
"summarize_file.set_params({\"filename\":\"addiction.txt\"})\n",
"summarize_file.run(shared)"
]
},
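{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Inspect the result (illustrative): post() stored the summary under the filename key.\n",
"print(shared['summary']['addiction.txt'])"
]
},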
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 5) If data is large, we can apply a map-reduce pattern:\n",
"# - MapSummaries(BatchNode) => chunk the file and summarize each chunk\n",
"# - ReduceSummaries(Node) => combine those chunk-level summaries\n",
"class MapSummaries(BatchNode):\n",
" def prep(self, shared):\n",
" content = shared['data'][self.params['filename']]\n",
" chunk_size = 10000\n",
" chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]\n",
" # Must return an iterable (list or generator) for a BatchNode.\n",
" return chunks\n",
" def exec(self, shared, prep_res):\n",
" # Each iteration of prep_res corresponds to a single chunk.\n",
" chunk = prep_res\n",
" prompt = f\"{chunk} Respond a summary of above in 10 words\"\n",
" summary = call_llm(prompt)\n",
" return summary\n",
" def post(self, shared, prep_res, exec_res):\n",
" # exec_res is a list of exec() results (summaries for each chunk).\n",
" combined_summary = [f\"{i}. {summary}\" for i, summary in enumerate(exec_res)]\n",
" shared[\"summary\"][self.params['filename']] = combined_summary\n",
"\n",
"class ReduceSummaries(Node):\n",
" def prep(self, shared):\n",
" # Retrieve the list of chunk summaries from shared storage\n",
" return shared[\"summary\"][self.params['filename']]\n",
" def exec(self, shared, prep_res):\n",
" combined_summary = prep_res\n",
" prompt = f\"{combined_summary} Respond a summary of above in 10 words\"\n",
" summary = call_llm(prompt)\n",
" return summary\n",
" def post(self, shared, prep_res, exec_res):\n",
" # Store the combined summary as the final summary for this file.\n",
" shared[\"summary\"][self.params['filename']] = exec_res\n",
" \n",
"map_summaries = MapSummaries()\n",
"reduce_summaries = ReduceSummaries()\n",
"# Link map_summaries to reduce_summaries with an action\n",
"# By default, the action is \"default\" (when post returns None, it takes \"default\" action)\n",
"# This is the same as map_summaries - \"default\" >> reduce_summaries\n",
"map_summaries >> reduce_summaries\n",
"\n",
"# We don't directly call map_summaries.run(shared), \n",
"# because that alone would process only the map step without reduce.\n",
"\n",
"# 6) Instead, create a Flow that starts from map_summaries (a Node) \n",
"# and automatically includes reduce_summaries. \n",
"# Note: A Flow can also start from any other Flow or BatchFlow.\n",
"\n",
"\n",
"file_summary_flow = Flow(start=map_summaries)\n",
"# When a flow params is set, it will recursively set its params to all nodes in the flow\n",
"file_summary_flow.set_params({\"filename\":\"before.txt\"})\n",
"file_summary_flow.run(shared)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 7) Summarize all files using a BatchFlow that reruns file_summary_flow for each file\n",
"class SummarizeAllFiles(BatchFlow):\n",
" def prep(self, shared):\n",
" # Return a list of parameters to apply in each flow iteration.\n",
" # Each individual param will be merged with this node's own params \n",
" # Allowing nesting of multi-level BatchFlow. \n",
" # E.g., first level diretcory, second level file.\n",
" return [{\"filename\":filename} for filename in shared['data']]\n",
"\n",
"summarize_all_files = SummarizeAllFiles(start=file_summary_flow)\n",
"summarize_all_files.run(shared)"
]
},
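{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch (hypothetical, not run here): nesting BatchFlows as described above.\n",
"# An outer BatchFlow could supply {\"directory\": ...} params while the inner one\n",
"# supplies {\"filename\": ...}; the param dicts are merged at each level.\n",
"# The class name and the shared['directories'] key are illustrative assumptions.\n",
"class SummarizeAllDirectories(BatchFlow):\n",
"    def prep(self, shared):\n",
"        return [{\"directory\": d} for d in shared.get(\"directories\", [])]\n",
"\n",
"# summarize_all_directories = SummarizeAllDirectories(start=summarize_all_files)\n",
"# summarize_all_directories.run(shared)"
]
},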
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 8) QA Agent: Find the most relevant file based on summary with actions\n",
"# if no question is asked:\n",
"# (a) end: terminate the flow \n",
"# if question is asked:\n",
"# if relevant file is found:\n",
"# (b) answer: move to answer node and read the whole file to answer the question\n",
"# if no relevant file is found:\n",
"# (c) retry: retry the process to find the relevant file\n",
"class FindRelevantFile(Node):\n",
" def prep(self, shared):\n",
" question = input(\"Enter a question: \")\n",
" formatted_list = [f\"- '{filename}': {shared['summary'][filename]}\" \n",
" for filename in shared['summary']]\n",
" return question, formatted_list\n",
" def exec(self, shared, prep_res):\n",
" question, formatted_list = prep_res\n",
" if not question:\n",
" return {\"think\":\"no question\", \"has_relevant\":False}\n",
" # Provide a structured YAML output that includes:\n",
" # - The chain of thought\n",
" # - Whether any relevant file was found\n",
" # - The most relevant file if found\n",
" prompt = f\"\"\"Question: {question} \n",
"Find the most relevant file from: \n",
"{formatted_list}\n",
"If no relevant file, explain why\n",
"Respond in yaml without additional information:\n",
"think: the question has/has no relevant file ...\n",
"has_relevant: true/false\n",
"most_relevant: filename\"\"\"\n",
" response = call_llm(prompt)\n",
" import yaml\n",
" result = yaml.safe_load(response)\n",
" # Ensure required fields are present\n",
" assert \"think\" in result\n",
" assert \"has_relevant\" in result\n",
" assert \"most_relevant\" in result if result[\"has_relevant\"] else True\n",
" return result\n",
" # handle errors by returning a default response in case of exception after retries\n",
" def process_after_fail(self,shared,prep_res,exc):\n",
" # if not overridden, the default is to throw the exception\n",
" return {\"think\":\"error finding the file\", \"has_relevant\":False}\n",
" def post(self, shared, prep_res, exec_res):\n",
" question, _ = prep_res\n",
" # Decide what to do next based on the results\n",
" if not question:\n",
" print(f\"No question asked\")\n",
" return \"end\"\n",
" if exec_res[\"has_relevant\"]:\n",
" # Store the question and most relevant file in shared\n",
" shared[\"question\"] = question\n",
" shared[\"relevant_file\"] = exec_res['most_relevant']\n",
" print(f\"Relevant file found: {exec_res['most_relevant']}\")\n",
" return \"answer\"\n",
" else:\n",
" print(f\"No relevant file found: {exec_res['think']}\")\n",
" return \"retry\"\n",
"\n",
"class AnswerQuestion(Node):\n",
" def prep(self, shared):\n",
" question = shared['question']\n",
" relevant_file = shared['relevant_file']\n",
" # Read the whole file content\n",
" file_content = shared['data'][relevant_file]\n",
" return question, file_content\n",
" def exec(self, shared, prep_res):\n",
" question, file_content = prep_res\n",
" prompt = f\"\"\"Question: {question}\n",
"File: {file_content}\n",
"Answer the question in 50 words\"\"\"\n",
" response = call_llm(prompt)\n",
" return response\n",
" def post(self, shared, prep_res, exec_res):\n",
" print(f\"Answer: {exec_res}\")\n",
"\n",
"class NoOp(Node):\n",
" pass\n",
"\n",
"# Configure the QA agent with appropriate transitions and retries\n",
"find_relevant_file = FindRelevantFile(max_retries=3)\n",
"answer_question = AnswerQuestion()\n",
"no_op = NoOp()\n",
"\n",
"# Connect the nodes based on the actions they return\n",
"find_relevant_file - \"answer\" >> answer_question >> find_relevant_file\n",
"find_relevant_file - \"retry\" >> find_relevant_file\n",
"find_relevant_file - \"end\" >> no_op\n",
"\n",
"qa_agent = Flow(start=find_relevant_file)\n",
"qa_agent.run(shared)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}