feat: add new examples from pocketflow-academy

This commit is contained in:
Alan ALves 2025-03-19 10:31:04 -03:00
parent 84720ceebd
commit 557a14f695
129 changed files with 13455 additions and 0 deletions

View File

@ -0,0 +1,90 @@
# PocketFlow Async Basic Example
This example demonstrates async operations using a simple Recipe Finder that:
1. Fetches recipes from an API (async HTTP)
2. Processes them with an LLM (async LLM)
3. Waits for user confirmation (async input)
## What this Example Does
When you run the example:
1. You enter an ingredient (e.g., "chicken")
2. It searches for recipes (async API call)
3. It suggests a recipe (async LLM call)
4. You approve or reject the suggestion
5. If rejected, it tries again with a different recipe
## How it Works
1. **FetchRecipes (AsyncNode)**
```python
async def prep_async(self, shared):
    ingredient = await get_user_input("Enter ingredient: ")
    return ingredient
async def exec_async(self, ingredient):
# Async API call
recipes = await fetch_recipes(ingredient)
return recipes
```
2. **SuggestRecipe (AsyncNode)**
```python
async def exec_async(self, recipes):
# Async LLM call
suggestion = await call_llm_async(
f"Choose best recipe from: {recipes}"
)
return suggestion
```
3. **GetApproval (AsyncNode)**
```python
async def exec_async(self, suggestion):
    # Async user input
    answer = await get_user_input(
        f"Accept {suggestion}? (y/n): "
    )
    return answer
async def post_async(self, shared, prep_res, answer):
    return "accept" if answer == "y" else "retry"
```
## Running the Example
```bash
pip install -r requirements.txt
python main.py
```
## Sample Interaction
```
Enter ingredient: chicken
Fetching recipes for chicken...
Found 3 recipes.

Suggesting best recipe...
How about: chicken Stir Fry

Accept this recipe? (y/n): n

Let's try another recipe...

Suggesting best recipe...
How about: Grilled chicken with Herbs

Accept this recipe? (y/n): y

Great choice! Here's your recipe...
Recipe: Grilled chicken with Herbs
Ingredient: chicken
```
## Key Concepts
1. **Async Operations**: Using `async/await` for:
- API calls (non-blocking I/O)
- LLM calls (potentially slow)
- User input (waiting for response)
2. **AsyncNode Methods**:
- `prep_async`: Setup and data gathering
- `exec_async`: Main async processing
- `post_async`: Post-processing and decisions
3. **Flow Control**:
- Actions ("accept"/"retry") control flow
- Retry loop for rejected suggestions
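
The retry loop above is wired up with actions in `flow.py`:

```python
fetch - "suggest" >> suggest
suggest - "approve" >> approve
approve - "retry" >> suggest   # loop back on rejection
approve - "accept" >> end
flow = AsyncFlow(start=fetch)
```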

View File

@ -0,0 +1,27 @@
"""AsyncFlow implementation for recipe finder."""
from pocketflow import AsyncFlow, Node
from nodes import FetchRecipes, SuggestRecipe, GetApproval
class NoOp(Node):
"""Node that does nothing, used to properly end the flow."""
pass
def create_flow():
"""Create and connect nodes into a flow."""
# Create nodes
fetch = FetchRecipes()
suggest = SuggestRecipe()
approve = GetApproval()
end = NoOp()
# Connect nodes
fetch - "suggest" >> suggest
suggest - "approve" >> approve
approve - "retry" >> suggest # Loop back for another suggestion
approve - "accept" >> end # Properly end the flow
# Create flow starting with fetch
flow = AsyncFlow(start=fetch)
return flow

View File

@ -0,0 +1,20 @@
import asyncio
from flow import create_flow
async def main():
"""Run the recipe finder flow."""
# Create flow
flow = create_flow()
# Create shared store
shared = {}
# Run flow
print("\nWelcome to Recipe Finder!")
print("------------------------")
await flow.run_async(shared)
print("\nThanks for using Recipe Finder!")
if __name__ == "__main__":
# Run the async main function
asyncio.run(main())

View File

@ -0,0 +1,63 @@
from pocketflow import AsyncNode
from utils import fetch_recipes, call_llm_async, get_user_input
class FetchRecipes(AsyncNode):
"""AsyncNode that fetches recipes."""
async def prep_async(self, shared):
"""Get ingredient from user."""
ingredient = await get_user_input("Enter ingredient: ")
return ingredient
async def exec_async(self, ingredient):
"""Fetch recipes asynchronously."""
recipes = await fetch_recipes(ingredient)
return recipes
async def post_async(self, shared, prep_res, recipes):
"""Store recipes and continue."""
shared["recipes"] = recipes
shared["ingredient"] = prep_res
return "suggest"
class SuggestRecipe(AsyncNode):
"""AsyncNode that suggests a recipe using LLM."""
async def prep_async(self, shared):
"""Get recipes from shared store."""
return shared["recipes"]
async def exec_async(self, recipes):
"""Get suggestion from LLM."""
suggestion = await call_llm_async(
f"Choose best recipe from: {', '.join(recipes)}"
)
return suggestion
async def post_async(self, shared, prep_res, suggestion):
"""Store suggestion and continue."""
shared["suggestion"] = suggestion
return "approve"
class GetApproval(AsyncNode):
"""AsyncNode that gets user approval."""
async def prep_async(self, shared):
"""Get current suggestion."""
return shared["suggestion"]
async def exec_async(self, suggestion):
"""Ask for user approval."""
answer = await get_user_input("\nAccept this recipe? (y/n): ")
return answer
async def post_async(self, shared, prep_res, answer):
"""Handle user's decision."""
if answer == "y":
print("\nGreat choice! Here's your recipe...")
print(f"Recipe: {shared['suggestion']}")
print(f"Ingredient: {shared['ingredient']}")
return "accept"
else:
print("\nLet's try another recipe...")
return "retry"

View File

@ -0,0 +1,3 @@
pocketflow
aiohttp>=3.8.0 # For async HTTP requests
openai>=1.0.0 # For async LLM calls

View File

@ -0,0 +1,45 @@
import asyncio
import aiohttp
from openai import AsyncOpenAI
async def fetch_recipes(ingredient):
"""Fetch recipes from an API asynchronously."""
print(f"Fetching recipes for {ingredient}...")
# Simulate API call with delay
await asyncio.sleep(1)
# Mock recipes (in real app, would fetch from API)
recipes = [
f"{ingredient} Stir Fry",
f"Grilled {ingredient} with Herbs",
f"Baked {ingredient} with Vegetables"
]
print(f"Found {len(recipes)} recipes.")
return recipes
_suggestion_index = 0

async def call_llm_async(prompt):
    """Make async LLM call."""
    global _suggestion_index
    print("\nSuggesting best recipe...")
    # Simulate LLM call with delay
    await asyncio.sleep(1)
    # Mock LLM response (in real app, would call OpenAI)
    recipes = prompt.split(": ")[1].split(", ")
    # Rotate through recipes so a rejected suggestion is not repeated
    suggestion = recipes[_suggestion_index % len(recipes)]
    _suggestion_index += 1
    print(f"How about: {suggestion}")
    return suggestion
async def get_user_input(prompt):
"""Get user input asynchronously."""
# Get the running event loop to schedule the blocking input() call
loop = asyncio.get_running_loop()
# Get input in a non-blocking way
answer = await loop.run_in_executor(None, input, prompt)
return answer.lower()
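
# For reference, a real implementation could use the AsyncOpenAI client
# imported above (assumes OPENAI_API_KEY is set in the environment):
#
# async def call_llm_async(prompt):
#     client = AsyncOpenAI()
#     r = await client.chat.completions.create(
#         model="gpt-4o",
#         messages=[{"role": "user", "content": prompt}]
#     )
#     return r.choices[0].message.content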

View File

@ -0,0 +1,72 @@
# PocketFlow BatchFlow Example
This example demonstrates the BatchFlow concept in PocketFlow by implementing an image processor that applies different filters to multiple images.
## What this Example Demonstrates
- How to use BatchFlow to run a Flow multiple times with different parameters
- Key concepts of BatchFlow:
1. Creating a base Flow for single-item processing
2. Using BatchFlow to process multiple items with different parameters
3. Managing parameters across multiple Flow executions
## Project Structure
```
pocketflow-batch-flow/
├── README.md
├── requirements.txt
├── images/
│ ├── cat.jpg # Sample image 1
│ ├── dog.jpg # Sample image 2
│ └── bird.jpg # Sample image 3
├── main.py # Entry point
├── flow.py # Flow and BatchFlow definitions
└── nodes.py # Node implementations for image processing
```
## How it Works
The example processes multiple images with different filters:
1. **Base Flow**: Processes a single image
- Load image
- Apply filter (grayscale, blur, or sepia)
- Save processed image
2. **BatchFlow**: Processes multiple image-filter combinations
- Takes a list of parameters (image + filter combinations)
- Runs the base Flow for each parameter set
- Organizes output in a structured way
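
In code (condensed from `flow.py` in this example), the BatchFlow subclass only needs a `prep` that returns one parameter dict per run of the base Flow:

```python
from pocketflow import BatchFlow

class ImageBatchFlow(BatchFlow):
    def prep(self, shared):
        images = ["cat.jpg", "dog.jpg", "bird.jpg"]
        filters = ["grayscale", "blur", "sepia"]
        # One parameter dict per base-Flow run
        return [{"input": img, "filter": f} for img in images for f in filters]

batch_flow = ImageBatchFlow(start=create_base_flow())  # create_base_flow() is defined in flow.py
```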
## Installation
```bash
pip install -r requirements.txt
```
## Usage
```bash
python main.py
```
## Sample Output
```
Processing images with filters...
Saved filtered image to: output/cat_grayscale.jpg
Saved filtered image to: output/cat_blur.jpg
Saved filtered image to: output/cat_sepia.jpg
Saved filtered image to: output/dog_grayscale.jpg
...
All images processed successfully!
Check the 'output' directory for results.
```
## Key Concepts Illustrated
1. **Parameter Management**: Shows how BatchFlow manages different parameter sets
2. **Flow Reuse**: Demonstrates running the same Flow multiple times
3. **Batch Processing**: Shows how to process multiple items efficiently
4. **Real-world Application**: Provides a practical example of batch processing

View File

@ -0,0 +1,48 @@
from pocketflow import Flow, BatchFlow
from nodes import LoadImage, ApplyFilter, SaveImage
def create_base_flow():
"""Create the base Flow for processing a single image."""
# Create nodes
load = LoadImage()
filter_node = ApplyFilter()
save = SaveImage()
# Connect nodes
load - "apply_filter" >> filter_node
filter_node - "save" >> save
# Create and return flow
return Flow(start=load)
class ImageBatchFlow(BatchFlow):
"""BatchFlow for processing multiple images with different filters."""
def prep(self, shared):
"""Generate parameters for each image-filter combination."""
# List of images to process
images = ["cat.jpg", "dog.jpg", "bird.jpg"]
# List of filters to apply
filters = ["grayscale", "blur", "sepia"]
# Generate all combinations
params = []
for img in images:
for f in filters:
params.append({
"input": img,
"filter": f
})
return params
def create_flow():
"""Create the complete batch processing flow."""
# Create base flow for single image processing
base_flow = create_base_flow()
# Wrap in BatchFlow for multiple images
batch_flow = ImageBatchFlow(start=base_flow)
return batch_flow

Binary image files added (not shown): 249 KiB, 214 KiB, 147 KiB.

View File

@ -0,0 +1,17 @@
from flow import create_flow
def main():
# Create and run flow
print("Processing images with filters...")
flow = create_flow()
flow.run({})
print("\nAll images processed successfully!")
print("Check the 'output' directory for results.")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,76 @@
"""Node implementations for image processing."""
import os
from PIL import Image, ImageEnhance, ImageFilter
from pocketflow import Node
class LoadImage(Node):
"""Node that loads an image file."""
def prep(self, shared):
"""Get image path from parameters."""
return os.path.join("images", self.params["input"])
def exec(self, image_path):
"""Load the image using PIL."""
return Image.open(image_path)
def post(self, shared, prep_res, exec_res):
"""Store the image in shared store."""
shared["image"] = exec_res
return "apply_filter"
class ApplyFilter(Node):
"""Node that applies a filter to an image."""
def prep(self, shared):
"""Get image and filter type."""
return shared["image"], self.params["filter"]
def exec(self, inputs):
"""Apply the specified filter."""
image, filter_type = inputs
if filter_type == "grayscale":
return image.convert("L")
elif filter_type == "blur":
return image.filter(ImageFilter.BLUR)
elif filter_type == "sepia":
# Approximate sepia: desaturate heavily, then brighten slightly
enhancer = ImageEnhance.Color(image)
grayscale = enhancer.enhance(0.3)
colorize = ImageEnhance.Brightness(grayscale)
return colorize.enhance(1.2)
else:
raise ValueError(f"Unknown filter: {filter_type}")
def post(self, shared, prep_res, exec_res):
"""Store the filtered image."""
shared["filtered_image"] = exec_res
return "save"
class SaveImage(Node):
"""Node that saves the processed image."""
def prep(self, shared):
"""Get filtered image and prepare output path."""
# Create output directory if it doesn't exist
os.makedirs("output", exist_ok=True)
# Generate output filename
input_name = os.path.splitext(self.params["input"])[0]
filter_name = self.params["filter"]
output_path = os.path.join("output", f"{input_name}_{filter_name}.jpg")
return shared["filtered_image"], output_path
def exec(self, inputs):
"""Save the image to file."""
image, output_path = inputs
image.save(output_path, "JPEG")
return output_path
def post(self, shared, prep_res, exec_res):
"""Print success message."""
print(f"Saved filtered image to: {exec_res}")
return "default"

Binary image files added (not shown): 41, 56, 68, 46, 59, 66, 33, 44, 41 KiB.

View File

@ -0,0 +1,2 @@
pocketflow
Pillow>=10.0.0

View File

@ -0,0 +1,63 @@
# PocketFlow BatchNode Example
This example demonstrates the BatchNode concept in PocketFlow by implementing a CSV processor that handles large files by processing them in chunks.
## What this Example Demonstrates
- How to use BatchNode to process large inputs in chunks
- The three key methods of BatchNode:
1. `prep`: Splits input into chunks
2. `exec`: Processes each chunk independently
3. `post`: Combines results from all chunks
## Project Structure
```
pocketflow-batch-node/
├── README.md
├── requirements.txt
├── data/
│ └── sales.csv # Sample large CSV file
├── main.py # Entry point
├── flow.py # Flow definition
└── nodes.py # BatchNode implementation
```
## How it Works
The example processes a large CSV file containing sales data:
1. **Chunking (prep)**: The CSV file is read and split into chunks of N rows
2. **Processing (exec)**: Each chunk is processed to calculate:
- Total sales
- Average sale value
- Number of transactions
3. **Combining (post)**: Results from all chunks are aggregated into final statistics
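
The three methods map directly onto the `CSVProcessor` in `nodes.py`; a condensed sketch:

```python
import pandas as pd
from pocketflow import BatchNode

class CSVProcessor(BatchNode):
    def prep(self, shared):
        # Split: one DataFrame per chunk of rows
        return pd.read_csv(shared["input_file"], chunksize=1000)

    def exec(self, chunk):
        # Process: statistics for a single chunk
        return {"total_sales": chunk["amount"].sum(),
                "num_transactions": len(chunk)}

    def post(self, shared, prep_res, exec_res_list):
        # Combine: aggregate the per-chunk results
        total = sum(r["total_sales"] for r in exec_res_list)
        n = sum(r["num_transactions"] for r in exec_res_list)
        shared["statistics"] = {"total_sales": total,
                                "average_sale": total / n,
                                "total_transactions": n}
        return "show_stats"
```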
## Installation
```bash
pip install -r requirements.txt
```
## Usage
```bash
python main.py
```
## Sample Output
```
Processing sales.csv in chunks...
Final Statistics:
- Total Sales: $1,234,567.89
- Average Sale: $123.45
- Total Transactions: 10,000
```
## Key Concepts Illustrated
1. **Chunk-based Processing**: Shows how BatchNode handles large inputs by breaking them into manageable pieces
2. **Independent Processing**: Demonstrates how each chunk is processed separately
3. **Result Aggregation**: Shows how individual results are combined into a final output

File diff suppressed because it is too large.

View File

@ -0,0 +1,30 @@
from pocketflow import Flow, Node
from nodes import CSVProcessor
class ShowStats(Node):
"""Node to display the final statistics."""
def prep(self, shared):
"""Get statistics from shared store."""
return shared["statistics"]
def post(self, shared, prep_res, exec_res):
"""Display the statistics."""
stats = prep_res
print("\nFinal Statistics:")
print(f"- Total Sales: ${stats['total_sales']:,.2f}")
print(f"- Average Sale: ${stats['average_sale']:,.2f}")
print(f"- Total Transactions: {stats['total_transactions']:,}\n")
return "end"
def create_flow():
"""Create and return the processing flow."""
# Create nodes
processor = CSVProcessor(chunk_size=1000)
show_stats = ShowStats()
# Connect nodes
processor - "show_stats" >> show_stats
# Create and return flow
return Flow(start=processor)

View File

@ -0,0 +1,36 @@
import os
from flow import create_flow
def main():
"""Run the batch processing example."""
# Create data directory if it doesn't exist
os.makedirs("data", exist_ok=True)
# Create sample CSV if it doesn't exist
if not os.path.exists("data/sales.csv"):
print("Creating sample sales.csv...")
import pandas as pd
import numpy as np
# Generate sample data
np.random.seed(42)
n_rows = 10000
df = pd.DataFrame({
"date": pd.date_range("2024-01-01", periods=n_rows),
"amount": np.random.normal(100, 30, n_rows).round(2),
"product": np.random.choice(["A", "B", "C"], n_rows)
})
df.to_csv("data/sales.csv", index=False)
# Initialize shared store
shared = {
"input_file": "data/sales.csv"
}
# Create and run flow
print(f"Processing sales.csv in chunks...")
flow = create_flow()
flow.run(shared)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,61 @@
import pandas as pd
from pocketflow import BatchNode
class CSVProcessor(BatchNode):
"""BatchNode that processes a large CSV file in chunks."""
def __init__(self, chunk_size=1000):
"""Initialize with chunk size."""
super().__init__()
self.chunk_size = chunk_size
def prep(self, shared):
"""Split CSV file into chunks.
Returns an iterator of DataFrames, each containing chunk_size rows.
"""
# Read CSV in chunks
chunks = pd.read_csv(
shared["input_file"],
chunksize=self.chunk_size
)
return chunks
def exec(self, chunk):
"""Process a single chunk of the CSV.
Args:
chunk: pandas DataFrame containing chunk_size rows
Returns:
dict: Statistics for this chunk
"""
return {
    "total_sales": chunk["amount"].sum(),
    "num_transactions": len(chunk)
}
def post(self, shared, prep_res, exec_res_list):
"""Combine results from all chunks.
Args:
prep_res: Original chunks iterator
exec_res_list: List of results from each chunk
Returns:
str: Action to take next
"""
# Combine statistics from all chunks
total_sales = sum(res["total_sales"] for res in exec_res_list)
total_transactions = sum(res["num_transactions"] for res in exec_res_list)
# Calculate final statistics
shared["statistics"] = {
    "total_sales": total_sales,
    "average_sale": total_sales / total_transactions,
    "total_transactions": total_transactions
}
return "show_stats"

View File

@ -0,0 +1,2 @@
pocketflow
pandas>=2.0.0

View File

@ -0,0 +1,52 @@
# PocketFlow Communication Example
This example demonstrates the [Communication](https://the-pocket.github.io/PocketFlow/communication.html) concept in PocketFlow, specifically focusing on the Shared Store pattern.
## Overview
The example implements a simple word counter that shows how nodes can communicate using a shared store. It demonstrates:
- How to initialize and structure a shared store
- How nodes can read from and write to the shared store
- How to maintain state across multiple node executions
- Best practices for shared store usage
## Project Structure
```
pocketflow-communication/
├── README.md
├── requirements.txt
├── main.py
├── flow.py
└── nodes.py
```
## Installation
```bash
pip install -r requirements.txt
```
## Usage
```bash
python main.py
```
Enter text when prompted. The program will:
1. Count words in the text
2. Store statistics in the shared store
3. Display running statistics (total texts, total words, average)
Enter 'q' to quit.
## How it Works
The example uses three nodes:
1. `TextInput`: Reads user input and initializes the shared store
2. `WordCounter`: Counts words and updates statistics in the shared store
3. `ShowStats`: Displays statistics from the shared store
This demonstrates how nodes can share and maintain state using the shared store pattern.
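
Condensed from `nodes.py`, the pattern is simply that one node writes to `shared` and the next reads from it:

```python
from pocketflow import Node

class WordCounter(Node):
    def prep(self, shared):
        return shared["text"]          # read what TextInput stored

    def exec(self, text):
        return len(text.split())

    def post(self, shared, prep_res, exec_res):
        shared["stats"]["total_words"] += exec_res   # update shared state
        return "show"
```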

View File

@ -0,0 +1,21 @@
"""Flow configuration for the communication example."""
from pocketflow import Flow
from nodes import TextInput, WordCounter, ShowStats, EndNode
def create_flow():
"""Create and configure the flow with all nodes."""
# Create nodes
text_input = TextInput()
word_counter = WordCounter()
show_stats = ShowStats()
end_node = EndNode()
# Configure transitions
text_input - "count" >> word_counter
word_counter - "show" >> show_stats
show_stats - "continue" >> text_input
text_input - "exit" >> end_node
# Create and return flow
return Flow(start=text_input)

View File

@ -0,0 +1,10 @@
from flow import create_flow
def main():
"""Run the communication example."""
flow = create_flow()
shared = {}
flow.run(shared)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,64 @@
"""Node implementations for the communication example."""
from pocketflow import Node
class EndNode(Node):
"""Node that handles flow termination."""
pass
class TextInput(Node):
"""Node that reads text input and initializes the shared store."""
def prep(self, shared):
"""Get user input and ensure shared store is initialized."""
return input("Enter text (or 'q' to quit): ")
def post(self, shared, prep_res, exec_res):
"""Store text and initialize/update statistics."""
if prep_res == 'q':
return "exit"
# Store the text
shared["text"] = prep_res
# Initialize statistics if they don't exist
if "stats" not in shared:
shared["stats"] = {
"total_texts": 0,
"total_words": 0
}
shared["stats"]["total_texts"] += 1
return "count"
class WordCounter(Node):
"""Node that counts words in the text."""
def prep(self, shared):
"""Get text from shared store."""
return shared["text"]
def exec(self, text):
"""Count words in the text."""
return len(text.split())
def post(self, shared, prep_res, exec_res):
"""Update word count statistics."""
shared["stats"]["total_words"] += exec_res
return "show"
class ShowStats(Node):
"""Node that displays statistics from the shared store."""
def prep(self, shared):
"""Get statistics from shared store."""
return shared["stats"]
def post(self, shared, prep_res, exec_res):
"""Display statistics and continue the flow."""
stats = prep_res
print(f"\nStatistics:")
print(f"- Texts processed: {stats['total_texts']}")
print(f"- Total words: {stats['total_words']}")
print(f"- Average words per text: {stats['total_words'] / stats['total_texts']:.1f}\n")
return "continue"

View File

@ -0,0 +1 @@
pocketflow==0.1.0

View File

@ -0,0 +1,70 @@
# PocketFlow Text Converter
A practical example demonstrating how to use PocketFlow to create an interactive text converter. This example showcases important concepts like data flow between nodes, user choice-based branching, and state management using the shared store.
## Features
- Convert text to UPPERCASE
- Convert text to lowercase
- Reverse text
- Remove extra spaces
- Interactive command-line interface
- Continuous flow with option to process multiple texts
## Project Structure
```
.
├── flow.py # Nodes and flow implementation
├── main.py # Application entry point
└── README.md # Documentation
```
## Implementation Details
1. **TextInput Node**:
- `prep()`: Gets text input from user
- `post()`: Shows options menu and returns action based on choice
2. **TextTransform Node**:
- `prep()`: Gets text and choice from shared store
- `exec()`: Applies the chosen transformation
- `post()`: Shows result and asks if continue
3. **Flow Structure**:
- Input → Transform → (loop back to Input or exit)
- Demonstrates branching based on actions ("transform", "input", "exit")
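
The wiring, condensed from `flow.py`:

```python
text_input - "transform" >> text_transform
text_transform - "input" >> text_input   # loop back for another text
text_transform - "exit" >> end_node
flow = Flow(start=text_input)
```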
## How to Run
1. Create a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Run the example:
```bash
python main.py
```
## What You'll Learn
This example demonstrates several important PocketFlow concepts:
- **Node Architecture**: How to structure logic using prep/exec/post pattern
- **Flow Control**: How to use actions to control flow between nodes
- **Shared Store**: How to share data between nodes
- **Interactivity**: How to create interactive flows with user input
- **Branching**: How to implement different paths based on choices
## Additional Resources
- [PocketFlow Documentation](https://the-pocket.github.io/PocketFlow/)
- [Flow Guide](https://the-pocket.github.io/PocketFlow/flow.html)
- [Node Guide](https://the-pocket.github.io/PocketFlow/node.html)

View File

@ -0,0 +1,67 @@
from pocketflow import Node, Flow
class TextInput(Node):
def prep(self, shared):
"""Get text input from user."""
if "text" not in shared:
text = input("\nEnter text to convert: ")
shared["text"] = text
return shared["text"]
def post(self, shared, prep_res, exec_res):
print("\nChoose transformation:")
print("1. Convert to UPPERCASE")
print("2. Convert to lowercase")
print("3. Reverse text")
print("4. Remove extra spaces")
print("5. Exit")
choice = input("\nYour choice (1-5): ")
if choice == "5":
return "exit"
shared["choice"] = choice
return "transform"
class TextTransform(Node):
def prep(self, shared):
return shared["text"], shared["choice"]
def exec(self, inputs):
text, choice = inputs
if choice == "1":
return text.upper()
elif choice == "2":
return text.lower()
elif choice == "3":
return text[::-1]
elif choice == "4":
return " ".join(text.split())
else:
return "Invalid option!"
def post(self, shared, prep_res, exec_res):
print("\nResult:", exec_res)
if input("\nConvert another text? (y/n): ").lower() == 'y':
shared.pop("text", None) # Remove previous text
return "input"
return "exit"
class EndNode(Node):
pass
# Create nodes
text_input = TextInput()
text_transform = TextTransform()
end_node = EndNode()
# Connect nodes
text_input - "transform" >> text_transform
text_transform - "input" >> text_input
text_transform - "exit" >> end_node
# Create flow
flow = Flow(start=text_input)

View File

@ -0,0 +1,16 @@
from flow import flow
def main():
print("\nWelcome to Text Converter!")
print("=========================")
# Initialize shared store
shared = {}
# Run the flow
flow.run(shared)
print("\nThank you for using Text Converter!")
if __name__ == "__main__":
main()

View File

@ -0,0 +1 @@
pocketflow>=0.1.0

View File

@ -0,0 +1,42 @@
# PocketFlow Hello World
Your first PocketFlow application! This simple example demonstrates how to create a basic PocketFlow app from scratch.
## Project Structure
```
.
├── docs/ # Documentation files
├── utils/ # Utility functions
├── flow.py # PocketFlow implementation
├── main.py # Main application entry point
└── README.md # Project documentation
```
## Setup
1. Create a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Run the example:
```bash
python main.py
```
## What This Example Demonstrates
- How to create your first PocketFlow application
- Basic PocketFlow concepts and usage
- Simple example of PocketFlow's capabilities
## Additional Resources
- [PocketFlow Documentation](https://the-pocket.github.io/PocketFlow/)

View File

@ -0,0 +1,46 @@
# Your Project Title
## Project Requirements
A description of the project requirements.
## Utility Functions
1. **Call LLM** (`utils/call_llm.py`)
## Flow Design
1. **First Node**
2. **Second Node**
3. **Third Node**
### Flow Diagram
```mermaid
flowchart TD
firstNode[First Node] --> secondNode[Second Node]
secondNode --> thirdNode[Third Node]
```
## Data Structure
The shared memory structure will be organized as follows:
```python
shared = {
"key": "value"
}
```
## Node Designs
### 1. First Node
- **Purpose**: What the node does
- **Design**: Regular Node (no Batch/Async)
- **Data Access**:
- Read: "key" from shared store
- Write: "key" to shared store
### 2. Second Node
...
### 3. Third Node

View File

@ -0,0 +1,19 @@
from pocketflow import Node, Flow
from utils.call_llm import call_llm
# An example node and flow
# Please replace this with your own node and flow
class AnswerNode(Node):
def prep(self, shared):
# Read question from shared
return shared["question"]
def exec(self, question):
return call_llm(question)
def post(self, shared, prep_res, exec_res):
# Store the answer in shared
shared["answer"] = exec_res
answer_node = AnswerNode()
qa_flow = Flow(start=answer_node)

View File

@ -0,0 +1,16 @@
from flow import qa_flow
# Example main function
# Please replace this with your own main function
def main():
shared = {
"question": "In one sentence, what's the end of universe?",
"answer": None
}
qa_flow.run(shared)
print("Question:", shared["question"])
print("Answer:", shared["answer"])
if __name__ == "__main__":
main()

View File

@ -0,0 +1,13 @@
from openai import OpenAI
def call_llm(prompt):
client = OpenAI(api_key="YOUR_API_KEY_HERE")
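# In practice, read the key from the environment instead of hardcoding it,
# e.g. client = OpenAI(api_key=os.environ["OPENAI_API_KEY"]) (requires importing os).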
r = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
if __name__ == "__main__":
prompt = "What is the meaning of life?"
print(call_llm(prompt))

View File

@ -0,0 +1,64 @@
# PocketFlow Nested BatchFlow Example
This example demonstrates Nested BatchFlow using a simple school grades calculator.
## What this Example Does
Calculates average grades for:
1. Each student in a class
2. Each class in the school
## Structure
```
school/
├── class_a/
│ ├── student1.txt (grades: 7.5, 8.0, 9.0)
│ └── student2.txt (grades: 8.5, 7.0, 9.5)
└── class_b/
├── student3.txt (grades: 6.5, 8.5, 7.0)
└── student4.txt (grades: 9.0, 9.5, 8.0)
```
## How it Works
1. **Outer BatchFlow (SchoolBatchFlow)**
- Processes each class folder
- Returns parameters like: `{"class": "class_a"}`
2. **Inner BatchFlow (ClassBatchFlow)**
- Processes each student file in a class
- Returns parameters like: `{"student": "student1.txt"}`
3. **Base Flow**
- Loads student grades
- Calculates average
- Saves result
## Running the Example
```bash
pip install -r requirements.txt
python main.py
```
## Expected Output
```
Processing class_a...
- student1: Average = 8.2
- student2: Average = 8.3
Class A Average: 8.25
Processing class_b...
- student3: Average = 7.3
- student4: Average = 8.8
Class B Average: 8.08
School Average: 8.17
```
## Key Concepts
1. **Nested BatchFlow**: One BatchFlow inside another
2. **Parameter Inheritance**: Inner flow gets parameters from outer flow
3. **Hierarchical Processing**: Process data in a tree-like structure
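
Condensed from `flow.py`, each layer wraps the one below, and the parameter dicts merge as execution descends:

```python
base_flow = create_base_flow()                    # processes one student
class_flow = ClassBatchFlow(start=base_flow)      # adds {"student": ...}
school_flow = SchoolBatchFlow(start=class_flow)   # adds {"class": ...}
# A node inside base_flow sees the merged params,
# e.g. {"class": "class_a", "student": "student1.txt"}
```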

View File

@ -0,0 +1,73 @@
import os
from pocketflow import Flow, BatchFlow
from nodes import LoadGrades, CalculateAverage
def create_base_flow():
"""Create base flow for processing one student's grades."""
# Create nodes
load = LoadGrades()
calc = CalculateAverage()
# Connect nodes
load - "calculate" >> calc
# Create and return flow
return Flow(start=load)
class ClassBatchFlow(BatchFlow):
"""BatchFlow for processing all students in a class."""
def prep(self, shared):
"""Generate parameters for each student in the class."""
# Get class folder from parameters
class_folder = self.params["class"]
# List all student files
class_path = os.path.join("school", class_folder)
students = sorted(f for f in os.listdir(class_path) if f.endswith(".txt"))
# Return parameters for each student
return [{"student": student} for student in students]
def post(self, shared, prep_res, exec_res):
"""Calculate and print class average."""
class_name = self.params["class"]
class_results = shared["results"][class_name]
class_average = sum(class_results.values()) / len(class_results)
print(f"Class {class_name.split('_')[1].upper()} Average: {class_average:.2f}\n")
return "default"
class SchoolBatchFlow(BatchFlow):
"""BatchFlow for processing all classes in the school."""
def prep(self, shared):
"""Generate parameters for each class."""
# List all class folders
classes = sorted(d for d in os.listdir("school") if os.path.isdir(os.path.join("school", d)))
# Return parameters for each class
return [{"class": class_name} for class_name in classes]
def post(self, shared, prep_res, exec_res):
"""Calculate and print school average."""
all_grades = []
for class_results in shared["results"].values():
all_grades.extend(class_results.values())
school_average = sum(all_grades) / len(all_grades)
print(f"School Average: {school_average:.2f}")
return "default"
def create_flow():
"""Create the complete nested batch processing flow."""
# Create base flow for single student
base_flow = create_base_flow()
# Wrap in ClassBatchFlow for processing all students in a class
class_flow = ClassBatchFlow(start=base_flow)
# Wrap in SchoolBatchFlow for processing all classes
school_flow = SchoolBatchFlow(start=class_flow)
return school_flow

View File

@ -0,0 +1,42 @@
import os
from flow import create_flow
def create_sample_data():
"""Create sample grade files."""
# Create directory structure
os.makedirs("school/class_a", exist_ok=True)
os.makedirs("school/class_b", exist_ok=True)
# Sample grades
data = {
"class_a": {
"student1.txt": [7.5, 8.0, 9.0],
"student2.txt": [8.5, 7.0, 9.5]
},
"class_b": {
"student3.txt": [6.5, 8.5, 7.0],
"student4.txt": [9.0, 9.5, 8.0]
}
}
# Create files
for class_name, students in data.items():
for student, grades in students.items():
file_path = os.path.join("school", class_name, student)
with open(file_path, 'w') as f:
for grade in grades:
f.write(f"{grade}\n")
def main():
"""Run the nested batch example."""
# Create sample data
create_sample_data()
print("Processing school grades...\n")
# Create and run flow
flow = create_flow()
flow.run({})
if __name__ == "__main__":
main()

View File

@ -0,0 +1,52 @@
import os
from pocketflow import Node
class LoadGrades(Node):
"""Node that loads grades from a student's file."""
def prep(self, shared):
"""Get file path from parameters."""
class_name = self.params["class"]
student_file = self.params["student"]
return os.path.join("school", class_name, student_file)
def exec(self, file_path):
"""Load and parse grades from file."""
with open(file_path, 'r') as f:
# Each line is a grade
grades = [float(line.strip()) for line in f]
return grades
def post(self, shared, prep_res, grades):
"""Store grades in shared store."""
shared["grades"] = grades
return "calculate"
class CalculateAverage(Node):
"""Node that calculates average grade."""
def prep(self, shared):
"""Get grades from shared store."""
return shared["grades"]
def exec(self, grades):
"""Calculate average."""
return sum(grades) / len(grades)
def post(self, shared, prep_res, average):
"""Store and print result."""
# Store in results dictionary
if "results" not in shared:
shared["results"] = {}
class_name = self.params["class"]
student = self.params["student"]
if class_name not in shared["results"]:
shared["results"][class_name] = {}
shared["results"][class_name][student] = average
# Print individual result
print(f"- {student}: Average = {average:.1f}")
return "default"

View File

@ -0,0 +1 @@
pocketflow

View File

@ -0,0 +1,3 @@
7.5
8.0
9.0

View File

@ -0,0 +1,3 @@
8.5
7.0
9.5

View File

@ -0,0 +1,3 @@
6.5
8.5
7.0

View File

@ -0,0 +1,3 @@
9.0
9.5
8.0

View File

@ -0,0 +1,82 @@
# PocketFlow Summarize
A practical example demonstrating how to use PocketFlow to build a robust text summarization tool with error handling and retries. This example showcases core PocketFlow concepts in a real-world application.
## Features
- Text summarization using LLMs (Large Language Models)
- Automatic retry mechanism (up to 3 attempts) on API failures
- Graceful error handling with fallback responses
- Clean separation of concerns using PocketFlow's Node architecture
## Project Structure
```
.
├── docs/ # Documentation files
├── utils/ # Utility functions (LLM API wrapper)
├── flow.py # PocketFlow implementation with Summarize Node
├── main.py # Main application entry point
└── README.md # Project documentation
```
## Implementation Details
The example implements a simple but robust text summarization workflow:
1. **Summarize Node** (`flow.py`):
- `prep()`: Retrieves text from the shared store
- `exec()`: Calls LLM to summarize text in 10 words
- `exec_fallback()`: Provides graceful error handling
- `post()`: Stores the summary back in shared store
2. **Flow Structure**:
- Single node flow for demonstration
- Configured with 3 retries for reliability
- Uses shared store for data passing
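
Retries are configured when the node is constructed (from `flow.py`):

```python
summarize_node = Summarize(max_retries=3)  # retry exec() up to 3 times, then exec_fallback()
flow = Flow(start=summarize_node)
```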
## Setup
1. Create a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Configure your environment:
- Set up your LLM API key (check utils/call_llm.py for configuration)
4. Run the example:
```bash
python main.py
```
## Example Usage
The example comes with a sample text about PocketFlow, but you can modify `main.py` to summarize your own text:
```python
shared = {"data": "Your text to summarize here..."}
flow.run(shared)
print("Summary:", shared["summary"])
```
## What You'll Learn
This example demonstrates several key PocketFlow concepts:
- **Node Architecture**: How to structure LLM tasks using prep/exec/post pattern
- **Error Handling**: Implementing retry mechanisms and fallbacks
- **Shared Store**: Using shared storage for data flow between steps
- **Flow Creation**: Setting up a basic PocketFlow workflow
## Additional Resources
- [PocketFlow Documentation](https://the-pocket.github.io/PocketFlow/)
- [Node Concept Guide](https://the-pocket.github.io/PocketFlow/node.html)
- [Flow Design Patterns](https://the-pocket.github.io/PocketFlow/flow.html)

View File

@ -0,0 +1,28 @@
from pocketflow import Node, Flow
from utils.call_llm import call_llm
class Summarize(Node):
def prep(self, shared):
"""Read and preprocess data from shared store."""
return shared["data"]
def exec(self, prep_res):
"""Execute the summarization using LLM."""
if not prep_res:
return "Empty text"
prompt = f"Summarize this text in 10 words: {prep_res}"
summary = call_llm(prompt) # might fail
return summary
def exec_fallback(self, prep_res, exc):
"""Provide a simple fallback instead of crashing."""
return "There was an error processing your request."
def post(self, shared, prep_res, exec_res):
"""Store the summary in shared store."""
shared["summary"] = exec_res
# Return "default" by not returning
# Create the flow
summarize_node = Summarize(max_retries=3)
flow = Flow(start=summarize_node)

View File

@ -0,0 +1,23 @@
from flow import flow
def main():
# Example text to summarize
text = """
PocketFlow is a minimalist LLM framework that models workflows as a Nested Directed Graph.
Nodes handle simple LLM tasks, connecting through Actions for Agents.
Flows orchestrate these nodes for Task Decomposition, and can be nested.
It also supports Batch processing and Async execution.
"""
# Initialize shared store
shared = {"data": text}
# Run the flow
flow.run(shared)
# Print result
print("\nInput text:", text)
print("\nSummary:", shared["summary"])
if __name__ == "__main__":
main()

View File

@ -0,0 +1,2 @@
pocketflow
openai>=1.0.0

View File

@ -0,0 +1,13 @@
from openai import OpenAI
def call_llm(prompt):
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
if __name__ == "__main__":
prompt = "What is the meaning of life?"
print(call_llm(prompt))

View File

@ -0,0 +1,75 @@
# Parallel Image Processor (AsyncParallelBatchFlow Example)
This example demonstrates how to use `AsyncParallelBatchFlow` to process multiple images with multiple filters in parallel.
## How it Works
1. **Image Loading**: Loads the sample images (`cat.jpg`, `dog.jpg`, `bird.jpg`) from the `images/` directory
2. **Filter Application**: Applies different filters (grayscale, blur, sepia) to each image
3. **Parallel Processing**: Processes all image-filter combinations concurrently
### Flow Structure
```mermaid
graph TD
subgraph AsyncParallelBatchFlow[Image Processing Flow]
subgraph AsyncFlow[Per Image-Filter Flow]
A[Load Image] --> B[Apply Filter]
B --> C[Save Image]
end
end
```
### Key Components
1. **LoadImage (AsyncNode)**
- Loads an image from file
- Uses PIL for image handling
2. **ApplyFilter (AsyncNode)**
- Applies the specified filter
- Supports grayscale, blur, and sepia
3. **SaveImage (AsyncNode)**
- Saves the processed image
- Creates output directory if needed
4. **ImageBatchFlow (AsyncParallelBatchFlow)**
- Manages parallel processing of all image-filter combinations
- Returns parameters for each sub-flow
## Running the Example
1. Install dependencies:
```bash
pip install -r requirements.txt
```
2. Run the example:
```bash
python main.py
```
## Sample Output
The example will:
1. Load the 3 sample images from `images/`: `cat.jpg`, `dog.jpg`, `bird.jpg`
2. Apply 3 filters to each image
3. Save results in `output/` directory (9 total images)
Example output structure:
```
output/
├── cat_grayscale.jpg
├── cat_blur.jpg
├── cat_sepia.jpg
├── dog_grayscale.jpg
...etc
```
## Key Concepts
1. **Parallel Flow Execution**: Each image-filter combination runs as a separate flow in parallel
2. **Parameter Management**: The batch flow generates parameters for each sub-flow
3. **Resource Management**: A semaphore can limit concurrent image processing (see the sketch below)
4. **Error Handling**: Failures should be caught inside the nodes so one bad image doesn't abort the batch (also sketched below)
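
The nodes in this example don't actually create a semaphore or catch per-image errors; a minimal sketch of how both could be added (`ThrottledApplyFilter` and `MAX_CONCURRENT` are illustrative names, not part of this commit):

```python
import asyncio
from nodes import ApplyFilter

MAX_CONCURRENT = 3
semaphore = asyncio.Semaphore(MAX_CONCURRENT)

class ThrottledApplyFilter(ApplyFilter):
    async def exec_async(self, inputs):
        # Allow at most MAX_CONCURRENT filters to run at once
        async with semaphore:
            try:
                return await super().exec_async(inputs)
            except Exception as exc:
                print(f"Filter failed: {exc}")
                raise
```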

View File

@ -0,0 +1,50 @@
"""Flow definitions for parallel image processing."""
from pocketflow import AsyncFlow, AsyncParallelBatchFlow
from nodes import LoadImage, ApplyFilter, SaveImage, NoOp
def create_base_flow():
"""Create flow for processing a single image with one filter."""
# Create nodes
load = LoadImage()
apply_filter = ApplyFilter()
save = SaveImage()
noop = NoOp()
# Connect nodes
load - "apply_filter" >> apply_filter
apply_filter - "save" >> save
save - "default" >> noop
# Create flow
return AsyncFlow(start=load)
class ImageBatchFlow(AsyncParallelBatchFlow):
"""Flow that processes multiple images with multiple filters in parallel."""
async def prep_async(self, shared):
"""Generate parameters for each image-filter combination."""
# Get list of images and filters
images = shared.get("images", [])
filters = ["grayscale", "blur", "sepia"]
# Create parameter combinations
params = []
for image_path in images:
for filter_type in filters:
params.append({
"image_path": image_path,
"filter": filter_type
})
print(f"\nProcessing {len(images)} images with {len(filters)} filters...")
print(f"Total combinations: {len(params)}")
return params
def create_flow():
"""Create the complete parallel processing flow."""
# Create base flow for single image processing
base_flow = create_base_flow()
# Wrap in parallel batch flow
return ImageBatchFlow(start=base_flow)

Binary image files added (not shown): 249 KiB, 214 KiB, 147 KiB.

View File

@ -0,0 +1,47 @@
import os
import asyncio
from flow import create_flow
def get_image_paths():
"""Get paths of existing images in the images directory."""
images_dir = "images"
if not os.path.exists(images_dir):
raise ValueError(f"Directory '{images_dir}' not found!")
# List all jpg files in the images directory
image_paths = []
for filename in os.listdir(images_dir):
if filename.lower().endswith(('.jpg', '.jpeg', '.png')):
image_paths.append(os.path.join(images_dir, filename))
if not image_paths:
raise ValueError(f"No images found in '{images_dir}' directory!")
print(f"\nFound {len(image_paths)} images:")
for path in image_paths:
print(f"- {path}")
return image_paths
async def main():
"""Run the parallel image processing example."""
print("\nParallel Image Processor")
print("-" * 30)
# Get existing image paths
image_paths = get_image_paths()
# Create shared store with image paths
shared = {"images": image_paths}
# Create and run flow
flow = create_flow()
await flow.run_async(shared)
print("\nProcessing complete! Check the output/ directory for results.")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,112 @@
"""AsyncNode implementations for image processing."""
import os
import asyncio
from PIL import Image, ImageFilter
import numpy as np
from pocketflow import AsyncNode
class NoOp(AsyncNode):
"""Node that does nothing, used as a terminal node."""
async def prep_async(self, shared):
"""No preparation needed."""
return None
async def exec_async(self, prep_res):
"""No execution needed."""
return None
async def post_async(self, shared, prep_res, exec_res):
"""No post-processing needed."""
return None
class LoadImage(AsyncNode):
"""Node that loads an image from file."""
async def prep_async(self, shared):
"""Get image path from parameters."""
image_path = self.params["image_path"]
print(f"\nLoading image: {image_path}")
return image_path
async def exec_async(self, image_path):
"""Load image using PIL."""
# Simulate I/O delay
await asyncio.sleep(0.1)
return Image.open(image_path)
async def post_async(self, shared, prep_res, exec_res):
"""Store image in shared store."""
shared["image"] = exec_res
return "apply_filter"
class ApplyFilter(AsyncNode):
"""Node that applies a filter to an image."""
async def prep_async(self, shared):
"""Get image and filter type."""
image = shared["image"]
filter_type = self.params["filter"]
print(f"Applying {filter_type} filter...")
return image, filter_type
async def exec_async(self, inputs):
"""Apply the specified filter."""
image, filter_type = inputs
# Simulate processing delay
await asyncio.sleep(0.5)
if filter_type == "grayscale":
return image.convert("L")
elif filter_type == "blur":
return image.filter(ImageFilter.BLUR)
elif filter_type == "sepia":
# Convert to array for sepia calculation
img_array = np.array(image)
sepia_matrix = np.array([
[0.393, 0.769, 0.189],
[0.349, 0.686, 0.168],
[0.272, 0.534, 0.131]
])
sepia_array = img_array.dot(sepia_matrix.T)
sepia_array = np.clip(sepia_array, 0, 255).astype(np.uint8)
return Image.fromarray(sepia_array)
else:
raise ValueError(f"Unknown filter: {filter_type}")
async def post_async(self, shared, prep_res, exec_res):
"""Store filtered image."""
shared["filtered_image"] = exec_res
return "save"
class SaveImage(AsyncNode):
"""Node that saves the processed image."""
async def prep_async(self, shared):
"""Prepare output path."""
image = shared["filtered_image"]
base_name = os.path.splitext(os.path.basename(self.params["image_path"]))[0]
filter_type = self.params["filter"]
output_path = f"output/{base_name}_{filter_type}.jpg"
# Create output directory if needed
os.makedirs("output", exist_ok=True)
return image, output_path
async def exec_async(self, inputs):
"""Save the image."""
image, output_path = inputs
# Simulate I/O delay
await asyncio.sleep(0.1)
image.save(output_path)
return output_path
async def post_async(self, shared, prep_res, exec_res):
"""Print success message."""
print(f"Saved: {exec_res}")
return "default"

Binary image files added (not shown): 41, 56, 69, 46, 59, 67, 33, 44, 44 KiB.

View File

@ -0,0 +1,3 @@
pocketflow
Pillow>=10.0.0 # For image processing
numpy>=1.24.0 # For image array operations

View File

@ -0,0 +1,105 @@
# PocketFlow Parallel Batch Node Example
This example demonstrates parallel processing using AsyncParallelBatchNode to summarize multiple news articles concurrently. It shows how to:
1. Process multiple items in parallel
2. Handle I/O-bound tasks efficiently
3. Manage rate limits with throttling
## What this Example Does
When you run the example:
1. It loads multiple news articles from a data directory
2. Processes them in parallel using AsyncParallelBatchNode
3. For each article:
- Extracts key information
- Generates a summary using an LLM
- Saves the results
4. Combines all summaries into a final report
## How it Works
The example uses AsyncParallelBatchNode to process articles in parallel:
```python
class ParallelSummarizer(AsyncParallelBatchNode):
async def prep_async(self, shared):
# Return list of articles to process
return shared["articles"]
async def exec_async(self, article):
# Process single article (called in parallel)
summary = await call_llm_async(f"Summarize: {article}")
return summary
async def post_async(self, shared, prep_res, summaries):
# Combine all summaries
shared["summaries"] = summaries
return "default"
```
Key features demonstrated:
- Parallel execution of `exec_async`
- Rate limiting with semaphores
- Error handling for failed requests (one approach is sketched below)
- Progress tracking for parallel tasks (also sketched below)
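
The snippet above shows only the happy path. A hedged sketch of per-item error handling plus a simple progress counter (`RobustSummarizer` is illustrative and not part of this commit's `nodes.py`):

```python
from pocketflow import AsyncParallelBatchNode
from utils import call_llm_async

class RobustSummarizer(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        self.done, self.total = 0, len(shared["articles"])
        return shared["articles"]

    async def exec_async(self, article):
        try:
            summary = await call_llm_async(article)
        except Exception as exc:
            summary = f"[failed: {exc}]"   # record the failure, keep going
        self.done += 1                     # safe: asyncio runs on one thread
        print(f"[{self.done}/{self.total}] processed")
        return summary
```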
## Project Structure
```
pocketflow-parallel-batch-node/
├── README.md
├── requirements.txt
├── data/
│ ├── article1.txt
│ ├── article2.txt
│ └── article3.txt
├── main.py
├── flow.py
├── nodes.py
└── utils.py
```
## Running the Example
```bash
# Install dependencies
pip install -r requirements.txt
# Run the example
python main.py
```
## Sample Output
```
Loading articles...
Found 3 articles to process

Processing in parallel...

Processing: Article 1: AI advances in 2024......

Processing: Article 2: New quantum computing breakthrough......

Processing: Article 3: Latest developments in robotics......

Summaries generated:
1. Summary of: Article 1: AI advances in 2024...
2. Summary of: Article 2: New quantum computi...
3. Summary of: Article 3: Latest developments...

Final report saved to: data/summaries.txt
```
## Key Concepts
1. **Parallel Processing**
- Using AsyncParallelBatchNode for concurrent execution
- Managing parallel tasks efficiently
2. **Rate Limiting**
- Using semaphores to control concurrent requests
- Avoiding API rate limits
3. **Error Handling**
- Graceful handling of failed requests
- Retrying failed tasks
4. **Progress Tracking**
- Monitoring parallel task progress
- Providing user feedback

View File

@ -0,0 +1 @@
Article 1: AI advances in 2024...

View File

@ -0,0 +1 @@
Article 2: New quantum computing breakthrough...

View File

@ -0,0 +1 @@
Article 3: Latest developments in robotics...

View File

@ -0,0 +1,3 @@
1. Summary of: Article 1: AI advances in 2024...
2. Summary of: Article 2: New quantum computi...
3. Summary of: Article 3: Latest developments...

View File

@ -0,0 +1,24 @@
"""AsyncFlow implementation for parallel article processing."""
from pocketflow import AsyncFlow, Node
from nodes import LoadArticles, ParallelSummarizer
class NoOp(Node):
"""Node that does nothing, used to properly end the flow."""
pass
def create_flow():
"""Create and connect nodes into a flow."""
# Create nodes
loader = LoadArticles()
summarizer = ParallelSummarizer()
end = NoOp()
# Connect nodes
loader - "process" >> summarizer
summarizer - "default" >> end # Properly end the flow
# Create flow starting with loader
flow = AsyncFlow(start=loader)
return flow

View File

@ -0,0 +1,19 @@
import asyncio
from flow import create_flow
async def main():
"""Run the parallel processing flow."""
# Create flow
flow = create_flow()
# Create shared store
shared = {}
# Run flow
print("\nParallel Article Summarizer")
print("-------------------------")
await flow.run_async(shared)
if __name__ == "__main__":
# Run the async main function
asyncio.run(main())

View File

@ -0,0 +1,48 @@
"""AsyncParallelBatchNode implementation for article summarization."""
from pocketflow import AsyncParallelBatchNode, AsyncNode
from utils import call_llm_async, load_articles, save_summaries
class LoadArticles(AsyncNode):
"""Node that loads articles to process."""
async def prep_async(self, shared):
"""Load articles from data directory."""
print("\nLoading articles...")
articles = await load_articles()
return articles
async def exec_async(self, articles):
"""No processing needed."""
return articles
async def post_async(self, shared, prep_res, exec_res):
"""Store articles in shared store."""
shared["articles"] = exec_res
print(f"Found {len(exec_res)} articles to process")
return "process"
class ParallelSummarizer(AsyncParallelBatchNode):
"""Node that summarizes articles in parallel."""
async def prep_async(self, shared):
"""Get articles from shared store."""
print("\nProcessing in parallel...")
return shared["articles"]
async def exec_async(self, article):
"""Summarize a single article (called in parallel)."""
summary = await call_llm_async(article)
return summary
async def post_async(self, shared, prep_res, summaries):
"""Store summaries and save to file."""
shared["summaries"] = summaries
print("\nSummaries generated:")
for i, summary in enumerate(summaries, 1):
print(f"{i}. {summary}")
save_summaries(summaries)
print("\nFinal report saved to: summaries.txt")
return "default"

View File

@ -0,0 +1,4 @@
pocketflow
aiohttp>=3.8.0 # For async HTTP requests
openai>=1.0.0 # For async LLM calls
tqdm>=4.65.0 # For progress bars

View File

@ -0,0 +1,53 @@
"""Utility functions for parallel processing."""
import os
import asyncio
import aiohttp
from openai import AsyncOpenAI
from tqdm import tqdm
# Semaphore to limit concurrent API calls
MAX_CONCURRENT_CALLS = 3
semaphore = asyncio.Semaphore(MAX_CONCURRENT_CALLS)
async def call_llm_async(prompt):
"""Make async LLM call with rate limiting."""
async with semaphore: # Limit concurrent calls
print(f"\nProcessing: {prompt[:50]}...")
# Simulate API call with delay
await asyncio.sleep(1)
# Mock LLM response (in real app, would call OpenAI)
summary = f"Summary of: {prompt[:30]}..."
return summary
async def load_articles():
"""Load articles from data directory."""
# For demo, generate mock articles
articles = [
"Article 1: AI advances in 2024...",
"Article 2: New quantum computing breakthrough...",
"Article 3: Latest developments in robotics..."
]
# Create data directory if it doesn't exist
data_dir = "data"
os.makedirs(data_dir, exist_ok=True)
# Save mock articles to files
for i, content in enumerate(articles, 1):
with open(os.path.join(data_dir, f"article{i}.txt"), "w") as f:
f.write(content)
return articles
def save_summaries(summaries):
"""Save summaries to output file."""
# Create data directory if it doesn't exist
data_dir = "data"
os.makedirs(data_dir, exist_ok=True)
with open(os.path.join(data_dir, "summaries.txt"), "w") as f:
for i, summary in enumerate(summaries, 1):
f.write(f"{i}. {summary}\n")

View File

@ -0,0 +1,71 @@
# Web Crawler with Content Analysis
A web crawler tool built with PocketFlow that crawls websites and analyzes content using LLM.
## Features
- Crawls websites while respecting domain boundaries
- Extracts text content and links from pages
- Analyzes content using GPT-4 to generate:
- Page summaries
- Main topics/keywords
- Content type classification
- Processes pages in batches for efficiency
- Generates a comprehensive analysis report
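
Behind these features, the pipeline is three chained nodes (condensed from `flow.py`):

```python
from pocketflow import Flow
from nodes import CrawlWebsiteNode, AnalyzeContentBatchNode, GenerateReportNode

crawl = CrawlWebsiteNode()
analyze = AnalyzeContentBatchNode()
report = GenerateReportNode()
crawl >> analyze >> report   # default transitions
flow = Flow(start=crawl)
```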
## Installation
1. Clone the repository
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Set your OpenAI API key:
```bash
export OPENAI_API_KEY='your-api-key'
```
## Usage
Run the crawler:
```bash
python main.py
```
You will be prompted to:
1. Enter the website URL to crawl
2. Specify maximum number of pages to crawl (default: 10)
The tool will then:
1. Crawl the specified website
2. Extract and analyze content using GPT-4
3. Generate a report with findings
## Project Structure
```
pocketflow-tool-crawler/
├── tools/
│ ├── crawler.py # Web crawling functionality
│ └── parser.py # Content analysis using LLM
├── utils/
│ └── call_llm.py # LLM API wrapper
├── nodes.py # PocketFlow nodes
├── flow.py # Flow configuration
├── main.py # Main script
└── requirements.txt # Dependencies
```
## Limitations
- Only crawls within the same domain
- Text content only (no images/media)
- Rate limited by OpenAI API
- Basic error handling
## Dependencies
- pocketflow: Flow-based processing
- requests: HTTP requests
- beautifulsoup4: HTML parsing
- openai: GPT-4 API access

View File

@ -0,0 +1,19 @@
from pocketflow import Flow
from nodes import CrawlWebsiteNode, AnalyzeContentBatchNode, GenerateReportNode
def create_flow() -> Flow:
"""Create and configure the crawling flow
Returns:
Flow: Configured flow ready to run
"""
# Create nodes
crawl = CrawlWebsiteNode()
analyze = AnalyzeContentBatchNode()
report = GenerateReportNode()
# Connect nodes
crawl >> analyze >> report
# Create flow starting with crawl
return Flow(start=crawl)

View File

@ -0,0 +1,26 @@
import os
from flow import create_flow
def main():
"""Run the web crawler flow"""
# Get website URL from user
url = input("Enter website URL to crawl (e.g., https://example.com): ")
if not url:
    print("Error: URL is required")
    return
# Ask for max pages to crawl (README documents a default of 10)
max_pages_str = input("Max pages to crawl (default: 10): ").strip()
max_pages = int(max_pages_str) if max_pages_str else 10
# Initialize shared data
shared = {
    "base_url": url,
    "max_pages": max_pages
}
# Create and run flow
flow = create_flow()
flow.run(shared)
# Results are in shared["report"]
if __name__ == "__main__":
main()

View File

@ -0,0 +1,75 @@
from pocketflow import Node, BatchNode
from tools.crawler import WebCrawler
from tools.parser import analyze_site
from typing import List, Dict
class CrawlWebsiteNode(Node):
"""Node to crawl a website and extract content"""
def prep(self, shared):
return shared.get("base_url"), shared.get("max_pages", 10)
def exec(self, inputs):
base_url, max_pages = inputs
if not base_url:
return []
crawler = WebCrawler(base_url, max_pages)
return crawler.crawl()
def post(self, shared, prep_res, exec_res):
shared["crawl_results"] = exec_res
return "default"
class AnalyzeContentBatchNode(BatchNode):
"""Node to analyze crawled content in batches"""
def prep(self, shared):
results = shared.get("crawl_results", [])
# Process in batches of 5 pages
batch_size = 5
return [results[i:i+batch_size] for i in range(0, len(results), batch_size)]
def exec(self, batch):
return analyze_site(batch)
def post(self, shared, prep_res, exec_res_list):
# Flatten results from all batches
all_results = []
for batch_results in exec_res_list:
all_results.extend(batch_results)
shared["analyzed_results"] = all_results
return "default"
class GenerateReportNode(Node):
"""Node to generate a summary report of the analysis"""
def prep(self, shared):
return shared.get("analyzed_results", [])
def exec(self, results):
if not results:
return "No results to report"
report = []
report.append(f"Analysis Report\n")
report.append(f"Total pages analyzed: {len(results)}\n")
for page in results:
report.append(f"\nPage: {page['url']}")
report.append(f"Title: {page['title']}")
analysis = page.get("analysis", {})
report.append(f"Summary: {analysis.get('summary', 'N/A')}")
report.append(f"Topics: {', '.join(analysis.get('topics', []))}")
report.append(f"Content Type: {analysis.get('content_type', 'unknown')}")
report.append("-" * 80)
return "\n".join(report)
def post(self, shared, prep_res, exec_res):
shared["report"] = exec_res
print("\nReport generated:")
print(exec_res)
return "default"

View File

@ -0,0 +1,4 @@
pocketflow>=0.1.0
requests>=2.31.0
beautifulsoup4>=4.12.0
openai>=1.0.0 # for content analysis
pyyaml>=6.0 # for parsing the LLM's YAML output in tools/parser.py

View File

@ -0,0 +1,74 @@
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from typing import Dict, List, Optional, Set
class WebCrawler:
"""Simple web crawler that extracts content and follows links"""
def __init__(self, base_url: str, max_pages: int = 10):
self.base_url = base_url
self.max_pages = max_pages
self.visited: Set[str] = set()
def is_valid_url(self, url: str) -> bool:
"""Check if URL belongs to the same domain"""
base_domain = urlparse(self.base_url).netloc
url_domain = urlparse(url).netloc
return base_domain == url_domain
    def extract_page_content(self, url: str) -> Optional[Dict]:
        """Extract content from a single page, or None on failure"""
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Extract main content
content = {
"url": url,
"title": soup.title.string if soup.title else "",
"text": soup.get_text(separator="\n", strip=True),
"links": []
}
# Extract links
for link in soup.find_all("a"):
href = link.get("href")
if href:
absolute_url = urljoin(url, href)
if self.is_valid_url(absolute_url):
content["links"].append(absolute_url)
return content
except Exception as e:
print(f"Error crawling {url}: {str(e)}")
return None
    def crawl(self) -> List[Dict]:
        """Crawl website starting from base_url"""
        to_visit = [self.base_url]
        results = []
        while to_visit and len(results) < self.max_pages:
            url = to_visit.pop(0)
            if url in self.visited:
                continue
            # Mark as visited up front so failing pages are not retried
            self.visited.add(url)
            print(f"Crawling: {url}")
            content = self.extract_page_content(url)
            if content:
                results.append(content)
                # Queue newly discovered same-domain links
                new_links = [link for link in content["links"]
                             if link not in self.visited
                             and link not in to_visit]
                to_visit.extend(new_links)
        return results

View File

@ -0,0 +1,77 @@
import yaml
from typing import Dict, List
from utils.call_llm import call_llm
def analyze_content(content: Dict) -> Dict:
"""Analyze webpage content using LLM
Args:
content (Dict): Webpage content with url, title and text
Returns:
Dict: Analysis results including summary and topics
"""
    # Truncate page text so the prompt stays within a reasonable length
    prompt = f"""
    Analyze this webpage content:
    Title: {content['title']}
    URL: {content['url']}
    Content: {content['text'][:2000]}
Please provide:
1. A brief summary (2-3 sentences)
2. Main topics/keywords (up to 5)
3. Content type (article, product page, etc)
Output in YAML format:
```yaml
summary: >
brief summary here
topics:
- topic 1
- topic 2
content_type: type here
```
"""
try:
response = call_llm(prompt)
# Extract YAML between code fences
yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        analysis = yaml.safe_load(yaml_str)
# Validate required fields
assert "summary" in analysis
assert "topics" in analysis
assert "content_type" in analysis
assert isinstance(analysis["topics"], list)
return analysis
except Exception as e:
print(f"Error analyzing content: {str(e)}")
return {
"summary": "Error analyzing content",
"topics": [],
"content_type": "unknown"
}
def analyze_site(crawl_results: List[Dict]) -> List[Dict]:
"""Analyze all crawled pages
Args:
crawl_results (List[Dict]): List of crawled page contents
Returns:
List[Dict]: Original content with added analysis
"""
analyzed_results = []
for content in crawl_results:
if content and content.get("text"):
analysis = analyze_content(content)
content["analysis"] = analysis
analyzed_results.append(content)
return analyzed_results

View File

@ -0,0 +1,30 @@
from openai import OpenAI
import os
# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def call_llm(prompt: str) -> str:
"""Call OpenAI API to analyze text
Args:
prompt (str): Input prompt for the model
Returns:
str: Model response
"""
try:
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
except Exception as e:
print(f"Error calling LLM API: {str(e)}")
return ""
if __name__ == "__main__":
# Test LLM call
response = call_llm("What is web crawling?")
print("Response:", response)

View File

@ -0,0 +1,89 @@
# SQLite Database with PocketFlow
This example demonstrates how to properly integrate SQLite database operations with PocketFlow, focusing on:
1. Clean code organization with separation of concerns:
   - Tools layer for database operations (`tools/database.py`)
   - Node implementation for PocketFlow integration (`nodes.py`)
   - Flow configuration (`flow.py`)
2. Best practices for database operations (a sketch of the tools layer follows below):
   - Safe SQL query execution with parameter binding to prevent SQL injection
   - Connection management with proper closing
   - Error handling and resource cleanup
   - Simple schema management
3. Example task management system:
- Database initialization
- Task creation
- Task listing
- Status tracking
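Since `nodes.py` imports `init_db` and `execute_sql` from `tools/database.py`, here is a minimal sketch of what those helpers might look like, assuming plain `sqlite3`, a `tasks.db` file, and parameterized queries; the actual module may differ:
```python
import sqlite3

DB_PATH = "tasks.db"  # assumed database file name

def init_db():
    """Create the tasks table if it does not already exist."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            """CREATE TABLE IF NOT EXISTS tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                description TEXT,
                status TEXT DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )"""
        )
        conn.commit()
    finally:
        conn.close()  # always release the connection

def execute_sql(query, params=None):
    """Run a query with parameter binding and return any fetched rows."""
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.execute(query, params or ())
        rows = cursor.fetchall()
        conn.commit()
        return rows
    finally:
        conn.close()
```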
## Project Structure
```
pocketflow-tool-database/
├── tools/
│ └── database.py # SQLite database operations
├── nodes.py # PocketFlow node implementation
├── flow.py # Flow configuration
└── main.py # Example usage
```
## Setup
1. Create a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
## Usage
Run the example:
```bash
python main.py
```
This will:
1. Initialize a SQLite database with a tasks table
2. Create an example task
3. List all tasks in the database
4. Display the results
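Equivalently, a minimal driver mirroring `main.py` below:
```python
from flow import create_database_flow

shared = {
    "task_title": "Example Task",
    "task_description": "This is an example task created using PocketFlow",
}

create_database_flow().run(shared)

print(shared["db_status"])    # "Database initialized"
print(shared["task_status"])  # "Task created successfully"
```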
## Key Concepts Demonstrated
1. **Database Operations**
- Safe connection handling
- Query parameterization
- Schema management
2. **Code Organization**
- Clear separation between database operations and PocketFlow components
- Modular project structure
- Type hints and documentation
3. **PocketFlow Integration**
   - Node implementation with the prep->exec->post lifecycle (see the skeleton below)
- Flow configuration
- Shared store usage for data passing
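As a reference for the lifecycle in item 3, a minimal node skeleton following the same prep->exec->post pattern used in `nodes.py` (names here are placeholders):
```python
from pocketflow import Node

class ExampleNode(Node):
    """Skeleton of the prep -> exec -> post lifecycle."""

    def prep(self, shared):
        # Gather inputs from the shared store
        return shared.get("some_input")

    def exec(self, prep_res):
        # Do the node's actual work on the prepared data
        return f"processed: {prep_res}"

    def post(self, shared, prep_res, exec_res):
        # Write results back and return an action for flow control
        shared["some_output"] = exec_res
        return "default"
```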
## Example Output
```
Database Status: Database initialized
Task Status: Task created successfully
All Tasks:
- ID: 1
Title: Example Task
Description: This is an example task created using PocketFlow
Status: pending
Created: 2024-03-02 12:34:56
```

Binary file not shown.

View File

@ -0,0 +1,16 @@
from pocketflow import Flow
from nodes import InitDatabaseNode, CreateTaskNode, ListTasksNode
def create_database_flow():
"""Create a flow for database operations"""
# Create nodes
init_db = InitDatabaseNode()
create_task = CreateTaskNode()
list_tasks = ListTasksNode()
# Connect nodes
init_db >> create_task >> list_tasks
# Create and return flow
return Flow(start=init_db)

View File

@ -0,0 +1,29 @@
from flow import create_database_flow
def main():
# Create the flow
flow = create_database_flow()
# Prepare example task data
shared = {
"task_title": "Example Task",
"task_description": "This is an example task created using PocketFlow"
}
# Run the flow
flow.run(shared)
# Print results
print("Database Status:", shared.get("db_status"))
print("Task Status:", shared.get("task_status"))
print("\nAll Tasks:")
for task in shared.get("tasks", []):
print(f"- ID: {task[0]}")
print(f" Title: {task[1]}")
print(f" Description: {task[2]}")
print(f" Status: {task[3]}")
print(f" Created: {task[4]}")
print()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,43 @@
from pocketflow import Node
from tools.database import execute_sql, init_db
class InitDatabaseNode(Node):
"""Node for initializing the database"""
def exec(self, _):
init_db()
return "Database initialized"
def post(self, shared, prep_res, exec_res):
shared["db_status"] = exec_res
return "default"
class CreateTaskNode(Node):
"""Node for creating a new task"""
def prep(self, shared):
return (
shared.get("task_title", ""),
shared.get("task_description", "")
)
def exec(self, inputs):
title, description = inputs
query = "INSERT INTO tasks (title, description) VALUES (?, ?)"
execute_sql(query, (title, description))
return "Task created successfully"
def post(self, shared, prep_res, exec_res):
shared["task_status"] = exec_res
return "default"
class ListTasksNode(Node):
"""Node for listing all tasks"""
def exec(self, _):
query = "SELECT * FROM tasks"
return execute_sql(query)
def post(self, shared, prep_res, exec_res):
shared["tasks"] = exec_res
return "default"

View File

@ -0,0 +1,2 @@
pocketflow>=0.1.0
python-dotenv>=0.19.0

Some files were not shown because too many files have changed in this diff.