pocketflow/shopping_assistant.py

249 lines
8.6 KiB
Python

"""Natural-language assistant that routes commands to specific personal files."""
from __future__ import annotations
import re
from datetime import datetime, timedelta, date, timezone
from pathlib import Path
from typing import Any, Dict, Optional
from pocketflow import Flow, Node
from assistant_resources import RESOURCE_REGISTRY, resource_descriptions
from utils.call_llm import call_llm_json
# Prompt template that InterpretCommandNode sends to the LLM.
# This is an f-string, so the resource descriptions and the list of valid
# targets are interpolated once, when the module is imported, and every doubled
# brace (`{{` / `}}`) collapses to a single literal brace. The finished template
# therefore contains literal JSON braces plus one `{command}` token marking
# where the user's reminder text belongs.
# NOTE(review): because of those literal braces, passing the finished template
# through `str.format` makes it parse the JSON schema as replacement fields;
# substitute the `{command}` token directly instead.
COMMAND_PROMPT = f"""
You convert a user's short reminder into a JSON command so an agent can update files.
{resource_descriptions()}
Always respond with a JSON object using this schema:
{{
"action": "append",
"target": "<one of: {', '.join(RESOURCE_REGISTRY.keys())}>",
"entry": "<text to append or store>",
"metadata": {{ # optional object for structured info (e.g., calendar dates)
...
}}
}}
If you cannot determine a valid target, respond with:
{{"action": "unknown", "target": "", "entry": "", "metadata": {{}}}}
Make entries concise; when the target is calendar include any useful scheduling details inside metadata.
User reminder: {{command}}
""".strip()
# Lowercase lead-in phrases that are stripped from the front of a reminder
# before its entry text is extracted. Order matters: the list is scanned from
# top to bottom and the first phrase that matches is the one removed.
REMOVE_PREFIXES = [
    "remember to",
    "please",
    "don't forget to",
    "dont forget to",
    "remind me to",
    "i need to",
    "i should",
]
class InterpretCommandNode(Node):
    """Use an LLM (with fallback heuristics) to map a reminder to an actionable command.

    The node reads ``shared["command"]``, produces a command dict with the keys
    ``action``, ``target``, ``entry`` and ``metadata``, stores it under
    ``shared["parsed_action"]`` and returns the action name so the flow can
    branch on it.
    """

    def prep(self, shared: Dict[str, Any]) -> str:
        """Return the raw reminder text stored under ``shared["command"]``."""
        return shared["command"]

    def exec(self, command: str) -> Dict[str, Any]:
        """Interpret *command* with the LLM, falling back to keyword heuristics.

        Args:
            command: The user's reminder text.

        Returns:
            A command dict; either the LLM's JSON object or the result of
            :meth:`_fallback` when the LLM call fails or returns a non-object.
        """
        # The template holds literal JSON braces, so ``str.format`` would try to
        # parse the schema as replacement fields and raise before the LLM is
        # ever reached. Substitute the single ``{command}`` token directly.
        prompt = COMMAND_PROMPT.replace("{command}", command)
        try:
            result = call_llm_json(prompt)
        except Exception:
            # Deliberate best-effort: any failure of the LLM call degrades to
            # the offline heuristics instead of aborting the flow.
            return self._fallback(command)
        # ``post`` calls ``.get`` on the result, so anything that is not a JSON
        # object (list, string, null) is treated like a failed LLM call.
        if not isinstance(result, dict):
            return self._fallback(command)
        return result

    def post(self, shared: Dict[str, Any], prep_result: str, exec_result: Dict[str, Any]) -> str:
        """Store the parsed command and return the action used for routing."""
        shared["parsed_action"] = exec_result
        # A missing, null or empty action is routed to the "unknown" branch.
        return exec_result.get("action") or "unknown"

    @staticmethod
    def _fallback(command: str) -> Dict[str, Any]:
        """Pick a target by counting registry keywords found in *command*.

        The resource with the most keyword hits wins; on a tie the one that
        appears first in ``RESOURCE_REGISTRY`` is kept. With no hits at all an
        ``"unknown"`` command is returned.
        """
        lowered = command.lower()
        target_key: Optional[str] = None
        highest_score = 0
        for key, resource in RESOURCE_REGISTRY.items():
            # Compare in lowercase on both sides so capitalised keywords match.
            score = sum(1 for kw in resource.keywords if kw.lower() in lowered)
            if score > highest_score:
                highest_score = score
                target_key = key
        if not target_key:
            return {"action": "unknown", "target": "", "entry": "", "metadata": {}}

        entry = InterpretCommandNode._extract_entry(command, target_key)
        metadata: Dict[str, Any] = {}
        if target_key == "calendar":
            # Only an explicit ISO date (YYYY-MM-DD) is recognised here.
            date_match = re.search(r"(\d{4}-\d{2}-\d{2})", command)
            if date_match:
                metadata["date"] = date_match.group(1)
        return {
            "action": "append",
            "target": target_key,
            "entry": entry.strip(),
            "metadata": metadata,
        }

    @staticmethod
    def _extract_entry(command: str, target_key: str) -> str:
        """Return the text to store for *command* under *target_key*.

        For ``"shopping_list"`` the words following "buy" are returned
        (lowercased, letters/digits/whitespace only). Every other target, and a
        shopping reminder without "buy", stores the reminder with its filler
        prefixes removed.
        """
        cleaned = InterpretCommandNode._remove_prefixes(command).strip()
        if target_key == "shopping_list":
            match = re.search(r"buy\s+([a-zA-Z0-9\s]+)", cleaned.lower())
            if match:
                return match.group(1)
        return cleaned

    @staticmethod
    def _remove_prefixes(command: str) -> str:
        """Strip leading filler phrases listed in ``REMOVE_PREFIXES``.

        Matching is case-insensitive and only succeeds when the phrase is
        followed by a separator or the end of the text, so "please buy milk"
        loses "please" while "pleased customers" is left intact. Stacked
        phrases ("please remember to ...") are all removed.
        """
        separators = " \t\r\n,:;.!"
        text = command.strip()
        while True:
            for prefix in REMOVE_PREFIXES:
                if not prefix:
                    # An empty prefix would match forever; ignore it.
                    continue
                head, rest = text[: len(prefix)], text[len(prefix):]
                if head.lower() != prefix:
                    continue
                if rest and rest[0] not in separators:
                    # The phrase is only the start of a longer word.
                    continue
                text = rest.lstrip(separators)
                break
            else:
                # No prefix matched on this pass: nothing left to strip.
                return text
class AppendEntryNode(Node):
    """Append entries to the resource indicated by the interpreter.

    Text resources receive one line per entry; calendar resources receive a
    new all-day ``VEVENT`` inserted before the closing ``END:VCALENDAR``.
    """

    def prep(self, shared: Dict[str, Any]) -> Dict[str, Any]:
        """Return the command dict stored under ``shared["parsed_action"]``."""
        return shared["parsed_action"]

    def exec(self, action: Dict[str, Any]) -> str:
        """Write the entry described by *action* to its target resource.

        Args:
            action: Command dict with ``target``, ``entry`` and optional
                ``metadata`` keys.

        Returns:
            ``"<target>:<entry>"`` describing what was written.

        Raises:
            ValueError: If the target is not a registered resource, the entry
                is empty, or the resource kind is not supported.
        """
        target_key = action.get("target", "")
        raw_entry = action.get("entry")
        # The command may come straight from an LLM, so tolerate non-string
        # values instead of failing with AttributeError on ``.strip()``.
        if isinstance(raw_entry, str):
            entry = raw_entry.strip()
        else:
            entry = str(raw_entry or "").strip()
        metadata = action.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}

        # The isinstance check keeps unhashable targets (e.g. a list) from
        # raising TypeError in the membership test.
        if not isinstance(target_key, str) or target_key not in RESOURCE_REGISTRY:
            raise ValueError(f"Unsupported target '{target_key}'")
        if not entry:
            raise ValueError("Empty entry cannot be appended")

        resource = RESOURCE_REGISTRY[target_key]
        if resource.kind == "text":
            self._append_to_text(resource.path, entry)
        elif resource.kind == "calendar":
            self._append_to_calendar(resource.path, entry, metadata)
        else:
            raise ValueError(f"Unknown resource kind '{resource.kind}'")
        return f"{target_key}:{entry}"

    def post(self, shared: Dict[str, Any], prep_result: Dict[str, Any], exec_result: str) -> None:
        """Record the written entry in ``shared["updates"]``."""
        shared.setdefault("updates", []).append(exec_result)

    @staticmethod
    def _append_to_text(path: Path, entry: str) -> None:
        """Append *entry* as one line to the UTF-8 text file at *path*."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as fh:
            fh.write(f"{entry}\n")

    @staticmethod
    def _append_to_calendar(path: Path, summary: str, metadata: Dict[str, Any]) -> None:
        """Insert a new event before the closing ``END:VCALENDAR`` of *path*.

        A missing or empty file is initialised with a minimal calendar
        skeleton. Trailing blank lines are dropped and the last existing
        ``END:VCALENDAR`` line is reused, so the event always ends up inside
        the calendar and the closing marker is never duplicated.
        """
        skeleton = [
            "BEGIN:VCALENDAR",
            "VERSION:2.0",
            "PRODID:-//PocketFlow//Assistant//EN",
            "END:VCALENDAR",
        ]
        path.parent.mkdir(parents=True, exist_ok=True)
        content = path.read_text(encoding="utf-8").splitlines() if path.exists() else []
        # Trailing blank lines would otherwise hide the closing marker.
        while content and not content[-1].strip():
            content.pop()
        if not content:
            content = list(skeleton)

        # Search backwards for the closing marker rather than assuming it is
        # the very last line.
        end_index: Optional[int] = None
        for idx in range(len(content) - 1, -1, -1):
            if content[idx].strip().upper() == "END:VCALENDAR":
                end_index = idx
                break
        if end_index is None:
            content.append("END:VCALENDAR")
            end_index = len(content) - 1

        event_lines = AppendEntryNode._build_event(summary, metadata)
        content = content[:end_index] + event_lines + content[end_index:]
        path.write_text("\n".join(content) + "\n", encoding="utf-8")

    @staticmethod
    def _build_event(summary: str, metadata: Dict[str, Any]) -> list[str]:
        """Build the lines of an all-day ``VEVENT`` for *summary*.

        ``metadata["notes"]`` becomes the description when present, otherwise
        the summary is reused. Text values are escaped and a unique ``UID`` is
        generated, both as required by RFC 5545.
        """
        import uuid  # local import: only needed when an event is created

        now = datetime.now(timezone.utc)
        dtstamp = now.strftime("%Y%m%dT%H%M%SZ")
        dtstart, dtend = AppendEntryNode._resolve_calendar_times(metadata, now)
        description = metadata.get("notes") or summary
        escape = AppendEntryNode._escape_ical_text
        return [
            "BEGIN:VEVENT",
            f"UID:{uuid.uuid4()}@pocketflow-assistant",
            f"DTSTAMP:{dtstamp}",
            f"SUMMARY:{escape(summary)}",
            f"DTSTART;VALUE=DATE:{dtstart}",
            f"DTEND;VALUE=DATE:{dtend}",
            f"DESCRIPTION:{escape(description)}",
            "END:VEVENT",
        ]

    @staticmethod
    def _escape_ical_text(value: Any) -> str:
        """Escape *value* for use as an iCalendar TEXT property value.

        Backslashes, semicolons and commas are backslash-escaped and line
        breaks become the two-character sequence ``\\n``, so free-form text can
        neither break the file structure nor inject extra properties.
        """
        text = str(value)
        # Backslash first, otherwise the escapes added below get re-escaped.
        text = text.replace("\\", "\\\\")
        text = text.replace(";", "\\;").replace(",", "\\,")
        return text.replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n")

    @staticmethod
    def _resolve_calendar_times(metadata: Dict[str, Any], fallback: datetime) -> tuple[str, str]:
        """Return ``(DTSTART, DTEND)`` as ``YYYYMMDD`` strings.

        The start comes from ``metadata["date"]`` or ``metadata["start"]`` and
        defaults to the date of *fallback*. The end comes from
        ``metadata["end"]``; when it is missing, unparseable, or not after the
        start, a one-day event (start + 1 day) is produced.
        """
        start = AppendEntryNode._parse_date(metadata.get("date") or metadata.get("start"))
        if start is None:
            start = fallback.date()
        end = AppendEntryNode._parse_date(metadata.get("end"))
        # DTEND must be later than DTSTART; otherwise fall back to one day.
        if end is None or end <= start:
            end = start + timedelta(days=1)
        return start.strftime("%Y%m%d"), end.strftime("%Y%m%d")

    @staticmethod
    def _parse_date(value: Any) -> Optional[date]:
        """Best-effort conversion of *value* to a ``date``.

        Accepts ``datetime``/``date`` objects, ISO 8601 strings (including a
        trailing ``Z``), and the formats ``YYYY-MM-DD``, ``MM/DD/YYYY`` and
        ``Mon DD YYYY``. Returns ``None`` for empty or unparseable input.
        """
        if not value:
            return None
        # datetime is a subclass of date, so it has to be checked first.
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value

        text = str(value).strip()
        # ``fromisoformat`` only understands a trailing "Z" from Python 3.11 on.
        iso_text = (text[:-1] + "+00:00") if text.endswith(("Z", "z")) else text
        try:
            return datetime.fromisoformat(iso_text).date()
        except ValueError:
            pass
        for pattern in ("%Y-%m-%d", "%m/%d/%Y", "%b %d %Y"):
            try:
                return datetime.strptime(text, pattern).date()
            except ValueError:
                continue
        return None
class UnknownCommandNode(Node):
    """Node that rejects a command by raising ``ValueError``."""

    def prep(self, shared: Dict[str, Any]) -> Dict[str, Any]:
        """Fetch the parsed command, or an empty dict when none was stored."""
        parsed = shared.get("parsed_action", {})
        return parsed

    def exec(self, action: Dict[str, Any]) -> None:
        """Always raise ``ValueError`` whose message includes *action*."""
        message = f"Unsupported command: {action}"
        raise ValueError(message)
def build_flow() -> Flow:
    """Assemble the assistant flow.

    The flow starts at an ``InterpretCommandNode`` and branches on the action
    it returns: ``"append"`` leads to an ``AppendEntryNode`` and ``"unknown"``
    leads to an ``UnknownCommandNode``.
    """
    entry_node = InterpretCommandNode()
    # Each line registers the successor for one action name.
    entry_node - "append" >> AppendEntryNode()
    entry_node - "unknown" >> UnknownCommandNode()
    return Flow(start=entry_node)
def handle_command(command: str) -> Dict[str, Any]:
    """Run one reminder through a freshly built flow.

    Args:
        command: The reminder text to process.

    Returns:
        The shared state dict after the run. It always contains ``"command"``;
        if the flow raised ``ValueError``, its message is stored under
        ``"error"``. Other exception types propagate to the caller.
    """
    state: Dict[str, Any] = {"command": command}
    # Built outside the try block so construction errors are not swallowed.
    pipeline = build_flow()
    try:
        pipeline.run(state)
    except ValueError as err:
        state["error"] = str(err)
    return state
if __name__ == "__main__":
    # Command-line entry point: take one reminder as a positional argument,
    # process it, and print the resulting shared state.
    import argparse

    cli = argparse.ArgumentParser(
        description="Process a reminder and update the relevant file"
    )
    cli.add_argument(
        "command",
        help="Reminder text, e.g. 'email client tomorrow about proposal'",
    )
    print(handle_command(cli.parse_args().command))