import util import codecs, json, requests, re, csv, datetime, os, jsondiff, os.path import sys, shutil, hmac, hashlib, base64, schedule, time, pathlib from datetime import timedelta from canvas_secrets import apiKey, apiSecret, FTP_SITE, FTP_USER, FTP_PW, url, domain, account_id, header, header_media from canvas_secrets import instructure_url, instructure_username, instructure_private_key import os, asyncio from dap.api import DAPClient from dap.dap_types import Credentials from dap.integration.database import DatabaseConnection from dap.replicator.sql import SQLReplicator """ Everything to do with fetching data, - From iLearn, via token - current roster uploads from instructures sftp site - raw logs and other from canvas data repo - from ssb, use firefox to scrape the schedule And some subsequent processing: - Raw roster files, into a more compact json format - Raw logs into something more useful """ verbose = False users = {} users_by_id = {} # todo: all these constants for SSB -- line 1008 # # todo: https://stackoverflow.com/questions/42656247/how-can-i-use-canvas-data-rest-api-using-python sys.setrecursionlimit( 100000 ) local_data_folder = 'cache/canvas_data/' mylog = codecs.open(local_data_folder + 'temp_log.txt','w') class FetchError(Exception): pass DEBUG = 0 def d(s,end=''): global DEBUG if end and DEBUG: print(s,end=end) elif DEBUG: print(s) ################ ################ CANVAS API MAIN FETCHING FUNCTIONS ################ ################ ################ # Main canvas querying fxn def fetch(target,verbose=0,params=0,media=0): # if there are more results, recursivly call myself, adding on to the results. results = 0 if target[0:4] != "http": target = url + target if verbose: print("++ Fetching: " + target) if media: r2 = requests.get(target, headers = header_media) elif params: r2 = requests.get(target, headers = header, params = params) else: r2 = requests.get(target, headers = header) #if verbose: #print "++ Got: " + r2.text try: results = json.loads(r2.text) count = len(results) except: print("-- Failed to parse: ", r2.text) if verbose: print("Got %i results" % count) if verbose > 1: print(r2.headers) tempout = codecs.open('cache/fetchcache.txt','a','utf-8') tempout.write(r2.text+"\n\n") tempout.close() if ('link' in r2.headers and count > 0): links = r2.headers['link'].split(',') for L in links: ll = L.split(';') link = ll[0].replace("<","") link = link.replace(">","") if re.search(r'next', ll[1]): if (verbose): print("++ More link: " + link) #link = re.sub(r'per_page=10$', 'per_page=100', link) # link.replace('per_page=10','per_page=500') #if (verbose): print("++ More link: " + link) nest = fetch(link,verbose,params,media) if isinstance(results,dict): results.update(nest) else: results.extend(nest) return results # Main canvas querying fxn - stream version - don't die on big requests def fetch_stream(target,verbose=0): # if there are more results, recursivly call myself, adding on to the results. results = 0 while target: if target[0:4] != "http": target = url + target if verbose: print("++ Fetching: " + target) r2 = requests.get(target, headers = header) if r2.status_code == 502: raise FetchError() try: results = json.loads(r2.text) count = len(results) except: print("-- Failed to parse: ", r2.text) if verbose: print("Got %i results" % count) if verbose > 1: print(r2.headers) tempout = codecs.open('cache/fetchcache.txt','a','utf-8') tempout.write(r2.text+"\n\n") tempout.close() next_link_found = 0 if ('link' in r2.headers and count > 0): links = r2.headers['link'].split(',') for L in links: ll = L.split(';') link = ll[0].replace("<","") link = link.replace(">","") if re.search(r'next', ll[1]): target = link next_link_found = 1 break if not next_link_found: target = 0 yield results # for dicts with one key, collapse that one key out, cause # paging makes problems... example: enrollment_terms def fetch_collapse(target,collapse='',verbose=0): # if there are more results, recursivly call myself, adding on to the results. results = 0 if target[0:4] != "http": target = url + target if verbose: print("++ Fetching: " + target) r2 = requests.get(target, headers = header) #if verbose: #print "++ Got: " + r2.text try: results = json.loads(r2.text) except: print("-- Failed to parse: ", r2.text) if verbose: print(r2.headers) if collapse and collapse in results: results = results[collapse] if ('link' in r2.headers): links = r2.headers['link'].split(',') for L in links: ll = L.split(';') link = ll[0].replace("<","") link = link.replace(">","") if re.search(r'next', ll[1]): if (verbose): print("++ More link: " + link) nest = fetch_collapse(link, collapse, verbose) if isinstance(results,dict): results.update(nest) else: results.extend(nest) return results ################ ################ CANVAS DATA ################ ################ ################ # Get canvas data 2024 style def canvas_data_2024_run(): print("Updating all tables.") asyncio.run(canvas_data_2024()) print("Done with all tables.") async def canvas_data_2024(): base_url: str = os.environ["DAP_API_URL"] client_id: str = os.environ["DAP_CLIENT_ID"] client_secret: str = os.environ["DAP_CLIENT_SECRET"] #connection_string: str = "postgresql://postgres:rolley34@192.168.1.6/db" # todo: use secrets connection_string: str = "postgresql://postgres:rolley34@192.168.1.199/db" desired_tables = "users,courses,communication_channels,context_modules,conversation_message_participants,conversation_messages,conversation_participants,conversations,course_sections,enrollment_states,enrollment_dates_overrides,enrollment_terms,enrollments,learning_outcome_groups,learning_outcome_question_results,learning_outcomes,pseudonyms,quizzes,scores,submissions,submission_versions,wiki_pages,wikis".split(',') credentials = Credentials.create(client_id=client_id, client_secret=client_secret) async with DatabaseConnection(connection_string).open() as db_connection: async with DAPClient(base_url, credentials) as session: #tables = await session.get_tables("canvas") for table in desired_tables: print(f" trying to update {table} ") try: #await SQLReplicator(session, db_connection).initialize("canvas", table) await SQLReplicator(session, db_connection).synchronize("canvas", table) except Exception as e: print(f" - skipping {table} because {e}") # Get canvas data 2024 style def setup_canvas_data_2024_run(): print("Setting up all tables.") asyncio.run(setup_canvas_data_2024()) print("Done with all tables.") async def setup_canvas_data_2024(): base_url: str = os.environ["DAP_API_URL"] client_id: str = os.environ["DAP_CLIENT_ID"] client_secret: str = os.environ["DAP_CLIENT_SECRET"] #connection_string: str = "postgresql://postgres:rolley34@192.168.1.6/db" connection_string: str = "postgresql://postgres:rolley34@192.168.1.192/db" desired_tables = "users,courses,communication_channels,context_modules,conversation_message_participants,conversation_messages,conversation_participants,conversations,course_sections,enrollment_states,enrollment_dates_overrides,enrollment_terms,enrollments,learning_outcome_groups,learning_outcome_question_results,learning_outcomes,pseudonyms,quizzes,scores,submissions,submission_versions,wiki_pages,wikis".split(',') credentials = Credentials.create(client_id=client_id, client_secret=client_secret) async with DatabaseConnection(connection_string).open() as db_connection: async with DAPClient(base_url, credentials) as session: #tables = await session.get_tables("canvas") for table in desired_tables: print(f" {table}") try: await SQLReplicator(session, db_connection).initialize("canvas", table) except Exception as e: print(f" - skipping {table} because {e}") ################ ################ ROSTERS AND REGISTRATION ################ ################ ################ # todo: the pipeline is disorganized. Organize it to have # a hope of taking all this to a higher level. # # todo: where does this belong in the pipeline? compare with recent_schedules() # Take the generically named rosters uploads files and move them to a semester folder and give them a date. def move_to_folder(sem,year,folder,files): semester = year+sem semester_path = 'cache/rosters/%s' % semester if not os.path.isdir('cache/rosters/'+semester): os.makedirs('cache/rosters/'+semester) now = datetime.datetime.now().strftime('%Y-%m-%dT%H-%M') print("+ Moving roster files to folder: %s" % semester_path) if not os.path.isdir(semester_path): print("+ Creating folder: %s" % semester_path) os.makedirs(semester_path) def safe_move(src, dst): try: os.replace(src, dst) except Exception as ex: print(f"! move failed {src} -> {dst}: {ex}") if 'courses.csv' in files: safe_move('cache/rosters/courses-%s.csv' % folder, 'cache/rosters/%s/courses.%s.csv' % (semester,now)) if 'enrollments.csv' in files: safe_move('cache/rosters/enrollments-%s.csv' % folder, 'cache/rosters/%s/enrollments.%s.csv' % (semester,now)) if 'users.csv' in files: safe_move('cache/rosters/users-%s.csv' % folder, 'cache/rosters/%s/users.%s.csv' % (semester,now)) if 'login.csv' in files: safe_move('cache/rosters/login-%s.csv' % folder, 'cache/rosters/%s/login.%s.csv' % (semester,now)) # Build maps from the latest users/courses snapshot for nicer keys/names. # Return the path to the latest `{prefix}.*.csv` file found under `sem_path`. def _latest_snapshot_map(sem_path, prefix): """Return path to latest `{prefix}.*.csv` file in `sem_path` or None if missing.""" try: files = [f for f in os.listdir(sem_path) if f.startswith(prefix + '.') and f.endswith('.csv')] except FileNotFoundError: return None def ts_of(name): try: label = name[len(prefix)+1:-4] return datetime.datetime.strptime(label, '%Y-%m-%dT%H-%M') except Exception: return datetime.datetime.min files.sort(key=ts_of) return os.path.join(sem_path, files[-1]) if files else None # Helper to read CSV safely into list of dicts. # Read a CSV file into a list of dict rows; return [] if missing. def _read_csv_dicts(path): """Read a CSV file into a list of normalized dict rows; returns [] if file missing. - Normalizes header keys to lowercase and strips whitespace. - Strips whitespace from string values. """ rows = [] if not path or not os.path.exists(path): return rows with open(path, 'r', encoding='utf-8', newline='') as f: reader = csv.DictReader(f) for r in reader: norm = {} for k, v in (r.items() if r else []): nk = (k.strip().lower() if isinstance(k, str) else k) if isinstance(v, str): v = v.strip() norm[nk] = v if norm: rows.append(norm) return rows # Create user lookup keyed by `user_id` with basic fields for convenience. # Expected columns: status,user_id,login_id,last_name,first_name,email,password def _build_user_map(users_csv_path): """Return dict keyed by `user_id` with selected fields from users.csv. Expected columns: status,user_id,login_id,last_name,first_name,email,password """ user_map = {} for r in _read_csv_dicts(users_csv_path): uid = r.get('user_id') if not uid: continue user_map[str(uid)] = { 'user_id': str(uid), 'login_id': r.get('login_id',''), 'first_name': r.get('first_name',''), 'last_name': r.get('last_name',''), 'email': r.get('email',''), } return user_map # Create course lookup keyed by `course_id` with long/short names and term. # Expected columns: status,term_id,long_name,short_name,course_id,blueprint_course_id def _build_course_map(courses_csv_path): """Return dict keyed by `course_id` from courses.csv, keeping long/short names. Expected columns: status,term_id,long_name,short_name,course_id,blueprint_course_id """ course_map = {} for r in _read_csv_dicts(courses_csv_path): cid = r.get('course_id') if not cid: continue course_map[str(cid)] = { 'course_id': str(cid), 'term_id': r.get('term_id',''), 'long_name': r.get('long_name',''), 'short_name': r.get('short_name',''), } return course_map # Parse a timestamp label like 2025-01-31T14-00 into dt. # Return datetime or None if parsing fails. def _parse_label(label): """Parse the timestamp label from filenames into a datetime; return None on failure.""" try: return datetime.datetime.strptime(label, '%Y-%m-%dT%H-%M') except Exception: return None # Compute enrollment changes across semester snapshots and emit JSON indexes. # Walk enrollments/users/courses snapshots; detect adds/drops/changes; write by_course/by_user JSON. def compute_enrollment_changes(sem=None, year=None): """Walk cache/rosters//enrollments.*.csv ascending, detect adds/drops/changes. - If `sem`/`year` omitted, prompt for a semester and resolve via semesters.find_term. - Emits JSON files by course and by user for easy UI lookup. """ if not sem or not year: try: import semesters ans = input("Which semester? (e.g., 'fa25', 'Fall 2025', '2025 Fall'): ").strip() rec = semesters.find_term(ans) if not rec or not rec.get('standard'): print("compute_enrollment_changes: could not parse semester input.") return std = rec['standard'] # e.g., 'Fall 2025' parts = std.split() season = (parts[0].lower() if len(parts) >= 2 else '').lower() year = parts[1] if len(parts) >= 2 else '' season_map = {'spring': 'spring', 'summer': 'summer', 'fall': 'fall', 'winter': 'winter'} sem = season_map.get(season, season) except Exception as ex: print(f"compute_enrollment_changes: semester prompt failed: {ex}") return semester = f"{year}{sem}" sem_path = os.path.join('cache', 'rosters', semester) if not os.path.isdir(sem_path): print(f"compute_enrollment_changes: missing folder {sem_path}") return # Discover all enrollment snapshots in time order files = [f for f in os.listdir(sem_path) if f.startswith('enrollments.') and f.endswith('.csv')] def snap_key(name): label = name[len('enrollments.'):-4] dt = _parse_label(label) return (dt or datetime.datetime.min, label) files.sort(key=snap_key) if not files: print(f"compute_enrollment_changes: no snapshots in {sem_path}") return # Build user/course maps from latest snapshots for enrichment latest_users = _latest_snapshot_map(sem_path, 'users') latest_courses = _latest_snapshot_map(sem_path, 'courses') users_map = _build_user_map(latest_users) courses_map = _build_course_map(latest_courses) # Collect remote login info across all login snapshots remote_info = {} login_files = [f for f in os.listdir(sem_path) if f.startswith('login.') and f.endswith('.csv')] def login_key(name): label = name[len('login.'):-4] dt = _parse_label(label) return (dt or datetime.datetime.min, label) login_files.sort(key=login_key) for fname in login_files: for r in _read_csv_dicts(os.path.join(sem_path, fname)): uid = r.get('user_id') if not uid: continue remote_info[str(uid)] = { 'remote': True, 'root_account': r.get('root_account','') } # merge remote flags into users_map for uid, info in remote_info.items(): users_map.setdefault(uid, {'user_id': uid, 'login_id':'', 'first_name':'', 'last_name':'', 'email':''}) users_map[uid].update(info) def choose_course_key(course_id): cid = str(course_id or 'unknown') detail = courses_map.get(cid, {}) # Course key at CRN level uses the course_id directly (e.g., 202570-12345) key = cid info = {'course_id': cid, 'long_name': detail.get('long_name',''), 'short_name': detail.get('short_name',''), 'term_id': detail.get('term_id','')} return key, info def choose_user_key(user_id): uid = str(user_id or 'unknown') info = users_map.get(uid, {}) # Prefer to show user_id (Canvas/SIS ID here), along with convenience fields return uid, { 'user_id': uid, 'login_id': info.get('login_id',''), 'first_name': info.get('first_name',''), 'last_name': info.get('last_name',''), 'email': info.get('email',''), 'remote': info.get('remote', False), 'root_account': info.get('root_account',''), } # Accumulators by_course = {} by_user = {} prev = {} for fname in files: label = fname[len('enrollments.'):-4] snap_time = _parse_label(label) path = os.path.join(sem_path, fname) curr = {} # Build state for this snapshot keyed by (course,user) for r in _read_csv_dicts(path): user_id = r.get('user_id') course_id = r.get('course_id') if not user_id and not course_id: continue key = (str(course_id or ''), str(user_id)) curr[key] = { 'status': r.get('status') or r.get('enrollment_state') or r.get('state') or '', 'role': r.get('role') or r.get('type') or '', } # Compare with previous snapshot (including the first, using empty prev for baseline) all_keys = set(prev.keys()) | set(curr.keys()) for k in all_keys: before = prev.get(k) after = curr.get(k) course_id, user_id = k course_key, course_info = choose_course_key(course_id) user_key, user_info = choose_user_key(user_id) def emit(action, extra=None): evt = { 'time': (snap_time.isoformat(timespec='minutes') if snap_time else label), 'action': action, 'course_key': course_key, 'course': course_info, 'user_key': user_key, 'user': user_info, 'role': (after or before or {}).get('role',''), 'status': (after or before or {}).get('status',''), } if before: evt['before'] = before if after: evt['after'] = after by_course.setdefault(course_key, []).append(evt) by_user.setdefault(user_key, []).append(evt) if before and not after: # Row disappeared; if last known status was deleted, count as drop; otherwise record anomaly. if (before.get('status','').lower() == 'deleted'): emit('drop') else: emit('enrollment_row_removed') elif after and not before: # New row; if active, it's an add; otherwise note row added. if (after.get('status','').lower() == 'active'): emit('add') elif (after.get('status','').lower() == 'deleted'): emit('drop') else: emit('enrollment_row_added') elif before and after: # detect attribute changes role_changed = before.get('role') != after.get('role') status_changed = before.get('status') != after.get('status') if status_changed: if str(after.get('status','')).lower() == 'active': emit('add') elif str(after.get('status','')).lower() == 'deleted': emit('drop') else: emit('status_change') if role_changed: emit('role_change') prev = curr # Also detect appearance/disappearance in users.csv and courses.csv sequences def diff_entities(prefix, id_field, emit_fn): seq = [f for f in os.listdir(sem_path) if f.startswith(prefix + '.') and f.endswith('.csv')] def key_fn(name): label = name[len(prefix)+1:-4] dt = _parse_label(label) return (dt or datetime.datetime.min, label) seq.sort(key=key_fn) prev_ids = set() for fname in seq: label = fname[len(prefix)+1:-4] snap_time = _parse_label(label) curr_ids = set() for r in _read_csv_dicts(os.path.join(sem_path, fname)): vid = r.get(id_field) if vid: curr_ids.add(str(vid)) # added for vid in sorted(curr_ids - prev_ids): emit_fn('added', vid, snap_time, label) # removed for vid in sorted(prev_ids - curr_ids): emit_fn('removed', vid, snap_time, label) prev_ids = curr_ids def emit_user_presence(action, uid, snap_time, label): user_key, user_info = choose_user_key(uid) evt = { 'time': (snap_time.isoformat(timespec='minutes') if snap_time else label), 'action': f'user_entry_{action}', 'user_key': user_key, 'user': user_info, } by_user.setdefault(user_key, []).append(evt) def emit_course_presence(action, cid, snap_time, label): course_key, course_info = choose_course_key(cid) evt = { 'time': (snap_time.isoformat(timespec='minutes') if snap_time else label), 'action': f'course_entry_{action}', 'course_key': course_key, 'course': course_info, } by_course.setdefault(course_key, []).append(evt) diff_entities('users', 'user_id', emit_user_presence) diff_entities('courses', 'course_id', emit_course_presence) # Sort events by time def sort_key(e): try: return datetime.datetime.fromisoformat(e['time']) except Exception: return datetime.datetime.min for k in by_course: by_course[k].sort(key=sort_key) for k in by_user: by_user[k].sort(key=sort_key) # Write results out_all = { 'semester': semester, 'generated': datetime.datetime.now().isoformat(timespec='seconds'), 'by_course': by_course, 'by_user': by_user, } try: with open(os.path.join(sem_path, 'enrollment_changes.json'), 'w', encoding='utf-8') as f: f.write(json.dumps(out_all, indent=2)) print(f"compute_enrollment_changes: wrote {sem_path}/enrollment_changes.json") except Exception as ex: print(f"compute_enrollment_changes: failed to write output: {ex}") # Take raw upload (csv) files and make one big json out of them. # This relates to enrollment files, not schedule. def convert_roster_files(semester="",year="",folder=""): if not semester: semester = input("the semester? (ex: spring) ") folder = input("Folder? (ex 2020-02-25-14-58-20) ") uf = open('cache/rosters/users-'+folder+'.csv','r') cf = open('cache/rosters/courses-'+folder+'.csv','r') ef = open('cache/rosters/enrollments-'+folder+'.csv','r') u = csv.DictReader(uf) c = csv.DictReader(cf) e = csv.DictReader(ef) uu = [i for i in u] cc = [i for i in c] ee = [i for i in e] uf.close() cf.close() ef.close() myrosterfile = 'cache/rosters/roster_%s_%s.json' % (year, semester) if os.path.exists(myrosterfile): print(" -- Moving previous combined roster json file. opening %s ..." % myrosterfile) last_fileobj = open(myrosterfile,'r') last_file = json.load(last_fileobj) last_fileobj.close() info = last_file[3] last_date = info['date_filestring'] print(' -- writing: cache/rosters/%s%s/roster_%s.json ...' % (year,semester,last_date)) try: os.rename(myrosterfile, 'cache/rosters/%s%s/roster_%s.json ...' % (year,semester,last_date)) print(' -- ok') except Exception as e: print(" ** Failed because i couldn't move the previous roster file: %s" % myrosterfile) print(e) myrosterfile = "new_" + myrosterfile pass #os.remove('cache/old_rosters/roster_'+semester+'.'+last_date+'.json') #os.rename(myrosterfile, 'cache/old_rosters/roster_'+semester+'.'+last_date+'.json') newinfo = {'date_filestring': datetime.datetime.now().strftime('%Y-%m-%dT%H-%M'), } try: new_roster = codecs.open(myrosterfile,'w', 'utf-8') new_roster.write( json.dumps( [uu,cc,ee,newinfo], indent=2 )) new_roster.close() print(" -- Wrote roster info to: %s." % myrosterfile) except Exception as e: print(" ** Failed because i couldn't move the previous roster file: %s" % myrosterfile) print(" ** " + str(e)) # From instructure sftp site def fetch_current_rosters(sftp=None, label_hour=None): """Download roster CSVs from Instructure SFTP and post-process. - If sftp provided, reuse it (already chdir'd to SIS). - Files are saved using label_hour (format YYYY-MM-DD-HH). If None, compute fallback label by flooring to the hour. """ import pysftp def log(msg): ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') try: with open('cache/pipeline.log.txt','a', encoding='utf-8') as f: f.write(f"[{ts}] {msg}\n") except Exception: print(msg) close_after = False if sftp is None: cnopts = pysftp.CnOpts() cnopts.hostkeys = None sftp = pysftp.Connection(instructure_url, username=instructure_username, private_key=instructure_private_key, cnopts=cnopts) sftp.chdir('SIS') close_after = True try: now = datetime.datetime.now() if not label_hour: # default fallback: floor to hour label_hour = now.replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d-%H') files = set(sftp.listdir()) log(f"fetch_current_rosters: label={label_hour} files={sorted(files)}") need = ['login','users','courses','enrollments'] saved = [] for name in need: remote = f'{name}.csv' target = f'cache/rosters/{name}-{label_hour}.csv' try: if remote in files: if not (os.path.exists(target) and os.path.getsize(target) > 0): temp = target + '.part' try: if os.path.exists(temp): os.remove(temp) except Exception: pass sftp.get(remote, temp) # atomic replace os.replace(temp, target) saved.append(remote) log(f' saved {remote} -> {target}') else: log(f' already exists: {target}, skipping') else: log(f' missing on server: {remote}') except Exception as ex: log(f' download failed: {remote} -> {target} err={ex}') if len(saved) >= 3 and 'courses.csv' in saved or os.path.exists(f'cache/rosters/courses-{label_hour}.csv'): try: with open(f'cache/rosters/courses-{label_hour}.csv','r') as courses: courses.readline() a = courses.readline() parts = a.split(',') year = parts[1][0:4] ss = parts[1][4:6] sem = {'30':'spring', '50':'summer', '70':'fall' } this_sem = sem.get(ss, 'spring') log(f"post-process for semester={this_sem} year={year} label={label_hour}") convert_roster_files(this_sem,year,label_hour) move_to_folder(this_sem,year,label_hour,saved) # After moving into semester folder, compute enrollment changes timeline try: compute_enrollment_changes(this_sem, year) except Exception as ex_changes: log(f'enrollment change computation failed: {ex_changes}') except Exception as expp: log(f'post-processing failed: {expp}') else: log('not enough files to post-process yet') finally: if close_after: try: sftp.close() except Exception: pass def fetch_current_rosters_auto(poll_seconds=15): """Poll Instructure SFTP from the top of each hour until all 4 roster files appear, then download once. - Robust logging to cache/pipeline.log.txt - Stops polling for the remainder of that hour after success - Tries again on the next hour """ import pysftp from contextlib import contextmanager log_path = 'cache/pipeline.log.txt' def log(msg): ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') try: with open(log_path, 'a', encoding='utf-8') as f: f.write(f"[{ts}] {msg}\n") except Exception: print(f"[log-fail] {msg}") @contextmanager def sftp_conn(): cnopts = pysftp.CnOpts() cnopts.hostkeys = None conn = None try: conn = pysftp.Connection(instructure_url, username=instructure_username, private_key=instructure_private_key, cnopts=cnopts) conn.chdir('SIS') yield conn finally: try: if conn: conn.close() except Exception: pass required = {'login.csv', 'users.csv', 'courses.csv', 'enrollments.csv'} current_window = None # e.g., '2025-08-30-14' window_done = False window_end = None def compute_window(now): """Return (label_hour_str, window_end_dt) if within a window, else (None, None). Window spans [H-5m, H+5m] around each top-of-hour H. If minute >= 55 -> window is next hour If minute <= 5 -> window is current hour Else -> not in window. """ m = now.minute if m >= 55: base = (now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)) elif m <= 5: base = now.replace(minute=0, second=0, microsecond=0) else: return (None, None) label = base.strftime('%Y-%m-%d-%H') end = base + timedelta(minutes=5) # end of window return (label, end) log('fetch_current_rosters_auto: starting poll loop') while True: now = datetime.datetime.now() hour_key = now.strftime('%Y-%m-%d %H') label, enddt = compute_window(now) if label is None: time.sleep(max(5, int(poll_seconds))) continue if label != current_window: current_window = label window_done = False window_end = enddt log(f'New window: {current_window} (until {window_end.strftime("%H:%M:%S")}). Resetting state.') if window_done: time.sleep(5) continue # Poll SFTP for files try: with sftp_conn() as sftp: files = set(sftp.listdir()) missing = list(required - files) if missing: log(f'Not ready yet. Missing: {missing}') else: log('All required files present inside window. Starting download...') try: fetch_current_rosters(sftp=sftp, label_hour=current_window) log('Download complete. Marking window_done for this window.') window_done = True except Exception as ex_dl: import traceback log('Download failed: ' + str(ex_dl)) log(traceback.format_exc()) except Exception as ex_list: import traceback log('SFTP list failed: ' + str(ex_list)) log(traceback.format_exc()) # Check for window timeout try: if not window_done and window_end and now >= window_end: log(f'REPORT: window {current_window} ended without all files. Consider alerting/email.') window_done = True # stop further checks this window except Exception: pass time.sleep(max(5, int(poll_seconds))) ################ ################ SENDING DATA AWAY ################ ################ ################ # Upload a json file to www def put_file(remotepath,localpath, localfile,prompt=1): import pysftp show_all = 0 folder = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') cnopts = pysftp.CnOpts() cnopts.hostkeys = None with pysftp.Connection(FTP_SITE,username=FTP_USER, password=FTP_PW,cnopts=cnopts) as sftp: #todo: these paths #files = sftp.listdir() #print(folder + "\tI see these files on remote: ", files, "\n") sftp.chdir(remotepath) files = sftp.listdir() if show_all: print(folder + "\tI see these files on remote: ", files, "\n") localf = os.listdir(localpath) if show_all: print("I see these local: ", localf) if prompt: input('ready to upload') sftp.put(localpath+localfile, localfile, preserve_mtime=True) sftp.close() def process_reg_history(term='fa25'): from collections import defaultdict from itertools import groupby from operator import itemgetter def read_grouped_csv(path): with open(path, newline='') as f: fieldnames = ['datetime', 'crn', 'course', 'teacher', 'max', 'enrolled', 'waitlistmax', 'waitlisted'] reader = csv.DictReader(f, fieldnames=fieldnames) rows = sorted(reader, key=lambda r: r['datetime']) # Group by timestamp grouped = {} for ts, group in groupby(rows, key=itemgetter('datetime')): grouped[ts] = {r['crn']: r for r in group} return grouped def crossed_threshold(old_val, new_val, max_val): thresholds = [0.25, 0.5, 0.75, 1.0] if int(max_val) == 0: return False, None old_ratio = int(old_val) / int(max_val) new_ratio = int(new_val) / int(max_val) for t in thresholds: if old_ratio < t <= new_ratio: return True, int(t * 100) return False, None def detect_changes(prev, curr): changes = defaultdict(list) all_crns = prev.keys() | curr.keys() for crn in all_crns: o, n = prev.get(crn), curr.get(crn) if not o: changes[crn].append((n['datetime'], "Section was added.")) elif not n: changes[crn].append(( o['datetime'], f"Section was removed (last seen: teacher {o['teacher']}, " f"{o['enrolled']}/{o['max']} enrolled, {o['waitlisted']}/{o['waitlistmax']} waitlisted)." )) else: dt = n['datetime'] if o['teacher'] != n['teacher']: changes[crn].append((dt, f"Teacher changed from {o['teacher']} to {n['teacher']}.")) if o['enrolled'] != n['enrolled']: crossed, percent = crossed_threshold(o['enrolled'], n['enrolled'], n['max']) if crossed: changes[crn].append((dt, f"Enrollment crossed {percent}% ({n['enrolled']}/{n['max']}).")) if int(n['waitlisted']) > 10 and o['waitlisted'] != n['waitlisted']: changes[crn].append((dt, f"Waitlist exceeds 10: {n['waitlisted']}.")) return changes def time_to_iso(s): return datetime.datetime.strptime(s, "%Y-%m-%dT%H-%M").isoformat() def detect_changes_structured(prev, curr): changes = defaultdict(list) all_crns = prev.keys() | curr.keys() for crn in all_crns: o, n = prev.get(crn), curr.get(crn) if not o: changes[crn].append({'time':time_to_iso(n['datetime']), "type":'section update', 'message': "Section was added."}) elif not n: changes[crn].append( {'time':time_to_iso(o['datetime']), "type":'section update', 'message': "Section was removed.", 'value': o['enrolled'], 'capacity': o['max'], }) else: dt = time_to_iso(n['datetime']) if o['teacher'] != n['teacher']: changes[crn].append({'time':dt, "type":'teacher_change', 'message': f"Teacher changed from {o['teacher']} to {n['teacher']}.", 'old_teacher': o['teacher'], 'new_teacher': n['teacher'], }) if o['enrolled'] != n['enrolled']: crossed, percent = crossed_threshold(o['enrolled'], n['enrolled'], n['max']) if crossed: changes[crn].append({'time':dt, "type":'enrollment_milestone', 'message': f"Enrollment crossed {percent}% ({n['enrolled']}/{n['max']}).", 'percent':percent,'value':n['enrolled'],'capacity':n['max'] }) if int(n['waitlisted']) > 10 and o['waitlisted'] < n['waitlisted']: changes[crn].append({'time':dt, "type":'enrollment_milestone', 'message': f"Waitlist exceeds 10: {n['waitlisted']}).", 'value':n['waitlisted']}) return changes def process_diff_timeline(path): snapshots = read_grouped_csv(path) timeline = sorted(snapshots.keys()) timeline_diffs = [] timeline_diffs_structured = [] course_names = {} # crn -> latest known course name for i in range(1, len(timeline)): prev_ts, curr_ts = timeline[i-1], timeline[i] prev, curr = snapshots[prev_ts], snapshots[curr_ts] # update course name map for crn, row in curr.items(): course_names[crn] = row['course'] delta = detect_changes(prev, curr) timeline_diffs.append(delta) delta_structured = detect_changes_structured(prev,curr) timeline_diffs_structured.append(delta_structured) # Flatten and group by crn crn_changes = defaultdict(list) for delta in timeline_diffs: for crn, changes in delta.items(): crn_changes[crn].extend(changes) # Flatten and group by crn crn_changes_structured = defaultdict(list) for delta in timeline_diffs_structured: for crn, changes in delta.items(): crn_changes_structured[crn].extend(changes) # Sort changes for each CRN by datetime for crn in crn_changes: crn_changes[crn].sort(key=lambda x: x[0]) # Sort changes for each CRN by datetime for crn in crn_changes_structured: crn_changes[crn].sort(key=lambda x: x[0]) return crn_changes, crn_changes_structured, course_names fresh_history = requests.get(f"http://gavilan.cc/schedule/reg_history_{term}.csv").text fresh_file = codecs.open(f'cache/reg_history_{term}.csv','w','utf-8') fresh_file.write(fresh_history) fresh_file.close() output1 = codecs.open(f'cache/reg_timeline_{term}.txt','w','utf-8') output2 = codecs.open(f'cache/reg_timeline_{term}.json','w','utf-8') changes, changes_structured, course_names = process_diff_timeline(f"cache/reg_history_{term}.csv") # once for plain text for crn in sorted(changes, key=lambda c: course_names.get(c, "")): course = course_names.get(crn, "") course_output = {'code': course, 'crn':crn,'events':[]} print(f"\n{course} (CRN {crn}):") output1.write(f"\n{course} (CRN {crn}):\n") for dt, msg in changes[crn]: print(f" [{dt}] {msg}") output1.write(f" [{dt}] {msg}\n") course_output['events'].append({'message':msg, 'time':time_to_iso(dt)}) # again for structured crn_list = [] for crn in sorted(changes_structured, key=lambda c: course_names.get(c, "")): course = course_names.get(crn, "") course_output = {'code': course, 'crn':crn,'events':changes_structured[crn]} crn_list.append(course_output) output2.write( json.dumps(crn_list,indent=2) ) output2.close() def recreate_all(): # Use canonical semester short codes from semesters.py try: from semesters import code as SEM_CODES except Exception: SEM_CODES = [] for x in SEM_CODES: try: recreate_reg_data(x) except Exception as e: print(f'Failed on {x} with: {e}') def recreate_reg_data(term="fa25"): from collections import defaultdict from datetime import datetime def parse_row(row): dt = datetime.strptime(row['datetime'], "%Y-%m-%dT%H-%M") crn = row['crn'] enrolled = int(row['enrolled']) return dt, row['datetime'], crn, enrolled def reduce_latest_per_day(rows): latest = defaultdict(dict) # latest[crn][date] = (dt, ts, enrolled) latest_ts_by_date = {} # date → (dt, ts) for header naming for row in rows: dt, full_ts, crn, enrolled = parse_row(row) date_str = dt.date().isoformat() ts_header = dt.strftime("%Y-%m-%dT%H") # <-- this is what we want # for each crn, per day, keep latest reading if date_str not in latest[crn] or dt > latest[crn][date_str][0]: latest[crn][date_str] = (dt, ts_header, enrolled) # also record latest timestamp per day for consistent column headers if date_str not in latest_ts_by_date or dt > latest_ts_by_date[date_str][0]: latest_ts_by_date[date_str] = (dt, ts_header) return latest, [ts for _, ts in sorted(latest_ts_by_date.values())] def pivot_table(latest, headers): crns = sorted(latest) table = [] for crn in crns: row = [crn] for ts in headers: date_str = ts[:10] # match on YYYY-MM-DD val = latest[crn].get(date_str) if val and val[1] == ts: row.append(str(val[2])) else: row.append("") table.append(row) return ['crn'] + headers, table #with open(f"cache/reg_history_{term}.csv", newline='') as f: from io import StringIO url = f"https://gavilan.cc/schedule/reg_history_{term}.csv" # Download resp = requests.get(url) resp.raise_for_status() # raises if bad status # Wrap the text in a file-like object f = StringIO(resp.text) fieldnames = ['datetime', 'crn', 'course', 'teacher', 'max', 'enrolled', 'waitlistmax', 'waitlisted'] reader = csv.DictReader(f, fieldnames=fieldnames) rows = list(reader) latest, headers = reduce_latest_per_day(rows) header_row, table = pivot_table(latest, headers) with open(f"cache/reg_data_{term}.csv", "w", newline='') as f: writer = csv.writer(f) writer.writerow(header_row) writer.writerows(table) def cvc_report(input_csv="cache/cvc_fa25.csv", out_dir="cache"): import os, re import pandas as pd os.makedirs(out_dir, exist_ok=True) # --- Load & normalize --- df = pd.read_csv(input_csv, dtype=str).fillna("") # Trim header/values df.columns = [c.strip() for c in df.columns] for c in ["Status","Home College name","Course","CCCID","Created On"]: if c in df.columns: df[c] = df[c].astype(str).str.strip() # Parse dates for sorting (keep originals) def try_parse(s): # tolerate 'YYYY-MM-DD hh:mm:ss PDT' and blanks try: return pd.to_datetime(s.replace(" PDT","").replace(" PST",""), errors="coerce") except Exception: return pd.NaT df["_created"] = df["Created On"].apply(try_parse) # Sort by "Created On" then "CCCID" df = df.sort_values(by=["_created","CCCID"], kind="mergesort") # Group by CCCID (for student-level view) # Define gc_student per row, then roll up per student (1 if any row shows Gavilan as Home) df["gc_student"] = (df["Home College name"] == "Gavilan College").astype(int) # Normalize status and success flag status_norm = df["Status"].str.lower().str.strip() df["_status"] = status_norm df["success"] = (df["_status"] == "validated").astype(int) # Department code = leading letters before first number in "Course" def dept_from_course(s): # e.g., "CD32 - Intro..." -> "CD" m = re.match(r"\s*([A-Za-z]+)", s or "") return m.group(1).upper() if m else "UNKNOWN" df["dept"] = df["Course"].apply(dept_from_course) # --- Student-level counts (unique CCCID) --- # For each student, take gc_student = max over their rows if "CCCID" not in df.columns or df["CCCID"].eq("").all(): # Fallback: if CCCID missing, treat each row as a unique student stu = df.assign(_stu_id=df.index.astype(str)).groupby("_stu_id", as_index=False)["gc_student"].max() else: stu = df[df["CCCID"]!=""].groupby("CCCID", as_index=False)["gc_student"].max() students_by_gc = stu.groupby("gc_student", as_index=False).size().rename(columns={"size":"count_students"}) students_by_gc.to_csv(os.path.join(out_dir, "students_by_gc.csv"), index=False) # --- Outcome tallies by gc_student (row-level: sections/attempts) --- outcomes_by_gc = df.groupby(["gc_student","success"], as_index=False).size().rename(columns={"size":"count_rows"}) outcomes_by_gc.to_csv(os.path.join(out_dir, "outcomes_by_gc.csv"), index=False) # --- Department tallies, split by success and gc_student (row-level) --- dept_by_gc_success = ( df.groupby(["gc_student","success","dept"], as_index=False) .size().rename(columns={"size":"count_rows"}) .sort_values(["gc_student","success","dept"]) ) dept_by_gc_success.to_csv(os.path.join(out_dir, "dept_by_gc_success.csv"), index=False) # --- Unsuccessful reasons (status) by gc_student --- reasons_by_gc = ( df[df["success"] == 0] .assign(reason=df["_status"].replace("", "unknown")) .groupby(["gc_student","reason"], as_index=False) .size().rename(columns={"size":"count_rows"}) .sort_values(["gc_student","reason"]) ) reasons_by_gc.to_csv(os.path.join(out_dir, "reasons_by_gc.csv"), index=False) # Optional: quick prints for sanity print("Wrote:") for name in ["students_by_gc.csv","outcomes_by_gc.csv","dept_by_gc_success.csv","reasons_by_gc.csv"]: print(" -", os.path.join(out_dir, name)) # Write to one Excel file with multiple sheets output_xlsx = "cache/csv_fa25.xlsx" with pd.ExcelWriter(output_xlsx, engine="openpyxl") as writer: students_by_gc.to_excel(writer, sheet_name="students_by_gc", index=False) outcomes_by_gc.to_excel(writer, sheet_name="outcomes_by_gc", index=False) dept_by_gc_success.to_excel(writer, sheet_name="dept_by_gc_success", index=False) reasons_by_gc.to_excel(writer, sheet_name="reasons_by_gc", index=False) print("Wrote Excel workbook:", output_xlsx) def test_starfish(): api3 = f'{url}/api/v1/courses/29/gradebook_history/feed' print(f"\n\ntesting: {api3}\n\n") r3 = fetch(api3) print(r3) return api1 = f'{url}/api/v1/courses/29/enrollments' print(f"\n\ntesting: {api1}\n\n") r1 = fetch(api1) print(r1) api2 = f'{url}/api/v1/courses/29/assignments' print(f"\n\ntesting: {api2}\n\n") r2 = fetch(api2) print(r2) if __name__ == "__main__": print ('') options = { 1: ['Fetch rosters on schedule',fetch_current_rosters_auto] , 2: ['Get canvas data 2024 style', canvas_data_2024_run ], 3: ['Set up canvas data 2024 style', setup_canvas_data_2024_run], 4: ['Narrative timeline of section updates', process_reg_history], 5: ['Create narrative format all semesters', recreate_all], 6: ['Recreate reg_data from full reg history', recreate_reg_data], 7: ['Compute enrollment changes', compute_enrollment_changes], 8: ['cvc report parse', cvc_report], 9: ['test starfish account', test_starfish], } '''1: ['Re-create schedule csv and json files from raw html',recent_schedules] , 2: ['Fetch rosters',fetch_current_rosters] , 3: 4: ['Compute how registration is filling up classes', schedule_filling] , 5: ['Manually convert 3 csv files to joined json enrollment file.', convert_roster_files] , 6: ['Canvas data: interactive sync', interactive ], 7: ['Canvas data: automated sync', sync_non_interactive ], 8: 9: 16: ['Scrape schedule from ssb', scrape_schedule_multi ], 14: ['Generate latestart schedule', list_latestarts ], 15: ['Test ssb calls with python', scrape_schedule_py ], 10: ['schedule to db', scrape_for_db ], 11: ['clean argos draft schedule file', argos_data_from_cvc], 12: ['make expanded schedule json files of old semesters', expand_old_semesters ], 13: ['Parse deanza schedule', dza_sched ], ''' if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]): resp = int(sys.argv[1]) print("\n\nPerforming: %s\n\n" % options[resp][0]) else: print ('') for key in options: print(str(key) + '.\t' + options[key][0]) print('') resp = input('Choose: ') # Call the function in the options dict options[ int(resp)][1]() # Testing #if __name__ == "__main__": #users = fetch('/api/v1/courses/69/users?per_page=100',1) #print "These are the users: " #print users #getSemesterSchedule() #get_doc() #pass