#!/usr/bin/env python3
"""Interactive maintenance menu for the ``conf_sessions`` database table.

The menu can dump one day's sessions to JSON, push edits from such a JSON
file back to the database, and export sessions above an id threshold as SQL
``INSERT`` statements.
"""
import datetime as dt
import html
import json
import os
import re
import sys
import unicodedata

try:
    import pymysql
except ImportError:  # pragma: no cover - depends on local env
    pymysql = None

try:
    from bs4 import BeautifulSoup
except ImportError:  # pragma: no cover - depends on local env
    BeautifulSoup = None

CONFIG_FILE = "peter_db.php"

# Matches a complete <script>...</script> or <style>...</style> block,
# case-insensitively and across newlines.
_SCRIPT_STYLE_RE = re.compile(r"(?is)<(script|style)\b[^>]*>.*?</\1\s*>")

# Same character escapes that pymysql.converters.escape_string applies; used
# as a fallback so string literals are still quoted safely without pymysql.
_SQL_ESCAPE_TABLE = str.maketrans(
    {
        "\0": "\\0",
        "\n": "\\n",
        "\r": "\\r",
        "\032": "\\Z",
        '"': '\\"',
        "'": "\\'",
        "\\": "\\\\",
    }
)


def load_db_config():
    """Read DB connection settings out of the PHP file named by CONFIG_FILE.

    Looks for assignments of the form ``$this->DBServer = '...'`` (likewise
    ``DBUser``, ``DBPass``, ``DBName``) with single-quoted values.

    Returns:
        dict with keys ``host``, ``user``, ``password``, ``database``.

    Raises:
        FileNotFoundError: CONFIG_FILE does not exist.
        ValueError: one or more settings are absent or empty.
    """
    if not os.path.exists(CONFIG_FILE):
        raise FileNotFoundError(f"Missing {CONFIG_FILE}")
    with open(CONFIG_FILE, "r", encoding="utf-8") as fh:
        text = fh.read()

    def pick(key):
        # Single backslashes: in a raw string "\$" is a literal dollar sign
        # and "\s" is whitespace. Doubling them would demand a literal
        # backslash in the PHP source and never match.
        match = re.search(rf"\$this->{key}\s*=\s*'([^']*)'", text)
        return match.group(1) if match else None

    cfg = {
        "host": pick("DBServer"),
        "user": pick("DBUser"),
        "password": pick("DBPass"),
        "database": pick("DBName"),
    }
    missing = [k for k, v in cfg.items() if not v]
    if missing:
        raise ValueError(f"Missing DB config values: {', '.join(missing)}")
    return cfg


def get_conn():
    """Open a pymysql connection that returns rows as dicts.

    Credentials come from the local ``secretdir`` module; the database name
    is fixed to ``"db"``.

    Raises:
        RuntimeError: pymysql is not installed.
    """
    from secretdir import dbhost, dbuser, dbpassword

    if pymysql is None:
        raise RuntimeError("pymysql is not installed. Install it with: pip install pymysql")
    # cfg = load_db_config()
    return pymysql.connect(
        host=dbhost,
        user=dbuser,
        password=dbpassword,
        database="db",
        charset="utf8mb4",
        cursorclass=pymysql.cursors.DictCursor,
    )


def parse_day(input_str):
    """Parse a user-entered day into a ``datetime.date``.

    Accepts ``M/D/YY``, ``M/D/YYYY`` and ``YYYY-MM-DD``; surrounding
    whitespace is ignored.

    Raises:
        ValueError: the text matches none of the accepted formats.
    """
    input_str = input_str.strip()
    for fmt in ("%m/%d/%y", "%m/%d/%Y", "%Y-%m-%d"):
        try:
            d = dt.datetime.strptime(input_str, fmt).date()
            # A year below 100 is taken to mean 20xx.
            if d.year < 100:
                d = d.replace(year=2000 + d.year)
            return d
        except ValueError:
            continue
    raise ValueError("Expected a date like 1/22/26 or 2026-01-22")


def json_filename(day):
    """Return the JSON dump file name for *day* (``sessions_YYYY-MM-DD.json``)."""
    return f"sessions_{day.strftime('%Y-%m-%d')}.json"


def normalize_value(val):
    """Render dates/datetimes as strings so they compare equal to JSON text.

    ``datetime`` becomes ``YYYY-MM-DD HH:MM:SS``, ``date`` becomes
    ``YYYY-MM-DD``; every other value is returned unchanged.
    """
    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(val, dt.datetime):
        return val.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(val, dt.date):
        return val.strftime("%Y-%m-%d")
    return val


def sql_literal(val):
    """Format *val* as a SQL literal for a generated INSERT statement.

    ``None`` becomes ``NULL``; dates and datetimes become quoted strings;
    ints and floats (but not bools) are written bare; anything else is
    converted with ``str`` and emitted as an escaped, single-quoted string.
    """
    if val is None:
        return "NULL"
    if isinstance(val, dt.datetime):
        return f"'{val.strftime('%Y-%m-%d %H:%M:%S')}'"
    if isinstance(val, dt.date):
        return f"'{val.strftime('%Y-%m-%d')}'"
    if isinstance(val, (int, float)) and not isinstance(val, bool):
        return str(val)
    text = str(val)
    if pymysql is not None:
        text = pymysql.converters.escape_string(text)
    else:
        # Without pymysql, still escape quotes/backslashes so the emitted
        # statement is not broken by the value.
        text = text.translate(_SQL_ESCAPE_TABLE)
    return "'" + text + "'"


def clean_desc_for_export(text):
    """Reduce an HTML description to a small whitelist of formatting tags.

    Script/style blocks are removed with their contents, tags outside the
    whitelist are unwrapped (their children are kept), whitelisted tags lose
    all attributes, HTML entities are unescaped, non-breaking spaces become
    plain spaces and zero-width spaces are dropped.

    Returns:
        The cleaned, stripped string, or ``None`` when *text* is ``None``.

    Raises:
        RuntimeError: beautifulsoup4 is not installed.
    """
    if text is None:
        return None
    if BeautifulSoup is None:
        raise RuntimeError("beautifulsoup4 is not installed. Install it with: pip install beautifulsoup4")
    allowed = ["b", "i", "em", "strong", "ul", "ol", "li", "p", "br"]
    # NOTE(review): NFKD leaves accented letters decomposed; confirm this is
    # the form wanted in the exported text.
    s = unicodedata.normalize("NFKD", str(text))
    # Remove script/style blocks entirely (opening tag, body and closing tag).
    s = _SCRIPT_STYLE_RE.sub("", s)
    soup = BeautifulSoup(s, "html.parser")
    # Catch script/style elements the regex could not match (e.g. unclosed).
    for tag in soup.find_all(["script", "style"]):
        tag.decompose()
    # Drop disallowed tags, keep allowed tags without attributes.
    for tag in soup.find_all(True):
        if tag.name not in allowed:
            tag.unwrap()
        else:
            tag.attrs = {}
    s = "".join(str(x) for x in soup.contents)
    # NOTE(review): unescaping after tag filtering turns escaped text such as
    # "&lt;b&gt;" into live markup in the output; confirm that is intended.
    s = html.unescape(s)
    s = s.replace("\u00a0", " ")
    s = s.replace("\u200b", "")
    # NOTE: whitespace/blank-line collapsing is not applied here; an earlier
    # series of re.sub calls for it existed only as a quoted-out string.
    return s.strip()


def db_dump_day():
    """Prompt for a day and write that day's sessions to a JSON file.

    Rows are selected by ``DATE(starttime)`` and written to the file named
    by ``json_filename``; values JSON cannot encode are stringified.
    """
    day = parse_day(input("Enter day (e.g., 1/22/26): "))
    fname = json_filename(day)
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT * FROM conf_sessions WHERE DATE(starttime) = %s ORDER BY starttime, id",
                (day.strftime("%Y-%m-%d"),),
            )
            rows = cur.fetchall()
    finally:
        conn.close()
    with open(fname, "w", encoding="utf-8") as fh:
        json.dump(rows, fh, indent=2, sort_keys=True, default=str)
    print(f"Wrote {len(rows)} session(s) to {fname}")


def db_reload_day_json():
    """Prompt for a day and apply that day's JSON file back to the database.

    For each JSON row with an ``id`` that exists in ``conf_sessions``, only
    the columns whose normalized value differs from the stored one are
    updated. All updates are committed together at the end.

    Raises:
        FileNotFoundError: the day's JSON file does not exist.
        ValueError: the JSON document is not a list.
    """
    day = parse_day(input("Enter day (e.g., 1/22/26): "))
    fname = json_filename(day)
    if not os.path.exists(fname):
        raise FileNotFoundError(f"Missing {fname}")
    with open(fname, "r", encoding="utf-8") as fh:
        rows = json.load(fh)
    if not isinstance(rows, list):
        raise ValueError("JSON file must be a list of session rows")
    conn = get_conn()
    updated = 0
    try:
        with conn.cursor() as cur:
            for row in rows:
                # Entries that are not objects, or lack an id, cannot be
                # matched to a database row.
                if not isinstance(row, dict) or "id" not in row:
                    continue
                ses_id = row["id"]
                cur.execute("SELECT * FROM conf_sessions WHERE id = %s", (ses_id,))
                current = cur.fetchone()
                if not current:
                    print(f"Skipping missing id {ses_id}")
                    continue
                changes = {}
                for key, new_val in row.items():
                    # Only keys that are real columns of the fetched row are
                    # considered, which also keeps the SET clause safe.
                    if key == "id" or key not in current:
                        continue
                    if normalize_value(new_val) != normalize_value(current[key]):
                        changes[key] = new_val
                if changes:
                    set_sql = ", ".join(f"`{k}`=%s" for k in changes)
                    params = list(changes.values()) + [ses_id]
                    cur.execute(f"UPDATE conf_sessions SET {set_sql} WHERE id=%s", params)
                    updated += 1
        conn.commit()
    finally:
        conn.close()
    print(f"Updated {updated} session(s) from {fname}")


def db_export_sessions_sql(min_id=1462):
    """Export sessions with ``id > min_id`` as INSERT statements.

    Writes one statement per row to ``data/sessions.sql`` (creating the
    directory if needed). The ``id`` column is left out of the statements
    and the ``desc`` column is passed through ``clean_desc_for_export``.

    Args:
        min_id: exclusive lower bound on the session id (default 1462).

    Raises:
        RuntimeError: pymysql is not installed.
    """
    if pymysql is None:
        raise RuntimeError("pymysql is not installed. Install it with: pip install pymysql")
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM conf_sessions WHERE id > %s ORDER BY id", (min_id,))
            rows = cur.fetchall()
            columns = [col[0] for col in cur.description]
    finally:
        conn.close()
    if not rows:
        print(f"No sessions found with id > {min_id}")
        return
    columns = [c for c in columns if c != "id"]
    # The column list is the same for every row, so build it once.
    col_sql = ", ".join(f"`{c}`" for c in columns)
    os.makedirs("data", exist_ok=True)
    out_path = os.path.join("data", "sessions.sql")
    with open(out_path, "w", encoding="utf-8") as fh:
        for row in rows:
            values = []
            for col in columns:
                val = row.get(col)
                if col == "desc":
                    val = clean_desc_for_export(val)
                values.append(sql_literal(val))
            val_sql = ", ".join(values)
            fh.write(f"INSERT INTO conf_sessions ({col_sql}) VALUES ({val_sql});\n")
    print(f"Wrote {len(rows)} INSERT statements to {out_path}")


def main():
    """Run the interactive menu until the user quits or input ends.

    Errors raised by an action are printed and the menu is shown again.
    The reload entry re-executes the script in place via ``os.execv``.
    """
    actions = [
        ("Dump day's sessions from db", db_dump_day),
        ("Reload day's session from json", db_reload_day_json),
        ("Export sessions id>1462 to SQL inserts", db_export_sessions_sql),
        ("Reload ops.py", "__RELOAD__"),
        ("Quit", None),
    ]
    while True:
        print("\nOps Menu")
        for i, (label, _) in enumerate(actions, 1):
            print(f"{i}. {label}")
        try:
            choice = input("Choose an option: ").strip()
        except EOFError:
            # End of input (e.g. Ctrl-D or a closed pipe) is treated as Quit.
            print()
            return
        if not choice.isdigit():
            print("Please enter a number.")
            continue
        idx = int(choice) - 1
        if idx < 0 or idx >= len(actions):
            print("Invalid choice.")
            continue
        label, action = actions[idx]
        if action is None:
            return
        if action == "__RELOAD__":
            os.execv(sys.executable, [sys.executable] + sys.argv)
        try:
            action()
        except Exception as exc:
            print(f"Error: {exc}")


if __name__ == "__main__":
    main()