canvasapp/curric2022.py

import requests, json, os, re, bisect, csv, codecs, funcy, sys, shutil, time
from datetime import datetime
import sortedcontainers as sc
from collections import defaultdict
from toolz.itertoolz import groupby,sliding_window
from sortedcontainers import SortedList
#from durable.lang import *
#from durable.engine import *
from pampy import match, _
from bs4 import BeautifulSoup as bs
leafcount = 0
displaynames = []
from canvas_secrets import cq_user, cq_pasw
CQ_URL = "https://secure.curricunet.com/scripts/webservices/generic_meta/clients/versions/v4/gavilan.cfc"
PARAM = "?returnFormat=json&method=getCourses"
user = cq_user
pasw = cq_pasw
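# A typical request this module sends (a sketch; the real calls go through
# another_request() below): an HTTP GET with basic auth, e.g.
#
#   https://secure.curricunet.com/scripts/webservices/generic_meta/clients/versions/v4/gavilan.cfc?returnFormat=json&method=getCourses&skip=0
#
# The service pages its results: each response reports ResultSetSize and
# EndResultNum, and the fetchers below pass EndResultNum back as the next
# `skip` value until a batch of fewer than 100 items comes back.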
err_fail_filecount = 1
def fetch_all_programs():
    # archive any previous download under a dated folder name
    if os.path.isdir('cache/programs'):
        m = datetime.strptime(time.ctime(os.path.getctime('cache/programs')), "%a %b %d %H:%M:%S %Y")
        today = 'cache/programs_%s' % m.strftime('%Y_%m_%d')
        print("+ Creating folder: %s" % today)
        shutil.move('cache/programs', today)
    os.makedirs('cache/programs')
    size = 100
    endn = 0
    filen = 1
    PARAM = "?returnFormat=json&method=getPrograms&status=Active"
    while size > 99:
        size, endn, items = another_request(CQ_URL + PARAM, endn)
        out = codecs.open('cache/programs/programs_' + str(filen) + '.txt', 'w', 'utf-8')
        out.write(json.dumps(items, indent=4))
        out.close()
        filen += 1
    print("Written to cache/programs/")
def nothing(x=0):
    pass
seen = []
def clean(st):
    #return st
    global seen
    ok = ['b', 'i', 'ul', 'li', 'ol', 'strong', 'br', 'u']
    soup = bs(st, features='lxml')
    """for tag in soup.recursiveChildGenerator():
        if isinstance(tag, bs.Tag) and tag.name not in ok:
            tag.unwrap()
    return soup.prettify()
    """
    for T in soup.find_all(recursive=True):
        if T.name not in ok:
            if T.name not in seen:
                seen.append(T.name)
                print("- %s" % T.name)
            #print(seen)
            T.unwrap()
        else:
            #print("+ %s" % T.name)
            pass
    return str(soup).strip()
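
# Rough illustration of clean() (the html/body wrappers that lxml adds are
# unwrapped too, since they aren't in the `ok` whitelist):
#
#   clean('<p>Hello <b>world</b></p>')  ->  'Hello <b>world</b>'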
num_failed_course = 1
def single_course_parse(c):
    global num_failed_course
    this_course = []
    if "attributes" in c and "entityId" in c["attributes"]:
        print(c["attributes"]["entityId"])
        return (c["attributes"]["entityId"], recur_matcher(c))
    else:
        print("I couldn't recognize a class in that")
        ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
        ooops.write(json.dumps(c, indent=2))
        ooops.close()
        num_failed_course = num_failed_course + 1
        return ("-1", [])
def match_style_test():
    classes = {}
    oo = codecs.open("cache/courses/curric2022test.json", "w", "utf-8")
    for f in os.listdir('cache/courses'):
        if re.search('classes_', f):
            print(f)
            cls = json.loads(codecs.open('cache/courses/' + f, 'r', 'utf-8').read())
            for c in cls:
                id, output = single_course_parse(c)
                classes[id] = "\n".join(output)
                oo.write(classes[id])
                oo.write("\n\n\n" + "-" * 30 + "\n\n")
                oo.flush()
def single_program_path_parse(c):
    this_course = []
    global num_failed_course
    if "attributes" in c and "entityId" in c["attributes"]:
        print(c["attributes"]["entityId"])
        return (c["attributes"]["entityId"], pathstyle(c))
    else:
        print("I couldn't recognize a program in that")
        ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
        ooops.write(json.dumps(c, indent=2))
        ooops.close()
        num_failed_course = num_failed_course + 1
        return ("-1", [])
def path_style_prog():
    classes = {}
    oo = codecs.open("cache/programs/allprogrampaths.txt", "w", "utf-8")
    for f in os.listdir('cache/programs'):
        if re.search('^programs_', f):
            print(f)
            cls = json.loads(codecs.open('cache/programs/' + f, 'r', 'utf-8').read())
            for c in cls:
                id, output = single_program_path_parse(c)
                classes[id] = "\n".join(output)
                oo.write(classes[id])
                oo.write("\n\n\n" + "-" * 30 + "\n\n")
                oo.flush()
def term_txt_to_code(t):
    term_codes = {'Spring': '30', 'Summer': '50', 'Fall': '70'}
    parts = t.split(" ")
    if len(parts) > 1:
        yr = parts[1]
        sem = term_codes[parts[0]]
        return yr + sem
    return ''
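
# Example mappings, year + semester code:
#   term_txt_to_code('Spring 2018') -> '201830'
#   term_txt_to_code('Fall 2023')   -> '202370'
#   term_txt_to_code('TBA')         -> ''   (no parseable term)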
def all_outcomes():
    csvfile = codecs.open('cache/courses/alloutcomes.csv', 'w', 'utf-8')
    csvwriter = csv.writer(csvfile)
    csvwriter.writerow('code cqcourseid coursestatus termineffect dept num cqoutcomeid outcome'.split(' '))
    csvfile2 = codecs.open('cache/courses/all_active_outcomes.csv', 'w', 'utf-8')
    csvwriter2 = csv.writer(csvfile2)
    csvwriter2.writerow('code cqcourseid coursestatus termineffect dept num cqoutcomeid outcome'.split(' '))
    rr = codecs.open("cache/courses/allclasspaths.txt", "r", "utf-8").readlines()
    ww = codecs.open("cache/courses/alloutcomes.txt", "w", "utf-8")
    course_index = []
    current_course = {}
    current_course_num = 0
    term_counts = defaultdict(int)
    count = 0
    for L in rr:
        a = re.search(r'Course\/(\d+)', L)
        if a:
            course_num = a.group(1)
            if course_num != current_course_num:
                if current_course_num != 0:
                    # log the course info so we can know cq id numbers of courses
                    course_index.append(current_course)
                # status
                count += 1
                if count % 100 == 0:
                    print(count)
                current_course_num = course_num
                current_course = {'c': '', 'd': '', 'n': '', 't': '', 's': '', 'T': '', 'o': [], 'i': '', 'a': '', 'm': ''}
                current_course['c'] = course_num
        a = re.search(r'Course\/(\d+)\/1\/Course\ Description\/0\/Course\ Discipline\/(.*)$', L)
        if a:
            current_course['d'] = a.group(2)
        a = re.search(r'Course\/(\d+)\/1\/Course\ Description\/0\/Course\ Number\/(.*)$', L)
        if a:
            current_course['n'] = a.group(2)
        a = re.search(r'Course\/(\d+)\/1\/Course\ Description\/0\/Course\ Title\/(.*)$', L)
        if a:
            current_course['T'] = a.group(2)
        a = re.search(r'Course\/(\d+)\/1\/Course\ Description\/0\/Short\ Title\/(.*)$', L)
        if a:
            current_course['t'] = a.group(2)
        a = re.search(r'Course\ Description\/status\/(.*)$', L)
        if a:
            current_course['s'] = a.group(1)
        a = re.search(r'Course\ Content\/\d+\/Lecture\ Content\/Curriculum\ Approval\ Date:\s*(.*)$', L)
        if a:
            current_course['a'] = a.group(1)
        a = re.search(r'Course\ Description\/\d+\/Internal\ Processing\ Term\/(.*)$', L)
        if a:
            t_code = term_txt_to_code(a.group(1))
            current_course['m'] = t_code
            term_counts[t_code] += 1
        # Sample lines these patterns match against:
        # Course/10/10/Course Content/1/Lecture Content/Curriculum Approval Date: 02/24/2014
        # Course/3091/1/Course Description/0/Internal Processing Term/Spring 2018
        a = re.search(r'Learning\ Outcomes\/\d+\/(cqid_\d+)\/Learning\ Outcomes\/Description\/(.*)$', L)
        if a:
            current_course['o'].append(a.group(2))
            current_course['i'] = a.group(1)
            csvwriter.writerow([current_course['d'] + current_course['n'], current_course_num, current_course['s'], current_course['m'], current_course['d'], current_course['n'], current_course['i'], a.group(2)])
            if current_course['s'] == 'Active':
                csvwriter2.writerow([current_course['d'] + current_course['n'], current_course_num, current_course['s'], current_course['m'], current_course['d'], current_course['n'], current_course['i'], a.group(2)])
        if re.search(r'Learning\ Outcomes\/Description\/', L):
            ww.write(L)
        if re.search(r'Description\/entityTitle\/', L):
            ww.write(L)
        if re.search(r'Description\/status\/', L):
            ww.write(L)
    if current_course_num != 0:
        # don't drop the final course in the file
        course_index.append(current_course)
    xx = codecs.open("cache/courses/course_cq_index.json", "w", "utf-8")
    xx.write(json.dumps(course_index, indent=2))
    #print(json.dumps(term_counts, indent=2))
def ddl():
    return defaultdict(list)
def splitclassline(cl, id=''):
    # "PHYS 4A - Physics for Scientists and Engineers I 4.000 *Active*"
    dbg = 1
    ret = {'name': '', 'units': '', 'units_hi': '', 'code': '', 'status': '',
           'sequence': int(id) if id else 0}  # guard: int('') would raise ValueError
    p1 = re.search(r'^(.*?)\s\-\s(.*)$', cl)
    if p1:
        code = p1.groups()[0]
        ret['code'] = code
        rest = p1.groups()[1]
        # variable-unit form: "<name> <lo> - <hi> *<status>*"
        p3 = re.search(r'^(.*)\s(\d+\.\d+)\s\-\s(\d+\.\d+)\s+\*(\w+)\*$', rest)
        if p3:
            name, units, units_hi, status = p3.groups()
            ret['name'] = name
            ret['units'] = units
            ret['units_hi'] = units_hi
            ret['status'] = status
            #if dbg: print("%s --- code: %s - name: %s - units: %s-%s - status: %s" % (cl, code, name, units, units_hi, status))
            return ret
        # fixed-unit form: "<name> <units> *<status>*"
        p2 = re.search(r'^(.*)\s(\d+\.\d+)\s+\*(\w+)\*$', rest)
        if p2:
            name, units, status = p2.groups()
            ret['name'] = name
            ret['units'] = units
            ret['status'] = status
            #if dbg: print("%s --- code: %s - name: %s - units: %s - status: %s" % (cl, code, name, units, status))
            return ret
        else:
            if dbg: print("%s --- code: %s --------------------------------" % (cl, code))
    else:
        if dbg: print("%s --- code:----------------------------------------" % cl)
    #return (cl,'','')
    return ret
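
# Sketch of splitclassline() on the sample line in its comment:
#
#   splitclassline('PHYS 4A - Physics for Scientists and Engineers I 4.000 *Active*', '3')
#   -> {'name': 'Physics for Scientists and Engineers I', 'units': '4.000',
#       'units_hi': '', 'code': 'PHYS 4A', 'status': 'Active', 'sequence': 3}
#
# Variable-unit lines ("... 1.000 - 3.000 *Active*") also fill 'units_hi'.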
def path_style_2_html():
    verbose = 1
    v = verbose
    prog_title_subs = []
    with codecs.open('cache/program_published_names.csv', 'r', 'utf-8') as file:
        reader = csv.reader(file)
        for row in reader:
            prog_title_subs.append(row)
    oo = codecs.open("cache/programs/allprogrampaths.txt", "r", "utf-8").readlines()
    award_prebuild = defaultdict(ddl)
    last_line = ""
    for L in oo:
        L = L.strip()
        if not re.search(r'^Program', L):
            last_line = last_line + " " + L
            continue
        else:
            if re.search(r'\/$', last_line):
                # ignore line with trailing slash - assume no data
                last_line = L
                continue
            if re.search(r'Curriculum\sDivision\s\d+', last_line):
                #print(last_line)
                pass
            test_1 = re.search(r'^Program\/(\d+)\/Course', last_line)
            if test_1:
                award_prebuild[test_1.groups()[0]]["Info"].append(last_line)
            test_2 = re.search(r'^Program\/(\d+)\/(\d+)\/([\w\s]+)\/', last_line)
            if test_2:
                award_prebuild[test_2.groups()[0]][test_2.groups()[2]].append(last_line)
            last_line = L
    output = codecs.open("cache/programs/programs_prebuild.json", "w", "utf-8")
    output.write(json.dumps(award_prebuild, indent=2))
    award_build = defaultdict(ddl)
    for AW in sorted(list(award_prebuild.keys()), key=int):
        v = 1
        aw = award_prebuild[AW]
        for line in aw["Program Description"]:
            t1 = re.search(r'Division\/(.*)$', line)
            if t1:
                award_build[AW]["division"] = t1.groups()[0]
            t1 = re.search(r'Department\/(.*)$', line)
            if t1:
                award_build[AW]["dept"] = t1.groups()[0]
            t1 = re.search(r'Program\sTitle\/(.*)$', line)
            if t1:
                award_build[AW]["program_title"] = t1.groups()[0]
            t1 = re.search(r'Award\sType\/(.*)$', line)
            if t1:
                award_build[AW]["award"] = t1.groups()[0]
            t1 = re.search(r'\/Description\/(.*)$', line)
            if t1:
                award_build[AW]["description"] = t1.groups()[0]
            t1 = re.search(r'Transfer\/CTE\/(.*)$', line)
            if t1:
                award_build[AW]["transfer_cte"] = t1.groups()[0]
            t1 = re.search(r'CTE\sProgram\?\/\/(.*)$', line)
            if t1:
                award_build[AW]["is_cte"] = t1.groups()[0]
        for line in aw["Info"]:
            t1 = re.search(r'Description\/status\/(.*)$', line)
            if t1:
                award_build[AW]["status"] = t1.groups()[0]
            t1 = re.search(r'Description\/proposalType\/(.*)$', line)
            if t1:
                award_build[AW]["proposal_type"] = t1.groups()[0]
        for line in aw["Codes"]:
            t1 = re.search(r'Banner\sCode\/(.*)$', line)
            if t1:
                award_build[AW]["banner_code"] = t1.groups()[0]
        # substitute in program names more suitable for publishing
        subbed = 0
        for L in prog_title_subs:
            if award_build[AW]["dept"] == L[0] and award_build[AW]["program_title"] == L[1]:
                award_build[AW]["publish_title"] = L[2]
                subbed = 1
                if v: print("SUBBED")
                if len(L) > 3:
                    award_build[AW]["publish_title2"] = L[3]
                else:
                    award_build[AW]["publish_title2"] = ""
        if not subbed:
            award_build[AW]["publish_title"] = award_build[AW]["dept"]
            award_build[AW]["publish_title2"] = ""
        if award_build[AW]["program_title"] == "Liberal Arts: Computer Science & Information Systems Emphasis":
            award_build[AW]["publish_title"] = "Computer Science and Information Studies"
            award_build[AW]["publish_title2"] = "Liberal Arts"
            if v: print("-----LIB ART CSIS")
        if v:
            print("%s / %s - %s" % (award_build[AW]["publish_title"], award_build[AW]["program_title"], award_build[AW]["award"]))
            v = 0
        for line in aw["Program Learning Outcomes"]:
            t1 = re.search(r'Program\sLearning\sOutcomes\/\d+\/Outcome\/(\d+)\/cqid_(\d+)\/Outcome\/Outcome\/(.*)$', line)
            if t1:
                if "PLO" in award_build[AW]:
                    award_build[AW]["PLO"].append((t1.groups()[0], t1.groups()[2]))
                else:
                    award_build[AW]["PLO"] = [(t1.groups()[0], t1.groups()[2]), ]
        st = lambda x: x[0]
        award_build[AW]["PLO"] = sorted(award_build[AW]["PLO"], key=st)
        award_build[AW]["PLO"] = [x[1] for x in award_build[AW]["PLO"]]
        req_prebuild = defaultdict(list)
        pbd_unit_calcs = {}
        # requirements table:
        # - most types have a 'units' column, which might be calculated
        # - might be overridden
        # - might be single number or a range min/max
        for line in aw["Program Requirements"]:
            t1 = re.search(r'Program\sBlock\sDefinitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/(.*)$', line)
            if t1:
                pbd_number = t1.groups()[0]
                if pbd_number not in pbd_unit_calcs:
                    pbd_unit_calcs[pbd_number] = {'unit_sum': 0, 'unit_sum_max': 0, 'override': 0, 'min': 0, 'max': 0}
                t2 = re.search(r'Requirements\/\d+\/Program\sBlock\sDefinitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Course\sBlock\sDefinition\/(.*)$', line)
                if t2:
                    req_prebuild[pbd_number].append(('h3', '0', t2.groups()[1]))
                    continue
                t3 = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+\/Program\sCourses\/\d+\/\[Discipline\sand\sCourse\schained\scombo\]\/Course\/(.*)$', line)
                if t3:
                    req_prebuild[pbd_number].append(('course', t3.groups()[0], splitclassline(t3.groups()[1], t3.groups()[0])))
                    continue
                t3a = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+\/Program\sCourses\/\d+\/\[Condition\sSection\]\/Condition\/or$', line)
                if t3a:
                    req_prebuild[pbd_number].append(('or', t3a.groups()[0]))
                    continue
                t3b = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+\/Program\sCourses\/\d+\/\[Condition\sSection\]\/Condition\/and$', line)
                if t3b:
                    req_prebuild[pbd_number].append(('and', t3b.groups()[0]))
                    continue
                t4 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+\/Program\sCourses\/Non\-Course\sRequirements\/(.*)$', line)
                if t4:
                    req_prebuild[pbd_number].append(('noncourse', t4.groups()[1], t4.groups()[2]))
                    continue
                t5 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Override\sUnit\sCalculation\/1$', line)
                if t5:
                    pbd_unit_calcs[pbd_number]['override'] = 1
                    continue
                t6 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Unit\sMin\/(.*)$', line)
                if t6:
                    pbd_unit_calcs[pbd_number]['min'] = t6.groups()[1]
                    continue
                t7 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Unit\sMax\/(.*)$', line)
                if t7:
                    pbd_unit_calcs[pbd_number]['max'] = t7.groups()[1]
                    continue
                t8 = re.search(r'chained\scombo\]\/Discipline', line)
                if t8:
                    continue
                t8a = re.search(r'Units\s(Low|High)', line)  # was a char class [Low|High]; grouping is what was meant
                if t8a:
                    continue
                t9 = re.search(r'Definitions\/Block\sHeader\/(.*)$', line)
                if t9:
                    req_prebuild[pbd_number].append(('blockheader', t9.groups()[0]))
                    continue
                req_prebuild[pbd_number].append(('', t1.groups()[1]))
        award_build[AW]["requirements"] = req_prebuild
        award_build[AW]["unit_calcs"] = pbd_unit_calcs
        # associate unit calculations with program blocks
        for block_key in req_prebuild.keys():
            if block_key in pbd_unit_calcs:
                req_prebuild[block_key].insert(0, pbd_unit_calcs[block_key])
            else:
                req_prebuild[block_key].insert(0, {'unit_sum': 0, 'unit_sum_max': 0, 'override': 0})
        # do the unit calc math
        for block_key in req_prebuild.keys():
            this_block = req_prebuild[block_key]
            pad = this_block[0]
            if v: print("pad: ", pad)
            block_dict = {}
            for item in this_block[1:]:
                print(item)
                try:
                    if item[0] == "or":
                        block_dict[item[1] + "or"] = 1
                    if item[0] == "h3":
                        if v: print("+ ", item[1])
                    if item[0] == "blockheader":
                        if v: print(" ", item[1])
                    if not item[0] == "course":
                        continue
                    block_dict[item[1]] = item[2]
                    seq = int(item[1])
                    units = ''
                    if item[2]['units']: units = float(item[2]['units'])
                except Exception as e:
                    print("ERROR ERROR\nERROR ERROR")
                    print(e)
                    xyz = input('hit return to continue')
                #print("%i \t %f \t %s" % (seq, units, item[2]['name']))
            if v:
                for k in sorted(block_dict.keys()):
                    print(k, " ", block_dict[k])
            #for k in sliding_window(3, sorted(block_dict.keys())):
            #    l, m, n = k
            #    if re.search(r'or$', m):
            #        print("OR")
            #        print(block_dict[l], "\n", block_dict[m], "\n", block_dict[n], "\n\n")
            #print()
    output = codecs.open("cache/programs/programs_built.json", "w", "utf-8")
    output.write(json.dumps(award_build, indent=2))
def course_path_style_2_html():
    verbose = 1
    v = verbose
    oo = codecs.open("cache/courses/allclasspaths.txt", "r", "utf-8").readlines()
    course_prebuild = defaultdict(ddl)
    last_line = ""
    for L in oo:
        L = L.strip()
        if not re.search(r'^Course', L):
            last_line = last_line + " <br /> " + L
            continue
        else:
            if re.search(r'\/$', last_line):
                # ignore line with trailing slash - assume no data
                last_line = L
                continue
            test_1 = re.search(r'^Course\/(\d+)\/Course', last_line)
            if test_1:
                course_prebuild[test_1.groups()[0]]["Info"].append(last_line)
            test_2 = re.search(r'^Course\/(\d+)\/(\d+)\/(.*?)\/(.*)$', last_line)
            if test_2:
                course_prebuild[test_2.groups()[0]][test_2.groups()[2]].append(last_line)
            last_line = L
    output = codecs.open("cache/courses/courses_prebuild.json", "w", "utf-8")
    output.write(json.dumps(course_prebuild, indent=2))
    all_courses = {}
    active_courses = {}
    # regex fragment -> key it fills in course_build
    lookup_table = {
        'entityTitle': 'title', 'proposalType': 'type',
        r'\/Course\sDescription\/status': 'status', r'Course\sDiscipline': 'dept',
        r'Course\sNumber': 'number', r'Course\sTitle': 'name',
        r'Short\sTitle': 'shortname', r'Internal\sProcessing\sTerm': 'term',
        r'This\sCourse\sIs\sDegree\sApplicable': 'degree_applicable',
        r'\/Course\sDescription\/\d+\/Course\sDescription\/': 'desc',
        r'Minimum\sUnits': 'min_units', r'Minimum\sLecture\sHour': 'min_lec_hour',
        r'Minimum\sLab\sHour': 'min_lab_hour', r'Course\shas\svariable\shours': 'has_var_hours',
        r'Number\sWeeks': 'weeks',
        r'Maximum\sUnits': 'max_units', r'Credit\sStatus': 'credit_status',
        r'TOP\sCode': 'top_code', 'Classification': 'classification',
        r'Non\sCredit\sCategory': 'noncredit_category', r'Stand-Alone\sClass?': 'stand_alone',
        r'Grade\sOption': 'grade_option', r'Is\sRepeatable': 'repeatable',
        r'Learning\sOutcomes\/Description': 'slo',
        r'Is\sThis\sCourse\sis\sRecommended\sfor\sTransfer\sto\sState\sUniversities\sand\sColleges?': 'transfer_csu',
        r'Is\sThis\sCourse\sis\sRecommended\sfor\sTransfer\sto\sUniversity\sof\sCalifornia?': 'transfer_uc',
        r'\/Catalog\sCourse\sSummary\sView\/': 'catalog',
        r'\/Course\sContent\/\d+\/Lecture\sContent\/': 'content',
        r'\/ASSIST\sPreview\/\d+\/Outcomes\sand\sObjectives\/': 'objectives'}
    for C in sorted(list(course_prebuild.keys()), key=int):
        v = 0
        crs = course_prebuild[C]
        course_build = {'slo': {}}  # defaultdict( ddl )
        if v: print(C)
        for K in crs.keys():
            if v: print("\t%s" % K)
            for line in crs[K]:
                for (pat, key) in lookup_table.items():  # `pat` was named `str`; renamed so the builtin isn't shadowed
                    if re.search(pat, line):
                        if key == 'slo':
                            # \s<br\s\/>\s
                            content_search = re.search(r'\/Learning\sOutcomes\/\d+\/cqid_(\d+)\/Learning\sOutcomes\/Description\/(.*?)$', line)
                            if content_search:
                                course_build['slo'][content_search.groups()[0]] = content_search.groups()[1]
                            else:
                                print("NO SLO? %s" % line)
                        elif key == 'desc':
                            content_search = re.search(r'^Course\/\d+\/\d+\/Course\sDescription\/\d+\/Course\sDescription\/(.*)$', line)
                            course_build['desc'] = content_search.groups()[0]
                        elif key == 'catalog':
                            content_search = re.search(r'^Course\/\d+\/\d+\/General\sEducation\sPattern\/\d+\/Catalog\sCourse\sSummary\sView\/(.*)$', line)
                            course_build['catalog'] = content_search.groups()[0]
                        elif key == 'content':
                            content_search = re.search(r'^Course\/\d+\/\d+\/Course\sContent\/\d+\/Lecture\sContent\/(.*)$', line)
                            course_build['content'] = content_search.groups()[0]
                        elif key == 'objectives':
                            content_search = re.search(r'^Course\/\d+\/\d+\/ASSIST\sPreview\/\d+\/Outcomes\sand\sObjectives\/(.*)$', line)
                            course_build['objectives'] = content_search.groups()[0]
                        else:
                            content_search = re.search(r'^(.*)\/(.*?)$', line)
                            course_build[key] = content_search.groups()[1]
                        if v: print("\t\t%s - %s" % (key, course_build[key]))
                        continue
        all_courses[C] = course_build
        if course_build['status'] == 'Active':
            active_courses[C] = course_build
    output = codecs.open("cache/courses/courses_built.json", "w", "utf-8")
    output.write(json.dumps(all_courses, indent=2))
    output2 = codecs.open("cache/courses/courses_active_built.json", "w", "utf-8")
    output2.write(json.dumps(active_courses, indent=2))
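
# The two passes above work on flattened path lines such as (see the samples
# noted in all_outcomes()):
#
#   Course/3091/1/Course Description/0/Internal Processing Term/Spring 2018
#
# Pass 1 groups lines by course id and top-level section name; pass 2 runs
# each group through lookup_table to fill the flat course_build dict that is
# dumped to courses_built.json / courses_active_built.json.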
#########
#########
#########
#########
def another_request(url, startat):
    global err_fail_filecount
    newparam = "&skip=" + str(startat)
    print(url + newparam)
    r = requests.get(url + newparam, auth=(user, pasw))
    try:
        mydata = json.loads(r.text, strict=False)
    except Exception as e:
        print("Couldn't read that last bit")
        #print(r.text)
        codecs.open('cache/curric2022failfile_%i.txt' % err_fail_filecount, 'w', 'utf-8').write(r.text)
        err_fail_filecount += 1
        print(e)
        return 0, 0, []
    size = mydata['resultSetMetadata']['ResultSetSize']
    endn = mydata['resultSetMetadata']['EndResultNum']
    items = mydata['entityInstances']
    print(' Got ' + str(size) + ' instances, ending at item number ' + str(endn))
    return size, endn, items
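
# Pagination sketch: start at skip=0, then feed each response's EndResultNum
# back in as the next starting point until a short batch signals the end.
#
#   size, endn = 100, 0
#   while size > 99:
#       size, endn, items = another_request(CQ_URL + PARAM, endn)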
def fetch_all_classes():
    # archive any previous download under a dated folder name
    if os.path.isdir('cache/courses'):
        m = datetime.strptime(time.ctime(os.path.getctime('cache/courses')), "%a %b %d %H:%M:%S %Y")
        today = 'cache/courses_%s' % m.strftime('%Y_%m_%d')
        print("+ Creating folder: %s" % today)
        shutil.move('cache/courses', today)
    os.makedirs('cache/courses')
    size = 100
    endn = 0
    filen = 1
    while size > 99:
        size, endn, items = another_request(CQ_URL + PARAM, endn)
        out = codecs.open('cache/courses/classes_' + str(filen) + '.txt', 'w', 'utf-8')
        out.write(json.dumps(items, indent=2))
        out.close()
        filen += 1
    print("Written to cache/courses/")
#
#
# Main worker
#
def recur_path_matcher(item, path=None):
    # `path` defaulted to a mutable [] originally; use None to avoid the
    # shared-default-argument trap
    if path is None:
        path = []
    def x2_path_update(x, y, z):
        path.extend([str(y), x])
        my_result_lines.append('/'.join(path) + '/' + 'lastEdited' + '/' + z)
    path_str = "/".join(path) + "/"
    path_str = re.sub(r'\/+', '/', path_str)
    path_str = re.sub(r'\s+', ' ', path_str)
    my_result_lines = []
    if type(item) == type({}):
        original_path = path.copy()
        match(item,
              {'attributes': {'displayName': _}, 'lookUpDisplay': _, },
              lambda x, y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))),
              {'attributes': {'displayName': _}, 'fieldValue': _, },
              lambda x, y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))),
              {'attributes': {'fieldName': _}, 'fieldValue': _, },
              lambda x, y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))),
              {'instanceId': _, 'sectionName': _, 'sectionSortOrder': _},
              lambda id, name, order: path.extend([str(order), 'cqid_' + str(id), name]),
              {'instanceId': _, 'sectionName': _, 'instanceSortOrder': _},
              lambda id, name, order: path.extend([str(order), 'cqid_' + str(id), name]),
              {'sectionName': _, 'sectionSortOrder': _, 'lastUpdated': _},
              #lambda x,y,z: path.extend([str(y),x,z]),
              x2_path_update,
              {'sectionName': _, 'sectionSortOrder': _},
              lambda x, y: path.extend([str(y), x]),
              {'sectionName': _},
              lambda x: path.append(x),
              _, nothing  #lambda x: path.append('')
              )
        # recurse with whatever segments the match handlers pushed onto `path`,
        # then restore it in place so sibling nodes don't inherit this branch
        for K, V in list(item.items()):
            my_result_lines.extend(recur_path_matcher(V, path))
        path[:] = original_path
    elif type(item) == type([]):
        for V in item:
            my_result_lines.extend(recur_path_matcher(V, path))
    return my_result_lines
def pathstyle(theclass):
    #theclass = json.loads( codecs.open('cache/courses/samplecourse.json','r','utf-8').read() )
    # {'entityMetadata': {'entityTitle': _,'status': _, 'entityType':_, 'entityId':_ }},
    # lambda title,status,typ,id:
    #     my_result_lines.append("%s%s/%s/%s [%s]" % (path_str, str(typ), str(id), str(title),str(status))) ,
    if "entityMetadata" in theclass:
        id = theclass["entityMetadata"]["entityId"]
        title = theclass["entityMetadata"]["entityTitle"]
        typ = theclass["entityMetadata"]["entityType"]
        action = theclass["entityMetadata"]["proposalType"]
        status = theclass["entityMetadata"]["status"]
        # e.g. "entityId": 4077,
        #      "entityTitle": "ENGL2B - American Ethnic Literature",
        #      "entityType": "Course",
        #      "proposalType": "Deactivate Course",
        #      "status": "Historical",
        result = ["/".join([typ, str(id), "Course Description", "entityTitle", title]),
                  "/".join([typ, str(id), "Course Description", "entityType", typ]),
                  "/".join([typ, str(id), "Course Description", "proposalType", action]),
                  "/".join([typ, str(id), "Course Description", "status", status]), ]
        result.extend(recur_path_matcher(theclass["entityFormData"]["rootSections"], [typ, str(id)]))
        #oo = codecs.open("cache/courses/curric2022test_path.json","w","utf-8")
        #print(result)
        return result
    else:
        print("didn't seem to be a class.")
        # return an empty list so callers joining the result don't crash
        return []
def single_course_path_parse(c):
    this_course = []
    global num_failed_course
    if "attributes" in c and "entityId" in c["attributes"]:
        print(c["attributes"]["entityId"])
        return (c["attributes"]["entityId"], pathstyle(c))
    else:
        print("I couldn't recognize a class in that")
        ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
        ooops.write(json.dumps(c, indent=2))
        ooops.close()
        num_failed_course = num_failed_course + 1
        return ("-1", [])
def path_style_test():
    classes = {}
    oo = codecs.open("cache/courses/allclasspaths.txt", "w", "utf-8")
    for f in os.listdir('cache/courses'):
        if re.search('^classes_', f):
            print(f)
            cls = json.loads(codecs.open('cache/courses/' + f, 'r', 'utf-8').read(), strict=False)
            for c in cls:
                id, output = single_course_path_parse(c)
                classes[id] = "\n".join(output)
                oo.write(classes[id])
                oo.write("\n\n\n" + "-" * 30 + "\n\n")
                oo.flush()
def make_sl():
    return SortedList(key=lambda x: -1 * int(x['m']))
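
# The negated-int key keeps each course's versions newest term first,
# e.g. '202370' (Fall 2023) sorts ahead of '201830' (Spring 2018).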
def course_rank():
    csvfile = codecs.open('cache/courses/all_courses_ranked.csv', 'w', 'utf-8')
    csvwriter = csv.writer(csvfile)
    csvwriter.writerow("code,cqcourseid,coursestatus,termineffect,dept,num,numoutcomes".split(","))
    courses = json.loads(codecs.open('cache/courses/course_cq_index.json', 'r', 'utf-8').read())
    all_versions = defaultdict(make_sl)  # renamed from `all` to avoid shadowing the builtin
    for c in courses:
        code = c['d'] + c['n']
        if not c['m']:
            c['m'] = '200030'
        all_versions[code].add(c)
    for k in sorted(all_versions.keys()):
        print("\n##", k)
        print(json.dumps(list(all_versions[k]), indent=2))
        for version in all_versions[k]:
            csvwriter.writerow([version['d'] + version['n'], version['c'], version['s'], version['m'], version['d'], version['n'], len(version['o'])])
if __name__ == "__main__":
print ('')
options = { 1: ['fetch all courses', fetch_all_classes],
2: ['process all classes', path_style_test],
3: ['courses - path style to html catalog', course_path_style_2_html],
4: ['courses - rank by all versions', course_rank],
5: ['fetch all programs', fetch_all_programs],
6: ['process all programs', path_style_prog],
9: ['show course outcomes', all_outcomes],
10: ['programs - path style to html catalog', path_style_2_html],
}
print ('')
if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
resp = int(sys.argv[1])
print("\n\nPerforming: %s\n\n" % options[resp][0])
else:
print ('')
for key in options:
print(str(key) + '.\t' + options[key][0])
print('')
resp = input('Choose: ')
# Call the function in the options dict
options[ int(resp)][1]()
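
# Usage sketch: pass the menu number on the command line to skip the prompt:
#
#   python curric2022.py 1   # fetch all courses
#   python curric2022.py 2   # flatten them into cache/courses/allclasspaths.txt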