From 25b742e29aca14c7c771a5a9e8b40b6b853607f5 Mon Sep 17 00:00:00 2001 From: zachary62 Date: Thu, 20 Mar 2025 13:20:10 -0400 Subject: [PATCH] update parallel tutorial --- .cursorrules | 1096 ++--------------- cookbook/parallel_exp.ipynb | 162 +-- .../pocketflow-parallel-batch-node/README.md | 105 -- .../data/article1.txt | 1 - .../data/article2.txt | 1 - .../data/article3.txt | 1 - .../data/summaries.txt | 3 - .../pocketflow-parallel-batch-node/flow.py | 24 - .../pocketflow-parallel-batch-node/main.py | 19 - .../pocketflow-parallel-batch-node/nodes.py | 48 - .../requirements.txt | 4 - .../pocketflow-parallel-batch-node/utils.py | 53 - cookbook/pocketflow-parallel-batch/README.md | 41 + cookbook/pocketflow-parallel-batch/main.py | 103 ++ .../requirements.txt | 1 + 15 files changed, 260 insertions(+), 1402 deletions(-) delete mode 100644 cookbook/pocketflow-parallel-batch-node/README.md delete mode 100644 cookbook/pocketflow-parallel-batch-node/data/article1.txt delete mode 100644 cookbook/pocketflow-parallel-batch-node/data/article2.txt delete mode 100644 cookbook/pocketflow-parallel-batch-node/data/article3.txt delete mode 100644 cookbook/pocketflow-parallel-batch-node/data/summaries.txt delete mode 100644 cookbook/pocketflow-parallel-batch-node/flow.py delete mode 100644 cookbook/pocketflow-parallel-batch-node/main.py delete mode 100644 cookbook/pocketflow-parallel-batch-node/nodes.py delete mode 100644 cookbook/pocketflow-parallel-batch-node/requirements.txt delete mode 100644 cookbook/pocketflow-parallel-batch-node/utils.py create mode 100644 cookbook/pocketflow-parallel-batch/README.md create mode 100644 cookbook/pocketflow-parallel-batch/main.py create mode 100644 cookbook/pocketflow-parallel-batch/requirements.txt diff --git a/.cursorrules b/.cursorrules index 13b5dd4..98255b1 100644 --- a/.cursorrules +++ b/.cursorrules @@ -64,15 +64,32 @@ Agentic Coding should be a collaboration between Human System Design and Agent I - **NOTE**: *LLM-based tasks* (e.g., 
summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are *core functions* internal in the AI system. - For each utility function, implement it and write a simple test. - Document their input/output, as well as why they are necessary. For example: - - *Name*: Embedding (`utils/get_embedding.py`) - - *Input*: `str` - - *Output*: a vector of 3072 floats - - *Necessity:* Used by the second node to embed text + - `name`: `get_embedding` (`utils/get_embedding.py`) + - `input`: `str` + - `output`: a vector of 3072 floats + - `necessity`: Used by the second node to embed text + - Example utility implementation: + ```python + # utils/call_llm.py + from openai import OpenAI + + def call_llm(prompt): + client = OpenAI(api_key="YOUR_API_KEY_HERE") + r = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": prompt}] + ) + return r.choices[0].message.content + + if __name__ == "__main__": + prompt = "What is the meaning of life?" + print(call_llm(prompt)) + ``` - > **Sometimes, design Utilies before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them. {: .best-practice } 4. **Node Design**: Plan how each node will read and write data, and use utility functions. - - One core design principle for PocketFlow is to use a shared store, so start with a shared store design: + - One core design principle for PocketFlow is to use a [shared store](./core_abstraction/communication.md), so start with a shared store design: - For simple systems, use an in-memory dictionary. - For more complex systems or when persistence is required, use a database. - **Don't Repeat Yourself**: Use in-memory references or foreign keys. 
@@ -89,7 +106,7 @@ Agentic Coding should be a collaboration between Human System Design and Agent I "results": {} # Empty dict to store outputs } ``` - - For each node, describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level without codes. For example: + - For each [Node](./core_abstraction/node.md), describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level without code. For example: - `type`: Regular (or Batch, or Async) - `prep`: Read "text" from the shared store - `exec`: Call the embedding utility function @@ -133,12 +150,71 @@ my_project/ └── design.md ``` -- **`docs/design.md`**: Contains project documentation for each step above. This should be high-level and no-code. +- **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*. - **`utils/`**: Contains all utility functions. - It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`. - Each file should also include a `main()` function to try that API call - **`flow.py`**: Implements the system's flow, starting with node definitions followed by the overall structure. 
+ ```python + # flow.py + from pocketflow import Node, Flow + from utils.call_llm import call_llm + + # Example with two nodes in a flow + class GetQuestionNode(Node): + def exec(self, _): + # Get question directly from user input + user_question = input("Enter your question: ") + return user_question + + def post(self, shared, prep_res, exec_res): + # Store the user's question + shared["question"] = exec_res + return "default" # Go to the next node + + class AnswerNode(Node): + def prep(self, shared): + # Read question from shared + return shared["question"] + + def exec(self, question): + # Call LLM to get the answer + return call_llm(question) + + def post(self, shared, prep_res, exec_res): + # Store the answer in shared + shared["answer"] = exec_res + + # Create nodes + get_question_node = GetQuestionNode() + answer_node = AnswerNode() + + # Connect nodes in sequence + get_question_node >> answer_node + + # Create flow starting with input node + qa_flow = Flow(start=get_question_node) + ``` - **`main.py`**: Serves as the project's entry point. 
+ ```python + # main.py + from flow import qa_flow + + # Example main function + # Please replace this with your own main function + def main(): + shared = { + "question": None, # Will be populated by GetQuestionNode from user input + "answer": None # Will be populated by AnswerNode + } + + qa_flow.run(shared) + print(f"Question: {shared['question']}") + print(f"Answer: {shared['answer']}") + + if __name__ == "__main__": + main() + ``` ================================================ File: docs/index.md @@ -1091,196 +1167,6 @@ print("Individual Summaries:", shared["file_summaries"]) print("\nFinal Summary:\n", shared["all_files_summary"]) ``` -================================================ -File: docs/design_pattern/multi_agent.md -================================================ ---- -layout: default -title: "(Advanced) Multi-Agents" -parent: "Design Pattern" -nav_order: 6 ---- - -# (Advanced) Multi-Agents - -Multiple [Agents](./flow.md) can work together by handling subtasks and communicating the progress. -Communication between agents is typically implemented using message queues in shared storage. - -> Most of time, you don't need Multi-Agents. Start with a simple solution first. -{: .best-practice } - -### Example Agent Communication: Message Queue - -Here's a simple example showing how to implement agent communication using `asyncio.Queue`. 
-The agent listens for messages, processes them, and continues listening: - -```python -class AgentNode(AsyncNode): - async def prep_async(self, _): - message_queue = self.params["messages"] - message = await message_queue.get() - print(f"Agent received: {message}") - return message - -# Create node and flow -agent = AgentNode() -agent >> agent # connect to self -flow = AsyncFlow(start=agent) - -# Create heartbeat sender -async def send_system_messages(message_queue): - counter = 0 - messages = [ - "System status: all systems operational", - "Memory usage: normal", - "Network connectivity: stable", - "Processing load: optimal" - ] - - while True: - message = f"{messages[counter % len(messages)]} | timestamp_{counter}" - await message_queue.put(message) - counter += 1 - await asyncio.sleep(1) - -async def main(): - message_queue = asyncio.Queue() - shared = {} - flow.set_params({"messages": message_queue}) - - # Run both coroutines - await asyncio.gather( - flow.run_async(shared), - send_system_messages(message_queue) - ) - -asyncio.run(main()) -``` - -The output: - -``` -Agent received: System status: all systems operational | timestamp_0 -Agent received: Memory usage: normal | timestamp_1 -Agent received: Network connectivity: stable | timestamp_2 -Agent received: Processing load: optimal | timestamp_3 -``` - -### Interactive Multi-Agent Example: Taboo Game - -Here's a more complex example where two agents play the word-guessing game Taboo. 
-One agent provides hints while avoiding forbidden words, and another agent tries to guess the target word: - -```python -class AsyncHinter(AsyncNode): - async def prep_async(self, shared): - guess = await shared["hinter_queue"].get() - if guess == "GAME_OVER": - return None - return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", []) - - async def exec_async(self, inputs): - if inputs is None: - return None - target, forbidden, past_guesses = inputs - prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}" - if past_guesses: - prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific." - prompt += "\nUse at most 5 words." - - hint = call_llm(prompt) - print(f"\nHinter: Here's your hint - {hint}") - return hint - - async def post_async(self, shared, prep_res, exec_res): - if exec_res is None: - return "end" - await shared["guesser_queue"].put(exec_res) - return "continue" - -class AsyncGuesser(AsyncNode): - async def prep_async(self, shared): - hint = await shared["guesser_queue"].get() - return hint, shared.get("past_guesses", []) - - async def exec_async(self, inputs): - hint, past_guesses = inputs - prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess. 
Directly reply a single word:" - guess = call_llm(prompt) - print(f"Guesser: I guess it's - {guess}") - return guess - - async def post_async(self, shared, prep_res, exec_res): - if exec_res.lower() == shared["target_word"].lower(): - print("Game Over - Correct guess!") - await shared["hinter_queue"].put("GAME_OVER") - return "end" - - if "past_guesses" not in shared: - shared["past_guesses"] = [] - shared["past_guesses"].append(exec_res) - - await shared["hinter_queue"].put(exec_res) - return "continue" - -async def main(): - # Set up game - shared = { - "target_word": "nostalgia", - "forbidden_words": ["memory", "past", "remember", "feeling", "longing"], - "hinter_queue": asyncio.Queue(), - "guesser_queue": asyncio.Queue() - } - - print("Game starting!") - print(f"Target word: {shared['target_word']}") - print(f"Forbidden words: {shared['forbidden_words']}") - - # Initialize by sending empty guess to hinter - await shared["hinter_queue"].put("") - - # Create nodes and flows - hinter = AsyncHinter() - guesser = AsyncGuesser() - - # Set up flows - hinter_flow = AsyncFlow(start=hinter) - guesser_flow = AsyncFlow(start=guesser) - - # Connect nodes to themselves - hinter - "continue" >> hinter - guesser - "continue" >> guesser - - # Run both agents concurrently - await asyncio.gather( - hinter_flow.run_async(shared), - guesser_flow.run_async(shared) - ) - -asyncio.run(main()) -``` - -The Output: - -``` -Game starting! -Target word: nostalgia -Forbidden words: ['memory', 'past', 'remember', 'feeling', 'longing'] - -Hinter: Here's your hint - Thinking of childhood summer days -Guesser: I guess it's - popsicle - -Hinter: Here's your hint - When childhood cartoons make you emotional -Guesser: I guess it's - nostalgic - -Hinter: Here's your hint - When old songs move you -Guesser: I guess it's - memories - -Hinter: Here's your hint - That warm emotion about childhood -Guesser: I guess it's - nostalgia -Game Over - Correct guess! 
-``` - ================================================ File: docs/design_pattern/rag.md ================================================ @@ -1621,178 +1507,6 @@ writing_flow.run(shared) For *dynamic cases*, consider using [Agents](./agent.md). -================================================ -File: docs/utility_function/chunking.md -================================================ ---- -layout: default -title: "Text Chunking" -parent: "Utility Function" -nav_order: 4 ---- - -# Text Chunking - -We recommend some implementations of commonly used text chunking approaches. - - -> Text Chunking is more a micro optimization, compared to the Flow Design. -> -> It's recommended to start with the Naive Chunking and optimize later. -{: .best-practice } - ---- - -## Example Python Code Samples - -### 1. Naive (Fixed-Size) Chunking -Splits text by a fixed number of words, ignoring sentence or semantic boundaries. - -```python -def fixed_size_chunk(text, chunk_size=100): - chunks = [] - for i in range(0, len(text), chunk_size): - chunks.append(text[i : i + chunk_size]) - return chunks -``` - -However, sentences are often cut awkwardly, losing coherence. - -### 2. Sentence-Based Chunking - -```python -import nltk - -def sentence_based_chunk(text, max_sentences=2): - sentences = nltk.sent_tokenize(text) - chunks = [] - for i in range(0, len(sentences), max_sentences): - chunks.append(" ".join(sentences[i : i + max_sentences])) - return chunks -``` - -However, might not handle very long sentences or paragraphs well. - -### 3. Other Chunking - -- **Paragraph-Based**: Split text by paragraphs (e.g., newlines). Large paragraphs can create big chunks. -- **Semantic**: Use embeddings or topic modeling to chunk by semantic boundaries. -- **Agentic**: Use an LLM to decide chunk boundaries based on context or meaning. 
- -================================================ -File: docs/utility_function/embedding.md -================================================ ---- -layout: default -title: "Embedding" -parent: "Utility Function" -nav_order: 5 ---- - -# Embedding - -Below you will find an overview table of various text embedding APIs, along with example Python code. - -> Embedding is more a micro optimization, compared to the Flow Design. -> -> It's recommended to start with the most convenient one and optimize later. -{: .best-practice } - - -| **API** | **Free Tier** | **Pricing Model** | **Docs** | -| --- | --- | --- | --- | -| **OpenAI** | ~$5 credit | ~$0.0001/1K tokens | [OpenAI Embeddings](https://platform.openai.com/docs/api-reference/embeddings) | -| **Azure OpenAI** | $200 credit | Same as OpenAI (~$0.0001/1K tokens) | [Azure OpenAI Embeddings](https://learn.microsoft.com/azure/cognitive-services/openai/how-to/create-resource?tabs=portal) | -| **Google Vertex AI** | $300 credit | ~$0.025 / million chars | [Vertex AI Embeddings](https://cloud.google.com/vertex-ai/docs/generative-ai/embeddings/get-text-embeddings) | -| **AWS Bedrock** | No free tier, but AWS credits may apply | ~$0.00002/1K tokens (Titan V2) | [Amazon Bedrock](https://docs.aws.amazon.com/bedrock/) | -| **Cohere** | Limited free tier | ~$0.0001/1K tokens | [Cohere Embeddings](https://docs.cohere.com/docs/cohere-embed) | -| **Hugging Face** | ~$0.10 free compute monthly | Pay per second of compute | [HF Inference API](https://huggingface.co/docs/api-inference) | -| **Jina** | 1M tokens free | Pay per token after | [Jina Embeddings](https://jina.ai/embeddings/) | - -## Example Python Code - -### 1. OpenAI -```python -import openai - -openai.api_key = "YOUR_API_KEY" -resp = openai.Embedding.create(model="text-embedding-ada-002", input="Hello world") -vec = resp["data"][0]["embedding"] -print(vec) -``` - -### 2. 
Azure OpenAI -```python -import openai - -openai.api_type = "azure" -openai.api_base = "https://YOUR_RESOURCE_NAME.openai.azure.com" -openai.api_version = "2023-03-15-preview" -openai.api_key = "YOUR_AZURE_API_KEY" - -resp = openai.Embedding.create(engine="ada-embedding", input="Hello world") -vec = resp["data"][0]["embedding"] -print(vec) -``` - -### 3. Google Vertex AI -```python -from vertexai.preview.language_models import TextEmbeddingModel -import vertexai - -vertexai.init(project="YOUR_GCP_PROJECT_ID", location="us-central1") -model = TextEmbeddingModel.from_pretrained("textembedding-gecko@001") - -emb = model.get_embeddings(["Hello world"]) -print(emb[0]) -``` - -### 4. AWS Bedrock -```python -import boto3, json - -client = boto3.client("bedrock-runtime", region_name="us-east-1") -body = {"inputText": "Hello world"} -resp = client.invoke_model(modelId="amazon.titan-embed-text-v2:0", contentType="application/json", body=json.dumps(body)) -resp_body = json.loads(resp["body"].read()) -vec = resp_body["embedding"] -print(vec) -``` - -### 5. Cohere -```python -import cohere - -co = cohere.Client("YOUR_API_KEY") -resp = co.embed(texts=["Hello world"]) -vec = resp.embeddings[0] -print(vec) -``` - -### 6. Hugging Face -```python -import requests - -API_URL = "https://api-inference.huggingface.co/models/sentence-transformers/all-MiniLM-L6-v2" -HEADERS = {"Authorization": "Bearer YOUR_HF_TOKEN"} - -res = requests.post(API_URL, headers=HEADERS, json={"inputs": "Hello world"}) -vec = res.json()[0] -print(vec) -``` - -### 7. 
Jina -```python -import requests - -url = "https://api.jina.ai/v2/embed" -headers = {"Authorization": "Bearer YOUR_JINA_TOKEN"} -payload = {"data": ["Hello world"], "model": "jina-embeddings-v3"} -res = requests.post(url, headers=headers, json=payload) -vec = res.json()["data"][0]["embedding"] -print(vec) -``` - ================================================ File: docs/utility_function/llm.md ================================================ @@ -1937,646 +1651,4 @@ def call_llm(prompt): response = ... # Your implementation here logging.info(f"Response: {response}") return response -``` - -================================================ -File: docs/utility_function/text_to_speech.md -================================================ ---- -layout: default -title: "Text-to-Speech" -parent: "Utility Function" -nav_order: 7 ---- - -# Text-to-Speech - -| **Service** | **Free Tier** | **Pricing Model** | **Docs** | -|----------------------|-----------------------|--------------------------------------------------------------|---------------------------------------------------------------------| -| **Amazon Polly** | 5M std + 1M neural | ~$4 /M (std), ~$16 /M (neural) after free tier | [Polly Docs](https://aws.amazon.com/polly/) | -| **Google Cloud TTS** | 4M std + 1M WaveNet | ~$4 /M (std), ~$16 /M (WaveNet) pay-as-you-go | [Cloud TTS Docs](https://cloud.google.com/text-to-speech) | -| **Azure TTS** | 500K neural ongoing | ~$15 /M (neural), discount at higher volumes | [Azure TTS Docs](https://azure.microsoft.com/products/cognitive-services/text-to-speech/) | -| **IBM Watson TTS** | 10K chars Lite plan | ~$0.02 /1K (i.e. ~$20 /M). Enterprise options available | [IBM Watson Docs](https://www.ibm.com/cloud/watson-text-to-speech) | -| **ElevenLabs** | 10K chars monthly | From ~$5/mo (30K chars) up to $330/mo (2M chars). 
Enterprise | [ElevenLabs Docs](https://elevenlabs.io) | - -## Example Python Code - -### Amazon Polly -```python -import boto3 - -polly = boto3.client("polly", region_name="us-east-1", - aws_access_key_id="YOUR_AWS_ACCESS_KEY_ID", - aws_secret_access_key="YOUR_AWS_SECRET_ACCESS_KEY") - -resp = polly.synthesize_speech( - Text="Hello from Polly!", - OutputFormat="mp3", - VoiceId="Joanna" -) - -with open("polly.mp3", "wb") as f: - f.write(resp["AudioStream"].read()) -``` - -### Google Cloud TTS -```python -from google.cloud import texttospeech - -client = texttospeech.TextToSpeechClient() -input_text = texttospeech.SynthesisInput(text="Hello from Google Cloud TTS!") -voice = texttospeech.VoiceSelectionParams(language_code="en-US") -audio_cfg = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3) - -resp = client.synthesize_speech(input=input_text, voice=voice, audio_config=audio_cfg) - -with open("gcloud_tts.mp3", "wb") as f: - f.write(resp.audio_content) -``` - -### Azure TTS -```python -import azure.cognitiveservices.speech as speechsdk - -speech_config = speechsdk.SpeechConfig( - subscription="AZURE_KEY", region="AZURE_REGION") -audio_cfg = speechsdk.audio.AudioConfig(filename="azure_tts.wav") - -synthesizer = speechsdk.SpeechSynthesizer( - speech_config=speech_config, - audio_config=audio_cfg -) - -synthesizer.speak_text_async("Hello from Azure TTS!").get() -``` - -### IBM Watson TTS -```python -from ibm_watson import TextToSpeechV1 -from ibm_cloud_sdk_core.authenticators import IAMAuthenticator - -auth = IAMAuthenticator("IBM_API_KEY") -service = TextToSpeechV1(authenticator=auth) -service.set_service_url("IBM_SERVICE_URL") - -resp = service.synthesize( - "Hello from IBM Watson!", - voice="en-US_AllisonV3Voice", - accept="audio/mp3" -).get_result() - -with open("ibm_tts.mp3", "wb") as f: - f.write(resp.content) -``` - -### ElevenLabs -```python -import requests - -api_key = "ELEVENLABS_KEY" -voice_id = "ELEVENLABS_VOICE" -url = 
f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}" -headers = {"xi-api-key": api_key, "Content-Type": "application/json"} - -json_data = { - "text": "Hello from ElevenLabs!", - "voice_settings": {"stability": 0.75, "similarity_boost": 0.75} -} - -resp = requests.post(url, headers=headers, json=json_data) - -with open("elevenlabs.mp3", "wb") as f: - f.write(resp.content) -``` - -================================================ -File: docs/utility_function/vector.md -================================================ ---- -layout: default -title: "Vector Databases" -parent: "Utility Function" -nav_order: 6 ---- - -# Vector Databases - - -Below is a table of the popular vector search solutions: - -| **Tool** | **Free Tier** | **Pricing Model** | **Docs** | -| --- | --- | --- | --- | -| **FAISS** | N/A, self-host | Open-source | [Faiss.ai](https://faiss.ai) | -| **Pinecone** | 2GB free | From $25/mo | [pinecone.io](https://pinecone.io) | -| **Qdrant** | 1GB free cloud | Pay-as-you-go | [qdrant.tech](https://qdrant.tech) | -| **Weaviate** | 14-day sandbox | From $25/mo | [weaviate.io](https://weaviate.io) | -| **Milvus** | 5GB free cloud | PAYG or $99/mo dedicated | [milvus.io](https://milvus.io) | -| **Chroma** | N/A, self-host | Free (Apache 2.0) | [trychroma.com](https://trychroma.com) | -| **Redis** | 30MB free | From $5/mo | [redis.io](https://redis.io) | - ---- -## Example Python Code - -Below are basic usage snippets for each tool. 
- -### FAISS -```python -import faiss -import numpy as np - -# Dimensionality of embeddings -d = 128 - -# Create a flat L2 index -index = faiss.IndexFlatL2(d) - -# Random vectors -data = np.random.random((1000, d)).astype('float32') -index.add(data) - -# Query -query = np.random.random((1, d)).astype('float32') -D, I = index.search(query, k=5) - -print("Distances:", D) -print("Neighbors:", I) -``` - -### Pinecone -```python -import pinecone - -pinecone.init(api_key="YOUR_API_KEY", environment="YOUR_ENV") - -index_name = "my-index" - -# Create the index if it doesn't exist -if index_name not in pinecone.list_indexes(): - pinecone.create_index(name=index_name, dimension=128) - -# Connect -index = pinecone.Index(index_name) - -# Upsert -vectors = [ - ("id1", [0.1]*128), - ("id2", [0.2]*128) -] -index.upsert(vectors) - -# Query -response = index.query([[0.15]*128], top_k=3) -print(response) -``` - -### Qdrant -```python -import qdrant_client -from qdrant_client.models import Distance, VectorParams, PointStruct - -client = qdrant_client.QdrantClient( - url="https://YOUR-QDRANT-CLOUD-ENDPOINT", - api_key="YOUR_API_KEY" -) - -collection = "my_collection" -client.recreate_collection( - collection_name=collection, - vectors_config=VectorParams(size=128, distance=Distance.COSINE) -) - -points = [ - PointStruct(id=1, vector=[0.1]*128, payload={"type": "doc1"}), - PointStruct(id=2, vector=[0.2]*128, payload={"type": "doc2"}), -] - -client.upsert(collection_name=collection, points=points) - -results = client.search( - collection_name=collection, - query_vector=[0.15]*128, - limit=2 -) -print(results) -``` - -### Weaviate -```python -import weaviate - -client = weaviate.Client("https://YOUR-WEAVIATE-CLOUD-ENDPOINT") - -schema = { - "classes": [ - { - "class": "Article", - "vectorizer": "none" - } - ] -} -client.schema.create(schema) - -obj = { - "title": "Hello World", - "content": "Weaviate vector search" -} -client.data_object.create(obj, "Article", vector=[0.1]*128) - -resp = 
( - client.query - .get("Article", ["title", "content"]) - .with_near_vector({"vector": [0.15]*128}) - .with_limit(3) - .do() -) -print(resp) -``` - -### Milvus -```python -from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection -import numpy as np - -connections.connect(alias="default", host="localhost", port="19530") - -fields = [ - FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), - FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128) -] -schema = CollectionSchema(fields) -collection = Collection("MyCollection", schema) - -emb = np.random.rand(10, 128).astype('float32') -ids = list(range(10)) -collection.insert([ids, emb]) - -index_params = { - "index_type": "IVF_FLAT", - "params": {"nlist": 128}, - "metric_type": "L2" -} -collection.create_index("embedding", index_params) -collection.load() - -query_emb = np.random.rand(1, 128).astype('float32') -results = collection.search(query_emb, "embedding", param={"nprobe": 10}, limit=3) -print(results) -``` - -### Chroma -```python -import chromadb -from chromadb.config import Settings - -client = chromadb.Client(Settings( - chroma_db_impl="duckdb+parquet", - persist_directory="./chroma_data" -)) - -coll = client.create_collection("my_collection") - -vectors = [[0.1, 0.2, 0.3], [0.2, 0.2, 0.2]] -metas = [{"doc": "text1"}, {"doc": "text2"}] -ids = ["id1", "id2"] -coll.add(embeddings=vectors, metadatas=metas, ids=ids) - -res = coll.query(query_embeddings=[[0.15, 0.25, 0.3]], n_results=2) -print(res) -``` - -### Redis -```python -import redis -import struct - -r = redis.Redis(host="localhost", port=6379) - -# Create index -r.execute_command( - "FT.CREATE", "my_idx", "ON", "HASH", - "SCHEMA", "embedding", "VECTOR", "FLAT", "6", - "TYPE", "FLOAT32", "DIM", "128", - "DISTANCE_METRIC", "L2" -) - -# Insert -vec = struct.pack('128f', *[0.1]*128) -r.hset("doc1", mapping={"embedding": vec}) - -# Search -qvec = struct.pack('128f', *[0.15]*128) -q = "*=>[KNN 3 @embedding 
$BLOB AS dist]" -res = r.ft("my_idx").search(q, query_params={"BLOB": qvec}) -print(res.docs) -``` - -================================================ -File: docs/utility_function/viz.md -================================================ ---- -layout: default -title: "Viz and Debug" -parent: "Utility Function" -nav_order: 2 ---- - -# Visualization and Debugging - -Similar to LLM wrappers, we **don't** provide built-in visualization and debugging. Here, we recommend some *minimal* (and incomplete) implementations These examples can serve as a starting point for your own tooling. - -## 1. Visualization with Mermaid - -This code recursively traverses the nested graph, assigns unique IDs to each node, and treats Flow nodes as subgraphs to generate Mermaid syntax for a hierarchical visualization. - -{% raw %} -```python -def build_mermaid(start): - ids, visited, lines = {}, set(), ["graph LR"] - ctr = 1 - def get_id(n): - nonlocal ctr - return ids[n] if n in ids else (ids.setdefault(n, f"N{ctr}"), (ctr := ctr + 1))[0] - def link(a, b): - lines.append(f" {a} --> {b}") - def walk(node, parent=None): - if node in visited: - return parent and link(parent, get_id(node)) - visited.add(node) - if isinstance(node, Flow): - node.start and parent and link(parent, get_id(node.start)) - lines.append(f"\n subgraph sub_flow_{get_id(node)}[{type(node).__name__}]") - node.start and walk(node.start) - for nxt in node.successors.values(): - node.start and walk(nxt, get_id(node.start)) or (parent and link(parent, get_id(nxt))) or walk(nxt) - lines.append(" end\n") - else: - lines.append(f" {(nid := get_id(node))}['{type(node).__name__}']") - parent and link(parent, nid) - [walk(nxt, nid) for nxt in node.successors.values()] - walk(start) - return "\n".join(lines) -``` -{% endraw %} - -For example, suppose we have a complex Flow for data science: - -```python -class DataPrepBatchNode(BatchNode): - def prep(self,shared): return [] -class ValidateDataNode(Node): pass -class 
FeatureExtractionNode(Node): pass -class TrainModelNode(Node): pass -class EvaluateModelNode(Node): pass -class ModelFlow(Flow): pass -class DataScienceFlow(Flow):pass - -feature_node = FeatureExtractionNode() -train_node = TrainModelNode() -evaluate_node = EvaluateModelNode() -feature_node >> train_node >> evaluate_node -model_flow = ModelFlow(start=feature_node) -data_prep_node = DataPrepBatchNode() -validate_node = ValidateDataNode() -data_prep_node >> validate_node >> model_flow -data_science_flow = DataScienceFlow(start=data_prep_node) -result = build_mermaid(start=data_science_flow) -``` - -The code generates a Mermaid diagram: - -```mermaid -graph LR - subgraph sub_flow_N1[DataScienceFlow] - N2['DataPrepBatchNode'] - N3['ValidateDataNode'] - N2 --> N3 - N3 --> N4 - - subgraph sub_flow_N5[ModelFlow] - N4['FeatureExtractionNode'] - N6['TrainModelNode'] - N4 --> N6 - N7['EvaluateModelNode'] - N6 --> N7 - end - - end -``` - -## 2. Call Stack Debugging - -It would be useful to print the Node call stacks for debugging. 
This can be achieved by inspecting the runtime call stack: - -```python -import inspect - -def get_node_call_stack(): - stack = inspect.stack() - node_names = [] - seen_ids = set() - for frame_info in stack[1:]: - local_vars = frame_info.frame.f_locals - if 'self' in local_vars: - caller_self = local_vars['self'] - if isinstance(caller_self, BaseNode) and id(caller_self) not in seen_ids: - seen_ids.add(id(caller_self)) - node_names.append(type(caller_self).__name__) - return node_names -``` - -For example, suppose we have a complex Flow for data science: - -```python -class DataPrepBatchNode(BatchNode): - def prep(self, shared): return [] -class ValidateDataNode(Node): pass -class FeatureExtractionNode(Node): pass -class TrainModelNode(Node): pass -class EvaluateModelNode(Node): - def prep(self, shared): - stack = get_node_call_stack() - print("Call stack:", stack) -class ModelFlow(Flow): pass -class DataScienceFlow(Flow):pass - -feature_node = FeatureExtractionNode() -train_node = TrainModelNode() -evaluate_node = EvaluateModelNode() -feature_node >> train_node >> evaluate_node -model_flow = ModelFlow(start=feature_node) -data_prep_node = DataPrepBatchNode() -validate_node = ValidateDataNode() -data_prep_node >> validate_node >> model_flow -data_science_flow = DataScienceFlow(start=data_prep_node) -data_science_flow.run({}) -``` - -The output would be: `Call stack: ['EvaluateModelNode', 'ModelFlow', 'DataScienceFlow']` - -================================================ -File: docs/utility_function/websearch.md -================================================ ---- -layout: default -title: "Web Search" -parent: "Utility Function" -nav_order: 3 ---- -# Web Search - -We recommend some implementations of commonly used web search tools. 
- -| **API** | **Free Tier** | **Pricing Model** | **Docs** | -|---------------------------------|-----------------------------------------------|-----------------------------------------------------------------|------------------------------------------------------------------------| -| **Google Custom Search JSON API** | 100 queries/day free | $5 per 1000 queries. | [Link](https://developers.google.com/custom-search/v1/overview) | -| **Bing Web Search API** | 1,000 queries/month | $15–$25 per 1,000 queries. | [Link](https://azure.microsoft.com/en-us/services/cognitive-services/bing-web-search-api/) | -| **DuckDuckGo Instant Answer** | Completely free (Instant Answers only, **no URLs**) | No paid plans; usage unlimited, but data is limited | [Link](https://duckduckgo.com/api) | -| **Brave Search API** | 2,000 queries/month free | $3 per 1k queries for Base, $5 per 1k for Pro | [Link](https://brave.com/search/api/) | -| **SerpApi** | 100 searches/month free | Start at $75/month for 5,000 searches| [Link](https://serpapi.com/) | -| **RapidAPI** | Many options | Many options | [Link](https://rapidapi.com/search?term=search&sortBy=ByRelevance) | - -## Example Python Code - -### 1. Google Custom Search JSON API -```python -import requests - -API_KEY = "YOUR_API_KEY" -CX_ID = "YOUR_CX_ID" -query = "example" - -url = "https://www.googleapis.com/customsearch/v1" -params = { - "key": API_KEY, - "cx": CX_ID, - "q": query -} - -response = requests.get(url, params=params) -results = response.json() -print(results) -``` - -### 2. Bing Web Search API -```python -import requests - -SUBSCRIPTION_KEY = "YOUR_BING_API_KEY" -query = "example" - -url = "https://api.bing.microsoft.com/v7.0/search" -headers = {"Ocp-Apim-Subscription-Key": SUBSCRIPTION_KEY} -params = {"q": query} - -response = requests.get(url, headers=headers, params=params) -results = response.json() -print(results) -``` - -### 3. 
DuckDuckGo Instant Answer -```python -import requests - -query = "example" -url = "https://api.duckduckgo.com/" -params = { - "q": query, - "format": "json" -} - -response = requests.get(url, params=params) -results = response.json() -print(results) -``` - -### 4. Brave Search API -```python -import requests - -SUBSCRIPTION_TOKEN = "YOUR_BRAVE_API_TOKEN" -query = "example" - -url = "https://api.search.brave.com/res/v1/web/search" -headers = { - "X-Subscription-Token": SUBSCRIPTION_TOKEN -} -params = { - "q": query -} - -response = requests.get(url, headers=headers, params=params) -results = response.json() -print(results) -``` - -### 5. SerpApi -```python -import requests - -API_KEY = "YOUR_SERPAPI_KEY" -query = "example" - -url = "https://serpapi.com/search" -params = { - "engine": "google", - "q": query, - "api_key": API_KEY -} - -response = requests.get(url, params=params) -results = response.json() -print(results) -``` - -================================================ -File: docs/_config.yml -================================================ -# Basic site settings -title: Pocket Flow -tagline: A 100-line LLM framework -description: Minimalist LLM Framework in 100 Lines, Enabling LLMs to Program Themselves - -# Theme settings -remote_theme: just-the-docs/just-the-docs - -# Navigation -nav_sort: case_sensitive - -# Aux links (shown in upper right) -aux_links: - "View on GitHub": - - "//github.com/the-pocket/PocketFlow" - -# Color scheme -color_scheme: light - -# Author settings -author: - name: Zachary Huang - url: https://www.columbia.edu/~zh2408/ - twitter: ZacharyHuang12 - -# Mermaid settings -mermaid: - version: "9.1.3" # Pick the version you want - # Default configuration - config: | - directionLR - -# Callouts settings -callouts: - warning: - title: Warning - color: red - note: - title: Note - color: blue - best-practice: - title: Best Practice - color: green - -# The custom navigation -nav: - - Home: index.md # Link to your main docs index - - GitHub: 
"https://github.com/the-pocket/PocketFlow" - - Discord: "https://discord.gg/hUHHE9Sa6T" \ No newline at end of file +``` \ No newline at end of file diff --git a/cookbook/parallel_exp.ipynb b/cookbook/parallel_exp.ipynb index 751e7bc..8d9cf3d 100644 --- a/cookbook/parallel_exp.ipynb +++ b/cookbook/parallel_exp.ipynb @@ -1,18 +1,4 @@ { - "nbformat": 4, - "nbformat_minor": 0, - "metadata": { - "colab": { - "provenance": [] - }, - "kernelspec": { - "name": "python3", - "display_name": "Python 3" - }, - "language_info": { - "name": "python" - } - }, "cells": [ { "cell_type": "code", @@ -26,8 +12,8 @@ }, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "Collecting pocketflow\n", " Downloading pocketflow-0.0.1-py3-none-any.whl.metadata (270 bytes)\n", @@ -43,121 +29,16 @@ }, { "cell_type": "code", - "source": [ - "import asyncio\n", - "import time\n", - "\n", - "from pocketflow import AsyncBatchNode, AsyncParallelBatchNode, AsyncFlow\n", - "\n", - "####################################\n", - "# Dummy async function (1s delay)\n", - "####################################\n", - "async def dummy_llm_summarize(text):\n", - " \"\"\"Simulates an async LLM call that takes 1 second.\"\"\"\n", - " await asyncio.sleep(1)\n", - " return f\"Summarized({len(text)} chars)\"\n", - "\n", - "###############################################\n", - "# 1) AsyncBatchNode (sequential) version\n", - "###############################################\n", - "\n", - "class SummariesAsyncNode(AsyncBatchNode):\n", - " \"\"\"\n", - " Processes items sequentially in an async manner.\n", - " The next item won't start until the previous item has finished.\n", - " \"\"\"\n", - "\n", - " async def prep_async(self, shared):\n", - " # Return a list of items to process.\n", - " # Each item is (filename, content).\n", - " return list(shared[\"data\"].items())\n", - "\n", - " async def exec_async(self, item):\n", - " filename, content = item\n", - " 
print(f\"[Sequential] Summarizing {filename}...\")\n", - " summary = await dummy_llm_summarize(content)\n", - " return (filename, summary)\n", - "\n", - " async def post_async(self, shared, prep_res, exec_res_list):\n", - " # exec_res_list is a list of (filename, summary)\n", - " shared[\"sequential_summaries\"] = dict(exec_res_list)\n", - " return \"done_sequential\"\n", - "\n", - "###############################################\n", - "# 2) AsyncParallelBatchNode (concurrent) version\n", - "###############################################\n", - "\n", - "class SummariesAsyncParallelNode(AsyncParallelBatchNode):\n", - " \"\"\"\n", - " Processes items in parallel. Many LLM calls start at once.\n", - " \"\"\"\n", - "\n", - " async def prep_async(self, shared):\n", - " return list(shared[\"data\"].items())\n", - "\n", - " async def exec_async(self, item):\n", - " filename, content = item\n", - " print(f\"[Parallel] Summarizing {filename}...\")\n", - " summary = await dummy_llm_summarize(content)\n", - " return (filename, summary)\n", - "\n", - " async def post_async(self, shared, prep_res, exec_res_list):\n", - " shared[\"parallel_summaries\"] = dict(exec_res_list)\n", - " return \"done_parallel\"\n", - "\n", - "###############################################\n", - "# Demo comparing the two approaches\n", - "###############################################\n", - "\n", - "async def main():\n", - " # We'll use the same data for both flows\n", - " shared_data = {\n", - " \"data\": {\n", - " \"file1.txt\": \"Hello world 1\",\n", - " \"file2.txt\": \"Hello world 2\",\n", - " \"file3.txt\": \"Hello world 3\",\n", - " }\n", - " }\n", - "\n", - " # 1) Run the sequential version\n", - " seq_node = SummariesAsyncNode()\n", - " seq_flow = AsyncFlow(start=seq_node)\n", - "\n", - " print(\"\\n=== Running Sequential (AsyncBatchNode) ===\")\n", - " t0 = time.time()\n", - " await seq_flow.run_async(shared_data)\n", - " t1 = time.time()\n", - "\n", - " # 2) Run the parallel version\n", - 
" par_node = SummariesAsyncParallelNode()\n", - " par_flow = AsyncFlow(start=par_node)\n", - "\n", - " print(\"\\n=== Running Parallel (AsyncParallelBatchNode) ===\")\n", - " t2 = time.time()\n", - " await par_flow.run_async(shared_data)\n", - " t3 = time.time()\n", - "\n", - " # Show times\n", - " print(\"\\n--- Results ---\")\n", - " print(f\"Sequential Summaries: {shared_data.get('sequential_summaries')}\")\n", - " print(f\"Parallel Summaries: {shared_data.get('parallel_summaries')}\")\n", - "\n", - " print(f\"Sequential took: {t1 - t0:.2f} seconds\")\n", - " print(f\"Parallel took: {t3 - t2:.2f} seconds\")\n" - ], + "execution_count": 3, "metadata": { "id": "mHZpGv8txy4L" }, - "execution_count": 3, - "outputs": [] + "outputs": [], + "source": [] }, { "cell_type": "code", - "source": [ - "# if in a py project\n", - "# asyncio.run(main())\n", - "await main()" - ], + "execution_count": 5, "metadata": { "colab": { "base_uri": "https://localhost:8080/" @@ -165,11 +46,10 @@ "id": "zfnhW3f-0W6o", "outputId": "3737e2e5-5cae-4c6b-a894-e880cf338d1f" }, - "execution_count": 5, "outputs": [ { - "output_type": "stream", "name": "stdout", + "output_type": "stream", "text": [ "\n", "=== Running Sequential (AsyncBatchNode) ===\n", @@ -189,16 +69,36 @@ "Parallel took: 1.00 seconds\n" ] } + ], + "source": [ + "# if in a notebook\n", + "await main()\n", + "\n", + "asyncio.run(main())" ] }, { "cell_type": "code", - "source": [], + "execution_count": null, "metadata": { "id": "ystwa74D0Z_k" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [] } - ] -} \ No newline at end of file + ], + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/cookbook/pocketflow-parallel-batch-node/README.md b/cookbook/pocketflow-parallel-batch-node/README.md deleted file mode 100644 index 93e7c25..0000000 
--- a/cookbook/pocketflow-parallel-batch-node/README.md +++ /dev/null @@ -1,105 +0,0 @@ -# PocketFlow Parallel Batch Node Example - -This example demonstrates parallel processing using AsyncParallelBatchNode to summarize multiple news articles concurrently. It shows how to: -1. Process multiple items in parallel -2. Handle I/O-bound tasks efficiently -3. Manage rate limits with throttling - -## What this Example Does - -When you run the example: -1. It loads multiple news articles from a data directory -2. Processes them in parallel using AsyncParallelBatchNode -3. For each article: - - Extracts key information - - Generates a summary using an LLM - - Saves the results -4. Combines all summaries into a final report - -## How it Works - -The example uses AsyncParallelBatchNode to process articles in parallel: - -```python -class ParallelSummarizer(AsyncParallelBatchNode): - async def prep_async(self, shared): - # Return list of articles to process - return shared["articles"] - - async def exec_async(self, article): - # Process single article (called in parallel) - summary = await call_llm_async(f"Summarize: {article}") - return summary - - async def post_async(self, shared, prep_res, summaries): - # Combine all summaries - shared["summaries"] = summaries - return "default" -``` - -Key features demonstrated: -- Parallel execution of `exec_async` -- Rate limiting with semaphores -- Error handling for failed requests -- Progress tracking for parallel tasks - -## Project Structure -``` -pocketflow-parallel-batch-node/ -├── README.md -├── requirements.txt -├── data/ -│ ├── article1.txt -│ ├── article2.txt -│ └── article3.txt -├── main.py -├── flow.py -├── nodes.py -└── utils.py -``` - -## Running the Example - -```bash -# Install dependencies -pip install -r requirements.txt - -# Run the example -python main.py -``` - -## Sample Output -``` -Loading articles... -Found 3 articles to process - -Processing in parallel... -[1/3] Processing article1.txt... 
-[2/3] Processing article2.txt... -[3/3] Processing article3.txt... - -Summaries generated: -1. First article summary... -2. Second article summary... -3. Third article summary... - -Final report saved to: summaries.txt -``` - -## Key Concepts - -1. **Parallel Processing** - - Using AsyncParallelBatchNode for concurrent execution - - Managing parallel tasks efficiently - -2. **Rate Limiting** - - Using semaphores to control concurrent requests - - Avoiding API rate limits - -3. **Error Handling** - - Graceful handling of failed requests - - Retrying failed tasks - -4. **Progress Tracking** - - Monitoring parallel task progress - - Providing user feedback \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch-node/data/article1.txt b/cookbook/pocketflow-parallel-batch-node/data/article1.txt deleted file mode 100644 index 7705397..0000000 --- a/cookbook/pocketflow-parallel-batch-node/data/article1.txt +++ /dev/null @@ -1 +0,0 @@ -Article 1: AI advances in 2024... \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch-node/data/article2.txt b/cookbook/pocketflow-parallel-batch-node/data/article2.txt deleted file mode 100644 index fd40736..0000000 --- a/cookbook/pocketflow-parallel-batch-node/data/article2.txt +++ /dev/null @@ -1 +0,0 @@ -Article 2: New quantum computing breakthrough... \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch-node/data/article3.txt b/cookbook/pocketflow-parallel-batch-node/data/article3.txt deleted file mode 100644 index 84d2f94..0000000 --- a/cookbook/pocketflow-parallel-batch-node/data/article3.txt +++ /dev/null @@ -1 +0,0 @@ -Article 3: Latest developments in robotics... \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch-node/data/summaries.txt b/cookbook/pocketflow-parallel-batch-node/data/summaries.txt deleted file mode 100644 index 21b71f0..0000000 --- a/cookbook/pocketflow-parallel-batch-node/data/summaries.txt +++ /dev/null @@ -1,3 +0,0 @@ -1. 
Summary of: Article 1: AI advances in 2024... -2. Summary of: Article 2: New quantum computi... -3. Summary of: Article 3: Latest developments... diff --git a/cookbook/pocketflow-parallel-batch-node/flow.py b/cookbook/pocketflow-parallel-batch-node/flow.py deleted file mode 100644 index 3bdafd8..0000000 --- a/cookbook/pocketflow-parallel-batch-node/flow.py +++ /dev/null @@ -1,24 +0,0 @@ -"""AsyncFlow implementation for parallel article processing.""" - -from pocketflow import AsyncFlow, Node -from nodes import LoadArticles, ParallelSummarizer - -class NoOp(Node): - """Node that does nothing, used to properly end the flow.""" - pass - -def create_flow(): - """Create and connect nodes into a flow.""" - - # Create nodes - loader = LoadArticles() - summarizer = ParallelSummarizer() - end = NoOp() - - # Connect nodes - loader - "process" >> summarizer - summarizer - "default" >> end # Properly end the flow - - # Create flow starting with loader - flow = AsyncFlow(start=loader) - return flow \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch-node/main.py b/cookbook/pocketflow-parallel-batch-node/main.py deleted file mode 100644 index 2740660..0000000 --- a/cookbook/pocketflow-parallel-batch-node/main.py +++ /dev/null @@ -1,19 +0,0 @@ -import asyncio -from flow import create_flow - -async def main(): - """Run the parallel processing flow.""" - # Create flow - flow = create_flow() - - # Create shared store - shared = {} - - # Run flow - print("\nParallel Article Summarizer") - print("-------------------------") - await flow.run_async(shared) - -if __name__ == "__main__": - # Run the async main function - asyncio.run(main()) \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch-node/nodes.py b/cookbook/pocketflow-parallel-batch-node/nodes.py deleted file mode 100644 index c62e480..0000000 --- a/cookbook/pocketflow-parallel-batch-node/nodes.py +++ /dev/null @@ -1,48 +0,0 @@ -"""AsyncParallelBatchNode implementation for article 
summarization.""" - -from pocketflow import AsyncParallelBatchNode, AsyncNode -from utils import call_llm_async, load_articles, save_summaries - -class LoadArticles(AsyncNode): - """Node that loads articles to process.""" - - async def prep_async(self, shared): - """Load articles from data directory.""" - print("\nLoading articles...") - articles = await load_articles() - return articles - - async def exec_async(self, articles): - """No processing needed.""" - return articles - - async def post_async(self, shared, prep_res, exec_res): - """Store articles in shared store.""" - shared["articles"] = exec_res - print(f"Found {len(exec_res)} articles to process") - return "process" - -class ParallelSummarizer(AsyncParallelBatchNode): - """Node that summarizes articles in parallel.""" - - async def prep_async(self, shared): - """Get articles from shared store.""" - print("\nProcessing in parallel...") - return shared["articles"] - - async def exec_async(self, article): - """Summarize a single article (called in parallel).""" - summary = await call_llm_async(article) - return summary - - async def post_async(self, shared, prep_res, summaries): - """Store summaries and save to file.""" - shared["summaries"] = summaries - - print("\nSummaries generated:") - for i, summary in enumerate(summaries, 1): - print(f"{i}. 
{summary}") - - save_summaries(summaries) - print("\nFinal report saved to: summaries.txt") - return "default" \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch-node/requirements.txt b/cookbook/pocketflow-parallel-batch-node/requirements.txt deleted file mode 100644 index bb2146e..0000000 --- a/cookbook/pocketflow-parallel-batch-node/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -pocketflow -aiohttp>=3.8.0 # For async HTTP requests -openai>=1.0.0 # For async LLM calls -tqdm>=4.65.0 # For progress bars \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch-node/utils.py b/cookbook/pocketflow-parallel-batch-node/utils.py deleted file mode 100644 index 93648d4..0000000 --- a/cookbook/pocketflow-parallel-batch-node/utils.py +++ /dev/null @@ -1,53 +0,0 @@ -"""Utility functions for parallel processing.""" - -import os -import asyncio -import aiohttp -from openai import AsyncOpenAI -from tqdm import tqdm - -# Semaphore to limit concurrent API calls -MAX_CONCURRENT_CALLS = 3 -semaphore = asyncio.Semaphore(MAX_CONCURRENT_CALLS) - -async def call_llm_async(prompt): - """Make async LLM call with rate limiting.""" - async with semaphore: # Limit concurrent calls - print(f"\nProcessing: {prompt[:50]}...") - - # Simulate API call with delay - await asyncio.sleep(1) - - # Mock LLM response (in real app, would call OpenAI) - summary = f"Summary of: {prompt[:30]}..." - return summary - -async def load_articles(): - """Load articles from data directory.""" - # For demo, generate mock articles - articles = [ - "Article 1: AI advances in 2024...", - "Article 2: New quantum computing breakthrough...", - "Article 3: Latest developments in robotics..." 
- ] - - # Create data directory if it doesn't exist - data_dir = "data" - os.makedirs(data_dir, exist_ok=True) - - # Save mock articles to files - for i, content in enumerate(articles, 1): - with open(os.path.join(data_dir, f"article{i}.txt"), "w") as f: - f.write(content) - - return articles - -def save_summaries(summaries): - """Save summaries to output file.""" - # Create data directory if it doesn't exist - data_dir = "data" - os.makedirs(data_dir, exist_ok=True) - - with open(os.path.join(data_dir, "summaries.txt"), "w") as f: - for i, summary in enumerate(summaries, 1): - f.write(f"{i}. {summary}\n") \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch/README.md b/cookbook/pocketflow-parallel-batch/README.md new file mode 100644 index 0000000..7082b59 --- /dev/null +++ b/cookbook/pocketflow-parallel-batch/README.md @@ -0,0 +1,41 @@ +# Sequential vs Parallel Processing + +Demonstrates how AsyncParallelBatchNode accelerates processing by 3x over AsyncBatchNode. + +## Features + +- Processes identical tasks with two approaches +- Compares sequential vs parallel execution time +- Shows 3x speed improvement with parallel processing + +## Run It + +```bash +pip install pocketflow +python main.py +``` + +## Output + +``` +=== Running Sequential (AsyncBatchNode) === +[Sequential] Summarizing file1.txt... +[Sequential] Summarizing file2.txt... +[Sequential] Summarizing file3.txt... + +=== Running Parallel (AsyncParallelBatchNode) === +[Parallel] Summarizing file1.txt... +[Parallel] Summarizing file2.txt... +[Parallel] Summarizing file3.txt... 
+ +Sequential took: 3.00 seconds +Parallel took: 1.00 seconds +``` + +## Key Points + +- **Sequential**: Total time = sum of all item times + - Good for: Rate-limited APIs, maintaining order + +- **Parallel**: Total time ≈ longest single item time + - Good for: I/O-bound tasks, independent operations \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch/main.py b/cookbook/pocketflow-parallel-batch/main.py new file mode 100644 index 0000000..41f2031 --- /dev/null +++ b/cookbook/pocketflow-parallel-batch/main.py @@ -0,0 +1,103 @@ +import asyncio +import time + +from pocketflow import AsyncBatchNode, AsyncParallelBatchNode, AsyncFlow + +#################################### +# Dummy async function (1s delay) +#################################### +async def dummy_llm_summarize(text): + """Simulates an async LLM call that takes 1 second.""" + await asyncio.sleep(1) + return f"Summarized({len(text)} chars)" + +############################################### +# 1) AsyncBatchNode (sequential) version +############################################### + +class SummariesAsyncNode(AsyncBatchNode): + """ + Processes items sequentially in an async manner. + The next item won't start until the previous item has finished. + """ + + async def prep_async(self, shared): + # Return a list of items to process. + # Each item is (filename, content). 
+ return list(shared["data"].items()) + + async def exec_async(self, item): + filename, content = item + print(f"[Sequential] Summarizing {filename}...") + summary = await dummy_llm_summarize(content) + return (filename, summary) + + async def post_async(self, shared, prep_res, exec_res_list): + # exec_res_list is a list of (filename, summary) + shared["sequential_summaries"] = dict(exec_res_list) + return "done_sequential" + +############################################### +# 2) AsyncParallelBatchNode (concurrent) version +############################################### + +class SummariesAsyncParallelNode(AsyncParallelBatchNode): + """ + Processes items in parallel. Many LLM calls start at once. + """ + + async def prep_async(self, shared): + return list(shared["data"].items()) + + async def exec_async(self, item): + filename, content = item + print(f"[Parallel] Summarizing {filename}...") + summary = await dummy_llm_summarize(content) + return (filename, summary) + + async def post_async(self, shared, prep_res, exec_res_list): + shared["parallel_summaries"] = dict(exec_res_list) + return "done_parallel" + +############################################### +# Demo comparing the two approaches +############################################### + +async def main(): + # We'll use the same data for both flows + shared_data = { + "data": { + "file1.txt": "Hello world 1", + "file2.txt": "Hello world 2", + "file3.txt": "Hello world 3", + } + } + + # 1) Run the sequential version + seq_node = SummariesAsyncNode() + seq_flow = AsyncFlow(start=seq_node) + + print("\n=== Running Sequential (AsyncBatchNode) ===") + t0 = time.time() + await seq_flow.run_async(shared_data) + t1 = time.time() + + # 2) Run the parallel version + par_node = SummariesAsyncParallelNode() + par_flow = AsyncFlow(start=par_node) + + print("\n=== Running Parallel (AsyncParallelBatchNode) ===") + t2 = time.time() + await par_flow.run_async(shared_data) + t3 = time.time() + + # Show times + print("\n--- 
Results ---") + print(f"Sequential Summaries: {shared_data.get('sequential_summaries')}") + print(f"Parallel Summaries: {shared_data.get('parallel_summaries')}") + + print(f"Sequential took: {t1 - t0:.2f} seconds") + print(f"Parallel took: {t3 - t2:.2f} seconds") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/cookbook/pocketflow-parallel-batch/requirements.txt b/cookbook/pocketflow-parallel-batch/requirements.txt new file mode 100644 index 0000000..fcb64c3 --- /dev/null +++ b/cookbook/pocketflow-parallel-batch/requirements.txt @@ -0,0 +1 @@ +pocketflow>=0.0.1 \ No newline at end of file