import json, re, requests, codecs, sys, time, funcy, os, csv, random import pandas as pd from datetime import datetime, timedelta, timezone import pytz from util import print_table, int_or_zero, float_or_zero, dept_from_name, num_from_name from pipelines import fetch, fetch_stream, fetch_collapse, header, url from schedules import get_semester_schedule from localcache import course_quick_stats, get_courses_in_term_local, course_student_stats, all_sem_courses_teachers, full_reload from localcache2 import ( db, users_new_this_semester, users_new_this_2x_semester, course_from_id, user_ids_in_shell, student_count, teacher_list, course_sched_entry_from_id, get_orientation_shells, get_orientation_memberships, get_student_enrollment_summary, ) from collections import defaultdict from semesters import find_term #from dateutil import parser #from ast import Try, TryStar #from symbol import try_stmt #from pipelines import sems stem_course_id = '11015' # TODO ######### ######### GET FACTS FROM INDIVIDUAL COURSES ######### ######### # Gott 1 Bootcamp - report on who completed it. def get_gott1_passers(): course = '1561' min_passing = 85 passers_filename = 'cache/teacherdata/bootcamp_passed.csv' still_active_filename = 'cache/teacherdata/bootcamp_active.csv' #get_course_passers(course, min_passing, passers_filename, still_active_filename) # Plagiarism Module - report on who completed it. def get_plague_passers(): course = '11633' min_passing = 85 passers_filename = 'cache/teacherdata/plagiarism_passed.csv' still_active_filename = 'cache/teacherdata/plagiarism_active.csv' """ (passed, didnt) = get_course_passers(course, min_passing, passers_filename, still_active_filename) passed = set( [z[2] for z in passed] ) didnt = set( [z[2] for z in didnt] ) enrol = [ [ str(z) for z in list(course_enrollment(cr)) ] for cr in ['11677','11698'] ] print(enrol) enrol = set(funcy.cat(enrol)) everyone = passed.union(didnt,enrol) reportable = passed.intersection(enrol) outputfile = open('cache/plagcheck.txt','w').write( json.dumps( [ list(reportable), list(enrol), list(passed), list(didnt), list(everyone) ],indent=2)) return 1 #enrol = { cr: [ str(z) for z in list(course_enrollment(cr).keys()) ] for cr in ['11677','11698',] } # # [x['user_id'] for x in course_enrollment(cr)] outputfile = open('cache/plagcheck.txt','w').write( json.dumps( [ [z[2] for z in passed],[z[2] for z in didnt],enrol],indent=2)) return 1 passed = {} didnt = {} output_by_course = {} course_s = {} for p in passed: passed_by_deptr(p[2])] = p for p in didnt: didnt_d(p[2])] = p passed_s = [ str(k) for k in passed_d() ] didnt_s = [ str(k) for k in didnt_by_deptys() ] crossref = ['11677','11698',] outputfile = open('cache/plagcheck.txt','w') oo = { 'passed': passed_by_deptdidnt': didnt_by_dept for cr in crossref: student_int = course_enrollment(cr) student_by_dict{ str(k): v for k,v in student_int.items() } oo[cr] = student_by_dict output_by_course[cr] = { 'passed':{}, 'didnt':{}, 'missing':{} } course_s[cr] = set( [ str(k) for k in student_by_dict.keys() ]) for k,v in student_by_dict.items(): key_s = str(k) if key_s in passed_by_dict output_by_course[cr]['passed'][key_s] = passed_by_dicty_s] elif key_s in didnt_by_dict output_by_course[cr]['didnt'][key_s] = didnt_by_dicty_s] else: output_by_course[cr]['missing'][key_s] = v['user'] oo['final_output'] = output_by_course oo['passed_s'] = list(passed_s) oo['didnt_s'] = list(didnt_s) course_sd = {k: list(v) for k,v in course_s.items() } oo['course_s'] = course_sd outputfile.write(json.dumps(oo,indent=2)) # Who, in a class, passed? def get_course_passers(course, min_passing, passers_filename, still_active_filename): path = url + '/api/v1/courses/%s/enrollments' % str(course) tempout = open('cache/passers_temp.txt','w') enrl = fetch( path, 0) passed = [] didnt = [] for E in enrl: try: n = E['user']['name'] oo = E['user']['sis_user_id'] i = str(E['user_id']) r = E['role'] g = E['grades']['current_score'] l = E['last_activity_at'] p = float_or_zero(g) > min_passing print( "%s: a %s, grade of %s. Passed? %s. Last seen: %s" % (n,r,str(g),str(p),l) ) tempout.write(json.dumps(E['user']['name']) + "\n") tempout.write(json.dumps(E['grades'],indent=2) + "\n\n-----\n\n") if p: passed.append( [n, oo, i, r, g, l ] ) else: didnt.append( [n, oo, i, r, g, l ] ) except: pass columns = ['name', 'goo','canvas_id','role','grade','last_activity'] pp = pd.DataFrame(passed, columns=columns) pp.sort_values(by='last_activity',inplace=True) pp.to_csv(passers_filename, index=False) dd = pd.DataFrame(didnt, columns=columns) dd.sort_values(by='last_activity',inplace=True) dd.to_csv(still_active_filename, index=False) print("Saved output to \n - passed: %s\n - not passed: %s\n" % (passers_filename, still_active_filename)) return (passed,didnt) """ # Gott 1A """course = '2908' quiz = '15250' pass_grade = 0.90 path = url + '/api/v1/courses/%s/quizzes/%s/submissions' % (course,quiz) q_subs = fetch_collapse(path, 'quiz_submissions') for Q in q_subs: prct = float_or_zero(Q['score']) / float_or_zero( Q['quiz_points_possible'] ) print( 'Passed: %s\t Score: %s,\tUser: %s' % \ ( str(prct>0.9), str(int_or_zero(Q['score'])), Q['user_id'] ))""" # Who, in a class and a quiz, passed? def get_quiz_passers(): # Gott 1 Bootcamp course = '1561' path = url + '/api/v1/courses/%s/enrollments' % course enrl = fetch( path, 0) min_passing = 85 passed = [] didnt = [] for E in enrl: try: n = E['user']['name'] i = E['user_id'] r = E['role'] g = E['grades']['current_score'] l = E['last_activity_at'] p = float_or_zero(g) > min_passing print( "%s: a %s, grade of %s. Passed? %s. Last seen: %s" % (n,r,str(g),str(p),l) ) if p: passed.append( [n, i, r, g, l ] ) else: didnt.append( [n, i, r, g, l ] ) except: pass columns = ['name','canvas_id','role','grade','last_activity'] pp = pd.DataFrame(passed, columns=columns) pp.sort_values(by='last_activity',inplace=True) pp.to_csv('cache/teacherdata/bootcamp_passed.csv', index=False) dd = pd.DataFrame(didnt, columns=columns) dd.sort_values(by='last_activity',inplace=True) dd.to_csv('cache/teacherdata/bootcamp_active.csv', index=False) print("Saved output to ./teachers/bootcamp_*") # Gott 1A """course = '2908' quiz = '15250' pass_grade = 0.90 path = url + '/api/v1/courses/%s/quizzes/%s/submissions' % (course,quiz) q_subs = fetch_collapse(path, 'quiz_submissions') for Q in q_subs: prct = float_or_zero(Q['score']) / float_or_zero( Q['quiz_points_possible'] ) print( 'Passed: %s\t Score: %s,\tUser: %s' % \ ( str(prct>0.9), str(int_or_zero(Q['score'])), Q['user_id'] ))""" # Change courses to show 2 announcements def change_course_ann_homepage(id="10458"): u = url + "/api/v1/courses/%s/settings" % id data = { 'show_announcements_on_home_page':'true', \ 'home_page_announcement_limit':'2'} r = requests.put(u, data=data, headers=header) print(r.text) # All students enrolled in a class in the given semester. Simpler verson of below. Return SET of course_ids. def users_in_semester(): all_c = getCoursesInTerm('65',0,0) # fall 2020 TODO all_s = set() for c in all_c: for u in course_enrollment(c['id']).values(): if u['type'] != "StudentEnrollment": continue all_s.add(u['id']) return all_s # # All students (and faculty) in STEM (or any list of depts.. match the course_code). Return SET of canvas ids. def users_in_by_depts_live(depts=[], termid='181'): courses_by_by_dept = {} students_by_by_dept = {} all_c = getCoursesInTerm(termid,1,0) codecs.open('cache/courses_in_term_%s.json' % termid,'w','utf-8').write( json.dumps(all_c,indent=2) ) for c in all_c: #print(c['course_code']) for d in depts: #print("Dept: %s" % d) match = re.search('^(%s)' % d, c['course_code']) if match: if d == "STAT" and match.group() == "STAT": print("STAT") else: continue print("Getting enrollments for %s" % c['course_code']) if d in courses_by_by_dept: courses_by_by_dept[d].append(c) else: courses_by_by_dept[d] = [ c, ] for u in course_enrollment_with_faculty(c['id'],0).values(): #if u['type'] != "StudentEnrollment": continue if not (d in students_by_by_dept): students_by_by_dept[d] = set() students_by_by_dept[d].add(u['user_id']) continue print(students_by_by_dept) codecs.open('cache/students_by_by_dept_in_term_%s.json' % termid,'w','utf-8').write( str(students_by_by_dept) ) all_students = set() for dd in students_by_by_dept.values(): all_students.update(dd) codecs.open('cache/all_students_in_by_depts_in_term_%s.json' % termid,'w','utf-8').write( str(all_students) ) return all_students # Course enrollment, including teachers def course_enrollment_with_faculty(id='', verbose=0): if verbose: print("Getting enrollments for course id %s" % str(id)) if not id: id = input('Course id? ') t = url + '/api/v1/courses/%s/enrollments' % str(id) if verbose: print(t) emts = fetch(t,verbose) if verbose: print(emts) emt_by_id = {} for E in emts: if verbose: print(E) try: emt_by_id[E['user_id']] = E except Exception as exp: print("Skipped [%s] with this exception: %s" % (str(E), str(exp))) ff = codecs.open('cache/courses/%s.json' % str(id), 'w', 'utf-8') ff.write(json.dumps(emt_by_id, indent=2)) if verbose: print( " %i results" % len(emts) ) return emt_by_id # Course enrollment list, students only def course_enrollment(id='', verbose=0): if verbose: print("Getting enrollments for course id %s" % str(id)) if not id: id = input('Course id? ') t = url + '/api/v1/courses/%s/enrollments?role[]=StudentEnrollment' % str(id) if verbose: print(t) emts = fetch(t,verbose) if verbose: print(emts) emt_by_id = {} for E in emts: if verbose: print(E) try: emt_by_id[E['user_id']] = E except Exception as exp: print("Skipped [%s] with this exception: %s" % (str(E), str(exp))) ff = codecs.open('cache/courses/%s.json' % str(id), 'w', 'utf-8') ff.write(json.dumps(emt_by_id, indent=2)) if verbose: print( " %i results" % len(emts) ) return emt_by_id def askForTerms(): user_input = input("The term id? (separate multiples with commas) ") return user_input.split(",") """ names = [] if not term: s = url + '/api/v1/accounts/1/terms?workflow_state[]=all' s = fetch_collapse(s,"enrollment_terms",1) print(json.dumps(s,indent=2)) print("Terms: ") for u in s: print(str(u['id']) + "\t" + u['name']) #print json.dumps(results_by_dept,indent=2) term = input("The term id? ") """ # Return a list of term names and IDs. Also store in cache/courses/terms.txt def getTerms(printme=1, ask=1): s = url + '/api/v1/accounts/1/terms' #?workflow_state[]=all' terms = fetch_collapse(s,'enrollment_terms') ff = codecs.open('cache/courses/terms.txt', 'w', 'utf-8') # TODO unsafe overwrite #print(terms) ff.write(json.dumps(terms, indent=2)) ff.close() if printme: print("Terms: ") for u in terms: print(str(u['id']) + "\t" + u['name']) if ask: return input("The term id? ") return terms def getCourses(x=0): # a dict if not x: user_input = input("The Course IDs to get? (separate with spaces: ") courselist = list(map(int, user_input.split())) else: courselist = [x, ] for id in courselist: t = url + '/api/v1/courses/' + str(id) # + '?perpage=100' t = fetch(t,0) #print(t) return t def update_course_conclude(courseid="13590",enddate='2021-12-23T01:00Z'): (connection,cursor) = db() q = "SELECT * FROM courses AS c WHERE c.code LIKE '%FA21%' AND c.conclude='2021-08-29 07:00:00.000'" result = cursor.execute(q) for R in result: try: #print(R) print('doing course: %s' % R[6]) courseid = R[1] #d = getCourses(courseid) #print("\tconclude on: %s" % d['end_at']) data = { 'course[end_at]': enddate } t = url + '/api/v1/courses/' + str(courseid) r3 = requests.put(t, headers=header, params=data) #print(" " + r3.text) except Exception as e: print('****%s' % str(e)) # Relevant stuff trying to see if its even being used or not def course_term_summary_local(term="180",term_label="FA23"): O = "\t
  • Course: %s
    Status: %s
    Teacher: %s
    Number students: %s
  • \n" courses = get_courses_in_term_local(term) oo = codecs.open(f'cache/semester_summary_{term_label}.html','w','utf-8') oo.write('\n') # Fetch all courses in a given term def getCoursesInTerm(term=0,get_fresh=1,show=1,active=0): # a list if not term: term = getTerms(1,1) ff = 'cache/courses_in_term_%s.json' % str(term) if not get_fresh: if os.path.isfile(ff): return json.loads( codecs.open(ff,'r','utf-8').read() ) else: print(" -> couldn't find cached classes at: %s" % ff) # https://gavilan.instructure.com:443/api/v1/accounts/1/courses?published=true&enrollment_term_id=11 names = [] if active: active = "published=true&" else: active = "" t = f"{url}/api/v1/accounts/1/courses?{active}enrollment_term_id={term}" results = fetch(t,show) if show: for R in results: try: print(str(R['id']) + "\t" + R['name']) except Exception as e: print("Caused a problem: ") print(R) #print json.dumps(results,indent=2) info = [] for a in results: names.append(a['name']) info.append( [a['id'], a['name'], a['workflow_state'] ] ) if show: print_table(info) codecs.open(ff, 'w', 'utf-8').write(json.dumps(results,indent=2)) return results def getCoursesTermSearch(term=0,search='',v=0): term = term or input("term id? ") search = search or input("What to search for? ") s = url + '/api/v1/accounts/1/courses?enrollment_term_id=%s&search_term=%s' % ( str(term) , search ) if v: print(s) courses = fetch(s) if v: print(json.dumps(courses,indent=2)) return courses def courseLineSummary(c,sections={}): ss = "\t" crn = "\t" host = "" if 'crn' in c: crn = "crn: %s\t" % c['crn'] if c['id'] in sections: ss = "section: %s\t" % str(sections[c['id']]) if 'host' in c: host = "send to crn: %s\t" % c['host'] out = "%i\t%s%s%s%s" % (c['id'], ss ,crn, host, c['name']) return out def xlistLineSummary(c,sections={}): # can_id incoming_sec_id crn name new_sec = "missing" if 'partner' in c and 'sectionid' in c['partner']: new_sec = c['partner']['sectionid'] out = "can_id:%i\t new_sec_id:%s\t crn:%s\t %s" % (c['id'], new_sec ,c['crn'], c['name']) return out def numbers_in_common(L): # how many leading numbers do the strings in L share? for i in [0,1,2,3,4]: number = L[0][i] for s in L: #print("%s -> %s" % (number,s[i])) if s[i] != number: return i return 5 def combined_name(nic,L): # string with prettier section numbers combined if len(L) < 2: return L[0] return "/".join(L) # old method of trying to shorten section numbers if nic < 2: return "/".join(L) L_mod = [ x[nic:6] for x in L] L_mod[0] = L[0] new_name = "/".join(L_mod) #print(nic, " ", L_mod) return new_name def all_equal2(iterator): return len(set(iterator)) <= 1 def semester_cross_lister(): tt = find_term( input("term? (ex: fa25) ") ) if not tt or (not 'canvas_term_id' in tt) or (not 'code' in tt): print(f"Couldn't find term.") return term = tt['canvas_term_id'] sem = tt['code'] xlist_filename = f"cache/{sem}_crosslist.csv" checkfile = codecs.open('cache/xlist_check.html','w','utf-8') checkfile.write('\n') xlistfile = codecs.open(xlist_filename,'r','utf-8').readlines()[1:] by_section = {} by_group = defaultdict( list ) crn_to_canvasid = {} crn_to_canvasname = {} crn_to_canvascode = {} get_fresh = 1 c = getCoursesInTerm(term,get_fresh,0) for C in c: if 'sis_course_id' in C and C['sis_course_id']: crn_to_canvasid[C['sis_course_id'][7:13]] = str(C['id']) crn_to_canvasname[C['sis_course_id'][7:13]] = str(C['name']) crn_to_canvascode[C['sis_course_id'][7:13]] = str(C['course_code']) # "Term","PrtTerm","xlstGroup","Subject","CrseNo","EffectCrseTitle","CRN","Session","SecSchdType","AttnMeth","MtgSchdType","MtgType","MaxEnroll","TotalEnroll","SeatsAvail","Bldg","Room","Units","LecHrs","LabHrs","HrsPerDay","HrsPerWk","TotalHrs","Days","D/E","Wks","BegTime","EndTime","StartDate","EndDate","LastName","FirstName","PercentResp" for xc in xlistfile: parts = xc.split(r',') course = parts[2] + " " + parts[3] group = parts[1] crn = parts[5] if crn in crn_to_canvasid: cid = crn_to_canvasid[crn] oldname = crn_to_canvasname[crn] oldcode = crn_to_canvascode[crn] else: print("! Not seeing crn %s in canvas semester" % crn) cid = '' oldname = '' oldcode = '' if crn in by_section: continue by_section[crn] = [crn, course, group, cid, oldname, oldcode] by_group[group].append( [crn, course, group, cid, oldname, oldcode] ) for x in by_section.values(): print(x) href = '%s' % ('https://ilearn.gavilan.edu/courses/'+x[3]+'/settings#tab-details', x[3]) checkfile.write('' % (x[0],x[2],x[1],href) ) checkfile.write('
    %s%s%s%s
    ') print("GROUPS") for y in by_group.keys(): sects = [ z[0] for z in by_group[y] ] sects.sort() nic = numbers_in_common(sects) new_sec = combined_name(nic,sects) # same dept? depts_list = [ z[1].split(' ')[0] for z in by_group[y] ] nums_list = list(set([ z[1].split(' ')[1] for z in by_group[y] ])) if all_equal2(depts_list): depts = depts_list[0] nums_list.sort() nums = '/'.join(nums_list) else: depts = list(set(depts_list)) depts.sort() depts = '/'.join(depts ) nums = by_group[y][0][1].split(' ')[1] new_name = f"{depts}{nums} {' '.join(by_group[y][0][4].split(' ')[1:-1])} {new_sec}" #new_name = by_group[y][0][4][0:-5] + new_sec new_code = f"{depts}{nums} {sem.upper()} {new_sec}" #new_code = by_group[y][0][5][0:-5] + new_sec print(y) print("\t", sects) #print("\tThey share %i leading numbers" % nic) print("\t", by_group[y]) host_id = by_group[y][0][3] sections = by_group[y][1:] for target_section in sections: xlist_ii(target_section[3],host_id,new_name,new_code) #pass def do_manual_xlist(): infile = [ x.strip() for x in open('cache/sp25_manual_crosslist.txt','r').readlines() ] for L in infile: print(L) paraL,host = L.split(' -> ') para_list = paraL.split(',') print(host) print(para_list) xlist(host, para_list) def ez_xlist(): host = int(input('what is the host id? ')) parasite = input('what are parasite ids? (separate with commas) ') parasite = [ int(x) for x in parasite.split(',') ] xlist(host,parasite) # Crosslist given 2 ids, computing the new name and code def xlist(host_id, parasite_list): host_info = course_from_id(host_id) if not host_info: print(f"Couldn't find course id {host_id} in database. Do you need to update it?") return "" host_info['crn'] = host_info['sis_source_id'][7:] host_info['dept'] = dept_from_name( host_info['course_code'] ) host_info['num'] = num_from_name(host_info['course_code'] ) host_info['bare_name'] = ' '.join(host_info['name'].split(' ')[1:-1]) # name without course code or crn sem = host_info['course_code'].split(' ')[1] para_info_list = [ course_from_id(x) for x in parasite_list ] for p in para_info_list: if not p: print(f"Couldn't find course id for parasite in database. Do you need to update it?") return "" p['crn'] = p['sis_source_id'][7:] p['dept'] = dept_from_name(p['course_code'] ) p['num'] = num_from_name(p['course_code'] ) p['bare_name'] = ' '.join(p['name'].split(' ')[1:-1]) # name without course code or crn all = para_info_list.copy() all.append(host_info) # determine new name and code sects = [ z['crn'] for z in all ] sects.sort() nic = numbers_in_common(sects) new_sec = combined_name(nic,sects) # same dept? depts_list = [ z['dept'] for z in all ] nums_list = list(set([ z['num'] for z in all ])) if all_equal2(depts_list): depts = depts_list[0] nums_list.sort() nums = '/'.join(nums_list) else: depts = list(set(depts_list)) depts.sort() depts = '/'.join(depts ) nums = all[0]['num'] new_name = f"{depts}{nums} {all[0]['bare_name']} {new_sec}" #new_name = by_group[y][0][4][0:-5] + new_sec new_code = f"{depts}{nums} {sem.upper()} {new_sec}" #new_code = by_group[y][0][5][0:-5] + new_sec print(f"New name: {new_name}") print(f"New code: {new_code}") print(sects) for target_section in para_info_list: xlist_ii(target_section['id'],host_id,new_name,new_code) # Perform an actual cross-list, given 2 id numbers, new name and code def xlist_ii(parasite_id,host_id,new_name,new_code): print("Parasite id: ",parasite_id," Host id: ", host_id) print("New name: ", new_name) print("New code: ", new_code) xyz = 'y' #xyz = input("Perform cross list? Enter y for yes, n for no: ") if xyz != 'n': uu = url + '/api/v1/courses/%s/sections' % parasite_id c_sect = fetch(uu) #print(json.dumps(c_sect,indent=2)) if len(c_sect) > 1: print("* * * * Already Crosslisted!!") return if not c_sect: print("* * * * Already Crosslisted!!") return else: parasite_sxn_id = str(c_sect[0]['id']) print("Parasite section id: ", parasite_sxn_id) u = url + "/api/v1/sections/%s/crosslist/%s" % (parasite_sxn_id,host_id) print(u) res = requests.post(u, headers = header) print(res.text) u3 = url + "/api/v1/courses/%s" % host_id data = {'course[name]': new_name, 'course[course_code]': new_code} print(data) print(u3) r3 = requests.put(u3, headers=header, params=data) print(r3.text) print("\n\n") # Relevant stuff trying to see if its even being used or not # relies on schedule being in database def course_term_summary(): term = find_term( input("term? (ex: fa25) ") ) if not term or (not 'canvas_term_id' in term) or (not 'code' in term): print(f"Couldn't find term.") return term = term['canvas_term_id'] SEM = term['code'] print(f"Summary of {SEM}") get_fresh = 1 courses = getCoursesInTerm(term, get_fresh, 0) print(f"output to cache/term_summary_{term}.csv") outp = codecs.open(f'cache/term_summary_{term}.csv','w','utf-8') outp.write('id,name,view,type,state,sched_start,ilearn_start,sched_students,ilearn_students,num_teachers,teacher1,teacher2,teacher2\n') for c in courses: c_db = course_from_id(c['id']) try: ilearn_start = c_db['start_at'] s_db = course_sched_entry_from_id(c['id']) except: print(f"problem with this course: {c_db}") continue sched_start = '' sched_students = '' type = '' if (s_db): sched_start = s_db['start'] sched_students =s_db['act'] type = s_db['type'] #print(s_db) num_students = student_count(c['id']) tchr = teacher_list(c['id']) tt = ','.join([x[1] for x in tchr]) line = f"{c['id']},{c['course_code']},{c['default_view']},{type},{c['workflow_state']},{sched_start},{ilearn_start},{sched_students},{num_students},{len(tchr)},{tt}" print(line) outp.write(line + "\n") return tup = tuple("id course_code default_view workflow_state".split(" ")) smaller = [ funcy.project(x , tup) for x in courses ] #print(json.dumps(smaller, indent=2)) by_code = {} (connection,cursor) = db() (pub, not_pub) = funcy.split( lambda x: x['workflow_state'] == "available", smaller) for S in smaller: print(S) by_code[ S['course_code'] ] = str(S) + "\n" outp.write( str(S) + "\n" ) q = """SELECT c.id AS courseid, c.code, tt.name, c.state, COUNT(u.id) AS student_count FROM courses AS c JOIN enrollment AS e ON e.course_id=c.id JOIN users AS u ON u.id=e.user_id JOIN ( SELECT c.id AS courseid, u.id AS userid, c.code, u.name FROM courses AS c JOIN enrollment AS e ON e.course_id=c.id JOIN users AS u ON u.id=e.user_id WHERE c.canvasid=%s AND e."type"="TeacherEnrollment" ) AS tt ON c.id=tt.courseid WHERE c.canvasid=%s AND e."type"="StudentEnrollment" GROUP BY c.code ORDER BY c.state, c.code""" % (S['id'],S['id']) result = cursor.execute(q) for R in result: print(R) by_code[ S['course_code'] ] += str(R) + "\n" outp.write( str(R) + "\n\n" ) pages = fetch(url + "/api/v1/courses/%s/pages" % S['id']) by_code[ S['course_code'] ] += json.dumps(pages, indent=2) + "\n\n" modules = fetch(url + "/api/v1/courses/%s/modules" % S['id']) by_code[ S['course_code'] ] += json.dumps(modules, indent=2) + "\n\n" print() out2 = codecs.open('cache/summary2.txt','w', 'utf-8') for K in sorted(by_code.keys()): out2.write('\n------ ' + K + '\n' + by_code[K]) out2.flush() return #published = list(funcy.where( smaller, workflow_state="available" )) #notpub = list(filter( lambda x: x['workflow_state'] != "available", smaller)) notpub_ids = [ x['id'] for x in notpub ] #for ix in notpub_ids: # # print(course_quick_stats(ix)) outp.write(json.dumps(courses, indent=2)) outp2 = codecs.open('cache/term_summary_pub.txt','w','utf-8') outp2.write("PUBLISHED\n\n" + json.dumps(published, indent=2)) outp2.write("\n\n---------\nNOT PUBLISHED\n\n" + json.dumps(notpub, indent=2)) def course_term_summary_2(): lines = codecs.open('cache/term_summary.txt','r','utf-8').readlines() output = codecs.open('cache/term_summary.html','w','utf-8') for L in lines: try: L = L.strip() print(L) if re.search('unpublished',L): m = re.search(r"'id': (\d+),",L) m2 = re.search(r"'course_code': '(.+?)',",L) if m: ss = "
    Course: %s
    " % ("https://ilearn.gavilan.edu/courses/"+str(m.group(1)), m2.group(1)) output.write( ss ) print(ss+"\n") except Exception as e: print(e) # check number of students and publish state of all shells in a term ''' def all_semester_course_sanity_check(): term = "su25" target_start = "6-14" outputfile = f'cache/courses_checker_{term}.csv' t = 288 c = getCoursesInTerm(t,1,0) sched1 = requests.get(f"http://gavilan.cc/schedule/{term}_sched_expanded.json").json() sched = { x['crn']: x for x in sched1 } #codecs.open('cache/courses_in_term_{t}.json','w','utf-8').write(json.dumps(c,indent=2)) #output = codecs.open('cache/courses_w_sections.csv','w','utf-8') #output.write( ",".join(['what','id','parent_course_id','sis_course_id','name']) + "\n" ) output2 = codecs.open(outputfile,'w','utf-8') output2.write( ",".join(['id','sis_course_id','name','state','mode','startdate','students']) + "\n" ) htmlout = codecs.open(f'cache/courses_checker_{term}.html','w','utf-8') htmlout.write('\n') htmlout.write(f'\n') html_sections = [] i = 0 for course in c: try: u2 = url + '/api/v1/courses/%s?include[]=total_students' % str(course['id']) course['info'] = fetch(u2) # correlate to schedule crn = course['sis_course_id'][7:] ctype = '?' cstart = '?' ts = '?' if crn in sched: ctype = sched[crn]['type'] cstart = sched[crn]['start'] ts = sched[crn]['act'] teacher = sched[crn]['teacher'] info = [ 'course', course['id'], '', course['sis_course_id'], course['name'], course['workflow_state'], ts ] info = list(map(str,info)) info2 = [ course['id'], course['sis_course_id'], course['name'], course['workflow_state'], ctype, cstart, ts, teacher ] info2 = list(map(str,info2)) output2.write( ",".join(info2) + "\n" ) output2.flush() print(info2) #output.write( ",".join(info) + "\n" ) uu = f"https://ilearn.gavilan.edu/courses/{course['id']}" if course["workflow_state"]=='unpublished' and ctype=='online' and cstart==target_start: html_sections.append(f'\n') #uu = url + '/api/v1/courses/%s/sections' % str(course['id']) #course['sections'] = fetch(uu) #s_info = [ [ 'section', y['id'], y['course_id'], y['sis_course_id'], y['name'], y['total_students'] ] for y in course['sections'] ] #for row in s_info: # print(row) # output.write( ",".join( map(str,row) ) + "\n" ) #output.flush() i += 1 #if i % 5 == 0: # codecs.open('cache/courses_w_sections.json','w','utf-8').write(json.dumps(c,indent=2)) except Exception as e: print(f"error on {course}") print(f"{e}") #codecs.open('cache/courses_w_sections.json','w','utf-8').write(json.dumps(c,indent=2)) html_sections.sort() for h in html_sections: htmlout.write(h) htmlout.write('
    NameSIS IDStateModeStart Date# Stu
    {course["name"]}{course["sis_course_id"]}{course["workflow_state"]}{ctype}{cstart}{ts}{teacher}
    \n') print(f"wrote to {outputfile}") ''' def eslCrosslister(): fives = [] sevens = [] others = [] course_by_crn = {} sections = {} combos = [ [y.strip() for y in x.split(',') ] for x in open('cache/xcombos.txt','r').readlines() ] combo_checklist = [ 0 for i in range(len(combos)) ] #print("\n\nCombos:") #[ print("%s - %s" % (x[0],x[1])) for x in combos] #return courses = getCoursesTermSearch(62,"ESL",0) for C in courses: ma = re.search( r'(\d{5})', C['name']) if ma: #print("Found Section: %s from course %s" % (ma.group(1), C['name'])) C['crn'] = ma.group(1) course_by_crn[C['crn']] = C if C['name'].startswith("ESL5"): fives.append(C) elif C['name'].startswith("ESL7"): sevens.append(C) else: others.append(C) for S in sevens: uu = url + '/api/v1/courses/%i/sections' % S['id'] #print(uu) c_sect = fetch(uu) print(".",end='') #print(json.dumps(c_sect,indent=2)) if len(c_sect) > 1: print("* * * * Already Crosslisted!!") if c_sect: sections[ S['id'] ] = c_sect[0]['id'] S['sectionid'] = c_sect[0]['id'] if S['crn']: for i,co in enumerate(combos): if S['crn'] == co[0]: S['partner'] = co[1] combo_checklist[i] = 1 course_by_crn[co[1]]['partner'] = S elif S['crn'] == co[1]: S['partner'] = co[0] combo_checklist[i] = 1 course_by_crn[co[0]]['partner'] = S print("Others:") for F in sorted(others, key=lambda x: x['name']): print(courseLineSummary(F)) print("\n\nFive hundreds") for F in sorted(fives, key=lambda x: x['name']): print(courseLineSummary(F)) print("\n\nSeven hundreds") for F in sorted(sevens, key=lambda x: x['name']): print(courseLineSummary(F,sections)) print("\n\nMake a x-list: ") for F in sorted(fives, key=lambda x: x['name']): if 'partner' in F: print(xlistLineSummary(F,sections)) if 'partner' in F and 'sectionid' in F['partner']: if not input('ready to crosslist. Are you? Enter "q" to quit. ') == 'q': xlist( F['partner']['sectionid'], F['id'] ) else: break for i,c in enumerate(combo_checklist): if not c: print("Didn't catch: "+ str(combos[i])) def xlist_iii(parasite='', host=''): # section id , new course id host = host or input("ID number of the HOSTING COURSE? ") if not parasite: parasite = input("ID number of the SECTION to add to above? (or 'q' to quit) ") while parasite != 'q': #h_sections = fetch( url + "/api/v1/courses/%s/sections" % str(host)) #print(h_sections) p_sections = fetch( url + "/api/v1/courses/%s/sections" % str(parasite)) #print(p_sections) parasite_section = p_sections[0]['id'] # TODO need to get the section id from each course: # GET /api/v1/courses/:course_id/sections # POST /api/v1/sections/:id/crosslist/:new_course_id # SECTION ID (to move) NEW __COURSE__ ID u = url + "/api/v1/sections/%s/crosslist/%s" % (str(parasite_section),str(host)) print(u) res = requests.post(u, headers = header) print(res.text) parasite = input("ID number of the SECTION to add to above? ") def unenroll_student(courseid,enrolid): t = url + "/api/v1/courses/%s/enrollments/%s" % ( str(courseid), str(enrolid) ) data = {"task": "delete" } r4 = requests.delete(t, headers=header, params=data) print(data) #def get_enrollments(courseid): # t = url + "/api/v1/courses/%s/enrollments?type=StudentEnrollment" % courseid # return fetch(t,1) def enroll_id_list_to_shell(id_list, shell_id, v=0): # id list has pairs, [id,name] id_list = set([i[0] for i in id_list]) existing = course_enrollment(shell_id) # by user_id existing_ids = set( [ x['user_id'] for x in existing.values() ]) if v: print("To Enroll: %s" % str(id_list)) if v: print(r"\n\Already Enrolled: %s" % str(existing_ids)) enroll_us = id_list.difference(existing_ids) if v: print("\n\nTO ENROLL %s" % str(enroll_us)) (connection,cursor) = db() for j in enroll_us: try: q = "SELECT name,id FROM canvas.users u WHERE u.id=%s" % j cursor.execute(q) s = cursor.fetchall() if s: s = s[0] print("Enrolling: %s" % s[0]) t = url + '/api/v1/courses/%s/enrollments' % shell_id data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment', 'enrollment[enrollment_state]': 'active' } r3 = requests.post(t, headers=header, params=data) #print(r3.text) time.sleep(0.600) except Exception as e: print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e))) # multiple semesters def enroll_stem_students_live(): semesters = [289] for S in semesters: enroll_stem_students_live_semester(S) def enroll_stem_students_live_semester(the_term, do_removes=0): import localcache2 depts = "MATH BIO CHEM CSIS PHYS PSCI GEOG ASTR ECOL ENVS ENGR STAT".split(" ") users_to_enroll = users_in_by_depts_live(depts, the_term) # term id stem_enrollments = course_enrollment_with_faculty(stem_course_id) # by user_id users_in_stem_shell = set( [ x['user_id'] for x in stem_enrollments.values() ]) print("ALL STEM STUDENTS %s" % str(users_to_enroll)) print("\n\nALREADY IN STEM SHELL %s" % str(users_in_stem_shell)) enroll_us = users_to_enroll.difference(users_in_stem_shell) remove_us = users_in_stem_shell.difference(users_to_enroll) print("\n\nTO ENROLL %s" % str(enroll_us)) (connection,cursor) = localcache2.db() #xyz = input('enter to continue') eee = 0 uuu = 0 if do_removes: print("\n\nTO REMOVE %s" % str(remove_us)) for j in remove_us: try: q = "SELECT name,id FROM canvas.users WHERE id=%s" % j cursor.execute(q) s = cursor.fetchall() if s: s = s[0] print("Removing: %s" % s[0]) r1 = unenroll_student(str(stem_course_id), stem_enrollments[j]['id']) print(r1) uuu += 1 time.sleep(0.600) except Exception as e: print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e))) for j in enroll_us: try: q = "SELECT name,id FROM canvas.users WHERE id=%s" % j cursor.execute(q) s = cursor.fetchall() if s: s = s[0] print("Enrolling: %s" % s[0]) enrollment = { } #print(s) t = url + '/api/v1/courses/%s/enrollments' % stem_course_id data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment', 'enrollment[enrollment_state]': 'active' } #print(data) #if input('enter to enroll %s or q to quit: ' % s[0]) == 'q': #break r3 = requests.post(t, headers=header, params=data) print(data) eee += 0 time.sleep(0.600) except Exception as e: print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e))) #print(r3.text) print("\n\nTO ENROLL %s" % str(enroll_us)) #print("\n\nTO REMOVE %s" % str(remove_us)) return (eee,uuu) ########################### def enroll_bulk_students_bydept(course_id, depts, the_term="172", cautious=1): # a string, a list of strings users_to_enroll = users_in_by_depts_live(depts, the_term) # term id targeted_enrollments = course_enrollment(course_id) # by user_id.. (live, uses api) current_enrollments = set( [ x['user_id'] for x in targeted_enrollments.values() ]) print("ALL TARGET STUDENTS %s" % str(users_to_enroll)) print("\nALREADY IN SHELL %s" % str(current_enrollments)) enroll_us = users_to_enroll.difference(current_enrollments) remove_us = current_enrollments.difference(users_to_enroll) print("\n\nTO ENROLL %s" % str(enroll_us)) xyz = input('enter to continue') print("\n\nTO REMOVE %s" % str(remove_us)) (connection,cursor) = db() for j in remove_us: try: q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j cursor.execute(q) s = cursor.fetchall() if s: s = s[0] print("Removing: %s" % s[0]) ## TODO not done here # r1 = unenroll_student(str(course_id), stem_enrollments[j]['id']) #print(r1) time.sleep(0.600) except Exception as e: print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e))) for j in enroll_us: try: q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j cursor.execute(q) s = cursor.fetchall() if s: s = s[0] print("Enrolling: %s" % s[0]) enrollment = { } #print(s) t = url + '/api/v1/courses/%s/enrollments' % course_id data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment', 'enrollment[enrollment_state]': 'active' } if cautious: print(t) print(data) prompt = input('enter to enroll %s, k to go ahead with everyone, or q to quit: ' % s[0]) if prompt == 'q': break elif prompt == 'k': cautious = 0 r3 = requests.post(t, headers=header, params=data) if cautious: print(data) time.sleep(0.600) except Exception as e: print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e))) #print(r3.text) def enroll_gott_workshops(): # stupid gav tls broken r = requests.get("https://www.gavilan.edu/staff/tlc/signups.php") text = r.text # Regex to extract the JSON object match = re.search(r"var\s+signups\s*=\s*(\[\{.*?\}\]);", text, re.DOTALL) if match: json_str = match.group(1) # Extract the JSON string try: signups = json.loads(json_str) # Convert to Python list of dicts # Normalize NBSP and spaces in key fields to make title/date matching robust def _norm(v): try: return str(v).replace('\xa0',' ').strip() except Exception: return v for s in signups: if isinstance(s, dict): if 'training' in s and s['training'] is not None: s['training'] = _norm(s['training']) if 'date_rsvp' in s and s['date_rsvp'] is not None: s['date_rsvp'] = _norm(s['date_rsvp']) #print(json.dumps(signups,indent=2)) except json.JSONDecodeError as e: print("Error decoding JSON:", e) return else: print("JSON object not found") return #signups = json.loads(r.text) #signups = json.loads(codecs.open('cache/signups.json','r','utf-8').read()) # update w/ users.py #1 all_staff = json.loads(codecs.open('cache/ilearn_staff.json','r','utf-8').read()) by_email = { x['email'].lower():x for x in all_staff } #print(by_email.keys()) workshop_ids = [ #'GOTT 2: Intro to Async Online Teaching and Learning2023-07-09 17:00:00': 17992, #'GOTT 4: Assessment in Digital Learning2023-07-09 17:00:00': 17995, #'Restricted to STEM faculty. Humanizing (STEM) Online Learning 2023-06-18 17:00:00': 17996, #'GOTT 6: Online Live Teaching and Learning2023-06-11 17:00:00': 17986, #'GOTT 5: Essentials of Blended Learning2023-06-25 17:00:00': 17987, #'GOTT 5: Essentials of Blended Learning (HyFlex)2023-06-25 17:00:00': 17987, #'GOTT 1: Intro to Teaching Online with Canvas2023-05-29 17:00:00': 17985, #'GOTT 1: Intro to Teaching Online with Canvas2023-08-20 17:00:00': 17994 #'GOTT 1: Intro to Online Teaching2024-01-02 16:00:00': 19278, #'GOTT 2: Intro to Asynchronous Teaching and Learning2024-01-02 16:00:00': 19222, #'GOTT 5: Essentials of Blended Learning2024-01-02 16:00:00': 19223, #'GOTT 6: Intro to Live Online Teaching and Learning2024-01-14 16:00:00': 19224, #'5/28-6/9 GOTT 1: Intro to Teaching Online 2024-05-28 12:00:00': 20567, #'5/28-6/21 GOTT 2: Introduction to Asynchronous Teaching and Design2024-05-28 12:00:00': 20575, #'GOTT 4: Assessment in Digital Learning2024-06-02 17:00:00': 20600, # 6/2 #'6/10-6/23 GOTT 5: Essentials of Blended Learning, Hyflex2024-06-10 12:00:00': 20568, #'6/17-6/30 GOTT 6 Introduction to Live Online Teaching and Learning2024-06-17 12:00:00': 20569, #'GOTT 1 Intro to Teaching Online AUG242024-07-29 12:00:00': 20603, # 7/29 #['2025-01-01 16:00:00 GOTT 1: Intro to Teaching Online with Canvas', 21770, 'enroll_gott1.txt'], #['2025-01-01 16:00:00 GOTT 2: Introduction to Asynchronous Teaching and Design', 21772, 'enroll_gott2.txt'] # date, title, shell_id #['2025-02-23 16:00:00', 'GOTT 6: Intro to Synchronous Teaching (Sync/Hyflex)', 21835], #['2025-03-14 17:00:00', 'GOTT 5: The Essentials of Blended Learning (Hybrid) ', '21886'], #['2025-02-23 16:00:00', 'GOTT 1: Intro to Teaching Online (2 week, async)', 21874] #['2025-05-26 17:00:00', 'GOTT 2: Introduction to Asynchronous Teaching and Learning', 23015], #['2025-06-01 17:00:00', 'GOTT 1: Intro to Teaching Online', 23083], #['2025-06-01 17:00:00', 'GOTT 4: Assessments in Digital Learning', 21898], #['2025-08-11 13:00:00', 'GOTT 1: Introduction to Online Teaching with Canvas', 23232], #['2025-09-01 17:00:00', r'GOTT 1: Intro to Online Teaching (Canvas, Accessibility and RSI) ', 23270], #['2025-09-14 17:00:00', r'GOTT 2: Intro to Asynchronous Online Teaching and Learning', 23290], ['2025-09-28 17:00:00', r'GOTT 6: Foundation of Teaching Live Online/Hyflex', 23311], ] #print(json.dumps(signups,indent=4)) #print(json.dumps(by_email,indent=4)) subs = {'csalvin@gavilan.edu':'christinasalvin@gmail.com', 'karenjeansutton@gmail.com': 'ksutton@gavilan.edu', 'elisepeeren@gmail.com': 'epeeren@gavilan.edu', 'kjoyenderle@gmail.com': 'kenderle@gavilan.edu', 'flozana@gmail.com': 'flozano@gavilan.edu', 'fyarahmadi2191@gmail.com': 'fyarahmadi@gavilan.edu', 'jacquelinejeancollins@yahoo.com': 'jcollins@gavilan.edu', 'bt@gavilan.edu': 'btagg@gavilan.edu', 'tagg.brian@yahoo.com': 'btagg@gavilan.edu', 'tmiller.realestate@gmail.com': 'tmiller@gavilan.edu', 'gemayo70@yahoo.com': 'pclaros@gavilan.edu', 'csalvin@gmail.com': 'csalvin@gavilan.edu', 'efalvey@aol.com': 'efalvey@gavilan.edu', 'lorrmay36@mac.com': 'llevy@gavilan.edu', 'gkalu1@gmail.com': 'gkalu@gavilan.edu', 'rpotter@gav.edu': 'rpotter@gavilan.edu', 'ally162@qq.com': 'aao@gavilan.edu', 'davidamancio791@gmail.com': 'damancio@gavilan.edu', 'carissaamunoz83@gmail.com': 'amunoz@gavilan.edu', 'jasonwcpa@yahoo.com': 'jwolowitz@gavilan.edu', 'fam.grzan@charter.net': 'rgrzan@gavilan.edu', 'carissaadangelo@yahoo.com': 'cmunoz@gavilan.edu', } for each_workshop in workshop_ids: #if wkshp not in workshop_ids: # print(f"skipping {wkshp}") # continue wkshp_date, wkshp_title, wkshp_shell_id = each_workshop # local normalizer consistent with signup cleaning def _norm(v): try: return str(v).replace('\xa0',' ').strip() except Exception: return v to_enroll = [] #from_file = [ L.strip().split(' - ') for L in codecs.open(f'cache/{student_list}', 'r', 'utf-8').readlines() ] #print(from_file) for s in signups: if _norm(wkshp_date) == _norm(s.get('date_rsvp')) and _norm(wkshp_title) == _norm(s.get('training')): e = s['email'].lower() if e in subs: e = subs[e] print( f"{wkshp_title} {e} {s['name']}" ) if e in by_email: user = by_email[e] #print(f"\t{user['name']} {e} {user['login_id']}") to_enroll.append([user['id'],user['name']]) else: #print("** ** NOT FOUND") pass print(f"Workshop: {wkshp_date} {wkshp_title} \n\tEnrolling: {', '.join(i[1] for i in to_enroll)}") enroll_id_list_to_shell(to_enroll, wkshp_shell_id) def enroll_gnumber_list_to_courseid(): infile = codecs.open('cache/gottenrollments.txt','r','utf-8').readlines() courseid = infile[0].strip() glist = [ x.strip().split(',')[0] for x in infile[1:] ] from localcache2 import user_from_goo idlist = [user_from_goo(x)['id'] for x in glist ] namelist = [user_from_goo(x)['name'] for x in glist ] print(courseid) print(glist) print(idlist) for i,id in enumerate(idlist): try: print(f"Enrolling: {id}, {namelist[i]}") t = f"{url}/api/v1/courses/{courseid}/enrollments" data = { 'enrollment[user_id]': id, 'enrollment[type]':'StudentEnrollment', 'enrollment[enrollment_state]': 'active' } r3 = requests.post(t, headers=header, params=data) print(r3.text) time.sleep(0.600) except Exception as e: print(f"Something went wrong with id {id}, course {courseid}, user {namelist[i]}") def enroll_art_students_live(): depts = "THEA ART DM MUS MCTV".split(" ") course_id = "13717" enroll_bulk_students_bydept(course_id,depts) print("done.") def enroll_orientation_students(): # For testing purposes DO_IT = 1 import localcache2 ori_shell_id = "20862" # 2025 "19094" # 2024 # "" # 2023 orientation shell 15924 # 2022: "9768" print("Getting users in orientation shell") #users_in_ori_shell = set( \ # [ str(x['user_id']) for x in course_enrollment(ori_shell_id).values() ]) # api fetch users_in_ori_shell = list(user_ids_in_shell(ori_shell_id)) # single semester # users_to_enroll = users_new_this_semester(the_semester) ### ##### USES LOCAL DB # double semester (SU + FA) users_to_enroll = users_new_this_2x_semester("202550", "202570") ##### USES LOCAL DB #print("ALL ORIENTATION STUDENTS %s" % str(users_to_enroll)) #print("\n\nALREADY IN ORI SHELL %s" % str(users_in_ori_shell)) enroll_us = users_to_enroll.difference(users_in_ori_shell) #print("\n\nTO ENROLL %s\n" % str(enroll_us)) print(f"{len(enroll_us)} new students to enroll in Orientation shell." ) eee = 0 uuu = 0 (connection,cursor) = localcache2.db() for j in enroll_us: s = "" try: q = "SELECT name,id FROM canvas.users WHERE id=%s" % j cursor.execute(q) s = cursor.fetchall() if s: s = s[0] print(" + Enrolling: %s" % s[0]) t = url + '/api/v1/courses/%s/enrollments' % ori_shell_id data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment', 'enrollment[enrollment_state]': 'active' } #print(t) #print(data) if DO_IT: r3 = requests.post(t, headers=header, params=data) eee += 1 #print(r3.text) time.sleep(0.250) except Exception as e: print(" - Something went wrong with id %s, %s, %s" % (j, str(s), str(e))) # return (eee,uuu) def enroll_o_s_students(): #full_reload() (es,us) = enroll_stem_students_live() (eo, uo) = enroll_orientation_students() print("Enrolled %i and unenrolled %i students in STEM shell" % (es,us)) print("Enrolled %i students in Orientation shell" % eo) def make_ztc_list(sem='sp20'): sched = json.loads(open('output/semesters/2020spring/sp20_sched.json','r').read()) responses = open('cache/ztc_responses_sp20.csv','r').readlines()[1:] result = open('cache/ztc_crossref.csv','w') result.write('Course,Section,Name,Teacher,ZTC teacher\n') ztc_by_dept = {} for R in responses: R = re.sub(',Yes','',R) R = re.sub(r'\s\s+',',',R) parts = R.split(r',') #name courselist yes #print(parts[1]) name = parts[0] for C in parts[1:] : C = C.strip() #print(C) if C in ztc_by_dept: ztc_by_dept[C] += ', ' + parts[0] else: ztc_by_dept[C] = parts[0] print(ztc_by_dept) for CO in sched: #if re.match(r'CWE',CO['code']): #print(CO) if CO['code'] in ztc_by_dept: print(('Possible match, ' + CO['code'] + ' ' + ztc_by_dept[CO['code']] + ' is ztc, this section taught by: ' + CO['teacher'] )) result.write( ','.join( [CO['code'] ,CO['crn'] , CO['name'] , CO['teacher'] , ztc_by_dept[CO['code']] ]) + "\n" ) def course_search_by_sis(): term = 65 all_courses = getCoursesInTerm(term) all = [] for course in all_courses: #u = "/api/v1/accounts/1/courses/%s" % course_id #i = fetch( url + u) all.append([ course['name'], course['sis_course_id'] ]) print_table(all) # print(json.dumps(x, indent=2)) # run overview_start_dates to get most recent info def set_custom_start_dates(): from datetime import datetime term = find_term( input("term? (ex: fa25) ") ) if not term or (not 'canvas_term_id' in term) or (not 'code' in term): print(f"Couldn't find term. Try updating the saved terms list.") return TERM = term['canvas_term_id'] SEM = term['code'] term_start_month = int(term['begin'].split('/')[0]) term_start_day = int(term['begin'].split('/')[1]) term_start_year = '20' + term['code'][2:4] print(f"term begins on {term_start_month}/{term_start_day}") output_path = f"cache/overview_semester_shells_annotated{SEM}.csv" input_path = f"cache/overview_semester_shells_{SEM}.csv" if not os.path.exists(input_path): print(f"file does not exist: {input_path}") print("Run overview_start_dates first") return make_changes = 1 do_all = 0 get_fresh = 0 # just do certain ids in cache/changeme.txt limit_to_specific_ids = 0 limit_to = [x.strip() for x in open('cache/changeme.txt','r').readlines()] def adjust_shell_startdate(row): # Placeholder stub pass def parse_date(date_str): if not date_str or date_str.lower() == 'none': return None try: return datetime.fromisoformat(date_str.replace("Z", "").replace("T", " ")) except ValueError: return None with open(input_path, newline='', encoding='utf-8') as infile, \ open(output_path, "w", newline='', encoding='utf-8') as outfile: reader = csv.DictReader(infile) fieldnames = reader.fieldnames + [ "ignore","is_early_start", "is_late_start", "shell_custom_start", "shell_warn_crosslist_sections" ] writer = csv.DictWriter(outfile, fieldnames=fieldnames) writer.writeheader() for row in reader: if int(row["shell_numsections"]) == 0: continue sched_start = parse_date(row["sched_start"]) shell_start = parse_date(row["shell_start"]) shortname = row["shell_shortname"] num_sections = int(row["shell_numsections"]) # Initialize new columns row["ignore"] = "" row["is_early_start"] = "" row["is_late_start"] = "" row["shell_custom_start"] = "" row["shell_warn_crosslist_sections"] = "" # check for cops program department = shortname.split()[0].rstrip("0123456789") # → "JLE" if department in ("JLE", "JFT"): row["ignore"] = department # Early/late start check if sched_start: sched_mmdd = (sched_start.month, sched_start.day) term_mmdd = (term_start_month, term_start_day) if sched_mmdd < term_mmdd: row["is_early_start"] = sched_start.date().isoformat() elif sched_mmdd > term_mmdd: row["is_late_start"] = sched_start.date().isoformat() # shell_start override if shell_start: row["shell_custom_start"] = shell_start.date().isoformat() else: if row["is_early_start"] or row["is_late_start"]: adjust_shell_startdate(row) # Crosslist check if '/' in shortname: parts = shortname.split() section_part = parts[-1] section_count = len(section_part.split('/')) if section_count != num_sections: row["shell_warn_crosslist_sections"] = section_part writer.writerow(row) return ''' # Do we adjust the start date? Only if it doesn't match term if d_start.month == term_start_month and d_start.day == term_start_day: print(" Ignoring, term start date" ) continue else: print(" Adjust course start day?") if make_changes: if do_all != 'a': do_all = input(' -> adjust? [enter] for yes, [a] to do all remaining. [n] to quit. >') if do_all == 'n': exit() if do_all == '' or do_all == 'a': data = {'course[start_at]':d_start.isoformat(), 'course[restrict_student_future_view]': True, 'course[restrict_enrollments_to_course_dates]':True } u2 = f"https://gavilan.instructure.com:443/api/v1/courses/{this_id}" r3 = requests.put(u2, headers=header, params=data) print(" updated.. OK") ''' def overview_start_dates(): term = find_term( input("term? (ex: fa25) ") ) if not term or (not 'canvas_term_id' in term) or (not 'code' in term): print(f"Couldn't find term.") return TERM = term['canvas_term_id'] SEM = term['code'] output = codecs.open(f"cache/overview_semester_shells_{SEM}.csv","w","utf-8") get_fresh = 0 if not get_fresh: gf = input('Fetch new list of semester courses? (y/n) ') if gf=='y': get_fresh = 1 # get list of online course shells c = getCoursesInTerm(TERM,get_fresh,0) # dict to match section numbers between shells and schedule crn_to_canvasid = {} for C in c: if 'sis_course_id' in C and C['sis_course_id']: #print( f"{C['name']} -> {C['sis_course_id'][7:13]}" ) crn_to_canvasid[C['sis_course_id'][7:13]] = str(C['id']) else: print( f"---NO CRN IN: {C['name']} -> {C}" ) header = f"id,shell_shortname,type,enrolled,max,sched_start,shell_start,shell_end,shell_restrict_view_dates,shell_restrict_view_dates,shell_state,shell_numstudents,shell_numsections" output.write(header + "\n") print("\n\n" + header) # get course info from schedule s = requests.get(f"https://gavilan.cc/schedule/{SEM}_sched_expanded.json").json() for S in s: # get dates start = re.sub( r'\-','/', S['start']) + '/20' + SEM[2:4] d_start = datetime.strptime(start,"%m/%d/%Y") # try to find online shell matching this schedule entry try: this_id = crn_to_canvasid[S['crn']] except Exception as e: print(f"DIDN'T FIND CRN - {start} {d_start} - {S['code']} {S['crn']} {S['name']}" ) continue # get more canvas course shell info uu = f"{url}/api/v1/courses/{this_id}" this_course = fetch(uu) shell_start = this_course['start_at'] shell_end = this_course['end_at'] shell_restrict_view_dates = '?' if 'access_restricted_by_date' in this_course: shell_restrict_view_dates = this_course['access_restricted_by_date'] shell_shortname = this_course['course_code'] shell_state = this_course['workflow_state'] # get user count ss = f"{url}/api/v1/courses/{this_id}/users" enrollments = fetch(ss, params={"enrollment_type[]":"student"}) shell_numstudents = len(enrollments) # get teachers s2 = f"{url}/api/v1/courses/{this_id}/users" teachers = fetch(s2, params={"enrollment_type[]":"teacher"}) shell_teachers = [(x['id'],x['name']) for x in teachers] # cross-listed? sec = f"{url}/api/v1/courses/{this_id}/sections" sections = fetch(sec, params={"include[]":"total_students"}) shell_numsections = len(sections) content = f"{this_id},{shell_shortname},{S['type']},{S['act']},{S['cap']},{d_start},{shell_start},{shell_end},{shell_restrict_view_dates},{shell_restrict_view_dates},{shell_state},{shell_numstudents},{shell_numsections},{S['teacher']},{shell_teachers}" output.write(content + "\n") print(content) def course_by_depts_terms(section=0): get_fresh = 1 TERM = 287 WI_TERM = 286 DOING_WINTER_MOVES = 1 SEM = "sp25" make_changes = 1 do_all = 0 winter_start_day = 2 aviation_start_day = 9 nursing_start_day = 0 spring_start_day = 27 # get list of online course shells if get_fresh: print(f"Getting list of courses in {SEM}") c = getCoursesInTerm(TERM,get_fresh,0) codecs.open(f'cache/courses_in_term_{TERM}.json','w','utf-8').write(json.dumps(c,indent=2)) else: c = json.loads( codecs.open(f'cache/courses_in_term_{TERM}.json','r','utf-8').read() ) # dict to match section numbers between shells and schedule crn_to_canvasid = {} for C in c: if 'sis_course_id' in C and C['sis_course_id']: print( f"{C['name']} -> {C['sis_course_id'][7:13]}" ) crn_to_canvasid[C['sis_course_id'][7:13]] = str(C['id']) else: print( f"---NO CRN IN: {C['name']} -> {C}" ) # get course info from schedule s = requests.get(f"http://gavilan.cc/schedule/{SEM}_sched_expanded.json").json() for S in s: # get dates start = re.sub( r'\-','/', S['start']) + '/20' + SEM[2:4] d_start = datetime.strptime(start,"%m/%d/%Y") # try to find online shell matching this schedule entry try: this_id = crn_to_canvasid[S['crn']] except Exception as e: print(f"DIDN'T FIND CRN - {start} {d_start} - {S['code']} {S['crn']} {S['name']}" ) continue print(f" - {start} {d_start} - id: {this_id} - {S['code']} {S['crn']} {S['name']}" ) if 1: #if d_start.month < 5 or d_start.month > 7: # print(f" Ignoring {d_start}, starting too far away...") # continue if d_start.month == 1 and d_start.day == aviation_start_day: print("- Aviation ", start, d_start, " - ", S['code'], " ", S['crn'] ) continue #if d_start.month == 1 and d_start.day == nursing_start_day: # print("- Nursing ", start, d_start, " - ", S['code'], " ", S['crn'] ) # continue if d_start.month == 1 and d_start.day == spring_start_day: print(" Ignoring, term start date" ) continue else: print(" Adjust course start day?") if make_changes: if do_all != 'a': do_all = input(' -> adjust? [enter] for yes, [a] to do all remaining. [n] to quit. >') if do_all == 'n': exit() if do_all == '' or do_all == 'a': data = {'course[start_at]':d_start.isoformat(), 'course[restrict_student_future_view]': True, 'course[restrict_enrollments_to_course_dates]':True } u2 = f"https://gavilan.instructure.com:443/api/v1/courses/{this_id}" r3 = requests.put(u2, headers=header, params=data) print(" updated.. OK") if DOING_WINTER_MOVES: if d_start.month == 1 and d_start.day == winter_start_day: print("+ winter session: ", d_start, " - ", S['code']) data = {'course[term_id]':WI_TERM} u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s" % crn_to_canvasid[S['crn']] if make_changes: r3 = requests.put(u2, headers=header, params=data) print(" updated.. OK") #print(r3.text) return def xlist_cwe(): # cwe190 and wtrm290 get put into 1 shell # cwe192 get put into another shell this_sem_190_id = 22890 # they get 190s and 290s this_sem_192_id = 22894 # they get 192s # this_sem_term = 289 term = find_term( input("term? (ex: fa25) ") ) if not term or (not 'canvas_term_id' in term) or (not 'code' in term): print(f"Couldn't find term.") return this_sem_term = term['canvas_term_id'] SEM = term['code'] get_fresh = 1 sem_courses = getCoursesInTerm(this_sem_term, get_fresh, 0) for search_string in ['CWE190','WTRM290']: for R in sem_courses: try: if re.search(search_string, R['name']) and str(R['id']) != str(this_sem_190_id): # use the course to get the section id print ( R['name'] ) u = url + '/api/v1/courses/%i/sections' % R['id'] for S in fetch(u): if (S['id']): myanswer = input( "-> Should I crosslist: %i\t%s\tsection id: %i (y/n) " % (R['id'],R['name'],S['id'] )) if myanswer=='y': # cross list v = url + "/api/v1/sections/%i/crosslist/%i" % (S['id'],this_sem_190_id) res = requests.post(v, headers = header) print( json.dumps( json.loads(res.text), indent=2) ) print() except Exception as e: print( "Caused a problem: " + str(e) + "\n" + str(R) + "\n" ) ## Now the 192s search_string = "CWE192" for R in sem_courses: try: if re.search(search_string, R['name']) and str(R['id']) != str(this_sem_192_id): # use the course to get the section id print ( R['name'] ) u = url + '/api/v1/courses/%i/sections' % R['id'] for S in fetch(u): if (S['id']): myanswer = input( "-> Should I crosslist: %i\t%s\tsection id: %i (y/n) " % (R['id'],R['name'],S['id'] )) if myanswer=='y': # cross list v = url + "/api/v1/sections/%i/crosslist/%i" % (S['id'],this_sem_192_id) res = requests.post(v, headers = header) print( json.dumps( json.loads(res.text), indent=2) ) print() except Exception as e: print( "Caused a problem: " + str(e) + "\n" + str(R) + "\n" ) def change_term(courseid,termid): data = { 'course[term_id]': str(termid), 'course[restrict_enrollments_to_course_dates]':'false' } t = f'{url}/api/v1/courses/{courseid}' r3 = requests.put(t, headers=header, params=data) result = json.loads(r3.text) json.dumps(result,indent=2) def teacher_to_many_shells(): for id in [21842,21096,20952,21561,21219,21274,20875,21018,21093,21719,21460,21102,21506,21169,21538]: #print(id) #continue term_id_misc = 9 term_id_sp25 = 287 change_term(id, term_id_misc) # Add teacher u3 = url + f"/api/v1/courses/{id}/enrollments" #usrid = input("id of %s? " % N) usrid = '30286' data2 = { "enrollment[type]":"TeacherEnrollment", "enrollment[user_id]":usrid, "enrollment[enrollment_state]":"active" } r4 = requests.post(u3, headers=header, params=data2) result = json.loads(r4.text) print(json.dumps(result, indent=2)) print(f"enrolled user id: {usrid} as teacher in course {id}.") change_term(id, term_id_sp25) import os, pickle def create_sandboxes(): ## TODO: read all student names and determine ahead of time if initials conflict. deal with them courses_to_sandbox = [ #(20567, ' Sandbox GOTT1 SU24'), #(20575, ' Sandbox GOTT2 SU24'), #(20600, ' Sandbox GOTT4 SU24'), #(19223, ' Sandbox GOTT5 WI24'), #(19224, ' Sandbox GOTT6 WI24'), #(20761, ' Sandbox GOTT1 FA24'), #(21770, ' Sandbox GOTT1 WI25'), #(21772, ' Sandbox GOTT2 WI25'), #(23083, ' Sandbox GOTT1 SU25'), #(23015, ' Sandbox GOTT2 SU25'), #(21898, ' Sandbox GOTT4 SU25'), #(23270, ' Sandbox GOTT1 FA25SEPT'), #(23290, ' Sandbox GOTT2 FA25SEPT'), (23314, ' Sandbox GOTT5 FA25'), (23315, ' Sandbox GOTT4 FA25'), ] filepath = 'cache/sandbox_courses.pkl' report = codecs.open('cache/sandbox_report.txt','a','utf-8') if os.path.exists(filepath): with open(filepath, 'rb') as f: sandbox_log = pickle.load(f) else: sandbox_log = [] for crs_id, label in courses_to_sandbox: # TODO check and skip "Test Student" crs_info = getCourses(crs_id) # print(json.dumps(crs_info,indent=2)) c_name = crs_info['name'] print(f"\nStudents in course {crs_id}: {c_name}" ) report.write(f"\nCourse: {c_name}\n" ) enrolled = course_enrollment(crs_id) for eid,stu in enrolled.items(): if stu['role'] != 'StudentEnrollment': continue u_name = stu['user']['short_name'] u_id = stu['user']['id'] initials = ''.join([ x[0] for x in u_name.split(" ") ]) print(f" id: {stu['user_id']} ititials: {initials} name: {stu['user']['short_name']} role: {stu['role']}") report.write(f" id: {stu['user_id']} ititials: {initials} name: {stu['user']['short_name']} role: {stu['role']}") coursename = f"{initials}{label}" if coursename in sandbox_log: print(f" - Already created: {coursename}") else: print(f" + Creating course: {coursename} for {u_name}, id: {u_id}") u2 = url + "/api/v1/accounts/1/courses" data = { "course[name]": coursename, "course[code]": coursename, "course[term_id]": "8", } # Create a course r3 = requests.post(u2, headers=header, params=data) new_course_response = json.loads(r3.text) id = new_course_response['id'] print(f" created course id {id}") report.write(f" link: https://ilearn.gavilan.edu/courses/{id} id: {stu['user_id']} ititials: {initials} name: {stu['user']['short_name']} role: {stu['role']}\n") # Add teacher u3 = url + f"/api/v1/courses/{id}/enrollments" data2 = { "enrollment[type]":"TeacherEnrollment", "enrollment[user_id]":u_id, "enrollment[enrollment_state]":"active" } r4 = requests.post(u3, headers=header, params=data2) print(f" enrolled user id: {u_id} as teacher.") # Desired settings data = { 'course[is_public_to_auth_users]': True, 'course[event]': 'offer' } t = url + f"/api/v1/courses/{id}" r3 = requests.put(t, headers=header, params=data) result = json.loads(r3.text) if 'name' in result: print(f" > Name: {result['name']}") if 'workflow_state' in result: print(f" > State: {result['workflow_state']}") if 'is_public_to_auth_users' in result: print(f" > Public: {result['is_public_to_auth_users']}") sandbox_log.append(coursename) # Write log back out with open(filepath, 'wb') as handle: pickle.dump(sandbox_log, handle, protocol=pickle.HIGHEST_PROTOCOL) return 1 # ('ED','82'), sandboxes = [ ('JH','45324'), ('PK','38183'), ('GM','5167'), ('BS','19231'), ('ST','303'), ('KW','5145')] sandboxes = [ ('CD','51701'), ('LC','45193'), ('JC','70'), ('DG','133'), ('JH','2816'),('SM','18812'), ('GM','211'), ('RM','45341'), ('DP','251'), ('BT','58059'), ('TT','36834') ] sandboxes = [ ('MA','8'), ('WA','15'), ('BA','18'), ('CC','51701'), ('LC','45193'), ('PC','4100'), ('ED','82'), ('KE','101'), ('OF','41897'), ('SG','115'), ('JG','37654'), ('DG','133'), ('DK','168'), ('JM','204'), ('GM', '211'), ('RM','45341'), ('CR','5655'), ('CS','272'), ('BS','19231'), ('SS', '274') ] sandboxes = [ ('SM','191')] sandboxes = [ ('KD', '2509'), ('KE', '2904'), ('SH', '144'), ('SN','60996'), ('EP', '16726'), ('PS','60938'), ('JW', '43052') ] sandboxes = [('HA','61620'), ('AS','61451'), ('MP', '11565'), ('AA','51276') ] sandboxes = [('JR','61062')] for (N,usrid) in sandboxes: coursename = f"{N} Sandbox SU23 (GOTT1)" coursecode = f"{N} SU23 Sandbox (GOTT1)" print(f"Creating course: {coursename} for {N}, id: {usrid}") u2 = url + "/api/v1/accounts/1/courses" data = { "course[name]": coursename, "course[code]": coursecode, "course[term_id]": "8", } # Create a course r3 = requests.post(u2, headers=header, params=data) course_by_dept = json.loads(r3.text) id = course_by_dept['id'] print(f"created course id {id}") report.append( f"{coursename} https://ilearn.gavilan.edu/courses/{id}" ) # Add teacher u3 = url + f"/api/v1/courses/{id}/enrollments" #usrid = input("id of %s? " % N) data2 = { "enrollment[type]":"TeacherEnrollment", "enrollment[user_id]":usrid, "enrollment[enrollment_state]":"active" } r4 = requests.post(u3, headers=header, params=data2) print(f"enrolled user id: {usrid} as teacher.") # Desired settings data = { 'course[is_public_to_auth_users]': True, 'course[event]': 'offer' } t = url + f"/api/v1/courses/{id}" r3 = requests.put(t, headers=header, params=data) result = json.loads(r3.text) if 'name' in result: print(f"Name: {result['name']}") if 'workflow_state' in result: print(f" State: {result['workflow_state']}") if 'is_public_to_auth_users' in result: print(f" Public: {result['is_public_to_auth_users']}") #print(json.dumps(json.loads(r4.text),indent=2)) #print() #x = input("enter to continue") print("\n\n") print("\n".join(report)) print("\n") ## ## ## ## ## ## Course Nav and External Tools ## ## ## ## def do_gav_connect(): term = 181 sem = "202430" get_fresh = 1 crns = [sem + "-" + x.strip() for x in open('cache/starfish.txt','r').readlines()] target = len(crns) print(crns) print("Press enter to begin.") a = input() print("Fetching all course names...") c = getCoursesInTerm(term, get_fresh, 0) i = 0 for course in c: if course['sis_course_id'] in crns: print(" Adding gav connect to", course['name']) print() result = add_gav_connect(course['id']) if result: i += 1 else: print("Something went wrong with", course['name']) print(f"Added {i} redirects out of {target}.") def add_gav_connect(course_id): params = { "name": "GavConnect", "privacy_level": "anonymous", "description": "Add links to external web resources that show up as navigation items in course, user or account navigation. Whatever URL you specify is loaded within the content pane when users click the link.", "consumer_key": "N/A", "shared_secret": "N/A", "url": "https://www.edu-apps.org/redirect", "custom_fields[new_tab]": "1", "custom_fields[url]": "https://gavilan.starfishsolutions.com/starfish-ops/dl/student/dashboard.html", "workflow_state": "anonymous", "course_navigation[enabled]": "true", "course_navigation[visibility]": "public", "course_navigation[label]": "GavConnect", "course_navigation[selection_width]": "800", "course_navigation[selection_height]": "400", "course_navigation[icon_url]": "https://www.edu-apps.org/assets/lti_redirect_engine/redirect_icon.png", } # POST u = url + f"/api/v1/courses/{course_id}/external_tools" res = requests.post(u, headers = header, params=params) result = json.loads(res.text) #print( json.dumps( result, indent=2) ) if "errors" in result: return 0 if "id" in result: return 1 # Create a Program Mapper redirect external tool in course nav. def add_career_academic_pathways(course_id): params = { "name": "Career & Academic Pathways", "privacy_level": "anonymous", "description": "Add links to external web resources that show up as navigation items in course, user or account navigation. Whatever URL you specify is loaded within the content pane when users click the link.", "consumer_key": "N/A", "shared_secret": "N/A", "url": "https://www.edu-apps.org/redirect", "custom_fields[new_tab]": "1", "custom_fields[url]": "https://gavilan.programmapper.com/academics", "workflow_state": "anonymous", "course_navigation[enabled]": "true", "course_navigation[visibility]": "public", "course_navigation[label]": "Career & Academic Pathways", "course_navigation[selection_width]": "800", "course_navigation[selection_height]": "400", "course_navigation[icon_url]": "https://www.edu-apps.org/assets/lti_redirect_engine/redirect_icon.png", "course_navigation[default]": "disabled", } # POST u = url + f"/api/v1/courses/{course_id}/external_tools" res = requests.post(u, headers = header, params=params) result = json.loads(res.text) #print( json.dumps( result, indent=2) ) if "errors" in result: return 0 if "id" in result: return 1 # Bulk add GavConnect to a list of course ids provided by user input. def add_gav_connect_prompt_list(): raw = input("Enter course ids (comma/space separated): ") ids = [s.strip() for s in re.split(r"[\s,]+", raw) if s.strip()] if not ids: print("No course ids provided.") return ok = 0 for cid in ids: try: res = add_gav_connect(cid) print(f"{cid}: {'OK' if res else 'FAILED'}") if res: ok += 1 except Exception as ex: print(f"{cid}: error {ex}") print(f"Added GavConnect to {ok} of {len(ids)} courses.") # Add Pathways link to every course in a selected term (default OFF). def add_pathways_all_courses_in_term(): t = find_term(input("term? (ex: fa25) ")) if not t or (not 'canvas_term_id' in t) or (not 'code' in t): print("Couldn't find term.") return term = t['canvas_term_id'] print("Fetching list of all active courses") courses = getCoursesInTerm(term, 1, 0) ok, total = 0, 0 for C in courses: total += 1 try: res = add_career_academic_pathways(C['id']) print(f"{C['id']} {C.get('name','')}: {'OK' if res else 'FAILED'}") if res: ok += 1 except Exception as ex: print(f"{C['id']} {C.get('name','')}: error {ex}") print(f"Added Pathways to {ok} of {total} courses.") # Ensure Pathways link exists for every course in the term; add if missing (default OFF). def ensure_pathways_in_term(): t = find_term(input("term? (ex: fa25) ")) if not t or (not 'canvas_term_id' in t) or (not 'code' in t): print("Couldn't find term.") return term = t['canvas_term_id'] TARGET_LABEL = "Career & Academic Pathways" print("Fetching list of all active courses") courses = getCoursesInTerm(term, 1, 0) added, present, total = 0, 0, 0 for C in courses: total += 1 try: tabs = fetch(f"{url}/api/v1/courses/{C['id']}/tabs") or [] has_it = any(t.get('label') == TARGET_LABEL for t in tabs) if has_it: present += 1 print(f"{C['id']} {C.get('name','')}: already present") else: res = add_career_academic_pathways(C['id']) print(f"{C['id']} {C.get('name','')}: {'ADDED' if res else 'FAILED'}") if res: added += 1 except Exception as ex: print(f"{C['id']} {C.get('name','')}: error {ex}") print(f"Present: {present}, Added: {added}, Total: {total}") def mod_eval_visibility( shell_id, visible=True ): evals_hidden = True if (visible): evals_hidden = False data = {'position':2, 'hidden':evals_hidden} u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % shell_id r3 = requests.put(u2, headers=header, params=data) #print(" " + r3.text) def instructor_list_to_activate_evals(): courses = all_sem_courses_teachers() mylist = codecs.open('cache/fa21_eval_teachers.txt','r','utf-8').readlines() mylist = [ x.split(',')[2].strip() for x in mylist ] count = 0 limit = 5000 for c in courses: shell_id = c[1] teacher_id = c[6] teacher_name = c[5] course_name = c[3] if teacher_id in mylist: print("Teacher: %s \t course: %s" % (teacher_name,course_name)) mod_eval_visibility( shell_id, False) count += 1 if count > limit: return #print(mylist) # Toggle the eval tool visibility for all courses in the selected Canvas term. def add_evals(section=0): # show or hide? term_record = find_term(input('term? ')) if not term_record: raise ValueError(f"Unknown term") term_id = term_record.get('canvas_term_id') if term_id is None: raise ValueError(f"Canvas term id missing for {term_record}") term_code = term_record.get('code') # fetch list of courses? GET_FRESH_LIST = 0 # turn off eval link to clean up from prev semester? #CLEAN_UP = 1 # just print, don't change anything TEST_RUN = 0 # confirm each shell? ASK = 0 # are we showing or hiding the course eval link? HIDE = False s = [ x.strip() for x in codecs.open(f"cache/{term_code}_eval_sections.txt",'r').readlines()] s = list(funcy.flatten(s)) s.sort() print(f"Going to activate course evals in these sections: \n{s}\n") xyz = input('hit return to continue') all_semester_courses = getCoursesInTerm(term_id, GET_FRESH_LIST, 1) eval_course_ids = [] courses = {} for C in all_semester_courses: if C and 'sis_course_id' in C and C['sis_course_id']: parts = C['sis_course_id'].split('-') if parts[1] in s: #print(C['name']) courses[str(C['id'])] = C eval_course_ids.append(str(C['id'])) data = {'position':2, 'hidden':HIDE} eval_course_ids.sort() for i in eval_course_ids: if TEST_RUN: print(f"{courses[i]['id']} / {courses[i]['name']}") else: if ASK: a = input(f"Hit q to quit, a to do all, or enter to activate eval for: {courses[i]['id']} / {courses[i]['name']} : ") if a == 'a': ASK = 0 if a == 'q': return else: print(f"{courses[i]['id']} / {courses[i]['name']}") u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % i r3 = requests.put(u2, headers=header, params=data) print(f"OK {u2}") #print(r3.text) #time.sleep(0.400) def remove_evals_all_sections(): TERM = 184 SEM = "fa24" # fetch list of courses? GET_FRESH_LIST = 0 # just print, don't change anything TEST_RUN = 0 # confirm each shell? ASK = 0 # are we showing or hiding the course eval link? HIDE = True all_semester_courses = getCoursesInTerm(TERM, GET_FRESH_LIST, 1) eval_course_ids = [ C['id'] for C in all_semester_courses ] courses = { C['id']: C for C in all_semester_courses } data = {'position':2, 'hidden':HIDE} eval_course_ids.sort() for i in eval_course_ids: if TEST_RUN: print(f"{courses[i]['id']} / {courses[i]['name']}") else: if ASK: a = input(f"Hit q to quit, a to do all, or enter to activate eval for: {courses[i]['id']} / {courses[i]['name']} : ") if a == 'a': ASK = 0 if a == 'q': return else: print(f"{courses[i]['id']} / {courses[i]['name']}") u2 = f"https://gavilan.instructure.com:443/api/v1/courses/{i}/tabs/context_external_tool_1953" r3 = requests.put(u2, headers=header, params=data) print(r3.text) def get_ext_tools(): r = url + '/api/v1/accounts/1/external_tools' s = fetch(r) print(json.dumps(s,indent=2)) def set_ext_tools(): TOOL = 733 r = url + '/api/v1/accounts/1/external_tools/%s' % str(TOOL) data = { 'course_navigation[default]': 'disabled' } s = json.loads(requests.put(r, headers=header, params=data).text) print(json.dumps(s,indent=2)) def get_course_ext_tools(): course_id = "15971" r = url + f"/api/v1/courses/{course_id}/external_tools" s = fetch(r) print(json.dumps(s,indent=2)) def remove_n_analytics(section=0): print("Fetching list of all active courses") c = getCoursesInTerm(172,1,0) print(c) ids = [] courses = {} data = {'hidden':True} pause = 1 for C in c: #print( json.dumps(C,indent=2) ) parts = C['sis_course_id'].split('-') #print("\n") print(C['name']) courses[str(C['id'])] = C ids.append(str(C['id'])) u3 = url + '/api/v1/courses/%s/tabs' % str(C['id']) tabs = fetch(u3) for T in tabs: if T['label'] == "New Analytics": print( "\tVisibility is: " + T["visibility"] ) # json.dumps(tabs,indent=2) ) if "hidden" in T: print( "\tHidden is: " + str(T["hidden"]) ) # json.dumps(tabs,indent=2) ) if 1: # T["visibility"] != "admins": u4 = url + "/api/v1/courses/%s/tabs/%s" % ( str(C['id']), str(T['id']) ) print( "\tChanging visiblity of a. tab" ) r4 = requests.put(u4, headers=header, params=data) print("\t" + r4.text) if pause: xyz = input('\n\nenter for next one or [y] to do all: ') if xyz == 'y': pause = 0 exit() """ask = 1 evals_hidden = True data = {'position':2, 'hidden':evals_hidden} for i in ids: if ask: a = input("Hit q to quit, a to do all, or enter to activate eval for: \n " + str(courses[i]) + "\n> ") if a == 'a': ask = 0 if a == 'q': return u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % i print(courses[i]['name']) r3 = requests.put(u2, headers=header, params=data) print(" " + r3.text) time.sleep(0.300) """ import csv def my_nav_filter(row): # Filter logic: consider a tab ON only when visibility is public and not hidden; used for CSV summary. if str(row.get('visibility', '')).lower() != 'public': return False hidden = row.get('hidden') if str(hidden).lower() in ['true']: return False return True ## Multi-pass course nav tool. Pass 1: index + XLSX; Pass 2: optional hide/show ids. def clean_course_nav_setup_semester(section=0): # Multi-pass tool: index course nav, write XLSX with x/-/blank, then optionally bulk hide/show by label across the term. TESTING = False # Limit to first 10 courses while testing import openpyxl from openpyxl.utils import get_column_letter t = find_term(input("term? (ex: fa25) ")) if not t or (not 'canvas_term_id' in t) or (not 'code' in t): print("Couldn't find term.") return term = t['canvas_term_id'] SEM = t['code'] print("Fetching list of all active courses") courses_in_term = getCoursesInTerm(term, 1, 0) if TESTING: print("TESTING mode enabled: limiting to first 20 courses") courses_in_term = courses_in_term[:20] # Collect course records and their tabs all_tabs_by_course = {} # course_id -> list of tab dicts all_tab_ids = {} # tab_id -> label (most recent seen) — kept for Pass 2 id operations all_labels = set() # all labels seen in any course (visible or hidden) # Also write a detailed CSV of visible tabs as before nav_out = codecs.open(f'cache/course_nav_summary_{SEM}.csv', 'w', 'utf-8') nav_writer = csv.writer(nav_out) columns = "id name code start state label position hidden visibility type url".split(" ") nav_writer.writerow(columns) for C in courses_in_term: try: cid = str(C['id']) print(C['name']) u3 = f"{url}/api/v1/courses/{C['id']}/tabs" tabs = fetch(u3) or [] # Normalize hidden for T in tabs: if 'hidden' not in T: T['hidden'] = "n/a" # Track global set of tab ids and a sample label all_tab_ids[str(T.get('id'))] = T.get('label', str(T.get('id'))) if T.get('label'): all_labels.add(T.get('label')) # Write summary of ON tabs vals = [C['id'], C['name'], C['course_code'], C.get('start_at', ''), C.get('workflow_state', ''), T.get('label', ''), T.get('position', ''), T.get('hidden', ''), T.get('visibility', ''), T.get('type', ''), T.get('html_url', '')] mydict = dict(zip(columns, vals)) if my_nav_filter(mydict): nav_writer.writerow(vals) nav_out.flush() all_tabs_by_course[cid] = tabs except Exception as err: print(f"Exception: {err}") try: nav_out.close() except Exception: pass # Build XLSX matrix try: # Compute popularity (count of visible 'x' occurrences per label) and sort by popularity then alpha. # First, build quick lookup per course for label visibility/hidden. def tab_status(t): vis = str(t.get('visibility', '')).lower() hid = str(t.get('hidden', '')).lower() if vis == 'public' and hid not in ['true']: return 'x' # present and visible return '-' # present but hidden (includes non-public visibility) # Precompute per-course map: label -> status symbol ('x' or '-') status_by_course = {} for C in courses_in_term: cid = str(C['id']) label_to_status = {} for tdict in all_tabs_by_course.get(cid, []) or []: lbl = tdict.get('label') if not lbl: continue cur = label_to_status.get(lbl) new = tab_status(tdict) # If any instance is visible, prefer 'x' over '-' if cur == 'x': continue label_to_status[lbl] = 'x' if new == 'x' else (cur or '-') status_by_course[cid] = label_to_status # Popularity counts visible_counts = defaultdict(int) for cid, lmap in status_by_course.items(): for lbl, sym in lmap.items(): if sym == 'x': visible_counts[lbl] += 1 # Column order: by decreasing x-count, then alphabetical for ties tab_labels = sorted(all_labels, key=lambda s: (-visible_counts.get(s, 0), str(s).lower())) xlsx_path = f"cache/course_nav_matrix_{SEM}.xlsx" wb = openpyxl.Workbook() ws = wb.active ws.title = "matrix" # Headers static_headers = ['course_id', 'course_name', 'first_teacher'] # Row 1: labels only, sorted by popularity (most x's first) row1 = static_headers + list(tab_labels) ws.append(row1) # Rows: one per course for C in courses_in_term: cid = str(C['id']) cname = C.get('name', '') # First teacher teacher = '' try: tlist = teacher_list(int(cid)) or [] if tlist: # list of tuples: (id, name) teacher = sorted([t[1] for t in tlist])[0] except Exception: pass row = [cid, cname, teacher] course_map = status_by_course.get(cid, {}) for lbl in tab_labels: sym = course_map.get(lbl, '') row.append(sym) ws.append(row) # Simple sizing for readability for col_idx in range(1, ws.max_column + 1): col_letter = get_column_letter(col_idx) ws.column_dimensions[col_letter].width = 16 if col_idx > 3 else 20 wb.save(xlsx_path) print(f"Wrote matrix: {xlsx_path}") except Exception as ex: print(f"Failed to write XLSX matrix: {ex}") # Optional Pass 2: apply hide/show updates (labels only) try_apply = input("Apply changes? [n] hide/show selected labels across all courses (y/N): ").strip().lower() in ['y', 'yes'] if not try_apply: print("Pass 1 complete. No changes applied.") return # Gather labels to HIDE and SHOW def parse_label_list(s): if not s: return [] # Allow comma-separated list. Spaces are preserved within labels. if '\n' in s: raw = [x.strip() for x in s.split('\n') if x.strip()] else: raw = [x.strip() for x in s.split(',') if x.strip()] return raw hide_src = input("Enter labels to HIDE (comma-separated or newline file path), leave blank to skip: ").strip() show_src = input("Enter labels to SHOW (comma-separated or newline file path), leave blank to skip: ").strip() hide_labels = [] show_labels = [] def read_labels_from_path(pth): try: with codecs.open(pth, 'r', 'utf-8') as f: content = f.read() # Prefer newline separation in files (one label per line) return [x.strip() for x in content.splitlines() if x.strip()] except Exception: return [] if hide_src: if os.path.exists(hide_src): hide_labels = read_labels_from_path(hide_src) elif os.path.exists(os.path.join('cache', hide_src)): hide_labels = read_labels_from_path(os.path.join('cache', hide_src)) else: hide_labels = parse_label_list(hide_src) if show_src: if os.path.exists(show_src): show_labels = read_labels_from_path(show_src) elif os.path.exists(os.path.join('cache', show_src)): show_labels = read_labels_from_path(os.path.join('cache', show_src)) else: show_labels = parse_label_list(show_src) # De-duplicate while keeping order roughly intact hide_labels = list(dict.fromkeys(hide_labels)) show_labels = list(dict.fromkeys(show_labels)) if not (hide_labels or show_labels): print("No labels provided. Skipping Pass 2.") return print(f"HIDE labels: {hide_labels}") print(f"SHOW labels: {show_labels}") confirm = input("Proceed with updates? (y/N): ").strip().lower() in ['y', 'yes'] if not confirm: print("Aborted. No changes applied.") return # Build lookup of labels currently ON per course from the CSV summary (to avoid unnecessary hide calls) on_labels_by_course = defaultdict(set) try: csv_path = f'cache/course_nav_summary_{SEM}.csv' with codecs.open(csv_path, 'r', 'utf-8') as f: reader = csv.DictReader(f) for row in reader: cid = str(row.get('id', '')).strip() lbl = row.get('label') if cid and lbl: on_labels_by_course[cid].add(lbl) except Exception as ex: print(f"Warning: could not read CSV summary for ON labels: {ex}") # Apply changes for C in courses_in_term: cid = str(C['id']) try: course_tabs = all_tabs_by_course.get(cid) # Refresh tabs to reduce staleness just before applying if course_tabs is None: course_tabs = fetch(f"{url}/api/v1/courses/{cid}/tabs") or [] tabs_by_label = defaultdict(list) for t in course_tabs: lbl = t.get('label') if lbl: tabs_by_label[lbl].append(t) # Helpers to PUT hide/show for a tab id def set_hidden_for_id(tab_id, hidden_val): put_url = f"{url}/api/v1/courses/{cid}/tabs/{tab_id}" data = {'hidden': hidden_val} r = requests.put(put_url, headers=header, params=data) action = 'HIDE' if hidden_val else 'SHOW' print(f"{action} {cid} {C.get('name','')} -> {tab_id}: {r.status_code}") # Hide by labels (only if label is currently ON per CSV) labels_to_hide_here = [lbl for lbl in hide_labels if lbl in on_labels_by_course.get(cid, set())] for lbl in labels_to_hide_here: for t in tabs_by_label.get(lbl, []): tid = str(t.get('id')) set_hidden_for_id(tid, True) # Show by labels for lbl in show_labels: for t in tabs_by_label.get(lbl, []): tid = str(t.get('id')) set_hidden_for_id(tid, False) except Exception as ex: print(f"Failed applying to course {cid}: {ex}") print("Pass 2 complete.") def fetch_rubric_scores(course_id=16528, assignment_id=1): api_url = f'{url}/api/v1/courses/{course_id}' course_info = fetch(api_url) out = codecs.open('cache/rubric_scores.txt','w','utf-8') #print(course_info) # Extract course details course_name = course_info['name'] course_short_name = course_info['course_code'] course_semester = course_info['enrollment_term_id'] # Print course information out.write(f"Course Name: {course_name}\n") out.write(f"Short Name: {course_short_name}\n") out.write(f"Semester: {course_semester}\n") api_url = f'{url}/api/v1/courses/{course_id}/assignments' assignments_list = fetch(api_url) #print(assignments_list) assignments_by_dept = {} ratings_by_dept = {} # Iterate through the list of assignments and populate the dictionary for assignment in assignments_list: assignment_id = assignment['id'] assignment_name = assignment['name'] rubric = assignment.get('rubric', []) # Get the rubric field (default to an empty list if not present) has_rubric = 'no' if rubric: has_rubric = 'yes' out.write(f" Asmt Name: {assignment_name} ID: {assignment_id} Rubric: {has_rubric}\n") # Save assignment details including rubric assignments_by_dept[assignment_id] = { 'name': assignment_name, 'rubric': rubric # Add more assignment details if needed } if rubric: print("RUBRIC:") print(json.dumps(rubric,indent=2)) for r in rubric: for rat in r.get('ratings',[]): ratings_by_dept[rat['id']] = { 'description': r['description'], 'long_description': rat['description'], 'points': rat['points']} # Print the assignments dictionary out.write(json.dumps(assignments_by_dept,indent=2)+'\n\n\n') out.write(json.dumps(ratings_by_dept,indent=2)+'\n\n\n') # Loop thru assignments with rubrics and report on grades for assignment in assignments_list: if not assignment.get('rubric', []): continue assignment_id = assignment['id'] out.write(f" Asmt Name: {assignment_name} ID: {assignment_id}\n") api_url = f'{url}/api/v1/courses/{course_id}/assignments/{assignment_id}/submissions?include[]=rubric_assessment' # Include the 'include[]=rubric_assessment' parameter to request rubric assessments # params = {'include[]': 'rubric_assessment'} # Make the API request with the parameters #response = requests.get(api_url, params=params) # Check if the request was successful (status code 200) #if response.status_code != 200: # print(f"Request failed with status code {response.status_code}") # continue submissions_by_dept = fetch(api_url) # Iterate through the list of submissions and retrieve rubric scores and comments for submission in submissions_by_dept: user_id = submission['user_id'] rubric = submission.get('rubric_assessment', []) # Get the rubric assessment (empty list if not present) comments = submission.get('submission_comments', '') # Get submission comments (empty string if not present) score = submission.get('score', -1) # Process and use rubric scores and comments as needed # Example: Print user information, rubric scores, and comments if rubric: print(json.dumps(submission,indent=2)) out.write(f"\nSubmission User ID/Assignment ID: {user_id}/{assignment_id}\n") out.write(f"Score: {score}\n") out.write(f"Submission Comments: {comments}\n") out.write(f"Rubric:\n") for k,v in rubric.items(): rub_by_dept = '?' rat_by_dept = '?' if v['rating_id'] in ratings_by_dept: rub_rating = ratings_by_dept[v['rating_id']] rub_by_dept = rub_rating['rub_by_deptription'] rat_by_dept = rub_rating['rat_by_deptription'] out.write(f" {rub_by_dept} - {rat_by_dept} ({v['rating_id']}): {v['points']}/{rub_rating['points']} points: {v['comments']}\n") out.write("---") # Separator between submissions out.flush() def quick_sem_course_list(term=180): c = getCoursesInTerm(term,1,0) c = sorted(c, key=lambda k: k['name']) for C in c: print(C['name']) # Check Canvas for an existing calendar event that matches the provided metadata. def find_existing_calendar_event(context_code, title, start_at_iso, description="", tolerance_hours=12): def _normalize_iso(value): if not value: return None if value.endswith('Z'): value = value[:-1] + '+00:00' try: return datetime.fromisoformat(value) except ValueError: return None target_start = _normalize_iso(start_at_iso) if not target_start: return None window_start = (target_start - timedelta(hours=tolerance_hours)).date().isoformat() window_end = (target_start + timedelta(hours=tolerance_hours)).date().isoformat() params = { "context_codes[]": context_code, "start_date": window_start, "end_date": window_end, } existing_events = fetch("/api/v1/calendar_events", params=params) if not isinstance(existing_events, list): print(f"Unable to inspect existing events for context {context_code}: unexpected response") return None normalized_title = title.strip().lower() if isinstance(title, str) else "" normalized_description = description.strip().lower() if isinstance(description, str) else "" for event in existing_events: event_title = (event.get('title') or "").strip().lower() event_description = (event.get('description') or "").strip().lower() event_start = _normalize_iso(event.get('start_at') or "") if not event_start: continue time_difference = abs((event_start - target_start).total_seconds()) if time_difference > tolerance_hours * 3600: continue if event_title == normalized_title: return event if normalized_description and event_description == normalized_description: return event return None # Remove all calendar events attached to a course after user confirmation. def remove_all_course_events(): course_id = input("course id> ").strip() if not course_id: print("No course id provided; aborting.") return context_code = course_id if course_id.startswith("course_") else f"course_{course_id}" today = datetime.now(timezone.utc).date() start_date = (today - timedelta(days=730)).isoformat() end_date = (today + timedelta(days=365)).isoformat() print(f"Fetching existing events for {context_code} between {start_date} and {end_date}...") params = { "context_codes[]": context_code, "per_page": 100, "start_date": start_date, "end_date": end_date, } events = fetch("/api/v1/calendar_events", params=params) if not events: print("No events found for this course.") return print(f"Found {len(events)} events. Beginning removal...") for event in events: event_id = event.get("id") event_title = event.get("title", "(no title)") if not event_id: print(f"Skipping event '{event_title}' with missing id") continue print(f"Deleting event '{event_title}' (id {event_id}) in {context_code}...", end=' ') delete_url = f"{url}/api/v1/calendar_events/{event_id}" response = requests.delete(delete_url, headers=header) if response.ok: print("deleted successfully") else: print(f"failed: {response.status_code} {response.text}") # Build a term-wide CSV summarizing student participation metrics for every course. def build_term_participation_report(): term_alias = input("Term alias (ex: fa25): ").strip() if not term_alias: print("No term alias provided; aborting.") return normalized_alias = term_alias.lower() term_record = find_term(normalized_alias) if not term_record: print(f"Unknown term alias: {term_alias}") return term_id = term_record.get('canvas_term_id') if not term_id: print(f"Canvas term id missing for {term_alias}") return term_code = (term_record.get('code') or normalized_alias).lower() mode_choice = input("Demo run with ~10 random courses? (y/N): ").strip().lower() demo_mode = mode_choice == 'y' courses = getCoursesInTerm(term_id, get_fresh=0, show=0) if not isinstance(courses, list): print("Unable to fetch courses for this term; aborting.") return if demo_mode: random.shuffle(courses) print("Demo mode: targeting up to 10 courses with analytics data") output_path = f"cache/{term_code}_participation.csv" base_fields = [ 'term_code', 'course_id', 'course_name', 'course_sis_id', 'course_code', 'student_canvas_id', 'student_sortable_name', 'student_sis_user_id', 'student_login_id', 'student_name', 'student_email', ] rows = [] data_fields = set() def flatten_value(prefix, value, dest): if isinstance(value, dict): for key, val in value.items(): next_key = f"{prefix}.{key}" if prefix else str(key) flatten_value(next_key, val, dest) elif isinstance(value, list): dest[prefix] = json.dumps(value) else: dest[prefix] = value processed_courses = 0 for course in courses: if demo_mode and processed_courses >= 10: break course_id = course.get('id') if not course_id: continue course_name = course.get('name', '') print(f"Fetching analytics for course {course_id}: {course_name}") enrollment_index = {} try: enrollment_params = { 'type[]': 'StudentEnrollment', 'per_page': 100, } enrollments = fetch(f"/api/v1/courses/{course_id}/enrollments", params=enrollment_params) if isinstance(enrollments, list): for enrollment in enrollments: user = enrollment.get('user') or {} user_id = user.get('id') or enrollment.get('user_id') if not user_id: continue entry = { 'sortable_name': user.get('sortable_name', ''), 'sis_user_id': user.get('sis_user_id', ''), 'login_id': user.get('login_id', ''), 'sis_login_id': user.get('sis_login_id', ''), 'email': user.get('email', ''), 'name': user.get('name', ''), } enrollment_index[user_id] = entry enrollment_index[str(user_id)] = entry except Exception as exc: print(f"Failed to fetch enrollments for {course_id}: {exc}") try: summaries = fetch(f"/api/v1/courses/{course_id}/analytics/student_summaries") except Exception as exc: print(f"Failed to fetch analytics for {course_id}: {exc}") continue if not isinstance(summaries, list): print(f"Unexpected analytics payload for {course_id}; skipping") continue course_rows_added = 0 for summary in summaries: flattened = {} flatten_value('', summary, flattened) user_id = ( summary.get('id') or summary.get('user_id') or flattened.get('user_id') or flattened.get('user.id') ) enrollment_details = {} if user_id in enrollment_index: enrollment_details = enrollment_index[user_id] elif isinstance(user_id, str) and user_id.isdigit(): enrollment_details = enrollment_index.get(int(user_id), {}) elif isinstance(user_id, int): enrollment_details = enrollment_index.get(str(user_id), {}) row = { 'term_code': term_code, 'course_id': str(course_id), 'course_name': course_name, 'course_sis_id': course.get('sis_course_id', ''), 'course_code': course.get('course_code', ''), 'student_canvas_id': str(user_id) if user_id else '', 'student_sortable_name': enrollment_details.get('sortable_name') or '', 'student_sis_user_id': (enrollment_details.get('sis_user_id') or enrollment_details.get('sis_login_id')) or '', 'student_login_id': enrollment_details.get('login_id') or '', 'student_name': enrollment_details.get('name') or '', 'student_email': enrollment_details.get('email') or '', } if enrollment_details: data_fields.add('student_name') data_fields.add('student_email') for key, value in flattened.items(): if not key: continue row[key] = value data_fields.add(key) rows.append(row) course_rows_added += 1 if course_rows_added == 0: print(f"Skipping course {course_id}: no student analytics data") continue processed_courses += 1 if demo_mode and processed_courses < 10: print(f"Demo mode finished early: only {processed_courses} courses had analytics data") if not rows: print("No analytics data found; nothing to write.") return field_order = base_fields + sorted([field for field in data_fields if field not in base_fields]) print(f"Writing {len(rows)} rows to {output_path}") with open(output_path, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_order) writer.writeheader() for row in rows: writer.writerow({field: row.get(field, '') for field in field_order}) # Summarize student orientation enrollments across the account and flag coverage gaps. def audit_student_orientation_enrollments(): orientation_years = ['2022', '2023', '2024', '2025', '2026'] orientation_shells = get_orientation_shells(orientation_years) missing_years = [year for year in orientation_years if year not in orientation_shells] if missing_years: print(f"Warning: orientation shells not found for years: {', '.join(missing_years)}") if not orientation_shells: print("No orientation courses located; aborting.") return orientation_memberships = get_orientation_memberships(orientation_years) student_summaries = get_student_enrollment_summary() if not student_summaries: print("No student enrollment data available; aborting.") return rows = [] for summary in student_summaries: user_id = summary.get('user_id') user_key = str(user_id) membership = orientation_memberships.get(user_key, {'years': set(), 'total': 0}) membership_years = membership.get('years', set()) orientation_total = membership.get('total', 0) row = { 'student_id': user_key, 'sortable_name': summary.get('sortable_name') or summary.get('name') or '', 'sis_id': summary.get('sis_user_id') or '', 'student_enrollment_count': summary.get('student_enrollments', 0), 'teacher_enrollment_count': summary.get('teacher_enrollments', 0), 'orientation_enrollment_total': orientation_total, 'missing_student_orientation': 1 if orientation_total == 0 else 0, } for year in orientation_years: row[year] = 1 if year in membership_years else 0 rows.append(row) if not rows: print("No rows to write; aborting.") return rows.sort(key=lambda r: (r.get('missing_student_orientation', 0), r.get('sortable_name', ''))) output_path = 'cache/student_orientation_audit.csv' fieldnames = [ 'student_id', 'sortable_name', 'sis_id', 'student_enrollment_count', 'teacher_enrollment_count', *orientation_years, 'orientation_enrollment_total', 'missing_student_orientation' ] print(f"Writing {len(rows)} rows to {output_path}") with open(output_path, 'w', newline='', encoding='utf-8') as handle: writer = csv.DictWriter(handle, fieldnames=fieldnames) writer.writeheader() for row in rows: writer.writerow({field: row.get(field, '') for field in fieldnames}) missing_count = sum(1 for row in rows if row.get('orientation_enrollment_total', 0) == 0) multi_count = sum(1 for row in rows if row.get('orientation_enrollment_total', 0) > 1) print(f"Orientation audit complete. Missing: {missing_count}, duplicates: {multi_count}.") # Create Canvas calendar events for predefined orientation shells from CSV input. def create_calendar_event(): events = codecs.open('cache/academic_calendar_2025.csv','r','utf-8').readlines() orientation_shells = ["course_15924","course_19094","course_20862", "course_23313"] for ori_shell in orientation_shells: for e in events: if not e.strip(): continue parts = [part.strip() for part in e.split(',', 2)] if len(parts) < 3: continue date, title, desc = parts local = pytz.timezone("America/Los_Angeles") naive = datetime.strptime(date, "%Y-%m-%d") local_dt = local.localize(naive, is_dst=None) utc_dt = local_dt.astimezone(pytz.utc).isoformat() print(f"Checking event '{title}' ({date}) in {ori_shell}...", end=' ') existing_event = find_existing_calendar_event(ori_shell, title, utc_dt, desc) if existing_event: existing_id = existing_event.get('id') print(f"exists as id {existing_id} in {ori_shell}, skipping add") continue print(f"no existing event in {ori_shell}, attempting add") params = { "calendar_event[context_code]": ori_shell, "calendar_event[title]": title, "calendar_event[description]": desc, "calendar_event[start_at]": utc_dt, # DateTime "calendar_event[all_by_dept": "true", } u = url + "/api/v1/calendar_events" res = requests.post(u, headers = header, params=params) if res.ok: try: result = json.loads(res.text) except json.JSONDecodeError: print(f"add completed for '{title}' in {ori_shell} (status {res.status_code}) but response parse failed") continue new_id = result.get("id") if new_id: print(f"added successfully as id {new_id} in {ori_shell} (status {res.status_code})") elif "errors" in result: print(f"add failed for '{title}' in {ori_shell}: {result['errors']}") else: print(f"add attempted for '{title}' in {ori_shell} with unexpected response {result}") else: print(f"add failed for '{title}' in {ori_shell}: {res.status_code} {res.text}") def utc_to_local(utc_str): if not utc_str: return "" utc_dt = datetime.strptime(utc_str, '%Y-%m-%dT%H:%M:%SZ') # Set the UTC timezone utc_tz = pytz.timezone('UTC') # Convert the UTC datetime to the Pacific Time Zone pacific_tz = pytz.timezone('US/Pacific') pacific_dt = pytz.timezone(pacific_tz) return pacific_dt.strftime('%a %b %d, %Y %#I:%M%p') def list_all_assignments(): course = input("the course id> ") u = url + f"/api/v1/courses/{course}/assignments" c = fetch(u) #print(json.dumps(c,indent=2)) for a in c: p = 'not published' if a['published'] == True: p = 'published' date = utc_to_local(a['due_at']) print(f"{a['name']}\t{p}\t{date}") def bulk_unenroll(): course_id = input("course id> ") enrollments = fetch(f"{url}/api/v1/courses/{course_id}/enrollments") for enrollment in enrollments: enrollment_id = enrollment['id'] #skiplist = ['51237','58362','237'] #if enrollment_id in skiplist: # continue # Set the headers and parameters for the DELETE API call api_url = f"{url}/api/v1/courses/{course_id}/enrollments/{enrollment_id}" # Make the DELETE request response = requests.delete(api_url, headers=header) # Check the response if response.status_code == 200: print(f"Successfully unenrolled student with id {enrollment_id} from course {course_id}.") else: print(f"Failed to unenroll student with id {enrollment_id} from course {course_id}. Error: {response.text}") def fetch_announcements(course_id=0): if not course_id: course_id = input("course id> ") announcements_url = f"{url}/api/v1/announcements?context_codes[]=course_{course_id}&start_date=2025-01-01&end_date=2025-12-31" announcements = fetch(announcements_url) print(json.dumps(announcements,indent=2)) filename = f"cache/announcements_{course_id}.json" with open(filename, "w") as file: json.dump(announcements, file,indent=2) print("Announcements saved to ", filename) def change_link_in_all_terms_pages(): old_link = "https://www.gavilan.edu/ezproxy" new_link = "https://www.gavilan.edu/ezproxy_new" term = 181 courses = getCoursesInTerm(term,get_fresh=1,show=0,active=1) def enrollment_helper(): ignore = ['JLE','JFT', 'CWE'] ignore2 = ['AH 190', 'AE 600', 'AE 602', 'AE 603','ACCT 190','AJ 100A', 'AJ 107A', 'AJ 213A','AJ 229A','AJ 231A','AMT 190','ATH 23','BUS 190','CD 190','COS 290','WTRM 290','SPAN 8A', 'SPAN 8B', 'SPAN 8C', 'SPAN 8D', 'RE 190','MKTG 190'] keep = 'code,name,days,cap,act,teacher,date,partofday,type,site'.split(',') oo = codecs.open('cache/section_history.json','w','utf-8') # fetch enrollment stats for last few years from semesters import code, sems_by_short_name, short_to_sis from util import dept_from_name raw = [] code.reverse() sort = defaultdict(dict) for s in sems_by_short_name.keys(): try: sched1 = requests.get(f"http://gavilan.cc/schedule/{s}_sched_expanded.json").json() sort[s] = defaultdict(dict) for sect in sched1: if sect['name'] in ignore2: continue sect_smaller = funcy.project(sect,keep) sect_smaller['sem'] = short_to_sis(s) if int(sect_smaller['cap'])==0 or int(sect_smaller['act'])==0: sect_smaller['fill_pct'] = 100 else: sect_smaller['fill_pct'] = round( (int(sect_smaller['act']) / int(sect_smaller['cap']))*100 ) d = dept_from_name(sect_smaller['code']) if d in ignore: continue sect_smaller['dept'] = d raw.append(sect_smaller) if not d in sort[s]: sort[s][d] = defaultdict(dict) name = sect['code'] if not name in sort[s][d]: sort[s][d][name] = [] sort[s][d][name].append(sect_smaller) print(f"{s} OK.") except Exception as e: print(f"{s} not found. {e}") #sems.pop(s) oo.write(json.dumps(sort,indent=2)) df = pd.DataFrame(raw) df_sorted = df.sort_values(['dept', 'code', 'type','site','partofday','fill_pct']) df_sorted.to_csv('cache/section_history.csv') class_counts = df.groupby(['sem', 'code']).size().reset_index(name='class_count') print("Class counts by semester") print(class_counts) pivot_df = class_counts.pivot_table(index='code', columns='sem', values='class_count', aggfunc='sum', fill_value=0) # Reset the index to move 'class_name' back to a column pivot_df.reset_index(inplace=True) print(pivot_df) pivot_df.to_csv('cache/section_counts_history.csv') # Group by semester and class type, and then count the number of occurrences of each class type class_type_counts = df.groupby(['sem', 'code', 'type']).size().reset_index(name='class_type_count') print("Class type by semester") print(class_type_counts) pivot_df2 = class_type_counts.pivot_table(index='code', columns=['sem','type'], values='class_type_count', aggfunc='sum', fill_value=0) # Reset the index to move 'class_name' back to a column pivot_df2.reset_index(inplace=True) '''kmeans = try_clustering(pivot_df2.copy()) pivot_df2.insert(0, "Cluster", kmeans.labels_) print(pivot_df2) pivot_df2.to_csv('cache/section_and_mode_counts_history.csv') # Group by teacher class_teacher_counts = df.groupby(['sem', 'code', 'teacher']).size().reset_index(name='class_teacher_count') print("Class teacher by semester") print(class_teacher_counts) # group by COURSE (ie: ENGL1A) # For each historical WINTER, SPRING, SUMMER, FALL: # number of sections offered, by mode, time of day, campus # all teachers who taught it (and their qual to teach online) # fill percentage for each section, then by mode, tod, campus ## moved: try_clustering now in search.py # Add labels to the DataFrame #df['clusters'] = labels #print(df) #df.to_csv('cache/section_and_mode_counts_history_clusters.csv') return kmeans ''' def unpublish_a_course(course_id=0): if course_id == 0: course_id = input('course id? ') u = url + f"/api/v1/courses/{course_id}" data = { 'course[event]':'claim' } r = requests.put(u, data=data, headers=header) print(r.text) def course_log(): course_id = 19566 L = fetch(f"{url}/api/v1/audit/course/courses/{course_id}") print(json.dumps(L,indent=2)) def fetch_rubric(): course = 21274 r_id = 35961 u = f"{url}/api/v1/courses/{course}/rubrics/{r_id}" result = fetch(u) #print(json.dumps(result,indent=2)) rows = [] for row in result['data']: r = [] r.append(f"{row['description']}
    {row['long_description']}") for item in row['ratings']: r.append(f"{item['description']}
    {item['long_description']}
    {item['points']} points") rows.append("" + "\n".join( r ) + "\n") output = f"

    {result['title']}

    \n" output += "" + ''.join( [ f"{x}\n" for x in rows] ) + "
    \n" print(output) if __name__ == "__main__": options = { 1: ['Cross check schedule with ztc responses',make_ztc_list] , 2: ['Add announcements to homepage', change_course_ann_homepage], 3: ['Unpublish a course', unpublish_a_course], 4: ['List the terms', getTerms], 5: ['Show courses in a term', getCoursesInTerm], 6: ['Save enrollments in a course', course_enrollment], 7: ['Simple list of course data, search by sis_id', course_search_by_sis], 8: ['Overview of a term', course_term_summary], 9: ['process the semester overview output (8)', course_term_summary_2], 10: ['Enroll orientation students (refresh local db first)', enroll_orientation_students], 11: ['Enroll ORIENTATION and STEM student shells after catching up database.', enroll_o_s_students], 12: ['Enroll stem students', enroll_stem_students_live], 13: ['Enroll ART students', enroll_art_students_live], 20: ['Get a course info by id',getCourses], 21: ['Reset course conclude date',update_course_conclude], 22: ['Create calendar events for orientation shells', create_calendar_event], 23: ['Remove all calendar events from a course', remove_all_course_events], 24: ['Build participation report for a term', build_term_participation_report], 25: ['Audit student orientation enrollments', audit_student_orientation_enrollments], 26: ['list all assignments', list_all_assignments], 27: ['Bulk unenroll from course', bulk_unenroll], 28: ['enrollment helper', enrollment_helper], 29: ['g number list enroll to shell id', enroll_gnumber_list_to_courseid], 30: ['* Overview semester start dates',overview_start_dates], 31: ['Fine tune term dates and winter session', course_by_depts_terms], 32: ['Set summer start dates', set_custom_start_dates], 33: ['Cross list, ask for sections', ez_xlist], 34: ['Cross list a semester from argos export file', semester_cross_lister], 35: ['Cross list from manually created file', do_manual_xlist], 36: ['Cross list CWE courses', xlist_cwe], 40: ['Enroll GOTT Workshops', enroll_gott_workshops], 41: ['Create some sandbox courses', create_sandboxes], 42: ['Add teacher to many shells', teacher_to_many_shells], 44: ['List users who passed GOTT 1 / Bootcamp', get_gott1_passers], 45: ['List users who passed Plagiarism Module', get_plague_passers], 50: ['Fetch rubric scores and comments', fetch_rubric_scores], 51: ['Fetch announcements in a course', fetch_announcements], 52: ['show course audit log', course_log], 53: ['fetch a rubric', fetch_rubric], # --- Course Nav / External Tools --- 70: ['ext tools',get_ext_tools], 71: ['set ext tools',set_ext_tools], 72: ['Get course ext tools', get_course_ext_tools], 73: ['Remove "new analytics" from all courses navs in a semester', remove_n_analytics], 74: ['Add course evals', add_evals], 75: ['Remove course evals all sections', remove_evals_all_sections], 76: ['Course nav: index + bulk hide/show (one semester)', clean_course_nav_setup_semester], 77: ['Nav: add GavConnect to list of course ids', add_gav_connect_prompt_list], 78: ['Nav: add Pathways to all courses in a term (default OFF)', add_pathways_all_courses_in_term], 79: ['Nav: ensure Pathways exists for a term (add if missing, OFF)', ensure_pathways_in_term], # 24: ['Add course evals to whole semester',instructor_list_to_activate_evals], # 21: ['Add announcements to homepage', change_course_ann_homepage], #32: ['Cross-list classes', xlist ], #33: ['Cross list helper', eslCrosslister], ##55: ['Check all courses & their sections in semester', all_semester_course_sanity_check], #4: ['List students who passed quiz X', get_quiz_passers], # TODO wanted: group shell for each GP (guided pathway) as a basic student services gateway.... # } print ('') if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]): resp = int(sys.argv[1]) print("\n\nPerforming: %s\n\n" % options[resp][0]) else: print ('') for key in options: print(str(key) + '.\t' + options[key][0]) print('') resp = input('Choose: ') # Call the function in the options dict options[ int(resp)][1]()