cleanup ssb version

This commit is contained in:
Peter Howell 2025-10-22 20:54:33 +00:00
parent a01ef8084d
commit f73480ab32
6 changed files with 556 additions and 50 deletions

35
aws.py
View File

@ -412,27 +412,38 @@ def log_section_filling2(current_sched_list):
todays_df = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now]) todays_df = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now])
todays_df = todays_df.rename_axis('crn') todays_df = todays_df.rename_axis('crn')
todays_df.index = todays_df.index.astype(str)
#print(todays_df) #print(todays_df)
todays_df.to_csv('cache/reg_today_new.csv', index=True) todays_df.to_csv('cache/reg_today_new.csv', index=True)
csv_path = pathlib.Path('cache') / f'reg_data_{short_sem}.csv'
csv_path.parent.mkdir(parents=True, exist_ok=True)
try: try:
myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv') myframe = pd.read_csv(csv_path)
print(myframe) print(myframe)
except: except FileNotFoundError:
fff = open('cache/reg_data_'+short_sem+'.csv','w') myframe = pd.DataFrame(columns=['crn'])
fff.write('crn\n')
fff.close()
myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv')
#myframe = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now])
#myframe = myframe.rename_axis('crn')
print("Creating new data file for this semester.") print("Creating new data file for this semester.")
except pd.errors.EmptyDataError:
myframe = pd.DataFrame(columns=['crn'])
print("Existing data file was empty; starting fresh for this semester.")
new_df = myframe.join( todays_df, on='crn', how='outer' ) if 'crn' in myframe.columns:
new_df = new_df.rename_axis('crn') myframe = myframe.set_index('crn')
print(new_df) else:
myframe = myframe.rename_axis('crn')
myframe.index = myframe.index.astype(str)
combined_df = myframe.reindex(myframe.index.union(todays_df.index))
combined_df[now] = todays_df[now]
combined_df = combined_df.sort_index()
combined_df = combined_df.reset_index()
combined_df = combined_df.fillna('')
print(combined_df)
reg_data_filename = 'reg_data_' + short_sem + '.csv' reg_data_filename = 'reg_data_' + short_sem + '.csv'
new_df.to_csv('cache/' + reg_data_filename, index=False) tmp_path = csv_path.with_suffix(csv_path.suffix + '.tmp')
combined_df.to_csv(tmp_path, index=False)
tmp_path.replace(csv_path)
put_file('/home/public/schedule/', 'cache/', reg_data_filename, 0) put_file('/home/public/schedule/', 'cache/', reg_data_filename, 0)
# Input: xxxx_sched.json. Output: xxxx_latestarts.txt # Input: xxxx_sched.json. Output: xxxx_latestarts.txt

View File

@ -1,4 +1,4 @@
import json, re, requests, codecs, sys, time, funcy, os import json, re, requests, codecs, sys, time, funcy, os, csv, random
import pandas as pd import pandas as pd
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
import pytz import pytz
@ -6,8 +6,19 @@ from util import print_table, int_or_zero, float_or_zero, dept_from_name, num_fr
from pipelines import fetch, fetch_stream, fetch_collapse, header, url from pipelines import fetch, fetch_stream, fetch_collapse, header, url
from schedules import get_semester_schedule from schedules import get_semester_schedule
from localcache import course_quick_stats, get_courses_in_term_local, course_student_stats, all_sem_courses_teachers, full_reload from localcache import course_quick_stats, get_courses_in_term_local, course_student_stats, all_sem_courses_teachers, full_reload
from localcache2 import db, users_new_this_semester, users_new_this_2x_semester, course_from_id, user_ids_in_shell from localcache2 import (
from localcache2 import student_count, teacher_list, course_from_id, course_sched_entry_from_id db,
users_new_this_semester,
users_new_this_2x_semester,
course_from_id,
user_ids_in_shell,
student_count,
teacher_list,
course_sched_entry_from_id,
get_orientation_shells,
get_orientation_memberships,
get_student_enrollment_summary,
)
from collections import defaultdict from collections import defaultdict
from semesters import find_term from semesters import find_term
@ -1906,7 +1917,9 @@ def create_sandboxes():
#(23015, ' Sandbox GOTT2 SU25'), #(23015, ' Sandbox GOTT2 SU25'),
#(21898, ' Sandbox GOTT4 SU25'), #(21898, ' Sandbox GOTT4 SU25'),
#(23270, ' Sandbox GOTT1 FA25SEPT'), #(23270, ' Sandbox GOTT1 FA25SEPT'),
(23290, ' Sandbox GOTT2 FA25SEPT'), #(23290, ' Sandbox GOTT2 FA25SEPT'),
(23314, ' Sandbox GOTT5 FA25'),
(23315, ' Sandbox GOTT4 FA25'),
] ]
filepath = 'cache/sandbox_courses.pkl' filepath = 'cache/sandbox_courses.pkl'
@ -2906,6 +2919,242 @@ def remove_all_course_events():
print(f"failed: {response.status_code} {response.text}") print(f"failed: {response.status_code} {response.text}")
# Build a term-wide CSV summarizing student participation metrics for every course.
def build_term_participation_report():
term_alias = input("Term alias (ex: fa25): ").strip()
if not term_alias:
print("No term alias provided; aborting.")
return
normalized_alias = term_alias.lower()
term_record = find_term(normalized_alias)
if not term_record:
print(f"Unknown term alias: {term_alias}")
return
term_id = term_record.get('canvas_term_id')
if not term_id:
print(f"Canvas term id missing for {term_alias}")
return
term_code = (term_record.get('code') or normalized_alias).lower()
mode_choice = input("Demo run with ~10 random courses? (y/N): ").strip().lower()
demo_mode = mode_choice == 'y'
courses = getCoursesInTerm(term_id, get_fresh=0, show=0)
if not isinstance(courses, list):
print("Unable to fetch courses for this term; aborting.")
return
if demo_mode:
random.shuffle(courses)
print("Demo mode: targeting up to 10 courses with analytics data")
output_path = f"cache/{term_code}_participation.csv"
base_fields = [
'term_code',
'course_id',
'course_name',
'course_sis_id',
'course_code',
'student_canvas_id',
'student_sortable_name',
'student_sis_user_id',
'student_login_id',
'student_name',
'student_email',
]
rows = []
data_fields = set()
def flatten_value(prefix, value, dest):
if isinstance(value, dict):
for key, val in value.items():
next_key = f"{prefix}.{key}" if prefix else str(key)
flatten_value(next_key, val, dest)
elif isinstance(value, list):
dest[prefix] = json.dumps(value)
else:
dest[prefix] = value
processed_courses = 0
for course in courses:
if demo_mode and processed_courses >= 10:
break
course_id = course.get('id')
if not course_id:
continue
course_name = course.get('name', '')
print(f"Fetching analytics for course {course_id}: {course_name}")
enrollment_index = {}
try:
enrollment_params = {
'type[]': 'StudentEnrollment',
'per_page': 100,
}
enrollments = fetch(f"/api/v1/courses/{course_id}/enrollments", params=enrollment_params)
if isinstance(enrollments, list):
for enrollment in enrollments:
user = enrollment.get('user') or {}
user_id = user.get('id') or enrollment.get('user_id')
if not user_id:
continue
entry = {
'sortable_name': user.get('sortable_name', ''),
'sis_user_id': user.get('sis_user_id', ''),
'login_id': user.get('login_id', ''),
'sis_login_id': user.get('sis_login_id', ''),
'email': user.get('email', ''),
'name': user.get('name', ''),
}
enrollment_index[user_id] = entry
enrollment_index[str(user_id)] = entry
except Exception as exc:
print(f"Failed to fetch enrollments for {course_id}: {exc}")
try:
summaries = fetch(f"/api/v1/courses/{course_id}/analytics/student_summaries")
except Exception as exc:
print(f"Failed to fetch analytics for {course_id}: {exc}")
continue
if not isinstance(summaries, list):
print(f"Unexpected analytics payload for {course_id}; skipping")
continue
course_rows_added = 0
for summary in summaries:
flattened = {}
flatten_value('', summary, flattened)
user_id = (
summary.get('id')
or summary.get('user_id')
or flattened.get('user_id')
or flattened.get('user.id')
)
enrollment_details = {}
if user_id in enrollment_index:
enrollment_details = enrollment_index[user_id]
elif isinstance(user_id, str) and user_id.isdigit():
enrollment_details = enrollment_index.get(int(user_id), {})
elif isinstance(user_id, int):
enrollment_details = enrollment_index.get(str(user_id), {})
row = {
'term_code': term_code,
'course_id': str(course_id),
'course_name': course_name,
'course_sis_id': course.get('sis_course_id', ''),
'course_code': course.get('course_code', ''),
'student_canvas_id': str(user_id) if user_id else '',
'student_sortable_name': enrollment_details.get('sortable_name') or '',
'student_sis_user_id': (enrollment_details.get('sis_user_id') or enrollment_details.get('sis_login_id')) or '',
'student_login_id': enrollment_details.get('login_id') or '',
'student_name': enrollment_details.get('name') or '',
'student_email': enrollment_details.get('email') or '',
}
if enrollment_details:
data_fields.add('student_name')
data_fields.add('student_email')
for key, value in flattened.items():
if not key:
continue
row[key] = value
data_fields.add(key)
rows.append(row)
course_rows_added += 1
if course_rows_added == 0:
print(f"Skipping course {course_id}: no student analytics data")
continue
processed_courses += 1
if demo_mode and processed_courses < 10:
print(f"Demo mode finished early: only {processed_courses} courses had analytics data")
if not rows:
print("No analytics data found; nothing to write.")
return
field_order = base_fields + sorted([field for field in data_fields if field not in base_fields])
print(f"Writing {len(rows)} rows to {output_path}")
with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=field_order)
writer.writeheader()
for row in rows:
writer.writerow({field: row.get(field, '') for field in field_order})
# Summarize student orientation enrollments across the account and flag coverage gaps.
def audit_student_orientation_enrollments():
orientation_years = ['2022', '2023', '2024', '2025', '2026']
orientation_shells = get_orientation_shells(orientation_years)
missing_years = [year for year in orientation_years if year not in orientation_shells]
if missing_years:
print(f"Warning: orientation shells not found for years: {', '.join(missing_years)}")
if not orientation_shells:
print("No orientation courses located; aborting.")
return
orientation_memberships = get_orientation_memberships(orientation_years)
student_summaries = get_student_enrollment_summary()
if not student_summaries:
print("No student enrollment data available; aborting.")
return
rows = []
for summary in student_summaries:
user_id = summary.get('user_id')
user_key = str(user_id)
membership = orientation_memberships.get(user_key, {'years': set(), 'total': 0})
membership_years = membership.get('years', set())
orientation_total = membership.get('total', 0)
row = {
'student_id': user_key,
'sortable_name': summary.get('sortable_name') or summary.get('name') or '',
'sis_id': summary.get('sis_user_id') or '',
'student_enrollment_count': summary.get('student_enrollments', 0),
'teacher_enrollment_count': summary.get('teacher_enrollments', 0),
'orientation_enrollment_total': orientation_total,
'missing_student_orientation': 1 if orientation_total == 0 else 0,
}
for year in orientation_years:
row[year] = 1 if year in membership_years else 0
rows.append(row)
if not rows:
print("No rows to write; aborting.")
return
rows.sort(key=lambda r: (r.get('missing_student_orientation', 0), r.get('sortable_name', '')))
output_path = 'cache/student_orientation_audit.csv'
fieldnames = [
'student_id',
'sortable_name',
'sis_id',
'student_enrollment_count',
'teacher_enrollment_count',
*orientation_years,
'orientation_enrollment_total',
'missing_student_orientation'
]
print(f"Writing {len(rows)} rows to {output_path}")
with open(output_path, 'w', newline='', encoding='utf-8') as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow({field: row.get(field, '') for field in fieldnames})
missing_count = sum(1 for row in rows if row.get('orientation_enrollment_total', 0) == 0)
multi_count = sum(1 for row in rows if row.get('orientation_enrollment_total', 0) > 1)
print(f"Orientation audit complete. Missing: {missing_count}, duplicates: {multi_count}.")
# Create Canvas calendar events for predefined orientation shells from CSV input. # Create Canvas calendar events for predefined orientation shells from CSV input.
def create_calendar_event(): def create_calendar_event():
events = codecs.open('cache/academic_calendar_2025.csv','r','utf-8').readlines() events = codecs.open('cache/academic_calendar_2025.csv','r','utf-8').readlines()
@ -3189,10 +3438,12 @@ if __name__ == "__main__":
21: ['Reset course conclude date',update_course_conclude], 21: ['Reset course conclude date',update_course_conclude],
22: ['Create calendar events for orientation shells', create_calendar_event], 22: ['Create calendar events for orientation shells', create_calendar_event],
23: ['Remove all calendar events from a course', remove_all_course_events], 23: ['Remove all calendar events from a course', remove_all_course_events],
24: ['list all assignments', list_all_assignments], 24: ['Build participation report for a term', build_term_participation_report],
25: ['Bulk unenroll from course', bulk_unenroll], 25: ['Audit student orientation enrollments', audit_student_orientation_enrollments],
26: ['enrollment helper', enrollment_helper], 26: ['list all assignments', list_all_assignments],
27: ['g number list enroll to shell id', enroll_gnumber_list_to_courseid], 27: ['Bulk unenroll from course', bulk_unenroll],
28: ['enrollment helper', enrollment_helper],
29: ['g number list enroll to shell id', enroll_gnumber_list_to_courseid],
30: ['* Overview semester start dates',overview_start_dates], 30: ['* Overview semester start dates',overview_start_dates],
31: ['Fine tune term dates and winter session', course_by_depts_terms], 31: ['Fine tune term dates and winter session', course_by_depts_terms],

View File

@ -62,7 +62,7 @@ def all_gav_employees():
h.updated_at, p.last_request_at, p.last_login_at, p.current_login_at, p.last_login_ip, h.updated_at, p.last_request_at, p.last_login_at, p.current_login_at, p.last_login_ip,
p.current_login_ip, p.sis_user_id, p.unique_name FROM users AS u p.current_login_ip, p.sis_user_id, p.unique_name FROM users AS u
JOIN comm_channel AS h ON u.id=h.user_id JOIN comm_channel AS h ON u.id=h.user_id
JOIN pseudonym AS p ON p.user_id=u.id JOIN pseudonyms AS p ON p.user_id=u.id
WHERE h.address LIKE "%@gavilan.edu" WHERE h.address LIKE "%@gavilan.edu"
ORDER BY u.sortablename""" ORDER BY u.sortablename"""
cursor = connection.cursor() cursor = connection.cursor()
@ -200,6 +200,107 @@ ORDER BY c.sis_source_id, wp.title;"""
return all return all
# Fetch orientation shell course ids keyed by year from the local Canvas database.
def get_orientation_shells(years=None):
year_filter = set(str(y) for y in years) if years else None
(connection, _cursor) = db()
cursor = connection.cursor()
cursor.execute(
"""
SELECT id,
name,
substring(name FROM '(\\d{4})') AS year
FROM canvas.courses
WHERE name ILIKE 'iLearn Student Orientation %'
"""
)
shells = {}
for course_id, name, year in cursor.fetchall():
if not year:
continue
if year_filter and year not in year_filter:
continue
if year not in shells:
shells[year] = {'id': course_id, 'name': name}
return shells
# Collect per-student orientation membership details keyed by Canvas user id.
def get_orientation_memberships(years=None):
year_filter = set(str(y) for y in years) if years else None
(connection, _cursor) = db()
cursor = connection.cursor()
cursor.execute(
"""
SELECT e.user_id,
substring(c.name FROM '(\\d{4})') AS year
FROM canvas.enrollments e
JOIN canvas.courses c ON c.id = e.course_id
WHERE c.name ILIKE 'iLearn Student Orientation %'
AND e.type = 'StudentEnrollment'
AND e.workflow_state IN ('active', 'completed', 'inactive')
"""
)
memberships = {}
for user_id, year in cursor.fetchall():
if not year:
continue
if year_filter and year not in year_filter:
continue
user_key = str(user_id)
membership = memberships.setdefault(user_key, {'years': set(), 'total': 0})
if year not in membership['years']:
membership['years'].add(year)
membership['total'] += 1
return memberships
# Produce student enrollment counts for the orientation audit.
def get_student_enrollment_summary():
(connection, _cursor) = db()
cursor = connection.cursor()
cursor.execute(
"""
SELECT u.id,
u.sortable_name,
u.name,
sis_map.sis_user_id,
COALESCE(SUM(CASE WHEN e.type = 'StudentEnrollment' THEN 1 ELSE 0 END), 0) AS student_enrollments,
COALESCE(SUM(CASE WHEN e.type = 'TeacherEnrollment' THEN 1 ELSE 0 END), 0) AS teacher_enrollments
FROM canvas.users u
LEFT JOIN canvas.enrollments e
ON e.user_id = u.id
AND e.workflow_state IN ('active', 'completed', 'inactive')
LEFT JOIN LATERAL (
SELECT p.sis_user_id
FROM canvas.pseudonyms p
WHERE p.user_id = u.id
AND p.workflow_state = 'active'
AND p.sis_user_id IS NOT NULL
AND p.sis_user_id <> ''
ORDER BY p.position NULLS FIRST, p.id
LIMIT 1
) sis_map ON TRUE
GROUP BY u.id, u.sortable_name, u.name, sis_map.sis_user_id
HAVING COALESCE(SUM(CASE WHEN e.type = 'StudentEnrollment' THEN 1 ELSE 0 END), 0) > 0
"""
)
results = []
for row in cursor.fetchall():
user_id, sortable_name, name, sis_user_id, student_count, teacher_count = row
results.append(
{
'user_id': user_id,
'sortable_name': sortable_name,
'name': name,
'sis_user_id': sis_user_id,
'student_enrollments': int(student_count or 0),
'teacher_enrollments': int(teacher_count or 0),
}
)
return results
def user_ids_in_shell(shellid): def user_ids_in_shell(shellid):
q = f"""select e.user_id from canvas.enrollments e q = f"""select e.user_id from canvas.enrollments e
where e.course_id = {shellid} and e.type='StudentEnrollment' and e.workflow_state='active';""" where e.course_id = {shellid} and e.type='StudentEnrollment' and e.workflow_state='active';"""

View File

@ -358,27 +358,38 @@ def log_section_filling2(current_sched_list, short_sem):
todays_df = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now]) todays_df = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now])
todays_df = todays_df.rename_axis('crn') todays_df = todays_df.rename_axis('crn')
todays_df.index = todays_df.index.astype(str)
#print(todays_df) #print(todays_df)
todays_df.to_csv('cache/reg_today_new.csv', index=True) todays_df.to_csv('cache/reg_today_new.csv', index=True)
csv_path = pathlib.Path('cache') / f'reg_data_{short_sem}.csv'
csv_path.parent.mkdir(parents=True, exist_ok=True)
try: try:
myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv') myframe = pd.read_csv(csv_path)
print(myframe) print(myframe)
except: except FileNotFoundError:
fff = open('cache/reg_data_'+short_sem+'.csv','w') myframe = pd.DataFrame(columns=['crn'])
fff.write('crn\n')
fff.close()
myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv')
#myframe = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now])
#myframe = myframe.rename_axis('crn')
print("Creating new data file for this semester.") print("Creating new data file for this semester.")
except pd.errors.EmptyDataError:
myframe = pd.DataFrame(columns=['crn'])
print("Existing data file was empty; starting fresh for this semester.")
new_df = myframe.join( todays_df, on='crn', how='outer' ) if 'crn' in myframe.columns:
new_df = new_df.rename_axis('crn') myframe = myframe.set_index('crn')
print(new_df) else:
myframe = myframe.rename_axis('crn')
myframe.index = myframe.index.astype(str)
combined_df = myframe.reindex(myframe.index.union(todays_df.index))
combined_df[now] = todays_df[now]
combined_df = combined_df.sort_index()
combined_df = combined_df.reset_index()
combined_df = combined_df.fillna('')
print(combined_df)
reg_data_filename = 'reg_data_' + short_sem + '.csv' reg_data_filename = 'reg_data_' + short_sem + '.csv'
new_df.to_csv('cache/' + reg_data_filename, index=False) tmp_path = csv_path.with_suffix(csv_path.suffix + '.tmp')
combined_df.to_csv(tmp_path, index=False)
tmp_path.replace(csv_path)
put_file('/home/public/schedule/', 'cache/', reg_data_filename, 0) put_file('/home/public/schedule/', 'cache/', reg_data_filename, 0)

36
ssb.py
View File

@ -63,27 +63,38 @@ def log_section_filling2(current_sched_list, short_sem):
todays_df = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now]) todays_df = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now])
todays_df = todays_df.rename_axis('crn') todays_df = todays_df.rename_axis('crn')
todays_df.index = todays_df.index.astype(str)
#print(todays_df) #print(todays_df)
todays_df.to_csv('cache/reg_today_new.csv', index=True) todays_df.to_csv('cache/reg_today_new.csv', index=True)
csv_path = pathlib.Path('cache') / f'reg_data_{short_sem}.csv'
csv_path.parent.mkdir(parents=True, exist_ok=True)
try: try:
myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv') myframe = pd.read_csv(csv_path)
print(myframe) print(myframe)
except: except FileNotFoundError:
fff = open('cache/reg_data_'+short_sem+'.csv','w') myframe = pd.DataFrame(columns=['crn'])
fff.write('crn\n')
fff.close()
myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv')
#myframe = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now])
#myframe = myframe.rename_axis('crn')
print("Creating new data file for this semester.") print("Creating new data file for this semester.")
except pd.errors.EmptyDataError:
myframe = pd.DataFrame(columns=['crn'])
print("Existing data file was empty; starting fresh for this semester.")
new_df = myframe.join( todays_df, on='crn', how='outer' ) if 'crn' in myframe.columns:
new_df = new_df.rename_axis('crn') myframe = myframe.set_index('crn')
print(new_df) else:
myframe = myframe.rename_axis('crn')
myframe.index = myframe.index.astype(str)
combined_df = myframe.reindex(myframe.index.union(todays_df.index))
combined_df[now] = todays_df[now]
combined_df = combined_df.sort_index()
combined_df = combined_df.reset_index()
combined_df = combined_df.fillna('')
print(combined_df)
reg_data_filename = 'reg_data_' + short_sem + '.csv' reg_data_filename = 'reg_data_' + short_sem + '.csv'
new_df.to_csv('cache/' + reg_data_filename, index=False) tmp_path = csv_path.with_suffix(csv_path.suffix + '.tmp')
combined_df.to_csv(tmp_path, index=False)
tmp_path.replace(csv_path)
# Take banner's html and make a csv(?) file # Take banner's html and make a csv(?) file
@ -593,4 +604,3 @@ for item in semesters:
time.sleep(45) time.sleep(45)
else: else:
print(f"Stopped due to error: {result}") print(f"Stopped due to error: {result}")

126
tasks.py
View File

@ -14,6 +14,7 @@
""" """
import pysftp, os, datetime, requests, re, json, sqlite3, codecs, csv, sys import pysftp, os, datetime, requests, re, json, sqlite3, codecs, csv, sys
import yaml
import funcy, os.path, datetime, calendar, time, shutil, urllib import funcy, os.path, datetime, calendar, time, shutil, urllib
from datetime import datetime from datetime import datetime
from collections import defaultdict from collections import defaultdict
@ -184,9 +185,9 @@ def convert_to_pdf(name1, name2):
# Build (docx/pdf) certificates for gott graduates # Build (docx/pdf) certificates for gott graduates
def certificates_gott_build(): def certificates_gott_build():
course = "gott_1_fa25_sept" course = "gott_6_fa25_sept"
coursedate = "Fall 2025" coursedate = "Fall 2025"
certificate = "gott 1 template.docx" certificate = "gott 6 template.docx"
#course = "gott_4_su25" #course = "gott_4_su25"
#certificate = "gott 4 template.docx" #certificate = "gott 4 template.docx"
@ -1287,6 +1288,125 @@ def file_renamer():
print("ok") print("ok")
# Collect developer key metadata for a Canvas account and write an audit TXT report.
def audit_developer_keys(account_id=1, output_path='cache/developer_keys_audit.txt', include_details=True):
params = {'per_page': 100}
developer_keys = fetch(f"/api/v1/accounts/{account_id}/developer_keys", params=params)
if not isinstance(developer_keys, list):
print('Unexpected response when fetching developer keys')
return developer_keys
records = []
for key in developer_keys:
summary = {
'id': key.get('id'),
'name': key.get('name'),
'description': key.get('description') or key.get('notes'),
'owner_user_id': key.get('user_id'),
'created_at': key.get('created_at'),
'updated_at': key.get('updated_at'),
'last_used_at': key.get('last_used_at') or key.get('last_developer_key_use'),
'workflow_state': key.get('workflow_state'),
'visible': key.get('visible'),
'require_scopes': key.get('require_scopes'),
'redirect_uris': key.get('redirect_uris') or key.get('redirect_uri'),
'access_token_count': key.get('access_token_count'),
'scopes': key.get('scopes'),
}
record = {'summary': summary}
if include_details:
record['detail'] = key
records.append(record)
if not records:
print('No developer keys found; skipping file write.')
return records
os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as handle:
yaml.safe_dump(records, handle, sort_keys=False, allow_unicode=True)
print(f"Wrote {len(records)} developer keys to {output_path}")
return records
# Collect external tool metadata for a Canvas account and write an audit TXT report.
def audit_external_tools(account_id=1, output_path='cache/external_tools_audit.txt', include_details=True):
params = {'per_page': 100, 'include[]': ['usage', 'editor_button', 'course_navigation']}
tools = fetch(f"/api/v1/accounts/{account_id}/external_tools", params=params)
if not isinstance(tools, list):
print('Unexpected response when fetching external tools')
return tools
records = []
for tool in tools:
detail = tool
if include_details and tool.get('id'):
try:
detail = fetch(f"/api/v1/accounts/{account_id}/external_tools/{tool['id']}")
if not isinstance(detail, dict):
detail = tool
except Exception as exc:
print(f"Failed to fetch detail for external tool {tool.get('id')}: {exc}")
detail = tool
usage = detail.get('usage') or tool.get('usage') or {}
placements = detail.get('placements')
if isinstance(placements, list):
placements_value = placements
elif isinstance(placements, dict):
placements_value = list(placements.keys())
else:
placements_value = placements
summary = {
'id': detail.get('id'),
'name': detail.get('name'),
'description': detail.get('description'),
'consumer_key': detail.get('consumer_key'),
'created_at': detail.get('created_at'),
'updated_at': detail.get('updated_at'),
'workflow_state': detail.get('workflow_state'),
'last_used_at': usage.get('last_used_at') or detail.get('last_used_at'),
'launch_count_30d': usage.get('past_30_days'),
'launch_count_total': usage.get('total'),
'privacy_level': detail.get('privacy_level'),
'url': detail.get('url'),
'domain': detail.get('domain') or detail.get('custom_fields', {}).get('domain'),
'default': detail.get('is_default'),
'placements': placements_value,
}
record = {'summary': summary}
if include_details:
record['detail'] = detail
records.append(record)
if not records:
print('No external tools found; skipping file write.')
return records
os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as handle:
yaml.safe_dump(records, handle, sort_keys=False, allow_unicode=True)
print(f"Wrote {len(records)} external tools to {output_path}")
return records
# Run a combined audit of developer keys and external tools for a Canvas account.
def run_canvas_integration_audit(account_id=1):
developer_keys = audit_developer_keys(account_id)
external_tools = audit_external_tools(account_id)
print(f"Developer keys audited: {len(developer_keys) if isinstance(developer_keys, list) else 0}")
print(f"External tools audited: {len(external_tools) if isinstance(external_tools, list) else 0}")
return developer_keys, external_tools
# Use api to fix ilearn's authentication method when we can't log in. List. # Use api to fix ilearn's authentication method when we can't log in. List.
def list_auth(): def list_auth():
r = fetch( url + '/api/v1/accounts/1/authentication_providers') r = fetch( url + '/api/v1/accounts/1/authentication_providers')
@ -1322,6 +1442,7 @@ if __name__ == "__main__":
15: ['create a week calendar in word (general purpose)', word_calendar_v2], 15: ['create a week calendar in word (general purpose)', word_calendar_v2],
16: ['create GOTT certificates', certificates_gott_build], 16: ['create GOTT certificates', certificates_gott_build],
20: ['build_quiz', build_quiz], 20: ['build_quiz', build_quiz],
21: ['audit external access', run_canvas_integration_audit],
#21: ['certificates_gott_build, certificates_gott_build'] #21: ['certificates_gott_build, certificates_gott_build']
} }
@ -1357,3 +1478,4 @@ if __name__ == "__main__":
auth = json.loads(open('cache/badgr.txt','r').read()) auth = json.loads(open('cache/badgr.txt','r').read())
print ( auth ) print ( auth )