1357 lines
48 KiB
Python
1357 lines
48 KiB
Python
import json, re, requests, codecs, sys, time, funcy, os
|
|
import pandas as pd
|
|
from dateutil import parser
|
|
from datetime import datetime
|
|
from util import print_table, int_or_zero, float_or_zero
|
|
|
|
from pipelines import fetch, fetch_stream, getSemesterSchedule, fetch_collapse, header, url, shortToLongSem
|
|
from pipelines import sems
|
|
from localcache import users_new_this_semester, db, course_quick_stats, get_courses_in_term_local, course_student_stats, all_sem_courses_teachers, full_reload
|
|
from collections import defaultdict
|
|
|
|
|
|
|
|
stem_course_id = '11015' # TODO
|
|
|
|
|
|
#########
|
|
######### GET FACTS FROM INDIVIDUAL COURSES
|
|
#########
|
|
#########
|
|
|
|
# Gott 1 Bootcamp - report on who completed it.
|
|
def get_gott1_passers():
|
|
course = '1561'
|
|
|
|
min_passing = 85
|
|
passers_filename = 'cache/teacherdata/bootcamp_passed.csv'
|
|
still_active_filename = 'cache/teacherdata/bootcamp_active.csv'
|
|
get_course_passers(course, min_passing, passers_filename, still_active_filename)
|
|
|
|
# Plagiarism Module - report on who completed it.
|
|
def get_plague_passers():
|
|
course = '11633'
|
|
min_passing = 85
|
|
passers_filename = 'cache/teacherdata/plagiarism_passed.csv'
|
|
still_active_filename = 'cache/teacherdata/plagiarism_active.csv'
|
|
(passed, didnt) = get_course_passers(course, min_passing, passers_filename, still_active_filename)
|
|
passed = set( [z[2] for z in passed] )
|
|
didnt = set( [z[2] for z in didnt] )
|
|
enrol = [ [ str(z) for z in list(course_enrollment(cr)) ] for cr in ['11677','11698'] ]
|
|
|
|
print(enrol)
|
|
|
|
enrol = set(funcy.cat(enrol))
|
|
everyone = passed.union(didnt,enrol)
|
|
|
|
reportable = passed.intersection(enrol)
|
|
outputfile = open('cache/plagcheck.txt','w').write( json.dumps( [ list(reportable), list(enrol), list(passed), list(didnt), list(everyone) ],indent=2))
|
|
return 1
|
|
|
|
#enrol = { cr: [ str(z) for z in list(course_enrollment(cr).keys()) ] for cr in ['11677','11698',] }
|
|
# # [x['user_id'] for x in course_enrollment(cr)]
|
|
outputfile = open('cache/plagcheck.txt','w').write( json.dumps( [ [z[2] for z in passed],[z[2] for z in didnt],enrol],indent=2))
|
|
return 1
|
|
|
|
passed_d = {}
|
|
didnt_d = {}
|
|
|
|
output_by_course = {}
|
|
course_s = {}
|
|
|
|
for p in passed: passed_d[str(p[2])] = p
|
|
for p in didnt: didnt_d[str(p[2])] = p
|
|
|
|
passed_s = [ str(k) for k in passed_d.keys() ]
|
|
didnt_s = [ str(k) for k in didnt_d.keys() ]
|
|
|
|
|
|
crossref = ['11677','11698',]
|
|
|
|
outputfile = open('cache/plagcheck.txt','w')
|
|
oo = { 'passed': passed_d, 'didnt': didnt_d }
|
|
|
|
for cr in crossref:
|
|
student_int = course_enrollment(cr)
|
|
student_d = { str(k): v for k,v in student_int.items() }
|
|
oo[cr] = student_d
|
|
|
|
output_by_course[cr] = { 'passed':{}, 'didnt':{}, 'missing':{} }
|
|
|
|
course_s[cr] = set( [ str(k) for k in student_d.keys() ])
|
|
|
|
for k,v in student_d.items():
|
|
key_s = str(k)
|
|
|
|
if key_s in passed_d:
|
|
output_by_course[cr]['passed'][key_s] = passed_d[key_s]
|
|
elif key_s in didnt_d:
|
|
output_by_course[cr]['didnt'][key_s] = didnt_d[key_s]
|
|
else:
|
|
output_by_course[cr]['missing'][key_s] = v['user']
|
|
|
|
oo['final_output'] = output_by_course
|
|
oo['passed_s'] = list(passed_s)
|
|
oo['didnt_s'] = list(didnt_s)
|
|
|
|
course_sd = {k: list(v) for k,v in course_s.items() }
|
|
|
|
oo['course_s'] = course_sd
|
|
|
|
outputfile.write(json.dumps(oo,indent=2))
|
|
|
|
|
|
# Who, in a class, passed?
|
|
def get_course_passers(course, min_passing, passers_filename, still_active_filename):
|
|
path = url + '/api/v1/courses/%s/enrollments' % str(course)
|
|
|
|
tempout = open('cache/passers_temp.txt','w')
|
|
|
|
enrl = fetch( path, 0)
|
|
passed = []
|
|
didnt = []
|
|
for E in enrl:
|
|
try:
|
|
n = E['user']['name']
|
|
oo = E['user']['sis_user_id']
|
|
i = str(E['user_id'])
|
|
r = E['role']
|
|
g = E['grades']['current_score']
|
|
l = E['last_activity_at']
|
|
p = float_or_zero(g) > min_passing
|
|
print( "%s: a %s, grade of %s. Passed? %s. Last seen: %s" % (n,r,str(g),str(p),l) )
|
|
|
|
tempout.write(json.dumps(E['user']['name']) + "\n")
|
|
tempout.write(json.dumps(E['grades'],indent=2) + "\n\n-----\n\n")
|
|
|
|
if p:
|
|
passed.append( [n, oo, i, r, g, l ] )
|
|
else:
|
|
didnt.append( [n, oo, i, r, g, l ] )
|
|
except:
|
|
pass
|
|
|
|
columns = ['name', 'goo','canvas_id','role','grade','last_activity']
|
|
pp = pd.DataFrame(passed, columns=columns)
|
|
pp.sort_values(by='last_activity',inplace=True)
|
|
pp.to_csv(passers_filename, index=False)
|
|
dd = pd.DataFrame(didnt, columns=columns)
|
|
dd.sort_values(by='last_activity',inplace=True)
|
|
dd.to_csv(still_active_filename, index=False)
|
|
|
|
print("Saved output to \n - passed: %s\n - not passed: %s\n" % (passers_filename, still_active_filename))
|
|
return (passed,didnt)
|
|
|
|
|
|
# Gott 1A
|
|
"""course = '2908'
|
|
quiz = '15250'
|
|
pass_grade = 0.90
|
|
|
|
path = url + '/api/v1/courses/%s/quizzes/%s/submissions' % (course,quiz)
|
|
q_subs = fetch_collapse(path, 'quiz_submissions')
|
|
for Q in q_subs:
|
|
prct = float_or_zero(Q['score']) / float_or_zero( Q['quiz_points_possible'] )
|
|
print( 'Passed: %s\t Score: %s,\tUser: %s' % \
|
|
( str(prct>0.9), str(int_or_zero(Q['score'])), Q['user_id'] ))"""
|
|
|
|
|
|
|
|
# Who, in a class and a quiz, passed?
|
|
def get_quiz_passers():
|
|
# Gott 1 Bootcamp
|
|
course = '1561'
|
|
path = url + '/api/v1/courses/%s/enrollments' % course
|
|
enrl = fetch( path, 0)
|
|
min_passing = 85
|
|
passed = []
|
|
didnt = []
|
|
for E in enrl:
|
|
try:
|
|
n = E['user']['name']
|
|
i = E['user_id']
|
|
r = E['role']
|
|
g = E['grades']['current_score']
|
|
l = E['last_activity_at']
|
|
p = float_or_zero(g) > min_passing
|
|
print( "%s: a %s, grade of %s. Passed? %s. Last seen: %s" % (n,r,str(g),str(p),l) )
|
|
if p:
|
|
passed.append( [n, i, r, g, l ] )
|
|
else:
|
|
didnt.append( [n, i, r, g, l ] )
|
|
except:
|
|
pass
|
|
|
|
columns = ['name','canvas_id','role','grade','last_activity']
|
|
pp = pd.DataFrame(passed, columns=columns)
|
|
pp.sort_values(by='last_activity',inplace=True)
|
|
pp.to_csv('cache/teacherdata/bootcamp_passed.csv', index=False)
|
|
dd = pd.DataFrame(didnt, columns=columns)
|
|
dd.sort_values(by='last_activity',inplace=True)
|
|
dd.to_csv('cache/teacherdata/bootcamp_active.csv', index=False)
|
|
|
|
print("Saved output to ./teachers/bootcamp_*")
|
|
|
|
# Gott 1A
|
|
"""course = '2908'
|
|
quiz = '15250'
|
|
pass_grade = 0.90
|
|
|
|
path = url + '/api/v1/courses/%s/quizzes/%s/submissions' % (course,quiz)
|
|
q_subs = fetch_collapse(path, 'quiz_submissions')
|
|
for Q in q_subs:
|
|
prct = float_or_zero(Q['score']) / float_or_zero( Q['quiz_points_possible'] )
|
|
print( 'Passed: %s\t Score: %s,\tUser: %s' % \
|
|
( str(prct>0.9), str(int_or_zero(Q['score'])), Q['user_id'] ))"""
|
|
|
|
|
|
|
|
|
|
# Change courses to show 2 announcements
|
|
def change_course_ann_homepage(id="10458"):
|
|
u = url + "/api/v1/courses/%s/settings" % id
|
|
data = { 'show_announcements_on_home_page':'true', \
|
|
'home_page_announcement_limit':'2'}
|
|
r = requests.put(u, data=data, headers=header)
|
|
print(r.text)
|
|
|
|
|
|
|
|
# All students enrolled in a class in the given semester. Simpler verson of below. Return SET of course_ids.
|
|
def users_in_semester():
|
|
all_c = getCoursesInTerm('65',0,0) # fall 2020 TODO
|
|
all_s = set()
|
|
for c in all_c:
|
|
for u in course_enrollment(c['id']).values():
|
|
if u['type'] != "StudentEnrollment": continue
|
|
all_s.add(u['id'])
|
|
return all_s
|
|
|
|
|
|
#
|
|
# All students in STEM (or any list of depts.. match the course_code). Return SET of canvas ids.
|
|
def users_in_depts_live(depts=[], termid='171'):
|
|
courses_by_dept = {}
|
|
students_by_dept = {}
|
|
|
|
all_c = getCoursesInTerm(termid,0,0)
|
|
codecs.open('cache/courses_in_term_%s.json' % termid,'w','utf-8').write( json.dumps(all_c,indent=2) )
|
|
for c in all_c:
|
|
#print(c['course_code'])
|
|
for d in depts:
|
|
#print("Dept: %s" % d)
|
|
match = re.search('^(%s)' % d, c['course_code'])
|
|
if match:
|
|
print("Getting enrollments for %s" % c['course_code'])
|
|
if d in courses_by_dept: courses_by_dept[d].append(c)
|
|
else: courses_by_dept[d] = [ c, ]
|
|
for u in course_enrollment(c['id']).values():
|
|
if u['type'] != "StudentEnrollment": continue
|
|
if not (d in students_by_dept):
|
|
students_by_dept[d] = set()
|
|
students_by_dept[d].add(u['user_id'])
|
|
continue
|
|
print(students_by_dept)
|
|
codecs.open('cache/students_by_dept_in_term_%s.json' % termid,'w','utf-8').write( str(students_by_dept) )
|
|
all_students = set()
|
|
for dd in students_by_dept.values(): all_students.update(dd)
|
|
codecs.open('cache/all_students_in_depts_in_term_%s.json' % termid,'w','utf-8').write( str(all_students) )
|
|
return all_students
|
|
|
|
|
|
|
|
def course_enrollment(id=''):
|
|
print("Getting enrollments for course id %s" % str(id))
|
|
if not id:
|
|
id = input('Course id? ')
|
|
t = url + '/api/v1/courses/%s/enrollments?role[]=StudentEnrollment' % str(id)
|
|
print(t)
|
|
emts = fetch(t,0)
|
|
print(emts)
|
|
emt_by_id = {}
|
|
for E in emts:
|
|
print(E)
|
|
try:
|
|
emt_by_id[E['user_id']] = E
|
|
except Exception as exp:
|
|
print("Skipped that class with this exception: %s" % str(exp))
|
|
ff = codecs.open('cache/courses/%s.json' % str(id), 'w', 'utf-8')
|
|
ff.write(json.dumps(emt_by_id, indent=2))
|
|
print( " %i results" % len(emts) )
|
|
return emt_by_id
|
|
|
|
|
|
def askForTerms():
|
|
user_input = input("The term id? (separate multiples with commas) ")
|
|
return user_input.split(",")
|
|
|
|
"""
|
|
names = []
|
|
if not term:
|
|
s = url + '/api/v1/accounts/1/terms?workflow_state[]=all'
|
|
s = fetch_collapse(s,"enrollment_terms",1)
|
|
print(json.dumps(s,indent=2))
|
|
print("Terms: ")
|
|
for u in s:
|
|
print(str(u['id']) + "\t" + u['name'])
|
|
#print json.dumps(results_dict,indent=2)
|
|
term = input("The term id? ")
|
|
"""
|
|
|
|
|
|
|
|
# Return a list of term names and IDs. Also store in cache/courses/terms.txt
|
|
def getTerms(printme=1, ask=1):
|
|
s = url + '/api/v1/accounts/1/terms' #?workflow_state[]=all'
|
|
terms = fetch_collapse(s,'enrollment_terms')
|
|
ff = codecs.open('cache/courses/terms.txt', 'w', 'utf-8') # TODO unsafe overwrite
|
|
#print(terms)
|
|
ff.write(json.dumps(terms, indent=2))
|
|
ff.close()
|
|
|
|
if printme:
|
|
print("Terms: ")
|
|
for u in terms:
|
|
print(str(u['id']) + "\t" + u['name'])
|
|
if ask:
|
|
return input("The term id? ")
|
|
return terms
|
|
|
|
def getCourses(x=0): # a dict
|
|
if not x:
|
|
user_input = input("The Course IDs to get? (separate with spaces: ")
|
|
courselist = list(map(int, user_input.split()))
|
|
else:
|
|
courselist = [x, ]
|
|
|
|
for id in courselist:
|
|
t = url + '/api/v1/courses/' + str(id) # + '?perpage=100'
|
|
t = fetch(t,1)
|
|
print(t)
|
|
return t
|
|
|
|
|
|
def update_course_conclude(courseid="13590",enddate='2021-12-23T01:00Z'):
|
|
(connection,cursor) = db()
|
|
q = "SELECT * FROM courses AS c WHERE c.code LIKE '%FA21%' AND c.conclude='2021-08-29 07:00:00.000'"
|
|
result = cursor.execute(q)
|
|
for R in result:
|
|
try:
|
|
#print(R)
|
|
print('doing course: %s' % R[6])
|
|
courseid = R[1]
|
|
#d = getCourses(courseid)
|
|
#print("\tconclude on: %s" % d['end_at'])
|
|
|
|
data = { 'course[end_at]': enddate }
|
|
t = url + '/api/v1/courses/' + str(courseid)
|
|
r3 = requests.put(t, headers=header, params=data)
|
|
#print(" " + r3.text)
|
|
except Exception as e:
|
|
print('****%s' % str(e))
|
|
|
|
# Relevant stuff trying to see if its even being used or not
|
|
def course_term_summary_local(term="176",term_label="FA22"):
|
|
O = "\t<li>Course: <a href='%s' target='_blank' class='%s'>%s</a><br />Status: <b>%s</b><br />Teacher: %s<br />Number students: %s</li>\n"
|
|
courses = get_courses_in_term_local(term)
|
|
oo = codecs.open('cache/semester_summary.html','w','utf-8')
|
|
oo.write('<style>.a{background-color:yellow;}.b{background-color:pink;}</style><ul>\n')
|
|
|
|
for C in sorted(courses):
|
|
style = ''
|
|
info = course_quick_stats(C[3])
|
|
sinfo = course_student_stats(C[3])
|
|
D = list(C)
|
|
D.append(info)
|
|
D.append(sinfo)
|
|
#print(D)
|
|
if D[6][0][0] == 0: continue
|
|
if D[2] == 'claimed': style="a"
|
|
mystr = O % ( "https://ilearn.gavilan.edu/courses/"+str(D[3]), style, D[1], D[2], str(', '.join(D[5])), str(D[6][0][0]))
|
|
print(D[1])
|
|
oo.write(mystr )
|
|
oo.flush()
|
|
#print(info)
|
|
oo.write('\n</ul>\n')
|
|
|
|
# Relevant stuff trying to see if its even being used or not
|
|
def course_term_summary(term="176",term_label="FA22"):
|
|
print("Summary of %s" % term_label)
|
|
courses = getCoursesInTerm(term,0,0)
|
|
|
|
print("output to cache/term_summary.txt")
|
|
outp = codecs.open('cache/term_summary.txt','w','utf-8')
|
|
|
|
tup = tuple("id course_code default_view workflow_state".split(" "))
|
|
smaller = [ funcy.project(x , tup) for x in courses ]
|
|
#print(json.dumps(smaller, indent=2))
|
|
by_code = {}
|
|
(connection,cursor) = db()
|
|
(pub, not_pub) = funcy.split( lambda x: x['workflow_state'] == "available", smaller)
|
|
|
|
for S in smaller:
|
|
print(S)
|
|
by_code[ S['course_code'] ] = str(S) + "\n"
|
|
outp.write( str(S) + "\n" )
|
|
q = """SELECT c.id AS courseid, c.code, tt.name, c.state, COUNT(u.id) AS student_count FROM courses AS c
|
|
JOIN enrollment AS e ON e.course_id=c.id
|
|
JOIN users AS u ON u.id=e.user_id
|
|
JOIN ( SELECT c.id AS courseid, u.id AS userid, c.code, u.name FROM courses AS c
|
|
JOIN enrollment AS e ON e.course_id=c.id
|
|
JOIN users AS u ON u.id=e.user_id
|
|
WHERE c.canvasid=%s
|
|
AND e."type"="TeacherEnrollment" ) AS tt ON c.id=tt.courseid
|
|
WHERE c.canvasid=%s
|
|
AND e."type"="StudentEnrollment"
|
|
GROUP BY c.code ORDER BY c.state, c.code""" % (S['id'],S['id'])
|
|
result = cursor.execute(q)
|
|
for R in result:
|
|
print(R)
|
|
by_code[ S['course_code'] ] += str(R) + "\n"
|
|
outp.write( str(R) + "\n\n" )
|
|
pages = fetch(url + "/api/v1/courses/%s/pages" % S['id'])
|
|
by_code[ S['course_code'] ] += json.dumps(pages, indent=2) + "\n\n"
|
|
modules = fetch(url + "/api/v1/courses/%s/modules" % S['id'])
|
|
by_code[ S['course_code'] ] += json.dumps(modules, indent=2) + "\n\n"
|
|
|
|
print()
|
|
|
|
out2 = codecs.open('cache/summary2.txt','w', 'utf-8')
|
|
|
|
for K in sorted(by_code.keys()):
|
|
out2.write('\n------ ' + K + '\n' + by_code[K])
|
|
out2.flush()
|
|
|
|
return
|
|
|
|
#published = list(funcy.where( smaller, workflow_state="available" ))
|
|
#notpub = list(filter( lambda x: x['workflow_state'] != "available", smaller))
|
|
notpub_ids = [ x['id'] for x in notpub ]
|
|
|
|
#for ix in notpub_ids:
|
|
# # print(course_quick_stats(ix))
|
|
|
|
|
|
outp.write(json.dumps(courses, indent=2))
|
|
|
|
outp2 = codecs.open('cache/term_summary_pub.txt','w','utf-8')
|
|
outp2.write("PUBLISHED\n\n" + json.dumps(published, indent=2))
|
|
outp2.write("\n\n---------\nNOT PUBLISHED\n\n" + json.dumps(notpub, indent=2))
|
|
|
|
# Fetch all courses in a given term
|
|
def getCoursesInTerm(term=0,get_fresh=1,show=0,active=0): # a list
|
|
if not term:
|
|
term = getTerms(1,1)
|
|
ff = 'cache/courses_in_term_%s.json' % str(term)
|
|
if not get_fresh:
|
|
if os.path.isfile(ff):
|
|
return json.loads( codecs.open(ff,'r','utf-8').read() )
|
|
else:
|
|
print(" -> couldn't find cached classes at: %s" % ff)
|
|
|
|
# https://gavilan.instructure.com:443/api/v1/accounts/1/courses?published=true&enrollment_term_id=11
|
|
names = []
|
|
if active:
|
|
active = "published=true&"
|
|
else:
|
|
active = ""
|
|
t = url + '/api/v1/accounts/1/courses?' + active + 'enrollment_term_id=' + str(term) #+ '&perpage=30'
|
|
results = fetch(t,show)
|
|
if show:
|
|
for R in results:
|
|
try:
|
|
print(str(R['id']) + "\t" + R['name'])
|
|
except Exception as e:
|
|
print("Caused a problem: ")
|
|
print(R)
|
|
#print json.dumps(results,indent=2)
|
|
info = []
|
|
for a in results:
|
|
names.append(a['name'])
|
|
info.append( [a['id'], a['name'], a['workflow_state'] ] )
|
|
if show: print_table(info)
|
|
codecs.open(ff, 'w', 'utf-8').write(json.dumps(results,indent=2))
|
|
return results
|
|
|
|
|
|
def getCoursesTermSearch(term=0,search='',v=0):
|
|
term = term or input("term id? ")
|
|
search = search or input("What to search for? ")
|
|
|
|
s = url + '/api/v1/accounts/1/courses?enrollment_term_id=%s&search_term=%s' % ( str(term) , search )
|
|
if v: print(s)
|
|
|
|
courses = fetch(s)
|
|
if v: print(json.dumps(courses,indent=2))
|
|
return courses
|
|
|
|
def courseLineSummary(c,sections={}):
|
|
ss = "\t"
|
|
crn = "\t"
|
|
host = ""
|
|
if 'crn' in c:
|
|
crn = "crn: %s\t" % c['crn']
|
|
|
|
if c['id'] in sections:
|
|
ss = "section: %s\t" % str(sections[c['id']])
|
|
|
|
if 'host' in c:
|
|
host = "send to crn: %s\t" % c['host']
|
|
|
|
out = "%i\t%s%s%s%s" % (c['id'], ss ,crn, host, c['name'])
|
|
return out
|
|
|
|
def xlistLineSummary(c,sections={}):
|
|
# can_id incoming_sec_id crn name
|
|
|
|
new_sec = "missing"
|
|
if 'partner' in c and 'sectionid' in c['partner']:
|
|
new_sec = c['partner']['sectionid']
|
|
|
|
out = "can_id:%i\t new_sec_id:%s\t crn:%s\t %s" % (c['id'], new_sec ,c['crn'], c['name'])
|
|
return out
|
|
|
|
def numbers_in_common(L):
|
|
# how many leading numbers do the strings in L share?
|
|
for i in [0,1,2,3,4]:
|
|
number = L[0][i]
|
|
for s in L:
|
|
#print("%s -> %s" % (number,s[i]))
|
|
if s[i] != number: return i
|
|
return 5
|
|
|
|
def combined_name(nic,L):
|
|
# string with prettier section numbers combined
|
|
if len(L) < 2:
|
|
return L[0]
|
|
if nic < 2:
|
|
return "/".join(L)
|
|
L_mod = [ x[nic:6] for x in L]
|
|
L_mod[0] = L[0]
|
|
new_name = "/".join(L_mod)
|
|
#print(nic, " ", L_mod)
|
|
return new_name
|
|
|
|
|
|
def semester_cross_lister():
|
|
checkfile = codecs.open('cache/xlist_check.html','w','utf-8')
|
|
checkfile.write('<html><body><table>\n')
|
|
|
|
current_term = '178'
|
|
xlistfile = codecs.open('cache/sp23_crosslist.csv','r','utf-8').readlines()[1:]
|
|
by_section = {}
|
|
by_group = defaultdict( list )
|
|
crn_to_canvasid = {}
|
|
crn_to_canvasname = {}
|
|
crn_to_canvascode = {}
|
|
|
|
get_fresh = 0
|
|
if get_fresh:
|
|
c = getCoursesInTerm(178,0,0) # sp23
|
|
codecs.open('cache/courses_in_term_178.json','w','utf-8').write(json.dumps(c,indent=2))
|
|
else:
|
|
c = json.loads( codecs.open('cache/courses_in_term_178.json','r','utf-8').read() )
|
|
|
|
for C in c:
|
|
if 'sis_course_id' in C and C['sis_course_id']:
|
|
crn_to_canvasid[C['sis_course_id'][7:13]] = str(C['id'])
|
|
crn_to_canvasname[C['sis_course_id'][7:13]] = str(C['name'])
|
|
crn_to_canvascode[C['sis_course_id'][7:13]] = str(C['course_code'])
|
|
# "Term","PrtTerm","xlstGroup","Subject","CrseNo","EffectCrseTitle","CRN","Session","SecSchdType","AttnMeth","MtgSchdType","MtgType","MaxEnroll","TotalEnroll","SeatsAvail","Bldg","Room","Units","LecHrs","LabHrs","HrsPerDay","HrsPerWk","TotalHrs","Days","D/E","Wks","BegTime","EndTime","StartDate","EndDate","LastName","FirstName","PercentResp"
|
|
for xc in xlistfile:
|
|
parts = xc.split(r',')
|
|
course = parts[3] + " " + parts[4]
|
|
group = parts[2]
|
|
crn = parts[6]
|
|
|
|
if crn in crn_to_canvasid:
|
|
cid = crn_to_canvasid[crn]
|
|
oldname = crn_to_canvasname[crn]
|
|
oldcode = crn_to_canvascode[crn]
|
|
else:
|
|
print("! Not seeing crn %s in canvas semester" % crn)
|
|
cid = ''
|
|
oldname = ''
|
|
oldcode = ''
|
|
|
|
if crn in by_section: continue
|
|
by_section[crn] = [crn, course, group, cid, oldname, oldcode]
|
|
by_group[group].append( [crn, course, group, cid, oldname, oldcode] )
|
|
|
|
for x in by_section.values():
|
|
print(x)
|
|
href = '<a target="_blank" href="%s">%s</a>' % ('https://ilearn.gavilan.edu/courses/'+x[3]+'/settings#tab-details', x[3])
|
|
checkfile.write('<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>' % (x[0],x[2],x[1],href) )
|
|
checkfile.write('</table></body></html>')
|
|
|
|
print("GROUPS")
|
|
for y in by_group.keys():
|
|
sects = [ z[0] for z in by_group[y] ]
|
|
sects.sort()
|
|
nic = numbers_in_common(sects)
|
|
new_sec = combined_name(nic,sects)
|
|
new_name = by_group[y][0][4][0:-5] + new_sec
|
|
new_code = by_group[y][0][5][0:-5] + new_sec
|
|
print(y)
|
|
print("\t", sects)
|
|
#print("\tThey share %i leading numbers" % nic)
|
|
print("\t", by_group[y])
|
|
print("\t", new_name)
|
|
print()
|
|
|
|
host_id = by_group[y][0][3]
|
|
sections = by_group[y][1:]
|
|
|
|
for target_section in sections:
|
|
xlist_ii(target_section[3],host_id,new_name,new_code)
|
|
|
|
|
|
def xlist_ii(parasite_id,host_id,new_name,new_code):
|
|
print("Parasite id: ",parasite_id," Host id: ", host_id)
|
|
print("New name: ", new_name)
|
|
xyz = input("Perform cross list? Enter for yes, n for no: ")
|
|
if xyz != 'n':
|
|
uu = url + '/api/v1/courses/%s/sections' % parasite_id
|
|
c_sect = fetch(uu)
|
|
#print(json.dumps(c_sect,indent=2))
|
|
if len(c_sect) > 1:
|
|
print("* * * * Already Crosslisted!!")
|
|
return
|
|
if not c_sect:
|
|
print("* * * * Already Crosslisted!!")
|
|
return
|
|
else:
|
|
parasite_sxn_id = str(c_sect[0]['id'])
|
|
print("Parasite section id: ", parasite_sxn_id)
|
|
|
|
u = url + "/api/v1/sections/%s/crosslist/%s" % (parasite_sxn_id,host_id)
|
|
print(u)
|
|
res = requests.post(u, headers = header)
|
|
print(res.text)
|
|
|
|
u3 = url + "/api/v1/courses/%s" % host_id
|
|
data = {'course[name]': new_name, 'course[course_code]': new_code}
|
|
print(data)
|
|
print(u3)
|
|
r3 = requests.put(u3, headers=header, params=data)
|
|
print(r3.text)
|
|
print("\n\n")
|
|
|
|
def all_semester_course_sanity_check():
|
|
c = getCoursesInTerm(178,0,0) # sp23
|
|
codecs.open('cache/courses_in_term_178.json','w','utf-8').write(json.dumps(c,indent=2))
|
|
output = codecs.open('cache/courses_w_sections.csv','w','utf-8')
|
|
output.write( ",".join(['what','id','parent_course_id','sis_course_id','name']) + "\n" )
|
|
output2 = codecs.open('cache/courses_checker.csv','w','utf-8')
|
|
output2.write( ",".join(['id','sis_course_id','name','state','students']) + "\n" )
|
|
i = 0
|
|
for course in c:
|
|
u2 = url + '/api/v1/courses/%s?include[]=total_students' % str(course['id'])
|
|
course['info'] = fetch(u2)
|
|
#print(json.dumps(course['info'],indent=2))
|
|
ts = '?'
|
|
try:
|
|
ts = course['info']['total_students']
|
|
except Exception as e:
|
|
pass
|
|
info = [ 'course', course['id'], '', course['sis_course_id'], course['name'], course['workflow_state'], ts ]
|
|
info = list(map(str,info))
|
|
info2 = [ course['id'], course['sis_course_id'], course['name'], course['workflow_state'], ts ]
|
|
info2 = list(map(str,info2))
|
|
output2.write( ",".join(info2) + "\n" )
|
|
output2.flush()
|
|
print(info2)
|
|
output.write( ",".join(info) + "\n" )
|
|
#uu = url + '/api/v1/courses/%s/sections' % str(course['id'])
|
|
#course['sections'] = fetch(uu)
|
|
#s_info = [ [ 'section', y['id'], y['course_id'], y['sis_course_id'], y['name'], y['total_students'] ] for y in course['sections'] ]
|
|
#for row in s_info:
|
|
# print(row)
|
|
# output.write( ",".join( map(str,row) ) + "\n" )
|
|
output.flush()
|
|
i += 1
|
|
if i % 5 == 0:
|
|
codecs.open('cache/courses_w_sections.json','w','utf-8').write(json.dumps(c,indent=2))
|
|
codecs.open('cache/courses_w_sections.json','w','utf-8').write(json.dumps(c,indent=2))
|
|
|
|
|
|
def eslCrosslister():
|
|
fives = []
|
|
sevens = []
|
|
others = []
|
|
|
|
course_by_crn = {}
|
|
|
|
sections = {}
|
|
|
|
combos = [ [y.strip() for y in x.split(',') ] for x in open('cache/xcombos.txt','r').readlines() ]
|
|
|
|
combo_checklist = [ 0 for i in range(len(combos)) ]
|
|
|
|
#print("\n\nCombos:")
|
|
#[ print("%s - %s" % (x[0],x[1])) for x in combos]
|
|
|
|
#return
|
|
|
|
courses = getCoursesTermSearch(62,"ESL",0)
|
|
|
|
for C in courses:
|
|
ma = re.search( r'(\d{5})', C['name'])
|
|
if ma:
|
|
#print("Found Section: %s from course %s" % (ma.group(1), C['name']))
|
|
C['crn'] = ma.group(1)
|
|
course_by_crn[C['crn']] = C
|
|
|
|
if C['name'].startswith("ESL5"): fives.append(C)
|
|
elif C['name'].startswith("ESL7"): sevens.append(C)
|
|
else: others.append(C)
|
|
|
|
for S in sevens:
|
|
uu = url + '/api/v1/courses/%i/sections' % S['id']
|
|
#print(uu)
|
|
c_sect = fetch(uu)
|
|
print(".",end='')
|
|
#print(json.dumps(c_sect,indent=2))
|
|
if len(c_sect) > 1:
|
|
print("* * * * Already Crosslisted!!")
|
|
if c_sect:
|
|
sections[ S['id'] ] = c_sect[0]['id']
|
|
S['sectionid'] = c_sect[0]['id']
|
|
|
|
if S['crn']:
|
|
for i,co in enumerate(combos):
|
|
if S['crn'] == co[0]:
|
|
S['partner'] = co[1]
|
|
combo_checklist[i] = 1
|
|
course_by_crn[co[1]]['partner'] = S
|
|
elif S['crn'] == co[1]:
|
|
S['partner'] = co[0]
|
|
combo_checklist[i] = 1
|
|
course_by_crn[co[0]]['partner'] = S
|
|
|
|
|
|
print("Others:")
|
|
for F in sorted(others, key=lambda x: x['name']):
|
|
print(courseLineSummary(F))
|
|
|
|
print("\n\nFive hundreds")
|
|
for F in sorted(fives, key=lambda x: x['name']):
|
|
print(courseLineSummary(F))
|
|
|
|
print("\n\nSeven hundreds")
|
|
for F in sorted(sevens, key=lambda x: x['name']):
|
|
print(courseLineSummary(F,sections))
|
|
|
|
|
|
print("\n\nMake a x-list: ")
|
|
for F in sorted(fives, key=lambda x: x['name']):
|
|
if 'partner' in F:
|
|
print(xlistLineSummary(F,sections))
|
|
if 'partner' in F and 'sectionid' in F['partner']:
|
|
if not input('ready to crosslist. Are you? Enter "q" to quit. ') == 'q':
|
|
xlist( F['partner']['sectionid'], F['id'] )
|
|
else:
|
|
break
|
|
for i,c in enumerate(combo_checklist):
|
|
if not c:
|
|
print("Didn't catch: "+ str(combos[i]))
|
|
|
|
def xlist(parasite='', host=''): # section id , new course id
|
|
|
|
host = host or input("ID number of the HOSTING COURSE? ")
|
|
if not parasite:
|
|
parasite = input("ID number of the SECTION to add to above? (or 'q' to quit) ")
|
|
|
|
while parasite != 'q':
|
|
#h_sections = fetch( url + "/api/v1/courses/%s/sections" % str(host))
|
|
#print(h_sections)
|
|
|
|
p_sections = fetch( url + "/api/v1/courses/%s/sections" % str(parasite))
|
|
#print(p_sections)
|
|
parasite_section = p_sections[0]['id']
|
|
# TODO need to get the section id from each course:
|
|
# GET /api/v1/courses/:course_id/sections
|
|
|
|
# POST /api/v1/sections/:id/crosslist/:new_course_id
|
|
# SECTION ID (to move) NEW __COURSE__ ID
|
|
|
|
u = url + "/api/v1/sections/%s/crosslist/%s" % (str(parasite_section),str(host))
|
|
print(u)
|
|
res = requests.post(u, headers = header)
|
|
print(res.text)
|
|
parasite = input("ID number of the SECTION to add to above? ")
|
|
|
|
def unenroll_student(courseid,enrolid):
|
|
t = url + "/api/v1/courses/%s/enrollments/%s" % ( str(courseid), str(enrolid) )
|
|
data = {"task": "delete" }
|
|
r4 = requests.delete(t, headers=header, params=data)
|
|
print(data)
|
|
|
|
#def get_enrollments(courseid):
|
|
# t = url + "/api/v1/courses/%s/enrollments?type=StudentEnrollment" % courseid
|
|
# return fetch(t,1)
|
|
|
|
|
|
def enroll_stem_students_live():
|
|
the_term = '178'
|
|
do_removes = 0
|
|
depts = "MATH BIO CHEM CSIS PHYS PSCI GEOG ASTR ECOL ENVS ENGR".split(" ")
|
|
users_to_enroll = users_in_depts_live(depts, the_term) # term id
|
|
|
|
stem_enrollments = course_enrollment(stem_course_id) # by user_id
|
|
|
|
users_in_stem_shell = set( [ x['user_id'] for x in stem_enrollments.values() ])
|
|
|
|
print("ALL STEM STUDENTS %s" % str(users_to_enroll))
|
|
print("\n\nALREADY IN STEM SHELL %s" % str(users_in_stem_shell))
|
|
|
|
enroll_us = users_to_enroll.difference(users_in_stem_shell)
|
|
#enroll_us = users_to_enroll
|
|
remove_us = users_in_stem_shell.difference(users_to_enroll)
|
|
|
|
print("\n\nTO ENROLL %s" % str(enroll_us))
|
|
(connection,cursor) = db()
|
|
|
|
#xyz = input('enter to continue')
|
|
|
|
|
|
|
|
eee = 0
|
|
uuu = 0
|
|
|
|
if do_removes:
|
|
print("\n\nTO REMOVE %s" % str(remove_us))
|
|
for j in remove_us:
|
|
try:
|
|
q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j
|
|
cursor.execute(q)
|
|
s = cursor.fetchall()
|
|
if s:
|
|
s = s[0]
|
|
print("Removing: %s" % s[0])
|
|
r1 = unenroll_student(str(stem_course_id), stem_enrollments[j]['id'])
|
|
print(r1)
|
|
uuu += 1
|
|
time.sleep(0.600)
|
|
except Exception as e:
|
|
print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
|
|
|
|
for j in enroll_us:
|
|
try:
|
|
q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j
|
|
cursor.execute(q)
|
|
s = cursor.fetchall()
|
|
if s:
|
|
s = s[0]
|
|
print("Enrolling: %s" % s[0])
|
|
enrollment = { }
|
|
#print(s)
|
|
t = url + '/api/v1/courses/%s/enrollments' % stem_course_id
|
|
data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
|
|
'enrollment[enrollment_state]': 'active' }
|
|
#print(data)
|
|
#if input('enter to enroll %s or q to quit: ' % s[0]) == 'q':
|
|
#break
|
|
r3 = requests.post(t, headers=header, params=data)
|
|
print(data)
|
|
eee += 0
|
|
time.sleep(0.600)
|
|
except Exception as e:
|
|
print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
|
|
#print(r3.text)
|
|
print("\n\nTO ENROLL %s" % str(enroll_us))
|
|
#print("\n\nTO REMOVE %s" % str(remove_us))
|
|
return (eee,uuu)
|
|
|
|
|
|
|
|
###########################
|
|
|
|
def enroll_bulk_students_bydept(course_id, depts, the_term="172", cautious=1): # a string, a list of strings
|
|
users_to_enroll = users_in_depts_live(depts, the_term) # term id
|
|
|
|
targeted_enrollments = course_enrollment(course_id) # by user_id.. (live, uses api)
|
|
|
|
current_enrollments = set( [ x['user_id'] for x in targeted_enrollments.values() ])
|
|
|
|
print("ALL TARGET STUDENTS %s" % str(users_to_enroll))
|
|
print("\nALREADY IN SHELL %s" % str(current_enrollments))
|
|
|
|
enroll_us = users_to_enroll.difference(current_enrollments)
|
|
remove_us = current_enrollments.difference(users_to_enroll)
|
|
|
|
print("\n\nTO ENROLL %s" % str(enroll_us))
|
|
xyz = input('enter to continue')
|
|
print("\n\nTO REMOVE %s" % str(remove_us))
|
|
|
|
(connection,cursor) = db()
|
|
|
|
|
|
for j in remove_us:
|
|
try:
|
|
q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j
|
|
cursor.execute(q)
|
|
s = cursor.fetchall()
|
|
if s:
|
|
s = s[0]
|
|
print("Removing: %s" % s[0])
|
|
|
|
## TODO not done here
|
|
# r1 = unenroll_student(str(course_id), stem_enrollments[j]['id'])
|
|
#print(r1)
|
|
|
|
|
|
time.sleep(0.600)
|
|
except Exception as e:
|
|
print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
|
|
|
|
for j in enroll_us:
|
|
try:
|
|
q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j
|
|
cursor.execute(q)
|
|
s = cursor.fetchall()
|
|
if s:
|
|
s = s[0]
|
|
print("Enrolling: %s" % s[0])
|
|
enrollment = { }
|
|
#print(s)
|
|
t = url + '/api/v1/courses/%s/enrollments' % course_id
|
|
data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
|
|
'enrollment[enrollment_state]': 'active' }
|
|
|
|
if cautious:
|
|
print(t)
|
|
print(data)
|
|
prompt = input('enter to enroll %s, k to go ahead with everyone, or q to quit: ' % s[0])
|
|
if prompt == 'q':
|
|
break
|
|
elif prompt == 'k':
|
|
cautious = 0
|
|
r3 = requests.post(t, headers=header, params=data)
|
|
if cautious:
|
|
print(data)
|
|
time.sleep(0.600)
|
|
except Exception as e:
|
|
print("Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
|
|
#print(r3.text)
|
|
|
|
|
|
|
|
def enroll_art_students_live():
|
|
depts = "THEA ART DM MUS MCTV".split(" ")
|
|
course_id = "13717"
|
|
enroll_bulk_students_bydept(course_id,depts)
|
|
print("done.")
|
|
|
|
def enroll_orientation_students():
|
|
ori_shell_id = "15924" # 2023 orientation shell # 2022: "9768"
|
|
the_semester = "202330"
|
|
|
|
users_to_enroll = users_new_this_semester(the_semester) ### ##### USES LOCAL DB
|
|
users_in_ori_shell = set( \
|
|
[ str(x['user_id']) for x in course_enrollment(ori_shell_id).values() ])
|
|
|
|
print("ALL ORIENTATION STUDENTS %s" % str(users_to_enroll))
|
|
print("\n\nALREADY IN ORI SHELL %s" % str(users_in_ori_shell))
|
|
|
|
enroll_us = users_to_enroll.difference(users_in_ori_shell)
|
|
|
|
print("\n\nTO ENROLL %s" % str(enroll_us))
|
|
print("%i new users to enroll." % len(enroll_us))
|
|
|
|
eee = 0
|
|
uuu = 0
|
|
|
|
(connection,cursor) = db()
|
|
|
|
for j in enroll_us:
|
|
s = ""
|
|
try:
|
|
q = "SELECT name,canvasid FROM users WHERE canvasid=%s" % j
|
|
cursor.execute(q)
|
|
s = cursor.fetchall()
|
|
if s:
|
|
s = s[0]
|
|
print(" + Enrolling: %s" % s[0])
|
|
t = url + '/api/v1/courses/%s/enrollments' % ori_shell_id
|
|
data = { 'enrollment[user_id]': j, 'enrollment[type]':'StudentEnrollment',
|
|
'enrollment[enrollment_state]': 'active' }
|
|
#print(data)
|
|
r3 = requests.post(t, headers=header, params=data)
|
|
eee += 1
|
|
#print(r3.text)
|
|
time.sleep(0.400)
|
|
except Exception as e:
|
|
print(" - Something went wrong with id %s, %s, %s" % (j, str(s), str(e)))
|
|
return (eee,uuu)
|
|
|
|
def enroll_o_s_students():
|
|
#full_reload()
|
|
|
|
(es,us) = enroll_stem_students_live()
|
|
(eo, uo) = enroll_orientation_students()
|
|
|
|
print("Enrolled %i and unenrolled %i students in STEM shell" % (es,us))
|
|
print("Enrolled %i students in Orientation shell" % eo)
|
|
|
|
|
|
|
|
|
|
def make_ztc_list(sem='sp20'):
|
|
sched = json.loads(open('output/semesters/2020spring/sp20_sched.json','r').read())
|
|
responses = open('cache/ztc_responses_sp20.csv','r').readlines()[1:]
|
|
|
|
result = open('cache/ztc_crossref.csv','w')
|
|
result.write('Course,Section,Name,Teacher,ZTC teacher\n')
|
|
|
|
ztc_dict = {}
|
|
for R in responses:
|
|
R = re.sub(',Yes','',R)
|
|
R = re.sub('\s\s+',',',R)
|
|
|
|
parts = R.split(r',') #name courselist yes
|
|
#print(parts[1])
|
|
name = parts[0]
|
|
|
|
for C in parts[1:] :
|
|
C = C.strip()
|
|
#print(C)
|
|
if C in ztc_dict:
|
|
ztc_dict[C] += ', ' + parts[0]
|
|
else:
|
|
ztc_dict[C] = parts[0]
|
|
print(ztc_dict)
|
|
for CO in sched:
|
|
#if re.match(r'CWE',CO['code']):
|
|
#print(CO)
|
|
|
|
if CO['code'] in ztc_dict:
|
|
print(('Possible match, ' + CO['code'] + ' ' + ztc_dict[CO['code']] + ' is ztc, this section taught by: ' + CO['teacher'] ))
|
|
result.write( ','.join( [CO['code'] ,CO['crn'] , CO['name'] , CO['teacher'] , ztc_dict[CO['code']] ]) + "\n" )
|
|
|
|
def course_search_by_sis():
|
|
term = 65
|
|
all_courses = getCoursesInTerm(term)
|
|
all = []
|
|
for course in all_courses:
|
|
#u = "/api/v1/accounts/1/courses/%s" % course_id
|
|
#i = fetch( url + u)
|
|
all.append([ course['name'], course['sis_course_id'] ])
|
|
print_table(all)
|
|
# print(json.dumps(x, indent=2))
|
|
|
|
|
|
def mod_eval_visibility( shell_id, visible=True ):
|
|
evals_hidden = True
|
|
if (visible): evals_hidden = False
|
|
data = {'position':2, 'hidden':evals_hidden}
|
|
u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % shell_id
|
|
r3 = requests.put(u2, headers=header, params=data)
|
|
#print(" " + r3.text)
|
|
|
|
|
|
|
|
def instructor_list_to_activate_evals():
|
|
courses = all_sem_courses_teachers()
|
|
|
|
mylist = codecs.open('cache/fa21_eval_teachers.txt','r','utf-8').readlines()
|
|
mylist = [ x.split(',')[2].strip() for x in mylist ]
|
|
|
|
count = 0
|
|
limit = 5000
|
|
|
|
for c in courses:
|
|
shell_id = c[1]
|
|
teacher_id = c[6]
|
|
teacher_name = c[5]
|
|
course_name = c[3]
|
|
|
|
if teacher_id in mylist:
|
|
print("Teacher: %s \t course: %s" % (teacher_name,course_name))
|
|
mod_eval_visibility( shell_id, False)
|
|
count += 1
|
|
if count > limit: return
|
|
|
|
|
|
#print(mylist)
|
|
|
|
|
|
|
|
def add_evals(section=0):
|
|
# show or hide?
|
|
hidden = False
|
|
#s = [ x.strip() for x in codecs.open('cache/sp21_eval_sections.txt','r').readlines()]
|
|
#s = [ x.split(',')[4].split('::') for x in codecs.open('cache/fa22_eval_sections.csv','r').readlines()]
|
|
#s = [ x.strip() for x in codecs.open('cache/fa22_eval_sections.csv','r').readlines()]
|
|
s = [ x.strip() for x in codecs.open('cache/sp23_eval_sections.csv','r').readlines()]
|
|
s = list(funcy.flatten(s))
|
|
s.sort()
|
|
xyz = input('hit return to continue')
|
|
|
|
#c = getCoursesInTerm(168,0,1)
|
|
#c = getCoursesInTerm(174,0,1) # sp22
|
|
#c = getCoursesInTerm(176,0,1) # fa22
|
|
c = getCoursesInTerm(178,0,1) # sp23
|
|
print(c)
|
|
ids = []
|
|
courses = {}
|
|
for C in c:
|
|
if C and 'sis_course_id' in C and C['sis_course_id']:
|
|
parts = C['sis_course_id'].split('-')
|
|
if parts[1] in s:
|
|
print(C['name'])
|
|
courses[str(C['id'])] = C
|
|
ids.append(str(C['id']))
|
|
|
|
ask = 1
|
|
data = {'position':2, 'hidden':hidden}
|
|
|
|
for i in ids:
|
|
if ask:
|
|
a = input("Hit q to quit, a to do all, or enter to activate eval for: " + str(courses[i]))
|
|
if a == 'a': ask = 0
|
|
if a == 'q': return
|
|
u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % i
|
|
r3 = requests.put(u2, headers=header, params=data)
|
|
print(r3.text)
|
|
time.sleep(0.400)
|
|
|
|
|
|
return 1
|
|
|
|
u2 = "https://gavilan.instructure.com:443/api/v1/courses/12001/tabs"
|
|
r = fetch(u2)
|
|
print(json.dumps(r,indent=2))
|
|
|
|
|
|
|
|
# PUT /api/v1/courses/:course_id/tabs/:tab_id
|
|
|
|
def course_dates_terms(section=0):
|
|
"""s = [ x.strip() for x in codecs.open('cache/fa22_eval_sections.csv','r').readlines()]
|
|
s = list(funcy.flatten(s))
|
|
s.sort()
|
|
xyz = input('hit return to continue')
|
|
"""
|
|
|
|
#c = getCoursesInTerm(168,0,1)
|
|
#c = getCoursesInTerm(174,0,1) # sp22
|
|
#c = getCoursesInTerm(176,0,1) # fa22
|
|
|
|
get_fresh = 0
|
|
|
|
if get_fresh:
|
|
c = getCoursesInTerm(178,0,0) # sp23
|
|
codecs.open('cache/courses_in_term_178.json','w','utf-8').write(json.dumps(c,indent=2))
|
|
else:
|
|
c = json.loads( codecs.open('cache/courses_in_term_178.json','r','utf-8').read() )
|
|
|
|
crn_to_canvasid = {}
|
|
for C in c:
|
|
#print(C['name'])
|
|
if 'sis_course_id' in C and C['sis_course_id']:
|
|
crn_to_canvasid[C['sis_course_id'][7:13]] = str(C['id'])
|
|
|
|
#print(crn_to_canvasid)
|
|
#return
|
|
|
|
s = json.loads( codecs.open('cache/sp23_sched_expanded.json','r','utf-8').read() )
|
|
for S in s:
|
|
start = re.sub( r'\-','/', S['start']) + '/2023'
|
|
d_start = datetime.strptime(start,"%m/%d/%Y")
|
|
|
|
if d_start.month > 5:
|
|
print("Ignoring ", d_start, " starting too late...")
|
|
continue
|
|
|
|
if d_start.month == 1 and d_start.day == 12:
|
|
print("- Aviation ", start, d_start, " - ", S['code'], " ", S['crn'] )
|
|
continue
|
|
|
|
if d_start.month == 1 and d_start.day ==3:
|
|
print("+ winter session: ", d_start, " - ", S['code'])
|
|
winter_term = '177'
|
|
data = {'course[term_id]':winter_term}
|
|
u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s" % crn_to_canvasid[S['crn']]
|
|
r3 = requests.put(u2, headers=header, params=data)
|
|
print(u2, " OK")
|
|
#print(r3.text)
|
|
continue
|
|
|
|
if d_start.month == 1 and d_start.day == 30:
|
|
# normal class
|
|
continue
|
|
|
|
print("- Late start? ", start, d_start, " - ", S['code'], " ", S['crn'] )
|
|
data = {'course[start_at]':d_start.isoformat(), 'course[restrict_enrollments_to_course_dates]': True}
|
|
u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s" % crn_to_canvasid[S['crn']]
|
|
r3 = requests.put(u2, headers=header, params=data)
|
|
print(u2, " OK")
|
|
|
|
return
|
|
|
|
|
|
|
|
def remove_n_analytics(section=0):
|
|
print("Fetching list of all active courses")
|
|
|
|
c = getCoursesInTerm(172,1,0)
|
|
print(c)
|
|
ids = []
|
|
courses = {}
|
|
data = {'hidden':True}
|
|
|
|
pause = 1
|
|
|
|
for C in c:
|
|
#print( json.dumps(C,indent=2) )
|
|
parts = C['sis_course_id'].split('-')
|
|
#print("\n")
|
|
print(C['name'])
|
|
courses[str(C['id'])] = C
|
|
ids.append(str(C['id']))
|
|
|
|
u3 = url + '/api/v1/courses/%s/tabs' % str(C['id'])
|
|
tabs = fetch(u3)
|
|
for T in tabs:
|
|
if T['label'] == "New Analytics":
|
|
print( "\tVisibility is: " + T["visibility"] ) # json.dumps(tabs,indent=2) )
|
|
if "hidden" in T:
|
|
print( "\tHidden is: " + str(T["hidden"]) ) # json.dumps(tabs,indent=2) )
|
|
if 1: # T["visibility"] != "admins":
|
|
u4 = url + "/api/v1/courses/%s/tabs/%s" % ( str(C['id']), str(T['id']) )
|
|
print( "\tChanging visiblity of a. tab" )
|
|
r4 = requests.put(u4, headers=header, params=data)
|
|
print("\t" + r4.text)
|
|
if pause:
|
|
xyz = input('\n\nenter for next one or [y] to do all: ')
|
|
if xyz == 'y': pause = 0
|
|
|
|
|
|
exit()
|
|
|
|
|
|
"""ask = 1
|
|
|
|
evals_hidden = True
|
|
|
|
|
|
data = {'position':2, 'hidden':evals_hidden}
|
|
|
|
for i in ids:
|
|
if ask:
|
|
a = input("Hit q to quit, a to do all, or enter to activate eval for: \n " + str(courses[i]) + "\n> ")
|
|
if a == 'a': ask = 0
|
|
if a == 'q': return
|
|
u2 = "https://gavilan.instructure.com:443/api/v1/courses/%s/tabs/context_external_tool_1953" % i
|
|
print(courses[i]['name'])
|
|
r3 = requests.put(u2, headers=header, params=data)
|
|
print(" " + r3.text)
|
|
time.sleep(0.300)
|
|
"""
|
|
|
|
|
|
|
|
def create_sandboxes():
|
|
names = input("what are the initials of people? Separate with spaces ").split()
|
|
for N in names:
|
|
print(N)
|
|
u2 = url + "/api/v1/accounts/1/courses"
|
|
data = {
|
|
"course[name]": "%s Sandbox SU21 G2" % N,
|
|
"course[code]": "%s SU21 G2" % N,
|
|
"course[term_id]": "8",
|
|
}
|
|
#print(u2)
|
|
r3 = requests.post(u2, headers=header, params=data)
|
|
course_data = json.loads(r3.text)
|
|
id = course_data['id']
|
|
u3 = url + "/api/v1/courses/%i/enrollments" % id
|
|
usrid = input("id of %s? " % N)
|
|
data2 = { "enrollment[type]":"TeacherEnrollment", "enrollment[user_id]":usrid}
|
|
r4 = requests.post(u3, headers=header, params=data2)
|
|
#print(json.dumps(json.loads(r4.text),indent=2))
|
|
print()
|
|
|
|
|
|
def course_term_summary_2():
|
|
lines = codecs.open('cache/term_summary.txt','r','utf-8').readlines()
|
|
output = codecs.open('cache/term_summary.html','w','utf-8')
|
|
for L in lines:
|
|
try:
|
|
L = L.strip()
|
|
print(L)
|
|
ll = json.loads(L)
|
|
print(ll)
|
|
print(ll['course_code'])
|
|
if ll['workflow_state'] == 'unpublished':
|
|
ss = "<br />Course: <a href='%s' target='_blank'>%s</a><br />" % ("https://ilearn.gavilan.edu/courses/"+str(ll['id']), ll['course_code'] )
|
|
output.write( ss )
|
|
print(ss+"\n")
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
def get_ext_tools():
|
|
r = url + '/api/v1/accounts/1/external_tools'
|
|
s = fetch(r)
|
|
print(json.dumps(s,indent=2))
|
|
|
|
def set_ext_tools():
|
|
TOOL = 733
|
|
r = url + '/api/v1/accounts/1/external_tools/%s' % str(TOOL)
|
|
data = { 'course_navigation[default]': 'disabled' }
|
|
s = json.loads(requests.put(r, headers=header, params=data).text)
|
|
print(json.dumps(s,indent=2))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
options = { 1: ['Cross check schedule with ztc responses',make_ztc_list] ,
|
|
30: ['List latestart classes', list_latestarts ],
|
|
2: ['Add announcements to homepage', change_course_ann_homepage],
|
|
3: ['Cross-list classes', xlist ],
|
|
4: ['List students who passed quiz X', get_quiz_passers],
|
|
5: ['List the terms', getTerms],
|
|
6: ['Cross list helper', eslCrosslister],
|
|
7: ['Show courses in a term', getCoursesInTerm],
|
|
8: ['Save enrollments in a course', course_enrollment],
|
|
9: ['Simple list of course data, search by sis_id', course_search_by_sis],
|
|
10: ['Overview of a term', course_term_summary],
|
|
11: ['Enroll ORIENTATION and STEM student shells after catching up database.', enroll_o_s_students],
|
|
12: ['Enroll stem students', enroll_stem_students_live],
|
|
13: ['Enroll orientation students (refresh local db)', enroll_orientation_students],
|
|
14: ['Enroll ART students', enroll_art_students_live],
|
|
15: ['List users who passed GOTT 1 / Bootcamp', get_gott1_passers],
|
|
16: ['List users who passed Plagiarism Module', get_plague_passers],
|
|
17: ['Remove "new analytics" from all courses navs in a semester', remove_n_analytics],
|
|
18: ['Create some sandbox courses', create_sandboxes],
|
|
19: ['Add course evals', add_evals],
|
|
20: ['process the semester overview output (10)', course_term_summary_2],
|
|
21: ['Add announcements to homepage', change_course_ann_homepage],
|
|
22: ['Get a course info by id',getCourses],
|
|
23: ['Reset course conclude date',update_course_conclude],
|
|
#24: ['Add course evals to whole semester',instructor_list_to_activate_evals],
|
|
25: ['ext tools',get_ext_tools],
|
|
26: ['set ext tools',set_ext_tools],
|
|
27: ['Fine tune term dates and winter session', course_dates_terms],
|
|
28: ['Cross list a semester from file', semester_cross_lister],
|
|
29: ['Check all courses & their sections in semester', all_semester_course_sanity_check],
|
|
# TODO wanted: group shell for each GP (guided pathway) as a basic student services gateway....
|
|
#
|
|
}
|
|
print ('')
|
|
|
|
if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
|
|
resp = int(sys.argv[1])
|
|
print("\n\nPerforming: %s\n\n" % options[resp][0])
|
|
|
|
else:
|
|
print ('')
|
|
for key in options:
|
|
print(str(key) + '.\t' + options[key][0])
|
|
|
|
print('')
|
|
resp = input('Choose: ')
|
|
|
|
# Call the function in the options dict
|
|
options[ int(resp)][1]()
|