3509 lines
132 KiB
Python
3509 lines
132 KiB
Python
import json, re, requests, codecs, sys, time, funcy, os, csv, random
|
|
import pandas as pd
|
|
from datetime import datetime, timedelta, timezone
|
|
import pytz
|
|
from util import print_table, int_or_zero, float_or_zero, dept_from_name, num_from_name
|
|
from pipelines import fetch, fetch_stream, fetch_collapse, header, url
|
|
from schedules import get_semester_schedule
|
|
from localcache import course_quick_stats, get_courses_in_term_local, course_student_stats, all_sem_courses_teachers, full_reload
|
|
from localcache2 import (
|
|
db,
|
|
users_new_this_semester,
|
|
users_new_this_2x_semester,
|
|
course_from_id,
|
|
user_ids_in_shell,
|
|
student_count,
|
|
teacher_list,
|
|
course_sched_entry_from_id,
|
|
get_orientation_shells,
|
|
get_orientation_memberships,
|
|
get_student_enrollment_summary,
|
|
)
|
|
from collections import defaultdict
|
|
from semesters import find_term
|
|
|
|
#from dateutil import parser
|
|
#from ast import Try, TryStar
|
|
#from symbol import try_stmt
|
|
#from pipelines import sems
|
|
|
|
|
|
# Canvas course id of the STEM shell. TODO: hard-coded — presumably needs
# updating each semester; confirm where this is consumed before changing.
stem_course_id = '11015' # TODO
|
|
|
|
|
|
#########
|
|
######### GET FACTS FROM INDIVIDUAL COURSES
|
|
#########
|
|
#########
|
|
|
|
# Gott 1 Bootcamp - report on who completed it.
|
|
def get_gott1_passers():
    """Report who completed the Gott 1 Bootcamp (Canvas course 1561).

    NOTE(review): currently disabled — the get_course_passers() call below is
    commented out, so this function only binds the intended parameters and
    returns None.
    """
    course = '1561'

    # grade threshold and output destinations for the (disabled) report
    min_passing = 85
    passers_filename = 'cache/teacherdata/bootcamp_passed.csv'
    still_active_filename = 'cache/teacherdata/bootcamp_active.csv'
    #get_course_passers(course, min_passing, passers_filename, still_active_filename)
|
|
|
|
# Plagiarism Module - report on who completed it.
|
|
def get_plague_passers():
    """Report who completed the Plagiarism Module (Canvas course 11633).

    NOTE(review): effectively a stub — everything after the four configuration
    assignments sits inside one large triple-quoted string (several abandoned
    drafts, some textually corrupted, plus an old get_course_passers
    definition), so the function only binds locals and returns None.
    """
    course = '11633'
    min_passing = 85
    passers_filename = 'cache/teacherdata/plagiarism_passed.csv'
    still_active_filename = 'cache/teacherdata/plagiarism_active.csv'
    # Dead code below, preserved verbatim inside a string literal. Parts of it
    # are garbled (e.g. "passed_by_deptr(p[2])]") and would not parse as code.
    """
(passed, didnt) = get_course_passers(course, min_passing, passers_filename, still_active_filename)
passed = set( [z[2] for z in passed] )
didnt = set( [z[2] for z in didnt] )
enrol = [ [ str(z) for z in list(course_enrollment(cr)) ] for cr in ['11677','11698'] ]

print(enrol)

enrol = set(funcy.cat(enrol))
everyone = passed.union(didnt,enrol)

reportable = passed.intersection(enrol)
outputfile = open('cache/plagcheck.txt','w').write( json.dumps( [ list(reportable), list(enrol), list(passed), list(didnt), list(everyone) ],indent=2))
return 1

#enrol = { cr: [ str(z) for z in list(course_enrollment(cr).keys()) ] for cr in ['11677','11698',] }
# # [x['user_id'] for x in course_enrollment(cr)]
outputfile = open('cache/plagcheck.txt','w').write( json.dumps( [ [z[2] for z in passed],[z[2] for z in didnt],enrol],indent=2))
return 1

passed = {}
didnt = {}

output_by_course = {}
course_s = {}

for p in passed: passed_by_deptr(p[2])] = p
for p in didnt: didnt_d(p[2])] = p

passed_s = [ str(k) for k in passed_d() ]
didnt_s = [ str(k) for k in didnt_by_deptys() ]


crossref = ['11677','11698',]

outputfile = open('cache/plagcheck.txt','w')
oo = { 'passed': passed_by_deptdidnt': didnt_by_dept
for cr in crossref:
student_int = course_enrollment(cr)
student_by_dict{ str(k): v for k,v in student_int.items() }
oo[cr] = student_by_dict
output_by_course[cr] = { 'passed':{}, 'didnt':{}, 'missing':{} }

course_s[cr] = set( [ str(k) for k in student_by_dict.keys() ])

for k,v in student_by_dict.items():
key_s = str(k)

if key_s in passed_by_dict output_by_course[cr]['passed'][key_s] = passed_by_dicty_s]
elif key_s in didnt_by_dict output_by_course[cr]['didnt'][key_s] = didnt_by_dicty_s]
else:
output_by_course[cr]['missing'][key_s] = v['user']

oo['final_output'] = output_by_course
oo['passed_s'] = list(passed_s)
oo['didnt_s'] = list(didnt_s)

course_sd = {k: list(v) for k,v in course_s.items() }

oo['course_s'] = course_sd

outputfile.write(json.dumps(oo,indent=2))


# Who, in a class, passed?
def get_course_passers(course, min_passing, passers_filename, still_active_filename):
path = url + '/api/v1/courses/%s/enrollments' % str(course)

tempout = open('cache/passers_temp.txt','w')

enrl = fetch( path, 0)
passed = []
didnt = []
for E in enrl:
try:
n = E['user']['name']
oo = E['user']['sis_user_id']
i = str(E['user_id'])
r = E['role']
g = E['grades']['current_score']
l = E['last_activity_at']
p = float_or_zero(g) > min_passing
print( "%s: a %s, grade of %s. Passed? %s. Last seen: %s" % (n,r,str(g),str(p),l) )

tempout.write(json.dumps(E['user']['name']) + "\n")
tempout.write(json.dumps(E['grades'],indent=2) + "\n\n-----\n\n")

if p:
passed.append( [n, oo, i, r, g, l ] )
else:
didnt.append( [n, oo, i, r, g, l ] )
except:
pass

columns = ['name', 'goo','canvas_id','role','grade','last_activity']
pp = pd.DataFrame(passed, columns=columns)
pp.sort_values(by='last_activity',inplace=True)
pp.to_csv(passers_filename, index=False)
dd = pd.DataFrame(didnt, columns=columns)
dd.sort_values(by='last_activity',inplace=True)
dd.to_csv(still_active_filename, index=False)

print("Saved output to \n - passed: %s\n - not passed: %s\n" % (passers_filename, still_active_filename))
return (passed,didnt)
    """
|
|
# Gott 1A
# NOTE(review): dead code — an old quiz-based pass check kept in a bare string
# literal; a copy also lives at the end of get_quiz_passers(). Safe to delete.
"""course = '2908'
quiz = '15250'
pass_grade = 0.90

path = url + '/api/v1/courses/%s/quizzes/%s/submissions' % (course,quiz)
q_subs = fetch_collapse(path, 'quiz_submissions')
for Q in q_subs:
prct = float_or_zero(Q['score']) / float_or_zero( Q['quiz_points_possible'] )
print( 'Passed: %s\t Score: %s,\tUser: %s' % \
( str(prct>0.9), str(int_or_zero(Q['score'])), Q['user_id'] ))"""
|
|
|
|
|
|
|
|
# Who, in a class and a quiz, passed?
|
|
def get_quiz_passers():
    """Split Gott 1 Bootcamp (course 1561) enrollees into passed / not-passed.

    Fetches all enrollments, prints one status line per person, and writes two
    CSVs sorted by last activity:
      - cache/teacherdata/bootcamp_passed.csv
      - cache/teacherdata/bootcamp_active.csv
    """
    # Gott 1 Bootcamp
    course = '1561'
    path = url + '/api/v1/courses/%s/enrollments' % course
    enrl = fetch( path, 0)
    # NOTE(review): comparison below is strict '>' — a score of exactly 85
    # does NOT pass; confirm that is intended.
    min_passing = 85
    passed = []
    didnt = []
    for E in enrl:
        try:
            n = E['user']['name']
            i = E['user_id']
            r = E['role']
            g = E['grades']['current_score']   # may be None for no-activity students
            l = E['last_activity_at']
            p = float_or_zero(g) > min_passing
            print( "%s: a %s, grade of %s. Passed? %s. Last seen: %s" % (n,r,str(g),str(p),l) )
            if p:
                passed.append( [n, i, r, g, l ] )
            else:
                didnt.append( [n, i, r, g, l ] )
        except (KeyError, TypeError) as exc:
            # BUG FIX: was a bare `except: pass`, which silently swallowed
            # everything (including KeyboardInterrupt). Log the skipped record
            # and only catch the malformed-record errors we expect.
            print("Skipping malformed enrollment record: %s" % exc)

    columns = ['name','canvas_id','role','grade','last_activity']
    pp = pd.DataFrame(passed, columns=columns)
    pp.sort_values(by='last_activity',inplace=True)
    pp.to_csv('cache/teacherdata/bootcamp_passed.csv', index=False)
    dd = pd.DataFrame(didnt, columns=columns)
    dd.sort_values(by='last_activity',inplace=True)
    dd.to_csv('cache/teacherdata/bootcamp_active.csv', index=False)

    print("Saved output to ./teachers/bootcamp_*")

# Gott 1A — dead code preserved in a string literal for reference.
"""course = '2908'
quiz = '15250'
pass_grade = 0.90

path = url + '/api/v1/courses/%s/quizzes/%s/submissions' % (course,quiz)
q_subs = fetch_collapse(path, 'quiz_submissions')
for Q in q_subs:
prct = float_or_zero(Q['score']) / float_or_zero( Q['quiz_points_possible'] )
print( 'Passed: %s\t Score: %s,\tUser: %s' % \
( str(prct>0.9), str(int_or_zero(Q['score'])), Q['user_id'] ))"""
|
|
|
|
|
|
|
|
|
|
# Change courses to show 2 announcements
|
|
# Change courses to show 2 announcements
def change_course_ann_homepage(id="10458"):
    """Enable the announcements banner on a course home page, limited to 2.

    PUTs the settings change to the Canvas API and echoes the response body.
    """
    settings_endpoint = url + "/api/v1/courses/%s/settings" % id
    payload = {
        'show_announcements_on_home_page': 'true',
        'home_page_announcement_limit': '2',
    }
    response = requests.put(settings_endpoint, data=payload, headers=header)
    print(response.text)
|
|
|
|
|
|
|
|
# All students enrolled in a class in the given semester. Simpler verson of below. Return SET of course_ids.
|
|
# All students enrolled in a class in the given semester. Simpler verson of below. Return SET of course_ids.
def users_in_semester():
    """Return the set of `id` values from every StudentEnrollment in term 65.

    NOTE: collects the enrollment record's own 'id' field (not 'user_id') —
    confirm which one callers expect.
    """
    term_courses = getCoursesInTerm('65',0,0) # fall 2020 TODO
    collected = set()
    for crs in term_courses:
        enrollments = course_enrollment(crs['id']).values()
        collected.update(
            e['id'] for e in enrollments if e['type'] == "StudentEnrollment"
        )
    return collected
|
|
|
|
|
|
#
|
|
# All students (and faculty) in STEM (or any list of depts.. match the course_code). Return SET of canvas ids.
|
|
def users_in_by_depts_live(depts=[], termid='181'):
    """Collect canvas user ids of everyone (students AND faculty) enrolled in
    any course whose course_code starts with one of `depts`.

    Side effects: writes three debug dumps under cache/ and returns the union
    set of user ids across all matched departments.

    NOTE(review): indentation here was reconstructed — the `else: continue`
    is read as belonging to `if match:` (i.e. skip courses whose code does not
    match the dept prefix); confirm against the original file.
    """
    courses_by_by_dept = {}
    students_by_by_dept = {}

    all_c = getCoursesInTerm(termid,1,0)
    codecs.open('cache/courses_in_term_%s.json' % termid,'w','utf-8').write( json.dumps(all_c,indent=2) )
    for c in all_c:
        #print(c['course_code'])
        for d in depts:
            #print("Dept: %s" % d)
            # prefix match of dept code against the course code
            match = re.search('^(%s)' % d, c['course_code'])
            if match:
                # debug: call out STAT matches specifically
                if d == "STAT" and match.group() == "STAT":
                    print("STAT")
            else:
                continue
            print("Getting enrollments for %s" % c['course_code'])
            if d in courses_by_by_dept: courses_by_by_dept[d].append(c)
            else: courses_by_by_dept[d] = [ c, ]
            for u in course_enrollment_with_faculty(c['id'],0).values():
                #if u['type'] != "StudentEnrollment": continue
                if not (d in students_by_by_dept):
                    students_by_by_dept[d] = set()
                students_by_by_dept[d].add(u['user_id'])
            continue
    print(students_by_by_dept)
    codecs.open('cache/students_by_by_dept_in_term_%s.json' % termid,'w','utf-8').write( str(students_by_by_dept) )
    all_students = set()
    for dd in students_by_by_dept.values(): all_students.update(dd)
    codecs.open('cache/all_students_in_by_depts_in_term_%s.json' % termid,'w','utf-8').write( str(all_students) )
    return all_students
|
|
|
|
# Course enrollment, including teachers
|
|
# Course enrollment, including teachers
def course_enrollment_with_faculty(id='', verbose=0):
    """Return {user_id: enrollment dict} for ALL enrollments in a course
    (students and faculty alike).

    Prompts for the course id when none is given. Also caches the mapping to
    cache/courses/<id>.json.
    """
    if verbose: print("Getting enrollments for course id %s" % str(id))
    if not id:
        id = input('Course id? ')
    t = url + '/api/v1/courses/%s/enrollments' % str(id)
    if verbose: print(t)
    emts = fetch(t,verbose)
    if verbose: print(emts)
    emt_by_id = {}
    for E in emts:
        if verbose: print(E)
        try:
            emt_by_id[E['user_id']] = E
        except Exception as exp:
            print("Skipped [%s] with this exception: %s" % (str(E), str(exp)))
    # BUG FIX: use a context manager so the cache file is always flushed and
    # closed (the original handle was left open until garbage collection).
    with codecs.open('cache/courses/%s.json' % str(id), 'w', 'utf-8') as ff:
        ff.write(json.dumps(emt_by_id, indent=2))
    if verbose: print( " %i results" % len(emts) )
    return emt_by_id
|
|
|
|
|
|
# Course enrollment list, students only
|
|
# Course enrollment list, students only
def course_enrollment(id='', verbose=0):
    """Return {user_id: enrollment dict} for the StudentEnrollments of a course.

    Prompts for the course id when none is given. Also caches the mapping to
    cache/courses/<id>.json.
    """
    if verbose: print("Getting enrollments for course id %s" % str(id))
    if not id:
        id = input('Course id? ')
    t = url + '/api/v1/courses/%s/enrollments?role[]=StudentEnrollment' % str(id)
    if verbose: print(t)
    emts = fetch(t,verbose)
    if verbose: print(emts)
    emt_by_id = {}
    for E in emts:
        if verbose: print(E)
        try:
            emt_by_id[E['user_id']] = E
        except Exception as exp:
            print("Skipped [%s] with this exception: %s" % (str(E), str(exp)))
    # BUG FIX: use a context manager so the cache file is always flushed and
    # closed (the original handle was left open until garbage collection).
    with codecs.open('cache/courses/%s.json' % str(id), 'w', 'utf-8') as ff:
        ff.write(json.dumps(emt_by_id, indent=2))
    if verbose: print( " %i results" % len(emts) )
    return emt_by_id
|
|
|
|
|
|
def askForTerms():
    """Prompt for term id(s) and return them as a list of strings.

    Multiple ids may be entered separated by commas; no validation is done.
    """
    reply = input("The term id? (separate multiples with commas) ")
    return reply.split(",")
|
|
|
|
"""
|
|
names = []
|
|
if not term:
|
|
s = url + '/api/v1/accounts/1/terms?workflow_state[]=all'
|
|
s = fetch_collapse(s,"enrollment_terms",1)
|
|
print(json.dumps(s,indent=2))
|
|
print("Terms: ")
|
|
for u in s:
|
|
print(str(u['id']) + "\t" + u['name'])
|
|
#print json.dumps(results_by_dept,indent=2)
|
|
term = input("The term id? ")
|
|
"""
|
|
|
|
|
|
|
|
# Return a list of term names and IDs. Also store in cache/courses/terms.txt
|
|
# Return a list of term names and IDs. Also store in cache/courses/terms.txt
def getTerms(printme=1, ask=1):
    """Fetch every enrollment term from Canvas.

    Caches the raw JSON to cache/courses/terms.txt. With `printme` truthy the
    terms are echoed; with `ask` truthy the user is prompted for a term id,
    which is returned instead of the term list.
    """
    endpoint = url + '/api/v1/accounts/1/terms' #?workflow_state[]=all'
    terms = fetch_collapse(endpoint,'enrollment_terms')
    with codecs.open('cache/courses/terms.txt', 'w', 'utf-8') as cache_file:  # TODO unsafe overwrite
        cache_file.write(json.dumps(terms, indent=2))

    if printme:
        print("Terms: ")
        for term in terms:
            print(str(term['id']) + "\t" + term['name'])
    if ask:
        return input("The term id? ")
    return terms
|
|
|
|
def getCourses(x=0): # a dict
    """Fetch course objects from Canvas by id.

    With x=0, prompts for space-separated ids. NOTE(review): every entered id
    is fetched, but only the LAST response is returned — in-file callers
    (e.g. update_course_conclude) only ever pass a single id; confirm before
    relying on the multi-id path.
    """
    if not x:
        user_input = input("The Course IDs to get? (separate with spaces: ")
        courselist = list(map(int, user_input.split()))
    else:
        courselist = [x, ]

    for id in courselist:
        t = url + '/api/v1/courses/' + str(id) # + '?perpage=100'
        t = fetch(t,0)   # `t` is reused: URL in, parsed response out
        #print(t)
    return t
|
|
|
|
|
|
def update_course_conclude(courseid="13590",enddate='2021-12-23T01:00Z'):
    """One-off cleanup: push `enddate` to every FA21 course still holding the
    bad default conclude date.

    NOTE(review): the `courseid` parameter is effectively ignored — it is
    overwritten by each row returned from the query; only `enddate` matters.
    """
    (connection,cursor) = db()
    q = "SELECT * FROM courses AS c WHERE c.code LIKE '%FA21%' AND c.conclude='2021-08-29 07:00:00.000'"
    result = cursor.execute(q)
    for R in result:
        try:
            #print(R)
            print('doing course: %s' % R[6])
            # R[1] presumably holds the canvas course id — confirm table schema
            courseid = R[1]
            #d = getCourses(courseid)
            #print("\tconclude on: %s" % d['end_at'])

            data = { 'course[end_at]': enddate }
            t = url + '/api/v1/courses/' + str(courseid)
            r3 = requests.put(t, headers=header, params=data)
            #print(" " + r3.text)
        except Exception as e:
            print('****%s' % str(e))
|
|
|
|
# Relevant stuff trying to see if its even being used or not
|
|
# Relevant stuff trying to see if its even being used or not
def course_term_summary_local(term="180",term_label="FA23"):
    """Write cache/semester_summary_<term_label>.html — an HTML <ul> with one
    <li> per course in the term, built entirely from the local cache.

    Courses with zero students are skipped; shells in 'claimed' state are
    highlighted yellow via the 'a' CSS class.
    """
    # <li> template fields: course url, css class, name, state, teachers, student count
    O = "\t<li>Course: <a href='%s' target='_blank' class='%s'>%s</a><br />Status: <b>%s</b><br />Teacher: %s<br />Number students: %s</li>\n"
    courses = get_courses_in_term_local(term)
    oo = codecs.open(f'cache/semester_summary_{term_label}.html','w','utf-8')
    oo.write('<style>.a{background-color:yellow;}.b{background-color:pink;}</style><ul>\n')

    for C in sorted(courses):
        style = ''
        # C[3] is used as the canvas course id below; C[1]=name, C[2]=state,
        # C[5]=teacher names — per the template substitution at `mystr`
        info = course_quick_stats(C[3])
        sinfo = course_student_stats(C[3])
        D = list(C)
        D.append(info)    # becomes D[6]
        D.append(sinfo)   # becomes D[7] (unused below)
        #print(D)
        # D[6][0][0] is the student count from course_quick_stats; skip empties
        if D[6][0][0] == 0: continue
        if D[2] == 'claimed': style="a"
        mystr = O % ( "https://ilearn.gavilan.edu/courses/"+str(D[3]), style, D[1], D[2], str(', '.join(D[5])), str(D[6][0][0]))
        print(D[1])
        oo.write(mystr )
        oo.flush()
        #print(info)
    oo.write('\n</ul>\n')
|
|
|
|
|
|
# Fetch all courses in a given term
|
|
# Fetch all courses in a given term
def getCoursesInTerm(term=0,get_fresh=1,show=1,active=0): # a list
    """Return every course in a Canvas enrollment term.

    Results are cached at cache/courses_in_term_<term>.json; pass get_fresh=0
    to prefer the cached copy. `active` restricts the query to published
    courses; `show` echoes each course and a summary table. Prompts for the
    term when none is supplied.
    """
    if not term:
        term = getTerms(1,1)
    cache_path = 'cache/courses_in_term_%s.json' % str(term)
    if not get_fresh:
        if os.path.isfile(cache_path):
            return json.loads( codecs.open(cache_path,'r','utf-8').read() )
        print(" -> couldn't find cached classes at: %s" % cache_path)

    # https://gavilan.instructure.com:443/api/v1/accounts/1/courses?published=true&enrollment_term_id=11
    published_filter = "published=true&" if active else ""
    endpoint = f"{url}/api/v1/accounts/1/courses?{published_filter}enrollment_term_id={term}"
    results = fetch(endpoint, show)

    if show:
        for course in results:
            try:
                print(str(course['id']) + "\t" + course['name'])
            except Exception:
                print("Caused a problem: ")
                print(course)
        #print json.dumps(results,indent=2)

    names = [course['name'] for course in results]   # kept for parity with the original (unused)
    info = [ [course['id'], course['name'], course['workflow_state'] ] for course in results ]
    if show: print_table(info)

    codecs.open(cache_path, 'w', 'utf-8').write(json.dumps(results,indent=2))
    return results
|
|
|
|
|
|
def getCoursesTermSearch(term=0,search='',v=0):
    """Search a term's courses by a name fragment via the Canvas API.

    Prompts for any missing argument; `v` enables verbose output.
    """
    term = term or input("term id? ")
    search = search or input("What to search for? ")

    query = url + '/api/v1/accounts/1/courses?enrollment_term_id=%s&search_term=%s' % ( str(term) , search )
    if v: print(query)

    matches = fetch(query)
    if v: print(json.dumps(matches,indent=2))
    return matches
|
|
|
|
def courseLineSummary(c,sections={}):
|
|
ss = "\t"
|
|
crn = "\t"
|
|
host = ""
|
|
if 'crn' in c:
|
|
crn = "crn: %s\t" % c['crn']
|
|
|
|
if c['id'] in sections:
|
|
ss = "section: %s\t" % str(sections[c['id']])
|
|
|
|
if 'host' in c:
|
|
host = "send to crn: %s\t" % c['host']
|
|
|
|
out = "%i\t%s%s%s%s" % (c['id'], ss ,crn, host, c['name'])
|
|
return out
|
|
|
|
def xlistLineSummary(c,sections={}):
    """Return a one-line summary of a course's cross-list pairing.

    Shows the partner's section id when the course has a partner with a
    'sectionid'; otherwise "missing". `sections` is accepted for call-site
    symmetry with courseLineSummary but is not consulted.
    """
    # can_id incoming_sec_id crn name
    partner = c.get('partner')
    new_sec = partner['sectionid'] if partner and 'sectionid' in partner else "missing"
    return "can_id:%i\t new_sec_id:%s\t crn:%s\t %s" % (c['id'], new_sec, c['crn'], c['name'])
|
|
|
|
def numbers_in_common(L):
    """Return how many leading characters (up to 5) all strings in L share.

    BUG FIX: the original indexed positions 0..4 unconditionally, raising
    IndexError for strings shorter than 5 characters (and for an empty list).
    The scan is now bounded by the shortest string; an empty list returns 0.
    """
    if not L:
        return 0
    limit = min(5, min(len(s) for s in L))
    for i in range(limit):
        ch = L[0][i]
        if any(s[i] != ch for s in L):
            return i
    return limit
|
|
|
|
def combined_name(nic,L):
    """Return the section numbers in L joined with '/'.

    `nic` (count of shared leading digits, from numbers_in_common) is accepted
    for backward compatibility but no longer used — an older scheme that
    shortened each section to its distinguishing suffix was abandoned.

    BUG FIX: an empty list previously raised IndexError; it now returns ''.
    The unreachable "old method" code that sat after the return was removed.
    """
    if not L:
        return ''
    if len(L) < 2:
        return L[0]
    return "/".join(L)
|
|
|
|
def all_equal2(iterator):
    """True when the iterable yields at most one distinct value.

    An empty iterable counts as all-equal. Items must be hashable.
    """
    distinct = set(iterator)
    return len(distinct) <= 1
|
|
|
|
|
|
def semester_cross_lister():
    """Batch cross-list a whole semester from the registrar's crosslist CSV.

    Prompts for a term code (e.g. "fa25"), reads cache/<sem>_crosslist.csv,
    groups rows by crosslist group, derives a combined course name/code per
    group, and calls xlist_ii() to merge every non-first section into the
    group's first section. Also writes cache/xlist_check.html for review.
    """
    tt = find_term( input("term? (ex: fa25) ") )

    if not tt or (not 'canvas_term_id' in tt) or (not 'code' in tt):
        print(f"Couldn't find term.")
        return

    term = tt['canvas_term_id']
    sem = tt['code']

    xlist_filename = f"cache/{sem}_crosslist.csv"
    checkfile = codecs.open('cache/xlist_check.html','w','utf-8')
    checkfile.write('<html><body><table>\n')

    # drop the CSV header row
    xlistfile = codecs.open(xlist_filename,'r','utf-8').readlines()[1:]
    by_section = {}
    by_group = defaultdict( list )
    crn_to_canvasid = {}
    crn_to_canvasname = {}
    crn_to_canvascode = {}

    get_fresh = 1
    c = getCoursesInTerm(term,get_fresh,0)

    # sis_course_id[7:13] presumably carves the 6-digit CRN out of the SIS
    # id — confirm the SIS id format before changing.
    for C in c:
        if 'sis_course_id' in C and C['sis_course_id']:
            crn_to_canvasid[C['sis_course_id'][7:13]] = str(C['id'])
            crn_to_canvasname[C['sis_course_id'][7:13]] = str(C['name'])
            crn_to_canvascode[C['sis_course_id'][7:13]] = str(C['course_code'])
    # "Term","PrtTerm","xlstGroup","Subject","CrseNo","EffectCrseTitle","CRN","Session","SecSchdType","AttnMeth","MtgSchdType","MtgType","MaxEnroll","TotalEnroll","SeatsAvail","Bldg","Room","Units","LecHrs","LabHrs","HrsPerDay","HrsPerWk","TotalHrs","Days","D/E","Wks","BegTime","EndTime","StartDate","EndDate","LastName","FirstName","PercentResp"
    # NOTE(review): rows are split with a plain split(',') — quoted fields
    # containing commas would shift the indices below; the csv module
    # (already imported) would be safer. Confirm indices against the header.
    for xc in xlistfile:
        parts = xc.split(r',')
        course = parts[2] + " " + parts[3]
        group = parts[1]
        crn = parts[5]

        if crn in crn_to_canvasid:
            cid = crn_to_canvasid[crn]
            oldname = crn_to_canvasname[crn]
            oldcode = crn_to_canvascode[crn]
        else:
            print("! Not seeing crn %s in canvas semester" % crn)
            cid = ''
            oldname = ''
            oldcode = ''

        # first appearance of a CRN wins
        if crn in by_section: continue
        by_section[crn] = [crn, course, group, cid, oldname, oldcode]
        by_group[group].append( [crn, course, group, cid, oldname, oldcode] )

    for x in by_section.values():
        print(x)
        href = '<a target="_blank" href="%s">%s</a>' % ('https://ilearn.gavilan.edu/courses/'+x[3]+'/settings#tab-details', x[3])
        checkfile.write('<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>' % (x[0],x[2],x[1],href) )
    checkfile.write('</table></body></html>')

    print("GROUPS")
    for y in by_group.keys():
        # combined section label, e.g. "10001/10002"
        sects = [ z[0] for z in by_group[y] ]
        sects.sort()
        nic = numbers_in_common(sects)
        new_sec = combined_name(nic,sects)

        # same dept?
        depts_list = [ z[1].split(' ')[0] for z in by_group[y] ]
        nums_list = list(set([ z[1].split(' ')[1] for z in by_group[y] ]))
        if all_equal2(depts_list):
            # one dept, possibly several course numbers: "ESL 501/701"
            depts = depts_list[0]
            nums_list.sort()
            nums = '/'.join(nums_list)
        else:
            # several depts: join the dept codes, keep the first course number
            depts = list(set(depts_list))
            depts.sort()
            depts = '/'.join(depts )
            nums = by_group[y][0][1].split(' ')[1]

        new_name = f"{depts}{nums} {' '.join(by_group[y][0][4].split(' ')[1:-1])} {new_sec}"
        #new_name = by_group[y][0][4][0:-5] + new_sec
        new_code = f"{depts}{nums} {sem.upper()} {new_sec}"
        #new_code = by_group[y][0][5][0:-5] + new_sec
        print(y)
        print("\t", sects)
        #print("\tThey share %i leading numbers" % nic)
        print("\t", by_group[y])

        # first section of the group hosts the rest
        host_id = by_group[y][0][3]
        sections = by_group[y][1:]

        for target_section in sections:
            xlist_ii(target_section[3],host_id,new_name,new_code)
            #pass
|
|
def do_manual_xlist():
    """Run cross-listings listed in cache/sp25_manual_crosslist.txt.

    Each line has the form "paraid1,paraid2 -> hostid"; the parasite ids are
    handed to xlist() together with the host id.
    """
    manual_lines = [entry.strip() for entry in open('cache/sp25_manual_crosslist.txt','r').readlines()]
    for entry in manual_lines:
        print(entry)
        parasites_part, host = entry.split(' -> ')
        para_list = parasites_part.split(',')
        print(host)
        print(para_list)
        xlist(host, para_list)
|
|
|
|
def ez_xlist():
    """Interactive cross-list: prompt for a host id and comma-separated
    parasite ids, then hand off to xlist()."""
    host = int(input('what is the host id? '))
    parasite = input('what are parasite ids? (separate with commas) ')
    parasite_ids = [int(piece) for piece in parasite.split(',')]
    xlist(host, parasite_ids)
|
|
|
|
# Crosslist given 2 ids, computing the new name and code
|
|
# Crosslist given 2 ids, computing the new name and code
def xlist(host_id, parasite_list):
    """Cross-list each course in `parasite_list` into `host_id`.

    Looks all courses up in the local database, derives a combined course
    name/code (shared dept + merged numbers + joined CRNs), then calls
    xlist_ii() once per parasite. Returns "" on any failed lookup, otherwise
    None.
    """
    host_info = course_from_id(host_id)

    if not host_info:
        print(f"Couldn't find course id {host_id} in database. Do you need to update it?")
        return ""
    # sis_source_id[7:] presumably strips a fixed 7-char prefix to leave the
    # CRN — confirm the SIS id format
    host_info['crn'] = host_info['sis_source_id'][7:]
    host_info['dept'] = dept_from_name( host_info['course_code'] )
    host_info['num'] = num_from_name(host_info['course_code'] )
    host_info['bare_name'] = ' '.join(host_info['name'].split(' ')[1:-1]) # name without course code or crn

    # semester tag, e.g. "FA25", taken from the host's course_code
    sem = host_info['course_code'].split(' ')[1]

    para_info_list = [ course_from_id(x) for x in parasite_list ]
    for p in para_info_list:
        if not p:
            print(f"Couldn't find course id for parasite in database. Do you need to update it?")
            return ""
        p['crn'] = p['sis_source_id'][7:]
        p['dept'] = dept_from_name(p['course_code'] )
        p['num'] = num_from_name(p['course_code'] )
        p['bare_name'] = ' '.join(p['name'].split(' ')[1:-1]) # name without course code or crn
    # NOTE: `all` shadows the builtin within this function
    all = para_info_list.copy()
    all.append(host_info)

    # determine new name and code
    sects = [ z['crn'] for z in all ]
    sects.sort()
    nic = numbers_in_common(sects)
    new_sec = combined_name(nic,sects)

    # same dept?
    depts_list = [ z['dept'] for z in all ]
    nums_list = list(set([ z['num'] for z in all ]))
    if all_equal2(depts_list):
        # one dept, possibly several course numbers: "ESL 501/701"
        depts = depts_list[0]
        nums_list.sort()
        nums = '/'.join(nums_list)
    else:
        # several depts: join dept codes, keep the first course's number
        depts = list(set(depts_list))
        depts.sort()
        depts = '/'.join(depts )
        nums = all[0]['num']

    new_name = f"{depts}{nums} {all[0]['bare_name']} {new_sec}"
    #new_name = by_group[y][0][4][0:-5] + new_sec
    new_code = f"{depts}{nums} {sem.upper()} {new_sec}"
    #new_code = by_group[y][0][5][0:-5] + new_sec
    print(f"New name: {new_name}")
    print(f"New code: {new_code}")
    print(sects)

    for target_section in para_info_list:
        xlist_ii(target_section['id'],host_id,new_name,new_code)
|
|
|
|
|
|
|
|
# Perform an actual cross-list, given 2 id numbers, new name and code
|
|
# Perform an actual cross-list, given 2 id numbers, new name and code
def xlist_ii(parasite_id,host_id,new_name,new_code):
    """Cross-list the parasite course's single section into `host_id`, then
    rename the host course with `new_name` / `new_code`.

    Skips with a message when the parasite has more than one section (already
    cross-listed) or reports no sections at all. All API failures are caught
    and logged so a batch run can continue.
    """
    print("Parasite id: ",parasite_id," Host id: ", host_id)
    print("New name: ", new_name)
    print("New code: ", new_code)
    xyz = 'y'   # confirmation prompt disabled; re-enable via the line below
    #xyz = input("Perform cross list? Enter y for yes, n for no: ")
    if xyz != 'n':
        try:
            uu = url + '/api/v1/courses/%s/sections' % parasite_id
            c_sect = fetch(uu)
            #print(json.dumps(c_sect,indent=2))
            if len(c_sect) > 1:
                print("* * * * Already Crosslisted!!")
                return
            if not c_sect:
                # BUG FIX: this case previously printed "Already Crosslisted!!"
                # too, mislabeling an empty sections response.
                print("* * * * No sections returned for parasite course!!")
                return
            parasite_sxn_id = str(c_sect[0]['id'])
            print("Parasite section id: ", parasite_sxn_id)

            # move the parasite's section under the host course
            u = url + "/api/v1/sections/%s/crosslist/%s" % (parasite_sxn_id,host_id)
            print(u)
            res = requests.post(u, headers = header)
            print(res.text)

            # rename the host to reflect the combined sections
            u3 = url + "/api/v1/courses/%s" % host_id
            data = {'course[name]': new_name, 'course[course_code]': new_code}
            print(data)
            print(u3)
            r3 = requests.put(u3, headers=header, params=data)
            print(r3.text)
            print("\n\n")
        except Exception as e:
            print(f"\n\nSome sort of failure on {new_name}: {e}")
|
|
|
|
|
|
# Relevant stuff trying to see if its even being used or not
|
|
# relies on schedule being in database
|
|
def course_term_summary():
    """Write cache/term_summary_<termid>.csv — one row per course in a term,
    joining live Canvas data with the locally cached schedule/enrollment.

    Prompts for the term code (e.g. "fa25"). Relies on the schedule being
    loaded into the local database.
    """
    term_info = find_term( input("term? (ex: fa25) ") )

    if not term_info or (not 'canvas_term_id' in term_info) or (not 'code' in term_info):
        print(f"Couldn't find term.")
        return

    # BUG FIX: the original reassigned `term` to the canvas term id and then
    # read term['code'] from that id, which raised TypeError at runtime.
    term = term_info['canvas_term_id']
    SEM = term_info['code']

    print(f"Summary of {SEM}")
    get_fresh = 1
    courses = getCoursesInTerm(term, get_fresh, 0)

    print(f"output to cache/term_summary_{term}.csv")
    outp = codecs.open(f'cache/term_summary_{term}.csv','w','utf-8')
    # BUG FIX: the header previously listed "teacher2" twice; the third
    # teacher column is now labeled teacher3.
    outp.write('id,name,view,type,state,sched_start,ilearn_start,sched_students,ilearn_students,num_teachers,teacher1,teacher2,teacher3\n')

    for c in courses:
        c_db = course_from_id(c['id'])
        try:
            ilearn_start = c_db['start_at']
            s_db = course_sched_entry_from_id(c['id'])
        except Exception:
            # c_db may be None when the course is missing from the local cache
            print(f"problem with this course: {c_db}")
            continue
        sched_start = ''
        sched_students = ''
        course_type = ''   # renamed from `type` to stop shadowing the builtin
        if (s_db):
            sched_start = s_db['start']
            sched_students = s_db['act']
            course_type = s_db['type']
        num_students = student_count(c['id'])
        tchr = teacher_list(c['id'])
        tt = ','.join([x[1] for x in tchr])

        line = f"{c['id']},{c['course_code']},{c['default_view']},{course_type},{c['workflow_state']},{sched_start},{ilearn_start},{sched_students},{num_students},{len(tchr)},{tt}"
        print(line)
        outp.write(line + "\n")
    outp.close()
    # NOTE: a large block of unreachable code was removed here — it sat after
    # an unconditional return and referenced undefined names (`notpub`,
    # `published`), so it could never have run successfully.
    return
|
|
|
|
def course_term_summary_2():
    """Convert cache/term_summary.txt into cache/term_summary.html.

    Lines mentioning 'unpublished' become clickable course links built from
    the 'id' and 'course_code' fields embedded in the line.
    """
    src_lines = codecs.open('cache/term_summary.txt','r','utf-8').readlines()
    html_out = codecs.open('cache/term_summary.html','w','utf-8')
    id_pat = re.compile(r"'id': (\d+),")
    code_pat = re.compile(r"'course_code': '(.+?)',")
    for raw in src_lines:
        try:
            line = raw.strip()
            print(line)
            if not re.search('unpublished', line):
                continue
            id_match = id_pat.search(line)
            code_match = code_pat.search(line)
            if id_match:
                snippet = "<br />Course: <a href='%s' target='_blank'>%s</a><br />" % ("https://ilearn.gavilan.edu/courses/"+str(id_match.group(1)), code_match.group(1))
                html_out.write( snippet )
                print(snippet+"\n")
        except Exception as e:
            print(e)
|
|
|
|
|
|
|
|
# check number of students and publish state of all shells in a term
|
|
'''
|
|
def all_semester_course_sanity_check():
|
|
term = "su25"
|
|
target_start = "6-14"
|
|
outputfile = f'cache/courses_checker_{term}.csv'
|
|
t = 288
|
|
c = getCoursesInTerm(t,1,0)
|
|
sched1 = requests.get(f"http://gavilan.cc/schedule/{term}_sched_expanded.json").json()
|
|
sched = { x['crn']: x for x in sched1 }
|
|
#codecs.open('cache/courses_in_term_{t}.json','w','utf-8').write(json.dumps(c,indent=2))
|
|
#output = codecs.open('cache/courses_w_sections.csv','w','utf-8')
|
|
#output.write( ",".join(['what','id','parent_course_id','sis_course_id','name']) + "\n" )
|
|
output2 = codecs.open(outputfile,'w','utf-8')
|
|
output2.write( ",".join(['id','sis_course_id','name','state','mode','startdate','students']) + "\n" )
|
|
htmlout = codecs.open(f'cache/courses_checker_{term}.html','w','utf-8')
|
|
htmlout.write('<html><body><table>\n')
|
|
htmlout.write(f'<tr><td><b>Name</b></td><td><b>SIS ID</b></td><td><b>State</b></td><td><b>Mode</b></td><td><b>Start Date</b></td><td><b># Stu</b></td></tr>\n')
|
|
html_sections = []
|
|
i = 0
|
|
for course in c:
|
|
try:
|
|
u2 = url + '/api/v1/courses/%s?include[]=total_students' % str(course['id'])
|
|
course['info'] = fetch(u2)
|
|
|
|
# correlate to schedule
|
|
crn = course['sis_course_id'][7:]
|
|
ctype = '?'
|
|
cstart = '?'
|
|
ts = '?'
|
|
if crn in sched:
|
|
ctype = sched[crn]['type']
|
|
cstart = sched[crn]['start']
|
|
ts = sched[crn]['act']
|
|
teacher = sched[crn]['teacher']
|
|
|
|
info = [ 'course', course['id'], '', course['sis_course_id'], course['name'], course['workflow_state'], ts ]
|
|
info = list(map(str,info))
|
|
info2 = [ course['id'], course['sis_course_id'], course['name'], course['workflow_state'], ctype, cstart, ts, teacher ]
|
|
info2 = list(map(str,info2))
|
|
output2.write( ",".join(info2) + "\n" )
|
|
output2.flush()
|
|
print(info2)
|
|
#output.write( ",".join(info) + "\n" )
|
|
|
|
uu = f"https://ilearn.gavilan.edu/courses/{course['id']}"
|
|
if course["workflow_state"]=='unpublished' and ctype=='online' and cstart==target_start:
|
|
html_sections.append(f'<!--{course["name"]}--><tr><td><a href="{uu}" target="_blank">{course["name"]}</a></td><td>{course["sis_course_id"]}</td><td>{course["workflow_state"]}</td><td>{ctype}</td><td>{cstart}</td><td>{ts}</td><td>{teacher}</td></tr>\n')
|
|
#uu = url + '/api/v1/courses/%s/sections' % str(course['id'])
|
|
#course['sections'] = fetch(uu)
|
|
#s_info = [ [ 'section', y['id'], y['course_id'], y['sis_course_id'], y['name'], y['total_students'] ] for y in course['sections'] ]
|
|
#for row in s_info:
|
|
# print(row)
|
|
# output.write( ",".join( map(str,row) ) + "\n" )
|
|
#output.flush()
|
|
i += 1
|
|
#if i % 5 == 0:
|
|
# codecs.open('cache/courses_w_sections.json','w','utf-8').write(json.dumps(c,indent=2))
|
|
except Exception as e:
|
|
print(f"error on {course}")
|
|
print(f"{e}")
|
|
#codecs.open('cache/courses_w_sections.json','w','utf-8').write(json.dumps(c,indent=2))
|
|
|
|
html_sections.sort()
|
|
for h in html_sections:
|
|
htmlout.write(h)
|
|
htmlout.write('</table></body></html>\n')
|
|
print(f"wrote to {outputfile}")
|
|
'''
|
|
|
|
|
|
def eslCrosslister():
    """Interactively pair ESL 500-level and 700-level sections for
    cross-listing, driven by cache/xcombos.txt (lines of "crn1, crn2").

    Prints summaries of all ESL sections found in term 62, then for each
    500-level course with a known partner offers to cross-list, and finally
    reports any combo rows that never matched a live course.
    """
    fives = []
    sevens = []
    others = []

    course_by_crn = {}

    # canvas course id -> its first section id (for the 700s)
    sections = {}

    combos = [ [y.strip() for y in x.split(',') ] for x in open('cache/xcombos.txt','r').readlines() ]

    combo_checklist = [ 0 for i in range(len(combos)) ]

    #print("\n\nCombos:")
    #[ print("%s - %s" % (x[0],x[1])) for x in combos]

    #return

    courses = getCoursesTermSearch(62,"ESL",0)

    for C in courses:
        # a 5-digit run in the course name is taken to be the CRN
        ma = re.search( r'(\d{5})', C['name'])
        if ma:
            #print("Found Section: %s from course %s" % (ma.group(1), C['name']))
            C['crn'] = ma.group(1)
            course_by_crn[C['crn']] = C

        if C['name'].startswith("ESL5"): fives.append(C)
        elif C['name'].startswith("ESL7"): sevens.append(C)
        else: others.append(C)

    for S in sevens:
        uu = url + '/api/v1/courses/%i/sections' % S['id']
        #print(uu)
        c_sect = fetch(uu)
        print(".",end='')
        #print(json.dumps(c_sect,indent=2))
        if len(c_sect) > 1:
            print("* * * * Already Crosslisted!!")
        if c_sect:
            sections[ S['id'] ] = c_sect[0]['id']
            S['sectionid'] = c_sect[0]['id']

        # NOTE(review): raises KeyError if no 5-digit CRN was found in the
        # course name above — confirm whether .get('crn') was intended.
        if S['crn']:
            # wire up partner links in both directions from the combos file
            for i,co in enumerate(combos):
                if S['crn'] == co[0]:
                    S['partner'] = co[1]
                    combo_checklist[i] = 1
                    course_by_crn[co[1]]['partner'] = S
                elif S['crn'] == co[1]:
                    S['partner'] = co[0]
                    combo_checklist[i] = 1
                    course_by_crn[co[0]]['partner'] = S

    print("Others:")
    for F in sorted(others, key=lambda x: x['name']):
        print(courseLineSummary(F))

    print("\n\nFive hundreds")
    for F in sorted(fives, key=lambda x: x['name']):
        print(courseLineSummary(F))

    print("\n\nSeven hundreds")
    for F in sorted(sevens, key=lambda x: x['name']):
        print(courseLineSummary(F,sections))

    print("\n\nMake a x-list: ")
    for F in sorted(fives, key=lambda x: x['name']):
        if 'partner' in F:
            print(xlistLineSummary(F,sections))
            if 'partner' in F and 'sectionid' in F['partner']:
                if not input('ready to crosslist. Are you? Enter "q" to quit. ') == 'q':
                    # NOTE(review): xlist(host_id, parasite_list) expects a
                    # host id and a LIST of parasite ids — this call passes a
                    # section id and a bare course id; looks like a stale call
                    # signature, confirm before running.
                    xlist( F['partner']['sectionid'], F['id'] )
                else:
                    break
    # report combo rows that never matched a live 700-level section
    for i,c in enumerate(combo_checklist):
        if not c:
            print("Didn't catch: "+ str(combos[i]))
|
|
|
def xlist_iii(parasite='', host=''): # section id , new course id
    """Interactively cross-list course sections into a host course.

    Prompts for a host course id, then repeatedly prompts for another course
    id whose first section is moved into the host via the Canvas cross-list
    endpoint, until 'q' is entered.

    Args:
        parasite: optional first course id whose section will be moved
            (prompted for when empty).
        host: optional Canvas id of the course receiving the sections
            (prompted for when empty).
    """
    host = host or input("ID number of the HOSTING COURSE? ")
    if not parasite:
        parasite = input("ID number of the SECTION to add to above? (or 'q' to quit) ")

    while parasite != 'q':
        #h_sections = fetch( url + "/api/v1/courses/%s/sections" % str(host))
        #print(h_sections)

        # NOTE(review): despite the prompt wording, the entered value is used
        # as a COURSE id; its first section is what actually gets moved.
        p_sections = fetch( url + "/api/v1/courses/%s/sections" % str(parasite))
        parasite_section = p_sections[0]['id']
        # TODO need to get the section id from each course:
        # GET /api/v1/courses/:course_id/sections

        # POST /api/v1/sections/:id/crosslist/:new_course_id
        #      SECTION ID (to move)       NEW __COURSE__ ID
        u = url + "/api/v1/sections/%s/crosslist/%s" % (str(parasite_section),str(host))
        print(u)
        res = requests.post(u, headers = header)
        print(res.text)
        parasite = input("ID number of the SECTION to add to above? ")
|
|
|
|
def unenroll_student(courseid,enrolid):
    """Permanently remove a single enrollment from a Canvas course.

    Args:
        courseid: Canvas course id.
        enrolid: Canvas enrollment id (not a user id).

    Returns:
        The requests.Response from the DELETE call, so callers that print
        the result (e.g. the STEM sync) see something meaningful — the
        original returned None.
    """
    t = url + "/api/v1/courses/%s/enrollments/%s" % ( str(courseid), str(enrolid) )
    # Canvas API: task=delete removes the enrollment entirely
    # (vs. "conclude" or "inactivate").
    data = {"task": "delete" }
    r4 = requests.delete(t, headers=header, params=data)
    print(data)
    return r4
|
|
|
|
#def get_enrollments(courseid):
|
|
# t = url + "/api/v1/courses/%s/enrollments?type=StudentEnrollment" % courseid
|
|
# return fetch(t,1)
|
|
|
|
|
|
def enroll_id_list_to_shell(id_list, shell_id, v=0):
    """Enroll a batch of users as students in a Canvas shell, skipping members.

    Args:
        id_list: list of [user_id, name] pairs (only the user_id is used).
        shell_id: Canvas course id of the destination shell.
        v: verbose flag; nonzero prints the before/after membership sets.

    Looks each user up in the local DB for a display name, then POSTs an
    active StudentEnrollment, throttled to ~1.6 requests/second.
    """
    # id list has pairs, [id,name]
    id_list = set([i[0] for i in id_list])
    existing = course_enrollment(shell_id) # keyed by user_id
    existing_ids = set( [ x['user_id'] for x in existing.values() ])

    if v: print("To Enroll: %s" % str(id_list))
    # fix: was a raw string (r"\n\Already...") that printed literal backslashes
    if v: print("\n\nAlready Enrolled: %s" % str(existing_ids))

    enroll_us = id_list.difference(existing_ids)
    if v: print("\n\nTO ENROLL %s" % str(enroll_us))

    (connection,cursor) = db()

    for j in enroll_us:
        s = ""  # fix: bind before the try so the except message can't NameError
        try:
            q = "SELECT name,id FROM canvas.users u WHERE u.id=%s" % j
            cursor.execute(q)
            s = cursor.fetchall()
            if s:
                s = s[0]
                print("Enrolling: %s" % s[0])
                t = url + '/api/v1/courses/%s/enrollments' % shell_id
                data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
                         'enrollment[enrollment_state]': 'active' }
                r3 = requests.post(t, headers=header, params=data)
                #print(r3.text)
                time.sleep(0.600)   # rate-limit courtesy pause
        except Exception as e:
            print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
|
|
|
|
|
|
# multiple semesters
|
|
def enroll_stem_students_live():
    """Sync the STEM shell roster for each configured Canvas term.

    Returns:
        (total_enrolled, total_removed) summed across all terms — the
        original returned nothing, which crashed enroll_o_s_students()'s
        tuple unpacking of this function's result.
    """
    # multiple semesters
    semesters = [289]

    total_enrolled = 0
    total_removed = 0
    for S in semesters:
        counts = enroll_stem_students_live_semester(S)
        if counts:
            total_enrolled += counts[0]
            total_removed += counts[1]
    return (total_enrolled, total_removed)
|
|
|
|
|
|
def enroll_stem_students_live_semester(the_term, do_removes=0):
    """Sync the STEM umbrella shell's roster with live STEM enrollment for one term.

    Students enrolled in any STEM-department section this term are added to
    the shell; when do_removes is truthy, shell members no longer in a STEM
    section are dropped.

    Args:
        the_term: Canvas enrollment-term id.
        do_removes: nonzero also unenrolls students no longer in STEM classes.

    Returns:
        (num_enrolled, num_removed).
    """
    import localcache2
    depts = "MATH BIO CHEM CSIS PHYS PSCI GEOG ASTR ECOL ENVS ENGR STAT".split(" ")
    users_to_enroll = users_in_by_depts_live(depts, the_term) # term id

    stem_enrollments = course_enrollment_with_faculty(stem_course_id) # keyed by user_id
    users_in_stem_shell = set( [ x['user_id'] for x in stem_enrollments.values() ])

    print("ALL STEM STUDENTS %s" % str(users_to_enroll))
    print("\n\nALREADY IN STEM SHELL %s" % str(users_in_stem_shell))

    enroll_us = users_to_enroll.difference(users_in_stem_shell)
    remove_us = users_in_stem_shell.difference(users_to_enroll)

    print("\n\nTO ENROLL %s" % str(enroll_us))
    (connection,cursor) = localcache2.db()

    #xyz = input('enter to continue')

    eee = 0   # enrolled count
    uuu = 0   # removed count

    if do_removes:
        print("\n\nTO REMOVE %s" % str(remove_us))
        for j in remove_us:
            s = ""  # bind before the try so the except message can't NameError
            try:
                q = "SELECT name,id FROM canvas.users WHERE id=%s" % j
                cursor.execute(q)
                s = cursor.fetchall()
                if s:
                    s = s[0]
                    print("Removing: %s" % s[0])
                    r1 = unenroll_student(str(stem_course_id), stem_enrollments[j]['id'])
                    print(r1)
                    uuu += 1
                    time.sleep(0.600)   # rate-limit courtesy pause
            except Exception as e:
                print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))

    for j in enroll_us:
        s = ""
        try:
            q = "SELECT name,id FROM canvas.users WHERE id=%s" % j
            cursor.execute(q)
            s = cursor.fetchall()
            if s:
                s = s[0]
                print("Enrolling: %s" % s[0])
                t = url + '/api/v1/courses/%s/enrollments' % stem_course_id
                data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
                         'enrollment[enrollment_state]': 'active' }
                #if input('enter to enroll %s or q to quit: ' % s[0]) == 'q':
                #    break
                r3 = requests.post(t, headers=header, params=data)
                print(data)
                eee += 1  # fix: was "eee += 0", so the enrolled count was always reported as 0
                time.sleep(0.600)
        except Exception as e:
            print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
        #print(r3.text)

    print("\n\nTO ENROLL %s" % str(enroll_us))
    #print("\n\nTO REMOVE %s" % str(remove_us))
    return (eee,uuu)
|
|
|
|
|
|
|
|
###########################
|
|
|
|
def enroll_bulk_students_bydept(course_id, depts, the_term="172", cautious=1): # a string, a list of strings
    """Interactively enroll everyone taking classes in `depts` into one shell.

    Args:
        course_id: destination Canvas course id (string).
        depts: list of department codes, e.g. ["ART", "MUS"].
        the_term: Canvas enrollment-term id.
        cautious: when truthy, confirm each enrollment at the prompt
            ('k' switches to enroll-all, 'q' aborts the loop).

    NOTE(review): the removal pass only *prints* who would be removed — the
    actual unenroll call is still TODO/commented out below.
    """
    users_to_enroll = users_in_by_depts_live(depts, the_term) # term id

    targeted_enrollments = course_enrollment(course_id) # by user_id.. (live, uses api)

    current_enrollments = set( [ x['user_id'] for x in targeted_enrollments.values() ])

    print("ALL TARGET STUDENTS %s" % str(users_to_enroll))
    print("\nALREADY IN SHELL %s" % str(current_enrollments))

    enroll_us = users_to_enroll.difference(current_enrollments)
    remove_us = current_enrollments.difference(users_to_enroll)

    print("\n\nTO ENROLL %s" % str(enroll_us))
    xyz = input('enter to continue')
    print("\n\nTO REMOVE %s" % str(remove_us))

    (connection,cursor) = db()

    # Removal pass (report-only; see TODO).
    for j in remove_us:
        try:
            q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j
            cursor.execute(q)
            s = cursor.fetchall()
            if s:
                s = s[0]
                print("Removing: %s" % s[0])

                ## TODO not done here
                # r1 = unenroll_student(str(course_id), stem_enrollments[j]['id'])
                #print(r1)

            time.sleep(0.600)
        except Exception as e:
            # NOTE(review): `s` is unbound here if cursor.execute() raised.
            print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))

    # Enrollment pass.
    for j in enroll_us:
        try:
            q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j
            cursor.execute(q)
            s = cursor.fetchall()
            if s:
                s = s[0]
                print("Enrolling: %s" % s[0])
                enrollment = { }
                #print(s)
                t = url + '/api/v1/courses/%s/enrollments' % course_id
                data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
                         'enrollment[enrollment_state]': 'active' }

                if cautious:
                    print(t)
                    print(data)
                    prompt = input('enter to enroll %s, k to go ahead with everyone, or q to quit: ' % s[0])
                    if prompt == 'q':
                        break
                    elif prompt == 'k':
                        cautious = 0   # stop prompting for the rest of the batch
                r3 = requests.post(t, headers=header, params=data)
                if cautious:
                    print(data)
                time.sleep(0.600)   # rate-limit courtesy pause
        except Exception as e:
            print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
        #print(r3.text)
|
|
|
|
|
|
def enroll_gott_workshops():
    """Enroll GOTT-workshop signups (scraped from the TLC signup page) into
    their Canvas workshop shells.

    Flow: scrape the ``var signups = [...]`` JSON blob out of the signup
    page, normalize whitespace/NBSP in the title and RSVP-date fields,
    match each signup to a workshop in ``workshop_ids`` by (date, title),
    resolve the signup email to an iLearn staff account (with a manual
    alias table for personal addresses), and hand the resulting user list
    to enroll_id_list_to_shell().
    """
    # stupid gav tls broken
    r = requests.get("https://www.gavilan.edu/staff/tlc/signups.php")

    text = r.text

    # Regex to extract the JSON object embedded in the page's JavaScript.
    match = re.search(r"var\s+signups\s*=\s*(\[\{.*?\}\]);", text, re.DOTALL)

    if match:
        json_str = match.group(1) # Extract the JSON string
        try:
            signups = json.loads(json_str) # Convert to Python list of dicts
            # Normalize NBSP and spaces in key fields to make title/date matching robust
            def _norm(v):
                try:
                    return str(v).replace('\xa0',' ').strip()
                except Exception:
                    return v
            for s in signups:
                if isinstance(s, dict):
                    if 'training' in s and s['training'] is not None:
                        s['training'] = _norm(s['training'])
                    if 'date_rsvp' in s and s['date_rsvp'] is not None:
                        s['date_rsvp'] = _norm(s['date_rsvp'])
            #print(json.dumps(signups,indent=2))
        except json.JSONDecodeError as e:
            print("Error decoding JSON:", e)
            return
    else:
        print("JSON object not found")
        return

    #signups = json.loads(r.text)

    #signups = json.loads(codecs.open('cache/signups.json','r','utf-8').read())

    # Staff roster dump (refresh with users.py step 1); keyed by lowercased email.
    # update w/ users.py #1
    all_staff = json.loads(codecs.open('cache/ilearn_staff.json','r','utf-8').read())
    by_email = { x['email'].lower():x for x in all_staff }
    #print(by_email.keys())

    # Active workshops as [rsvp-date, exact signup title, Canvas shell id].
    # The commented-out entries are kept as a history of past sessions.
    workshop_ids = [
        #'GOTT 2: Intro to Async Online Teaching and Learning2023-07-09 17:00:00': 17992,
        #'GOTT 4: Assessment in Digital Learning2023-07-09 17:00:00': 17995,
        #'Restricted to STEM faculty. Humanizing (STEM) Online Learning 2023-06-18 17:00:00': 17996,
        #'GOTT 6: Online Live Teaching and Learning2023-06-11 17:00:00': 17986,
        #'GOTT 5: Essentials of Blended Learning2023-06-25 17:00:00': 17987,
        #'GOTT 5: Essentials of Blended Learning (HyFlex)2023-06-25 17:00:00': 17987,
        #'GOTT 1: Intro to Teaching Online with Canvas2023-05-29 17:00:00': 17985,
        #'GOTT 1: Intro to Teaching Online with Canvas2023-08-20 17:00:00': 17994
        #'GOTT 1: Intro to Online Teaching2024-01-02 16:00:00': 19278,
        #'GOTT 2: Intro to Asynchronous Teaching and Learning2024-01-02 16:00:00': 19222,
        #'GOTT 5: Essentials of Blended Learning2024-01-02 16:00:00': 19223,
        #'GOTT 6: Intro to Live Online Teaching and Learning2024-01-14 16:00:00': 19224,
        #'5/28-6/9 GOTT 1: Intro to Teaching Online 2024-05-28 12:00:00': 20567,
        #'5/28-6/21 GOTT 2: Introduction to Asynchronous Teaching and Design2024-05-28 12:00:00': 20575,
        #'GOTT 4: Assessment in Digital Learning2024-06-02 17:00:00': 20600, # 6/2
        #'6/10-6/23 GOTT 5: Essentials of Blended Learning, Hyflex2024-06-10 12:00:00': 20568,
        #'6/17-6/30 GOTT 6 Introduction to Live Online Teaching and Learning2024-06-17 12:00:00': 20569,
        #'GOTT 1 Intro to Teaching Online AUG242024-07-29 12:00:00': 20603, # 7/29
        #['2025-01-01 16:00:00 GOTT 1: Intro to Teaching Online with Canvas', 21770, 'enroll_gott1.txt'],
        #['2025-01-01 16:00:00 GOTT 2: Introduction to Asynchronous Teaching and Design', 21772, 'enroll_gott2.txt']

        # date, title, shell_id
        #['2025-02-23 16:00:00', 'GOTT 6: Intro to Synchronous Teaching (Sync/Hyflex)', 21835],
        #['2025-03-14 17:00:00', 'GOTT 5: The Essentials of Blended Learning (Hybrid) ', '21886'],
        #['2025-02-23 16:00:00', 'GOTT 1: Intro to Teaching Online (2 week, async)', 21874]
        #['2025-05-26 17:00:00', 'GOTT 2: Introduction to Asynchronous Teaching and Learning', 23015],
        #['2025-06-01 17:00:00', 'GOTT 1: Intro to Teaching Online', 23083],
        #['2025-06-01 17:00:00', 'GOTT 4: Assessments in Digital Learning', 21898],

        #['2025-08-11 13:00:00', 'GOTT 1: Introduction to Online Teaching with Canvas', 23232],
        #['2025-09-01 17:00:00', r'GOTT 1: Intro to Online Teaching (Canvas, Accessibility and RSI) ', 23270],
        #['2025-09-14 17:00:00', r'GOTT 2: Intro to Asynchronous Online Teaching and Learning', 23290],
        ['2025-09-28 17:00:00', r'GOTT 6: Foundation of Teaching Live Online/Hyflex', 23311],
    ]
    #print(json.dumps(signups,indent=4))
    #print(json.dumps(by_email,indent=4))

    # Personal-address -> campus-address aliases for signups that did not use
    # their gavilan.edu email.
    subs = {'csalvin@gavilan.edu':'christinasalvin@gmail.com',
            'karenjeansutton@gmail.com': 'ksutton@gavilan.edu',
            'elisepeeren@gmail.com': 'epeeren@gavilan.edu',
            'kjoyenderle@gmail.com': 'kenderle@gavilan.edu',
            'flozana@gmail.com': 'flozano@gavilan.edu',
            'fyarahmadi2191@gmail.com': 'fyarahmadi@gavilan.edu',
            'jacquelinejeancollins@yahoo.com': 'jcollins@gavilan.edu',
            'bt@gavilan.edu': 'btagg@gavilan.edu',
            'tagg.brian@yahoo.com': 'btagg@gavilan.edu',
            'tmiller.realestate@gmail.com': 'tmiller@gavilan.edu',
            'gemayo70@yahoo.com': 'pclaros@gavilan.edu',
            'csalvin@gmail.com': 'csalvin@gavilan.edu',
            'efalvey@aol.com': 'efalvey@gavilan.edu',
            'lorrmay36@mac.com': 'llevy@gavilan.edu',
            'gkalu1@gmail.com': 'gkalu@gavilan.edu',
            'rpotter@gav.edu': 'rpotter@gavilan.edu',
            'ally162@qq.com': 'aao@gavilan.edu',
            'davidamancio791@gmail.com': 'damancio@gavilan.edu',
            'carissaamunoz83@gmail.com': 'amunoz@gavilan.edu',
            'jasonwcpa@yahoo.com': 'jwolowitz@gavilan.edu',
            'fam.grzan@charter.net': 'rgrzan@gavilan.edu',
            }

    for each_workshop in workshop_ids:
        #if wkshp not in workshop_ids:
        #    print(f"skipping {wkshp}")
        #    continue
        wkshp_date, wkshp_title, wkshp_shell_id = each_workshop
        # local normalizer consistent with signup cleaning
        def _norm(v):
            try:
                return str(v).replace('\xa0',' ').strip()
            except Exception:
                return v
        to_enroll = []   # [user_id, name] pairs for enroll_id_list_to_shell
        #from_file = [ L.strip().split(' - ') for L in codecs.open(f'cache/{student_list}', 'r', 'utf-8').readlines() ]
        #print(from_file)

        for s in signups:
            # Match signup to workshop by normalized (rsvp date, training title).
            if _norm(wkshp_date) == _norm(s.get('date_rsvp')) and _norm(wkshp_title) == _norm(s.get('training')):
                e = s['email'].lower()
                if e in subs:
                    e = subs[e]
                print( f"{wkshp_title} {e} {s['name']}" )
                if e in by_email:
                    user = by_email[e]
                    #print(f"\t{user['name']} {e} {user['login_id']}")
                    to_enroll.append([user['id'],user['name']])
                else:
                    # Signup email not found in the staff roster; skipped silently.
                    #print("** ** NOT FOUND")
                    pass
        print(f"Workshop: {wkshp_date} {wkshp_title} \n\tEnrolling: {', '.join(i[1] for i in to_enroll)}")
        enroll_id_list_to_shell(to_enroll, wkshp_shell_id)
|
|
|
|
def enroll_gnumber_list_to_courseid():
    """Enroll a file-supplied list of G-number users into one Canvas course.

    File format (cache/gottenrollments.txt): the first line is the Canvas
    course id; each remaining line starts with a G number (text after the
    first comma is ignored).  Each G number is resolved through the local
    cache and enrolled as an active student, throttled between requests.
    """
    from localcache2 import user_from_goo

    lines = codecs.open('cache/gottenrollments.txt','r','utf-8').readlines()
    courseid = lines[0].strip()

    # G numbers: everything before the first comma on each remaining line.
    glist = []
    for raw in lines[1:]:
        glist.append(raw.strip().split(',')[0])

    # Resolve each G number to a Canvas id and display name.
    idlist = []
    for g in glist:
        idlist.append(user_from_goo(g)['id'])
    namelist = []
    for g in glist:
        namelist.append(user_from_goo(g)['name'])

    print(courseid)
    print(glist)
    print(idlist)

    for i,id in enumerate(idlist):
        try:
            print(f"Enrolling: {id}, {namelist[i]}")
            t = f"{url}/api/v1/courses/{courseid}/enrollments"
            data = { 'enrollment[user_id]': id, 'enrollment[type]':'StudentEnrollment',
                     'enrollment[enrollment_state]': 'active' }
            r3 = requests.post(t, headers=header, params=data)
            print(r3.text)
            time.sleep(0.600)   # rate-limit courtesy pause
        except Exception as e:
            print(f"Something went wrong with id {id}, course {courseid}, user {namelist[i]}")
|
|
|
|
|
|
|
|
def enroll_art_students_live():
    """Enroll every student taking an arts-area class into the arts shell."""
    # Departments whose rosters feed the shell.
    art_depts = ["THEA", "ART", "DM", "MUS", "MCTV"]
    # Destination Canvas shell.
    art_shell_id = "13717"
    enroll_bulk_students_bydept(art_shell_id, art_depts)
    print("done.")
|
|
|
|
def enroll_orientation_students():
    """Enroll students new this summer/fall pair into the orientation shell.

    New-student ids come from the local DB (users_new_this_2x_semester);
    current shell membership comes from the local cache (user_ids_in_shell).

    Returns:
        (num_enrolled, num_removed).  The removed count is always 0 here,
        but the tuple shape matches enroll_stem_students_live_semester() so
        callers can unpack both the same way.  (Fix: this return was
        previously commented out, crashing enroll_o_s_students().)
    """
    # For testing purposes: set to 0 for a dry run (no POSTs are sent).
    DO_IT = 1

    import localcache2
    ori_shell_id = "20862" # 2025 "19094" # 2024 # "" # 2023 orientation shell 15924 # 2022: "9768"

    print("Getting users in orientation shell")
    #users_in_ori_shell = set( \
    #    [ str(x['user_id']) for x in course_enrollment(ori_shell_id).values() ]) # api fetch

    users_in_ori_shell = list(user_ids_in_shell(ori_shell_id))

    # single semester
    # users_to_enroll = users_new_this_semester(the_semester) ### ##### USES LOCAL DB

    # double semester (SU + FA)
    users_to_enroll = users_new_this_2x_semester("202550", "202570") ##### USES LOCAL DB

    #print("ALL ORIENTATION STUDENTS %s" % str(users_to_enroll))
    #print("\n\nALREADY IN ORI SHELL %s" % str(users_in_ori_shell))

    enroll_us = users_to_enroll.difference(users_in_ori_shell)

    #print("\n\nTO ENROLL %s\n" % str(enroll_us))
    print(f"{len(enroll_us)} new students to enroll in Orientation shell." )

    eee = 0   # enrolled count
    uuu = 0   # removed count (no removal pass in this function)

    (connection,cursor) = localcache2.db()

    for j in enroll_us:
        s = ""
        try:
            q = "SELECT name,id FROM canvas.users WHERE id=%s" % j
            cursor.execute(q)
            s = cursor.fetchall()
            if s:
                s = s[0]
                print(" + Enrolling: %s" % s[0])
                t = url + '/api/v1/courses/%s/enrollments' % ori_shell_id
                data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
                         'enrollment[enrollment_state]': 'active' }
                #print(t)
                #print(data)
                if DO_IT:
                    r3 = requests.post(t, headers=header, params=data)
                    eee += 1
                #print(r3.text)
                time.sleep(0.250)   # rate-limit courtesy pause
        except Exception as e:
            print(" - Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))

    return (eee, uuu)
|
|
|
|
def enroll_o_s_students():
    """Run the STEM and Orientation auto-enrollment passes and report totals.

    Tolerates either pass returning None (its counts are then reported as
    zero) — fixes the tuple-unpacking crash when a pass has no return value.
    """
    #full_reload()

    (es, us) = enroll_stem_students_live() or (0, 0)
    (eo, uo) = enroll_orientation_students() or (0, 0)

    print("Enrolled %i and unenrolled %i students in STEM shell" % (es, us))
    print("Enrolled %i students in Orientation shell" % eo)
|
|
|
|
|
|
|
|
|
|
def make_ztc_list(sem='sp20'):
    """Cross-reference ZTC (zero-textbook-cost) survey responses with the schedule.

    Builds a course-code -> "teacher, teacher, ..." map from the survey CSV,
    then writes cache/ztc_crossref.csv listing every scheduled section whose
    course code some teacher reported as ZTC.

    NOTE(review): the `sem` parameter is unused — the input/output paths are
    hard-coded to spring 2020.
    """
    sched = json.loads(open('output/semesters/2020spring/sp20_sched.json','r').read())
    responses = open('cache/ztc_responses_sp20.csv','r').readlines()[1:]   # skip header row

    result = open('cache/ztc_crossref.csv','w')
    result.write('Course,Section,Name,Teacher,ZTC teacher\n')

    # course code -> comma-joined names of teachers who reported it as ZTC
    ztc_by_dept = {}
    for R in responses:
        # Strip the ",Yes" column, then collapse whitespace runs to commas so
        # the line splits into [name, course, course, ...].
        R = re.sub(',Yes','',R)
        R = re.sub(r'\s\s+',',',R)

        parts = R.split(r',') #name courselist yes
        #print(parts[1])
        name = parts[0]

        for C in parts[1:] :
            C = C.strip()
            #print(C)
            if C in ztc_by_dept:
                ztc_by_dept[C] += ', ' + parts[0]
            else:
                ztc_by_dept[C] = parts[0]
    print(ztc_by_dept)
    for CO in sched:
        #if re.match(r'CWE',CO['code']):
        #print(CO)

        if CO['code'] in ztc_by_dept:
            print(('Possible match, ' + CO['code'] + ' ' + ztc_by_dept[CO['code']] + ' is ztc, this section taught by: ' + CO['teacher'] ))
            result.write( ','.join( [CO['code'] ,CO['crn'] , CO['name'] , CO['teacher'] , ztc_by_dept[CO['code']] ]) + "\n" )
|
|
|
|
def course_search_by_sis():
    """Print a (name, SIS id) table for every course in Canvas term 65."""
    term = 65
    #u = "/api/v1/accounts/1/courses/%s" % course_id
    #i = fetch( url + u)
    rows = [[course['name'], course['sis_course_id']] for course in getCoursesInTerm(term)]
    print_table(rows)
    # print(json.dumps(x, indent=2))
|
|
# print(json.dumps(x, indent=2))
|
|
|
|
|
|
# run overview_start_dates to get most recent info
|
|
def set_custom_start_dates():
    """Annotate a term's shell-overview CSV with early/late-start flags.

    Reads cache/overview_semester_shells_<SEM>.csv (produced by
    overview_start_dates) and writes an annotated copy adding columns:
    ignore, is_early_start, is_late_start, shell_custom_start, and
    shell_warn_crosslist_sections.  Rows for shells with zero sections are
    dropped.  The actual start-date API update is currently a stub
    (adjust_shell_startdate) — the old interactive update code sits in the
    dead string after the return.
    """
    from datetime import datetime

    term = find_term( input("term? (ex: fa25) ") )

    if not term or (not 'canvas_term_id' in term) or (not 'code' in term):
        print(f"Couldn't find term. Try updating the saved terms list.")
        return

    TERM = term['canvas_term_id']
    SEM = term['code']

    # Term start date comes from the saved term record as "M/D...".
    term_start_month = int(term['begin'].split('/')[0])
    term_start_day = int(term['begin'].split('/')[1])
    term_start_year = '20' + term['code'][2:4]   # NOTE(review): unused below

    print(f"term begins on {term_start_month}/{term_start_day}")

    output_path = f"cache/overview_semester_shells_annotated{SEM}.csv"

    input_path = f"cache/overview_semester_shells_{SEM}.csv"
    if not os.path.exists(input_path):
        print(f"file does not exist: {input_path}")
        print("Run overview_start_dates first")
        return

    # NOTE(review): these flags (and limit_to) are unused in the active code
    # path; they belong to the commented-out interactive update pass below.
    make_changes = 1
    do_all = 0
    get_fresh = 0

    # just do certain ids in cache/changeme.txt
    limit_to_specific_ids = 0

    limit_to = [x.strip() for x in open('cache/changeme.txt','r').readlines()]

    def adjust_shell_startdate(row):
        # Placeholder stub — would push the schedule start date to the shell.
        pass

    def parse_date(date_str):
        # Accepts ISO-ish strings (e.g. "2025-01-26T08:00:00Z"); returns a
        # datetime, or None for empty/"none"/unparseable input.
        if not date_str or date_str.lower() == 'none':
            return None
        try:
            return datetime.fromisoformat(date_str.replace("Z", "").replace("T", " "))
        except ValueError:
            return None

    with open(input_path, newline='', encoding='utf-8') as infile, \
        open(output_path, "w", newline='', encoding='utf-8') as outfile:

        reader = csv.DictReader(infile)
        fieldnames = reader.fieldnames + [
            "ignore","is_early_start", "is_late_start", "shell_custom_start", "shell_warn_crosslist_sections"
        ]
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()

        for row in reader:
            # Shells with no sections are skipped entirely.
            if int(row["shell_numsections"]) == 0:
                continue

            sched_start = parse_date(row["sched_start"])
            shell_start = parse_date(row["shell_start"])
            shortname = row["shell_shortname"]
            num_sections = int(row["shell_numsections"])

            # Initialize new columns
            row["ignore"] = ""
            row["is_early_start"] = ""
            row["is_late_start"] = ""
            row["shell_custom_start"] = ""
            row["shell_warn_crosslist_sections"] = ""

            # check for cops program
            department = shortname.split()[0].rstrip("0123456789") # → "JLE"
            if department in ("JLE", "JFT"):
                row["ignore"] = department

            # Early/late start check — compares month/day only, ignoring year.
            if sched_start:
                sched_mmdd = (sched_start.month, sched_start.day)
                term_mmdd = (term_start_month, term_start_day)
                if sched_mmdd < term_mmdd:
                    row["is_early_start"] = sched_start.date().isoformat()
                elif sched_mmdd > term_mmdd:
                    row["is_late_start"] = sched_start.date().isoformat()

            # shell_start override: an explicit shell date wins; otherwise an
            # off-pattern start is handed to the (stub) adjuster.
            if shell_start:
                row["shell_custom_start"] = shell_start.date().isoformat()
            else:
                if row["is_early_start"] or row["is_late_start"]:
                    adjust_shell_startdate(row)

            # Crosslist check: a shortname like "X 10123/10124" should list as
            # many slash-separated CRNs as the shell has sections.
            if '/' in shortname:
                parts = shortname.split()
                section_part = parts[-1]
                section_count = len(section_part.split('/'))
                if section_count != num_sections:
                    row["shell_warn_crosslist_sections"] = section_part

            writer.writerow(row)
    return
|
|
'''
|
|
# Do we adjust the start date? Only if it doesn't match term
|
|
if d_start.month == term_start_month and d_start.day == term_start_day:
|
|
print(" Ignoring, term start date" )
|
|
continue
|
|
|
|
else:
|
|
print(" Adjust course start day?")
|
|
|
|
if make_changes:
|
|
if do_all != 'a':
|
|
do_all = input(' -> adjust? [enter] for yes, [a] to do all remaining. [n] to quit. >')
|
|
if do_all == 'n':
|
|
exit()
|
|
if do_all == '' or do_all == 'a':
|
|
data = {'course[start_at]':d_start.isoformat(), 'course[restrict_student_future_view]': True,
|
|
'course[restrict_enrollments_to_course_dates]':True }
|
|
u2 = f"https://gavilan.instructure.com:443/api/v1/courses/{this_id}"
|
|
r3 = requests.put(u2, headers=header, params=data)
|
|
print(" updated.. OK")
|
|
'''
|
|
|
|
|
|
def overview_start_dates():
    """Write a CSV comparing each section's scheduled vs. Canvas-shell dates.

    For the chosen term: map CRNs to Canvas course ids, pull each shell's
    dates/state/roster counts from the API, join with the published schedule
    JSON, and write cache/overview_semester_shells_<SEM>.csv (consumed by
    set_custom_start_dates).
    """
    term = find_term( input("term? (ex: fa25) ") )

    if not term or (not 'canvas_term_id' in term) or (not 'code' in term):
        print(f"Couldn't find term.")
        return

    TERM = term['canvas_term_id']
    SEM = term['code']

    output = codecs.open(f"cache/overview_semester_shells_{SEM}.csv","w","utf-8")

    get_fresh = 0
    if not get_fresh:
        gf = input('Fetch new list of semester courses? (y/n) ')
        if gf=='y':
            get_fresh = 1

    # get list of online course shells
    c = getCoursesInTerm(TERM,get_fresh,0)

    # dict to match section numbers between shells and schedule
    crn_to_canvasid = {}
    for C in c:
        if 'sis_course_id' in C and C['sis_course_id']:
            #print( f"{C['name']} -> {C['sis_course_id'][7:13]}" )
            # Characters 7..12 of the SIS id hold the CRN — presumably a
            # fixed "<term prefix>-<crn>..." layout; verify against SIS data.
            crn_to_canvasid[C['sis_course_id'][7:13]] = str(C['id'])
        else:
            print( f"---NO CRN IN: {C['name']} -> {C}" )

    # NOTE(review): this local `header` shadows the imported API auth header
    # (harmless here since requests go through fetch()).  Also note the
    # shell_restrict_view_dates column appears twice, and the data rows append
    # two teacher fields not named in this header row.
    header = f"id,shell_shortname,type,enrolled,max,sched_start,shell_start,shell_end,shell_restrict_view_dates,shell_restrict_view_dates,shell_state,shell_numstudents,shell_numsections"
    output.write(header + "\n")
    print("\n\n" + header)

    # get course info from schedule
    s = requests.get(f"https://gavilan.cc/schedule/{SEM}_sched_expanded.json").json()
    for S in s:
        # Schedule start is "M-D"; build "M/D/20YY" and parse it.
        start = re.sub( r'\-','/', S['start']) + '/20' + SEM[2:4]
        d_start = datetime.strptime(start,"%m/%d/%Y")

        # try to find online shell matching this schedule entry
        try:
            this_id = crn_to_canvasid[S['crn']]
        except Exception as e:
            print(f"DIDN'T FIND CRN - {start} {d_start} - {S['code']} {S['crn']} {S['name']}" )
            continue

        # get more canvas course shell info
        uu = f"{url}/api/v1/courses/{this_id}"
        this_course = fetch(uu)

        shell_start = this_course['start_at']
        shell_end = this_course['end_at']
        shell_restrict_view_dates = '?'
        if 'access_restricted_by_date' in this_course:
            shell_restrict_view_dates = this_course['access_restricted_by_date']
        shell_shortname = this_course['course_code']
        shell_state = this_course['workflow_state']

        # get user count
        ss = f"{url}/api/v1/courses/{this_id}/users"
        enrollments = fetch(ss, params={"enrollment_type[]":"student"})
        shell_numstudents = len(enrollments)

        # get teachers
        s2 = f"{url}/api/v1/courses/{this_id}/users"
        teachers = fetch(s2, params={"enrollment_type[]":"teacher"})
        shell_teachers = [(x['id'],x['name']) for x in teachers]

        # cross-listed?  More than one section means the shell hosts several CRNs.
        sec = f"{url}/api/v1/courses/{this_id}/sections"
        sections = fetch(sec, params={"include[]":"total_students"})
        shell_numsections = len(sections)

        content = f"{this_id},{shell_shortname},{S['type']},{S['act']},{S['cap']},{d_start},{shell_start},{shell_end},{shell_restrict_view_dates},{shell_restrict_view_dates},{shell_state},{shell_numstudents},{shell_numsections},{S['teacher']},{shell_teachers}"
        output.write(content + "\n")
        print(content)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def course_by_depts_terms(section=0):
    """Walk the semester schedule and sort shells by start date, moving
    winter-session shells into the winter Canvas term.

    Matches schedule entries to Canvas shells by CRN, classifies each start
    date (aviation / spring / winter), and — when DOING_WINTER_MOVES is on —
    PUTs winter-start shells into the WI_TERM enrollment term.

    NOTE(review): the `section` parameter is unused, and the `else:` branch
    below hangs off `if 1:` so the start-date adjustment code in it is
    currently unreachable — presumably disabled on purpose; confirm before
    re-enabling.
    """
    get_fresh = 0
    TERM = 291          # spring Canvas term being scanned
    WI_TERM = 290       # winter Canvas term that winter-start shells move into
    DOING_WINTER_MOVES = 1

    SEM = "sp26"

    make_changes = 0    # NOTE(review): reassigned to 1 inside the loop below
    do_all = 'a'

    # Day-of-January start dates used to classify sections.
    winter_start_day = 5
    aviation_start_day = 7
    nursing_start_day = 0
    spring_start_day = 26

    # get list of online course shells
    if get_fresh:
        print(f"Getting list of courses in {SEM}")
        c = getCoursesInTerm(TERM,get_fresh,0)
        codecs.open(f'cache/courses_in_term_{TERM}.json','w','utf-8').write(json.dumps(c,indent=2))
    else:
        c = json.loads( codecs.open(f'cache/courses_in_term_{TERM}.json','r','utf-8').read() )

    # dict to match section numbers between shells and schedule
    crn_to_canvasid = {}
    for C in c:
        if 'sis_course_id' in C and C['sis_course_id']:
            print( f"{C['name']} -> {C['sis_course_id'][7:13]}" )
            crn_to_canvasid[C['sis_course_id'][7:13]] = str(C['id'])
        else:
            print( f"---NO CRN IN: {C['name']} -> {C}" )

    # get course info from schedule
    s = requests.get(f"http://gavilan.cc/schedule/{SEM}_sched_expanded.json").json()
    for S in s:
        # Schedule start is "M-D"; build "M/D/20YY" and parse it.
        start = re.sub( r'\-','/', S['start']) + '/20' + SEM[2:4]
        d_start = datetime.strptime(start,"%m/%d/%Y")

        # try to find online shell matching this schedule entry
        try:
            this_id = crn_to_canvasid[S['crn']]
            report_line = f" - {start} {d_start} - id: {this_id} - {S['code']} {S['crn']} {S['name']}"
        except Exception as e:
            print(f"DIDN'T FIND CRN - {start} {d_start} - {S['code']} {S['crn']} {S['name']}" )
            continue

        if 1:
            #if d_start.month < 5 or d_start.month > 7:
            #    print(f"  Ignoring {d_start}, starting too far away...")
            #    continue

            # Aviation starts its own day; reported, not moved.
            if d_start.month == 1 and d_start.day == aviation_start_day:
                print(f"+ AVIAT {report_line}")
                continue

            #if d_start.month == 1 and d_start.day == nursing_start_day:
            #    print("- Nursing ", start, d_start, " - ", S['code'], " ", S['crn'] )
            #    continue

            # Regular spring starts need no action.
            if d_start.month == 1 and d_start.day == spring_start_day:
                print(f" IGNORE {report_line}" )
                continue

        else:
            # UNREACHABLE while the branch above is `if 1:` — retained
            # interactive start-date adjustment code.
            #print(f"  Adjust course start day? {report_line}")

            if make_changes:
                if do_all != 'a':
                    do_all = input(' -> adjust? [enter] for yes, [a] to do all remaining. [n] to quit. >')
                    if do_all == 'n':
                        exit()
                if do_all == '' or do_all == 'a':
                    data = {'course[start_at]':d_start.isoformat(), 'course[restrict_student_future_view]': True,
                            'course[restrict_enrollments_to_course_dates]':True }
                    u2 = f"https://gavilan.instructure.com:443/api/v1/courses/{this_id}"
                    r3 = requests.put(u2, headers=header, params=data)
                    print(f"+ UPDATED {report_line}")

        make_changes = 1
        if DOING_WINTER_MOVES:
            # Winter-session shells get moved into the winter enrollment term.
            if d_start.month == 1 and d_start.day == winter_start_day:
                print(f"+ WINTER {report_line}")
                data = {'course[term_id]':WI_TERM}
                u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s" % crn_to_canvasid[S['crn']]
                if make_changes:
                    r3 = requests.put(u2, headers=header, params=data)
                    #print(" updated.. OK")
                    #print(r3.text)

    return
|
|
|
|
|
|
|
|
|
|
def _xlist_cwe_matches(sem_courses, search_string, host_id):
    """Interactively cross-list every course matching search_string into host_id.

    For each course in sem_courses whose name matches (and which is not the
    host shell itself), fetch its sections and, after a y/n prompt, POST each
    section to the Canvas cross-list endpoint for host_id.
    """
    for R in sem_courses:
        try:
            if re.search(search_string, R['name']) and str(R['id']) != str(host_id):

                # use the course to get the section id
                print ( R['name'] )
                u = url + '/api/v1/courses/%i/sections' % R['id']
                for S in fetch(u):
                    if (S['id']):
                        myanswer = input( "-> Should I crosslist: %i\t%s\tsection id: %i (y/n) " % (R['id'],R['name'],S['id'] ))
                        if myanswer=='y':
                            # cross list
                            v = url + "/api/v1/sections/%i/crosslist/%i" % (S['id'],host_id)
                            res = requests.post(v, headers = header)
                            print( json.dumps( json.loads(res.text), indent=2) )

                print()
        except Exception as e:
            print( "Caused a problem: " + str(e) + "\n" + str(R) + "\n" )


def xlist_cwe():
    """Cross-list the term's CWE/WTRM work-experience sections into two shells.

    CWE190 and WTRM290 sections are gathered into one umbrella shell; CWE192
    sections into another.  Prompts for the term and confirms each cross-list.
    (The two formerly duplicated search loops now share _xlist_cwe_matches.)
    """
    # cwe190 and wtrm290 get put into 1 shell
    # cwe192 get put into another shell

    this_sem_190_id = 23594 # they get 190s and 290s
    this_sem_192_id = 23603 # they get 192s
    # this_sem_term = 289

    term = find_term( input("term? (ex: fa25) ") )

    if not term or (not 'canvas_term_id' in term) or (not 'code' in term):
        print(f"Couldn't find term.")
        return

    this_sem_term = term['canvas_term_id']
    SEM = term['code']

    get_fresh = 1
    sem_courses = getCoursesInTerm(this_sem_term, get_fresh, 0)

    for search_string in ['CWE190','WTRM290']:
        _xlist_cwe_matches(sem_courses, search_string, this_sem_190_id)

    ## Now the 192s
    _xlist_cwe_matches(sem_courses, "CWE192", this_sem_192_id)
|
|
|
|
|
|
|
|
def change_term(courseid, termid):
    """Move a Canvas course into a different enrollment term.

    Also clears 'restrict_enrollments_to_course_dates' so the course
    follows the term dates rather than its own.

    Args:
        courseid: Canvas course id (int or str).
        termid: Canvas enrollment term id (int or str).

    Returns:
        The decoded JSON response from Canvas (a course object on
        success, or an error payload).
    """
    data = { 'course[term_id]': str(termid), 'course[restrict_enrollments_to_course_dates]':'false' }
    t = f'{url}/api/v1/courses/{courseid}'
    r3 = requests.put(t, headers=header, params=data)
    result = json.loads(r3.text)
    # The original built json.dumps(result, indent=2) and discarded it;
    # return the parsed response instead so callers can check success.
    return result
|
|
|
|
|
|
|
|
|
|
def teacher_to_many_shells():
    """One-off admin script: enroll user id 30286 as Teacher in a fixed
    list of course shells.

    Each shell is moved to a holding term (9) before enrolling and moved
    back to SP25 (287) afterward — presumably to work around enrollment
    restrictions on the live term; confirm before reuse.
    """
    for id in [21842,21096,20952,21561,21219,21274,20875,21018,21093,21719,21460,21102,21506,21169,21538]:
        #print(id)
        #continue

        term_id_misc = 9      # holding term used while enrolling
        term_id_sp25 = 287    # term the shells actually belong to

        change_term(id, term_id_misc)

        # Add teacher
        u3 = url + f"/api/v1/courses/{id}/enrollments"
        #usrid = input("id of %s? " % N)
        usrid = '30286'  # hard-coded target user
        data2 = { "enrollment[type]":"TeacherEnrollment", "enrollment[user_id]":usrid,
            "enrollment[enrollment_state]":"active" }
        r4 = requests.post(u3, headers=header, params=data2)
        result = json.loads(r4.text)
        print(json.dumps(result, indent=2))
        print(f"enrolled user id: {usrid} as teacher in course {id}.")

        change_term(id, term_id_sp25)
|
|
|
|
import os, pickle
|
|
|
|
def create_sandboxes():
    """Create a personal sandbox course for each student in the listed
    GOTT shells, enroll the student as Teacher, publish the course, and
    log progress so reruns skip already-created sandboxes.

    State files:
      - cache/sandbox_courses.pkl: pickled list of sandbox course names
        already created (makes the run idempotent).
      - cache/sandbox_report.txt: appended human-readable report.

    Returns 1 when the run completes.
    """

    ## TODO: read all student names and determine ahead of time if initials conflict. deal with them

    # (course id, sandbox name suffix); commented entries are past runs.
    courses_to_sandbox = [ #(20567, ' Sandbox GOTT1 SU24'),
        #(20575, ' Sandbox GOTT2 SU24'),
        #(20600, ' Sandbox GOTT4 SU24'),
        #(19223, ' Sandbox GOTT5 WI24'),
        #(19224, ' Sandbox GOTT6 WI24'),
        #(20761, ' Sandbox GOTT1 FA24'),
        #(21770, ' Sandbox GOTT1 WI25'),
        #(21772, ' Sandbox GOTT2 WI25'),
        #(23083, ' Sandbox GOTT1 SU25'),
        #(23015, ' Sandbox GOTT2 SU25'),
        #(21898, ' Sandbox GOTT4 SU25'),
        #(23270, ' Sandbox GOTT1 FA25SEPT'),
        #(23290, ' Sandbox GOTT2 FA25SEPT'),
        (23314, ' Sandbox GOTT5 FA25'),
        (23315, ' Sandbox GOTT4 FA25'),
    ]
    filepath = 'cache/sandbox_courses.pkl'

    report = codecs.open('cache/sandbox_report.txt','a','utf-8')

    # Load the names of sandboxes created by previous runs.
    if os.path.exists(filepath):
        with open(filepath, 'rb') as f:
            sandbox_log = pickle.load(f)
    else:
        sandbox_log = []

    for crs_id, label in courses_to_sandbox:
        # TODO check and skip "Test Student"
        crs_info = getCourses(crs_id)
        # print(json.dumps(crs_info,indent=2))
        c_name = crs_info['name']
        print(f"\nStudents in course {crs_id}: {c_name}" )
        report.write(f"\nCourse: {c_name}\n" )
        enrolled = course_enrollment(crs_id)
        for eid,stu in enrolled.items():
            if stu['role'] != 'StudentEnrollment':
                continue
            u_name = stu['user']['short_name']
            u_id = stu['user']['id']
            # Sandbox names are keyed by student initials; conflicting
            # initials collide (see TODO above).
            initials = ''.join([ x[0] for x in u_name.split(" ") ])
            print(f" id: {stu['user_id']} ititials: {initials} name: {stu['user']['short_name']} role: {stu['role']}")
            report.write(f" id: {stu['user_id']} ititials: {initials} name: {stu['user']['short_name']} role: {stu['role']}")
            coursename = f"{initials}{label}"
            if coursename in sandbox_log:
                print(f" - Already created: {coursename}")
            else:
                print(f" + Creating course: {coursename} for {u_name}, id: {u_id}")
                u2 = url + "/api/v1/accounts/1/courses"
                data = {
                    "course[name]": coursename,
                    "course[code]": coursename,
                    "course[term_id]": "8",
                }

                # Create a course
                r3 = requests.post(u2, headers=header, params=data)
                new_course_response = json.loads(r3.text)
                id = new_course_response['id']
                print(f" created course id {id}")
                report.write(f" link: https://ilearn.gavilan.edu/courses/{id} id: {stu['user_id']} ititials: {initials} name: {stu['user']['short_name']} role: {stu['role']}\n")

                # Add teacher
                u3 = url + f"/api/v1/courses/{id}/enrollments"
                data2 = { "enrollment[type]":"TeacherEnrollment", "enrollment[user_id]":u_id,
                    "enrollment[enrollment_state]":"active" }
                r4 = requests.post(u3, headers=header, params=data2)
                print(f" enrolled user id: {u_id} as teacher.")

                # Desired settings: visible to authenticated users, published.
                data = { 'course[is_public_to_auth_users]': True, 'course[event]': 'offer' }
                t = url + f"/api/v1/courses/{id}"
                r3 = requests.put(t, headers=header, params=data)
                result = json.loads(r3.text)
                if 'name' in result:
                    print(f" > Name: {result['name']}")
                if 'workflow_state' in result:
                    print(f" > State: {result['workflow_state']}")
                if 'is_public_to_auth_users' in result:
                    print(f" > Public: {result['is_public_to_auth_users']}")
                sandbox_log.append(coursename)

    # Write log back out
    with open(filepath, 'wb') as handle:
        pickle.dump(sandbox_log, handle, protocol=pickle.HIGHEST_PROTOCOL)

    return 1

    # ------------------------------------------------------------------
    # NOTE(review): everything below this point is unreachable — it sits
    # after the `return 1` above. It appears to be an older, manual
    # version of this script (hard-coded instructor sandbox lists). It
    # also uses `report` as a list (append / "\n".join) while the live
    # code above binds `report` to a file handle, so it would crash if
    # ever re-enabled. Kept for reference; confirm before deleting.
    # ------------------------------------------------------------------
    # ('ED','82'),
    sandboxes = [ ('JH','45324'), ('PK','38183'), ('GM','5167'), ('BS','19231'),
        ('ST','303'), ('KW','5145')]

    sandboxes = [ ('CD','51701'), ('LC','45193'), ('JC','70'), ('DG','133'), ('JH','2816'),('SM','18812'), ('GM','211'),
        ('RM','45341'), ('DP','251'), ('BT','58059'), ('TT','36834') ]

    sandboxes = [ ('MA','8'), ('WA','15'), ('BA','18'), ('CC','51701'), ('LC','45193'), ('PC','4100'), ('ED','82'), ('KE','101'),
        ('OF','41897'), ('SG','115'), ('JG','37654'), ('DG','133'), ('DK','168'), ('JM','204'), ('GM', '211'),
        ('RM','45341'), ('CR','5655'), ('CS','272'), ('BS','19231'), ('SS', '274') ]

    sandboxes = [ ('SM','191')]

    sandboxes = [ ('KD', '2509'), ('KE', '2904'), ('SH', '144'), ('SN','60996'), ('EP', '16726'), ('PS','60938'), ('JW', '43052') ]

    sandboxes = [('HA','61620'), ('AS','61451'), ('MP', '11565'), ('AA','51276') ]
    sandboxes = [('JR','61062')]

    for (N,usrid) in sandboxes:
        coursename = f"{N} Sandbox SU23 (GOTT1)"
        coursecode = f"{N} SU23 Sandbox (GOTT1)"
        print(f"Creating course: {coursename} for {N}, id: {usrid}")
        u2 = url + "/api/v1/accounts/1/courses"
        data = {
            "course[name]": coursename,
            "course[code]": coursecode,
            "course[term_id]": "8",
        }

        # Create a course
        r3 = requests.post(u2, headers=header, params=data)
        course_by_dept = json.loads(r3.text)
        id = course_by_dept['id']
        print(f"created course id {id}")

        report.append( f"{coursename} https://ilearn.gavilan.edu/courses/{id}" )

        # Add teacher
        u3 = url + f"/api/v1/courses/{id}/enrollments"
        #usrid = input("id of %s? " % N)
        data2 = { "enrollment[type]":"TeacherEnrollment", "enrollment[user_id]":usrid,
            "enrollment[enrollment_state]":"active" }
        r4 = requests.post(u3, headers=header, params=data2)
        print(f"enrolled user id: {usrid} as teacher.")

        # Desired settings
        data = { 'course[is_public_to_auth_users]': True, 'course[event]': 'offer' }
        t = url + f"/api/v1/courses/{id}"
        r3 = requests.put(t, headers=header, params=data)
        result = json.loads(r3.text)
        if 'name' in result:
            print(f"Name: {result['name']}")
        if 'workflow_state' in result:
            print(f" State: {result['workflow_state']}")
        if 'is_public_to_auth_users' in result:
            print(f" Public: {result['is_public_to_auth_users']}")

        #print(json.dumps(json.loads(r4.text),indent=2))
        #print()
        #x = input("enter to continue")
    print("\n\n")
    print("\n".join(report))
    print("\n")
|
|
|
|
|
|
|
|
|
|
## ##
|
|
## ##
|
|
## ## Course Nav and External Tools
|
|
## ##
|
|
## ##
|
|
|
|
def do_gav_connect():
    """Bulk-add the GavConnect (Starfish) redirect tool to every course
    whose CRN is listed in cache/starfish.txt, for the hard-coded term.

    The file holds one CRN per line; each is prefixed with the SIS term
    code ("202430-<crn>") to match Canvas sis_course_id values. Prompts
    for confirmation before starting, then reports how many courses got
    the tool.
    """
    term = 181      # Canvas enrollment term id (hard-coded)
    sem = "202430"  # SIS term prefix used in sis_course_id
    get_fresh = 1
    crns = [sem + "-" + x.strip() for x in open('cache/starfish.txt','r').readlines()]
    target = len(crns)
    print(crns)
    print("Press enter to begin.")
    a = input()
    print("Fetching all course names...")
    c = getCoursesInTerm(term, get_fresh, 0)
    i = 0  # count of successful installs

    for course in c:
        if course['sis_course_id'] in crns:
            print(" Adding gav connect to", course['name'])
            print()
            result = add_gav_connect(course['id'])
            if result:
                i += 1
            else:
                print("Something went wrong with", course['name'])
    print(f"Added {i} redirects out of {target}.")
|
|
|
|
def add_gav_connect(course_id):
    """Install the GavConnect (Starfish) redirect tool into a course's
    navigation.

    Uses the edu-apps redirect LTI pointed at the Starfish student
    dashboard, opened in a new tab from the course nav.

    Args:
        course_id: Canvas course id (int or str).

    Returns:
        1 if Canvas returned a created tool (response contains "id"),
        0 on an error payload or any unexpected response.
    """
    params = { "name": "GavConnect",
        "privacy_level": "anonymous",
        "description": "Add links to external web resources that show up as navigation items in course, user or account navigation. Whatever URL you specify is loaded within the content pane when users click the link.",
        "consumer_key": "N/A",
        "shared_secret": "N/A",
        "url": "https://www.edu-apps.org/redirect",
        "custom_fields[new_tab]": "1",
        "custom_fields[url]": "https://gavilan.starfishsolutions.com/starfish-ops/dl/student/dashboard.html",
        # NOTE(review): "anonymous" is not a Canvas workflow_state for
        # external tools — confirm whether this field is needed at all.
        "workflow_state": "anonymous",
        "course_navigation[enabled]": "true",
        "course_navigation[visibility]": "public",
        "course_navigation[label]": "GavConnect",
        "course_navigation[selection_width]": "800",
        "course_navigation[selection_height]": "400",
        "course_navigation[icon_url]": "https://www.edu-apps.org/assets/lti_redirect_engine/redirect_icon.png",
        }

    # POST
    u = url + f"/api/v1/courses/{course_id}/external_tools"
    res = requests.post(u, headers = header, params=params)
    result = json.loads(res.text)

    #print( json.dumps( result, indent=2) )
    if "errors" in result:
        return 0
    if "id" in result:
        return 1
    # Previously fell through returning None here; make the failure case
    # explicit so callers can rely on a 0/1 contract.
    return 0
|
|
|
|
# Create a Program Mapper redirect external tool in course nav.
|
|
def add_career_academic_pathways(course_id):
    """Install the "Career & Academic Pathways" (Program Mapper) redirect
    tool into a course's navigation, defaulted to disabled (hidden).

    Args:
        course_id: Canvas course id (int or str).

    Returns:
        1 if Canvas returned a created tool (response contains "id"),
        0 on an error payload or any unexpected response.
    """
    params = { "name": "Career & Academic Pathways",
        "privacy_level": "anonymous",
        "description": "Add links to external web resources that show up as navigation items in course, user or account navigation. Whatever URL you specify is loaded within the content pane when users click the link.",
        "consumer_key": "N/A",
        "shared_secret": "N/A",
        "url": "https://www.edu-apps.org/redirect",
        "custom_fields[new_tab]": "1",
        "custom_fields[url]": "https://gavilan.programmapper.com/academics",
        # NOTE(review): "anonymous" is not a Canvas workflow_state for
        # external tools — confirm whether this field is needed at all.
        "workflow_state": "anonymous",
        "course_navigation[enabled]": "true",
        "course_navigation[visibility]": "public",
        "course_navigation[label]": "Career & Academic Pathways",
        "course_navigation[selection_width]": "800",
        "course_navigation[selection_height]": "400",
        "course_navigation[icon_url]": "https://www.edu-apps.org/assets/lti_redirect_engine/redirect_icon.png",
        # Tab starts hidden; instructors can enable it per course.
        "course_navigation[default]": "disabled",
        }

    # POST
    u = url + f"/api/v1/courses/{course_id}/external_tools"
    res = requests.post(u, headers = header, params=params)
    result = json.loads(res.text)

    #print( json.dumps( result, indent=2) )
    if "errors" in result:
        return 0
    if "id" in result:
        return 1
    # Previously fell through returning None here; make the failure case
    # explicit so callers can rely on a 0/1 contract.
    return 0
|
|
|
|
# Bulk add GavConnect to a list of course ids provided by user input.
|
|
def add_gav_connect_prompt_list():
    """Prompt for course ids and add GavConnect to each, reporting
    per-course success and a final tally."""
    raw = input("Enter course ids (comma/space separated): ")
    course_ids = [tok.strip() for tok in re.split(r"[\s,]+", raw) if tok.strip()]
    if not course_ids:
        print("No course ids provided.")
        return
    succeeded = 0
    for course_id in course_ids:
        try:
            outcome = add_gav_connect(course_id)
        except Exception as ex:
            print(f"{course_id}: error {ex}")
            continue
        print(f"{course_id}: {'OK' if outcome else 'FAILED'}")
        if outcome:
            succeeded += 1
    print(f"Added GavConnect to {succeeded} of {len(course_ids)} courses.")
|
|
|
|
# Add Pathways link to every course in a selected term (default OFF).
|
|
def add_pathways_all_courses_in_term():
    """Prompt for a term and add the Career & Academic Pathways tool
    (nav default OFF) to every active course in that term."""
    term_rec = find_term(input("term? (ex: fa25) "))
    if not term_rec or 'canvas_term_id' not in term_rec or 'code' not in term_rec:
        print("Couldn't find term.")
        return
    canvas_term = term_rec['canvas_term_id']
    print("Fetching list of all active courses")
    term_courses = getCoursesInTerm(canvas_term, 1, 0)
    succeeded, attempted = 0, 0
    for crs in term_courses:
        attempted += 1
        try:
            outcome = add_career_academic_pathways(crs['id'])
        except Exception as ex:
            print(f"{crs['id']} {crs.get('name','')}: error {ex}")
            continue
        print(f"{crs['id']} {crs.get('name','')}: {'OK' if outcome else 'FAILED'}")
        if outcome:
            succeeded += 1
    print(f"Added Pathways to {succeeded} of {attempted} courses.")
|
|
|
|
# Ensure Pathways link exists for every course in the term; add if missing (default OFF).
|
|
def ensure_pathways_in_term():
    """Make sure every course in a prompted term has the "Career &
    Academic Pathways" nav tool, adding it (default OFF) where missing.

    Detection matches on the tab label, so a tool installed under a
    different label would not be found and would be added again.
    """
    t = find_term(input("term? (ex: fa25) "))
    if not t or (not 'canvas_term_id' in t) or (not 'code' in t):
        print("Couldn't find term.")
        return
    term = t['canvas_term_id']
    TARGET_LABEL = "Career & Academic Pathways"
    print("Fetching list of all active courses")
    courses = getCoursesInTerm(term, 1, 0)
    added, present, total = 0, 0, 0
    for C in courses:
        total += 1
        try:
            tabs = fetch(f"{url}/api/v1/courses/{C['id']}/tabs") or []
            # `t` here only shadows the term dict inside the generator
            # expression; the outer `t` is not affected.
            has_it = any(t.get('label') == TARGET_LABEL for t in tabs)
            if has_it:
                present += 1
                print(f"{C['id']} {C.get('name','')}: already present")
            else:
                res = add_career_academic_pathways(C['id'])
                print(f"{C['id']} {C.get('name','')}: {'ADDED' if res else 'FAILED'}")
                if res:
                    added += 1
        except Exception as ex:
            print(f"{C['id']} {C.get('name','')}: error {ex}")
    print(f"Present: {present}, Added: {added}, Total: {total}")
|
|
|
|
def mod_eval_visibility(shell_id, visible=True):
    """Show (visible=True) or hide (visible=False) the course-eval
    external tool tab (context_external_tool_1953) in the given shell,
    pinning it at position 2 in the nav."""
    payload = {'position': 2, 'hidden': not visible}
    endpoint = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % shell_id
    requests.put(endpoint, headers=header, params=payload)
    #print(" " + r3.text)
|
|
|
|
|
|
|
|
def instructor_list_to_activate_evals():
    """Toggle the course-eval tab for every course taught by an
    instructor listed in cache/fa21_eval_teachers.txt (third CSV column
    holds the teacher id).

    NOTE(review): mod_eval_visibility(shell_id, False) passes
    visible=False, which HIDES the eval tab even though this function's
    name says "activate" — confirm the intended direction before reuse.
    """
    courses = all_sem_courses_teachers()

    mylist = codecs.open('cache/fa21_eval_teachers.txt','r','utf-8').readlines()
    mylist = [ x.split(',')[2].strip() for x in mylist ]

    count = 0
    limit = 5000  # safety cap on how many shells get modified

    for c in courses:
        # Tuple layout per all_sem_courses_teachers():
        # c[1]=shell id, c[3]=course name, c[5]=teacher name, c[6]=teacher id
        shell_id = c[1]
        teacher_id = c[6]
        teacher_name = c[5]
        course_name = c[3]

        if teacher_id in mylist:
            print("Teacher: %s \t course: %s" % (teacher_name,course_name))
            mod_eval_visibility( shell_id, False)
            count += 1
            if count > limit: return

    #print(mylist)
|
|
|
|
# Toggle the eval tool visibility for all courses in the selected Canvas term.
|
|
def add_evals(section=0):
    """Show the course-eval tool tab in every course whose SIS section
    number appears in cache/<term>_eval_sections.txt.

    Prompts for a term code; the section file holds one section number
    per line. The `section` parameter is unused (kept for call
    compatibility). Raises ValueError when the term can't be resolved.
    """
    # show or hide?

    term_record = find_term(input('term? '))
    if not term_record:
        raise ValueError(f"Unknown term")

    term_id = term_record.get('canvas_term_id')
    if term_id is None:
        raise ValueError(f"Canvas term id missing for {term_record}")

    term_code = term_record.get('code')

    # fetch list of courses?
    GET_FRESH_LIST = 0

    # turn off eval link to clean up from prev semester?
    #CLEAN_UP = 1

    # just print, don't change anything
    TEST_RUN = 0

    # confirm each shell?
    ASK = 0

    # are we showing or hiding the course eval link?
    HIDE = False

    s = [ x.strip() for x in codecs.open(f"cache/{term_code}_eval_sections.txt",'r').readlines()]
    s = list(funcy.flatten(s))
    s.sort()
    print(f"Going to activate course evals in these sections: \n{s}\n")
    xyz = input('hit return to continue')

    all_semester_courses = getCoursesInTerm(term_id, GET_FRESH_LIST, 1)
    eval_course_ids = []
    courses = {}
    for C in all_semester_courses:
        if C and 'sis_course_id' in C and C['sis_course_id']:
            # sis_course_id looks like "<term>-<section>"; take the
            # section part. NOTE(review): an id without a '-' would raise
            # IndexError on parts[1] — confirm ids always contain one.
            parts = C['sis_course_id'].split('-')
            if parts[1] in s:
                #print(C['name'])
                courses[str(C['id'])] = C
                eval_course_ids.append(str(C['id']))

    data = {'position':2, 'hidden':HIDE}
    eval_course_ids.sort()

    for i in eval_course_ids:
        if TEST_RUN:
            print(f"{courses[i]['id']} / {courses[i]['name']}")
        else:
            if ASK:
                a = input(f"Hit q to quit, a to do all, or enter to activate eval for: {courses[i]['id']} / {courses[i]['name']} : ")
                if a == 'a': ASK = 0
                if a == 'q': return
            else:
                print(f"{courses[i]['id']} / {courses[i]['name']}")
            # context_external_tool_1953 is the eval LTI tool's tab id.
            u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % i
            r3 = requests.put(u2, headers=header, params=data)
            print(f"OK {u2}")
            #print(r3.text)
            #time.sleep(0.400)
|
|
|
|
|
|
def remove_evals_all_sections():
    """Hide the course-eval tab in EVERY course of the hard-coded term
    (fa24 / Canvas term 184).

    Mirrors add_evals but applies to all sections and sets HIDE=True.
    """
    TERM = 184
    SEM = "fa24"

    # fetch list of courses?
    GET_FRESH_LIST = 0

    # just print, don't change anything
    TEST_RUN = 0

    # confirm each shell?
    ASK = 0

    # are we showing or hiding the course eval link?
    HIDE = True

    all_semester_courses = getCoursesInTerm(TERM, GET_FRESH_LIST, 1)
    eval_course_ids = [ C['id'] for C in all_semester_courses ]

    courses = { C['id']: C for C in all_semester_courses }

    data = {'position':2, 'hidden':HIDE}
    eval_course_ids.sort()

    for i in eval_course_ids:
        if TEST_RUN:
            print(f"{courses[i]['id']} / {courses[i]['name']}")
        else:
            if ASK:
                a = input(f"Hit q to quit, a to do all, or enter to activate eval for: {courses[i]['id']} / {courses[i]['name']} : ")
                if a == 'a': ASK = 0
                if a == 'q': return
            else:
                print(f"{courses[i]['id']} / {courses[i]['name']}")
            # context_external_tool_1953 is the eval LTI tool's tab id.
            u2 = f"https://gavilan.instructure.com:443/api/v1/courses/{i}/tabs/context_external_tool_1953"
            r3 = requests.put(u2, headers=header, params=data)
            print(r3.text)
|
|
|
|
|
|
|
|
|
|
|
|
def get_ext_tools():
    """Fetch and pretty-print every account-level external tool."""
    endpoint = url + '/api/v1/accounts/1/external_tools'
    tools = fetch(endpoint)
    print(json.dumps(tools, indent=2))
|
|
|
|
def set_ext_tools():
    """Set account external tool 733's course-nav default to disabled
    and pretty-print the API response."""
    tool_id = 733
    endpoint = url + '/api/v1/accounts/1/external_tools/%s' % str(tool_id)
    payload = { 'course_navigation[default]': 'disabled' }
    response = requests.put(endpoint, headers=header, params=payload)
    print(json.dumps(json.loads(response.text), indent=2))
|
|
|
|
|
|
def get_course_ext_tools():
    """Fetch and pretty-print the external tools installed in the
    hard-coded course 15971."""
    target_course = "15971"
    endpoint = url + f"/api/v1/courses/{target_course}/external_tools"
    listing = fetch(endpoint)
    print(json.dumps(listing, indent=2))
|
|
|
|
|
|
def remove_n_analytics(section=0):
    """Hide the "New Analytics" tab in every course of term 172,
    prompting between changes until the user chooses to do all.

    `section` is unused. NOTE: calls exit() when the loop finishes,
    terminating the whole process; the triple-quoted block at the end is
    disabled legacy eval-toggling code.
    """
    print("Fetching list of all active courses")

    c = getCoursesInTerm(172,1,0)
    print(c)
    ids = []
    courses = {}
    data = {'hidden':True}  # payload that hides a tab

    pause = 1  # start interactive; user can switch to run-all with 'y'

    for C in c:
        #print( json.dumps(C,indent=2) )
        parts = C['sis_course_id'].split('-')
        #print("\n")
        print(C['name'])
        courses[str(C['id'])] = C
        ids.append(str(C['id']))

        u3 = url + '/api/v1/courses/%s/tabs' % str(C['id'])
        tabs = fetch(u3)
        for T in tabs:
            if T['label'] == "New Analytics":
                print( "\tVisibility is: " + T["visibility"] ) # json.dumps(tabs,indent=2) )
                if "hidden" in T:
                    print( "\tHidden is: " + str(T["hidden"]) ) # json.dumps(tabs,indent=2) )
                if 1: # T["visibility"] != "admins":
                    u4 = url + "/api/v1/courses/%s/tabs/%s" % ( str(C['id']), str(T['id']) )
                    print( "\tChanging visiblity of a. tab" )
                    r4 = requests.put(u4, headers=header, params=data)
                    print("\t" + r4.text)
                    if pause:
                        xyz = input('\n\nenter for next one or [y] to do all: ')
                        if xyz == 'y': pause = 0

    exit()

    # Unreachable: old eval-toggling code kept disabled inside a string.
    """ask = 1

    evals_hidden = True


    data = {'position':2, 'hidden':evals_hidden}

    for i in ids:
        if ask:
            a = input("Hit q to quit, a to do all, or enter to activate eval for: \n " + str(courses[i]) + "\n> ")
            if a == 'a': ask = 0
            if a == 'q': return
        u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % i
        print(courses[i]['name'])
        r3 = requests.put(u2, headers=header, params=data)
        print(" " + r3.text)
        time.sleep(0.300)
    """
|
|
|
|
import csv
|
|
|
|
def my_nav_filter(row):
    """Return True when a course-nav tab row counts as ON for the CSV
    summary: visibility is 'public' (case-insensitive) and the 'hidden'
    field does not stringify to 'true'."""
    # Filter logic: consider a tab ON only when visibility is public and not hidden; used for CSV summary.
    is_public = str(row.get('visibility', '')).lower() == 'public'
    if not is_public:
        return False
    return str(row.get('hidden')).lower() != 'true'
|
|
|
|
|
|
## Multi-pass course nav tool. Pass 1: index + XLSX; Pass 2: optional hide/show ids.
|
|
def clean_course_nav_setup_semester(section=0):
    # Multi-pass tool: index course nav, write XLSX with x/-/blank, then optionally bulk hide/show by label across the term.
    """Pass 1: snapshot every course's nav tabs for a prompted term into
    a CSV (visible tabs only) and an XLSX matrix (x = visible,
    - = present but hidden, blank = absent), columns sorted by how many
    courses show each tab. Pass 2 (optional): bulk hide/show tabs by
    label across all courses in the term.

    `section` is unused (kept for call compatibility).
    """
    TESTING = False  # Limit to first 20 courses while testing
    import openpyxl
    from openpyxl.utils import get_column_letter

    t = find_term(input("term? (ex: fa25) "))

    if not t or (not 'canvas_term_id' in t) or (not 'code' in t):
        print("Couldn't find term.")
        return

    term = t['canvas_term_id']
    SEM = t['code']

    print("Fetching list of all active courses")
    courses_in_term = getCoursesInTerm(term, 1, 0)
    if TESTING:
        print("TESTING mode enabled: limiting to first 20 courses")
        courses_in_term = courses_in_term[:20]

    # Collect course records and their tabs
    all_tabs_by_course = {}  # course_id -> list of tab dicts
    all_tab_ids = {}         # tab_id -> label (most recent seen) — kept for Pass 2 id operations
    all_labels = set()       # all labels seen in any course (visible or hidden)

    # Also write a detailed CSV of visible tabs as before
    nav_out = codecs.open(f'cache/course_nav_summary_{SEM}.csv', 'w', 'utf-8')
    nav_writer = csv.writer(nav_out)
    columns = "id name code start state label position hidden visibility type url".split(" ")
    nav_writer.writerow(columns)

    for C in courses_in_term:
        try:
            cid = str(C['id'])
            print(C['name'])
            u3 = f"{url}/api/v1/courses/{C['id']}/tabs"
            tabs = fetch(u3) or []
            # Normalize hidden
            for T in tabs:
                if 'hidden' not in T:
                    T['hidden'] = "n/a"
                # Track global set of tab ids and a sample label
                all_tab_ids[str(T.get('id'))] = T.get('label', str(T.get('id')))
                if T.get('label'):
                    all_labels.add(T.get('label'))
                # Write summary of ON tabs
                vals = [C['id'], C['name'], C['course_code'], C.get('start_at', ''), C.get('workflow_state', ''),
                    T.get('label', ''), T.get('position', ''), T.get('hidden', ''), T.get('visibility', ''),
                    T.get('type', ''), T.get('html_url', '')]
                mydict = dict(zip(columns, vals))
                if my_nav_filter(mydict):
                    nav_writer.writerow(vals)
                    nav_out.flush()
            all_tabs_by_course[cid] = tabs
        except Exception as err:
            print(f"Exception: {err}")

    try:
        nav_out.close()
    except Exception:
        pass

    # Build XLSX matrix
    try:
        # Compute popularity (count of visible 'x' occurrences per label) and sort by popularity then alpha.
        # First, build quick lookup per course for label visibility/hidden.
        def tab_status(t):
            vis = str(t.get('visibility', '')).lower()
            hid = str(t.get('hidden', '')).lower()
            if vis == 'public' and hid not in ['true']:
                return 'x'  # present and visible
            return '-'  # present but hidden (includes non-public visibility)

        # Precompute per-course map: label -> status symbol ('x' or '-')
        status_by_course = {}
        for C in courses_in_term:
            cid = str(C['id'])
            label_to_status = {}
            for tdict in all_tabs_by_course.get(cid, []) or []:
                lbl = tdict.get('label')
                if not lbl:
                    continue
                cur = label_to_status.get(lbl)
                new = tab_status(tdict)
                # If any instance is visible, prefer 'x' over '-'
                if cur == 'x':
                    continue
                label_to_status[lbl] = 'x' if new == 'x' else (cur or '-')
            status_by_course[cid] = label_to_status

        # Popularity counts
        visible_counts = defaultdict(int)
        for cid, lmap in status_by_course.items():
            for lbl, sym in lmap.items():
                if sym == 'x':
                    visible_counts[lbl] += 1

        # Column order: by decreasing x-count, then alphabetical for ties
        tab_labels = sorted(all_labels, key=lambda s: (-visible_counts.get(s, 0), str(s).lower()))
        xlsx_path = f"cache/course_nav_matrix_{SEM}.xlsx"
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "matrix"

        # Headers
        static_headers = ['course_id', 'course_name', 'first_teacher']
        # Row 1: labels only, sorted by popularity (most x's first)
        row1 = static_headers + list(tab_labels)
        ws.append(row1)

        # Rows: one per course
        for C in courses_in_term:
            cid = str(C['id'])
            cname = C.get('name', '')
            # First teacher
            teacher = ''
            try:
                tlist = teacher_list(int(cid)) or []
                if tlist:
                    # list of tuples: (id, name)
                    teacher = sorted([t[1] for t in tlist])[0]
            except Exception:
                pass

            row = [cid, cname, teacher]
            course_map = status_by_course.get(cid, {})
            for lbl in tab_labels:
                sym = course_map.get(lbl, '')
                row.append(sym)
            ws.append(row)

        # Simple sizing for readability
        for col_idx in range(1, ws.max_column + 1):
            col_letter = get_column_letter(col_idx)
            ws.column_dimensions[col_letter].width = 16 if col_idx > 3 else 20

        wb.save(xlsx_path)
        print(f"Wrote matrix: {xlsx_path}")
    except Exception as ex:
        print(f"Failed to write XLSX matrix: {ex}")

    # Optional Pass 2: apply hide/show updates (labels only)
    try_apply = input("Apply changes? [n] hide/show selected labels across all courses (y/N): ").strip().lower() in ['y', 'yes']
    if not try_apply:
        print("Pass 1 complete. No changes applied.")
        return

    # Gather labels to HIDE and SHOW
    def parse_label_list(s):
        if not s:
            return []
        # Allow comma-separated list. Spaces are preserved within labels.
        if '\n' in s:
            raw = [x.strip() for x in s.split('\n') if x.strip()]
        else:
            raw = [x.strip() for x in s.split(',') if x.strip()]
        return raw

    hide_src = input("Enter labels to HIDE (comma-separated or newline file path), leave blank to skip: ").strip()
    show_src = input("Enter labels to SHOW (comma-separated or newline file path), leave blank to skip: ").strip()

    hide_labels = []
    show_labels = []

    def read_labels_from_path(pth):
        try:
            with codecs.open(pth, 'r', 'utf-8') as f:
                content = f.read()
            # Prefer newline separation in files (one label per line)
            return [x.strip() for x in content.splitlines() if x.strip()]
        except Exception:
            return []

    # Inputs may be a literal list or a path (bare or under cache/).
    if hide_src:
        if os.path.exists(hide_src):
            hide_labels = read_labels_from_path(hide_src)
        elif os.path.exists(os.path.join('cache', hide_src)):
            hide_labels = read_labels_from_path(os.path.join('cache', hide_src))
        else:
            hide_labels = parse_label_list(hide_src)

    if show_src:
        if os.path.exists(show_src):
            show_labels = read_labels_from_path(show_src)
        elif os.path.exists(os.path.join('cache', show_src)):
            show_labels = read_labels_from_path(os.path.join('cache', show_src))
        else:
            show_labels = parse_label_list(show_src)

    # De-duplicate while keeping order roughly intact
    hide_labels = list(dict.fromkeys(hide_labels))
    show_labels = list(dict.fromkeys(show_labels))

    if not (hide_labels or show_labels):
        print("No labels provided. Skipping Pass 2.")
        return

    print(f"HIDE labels: {hide_labels}")
    print(f"SHOW labels: {show_labels}")
    confirm = input("Proceed with updates? (y/N): ").strip().lower() in ['y', 'yes']
    if not confirm:
        print("Aborted. No changes applied.")
        return

    # Build lookup of labels currently ON per course from the CSV summary (to avoid unnecessary hide calls)
    on_labels_by_course = defaultdict(set)
    try:
        csv_path = f'cache/course_nav_summary_{SEM}.csv'
        with codecs.open(csv_path, 'r', 'utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                cid = str(row.get('id', '')).strip()
                lbl = row.get('label')
                if cid and lbl:
                    on_labels_by_course[cid].add(lbl)
    except Exception as ex:
        print(f"Warning: could not read CSV summary for ON labels: {ex}")

    # Apply changes
    for C in courses_in_term:
        cid = str(C['id'])
        try:
            course_tabs = all_tabs_by_course.get(cid)
            # Refresh tabs to reduce staleness just before applying
            if course_tabs is None:
                course_tabs = fetch(f"{url}/api/v1/courses/{cid}/tabs") or []
            tabs_by_label = defaultdict(list)
            for t in course_tabs:
                lbl = t.get('label')
                if lbl:
                    tabs_by_label[lbl].append(t)

            # Helpers to PUT hide/show for a tab id
            def set_hidden_for_id(tab_id, hidden_val):
                put_url = f"{url}/api/v1/courses/{cid}/tabs/{tab_id}"
                data = {'hidden': hidden_val}
                r = requests.put(put_url, headers=header, params=data)
                action = 'HIDE' if hidden_val else 'SHOW'
                print(f"{action} {cid} {C.get('name','')} -> {tab_id}: {r.status_code}")

            # Hide by labels (only if label is currently ON per CSV)
            labels_to_hide_here = [lbl for lbl in hide_labels if lbl in on_labels_by_course.get(cid, set())]
            for lbl in labels_to_hide_here:
                for t in tabs_by_label.get(lbl, []):
                    tid = str(t.get('id'))
                    set_hidden_for_id(tid, True)

            # Show by labels
            for lbl in show_labels:
                for t in tabs_by_label.get(lbl, []):
                    tid = str(t.get('id'))
                    set_hidden_for_id(tid, False)
        except Exception as ex:
            print(f"Failed applying to course {cid}: {ex}")

    print("Pass 2 complete.")
|
|
|
|
|
|
def fetch_rubric_scores(course_id=16528, assignment_id=1):
    """Dump rubric definitions and rubric-assessed submission scores for
    a course to cache/rubric_scores.txt.

    Args:
        course_id: Canvas course id to report on.
        assignment_id: unused legacy parameter, kept for backward
            compatibility — every assignment in the course is scanned.

    Side effects:
        Writes a human-readable report to cache/rubric_scores.txt and
        prints rubric/submission JSON to stdout as it goes.
    """
    api_url = f'{url}/api/v1/courses/{course_id}'
    course_info = fetch(api_url)

    out = codecs.open('cache/rubric_scores.txt','w','utf-8')

    # Course header
    out.write(f"Course Name: {course_info['name']}\n")
    out.write(f"Short Name: {course_info['course_code']}\n")
    out.write(f"Semester: {course_info['enrollment_term_id']}\n")

    api_url = f'{url}/api/v1/courses/{course_id}/assignments'
    assignments_list = fetch(api_url)

    assignments_by_dept = {}  # assignment id -> {'name', 'rubric'}
    ratings_by_dept = {}      # rating id -> {'description', 'long_description', 'points'}

    # First pass: index every assignment and its rubric ratings.
    for assignment in assignments_list:
        a_id = assignment['id']
        a_name = assignment['name']
        rubric = assignment.get('rubric', [])  # empty list when no rubric attached

        has_rubric = 'yes' if rubric else 'no'
        out.write(f" Asmt Name: {a_name} ID: {a_id} Rubric: {has_rubric}\n")

        assignments_by_dept[a_id] = {
            'name': a_name,
            'rubric': rubric
        }

        if rubric:
            print("RUBRIC:")
            print(json.dumps(rubric, indent=2))
            for criterion in rubric:
                for rating in criterion.get('ratings', []):
                    ratings_by_dept[rating['id']] = {
                        'description': criterion['description'],
                        'long_description': rating['description'],
                        'points': rating['points'],
                    }

    out.write(json.dumps(assignments_by_dept, indent=2) + '\n\n\n')
    out.write(json.dumps(ratings_by_dept, indent=2) + '\n\n\n')

    # Second pass: report rubric-assessed grades per assignment.
    for assignment in assignments_list:
        if not assignment.get('rubric', []):
            continue

        a_id = assignment['id']
        # BUG FIX: previously wrote the stale `assignment_name` left over
        # from the first loop; use this assignment's own name.
        out.write(f" Asmt Name: {assignment['name']} ID: {a_id}\n")

        # include[]=rubric_assessment attaches per-criterion scores.
        api_url = f'{url}/api/v1/courses/{course_id}/assignments/{a_id}/submissions?include[]=rubric_assessment'
        submissions_by_dept = fetch(api_url)

        for submission in submissions_by_dept:
            user_id = submission['user_id']
            # BUG FIX: default to {} (not []) — a list has no .items(),
            # which crashed below when the assessment was missing.
            rubric = submission.get('rubric_assessment', {})
            comments = submission.get('submission_comments', '')
            score = submission.get('score', -1)

            if rubric:
                print(json.dumps(submission, indent=2))

            out.write(f"\nSubmission User ID/Assignment ID: {user_id}/{a_id}\n")
            out.write(f"Score: {score}\n")
            out.write(f"Submission Comments: {comments}\n")
            out.write(f"Rubric:\n")
            for k, v in rubric.items():
                rub_by_dept = '?'
                rat_by_dept = '?'
                rub_points = '?'
                if v.get('rating_id') in ratings_by_dept:
                    rub_rating = ratings_by_dept[v['rating_id']]
                    # BUG FIX: keys were misspelled ('rub_by_deptription'
                    # / 'rat_by_deptription'), raising KeyError; the keys
                    # stored above are 'description'/'long_description'.
                    rub_by_dept = rub_rating['description']
                    rat_by_dept = rub_rating['long_description']
                    rub_points = rub_rating['points']
                out.write(f" {rub_by_dept} - {rat_by_dept} ({v.get('rating_id')}): {v.get('points')}/{rub_points} points: {v.get('comments','')}\n")
            out.write("---")  # Separator between submissions
            out.flush()
    out.close()
|
|
|
|
|
|
|
|
|
|
|
|
def quick_sem_course_list(term=180):
    """Print the names of every course in a term, alphabetized (default term 180)."""
    courses = getCoursesInTerm(term, 1, 0)
    for course in sorted(courses, key=lambda entry: entry['name']):
        print(course['name'])
|
|
|
|
|
|
# Check Canvas for an existing calendar event that matches the provided metadata.
|
|
def find_existing_calendar_event(context_code, title, start_at_iso, description="", tolerance_hours=12):
    """Look for an existing Canvas calendar event that duplicates the given one.

    Matches by identical (case/whitespace-insensitive) title, or by identical
    non-empty description, within +/- tolerance_hours of start_at_iso.
    Returns the matching event dict, or None when no duplicate is found.
    """

    def _parse_iso(raw):
        # Canvas timestamps end in 'Z'; rewrite that as an explicit UTC offset
        # so datetime.fromisoformat can parse it. Unparseable input -> None.
        if not raw:
            return None
        if raw.endswith('Z'):
            raw = raw[:-1] + '+00:00'
        try:
            return datetime.fromisoformat(raw)
        except ValueError:
            return None

    target_start = _parse_iso(start_at_iso)
    if not target_start:
        return None

    # Query a date window wide enough to cover the whole tolerance band.
    window_start = (target_start - timedelta(hours=tolerance_hours)).date().isoformat()
    window_end = (target_start + timedelta(hours=tolerance_hours)).date().isoformat()

    params = {
        "context_codes[]": context_code,
        "start_date": window_start,
        "end_date": window_end,
    }

    existing_events = fetch("/api/v1/calendar_events", params=params)
    if not isinstance(existing_events, list):
        print(f"Unable to inspect existing events for context {context_code}: unexpected response")
        return None

    wanted_title = title.strip().lower() if isinstance(title, str) else ""
    wanted_description = description.strip().lower() if isinstance(description, str) else ""
    max_delta_seconds = tolerance_hours * 3600

    for candidate in existing_events:
        candidate_start = _parse_iso(candidate.get('start_at') or "")
        if not candidate_start:
            continue
        if abs((candidate_start - target_start).total_seconds()) > max_delta_seconds:
            continue
        if (candidate.get('title') or "").strip().lower() == wanted_title:
            return candidate
        if wanted_description and (candidate.get('description') or "").strip().lower() == wanted_description:
            return candidate
    return None
|
|
|
|
|
|
# Remove all calendar events attached to a course after user confirmation.
|
|
def remove_all_course_events():
    """Prompt for a course id, then delete every calendar event found for it
    in a window from two years back to one year forward."""
    course_id = input("course id> ").strip()
    if not course_id:
        print("No course id provided; aborting.")
        return
    context_code = course_id if course_id.startswith("course_") else f"course_{course_id}"

    today = datetime.now(timezone.utc).date()
    start_date = (today - timedelta(days=730)).isoformat()
    end_date = (today + timedelta(days=365)).isoformat()
    print(f"Fetching existing events for {context_code} between {start_date} and {end_date}...")

    query = {
        "context_codes[]": context_code,
        "per_page": 100,
        "start_date": start_date,
        "end_date": end_date,
    }
    events = fetch("/api/v1/calendar_events", params=query)
    if not events:
        print("No events found for this course.")
        return

    print(f"Found {len(events)} events. Beginning removal...")
    for event in events:
        event_id = event.get("id")
        event_title = event.get("title", "(no title)")
        if not event_id:
            # Cannot build a DELETE url without an id; report and move on.
            print(f"Skipping event '{event_title}' with missing id")
            continue
        print(f"Deleting event '{event_title}' (id {event_id}) in {context_code}...", end=' ')
        response = requests.delete(f"{url}/api/v1/calendar_events/{event_id}", headers=header)
        if response.ok:
            print("deleted successfully")
        else:
            print(f"failed: {response.status_code} {response.text}")
|
|
|
|
|
|
# Build a term-wide CSV summarizing student participation metrics for every course.
|
|
def build_term_participation_report():
    """Build a term-wide CSV of per-student participation analytics.

    Prompts for a term alias (e.g. 'fa25'), pulls Canvas student-summary
    analytics for every course in the term, joins each summary with student
    identity details from the course enrollments, and writes one flattened
    row per student per course to cache/<term>_participation.csv.
    An optional demo mode limits the run to ~10 random courses.
    """
    term_alias = input("Term alias (ex: fa25): ").strip()
    if not term_alias:
        print("No term alias provided; aborting.")
        return

    normalized_alias = term_alias.lower()
    term_record = find_term(normalized_alias)
    if not term_record:
        print(f"Unknown term alias: {term_alias}")
        return

    term_id = term_record.get('canvas_term_id')
    if not term_id:
        print(f"Canvas term id missing for {term_alias}")
        return

    term_code = (term_record.get('code') or normalized_alias).lower()
    mode_choice = input("Demo run with ~10 random courses? (y/N): ").strip().lower()
    demo_mode = mode_choice == 'y'
    courses = getCoursesInTerm(term_id, get_fresh=0, show=0)
    if not isinstance(courses, list):
        print("Unable to fetch courses for this term; aborting.")
        return

    if demo_mode:
        # Shuffle so the demo sample is a random slice of the term.
        random.shuffle(courses)
        print("Demo mode: targeting up to 10 courses with analytics data")

    output_path = f"cache/{term_code}_participation.csv"
    # Identity columns always lead the CSV; analytics columns are appended
    # (sorted) after them in field_order below.
    base_fields = [
        'term_code',
        'course_id',
        'course_name',
        'course_sis_id',
        'course_code',
        'student_canvas_id',
        'student_sortable_name',
        'student_sis_user_id',
        'student_login_id',
        'student_name',
        'student_email',
    ]
    rows = []
    # Set of analytics column names discovered across all summaries.
    data_fields = set()

    def flatten_value(prefix, value, dest):
        # Flatten nested dicts into dotted keys ('a.b.c'); lists are kept
        # as single JSON-encoded cells; scalars are stored as-is.
        if isinstance(value, dict):
            for key, val in value.items():
                next_key = f"{prefix}.{key}" if prefix else str(key)
                flatten_value(next_key, val, dest)
        elif isinstance(value, list):
            dest[prefix] = json.dumps(value)
        else:
            dest[prefix] = value

    processed_courses = 0
    for course in courses:
        # In demo mode, only count courses that actually yielded rows.
        if demo_mode and processed_courses >= 10:
            break
        course_id = course.get('id')
        if not course_id:
            continue
        course_name = course.get('name', '')
        print(f"Fetching analytics for course {course_id}: {course_name}")
        # Map of user id -> identity details; keyed under BOTH int and str
        # forms because the analytics payload's id type is inconsistent.
        enrollment_index = {}
        try:
            enrollment_params = {
                'type[]': 'StudentEnrollment',
                'per_page': 100,
            }
            enrollments = fetch(f"/api/v1/courses/{course_id}/enrollments", params=enrollment_params)
            if isinstance(enrollments, list):
                for enrollment in enrollments:
                    user = enrollment.get('user') or {}
                    user_id = user.get('id') or enrollment.get('user_id')
                    if not user_id:
                        continue
                    entry = {
                        'sortable_name': user.get('sortable_name', ''),
                        'sis_user_id': user.get('sis_user_id', ''),
                        'login_id': user.get('login_id', ''),
                        'sis_login_id': user.get('sis_login_id', ''),
                        'email': user.get('email', ''),
                        'name': user.get('name', ''),
                    }
                    enrollment_index[user_id] = entry
                    enrollment_index[str(user_id)] = entry
        except Exception as exc:
            # Identity join is best-effort; analytics rows still get written.
            print(f"Failed to fetch enrollments for {course_id}: {exc}")

        try:
            summaries = fetch(f"/api/v1/courses/{course_id}/analytics/student_summaries")
        except Exception as exc:
            print(f"Failed to fetch analytics for {course_id}: {exc}")
            continue

        if not isinstance(summaries, list):
            print(f"Unexpected analytics payload for {course_id}; skipping")
            continue

        course_rows_added = 0
        for summary in summaries:
            flattened = {}
            flatten_value('', summary, flattened)
            # The student's Canvas id may appear under several keys depending
            # on the payload shape; try them in order of likelihood.
            user_id = (
                summary.get('id')
                or summary.get('user_id')
                or flattened.get('user_id')
                or flattened.get('user.id')
            )
            enrollment_details = {}
            if user_id in enrollment_index:
                enrollment_details = enrollment_index[user_id]
            elif isinstance(user_id, str) and user_id.isdigit():
                enrollment_details = enrollment_index.get(int(user_id), {})
            elif isinstance(user_id, int):
                enrollment_details = enrollment_index.get(str(user_id), {})
            row = {
                'term_code': term_code,
                'course_id': str(course_id),
                'course_name': course_name,
                'course_sis_id': course.get('sis_course_id', ''),
                'course_code': course.get('course_code', ''),
                'student_canvas_id': str(user_id) if user_id else '',
                'student_sortable_name': enrollment_details.get('sortable_name') or '',
                'student_sis_user_id': (enrollment_details.get('sis_user_id') or enrollment_details.get('sis_login_id')) or '',
                'student_login_id': enrollment_details.get('login_id') or '',
                'student_name': enrollment_details.get('name') or '',
                'student_email': enrollment_details.get('email') or '',
            }
            if enrollment_details:
                data_fields.add('student_name')
                data_fields.add('student_email')
            # Merge all flattened analytics values into the row and record
            # each key so it becomes a CSV column.
            for key, value in flattened.items():
                if not key:
                    continue
                row[key] = value
                data_fields.add(key)
            rows.append(row)
            course_rows_added += 1

        if course_rows_added == 0:
            print(f"Skipping course {course_id}: no student analytics data")
            continue

        processed_courses += 1

    if demo_mode and processed_courses < 10:
        print(f"Demo mode finished early: only {processed_courses} courses had analytics data")

    if not rows:
        print("No analytics data found; nothing to write.")
        return

    field_order = base_fields + sorted([field for field in data_fields if field not in base_fields])
    print(f"Writing {len(rows)} rows to {output_path}")
    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=field_order)
        writer.writeheader()
        for row in rows:
            # Missing fields are written as empty strings for a uniform CSV.
            writer.writerow({field: row.get(field, '') for field in field_order})
|
|
|
|
|
|
# Summarize student orientation enrollments across the account and flag coverage gaps.
|
|
def audit_student_orientation_enrollments():
    """Audit which students are enrolled in the yearly orientation shells.

    Writes cache/student_orientation_audit.csv with one row per student:
    enrollment counts, a 0/1 flag per orientation year, a total orientation
    enrollment count, and a missing-orientation flag. Prints summary counts
    of students with no orientation and with duplicate orientations.
    """
    orientation_years = ['2022', '2023', '2024', '2025', '2026']

    orientation_shells = get_orientation_shells(orientation_years)
    missing_years = [year for year in orientation_years if year not in orientation_shells]
    if missing_years:
        print(f"Warning: orientation shells not found for years: {', '.join(missing_years)}")
    if not orientation_shells:
        print("No orientation courses located; aborting.")
        return

    orientation_memberships = get_orientation_memberships(orientation_years)
    student_summaries = get_student_enrollment_summary()

    if not student_summaries:
        print("No student enrollment data available; aborting.")
        return

    rows = []
    for record in student_summaries:
        key = str(record.get('user_id'))
        membership = orientation_memberships.get(key, {'years': set(), 'total': 0})
        years_attended = membership.get('years', set())
        total_orientations = membership.get('total', 0)

        audit_row = {
            'student_id': key,
            'sortable_name': record.get('sortable_name') or record.get('name') or '',
            'sis_id': record.get('sis_user_id') or '',
            'student_enrollment_count': record.get('student_enrollments', 0),
            'teacher_enrollment_count': record.get('teacher_enrollments', 0),
            'orientation_enrollment_total': total_orientations,
            'missing_student_orientation': 1 if total_orientations == 0 else 0,
        }
        # One 0/1 column per orientation year.
        for year in orientation_years:
            audit_row[year] = 1 if year in years_attended else 0
        rows.append(audit_row)

    if not rows:
        print("No rows to write; aborting.")
        return

    # Order by the missing flag first, then by name within each group.
    rows.sort(key=lambda r: (r.get('missing_student_orientation', 0), r.get('sortable_name', '')))
    output_path = 'cache/student_orientation_audit.csv'
    fieldnames = [
        'student_id',
        'sortable_name',
        'sis_id',
        'student_enrollment_count',
        'teacher_enrollment_count',
        *orientation_years,
        'orientation_enrollment_total',
        'missing_student_orientation'
    ]

    print(f"Writing {len(rows)} rows to {output_path}")
    with open(output_path, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows({field: row.get(field, '') for field in fieldnames} for row in rows)

    missing_count = sum(1 for row in rows if row.get('orientation_enrollment_total', 0) == 0)
    multi_count = sum(1 for row in rows if row.get('orientation_enrollment_total', 0) > 1)
    print(f"Orientation audit complete. Missing: {missing_count}, duplicates: {multi_count}.")
|
|
|
|
|
|
# Create Canvas calendar events for predefined orientation shells from CSV input.
|
|
def create_calendar_event():
    """Create all-day Canvas calendar events in each orientation shell.

    Reads cache/academic_calendar_2025.csv (date,title,description per line),
    converts each date from Pacific midnight to UTC, skips entries that
    already exist in a shell (via find_existing_calendar_event), and POSTs
    the rest to the Canvas calendar_events endpoint.
    """
    events = codecs.open('cache/academic_calendar_2025.csv','r','utf-8').readlines()

    orientation_shells = ["course_15924","course_19094","course_20862", "course_23313"]

    for ori_shell in orientation_shells:
        for e in events:
            if not e.strip():
                continue
            # Split into at most 3 fields so commas inside the description survive.
            parts = [part.strip() for part in e.split(',', 2)]
            if len(parts) < 3:
                continue
            date, title, desc = parts
            # Treat the CSV date as local (Pacific) midnight, then express in UTC.
            local = pytz.timezone("America/Los_Angeles")
            naive = datetime.strptime(date, "%Y-%m-%d")
            local_dt = local.localize(naive, is_dst=None)
            utc_dt = local_dt.astimezone(pytz.utc).isoformat()

            print(f"Checking event '{title}' ({date}) in {ori_shell}...", end=' ')
            existing_event = find_existing_calendar_event(ori_shell, title, utc_dt, desc)
            if existing_event:
                existing_id = existing_event.get('id')
                print(f"exists as id {existing_id} in {ori_shell}, skipping add")
                continue
            print(f"no existing event in {ori_shell}, attempting add")

            params = {
                "calendar_event[context_code]": ori_shell,
                "calendar_event[title]": title,
                "calendar_event[description]": desc,
                "calendar_event[start_at]": utc_dt, # DateTime
                # BUGFIX: key was the malformed "calendar_event[all_by_dept"
                # (missing ']', corrupted name); the Canvas API parameter for
                # all-day events is calendar_event[all_day].
                "calendar_event[all_day]": "true",
            }

            u = url + "/api/v1/calendar_events"
            res = requests.post(u, headers = header, params=params)
            if res.ok:
                try:
                    result = json.loads(res.text)
                except json.JSONDecodeError:
                    print(f"add completed for '{title}' in {ori_shell} (status {res.status_code}) but response parse failed")
                    continue
                new_id = result.get("id")
                if new_id:
                    print(f"added successfully as id {new_id} in {ori_shell} (status {res.status_code})")
                elif "errors" in result:
                    print(f"add failed for '{title}' in {ori_shell}: {result['errors']}")
                else:
                    print(f"add attempted for '{title}' in {ori_shell} with unexpected response {result}")
            else:
                print(f"add failed for '{title}' in {ori_shell}: {res.status_code} {res.text}")
|
|
|
|
def utc_to_local(utc_str):
    """Convert a Canvas UTC timestamp ('%Y-%m-%dT%H:%M:%SZ') to a formatted
    US/Pacific string such as 'Mon Jan 01, 2024 9:30AM'.

    Returns '' for empty/None input. Raises ValueError on a malformed stamp.
    """
    if not utc_str: return ""
    utc_dt = datetime.strptime(utc_str, '%Y-%m-%dT%H:%M:%SZ')

    utc_tz = pytz.timezone('UTC')
    pacific_tz = pytz.timezone('US/Pacific')

    # BUGFIX: the original did pytz.timezone(pacific_tz) and then called
    # strftime on the timezone object itself, which cannot work. Attach UTC
    # to the naive datetime, then convert it to Pacific before formatting.
    pacific_dt = utc_tz.localize(utc_dt).astimezone(pacific_tz)

    # NOTE: '%#I' (hour without leading zero) is a Windows strftime
    # extension; on POSIX platforms the equivalent is '%-I'.
    return pacific_dt.strftime('%a %b %d, %Y %#I:%M%p')
|
|
|
|
def list_all_assignments():
    """Prompt for a course id and print each assignment's name, published
    state, and local due date (tab-separated)."""
    course = input("the course id> ")
    assignments = fetch(url + f"/api/v1/courses/{course}/assignments")
    #print(json.dumps(assignments,indent=2))
    for assignment in assignments:
        status = 'published' if assignment['published'] == True else 'not published'
        due = utc_to_local(assignment['due_at'])
        print(f"{assignment['name']}\t{status}\t{due}")
|
|
|
|
|
|
def bulk_unenroll():
    """Prompt for a course id and delete every enrollment in that course."""
    course_id = input("course id> ")
    enrollments = fetch(f"{url}/api/v1/courses/{course_id}/enrollments")

    for enrollment in enrollments:
        enrollment_id = enrollment['id']

        # DELETE the individual enrollment and report the outcome.
        api_url = f"{url}/api/v1/courses/{course_id}/enrollments/{enrollment_id}"
        response = requests.delete(api_url, headers=header)

        if response.status_code == 200:
            print(f"Successfully unenrolled student with id {enrollment_id} from course {course_id}.")
        else:
            print(f"Failed to unenroll student with id {enrollment_id} from course {course_id}. Error: {response.text}")
|
|
|
|
|
|
def fetch_announcements(course_id=0):
    """Fetch a course's 2025 announcements, print them, and save them to
    cache/announcements_<course_id>.json. Prompts for the id when not given."""
    if not course_id:
        course_id = input("course id> ")

    announcements_url = f"{url}/api/v1/announcements?context_codes[]=course_{course_id}&start_date=2025-01-01&end_date=2025-12-31"
    announcements = fetch(announcements_url)

    print(json.dumps(announcements,indent=2))

    filename = f"cache/announcements_{course_id}.json"
    with open(filename, "w") as file:
        json.dump(announcements, file,indent=2)

    print("Announcements saved to ", filename)
|
|
|
|
|
|
def change_link_in_all_terms_pages():
    # UNFINISHED: this fetches the term's courses but never performs the
    # link replacement -- old_link/new_link are currently unused.
    old_link = "https://www.gavilan.edu/ezproxy"
    new_link = "https://www.gavilan.edu/ezproxy_new"

    term = 181  # hard-coded Canvas enrollment-term id

    courses = getCoursesInTerm(term,get_fresh=1,show=0,active=1)
|
|
|
|
def enrollment_helper():
    """Build multi-year section/enrollment history reports.

    Downloads expanded schedule JSON for every known semester from
    gavilan.cc, filters out ignored departments/courses, computes a fill
    percentage per section, and writes:
      - cache/section_history.json (nested sem -> dept -> course structure)
      - cache/section_history.csv (flat, sorted section rows)
      - cache/section_counts_history.csv (sections per course per semester)
    """

    # Departments and specific course codes excluded from the report.
    ignore = ['JLE','JFT', 'CWE']
    ignore2 = ['AH 190', 'AE 600', 'AE 602', 'AE 603','ACCT 190','AJ 100A', 'AJ 107A', 'AJ 213A','AJ 229A','AJ 231A','AMT 190','ATH 23','BUS 190','CD 190','COS 290','WTRM 290','SPAN 8A', 'SPAN 8B', 'SPAN 8C', 'SPAN 8D', 'RE 190','MKTG 190']
    # Only these schedule fields are kept for each section.
    keep = 'code,name,days,cap,act,teacher,date,partofday,type,site'.split(',')
    oo = codecs.open('cache/section_history.json','w','utf-8')
    # fetch enrollment stats for last few years
    from semesters import code, sems_by_short_name, short_to_sis
    from util import dept_from_name
    raw = []
    code.reverse()
    # sort: semester -> dept -> course code -> list of section dicts.
    sort = defaultdict(dict)
    for s in sems_by_short_name.keys():
        try:
            sched1 = requests.get(f"http://gavilan.cc/schedule/{s}_sched_expanded.json").json()
            sort[s] = defaultdict(dict)
            for sect in sched1:
                if sect['name'] in ignore2:
                    continue
                sect_smaller = funcy.project(sect,keep)
                sect_smaller['sem'] = short_to_sis(s)
                # Treat zero capacity/actual as "full" to avoid divide-by-zero.
                if int(sect_smaller['cap'])==0 or int(sect_smaller['act'])==0:
                    sect_smaller['fill_pct'] = 100
                else:
                    sect_smaller['fill_pct'] = round( (int(sect_smaller['act']) / int(sect_smaller['cap']))*100 )
                d = dept_from_name(sect_smaller['code'])
                if d in ignore:
                    continue
                sect_smaller['dept'] = d
                raw.append(sect_smaller)

                if not d in sort[s]:
                    sort[s][d] = defaultdict(dict)
                name = sect['code']
                if not name in sort[s][d]:
                    sort[s][d][name] = []
                sort[s][d][name].append(sect_smaller)
            print(f"{s} OK.")
        except Exception as e:
            # Missing/unparseable semester file: report and keep going.
            print(f"{s} not found. {e}")
            #sems.pop(s)
    oo.write(json.dumps(sort,indent=2))

    # Flat per-section table, sorted for human browsing.
    df = pd.DataFrame(raw)
    df_sorted = df.sort_values(['dept', 'code', 'type','site','partofday','fill_pct'])
    df_sorted.to_csv('cache/section_history.csv')

    # Sections offered per course per semester.
    class_counts = df.groupby(['sem', 'code']).size().reset_index(name='class_count')
    print("Class counts by semester")
    print(class_counts)
    pivot_df = class_counts.pivot_table(index='code', columns='sem', values='class_count', aggfunc='sum', fill_value=0)
    # Reset the index to move 'class_name' back to a column
    pivot_df.reset_index(inplace=True)
    print(pivot_df)
    pivot_df.to_csv('cache/section_counts_history.csv')


    # Group by semester and class type, and then count the number of occurrences of each class type
    class_type_counts = df.groupby(['sem', 'code', 'type']).size().reset_index(name='class_type_count')
    print("Class type by semester")
    print(class_type_counts)
    pivot_df2 = class_type_counts.pivot_table(index='code', columns=['sem','type'], values='class_type_count', aggfunc='sum', fill_value=0)
    # Reset the index to move 'class_name' back to a column
    pivot_df2.reset_index(inplace=True)

    # NOTE: the block below is deliberately disabled (clustering moved to
    # search.py); it is a string literal, not executed code.
    '''kmeans = try_clustering(pivot_df2.copy())

    pivot_df2.insert(0, "Cluster", kmeans.labels_)

    print(pivot_df2)
    pivot_df2.to_csv('cache/section_and_mode_counts_history.csv')


    # Group by teacher
    class_teacher_counts = df.groupby(['sem', 'code', 'teacher']).size().reset_index(name='class_teacher_count')
    print("Class teacher by semester")
    print(class_teacher_counts)

    # group by COURSE (ie: ENGL1A)

    # For each historical WINTER, SPRING, SUMMER, FALL:

    # number of sections offered, by mode, time of day, campus

    # all teachers who taught it (and their qual to teach online)

    # fill percentage for each section, then by mode, tod, campus

    ## moved: try_clustering now in search.py
    # Add labels to the DataFrame
    #df['clusters'] = labels
    #print(df)
    #df.to_csv('cache/section_and_mode_counts_history_clusters.csv')
    return kmeans
    '''
|
|
|
|
|
|
def unpublish_a_course(course_id=0):
    """Unpublish ('claim') a Canvas course; prompts for the id when not given."""
    if course_id == 0:
        course_id = input('course id? ')
    endpoint = url + f"/api/v1/courses/{course_id}"
    # The 'claim' event returns a published course to unpublished state.
    r = requests.put(endpoint, data={ 'course[event]':'claim' }, headers=header)
    print(r.text)
|
|
|
|
|
|
def course_log(course_id=19566):
    """Print a course's Canvas audit log as indented JSON.

    course_id defaults to 19566 (the previously hard-coded shell) so the
    existing zero-argument menu call behaves exactly as before; it can now
    also be called for any other course.
    """
    L = fetch(f"{url}/api/v1/audit/course/courses/{course_id}")
    print(json.dumps(L,indent=2))
|
|
|
|
def fetch_rubric(course=21274, r_id=35961):
    """Fetch a Canvas rubric and print it as an HTML table.

    Parameters default to the previously hard-coded course/rubric ids, so
    existing zero-argument calls are unchanged. Each rubric criterion becomes
    a table row; its ratings become cells in that row.
    """
    u = f"{url}/api/v1/courses/{course}/rubrics/{r_id}"

    result = fetch(u)
    #print(json.dumps(result,indent=2))

    rows = []

    for row in result['data']:
        cells = []
        cells.append(f"<td style='vertical-align:top;'><b>{row['description']}</b><br />{row['long_description']}</td>")
        for item in row['ratings']:
            cells.append(f"<td style='vertical-align:top;'><u>{item['description']}</u><br />{item['long_description']}<br /><i>{item['points']} points</i></td>")

        # BUGFIX: the original wrapped each row in <tr>...</tr> here AND
        # again when assembling the table, producing invalid nested
        # <tr><tr>...</tr></tr> markup. Wrap exactly once.
        rows.append("<tr>" + "\n".join( cells ) + "</tr>\n")

    output = f"<h3>{result['title']}</h3>\n"
    output += "<table border='1'>" + ''.join( rows ) + "</table>\n"

    print(output)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":

    # Interactive menu: maps a number to [description, handler function].
    # Handlers are defined throughout this module; ranges group related
    # tasks (enrollment, cross-listing, nav/ext-tools, reports, ...).
    options = { 1: ['Cross check schedule with ztc responses',make_ztc_list] ,
            2: ['Add announcements to homepage', change_course_ann_homepage],
            3: ['Unpublish a course', unpublish_a_course],
            4: ['List the terms', getTerms],
            5: ['Show courses in a term', getCoursesInTerm],
            6: ['Save enrollments in a course', course_enrollment],
            7: ['Simple list of course data, search by sis_id', course_search_by_sis],
            8: ['Overview of a term', course_term_summary],
            9: ['process the semester overview output (8)', course_term_summary_2],

            10: ['Enroll orientation students (refresh local db first)', enroll_orientation_students],
            11: ['Enroll ORIENTATION and STEM student shells after catching up database.', enroll_o_s_students],
            12: ['Enroll stem students', enroll_stem_students_live],
            13: ['Enroll ART students', enroll_art_students_live],

            20: ['Get a course info by id',getCourses],
            21: ['Reset course conclude date',update_course_conclude],
            22: ['Create calendar events for orientation shells', create_calendar_event],
            23: ['Remove all calendar events from a course', remove_all_course_events],
            24: ['Build participation report for a term', build_term_participation_report],
            25: ['Audit student orientation enrollments', audit_student_orientation_enrollments],
            26: ['list all assignments', list_all_assignments],
            27: ['Bulk unenroll from course', bulk_unenroll],
            28: ['enrollment helper', enrollment_helper],
            29: ['g number list enroll to shell id', enroll_gnumber_list_to_courseid],

            30: ['* Overview semester start dates',overview_start_dates],
            31: ['Fine tune term dates and winter session', course_by_depts_terms],
            32: ['Set summer start dates', set_custom_start_dates],
            33: ['Cross list, ask for sections', ez_xlist],
            34: ['Cross list a semester from argos export file', semester_cross_lister],
            35: ['Cross list from manually created file', do_manual_xlist],
            36: ['Cross list CWE courses', xlist_cwe],

            40: ['Enroll GOTT Workshops', enroll_gott_workshops],
            41: ['Create some sandbox courses', create_sandboxes],
            42: ['Add teacher to many shells', teacher_to_many_shells],
            44: ['List users who passed GOTT 1 / Bootcamp', get_gott1_passers],
            45: ['List users who passed Plagiarism Module', get_plague_passers],

            50: ['Fetch rubric scores and comments', fetch_rubric_scores],
            51: ['Fetch announcements in a course', fetch_announcements],
            52: ['show course audit log', course_log],
            53: ['fetch a rubric', fetch_rubric],

            # --- Course Nav / External Tools ---
            70: ['ext tools',get_ext_tools],
            71: ['set ext tools',set_ext_tools],
            72: ['Get course ext tools', get_course_ext_tools],
            73: ['Remove "new analytics" from all courses navs in a semester', remove_n_analytics],
            74: ['Add course evals', add_evals],
            75: ['Remove course evals all sections', remove_evals_all_sections],
            76: ['Course nav: index + bulk hide/show (one semester)', clean_course_nav_setup_semester],
            77: ['Nav: add GavConnect to list of course ids', add_gav_connect_prompt_list],
            78: ['Nav: add Pathways to all courses in a term (default OFF)', add_pathways_all_courses_in_term],
            79: ['Nav: ensure Pathways exists for a term (add if missing, OFF)', ensure_pathways_in_term],

            # 24: ['Add course evals to whole semester',instructor_list_to_activate_evals],
            # 21: ['Add announcements to homepage', change_course_ann_homepage],
            #32: ['Cross-list classes', xlist ],
            #33: ['Cross list helper', eslCrosslister],
            ##55: ['Check all courses & their sections in semester', all_semester_course_sanity_check],
            #4: ['List students who passed quiz X', get_quiz_passers],
            # TODO wanted: group shell for each GP (guided pathway) as a basic student services gateway....
            #

            }
    print ('')

    # A numeric first command-line argument selects a menu option directly;
    # otherwise print the menu and prompt for a choice.
    if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
        resp = int(sys.argv[1])
        print("\n\nPerforming: %s\n\n" % options[resp][0])

    else:
        print ('')
        for key in options:
            print(str(key) + '.\t' + options[key][0])

        print('')
        resp = input('Choose: ')

    # Call the function in the options dict
    options[ int(resp)][1]()
|