update parallel tutorial
This commit is contained in:
parent
60d9631204
commit
25b742e29a
1096
.cursorrules
1096
.cursorrules
File diff suppressed because it is too large
Load Diff
|
|
@ -1,18 +1,4 @@
|
||||||
{
|
{
|
||||||
"nbformat": 4,
|
|
||||||
"nbformat_minor": 0,
|
|
||||||
"metadata": {
|
|
||||||
"colab": {
|
|
||||||
"provenance": []
|
|
||||||
},
|
|
||||||
"kernelspec": {
|
|
||||||
"name": "python3",
|
|
||||||
"display_name": "Python 3"
|
|
||||||
},
|
|
||||||
"language_info": {
|
|
||||||
"name": "python"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"cells": [
|
"cells": [
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
|
|
@ -26,8 +12,8 @@
|
||||||
},
|
},
|
||||||
"outputs": [
|
"outputs": [
|
||||||
{
|
{
|
||||||
"output_type": "stream",
|
|
||||||
"name": "stdout",
|
"name": "stdout",
|
||||||
|
"output_type": "stream",
|
||||||
"text": [
|
"text": [
|
||||||
"Collecting pocketflow\n",
|
"Collecting pocketflow\n",
|
||||||
" Downloading pocketflow-0.0.1-py3-none-any.whl.metadata (270 bytes)\n",
|
" Downloading pocketflow-0.0.1-py3-none-any.whl.metadata (270 bytes)\n",
|
||||||
|
|
@ -43,121 +29,16 @@
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"source": [
|
"execution_count": 3,
|
||||||
"import asyncio\n",
|
|
||||||
"import time\n",
|
|
||||||
"\n",
|
|
||||||
"from pocketflow import AsyncBatchNode, AsyncParallelBatchNode, AsyncFlow\n",
|
|
||||||
"\n",
|
|
||||||
"####################################\n",
|
|
||||||
"# Dummy async function (1s delay)\n",
|
|
||||||
"####################################\n",
|
|
||||||
"async def dummy_llm_summarize(text):\n",
|
|
||||||
" \"\"\"Simulates an async LLM call that takes 1 second.\"\"\"\n",
|
|
||||||
" await asyncio.sleep(1)\n",
|
|
||||||
" return f\"Summarized({len(text)} chars)\"\n",
|
|
||||||
"\n",
|
|
||||||
"###############################################\n",
|
|
||||||
"# 1) AsyncBatchNode (sequential) version\n",
|
|
||||||
"###############################################\n",
|
|
||||||
"\n",
|
|
||||||
"class SummariesAsyncNode(AsyncBatchNode):\n",
|
|
||||||
" \"\"\"\n",
|
|
||||||
" Processes items sequentially in an async manner.\n",
|
|
||||||
" The next item won't start until the previous item has finished.\n",
|
|
||||||
" \"\"\"\n",
|
|
||||||
"\n",
|
|
||||||
" async def prep_async(self, shared):\n",
|
|
||||||
" # Return a list of items to process.\n",
|
|
||||||
" # Each item is (filename, content).\n",
|
|
||||||
" return list(shared[\"data\"].items())\n",
|
|
||||||
"\n",
|
|
||||||
" async def exec_async(self, item):\n",
|
|
||||||
" filename, content = item\n",
|
|
||||||
" print(f\"[Sequential] Summarizing {filename}...\")\n",
|
|
||||||
" summary = await dummy_llm_summarize(content)\n",
|
|
||||||
" return (filename, summary)\n",
|
|
||||||
"\n",
|
|
||||||
" async def post_async(self, shared, prep_res, exec_res_list):\n",
|
|
||||||
" # exec_res_list is a list of (filename, summary)\n",
|
|
||||||
" shared[\"sequential_summaries\"] = dict(exec_res_list)\n",
|
|
||||||
" return \"done_sequential\"\n",
|
|
||||||
"\n",
|
|
||||||
"###############################################\n",
|
|
||||||
"# 2) AsyncParallelBatchNode (concurrent) version\n",
|
|
||||||
"###############################################\n",
|
|
||||||
"\n",
|
|
||||||
"class SummariesAsyncParallelNode(AsyncParallelBatchNode):\n",
|
|
||||||
" \"\"\"\n",
|
|
||||||
" Processes items in parallel. Many LLM calls start at once.\n",
|
|
||||||
" \"\"\"\n",
|
|
||||||
"\n",
|
|
||||||
" async def prep_async(self, shared):\n",
|
|
||||||
" return list(shared[\"data\"].items())\n",
|
|
||||||
"\n",
|
|
||||||
" async def exec_async(self, item):\n",
|
|
||||||
" filename, content = item\n",
|
|
||||||
" print(f\"[Parallel] Summarizing {filename}...\")\n",
|
|
||||||
" summary = await dummy_llm_summarize(content)\n",
|
|
||||||
" return (filename, summary)\n",
|
|
||||||
"\n",
|
|
||||||
" async def post_async(self, shared, prep_res, exec_res_list):\n",
|
|
||||||
" shared[\"parallel_summaries\"] = dict(exec_res_list)\n",
|
|
||||||
" return \"done_parallel\"\n",
|
|
||||||
"\n",
|
|
||||||
"###############################################\n",
|
|
||||||
"# Demo comparing the two approaches\n",
|
|
||||||
"###############################################\n",
|
|
||||||
"\n",
|
|
||||||
"async def main():\n",
|
|
||||||
" # We'll use the same data for both flows\n",
|
|
||||||
" shared_data = {\n",
|
|
||||||
" \"data\": {\n",
|
|
||||||
" \"file1.txt\": \"Hello world 1\",\n",
|
|
||||||
" \"file2.txt\": \"Hello world 2\",\n",
|
|
||||||
" \"file3.txt\": \"Hello world 3\",\n",
|
|
||||||
" }\n",
|
|
||||||
" }\n",
|
|
||||||
"\n",
|
|
||||||
" # 1) Run the sequential version\n",
|
|
||||||
" seq_node = SummariesAsyncNode()\n",
|
|
||||||
" seq_flow = AsyncFlow(start=seq_node)\n",
|
|
||||||
"\n",
|
|
||||||
" print(\"\\n=== Running Sequential (AsyncBatchNode) ===\")\n",
|
|
||||||
" t0 = time.time()\n",
|
|
||||||
" await seq_flow.run_async(shared_data)\n",
|
|
||||||
" t1 = time.time()\n",
|
|
||||||
"\n",
|
|
||||||
" # 2) Run the parallel version\n",
|
|
||||||
" par_node = SummariesAsyncParallelNode()\n",
|
|
||||||
" par_flow = AsyncFlow(start=par_node)\n",
|
|
||||||
"\n",
|
|
||||||
" print(\"\\n=== Running Parallel (AsyncParallelBatchNode) ===\")\n",
|
|
||||||
" t2 = time.time()\n",
|
|
||||||
" await par_flow.run_async(shared_data)\n",
|
|
||||||
" t3 = time.time()\n",
|
|
||||||
"\n",
|
|
||||||
" # Show times\n",
|
|
||||||
" print(\"\\n--- Results ---\")\n",
|
|
||||||
" print(f\"Sequential Summaries: {shared_data.get('sequential_summaries')}\")\n",
|
|
||||||
" print(f\"Parallel Summaries: {shared_data.get('parallel_summaries')}\")\n",
|
|
||||||
"\n",
|
|
||||||
" print(f\"Sequential took: {t1 - t0:.2f} seconds\")\n",
|
|
||||||
" print(f\"Parallel took: {t3 - t2:.2f} seconds\")\n"
|
|
||||||
],
|
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"id": "mHZpGv8txy4L"
|
"id": "mHZpGv8txy4L"
|
||||||
},
|
},
|
||||||
"execution_count": 3,
|
"outputs": [],
|
||||||
"outputs": []
|
"source": []
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"source": [
|
"execution_count": 5,
|
||||||
"# if in a py project\n",
|
|
||||||
"# asyncio.run(main())\n",
|
|
||||||
"await main()"
|
|
||||||
],
|
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"colab": {
|
"colab": {
|
||||||
"base_uri": "https://localhost:8080/"
|
"base_uri": "https://localhost:8080/"
|
||||||
|
|
@ -165,11 +46,10 @@
|
||||||
"id": "zfnhW3f-0W6o",
|
"id": "zfnhW3f-0W6o",
|
||||||
"outputId": "3737e2e5-5cae-4c6b-a894-e880cf338d1f"
|
"outputId": "3737e2e5-5cae-4c6b-a894-e880cf338d1f"
|
||||||
},
|
},
|
||||||
"execution_count": 5,
|
|
||||||
"outputs": [
|
"outputs": [
|
||||||
{
|
{
|
||||||
"output_type": "stream",
|
|
||||||
"name": "stdout",
|
"name": "stdout",
|
||||||
|
"output_type": "stream",
|
||||||
"text": [
|
"text": [
|
||||||
"\n",
|
"\n",
|
||||||
"=== Running Sequential (AsyncBatchNode) ===\n",
|
"=== Running Sequential (AsyncBatchNode) ===\n",
|
||||||
|
|
@ -189,16 +69,36 @@
|
||||||
"Parallel took: 1.00 seconds\n"
|
"Parallel took: 1.00 seconds\n"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"# if in a notebook\n",
|
||||||
|
"await main()\n",
|
||||||
|
"\n",
|
||||||
|
"asyncio.run(main())"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"source": [],
|
"execution_count": null,
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"id": "ystwa74D0Z_k"
|
"id": "ystwa74D0Z_k"
|
||||||
},
|
},
|
||||||
"execution_count": null,
|
"outputs": [],
|
||||||
"outputs": []
|
"source": []
|
||||||
}
|
}
|
||||||
]
|
],
|
||||||
}
|
"metadata": {
|
||||||
|
"colab": {
|
||||||
|
"provenance": []
|
||||||
|
},
|
||||||
|
"kernelspec": {
|
||||||
|
"display_name": "Python 3",
|
||||||
|
"name": "python3"
|
||||||
|
},
|
||||||
|
"language_info": {
|
||||||
|
"name": "python"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nbformat": 4,
|
||||||
|
"nbformat_minor": 0
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,105 +0,0 @@
|
||||||
# PocketFlow Parallel Batch Node Example
|
|
||||||
|
|
||||||
This example demonstrates parallel processing using AsyncParallelBatchNode to summarize multiple news articles concurrently. It shows how to:
|
|
||||||
1. Process multiple items in parallel
|
|
||||||
2. Handle I/O-bound tasks efficiently
|
|
||||||
3. Manage rate limits with throttling
|
|
||||||
|
|
||||||
## What this Example Does
|
|
||||||
|
|
||||||
When you run the example:
|
|
||||||
1. It loads multiple news articles from a data directory
|
|
||||||
2. Processes them in parallel using AsyncParallelBatchNode
|
|
||||||
3. For each article:
|
|
||||||
- Extracts key information
|
|
||||||
- Generates a summary using an LLM
|
|
||||||
- Saves the results
|
|
||||||
4. Combines all summaries into a final report
|
|
||||||
|
|
||||||
## How it Works
|
|
||||||
|
|
||||||
The example uses AsyncParallelBatchNode to process articles in parallel:
|
|
||||||
|
|
||||||
```python
|
|
||||||
class ParallelSummarizer(AsyncParallelBatchNode):
|
|
||||||
async def prep_async(self, shared):
|
|
||||||
# Return list of articles to process
|
|
||||||
return shared["articles"]
|
|
||||||
|
|
||||||
async def exec_async(self, article):
|
|
||||||
# Process single article (called in parallel)
|
|
||||||
summary = await call_llm_async(f"Summarize: {article}")
|
|
||||||
return summary
|
|
||||||
|
|
||||||
async def post_async(self, shared, prep_res, summaries):
|
|
||||||
# Combine all summaries
|
|
||||||
shared["summaries"] = summaries
|
|
||||||
return "default"
|
|
||||||
```
|
|
||||||
|
|
||||||
Key features demonstrated:
|
|
||||||
- Parallel execution of `exec_async`
|
|
||||||
- Rate limiting with semaphores
|
|
||||||
- Error handling for failed requests
|
|
||||||
- Progress tracking for parallel tasks
|
|
||||||
|
|
||||||
## Project Structure
|
|
||||||
```
|
|
||||||
pocketflow-parallel-batch-node/
|
|
||||||
├── README.md
|
|
||||||
├── requirements.txt
|
|
||||||
├── data/
|
|
||||||
│ ├── article1.txt
|
|
||||||
│ ├── article2.txt
|
|
||||||
│ └── article3.txt
|
|
||||||
├── main.py
|
|
||||||
├── flow.py
|
|
||||||
├── nodes.py
|
|
||||||
└── utils.py
|
|
||||||
```
|
|
||||||
|
|
||||||
## Running the Example
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Install dependencies
|
|
||||||
pip install -r requirements.txt
|
|
||||||
|
|
||||||
# Run the example
|
|
||||||
python main.py
|
|
||||||
```
|
|
||||||
|
|
||||||
## Sample Output
|
|
||||||
```
|
|
||||||
Loading articles...
|
|
||||||
Found 3 articles to process
|
|
||||||
|
|
||||||
Processing in parallel...
|
|
||||||
[1/3] Processing article1.txt...
|
|
||||||
[2/3] Processing article2.txt...
|
|
||||||
[3/3] Processing article3.txt...
|
|
||||||
|
|
||||||
Summaries generated:
|
|
||||||
1. First article summary...
|
|
||||||
2. Second article summary...
|
|
||||||
3. Third article summary...
|
|
||||||
|
|
||||||
Final report saved to: summaries.txt
|
|
||||||
```
|
|
||||||
|
|
||||||
## Key Concepts
|
|
||||||
|
|
||||||
1. **Parallel Processing**
|
|
||||||
- Using AsyncParallelBatchNode for concurrent execution
|
|
||||||
- Managing parallel tasks efficiently
|
|
||||||
|
|
||||||
2. **Rate Limiting**
|
|
||||||
- Using semaphores to control concurrent requests
|
|
||||||
- Avoiding API rate limits
|
|
||||||
|
|
||||||
3. **Error Handling**
|
|
||||||
- Graceful handling of failed requests
|
|
||||||
- Retrying failed tasks
|
|
||||||
|
|
||||||
4. **Progress Tracking**
|
|
||||||
- Monitoring parallel task progress
|
|
||||||
- Providing user feedback
|
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
Article 1: AI advances in 2024...
|
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
Article 2: New quantum computing breakthrough...
|
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
Article 3: Latest developments in robotics...
|
|
||||||
|
|
@ -1,3 +0,0 @@
|
||||||
1. Summary of: Article 1: AI advances in 2024...
|
|
||||||
2. Summary of: Article 2: New quantum computi...
|
|
||||||
3. Summary of: Article 3: Latest developments...
|
|
||||||
|
|
@ -1,24 +0,0 @@
|
||||||
"""AsyncFlow implementation for parallel article processing."""
|
|
||||||
|
|
||||||
from pocketflow import AsyncFlow, Node
|
|
||||||
from nodes import LoadArticles, ParallelSummarizer
|
|
||||||
|
|
||||||
class NoOp(Node):
    """Terminal node with no behavior of its own; gives the flow an explicit end."""
|
|
||||||
|
|
||||||
def create_flow():
    """Build the article pipeline: loader -> summarizer -> no-op terminator.

    Returns:
        An ``AsyncFlow`` whose start node is the ``LoadArticles`` instance.
    """
    load_step = LoadArticles()
    summarize_step = ParallelSummarizer()
    terminal = NoOp()

    # Transitions are keyed by the action strings the nodes' post_async
    # methods return ("process" from the loader, "default" from the
    # summarizer).
    load_step - "process" >> summarize_step
    summarize_step - "default" >> terminal

    return AsyncFlow(start=load_step)
|
|
||||||
|
|
@ -1,19 +0,0 @@
|
||||||
import asyncio
|
|
||||||
from flow import create_flow
|
|
||||||
|
|
||||||
async def main():
    """Build the flow, print a banner, and run it on an empty shared store."""
    pipeline = create_flow()
    store = {}

    print("\nParallel Article Summarizer")
    print("-------------------------")
    await pipeline.run_async(store)


if __name__ == "__main__":
    # Script entry point: drive the coroutine to completion.
    asyncio.run(main())
|
|
||||||
|
|
@ -1,48 +0,0 @@
|
||||||
"""AsyncParallelBatchNode implementation for article summarization."""
|
|
||||||
|
|
||||||
from pocketflow import AsyncParallelBatchNode, AsyncNode
|
|
||||||
from utils import call_llm_async, load_articles, save_summaries
|
|
||||||
|
|
||||||
class LoadArticles(AsyncNode):
    """Fetch the article list and publish it to the shared store."""

    async def prep_async(self, shared):
        """Announce the load and return whatever ``load_articles()`` yields."""
        print("\nLoading articles...")
        return await load_articles()

    async def exec_async(self, articles):
        """Pass the articles through untouched."""
        return articles

    async def post_async(self, shared, prep_res, exec_res):
        """Save the articles under ``shared["articles"]`` and report the count.

        Returns:
            The action string ``"process"``.
        """
        shared["articles"] = exec_res
        count = len(exec_res)
        print(f"Found {count} articles to process")
        return "process"
|
|
||||||
|
|
||||||
class ParallelSummarizer(AsyncParallelBatchNode):
    """Summarize every article held in the shared store."""

    async def prep_async(self, shared):
        """Announce the run and return the list at ``shared["articles"]``."""
        print("\nProcessing in parallel...")
        return shared["articles"]

    async def exec_async(self, article):
        """Return the ``call_llm_async`` result for one article."""
        return await call_llm_async(article)

    async def post_async(self, shared, prep_res, summaries):
        """Record, print, and persist the summaries.

        Returns:
            The action string ``"default"``.
        """
        shared["summaries"] = summaries

        print("\nSummaries generated:")
        for position, text in enumerate(summaries, start=1):
            print(f"{position}. {text}")

        save_summaries(summaries)
        print("\nFinal report saved to: summaries.txt")
        return "default"
|
|
||||||
|
|
@ -1,4 +0,0 @@
|
||||||
pocketflow
|
|
||||||
aiohttp>=3.8.0 # For async HTTP requests
|
|
||||||
openai>=1.0.0 # For async LLM calls
|
|
||||||
tqdm>=4.65.0 # For progress bars
|
|
||||||
|
|
@ -1,53 +0,0 @@
|
||||||
"""Utility functions for parallel processing."""
|
|
||||||
|
|
||||||
import os
|
|
||||||
import asyncio
|
|
||||||
import aiohttp
|
|
||||||
from openai import AsyncOpenAI
|
|
||||||
from tqdm import tqdm
|
|
||||||
|
|
||||||
# Upper bound on how many (simulated) LLM calls may be in flight at once.
MAX_CONCURRENT_CALLS = 3
# NOTE(review): this semaphore is created at import time. On Python < 3.10
# asyncio primitives bind to the event loop that is current at construction,
# which may differ from the loop asyncio.run() creates later -- confirm the
# minimum supported Python version.
semaphore = asyncio.Semaphore(MAX_CONCURRENT_CALLS)


async def call_llm_async(prompt, delay=1.0):
    """Simulate a rate-limited async LLM call and return a mock summary.

    Args:
        prompt: Text to "summarize"; only its first characters are used.
        delay: Seconds to sleep while holding the semaphore, standing in for
            API latency. Defaults to 1.0, the original fixed delay.

    Returns:
        ``"Summary of: <first 30 chars of prompt>..."``.
    """
    # At most MAX_CONCURRENT_CALLS callers run this section concurrently.
    async with semaphore:
        print(f"\nProcessing: {prompt[:50]}...")

        # Stand-in for the network round trip of a real API call.
        await asyncio.sleep(delay)

        # Mock response; no real LLM is contacted here.
        return f"Summary of: {prompt[:30]}..."
|
|
||||||
|
|
||||||
async def load_articles():
    """Return the demo articles, also writing each to ``data/article<N>.txt``.

    The articles are hard-coded mock strings; nothing is read from disk.
    The ``data`` directory is created (relative to the current working
    directory) if it does not exist, and existing article files are
    overwritten.

    Returns:
        The list of article strings, in file-number order.
    """
    articles = [
        "Article 1: AI advances in 2024...",
        "Article 2: New quantum computing breakthrough...",
        "Article 3: Latest developments in robotics...",
    ]

    data_dir = "data"
    os.makedirs(data_dir, exist_ok=True)

    for i, content in enumerate(articles, 1):
        path = os.path.join(data_dir, f"article{i}.txt")
        # Explicit encoding so the files do not depend on the platform's
        # default locale encoding.
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)

    return articles
|
|
||||||
|
|
||||||
def save_summaries(summaries):
    """Write ``summaries`` as a numbered list to ``data/summaries.txt``.

    The ``data`` directory is created (relative to the current working
    directory) if needed, and any existing report is overwritten.

    Args:
        summaries: Iterable of summary strings; item ``k`` (1-based) is
            written as the line ``"<k>. <summary>"``.
    """
    data_dir = "data"
    os.makedirs(data_dir, exist_ok=True)

    report_path = os.path.join(data_dir, "summaries.txt")
    # Explicit UTF-8 so non-ASCII summary text is written the same way on
    # every platform instead of depending on the locale's default encoding.
    with open(report_path, "w", encoding="utf-8") as f:
        f.writelines(
            f"{i}. {summary}\n" for i, summary in enumerate(summaries, 1)
        )
|
|
||||||
|
|
@ -0,0 +1,41 @@
|
||||||
|
# Sequential vs Parallel Processing
|
||||||
|
|
||||||
|
Demonstrates how AsyncParallelBatchNode speeds up processing over AsyncBatchNode: 3x in this demo, where three 1-second tasks run concurrently instead of one after another.
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
- Processes identical tasks with two approaches
|
||||||
|
- Compares sequential vs parallel execution time
|
||||||
|
- Shows a 3x speedup for three 1-second tasks when they run in parallel
|
||||||
|
|
||||||
|
## Run It
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip install pocketflow
|
||||||
|
python main.py
|
||||||
|
```
|
||||||
|
|
||||||
|
## Output
|
||||||
|
|
||||||
|
```
|
||||||
|
=== Running Sequential (AsyncBatchNode) ===
|
||||||
|
[Sequential] Summarizing file1.txt...
|
||||||
|
[Sequential] Summarizing file2.txt...
|
||||||
|
[Sequential] Summarizing file3.txt...
|
||||||
|
|
||||||
|
=== Running Parallel (AsyncParallelBatchNode) ===
|
||||||
|
[Parallel] Summarizing file1.txt...
|
||||||
|
[Parallel] Summarizing file2.txt...
|
||||||
|
[Parallel] Summarizing file3.txt...
|
||||||
|
|
||||||
|
Sequential took: 3.00 seconds
|
||||||
|
Parallel took: 1.00 seconds
|
||||||
|
```
|
||||||
|
|
||||||
|
## Key Points
|
||||||
|
|
||||||
|
- **Sequential**: Total time = sum of all item times
|
||||||
|
- Good for: Rate-limited APIs, maintaining order
|
||||||
|
|
||||||
|
- **Parallel**: Total time ≈ longest single item time
|
||||||
|
- Good for: I/O-bound tasks, independent operations
|
||||||
|
|
@ -0,0 +1,103 @@
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
|
||||||
|
from pocketflow import AsyncBatchNode, AsyncParallelBatchNode, AsyncFlow
|
||||||
|
|
||||||
|
####################################
|
||||||
|
# Dummy async function (1s delay)
|
||||||
|
####################################
|
||||||
|
async def dummy_llm_summarize(text, delay=1.0):
    """Simulate an async LLM call by sleeping, then return a fake summary.

    Args:
        text: Input whose length is reported in the fake summary.
        delay: Seconds to sleep before answering. Defaults to 1.0, the
            original fixed latency; pass 0 for an immediate result.

    Returns:
        A string of the form ``"Summarized(<len(text)> chars)"``.
    """
    # Sleeping with asyncio (not time.sleep) lets other tasks run meanwhile.
    await asyncio.sleep(delay)
    return f"Summarized({len(text)} chars)"
|
||||||
|
|
||||||
|
###############################################
|
||||||
|
# 1) AsyncBatchNode (sequential) version
|
||||||
|
###############################################
|
||||||
|
|
||||||
|
class SummariesAsyncNode(AsyncBatchNode):
    """
    Sequential async batch: one item is summarized at a time, and the
    following item only begins once the current one is done.
    """

    async def prep_async(self, shared):
        """Return the ``(filename, content)`` pairs from ``shared["data"]``."""
        pairs = shared["data"].items()
        return list(pairs)

    async def exec_async(self, item):
        """Summarize one ``(filename, content)`` pair."""
        name, text = item
        print(f"[Sequential] Summarizing {name}...")
        return (name, await dummy_llm_summarize(text))

    async def post_async(self, shared, prep_res, exec_res_list):
        """Store the ``(filename, summary)`` results as a dict in ``shared``."""
        by_filename = dict(exec_res_list)
        shared["sequential_summaries"] = by_filename
        return "done_sequential"
|
||||||
|
|
||||||
|
###############################################
|
||||||
|
# 2) AsyncParallelBatchNode (concurrent) version
|
||||||
|
###############################################
|
||||||
|
|
||||||
|
class SummariesAsyncParallelNode(AsyncParallelBatchNode):
    """
    Parallel async batch: the LLM calls for many items are started at once.
    """

    async def prep_async(self, shared):
        """Return the ``(filename, content)`` pairs from ``shared["data"]``."""
        pairs = shared["data"].items()
        return list(pairs)

    async def exec_async(self, item):
        """Summarize one ``(filename, content)`` pair."""
        name, text = item
        print(f"[Parallel] Summarizing {name}...")
        return (name, await dummy_llm_summarize(text))

    async def post_async(self, shared, prep_res, exec_res_list):
        """Store the ``(filename, summary)`` results as a dict in ``shared``."""
        by_filename = dict(exec_res_list)
        shared["parallel_summaries"] = by_filename
        return "done_parallel"
|
||||||
|
|
||||||
|
###############################################
|
||||||
|
# Demo comparing the two approaches
|
||||||
|
###############################################
|
||||||
|
|
||||||
|
async def _run_timed(flow, shared):
    """Run ``flow`` against ``shared`` and return the elapsed seconds."""
    # perf_counter is a monotonic, high-resolution clock intended for
    # measuring intervals; time.time() can jump if the system clock changes.
    started = time.perf_counter()
    await flow.run_async(shared)
    return time.perf_counter() - started


async def main():
    """Run the same three-file workload through both nodes and compare timings.

    Both flows share one store, so the sequential results end up under
    ``"sequential_summaries"`` and the parallel ones under
    ``"parallel_summaries"``. Summaries and elapsed times are printed.
    """
    # The same input data is used for both flows.
    shared_data = {
        "data": {
            "file1.txt": "Hello world 1",
            "file2.txt": "Hello world 2",
            "file3.txt": "Hello world 3",
        }
    }

    # 1) Sequential version
    seq_flow = AsyncFlow(start=SummariesAsyncNode())
    print("\n=== Running Sequential (AsyncBatchNode) ===")
    seq_elapsed = await _run_timed(seq_flow, shared_data)

    # 2) Parallel version
    par_flow = AsyncFlow(start=SummariesAsyncParallelNode())
    print("\n=== Running Parallel (AsyncParallelBatchNode) ===")
    par_elapsed = await _run_timed(par_flow, shared_data)

    # Report results and timings.
    print("\n--- Results ---")
    print(f"Sequential Summaries: {shared_data.get('sequential_summaries')}")
    print(f"Parallel Summaries: {shared_data.get('parallel_summaries')}")

    print(f"Sequential took: {seq_elapsed:.2f} seconds")
    print(f"Parallel took: {par_elapsed:.2f} seconds")


if __name__ == "__main__":
    asyncio.run(main())
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
pocketflow>=0.0.1
|
||||||
Loading…
Reference in New Issue