Peter Howell 2025-10-07 19:08:00 +00:00
parent 776ff0a45b
commit a01ef8084d
4 changed files with 502 additions and 153 deletions

View File

@@ -6,12 +6,14 @@ from __future__ import annotations
 #import html2markdown as h2m
 from typing import ItemsView
-import requests, codecs, os, re, json, sys, pypandoc
+import requests, codecs, os, re, json, sys, pypandoc, mimetypes, hashlib
 from checker import safe_html
 from pipelines import header, fetch, url
 from util import clean_title, to_file_friendly
+from urllib.parse import quote, urljoin, urlparse
 from bs4 import BeautifulSoup as bs
 from html.parser import HTMLParser
+from datetime import datetime, timezone

 pagebreak = '\n\n<!-- BREAK -->\n\n<div style="page-break-before: always;"></div>\n\n'
@@ -42,7 +44,6 @@ def test_forums(id=0):
     for m in modules:
         items[running_index] = '<h2>%s</h2>%s\n' % ( m['name'], pagebreak )
         running_index += 1
-
         mod_items = fetch('/api/v1/courses/' + str(id) + '/modules/'+str(m['id'])+'/items', verbose)
         for I in mod_items:
@@ -99,7 +100,7 @@ def write_message(fd, view, participants):
         write_message(fd, r, participants)
     fd.write("</blockquote>\n")

-def extract_forums(id, course_folder, item_id_to_index, verbose=0):
+def extract_forums(id, course_folder, item_id_to_index, verbose=0, discussion_link_map=None):
     ###
     ### FORUMS
     ###
@@ -109,7 +110,6 @@ def extract_forums(id, course_folder, item_id_to_index, verbose=0):
     index = []
     forum_f = course_folder + '/forums'
     headered = 0
-    image_count = 0
     print("\nFORUMS")
     try:
         os.mkdir(forum_f)
@@ -133,6 +133,8 @@ def extract_forums(id, course_folder, item_id_to_index, verbose=0):
                 fd.write(message + "\n\n")
                 for v in t2['view']:
                     write_message(fd, v, participants)
+            if discussion_link_map is not None:
+                discussion_link_map[p['id']] = f"forums/{easier_filename}.html"
             if not headered: index.append( ('<br /><b>Discussion Forums</b><br />') )
             headered = 1
             index.append( ( 'forums/' + easier_filename + '.html', p['title'] ) )
@@ -196,12 +198,18 @@ def course_download(id=""):
     for x in range(9000): items.append(0)
     video_link_list = []
+    page_local_map = {}
+    assignment_local_map = {}
+    file_local_map = {}
+    discussion_local_map = {}
+    module_details = []

     for m in modules:
         items[running_index] = '<h2>%s</h2>%s\n' % ( m['name'], pagebreak )
         running_index += 1
         mod_items = fetch('/api/v1/courses/' + str(id) + '/modules/'+str(m['id'])+'/items', verbose)
+        module_entry = {'name': m['name'], 'items': []}
         for I in mod_items:
@@ -238,6 +246,17 @@ def course_download(id=""):
             # I['page_url']
             # I['type']
             # I['published']
+            module_entry['items'].append({
+                'type': I.get('type'),
+                'title': I.get('title'),
+                'page_url': I.get('page_url'),
+                'content_id': I.get('content_id'),
+                'html_url': I.get('html_url'),
+                'url': I.get('url'),
+                'external_url': I.get('external_url'),
+                'id': I.get('id')
+            })
+        module_details.append(module_entry)

     # assignments and files have content_id, pages have page_url
     course_folder = '../course_temps/course_'+id
@@ -281,14 +300,95 @@ def course_download(id=""):
             if not headered:
                 index.append( ('<br /><b>Files</b><br />') )
                 headered = 1
-            index.append( ('files/' + f['filename'], f['filename']) )
+            relative_path = 'files/' + f['filename']
+            index.append( (relative_path, f['filename']) )
+            file_local_map[f['id']] = relative_path

     ###
     ### PAGES
     ###
     pages_f = course_folder + '/pages'
     headered = 0
-    image_count = 0
+    images_f = os.path.join(pages_f, 'images')
+    try:
+        os.makedirs(images_f)
+    except FileExistsError:
+        pass
+    except Exception as e:
+        print(f" * Unable to ensure images folder: {e}")
+    image_map = {}
+    image_counter = 0
+
+    def ensure_local_image(src, canvas_override=None):
+        nonlocal image_counter
+        if not src:
+            return (None, None)
+        original_src = src
+        if src.startswith('data:'):
+            return (None, None)
+        if src.startswith('images/'):
+            full_rel = f"pages/{src}"
+            image_map.setdefault(original_src, (src, full_rel))
+            return image_map[original_src], canvas_override
+        if src.startswith('pages/'):
+            page_rel = src.split('pages/', 1)[-1]
+            page_rel = page_rel if page_rel else src
+            full_rel = src
+            image_map.setdefault(original_src, (page_rel, full_rel))
+            return image_map[original_src], canvas_override
+        mapped = image_map.get(original_src)
+        if mapped:
+            return mapped, canvas_override or original_src
+        absolute_src = src
+        if not absolute_src.lower().startswith('http'):
+            absolute_src = urljoin(url, absolute_src)
+        mapped = image_map.get(absolute_src)
+        if mapped:
+            image_map[original_src] = mapped
+            return mapped, canvas_override or absolute_src
+        try:
+            response = requests.get(absolute_src, headers=header, stream=True, timeout=30)
+            response.raise_for_status()
+        except Exception as e:
+            d(f" * error downloading image {absolute_src}: {e}")
+            return (None, canvas_override or absolute_src)
+        content_type = response.headers.get('content-type', '').split(';')[0]
+        ext = ''
+        if content_type:
+            guessed = mimetypes.guess_extension(content_type)
+            if guessed:
+                ext = guessed
+        if not ext:
+            ext = os.path.splitext(urlparse(absolute_src).path)[1]
+        if not ext:
+            ext = '.bin'
+        ext = ext.lstrip('.')
+        local_name = f"img_{image_counter}.{ext}"
+        image_counter += 1
+        local_path = os.path.join(images_f, local_name)
+        try:
+            with open(local_path, 'wb') as fd:
+                for chunk in response.iter_content(chunk_size=8192):
+                    if chunk:
+                        fd.write(chunk)
+        except Exception as e:
+            d(f" * error saving image {absolute_src}: {e}")
+            return (None, canvas_override or absolute_src)
+        page_rel = f"images/{local_name}"
+        full_rel = f"pages/{page_rel}"
+        image_map[original_src] = (page_rel, full_rel)
+        if absolute_src != original_src:
+            image_map[absolute_src] = image_map[original_src]
+        return image_map[original_src], canvas_override or absolute_src

     print("\nPAGES")
     try:
         os.mkdir(pages_f)
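
Note: `ensure_local_image` hands back a `(page_rel, full_rel)` pair so the same cached file can be referenced both from a page saved under `pages/` and from the combined course file written at the course root. A minimal sketch of that contract, using a hypothetical cached file `images/img_0.png`:

    # Hypothetical mapping as produced after a successful download.
    mapping = ("images/img_0.png", "pages/images/img_0.png")

    page_html = f'<img src="{mapping[0]}">'   # relative to pages/<slug>.html
    main_html = f'<img src="{mapping[1]}">'   # relative to fullcourse.raw.html

    print(page_html)  # <img src="images/img_0.png">
    print(main_html)  # <img src="pages/images/img_0.png">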
@@ -296,6 +396,12 @@ def course_download(id=""):
         print(" * Pages folder already exists.")

+    page_manifest = {
+        'course_id': str(id),
+        'generated_at': datetime.now(timezone.utc).isoformat(),
+        'pages': {}
+    }
+
     pages = fetch('/api/v1/courses/' + str(id) + '/pages', verbose)
     for p in pages:
         d(' - %s' % p['title'])
@@ -312,118 +418,110 @@ def course_download(id=""):
         index.append( ( 'pages/' + easier_filename + '.html', p['title'] ) )

-        if os.path.exists(this_page_filename):
+        t2 = {'title': p['title']}
+        soup_infolder = None
+        soup_in_main = None
+        page_local_map[p['url']] = f"pages/{easier_filename}.html"
+        this_page_content = None
+
+        fetched_page = fetch('/api/v1/courses/' + str(id) + '/pages/'+p['url'], verbose)
+        if fetched_page and fetched_page.get('body'):
+            t2 = fetched_page
+            soup_infolder = bs(t2['body'], features="lxml")
+            soup_in_main = bs(t2['body'], features="lxml")
+        elif os.path.exists(this_page_filename):
             d(" - already downloaded %s" % this_page_filename)
             this_page_content = codecs.open(this_page_filename,'r','utf-8').read()
-        #elif re.search(r'eis-prod',p['url']) or re.search(r'gavilan\.ins',p['url']):
-        #elif re.search(r'eis-prod',p['url']):
-        #    d(' * skipping file behind passwords')
+            soup_infolder = bs(this_page_content, features="lxml")
+            soup_in_main = bs(this_page_content, features="lxml")
         else:
-            t2 = fetch('/api/v1/courses/' + str(id) + '/pages/'+p['url'], verbose)
-            if t2 and 'body' in t2 and t2['body']:
-                soup_infolder = bs(t2['body'],features="lxml")
-                soup_in_main = bs(t2['body'],features="lxml")
-
-                a_links = soup_infolder.find_all('a')
-                for A in a_links:
-                    href = A.get('href')
-                    if href and re.search( r'youtu',href):
-                        video_link_list.append( (A.get('href'), A.text, 'pages/'+easier_filename + ".html") )
-
-                # Images
-                page_images = soup_infolder.find_all('img')
-                page_image_paths = {}
-                for I in page_images:
-                    src = I.get('src')
-                    if src:
-                        d(' - %s' % src)
-                        try:
-                            r = requests.get(src,headers=header, stream=True)
-                            mytype = r.headers['content-type']
-                            #print("Response is type: " + str(mytype))
-                            r_parts = mytype.split("/")
-                            ending = r_parts[-1]
-                            if ending=='jpeg': ending = "jpg"
-                            img_full_path = f"{pages_f}/{str(image_count)}.{ending}"
-                            local_src = f"{str(image_count)}.{ending}"
-                            page_image_paths[src] = f"pages/{local_src}"
-                            I['src'] = local_src
-                            with open(img_full_path, 'wb') as fd:
-                                for chunk in r.iter_content(chunk_size=128):
-                                    fd.write(chunk)
-                            image_count += 1
-                        except Exception as e:
-                            d( ' * Error downloading page image, %s' % str(e) )
-
-                # Repeat for version for main file
-                page_main_images = soup_in_main.find_all('img')
-                for I in page_main_images:
-                    src = I.get('src')
-                    if src:
-                        I['src'] = page_image_paths[src]
-
-                # STUDIO VIDEOS
-                # Regex pattern to match "custom_arc_media_id%3D" and capture everything
-                # until the next '&' or end of string
-                pattern = r"custom_arc_media_id%3D([^&]+)"
-                found_ids = []
-                replacement_tag = '''<video width="480" height="320" controls="controls"><source src="http://serverIP_or_domain/location_of_video.mp4" type="video/mp4"></video>'''
-
-                # Iterate over all <iframe> tags
-                for iframe in soup_infolder.find_all("iframe"):
-                    src = iframe.get("src")
-                    if src:
-                        # Search for the pattern in the src
-                        match = re.search(pattern, src)
-                        if match:
-                            found_ids.append(match.group(1))
-                        videos_log.write(f"page: {p['url']} iframe src: {src}\n")
-                        videos_log.flush()
-                        match2 = re.search('instructuremedia\.com', src)
-                        if match2:
-                            iframe_response = requests.get(src)
-                            if iframe_response.status_code != 200:
-                                print(f"Failed to retrieve iframe content from: {src}")
-                                continue
-                            videos_log.write(f"succesfully fetched {src}\n")
-                            videos_log.flush()
-                            # Step 4: Parse the iframes HTML
-                            iframe_soup = bs(iframe_response.text, 'html.parser')
-                            video_tag = iframe_soup.find('video')
-                            if video_tag:
-                                # Find the <source> tag(s) within the video
-                                source_tags = video_tag.find_all('source')
-                                # Extract each 'src' attribute
-                                for source_tag in source_tags:
-                                    print("Video Source found:", source_tag.get('src'))
-                                    videos_log.write(f"page: {p['url']} video src: {source_tag.get('src')}\n")
-                                    videos_log.flush()
-
-                # WRITE out page
-                try:
-                    this_page_content = f"<h2>{t2['title']}</h2>\n{soup_infolder.prettify()}"
-                    with codecs.open(this_page_filename, 'w','utf-8') as fd:
-                        fd.write(this_page_content)
-                except:
-                    d(' * problem writing page content')
-                ## TODO include linked pages even if they aren't in module
-            else:
-                d(' * nothing returned or bad fetch')
+            d(' * nothing returned or bad fetch')
+            continue
+
+        page_title = (t2.get('title') or p['title']).strip() if isinstance(t2, dict) else p['title']
+
+        def strip_leading_heading(soup):
+            if not soup:
+                return
+            first_heading = soup.find(['h1', 'h2'])
+            if first_heading and first_heading.get_text(strip=True) == page_title:
+                first_heading.decompose()
+
+        strip_leading_heading(soup_infolder)
+        strip_leading_heading(soup_in_main)
+
+        a_links = soup_infolder.find_all('a')
+        for A in a_links:
+            href = A.get('href')
+            if href and re.search(r'youtu', href):
+                video_link_list.append((A.get('href'), A.text, 'pages/' + easier_filename + ".html"))
+
+        # Images -> ensure local copies
+        for img in soup_infolder.find_all('img'):
+            mapping, canvas_src = ensure_local_image(img.get('src'), img.get('data-canvas-src'))
+            if mapping:
+                img['src'] = mapping[0]
+            if canvas_src:
+                img['data-canvas-src'] = canvas_src
+        for img in soup_in_main.find_all('img'):
+            mapping, canvas_src = ensure_local_image(img.get('src'), img.get('data-canvas-src'))
+            if mapping:
+                img['src'] = mapping[1]
+            if canvas_src:
+                img['data-canvas-src'] = canvas_src
+
+        # STUDIO VIDEOS
+        pattern = r"custom_arc_media_id%3D([^&]+)"
+        for iframe in soup_infolder.find_all("iframe"):
+            src = iframe.get("src")
+            if not src:
+                continue
+            match = re.search(pattern, src)
+            if match:
+                videos_log.write(f"page: {p['url']} arc id: {match.group(1)}\n")
+                videos_log.flush()
+            videos_log.write(f"page: {p['url']} iframe src: {src}\n")
+            videos_log.flush()
+            if 'instructuremedia.com' in src:
+                try:
+                    iframe_response = requests.get(src, timeout=15)
+                    iframe_response.raise_for_status()
+                except Exception as e:
+                    print(f"Failed to retrieve iframe content from: {src} ({e})")
+                    continue
+                videos_log.write(f"successfully fetched {src}\n")
+                videos_log.flush()
+                iframe_soup = bs(iframe_response.text, 'html.parser')
+                for source_tag in iframe_soup.find_all('source'):
+                    videos_log.write(f"page: {p['url']} video src: {source_tag.get('src')}\n")
+                    videos_log.flush()
+
+        # WRITE out page (always refresh to ensure local paths)
+        try:
+            this_page_content = f"<h2>{t2['title']}</h2>\n{soup_infolder.prettify()}"
+            with codecs.open(this_page_filename, 'w','utf-8') as fd:
+                fd.write(this_page_content)
+        except Exception as e:
+            d(f' * problem writing page content: {e}')

         # write to running log of content in order of module
-        if p and p['url'] in item_id_to_index:
-            items[ item_id_to_index[ p['url'] ] ] = f"<h2>{t2['title']}</h2>\n{soup_in_main.prettify()}\n{pagebreak}"
+        if p and p['url'] in item_id_to_index and soup_in_main:
+            items[item_id_to_index[p['url']]] = f"<h2>{t2['title']}</h2>\n{soup_in_main.prettify()}\n{pagebreak}"
         else:
             d(' -- This page didnt seem to be in the modules list.')

+        if this_page_content is not None:
+            page_hash = hashlib.sha256(this_page_content.encode('utf-8')).hexdigest()
+            page_manifest['pages'][p['url']] = {
+                'title': t2.get('title') or p['title'],
+                'filename': f"pages/{easier_filename}.html",
+                'hash': page_hash
+            }
+
+    manifest_path = os.path.join(course_folder, 'pages_manifest.json')
+    with codecs.open(manifest_path, 'w', 'utf-8') as manifest_file:
+        manifest_file.write(json.dumps(page_manifest, indent=2))

     ###
     ### ASSIGNMENTS
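
Note: the manifest written after the pages loop is what the new upload path keys on. An illustrative `pages_manifest.json` (values hypothetical; `hash` is the SHA-256 hex digest of the exact HTML written to disk):

    {
      "course_id": "1234",
      "generated_at": "2025-10-07T19:08:00+00:00",
      "pages": {
        "week-1-overview": {
          "title": "Week 1 Overview",
          "filename": "pages/week_1_overview.html",
          "hash": "<sha256 of the written html>"
        }
      }
    }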
@@ -446,6 +544,7 @@ def course_download(id=""):
         try:
             friendlyfile = to_file_friendly(p['name'])
             this_assmt_filename = asm_f + '/' + str(p['id'])+"_"+ friendlyfile + '.html'
+            assignment_local_map[p['id']] = 'assignments/' + str(p['id'])+"_"+ friendlyfile + '.html'
             if os.path.exists(this_assmt_filename):
                 d(" - already downloaded %s" % this_assmt_filename)
                 this_assmt_content = open(this_assmt_filename,'r').read()
@@ -469,7 +568,7 @@ def course_download(id=""):
     ### FORUMS
     ###

-    index.extend( extract_forums(id, course_folder, item_id_to_index, verbose) )
+    index.extend( extract_forums(id, course_folder, item_id_to_index, verbose, discussion_local_map) )

     """
@@ -506,18 +605,60 @@ def course_download(id=""):

     # Create index page of all gathered items
+    index.insert(0, ('modules.html', 'Modules Overview'))
     myindex = codecs.open(course_folder+'/index.html','w','utf-8')
     for i in index:
-        if len(i)==2: myindex.write("<a href='"+i[0]+"'>"+i[1]+"</a><br />\n")
-        else: myindex.write(i)
+        if len(i)==2:
+            myindex.write(f"<a href='{i[0]}'>{i[1]}</a><br />\n")
+        else:
+            myindex.write(i)
+    myindex.close()
+
+    def resolve_module_item_link(item):
+        item_type = (item.get('type') or '').lower()
+        if item_type == 'page':
+            return page_local_map.get(item.get('page_url')) or item.get('html_url')
+        if item_type == 'assignment':
+            return assignment_local_map.get(item.get('content_id')) or item.get('html_url')
+        if item_type == 'discussion':
+            return discussion_local_map.get(item.get('content_id')) or item.get('html_url')
+        if item_type == 'file':
+            return file_local_map.get(item.get('content_id')) or item.get('html_url')
+        if item_type == 'externalurl':
+            return item.get('external_url')
+        if item_type in ('externaltool', 'quiz', 'assignmentquiz', 'attendance'):
+            return item.get('html_url') or item.get('url')
+        if item_type == 'subheader':
+            return None
+        return item.get('html_url') or item.get('url')
+
+    module_index_path = course_folder + '/modules.html'
+    with codecs.open(module_index_path, 'w', 'utf-8') as module_index:
+        module_index.write('<html><body>\n')
+        module_index.write(f"<h1>{courseinfo['name']} - Modules</h1>\n")
+        for module in module_details:
+            module_index.write(f"<h2>{module['name']}</h2>\n<ul>\n")
+            for item in module['items']:
+                title = item.get('title') or '(Untitled)'
+                item_type = item.get('type') or 'Item'
+                link = resolve_module_item_link(item)
+                if item_type.lower() == 'subheader':
+                    module_index.write(f"<li><strong>{title}</strong></li>\n")
+                    continue
+                if link:
+                    module_index.write(f"<li><a href='{link}'>{title}</a> <em>({item_type})</em></li>\n")
+                else:
+                    module_index.write(f"<li>{title} <em>({item_type})</em></li>\n")
+            module_index.write('</ul>\n')
+        module_index.write('</body></html>\n')

     # Full course content in single file
     print("Writing main course files...")
     mycourse = codecs.open(course_folder+'/fullcourse.raw.html','w','utf-8')
-    mycourse.write(f"<html><head><base href='file:///C:/Users/phowell/source/repos/course_temps/course_{id}/'></head><body>\n")
+    mycourse.write("<html><head></head><body>\n")
     for I in items:
         if I:
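
Note: `resolve_module_item_link` prefers the local copy collected during download and only falls back to the Canvas `html_url`, so `modules.html` stays navigable offline. A sketch with a hypothetical page item:

    item = {
        'type': 'Page',
        'title': 'Week 1 Overview',
        'page_url': 'week-1-overview',
        'html_url': 'https://example.instructure.com/courses/1234/pages/week-1-overview',
    }
    # With page_local_map == {'week-1-overview': 'pages/week_1_overview.html'},
    # resolve_module_item_link(item) -> 'pages/week_1_overview.html'
    # With no local copy downloaded, it falls back to item['html_url'].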
@@ -562,6 +703,100 @@ def course_download(id=""):
         print(f"couldn't create doc fullcourse page: {e}")

+def restore_canvas_image_sources(html_fragment):
+    soup = bs(html_fragment, features="lxml")
+    changed = False
+    for img in soup.find_all('img'):
+        canvas_src = img.get('data-canvas-src')
+        if canvas_src:
+            img['src'] = canvas_src
+            del img['data-canvas-src']
+            changed = True
+    body = soup.body
+    if body:
+        restored = ''.join(str(child) for child in body.children)
+    else:
+        restored = soup.decode()
+    return restored, changed
+
+def _push_page_update(course_num, page_slug, new_content):
+    endpoint = f"{url}/api/v1/courses/{course_num}/pages/{page_slug}"
+    data = {'wiki_page[body]': new_content}
+    response = requests.put(endpoint, headers=header, params=data)
+    if response.status_code >= 400:
+        print(f" - Failed to upload {page_slug}: {response.status_code} {response.text}")
+        return False
+    print(f" - Uploaded {page_slug}")
+    return True
+
+def upload_modified_pages(course_id=None, confirm_each=False):
+    if not course_id:
+        course_id = input("course id> ").strip()
+    if not course_id:
+        print("No course id provided; aborting.")
+        return
+    course_folder = f"../course_temps/course_{course_id}"
+    manifest_path = os.path.join(course_folder, 'pages_manifest.json')
+    if not os.path.exists(manifest_path):
+        print(f"No manifest found at {manifest_path}. Run course_download first.")
+        return
+    with codecs.open(manifest_path, 'r', 'utf-8') as manifest_file:
+        manifest = json.loads(manifest_file.read())
+    pages = manifest.get('pages', {})
+    if not pages:
+        print("Manifest contains no page entries.")
+        return
+    updated = False
+    for slug, meta in pages.items():
+        local_rel = meta.get('filename')
+        local_path = os.path.join(course_folder, local_rel) if local_rel else None
+        if not local_rel or not local_rel.startswith('pages/'):
+            print(f" - Skipping {slug}: not a downloaded page ({local_rel})")
+            continue
+        if not local_path or not os.path.exists(local_path):
+            print(f" - Skipping {slug}: local file missing ({local_rel})")
+            continue
+        with codecs.open(local_path, 'r', 'utf-8') as local_file:
+            local_html = local_file.read()
+        current_hash = hashlib.sha256(local_html.encode('utf-8')).hexdigest()
+        if current_hash == meta.get('hash'):
+            continue
+        restored_html, changed = restore_canvas_image_sources(local_html)
+        payload = restored_html if changed else local_html
+        do_upload = True
+        if confirm_each:
+            ans = input(f"Upload changes for {slug}? [y/N]: ").strip().lower()
+            do_upload = ans in ('y', 'yes')
+        if not do_upload:
+            print(f" - Skipped {slug} by user request")
+            continue
+        if _push_page_update(course_id, slug, payload):
+            manifest['pages'][slug]['hash'] = current_hash
+            updated = True
+    if updated:
+        with codecs.open(manifest_path, 'w', 'utf-8') as manifest_file:
+            manifest_file.write(json.dumps(manifest, indent=2))
+        print("Updated manifest hashes for uploaded pages.")
+    else:
+        print("No page uploads performed.")
+
+def upload_modified_pages_prompt():
+    upload_modified_pages()
+
 def media_testing():
     user_id = 285 #ksmith
     t = f"https://gavilan.instructuremedia.com/api/public/v1/users/{user_id}/media"
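
Note: together with `course_download`, the new functions form a download/edit/upload round trip keyed on the manifest hashes. Pages whose on-disk HTML still matches the stored hash are skipped, and `restore_canvas_image_sources` swaps `data-canvas-src` values back into `src` before upload. A minimal sketch of the intended workflow (course id hypothetical):

    course_download("1234")                            # writes pages/ and pages_manifest.json
    # ... edit ../course_temps/course_1234/pages/*.html locally ...
    upload_modified_pages("1234", confirm_each=True)   # pushes only pages whose hash changed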
@@ -810,9 +1045,8 @@ def make_pages_from_folder(folder='cache/csis6/', course = '20558'):

 # Given course, page url, and new content, upload the new revision of a page
 def upload_page(course_num,pageurl,new_content):
-    print("Repaired page:\n\n")
+    print(f"Uploading page: {pageurl}")
     #print new_content
-    print(pageurl)
     t3 = url + '/api/v1/courses/' + str(course_num) + '/pages/' + pageurl
     xyz = input('Enter 1 to continue and send back to: ' + t3 + ': ')
     #xyz = '1'
@@ -1984,6 +2218,7 @@ if __name__ == "__main__":
         5: ['course download tester', test_forums ],
         6: ['download all a courses pages', grab_course_pages],
         7: ['quick site downloader', download_web],
+        8: ['upload modified pages back to Canvas', upload_modified_pages_prompt],
         17: ['repair ezproxy links', repair_ezproxy_links],
         18: ['create pages from html files', make_pages_from_folder],
         19: ['fetch support page', fetch_support_page],

View File

@@ -1,8 +1,7 @@
 import json, re, requests, codecs, sys, time, funcy, os
 import pandas as pd
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 import pytz
-from datetime import datetime
 from util import print_table, int_or_zero, float_or_zero, dept_from_name, num_from_name
 from pipelines import fetch, fetch_stream, fetch_collapse, header, url
 from schedules import get_semester_schedule
@@ -2240,13 +2239,19 @@ def instructor_list_to_activate_evals():
     #print(mylist)

+# Toggle the eval tool visibility for all courses in the selected Canvas term.
 def add_evals(section=0):
     # show or hide?
-    TERM = 287
-    SEM = "sp25"
+    term_record = find_term(input('term? '))
+    if not term_record:
+        raise ValueError("Unknown term")
+    term_id = term_record.get('canvas_term_id')
+    if term_id is None:
+        raise ValueError(f"Canvas term id missing for {term_record}")
+    term_code = term_record.get('code')

     # fetch list of courses?
     GET_FRESH_LIST = 0
@@ -2261,16 +2266,16 @@ def add_evals(section=0):
     ASK = 0

     # are we showing or hiding the course eval link?
-    HIDE = True
+    HIDE = False

-    s = [ x.strip() for x in codecs.open(f'cache/{SEM}_eval_sections.txt','r').readlines()]
+    s = [ x.strip() for x in codecs.open(f"cache/{term_code}_eval_sections.txt",'r').readlines()]
     s = list(funcy.flatten(s))
     s.sort()
     print(f"Going to activate course evals in these sections: \n{s}\n")
     xyz = input('hit return to continue')

-    all_semester_courses = getCoursesInTerm(TERM, GET_FRESH_LIST, 1)
+    all_semester_courses = getCoursesInTerm(term_id, GET_FRESH_LIST, 1)
     eval_course_ids = []
     courses = {}
     for C in all_semester_courses:
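
Note: the refactor assumes `find_term` returns a record exposing `canvas_term_id` and `code`; the old hard-coded constants suggest a shape roughly like this (illustrative):

    term_record = {'code': 'sp25', 'canvas_term_id': 287, 'name': 'Spring 2025'}
    # add_evals would then read cache/sp25_eval_sections.txt and pass 287 to getCoursesInTerm.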
@@ -2814,21 +2819,119 @@ def quick_sem_course_list(term=180):
         print(C['name'])

+# Check Canvas for an existing calendar event that matches the provided metadata.
+def find_existing_calendar_event(context_code, title, start_at_iso, description="", tolerance_hours=12):
+    def _normalize_iso(value):
+        if not value:
+            return None
+        if value.endswith('Z'):
+            value = value[:-1] + '+00:00'
+        try:
+            return datetime.fromisoformat(value)
+        except ValueError:
+            return None
+
+    target_start = _normalize_iso(start_at_iso)
+    if not target_start:
+        return None
+    window_start = (target_start - timedelta(hours=tolerance_hours)).date().isoformat()
+    window_end = (target_start + timedelta(hours=tolerance_hours)).date().isoformat()
+    params = {
+        "context_codes[]": context_code,
+        "start_date": window_start,
+        "end_date": window_end,
+    }
+    existing_events = fetch("/api/v1/calendar_events", params=params)
+    if not isinstance(existing_events, list):
+        print(f"Unable to inspect existing events for context {context_code}: unexpected response")
+        return None
+    normalized_title = title.strip().lower() if isinstance(title, str) else ""
+    normalized_description = description.strip().lower() if isinstance(description, str) else ""
+    for event in existing_events:
+        event_title = (event.get('title') or "").strip().lower()
+        event_description = (event.get('description') or "").strip().lower()
+        event_start = _normalize_iso(event.get('start_at') or "")
+        if not event_start:
+            continue
+        time_difference = abs((event_start - target_start).total_seconds())
+        if time_difference > tolerance_hours * 3600:
+            continue
+        if event_title == normalized_title:
+            return event
+        if normalized_description and event_description == normalized_description:
+            return event
+    return None
+
+# Remove all calendar events attached to a course after user confirmation.
+def remove_all_course_events():
+    course_id = input("course id> ").strip()
+    if not course_id:
+        print("No course id provided; aborting.")
+        return
+    context_code = course_id if course_id.startswith("course_") else f"course_{course_id}"
+    today = datetime.now(timezone.utc).date()
+    start_date = (today - timedelta(days=730)).isoformat()
+    end_date = (today + timedelta(days=365)).isoformat()
+    print(f"Fetching existing events for {context_code} between {start_date} and {end_date}...")
+    params = {
+        "context_codes[]": context_code,
+        "per_page": 100,
+        "start_date": start_date,
+        "end_date": end_date,
+    }
+    events = fetch("/api/v1/calendar_events", params=params)
+    if not events:
+        print("No events found for this course.")
+        return
+    print(f"Found {len(events)} events. Beginning removal...")
+    for event in events:
+        event_id = event.get("id")
+        event_title = event.get("title", "(no title)")
+        if not event_id:
+            print(f"Skipping event '{event_title}' with missing id")
+            continue
+        print(f"Deleting event '{event_title}' (id {event_id}) in {context_code}...", end=' ')
+        delete_url = f"{url}/api/v1/calendar_events/{event_id}"
+        response = requests.delete(delete_url, headers=header)
+        if response.ok:
+            print("deleted successfully")
+        else:
+            print(f"failed: {response.status_code} {response.text}")
+
+# Create Canvas calendar events for predefined orientation shells from CSV input.
 def create_calendar_event():
     events = codecs.open('cache/academic_calendar_2025.csv','r','utf-8').readlines()
-    orientation_shells = ["course_15924","course_19094","course_20862"]
+    orientation_shells = ["course_15924","course_19094","course_20862", "course_23313"]
     for ori_shell in orientation_shells:
         for e in events:
-            (date, title, desc) = e.split(',')
+            if not e.strip():
+                continue
+            parts = [part.strip() for part in e.split(',', 2)]
+            if len(parts) < 3:
+                continue
+            date, title, desc = parts
             local = pytz.timezone("America/Los_Angeles")
             naive = datetime.strptime(date, "%Y-%m-%d")
             local_dt = local.localize(naive, is_dst=None)
             utc_dt = local_dt.astimezone(pytz.utc).isoformat()
+            print(f"Checking event '{title}' ({date}) in {ori_shell}...", end=' ')
+            existing_event = find_existing_calendar_event(ori_shell, title, utc_dt, desc)
+            if existing_event:
+                existing_id = existing_event.get('id')
+                print(f"exists as id {existing_id} in {ori_shell}, skipping add")
+                continue
+            print(f"no existing event in {ori_shell}, attempting add")

             params = {
                 "calendar_event[context_code]": ori_shell,
@@ -2840,12 +2943,21 @@ def create_calendar_event():
             u = url + "/api/v1/calendar_events"
             res = requests.post(u, headers = header, params=params)
-            result = json.loads(res.text)
-            print(title,end=" ")
-            if "errors" in result:
-                print(result["errors"])
-            if "id" in result:
-                print("ok, id#", result["id"])
+            if res.ok:
+                try:
+                    result = json.loads(res.text)
+                except json.JSONDecodeError:
+                    print(f"add completed for '{title}' in {ori_shell} (status {res.status_code}) but response parse failed")
+                    continue
+                new_id = result.get("id")
+                if new_id:
+                    print(f"added successfully as id {new_id} in {ori_shell} (status {res.status_code})")
+                elif "errors" in result:
+                    print(f"add failed for '{title}' in {ori_shell}: {result['errors']}")
+                else:
+                    print(f"add attempted for '{title}' in {ori_shell} with unexpected response {result}")
+            else:
+                print(f"add failed for '{title}' in {ori_shell}: {res.status_code} {res.text}")

 def utc_to_local(utc_str):
     if not utc_str: return ""
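
Note: `find_existing_calendar_event` treats two events as duplicates when their start times fall inside the tolerance window and either the normalized titles or the normalized descriptions match. The start-time test below is the same comparison the function performs (times illustrative):

    from datetime import datetime

    target = datetime.fromisoformat("2025-01-20T08:00:00+00:00")
    candidate = datetime.fromisoformat("2025-01-20T17:00:00+00:00")
    tolerance_hours = 12

    within_window = abs((candidate - target).total_seconds()) <= tolerance_hours * 3600
    print(within_window)  # True: 9 hours apart, so title/description matching decides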
@@ -3076,10 +3188,11 @@ if __name__ == "__main__":
         20: ['Get a course info by id',getCourses],
         21: ['Reset course conclude date',update_course_conclude],
         22: ['Create calendar events for orientation shells', create_calendar_event],
-        23: ['list all assignments', list_all_assignments],
-        24: ['Bulk unenroll from course', bulk_unenroll],
-        25: ['enrollment helper', enrollment_helper],
-        26: ['g number list enroll to shell id', enroll_gnumber_list_to_courseid],
+        23: ['Remove all calendar events from a course', remove_all_course_events],
+        24: ['list all assignments', list_all_assignments],
+        25: ['Bulk unenroll from course', bulk_unenroll],
+        26: ['enrollment helper', enrollment_helper],
+        27: ['g number list enroll to shell id', enroll_gnumber_list_to_courseid],
         30: ['* Overview semester start dates',overview_start_dates],
         31: ['Fine tune term dates and winter session', course_by_depts_terms],

View File

@@ -1,6 +1,6 @@
 # schedule.py
 #
-# experimenting with manipulating and querying the schedule of courses
+# manipulating and querying the schedule of courses

 #from telnetlib import GA

View File

@@ -23,12 +23,13 @@ from time import mktime
 from semesters import human_to_short
 from canvas_secrets import badgr_target, badgr_hd
+from docxtpl import DocxTemplate

 if os.name != 'posix':
     import win32com.client
     import win32com.client as win32
     import pypandoc
-    from docxtpl import DocxTemplate
     import xlwt

 from pipelines import header, url, fetch, convert_roster_files, move_to_folder
@@ -112,7 +113,7 @@ def build_quiz(filename=""):
             this_q = L.strip()
             state = "answers"
         elif state =="answers":
-            m = re.search( '^Answer\:\s(\w)$', L)
+            m = re.search( r'^Answer\:\s(\w)$', L)
             if m:
                 correct_answer = m.group(1)
                 qs.append( [this_q, this_as, correct_answer ] )
@@ -120,7 +121,7 @@ def build_quiz(filename=""):
             this_as = { }
             correct_answer = ""
             continue
-        m = re.search( '^(\w)\)\s(.*)$', L)
+        m = re.search( r'^(\w)\)\s(.*)$', L)
         if m:
             print(m.group(1))
             print(m.group(2))
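
Note: the raw-string changes in this file are all the same fix. Without the `r` prefix, sequences like `\s` and `\d` are invalid string escapes that Python happens to pass through unchanged, but since 3.12 they emit a SyntaxWarning; the raw string makes the intent explicit. Both forms currently match the same text:

    import re

    assert re.sub('\s', '_', 'a b') == re.sub(r'\s', '_', 'a b') == 'a_b'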
@@ -183,7 +184,7 @@ def convert_to_pdf(name1, name2):

 # Build (docx/pdf) certificates for gott graduates
 def certificates_gott_build():
-    course = "gott_1_fa25"
+    course = "gott_1_fa25_sept"
     coursedate = "Fall 2025"
     certificate = "gott 1 template.docx"
@@ -202,7 +203,7 @@ def certificates_gott_build():
         name = row[0].strip()
         doc = DocxTemplate(f"cache/certificates/{certificate}")
         doc.render({ 'name' : name, 'coursedate': coursedate })
-        name_as_filename = re.sub('\s', '_', name.lower())
+        name_as_filename = re.sub(r'\s', '_', name.lower())
         fn = f"cache/certificates/{course}_{name_as_filename}."
         print(fn+'docx')
         doc.save(fn+'docx')
@@ -409,7 +410,7 @@ def hours_calc():
     allout = codecs.open('pa_de_noncred.txt','w','utf-8')
     for f in os.listdir('.'):
-        m = re.match('pa(\d+)\.txt',f)
+        m = re.match(r'pa(\d+)\.txt',f)
         if m:
             sec = m.group(1)
             # split up the combined sections
@@ -785,21 +786,21 @@ def job_titles():
         lastname = " ".join(parts[1:])
         for fns in first_name_subs:
             fns_parts = fns.split(',')
-            subbed = re.sub('^'+fns_parts[0]+'$',fns_parts[1].strip(), first)
+            subbed = re.sub(r'^'+fns_parts[0]+'$',fns_parts[1].strip(), first)
             if first != subbed:
                 #print("Subbed %s %s for %s %s" % (subbed,lastname, first, lastname))
                 name_to_title[ subbed + " " + lastname ] = x[1].strip()
-            subbed = re.sub('^'+fns_parts[1].strip()+'$',fns_parts[0], first)
+            subbed = re.sub(r'^'+fns_parts[1].strip()+'$',fns_parts[0], first)
             if first != subbed:
                 #print("Subbed %s %s for %s %s" % (subbed,lastname, first, lastname))
                 name_to_title[ subbed + " " + lastname ] = x[1].strip()
         for lns in last_name_subs:
             fns_parts = lns.split(',')
-            subbed = re.sub('^'+fns_parts[0]+'$',fns_parts[1].strip(), lastname)
+            subbed = re.sub(r'^'+fns_parts[0]+'$',fns_parts[1].strip(), lastname)
             if lastname != subbed:
                 #print("L Subbed %s %s for %s %s" % (first, subbed, first, lastname))
                 name_to_title[ first + " " + subbed ] = x[1].strip()
-            subbed = re.sub('^'+fns_parts[1].strip()+'$',fns_parts[0], lastname)
+            subbed = re.sub(r'^'+fns_parts[1].strip()+'$',fns_parts[0], lastname)
             if lastname != subbed:
                 #print("L Subbed %s %s for %s %s" % (first, subbed, first, lastname))
                 name_to_title[ first + " " + subbed ] = x[1].strip()
@@ -1280,7 +1281,7 @@ def file_renamer():
     ff = os.listdir(where)
     for F in ff:
-        nn = re.sub("\.jpg$","",F)
+        nn = re.sub(r"\.jpg$","",F)
         print("Old name: %s. New name: %s" % (F, nn))
         os.rename( where+F, where+nn )
         print("ok")