252 lines
8.1 KiB
Python
252 lines
8.1 KiB
Python
#!/usr/bin/env python3
|
|
import datetime as dt
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import html
|
|
import unicodedata
|
|
|
|
try:
|
|
import pymysql
|
|
except ImportError: # pragma: no cover - depends on local env
|
|
pymysql = None
|
|
|
|
try:
|
|
from bs4 import BeautifulSoup
|
|
except ImportError: # pragma: no cover - depends on local env
|
|
BeautifulSoup = None
|
|
|
|
|
|
CONFIG_FILE = "peter_db.php"
|
|
|
|
|
|
def load_db_config():
|
|
if not os.path.exists(CONFIG_FILE):
|
|
raise FileNotFoundError(f"Missing {CONFIG_FILE}")
|
|
with open(CONFIG_FILE, "r", encoding="utf-8") as fh:
|
|
text = fh.read()
|
|
def pick(key):
|
|
match = re.search(rf"\\$this->{key}\\s*=\\s*'([^']*)'", text)
|
|
return match.group(1) if match else None
|
|
cfg = {
|
|
"host": pick("DBServer"),
|
|
"user": pick("DBUser"),
|
|
"password": pick("DBPass"),
|
|
"database": pick("DBName"),
|
|
}
|
|
missing = [k for k, v in cfg.items() if not v]
|
|
if missing:
|
|
raise ValueError(f"Missing DB config values: {', '.join(missing)}")
|
|
return cfg
|
|
|
|
|
|
def get_conn():
|
|
from secretdir import dbhost, dbuser,dbpassword
|
|
if pymysql is None:
|
|
raise RuntimeError("pymysql is not installed. Install it with: pip install pymysql")
|
|
#cfg = load_db_config()
|
|
return pymysql.connect(
|
|
host=dbhost,
|
|
user=dbuser,
|
|
password=dbpassword,
|
|
database="db",
|
|
charset="utf8mb4",
|
|
cursorclass=pymysql.cursors.DictCursor,
|
|
)
|
|
|
|
|
|
def parse_day(input_str):
|
|
input_str = input_str.strip()
|
|
for fmt in ("%m/%d/%y", "%m/%d/%Y", "%Y-%m-%d"):
|
|
try:
|
|
d = dt.datetime.strptime(input_str, fmt).date()
|
|
if d.year < 100:
|
|
d = d.replace(year=2000 + d.year)
|
|
return d
|
|
except ValueError:
|
|
continue
|
|
raise ValueError("Expected a date like 1/22/26 or 2026-01-22")
|
|
|
|
|
|
def json_filename(day):
|
|
return f"sessions_{day.strftime('%Y-%m-%d')}.json"
|
|
|
|
|
|
def normalize_value(val):
|
|
if isinstance(val, (dt.datetime, dt.date)):
|
|
if isinstance(val, dt.date) and not isinstance(val, dt.datetime):
|
|
return val.strftime("%Y-%m-%d")
|
|
return val.strftime("%Y-%m-%d %H:%M:%S")
|
|
return val
|
|
|
|
|
|
def sql_literal(val):
|
|
if val is None:
|
|
return "NULL"
|
|
if isinstance(val, (dt.datetime, dt.date)):
|
|
if isinstance(val, dt.date) and not isinstance(val, dt.datetime):
|
|
return f"'{val.strftime('%Y-%m-%d')}'"
|
|
return f"'{val.strftime('%Y-%m-%d %H:%M:%S')}'"
|
|
if isinstance(val, (int, float)) and not isinstance(val, bool):
|
|
return str(val)
|
|
text = str(val)
|
|
if pymysql is not None:
|
|
text = pymysql.converters.escape_string(text)
|
|
return "'" + text + "'"
|
|
|
|
|
|
def clean_desc_for_export(text):
|
|
if text is None:
|
|
return None
|
|
if BeautifulSoup is None:
|
|
raise RuntimeError("beautifulsoup4 is not installed. Install it with: pip install beautifulsoup4")
|
|
allowed = ["b", "i", "em", "strong", "ul", "ol", "li", "p", "br"]
|
|
s = unicodedata.normalize("NFKD", str(text))
|
|
# Remove script/style blocks entirely.
|
|
s = re.sub(r"(?is)<(script|style)[^>]*>.*?</\\1>", "", s)
|
|
soup = BeautifulSoup(s, "html.parser")
|
|
# Drop disallowed tags, keep allowed tags without attributes.
|
|
for tag in soup.find_all(True):
|
|
if tag.name not in allowed:
|
|
tag.unwrap()
|
|
else:
|
|
tag.attrs = {}
|
|
s = "".join(str(x) for x in soup.contents)
|
|
s = html.unescape(s)
|
|
s = s.replace("\u00a0", " ")
|
|
s = s.replace("\u200b", "")
|
|
'''s = re.sub(r"[ \\t\\f\\v]+", " ", s)
|
|
s = re.sub(r"\\s*<br>\\s*", "<br>", s)
|
|
s = re.sub(r"(?:<br>\\s*){3,}", "<br><br>", s)
|
|
s = re.sub(r"\\s*</p>\\s*", "</p>", s)
|
|
s = re.sub(r"\\s*<p>\\s*", "<p>", s)'''
|
|
return s.strip()
|
|
|
|
|
|
def db_dump_day():
|
|
day = parse_day(input("Enter day (e.g., 1/22/26): "))
|
|
fname = json_filename(day)
|
|
conn = get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"SELECT * FROM conf_sessions WHERE DATE(starttime) = %s ORDER BY starttime, id",
|
|
(day.strftime("%Y-%m-%d"),),
|
|
)
|
|
rows = cur.fetchall()
|
|
finally:
|
|
conn.close()
|
|
with open(fname, "w", encoding="utf-8") as fh:
|
|
json.dump(rows, fh, indent=2, sort_keys=True, default=str)
|
|
print(f"Wrote {len(rows)} session(s) to {fname}")
|
|
|
|
|
|
def db_reload_day_json():
|
|
day = parse_day(input("Enter day (e.g., 1/22/26): "))
|
|
fname = json_filename(day)
|
|
if not os.path.exists(fname):
|
|
raise FileNotFoundError(f"Missing {fname}")
|
|
with open(fname, "r", encoding="utf-8") as fh:
|
|
rows = json.load(fh)
|
|
if not isinstance(rows, list):
|
|
raise ValueError("JSON file must be a list of session rows")
|
|
conn = get_conn()
|
|
updated = 0
|
|
try:
|
|
with conn.cursor() as cur:
|
|
for row in rows:
|
|
if "id" not in row:
|
|
continue
|
|
ses_id = row["id"]
|
|
cur.execute("SELECT * FROM conf_sessions WHERE id = %s", (ses_id,))
|
|
current = cur.fetchone()
|
|
if not current:
|
|
print(f"Skipping missing id {ses_id}")
|
|
continue
|
|
changes = {}
|
|
for key, new_val in row.items():
|
|
if key == "id" or key not in current:
|
|
continue
|
|
cur_val = current[key]
|
|
norm_new = normalize_value(new_val)
|
|
norm_cur = normalize_value(cur_val)
|
|
if norm_new != norm_cur:
|
|
changes[key] = new_val
|
|
if changes:
|
|
set_sql = ", ".join([f"`{k}`=%s" for k in changes.keys()])
|
|
params = list(changes.values()) + [ses_id]
|
|
cur.execute(f"UPDATE conf_sessions SET {set_sql} WHERE id=%s", params)
|
|
updated += 1
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
print(f"Updated {updated} session(s) from {fname}")
|
|
|
|
|
|
def db_export_sessions_sql():
|
|
if pymysql is None:
|
|
raise RuntimeError("pymysql is not installed. Install it with: pip install pymysql")
|
|
conn = get_conn()
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT * FROM conf_sessions WHERE id > 1462 ORDER BY id")
|
|
rows = cur.fetchall()
|
|
columns = [col[0] for col in cur.description]
|
|
finally:
|
|
conn.close()
|
|
if not rows:
|
|
print("No sessions found with id > 1462")
|
|
return
|
|
if "id" in columns:
|
|
columns = [c for c in columns if c != "id"]
|
|
os.makedirs("data", exist_ok=True)
|
|
out_path = os.path.join("data", "sessions.sql")
|
|
with open(out_path, "w", encoding="utf-8") as fh:
|
|
for row in rows:
|
|
values = []
|
|
for col in columns:
|
|
val = row.get(col)
|
|
if col == "desc":
|
|
val = clean_desc_for_export(val)
|
|
values.append(sql_literal(val))
|
|
col_sql = ", ".join([f"`{c}`" for c in columns])
|
|
val_sql = ", ".join(values)
|
|
fh.write(f"INSERT INTO conf_sessions ({col_sql}) VALUES ({val_sql});\n")
|
|
print(f"Wrote {len(rows)} INSERT statements to {out_path}")
|
|
|
|
|
|
def main():
|
|
actions = [
|
|
("Dump day's sessions from db", db_dump_day),
|
|
("Reload day's session from json", db_reload_day_json),
|
|
("Export sessions id>1462 to SQL inserts", db_export_sessions_sql),
|
|
("Reload ops.py", "__RELOAD__"),
|
|
("Quit", None),
|
|
]
|
|
while True:
|
|
print("\nOps Menu")
|
|
for i, (label, _) in enumerate(actions, 1):
|
|
print(f"{i}. {label}")
|
|
choice = input("Choose an option: ").strip()
|
|
if not choice.isdigit():
|
|
print("Please enter a number.")
|
|
continue
|
|
idx = int(choice) - 1
|
|
if idx < 0 or idx >= len(actions):
|
|
print("Invalid choice.")
|
|
continue
|
|
label, action = actions[idx]
|
|
if action is None:
|
|
return
|
|
if action == "__RELOAD__":
|
|
os.execv(sys.executable, [sys.executable] + sys.argv)
|
|
try:
|
|
action()
|
|
except Exception as exc:
|
|
print(f"Error: {exc}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|