simplify prompt
parent d316dfda37
commit b13225c866
assets/prompt | 268

@@ -1,10 +1,8 @@
 # Example App for text summarization & QA using minillmflow
-from minillmflow import Node, BatchNode, Flow, BatchFlow, AsyncNode, AsyncFlow, BatchAsyncFlow
-import os
+from minillmflow import *
+import openai, os, yaml

-# 1) Implement a simple LLM helper (OpenAI in this example).
+# Minimal LLM wrapper
 def call_llm(prompt):
-    # Users must set an OpenAI API key; can also load from env var, etc.
     openai.api_key = "YOUR_API_KEY_HERE"
     r = openai.ChatCompletion.create(
         model="gpt-4",
@@ -12,211 +10,113 @@ def call_llm(prompt):
     )
     return r.choices[0].message.content

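A quick smoke test of the wrapper might look like this (illustrative, not part of the commit; it assumes the elided middle of call_llm passes the prompt via the usual messages=[{"role": "user", "content": prompt}] argument of the legacy openai chat API):

    # assumes a valid API key was set above
    print(call_llm("Say hello in five words."))
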
-# 2) Create a shared store (dict) for Node/Flow data exchange.
-# This can be replaced with a DB or other storage.
+# Design the structure / schema based on the app requirements.
 shared = {"data": {}, "summary": {}}

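For orientation, once the pipeline below has run, shared holds one entry per file in each sub-dict; for a single hypothetical file it would look roughly like:

    shared = {
        "data":    {"addiction.txt": "<full essay text>"},
        "summary": {"addiction.txt": "<10-word summary>"},
    }
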
-# 3) Create a Node that loads data from disk into shared['data'].
+# Load data into shared['data']
 class LoadData(Node):
-    # For compute-intensive operations, do them in prep().
     def prep(self, shared):
         path = "../data/PaulGrahamEssaysLarge"
-        for filename in os.listdir(path):
-            with open(os.path.join(path, filename), 'r') as f:
-                shared['data'][filename] = f.read()
-    # If an LLM call were needed, it would go in exec(). Not needed here.
-    # (exec should be idempotent so it can be retried if needed)
-    def exec(self, shared, prep_res): pass
-    # post() can update shared again or decide the next node (by returning the action).
-    def post(self, shared, prep_res, exec_res): pass
+        for fn in os.listdir(path):
+            with open(os.path.join(path, fn), 'r') as f:
+                shared['data'][fn] = f.read()
+    def exec(self, res): pass
+    def post(self, s, pr, er): pass

-load_data = LoadData()
-# Run the data-loading node once
-load_data.run(shared)
+LoadData().run(shared)

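The prep/exec/post split is the Node contract on both sides of this diff: prep reads from shared, exec does the (retryable) work, and post writes results back and may choose the next action. A tracing node makes the call order visible; a sketch that assumes the simplified signatures of the new prompt:

    class Trace(Node):
        def prep(self, shared): print("prep"); return 1
        def exec(self, prep_res): print("exec", prep_res); return prep_res + 1
        def post(self, shared, prep_res, exec_res): print("post", exec_res)

    Trace().run(shared)  # expected output: prep, exec 1, post 2
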
-# 4) Create a Node that summarizes a single file using the LLM.
+# Summarize one file
 class SummarizeFile(Node):
-    def prep(self, shared):
-        # Use self.params (which must remain immutable during prep/exec/post).
-        # Typically, we only store identifying info in params (e.g., filename).
-        content = shared['data'][self.params['filename']]
-        return content
-    def exec(self, shared, prep_res):
-        content = prep_res
-        prompt = f"{content} Respond with a summary of the above in 10 words"
-        summary = call_llm(prompt)
-        return summary
-    def post(self, shared, prep_res, exec_res):
-        shared["summary"][self.params['filename']] = exec_res
+    def prep(self, s): return s['data'][self.params['filename']]
+    def exec(self, content):
+        return call_llm(f"{content} Summarize in 10 words.")
+    def post(self, s, pr, sr): s["summary"][self.params['filename']] = sr

-summarize_file = SummarizeFile()
-# For testing, we set params directly on the node.
-# In real usage, you'd set them in a Flow or BatchFlow.
-summarize_file.set_params({"filename":"addiction.txt"})
-summarize_file.run(shared)
+node_summ = SummarizeFile()
+node_summ.set_params({"filename":"addiction.txt"})
+node_summ.run(shared)

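Either way the node is written, the result lands in the shared store and can be read back directly:

    print(shared["summary"]["addiction.txt"])  # the 10-word summary
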
-# 5) If data is large, we can apply a map-reduce pattern:
-#    - MapSummaries (BatchNode) => chunk the file and summarize each chunk
-#    - ReduceSummaries (Node)   => combine those chunk-level summaries
+# Map-Reduce summarization
 class MapSummaries(BatchNode):
-    def prep(self, shared):
-        content = shared['data'][self.params['filename']]
-        chunk_size = 10000
-        chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
-        # Must return an iterable (list or generator) for a BatchNode.
-        return chunks
-    def exec(self, shared, prep_res):
-        # Each element of prep_res corresponds to a single chunk.
-        chunk = prep_res
-        prompt = f"{chunk} Respond with a summary of the above in 10 words"
-        summary = call_llm(prompt)
-        return summary
-    def post(self, shared, prep_res, exec_res):
-        # exec_res is a list of exec() results (summaries for each chunk).
-        combined_summary = [f"{i}. {summary}" for i, summary in enumerate(exec_res)]
-        shared["summary"][self.params['filename']] = combined_summary
+    def prep(self, s):
+        text = s['data'][self.params['filename']]
+        return [text[i:i+10000] for i in range(0, len(text), 10000)]
+    def exec(self, chunk):
+        return call_llm(f"{chunk} Summarize in 10 words.")
+    def post(self, s, pr, er):
+        s["summary"][self.params['filename']] = [f"{i}. {r}" for i,r in enumerate(er)]

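The chunking one-liner in prep is plain string slicing; on a small string with a chunk size of 4 it behaves like this:

    text = "abcdefghij"
    chunks = [text[i:i+4] for i in range(0, len(text), 4)]
    # ['abcd', 'efgh', 'ij'] (the final chunk is simply shorter)
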
 class ReduceSummaries(Node):
-    def prep(self, shared):
-        # Retrieve the list of chunk summaries from shared storage
-        return shared["summary"][self.params['filename']]
-    def exec(self, shared, prep_res):
-        combined_summary = prep_res
-        prompt = f"{combined_summary} Respond with a summary of the above in 10 words"
-        summary = call_llm(prompt)
-        return summary
-    def post(self, shared, prep_res, exec_res):
-        # Store the combined summary as the final summary for this file.
-        shared["summary"][self.params['filename']] = exec_res
-
-map_summaries = MapSummaries()
-reduce_summaries = ReduceSummaries()
-# Link map_summaries to reduce_summaries with an action.
-# By default, the action is "default" (when post returns None, the "default" action is taken).
-# This is the same as map_summaries - "default" >> reduce_summaries
-map_summaries >> reduce_summaries
+    def prep(self, s): return s["summary"][self.params['filename']]
+    def exec(self, chunks):
+        return call_llm(f"{chunks} Combine into a 10-word summary.")
+    def post(self, s, pr, sr): s["summary"][self.params['filename']] = sr

-# We don't directly call map_summaries.run(shared),
-# because that alone would process only the map step without the reduce.
+map_summ = MapSummaries()
+reduce_summ = ReduceSummaries()
+map_summ >> reduce_summ

-# 6) Instead, create a Flow that starts from map_summaries (a Node)
-#    and automatically includes reduce_summaries.
-#    Note: a Flow can also start from any other Flow or BatchFlow.
+flow = Flow(start=map_summ)
+flow.set_params({"filename":"before.txt"})
+flow.run(shared)

-file_summary_flow = Flow(start=map_summaries)
-# When a flow's params are set, they are recursively applied to all nodes in the flow.
-file_summary_flow.set_params({"filename":"before.txt"})
-file_summary_flow.run(shared)

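The removed comment describes the key behavior: setting params on a Flow recursively sets them on every node the flow contains, which is conceptually equivalent to this sketch (illustrative only):

    for node in (map_summ, reduce_summ):  # every node reachable from the flow's start
        node.set_params({"filename": "before.txt"})
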
-# 7) Summarize all files using a BatchFlow that reruns file_summary_flow for each file
+# Summarize all files
 class SummarizeAllFiles(BatchFlow):
-    def prep(self, shared):
-        # Return a list of param dicts to apply, one per flow iteration.
-        # Each per-iteration dict is merged with this flow's own params,
-        # allowing nesting of multi-level BatchFlows
-        # (e.g., first-level directory, second-level file).
-        return [{"filename":filename} for filename in shared['data']]
+    def prep(self, s): return [{"filename":fn} for fn in s['data']]

-summarize_all_files = SummarizeAllFiles(start=file_summary_flow)
-summarize_all_files.run(shared)
+SummarizeAllFiles(start=flow).run(shared)

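The per-iteration merge described in the removed comment amounts to a dict union, with the iteration's params layered over the flow's own params; with hypothetical keys:

    flow_params  = {"dirname": "essays"}          # set on the outer BatchFlow
    batch_params = {"filename": "addiction.txt"}  # one dict returned by prep()
    merged = {**flow_params, **batch_params}
    # {'dirname': 'essays', 'filename': 'addiction.txt'}
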
-# 8) QA Agent: find the most relevant file based on the summaries, using actions:
-#    if no question is asked:
-#        (a) end: terminate the flow
-#    if a question is asked:
-#        if a relevant file is found:
-#            (b) answer: move to the answer node and read the whole file to answer the question
-#        if no relevant file is found:
-#            (c) retry: retry the process of finding the relevant file
+# QA agent
 class FindRelevantFile(Node):
-    def prep(self, shared):
-        question = input("Enter a question: ")
-        formatted_list = [f"- '{filename}': {shared['summary'][filename]}"
-                          for filename in shared['summary']]
-        return question, formatted_list
-    def exec(self, shared, prep_res):
-        question, formatted_list = prep_res
-        if not question:
-            return {"think":"no question", "has_relevant":False}
-        # Ask for a structured YAML output that includes:
-        # - the chain of thought
-        # - whether any relevant file was found
-        # - the most relevant file, if found
-        prompt = f"""Question: {question}
-Find the most relevant file from:
-{formatted_list}
-If there is no relevant file, explain why.
-Respond in YAML without additional information:
-think: the question has/has no relevant file ...
-has_relevant: true/false
-most_relevant: filename"""
-        response = call_llm(prompt)
-        import yaml
-        result = yaml.safe_load(response)
-        # Ensure the required fields are present
-        assert "think" in result
-        assert "has_relevant" in result
-        assert "most_relevant" in result if result["has_relevant"] else True
-        return result
-    # Handle errors by returning a default response if an exception persists after retries.
-    def exec_fallback(self, shared, prep_res, exc):
-        # If not overridden, the default is to re-raise the exception.
-        return {"think":"error finding the file", "has_relevant":False}
-    def post(self, shared, prep_res, exec_res):
-        question, _ = prep_res
-        # Decide what to do next based on the results
-        if not question:
-            print("No question asked")
-            return "end"
-        if exec_res["has_relevant"]:
-            # Store the question and the most relevant file in shared
-            shared["question"] = question
-            shared["relevant_file"] = exec_res['most_relevant']
-            print(f"Relevant file found: {exec_res['most_relevant']}")
+    def prep(self, s):
+        q = input("Enter a question: ")
+        summ = [f"- '{fn}': {s['summary'][fn]}" for fn in s['summary']]
+        return q, summ
+    def exec(self, p):
+        q, summ = p
+        if not q:
+            return {"think":"no question","has_relevant":False}
+        resp = call_llm(f"""
+Question: {q}
+Find the most relevant file from: {summ}
+If none, explain why.
+Respond in YAML:
+think: ...
+has_relevant: ...
+most_relevant: ...
+""")
+        r = yaml.safe_load(resp)
+        return r
+    def exec_fallback(self, p, exc): return {"think":"error","has_relevant":False}
+    def post(self, s, pr, res):
+        q, _ = pr
+        if not q:
+            print("No question asked"); return "end"
+        if res["has_relevant"]:
+            s["question"], s["relevant_file"] = q, res["most_relevant"]
+            print("Relevant file:", res["most_relevant"])
+            return "answer"
         else:
-            print(f"No relevant file found: {exec_res['think']}")
+            print("No relevant file:", res["think"])
             return "retry"

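Both versions rely on yaml.safe_load to turn the LLM's reply into a dict; given a well-formed response the parse is straightforward (the response text here is made up):

    import yaml
    resp = "think: matches the essay on habits\nhas_relevant: true\nmost_relevant: addiction.txt"
    r = yaml.safe_load(resp)
    assert r["has_relevant"] is True and r["most_relevant"] == "addiction.txt"
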
 class AnswerQuestion(Node):
-    def prep(self, shared):
-        question = shared['question']
-        relevant_file = shared['relevant_file']
-        # Read the whole file content
-        file_content = shared['data'][relevant_file]
-        return question, file_content
-    def exec(self, shared, prep_res):
-        question, file_content = prep_res
-        prompt = f"""Question: {question}
-File: {file_content}
-Answer the question in 50 words"""
-        response = call_llm(prompt)
-        return response
-    def post(self, shared, prep_res, exec_res):
-        print(f"Answer: {exec_res}")
+    def prep(self, s):
+        return s['question'], s['data'][s['relevant_file']]
+    def exec(self, p):
+        q, txt = p
+        return call_llm(f"Question: {q}\nText: {txt}\nAnswer in 50 words.")
+    def post(self, s, pr, ex):
+        print("Answer:", ex)

-class NoOp(Node):
-    pass
+class NoOp(Node): pass

-# Configure the QA agent with appropriate transitions and retries
-find_relevant_file = FindRelevantFile(max_retries=3)
-answer_question = AnswerQuestion()
-no_op = NoOp()
+frf = FindRelevantFile(max_retries=3)
+aq = AnswerQuestion()
+noop = NoOp()

-# Connect the nodes based on the actions they return
-find_relevant_file - "answer" >> answer_question >> find_relevant_file
-find_relevant_file - "retry" >> find_relevant_file
-find_relevant_file - "end" >> no_op
+frf - "answer" >> aq >> frf
+frf - "retry" >> frf
+frf - "end" >> noop

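The dash-then-shift syntax names the edge: node - "action" >> target registers which node runs next when post() returns that action string. Spelled out for the wiring above:

    # frf - "answer" >> aq   : post() returned "answer" -> run AnswerQuestion
    # frf - "retry"  >> frf  : post() returned "retry"  -> ask the LLM again
    # frf - "end"    >> noop : post() returned "end"    -> stop at the no-op
    # aq >> frf              : default action           -> back to the question loop
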
-qa_agent = Flow(start=find_relevant_file)
-qa_agent.run(shared)
-
-# The example above demonstrates the use of minillmflow.
-# Next, build another app based on the same principles:
-# First, given the app's requirements, design the Node/Flow structure.
-# Then, design the data structure within the shared storage, and how it is updated.
-# Finally, implement the Nodes and Flows to achieve the desired functionality.
+qa = Flow(start=frf)
+qa.run(shared)

@@ -355,7 +355,7 @@
" assert \"most_relevant\" in result if result[\"has_relevant\"] else True\n",
|
||||
" return result\n",
|
||||
" # handle errors by returning a default response in case of exception after retries\n",
|
||||
" def exec_fallback(self,shared,prep_res,exc):\n",
|
||||
" def exec_fallback(self,prep_res,exc):\n",
|
||||
" # if not overridden, the default is to throw the exception\n",
|
||||
" return {\"think\":\"error finding the file\", \"has_relevant\":False}\n",
|
||||
" def post(self, shared, prep_res, exec_res):\n",
|
||||