import json, re, requests, codecs, sys, time, funcy, os, csv, random
import pandas as pd
from datetime import datetime, timedelta, timezone
import pytz
from util import print_table, int_or_zero, float_or_zero, dept_from_name, num_from_name
from pipelines import fetch, fetch_stream, fetch_collapse, header, url
from schedules import get_semester_schedule
from localcache import course_quick_stats, get_courses_in_term_local, course_student_stats, all_sem_courses_teachers, full_reload
from localcache2 import (
db,
users_new_this_semester,
users_new_this_2x_semester,
course_from_id,
user_ids_in_shell,
student_count,
teacher_list,
course_sched_entry_from_id,
get_orientation_shells,
get_orientation_memberships,
get_student_enrollment_summary,
)
from collections import defaultdict
from semesters import find_term
#from dateutil import parser
#from ast import Try, TryStar
#from symbol import try_stmt
#from pipelines import sems
# Canvas course id of the STEM shell; read by enroll_stem_students_live_semester().
stem_course_id = '11015' # TODO
#########
######### GET FACTS FROM INDIVIDUAL COURSES
#########
#########
# Gott 1 Bootcamp - report on who completed it.
def get_gott1_passers():
    """Hold the settings for the Gott 1 Bootcamp passer report.

    Currently a no-op: the call that would consume these values is commented
    out, so the function only binds four locals and returns None.
    """
    course = '1561'
    min_passing = 85
    passers_filename = 'cache/teacherdata/bootcamp_passed.csv'
    still_active_filename = 'cache/teacherdata/bootcamp_active.csv'
    #get_course_passers(course, min_passing, passers_filename, still_active_filename)
# Plagiarism Module - report on who completed it.
def get_plague_passers():
    """Hold the settings for the Plagiarism Module passer report.

    Currently a no-op: the function only binds four locals and returns None.
    The code that used these values is disabled (it survives only as an inert
    string literal in this file).
    """
    course = '11633'
    min_passing = 85
    passers_filename = 'cache/teacherdata/plagiarism_passed.csv'
    still_active_filename = 'cache/teacherdata/plagiarism_active.csv'
"""
(passed, didnt) = get_course_passers(course, min_passing, passers_filename, still_active_filename)
passed = set( [z[2] for z in passed] )
didnt = set( [z[2] for z in didnt] )
enrol = [ [ str(z) for z in list(course_enrollment(cr)) ] for cr in ['11677','11698'] ]
print(enrol)
enrol = set(funcy.cat(enrol))
everyone = passed.union(didnt,enrol)
reportable = passed.intersection(enrol)
outputfile = open('cache/plagcheck.txt','w').write( json.dumps( [ list(reportable), list(enrol), list(passed), list(didnt), list(everyone) ],indent=2))
return 1
#enrol = { cr: [ str(z) for z in list(course_enrollment(cr).keys()) ] for cr in ['11677','11698',] }
# # [x['user_id'] for x in course_enrollment(cr)]
outputfile = open('cache/plagcheck.txt','w').write( json.dumps( [ [z[2] for z in passed],[z[2] for z in didnt],enrol],indent=2))
return 1
passed = {}
didnt = {}
output_by_course = {}
course_s = {}
for p in passed: passed_by_deptr(p[2])] = p
for p in didnt: didnt_d(p[2])] = p
passed_s = [ str(k) for k in passed_d() ]
didnt_s = [ str(k) for k in didnt_by_deptys() ]
crossref = ['11677','11698',]
outputfile = open('cache/plagcheck.txt','w')
oo = { 'passed': passed_by_deptdidnt': didnt_by_dept
for cr in crossref:
student_int = course_enrollment(cr)
student_by_dict{ str(k): v for k,v in student_int.items() }
oo[cr] = student_by_dict
output_by_course[cr] = { 'passed':{}, 'didnt':{}, 'missing':{} }
course_s[cr] = set( [ str(k) for k in student_by_dict.keys() ])
for k,v in student_by_dict.items():
key_s = str(k)
if key_s in passed_by_dict output_by_course[cr]['passed'][key_s] = passed_by_dicty_s]
elif key_s in didnt_by_dict output_by_course[cr]['didnt'][key_s] = didnt_by_dicty_s]
else:
output_by_course[cr]['missing'][key_s] = v['user']
oo['final_output'] = output_by_course
oo['passed_s'] = list(passed_s)
oo['didnt_s'] = list(didnt_s)
course_sd = {k: list(v) for k,v in course_s.items() }
oo['course_s'] = course_sd
outputfile.write(json.dumps(oo,indent=2))
# Who, in a class, passed?
def get_course_passers(course, min_passing, passers_filename, still_active_filename):
path = url + '/api/v1/courses/%s/enrollments' % str(course)
tempout = open('cache/passers_temp.txt','w')
enrl = fetch( path, 0)
passed = []
didnt = []
for E in enrl:
try:
n = E['user']['name']
oo = E['user']['sis_user_id']
i = str(E['user_id'])
r = E['role']
g = E['grades']['current_score']
l = E['last_activity_at']
p = float_or_zero(g) > min_passing
print( "%s: a %s, grade of %s. Passed? %s. Last seen: %s" % (n,r,str(g),str(p),l) )
tempout.write(json.dumps(E['user']['name']) + "\n")
tempout.write(json.dumps(E['grades'],indent=2) + "\n\n-----\n\n")
if p:
passed.append( [n, oo, i, r, g, l ] )
else:
didnt.append( [n, oo, i, r, g, l ] )
except:
pass
columns = ['name', 'goo','canvas_id','role','grade','last_activity']
pp = pd.DataFrame(passed, columns=columns)
pp.sort_values(by='last_activity',inplace=True)
pp.to_csv(passers_filename, index=False)
dd = pd.DataFrame(didnt, columns=columns)
dd.sort_values(by='last_activity',inplace=True)
dd.to_csv(still_active_filename, index=False)
print("Saved output to \n - passed: %s\n - not passed: %s\n" % (passers_filename, still_active_filename))
return (passed,didnt)
"""
# Gott 1A
"""course = '2908'
quiz = '15250'
pass_grade = 0.90
path = url + '/api/v1/courses/%s/quizzes/%s/submissions' % (course,quiz)
q_subs = fetch_collapse(path, 'quiz_submissions')
for Q in q_subs:
prct = float_or_zero(Q['score']) / float_or_zero( Q['quiz_points_possible'] )
print( 'Passed: %s\t Score: %s,\tUser: %s' % \
( str(prct>0.9), str(int_or_zero(Q['score'])), Q['user_id'] ))"""
# Who, in a class and a quiz, passed?
def get_quiz_passers():
    """Report who has passed the Gott 1 Bootcamp course (Canvas course '1561').

    Fetches the course's enrollments, prints one line per enrollment and
    writes two CSV files, each sorted by last activity:

    * ``cache/teacherdata/bootcamp_passed.csv`` - enrollments whose
      ``current_score`` is strictly greater than 85;
    * ``cache/teacherdata/bootcamp_active.csv`` - everyone else.

    Enrollments that lack the expected fields are skipped, and the skip is
    reported instead of being swallowed silently.

    Returns:
        None.
    """
    # Gott 1 Bootcamp
    course = '1561'
    path = url + '/api/v1/courses/%s/enrollments' % course
    enrl = fetch( path, 0)
    min_passing = 85
    passed = []
    didnt = []
    for E in enrl:
        try:
            n = E['user']['name']
            i = E['user_id']
            r = E['role']
            g = E['grades']['current_score']
            l = E['last_activity_at']
            p = float_or_zero(g) > min_passing
        except Exception as err:
            # Still best effort (one bad record must not stop the report),
            # but say so rather than hiding the problem.
            print("Skipping an enrollment that could not be read: %r" % (err,))
            continue
        print( "%s: a %s, grade of %s. Passed? %s. Last seen: %s" % (n,r,str(g),str(p),l) )
        row = [n, i, r, g, l ]
        if p:
            passed.append(row)
        else:
            didnt.append(row)
    columns = ['name','canvas_id','role','grade','last_activity']
    pp = pd.DataFrame(passed, columns=columns)
    pp.sort_values(by='last_activity',inplace=True)
    pp.to_csv('cache/teacherdata/bootcamp_passed.csv', index=False)
    dd = pd.DataFrame(didnt, columns=columns)
    dd.sort_values(by='last_activity',inplace=True)
    dd.to_csv('cache/teacherdata/bootcamp_active.csv', index=False)
    print("Saved output to ./teachers/bootcamp_*")
# Gott 1A
"""course = '2908'
quiz = '15250'
pass_grade = 0.90
path = url + '/api/v1/courses/%s/quizzes/%s/submissions' % (course,quiz)
q_subs = fetch_collapse(path, 'quiz_submissions')
for Q in q_subs:
prct = float_or_zero(Q['score']) / float_or_zero( Q['quiz_points_possible'] )
print( 'Passed: %s\t Score: %s,\tUser: %s' % \
( str(prct>0.9), str(int_or_zero(Q['score'])), Q['user_id'] ))"""
# Change courses to show 2 announcements
def change_course_ann_homepage(id="10458"):
    """Show announcements on a course's home page, limited to two.

    Sends a PUT to the course's settings endpoint and prints the response
    body.

    Args:
        id: Canvas course id.
    """
    settings_url = "%s/api/v1/courses/%s/settings" % (url, id)
    payload = {
        'show_announcements_on_home_page': 'true',
        'home_page_announcement_limit': '2',
    }
    response = requests.put(settings_url, data=payload, headers=header)
    print(response.text)
# All students enrolled in a class in the given semester. Simpler version of below. Return SET of canvas user ids.
def users_in_semester():
    """Return the set of Canvas user ids of every student enrolled in term '65'.

    Walks the (cached) course list of the term and the student enrollments of
    each course.

    The set is built from each enrollment's ``user_id``. The enrollment's own
    ``id`` field identifies the enrollment record, not the person, so using it
    would neither de-duplicate students across courses nor be usable as a
    user id.
    """
    all_c = getCoursesInTerm('65',0,0) # fall 2020 TODO
    all_s = set()
    for c in all_c:
        for u in course_enrollment(c['id']).values():
            if u['type'] != "StudentEnrollment": continue
            all_s.add(u['user_id'])
    return all_s
#
# All students (and faculty) in STEM (or any list of depts.. match the course_code). Return SET of canvas ids.
def users_in_by_depts_live(depts=None, termid='181'):
    """Collect the Canvas user ids enrolled in courses of the given departments.

    The term's course list is fetched fresh; a course belongs to a department
    when its ``course_code`` starts with the department string (the string is
    used as a regular expression). Every enrollment of a matching course is
    counted, faculty included.

    Args:
        depts: iterable of department prefixes, e.g. ``["MATH", "BIO"]``.
            Defaults to no departments.
        termid: Canvas enrollment term id.

    Returns:
        set of Canvas user ids.

    Side effects: writes three files under ``cache/`` (the course list, the
    per-department user sets, and the combined user set). The last two are
    written with ``str()``, so despite their ``.json`` names they are not
    valid JSON.
    """
    depts = [] if depts is None else depts
    students_by_by_dept = {}
    all_c = getCoursesInTerm(termid,1,0)
    with codecs.open('cache/courses_in_term_%s.json' % termid,'w','utf-8') as out:
        out.write( json.dumps(all_c,indent=2) )
    for c in all_c:
        #print(c['course_code'])
        for d in depts:
            #print("Dept: %s" % d)
            match = re.search('^(%s)' % d, c['course_code'])
            # NOTE(review): departments that do not match are skipped here;
            # confirm that the original `else: continue` belonged to `if match`.
            if not match:
                continue
            if d == "STAT" and match.group() == "STAT":
                print("STAT")
            print("Getting enrollments for %s" % c['course_code'])
            for u in course_enrollment_with_faculty(c['id'],0).values():
                #if u['type'] != "StudentEnrollment": continue
                # The department's set is created only once it has a member.
                students_by_by_dept.setdefault(d, set()).add(u['user_id'])
    print(students_by_by_dept)
    with codecs.open('cache/students_by_by_dept_in_term_%s.json' % termid,'w','utf-8') as out:
        out.write( str(students_by_by_dept) )
    all_students = set()
    for dd in students_by_by_dept.values(): all_students.update(dd)
    with codecs.open('cache/all_students_in_by_depts_in_term_%s.json' % termid,'w','utf-8') as out:
        out.write( str(all_students) )
    return all_students
# Course enrollment, including teachers
def course_enrollment_with_faculty(id='', verbose=0):
    """Fetch every enrollment of a course (all roles), keyed by Canvas user id.

    Args:
        id: Canvas course id; prompted for on the keyboard when empty.
        verbose: truthy to print the request URL and the raw records.

    Returns:
        dict mapping each record's ``user_id`` to the enrollment record. When
        a user has several enrollments, the last one fetched wins.

    Side effect: the result is also written to ``cache/courses/<id>.json``
    (the same file that course_enrollment() writes).
    """
    if verbose: print("Getting enrollments for course id %s" % str(id))
    if not id:
        id = input('Course id? ')
    t = url + '/api/v1/courses/%s/enrollments' % str(id)
    if verbose: print(t)
    emts = fetch(t,verbose)
    if verbose: print(emts)
    emt_by_id = {}
    for E in emts:
        if verbose: print(E)
        try:
            emt_by_id[E['user_id']] = E
        except Exception as exp:
            print("Skipped [%s] with this exception: %s" % (str(E), str(exp)))
    # Close the cache file deterministically instead of leaking the handle.
    with codecs.open('cache/courses/%s.json' % str(id), 'w', 'utf-8') as ff:
        ff.write(json.dumps(emt_by_id, indent=2))
    if verbose: print( " %i results" % len(emts) )
    return emt_by_id
# Course enrollment list, students only
def course_enrollment(id='', verbose=0):
    """Fetch the student enrollments of a course, keyed by Canvas user id.

    Only enrollments with role ``StudentEnrollment`` are requested.

    Args:
        id: Canvas course id; prompted for on the keyboard when empty.
        verbose: truthy to print the request URL and the raw records.

    Returns:
        dict mapping each record's ``user_id`` to the enrollment record. When
        a user has several enrollments, the last one fetched wins.

    Side effect: the result is also written to ``cache/courses/<id>.json``
    (the same file that course_enrollment_with_faculty() writes).
    """
    if verbose: print("Getting enrollments for course id %s" % str(id))
    if not id:
        id = input('Course id? ')
    t = url + '/api/v1/courses/%s/enrollments?role[]=StudentEnrollment' % str(id)
    if verbose: print(t)
    emts = fetch(t,verbose)
    if verbose: print(emts)
    emt_by_id = {}
    for E in emts:
        if verbose: print(E)
        try:
            emt_by_id[E['user_id']] = E
        except Exception as exp:
            print("Skipped [%s] with this exception: %s" % (str(E), str(exp)))
    # Close the cache file deterministically instead of leaking the handle.
    with codecs.open('cache/courses/%s.json' % str(id), 'w', 'utf-8') as ff:
        ff.write(json.dumps(emt_by_id, indent=2))
    if verbose: print( " %i results" % len(emts) )
    return emt_by_id
def askForTerms():
    """Prompt for one or more term ids and return them as a list of strings.

    Ids are separated by commas. Whitespace around each id is stripped and
    empty entries are dropped, so ``"65, 66"`` yields ``['65', '66']`` rather
    than ``['65', ' 66']``.
    """
    user_input = input("The term id? (separate multiples with commas) ")
    return [piece.strip() for piece in user_input.split(",") if piece.strip()]
"""
names = []
if not term:
s = url + '/api/v1/accounts/1/terms?workflow_state[]=all'
s = fetch_collapse(s,"enrollment_terms",1)
print(json.dumps(s,indent=2))
print("Terms: ")
for u in s:
print(str(u['id']) + "\t" + u['name'])
#print json.dumps(results_by_dept,indent=2)
term = input("The term id? ")
"""
# Return a list of term names and IDs. Also store in cache/courses/terms.txt
def getTerms(printme=1, ask=1):
    """Fetch the account's enrollment terms and cache them on disk.

    The terms are written as JSON to ``cache/courses/terms.txt``.

    Args:
        printme: truthy to print one "id<TAB>name" line per term.
        ask: truthy to prompt for a term id and return the typed answer.

    Returns:
        The typed term id (a string) when ``ask`` is truthy, otherwise the
        fetched terms.
    """
    endpoint = url + '/api/v1/accounts/1/terms' #?workflow_state[]=all'
    terms = fetch_collapse(endpoint, 'enrollment_terms')
    # TODO unsafe overwrite
    with codecs.open('cache/courses/terms.txt', 'w', 'utf-8') as cache_file:
        #print(terms)
        cache_file.write(json.dumps(terms, indent=2))
    if printme:
        print("Terms: ")
        for term in terms:
            print("\t".join([str(term['id']), term['name']]))
    if not ask:
        return terms
    return input("The term id? ")
def getCourses(x=0): # a dict
    """Fetch one course record from the Canvas courses endpoint.

    Args:
        x: a course id. When falsy, ids are read from the keyboard as
            space-separated integers.

    Returns:
        Whatever fetch() returns for the first id in the list, or None when
        no id was entered.
    """
    if not x:
        user_input = input("The Course IDs to get? (separate with spaces: ")
        courselist = list(map(int, user_input.split()))
    else:
        courselist = [x, ]
    for id in courselist:
        t = url + '/api/v1/courses/' + str(id) # + '?perpage=100'
        t = fetch(t,0)
        #print(t)
        # NOTE(review): assumes this return sits inside the loop, so only the
        # first id is ever fetched even when several were typed - confirm.
        return t
def update_course_conclude(courseid="13590",enddate='2021-12-23T01:00Z'):
    """Push a new end date to every FA21 course still holding the old one.

    Courses are selected from the local database (``code LIKE '%FA21%'`` and
    ``conclude='2021-08-29 07:00:00.000'``). For each row a PUT is sent to the
    Canvas courses endpoint with ``course[end_at]`` set to ``enddate``.
    Column 1 of each row is used as the course id and column 6 is printed as
    its label.

    Args:
        courseid: kept for backward compatibility only; it is overwritten by
            each row's id and its incoming value is never used.
        enddate: the new end date, sent as-is.
    """
    (connection,cursor) = db()
    q = "SELECT * FROM courses AS c WHERE c.code LIKE '%FA21%' AND c.conclude='2021-08-29 07:00:00.000'"
    # The return value of execute() is not defined by the DB-API, so read the
    # rows with fetchall() as the other database code in this file does.
    cursor.execute(q)
    for R in cursor.fetchall():
        try:
            #print(R)
            print('doing course: %s' % R[6])
            courseid = R[1]
            #d = getCourses(courseid)
            #print("\tconclude on: %s" % d['end_at'])
            data = { 'course[end_at]': enddate }
            t = url + '/api/v1/courses/' + str(courseid)
            r3 = requests.put(t, headers=header, params=data)
            #print(" " + r3.text)
        except Exception as e:
            print('****%s' % str(e))
# Relevant stuff trying to see if its even being used or not
def course_term_summary_local(term="180",term_label="FA23"):
    """Write an HTML summary of a term's courses from the local cache.

    For every course returned by get_courses_in_term_local() the quick stats
    and student stats are looked up by the course's element 3, and one list
    item (link, status, teachers, student count) is written to
    ``cache/semester_summary_<term_label>.html``. Courses whose student stats
    report 0 in ``[0][0]`` are left out.

    Args:
        term: term id passed to get_courses_in_term_local().
        term_label: label used in the output file name.
    """
    # NOTE(review): the HTML tags of this template and the <ul> wrapper below
    # were reconstructed (the previous literal was unterminated and had four
    # placeholders for the six values supplied) - confirm the intended markup.
    O = ("\t<li>Course: <a href=\"%s\" class=\"%s\">%s</a><br />"
         "Status: <b>%s</b><br />"
         "Teacher: %s<br />"
         "Number students: %s</li>\n")
    courses = get_courses_in_term_local(term)
    with codecs.open(f'cache/semester_summary_{term_label}.html','w','utf-8') as oo:
        oo.write('<ul>\n')
        for C in sorted(courses):
            style = ''
            info = course_quick_stats(C[3])
            sinfo = course_student_stats(C[3])
            D = list(C)
            D.append(info)
            D.append(sinfo)
            #print(D)
            if D[6][0][0] == 0: continue
            if D[2] == 'claimed': style="a"
            mystr = O % ( "https://ilearn.gavilan.edu/courses/"+str(D[3]), style, D[1], D[2], str(', '.join(D[5])), str(D[6][0][0]))
            print(D[1])
            oo.write(mystr )
            oo.flush()
            #print(info)
        oo.write('\n</ul>\n')
# Fetch all courses in a given term
def getCoursesInTerm(term=0,get_fresh=1,show=1,active=0): # a list
    """Return the account's courses for one enrollment term.

    Args:
        term: Canvas term id; when falsy the user is asked via getTerms().
        get_fresh: when falsy, return the cached copy from
            ``cache/courses_in_term_<term>.json`` if that file exists;
            otherwise (or when the file is missing) query the API.
        show: truthy to print the courses and a summary table.
        active: truthy to request published courses only.

    Returns:
        list of course records as returned by fetch() (or as cached).

    Side effect: a fresh result is written to the cache file.
    """
    if not term:
        term = getTerms(1,1)
    ff = 'cache/courses_in_term_%s.json' % str(term)
    if not get_fresh:
        if os.path.isfile(ff):
            with codecs.open(ff,'r','utf-8') as cached:
                return json.loads( cached.read() )
        print(" -> couldn't find cached classes at: %s" % ff)
    # https://gavilan.instructure.com:443/api/v1/accounts/1/courses?published=true&enrollment_term_id=11
    published_filter = "published=true&" if active else ""
    t = f"{url}/api/v1/accounts/1/courses?{published_filter}enrollment_term_id={term}"
    results = fetch(t,show)
    if show:
        for R in results:
            try:
                print(str(R['id']) + "\t" + R['name'])
            except Exception as e:
                print("Caused a problem: ")
                print(R)
    #print json.dumps(results,indent=2)
    info = [ [a['id'], a['name'], a['workflow_state'] ] for a in results ]
    if show: print_table(info)
    with codecs.open(ff, 'w', 'utf-8') as out:
        out.write(json.dumps(results,indent=2))
    return results
def getCoursesTermSearch(term=0,search='',v=0):
    """Search the courses of one term by name.

    Args:
        term: Canvas term id; prompted for when falsy.
        search: text to search for; prompted for when empty.
        v: truthy to print the request URL and the results.

    Returns:
        The matching courses as returned by fetch().
    """
    from urllib.parse import quote

    term = term or input("term id? ")
    search = search or input("What to search for? ")
    # Percent-encode the search text so characters such as '&', '#' or spaces
    # cannot break or truncate the query string.
    s = url + '/api/v1/accounts/1/courses?enrollment_term_id=%s&search_term=%s' % ( str(term) , quote(str(search), safe='') )
    if v: print(s)
    courses = fetch(s)
    if v: print(json.dumps(courses,indent=2))
    return courses
def courseLineSummary(c,sections=None):
    """Format one course as a tab-separated summary line.

    Args:
        c: course dict with at least ``id`` (int) and ``name``; optional
            ``crn`` and ``host`` keys add extra columns.
        sections: optional mapping of course id to section id.

    Returns:
        "<id>\\t<section column><crn column><host column><name>". A missing
        section or crn leaves a bare tab in its place; a missing host adds
        nothing.
    """
    # None instead of a shared mutable default dict.
    sections = {} if sections is None else sections
    ss = "\t"
    crn = "\t"
    host = ""
    if 'crn' in c:
        crn = "crn: %s\t" % c['crn']
    if c['id'] in sections:
        ss = "section: %s\t" % str(sections[c['id']])
    if 'host' in c:
        host = "send to crn: %s\t" % c['host']
    out = "%i\t%s%s%s%s" % (c['id'], ss ,crn, host, c['name'])
    return out
def xlistLineSummary(c,sections=None):
    """Format one course as a cross-listing summary line.

    Args:
        c: course dict with ``id`` (int), ``crn`` and ``name``; when it has a
            ``partner`` dict containing ``sectionid``, that id is shown as the
            new section id, otherwise "missing".
        sections: unused; kept so existing callers keep working.

    Returns:
        "can_id:<id>\\t new_sec_id:<section>\\t crn:<crn>\\t <name>".
    """
    # can_id incoming_sec_id crn name
    new_sec = "missing"
    if 'partner' in c and 'sectionid' in c['partner']:
        new_sec = c['partner']['sectionid']
    out = "can_id:%i\t new_sec_id:%s\t crn:%s\t %s" % (c['id'], new_sec ,c['crn'], c['name'])
    return out
def numbers_in_common(L):
    """Count how many leading characters all strings in ``L`` share.

    Only the first five positions are compared, so the result is between 0
    and 5. Strings shorter than five characters limit the count to their
    length instead of raising IndexError, and an empty list gives 0.
    """
    if not L:
        return 0
    shared = 0
    # zip() stops at the shortest string, so short inputs cannot over-index.
    for column in zip(*(s[:5] for s in L)):
        if len(set(column)) > 1:
            break
        shared += 1
    return shared
def combined_name(nic,L):
    """Join section numbers into one label, e.g. "12345/12346".

    Args:
        nic: number of leading characters the sections share. It is not used
            (the shortening scheme that relied on it was already unreachable)
            but the parameter is kept so existing callers keep working.
        L: list of section-number strings.

    Returns:
        The single element when ``L`` has one entry, the elements joined with
        "/" when it has more, and "" when it is empty.
    """
    if not L:
        return ""
    if len(L) == 1:
        return L[0]
    return "/".join(L)
def all_equal2(iterator):
    """Return True when the iterable holds at most one distinct value."""
    distinct = set(iterator)
    return not len(distinct) > 1
def semester_cross_lister():
    """Cross-list every group found in a semester's cross-list CSV.

    Asks for a term, reads ``cache/<code>_crosslist.csv`` (skipping its first
    line), matches each row's CRN to a Canvas course of that term via
    characters 7-13 of ``sis_course_id``, writes a check page to
    ``cache/xlist_check.html`` and then, for each group, cross-lists every
    section after the first into the first one using xlist_ii() with a
    combined course name and code.

    Each CSV row is split on commas; field 1 is taken as the group, fields
    2 and 3 as department and course number, and field 5 as the CRN.
    """
    tt = find_term( input("term? (ex: fa25) ") )
    if not tt or (not 'canvas_term_id' in tt) or (not 'code' in tt):
        print(f"Couldn't find term.")
        return
    term = tt['canvas_term_id']
    sem = tt['code']
    xlist_filename = f"cache/{sem}_crosslist.csv"
    with codecs.open(xlist_filename,'r','utf-8') as infile:
        xlistfile = infile.readlines()[1:]
    by_section = {}
    by_group = defaultdict( list )
    crn_to_canvasid = {}
    crn_to_canvasname = {}
    crn_to_canvascode = {}
    get_fresh = 1
    c = getCoursesInTerm(term,get_fresh,0)
    for C in c:
        if 'sis_course_id' in C and C['sis_course_id']:
            crn_key = C['sis_course_id'][7:13]
            crn_to_canvasid[crn_key] = str(C['id'])
            crn_to_canvasname[crn_key] = str(C['name'])
            crn_to_canvascode[crn_key] = str(C['course_code'])
    # "Term","PrtTerm","xlstGroup","Subject","CrseNo","EffectCrseTitle","CRN","Session","SecSchdType","AttnMeth","MtgSchdType","MtgType","MaxEnroll","TotalEnroll","SeatsAvail","Bldg","Room","Units","LecHrs","LabHrs","HrsPerDay","HrsPerWk","TotalHrs","Days","D/E","Wks","BegTime","EndTime","StartDate","EndDate","LastName","FirstName","PercentResp"
    for xc in xlistfile:
        if not xc.strip():
            # A blank line (e.g. at the end of the file) has no fields to read.
            continue
        parts = xc.split(r',')
        course = parts[2] + " " + parts[3]
        group = parts[1]
        crn = parts[5]
        if crn in crn_to_canvasid:
            cid = crn_to_canvasid[crn]
            oldname = crn_to_canvasname[crn]
            oldcode = crn_to_canvascode[crn]
        else:
            print("! Not seeing crn %s in canvas semester" % crn)
            cid = ''
            oldname = ''
            oldcode = ''
        if crn in by_section: continue
        by_section[crn] = [crn, course, group, cid, oldname, oldcode]
        by_group[group].append( [crn, course, group, cid, oldname, oldcode] )
    # NOTE(review): the HTML tags in the check page were reconstructed (the
    # previous literals were broken and passed two values to one placeholder)
    # - confirm the intended markup.
    with codecs.open('cache/xlist_check.html','w','utf-8') as checkfile:
        checkfile.write('<html><body><table>\n')
        for x in by_section.values():
            print(x)
            href = '<a target="_blank" href="%s">%s</a>' % ('https://ilearn.gavilan.edu/courses/'+x[3]+'/settings#tab-details', x[3])
            checkfile.write('<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (x[0],x[2],x[1],href) )
        checkfile.write('</table></body></html>\n')
    print("GROUPS")
    for y in by_group.keys():
        sects = [ z[0] for z in by_group[y] ]
        sects.sort()
        nic = numbers_in_common(sects)
        new_sec = combined_name(nic,sects)
        # same dept?
        depts_list = [ z[1].split(' ')[0] for z in by_group[y] ]
        nums_list = list(set([ z[1].split(' ')[1] for z in by_group[y] ]))
        if all_equal2(depts_list):
            depts = depts_list[0]
            nums_list.sort()
            nums = '/'.join(nums_list)
        else:
            depts = list(set(depts_list))
            depts.sort()
            depts = '/'.join(depts )
            nums = by_group[y][0][1].split(' ')[1]
        new_name = f"{depts}{nums} {' '.join(by_group[y][0][4].split(' ')[1:-1])} {new_sec}"
        #new_name = by_group[y][0][4][0:-5] + new_sec
        new_code = f"{depts}{nums} {sem.upper()} {new_sec}"
        #new_code = by_group[y][0][5][0:-5] + new_sec
        print(y)
        print("\t", sects)
        #print("\tThey share %i leading numbers" % nic)
        print("\t", by_group[y])
        host_id = by_group[y][0][3]
        sections = by_group[y][1:]
        for target_section in sections:
            if not host_id or not target_section[3]:
                # A CRN that was not found in Canvas has an empty course id;
                # calling the API with it cannot succeed.
                print("! Skipping cross-list in group %s: missing canvas id for crn %s or its host" % (y, target_section[0]))
                continue
            xlist_ii(target_section[3],host_id,new_name,new_code)
            #pass
def do_manual_xlist():
    """Cross-list the courses listed in ``cache/sp25_manual_crosslist.txt``.

    Each non-blank line has the form ``id,id,... -> host_id``: the ids before
    the arrow are cross-listed into the host via xlist(). Blank lines are
    ignored.
    """
    with open('cache/sp25_manual_crosslist.txt','r') as f:
        infile = [ x.strip() for x in f ]
    for L in infile:
        if not L:
            # A blank line has no ' -> ' to split on.
            continue
        print(L)
        paraL,host = L.split(' -> ')
        para_list = paraL.split(',')
        print(host)
        print(para_list)
        xlist(host, para_list)
def ez_xlist():
    """Ask for a host course id and a comma-separated list of course ids, then cross-list them with xlist()."""
    host_id = int(input('what is the host id? '))
    typed = input('what are parasite ids? (separate with commas) ')
    parasite_ids = list(map(int, typed.split(',')))
    xlist(host_id, parasite_ids)
# Crosslist given 2 ids, computing the new name and code
def xlist(host_id, parasite_list):
    """Cross-list several courses into a host course with a combined name.

    Looks up the host and every parasite with course_from_id(), derives a
    combined section label, name and code, and calls xlist_ii() once per
    parasite.

    Args:
        host_id: Canvas id of the course that receives the sections.
        parasite_list: iterable of Canvas course ids to move into the host.

    Returns:
        "" when the host or any parasite cannot be found in the database,
        otherwise None.
    """
    def _add_derived_fields(info):
        # crn, department, number and the name without course code or crn
        info['crn'] = info['sis_source_id'][7:]
        info['dept'] = dept_from_name( info['course_code'] )
        info['num'] = num_from_name( info['course_code'] )
        info['bare_name'] = ' '.join(info['name'].split(' ')[1:-1])

    host_info = course_from_id(host_id)
    if not host_info:
        print(f"Couldn't find course id {host_id} in database. Do you need to update it?")
        return ""
    _add_derived_fields(host_info)
    sem = host_info['course_code'].split(' ')[1]

    para_info_list = [ course_from_id(x) for x in parasite_list ]
    for p in para_info_list:
        if not p:
            print(f"Couldn't find course id for parasite in database. Do you need to update it?")
            return ""
        _add_derived_fields(p)

    # parasites first, host last
    members = para_info_list + [host_info]

    # determine new name and code
    sects = sorted( z['crn'] for z in members )
    nic = numbers_in_common(sects)
    new_sec = combined_name(nic,sects)

    # same dept?
    depts_list = [ z['dept'] for z in members ]
    nums_list = list({ z['num'] for z in members })
    if all_equal2(depts_list):
        depts = depts_list[0]
        nums = '/'.join(sorted(nums_list))
    else:
        depts = '/'.join(sorted(set(depts_list)))
        nums = members[0]['num']

    new_name = f"{depts}{nums} {members[0]['bare_name']} {new_sec}"
    #new_name = by_group[y][0][4][0:-5] + new_sec
    new_code = f"{depts}{nums} {sem.upper()} {new_sec}"
    #new_code = by_group[y][0][5][0:-5] + new_sec
    print(f"New name: {new_name}")
    print(f"New code: {new_code}")
    print(sects)
    for target_section in para_info_list:
        xlist_ii(target_section['id'],host_id,new_name,new_code)
# Perform an actual cross-list, given 2 id numbers, new name and code
def xlist_ii(parasite_id,host_id,new_name,new_code):
    """Cross-list one course's section into a host course and rename the host.

    Fetches the sections of ``parasite_id``; unless exactly one section comes
    back it prints "Already Crosslisted" and stops. Otherwise it POSTs the
    cross-list request for that section and PUTs the new name and code to the
    host course, printing every request and response. Any exception is
    caught and reported.
    """
    print("Parasite id: ",parasite_id," Host id: ", host_id)
    print("New name: ", new_name)
    print("New code: ", new_code)
    confirm = 'y'
    #confirm = input("Perform cross list? Enter y for yes, n for no: ")
    if confirm == 'n':
        return
    try:
        sections_url = url + '/api/v1/courses/%s/sections' % parasite_id
        found = fetch(sections_url)
        #print(json.dumps(found,indent=2))
        if len(found) != 1:
            # none, or more than one section
            print("* * * * Already Crosslisted!!")
            return
        parasite_sxn_id = str(found[0]['id'])
        print("Parasite section id: ", parasite_sxn_id)

        crosslist_url = url + "/api/v1/sections/%s/crosslist/%s" % (parasite_sxn_id,host_id)
        print(crosslist_url)
        moved = requests.post(crosslist_url, headers = header)
        print(moved.text)

        host_url = url + "/api/v1/courses/%s" % host_id
        rename = {'course[name]': new_name, 'course[course_code]': new_code}
        print(rename)
        print(host_url)
        renamed = requests.put(host_url, headers=header, params=rename)
        print(renamed.text)
        print("\n\n")
    except Exception as e:
        print(f"\n\nSome sort of failure on {new_name}: {e}")
# Relevant stuff trying to see if its even being used or not
# relies on schedule being in database
def course_term_summary():
    """Write a CSV summary of every Canvas course in a term.

    Asks for a term, fetches its courses fresh, and for each course combines
    the Canvas record with the local database record, the schedule entry (if
    any), the student count and the teacher list. One line per course is
    printed and written to ``cache/term_summary_<canvas term id>.csv``.
    Courses whose local record cannot be read are reported and skipped.

    Relies on the schedule being in the database.
    """
    term = find_term( input("term? (ex: fa25) ") )
    if not term or (not 'canvas_term_id' in term) or (not 'code' in term):
        print(f"Couldn't find term.")
        return
    # Read the code before `term` is rebound to the bare id.
    SEM = term['code']
    term = term['canvas_term_id']
    print(f"Summary of {SEM}")
    get_fresh = 1
    courses = getCoursesInTerm(term, get_fresh, 0)
    print(f"output to cache/term_summary_{term}.csv")
    with codecs.open(f'cache/term_summary_{term}.csv','w','utf-8') as outp:
        outp.write('id,name,view,type,state,sched_start,ilearn_start,sched_students,ilearn_students,num_teachers,teacher1,teacher2,teacher2\n')
        for c in courses:
            c_db = course_from_id(c['id'])
            try:
                ilearn_start = c_db['start_at']
                s_db = course_sched_entry_from_id(c['id'])
            except Exception:
                print(f"problem with this course: {c_db}")
                continue
            sched_start = ''
            sched_students = ''
            sched_type = ''
            if (s_db):
                sched_start = s_db['start']
                sched_students =s_db['act']
                sched_type = s_db['type']
                #print(s_db)
            num_students = student_count(c['id'])
            tchr = teacher_list(c['id'])
            tt = ','.join([x[1] for x in tchr])
            line = f"{c['id']},{c['course_code']},{c['default_view']},{sched_type},{c['workflow_state']},{sched_start},{ilearn_start},{sched_students},{num_students},{len(tchr)},{tt}"
            print(line)
            outp.write(line + "\n")
    return
def course_term_summary_2():
    """Turn the unpublished courses of ``cache/term_summary.txt`` into HTML links.

    Every line containing "unpublished" is searched for ``'id': <digits>,``
    and ``'course_code': '<code>',``; when the id is found a link to the
    course is written to ``cache/term_summary.html``. A problem with one line
    is printed and does not stop the run.
    """
    with codecs.open('cache/term_summary.txt','r','utf-8') as infile:
        lines = infile.readlines()
    with codecs.open('cache/term_summary.html','w','utf-8') as output:
        for L in lines:
            try:
                L = L.strip()
                print(L)
                if re.search('unpublished',L):
                    m = re.search(r"'id': (\d+),",L)
                    m2 = re.search(r"'course_code': '(.+?)',",L)
                    if m:
                        # NOTE(review): the HTML tags were reconstructed (the
                        # previous literal had one placeholder for the two
                        # values supplied) - confirm the intended markup.
                        ss = "<br />Course: <a href='%s' target='_blank'>%s</a><br />\n" % ("https://ilearn.gavilan.edu/courses/"+str(m.group(1)), m2.group(1))
                        output.write( ss )
                        print(ss+"\n")
            except Exception as e:
                print(e)
# check number of students and publish state of all shells in a term
'''
def all_semester_course_sanity_check():
term = "su25"
target_start = "6-14"
outputfile = f'cache/courses_checker_{term}.csv'
t = 288
c = getCoursesInTerm(t,1,0)
sched1 = requests.get(f"http://gavilan.cc/schedule/{term}_sched_expanded.json").json()
sched = { x['crn']: x for x in sched1 }
#codecs.open('cache/courses_in_term_{t}.json','w','utf-8').write(json.dumps(c,indent=2))
#output = codecs.open('cache/courses_w_sections.csv','w','utf-8')
#output.write( ",".join(['what','id','parent_course_id','sis_course_id','name']) + "\n" )
output2 = codecs.open(outputfile,'w','utf-8')
output2.write( ",".join(['id','sis_course_id','name','state','mode','startdate','students']) + "\n" )
htmlout = codecs.open(f'cache/courses_checker_{term}.html','w','utf-8')
htmlout.write('\n')
htmlout.write(f'| Name | SIS ID | State | Mode | Start Date | # Stu |
\n')
html_sections = []
i = 0
for course in c:
try:
u2 = url + '/api/v1/courses/%s?include[]=total_students' % str(course['id'])
course['info'] = fetch(u2)
# correlate to schedule
crn = course['sis_course_id'][7:]
ctype = '?'
cstart = '?'
ts = '?'
if crn in sched:
ctype = sched[crn]['type']
cstart = sched[crn]['start']
ts = sched[crn]['act']
teacher = sched[crn]['teacher']
info = [ 'course', course['id'], '', course['sis_course_id'], course['name'], course['workflow_state'], ts ]
info = list(map(str,info))
info2 = [ course['id'], course['sis_course_id'], course['name'], course['workflow_state'], ctype, cstart, ts, teacher ]
info2 = list(map(str,info2))
output2.write( ",".join(info2) + "\n" )
output2.flush()
print(info2)
#output.write( ",".join(info) + "\n" )
uu = f"https://ilearn.gavilan.edu/courses/{course['id']}"
if course["workflow_state"]=='unpublished' and ctype=='online' and cstart==target_start:
html_sections.append(f'| {course["name"]} | {course["sis_course_id"]} | {course["workflow_state"]} | {ctype} | {cstart} | {ts} | {teacher} |
\n')
#uu = url + '/api/v1/courses/%s/sections' % str(course['id'])
#course['sections'] = fetch(uu)
#s_info = [ [ 'section', y['id'], y['course_id'], y['sis_course_id'], y['name'], y['total_students'] ] for y in course['sections'] ]
#for row in s_info:
# print(row)
# output.write( ",".join( map(str,row) ) + "\n" )
#output.flush()
i += 1
#if i % 5 == 0:
# codecs.open('cache/courses_w_sections.json','w','utf-8').write(json.dumps(c,indent=2))
except Exception as e:
print(f"error on {course}")
print(f"{e}")
#codecs.open('cache/courses_w_sections.json','w','utf-8').write(json.dumps(c,indent=2))
html_sections.sort()
for h in html_sections:
htmlout.write(h)
htmlout.write('
\n')
print(f"wrote to {outputfile}")
'''
def eslCrosslister():
    """Pair up ESL courses listed in ``cache/xcombos.txt`` and cross-list them.

    Searches term 62 for courses matching "ESL", sorts them into "ESL5...",
    "ESL7..." and other names, looks up the first section of each "ESL7..."
    course, links partners according to the CRN pairs in the combos file,
    prints summaries, and then asks before each cross-list. Pairs from the
    combos file that were never matched are reported at the end.
    """
    fives = []
    sevens = []
    others = []
    course_by_crn = {}
    sections = {}
    # each line of the combos file is a comma-separated pair of CRNs
    combos = [ [y.strip() for y in x.split(',') ] for x in open('cache/xcombos.txt','r').readlines() ]
    # one flag per combo, set to 1 once a course matched it
    combo_checklist = [ 0 for i in range(len(combos)) ]
    #print("\n\nCombos:")
    #[ print("%s - %s" % (x[0],x[1])) for x in combos]
    #return
    courses = getCoursesTermSearch(62,"ESL",0)
    for C in courses:
        # the first run of five digits in the course name is taken as the CRN
        ma = re.search( r'(\d{5})', C['name'])
        if ma:
            #print("Found Section: %s from course %s" % (ma.group(1), C['name']))
            C['crn'] = ma.group(1)
            course_by_crn[C['crn']] = C
        # NOTE(review): assumes the sorting below happens for every course,
        # not only for those with a CRN - confirm.
        if C['name'].startswith("ESL5"): fives.append(C)
        elif C['name'].startswith("ESL7"): sevens.append(C)
        else: others.append(C)
    for S in sevens:
        uu = url + '/api/v1/courses/%i/sections' % S['id']
        #print(uu)
        c_sect = fetch(uu)
        print(".",end='')
        #print(json.dumps(c_sect,indent=2))
        if len(c_sect) > 1:
            print("* * * * Already Crosslisted!!")
        if c_sect:
            sections[ S['id'] ] = c_sect[0]['id']
            S['sectionid'] = c_sect[0]['id']
        # NOTE(review): raises KeyError for a course whose name had no 5-digit
        # CRN, because 'crn' is only set when the regex matched - confirm.
        if S['crn']:
            for i,co in enumerate(combos):
                # The ESL7 course stores its partner's CRN (a string), while
                # the partner course stores the ESL7 course dict itself.
                if S['crn'] == co[0]:
                    S['partner'] = co[1]
                    combo_checklist[i] = 1
                    course_by_crn[co[1]]['partner'] = S
                elif S['crn'] == co[1]:
                    S['partner'] = co[0]
                    combo_checklist[i] = 1
                    course_by_crn[co[0]]['partner'] = S
    print("Others:")
    for F in sorted(others, key=lambda x: x['name']):
        print(courseLineSummary(F))
    print("\n\nFive hundreds")
    for F in sorted(fives, key=lambda x: x['name']):
        print(courseLineSummary(F))
    print("\n\nSeven hundreds")
    for F in sorted(sevens, key=lambda x: x['name']):
        print(courseLineSummary(F,sections))
    print("\n\nMake a x-list: ")
    for F in sorted(fives, key=lambda x: x['name']):
        if 'partner' in F:
            print(xlistLineSummary(F,sections))
        if 'partner' in F and 'sectionid' in F['partner']:
            if not input('ready to crosslist. Are you? Enter "q" to quit. ') == 'q':
                # NOTE(review): xlist() in this file takes (host_id,
                # parasite_list) and iterates its second argument; here it
                # receives a section id and a single course id - confirm
                # which function this call is meant to reach.
                xlist( F['partner']['sectionid'], F['id'] )
            else:
                break
    for i,c in enumerate(combo_checklist):
        if not c:
            print("Didn't catch: "+ str(combos[i]))
def xlist_iii(parasite='', host=''): # section id , new course id
    """Interactively cross-list courses into one host course.

    Prompts for the host course id and for a first id when they are not
    given. For each id entered, the first section of that course is
    cross-listed into the host and the request URL and response are printed;
    then the next id is asked for. Entering 'q' ends the loop.
    """
    host = host or input("ID number of the HOSTING COURSE? ")
    parasite = parasite or input("ID number of the SECTION to add to above? (or 'q' to quit) ")
    while True:
        if parasite == 'q':
            break
        #h_sections = fetch( url + "/api/v1/courses/%s/sections" % str(host))
        #print(h_sections)
        found_sections = fetch( url + "/api/v1/courses/%s/sections" % str(parasite))
        #print(found_sections)
        first_section_id = found_sections[0]['id']
        # TODO need to get the section id from each course:
        # GET /api/v1/courses/:course_id/sections
        # POST /api/v1/sections/:id/crosslist/:new_course_id
        # SECTION ID (to move) NEW __COURSE__ ID
        crosslist_url = url + "/api/v1/sections/%s/crosslist/%s" % (str(first_section_id),str(host))
        print(crosslist_url)
        response = requests.post(crosslist_url, headers = header)
        print(response.text)
        parasite = input("ID number of the SECTION to add to above? ")
def unenroll_student(courseid,enrolid):
    """Delete one enrollment from a course.

    Sends a DELETE for the enrollment with ``task=delete`` and prints the
    parameters sent.

    Args:
        courseid: Canvas course id.
        enrolid: id of the enrollment record (not the user id).

    Returns:
        The ``requests`` response, so callers can inspect or print the
        outcome (previously nothing was returned).
    """
    t = url + "/api/v1/courses/%s/enrollments/%s" % ( str(courseid), str(enrolid) )
    data = {"task": "delete" }
    r4 = requests.delete(t, headers=header, params=data)
    print(data)
    return r4
#def get_enrollments(courseid):
# t = url + "/api/v1/courses/%s/enrollments?type=StudentEnrollment" % courseid
# return fetch(t,1)
def enroll_id_list_to_shell(id_list, shell_id, v=0):
    """Enroll the given users as active students in a course shell.

    Users who already have a student enrollment in the shell are left alone.
    Each remaining id is looked up in the local ``canvas.users`` table and,
    when found, enrolled with a POST to the shell's enrollments endpoint,
    pausing 0.6 seconds between requests.

    Args:
        id_list: iterable of ``[id, name]`` pairs; only the id is used.
        shell_id: Canvas course id of the shell.
        v: truthy to print the id sets involved.
    """
    # id list has pairs, [id,name]
    id_list = set([i[0] for i in id_list])
    existing = course_enrollment(shell_id) # by user_id
    existing_ids = set( [ x['user_id'] for x in existing.values() ])
    if v: print("To Enroll: %s" % str(id_list))
    if v: print("\n\nAlready Enrolled: %s" % str(existing_ids))
    enroll_us = id_list.difference(existing_ids)
    if v: print("\n\nTO ENROLL %s" % str(enroll_us))
    (connection,cursor) = db()
    for j in enroll_us:
        # Reset per user so the error message below never hits an unbound
        # name or reports the previous user's row.
        s = None
        try:
            q = "SELECT name,id FROM canvas.users u WHERE u.id=%s" % j
            cursor.execute(q)
            s = cursor.fetchall()
            # NOTE(review): users missing from the local table are skipped,
            # i.e. the enrollment request is only sent when a row was found
            # - confirm.
            if s:
                s = s[0]
                print("Enrolling: %s" % s[0])
                t = url + '/api/v1/courses/%s/enrollments' % shell_id
                data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
                    'enrollment[enrollment_state]': 'active' }
                r3 = requests.post(t, headers=header, params=data)
                #print(r3.text)
                time.sleep(0.600)
        except Exception as e:
            print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
# multiple semesters
def enroll_stem_students_live():
    """Run the STEM shell enrollment sync for each configured term id (currently only 289)."""
    for term_id in (289,):
        enroll_stem_students_live_semester(term_id)
def enroll_stem_students_live_semester(the_term, do_removes=0):
    """Sync the STEM shell's enrollments with one term's STEM course rosters.

    Everyone enrolled in a course of the listed departments in ``the_term``
    who is not yet in the STEM shell (``stem_course_id``) is enrolled as an
    active student. With ``do_removes`` set, members of the shell who are not
    in any of those courses are unenrolled first. Users are looked up in the
    local ``canvas.users`` table before each request, and requests are
    spaced 0.6 seconds apart.

    Args:
        the_term: Canvas enrollment term id.
        do_removes: truthy to also remove users no longer in a STEM course.

    Returns:
        Tuple ``(enrolled, removed)`` with the number of enrollment and
        removal requests that were sent.
    """
    import localcache2
    depts = "MATH BIO CHEM CSIS PHYS PSCI GEOG ASTR ECOL ENVS ENGR STAT".split(" ")
    users_to_enroll = users_in_by_depts_live(depts, the_term) # term id
    stem_enrollments = course_enrollment_with_faculty(stem_course_id) # by user_id
    users_in_stem_shell = set( [ x['user_id'] for x in stem_enrollments.values() ])
    print("ALL STEM STUDENTS %s" % str(users_to_enroll))
    print("\n\nALREADY IN STEM SHELL %s" % str(users_in_stem_shell))
    enroll_us = users_to_enroll.difference(users_in_stem_shell)
    remove_us = users_in_stem_shell.difference(users_to_enroll)
    print("\n\nTO ENROLL %s" % str(enroll_us))
    (connection,cursor) = localcache2.db()
    #xyz = input('enter to continue')
    eee = 0
    uuu = 0
    if do_removes:
        print("\n\nTO REMOVE %s" % str(remove_us))
        for j in remove_us:
            # Reset per user so the error message never hits an unbound name
            # or reports the previous user's row.
            s = None
            try:
                q = "SELECT name,id FROM canvas.users WHERE id=%s" % j
                cursor.execute(q)
                s = cursor.fetchall()
                if s:
                    s = s[0]
                    print("Removing: %s" % s[0])
                    r1 = unenroll_student(str(stem_course_id), stem_enrollments[j]['id'])
                    print(r1)
                    uuu += 1
                    time.sleep(0.600)
            except Exception as e:
                print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
    for j in enroll_us:
        s = None
        try:
            q = "SELECT name,id FROM canvas.users WHERE id=%s" % j
            cursor.execute(q)
            s = cursor.fetchall()
            # NOTE(review): users missing from the local table are skipped,
            # i.e. the enrollment request is only sent when a row was found
            # - confirm.
            if s:
                s = s[0]
                print("Enrolling: %s" % s[0])
                #print(s)
                t = url + '/api/v1/courses/%s/enrollments' % stem_course_id
                data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
                    'enrollment[enrollment_state]': 'active' }
                #print(data)
                #if input('enter to enroll %s or q to quit: ' % s[0]) == 'q':
                #break
                r3 = requests.post(t, headers=header, params=data)
                print(data)
                # Count the request; this used to add 0, so the first element
                # of the returned tuple was always 0.
                eee += 1
                time.sleep(0.600)
        except Exception as e:
            print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
        #print(r3.text)
    print("\n\nTO ENROLL %s" % str(enroll_us))
    #print("\n\nTO REMOVE %s" % str(remove_us))
    return (eee,uuu)
###########################
def enroll_bulk_students_bydept(course_id, depts, the_term="172", cautious=1): # a string, a list of strings
    """Enroll every student taking a course in the given departments into one shell.

    course_id: id of the target shell.
    depts:     list of department codes passed to users_in_by_depts_live.
    the_term:  term id passed to users_in_by_depts_live.
    cautious:  when truthy, show each request and ask before sending it;
               answering 'k' turns the prompting off for the rest of the run
               and 'q' stops the loop.

    Users who are in the shell but not in the target set are only listed;
    removal is not implemented here.
    """
    users_to_enroll = users_in_by_depts_live(depts, the_term)  # term id

    targeted_enrollments = course_enrollment(course_id)  # by user_id.. (live, uses api)
    current_enrollments = set([x['user_id'] for x in targeted_enrollments.values()])

    print("ALL TARGET STUDENTS %s" % str(users_to_enroll))
    print("\nALREADY IN SHELL %s" % str(current_enrollments))

    enroll_us = users_to_enroll.difference(current_enrollments)
    remove_us = current_enrollments.difference(users_to_enroll)

    print("\n\nTO ENROLL %s" % str(enroll_us))
    input('enter to continue')  # pause so the operator can review the list
    print("\n\nTO REMOVE %s" % str(remove_us))

    (connection, cursor) = db()

    # NOTE(review): these queries read `users.canvasid` while the other
    # enroll helpers in this file read `canvas.users.id` - confirm the schema.
    for j in remove_us:
        s = ""  # keeps the error message printable if the query fails
        try:
            q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j
            cursor.execute(q)
            s = cursor.fetchall()
            if s:
                s = s[0]
                print("Removing: %s" % s[0])
                ## TODO not done here
                # r1 = unenroll_student(str(course_id), stem_enrollments[j]['id'])
                #print(r1)
                time.sleep(0.600)
        except Exception as e:
            print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))

    t = url + '/api/v1/courses/%s/enrollments' % course_id
    for j in enroll_us:
        s = ""  # keeps the error message printable if the query fails
        try:
            q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j
            cursor.execute(q)
            s = cursor.fetchall()
            if s:
                s = s[0]
                print("Enrolling: %s" % s[0])
                #print(s)
                data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
                         'enrollment[enrollment_state]': 'active' }

                if cautious:
                    print(t)
                    print(data)
                    prompt = input('enter to enroll %s, k to go ahead with everyone, or q to quit: ' % s[0])
                    if prompt == 'q':
                        break
                    elif prompt == 'k':
                        cautious = 0
                r3 = requests.post(t, headers=header, params=data)
                if cautious:
                    print(data)
                time.sleep(0.600)
        except Exception as e:
            print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
        #print(r3.text)
def enroll_gott_workshops():
    """Enroll people who signed up for GOTT workshops into the matching Canvas shells.

    Steps:
      1. Download the signups page and pull the `var signups = [...]` JSON
         array out of it.
      2. Load staff records from cache/ilearn_staff.json, keyed by lowercased email.
      3. For each active [date, title, shell_id] entry in workshop_ids, collect
         the signups whose date_rsvp and training match, map their email
         (after the `subs` substitutions) to a staff record, and pass the
         resulting [id, name] pairs to enroll_id_list_to_shell.

    Signups whose email is not found among the staff records are skipped.
    Returns None; prints and returns early if the JSON cannot be found or parsed.
    """

    def _norm(v):
        # Replace non-breaking spaces and trim, so title/date matching is robust.
        try:
            return str(v).replace('\xa0',' ').strip()
        except Exception:
            return v

    # stupid gav tls broken
    r = requests.get("https://www.gavilan.edu/staff/tlc/signups.php")
    text = r.text

    # Regex to extract the JSON object
    match = re.search(r"var\s+signups\s*=\s*(\[\{.*?\}\]);", text, re.DOTALL)
    if not match:
        print("JSON object not found")
        return

    json_str = match.group(1)  # Extract the JSON string
    try:
        signups = json.loads(json_str)  # Convert to Python list of dicts
    except json.JSONDecodeError as e:
        print("Error decoding JSON:", e)
        return

    # Normalize NBSP and spaces in key fields to make title/date matching robust
    for s in signups:
        if isinstance(s, dict):
            if 'training' in s and s['training'] is not None:
                s['training'] = _norm(s['training'])
            if 'date_rsvp' in s and s['date_rsvp'] is not None:
                s['date_rsvp'] = _norm(s['date_rsvp'])
    #print(json.dumps(signups,indent=2))

    #signups = json.loads(r.text)
    #signups = json.loads(codecs.open('cache/signups.json','r','utf-8').read())

    # update w/ users.py #1
    with codecs.open('cache/ilearn_staff.json','r','utf-8') as staff_file:
        all_staff = json.loads(staff_file.read())
    # Staff rows without an email cannot be matched, so leave them out.
    by_email = { x['email'].lower():x for x in all_staff if x.get('email') }
    #print(by_email.keys())

    workshop_ids = [
        #'GOTT 2: Intro to Async Online Teaching and Learning2023-07-09 17:00:00': 17992,
        #'GOTT 4: Assessment in Digital Learning2023-07-09 17:00:00': 17995,
        #'Restricted to STEM faculty. Humanizing (STEM) Online Learning 2023-06-18 17:00:00': 17996,
        #'GOTT 6: Online Live Teaching and Learning2023-06-11 17:00:00': 17986,
        #'GOTT 5: Essentials of Blended Learning2023-06-25 17:00:00': 17987,
        #'GOTT 5: Essentials of Blended Learning (HyFlex)2023-06-25 17:00:00': 17987,
        #'GOTT 1: Intro to Teaching Online with Canvas2023-05-29 17:00:00': 17985,
        #'GOTT 1: Intro to Teaching Online with Canvas2023-08-20 17:00:00': 17994
        #'GOTT 1: Intro to Online Teaching2024-01-02 16:00:00': 19278,
        #'GOTT 2: Intro to Asynchronous Teaching and Learning2024-01-02 16:00:00': 19222,
        #'GOTT 5: Essentials of Blended Learning2024-01-02 16:00:00': 19223,
        #'GOTT 6: Intro to Live Online Teaching and Learning2024-01-14 16:00:00': 19224,
        #'5/28-6/9 GOTT 1: Intro to Teaching Online 2024-05-28 12:00:00': 20567,
        #'5/28-6/21 GOTT 2: Introduction to Asynchronous Teaching and Design2024-05-28 12:00:00': 20575,
        #'GOTT 4: Assessment in Digital Learning2024-06-02 17:00:00': 20600, # 6/2
        #'6/10-6/23 GOTT 5: Essentials of Blended Learning, Hyflex2024-06-10 12:00:00': 20568,
        #'6/17-6/30 GOTT 6 Introduction to Live Online Teaching and Learning2024-06-17 12:00:00': 20569,
        #'GOTT 1 Intro to Teaching Online AUG242024-07-29 12:00:00': 20603, # 7/29
        #['2025-01-01 16:00:00 GOTT 1: Intro to Teaching Online with Canvas', 21770, 'enroll_gott1.txt'],
        #['2025-01-01 16:00:00 GOTT 2: Introduction to Asynchronous Teaching and Design', 21772, 'enroll_gott2.txt']
        # date, title, shell_id
        #['2025-02-23 16:00:00', 'GOTT 6: Intro to Synchronous Teaching (Sync/Hyflex)', 21835],
        #['2025-03-14 17:00:00', 'GOTT 5: The Essentials of Blended Learning (Hybrid) ', '21886'],
        #['2025-02-23 16:00:00', 'GOTT 1: Intro to Teaching Online (2 week, async)', 21874]
        #['2025-05-26 17:00:00', 'GOTT 2: Introduction to Asynchronous Teaching and Learning', 23015],
        #['2025-06-01 17:00:00', 'GOTT 1: Intro to Teaching Online', 23083],
        #['2025-06-01 17:00:00', 'GOTT 4: Assessments in Digital Learning', 21898],
        #['2025-08-11 13:00:00', 'GOTT 1: Introduction to Online Teaching with Canvas', 23232],
        #['2025-09-01 17:00:00', r'GOTT 1: Intro to Online Teaching (Canvas, Accessibility and RSI) ', 23270],
        #['2025-09-14 17:00:00', r'GOTT 2: Intro to Asynchronous Online Teaching and Learning', 23290],
        ['2025-09-28 17:00:00', r'GOTT 6: Foundation of Teaching Live Online/Hyflex', 23311],
    ]
    #print(json.dumps(signups,indent=4))
    #print(json.dumps(by_email,indent=4))

    # Email entered on the signup form -> email to look up in the staff records.
    subs = {'csalvin@gavilan.edu':'christinasalvin@gmail.com',
            'karenjeansutton@gmail.com': 'ksutton@gavilan.edu',
            'elisepeeren@gmail.com': 'epeeren@gavilan.edu',
            'kjoyenderle@gmail.com': 'kenderle@gavilan.edu',
            'flozana@gmail.com': 'flozano@gavilan.edu',
            'fyarahmadi2191@gmail.com': 'fyarahmadi@gavilan.edu',
            'jacquelinejeancollins@yahoo.com': 'jcollins@gavilan.edu',
            'bt@gavilan.edu': 'btagg@gavilan.edu',
            'tagg.brian@yahoo.com': 'btagg@gavilan.edu',
            'tmiller.realestate@gmail.com': 'tmiller@gavilan.edu',
            'gemayo70@yahoo.com': 'pclaros@gavilan.edu',
            'csalvin@gmail.com': 'csalvin@gavilan.edu',
            'efalvey@aol.com': 'efalvey@gavilan.edu',
            'lorrmay36@mac.com': 'llevy@gavilan.edu',
            'gkalu1@gmail.com': 'gkalu@gavilan.edu',
            'rpotter@gav.edu': 'rpotter@gavilan.edu',
            'ally162@qq.com': 'aao@gavilan.edu',
            'davidamancio791@gmail.com': 'damancio@gavilan.edu',
            'carissaamunoz83@gmail.com': 'amunoz@gavilan.edu',
            'jasonwcpa@yahoo.com': 'jwolowitz@gavilan.edu',
            'fam.grzan@charter.net': 'rgrzan@gavilan.edu',
            'carissaadangelo@yahoo.com': 'cmunoz@gavilan.edu',
            }

    for each_workshop in workshop_ids:
        #if wkshp not in workshop_ids:
        #    print(f"skipping {wkshp}")
        #    continue
        wkshp_date, wkshp_title, wkshp_shell_id = each_workshop

        to_enroll = []
        #from_file = [ L.strip().split(' - ') for L in codecs.open(f'cache/{student_list}', 'r', 'utf-8').readlines() ]
        #print(from_file)

        for s in signups:
            # Same guard as the cleaning pass above: ignore malformed entries.
            if not isinstance(s, dict):
                continue
            if _norm(wkshp_date) == _norm(s.get('date_rsvp')) and _norm(wkshp_title) == _norm(s.get('training')):
                e = s['email'].lower()
                if e in subs:
                    e = subs[e]
                print( f"{wkshp_title} {e} {s['name']}" )
                if e in by_email:
                    user = by_email[e]
                    #print(f"\t{user['name']} {e} {user['login_id']}")
                    to_enroll.append([user['id'],user['name']])
                else:
                    #print("** ** NOT FOUND")
                    pass

        print(f"Workshop: {wkshp_date} {wkshp_title} \n\tEnrolling: {', '.join(i[1] for i in to_enroll)}")
        enroll_id_list_to_shell(to_enroll, wkshp_shell_id)
def enroll_gnumber_list_to_courseid():
    """Enroll the users listed in cache/gottenrollments.txt as students in one course.

    File layout: the first line is the target course id; every following
    non-blank line starts with a user identifier (text before the first
    comma) that is resolved with localcache2.user_from_goo.

    Each user is enrolled with an active StudentEnrollment; the API response
    text is printed. A failure for one user is reported and the loop continues.
    """
    from localcache2 import user_from_goo

    with codecs.open('cache/gottenrollments.txt','r','utf-8') as f:
        infile = f.readlines()

    courseid = infile[0].strip()
    # Blank lines would otherwise be looked up as an empty identifier.
    glist = [ x.strip().split(',')[0] for x in infile[1:] if x.strip() ]

    # One lookup per user; both the id and the name come from the same record.
    users = [ user_from_goo(x) for x in glist ]
    idlist = [ u['id'] for u in users ]
    namelist = [ u['name'] for u in users ]

    print(courseid)
    print(glist)
    print(idlist)

    t = f"{url}/api/v1/courses/{courseid}/enrollments"
    for user_id, user_name in zip(idlist, namelist):
        try:
            print(f"Enrolling: {user_id}, {user_name}")
            data = { 'enrollment[user_id]': user_id, 'enrollment[type]':'StudentEnrollment',
                     'enrollment[enrollment_state]': 'active' }
            r3 = requests.post(t, headers=header, params=data)
            print(r3.text)
            time.sleep(0.600)
        except Exception as e:
            print(f"Something went wrong with id {user_id}, course {courseid}, user {user_name}: {e}")
def enroll_art_students_live():
    """Bulk-enroll students from the arts departments into shell 13717."""
    art_shell_id = "13717"
    art_depts = ["THEA", "ART", "DM", "MUS", "MCTV"]
    enroll_bulk_students_bydept(art_shell_id, art_depts)
    print("done.")
def enroll_orientation_students():
    """Enroll students who are new this summer/fall into the orientation shell.

    The candidate list comes from users_new_this_2x_semester and the current
    shell membership from user_ids_in_shell; only the difference is enrolled.
    Set DO_IT to 0 for a dry run that prints names without calling the API.

    Returns (eee, uuu): the number of enrollment requests sent, and 0
    (nothing is ever unenrolled here).
    """
    # For testing purposes
    DO_IT = 1

    import localcache2

    ori_shell_id = "20862" # 2025 "19094" # 2024 # "" # 2023 orientation shell 15924 # 2022: "9768"

    print("Getting users in orientation shell")
    #users_in_ori_shell = set( \
    #    [ str(x['user_id']) for x in course_enrollment(ori_shell_id).values() ]) # api fetch
    users_in_ori_shell = list(user_ids_in_shell(ori_shell_id))

    # single semester
    # users_to_enroll = users_new_this_semester(the_semester) ### ##### USES LOCAL DB
    # double semester (SU + FA)
    users_to_enroll = users_new_this_2x_semester("202550", "202570") ##### USES LOCAL DB

    #print("ALL ORIENTATION STUDENTS %s" % str(users_to_enroll))
    #print("\n\nALREADY IN ORI SHELL %s" % str(users_in_ori_shell))

    enroll_us = users_to_enroll.difference(users_in_ori_shell)

    #print("\n\nTO ENROLL %s\n" % str(enroll_us))
    print(f"{len(enroll_us)} new students to enroll in Orientation shell." )

    eee = 0
    uuu = 0

    (connection, cursor) = localcache2.db()
    t = url + '/api/v1/courses/%s/enrollments' % ori_shell_id

    for j in enroll_us:
        s = ""
        try:
            q = "SELECT name,id FROM canvas.users WHERE id=%s" % j
            cursor.execute(q)
            s = cursor.fetchall()
            if s:
                s = s[0]
                print(" + Enrolling: %s" % s[0])
                data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
                         'enrollment[enrollment_state]': 'active' }
                #print(t)
                #print(data)
                if DO_IT:
                    r3 = requests.post(t, headers=header, params=data)
                    eee += 1
                    #print(r3.text)
                time.sleep(0.250)
        except Exception as e:
            print(" - Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))

    # enroll_o_s_students unpacks this pair.
    return (eee, uuu)
def enroll_o_s_students():
    """Run the STEM and Orientation enrollment syncs and print the totals.

    Both helpers are expected to return an (enrolled, unenrolled) pair;
    anything else fails at the unpacking step.
    """
    #full_reload()

    stem_enrolled, stem_unenrolled = enroll_stem_students_live()
    ori_enrolled, _ori_unenrolled = enroll_orientation_students()

    stem_counts = (stem_enrolled, stem_unenrolled)
    print("Enrolled %i and unenrolled %i students in STEM shell" % stem_counts)
    print("Enrolled %i students in Orientation shell" % ori_enrolled)
def make_ztc_list(sem='sp20'):
    """Cross-reference ZTC survey responses with the sp20 schedule.

    Reads output/semesters/2020spring/sp20_sched.json (list of section dicts
    with 'code', 'crn', 'name', 'teacher') and cache/ztc_responses_sp20.csv
    (header line, then one line per respondent: name followed by course codes).
    Writes cache/ztc_crossref.csv with one row per scheduled section whose
    course code was named by at least one respondent.

    sem: currently unused; the input and output paths are fixed to sp20.
    """
    with open('output/semesters/2020spring/sp20_sched.json','r') as sched_file:
        sched = json.loads(sched_file.read())
    with open('cache/ztc_responses_sp20.csv','r') as responses_file:
        responses = responses_file.readlines()[1:]  # drop the header line

    # course code -> comma-separated names of respondents who listed it
    ztc_by_dept = {}
    for R in responses:
        R = re.sub(',Yes','',R)
        R = re.sub(r'\s\s+',',',R)  # runs of whitespace also act as separators

        parts = R.split(r',') #name courselist yes
        #print(parts[1])
        name = parts[0]

        for C in parts[1:] :
            C = C.strip()
            #print(C)
            if C in ztc_by_dept:
                ztc_by_dept[C] += ', ' + name
            else:
                ztc_by_dept[C] = name
    print(ztc_by_dept)

    # The context manager guarantees the report is flushed and closed.
    with open('cache/ztc_crossref.csv','w') as result:
        result.write('Course,Section,Name,Teacher,ZTC teacher\n')
        for CO in sched:
            #if re.match(r'CWE',CO['code']):
            #print(CO)

            if CO['code'] in ztc_by_dept:
                print(('Possible match, ' + CO['code'] + ' ' + ztc_by_dept[CO['code']] + ' is ztc, this section taught by: ' + CO['teacher'] ))
                result.write( ','.join( [CO['code'] ,CO['crn'] , CO['name'] , CO['teacher'] , ztc_by_dept[CO['code']] ]) + "\n" )
def course_search_by_sis():
    """Print a table of course name and SIS course id for every course in term 65."""
    term = 65
    rows = []
    for entry in getCoursesInTerm(term):
        #u = "/api/v1/accounts/1/courses/%s" % course_id
        #i = fetch( url + u)
        name_and_sis = [ entry['name'], entry['sis_course_id'] ]
        rows.append(name_and_sis)
    print_table(rows)
    # print(json.dumps(x, indent=2))
# run overview_start_dates to get most recent info
def set_custom_start_dates():
    """Annotate the semester shell overview CSV with start-date findings.

    Prompts for a term, reads cache/overview_semester_shells_<code>.csv
    (produced by overview_start_dates) and writes
    cache/overview_semester_shells_annotated<code>.csv with these extra
    columns:

      ignore                         department code for JLE / JFT shells
      is_early_start / is_late_start scheduled start date when its month/day
                                     falls before / after the term start
      shell_custom_start             the shell's own start date, if it has one
      shell_warn_crosslist_sections  the section part of a slash-separated
                                     shortname when its number of parts
                                     differs from the shell's section count

    Rows with zero sections are dropped. No changes are sent to Canvas;
    adjust_shell_startdate is still a stub.
    """
    from datetime import datetime

    term = find_term( input("term? (ex: fa25) ") )

    if not term or (not 'canvas_term_id' in term) or (not 'code' in term):
        print(f"Couldn't find term. Try updating the saved terms list.")
        return

    SEM = term['code']

    term_start_month = int(term['begin'].split('/')[0])
    term_start_day = int(term['begin'].split('/')[1])
    print(f"term begins on {term_start_month}/{term_start_day}")

    output_path = f"cache/overview_semester_shells_annotated{SEM}.csv"
    input_path = f"cache/overview_semester_shells_{SEM}.csv"

    if not os.path.exists(input_path):
        print(f"file does not exist: {input_path}")
        print("Run overview_start_dates first")
        return

    # just do certain ids in cache/changeme.txt
    # Only read the file when the switch is on, so a missing changeme.txt
    # cannot abort a normal run. Nothing below consumes limit_to yet.
    limit_to_specific_ids = 0
    limit_to = []
    if limit_to_specific_ids:
        with open('cache/changeme.txt','r') as id_file:
            limit_to = [x.strip() for x in id_file.readlines()]

    def adjust_shell_startdate(row):
        # Placeholder stub
        pass

    def parse_date(date_str):
        # Accepts ISO-like timestamps; 'None', empty and unparseable values give None.
        if not date_str or date_str.lower() == 'none':
            return None
        try:
            return datetime.fromisoformat(date_str.replace("Z", "").replace("T", " "))
        except ValueError:
            return None

    with open(input_path, newline='', encoding='utf-8') as infile, \
         open(output_path, "w", newline='', encoding='utf-8') as outfile:

        reader = csv.DictReader(infile)
        fieldnames = (reader.fieldnames or []) + [
            "ignore","is_early_start", "is_late_start", "shell_custom_start", "shell_warn_crosslist_sections"
        ]
        # Data rows may carry more values than the header names (DictReader
        # files those under the key None); ignore them instead of raising.
        writer = csv.DictWriter(outfile, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()

        for row in reader:
            num_sections = int(row["shell_numsections"])
            if num_sections == 0:
                continue

            sched_start = parse_date(row["sched_start"])
            shell_start = parse_date(row["shell_start"])
            shortname = row["shell_shortname"]

            # Initialize new columns
            row["ignore"] = ""
            row["is_early_start"] = ""
            row["is_late_start"] = ""
            row["shell_custom_start"] = ""
            row["shell_warn_crosslist_sections"] = ""

            # check for cops program
            name_parts = shortname.split()
            # First word with trailing digits removed, e.g. "JLE100" -> "JLE".
            department = name_parts[0].rstrip("0123456789") if name_parts else ""
            if department in ("JLE", "JFT"):
                row["ignore"] = department

            # Early/late start check
            if sched_start:
                sched_mmdd = (sched_start.month, sched_start.day)
                term_mmdd = (term_start_month, term_start_day)
                if sched_mmdd < term_mmdd:
                    row["is_early_start"] = sched_start.date().isoformat()
                elif sched_mmdd > term_mmdd:
                    row["is_late_start"] = sched_start.date().isoformat()

            # shell_start override
            if shell_start:
                row["shell_custom_start"] = shell_start.date().isoformat()
            else:
                if row["is_early_start"] or row["is_late_start"]:
                    adjust_shell_startdate(row)

            # Crosslist check
            if '/' in shortname:
                section_part = name_parts[-1]
                section_count = len(section_part.split('/'))
                if section_count != num_sections:
                    row["shell_warn_crosslist_sections"] = section_part

            writer.writerow(row)
    return

    '''
    # Do we adjust the start date? Only if it doesn't match term
    if d_start.month == term_start_month and d_start.day == term_start_day:
        print(" Ignoring, term start date" )
        continue
    else:
        print(" Adjust course start day?")
        if make_changes:
            if do_all != 'a':
                do_all = input(' -> adjust? [enter] for yes, [a] to do all remaining. [n] to quit. >')
            if do_all == 'n':
                exit()
            if do_all == '' or do_all == 'a':
                data = {'course[start_at]':d_start.isoformat(), 'course[restrict_student_future_view]': True,
                        'course[restrict_enrollments_to_course_dates]':True }
                u2 = f"https://gavilan.instructure.com:443/api/v1/courses/{this_id}"
                r3 = requests.put(u2, headers=header, params=data)
                print(" updated.. OK")
    '''
def overview_start_dates():
    """Write cache/overview_semester_shells_<code>.csv: one row per scheduled section.

    Prompts for a term, matches each entry of the published schedule
    (<code>_sched_expanded.json) to a Canvas shell by CRN, then fetches the
    shell's dates, state, student count, teachers and section count.

    The file is written with the csv module so that values containing commas
    (the teacher list in particular) stay in their own column, and the header
    names every column that the rows contain.
    """
    term = find_term( input("term? (ex: fa25) ") )

    if not term or (not 'canvas_term_id' in term) or (not 'code' in term):
        print(f"Couldn't find term.")
        return

    TERM = term['canvas_term_id']
    SEM = term['code']

    get_fresh = 0
    gf = input('Fetch new list of semester courses? (y/n) ')
    if gf=='y':
        get_fresh = 1

    # get list of online course shells
    c = getCoursesInTerm(TERM,get_fresh,0)

    # dict to match section numbers between shells and schedule
    crn_to_canvasid = {}
    for C in c:
        if 'sis_course_id' in C and C['sis_course_id']:
            #print( f"{C['name']} -> {C['sis_course_id'][7:13]}" )
            # characters 7..12 of the SIS id are used as the CRN
            crn_to_canvasid[C['sis_course_id'][7:13]] = str(C['id'])
        else:
            print( f"---NO CRN IN: {C['name']} -> {C}" )

    # NOTE(review): shell_restrict_view_dates appears twice, as in the
    # previous file layout - confirm whether a second field was intended.
    columns = ["id","shell_shortname","type","enrolled","max","sched_start","shell_start","shell_end",
               "shell_restrict_view_dates","shell_restrict_view_dates","shell_state",
               "shell_numstudents","shell_numsections","sched_teacher","shell_teachers"]

    # get course info from schedule
    s = requests.get(f"https://gavilan.cc/schedule/{SEM}_sched_expanded.json").json()

    # Open the output only once all inputs are in hand, and close it reliably.
    with codecs.open(f"cache/overview_semester_shells_{SEM}.csv","w","utf-8") as output:
        writer = csv.writer(output, lineterminator="\n")
        writer.writerow(columns)
        print("\n\n" + ",".join(columns))

        for S in s:
            # get dates
            start = re.sub( r'\-','/', S['start']) + '/20' + SEM[2:4]
            d_start = datetime.strptime(start,"%m/%d/%Y")

            # try to find online shell matching this schedule entry
            try:
                this_id = crn_to_canvasid[S['crn']]
            except Exception as e:
                print(f"DIDN'T FIND CRN - {start} {d_start} - {S['code']} {S['crn']} {S['name']}" )
                continue

            # get more canvas course shell info
            uu = f"{url}/api/v1/courses/{this_id}"
            this_course = fetch(uu)

            shell_start = this_course['start_at']
            shell_end = this_course['end_at']
            shell_restrict_view_dates = '?'
            if 'access_restricted_by_date' in this_course:
                shell_restrict_view_dates = this_course['access_restricted_by_date']
            shell_shortname = this_course['course_code']
            shell_state = this_course['workflow_state']

            # get user count
            ss = f"{url}/api/v1/courses/{this_id}/users"
            enrollments = fetch(ss, params={"enrollment_type[]":"student"})
            shell_numstudents = len(enrollments)

            # get teachers
            s2 = f"{url}/api/v1/courses/{this_id}/users"
            teachers = fetch(s2, params={"enrollment_type[]":"teacher"})
            shell_teachers = [(x['id'],x['name']) for x in teachers]

            # cross-listed?
            sec = f"{url}/api/v1/courses/{this_id}/sections"
            sections = fetch(sec, params={"include[]":"total_students"})
            shell_numsections = len(sections)

            # str() each value so None is written as "None", as before.
            row = [str(value) for value in (
                this_id, shell_shortname, S['type'], S['act'], S['cap'], d_start,
                shell_start, shell_end, shell_restrict_view_dates, shell_restrict_view_dates,
                shell_state, shell_numstudents, shell_numsections, S['teacher'], shell_teachers)]
            writer.writerow(row)
            print(",".join(row))
def course_by_depts_terms(section=0):
    """Walk the sp26 schedule and adjust shell start dates / move winter shells.

    For every schedule entry that maps to a Canvas shell by CRN:
      * January starts on aviation_start_day are reported and skipped;
      * January starts on spring_start_day are reported and skipped;
      * otherwise, when make_changes is on, the shell start date is set to the
        scheduled start (with an optional per-course prompt);
      * January starts on winter_start_day are moved to term WI_TERM.

    section: unused.

    NOTE(review): make_changes starts at 0 and is set to 1 after the first
    entry that reaches the start-date step, so that first entry is not
    date-adjusted but later ones are - confirm this is intended.
    """
    get_fresh = 0
    TERM = 291
    WI_TERM = 290
    DOING_WINTER_MOVES = 1
    SEM = "sp26"

    make_changes = 0
    do_all = 'a'

    winter_start_day = 5
    aviation_start_day = 7
    nursing_start_day = 0
    spring_start_day = 26

    # get list of online course shells
    if not get_fresh:
        c = json.loads( codecs.open(f'cache/courses_in_term_{TERM}.json','r','utf-8').read() )
    else:
        print(f"Getting list of courses in {SEM}")
        c = getCoursesInTerm(TERM,get_fresh,0)
        codecs.open(f'cache/courses_in_term_{TERM}.json','w','utf-8').write(json.dumps(c,indent=2))

    # dict to match section numbers between shells and schedule
    crn_to_canvasid = {}
    for shell in c:
        if 'sis_course_id' in shell and shell['sis_course_id']:
            shell_crn = shell['sis_course_id'][7:13]
            print( f"{shell['name']} -> {shell_crn}" )
            crn_to_canvasid[shell_crn] = str(shell['id'])
        else:
            print( f"---NO CRN IN: {shell['name']} -> {shell}" )

    # get course info from schedule
    schedule = requests.get(f"http://gavilan.cc/schedule/{SEM}_sched_expanded.json").json()
    for S in schedule:
        # get dates
        start = re.sub( r'\-','/', S['start']) + '/20' + SEM[2:4]
        d_start = datetime.strptime(start,"%m/%d/%Y")

        # try to find online shell matching this schedule entry
        try:
            this_id = crn_to_canvasid[S['crn']]
            report_line = f" - {start} {d_start} - id: {this_id} - {S['code']} {S['crn']} {S['name']}"
        except Exception as e:
            print(f"DIDN'T FIND CRN - {start} {d_start} - {S['code']} {S['crn']} {S['name']}" )
            continue

        starts_in_january = d_start.month == 1

        #if d_start.month < 5 or d_start.month > 7:
        #    print(f" Ignoring {d_start}, starting too far away...")
        #    continue

        if starts_in_january and d_start.day == aviation_start_day:
            print(f"+ AVIAT {report_line}")
            continue

        #if d_start.month == 1 and d_start.day == nursing_start_day:
        #    print("- Nursing ", start, d_start, " - ", S['code'], " ", S['crn'] )
        #    continue

        if starts_in_january and d_start.day == spring_start_day:
            print(f" IGNORE {report_line}" )
            continue

        #print(f" Adjust course start day? {report_line}")
        if make_changes:
            if do_all != 'a':
                do_all = input(' -> adjust? [enter] for yes, [a] to do all remaining. [n] to quit. >')
            if do_all == 'n':
                exit()
            if do_all == '' or do_all == 'a':
                data = {'course[start_at]':d_start.isoformat(), 'course[restrict_student_future_view]': True,
                        'course[restrict_enrollments_to_course_dates]':True }
                u2 = f"https://gavilan.instructure.com:443/api/v1/courses/{this_id}"
                r3 = requests.put(u2, headers=header, params=data)
                print(f"+ UPDATED {report_line}")

        make_changes = 1

        if DOING_WINTER_MOVES and starts_in_january and d_start.day == winter_start_day:
            print(f"+ WINTER {report_line}")
            data = {'course[term_id]':WI_TERM}
            u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s" % this_id
            if make_changes:
                r3 = requests.put(u2, headers=header, params=data)
                #print(" updated.. OK")
                #print(r3.text)
    return
def _xlist_offer_sections(sem_courses, search_string, host_course_id):
    """Offer to cross-list every section of each matching course into the host shell.

    A course matches when search_string is found in its name and it is not
    the host shell itself. The operator confirms each section with 'y'.
    Problems with one course are printed and the scan continues.
    """
    for R in sem_courses:
        try:
            if re.search(search_string, R['name']) and str(R['id']) != str(host_course_id):
                # use the course to get the section id
                print ( R['name'] )
                u = url + '/api/v1/courses/%i/sections' % R['id']
                for S in fetch(u):
                    if (S['id']):
                        myanswer = input( "-> Should I crosslist: %i\t%s\tsection id: %i (y/n) " % (R['id'],R['name'],S['id'] ))
                        if myanswer=='y':
                            # cross list
                            v = url + "/api/v1/sections/%i/crosslist/%i" % (S['id'],host_course_id)
                            res = requests.post(v, headers = header)
                            print( json.dumps( json.loads(res.text), indent=2) )
                        print()
        except Exception as e:
            print( "Caused a problem: " + str(e) + "\n" + str(R) + "\n" )


def xlist_cwe():
    """Interactively cross-list this term's CWE / WTRM sections into their shared shells.

    CWE190 and WTRM290 sections go into one host shell, CWE192 sections into
    another. The host shell ids are set below and must be updated each term.
    """
    # cwe190 and wtrm290 get put into 1 shell
    # cwe192 get put into another shell

    this_sem_190_id = 23594 # they get 190s and 290s
    this_sem_192_id = 23603 # they get 192s
    # this_sem_term = 289

    term = find_term( input("term? (ex: fa25) ") )

    if not term or (not 'canvas_term_id' in term) or (not 'code' in term):
        print(f"Couldn't find term.")
        return

    this_sem_term = term['canvas_term_id']

    get_fresh = 1
    sem_courses = getCoursesInTerm(this_sem_term, get_fresh, 0)

    # (name pattern, host shell) in the order the operator is prompted.
    crosslist_plan = [
        ('CWE190', this_sem_190_id),
        ('WTRM290', this_sem_190_id),
        ## Now the 192s
        ('CWE192', this_sem_192_id),
    ]
    for search_string, host_course_id in crosslist_plan:
        _xlist_offer_sections(sem_courses, search_string, host_course_id)
def change_term(courseid,termid):
    """Move a course to another term through the Canvas API.

    Sends a PUT that sets course[term_id] to termid and
    course[restrict_enrollments_to_course_dates] to 'false'.

    Returns the parsed JSON response so callers can inspect or log it.
    """
    data = { 'course[term_id]': str(termid), 'course[restrict_enrollments_to_course_dates]':'false' }
    t = f'{url}/api/v1/courses/{courseid}'
    r3 = requests.put(t, headers=header, params=data)
    result = json.loads(r3.text)
    return result
def teacher_to_many_shells():
    """Add user 30286 as an active teacher to a fixed list of course shells.

    Each shell is moved to term 9 before the enrollment request and to
    term 287 afterwards.
    """
    term_id_misc = 9
    term_id_sp25 = 287
    usrid = '30286'
    shell_ids = [21842,21096,20952,21561,21219,21274,20875,21018,21093,21719,21460,21102,21506,21169,21538]

    for shell_id in shell_ids:
        #print(id)
        #continue
        change_term(shell_id, term_id_misc)

        # Add teacher
        enroll_url = url + f"/api/v1/courses/{shell_id}/enrollments"
        #usrid = input("id of %s? " % N)
        teacher_params = { "enrollment[type]":"TeacherEnrollment", "enrollment[user_id]":usrid,
                           "enrollment[enrollment_state]":"active" }
        response = requests.post(enroll_url, headers=header, params=teacher_params)
        result = json.loads(response.text)
        print(json.dumps(result, indent=2))
        print(f"enrolled user id: {usrid} as teacher in course {shell_id}.")

        change_term(shell_id, term_id_sp25)
import os, pickle
def create_sandboxes():
    """Create a personal sandbox course for every student of the listed workshop shells.

    For each (course id, label) in courses_to_sandbox, every StudentEnrollment
    gets a course named "<initials><label>" in account 1 / term 8, is enrolled
    in it as an active teacher, and the course is published and made visible
    to authenticated users.

    Names of sandboxes already created are kept in cache/sandbox_courses.pkl so
    reruns skip them; a line per student is appended to cache/sandbox_report.txt.

    Returns 1.
    """
    ## TODO: read all student names and determine ahead of time if initials conflict. deal with them

    courses_to_sandbox = [ #(20567, ' Sandbox GOTT1 SU24'),
                           #(20575, ' Sandbox GOTT2 SU24'),
                           #(20600, ' Sandbox GOTT4 SU24'),
                           #(19223, ' Sandbox GOTT5 WI24'),
                           #(19224, ' Sandbox GOTT6 WI24'),
                           #(20761, ' Sandbox GOTT1 FA24'),
                           #(21770, ' Sandbox GOTT1 WI25'),
                           #(21772, ' Sandbox GOTT2 WI25'),
                           #(23083, ' Sandbox GOTT1 SU25'),
                           #(23015, ' Sandbox GOTT2 SU25'),
                           #(21898, ' Sandbox GOTT4 SU25'),
                           #(23270, ' Sandbox GOTT1 FA25SEPT'),
                           #(23290, ' Sandbox GOTT2 FA25SEPT'),
                           (23314, ' Sandbox GOTT5 FA25'),
                           (23315, ' Sandbox GOTT4 FA25'),
                           ]
    filepath = 'cache/sandbox_courses.pkl'

    # The pickle is a local file written by this function itself.
    if os.path.exists(filepath):
        with open(filepath, 'rb') as f:
            sandbox_log = pickle.load(f)
    else:
        sandbox_log = []

    with codecs.open('cache/sandbox_report.txt','a','utf-8') as report:
        try:
            for crs_id, label in courses_to_sandbox:
                # TODO check and skip "Test Student"
                crs_info = getCourses(crs_id)
                # print(json.dumps(crs_info,indent=2))
                c_name = crs_info['name']
                print(f"\nStudents in course {crs_id}: {c_name}" )
                report.write(f"\nCourse: {c_name}\n" )
                enrolled = course_enrollment(crs_id)
                for eid,stu in enrolled.items():
                    if stu['role'] != 'StudentEnrollment':
                        continue
                    u_name = stu['user']['short_name']
                    u_id = stu['user']['id']
                    # split() with no argument drops empty pieces, so repeated
                    # spaces in a name cannot raise IndexError.
                    initials = ''.join([ x[0] for x in u_name.split() ])
                    print(f" id: {stu['user_id']} ititials: {initials} name: {stu['user']['short_name']} role: {stu['role']}")
                    report.write(f" id: {stu['user_id']} ititials: {initials} name: {stu['user']['short_name']} role: {stu['role']}")
                    coursename = f"{initials}{label}"
                    if coursename in sandbox_log:
                        print(f" - Already created: {coursename}")
                        continue

                    print(f" + Creating course: {coursename} for {u_name}, id: {u_id}")
                    u2 = url + "/api/v1/accounts/1/courses"
                    data = {
                        "course[name]": coursename,
                        "course[code]": coursename,
                        "course[term_id]": "8",
                    }

                    # Create a course
                    r3 = requests.post(u2, headers=header, params=data)
                    new_course_response = json.loads(r3.text)
                    if 'id' not in new_course_response:
                        # Creation failed; report it and leave the name out of
                        # the log so the next run tries again.
                        print(f" ! Could not create {coursename}: {r3.text}")
                        continue
                    new_id = new_course_response['id']
                    print(f" created course id {new_id}")
                    report.write(f" link: https://ilearn.gavilan.edu/courses/{new_id} id: {stu['user_id']} ititials: {initials} name: {stu['user']['short_name']} role: {stu['role']}\n")

                    # Add teacher
                    u3 = url + f"/api/v1/courses/{new_id}/enrollments"
                    data2 = { "enrollment[type]":"TeacherEnrollment", "enrollment[user_id]":u_id,
                              "enrollment[enrollment_state]":"active" }
                    r4 = requests.post(u3, headers=header, params=data2)
                    print(f" enrolled user id: {u_id} as teacher.")

                    # Desired settings
                    data = { 'course[is_public_to_auth_users]': True, 'course[event]': 'offer' }
                    t = url + f"/api/v1/courses/{new_id}"
                    r3 = requests.put(t, headers=header, params=data)
                    result = json.loads(r3.text)
                    if 'name' in result:
                        print(f" > Name: {result['name']}")
                    if 'workflow_state' in result:
                        print(f" > State: {result['workflow_state']}")
                    if 'is_public_to_auth_users' in result:
                        print(f" > Public: {result['is_public_to_auth_users']}")
                    sandbox_log.append(coursename)
        finally:
            # Write log back out, even after a failure part-way through, so
            # sandboxes that were created are not created again next time.
            with open(filepath, 'wb') as handle:
                pickle.dump(sandbox_log, handle, protocol=pickle.HIGHEST_PROTOCOL)

    return 1
## ##
## ##
## ## Course Nav and External Tools
## ##
## ##
def do_gav_connect():
    """Add the GavConnect navigation tool to every term course listed in cache/starfish.txt.

    Each line of the file is combined with the semester code as
    "<sem>-<line>" and compared to the course's sis_course_id. Waits for
    Enter before starting and prints how many tools were added.
    """
    term = 181
    sem = "202430"
    get_fresh = 1

    crns = []
    for line in open('cache/starfish.txt','r').readlines():
        crns.append(sem + "-" + line.strip())
    target = len(crns)

    print(crns)
    print("Press enter to begin.")
    a = input()

    print("Fetching all course names...")
    term_courses = getCoursesInTerm(term, get_fresh, 0)

    added = 0
    for course in term_courses:
        if course['sis_course_id'] not in crns:
            continue
        print(" Adding gav connect to", course['name'])
        print()
        if add_gav_connect(course['id']):
            added += 1
        else:
            print("Something went wrong with", course['name'])

    print(f"Added {added} redirects out of {target}.")
def add_gav_connect(course_id):
    """Create the "GavConnect" redirect external tool in one course's navigation.

    Args:
        course_id: Canvas course id the tool is installed into.

    Returns:
        1 when the response contains an ``id`` and no ``errors``;
        0 in every other case (error payload, non-JSON body, unexpected shape).
    """
    params = { "name": "GavConnect",
        "privacy_level": "anonymous",
        "description": "Add links to external web resources that show up as navigation items in course, user or account navigation. Whatever URL you specify is loaded within the content pane when users click the link.",
        "consumer_key": "N/A",
        "shared_secret": "N/A",
        "url": "https://www.edu-apps.org/redirect",
        "custom_fields[new_tab]": "1",
        "custom_fields[url]": "https://gavilan.starfishsolutions.com/starfish-ops/dl/student/dashboard.html",
        "workflow_state": "anonymous",
        "course_navigation[enabled]": "true",
        "course_navigation[visibility]": "public",
        "course_navigation[label]": "GavConnect",
        "course_navigation[selection_width]": "800",
        "course_navigation[selection_height]": "400",
        "course_navigation[icon_url]": "https://www.edu-apps.org/assets/lti_redirect_engine/redirect_icon.png",
    }

    # POST
    u = url + f"/api/v1/courses/{course_id}/external_tools"
    res = requests.post(u, headers=header, params=params)
    try:
        result = json.loads(res.text)
    except ValueError:
        # A non-JSON body (e.g. an HTML error page) is a failure, not a crash.
        return 0

    if isinstance(result, dict) and "errors" not in result and "id" in result:
        return 1
    # Explicit failure value instead of falling through to an implicit None.
    return 0
# Create a Program Mapper redirect external tool in course nav.
def add_career_academic_pathways(course_id):
    """Create the "Career & Academic Pathways" redirect tool in one course's navigation.

    The tool is created with ``course_navigation[default]`` set to
    ``disabled``.

    Args:
        course_id: Canvas course id the tool is installed into.

    Returns:
        1 when the response contains an ``id`` and no ``errors``;
        0 in every other case (error payload, non-JSON body, unexpected shape).
    """
    params = { "name": "Career & Academic Pathways",
        "privacy_level": "anonymous",
        "description": "Add links to external web resources that show up as navigation items in course, user or account navigation. Whatever URL you specify is loaded within the content pane when users click the link.",
        "consumer_key": "N/A",
        "shared_secret": "N/A",
        "url": "https://www.edu-apps.org/redirect",
        "custom_fields[new_tab]": "1",
        "custom_fields[url]": "https://gavilan.programmapper.com/academics",
        "workflow_state": "anonymous",
        "course_navigation[enabled]": "true",
        "course_navigation[visibility]": "public",
        "course_navigation[label]": "Career & Academic Pathways",
        "course_navigation[selection_width]": "800",
        "course_navigation[selection_height]": "400",
        "course_navigation[icon_url]": "https://www.edu-apps.org/assets/lti_redirect_engine/redirect_icon.png",
        "course_navigation[default]": "disabled",
    }

    # POST
    u = url + f"/api/v1/courses/{course_id}/external_tools"
    res = requests.post(u, headers=header, params=params)
    try:
        result = json.loads(res.text)
    except ValueError:
        # A non-JSON body (e.g. an HTML error page) is a failure, not a crash.
        return 0

    if isinstance(result, dict) and "errors" not in result and "id" in result:
        return 1
    # Explicit failure value instead of falling through to an implicit None.
    return 0
# Bulk add GavConnect to a list of course ids provided by user input.
def add_gav_connect_prompt_list():
    """Prompt for course ids and add the GavConnect tool to each of them.

    Ids may be separated by commas and/or whitespace. Each id is passed to
    ``add_gav_connect``; a per-course status line and a final tally are printed.
    """
    entered = input("Enter course ids (comma/space separated): ")
    course_ids = []
    for piece in re.split(r"[\s,]+", entered):
        cleaned = piece.strip()
        if cleaned:
            course_ids.append(cleaned)

    if len(course_ids) == 0:
        print("No course ids provided.")
        return

    succeeded = 0
    for course_id in course_ids:
        try:
            outcome = add_gav_connect(course_id)
            status = 'OK' if outcome else 'FAILED'
            print(f"{course_id}: {status}")
            if outcome:
                succeeded += 1
        except Exception as ex:
            # Keep going: one bad id should not stop the batch.
            print(f"{course_id}: error {ex}")
    print(f"Added GavConnect to {succeeded} of {len(course_ids)} courses.")
# Add Pathways link to every course in a selected term (default OFF).
def add_pathways_all_courses_in_term():
    """Prompt for a term and add the Pathways tool to every course in it.

    The term alias is resolved with ``find_term``; the record must carry both
    ``canvas_term_id`` and ``code``. Each course is passed to
    ``add_career_academic_pathways``; per-course status and a tally are printed.
    """
    term_record = find_term(input("term? (ex: fa25) "))
    usable = bool(term_record) and 'canvas_term_id' in term_record and 'code' in term_record
    if not usable:
        print("Couldn't find term.")
        return

    print("Fetching list of all active courses")
    term_courses = getCoursesInTerm(term_record['canvas_term_id'], 1, 0)

    succeeded = 0
    seen = 0
    for course in term_courses:
        seen += 1
        try:
            outcome = add_career_academic_pathways(course['id'])
            print(f"{course['id']} {course.get('name','')}: {'OK' if outcome else 'FAILED'}")
            if outcome:
                succeeded += 1
        except Exception as ex:
            # Report and continue with the remaining courses.
            print(f"{course['id']} {course.get('name','')}: error {ex}")
    print(f"Added Pathways to {succeeded} of {seen} courses.")
# Ensure Pathways link exists for every course in the term; add if missing (default OFF).
def ensure_pathways_in_term():
    """Prompt for a term and add the Pathways tool only to courses that lack it.

    For every course in the term the tab list is fetched; a course is treated
    as already covered when any tab's label equals the Pathways label.
    Otherwise ``add_career_academic_pathways`` is called. Per-course status
    and final counts are printed.
    """
    TARGET_LABEL = "Career & Academic Pathways"

    term_record = find_term(input("term? (ex: fa25) "))
    if not term_record or 'canvas_term_id' not in term_record or 'code' not in term_record:
        print("Couldn't find term.")
        return

    print("Fetching list of all active courses")
    term_courses = getCoursesInTerm(term_record['canvas_term_id'], 1, 0)

    added = 0
    present = 0
    total = 0
    for course in term_courses:
        total += 1
        try:
            course_tabs = fetch(f"{url}/api/v1/courses/{course['id']}/tabs") or []
            if any(tab.get('label') == TARGET_LABEL for tab in course_tabs):
                present += 1
                print(f"{course['id']} {course.get('name','')}: already present")
                continue
            outcome = add_career_academic_pathways(course['id'])
            print(f"{course['id']} {course.get('name','')}: {'ADDED' if outcome else 'FAILED'}")
            if outcome:
                added += 1
        except Exception as ex:
            # Report and continue with the remaining courses.
            print(f"{course['id']} {course.get('name','')}: error {ex}")
    print(f"Present: {present}, Added: {added}, Total: {total}")
def mod_eval_visibility( shell_id, visible=True ):
    """Show or hide the course-evaluation tab (external tool 1953) in one course.

    Args:
        shell_id: Canvas course id, substituted into the tab URL.
        visible: Truthy to show the tab, falsy to hide it.
    """
    hidden_flag = False if visible else True
    payload = {'position':2, 'hidden':hidden_flag}
    endpoint = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % shell_id
    # The response is deliberately ignored, as before.
    requests.put(endpoint, headers=header, params=payload)
def instructor_list_to_activate_evals():
    """Change eval-tab visibility for courses taught by the listed instructors.

    Reads ``cache/fa21_eval_teachers.txt`` (comma-separated, teacher id in the
    third column) and, for every row returned by ``all_sem_courses_teachers``
    whose teacher id appears in that file, calls ``mod_eval_visibility``.
    Stops once more than ``limit`` courses have been modified.

    NOTE(review): the call passes ``visible=False``, which hides the tab even
    though the function name says "activate" — confirm the intended direction.
    """
    courses = all_sem_courses_teachers()

    teacher_ids = set()
    with codecs.open('cache/fa21_eval_teachers.txt', 'r', 'utf-8') as teacher_file:
        for line in teacher_file:
            fields = line.split(',')
            # Blank or short lines have no third column; skip them instead of
            # raising IndexError.
            if len(fields) > 2:
                teacher_ids.add(fields[2].strip())

    count = 0
    limit = 5000

    for c in courses:
        # Column positions follow the row layout of all_sem_courses_teachers().
        shell_id = c[1]
        teacher_id = c[6]
        teacher_name = c[5]
        course_name = c[3]

        if teacher_id in teacher_ids:
            print("Teacher: %s \t course: %s" % (teacher_name, course_name))
            mod_eval_visibility(shell_id, False)
            count += 1
            if count > limit:
                return
# Toggle the eval tool visibility for all courses in the selected Canvas term.
def add_evals(section=0):
    """Un-hide the course-evaluation tab for the sections listed for a term.

    Prompts for a term alias, reads section numbers (one per line) from
    ``cache/<term code>_eval_sections.txt``, and for every course in the term
    whose ``sis_course_id`` has a matching second dash-separated part, PUTs
    the eval tab (external tool 1953) with ``hidden`` set from ``HIDE``.

    Args:
        section: Not used by this function.

    Raises:
        ValueError: If the term alias is unknown or has no Canvas term id.
    """
    term_record = find_term(input('term? '))
    if not term_record:
        raise ValueError("Unknown term")

    term_id = term_record.get('canvas_term_id')
    if term_id is None:
        raise ValueError(f"Canvas term id missing for {term_record}")

    term_code = term_record.get('code')

    # fetch list of courses?
    GET_FRESH_LIST = 0

    # just print, don't change anything
    TEST_RUN = 0

    # confirm each shell?
    ASK = 0

    # are we showing or hiding the course eval link?
    HIDE = False

    # Blank lines are dropped so an empty entry can never match a course.
    with codecs.open(f"cache/{term_code}_eval_sections.txt", 'r') as sections_file:
        s = sorted(line.strip() for line in sections_file if line.strip())
    wanted_sections = set(s)

    print(f"Going to activate course evals in these sections: \n{s}\n")
    input('hit return to continue')

    all_semester_courses = getCoursesInTerm(term_id, GET_FRESH_LIST, 1)
    eval_course_ids = []
    courses = {}
    for C in all_semester_courses:
        sis_id = C.get('sis_course_id') if C else None
        if not sis_id:
            continue
        parts = sis_id.split('-')
        # A SIS id without a dash has no section part; skip it rather than
        # raising IndexError.
        if len(parts) < 2 or parts[1] not in wanted_sections:
            continue
        courses[str(C['id'])] = C
        eval_course_ids.append(str(C['id']))

    data = {'position': 2, 'hidden': HIDE}
    eval_course_ids.sort()

    for i in eval_course_ids:
        if TEST_RUN:
            print(f"{courses[i]['id']} / {courses[i]['name']}")
            continue

        if ASK:
            a = input(f"Hit q to quit, a to do all, or enter to activate eval for: {courses[i]['id']} / {courses[i]['name']} : ")
            if a == 'a': ASK = 0
            if a == 'q': return
        else:
            print(f"{courses[i]['id']} / {courses[i]['name']}")

        u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % i
        r3 = requests.put(u2, headers=header, params=data)
        # Report the real outcome instead of printing OK unconditionally.
        if r3.ok:
            print(f"OK {u2}")
        else:
            print(f"FAILED ({r3.status_code}) {u2}")
def remove_evals_all_sections():
    """Hide the course-evaluation tab (external tool 1953) in every course of a term.

    The term id and the behaviour switches are hard-coded below. For each
    course, in ascending id order, a PUT sets ``hidden`` on the eval tab and
    the raw response body is printed.
    """
    TERM = 184
    SEM = "fa24"

    # fetch list of courses?
    GET_FRESH_LIST = 0
    # just print, don't change anything
    TEST_RUN = 0
    # confirm each shell?
    ASK = 0
    # are we showing or hiding the course eval link?
    HIDE = True

    term_courses = getCoursesInTerm(TERM, GET_FRESH_LIST, 1)
    ordered_ids = sorted([entry['id'] for entry in term_courses])
    course_by_id = {}
    for entry in term_courses:
        course_by_id[entry['id']] = entry

    payload = {'position':2, 'hidden':HIDE}

    for cid in ordered_ids:
        shell = course_by_id[cid]
        if TEST_RUN:
            print(f"{shell['id']} / {shell['name']}")
            continue

        if ASK:
            answer = input(f"Hit q to quit, a to do all, or enter to activate eval for: {shell['id']} / {shell['name']} : ")
            if answer == 'a': ASK = 0
            if answer == 'q': return
        else:
            print(f"{shell['id']} / {shell['name']}")

        endpoint = f"https://gavilan.instructure.com:443/api/v1/courses/{cid}/tabs/context_external_tool_1953"
        response = requests.put(endpoint, headers=header, params=payload)
        print(response.text)
def get_ext_tools():
    """Fetch the external tools of account 1 and print them as indented JSON."""
    account_tools = fetch(url + '/api/v1/accounts/1/external_tools')
    pretty = json.dumps(account_tools, indent=2)
    print(pretty)
def set_ext_tools():
    """Set ``course_navigation[default]`` to ``disabled`` on account tool 733.

    The parsed JSON response is printed with indentation.
    """
    TOOL = 733
    endpoint = url + '/api/v1/accounts/1/external_tools/%s' % str(TOOL)
    payload = { 'course_navigation[default]': 'disabled' }
    response = requests.put(endpoint, headers=header, params=payload)
    parsed = json.loads(response.text)
    print(json.dumps(parsed, indent=2))
def get_course_ext_tools():
    """Fetch the external tools of the hard-coded course and print them as indented JSON."""
    course_id = "15971"
    course_tools = fetch(url + f"/api/v1/courses/{course_id}/external_tools")
    pretty = json.dumps(course_tools, indent=2)
    print(pretty)
def remove_n_analytics(section=0):
    """Hide the "New Analytics" tab in every course of the hard-coded term.

    For each course the tab list is fetched; every tab labelled
    "New Analytics" is PUT with ``hidden=True`` and the response is printed.
    After each change the operator is prompted until they answer ``y``.
    The process exits when the loop finishes.

    Args:
        section: Not used by this function.
    """
    print("Fetching list of all active courses")

    c = getCoursesInTerm(172, 1, 0)
    print(c)
    data = {'hidden': True}

    pause = 1

    for C in c:
        # The SIS id is intentionally not parsed here: the result was unused
        # and the split crashed on courses whose sis_course_id is None.
        print(C['name'])
        u3 = url + '/api/v1/courses/%s/tabs' % str(C['id'])
        tabs = fetch(u3) or []
        for T in tabs:
            if T.get('label') != "New Analytics":
                continue
            print("\tVisibility is: " + str(T.get("visibility")))
            if "hidden" in T:
                print("\tHidden is: " + str(T["hidden"]))
            u4 = url + "/api/v1/courses/%s/tabs/%s" % (str(C['id']), str(T['id']))
            print("\tChanging visiblity of a. tab")
            r4 = requests.put(u4, headers=header, params=data)
            print("\t" + r4.text)
            if pause:
                xyz = input('\n\nenter for next one or [y] to do all: ')
                if xyz == 'y': pause = 0

    # sys.exit() rather than the interactive-only exit() helper, which the
    # site module does not always install.
    sys.exit()
"""ask = 1
evals_hidden = True
data = {'position':2, 'hidden':evals_hidden}
for i in ids:
if ask:
a = input("Hit q to quit, a to do all, or enter to activate eval for: \n " + str(courses[i]) + "\n> ")
if a == 'a': ask = 0
if a == 'q': return
u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % i
print(courses[i]['name'])
r3 = requests.put(u2, headers=header, params=data)
print(" " + r3.text)
time.sleep(0.300)
"""
import csv
def my_nav_filter(row):
    """Return True when a nav-tab row counts as ON for the CSV summary.

    A tab is ON when its ``visibility`` is ``public`` (case-insensitive) and
    its ``hidden`` value does not stringify to ``true`` (case-insensitive).

    Args:
        row: Mapping with optional ``visibility`` and ``hidden`` keys.
    """
    is_public = str(row.get('visibility', '')).lower() == 'public'
    is_hidden = str(row.get('hidden')).lower() == 'true'
    return is_public and not is_hidden
## Multi-pass course nav tool. Pass 1: index tabs and write the CSV summary + XLSX matrix; Pass 2: optionally hide/show tabs by label.
def clean_course_nav_setup_semester(section=0):
    """Index course navigation tabs for a term, export them, and optionally bulk hide/show tabs.

    Pass 1 prompts for a term alias, fetches the tab list of every course in
    that term and writes two files:

    * ``cache/course_nav_summary_<code>.csv`` - one row per tab that
      ``my_nav_filter`` considers ON.
    * ``cache/course_nav_matrix_<code>.xlsx`` - one row per course and one
      column per tab label: ``x`` = public and not hidden, ``-`` = present but
      not visible, blank = the course has no tab with that label.

    Pass 2 is optional and interactive: the operator supplies labels to HIDE
    and labels to SHOW (typed comma-separated, or as a path to a file with one
    label per line) and, after confirmation, each matching tab is updated with
    a PUT to the course's tabs endpoint.

    Args:
        section: Not used by this function.
    """
    TESTING = False # Limit to the first 20 courses while testing
    import openpyxl
    from openpyxl.utils import get_column_letter

    t = find_term(input("term? (ex: fa25) "))

    if not t or (not 'canvas_term_id' in t) or (not 'code' in t):
        print("Couldn't find term.")
        return

    term = t['canvas_term_id']
    SEM = t['code']

    print("Fetching list of all active courses")
    courses_in_term = getCoursesInTerm(term, 1, 0)
    if TESTING:
        print("TESTING mode enabled: limiting to first 20 courses")
        courses_in_term = courses_in_term[:20]

    # Collect course records and their tabs
    all_tabs_by_course = {} # course_id -> list of tab dicts
    all_tab_ids = {} # tab_id -> label (most recent seen) — kept for Pass 2 id operations
    all_labels = set() # all labels seen in any course (visible or hidden)

    # Also write a detailed CSV of the tabs that are ON
    nav_out = codecs.open(f'cache/course_nav_summary_{SEM}.csv', 'w', 'utf-8')
    nav_writer = csv.writer(nav_out)
    columns = "id name code start state label position hidden visibility type url".split(" ")
    nav_writer.writerow(columns)

    for C in courses_in_term:
        try:
            cid = str(C['id'])
            print(C['name'])
            u3 = f"{url}/api/v1/courses/{C['id']}/tabs"
            tabs = fetch(u3) or []

            for T in tabs:
                # Normalize hidden: tabs without the key get the placeholder "n/a"
                if 'hidden' not in T:
                    T['hidden'] = "n/a"
                # Track global set of tab ids and a sample label
                all_tab_ids[str(T.get('id'))] = T.get('label', str(T.get('id')))
                if T.get('label'):
                    all_labels.add(T.get('label'))

                # Write summary of ON tabs
                vals = [C['id'], C['name'], C['course_code'], C.get('start_at', ''), C.get('workflow_state', ''),
                    T.get('label', ''), T.get('position', ''), T.get('hidden', ''), T.get('visibility', ''),
                    T.get('type', ''), T.get('html_url', '')]
                mydict = dict(zip(columns, vals))
                if my_nav_filter(mydict):
                    nav_writer.writerow(vals)
            nav_out.flush()
            # Only reached when every tab of the course was processed without error.
            all_tabs_by_course[cid] = tabs
        except Exception as err:
            # Any failure skips the rest of this course; it gets no entry in all_tabs_by_course.
            print(f"Exception: {err}")

    try:
        nav_out.close()
    except Exception:
        pass

    # Build XLSX matrix
    try:
        # Compute popularity (count of visible 'x' occurrences per label) and sort by popularity then alpha.
        # First, build quick lookup per course for label visibility/hidden.
        def tab_status(t):
            # Same rule as my_nav_filter: public visibility and hidden not "true".
            vis = str(t.get('visibility', '')).lower()
            hid = str(t.get('hidden', '')).lower()
            if vis == 'public' and hid not in ['true']:
                return 'x' # present and visible
            return '-' # present but hidden (includes non-public visibility)

        # Precompute per-course map: label -> status symbol ('x' or '-')
        status_by_course = {}
        for C in courses_in_term:
            cid = str(C['id'])
            label_to_status = {}
            for tdict in all_tabs_by_course.get(cid, []) or []:
                lbl = tdict.get('label')
                if not lbl:
                    continue
                cur = label_to_status.get(lbl)
                new = tab_status(tdict)
                # If any instance is visible, prefer 'x' over '-'
                if cur == 'x':
                    continue
                label_to_status[lbl] = 'x' if new == 'x' else (cur or '-')
            status_by_course[cid] = label_to_status

        # Popularity counts
        visible_counts = defaultdict(int)
        for cid, lmap in status_by_course.items():
            for lbl, sym in lmap.items():
                if sym == 'x':
                    visible_counts[lbl] += 1

        # Column order: by decreasing x-count, then alphabetical for ties
        tab_labels = sorted(all_labels, key=lambda s: (-visible_counts.get(s, 0), str(s).lower()))

        xlsx_path = f"cache/course_nav_matrix_{SEM}.xlsx"
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "matrix"

        # Headers
        static_headers = ['course_id', 'course_name', 'first_teacher']
        # Row 1: labels only, sorted by popularity (most x's first)
        row1 = static_headers + list(tab_labels)
        ws.append(row1)

        # Rows: one per course
        for C in courses_in_term:
            cid = str(C['id'])
            cname = C.get('name', '')
            # First teacher (alphabetically first name); left blank if the lookup fails
            teacher = ''
            try:
                tlist = teacher_list(int(cid)) or []
                if tlist:
                    # list of tuples: (id, name)
                    teacher = sorted([t[1] for t in tlist])[0]
            except Exception:
                pass
            row = [cid, cname, teacher]
            course_map = status_by_course.get(cid, {})
            for lbl in tab_labels:
                sym = course_map.get(lbl, '')
                row.append(sym)
            ws.append(row)

        # Simple sizing for readability
        for col_idx in range(1, ws.max_column + 1):
            col_letter = get_column_letter(col_idx)
            ws.column_dimensions[col_letter].width = 16 if col_idx > 3 else 20

        wb.save(xlsx_path)
        print(f"Wrote matrix: {xlsx_path}")
    except Exception as ex:
        # A failed export does not stop the tool; Pass 2 is still offered below.
        print(f"Failed to write XLSX matrix: {ex}")

    # Optional Pass 2: apply hide/show updates (labels only)
    try_apply = input("Apply changes? [n] hide/show selected labels across all courses (y/N): ").strip().lower() in ['y', 'yes']
    if not try_apply:
        print("Pass 1 complete. No changes applied.")
        return

    # Gather labels to HIDE and SHOW
    def parse_label_list(s):
        if not s:
            return []
        # Allow comma-separated list. Spaces are preserved within labels.
        # A string containing newlines is split per line instead.
        if '\n' in s:
            raw = [x.strip() for x in s.split('\n') if x.strip()]
        else:
            raw = [x.strip() for x in s.split(',') if x.strip()]
        return raw

    hide_src = input("Enter labels to HIDE (comma-separated or newline file path), leave blank to skip: ").strip()
    show_src = input("Enter labels to SHOW (comma-separated or newline file path), leave blank to skip: ").strip()

    hide_labels = []
    show_labels = []

    def read_labels_from_path(pth):
        # Returns [] when the file cannot be read.
        try:
            with codecs.open(pth, 'r', 'utf-8') as f:
                content = f.read()
            # Prefer newline separation in files (one label per line)
            return [x.strip() for x in content.splitlines() if x.strip()]
        except Exception:
            return []

    # Each answer is tried as a path, then as a path under cache/, and only
    # then parsed as a typed list of labels.
    if hide_src:
        if os.path.exists(hide_src):
            hide_labels = read_labels_from_path(hide_src)
        elif os.path.exists(os.path.join('cache', hide_src)):
            hide_labels = read_labels_from_path(os.path.join('cache', hide_src))
        else:
            hide_labels = parse_label_list(hide_src)

    if show_src:
        if os.path.exists(show_src):
            show_labels = read_labels_from_path(show_src)
        elif os.path.exists(os.path.join('cache', show_src)):
            show_labels = read_labels_from_path(os.path.join('cache', show_src))
        else:
            show_labels = parse_label_list(show_src)

    # De-duplicate while keeping first-seen order
    hide_labels = list(dict.fromkeys(hide_labels))
    show_labels = list(dict.fromkeys(show_labels))

    if not (hide_labels or show_labels):
        print("No labels provided. Skipping Pass 2.")
        return

    print(f"HIDE labels: {hide_labels}")
    print(f"SHOW labels: {show_labels}")
    confirm = input("Proceed with updates? (y/N): ").strip().lower() in ['y', 'yes']
    if not confirm:
        print("Aborted. No changes applied.")
        return

    # Build lookup of labels currently ON per course from the CSV summary (to avoid unnecessary hide calls)
    on_labels_by_course = defaultdict(set)
    try:
        csv_path = f'cache/course_nav_summary_{SEM}.csv'
        with codecs.open(csv_path, 'r', 'utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                cid = str(row.get('id', '')).strip()
                lbl = row.get('label')
                if cid and lbl:
                    on_labels_by_course[cid].add(lbl)
    except Exception as ex:
        print(f"Warning: could not read CSV summary for ON labels: {ex}")

    # Apply changes
    for C in courses_in_term:
        cid = str(C['id'])
        try:
            course_tabs = all_tabs_by_course.get(cid)
            # Tabs are fetched here only when Pass 1 stored nothing for this course;
            # otherwise the Pass 1 snapshot is reused.
            if course_tabs is None:
                course_tabs = fetch(f"{url}/api/v1/courses/{cid}/tabs") or []
            tabs_by_label = defaultdict(list)
            for t in course_tabs:
                lbl = t.get('label')
                if lbl:
                    tabs_by_label[lbl].append(t)

            # Helpers to PUT hide/show for a tab id
            def set_hidden_for_id(tab_id, hidden_val):
                put_url = f"{url}/api/v1/courses/{cid}/tabs/{tab_id}"
                data = {'hidden': hidden_val}
                r = requests.put(put_url, headers=header, params=data)
                action = 'HIDE' if hidden_val else 'SHOW'
                print(f"{action} {cid} {C.get('name','')} -> {tab_id}: {r.status_code}")

            # Hide by labels (only if label is currently ON per CSV)
            labels_to_hide_here = [lbl for lbl in hide_labels if lbl in on_labels_by_course.get(cid, set())]
            for lbl in labels_to_hide_here:
                for t in tabs_by_label.get(lbl, []):
                    tid = str(t.get('id'))
                    set_hidden_for_id(tid, True)

            # Show by labels (applied to every tab with the label, whatever its current state)
            for lbl in show_labels:
                for t in tabs_by_label.get(lbl, []):
                    tid = str(t.get('id'))
                    set_hidden_for_id(tid, False)

        except Exception as ex:
            print(f"Failed applying to course {cid}: {ex}")

    print("Pass 2 complete.")
def fetch_rubric_scores(course_id=16528, assignment_id=1):
    """Dump a course's rubrics and rubric-assessed submissions to cache/rubric_scores.txt.

    Writes the course header, one line per assignment, a JSON dump of all
    assignments and of all rubric ratings, and then - for every assignment
    that has a rubric - each submission that carries a rubric assessment,
    with one line per assessed criterion.

    Args:
        course_id: Canvas course id to report on.
        assignment_id: Not used; every assignment of the course is processed.
            Kept so existing callers keep working.
    """
    course_info = fetch(f'{url}/api/v1/courses/{course_id}')

    # The context manager closes the report even if a later fetch fails.
    with codecs.open('cache/rubric_scores.txt', 'w', 'utf-8') as out:
        out.write(f"Course Name: {course_info['name']}\n")
        out.write(f"Short Name: {course_info['course_code']}\n")
        out.write(f"Semester: {course_info['enrollment_term_id']}\n")

        assignments_list = fetch(f'{url}/api/v1/courses/{course_id}/assignments')

        assignments_by_id = {}
        ratings_by_id = {}

        # Index every assignment and every rating of every rubric criterion.
        for assignment in assignments_list:
            asmt_id = assignment['id']
            asmt_name = assignment['name']
            rubric = assignment.get('rubric', [])
            has_rubric = 'yes' if rubric else 'no'

            out.write(f" Asmt Name: {asmt_name} ID: {asmt_id} Rubric: {has_rubric}\n")

            assignments_by_id[asmt_id] = {
                'name': asmt_name,
                'rubric': rubric
            }

            if rubric:
                print("RUBRIC:")
                print(json.dumps(rubric, indent=2))
                for criterion in rubric:
                    for rating in criterion.get('ratings', []):
                        ratings_by_id[rating['id']] = {
                            'description': criterion.get('description', ''),
                            'long_description': rating.get('description', ''),
                            'points': rating.get('points'),
                        }

        out.write(json.dumps(assignments_by_id, indent=2) + '\n\n\n')
        out.write(json.dumps(ratings_by_id, indent=2) + '\n\n\n')

        # Loop thru assignments with rubrics and report on grades
        for assignment in assignments_list:
            if not assignment.get('rubric', []):
                continue

            asmt_id = assignment['id']
            # Use this assignment's own name; the earlier code reused the name
            # left over from the last iteration of the previous loop.
            out.write(f" Asmt Name: {assignment['name']} ID: {asmt_id}\n")

            # include[]=rubric_assessment asks Canvas to attach the rubric scores.
            submissions = fetch(
                f'{url}/api/v1/courses/{course_id}/assignments/{asmt_id}/submissions?include[]=rubric_assessment'
            )

            for submission in submissions:
                assessment = submission.get('rubric_assessment', [])
                if not assessment:
                    continue

                user_id = submission['user_id']
                comments = submission.get('submission_comments', '')
                score = submission.get('score', -1)

                print(json.dumps(submission, indent=2))

                out.write(f"\nSubmission User ID/Assignment ID: {user_id}/{asmt_id}\n")
                out.write(f"Score: {score}\n")
                out.write(f"Submission Comments: {comments}\n")
                out.write("Rubric:\n")
                for assessed in assessment.values():
                    rating_id = assessed.get('rating_id')
                    # Unknown or missing rating ids fall back to '?' placeholders
                    # instead of raising or reusing the previous rating.
                    known = ratings_by_id.get(rating_id)
                    criterion_desc = known['description'] if known else '?'
                    rating_desc = known['long_description'] if known else '?'
                    max_points = known['points'] if known else '?'
                    out.write(
                        f" {criterion_desc} - {rating_desc} ({rating_id}): "
                        f"{assessed.get('points', '?')}/{max_points} points: {assessed.get('comments', '')}\n"
                    )
                out.write("---") # Separator between submissions
                out.flush()
def quick_sem_course_list(term=180):
    """Print the name of every course in a Canvas term, sorted by name.

    Args:
        term: Canvas term id passed to ``getCoursesInTerm``.
    """
    term_courses = getCoursesInTerm(term,1,0)
    for course in sorted(term_courses, key=lambda entry: entry['name']):
        print(course['name'])
# Check Canvas for an existing calendar event that matches the provided metadata.
def find_existing_calendar_event(context_code, title, start_at_iso, description="", tolerance_hours=12):
    """Return an existing calendar event that matches the given metadata, or None.

    Events of ``context_code`` are fetched for the date window spanned by
    ``start_at_iso`` +/- ``tolerance_hours``. An event matches when its start
    is within the tolerance and either its title or (if a description was
    given) its description equals the requested one, compared stripped and
    case-insensitively.

    Args:
        context_code: Canvas context code sent as ``context_codes[]``.
        title: Title to look for.
        start_at_iso: ISO-8601 start time; a trailing ``Z`` is accepted.
        description: Optional description to match as an alternative.
        tolerance_hours: Allowed distance between the start times, in hours.

    Returns:
        The first matching event dict, or None when ``start_at_iso`` cannot be
        parsed, the fetch result is not a list, or nothing matches.
    """
    def _parse_iso(raw):
        # fromisoformat() may not accept a trailing "Z"; rewrite it as an offset.
        if not raw:
            return None
        text = raw[:-1] + '+00:00' if raw.endswith('Z') else raw
        try:
            return datetime.fromisoformat(text)
        except ValueError:
            return None

    wanted_start = _parse_iso(start_at_iso)
    if not wanted_start:
        return None

    tolerance = timedelta(hours=tolerance_hours)
    query = {
        "context_codes[]": context_code,
        "start_date": (wanted_start - tolerance).date().isoformat(),
        "end_date": (wanted_start + tolerance).date().isoformat(),
    }

    candidates = fetch("/api/v1/calendar_events", params=query)
    if not isinstance(candidates, list):
        print(f"Unable to inspect existing events for context {context_code}: unexpected response")
        return None

    wanted_title = ""
    if isinstance(title, str):
        wanted_title = title.strip().lower()
    wanted_description = ""
    if isinstance(description, str):
        wanted_description = description.strip().lower()

    max_gap_seconds = tolerance_hours * 3600
    for candidate in candidates:
        candidate_title = (candidate.get('title') or "").strip().lower()
        candidate_description = (candidate.get('description') or "").strip().lower()
        candidate_start = _parse_iso(candidate.get('start_at') or "")
        if not candidate_start:
            continue

        gap_seconds = abs((candidate_start - wanted_start).total_seconds())
        if gap_seconds > max_gap_seconds:
            continue

        title_matches = candidate_title == wanted_title
        description_matches = bool(wanted_description) and candidate_description == wanted_description
        if title_matches or description_matches:
            return candidate

    return None
# Remove all calendar events attached to a course after user confirmation.
def remove_all_course_events():
    """Delete every calendar event of one course, after explicit confirmation.

    Prompts for a course id, lists the course's events from 730 days in the
    past to 365 days in the future, asks the operator to confirm, and then
    deletes each event, printing the outcome per event and a final tally.
    """
    course_id = input("course id> ").strip()
    if not course_id:
        print("No course id provided; aborting.")
        return

    context_code = course_id if course_id.startswith("course_") else f"course_{course_id}"
    today = datetime.now(timezone.utc).date()
    start_date = (today - timedelta(days=730)).isoformat()
    end_date = (today + timedelta(days=365)).isoformat()

    print(f"Fetching existing events for {context_code} between {start_date} and {end_date}...")
    params = {
        "context_codes[]": context_code,
        "per_page": 100,
        "start_date": start_date,
        "end_date": end_date,
    }

    events = fetch("/api/v1/calendar_events", params=params)
    if not events:
        print("No events found for this course.")
        return
    if not isinstance(events, list):
        print(f"Unable to list events for {context_code}: unexpected response")
        return

    # Deletion is irreversible, so require an explicit yes before touching anything.
    print(f"Found {len(events)} events.")
    answer = input(f"Delete ALL {len(events)} events in {context_code}? (y/N): ").strip().lower()
    if answer not in ('y', 'yes'):
        print("Aborted. No events deleted.")
        return

    print("Beginning removal...")
    deleted = 0
    failed = 0
    for event in events:
        event_id = event.get("id")
        event_title = event.get("title", "(no title)")
        if not event_id:
            print(f"Skipping event '{event_title}' with missing id")
            continue

        print(f"Deleting event '{event_title}' (id {event_id}) in {context_code}...", end=' ')
        delete_url = f"{url}/api/v1/calendar_events/{event_id}"
        response = requests.delete(delete_url, headers=header)
        if response.ok:
            deleted += 1
            print("deleted successfully")
        else:
            failed += 1
            print(f"failed: {response.status_code} {response.text}")

    print(f"Removal finished: {deleted} deleted, {failed} failed.")
# Build a term-wide CSV summarizing student participation metrics for every course.
def build_term_participation_report():
    """Write a CSV of per-student analytics summaries for every course in a term.

    Prompts for a term alias and for an optional demo mode (stop after 10
    courses that yielded data, visiting courses in random order). For each
    course the student enrollments and the analytics student summaries are
    fetched; each summary is flattened into dotted column names and joined
    with the student's enrollment details. The rows are written to
    ``cache/<term code>_participation.csv``.
    """
    alias_entered = input("Term alias (ex: fa25): ").strip()
    if not alias_entered:
        print("No term alias provided; aborting.")
        return

    alias_lower = alias_entered.lower()
    term_info = find_term(alias_lower)
    if not term_info:
        print(f"Unknown term alias: {alias_entered}")
        return

    canvas_term = term_info.get('canvas_term_id')
    if not canvas_term:
        print(f"Canvas term id missing for {alias_entered}")
        return

    term_code = (term_info.get('code') or alias_lower).lower()

    demo_mode = input("Demo run with ~10 random courses? (y/N): ").strip().lower() == 'y'

    courses = getCoursesInTerm(canvas_term, get_fresh=0, show=0)
    if not isinstance(courses, list):
        print("Unable to fetch courses for this term; aborting.")
        return

    if demo_mode:
        random.shuffle(courses)
        print("Demo mode: targeting up to 10 courses with analytics data")

    output_path = f"cache/{term_code}_participation.csv"
    base_fields = [
        'term_code',
        'course_id',
        'course_name',
        'course_sis_id',
        'course_code',
        'student_canvas_id',
        'student_sortable_name',
        'student_sis_user_id',
        'student_login_id',
        'student_name',
        'student_email',
    ]
    user_fields = ('sortable_name', 'sis_user_id', 'login_id', 'sis_login_id', 'email', 'name')
    rows = []
    data_fields = set()

    def flatten_value(prefix, value, dest):
        # Nested dicts become dotted keys; lists are stored as JSON text.
        if isinstance(value, dict):
            for key, val in value.items():
                flatten_value(f"{prefix}.{key}" if prefix else str(key), val, dest)
            return
        dest[prefix] = json.dumps(value) if isinstance(value, list) else value

    processed_courses = 0
    for course in courses:
        if demo_mode and processed_courses >= 10:
            break

        course_id = course.get('id')
        if not course_id:
            continue

        course_name = course.get('name', '')
        print(f"Fetching analytics for course {course_id}: {course_name}")

        # Index enrollment details under both the raw and the string user id.
        enrollment_index = {}
        try:
            enrollments = fetch(
                f"/api/v1/courses/{course_id}/enrollments",
                params={'type[]': 'StudentEnrollment', 'per_page': 100},
            )
            if isinstance(enrollments, list):
                for enrollment in enrollments:
                    user = enrollment.get('user') or {}
                    enrolled_id = user.get('id') or enrollment.get('user_id')
                    if not enrolled_id:
                        continue
                    entry = {name: user.get(name, '') for name in user_fields}
                    enrollment_index[enrolled_id] = entry
                    enrollment_index[str(enrolled_id)] = entry
        except Exception as exc:
            # Analytics are still reported, just without student details.
            print(f"Failed to fetch enrollments for {course_id}: {exc}")

        try:
            summaries = fetch(f"/api/v1/courses/{course_id}/analytics/student_summaries")
        except Exception as exc:
            print(f"Failed to fetch analytics for {course_id}: {exc}")
            continue

        if not isinstance(summaries, list):
            print(f"Unexpected analytics payload for {course_id}; skipping")
            continue

        rows_before = len(rows)
        for summary in summaries:
            flattened = {}
            flatten_value('', summary, flattened)

            student_id = (
                summary.get('id')
                or summary.get('user_id')
                or flattened.get('user_id')
                or flattened.get('user.id')
            )

            if student_id in enrollment_index:
                details = enrollment_index[student_id]
            elif isinstance(student_id, str) and student_id.isdigit():
                details = enrollment_index.get(int(student_id), {})
            elif isinstance(student_id, int):
                details = enrollment_index.get(str(student_id), {})
            else:
                details = {}

            row = {
                'term_code': term_code,
                'course_id': str(course_id),
                'course_name': course_name,
                'course_sis_id': course.get('sis_course_id', ''),
                'course_code': course.get('course_code', ''),
                'student_canvas_id': str(student_id) if student_id else '',
                'student_sortable_name': details.get('sortable_name') or '',
                'student_sis_user_id': (details.get('sis_user_id') or details.get('sis_login_id')) or '',
                'student_login_id': details.get('login_id') or '',
                'student_name': details.get('name') or '',
                'student_email': details.get('email') or '',
            }

            if details:
                data_fields.update(('student_name', 'student_email'))

            # Flattened analytics keys are applied last, so they win on a name clash.
            for key, value in flattened.items():
                if key:
                    row[key] = value
                    data_fields.add(key)

            rows.append(row)

        if len(rows) == rows_before:
            print(f"Skipping course {course_id}: no student analytics data")
            continue

        processed_courses += 1

    if demo_mode and processed_courses < 10:
        print(f"Demo mode finished early: only {processed_courses} courses had analytics data")

    if not rows:
        print("No analytics data found; nothing to write.")
        return

    extra_fields = sorted(name for name in data_fields if name not in base_fields)
    field_order = base_fields + extra_fields

    print(f"Writing {len(rows)} rows to {output_path}")
    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=field_order)
        writer.writeheader()
        writer.writerows(
            {name: row.get(name, '') for name in field_order} for row in rows
        )
# Summarize student orientation enrollments across the account and flag coverage gaps.
def audit_student_orientation_enrollments():
    """Write a CSV showing which orientation shells each student is enrolled in.

    Uses ``get_orientation_shells``, ``get_orientation_memberships`` and
    ``get_student_enrollment_summary`` to build one row per student with a 0/1
    column per orientation year, the total number of orientation enrollments
    and a flag for students with none. Rows are sorted by that flag, then by
    sortable name, and written to ``cache/student_orientation_audit.csv``.
    """
    orientation_years = ['2022', '2023', '2024', '2025', '2026']

    orientation_shells = get_orientation_shells(orientation_years)
    years_without_shell = [yr for yr in orientation_years if yr not in orientation_shells]
    if years_without_shell:
        print(f"Warning: orientation shells not found for years: {', '.join(years_without_shell)}")
    if not orientation_shells:
        print("No orientation courses located; aborting.")
        return

    orientation_memberships = get_orientation_memberships(orientation_years)
    student_summaries = get_student_enrollment_summary()
    if not student_summaries:
        print("No student enrollment data available; aborting.")
        return

    rows = []
    for summary in student_summaries:
        user_key = str(summary.get('user_id'))
        membership = orientation_memberships.get(user_key, {'years': set(), 'total': 0})
        years_enrolled = membership.get('years', set())
        total_enrollments = membership.get('total', 0)

        record = {
            'student_id': user_key,
            'sortable_name': summary.get('sortable_name') or summary.get('name') or '',
            'sis_id': summary.get('sis_user_id') or '',
            'student_enrollment_count': summary.get('student_enrollments', 0),
            'teacher_enrollment_count': summary.get('teacher_enrollments', 0),
            'orientation_enrollment_total': total_enrollments,
            'missing_student_orientation': 1 if total_enrollments == 0 else 0,
        }
        # One 0/1 column per orientation year.
        record.update({yr: (1 if yr in years_enrolled else 0) for yr in orientation_years})
        rows.append(record)

    if not rows:
        print("No rows to write; aborting.")
        return

    rows = sorted(rows, key=lambda r: (r.get('missing_student_orientation', 0), r.get('sortable_name', '')))

    output_path = 'cache/student_orientation_audit.csv'
    fieldnames = (
        ['student_id', 'sortable_name', 'sis_id', 'student_enrollment_count', 'teacher_enrollment_count']
        + list(orientation_years)
        + ['orientation_enrollment_total', 'missing_student_orientation']
    )

    print(f"Writing {len(rows)} rows to {output_path}")
    with open(output_path, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows({name: record.get(name, '') for name in fieldnames} for record in rows)

    totals = [record.get('orientation_enrollment_total', 0) for record in rows]
    missing_count = len([total for total in totals if total == 0])
    multi_count = len([total for total in totals if total > 1])
    print(f"Orientation audit complete. Missing: {missing_count}, duplicates: {multi_count}.")
# Create Canvas calendar events for predefined orientation shells from CSV input.
def create_calendar_event():
    """Create calendar events in each orientation shell from a CSV file.

    Reads 'cache/academic_calendar_2025.csv', where each non-blank line is
    ``date,title,description`` (date as YYYY-MM-DD; the description may itself
    contain commas because the line is split at most twice). Each date is
    interpreted as midnight America/Los_Angeles and sent to Canvas as a UTC
    ISO-8601 timestamp. For every shell/event pair the function first asks
    find_existing_calendar_event() whether the event is already present and
    only POSTs a new one when it is not. Progress is printed; nothing is
    returned.
    """
    orientation_shells = ["course_15924","course_19094","course_20862", "course_23313"]
    local = pytz.timezone("America/Los_Angeles")
    u = url + "/api/v1/calendar_events"

    # Read the file inside a context manager so the handle is always closed.
    with codecs.open('cache/academic_calendar_2025.csv','r','utf-8') as handle:
        lines = handle.readlines()

    # Parse every row once up front; the result is identical for each shell.
    events = []
    for line in lines:
        if not line.strip():
            continue
        parts = [part.strip() for part in line.split(',', 2)]
        if len(parts) < 3:
            continue
        date, title, desc = parts
        try:
            naive = datetime.strptime(date, "%Y-%m-%d")
        except ValueError:
            # e.g. a header row or a typo: skip it instead of aborting the run.
            print(f"skipping row with unparseable date '{date}'")
            continue
        # is_dst=None makes pytz raise on ambiguous/non-existent local times
        # rather than guessing.
        local_dt = local.localize(naive, is_dst=None)
        utc_dt = local_dt.astimezone(pytz.utc).isoformat()
        events.append((date, title, desc, utc_dt))

    for ori_shell in orientation_shells:
        for date, title, desc, utc_dt in events:
            print(f"Checking event '{title}' ({date}) in {ori_shell}...", end=' ')
            existing_event = find_existing_calendar_event(ori_shell, title, utc_dt, desc)
            if existing_event:
                existing_id = existing_event.get('id')
                print(f"exists as id {existing_id} in {ori_shell}, skipping add")
                continue
            print(f"no existing event in {ori_shell}, attempting add")

            params = {
                "calendar_event[context_code]": ori_shell,
                "calendar_event[title]": title,
                "calendar_event[description]": desc,
                "calendar_event[start_at]": utc_dt, # DateTime
                # The key was previously the malformed "calendar_event[all_by_dept"
                # (no closing bracket, not a Canvas field). NOTE(review): all_day
                # is the presumed intent -- confirm against the Canvas API docs.
                "calendar_event[all_day]": "true",
            }
            res = requests.post(u, headers = header, params=params)
            if not res.ok:
                print(f"add failed for '{title}' in {ori_shell}: {res.status_code} {res.text}")
                continue

            try:
                result = json.loads(res.text)
            except json.JSONDecodeError:
                print(f"add completed for '{title}' in {ori_shell} (status {res.status_code}) but response parse failed")
                continue

            new_id = result.get("id")
            if new_id:
                print(f"added successfully as id {new_id} in {ori_shell} (status {res.status_code})")
            elif "errors" in result:
                print(f"add failed for '{title}' in {ori_shell}: {result['errors']}")
            else:
                print(f"add attempted for '{title}' in {ori_shell} with unexpected response {result}")
def utc_to_local(utc_str):
    """Convert a UTC timestamp string to a readable US/Pacific time string.

    utc_str: text in the form 'YYYY-MM-DDTHH:MM:SSZ', or a falsy value.
    Returns '' for a falsy input, otherwise text such as
    'Mon Jan 06, 2025 9:05AM' (12-hour clock, no leading zero on the hour).
    Raises ValueError if utc_str does not match the expected format.
    """
    if not utc_str:
        return ""
    naive_utc = datetime.strptime(utc_str, '%Y-%m-%dT%H:%M:%SZ')

    # Attach the UTC zone to the parsed (naive) value, then convert to Pacific.
    pacific_tz = pytz.timezone('US/Pacific')
    pacific_dt = pytz.utc.localize(naive_utc).astimezone(pacific_tz)

    # '%#I' (hour without leading zero) only works on Windows; compute the
    # 12-hour value directly so the output is the same on every platform.
    hour_12 = pacific_dt.hour % 12 or 12
    return f"{pacific_dt.strftime('%a %b %d, %Y')} {hour_12}:{pacific_dt.strftime('%M%p')}"
def list_all_assignments():
    """Prompt for a course id and print one line per assignment.

    Each line holds the assignment name, 'published' / 'not published', and
    the due date as rendered by utc_to_local(), separated by tabs.
    """
    course = input("the course id> ")
    assignments = fetch(url + f"/api/v1/courses/{course}/assignments")
    #print(json.dumps(assignments,indent=2))
    for item in assignments:
        status = 'published' if item['published'] == True else 'not published'
        due = utc_to_local(item['due_at'])
        print(f"{item['name']}\t{status}\t{due}")
def bulk_unenroll():
    """Prompt for a course id and issue a DELETE for every enrollment in it.

    Fetches the course's enrollment list, then sends one DELETE request per
    enrollment id and prints whether each request returned HTTP 200.

    NOTE(review): no `task` parameter is sent with the DELETE; confirm in the
    Canvas Enrollments API whether that concludes the enrollment rather than
    deleting it. The printed messages also label the enrollment id as a
    student id.
    """
    course_id = input("course id> ")
    for record in fetch(f"{url}/api/v1/courses/{course_id}/enrollments"):
        enrollment_id = record['id']
        #skiplist = ['51237','58362','237']
        #if enrollment_id in skiplist:
        #    continue

        target = f"{url}/api/v1/courses/{course_id}/enrollments/{enrollment_id}"
        response = requests.delete(target, headers=header)

        if response.status_code != 200:
            print(f"Failed to unenroll student with id {enrollment_id} from course {course_id}. Error: {response.text}")
            continue
        print(f"Successfully unenrolled student with id {enrollment_id} from course {course_id}.")
def fetch_announcements(course_id=0, start_date="2025-01-01", end_date="2025-12-31"):
    """Fetch a course's announcements, print them, and save them as JSON.

    course_id:  course to query; when falsy the user is prompted for one.
    start_date: first date of the query window (YYYY-MM-DD). Defaults to the
                previously hard-coded 2025-01-01.
    end_date:   last date of the query window (YYYY-MM-DD). Defaults to the
                previously hard-coded 2025-12-31.

    The result is pretty-printed to stdout and written to
    cache/announcements_<course_id>.json. Nothing is returned.
    """
    if not course_id:
        course_id = input("course id> ")

    announcements_url = (
        f"{url}/api/v1/announcements?context_codes[]=course_{course_id}"
        f"&start_date={start_date}&end_date={end_date}"
    )
    announcements = fetch(announcements_url)

    print(json.dumps(announcements,indent=2))

    filename = f"cache/announcements_{course_id}.json"
    with open(filename, "w") as file:
        json.dump(announcements, file,indent=2)

    print("Announcements saved to ", filename)
def change_link_in_all_terms_pages():
    """Unfinished: load the active courses of term 181 for a link replacement.

    Only the course lookup is implemented. The old and new links are defined
    but nothing is done with them or with the fetched courses yet.
    """
    term = 181
    courses = getCoursesInTerm(term, get_fresh=1, show=0, active=1)

    # Intended replacement pair; not applied anywhere yet.
    old_link = "https://www.gavilan.edu/ezproxy"
    new_link = "https://www.gavilan.edu/ezproxy_new"
def enrollment_helper():
    """Build section-history reports from each semester's published schedule.

    For every key in semesters.sems_by_short_name the schedule JSON is
    downloaded from http://gavilan.cc/schedule/<sem>_sched_expanded.json.
    Each section is trimmed to the fields in `keep`, tagged with its semester,
    fill percentage and department, and collected both in a flat list and in
    a nested semester -> department -> course code mapping.

    Files written:
      cache/section_history.json         nested mapping
      cache/section_history.csv          flat rows, sorted
      cache/section_counts_history.csv   section count per course per semester

    Returns nothing. A semester whose download or processing fails is reported
    and skipped; if no rows were collected at all, the CSV reports are skipped.
    """
    from semesters import code, sems_by_short_name, short_to_sis
    from util import dept_from_name

    ignore = ['JLE','JFT', 'CWE']
    ignore2 = ['AH 190', 'AE 600', 'AE 602', 'AE 603','ACCT 190','AJ 100A', 'AJ 107A', 'AJ 213A','AJ 229A','AJ 231A','AMT 190','ATH 23','BUS 190','CD 190','COS 290','WTRM 290','SPAN 8A', 'SPAN 8B', 'SPAN 8C', 'SPAN 8D', 'RE 190','MKTG 190']
    keep = 'code,name,days,cap,act,teacher,date,partofday,type,site'.split(',')

    # fetch enrollment stats for last few years
    raw = []
    # NOTE(review): this reverses the list imported from `semesters` in place
    # and `code` is not used again in this function -- confirm it is still
    # wanted. Kept so behaviour seen by other modules does not change.
    code.reverse()
    sort = defaultdict(dict)

    for s in sems_by_short_name:
        try:
            sched1 = requests.get(f"http://gavilan.cc/schedule/{s}_sched_expanded.json").json()
            sort[s] = defaultdict(dict)
            for sect in sched1:
                # NOTE(review): ignore2 holds course codes but is compared with
                # the 'name' field -- verify which field carries that value.
                if sect['name'] in ignore2:
                    continue
                sect_smaller = funcy.project(sect,keep)
                sect_smaller['sem'] = short_to_sis(s)

                cap = int(sect_smaller['cap'])
                act = int(sect_smaller['act'])
                # A zero capacity or zero enrollment is recorded as 100
                # (this also avoids dividing by a zero capacity).
                if cap == 0 or act == 0:
                    sect_smaller['fill_pct'] = 100
                else:
                    sect_smaller['fill_pct'] = round( (act / cap)*100 )

                d = dept_from_name(sect_smaller['code'])
                if d in ignore:
                    continue
                sect_smaller['dept'] = d
                raw.append(sect_smaller)

                if d not in sort[s]:
                    sort[s][d] = defaultdict(dict)
                name = sect['code']
                if name not in sort[s][d]:
                    sort[s][d][name] = []
                sort[s][d][name].append(sect_smaller)
            print(f"{s} OK.")
        except Exception as e:
            # Best effort: any failure (download, JSON, bad field) skips the
            # remainder of this semester and moves on to the next one.
            print(f"{s} not found. {e}")
            #sems.pop(s)

    # Open the output only when there is something to write, and close it
    # deterministically.
    with codecs.open('cache/section_history.json','w','utf-8') as oo:
        oo.write(json.dumps(sort,indent=2))

    if not raw:
        # Without rows the DataFrame has no columns and the sort below
        # would raise KeyError.
        print("No section data retrieved; skipping CSV reports.")
        return

    df = pd.DataFrame(raw)
    df_sorted = df.sort_values(['dept', 'code', 'type','site','partofday','fill_pct'])
    df_sorted.to_csv('cache/section_history.csv')

    class_counts = df.groupby(['sem', 'code']).size().reset_index(name='class_count')
    print("Class counts by semester")
    print(class_counts)

    pivot_df = class_counts.pivot_table(index='code', columns='sem', values='class_count', aggfunc='sum', fill_value=0)
    # Reset the index to move 'code' back to a column
    pivot_df.reset_index(inplace=True)
    print(pivot_df)
    pivot_df.to_csv('cache/section_counts_history.csv')

    # Group by semester and class type, and then count the number of occurrences of each class type
    class_type_counts = df.groupby(['sem', 'code', 'type']).size().reset_index(name='class_type_count')
    print("Class type by semester")
    print(class_type_counts)

    pivot_df2 = class_type_counts.pivot_table(index='code', columns=['sem','type'], values='class_type_count', aggfunc='sum', fill_value=0)
    # Reset the index to move 'code' back to a column
    pivot_df2.reset_index(inplace=True)

    # Disabled code (previously wrapped in a string literal, so it never ran;
    # pivot_df2 is therefore computed but not exported):
    #
    # kmeans = try_clustering(pivot_df2.copy())
    # pivot_df2.insert(0, "Cluster", kmeans.labels_)
    # print(pivot_df2)
    # pivot_df2.to_csv('cache/section_and_mode_counts_history.csv')
    #
    # # Group by teacher
    # class_teacher_counts = df.groupby(['sem', 'code', 'teacher']).size().reset_index(name='class_teacher_count')
    # print("Class teacher by semester")
    # print(class_teacher_counts)
    #
    # # group by COURSE (ie: ENGL1A)
    # # For each historical WINTER, SPRING, SUMMER, FALL:
    # # number of sections offered, by mode, time of day, campus
    # # all teachers who taught it (and their qual to teach online)
    # # fill percentage for each section, then by mode, tod, campus
    #
    # ## moved: try_clustering now in search.py
    # # Add labels to the DataFrame
    # #df['clusters'] = labels
    # #print(df)
    # #df.to_csv('cache/section_and_mode_counts_history_clusters.csv')
    # return kmeans
def unpublish_a_course(course_id=0):
    """Send the 'claim' course event for one course and print the response.

    course_id: course to update; when it equals 0 the user is prompted.
    NOTE(review): 'claim' is presumably the Canvas event that returns a course
    to the unpublished state -- confirm against the Courses API.
    """
    if course_id == 0:
        course_id = input('course id? ')
    endpoint = f"{url}/api/v1/courses/{course_id}"
    response = requests.put(endpoint, data={'course[event]': 'claim'}, headers=header)
    print(response.text)
def course_log(course_id=19566):
    """Fetch the course audit log for one course and pretty-print it as JSON.

    course_id: course whose audit log is requested. Defaults to 19566, the id
               that was previously hard-coded, so existing callers are
               unaffected.
    """
    L = fetch(f"{url}/api/v1/audit/course/courses/{course_id}")
    print(json.dumps(L,indent=2))
def fetch_rubric(course=21274, r_id=35961):
    """Fetch one rubric from a course and print its criteria and ratings.

    course: course id holding the rubric (default: the previously hard-coded
            21274).
    r_id:   rubric id (default: the previously hard-coded 35961).

    For every criterion in the rubric's 'data' list, the criterion text and
    each of its ratings (description, long description, points) are printed
    one per line, preceded by the rubric title.

    NOTE(review): the string literals in this function were split across
    physical lines, which Python cannot parse. It looks as though markup was
    lost from them; the text that survived (newlines and ' | ' separators)
    is reproduced here -- restore the original markup if it is known.
    """
    u = f"{url}/api/v1/courses/{course}/rubrics/{r_id}"
    result = fetch(u)
    #print(json.dumps(result,indent=2))

    rows = []
    for row in result['data']:
        cells = [f"{row['description']} {row['long_description']} | "]
        cells.extend(
            f"{item['description']} {item['long_description']} {item['points']} points | "
            for item in row['ratings']
        )
        rows.append("\n".join(cells) + "\n\n")

    output = f"{result['title']}\n\n"
    output += ''.join(f"{x}\n\n" for x in rows) + "\n\n"
    print(output)
# Command-line menu: run one of the functions above, chosen either by a
# numeric first argument or interactively from the printed list.
if __name__ == "__main__":
    options = { 1: ['Cross check schedule with ztc responses',make_ztc_list] ,
                2: ['Add announcements to homepage', change_course_ann_homepage],
                3: ['Unpublish a course', unpublish_a_course],
                4: ['List the terms', getTerms],
                5: ['Show courses in a term', getCoursesInTerm],
                6: ['Save enrollments in a course', course_enrollment],
                7: ['Simple list of course data, search by sis_id', course_search_by_sis],
                8: ['Overview of a term', course_term_summary],
                9: ['process the semester overview output (8)', course_term_summary_2],

                10: ['Enroll orientation students (refresh local db first)', enroll_orientation_students],
                11: ['Enroll ORIENTATION and STEM student shells after catching up database.', enroll_o_s_students],
                12: ['Enroll stem students', enroll_stem_students_live],
                13: ['Enroll ART students', enroll_art_students_live],

                20: ['Get a course info by id',getCourses],
                21: ['Reset course conclude date',update_course_conclude],
                22: ['Create calendar events for orientation shells', create_calendar_event],
                23: ['Remove all calendar events from a course', remove_all_course_events],
                24: ['Build participation report for a term', build_term_participation_report],
                25: ['Audit student orientation enrollments', audit_student_orientation_enrollments],
                26: ['list all assignments', list_all_assignments],
                27: ['Bulk unenroll from course', bulk_unenroll],
                28: ['enrollment helper', enrollment_helper],
                29: ['g number list enroll to shell id', enroll_gnumber_list_to_courseid],

                30: ['* Overview semester start dates',overview_start_dates],
                31: ['Fine tune term dates and winter session', course_by_depts_terms],
                32: ['Set summer start dates', set_custom_start_dates],
                33: ['Cross list, ask for sections', ez_xlist],
                34: ['Cross list a semester from argos export file', semester_cross_lister],
                35: ['Cross list from manually created file', do_manual_xlist],
                36: ['Cross list CWE courses', xlist_cwe],

                40: ['Enroll GOTT Workshops', enroll_gott_workshops],
                41: ['Create some sandbox courses', create_sandboxes],
                42: ['Add teacher to many shells', teacher_to_many_shells],
                44: ['List users who passed GOTT 1 / Bootcamp', get_gott1_passers],
                45: ['List users who passed Plagiarism Module', get_plague_passers],

                50: ['Fetch rubric scores and comments', fetch_rubric_scores],
                51: ['Fetch announcements in a course', fetch_announcements],
                52: ['show course audit log', course_log],
                53: ['fetch a rubric', fetch_rubric],

                # --- Course Nav / External Tools ---
                70: ['ext tools',get_ext_tools],
                71: ['set ext tools',set_ext_tools],
                72: ['Get course ext tools', get_course_ext_tools],
                73: ['Remove "new analytics" from all courses navs in a semester', remove_n_analytics],
                74: ['Add course evals', add_evals],
                75: ['Remove course evals all sections', remove_evals_all_sections],
                76: ['Course nav: index + bulk hide/show (one semester)', clean_course_nav_setup_semester],
                77: ['Nav: add GavConnect to list of course ids', add_gav_connect_prompt_list],
                78: ['Nav: add Pathways to all courses in a term (default OFF)', add_pathways_all_courses_in_term],
                79: ['Nav: ensure Pathways exists for a term (add if missing, OFF)', ensure_pathways_in_term],

                # 24: ['Add course evals to whole semester',instructor_list_to_activate_evals],
                # 21: ['Add announcements to homepage', change_course_ann_homepage],
                #32: ['Cross-list classes', xlist ],
                #33: ['Cross list helper', eslCrosslister],
                ##55: ['Check all courses & their sections in semester', all_semester_course_sanity_check],
                #4: ['List students who passed quiz X', get_quiz_passers],

                # TODO wanted: group shell for each GP (guided pathway) as a basic student services gateway....
                #
                }
    print ('')

    # Only an argument made up entirely of digits selects an option directly;
    # the old prefix match (^\d+) let values like "12abc" through to int().
    from_cli = len(sys.argv) > 1 and re.fullmatch(r'\d+', sys.argv[1]) is not None
    if from_cli:
        resp = sys.argv[1]
    else:
        print ('')
        for key in options:
            print(str(key) + '.\t' + options[key][0])
        print('')
        resp = input('Choose: ')

    # Reject non-numeric or unknown choices with a message, not a traceback.
    try:
        label, action = options[int(resp)]
    except (ValueError, KeyError):
        sys.exit(f"Unknown option: {resp!r}")

    if from_cli:
        print("\n\nPerforming: %s\n\n" % label)

    # Call the function in the options dict
    action()