From f73480ab32e4ba0549177d983a1c9dba0818ca3e Mon Sep 17 00:00:00 2001 From: Peter Howell Date: Wed, 22 Oct 2025 20:54:33 +0000 Subject: [PATCH] cleanup ssb version --- aws.py | 37 ++++--- courses.py | 267 +++++++++++++++++++++++++++++++++++++++++++++++-- localcache2.py | 103 ++++++++++++++++++- schedules.py | 37 ++++--- ssb.py | 36 ++++--- tasks.py | 126 ++++++++++++++++++++++- 6 files changed, 556 insertions(+), 50 deletions(-) diff --git a/aws.py b/aws.py index 8e78bc7..c740cfd 100644 --- a/aws.py +++ b/aws.py @@ -412,27 +412,38 @@ def log_section_filling2(current_sched_list): todays_df = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now]) todays_df = todays_df.rename_axis('crn') + todays_df.index = todays_df.index.astype(str) #print(todays_df) todays_df.to_csv('cache/reg_today_new.csv', index=True) + csv_path = pathlib.Path('cache') / f'reg_data_{short_sem}.csv' + csv_path.parent.mkdir(parents=True, exist_ok=True) try: - myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv') + myframe = pd.read_csv(csv_path) print(myframe) - except: - fff = open('cache/reg_data_'+short_sem+'.csv','w') - fff.write('crn\n') - fff.close() - myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv') - #myframe = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now]) - #myframe = myframe.rename_axis('crn') + except FileNotFoundError: + myframe = pd.DataFrame(columns=['crn']) print("Creating new data file for this semester.") + except pd.errors.EmptyDataError: + myframe = pd.DataFrame(columns=['crn']) + print("Existing data file was empty; starting fresh for this semester.") - new_df = myframe.join( todays_df, on='crn', how='outer' ) - new_df = new_df.rename_axis('crn') - print(new_df) + if 'crn' in myframe.columns: + myframe = myframe.set_index('crn') + else: + myframe = myframe.rename_axis('crn') + myframe.index = myframe.index.astype(str) + combined_df = myframe.reindex(myframe.index.union(todays_df.index)) + combined_df[now] = todays_df[now] + combined_df = combined_df.sort_index() + combined_df = combined_df.reset_index() + combined_df = combined_df.fillna('') + print(combined_df) reg_data_filename = 'reg_data_' + short_sem + '.csv' - new_df.to_csv('cache/' + reg_data_filename, index=False) + tmp_path = csv_path.with_suffix(csv_path.suffix + '.tmp') + combined_df.to_csv(tmp_path, index=False) + tmp_path.replace(csv_path) put_file('/home/public/schedule/', 'cache/', reg_data_filename, 0) # Input: xxxx_sched.json. Output: xxxx_latestarts.txt @@ -750,4 +761,4 @@ def put_file(remotepath,localpath, localfile,prompt=1): sftp.close() print("Uploaded %s" % localfile) -scrape_schedule_multi() \ No newline at end of file +scrape_schedule_multi() diff --git a/courses.py b/courses.py index 3a703a6..1598b89 100644 --- a/courses.py +++ b/courses.py @@ -1,4 +1,4 @@ -import json, re, requests, codecs, sys, time, funcy, os +import json, re, requests, codecs, sys, time, funcy, os, csv, random import pandas as pd from datetime import datetime, timedelta, timezone import pytz @@ -6,8 +6,19 @@ from util import print_table, int_or_zero, float_or_zero, dept_from_name, num_fr from pipelines import fetch, fetch_stream, fetch_collapse, header, url from schedules import get_semester_schedule from localcache import course_quick_stats, get_courses_in_term_local, course_student_stats, all_sem_courses_teachers, full_reload -from localcache2 import db, users_new_this_semester, users_new_this_2x_semester, course_from_id, user_ids_in_shell -from localcache2 import student_count, teacher_list, course_from_id, course_sched_entry_from_id +from localcache2 import ( + db, + users_new_this_semester, + users_new_this_2x_semester, + course_from_id, + user_ids_in_shell, + student_count, + teacher_list, + course_sched_entry_from_id, + get_orientation_shells, + get_orientation_memberships, + get_student_enrollment_summary, +) from collections import defaultdict from semesters import find_term @@ -1906,7 +1917,9 @@ def create_sandboxes(): #(23015, ' Sandbox GOTT2 SU25'), #(21898, ' Sandbox GOTT4 SU25'), #(23270, ' Sandbox GOTT1 FA25SEPT'), - (23290, ' Sandbox GOTT2 FA25SEPT'), + #(23290, ' Sandbox GOTT2 FA25SEPT'), + (23314, ' Sandbox GOTT5 FA25'), + (23315, ' Sandbox GOTT4 FA25'), ] filepath = 'cache/sandbox_courses.pkl' @@ -2906,6 +2919,242 @@ def remove_all_course_events(): print(f"failed: {response.status_code} {response.text}") +# Build a term-wide CSV summarizing student participation metrics for every course. +def build_term_participation_report(): + term_alias = input("Term alias (ex: fa25): ").strip() + if not term_alias: + print("No term alias provided; aborting.") + return + + normalized_alias = term_alias.lower() + term_record = find_term(normalized_alias) + if not term_record: + print(f"Unknown term alias: {term_alias}") + return + + term_id = term_record.get('canvas_term_id') + if not term_id: + print(f"Canvas term id missing for {term_alias}") + return + + term_code = (term_record.get('code') or normalized_alias).lower() + mode_choice = input("Demo run with ~10 random courses? (y/N): ").strip().lower() + demo_mode = mode_choice == 'y' + courses = getCoursesInTerm(term_id, get_fresh=0, show=0) + if not isinstance(courses, list): + print("Unable to fetch courses for this term; aborting.") + return + + if demo_mode: + random.shuffle(courses) + print("Demo mode: targeting up to 10 courses with analytics data") + + output_path = f"cache/{term_code}_participation.csv" + base_fields = [ + 'term_code', + 'course_id', + 'course_name', + 'course_sis_id', + 'course_code', + 'student_canvas_id', + 'student_sortable_name', + 'student_sis_user_id', + 'student_login_id', + 'student_name', + 'student_email', + ] + rows = [] + data_fields = set() + + def flatten_value(prefix, value, dest): + if isinstance(value, dict): + for key, val in value.items(): + next_key = f"{prefix}.{key}" if prefix else str(key) + flatten_value(next_key, val, dest) + elif isinstance(value, list): + dest[prefix] = json.dumps(value) + else: + dest[prefix] = value + + processed_courses = 0 + for course in courses: + if demo_mode and processed_courses >= 10: + break + course_id = course.get('id') + if not course_id: + continue + course_name = course.get('name', '') + print(f"Fetching analytics for course {course_id}: {course_name}") + enrollment_index = {} + try: + enrollment_params = { + 'type[]': 'StudentEnrollment', + 'per_page': 100, + } + enrollments = fetch(f"/api/v1/courses/{course_id}/enrollments", params=enrollment_params) + if isinstance(enrollments, list): + for enrollment in enrollments: + user = enrollment.get('user') or {} + user_id = user.get('id') or enrollment.get('user_id') + if not user_id: + continue + entry = { + 'sortable_name': user.get('sortable_name', ''), + 'sis_user_id': user.get('sis_user_id', ''), + 'login_id': user.get('login_id', ''), + 'sis_login_id': user.get('sis_login_id', ''), + 'email': user.get('email', ''), + 'name': user.get('name', ''), + } + enrollment_index[user_id] = entry + enrollment_index[str(user_id)] = entry + except Exception as exc: + print(f"Failed to fetch enrollments for {course_id}: {exc}") + + try: + summaries = fetch(f"/api/v1/courses/{course_id}/analytics/student_summaries") + except Exception as exc: + print(f"Failed to fetch analytics for {course_id}: {exc}") + continue + + if not isinstance(summaries, list): + print(f"Unexpected analytics payload for {course_id}; skipping") + continue + + course_rows_added = 0 + for summary in summaries: + flattened = {} + flatten_value('', summary, flattened) + user_id = ( + summary.get('id') + or summary.get('user_id') + or flattened.get('user_id') + or flattened.get('user.id') + ) + enrollment_details = {} + if user_id in enrollment_index: + enrollment_details = enrollment_index[user_id] + elif isinstance(user_id, str) and user_id.isdigit(): + enrollment_details = enrollment_index.get(int(user_id), {}) + elif isinstance(user_id, int): + enrollment_details = enrollment_index.get(str(user_id), {}) + row = { + 'term_code': term_code, + 'course_id': str(course_id), + 'course_name': course_name, + 'course_sis_id': course.get('sis_course_id', ''), + 'course_code': course.get('course_code', ''), + 'student_canvas_id': str(user_id) if user_id else '', + 'student_sortable_name': enrollment_details.get('sortable_name') or '', + 'student_sis_user_id': (enrollment_details.get('sis_user_id') or enrollment_details.get('sis_login_id')) or '', + 'student_login_id': enrollment_details.get('login_id') or '', + 'student_name': enrollment_details.get('name') or '', + 'student_email': enrollment_details.get('email') or '', + } + if enrollment_details: + data_fields.add('student_name') + data_fields.add('student_email') + for key, value in flattened.items(): + if not key: + continue + row[key] = value + data_fields.add(key) + rows.append(row) + course_rows_added += 1 + + if course_rows_added == 0: + print(f"Skipping course {course_id}: no student analytics data") + continue + + processed_courses += 1 + + if demo_mode and processed_courses < 10: + print(f"Demo mode finished early: only {processed_courses} courses had analytics data") + + if not rows: + print("No analytics data found; nothing to write.") + return + + field_order = base_fields + sorted([field for field in data_fields if field not in base_fields]) + print(f"Writing {len(rows)} rows to {output_path}") + with open(output_path, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=field_order) + writer.writeheader() + for row in rows: + writer.writerow({field: row.get(field, '') for field in field_order}) + + +# Summarize student orientation enrollments across the account and flag coverage gaps. +def audit_student_orientation_enrollments(): + orientation_years = ['2022', '2023', '2024', '2025', '2026'] + + orientation_shells = get_orientation_shells(orientation_years) + missing_years = [year for year in orientation_years if year not in orientation_shells] + if missing_years: + print(f"Warning: orientation shells not found for years: {', '.join(missing_years)}") + if not orientation_shells: + print("No orientation courses located; aborting.") + return + + orientation_memberships = get_orientation_memberships(orientation_years) + student_summaries = get_student_enrollment_summary() + + if not student_summaries: + print("No student enrollment data available; aborting.") + return + + rows = [] + for summary in student_summaries: + user_id = summary.get('user_id') + user_key = str(user_id) + membership = orientation_memberships.get(user_key, {'years': set(), 'total': 0}) + membership_years = membership.get('years', set()) + orientation_total = membership.get('total', 0) + + row = { + 'student_id': user_key, + 'sortable_name': summary.get('sortable_name') or summary.get('name') or '', + 'sis_id': summary.get('sis_user_id') or '', + 'student_enrollment_count': summary.get('student_enrollments', 0), + 'teacher_enrollment_count': summary.get('teacher_enrollments', 0), + 'orientation_enrollment_total': orientation_total, + 'missing_student_orientation': 1 if orientation_total == 0 else 0, + } + + for year in orientation_years: + row[year] = 1 if year in membership_years else 0 + + rows.append(row) + + if not rows: + print("No rows to write; aborting.") + return + + rows.sort(key=lambda r: (r.get('missing_student_orientation', 0), r.get('sortable_name', ''))) + output_path = 'cache/student_orientation_audit.csv' + fieldnames = [ + 'student_id', + 'sortable_name', + 'sis_id', + 'student_enrollment_count', + 'teacher_enrollment_count', + *orientation_years, + 'orientation_enrollment_total', + 'missing_student_orientation' + ] + + print(f"Writing {len(rows)} rows to {output_path}") + with open(output_path, 'w', newline='', encoding='utf-8') as handle: + writer = csv.DictWriter(handle, fieldnames=fieldnames) + writer.writeheader() + for row in rows: + writer.writerow({field: row.get(field, '') for field in fieldnames}) + + missing_count = sum(1 for row in rows if row.get('orientation_enrollment_total', 0) == 0) + multi_count = sum(1 for row in rows if row.get('orientation_enrollment_total', 0) > 1) + print(f"Orientation audit complete. Missing: {missing_count}, duplicates: {multi_count}.") + + # Create Canvas calendar events for predefined orientation shells from CSV input. def create_calendar_event(): events = codecs.open('cache/academic_calendar_2025.csv','r','utf-8').readlines() @@ -3189,10 +3438,12 @@ if __name__ == "__main__": 21: ['Reset course conclude date',update_course_conclude], 22: ['Create calendar events for orientation shells', create_calendar_event], 23: ['Remove all calendar events from a course', remove_all_course_events], - 24: ['list all assignments', list_all_assignments], - 25: ['Bulk unenroll from course', bulk_unenroll], - 26: ['enrollment helper', enrollment_helper], - 27: ['g number list enroll to shell id', enroll_gnumber_list_to_courseid], + 24: ['Build participation report for a term', build_term_participation_report], + 25: ['Audit student orientation enrollments', audit_student_orientation_enrollments], + 26: ['list all assignments', list_all_assignments], + 27: ['Bulk unenroll from course', bulk_unenroll], + 28: ['enrollment helper', enrollment_helper], + 29: ['g number list enroll to shell id', enroll_gnumber_list_to_courseid], 30: ['* Overview semester start dates',overview_start_dates], 31: ['Fine tune term dates and winter session', course_by_depts_terms], diff --git a/localcache2.py b/localcache2.py index ff4a3a0..8b558e7 100644 --- a/localcache2.py +++ b/localcache2.py @@ -62,7 +62,7 @@ def all_gav_employees(): h.updated_at, p.last_request_at, p.last_login_at, p.current_login_at, p.last_login_ip, p.current_login_ip, p.sis_user_id, p.unique_name FROM users AS u JOIN comm_channel AS h ON u.id=h.user_id - JOIN pseudonym AS p ON p.user_id=u.id + JOIN pseudonyms AS p ON p.user_id=u.id WHERE h.address LIKE "%@gavilan.edu" ORDER BY u.sortablename""" cursor = connection.cursor() @@ -200,6 +200,107 @@ ORDER BY c.sis_source_id, wp.title;""" return all +# Fetch orientation shell course ids keyed by year from the local Canvas database. +def get_orientation_shells(years=None): + year_filter = set(str(y) for y in years) if years else None + (connection, _cursor) = db() + cursor = connection.cursor() + cursor.execute( + """ + SELECT id, + name, + substring(name FROM '(\\d{4})') AS year + FROM canvas.courses + WHERE name ILIKE 'iLearn Student Orientation %' + """ + ) + shells = {} + for course_id, name, year in cursor.fetchall(): + if not year: + continue + if year_filter and year not in year_filter: + continue + if year not in shells: + shells[year] = {'id': course_id, 'name': name} + return shells + + +# Collect per-student orientation membership details keyed by Canvas user id. +def get_orientation_memberships(years=None): + year_filter = set(str(y) for y in years) if years else None + (connection, _cursor) = db() + cursor = connection.cursor() + cursor.execute( + """ + SELECT e.user_id, + substring(c.name FROM '(\\d{4})') AS year + FROM canvas.enrollments e + JOIN canvas.courses c ON c.id = e.course_id + WHERE c.name ILIKE 'iLearn Student Orientation %' + AND e.type = 'StudentEnrollment' + AND e.workflow_state IN ('active', 'completed', 'inactive') + """ + ) + memberships = {} + for user_id, year in cursor.fetchall(): + if not year: + continue + if year_filter and year not in year_filter: + continue + user_key = str(user_id) + membership = memberships.setdefault(user_key, {'years': set(), 'total': 0}) + if year not in membership['years']: + membership['years'].add(year) + membership['total'] += 1 + return memberships + + +# Produce student enrollment counts for the orientation audit. +def get_student_enrollment_summary(): + (connection, _cursor) = db() + cursor = connection.cursor() + cursor.execute( + """ + SELECT u.id, + u.sortable_name, + u.name, + sis_map.sis_user_id, + COALESCE(SUM(CASE WHEN e.type = 'StudentEnrollment' THEN 1 ELSE 0 END), 0) AS student_enrollments, + COALESCE(SUM(CASE WHEN e.type = 'TeacherEnrollment' THEN 1 ELSE 0 END), 0) AS teacher_enrollments + FROM canvas.users u + LEFT JOIN canvas.enrollments e + ON e.user_id = u.id + AND e.workflow_state IN ('active', 'completed', 'inactive') + LEFT JOIN LATERAL ( + SELECT p.sis_user_id + FROM canvas.pseudonyms p + WHERE p.user_id = u.id + AND p.workflow_state = 'active' + AND p.sis_user_id IS NOT NULL + AND p.sis_user_id <> '' + ORDER BY p.position NULLS FIRST, p.id + LIMIT 1 + ) sis_map ON TRUE + GROUP BY u.id, u.sortable_name, u.name, sis_map.sis_user_id + HAVING COALESCE(SUM(CASE WHEN e.type = 'StudentEnrollment' THEN 1 ELSE 0 END), 0) > 0 + """ + ) + results = [] + for row in cursor.fetchall(): + user_id, sortable_name, name, sis_user_id, student_count, teacher_count = row + results.append( + { + 'user_id': user_id, + 'sortable_name': sortable_name, + 'name': name, + 'sis_user_id': sis_user_id, + 'student_enrollments': int(student_count or 0), + 'teacher_enrollments': int(teacher_count or 0), + } + ) + return results + + def user_ids_in_shell(shellid): q = f"""select e.user_id from canvas.enrollments e where e.course_id = {shellid} and e.type='StudentEnrollment' and e.workflow_state='active';""" diff --git a/schedules.py b/schedules.py index 8954e97..9aa8584 100644 --- a/schedules.py +++ b/schedules.py @@ -358,27 +358,38 @@ def log_section_filling2(current_sched_list, short_sem): todays_df = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now]) todays_df = todays_df.rename_axis('crn') + todays_df.index = todays_df.index.astype(str) #print(todays_df) todays_df.to_csv('cache/reg_today_new.csv', index=True) + csv_path = pathlib.Path('cache') / f'reg_data_{short_sem}.csv' + csv_path.parent.mkdir(parents=True, exist_ok=True) try: - myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv') + myframe = pd.read_csv(csv_path) print(myframe) - except: - fff = open('cache/reg_data_'+short_sem+'.csv','w') - fff.write('crn\n') - fff.close() - myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv') - #myframe = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now]) - #myframe = myframe.rename_axis('crn') + except FileNotFoundError: + myframe = pd.DataFrame(columns=['crn']) print("Creating new data file for this semester.") + except pd.errors.EmptyDataError: + myframe = pd.DataFrame(columns=['crn']) + print("Existing data file was empty; starting fresh for this semester.") - new_df = myframe.join( todays_df, on='crn', how='outer' ) - new_df = new_df.rename_axis('crn') - print(new_df) + if 'crn' in myframe.columns: + myframe = myframe.set_index('crn') + else: + myframe = myframe.rename_axis('crn') + myframe.index = myframe.index.astype(str) + combined_df = myframe.reindex(myframe.index.union(todays_df.index)) + combined_df[now] = todays_df[now] + combined_df = combined_df.sort_index() + combined_df = combined_df.reset_index() + combined_df = combined_df.fillna('') + print(combined_df) reg_data_filename = 'reg_data_' + short_sem + '.csv' - new_df.to_csv('cache/' + reg_data_filename, index=False) + tmp_path = csv_path.with_suffix(csv_path.suffix + '.tmp') + combined_df.to_csv(tmp_path, index=False) + tmp_path.replace(csv_path) put_file('/home/public/schedule/', 'cache/', reg_data_filename, 0) @@ -1394,4 +1405,4 @@ if __name__ == "__main__": resp = input('Choose: ') # Call the function in the options dict - options[ int(resp)][1]() \ No newline at end of file + options[ int(resp)][1]() diff --git a/ssb.py b/ssb.py index 7d82fc1..dea1fc2 100644 --- a/ssb.py +++ b/ssb.py @@ -63,27 +63,38 @@ def log_section_filling2(current_sched_list, short_sem): todays_df = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now]) todays_df = todays_df.rename_axis('crn') + todays_df.index = todays_df.index.astype(str) #print(todays_df) todays_df.to_csv('cache/reg_today_new.csv', index=True) + csv_path = pathlib.Path('cache') / f'reg_data_{short_sem}.csv' + csv_path.parent.mkdir(parents=True, exist_ok=True) try: - myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv') + myframe = pd.read_csv(csv_path) print(myframe) - except: - fff = open('cache/reg_data_'+short_sem+'.csv','w') - fff.write('crn\n') - fff.close() - myframe = pd.read_csv('cache/reg_data_' + short_sem + '.csv') - #myframe = pd.DataFrame.from_dict(todays_data, orient='index', columns=[now]) - #myframe = myframe.rename_axis('crn') + except FileNotFoundError: + myframe = pd.DataFrame(columns=['crn']) print("Creating new data file for this semester.") + except pd.errors.EmptyDataError: + myframe = pd.DataFrame(columns=['crn']) + print("Existing data file was empty; starting fresh for this semester.") - new_df = myframe.join( todays_df, on='crn', how='outer' ) - new_df = new_df.rename_axis('crn') - print(new_df) + if 'crn' in myframe.columns: + myframe = myframe.set_index('crn') + else: + myframe = myframe.rename_axis('crn') + myframe.index = myframe.index.astype(str) + combined_df = myframe.reindex(myframe.index.union(todays_df.index)) + combined_df[now] = todays_df[now] + combined_df = combined_df.sort_index() + combined_df = combined_df.reset_index() + combined_df = combined_df.fillna('') + print(combined_df) reg_data_filename = 'reg_data_' + short_sem + '.csv' - new_df.to_csv('cache/' + reg_data_filename, index=False) + tmp_path = csv_path.with_suffix(csv_path.suffix + '.tmp') + combined_df.to_csv(tmp_path, index=False) + tmp_path.replace(csv_path) # Take banner's html and make a csv(?) file @@ -593,4 +604,3 @@ for item in semesters: time.sleep(45) else: print(f"Stopped due to error: {result}") - diff --git a/tasks.py b/tasks.py index 0bcec75..5525b52 100644 --- a/tasks.py +++ b/tasks.py @@ -14,6 +14,7 @@ """ import pysftp, os, datetime, requests, re, json, sqlite3, codecs, csv, sys +import yaml import funcy, os.path, datetime, calendar, time, shutil, urllib from datetime import datetime from collections import defaultdict @@ -184,9 +185,9 @@ def convert_to_pdf(name1, name2): # Build (docx/pdf) certificates for gott graduates def certificates_gott_build(): - course = "gott_1_fa25_sept" + course = "gott_6_fa25_sept" coursedate = "Fall 2025" - certificate = "gott 1 template.docx" + certificate = "gott 6 template.docx" #course = "gott_4_su25" #certificate = "gott 4 template.docx" @@ -1287,6 +1288,125 @@ def file_renamer(): print("ok") + + + +# Collect developer key metadata for a Canvas account and write an audit TXT report. +def audit_developer_keys(account_id=1, output_path='cache/developer_keys_audit.txt', include_details=True): + params = {'per_page': 100} + developer_keys = fetch(f"/api/v1/accounts/{account_id}/developer_keys", params=params) + if not isinstance(developer_keys, list): + print('Unexpected response when fetching developer keys') + return developer_keys + + records = [] + for key in developer_keys: + summary = { + 'id': key.get('id'), + 'name': key.get('name'), + 'description': key.get('description') or key.get('notes'), + 'owner_user_id': key.get('user_id'), + 'created_at': key.get('created_at'), + 'updated_at': key.get('updated_at'), + 'last_used_at': key.get('last_used_at') or key.get('last_developer_key_use'), + 'workflow_state': key.get('workflow_state'), + 'visible': key.get('visible'), + 'require_scopes': key.get('require_scopes'), + 'redirect_uris': key.get('redirect_uris') or key.get('redirect_uri'), + 'access_token_count': key.get('access_token_count'), + 'scopes': key.get('scopes'), + } + + record = {'summary': summary} + if include_details: + record['detail'] = key + records.append(record) + + if not records: + print('No developer keys found; skipping file write.') + return records + + os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True) + with open(output_path, 'w', encoding='utf-8') as handle: + yaml.safe_dump(records, handle, sort_keys=False, allow_unicode=True) + print(f"Wrote {len(records)} developer keys to {output_path}") + return records + + +# Collect external tool metadata for a Canvas account and write an audit TXT report. +def audit_external_tools(account_id=1, output_path='cache/external_tools_audit.txt', include_details=True): + params = {'per_page': 100, 'include[]': ['usage', 'editor_button', 'course_navigation']} + tools = fetch(f"/api/v1/accounts/{account_id}/external_tools", params=params) + if not isinstance(tools, list): + print('Unexpected response when fetching external tools') + return tools + + records = [] + for tool in tools: + detail = tool + if include_details and tool.get('id'): + try: + detail = fetch(f"/api/v1/accounts/{account_id}/external_tools/{tool['id']}") + if not isinstance(detail, dict): + detail = tool + except Exception as exc: + print(f"Failed to fetch detail for external tool {tool.get('id')}: {exc}") + detail = tool + + usage = detail.get('usage') or tool.get('usage') or {} + placements = detail.get('placements') + if isinstance(placements, list): + placements_value = placements + elif isinstance(placements, dict): + placements_value = list(placements.keys()) + else: + placements_value = placements + + summary = { + 'id': detail.get('id'), + 'name': detail.get('name'), + 'description': detail.get('description'), + 'consumer_key': detail.get('consumer_key'), + 'created_at': detail.get('created_at'), + 'updated_at': detail.get('updated_at'), + 'workflow_state': detail.get('workflow_state'), + 'last_used_at': usage.get('last_used_at') or detail.get('last_used_at'), + 'launch_count_30d': usage.get('past_30_days'), + 'launch_count_total': usage.get('total'), + 'privacy_level': detail.get('privacy_level'), + 'url': detail.get('url'), + 'domain': detail.get('domain') or detail.get('custom_fields', {}).get('domain'), + 'default': detail.get('is_default'), + 'placements': placements_value, + } + + record = {'summary': summary} + if include_details: + record['detail'] = detail + records.append(record) + + if not records: + print('No external tools found; skipping file write.') + return records + + os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True) + with open(output_path, 'w', encoding='utf-8') as handle: + yaml.safe_dump(records, handle, sort_keys=False, allow_unicode=True) + print(f"Wrote {len(records)} external tools to {output_path}") + return records + + +# Run a combined audit of developer keys and external tools for a Canvas account. +def run_canvas_integration_audit(account_id=1): + developer_keys = audit_developer_keys(account_id) + external_tools = audit_external_tools(account_id) + print(f"Developer keys audited: {len(developer_keys) if isinstance(developer_keys, list) else 0}") + print(f"External tools audited: {len(external_tools) if isinstance(external_tools, list) else 0}") + return developer_keys, external_tools + + + + # Use api to fix ilearn's authentication method when we can't log in. List. def list_auth(): r = fetch( url + '/api/v1/accounts/1/authentication_providers') @@ -1322,6 +1442,7 @@ if __name__ == "__main__": 15: ['create a week calendar in word (general purpose)', word_calendar_v2], 16: ['create GOTT certificates', certificates_gott_build], 20: ['build_quiz', build_quiz], + 21: ['audit external access', run_canvas_integration_audit], #21: ['certificates_gott_build, certificates_gott_build'] } @@ -1357,3 +1478,4 @@ if __name__ == "__main__": auth = json.loads(open('cache/badgr.txt','r').read()) print ( auth ) +