diff --git a/docs/design_pattern/rag.md b/docs/design_pattern/rag.md
index fa9790e..27f6045 100644
--- a/docs/design_pattern/rag.md
+++ b/docs/design_pattern/rag.md
@@ -14,47 +14,157 @@ Most common way to retrive text-based context is through embedding:
3. Then you store the chunks in [vector databases](../utility_function/vector.md).
4. Finally, given a query, you embed the query and find the closest chunk in the vector databases.
-### Example: Question Answering

+# RAG (Retrieval Augmented Generation)
+
+For LLM tasks such as question answering, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
+
+1. **Offline stage**: Preprocess and index documents ("building the index").
+2. **Online stage**: Given a question, generate answers by retrieving the most relevant context from the index.
+
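+The two stages communicate only through the shared store. Roughly (key names taken from the code below):
+
+```python
+shared = {
+    # offline stage
+    "files": [...],           # input file paths (provided by the caller)
+    "all_chunks": [...],      # text chunks
+    "all_embeds": [...],      # one embedding per chunk
+    "index": ...,             # vector index built over the embeddings
+    # online stage
+    "question": "...",        # provided by the caller
+    "q_emb": ...,             # embedding of the question
+    "retrieved_chunk": "...",
+    "answer": "...",
+}
+```
+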
+---
+## Stage 1: Offline Indexing
+
+We create three Nodes:
+1. `ChunkDocs` – reads each file and [chunks](../utility_function/chunking.md) the raw text.
+2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk.
+3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md).
```python
-class PrepareEmbeddings(Node):
+class ChunkDocs(BatchNode):
def prep(self, shared):
- return shared["texts"]
+ # A list of file paths in shared["files"]. We process each file.
+ return shared["files"]
- def exec(self, texts):
- # Embed each text chunk
- embs = [get_embedding(t) for t in texts]
- return embs
+ def exec(self, filepath):
+        # Read the file content; in real usage, add error handling.
+ with open(filepath, "r", encoding="utf-8") as f:
+ text = f.read()
+ # chunk by 100 chars each
+ chunks = []
+ size = 100
+ for i in range(0, len(text), size):
+ chunks.append(text[i : i + size])
+ return chunks
+
+ def post(self, shared, prep_res, exec_res_list):
+ # exec_res_list is a list of chunk-lists, one per file.
+ # flatten them all into a single list of chunks.
+ all_chunks = []
+ for chunk_list in exec_res_list:
+ all_chunks.extend(chunk_list)
+ shared["all_chunks"] = all_chunks
- def post(self, shared, prep_res, exec_res):
- shared["search_index"] = create_index(exec_res)
- # no action string means "default"
-
-class AnswerQuestion(Node):
+
+class EmbedDocs(BatchNode):
def prep(self, shared):
- question = input("Enter question: ")
- return question
+ return shared["all_chunks"]
+
+ def exec(self, chunk):
+ return get_embedding(chunk)
+
+ def post(self, shared, prep_res, exec_res_list):
+ # Store the list of embeddings.
+ shared["all_embeds"] = exec_res_list
+ print(f"Total embeddings: {len(exec_res_list)}")
+
+class StoreIndex(Node):
+ def prep(self, shared):
+ # We'll read all embeds from shared.
+ return shared["all_embeds"]
+
+ def exec(self, all_embeds):
+        # Create a vector index (FAISS or another vector DB in real usage).
+ index = create_index(all_embeds)
+ return index
+
+ def post(self, shared, prep_res, index):
+ shared["index"] = index
+
+# Wire them in sequence
+chunk_node = ChunkDocs()
+embed_node = EmbedDocs()
+store_node = StoreIndex()
+
+chunk_node >> embed_node >> store_node
+
+OfflineFlow = Flow(start=chunk_node)
+```
+
+Usage example:
+
+```python
+shared = {
+ "files": ["doc1.txt", "doc2.txt"], # any text files
+}
+OfflineFlow.run(shared)
+```
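+
+The offline nodes assume two helpers, `get_embedding` and `create_index`, from the linked utility pages. A minimal sketch of what they might look like, assuming OpenAI embeddings and a FAISS index (both are illustrative choices, not requirements of this pattern):
+
+```python
+# Hypothetical helper implementations; swap in your own embedding model / vector DB.
+import numpy as np
+import faiss
+from openai import OpenAI
+
+client = OpenAI()
+
+def get_embedding(text):
+    resp = client.embeddings.create(model="text-embedding-3-small", input=text)
+    return resp.data[0].embedding
+
+def create_index(all_embeds):
+    # Build a flat L2 index over the (num_chunks x dim) embedding matrix.
+    embs = np.array(all_embeds, dtype="float32")
+    index = faiss.IndexFlatL2(embs.shape[1])
+    index.add(embs)
+    return index
+```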
+
+---
+## Stage 2: Online Query & Answer
+
+We create three Nodes:
+1. `EmbedQuery` – embeds the user’s question.
+2. `RetrieveDocs` – retrieves the most relevant chunk from the index.
+3. `GenerateAnswer` – calls the LLM with the question and the retrieved chunk to produce the final answer.
+
+```python
+class EmbedQuery(Node):
+ def prep(self, shared):
+        return shared["question"]
+
def exec(self, question):
- q_emb = get_embedding(question)
- idx, _ = search_index(shared["search_index"], q_emb, top_k=1)
- best_id = idx[0][0]
- relevant_text = shared["texts"][best_id]
- prompt = f"Question: {question}\nContext: {relevant_text}\nAnswer:"
+ return get_embedding(question)
+
+ def post(self, shared, prep_res, q_emb):
+ shared["q_emb"] = q_emb
+
+class RetrieveDocs(Node):
+ def prep(self, shared):
+ # We'll need the query embedding, plus the offline index/chunks
+ return shared["q_emb"], shared["index"], shared["all_chunks"]
+
+ def exec(self, inputs):
+ q_emb, index, chunks = inputs
+ I, D = search_index(index, q_emb, top_k=1)
+ best_id = I[0][0]
+ relevant_chunk = chunks[best_id]
+ return relevant_chunk
+
+ def post(self, shared, prep_res, relevant_chunk):
+ shared["retrieved_chunk"] = relevant_chunk
+ print("Retrieved chunk:", relevant_chunk[:60], "...")
+
+class GenerateAnswer(Node):
+ def prep(self, shared):
+ return shared["question"], shared["retrieved_chunk"]
+
+ def exec(self, inputs):
+ question, chunk = inputs
+ prompt = f"Question: {question}\nContext: {chunk}\nAnswer:"
return call_llm(prompt)
- def post(self, shared, p, answer):
+
+    def post(self, shared, prep_res, answer):
+ shared["answer"] = answer
print("Answer:", answer)
-############################################
-# Wire up the flow
-prep = PrepareEmbeddings()
-qa = AnswerQuestion()
-prep >> qa
+
+# Wire them in sequence
+embed_qnode = EmbedQuery()
+retrieve_node = RetrieveDocs()
+generate_node = GenerateAnswer()
-flow = Flow(start=prep)
+
+embed_qnode >> retrieve_node >> generate_node
+
+OnlineFlow = Flow(start=embed_qnode)
+```
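+
+The online nodes additionally assume `search_index` and `call_llm`. Continuing the FAISS/OpenAI sketch above (again an assumption; it reuses `client` and `np` from that sketch), with `search_index` returning indices first to match `I, D = search_index(...)` in `RetrieveDocs`:
+
+```python
+def search_index(index, q_emb, top_k=1):
+    # FAISS returns (distances, indices); reorder to (indices, distances).
+    D, I = index.search(np.array([q_emb], dtype="float32"), top_k)
+    return I, D
+
+def call_llm(prompt):
+    resp = client.chat.completions.create(
+        model="gpt-4o-mini",
+        messages=[{"role": "user", "content": prompt}],
+    )
+    return resp.choices[0].message.content
+```
+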
-# Example usage
-shared = {"texts": ["I love apples", "Cats are great", "The sky is blue"]}
-flow.run(shared)
+Usage example:
+
+```python
+# Suppose we already ran OfflineFlow and have:
+# shared["all_chunks"], shared["index"], etc.
+shared["question"] = "Why do people like cats?"
+
+OnlineFlow.run(shared)
+# final answer in shared["answer"]
```
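+
+Putting the two stages together is just a matter of running both flows against the same shared store, for example:
+
+```python
+shared = {"files": ["doc1.txt", "doc2.txt"]}
+OfflineFlow.run(shared)   # builds shared["all_chunks"], shared["all_embeds"], shared["index"]
+
+shared["question"] = "Why do people like cats?"
+OnlineFlow.run(shared)    # writes shared["answer"]
+print(shared["answer"])
+```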
\ No newline at end of file