# Example App for text summarization & QA using minillmflow
import os

import openai  # used by call_llm(); the name was referenced but never imported
from minillmflow import Node, BatchNode, Flow, BatchFlow, AsyncNode, AsyncFlow, BatchAsyncFlow


# 1) Implement a simple LLM helper (OpenAI in this example).
def call_llm(prompt):
    """Send `prompt` as a single user message to the chat model and return the reply text."""
    # Users must set an OpenAI API key. It is read from the OPENAI_API_KEY
    # environment variable; the placeholder is only a fallback to be replaced.
    openai.api_key = os.environ.get("OPENAI_API_KEY", "YOUR_API_KEY_HERE")
    r = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content


# 2) Create a shared store (dict) for Node/Flow data exchange.
#    This can be replaced with a DB or other storage.
#    Design the structure / schema based on the app requirements.
shared = {"data": {}, "summary": {}}


# 3) Create a Node that loads data from disk into shared['data'].
class LoadData(Node):
    # For compute-intensive operations, do them in prep().
    def prep(self, shared):
        """Read every regular file in the data directory into shared['data'][filename]."""
        path = "../data/PaulGrahamEssaysLarge"
        for filename in os.listdir(path):
            full_path = os.path.join(path, filename)
            # os.listdir() also yields sub-directories; open() would fail on them.
            if not os.path.isfile(full_path):
                continue
            # Explicit encoding so the result does not depend on the platform default.
            with open(full_path, 'r', encoding="utf-8") as f:
                shared['data'][filename] = f.read()

    # If LLM was needed, we'd handle it in exec(). Not needed here.
    # (idempotent so it can be retried if needed)
    def exec(self, shared, prep_res):
        pass

    # post() can update shared again or decide the next node (by returning the action).
    def post(self, shared, prep_res, exec_res):
        pass


load_data = LoadData()
# Run the data-loading node once
load_data.run(shared)


# 4) Create a Node that summarizes a single file using the LLM.
class SummarizeFile(Node):
    def prep(self, shared):
        # Use self.params (which must remain immutable during prep/exec/post).
        # Typically, we only store identifying info in params (e.g., filename).
        content = shared['data'][self.params['filename']]
        return content

    def exec(self, shared, prep_res):
        content = prep_res
        prompt = f"{content} Respond a summary of above in 10 words"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        shared["summary"][self.params['filename']] = exec_res


summarize_file = SummarizeFile()
# For testing, we set params directly on the node.
# In real usage, you'd set them in a Flow or BatchFlow.
summarize_file.set_params({"filename": "addiction.txt"})
summarize_file.run(shared)


# 5) If data is large, we can apply a map-reduce pattern:
#    - MapSummaries(BatchNode) => chunk the file and summarize each chunk
#    - ReduceSummaries(Node)   => combine those chunk-level summaries
class MapSummaries(BatchNode):
    def prep(self, shared):
        content = shared['data'][self.params['filename']]
        chunk_size = 10000
        chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]
        # Must return an iterable (list or generator) for a BatchNode.
        return chunks

    def exec(self, shared, prep_res):
        # Each iteration of prep_res corresponds to a single chunk.
        chunk = prep_res
        prompt = f"{chunk} Respond a summary of above in 10 words"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        # exec_res is a list of exec() results (summaries for each chunk).
        combined_summary = [f"{i}. {summary}" for i, summary in enumerate(exec_res)]
        shared["summary"][self.params['filename']] = combined_summary


class ReduceSummaries(Node):
    def prep(self, shared):
        # Retrieve the list of chunk summaries from shared storage
        return shared["summary"][self.params['filename']]

    def exec(self, shared, prep_res):
        combined_summary = prep_res
        prompt = f"{combined_summary} Respond a summary of above in 10 words"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res):
        # Store the combined summary as the final summary for this file.
        shared["summary"][self.params['filename']] = exec_res


map_summaries = MapSummaries()
reduce_summaries = ReduceSummaries()
# Link map_summaries to reduce_summaries with an action
# By default, the action is "default" (when post returns None, it takes "default" action)
# This is the same as map_summaries - "default" >> reduce_summaries
map_summaries >> reduce_summaries

# We don't directly call map_summaries.run(shared),
# because that alone would process only the map step without reduce.

# 6) Instead, create a Flow that starts from map_summaries (a Node)
#    and automatically includes reduce_summaries.
#    Note: A Flow can also start from any other Flow or BatchFlow.
file_summary_flow = Flow(start=map_summaries)
# When a flow's params are set, it will recursively set its params to all nodes in the flow
file_summary_flow.set_params({"filename": "before.txt"})
file_summary_flow.run(shared)


# 7) Summarize all files using a BatchFlow that reruns file_summary_flow for each file
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # Return a list of parameters to apply in each flow iteration.
        # Each individual param will be merged with this node's own params
        # Allowing nesting of multi-level BatchFlow.
        # E.g., first level directory, second level file.
        return [{"filename": filename} for filename in shared['data']]


summarize_all_files = SummarizeAllFiles(start=file_summary_flow)
summarize_all_files.run(shared)


# 8) QA Agent: Find the most relevant file based on summary with actions
#    if no question is asked:
#       (a) end: terminate the flow
#    if question is asked:
#       if relevant file is found:
#           (b) answer: move to answer node and read the whole file to answer the question
#       if no relevant file is found:
#           (c) retry: retry the process to find the relevant file
class FindRelevantFile(Node):
    def prep(self, shared):
        question = input("Enter a question: ")
        formatted_list = [f"- '{filename}': {shared['summary'][filename]}"
                          for filename in shared['summary']]
        return question, formatted_list

    def exec(self, shared, prep_res):
        question, formatted_list = prep_res
        if not question:
            return {"think": "no question", "has_relevant": False}
        # Provide a structured YAML output that includes:
        # - The chain of thought
        # - Whether any relevant file was found
        # - The most relevant file if found
        prompt = f"""Question: {question}
Find the most relevant file from:
{formatted_list}
If no relevant file, explain why
Respond in yaml without additional information:
think: the question has/has no relevant file ...
has_relevant: true/false
most_relevant: filename"""
        response = call_llm(prompt)
        import yaml
        result = yaml.safe_load(response)
        # Ensure required fields are present. Raise explicitly instead of using
        # `assert`, which is stripped when Python runs with -O.
        # NOTE(review): presumably any exception raised here triggers the node's
        # retry / exec_fallback handling — confirm against minillmflow.
        if not isinstance(result, dict):
            raise ValueError("LLM response is not a YAML mapping")
        for key in ("think", "has_relevant"):
            if key not in result:
                raise ValueError(f"LLM response is missing required field '{key}'")
        if result["has_relevant"] and "most_relevant" not in result:
            raise ValueError("LLM response is missing required field 'most_relevant'")
        return result

    # handle errors by returning a default response in case of exception after retries
    def exec_fallback(self, shared, prep_res, exc):
        # if not overridden, the default is to throw the exception
        return {"think": "error finding the file", "has_relevant": False}

    def post(self, shared, prep_res, exec_res):
        question, _ = prep_res
        # Decide what to do next based on the results
        if not question:
            print("No question asked")
            return "end"
        if exec_res["has_relevant"]:
            # Store the question and most relevant file in shared
            shared["question"] = question
            shared["relevant_file"] = exec_res['most_relevant']
            print(f"Relevant file found: {exec_res['most_relevant']}")
            return "answer"
        else:
            print(f"No relevant file found: {exec_res['think']}")
            return "retry"


class AnswerQuestion(Node):
    def prep(self, shared):
        question = shared['question']
        relevant_file = shared['relevant_file']
        # Read the whole file content
        file_content = shared['data'][relevant_file]
        return question, file_content

    def exec(self, shared, prep_res):
        question, file_content = prep_res
        prompt = f"""Question: {question}
File: {file_content}
Answer the question in 50 words"""
        response = call_llm(prompt)
        return response

    def post(self, shared, prep_res, exec_res):
        print(f"Answer: {exec_res}")


class NoOp(Node):
    pass


# Configure the QA agent with appropriate transitions and retries
find_relevant_file = FindRelevantFile(max_retries=3)
answer_question = AnswerQuestion()
no_op = NoOp()

# Connect the nodes based on the actions they return
find_relevant_file - "answer" >> answer_question >> find_relevant_file
find_relevant_file - "retry" >> find_relevant_file
find_relevant_file - "end" >> no_op

qa_agent = Flow(start=find_relevant_file)
qa_agent.run(shared)

# Above example demonstrates the use of minillmflow
# Next, build another app based on the same principles
# First, given the app's requirements, design the Node/Flow structure
# Then, design the data structure within shared storage, and how it's updated
# Finally, implement the Nodes and Flows to achieve the desired functionality