pocketflow/cookbook/pocketflow-parallel-batch-flow/nodes.py

112 lines
3.5 KiB
Python

"""AsyncNode implementations for image processing."""
import os
import asyncio
from PIL import Image, ImageFilter
import numpy as np
from pocketflow import AsyncNode
class NoOp(AsyncNode):
"""Node that does nothing, used as a terminal node."""
async def prep_async(self, shared):
"""No preparation needed."""
return None
async def exec_async(self, prep_res):
"""No execution needed."""
return None
async def post_async(self, shared, prep_res, exec_res):
"""No post-processing needed."""
return None
class LoadImage(AsyncNode):
"""Node that loads an image from file."""
async def prep_async(self, shared):
"""Get image path from parameters."""
image_path = self.params["image_path"]
print(f"\nLoading image: {image_path}")
return image_path
async def exec_async(self, image_path):
"""Load image using PIL."""
# Simulate I/O delay
await asyncio.sleep(0.1)
return Image.open(image_path)
async def post_async(self, shared, prep_res, exec_res):
"""Store image in shared store."""
shared["image"] = exec_res
return "apply_filter"
class ApplyFilter(AsyncNode):
"""Node that applies a filter to an image."""
async def prep_async(self, shared):
"""Get image and filter type."""
image = shared["image"]
filter_type = self.params["filter"]
print(f"Applying {filter_type} filter...")
return image, filter_type
async def exec_async(self, inputs):
"""Apply the specified filter."""
image, filter_type = inputs
# Simulate processing delay
await asyncio.sleep(0.5)
if filter_type == "grayscale":
return image.convert("L")
elif filter_type == "blur":
return image.filter(ImageFilter.BLUR)
elif filter_type == "sepia":
# Convert to array for sepia calculation
img_array = np.array(image)
sepia_matrix = np.array([
[0.393, 0.769, 0.189],
[0.349, 0.686, 0.168],
[0.272, 0.534, 0.131]
])
sepia_array = img_array.dot(sepia_matrix.T)
sepia_array = np.clip(sepia_array, 0, 255).astype(np.uint8)
return Image.fromarray(sepia_array)
else:
raise ValueError(f"Unknown filter: {filter_type}")
async def post_async(self, shared, prep_res, exec_res):
"""Store filtered image."""
shared["filtered_image"] = exec_res
return "save"
class SaveImage(AsyncNode):
"""Node that saves the processed image."""
async def prep_async(self, shared):
"""Prepare output path."""
image = shared["filtered_image"]
base_name = os.path.splitext(os.path.basename(self.params["image_path"]))[0]
filter_type = self.params["filter"]
output_path = f"output/{base_name}_{filter_type}.jpg"
# Create output directory if needed
os.makedirs("output", exist_ok=True)
return image, output_path
async def exec_async(self, inputs):
"""Save the image."""
image, output_path = inputs
# Simulate I/O delay
await asyncio.sleep(0.1)
image.save(output_path)
return output_path
async def post_async(self, shared, prep_res, exec_res):
"""Print success message."""
print(f"Saved: {exec_res}")
return "default"