"""Natural-language assistant that routes commands to specific personal files.""" from __future__ import annotations import re from datetime import datetime, timedelta, date, timezone from pathlib import Path from typing import Any, Dict, Optional from pocketflow import Flow, Node from assistant_resources import RESOURCE_REGISTRY, resource_descriptions from utils.call_llm import call_llm_json COMMAND_PROMPT = f""" You convert a user's short reminder into a JSON command so an agent can update files. {resource_descriptions()} Always respond with a JSON object using this schema: {{ "action": "append", "target": "", "entry": "", "metadata": {{ # optional object for structured info (e.g., calendar dates) ... }} }} If you cannot determine a valid target, respond with: {{"action": "unknown", "target": "", "entry": "", "metadata": {{}}}} Make entries concise; when the target is calendar include any useful scheduling details inside metadata. User reminder: {{command}} """.strip() REMOVE_PREFIXES = [ "remember to", "please", "don't forget to", "dont forget to", "remind me to", "i need to", "i should", ] class InterpretCommandNode(Node): """Use an LLM (with fallback heuristics) to map a reminder to an actionable command.""" def prep(self, shared: Dict[str, Any]) -> str: return shared["command"] def exec(self, command: str) -> Dict[str, Any]: try: return call_llm_json(COMMAND_PROMPT.format(command=command)) except Exception: return self._fallback(command) def post(self, shared: Dict[str, Any], prep_result: str, exec_result: Dict[str, Any]) -> str: shared["parsed_action"] = exec_result return exec_result.get("action", "unknown") @staticmethod def _fallback(command: str) -> Dict[str, Any]: lowered = command.lower() target_key: Optional[str] = None highest_score = 0 for key, resource in RESOURCE_REGISTRY.items(): score = sum(1 for kw in resource.keywords if kw in lowered) if score > highest_score: highest_score = score target_key = key if not target_key: return {"action": "unknown", "target": "", "entry": "", "metadata": {}} entry = InterpretCommandNode._extract_entry(command, target_key) metadata: Dict[str, Any] = {} if target_key == "calendar": date_match = re.search(r"(\d{4}-\d{2}-\d{2})", command) if date_match: metadata["date"] = date_match.group(1) return { "action": "append", "target": target_key, "entry": entry.strip(), "metadata": metadata, } @staticmethod def _extract_entry(command: str, target_key: str) -> str: cleaned = InterpretCommandNode._remove_prefixes(command).strip() lowered = cleaned.lower() if target_key == "shopping_list": match = re.search(r"buy\s+([a-zA-Z0-9\s]+)", lowered) if match: return match.group(1) if target_key in {"home_todo", "work_todo", "school_courses"}: for verb in ("finish", "do", "complete", "send", "write", "study"): if lowered.startswith(f"{verb} "): return cleaned if target_key == "ideas": return cleaned if target_key == "calendar": return cleaned return cleaned @staticmethod def _remove_prefixes(command: str) -> str: lowered = command.lower() for prefix in REMOVE_PREFIXES: if lowered.startswith(prefix): return command[len(prefix) :].strip() return command class AppendEntryNode(Node): """Append entries to the resource indicated by the interpreter.""" def prep(self, shared: Dict[str, Any]) -> Dict[str, Any]: return shared["parsed_action"] def exec(self, action: Dict[str, Any]) -> str: target_key = action.get("target", "") entry = (action.get("entry") or "").strip() metadata = action.get("metadata") or {} if target_key not in RESOURCE_REGISTRY: raise ValueError(f"Unsupported target '{target_key}'") if not entry: raise ValueError("Empty entry cannot be appended") resource = RESOURCE_REGISTRY[target_key] if resource.kind == "text": self._append_to_text(resource.path, entry) elif resource.kind == "calendar": self._append_to_calendar(resource.path, entry, metadata) else: raise ValueError(f"Unknown resource kind '{resource.kind}'") return f"{target_key}:{entry}" def post(self, shared: Dict[str, Any], prep_result: Dict[str, Any], exec_result: str) -> None: shared.setdefault("updates", []).append(exec_result) @staticmethod def _append_to_text(path: Path, entry: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as fh: fh.write(f"{entry}\n") @staticmethod def _append_to_calendar(path: Path, summary: str, metadata: Dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) if not path.exists(): path.write_text("BEGIN:VCALENDAR\nVERSION:2.0\nPRODID:-//PocketFlow//Assistant//EN\nEND:VCALENDAR\n", encoding="utf-8") content = path.read_text(encoding="utf-8").splitlines() if not content or content[-1].strip() != "END:VCALENDAR": content.append("END:VCALENDAR") event_lines = AppendEntryNode._build_event(summary, metadata) # insert before END:VCALENDAR end_index = len(content) - 1 content = content[:end_index] + event_lines + content[end_index:] path.write_text("\n".join(content) + "\n", encoding="utf-8") @staticmethod def _build_event(summary: str, metadata: Dict[str, Any]) -> list[str]: now = datetime.now(timezone.utc) dtstamp = now.strftime("%Y%m%dT%H%M%SZ") dtstart, dtend = AppendEntryNode._resolve_calendar_times(metadata, now) description = metadata.get("notes") or summary return [ "BEGIN:VEVENT", f"DTSTAMP:{dtstamp}", f"SUMMARY:{summary}", f"DTSTART;VALUE=DATE:{dtstart}", f"DTEND;VALUE=DATE:{dtend}", f"DESCRIPTION:{description}", "END:VEVENT", ] @staticmethod def _resolve_calendar_times(metadata: Dict[str, Any], fallback: datetime) -> tuple[str, str]: start = AppendEntryNode._parse_date(metadata.get("date") or metadata.get("start")) if start is None: start = fallback.date() end = AppendEntryNode._parse_date(metadata.get("end")) if end is None: end_date = start + timedelta(days=1) else: end_date = end return start.strftime("%Y%m%d"), end_date.strftime("%Y%m%d") @staticmethod def _parse_date(value: Any) -> Optional[date]: if not value: return None if isinstance(value, datetime): return value.date() try: # Try ISO date first dt = datetime.fromisoformat(str(value)) return dt.date() except ValueError: patterns = ["%Y-%m-%d", "%m/%d/%Y", "%b %d %Y"] text = str(value) for pattern in patterns: try: return datetime.strptime(text, pattern).date() except ValueError: continue return None class UnknownCommandNode(Node): def prep(self, shared: Dict[str, Any]) -> Dict[str, Any]: return shared.get("parsed_action", {}) def exec(self, action: Dict[str, Any]) -> None: raise ValueError(f"Unsupported command: {action}") def build_flow() -> Flow: interpreter = InterpretCommandNode() append_entry = AppendEntryNode() unknown = UnknownCommandNode() flow = Flow(start=interpreter) interpreter - "append" >> append_entry interpreter - "unknown" >> unknown return flow def handle_command(command: str) -> Dict[str, Any]: shared: Dict[str, Any] = {"command": command} flow = build_flow() try: flow.run(shared) except ValueError as exc: shared["error"] = str(exc) return shared if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Process a reminder and update the relevant file") parser.add_argument("command", help="Reminder text, e.g. 'email client tomorrow about proposal'") args = parser.parse_args() result = handle_command(args.command) print(result)