from __future__ import annotations #saved_titles = json.loads( codecs.open('cache/saved_youtube_titles.json','r','utf-8').read() ) #from calendar import FRIDAY #import html2markdown as h2m from typing import ItemsView import requests, codecs, os, re, json, sys, pypandoc, mimetypes, hashlib from checker import safe_html from pipelines import header, fetch, url from util import clean_title, to_file_friendly from urllib.parse import quote, urljoin, urlparse from bs4 import BeautifulSoup as bs from html.parser import HTMLParser from datetime import datetime, timezone pagebreak = '\n\n\n\n
\n\n' DBG = 1 items = [] def d(s): global DBG if DBG: print(s) def test_forums(id=0): if not id: id = input("ID of course to check? ") verbose = 1 courseinfo = fetch('/api/v1/courses/' + str(id), verbose ) item_id_to_index = {} items_inorder = ["" + courseinfo['name'] + "\n\n" + pagebreak,] running_index = 1 modules = fetch('/api/v1/courses/' + str(id) + '/modules',verbose) items = [] for x in range(9000): items.append(0) for m in modules: items[running_index] = '

%s

%s\n' % ( m['name'], pagebreak ) running_index += 1 mod_items = fetch('/api/v1/courses/' + str(id) + '/modules/'+str(m['id'])+'/items', verbose) for I in mod_items: if I['type'] in ['SubHeader', 'Page', 'Quiz', 'Discussion', 'ExternalUrl' ] or 'content_id' in I: running_index += 1 if I['type'] == 'SubHeader': #print('subheader: ' + str(I)) items[running_index] = '

%s

\n' % str(json.dumps(I,indent=2)) if I['type'] == 'Page': item_id_to_index[ I['page_url'] ] = running_index if I['type'] == 'Quiz': item_id_to_index[ I['content_id'] ] = running_index if I['type'] == 'Discussion': item_id_to_index[ I['content_id'] ] = running_index if I['type'] == 'ExternalUrl': items[running_index] = "%s
\n\n" % (I['external_url'], I['title']) # ? #if 'content_id' in I: # item_id_to_index[ I['content_id'] ] = running_index else: print("What is this item? " + str(I)) #items_inorder.append('Not included: '+ I['title'] + '(a ' + I['type'] + ')\n\n\n' ) # I['title'] # I['content_id'] # I['page_url'] # I['type'] # I['published'] # assignments and files have content_id, pages have page_url course_folder = '../course_temps/course_'+id index = [] try: os.mkdir(course_folder) except: print("Course folder exists.") index.extend( extract_forums(id, course_folder, item_id_to_index, verbose) ) print(json.dumps(index,indent=2)) def write_message(fd, view, participants): fd.write(f"
\nfrom {participants[view['user_id']]['display_name']}:
\n{view['message']}\n
") if 'replies' in view: for r in view['replies']: write_message(fd, r, participants) fd.write("
\n") def extract_forums(id, course_folder, item_id_to_index, verbose=0, discussion_link_map=None): ### ### FORUMS ### global items index = [] forum_f = course_folder + '/forums' headered = 0 print("\nFORUMS") try: os.mkdir(forum_f) forums = fetch('/api/v1/courses/' + str(id) + '/discussion_topics', verbose) for p in forums: p['title'] = clean_title(p['title']) forum_id = p['id'] easier_filename = p['title'] for a in 'title,posted_at,published'.split(','): print(str(p[a]), "\t", end=' ') print("") t2 = fetch(f"/api/v1/courses/{id}/discussion_topics/{forum_id}", verbose) title = t2['title'] message = t2['message'] t2 = fetch(f"/api/v1/courses/{id}/discussion_topics/{forum_id}/view", verbose) try: participants = {x['id']:x for x in t2['participants']} with codecs.open(forum_f + '/' + easier_filename + '.html', 'w','utf-8') as fd: fd.write(f"

{title}

\n") fd.write(message + "\n\n") for v in t2['view']: write_message(fd, v, participants) if discussion_link_map is not None: discussion_link_map[p['id']] = f"forums/{easier_filename}.html" if not headered: index.append( ('
Discussion Forums
') ) headered = 1 index.append( ( 'forums/' + easier_filename + '.html', p['title'] ) ) # write to running log of content in order of module if p['id'] in item_id_to_index: items[ item_id_to_index[ p['id'] ] ] = f"

{title}

\n\n{message}\n\n{pagebreak}" else: print(' This forum didnt seem to be in the modules list.') except Exception as e: print("Error here:", e) #print p #print results_dict except Exception as e: print("** Forum folder seems to exist. Skipping those.") print(e) return index # # # # # # todo: include front page. # todo: clean html # todo: toc # # # Download everything interesting in a course to a local folder # Build a master file with the entire class content # Adjust image paths in aggregated snippets so they work from the course root. def adjust_fullcourse_image_sources(html_fragment): if not html_fragment: return html_fragment def _prefix_images(match): prefix = match.group(1) path = match.group(2) normalized = path.lstrip('./') if normalized.lower().startswith('pages/'): return f"{prefix}{normalized}" return f"{prefix}pages/{normalized}" src_pattern = re.compile(r'(]+?\bsrc\s*=\s*[\'"])(?:\./)?(images/[^\'"]*)', re.IGNORECASE) html_fragment = src_pattern.sub(_prefix_images, html_fragment) canvas_pattern = re.compile(r'(]+?\bdata-canvas-src\s*=\s*[\'"])(?:\./)?(images/[^\'"]*)', re.IGNORECASE) html_fragment = canvas_pattern.sub(_prefix_images, html_fragment) srcset_pattern = re.compile(r'(]+?\bsrcset\s*=\s*[\'"])([^\'"]*)([\'"])', re.IGNORECASE | re.DOTALL) def _prefix_srcset(match): prefix = match.group(1) value = match.group(2) suffix = match.group(3) entries = [] changed = False for chunk in value.split(','): chunk = chunk.strip() if not chunk: continue parts = chunk.split() url = parts[0] descriptors = parts[1:] normalized = url.lstrip('./') if normalized.lower().startswith('pages/'): new_url = url elif normalized.lower().startswith('images/'): new_url = f"pages/{normalized}" changed = True else: new_url = url descriptor_text = ' '.join(descriptors) entry = f"{new_url} {descriptor_text}".strip() entries.append(entry) if not changed: return match.group(0) return f"{prefix}{', '.join(entries)}{suffix}" html_fragment = srcset_pattern.sub(_prefix_srcset, html_fragment) return html_fragment def course_download(id=""): global items if not id: id = input("ID of course to check? ") # temp hard code #id = "21284" verbose = 0 PAGES_ONLY = 0 videos_log = codecs.open('cache/accessible_check_log.txt','w','utf-8') save_file_types = ['application/pdf','application/docx','image/jpg','image/png','image/gif','image/webp','application/vnd.openxmlformats-officedocument.wordprocessingml.document'] courseinfo = fetch('/api/v1/courses/' + str(id), verbose ) # reverse lookup into items array item_id_to_index = {} modules = fetch('/api/v1/courses/' + str(id) + '/modules',verbose) # headers / module names items = [f"

{courseinfo['name']}

\n{pagebreak}",] running_index = 1 for x in range(9000): items.append(0) video_link_list = [] page_local_map = {} assignment_local_map = {} file_local_map = {} discussion_local_map = {} module_details = [] canvas_host = urlparse(url).hostname if url else None for m in modules: items[running_index] = '

%s

%s\n' % ( m['name'], pagebreak ) running_index += 1 mod_items = fetch('/api/v1/courses/' + str(id) + '/modules/'+str(m['id'])+'/items', verbose) module_entry = {'name': m['name'], 'items': []} for I in mod_items: if I['type'] in ['SubHeader', 'Page', 'Quiz', 'Discussion', 'ExternalUrl' ] or 'content_id' in I: running_index += 1 if I['type'] == 'SubHeader': #print('subheader: ' + str(I)) items[running_index] = f"

{I['title']}

\n" if I['type'] == 'Page': item_id_to_index[ I['page_url'] ] = running_index if I['type'] == 'Quiz': item_id_to_index[ I['content_id'] ] = running_index if I['type'] == 'Discussion': item_id_to_index[ I['content_id'] ] = running_index if I['type'] == 'ExternalUrl': items[running_index] = "%s
\n\n" % (I['external_url'], I['title']) # ? #if 'content_id' in I: # item_id_to_index[ I['content_id'] ] = running_index else: print("What is this item? " + str(I)) #items_inorder.append('Not included: '+ I['title'] + '(a ' + I['type'] + ')\n\n\n' ) # I['title'] # I['content_id'] # I['page_url'] # I['type'] # I['published'] module_entry['items'].append({ 'type': I.get('type'), 'title': I.get('title'), 'page_url': I.get('page_url'), 'content_id': I.get('content_id'), 'html_url': I.get('html_url'), 'url': I.get('url'), 'external_url': I.get('external_url'), 'id': I.get('id') }) module_details.append(module_entry) # assignments and files have content_id, pages have page_url course_folder = '../course_temps/course_'+id # list of each item, organized by item type. Tuples of (url,title) index = [] try: os.mkdir(course_folder) except: print("Course folder exists.") ### ### FILES ### if not PAGES_ONLY: files_f = course_folder + '/files' headered = 0 print("\nFILES") try: os.mkdir(files_f) except: print(" * Files folder already exists.") files = fetch('/api/v1/courses/' + str(id) + '/files', verbose) print("LISTING COURSE FILES") for f in files: for arg in 'filename,content-type,size,url'.split(','): if arg=='size': f['size'] = str(int(f['size']) / 1000) + 'k' if f['content-type'] in save_file_types: d(' - %s' % f['filename']) if not os.path.exists(files_f + '/' + f['filename']): r = requests.get(f['url'],headers=header, stream=True) with open(files_f + '/' + f['filename'], 'wb') as fd: for chunk in r.iter_content(chunk_size=128): fd.write(chunk) else: d(" - already downloaded %s" % files_f + '/' + f['filename']) if not headered: index.append( ('
Files
') ) headered = 1 relative_path = 'files/' + f['filename'] index.append( (relative_path, f['filename']) ) file_local_map[f['id']] = relative_path ### ### PAGES ### pages_f = course_folder + '/pages' headered = 0 images_f = os.path.join(pages_f, 'images') try: os.makedirs(images_f) except FileExistsError: pass except Exception as e: print(f" * Unable to ensure images folder: {e}") image_map = {} image_counter = 0 def ensure_local_image(src, canvas_override=None): nonlocal image_counter if not src: return (None, None) original_src = src if src.startswith('data:'): return (None, None) if src.startswith('images/'): full_rel = f"pages/{src}" image_map.setdefault(original_src, (src, full_rel)) return image_map[original_src], canvas_override if src.startswith('pages/'): page_rel = src.split('pages/', 1)[-1] page_rel = page_rel if page_rel else src full_rel = src image_map.setdefault(original_src, (page_rel, full_rel)) return image_map[original_src], canvas_override mapped = image_map.get(original_src) if mapped: return mapped, canvas_override or original_src absolute_src = src if not absolute_src.lower().startswith('http'): absolute_src = urljoin(url, absolute_src) mapped = image_map.get(absolute_src) if mapped: image_map[original_src] = mapped return mapped, canvas_override or absolute_src try: target_host = urlparse(absolute_src).hostname request_headers = header if not canvas_host or target_host == canvas_host else None response = requests.get(absolute_src, headers=request_headers, stream=True, timeout=30) response.raise_for_status() except Exception as e: d(f" * error downloading image {absolute_src}: {e}") return (None, canvas_override or absolute_src) content_type = response.headers.get('content-type', '').split(';')[0] ext = '' if content_type: guessed = mimetypes.guess_extension(content_type) if guessed: ext = guessed if not ext: ext = os.path.splitext(urlparse(absolute_src).path)[1] if not ext: ext = '.bin' ext = ext.lstrip('.') local_name = f"img_{image_counter}.{ext}" image_counter += 1 local_path = os.path.join(images_f, local_name) try: with open(local_path, 'wb') as fd: for chunk in response.iter_content(chunk_size=8192): if chunk: fd.write(chunk) except Exception as e: d(f" * error saving image {absolute_src}: {e}") return (None, canvas_override or absolute_src) page_rel = f"images/{local_name}" full_rel = f"pages/{page_rel}" image_map[original_src] = (page_rel, full_rel) if absolute_src != original_src: image_map[absolute_src] = image_map[original_src] return image_map[original_src], canvas_override or absolute_src print("\nPAGES") try: os.mkdir(pages_f) except: print(" * Pages folder already exists.") page_manifest = { 'course_id': str(id), 'generated_at': datetime.now(timezone.utc).isoformat(), 'pages': {} } pages = fetch('/api/v1/courses/' + str(id) + '/pages', verbose) for p in pages: d(' - %s' % p['title']) p['title'] = clean_title(p['title']) easier_filename = clean_title(p['url']) this_page_filename = "%s/%s.html" % (pages_f, easier_filename) #for a in 'title,updated_at,published'.split(','): # print(str(p[a]), "\t", end=' ') if not headered: index.append( ('
Pages
') ) headered = 1 index.append( ( 'pages/' + easier_filename + '.html', p['title'] ) ) t2 = {'title': p['title']} soup_infolder = None soup_in_main = None page_local_map[p['url']] = f"pages/{easier_filename}.html" this_page_content = None fetched_page = fetch('/api/v1/courses/' + str(id) + '/pages/'+p['url'], verbose) if fetched_page and fetched_page.get('body'): t2 = fetched_page soup_infolder = bs(t2['body'], features="lxml") soup_in_main = bs(t2['body'], features="lxml") elif os.path.exists(this_page_filename): d(" - already downloaded %s" % this_page_filename) this_page_content = codecs.open(this_page_filename,'r','utf-8').read() soup_infolder = bs(this_page_content, features="lxml") soup_in_main = bs(this_page_content, features="lxml") else: d(' * nothing returned or bad fetch') continue page_title = (t2.get('title') or p['title']).strip() if isinstance(t2, dict) else p['title'] def strip_leading_heading(soup): if not soup: return first_heading = soup.find(['h1', 'h2']) if first_heading and first_heading.get_text(strip=True) == page_title: first_heading.decompose() strip_leading_heading(soup_infolder) strip_leading_heading(soup_in_main) a_links = soup_infolder.find_all('a') for A in a_links: href = A.get('href') if href and re.search(r'youtu', href): video_link_list.append((A.get('href'), A.text, 'pages/' + easier_filename + ".html")) # Images -> ensure local copies for img in soup_infolder.find_all('img'): mapping, canvas_src = ensure_local_image(img.get('src'), img.get('data-canvas-src')) if mapping: img['src'] = mapping[0] if canvas_src: img['data-canvas-src'] = canvas_src for img in soup_in_main.find_all('img'): mapping, canvas_src = ensure_local_image(img.get('src'), img.get('data-canvas-src')) if mapping: img['src'] = mapping[1] if canvas_src: img['data-canvas-src'] = canvas_src # STUDIO VIDEOS pattern = r"custom_arc_media_id%3D([^&]+)" for iframe in soup_infolder.find_all("iframe"): src = iframe.get("src") if not src: continue match = re.search(pattern, src) if match: videos_log.write(f"page: {p['url']} arc id: {match.group(1)}\n") videos_log.flush() videos_log.write(f"page: {p['url']} iframe src: {src}\n") videos_log.flush() if 'instructuremedia.com' in src: try: iframe_response = requests.get(src, timeout=15) iframe_response.raise_for_status() except Exception as e: print(f"Failed to retrieve iframe content from: {src} ({e})") continue videos_log.write(f"succesfully fetched {src}\n") videos_log.flush() iframe_soup = bs(iframe_response.text, 'html.parser') for source_tag in iframe_soup.find_all('source'): videos_log.write(f"page: {p['url']} video src: {source_tag.get('src')}\n") videos_log.flush() # WRITE out page (always refresh to ensure local paths) try: this_page_content = f"

{t2['title']}

\n{soup_infolder.prettify()}" with codecs.open(this_page_filename, 'w','utf-8') as fd: fd.write(this_page_content) except Exception as e: d(f' * problem writing page content: {e}') # write to running log of content in order of module if p and p['url'] in item_id_to_index and soup_in_main: items[item_id_to_index[p['url']]] = f"

{t2['title']}

\n{soup_in_main.prettify()}\n{pagebreak}" else: d(' -- This page didnt seem to be in the modules list.') if this_page_content is not None: page_hash = hashlib.sha256(this_page_content.encode('utf-8')).hexdigest() page_manifest['pages'][p['url']] = { 'title': t2.get('title') or p['title'], 'filename': f"pages/{easier_filename}.html", 'hash': page_hash } manifest_path = os.path.join(course_folder, 'pages_manifest.json') with codecs.open(manifest_path, 'w', 'utf-8') as manifest_file: manifest_file.write(json.dumps(page_manifest, indent=2)) ### ### ASSIGNMENTS ### if not PAGES_ONLY: headered = 0 asm_f = course_folder + '/assignments' print("\nASSIGNMENTS") try: os.mkdir(asm_f) except: d(" - Assignments dir exists") asm = fetch('/api/v1/courses/' + str(id) + '/assignments', verbose) for p in asm: d(' - %s' % p['name']) try: friendlyfile = to_file_friendly(p['name']) this_assmt_filename = asm_f + '/' + str(p['id'])+"_"+ friendlyfile + '.html' assignment_local_map[p['id']] = 'assignments/' + str(p['id'])+"_"+ friendlyfile + '.html' if os.path.exists(this_assmt_filename): d(" - already downloaded %s" % this_assmt_filename) this_assmt_content = open(this_assmt_filename,'r').read() else: t2 = fetch('/api/v1/courses/' + str(id) + '/assignments/'+str(p['id']), verbose) with codecs.open(this_assmt_filename, 'w','utf-8') as fd: this_assmt_content = "

%s

\n%s\n\n" % (t2['name'], t2['description']) fd.write(this_assmt_content) if not headered: index.append( ('
Assignments
') ) headered = 1 index.append( ('assignments/' + str(p['id'])+"_"+friendlyfile + '.html', p['name']) ) # write to running log of content in order of module if p['id'] in item_id_to_index: items[ item_id_to_index[ p['url'] ] ] = this_assmt_content+'\n\n'+pagebreak except Exception as e: d(' * Problem %s' % str(e)) ### ### FORUMS ### index.extend( extract_forums(id, course_folder, item_id_to_index, verbose, discussion_local_map) ) """ ### ### QUIZZES ### # get a list external urls headered = 0 t = url + '/api/v1/courses/' + str(id) + '/modules' while t: t = fetch(t) mods = results results = [] for m in mods: results = [] t2 = url + '/api/v1/courses/' + str(id) + '/modules/' + str(m['id']) + '/items' while t2: t2 = fetch(t2) items = results for i in items: #print i if i['type'] == "ExternalUrl": #print i for j in 'id,title,external_url'.split(','): print unicode(i[j]), "\t", print "" if not headered: index.append( ('
External Links
') ) headered = 1 index.append( (i['external_url'], i['title']) ) """ # Create index page of all gathered items index.insert(0, ('modules.html', 'Modules Overview')) myindex = codecs.open(course_folder+'/index.html','w','utf-8') for i in index: if len(i)==2: myindex.write(f"{i[1]}
\n") else: myindex.write(i) myindex.close() def resolve_module_item_link(item): item_type = (item.get('type') or '').lower() if item_type == 'page': return page_local_map.get(item.get('page_url')) or item.get('html_url') if item_type == 'assignment': return assignment_local_map.get(item.get('content_id')) or item.get('html_url') if item_type == 'discussion': return discussion_local_map.get(item.get('content_id')) or item.get('html_url') if item_type == 'file': return file_local_map.get(item.get('content_id')) or item.get('html_url') if item_type == 'externalurl': return item.get('external_url') if item_type in ('externaltool', 'quiz', 'assignmentquiz', 'attendance'): return item.get('html_url') or item.get('url') if item_type == 'subheader': return None return item.get('html_url') or item.get('url') module_index_path = course_folder + '/modules.html' with codecs.open(module_index_path, 'w', 'utf-8') as module_index: module_index.write('\n') module_index.write(f"

{courseinfo['name']} - Modules

\n") for module in module_details: module_index.write(f"

{module['name']}

\n
    \n") for item in module['items']: title = item.get('title') or '(Untitled)' item_type = item.get('type') or 'Item' link = resolve_module_item_link(item) if item_type.lower() == 'subheader': module_index.write(f"
  • {title}
  • \n") continue if link: module_index.write(f"
  • {title} ({item_type})
  • \n") else: module_index.write(f"
  • {title} ({item_type})
  • \n") module_index.write('
\n') module_index.write('\n') # Full course content in single file print("Writing main course files...") mycourse = codecs.open(course_folder+'/fullcourse.raw.html','w','utf-8') mycourse.write("\n") for I in items: if I: mycourse.write(adjust_fullcourse_image_sources(I)) mycourse.write("\n") temp = open('cache/coursedump.txt','w') temp.write( "items: " + json.dumps(items,indent=2) ) temp.write("\n\n\n") temp.write( "index: " + json.dumps(index,indent=2) ) temp.write("\n\n\n") #temp.write( "items_inorder: " + json.dumps(items_inorder,indent=2) ) #temp.write("\n\n\n") temp.write( "item_id_to_index: " + json.dumps(item_id_to_index,indent=2) ) if video_link_list: mycourse.write('\n

Videos Linked in Pages

\n') for V in video_link_list: video_url, txt, pg = V mycourse.write("\n") mycourse.write("
"+txt+" on " + pg + "
\n") mycourse.close() try: pypandoc.convert_file(course_folder+'/fullcourse.raw.html', 'html', outputfile=course_folder+"/fullcourse.html") except Exception as e: print(f"couldn't create html fullcourse page: {e}") try: pypandoc.convert_file(course_folder+'/fullcourse.html', 'md', outputfile=course_folder+"/fullcourse.md") except Exception as e: print(f"couldn't create markdown fullcourse page: {e}") try: pypandoc.convert_file(course_folder+'/fullcourse.html', 'docx', outputfile=course_folder+"/fullcourse.docx") except Exception as e: print(f"couldn't create doc fullcourse page: {e}") def restore_canvas_image_sources(html_fragment): soup = bs(html_fragment, features="lxml") changed = False for img in soup.find_all('img'): canvas_src = img.get('data-canvas-src') if canvas_src: img['src'] = canvas_src del img['data-canvas-src'] changed = True body = soup.body if body: restored = ''.join(str(child) for child in body.children) else: restored = soup.decode() return restored, changed def _push_page_update(course_num, page_slug, new_content): endpoint = f"{url}/api/v1/courses/{course_num}/pages/{page_slug}" data = {'wiki_page[body]': new_content} response = requests.put(endpoint, headers=header, params=data) if response.status_code >= 400: print(f" - Failed to upload {page_slug}: {response.status_code} {response.text}") return False print(f" - Uploaded {page_slug}") return True def upload_modified_pages(course_id=None, confirm_each=False): if not course_id: course_id = input("course id> ").strip() if not course_id: print("No course id provided; aborting.") return course_folder = f"../course_temps/course_{course_id}" manifest_path = os.path.join(course_folder, 'pages_manifest.json') if not os.path.exists(manifest_path): print(f"No manifest found at {manifest_path}. Run course_download first.") return with codecs.open(manifest_path, 'r', 'utf-8') as manifest_file: manifest = json.loads(manifest_file.read()) pages = manifest.get('pages', {}) if not pages: print("Manifest contains no page entries.") return updated = False for slug, meta in pages.items(): local_rel = meta.get('filename') local_path = os.path.join(course_folder, local_rel) if local_rel else None if not local_rel or not local_rel.startswith('pages/'): print(f" - Skipping {slug}: not a downloaded page ({local_rel})") continue if not local_path or not os.path.exists(local_path): print(f" - Skipping {slug}: local file missing ({local_rel})") continue with codecs.open(local_path, 'r', 'utf-8') as local_file: local_html = local_file.read() current_hash = hashlib.sha256(local_html.encode('utf-8')).hexdigest() if current_hash == meta.get('hash'): continue restored_html, changed = restore_canvas_image_sources(local_html) payload = restored_html if changed else local_html do_upload = True if confirm_each: ans = input(f"Upload changes for {slug}? [y/N]: ").strip().lower() do_upload = ans in ('y', 'yes') if not do_upload: print(f" - Skipped {slug} by user request") continue if _push_page_update(course_id, slug, payload): manifest['pages'][slug]['hash'] = current_hash updated = True if updated: with codecs.open(manifest_path, 'w', 'utf-8') as manifest_file: manifest_file.write(json.dumps(manifest, indent=2)) print("Updated manifest hashes for uploaded pages.") else: print("No page uploads performed.") def upload_modified_pages_prompt(): upload_modified_pages() def media_testing(): user_id = 285 #ksmith t = f"https://gavilan.instructuremedia.com/api/public/v1/users/{user_id}/media" media = fetch(t,verbose=1,media=1) print(media) def pan_testing(): course_folder = '../course_temps/course_6862' pypandoc.convert_file(course_folder+'/fullcourse.md', 'html', outputfile=course_folder+"/fullcourse.v2.html") # Given course, page url, and new content, upload the new revision of a page def create_page(course_num,new_title,new_content): t3 = url + '/api/v1/courses/' + str(course_num) + '/pages' #xyz = raw_input('Enter 1 to continue and send back to: ' + t3 + ': ') #print("Creating page: %s\nwith content:%s\n\n\n" % (new_title,new_content)) print("Creating page: %s" % new_title) xyz = input('type 1 to confirm: ') #'1' if xyz=='1': data = {'wiki_page[title]':new_title, 'wiki_page[body]':new_content} r3 = requests.post(t3, headers=header, params=data) print(r3) print('ok') def md_to_course(): #input = 'C:/Users/peter/Nextcloud/Documents/gavilan/student_orientation.txt' #output = 'C:/Users/peter/Nextcloud/Documents/gavilan/stu_orientation/student_orientation.html' id = "11214" infile = 'cache/pages/course_%s.md' % id output = 'cache/pages/course_%s_fixed.html' % id output3 = pypandoc.convert_file(infile, 'html', format='md', outputfile=output) xx = codecs.open(output,'r','utf-8').read() soup = bs( xx, features="lxml" ) soup.encode("utf-8") current_page = "" current_title = "" for child in soup.body.children: if child.name == "h1" and not current_title: current_title = child.get_text() elif child.name == "h1": upload_page(id,current_title,current_page) current_title = child.get_text() current_page = "" print( "Next page: %s" % current_title ) else: #print(dir(child)) if 'prettify' in dir(child): current_page += child.prettify(formatter="html") else: current_page += child.string upload_page(id,current_title,current_page) print("Done") # DL pages only def grab_course_pages(course_num=-1): global results, results_dict, url, header # course_num = raw_input("What is the course id? ") if course_num<0: course_num = input("Id of course? ") else: course_num = str(course_num) modpagelist = [] modurllist = [] # We want things in the order of the modules t4 = url + '/api/v1/courses/'+str(course_num)+'/modules?include[]=items' results = fetch(t4) i = 1 pageout = codecs.open('cache/pages/course_'+str(course_num)+'.html','w','utf-8') pageoutm = codecs.open('cache/pages/course_'+str(course_num)+'.md','w','utf-8') divider = "\n### " for M in results: print("Module Name: " + M['name']) for I in M['items']: if I['type']=='Page': modpagelist.append(I['title']) modurllist.append(I['page_url']) pageout.write(divider+I['title']+'### '+I['page_url']+'\n') easier_filename = clean_title(I['page_url']) print(" " + str(i) + ". " + I['title']) t2 = url + '/api/v1/courses/' + str(course_num) + '/pages/'+I['page_url'] print('Getting: ' + t2) mypage = fetch(t2) fixed = safe_html(mypage['body']) if fixed: #markdown = h2m.convert(fixed) #p_data = pandoc.read(mypage['body']) markdown = pypandoc.convert_text("\n

" + I['title'] + "

\n" + mypage['body'], 'md', format='html') pageout.write(fixed+'\n') pageoutm.write(markdown+'\n') pageout.flush() i += 1 pageout.close() pageoutm.close() # Download, clean html, and reupload page def update_page(): global results, results_dict, url, header # course_num = raw_input("What is the course id? ") course_num = '6862' t = url + '/api/v1/courses/' + str(course_num) + '/pages' while t: t = fetch(t) pages = results results = [] mypagelist = [] myurllist = [] modpagelist = [] modurllist = [] for p in pages: p['title'] = clean_title(p['title']) mypagelist.append(p['title']) myurllist.append(p['url']) easier_filename = clean_title(p['url']) #for a in 'title,updated_at,published'.split(','): # print unicode(p[a]), "\t", #print "" # We want things in the order of the modules t4 = url + '/api/v1/courses/'+str(course_num)+'/modules?include[]=items' while t4: t4 = fetch(t4) mods = results results = [] i = 1 print("\nWhat page do you want to repair?") for M in mods: print("Module Name: " + M['name']) for I in M['items']: if I['type']=='Page': modpagelist.append(I['title']) modurllist.append(I['page_url']) print(" " + str(i) + ". " + I['title']) i += 1 choice = input("\n> ") choice = int(choice) - 1 chosen_url = modurllist[choice] print('Fetching: ' + modpagelist[choice]) t2 = url + '/api/v1/courses/' + str(course_num) + '/pages/'+chosen_url print('From: ' + t2) results_dict = {} while(t2): t2 = fetch(t2) mypage = results_dict fixed_page = safe_html(mypage['body']) upload_page(course_num,chosen_url,fixed_page) # given dict of file info (from files api), construct an img tag that works in a page #def file_to_img_tag(f, alt, course, soup): # #tag = f"\"{f['filename']}\"" # return T def html_file_to_page(filename, course, tags): try: soup = bs(codecs.open(filename,'r', 'utf-8').read(), 'html.parser') except Exception as e: print(f"Exception on {filename}: {e}") return img_tags = soup.find_all('img') result = {'title': soup.title.text if soup.title else ''} result['title'].strip() for img in img_tags: src = img['src'] try: alt = img['alt'] except: alt = src orig_filename = os.path.basename(src) if orig_filename in tags: T = soup.new_tag(name='img', src=f"https://ilearn.gavilan.edu/courses/{course}/files/{tags[orig_filename]['id']}/preview") T['id'] = tags[orig_filename]['id'] T['alt'] = alt T['data-api-endpoint'] = f"https://ilearn.gavilan.edu/api/v1/courses/{course}/files/{tags[orig_filename]['id']}" T['data-api-returntype'] = "File" img.replace_with(T) print( f" replaced image: {src} alt: {alt}") else: print( f" couldn't find replacement image: {src} alt: {alt}") outfile = codecs.open(filename+"_mod.html", 'w', 'utf-8') outfile.write( soup.prettify() ) outfile.close() result['body'] = ''.join(map(str, soup.body.contents)) if soup.body else '' return result def create_new_page(course_id, title, body): print(f"Creating page: {title}, length: {len(body)}") request = f"{url}/api/v1/courses/{course_id}/pages" print(request) data = { 'wiki_page[title]': title, 'wiki_page[body]': body } r3 = requests.post(request, headers=header, data=data) try: result = json.loads(r3.text) print( f" + ok: {result['url']}") except: print(" - problem creating page?") # Given a folder full of html pages and their linked images, create Canvas PAGES of them def make_pages_from_folder(folder='cache/csis6/', course = '20558'): if 0: request = f"{url}/api/v1/courses/{course}/files" print("Fetching course files") files = fetch(request) tempfile = codecs.open('cache/csis6filelist.json','w','utf-8') tempfile.write(json.dumps(files)) tempfile.close() if 1: files = json.loads( codecs.open('cache/csis6filelist.json', 'r', 'utf-8').read()) course_files = {f['filename']: f for f in files} tags = {} for f in files: if f['filename'].lower().endswith('.jpg') or f['filename'].lower().endswith('.png'): tags[f['filename']] = f contents = os.listdir(folder) contents = ['welcome.html','welcome2.html', 'welcome3.html'] print(contents) for f in contents: m = re.search(r'^(.*)\.(html?)$', f) if m: print(f"html file: {m.group(1)}, extension: {m.group(2)}") newpage = html_file_to_page(folder+f, course, tags) create_new_page(course, newpage['title'], newpage['body']) else: m = re.search(r'^(.*)\.(.*)$', f) if m: print(f"other file: {m.group(1)}, extension: {m.group(2)}") else: print(f"unknown file: {f}") # Given course, page url, and new content, upload the new revision of a page def upload_page(course_num,pageurl,new_content): print(f"Uploading page: {pageurl}") #print new_content t3 = url + '/api/v1/courses/' + str(course_num) + '/pages/' + pageurl xyz = input('Enter 1 to continue and send back to: ' + t3 + ': ') #xyz = '1' if xyz=='1': data = {'wiki_page[body]':new_content} r3 = requests.put(t3, headers=header, params=data) print(r3) print('ok') def multiple_downloads(): x = input("What IDs? Separate with one space: ") for id in x.split(" "): course_download(id) def fetch_support_page(): u = "https://ilearn.gavilan.edu/courses/20850/pages/online-student-support-hub" course_num = 20850 page_url = "online-student-support-hub" t2 = f"{url}/api/v1/courses/{course_num}/pages/{page_url}" print('Getting: ' + t2) mypage = fetch(t2) print(json.dumps(mypage,indent=2)) print(mypage['body']) from courses import getCoursesInTerm def clear_old_page(shell_id,page_name): # get all pages t = f"{url}/api/v1/courses/{shell_id}/pages" pages = fetch(t) for page in pages: if page['title'] == page_name: print(f"found a page named {page_name}. Deleting it.") id = page['page_id'] t2 = f"{url}/api/v1/courses/{shell_id}/pages/{id}" r2 = requests.delete(t2, headers=header) print(f"{r2}") def add_support_page_full_semester(term=289): print("Fetching list of all active courses") # term = 184 # fa24 # 182 c = getCoursesInTerm(term,0,0) # sp25 = 287 wi24=182 #print(c) check = 'each' print("answer 'all' to do the rest without confirming") for C in c: if check == 'each': answer = input(f"Type 1 to add support page to {C['id']} ({C['name']}) ") if answer == '1': create_support_page(C['id']) else: if answer == 'all': check = 'all' create_support_page(C['id']) continue elif check == 'all': create_support_page(C['id']) def create_support_page(shell_id=18297): # 29): # clear one of same name first. clear_old_page(shell_id, "Online Student Support Hub") # make new one t3 = f"{url}/api/v1/courses/{shell_id}/pages/online-student-support-hub" new_content = codecs.open("cache/support_min.html","r","utf-8").read() title = "Online Student Support Hub" data = {'wiki_page[body]':new_content, 'wiki_page[title]':title, 'wiki_page[published]':"true"} r3 = requests.put(t3, headers=header, params=data) #print(r3.content) print('Page Created') try: response = r3.json() print(f"page id: {response['page_id']}") except Exception as e: print(f"Exception: {e}") # list modules # GET /api/v1/courses/:course_id/modules t4 = f"{url}/api/v1/courses/{shell_id}/modules" modules = fetch(t4) module_id = 0 # what if there are no modules? if len(modules) == 0: t6 = f"{url}/api/v1/courses/{shell_id}/modules/" mod_data = {'module[name]': 'Welcome', 'module[unlock_at]':"2024-01-01T06:00:00-08:00"} r6 = requests.post(t6, headers=header, params=mod_data) mod_response = r6.json() module_id = mod_response['id'] print(f"created module, id: {module_id}") # publish module t7 = f"{url}/api/v1/courses/{shell_id}/modules/{module_id}" mod_data2 = {'module[published]':'true'} r6 = requests.put(t7, headers=header, params=mod_data2) for M in modules: if M['position'] == 1: module_id = M['id'] print(f"found first module 1: ({module_id}) {M['name']}") #print(json.dumps(modules,indent=2)) # # create module item # POST /api/v1/courses/:course_id/modules/:module_id/items t5 = f"{url}/api/v1/courses/{shell_id}/modules/{module_id}/items" item_data = {'module_item[title]': title, 'module_item[type]': 'Page', 'module_item[page_url]': response['url'], 'module_item[position]':1} r5 = requests.post(t5, headers=header, params=item_data) print('ok') def list_modules_and_items(shell_id, verbose=0): modules = fetch(f"{url}/api/v1/courses/{shell_id}/modules?include[]=items&include[]=content_details") if verbose: print(json.dumps(modules,indent=2)) return modules def check_modules_for_old_orientation(): from util import contains_key_value, find_dict_with_key_value, extract_key_values checklist = [] for term in [286, 287]: # wi25, sp25 print("Fetching list of all active courses") #term = 287 # 184 # fa24 # 182 #term = 286 # wi25 c = getCoursesInTerm(term,0,0) # sp25 = 287 wi24=182 for C in c: print(f"{C['id']} - {C['name']}") m = list_modules_and_items(C['id']) if contains_key_value(m, 'name', 'Online Student Support Services - Summer & Fall 2024'): old_mod = find_dict_with_key_value(m,'name','Online Student Support Services - Summer & Fall 2024') print(" this course has the old module") checklist.append(f"{C['id']}") titles = extract_key_values(old_mod, 'title') [ print(f" {T}") for T in titles ] print(f"\nCheck these course ids:") for id in checklist: print(id) def repair_ezproxy_links(): from localcache2 import pages_in_term # get all pages in term all_pages = pages_in_term() # c.id, c.course_code, c.sis_source_id, wp.id as wp_id, wp.title, wp.url, c.name , wp.body for p in all_pages: course = p[1] title = p[4] url = p[5] body = p[7] # print(body) try: #s = re.search('''["']https:\/\/ezproxy\.gavilan\.edu\/login\?url=(.*)["']''',body) a = re.search(r'Online Library Services',title) if a: continue s = re.findall('\n.*ezproxy.*\n',body) if s: print(course, title, url) print(" ", s, "\n") # s.group()) except Exception as e: #print(f"Skipped: {title}, {e}") pass def download_web(): import argparse, os, re, time, hashlib, mimetypes, subprocess from collections import deque from urllib.parse import urlsplit, urlunsplit, urljoin import posixpath as ppath import requests from lxml import html SESSION = requests.Session() SESSION.headers.update({ "User-Agent": "MiniXPathCrawler/1.0 (+for personal archiving; contact admin if issues)" }) def normalize_path(path: str) -> str: np = ppath.normpath(path or "/") if not np.startswith("/"): np = "/" + np return np def base_dir_of(path: str) -> str: # Ensure trailing slash for folder comparison if not path or path.endswith("/"): bd = path or "/" else: bd = ppath.dirname(path) + "/" bd = normalize_path(bd) if not bd.endswith("/"): bd += "/" return bd def canonical_url(u: str, drop_query=True) -> str: sp = urlsplit(u) path = normalize_path(sp.path) if drop_query: sp = sp._replace(path=path, query="", fragment="") else: sp = sp._replace(path=path, fragment="") return urlunsplit(sp) def same_folder_or_below(start_url: str, link_url: str) -> bool: su = urlsplit(start_url); lu = urlsplit(link_url) if su.scheme != lu.scheme or su.netloc != lu.netloc: return False bd = base_dir_of(su.path) # e.g., "/a/b/" tp = normalize_path(lu.path) # e.g., "/a/b/page.html" return (tp == bd[:-1]) or tp.startswith(bd) def is_html_response(resp: requests.Response) -> bool: ctype = resp.headers.get("Content-Type", "") return "html" in ctype.lower() def fetch_html(url: str, timeout=20): try: r = SESSION.get(url, timeout=timeout, allow_redirects=True) except requests.RequestException: return None, None if r.status_code != 200 or not is_html_response(r): return None, None try: doc = html.fromstring(r.content) except Exception: return None, None # make links absolute for easier handling of images and hrefs doc.make_links_absolute(r.url) return r, doc def safe_filename_from_url(u: str, default_ext=".bin") -> str: # hash + best-effort extension h = hashlib.sha1(u.encode("utf-8")).hexdigest()[:16] ext = "" path = urlsplit(u).path if "." in path: ext = "." + path.split(".")[-1].split("?")[0].split("#")[0] if not re.match(r"^\.[A-Za-z0-9]{1,5}$", ext): ext = "" return h + (ext or default_ext) def download_image(img_url: str, assets_dir: str) -> str | None: try: r = SESSION.get(img_url, timeout=20, stream=True) except requests.RequestException: return None if r.status_code != 200: return None # extension: prefer from Content-Type ext = None ctype = r.headers.get("Content-Type", "") if "/" in ctype: ext_guess = mimetypes.guess_extension(ctype.split(";")[0].strip()) if ext_guess: ext = ext_guess fname = safe_filename_from_url(img_url, default_ext=ext or ".img") os.makedirs(assets_dir, exist_ok=True) fpath = os.path.join(assets_dir, fname) try: with open(fpath, "wb") as f: for chunk in r.iter_content(65536): if chunk: f.write(chunk) except Exception: return None return fpath def html_fragment_from_xpath(doc, xpath_expr: str, assets_dir: str): nodes = doc.xpath(xpath_expr) if not nodes: return None, None # (html_fragment, title) # Remove