From 20366246db34c959bb4c732b0df855e6b3ecfba8 Mon Sep 17 00:00:00 2001
From: Peter Howell
Date: Fri, 29 Aug 2025 20:19:44 +0000
Subject: [PATCH] db

---
 gpt.py | 175 ++++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 154 insertions(+), 21 deletions(-)

diff --git a/gpt.py b/gpt.py
index 8f1c851..43d3254 100644
--- a/gpt.py
+++ b/gpt.py
@@ -109,7 +109,7 @@ def fetch_useful_info(save_attachments=True, folder_name='useful info ref'):
     prefix_re = re.compile(r'^\s*(re|fw|fwd|aw|sv|vb|tr|wg)\s*:\s*', re.I)  # common locales too
     bracket_tag_re = re.compile(r'^\s*(\[[^\]]+\]\s*)+', re.I)
 
-    def normalize_subject(s: str) -> str:
+    def normalize_subject(s):
         if not s:
             return "(no subject)"
         s = s.strip()
@@ -122,7 +122,7 @@ def fetch_useful_info(save_attachments=True, folder_name='useful info ref'):
         s = re.sub(r'\s+', ' ', s).strip()
         return s or "(no subject)"
 
-    def safe_name(s: str) -> str:
+    def safe_name(s):
         # reasonable Windows-safe folder name for a subject
         s = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', s)
         return s[:120]
@@ -253,8 +253,6 @@ def fetch_useful_info(save_attachments=True, folder_name='useful info ref'):
 def process_useful_info():
     import re
     from pathlib import Path
-    from typing import Callable, List, Dict, Optional
-    from typing import Iterable
 
     # Optional import heavy libs only when needed
     try:
@@ -273,7 +271,7 @@ def process_useful_info():
     HEADER_RE = re.compile(r'\r?\n###\s(.*)\r?\n')  # your pattern, CRLF-safe
     COUNT_RE = re.compile(r'^(?P<subject>.*?)\s+—\s+(?P<count>\d+)\s+message', re.I)
 
-    def parse_groups(text: str) -> List[Dict]:
+    def parse_groups(text):
         """
         Return a list of groups found in the log file.
         Each group is a dict: {header, subject, count, content}
@@ -291,7 +289,7 @@ def process_useful_info():
 
             # Try to extract subject and count if present
             subject = header
-            count: Optional[int] = None
+            count = None
             cm = COUNT_RE.search(header)
             if cm:
                 subject = cm.group('subject').strip()
@@ -308,12 +306,9 @@ def process_useful_info():
             })
         return groups
 
-    def for_each_group(
-        log_path: str = "cache/email_usefulinfo_sorted.txt",
-        f: Callable[[int, Dict], None] = lambda idx, g: None,
-        start: int = 1,
-        count: int = -1
-    ) -> None:
+    from usefulinfo_db import init_schema, insert_summary_and_events
+
+    def for_each_group(log_path="cache/email_usefulinfo_sorted.txt", f=lambda idx, g: None, start=1, count=-1):
         """
         Read the grouped log, split into groups, and call f(index, group) on each.
         start: 1-based index to begin processing (useful for resuming).
@@ -335,8 +330,8 @@ def process_useful_info():
             if count != -1 and done >= count:
                 return
 
-    def _parse_attachment_paths(block: str) -> List[str]:
-        paths: List[str] = []
+    def _parse_attachment_paths(block):
+        paths = []
         for line in block.splitlines():
             if line.startswith("Attachments:"):
                 # After the colon, comma-separated file paths
@@ -353,13 +348,13 @@ def process_useful_info():
                 uniq.append(p)
         return uniq
 
-    def _safe_read_textfile(p: Path, max_chars: int = 8000) -> str:
+    def _safe_read_textfile(p, max_chars=8000):
         try:
             return p.read_text(encoding="utf-8", errors="replace")[:max_chars]
         except Exception:
             return ""
 
-    def _extract_pdf_text(p: Path, max_pages: int = 10, max_chars: int = 12000) -> str:
+    def _extract_pdf_text(p, max_pages=10, max_chars=12000):
         if not PyPDF2:
             return ""
         text = []
@@ -376,7 +371,7 @@ def process_useful_info():
             return ""
         return "\n".join(text)[:max_chars]
 
-    def _extract_docx_text(p: Path, max_chars: int = 12000) -> str:
+    def _extract_docx_text(p, max_chars=12000):
         if not docx:
             return ""
         try:
@@ -386,12 +381,12 @@ def process_useful_info():
         except Exception:
             return ""
 
-    def _extract_pptx_text(p: Path, max_chars: int = 12000) -> str:
+    def _extract_pptx_text(p, max_chars=12000):
         if not Presentation:
             return ""
         try:
             pres = Presentation(str(p))
-            chunks: List[str] = []
+            chunks = []
             for slide in pres.slides:
                 for shape in slide.shapes:
                     try:
@@ -406,8 +401,8 @@ def process_useful_info():
         except Exception:
             return ""
 
-    def _extract_attachment_text(paths: Iterable[str]) -> str:
-        out_chunks: List[str] = []
+    def _extract_attachment_text(paths):
+        out_chunks = []
         for raw in paths:
             p = Path(raw)
             # Ensure relative paths still resolve from repo root
@@ -432,6 +427,134 @@ def process_useful_info():
 
     OUT_JSONL = Path("cache/useful_info_summaries.jsonl")
 
+    # --- PostgreSQL schema + insert helpers (via localcache2.db) ---
+    def _pg_init_schema():
+        try:
+            from localcache2 import db as _db
+            CON, CURSOR = _db()
+            try:
+                CURSOR.execute(
+                    """
+                    CREATE TABLE IF NOT EXISTS useful_info_summary (
+                        id BIGSERIAL PRIMARY KEY,
+                        summary_hash CHAR(64) UNIQUE,
+                        grp_index INTEGER,
+                        subject TEXT,
+                        thread_count INTEGER,
+                        source TEXT,
+                        date_label TEXT,
+                        tags_json JSONB,
+                        short_text TEXT,
+                        summary_text TEXT,
+                        attachments_json JSONB,
+                        raw_json TEXT,
+                        created_at TIMESTAMPTZ DEFAULT now()
+                    );
+                    """
+                )
+                CURSOR.execute(
+                    """
+                    CREATE TABLE IF NOT EXISTS useful_info_event (
+                        id BIGSERIAL PRIMARY KEY,
+                        summary_id BIGINT NOT NULL REFERENCES useful_info_summary(id) ON DELETE CASCADE,
+                        dt TEXT,
+                        length TEXT,
+                        title TEXT,
+                        description TEXT,
+                        created_at TIMESTAMPTZ DEFAULT now()
+                    );
+                    """
+                )
+                CON.commit()
+            finally:
+                CURSOR.close()
+                CON.close()
+        except Exception as e:
+            print("[warn] could not init PostgreSQL schema:", e)
+
+    def _sha256(s):
+        import hashlib
+        return hashlib.sha256(s.encode('utf-8', 'ignore')).hexdigest()
+
+    def _pg_insert_summary_and_events(idx, subject, count, attachments, parsed, raw):
+        try:
+            from localcache2 import db as _db
+            import json as _json
+            CON, CURSOR = _db()
+            try:
+                source = None
+                date_label = None
+                tags = None
+                short_text = ''
+                summary_text = ''
+                events = []
+                if parsed:
+                    source = parsed.get('source')
+                    date_label = parsed.get('date')
+                    tags = parsed.get('tags')
+                    short_text = parsed.get('short') or ''
+                    summary_text = parsed.get('summary') or ''
+                    events = parsed.get('events') or []
+
+                s_hash = _sha256((subject or '') + "\n" + short_text + "\n" + summary_text)
+                CURSOR.execute(
+                    """
+                    INSERT INTO useful_info_summary
+                        (summary_hash, grp_index, subject, thread_count, source, date_label,
+                         tags_json, short_text, summary_text, attachments_json, raw_json)
+                    VALUES
+                        (%s, %s, %s, %s, %s, %s,
+                         CAST(%s AS JSONB), %s, %s, CAST(%s AS JSONB), %s)
+                    ON CONFLICT (summary_hash)
+                    DO UPDATE SET grp_index = EXCLUDED.grp_index
+                    RETURNING id
+                    """,
+                    (
+                        s_hash,
+                        idx,
+                        subject,
+                        count,
+                        source,
+                        date_label,
+                        _json.dumps(tags) if tags is not None else None,
+                        short_text,
+                        summary_text,
+                        _json.dumps(attachments) if attachments else None,
+                        raw,
+                    ),
+                )
+                row = CURSOR.fetchone()
+                summary_id = row[0] if row else None
+
+                if summary_id and isinstance(events, list):
+                    for e in events:
+                        try:
+                            CURSOR.execute(
+                                """
+                                INSERT INTO useful_info_event
+                                    (summary_id, dt, length, title, description)
+                                VALUES (%s, %s, %s, %s, %s)
+                                """,
+                                (
+                                    summary_id,
+                                    (e or {}).get('dt'),
+                                    (e or {}).get('length'),
+                                    (e or {}).get('title'),
+                                    (e or {}).get('description'),
+                                ),
+                            )
+                        except Exception:
+                            pass
+                CON.commit()
+            finally:
+                CURSOR.close()
+                CON.close()
+        except Exception as e:
+            print("[warn] PostgreSQL insert failed:", e)
+
+    # Ensure DB schema exists
+    _pg_init_schema()
+
     def demo_f(idx, g):
         print(f"[{idx}] {g['subject']} (count: {g['count']})")
         content = g['content']
@@ -457,6 +580,16 @@ def process_useful_info():
         with open(OUT_JSONL, "a", encoding="utf-8") as outf:
             outf.write(json.dumps(record, ensure_ascii=False) + "\n")
 
+        # Also persist to PostgreSQL using localcache2
+        if 'summary' in record:
+            _pg_insert_summary_and_events(
+                idx, record.get('subject'), record.get('count'), attach_paths, record['summary'], None
+            )
+        else:
+            _pg_insert_summary_and_events(
+                idx, record.get('subject'), record.get('count'), attach_paths, None, record.get('summary_raw')
+            )
+
     for_each_group(
         log_path="cache/email_usefulinfo_sorted.txt",
         f=demo_f,