# statistics

"""
## Investigate: Success rates (grades) of students in:

- online courses (over all)
- sync and async and online live
- teachers/courses that have passed POCR (are all async?)
- teachers that have done more than the minimum training in online teaching
- in person classes, if grades are available

## Data collection

- Choose how many semesters (10?)
- Script 1 - given a CRN and Semester, download all grades
    - Check if grades were used and make sense
    - Compute mean, % > 70, median, etc.
- Script 2 - given all semester schedules, generate lists of:
    - CRNs which are online, online live, hybrid, inperson, excluded
    - CRNs in which teacher and course have passed pocr (and semester is greater than their pass date)
    - CRNs in which teacher passed pocr for a different course (and semester is greater than their pass date)
    - CRNs to exclude, for example SP20, because of covid. Possibly SU20 and FA20
    - CRNs in which teacher has done more than the minimum training in online teaching
- Next steps: generate the x-reference for what categories teachers are in, and integrate into the main data file.

## Hypothesis Testing

-
"""

import codecs
import csv
import json
import os
import re
import sys
from collections import defaultdict
from itertools import groupby
from multiprocessing import Semaphore
from statistics import mean, median, stdev

import requests

from pipelines import fetch, url
from courses import getCoursesInTerm


all_grades_file = "cache/grades_all.csv"
all_courses_file = "cache/course_grades_all.csv"

# Two-digit term suffix of a numeric semester (e.g. the "70" in "202370")
# mapped to the two-letter season used in short codes (e.g. "fa23").
_SUFFIX_TO_SEASON = {'10': 'wi', '30': 'sp', '50': 'su', '70': 'fa'}
_SEASON_TO_SUFFIX = {'wi': '10', 'sp': '30', 'su': '50', 'fa': '70'}

# Numeric semesters processed by get_all() and printed by codetest().
_SEM_NUMS = '202330 202310 202270 202250 202230 202210 202170 202150 202130 202070 202050 202030 202010 201970 201950 201930 201910 201870 201850 201830'.split(' ')

# Term ids passed to getCoursesInTerm(), positionally paired with _SEM_NUMS.
_TERM_IDS = '178 177 176 175 174 173 172 171 168 65 64 62 63 61 60 25 26 23 22 21'.split(' ')


def num(s):
    """Convert a CSV score field to a number.

    An empty string (no score recorded) becomes 0. Anything else is parsed
    as an int, falling back to float. Raises ValueError if neither parse
    succeeds.
    """
    if s == '':
        return 0
    try:
        return int(s)
    except ValueError:
        return float(s)


def sem_num_to_code(sem_num):
    """Convert a numeric semester such as "202370" to a short code such as "fa23".

    Returns "" when *sem_num* is not exactly six digits. Raises KeyError
    when the last two digits are not one of 10, 30, 50 or 70.
    """
    p = re.search(r'^(\d\d\d\d)(\d\d)$', sem_num)
    if not p:
        return ""
    yr = p.group(1)[2:4]
    return f"{_SUFFIX_TO_SEASON[p.group(2)]}{yr}"


def sem_code_to_num(sem_code):
    """Convert a short code such as "fa23" to a numeric semester such as "202370".

    Returns "" when *sem_code* is not two lowercase letters followed by two
    digits. Raises KeyError when the letters are not wi, sp, su or fa. The
    century is always assumed to be "20".
    """
    p = re.search(r'^([a-z]{2})(\d\d)$', sem_code)
    if not p:
        return ""
    season = p.group(1)
    yr = p.group(2)
    return f"20{yr}{_SEASON_TO_SUFFIX[season]}"


def codetest():
    """Print the results of both semester conversions for a set of sample values."""
    codes = 'fa21 wi22 sp23 su23 fa23 wi24'.split(' ')
    for s in _SEM_NUMS:
        print("{}: {}".format(s, sem_num_to_code(s)))
    for c in codes:
        print("{}: {}".format(c, sem_code_to_num(c)))


def get_all():
    """Download grades for every course in every configured term into all_grades_file.

    The output file is overwritten. It is flushed after each course so that
    partial results survive an interrupted run.
    """
    # Save grades to a CSV file
    with open(all_grades_file, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["crn", "sem", "coursecode", "s_can_id", "g", "name", "current", "final"])

        for term, sem in zip(_TERM_IDS, _SEM_NUMS):
            print(term, sem, "\n")
            courses = getCoursesInTerm(term, get_fresh=0, show=0, active=1)
            for c in courses:
                print(c['name'])
                c_code = c['course_code']
                grades(writer, sem, c['id'], c_code)
                csvfile.flush()


def grades(writer, sem, COURSE_ID, course_code):
    """Fetch the users of one course and write a CSV row per student enrollment.

    Args:
        writer: csv writer that receives the rows.
        sem: semester value written to the "sem" column.
        COURSE_ID: id used in the /api/v1/courses/<id>/users request and
            written to the first column.
        course_code: value written to the "coursecode" column.

    Only users whose first listed enrollment has type 'StudentEnrollment'
    produce a row. Assumes fetch() returns an iterable of user dicts - TODO
    confirm against pipelines.fetch.
    """
    params = {"include[]": ["enrollments", "current_grading_period_scores"]}
    students = fetch(url + f"/api/v1/courses/{COURSE_ID}/users", 0, params)
    for student in students:
        # Best effort per student: a record with missing fields is reported
        # and skipped rather than aborting the whole download.
        try:
            student_id = student["id"]
            name = student["name"]
            login = student["login_id"]
            print("\t", name)
            enrollment = student['enrollments'][0]
            if enrollment['type'] == 'StudentEnrollment':
                final = enrollment["grades"]["final_score"]
                current = enrollment["grades"]["current_score"]
                writer.writerow([COURSE_ID, sem, course_code, student_id, login, name, current, final])
        except Exception as e:
            print("Exception:", e)


# Semester short code -> parsed schedule JSON, filled lazily by load_schedules().
schedules = {}


def load_schedules():
    """Populate the module-level ``schedules`` dict from cache/schedule, once.

    Every file whose name contains "<xxNN>_sched_expanded.json" is parsed
    as UTF-8 JSON and stored under the "<xxNN>" part of its name. Does
    nothing when ``schedules`` already has content.
    """
    if schedules:
        return
    for f in os.listdir('cache/schedule'):
        m = re.search(r'(\w\w\d\d)_sched_expanded\.json', f)
        if m:
            # Context manager so the handle is closed deterministically.
            with codecs.open('cache/schedule/' + f, 'r', 'utf-8') as fh:
                schedules[m.group(1)] = json.loads(fh.read())


def to_crn_fallback(name):
    """Find a CRN and semester code anywhere inside *name*.

    Returns (crn, sem): the first run of five digits and the first match of
    two letters from "wispufa" followed by two digits, searched in the
    lower-cased name. Returns (None, None) when either is missing.
    """
    name = name.lower()
    m_crn = re.search(r'(\d\d\d\d\d)', name)
    if not m_crn:
        return None, None
    m_sem = re.search(r'([wispufa][wispufa]\d\d)', name)
    if not m_sem:
        return None, None
    return m_crn.group(1), m_sem.group(1)


def ilearn_name_to_course_code(iname):
    """Return the text before the first space of *iname*."""
    return iname.split(' ')[0]


def short_name_to_crn(name):
    """Parse a course short name of the form "<code> <sem> <crn>".

    Returns (crn, sem). The third word may be a single five-digit CRN or
    several separated by '/', in which case the first one is used. Any
    name that does not fit this layout is handed to to_crn_fallback().
    """
    parts = name.split(' ')
    if len(parts) < 3:
        return to_crn_fallback(name)
    sem = parts[1]
    crn = parts[2]

    if not re.search(r'^(\w\w\d\d)$', sem):
        return to_crn_fallback(name)

    if re.search(r'^(\d\d\d\d\d)$', crn):
        return crn, sem

    first_crn = crn.split('/')[0]
    if re.search(r'^(\d\d\d\d\d)$', first_crn):
        return first_crn, sem

    # Non-standard course short name.
    return to_crn_fallback(name)


def fixname(n):
    """Collapse runs of whitespace in *n* to single spaces and strip the ends."""
    return re.sub(r'\s+', ' ', n).strip()


def short_name_to_teacher_type_crn_sem(name):
    """Look up the schedule entry for a course short name.

    Returns (teacher, type, crn, sem) from the first schedule entry of that
    semester whose 'crn' equals the parsed CRN, or four Nones when the name
    cannot be parsed, the semester has no loaded schedule, or no entry
    matches. The returned sem is lower-cased.
    """
    load_schedules()
    crn, sem = short_name_to_crn(name)
    if not sem:
        return None, None, None, None

    sem = sem.lower()
    # Winter codes are looked up in the spring schedule of the same year
    # (presumably winter sections are listed there - TODO confirm).
    if sem[0:2] == 'wi':
        sem = 'sp' + sem[2:]

    try:
        for course in schedules.get(sem, []):
            if course['crn'] == crn:
                return fixname(course['teacher']), course['type'], crn, sem
    except (KeyError, TypeError):
        # Malformed schedule entry: treat the course as not found.
        return None, None, None, None
    return None, None, None, None


# "<teacher> <course>" -> semester code in which POCR was passed;
# filled lazily by load_pocrs().
pocrs = {}


def load_pocrs():
    """Load cache/pocr_passed.csv into the module-level ``pocrs`` dict, once.

    The first row is skipped as a header. For every other row the key is
    column 0 and column 1 joined by a space, and the value is column 2.
    Rows with fewer than three columns are ignored. Returns ``pocrs``.
    """
    if not pocrs:
        with open('cache/pocr_passed.csv', newline="") as csvfile:
            csvreader = csv.reader(csvfile)
            next(csvreader, None)
            for row in csvreader:
                if len(row) < 3:
                    continue
                pocrs[row[0] + " " + row[1]] = row[2]
    return pocrs


def lookup_pocr(teacher, course, sem):
    """Return True when *teacher* passed POCR for *course* before semester *sem*.

    Both semester codes are converted with sem_code_to_num() and compared
    as strings, which orders correctly because valid results are six-digit
    strings. The pass semester must be strictly earlier than *sem*.

    NOTE(review): a stored pass semester that does not match the short-code
    pattern converts to "" and therefore compares as earlier than any valid
    semester - confirm this is the intended treatment.
    """
    p = load_pocrs()
    pcode = teacher + " " + course
    if pcode in p:
        sem_passed = sem_code_to_num(p[pcode])
        sem_test = sem_code_to_num(sem)
        if sem_passed < sem_test:
            return True
    return False


def nametest():
    """Print the schedule lookup result for the first column of every data row
    in all_courses_file."""
    with open(all_courses_file, newline="") as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader, None)  # header
        for row in csvreader:
            if not row:
                continue
            print(row[0], "-", short_name_to_teacher_type_crn_sem(row[0]))


def above_70(li, maximum):
    """Return the fraction of values in *li* that are at least 70% of *maximum*.

    The result is rounded to three decimals. An empty list yields 0.0.
    """
    if not li:
        return 0.0
    cutoff = 0.7 * maximum
    passing = sum(1 for x in li if x >= cutoff)
    return round(passing / len(li), 3)


def process_one_course_grades(block, output, teacher_to_code, course_to_code):
    """Summarise the final scores of one course and write a single output row.

    Args:
        block: rows of all_grades_file belonging to one course; column 2 is
            the course short name and column 7 the final score.
        output: csv writer receiving the summary row.
        teacher_to_code: teacher name -> anonymised numeric code.
        course_to_code: course short name -> anonymised numeric code.

    Nothing is written when the course is not found in the schedules, when
    it has fewer than two rows, or when its rounded maximum final score is 0.
    Scores are scaled by the maximum final score rounded to an integer
    before the mean, median, stdev, min and max are taken.
    """
    course_code = block[0][2]
    final_scores = [num(x[7]) for x in block]

    teacher, mode, _crn, sem2 = short_name_to_teacher_type_crn_sem(course_code)
    if not teacher:
        return
    tch_code = teacher_to_code[teacher]
    crs_code = course_to_code[course_code]

    # stdev needs at least two data points.
    if len(final_scores) < 2:
        return

    # Best effort per course: a failure here is reported and the course is
    # skipped so the rest of the file is still processed.
    try:
        final_max = round(max(final_scores))
        final_count = len(final_scores)
        final_pct_passed = above_70(final_scores, final_max)

        if final_max == 0:
            return

        scaled = [x / final_max for x in final_scores]
        scl_mean, scl_median, scl_stdev, scl_min, scl_max = [
            round(f(scaled), 2) for f in (mean, median, stdev, min, max)
        ]

        good_code = ilearn_name_to_course_code(course_code)
        pocr = 1 if lookup_pocr(teacher, good_code, sem2) else 0

        output.writerow([crs_code, good_code, pocr, tch_code, mode, final_pct_passed,
                         scl_mean, scl_median, scl_stdev, scl_min, scl_max, final_count])
    except Exception as e:
        print("Exception:", e)


def process_grades():
    """Turn all_grades_file into per-course summary rows in all_courses_file.

    A first pass assigns numeric codes to every course short name (from
    4001) and every teacher found in the schedules (from 1001), and saves
    both lookup tables as JSON under cache/. A second pass groups
    consecutive rows sharing the same first column and passes each group to
    process_one_course_grades().
    """
    # first loop to get all names
    courses_labeled = {}
    teacher_to_code = {}
    code_to_teacher = {}
    course_to_code = {}
    code_to_course = {}
    tch_index = 1001
    crs_index = 4001

    with open(all_grades_file, newline="") as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader, None)  # header
        for row in csvreader:
            if not row:
                continue
            crn_sem = row[0] + '_' + row[1]
            if crn_sem in courses_labeled:
                continue
            teacher = short_name_to_teacher_type_crn_sem(row[2])[0]
            courses_labeled[crn_sem] = teacher

            if row[2] not in course_to_code:
                course_to_code[row[2]] = crs_index
                code_to_course[crs_index] = row[2]
                crs_index += 1

            if teacher and teacher not in teacher_to_code:
                teacher_to_code[teacher] = tch_index
                code_to_teacher[tch_index] = teacher
                tch_index += 1

    with codecs.open('cache/teacher_lookup_codes.json', 'w', 'utf-8') as fh:
        fh.write(json.dumps([teacher_to_code, code_to_teacher], indent=2))
    with codecs.open('cache/course_lookup_codes.json', 'w', 'utf-8') as fh:
        fh.write(json.dumps([course_to_code, code_to_course], indent=2))

    with open(all_courses_file, "w", newline="") as output_f:
        output = csv.writer(output_f)
        output.writerow("course_code course pocr_status teacher_code mode percent_passed scl_mean scl_median scl_stdev scl_min scl_max count".split(" "))

        with open(all_grades_file, newline="") as csvfile:
            csvreader = csv.reader(csvfile)
            next(csvreader, None)  # header
            data_rows = (row for row in csvreader if row)
            # groupby only merges adjacent rows, matching how the file is
            # written: one course after another.
            for _course_id, rows in groupby(data_rows, key=lambda r: r[0]):
                process_one_course_grades(list(rows), output, teacher_to_code, course_to_code)


if __name__ == "__main__":
    options = {
        1: ['get all historical grades from ilearn', get_all],
        2: ['process grades csv file', process_grades],
        3: ['test shortname parse', nametest],
        4: ['test sem codes', codetest],
    }
    print('')

    # Only a fully numeric argument selects an option directly; anything
    # else falls through to the interactive menu.
    if len(sys.argv) > 1 and re.search(r'^\d+$', sys.argv[1]):
        resp = int(sys.argv[1])
        print("\n\nPerforming: %s\n\n" % options[resp][0])
    else:
        print('')
        for key in options:
            print(str(key) + '.\t' + options[key][0])
        print('')
        resp = input('Choose: ')

    # Call the function in the options dict
    options[int(resp)][1]()