# Local data, saving and manipulating
import util
import os, re, gzip, codecs, funcy, pytz, json, random, functools, requests, sys, csv, time, psycopg2
import hashlib
import pandas as pd
import numpy as np
from collections import defaultdict
from datetime import datetime as dt
from datetime import timedelta
from dateutil.parser import parse
from os.path import exists, getmtime
from tabulate import tabulate
from semesters import short_to_sis
from canvas_secrets import postgres_database, postgres_password, postgres_port, postgres_user, postgres_host

#########
#########  LOCAL DB
#########

# Module-level handles to the most recently opened connection/cursor.
# NOTE(review): db() opens a NEW connection on every call and most callers
# never close it -- consider reusing CON when it is already open.
CON = ''
CURSOR = ''


def db():
    """Open a new Postgres connection and return (connection, cursor).

    The pair is also stored in the module globals CON / CURSOR.
    """
    global CON, CURSOR
    CON = psycopg2.connect(database=postgres_database, host=postgres_host,
                           user=postgres_user, password=postgres_password,
                           port=postgres_port)
    CURSOR = CON.cursor()
    return CON, CURSOR

# (Removed a large commented-out legacy block: employees_refresh_flex() and
# all_gav_employees(), the one-off flex-enrollment upload helpers. Recover
# from version control if ever needed again.)


def _fetchone_dict(cursor):
    """Fetch one row and return it as a {column_name: value} dict, or None."""
    row = cursor.fetchone()
    if row:
        return dict(zip([desc[0] for desc in cursor.description], row))


def user_from_goo(goo):
    """Look up a user (pseudonym joined to users) by Banner G00 number.

    Accepts the id with or without the leading "G00" prefix.
    Returns the joined row as a dict, or None when not found.
    """
    if not goo.lower().startswith("g00"):
        goo = "G00" + goo
    # Parameterized to avoid SQL injection through the goo string.
    q = "SELECT * FROM canvas.pseudonyms p JOIN canvas.users u ON p.user_id=u.id WHERE p.sis_user_id=%s;"
    (connection, cursor) = db()
    cursor.execute(q, (goo,))
    return _fetchone_dict(cursor)


def course_sched_entry_from_id(id):
    """Return the canvas.schedule row for a canvas course id as a dict, or None."""
    q = "SELECT * FROM canvas.schedule s WHERE s.canvascourse=%s"
    (connection, cursor) = db()
    cursor.execute(q, (id,))
    return _fetchone_dict(cursor)


def course_from_id(id):
    """Return the canvas.courses row for a course id as a dict, or None."""
    q = "SELECT * FROM canvas.courses c WHERE c.id=%s"
    (connection, cursor) = db()
    cursor.execute(q, (id,))
    return _fetchone_dict(cursor)


def teachers_by_term(TERM="202430"):
    """Print (tabulated) and return every teacher enrollment in the given SIS term."""
    q = """SELECT c.id, c.name, c.course_code, c.sis_source_id, c.created_at, c.start_at,
            c.workflow_state, e.last_attended_at, u.id, u.sortable_name, u.created_at
        FROM canvas.courses AS c
        JOIN canvas.enrollments AS e ON e.course_id=c.id
        JOIN canvas.users AS u ON u.id=e.user_id
        WHERE c.sis_source_id LIKE %s AND e.type='TeacherEnrollment'
        ORDER BY u.sortable_name, c.course_code;"""
    (connection, cursor) = db()
    cursor.execute(q, (TERM + '%',))
    all_teachers = cursor.fetchall()
    # Columns: teacher sortable name, course name, sis id, workflow state.
    table = [[t[9], t[1], t[3], t[6]] for t in all_teachers]
    print(tabulate(table))
    return all_teachers


def courses_in_term(TERM="202430"):
    """Print id/name/code/sis-id/state for every course in the given SIS term."""
    q = """SELECT c.id, c.name, c.course_code, c.sis_source_id, c.workflow_state
        FROM canvas.courses AS c
        WHERE c.sis_source_id LIKE %s
        ORDER BY c.course_code;"""
    (connection, cursor) = db()
    cursor.execute(q, (TERM + '%',))
    rows = cursor.fetchall()
    print(tabulate(rows))


def pages_in_term(TERM="202430"):
    """Return all wiki pages (including bodies) for courses in the given SIS term."""
    q = """SELECT c.id, c.course_code, c.sis_source_id, wp.id as wp_id, wp.title, wp.url, c.name, wp.body
        FROM canvas.courses c
        JOIN canvas.wiki_pages wp ON wp.context_id=c.id
        WHERE c.sis_source_id LIKE %s
        ORDER BY c.sis_source_id, wp.title;"""
    (connection, cursor) = db()
    cursor.execute(q, (TERM + '%',))
    return cursor.fetchall()


def user_ids_in_shell(shellid):
    """Return the set of user ids (as strings) actively enrolled as students in a shell."""
    q = """select e.user_id from canvas.enrollments e
        where e.course_id = %s and e.type='StudentEnrollment' and e.workflow_state='active';"""
    (connection, cursor) = db()
    cursor.execute(q, (shellid,))
    students = {str(row[0]) for row in cursor.fetchall()}
    print(f"{len(students)} students currently in shell id {shellid}.")
    return students


def users_new_this_semester(sem=''):
    """Return the set of user ids whose active student enrollments are ONLY in `sem`.

    Prompts for the semester (e.g. 202150) when not supplied.
    """
    if not len(sem):
        sem = input("which semester? (ex: 202150) ")
    users_to_enroll = set()
    pattern = sem + '-%'
    # Students enrolled in `sem` with no active student enrollment in any other term.
    q = """SELECT u.id, u.name, u.sortable_name, string_agg(c.course_code, ','), COUNT(e.id) AS num
        FROM canvas.enrollments AS e
        JOIN canvas.users AS u ON e.user_id=u.id
        JOIN canvas.courses AS c ON e.course_id=c.id
        WHERE c.sis_source_id LIKE %s AND e.workflow_state='active' AND e.type='StudentEnrollment'
        AND u.id NOT IN (
            SELECT u.id
            FROM canvas.enrollments AS e
            JOIN canvas.users AS u ON e.user_id=u.id
            JOIN canvas.courses AS c ON e.course_id=c.id
            WHERE c.sis_source_id NOT LIKE %s AND e.workflow_state='active' AND e.type='StudentEnrollment'
            GROUP BY u.id )
        GROUP BY u.id
        ORDER BY num DESC, u.sortable_name"""
    (connection, cursor) = db()
    cursor.execute(q, (pattern, pattern))
    for u in cursor:
        users_to_enroll.add(str(u[0]))
        print(u)
    print("%i new users this semester." % len(users_to_enroll))
    print(users_to_enroll)
    return users_to_enroll

# when registrations opens for SUMMER+FALL, get new students from both semesters combined.
def users_new_this_2x_semester(sem1='', sem2=''): if not len(sem1) or (not len(sem2)): print("Need 2 semesters") return 0 where1 = f"(c.sis_source_id LIKE '{sem1}-%%' or c.sis_source_id LIKE '{sem2}-%%')" where2 = f"(c.sis_source_id NOT LIKE '{sem1}-%%' and c.sis_source_id NOT LIKE '{sem2}-%%')" q = """SELECT u.id, u.name, u.sortable_name, string_agg(c.course_code, ','), COUNT(e.id) AS num FROM canvas.enrollments AS e JOIN canvas.users AS u ON e.user_id=u.id JOIN canvas.courses AS c ON e.course_id=c.id WHERE %s AND e.workflow_state='active' AND e.type='StudentEnrollment' AND u.id NOT IN ( SELECT u.id FROM canvas.enrollments AS e JOIN canvas.users AS u ON e.user_id=u.id JOIN canvas.courses AS c ON e.course_id=c.id WHERE %s AND e.workflow_state='active' AND e.type='StudentEnrollment' GROUP BY u.id ) GROUP BY u.id ORDER BY num DESC, u.sortable_name""" % (where1,where2) (connection,cursor) = db() cursor.execute(q) users_to_enroll = {str(row[0]) for row in cursor.fetchall()} print("%i new users this semester." 
% len(users_to_enroll)) return users_to_enroll # Fetch the joined courses (one semester) and schedules tables def all_sem_courses_teachers(SEM="202470"): q = f"""SELECT c.id, c.name, c.course_code, u.name, u.sortable_name, u.id AS user_cid, p.sis_user_id, s.type, s.crn FROM canvas.courses AS c JOIN canvas.enrollments AS e ON e.course_id=c.id JOIN canvas.users AS u ON u.id=e.user_id JOIN canvas.pseudonyms AS p ON p.user_id=u.id JOIN canvas.schedule as s on c.id=s.canvascourse WHERE c.sis_source_id LIKE '{SEM}-%' AND NOT c.workflow_state='deleted' AND e.type='TeacherEnrollment' ORDER BY u.sortable_name;""" (connection,cursor) = db() cursor.execute(q) courses = cursor.fetchall() print(courses) return courses def all_2x_sem_courses_teachers(sem1, sem2): q = f"""SELECT c.id, c.name, c.course_code, s.type, s.crn, u.name, u.sortable_name, u.id AS user_cid, p.sis_user_id FROM canvas.courses AS c JOIN canvas.enrollments AS e ON e.course_id=c.id JOIN canvas.users AS u ON u.id=e.user_id JOIN canvas.pseudonyms AS p ON p.user_id=u.id JOIN canvas.schedule as s on c.id=s.canvascourse WHERE (c.sis_source_id LIKE '{sem1}-%' or c.sis_source_id LIKE '{sem2}-%') AND NOT c.workflow_state='deleted' AND e.type='TeacherEnrollment' ORDER BY u.sortable_name;""" (connection,cursor) = db() cursor.execute(q) courses = cursor.fetchall() print(courses) return courses def create_schedule_table_if_not_exists(): conn,cur = db() # Check if table exists cur.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'schedule' and table_schema = 'canvas')") table_exists = cur.fetchone()[0] if not table_exists: # Create table if it doesn't exist cur.execute("""CREATE TABLE canvas.schedule ( id SERIAL PRIMARY KEY, canvascourse INTEGER REFERENCES canvas.courses, crn VARCHAR(20), code VARCHAR(30), units VARCHAR(20), teacher TEXT, start VARCHAR(30), "end" VARCHAR(30), type VARCHAR(20), loc VARCHAR(80), site VARCHAR(50), partofday VARCHAR(40), cap INTEGER, act INTEGER, sem VARCHAR(10) 
) """) conn.commit() cur.close() # Populate schedule table and correlate to courses table def courses_to_sched(): # TODO: fix units when they are variable... change to float in between range. round to 0.5 unit. EXECUTE = 1 last_time = 0 seasons = {'10':'wi','30':'sp','50':'su','70':'fa'} seasons2 = {'wi':'10', 'sp':'30', 'su':'50', 'fa':'70'} create_schedule_table_if_not_exists() conn,cur = db() # get all ilearn courses query = """SELECT c.id, c.workflow_state, c.sis_source_id, c.course_code, c.enrollment_term_id, t.name FROM canvas.courses c JOIN canvas.enrollment_terms t ON c.enrollment_term_id = t.id ORDER BY c.sis_source_id, c.course_code;""" cur.execute(query) sis_to_sched = {} for row in cur.fetchall(): sis_source_id = row[2] # c.sis_source_id sis_to_sched.setdefault(sis_source_id, []).append(row) vals_cache = [] i = 0 for year in ['16','17','18','19','20','21','22','23','24','25']: for sem in ['sp','su','fa']: term = f"{sem}{year}" sis_code = f"20{year}{seasons2[sem]}" print(term) sched = 0 try: sched = requests.get(f"http://gavilan.cc/schedule/{term}_sched_expanded.json").json() except Exception as e: print(e) query = "INSERT INTO canvas.schedule (canvascourse, crn, code, units, teacher, start,\"end\", type, loc, site, partofday, cap, act, sem) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);" if sched: for c in sched: try: pod = '' if 'partofday' in c: pod = c['partofday'] #print(c['cred']) cred_match = re.search(r'(\d+\.\d+)\-(\d+\.\d+)',c['cred']) if cred_match: #print(f"matched range: {cred_match.groups}") cred_start = float(cred_match.group(1)) cred_end = float(cred_match.group(2)) mid = float(int( (cred_end-cred_start)/2 + cred_start )) c['cred'] = str(mid) #print(f"middle cred is {c['cred']}") full_sis_code = sis_code+'-'+c['crn'] if full_sis_code in sis_to_sched: print(c['cred']) q = [sis_to_sched[full_sis_code][0][0], c['crn'], c['code'], c['cred'], c['teacher'], c['start'], c['end'], c['type'], c['loc'], c['site'], pod, 
int(c['cap']), int(c['act']), sis_code] vals_cache.append( q ) # [ str(x) for x in q ] ) #print(f"{i}: {q}") i += 1 if i % 100 == 0: if EXECUTE: cur.executemany(query, vals_cache) conn.commit() vals_cache = [] t = time.process_time() delta = t - last_time last_time = t print(f"Loop {i} - committed to db in %0.3fs. " % delta, flush=True) else: print(f"{full_sis_code} not in canvas courses.") except Exception as e: print(e) if EXECUTE: cur.executemany(query, vals_cache) conn.commit() cur.close() conn.close() # Populate schedule table and correlate to courses table def refresh_semester_schedule_db(term="fa25"): # TODO: fix units when they are variable... change to float in between range. round to 0.5 unit. EXECUTE = 1 last_time = 0 sis_code = short_to_sis(term) conn,cur = db() drop_query = f"DELETE FROM canvas.schedule WHERE sem='{sis_code}'" print(f"executing: {drop_query}") cur.execute(drop_query) # get all ilearn courses query = """SELECT c.id, c.workflow_state, c.sis_source_id, c.course_code, c.enrollment_term_id, t.name FROM canvas.courses c JOIN canvas.enrollment_terms t ON c.enrollment_term_id = t.id ORDER BY c.sis_source_id, c.course_code;""" cur.execute(query) conn.commit() sis_to_sched = {} for row in cur.fetchall(): sis_source_id = row[2] # c.sis_source_id sis_to_sched.setdefault(sis_source_id, []).append(row) vals_cache = [] i = 0 print(sis_code) sched = 0 try: sched = requests.get(f"http://gavilan.cc/schedule/{term}_sched_expanded.json").json() except Exception as e: print(e) query = "INSERT INTO canvas.schedule (canvascourse, crn, code, units, teacher, start,\"end\", type, loc, site, partofday, cap, act, sem) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);" if sched: for c in sched: try: pod = '' if 'partofday' in c: pod = c['partofday'] #print(c['cred']) cred_match = re.search(r'(\d+\.\d+)\-(\d+\.\d+)',c['cred']) if cred_match: #print(f"matched range: {cred_match.groups}") cred_start = float(cred_match.group(1)) cred_end = 
float(cred_match.group(2)) mid = float(int( (cred_end-cred_start)/2 + cred_start )) c['cred'] = str(mid) #print(f"middle cred is {c['cred']}") full_sis_code = sis_code+'-'+c['crn'] if full_sis_code in sis_to_sched: print(c['cred']) q = [sis_to_sched[full_sis_code][0][0], c['crn'], c['code'], c['cred'], c['teacher'], c['start'], c['end'], c['type'], c['loc'], c['site'], pod, int(c['cap']), int(c['act']), sis_code] vals_cache.append( q ) # [ str(x) for x in q ] ) #print(f"{i}: {q}") i += 1 if i % 100 == 0: if EXECUTE: cur.executemany(query, vals_cache) conn.commit() vals_cache = [] t = time.process_time() delta = t - last_time last_time = t print(f"Loop {i} - committed to db in %0.3fs. " % delta, flush=True) else: print(f"{full_sis_code} not in canvas courses.") except Exception as e: print(e) if EXECUTE: cur.executemany(query, vals_cache) conn.commit() cur.close() conn.close() def student_count(courseid): conn,cursor = db() q = f"""SELECT COUNT(u.id) AS student_count FROM canvas.courses AS c JOIN canvas.enrollments AS e ON e.course_id=c.id JOIN canvas.users AS u ON u.id=e.user_id WHERE c.id={courseid} AND e.type='StudentEnrollment';""" cursor.execute(q) return cursor.fetchall()[0][0] def teacher_list(courseid): conn,cursor = db() q = f"""SELECT u.id, u.name FROM canvas.courses AS c JOIN canvas.enrollments AS e ON e.course_id=c.id JOIN canvas.users AS u ON u.id=e.user_id WHERE c.id={courseid} AND e.type='TeacherEnrollment';""" cursor.execute(q) return cursor.fetchall() def everyone_teacher_role(): conn,cursor = db() q = '''select distinct ON (u.name) u.name, u.id, p.sis_user_id, u.created_at, c.course_code from canvas.enrollments e join canvas.users u on u.id=e.user_id join canvas.courses c on e.course_id=c.id join canvas.pseudonyms p on u.id=p.user_id where e.type='TeacherEnrollment' order by u.name;''' cursor.execute(q) return cursor.fetchall() def iLearn_name_from_goo(goo): goo = goo.upper() conn,cursor = db() q = f"select u.id, u.name, u.sortable_name, 
p.sis_user_id from canvas.pseudonyms p join canvas.users u on u.id=p.user_id where p.sis_user_id='{goo}';" cursor.execute(q) return cursor.fetchone() # -------------------- Useful Info (summaries, events, tags) -------------------- def init_usefulinfo_schema(): """Create tables for summaries, events, tags, and link tables if missing.""" CON, CUR = db() try: CUR.execute( """ CREATE TABLE IF NOT EXISTS useful_info_summary ( id BIGSERIAL PRIMARY KEY, summary_hash CHAR(64) UNIQUE, source TEXT, date_label TEXT, short_text TEXT, summary_text TEXT, created_at TIMESTAMPTZ DEFAULT now() ); """ ) CUR.execute( """ CREATE TABLE IF NOT EXISTS useful_info_event ( id BIGSERIAL PRIMARY KEY, dt TEXT, length TEXT, title TEXT, description TEXT, created_at TIMESTAMPTZ DEFAULT now() ); """ ) CUR.execute( """ CREATE TABLE IF NOT EXISTS useful_info_summary_event ( summary_id BIGINT NOT NULL REFERENCES useful_info_summary(id) ON DELETE CASCADE, event_id BIGINT NOT NULL REFERENCES useful_info_event(id) ON DELETE CASCADE, PRIMARY KEY (summary_id, event_id) ); """ ) CUR.execute( """ CREATE TABLE IF NOT EXISTS useful_info_tag ( id BIGSERIAL PRIMARY KEY, name TEXT UNIQUE ); """ ) CUR.execute( """ CREATE TABLE IF NOT EXISTS useful_info_summary_tag ( summary_id BIGINT NOT NULL REFERENCES useful_info_summary(id) ON DELETE CASCADE, tag_id BIGINT NOT NULL REFERENCES useful_info_tag(id) ON DELETE CASCADE, PRIMARY KEY (summary_id, tag_id) ); """ ) CON.commit() finally: CUR.close(); CON.close() def _sha256(s): return hashlib.sha256(s.encode('utf-8','ignore')).hexdigest() def _get_or_create_tag_id(CUR, name): try: CUR.execute("INSERT INTO useful_info_tag (name) VALUES (%s) ON CONFLICT (name) DO NOTHING RETURNING id", (name,)) row = CUR.fetchone() if row and row[0]: return row[0] except Exception: pass CUR.execute("SELECT id FROM useful_info_tag WHERE name=%s", (name,)) row = CUR.fetchone() return row[0] if row else None def insert_usefulinfo_record(parsed): """ Insert a summarize_u_info() JSON result 
into Postgres. Expected keys: source, date, tags (list), short, summary, events (list of {dt,length,title,description}). Dedups summaries using a stable hash; links tags and events via link tables. Returns summary_id. """ if not isinstance(parsed, dict): return None source = parsed.get('source') date_label = parsed.get('date') short_text = parsed.get('short') or '' summary_text = parsed.get('summary') or '' # Normalize tags into a flat list of strings def _norm_tags(t): if t is None: return [] if isinstance(t, list): out = [] for x in t: if isinstance(x, (list, tuple)): out.extend([str(y) for y in x if y is not None]) elif isinstance(x, dict): # accept {name: "..."} name = x.get('name') if 'name' in x else None if name: out.append(str(name)) elif x is not None: out.append(str(x)) return out if isinstance(t, str): # try json first try: j = json.loads(t) return _norm_tags(j) except Exception: pass # split by comma return [s.strip() for s in t.split(',') if s.strip()] return [str(t)] # Normalize events into a list of dicts with expected keys def _norm_events(ev): def _one(e): if not isinstance(e, dict): return None dt = e.get('dt') or e.get('datetime') or e.get('date') or e.get('when') length = e.get('length') or e.get('duration') or e.get('dur') title = e.get('title') or e.get('name') or e.get('summary') desc = e.get('description') or e.get('details') or e.get('note') or e.get('body') return {'dt': dt, 'length': length, 'title': title, 'description': desc} if ev is None: return [] if isinstance(ev, list): out = [] for x in ev: if isinstance(x, dict): y = _one(x) if y: out.append(y) elif isinstance(x, str): try: j = json.loads(x) if isinstance(j, dict): y = _one(j) if y: out.append(y) elif isinstance(j, list): out.extend(_norm_events(j)) except Exception: pass return out if isinstance(ev, dict): y = _one(ev) return [y] if y else [] if isinstance(ev, str): try: j = json.loads(ev) return _norm_events(j) except Exception: return [] return [] tags = 
_norm_tags(parsed.get('tags')) events = _norm_events(parsed.get('events')) s_hash = _sha256((source or '') + "\n" + (date_label or '') + "\n" + short_text + "\n" + summary_text) CON, CUR = db() summary_id = None try: CUR.execute( """ INSERT INTO useful_info_summary (summary_hash, source, date_label, short_text, summary_text) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (summary_hash) DO UPDATE SET short_text=EXCLUDED.short_text, summary_text=EXCLUDED.summary_text RETURNING id """, (s_hash, source, date_label, short_text, summary_text) ) row = CUR.fetchone() summary_id = row[0] if row else None # Tags if summary_id and isinstance(tags, list): for t in tags: if not t: continue tag_id = _get_or_create_tag_id(CUR, str(t)) if tag_id: try: CUR.execute( "INSERT INTO useful_info_summary_tag (summary_id, tag_id) VALUES (%s, %s) ON CONFLICT DO NOTHING", (summary_id, tag_id) ) except Exception: pass # Events if summary_id and isinstance(events, list): for idx, e in enumerate(events, start=1): # Diagnostics: validate dt and length raw_dt = (e or {}).get('dt') raw_len = (e or {}).get('length') raw_title = (e or {}).get('title') raw_desc = (e or {}).get('description') dt_str = str(raw_dt) if raw_dt is not None else '' parsed_ok = False parsed_iso = '' all_day = False try: if dt_str: # all-day if no explicit time markers all_day = not (re.search(r"\d\d?:\d\d", dt_str) or re.search(r"\b(am|pm)\b", dt_str, re.I)) p = parse(dt_str) parsed_ok = True parsed_iso = p.isoformat() except Exception as ex_dt: print("[usefulinfo][event-parse] bad dt:", dt_str, "error:", str(ex_dt)) def _mins(s): if not s: return 60 try: n = int(s) if n <= 12: return n * 60 return n except Exception: pass m = re.findall(r"(\d+(?:\.\d+)?)\s*([hm])", str(s), flags=re.I) total = 0 for num, unit in m: try: val = float(num) total += int(val * 60) if unit.lower() == 'h' else int(val) except Exception: pass return total or 60 computed_minutes = _mins(raw_len) try: CUR.execute( """ INSERT INTO useful_info_event (dt, 
length, title, description) VALUES (%s, %s, %s, %s) RETURNING id """, (raw_dt, raw_len, raw_title, raw_desc) ) evrow = CUR.fetchone() if evrow and evrow[0]: CUR.execute( "INSERT INTO useful_info_summary_event (summary_id, event_id) VALUES (%s, %s) ON CONFLICT DO NOTHING", (summary_id, evrow[0]) ) print(f"[usefulinfo] inserted event id={evrow[0]} (summary_id={summary_id}) dt='{dt_str}' parsed={parsed_ok} iso='{parsed_iso}' all_day={all_day} minutes={computed_minutes} title='{raw_title}'") else: print(f"[usefulinfo][warn] no event id returned for dt='{dt_str}' title='{raw_title}'") except Exception as ex_ins: print("[usefulinfo][event-insert-failed] summary_id=", summary_id, " event=", e, " dt_str=", dt_str, " parsed_ok=", parsed_ok, " parsed_iso=", parsed_iso, " all_day=", all_day, " computed_minutes=", computed_minutes, " error=", str(ex_ins)) CON.commit() finally: CUR.close(); CON.close() return summary_id def export_usefulinfo_events_to_ics(filepath='cache/useful_info_events.ics'): """Export events from useful info tables to an .ics file. - Attempts to parse dt and length into DTSTART/DTEND. - Includes title (SUMMARY), description (DESCRIPTION), and tags (CATEGORIES). 
""" from datetime import datetime, timedelta from dateutil import parser as dtparser CON, CUR = db() try: # Pull events with linked summary and aggregated tags CUR.execute( """ SELECT e.id, e.dt, e.length, e.title, e.description, s.source, s.date_label, s.short_text, COALESCE(array_agg(t.name) FILTER (WHERE t.name IS NOT NULL), '{}') AS tags FROM useful_info_event e JOIN useful_info_summary_event se ON se.event_id = e.id JOIN useful_info_summary s ON s.id = se.summary_id LEFT JOIN useful_info_summary_tag st ON st.summary_id = s.id LEFT JOIN useful_info_tag t ON t.id = st.tag_id GROUP BY e.id, s.source, s.date_label, s.short_text ORDER BY e.id """ ) rows = CUR.fetchall() finally: CUR.close(); CON.close() def _parse_minutes(length_str): if not length_str: return 60 try: n = int(length_str) if n <= 12: return n * 60 return n except Exception: pass m = re.findall(r"(\d+(?:\.\d+)?)\s*([hm])", length_str, flags=re.I) minutes = 0 if m: for num, unit in m: try: val = float(num) if unit.lower() == 'h': minutes += int(val * 60) else: minutes += int(val) except Exception: pass if minutes > 0: return minutes return 60 def _has_time_component(s): if not s: return False if re.search(r"\d\d?:\d\d", s): return True if re.search(r"\b(am|pm)\b", s, re.I): return True return False def _format_dt(dtobj): # Local time (floating) format return dtobj.strftime('%Y%m%dT%H%M%S') now_utc = datetime.utcnow().strftime('%Y%m%dT%H%M%SZ') lines = [] lines.append('BEGIN:VCALENDAR') lines.append('VERSION:2.0') lines.append('PRODID:-//canvasapp//Useful Info Events//EN') lines.append('CALSCALE:GREGORIAN') lines.append('METHOD:PUBLISH') lines.append('X-WR-CALNAME:Useful Info') for r in rows: ev_id = r[0] dt_str = r[1] length_str = r[2] title = r[3] or '' desc = r[4] or '' source = r[5] or '' date_label = r[6] or '' short_text = r[7] or '' tags = r[8] or [] # Try to parse DTSTART/DTEND all_day = not _has_time_component(str(dt_str)) dtstart = None dtend = None try: if dt_str: parsed = 
dtparser.parse(str(dt_str), fuzzy=True) if all_day: # All-day event dtstart = parsed.date() dtend = (parsed.date() + timedelta(days=1)) else: dtstart = parsed minutes = _parse_minutes(str(length_str)) dtend = parsed + timedelta(minutes=minutes) except Exception: # If we cannot parse date, skip this event continue lines.append('BEGIN:VEVENT') lines.append('UID:usefulinfo-event-%s@gavilan' % ev_id) lines.append('DTSTAMP:%s' % now_utc) if all_day and dtstart and dtend: lines.append('DTSTART;VALUE=DATE:%s' % dtstart.strftime('%Y%m%d')) lines.append('DTEND;VALUE=DATE:%s' % dtend.strftime('%Y%m%d')) elif dtstart and dtend: lines.append('DTSTART:%s' % _format_dt(dtstart)) lines.append('DTEND:%s' % _format_dt(dtend)) if title: lines.append('SUMMARY:%s' % title.replace('\n', ' ').replace('\r', ' ')) full_desc = desc extra = [] if short_text: extra.append('Context: ' + short_text) if source or date_label: extra.append('Source: %s Date label: %s' % (source, date_label)) if extra: if full_desc: full_desc += '\n\n' + '\n'.join(extra) else: full_desc = '\n'.join(extra) if full_desc: # Basic escaping of commas/semicolons per RFC is often needed; we keep it simple here lines.append('DESCRIPTION:%s' % full_desc.replace('\r', ' ').replace('\n', '\\n')) if tags: try: cats = ','.join([t for t in tags if t]) if cats: lines.append('CATEGORIES:%s' % cats) except Exception: pass lines.append('END:VEVENT') lines.append('END:VCALENDAR') # Write file with open(filepath, 'w', encoding='utf-8') as f: f.write("\r\n".join(lines) + "\r\n") if __name__ == "__main__": print ('') options = { 1: ['all teachers', teachers_by_term], 2: ['courses in term', courses_in_term], 3: ['pages in term', pages_in_term], 4: ['new students this semester', users_new_this_semester], 5: ['all semester courses + teachers', all_sem_courses_teachers], 6: ['Populate schedule table and correlate to courses table', courses_to_sched], 7: ['refresh db schedule 1 semester', refresh_semester_schedule_db], } if len(sys.argv) > 1 
and re.search(r'^\d+',sys.argv[1]): resp = int(sys.argv[1]) print("\n\nPerforming: %s\n\n" % options[resp][0]) else: print ('') for key in options: print(str(key) + '.\t' + options[key][0]) print('') resp = input('Choose: ') # Call the function in the options dict options[ int(resp)][1]()