From a01ef8084d0b923b1da3c6177a3d851d9d923b5e Mon Sep 17 00:00:00 2001
From: Peter Howell
Date: Tue, 7 Oct 2025 19:08:00 +0000
Subject: [PATCH] updates

---
 content.py   | 473 +++++++++++++++++++++++++++++++++++++++------------
 courses.py   | 157 ++++++++++++++---
 schedules.py |   2 +-
 tasks.py     |  23 +--
 4 files changed, 502 insertions(+), 153 deletions(-)

diff --git a/content.py b/content.py
index 91eebb6..a2587f0 100644
--- a/content.py
+++ b/content.py
@@ -6,12 +6,14 @@ from __future__ import annotations
 #import html2markdown as h2m
 from typing import ItemsView
-import requests, codecs, os, re, json, sys, pypandoc
+import requests, codecs, os, re, json, sys, pypandoc, mimetypes, hashlib
 from checker import safe_html
 from pipelines import header, fetch, url
 from util import clean_title, to_file_friendly
+from urllib.parse import quote, urljoin, urlparse
 from bs4 import BeautifulSoup as bs
 from html.parser import HTMLParser
+from datetime import datetime, timezone
 
 pagebreak = '\n\n\n\n<div style="page-break-after: always;"></div>\n\n'
@@ -42,11 +44,10 @@ def test_forums(id=0):
 
     for m in modules:
         items[running_index] = '<h1>%s</h1>%s\n' % ( m['name'], pagebreak )
         running_index += 1
-
         mod_items = fetch('/api/v1/courses/' + str(id) + '/modules/'+str(m['id'])+'/items', verbose)
-
+
         for I in mod_items:
-
+
             if I['type'] in ['SubHeader', 'Page', 'Quiz', 'Discussion', 'ExternalUrl' ] or 'content_id' in I:
 
                 running_index += 1
@@ -99,7 +100,7 @@ def write_message(fd, view, participants):
             write_message(fd, r, participants)
     fd.write("\n")
 
-def extract_forums(id, course_folder, item_id_to_index, verbose=0):
+def extract_forums(id, course_folder, item_id_to_index, verbose=0, discussion_link_map=None):
     ###
     ### FORUMS
     ###
@@ -109,7 +110,6 @@ def extract_forums(id, course_folder, item_id_to_index, verbose=0):
     index = []
     forum_f = course_folder + '/forums'
     headered = 0
-    image_count = 0
     print("\nFORUMS")
     try:
         os.mkdir(forum_f)
@@ -133,6 +133,8 @@ def extract_forums(id, course_folder, item_id_to_index, verbose=0):
             fd.write(message + "\n\n")
             for v in t2['view']:
                 write_message(fd, v, participants)
+        if discussion_link_map is not None:
+            discussion_link_map[p['id']] = f"forums/{easier_filename}.html"
         if not headered:
             index.append( ('<h2>Discussion Forums</h2>') )
             headered = 1
         index.append( ( 'forums/' + easier_filename + '.html', p['title'] ) )
@@ -196,12 +198,18 @@ def course_download(id=""):
     for x in range(9000):
         items.append(0)
     video_link_list = []
+    page_local_map = {}
+    assignment_local_map = {}
+    file_local_map = {}
+    discussion_local_map = {}
+    module_details = []
 
     for m in modules:
         items[running_index] = '<h1>%s</h1>%s\n' % ( m['name'], pagebreak )
         running_index += 1
-
+
         mod_items = fetch('/api/v1/courses/' + str(id) + '/modules/'+str(m['id'])+'/items', verbose)
+        module_entry = {'name': m['name'], 'items': []}
 
         for I in mod_items:
@@ -214,13 +222,13 @@ def course_download(id=""):
             if I['type'] == 'Page':
                 item_id_to_index[ I['page_url'] ] = running_index
-
+
             if I['type'] == 'Quiz':
                 item_id_to_index[ I['content_id'] ] = running_index
-
+
             if I['type'] == 'Discussion':
                 item_id_to_index[ I['content_id'] ] = running_index
-
+
             if I['type'] == 'ExternalUrl':
                 items[running_index] = "<a href='%s'>%s</a>\n\n" % (I['external_url'], I['title'])
@@ -238,6 +246,17 @@ def course_download(id=""):
             # I['page_url']
             # I['type']
            # I['published']
+            module_entry['items'].append({
+                'type': I.get('type'),
+                'title': I.get('title'),
+                'page_url': I.get('page_url'),
+                'content_id': I.get('content_id'),
+                'html_url': I.get('html_url'),
+                'url': I.get('url'),
+                'external_url': I.get('external_url'),
+                'id': I.get('id')
+            })
+        module_details.append(module_entry)
 
     # assignments and files have content_id, pages have page_url
     course_folder = '../course_temps/course_'+id
@@ -281,14 +300,95 @@ def course_download(id=""):
         if not headered:
             index.append( ('<h2>Files</h2>') )
             headered = 1
-        index.append( ('files/' + f['filename'], f['filename']) )
+        relative_path = 'files/' + f['filename']
+        index.append( (relative_path, f['filename']) )
+        file_local_map[f['id']] = relative_path
 
     ###
     ### PAGES
     ###
     pages_f = course_folder + '/pages'
     headered = 0
-    image_count = 0
+    images_f = os.path.join(pages_f, 'images')
+    try:
+        os.makedirs(images_f)
+    except FileExistsError:
+        pass
+    except Exception as e:
+        print(f" * Unable to ensure images folder: {e}")
+
+    image_map = {}
+    image_counter = 0
+
+    def ensure_local_image(src, canvas_override=None):
+        nonlocal image_counter
+        if not src:
+            return (None, None)
+        original_src = src
+        if src.startswith('data:'):
+            return (None, None)
+        if src.startswith('images/'):
+            full_rel = f"pages/{src}"
+            image_map.setdefault(original_src, (src, full_rel))
+            return image_map[original_src], canvas_override
+        if src.startswith('pages/'):
+            page_rel = src.split('pages/', 1)[-1]
+            page_rel = page_rel if page_rel else src
+            full_rel = src
+            image_map.setdefault(original_src, (page_rel, full_rel))
+            return image_map[original_src], canvas_override
+
+        mapped = image_map.get(original_src)
+        if mapped:
+            return mapped, canvas_override or original_src
+
+        absolute_src = src
+        if not absolute_src.lower().startswith('http'):
+            absolute_src = urljoin(url, absolute_src)
+
+        mapped = image_map.get(absolute_src)
+        if mapped:
+            image_map[original_src] = mapped
+            return mapped, canvas_override or absolute_src
+
+        try:
+            response = requests.get(absolute_src, headers=header, stream=True, timeout=30)
+            response.raise_for_status()
+        except Exception as e:
+            d(f" * error downloading image {absolute_src}: {e}")
+            return (None, canvas_override or absolute_src)
+
+        content_type = response.headers.get('content-type', '').split(';')[0]
+        ext = ''
+        if content_type:
+            guessed = mimetypes.guess_extension(content_type)
+            if guessed:
+                ext = guessed
+        if not ext:
+            ext = os.path.splitext(urlparse(absolute_src).path)[1]
+        if not ext:
+            ext = '.bin'
+        ext = ext.lstrip('.')
+
+        local_name = f"img_{image_counter}.{ext}"
+        image_counter += 1
+        local_path = os.path.join(images_f, local_name)
+
+        try:
+            with open(local_path, 'wb') as fd:
+                for chunk in response.iter_content(chunk_size=8192):
+                    if chunk:
+                        fd.write(chunk)
+        except Exception as e:
+            d(f" * error saving image {absolute_src}: {e}")
+            return (None, canvas_override or absolute_src)
+
+        page_rel = f"images/{local_name}"
+        full_rel = f"pages/{page_rel}"
+        image_map[original_src] = (page_rel, full_rel)
+        if absolute_src != original_src:
+            image_map[absolute_src] = image_map[original_src]
+        return image_map[original_src], canvas_override or absolute_src
     print("\nPAGES")
     try:
         os.mkdir(pages_f)
@@ -296,6 +396,12 @@ def course_download(id=""):
         print(" * Pages folder already exists.")
 
+    page_manifest = {
+        'course_id': str(id),
+        'generated_at': datetime.now(timezone.utc).isoformat(),
+        'pages': {}
+    }
+
     pages = fetch('/api/v1/courses/' + str(id) + '/pages', verbose)
     for p in pages:
         d(' - %s' % p['title'])
@@ -312,119 +418,111 @@ def course_download(id=""):
         index.append( ( 'pages/' + easier_filename + '.html', p['title'] ) )
 
-        if os.path.exists(this_page_filename):
+        t2 = {'title': p['title']}
+        soup_infolder = None
+        soup_in_main = None
+        page_local_map[p['url']] = f"pages/{easier_filename}.html"
+        this_page_content = None
+
+        fetched_page = fetch('/api/v1/courses/' + str(id) + '/pages/'+p['url'], verbose)
+        if fetched_page and fetched_page.get('body'):
+            t2 = fetched_page
+            soup_infolder = bs(t2['body'], features="lxml")
+            soup_in_main = bs(t2['body'], features="lxml")
+        elif os.path.exists(this_page_filename):
             d(" - already downloaded %s" % this_page_filename)
             this_page_content = codecs.open(this_page_filename,'r','utf-8').read()
-        #elif re.search(r'eis-prod',p['url']) or re.search(r'gavilan\.ins',p['url']):
-        #elif re.search(r'eis-prod',p['url']):
-        #    d(' * skipping file behind passwords')
-        else:
-            t2 = fetch('/api/v1/courses/' + str(id) + '/pages/'+p['url'], verbose)
-            if t2 and 'body' in t2 and t2['body']:
-                soup_infolder = bs(t2['body'],features="lxml")
-                soup_in_main = bs(t2['body'],features="lxml")
-                a_links = soup_infolder.find_all('a')
-                for A in a_links:
-                    href = A.get('href')
+            soup_infolder = bs(this_page_content, features="lxml")
+            soup_in_main = bs(this_page_content, features="lxml")
+        else:
+            d(' * nothing returned or bad fetch')
+            continue
 
-                    if href and re.search( r'youtu',href): video_link_list.append( (A.get('href'), A.text, 'pages/'+easier_filename + ".html") )
-
-                # Images
-                page_images = soup_infolder.find_all('img')
-                page_image_paths = {}
-                for I in page_images:
-                    src = I.get('src')
-                    if src:
-                        d(' - %s' % src)
-                        try:
-                            r = requests.get(src,headers=header, stream=True)
-                            mytype = r.headers['content-type']
-                            #print("Response is type: " + str(mytype))
-                            r_parts = mytype.split("/")
-                            ending = r_parts[-1]
+        page_title = (t2.get('title') or p['title']).strip() if isinstance(t2, dict) else p['title']
 
-                            if ending=='jpeg': ending = "jpg"
+        def strip_leading_heading(soup):
+            if not soup:
+                return
+            first_heading = soup.find(['h1', 'h2'])
+            if first_heading and first_heading.get_text(strip=True) == page_title:
+                first_heading.decompose()
 
-                            img_full_path = f"{pages_f}/{str(image_count)}.{ending}"
-                            local_src = f"{str(image_count)}.{ending}"
-                            page_image_paths[src] = f"pages/{local_src}"
-                            I['src'] = local_src
+        strip_leading_heading(soup_infolder)
+        strip_leading_heading(soup_in_main)
 
-                            with open(img_full_path, 'wb') as fd:
-                                for chunk in r.iter_content(chunk_size=128):
-                                    fd.write(chunk)
-                            image_count += 1
-                        except Exception as e:
-                            d( ' * Error downloading page image, %s' % str(e) )
-
-                # Repeat for version for main file
-                page_main_images = soup_in_main.find_all('img')
-                for I in page_main_images:
-                    src = I.get('src')
-                    if src:
-                        I['src'] = page_image_paths[src]
+        a_links = soup_infolder.find_all('a')
+        for A in a_links:
+            href = A.get('href')
+            if href and re.search(r'youtu', href):
+                video_link_list.append((A.get('href'), A.text, 'pages/' + easier_filename + ".html"))
 
+        # Images -> ensure local copies
+        for img in soup_infolder.find_all('img'):
+            mapping, canvas_src = ensure_local_image(img.get('src'), img.get('data-canvas-src'))
+            if mapping:
+                img['src'] = mapping[0]
+            if canvas_src:
+                img['data-canvas-src'] = canvas_src
 
-                # STUDIO VIDEOS
-                # Regex pattern to match "custom_arc_media_id%3D" and capture everything
-                # until the next '&' or end of string
-                pattern = r"custom_arc_media_id%3D([^&]+)"
-                found_ids = []
+        for img in soup_in_main.find_all('img'):
+            mapping, canvas_src = ensure_local_image(img.get('src'), img.get('data-canvas-src'))
+            if mapping:
+                img['src'] = mapping[1]
+            if canvas_src:
+                img['data-canvas-src'] = canvas_src
 
-                replacement_tag = ''''''
-
-                # Iterate over all