Fix up roster download
This commit is contained in:
parent
cec2c83cb4
commit
ff5ed654eb
|
|
@ -4,6 +4,22 @@
|
|||
|
||||
# from pipelines - canvas data
|
||||
|
||||
def file_doesnt_exist(name):
|
||||
# Get list of files in current directory
|
||||
files = os.listdir()
|
||||
|
||||
# Filter out zero-size files and directories
|
||||
files = [f for f in files if os.path.isfile(f) and os.path.getsize(f) > 0]
|
||||
|
||||
if name in files:
|
||||
print( f" * file: {name} already exists. not downloading." )
|
||||
else:
|
||||
print( f" * file: {name} downloading." )
|
||||
|
||||
# Check if the file exists in the filtered list
|
||||
return not (name in files)
|
||||
|
||||
|
||||
|
||||
# read schedule file with an eye toward watching what's filling up
|
||||
def schedule_filling():
|
||||
|
|
|
|||
280
pipelines.py
280
pipelines.py
|
|
@ -308,12 +308,17 @@ def move_to_folder(sem,year,folder,files):
|
|||
if not os.path.isdir(semester_path):
|
||||
print("+ Creating folder: %s" % semester_path)
|
||||
os.makedirs(semester_path)
|
||||
def safe_move(src, dst):
|
||||
try:
|
||||
os.replace(src, dst)
|
||||
except Exception as ex:
|
||||
print(f"! move failed {src} -> {dst}: {ex}")
|
||||
if 'courses.csv' in files:
|
||||
os.rename('cache/rosters/courses-%s.csv' % folder, 'cache/rosters/%s/courses.%s.csv' % (semester,now))
|
||||
safe_move('cache/rosters/courses-%s.csv' % folder, 'cache/rosters/%s/courses.%s.csv' % (semester,now))
|
||||
if 'enrollments.csv' in files:
|
||||
os.rename('cache/rosters/enrollments-%s.csv' % folder, 'cache/rosters/%s/enrollments.%s.csv' % (semester,now))
|
||||
safe_move('cache/rosters/enrollments-%s.csv' % folder, 'cache/rosters/%s/enrollments.%s.csv' % (semester,now))
|
||||
if 'users.csv' in files:
|
||||
os.rename('cache/rosters/users-%s.csv' % folder, 'cache/rosters/%s/users.%s.csv' % (semester,now))
|
||||
safe_move('cache/rosters/users-%s.csv' % folder, 'cache/rosters/%s/users.%s.csv' % (semester,now))
|
||||
|
||||
|
||||
|
||||
|
|
@ -374,104 +379,196 @@ def convert_roster_files(semester="",year="",folder=""):
|
|||
|
||||
|
||||
|
||||
def file_doesnt_exist(name):
|
||||
# Get list of files in current directory
|
||||
files = os.listdir()
|
||||
|
||||
# Filter out zero-size files and directories
|
||||
files = [f for f in files if os.path.isfile(f) and os.path.getsize(f) > 0]
|
||||
|
||||
if name in files:
|
||||
print( f" * file: {name} already exists. not downloading." )
|
||||
else:
|
||||
print( f" * file: {name} downloading." )
|
||||
|
||||
# Check if the file exists in the filtered list
|
||||
return not (name in files)
|
||||
|
||||
|
||||
# From instructure sftp site
|
||||
def fetch_current_rosters():
|
||||
def fetch_current_rosters(sftp=None, label_hour=None):
|
||||
"""Download roster CSVs from Instructure SFTP and post-process.
|
||||
- If sftp provided, reuse it (already chdir'd to SIS).
|
||||
- Files are saved using label_hour (format YYYY-MM-DD-HH). If None, compute fallback label by flooring to the hour.
|
||||
"""
|
||||
import pysftp
|
||||
cnopts = pysftp.CnOpts()
|
||||
cnopts.hostkeys = None
|
||||
with pysftp.Connection(instructure_url,username=instructure_username, private_key=instructure_private_key,cnopts=cnopts) as sftp:
|
||||
def log(msg):
|
||||
ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
try:
|
||||
with open('cache/pipeline.log.txt','a', encoding='utf-8') as f:
|
||||
f.write(f"[{ts}] {msg}\n")
|
||||
except Exception:
|
||||
print(msg)
|
||||
|
||||
close_after = False
|
||||
if sftp is None:
|
||||
cnopts = pysftp.CnOpts()
|
||||
cnopts.hostkeys = None
|
||||
sftp = pysftp.Connection(instructure_url, username=instructure_username, private_key=instructure_private_key, cnopts=cnopts)
|
||||
sftp.chdir('SIS')
|
||||
files = sftp.listdir()
|
||||
ff = open('cache/pipeline.log.txt','a')
|
||||
close_after = True
|
||||
try:
|
||||
now = datetime.datetime.now()
|
||||
exact_time = now.strftime('%Y-%m-%d-%H-%M-%S')
|
||||
rounded_hour = (now.replace(second=0, microsecond=0, minute=0, hour=now.hour)
|
||||
+ timedelta(hours=now.minute//30))
|
||||
if not label_hour:
|
||||
# default fallback: floor to hour
|
||||
label_hour = now.replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d-%H')
|
||||
|
||||
rounded_time = rounded_hour.strftime('%Y-%m-%d-%H')
|
||||
files = set(sftp.listdir())
|
||||
log(f"fetch_current_rosters: label={label_hour} files={sorted(files)}")
|
||||
need = ['login','users','courses','enrollments']
|
||||
saved = []
|
||||
for name in need:
|
||||
remote = f'{name}.csv'
|
||||
target = f'cache/rosters/{name}-{label_hour}.csv'
|
||||
try:
|
||||
if remote in files:
|
||||
if not (os.path.exists(target) and os.path.getsize(target) > 0):
|
||||
temp = target + '.part'
|
||||
try:
|
||||
if os.path.exists(temp):
|
||||
os.remove(temp)
|
||||
except Exception:
|
||||
pass
|
||||
sftp.get(remote, temp)
|
||||
# atomic replace
|
||||
os.replace(temp, target)
|
||||
saved.append(remote)
|
||||
log(f' saved {remote} -> {target}')
|
||||
else:
|
||||
log(f' already exists: {target}, skipping')
|
||||
else:
|
||||
log(f' missing on server: {remote}')
|
||||
except Exception as ex:
|
||||
log(f' download failed: {remote} -> {target} err={ex}')
|
||||
|
||||
if len(files)>0: # and 'users.csv' in files:
|
||||
print(f"--> {exact_time}: I see these files at instructure ftp site:")
|
||||
[print(f" - {f}") for f in files]
|
||||
i = 0
|
||||
seen_files = []
|
||||
check = ['login','users','courses','enrollments']
|
||||
|
||||
for checking in check:
|
||||
try:
|
||||
if f'{checking}.csv' in files and file_doesnt_exist(f'{checking}-{rounded_time}.csv'):
|
||||
sftp.get(f'{checking}.csv',f'cache/rosters/{checking}-{rounded_time}.csv')
|
||||
i += 1
|
||||
seen_files.append(f'{checking}.csv')
|
||||
except:
|
||||
print(f' * {checking}.csv not present')
|
||||
print(' Saved %i data files in rosters folder.' % i)
|
||||
ff.write( f" Saved {i} data files: {seen_files}")
|
||||
|
||||
if i>2:
|
||||
if 'courses.csv' in seen_files:
|
||||
courses = open(f'cache/rosters/courses-{rounded_time}.csv','r')
|
||||
if len(saved) >= 3 and 'courses.csv' in saved or os.path.exists(f'cache/rosters/courses-{label_hour}.csv'):
|
||||
try:
|
||||
with open(f'cache/rosters/courses-{label_hour}.csv','r') as courses:
|
||||
courses.readline()
|
||||
a = courses.readline()
|
||||
print(a)
|
||||
courses.close()
|
||||
parts = a.split(',')
|
||||
year = parts[1][0:4]
|
||||
ss = parts[1][4:6]
|
||||
sem = {'30':'spring', '50':'summer', '70':'fall' }
|
||||
this_sem = sem[ss]
|
||||
print(f" -> This semester is: {this_sem}, {year}" )
|
||||
print(f" -> Building data file... {rounded_time}")
|
||||
convert_roster_files(this_sem,year,rounded_time)
|
||||
print(' -> moving files...')
|
||||
ff.write( f" Moved files to folder: {this_sem} {year} {rounded_time}\n")
|
||||
move_to_folder(this_sem,year,rounded_time,seen_files)
|
||||
else:
|
||||
print(" * No courses file. Not moving files.")
|
||||
ff.write( f" * No courses file. Not moving files.\n")
|
||||
parts = a.split(',')
|
||||
year = parts[1][0:4]
|
||||
ss = parts[1][4:6]
|
||||
sem = {'30':'spring', '50':'summer', '70':'fall' }
|
||||
this_sem = sem.get(ss, 'spring')
|
||||
log(f"post-process for semester={this_sem} year={year} label={label_hour}")
|
||||
convert_roster_files(this_sem,year,label_hour)
|
||||
move_to_folder(this_sem,year,label_hour,saved)
|
||||
except Exception as expp:
|
||||
log(f'post-processing failed: {expp}')
|
||||
else:
|
||||
print(f"--> {exact_time}: Don't see files.")
|
||||
sftp.close()
|
||||
log('not enough files to post-process yet')
|
||||
finally:
|
||||
if close_after:
|
||||
try:
|
||||
sftp.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def fetch_current_rosters_auto():
|
||||
fetch_minute = "56,57,58,59,00,01,02,03,04,05,06".split(",")
|
||||
for m in fetch_minute:
|
||||
schedule.every().hour.at(f":{m}").do(fetch_current_rosters)
|
||||
|
||||
#schedule.every().day.at("12:35").do(sync_non_interactive)
|
||||
#schedule.every().day.at("21:00").do(sync_non_interactive)
|
||||
|
||||
|
||||
#print(f"running every hour on the :{fetch_minute}\n")
|
||||
while True:
|
||||
def fetch_current_rosters_auto(poll_seconds=15):
|
||||
"""Poll Instructure SFTP from the top of each hour until all 4 roster files appear, then download once.
|
||||
- Robust logging to cache/pipeline.log.txt
|
||||
- Stops polling for the remainder of that hour after success
|
||||
- Tries again on the next hour
|
||||
"""
|
||||
import pysftp
|
||||
from contextlib import contextmanager
|
||||
|
||||
log_path = 'cache/pipeline.log.txt'
|
||||
|
||||
def log(msg):
|
||||
ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
try:
|
||||
schedule.run_pending()
|
||||
time.sleep(4)
|
||||
except Exception as e:
|
||||
with open(log_path, 'a', encoding='utf-8') as f:
|
||||
f.write(f"[{ts}] {msg}\n")
|
||||
except Exception:
|
||||
print(f"[log-fail] {msg}")
|
||||
|
||||
@contextmanager
|
||||
def sftp_conn():
|
||||
cnopts = pysftp.CnOpts()
|
||||
cnopts.hostkeys = None
|
||||
conn = None
|
||||
try:
|
||||
conn = pysftp.Connection(instructure_url, username=instructure_username,
|
||||
private_key=instructure_private_key, cnopts=cnopts)
|
||||
conn.chdir('SIS')
|
||||
yield conn
|
||||
finally:
|
||||
try:
|
||||
if conn:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
required = {'login.csv', 'users.csv', 'courses.csv', 'enrollments.csv'}
|
||||
current_window = None # e.g., '2025-08-30-14'
|
||||
window_done = False
|
||||
window_end = None
|
||||
|
||||
def compute_window(now):
|
||||
"""Return (label_hour_str, window_end_dt) if within a window, else (None, None).
|
||||
Window spans [H-5m, H+5m] around each top-of-hour H.
|
||||
If minute >= 55 -> window is next hour
|
||||
If minute <= 5 -> window is current hour
|
||||
Else -> not in window.
|
||||
"""
|
||||
m = now.minute
|
||||
if m >= 55:
|
||||
base = (now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1))
|
||||
elif m <= 5:
|
||||
base = now.replace(minute=0, second=0, microsecond=0)
|
||||
else:
|
||||
return (None, None)
|
||||
label = base.strftime('%Y-%m-%d-%H')
|
||||
end = base + timedelta(minutes=5) # end of window
|
||||
return (label, end)
|
||||
|
||||
log('fetch_current_rosters_auto: starting poll loop')
|
||||
while True:
|
||||
now = datetime.datetime.now()
|
||||
hour_key = now.strftime('%Y-%m-%d %H')
|
||||
label, enddt = compute_window(now)
|
||||
if label is None:
|
||||
time.sleep(max(5, int(poll_seconds)))
|
||||
continue
|
||||
|
||||
if label != current_window:
|
||||
current_window = label
|
||||
window_done = False
|
||||
window_end = enddt
|
||||
log(f'New window: {current_window} (until {window_end.strftime("%H:%M:%S")}). Resetting state.')
|
||||
|
||||
if window_done:
|
||||
time.sleep(5)
|
||||
continue
|
||||
|
||||
# Poll SFTP for files
|
||||
try:
|
||||
with sftp_conn() as sftp:
|
||||
files = set(sftp.listdir())
|
||||
missing = list(required - files)
|
||||
if missing:
|
||||
log(f'Not ready yet. Missing: {missing}')
|
||||
else:
|
||||
log('All required files present inside window. Starting download...')
|
||||
try:
|
||||
fetch_current_rosters(sftp=sftp, label_hour=current_window)
|
||||
log('Download complete. Marking window_done for this window.')
|
||||
window_done = True
|
||||
except Exception as ex_dl:
|
||||
import traceback
|
||||
log('Download failed: ' + str(ex_dl))
|
||||
log(traceback.format_exc())
|
||||
except Exception as ex_list:
|
||||
import traceback
|
||||
print(" ---- * * * Failed with: %s" % str(e))
|
||||
ff = open('cache/pipeline.log.txt','a')
|
||||
ff.write(datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + "\n")
|
||||
ff.write(traceback.format_exc()+"\n---------\n\n")
|
||||
ff.close()
|
||||
#schedule.CancelJob
|
||||
time.sleep(1)
|
||||
log('SFTP list failed: ' + str(ex_list))
|
||||
log(traceback.format_exc())
|
||||
|
||||
# Check for window timeout
|
||||
try:
|
||||
if not window_done and window_end and now >= window_end:
|
||||
log(f'REPORT: window {current_window} ended without all files. Consider alerting/email.')
|
||||
window_done = True # stop further checks this window
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
time.sleep(max(5, int(poll_seconds)))
|
||||
|
||||
|
||||
# Canvas data, download all new files
|
||||
|
|
@ -1292,7 +1389,12 @@ def process_reg_history(term='fa25'):
|
|||
|
||||
|
||||
def recreate_all():
|
||||
for x in 'sp20 su20 fa20 sp21 su21 fa21 sp22 su22 fa22 sp23 su23 fa23 sp24'.split(' '):
|
||||
# Use canonical semester short codes from semesters.py
|
||||
try:
|
||||
from semesters import code as SEM_CODES
|
||||
except Exception:
|
||||
SEM_CODES = []
|
||||
for x in SEM_CODES:
|
||||
try:
|
||||
recreate_reg_data(x)
|
||||
except Exception as e:
|
||||
|
|
|
|||
Loading…
Reference in New Issue