update web interface, tracking registrations

This commit is contained in:
Peter Howell 2025-09-12 23:40:34 +00:00
parent d1111d5f27
commit 742bd77f15
5 changed files with 710 additions and 133 deletions

View File

@@ -246,6 +246,10 @@ def users_in_by_depts_live(depts=[], termid='181'):
#print("Dept: %s" % d)
match = re.search('^(%s)' % d, c['course_code'])
if match:
if d == "STAT" and match.group() == "STAT":
print("STAT")
else:
continue
print("Getting enrollments for %s" % c['course_code'])
if d in courses_by_by_dept: courses_by_by_dept[d].append(c)
else: courses_by_by_dept[d] = [ c, ]
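The hunk above narrows the course loop by prefix-matching course codes against a department code; note that '^STAT' also matches longer codes that merely start with STAT, which is why the new guard inspects the match. A standalone sketch of the matching step (made-up course codes, not from the repo):

import re

courses = [
    {'course_code': 'STAT10-01'},
    {'course_code': 'MATH1A-02'},
    {'course_code': 'STATGEN-01'},   # also matches the 'STAT' prefix
]
courses_by_dept = {}
for d in ('STAT', 'MATH'):
    for c in courses:
        # '^(%s)' anchors the department code at the start of the course code
        if re.search('^(%s)' % d, c['course_code']):
            courses_by_dept.setdefault(d, []).append(c['course_code'])
print(courses_by_dept)
# {'STAT': ['STAT10-01', 'STATGEN-01'], 'MATH': ['MATH1A-02']}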
@@ -1019,7 +1023,7 @@ def enroll_id_list_to_shell(id_list, shell_id, v=0):
existing_ids = set( [ x['user_id'] for x in existing.values() ])
if v: print("To Enroll: %s" % str(id_list))
- if v: print("\n\Already Enrolled: %s" % str(existing_ids))
+ if v: print(r"\n\Already Enrolled: %s" % str(existing_ids))
enroll_us = id_list.difference(existing_ids)
if v: print("\n\nTO ENROLL %s" % str(enroll_us))
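The enrollment step above is a plain set difference; a self-contained illustration with made-up IDs:

id_list = {101, 102, 103, 104}       # everyone who signed up
existing_ids = {102, 104}            # already enrolled in the shell
enroll_us = id_list.difference(existing_ids)
print(sorted(enroll_us))             # [101, 103]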
@@ -1207,6 +1211,18 @@ def enroll_gott_workshops():
json_str = match.group(1) # Extract the JSON string
try:
signups = json.loads(json_str) # Convert to Python list of dicts
# Normalize NBSP and spaces in key fields to make title/date matching robust
def _norm(v):
try:
return str(v).replace('\xa0',' ').strip()
except Exception:
return v
for s in signups:
if isinstance(s, dict):
if 'training' in s and s['training'] is not None:
s['training'] = _norm(s['training'])
if 'date_rsvp' in s and s['date_rsvp'] is not None:
s['date_rsvp'] = _norm(s['date_rsvp'])
#print(json.dumps(signups,indent=2))
except json.JSONDecodeError as e:
print("Error decoding JSON:", e)
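The _norm helper added above exists because scraped signup fields often carry non-breaking spaces (U+00A0) and stray whitespace that defeat exact comparisons; a quick demonstration with a made-up title:

def _norm(v):
    try:
        return str(v).replace('\xa0', ' ').strip()
    except Exception:
        return v

scraped = 'GOTT\xa02: Intro to Asynchronous Online Teaching '
expected = 'GOTT 2: Intro to Asynchronous Online Teaching'
print(scraped == expected)                # False: NBSP and trailing space
print(_norm(scraped) == _norm(expected))  # True after normalization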
@@ -1255,7 +1271,8 @@ def enroll_gott_workshops():
#['2025-06-01 17:00:00', 'GOTT 4: Assessments in Digital Learning', 21898],
#['2025-08-11 13:00:00', 'GOTT 1: Introduction to Online Teaching with Canvas', 23232],
- ['2025-09-01 17:00:00', r'GOTT 1: Intro to Online Teaching (Canvas, Accessibility and RSI) ', 23270],
+ #['2025-09-01 17:00:00', r'GOTT 1: Intro to Online Teaching (Canvas, Accessibility and RSI) ', 23270],
['2025-09-14 17:00:00', r'GOTT 2: Intro to Asynchronous Online Teaching and Learning', 23290],
]
#print(json.dumps(signups,indent=4))
#print(json.dumps(by_email,indent=4))
@@ -1280,7 +1297,8 @@ def enroll_gott_workshops():
'davidamancio791@gmail.com': 'damancio@gavilan.edu',
'carissaamunoz83@gmail.com': 'amunoz@gavilan.edu',
'jasonwcpa@yahoo.com': 'jwolowitz@gavilan.edu',
'fam.grzan@charter.net': 'rgrzan@gavilan.edu',
'carissaadangelo@yahoo.com': 'cmunoz@gavilan.edu',
}
for each_workshop in workshop_ids:
@@ -1288,12 +1306,18 @@ def enroll_gott_workshops():
# print(f"skipping {wkshp}")
# continue
wkshp_date, wkshp_title, wkshp_shell_id = each_workshop
# local normalizer consistent with signup cleaning
def _norm(v):
try:
return str(v).replace('\xa0',' ').strip()
except Exception:
return v
to_enroll = []
#from_file = [ L.strip().split(' - ') for L in codecs.open(f'cache/{student_list}', 'r', 'utf-8').readlines() ]
#print(from_file)
for s in signups:
- if wkshp_date == s['date_rsvp'] and wkshp_title == s['training']:
+ if _norm(wkshp_date) == _norm(s.get('date_rsvp')) and _norm(wkshp_title) == _norm(s.get('training')):
e = s['email'].lower()
if e in subs:
e = subs[e]
@@ -1418,7 +1442,7 @@ def make_ztc_list(sem='sp20'):
ztc_by_dept = {}
for R in responses:
R = re.sub(',Yes','',R)
- R = re.sub('\s\s+',',',R)
+ R = re.sub(r'\s\s+',',',R)
parts = R.split(r',') #name courselist yes
#print(parts[1])
@@ -1903,7 +1927,8 @@ def create_sandboxes():
#(23083, ' Sandbox GOTT1 SU25'),
#(23015, ' Sandbox GOTT2 SU25'),
#(21898, ' Sandbox GOTT4 SU25'),
- (23270, ' Sandbox GOTT1 FA25SEPT'),
+ #(23270, ' Sandbox GOTT1 FA25SEPT'),
(23290, ' Sandbox GOTT2 FA25SEPT'),
]
filepath = 'cache/sandbox_courses.pkl'

View File

@@ -243,6 +243,47 @@ def flask_thread(q):
def useful_info_api_bridge():
return server.useful_info_api()
# Bridge roster change APIs and simple pages
@app.route('/api/rosters/changes')
def roster_changes_bridge():
return server.api_roster_changes()
@app.route('/api/rosters/changes/user/<user_id>')
def roster_changes_user_bridge(user_id):
return server.api_roster_changes_by_user(user_id)
@app.route('/api/rosters/changes/course/<course_id>')
def roster_changes_course_bridge(course_id):
return server.api_roster_changes_by_course(course_id)
@app.route('/api/rosters/terms')
def roster_terms_bridge():
return server.api_roster_terms()
@app.route('/api/rosters/users')
def roster_users_bridge():
return server.api_roster_users()
@app.route('/api/rosters/courses')
def roster_courses_bridge():
return server.api_roster_courses()
@app.route('/courses')
def courses_page_bridge():
return server.courses_page()
@app.route('/users')
def users_page_bridge():
return server.users_page()
@app.route('/courses/<course_id>')
def courses_page_deeplink_bridge(course_id):
return server.courses_page_deeplink(course_id)
@app.route('/users/<user_id>')
def users_page_deeplink_bridge(user_id):
return server.users_page_deeplink(user_id)
@app.route('/health')
def health():
return jsonify({'app': 'interactive.py', 'status': 'ok'})
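Each bridge route above is a thin pass-through so this app can serve endpoints implemented in server.py; a minimal sketch of the pattern (the stand-in server class and payload are illustrative, not the repo's actual code):

from flask import Flask, jsonify

app = Flask(__name__)

class server:                      # stand-in for the imported module
    @staticmethod
    def api_roster_terms():
        return jsonify([])

@app.route('/api/rosters/terms')   # the bridge route owns the URL...
def roster_terms_bridge():
    return server.api_roster_terms()   # ...the other module owns the logic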

View File

@@ -322,6 +322,337 @@ def move_to_folder(sem,year,folder,files):
if 'login.csv' in files:
safe_move('cache/rosters/login-%s.csv' % folder, 'cache/rosters/%s/login.%s.csv' % (semester,now))
# Build maps from the latest users/courses snapshot for nicer keys/names.
# Return the path to the latest `{prefix}.*.csv` file found under `sem_path`.
def _latest_snapshot_map(sem_path, prefix):
"""Return path to latest `{prefix}.*.csv` file in `sem_path` or None if missing."""
try:
files = [f for f in os.listdir(sem_path) if f.startswith(prefix + '.') and f.endswith('.csv')]
except FileNotFoundError:
return None
def ts_of(name):
try:
label = name[len(prefix)+1:-4]
return datetime.datetime.strptime(label, '%Y-%m-%dT%H-%M')
except Exception:
return datetime.datetime.min
files.sort(key=ts_of)
return os.path.join(sem_path, files[-1]) if files else None
# Helper to read CSV safely into list of dicts.
# Read a CSV file into a list of dict rows; return [] if missing.
def _read_csv_dicts(path):
"""Read a CSV file into a list of normalized dict rows; returns [] if file missing.
- Normalizes header keys to lowercase and strips whitespace.
- Strips whitespace from string values.
"""
rows = []
if not path or not os.path.exists(path):
return rows
with open(path, 'r', encoding='utf-8', newline='') as f:
reader = csv.DictReader(f)
for r in reader:
norm = {}
for k, v in (r.items() if r else []):
nk = (k.strip().lower() if isinstance(k, str) else k)
if isinstance(v, str):
v = v.strip()
norm[nk] = v
if norm:
rows.append(norm)
return rows
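# Illustrative composition of the two helpers above (not part of the commit;
# the folder name is hypothetical, and this function is never called):
def _demo_snapshot_helpers():
    sem_path = 'cache/rosters/2025fall'
    latest = _latest_snapshot_map(sem_path, 'users')  # newest users.*.csv, or None
    return _read_csv_dicts(latest)                    # [] when latest is None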
# Create user lookup keyed by `user_id` with basic fields for convenience.
# Expected columns: status,user_id,login_id,last_name,first_name,email,password
def _build_user_map(users_csv_path):
"""Return dict keyed by `user_id` with selected fields from users.csv.
Expected columns: status,user_id,login_id,last_name,first_name,email,password
"""
user_map = {}
for r in _read_csv_dicts(users_csv_path):
uid = r.get('user_id')
if not uid:
continue
user_map[str(uid)] = {
'user_id': str(uid),
'login_id': r.get('login_id',''),
'first_name': r.get('first_name',''),
'last_name': r.get('last_name',''),
'email': r.get('email',''),
}
return user_map
# Create course lookup keyed by `course_id` with long/short names and term.
# Expected columns: status,term_id,long_name,short_name,course_id,blueprint_course_id
def _build_course_map(courses_csv_path):
"""Return dict keyed by `course_id` from courses.csv, keeping long/short names.
Expected columns: status,term_id,long_name,short_name,course_id,blueprint_course_id
"""
course_map = {}
for r in _read_csv_dicts(courses_csv_path):
cid = r.get('course_id')
if not cid:
continue
course_map[str(cid)] = {
'course_id': str(cid),
'term_id': r.get('term_id',''),
'long_name': r.get('long_name',''),
'short_name': r.get('short_name',''),
}
return course_map
# Parse a timestamp label like 2025-01-31T14-00 into dt.
# Return datetime or None if parsing fails.
def _parse_label(label):
"""Parse the timestamp label from filenames into a datetime; return None on failure."""
try:
return datetime.datetime.strptime(label, '%Y-%m-%dT%H-%M')
except Exception:
return None
# Compute enrollment changes across semester snapshots and emit JSON indexes.
# Walk enrollments/users/courses snapshots; detect adds/drops/changes; write by_course/by_user JSON.
def compute_enrollment_changes(sem=None, year=None):
"""Walk cache/rosters/<year><sem>/enrollments.*.csv ascending, detect adds/drops/changes.
- If `sem`/`year` omitted, prompt for a semester and resolve via semesters.find_term.
- Emits JSON files by course and by user for easy UI lookup.
"""
if not sem or not year:
try:
import semesters
ans = input("Which semester? (e.g., 'fa25', 'Fall 2025', '2025 Fall'): ").strip()
rec = semesters.find_term(ans)
if not rec or not rec.get('standard'):
print("compute_enrollment_changes: could not parse semester input.")
return
std = rec['standard'] # e.g., 'Fall 2025'
parts = std.split()
season = (parts[0].lower() if len(parts) >= 2 else '').lower()
year = parts[1] if len(parts) >= 2 else ''
season_map = {'spring': 'spring', 'summer': 'summer', 'fall': 'fall', 'winter': 'winter'}
sem = season_map.get(season, season)
except Exception as ex:
print(f"compute_enrollment_changes: semester prompt failed: {ex}")
return
semester = f"{year}{sem}"
sem_path = os.path.join('cache', 'rosters', semester)
if not os.path.isdir(sem_path):
print(f"compute_enrollment_changes: missing folder {sem_path}")
return
# Discover all enrollment snapshots in time order
files = [f for f in os.listdir(sem_path) if f.startswith('enrollments.') and f.endswith('.csv')]
def snap_key(name):
label = name[len('enrollments.'):-4]
dt = _parse_label(label)
return (dt or datetime.datetime.min, label)
files.sort(key=snap_key)
if not files:
print(f"compute_enrollment_changes: no snapshots in {sem_path}")
return
# Build user/course maps from latest snapshots for enrichment
latest_users = _latest_snapshot_map(sem_path, 'users')
latest_courses = _latest_snapshot_map(sem_path, 'courses')
users_map = _build_user_map(latest_users)
courses_map = _build_course_map(latest_courses)
# Collect remote login info across all login snapshots
remote_info = {}
login_files = [f for f in os.listdir(sem_path) if f.startswith('login.') and f.endswith('.csv')]
def login_key(name):
label = name[len('login.'):-4]
dt = _parse_label(label)
return (dt or datetime.datetime.min, label)
login_files.sort(key=login_key)
for fname in login_files:
for r in _read_csv_dicts(os.path.join(sem_path, fname)):
uid = r.get('user_id')
if not uid:
continue
remote_info[str(uid)] = {
'remote': True,
'root_account': r.get('root_account','')
}
# merge remote flags into users_map
for uid, info in remote_info.items():
users_map.setdefault(uid, {'user_id': uid, 'login_id':'', 'first_name':'', 'last_name':'', 'email':''})
users_map[uid].update(info)
def choose_course_key(course_id):
cid = str(course_id or 'unknown')
detail = courses_map.get(cid, {})
# Course key at CRN level uses the course_id directly (e.g., 202570-12345)
key = cid
info = {'course_id': cid, 'long_name': detail.get('long_name',''), 'short_name': detail.get('short_name',''), 'term_id': detail.get('term_id','')}
return key, info
def choose_user_key(user_id):
uid = str(user_id or 'unknown')
info = users_map.get(uid, {})
# Prefer to show user_id (Canvas/SIS ID here), along with convenience fields
return uid, {
'user_id': uid,
'login_id': info.get('login_id',''),
'first_name': info.get('first_name',''),
'last_name': info.get('last_name',''),
'email': info.get('email',''),
'remote': info.get('remote', False),
'root_account': info.get('root_account',''),
}
# Accumulators
by_course = {}
by_user = {}
prev = {}
for fname in files:
label = fname[len('enrollments.'):-4]
snap_time = _parse_label(label)
path = os.path.join(sem_path, fname)
curr = {}
# Build state for this snapshot keyed by (course,user)
for r in _read_csv_dicts(path):
user_id = r.get('user_id')
course_id = r.get('course_id')
if not user_id and not course_id:
continue
key = (str(course_id or ''), str(user_id))
curr[key] = {
'status': r.get('status') or r.get('enrollment_state') or r.get('state') or '',
'role': r.get('role') or r.get('type') or '',
}
# Compare with previous snapshot (including the first, using empty prev for baseline)
all_keys = set(prev.keys()) | set(curr.keys())
for k in all_keys:
before = prev.get(k)
after = curr.get(k)
course_id, user_id = k
course_key, course_info = choose_course_key(course_id)
user_key, user_info = choose_user_key(user_id)
def emit(action, extra=None):
evt = {
'time': (snap_time.isoformat(timespec='minutes') if snap_time else label),
'action': action,
'course_key': course_key,
'course': course_info,
'user_key': user_key,
'user': user_info,
'role': (after or before or {}).get('role',''),
'status': (after or before or {}).get('status',''),
}
if before:
evt['before'] = before
if after:
evt['after'] = after
by_course.setdefault(course_key, []).append(evt)
by_user.setdefault(user_key, []).append(evt)
if before and not after:
# Row disappeared; if last known status was deleted, count as drop; otherwise record anomaly.
if (before.get('status','').lower() == 'deleted'):
emit('drop')
else:
emit('enrollment_row_removed')
elif after and not before:
# New row; if active, it's an add; otherwise note row added.
if (after.get('status','').lower() == 'active'):
emit('add')
elif (after.get('status','').lower() == 'deleted'):
emit('drop')
else:
emit('enrollment_row_added')
elif before and after:
# detect attribute changes
role_changed = before.get('role') != after.get('role')
status_changed = before.get('status') != after.get('status')
if status_changed:
if str(after.get('status','')).lower() == 'active':
emit('add')
elif str(after.get('status','')).lower() == 'deleted':
emit('drop')
else:
emit('status_change')
if role_changed:
emit('role_change')
prev = curr
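# (Illustrative worked example, not part of the commit.) With
#   prev = {('C1','u1'): {'status': 'active', 'role': 'student'}}
#   curr = {('C1','u1'): {'status': 'deleted', 'role': 'student'},
#           ('C1','u2'): {'status': 'active', 'role': 'student'}}
# the comparison above emits a 'drop' for u1 (status active -> deleted) and
# an 'add' for u2 (new row whose status is 'active'), then carries curr
# forward as prev for the next snapshot.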
# Also detect appearance/disappearance in users.csv and courses.csv sequences
def diff_entities(prefix, id_field, emit_fn):
seq = [f for f in os.listdir(sem_path) if f.startswith(prefix + '.') and f.endswith('.csv')]
def key_fn(name):
label = name[len(prefix)+1:-4]
dt = _parse_label(label)
return (dt or datetime.datetime.min, label)
seq.sort(key=key_fn)
prev_ids = set()
for fname in seq:
label = fname[len(prefix)+1:-4]
snap_time = _parse_label(label)
curr_ids = set()
for r in _read_csv_dicts(os.path.join(sem_path, fname)):
vid = r.get(id_field)
if vid:
curr_ids.add(str(vid))
# added
for vid in sorted(curr_ids - prev_ids):
emit_fn('added', vid, snap_time, label)
# removed
for vid in sorted(prev_ids - curr_ids):
emit_fn('removed', vid, snap_time, label)
prev_ids = curr_ids
def emit_user_presence(action, uid, snap_time, label):
user_key, user_info = choose_user_key(uid)
evt = {
'time': (snap_time.isoformat(timespec='minutes') if snap_time else label),
'action': f'user_entry_{action}',
'user_key': user_key,
'user': user_info,
}
by_user.setdefault(user_key, []).append(evt)
def emit_course_presence(action, cid, snap_time, label):
course_key, course_info = choose_course_key(cid)
evt = {
'time': (snap_time.isoformat(timespec='minutes') if snap_time else label),
'action': f'course_entry_{action}',
'course_key': course_key,
'course': course_info,
}
by_course.setdefault(course_key, []).append(evt)
diff_entities('users', 'user_id', emit_user_presence)
diff_entities('courses', 'course_id', emit_course_presence)
# Sort events by time
def sort_key(e):
try:
return datetime.datetime.fromisoformat(e['time'])
except Exception:
return datetime.datetime.min
for k in by_course:
by_course[k].sort(key=sort_key)
for k in by_user:
by_user[k].sort(key=sort_key)
# Write results
out_all = {
'semester': semester,
'generated': datetime.datetime.now().isoformat(timespec='seconds'),
'by_course': by_course,
'by_user': by_user,
}
try:
with open(os.path.join(sem_path, 'enrollment_changes.json'), 'w', encoding='utf-8') as f:
f.write(json.dumps(out_all, indent=2))
print(f"compute_enrollment_changes: wrote {sem_path}/enrollment_changes.json")
except Exception as ex:
print(f"compute_enrollment_changes: failed to write output: {ex}")
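# Illustrative call (hypothetical term, never executed here): for Fall 2025
# this walks cache/rosters/2025fall/enrollments.*.csv in time order and
# writes cache/rosters/2025fall/enrollment_changes.json.
def _demo_compute_enrollment_changes():
    compute_enrollment_changes('fall', '2025')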
# Take raw upload (csv) files and make one big json out of them.
@@ -451,6 +782,11 @@ def fetch_current_rosters(sftp=None, label_hour=None):
log(f"post-process for semester={this_sem} year={year} label={label_hour}")
convert_roster_files(this_sem,year,label_hour)
move_to_folder(this_sem,year,label_hour,saved)
# After moving into semester folder, compute enrollment changes timeline
try:
compute_enrollment_changes(this_sem, year)
except Exception as ex_changes:
log(f'enrollment change computation failed: {ex_changes}')
except Exception as expp:
log(f'post-processing failed: {expp}')
else:
@@ -606,118 +942,6 @@ def put_file(remotepath,localpath, localfile,prompt=1):
sftp.close()
#text =
result = []
last_type = ''
#answer_text = ''
answer = []
in_a_list = ''
# Get all the images
for k,value in doc_objects.items():
tempout.write( "->" + k + "=" + json.dumps(value,indent=2) + "\n\n\n--\n\n")
fetched = fetch_doc_image(k,value)
list_stack = []
list_depth = 0
last_list_depth = 0
for value in doc_content:
tempout.write( json.dumps(value,indent=2) + "\n\n\n")
if verbose: print(json.dumps(value, sort_keys=True, indent=4))
tag_fxn = handle_para
if 'paragraph' in value:
this_text = ''
# First we deal with if we're in a list.
if 'bullet' in value['paragraph']:
# either we're (1)starting a new list, (2)in one (do nothing),
# (3)starting a nested one, or (4)finished a nested one.
lid = value['paragraph']['bullet']['listId']
if not list_stack: # 1
list_stack.append(lid)
else:
if not lid == list_stack[0]:
if not lid in list_stack: # 3
list_stack.append(lid)
else: # 4
x = list_stack.pop()
while x != lid: list_stack.pop()
elif len(list_stack) > 0:
# current para isn't a bullet but we still have a list open.
list_stack = []
list_depth = len(list_stack)
deeper = list_depth - last_list_depth
if deeper > 0:
answer.append("<ul>" * deeper)
elif deeper < 0:
deeper = -1 * deeper
answer.append("</ul>" * deeper)
if len(list_stack):
tag_fxn = handle_li
# NOW the tag_fxn is either 'para' or 'li'... let's get the styling info next,
elements = value.get('paragraph').get('elements')
if 'paragraphStyle' in value.get('paragraph'):
style = value.get('paragraph').get('paragraphStyle')
if 'namedStyleType' in style:
type = style['namedStyleType']
# and FINALLY, the actual contents.
for elem in elements:
# text content
this_text += read_paragraph_element_2(elem,type)
# image content
if 'inlineObjectElement' in elem:
vpi = elem['inlineObjectElement']
if 'inlineObjectId' in vpi:
ii = vpi['inlineObjectId']
if ii in img_lookup:
img = img_lookup[ii]
h = img_heights[ii]
w = img_widths[ii]
this_text += '<img src="doc_images/%s" width="%i" height="%i" />' % (img,w,h)
# Now for something tricky. Call an appropriate handler, based on:
# (a) what is the paragraph style type?
# (b) is it different from the prev one?
if last_type=='NORMAL_TEXT' and type!=last_type:
if this_text.strip():
result.append(handle_answer(answer))
answer = []
#answer_text = ''
if type=='HEADING_2' and this_text.strip():
result.append( handle_sec(this_text) )
this_text = ''
elif type=='HEADING_3' and this_text.strip():
result.append(handle_question(this_text,bracket))
this_text = ''
else:
if this_text.lower().startswith('tags:'):
tag_fxn = handle_tags
if this_text.lower().startswith('icons:'):
tag_fxn = handle_icons
if this_text.strip():
answer.append(tag_fxn(this_text))
this_text = ''
last_type = type
last_list_depth = list_depth
elif 'table' in value:
pass
result.append(handle_answer(answer))
return json.dumps(result,indent=4)
def process_reg_history(term='fa25'):
from collections import defaultdict
@@ -975,6 +1199,7 @@ if __name__ == "__main__":
4: ['Narrative timeline of section updates', process_reg_history],
5: ['Create narrative format all semesters', recreate_all],
6: ['Recreate reg_data from full reg history', recreate_reg_data],
7: ['Compute enrollment changes', compute_enrollment_changes],
}
'''1: ['Re-create schedule csv and json files from raw html',recent_schedules] ,

server.py
View File

@@ -102,7 +102,9 @@ def a(t,h): return '<a href="%s">%s</a>' % (h,t)
@app.route('/')
def homepage():
return tag('h1','Canvas Tools') + br + \
- a('Useful Emails','/useful-info') + br + br + \
+ a('Useful Emails','/useful-info') + br + \
a('Course Changes','/courses') + br + \
a('User Changes','/users') + br + br + \
a('Reload server','/rl') + br + \
a('Shut down','/sd')
@@ -184,6 +186,28 @@ def useful_info_by_tag(tag):
return useful_info_api()
@app.route('/courses')
def courses_page():
return render_template('courses.html')
@app.route('/users')
def users_page():
return render_template('users.html')
@app.route('/courses/<course_id>')
def courses_page_deeplink(course_id):
# Client-side Vue app will read location.pathname and deep-link.
return render_template('courses.html')
@app.route('/users/<user_id>')
def users_page_deeplink(user_id):
# Client-side Vue app will read location.pathname and deep-link.
return render_template('users.html')
def _shutdown_server():
func = request.environ.get('werkzeug.server.shutdown')
if func is None:
@@ -213,6 +237,261 @@ def health():
return jsonify({'app': 'server.py', 'status': 'ok'}), 200
# Load enrollment change index JSON for a given term like '2025fall'.
def _load_enrollment_changes(term):
base = os.path.join('cache', 'rosters', term, 'enrollment_changes.json')
if not os.path.exists(base):
return None
try:
with open(base, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception:
return None
# Resolve a term from query args: accept 'term' directly, or 'year'+'sem'.
def _resolve_term(args):
term = args.get('term')
if term:
return term
year = args.get('year')
sem = args.get('sem') # expect 'spring'|'summer'|'fall'
if year and sem:
return f"{year}{sem}"
return None
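# Illustrative only (never called): the two query styles accepted above.
def _demo_resolve_term():
    assert _resolve_term({'term': '2025fall'}) == '2025fall'
    assert _resolve_term({'year': '2025', 'sem': 'fall'}) == '2025fall'
    assert _resolve_term({}) is None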
def _list_terms_with_changes():
"""Scan cache/rosters for terms that contain enrollment_changes.json and return sorted list.
Sort order uses semesters.py by converting 'YYYY<season>' to 'Season YYYY' and then to sort key.
"""
base = os.path.join('cache', 'rosters')
out = []
if not os.path.isdir(base):
return out
# map lowercase season to title
title_map = {'spring': 'Spring', 'summer': 'Summer', 'fall': 'Fall', 'winter': 'Winter'}
try:
from semesters import season_to_number
except Exception:
season_to_number = {'Spring': '30', 'Summer': '50', 'Fall': '70', 'Winter': '10'}
for name in os.listdir(base):
term_path = os.path.join(base, name)
if not os.path.isdir(term_path):
continue
# accept both underscore and hyphen variants
f1 = os.path.join(term_path, 'enrollment_changes.json')
f2 = os.path.join(term_path, 'enrollment-changes.json')
if not (os.path.exists(f1) or os.path.exists(f2)):
continue
# parse 'YYYYseason'
yr = ''.join([c for c in name if c.isdigit()])
season = name[len(yr):]
if not (yr and season):
continue
season_title = title_map.get(season.lower())
if not season_title:
continue
human = f"{season_title} {yr}"
sis_code = f"{yr}{season_to_number.get(season_title, '00')}"
out.append({
'term': name,
'label': human,
'sis_code': sis_code,
'course_prefix': f"{sis_code}-",
'path': term_path,
})
# Sort descending by sis_code (most recent first)
out.sort(key=lambda x: x['sis_code'], reverse=True)
return out
@app.route('/api/rosters/terms')
def api_roster_terms():
return jsonify(_list_terms_with_changes())
def _read_csv_dicts(path):
"""Tolerant CSV reader: returns list of dicts with lowercase, stripped keys/values.
Tries utf-8-sig first, falls back to latin-1.
"""
import csv as _csv
rows = []
if not path or not os.path.exists(path):
return rows
for enc in ('utf-8-sig', 'utf-8', 'latin-1'):
try:
with open(path, 'r', encoding=enc, newline='') as f:
rdr = _csv.DictReader(f)
for r in rdr or []:
norm = {}
for k, v in (r.items() if r else []):
nk = (k.strip().lower() if isinstance(k, str) else k)
if isinstance(v, str):
v = v.strip()
norm[nk] = v
if norm:
rows.append(norm)
break
except Exception:
rows = []
continue
return rows
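# Illustrative only (hypothetical scratch path, never called): b'Jos\xe9' is
# not valid UTF-8, so the utf-8 passes fail mid-read and the latin-1 pass
# returns [{'name': 'José'}].
def _demo_encoding_fallback(path='cache/_demo_latin1.csv'):
    with open(path, 'wb') as f:
        f.write(b'name\nJos\xe9\n')
    return _read_csv_dicts(path)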
def _latest_snapshot_file(term, prefix):
base = os.path.join('cache', 'rosters', term)
if not os.path.isdir(base):
return None
try:
files = [f for f in os.listdir(base) if f.startswith(prefix + '.') and f.endswith('.csv')]
def ts_of(name):
label = name[len(prefix)+1:-4]
try:
return datetime.strptime(label, '%Y-%m-%dT%H-%M')
except Exception:
return datetime.min
files.sort(key=ts_of)
if not files:
return None
return os.path.join(base, files[-1])
except Exception:
return None
@app.route('/api/rosters/users')
def api_roster_users():
"""Return latest users list for the given term (from users.*.csv) with optional remote flags from login.*.csv."""
term = _resolve_term(request.args)
if not term:
return jsonify({'error': 'missing term'}), 400
term_path = os.path.join('cache', 'rosters', term)
users_path = _latest_snapshot_file(term, 'users')
if not users_path or not os.path.exists(users_path):
return jsonify([])
users = []
try:
for r in _read_csv_dicts(users_path):
users.append({
'user_id': r.get('user_id',''),
'login_id': r.get('login_id',''),
'first_name': r.get('first_name',''),
'last_name': r.get('last_name',''),
'email': r.get('email',''),
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# Attach remote info from login snapshots if present
try:
rem = {}
files = [f for f in os.listdir(term_path) if f.startswith('login.') and f.endswith('.csv')]
def key_fn(name):
label = name[len('login.'):-4]
try:
return datetime.strptime(label, '%Y-%m-%dT%H-%M')
except Exception:
return datetime.min
files.sort(key=key_fn)
for fname in files:
for r in _read_csv_dicts(os.path.join(term_path, fname)):
uid = r.get('user_id')
if uid:
rem[str(uid)] = r.get('root_account','') or 'remote'
if rem:
for u in users:
if u['user_id'] in rem:
u['remote'] = True
u['root_account'] = rem[u['user_id']]
except Exception:
pass
return jsonify(users)
@app.route('/api/rosters/courses')
def api_roster_courses():
"""Return latest courses list for the given term (from courses.*.csv)."""
term = _resolve_term(request.args)
if not term:
return jsonify({'error': 'missing term'}), 400
courses_path = _latest_snapshot_file(term, 'courses')
if not courses_path or not os.path.exists(courses_path):
return jsonify([])
rows = []
try:
for r in _read_csv_dicts(courses_path):
rows.append({
'course_id': r.get('course_id',''),
'short_name': r.get('short_name',''),
'long_name': r.get('long_name',''),
'term_id': r.get('term_id',''),
})
except Exception as e:
return jsonify({'error': str(e)}), 500
return jsonify(rows)
# Serve enrollment change data and optional filtered views by user_id or course_id.
@app.route('/api/rosters/changes')
def api_roster_changes():
"""Return enrollment change data; supports optional filters 'user_id' or 'course_id'.
Query params: term=2025fall (or year=2025&sem=fall), user_id=..., course_id=...
"""
term = _resolve_term(request.args)
if not term:
return jsonify({'error': 'missing term; supply term=YYYY<sem> or year and sem'}), 400
data = _load_enrollment_changes(term)
if data is None:
return jsonify({'error': f'no enrollment_changes.json for term {term}'}), 404
uid = request.args.get('user_id')
cid = request.args.get('course_id')
if uid:
events = data.get('by_user', {}).get(str(uid), [])
return jsonify({'term': term, 'user_id': str(uid), 'events': events})
if cid:
events = data.get('by_course', {}).get(str(cid), [])
return jsonify({'term': term, 'course_id': str(cid), 'events': events})
# No filter: return summary keys only
return jsonify({
'term': term,
'generated': data.get('generated'),
'by_course_keys': sorted(list(data.get('by_course', {}).keys())),
'by_user_keys': sorted(list(data.get('by_user', {}).keys())),
})
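# Illustrative client call (assumes the `requests` package and this server on
# localhost:5000; the term and user_id are made up, and this is never called):
def _demo_changes_client():
    import requests
    r = requests.get('http://localhost:5000/api/rosters/changes',
                     params={'term': '2025fall', 'user_id': 'G00123456'})
    return r.json().get('events', [])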
# Serve events for a given user_id within a term.
@app.route('/api/rosters/changes/user/<user_id>')
def api_roster_changes_by_user(user_id):
"""Get enrollment/presence events for a specific user_id for the given term."""
term = _resolve_term(request.args)
if not term:
return jsonify({'error': 'missing term; supply term=YYYY<sem> or year and sem'}), 400
data = _load_enrollment_changes(term)
if data is None:
return jsonify({'error': f'no enrollment_changes.json for term {term}'}), 404
events = data.get('by_user', {}).get(str(user_id), [])
return jsonify({'term': term, 'user_id': str(user_id), 'events': events})
# Serve events for a given course_id within a term.
@app.route('/api/rosters/changes/course/<course_id>')
def api_roster_changes_by_course(course_id):
"""Get enrollment/presence events for a specific course_id (CRN-like) for the given term."""
term = _resolve_term(request.args)
if not term:
return jsonify({'error': 'missing term; supply term=YYYY<sem> or year and sem'}), 400
data = _load_enrollment_changes(term)
if data is None:
return jsonify({'error': f'no enrollment_changes.json for term {term}'}), 404
events = data.get('by_course', {}).get(str(course_id), [])
return jsonify({'term': term, 'course_id': str(course_id), 'events': events})
if __name__ == '__main__':
host = os.environ.get('HOST', '0.0.0.0')
port = int(os.environ.get('PORT', '5000'))

View File

@@ -22,6 +22,10 @@ def _open_with_dirs(file, mode='r', *args, **kwargs):
if isinstance(file, (str, bytes, os.PathLike)) and any(m in mode for m in ('w','a','x','+')):
_ensure_parent_dir(file)
finally:
# Avoid RuntimeWarning: line buffering (buffering=1) isn't supported in binary mode
if 'b' in mode and kwargs.get('buffering', None) == 1:
kwargs = dict(kwargs)
kwargs['buffering'] = -1 # use default buffering for binary
return _orig_open(file, mode, *args, **kwargs)
def _codecs_open_with_dirs(filename, mode='r', encoding=None, errors='strict', buffering=1):
@@ -29,6 +33,9 @@ def _codecs_open_with_dirs(filename, mode='r', encoding=None, errors='strict', buffering=1):
if isinstance(filename, (str, bytes, os.PathLike)) and any(m in mode for m in ('w','a','x','+')):
_ensure_parent_dir(filename)
finally:
# Avoid line-buffering with binary modes
if 'b' in mode and buffering == 1:
buffering = -1
return _orig_codecs_open(filename, mode, encoding, errors, buffering)
# Apply patches once
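For context on the two buffering guards above: line buffering (buffering=1) is only defined for text mode, so CPython downgrades it for binary files and emits a RuntimeWarning. A standalone demonstration (the scratch filename is arbitrary):

import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    open('demo.bin', 'wb', buffering=1).close()

print([str(w.message) for w in caught])
# ["line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used"]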