From 8c88175dc5868e335184919ff7c0558382236776 Mon Sep 17 00:00:00 2001
From: zachary62
Date: Sat, 8 Mar 2025 01:25:38 -0500
Subject: [PATCH] update rag doc

---
 docs/design_pattern/rag.md | 168 ++++++++++++++++++++++++++++++-------
 1 file changed, 139 insertions(+), 29 deletions(-)

diff --git a/docs/design_pattern/rag.md b/docs/design_pattern/rag.md
index fa9790e..27f6045 100644
--- a/docs/design_pattern/rag.md
+++ b/docs/design_pattern/rag.md
@@ -14,47 +14,157 @@ Most common way to retrieve text-based context is through embedding:
 3. Then you store the chunks in [vector databases](../utility_function/vector.md).
 4. Finally, given a query, you embed the query and find the closest chunk in the vector databases.
 
-### Example: Question Answering
+
+
+
+
+# RAG (Retrieval Augmented Generation)
+
+For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
+
+1. **Offline stage**: Preprocess and index documents ("building the index").
+2. **Online stage**: Given a question, generate answers by retrieving the most relevant context from the index.
+
+---
+## Stage 1: Offline Indexing
+
+We create three Nodes:
+1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text.
+2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk.
+3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md).
 
 ```python
-class PrepareEmbeddings(Node):
+class ChunkDocs(BatchNode):
     def prep(self, shared):
-        return shared["texts"]
+        # A list of file paths in shared["files"]. We process each file.
+        return shared["files"]
 
-    def exec(self, texts):
-        # Embed each text chunk
-        embs = [get_embedding(t) for t in texts]
-        return embs
+    def exec(self, filepath):
+        # read file content. In real usage, do error handling.
+        with open(filepath, "r", encoding="utf-8") as f:
+            text = f.read()
+        # chunk by 100 chars each
+        chunks = []
+        size = 100
+        for i in range(0, len(text), size):
+            chunks.append(text[i : i + size])
+        return chunks
+
+    def post(self, shared, prep_res, exec_res_list):
+        # exec_res_list is a list of chunk-lists, one per file.
+        # flatten them all into a single list of chunks.
+        all_chunks = []
+        for chunk_list in exec_res_list:
+            all_chunks.extend(chunk_list)
+        shared["all_chunks"] = all_chunks
 
-    def post(self, shared, prep_res, exec_res):
-        shared["search_index"] = create_index(exec_res)
-        # no action string means "default"
-
-class AnswerQuestion(Node):
+class EmbedDocs(BatchNode):
     def prep(self, shared):
-        question = input("Enter question: ")
-        return question
+        return shared["all_chunks"]
+
+    def exec(self, chunk):
+        return get_embedding(chunk)
+
+    def post(self, shared, prep_res, exec_res_list):
+        # Store the list of embeddings.
+        shared["all_embeds"] = exec_res_list
+        print(f"Total embeddings: {len(exec_res_list)}")
+
+class StoreIndex(Node):
+    def prep(self, shared):
+        # We'll read all embeds from shared.
+        return shared["all_embeds"]
+
+    def exec(self, all_embeds):
+        # Create a vector index (faiss or other DB in real usage).
+        index = create_index(all_embeds)
+        return index
+
+    def post(self, shared, prep_res, index):
+        shared["index"] = index
+
+# Wire them in sequence
+chunk_node = ChunkDocs()
+embed_node = EmbedDocs()
+store_node = StoreIndex()
+
+chunk_node >> embed_node >> store_node
+
+OfflineFlow = Flow(start=chunk_node)
+```
+
+Usage example:
+
+```python
+shared = {
+    "files": ["doc1.txt", "doc2.txt"],  # any text files
+}
+OfflineFlow.run(shared)
+```
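+
+Here, `get_embedding` and `create_index` are assumed to come from the [embedding](../utility_function/embedding.md) and [vector database](../utility_function/vector.md) utilities rather than from the flow itself. A minimal sketch of what they could look like, assuming the OpenAI SDK for embeddings and FAISS for the index (both are illustrative assumptions, not requirements):
+
+```python
+# Hypothetical helpers for illustration only, not this project's actual utilities.
+import numpy as np
+import faiss                # assumption: FAISS as the vector index
+from openai import OpenAI  # assumption: OpenAI SDK for embeddings
+
+client = OpenAI()  # expects OPENAI_API_KEY in the environment
+
+def get_embedding(text):
+    # Embed one chunk of text; returns a list of floats.
+    resp = client.embeddings.create(model="text-embedding-3-small", input=text)
+    return resp.data[0].embedding
+
+def create_index(all_embeds):
+    # Build a flat L2 index over all chunk embeddings.
+    embs = np.array(all_embeds, dtype="float32")
+    index = faiss.IndexFlatL2(embs.shape[1])
+    index.add(embs)
+    return index
+```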
+
+---
+## Stage 2: Online Query & Answer
+
+We have 3 nodes:
+1. `EmbedQuery` – embeds the user’s question.
+2. `RetrieveDocs` – retrieves top chunk from the index.
+3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer.
+
+```python
+class EmbedQuery(Node):
+    def prep(self, shared):
+        return shared["question"]
 
     def exec(self, question):
-        q_emb = get_embedding(question)
-        idx, _ = search_index(shared["search_index"], q_emb, top_k=1)
-        best_id = idx[0][0]
-        relevant_text = shared["texts"][best_id]
-        prompt = f"Question: {question}\nContext: {relevant_text}\nAnswer:"
+        return get_embedding(question)
+
+    def post(self, shared, prep_res, q_emb):
+        shared["q_emb"] = q_emb
+
+class RetrieveDocs(Node):
+    def prep(self, shared):
+        # We'll need the query embedding, plus the offline index/chunks
+        return shared["q_emb"], shared["index"], shared["all_chunks"]
+
+    def exec(self, inputs):
+        q_emb, index, chunks = inputs
+        I, D = search_index(index, q_emb, top_k=1)
+        best_id = I[0][0]
+        relevant_chunk = chunks[best_id]
+        return relevant_chunk
+
+    def post(self, shared, prep_res, relevant_chunk):
+        shared["retrieved_chunk"] = relevant_chunk
+        print("Retrieved chunk:", relevant_chunk[:60], "...")
+
+class GenerateAnswer(Node):
+    def prep(self, shared):
+        return shared["question"], shared["retrieved_chunk"]
+
+    def exec(self, inputs):
+        question, chunk = inputs
+        prompt = f"Question: {question}\nContext: {chunk}\nAnswer:"
         return call_llm(prompt)
 
-    def post(self, shared, p, answer):
+    def post(self, shared, prep_res, answer):
+        shared["answer"] = answer
         print("Answer:", answer)
 
-############################################
-# Wire up the flow
-prep = PrepareEmbeddings()
-qa = AnswerQuestion()
-prep >> qa
+embed_qnode = EmbedQuery()
+retrieve_node = RetrieveDocs()
+generate_node = GenerateAnswer()
 
-flow = Flow(start=prep)
+embed_qnode >> retrieve_node >> generate_node
+OnlineFlow = Flow(start=embed_qnode)
+```
 
-# Example usage
-shared = {"texts": ["I love apples", "Cats are great", "The sky is blue"]}
-flow.run(shared)
+Usage example:
+
+```python
+# Suppose we already ran OfflineFlow and have:
+# shared["all_chunks"], shared["index"], etc.
+shared["question"] = "Why do people like cats?"
+
+OnlineFlow.run(shared)
+# final answer in shared["answer"]
+```
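+
+As in the offline stage, `search_index` and `call_llm` are assumed utility functions. A rough sketch under the same illustrative assumptions (FAISS index, OpenAI SDK); note that `RetrieveDocs` unpacks the result as `I, D`, so indices come first:
+
+```python
+# Hypothetical helpers for illustration only, not this project's actual utilities.
+import numpy as np
+from openai import OpenAI  # assumption: OpenAI SDK, as in the offline sketch
+
+client = OpenAI()
+
+def search_index(index, q_emb, top_k=1):
+    # FAISS expects a 2D float32 query array and returns (distances, indices).
+    D, I = index.search(np.array([q_emb], dtype="float32"), top_k)
+    # Return indices first so RetrieveDocs can unpack `I, D`.
+    return I, D
+
+def call_llm(prompt):
+    # Single-turn chat completion; the model name is just an example.
+    resp = client.chat.completions.create(
+        model="gpt-4o-mini",
+        messages=[{"role": "user", "content": prompt}],
+    )
+    return resp.choices[0].message.content
+```
\ No newline at end of file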