# statistics

"""
## Investigate: Success rates (grades) of students in:

- online courses (over all)
  - sync and async and online live
- teachers/courses that have passed POCR (are all async?)
- teachers that have done more than the minimum training in online teaching
- in person classes, if grades are available

## Data collection

- Choose how many semesters (10?)
- Script 1 - given a CRN and Semester, download all grades
  - Check if grades were used and make sense
  - Compute mean, % > 70, median, etc.
- Script 2 - given all semester schedules, generate lists of:
  - CRNs which are online, online live, hybrid, inperson, excluded
  - CRNs in which teacher and course have passed pocr (and semester is greater than their pass date)
  - CRNs in which teacher passed pocr for a different course (and semester is greater than their pass date)
  - CRNs to exclude, for example SP20, because of covid. Possibly SU20 and FA20
  - CRNs in which teacher has done more than the minimum training in online teaching

## Hypothesis Testing

-
"""

import csv
import json
import re
import sys
from collections import defaultdict
from itertools import groupby
from statistics import mean, median, stdev

import requests

from pipelines import fetch, url
from courses import getCoursesInTerm


# Per-student grade rows written by get_all() and read back by process_grades().
all_grades_file = "cache/grades_all.csv"
# Per-course summary statistics written by process_grades().
all_courses_file = "cache/course_grades_all.csv"


def num(s):
    """Convert a CSV cell to a number.

    An empty string becomes 0. Otherwise the text is parsed as an int, and
    as a float if it is not a valid int. A ValueError from the float
    conversion propagates to the caller.
    """
    if s == '':
        return 0
    try:
        return int(s)
    except ValueError:
        return float(s)


def get_all():
    """Download grades for every course in the listed terms into all_grades_file.

    `terms` and `sems` are parallel lists: each term id passed to
    getCoursesInTerm is paired with the semester code written in the `sem`
    column. The file is flushed after each course so partial results
    survive an interrupted run.
    """
    terms = '178 177 176 175 174 173 172 171 168 65 64 62 63 61 60 25 26 23 22 21'.split(' ')
    sems = '202330 202310 202270 202250 202230 202210 202170 202150 202130 202070 202050 202030 202010 201970 201950 201930 201910 201870 201850 201830'.split(' ')

    # Save grades to a CSV file
    with open(all_grades_file, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["crn", "sem", "coursecode", "s_can_id", "g", "name", "current", "final"])
        for term, sem in zip(terms, sems):
            print(term, sem, "\n")
            courses = getCoursesInTerm(term, get_fresh=0, show=0, active=1)
            for c in courses:
                print(c['name'])
                grades(writer, sem, c['id'], c['course_code'])
                csvfile.flush()


def grades(writer, sem, COURSE_ID, course_code):
    """Write one CSV row per student in a course.

    Args:
        writer: csv writer receiving the rows.
        sem: semester code stored in the second column.
        COURSE_ID: course id used in the API path and stored in the first column.
        course_code: course code stored in the third column.

    Only users whose first enrollment is a 'StudentEnrollment' are written.
    Any error on a single student is printed and that student is skipped,
    so one bad record does not stop the download.
    """
    params = {"include[]": ["enrollments", "current_grading_period_scores"]}
    students = fetch(url + f"/api/v1/courses/{COURSE_ID}/users", 0, params)
    for student in students:
        try:
            user_id = student["id"]
            name = student["name"]
            login = student["login_id"]
            print("\t", name)
            enrollment = student['enrollments'][0]
            if enrollment['type'] == 'StudentEnrollment':
                final = enrollment["grades"]["final_score"]
                current = enrollment["grades"]["current_score"]
                writer.writerow([COURSE_ID, sem, course_code, user_id, login, name, current, final])
        except Exception as e:
            print("Exception:", e)


def count_above_70(li):
    """Placeholder; currently does nothing and returns None.

    TODO: implement the "% > 70" measure mentioned in the module notes.
    """
    pass


def _summarize_scores(scores):
    """Return [mean, median, stdev, min, max, count] of scores, each rounded to an int.

    statistics.stdev needs at least two data points, so the spread is
    reported as 0 when there are fewer.
    """
    spread = stdev(scores) if len(scores) > 1 else 0
    values = (mean(scores), median(scores), spread, min(scores), max(scores), len(scores))
    return [round(v) for v in values]


def process_one_course_grades(block, output):
    """Print and write summary statistics for one course.

    Args:
        block: non-empty list of rows from all_grades_file for one course;
            column 2 is the course code, column 6 the current score and
            column 7 the final score.
        output: csv writer that receives two rows, one for current scores
            and one for final scores.

    Empty score cells count as 0 (see num). If the statistics cannot be
    computed the error is printed and nothing is written for the course.
    """
    course_code = block[0][2]
    cur_scores = [num(row[6]) for row in block]
    final_scores = [num(row[7]) for row in block]

    line = "{:>12} {: 6.0f} {: 6.0f} {: 6.0f} {: 6.0f} {: 6.0f} {:6d} "
    try:
        cur_stats = _summarize_scores(cur_scores)
        final_stats = _summarize_scores(final_scores)

        print("Course mean median stdev min max count")
        print(line.format(course_code, *cur_stats))
        print(line.format(course_code, *final_stats))
        print()

        output.writerow([course_code, "current score"] + cur_stats)
        output.writerow([course_code, "final score"] + final_stats)
    except Exception as e:
        print("Exception:", e)


def process_grades():
    """Summarize all_grades_file into all_courses_file, one course at a time.

    Consecutive rows sharing the same first column are treated as one
    course and handed to process_one_course_grades.
    """
    with open(all_courses_file, "w", newline="") as output_f:
        output = csv.writer(output_f)
        # Data rows carry a score-type label in the second column, so the
        # header needs eight names to line up with them.
        output.writerow(["Course", "score_type", "mean", "median", "stdev", "min", "max", "count"])

        with open(all_grades_file, newline="") as csvfile:
            reader = csv.reader(csvfile)
            next(reader, None)  # skip the header; tolerate an empty file
            data_rows = (row for row in reader if row)  # ignore blank lines
            for _, course_rows in groupby(data_rows, key=lambda row: row[0]):
                process_one_course_grades(list(course_rows), output)


def grades_rundown():
    """Write one row per course in term 23 to grades_out.csv with its teachers.

    Also writes the teachers' sis_user_id values to 'all_teachers_by_goo'.

    NOTE(review): load_users, course_enrollment and users_by_id are not
    defined or imported in the visible file; this function cannot run
    until they are provided.
    """
    global results, users_by_id
    load_users()
    results = []
    all_sem_courses = []
    all_ids = {}

    fieldnames = 'id,name,teacher,mean,median,count,count_gt70,grades,avg_activity_time'.split(',')

    # The csv module needs a text-mode file opened with newline=''.
    with open('all_teachers_by_goo', 'w') as ids_out, \
            open('grades_out.csv', 'w', newline='') as f:
        # for the current or given semester's shells (really, only active ones)
        w = csv.DictWriter(f, fieldnames)
        w.writeheader()
        courses = getCoursesInTerm(term=23, show=0, active=1)
        for C in courses:
            activity_time_total = 0.0
            course_info = {'id': str(C['id']), 'name': C['name'], 'grades': [], 'teacher': []}
            emts = course_enrollment(C['id'])
            for k, E in emts.items():
                if E['type'] == 'TeacherEnrollment':
                    course_info['teacher'].append(users_by_id[E['user_id']]['name'])
                    all_ids[E['sis_user_id']] = 1
                # Disabled grade aggregation, kept for reference
                # (it relies on pandas as `pd`, which is not imported here):
                #
                # if 'grades' in E and E['grades']['current_score']:
                #     #print(str(E['grades']['final_score']) + ", ",)
                #     #print(str(E['grades']['current_score']) + ", ",)
                #     course_info['grades'].append(E['grades']['current_score'])
                #     activity_time_total += E['total_activity_time']
                # if course_info['grades']:
                #     s = pd.Series(course_info['grades'])
                #     course_info['mean'] = s.mean()
                #     course_info['median'] = s.median()
                #     course_info['count'] = len(s.values)
                #     course_info['count_gt70'] = (s > 70.0).count()
                #     course_info['avg_activity_time'] = activity_time_total / len(s.values)
                # else:
                #     course_info['mean'] = 0
                #     course_info['median'] = 0
                #     course_info['count'] = 0
                #     course_info['count_gt70'] = 0
                #     course_info['avg_activity_time'] = 0

            all_sem_courses.append(course_info)
            w.writerow(course_info)
            f.flush()

        # get a grade (final? current?) for each student
        for sis_id in all_ids:
            if sis_id:
                ids_out.write(sis_id + ', ')

    # sanity check to make sure grading is actually happening in the shell

    # report an average, median, and buckets


def class_logs():
    """Print courses of term 14 ordered by their students' average login age.

    NOTE(review): this relies on a module-level `results` list and on
    `how_long_ago`, neither of which is defined or imported in the visible
    file. It also assumes fetch(target) returns the next page URL and fills
    `results` as a side effect, unlike the call in grades() - TODO confirm.
    """
    global results

    # 1. Search the current semester and the misc semesters for a list of courses
    # that we want to check for users/activity.
    #target = url + '/api/v1/accounts/1/terms' # list the terms
    target = url + '/api/v1/accounts/1/courses?published=true&enrollment_term_id=14'
    print("Getting term classes.")
    while target:
        target = fetch(target)

    print("\n\n\n")

    term_results = results
    full_results = []
    for x in term_results:
        results = []
        # now see who's logged in recently:
        target = url + '/api/v1/courses/' + str(x['id']) + '/recent_students'
        print("Getting class id: ", str(x['id']))
        fetch(target)
        if len(results):
            LL = [how_long_ago(z['last_login']) for z in results]
            avg = 9999
            if len(LL):
                avg = sum(LL) / len(LL)
            d = {'id': x['id'], 'avg': avg, 'name': x['name']}
            full_results.append(d)

    sorted_results = sorted(full_results, key=lambda k: k['avg'])
    for x in sorted_results:
        print(x['id'], "\t", str(x['avg']), "\t", x['name'])


def user_logs():
    """Fetch the page views of user 6357 and write them out as CSV lines.

    NOTE(review): `f` (the output file), `load_users` and `users_by_id` are
    not defined or imported in the visible file, so this function raises
    NameError as written. The destination file is unknown; it is left for
    the owner to restore rather than guessed at.
    """
    global url, users_by_id, results
    target_user = "6357"
    load_users()
    results = []
    target = url + '/api/v1/users/' + target_user + '/page_views?per_page=200'
    while target:
        print(target)
        target = fetch(target)

    # have all student's hits. Filter to only this class
    #results = filter(match59,results)
    print(users_by_id[int(target_user)])
    f.write(str(users_by_id[int(target_user)]) + "\n")
    f.write("link,updated_at,remote_ip,url,context_type,user_agent,action\n")
    for hit in results:
        L = [hit['links']['user'], hit['updated_at'], hit['remote_ip'], hit['url'],
             hit['context_type'], hit['user_agent'], hit['action']]
        f.write(",".join(map(str, L)) + "\n")


def recent_logins():
    """Print authentication audit records for a fixed five-minute window.

    NOTE(review): `fetch_dict` is not defined or imported in the visible file.
    """
    global results, url, results_dict
    p = {'start_time': '2017-08-31T00:00:00Z', 'end_time': '2017-08-31T00:05:00Z'}
    target = url + "/api/v1/audit/authentication/accounts/1"
    results_dict = {}
    resp = fetch_dict(target, p)
    print(resp)
    print(results_dict)


def userHitsThisSemester(uid=2):
    """Print the page views of user `uid` since a fixed start time as JSON.

    NOTE(review): reads a module-level `results` that is not defined in the
    visible file; presumably filled by fetch() - TODO confirm.
    """
    begin = "20170820T0000"
    t = url + "/api/v1/users/" + str(uid) + "/page_views?start_time=" + str(begin)
    while t:
        t = fetch(t)
    print(json.dumps(results, indent=4, sort_keys=True))


def getCurrentActivity():  # a dict
    """Merge per-day activity of several terms into canvas/daily.json.

    For each term the 'by_date' records are fetched, then participations
    and views are summed per date across all terms.

    NOTE(review): `fetch_dict` and the initial `results_dict` are not
    defined or imported in the visible file; fetch_dict is assumed to
    return the next page URL and fill `results_dict` - TODO confirm.
    """
    # CURRENT ACTIVITY
    #r = requests.get(url + '/api/v1/accounts/1/analytics/current/activity', headers = header )
    #t = url + '/api/v1/accounts/1/users?per_page=500'
    # analytics/terms/:term_id/activity
    #t = url + '/api/v1/accounts/1/analytics/current/statistics'
    global results_dict

    # Term ids in fetch order: sp17, su17, su17b, fa17, sp18, cmte, dev
    term_ids = (11, 14, 15, 18, 21, 7, 6)
    per_term = []
    for term_id in term_ids:
        t = url + '/api/v1/accounts/1/analytics/terms/' + str(term_id) + '/activity'
        while t:
            t = fetch_dict(t)
        per_term.append(results_dict['by_date'])
        results_dict = {}

    master_list_by_date = {}
    for sem in per_term:
        for record in sem:
            print(record)
            date = record['date']
            if date in master_list_by_date:
                master_list_by_date[date]['participations'] += record['participations']
                master_list_by_date[date]['views'] += record['views']
            else:
                master_list_by_date[date] = {
                    'date': date,
                    'participations': record['participations'],
                    'views': record['views'],
                }

    # want to match the old, funny format
    my_out = {'by_date': list(master_list_by_date.values())}
    with open('canvas/daily.json', 'w') as out:
        out.write(json.dumps(my_out, indent=2))


def externaltool():  # a list
    """Fetch the account's external tools and print the collected results.

    NOTE(review): reads a module-level `results` that is not defined in the
    visible file; presumably filled by fetch() - TODO confirm.
    """
    #mydata = { "course_navigation[text]": "Video Chat",
    #           "course_navigation[default]": "false" }
    #t = url + '/api/v1/accounts/1/external_tools/704?course_navigation[text]=Video Chat&course_navigation[default]=false'
    #r = requests.put(t, headers=header)
    t = url + '/api/v1/accounts/1/external_tools/'
    while t:
        t = fetch(t)
    print(results)


if __name__ == "__main__":
    options = {
        1: ['get all historical grades from ilearn', get_all],
        2: ['process grades csv file', process_grades],
    }
    print('')

    # Accept the command-line choice only when it is entirely digits, so
    # something like "2x" falls through to the menu instead of crashing int().
    if len(sys.argv) > 1 and re.fullmatch(r'\d+', sys.argv[1]):
        resp = int(sys.argv[1])
        print("\n\nPerforming: %s\n\n" % options[resp][0])
    else:
        print('')
        for key in options:
            print(str(key) + '.\t' + options[key][0])
        print('')
        resp = input('Choose: ')

    # Call the function in the options dict
    options[int(resp)][1]()