diff --git a/depricated.py b/depricated.py
index af395c9..b34a5c2 100644
--- a/depricated.py
+++ b/depricated.py
@@ -4,6 +4,22 @@
 # from pipelines - canvas data
 
+def file_doesnt_exist(name):
+    # Get list of files in current directory
+    files = os.listdir()
+
+    # Filter out zero-size files and directories
+    files = [f for f in files if os.path.isfile(f) and os.path.getsize(f) > 0]
+
+    if name in files:
+        print( f" * file: {name} already exists. not downloading." )
+    else:
+        print( f" * file: {name} downloading." )
+
+    # Check if the file exists in the filtered list
+    return not (name in files)
+
+
 # read schedule file with an eye toward watching what's filling up
 def schedule_filling():
diff --git a/pipelines.py b/pipelines.py
index 5033536..7e74f56 100644
--- a/pipelines.py
+++ b/pipelines.py
@@ -308,12 +308,17 @@ def move_to_folder(sem,year,folder,files):
     if not os.path.isdir(semester_path):
         print("+ Creating folder: %s" % semester_path)
         os.makedirs(semester_path)
+    def safe_move(src, dst):
+        try:
+            os.replace(src, dst)
+        except Exception as ex:
+            print(f"! move failed {src} -> {dst}: {ex}")
     if 'courses.csv' in files:
-        os.rename('cache/rosters/courses-%s.csv' % folder, 'cache/rosters/%s/courses.%s.csv' % (semester,now))
+        safe_move('cache/rosters/courses-%s.csv' % folder, 'cache/rosters/%s/courses.%s.csv' % (semester,now))
     if 'enrollments.csv' in files:
-        os.rename('cache/rosters/enrollments-%s.csv' % folder, 'cache/rosters/%s/enrollments.%s.csv' % (semester,now))
+        safe_move('cache/rosters/enrollments-%s.csv' % folder, 'cache/rosters/%s/enrollments.%s.csv' % (semester,now))
     if 'users.csv' in files:
-        os.rename('cache/rosters/users-%s.csv' % folder, 'cache/rosters/%s/users.%s.csv' % (semester,now))
+        safe_move('cache/rosters/users-%s.csv' % folder, 'cache/rosters/%s/users.%s.csv' % (semester,now))
@@ -374,104 +379,196 @@ def convert_roster_files(semester="",year="",folder=""):
 
 
-def file_doesnt_exist(name):
-    # Get list of files in current directory
-    files = os.listdir()
-
-    # Filter out zero-size files and directories
-    files = [f for f in files if os.path.isfile(f) and os.path.getsize(f) > 0]
-
-    if name in files:
-        print( f" * file: {name} already exists. not downloading." )
-    else:
-        print( f" * file: {name} downloading." )
-
-    # Check if the file exists in the filtered list
-    return not (name in files)
-
 # From instructure sftp site
-def fetch_current_rosters():
+def fetch_current_rosters(sftp=None, label_hour=None):
+    """Download roster CSVs from Instructure SFTP and post-process.
+    - If sftp provided, reuse it (already chdir'd to SIS).
+    - Files are saved using label_hour (format YYYY-MM-DD-HH). If None, compute fallback label by flooring to the hour.
+    """
     import pysftp
-    cnopts = pysftp.CnOpts()
-    cnopts.hostkeys = None
-    with pysftp.Connection(instructure_url,username=instructure_username, private_key=instructure_private_key,cnopts=cnopts) as sftp:
+    def log(msg):
+        ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+        try:
+            with open('cache/pipeline.log.txt','a', encoding='utf-8') as f:
+                f.write(f"[{ts}] {msg}\n")
+        except Exception:
+            print(msg)
+
+    close_after = False
+    if sftp is None:
+        cnopts = pysftp.CnOpts()
+        cnopts.hostkeys = None
+        sftp = pysftp.Connection(instructure_url, username=instructure_username, private_key=instructure_private_key, cnopts=cnopts)
         sftp.chdir('SIS')
-        files = sftp.listdir()
-        ff = open('cache/pipeline.log.txt','a')
+        close_after = True
+    try:
         now = datetime.datetime.now()
-        exact_time = now.strftime('%Y-%m-%d-%H-%M-%S')
-        rounded_hour = (now.replace(second=0, microsecond=0, minute=0, hour=now.hour)
-                        + timedelta(hours=now.minute//30))
+        if not label_hour:
+            # default fallback: floor to hour
+            label_hour = now.replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d-%H')
-        rounded_time = rounded_hour.strftime('%Y-%m-%d-%H')
+        files = set(sftp.listdir())
+        log(f"fetch_current_rosters: label={label_hour} files={sorted(files)}")
+        need = ['login','users','courses','enrollments']
+        saved = []
+        for name in need:
+            remote = f'{name}.csv'
+            target = f'cache/rosters/{name}-{label_hour}.csv'
+            try:
+                if remote in files:
+                    if not (os.path.exists(target) and os.path.getsize(target) > 0):
+                        temp = target + '.part'
+                        try:
+                            if os.path.exists(temp):
+                                os.remove(temp)
+                        except Exception:
+                            pass
+                        sftp.get(remote, temp)
+                        # atomic replace
+                        os.replace(temp, target)
+                        saved.append(remote)
+                        log(f' saved {remote} -> {target}')
+                    else:
+                        log(f' already exists: {target}, skipping')
+                else:
+                    log(f' missing on server: {remote}')
+            except Exception as ex:
+                log(f' download failed: {remote} -> {target} err={ex}')
-        if len(files)>0: # and 'users.csv' in files:
-            print(f"--> {exact_time}: I see these files at instructure ftp site:")
-            [print(f" - {f}") for f in files]
-            i = 0
-            seen_files = []
-            check = ['login','users','courses','enrollments']
-
-            for checking in check:
-                try:
-                    if f'{checking}.csv' in files and file_doesnt_exist(f'{checking}-{rounded_time}.csv'):
-                        sftp.get(f'{checking}.csv',f'cache/rosters/{checking}-{rounded_time}.csv')
-                        i += 1
-                        seen_files.append(f'{checking}.csv')
-                except:
-                    print(f' * {checking}.csv not present')
-            print(' Saved %i data files in rosters folder.' % i)
-            ff.write( f" Saved {i} data files: {seen_files}")
-
-            if i>2:
-                if 'courses.csv' in seen_files:
-                    courses = open(f'cache/rosters/courses-{rounded_time}.csv','r')
+        if (len(saved) >= 3 and 'courses.csv' in saved) or os.path.exists(f'cache/rosters/courses-{label_hour}.csv'):
+            try:
+                with open(f'cache/rosters/courses-{label_hour}.csv','r') as courses:
                     courses.readline()
                     a = courses.readline()
-                    print(a)
-                    courses.close()
-                    parts = a.split(',')
-                    year = parts[1][0:4]
-                    ss = parts[1][4:6]
-                    sem = {'30':'spring', '50':'summer', '70':'fall' }
-                    this_sem = sem[ss]
-                    print(f" -> This semester is: {this_sem}, {year}" )
-                    print(f" -> Building data file... {rounded_time}")
-                    convert_roster_files(this_sem,year,rounded_time)
-                    print(' -> moving files...')
-                    ff.write( f" Moved files to folder: {this_sem} {year} {rounded_time}\n")
-                    move_to_folder(this_sem,year,rounded_time,seen_files)
-                else:
-                    print(" * No courses file. Not moving files.")
-                    ff.write( f" * No courses file. Not moving files.\n")
+                    parts = a.split(',')
+                    year = parts[1][0:4]
+                    ss = parts[1][4:6]
+                    sem = {'30':'spring', '50':'summer', '70':'fall' }
+                    this_sem = sem.get(ss, 'spring')
+                    log(f"post-process for semester={this_sem} year={year} label={label_hour}")
+                    convert_roster_files(this_sem,year,label_hour)
+                    move_to_folder(this_sem,year,label_hour,saved)
+            except Exception as expp:
+                log(f'post-processing failed: {expp}')
         else:
-            print(f"--> {exact_time}: Don't see files.")
-        sftp.close()
+            log('not enough files to post-process yet')
+    finally:
+        if close_after:
+            try:
+                sftp.close()
+            except Exception:
+                pass
 
 
-def fetch_current_rosters_auto():
-    fetch_minute = "56,57,58,59,00,01,02,03,04,05,06".split(",")
-    for m in fetch_minute:
-        schedule.every().hour.at(f":{m}").do(fetch_current_rosters)
-
-    #schedule.every().day.at("12:35").do(sync_non_interactive)
-    #schedule.every().day.at("21:00").do(sync_non_interactive)
-
-
-    #print(f"running every hour on the :{fetch_minute}\n")
-    while True:
+def fetch_current_rosters_auto(poll_seconds=15):
+    """Poll Instructure SFTP from the top of each hour until all 4 roster files appear, then download once.
+    - Robust logging to cache/pipeline.log.txt
+    - Stops polling for the remainder of that hour after success
+    - Tries again on the next hour
+    """
+    import pysftp
+    from contextlib import contextmanager
+
+    log_path = 'cache/pipeline.log.txt'
+
+    def log(msg):
+        ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
         try:
-            schedule.run_pending()
-            time.sleep(4)
-        except Exception as e:
+            with open(log_path, 'a', encoding='utf-8') as f:
+                f.write(f"[{ts}] {msg}\n")
+        except Exception:
+            print(f"[log-fail] {msg}")
+
+    @contextmanager
+    def sftp_conn():
+        cnopts = pysftp.CnOpts()
+        cnopts.hostkeys = None
+        conn = None
+        try:
+            conn = pysftp.Connection(instructure_url, username=instructure_username,
+                                     private_key=instructure_private_key, cnopts=cnopts)
+            conn.chdir('SIS')
+            yield conn
+        finally:
+            try:
+                if conn:
+                    conn.close()
+            except Exception:
+                pass
+
+    required = {'login.csv', 'users.csv', 'courses.csv', 'enrollments.csv'}
+    current_window = None  # e.g., '2025-08-30-14'
+    window_done = False
+    window_end = None
+
+    def compute_window(now):
+        """Return (label_hour_str, window_end_dt) if within a window, else (None, None).
+        Window spans [H-5m, H+5m] around each top-of-hour H.
+        If minute >= 55 -> window is next hour
+        If minute <= 5  -> window is current hour
+        Else -> not in window.
+        """
+        m = now.minute
+        if m >= 55:
+            base = (now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1))
+        elif m <= 5:
+            base = now.replace(minute=0, second=0, microsecond=0)
+        else:
+            return (None, None)
+        label = base.strftime('%Y-%m-%d-%H')
+        end = base + timedelta(minutes=5)  # end of window
+        return (label, end)
+
+    log('fetch_current_rosters_auto: starting poll loop')
+    while True:
+        now = datetime.datetime.now()
+        hour_key = now.strftime('%Y-%m-%d %H')
+        label, enddt = compute_window(now)
+        if label is None:
+            time.sleep(max(5, int(poll_seconds)))
+            continue
+
+        if label != current_window:
+            current_window = label
+            window_done = False
+            window_end = enddt
+            log(f'New window: {current_window} (until {window_end.strftime("%H:%M:%S")}). Resetting state.')
+
+        if window_done:
+            time.sleep(5)
+            continue
+
+        # Poll SFTP for files
+        try:
+            with sftp_conn() as sftp:
+                files = set(sftp.listdir())
+                missing = list(required - files)
+                if missing:
+                    log(f'Not ready yet. Missing: {missing}')
+                else:
+                    log('All required files present inside window. Starting download...')
+                    try:
+                        fetch_current_rosters(sftp=sftp, label_hour=current_window)
+                        log('Download complete. Marking window_done for this window.')
+                        window_done = True
+                    except Exception as ex_dl:
+                        import traceback
+                        log('Download failed: ' + str(ex_dl))
+                        log(traceback.format_exc())
+        except Exception as ex_list:
             import traceback
-            print(" ---- * * * Failed with: %s" % str(e))
-            ff = open('cache/pipeline.log.txt','a')
-            ff.write(datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + "\n")
-            ff.write(traceback.format_exc()+"\n---------\n\n")
-            ff.close()
-            #schedule.CancelJob
-            time.sleep(1)
+            log('SFTP list failed: ' + str(ex_list))
+            log(traceback.format_exc())
+
+        # Check for window timeout
+        try:
+            if not window_done and window_end and now >= window_end:
+                log(f'REPORT: window {current_window} ended without all files. Consider alerting/email.')
+                window_done = True  # stop further checks this window
+        except Exception:
+            pass
+
+        time.sleep(max(5, int(poll_seconds)))
 
 
 # Canvas data, download all new files
@@ -1292,7 +1389,12 @@ def process_reg_history(term='fa25'):
 
 def recreate_all():
-    for x in 'sp20 su20 fa20 sp21 su21 fa21 sp22 su22 fa22 sp23 su23 fa23 sp24'.split(' '):
+    # Use canonical semester short codes from semesters.py
+    try:
+        from semesters import code as SEM_CODES
+    except Exception:
+        SEM_CODES = []
+    for x in SEM_CODES:
     try:
         recreate_reg_data(x)
     except Exception as e:
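
A quick way to sanity-check the new polling behavior is to exercise the hour-window labeling on its own, without an SFTP connection. The sketch below restates the nested compute_window helper from fetch_current_rosters_auto as a standalone function under the same [H-5m, H+5m] window assumption; the timestamps in the checks are invented for illustration.

    import datetime
    from datetime import timedelta

    def compute_window(now):
        # Mirror of the patch's helper: minutes >= 55 belong to the next hour's
        # window, minutes <= 5 to the current hour's, anything else is idle time.
        if now.minute >= 55:
            base = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
        elif now.minute <= 5:
            base = now.replace(minute=0, second=0, microsecond=0)
        else:
            return (None, None)
        # Label matches the roster file-naming scheme; the window closes 5 minutes past the hour.
        return (base.strftime('%Y-%m-%d-%H'), base + timedelta(minutes=5))

    # Illustrative checks (times are made up):
    assert compute_window(datetime.datetime(2025, 8, 30, 13, 57))[0] == '2025-08-30-14'  # lead-in before the hour
    assert compute_window(datetime.datetime(2025, 8, 30, 14, 3))[0] == '2025-08-30-14'   # grace period after the hour
    assert compute_window(datetime.datetime(2025, 8, 30, 14, 30)) == (None, None)        # between windows

Once a window closes without all four files, the loop only writes a REPORT line and marks the window done, so an hour with missing roster files is skipped rather than retried indefinitely.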