diff --git a/README.md b/README.md
index 29d884f..a779fcd 100644
--- a/README.md
+++ b/README.md
@@ -73,6 +73,7 @@ From there, it's easy to implement popular design patterns like ([Multi-](https:
 | [Multi-Agent](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-multi-agent) | ★☆☆ <br> *Beginner* | A Taboo word game for asynchronous communication between two agents |
 | [Supervisor](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-supervisor) | ★☆☆ <br> *Beginner* | Research agent is getting unreliable... Let's build a supervision process|
 | [Parallel](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-parallel-batch) | ★☆☆ <br> *Beginner* | A parallel execution demo that shows 3x speedup |
+| [Parallel Flow](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-parallel-batch-flow) | ★☆☆ <br> *Beginner* | A parallel image processing demo showing 8x speedup with multiple filters |
 | [Thinking](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-thinking) | ★☆☆ <br> *Beginner* | Solve complex reasoning problems through Chain-of-Thought |
 | [Memory](https://github.com/The-Pocket/PocketFlow/tree/main/cookbook/pocketflow-chat-memory) | ★☆☆ <br> *Beginner* | A chat bot with short-term and long-term memory |
 
diff --git a/cookbook/pocketflow-parallel-batch-flow/README.md b/cookbook/pocketflow-parallel-batch-flow/README.md
index 0122705..9f20fec 100644
--- a/cookbook/pocketflow-parallel-batch-flow/README.md
+++ b/cookbook/pocketflow-parallel-batch-flow/README.md
@@ -1,75 +1,61 @@
-# Parallel Image Processor (AsyncParallelBatchFlow Example)
+# Parallel Image Processor
 
-This example demonstrates how to use `AsyncParallelBatchFlow` to process multiple images with multiple filters in parallel.
+Demonstrates how AsyncParallelBatchFlow processes multiple images with multiple filters >8x faster than sequential processing.
 
-## How it Works
+## Features
 
-1. **Image Generation**: Creates sample images (gradient, checkerboard, circles)
-2. **Filter Application**: Applies different filters (grayscale, blur, sepia) to each image
-3. **Parallel Processing**: Processes all image-filter combinations concurrently
+  ```mermaid
+  graph TD
+      subgraph AsyncParallelBatchFlow[Image Processing Flow]
+          subgraph AsyncFlow[Per Image-Filter Flow]
+              A[Load Image] --> B[Apply Filter]
+              B --> C[Save Image]
+          end
+      end
+  ```
+
+- Processes images with multiple filters in parallel
+- Applies three different filters (grayscale, blur, sepia)
+- Shows significant speed improvement over sequential processing
+- Manages system resources with semaphores
 
-### Flow Structure
+## Run It
 
-```mermaid
-graph TD
-    subgraph AsyncParallelBatchFlow[Image Processing Flow]
-        subgraph AsyncFlow[Per Image-Filter Flow]
-            A[Load Image] --> B[Apply Filter]
-            B --> C[Save Image]
-        end
-    end
-```
-
-### Key Components
-
-1. **LoadImage (AsyncNode)**
-   - Loads an image from file
-   - Uses PIL for image handling
-
-2. **ApplyFilter (AsyncNode)**
-   - Applies the specified filter
-   - Supports grayscale, blur, and sepia
-
-3. **SaveImage (AsyncNode)**
-   - Saves the processed image
-   - Creates output directory if needed
-
-4. **ImageBatchFlow (AsyncParallelBatchFlow)**
-   - Manages parallel processing of all image-filter combinations
-   - Returns parameters for each sub-flow
-
-## Running the Example
-
-1. Install dependencies:
 ```bash
 pip install -r requirements.txt
-```
-
-2. Run the example:
-```bash
 python main.py
 ```
 
-## Sample Output
+## Output
 
-The example will:
-1. Create 3 sample images: `cat.jpg`, `dog.jpg`, `bird.jpg`
-2. Apply 3 filters to each image
-3. Save results in `output/` directory (9 total images)
+```=== Processing Images in Parallel ===
+Parallel Image Processor
+------------------------------
+Found 3 images:
+- images/bird.jpg
+- images/cat.jpg
+- images/dog.jpg
 
-Example output structure:
-```
-output/
-├── cat_grayscale.jpg
-├── cat_blur.jpg
-├── cat_sepia.jpg
-├── dog_grayscale.jpg
+Running sequential batch flow...
+Processing 3 images with 3 filters...
+Total combinations: 9
+Loading image: images/bird.jpg
+Applying grayscale filter...
+Saved: output/bird_grayscale.jpg
 ...etc
+
+Timing Results:
+Sequential batch processing: 13.76 seconds
+Parallel batch processing: 1.71 seconds
+Speedup: 8.04x
+
+Processing complete! Check the output/ directory for results.
 ```
 
-## Key Concepts
+## Key Points
 
-1. **Parallel Flow Execution**: Each image-filter combination runs as a separate flow in parallel
-2. **Parameter Management**: The batch flow generates parameters for each sub-flow
-3. **Resource Management**: Uses semaphores to limit concurrent image processing
-4. **Error Handling**: Gracefully handles failures in individual flows
\ No newline at end of file
+- **Sequential**: Total time = sum of all item times
+  - Good for: Rate-limited APIs, maintaining order
+
+- **Parallel**: Total time ≈ longest single item time
+  - Good for: I/O-bound tasks, independent operations
diff --git a/cookbook/pocketflow-parallel-batch-flow/flow.py b/cookbook/pocketflow-parallel-batch-flow/flow.py
index ea4320c..7813e10 100644
--- a/cookbook/pocketflow-parallel-batch-flow/flow.py
+++ b/cookbook/pocketflow-parallel-batch-flow/flow.py
@@ -1,6 +1,6 @@
 """Flow definitions for parallel image processing."""
 
-from pocketflow import AsyncFlow, AsyncParallelBatchFlow
+from pocketflow import AsyncFlow, AsyncParallelBatchFlow, AsyncBatchFlow
 from nodes import LoadImage, ApplyFilter, SaveImage, NoOp
 
 def create_base_flow():
@@ -17,10 +17,10 @@ def create_base_flow():
     save - "default" >> noop
 
     # Create flow
-    return AsyncFlow(start=load)
+    return load
 
-class ImageBatchFlow(AsyncParallelBatchFlow):
-    """Flow that processes multiple images with multiple filters in parallel."""
+class ImageBatchFlow(AsyncBatchFlow):
+    """Flow that processes multiple images with multiple filters in batch."""
 
     async def prep_async(self, shared):
         """Generate parameters for each image-filter combination."""
@@ -37,14 +37,36 @@ class ImageBatchFlow(AsyncParallelBatchFlow):
                     "filter": filter_type
                 })
 
-        print(f"\nProcessing {len(images)} images with {len(filters)} filters...")
+        print(f"Processing {len(images)} images with {len(filters)} filters...")
         print(f"Total combinations: {len(params)}")
         return params
 
-def create_flow():
+class ImageParallelBatchFlow(AsyncParallelBatchFlow):
+    """Flow that processes multiple images with multiple filters in parallel."""
+
+    async def prep_async(self, shared):
+        """Generate parameters for each image-filter combination."""
+        # Get list of images and filters
+        images = shared.get("images", [])
+        filters = ["grayscale", "blur", "sepia"]
+
+        # Create parameter combinations
+        params = []
+        for image_path in images:
+            for filter_type in filters:
+                params.append({
+                    "image_path": image_path,
+                    "filter": filter_type
+                })
+
+        print(f"Processing {len(images)} images with {len(filters)} filters...")
+        print(f"Total combinations: {len(params)}")
+        return params
+
+def create_flows():
     """Create the complete parallel processing flow."""
     # Create base flow for single image processing
     base_flow = create_base_flow()
 
     # Wrap in parallel batch flow
-    return ImageBatchFlow(start=base_flow)
\ No newline at end of file
+    return ImageBatchFlow(start=base_flow), ImageParallelBatchFlow(start=base_flow)
\ No newline at end of file
diff --git a/cookbook/pocketflow-parallel-batch-flow/main.py b/cookbook/pocketflow-parallel-batch-flow/main.py
index e5c89b3..1ec2860 100644
--- a/cookbook/pocketflow-parallel-batch-flow/main.py
+++ b/cookbook/pocketflow-parallel-batch-flow/main.py
@@ -1,8 +1,7 @@
 import os
 import asyncio
-import numpy as np
-from PIL import Image
-from flow import create_flow
+import time
+from flow import create_flows
 
 def get_image_paths():
     """Get paths of existing images in the images directory."""
@@ -19,7 +18,7 @@ def get_image_paths():
     if not image_paths:
         raise ValueError(f"No images found in '{images_dir}' directory!")
 
-    print(f"\nFound {len(image_paths)} images:")
+    print(f"Found {len(image_paths)} images:")
     for path in image_paths:
         print(f"- {path}")
 
@@ -27,7 +26,7 @@ def get_image_paths():
 
 async def main():
     """Run the parallel image processing example."""
-    print("\nParallel Image Processor")
+    print("Parallel Image Processor")
     print("-" * 30)
 
     # Get existing image paths
@@ -36,10 +35,26 @@ async def main():
     # Create shared store with image paths
     shared = {"images": image_paths}
 
-    # Create and run flow
-    flow = create_flow()
+    # Create both flows
+    batch_flow, parallel_batch_flow = create_flows()
 
-    await flow.run_async(shared)
+    # Run and time batch flow
+    start_time = time.time()
+    print("\nRunning sequential batch flow...")
+    await batch_flow.run_async(shared)
+    batch_time = time.time() - start_time
+
+    # Run and time parallel batch flow
+    start_time = time.time()
+    print("\nRunning parallel batch flow...")
+    await parallel_batch_flow.run_async(shared)
+    parallel_time = time.time() - start_time
+
+    # Print timing results
+    print("\nTiming Results:")
+    print(f"Sequential batch processing: {batch_time:.2f} seconds")
+    print(f"Parallel batch processing: {parallel_time:.2f} seconds")
+    print(f"Speedup: {batch_time/parallel_time:.2f}x")
 
     print("\nProcessing complete! Check the output/ directory for results.")
 
diff --git a/cookbook/pocketflow-parallel-batch-flow/nodes.py b/cookbook/pocketflow-parallel-batch-flow/nodes.py
index b71526e..7f05951 100644
--- a/cookbook/pocketflow-parallel-batch-flow/nodes.py
+++ b/cookbook/pocketflow-parallel-batch-flow/nodes.py
@@ -1,39 +1,22 @@
 """AsyncNode implementations for image processing."""
-
 import os
 import asyncio
 from PIL import Image, ImageFilter
 import numpy as np
 from pocketflow import AsyncNode
 
-class NoOp(AsyncNode):
-    """Node that does nothing, used as a terminal node."""
-
-    async def prep_async(self, shared):
-        """No preparation needed."""
-        return None
-
-    async def exec_async(self, prep_res):
-        """No execution needed."""
-        return None
-
-    async def post_async(self, shared, prep_res, exec_res):
-        """No post-processing needed."""
-        return None
-
 class LoadImage(AsyncNode):
     """Node that loads an image from file."""
-
     async def prep_async(self, shared):
         """Get image path from parameters."""
         image_path = self.params["image_path"]
-        print(f"\nLoading image: {image_path}")
+        print(f"Loading image: {image_path}")
         return image_path
 
     async def exec_async(self, image_path):
         """Load image using PIL."""
         # Simulate I/O delay
-        await asyncio.sleep(0.1)
+        await asyncio.sleep(0.5)
         return Image.open(image_path)
 
     async def post_async(self, shared, prep_res, exec_res):
@@ -43,7 +26,6 @@ class LoadImage(AsyncNode):
 
 class ApplyFilter(AsyncNode):
     """Node that applies a filter to an image."""
-
     async def prep_async(self, shared):
         """Get image and filter type."""
         image = shared["image"]
@@ -83,7 +65,6 @@ class ApplyFilter(AsyncNode):
 
 class SaveImage(AsyncNode):
     """Node that saves the processed image."""
-
     async def prep_async(self, shared):
         """Prepare output path."""
         image = shared["filtered_image"]
@@ -101,7 +82,7 @@ class SaveImage(AsyncNode):
         image, output_path = inputs
 
         # Simulate I/O delay
-        await asyncio.sleep(0.1)
+        await asyncio.sleep(0.5)
         image.save(output_path)
         return output_path
 
diff --git a/docs/index.md b/docs/index.md
index deb1138..a56a48c 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -29,7 +29,7 @@ We model the LLM workflow as a **Graph + Shared Store**:
 - [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks.
- +
+
 ## Design Pattern
@@ -44,7 +44,7 @@ From there, it’s easy to implement popular design patterns:
 - [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
- +
+
 ## Utility Function