# statistics
"""
## Investigate: Success rates (grades) of students in:
- online courses (over all)
- sync and async and online live
- teachers/courses that have passed POCR (are all async?)
- teachers that have done more than the minimum training in online teaching
- in person classes, if grades are available

## Data collection
- Choose how many semesters (10?)
- Script 1
  - given a CRN and Semester, download all grades
  - Check if grades were used and make sense
  - Compute mean, % > 70, median, etc.
  - Anonymization steps
    - replace teacher names w/ id number
    - replace student names w/ id number
    - replace course names w/ course code
- Script 2 - given all semester schedules, generate lists of:
  - CRNs which are online, online live, hybrid, inperson, excluded
  - CRNs in which teacher and course have passed pocr (and semester is greater
    than their pass date)
  - CRNs in which teacher passed pocr for a different course (and semester is
    greater than their pass date)
  - CRNs to exclude, for example SP20, because of covid. Possibly SU20 and FA20
  - CRNs with are POCR approved
  - CRNs in which teacher has done more than the minimum training in online teaching
  - Student ids which have participated in the online orientation over a certain
    threshold
- Next steps: generate the x-reference for what categories teachers are in,
  and integrate into the main data file.

## Hypothesis Testing
-
"""
import codecs, os
import json, csv, requests, sys, re
from multiprocessing import Semaphore
from statistics import mean, median, stdev
from pipelines import fetch, url
from courses import getCoursesInTerm, course_enrollment
from localcache import get_course_enrollments
from collections import defaultdict

# Cache/output file locations (one row per student/course in the raw file,
# then progressively aggregated versions).
all_grades_file = "cache/grades_all.csv"
all_courses_file = "cache/course_grades_all.csv"
all_courses_file2 = "cache/course_grades_compact.csv"
all_courses_file3 = "cache/course_grades_full.csv"
student_orientation_participation = 'cache/participation_orientation_courses.json'


def num(s):
    """Parse a CSV cell into a number.

    Empty string maps to 0; prefers int, falls back to float.
    """
    if s == '':
        return 0
    try:
        return int(s)
    except ValueError:
        return float(s)


def sem_num_to_code(sem_num):
    """Convert a numeric semester like '202330' to a short code like 'sp23'.

    Returns '' when the input does not match YYYYMM with a known month code.
    """
    p = re.search(r'^(\d\d\d\d)(\d\d)$', sem_num)
    if p:
        yr = p.group(1)[2:4]
        sem = p.group(2)
        lookup = {'10': 'wi', '30': 'sp', '50': 'su', '70': 'fa'}
        return f"{lookup[sem]}{yr}"
    return ""


def sem_code_to_num(sem_code):
    """Convert a short semester code like 'fa23' to numeric form '202370'.

    Returns '' when the input does not match two letters + two digits.
    """
    p = re.search(r'^([a-z]{2})(\d\d)$', sem_code)
    if p:
        s = p.group(1)
        y = p.group(2)
        lookup = {'wi': '10', 'sp': '30', 'su': '50', 'fa': '70'}
        return f"20{y}{lookup[s]}"
    return ""


def codetest():
    """Eyeball-test the two semester-code conversion helpers."""
    sems = ('202330 202310 202270 202250 202230 202210 202170 202150 202130 '
            '202070 202050 202030 202010 201970 201950 201930 201910 201870 '
            '201850 201830').split(' ')
    codes = 'fa21 wi22 sp23 su23 fa23 wi24'.split(' ')
    for s in sems:
        print("{}: {}".format(s, sem_num_to_code(s)))
    for c in codes:
        print("{}: {}".format(c, sem_code_to_num(c)))


def get_all():
    """Download grades for every course in each configured term into a CSV.

    `terms` are Canvas term ids; `sems` are the matching Banner-style semester
    numbers (zipped pairwise, so the lists must stay in the same order).
    """
    terms = '178 177 176 175 174 173 172 171 168 65 64 62 63 61 60 25 26 23 22 21'.split(' ')
    sems = ('202330 202310 202270 202250 202230 202210 202170 202150 202130 '
            '202070 202050 202030 202010 201970 201950 201930 201910 201870 '
            '201850 201830').split(' ')
    # Save grades to a CSV file
    with open(all_grades_file, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["crn", "sem", "coursecode", "s_can_id", "g", "name",
                         "current", "final"])
        for (term, sem) in zip(terms, sems):
            print(term, sem, "\n")
            courses = getCoursesInTerm(term, get_fresh=0, show=0, active=1)
            for c in courses:
                print(c['name'])
                c_code = c['course_code']
                grades(writer, sem, c['id'], c_code)
                # Flush after each course so a crash loses at most one course.
                csvfile.flush()


def grades(writer, sem, COURSE_ID, course_code):
    """Fetch one course's roster with grades and append rows to `writer`.

    Writes [course id, semester, course code, student id, login, name,
    current score, final score] for each StudentEnrollment.
    """
    params = {"include[]": ["enrollments", "current_grading_period_scores"]}
    roster = fetch(url + f"/api/v1/courses/{COURSE_ID}/users", 0, params)
    for student in roster:
        try:
            student_id = student["id"]
            name = student["name"]
            g = student["login_id"]
            print("\t", name)
            # Only the first enrollment is inspected; teachers/TAs are skipped.
            if student['enrollments'][0]['type'] == 'StudentEnrollment':
                grade = student["enrollments"][0]["grades"]["final_score"]
                current = student["enrollments"][0]["grades"]["current_score"]
                writer.writerow([COURSE_ID, sem, course_code, student_id, g,
                                 name, current, grade])
        except Exception as e:
            # Best effort: a malformed record should not abort the whole course.
            print("Exception:", e)


def get_student_orientations():
    """Tally orientation-course page views per student, cache result to JSON.

    Per-course page-view maps are cached individually so re-runs only hit the
    API for courses not yet fetched.
    """
    # Orientation course name -> Canvas course id (enrollment counts noted).
    courses = {'iLearn Student Orientation 2022': '9768',            # 8170 students
               'Kickstart Online Orientation - Transfer': '36',      # 6149
               'Kickstart Online Orientation - New to College': '35',  # 5392
               'LIB732 SP18': '3295',                                # 2193
               'LIB732 FA17': '2037',                                # 1868
               'LIB732 SP17': '69',                                  # 1645
               'Kickstart Online Orientation - Returning': '37',     # 1463
               'iLearn Student Orientation 2023': '15924',           # 1292
               'LIB732 SU17': '1439'                                 # 1281
               }
    views_bycourse = {}
    all_student_ids = set()
    # get pageviews of each orientation course
    for c, i in courses.items():
        print(c)
        cache_file_name = f'cache/participation_course_{i}.json'
        student_ids = [x[1] for x in get_course_enrollments(i)]
        all_student_ids.update(student_ids)
        if os.path.exists(cache_file_name):
            with codecs.open(cache_file_name, 'r', 'utf-8') as fh:
                pv = json.loads(fh.read())
        else:
            pv = get_student_page_views(i, student_ids)
            with codecs.open(cache_file_name, 'w', 'utf-8') as fh:
                fh.write(json.dumps(pv, indent=2))
        views_bycourse[i] = pv
    # add up pageviews for each student across all orientation courses
    views_bystudent = {}
    for student_id in all_student_ids:
        views_bystudent[student_id] = sum(
            views_bycourse[i].get(student_id, 0) for i in courses.values())
    with codecs.open(student_orientation_participation, 'w', 'utf-8') as fh:
        fh.write(json.dumps(views_bystudent, indent=2))


def get_student_page_views(course_id, student_ids):
    """Return {student_id: total page views} for one course via the
    Canvas analytics activity endpoint."""
    page_views = {}
    verbose = 0
    for student_id in student_ids:
        a = f'/api/v1/courses/{course_id}/analytics/users/{student_id}/activity'
        response = fetch(url + a, verbose)
        # 'page_views' is a mapping (presumably timestamp -> count); sum the
        # counts for a single per-student total.
        page_views[student_id] = sum(response.get('page_views', {}).values())
        if verbose:
            print(page_views)
    return page_views


# Lazily-loaded module caches (see load_schedules / load_orientations).
schedules = {}
orientations = {}


def load_schedules():
    """Populate the module-level `schedules` dict from cached schedule JSON.

    Keys are semester codes (e.g. 'fa23') taken from the file names.
    No-op once loaded.
    """
    global schedules
    if not schedules:
        for f in os.listdir('cache/schedule'):
            m = re.search(r'(\w\w\d\d)_sched_expanded\.json', f)
            if m:
                sem = m.group(1)
                with codecs.open('cache/schedule/' + f, 'r', 'utf-8') as fh:
                    schedules[sem] = json.loads(fh.read())


def load_orientations():
    """Load (once) and return the cached per-student orientation view counts."""
    global orientations
    if not orientations:
        with codecs.open(student_orientation_participation, 'r', 'utf-8') as fh:
            orientations = json.loads(fh.read())
    return orientations


def to_crn_fallback(name):
    """Loosely extract (crn, sem) from a course name that didn't parse cleanly.

    Returns (None, None) when either piece can't be found.
    """
    name = name.lower()
    try:
        m1 = re.search(r'(\d\d\d\d\d)', name)
        if m1:
            crn = m1.group(1)
        else:
            return None, None
        # Two letters drawn from the semester-code alphabet followed by a year.
        m2 = re.search(r'([wispufa][wispufa]\d\d)', name.lower())
        if m2:
            sem = m2.group(1)
        else:
            return None, None
        return crn, sem
    except Exception:
        return None, None


def ilearn_name_to_course_code(iname):
    """First whitespace-delimited token of an iLearn course name, e.g. 'ENG1A'."""
    return iname.split(' ')[0]


def short_name_to_crn(name):
    """Parse a standard 'CODE SEM CRN' short name into (crn, sem).

    Handles CRNs written as 'nnnnn/nnnnn' (cross-listed); anything
    non-standard goes through to_crn_fallback().
    """
    try:
        parts = name.split(' ')
        code = parts[0]
        sem = parts[1]
        crn = parts[2]
        m_sem = re.search(r'^(\w\w\d\d)$', sem)
        if not m_sem:
            return to_crn_fallback(name)
        m = re.search(r'^(\d\d\d\d\d)$', crn)
        if m:
            return crn, sem
        else:
            crn_parts = crn.split('/')
            m = re.search(r'^(\d\d\d\d\d)$', crn_parts[0])
            if m:
                return crn_parts[0], sem
        return to_crn_fallback(name)
    except Exception:
        return to_crn_fallback(name)


def fixname(n):
    """Collapse internal whitespace runs and trim the ends of a name."""
    return re.sub(r'\s+', ' ', n).strip()


def short_name_to_teacher_type_crn_sem(name):
    """Resolve a course short name to (teacher, type, crn, sem) via schedules.

    Winter semesters are folded into spring ('wiNN' -> 'spNN') — presumably
    winter sessions appear on the spring schedule. Returns four Nones when the
    course can't be found.
    """
    load_schedules()
    crn, sem = short_name_to_crn(name)
    try:
        if sem:
            sem = sem.lower()
            if sem[0:2] == 'wi':
                sem = 'sp' + sem[2:]
            for course in schedules[sem]:
                if course['crn'] == crn:
                    return fixname(course['teacher']), course['type'], crn, sem
    except Exception:
        return None, None, None, None
    return None, None, None, None


# Lazily-loaded POCR pass list: "teacher course" -> semester code passed.
pocrs = {}


def load_pocrs():
    """Load (once) and return the POCR pass list from cache/pocr_passed.csv.

    CSV columns: teacher, course, semester-passed (header row skipped).
    """
    global pocrs
    if not pocrs:
        with open('cache/pocr_passed.csv') as csvfile:
            csvreader = csv.reader(csvfile)
            next(csvreader)  # skip header
            for row in csvreader:
                pocrs[row[0] + " " + row[1]] = row[2]
    return pocrs


def lookup_pocr(teacher, course, sem):
    """True when `teacher` passed POCR for `course` strictly before `sem`."""
    p = load_pocrs()
    pcode = teacher + " " + course
    if pcode in p:
        sem_passed = sem_code_to_num(p[pcode])
        sem_test = sem_code_to_num(sem)
        if sem_passed < sem_test:
            return True
    return False


def nametest():
    """Print each course name alongside its parsed (teacher, type, crn, sem)."""
    with open(all_courses_file) as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)  # skip header
        for row in csvreader:
            print(row[0], "-", short_name_to_teacher_type_crn_sem(row[0]))
        # NOTE: a stray trailing next(csvreader) used to raise StopIteration
        # once the file was exhausted; removed.


def above_70(li, maximum):
    """Fraction (3 decimals) of values in `li` at or above 70% of `maximum`."""
    if not li:
        return 0.0
    cutoff = 0.7 * maximum
    above = [x for x in li if x >= cutoff]
    return round(len(above) / len(li), 3)


# v1, does a row of averages for each course
def process_one_course_grades(block, output, out_c, teacher_to_code, course_to_code):
    """Aggregate one course's grade rows and write one summary row each to
    `output` (full stats) and `out_c` (compact stats).

    `block` is a list of raw CSV rows for a single course; courses with an
    unknown teacher, fewer than 2 scores, or a zero max score are skipped.
    """
    fxns = [mean, median, stdev, min, max, len]
    course_code = block[0][2]
    final_scores = [num(x[7]) for x in block]
    teacher, mode, crn, sem2 = short_name_to_teacher_type_crn_sem(course_code)
    if not teacher:
        return
    tch_code = teacher_to_code[teacher]
    crs_code = course_to_code[course_code]
    if len(final_scores) < 2:
        return
    try:
        (final_mean, final_median, final_stdev, final_min, final_max,
         final_count) = [round(f(final_scores)) for f in fxns]
        final_pct_passed = above_70(final_scores, final_max)
        if final_max == 0:
            return
        # Normalize to the course's own max so courses are comparable.
        scaled_final_scores = [x / final_max for x in final_scores]
        (scl_mean, scl_median, scl_stdev, scl_min, scl_max,
         scl_count) = [round(f(scaled_final_scores), 2) for f in fxns]
        good_code = ilearn_name_to_course_code(course_code)
        pocr = 1 if lookup_pocr(teacher, good_code, sem2) else 0
        output.writerow([crs_code, good_code, pocr, tch_code, mode,
                         final_pct_passed, scl_mean, scl_median, scl_stdev,
                         scl_min, scl_max, final_count])
        out_c.writerow([crs_code, good_code, pocr, tch_code, mode,
                        final_pct_passed, scl_mean, scl_median, scl_stdev,
                        final_count])
    except Exception as e:
        # stdev/round can raise on degenerate data; skip the course.
        print("Exception:", e)


# v2, one line per student/course
def process_one_course_grades_full(block, out_f, teacher_to_code, course_to_code):
    """Write one anonymized row per student for one course to `out_f`.

    Columns: course_code course pocr_status orientation_status teacher_code
    mode student_id scaled_score. Skip rules match process_one_course_grades.
    """
    fxns = [mean, median, stdev, min, max, len]
    course_code = block[0][2]
    final_scores = [num(x[7]) for x in block]
    teacher, mode, crn, sem2 = short_name_to_teacher_type_crn_sem(course_code)
    if not teacher:
        return
    tch_code = teacher_to_code[teacher]
    crs_code = course_to_code[course_code]
    if len(final_scores) < 2:
        return
    try:
        (final_mean, final_median, final_stdev, final_min, final_max,
         final_count) = [round(f(final_scores)) for f in fxns]
        final_pct_passed = above_70(final_scores, final_max)
        if final_max == 0:
            return
        good_code = ilearn_name_to_course_code(course_code)
        pocr = 1 if lookup_pocr(teacher, good_code, sem2) else 0
        o = load_orientations()
        for row in block:
            student_id = row[3]
            # Orientation page-view count; 0 when the student never enrolled
            # in an orientation course.
            orientation = o.get(student_id, 0)
            scaled_score = round(num(row[7]) / final_max, 2)
            out_f.writerow([crs_code, good_code, pocr, orientation, tch_code,
                            mode, student_id, scaled_score])
        print(course_code)
    except Exception as e:
        print("Exception:", e)


def process_grades():
    """Two-pass processing of the raw grades CSV.

    Pass 1 assigns anonymized integer codes to every teacher (from 1001) and
    course (from 4001) and saves the lookup tables. Pass 2 groups rows by
    course id and emits the three derived CSVs.
    """
    # first loop to get all names
    courses_labeled = {}
    teacher_to_code = {}
    code_to_teacher = {}
    course_to_code = {}
    code_to_course = {}
    index = 1001
    crs_index = 4001
    with open(all_grades_file, newline="") as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)  # skip header
        for row in csvreader:
            crn_sem = row[0] + '_' + row[1]
            if crn_sem not in courses_labeled:
                teacher, mode, crn, sem2 = short_name_to_teacher_type_crn_sem(row[2])
                courses_labeled[crn_sem] = teacher
                if row[2] not in course_to_code:
                    course_to_code[row[2]] = crs_index
                    code_to_course[crs_index] = row[2]
                    crs_index += 1
                if teacher:
                    if teacher not in teacher_to_code:
                        teacher_to_code[teacher] = index
                        code_to_teacher[index] = teacher
                        index += 1
    with codecs.open('cache/teacher_lookup_codes.json', 'w', 'utf-8') as fh:
        fh.write(json.dumps([teacher_to_code, code_to_teacher], indent=2))
    with codecs.open('cache/course_lookup_codes.json', 'w', 'utf-8') as fh:
        fh.write(json.dumps([course_to_code, code_to_course], indent=2))

    # second pass: group rows by course id and emit the three output files
    with codecs.open(all_courses_file3, 'w', 'utf-8') as out_fullrows, \
         codecs.open(all_courses_file2, 'w', 'utf-8') as out_compact, \
         open(all_courses_file, "w", newline="") as output_f:
        out_f = csv.writer(out_fullrows)
        out_f.writerow("course_code course pocr_status orientation_status teacher_code mode student_id scaled_score".split(" "))
        out_c = csv.writer(out_compact)
        out_c.writerow("course_code course pocr_status teacher_code mode percent_passed scl_mean scl_median scl_stdev count".split(" "))
        output = csv.writer(output_f)
        output.writerow("course_code course pocr_status teacher_code mode percent_passed scl_mean scl_median scl_stdev scl_min scl_max count".split(" "))
        with open(all_grades_file, newline="") as csvfile:
            csvreader = csv.reader(csvfile)
            block = []
            current_index = None
            next(csvreader)  # skip header
            for row in csvreader:
                # Rows are assumed contiguous per course id (column 0).
                index = row[0]
                if index != current_index:
                    if block:
                        process_one_course_grades(block, output, out_c,
                                                  teacher_to_code, course_to_code)
                        process_one_course_grades_full(block, out_f,
                                                       teacher_to_code, course_to_code)
                    block = []
                    current_index = index
                block.append(row)
            # flush the final course's block
            if block:
                process_one_course_grades(block, output, out_c,
                                          teacher_to_code, course_to_code)
                process_one_course_grades_full(block, out_f,
                                               teacher_to_code, course_to_code)


if __name__ == "__main__":
    options = {
        1: ['get all historical grades from ilearn', get_all],
        2: ['process grades csv file', process_grades],
        3: ['test shortname parse', nametest],
        4: ['test sem codes', codetest],
        5: ['get student data from orientations', get_student_orientations],
    }
    print('')
    if len(sys.argv) > 1 and re.search(r'^\d+', sys.argv[1]):
        resp = int(sys.argv[1])
        print("\n\nPerforming: %s\n\n" % options[resp][0])
    else:
        print('')
        for key in options:
            print(str(key) + '.\t' + options[key][0])
        print('')
        resp = input('Choose: ')
    # Call the function in the options dict
    options[int(resp)][1]()