update parallel flow

This commit is contained in:
zachary62 2025-03-25 14:24:18 -04:00
parent 16766d486c
commit 32e8902c8b
6 changed files with 103 additions and 98 deletions

View File

@ -73,6 +73,7 @@ From there, it's easy to implement popular design patterns like ([Multi-](https:
| [Multi-Agent](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-multi-agent) | ★☆☆ <br> *Beginner* | A Taboo word game for asynchronous communication between two agents | | [Multi-Agent](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-multi-agent) | ★☆☆ <br> *Beginner* | A Taboo word game for asynchronous communication between two agents |
| [Supervisor](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-supervisor) | ★☆☆ <br> *Beginner* | Research agent is getting unreliable... Let's build a supervision process| | [Supervisor](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-supervisor) | ★☆☆ <br> *Beginner* | Research agent is getting unreliable... Let's build a supervision process|
| [Parallel](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-parallel-batch) | ★☆☆ <br> *Beginner* | A parallel execution demo that shows 3x speedup | | [Parallel](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-parallel-batch) | ★☆☆ <br> *Beginner* | A parallel execution demo that shows 3x speedup |
| [Parallel Flow](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-parallel-batch-flow) | ★☆☆ <br> *Beginner* | A parallel image processing demo showing 8x speedup with multiple filters |
| [Thinking](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-thinking) | ★☆☆ <br> *Beginner* | Solve complex reasoning problems through Chain-of-Thought | | [Thinking](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-thinking) | ★☆☆ <br> *Beginner* | Solve complex reasoning problems through Chain-of-Thought |
| [Memory](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-memory) | ★☆☆ <br> *Beginner* | A chat bot with short-term and long-term memory | | [Memory](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-memory) | ★☆☆ <br> *Beginner* | A chat bot with short-term and long-term memory |

View File

@ -1,75 +1,61 @@
# Parallel Image Processor (AsyncParallelBatchFlow Example) # Parallel Image Processor
This example demonstrates how to use `AsyncParallelBatchFlow` to process multiple images with multiple filters in parallel. Demonstrates how AsyncParallelBatchFlow processes multiple images with multiple filters >8x faster than sequential processing.
## How it Works ## Features
1. **Image Generation**: Creates sample images (gradient, checkerboard, circles) ```mermaid
2. **Filter Application**: Applies different filters (grayscale, blur, sepia) to each image graph TD
3. **Parallel Processing**: Processes all image-filter combinations concurrently subgraph AsyncParallelBatchFlow[Image Processing Flow]
subgraph AsyncFlow[Per Image-Filter Flow]
A[Load Image] --> B[Apply Filter]
B --> C[Save Image]
end
end
```
- Processes images with multiple filters in parallel
- Applies three different filters (grayscale, blur, sepia)
- Shows significant speed improvement over sequential processing
- Manages system resources with semaphores
### Flow Structure ## Run It
```mermaid
graph TD
subgraph AsyncParallelBatchFlow[Image Processing Flow]
subgraph AsyncFlow[Per Image-Filter Flow]
A[Load Image] --> B[Apply Filter]
B --> C[Save Image]
end
end
```
### Key Components
1. **LoadImage (AsyncNode)**
- Loads an image from file
- Uses PIL for image handling
2. **ApplyFilter (AsyncNode)**
- Applies the specified filter
- Supports grayscale, blur, and sepia
3. **SaveImage (AsyncNode)**
- Saves the processed image
- Creates output directory if needed
4. **ImageBatchFlow (AsyncParallelBatchFlow)**
- Manages parallel processing of all image-filter combinations
- Returns parameters for each sub-flow
## Running the Example
1. Install dependencies:
```bash ```bash
pip install -r requirements.txt pip install -r requirements.txt
```
2. Run the example:
```bash
python main.py python main.py
``` ```
## Sample Output ## Output
The example will: ```=== Processing Images in Parallel ===
1. Create 3 sample images: `cat.jpg`, `dog.jpg`, `bird.jpg` Parallel Image Processor
2. Apply 3 filters to each image ------------------------------
3. Save results in `output/` directory (9 total images) Found 3 images:
- images/bird.jpg
- images/cat.jpg
- images/dog.jpg
Example output structure: Running sequential batch flow...
``` Processing 3 images with 3 filters...
output/ Total combinations: 9
├── cat_grayscale.jpg Loading image: images/bird.jpg
├── cat_blur.jpg Applying grayscale filter...
├── cat_sepia.jpg Saved: output/bird_grayscale.jpg
├── dog_grayscale.jpg
...etc ...etc
Timing Results:
Sequential batch processing: 13.76 seconds
Parallel batch processing: 1.71 seconds
Speedup: 8.04x
Processing complete! Check the output/ directory for results.
``` ```
## Key Concepts ## Key Points
1. **Parallel Flow Execution**: Each image-filter combination runs as a separate flow in parallel - **Sequential**: Total time = sum of all item times
2. **Parameter Management**: The batch flow generates parameters for each sub-flow - Good for: Rate-limited APIs, maintaining order
3. **Resource Management**: Uses semaphores to limit concurrent image processing
4. **Error Handling**: Gracefully handles failures in individual flows - **Parallel**: Total time ≈ longest single item time
- Good for: I/O-bound tasks, independent operations

View File

@ -1,6 +1,6 @@
"""Flow definitions for parallel image processing.""" """Flow definitions for parallel image processing."""
from pocketflow import AsyncFlow, AsyncParallelBatchFlow from pocketflow import AsyncFlow, AsyncParallelBatchFlow, AsyncBatchFlow
from nodes import LoadImage, ApplyFilter, SaveImage, NoOp from nodes import LoadImage, ApplyFilter, SaveImage, NoOp
def create_base_flow(): def create_base_flow():
@ -17,10 +17,10 @@ def create_base_flow():
save - "default" >> noop save - "default" >> noop
# Create flow # Create flow
return AsyncFlow(start=load) return load
class ImageBatchFlow(AsyncParallelBatchFlow): class ImageBatchFlow(AsyncBatchFlow):
"""Flow that processes multiple images with multiple filters in parallel.""" """Flow that processes multiple images with multiple filters in batch."""
async def prep_async(self, shared): async def prep_async(self, shared):
"""Generate parameters for each image-filter combination.""" """Generate parameters for each image-filter combination."""
@ -37,14 +37,36 @@ class ImageBatchFlow(AsyncParallelBatchFlow):
"filter": filter_type "filter": filter_type
}) })
print(f"\nProcessing {len(images)} images with {len(filters)} filters...") print(f"Processing {len(images)} images with {len(filters)} filters...")
print(f"Total combinations: {len(params)}") print(f"Total combinations: {len(params)}")
return params return params
def create_flow(): class ImageParallelBatchFlow(AsyncParallelBatchFlow):
"""Flow that processes multiple images with multiple filters in parallel."""
async def prep_async(self, shared):
"""Generate parameters for each image-filter combination."""
# Get list of images and filters
images = shared.get("images", [])
filters = ["grayscale", "blur", "sepia"]
# Create parameter combinations
params = []
for image_path in images:
for filter_type in filters:
params.append({
"image_path": image_path,
"filter": filter_type
})
print(f"Processing {len(images)} images with {len(filters)} filters...")
print(f"Total combinations: {len(params)}")
return params
def create_flows():
"""Create the complete parallel processing flow.""" """Create the complete parallel processing flow."""
# Create base flow for single image processing # Create base flow for single image processing
base_flow = create_base_flow() base_flow = create_base_flow()
# Wrap in parallel batch flow # Wrap in parallel batch flow
return ImageBatchFlow(start=base_flow) return ImageBatchFlow(start=base_flow), ImageParallelBatchFlow(start=base_flow)

View File

@ -1,8 +1,7 @@
import os import os
import asyncio import asyncio
import numpy as np import time
from PIL import Image from flow import create_flows
from flow import create_flow
def get_image_paths(): def get_image_paths():
"""Get paths of existing images in the images directory.""" """Get paths of existing images in the images directory."""
@ -19,7 +18,7 @@ def get_image_paths():
if not image_paths: if not image_paths:
raise ValueError(f"No images found in '{images_dir}' directory!") raise ValueError(f"No images found in '{images_dir}' directory!")
print(f"\nFound {len(image_paths)} images:") print(f"Found {len(image_paths)} images:")
for path in image_paths: for path in image_paths:
print(f"- {path}") print(f"- {path}")
@ -27,7 +26,7 @@ def get_image_paths():
async def main(): async def main():
"""Run the parallel image processing example.""" """Run the parallel image processing example."""
print("\nParallel Image Processor") print("Parallel Image Processor")
print("-" * 30) print("-" * 30)
# Get existing image paths # Get existing image paths
@ -36,10 +35,26 @@ async def main():
# Create shared store with image paths # Create shared store with image paths
shared = {"images": image_paths} shared = {"images": image_paths}
# Create and run flow # Create both flows
flow = create_flow() batch_flow, parallel_batch_flow = create_flows()
await flow.run_async(shared) # Run and time batch flow
start_time = time.time()
print("\nRunning sequential batch flow...")
await batch_flow.run_async(shared)
batch_time = time.time() - start_time
# Run and time parallel batch flow
start_time = time.time()
print("\nRunning parallel batch flow...")
await parallel_batch_flow.run_async(shared)
parallel_time = time.time() - start_time
# Print timing results
print("\nTiming Results:")
print(f"Sequential batch processing: {batch_time:.2f} seconds")
print(f"Parallel batch processing: {parallel_time:.2f} seconds")
print(f"Speedup: {batch_time/parallel_time:.2f}x")
print("\nProcessing complete! Check the output/ directory for results.") print("\nProcessing complete! Check the output/ directory for results.")

View File

@ -1,39 +1,22 @@
"""AsyncNode implementations for image processing.""" """AsyncNode implementations for image processing."""
import os import os
import asyncio import asyncio
from PIL import Image, ImageFilter from PIL import Image, ImageFilter
import numpy as np import numpy as np
from pocketflow import AsyncNode from pocketflow import AsyncNode
class NoOp(AsyncNode):
"""Node that does nothing, used as a terminal node."""
async def prep_async(self, shared):
"""No preparation needed."""
return None
async def exec_async(self, prep_res):
"""No execution needed."""
return None
async def post_async(self, shared, prep_res, exec_res):
"""No post-processing needed."""
return None
class LoadImage(AsyncNode): class LoadImage(AsyncNode):
"""Node that loads an image from file.""" """Node that loads an image from file."""
async def prep_async(self, shared): async def prep_async(self, shared):
"""Get image path from parameters.""" """Get image path from parameters."""
image_path = self.params["image_path"] image_path = self.params["image_path"]
print(f"\nLoading image: {image_path}") print(f"Loading image: {image_path}")
return image_path return image_path
async def exec_async(self, image_path): async def exec_async(self, image_path):
"""Load image using PIL.""" """Load image using PIL."""
# Simulate I/O delay # Simulate I/O delay
await asyncio.sleep(0.1) await asyncio.sleep(0.5)
return Image.open(image_path) return Image.open(image_path)
async def post_async(self, shared, prep_res, exec_res): async def post_async(self, shared, prep_res, exec_res):
@ -43,7 +26,6 @@ class LoadImage(AsyncNode):
class ApplyFilter(AsyncNode): class ApplyFilter(AsyncNode):
"""Node that applies a filter to an image.""" """Node that applies a filter to an image."""
async def prep_async(self, shared): async def prep_async(self, shared):
"""Get image and filter type.""" """Get image and filter type."""
image = shared["image"] image = shared["image"]
@ -83,7 +65,6 @@ class ApplyFilter(AsyncNode):
class SaveImage(AsyncNode): class SaveImage(AsyncNode):
"""Node that saves the processed image.""" """Node that saves the processed image."""
async def prep_async(self, shared): async def prep_async(self, shared):
"""Prepare output path.""" """Prepare output path."""
image = shared["filtered_image"] image = shared["filtered_image"]
@ -101,7 +82,7 @@ class SaveImage(AsyncNode):
image, output_path = inputs image, output_path = inputs
# Simulate I/O delay # Simulate I/O delay
await asyncio.sleep(0.1) await asyncio.sleep(0.5)
image.save(output_path) image.save(output_path)
return output_path return output_path

View File

@ -29,7 +29,7 @@ We model the LLM workflow as a **Graph + Shared Store**:
- [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks. - [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks.
<div align="center"> <div align="center">
<img src="https://github.com/the-pocket/.github/raw/main/assets/abstraction.png" width="900"/> <img src="https://github.com/the-pocket/.github/raw/main/assets/abstraction.png" width="700"/>
</div> </div>
## Design Pattern ## Design Pattern
@ -44,7 +44,7 @@ From there, its easy to implement popular design patterns:
- [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents. - [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
<div align="center"> <div align="center">
<img src="https://github.com/the-pocket/.github/raw/main/assets/design.png" width="900"/> <img src="https://github.com/the-pocket/.github/raw/main/assets/design.png" width="700"/>
</div> </div>
## Utility Function ## Utility Function