From 337dcc0d66781057fcea33de18bbc919fa324651 Mon Sep 17 00:00:00 2001
From: zachary62
Date: Sun, 30 Mar 2025 19:57:57 -0400
Subject: [PATCH] update rag tutorial

---
 cookbook/pocketflow-rag/README.md | 17 ++++++++++++-----
 cookbook/pocketflow-rag/flow.py   |  7 ++++---
 cookbook/pocketflow-rag/nodes.py  | 24 +++++++++++++++++++++++-
 cookbook/pocketflow-rag/utils.py  |  5 +++++
 4 files changed, 44 insertions(+), 9 deletions(-)

diff --git a/cookbook/pocketflow-rag/README.md b/cookbook/pocketflow-rag/README.md
index 3383f88..4d35994 100644
--- a/cookbook/pocketflow-rag/README.md
+++ b/cookbook/pocketflow-rag/README.md
@@ -4,6 +4,7 @@ This project demonstrates a simplified RAG system that retrieves relevant docume
 
 ## Features
 
+- Document chunking for better retrieval granularity
 - Simple vector-based document retrieval
 - Two-stage pipeline (offline indexing, online querying)
 - FAISS-powered similarity search
@@ -57,6 +58,7 @@ The magic happens through a two-stage pipeline implemented with PocketFlow:
 ```mermaid
 graph TD
     subgraph OfflineFlow[Offline Document Indexing]
+        ChunkDocs[ChunkDocumentsNode] --> EmbedDocs[EmbedDocumentsNode]
         EmbedDocs[EmbedDocumentsNode] --> CreateIndex[CreateIndexNode]
     end
 
@@ -66,14 +68,19 @@ graph TD
 ```
 
 Here's what each part does:
-1. **EmbedDocumentsNode**: Converts documents into vector representations
-2. **CreateIndexNode**: Creates a searchable FAISS index from embeddings
-3. **EmbedQueryNode**: Converts user query into the same vector space
-4. **RetrieveDocumentNode**: Finds the most similar document using vector search
+1. **ChunkDocumentsNode**: Splits documents into smaller chunks for more granular retrieval
+2. **EmbedDocumentsNode**: Converts document chunks into vector representations
+3. **CreateIndexNode**: Creates a searchable FAISS index from embeddings
+4. **EmbedQueryNode**: Converts user query into the same vector space
+5. **RetrieveDocumentNode**: Finds the most similar document chunk using vector search
 
 ## Example Output
 
 ```
+==================================================
+PocketFlow RAG Document Retrieval
+==================================================
+✅ Created 5 chunks from 5 documents
 ✅ Created 5 document embeddings
 🔍 Creating search index...
 ✅ Index created with 5 vectors
@@ -88,4 +95,4 @@ Here's what each part does:
 - [`main.py`](./main.py): Main entry point for running the RAG demonstration
 - [`flow.py`](./flow.py): Configures the flows that connect the nodes
 - [`nodes.py`](./nodes.py): Defines the nodes for document processing and retrieval
-- [`utils.py`](./utils.py): Utility functions including the embedding function
+- [`utils.py`](./utils.py): Utility functions including chunking and embedding functions
\ No newline at end of file
diff --git a/cookbook/pocketflow-rag/flow.py b/cookbook/pocketflow-rag/flow.py
index a022237..a2df4be 100644
--- a/cookbook/pocketflow-rag/flow.py
+++ b/cookbook/pocketflow-rag/flow.py
@@ -1,12 +1,13 @@
 from pocketflow import Flow
-from nodes import EmbedDocumentsNode, CreateIndexNode, EmbedQueryNode, RetrieveDocumentNode
+from nodes import EmbedDocumentsNode, CreateIndexNode, EmbedQueryNode, RetrieveDocumentNode, ChunkDocumentsNode
 
 def get_offline_flow():
     # Create offline flow for document indexing
+    chunk_docs_node = ChunkDocumentsNode()
     embed_docs_node = EmbedDocumentsNode()
     create_index_node = CreateIndexNode()
-    embed_docs_node >> create_index_node
-    offline_flow = Flow(start=embed_docs_node)
+    chunk_docs_node >> embed_docs_node >> create_index_node
+    offline_flow = Flow(start=chunk_docs_node)
     return offline_flow
 
 def get_online_flow():
diff --git a/cookbook/pocketflow-rag/nodes.py b/cookbook/pocketflow-rag/nodes.py
index d0e5519..1526a18 100644
--- a/cookbook/pocketflow-rag/nodes.py
+++ b/cookbook/pocketflow-rag/nodes.py
@@ -1,9 +1,31 @@
 from pocketflow import Node, Flow, BatchNode
 import numpy as np
 import faiss
-from utils import get_embedding, get_openai_embedding
+from utils import get_embedding, get_openai_embedding, fixed_size_chunk
 
 # Nodes for the offline flow
+class ChunkDocumentsNode(BatchNode):
+    def prep(self, shared):
+        """Read texts from shared store"""
+        return shared["texts"]
+
+    def exec(self, text):
+        """Chunk a single text into smaller pieces"""
+        return fixed_size_chunk(text)
+
+    def post(self, shared, prep_res, exec_res_list):
+        """Store chunked texts in the shared store"""
+        # Flatten the list of lists into a single list of chunks
+        all_chunks = []
+        for chunks in exec_res_list:
+            all_chunks.extend(chunks)
+
+        # Replace the original texts with the flat list of chunks
+        shared["texts"] = all_chunks
+
+        print(f"✅ Created {len(all_chunks)} chunks from {len(prep_res)} documents")
+        return "default"
+
 class EmbedDocumentsNode(BatchNode):
     def prep(self, shared):
         """Read texts from shared store and return as an iterable"""
diff --git a/cookbook/pocketflow-rag/utils.py b/cookbook/pocketflow-rag/utils.py
index 4ced69b..415c6fe 100644
--- a/cookbook/pocketflow-rag/utils.py
+++ b/cookbook/pocketflow-rag/utils.py
@@ -41,6 +41,11 @@ def get_openai_embedding(text):
     # Convert to numpy array for consistency with other embedding functions
     return np.array(embedding, dtype=np.float32)
 
+def fixed_size_chunk(text, chunk_size=2000):
+    chunks = []
+    for i in range(0, len(text), chunk_size):
+        chunks.append(text[i : i + chunk_size])
+    return chunks
 
 if __name__ == "__main__":
     # Test the embedding function