#saved_titles = json.loads( codecs.open('cache/saved_youtube_titles.json','r','utf-8').read() )
#from calendar import FRIDAY
#import html2markdown as h2m
from typing import ItemsView
import requests, codecs, os, re, json, sys, pypandoc
from checker import safe_html
from pipelines import header, fetch, url
from util import clean_title, to_file_friendly
from bs4 import BeautifulSoup as bs
from html.parser import HTMLParser

# HTML page-break marker placed between sections of the combined course document
pagebreak = '\n\n\n\n<div style="page-break-after: always;"></div>\n\n'

DBG = 1
items = []


# Debug print helper.
def d(s):
    global DBG
    if DBG:
        print(s)


# Build the in-order module/item list for a course, then extract its discussion forums.
def test_forums(id=0):
    global items   # shared with extract_forums(), which fills in forum entries
    if not id:
        id = input("ID of course to check? ")
    verbose = 1
    courseinfo = fetch('/api/v1/courses/' + str(id), verbose )
    item_id_to_index = {}
    items_inorder = ["" + courseinfo['name'] + "\n\n" + pagebreak,]
    running_index = 1
    modules = fetch('/api/v1/courses/' + str(id) + '/modules',verbose)
    items = []
    for x in range(9000):
        items.append(0)

    for m in modules:
        items[running_index] = '<h1>%s</h1>%s\n' % ( m['name'], pagebreak )
        running_index += 1
        mod_items = fetch('/api/v1/courses/' + str(id) + '/modules/'+str(m['id'])+'/items', verbose)
        for I in mod_items:
            if I['type'] in ['SubHeader', 'Page', 'Quiz', 'Discussion', 'ExternalUrl' ] or 'content_id' in I:
                running_index += 1
                if I['type'] == 'SubHeader':
                    #print('subheader: ' + str(I))
                    items[running_index] = '<h2>%s</h2>\n' % str(json.dumps(I,indent=2))
                if I['type'] == 'Page':
                    item_id_to_index[ I['page_url'] ] = running_index
                if I['type'] == 'Quiz':
                    item_id_to_index[ I['content_id'] ] = running_index
                if I['type'] == 'Discussion':
                    item_id_to_index[ I['content_id'] ] = running_index
                if I['type'] == 'ExternalUrl':
                    items[running_index] = "<a href='%s'>%s</a>\n\n" % (I['external_url'], I['title'])
                # ?
                #if 'content_id' in I:
                #    item_id_to_index[ I['content_id'] ] = running_index
            else:
                print("What is this item? " + str(I))
                #items_inorder.append('Not included: '+ I['title'] + '(a ' + I['type'] + ')\n\n\n' )
            # I['title']
            # I['content_id']
            # I['page_url']
            # I['type']
            # I['published']
            # assignments and files have content_id, pages have page_url

    course_folder = '../course_temps/course_'+id
    index = []
    try:
        os.mkdir(course_folder)
    except:
        print("Course folder exists.")

    index.extend( extract_forums(id, course_folder, item_id_to_index, verbose) )
    print(json.dumps(index,indent=2))


# Recursively write a discussion post and its nested replies to the open file handle.
def write_message(fd, view, participants):
    fd.write(f"<blockquote>\nfrom {participants[view['user_id']]['display_name']}:<br>\n{view['message']}\n<br>")
    if 'replies' in view:
        for r in view['replies']:
            write_message(fd, r, participants)
    fd.write("</blockquote>\n")


def extract_forums(id, course_folder, item_id_to_index, verbose=0):
    ###
    ### FORUMS
    ###
    global items
    index = []
    forum_f = course_folder + '/forums'
    headered = 0
    image_count = 0
    print("\nFORUMS")
    try:
        os.mkdir(forum_f)
        forums = fetch('/api/v1/courses/' + str(id) + '/discussion_topics', verbose)
        for p in forums:
            p['title'] = clean_title(p['title'])
            forum_id = p['id']
            easier_filename = p['title']
            for a in 'title,posted_at,published'.split(','):
                print(str(p[a]), "\t", end=' ')
            print("")
            t2 = fetch(f"/api/v1/courses/{id}/discussion_topics/{forum_id}", verbose)
            title = t2['title']
            message = t2['message']
            t2 = fetch(f"/api/v1/courses/{id}/discussion_topics/{forum_id}/view", verbose)
            try:
                participants = {x['id']:x for x in t2['participants']}
                with codecs.open(forum_f + '/' + easier_filename + '.html', 'w','utf-8') as fd:
                    fd.write(f"<h2>{title}</h2>\n")
                    fd.write(message + "\n\n")
                    for v in t2['view']:
                        write_message(fd, v, participants)

                if not headered:
                    index.append( ('<h2>Discussion Forums</h2>') )
                    headered = 1
                index.append( ( 'forums/' + easier_filename + '.html', p['title'] ) )

                # write to running log of content in order of module
                if p['id'] in item_id_to_index:
                    items[ item_id_to_index[ p['id'] ] ] = f"<h2>{title}</h2>\n\n{message}\n\n{pagebreak}"
                else:
                    print(" This forum didn't seem to be in the modules list.")
            except Exception as e:
                print("Error here:", e)
                #print p
                #print results_dict
    except Exception as e:
        print("** Forum folder seems to exist. Skipping those.")
        print(e)
    return index


#
#
#
#
#
# todo: include front page.
# todo: clean html
# todo: toc
#
#
# Download everything interesting in a course to a local folder
# Build a master file with the entire class content

def course_download(id=""):
    global items
    if not id:
        id = input("ID of course to check? ")
    # temp hard code
    #id = "21284"
    verbose = 0
    PAGES_ONLY = 0
    videos_log = codecs.open('cache/accessible_check_log.txt','w','utf-8')
    save_file_types = ['application/pdf','application/docx','image/jpg','image/png','image/gif','image/webp','application/vnd.openxmlformats-officedocument.wordprocessingml.document']
    courseinfo = fetch('/api/v1/courses/' + str(id), verbose )

    # reverse lookup into items array
    item_id_to_index = {}
    modules = fetch('/api/v1/courses/' + str(id) + '/modules',verbose)

    # headers / module names
    items = [f"<h1>{courseinfo['name']}</h1>\n{pagebreak}",]
    running_index = 1
    for x in range(9000):
        items.append(0)
    video_link_list = []

    for m in modules:
        items[running_index] = '<h1>%s</h1>%s\n' % ( m['name'], pagebreak )
        running_index += 1
        mod_items = fetch('/api/v1/courses/' + str(id) + '/modules/'+str(m['id'])+'/items', verbose)
        for I in mod_items:
            if I['type'] in ['SubHeader', 'Page', 'Quiz', 'Discussion', 'ExternalUrl' ] or 'content_id' in I:
                running_index += 1
                if I['type'] == 'SubHeader':
                    #print('subheader: ' + str(I))
                    items[running_index] = f"<h2>{I['title']}</h2>\n"
                if I['type'] == 'Page':
                    item_id_to_index[ I['page_url'] ] = running_index
                if I['type'] == 'Quiz':
                    item_id_to_index[ I['content_id'] ] = running_index
                if I['type'] == 'Discussion':
                    item_id_to_index[ I['content_id'] ] = running_index
                if I['type'] == 'ExternalUrl':
                    items[running_index] = "<a href='%s'>%s</a>\n\n" % (I['external_url'], I['title'])
                # ?
                #if 'content_id' in I:
                #    item_id_to_index[ I['content_id'] ] = running_index
            else:
                print("What is this item? " + str(I))
                #items_inorder.append('Not included: '+ I['title'] + '(a ' + I['type'] + ')\n\n\n' )
            # I['title']
            # I['content_id']
            # I['page_url']
            # I['type']
            # I['published']
            # assignments and files have content_id, pages have page_url

    course_folder = '../course_temps/course_'+id

    # list of each item, organized by item type. Tuples of (url,title)
    index = []
    try:
        os.mkdir(course_folder)
    except:
        print("Course folder exists.")

    ###
    ### FILES
    ###
    if not PAGES_ONLY:
        files_f = course_folder + '/files'
        headered = 0
        print("\nFILES")
        try:
            os.mkdir(files_f)
        except:
            print(" * Files folder already exists.")
        files = fetch('/api/v1/courses/' + str(id) + '/files', verbose)
        print("LISTING COURSE FILES")
        for f in files:
            for arg in 'filename,content-type,size,url'.split(','):
                if arg=='size':
                    f['size'] = str(int(f['size']) / 1000) + 'k'
            if f['content-type'] in save_file_types:
                d(' - %s' % f['filename'])
                if not os.path.exists(files_f + '/' + f['filename']):
                    r = requests.get(f['url'],headers=header, stream=True)
                    with open(files_f + '/' + f['filename'], 'wb') as fd:
                        for chunk in r.iter_content(chunk_size=128):
                            fd.write(chunk)
                else:
                    d(" - already downloaded %s" % files_f + '/' + f['filename'])
                if not headered:
                    index.append( ('<h2>Files</h2>') )
                    headered = 1
                index.append( ('files/' + f['filename'], f['filename']) )

    ###
    ### PAGES
    ###
    pages_f = course_folder + '/pages'
    headered = 0
    image_count = 0
    print("\nPAGES")
    try:
        os.mkdir(pages_f)
    except:
        print(" * Pages folder already exists.")
    pages = fetch('/api/v1/courses/' + str(id) + '/pages', verbose)
    for p in pages:
        d(' - %s' % p['title'])
        p['title'] = clean_title(p['title'])
        easier_filename = clean_title(p['url'])
        this_page_filename = "%s/%s.html" % (pages_f, easier_filename)
        #for a in 'title,updated_at,published'.split(','):
        #    print(str(p[a]), "\t", end=' ')
        if not headered:
            index.append( ('<h2>Pages</h2>') )
            headered = 1
        index.append( ( 'pages/' + easier_filename + '.html', p['title'] ) )
        if os.path.exists(this_page_filename):
            d(" - already downloaded %s" % this_page_filename)
            this_page_content = codecs.open(this_page_filename,'r','utf-8').read()
        #elif re.search(r'eis-prod',p['url']) or re.search(r'gavilan\.ins',p['url']):
        #elif re.search(r'eis-prod',p['url']):
        #    d(' * skipping file behind passwords')
        else:
            t2 = fetch('/api/v1/courses/' + str(id) + '/pages/'+p['url'], verbose)
            if t2 and 'body' in t2 and t2['body']:
                soup_infolder = bs(t2['body'],features="lxml")
                soup_in_main = bs(t2['body'],features="lxml")

                a_links = soup_infolder.find_all('a')
                for A in a_links:
                    href = A.get('href')
                    if href and re.search( r'youtu',href):
                        video_link_list.append( (A.get('href'), A.text, 'pages/'+easier_filename + ".html") )

                # Images
                page_images = soup_infolder.find_all('img')
                page_image_paths = {}
                for I in page_images:
                    src = I.get('src')
                    if src:
                        d(' - %s' % src)
                        try:
                            r = requests.get(src,headers=header, stream=True)
                            mytype = r.headers['content-type']
                            #print("Response is type: " + str(mytype))
                            r_parts = mytype.split("/")
                            ending = r_parts[-1]
                            if ending=='jpeg':
                                ending = "jpg"
                            img_full_path = f"{pages_f}/{str(image_count)}.{ending}"
                            local_src = f"{str(image_count)}.{ending}"
                            page_image_paths[src] = f"pages/{local_src}"
                            I['src'] = local_src
                            with open(img_full_path, 'wb') as fd:
                                for chunk in r.iter_content(chunk_size=128):
                                    fd.write(chunk)
                            image_count += 1
                        except Exception as e:
                            d( ' * Error downloading page image, %s' % str(e) )

                # Repeat for version for main file
                page_main_images = soup_in_main.find_all('img')
                for I in page_main_images:
                    src = I.get('src')
                    if src:
                        I['src'] = page_image_paths[src]

                # STUDIO VIDEOS
                # Regex pattern to match "custom_arc_media_id%3D" and capture everything
                # until the next '&' or end of string
                pattern = r"custom_arc_media_id%3D([^&]+)"
                found_ids = []
                replacement_tag = ''''''

                # Iterate over all