# canvasapp/curric2022.py

import util
import requests, json, os, re, bisect, csv, codecs, funcy, sys, shutil, time
from datetime import datetime
import sortedcontainers as sc
from collections import defaultdict
from toolz.itertoolz import groupby,sliding_window
from sortedcontainers import SortedList
from pampy import match, _
from bs4 import BeautifulSoup as bs
leafcount = 0
displaynames = []
from canvas_secrets import cq_user, cq_pasw
from outcomes import quick_add_course_outcomes
from schedules import campus_dept_hierarchy
CQ_URL = "https://secure.curricunet.com/scripts/webservices/generic_meta/clients/versions/v4/gavilan.cfc"
CQ_URL = "https://mws.services.curriqunet.com/scripts/webservices/generic_meta/clients/versions/v4/gavilan.cfc"
PARAM = "?returnFormat=json&method=getCourses"
user = cq_user
pasw = cq_pasw
err_fail_filecount = 1
def fetch_all_programs():
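    """Archive any existing cache/programs folder, then page through the
    CurricUNET 'getPrograms' web service and save each batch of program
    records as JSON under cache/programs/."""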
    if os.path.isdir('cache/programs'):
        m = datetime.strptime(time.ctime(os.path.getctime('cache/programs')), "%a %b %d %H:%M:%S %Y")
        today = 'cache/programs_%s' % m.strftime('%Y_%m_%d')
        print("+ Creating folder: %s" % today)
        shutil.move('cache/programs', today)
    os.makedirs('cache/programs')
    size = 100
    endn = 0
    filen = 1
    PARAM = "?returnFormat=json&method=getPrograms&status=Active"
    while size > 99:
        size, endn, items = another_request(CQ_URL + PARAM, endn)
        out = codecs.open('cache/programs/programs_' + str(filen) + '.txt', 'w', 'utf-8')
        out.write(json.dumps(items, indent=4))
        out.close()
        filen += 1
    print("Written to 'cache/programs/...'")
def nothing(x=0):
    pass

seen = []
def clean(st):
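    """Return `st` with all HTML tags unwrapped except a short whitelist of
    inline formatting tags; newly seen stripped tag names are tracked in the
    module-level `seen` list."""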
    global seen
    ok = ['b', 'i', 'ul', 'li', 'ol', 'strong', 'br', 'u']
    soup = bs(st, features='lxml')
    for T in soup.find_all(recursive=True):
        if T.name not in ok:
            if T.name not in seen:
                seen.append(T.name)
            T.unwrap()
    return str(soup).strip()
def recur_matcher(item, depth=0):
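    """Recursively render a CurricUNET entity as indented 'label: value'
    lines, matching entity metadata, display-name fields, and section names
    via pampy patterns."""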
    indent = depth * " "
    my_result_lines = []
    if isinstance(item, dict):
        if not match(item,
                     {'entityMetadata': {'entityTitle': _, 'status': _, 'entityType': _, 'entityId': _}},
                     lambda title, status, typ, id: my_result_lines.append(
                         "%s%s: %s (id %s) status: %s" % (indent, str(typ), str(title), str(id), str(status))),
                     {'attributes': {'displayName': _}, 'lookUpDisplay': _},
                     lambda x, y: my_result_lines.append("%s%s: %s" % (indent, clean(str(x)), clean(str(y)))),
                     {'attributes': {'displayName': _}, 'fieldValue': _},
                     lambda x, y: my_result_lines.append("%s%s: %s" % (indent, clean(str(x)), clean(str(y)))),
                     {'sectionName': _},
                     lambda x: my_result_lines.append("%sSection: %s" % (indent, str(x))),
                     _, nothing):
            for K, V in list(item.items()):
                my_result_lines.extend(recur_matcher(V, depth + 1))
    elif isinstance(item, list):
        for V in item:
            my_result_lines.extend(recur_matcher(V, depth + 1))
    return my_result_lines
num_failed_course = 1
def single_course_parse(c):
    global num_failed_course
    if "attributes" in c and "entityId" in c["attributes"]:
        print(c["attributes"]["entityId"])
        return (c["attributes"]["entityId"], recur_matcher(c))
    else:
        print("I couldn't recognize a class in that")
        ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
        ooops.write(json.dumps(c, indent=2))
        ooops.close()
        num_failed_course += 1
        return ("-1", [])
def match_style_test():
    classes = {}
    oo = codecs.open("cache/courses/curric2022test.json", "w", "utf-8")
    for f in os.listdir('cache/courses'):
        if re.search('classes_', f):
            print(f)
            cls = json.loads(codecs.open('cache/courses/' + f, 'r', 'utf-8').read())
            for c in cls:
                id, output = single_course_parse(c)
                classes[id] = "\n".join(output)
                oo.write(classes[id])
                oo.write("\n\n\n" + "-" * 30 + "\n\n")
                oo.flush()
def single_program_path_parse(c):
    global num_failed_course
    if "attributes" in c and "entityId" in c["attributes"]:
        print(c["attributes"]["entityId"])
        return (c["attributes"]["entityId"], pathstyle(c))
    else:
        print(f"I couldn't recognize a program in: {json.dumps(c, indent=2)}")
        ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
        ooops.write(json.dumps(c, indent=2))
        ooops.close()
        num_failed_course += 1
        return ("-1", [])
def path_style_prog():
    classes = {}
    oo = codecs.open("cache/programs/allprogrampaths.txt", "w", "utf-8")
    for f in os.listdir('cache/programs'):
        if re.search('^programs_', f):
            print(f)
            cls = json.loads(codecs.open('cache/programs/' + f, 'r', 'utf-8').read())
            for c in cls:
                id, output = single_program_path_parse(c)
                classes[id] = "\n".join(output)
                oo.write(classes[id])
                oo.write("\n\n\n" + "-" * 30 + "\n\n")
                oo.flush()
def term_txt_to_code(t):
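    """Convert a human term name to a SIS-style code of year + season,
    e.g. 'Spring 2018' -> '201830'."""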
    term_codes = {'Winter Intersession': '10', 'Spring': '30', 'Summer': '50', 'Fall': '70'}
    m = re.search(r'(^.*)\s(\d{4,})$', t)
    if m and m.group(1) in term_codes:
        yr = m.group(2)
        sem = term_codes[m.group(1)]
        return yr + sem
    return ''
def all_outcomes():
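    """Scan cache/courses/allclasspaths.txt and emit outcome CSVs
    (alloutcomes.csv, all_active_outcomes.csv), a raw outcome text dump,
    and course_cq_index.json mapping courses to their CQ id numbers."""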
    csvfile = codecs.open('cache/courses/alloutcomes.csv', 'w', 'utf-8')
    csvwriter = csv.writer(csvfile)
    csvwriter.writerow('code cqcourseid coursestatus termineffect dept num cqoutcomeid outcome'.split(' '))
    csvfile2 = codecs.open('cache/courses/all_active_outcomes.csv', 'w', 'utf-8')
    csvwriter2 = csv.writer(csvfile2)
    csvwriter2.writerow('code cqcourseid coursestatus termineffect dept num cqoutcomeid outcome'.split(' '))
    rr = codecs.open("cache/courses/allclasspaths.txt", "r", "utf-8").readlines()
    ww = codecs.open("cache/courses/alloutcomes.txt", "w", "utf-8")
    course_index = []
    current_course = {}
    current_course_num = 0
    term_counts = defaultdict(int)
    count = 0
    for L in rr:
        a = re.search(r'Course/(\d+)', L)
        if a:
            course_num = a.group(1)
            if course_num != current_course_num:
                if current_course_num != 0:
                    # log the course info so we can know cq id numbers of courses
                    course_index.append(current_course)
                    count += 1
                    if count % 100 == 0:
                        print(count)
                current_course_num = course_num
                current_course = {'c': '', 'd': '', 'n': '', 't': '', 's': '', 'T': '', 'o': [], 'i': '', 'a': '', 'm': ''}
                current_course['c'] = course_num
        a = re.search(r'Course/(\d+)/1/Course Description/0/Course Discipline/(.*)$', L)
        if a:
            current_course['d'] = a.group(2)
        a = re.search(r'Course/(\d+)/1/Course Description/0/Course Number/(.*)$', L)
        if a:
            current_course['n'] = a.group(2)
        a = re.search(r'Course/(\d+)/1/Course Description/0/Course Title/(.*)$', L)
        if a:
            current_course['T'] = a.group(2)
        a = re.search(r'Course/(\d+)/1/Course Description/0/Short Title/(.*)$', L)
        if a:
            current_course['t'] = a.group(2)
        a = re.search(r'Course Description/status/(.*)$', L)
        if a:
            current_course['s'] = a.group(1)
        # e.g. Course/10/10/Course Content/1/Lecture Content/Curriculum Approval Date: 02/24/2014
        a = re.search(r'Course Content/\d+/Lecture Content/Curriculum Approval Date:\s*(.*)$', L)
        if a:
            current_course['a'] = a.group(1)
        # e.g. Course/3091/1/Course Description/0/Internal Processing Term/Spring 2018
        a = re.search(r'Course Description/\d+/Internal Processing Term/(.*)$', L)
        if a:
            t_code = term_txt_to_code(a.group(1))
            current_course['m'] = t_code
            term_counts[t_code] += 1
        a = re.search(r'Learning Outcomes/\d+/(cqid_\d+)/Learning Outcomes/Description/(.*)$', L)
        if a:
            current_course['o'].append(a.group(2))
            current_course['i'] = a.group(1)
            csvwriter.writerow([current_course['d'] + current_course['n'], current_course_num, current_course['s'], current_course['m'], current_course['d'], current_course['n'], current_course['i'], a.group(2)])
            if current_course['s'] == 'Active':
                csvwriter2.writerow([current_course['d'] + current_course['n'], current_course_num, current_course['s'], current_course['m'], current_course['d'], current_course['n'], current_course['i'], a.group(2)])
        if re.search(r'Learning Outcomes/Description/', L):
            ww.write(L)
        if re.search(r'Description/entityTitle/', L):
            ww.write(L)
        if re.search(r'Description/status/', L):
            ww.write(L)
    if current_course_num != 0:
        # flush the final course, which has no successor to trigger the append above
        course_index.append(current_course)
    xx = codecs.open("cache/courses/course_cq_index.json", "w", "utf-8")
    xx.write(json.dumps(course_index, indent=2))
def ddl():
    return defaultdict(list)
def splitclassline(cl, id=''):
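    """Parse a program-requirement course line like
    'PHYS 4A - Physics for Scientists and Engineers I 4.000 *Active*'
    into code, name, units (or a units range), status, and sequence."""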
# "PHYS 4A - Physics for Scientists and Engineers I 4.000 *Active*"
dbg = 1
ret = {'name':'','units':'','units_hi':'','code':'','status':'', 'sequence':int(id)}
p1 = re.search(r'^(.*?)\s\-\s(.*)$',cl)
if p1:
code = p1.groups()[0]
ret['code'] = code
rest = p1.groups()[1]
p3 = re.search(r'^(.*)\s(\d+\.\d+)\s\-\s(\d+\.\d+)\s+\*(\w+)\*$',rest)
if p3:
name = p3.groups()[0]
units = p3.groups()[1]
units_hi = p3.groups()[2]
status = p3.groups()[3]
ret['name'] = name
ret['units'] = units
ret['units_hi'] = units_hi
ret['status'] = status
#if dbg: print( "%s --- code: %s - name: %s - units: %s-%s - status: %s" % (cl,code,name,units,units_hi,status))
return ret
p2 = re.search(r'^(.*)\s(\d+\.\d+)\s+\*(\w+)\*$',rest)
if p2:
name = p2.groups()[0]
units = p2.groups()[1]
status = p2.groups()[2]
ret['name'] = name
ret['units'] = units
ret['status'] = status
#if dbg: print( "%s --- code: %s - name: %s - units: %s - status: %s" % (cl,code,name,units,status))
return ret
else:
if dbg: print( "%s --- code: %s --------------------------------" % (cl,code))
else:
if dbg: print( "%s --- code:----------------------------------------" % cl)
#return (cl,'','')
return ret
def path_style_2_html():
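    """Build cache/programs/programs_built.json from the flattened program
    paths in allprogrampaths.txt: group lines per program, then extract
    description fields, PLOs, requirement blocks, and unit calculations."""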
    verbose = 1
    v = verbose
    prog_title_subs = []
    #with codecs.open('cache/program_published_names.csv', 'r', 'utf-8') as file:
    #    reader = csv.reader(file)
    #    for row in reader:
    #        prog_title_subs.append(row)
    oo = codecs.open("cache/programs/allprogrampaths.txt", "r", "utf-8").readlines()
    award_prebuild = defaultdict(ddl)
    last_line = ""
    for L in oo:
        L = L.strip()
        if not re.search(r'^Program', L):
            # continuation line: glue it onto the path line before it
            last_line = last_line + " " + L
            continue
        else:
            if re.search(r'/$', last_line):
                # ignore line with trailing slash - assume no data
                last_line = L
                continue
            test_1 = re.search(r'^Program/(\d+)/Course', last_line)
            if test_1:
                award_prebuild[test_1.groups()[0]]["Info"].append(last_line)
            test_2 = re.search(r'^Program/(\d+)/(\d+)/([\w\s]+)/', last_line)
            if test_2:
                award_prebuild[test_2.groups()[0]][test_2.groups()[2]].append(last_line)
            last_line = L
    output = codecs.open("cache/programs/programs_prebuild.json", "w", "utf-8")
    output.write(json.dumps(award_prebuild, indent=2))
    award_build = defaultdict(ddl)
    for AW in sorted(list(award_prebuild.keys()), key=int):
        v = 1
        aw = award_prebuild[AW]
        for line in aw["Program Description"]:
            t1 = re.search(r'Division/(.*)$', line)
            if t1:
                award_build[AW]["division"] = t1.groups()[0]
            t1 = re.search(r'Department/(.*)$', line)
            if t1:
                award_build[AW]["dept"] = t1.groups()[0]
            t1 = re.search(r'Program\sTitle/(.*)$', line)
            if t1:
                award_build[AW]["program_title"] = t1.groups()[0]
            t1 = re.search(r'Award\sType/(.*)$', line)
            if t1:
                award_build[AW]["award"] = t1.groups()[0]
            t1 = re.search(r'/Description/(.*)$', line)
            if t1:
                award_build[AW]["description"] = t1.groups()[0]
            t1 = re.search(r'Transfer/CTE/(.*)$', line)
            if t1:
                award_build[AW]["transfer_cte"] = t1.groups()[0]
            t1 = re.search(r'CTE\sProgram\?//(.*)$', line)
            if t1:
                award_build[AW]["is_cte"] = t1.groups()[0]
        for line in aw["Info"]:
            t1 = re.search(r'Description/status/(.*)$', line)
            if t1:
                award_build[AW]["status"] = t1.groups()[0]
            t1 = re.search(r'Description/proposalType/(.*)$', line)
            if t1:
                award_build[AW]["proposal_type"] = t1.groups()[0]
        for line in aw["Codes"]:
            t1 = re.search(r'Banner\sCode/(.*)$', line)
            if t1:
                award_build[AW]["banner_code"] = t1.groups()[0]
        # substitute in program names more suitable for publishing
        subbed = 0
        for L in prog_title_subs:
            if award_build[AW]["dept"] == L[0] and award_build[AW]["program_title"] == L[1]:
                award_build[AW]["publish_title"] = L[2]
                subbed = 1
                if v: print("SUBBED")
                if len(L) > 3:
                    award_build[AW]["publish_title2"] = L[3]
                else:
                    award_build[AW]["publish_title2"] = ""
        if not subbed:
            award_build[AW]["publish_title"] = award_build[AW]["dept"]
            award_build[AW]["publish_title2"] = ""
        if award_build[AW]["program_title"] == "Liberal Arts: Computer Science & Information Systems Emphasis":
            award_build[AW]["publish_title"] = "Computer Science and Information Studies"
            award_build[AW]["publish_title2"] = "Liberal Arts"
            if v: print("-----LIB ART CSIS")
        if v:
            print("%s / %s - %s" % (award_build[AW]["publish_title"], award_build[AW]["program_title"], award_build[AW]["award"]))
            v = 0
        for line in aw["Program Learning Outcomes"]:
            t1 = re.search(r'Program\sLearning\sOutcomes/\d+/Outcome/(\d+)/cqid_(\d+)/Outcome/Outcome/(.*)$', line)
            if t1:
                if "PLO" in award_build[AW]:
                    award_build[AW]["PLO"].append((t1.groups()[0], t1.groups()[2]))
                else:
                    award_build[AW]["PLO"] = [(t1.groups()[0], t1.groups()[2])]
        if "PLO" in award_build[AW]:
            # order PLOs by their outcome number, then keep just the text
            award_build[AW]["PLO"] = sorted(award_build[AW]["PLO"], key=lambda x: x[0])
            award_build[AW]["PLO"] = [x[1] for x in award_build[AW]["PLO"]]
        req_prebuild = defaultdict(list)
        pbd_unit_calcs = {}
        # requirements table:
        # - most types have a 'units' column, which might be calculated,
        #   might be overridden, and might be a single number or a min/max range
        current_item_number = 0
        pbd_number = '0'
        for line in aw["Program Requirements"]:
            t1 = re.search(r'Program\sBlock\sDefinitions/(\d+)/cqid_\d+/Program\sBlock\sDefinitions/(.*)$', line)
            if t1:
                pbd_number = t1.groups()[0]
                if pbd_number not in pbd_unit_calcs:
                    pbd_unit_calcs[pbd_number] = {'unit_sum': 0, 'unit_sum_max': 0, 'override': 0, 'min': 0, 'max': 0}
            t2 = re.search(r'Requirements/\d+/Program\sBlock\sDefinitions/(\d+)/cqid_\d+/Program\sBlock\sDefinitions/Course\sBlock\sDefinition/(.*)$', line)
            if t2:
                req_prebuild[pbd_number].append(('h3', '0', t2.groups()[1]))
                current_item_number = 0
                continue
            t3 = re.search(r'Definitions/\d+/Program\sCourses/(\d+)/cqid_\d+/Program\sCourses/\d+/\[Discipline\sand\sCourse\schained\scombo\]/Course/(.*)$', line)
            if t3:
                req_prebuild[pbd_number].append(('course', t3.groups()[0], splitclassline(t3.groups()[1], t3.groups()[0])))
                current_item_number = t3.groups()[0]
                continue
            t3a = re.search(r'Definitions/\d+/Program\sCourses/(\d+)/cqid_\d+/Program\sCourses/\d+/\[Condition\sSection\]/Condition/or$', line)
            if t3a:
                req_prebuild[pbd_number].append(('or', t3a.groups()[0]))
                current_item_number = t3a.groups()[0]
                continue
            t3b = re.search(r'Definitions/\d+/Program\sCourses/(\d+)/cqid_\d+/Program\sCourses/\d+/\[Condition\sSection\]/Condition/and$', line)
            if t3b:
                req_prebuild[pbd_number].append(('and', t3b.groups()[0]))
                current_item_number = t3b.groups()[0]
                continue
            t4 = re.search(r'Definitions/(\d+)/cqid_\d+/Program\sBlock\sDefinitions/\d+/Program\sCourses/(\d+)/cqid_\d+/Program\sCourses/Non-Course\sRequirements/(.*)$', line)
            if t4:
                req_prebuild[pbd_number].append(('noncourse', t4.groups()[1], t4.groups()[2]))
                current_item_number = t4.groups()[1]
                continue
            t5 = re.search(r'Definitions/(\d+)/cqid_\d+/Program\sBlock\sDefinitions/Override\sUnit\sCalculation/1$', line)
            if t5:
                pbd_unit_calcs[pbd_number]['override'] = 1
                continue
            t6 = re.search(r'Definitions/(\d+)/cqid_\d+/Program\sBlock\sDefinitions/Unit\sMin/(.*)$', line)
            if t6:
                pbd_unit_calcs[pbd_number]['min'] = t6.groups()[1]
                continue
            t7 = re.search(r'Definitions/(\d+)/cqid_\d+/Program\sBlock\sDefinitions/Unit\sMax/(.*)$', line)
            if t7:
                pbd_unit_calcs[pbd_number]['max'] = t7.groups()[1]
                continue
            if re.search(r'chained\scombo\]/Discipline', line):
                continue
            # note: this was r'Units\s[Low|High]', a character class that matched single letters
            if re.search(r'Units\s(?:Low|High)', line):
                continue
            t9 = re.search(r'Definitions/Block\sHeader/(.*)$', line)
            if t9:
                req_prebuild[pbd_number].append(('blockheader', 0.1, t9.groups()[0]))
                continue
            if t1:
                # anything else under a block definition is kept as 'unknown'
                req_prebuild[pbd_number].append(('unknown', current_item_number, t1.groups()[1]))
        award_build[AW]["requirements"] = req_prebuild
        award_build[AW]["unit_calcs"] = pbd_unit_calcs
        # associate unit calculations with program blocks
        for block_key in req_prebuild.keys():
            if block_key in pbd_unit_calcs:
                req_prebuild[block_key].insert(0, pbd_unit_calcs[block_key])
            else:
                req_prebuild[block_key].insert(0, {'unit_sum': 0, 'unit_sum_max': 0, 'override': 0})
        # do the unit calc math
        for block_key in req_prebuild.keys():
            this_block = req_prebuild[block_key]
            pad = this_block[0]
            if v: print("pad: ", pad)
            block_dict = {}
            for item in this_block[1:]:
                print(item)
                try:
                    if item[0] == "or":
                        block_dict[item[1] + "or"] = 1
                    if item[0] == "h3":
                        if v: print("+ ", item[1])
                    if item[0] == "blockheader":
                        if v: print("  ", item[1])
                    if not item[0] == "course":
                        continue
                    block_dict[item[1]] = item[2]
                    seq = int(item[1])
                    units = ''
                    if item[2]['units']:
                        units = float(item[2]['units'])
                except Exception as e:
                    print("ERROR ERROR\nERROR ERROR")
                    print(e)
                    input('hit return to continue')
            if v:
                for k in sorted(block_dict.keys()):
                    print(k, " ", block_dict[k])
    output = codecs.open("cache/programs/programs_built.json", "w", "utf-8")
    output.write(json.dumps(award_build, indent=2))
def course_path_style_2_html():
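    """Build courses_built.json and courses_active_built.json from the
    flattened course paths in allclasspaths.txt, matching each line against
    the lookup_table regexes to pull out catalog fields and SLOs."""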
    verbose = 1
    v = verbose
    dbg = codecs.open('cache/courses/debugout.txt', 'w', 'utf-8')
    oo = codecs.open("cache/courses/allclasspaths.txt", "r", "utf-8").readlines()
    course_prebuild = defaultdict(ddl)
    last_line = ""
    for L in oo:
        L = L.strip()
        if not re.search(r'^Course', L):
            # continuation line: glue it onto the path line before it
            last_line = last_line + " <br /> " + L
            continue
        else:
            if re.search(r'/$', last_line):
                # ignore line with trailing slash - assume no data
                last_line = L
                continue
            test_1 = re.search(r'^Course/(\d+)/Course', last_line)
            if test_1:
                course_prebuild[test_1.groups()[0]]["Info"].append(last_line)
            test_2 = re.search(r'^Course/(\d+)/(\d+)/(.*?)/(.*)$', last_line)
            if test_2:
                course_prebuild[test_2.groups()[0]][test_2.groups()[2]].append(last_line)
            last_line = L
    output = codecs.open("cache/courses/courses_prebuild.json", "w", "utf-8")
    output.write(json.dumps(course_prebuild, indent=2))
    all_courses = {}
    active_courses = {}
    # regex fragment -> course_build key
    lookup_table = {
        r'entityTitle': 'title',
        r'proposalType': 'type',
        r'/Course\sDescription/status': 'status',
        r'Course\sDiscipline': 'dept',
        r'Course\sNumber': 'number',
        r'Course\sTitle': 'name',
        r'Course Description/\d/Justification': 'justification',
        r'Short\sTitle': 'shortname',
        r'Course Description/\d/Internal\sProcessing\sTerm': 'term',
        r'This\sCourse\sIs\sDegree\sApplicable': 'degree_applicable',
        r'/Course\sDescription/\d+/Course\sDescription/': 'desc',
        r'Minimum\sUnits': 'min_units',
        r'Minimum\sLecture\sHour': 'min_lec_hour',
        r'Minimum\sLab\sHour': 'min_lab_hour',
        r'Course\shas\svariable\shours': 'has_var_hours',
        r'Number\sWeeks': 'weeks',
        r'Maximum\sUnits': 'max_units',
        r'Credit\sStatus': 'credit_status',
        r'TOP\sCode': 'top_code',
        r'Classification': 'classification',
        r'Non\sCredit\sCategory': 'noncredit_category',
        r'Stand-Alone\sClass\?': 'stand_alone',
        r'Grade\sOption': 'grade_option',
        r'Is\sRepeatable': 'repeatable',
        r'Learning\sOutcomes/Description': 'slo',
        r'Is\sThis\sCourse\sis\sRecommended\sfor\sTransfer\sto\sState\sUniversities\sand\sColleges\?': 'transfer_csu',
        r'Is\sThis\sCourse\sis\sRecommended\sfor\sTransfer\sto\sUniversity\sof\sCalifornia\?': 'transfer_uc',
        r'/Catalog\sCourse\sSummary\sView/': 'catalog',
        r'/Course\sContent/\d+/Lecture\sContent/': 'content',
        r'/ASSIST\sPreview/\d+/Outcomes\sand\sObjectives/': 'objectives',
    }
    for C in sorted(list(course_prebuild.keys()), key=int):
        v = 0
        crs = course_prebuild[C]
        course_build = {'slo': {}}
        if v: print(C)
        dbg.write(f"{C}\n")
        for K in crs.keys():
            if v: print("\t%s" % K)
            for line in crs[K]:
                for (pattern, key) in lookup_table.items():
                    if re.search(pattern, line):
                        if key == 'slo':
                            content_search = re.search(r'/Learning\sOutcomes/\d+/cqid_(\d+)/Learning\sOutcomes/Description/(.*?)$', line)
                            if content_search:
                                course_build['slo'][content_search.groups()[0]] = content_search.groups()[1]
                            else:
                                print("NO SLO? %s" % line)
                        elif key == 'desc':
                            content_search = re.search(r'^Course/\d+/\d+/Course\sDescription/\d+/Course\sDescription/(.*)$', line)
                            course_build['desc'] = content_search.groups()[0]
                        elif key == 'catalog':
                            content_search = re.search(r'^Course/\d+/\d+/General\sEducation\sPattern/\d+/Catalog\sCourse\sSummary\sView/(.*)$', line)
                            course_build['catalog'] = content_search.groups()[0]
                        elif key == 'content':
                            content_search = re.search(r'^Course/\d+/\d+/Course\sContent/\d+/Lecture\sContent/(.*)$', line)
                            course_build['content'] = content_search.groups()[0]
                        elif key == 'objectives':
                            content_search = re.search(r'^Course/\d+/\d+/ASSIST\sPreview/\d+/Outcomes\sand\sObjectives/(.*)$', line)
                            course_build['objectives'] = content_search.groups()[0]
                        else:
                            content_search = re.search(r'^(.*)/(.*?)$', line)
                            course_build[key] = content_search.groups()[1]
                            dbg.write(f"{key} => {content_search.groups()[1]}\n")
                        if v: print("\t\t%s - %s" % (key, course_build[key]))
                        continue
        all_courses[C] = course_build
        if course_build.get('status') == 'Active':
            active_courses[C] = course_build
    output = codecs.open("cache/courses/courses_built.json", "w", "utf-8")
    output.write(json.dumps(all_courses, indent=2))
    output2 = codecs.open("cache/courses/courses_active_built.json", "w", "utf-8")
    output2.write(json.dumps(active_courses, indent=2))
#########
#########
#########
#########
def another_request(url,startat):
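    """Fetch one page of results from the CurricUNET web service, starting
    at item `startat`; returns (result_set_size, end_item_number, items),
    or (0, 0, []) when the response isn't parseable JSON."""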
    global err_fail_filecount
    newparam = "&skip=" + str(startat)
    print(url + newparam)
    r = requests.get(url + newparam, auth=(user, pasw))
    try:
        mydata = json.loads(r.text, strict=False)
    except Exception as e:
        print("Couldn't read that last bit")
        codecs.open('cache/curric2022failfile_%i.txt' % err_fail_filecount, 'w', 'utf-8').write(r.text)
        err_fail_filecount += 1
        print(e)
        return 0, 0, []
    size = mydata['resultSetMetadata']['ResultSetSize']
    endn = mydata['resultSetMetadata']['EndResultNum']
    items = mydata['entityInstances']
    print(' Got ' + str(size) + ' instances, ending at item number ' + str(endn))
    return size, endn, items
def fetch_all_classes():
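    """Archive any existing cache/courses folder, then page through the
    'getCourses' web service and save each batch under cache/courses/."""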
    if os.path.isdir('cache/courses'):
        m = datetime.strptime(time.ctime(os.path.getctime('cache/courses')), "%a %b %d %H:%M:%S %Y")
        today = 'cache/courses_%s' % m.strftime('%Y_%m_%d')
        print("+ Creating folder: %s" % today)
        shutil.move('cache/courses', today)
    os.makedirs('cache/courses')
    size = 100
    endn = 0
    filen = 1
    while size > 99:
        size, endn, items = another_request(CQ_URL + PARAM, endn)
        out = codecs.open('cache/courses/classes_' + str(filen) + '.txt', 'w', 'utf-8')
        out.write(json.dumps(items, indent=2))
        out.close()
        filen += 1
    print("Written to 'cache/courses/...'")
#
#
# Main worker
#
def recur_path_matcher(item, path=None):
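    """Recursively walk a CurricUNET entity, accumulating section names in
    `path` and returning flattened 'Section/.../field/value' lines; section
    patterns extend the path, field patterns emit a line at the current path."""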
    if path is None:
        path = []  # avoid a shared mutable default argument
    def x2_path_update(x, y, z):
        path.extend([str(y), x])
        my_result_lines.append('/'.join(path) + '/' + 'lastEdited' + '/' + z)
    path_str = "/".join(path) + "/"
    path_str = re.sub(r'/+', '/', path_str)
    path_str = re.sub(r'\s+', ' ', path_str)
    my_result_lines = []
    if isinstance(item, dict):
        original_path = path.copy()
        match(item,
              {'attributes': {'displayName': _}, 'lookUpDisplay': _},
              lambda x, y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))),
              {'attributes': {'displayName': _}, 'fieldValue': _},
              lambda x, y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))),
              {'attributes': {'fieldName': _}, 'fieldValue': _},
              lambda x, y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))),
              {'instanceId': _, 'sectionName': _, 'sectionSortOrder': _},
              lambda id, name, order: path.extend([str(order), 'cqid_' + str(id), name]),
              {'instanceId': _, 'sectionName': _, 'instanceSortOrder': _},
              lambda id, name, order: path.extend([str(order), 'cqid_' + str(id), name]),
              {'sectionName': _, 'sectionSortOrder': _, 'lastUpdated': _},
              x2_path_update,
              {'sectionName': _, 'sectionSortOrder': _},
              lambda x, y: path.extend([str(y), x]),
              {'sectionName': _},
              lambda x: path.append(x),
              _, nothing)
        # the section-name lambdas above mutate the shared list in place;
        # rebind to the snapshot so this frame recurses with its own view
        path = original_path
        for K, V in list(item.items()):
            my_result_lines.extend(recur_path_matcher(V, path))
    elif isinstance(item, list):
        for V in item:
            my_result_lines.extend(recur_path_matcher(V, path))
    return my_result_lines
def pathstyle(theclass):
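    """Flatten one course/program entity into 'Type/id/Section/.../value'
    path lines: four header lines from entityMetadata plus the recursive
    walk of entityFormData's rootSections."""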
    if "entityMetadata" in theclass:
        id = theclass["entityMetadata"]["entityId"]
        title = theclass["entityMetadata"]["entityTitle"]
        typ = theclass["entityMetadata"]["entityType"]
        action = theclass["entityMetadata"]["proposalType"]
        status = theclass["entityMetadata"]["status"]
        # e.g. "entityId": 4077, "entityTitle": "ENGL2B - American Ethnic Literature",
        #      "entityType": "Course", "proposalType": "Deactivate Course", "status": "Historical"
        result = ["/".join([typ, str(id), "Course Description", "entityTitle", title]),
                  "/".join([typ, str(id), "Course Description", "entityType", typ]),
                  "/".join([typ, str(id), "Course Description", "proposalType", action]),
                  "/".join([typ, str(id), "Course Description", "status", status])]
        result.extend(recur_path_matcher(theclass["entityFormData"]["rootSections"], [typ, str(id)]))
        return result
    else:
        print("didn't seem to be a class.")
        return []
def single_course_path_parse(c):
    global num_failed_course
    if "attributes" in c and "entityId" in c["attributes"]:
        print(c["attributes"]["entityId"])
        return (c["attributes"]["entityId"], pathstyle(c))
    else:
        print("I couldn't recognize a class in that")
        ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
        ooops.write(json.dumps(c, indent=2))
        ooops.close()
        num_failed_course += 1
        return ("-1", [])
def path_style_test():
    classes = {}
    oo = codecs.open("cache/courses/allclasspaths.txt", "w", "utf-8")
    for f in os.listdir('cache/courses'):
        if re.search('^classes_', f):
            print(f)
            cls = json.loads(codecs.open('cache/courses/' + f, 'r', 'utf-8').read(), strict=False)
            for c in cls:
                id, output = single_course_path_parse(c)
                classes[id] = "\n".join(output)
                oo.write(classes[id])
                oo.write("\n\n\n" + "-" * 30 + "\n\n")
                oo.flush()
def make_sl():
    return SortedList(key=lambda x: -1 * int(x['m']))
def course_rank():
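    """Write all_courses_ranked.csv from course_cq_index.json, grouping the
    versions of each course code and listing them newest-term-first."""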
    csvfile = codecs.open('cache/courses/all_courses_ranked.csv', 'w', 'utf-8')
    csvwriter = csv.writer(csvfile)
    csvwriter.writerow("code,cqcourseid,coursestatus,termineffect,dept,num,numoutcomes".split(","))
    courses = json.loads(codecs.open('cache/courses/course_cq_index.json', 'r', 'utf-8').read())
    all_versions = defaultdict(make_sl)
    for c in courses:
        code = c['d'] + c['n']
        if not c['m']:
            c['m'] = '200030'  # fallback term code for versions with no term on file
        all_versions[code].add(c)
    for k in sorted(all_versions.keys()):
        for version in all_versions[k]:
            csvwriter.writerow([version['d'] + version['n'], version['c'], version['s'], version['m'], version['d'], version['n'], len(version['o'])])
def de_classpaths():
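    """Copy every line of allclasspaths.txt that touches one of the Distance
    Education (or Contact) sections into all_de_classpaths.txt."""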
    outfile = codecs.open('cache/courses/all_de_classpaths.txt', 'w', 'utf-8')
    areas = [
        'Distance Education/1/2/Justification/Need/Justification',
        '/Distance Education/1/3/Content Presentation/<b>A. Methods of Instruction</b>/',
        '/Distance Education/1/3/Content Presentation/<b>B. Instructional Materials and Resources:</b><br/>1. What materials and resources will you provide your students <b>in a virtual environment</b>?/',
        '/Distance Education/4/Assessment/',
        '/Distance Education/4/Methods of Instruction/',
        '/Distance Education/1/3/Content Presentation/2. Have you assessed the use of high-quality open educational resources (OER) to help bridge the digital divide for students in the course? If so, please describe how you will be using them./',
        '/Distance Education/4/Instructional Materials and Resources/',
        '/Distance Education/1/3/Content Presentation/3. How will students be provided access to library materials and other learning resources <b>in a virtual environment</b>? (virtual reference librarian, research guides, digital content, etc.)/',
        '/Distance Education/4/<b>How will students be provided access to library materials and what support will students be provided to help them locate and use these materials?</b><br/>Library and Other Learning Resources/',
        '/Distance Education/1/3/Content Presentation/4. How will students access equitable student support services <b>in a virtual environment</b>? (tutoring, financial aid, counseling, etc.)/',
        '/Distance Education/4/Accommodations for Students with Disabilities/',
        '/6/Distance Education/4/Office Hours/',
        '/Contact/Contact/Description/',
    ]
    i = 0
    for area in areas:
        with codecs.open('cache/courses/allclasspaths.txt', 'r', 'utf-8') as infile:
            outfile.writelines(line for line in infile if area in line)
        i += 1
        if i % 1000 == 0:
            print(i)
from semesters import human_to_sis, get_previous_season
#from pipelines import area, areas
def extract_digits(input_string):
    """
    Removes all non-digit characters from the input string and returns an integer.
    :param input_string: The string to process.
    :return: An integer containing only the digits from the input string.
    """
    digits_only = ''.join(char for char in input_string if char.isdigit())
    return int(digits_only) if digits_only else 0
def filter_classes(): # for removing deactivated classes
    json_file_path = 'cache/courses/courses_built.json'
    output_csv_path = 'cache/courses/active_courses.txt'
    all_courses = []
    with open(json_file_path, 'r') as json_file:
        data = json.load(json_file)
    for i, C in data.items():
        try:
            term = C['term']
        except KeyError:
            print(f"** {i} {C['dept']} {C['number']} is missing term")
            term = ''
        try:
            shortname = C['shortname']
        except KeyError:
            shortname = C['name']
            print(f"** {i} {C['dept']} {C['number']} is missing shortname")
        all_courses.append(f"{C['dept']} {C['number']} {shortname} \t {C['status']} {C['type']} \t{term} - {i}")
    all_courses.sort()
    for C in all_courses:
        print(C)
def slo_summary_report(): # for scheduling slo assessment
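    """Build two SLO-scheduling CSVs from courses_built.json: one row per
    course sorted by dept/number/reviewing term, and one filtered to Active
    and In Review courses sorted by reviewing term."""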
    json_file_path = 'cache/courses/courses_built.json'
    output_csv_path = 'cache/courses/courses_slo_schedule.csv'
    term_csv_file_path = 'cache/courses/slo_schedule.csv'
    (gp, course_to_area, areacode_to_area, area_to_dean, dean, dean_code_to_name) = campus_dept_hierarchy()
    with open(json_file_path, 'r') as json_file:
        data = json.load(json_file)
    # Extract course information
    courses = []
    term_courses = []
    for key, course in data.items():
        try:
            re_code_course = {
                "key": key,
                "type": course.get("type", ""),
                "status": course.get("status", ""),
                "dept": course.get("dept", ""),
                "number": course.get("number", ""),
                "name": course.get("name", ""),
                "first_active_term": course.get("term", ""),
                "first_active_term_code": human_to_sis(course.get("term", "")),
                "reviewing_term": get_previous_season(course.get("term", "")),
                "reviewing_term_code": human_to_sis(get_previous_season(course.get("term", ""))),
                "area": areacode_to_area[course_to_area[course.get("dept", "").upper()]],
            }
            courses.append(re_code_course)
            if course["status"] in ["Active", "In Review"] and course["type"] != "Deactivate Course":
                term_courses.append(re_code_course)
        except Exception as e:
            print(f"error on course: {course['dept']} {course['number']} {course['name']}")
    # Sort by dept, number, and term
    courses.sort(key=lambda x: (x["dept"], extract_digits(x["number"]), x["reviewing_term_code"]))
    term_courses.sort(key=lambda x: (x["reviewing_term_code"], x["dept"], extract_digits(x["number"])))
    # Write to CSV
    fieldnames = ["dept", "number", "reviewing_term", "reviewing_term_code", "status", "key", "type", "name", "first_active_term", "first_active_term_code", "area"]
    with open(output_csv_path, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(courses)
    with open(term_csv_file_path, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(term_courses)
    print(f"CSV file '{output_csv_path}' has been created.")
if __name__ == "__main__":
print ('')
options = { 1: ['fetch all courses', fetch_all_classes],
2: ['process all classes', path_style_test],
3: ['courses - path style to json and html catalog', course_path_style_2_html],
4: ['show course outcomes', all_outcomes],
5: ['courses - rank by all versions', course_rank],
6: ['extract de info from class paths', de_classpaths],
7: ['build schedule or summary for SLO planning', slo_summary_report],
8: ['remove deactivated courses', filter_classes],
10: ['fetch all programs', fetch_all_programs],
11: ['process all programs', path_style_prog],
12: ['programs - path style to html catalog', path_style_2_html],
}
print ('')
if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
resp = int(sys.argv[1])
print("\n\nPerforming: %s\n\n" % options[resp][0])
else:
print ('')
for key in options:
print(str(key) + '.\t' + options[key][0])
print('')
resp = input('Choose: ')
# Call the function in the options dict
options[ int(resp)][1]()