# Outcomes 2023

# Tasks:
#
#   - List all courses (in semester) in iLearn:
#       + SLOs associated with the course
#       + Whether they are current or inactive
#       + Whether they are attached to an assessment
#       + Whether and by how many students, they have been assessed
#
#   - Fetch most current SLOs from Curricunet
#       + Assemble multiple versions of a (CQ) course and determine which semesters they apply to
#       + Whether they are present in the relevant classes in iLearn
#       + Insert SLO into course if not present
#       + Mark as inactive (change name) if necessary
#
#   - Update shell with correct outcomes

#   - Issue:
#       + Course naming / sections joined...

import concurrent.futures
import pandas as pd
from pipelines import fetch, url, header
from outcomes import quick_add_course_outcomes, code_from_ilearn_name, all_linked_outcomes_in_term
from courses import getCoursesInTerm, getCourses
import codecs, json, sys, re, csv, requests, textwrap
from path_dict import PathDict

# Shared output handles. get_outcomes_term_index() rebinds these before it
# starts the worker threads that run course_slo_getter().
outputfile = ''
csvwriter = ''

# Term id used for the course list and for every cache file name in this module.
TERM = 184


def escape_commas(s):
    """Return *s* quoted for use as one field of a comma-joined CSV line.

    A field is wrapped in double quotes (with embedded quotes doubled) when it
    contains a comma, a double quote, or a line break; any other string is
    returned unchanged.
    """
    # Quotes and line breaks break a hand-joined CSV line just like commas do.
    if any(ch in s for ch in ',"\r\n'):
        return '"' + s.replace('"', '""') + '"'
    return s


def add_outcome_to_course(shell_id=''):
    """Attach the most recent outcomes on file to one course shell.

    Prompts for the shell id when none is given, derives the course code from
    the shell name, looks up the matching course id with
    find_recent_cqcourseid(), and passes the matching rows of
    cache/courses/alloutcomes.csv (as dicts keyed by the CSV header) to
    quick_add_course_outcomes().

    Raises:
        ValueError: propagated from find_recent_cqcourseid() when the course
            code has no entries.
    """
    if shell_id == '':
        shell_id = input("Enter shell id > ")
    course = getCourses(str(shell_id))
    dept, code, crn = code_from_ilearn_name(course['name'])
    print(f"{dept} {code} {crn} for course named: {course['name']}")
    #xyz = input(f"Using: {code} for {course['name']}. Enter a different code, q to skip or press enter to continue > ")
    #if xyz == 'q':
    #    return
    #if xyz != '':
    #    code = xyz
    cq_course_id = find_recent_cqcourseid(code)

    with codecs.open('cache/courses/alloutcomes.csv', 'r', 'utf-8') as oc:
        reader = csv.reader(oc)
        cols = next(reader)  # header row supplies the dict keys
        # Keep only the outcome rows whose second column is the chosen course id.
        rows = [row for row in reader if len(row) > 1 and row[1] == cq_course_id]
    rows_dicts = [dict(zip(cols, r)) for r in rows]
    #abc = input(f"Using outcomes:\n{rows_dicts}\n\nPress enter to continue > ")

    quick_add_course_outcomes(shell_id, rows_dicts)


def course_slo_getter(q):
    """Fetch every outcome group and outcome of one course and log each outcome.

    Intended to run as a worker submitted by get_outcomes_term_index(), which
    must have bound the module-level ``outputfile`` and ``csvwriter`` first.

    Args:
        q: a ``(name, id)`` pair identifying the course.

    Returns:
        The outcome-group response for the course. When it is a list, a dict
        ``{'ilearnname', 'ilearnid'}`` is inserted at index 0 and each group
        that has an ``outcomes_url`` gains ``outcomes`` and ``full_outcomes``
        (keyed by outcome id). Otherwise the response is updated in place
        with those two keys.
    """
    global outputfile, csvwriter
    name, course_id = q
    info = {'ilearnname': name, 'ilearnid': course_id}
    print(" + Thread getting %s %s" % (str(name), str(course_id)))

    # Get GROUPS for a course
    u1 = url + "/api/v1/courses/%s/outcome_groups" % str(course_id)
    og_for_course = fetch(u1)

    if len(og_for_course):
        # There is a GROUP...
        for og in og_for_course:
            # A non-list response iterates as plain keys; only real group
            # records can carry an outcomes_url.
            if not isinstance(og, dict) or "outcomes_url" not in og:
                continue
            # There are OUTCOMES...
            outcomes = fetch(url + og["outcomes_url"])
            og['outcomes'] = outcomes
            og['full_outcomes'] = {}
            for oo in outcomes:
                print(" -> " + url + oo['outcome']['url'])
                this_outcome = fetch(url + oo['outcome']['url'])
                og['full_outcomes'][this_outcome['id']] = this_outcome

                # Plain-text log line, fields escaped by hand.
                saveme = [name, this_outcome['assessed'], course_id, this_outcome['id'],
                          this_outcome['points_possible'], this_outcome['title'],
                          this_outcome['display_name'], this_outcome['description'],
                          this_outcome['vendor_guid']]
                saveme2 = [escape_commas(str(x)) for x in saveme]

                # Compact CSV row; this is the file full_term_overview() reads.
                csvwriter.writerow([course_id, name, og['id'], this_outcome['id'],
                                    this_outcome['vendor_guid'], this_outcome['points_possible'],
                                    this_outcome['mastery_points'], this_outcome['assessed'],
                                    this_outcome['description']])
                outputfile.write(",".join(saveme2) + "\n")
                outputfile.flush()

    if isinstance(og_for_course, list):
        og_for_course.insert(0, info)
    else:
        og_for_course.update(info)
    print(" - Thread %s DONE" % str(course_id))
    return og_for_course


# I duplicate???
def _fetch_group_outcomes(course_id, group_id):
    """Return the full outcome records linked to one outcome group of a course."""
    listing = fetch(f"{url}/api/v1/courses/{course_id}/outcome_groups/{group_id}/outcomes")
    if 'errors' in listing:
        # An error response is a dict; iterating it would fail, so skip the group.
        print(f"Error: {listing}")
        return []
    found = []
    for oo in listing:
        print(f"Getting outcome id {oo['outcome']['id']}")
        outcome = fetch(f"{url}/api/v1/outcomes/{oo['outcome']['id']}")
        # Remember which group the outcome hangs off; deletion needs it.
        outcome['parent_group'] = group_id
        found.append(outcome)
    return found


def outcomes_in_shell(course_id):
    """Collect the outcomes of a course from its root group and direct subgroups.

    Returns:
        A tuple ``(root_og, the_outcomes, g2)``: the root outcome group, the
        list of full outcome records (each with a ``parent_group`` key), and
        the raw subgroup response of the root group.
    """
    print(f"Getting root outcome group for course id {course_id}")
    root_og = fetch(f"{url}/api/v1/courses/{course_id}/root_outcome_group")

    print("Getting outcomes")
    the_outcomes = _fetch_group_outcomes(course_id, root_og['id'])

    u2 = f"{url}/api/v1/courses/{course_id}/outcome_groups/{root_og['id']}/subgroups"
    g2 = fetch(u2)

    if isinstance(g2, dict):
        # Not a list of subgroups (e.g. an error payload): nothing to walk.
        if 'errors' in g2:
            print(f"Error: {g2}")
        subgroups = []
    else:
        subgroups = g2

    for subgroup in subgroups:
        print("doing subgroup id %s" % str(subgroup['id']))
        the_outcomes.extend(_fetch_group_outcomes(course_id, subgroup['id']))

    return root_og, the_outcomes, g2


def _outcome_cells(group, position, link):
    """Build the four ``o<position>_*`` cells for one linked outcome.

    Every cell defaults to ``'!'`` and is replaced only when the value is
    available, so a missing or malformed record stays visible in the output.
    """
    prefix = f"o{position}_"
    cells = {prefix + 'id': '!', prefix + 'desc': '!',
             prefix + 'assd': '!', prefix + 'vendor_guid': '!'}
    try:
        raw_id = link['outcome']['id']
        this_id = int(raw_id)
    except (KeyError, TypeError, ValueError):
        # Without a usable id no detail record can be looked up.
        return cells
    cells[prefix + 'id'] = raw_id

    details = group.get('full_outcomes', {})
    # Keys are ints in memory but become strings after a JSON round trip.
    detail = details.get(this_id, details.get(str(this_id)))
    if not isinstance(detail, dict):
        return cells

    if 'description' in detail:
        cells[prefix + 'desc'] = detail['description']
    if 'assessed' in detail:
        # Accept a real boolean as well as its string form.
        cells[prefix + 'assd'] = 1 if detail['assessed'] in (True, 'True') else 0
    if 'vendor_guid' in detail:
        cells[prefix + 'vendor_guid'] = detail['vendor_guid']
    return cells


def ilearn_shell_slo_to_csv(shell_slos):
    """Flatten per-course outcome data into cache/outcome.csv, one row per course.

    Args:
        shell_slos: results of course_slo_getter(); each item is expected to
            be a list whose first element holds ``ilearnid``/``ilearnname``
            and whose second element (if any) is the first outcome group.

    Up to ten outcomes per course are kept (columns ``o1_*`` .. ``o10_*``).
    Items that cannot be read are reported on stdout and left out.
    """
    columns = ['canvasid', 'name', 'crn', 'has_outcomes']
    for i in range(1, 11):
        columns += ["o%i_id" % i, "o%i_vendor_guid" % i, "o%i_desc" % i, "o%i_assd" % i]

    records = []
    for S in shell_slos:
        try:
            short = S[0]
            this_crs = {'canvasid': short['ilearnid'], 'name': short['ilearnname'], 'has_outcomes': 0}
            if len(S) > 1:
                full = S[1]
                this_crs['has_outcomes'] = 1
                for i, o in enumerate(full['outcomes'], start=1):
                    this_crs.update(_outcome_cells(full, i, o))
            records.append(this_crs)
        except (KeyError, IndexError, TypeError, AttributeError) as e:
            print(f"*** Exception {e} with {S}\n\n")

    # One frame built at the end; dtype=object keeps ints and '!' markers as written.
    df = pd.DataFrame(records, columns=columns, dtype=object)
    df.to_csv('cache/outcome.csv')
    print(df)


def get_outcomes_term_index():
    """Fetch the outcomes of every course in TERM and write the cache files.

    Binds the module-level ``outputfile`` and ``csvwriter`` used by
    course_slo_getter(), runs that function for each course on a thread pool,
    appends every result to cache/outcome_raw_log.txt, and finally hands all
    results to ilearn_shell_slo_to_csv().
    """
    global outputfile, csvwriter
    NUM_THREADS = 20
    get_fresh = 0

    sem_courses = getCoursesInTerm(TERM, get_fresh)

    # shorter list for test?
    #sem_courses = sem_courses[:50]

    print("Got %i courses in current semester." % len(sem_courses))

    output = []
    # Context managers make sure all three files are flushed and closed, even
    # if a worker raises.
    with codecs.open(f'cache/slo/outcomes_bycourse_{TERM}.output.txt', 'w', 'utf-8') as text_out, \
            codecs.open(f'cache/slo/linked_slos_term_{TERM}_compact.csv', 'w', 'utf-8') as csvfile, \
            codecs.open('cache/outcome_raw_log.txt', 'w', 'utf-8') as raw_log:
        outputfile = text_out
        outputfile.write("coursename,assessed,courseid,outcome_id,points,title,displayname,description,guid\n")

        csvwriter = csv.writer(csvfile)
        csvwriter.writerow('courseid coursename ogid oid vendorguid points mastery assessed desc'.split(' '))

        #raw_log.write( json.dumps(output,indent=2) )

        with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as pool:
            futures = []
            for C in sem_courses:
                print("Adding ", C['name'], C['id'], " to queue")
                futures.append(pool.submit(course_slo_getter, [C['name'], C['id']]))
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                output.append(result)
                print(result)
                raw_log.write(json.dumps(result, indent=2) + "\n")

    ilearn_shell_slo_to_csv(output)


def _as_number(text):
    """Return *text* as a float, or None when it is not numeric."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None


def classify_shell(lines):
    """Summarise the outcome rows of one course shell.

    Args:
        lines: rows for a single course, each a sequence laid out as
            courseid, coursename, ogid, oid, vendorguid, points, mastery,
            assessed, desc.

    Returns:
        A dict with ``outcome_count``, ``id``, ``name``, ``assessed_count``
        and ``points_ok`` (0 as soon as any outcome uses the old 5/3 scale,
        otherwise 1).
    """
    # given a list of lines like this, determine status of shell
    # (from linked_slos_term_180_compact.csv) outcomes.py all_linked_outcomes_in_term()
    #
    # courseid,coursename,ogid,oid,vendorguid,points,mastery,assessed,desc
    # 16909,AH11 FA23 10003/10014/12251,10860,819,,5,3,False,Use scientific facts and principles to critically analyze nutrition information and use the information to assess personal diet and the diets of other cultures.
    # 16909,AH11 FA23 10003/10014/12251,10860,820,,5,3,False,Evaluate nutrition information for accuracy and reliability.
    # 16909,AH11 FA23 10003/10014/12251,10860,821,,5,3,False,Analyze and identify the relationship between nutrition and health.
    # 16909,AH11 FA23 10003/10014/12251,10860,822,,5,3,False,Differentiate among food habits and practices related to traditional foods and preparation techniques in selected cultures or religions.
    # 16909,AH11 FA23 10003/10014/12251,10860,823,,5,3,False,Analyze nutritional problems of selected cultures and create a nutritionally balanced menu.
    #
    # 1. number of outcomes
    # 2. points are correct (max=3,mastery=2) or incorrect (max=5,mastery=3)
    # 3. assessed or not

    course_status = {'outcome_count': 0, 'id': 0, 'name': '', 'assessed_count': 0, 'points_ok': 1}

    for L in lines:
        course_status['outcome_count'] += 1
        course_status['id'] = L[0]
        course_status['name'] = L[1]

        # Compare numerically so "5" and "5.0" (or "3" and "3.0") are treated alike.
        if _as_number(L[5]) == 5 and _as_number(L[6]) == 3:
            course_status['points_ok'] = 0

        if L[7] == 'True':
            course_status['assessed_count'] += 1

    return course_status


def find_recent_cqcourseid(code):
    """Return the course id (second column) of the newest entry for *code*.

    Reads cache/courses/all_courses_ranked.csv, keeps the rows whose first
    column equals *code* (example: CSIS42) and picks the one with the
    greatest fourth column.

    Raises:
        ValueError: when no row matches *code*.
    """
    # code example: CSIS42
    with open('cache/courses/all_courses_ranked.csv', 'r') as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header
        # Filter rows matching the code (blank rows are ignored)
        rows = [row for row in reader if row and row[0] == code]
    print(f"All entries for {code}:\n{rows}")

    if not rows:
        raise ValueError(f"No rows found for code {code}")

    # Sort by 'termineffect', in descending order.
    # NOTE(review): this is a string comparison; it assumes the column sorts
    # chronologically as text -- confirm against the file's format.
    rows.sort(key=lambda row: row[3], reverse=True)

    # Return cqcourseid of the first row
    myrow = rows[0][1]
    print(f"Using: {myrow}")
    return myrow


def remove_old_outcomes(course_id=''):
    """Delete every outcome link found in a course shell.

    Prompts for the shell id when none is given, so the function can be
    started straight from the menu. Each outcome returned by
    outcomes_in_shell() is unlinked from its parent group with a DELETE
    request, and the result of each request is printed.
    """
    if course_id == '':
        course_id = input("Enter shell id > ")

    root_og, current_outcomes, subgroups = outcomes_in_shell(course_id)

    print(f"Got {len(current_outcomes)} outcomes for course id {course_id}")
    print(f"Current outcomes:\n{json.dumps(current_outcomes,indent=2)}")

    # Try deleting them
    for deleted_outcome in current_outcomes:
        print(f"Deleting outcome id {deleted_outcome['id']}")
        u9 = f"{url}/api/v1/courses/{course_id}/outcome_groups/{deleted_outcome['parent_group']}/outcomes/{deleted_outcome['id']}"
        print(u9)

        # make the DELETE request (update with your actual access token)
        response = requests.delete(u9, headers=header)

        # check the status of the request
        if response.status_code == 200:
            print(' Delete operation was successful')
        else:
            print(' Failed to delete, response code:', response.status_code)
            print(' Response message:', response.text)


def repair_outcome_points(course_id):
    """Show the outcomes a course should have (the point repair itself is unfinished).

    Looks up the course, lets the operator override the derived course code,
    prints the matching rows of cache/courses/alloutcomes.csv and then
    returns. The update request below the ``return`` is never reached.
    """
    # Compare to what Outcomes SHOULD be
    course = getCourses(course_id)
    dept, code, crn = code_from_ilearn_name(course['name'])
    xyz = input(f"Using: {code} for {course['name']}. Enter a different code or press enter to continue > ")
    if xyz != '':
        code = xyz
    cq_course_id = find_recent_cqcourseid(code)

    with codecs.open('cache/courses/alloutcomes.csv', 'r', 'utf-8') as oc:
        reader = csv.reader(oc)
        cols = next(reader)  # header row supplies the dict keys
        # Filter rows matching the course id
        rows = [row for row in reader if len(row) > 1 and row[1] == cq_course_id]
    rows_dicts = [dict(zip(cols, r)) for r in rows]
    input(f"Using outcomes:\n{json.dumps(rows_dicts,indent=2)}\n\nPress enter to continue > ")

    return

    # NOTE(review): unreachable work in progress -- outcome_id is a placeholder
    # and nothing above selects which outcome to update.
    outcome_id = 0

    data = {
        'mastery_points': '2',
        'calculation_method': 'decaying_average',
        'calculation_int': '65',
        'ratings[0][description]': 'Exceeds Expectations',
        'ratings[0][points]': '3',
        'ratings[1][description]': 'Meets Expectations',
        'ratings[1][points]': '2',
        'ratings[2][description]': 'Does Not Meet Expectations',
        'ratings[2][points]': '0'
    }

    response = requests.put(f'{url}/api/v1/outcomes/{outcome_id}.json', headers=header, data=data)

    if response.status_code == 200:
        print(f"Successfully updated outcome with id {outcome_id}.")
    else:
        print(f"Failed to update outcome with id {outcome_id}. Error: {response.text}")


def _add_outcomes_to_shells(shells, dry_run=0):
    """Add outcomes to each shell in dept/code order, reporting failures and moving on."""
    ordered = sorted(shells, key=lambda x: f"{x['dept']}{x['code']}")
    print(f"Adding to {len(ordered)} shells.")

    for shell in ordered:
        print(f"Adding outcomes to {shell['name']}")
        if dry_run:
            print(" Dry run, not adding")
            continue
        try:
            add_outcome_to_course(shell['id'])
        except Exception as e:
            # Best effort: one bad shell must not stop the rest of the batch.
            print(f"Failed on {shell['id']}: {e}")


def add_o_dept_dry_run():
    """Run add_o_dept() in dry-run mode: list the shells without changing them."""
    add_o_dept(1)


def add_o_whole_term():
    """Add outcomes to every shell of the term that currently has none."""
    course_groups = full_term_overview(0)
    _add_outcomes_to_shells(list(course_groups['no outcomes']))


def add_o_dept(dry_run=0):
    """Add outcomes to the outcome-less shells of the departments typed by the operator.

    Args:
        dry_run: when truthy, only print which shells would be touched.
    """
    d = input("Enter dept or deps separated with a space > ")
    # split() without an argument ignores repeated or surrounding spaces.
    d_list = d.split()

    course_groups = full_term_overview(0)
    dept_shells_to_add = [a for a in course_groups['no outcomes'] if a['dept'] in d_list]
    _add_outcomes_to_shells(dept_shells_to_add, dry_run)


def remove_all_bad_points():
    """Remove the outcomes of every shell classified as 'fix_points'."""
    course_groups = full_term_overview(0)

    for shell in list(course_groups['fix_points']):
        print(f"Removing outcomes from {shell['name']}")
        remove_old_outcomes(shell['id'])


def full_term_overview(verbose=1):
    """Classify every course of TERM by the state of its outcomes.

    Reads the cached course list and the compact outcomes CSV written by
    get_outcomes_term_index(), classifies each course with classify_shell()
    and writes the grouping to cache/slo/slo_status_<TERM>.json.

    Args:
        verbose: when truthy, also print the groups and write
            cache/slo/slo_status_<TERM>.csv. The CSV is left untouched
            otherwise.

    Returns:
        A dict mapping 'no outcomes', 'ok', 'fix_points_and_scores' and
        'fix_points' to lists of per-course status dicts.
    """
    csv_fields = 'outcome_count,id,name,dept,code,crn,assessed_count,points_ok'.split(',')

    fn1 = f"cache/courses_in_term_{TERM}.json"
    with codecs.open(fn1, 'r', 'utf-8') as course_file:
        all_courses = json.loads(course_file.read())
    all_courses_status = {}

    # default values for all courses
    for C in all_courses:
        dept, code, crn = code_from_ilearn_name(C['name'])
        all_courses_status[str(C['id'])] = {'outcome_count': 0, 'id': C['id'], 'name': C['name'],
                                            'dept': dept, 'code': code, 'crn': crn,
                                            'assessed_count': 0, 'points_ok': 1}

    # read the existing ilearn outcomes and group by shell
    filename = f"cache/slo/linked_slos_term_{TERM}_compact.csv"
    # The file is written as UTF-8, so read it back the same way.
    with open(filename, 'r', encoding='utf-8', newline='') as csvfile:
        reader = csv.reader(csvfile)
        next(reader, None)  # skip header
        rows = [row for row in reader if row]  # drop blank lines

    # Group rows by course id (column 0), visiting the ids in sorted order.
    groups = {}
    for row in sorted(rows, key=lambda x: x[0]):
        groups.setdefault(row[0], []).append(row)

    for g in groups.values():
        classified = classify_shell(g)
        dept, code, crn = code_from_ilearn_name(g[0][1])
        classified['dept'] = dept
        classified['code'] = code
        classified['crn'] = crn
        all_courses_status[str(classified['id'])] = classified

    #for C in all_courses_status:
    #    print(all_courses_status[C])

    course_groups = {
        'no outcomes': [],
        'ok': [],
        'fix_points_and_scores': [],
        'fix_points': []
    }

    for d in all_courses_status.values():
        outcome_count = d['outcome_count']
        points_ok = d['points_ok']
        assessed_count = d['assessed_count']

        if outcome_count == 0:
            course_groups['no outcomes'].append(d)
        elif points_ok in (0, 1) and assessed_count > 0:
            # NOTE(review): assessed shells land here even when their points
            # are fine (points_ok == 1); kept as found -- confirm it is intended.
            course_groups['fix_points_and_scores'].append(d)
        elif points_ok == 1:
            course_groups['ok'].append(d)
        elif points_ok == 0:
            course_groups['fix_points'].append(d)

    with codecs.open(f'cache/slo/slo_status_{TERM}.json', 'w', 'utf-8') as out2:
        out2.write(json.dumps(course_groups, indent=2))

    # Print out the groups
    if verbose:
        # Opened only here so that quiet callers do not wipe the last report.
        with codecs.open(f'cache/slo/slo_status_{TERM}.csv', 'w', 'utf-8') as out3:
            cwriter = csv.DictWriter(out3, fieldnames=csv_fields)
            cwriter.writeheader()
            for group, dicts in course_groups.items():
                sorted_dicts = sorted(dicts, key=lambda x: f"{x['dept']}{x['code']}")
                print(f"{group} - {len(sorted_dicts)} item(s)")
                cwriter.writerows(sorted_dicts)
                # out3.write(f"{group} - {len(sorted_dicts)} item(s)\n")
                for d in sorted_dicts:
                    print(d)
                    #out3.write(str(d) + "\n")
                print("\n")
                out3.write("\n")

    return course_groups


def fetch_term_outcomes_and_report():
    """Refresh the term's outcome cache, then print and save the overview."""
    get_outcomes_term_index()
    full_term_overview()


if __name__ == "__main__":
    options = {
        1: ['Refresh term outcome list & report', fetch_term_outcomes_and_report],
        2: ['Add outcomes to unset courses in whole term', add_o_whole_term],
        3: ['Add outcomes to course id', add_outcome_to_course],
        4: ['Fix outcome points', remove_old_outcomes],
        5: ['Add outcomes to dept, dry run', add_o_dept_dry_run],
        6: ['Add outcomes to dept', add_o_dept],
        7: ['Remove all outcomes with wrong points', remove_all_bad_points],
    }
    print('')

    # An all-digit first argument selects the option directly; anything else
    # falls back to the interactive menu.
    from_argv = len(sys.argv) > 1 and re.fullmatch(r'\d+', sys.argv[1]) is not None
    if from_argv:
        resp = sys.argv[1]
    else:
        print('')
        for key in options:
            print(str(key) + '.\t' + options[key][0])

        print('')
        resp = input('Choose: ')

    try:
        label, action = options[int(resp)]
    except (KeyError, ValueError):
        sys.exit(f"Unknown option: {resp}")

    if from_argv:
        print("\n\nPerforming: %s\n\n" % label)

    # Call the function in the options dict
    action()