# canvasapp/localcache2.py
# (924 lines, 32 KiB, Python)
# Local data, saving and manipulating
import util
import os, re, gzip, codecs, funcy, pytz, json, random, functools, requests, sys, csv, time, psycopg2
import hashlib
import pandas as pd
import numpy as np
from collections import defaultdict
from datetime import datetime as dt
from datetime import timedelta
from dateutil.parser import parse
from os.path import exists, getmtime
from tabulate import tabulate
from semesters import short_to_sis
from canvas_secrets import postgres_database, postgres_password, postgres_port, postgres_user, postgres_host
#########
######### LOCAL DB
#########
# Module-level handles, (re)assigned by db(); '' means "not connected yet".
CON = ''
CURSOR = ''
def db():
    """Open a fresh Postgres connection with the canvas_secrets credentials.

    Returns (connection, cursor) and also stores both in the module
    globals CON/CURSOR. NOTE(review): every call opens a new connection
    and drops the reference to the previous one without closing it.
    """
    global CON,CURSOR
    CON = psycopg2.connect(database=postgres_database,
                           host=postgres_host,
                           user=postgres_user,
                           password=postgres_password,
                           port=postgres_port)
    CURSOR = CON.cursor()
    return CON,CURSOR
# NOTE: the triple-quoted string below is retired code (employee sync to the
# Gavilan flex-calendar site), deliberately disabled; kept for reference only.
'''
# Help the next function to upload new users directly to conf database on gavilan.
def employees_refresh_flex(data):
try:
data['a'] = 'set/newuser'
data['sis_user_id'] = data['sis_user_id'][3:]
print("\nUploading this: \n")
print(json.dumps(data, indent=2))
print("\n")
a = input("Continue (y) or skip (n) ? ")
if a == 'y':
# This is what I was missing..........
# req.add_header("Content-type", "application/x-www-form-urlencoded")
r3 = requests.post('https://www.gavilan.edu/staff/flex/2020/api.php', params=data)
print(r3.text)
#print(r3.headers)
except Exception as ex:
print("Failed on: %s\nErr: %s" % (str(data),str(ex)))
# Everyone in iLearn DB with an xyz@gavilan.edu email address.
def all_gav_employees():
(connection,cursor) = db()
connection.row_factory = dict_factory
q = """SELECT u.canvasid, u.name, u.created, u.sortablename, h.address, h.type, h.workflow_state,
h.updated_at, p.last_request_at, p.last_login_at, p.current_login_at, p.last_login_ip,
p.current_login_ip, p.sis_user_id, p.unique_name FROM users AS u
JOIN comm_channel AS h ON u.id=h.user_id
JOIN pseudonym AS p ON p.user_id=u.id
WHERE h.address LIKE "%@gavilan.edu"
ORDER BY u.sortablename"""
cursor = connection.cursor()
cursor.execute(q)
everyone = cursor.fetchall()
everyone_set = set()
for E in everyone:
try:
everyone_set.add( E['address'].lower() )
except Exception as e:
print("Exception: %s\nwith: %s" % (str(e), str(E)))
oo = open('cache/temp1.txt','w')
oo.write(json.dumps(list(everyone_set), indent=2))
existing = requests.get('https://gavilan.edu/staff/flex/2020/api.php?a=get/users')
ex = json.loads( existing.text )
already_enrolled = set()
for usr in ex['users']:
try:
#already_enrolled.add( (usr['goo'], usr['email'].lower(), usr['name']) )
already_enrolled.add( usr['email'].lower() )
except Exception as e:
print("Exception: %s\nWith: %s" % (str(e),str(usr)))
oo.write( "\n"*20 + '------------------------------------------\n'*20 + '------ - - - - - - ' )
oo.write(json.dumps(list(already_enrolled), indent=2))
# conf_users wants: goo, email, name, active
# and emails have random capitalization
# name is First Last, and sometimes with Middle in there.
#
# using sets: to_enroll = [ x for x in students if x not in already_enrolled ]
new_emp = [ x for x in everyone_set if x not in already_enrolled ]
# take the all_employee list, filter -> anyone who's in 'existing' is removed
# funcy.where( lambda x: x['email'] == ae[4] , existing )
#new_emp = list(funcy.filter( lambda ae: funcy.where( existing, email=ae['email'] ), all_emp ))
#new_emp = list(funcy.where( existing, email=b'phowell@gavilan.edu')) #ae['email'] ))
print(new_emp)
oo.write( "\n"*20 + '------------------------------------------\n'*20 + '------ - - - - - - ' )
oo.write(json.dumps(list(new_emp), indent=2))
# Now, iLearn db (everyone)... find the rows that match the email addresses
# that we've decided we need to add (new_emp)
#print(everyone)
#print( "searching for %s" % j )
#print( "searched for %s, found: %s" % (j, str(to_add) ))
#print("\nUploading...\n")
for j in new_emp:
#j = new_emp[0]
print(j)
to_add = list(funcy.where( everyone, address=j ))
if to_add:
employees_refresh_flex(to_add[0])
else:
print("Didn't find an entry for that account.")
print("done uploading")
'''
def user_from_goo(goo):
    """Look up a Canvas user (pseudonym joined to user) by Banner G00 id.

    Prepends "G00" when the caller passes only the numeric part.
    Returns a dict keyed by column name, or None when no row matches.
    """
    if not goo.lower().startswith("g00"):
        goo = "G00" + goo
    # Parameterized query -- the previous f-string version was open to SQL injection.
    q = "SELECT * FROM canvas.pseudonyms p JOIN canvas.users u ON p.user_id=u.id WHERE p.sis_user_id=%s;"
    (connection,cursor) = db()
    cursor.execute(q, (goo,))
    row = cursor.fetchone()
    if row:
        # convert row to dict using column names as keys
        return dict(zip([desc[0] for desc in cursor.description], row))
def course_sched_entry_from_id(id):
    """Return the canvas.schedule row for Canvas course id *id* as a dict, or None."""
    # Parameterized query -- previously interpolated via f-string (SQL injection risk).
    q = "SELECT * FROM canvas.schedule s WHERE s.canvascourse=%s"
    (connection,cursor) = db()
    cursor.execute(q, (int(id),))  # int() keeps the comparison integer-typed
    row = cursor.fetchone()
    if row:
        # convert row to dict using column names as keys
        return dict(zip([desc[0] for desc in cursor.description], row))
def course_from_id(id):
    """Return the canvas.courses row with primary key *id* as a dict, or None."""
    # Parameterized query -- previously interpolated via f-string (SQL injection risk).
    q = "SELECT * FROM canvas.courses c WHERE c.id=%s"
    (connection,cursor) = db()
    cursor.execute(q, (int(id),))  # int() keeps the comparison integer-typed
    row = cursor.fetchone()
    if row:
        # convert row to dict using column names as keys
        return dict(zip([desc[0] for desc in cursor.description], row))
def teachers_by_term(TERM = "202430"):
    """Print and return every TeacherEnrollment for courses whose
    sis_source_id starts with *TERM* (e.g. '202430')."""
    # LIKE pattern passed as a bound parameter -- TERM was previously
    # interpolated straight into the SQL (injection risk).
    q = """SELECT c.id, c.name, c.course_code, c.sis_source_id, c.created_at, c.start_at, c.workflow_state, e.last_attended_at,
           u.id, u.sortable_name, u.created_at FROM canvas.courses AS c
           JOIN canvas.enrollments AS e ON e.course_id=c.id
           JOIN canvas.users AS u ON u.id=e.user_id
           WHERE c.sis_source_id LIKE %s AND e.type='TeacherEnrollment' ORDER BY u.sortable_name, c.course_code;"""
    (connection,cursor) = db()
    cursor.execute(q, (TERM + '%',))
    all_teachers = cursor.fetchall()
    # Columns: 9=u.sortable_name, 1=c.name, 3=c.sis_source_id, 6=c.workflow_state
    table = [ [t[9],t[1],t[3],t[6]] for t in all_teachers]
    print(tabulate(table))
    return all_teachers
def courses_in_term(TERM = "202430"):
    """Print (and return) id/name/code/sis id/state for all courses in *TERM*."""
    # Parameterized LIKE pattern -- previously f-string interpolated (injection risk).
    q = """SELECT c.id, c.name, c.course_code, c.sis_source_id, c.workflow_state FROM canvas.courses AS c
           WHERE c.sis_source_id LIKE %s ORDER BY c.course_code;"""
    (connection,cursor) = db()
    cursor.execute(q, (TERM + '%',))
    rows = cursor.fetchall()  # renamed from 'all' to stop shadowing the builtin
    print(tabulate(rows))
    return rows
def pages_in_term(TERM="202430"):
    """Return wiki pages (including body) for every course in *TERM*."""
    # Parameterized LIKE pattern -- previously f-string interpolated (injection risk).
    q = """SELECT c.id, c.course_code, c.sis_source_id, wp.id as wp_id, wp.title, wp.url, c.name , wp.body
           FROM canvas.courses c
           JOIN canvas.wiki_pages wp ON wp.context_id=c.id
           WHERE c.sis_source_id LIKE %s
           ORDER BY c.sis_source_id, wp.title;"""
    (connection,cursor) = db()
    cursor.execute(q, (TERM + '%',))
    rows = cursor.fetchall()  # renamed from 'all' to stop shadowing the builtin
    return rows
def user_ids_in_shell(shellid):
    """Return the set of active StudentEnrollment user ids (as strings)
    enrolled in Canvas course *shellid*."""
    # Parameterized -- shellid was previously f-string interpolated (injection risk).
    q = """select e.user_id from canvas.enrollments e
           where e.course_id = %s and e.type='StudentEnrollment' and e.workflow_state='active';"""
    (connection,cursor) = db()
    cursor.execute(q, (int(shellid),))  # accepts int or numeric string
    students = {str(row[0]) for row in cursor.fetchall()}
    print(f"{len(students)} students currently in shell id {shellid}.")
    return students
def users_new_this_semester(sem=''):
    """Return the set of Canvas user ids (as strings) who are active students
    in semester *sem* (e.g. '202150') and in no other semester.

    Prompts for the semester when none is given. Prints each matching row
    and a summary count as it goes.
    """
    if not len(sem):
        sem = input("which semester? (ex: 202150) ")
    users_to_enroll = set()
    # Bound LIKE pattern -- sem may come straight from input(), so the old
    # %-interpolated WHERE clauses were an SQL injection risk.
    pattern = sem + '-%'
    q = """SELECT u.id, u.name, u.sortable_name, string_agg(c.course_code, ','), COUNT(e.id) AS num
           FROM canvas.enrollments AS e
           JOIN canvas.users AS u ON e.user_id=u.id
           JOIN canvas.courses AS c ON e.course_id=c.id
           WHERE c.sis_source_id LIKE %s
           AND e.workflow_state='active'
           AND e.type='StudentEnrollment'
           AND u.id NOT IN (
             SELECT u.id FROM canvas.enrollments AS e
             JOIN canvas.users AS u ON e.user_id=u.id
             JOIN canvas.courses AS c ON e.course_id=c.id
             WHERE c.sis_source_id NOT LIKE %s
             AND e.workflow_state='active'
             AND e.type='StudentEnrollment'
             GROUP BY u.id
           )
           GROUP BY u.id
           ORDER BY num DESC, u.sortable_name"""
    (connection,cursor) = db()
    cursor.execute(q, (pattern, pattern))
    for u in cursor:
        users_to_enroll.add(str(u[0]))
        print(u)
    print("%i new users this semester." % len(users_to_enroll))
    print(users_to_enroll)
    return users_to_enroll
# when registrations opens for SUMMER+FALL, get new students from both semesters combined.
def users_new_this_2x_semester(sem1='', sem2=''):
    """Return Canvas user ids (as strings) of active students enrolled in
    *sem1* or *sem2* but in no other semester.

    Returns 0 (kept for backward compatibility) when either semester is missing.
    """
    if not len(sem1) or (not len(sem2)):
        print("Need 2 semesters")
        return 0
    # Bound LIKE patterns -- previously interpolated into the SQL (injection risk).
    p1 = sem1 + '-%'
    p2 = sem2 + '-%'
    q = """SELECT u.id, u.name, u.sortable_name, string_agg(c.course_code, ','), COUNT(e.id) AS num
           FROM canvas.enrollments AS e
           JOIN canvas.users AS u ON e.user_id=u.id
           JOIN canvas.courses AS c ON e.course_id=c.id
           WHERE (c.sis_source_id LIKE %s or c.sis_source_id LIKE %s)
           AND e.workflow_state='active'
           AND e.type='StudentEnrollment'
           AND u.id NOT IN (
             SELECT u.id FROM canvas.enrollments AS e
             JOIN canvas.users AS u ON e.user_id=u.id
             JOIN canvas.courses AS c ON e.course_id=c.id
             WHERE (c.sis_source_id NOT LIKE %s and c.sis_source_id NOT LIKE %s)
             AND e.workflow_state='active'
             AND e.type='StudentEnrollment'
             GROUP BY u.id
           )
           GROUP BY u.id
           ORDER BY num DESC, u.sortable_name"""
    (connection,cursor) = db()
    cursor.execute(q, (p1, p2, p1, p2))
    users_to_enroll = {str(row[0]) for row in cursor.fetchall()}
    print("%i new users this semester." % len(users_to_enroll))
    return users_to_enroll
# Fetch the joined courses (one semester) and schedules tables
def all_sem_courses_teachers(SEM="202470"):
    """Return course + teacher + schedule rows for every non-deleted
    course in semester *SEM* (prints them as a side effect)."""
    # Bound LIKE pattern -- previously f-string interpolated (injection risk).
    q = """SELECT c.id, c.name, c.course_code, u.name, u.sortable_name, u.id AS user_cid, p.sis_user_id, s.type, s.crn FROM canvas.courses AS c
           JOIN canvas.enrollments AS e ON e.course_id=c.id
           JOIN canvas.users AS u ON u.id=e.user_id
           JOIN canvas.pseudonyms AS p ON p.user_id=u.id
           JOIN canvas.schedule as s on c.id=s.canvascourse
           WHERE c.sis_source_id LIKE %s
           AND NOT c.workflow_state='deleted'
           AND e.type='TeacherEnrollment'
           ORDER BY u.sortable_name;"""
    (connection,cursor) = db()
    cursor.execute(q, (SEM + '-%',))
    courses = cursor.fetchall()
    print(courses)
    return courses
def all_2x_sem_courses_teachers(sem1, sem2):
    """Return course + teacher + schedule rows for every non-deleted course
    in either semester *sem1* or *sem2* (prints them as a side effect)."""
    # Bound LIKE patterns -- previously f-string interpolated (injection risk).
    q = """SELECT c.id, c.name, c.course_code, s.type, s.crn, u.name, u.sortable_name, u.id AS user_cid, p.sis_user_id FROM canvas.courses AS c
           JOIN canvas.enrollments AS e ON e.course_id=c.id
           JOIN canvas.users AS u ON u.id=e.user_id
           JOIN canvas.pseudonyms AS p ON p.user_id=u.id
           JOIN canvas.schedule as s on c.id=s.canvascourse
           WHERE (c.sis_source_id LIKE %s or c.sis_source_id LIKE %s)
           AND NOT c.workflow_state='deleted'
           AND e.type='TeacherEnrollment'
           ORDER BY u.sortable_name;"""
    (connection,cursor) = db()
    cursor.execute(q, (sem1 + '-%', sem2 + '-%'))
    courses = cursor.fetchall()
    print(courses)
    return courses
def create_schedule_table_if_not_exists():
    """Ensure the canvas.schedule table exists, creating it on first run."""
    connection, cursor = db()
    # Probe information_schema for the table before attempting to create it.
    cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'schedule' and table_schema = 'canvas')")
    already_present = cursor.fetchone()[0]
    if not already_present:
        # First run: build the schedule table ("end" is quoted -- reserved word).
        cursor.execute("""CREATE TABLE canvas.schedule (
id SERIAL PRIMARY KEY,
canvascourse INTEGER REFERENCES canvas.courses,
crn VARCHAR(20),
code VARCHAR(30),
units VARCHAR(20),
teacher TEXT,
start VARCHAR(30),
"end" VARCHAR(30),
type VARCHAR(20),
loc VARCHAR(80),
site VARCHAR(50),
partofday VARCHAR(40),
cap INTEGER,
act INTEGER,
sem VARCHAR(10)
)
""")
    connection.commit()
    cursor.close()
# Populate schedule table and correlate to courses table
def courses_to_sched():
    """Bulk-load canvas.schedule from the gavilan.cc semester schedule feeds.

    Walks every term from 2016-2025, downloads the expanded schedule JSON,
    matches each section (SIS id '<semcode>-<crn>') to an iLearn course,
    and batch-inserts schedule rows, committing every 100 inserts.
    """
    # TODO: fix units when they are variable... change to float in between range. round to 0.5 unit.
    EXECUTE = 1   # dry-run switch: set to 0 to skip all DB writes
    last_time = 0
    seasons = {'10':'wi','30':'sp','50':'su','70':'fa'}       # sis suffix -> short season
    seasons2 = {'wi':'10', 'sp':'30', 'su':'50', 'fa':'70'}   # short season -> sis suffix
    create_schedule_table_if_not_exists()
    conn,cur = db()
    # get all ilearn courses
    query = """SELECT c.id, c.workflow_state, c.sis_source_id, c.course_code, c.enrollment_term_id, t.name
FROM canvas.courses c
JOIN canvas.enrollment_terms t ON c.enrollment_term_id = t.id
ORDER BY c.sis_source_id, c.course_code;"""
    cur.execute(query)
    # Index courses by sis_source_id so each feed row can be matched quickly.
    sis_to_sched = {}
    for row in cur.fetchall():
        sis_source_id = row[2] # c.sis_source_id
        sis_to_sched.setdefault(sis_source_id, []).append(row)
    vals_cache = []
    i = 0
    for year in ['16','17','18','19','20','21','22','23','24','25']:
        for sem in ['sp','su','fa']:
            term = f"{sem}{year}"                  # feed name, e.g. 'fa24'
            sis_code = f"20{year}{seasons2[sem]}"  # SIS term code, e.g. '202470'
            print(term)
            sched = 0
            try:
                sched = requests.get(f"http://gavilan.cc/schedule/{term}_sched_expanded.json").json()
            except Exception as e:
                print(e)  # feed missing for this term; sched stays falsy
            query = "INSERT INTO canvas.schedule (canvascourse, crn, code, units, teacher, start,\"end\", type, loc, site, partofday, cap, act, sem) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"
            if sched:
                for c in sched:
                    try:
                        pod = ''
                        if 'partofday' in c: pod = c['partofday']
                        #print(c['cred'])
                        # Variable-unit classes arrive as 'a.b-c.d'; collapse to a midpoint.
                        cred_match = re.search(r'(\d+\.\d+)\-(\d+\.\d+)',c['cred'])
                        if cred_match:
                            #print(f"matched range: {cred_match.groups}")
                            cred_start = float(cred_match.group(1))
                            cred_end = float(cred_match.group(2))
                            # NOTE(review): int() truncates -- the TODO above wants 0.5-unit rounding.
                            mid = float(int( (cred_end-cred_start)/2 + cred_start ))
                            c['cred'] = str(mid)
                            #print(f"middle cred is {c['cred']}")
                        full_sis_code = sis_code+'-'+c['crn']
                        if full_sis_code in sis_to_sched:
                            print(c['cred'])
                            q = [sis_to_sched[full_sis_code][0][0], c['crn'], c['code'], c['cred'], c['teacher'], c['start'], c['end'], c['type'], c['loc'], c['site'], pod, int(c['cap']), int(c['act']), sis_code]
                            vals_cache.append( q ) # [ str(x) for x in q ] )
                            #print(f"{i}: {q}")
                            i += 1
                            # Flush the accumulated rows in batches of 100.
                            if i % 100 == 0:
                                if EXECUTE:
                                    cur.executemany(query, vals_cache)
                                    conn.commit()
                                    vals_cache = []
                                    t = time.process_time()
                                    delta = t - last_time
                                    last_time = t
                                    print(f"Loop {i} - committed to db in %0.3fs. " % delta, flush=True)
                        else:
                            print(f"{full_sis_code} not in canvas courses.")
                    except Exception as e:
                        print(e)  # skip malformed feed entries, keep loading the rest
    # Flush the final partial batch.
    if EXECUTE:
        cur.executemany(query, vals_cache)
        conn.commit()
    cur.close()
    conn.close()
# Populate schedule table and correlate to courses table
def refresh_semester_schedule_db(term="fa25"):
    """Re-load canvas.schedule for a single semester from the gavilan.cc feed.

    Deletes existing rows for the term's SIS code, then re-inserts from
    http://gavilan.cc/schedule/<term>_sched_expanded.json in batches of 100.
    """
    # TODO: fix units when they are variable... change to float in between range. round to 0.5 unit.
    EXECUTE = 1   # dry-run switch: set to 0 to skip all DB writes
    last_time = 0
    sis_code = short_to_sis(term)
    conn,cur = db()
    # NOTE(review): sis_code comes from short_to_sis(), not raw user input,
    # but this would still be safer as a parameterized query.
    drop_query = f"DELETE FROM canvas.schedule WHERE sem='{sis_code}'"
    print(f"executing: {drop_query}")
    cur.execute(drop_query)
    # get all ilearn courses
    query = """SELECT c.id, c.workflow_state, c.sis_source_id, c.course_code, c.enrollment_term_id, t.name
FROM canvas.courses c
JOIN canvas.enrollment_terms t ON c.enrollment_term_id = t.id
ORDER BY c.sis_source_id, c.course_code;"""
    cur.execute(query)
    conn.commit()
    # Index courses by sis_source_id so each feed row can be matched quickly.
    sis_to_sched = {}
    for row in cur.fetchall():
        sis_source_id = row[2] # c.sis_source_id
        sis_to_sched.setdefault(sis_source_id, []).append(row)
    vals_cache = []
    i = 0
    print(sis_code)
    sched = 0
    try:
        sched = requests.get(f"http://gavilan.cc/schedule/{term}_sched_expanded.json").json()
    except Exception as e:
        print(e)  # feed unavailable; sched stays falsy and nothing is inserted
    query = "INSERT INTO canvas.schedule (canvascourse, crn, code, units, teacher, start,\"end\", type, loc, site, partofday, cap, act, sem) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"
    if sched:
        for c in sched:
            try:
                pod = ''
                if 'partofday' in c: pod = c['partofday']
                #print(c['cred'])
                # Variable-unit classes arrive as 'a.b-c.d'; collapse to a midpoint.
                cred_match = re.search(r'(\d+\.\d+)\-(\d+\.\d+)',c['cred'])
                if cred_match:
                    #print(f"matched range: {cred_match.groups}")
                    cred_start = float(cred_match.group(1))
                    cred_end = float(cred_match.group(2))
                    # NOTE(review): int() truncates -- the TODO above wants 0.5-unit rounding.
                    mid = float(int( (cred_end-cred_start)/2 + cred_start ))
                    c['cred'] = str(mid)
                    #print(f"middle cred is {c['cred']}")
                full_sis_code = sis_code+'-'+c['crn']
                if full_sis_code in sis_to_sched:
                    print(c['cred'])
                    q = [sis_to_sched[full_sis_code][0][0], c['crn'], c['code'], c['cred'], c['teacher'], c['start'], c['end'], c['type'], c['loc'], c['site'], pod, int(c['cap']), int(c['act']), sis_code]
                    vals_cache.append( q ) # [ str(x) for x in q ] )
                    #print(f"{i}: {q}")
                    i += 1
                    # Flush the accumulated rows in batches of 100.
                    if i % 100 == 0:
                        if EXECUTE:
                            cur.executemany(query, vals_cache)
                            conn.commit()
                            vals_cache = []
                            t = time.process_time()
                            delta = t - last_time
                            last_time = t
                            print(f"Loop {i} - committed to db in %0.3fs. " % delta, flush=True)
                else:
                    print(f"{full_sis_code} not in canvas courses.")
            except Exception as e:
                print(e)  # skip malformed feed entries, keep loading the rest
    # Flush the final partial batch.
    if EXECUTE:
        cur.executemany(query, vals_cache)
        conn.commit()
    cur.close()
    conn.close()
def student_count(courseid):
    """Return the number of StudentEnrollments (any workflow state) in course *courseid*."""
    conn,cursor = db()
    # Parameterized -- courseid was previously f-string interpolated (injection risk).
    q = """SELECT COUNT(u.id) AS student_count FROM canvas.courses AS c
           JOIN canvas.enrollments AS e ON e.course_id=c.id
           JOIN canvas.users AS u ON u.id=e.user_id
           WHERE c.id=%s
           AND e.type='StudentEnrollment';"""
    cursor.execute(q, (int(courseid),))
    return cursor.fetchall()[0][0]
def teacher_list(courseid):
    """Return (user id, name) rows for every teacher enrolled in course *courseid*."""
    conn,cursor = db()
    # Parameterized -- courseid was previously f-string interpolated (injection risk).
    q = """SELECT u.id, u.name FROM canvas.courses AS c
           JOIN canvas.enrollments AS e ON e.course_id=c.id
           JOIN canvas.users AS u ON u.id=e.user_id
           WHERE c.id=%s
           AND e.type='TeacherEnrollment';"""
    cursor.execute(q, (int(courseid),))
    return cursor.fetchall()
def everyone_teacher_role():
    """Return one row per distinct user name holding a TeacherEnrollment:
    (name, canvas id, sis_user_id, created_at, one course code)."""
    connection, cur = db()
    # DISTINCT ON keeps a single row per user name.
    q = '''select distinct ON (u.name) u.name, u.id, p.sis_user_id, u.created_at, c.course_code from canvas.enrollments e
join canvas.users u on u.id=e.user_id
join canvas.courses c on e.course_id=c.id
join canvas.pseudonyms p on u.id=p.user_id
where e.type='TeacherEnrollment'
order by u.name;'''
    cur.execute(q)
    rows = cur.fetchall()
    return rows
def iLearn_name_from_goo(goo):
    """Return (canvas id, name, sortable_name, sis_user_id) for G00 id *goo*, or None."""
    goo = goo.upper()
    conn,cursor = db()
    # Parameterized -- goo was previously f-string interpolated (injection risk).
    q = "select u.id, u.name, u.sortable_name, p.sis_user_id from canvas.pseudonyms p join canvas.users u on u.id=p.user_id where p.sis_user_id=%s;"
    cursor.execute(q, (goo,))
    return cursor.fetchone()
# -------------------- Useful Info (summaries, events, tags) --------------------
def init_usefulinfo_schema():
    """Create tables for summaries, events, tags, and link tables if missing."""
    connection, cursor = db()
    # All statements are idempotent (IF NOT EXISTS), so re-running is safe.
    ddl_statements = (
        """
CREATE TABLE IF NOT EXISTS useful_info_summary (
id BIGSERIAL PRIMARY KEY,
summary_hash CHAR(64) UNIQUE,
source TEXT,
date_label TEXT,
short_text TEXT,
summary_text TEXT,
created_at TIMESTAMPTZ DEFAULT now()
);
""",
        """
CREATE TABLE IF NOT EXISTS useful_info_event (
id BIGSERIAL PRIMARY KEY,
dt TEXT,
length TEXT,
title TEXT,
description TEXT,
created_at TIMESTAMPTZ DEFAULT now()
);
""",
        """
CREATE TABLE IF NOT EXISTS useful_info_summary_event (
summary_id BIGINT NOT NULL REFERENCES useful_info_summary(id) ON DELETE CASCADE,
event_id BIGINT NOT NULL REFERENCES useful_info_event(id) ON DELETE CASCADE,
PRIMARY KEY (summary_id, event_id)
);
""",
        """
CREATE TABLE IF NOT EXISTS useful_info_tag (
id BIGSERIAL PRIMARY KEY,
name TEXT UNIQUE
);
""",
        """
CREATE TABLE IF NOT EXISTS useful_info_summary_tag (
summary_id BIGINT NOT NULL REFERENCES useful_info_summary(id) ON DELETE CASCADE,
tag_id BIGINT NOT NULL REFERENCES useful_info_tag(id) ON DELETE CASCADE,
PRIMARY KEY (summary_id, tag_id)
);
""",
    )
    try:
        for statement in ddl_statements:
            cursor.execute(statement)
        connection.commit()
    finally:
        cursor.close(); connection.close()
def _sha256(s):
return hashlib.sha256(s.encode('utf-8','ignore')).hexdigest()
def _get_or_create_tag_id(CUR, name):
    """Return the id of tag *name*, inserting the row first when it is absent.

    ON CONFLICT DO NOTHING returns no row for an existing tag, so we fall
    through to a plain SELECT in that case. Returns None if both paths fail.
    """
    try:
        CUR.execute("INSERT INTO useful_info_tag (name) VALUES (%s) ON CONFLICT (name) DO NOTHING RETURNING id", (name,))
        inserted = CUR.fetchone()
        if inserted and inserted[0]:
            return inserted[0]
    except Exception:
        pass  # fall back to looking the tag up
    CUR.execute("SELECT id FROM useful_info_tag WHERE name=%s", (name,))
    found = CUR.fetchone()
    if found:
        return found[0]
    return None
def insert_usefulinfo_record(parsed):
    """
    Insert a summarize_u_info() JSON result into Postgres.
    Expected keys: source, date, tags (list), short, summary, events (list of {dt,length,title,description}).
    Dedups summaries using a stable hash; links tags and events via link tables.
    Returns summary_id.
    """
    if not isinstance(parsed, dict):
        return None
    source = parsed.get('source')
    date_label = parsed.get('date')
    short_text = parsed.get('short') or ''
    summary_text = parsed.get('summary') or ''
    tags = parsed.get('tags') or []
    events = parsed.get('events') or []
    # Stable dedup key over the identifying fields.
    s_hash = _sha256((source or '') + "\n" + (date_label or '') + "\n" + short_text + "\n" + summary_text)
    CON, CUR = db()
    summary_id = None
    try:
        # Upsert the summary row; on re-import the text columns are refreshed in place.
        CUR.execute(
            """
INSERT INTO useful_info_summary
(summary_hash, source, date_label, short_text, summary_text)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (summary_hash)
DO UPDATE SET short_text=EXCLUDED.short_text, summary_text=EXCLUDED.summary_text
RETURNING id
""",
            (s_hash, source, date_label, short_text, summary_text)
        )
        row = CUR.fetchone()
        summary_id = row[0] if row else None
        # Tags
        if summary_id and isinstance(tags, list):
            for t in tags:
                if not t:
                    continue
                tag_id = _get_or_create_tag_id(CUR, str(t))
                if tag_id:
                    try:
                        CUR.execute(
                            "INSERT INTO useful_info_summary_tag (summary_id, tag_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                            (summary_id, tag_id)
                        )
                    except Exception:
                        pass  # best-effort: a failed tag link does not abort the record
        # Events
        if summary_id and isinstance(events, list):
            for e in events:
                try:
                    CUR.execute(
                        """
INSERT INTO useful_info_event (dt, length, title, description)
VALUES (%s, %s, %s, %s)
RETURNING id
""",
                        (
                            (e or {}).get('dt'),
                            (e or {}).get('length'),
                            (e or {}).get('title'),
                            (e or {}).get('description'),
                        )
                    )
                    evrow = CUR.fetchone()
                    if evrow and evrow[0]:
                        CUR.execute(
                            "INSERT INTO useful_info_summary_event (summary_id, event_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
                            (summary_id, evrow[0])
                        )
                except Exception:
                    pass  # best-effort per event; remaining events still processed
        CON.commit()
    finally:
        CUR.close(); CON.close()
    return summary_id
def export_usefulinfo_events_to_ics(filepath='cache/useful_info_events.ics'):
    """Export events from useful info tables to an .ics file.
    - Attempts to parse dt and length into DTSTART/DTEND.
    - Includes title (SUMMARY), description (DESCRIPTION), and tags (CATEGORIES).
    """
    from datetime import datetime, timedelta
    from dateutil import parser as dtparser
    CON, CUR = db()
    try:
        # Pull events with linked summary and aggregated tags
        CUR.execute(
            """
SELECT e.id, e.dt, e.length, e.title, e.description,
s.source, s.date_label, s.short_text,
COALESCE(array_agg(t.name) FILTER (WHERE t.name IS NOT NULL), '{}') AS tags
FROM useful_info_event e
JOIN useful_info_summary_event se ON se.event_id = e.id
JOIN useful_info_summary s ON s.id = se.summary_id
LEFT JOIN useful_info_summary_tag st ON st.summary_id = s.id
LEFT JOIN useful_info_tag t ON t.id = st.tag_id
GROUP BY e.id, s.source, s.date_label, s.short_text
ORDER BY e.id
"""
        )
        rows = CUR.fetchall()
    finally:
        CUR.close(); CON.close()
    def _parse_minutes(length_str):
        # Interpret a free-form length: bare numbers <= 12 are treated as hours,
        # larger bare numbers as minutes; otherwise sum '2h 30m'-style tokens.
        if not length_str:
            return 60
        try:
            n = int(length_str)
            if n <= 12:
                return n * 60
            return n
        except Exception:
            pass
        m = re.findall(r"(\d+(?:\.\d+)?)\s*([hm])", length_str, flags=re.I)
        minutes = 0
        if m:
            for num, unit in m:
                try:
                    val = float(num)
                    if unit.lower() == 'h':
                        minutes += int(val * 60)
                    else:
                        minutes += int(val)
                except Exception:
                    pass
            if minutes > 0:
                return minutes
        return 60  # default: one hour
    def _has_time_component(s):
        # True when the dt string carries a clock time (hh:mm or am/pm marker).
        if not s:
            return False
        if re.search(r"\d\d?:\d\d", s):
            return True
        if re.search(r"\b(am|pm)\b", s, re.I):
            return True
        return False
    def _format_dt(dtobj):
        # Local time (floating) format
        return dtobj.strftime('%Y%m%dT%H%M%S')
    now_utc = datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
    lines = []
    lines.append('BEGIN:VCALENDAR')
    lines.append('VERSION:2.0')
    lines.append('PRODID:-//canvasapp//Useful Info Events//EN')
    lines.append('CALSCALE:GREGORIAN')
    lines.append('METHOD:PUBLISH')
    lines.append('X-WR-CALNAME:Useful Info')
    for r in rows:
        # Column order matches the SELECT above.
        ev_id = r[0]
        dt_str = r[1]
        length_str = r[2]
        title = r[3] or ''
        desc = r[4] or ''
        source = r[5] or ''
        date_label = r[6] or ''
        short_text = r[7] or ''
        tags = r[8] or []
        # Try to parse DTSTART/DTEND
        all_day = not _has_time_component(str(dt_str))
        dtstart = None
        dtend = None
        try:
            if dt_str:
                parsed = dtparser.parse(str(dt_str), fuzzy=True)
                if all_day:
                    # All-day event
                    dtstart = parsed.date()
                    dtend = (parsed.date() + timedelta(days=1))
                else:
                    dtstart = parsed
                    minutes = _parse_minutes(str(length_str))
                    dtend = parsed + timedelta(minutes=minutes)
        except Exception:
            # If we cannot parse date, skip this event
            continue
        lines.append('BEGIN:VEVENT')
        lines.append('UID:usefulinfo-event-%s@gavilan' % ev_id)
        lines.append('DTSTAMP:%s' % now_utc)
        if all_day and dtstart and dtend:
            lines.append('DTSTART;VALUE=DATE:%s' % dtstart.strftime('%Y%m%d'))
            lines.append('DTEND;VALUE=DATE:%s' % dtend.strftime('%Y%m%d'))
        elif dtstart and dtend:
            lines.append('DTSTART:%s' % _format_dt(dtstart))
            lines.append('DTEND:%s' % _format_dt(dtend))
        if title:
            lines.append('SUMMARY:%s' % title.replace('\n', ' ').replace('\r', ' '))
        # Append summary context and provenance to the description body.
        full_desc = desc
        extra = []
        if short_text:
            extra.append('Context: ' + short_text)
        if source or date_label:
            extra.append('Source: %s Date label: %s' % (source, date_label))
        if extra:
            if full_desc:
                full_desc += '\n\n' + '\n'.join(extra)
            else:
                full_desc = '\n'.join(extra)
        if full_desc:
            # Basic escaping of commas/semicolons per RFC is often needed; we keep it simple here
            lines.append('DESCRIPTION:%s' % full_desc.replace('\r', ' ').replace('\n', '\\n'))
        if tags:
            try:
                cats = ','.join([t for t in tags if t])
                if cats:
                    lines.append('CATEGORIES:%s' % cats)
            except Exception:
                pass
        lines.append('END:VEVENT')
    lines.append('END:VCALENDAR')
    # Write file
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write("\r\n".join(lines) + "\r\n")
if __name__ == "__main__":
    print('')
    # Maintenance task menu: number -> [label, callable].
    options = {
        1: ['all teachers', teachers_by_term],
        2: ['courses in term', courses_in_term],
        3: ['pages in term', pages_in_term],
        4: ['new students this semester', users_new_this_semester],
        5: ['all semester courses + teachers', all_sem_courses_teachers],
        6: ['Populate schedule table and correlate to courses table', courses_to_sched],
        7: ['refresh db schedule 1 semester', refresh_semester_schedule_db],
    }
    if len(sys.argv) > 1 and re.search(r'^\d+', sys.argv[1]):
        # Task number given on the command line.
        resp = int(sys.argv[1])
        print("\n\nPerforming: %s\n\n" % options[resp][0])
    else:
        # Interactive menu.
        print('')
        for key in options:
            print(f"{key}.\t{options[key][0]}")
        print('')
        resp = input('Choose: ')
    # Call the function in the options dict
    options[int(resp)][1]()