import requests, json, codecs, csv, re, sys, os, shutil, time from collections import defaultdict from pipelines import fetch, url, header from courses import getCoursesInTerm, getTerms from concurrent.futures import ThreadPoolExecutor f = codecs.open('cache/slo/log.txt','w','utf-8') VERBOSE = 1 TERM = '180' SLO_CURRENT_SOURCE = 'cache/slo/2018_slo.csv' # term 21 #SLO_CURRENT_SOURCE = 'cache/slo/2020_slo.csv' ## Outcomes import format looks like this ## https://canvas.instructure.com/doc/api/file.outcomes_csv.html # vendor_guid,object_type,title,description,display_name,calculation_method,calculation_int,workflow_state,parent_guids,ratings,,,,,,, # a,group,Parent group,parent group description,G-1,,,active,,,,,,,,, # b,group,Child group,child group description,G-1.1,,,active,a,,,,,,,, # c,outcome,Learning Standard,outcome description,LS-100,decaying_average,40,active,a b,3,Excellent,2,Better,1,Good,, def outcome_overview(term=21): d = input("Pick a department. Enter the short code for it: ") d = d.upper() account_level,dept_details = outcome_groups() local_source = slo_source_by_dept() this_term,sections_by_course, section_to_cid = outcomes_attached_to_courses(term,d) f.write("\n\n* * * * * * * *\nACCOUNT LEVEL GROUPS & OUTCOMES\n") f.write(json.dumps(account_level,indent=4)) f.write("\n\n* * * * * * * *\LOCAL OUTCOMES ON FILE\n") f.write(json.dumps(local_source,indent=4)) f.write("\n\n* * * * * * * *\OUTCOMES ATTACHED TO COURSES IN THIS TERM\n") f.write(json.dumps(this_term,indent=4)) f_t_s, s_t_f = x_ref_dept_names() act = [] acct_folder_present = "Present" if d in account_level: act = account_level[s_t_f[d]].keys() # else: acct_folder_present = "Missing" all_sections = {} act_col = {} local_col = {} course_col = {} report = [['Course','Account Lvl','Local Txt','Course Shell'],['------','------','------','------',]] report.append( ['Department folder', acct_folder_present, '','' ] ) big_union = list(set().union( *[ this_term[d].keys(), act, local_source[d].keys()] )) big_union.sort() for C in big_union: row = [ C, ] if d in account_level and C in account_level[d]: act_col[C] = str(len(account_level[d][C])-1)+" outcomes" row.append(str(len(account_level[d][C])-1)+" outcomes") else: act_col[C] = "NO" row.append("No") if d in local_source and C in local_source[d]: local_col[C] = str(len(local_source[d][C]))+" outcomes" row.append(str(len(local_source[d][C]))+" outcomes") else: local_col[C] = "NO" row.append('No') row.append('') # handle sections below report.append(row) if C in sections_by_course.keys(): for S in sections_by_course[C]: report.append( [' '+S,'','',str(len(sections_by_course[C][S]))+" outcomes"] ) all_sections[S] = sections_by_course[C][S] print("Semester is: Spring 2017") for R in report: print("{: <20} {: >20} {: >20} {: >20}".format(*R)) if acct_folder_present == 'Missing': a = input("Department's outcome group is missing. Add it? (y/n) ") if a == 'y': create_dept_group(d) account_level,dept_details = outcome_groups() action = '' while action != 'q': f.flush() print("\n------\nOptions:") print(" a {classcode} - Create account level outcomes, using local outcomes") print(" c {section num} - Connect the acct level outcomes to a particular course") print(" d - Enter Account level outcomes for the whole department") print(" t - Connect outcomes to all sections in this term") print(" p - Pick new department") print(" q - quit") action = input('> ') a = re.search('^(\D)\s?(.*)$',action) if a: if a.group(1) == 'a': classcode = a.group(2) print("Creating outcomes for: " + classcode) if classcode in account_level[d]: # whether or not there's a dept folder already... print(account_level[d][classcode]['outcome_group']) create_acct_lvl_outcomes(local_source[d][classcode],dept_details[d],'',str(account_level[d][classcode]['outcome_group']['id'])) else: create_acct_lvl_outcomes(local_source[d][classcode],dept_details[d],classcode) elif a.group(1) == 'c': section = a.group(2) print(sections_by_course) this_course = '' # need to find the course that this section corresponds to for C in sections_by_course.keys(): if section in sections_by_course[C].keys(): this_course = C f.write("\n--------> SECTIONS BY COURSE -------->\n\n" + json.dumps(sections_by_course,indent=2)) f.write("\n--------> ALL SECTIONS -------->\n\n" + json.dumps(all_sections,indent=2)) f.write("\n--------> OC GROUPS THIS DEPT -------->\n\n" + json.dumps(account_level[d],indent=2)) if this_course: print("Connecting outcomes for course: " + this_course + ", to section: " + section) try: connect_acct_oc_to_course(section_to_cid[section], str(account_level[d][this_course]['outcome_group']['id'])) except KeyError as e: print("Couldn't do this because there don't appear to be account level outcomes for " + str(d)) elif a.group(1) == 'd': # put in acct level OCs for all classes in dept's local for classcode in local_source[d]: print("Creating outcomes for: " + classcode) if classcode in account_level[d]: # whether or not there's a dept folder already... print(account_level[d][classcode]['outcome_group']) create_acct_lvl_outcomes(local_source[d][classcode],dept_details[d],'',str(account_level[d][classcode]['outcome_group']['id'])) else: create_acct_lvl_outcomes(local_source[d][classcode],dept_details[d],classcode) elif a.group(1) == 't': #for C in local_source[d]: i = 0 for C in sections_by_course.keys(): if i > 10: break print('Finding sections for ' + C + ':') for S in sections_by_course[C]: print("Connecting outcomes for course: " + C + ", to section: " + S) if len(sections_by_course[C][S]): print("** looks like it already has some") continue try: connect_acct_oc_to_course(section_to_cid[S], str(account_level[d][C]['outcome_group']['id'])) i += 1 except KeyError as e: print("Couldn't do this because there don't appear to be account level outcomes for " + str(C)) elif a.group(1) == 'p': action = 'q' outcome_overview(term) """ if d in this_term: print("\n\nIn this terms courses:\n"+json.dumps(this_term[d].keys(),indent=4)) if d in account_level: print("Account level:\n"+json.dumps(account_level[d].keys(),indent=4)) else: print("Missing department at account level. ") if d in local_source: print("\n\nLocal:\n"+json.dumps(local_source[d].keys(),indent=4)) """ def create_acct_lvl_outcomes(src,dept,makefolder='',folder=0): print("these... ") print(json.dumps(dept,indent=2)) print(json.dumps(src,indent=2)) parent_group = str(dept['id']) if makefolder: print("I need to make a folder for this course: " + makefolder + " with parent id=" + parent_group) new_folder = create_course_group(makefolder,parent_group) parent_group = str(new_folder['id']) else: parent_group = folder for this_outcome in src: #this_outcome = src[2] short_name = this_outcome[1] + ". " + " ".join(this_outcome[2].split(" ")[0:4]) + "..." parameters = { "title": short_name, "display_name": short_name, "description": this_outcome[2], "mastery_points": 3, } t = url + '/api/v1/accounts/1/outcome_groups/'+parent_group+'/outcomes' print(t) #print(parameters) print(json.dumps(parameters,indent=2)) r = requests.post(t,data=parameters, headers=header) print(r.text) def connect_acct_oc_to_course(course_id,oc_group_id): global results, results_dict results_dict = {} print("these... ") print(json.dumps(course_id,indent=2)) print(json.dumps(oc_group_id,indent=2)) # get course's outcome group id results_dict = fetch(url + '/api/v1/courses/' + str(course_id) + '/root_outcome_group') print('Course outcome group:') print(results_dict) og_id = str(results_dict['id']) # get all the account level outcomes for this course these_outcomes = fetch(url + '/api/v1/accounts/1/outcome_groups/' + oc_group_id + '/outcomes?outcome_style=full') print('Outcomes in account level group for course:') print(these_outcomes) for o in these_outcomes: o_id = str(o['outcome']['id']) t = url + '/api/v1/courses/' + str(course_id) + '/outcome_groups/' + og_id + '/outcomes/' + o_id #printt r = requests.put(t, headers=header) print(r.text ) def outcome_groups_dump(): print("Loading account level outcomes..." ) top_level = fetch(url + '/api/v1/accounts/1/outcome_groups', VERBOSE) codecs.open('cache/slo/outcome_groups.json','w','utf-8').write( json.dumps(top_level,indent=2)) def outcome_groups_backup(): top_level = fetch(url + '/api/v1/accounts/1/outcome_groups') course_groups = defaultdict( list ) dept_groups = defaultdict( list ) # Arranging the "groups" into a structure for O in top_level: if isinstance(O, dict): parent = 0 if 'parent_outcome_group' in O: parent = O['parent_outcome_group']['id'] if O['parent_outcome_group']['id'] == 1: dept_groups[O['id']].append(O) else: course_groups[O['parent_outcome_group']['id']].append(O) # Add actual outcomes to the structure group_links = fetch(url + '/api/v1/accounts/1/outcome_group_links') for G in group_links: if 'outcome' in G: # print(str(G['outcome']['id']) + "\t" + str(G['outcome_group']['id']) + "\t" + str(G['outcome']['title']) + "\t") parent = G['outcome_group']['id'] course_groups[parent].append(G) # Traverse the tree and print it for D in dept_groups.keys(): print("Department: " + dept_groups[D][0]['title']) dept_id = dept_groups[D][0]['id'] for C in course_groups[dept_id]: print(" Course: " + C['title']) course_id = C['id'] for O in course_groups[course_id]: print(" " + str(O['outcome']['id']) + "\t" + O['outcome']['title']) return dept_groups def create_course_group(short,parent): t = url + '/api/v1/accounts/1/outcome_groups/'+parent+'/subgroups' new_group = {'title': short } data = json.dumps(new_group) print(t) r = requests.post(t,data=new_group, headers=header) result = json.loads(r.text) print(r.text) return result def create_dept_group(short): full_to_short_names, short_to_full = x_ref_dept_names() print("Creating Dept-Level outcome group for " + short_to_full[short] + " (" + short + ")") #a = input("Press return to continue or c to cancel.") #if a == 'c': return t = url + '/api/v1/accounts/1/outcome_groups/1/subgroups' new_group = {'title': short_to_full[short] } data = json.dumps(new_group) print(t) r = requests.post(t,data=new_group, headers=header) print(r.text) def outcomes_attached_to_courses(term=TERM,limitdept=''): # For each class in a term, check to see if it has outcomes and/or # an outcome group attached to it. courses = getCoursesInTerm(term,show=0,active=0) bycode = read_slo_source() by_dept = defaultdict(dict) sections_by_course = defaultdict(dict) section_to_courseid = {} print("Loading course-attached outcomes for this semester...") for R in courses: results = [] #print(R) id = R['id'] name = R['name'] b = '' ### TODO Handle this: CSIS/DM85 WEB DESIGN 40823/24 # MCTV17A/18/THEA17 # MUS4B/5ABCD # APE # # the dept/num combo is 'codeguess' codeguess = name.split(' ')[0] if '/' in codeguess: a,b = fix_joined_class(codeguess) codeguess = a # get the dept name alone... a = re.search('^(\D+)(\d+)(.*)$',codeguess) if a: dept = a.group(1) else: print(" ! Problem getting dept from: %s\n ! Original: %s" % (codeguess,name)) dept = 'unknown' # section number? a = re.search('\s(\d+)$',name) section = '' if a: section = a.group(1) if limitdept: if not dept == limitdept: continue section_to_courseid[section] = str(id) # look for outcomes in this term's course results = fetch(url + '/api/v1/courses/' + str(id) + '/outcome_group_links') if len(results): #print(" found online SLOs" # summarize_course_online_slo(results)) print(results) details = [ fetch(url + '/api/v1/outcomes/' + str(oo['outcome']['id']), VERBOSE) for oo in results ] by_dept[dept][codeguess] = [results,details] sections_by_course[codeguess][section] = [results,details] else: by_dept[dept][codeguess] = [] sections_by_course[codeguess][section] = [] #print(" no online SLOs." ) codecs.open('cache/slo/slo_attached_by_dept.json','w','utf-8').write( json.dumps(by_dept,indent=2)) codecs.open('cache/slo/slo_attached_by_course.json','w','utf-8').write( json.dumps(sections_by_course,indent=2)) codecs.open('cache/slo/slo_attached_sect_to_courseid.json','w','utf-8').write( json.dumps(section_to_courseid,indent=2)) return by_dept, sections_by_course, section_to_courseid def summarize_course_online_slo(outcome_list): print("Found online SLOs: ") for o in outcome_list: ass = '' title = '' dname = '' if o['assessed'] == 'true': ass = "\t(has assessments) " if 'display_name' in o['outcome']: dname = "\t " + str(o['outcome']['display_name']) if 'title' in o['outcome']: title = "\t " + str(o['outcome']['title']) print(" " + str(o['outcome']['id']) + ass + dname + title) details = fetch_outcome_details(o['outcome']['id']) print(" " + details['description']) def fetch_outcome_details(id): return fetch(url + '/api/v1/outcomes/' + str(id), VERBOSE) # Report on the actual evaluation data? def outcome_report1(): output = open('cache/slo/report.txt','w') # first, get all classes in a term, then filter to published classes #t = url + "/api/v1/accounts/1/courses?published=true&enrollment_term_id=18" #while t: t = fetch(t) results = [ {'id':1697,'course_code':'anth5 10407'},{'id':1825,'course_code':'anth1 10398'},{'id':2565,'course_code':'csis8 10705'}] for c in results: oc_t = url + '/api/v1/courses/' + str(c['id']) + '/outcome_results' while oc_t: oc_t = fetch(oc_t) # TODO if len(results_dict['outcome_results']): print(c['id'], "\t", c['course_code']) output.write( "\t".join([str(c['id']), c['course_code'], "\n"])) num_entries = len(results_dict['outcome_results']) total_score = 0 by_student = {} for R in results_dict['outcome_results']: usr = R['links']['user'] print(usr) print(R) if usr in by_student: by_student[usr] += 1 else: by_student[usr] = 1 total_score += R['score'] num_students = len(by_student.keys()) average_score = total_score / (5.0 * num_entries) output.write("Total Students: " + str(num_students)) output.write("\nTotal Entries: " + str(num_entries)) output.write("\nAverage Score: " + str(average_score) + "\n\n") # For the given course, get all outcome measurements, and display scores and stats. def outcome_report2(): print("Getting course level outcomes.") output = open('cache/slo/report.txt','w') res = [ {'id':1697,'course_code':'anth5 10407'},{'id':1825,'course_code':'anth1 10398'},{'id':2565,'course_code':'csis8 10705'}] for c in res: results = fetch(url + "/api/v1/courses/" + str(c['id']) + "/outcome_groups", VERBOSE) for outcome in results: f.write("Outcome groups\n") f.write(json.dumps(outcome,indent=4)) if 'subgroups_url' in outcome: f.write("\nfound subgroup. getting it: " + outcome['subgroups_url']) t = url+outcome['subgroups_url'] rr = fetch(t, VERBOSE) f.write("\nThis: \n" + json.dumps(rr,indent=4)) crs_oc_grps = fetch(url + '/api/v1/courses/' + str(c['id']) + '/outcome_results', VERBOSE) if len(crs_oc_grps['outcome_results']): f.write( str(len(crs_oc_grps['outcome_results'])) + " results here.\n") print(c['id'], "\t", c['course_code']) output.write( "\t".join([str(c['id']), c['course_code'], "\n"])) num_entries = len(crs_oc_grps['outcome_results']) total_score = 0 by_student = {} for R in crs_oc_grps['outcome_results']: usr = R['links']['user'] print(usr) print(R) f.write( "\t".join([str(c['id']), c['course_code'], "\n"])) f.write(json.dumps(R,indent=4)) if usr in by_student: by_student[usr] += 1 else: by_student[usr] = 1 total_score += R['score'] num_students = len(by_student.keys()) average_score = total_score / (5.0 * num_entries) output.write("Total Students: " + str(num_students)) output.write("\nTotal Entries: " + str(num_entries)) output.write("\nAverage Score: " + str(average_score) + "\n\n") def fix_joined_class(str): parts = str.split('/') class_num = re.search(r'(\D+)(\d+\D?)', parts[1]) if class_num: dept = class_num.group(1) num = class_num.group(2) return parts[0]+num, parts[1] else: class_num = re.search(r'(\D+)(\d+)\/(\d+\D?)',str) if class_num: dept = class_num.group(1) num1 = class_num.group(2) num2 = class_num.group(3) return dept+num1, dept+num2 else: class_num = re.search(r'(\D+)\/(\D+)\/(\D+)(\d+)',str) if class_num: dept1 = class_num.group(1) dept2 = class_num.group(2) num = class_num.group(4) return dept1+num, dept2+num else: class_num = re.search(r'(\D+)(\d+)(\D)\/(\D)',str) if class_num: dept = class_num.group(1) num = class_num.group(2) ltr1 = class_num.group(3) ltr2 = class_num.group(4) return dept+num+ltr1, dept+num+ltr2 print("can't guess courses on: " + str) return "","" def split_slo_name(str): n_parts = str.split(r' - ') code = n_parts[0] title = n_parts[1] m = re.search(r'^([^\d]+)(\d+)$',code) dept = '' if m: dept = m.groups()[0].rstrip() return dept, title, code def outcome_report3(): output = open('cache/slo/report.txt','w') # with an uppercase dept abbreviation, look up the outcome group #a = input("Department code (ex: AMT): ") code_to_name = {} name_to_code = {} dnames = open('cache/slo/dept_names.csv','r').readlines() for DN in dnames: DN = DN.rstrip() pts = DN.split(',') code_to_name[pts[0]] = pts[1] name_to_code[pts[1]] = pts[0] print(json.dumps(name_to_code,indent=4)) # get the main canvas slo subgroups t = url + '/api/v1/accounts/1/outcome_groups' while(t): t = fetch(t) top_level = results for TL in top_level: if 'parent_outcome_group' in TL and TL['parent_outcome_group']['id']==1: if TL['title'] in name_to_code: print("Matched: " + TL['title'] + " " + name_to_code[TL['title']]) #else: # print("Didn't match: " + json.dumps(TL,indent=4)) else: print("Not top level: " + TL['title']) sample = ['DM61 - 3D Animation,3,Student will analyze character movements and synthesize necessary joints and kinematics for realistic animation.,"project, performance"'] rd = csv.reader(sample) for row in rd: (dept,title,code) = split_slo_name(row[0]) full_dept = "NOT FOUND" if dept in code_to_name: full_dept = code_to_name[dept] print("dept: " + dept) print("dept long: " + full_dept) print("title: " + title) print("code: " + code) print("number in course: " + row[1]) print("text of slo: " + row[2]) print("assessment: " + row[3]) def read_slo_source(): f = open(SLO_CURRENT_SOURCE,'r') fr = csv.reader(f) i=0 bycode = defaultdict(list) for row in fr: if i: (d, t, c) = split_slo_name(row[0]) c = c.replace(" ","") #print(d + "\t" + c + "\t" + row[1]) bycode[c].append(row) i += 1 #print(json.dumps(bycode,indent=2)) return bycode def slo_source_by_dept(): bycode = read_slo_source() bydept = defaultdict(dict) for code in bycode.keys(): a = re.search('(\D+)(\d+)',code) if a: dept = a.group(1) num = a.group(2) bydept[dept][code] = bycode[code] else: print("Couldn't interpret: " + code) return bydept def printj(j): print( json.dumps(j, indent=2) ) def writej(o,j): o.write(json.dumps(j,indent=2)) o.write('\n') o.flush() # Get root outcome group def root_og(): f = url + '/api/v1/global/root_outcome_group' g = fetch(f) printj(g) return g def recur_og(): output = codecs.open('cache/outcomes_log.txt','w','utf-8') #all = [] #r = root_og() recur_main(output) def recur_main(out,g_url=""): if not g_url: g_url = url + '/api/v1/global/root_outcome_group' print('fetching: %s' % g_url) g = fetch(g_url,1) printj(g) writej(out,g) if "subgroups_url" in g: print('Subgroups: ' + g['subgroups_url']) for S in fetch(url+g["subgroups_url"]): recur_main(S) if "outcomes_url" in g: print('Outcomes: ' + g['outcomes_url']) for S in fetch(url+g["outcomes_url"]): recur_main(S) out.write('\n') print() def recur2(out,og={}): if not og: return if "subgroups_url" in og: print('Subgroups: ' + og['subgroups_url']) for S in fetch(url+og["subgroups_url"],1): printj(S) writej(out,S) recur2(out,S) if "outcomes_url" in og: print('Outcomes: ' + og['outcomes_url']) for S in fetch(url+og["outcomes_url"],1): printj(S) writej(out,S) recur2(out,S) out.write('\n') print() def all_og(): output = codecs.open('cache/outcomes_log.txt','w','utf-8') f = url + '/api/v1/accounts/1/outcome_groups' g = fetch(f,1) for OG in g: printj(g) writej(output,g) recur2(output,OG) NUM_THREADS = 10 def course_slo_getter(q): i = q["i"] folder = q["folder"] imgsrc = q["url"] total = q["total"] (head,tail) = os.path.split(imgsrc) if os.path.exists( os.path.join(folder,tail) ): print(" + Image %i was already downloaded." % i) return print(" Image %i / %i, folder %s, getting %s" % (i,total,folder,imgsrc)) r = requests.get(imgsrc,stream=True) if r.status_code == 200: with open(os.path.join(folder,tail),'wb') as f: r.raw.decode_content = True shutil.copyfileobj(r.raw, f) print(" + Done with image %i." % i) time.sleep(0.75) else: print(" - Failed with image %i." % i) ''' results = [] def threaded_getter(): global results threads = [] qqueue = [] i = 0 for img in soup.select('a[href] img'): link = img.find_parent('a', href=True) qqueue.append( {"i":i,"folder":folder,"url":fix_url(url,link['href']) } ) i += 1 print("There are %i images to fetch." % len(qqueue)) pool = ThreadPoolExecutor(max_workers=NUM_THREADS) for q in qqueue: q["total"] = len(qqueue) results.append( pool.submit(course_slo_getter, q) ) ''' # Creating outcomes with scale """ curl 'https:///api/v1/accounts/1/outcome_groups/1/outcomes.json' \ -X POST \ --data-binary '{ "title": "Outcome Title", "display_name": "Title for reporting", "description": "Outcome description", "vendor_guid": "customid9000", "mastery_points": 3, "ratings": [ { "description": "Exceeds Expectations", "points": 5 }, { "description": "Meets Expectations", "points": 3 }, { "description": "Does Not Meet Expectations", "points": 0 } ] }' \ -H "Content-Type: application/json" \ -H "Authorization: Bearer " """ def demo_o_fetch(): print(fetch_outcome_details('269')) print(fetch_outcome_details('270')) def outcome_groups_2021(): csvfile = codecs.open('cache/slo/ilearn_compact.csv','w','utf-8') csvwriter = csv.writer(csvfile) csvwriter.writerow('id parentid title type desc'.split(' ')) print("Loading account level outcomes..." ) # id, parentid, title, type, description #top_level = json.loads(codecs.open('cache/slo/outcome_groups.json','r','utf-8').read()) top_level = fetch(url + '/api/v1/accounts/1/outcome_groups', VERBOSE) by_dept = defaultdict(dict) dept_details = {} all_outcomes = {} #course_groups = defaultdict( list ) #dept_groups = defaultdict( list ) group_id_to_dept = {} group_id_to_course = {} full_to_short_names, short_to_full_names = x_ref_dept_names() # Arranging the "groups" into a structure for O in top_level: if isinstance(O, dict): if 'parent_outcome_group' in O: if O['parent_outcome_group']['id'] == 1: group_id_to_dept[str(O['id'])] = full_to_short_names[ O['title'] ] by_dept[ full_to_short_names[O['title']]] = {} dept_details[ full_to_short_names[O['title']] ] = O else: group_id_to_course[ str(O['id']) ] = O['title'] # repeat after dept names are gathered. for O in top_level: if isinstance(O, dict): if 'parent_outcome_group' in O: if O['parent_outcome_group']['id'] != 1: parent_id = str(O['parent_outcome_group']['id']) by_dept[ group_id_to_dept[ parent_id ] ][O['title']] = {'outcome_group':O} #print(json.dumps(by_dept, indent=4)) #print(json.dumps(group_id_to_dept, indent=4) ) #print(json.dumps(group_id_to_course, indent=4) ) # Add actual outcomes to the structure results = fetch('/api/v1/accounts/1/outcome_group_links', VERBOSE) for G in results: if 'outcome' in G: # find the dept and parent ... # this depends on every outcome (group) having a different name #print(G) details = fetch(url + '/api/v1/outcomes/' + str(G['outcome']['id']), VERBOSE) print(details) try: parent = group_id_to_course[ str(G['outcome_group']['id']) ] details['parent'] = parent except: print("can't find a course for this outcome group: ", G['outcome_group']['id']) details['parent'] = 'unknown' continue all_outcomes[ details['id'] ] = details dept = '' crs = '' for D in by_dept.keys(): for C in by_dept[D].keys(): if C == parent: dept = D crs = C #print(G['outcome']['title'] ) #print("Parent: " + parent + "\n\n") by_dept[dept][crs][G['outcome']['display_name']] = G f.write("\n\n+++++++++++ DEPT DETAILS\n\\n" + json.dumps(dept_details, indent=4)+"\n\n") f.write("\n\n+++++++++++ SLOS BY DEPT\n\\n" + json.dumps(by_dept, indent=4)+"\n\n") codecs.open('cache/slo/all_canvas_outcomes.json','w','utf-8').write( json.dumps(by_dept,indent=2)) codecs.open('cache/slo/canvas_outcomes_list.json','w','utf-8').write( json.dumps(all_outcomes,indent=2)) return by_dept, dept_details def x_ref_dept_names(): full_to_short_names = {} short_to_full = {} for L in open('cache/slo/dept_names.csv','r').read().split('\n'): parts = L.split(',') full_to_short_names[parts[1]] = parts[0] short_to_full[parts[0]] = parts[1] return full_to_short_names, short_to_full ## 2023 Updated Work call_num = 1 groups_queue = {} outcomes_queue = {} groups_raw_log = codecs.open('cache/slo/rawlog.txt','w','utf-8') def all_outcome_results_in_term(termid=''): terms = [171,172,174,176,178] for t in terms: print("\n\nTERM: ", str(t)) all_outcome_results_in_term_sub(str(t)) def all_outcome_results_in_term_sub(termid=''): if not termid: termid = str(getTerms(printme=1, ask=1)) courses = getCoursesInTerm(term=termid,get_fresh=1,show=1,active=0) log = codecs.open('cache/slo/assessed_slos_term_%s.txt' % str(termid),'w','utf-8') items = {} for C in courses: groups_raw_log.write(json.dumps(C,indent=2)) groups_raw_log.write("\n\n") print(C['id'],C['name']) res = fetch( url + '/api/v1/courses/%s/outcome_results' % str(C['id'])) items[C['id']] = res groups_raw_log.write(json.dumps(res,indent=2)) groups_raw_log.write("\n\n") groups_raw_log.flush() log.write(json.dumps(items,indent=2)) def all_linked_outcomes_in_term(termid=''): #terms = [172,174,176,178] #for t in terms: # all_linked_outcomes_in_term_sub(str(t)) all_linked_outcomes_in_term_sub(TERM) def all_linked_outcomes_in_term_sub(termid=''): if not termid: termid = str(getTerms(printme=1, ask=1)) courses = getCoursesInTerm(term=termid,get_fresh=0,show=1,active=0) items = {} csvfile = codecs.open('cache/slo/linked_slos_term_%s_compact.csv' % str(termid),'w','utf-8') csvwriter = csv.writer(csvfile) csvwriter.writerow('courseid coursename ogid oid vendorguid points mastery assessed desc'.split(' ')) for C in courses: log_obj = {'course':C,'og':[],'outcomes':[],'subgroups':[]} items[C['id']] = log_obj groups_raw_log.write(json.dumps(C,indent=2)) groups_raw_log.write("\n\n") print(C['id'],C['name']) results_dict = fetch(url + '/api/v1/courses/%s/root_outcome_group' % str(C['id'])) log_obj['og'].append(results_dict) groups_raw_log.write(json.dumps(results_dict,indent=2)) groups_raw_log.write("\n\n") # these_outcomes = fetch(url + '/api/v1/accounts/1/outcome_groups/' + oc_group_id + '/outcomes?outcome_style=full') # these_outcomes = fetch(url + '/api/v1/accounts/1/outcome_groups/' + oc_group_id + '/outcomes?outcome_style=full') u1 = url + '/api/v1/courses/%s/outcome_groups/%s/outcomes' % (str(C['id']), str(results_dict['id'])) groups_raw_log.write("\n" + u1 + "\n") outcomes_list = fetch( u1 ) groups_raw_log.write(json.dumps(outcomes_list,indent=2)) groups_raw_log.write("\n\n") if 'errors' in outcomes_list: continue if len(outcomes_list): for oo in outcomes_list: outcome = fetch( url + '/api/v1/outcomes/%s' % str(oo['outcome']['id']) ) log_obj['outcomes'].append(outcome) csvwriter.writerow([C['id'], C['course_code'], results_dict['id'], outcome['id'], outcome['vendor_guid'], outcome['points_possible'], outcome['mastery_points'], outcome['assessed'], outcome['description']]) groups_raw_log.write(json.dumps(outcome,indent=2)) groups_raw_log.write("\n\n") #"/api/v1/courses/12714/outcome_groups/6631/subgroups" u2 = url + '/api/v1/courses/%s/outcome_groups/%s/subgroups' % (str(C['id']), str(results_dict['id'])) groups_raw_log.write("\n" + u2 + "\n") g2 = fetch( u2 ) log_obj['subgroups'].append(g2) groups_raw_log.write(json.dumps(g2,indent=2)) groups_raw_log.write("\n\n") for subgroup in g2: print(" doing subgroup id %s" % str(subgroup['id'])) u3 = url + '/api/v1/courses/%s/outcome_groups/%s/outcomes' % (str(C['id']), str(subgroup['id'])) groups_raw_log.write("\n" + u3 + "\n") outcomes_list = fetch( u3 ) log_obj['subgroups'].append(outcomes_list) groups_raw_log.write(json.dumps(outcomes_list,indent=2)) groups_raw_log.write("\n\n") if 'errors' in outcomes_list: continue if len(outcomes_list): for oo in outcomes_list: outcome = fetch( url + '/api/v1/outcomes/%s' % str(oo['outcome']['id']) ) log_obj['outcomes'].append(outcome) csvwriter.writerow([C['id'], C['course_code'], subgroup['id'], outcome['id'], outcome['vendor_guid'], outcome['points_possible'], outcome['mastery_points'], outcome['assessed'], outcome['description']]) groups_raw_log.write(json.dumps(outcome,indent=2)) groups_raw_log.write("\n\n") csvfile.flush() log = codecs.open('cache/slo/linked_slos_term_%s.txt' % str(termid),'w','utf-8') log.write(json.dumps(items,indent=2)) log.close() def assemblerow(g,parent=''): # prep it for csv output id = '-1' pid = '-1' ctype = '' title = '' dname = '' desc = '' guid = '' if 'id' in g: id = g['id'] if 'parent_outcome_group' in g: pid = g['parent_outcome_group'] elif parent: pid = parent ctype = 'outcome' if 'title' in g: title = g['title'] if 'context_type' in g: ctype = g['context_type'] if 'display_name' in g: dname = g['display_name'] if 'description' in g: desc = g['description'] if 'vendor_guid' in g: guid = g['vendor_guid'] return [id, pid, guid, ctype, title, dname, desc] #g_url = url + '/api/v1/global/root_outcome_group' #g_url = url + '/api/v1/accounts/1/outcome_groups' def recur_full_fetch(out,g,parent=""): global call_num my_call_num = call_num call_num += 1 print("Start call # %i" % my_call_num) groups_raw_log.write(json.dumps(g,indent=2)) groups_raw_log.write("\n\n") row = assemblerow(g,parent) print(row) out.writerow(row) if "subgroups_url" in g: print('Subgroups: ' + g['subgroups_url']) oo = fetch(url+g["subgroups_url"]) for S in oo: subgroup = fetch(url+S['url']) print(" parent: ", row[0], " sub ",S) recur_full_fetch(out,S,parent=row[0]) if "outcomes_url" in g: print('Outcomes: ' + g['outcomes_url']) oo = fetch(url+g["outcomes_url"]) groups_raw_log.write(json.dumps(oo,indent=2)) groups_raw_log.write("\n\n") for S in oo: outcome = fetch(url+S['outcome']['url']) print(" otc ",S) recur_full_fetch(out,outcome,parent=row[0]) print("Finished call # %i" % my_call_num) print() # some curriqunet course versions don't have outcomes, and some versions # should be used even though they are historical. Given the course code (ACCT101) # return the appropriate cq course version. def find_cq_course_version(code): ranks = csv.reader( codecs.open('cache/courses/all_courses_ranked.csv','r','utf-8') ) header = next(ranks) historical_list = [] for row in ranks: r = {header[i]: row[i] for i in range(len(header))} if r['code'] == code: if int(r['numoutcomes']) == 0: continue if r['coursestatus'] == 'Historical': historical_list.append(r) continue if r['coursestatus'] == 'Active': return r['cqcourseid'] if len(historical_list): return historical_list[0]['cqcourseid'] return 0 def outcome_groups(): csvfile = codecs.open('cache/slo/ilearn_outcomes_and_groups.csv','w','utf-8') csvwriter = csv.writer(csvfile) csvwriter.writerow('id parentid guid type title displayname desc'.split(' ')) print("Loading account level outcomes..." ) g_url = url + '/api/v1/accounts/1/outcome_groups/1' print('fetching: %s' % g_url) g = fetch(g_url,1) recur_full_fetch(csvwriter,g) def summary_string(s): parts = s.split(" ") return ' '.join(parts[:4]) + "..." """ def add_outcomes_course_id(canvas_id): (dept,code,crn) = code_from_ilearn_name(C['name']) if dept == 0: non_matches.append(C['name']) continue cq_code = find_cq_course_version(code) if cq_code: # in cq_codes: """ def add_outcomes_course_code(): courses = "CMUN129 CSIS74 CSIS75 CSIS76 CSIS77 CSIS80 CSIS107 CSIS1 CSIS2 CSIS8".split(' ') courses = "CSIS26 CSIS28 CSIS121 CSIS129 CSIS179 CSIS186".split(' ') for c in courses: print("Adding outcomes to course: ", c) add_outcomes_course_code_sub(c) def add_outcomes_course_code_sub(target_code='AJ184',term=178,fresh=0): courses = getCoursesInTerm(term,get_fresh=fresh) cq_course_code = 0 for C in courses: (dept,code,crn) = code_from_ilearn_name(C['name']) if dept == 0: continue if code != target_code: continue cq_course_code = find_cq_course_version(code) if cq_course_code: outcomes = [] ilearn_course_id = C['id'] print("Using cq course id: %s and ilearn course id: %s." % (str(cq_course_code),str(ilearn_course_id))) f = codecs.open('cache/courses/alloutcomes.csv','r','utf-8') r = csv.reader(f) header = next(r) for row in r: if row[1] == cq_course_code: row_dict = {header[i]: row[i] for i in range(len(header))} outcomes.append(row_dict) print(" Got ", len(outcomes), " outcomes") quick_add_course_outcomes(ilearn_course_id, outcomes) else: print("Didn't find course %s in term %s.", (target_code,str(term))) return 0 def add_csis_sp22(): search = find_cq_course_version('CSIS6') # ENGL1B') #'CSIS6' print(search) return outcomes = [] ilearn_course_id = '14853' f = codecs.open('cache/courses/all_active_outcomes.csv','r','utf-8') r = csv.reader(f) header = next(r) for row in r: if row[0] == search: print(row) row_dict = {header[i]: row[i] for i in range(len(header))} outcomes.append(row_dict) print(row_dict) quick_add_course_outcomes(ilearn_course_id, outcomes) def quick_add_course_outcomes(ilearn_course_id, cq_outcome_id_list): print(" Fetching course id %s..." % str(ilearn_course_id)) course_root_og = fetch(url + '/api/v1/courses/%s/root_outcome_group' % str(ilearn_course_id)) course_og_id = course_root_og['id'] for o in cq_outcome_id_list: parameters = [ ("title", summary_string(o['outcome'])) , ("display_name", summary_string(o['outcome'])), ("description", o['outcome']), ("mastery_points", 2), ("ratings[][description]", 'Exceeds Expectations'), ("ratings[][points]", 3), ("ratings[][description]", 'Meets Expectations'), ("ratings[][points]", 2), ("ratings[][description]", 'Partially Meets Expectations'), ("ratings[][points]", 1), ("ratings[][description]", 'Does Not Meet Expectations'), ("ratings[][points]", 0), ("vendor_guid", o['cqoutcomeid'])] t = url + '/api/v1/courses/%s/outcome_groups/%s/outcomes' % (str(ilearn_course_id), str(course_og_id)) r = requests.post(t,data=parameters, headers=header) result = json.loads(r.text) new_outcome_id = result['outcome']['id'] print(" Added outcome: ", o['outcome']) print() def stringpad(s,n): if len(s) >= n: return " " + s[:n-1] pad = n - len(s) return " "*pad + s def code_from_ilearn_name(n,verbose=0): parts = n.split(" ") code = parts[0] crn = parts[-1] dept = '' num = '' v = verbose ### CUSTOM MATCHES ### customs = [ ('HUM/MCTV AUDIO / CINEMA / MOTION PRODUCTION FA22', ("HUM","HUM25",crn)), ('HUM25/MCTV6/MCTV26', ("HUM","HUM25",crn)), ('MUS4B-5D', ("MUS","MUS4B",crn)), ('MUS4', ("MUS","MUS4B",crn)), ('MUS4/5', ("MUS","MUS4B",crn)), ('KIN64', ("KIN","KIN64A",crn)), ('KIN24', ("KIN","KIN24A",crn)), ('ART/CD25A/B', ("ART","ART25A",crn)), ('ENGR10A', ("ENGR","ENGR10",crn)), ] for c in customs: if n == c[0] or code == c[0]: if v: print("ilearn: ", stringpad(n, 35), " dept: ", stringpad(c[1][0],6), ' num: ', stringpad('',7), ' code: ', stringpad(c[1][1],11), " crn: ", stringpad(crn,9), " R: C", end='') return c[1] a = re.search('^([A-Z]+)(\d+[A-Z]?)$', code) if a: dept = a.group(1) num = a.group(2) if v: print("ilearn: ", stringpad(n, 35), " dept: ", stringpad(dept,6), ' num: ', stringpad(num,7), ' code: ', stringpad(code,11), " crn: ", stringpad(crn,9), " R: 1", end='') return (dept,code,crn) # two depts, with nums on each a = re.search('^([A-Z]+)(\d+[A-Z]*)\/([A-Z]+)(\d+[A-Z]*)$', code) if a: dept1 = a.group(1) num1 = a.group(2) dept2 = a.group(3) num2 = a.group(4) code = dept1+num1 if v: print("ilearn: ", stringpad(n, 35), "*dept: ", stringpad(dept1,6), ' num: ', stringpad(num1,7), ' code: ', stringpad(code,11), " crn: ", stringpad(crn,9), " R: 2", end='') return (dept1,code,crn) # two depts, same num, two letters "ART/CD25AB a = re.search('^([A-Z]+)\/([A-Z]+)(\d+)([A-Z])([A-Z])$', code) if a: dept1 = a.group(1) dept2 = a.group(2) num1 = a.group(3)+a.group(4) num2 = a.group(3)+a.group(5) code = dept1+num1 if v: print("ilearn: ", stringpad(n, 35), "*dept: ", stringpad(dept1,6), ' num: ', stringpad(num1,7), ' code: ', stringpad(code,11), " crn: ", stringpad(crn,9), " R: 8", end='') return (dept1,code,crn) # two depts, same num a = re.search('^([A-Z]+)\/([A-Z]+)(\d+[A-Z]*)$', code) if a: dept1 = a.group(1) dept2 = a.group(2) num1 = a.group(3) code = dept1+num1 if v: print("ilearn: ", stringpad(n, 35), "*dept: ", stringpad(dept1,6), ' num: ', stringpad(num1,7), ' code: ', stringpad(code,11), " crn: ", stringpad(crn,9), " R: 3", end='') return (dept1,code,crn) # three depts, same num a = re.search('^([A-Z]+)\/([A-Z]+)\/([A-Z]+)(\d+[A-Z]*)$', code) if a: dept1 = a.group(1) dept2 = a.group(2) dept3 = a.group(3) num1 = a.group(4) code = dept1+num1 if v: print("ilearn: ", stringpad(n, 35), "*dept: ", stringpad(dept1,6), ' num: ', stringpad(num1,7), ' code: ', stringpad(code,11), " crn: ", stringpad(crn,9), " R: 4", end='') return (dept1,code,crn) # one dept, two nums a = re.search('^([A-Z]+)(\d+[A-Z]*)\/(\d+[A-Z]*)$', code) if a: dept1 = a.group(1) num1 = a.group(2) num2 = a.group(3) code = dept1+num1 if v: print("ilearn: ", stringpad(n, 35), "*dept: ", stringpad(dept1,6), ' num: ', stringpad(num1,7), ' code: ', stringpad(code,11), " crn: ", stringpad(crn,9), " R: 5", end='') return (dept1,code,crn) # A/B/C a = re.search('^([A-Z]+)(\d+[A-Z])([\/[A-Z]+)$', code) if a: dept1 = a.group(1) num1 = a.group(2) other = a.group(3) code = dept1+num1 if v: print("ilearn: ", stringpad(n, 35), "*dept: ", stringpad(dept1,6), ' num: ', stringpad(num1,7), ' code: ', stringpad(code,11), " crn: ", stringpad(crn,9), " R: 6", end='') return (dept1,code,crn) # AB a = re.search('^([A-Z]+)(\d+)([A-Z])([A-Z])$', code) if a: dept1 = a.group(1) num1 = a.group(2)+a.group(3) num2 = a.group(2)+a.group(4) code = dept1+num1 if v: print("ilearn: ", stringpad(n, 35), "*dept: ", stringpad(dept1,6), ' num: ', stringpad(num1,7), ' code: ', stringpad(code,11), " crn: ", stringpad(crn,9), " R: 7", end='') return (dept1,code,crn) # KIN71A-C a = re.search('^([A-Z]+)(\d+)([A-Z])\-([A-Z])$', code) if a: dept1 = a.group(1) num1 = a.group(2)+a.group(3) num2 = a.group(2)+a.group(4) code = dept1+num1 if v: print("ilearn: ", stringpad(n, 35), "*dept: ", stringpad(dept1,6), ' num: ', stringpad(num1,7), ' code: ', stringpad(code,11), " crn: ", stringpad(crn,9), " R: 7", end='') return (dept1,code,crn) return (0,0,0) def parse_ilearn_course_names_ALLSEMESTERS(): log = codecs.open('cache/log_ilearn_course_names_parsing.txt','w','utf-8') for t in [25,26,60,61,62,63,64,65,168,171,172,173,174,175,176,177,178]: parse_ilearn_course_names(str(t),1,log) def parse_ilearn_course_names(term=TERM,fresh=0,log=0): non_matches = [] courses = getCoursesInTerm(term,get_fresh=fresh) for C in courses: (dept,code,crn) = code_from_ilearn_name(C['name']) if dept == 0: non_matches.append(C['name']) continue cq_code = find_cq_course_version(code) if cq_code: # in cq_codes: line = f"{C['name']} is cq course id: {cq_code}" print(line) if log: log.write(line+"\n") else: print(f"{C['name']} - NO CQ MATCH") non_matches.append( [code,crn] ) print("Non matches:") for n in non_matches: print(n) print("can't figure out shortname for ", len(non_matches), " courses...") if __name__ == "__main__": print ('') options = { 1: ['all outcome results in a semester', all_outcome_results_in_term], 2: ['all linked outcomes/courses in a semester', all_linked_outcomes_in_term], 3: ['Main outcome show & modify for a semester', outcome_overview], 4: ['The outcome groups and links in iLearn', outcome_groups], 5: ['Outcome report #2 sample', outcome_report2], 6: ['fetch root outcome group', root_og], 7: ['recurisively fetch outcomes', recur_og], 8: ['get all outcome groups', all_og], 9: ['demo get outcomes', demo_o_fetch], 10: ['demo post outcomes to course', add_outcomes_course_code], 11: ['match ilearn courses to cq courses', parse_ilearn_course_names], } if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]): resp = int(sys.argv[1]) print("\n\nPerforming: %s\n\n" % options[resp][0]) else: print ('') for key in options: print(str(key) + '.\t' + options[key][0]) print('') resp = input('Choose: ') # Call the function in the options dict options[ int(resp)][1]()