import util import requests,json,os,re, bisect, csv, codecs, funcy, sys, shutil, time from datetime import datetime import sortedcontainers as sc from collections import defaultdict from toolz.itertoolz import groupby,sliding_window from sortedcontainers import SortedList from pampy import match, _ from bs4 import BeautifulSoup as bs leafcount = 0 displaynames = [] from canvas_secrets import cq_user, cq_pasw from outcomes import quick_add_course_outcomes from schedules import campus_dept_hierarchy CQ_URL = "https://secure.curricunet.com/scripts/webservices/generic_meta/clients/versions/v4/gavilan.cfc" CQ_URL = "https://mws.services.curriqunet.com/scripts/webservices/generic_meta/clients/versions/v4/gavilan.cfc" PARAM = "?returnFormat=json&method=getCourses" user = cq_user pasw = cq_pasw err_fail_filecount = 1 def fetch_all_programs(): if os.path.isdir('cache/programs'): m = datetime.strptime(time.ctime(os.path.getctime('cache/programs')), "%a %b %d %H:%M:%S %Y") today = 'cache/programs_%s' % m.strftime('%Y_%m_%d') print("+ Creating folder: %s" % today) shutil.move('cache/programs', today) os.makedirs('cache/programs') size = 100 endn = 0 filen = 1 PARAM = "?returnFormat=json&method=getPrograms&status=Active" while(size > 99): size, endn, items = another_request(CQ_URL+PARAM,endn) out = codecs.open('cache/programs/programs_'+str(filen)+'.txt','w', 'utf-8') out.write(json.dumps(items,indent=4)) out.close() filen += 1 print("Written to 'cache/programs....") def nothing(x=0): pass seen = [] def clean(st): #return st global seen ok = ['b','i','ul','li','ol','strong','br','u'] soup = bs(st, features='lxml') """for tag in soup.recursiveChildGenerator(): if isinstance(tag,bs.Tag) and tag.name not in ok: tag.unwrap() return soup.prettify() """ for T in soup.find_all(recursive=True): if not T.name in ok: if not T.name in seen: seen.append(T.name) #print("- %s" % T.name) #print(seen) T.unwrap() else: #print("+ %s" % T.name) pass return str(soup).strip() def recur_matcher(item, depth=0): indent = depth * " " my_result_lines = [] if type(item) == type({}): if not match( item, {'entityMetadata': {'entityTitle': _,'status': _, 'entityType':_, 'entityId':_ }}, lambda title,status,typ,id: my_result_lines.append("%s%s: %s (id %s) status: %s" % (indent, str(typ), str(title), str(id), str(status))) , {'attributes': {'displayName': _}, 'lookUpDisplay': _, }, lambda x,y: my_result_lines.append("%s%s: %s" % (indent, clean(str(x)), clean(str(y)))) , {'attributes': {'displayName': _}, 'fieldValue': _, }, lambda x,y: my_result_lines.append("%s%s: %s" % (indent, clean(str(x)), clean(str(y)))) , {'sectionName': _}, lambda x: my_result_lines.append("%sSection: %s" % (indent, str(x))) , _, nothing ): for K,V in list(item.items()): my_result_lines.extend(recur_matcher(V,depth+1)) elif type(item) == type([]): for V in item: my_result_lines.extend(recur_matcher(V,depth+1)) return my_result_lines num_failed_course = 1 def single_course_parse(c): global num_failed_course this_course = [] if "attributes" in c and "entityId" in c["attributes"]: print(c["attributes"]["entityId"]) return (c["attributes"]["entityId"], recur_matcher(c)) else: print("I couldn't recognize a class in that") ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8') ooops.write(json.dumps(c,indent=2)) ooops.close() num_failed_course = num_failed_course + 1 return ("-1", []) def match_style_test(): classes = {} oo = codecs.open("cache/courses/curric2022test.json","w","utf-8") for f in os.listdir('cache/courses'): if re.search('classes_',f): print(f) cls = json.loads(codecs.open('cache/courses/'+f,'r','utf-8').read()) for c in cls: id,output = single_course_parse(c) classes[id] = "\n".join(output) oo.write( classes[id] ) oo.write( "\n\n\n" + "-"*30 + "\n\n" ) oo.flush() def single_program_path_parse(c): this_course = [] global num_failed_course if "attributes" in c and "entityId" in c["attributes"]: print(c["attributes"]["entityId"]) return (c["attributes"]["entityId"], pathstyle(c)) else: print(f"I couldn't recognize a program in: {json.dumps(c,indent=2)}") ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8') ooops.write(json.dumps(c,indent=2)) ooops.close() num_failed_course = num_failed_course + 1 return ("-1", []) def path_style_prog(): classes = {} oo = codecs.open("cache/programs/allprogrampaths.txt","w","utf-8") for f in os.listdir('cache/programs'): if re.search('^programs_',f): print(f) cls = json.loads(codecs.open('cache/programs/'+f,'r','utf-8').read()) for c in cls: id,output = single_program_path_parse(c) classes[id] = "\n".join(output) oo.write( classes[id] ) oo.write( "\n\n\n" + "-"*30 + "\n\n" ) oo.flush() def term_txt_to_code(t): term_codes = {'Winter Intersession':'10','Spring':'30','Summer':'50','Fall':'70'} m = re.search(r'(^.*)\s(\d\d\d+\d+)$', t) if m: yr = m.group(2) sem = term_codes[m.group(1)] return yr+sem return '' def all_outcomes(): csvfile = codecs.open('cache/courses/alloutcomes.csv','w','utf-8') csvwriter = csv.writer(csvfile) csvwriter.writerow('code cqcourseid coursestatus termineffect dept num cqoutcomeid outcome'.split(' ')) csvfile2 = codecs.open('cache/courses/all_active_outcomes.csv','w','utf-8') csvwriter2 = csv.writer(csvfile2) csvwriter2.writerow('code cqcourseid coursestatus termineffect dept num cqoutcomeid outcome'.split(' ')) rr = codecs.open("cache/courses/allclasspaths.txt","r", "utf-8").readlines() ww = codecs.open("cache/courses/alloutcomes.txt","w", "utf-8") course_index = [] current_course = {} current_course_num = 0 term_counts = defaultdict(int) count = 0 for L in rr: a = re.search('Course\/(\d+)',L) if a: course_num = a.group(1) #print(course_num, current_course_num) if (course_num != current_course_num): if current_course_num != 0: # log the course info so we can know cq id numbers of courses course_index.append(current_course) # status count += 1 #input('ok ') if count % 100 == 0: print(count) #pass current_course_num = course_num #print(course_num) current_course = {'c':'','d':'','n':'','t':'','s':'','T':'','o':[],'i':'','a':'','m':''} current_course['c'] = course_num a = re.search('Course\/(\d+)\/1\/Course\ Description\/0\/Course\ Discipline\/(.*)$',L) if a: current_course['d'] = a.group(2) a = re.search('Course\/(\d+)\/1\/Course\ Description\/0\/Course\ Number\/(.*)$',L) if a: current_course['n'] = a.group(2) a = re.search('Course\/(\d+)\/1\/Course\ Description\/0\/Course\ Title\/(.*)$',L) if a: current_course['T'] = a.group(2) a = re.search('Course\/(\d+)\/1\/Course\ Description\/0\/Short\ Title\/(.*)$',L) if a: current_course['t'] = a.group(2) a = re.search('Course\ Description\/status\/(.*)$',L) if a: current_course['s'] = a.group(1) a = re.search('Course\ Content\/\d+\/Lecture\ Content\/Curriculum\ Approval\ Date:\s*(.*)$',L) if a: current_course['a'] = a.group(1) a = re.search('Course\ Description\/\d+\/Internal\ Processing\ Term\/(.*)$',L) if a: t_code = term_txt_to_code(a.group(1)) current_course['m'] = t_code term_counts[t_code] += 1 # Course/10/10/Course Content/1/Lecture Content/Curriculum Approval Date: 02/24/2014 # Course/3091/1/Course Description/0/Internal Processing Term/Spring 2018 a = re.search('Learning\ Outcomes\/\d+\/(cqid_\d+)\/Learning\ Outcomes\/Description\/(.*)$',L) if a: current_course['o'].append(a.group(2)) current_course['i'] = a.group(1) csvwriter.writerow([current_course['d']+current_course['n'], current_course_num, current_course['s'], current_course['m'], current_course['d'], current_course['n'], current_course['i'], a.group(2)]) if current_course['s']=='Active': csvwriter2.writerow([current_course['d']+current_course['n'], current_course_num, current_course['s'], current_course['m'], current_course['d'], current_course['n'], current_course['i'], a.group(2)]) if re.search('Learning\ Outcomes\/Description\/',L): ww.write(L) if re.search('Description\/entityTitle\/',L): ww.write(L) if re.search('Description\/status\/',L): ww.write(L) xx = codecs.open("cache/courses/course_cq_index.json","w", "utf-8") xx.write(json.dumps(course_index, indent=2)) #print(json.dumps(term_counts,indent=2)) def ddl(): return defaultdict(list) def splitclassline(cl, id=''): # "PHYS 4A - Physics for Scientists and Engineers I 4.000 *Active*" dbg = 1 ret = {'name':'','units':'','units_hi':'','code':'','status':'', 'sequence':int(id)} p1 = re.search(r'^(.*?)\s\-\s(.*)$',cl) if p1: code = p1.groups()[0] ret['code'] = code rest = p1.groups()[1] p3 = re.search(r'^(.*)\s(\d+\.\d+)\s\-\s(\d+\.\d+)\s+\*(\w+)\*$',rest) if p3: name = p3.groups()[0] units = p3.groups()[1] units_hi = p3.groups()[2] status = p3.groups()[3] ret['name'] = name ret['units'] = units ret['units_hi'] = units_hi ret['status'] = status #if dbg: print( "%s --- code: %s - name: %s - units: %s-%s - status: %s" % (cl,code,name,units,units_hi,status)) return ret p2 = re.search(r'^(.*)\s(\d+\.\d+)\s+\*(\w+)\*$',rest) if p2: name = p2.groups()[0] units = p2.groups()[1] status = p2.groups()[2] ret['name'] = name ret['units'] = units ret['status'] = status #if dbg: print( "%s --- code: %s - name: %s - units: %s - status: %s" % (cl,code,name,units,status)) return ret else: if dbg: print( "%s --- code: %s --------------------------------" % (cl,code)) else: if dbg: print( "%s --- code:----------------------------------------" % cl) #return (cl,'','') return ret def path_style_2_html(): verbose = 1 v = verbose prog_title_subs = [] #with codecs.open('cache/program_published_names.csv', 'r','utf-8') as file: # reader = csv.reader(file) # for row in reader: # prog_title_subs.append(row) oo = codecs.open("cache/programs/allprogrampaths.txt","r","utf-8").readlines() award_prebuild = defaultdict( ddl ) last_line = "" for L in oo: L = L.strip() if not re.search(r'^Program',L): last_line = last_line + " " + L continue else: if re.search(r'\/$',last_line): # ignore line with trailing slash - assume no data last_line = L continue if re.search(r'Curriculum\sDivision\s\d+', last_line): #print(last_line) pass test_1 = re.search(r'^Program\/(\d+)\/Course',last_line) if test_1: award_prebuild[ test_1.groups()[0] ]["Info"].append(last_line) test_2 = re.search(r'^Program\/(\d+)\/(\d+)\/([\w\s]+)\/',last_line) if test_2: award_prebuild[ test_2.groups()[0] ][test_2.groups()[2]].append(last_line) last_line = L output = codecs.open("cache/programs/programs_prebuild.json","w","utf-8") output.write( json.dumps(award_prebuild, indent=2) ) award_build = defaultdict( ddl ) for AW in sorted(list(award_prebuild.keys()),key=int): v = 1 aw = award_prebuild[AW] for line in aw["Program Description"]: t1 = re.search(r'Division\/(.*)$', line) if t1: award_build[AW]["division"] = t1.groups()[0] t1 = re.search(r'Department\/(.*)$', line) if t1: award_build[AW]["dept"] = t1.groups()[0] t1 = re.search(r'Program\sTitle\/(.*)$', line) if t1: award_build[AW]["program_title"] = t1.groups()[0] t1 = re.search(r'Award\sType\/(.*)$', line) if t1: award_build[AW]["award"] = t1.groups()[0] t1 = re.search(r'\/Description\/(.*)$', line) if t1: award_build[AW]["description"] = t1.groups()[0] t1 = re.search(r'Transfer\/CTE\/(.*)$', line) if t1: award_build[AW]["transfer_cte"] = t1.groups()[0] t1 = re.search(r'CTE\sProgram\?\/\/(.*)$', line) if t1: award_build[AW]["is_cte"] = t1.groups()[0] for line in aw["Info"]: t1 = re.search(r'Description\/status\/(.*)$', line) if t1: award_build[AW]["status"] = t1.groups()[0] t1 = re.search(r'Description\/proposalType\/(.*)$', line) if t1: award_build[AW]["proposal_type"] = t1.groups()[0] for line in aw["Codes"]: t1 = re.search(r'Banner\sCode\/(.*)$', line) if t1: award_build[AW]["banner_code"] = t1.groups()[0] # substitute in program names more suitable for publishing subbed = 0 for L in prog_title_subs: if award_build[AW]["dept"] == L[0] and award_build[AW]["program_title"] == L[1]: award_build[AW]["publish_title"] = L[2] subbed = 1 if v: print("SUBBED") if len(L)>3: award_build[AW]["publish_title2"] = L[3] else: award_build[AW]["publish_title2"] = "" if not subbed: award_build[AW]["publish_title"] = award_build[AW]["dept"] award_build[AW]["publish_title2"] = "" if award_build[AW]["program_title"] == "Liberal Arts: Computer Science & Information Systems Emphasis": award_build[AW]["publish_title"] = "Computer Science and Information Studies" award_build[AW]["publish_title2"] = "Liberal Arts" if v: print("-----LIB ART CSIS") if v: print("%s / %s - %s" % (award_build[AW]["publish_title"],award_build[AW]["program_title"], award_build[AW]["award"])) v = 0 for line in aw["Program Learning Outcomes"]: t1 = re.search(r'Program\sLearning\sOutcomes\/\d+\/Outcome\/(\d+)\/cqid_(\d+)\/Outcome\/Outcome\/(.*)$', line) if t1: if "PLO" in award_build[AW]: award_build[AW]["PLO"].append( (t1.groups()[0], t1.groups()[2]) ) else: award_build[AW]["PLO"] = [ (t1.groups()[0], t1.groups()[2]), ] st = lambda x: x[0] award_build[AW]["PLO"] = sorted( award_build[AW]["PLO"], key=st ) award_build[AW]["PLO"] = [ x[1] for x in award_build[AW]["PLO"] ] req_prebuild = defaultdict(list) pbd_unit_calcs = {} # requirements table: # - most types have a 'units' column, which might be calculated # - might be overridden # - might be single number or a range min/max current_item_number = 0 for line in aw["Program Requirements"]: t1 = re.search(r'Program\sBlock\sDefinitions\/(\d+)/cqid_\d+/Program\sBlock\sDefinitions\/(.*)$', line) if t1: pbd_number = t1.groups()[0] if not pbd_number in pbd_unit_calcs: pbd_unit_calcs[pbd_number] = {'unit_sum':0,'unit_sum_max':0,'override':0,'min':0,'max':0} t2 = re.search(r'Requirements\/\d+\/Program\sBlock\sDefinitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Course\sBlock\sDefinition\/(.*)$', line) if t2: req_prebuild[pbd_number].append( ('h3', '0', t2.groups()[1]) ) current_item_number = 0 continue t3 = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+\/Program\sCourses\/\d+\/\[Discipline\sand\sCourse\schained\scombo\]\/Course\/(.*)$',line) if t3: req_prebuild[pbd_number].append( ('course', t3.groups()[0], splitclassline( t3.groups()[1], t3.groups()[0] )) ) current_item_number = t3.groups()[0] continue t3a = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+/Program\sCourses\/\d+\/\[Condition\sSection\]\/Condition\/or$',line) if t3a: req_prebuild[pbd_number].append( ('or', t3a.groups()[0]) ) current_item_number = t3a.groups()[0] continue t3b = re.search(r'Definitions\/\d+\/Program\sCourses\/(\d+)\/cqid_\d+/Program\sCourses\/\d+\/\[Condition\sSection\]\/Condition\/and$',line) if t3b: req_prebuild[pbd_number].append( ('and', t3b.groups()[0]) ) current_item_number = t3b.groups()[0] continue t4 = re.search(r'Definitions\/(\d+)\/cqid_\d+/Program\sBlock\sDefinitions\/\d+\/Program\sCourses/(\d+)/cqid_\d+/Program\sCourses\/Non\-Course\sRequirements\/(.*)$',line) if t4: req_prebuild[pbd_number].append( ('noncourse', t4.groups()[1], t4.groups()[2]) ) current_item_number = t4.groups()[1] continue t5 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Override\sUnit\sCalculation\/1$',line) if t5: pbd_unit_calcs[pbd_number]['override'] = 1 continue t6 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Unit\sMin\/(.*)$',line) if t6: pbd_unit_calcs[pbd_number]['min'] = t6.groups()[1] continue t7 = re.search(r'Definitions\/(\d+)\/cqid_\d+\/Program\sBlock\sDefinitions\/Unit\sMax/(.*)$',line) if t7: pbd_unit_calcs[pbd_number]['max'] = t7.groups()[1] continue t8 = re.search(r'chained\scombo\]\/Discipline',line) if t8: continue t8a = re.search(r'Units\s[Low|High]',line) if t8a: continue t9 = re.search(r'Definitions\/Block\sHeader\/(.*)$',line) if t9: req_prebuild[pbd_number].append( ('blockheader', 0.1, t9.groups()[0]) ) continue req_prebuild[pbd_number].append( ('unknown', current_item_number, t1.groups()[1]) ) award_build[AW]["requirements"] = req_prebuild award_build[AW]["unit_calcs"] = pbd_unit_calcs # associate unit calculations with program blocks for block_key in req_prebuild.keys(): if block_key in pbd_unit_calcs: req_prebuild[block_key].insert(0, pbd_unit_calcs[block_key]) else: req_prebuild[block_key].insert(0, {'unit_sum':0,'unit_sum_max':0,'override':0}) # do the unit calc math for block_key in req_prebuild.keys(): this_block = req_prebuild[block_key] pad = this_block[0] if v: print("pad: ",pad) block_dict = {} for item in this_block[1:]: print(item) try: if item[0] == "or": block_dict[ item[1]+"or" ] = 1 if item[0] == "h3": if v: print("+ ", item[1]) if item[0] == "blockheader": if v: print(" ", item[1]) if not item[0] == "course": continue block_dict[ item[1] ] = item[2] seq = int(item[1]) units = '' if item[2]['units']: units = float( item[2]['units'] ) except Exception as e: print("ERROR ERROR\nERROR ERROR") print(e) xyz = input('hit return to continue') #print( "%i \t %f \t %s" % (seq,units, item[2]['name'])) if v: for k in sorted( block_dict.keys() ): print(k," ", block_dict[k]) #for k in sliding_window(3, sorted( block_dict.keys() )): # l,m,n = k # if re.search(r'or$',m): # print("OR") # print(block_dict[l],"\n",block_dict[m],"\n",block_dict[n],"\n\n") #print() output = codecs.open("cache/programs/programs_built.json","w","utf-8") output.write( json.dumps(award_build, indent=2) ) def course_path_style_2_html(): verbose = 1 v = verbose dbg = codecs.open('cache/courses/debugout.txt','w','utf-8') oo = codecs.open("cache/courses/allclasspaths.txt","r","utf-8").readlines() course_prebuild = defaultdict( ddl ) last_line = "" for L in oo: L = L.strip() if not re.search(r'^Course',L): last_line = last_line + "
" + L continue else: if re.search(r'\/$',last_line): # ignore line with trailing slash - assume no data last_line = L continue test_1 = re.search(r'^Course\/(\d+)\/Course',last_line) if test_1: course_prebuild[ test_1.groups()[0] ]["Info"].append(last_line) test_2 = re.search(r'^Course\/(\d+)\/(\d+)\/(.*?)\/(.*)$',last_line) if test_2: course_prebuild[ test_2.groups()[0] ][test_2.groups()[2]].append(last_line) last_line = L output = codecs.open("cache/courses/courses_prebuild.json","w","utf-8") output.write( json.dumps(course_prebuild, indent=2) ) all_courses = {} active_courses = {} lookup_table = { 'entityTitle':'title', 'proposalType':'type', '\/Course\sDescription\/status':'status', 'Course\sDiscipline':'dept', 'Course\sNumber':'number', 'Course\sTitle':'name', 'Course Description\/\d\/Justification':'justification', 'Short\sTitle':'shortname', 'Course Description\/\d\/Internal\sProcessing\sTerm':'term', 'This\sCourse\sIs\sDegree\sApplicable':'degree_applicable', '\/Course\sDescription\/\d+\/Course\sDescription\/':'desc', 'Minimum\sUnits':'min_units', 'Minimum\sLecture\sHour':'min_lec_hour', 'Minimum\sLab\sHour':'min_lab_hour', 'Course\shas\svariable\shours':'has_var_hours', 'Number\sWeeks':'weeks', 'Maximum\sUnits':'max_units', 'Credit\sStatus':'credit_status', 'TOP\sCode':'top_code', 'Classification':'classification', 'Non\sCredit\sCategory':'noncredit_category', 'Stand-Alone\sClass?':'stand_alone', 'Grade\sOption':'grade_option', 'Is\sRepeatable':'repeatable', 'Learning\sOutcomes\/Description':'slo', 'Is\sThis\sCourse\sis\sRecommended\sfor\sTransfer\sto\sState\sUniversities\sand\sColleges?':'transfer_csu', 'Is\sThis\sCourse\sis\sRecommended\sfor\sTransfer\sto\sUniversity\sof\sCalifornia?':'transfer_uc', '\/Catalog\sCourse\sSummary\sView\/':'catalog', '\/Course\sContent/\d+/Lecture\sContent\/':'content', '\/ASSIST\sPreview\/\d+\/Outcomes\sand\sObjectives\/':'objectives'} for C in sorted(list(course_prebuild.keys()),key=int): v = 0 crs = course_prebuild[C] course_build = {'slo':{}} # defaultdict( ddl ) if v: print(C) dbg.write(f"{C}\n") for K in crs.keys(): if v: print("\t%s" % K) for line in crs[K]: for (str,key) in lookup_table.items(): if re.search(str,line): if key == 'slo': # \s\s content_search = re.search(r'\/Learning\sOutcomes\/\d+\/cqid_(\d+)\/Learning\sOutcomes\/Description\/(.*?)$',line) if content_search: course_build['slo'][content_search.groups()[0]] = content_search.groups()[1] else: print("NO SLO? %s" % line) elif key == 'desc': content_search = re.search(r'^Course\/\d+\/\d+\/Course\sDescription\/\d+\/Course\sDescription\/(.*)$',line) course_build['desc'] = content_search.groups()[0] elif key == 'catalog': content_search = re.search(r'^Course\/\d+\/\d+\/General\sEducation\sPattern\/\d+\/Catalog\sCourse\sSummary\sView\/(.*)$',line) course_build['catalog'] = content_search.groups()[0] elif key == 'content': content_search = re.search(r'^Course\/\d+\/\d+\/Course\sContent\/\d+\/Lecture\sContent\/(.*)$',line) course_build['content'] = content_search.groups()[0] elif key == 'objectives': content_search = re.search(r'^Course\/\d+\/\d+\/ASSIST\sPreview\/\d+\/Outcomes\sand\sObjectives\/(.*)$',line) course_build['objectives'] = content_search.groups()[0] else: content_search = re.search(r'^(.*)\/(.*?)$',line) course_build[key] = content_search.groups()[1] dbg.write(f"{key} => {content_search.groups()[1]}\n") if v: print("\t\t%s - %s" % (key, course_build[key])) continue all_courses[C] = course_build if course_build['status'] == 'Active': active_courses[C] = course_build output = codecs.open("cache/courses/courses_built.json","w","utf-8") output.write( json.dumps(all_courses, indent=2) ) output2 = codecs.open("cache/courses/courses_active_built.json","w","utf-8") output2.write( json.dumps(active_courses, indent=2) ) ######### ######### ######### ######### def another_request(url,startat): global err_fail_filecount newparam = "&skip=" + str(startat) print((url+newparam)) r = requests.get(url+newparam, auth=(user,pasw)) #print(r.text) try: mydata = json.loads(r.text, strict=False) except Exception as e: print("Couldn't read that last bit") #print((r.text)) codecs.open('cache/curric2022failfile_%i.txt' % err_fail_filecount,'w','utf-8').write(r.text) err_fail_filecount += 1 print(e) return 0,0,[] size = mydata['resultSetMetadata']['ResultSetSize'] endn = mydata['resultSetMetadata']['EndResultNum'] items = mydata['entityInstances'] print((' Got ' + str(size) + ' instances, ending at item number ' + str(endn))) return size,endn,items def fetch_all_classes(): if os.path.isdir('cache/courses'): m = datetime.strptime(time.ctime(os.path.getctime('cache/courses')), "%a %b %d %H:%M:%S %Y") today = 'cache/courses_%s' % m.strftime('%Y_%m_%d') print("+ Creating folder: %s" % today) shutil.move('cache/courses', today) os.makedirs('cache/courses') size = 100 endn = 0 filen = 1 while(size > 99): size, endn, items = another_request(CQ_URL+PARAM,endn) out = codecs.open('cache/courses/classes_'+str(filen)+'.txt','w', 'utf-8') out.write(json.dumps(items,indent=2)) out.close() filen += 1 print("Written to 'cache/courses....") # # # Main worker # def recur_path_matcher(item, path=[]): def x2_path_update(x,y,z): path.extend([str(y),x]) my_result_lines.append( '/'.join(path) + '/' + 'lastEdited' + '/' + z) path_str = "/".join(path) + "/" path_str = re.sub('\/+','/',path_str) path_str = re.sub('\s+',' ',path_str) my_result_lines = [] if type(item) == type({}): original_path = path.copy() match( item, {'attributes': {'displayName': _}, 'lookUpDisplay': _, }, lambda x,y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))) , {'attributes': {'displayName': _}, 'fieldValue': _, }, lambda x,y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))) , {'attributes': {'fieldName': _}, 'fieldValue': _, }, lambda x,y: my_result_lines.append("%s%s/%s" % (path_str, clean(str(x)), clean(str(y)))) , {'instanceId':_, 'sectionName': _, 'sectionSortOrder':_}, lambda id,name,order: path.extend([str(order),'cqid_'+str(id),name]), {'instanceId':_, 'sectionName': _, 'instanceSortOrder':_}, lambda id,name,order: path.extend([str(order),'cqid_'+str(id),name]), {'sectionName': _, 'sectionSortOrder':_, 'lastUpdated': _ }, #lambda x,y,z: path.extend([str(y),x,z]), x2_path_update, {'sectionName': _, 'sectionSortOrder':_}, lambda x,y: path.extend([str(y),x]), {'sectionName': _}, lambda x: path.append(x), _, nothing #lambda x: path.append('') ) path = original_path for K,V in list(item.items()): my_result_lines.extend(recur_path_matcher(V,path)) elif type(item) == type([]): for V in item: my_result_lines.extend(recur_path_matcher(V,path)) return my_result_lines def pathstyle(theclass): #theclass = json.loads( codecs.open('cache/courses/samplecourse.json','r','utf-8').read() ) # {'entityMetadata': {'entityTitle': _,'status': _, 'entityType':_, 'entityId':_ }}, # lambda title,status,typ,id: # my_result_lines.append("%s%s/%s/%s [%s]" % (path_str, str(typ), str(id), str(title),str(status))) , if "entityMetadata" in theclass: id = theclass["entityMetadata"]["entityId"] title = theclass["entityMetadata"]["entityTitle"] typ = theclass["entityMetadata"]["entityType"] action = theclass["entityMetadata"]["proposalType"] status = theclass["entityMetadata"]["status"] #"entityId": 4077, #"entityTitle": "ENGL2B - American Ethnic Literature", #"entityType": "Course", #"proposalType": "Deactivate Course", #"status": "Historical", result = [ "/".join([ typ,str(id),"Course Description","entityTitle",title]) , "/".join([ typ,str(id),"Course Description","entityType",typ]) , "/".join([ typ,str(id),"Course Description","proposalType",action]) , "/".join([ typ,str(id),"Course Description","status",status]) , ] result.extend(recur_path_matcher(theclass["entityFormData"]["rootSections"], [typ,str(id)] )) #oo = codecs.open("cache/courses/curric2022test_path.json","w","utf-8") #print(result) return result else: print("didn't seem to be a class.") def single_course_path_parse(c): this_course = [] global num_failed_course if "attributes" in c and "entityId" in c["attributes"]: print(c["attributes"]["entityId"]) return (c["attributes"]["entityId"], pathstyle(c)) else: print("I couldn't recognize a class in that") ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8') ooops.write(json.dumps(c,indent=2)) ooops.close() num_failed_course = num_failed_course + 1 return ("-1", []) def path_style_test(): classes = {} oo = codecs.open("cache/courses/allclasspaths.txt","w","utf-8") for f in os.listdir('cache/courses'): if re.search('^classes_',f): print(f) cls = json.loads(codecs.open('cache/courses/'+f,'r','utf-8').read(),strict=False) for c in cls: id,output = single_course_path_parse(c) classes[id] = "\n".join(output) oo.write( classes[id] ) oo.write( "\n\n\n" + "-"*30 + "\n\n" ) oo.flush() def make_sl(): return SortedList(key=lambda x: -1 * int(x['m'])) def course_rank(): csvfile = codecs.open('cache/courses/all_courses_ranked.csv','w','utf-8') csvwriter = csv.writer(csvfile) csvwriter.writerow("code,cqcourseid,coursestatus,termineffect,dept,num,numoutcomes".split(",")) courses = json.loads(codecs.open('cache/courses/course_cq_index.json','r','utf-8').read()) all = defaultdict(make_sl) for c in courses: code = c['d']+c['n'] if not c['m']: c['m'] = '200030' all[code].add(c) for k in sorted(all.keys()): #print("\n##",k) #print(json.dumps(list(all[k]),indent=2)) for version in all[k]: csvwriter.writerow( [ version['d']+version['n'], version['c'], version['s'], version['m'], version['d'], version['n'], len(version['o']) ]) def de_classpaths(): outfile = codecs.open('cache/courses/all_de_classpaths.txt', 'w','utf-8') areas = ['Distance Education/1/2/Justification/Need/Justification','/Distance Education/1/3/Content Presentation/A. Methods of Instruction/','/Distance Education/1/3/Content Presentation/B. Instructional Materials and Resources:
1. What materials and resources will you provide your students in a virtual environment?/','/Distance Education/4/Assessment/','/Distance Education/4/Methods of Instruction/','/Distance Education/1/3/Content Presentation/2. Have you assessed the use of high-quality open educational resources (OER) to help bridge the digital divide for students in the course? If so, please describe how you will be using them./','/Distance Education/4/Instructional Materials and Resources/','/Distance Education/1/3/Content Presentation/3. How will students be provided access to library materials and other learning resources in a virtual environment? (virtual reference librarian, research guides, digital content, etc.)/','/Distance Education/4/How will students be provided access to library materials and what support will students be provided to help them locate and use these materials?
Library and Other Learning Resources/','/Distance Education/1/3/Content Presentation/4. How will students access equitable student support services in a virtual environment? (tutoring, financial aid, counseling, etc.)/','/Distance Education/4/Accommodations for Students with Disabilities/','/6/Distance Education/4/Office Hours/','/Contact/Contact/Description/'] i = 0 for area in areas: with codecs.open('cache/courses/allclasspaths.txt', 'r','utf-8') as infile: outfile.writelines(line for line in infile if area in line) i += 1 if i % 1000 == 0: print(i) from semesters import human_to_sis, get_previous_season #from pipelines import area, areas def extract_digits(input_string): """ Removes all non-digit characters from the input string and returns an integer. :param input_string: The string to process. :return: An integer containing only the digits from the input string. """ #return input_string digits_only = ''.join(char for char in input_string if char.isdigit()) return int(digits_only) if digits_only else 0 def filter_classes(): # for removing deactivated classes json_file_path = 'cache/courses/courses_built.json' output_csv_path = 'cache/courses/active_courses.txt' all_courses = [] with open(json_file_path, 'r') as json_file: data = json.load(json_file) for i,C in data.items(): term = '' try: term = C['term'] except: print(f"** {i} {C['dept']} {C['number']} is missing term") term = '' shortname = '' try: shortname = C['shortname'] except: shortname = C['name'] print(f"** {i} {C['dept']} {C['number']} is missing shortname") all_courses.append(f"{C['dept']} {C['number']} {shortname} \t {C['status']} {C['type']} \t{term} - {i}") all_courses.sort() for C in all_courses: print(C) def slo_summary_report(): # for scheduling slo assessment json_file_path = 'cache/courses/courses_built.json' output_csv_path = 'cache/courses/courses_slo_schedule.csv' term_csv_file_path = 'cache/courses/slo_schedule.csv' (gp, course_to_area, areacode_to_area, area_to_dean, dean, dean_code_to_name) = campus_dept_hierarchy() with open(json_file_path, 'r') as json_file: data = json.load(json_file) # Extract course information courses = [] term_courses = [] for key, course in data.items(): try: #print(f"{course['dept']} - -" ) re_code_course = { "key": key, "type": course.get("type", ""), "status": course.get("status", ""), "dept": course.get("dept", ""), "number": course.get("number", ""), "name": course.get("name", ""), "first_active_term": course.get("term", ""), 'first_active_term_code': human_to_sis(course.get('term', '')), "reviewing_term": get_previous_season(course.get("term","")), "reviewing_term_code": human_to_sis(get_previous_season(course.get('term', ''))), "area": areacode_to_area[ course_to_area[course.get("dept", "").upper()] ] } courses.append(re_code_course) if course["status"] in ["Active", "In Review"] and course["type"] != "Deactivate Course": term_courses.append(re_code_course) except Exception as e: print(f"error on course: {course['dept']} {course['number']} {course['name']}") # Sort by dept, number, and term courses.sort(key=lambda x: (x["dept"], extract_digits(x["number"]), x["reviewing_term_code"])) term_courses.sort(key=lambda x: (x["reviewing_term_code"],x["dept"], extract_digits(x["number"]))) # Write to CSV fieldnames = ["dept", "number", "reviewing_term", "reviewing_term_code", "status", "key", "type", "name", "first_active_term", "first_active_term_code","area"] with open(output_csv_path, 'w', newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() writer.writerows(courses) with open(term_csv_file_path, 'w', newline='') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() writer.writerows(term_courses) print(f"CSV file '{output_csv_path}' has been created.") if __name__ == "__main__": print ('') options = { 1: ['fetch all courses', fetch_all_classes], 2: ['process all classes', path_style_test], 3: ['courses - path style to json and html catalog', course_path_style_2_html], 4: ['show course outcomes', all_outcomes], 5: ['courses - rank by all versions', course_rank], 6: ['extract de info from class paths', de_classpaths], 7: ['build schedule or summary for SLO planning', slo_summary_report], 8: ['remove deactivated courses', filter_classes], 10: ['fetch all programs', fetch_all_programs], 11: ['process all programs', path_style_prog], 12: ['programs - path style to html catalog', path_style_2_html], } print ('') if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]): resp = int(sys.argv[1]) print("\n\nPerforming: %s\n\n" % options[resp][0]) else: print ('') for key in options: print(str(key) + '.\t' + options[key][0]) print('') resp = input('Choose: ') # Call the function in the options dict options[ int(resp)][1]()