# canvasapp/schedules.py
#
# experimenting with manipulating and querying the schedule of courses
from sqlite3 import Row
import json, re, sys, os, codecs, csv, time, requests
from typing import Generator
from pathlib import Path
from bs4 import BeautifulSoup as bs
from datetime import datetime
from fast_autocomplete import AutoComplete
from io import StringIO
from time import strptime
from deepdiff import DeepDiff
from datetime import datetime as dt
from dateutil import parser
from util import fix_t_name, split_class_dept, split_class_code, split_class_code_letter
import pandas as pd
from semesters import short_to_long
from pipelines import put_file
from collections import defaultdict
from semesters import short_to_sis, sis_to_human
from localcache2 import everyone_teacher_role, iLearn_name_from_goo
import funcy
from canvas_secrets import GOO, GOO_PIN
DEBUG = 0
def d(s, end=''):
    """Print debug output when the module-level DEBUG flag is set."""
    if end and DEBUG:
        print(s, end=end)
    elif DEBUG:
        print(s)
##
## DEPTS, GUIDED PATHWAY CLUSTERS, AREAS and DEANS
def campus_dept_hierarchy():
courses_csv = '''Course,GP,AreaCode
ACCT,info,cwp
AE,skill,cwp
AH,well,nah
AJ,skill,cwp
AMT,skill,cwp
ANTH,soc,ahss
APE,skill,cwp
ART,art,ahss
ASTR,stem,stem
ATH,well,nah
BIO,stem,stem
BIOT,stem,stem
BOT,info,cwp
BUS,info,cwp
CARP,skill,cwp
CD,skill,cwp
CHEM,stem,stem
CHN,comm,ahss
CMGT,skill,cwp
CMUN,comm,ahss
COMM,comm,ahss
COMMC,comm,ahss
COS,skill,cwp
CSIS,stem,cwp
CUL,skill,cwp
CWE,skill,cwp
DE,comm,stem
DM,info,cwp
DRLT,skill,cwp
ECOL,stem,stem
ECON,info,cwp
ENGL,soc,ahss
ENGLC,soc,ahss
ENGR,stem,stem
ENVS,stem,stem
ESL,comm,cwp
ETHN,comm,ahss
FRNH,comm,ahss
GEOG,stem,stem
GEOL,stem,stem
GUID,soc,c
HE,well,nah
HIST,soc,ahss
HUM,soc,ahss
HVAC,skill,cwp
CGD,skill,cwp
JFT,skill,cwp
JLE,skill,cwp
JOUR,comm,ahss
JPN,comm,ahss
KIN,well,nah
LIB,comm,stem
LIFE,well,nah
MATH,stem,stem
STATC,stem,stem
STAT,stem,stem
MCTV,art,ahss
FTVE,art,ahss
MUS,art,ahss
PHIL,soc,ahss
PHYS,stem,stem
POLS,soc,ahss
POLSC,soc,ahss
PSCI,stem,stem
PSYC,soc,ahss
PSYCC,soc,ahss
RE,skill,cwp
SJS,soc,ahss
SOC,soc,ahss
SPAN,comm,ahss
THEA,art,ahss
WELD,skill,cwp
HORT,skill,cwp
WTRM,skill,cwp
MGMT,skill,cwp
MKTG,skill,cwp
HTM,skill,cwp'''
areas_csv = '''Area,AreaCode,DeanCode,DeanName
Nursing and Allied Health,nah,et,[Dean: Nursing/Allied Health]
Career Education and Workforce Pathways,cwp,ss,Vins Chacko
Arts Humanities and Social Sciences,ahss,nl,[Dean: AHSS]
Counseling,c,de,Diego Espinoza
Student Support and Special Programs,sssp,de,Diego Espinoza
Science Technology Engineering and Mathematics,stem,jn,Jennifer Nari'''
courses_df = pd.read_csv(StringIO(courses_csv))
areas_df = pd.read_csv(StringIO(areas_csv))
# Recreate gp dictionary
course_to_gp = dict(zip(courses_df['Course'], courses_df['GP']))
# Recreate area dictionary
course_to_area = dict(zip(courses_df['Course'], courses_df['AreaCode']))
# Recreate areas dictionary (mapping AreaCode to full Area Name)
areacode_to_area = dict(zip(areas_df['AreaCode'], areas_df['Area']))
    # Recreate dean dictionaries (AreaCode -> DeanCode, then Course -> DeanCode)
area_to_dean = dict(zip(areas_df['AreaCode'], areas_df['DeanCode']))
course_to_dean = {course: area_to_dean[area_code] for course, area_code in course_to_area.items()}
# Recreate dean_names dictionary
dean_code_to_name = dict(zip(areas_df['DeanCode'], areas_df['DeanName']))
# Print samples to verify
print("gp:", list(course_to_gp.items())[:5])
print("area:", list(course_to_area.items())[:5])
print("areas:", list(areacode_to_area.items())[:5])
print("dean:", list(course_to_dean.items())[:5])
print("dean_names:", list(dean_code_to_name.items())[:5])
return (course_to_gp, course_to_area, areacode_to_area, area_to_dean, course_to_dean, dean_code_to_name)
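# Example (illustrative only, not called anywhere in the pipeline): given a dept
# prefix, look up its pathway cluster, area name, and dean in one hop.
def _demo_dept_lookup():
    gp, course_to_area, areacode_to_area, area_to_dean, dean, dean_names = campus_dept_hierarchy()
    for dept in ('MATH', 'ART', 'WELD'):
        print(dept, '->', gp[dept], '|', areacode_to_area[course_to_area[dept]], '|', dean_names[dean[dept]])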
##
## SEMESTER FETCHING
##
##
def scrape_schedule_multi():
global SEMESTER, short_sem, semester_begin, filename, filename_html
(gp, course_to_area, areacode_to_area, area_to_dean, dean, dean_code_to_name) = campus_dept_hierarchy()
SEMESTER = 'Spring 2025'
short_sem = 'sp25'
semester_begin = strptime('01/27', '%m/%d')
filename = 'sp25_sched.json'
filename_html = 'sp25_sched.html'
SEM = ['Fall 2022', 'Summer 2022 (View only)', 'Spring 2022 (View only)',
'Fall 2021 (View only)', 'Summer 2021 (View only)', 'Spring 2021 (View only)', 'Fall 2020 (View only)', 'Summer 2020 (View only)', 'Spring 2020 (View only)',
'Fall 2019 (View only)', 'Summer 2019 (View only)', 'Spring 2019 (View only)', 'Fall 2018 (View only)', 'Summer 2018 (View only)', 'Spring 2018 (View only)' ]
srt = 'fa22,su22,sp22,fa21,su21,sp21,fa20,su20,sp20,fa19,su19,sp19,fa18,su18,sp18'.split(',')
beg = ['08/22','06/13','01/31','08/23','06/14','02/01','08/24','06/15','01/27','08/26','06/17','01/28','08/27','06/18','01/29']
#for i in [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14]:
#SEMESTER = SEM[i]
#short_sem = srt[i]
#semester_begin = strptime(beg[i], '%m/%d')
#filename = '%s_sched.json' % short_sem
#filename_html = '%s_sched.html' % short_sem
    as_dict = scrape_schedule(short_sem, SEMESTER)
expanded = list_latestarts(short_sem)
fields = "gp,dean,dept,num,code,crn,teacher,name,act,cap,site,type".split(",")
    with codecs.open('cache/enrollment_%s.csv' % short_sem, 'w', 'utf-8') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(fields)
for S in expanded:
parts = S['code'].split(' ')
S['dept'] = parts[0]
S['num'] = parts[1]
S['gp'] = gp[parts[0]]
S['dean'] = dean[parts[0]]
S['sem'] = short_sem
# S['act'] = S['cap']
if S['loc'] == "ONLINE LIVE": S['site'] = 'OnlineLive'
csvwriter.writerow( [ S[x] for x in fields ] )
put_file('/home/public/schedule/', 'cache/', 'enrollment_%s.csv' % short_sem, 0)
# Reads {term}_sched.json from gavilan.cc; writes {term}_latestarts.txt and {term}_sched_expanded.json
def list_latestarts(term="fa23"):
show_summary = 1
the_year = '20' + term[2:4]
print("year: ", the_year, " semester: ", term)
#term_in = "cache/%s_sched.json" % term
term_out = "cache/%s_latestarts.txt" % term
expanded_out = "%s_sched_expanded.json" % term
print("Writing output to " + term_out)
#infile = codecs.open(term_in, "r", "utf-8")
outfile = codecs.open(term_out, "w", "utf-8")
exoutfile = codecs.open('cache/' + expanded_out, "w", "utf-8")
expanded = []
#sched = json.loads(infile.read())
sched = requests.get(f"http://gavilan.cc/schedule/{term}_sched.json").json()
#print sched
by_date = {}
if show_summary: print("course \t loc \t type \t time")
for C in sched:
if (not C['type']) and C['loc'] != 'ONLINE': # and C['time']:
C['type'] = 'in-person'
if show_summary: print("%s \t %s \t %s \t %s" % (C['code'],C['loc'],C['type'],C['time']))
if 'extra' in C:
if 'partofday' in C and ('type' in C['extra'][0]) and (C['extra'][0]['type'] == 'online') and C['loc'] != "ONLINE LIVE":
C['type'] = 'hybrid'
times = C['time'].split("-")
if len(times) > 1:
time_start = times[0]
time_end = times[1]
try:
startt = time.strptime(time_start,"%I:%M %p")
endt = time.strptime(time_end,"%I:%M %p")
min_start = startt.tm_min
min_end = endt.tm_min
if min_start == 0: min_start = "00"
else: min_start = str(min_start)
if min_end == 0: min_end = "00"
else: min_end = str(min_end)
C['time_start'] = "%i:%s" % (startt.tm_hour, min_start )
C['time_end'] = "%i:%s" % (endt.tm_hour, min_end )
if 0:
print("+ Parsed %s into %s and %s." % (C['time'], C['time_start'], C['time_end']))
except Exception as e:
print(e, "\n-- problem parsing time ", time_start, " or ", time_end)
else:
C['time_start'] = ''
C['time_end'] = ''
if re.search('TBA',C['date']):
C['start'] = ''
C['end'] = ''
C['doy'] = ''
expanded.append(C)
continue
parts = C['date'].split("-")
start = parts[0] + "/" + the_year
end = parts[1] + "/" + the_year
try:
startd = parser.parse(start)
endd = parser.parse(end)
C['start'] = "%i-%i" % (startd.month,startd.day)
C['end'] = "%i-%i" % (endd.month,endd.day)
C['doy'] = startd.timetuple().tm_yday
expanded.append(C)
        except Exception as e:
            print(e, "\n-- problem parsing ", start, " or ", end)
            continue
        if startd not in by_date:
            by_date[startd] = []
        by_date[startd].append(C)
exoutfile.write( json.dumps(expanded,indent=2) )
exoutfile.close()
put_file('/home/public/schedule/', 'cache/', expanded_out, 0)
for X in sorted(by_date.keys()):
#print("Start: ", X)
if len(by_date[X]) < 200:
prettydate = X.strftime("%A, %B %d")
#print(prettydate + ": " + str(len(by_date[X])) + " courses")
outfile.write(prettydate + ": " + str(len(by_date[X])) + " courses" + "\n")
for Y in by_date[X]:
#print "\t" + Y['code'] + " " + Y['crn'] + "\t" + Y['teacher']
#print(Y)
#outfile.write("\t" + Y['code'] + " " + Y['crn'] + "\t" + Y['teacher'] + "\t" + Y['type'] +"\n")
outfile.write("\t" + Y['code'] + " " + Y['crn'] + "\t" + Y['teacher'] + "\t" + Y['type'] + "\t" + "\n")
outfile.close()
put_file('/home/public/schedule/', 'cache/', "%s_latestarts.txt" % term, 0)
return expanded
# Schedule / course filling history
# csv headers: crn, code, teacher, datetime, cap, act, wlcap, wlact
# Log the history of enrollments per course during registration
def log_section_filling(current_sched_list, short_sem):
rows = 'timestamp crn code teacher cap act wl_cap wl_act'.split(' ')
rows_j = 'crn code teacher cap act wl_cap wl_act'.split(' ')
print(rows_j)
    now = datetime.now().strftime('%Y-%m-%dT%H-%M')
csv_fn = 'cache/reg_history_' + short_sem + '.csv'
with codecs.open(csv_fn,'a','utf-8') as f:
writer = csv.writer(f)
for S in current_sched_list:
#print(S)
            items = [now]
            items.extend(S[X] for X in rows_j)
writer.writerow(items)
# Same as above, but compressed, act only
def log_section_filling2(current_sched_list, short_sem):
    now = datetime.now().strftime('%Y-%m-%dT%H')
todays_data = { int(S['crn']): S['act'] for S in current_sched_list }
#print(todays_data)
todays_df = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now])
todays_df = todays_df.rename_axis('crn')
#print(todays_df)
todays_df.to_csv('cache/reg_today_new.csv', index=True)
    try:
        myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv')
        print(myframe)
    except FileNotFoundError:
        print("Creating new data file for this semester.")
        with open('cache/reg_data_' + short_sem + '.csv', 'w') as fff:
            fff.write('crn\n')
        myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv')
new_df = myframe.join( todays_df, on='crn', how='outer' )
new_df = new_df.rename_axis('crn')
print(new_df)
reg_data_filename = 'reg_data_' + short_sem + '.csv'
new_df.to_csv('cache/' + reg_data_filename, index=False)
put_file('/home/public/schedule/', 'cache/', reg_data_filename, 0)
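# Sketch of the wide format this builds, with made-up CRNs and counts: each run
# outer-joins one new timestamp column onto the per-CRN table on disk.
def _demo_reg_join():
    old = pd.DataFrame({'crn': [40001, 40002], '2025-01-02T08': [12, 30]})
    new = pd.DataFrame.from_dict({40001: 15, 40002: 31}, orient='index',
                                 columns=['2025-01-03T08']).rename_axis('crn')
    print(old.join(new, on='crn', how='outer'))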
# Log in to SSB with Firefox and fetch the full schedule. Only works where Selenium is installed.
def scrape_schedule(short_sem, semester_form_text="SPRING 2025"):
    filename = f"{short_sem}_sched.json"
    filename_html = f"{short_sem}_sched.html"
#url = "https://ssb.gavilan.edu/prod/twbkwbis.P_GenMenu?name=bmenu.P_StuMainMnu"
url = "https://ssb-prod.ec.gavilan.edu/PROD/twbkwbis.P_GenMenu?name=bmenu.P_MainMnu"
text = ''
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
try:
        driver = webdriver.Firefox()
        driver.get(url)
        driver.find_element(By.ID, "UserID").clear()
        driver.find_element(By.ID, "UserID").send_keys(GOO)
        driver.find_element(By.NAME, "PIN").send_keys(GOO_PIN)
        driver.find_element(By.NAME, "loginform").submit()
        driver.implicitly_wait(5)
        print(driver.title)
        driver.find_element(By.LINK_TEXT, "Students").click()
        driver.implicitly_wait(5)
        print(driver.title)
        driver.find_element(By.LINK_TEXT, "Registration").click()
        driver.implicitly_wait(5)
        print(driver.title)
        driver.find_element(By.LINK_TEXT, "Search for Classes").click()
        driver.implicitly_wait(15)
        print(driver.title)
        dd = Select(driver.find_element(By.NAME, "p_term"))
        if dd:
            dd.select_by_visible_text(semester_form_text)
        driver.find_element(By.XPATH, "/html/body/div/div[4]/form").submit()
        driver.implicitly_wait(15)
        print(driver.title)
        driver.find_element(By.XPATH, "/html/body/div/div[4]/form/input[18]").click()
        driver.implicitly_wait(10)
        print(driver.title)
        driver.find_element(By.NAME, "SUB_BTN").click()
driver.implicitly_wait(40)
time.sleep(15)
driver.implicitly_wait(40)
print(driver.title)
text = driver.page_source
driver.quit()
except Exception as e:
print("Got an exception: ", e)
finally:
print("")
#driver.quit()
codecs.open('cache/' + filename_html,'w', 'utf-8').write(text)
#print(text)
as_list = ssb_to_csv(text)
#print(as_list)
as_dict = to_section_list(as_list)
jj = json.dumps(as_dict,indent=2)
# TODO
try:
ps = codecs.open('cache/'+filename,'r','utf-8')
prev_sched = json.loads(ps.read())
ps.close()
if 1: # sometimes I want to re-run this without affecting the logs.
log_section_filling(as_dict, short_sem)
log_section_filling2(as_dict, short_sem)
dd = DeepDiff(prev_sched, as_dict, ignore_order=True)
pretty_json = json.dumps( json.loads( dd.to_json() ), indent=2 )
codecs.open('cache/%s_sched_diff.json' % short_sem,'w','utf-8').write( pretty_json ) # dd.to_json() )
except Exception as e:
print(e)
print("Can't do diff?")
# Next, rename the prev sched_xxYY.json data file to have its date,
# make this new one, and then upload it to the website.
# Maybe even count the entries and do a little sanity checking
#
# print("Last modified: %s" % time.ctime(os.path.getmtime("test.txt")))
# print("Created: %s" % time.ctime(os.path.getctime("test.txt")))
    try:
        last_mod = time.ctime(os.path.getmtime('cache/' + filename))
        print("Previous schedule file last modified:", last_mod)
        prev_stat = Path('cache/' + filename).stat()
        mtime = dt.fromtimestamp(prev_stat.st_mtime)
        print(mtime)
    except Exception:
        print("Couldn't read the previous schedule file's timestamp.")
# fname = pathlib.Path('test.py')
# assert fname.exists(), f'No such file: {fname}' # check that the file exists
# print(fname.stat())
#
# os.stat_result(st_mode=33206, st_ino=5066549581564298, st_dev=573948050, st_nlink=1, st_uid=0, st_gid=0, st_size=413,
# st_atime=1523480272, st_mtime=1539787740, st_ctime=1523480272)
codecs.open('cache/' + filename, 'w', 'utf-8').write(jj)
put_file('/home/public/schedule/', 'cache/', filename, 0) # /gavilan.edu/_files/php/
return as_dict
def dza_sched():
text = codecs.open('cache/sched_fa22_deanza.html','r','utf-8').read()
as_list = ssb_to_csv(text)
#print(as_list)
as_dict = to_section_list(as_list)
jj = json.dumps(as_dict,indent=2)
codecs.open('cache/fa22_sched_deanza.json','w','utf-8').write(jj)
# recreate schedule json files with most current online schedule format.
def recent_schedules():
# # todo: sems is a global in this file. Is that the right thing to do?
#all_scheds = [ os.listdir( 'cache/rosters/' + short_to_long(s)) for s in sems ]
#for i,s in enumerate(sems):
for s in ['sp21',]:
filename = 'cache/sched_' + s + '.html'
print("Filename is %s" % filename)
        html_src = codecs.open(filename, 'r', 'utf-8').read()
        output = ssb_to_csv(html_src)
csv_fn = 'cache/temp_sched_' + s + '.csv'
if os.path.isfile(csv_fn):
os.remove(csv_fn)
codecs.open(csv_fn,'w','utf-8').write(output)
jsn = to_section_list(output)
jsn_fn = 'cache/semesters/'+short_to_long(s)+'/'+s+'_sched.json'
if os.path.isfile(jsn_fn):
os.remove(jsn_fn)
        codecs.open(jsn_fn,'w','utf-8').write(json.dumps(jsn))
print("I put the most recent schedule JSON files in ./cache/semesters/... folders.")
# Take Banner's schedule HTML and return the rows as CSV text
def ssb_to_csv(src):
#out = codecs.open(schedfile,'w','utf-8')
output = 'crn,code,sec,cmp,cred,name,days,time,cap,act,rem,wl_cap,wl_act,wl_rem,teacher,date,loc,ztc,note\n'
b = bs(src, 'html.parser')
tab = b.find(class_="datadisplaytable")
    if not tab:
        print("hmm... didn't find a 'datadisplaytable' in this html")
        #print(src)
        return ''
rows = tab.find_all('tr')
drows = list(filter(row_has_data,rows))
for dd in drows:
t = row_text(dd)
output += t
return output
# take text lines and condense them to one dict per section
def to_section_list(input_text,verbose=0):
this_course = ''
#todo: no output files
#jout = codecs.open(filename, 'w', 'utf-8')
#input = csv.DictReader(open(schedfile,'r'))
#input = UnicodeDictReader(input_text.splitlines())
all_courses = []
    try:
        f = StringIO(input_text)
    except Exception:
        print("ERROR with this input_text:")
        print(input_text)
        return []
    reader = csv.reader(f, delimiter=',')
    headers = next(reader, None)
    if not headers:
        return []
for r in reader:
d = dict(list(zip(headers,r)))
#pdb.set_trace()
# clean funny unicode char in blank entries
r = {k: clean_funny2(v) for k,v in list(d.items()) }
if verbose: print("Cleaned: " + str(r))
if 'time' in r:
if r['time']=='TBA': r['time'] = ''
if r['time']: r['partofday'] = time_to_partofday(r['time'])
r['type'] = ''
if 'loc' in r:
if r['loc'] == 'ONLINE': r['type'] = 'online'
if r['loc'] == 'ONLINE' and r['time']: r['type'] = 'online live'
if r['loc'] == 'ONLINE LIVE': r['type'] = 'online live'
if r['loc']: r['site'] = room_to_site(r['loc'],verbose)
if 'code' in r:
if re.search(r'ONLINE\sLIVE',r['code']):
r['type'] = 'online live'
elif re.search(r'ONLINE',r['code']):
r['type'] = 'online'
        # a row with a CRN starts a new section; a blank CRN continues the previous one
        if r['crn']:
if verbose: print(" it's a new section.")
if this_course:
if not this_course['extra']: this_course.pop('extra',None)
all_courses.append(this_course)
this_course = r
#print(r['name'])
this_course['extra'] = []
else:
# is a continuation line
if verbose: print(" additional meeting: " + str(r))
for k,v in list(r.items()):
if not v: r.pop(k,None)
# TODO: if extra line is different type?
#if this_course['type']=='online' and r['type'] != 'online': this_course['type'] = 'hybrid'
#elif this_course['type']!='online' and r['type'] == 'online': this_course['type'] = 'hybrid'
this_course['extra'].append(r)
    # don't drop the last section on the page
    if this_course:
        if not this_course.get('extra'): this_course.pop('extra', None)
        all_courses.append(this_course)
    return all_courses
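# Illustrative round trip with hypothetical CSV text in the ssb_to_csv column
# order: a row with a CRN starts a section; a blank-CRN row becomes an 'extra'
# meeting attached to it.
def _demo_to_section_list():
    sample = ('crn,code,sec,cmp,cred,name,days,time,cap,act,rem,wl_cap,wl_act,wl_rem,teacher,date,loc,ztc,note\n'
              '40001,MATH 1A,1,G,4,Calculus,MW,9:00 am-10:15 am,30,28,2,5,0,5,Jane Doe,01/27-05/23,HU 104,,\n'
              ',,,,,,TR,,,,,,,,Jane Doe,01/27-05/23,ONLINE,,\n')
    print(json.dumps(to_section_list(sample), indent=2))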
##
## SCHEDULE PARSE HELPERS
##
##
def time_to_partofday(t):
#todo: account for multiple sites/rows
# 11:20 am-12:10 pm
mor = strptime('12:00 PM', '%I:%M %p')
mid = strptime( '2:00 PM', '%I:%M %p')
aft = strptime( '6:00 PM', '%I:%M %p')
if t == 'TBA':
return 'TBA'
t = t.upper()
parts = t.split('-')
try:
begin = strptime(parts[0], '%I:%M %p')
end = strptime(parts[1], '%I:%M %p')
if end > aft:
return "Evening"
if end > mid:
return "Afternoon"
if end > mor:
return "Midday"
return "Morning"
#return begin,end
except Exception as e:
#print 'problem parsing: ', t, " ",
return ""
# Deduce a 'site' field, based on room name and known offsite locations
def room_to_site(room,verbose=0):
#todo: account for multiple sites/rows
#todo: better way to store these offsite labels
    othersites = 'AV,SBHS I-243,SBHS I-244,LOADCS,HOPEH,HOPEG,PLY,SAS,SBHS,LOHS,CHS,SBRAT'.split(',')
# is it gilroy, mh, hol, other, online or hybrid?
site = 'Gilroy'
#if len(course[0]) > 13:
# room = course[0][13]
if room in othersites:
site = "Other"
if room == 'TBA':
site = 'TBA'
if room == 'AV':
site = 'San Martin Airport'
if re.search('MHG',room):
site = 'Morgan Hill'
if re.search('HOL',room):
site = 'Hollister'
if re.search('COY',room):
site = 'Coyote Valley'
if re.search('OFFSTE',room):
site = 'Other'
if re.search('ONLINE',room):
site = 'Online'
if verbose: print(room, '\t', end=' ')
return site
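# Spot checks, using room strings as they appear in the schedule:
#   room_to_site('HU 104')  -> 'Gilroy' (the default)
#   room_to_site('MHG 113') -> 'Morgan Hill'
#   room_to_site('AV')      -> 'San Martin Airport'
#   room_to_site('ONLINE')  -> 'Online'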
def row_has_data(r): # helper
if r.find_all('th'):
return False
if len(r.find_all('td')) > 2:
return True
    if re.search(r'Note:', r.get_text()):
return True
return False
def row_text(r): # helper
#global dbg
d("Row Txt Fxn gets: ")
arr = []
for t in r.find_all('td'):
if t.contents and len(t.contents) and t.contents[0].name == 'img':
arr.append("1")
d("img")
r_text = t.get_text()
arr.append(r_text)
if 'colspan' in t.attrs and t['colspan']=='2':
d('[colspan2]')
arr.append('')
d("\t"+r_text, end=" ")
d('')
    if len(arr)==1 and re.search(r'Note:',arr[0]):
note_line = clean_funny( arr[0] )
note_line = re.sub(r'\n',' ', note_line)
note_line = re.sub(r'"','', note_line)
#note_line = re.sub(r',','\,', note_line)
return ',,,,,,,,,,,,,,,,,,"' + note_line + '"\n'
del arr[0]
arr[1] = clean_funny(arr[1])
arr[2] = clean_funny(arr[2])
if arr[1]: arr[1] = arr[1] + " " + arr[2]
del arr[2]
arr = [ re.sub(r'&nbsp;','',a) for a in arr]
arr = [ re.sub(',','. ',a) for a in arr]
    arr = [ re.sub(r'\(P\)','',a) for a in arr]
arr = [ a.strip() for a in arr]
#del arr[-1]
r = ','.join(arr)+'\n'
r = re.sub('\n','',r)
r = re.sub('add to worksheet','',r)
d("Row Txt Fxn returns: " + r + "\n\n")
return r + '\n'
def clean_funny(s):
    # a lone non-breaking space means the cell is really empty
    if s and s == '\xa0': return ''
    return s
def clean_funny2(s):
    if s and s == '\xa0': return ''
    if s and s == ' ': return ''
    return s
def clean_funny3(s):
    return re.sub('\xa0','',s)
# Go to the semesters folder and read the schedule. Return dataframe
def get_semester_schedule(short='sp21'): # formerly named current_schedule
# todo: Some semesters have a different format.... partofday type site xxx i just dL'd them again
filename = 'cache/semesters/'+short_to_long(short)+'/' + short + '_sched.json'
print("opening %s" % filename)
#openfile = open(filename,'r')
#a = json.loads(openfile)
#return pd.DataFrame(a)
schedule = pd.read_json(filename)
schedule.teacher = schedule['teacher'].apply(fix_t_name)
#print schedule['teacher']
    extra_rows = []
    for index, r in schedule.iterrows():
        tch = r['teacher']
        parts = tch.split(' . ')
        if len(parts) > 1:
            # keep the first teacher on the original row...
            schedule.loc[index,'teacher'] = parts[0]
            # ...and add a duplicate row for each additional teacher
            for t in parts[1:]:
                extra = r.copy()
                extra['teacher'] = t
                extra_rows.append(extra)
    if extra_rows:
        schedule = pd.concat([schedule, pd.DataFrame(extra_rows)], ignore_index=True)
schedule = schedule.assign(dept = schedule['code'].apply(split_class_dept))
schedule = schedule.assign(codenum = schedule['code'].apply(split_class_code))
schedule = schedule.assign(codeletter = schedule['code'].apply(split_class_code_letter))
#print(schedule)
schedule['sem'] = short
#print schedule.columns
return schedule
##
## CMDLINE INTERACTIVE SEARCH
##
##
course_types = {'in-person':'IP','hybrid':'H','online':'O','online live':'OL'}
def course_to_string(crs):
if len(crs['teacher'].split()) == 3:
crs['teacher'] = crs['teacher'].split()[0] + " " + crs['teacher'].split()[2]
# crn type loc days start end cred num/cap code name teacher date
lengths = [5, 3, 7, 5, 6, 6, 4, 9, 13, 35, 25,10]
items = [ crs[x] for x in 'crn,type,loc,days,time_start,time_end,cred,act,cap,code,name,teacher,date'.split(',')]
    items[1] = course_types.get(items[1], items[1]) # unknown types display as-is
if items[2] in ["ONLINE", "ONLINE LIVE"]: items[2] = ''
items[6] = items[6][0:3]
items[7] = f"{items[7]}/{items[8]}"
if int(crs['wl_act']) != 0: items[7] += f"+{crs['wl_act']}"
items.pop(8)
result = " ".join(f"{str(val):{width}}" for val, width in zip(items, lengths))
return result
def parse_days(str_days):
# return a list with actual day of week names
days = []
if 'M' in str_days:
days.append('monday')
if 'T' in str_days:
days.append('tuesday')
if 'W' in str_days:
days.append('wednesday')
if 'R' in str_days:
days.append('thursday')
if 'F' in str_days:
days.append('friday')
if 'S' in str_days:
days.append('saturday')
return days
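# e.g. parse_days('MWF') -> ['monday', 'wednesday', 'friday']; Banner uses 'R'
# for Thursday, and Sunday is not handled here.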
def parse_courses(filename):
with open(filename) as f:
courses = json.load(f)
depts, crns, codes, coursenames, locations, teachers, days, now = {}, {}, {}, {}, {}, {}, {}, datetime.now()
weekdays = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
for course in courses:
#print(course)
dept = course['code'].split()[0]
depts.setdefault(dept, []).append(course)
# Add course to teachers dict
course['teacher'] = re.sub(r'\s+', ' ', course['teacher'])
teachers.setdefault(course['teacher'], []).append(course)
# add course to codes, crns, coursenames dict
codes.setdefault(course['code'], []).append(course)
crns.setdefault(course['crn'], []).append(course)
coursenames.setdefault(course['name'], []).append(course)
if course['type'] != 'in-person': continue
# Add course to locations dict
locations.setdefault(course['loc'], []).append(course)
# Add course to days dict
for day in parse_days(course['days']):
days.setdefault(day, []).append(course)
# Check if course is happening now
if course['time_start'] == '': continue
if course['time_end'] == '': continue
start_time = datetime.strptime(course['time_start'], '%H:%M')
end_time = datetime.strptime(course['time_end'], '%H:%M')
#if start_time.time() <= now.time() <= end_time.time():
# print(f"{course['code']} is happening now in {course['loc']}")
return depts,crns, codes, coursenames, locations, teachers, days
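# Minimal usage sketch, assuming a downloaded schedule JSON exists at the given
# path: the parallel dicts returned above make room/teacher/day lookups one-liners.
def _demo_parse_courses(filename='cache/sample_semester.json'):
    depts, crns, codes, coursenames, locations, teachers, days = parse_courses(filename)
    print(len(crns), 'sections,', len(teachers), 'teachers')
    for course in days.get('saturday', []):
        print(' ' + course_to_string(course))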
def write_at(row, col, text):
sys.stdout.write(f"\033[{row};{col}H{text}")
sys.stdout.flush()
def write_search_results(res,columns,rows,maximum=8):
height = max(len(res),maximum)
while len(res)<height:
res.append('')
for i,L in enumerate(res):
write_at(rows - i-1, 1, f"{L} ")
write_at(rows,0,'')
def write_cleared_search_results(columns,rows,maximum=8,):
for i in range(maximum):
write_at(rows - i, 1, f" ")
write_at(rows,0,'')
'''examples = 0
if examples:
# Get schedule for a specific room on a specific day
room = 'HU 104'
day = 'monday'
print(f"\nSchedule for {room} on {day}:")
for course in locations[room]:
if day in parse_days(course['days']):
print(" " + course_to_string(course))
# Get weekly schedule for a specific teacher
teacher = 'Kimberly J Smith'
print(f"\nWeekly schedule for {teacher}:")
for course in teachers[teacher]:
print(" " + course_to_string(course))'''
def interactive(allkeys,deptkeys,depts,teacherkeys,teachers, locationkeys,locations,coursenameskeys,coursenames,codeskeys,codes,crnskeys,crns):
import sys
columns, rows = os.get_terminal_size()
def getch():
# Unix
if sys.platform != 'win32':
import tty, termios
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(fd)
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
# Windows
else:
import msvcrt
return msvcrt.getch().decode('utf-8')
words = { x:{} for x in allkeys.keys() }
autocomplete = AutoComplete(words=words)
# get search term
query = ''
results = []
while True:
char = getch()
#print(repr(char))
#print("\n\n\n\n\n\n\n\n")
        if char in ('\x08', '\x7f'):  # backspace: \x08 on Windows, \x7f on a raw Unix tty
query = query[:-1]
elif char == '\r':
if query == 'quit': return False
break
else:
query += char
results = [x[0] for x in autocomplete.search(word=query, max_cost=3, size=5)]
print(query+" ",end='',flush=True)
write_search_results(results,columns,rows)
write_cleared_search_results(columns,rows)
print()
print()
    if not results:
        return True
    for (keyset,dataset) in [(deptkeys,depts),(teacherkeys,teachers), (locationkeys,locations),(coursenameskeys,coursenames),(codeskeys,codes),(crnskeys,crns)]:
        if results[0] in keyset:
real_key = keyset[results[0]]
if real_key in dataset.keys():
print(f"\nWeekly schedule for {real_key}:")
for course in dataset[real_key]:
print(" " + course_to_string(course))
def interactive_schedule_search():
while True:
# Read in schedule
depts, crns, codes, coursenames, locations, teachers, days = parse_courses('cache/sample_semester.json')
deptkeys = { x.lower():x for x in depts.keys() }
teacherkeys = { x.lower():x for x in teachers.keys() }
locationkeys = { x.lower():x for x in locations.keys() }
coursenameskeys = { x.lower():x for x in coursenames.keys() }
codeskeys = { x.lower():x for x in codes.keys() }
crnskeys = { x.lower():x for x in crns.keys() }
lname_first = [x.split() for x in teachers.keys() ]
d_lnf = { x[-1].lower() + ", " + ' '.join(x[:-1]).lower(): ' '.join(x) for x in lname_first }
teacherkeys.update(d_lnf)
allkeys = {}
allkeys.update(deptkeys)
allkeys.update(teacherkeys)
allkeys.update(locationkeys)
allkeys.update(coursenameskeys)
allkeys.update(codeskeys)
allkeys.update(crnskeys)
print("\nEnter your query or 'quit': ")
if interactive(allkeys,deptkeys,depts,teacherkeys,teachers, locationkeys,locations,coursenameskeys,coursenames,codeskeys,codes,crnskeys,crns) == False: break
## Download all schedules or just most recent
def download_fresh_schedules():
loc = 'cache/schedules'
folder_path = Path(loc)
folder_path.mkdir(parents=True, exist_ok=True)
from semesters import sems_by_short_name
for short,sem in sems_by_short_name.items():
try:
print(sem["name"])
sched = requests.get(f"http://gavilan.cc/schedule/{short}_sched_expanded.json")
            if sched.status_code != 200:
                print("  not found")
                continue
schedfile = codecs.open(f"{loc}/{short}_sched_expanded.json","w","utf-8")
schedfile.write(sched.text)
schedfile.close()
except Exception as e:
print(e)
# clean leading, multiple, and trailing spaces
def clean_name(t):
t = t.strip()
    t = re.sub(r'\s+', ' ', t)
return t
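# e.g. clean_name('  Smith,   Kimberly ') -> 'Smith, Kimberly'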
# Given a teacher's goo (G number) and their list of course sections, return summary stats
def summarize_teacher(who,what):
tally_course_code = defaultdict( int )
tally_mode = defaultdict( int )
tally_site = defaultdict( int )
tally_depts = defaultdict( int )
num_sections = 0
which_semesters = defaultdict(int)
all_sems = funcy.pluck("sem",what)
all_sems_sis = [ short_to_sis(x) for x in all_sems ]
all_sems_sis = list( set( all_sems_sis))
all_sems_sis.sort()
oldest = sis_to_human(all_sems_sis[0])
newest = sis_to_human(all_sems_sis[-1])
for crs in what:
tally_course_code[ crs['code'] ] += 1
tally_mode[ crs['type'] ] += 1
tally_site[ crs['site'] ] += 1
num_sections += 1
which_semesters[ crs['sem']] += 1
try:
c_parts = crs['code'].split(' ')
dept = c_parts[0]
tally_depts[dept] += 1
        except (KeyError, AttributeError):
            pass
top_two = sorted(tally_depts, key=tally_depts.get, reverse=True)[:2]
d1 = top_two[0]
d2 = ''
if len(top_two)>1: d2 = top_two[1]
training = json.loads( codecs.open('cache/gott_by_goo.json','r','utf-8').read() )
my_training = {}
if who in training:
my_training = training[who]
return { 'name':who, 'num_sections':num_sections, 'num_semesters':len(which_semesters.keys()), 'num_years': int(len(which_semesters.keys())/3),
'earliest_sem': oldest, 'most_recent':newest,
'courses':tally_course_code,
'training':my_training,
'modes':tally_mode, 'sites':tally_site, 'dept':d1, 'dept2':d2 }
def find_goo_for_name(name_dict,name):
name = clean_name(name)
if name in name_dict:
if name_dict[name]:
print(f"ok {name}")
return name_dict[name]
parts = name.split(' ')
if len(parts) == 3:
new_name = f"{parts[0]} {parts[2]}"
if new_name in name_dict and name_dict[new_name]:
print(f"ok {new_name}")
return name_dict[new_name]
print(f"*** no goo number for {name}.")
return ''
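# e.g. a hypothetical 'Kimberly J Smith' with no exact entry falls back to
# 'Kimberly Smith' (first + last) before giving up and returning ''.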
# All teachers' teaching history
def teachers_history():
# Let's just make a giant dict of names we know about -> G numbers
big_name_to_goo = defaultdict(str)
# all ilearn accounts with gavilan.edu address
ilearn_names = json.loads( codecs.open('cache/ilearn_staff.json','r','utf-8').read())
for iln in ilearn_names:
try:
big_name_to_goo[iln['name']] = iln['sis_user_id']
        except KeyError:
            pass
# everyone who's been in teacher role [ name, id, goo, created dt, coursename]
all_teacher_role = everyone_teacher_role()
for atr in all_teacher_role:
big_name_to_goo[atr[0]] = atr[2]
teachers_by_id = { str(x[1]): [x[0], x[2], x[3] ] for x in all_teacher_role }
# names that don't match
non_matching_names = [ line.strip().split(',') for line in codecs.open('cache/sched_name_to_ilearn_id.csv','r','utf-8').readlines() ]
for nmn in non_matching_names:
        try:
            big_name_to_goo[nmn[0]] = teachers_by_id[nmn[3]][1]
        except (KeyError, IndexError):
            pass
        try:
            big_name_to_goo[nmn[1]] = teachers_by_id[nmn[3]][1]
        except (KeyError, IndexError):
            pass
#print( sorted(list(big_name_to_goo.keys())) )
# we want canonical names also
big_goo_to_name = {}
for nnn,goo in big_name_to_goo.items():
big_goo_to_name[goo] = nnn
# Define the directory to search
folder_path = Path("cache/schedules")
all_sections_by_goo = defaultdict(list)
#non_matches_file = codecs.open('cache/nonmatches.txt','w','utf-8')
# Define the regex pattern
pattern = re.compile(r"(\w\w\d\d)_sched_expanded\.json")
i = 0
# Iterate through all saved schedule json files
for file in folder_path.iterdir():
if file.is_file() and pattern.match(file.name):
m = pattern.match(file.name)
sem = m.groups()[0]
print(sem)
with file.open("r", encoding="utf-8") as f:
data = json.load(f)
for c in data:
c['sem'] = sem
multiples = c['teacher'].split(' . ')
i += len(multiples)
                    if len(multiples) > 1:
                        for t_name in multiples:
                            cn = clean_name(t_name)
                            goo = find_goo_for_name(big_name_to_goo, cn)
                            all_sections_by_goo[goo].append(c)
                            #print(f"{sem}\t{cn}\t{goo}\t{c['code']}")
else:
cn = clean_name(c['teacher'])
goo = find_goo_for_name(big_name_to_goo,cn)
all_sections_by_goo[goo].append(c)
#print(f"{sem}\t{cn}\t{goo}\t{c['code']}")
#print(json.dumps(teacher_names_raw, indent=2))
'''goos = sorted(list(all_sections_by_goo.keys()))
non_match_lookup = { x[0]: x[3].strip() for x in non_matching_names }
for N in goos:
found = in_list(ilearn_names,N,'name','sis_user_id')
new_name = ''
if not found:
parts = N.split(' ')
if len(parts) == 3:
new_name = f"{parts[0]} {parts[2]}"
found = in_list(ilearn_names,new_name,'name','sis_user_id')
try:
if not found:
if N in non_match_lookup and non_match_lookup[N]:
found = teachers_by_id[non_match_lookup[N]][1]
elif new_name in non_match_lookup and non_match_lookup[new_name]:
found = teachers_by_id[non_match_lookup[new_name]][1]
except Exception as e:
print(f"Exception on {N}: {e}")
print(f"{found} \t {N}")
print(f"{i} section/teachers analysed")'''
all_summary = []
index = []
DO_UPLOADS = 0
if input("Do uploads? (y/n) ") == 'y': DO_UPLOADS = 1
dataout = codecs.open('cache/teacherhistory.txt','w','utf-8')
for G,L in all_sections_by_goo.items():
#dataout.write(f"{G}\n")
if G:
N = big_goo_to_name[G]
summary = summarize_teacher(G,L)
summary['name'] = N
summary['goo'] = G
summary['sections'] = []
for course in L:
dataout.write(f"{G},{N},{course['sem']},{course['code']},{course['cred']},{course['type']},{course['site']},{course['days']}\n")
summary['sections'].append(f"{G},{N},{course['sem']},{course['code']},{course['cred']},{course['type']},{course['site']},{course['days']}")
all_summary.append(summary)
index.append( [G,N,summary['courses'],summary['training'],summary['most_recent'], summary['dept'],summary['dept2'] ] )
teacherout = codecs.open(f"cache/faculty/{G}.json","w","utf-8")
teacherout.write(json.dumps(summary,indent=2))
teacherout.close()
if DO_UPLOADS:
print(f"uploading {N}")
put_file(f"/home/public/faculty/", f"cache/faculty/", f"{G}.json", 0)
summaryout = codecs.open('cache/teachersummary.json','w','utf-8')
summaryout.write(json.dumps(all_summary,indent=2))
summaryout.close()
indexout = codecs.open('cache/faculty/index.json','w','utf-8')
indexout.write(json.dumps(index,indent=2))
indexout.close()
if DO_UPLOADS:
put_file(f"/home/public/faculty/", f"cache/", f"teachersummary.json", 0)
put_file(f"/home/public/faculty/", f"cache/faculty/", f"index.json", 0)
def in_list(li,needle,key,ret_key):
for L in li:
if L[key] == needle: return L[ret_key]
return False
def time_range(a,b):
if a and b:
return f"{add_colon_to_24hr_time(a)}-{add_colon_to_24hr_time(b)}"
return ""
def add_colon_to_24hr_time(the_time):
if the_time:
the_time = str(the_time)
return the_time[0:-2] + ":" + the_time[-2:]
return ""
def date_without_year(d):
return f"{d.month}/{d.day}"
def tchr(a,b):
if a and b:
return f"{a} {b}"
return ""
def type_num_to_type_str(typenum):
typenum = str(typenum)
if typenum in ["72","20","736","737","73A"]:
return "online"
if typenum in ["2","4","45","46","47","04A","04B"]:
return "in-person"
if typenum in ["5","40"]:
return "hybrid"
if typenum in ["71","73B","73"]:
return "online live"
return typenum
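# Spot checks: type_num_to_type_str(72) -> 'online'; type_num_to_type_str('40') -> 'hybrid';
# unrecognized schedule-type codes pass through unchanged so they stay visible downstream.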
def excel_schedule():
su_xl = 'SU 2025- 3.24.25.xlsx'
fa_xl = 'FA 2025- 3.24.25.xlsx'
infile = fa_xl
outfile = 'fa25_sched_expanded.json'
    from openpyxl import load_workbook
wb = load_workbook(f"cache/{infile}")
print(wb.sheetnames)
# Select the active sheet (or you can pick a specific one: wb['SheetName'])
ws = wb['Schedule by Division and Dept -'] #wb.active
# Read and print all rows
#for row in ws.iter_rows(values_only=True):
# print(row)
#
#
# Extract header row
    headers = list(next(ws.iter_rows(min_row=1, max_row=1, values_only=True)))
    # Mapping from output keys to Excel headers (strings) or converter functions (lambdas)
header_map = {
"crn": "CRN",
"code": "Course",
"sec": "SeqNo",
#"PtTerm": "cmp",
"cred": "Units",
"name": "Title",
"days": "Days",
"time": lambda row: time_range(row[headers.index('BegTime')], row[headers.index('EndTime')]),
"time_start": lambda row: add_colon_to_24hr_time( row[headers.index["BegTime"]]),
"time_end": lambda row: add_colon_to_24hr_time( row[headers.index["EndTime"]]),
"cap": "MaxEnroll",
"rem": "MaxEnroll",
"teacher": lambda row: tchr(row[headers.index('FirstName')], row[headers.index('LastName')]),
"date": lambda row: f"{date_without_year(row[headers.index('StartDate')])}-{date_without_year(row[headers.index('EndDate')])}",
"start": lambda row: date_without_year(row[headers.index('StartDate')]),
"end": lambda row: date_without_year(row[headers.index('EndDate')]),
"loc": lambda row: "ONLINE" if row[headers.index("Bldg")] == "ONLINE" else f"{row[headers.index('Bldg')]} {row[headers.index('Room')]}",
# Add fixed fields
"ztc": lambda row: "",
"note": "Footnote",
"type": lambda row: type_num_to_type_str(row[headers.index('SchdTyp')]),
"site": lambda row: "",
"doy": lambda row: row[headers.index("StartDate")].timetuple().tm_yday if row[headers.index("StartDate")] else "",
}
# Function to convert a row to your dict format
def convert_row(row):
data = {}
for k, v in header_map.items():
try:
                if callable(v):
d2 = v(row)
if d2:
data[k] = str(d2)
else:
data[k] = ""
else:
d1 = row[ headers.index( header_map[k] ) ]
if d1:
data[k] = str(d1)
else:
data[k] = ""
except Exception as e:
print(f"Exception for key [{k}] value [{v}] on ROW: {row}\n{e}")
print(json.dumps(data,indent=2))
return data
# Process all data rows
data_rows = list(ws.iter_rows(min_row=2, values_only=True))
converted = [convert_row(row) for row in data_rows]
print(json.dumps(converted,indent=2))
with codecs.open(f'cache/schedules/{outfile}','w','utf-8') as ofi:
ofi.write(json.dumps(converted,indent=2))
# wanted: a class's teacher history: everyone who has taught it.
if __name__ == "__main__":
    print('')
options = { 1: ['Interactive schedule search',interactive_schedule_search] ,
2: ['test areas gp and deans',campus_dept_hierarchy] ,
3: ['download_fresh_schedules', download_fresh_schedules],
4: ['teachers_history', teachers_history],
5: ['parse in progress schedule', excel_schedule],
}
'''3: ['Fetch rosters on schedule',fetch_current_rosters_auto] ,
4: ['Compute how registration is filling up classes', schedule_filling] ,
5: ['Manually convert 3 csv files to joined json enrollment file.', convert_roster_files] ,
6: ['Canvas data: interactive sync', interactive ],
7: ['Canvas data: automated sync', sync_non_interactive ],
8: ['Get canvas data 2024 style', canvas_data_2024_run ],
9: ['Set up canvas data 2024 style', setup_canvas_data_2024_run],
16: ['Scrape schedule from ssb', scrape_schedule_multi ],
14: ['Generate latestart schedule', list_latestarts ],
15: ['Test ssb calls with python', scrape_schedule_py ],
10: ['schedule to db', scrape_for_db ],
11: ['clean argos draft schedule file', argos_data_from_cvc],
12: ['make expanded schedule json files of old semesters', expand_old_semesters ],
13: ['Parse deanza schedule', dza_sched ],'''
if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
resp = int(sys.argv[1])
print("\n\nPerforming: %s\n\n" % options[resp][0])
    else:
        print('')
for key in options:
print(str(key) + '.\t' + options[key][0])
print('')
resp = input('Choose: ')
# Call the function in the options dict
options[ int(resp)][1]()