267 lines
9.1 KiB
Python
267 lines
9.1 KiB
Python
# statistics
|
|
|
|
"""
|
|
## Investigate: Success rates (grades) of students in:
|
|
|
|
- online courses (overall)
|
|
- sync and async and online live
|
|
- teachers/courses that have passed POCR (are all async?)
|
|
- teachers that have done more than the minimum training in online teaching
|
|
- in person classes, if grades are available
|
|
|
|
|
|
|
|
## Data collection
|
|
|
|
- Choose how many semesters (10?)
|
|
- Script 1 - given a CRN and Semester, download all grades
|
|
- Check if grades were used and make sense
|
|
- Compute mean, % > 70, median, etc.
|
|
|
|
- Script 2 - given all semester schedules, generate lists of:
|
|
- CRNs which are online, online live, hybrid, in-person, excluded
|
|
- CRNs in which teacher and course have passed POCR (and the semester is later than their pass date)
|
|
- CRNs in which teacher passed POCR for a different course (and the semester is later than their pass date)
|
|
- CRNs to exclude, for example SP20, because of covid. Possibly SU20 and FA20
|
|
- CRNs in which teacher has done more than the minimum training in online teaching
|
|
|
|
- Next steps: generate the cross-reference of which categories teachers are in, and
|
|
integrate into the main data file.
|
|
|
|
## Hypothesis Testing
|
|
|
|
-
|
|
"""
|
|
|
|
|
|
def num(s):
    """Convert a text cell to a number.

    An empty string counts as 0. Anything else is parsed as an int first
    and, if that fails with ValueError, as a float. Text that is neither
    lets the ValueError from float() propagate to the caller.
    """
    if s == '':
        return 0
    try:
        value = int(s)
    except ValueError:
        value = float(s)
    return value
|
|
|
|
import json, csv, requests, sys, re
|
|
from multiprocessing import Semaphore
|
|
from statistics import mean, median, stdev
|
|
from pipelines import fetch, url
|
|
from courses import getCoursesInTerm
|
|
from collections import defaultdict
|
|
|
|
# Per-student grade rows: written by get_all(), read by process_grades().
all_grades_file = "cache/grades_all.csv"
# Per-course summary rows: written by process_grades(), read by nametest().
all_courses_file = "cache/course_grades_all.csv"
|
|
|
|
def get_all():
    """Write the grades of every course in the listed terms to all_grades_file.

    Each term id in `terms` is paired, by position, with the semester code at
    the same position in `sems`. For every course that getCoursesInTerm()
    returns for a term, grades() is called to write that course's rows.
    """
    # NOTE(review): the pairing is purely positional -- confirm that the
    # non-monotonic ids ("62 63", "25 26") line up with the intended semesters.
    terms = '178 177 176 175 174 173 172 171 168 65 64 62 63 61 60 25 26 23 22 21'.split(' ')
    sems = '202330 202310 202270 202250 202230 202210 202170 202150 202130 202070 202050 202030 202010 201970 201950 201930 201910 201870 201850 201830'.split(' ')
    header = ["crn", "sem", "coursecode", "s_can_id","g","name", "current", "final"]

    # Save grades to a CSV file
    with open(all_grades_file, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(header)
        for term, sem in zip(terms, sems):
            print(term,sem,"\n")
            for course in getCoursesInTerm(term,get_fresh=0,show=0,active=1):
                print(course['name'])
                course_code = course['course_code']
                grades(writer, sem, course['id'], course_code)
                # Push what has been written so far to disk, so an
                # interrupted run still leaves the completed courses behind.
                csvfile.flush()
|
|
|
|
|
|
def grades(writer, sem, COURSE_ID, course_code):
    """Write one row per student of course COURSE_ID to `writer`.

    The course's user list is requested through fetch() with enrollments
    included. Every user's name is printed; a row
    [COURSE_ID, sem, course_code, user id, login id, name, current score,
    final score] is written only when the user's first enrollment has type
    'StudentEnrollment'. Any exception raised while handling one user is
    printed and that user is skipped.
    """
    params = { "include[]": ["enrollments", "current_grading_period_scores"] }
    users = fetch(url + f"/api/v1/courses/{COURSE_ID}/users",0, params)

    for student in users:
        try:
            student_id = student["id"]
            name = student["name"]
            login = student["login_id"]
            print("\t", name)

            # Only the first enrollment record is inspected.
            if student['enrollments'][0]['type'] != 'StudentEnrollment':
                continue

            final = student["enrollments"][0]["grades"]["final_score"]
            current = student["enrollments"][0]["grades"]["current_score"]
            writer.writerow([COURSE_ID, sem, course_code, student_id, login, name, current, final])
        except Exception as e:
            print("Exception:", e)
|
|
|
|
|
|
# Parsed schedule files, keyed by the four-character semester code taken from
# each file name. Filled on first use by load_schedules().
schedules = {}

import codecs, os


def load_schedules():
    """Populate the module-level `schedules` cache from cache/schedule.

    Every file in that directory whose name contains
    '<code>_sched_expanded.json' (code = two word characters followed by two
    digits) is parsed as UTF-8 JSON and stored under that code. The directory
    is only scanned while the cache is empty, so repeated calls are cheap.

    Raises whatever os.listdir(), open() or json.load() raise (for example
    FileNotFoundError when the directory is missing).
    """
    global schedules
    if schedules:
        return

    sched_dir = 'cache/schedule'
    for fname in os.listdir(sched_dir):
        m = re.search(r'(\w\w\d\d)_sched_expanded\.json', fname)
        if not m:
            continue
        # A context manager closes each file promptly; the previous
        # codecs.open(...).read() never closed its handle.
        with open(os.path.join(sched_dir, fname), 'r', encoding='utf-8') as fh:
            schedules[m.group(1)] = json.load(fh)
|
|
|
|
def to_crn_fallback(name):
    """Best-effort extraction of (crn, sem) from a free-form course name.

    The name is lower-cased, then searched for the first run of five digits
    (the CRN) and the first semester code: 'wi', 'sp', 'su' or 'fa' followed
    by two digits.

    Returns:
        (crn, sem) as strings, with sem in lower case, or (None, None) when
        either part is missing or `name` is not a string.
    """
    if not isinstance(name, str):
        return None, None
    name = name.lower()

    m_crn = re.search(r'(\d\d\d\d\d)', name)
    if not m_crn:
        return None, None

    # Match whole semester prefixes only. A character class such as
    # [wispufa][wispufa] also matches fragments of course codes ("is10" in
    # "cis10", "us11" in "bus11") and would return those as the semester.
    m_sem = re.search(r'((?:wi|sp|su|fa)\d\d)', name)
    if not m_sem:
        return None, None

    return m_crn.group(1), m_sem.group(1)
|
|
|
|
|
|
|
|
def short_name_to_crn(name):
    """Split a course short name of the form '<code> <sem> <crn>' into (crn, sem).

    The second space-separated field must be two word characters plus two
    digits, and the third must be a five-digit CRN or a '/'-separated list
    whose first entry is one (that first entry is returned). Any name that
    does not fit, or that raises while being parsed, is handed to
    to_crn_fallback(). The semester is returned as written in the name.
    """
    try:
        fields = name.split(' ')
        sem = fields[1]
        crn = fields[2]

        if re.search(r'^(\w\w\d\d)$',sem) is None:
            return to_crn_fallback(name)

        if re.search(r'^(\d\d\d\d\d)$',crn):
            return crn,sem

        # Cross-listed sections look like "12345/12346"; keep the first CRN.
        first_crn = crn.split('/')[0]
        if re.search(r'^(\d\d\d\d\d)$',first_crn):
            return first_crn,sem

        return to_crn_fallback(name)
    except Exception:
        return to_crn_fallback(name)
|
|
|
|
|
|
def fixname(n):
    """Collapse every run of whitespace in `n` to one space and trim both ends."""
    collapsed = re.sub(r'\s+',' ', n)
    return collapsed.strip()
|
|
|
|
|
|
def short_name_to_teacher_type_crn_sem(name):
    """Look up (teacher, type, crn, sem) for a course short name.

    The name is parsed with short_name_to_crn(); the semester is lower-cased
    and a 'wi' prefix is replaced by 'sp' before the lookup in `schedules`.
    The first schedule entry whose 'crn' equals the parsed CRN supplies the
    teacher (whitespace-normalised via fixname()) and the course type.

    Returns four Nones when the name has no semester, no entry matches, or
    anything raises during the lookup (for example an unknown semester).
    """
    load_schedules()
    crn, sem = short_name_to_crn(name)

    not_found = (None, None, None, None)
    if not sem:
        return not_found

    try:
        sem = sem.lower()
        if sem.startswith('wi'):
            sem = 'sp' + sem[2:]
        for course in schedules[sem]:
            if course['crn'] == crn:
                return fixname(course['teacher']), course['type'], crn, sem
    except Exception:
        return not_found

    return not_found
|
|
|
|
|
|
|
|
def nametest():
    """Print the schedule lookup result for every course in all_courses_file.

    The file is the summary CSV written by process_grades(), whose columns
    start with 'sem', 'crn', 'shortname'. For each data row the short name
    (third column) is printed next to the tuple returned by
    short_name_to_teacher_type_crn_sem().
    """
    with open(all_courses_file, newline="") as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the default keeps an empty file from raising.
        next(csvreader, None)

        for row in csvreader:
            if len(row) < 3:
                continue
            # Column 0 is the semester; the short name to parse is column 2.
            # Every row is visited: the summary file holds one row per course,
            # so skipping alternate rows would drop courses and could raise
            # StopIteration at the end of the file.
            shortname = row[2]
            print(shortname, "-", short_name_to_teacher_type_crn_sem(shortname))
|
|
|
|
def above_70(li,maximum):
    """Return the fraction of values in `li` that reach 70% of `maximum`.

    Args:
        li: sized collection of numeric scores.
        maximum: score that counts as 100%; the cutoff is 0.7 * maximum.

    Returns:
        A float between 0.0 and 1.0 (not a percentage). An empty `li` yields
        0.0 instead of raising ZeroDivisionError.
    """
    total = len(li)
    if total == 0:
        return 0.0
    cutoff = 0.7 * maximum
    passing = sum(1 for x in li if x >= cutoff)
    return passing / total
|
|
|
|
def process_one_course_grades(block, output):
    """Summarise one course's grade rows and write the result to `output`.

    Args:
        block: rows of the grades CSV for a single course. Column 2 holds the
            course short name, column 6 the current score and column 7 the
            final score; empty scores are converted to 0 by num().
        output: csv writer that receives one "final score" summary row.

    Courses whose short name cannot be matched to a teacher in the schedules
    are skipped silently. Any exception while computing or writing the
    statistics is printed and the course is skipped. This includes stdev()
    being given fewer than two scores.
    """
    fxns = [mean, median, stdev, min, max, len]
    course_code = block[0][2]
    cur_scores = [num(x[6]) for x in block]
    final_scores = [num(x[7]) for x in block]
    print(course_code)

    teacher, mode, crn, sem2 = short_name_to_teacher_type_crn_sem(course_code)
    if not teacher:
        return

    try:
        # Each statistic is rounded to the nearest whole number.
        (cur_mean, cur_median, cur_stdev, cur_min, cur_max, cur_count) = [round(f(cur_scores)) for f in fxns]
        (final_mean, final_median, final_stdev, final_min, final_max, final_count) = [round(f(final_scores)) for f in fxns]

        # Passing means reaching 70% of the highest score seen in the course.
        cur_pct_passed = above_70(cur_scores, cur_max)
        final_pct_passed = above_70(final_scores, final_max)

        row_fmt = "{:>12} {: 6.0f} {: 6.0f} {: 6.0f} {: 6.0f} {: 6.0f} {: 6.0f} {:6d} "
        print("Course % > 70 mean median stdev min max count")
        # above_70() returns a 0-1 fraction; scale it for the "% > 70" column,
        # otherwise the zero-decimal format can only ever show 0 or 1.
        print(row_fmt.format(course_code, 100 * cur_pct_passed, cur_mean, cur_median, cur_stdev, cur_min, cur_max, cur_count))
        print(row_fmt.format(course_code, 100 * final_pct_passed, final_mean, final_median, final_stdev, final_min, final_max, final_count))
        print()

        # Only the final-score summary is written; the stored pass rate stays
        # a 0-1 fraction so the file's values keep their existing meaning.
        output.writerow( [sem2, crn, course_code, "final score", teacher, mode, final_pct_passed, final_mean, final_median, final_stdev, final_min, final_max, final_count] )
    except Exception as e:
        print("Exception:", e)
|
|
|
|
def process_grades():
    """Turn the per-student grades CSV into the per-course summary CSV.

    Reads all_grades_file (skipping its header), groups consecutive rows that
    share the same first column, and passes each group to
    process_one_course_grades(), which writes to all_courses_file.
    """
    header = "sem crn shortname score_type teacher mode percent_passed mean median stdev min max count".split(" ")

    with open(all_courses_file, "w", newline="") as output_f:
        output = csv.writer(output_f)
        output.writerow(header)

        with open(all_grades_file, newline="") as csvfile:
            rows = csv.reader(csvfile)
            next(rows)  # skip the header row

            pending = []  # rows collected so far for the course being read
            for row in rows:
                key = row[0]
                # A change in the first column starts the next course.
                if pending and key != pending[0][0]:
                    process_one_course_grades(pending, output)
                    pending = []
                pending.append(row)

            # Flush the last course, which no following row terminates.
            if pending:
                process_one_course_grades(pending, output)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Menu of runnable tasks: number -> [description, function to call].
    options = { 1: ['get all historical grades from ilearn',get_all] ,
                2: ['process grades csv file',process_grades] ,
                3: ['test shortname parse',nametest] ,
              }
    print ('')

    # The whole argument must be digits. A prefix match would accept "3x"
    # and then crash inside int().
    from_cli = len(sys.argv) > 1 and re.fullmatch(r'\d+', sys.argv[1]) is not None

    if from_cli:
        resp = sys.argv[1]
    else:
        print ('')
        for key in options:
            print(str(key) + '.\t' + options[key][0])

        print('')
        resp = input('Choose: ')

    # Report an unknown or non-numeric choice instead of a raw traceback.
    try:
        description, action = options[int(resp)]
    except (ValueError, KeyError):
        sys.exit("Unknown option: %r" % (resp,))

    if from_cli:
        print("\n\nPerforming: %s\n\n" % description)

    # Call the function in the options dict
    action()
|