# (export metadata: 889 lines, 37 KiB, Python — not code; preserved as a comment)
import requests,json,os,re, bisect, csv, codecs, funcy, sys, shutil, time
|
|
from datetime import datetime
|
|
import sortedcontainers as sc
|
|
from collections import defaultdict
|
|
from toolz.itertoolz import groupby,sliding_window
|
|
from sortedcontainers import SortedList
|
|
#from durable.lang import *
|
|
#from durable.engine import *
|
|
from pampy import match, _
|
|
from bs4 import BeautifulSoup as bs
|
|
|
|
leafcount = 0
|
|
displaynames = []
|
|
|
|
from canvas_secrets import cq_user, cq_pasw
|
|
|
|
from outcomes import quick_add_course_outcomes
|
|
|
|
|
|
CQ_URL = "https://secure.curricunet.com/scripts/webservices/generic_meta/clients/versions/v4/gavilan.cfc"
|
|
CQ_URL = "https://mws.services.curriqunet.com/scripts/webservices/generic_meta/clients/versions/v4/gavilan.cfc"
|
|
PARAM = "?returnFormat=json&method=getCourses"
|
|
|
|
user = cq_user
|
|
pasw = cq_pasw
|
|
|
|
err_fail_filecount = 1
|
|
|
|
|
|
|
|
def fetch_all_programs():
|
|
if os.path.isdir('cache/programs'):
|
|
m = datetime.strptime(time.ctime(os.path.getctime('cache/programs')), "%a %b %d %H:%M:%S %Y")
|
|
today = 'cache/programs_%s' % m.strftime('%Y_%m_%d')
|
|
|
|
print("+ Creating folder: %s" % today)
|
|
shutil.move('cache/programs', today)
|
|
os.makedirs('cache/programs')
|
|
|
|
size = 100
|
|
endn = 0
|
|
filen = 1
|
|
PARAM = "?returnFormat=json&method=getPrograms&status=Active"
|
|
while(size > 99):
|
|
size, endn, items = another_request(CQ_URL+PARAM,endn)
|
|
out = codecs.open('cache/programs/programs_'+str(filen)+'.txt','w', 'utf-8')
|
|
out.write(json.dumps(items,indent=4))
|
|
out.close()
|
|
filen += 1
|
|
print("Written to 'cache/programs....")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def nothing(x=0):
|
|
pass
|
|
|
|
seen = []
|
|
|
|
def clean(st):
|
|
#return st
|
|
global seen
|
|
ok = ['b','i','ul','li','ol','strong','br','u']
|
|
|
|
soup = bs(st, features='lxml')
|
|
|
|
"""for tag in soup.recursiveChildGenerator():
|
|
if isinstance(tag,bs.Tag) and tag.name not in ok:
|
|
tag.unwrap()
|
|
|
|
return soup.prettify()
|
|
"""
|
|
|
|
for T in soup.find_all(recursive=True):
|
|
if not T.name in ok:
|
|
if not T.name in seen:
|
|
seen.append(T.name)
|
|
print("- %s" % T.name)
|
|
#print(seen)
|
|
T.unwrap()
|
|
else:
|
|
#print("+ %s" % T.name)
|
|
pass
|
|
|
|
return str(soup).strip()
|
|
|
|
|
|
|
|
def recur_matcher(item, depth=0):
|
|
indent = depth * " "
|
|
my_result_lines = []
|
|
if type(item) == type({}):
|
|
if not match( item,
|
|
{'entityMetadata': {'entityTitle': _,'status': _, 'entityType':_, 'entityId':_ }},
|
|
lambda title,status,typ,id:
|
|
my_result_lines.append("%s%s: %s (id %s) status: %s" % (indent, str(typ), str(title), str(id), str(status))) ,
|
|
{'attributes': {'displayName': _}, 'lookUpDisplay': _, },
|
|
lambda x,y: my_result_lines.append("%s%s: %s" % (indent, clean(str(x)), clean(str(y)))) ,
|
|
{'attributes': {'displayName': _}, 'fieldValue': _, },
|
|
lambda x,y: my_result_lines.append("%s%s: %s" % (indent, clean(str(x)), clean(str(y)))) ,
|
|
{'sectionName': _},
|
|
lambda x: my_result_lines.append("%sSection: %s" % (indent, str(x))) ,
|
|
_, nothing
|
|
):
|
|
for K,V in list(item.items()):
|
|
my_result_lines.extend(recur_matcher(V,depth+1))
|
|
elif type(item) == type([]):
|
|
for V in item:
|
|
my_result_lines.extend(recur_matcher(V,depth+1))
|
|
return my_result_lines
|
|
|
|
|
|
|
|
num_failed_course = 1
|
|
|
|
def single_course_parse(c):
|
|
global num_failed_course
|
|
this_course = []
|
|
if "attributes" in c and "entityId" in c["attributes"]:
|
|
print(c["attributes"]["entityId"])
|
|
return (c["attributes"]["entityId"], recur_matcher(c))
|
|
else:
|
|
print("I couldn't recognize a class in that")
|
|
ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
|
|
ooops.write(json.dumps(c,indent=2))
|
|
ooops.close()
|
|
num_failed_course = num_failed_course + 1
|
|
return ("-1", [])
|
|
|
|
def match_style_test():
|
|
classes = {}
|
|
oo = codecs.open("cache/courses/curric2022test.json","w","utf-8")
|
|
for f in os.listdir('cache/courses'):
|
|
if re.search('classes_',f):
|
|
print(f)
|
|
cls = json.loads(codecs.open('cache/courses/'+f,'r','utf-8').read())
|
|
for c in cls:
|
|
id,output = single_course_parse(c)
|
|
classes[id] = "\n".join(output)
|
|
oo.write( classes[id] )
|
|
oo.write( "\n\n\n" + "-"*30 + "\n\n" )
|
|
oo.flush()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def single_program_path_parse(c):
|
|
this_course = []
|
|
global num_failed_course
|
|
if "attributes" in c and "entityId" in c["attributes"]:
|
|
print(c["attributes"]["entityId"])
|
|
return (c["attributes"]["entityId"], pathstyle(c))
|
|
else:
|
|
print("I couldn't recognize a program in that")
|
|
ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
|
|
ooops.write(json.dumps(c,indent=2))
|
|
ooops.close()
|
|
num_failed_course = num_failed_course + 1
|
|
return ("-1", [])
|
|
|
|
|
|
def path_style_prog():
|
|
classes = {}
|
|
oo = codecs.open("cache/programs/allprogrampaths.txt","w","utf-8")
|
|
for f in os.listdir('cache/programs'):
|
|
if re.search('^programs_',f):
|
|
print(f)
|
|
cls = json.loads(codecs.open('cache/programs/'+f,'r','utf-8').read())
|
|
for c in cls:
|
|
id,output = single_program_path_parse(c)
|
|
classes[id] = "\n".join(output)
|
|
oo.write( classes[id] )
|
|
oo.write( "\n\n\n" + "-"*30 + "\n\n" )
|
|
oo.flush()
|
|
|
|
def term_txt_to_code(t):
|
|
term_codes = {'Spring':'30','Summer':'50','Fall':'70'}
|
|
parts = t.split(" ")
|
|
if len(parts)>1:
|
|
yr = parts[1]
|
|
sem = term_codes[parts[0]]
|
|
return yr+sem
|
|
return ''
|
|
|
|
|
|
def all_outcomes():
|
|
csvfile = codecs.open('cache/courses/alloutcomes.csv','w','utf-8')
|
|
csvwriter = csv.writer(csvfile)
|
|
csvwriter.writerow('code cqcourseid coursestatus termineffect dept num cqoutcomeid outcome'.split(' '))
|
|
|
|
csvfile2 = codecs.open('cache/courses/all_active_outcomes.csv','w','utf-8')
|
|
csvwriter2 = csv.writer(csvfile2)
|
|
csvwriter2.writerow('code cqcourseid coursestatus termineffect dept num cqoutcomeid outcome'.split(' '))
|
|
|
|
rr = codecs.open("cache/courses/allclasspaths.txt","r", "utf-8").readlines()
|
|
ww = codecs.open("cache/courses/alloutcomes.txt","w", "utf-8")
|
|
course_index = []
|
|
|
|
current_course = {}
|
|
current_course_num = 0
|
|
|
|
term_counts = defaultdict(int)
|
|
|
|
count = 0
|
|
|
|
for L in rr:
|
|
a = re.search('Course\/(\d+)',L)
|
|
if a:
|
|
course_num = a.group(1)
|
|
#print(course_num, current_course_num)
|
|
|
|
if (course_num != current_course_num):
|
|
if current_course_num != 0:
|
|
# log the course info so we can know cq id numbers of courses
|
|
course_index.append(current_course)
|
|
|
|
# status
|
|
count += 1
|
|
#input('ok ')
|
|
if count % 100 == 0:
|
|
print(count)
|
|
#pass
|
|
|
|
current_course_num = course_num
|
|
#print(course_num)
|
|
current_course = {'c':'','d':'','n':'','t':'','s':'','T':'','o':[],'i':'','a':'','m':''}
|
|
current_course['c'] = course_num
|
|
|
|
|
|
a = re.search('Course\/(\d+)\/1\/Course\ Description\/0\/Course\ Discipline\/(.*)$',L)
|
|
if a:
|
|
current_course['d'] = a.group(2)
|
|
a = re.search('Course\/(\d+)\/1\/Course\ Description\/0\/Course\ Number\/(.*)$',L)
|
|
if a:
|
|
current_course['n'] = a.group(2)
|
|
a = re.search('Course\/(\d+)\/1\/Course\ Description\/0\/Course\ Title\/(.*)$',L)
|
|
if a:
|
|
current_course['T'] = a.group(2)
|
|
a = re.search('Course\/(\d+)\/1\/Course\ Description\/0\/Short\ Title\/(.*)$',L)
|
|
if a:
|
|
current_course['t'] = a.group(2)
|
|
a = re.search('Course\ Description\/status\/(.*)$',L)
|
|
if a:
|
|
current_course['s'] = a.group(1)
|
|
a = re.search('Course\ Content\/\d+\/Lecture\ Content\/Curriculum\ Approval\ Date:\s*(.*)$',L)
|
|
if a:
|
|
current_course['a'] = a.group(1)
|
|
a = re.search('Course\ Description\/\d+\/Internal\ Processing\ Term\/(.*)$',L)
|
|
if a:
|
|
t_code = term_txt_to_code(a.group(1))
|
|
current_course['m'] = t_code
|
|
term_counts[t_code] += 1
|
|
|
|
# Course/10/10/Course Content/1/Lecture Content/Curriculum Approval Date: 02/24/2014
|
|
|
|
# Course/3091/1/Course Description/0/Internal Processing Term/Spring 2018
|
|
|
|
a = re.search('Learning\ Outcomes\/\d+\/(cqid_\d+)\/Learning\ Outcomes\/Description\/(.*)$',L)
|
|
if a:
|
|
current_course['o'].append(a.group(2))
|
|
current_course['i'] = a.group(1)
|
|
csvwriter.writerow([current_course['d']+current_course['n'], current_course_num, current_course['s'], current_course['m'], current_course['d'], current_course['n'], current_course['i'], a.group(2)])
|
|
if current_course['s']=='Active':
|
|
csvwriter2.writerow([current_course['d']+current_course['n'], current_course_num, current_course['s'], current_course['m'], current_course['d'], current_course['n'], current_course['i'], a.group(2)])
|
|
|
|
|
|
if re.search('Learning\ Outcomes\/Description\/',L):
|
|
ww.write(L)
|
|
if re.search('Description\/entityTitle\/',L):
|
|
ww.write(L)
|
|
if re.search('Description\/status\/',L):
|
|
ww.write(L)
|
|
|
|
xx = codecs.open("cache/courses/course_cq_index.json","w", "utf-8")
|
|
xx.write(json.dumps(course_index, indent=2))
|
|
|
|
#print(json.dumps(term_counts,indent=2))
|
|
|
|
def ddl():
|
|
return defaultdict(list)
|
|
|
|
def splitclassline(cl, id=''):
|
|
# "PHYS 4A - Physics for Scientists and Engineers I 4.000 *Active*"
|
|
dbg = 1
|
|
ret = {'name':'','units':'','units_hi':'','code':'','status':'', 'sequence':int(id)}
|
|
p1 = re.search(r'^(.*?)\s\-\s(.*)$',cl)
|
|
if p1:
|
|
code = p1.groups()[0]
|
|
ret['code'] = code
|
|
rest = p1.groups()[1]
|
|
|
|
p3 = re.search(r'^(.*)\s(\d+\.\d+)\s\-\s(\d+\.\d+)\s+\*(\w+)\*$',rest)
|
|
if p3:
|
|
name = p3.groups()[0]
|
|
units = p3.groups()[1]
|
|
units_hi = p3.groups()[2]
|
|
status = p3.groups()[3]
|
|
ret['name'] = name
|
|
ret['units'] = units
|
|
ret['units_hi'] = units_hi
|
|
ret['status'] = status
|
|
#if dbg: print( "%s --- code: %s - name: %s - units: %s-%s - status: %s" % (cl,code,name,units,units_hi,status))
|
|
return ret
|
|
p2 = re.search(r'^(.*)\s(\d+\.\d+)\s+\*(\w+)\*$',rest)
|
|
if p2:
|
|
name = p2.groups()[0]
|
|
units = p2.groups()[1]
|
|
status = p2.groups()[2]
|
|
ret['name'] = name
|
|
ret['units'] = units
|
|
ret['status'] = status
|
|
#if dbg: print( "%s --- code: %s - name: %s - units: %s - status: %s" % (cl,code,name,units,status))
|
|
return ret
|
|
|
|
|
|
else:
|
|
if dbg: print( "%s --- code: %s --------------------------------" % (cl,code))
|
|
else:
|
|
if dbg: print( "%s --- code:----------------------------------------" % cl)
|
|
#return (cl,'','')
|
|
return ret
|
|
|
|
|
|
def path_style_2_html():
|
|
verbose = 1
|
|
v = verbose
|
|
|
|
prog_title_subs = []
|
|
with codecs.open('cache/program_published_names.csv', 'r','utf-8') as file:
|
|
reader = csv.reader(file)
|
|
for row in reader:
|
|
prog_title_subs.append(row)
|
|
|
|
|
|
oo = codecs.open("cache/programs/allprogrampaths.txt","r","utf-8").readlines()
|
|
award_prebuild = defaultdict( ddl )
|
|
last_line = ""
|
|
|
|
for L in oo:
|
|
L = L.strip()
|
|
if not re.search(r'^Program',L):
|
|
last_line = last_line + " " + L
|
|
continue
|
|
else:
|
|
if re.search(r'\/$',last_line):
|
|
# ignore line with trailing slash - assume no data
|
|
last_line = L
|
|
continue
|
|
|
|
if re.search(r'Curriculum\sDivision\s\d+', last_line):
|
|
#print(last_line)
|
|
pass
|
|
|
|
test_1 = re.search(r'^Program\/(\d+)\/Course',last_line)
|
|
if test_1:
|
|
award_prebuild[ test_1.groups()[0] ]["Info"].append(last_line)
|
|
test_2 = re.search(r'^Program\/(\d+)\/(\d+)\/([\w\s]+)\/',last_line)
|
|
if test_2:
|
|
award_prebuild[ test_2.groups()[0] ][test_2.groups()[2]].append(last_line)
|
|
last_line = L
|
|
output = codecs.open("cache/programs/programs_prebuild.json","w","utf-8")
|
|
output.write( json.dumps(award_prebuild, indent=2) )
|
|
|
|
|
|
award_build = defaultdict( ddl )
|
|
|
|
for AW in sorted(list(award_prebuild.keys()),key=int):
|
|
v = 1
|
|
aw = award_prebuild[AW]
|
|
for line in aw["Program Description"]:
|
|
t1 = re.search(r'Division\/(.*)$', line)
|
|
if t1:
|
|
award_build[AW]["division"] = t1.groups()[0]
|
|
t1 = re.search(r'Department\/(.*)$', line)
|
|
if t1:
|
|
award_build[AW]["dept"] = t1.groups()[0]
|
|
t1 = re.search(r'Program\sTitle\/(.*)$', line)
|
|
if t1:
|
|
award_build[AW]["program_title"] = t1.groups()[0]
|
|
t1 = re.search(r'Award\sType\/(.*)$', line)
|
|
if t1:
|
|
award_build[AW]["award"] = t1.groups()[0]
|
|
t1 = re.search(r'\/Description\/(.*)$', line)
|
|
if t1:
|
|
award_build[AW]["description"] = t1.groups()[0]
|
|
t1 = re.search(r'Transfer\/CTE\/(.*)$', line)
|
|
if t1:
|
|
award_build[AW]["transfer_cte"] = t1.groups()[0]
|
|
t1 = re.search(r'CTE\sProgram\?\/\/(.*)$', line)
|
|
if t1:
|
|
award_build[AW]["is_cte"] = t1.groups()[0]
|
|
|
|
for line in aw["Info"]:
|
|
t1 = re.search(r'Description\/status\/(.*)$', line)
|
|
if t1:
|
|
award_build[AW]["status"] = t1.groups()[0]
|
|
t1 = re.search(r'Description\/proposalType\/(.*)$', line)
|
|
if t1:
|
|
award_build[AW]["proposal_type"] = t1.groups()[0]
|
|
|
|
for line in aw["Codes"]:
|
|
t1 = re.search(r'Banner\sCode\/(.*)$', line)
|
|
if t1:
|
|
award_build[AW]["banner_code"] = t1.groups()[0]
|
|
|
|
# substitute in program names more suitable for publishing
|
|
subbed = 0
|
|
for L in prog_title_subs:
|
|
if award_build[AW]["dept"] == L[0] and award_build[AW]["program_title"] == L[1]:
|
|
award_build[AW]["publish_title"] = L[2]
|
|
subbed = 1
|
|
if v: print("SUBBED")
|
|
if len(L)>3:
|
|
award_build[AW]["publish_title2"] = L[3]
|
|
else:
|
|
award_build[AW]["publish_title2"] = ""
|
|
|
|
if not subbed:
|
|
award_build[AW]["publish_title"] = award_build[AW]["dept"]
|
|
award_build[AW]["publish_title2"] = ""
|
|
if award_build[AW]["program_title"] == "Liberal Arts: Computer Science & Information Systems Emphasis":
|
|
award_build[AW]["publish_title"] = "Computer Science and Information Studies"
|
|
award_build[AW]["publish_title2"] = "Liberal Arts"
|
|
if v: print("-----LIB ART CSIS")
|
|
|
|
if v:
|
|
print("%s / %s - %s" % (award_build[AW]["publish_title"],award_build[AW]["program_title"], award_build[AW]["award"]))
|
|
|
|
v = 0
|
|
|
|
for line in aw["Program Learning Outcomes"]:
|
|
t1 = re.search(r'Program\sLearning\sOutcomes\/\d+\/Outcome\/(\d+)\/cqid_(\d+)\/Outcome\/Outcome\/(.*)$', line)
|
|
if t1:
|
|
if "PLO" in award_build[AW]:
|
|
award_build[AW]["PLO"].append( (t1.groups()[0], t1.groups()[2]) )
|
|
else:
|
|
award_build[AW]["PLO"] = [ (t1.groups()[0], t1.groups()[2]), ]
|
|
|
|
st = lambda x: x[0]
|
|
award_build[AW]["PLO"] = sorted( award_build[AW]["PLO"], key=st )
|
|
award_build[AW]["PLO"] = [ x[1] for x in award_build[AW]["PLO"] ]
|
|
req_prebuild = defaultdict(list)
|
|
|
|
pbd_unit_calcs = {}
|
|
|
|
# requirements table:
|
|
# - most types have a 'units' column, which might be calculated
|
|
# - might be overridden
|
|
# - might be single number or a range min/max
|
|
|
|
|
|
current_item_number = 0
|
|
for line in aw["Program Requirements"]:
|
|
t1 = re.search(r'Program\sBlock\sDefinitions\/(\d+)/cqid_\d+/Program\sBlock\sDefinitions\/(.*)$', line)
|
|
if t1:
|
|
pbd_number = t1.groups()[0]
|
|
if not pbd_number in pbd_unit_calcs:
|
|
pbd_unit_calcs[pbd_number] = {'unit_sum':0,'unit_sum_max':0,'override':0,'min':0,'max':0}
|
|
t2 = re.search(r'Requirements\/\d+\/Program\sBlock\sDefinitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Course\sBlock\sDefinition\/(.*)$', line)
|
|
if t2:
|
|
req_prebuild[pbd_number].append( ('h3', '0', t2.groups()[1]) )
|
|
current_item_number = 0
|
|
continue
|
|
t3 = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+\/Program\sCourses\/\d+\/\[Discipline\sand\sCourse\schained\scombo\]\/Course\/(.*)$',line)
|
|
if t3:
|
|
req_prebuild[pbd_number].append( ('course', t3.groups()[0], splitclassline( t3.groups()[1], t3.groups()[0] )) )
|
|
current_item_number = t3.groups()[0]
|
|
continue
|
|
t3a = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+/Program\sCourses\/\d+\/\[Condition\sSection\]\/Condition\/or$',line)
|
|
if t3a:
|
|
req_prebuild[pbd_number].append( ('or', t3a.groups()[0]) )
|
|
current_item_number = t3a.groups()[0]
|
|
continue
|
|
t3b = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+/Program\sCourses\/\d+\/\[Condition\sSection\]\/Condition\/and$',line)
|
|
if t3b:
|
|
req_prebuild[pbd_number].append( ('and', t3b.groups()[0]) )
|
|
current_item_number = t3b.groups()[0]
|
|
continue
|
|
t4 = re.search(r'Definitions\/(\d+)\/cqid_\d+/Program\sBlock\sDefinitions\/\d+\/Program\sCourses/(\d+)/cqid_\d+/Program\sCourses\/Non\-Course\sRequirements\/(.*)$',line)
|
|
if t4:
|
|
req_prebuild[pbd_number].append( ('noncourse', t4.groups()[1], t4.groups()[2]) )
|
|
current_item_number = t4.groups()[1]
|
|
continue
|
|
t5 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Override\sUnit\sCalculation\/1$',line)
|
|
if t5:
|
|
pbd_unit_calcs[pbd_number]['override'] = 1
|
|
continue
|
|
t6 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Unit\sMin\/(.*)$',line)
|
|
if t6:
|
|
pbd_unit_calcs[pbd_number]['min'] = t6.groups()[1]
|
|
continue
|
|
t7 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Unit\sMax/(.*)$',line)
|
|
if t7:
|
|
pbd_unit_calcs[pbd_number]['max'] = t7.groups()[1]
|
|
continue
|
|
t8 = re.search(r'chained\scombo\]\/Discipline',line)
|
|
if t8:
|
|
continue
|
|
t8a = re.search(r'Units\s[Low|High]',line)
|
|
if t8a:
|
|
continue
|
|
t9 = re.search(r'Definitions\/Block\sHeader\/(.*)$',line)
|
|
if t9:
|
|
req_prebuild[pbd_number].append( ('blockheader', 0.1, t9.groups()[0]) )
|
|
continue
|
|
req_prebuild[pbd_number].append( ('unknown', current_item_number, t1.groups()[1]) )
|
|
award_build[AW]["requirements"] = req_prebuild
|
|
award_build[AW]["unit_calcs"] = pbd_unit_calcs
|
|
|
|
# associate unit calculations with program blocks
|
|
for block_key in req_prebuild.keys():
|
|
if block_key in pbd_unit_calcs:
|
|
req_prebuild[block_key].insert(0, pbd_unit_calcs[block_key])
|
|
else:
|
|
req_prebuild[block_key].insert(0, {'unit_sum':0,'unit_sum_max':0,'override':0})
|
|
|
|
# do the unit calc math
|
|
for block_key in req_prebuild.keys():
|
|
this_block = req_prebuild[block_key]
|
|
pad = this_block[0]
|
|
if v: print("pad: ",pad)
|
|
block_dict = {}
|
|
for item in this_block[1:]:
|
|
print(item)
|
|
try:
|
|
if item[0] == "or":
|
|
block_dict[ item[1]+"or" ] = 1
|
|
if item[0] == "h3":
|
|
if v: print("+ ", item[1])
|
|
if item[0] == "blockheader":
|
|
if v: print(" ", item[1])
|
|
if not item[0] == "course":
|
|
continue
|
|
block_dict[ item[1] ] = item[2]
|
|
seq = int(item[1])
|
|
units = ''
|
|
if item[2]['units']: units = float( item[2]['units'] )
|
|
except Exception as e:
|
|
print("ERROR ERROR\nERROR ERROR")
|
|
print(e)
|
|
xyz = input('hit return to continue')
|
|
#print( "%i \t %f \t %s" % (seq,units, item[2]['name']))
|
|
if v:
|
|
for k in sorted( block_dict.keys() ):
|
|
print(k," ", block_dict[k])
|
|
#for k in sliding_window(3, sorted( block_dict.keys() )):
|
|
# l,m,n = k
|
|
# if re.search(r'or$',m):
|
|
# print("OR")
|
|
# print(block_dict[l],"\n",block_dict[m],"\n",block_dict[n],"\n\n")
|
|
#print()
|
|
|
|
|
|
output = codecs.open("cache/programs/programs_built.json","w","utf-8")
|
|
output.write( json.dumps(award_build, indent=2) )
|
|
|
|
|
|
|
|
|
|
|
|
def course_path_style_2_html():
|
|
verbose = 1
|
|
v = verbose
|
|
|
|
|
|
oo = codecs.open("cache/courses/allclasspaths.txt","r","utf-8").readlines()
|
|
course_prebuild = defaultdict( ddl )
|
|
last_line = ""
|
|
|
|
for L in oo:
|
|
L = L.strip()
|
|
if not re.search(r'^Course',L):
|
|
last_line = last_line + " <br /> " + L
|
|
continue
|
|
else:
|
|
if re.search(r'\/$',last_line):
|
|
# ignore line with trailing slash - assume no data
|
|
last_line = L
|
|
continue
|
|
|
|
|
|
test_1 = re.search(r'^Course\/(\d+)\/Course',last_line)
|
|
if test_1:
|
|
course_prebuild[ test_1.groups()[0] ]["Info"].append(last_line)
|
|
test_2 = re.search(r'^Course\/(\d+)\/(\d+)\/(.*?)\/(.*)$',last_line)
|
|
if test_2:
|
|
course_prebuild[ test_2.groups()[0] ][test_2.groups()[2]].append(last_line)
|
|
last_line = L
|
|
output = codecs.open("cache/courses/courses_prebuild.json","w","utf-8")
|
|
output.write( json.dumps(course_prebuild, indent=2) )
|
|
|
|
all_courses = {}
|
|
active_courses = {}
|
|
|
|
lookup_table = { 'entityTitle':'title', 'proposalType':'type',
|
|
'\/Course\sDescription\/status':'status', 'Course\sDiscipline':'dept',
|
|
'Course\sNumber':'number', 'Course\sTitle':'name',
|
|
'Short\sTitle':'shortname', 'Internal\sProcessing\sTerm':'term', 'This\sCourse\sIs\sDegree\sApplicable':'degree_applicable',
|
|
'\/Course\sDescription\/\d+\/Course\sDescription\/':'desc',
|
|
'Minimum\sUnits':'min_units', 'Minimum\sLecture\sHour':'min_lec_hour', 'Minimum\sLab\sHour':'min_lab_hour', 'Course\shas\svariable\shours':'has_var_hours',
|
|
'Number\sWeeks':'weeks',
|
|
'Maximum\sUnits':'max_units', 'Credit\sStatus':'credit_status',
|
|
'TOP\sCode':'top_code', 'Classification':'classification', 'Non\sCredit\sCategory':'noncredit_category', 'Stand-Alone\sClass?':'stand_alone',
|
|
'Grade\sOption':'grade_option', 'Is\sRepeatable':'repeatable', 'Learning\sOutcomes\/Description':'slo',
|
|
'Is\sThis\sCourse\sis\sRecommended\sfor\sTransfer\sto\sState\sUniversities\sand\sColleges?':'transfer_csu',
|
|
'Is\sThis\sCourse\sis\sRecommended\sfor\sTransfer\sto\sUniversity\sof\sCalifornia?':'transfer_uc',
|
|
'\/Catalog\sCourse\sSummary\sView\/':'catalog',
|
|
'\/Course\sContent/\d+/Lecture\sContent\/':'content',
|
|
'\/ASSIST\sPreview\/\d+\/Outcomes\sand\sObjectives\/':'objectives'}
|
|
|
|
for C in sorted(list(course_prebuild.keys()),key=int):
|
|
v = 0
|
|
crs = course_prebuild[C]
|
|
course_build = {'slo':{}} # defaultdict( ddl )
|
|
if v: print(C)
|
|
|
|
for K in crs.keys():
|
|
if v: print("\t%s" % K)
|
|
for line in crs[K]:
|
|
for (str,key) in lookup_table.items():
|
|
if re.search(str,line):
|
|
if key == 'slo':
|
|
# \s<br\s\/>\s
|
|
content_search = re.search(r'\/Learning\sOutcomes\/\d+\/cqid_(\d+)\/Learning\sOutcomes\/Description\/(.*?)$',line)
|
|
if content_search: course_build['slo'][content_search.groups()[0]] = content_search.groups()[1]
|
|
else:
|
|
print("NO SLO? %s" % line)
|
|
elif key == 'desc':
|
|
content_search = re.search(r'^Course\/\d+\/\d+\/Course\sDescription\/\d+\/Course\sDescription\/(.*)$',line)
|
|
course_build['desc'] = content_search.groups()[0]
|
|
elif key == 'catalog':
|
|
content_search = re.search(r'^Course\/\d+\/\d+\/General\sEducation\sPattern\/\d+\/Catalog\sCourse\sSummary\sView\/(.*)$',line)
|
|
course_build['catalog'] = content_search.groups()[0]
|
|
elif key == 'content':
|
|
content_search = re.search(r'^Course\/\d+\/\d+\/Course\sContent\/\d+\/Lecture\sContent\/(.*)$',line)
|
|
course_build['content'] = content_search.groups()[0]
|
|
elif key == 'objectives':
|
|
content_search = re.search(r'^Course\/\d+\/\d+\/ASSIST\sPreview\/\d+\/Outcomes\sand\sObjectives\/(.*)$',line)
|
|
course_build['objectives'] = content_search.groups()[0]
|
|
else:
|
|
content_search = re.search(r'^(.*)\/(.*?)$',line)
|
|
course_build[key] = content_search.groups()[1]
|
|
if v: print("\t\t%s - %s" % (key, course_build[key]))
|
|
continue
|
|
|
|
all_courses[C] = course_build
|
|
if course_build['status'] == 'Active':
|
|
active_courses[C] = course_build
|
|
output = codecs.open("cache/courses/courses_built.json","w","utf-8")
|
|
output.write( json.dumps(all_courses, indent=2) )
|
|
|
|
output2 = codecs.open("cache/courses/courses_active_built.json","w","utf-8")
|
|
output2.write( json.dumps(active_courses, indent=2) )
|
|
|
|
|
|
|
|
#########
|
|
#########
|
|
#########
|
|
#########
|
|
|
|
|
|
def another_request(url,startat):
|
|
global err_fail_filecount
|
|
newparam = "&skip=" + str(startat)
|
|
print((url+newparam))
|
|
r = requests.get(url+newparam, auth=(user,pasw))
|
|
#print(r.text)
|
|
try:
|
|
mydata = json.loads(r.text, strict=False)
|
|
except Exception as e:
|
|
print("Couldn't read that last bit")
|
|
#print((r.text))
|
|
codecs.open('cache/curric2022failfile_%i.txt' % err_fail_filecount,'w','utf-8').write(r.text)
|
|
err_fail_filecount += 1
|
|
print(e)
|
|
return 0,0,[]
|
|
|
|
size = mydata['resultSetMetadata']['ResultSetSize']
|
|
endn = mydata['resultSetMetadata']['EndResultNum']
|
|
items = mydata['entityInstances']
|
|
print((' Got ' + str(size) + ' instances, ending at item number ' + str(endn)))
|
|
return size,endn,items
|
|
|
|
|
|
|
|
|
|
def fetch_all_classes():
|
|
if os.path.isdir('cache/courses'):
|
|
m = datetime.strptime(time.ctime(os.path.getctime('cache/courses')), "%a %b %d %H:%M:%S %Y")
|
|
today = 'cache/courses_%s' % m.strftime('%Y_%m_%d')
|
|
|
|
print("+ Creating folder: %s" % today)
|
|
shutil.move('cache/courses', today)
|
|
os.makedirs('cache/courses')
|
|
|
|
size = 100
|
|
endn = 0
|
|
filen = 1
|
|
while(size > 99):
|
|
size, endn, items = another_request(CQ_URL+PARAM,endn)
|
|
out = codecs.open('cache/courses/classes_'+str(filen)+'.txt','w', 'utf-8')
|
|
out.write(json.dumps(items,indent=2))
|
|
out.close()
|
|
filen += 1
|
|
print("Written to 'cache/courses....")
|
|
|
|
|
|
|
|
|
|
#
|
|
#
|
|
# Main worker
|
|
#
|
|
|
|
def recur_path_matcher(item, path=[]):
|
|
def x2_path_update(x,y,z):
|
|
path.extend([str(y),x])
|
|
my_result_lines.append( '/'.join(path) + '/' + 'lastEdited' + '/' + z)
|
|
|
|
path_str = "/".join(path) + "/"
|
|
path_str = re.sub('\/+','/',path_str)
|
|
path_str = re.sub('\s+',' ',path_str)
|
|
my_result_lines = []
|
|
if type(item) == type({}):
|
|
original_path = path.copy()
|
|
match( item,
|
|
{'attributes': {'displayName': _}, 'lookUpDisplay': _, },
|
|
lambda x,y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))) ,
|
|
{'attributes': {'displayName': _}, 'fieldValue': _, },
|
|
lambda x,y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))) ,
|
|
{'attributes': {'fieldName': _}, 'fieldValue': _, },
|
|
lambda x,y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))) ,
|
|
{'instanceId':_, 'sectionName': _, 'sectionSortOrder':_},
|
|
lambda id,name,order: path.extend([str(order),'cqid_'+str(id),name]),
|
|
{'instanceId':_, 'sectionName': _, 'instanceSortOrder':_},
|
|
lambda id,name,order: path.extend([str(order),'cqid_'+str(id),name]),
|
|
{'sectionName': _, 'sectionSortOrder':_, 'lastUpdated': _ },
|
|
#lambda x,y,z: path.extend([str(y),x,z]),
|
|
x2_path_update,
|
|
{'sectionName': _, 'sectionSortOrder':_},
|
|
lambda x,y: path.extend([str(y),x]),
|
|
{'sectionName': _},
|
|
lambda x: path.append(x),
|
|
_, nothing #lambda x: path.append('')
|
|
)
|
|
path = original_path
|
|
for K,V in list(item.items()):
|
|
my_result_lines.extend(recur_path_matcher(V,path))
|
|
|
|
elif type(item) == type([]):
|
|
for V in item:
|
|
my_result_lines.extend(recur_path_matcher(V,path))
|
|
return my_result_lines
|
|
|
|
|
|
|
|
|
|
def pathstyle(theclass):
|
|
#theclass = json.loads( codecs.open('cache/courses/samplecourse.json','r','utf-8').read() )
|
|
# {'entityMetadata': {'entityTitle': _,'status': _, 'entityType':_, 'entityId':_ }},
|
|
# lambda title,status,typ,id:
|
|
# my_result_lines.append("%s%s/%s/%s [%s]" % (path_str, str(typ), str(id), str(title),str(status))) ,
|
|
if "entityMetadata" in theclass:
|
|
id = theclass["entityMetadata"]["entityId"]
|
|
title = theclass["entityMetadata"]["entityTitle"]
|
|
typ = theclass["entityMetadata"]["entityType"]
|
|
action = theclass["entityMetadata"]["proposalType"]
|
|
status = theclass["entityMetadata"]["status"]
|
|
|
|
#"entityId": 4077,
|
|
#"entityTitle": "ENGL2B - American Ethnic Literature",
|
|
#"entityType": "Course",
|
|
#"proposalType": "Deactivate Course",
|
|
#"status": "Historical",
|
|
|
|
result = [ "/".join([ typ,str(id),"Course Description","entityTitle",title]) ,
|
|
"/".join([ typ,str(id),"Course Description","entityType",typ]) ,
|
|
"/".join([ typ,str(id),"Course Description","proposalType",action]) ,
|
|
"/".join([ typ,str(id),"Course Description","status",status]) , ]
|
|
|
|
result.extend(recur_path_matcher(theclass["entityFormData"]["rootSections"], [typ,str(id)] ))
|
|
#oo = codecs.open("cache/courses/curric2022test_path.json","w","utf-8")
|
|
#print(result)
|
|
return result
|
|
else:
|
|
print("didn't seem to be a class.")
|
|
|
|
|
|
|
|
def single_course_path_parse(c):
|
|
this_course = []
|
|
global num_failed_course
|
|
if "attributes" in c and "entityId" in c["attributes"]:
|
|
print(c["attributes"]["entityId"])
|
|
return (c["attributes"]["entityId"], pathstyle(c))
|
|
else:
|
|
print("I couldn't recognize a class in that")
|
|
ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
|
|
ooops.write(json.dumps(c,indent=2))
|
|
ooops.close()
|
|
num_failed_course = num_failed_course + 1
|
|
return ("-1", [])
|
|
|
|
|
|
def path_style_test():
|
|
classes = {}
|
|
oo = codecs.open("cache/courses/allclasspaths.txt","w","utf-8")
|
|
for f in os.listdir('cache/courses'):
|
|
if re.search('^classes_',f):
|
|
print(f)
|
|
cls = json.loads(codecs.open('cache/courses/'+f,'r','utf-8').read(),strict=False)
|
|
for c in cls:
|
|
id,output = single_course_path_parse(c)
|
|
classes[id] = "\n".join(output)
|
|
oo.write( classes[id] )
|
|
oo.write( "\n\n\n" + "-"*30 + "\n\n" )
|
|
oo.flush()
|
|
|
|
def make_sl():
|
|
return SortedList(key=lambda x: -1 * int(x['m']))
|
|
|
|
def course_rank():
|
|
csvfile = codecs.open('cache/courses/all_courses_ranked.csv','w','utf-8')
|
|
csvwriter = csv.writer(csvfile)
|
|
csvwriter.writerow("code,cqcourseid,coursestatus,termineffect,dept,num,numoutcomes".split(","))
|
|
|
|
courses = json.loads(codecs.open('cache/courses/course_cq_index.json','r','utf-8').read())
|
|
all = defaultdict(make_sl)
|
|
for c in courses:
|
|
code = c['d']+c['n']
|
|
if not c['m']:
|
|
c['m'] = '200030'
|
|
all[code].add(c)
|
|
|
|
for k in sorted(all.keys()):
|
|
print("\n##",k)
|
|
print(json.dumps(list(all[k]),indent=2))
|
|
for version in all[k]:
|
|
csvwriter.writerow( [ version['d']+version['n'], version['c'], version['s'], version['m'], version['d'], version['n'], len(version['o']) ])
|
|
|
|
|
|
def de_classpaths():
|
|
outfile = codecs.open('cache/courses/all_de_classpaths.txt', 'w','utf-8')
|
|
areas = ['Distance Education/1/2/Justification/Need/Justification','/Distance Education/1/3/Content Presentation/<b>A. Methods of Instruction</b>/','/Distance Education/1/3/Content Presentation/<b>B. Instructional Materials and Resources:</b><br/>1. What materials and resources will you provide your students <b>in a virtual environment</b>?/','/Distance Education/4/Assessment/','/Distance Education/4/Methods of Instruction/','/Distance Education/1/3/Content Presentation/2. Have you assessed the use of high-quality open educational resources (OER) to help bridge the digital divide for students in the course? If so, please describe how you will be using them./','/Distance Education/4/Instructional Materials and Resources/','/Distance Education/1/3/Content Presentation/3. How will students be provided access to library materials and other learning resources <b>in a virtual environment</b>? (virtual reference librarian, research guides, digital content, etc.)/','/Distance Education/4/<b>How will students be provided access to library materials and what support will students be provided to help them locate and use these materials?</b><br/>Library and Other Learning Resources/','/Distance Education/1/3/Content Presentation/4. How will students access equitable student support services <b>in a virtual environment</b>? (tutoring, financial aid, counseling, etc.)/','/Distance Education/4/Accommodations for Students with Disabilities/','/6/Distance Education/4/Office Hours/','/Contact/Contact/Description/']
|
|
|
|
for area in areas:
|
|
with codecs.open('cache/courses/allclasspaths.txt', 'r','utf-8') as infile:
|
|
outfile.writelines(line for line in infile if area in line)
|
|
|
|
if __name__ == "__main__":
|
|
|
|
print ('')
|
|
options = { 1: ['fetch all courses', fetch_all_classes],
|
|
2: ['process all classes', path_style_test],
|
|
3: ['courses - path style to html catalog', course_path_style_2_html],
|
|
4: ['show course outcomes', all_outcomes],
|
|
5: ['courses - rank by all versions', course_rank],
|
|
6: ['extract de info from class paths', de_classpaths],
|
|
10: ['fetch all programs', fetch_all_programs],
|
|
11: ['process all programs', path_style_prog],
|
|
12: ['programs - path style to html catalog', path_style_2_html],
|
|
}
|
|
|
|
print ('')
|
|
|
|
if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
|
|
resp = int(sys.argv[1])
|
|
print("\n\nPerforming: %s\n\n" % options[resp][0])
|
|
|
|
else:
|
|
print ('')
|
|
for key in options:
|
|
print(str(key) + '.\t' + options[key][0])
|
|
|
|
print('')
|
|
resp = input('Choose: ')
|
|
|
|
# Call the function in the options dict
|
|
options[ int(resp)][1]()
|
|
|