# statistics
"""
## Investigate: Success rates (grades) of students in:

- online courses (overall)
- sync, async, and online live courses
- teachers/courses that have passed POCR (are all async?)
- teachers that have done more than the minimum training in online teaching
- in-person classes, if grades are available

## Data collection

- Choose how many semesters (10?)
- Script 1 - given a CRN and semester, download all grades
    - Check if grades were used and make sense
    - Compute mean, % > 70, median, etc.
- Script 2 - given all semester schedules, generate lists of:
    - CRNs which are online, online live, hybrid, in-person, or excluded
    - CRNs in which teacher and course have passed POCR (and semester is after their pass date)
    - CRNs in which teacher passed POCR for a different course (and semester is after their pass date)
    - CRNs to exclude, for example SP20 because of COVID; possibly SU20 and FA20
    - CRNs in which teacher has done more than the minimum training in online teaching

## Hypothesis testing

- TBD; e.g. compare pass rates between groups (a sketch follows this docstring)
"""

import json, csv, codecs, os, requests, sys, re
from statistics import mean, median, stdev
from pipelines import fetch, fetch_dict, url  # fetch_dict is used below; assumed to live in pipelines alongside fetch
from courses import getCoursesInTerm
# Other helpers used below (load_users, users_by_id, course_enrollment,
# how_long_ago, and the `results`/`results_dict` globals that fetch() and
# fetch_dict() populate) are expected from companion modules.


def num(s):
    """Parse a score cell: num('') -> 0, num('87') -> 87, num('87.5') -> 87.5."""
    if s == '':
        return 0
    try:
        return int(s)
    except ValueError:
        return float(s)

all_grades_file = "cache/grades_all.csv"
all_courses_file = "cache/course_grades_all.csv"

def get_all():
    # term ids paired with semester codes, newest first
    terms = '178 177 176 175 174 173 172 171 168 65 64 62 63 61 60 25 26 23 22 21'.split(' ')
    sems = '202330 202310 202270 202250 202230 202210 202170 202150 202130 202070 202050 202030 202010 201970 201950 201930 201910 201870 201850 201830'.split(' ')
    # Save grades to a CSV file
    with open(all_grades_file, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["crn", "sem", "coursecode", "s_can_id", "g", "name", "current", "final"])
        for (term, sem) in zip(terms, sems):
            print(term, sem, "\n")
            courses = getCoursesInTerm(term, get_fresh=0, show=0, active=1)
            for c in courses:
                print(c['name'])
                c_code = c['course_code']
                grades(writer, sem, c['id'], c_code)
                csvfile.flush()


def grades(writer, sem, COURSE_ID, course_code):
    params = {"include[]": ["enrollments", "current_grading_period_scores"]}
    users = fetch(url + f"/api/v1/courses/{COURSE_ID}/users", 0, params)  # renamed from `grades`, which shadowed this function

    for student in users:
        try:
            id = student["id"]
            name = student["name"]
            g = student["login_id"]
            print("\t", name)
            if student['enrollments'][0]['type'] == 'StudentEnrollment':
                grade = student["enrollments"][0]["grades"]["final_score"]
                current = student["enrollments"][0]["grades"]["current_score"]
                writer.writerow([COURSE_ID, sem, course_code, id, g, name, current, grade])
        except Exception as e:
            print("Exception:", e)


schedules = {}


def load_schedules():
    global schedules
    if not schedules:
        for f in os.listdir('cache/schedule'):
            m = re.search(r'(\w\w\d\d)_sched_expanded\.json', f)
            if m:
                sem = m.group(1)
                schedules[sem] = json.loads(codecs.open('cache/schedule/' + f, 'r', 'utf-8').read())

def to_crn_fallback(name):
    """Last-resort parse: pull a 5-digit CRN and a semester code out of anywhere in the name."""
    name = name.lower()
    m1 = re.search(r'(\d\d\d\d\d)', name)
    if not m1:
        return None, None
    m2 = re.search(r'([wispufa][wispufa]\d\d)', name)
    if not m2:
        return None, None
    return m1.group(1), m2.group(1)


def short_name_to_crn(name):
    """Parse a course short name of the form 'CODE SEM CRN' into (crn, sem)."""
    try:
        parts = name.split(' ')
        code, sem, crn = parts[0], parts[1], parts[2]
        if not re.search(r'^(\w\w\d\d)$', sem):
            return to_crn_fallback(name)
        if re.search(r'^(\d\d\d\d\d)$', crn):
            return crn, sem
        # handle CRNs written like '12345/67890' by taking the first
        crn_parts = crn.split('/')
        if re.search(r'^(\d\d\d\d\d)$', crn_parts[0]):
            return crn_parts[0], sem
        # non-standard course short name
        return to_crn_fallback(name)
    except Exception:
        return to_crn_fallback(name)
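
# Example parses (hypothetical short names; the format is inferred from the
# regexes above):
#   short_name_to_crn('CS-1A fa22 34567')          -> ('34567', 'fa22')
#   short_name_to_crn('CS-1A fa22 34567/34568')    -> ('34567', 'fa22')
#   short_name_to_crn('Intro to CS (34567) sp21')  -> ('34567', 'sp21')   # via to_crn_fallback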


def short_name_to_teacher(name):
    load_schedules()
    crn, sem = short_name_to_crn(name)

    try:
        if sem:
            sem = sem.lower()
            # 'wi' (winter) codes are filed under the spring schedules
            if sem[0:2] == 'wi':
                sem = 'sp' + sem[2:]
            for course in schedules[sem]:
                if course['crn'] == crn:
                    return course['teacher'], course['type']
    except Exception:
        return None, None  # match the two-value return of the success path

    return None, None


def nametest():
    with open(all_courses_file) as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)  # skip the header row

        for row in csvreader:
            print(row[0], "-", short_name_to_teacher(row[0]))
            next(csvreader, None)  # sample every other row; the default keeps StopIteration from escaping at EOF


def count_above_70(li):
    """Count of scores above 70, i.e. the numerator for the '% > 70' metric."""
    return sum(1 for x in li if x > 70)
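
# For the "% > 70" metric in the plan above, e.g.:
#   pct_above_70 = 100 * count_above_70(final_scores) / len(final_scores)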


def process_one_course_grades(block, output):
    fxns = [mean, median, stdev, min, max, len]
    c_id = block[0][0]
    sem = block[0][1]
    course_code = block[0][2]
    cur_scores = [num(x[6]) for x in block]
    final_scores = [num(x[7]) for x in block]
    try:
        (cur_mean, cur_median, cur_stdev, cur_min, cur_max, cur_count) = [round(f(cur_scores)) for f in fxns]
        (final_mean, final_median, final_stdev, final_min, final_max, final_count) = [round(f(final_scores)) for f in fxns]

        print("Course mean median stdev min max count")
        print("{:>12} {: 6.0f} {: 6.0f} {: 6.0f} {: 6.0f} {: 6.0f} {:6d} ".format(course_code, cur_mean, cur_median, cur_stdev, cur_min, cur_max, cur_count))
        print("{:>12} {: 6.0f} {: 6.0f} {: 6.0f} {: 6.0f} {: 6.0f} {:6d} ".format(course_code, final_mean, final_median, final_stdev, final_min, final_max, final_count))
        print()
        output.writerow([course_code, "current score", cur_mean, cur_median, cur_stdev, cur_min, cur_max, cur_count])
        output.writerow([course_code, "final score", final_mean, final_median, final_stdev, final_min, final_max, final_count])
    except Exception as e:
        # stdev() raises on fewer than two scores; such courses are skipped
        print("Exception:", e)
def process_grades():
    with open(all_courses_file, "w", newline="") as output_f:
        output = csv.writer(output_f)
        # header matches the 8-column rows written by process_one_course_grades
        output.writerow("course score_type mean median stdev min max count".split(" "))

        with open(all_grades_file, newline="") as csvfile:
            csvreader = csv.reader(csvfile)
            block = []
            current_index = None

            next(csvreader)  # skip the header row

            # rows are ordered by course id; gather each course's rows into a block
            for row in csvreader:
                index = row[0]

                if index != current_index:
                    if block:
                        process_one_course_grades(block, output)
                    block = []
                    current_index = index

                block.append(row)

            if block:
                process_one_course_grades(block, output)
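
# Equivalent view of the grouping loop above, since the grades CSV is already
# ordered by course id (a sketch, not a drop-in replacement):
#
#     from itertools import groupby
#     for _, rows in groupby(csvreader, key=lambda r: r[0]):
#         process_one_course_grades(list(rows), output)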


def grades_rundown():
    global results, users_by_id
    load_users()
    results = []
    all_sem_courses = []
    ids_out = open('all_teachers_by_goo', 'w')
    all_ids = {}
    # for the current or given semester's shells (really, only active ones)
    with open('grades_out.csv', 'w', newline='') as f:  # was 'wb'; csv needs text mode in Python 3
        w = csv.DictWriter(f, 'id,name,teacher,mean,median,count,count_gt70,grades,avg_activity_time'.split(','))
        w.writeheader()
        courses = getCoursesInTerm(term=23, show=0, active=1)
        for C in courses:
            activity_time_total = 0.0
            course_info = {'id': str(C['id']), 'name': C['name'], 'grades': [], 'teacher': []}
            emts = course_enrollment(C['id'])
            for k, E in emts.items():
                if E['type'] == 'TeacherEnrollment':
                    course_info['teacher'].append(users_by_id[E['user_id']]['name'])
                    all_ids[E['sis_user_id']] = 1
            # Disabled per-student stats. Re-enabling needs pandas imported as pd;
            # also note (s > 70.0).count() counts all non-NA entries -- .sum()
            # would count the scores above 70.
            """ if 'grades' in E and E['grades']['current_score']:
                    course_info['grades'].append(E['grades']['current_score'])
                    activity_time_total += E['total_activity_time']
                if course_info['grades']:
                    s = pd.Series(course_info['grades'])
                    course_info['mean'] = s.mean()
                    course_info['median'] = s.median()
                    course_info['count'] = len(s.values)
                    course_info['count_gt70'] = (s > 70.0).count()
                    course_info['avg_activity_time'] = activity_time_total / len(s.values)
                else:
                    course_info['mean'] = 0
                    course_info['median'] = 0
                    course_info['count'] = 0
                    course_info['count_gt70'] = 0
                    course_info['avg_activity_time'] = 0 """
            all_sem_courses.append(course_info)
            w.writerow(course_info)
            f.flush()

    # get a grade (final? current?) for each student
    for k, v in all_ids.items():
        if k:
            ids_out.write(k + ', ')
    ids_out.close()

    # TODO: sanity check to make sure grading is actually happening in the shell
    # TODO: report an average, median, and buckets


def class_logs():
    global results
    # 1. Search the current semester and the misc semesters for a list of courses
    #    that we want to check for users/activity.
    # target = url + '/api/v1/accounts/1/terms'  # list the terms
    target = url + '/api/v1/accounts/1/courses?published=true&enrollment_term_id=14'
    print("Getting term classes.")
    while target:
        target = fetch(target)

    print("\n\n\n")

    term_results = results
    full_results = []
    for x in term_results:
        results = []
        # now see who's logged in recently:
        target = url + '/api/v1/courses/' + str(x['id']) + '/recent_students'
        print("Getting class id: ", str(x['id']))
        fetch(target)
        if len(results):
            LL = [how_long_ago(z['last_login']) for z in results]
            avg = 9999
            if len(LL):
                avg = sum(LL) / len(LL)
            d = {'id': x['id'], 'avg': avg, 'name': x['name']}
            full_results.append(d)
    sorted_results = sorted(full_results, key=lambda k: k['avg'])
    for x in sorted_results:
        print(x['id'], "\t", str(x['avg']), "\t", x['name'])
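
# how_long_ago() is used above but defined elsewhere; a minimal sketch of what
# it is assumed to do (days since an ISO-8601 timestamp like Canvas's
# '2017-08-31T00:00:00Z'):
#
#     from datetime import datetime, timezone
#     def how_long_ago(ts):
#         then = datetime.strptime(ts, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
#         return (datetime.now(timezone.utc) - then).days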


def user_logs():
    global url, users_by_id, results
    target_user = "6357"
    load_users()
    results = []
    target = url + '/api/v1/users/' + target_user + '/page_views?per_page=200'
    while target:
        print(target)
        target = fetch(target)
    # have all the student's hits. Filter to only this class
    # results = filter(match59, results)
    # `f` was never opened in the original; the cache/ filename here is assumed
    f = open('cache/user_' + target_user + '_hits.csv', 'w')
    print(users_by_id[int(target_user)])
    f.write(str(users_by_id[int(target_user)]) + "\n")
    f.write("link,updated_at,remote_ip,url,context_type,user_agent,action\n")
    for hit in results:
        L = [hit['links']['user'], hit['updated_at'], hit['remote_ip'], hit['url'], hit['context_type'], hit['user_agent'], hit['action']]
        L = map(str, L)
        f.write(",".join(L) + "\n")
    f.close()


def recent_logins():
    global results, url, results_dict
    p = {'start_time': '2017-08-31T00:00:00Z', 'end_time': '2017-08-31T00:05:00Z'}
    target = url + "/api/v1/audit/authentication/accounts/1"
    results_dict = {}
    resp = fetch_dict(target, p)
    print(resp)
    print(results_dict)


def userHitsThisSemester(uid=2):
    begin = "20170820T0000"
    t = url + "/api/v1/users/" + str(uid) + "/page_views?start_time=" + str(begin)
    while t:
        t = fetch(t)
    print(json.dumps(results, indent=4, sort_keys=True))


def getCurrentActivity():  # builds a dict of daily activity across terms
    # CURRENT ACTIVITY
    # r = requests.get(url + '/api/v1/accounts/1/analytics/current/activity', headers=header)
    # t = url + '/api/v1/accounts/1/users?per_page=500'
    # analytics/terms/:term_id/activity
    # t = url + '/api/v1/accounts/1/analytics/current/statistics'
    global results_dict

    # term label -> enrollment term id; same seven fetches as before, as a loop
    term_ids = {'sp17': 11, 'su17': 14, 'su17b': 15, 'fa17': 18, 'sp18': 21, 'cmte': 7, 'dev': 6}
    activity = {}
    for label, term_id in term_ids.items():
        results_dict = {}
        t = url + '/api/v1/accounts/1/analytics/terms/' + str(term_id) + '/activity'
        while t:
            t = fetch_dict(t)
        activity[label] = results_dict['by_date']

    # merge every term's daily counts into one list keyed by date
    master_list_by_date = {}
    for sem in activity.values():
        for record in sem:
            print(record)
            date = record['date']
            if date in master_list_by_date:
                master_list_by_date[date]['participations'] += record['participations']
                master_list_by_date[date]['views'] += record['views']
            else:
                master_list_by_date[date] = {
                    'date': date,
                    'participations': record['participations'],
                    'views': record['views'],
                }

    # want to match the old, funny format
    by_date = list(master_list_by_date.values())
    my_out = {'by_date': by_date}

    with open('canvas/daily.json', 'w') as out:
        out.write(json.dumps(my_out, indent=2))


def externaltool():  # lists the account's external tools
    # mydata = { "course_navigation[text]": "Video Chat",
    #            "course_navigation[default]": "false" }
    # t = url + '/api/v1/accounts/1/external_tools/704?course_navigation[text]=Video Chat&course_navigation[default]=false'
    # r = requests.put(t, headers=header)
    t = url + '/api/v1/accounts/1/external_tools/'
    while t:
        t = fetch(t)
    print(results)


if __name__ == "__main__":
    options = {
        1: ['get all historical grades from ilearn', get_all],
        2: ['process grades csv file', process_grades],
        3: ['test shortname parse', nametest],
    }
    print('')

    if len(sys.argv) > 1 and re.search(r'^\d+', sys.argv[1]):
        resp = int(sys.argv[1])
        print("\n\nPerforming: %s\n\n" % options[resp][0])
    else:
        print('')
        for key in options:
            print(str(key) + '.\t' + options[key][0])
        print('')
        resp = input('Choose: ')

    # Call the chosen function from the options dict
    options[int(resp)][1]()