import util
import codecs, json, requests, re, csv, datetime, os, jsondiff, os.path
import sys, shutil, hmac, hashlib, base64, schedule, time, pathlib
from datetime import timedelta
from canvas_secrets import apiKey, apiSecret, FTP_SITE, FTP_USER, FTP_PW, url, domain, account_id, header, header_media
from canvas_secrets import instructure_url, instructure_username, instructure_private_key

import asyncio

from dap.api import DAPClient
from dap.dap_types import Credentials
from dap.integration.database import DatabaseConnection
from dap.replicator.sql import SQLReplicator

"""
Everything to do with fetching data:
 - from iLearn, via token
 - current roster uploads from Instructure's SFTP site
 - raw logs and other files from the Canvas Data repo
 - from SSB, using Firefox to scrape the schedule

And some subsequent processing:
 - raw roster files, into a more compact json format
 - raw logs into something more useful
"""

verbose = False

users = {}
users_by_id = {}

# todo: all these constants for SSB -- line 1008
#
# todo: https://stackoverflow.com/questions/42656247/how-can-i-use-canvas-data-rest-api-using-python

sys.setrecursionlimit( 100000 )

local_data_folder = 'cache/canvas_data/'

mylog = codecs.open(local_data_folder + 'temp_log.txt','w')


class FetchError(Exception):
    pass


DEBUG = 0

def d(s,end=''):
    global DEBUG
    if end and DEBUG:
        print(s,end=end)
    elif DEBUG:
        print(s)


################
################   CANVAS API MAIN FETCHING FUNCTIONS
################
################
################


# Main canvas querying fxn
def fetch(target,verbose=0,params=0,media=0):
    # If there are more results, recursively call myself, adding on to the results.
    results = 0
    count = 0   # stays 0 if the response can't be parsed
    if target[0:4] != "http":
        target = url + target
    if verbose:
        print("++ Fetching: " + target)

    if media:
        r2 = requests.get(target, headers = header_media)
    elif params:
        r2 = requests.get(target, headers = header, params = params)
    else:
        r2 = requests.get(target, headers = header)

    #if verbose: print("++ Got: " + r2.text)
    try:
        results = json.loads(r2.text)
        count = len(results)
    except:
        print("-- Failed to parse: ", r2.text)

    if verbose:
        print("Got %i results" % count)
    if verbose > 1:
        print(r2.headers)

    tempout = codecs.open('cache/fetchcache.txt','a','utf-8')
    tempout.write(r2.text+"\n\n")
    tempout.close()

    if ('link' in r2.headers and count > 0):
        links = r2.headers['link'].split(',')
        for L in links:
            ll = L.split(';')
            link = ll[0].replace("<","")
            link = link.replace(">","")
            if re.search(r'next', ll[1]):
                if (verbose):
                    print("++ More link: " + link)
                #link = re.sub(r'per_page=10$', 'per_page=100', link)
                #link.replace('per_page=10','per_page=500')
                nest = fetch(link,verbose,params,media)
                if isinstance(results,dict):
                    results.update(nest)
                else:
                    results.extend(nest)
    return results
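
# Illustrative usage sketch, not called anywhere in the pipeline: pull every
# course in the account and index it by id. The function name and the endpoint
# path are assumptions -- this presumes `url` from canvas_secrets points at the
# Canvas REST base (e.g. ".../api/v1/") so the relative target resolves.
# fetch() follows the Link header itself, so the caller gets one combined list.
def example_fetch_account_courses():
    courses = fetch('accounts/%s/courses?per_page=100' % account_id, verbose=1)
    if not isinstance(courses, list):
        return {}
    return {c['id']: c for c in courses if isinstance(c, dict)}
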

# Main canvas querying fxn - stream version - don't die on big requests
def fetch_stream(target,verbose=0):
    # If there are more results, keep following the "next" link, yielding one page at a time.
    results = 0
    while target:
        if target[0:4] != "http":
            target = url + target
        if verbose:
            print("++ Fetching: " + target)

        r2 = requests.get(target, headers = header)
        if r2.status_code == 502:
            raise FetchError()

        count = 0   # stays 0 if the response can't be parsed
        try:
            results = json.loads(r2.text)
            count = len(results)
        except:
            print("-- Failed to parse: ", r2.text)

        if verbose:
            print("Got %i results" % count)
        if verbose > 1:
            print(r2.headers)

        tempout = codecs.open('cache/fetchcache.txt','a','utf-8')
        tempout.write(r2.text+"\n\n")
        tempout.close()

        next_link_found = 0
        if ('link' in r2.headers and count > 0):
            links = r2.headers['link'].split(',')
            for L in links:
                ll = L.split(';')
                link = ll[0].replace("<","")
                link = link.replace(">","")
                if re.search(r'next', ll[1]):
                    target = link
                    next_link_found = 1
                    break
        if not next_link_found:
            target = 0
        yield results


# For dicts with one key, collapse that one key out, because
# paging makes problems... example: enrollment_terms
def fetch_collapse(target,collapse='',verbose=0):
    # If there are more results, recursively call myself, adding on to the results.
    results = 0
    if target[0:4] != "http":
        target = url + target
    if verbose:
        print("++ Fetching: " + target)

    r2 = requests.get(target, headers = header)
    #if verbose: print("++ Got: " + r2.text)
    try:
        results = json.loads(r2.text)
    except:
        print("-- Failed to parse: ", r2.text)

    if verbose:
        print(r2.headers)

    if collapse and collapse in results:
        results = results[collapse]

    if ('link' in r2.headers):
        links = r2.headers['link'].split(',')
        for L in links:
            ll = L.split(';')
            link = ll[0].replace("<","")
            link = link.replace(">","")
            if re.search(r'next', ll[1]):
                if (verbose):
                    print("++ More link: " + link)
                nest = fetch_collapse(link, collapse, verbose)
                if isinstance(results,dict):
                    results.update(nest)
                else:
                    results.extend(nest)
    return results


################
################   CANVAS DATA
################
################
################


# Get canvas data 2024 style
def canvas_data_2024_run():
    print("Updating all tables.")
    asyncio.run(canvas_data_2024())
    print("Done with all tables.")


async def canvas_data_2024():
    base_url: str = os.environ["DAP_API_URL"]
    client_id: str = os.environ["DAP_CLIENT_ID"]
    client_secret: str = os.environ["DAP_CLIENT_SECRET"]

    #connection_string: str = "postgresql://postgres:rolley34@192.168.1.6/db"
    # todo: use secrets
    connection_string: str = "postgresql://postgres:rolley34@192.168.1.199/db"

    desired_tables = "users,courses,communication_channels,context_modules,conversation_message_participants,conversation_messages,conversation_participants,conversations,course_sections,enrollment_states,enrollment_dates_overrides,enrollment_terms,enrollments,learning_outcome_groups,learning_outcome_question_results,learning_outcomes,pseudonyms,quizzes,scores,submissions,submission_versions,wiki_pages,wikis".split(',')

    credentials = Credentials.create(client_id=client_id, client_secret=client_secret)

    async with DatabaseConnection(connection_string).open() as db_connection:
        async with DAPClient(base_url, credentials) as session:
            #tables = await session.get_tables("canvas")
            for table in desired_tables:
                print(f" trying to update {table} ")
                try:
                    #await SQLReplicator(session, db_connection).initialize("canvas", table)
                    await SQLReplicator(session, db_connection).synchronize("canvas", table)
                except Exception as e:
                    print(f" - skipping {table} because {e}")


# Set up canvas data 2024 style
def setup_canvas_data_2024_run():
    print("Setting up all tables.")
    asyncio.run(setup_canvas_data_2024())
    print("Done with all tables.")
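
# Minimal sketch for the "todo: use secrets" note above -- an assumption, not
# yet wired into the functions: let an environment variable override the
# hardcoded Postgres connection strings so the password stays out of source.
# DAP_CONNECTION_STRING is a hypothetical variable name, chosen to match the
# DAP_* variables the DAP client already reads.
def dap_connection_string(default: str) -> str:
    return os.environ.get("DAP_CONNECTION_STRING", default)
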

async def setup_canvas_data_2024():
    base_url: str = os.environ["DAP_API_URL"]
    client_id: str = os.environ["DAP_CLIENT_ID"]
    client_secret: str = os.environ["DAP_CLIENT_SECRET"]

    #connection_string: str = "postgresql://postgres:rolley34@192.168.1.6/db"
    connection_string: str = "postgresql://postgres:rolley34@192.168.1.192/db"

    desired_tables = "users,courses,communication_channels,context_modules,conversation_message_participants,conversation_messages,conversation_participants,conversations,course_sections,enrollment_states,enrollment_dates_overrides,enrollment_terms,enrollments,learning_outcome_groups,learning_outcome_question_results,learning_outcomes,pseudonyms,quizzes,scores,submissions,submission_versions,wiki_pages,wikis".split(',')

    credentials = Credentials.create(client_id=client_id, client_secret=client_secret)

    async with DatabaseConnection(connection_string).open() as db_connection:
        async with DAPClient(base_url, credentials) as session:
            #tables = await session.get_tables("canvas")
            for table in desired_tables:
                print(f" {table}")
                try:
                    await SQLReplicator(session, db_connection).initialize("canvas", table)
                except Exception as e:
                    print(f" - skipping {table} because {e}")


################
################   ROSTERS AND REGISTRATION
################
################
################

# todo: the pipeline is disorganized. Organize it to have
#       a hope of taking all this to a higher level.
#
# todo: where does this belong in the pipeline? compare with recent_schedules()


# Take the generically named roster upload files, move them to a semester folder, and give them a date.
def move_to_folder(sem,year,folder,files):
    semester = year+sem
    semester_path = 'cache/rosters/%s' % semester
    now = datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')

    print("+ Moving roster files to folder: %s" % semester_path)
    if not os.path.isdir(semester_path):
        print("+ Creating folder: %s" % semester_path)
        os.makedirs(semester_path)

    def safe_move(src, dst):
        try:
            os.replace(src, dst)
        except Exception as ex:
            print(f"! move failed {src} -> {dst}: {ex}")

    if 'courses.csv' in files:
        safe_move('cache/rosters/courses-%s.csv' % folder, 'cache/rosters/%s/courses.%s.csv' % (semester,now))
    if 'enrollments.csv' in files:
        safe_move('cache/rosters/enrollments-%s.csv' % folder, 'cache/rosters/%s/enrollments.%s.csv' % (semester,now))
    if 'users.csv' in files:
        safe_move('cache/rosters/users-%s.csv' % folder, 'cache/rosters/%s/users.%s.csv' % (semester,now))
    if 'login.csv' in files:
        safe_move('cache/rosters/login-%s.csv' % folder, 'cache/rosters/%s/login.%s.csv' % (semester,now))
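
# Illustrative call, with hypothetical values: after a spring-2025 upload that
# landed as courses-2025-03-01-14.csv and friends, the files get filed away like
#   move_to_folder('spring', '2025', '2025-03-01-14',
#                  ['courses.csv', 'enrollments.csv', 'users.csv', 'login.csv'])
# which leaves them at cache/rosters/2025spring/courses.<timestamp>.csv and so on.
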

# Take raw upload (csv) files and make one big json out of them.
# This relates to enrollment files, not schedule.
def convert_roster_files(semester="",year="",folder=""):
    if not semester:
        semester = input("the semester? (ex: spring) ")
        folder = input("Folder? (ex 2020-02-25-14-58-20) ")

    uf = open('cache/rosters/users-'+folder+'.csv','r')
    cf = open('cache/rosters/courses-'+folder+'.csv','r')
    ef = open('cache/rosters/enrollments-'+folder+'.csv','r')

    u = csv.DictReader(uf)
    c = csv.DictReader(cf)
    e = csv.DictReader(ef)

    uu = [i for i in u]
    cc = [i for i in c]
    ee = [i for i in e]

    uf.close()
    cf.close()
    ef.close()

    myrosterfile = 'cache/rosters/roster_%s_%s.json' % (year, semester)

    if os.path.exists(myrosterfile):
        print(" -- Moving previous combined roster json file. opening %s ..." % myrosterfile)
        last_fileobj = open(myrosterfile,'r')
        last_file = json.load(last_fileobj)
        last_fileobj.close()

        info = last_file[3]
        last_date = info['date_filestring']

        print(' -- writing: cache/rosters/%s%s/roster_%s.json ...' % (year,semester,last_date))
        try:
            os.rename(myrosterfile, 'cache/rosters/%s%s/roster_%s.json' % (year,semester,last_date))
            print(' -- ok')
        except Exception as e:
            print(" ** Failed because I couldn't move the previous roster file: %s" % myrosterfile)
            print(e)
            # fall back to a "new_" prefixed file in the same folder
            myrosterfile = os.path.join(os.path.dirname(myrosterfile), "new_" + os.path.basename(myrosterfile))

        #os.remove('cache/old_rosters/roster_'+semester+'.'+last_date+'.json')
        #os.rename(myrosterfile, 'cache/old_rosters/roster_'+semester+'.'+last_date+'.json')

    newinfo = {'date_filestring': datetime.datetime.now().strftime('%Y-%m-%dT%H-%M'), }

    try:
        new_roster = codecs.open(myrosterfile,'w', 'utf-8')
        new_roster.write( json.dumps( [uu,cc,ee,newinfo], indent=2 ))
        new_roster.close()
        print(" -- Wrote roster info to: %s." % myrosterfile)
    except Exception as e:
        print(" ** Failed because I couldn't write the new roster file: %s" % myrosterfile)
        print(" ** " + str(e))


# From the Instructure SFTP site
def fetch_current_rosters(sftp=None, label_hour=None):
    """Download roster CSVs from Instructure SFTP and post-process.

    - If sftp is provided, reuse it (already chdir'd to SIS).
    - Files are saved using label_hour (format YYYY-MM-DD-HH). If None,
      compute a fallback label by flooring to the hour.
    """
    import pysftp

    def log(msg):
        ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        try:
            with open('cache/pipeline.log.txt','a', encoding='utf-8') as f:
                f.write(f"[{ts}] {msg}\n")
        except Exception:
            print(msg)

    close_after = False
    if sftp is None:
        cnopts = pysftp.CnOpts()
        cnopts.hostkeys = None
        sftp = pysftp.Connection(instructure_url, username=instructure_username,
                                 private_key=instructure_private_key, cnopts=cnopts)
        sftp.chdir('SIS')
        close_after = True

    try:
        now = datetime.datetime.now()
        if not label_hour:
            # default fallback: floor to hour
            label_hour = now.replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d-%H')

        files = set(sftp.listdir())
        log(f"fetch_current_rosters: label={label_hour} files={sorted(files)}")

        need = ['login','users','courses','enrollments']
        saved = []
        for name in need:
            remote = f'{name}.csv'
            target = f'cache/rosters/{name}-{label_hour}.csv'
            try:
                if remote in files:
                    if not (os.path.exists(target) and os.path.getsize(target) > 0):
                        temp = target + '.part'
                        try:
                            if os.path.exists(temp):
                                os.remove(temp)
                        except Exception:
                            pass
                        sftp.get(remote, temp)
                        # atomic replace
                        os.replace(temp, target)
                        saved.append(remote)
                        log(f'  saved {remote} -> {target}')
                    else:
                        log(f'  already exists: {target}, skipping')
                else:
                    log(f'  missing on server: {remote}')
            except Exception as ex:
                log(f'  download failed: {remote} -> {target} err={ex}')

        if (len(saved) >= 3 and 'courses.csv' in saved) or os.path.exists(f'cache/rosters/courses-{label_hour}.csv'):
            try:
                with open(f'cache/rosters/courses-{label_hour}.csv','r') as courses:
                    courses.readline()
                    a = courses.readline()
                parts = a.split(',')
                year = parts[1][0:4]
                ss = parts[1][4:6]
                sem = {'30':'spring', '50':'summer', '70':'fall' }
                this_sem = sem.get(ss, 'spring')
                log(f"post-process for semester={this_sem} year={year} label={label_hour}")
                convert_roster_files(this_sem,year,label_hour)
                move_to_folder(this_sem,year,label_hour,saved)
            except Exception as expp:
                log(f'post-processing failed: {expp}')
        else:
            log('not enough files to post-process yet')
    finally:
        if close_after:
            try:
                sftp.close()
            except Exception:
                pass
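
# Helper sketch, not called by the pipeline: the same 6-digit term-code rule the
# post-processing step applies inline above (e.g. "202530" -> ("2025", "spring")).
# The code-to-semester mapping is taken from fetch_current_rosters(); the
# function name is ours.
def split_term_code(term_code):
    sem = {'30': 'spring', '50': 'summer', '70': 'fall'}
    return term_code[0:4], sem.get(term_code[4:6], 'spring')
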
"""Poll Instructure SFTP from the top of each hour until all 4 roster files appear, then download once. - Robust logging to cache/pipeline.log.txt - Stops polling for the remainder of that hour after success - Tries again on the next hour """ import pysftp from contextlib import contextmanager log_path = 'cache/pipeline.log.txt' def log(msg): ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') try: with open(log_path, 'a', encoding='utf-8') as f: f.write(f"[{ts}] {msg}\n") except Exception: print(f"[log-fail] {msg}") @contextmanager def sftp_conn(): cnopts = pysftp.CnOpts() cnopts.hostkeys = None conn = None try: conn = pysftp.Connection(instructure_url, username=instructure_username, private_key=instructure_private_key, cnopts=cnopts) conn.chdir('SIS') yield conn finally: try: if conn: conn.close() except Exception: pass required = {'login.csv', 'users.csv', 'courses.csv', 'enrollments.csv'} current_window = None # e.g., '2025-08-30-14' window_done = False window_end = None def compute_window(now): """Return (label_hour_str, window_end_dt) if within a window, else (None, None). Window spans [H-5m, H+5m] around each top-of-hour H. If minute >= 55 -> window is next hour If minute <= 5 -> window is current hour Else -> not in window. """ m = now.minute if m >= 55: base = (now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)) elif m <= 5: base = now.replace(minute=0, second=0, microsecond=0) else: return (None, None) label = base.strftime('%Y-%m-%d-%H') end = base + timedelta(minutes=5) # end of window return (label, end) log('fetch_current_rosters_auto: starting poll loop') while True: now = datetime.datetime.now() hour_key = now.strftime('%Y-%m-%d %H') label, enddt = compute_window(now) if label is None: time.sleep(max(5, int(poll_seconds))) continue if label != current_window: current_window = label window_done = False window_end = enddt log(f'New window: {current_window} (until {window_end.strftime("%H:%M:%S")}). Resetting state.') if window_done: time.sleep(5) continue # Poll SFTP for files try: with sftp_conn() as sftp: files = set(sftp.listdir()) missing = list(required - files) if missing: log(f'Not ready yet. Missing: {missing}') else: log('All required files present inside window. Starting download...') try: fetch_current_rosters(sftp=sftp, label_hour=current_window) log('Download complete. Marking window_done for this window.') window_done = True except Exception as ex_dl: import traceback log('Download failed: ' + str(ex_dl)) log(traceback.format_exc()) except Exception as ex_list: import traceback log('SFTP list failed: ' + str(ex_list)) log(traceback.format_exc()) # Check for window timeout try: if not window_done and window_end and now >= window_end: log(f'REPORT: window {current_window} ended without all files. 

################
################   SENDING DATA AWAY
################
################
################


# Upload a json file to www
def put_file(remotepath,localpath, localfile,prompt=1):
    import pysftp
    show_all = 0
    folder = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    cnopts = pysftp.CnOpts()
    cnopts.hostkeys = None
    with pysftp.Connection(FTP_SITE,username=FTP_USER, password=FTP_PW,cnopts=cnopts) as sftp:
        #todo: these paths
        #files = sftp.listdir()
        #print(folder + "\tI see these files on remote: ", files, "\n")
        sftp.chdir(remotepath)
        files = sftp.listdir()
        if show_all:
            print(folder + "\tI see these files on remote: ", files, "\n")
        localf = os.listdir(localpath)
        if show_all:
            print("I see these local: ", localf)
        if prompt:
            input('ready to upload')
        sftp.put(localpath+localfile, localfile, preserve_mtime=True)
        sftp.close()


#text =
result = []
last_type = ''
#answer_text = ''
answer = []
in_a_list = ''

# Get all the images
for k,value in doc_objects.items():
    tempout.write( "->" + k + "=" + json.dumps(value,indent=2) + "\n\n\n--\n\n")
    fetched = fetch_doc_image(k,value)

list_stack = []
list_depth = 0
last_list_depth = 0

for value in doc_content:
    tempout.write( json.dumps(value,indent=2) + "\n\n\n")
    if verbose:
        print(json.dumps(value, sort_keys=True, indent=4))
    tag_fxn = handle_para

    if 'paragraph' in value:
        this_text = ''

        # First we deal with if we're in a list.
        if 'bullet' in value['paragraph']:
            # Either we're (1) starting a new list, (2) in one (do nothing),
            # (3) starting a nested one, or (4) finished a nested one.
            lid = value['paragraph']['bullet']['listId']
            if not list_stack:
                # 1
                list_stack.append(lid)
            else:
                if not lid == list_stack[0]:
                    if not lid in list_stack:
                        # 3
                        list_stack.append(lid)
                    else:
                        # 4
                        x = list_stack.pop()
                        while x != lid:
                            list_stack.pop()
        elif len(list_stack) > 0:
            # current para isn't a bullet but we still have a list open.
            list_stack = []

        list_depth = len(list_stack)
        deeper = list_depth - last_list_depth
        if deeper > 0:
            answer.append("