Compare commits

...

10 Commits

Author SHA1 Message Date
Peter Howell b91879a59e my agent 2025-10-02 02:37:04 +00:00
Zachary Huang 23e36bfbdf
Merge pull request #109 from Dawinia/fix107
FIX: RecursionError when loop flow
2025-08-13 10:27:39 -07:00
士本 129b9b07c7 FIX: RecursionError when loop flow 2025-08-13 23:52:23 +08:00
zachary62 9c3def9884 update requirements 2025-08-03 17:27:20 -07:00
zachary62 fad4ed1717 update rule file 2025-08-03 16:53:38 -07:00
Zachary Huang f98e9f4806
Merge pull request #102 from cozmosisosis/main
Added PyYAML to requirements.txt in 'PocketFlow\cookbook\pocketflow-agent'
2025-07-31 08:18:26 -07:00
Zachary Huang 66d2fa09f9
Merge pull request #101 from hyh19/main
Fix AsyncNode retry mechanism to use self.cur_retry attribute
2025-07-31 08:17:19 -07:00
Cosmos Tremblay Owens 7dd0fc4a11 Updated requirements.txt to include PyYAML and improved the format 2025-07-31 10:08:27 -04:00
spadkins fd5817fdcc fix: align AsyncNode retry mechanism with Node implementation
- Change AsyncNode._exec() to use self.cur_retry instead of local variable i
- Ensures consistency between sync and async retry mechanisms
- Fixes AttributeError when accessing cur_retry in derived classes
- Maintains backward compatibility while resolving retry tracking inconsistency
2025-07-31 12:11:14 +08:00
zachary62 3f1f1d6573 Release 0.0.2 to 0.0.3 with type hints 2025-07-27 17:58:15 -07:00
17 changed files with 789 additions and 28 deletions

View File

@ -156,10 +156,119 @@ my_project/
└── design.md
```
- **`requirements.txt`**: Lists the Python dependencies for the project.
```
PyYAML
pocketflow
```
- **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*.
~~~
# Design Doc: Your Project Name
> Please DON'T remove notes for AI
## Requirements
> Notes for AI: Keep it simple and clear.
> If the requirements are abstract, write concrete user stories
## Flow Design
> Notes for AI:
> 1. Consider the design patterns of agent, map-reduce, rag, and workflow. Apply them if they fit.
> 2. Present a concise, high-level description of the workflow.
### Applicable Design Pattern:
1. Map the file summary into chunks, then reduce these chunks into a final summary.
2. Agentic file finder
- *Context*: The entire summary of the file
- *Action*: Find the file
### Flow high-level Design:
1. **First Node**: This node is for ...
2. **Second Node**: This node is for ...
3. **Third Node**: This node is for ...
```mermaid
flowchart TD
firstNode[First Node] --> secondNode[Second Node]
secondNode --> thirdNode[Third Node]
```
## Utility Functions
> Notes for AI:
> 1. Understand the utility function definition thoroughly by reviewing the doc.
> 2. Include only the necessary utility functions, based on nodes in the flow.
1. **Call LLM** (`utils/call_llm.py`)
- *Input*: prompt (str)
- *Output*: response (str)
- Generally used by most nodes for LLM tasks
2. **Embedding** (`utils/get_embedding.py`)
- *Input*: str
- *Output*: a vector of 3072 floats
- Used by the second node to embed text
## Node Design
### Shared Store
> Notes for AI: Try to minimize data redundancy
The shared store structure is organized as follows:
```python
shared = {
"key": "value"
}
```
### Node Steps
> Notes for AI: Carefully decide whether to use Batch/Async Node/Flow.
1. First Node
- *Purpose*: Provide a short explanation of the nodes function
- *Type*: Decide between Regular, Batch, or Async
- *Steps*:
- *prep*: Read "key" from the shared store
- *exec*: Call the utility function
- *post*: Write "key" to the shared store
2. Second Node
...
~~~
- **`utils/`**: Contains all utility functions.
- It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
- Each file should also include a `main()` function to try that API call
```python
from google import genai
import os
def call_llm(prompt: str) -> str:
client = genai.Client(
api_key=os.getenv("GEMINI_API_KEY", ""),
)
model = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
response = client.models.generate_content(model=model, contents=[prompt])
return response.text
if __name__ == "__main__":
test_prompt = "Hello, how are you?"
# First call - should hit the API
print("Making call...")
response1 = call_llm(test_prompt, use_cache=False)
print(f"Response: {response1}")
```
- **`nodes.py`**: Contains all the node definitions.
```python
# nodes.py
@ -1559,24 +1668,25 @@ Here, we provide some minimal example implementations:
def call_llm(prompt):
from anthropic import Anthropic
client = Anthropic(api_key="YOUR_API_KEY_HERE")
response = client.messages.create(
model="claude-2",
messages=[{"role": "user", "content": prompt}],
max_tokens=100
r = client.messages.create(
model="claude-sonnet-4-0",
messages=[
{"role": "user", "content": prompt}
]
)
return response.content
return r.content[0].text
```
3. Google (Generative AI Studio / PaLM API)
```python
def call_llm(prompt):
import google.generativeai as genai
genai.configure(api_key="YOUR_API_KEY_HERE")
response = genai.generate_text(
model="models/text-bison-001",
prompt=prompt
)
return response.result
from google import genai
client = genai.Client(api_key='GEMINI_API_KEY')
response = client.models.generate_content(
model='gemini-2.5-pro',
contents=prompt
)
return response.text
```
4. Azure (Azure OpenAI)

1
.gitignore vendored
View File

@ -84,3 +84,4 @@ pyproject.toml
usage.md
cookbook/pocketflow-minimal-example/viz/flow_visualization.html
cookbook/pocketflow-minimal-example/viz/flow_visualization.json
.claude/

76
assistant_resources.py Normal file
View File

@ -0,0 +1,76 @@
"""Registry describing files the natural-language assistant can modify."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List
BASE_DIR = Path(__file__).resolve().parent
@dataclass(frozen=True)
class Resource:
key: str
path: Path
description: str
kind: str # e.g., "text" or "calendar"
keywords: List[str]
RESOURCE_REGISTRY: Dict[str, Resource] = {
"shopping_list": Resource(
key="shopping_list",
path=BASE_DIR / "shopping_list.txt",
description="Items to purchase on the next store run.",
kind="text",
keywords=["buy", "purchase", "shop", "grocery", "groceries"],
),
"home_todo": Resource(
key="home_todo",
path=BASE_DIR / "home_todo.txt",
description="Household maintenance or chores.",
kind="text",
keywords=["home", "house", "chore", "laundry", "clean"],
),
"work_todo": Resource(
key="work_todo",
path=BASE_DIR / "work_todo.txt",
description="Tasks related to your job or ongoing projects.",
kind="text",
keywords=["work", "office", "project", "client", "email","gavilan", "efw", ],
),
"school_courses": Resource(
key="school_courses",
path=BASE_DIR / "school_courses.txt",
description="Assignments or study notes for school work.",
kind="text",
keywords=["school", "class", "course", "study", "assignment"],
),
"ideas": Resource(
key="ideas",
path=BASE_DIR / "ideas.txt",
description="Random ideas, inspiration, or brainstorming notes.",
kind="text",
keywords=["idea", "brainstorm", "concept", "inspiration"],
),
"calendar": Resource(
key="calendar",
path=BASE_DIR / "calendar.ics",
description="Time-based events saved in an iCalendar file.",
kind="calendar",
keywords=["meeting", "schedule", "calendar", "appointment", "event"],
),
}
def resource_descriptions() -> str:
"""Return a human-readable summary of supported resources."""
lines = [
"The assistant can update the following resources:",
]
for res in RESOURCE_REGISTRY.values():
lines.append(f"- {res.key}: {res.description} (file: {res.path.name})")
return "\n".join(lines)
__all__ = ["RESOURCE_REGISTRY", "resource_descriptions", "Resource"]

4
calendar.ics Normal file
View File

@ -0,0 +1,4 @@
BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//PocketFlow//Assistant//EN
END:VCALENDAR

View File

@ -1,5 +1,6 @@
pocketflow>=0.0.1
aiohttp>=3.8.0 # For HTTP requests
openai>=1.0.0 # For LLM calls
duckduckgo-search>=7.5.2 # For web search
requests>=2.25.1 # For HTTP requests
duckduckgo-search>=7.5.2 # For web search
aiohttp>=3.8.0 # For HTTP requests
openai>=1.0.0 # For LLM calls
requests>=2.25.1 # For HTTP requests
PyYAML>=6.0.2 # For YAML parsing

View File

@ -0,0 +1,73 @@
from async_flow import *
from pocketflow import Flow, AsyncParallelBatchNode, Node
# Create node instances
validate_payment = ValidatePayment()
process_payment = ProcessPayment()
payment_confirmation = PaymentConfirmation()
check_stock = CheckStock()
reserve_items = ReserveItems()
update_inventory = UpdateInventory()
create_label = CreateLabel()
assign_carrier = AssignCarrier()
schedule_pickup = SchedulePickup()
# Payment processing sub-flow
validate_payment >> process_payment
validate_payment - "out_of_stock" >> validate_payment # 循环重试
process_payment - 'something fail' >> validate_payment
process_payment - 'pass' >> payment_confirmation
payment_flow = AsyncFlow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = AsyncFlow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = AsyncFlow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# payment_flow >> inventory_flow >> create_label
# payment_flow >> inventory_flow >> assign_carrier
# Create the master flow
class OrderFlow(AsyncFlow):
pass
order_pipeline = OrderFlow(start=payment_flow)
# Create shared data structure
shared_data = {
"order_id": "ORD-12345",
"customer": "John Doe",
"items": [
{"id": "ITEM-001", "name": "Smartphone", "price": 999.99, "quantity": 1},
{"id": "ITEM-002", "name": "Phone case", "price": 29.99, "quantity": 1},
],
"shipping_address": {
"street": "123 Main St",
"city": "Anytown",
"state": "CA",
"zip": "12345",
},
}
# Run the entire pipeline asynchronously
async def main():
await order_pipeline.run_async(shared_data)
# Print final status
print("\nOrder processing completed!")
print(f"Payment: {shared_data.get('payment_confirmation')}")
print(f"Inventory: {shared_data.get('inventory_update')}")
print(f"Shipping: {shared_data.get('pickup_status')}")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -77,6 +77,7 @@ def flow_to_json(start):
node_types = {}
flow_nodes = {} # Keep track of flow nodes
ctr = 1
visited = set()
def get_id(n):
nonlocal ctr
@ -99,6 +100,9 @@ def flow_to_json(start):
action: Action label on the edge from parent to this node
"""
node_id = get_id(node)
if (node_id, action) in visited:
return
visited.add((node_id, action))
# Add node if not already in nodes list and not a Flow
if not any(n["id"] == node_id for n in nodes) and not isinstance(node, Flow):
@ -552,8 +556,38 @@ def create_d3_visualization(
// Update positions on each tick
simulation.on("tick", () => {
// Update links with straight lines
// Update links with curved paths for bidirectional connections
link.attr("d", d => {
// Handle self-referencing links with a water-drop shape
if (d.source === d.target) {
const nodeX = d.source.x;
const nodeY = d.source.y;
const offsetX = 40;
const offsetY = 10;
const controlOffset = 50;
// Create a water-drop shaped path
return `M ${nodeX},${nodeY - 5}
C ${nodeX + controlOffset},${nodeY - 30}
${nodeX + offsetX},${nodeY + offsetY}
${nodeX},${nodeY}`;
}
// Check if there's a reverse connection
const isReverse = data.links.some(l =>
l.source === d.target && l.target === d.source
);
// If it's part of a bidirectional connection, curve the path
if (isReverse) {
const dx = d.target.x - d.source.x;
const dy = d.target.y - d.source.y;
const dr = Math.sqrt(dx * dx + dy * dy) * 0.9;
return `M${d.source.x},${d.source.y}A${dr},${dr} 0 0,1 ${d.target.x},${d.target.y}`;
}
// For unidirectional connections, use straight lines
return `M${d.source.x},${d.source.y} L${d.target.x},${d.target.y}`;
});
@ -567,10 +601,57 @@ def create_d3_visualization(
.attr("x", d => d.x)
.attr("y", d => d.y);
// Position link labels at midpoint
linkLabel
.attr("x", d => (d.source.x + d.target.x) / 2)
.attr("y", d => (d.source.y + d.target.y) / 2);
// Position link labels with offset for bidirectional connections
linkLabel.attr("x", d => {
// Handle self-referencing links
if (d.source === d.target) {
return d.source.x + 30;
}
// Check if there's a reverse connection
const reverseLink = data.links.find(l =>
l.source === d.target && l.target === d.source
);
// If it's part of a bidirectional connection, offset the label
if (reverseLink) {
const dx = d.target.x - d.source.x;
const dy = d.target.y - d.source.y;
// Calculate perpendicular offset
const length = Math.sqrt(dx * dx + dy * dy);
const offsetX = -dy / length * 10; // Perpendicular offset
return (d.source.x + d.target.x) / 2 + offsetX;
}
// For unidirectional connections, use midpoint
return (d.source.x + d.target.x) / 2;
})
.attr("y", d => {
// Handle self-referencing links
if (d.source === d.target) {
return d.source.y;
}
// Check if there's a reverse connection
const reverseLink = data.links.find(l =>
l.source === d.target && l.target === d.source
);
// If it's part of a bidirectional connection, offset the label
if (reverseLink) {
const dx = d.target.x - d.source.x;
const dy = d.target.y - d.source.y;
// Calculate perpendicular offset
const length = Math.sqrt(dx * dx + dy * dy);
const offsetY = dx / length * 10; // Perpendicular offset
return (d.source.y + d.target.y) / 2 + offsetY;
}
// For unidirectional connections, use midpoint
return (d.source.y + d.target.y) / 2;
});
// Update group containers
groupContainers.each(function(d) {
@ -893,7 +974,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Visualize a PocketFlow flow")
parser.add_argument(
"--module", default="async_flow", help="Module containing the flow"
"--module", default="async_loop_flow", help="Module containing the flow"
)
parser.add_argument(
"--flow", default="order_pipeline", help="Flow variable name in the module"

View File

@ -156,10 +156,119 @@ my_project/
└── design.md
```
- **`requirements.txt`**: Lists the Python dependencies for the project.
```
PyYAML
pocketflow
```
- **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*.
~~~
# Design Doc: Your Project Name
> Please DON'T remove notes for AI
## Requirements
> Notes for AI: Keep it simple and clear.
> If the requirements are abstract, write concrete user stories
## Flow Design
> Notes for AI:
> 1. Consider the design patterns of agent, map-reduce, rag, and workflow. Apply them if they fit.
> 2. Present a concise, high-level description of the workflow.
### Applicable Design Pattern:
1. Map the file summary into chunks, then reduce these chunks into a final summary.
2. Agentic file finder
- *Context*: The entire summary of the file
- *Action*: Find the file
### Flow high-level Design:
1. **First Node**: This node is for ...
2. **Second Node**: This node is for ...
3. **Third Node**: This node is for ...
```mermaid
flowchart TD
firstNode[First Node] --> secondNode[Second Node]
secondNode --> thirdNode[Third Node]
```
## Utility Functions
> Notes for AI:
> 1. Understand the utility function definition thoroughly by reviewing the doc.
> 2. Include only the necessary utility functions, based on nodes in the flow.
1. **Call LLM** (`utils/call_llm.py`)
- *Input*: prompt (str)
- *Output*: response (str)
- Generally used by most nodes for LLM tasks
2. **Embedding** (`utils/get_embedding.py`)
- *Input*: str
- *Output*: a vector of 3072 floats
- Used by the second node to embed text
## Node Design
### Shared Store
> Notes for AI: Try to minimize data redundancy
The shared store structure is organized as follows:
```python
shared = {
"key": "value"
}
```
### Node Steps
> Notes for AI: Carefully decide whether to use Batch/Async Node/Flow.
1. First Node
- *Purpose*: Provide a short explanation of the nodes function
- *Type*: Decide between Regular, Batch, or Async
- *Steps*:
- *prep*: Read "key" from the shared store
- *exec*: Call the utility function
- *post*: Write "key" to the shared store
2. Second Node
...
~~~
- **`utils/`**: Contains all utility functions.
- It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
- Each file should also include a `main()` function to try that API call
```python
from google import genai
import os
def call_llm(prompt: str) -> str:
client = genai.Client(
api_key=os.getenv("GEMINI_API_KEY", ""),
)
model = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
response = client.models.generate_content(model=model, contents=[prompt])
return response.text
if __name__ == "__main__":
test_prompt = "Hello, how are you?"
# First call - should hit the API
print("Making call...")
response1 = call_llm(test_prompt, use_cache=False)
print(f"Response: {response1}")
```
- **`nodes.py`**: Contains all the node definitions.
```python
# nodes.py

View File

@ -33,8 +33,7 @@ Here, we provide some minimal example implementations:
from anthropic import Anthropic
client = Anthropic(api_key="YOUR_API_KEY_HERE")
r = client.messages.create(
model="claude-3-7-sonnet-20250219",
max_tokens=3000,
model="claude-sonnet-4-0",
messages=[
{"role": "user", "content": prompt}
]
@ -48,7 +47,7 @@ Here, we provide some minimal example implementations:
from google import genai
client = genai.Client(api_key='GEMINI_API_KEY')
response = client.models.generate_content(
model='gemini-2.0-flash-001',
model='gemini-2.5-pro',
contents=prompt
)
return response.text

0
home_todo.txt Normal file
View File

View File

@ -62,10 +62,10 @@ class AsyncNode(Node):
async def exec_fallback_async(self,prep_res,exc): raise exc
async def post_async(self,shared,prep_res,exec_res): pass
async def _exec(self,prep_res):
for i in range(self.max_retries):
for self.cur_retry in range(self.max_retries):
try: return await self.exec_async(prep_res)
except Exception as e:
if i==self.max_retries-1: return await self.exec_fallback_async(prep_res,e)
if self.cur_retry==self.max_retries-1: return await self.exec_fallback_async(prep_res,e)
if self.wait>0: await asyncio.sleep(self.wait)
async def run_async(self,shared):
if self.successors: warnings.warn("Node won't run successors. Use AsyncFlow.")

View File

@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="pocketflow",
version="0.0.2",
version="0.0.3",
packages=find_packages(),
author="Zachary Huang",
author_email="zh2408@columbia.edu",

248
shopping_assistant.py Normal file
View File

@ -0,0 +1,248 @@
"""Natural-language assistant that routes commands to specific personal files."""
from __future__ import annotations
import re
from datetime import datetime, timedelta, date, timezone
from pathlib import Path
from typing import Any, Dict, Optional
from pocketflow import Flow, Node
from assistant_resources import RESOURCE_REGISTRY, resource_descriptions
from utils.call_llm import call_llm_json
COMMAND_PROMPT = f"""
You convert a user's short reminder into a JSON command so an agent can update files.
{resource_descriptions()}
Always respond with a JSON object using this schema:
{{
"action": "append",
"target": "<one of: {', '.join(RESOURCE_REGISTRY.keys())}>",
"entry": "<text to append or store>",
"metadata": {{ # optional object for structured info (e.g., calendar dates)
...
}}
}}
If you cannot determine a valid target, respond with:
{{"action": "unknown", "target": "", "entry": "", "metadata": {{}}}}
Make entries concise; when the target is calendar include any useful scheduling details inside metadata.
User reminder: {{command}}
""".strip()
REMOVE_PREFIXES = [
"remember to",
"please",
"don't forget to",
"dont forget to",
"remind me to",
"i need to",
"i should",
]
class InterpretCommandNode(Node):
"""Use an LLM (with fallback heuristics) to map a reminder to an actionable command."""
def prep(self, shared: Dict[str, Any]) -> str:
return shared["command"]
def exec(self, command: str) -> Dict[str, Any]:
try:
return call_llm_json(COMMAND_PROMPT.format(command=command))
except Exception:
return self._fallback(command)
def post(self, shared: Dict[str, Any], prep_result: str, exec_result: Dict[str, Any]) -> str:
shared["parsed_action"] = exec_result
return exec_result.get("action", "unknown")
@staticmethod
def _fallback(command: str) -> Dict[str, Any]:
lowered = command.lower()
target_key: Optional[str] = None
highest_score = 0
for key, resource in RESOURCE_REGISTRY.items():
score = sum(1 for kw in resource.keywords if kw in lowered)
if score > highest_score:
highest_score = score
target_key = key
if not target_key:
return {"action": "unknown", "target": "", "entry": "", "metadata": {}}
entry = InterpretCommandNode._extract_entry(command, target_key)
metadata: Dict[str, Any] = {}
if target_key == "calendar":
date_match = re.search(r"(\d{4}-\d{2}-\d{2})", command)
if date_match:
metadata["date"] = date_match.group(1)
return {
"action": "append",
"target": target_key,
"entry": entry.strip(),
"metadata": metadata,
}
@staticmethod
def _extract_entry(command: str, target_key: str) -> str:
cleaned = InterpretCommandNode._remove_prefixes(command).strip()
lowered = cleaned.lower()
if target_key == "shopping_list":
match = re.search(r"buy\s+([a-zA-Z0-9\s]+)", lowered)
if match:
return match.group(1)
if target_key in {"home_todo", "work_todo", "school_courses"}:
for verb in ("finish", "do", "complete", "send", "write", "study"):
if lowered.startswith(f"{verb} "):
return cleaned
if target_key == "ideas":
return cleaned
if target_key == "calendar":
return cleaned
return cleaned
@staticmethod
def _remove_prefixes(command: str) -> str:
lowered = command.lower()
for prefix in REMOVE_PREFIXES:
if lowered.startswith(prefix):
return command[len(prefix) :].strip()
return command
class AppendEntryNode(Node):
"""Append entries to the resource indicated by the interpreter."""
def prep(self, shared: Dict[str, Any]) -> Dict[str, Any]:
return shared["parsed_action"]
def exec(self, action: Dict[str, Any]) -> str:
target_key = action.get("target", "")
entry = (action.get("entry") or "").strip()
metadata = action.get("metadata") or {}
if target_key not in RESOURCE_REGISTRY:
raise ValueError(f"Unsupported target '{target_key}'")
if not entry:
raise ValueError("Empty entry cannot be appended")
resource = RESOURCE_REGISTRY[target_key]
if resource.kind == "text":
self._append_to_text(resource.path, entry)
elif resource.kind == "calendar":
self._append_to_calendar(resource.path, entry, metadata)
else:
raise ValueError(f"Unknown resource kind '{resource.kind}'")
return f"{target_key}:{entry}"
def post(self, shared: Dict[str, Any], prep_result: Dict[str, Any], exec_result: str) -> None:
shared.setdefault("updates", []).append(exec_result)
@staticmethod
def _append_to_text(path: Path, entry: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as fh:
fh.write(f"{entry}\n")
@staticmethod
def _append_to_calendar(path: Path, summary: str, metadata: Dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
if not path.exists():
path.write_text("BEGIN:VCALENDAR\nVERSION:2.0\nPRODID:-//PocketFlow//Assistant//EN\nEND:VCALENDAR\n", encoding="utf-8")
content = path.read_text(encoding="utf-8").splitlines()
if not content or content[-1].strip() != "END:VCALENDAR":
content.append("END:VCALENDAR")
event_lines = AppendEntryNode._build_event(summary, metadata)
# insert before END:VCALENDAR
end_index = len(content) - 1
content = content[:end_index] + event_lines + content[end_index:]
path.write_text("\n".join(content) + "\n", encoding="utf-8")
@staticmethod
def _build_event(summary: str, metadata: Dict[str, Any]) -> list[str]:
now = datetime.now(timezone.utc)
dtstamp = now.strftime("%Y%m%dT%H%M%SZ")
dtstart, dtend = AppendEntryNode._resolve_calendar_times(metadata, now)
description = metadata.get("notes") or summary
return [
"BEGIN:VEVENT",
f"DTSTAMP:{dtstamp}",
f"SUMMARY:{summary}",
f"DTSTART;VALUE=DATE:{dtstart}",
f"DTEND;VALUE=DATE:{dtend}",
f"DESCRIPTION:{description}",
"END:VEVENT",
]
@staticmethod
def _resolve_calendar_times(metadata: Dict[str, Any], fallback: datetime) -> tuple[str, str]:
start = AppendEntryNode._parse_date(metadata.get("date") or metadata.get("start"))
if start is None:
start = fallback.date()
end = AppendEntryNode._parse_date(metadata.get("end"))
if end is None:
end_date = start + timedelta(days=1)
else:
end_date = end
return start.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
@staticmethod
def _parse_date(value: Any) -> Optional[date]:
if not value:
return None
if isinstance(value, datetime):
return value.date()
try:
# Try ISO date first
dt = datetime.fromisoformat(str(value))
return dt.date()
except ValueError:
patterns = ["%Y-%m-%d", "%m/%d/%Y", "%b %d %Y"]
text = str(value)
for pattern in patterns:
try:
return datetime.strptime(text, pattern).date()
except ValueError:
continue
return None
class UnknownCommandNode(Node):
def prep(self, shared: Dict[str, Any]) -> Dict[str, Any]:
return shared.get("parsed_action", {})
def exec(self, action: Dict[str, Any]) -> None:
raise ValueError(f"Unsupported command: {action}")
def build_flow() -> Flow:
interpreter = InterpretCommandNode()
append_entry = AppendEntryNode()
unknown = UnknownCommandNode()
flow = Flow(start=interpreter)
interpreter - "append" >> append_entry
interpreter - "unknown" >> unknown
return flow
def handle_command(command: str) -> Dict[str, Any]:
shared: Dict[str, Any] = {"command": command}
flow = build_flow()
try:
flow.run(shared)
except ValueError as exc:
shared["error"] = str(exc)
return shared
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Process a reminder and update the relevant file")
parser.add_argument("command", help="Reminder text, e.g. 'email client tomorrow about proposal'")
args = parser.parse_args()
result = handle_command(args.command)
print(result)

2
shopping_list.txt Normal file
View File

@ -0,0 +1,2 @@
apples
cheese

54
utils/call_llm.py Normal file
View File

@ -0,0 +1,54 @@
"""Minimal OpenAI Chat Completions helper used by Pocket Flow demos."""
from __future__ import annotations
import json
from typing import Iterable, Optional
try:
from openai import OpenAI
except ImportError: # pragma: no cover - optional dependency
OpenAI = None # type: ignore[misc,assignment]
try:
from llm_secrets import OPENAI_API_KEY
except ImportError as exc:
raise ImportError("Create llm_secrets.py with OPENAI_API_KEY before calling call_llm") from exc
_client: Optional[OpenAI] = None
def _get_client() -> OpenAI:
global _client
if OpenAI is None: # type: ignore[truthy-function]
raise RuntimeError("Install the 'openai' package to use call_llm")
if _client is None:
if not OPENAI_API_KEY or OPENAI_API_KEY == "REPLACE_WITH_YOUR_KEY":
raise ValueError("Set OPENAI_API_KEY in llm_secrets.py before calling call_llm")
_client = OpenAI(api_key=OPENAI_API_KEY)
return _client
def call_llm(messages: Iterable[dict] | str, model: str = "gpt-4o-mini") -> str:
"""Send a prompt or list of chat messages to OpenAI and return the text reply."""
client = _get_client()
chat_messages = (
[{"role": "user", "content": messages}]
if isinstance(messages, str)
else list(messages)
)
response = client.chat.completions.create(model=model, messages=chat_messages)
message = response.choices[0].message.content or ""
return message.strip()
def call_llm_json(messages: Iterable[dict] | str, model: str = "gpt-4o-mini") -> dict:
"""Convenience wrapper that expects a JSON object in the response."""
raw = call_llm(messages, model=model)
start = raw.find("{")
end = raw.rfind("}")
if start == -1 or end == -1:
raise ValueError(f"LLM response does not contain JSON: {raw}")
return json.loads(raw[start : end + 1])
__all__ = ["call_llm", "call_llm_json"]

1
work_todo.txt Normal file
View File

@ -0,0 +1 @@
gavilan: call ted

2
x Normal file
View File

@ -0,0 +1,2 @@
#!/usr/bin/env bash
python3 "$(dirname "$0")/shopping_assistant.py" "$@"