From b13225c8661dda2f0eccbb9eeb9b19ac4d22bf45 Mon Sep 17 00:00:00 2001 From: zachary62 Date: Tue, 31 Dec 2024 22:01:41 +0000 Subject: [PATCH] simplify prompt --- assets/prompt | 268 ++++++++++++++------------------------------ cookbook/demo.ipynb | 2 +- 2 files changed, 85 insertions(+), 185 deletions(-) diff --git a/assets/prompt b/assets/prompt index e991612..fb659b6 100644 --- a/assets/prompt +++ b/assets/prompt @@ -1,10 +1,8 @@ -# Example App for text summarization & QA using minillmflow -from minillmflow import Node, BatchNode, Flow, BatchFlow, AsyncNode, AsyncFlow, BatchAsyncFlow -import os +from minillmflow import * +import openai, os, yaml -# 1) Implement a simple LLM helper (OpenAI in this example). +# Minimal LLM wrapper def call_llm(prompt): - # Users must set an OpenAI API key; can also load from env var, etc. openai.api_key = "YOUR_API_KEY_HERE" r = openai.ChatCompletion.create( model="gpt-4", @@ -12,211 +10,113 @@ def call_llm(prompt): ) return r.choices[0].message.content -# 2) Create a shared store (dict) for Node/Flow data exchange. -# This can be replaced with a DB or other storage. -# Design the structure / schema based on the app requirements. shared = {"data": {}, "summary": {}} -# 3) Create a Node that loads data from disk into shared['data']. +# Load data into shared['data'] class LoadData(Node): - # For compute-intensive operations, do them in prep(). def prep(self, shared): path = "../data/PaulGrahamEssaysLarge" - for filename in os.listdir(path): - with open(os.path.join(path, filename), 'r') as f: - shared['data'][filename] = f.read() - # If LLM was needed, we'd handle it in exec(). Not needed here. - # (idempotent so it can be retried if needed) - def exec(self,shared,prep_res): pass - # post() can update shared again or decide the next node (by return the action). - def post(self,shared,prep_res,exec_res): pass + for fn in os.listdir(path): + with open(os.path.join(path, fn), 'r') as f: + shared['data'][fn] = f.read() + def exec(self, res): pass + def post(self, s, pr, er): pass -load_data = LoadData() -# Run the data-loading node once -load_data.run(shared) +LoadData().run(shared) -# 4) Create a Node that summarizes a single file using the LLM. +# Summarize one file class SummarizeFile(Node): - def prep(self, shared): - # Use self.params (which must remain immutable during prep/exec/post). - # Typically, we only store identifying info in params (e.g., filename). - content = shared['data'][self.params['filename']] - return content - def exec(self, shared, prep_res): - content = prep_res - prompt = f"{content} Respond a summary of above in 10 words" - summary = call_llm(prompt) - return summary - def post(self, shared, prep_res, exec_res): - shared["summary"][self.params['filename']] = exec_res + def prep(self, s): return s['data'][self.params['filename']] + def exec(self, content): + return call_llm(f"{content} Summarize in 10 words.") + def post(self, s, pr, sr): s["summary"][self.params['filename']] = sr -summarize_file = SummarizeFile() -# For testing, we set params directly on the node. -# In real usage, you'd set them in a Flow or BatchFlow. -summarize_file.set_params({"filename":"addiction.txt"}) -summarize_file.run(shared) +node_summ = SummarizeFile() +node_summ.set_params({"filename":"addiction.txt"}) +node_summ.run(shared) -# 5) If data is large, we can apply a map-reduce pattern: -# - MapSummaries(BatchNode) => chunk the file and summarize each chunk -# - ReduceSummaries(Node) => combine those chunk-level summaries +# Map-Reduce summarization class MapSummaries(BatchNode): - def prep(self, shared): - content = shared['data'][self.params['filename']] - chunk_size = 10000 - chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] - # Must return an iterable (list or generator) for a BatchNode. - return chunks - def exec(self, shared, prep_res): - # Each iteration of prep_res corresponds to a single chunk. - chunk = prep_res - prompt = f"{chunk} Respond a summary of above in 10 words" - summary = call_llm(prompt) - return summary - def post(self, shared, prep_res, exec_res): - # exec_res is a list of exec() results (summaries for each chunk). - combined_summary = [f"{i}. {summary}" for i, summary in enumerate(exec_res)] - shared["summary"][self.params['filename']] = combined_summary + def prep(self, s): + text = s['data'][self.params['filename']] + return [text[i:i+10000] for i in range(0, len(text), 10000)] + def exec(self, chunk): + return call_llm(f"{chunk} Summarize in 10 words.") + def post(self, s, pr, er): + s["summary"][self.params['filename']] = [f"{i}. {r}" for i,r in enumerate(er)] class ReduceSummaries(Node): - def prep(self, shared): - # Retrieve the list of chunk summaries from shared storage - return shared["summary"][self.params['filename']] - def exec(self, shared, prep_res): - combined_summary = prep_res - prompt = f"{combined_summary} Respond a summary of above in 10 words" - summary = call_llm(prompt) - return summary - def post(self, shared, prep_res, exec_res): - # Store the combined summary as the final summary for this file. - shared["summary"][self.params['filename']] = exec_res - -map_summaries = MapSummaries() -reduce_summaries = ReduceSummaries() -# Link map_summaries to reduce_summaries with an action -# By default, the action is "default" (when post returns None, it takes "default" action) -# This is the same as map_summaries - "default" >> reduce_summaries -map_summaries >> reduce_summaries + def prep(self, s): return s["summary"][self.params['filename']] + def exec(self, chunks): + return call_llm(f"{chunks} Combine into 10 words summary.") + def post(self, s, pr, sr): s["summary"][self.params['filename']] = sr -# We don't directly call map_summaries.run(shared), -# because that alone would process only the map step without reduce. +map_summ = MapSummaries() +reduce_summ = ReduceSummaries() +map_summ >> reduce_summ -# 6) Instead, create a Flow that starts from map_summaries (a Node) -# and automatically includes reduce_summaries. -# Note: A Flow can also start from any other Flow or BatchFlow. +flow = Flow(start=map_summ) +flow.set_params({"filename":"before.txt"}) +flow.run(shared) - -file_summary_flow = Flow(start=map_summaries) -# When a flow params is set, it will recursively set its params to all nodes in the flow -file_summary_flow.set_params({"filename":"before.txt"}) -file_summary_flow.run(shared) - -# 7) Summarize all files using a BatchFlow that reruns file_summary_flow for each file +# Summarize all files class SummarizeAllFiles(BatchFlow): - def prep(self, shared): - # Return a list of parameters to apply in each flow iteration. - # Each individual param will be merged with this node's own params - # Allowing nesting of multi-level BatchFlow. - # E.g., first level diretcory, second level file. - return [{"filename":filename} for filename in shared['data']] + def prep(self, s): return [{"filename":fn} for fn in s['data']] -summarize_all_files = SummarizeAllFiles(start=file_summary_flow) -summarize_all_files.run(shared) +SummarizeAllFiles(start=flow).run(shared) - -# 8) QA Agent: Find the most relevant file based on summary with actions -# if no question is asked: -# (a) end: terminate the flow -# if question is asked: -# if relevant file is found: -# (b) answer: move to answer node and read the whole file to answer the question -# if no relevant file is found: -# (c) retry: retry the process to find the relevant file +# QA agent class FindRelevantFile(Node): - def prep(self, shared): - question = input("Enter a question: ") - formatted_list = [f"- '{filename}': {shared['summary'][filename]}" - for filename in shared['summary']] - return question, formatted_list - def exec(self, shared, prep_res): - question, formatted_list = prep_res - if not question: - return {"think":"no question", "has_relevant":False} - # Provide a structured YAML output that includes: - # - The chain of thought - # - Whether any relevant file was found - # - The most relevant file if found - prompt = f"""Question: {question} -Find the most relevant file from: -{formatted_list} -If no relevant file, explain why -Respond in yaml without additional information: -think: the question has/has no relevant file ... -has_relevant: true/false -most_relevant: filename""" - response = call_llm(prompt) - import yaml - result = yaml.safe_load(response) - # Ensure required fields are present - assert "think" in result - assert "has_relevant" in result - assert "most_relevant" in result if result["has_relevant"] else True - return result - # handle errors by returning a default response in case of exception after retries - def exec_fallback(self,shared,prep_res,exc): - # if not overridden, the default is to throw the exception - return {"think":"error finding the file", "has_relevant":False} - def post(self, shared, prep_res, exec_res): - question, _ = prep_res - # Decide what to do next based on the results - if not question: - print(f"No question asked") - return "end" - if exec_res["has_relevant"]: - # Store the question and most relevant file in shared - shared["question"] = question - shared["relevant_file"] = exec_res['most_relevant'] - print(f"Relevant file found: {exec_res['most_relevant']}") + def prep(self, s): + q = input("Enter a question: ") + summ = [f"- '{fn}': {s['summary'][fn]}" for fn in s['summary']] + return q, summ + def exec(self, p): + q, summ = p + if not q: + return {"think":"no question","has_relevant":False} + resp = call_llm(f""" +Question: {q} +Find the most relevant file from: {summ} +If none, explain why +Respond in YAML: +think: ... +has_relevant: ... +most_relevant: ... +""") + r = yaml.safe_load(resp) + return r + def exec_fallback(self, p, exc): return {"think":"error","has_relevant":False} + def post(self, s, pr, res): + q, _ = pr + if not q: + print("No question asked"); return "end" + if res["has_relevant"]: + s["question"], s["relevant_file"] = q, res["most_relevant"] + print("Relevant file:", res["most_relevant"]) return "answer" else: - print(f"No relevant file found: {exec_res['think']}") + print("No relevant file:", res["think"]) return "retry" class AnswerQuestion(Node): - def prep(self, shared): - question = shared['question'] - relevant_file = shared['relevant_file'] - # Read the whole file content - file_content = shared['data'][relevant_file] - return question, file_content - def exec(self, shared, prep_res): - question, file_content = prep_res - prompt = f"""Question: {question} -File: {file_content} -Answer the question in 50 words""" - response = call_llm(prompt) - return response - def post(self, shared, prep_res, exec_res): - print(f"Answer: {exec_res}") + def prep(self, s): + return s['question'], s['data'][s['relevant_file']] + def exec(self, p): + q, txt = p + return call_llm(f"Question: {q}\nText: {txt}\nAnswer in 50 words.") + def post(self, s, pr, ex): + print("Answer:", ex) -class NoOp(Node): - pass +class NoOp(Node): pass -# Configure the QA agent with appropriate transitions and retries -find_relevant_file = FindRelevantFile(max_retries=3) -answer_question = AnswerQuestion() -no_op = NoOp() +frf = FindRelevantFile(max_retries=3) +aq = AnswerQuestion() +noop = NoOp() -# Connect the nodes based on the actions they return -find_relevant_file - "answer" >> answer_question >> find_relevant_file -find_relevant_file - "retry" >> find_relevant_file -find_relevant_file - "end" >> no_op +frf - "answer" >> aq >> frf +frf - "retry" >> frf +frf - "end" >> noop -qa_agent = Flow(start=find_relevant_file) -qa_agent.run(shared) - - -# Above example demonstrates the use of minillmflow -# Next, build another app based on the same principles -# First, given the app's requirements, design the Node/Flow structure -# Then, design the data structure within shared storage, and how it's updated -# Finally, implement the Nodes and Flows to achieve the desired functionality \ No newline at end of file +qa = Flow(start=frf) +qa.run(shared) diff --git a/cookbook/demo.ipynb b/cookbook/demo.ipynb index c4d54e7..b332768 100644 --- a/cookbook/demo.ipynb +++ b/cookbook/demo.ipynb @@ -355,7 +355,7 @@ " assert \"most_relevant\" in result if result[\"has_relevant\"] else True\n", " return result\n", " # handle errors by returning a default response in case of exception after retries\n", - " def exec_fallback(self,shared,prep_res,exc):\n", + " def exec_fallback(self,prep_res,exc):\n", " # if not overridden, the default is to throw the exception\n", " return {\"think\":\"error finding the file\", \"has_relevant\":False}\n", " def post(self, shared, prep_res, exec_res):\n",