import requests, json, os, re, bisect, csv, codecs, funcy, sys, shutil, time
from datetime import datetime
import sortedcontainers as sc
from collections import defaultdict
from toolz.itertoolz import groupby,sliding_window
from sortedcontainers import SortedList
#from durable.lang import *
#from durable.engine import *
from pampy import match, _
from bs4 import BeautifulSoup as bs
leafcount = 0
displaynames = []
from canvas_secrets import cq_user, cq_pasw
from outcomes import quick_add_course_outcomes
from schedules import campus_dept_hierarchy
CQ_URL = "https://secure.curricunet.com/scripts/webservices/generic_meta/clients/versions/v4/gavilan.cfc"
CQ_URL = "https://mws.services.curriqunet.com/scripts/webservices/generic_meta/clients/versions/v4/gavilan.cfc"
PARAM = "?returnFormat=json&method=getCourses"
user = cq_user
pasw = cq_pasw
err_fail_filecount = 1
def fetch_all_programs():
if os.path.isdir('cache/programs'):
m = datetime.strptime(time.ctime(os.path.getctime('cache/programs')), "%a %b %d %H:%M:%S %Y")
today = 'cache/programs_%s' % m.strftime('%Y_%m_%d')
print("+ Creating folder: %s" % today)
shutil.move('cache/programs', today)
os.makedirs('cache/programs')
size = 100
endn = 0
filen = 1
PARAM = "?returnFormat=json&method=getPrograms&status=Active"
while(size > 99):
size, endn, items = another_request(CQ_URL+PARAM,endn)
out = codecs.open('cache/programs/programs_'+str(filen)+'.txt','w', 'utf-8')
out.write(json.dumps(items,indent=4))
out.close()
filen += 1
print("Written to 'cache/programs....")
def nothing(x=0):
pass
seen = []
def clean(st):
#return st
global seen
ok = ['b','i','ul','li','ol','strong','br','u']
soup = bs(st, features='lxml')
"""for tag in soup.recursiveChildGenerator():
if isinstance(tag,bs.Tag) and tag.name not in ok:
tag.unwrap()
return soup.prettify()
"""
    for T in soup.find_all(recursive=True):
        if T.name not in ok:
            if T.name not in seen:
                seen.append(T.name)  # track which tag names get stripped, for debugging
            T.unwrap()
return str(soup).strip()
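# Illustrative example (hypothetical input, not part of the pipeline): tags outside
# the allowlist are unwrapped but their text is kept, so
#   clean('<div><b>Hours:</b> <span>54</span></div>')  ->  '<b>Hours:</b> 54'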
def recur_matcher(item, depth=0):
indent = depth * " "
my_result_lines = []
if type(item) == type({}):
if not match( item,
{'entityMetadata': {'entityTitle': _,'status': _, 'entityType':_, 'entityId':_ }},
lambda title,status,typ,id:
my_result_lines.append("%s%s: %s (id %s) status: %s" % (indent, str(typ), str(title), str(id), str(status))) ,
{'attributes': {'displayName': _}, 'lookUpDisplay': _, },
lambda x,y: my_result_lines.append("%s%s: %s" % (indent, clean(str(x)), clean(str(y)))) ,
{'attributes': {'displayName': _}, 'fieldValue': _, },
lambda x,y: my_result_lines.append("%s%s: %s" % (indent, clean(str(x)), clean(str(y)))) ,
{'sectionName': _},
lambda x: my_result_lines.append("%sSection: %s" % (indent, str(x))) ,
_, nothing
):
for K,V in list(item.items()):
my_result_lines.extend(recur_matcher(V,depth+1))
elif type(item) == type([]):
for V in item:
my_result_lines.extend(recur_matcher(V,depth+1))
return my_result_lines
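# recur_matcher flattens the nested curriQunet JSON into an indented report.
# Illustrative output, based on the format strings above (values are made up):
#   Course: PHYS4A - Physics for Scientists and Engineers I (id 1234) status: Active
#     Section: Course Description
#       Course Title: Physics for Scientists and Engineers I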
num_failed_course = 1
def single_course_parse(c):
global num_failed_course
this_course = []
if "attributes" in c and "entityId" in c["attributes"]:
print(c["attributes"]["entityId"])
return (c["attributes"]["entityId"], recur_matcher(c))
else:
print("I couldn't recognize a class in that")
ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
ooops.write(json.dumps(c,indent=2))
ooops.close()
num_failed_course = num_failed_course + 1
return ("-1", [])
def match_style_test():
classes = {}
oo = codecs.open("cache/courses/curric2022test.json","w","utf-8")
for f in os.listdir('cache/courses'):
if re.search('classes_',f):
print(f)
cls = json.loads(codecs.open('cache/courses/'+f,'r','utf-8').read())
for c in cls:
id,output = single_course_parse(c)
classes[id] = "\n".join(output)
oo.write( classes[id] )
oo.write( "\n\n\n" + "-"*30 + "\n\n" )
oo.flush()
def single_program_path_parse(c):
this_course = []
global num_failed_course
if "attributes" in c and "entityId" in c["attributes"]:
print(c["attributes"]["entityId"])
return (c["attributes"]["entityId"], pathstyle(c))
else:
print(f"I couldn't recognize a program in: {json.dumps(c,indent=2)}")
ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
ooops.write(json.dumps(c,indent=2))
ooops.close()
num_failed_course = num_failed_course + 1
return ("-1", [])
def path_style_prog():
classes = {}
oo = codecs.open("cache/programs/allprogrampaths.txt","w","utf-8")
for f in os.listdir('cache/programs'):
if re.search('^programs_',f):
print(f)
cls = json.loads(codecs.open('cache/programs/'+f,'r','utf-8').read())
for c in cls:
id,output = single_program_path_parse(c)
classes[id] = "\n".join(output)
oo.write( classes[id] )
oo.write( "\n\n\n" + "-"*30 + "\n\n" )
oo.flush()
def term_txt_to_code(t):
term_codes = {'Winter Intersession':'10','Spring':'30','Summer':'50','Fall':'70'}
    m = re.search(r'^(.*)\s(\d{4})$', t)
if m:
yr = m.group(2)
sem = term_codes[m.group(1)]
return yr+sem
return ''
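# e.g. term_txt_to_code('Spring 2018') -> '201830', term_txt_to_code('Fall 2023') -> '202370';
# anything that doesn't end in a four-digit year falls through and returns ''.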
def all_outcomes():
csvfile = codecs.open('cache/courses/alloutcomes.csv','w','utf-8')
csvwriter = csv.writer(csvfile)
csvwriter.writerow('code cqcourseid coursestatus termineffect dept num cqoutcomeid outcome'.split(' '))
csvfile2 = codecs.open('cache/courses/all_active_outcomes.csv','w','utf-8')
csvwriter2 = csv.writer(csvfile2)
csvwriter2.writerow('code cqcourseid coursestatus termineffect dept num cqoutcomeid outcome'.split(' '))
rr = codecs.open("cache/courses/allclasspaths.txt","r", "utf-8").readlines()
ww = codecs.open("cache/courses/alloutcomes.txt","w", "utf-8")
course_index = []
current_course = {}
current_course_num = 0
term_counts = defaultdict(int)
count = 0
for L in rr:
        a = re.search(r'Course/(\d+)', L)
if a:
course_num = a.group(1)
#print(course_num, current_course_num)
if (course_num != current_course_num):
if current_course_num != 0:
# log the course info so we can know cq id numbers of courses
course_index.append(current_course)
# status
count += 1
#input('ok ')
if count % 100 == 0:
print(count)
#pass
current_course_num = course_num
#print(course_num)
current_course = {'c':'','d':'','n':'','t':'','s':'','T':'','o':[],'i':'','a':'','m':''}
current_course['c'] = course_num
        a = re.search(r'Course/(\d+)/1/Course Description/0/Course Discipline/(.*)$', L)
        if a:
            current_course['d'] = a.group(2)
        a = re.search(r'Course/(\d+)/1/Course Description/0/Course Number/(.*)$', L)
        if a:
            current_course['n'] = a.group(2)
        a = re.search(r'Course/(\d+)/1/Course Description/0/Course Title/(.*)$', L)
        if a:
            current_course['T'] = a.group(2)
        a = re.search(r'Course/(\d+)/1/Course Description/0/Short Title/(.*)$', L)
        if a:
            current_course['t'] = a.group(2)
        a = re.search(r'Course Description/status/(.*)$', L)
        if a:
            current_course['s'] = a.group(1)
        a = re.search(r'Course Content/\d+/Lecture Content/Curriculum Approval Date:\s*(.*)$', L)
        if a:
            current_course['a'] = a.group(1)
        a = re.search(r'Course Description/\d+/Internal Processing Term/(.*)$', L)
        if a:
            t_code = term_txt_to_code(a.group(1))
            current_course['m'] = t_code
            term_counts[t_code] += 1
        # Course/10/10/Course Content/1/Lecture Content/Curriculum Approval Date: 02/24/2014
        # Course/3091/1/Course Description/0/Internal Processing Term/Spring 2018
        a = re.search(r'Learning Outcomes/\d+/(cqid_\d+)/Learning Outcomes/Description/(.*)$', L)
        if a:
            current_course['o'].append(a.group(2))
            current_course['i'] = a.group(1)
            csvwriter.writerow([current_course['d']+current_course['n'], current_course_num, current_course['s'], current_course['m'], current_course['d'], current_course['n'], current_course['i'], a.group(2)])
            if current_course['s'] == 'Active':
                csvwriter2.writerow([current_course['d']+current_course['n'], current_course_num, current_course['s'], current_course['m'], current_course['d'], current_course['n'], current_course['i'], a.group(2)])
        if re.search(r'Learning Outcomes/Description/', L):
            ww.write(L)
        if re.search(r'Description/entityTitle/', L):
            ww.write(L)
        if re.search(r'Description/status/', L):
            ww.write(L)
xx = codecs.open("cache/courses/course_cq_index.json","w", "utf-8")
xx.write(json.dumps(course_index, indent=2))
#print(json.dumps(term_counts,indent=2))
# factory for one level of dict nesting: defaultdict(ddl) gives a dict of dicts of lists
def ddl():
    return defaultdict(list)
def splitclassline(cl, id='0'):
    # "PHYS 4A - Physics for Scientists and Engineers I 4.000 *Active*"
    dbg = 1
    ret = {'name':'', 'units':'', 'units_hi':'', 'code':'', 'status':'', 'sequence': int(id) if id else 0}
p1 = re.search(r'^(.*?)\s\-\s(.*)$',cl)
if p1:
code = p1.groups()[0]
ret['code'] = code
rest = p1.groups()[1]
p3 = re.search(r'^(.*)\s(\d+\.\d+)\s\-\s(\d+\.\d+)\s+\*(\w+)\*$',rest)
if p3:
name = p3.groups()[0]
units = p3.groups()[1]
units_hi = p3.groups()[2]
status = p3.groups()[3]
ret['name'] = name
ret['units'] = units
ret['units_hi'] = units_hi
ret['status'] = status
#if dbg: print( "%s --- code: %s - name: %s - units: %s-%s - status: %s" % (cl,code,name,units,units_hi,status))
return ret
p2 = re.search(r'^(.*)\s(\d+\.\d+)\s+\*(\w+)\*$',rest)
if p2:
name = p2.groups()[0]
units = p2.groups()[1]
status = p2.groups()[2]
ret['name'] = name
ret['units'] = units
ret['status'] = status
#if dbg: print( "%s --- code: %s - name: %s - units: %s - status: %s" % (cl,code,name,units,status))
return ret
else:
if dbg: print( "%s --- code: %s --------------------------------" % (cl,code))
else:
if dbg: print( "%s --- code:----------------------------------------" % cl)
#return (cl,'','')
return ret
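# Illustrative parse (hypothetical input):
#   splitclassline("PHYS 4A - Physics for Scientists and Engineers I 4.000 *Active*", '2')
#   -> {'name': 'Physics for Scientists and Engineers I', 'units': '4.000',
#       'units_hi': '', 'code': 'PHYS 4A', 'status': 'Active', 'sequence': 2}
# A "1.000 - 3.000" unit range fills both 'units' and 'units_hi' instead.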
def path_style_2_html():
verbose = 1
v = verbose
prog_title_subs = []
#with codecs.open('cache/program_published_names.csv', 'r','utf-8') as file:
# reader = csv.reader(file)
# for row in reader:
# prog_title_subs.append(row)
oo = codecs.open("cache/programs/allprogrampaths.txt","r","utf-8").readlines()
award_prebuild = defaultdict( ddl )
last_line = ""
for L in oo:
L = L.strip()
if not re.search(r'^Program',L):
last_line = last_line + " " + L
continue
else:
if re.search(r'\/$',last_line):
# ignore line with trailing slash - assume no data
last_line = L
continue
if re.search(r'Curriculum\sDivision\s\d+', last_line):
#print(last_line)
pass
test_1 = re.search(r'^Program\/(\d+)\/Course',last_line)
if test_1:
award_prebuild[ test_1.groups()[0] ]["Info"].append(last_line)
test_2 = re.search(r'^Program\/(\d+)\/(\d+)\/([\w\s]+)\/',last_line)
if test_2:
award_prebuild[ test_2.groups()[0] ][test_2.groups()[2]].append(last_line)
last_line = L
output = codecs.open("cache/programs/programs_prebuild.json","w","utf-8")
output.write( json.dumps(award_prebuild, indent=2) )
award_build = defaultdict( ddl )
for AW in sorted(list(award_prebuild.keys()),key=int):
v = 1
aw = award_prebuild[AW]
for line in aw["Program Description"]:
t1 = re.search(r'Division\/(.*)$', line)
if t1:
award_build[AW]["division"] = t1.groups()[0]
t1 = re.search(r'Department\/(.*)$', line)
if t1:
award_build[AW]["dept"] = t1.groups()[0]
t1 = re.search(r'Program\sTitle\/(.*)$', line)
if t1:
award_build[AW]["program_title"] = t1.groups()[0]
t1 = re.search(r'Award\sType\/(.*)$', line)
if t1:
award_build[AW]["award"] = t1.groups()[0]
t1 = re.search(r'\/Description\/(.*)$', line)
if t1:
award_build[AW]["description"] = t1.groups()[0]
t1 = re.search(r'Transfer\/CTE\/(.*)$', line)
if t1:
award_build[AW]["transfer_cte"] = t1.groups()[0]
t1 = re.search(r'CTE\sProgram\?\/\/(.*)$', line)
if t1:
award_build[AW]["is_cte"] = t1.groups()[0]
for line in aw["Info"]:
t1 = re.search(r'Description\/status\/(.*)$', line)
if t1:
award_build[AW]["status"] = t1.groups()[0]
t1 = re.search(r'Description\/proposalType\/(.*)$', line)
if t1:
award_build[AW]["proposal_type"] = t1.groups()[0]
for line in aw["Codes"]:
t1 = re.search(r'Banner\sCode\/(.*)$', line)
if t1:
award_build[AW]["banner_code"] = t1.groups()[0]
# substitute in program names more suitable for publishing
subbed = 0
for L in prog_title_subs:
if award_build[AW]["dept"] == L[0] and award_build[AW]["program_title"] == L[1]:
award_build[AW]["publish_title"] = L[2]
subbed = 1
if v: print("SUBBED")
if len(L)>3:
award_build[AW]["publish_title2"] = L[3]
else:
award_build[AW]["publish_title2"] = ""
if not subbed:
award_build[AW]["publish_title"] = award_build[AW]["dept"]
award_build[AW]["publish_title2"] = ""
if award_build[AW]["program_title"] == "Liberal Arts: Computer Science & Information Systems Emphasis":
award_build[AW]["publish_title"] = "Computer Science and Information Studies"
award_build[AW]["publish_title2"] = "Liberal Arts"
if v: print("-----LIB ART CSIS")
if v:
print("%s / %s - %s" % (award_build[AW]["publish_title"],award_build[AW]["program_title"], award_build[AW]["award"]))
v = 0
for line in aw["Program Learning Outcomes"]:
t1 = re.search(r'Program\sLearning\sOutcomes\/\d+\/Outcome\/(\d+)\/cqid_(\d+)\/Outcome\/Outcome\/(.*)$', line)
if t1:
if "PLO" in award_build[AW]:
award_build[AW]["PLO"].append( (t1.groups()[0], t1.groups()[2]) )
else:
award_build[AW]["PLO"] = [ (t1.groups()[0], t1.groups()[2]), ]
        if "PLO" in award_build[AW]:
            st = lambda x: int(x[0])  # numeric sort so outcome '10' comes after '2'
            award_build[AW]["PLO"] = sorted( award_build[AW]["PLO"], key=st )
            award_build[AW]["PLO"] = [ x[1] for x in award_build[AW]["PLO"] ]
req_prebuild = defaultdict(list)
pbd_unit_calcs = {}
# requirements table:
# - most types have a 'units' column, which might be calculated
# - might be overridden
# - might be single number or a range min/max
current_item_number = 0
for line in aw["Program Requirements"]:
t1 = re.search(r'Program\sBlock\sDefinitions\/(\d+)/cqid_\d+/Program\sBlock\sDefinitions\/(.*)$', line)
if t1:
pbd_number = t1.groups()[0]
if not pbd_number in pbd_unit_calcs:
pbd_unit_calcs[pbd_number] = {'unit_sum':0,'unit_sum_max':0,'override':0,'min':0,'max':0}
t2 = re.search(r'Requirements\/\d+\/Program\sBlock\sDefinitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Course\sBlock\sDefinition\/(.*)$', line)
if t2:
req_prebuild[pbd_number].append( ('h3', '0', t2.groups()[1]) )
current_item_number = 0
continue
t3 = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+\/Program\sCourses\/\d+\/\[Discipline\sand\sCourse\schained\scombo\]\/Course\/(.*)$',line)
if t3:
req_prebuild[pbd_number].append( ('course', t3.groups()[0], splitclassline( t3.groups()[1], t3.groups()[0] )) )
current_item_number = t3.groups()[0]
continue
t3a = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+/Program\sCourses\/\d+\/\[Condition\sSection\]\/Condition\/or$',line)
if t3a:
req_prebuild[pbd_number].append( ('or', t3a.groups()[0]) )
current_item_number = t3a.groups()[0]
continue
t3b = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+/Program\sCourses\/\d+\/\[Condition\sSection\]\/Condition\/and$',line)
if t3b:
req_prebuild[pbd_number].append( ('and', t3b.groups()[0]) )
current_item_number = t3b.groups()[0]
continue
t4 = re.search(r'Definitions\/(\d+)\/cqid_\d+/Program\sBlock\sDefinitions\/\d+\/Program\sCourses/(\d+)/cqid_\d+/Program\sCourses\/Non\-Course\sRequirements\/(.*)$',line)
if t4:
req_prebuild[pbd_number].append( ('noncourse', t4.groups()[1], t4.groups()[2]) )
current_item_number = t4.groups()[1]
continue
t5 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Override\sUnit\sCalculation\/1$',line)
if t5:
pbd_unit_calcs[pbd_number]['override'] = 1
continue
t6 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Unit\sMin\/(.*)$',line)
if t6:
pbd_unit_calcs[pbd_number]['min'] = t6.groups()[1]
continue
t7 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Unit\sMax/(.*)$',line)
if t7:
pbd_unit_calcs[pbd_number]['max'] = t7.groups()[1]
continue
t8 = re.search(r'chained\scombo\]\/Discipline',line)
if t8:
continue
            t8a = re.search(r'Units\s(?:Low|High)', line)
if t8a:
continue
t9 = re.search(r'Definitions\/Block\sHeader\/(.*)$',line)
if t9:
req_prebuild[pbd_number].append( ('blockheader', 0.1, t9.groups()[0]) )
continue
            if t1:
                req_prebuild[pbd_number].append( ('unknown', current_item_number, t1.groups()[1]) )
award_build[AW]["requirements"] = req_prebuild
award_build[AW]["unit_calcs"] = pbd_unit_calcs
# associate unit calculations with program blocks
for block_key in req_prebuild.keys():
if block_key in pbd_unit_calcs:
req_prebuild[block_key].insert(0, pbd_unit_calcs[block_key])
else:
req_prebuild[block_key].insert(0, {'unit_sum':0,'unit_sum_max':0,'override':0})
# do the unit calc math
for block_key in req_prebuild.keys():
this_block = req_prebuild[block_key]
pad = this_block[0]
if v: print("pad: ",pad)
block_dict = {}
for item in this_block[1:]:
print(item)
try:
if item[0] == "or":
block_dict[ item[1]+"or" ] = 1
if item[0] == "h3":
if v: print("+ ", item[1])
if item[0] == "blockheader":
if v: print(" ", item[1])
if not item[0] == "course":
continue
block_dict[ item[1] ] = item[2]
seq = int(item[1])
units = ''
if item[2]['units']: units = float( item[2]['units'] )
except Exception as e:
print("ERROR ERROR\nERROR ERROR")
print(e)
xyz = input('hit return to continue')
#print( "%i \t %f \t %s" % (seq,units, item[2]['name']))
if v:
for k in sorted( block_dict.keys() ):
print(k," ", block_dict[k])
#for k in sliding_window(3, sorted( block_dict.keys() )):
# l,m,n = k
# if re.search(r'or$',m):
# print("OR")
# print(block_dict[l],"\n",block_dict[m],"\n",block_dict[n],"\n\n")
#print()
output = codecs.open("cache/programs/programs_built.json","w","utf-8")
output.write( json.dumps(award_build, indent=2) )
def course_path_style_2_html():
verbose = 1
v = verbose
dbg = codecs.open('cache/courses/debugout.txt','w','utf-8')
oo = codecs.open("cache/courses/allclasspaths.txt","r","utf-8").readlines()
course_prebuild = defaultdict( ddl )
last_line = ""
for L in oo:
L = L.strip()
if not re.search(r'^Course',L):
last_line = last_line + "
" + L
continue
else:
if re.search(r'\/$',last_line):
# ignore line with trailing slash - assume no data
last_line = L
continue
test_1 = re.search(r'^Course\/(\d+)\/Course',last_line)
if test_1:
course_prebuild[ test_1.groups()[0] ]["Info"].append(last_line)
test_2 = re.search(r'^Course\/(\d+)\/(\d+)\/(.*?)\/(.*)$',last_line)
if test_2:
course_prebuild[ test_2.groups()[0] ][test_2.groups()[2]].append(last_line)
last_line = L
output = codecs.open("cache/courses/courses_prebuild.json","w","utf-8")
output.write( json.dumps(course_prebuild, indent=2) )
all_courses = {}
active_courses = {}
    lookup_table = { 'entityTitle':'title', 'proposalType':'type',
        r'\/Course\sDescription\/status':'status', r'Course\sDiscipline':'dept',
        r'Course\sNumber':'number', r'Course\sTitle':'name', r'Course Description\/\d\/Justification':'justification',
        r'Short\sTitle':'shortname', r'Course Description\/\d\/Internal\sProcessing\sTerm':'term', r'This\sCourse\sIs\sDegree\sApplicable':'degree_applicable',
        r'\/Course\sDescription\/\d+\/Course\sDescription\/':'desc',
        r'Minimum\sUnits':'min_units', r'Minimum\sLecture\sHour':'min_lec_hour', r'Minimum\sLab\sHour':'min_lab_hour', r'Course\shas\svariable\shours':'has_var_hours',
        r'Number\sWeeks':'weeks',
        r'Maximum\sUnits':'max_units', r'Credit\sStatus':'credit_status',
        r'TOP\sCode':'top_code', r'Classification':'classification', r'Non\sCredit\sCategory':'noncredit_category', r'Stand-Alone\sClass?':'stand_alone',
        r'Grade\sOption':'grade_option', r'Is\sRepeatable':'repeatable', r'Learning\sOutcomes\/Description':'slo',
        r'Is\sThis\sCourse\sis\sRecommended\sfor\sTransfer\sto\sState\sUniversities\sand\sColleges?':'transfer_csu',
        r'Is\sThis\sCourse\sis\sRecommended\sfor\sTransfer\sto\sUniversity\sof\sCalifornia?':'transfer_uc',
        r'\/Catalog\sCourse\sSummary\sView\/':'catalog',
        r'\/Course\sContent/\d+/Lecture\sContent\/':'content',
        r'\/ASSIST\sPreview\/\d+\/Outcomes\sand\sObjectives\/':'objectives'}
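    # Each key is a regex fragment matched against a path line. For example, a line like
    #   Course/3091/1/Course Description/0/Internal Processing Term/Spring 2018
    # matches the 'term' pattern, and the trailing segment becomes course_build['term'].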
for C in sorted(list(course_prebuild.keys()),key=int):
v = 0
crs = course_prebuild[C]
course_build = {'slo':{}} # defaultdict( ddl )
if v: print(C)
dbg.write(f"{C}\n")
for K in crs.keys():
if v: print("\t%s" % K)
for line in crs[K]:
                for (pattern, key) in lookup_table.items():
                    if re.search(pattern, line):
                        if key == 'slo':
                            content_search = re.search(r'\/Learning\sOutcomes\/\d+\/cqid_(\d+)\/Learning\sOutcomes\/Description\/(.*?)$', line)
                            if content_search:
                                course_build['slo'][content_search.groups()[0]] = content_search.groups()[1]
                            else:
                                print("NO SLO? %s" % line)
elif key == 'desc':
content_search = re.search(r'^Course\/\d+\/\d+\/Course\sDescription\/\d+\/Course\sDescription\/(.*)$',line)
course_build['desc'] = content_search.groups()[0]
elif key == 'catalog':
content_search = re.search(r'^Course\/\d+\/\d+\/General\sEducation\sPattern\/\d+\/Catalog\sCourse\sSummary\sView\/(.*)$',line)
course_build['catalog'] = content_search.groups()[0]
elif key == 'content':
content_search = re.search(r'^Course\/\d+\/\d+\/Course\sContent\/\d+\/Lecture\sContent\/(.*)$',line)
course_build['content'] = content_search.groups()[0]
elif key == 'objectives':
content_search = re.search(r'^Course\/\d+\/\d+\/ASSIST\sPreview\/\d+\/Outcomes\sand\sObjectives\/(.*)$',line)
course_build['objectives'] = content_search.groups()[0]
else:
content_search = re.search(r'^(.*)\/(.*?)$',line)
course_build[key] = content_search.groups()[1]
dbg.write(f"{key} => {content_search.groups()[1]}\n")
if v: print("\t\t%s - %s" % (key, course_build[key]))
continue
all_courses[C] = course_build
        if course_build.get('status') == 'Active':
active_courses[C] = course_build
output = codecs.open("cache/courses/courses_built.json","w","utf-8")
output.write( json.dumps(all_courses, indent=2) )
output2 = codecs.open("cache/courses/courses_active_built.json","w","utf-8")
output2.write( json.dumps(active_courses, indent=2) )
#########
#########
#########
#########
def another_request(url,startat):
global err_fail_filecount
newparam = "&skip=" + str(startat)
    print(url + newparam)
r = requests.get(url+newparam, auth=(user,pasw))
#print(r.text)
try:
mydata = json.loads(r.text, strict=False)
except Exception as e:
print("Couldn't read that last bit")
#print((r.text))
codecs.open('cache/curric2022failfile_%i.txt' % err_fail_filecount,'w','utf-8').write(r.text)
err_fail_filecount += 1
print(e)
return 0,0,[]
size = mydata['resultSetMetadata']['ResultSetSize']
endn = mydata['resultSetMetadata']['EndResultNum']
items = mydata['entityInstances']
    print(' Got ' + str(size) + ' instances, ending at item number ' + str(endn))
return size,endn,items
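# Expected response shape, inferred from the fields read above (abridged):
#   { "resultSetMetadata": {"ResultSetSize": 100, "EndResultNum": 200},
#     "entityInstances": [ ... ] }
# Callers keep requesting with &skip=<EndResultNum> until a page comes back with
# fewer than 100 items.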
def fetch_all_classes():
if os.path.isdir('cache/courses'):
m = datetime.strptime(time.ctime(os.path.getctime('cache/courses')), "%a %b %d %H:%M:%S %Y")
today = 'cache/courses_%s' % m.strftime('%Y_%m_%d')
print("+ Creating folder: %s" % today)
shutil.move('cache/courses', today)
os.makedirs('cache/courses')
size = 100
endn = 0
filen = 1
while(size > 99):
size, endn, items = another_request(CQ_URL+PARAM,endn)
out = codecs.open('cache/courses/classes_'+str(filen)+'.txt','w', 'utf-8')
out.write(json.dumps(items,indent=2))
out.close()
filen += 1
print("Written to 'cache/courses....")
#
#
# Main worker
#
def recur_path_matcher(item, path=None):
    if path is None:
        path = []
def x2_path_update(x,y,z):
path.extend([str(y),x])
my_result_lines.append( '/'.join(path) + '/' + 'lastEdited' + '/' + z)
path_str = "/".join(path) + "/"
path_str = re.sub('\/+','/',path_str)
path_str = re.sub('\s+',' ',path_str)
my_result_lines = []
if type(item) == type({}):
original_path = path.copy()
match( item,
{'attributes': {'displayName': _}, 'lookUpDisplay': _, },
lambda x,y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))) ,
{'attributes': {'displayName': _}, 'fieldValue': _, },
lambda x,y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))) ,
{'attributes': {'fieldName': _}, 'fieldValue': _, },
lambda x,y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))) ,
{'instanceId':_, 'sectionName': _, 'sectionSortOrder':_},
lambda id,name,order: path.extend([str(order),'cqid_'+str(id),name]),
{'instanceId':_, 'sectionName': _, 'instanceSortOrder':_},
lambda id,name,order: path.extend([str(order),'cqid_'+str(id),name]),
{'sectionName': _, 'sectionSortOrder':_, 'lastUpdated': _ },
#lambda x,y,z: path.extend([str(y),x,z]),
x2_path_update,
{'sectionName': _, 'sectionSortOrder':_},
lambda x,y: path.extend([str(y),x]),
{'sectionName': _},
lambda x: path.append(x),
_, nothing #lambda x: path.append('')
)
        path = original_path  # rebind to the pre-match snapshot for this node's children; the in-place extends above stay visible to the caller's later siblings
for K,V in list(item.items()):
my_result_lines.extend(recur_path_matcher(V,path))
elif type(item) == type([]):
for V in item:
my_result_lines.extend(recur_path_matcher(V,path))
return my_result_lines
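# Unlike recur_matcher's indented report, this emits one slash-delimited path per
# leaf value, e.g. (real samples noted in comments elsewhere in this file):
#   Course/3091/1/Course Description/0/Internal Processing Term/Spring 2018
#   Course/10/10/Course Content/1/Lecture Content/Curriculum Approval Date: 02/24/2014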
def pathstyle(theclass):
#theclass = json.loads( codecs.open('cache/courses/samplecourse.json','r','utf-8').read() )
# {'entityMetadata': {'entityTitle': _,'status': _, 'entityType':_, 'entityId':_ }},
# lambda title,status,typ,id:
# my_result_lines.append("%s%s/%s/%s [%s]" % (path_str, str(typ), str(id), str(title),str(status))) ,
if "entityMetadata" in theclass:
id = theclass["entityMetadata"]["entityId"]
title = theclass["entityMetadata"]["entityTitle"]
typ = theclass["entityMetadata"]["entityType"]
action = theclass["entityMetadata"]["proposalType"]
status = theclass["entityMetadata"]["status"]
#"entityId": 4077,
#"entityTitle": "ENGL2B - American Ethnic Literature",
#"entityType": "Course",
#"proposalType": "Deactivate Course",
#"status": "Historical",
result = [ "/".join([ typ,str(id),"Course Description","entityTitle",title]) ,
"/".join([ typ,str(id),"Course Description","entityType",typ]) ,
"/".join([ typ,str(id),"Course Description","proposalType",action]) ,
"/".join([ typ,str(id),"Course Description","status",status]) , ]
result.extend(recur_path_matcher(theclass["entityFormData"]["rootSections"], [typ,str(id)] ))
#oo = codecs.open("cache/courses/curric2022test_path.json","w","utf-8")
#print(result)
return result
else:
print("didn't seem to be a class.")
def single_course_path_parse(c):
this_course = []
global num_failed_course
if "attributes" in c and "entityId" in c["attributes"]:
print(c["attributes"]["entityId"])
return (c["attributes"]["entityId"], pathstyle(c))
else:
print("I couldn't recognize a class in that")
ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
ooops.write(json.dumps(c,indent=2))
ooops.close()
num_failed_course = num_failed_course + 1
return ("-1", [])
def path_style_test():
classes = {}
oo = codecs.open("cache/courses/allclasspaths.txt","w","utf-8")
for f in os.listdir('cache/courses'):
if re.search('^classes_',f):
print(f)
cls = json.loads(codecs.open('cache/courses/'+f,'r','utf-8').read(),strict=False)
for c in cls:
id,output = single_course_path_parse(c)
classes[id] = "\n".join(output)
oo.write( classes[id] )
oo.write( "\n\n\n" + "-"*30 + "\n\n" )
oo.flush()
# SortedList factory: versions of a course ordered newest processing term first
def make_sl():
    return SortedList(key=lambda x: -1 * int(x['m']))
def course_rank():
csvfile = codecs.open('cache/courses/all_courses_ranked.csv','w','utf-8')
csvwriter = csv.writer(csvfile)
csvwriter.writerow("code,cqcourseid,coursestatus,termineffect,dept,num,numoutcomes".split(","))
courses = json.loads(codecs.open('cache/courses/course_cq_index.json','r','utf-8').read())
all = defaultdict(make_sl)
for c in courses:
code = c['d']+c['n']
        if not c['m']:
            c['m'] = '200030'  # missing term: default to Spring 2000 so these versions sort last
all[code].add(c)
for k in sorted(all.keys()):
#print("\n##",k)
#print(json.dumps(list(all[k]),indent=2))
for version in all[k]:
csvwriter.writerow( [ version['d']+version['n'], version['c'], version['s'], version['m'], version['d'], version['n'], len(version['o']) ])
def de_classpaths():
outfile = codecs.open('cache/courses/all_de_classpaths.txt', 'w','utf-8')
    areas = [
        'Distance Education/1/2/Justification/Need/Justification',
        '/Distance Education/1/3/Content Presentation/A. Methods of Instruction/',
        '/Distance Education/1/3/Content Presentation/B. Instructional Materials and Resources: 1. What materials and resources will you provide your students in a virtual environment?/',
        '/Distance Education/4/Assessment/',
        '/Distance Education/4/Methods of Instruction/',
        '/Distance Education/1/3/Content Presentation/2. Have you assessed the use of high-quality open educational resources (OER) to help bridge the digital divide for students in the course? If so, please describe how you will be using them./',
        '/Distance Education/4/Instructional Materials and Resources/',
        '/Distance Education/1/3/Content Presentation/3. How will students be provided access to library materials and other learning resources in a virtual environment? (virtual reference librarian, research guides, digital content, etc.)/',
        '/Distance Education/4/How will students be provided access to library materials and what support will students be provided to help them locate and use these materials? Library and Other Learning Resources/',
        '/Distance Education/1/3/Content Presentation/4. How will students access equitable student support services in a virtual environment? (tutoring, financial aid, counseling, etc.)/',
        '/Distance Education/4/Accommodations for Students with Disabilities/',
        '/6/Distance Education/4/Office Hours/',
        '/Contact/Contact/Description/',
    ]
    i = 0
    for area in areas:
        with codecs.open('cache/courses/allclasspaths.txt', 'r','utf-8') as infile:
            for line in infile:
                if area in line:
                    outfile.write(line)
                i += 1
                if i % 1000 == 0: print(i)  # progress: lines scanned
from semesters import human_to_sis, get_previous_season
#from pipelines import area, areas
def extract_digits(input_string):
"""
Removes all non-digit characters from the input string and returns an integer.
:param input_string: The string to process.
:return: An integer containing only the digits from the input string.
"""
#return input_string
digits_only = ''.join(char for char in input_string if char.isdigit())
return int(digits_only) if digits_only else 0
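# e.g. extract_digits('4A') -> 4, extract_digits('210L') -> 210, extract_digits('') -> 0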
def filter_classes(): # for removing deactivated classes
json_file_path = 'cache/courses/courses_built.json'
output_csv_path = 'cache/courses/active_courses.txt'
all_courses = []
with open(json_file_path, 'r') as json_file:
data = json.load(json_file)
for i,C in data.items():
term = ''
        try:
            term = C['term']
        except KeyError:
            print(f"** {i} {C['dept']} {C['number']} is missing term")
        shortname = ''
        try:
            shortname = C['shortname']
        except KeyError:
            shortname = C['name']
            print(f"** {i} {C['dept']} {C['number']} is missing shortname")
all_courses.append(f"{C['dept']} {C['number']} {shortname} \t {C['status']} {C['type']} \t{term} - {i}")
all_courses.sort()
for C in all_courses: print(C)
def slo_summary_report(): # for scheduling slo assessment
json_file_path = 'cache/courses/courses_built.json'
output_csv_path = 'cache/courses/courses_slo_schedule.csv'
term_csv_file_path = 'cache/courses/slo_schedule.csv'
(gp, course_to_area, areacode_to_area, area_to_dean, dean, dean_code_to_name) = campus_dept_hierarchy()
with open(json_file_path, 'r') as json_file:
data = json.load(json_file)
# Extract course information
courses = []
term_courses = []
for key, course in data.items():
try:
#print(f"{course['dept']} - -" )
re_code_course = {
"key": key,
"type": course.get("type", ""),
"status": course.get("status", ""),
"dept": course.get("dept", ""),
"number": course.get("number", ""),
"name": course.get("name", ""),
"first_active_term": course.get("term", ""),
'first_active_term_code': human_to_sis(course.get('term', '')),
"reviewing_term": get_previous_season(course.get("term","")),
"reviewing_term_code": human_to_sis(get_previous_season(course.get('term', ''))),
"area": areacode_to_area[ course_to_area[course.get("dept", "").upper()] ]
}
courses.append(re_code_course)
if course["status"] in ["Active", "In Review"] and course["type"] != "Deactivate Course":
term_courses.append(re_code_course)
except Exception as e:
print(f"error on course: {course['dept']} {course['number']} {course['name']}")
# Sort by dept, number, and term
courses.sort(key=lambda x: (x["dept"], extract_digits(x["number"]), x["reviewing_term_code"]))
term_courses.sort(key=lambda x: (x["reviewing_term_code"],x["dept"], extract_digits(x["number"])))
# Write to CSV
fieldnames = ["dept", "number", "reviewing_term", "reviewing_term_code", "status", "key", "type", "name", "first_active_term", "first_active_term_code","area"]
with open(output_csv_path, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(courses)
with open(term_csv_file_path, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(term_courses)
print(f"CSV file '{output_csv_path}' has been created.")
if __name__ == "__main__":
print ('')
options = { 1: ['fetch all courses', fetch_all_classes],
2: ['process all classes', path_style_test],
3: ['courses - path style to json and html catalog', course_path_style_2_html],
4: ['show course outcomes', all_outcomes],
5: ['courses - rank by all versions', course_rank],
6: ['extract de info from class paths', de_classpaths],
7: ['build schedule or summary for SLO planning', slo_summary_report],
8: ['remove deactivated courses', filter_classes],
10: ['fetch all programs', fetch_all_programs],
11: ['process all programs', path_style_prog],
12: ['programs - path style to html catalog', path_style_2_html],
}
print ('')
if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
resp = int(sys.argv[1])
print("\n\nPerforming: %s\n\n" % options[resp][0])
else:
print ('')
for key in options:
print(str(key) + '.\t' + options[key][0])
print('')
resp = input('Choose: ')
# Call the function in the options dict
options[ int(resp)][1]()