|
|
|
|
#saved_titles = json.loads( codecs.open('cache/saved_youtube_titles.json','r','utf-8').read() )
|
|
from calendar import FRIDAY
|
|
import requests, codecs, os, re, json, sys, pypandoc
|
|
import webbrowser, bs4, trafilatura, pickle, tomd, checker
|
|
import html2markdown as h2m
|
|
from pipelines import header, fetch, url, put_file
|
|
from util import clean_title, to_file_friendly, minimal_string, stripper, mycleaner
|
|
from bs4 import BeautifulSoup as bs
|
|
from html.parser import HTMLParser
|
|
from collections import defaultdict
|
|
from pdfminer.high_level import extract_text
|
|
from sentence_transformers import SentenceTransformer, util
|
|
|
|
h = HTMLParser()

# YouTube title cache used by yt_title(); start with an empty cache if the file is missing
try:
    saved_titles = json.loads(codecs.open('cache/saved_youtube_titles.json', 'r', 'utf-8').read())
except Exception:
    saved_titles = {}
|
|
|
|
pagebreak = '\n\n<!-- BREAK -->\n\n'
|
|
DBG = 1
|
|
|
|
def d(s):
|
|
global DBG
|
|
if DBG: print(s)
|
|
|
|
def test_forums(id=0):
|
|
if not id:
|
|
id = input("ID of course to check? ")
|
|
verbose = 1
|
|
|
|
courseinfo = fetch('/api/v1/courses/' + str(id), verbose )
|
|
|
|
item_id_to_index = {}
|
|
items_inorder = ["<font size='24'>" + courseinfo['name'] + "</font>\n\n" + pagebreak,]
|
|
running_index = 1
|
|
|
|
modules = fetch('/api/v1/courses/' + str(id) + '/modules',verbose)
|
|
|
|
# pre-allocate one slot per module item, indexed by running_index
items = [0] * 9000
|
|
|
|
for m in modules:
|
|
items[running_index] = '<h2>%s</h2>%s\n' % ( m['name'], pagebreak )
|
|
running_index += 1
|
|
|
|
mod_items = fetch('/api/v1/courses/' + str(id) + '/modules/'+str(m['id'])+'/items', verbose)
|
|
|
|
for I in mod_items:
|
|
|
|
if I['type'] in ['SubHeader', 'Page', 'Quiz', 'Discussion', 'ExternalUrl' ] or 'content_id' in I:
|
|
running_index += 1
|
|
|
|
if I['type'] == 'SubHeader':
|
|
#print('subheader: ' + str(I))
|
|
items[running_index] = '<h3>%s</h3>\n' % I['title']
|
|
|
|
if I['type'] == 'Page':
|
|
item_id_to_index[ I['page_url'] ] = running_index
|
|
|
|
if I['type'] == 'Quiz':
|
|
item_id_to_index[ I['content_id'] ] = running_index
|
|
|
|
if I['type'] == 'Discussion':
|
|
item_id_to_index[ I['content_id'] ] = running_index
|
|
|
|
if I['type'] == 'ExternalUrl':
|
|
items[running_index] = "<a href='%s'>%s</a><br />\n\n" % (I['external_url'], I['title'])
|
|
|
|
# ?
|
|
#if 'content_id' in I:
|
|
# item_id_to_index[ I['content_id'] ] = running_index
|
|
else:
|
|
print("What is this item? " + str(I))
|
|
|
|
|
|
#items_inorder.append('<i>Not included: '+ I['title'] + '(a ' + I['type'] + ')</i>\n\n\n' )
|
|
|
|
# I['title']
|
|
# I['content_id']
|
|
# I['page_url']
|
|
# I['type']
|
|
# I['published']
|
|
# assignments and files have content_id, pages have page_url
|
|
|
|
course_folder = '../course_temps/course_' + str(id)
|
|
index = []
|
|
try:
|
|
os.mkdir(course_folder)
|
|
except FileExistsError:
    print("Course folder exists.")
|
|
|
|
index.extend( extract_forums(id, course_folder, items, item_id_to_index, verbose) )
|
|
print(json.dumps(index,indent=2))
|
|
|
|
def write_message(fd, view, participants):
|
|
fd.write(f"<blockquote>\nfrom <b>{participants[view['user_id']]['display_name']}</b>:<br />\n{view['message']}\n<br />")
|
|
if 'replies' in view:
|
|
for r in view['replies']:
|
|
write_message(fd, r, participants)
|
|
fd.write("</blockquote>\n")
|
|
|
|
def extract_forums(id, course_folder, items_inorder, item_id_to_index, verbose=0):
|
|
###
|
|
### FORUMS
|
|
###
|
|
index = []
|
|
forum_f = course_folder + '/forums'
|
|
headered = 0
|
|
image_count = 0
|
|
print("\nFORUMS")
|
|
try:
|
|
os.mkdir(forum_f)
|
|
forums = fetch('/api/v1/courses/' + str(id) + '/discussion_topics', verbose)
|
|
for p in forums:
|
|
p['title'] = clean_title(p['title'])
|
|
forum_id = p['id']
|
|
easier_filename = p['title']
|
|
for a in 'title,posted_at,published'.split(','):
|
|
print(str(p[a]), "\t", end=' ')
|
|
print("")
|
|
t2 = fetch(f"/api/v1/courses/{id}/discussion_topics/{forum_id}", verbose)
|
|
title = t2['title']
|
|
message = t2['message']
|
|
|
|
t2 = fetch(f"/api/v1/courses/{id}/discussion_topics/{forum_id}/view", verbose)
|
|
try:
|
|
participants = {x['id']:x for x in t2['participants']}
|
|
with codecs.open(forum_f + '/' + easier_filename + '.html', 'w','utf-8') as fd:
|
|
fd.write(f"<h1>{title}</h1>\n")
|
|
fd.write(message + "\n\n")
|
|
for v in t2['view']:
|
|
write_message(fd, v, participants)
|
|
if not headered: index.append( ('<br /><b>Discussion Forums</b><br />') )
|
|
headered = 1
|
|
index.append( ( 'forums/' + easier_filename + '.html', p['title'] ) )
|
|
|
|
|
|
# write to running log of content in order of module
|
|
if p['id'] in item_id_to_index:
|
|
items_inorder[ item_id_to_index[ p['id'] ] ] = f"<h1>{title}</h1>\n\n{message}\n\n{pagebreak}"
|
|
else:
|
|
print(" This forum didn't seem to be in the modules list.")
|
|
except Exception as e:
|
|
print("Error here:", e)
|
|
#print p
|
|
#print results_dict
|
|
except Exception as e:
|
|
print("** Forum folder seems to exist. Skipping those.")
|
|
print(e)
|
|
|
|
return index
|
|
|
|
|
|
|
|
|
|
# Download everything interesting in a course to a local folder
|
|
# Build a master file with the entire class content
|
|
def accessible_check(id=""):
|
|
if not id:
|
|
id = input("ID of course to check? ")
|
|
verbose = 1
|
|
PAGES_ONLY = 1
|
|
|
|
save_file_types = ['application/pdf','application/docx','image/jpeg','image/jpg','image/png','image/gif','image/webp','application/vnd.openxmlformats-officedocument.wordprocessingml.document']
|
|
|
|
courseinfo = fetch('/api/v1/courses/' + str(id), verbose )
|
|
|
|
# reverse lookup into items array
|
|
item_id_to_index = {}
|
|
|
|
# is it used?
|
|
items_inorder = ["<font size='24'>" + courseinfo['name'] + "</font>\n\n" + pagebreak,]
|
|
running_index = 1
|
|
|
|
modules = fetch('/api/v1/courses/' + str(id) + '/modules',verbose)
|
|
|
|
# headers / module names
|
|
# pre-allocate one slot per module item, indexed by running_index
items = [0] * 9000
|
|
|
|
video_link_list = []
|
|
|
|
for m in modules:
|
|
items[running_index] = '<h2>%s</h2>%s\n' % ( m['name'], pagebreak )
|
|
running_index += 1
|
|
|
|
mod_items = fetch('/api/v1/courses/' + str(id) + '/modules/'+str(m['id'])+'/items', verbose)
|
|
|
|
for I in mod_items:
|
|
|
|
if I['type'] in ['SubHeader', 'Page', 'Quiz', 'Discussion', 'ExternalUrl' ] or 'content_id' in I:
|
|
running_index += 1
|
|
|
|
if I['type'] == 'SubHeader':
|
|
#print('subheader: ' + str(I))
|
|
items[running_index] = '<h3>%s</h3>\n' % I['title']
|
|
|
|
if I['type'] == 'Page':
|
|
item_id_to_index[ I['page_url'] ] = running_index
|
|
|
|
if I['type'] == 'Quiz':
|
|
item_id_to_index[ I['content_id'] ] = running_index
|
|
|
|
if I['type'] == 'Discussion':
|
|
item_id_to_index[ I['content_id'] ] = running_index
|
|
|
|
if I['type'] == 'ExternalUrl':
|
|
items[running_index] = "<a href='%s'>%s</a><br />\n\n" % (I['external_url'], I['title'])
|
|
|
|
# ?
|
|
#if 'content_id' in I:
|
|
# item_id_to_index[ I['content_id'] ] = running_index
|
|
else:
|
|
print("What is this item? " + str(I))
|
|
|
|
|
|
#items_inorder.append('<i>Not included: '+ I['title'] + '(a ' + I['type'] + ')</i>\n\n\n' )
|
|
|
|
# I['title']
|
|
# I['content_id']
|
|
# I['page_url']
|
|
# I['type']
|
|
# I['published']
|
|
# assignments and files have content_id, pages have page_url
|
|
|
|
course_folder = '../course_temps/course_' + str(id)
|
|
|
|
# list of each item, organized by item type. Tuples of (url,title)
|
|
index = []
|
|
try:
|
|
os.mkdir(course_folder)
|
|
except FileExistsError:
    print("Course folder exists.")
|
|
###
|
|
### FILES
|
|
###
|
|
if not PAGES_ONLY:
|
|
files_f = course_folder + '/files'
|
|
headered = 0
|
|
print("\nFILES")
|
|
try:
|
|
os.mkdir(files_f)
|
|
except FileExistsError:
    print(" * Files folder already exists.")
|
|
|
|
files = fetch('/api/v1/courses/' + str(id) + '/files', verbose)
|
|
print("LISTING COURSE FILES")
|
|
for f in files:
|
|
for arg in 'filename,content-type,size,url'.split(','):
|
|
if arg=='size':
|
|
f['size'] = str(int(f['size']) // 1000) + 'k'
|
|
|
|
if f['content-type'] in save_file_types:
|
|
d(' - %s' % f['filename'])
|
|
|
|
if not os.path.exists(files_f + '/' + f['filename']):
|
|
r = requests.get(f['url'],headers=header, stream=True)
|
|
with open(files_f + '/' + f['filename'], 'wb') as fd:
|
|
for chunk in r.iter_content(chunk_size=128):
|
|
fd.write(chunk)
|
|
else:
|
|
d(" - already downloaded %s" % files_f + '/' + f['filename'])
|
|
|
|
if not headered:
|
|
index.append( ('<br /><b>Files</b><br />') )
|
|
headered = 1
|
|
index.append( ('files/' + f['filename'], f['filename']) )
|
|
|
|
###
|
|
### PAGES
|
|
###
|
|
pages_f = course_folder + '/pages'
|
|
headered = 0
|
|
image_count = 0
|
|
print("\nPAGES")
|
|
try:
|
|
os.mkdir(pages_f)
|
|
except FileExistsError:
    print(" * Pages folder already exists.")
|
|
|
|
|
|
pages = fetch('/api/v1/courses/' + str(id) + '/pages', verbose)
|
|
for p in pages:
|
|
d(' - %s' % p['title'])
|
|
|
|
p['title'] = clean_title(p['title'])
|
|
easier_filename = clean_title(p['url'])
|
|
this_page_filename = "%s/%s.html" % (pages_f, easier_filename)
|
|
#for a in 'title,updated_at,published'.split(','):
|
|
# print(str(p[a]), "\t", end=' ')
|
|
|
|
if not headered:
|
|
index.append( ('<br /><b>Pages</b><br />') )
|
|
headered = 1
|
|
index.append( ( 'pages/' + easier_filename + '.html', p['title'] ) )
|
|
|
|
|
|
if os.path.exists(this_page_filename):
|
|
d(" - already downloaded %s" % this_page_filename)
|
|
this_page_content = codecs.open(this_page_filename,'r','utf-8').read()
|
|
#elif re.search(r'eis-prod',p['url']) or re.search(r'gavilan\.ins',p['url']):
|
|
#elif re.search(r'eis-prod',p['url']):
|
|
# d(' * skipping file behind passwords')
|
|
else:
|
|
t2 = fetch('/api/v1/courses/' + str(id) + '/pages/'+p['url'], verbose)
|
|
if t2 and 'body' in t2 and t2['body']:
|
|
bb = bs(t2['body'],features="lxml")
|
|
a_links = bb.find_all('a')
|
|
for A in a_links:
|
|
href = A.get('href')
|
|
|
|
if href and re.search( r'youtu',href):
|
|
video_link_list.append( (A.get('href'), A.text, 'pages/'+easier_filename + ".html") )
|
|
|
|
|
|
page_images = bb.find_all('img')
|
|
for I in page_images:
|
|
src = I.get('src')
|
|
if src:
|
|
d(' - %s' % src)
|
|
#if re.search(r'eis-prod', src) or re.search(r'gavilan\.ins', src):
|
|
# d(' * skipping file behind passwords')
|
|
#else:
|
|
try:
|
|
r = requests.get(src,headers=header, stream=True)
|
|
mytype = r.headers['content-type']
|
|
#print("Response is type: " + str(mytype))
|
|
r_parts = mytype.split("/")
|
|
ending = r_parts[-1]
|
|
|
|
with open(pages_f + '/' + str(image_count) + "." + ending, 'wb') as fd:
|
|
for chunk in r.iter_content(chunk_size=128):
|
|
fd.write(chunk)
|
|
image_count += 1
|
|
except Exception as e:
|
|
d( ' * Error downloading page image, %s' % str(e) )
|
|
|
|
try:
|
|
with codecs.open(this_page_filename, 'w','utf-8') as fd:
|
|
this_page_content = "<h2>%s</h2>\n%s" % ( t2['title'], t2['body'] )
|
|
fd.write(this_page_content)
|
|
except Exception as e:
    d(' * problem writing page content: %s' % str(e))
|
|
## TODO include linked pages even if they aren't in module
|
|
else:
|
|
d(' * nothing returned or bad fetch')
|
|
# write to running log of content in order of module
|
|
if p and p['url'] in item_id_to_index:
|
|
items[ item_id_to_index[ p['url'] ] ] = this_page_content +'\n\n'+pagebreak
|
|
else:
|
|
d(" -- This page didn't seem to be in the modules list.")
|
|
|
|
|
|
###
|
|
### ASSIGNMENTS
|
|
###
|
|
|
|
if not PAGES_ONLY:
|
|
headered = 0
|
|
asm_f = course_folder + '/assignments'
|
|
print("\nASSIGNMENTS")
|
|
try:
|
|
os.mkdir(asm_f)
|
|
except FileExistsError:
    d(" - Assignments dir exists")
|
|
|
|
asm = fetch('/api/v1/courses/' + str(id) + '/assignments', verbose)
|
|
for p in asm:
|
|
d(' - %s' % p['name'])
|
|
|
|
|
|
try:
|
|
friendlyfile = to_file_friendly(p['name'])
|
|
this_assmt_filename = asm_f + '/' + str(p['id'])+"_"+ friendlyfile + '.html'
|
|
if os.path.exists(this_assmt_filename):
|
|
d(" - already downloaded %s" % this_assmt_filename)
|
|
this_assmt_content = codecs.open(this_assmt_filename, 'r', 'utf-8').read()
|
|
else:
|
|
t2 = fetch('/api/v1/courses/' + str(id) + '/assignments/'+str(p['id']), verbose)
|
|
with codecs.open(this_assmt_filename, 'w','utf-8') as fd:
|
|
this_assmt_content = "<h2>%s</h2>\n%s\n\n" % (t2['name'], t2['description'])
|
|
fd.write(this_assmt_content)
|
|
if not headered:
|
|
index.append( ('<br /><b>Assignments</b><br />') )
|
|
headered = 1
|
|
index.append( ('assignments/' + str(p['id'])+"_"+friendlyfile + '.html', p['name']) )
|
|
|
|
# write to running log of content in order of module
|
|
if p['id'] in item_id_to_index:
|
|
items[ item_id_to_index[ p['id'] ] ] = this_assmt_content + '\n\n' + pagebreak
|
|
except Exception as e:
|
|
d(' * Problem %s' % str(e))
|
|
|
|
###
|
|
### FORUMS
|
|
###
|
|
|
|
index.extend( extract_forums(id, course_folder, items, item_id_to_index, verbose) )
|
|
|
|
"""
|
|
|
|
|
|
|
|
###
|
|
### QUIZZES
|
|
###
|
|
|
|
|
|
# get a list external urls
|
|
headered = 0
|
|
t = url + '/api/v1/courses/' + str(id) + '/modules'
|
|
while t: t = fetch(t)
|
|
mods = results
|
|
results = []
|
|
for m in mods:
|
|
results = []
|
|
t2 = url + '/api/v1/courses/' + str(id) + '/modules/' + str(m['id']) + '/items'
|
|
while t2: t2 = fetch(t2)
|
|
items = results
|
|
for i in items:
|
|
#print i
|
|
if i['type'] == "ExternalUrl":
|
|
#print i
|
|
for j in 'id,title,external_url'.split(','):
|
|
print unicode(i[j]), "\t",
|
|
print ""
|
|
if not headered: index.append( ('<br /><b>External Links</b><br />') )
|
|
headered = 1
|
|
index.append( (i['external_url'], i['title']) )
|
|
"""
|
|
|
|
|
|
|
|
# Create index page of all gathered items
|
|
myindex = codecs.open(course_folder+'/index.html','w','utf-8')
|
|
for i in index:
|
|
if len(i)==2: myindex.write("<a href='"+i[0]+"'>"+i[1]+"</a><br />\n")
|
|
else: myindex.write(i)
|
|
|
|
|
|
|
|
# Full course content in single file
|
|
print("Writing main course files...")
|
|
mycourse = codecs.open(course_folder+'/fullcourse.raw.html','w','utf-8')
|
|
|
|
for I in items:
|
|
if I:
|
|
mycourse.write( I )
|
|
|
|
|
|
|
|
temp = open('cache/coursedump.txt','w')
|
|
temp.write( "items: " + json.dumps(items,indent=2) )
|
|
temp.write("\n\n\n")
|
|
temp.write( "index: " + json.dumps(index,indent=2) )
|
|
temp.write("\n\n\n")
|
|
temp.write( "items_inorder: " + json.dumps(items_inorder,indent=2) )
|
|
temp.write("\n\n\n")
|
|
temp.write( "item_id_to_index: " + json.dumps(item_id_to_index,indent=2) )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if video_link_list:
|
|
mycourse.write('\n<h1>Videos Linked in Pages</h1>\n<table>')
|
|
for V in video_link_list:
|
|
(video_url, txt, pg) = V
mycourse.write("<tr><td><a target='_blank' href='" + video_url + "'>" + txt + "</a></td><td> on <a target='_blank' href='" + pg + "'>" + pg + "</a></td></tr>\n")
|
|
mycourse.write("</table>\n")
|
|
|
|
mycourse.close()
|
|
output = pypandoc.convert_file(course_folder+'/fullcourse.raw.html', 'html', outputfile=course_folder+"/fullcourse.html")
|
|
output1 = pypandoc.convert_file(course_folder+'/fullcourse.html', 'md', outputfile=course_folder+"/fullcourse.md")
|
|
output2 = pypandoc.convert_file(course_folder+'/fullcourse.html', 'docx', outputfile=course_folder+"/fullcourse.docx")
|
|
|
|
|
|
def pan_testing():
|
|
course_folder = '../course_temps/course_6862'
|
|
output3 = pypandoc.convert_file(course_folder+'/fullcourse.md', 'html', outputfile=course_folder+"/fullcourse.v2.html")
|
|
|
|
# Given course, title, and content, create a new page in the course
|
|
def create_page(course_num,new_title,new_content):
|
|
t3 = url + '/api/v1/courses/' + str(course_num) + '/pages'
|
|
#xyz = raw_input('Enter 1 to continue and send back to: ' + t3 + ': ')
|
|
#print("Creating page: %s\nwith content:%s\n\n\n" % (new_title,new_content))
|
|
print("Creating page: %s" % new_title)
|
|
xyz = input('type 1 to confirm: ') #'1'
|
|
if xyz=='1':
|
|
data = {'wiki_page[title]':new_title, 'wiki_page[body]':new_content}
|
|
r3 = requests.post(t3, headers=header, params=data)
|
|
print(r3)
|
|
print('ok')
|
|
|
|
|
|
def md_to_course():
|
|
#input = 'C:/Users/peter/Nextcloud/Documents/gavilan/student_orientation.txt'
|
|
#output = 'C:/Users/peter/Nextcloud/Documents/gavilan/stu_orientation/student_orientation.html'
|
|
id = "11214"
|
|
infile = 'cache/pages/course_%s.md' % id
|
|
output = 'cache/pages/course_%s_fixed.html' % id
|
|
output3 = pypandoc.convert_file(infile, 'html', format='md', outputfile=output)
|
|
|
|
xx = codecs.open(output,'r','utf-8').read()
|
|
soup = bs( xx, features="lxml" )
|
|
soup.encode("utf-8")
|
|
|
|
current_page = ""
|
|
current_title = ""
|
|
|
|
for child in soup.body.children:
|
|
if child.name == "h1" and not current_title:
|
|
current_title = child.get_text()
|
|
elif child.name == "h1":
|
|
upload_page(id,current_title,current_page)
|
|
current_title = child.get_text()
|
|
current_page = ""
|
|
print( "Next page: %s" % current_title )
|
|
else:
|
|
#print(dir(child))
|
|
if 'prettify' in dir(child):
|
|
current_page += child.prettify(formatter="html")
|
|
else:
|
|
current_page += child.string
|
|
|
|
upload_page(id,current_title,current_page)
|
|
print("Done")
|
|
|
|
|
|
# DL pages only
|
|
def grab_course_pages(course_num=-1):
|
|
global results, results_dict, url, header
|
|
# course_num = raw_input("What is the course id? ")
|
|
if course_num == -1:
|
|
course_num = input("Id of course? ")
|
|
else:
|
|
course_num = str(course_num)
|
|
modpagelist = []
|
|
modurllist = []
|
|
# We want things in the order of the modules
|
|
t4 = url + '/api/v1/courses/'+str(course_num)+'/modules?include[]=items'
|
|
results = fetch(t4)
|
|
i = 1
|
|
pageout = codecs.open('cache/pages/course_'+str(course_num)+'.html','w','utf-8')
|
|
pageoutm = codecs.open('cache/pages/course_'+str(course_num)+'.md','w','utf-8')
|
|
divider = "\n### "
|
|
for M in results:
|
|
print("Module Name: " + M['name'])
|
|
for I in M['items']:
|
|
if I['type']=='Page':
|
|
modpagelist.append(I['title'])
|
|
modurllist.append(I['page_url'])
|
|
pageout.write(divider+I['title']+'### '+I['page_url']+'\n')
|
|
easier_filename = clean_title(I['page_url'])
|
|
print(" " + str(i) + ". " + I['title'])
|
|
t2 = url + '/api/v1/courses/' + str(course_num) + '/pages/'+I['page_url']
|
|
print('Getting: ' + t2)
|
|
mypage = fetch(t2)
|
|
fixed = checker.safe_html(mypage['body'])
|
|
if fixed:
|
|
#markdown = h2m.convert(fixed)
|
|
#p_data = pandoc.read(mypage['body'])
|
|
markdown = pypandoc.convert_text("\n<h1>" + I['title'] + "</h1>\n" + mypage['body'], 'md', format='html')
|
|
pageout.write(fixed+'\n')
|
|
pageoutm.write(markdown+'\n')
|
|
pageout.flush()
|
|
i += 1
|
|
pageout.close()
|
|
pageoutm.close()
|
|
|
|
# Download, clean html, and reupload page
|
|
def update_page():
|
|
global results, results_dict, url, header
|
|
# course_num = raw_input("What is the course id? ")
|
|
course_num = '6862'
|
|
t = url + '/api/v1/courses/' + str(course_num) + '/pages'
|
|
while t: t = fetch(t)
|
|
pages = results
|
|
results = []
|
|
mypagelist = []
|
|
myurllist = []
|
|
modpagelist = []
|
|
modurllist = []
|
|
for p in pages:
|
|
p['title'] = clean_title(p['title'])
|
|
mypagelist.append(p['title'])
|
|
myurllist.append(p['url'])
|
|
easier_filename = clean_title(p['url'])
|
|
#for a in 'title,updated_at,published'.split(','):
|
|
# print unicode(p[a]), "\t",
|
|
#print ""
|
|
|
|
# We want things in the order of the modules
|
|
t4 = url + '/api/v1/courses/'+str(course_num)+'/modules?include[]=items'
|
|
while t4: t4 = fetch(t4)
|
|
mods = results
|
|
results = []
|
|
i = 1
|
|
print("\nWhat page do you want to repair?")
|
|
for M in mods:
|
|
print("Module Name: " + M['name'])
|
|
for I in M['items']:
|
|
if I['type']=='Page':
|
|
modpagelist.append(I['title'])
|
|
modurllist.append(I['page_url'])
|
|
print(" " + str(i) + ". " + I['title'])
|
|
i += 1
|
|
|
|
choice = input("\n> ")
|
|
choice = int(choice) - 1
|
|
chosen_url = modurllist[choice]
|
|
print('Fetching: ' + modpagelist[choice])
|
|
t2 = url + '/api/v1/courses/' + str(course_num) + '/pages/'+chosen_url
|
|
print('From: ' + t2)
|
|
|
|
results_dict = {}
|
|
while(t2): t2 = fetch(t2)
|
|
mypage = results_dict
|
|
fixed_page = checker.safe_html(mypage['body'])
|
|
upload_page(course_num,chosen_url,fixed_page)
|
|
|
|
# given dict of file info (from files api), construct an img tag that works in a page
|
|
#def file_to_img_tag(f, alt, course, soup):
|
|
# #tag = f"<img id=\"\" src=\"https://ilearn.gavilan.edu/courses/{course}/files/{f['id']}/preview\" alt=\"{f['filename']}\" "
|
|
# #tag += f"data-api-endpoint=\"https://ilearn.gavilan.edu/api/v1/courses/{course}/files/{f['id']}\" data-api-returntype=\"File\" />"
|
|
# return T
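# A minimal working version of the helper sketched in the commented-out stub above,
# assuming `f` is a file dict from the Canvas files API with 'id' and 'filename' keys
# (the same shape used by make_pages_from_folder below). The hostname mirrors the one
# hard-coded in html_file_to_page; this sketch is illustrative and not called anywhere.
def file_to_img_tag_sketch(f, alt, course):
    base = "https://ilearn.gavilan.edu"
    fid = f['id']
    fname = f['filename']
    tag = (f'<img src="{base}/courses/{course}/files/{fid}/preview" '
           f'alt="{alt or fname}" '
           f'data-api-endpoint="{base}/api/v1/courses/{course}/files/{fid}" '
           f'data-api-returntype="File" />')
    return tag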
|
|
|
|
|
|
def html_file_to_page(filename, course, tags):
|
|
|
|
try:
|
|
soup = bs4.BeautifulSoup(codecs.open(filename,'r', 'utf-8').read(), 'html.parser')
|
|
except Exception as e:
|
|
print(f"Exception on {filename}: {e}")
|
|
return
|
|
img_tags = soup.find_all('img')
|
|
|
|
result = {'title': soup.title.text if soup.title else ''}
|
|
result['title'] = result['title'].strip()
|
|
|
|
for img in img_tags:
|
|
src = img['src']
|
|
try:
|
|
alt = img['alt']
|
|
except KeyError:
    alt = src
|
|
orig_filename = os.path.basename(src)
|
|
if orig_filename in tags:
|
|
T = soup.new_tag(name='img', src=f"https://ilearn.gavilan.edu/courses/{course}/files/{tags[orig_filename]['id']}/preview")
|
|
T['id'] = tags[orig_filename]['id']
|
|
T['alt'] = alt
|
|
T['data-api-endpoint'] = f"https://ilearn.gavilan.edu/api/v1/courses/{course}/files/{tags[orig_filename]['id']}"
|
|
T['data-api-returntype'] = "File"
|
|
img.replace_with(T)
|
|
print( f" replaced image: {src} alt: {alt}")
|
|
else:
|
|
print( f" couldn't find replacement image: {src} alt: {alt}")
|
|
outfile = codecs.open(filename+"_mod.html", 'w', 'utf-8')
|
|
outfile.write( soup.prettify() )
|
|
outfile.close()
|
|
result['body'] = ''.join(map(str, soup.body.contents)) if soup.body else ''
|
|
return result
|
|
|
|
def create_new_page(course_id, title, body):
|
|
print(f"Creating page: {title}, length: {len(body)}")
|
|
request = f"{url}/api/v1/courses/{course_id}/pages"
|
|
print(request)
|
|
data = { 'wiki_page[title]': title, 'wiki_page[body]': body }
|
|
r3 = requests.post(request, headers=header, data=data)
|
|
try:
|
|
result = json.loads(r3.text)
|
|
print( f" + ok: {result['url']}")
|
|
except Exception as e:
    print(f" - problem creating page? {e}")
|
|
|
|
# Given a folder full of html pages and their linked images, create Canvas PAGES of them
|
|
def make_pages_from_folder(folder='cache/csis6/', course = '20558'):
|
|
if 0:
|
|
request = f"{url}/api/v1/courses/{course}/files"
|
|
print("Fetching course files")
|
|
files = fetch(request)
|
|
|
|
tempfile = codecs.open('cache/csis6filelist.json','w','utf-8')
|
|
tempfile.write(json.dumps(files))
|
|
tempfile.close()
|
|
|
|
if 1:
|
|
files = json.loads( codecs.open('cache/csis6filelist.json', 'r', 'utf-8').read())
|
|
|
|
|
|
|
|
course_files = {f['filename']: f for f in files}
|
|
tags = {}
|
|
for f in files:
|
|
if f['filename'].lower().endswith('.jpg') or f['filename'].lower().endswith('.png'):
|
|
tags[f['filename']] = f
|
|
|
|
|
|
contents = os.listdir(folder)
# manual override: only process these specific files for now
contents = ['welcome.html', 'welcome2.html', 'welcome3.html']
|
|
print(contents)
|
|
for f in contents:
|
|
m = re.search(r'^(.*)\.(html?)$', f)
|
|
if m:
|
|
print(f"html file: {m.group(1)}, extension: {m.group(2)}")
|
|
newpage = html_file_to_page(folder + f, course, tags)
if newpage:
    create_new_page(course, newpage['title'], newpage['body'])
|
|
else:
|
|
m = re.search(r'^(.*)\.(.*)$', f)
|
|
if m:
|
|
print(f"other file: {m.group(1)}, extension: {m.group(2)}")
|
|
else:
|
|
print(f"unknown file: {f}")
|
|
|
|
|
|
|
|
|
|
# Given course, page url, and new content, upload the new revision of a page
|
|
def upload_page(course_num,pageurl,new_content):
|
|
print("Repaired page:\n\n")
|
|
#print new_content
|
|
print(pageurl)
|
|
t3 = url + '/api/v1/courses/' + str(course_num) + '/pages/' + pageurl
|
|
xyz = input('Enter 1 to continue and send back to: ' + t3 + ': ')
|
|
#xyz = '1'
|
|
if xyz=='1':
|
|
data = {'wiki_page[body]':new_content}
|
|
r3 = requests.put(t3, headers=header, params=data)
|
|
print(r3)
|
|
print('ok')
|
|
|
|
# Use template to build html page with homegrown subtitles
|
|
def build_srt_embed_php(data):
|
|
template = codecs.open('template_srt_and_video.txt','r','utf-8').readlines()
|
|
result = ''
|
|
for L in template:
|
|
L = re.sub('FRAMEID',data['frameid'],L)
|
|
L = re.sub('TITLE',data['title'],L)
|
|
L = re.sub('EMBEDLINK',data['embedlink'],L)
|
|
L = re.sub('SRTFOLDERFILE',data['srtfolderfile'],L)
|
|
result += L
|
|
return result
|
|
|
|
|
|
|
|
|
|
def yt_title(code):
|
|
global saved_titles
|
|
if code in saved_titles:
|
|
return saved_titles[code]
|
|
a = requests.get('https://www.youtube.com/watch?v=%s' % code)
|
|
bbb = bs(a.content,"lxml")
|
|
ccc = bbb.find('title').text
|
|
ccc = re.sub(r'\s\-\sYouTube','',ccc)
|
|
saved_titles[code] = ccc
|
|
codecs.open('cache/saved_youtube_titles.json', 'w', 'utf-8').write(json.dumps(saved_titles))
|
|
return ccc
|
|
|
|
def swap_youtube_subtitles():
|
|
# example here: http://siloor.github.io/youtube.external.subtitle/examples/srt/
|
|
|
|
# srt folder, look at all filenames
|
|
srtlist = os.listdir('video_srt')
|
|
i = 0
|
|
for V in srtlist:
|
|
print(str(i) + '. ' + V)
|
|
i += 1
|
|
choice = input("Which SRT folder? ")
|
|
choice = srtlist[int(choice)]
|
|
srt_folder = 'video_srt/'+choice
|
|
class_srt_folder = choice
|
|
srt_files = os.listdir(srt_folder)
|
|
srt_shorts = {}
|
|
print("\nThese are the subtitle files: " + str(srt_files))
|
|
for V in srt_files:
|
|
if V.endswith('srt'):
|
|
V1 = re.sub(r'(\.\w+$)','',V)
|
|
srt_shorts[V] = minimal_string(V1)
|
|
|
|
crs_id = input("What is the id of the course? ")
|
|
grab_course_pages(crs_id)
|
|
v1_pages = codecs.open('cache/pages/course_' + str(crs_id) + '.html', 'r', 'utf-8')
|
|
v1_content = v1_pages.read()
|
|
|
|
# a temporary page of all youtube links
|
|
tp = codecs.open('page_revisions/links_' + str(crs_id) + '.html', 'w','utf-8')
|
|
|
|
# course pages, get them all and look for youtube embeds
|
|
title_shorts = {}
|
|
title_embedlink = {}
|
|
title_list = []
|
|
print("I'm looking for iframes and youtube links.")
|
|
for L in v1_content.split('\n'):
|
|
if re.search(r'<a.*?href="https://youtu', L):
|
|
print("Possibly there's a linked video instead of embedded:" + L)
|
|
if re.search('iframe',L):
|
|
ma = re.compile(r'(\w+)=(".*?")')
|
|
#print "\n"
|
|
this_title = ''
this_src = ''
|
|
for g in ma.findall(L):
|
|
print(g)
|
|
if g[0]=='title':
|
|
this_title = g[1].replace('"','')
|
|
if g[0]=='src':
|
|
this_src = g[1].replace('"','')
|
|
#print g
|
|
if not this_title:
|
|
tmp = re.search(r'embed\/(.*?)\?',this_src)
|
|
if not tmp: tmp = re.search(r'embed\/(.*?)$',this_src)
|
|
if tmp:
|
|
this_title = yt_title(tmp.groups()[0])
|
|
title_shorts[this_title] = minimal_string(this_title)
|
|
title_list.append(this_title)
|
|
title_embedlink[this_title] = this_src
|
|
print("%s\n" % this_title.encode('ascii','ignore'))
|
|
tp.write( "%s<br><a target='_blank' href='%s'>%s</a><br /><br />" % (this_title, this_src, this_src) )
|
|
# match them
|
|
# lowercase, non alpha or num chars become a single space, try to match
|
|
# if any srts remain unmatched, ask.
|
|
tp.close()
|
|
webbrowser.open_new_tab('file://C:/SCRIPTS/everything-json/page_revisions/links_'+str(crs_id)+'.html')
|
|
|
|
matches = {} # key is Title, value is srt file
|
|
for S,v in list(srt_shorts.items()):
|
|
found_match = 0
|
|
print(v, end=' ')
|
|
for T, Tv in list(title_shorts.items()):
|
|
if v == Tv:
|
|
print(' \tMatches: ' + T, end=' ')
|
|
found_match = 1
|
|
matches[T] = S
|
|
break
|
|
#print "\n"
|
|
|
|
print("\nThese are the srt files: ")
|
|
print(json.dumps(srt_shorts,indent=2))
|
|
print("\nThese are the titles: ")
|
|
print(json.dumps(title_shorts,indent=2))
|
|
print("\nThese are the matches: ")
|
|
print(json.dumps(matches,indent=2))
|
|
|
|
print(("There are %d SRT files and %d VIDEOS found. " % ( len(list(srt_shorts.keys())), len(list(title_shorts.keys())) ) ))
|
|
|
|
for S,v in list(srt_shorts.items()):
|
|
if not S in list(matches.values()):
|
|
print("\nDidn't find a match for: " + S)
|
|
i = 0
|
|
for T in title_list:
|
|
if not T in list(matches.keys()): print(str(i+1) + ". " + T.encode('ascii', 'ignore').decode('ascii'))
|
|
i += 1
|
|
print("Here's the first few lines of the SRT:")
|
|
print(( re.sub(r'\s+',' ', '\n'.join(open(srt_folder+"/"+S,'r').readlines()[0:10]))+"\n\n"))
|
|
choice = input("Which one should I match it to? (zero for no match) ")
|
|
if int(choice)>0:
|
|
matches[ title_list[ int(choice)-1 ] ] = S
|
|
print("SRT clean name was: %s, and TITLE clean name was: %s" % (v,title_shorts[title_list[ int(choice)-1 ]] ))
|
|
print("ok, here are the matches:")
|
|
print(json.dumps(matches,indent=2))
|
|
|
|
# construct subsidiary pages, upload them
|
|
i = 0
|
|
for m,v in list(matches.items()):
|
|
# open template
|
|
# do replacement
|
|
i += 1
|
|
data = {'frameid':'videoframe'+str(i), 'title':m, 'embedlink':title_embedlink[m], 'srtfolderfile':v }
|
|
print(json.dumps(data,indent=2))
|
|
file_part = v.split('.')[0]
|
|
new_php = codecs.open(srt_folder + '/' + file_part + '.php','w','utf-8')
|
|
new_php.write(build_srt_embed_php(data))
|
|
new_php.close()
|
|
#srt_files = os.listdir(srt_folder)
|
|
put_file(class_srt_folder)
|
|
|
|
|
|
def test_swap():
|
|
crs_id = '6923'
|
|
# swap in embed code and re-upload canvas pages
|
|
v2_pages = codecs.open('page_revisions/course_'+str(crs_id)+'.html','r','utf-8')
|
|
v2_content = v2_pages.read()
|
|
ma = re.compile(r'(\w+)=(".*?")')
|
|
|
|
for L in v2_content.split('\n'):
|
|
find = re.findall('<iframe(.*?)>',L)
|
|
if find:
|
|
print("Found: ", find)
|
|
for each in find:
|
|
#print "\n"
|
|
this_title = ''
|
|
this_src = ''
|
|
for g in ma.findall(each):
|
|
#print g
|
|
if g[0]=='title':
|
|
this_title = g[1].replace('"','')
|
|
if g[0]=='src':
|
|
this_src = g[1].replace('"','')
|
|
#print g
|
|
if not this_title:
|
|
tmp = re.search(r'embed\/(.*?)\?',this_src)
|
|
if not tmp: tmp = re.search(r'embed\/(.*?)$',this_src)
|
|
if tmp:
|
|
this_title = yt_title(tmp.groups()[0])
|
|
print("Found embed link: %s\n and title: %s\n" % (this_src,this_title.encode('ascii','ignore')))
|
|
|
|
|
|
def multiple_downloads():
|
|
|
|
x = input("What IDs? Separate with one space: ")
|
|
for id in x.split(" "):
|
|
accessible_check(id)
|
|
|
|
|
|
###
|
|
###
|
|
### Text / Knowledge Base
|
|
###
|
|
### How about downloading all possible info / webpages / sources
|
|
### related to Gavilan and creating a master search index?
|
|
###
|
|
### Goals:
|
|
### - Scripted approach to allow re-indexing / updating
|
|
### - Break everything down into paragraphs
|
|
###
|
|
### - Script to extract keywords, topics, entities, summaries, questions answered
|
|
### from each paragraph or chunk.
|
|
### - Use spacy, gensim, nltk, or gpt-3, or a combination of all of them
|
|
###
|
|
### - Create vector / embeddings for each paragraph
|
|
###
|
|
### - Enable a vector search engine and connect to front page of gavilan.cc
|
|
### - Use that to feed handful of source paragraphs (& prompt) into gpt and
|
|
### receive text answers to questions.
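
# A minimal sketch of the paragraph-level search described above, assuming the same
# all-MiniLM-L6-v2 model used by test_embed()/create_embeddings() further down.
# The chunking rule (split on blank lines, keep chunks of 5+ words) is an illustrative
# assumption, not a settled design, and this helper is not wired into the menu below.
def demo_paragraph_search(text, query, top_n=3):
    model = SentenceTransformer('all-MiniLM-L6-v2')
    # break the document into paragraph-sized chunks
    chunks = [c.strip() for c in re.split(r'\n\s*\n', text) if len(c.split()) >= 5]
    if not chunks:
        return []
    # embed the chunks and the query, then rank chunks by cosine similarity
    chunk_vecs = model.encode(chunks, convert_to_tensor=True)
    query_vec = model.encode(query, convert_to_tensor=True)
    scores = util.cos_sim(query_vec, chunk_vecs)[0]
    ranked = sorted(zip(chunks, scores.tolist()), key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]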
|
|
|
|
def demo_vector_search():
|
|
from gensim.models import Word2Vec
|
|
from gensim.utils import simple_preprocess
|
|
import nltk.data
|
|
import spacy
|
|
|
|
# (might have to upgrade pip first...)
|
|
# pip install --upgrade click
|
|
#
|
|
# python -m spacy download en_core_web_sm
|
|
# python -m spacy download en_core_web_lg
|
|
|
|
def is_complete_sentence(text):
|
|
#text = text.text
|
|
doc = nlp(text)
|
|
sentences = list(doc.sents)
|
|
if len(sentences) == 1 and text.strip() == sentences[0].text.strip():
|
|
return True
|
|
return False
|
|
|
|
|
|
sentences = [
|
|
"This is an example sentence.",
|
|
"Here is another sentence for training."
|
|
]
|
|
|
|
paragraph = """Financial Aid services are available in person! We are happy to assist you with your financial aid needs. If you are interested in visiting the office in person, please review the guidelines for visiting campus and schedule your appointment:
|
|
|
|
Guidelines for In-Person Financial Aid Services
|
|
|
|
Due to FERPA regulations, no student information will be given to anyone other than the student without authorization from the student.
|
|
We continue to offer virtual services. Financial Aid staff may be reached by email, phone, text, and zoom! Please refer to the contact information and schedules below.
|
|
|
|
Gavilan-WelcomeCenter_Peer_Mentors.jpg
|
|
|
|
Do you need assistance filing the FAFSA or California Dream Act Application? Friendly and knowledgeable Peer Mentors are available to assist you virtually and in person! Details below for an online Zoom visit, phone call, or in-person visit with Peer Mentors.
|
|
|
|
Monday - Friday 8am - 5pm, Student Center
|
|
Join Zoom to Connect with a Peer Mentor
|
|
Or call (669) 900-6833 and use meeting ID 408 848 4800
|
|
|
|
MicrosoftTeams-image.png
|
|
|
|
|
|
|
|
Do you need assistance with an existing financial aid application, financial aid document submission, or review of your financial aid package? Schedule an in-person, phone, or zoom appointment with our Financial Aid counter.
|
|
|
|
Mon - Thurs: 9am - 1:00pm, 2:00pm - 5:00pm
|
|
Fri: 10am - 2pm
|
|
Office: (408) 848-4727 Email: finaid@gavilan.edu
|
|
Schedule an In-Person, Phone or Zoom Appointment"""
|
|
|
|
tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
|
|
sentences1 = tokenizer.tokenize(paragraph)
|
|
for i,s in enumerate(sentences1):
|
|
print(i, "\t", s)
|
|
print("\n\n")
|
|
|
|
#nlp = spacy.load('en_core_web_sm')
|
|
nlp = spacy.load('en_core_web_md')
|
|
|
|
doc = nlp(paragraph)
|
|
sentences2 = list(doc.sents)
|
|
for i,s in enumerate(sentences2):
|
|
t = re.sub(r'\n+',' ',s.text)
|
|
is_sentence = 'yes' if is_complete_sentence(t) else 'no '
|
|
print(i, " ", is_sentence, " ", t)
|
|
print("\n\n")
|
|
|
|
#for text in sentences2:
|
|
# print(text, "is a complete sentence?" , is_complete_sentence(text))
|
|
|
|
return  # NOTE: the Word2Vec example below is currently unreachable
|
|
|
|
tokenized_sentences = [simple_preprocess(s) for s in sentences]
|
|
model = Word2Vec(tokenized_sentences, min_count=1, vector_size=100)
|
|
|
|
example_word = "example"
|
|
vector = model.wv[example_word]
|
|
print(f"Vector for the word '{example_word}': {vector}")
|
|
|
|
|
|
|
|
def makedir():
|
|
files = os.listdir('cache/crawl')
|
|
#print(files)
|
|
files.sort()
|
|
for f in files:
|
|
m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$',f)
|
|
if m:
|
|
name = m.groups()[0]
|
|
parts = name.split('+')
|
|
print(parts)
|
|
|
|
def manual_index():
|
|
files = os.listdir('cache/crawl')
|
|
#print(files)
|
|
ii = codecs.open('cache/crawl/index.html','w','utf-8')
|
|
ii.write('<html><body><h1>Site index</h1>\n')
|
|
files.sort()
|
|
for f in files:
|
|
m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$',f)
|
|
if m:
|
|
name = m.groups()[0]
|
|
parts = name.split('+')
|
|
ii.write('<br /><a href="mirror/'+f+'">'+f+'</a>\n')
|
|
|
|
def my_site():
|
|
files = os.listdir('cache/crawl')
|
|
output = []
|
|
files.sort()
|
|
for f in files:
|
|
m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$',f)
|
|
if m:
|
|
name = m.groups()[0]
|
|
parts = name.split('+')
|
|
output.append(parts)
|
|
return output
|
|
|
|
|
|
## TODO site scraper
|
|
## TODO find package that extracts text from web page
|
|
### TODO master list of what to index.
|
|
|
|
## TODO PDFs and DOCXs
|
|
## TODO fix urls w/ anchors
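
# A possible starting point for the "fix urls w/ anchors" TODO above: drop the
# #fragment (and optionally the query string) so the same page is not crawled and
# saved once per anchor. This is a sketch only and is not yet called by crawl().
from urllib.parse import urlsplit, urlunsplit

def strip_anchor(link, drop_query=False):
    parts = urlsplit(link)
    query = '' if drop_query else parts.query
    # rebuild the URL without its fragment
    return urlunsplit((parts.scheme, parts.netloc, parts.path, query, ''))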
|
|
|
|
def crawl():
|
|
import scrapy, logging
|
|
from scrapy.crawler import CrawlerProcess
|
|
|
|
logger = logging.getLogger()
|
|
logger.setLevel(level=logging.CRITICAL)
|
|
logging.basicConfig(level=logging.CRITICAL)
|
|
logger.disabled = True
|
|
|
|
|
|
avoid = [r'ezproxy', r'community\.gavilan\.edu', r'archive/tag', r'archive/category', r'my\.gavilan\.edu', r'augusoft',
         r'eis-prod', r'ilearn\.gavilan', r'mailto', r'cgi-bin', r'edu/old/schedule',
         r'admit/search\.php', r'GavilanTrusteeAreaMaps2022\.pdf', r'schedule/2019', r'schedule/2020', r'schedule/2021',
         r'schedule/2022', r'schedule/previous', ]
|
|
|
|
class MySpider(scrapy.Spider):
|
|
name = 'myspider'
|
|
#start_urls = ['https://gavilan.curriqunet.com/catalog/iq/1826']
|
|
start_urls = ['https://www.gavilan.edu']
|
|
|
|
|
|
"""
|
|
logging.getLogger("scrapy").setLevel(logging.CRITICAL)
|
|
logging.getLogger("scrapy.utils.log").setLevel(logging.CRITICAL)
|
|
logging.getLogger("scrapy.extensions.telnet").setLevel(logging.CRITICAL)
|
|
logging.getLogger("scrapy.middleware").setLevel(logging.CRITICAL)
|
|
logging.getLogger("scrapy.core.engine").setLevel(logging.CRITICAL)
|
|
logging.getLogger("scrapy.middleware").setLevel(logging.CRITICAL)
|
|
|
|
logger.disabled = True"""
|
|
|
|
def parse(self, response):
|
|
print('visited:', repr(response.url), 'status:', response.status)
|
|
done = 0
|
|
|
|
if re.search(r'\.pdf$', response.url):
|
|
m = re.search(r'\/([^\/]+\.pdf)$', response.url)
|
|
if m:
|
|
print("saving to ", save_folder + '/' + clean_fn(response.url))
|
|
pdf_response = requests.get(response.url)
|
|
with open(save_folder + '/' + clean_fn(response.url), 'wb') as f:
|
|
f.write(pdf_response.content)
|
|
text = extract_text(save_folder + '/' + clean_fn(response.url))
|
|
codecs.open(save_folder + '/' + clean_fn(response.url) + '.txt','w','utf-8').write(text)
|
|
done = 1
|
|
|
|
for ext in ['doc','docx','ppt','pptx','rtf','xls','xlsx']:
|
|
if re.search(r'\.'+ext+'$', response.url):
|
|
m = re.search(r'\/([^\/]+\.'+ext+')$', response.url)
|
|
if m:
|
|
print("saving to ", save_folder + '/' + clean_fn(response.url))
|
|
pdf_response = requests.get(response.url)
|
|
with open(save_folder + '/' + clean_fn(response.url), 'wb') as f:
|
|
f.write(pdf_response.content)
|
|
#text = extract_text(save_folder + '/' + clean_fn(response.url) + '.txt')
|
|
pandoc_infile = save_folder + '/' + clean_fn(response.url)
|
|
pandoc_outfile = save_folder + '/' + clean_fn(response.url) + '.html'
|
|
print("pandoc in file: %s" % pandoc_infile)
|
|
print("pandoc outfile: %s" % pandoc_outfile)
|
|
pypandoc.convert_file(pandoc_infile, 'html', outputfile=pandoc_outfile, extra_args=['--from=%s' % ext, '--extract-media=%s' % save_folder + '/img' ])
|
|
pandoc_output = codecs.open(pandoc_outfile,'r','utf-8').read()
|
|
txt_output = trafilatura.extract(pandoc_output,include_links=True, deduplicate=True, include_images=True, include_formatting=True)
|
|
if txt_output:
|
|
codecs.open(save_folder + '/' + clean_fn(response.url) + '.txt','w','utf-8').write(txt_output)
|
|
done = 1
|
|
|
|
for ext in ['jpg','jpeg','gif','webp','png','svg','bmp','tiff','tif','ico']:
|
|
if re.search(r'\.'+ext+'$', response.url):
|
|
m = re.search(r'\/([^\/]+\.'+ext+')$', response.url)
|
|
if m:
|
|
print("saving to ", save_folder + '/img/' + clean_fn(response.url))
|
|
pdf_response = requests.get(response.url)
|
|
with open(save_folder + '/img/' + clean_fn(response.url), 'wb') as f:
|
|
f.write(pdf_response.content)
|
|
done = 1
|
|
|
|
if not done:
|
|
f_out = codecs.open(save_folder + '/' + clean_fn(response.url) + '.txt','w','utf-8')
|
|
|
|
this_output = trafilatura.extract(response.text,include_links=True, deduplicate=True, include_images=True, include_formatting=True)
|
|
if this_output:
|
|
f_out.write(this_output)
|
|
f_out.close()
|
|
links = response.css('a::attr(href)').getall()
|
|
|
|
# Follow each link and parse its contents
|
|
for link in links:
|
|
go = 1
|
|
full_link = response.urljoin(link)
|
|
print('++++++ trying ', full_link)
|
|
|
|
if not re.search(r'gavilan\.edu',full_link):
|
|
go = 0
|
|
print('--- not gav edu')
|
|
else:
|
|
if re.search(r'hhh\.gavilan\.edu',full_link):
|
|
pass
|
|
elif not re.search(r'^https?:\/\/www\.gavilan\.edu',full_link):
|
|
# need to add www to gavilan.edu
|
|
m = re.search(r'^(https?:\/\/)gavilan\.edu(\/.*)$',full_link)
|
|
if m:
|
|
full_link = m.group(1) + 'www.gavilan.edu' + m.group(2)
|
|
for a in avoid:
|
|
if re.search(a,full_link):
|
|
go = 0
|
|
print('--- avoid ', a)
|
|
|
|
if go: yield scrapy.Request(full_link, callback=self.parse,
|
|
headers={"User-Agent": "Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"})
|
|
else:
|
|
print("------ avoiding ", full_link)
|
|
# Instantiate a CrawlerProcess object
|
|
process = CrawlerProcess()
|
|
|
|
# Add the MySpider spider to the process
|
|
process.crawl(MySpider)
|
|
|
|
# Start the process
|
|
logging.basicConfig(level=logging.CRITICAL)
|
|
logging.getLogger('scrapy').propagate = False
|
|
logging.getLogger("trafilatura").setLevel(logging.CRITICAL)
|
|
logging.getLogger("trafilatura").propagate = False
|
|
logging.getLogger("pdfminer").setLevel(logging.CRITICAL)
|
|
logging.getLogger("pdfminer").propagate = False
|
|
logging.getLogger("urllib3").setLevel(logging.CRITICAL)
|
|
logging.getLogger("urllib3").propagate = False
|
|
logging.basicConfig(level=logging.CRITICAL)
|
|
process.start()
|
|
|
|
|
|
|
|
save_folder = 'cache/crawl'
|
|
clean_folder = 'cache/cleancrawl'
|
|
|
|
def clean_fn(s):
|
|
s = re.sub(r'[\s:]+','',s)
|
|
s = re.sub(r'\/','+',s)
|
|
return s
|
|
|
|
def format_html(html):
|
|
soup = bs4.BeautifulSoup(html, 'html.parser')
|
|
return soup.prettify()
|
|
|
|
|
|
|
|
|
|
def txt_clean_index():
|
|
files = os.listdir(save_folder)
|
|
line_freq = defaultdict(int)
|
|
|
|
# first pass
|
|
for f in files:
|
|
lines = codecs.open(save_folder + '/' + f,'r','utf-8').readlines()
|
|
for L in lines:
|
|
L = L.strip()
|
|
line_freq[L] += 1
|
|
|
|
# second pass
|
|
for f in files:
|
|
print("\n\n",f)
|
|
lines = codecs.open(save_folder + '/' + f,'r','utf-8').readlines()
|
|
out = codecs.open(clean_folder + '/' + f,'w','utf-8')
|
|
for L in lines:
|
|
L = L.strip()
|
|
if L in line_freq and line_freq[L] > 3:
|
|
continue
|
|
print(L)
|
|
out.write(L + '\n')
|
|
out.close()
|
|
|
|
|
|
|
|
|
|
from whoosh import fields, columns
|
|
from whoosh.index import create_in, open_dir
|
|
from whoosh.fields import Schema, TEXT, ID, STORED, NUMERIC
|
|
from whoosh.qparser import QueryParser
|
|
from whoosh.analysis import StemmingAnalyzer
|
|
|
|
def priority_from_url(url):
|
|
priority = 1
|
|
# url is like this: https++www.gavilan.edu+news+Newsletters.php.txt
|
|
m = re.search(r'gavilan\.edu\+(.*)\.\w\w\w\w?$',url)
|
|
if m:
|
|
address = m.group(1)
|
|
parts = address.split('+')
|
|
if parts[0] in ['accreditation','curriculum','senate','research','old','committee','board','styleguide']:
|
|
priority += 20
|
|
if parts[0] in ['news','IT','HOM','administration']:
|
|
priority += 10
|
|
if parts[0] == 'admit' and parts[1] == 'schedule':
|
|
priority += 10
|
|
if 'accreditation' in parts:
|
|
priority += 50
|
|
if re.search(r'hhh\.gavilan\.edu',url):
|
|
priority += 100
|
|
priority *= len(parts)
|
|
#print(priority, parts)
|
|
else:
|
|
priority *= 50
|
|
#print(priority, url)
|
|
return priority
|
|
|
|
|
|
def test_priority():
|
|
ff = os.listdir('cache/crawl')
|
|
for f in ff:
|
|
priority_from_url(f)
|
|
|
|
|
|
|
|
def displayfile(f,aslist=0):
|
|
lines = codecs.open('cache/crawl/' + f,'r','utf-8').readlines()
|
|
lines = [L.strip() for L in lines]
|
|
lines = [L for L in lines if L and not re.search(r'^\|$',L)]
|
|
if aslist:
|
|
return lines
|
|
return "\n".join(lines)
|
|
|
|
def any_match(line, words):
|
|
# true if any of the words are in line
|
|
for w in words:
|
|
if re.search(w, line, re.IGNORECASE):
|
|
return True
|
|
return False
|
|
|
|
|
|
def find_match_line(filename, query):
|
|
q_words = query.split(" ")
|
|
lines = codecs.open('cache/crawl/' + filename,'r','utf-8').readlines()
|
|
lines = [L.strip() for L in lines]
|
|
lines = [L for L in lines if L and not re.search(r'^\|$',L)]
|
|
lines = [L for L in lines if any_match(L, q_words)]
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
def search_index():
|
|
s = ''
|
|
schema = Schema(url=STORED, title=TEXT(stored=True), content=TEXT, priority=fields.COLUMN(columns.NumericColumn("i")))
|
|
ix = open_dir("cache/searchindex")
|
|
|
|
|
|
#with ix.reader() as reader:
|
|
#print(reader.doc_count()) # number of documents in the index
|
|
#print(reader.doc_frequency("content", "example")) # number of documents that contain the term "example" in the "content" field
|
|
#print(reader.field_length("content")) # total number of terms in the "content" field
|
|
#print(reader.term_info("content", "example")) # information about the term "example" in the "content" field
|
|
#print(reader.dump()) # overview of the entire index
|
|
|
|
|
|
while s != 'q':
|
|
s = input("search or 'q' to quit: ")
|
|
if s == 'q':
|
|
return
|
|
|
|
# Define the query parser for the index
|
|
with ix.searcher() as searcher:
|
|
query_parser = QueryParser("content", schema=schema)
|
|
|
|
# Parse the user's query
|
|
query = query_parser.parse(s)
|
|
print(query)
|
|
|
|
# Search the index for documents matching the query
|
|
results = searcher.search(query, sortedby="priority")
|
|
|
|
# Print the results
|
|
i = 1
|
|
for result in results:
|
|
print(i, result) # result["url"], result["content"])
|
|
print(find_match_line(result['url'], s))
|
|
print()
|
|
i += 1
|
|
|
|
|
|
|
|
def create_search_index():
|
|
# Define the schema for the index
|
|
|
|
stem_ana = StemmingAnalyzer()
|
|
schema = Schema(url=STORED, title=TEXT(stored=True), content=TEXT, priority=fields.COLUMN(columns.NumericColumn("i")))
|
|
|
|
# Create a new index in the directory "myindex"
|
|
ix = create_in("cache/searchindex", schema)
|
|
|
|
# Open an existing index
|
|
#ix = open_dir("cache/searchindex")
|
|
|
|
# Define the writer for the index
|
|
writer = ix.writer()
|
|
|
|
# Index some documents
|
|
files = os.listdir('cache/crawl')
|
|
files.sort()
|
|
for f in files:
|
|
m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$',f)
|
|
if m:
|
|
print(f)
|
|
writer.add_document(url=f, title=m.group(1), content=displayfile(f), priority=priority_from_url(f))
|
|
writer.commit()
|
|
|
|
|
|
|
|
from annoy import AnnoyIndex
|
|
import random
|
|
|
|
def test_embed():
|
|
model = SentenceTransformer('all-MiniLM-L6-v2')
|
|
sample = "What is this world coming to? What happens in the data and the research?"
|
|
embed = model.encode(sample)
|
|
|
|
print("\nSample sentence:", sample)
|
|
print("\nEmbedding:", embed)
|
|
print("\nEmbedding size:", len(embed))
|
|
|
|
|
|
def create_embeddings():
|
|
model = SentenceTransformer('all-MiniLM-L6-v2')
|
|
vecsize = 384 # sentence transformer embedding size
|
|
t = AnnoyIndex(vecsize, 'angular')
|
|
files = os.listdir('cache/crawl')
|
|
output = [] # ['index', 'file','sentence']
|
|
index = 0
|
|
save_embeds = []
|
|
files.sort()
|
|
for f in files:
|
|
print(f)
|
|
m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$',f)
|
|
if m:
|
|
lines = displayfile(f,1)
|
|
embeddings = model.encode(lines)
|
|
|
|
print("\n-----", index, f)
|
|
|
|
for sentence, embedding in zip(lines, embeddings):
|
|
if len(sentence.split(' ')) > 5:
|
|
print(index, "Sentence:", sentence)
|
|
print(embedding[:8])
|
|
t.add_item(index, embedding)
|
|
output.append( [index,f,sentence] )
|
|
index += 1
|
|
if index > 500:
|
|
break
|
|
t.build(30) # 30 trees
|
|
t.save('cache/sentences.ann')
|
|
pickle.dump( output, open( "cache/embedding_index.p", "wb" ) )
|
|
|
|
|
|
|
|
|
|
def search_embeddings():
|
|
f = 384 # sentence transformer embedding size
|
|
n = 10 # how many results
|
|
|
|
u = AnnoyIndex(f, 'angular')
|
|
u.load('cache/sentences.ann') # super fast, will just mmap the file
|
|
print(u.get_n_items(), "items in index")
|
|
model = SentenceTransformer('all-MiniLM-L6-v2')
|
|
search_index = pickle.load( open( "cache/embedding_index.p", "rb" ) )
|
|
print(search_index)
|
|
|
|
|
|
s = ''
|
|
while s != 'q':
|
|
s = input("search or 'q' to quit: ")
|
|
if s == 'q':
|
|
return
|
|
query_embedding = model.encode(s)
|
|
results = u.get_nns_by_vector(query_embedding, n)
|
|
|
|
# Print the top 5 results
|
|
for i, r in enumerate(results):
|
|
print(f'Top {i+1}: {r}, {search_index[r]}') #{file} - {sentence} - (Score: {score})')
|
|
|
|
|
|
|
|
def repair_ezproxy_links():
|
|
from localcache2 import pages_in_term
|
|
|
|
# get all pages in term
|
|
all_pages = pages_in_term()
|
|
|
|
# c.id, c.course_code, c.sis_source_id, wp.id as wp_id, wp.title, wp.url, c.name , wp.body
|
|
for p in all_pages:
|
|
course = p[1]
|
|
title = p[4]
|
|
url = p[5]
|
|
body = p[7]
|
|
# print(body)
|
|
try:
|
|
#s = re.search('''["']https:\/\/ezproxy\.gavilan\.edu\/login\?url=(.*)["']''',body)
|
|
a = re.search(r'Online Library Services',title)
|
|
if a:
|
|
continue
|
|
s = re.findall('\n.*ezproxy.*\n',body)
|
|
if s:
|
|
print(course, title, url)
|
|
print(" ", s, "\n") # s.group())
|
|
except Exception as e:
|
|
#print(f"Skipped: {title}, {e}")
|
|
pass
|
|
|
|
|
|
|
|
def fetch_support_page():
|
|
u = "https://ilearn.gavilan.edu/courses/20850/pages/online-student-support-hub"
|
|
course_num = 20850
|
|
page_url = "online-student-support-hub"
|
|
t2 = f"{url}/api/v1/courses/{course_num}/pages/{page_url}"
|
|
print('Getting: ' + t2)
|
|
mypage = fetch(t2)
|
|
print(json.dumps(mypage,indent=2))
|
|
print(mypage['body'])
|
|
|
|
|
|
from courses import getCoursesInTerm
|
|
|
|
def clear_old_page(shell_id,page_name):
|
|
# get all pages
|
|
t = f"{url}/api/v1/courses/{shell_id}/pages"
|
|
pages = fetch(t)
|
|
for page in pages:
|
|
if page['title'] == page_name:
|
|
print(f"found a page named {page_name}. Deleting it.")
|
|
id = page['page_id']
|
|
t2 = f"{url}/api/v1/courses/{shell_id}/pages/{id}"
|
|
r2 = requests.delete(t2, headers=header)
|
|
print(f"{r2}")
|
|
|
|
def add_support_page_full_semester(term=287):
|
|
print("Fetching list of all active courses")
|
|
# term = 184 # fa24 # 182
|
|
c = getCoursesInTerm(term,0,0) # sp25 = 287 wi24=182
|
|
|
|
#print(c)
|
|
|
|
check = 'each'
|
|
print("answer 'all' to do the rest without confirming")
|
|
|
|
for C in c:
|
|
if check == 'each':
|
|
answer = input(f"Type 1 <enter> to add support page to {C['id']} ({C['name']}) ")
|
|
if answer == '1':
|
|
create_support_page(C['id'])
|
|
else:
|
|
if answer == 'all':
|
|
check = 'all'
|
|
create_support_page(C['id'])
|
|
continue
|
|
elif check == 'all':
|
|
create_support_page(C['id'])
|
|
|
|
def create_support_page(shell_id=18297): # 29):
|
|
|
|
# clear one of same name first.
|
|
clear_old_page(shell_id, "Online Student Support Hub")
|
|
|
|
# make new one
|
|
t3 = f"{url}/api/v1/courses/{shell_id}/pages/online-student-support-hub"
|
|
new_content = codecs.open("cache/support_min.html","r","utf-8").read()
|
|
title = "Online Student Support Hub"
|
|
data = {'wiki_page[body]':new_content, 'wiki_page[title]':title, 'wiki_page[published]':"true"}
|
|
r3 = requests.put(t3, headers=header, params=data)
|
|
#print(r3.content)
|
|
|
|
print('Page Created')
|
|
try:
|
|
response = r3.json()
|
|
print(f"page id: {response['page_id']}")
|
|
except Exception as e:
|
|
print(f"Exception: {e}")
|
|
|
|
|
|
# list modules
|
|
# GET /api/v1/courses/:course_id/modules
|
|
t4 = f"{url}/api/v1/courses/{shell_id}/modules"
|
|
modules = fetch(t4)
|
|
module_id = 0
|
|
|
|
# what if there are no modules?
|
|
if len(modules) == 0:
|
|
t6 = f"{url}/api/v1/courses/{shell_id}/modules/"
|
|
mod_data = {'module[name]': 'Welcome', 'module[unlock_at]':"2024-01-01T06:00:00-08:00"}
|
|
r6 = requests.post(t6, headers=header, params=mod_data)
|
|
mod_response = r6.json()
|
|
module_id = mod_response['id']
|
|
print(f"created module, id: {module_id}")
|
|
|
|
# publish module
|
|
t7 = f"{url}/api/v1/courses/{shell_id}/modules/{module_id}"
|
|
mod_data2 = {'module[published]':'true'}
|
|
r6 = requests.put(t7, headers=header, params=mod_data2)
|
|
|
|
for M in modules:
|
|
if M['position'] == 1:
|
|
module_id = M['id']
|
|
print(f"found first module 1: ({module_id}) {M['name']}")
|
|
#print(json.dumps(modules,indent=2))
|
|
#
|
|
# create module item
|
|
# POST /api/v1/courses/:course_id/modules/:module_id/items
|
|
t5 = f"{url}/api/v1/courses/{shell_id}/modules/{module_id}/items"
|
|
item_data = {'module_item[title]': title, 'module_item[type]': 'Page', 'module_item[page_url]': response['url'], 'module_item[position]':1}
|
|
r5 = requests.post(t5, headers=header, params=item_data)
|
|
|
|
print('ok')
|
|
|
|
def list_modules_and_items(shell_id, verbose=0):
|
|
modules = fetch(f"{url}/api/v1/courses/{shell_id}/modules?include[]=items&include[]=content_details")
|
|
if verbose: print(json.dumps(modules,indent=2))
|
|
return modules
|
|
|
|
def check_modules_for_old_orientation():
|
|
from util import contains_key_value, find_dict_with_key_value, extract_key_values
|
|
|
|
checklist = []
|
|
|
|
for term in [286, 287]: # wi25, sp25
|
|
|
|
print("Fetching list of all active courses")
|
|
#term = 287 # 184 # fa24 # 182
|
|
#term = 286 # wi25
|
|
c = getCoursesInTerm(term,0,0) # sp25 = 287 wi24=182
|
|
|
|
for C in c:
|
|
print(f"{C['id']} - {C['name']}")
|
|
m = list_modules_and_items(C['id'])
|
|
|
|
if contains_key_value(m, 'name', 'Online Student Support Services - Summer & Fall 2024'):
|
|
old_mod = find_dict_with_key_value(m,'name','Online Student Support Services - Summer & Fall 2024')
|
|
|
|
print(" this course has the old module")
|
|
checklist.append(f"{C['id']}")
|
|
titles = extract_key_values(old_mod, 'title')
|
|
[ print(f" {T}") for T in titles ]
|
|
|
|
print(f"\nCheck these course ids:")
|
|
for id in checklist:
|
|
print(id)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
print ('')
|
|
options = { 1: ['download a class into a folder / word file', accessible_check] ,
|
|
2: ['download multiple classes', multiple_downloads ],
|
|
3: ['convert stuff', pan_testing ],
|
|
4: ['convert md to html', md_to_course ],
|
|
5: ['course download tester', test_forums ],
|
|
# 5: ['import freshdesk content', freshdesk ],
|
|
6: ['download all a courses pages', grab_course_pages],
|
|
7: ['demo vector search', demo_vector_search],
|
|
8: ['crawl',crawl],
|
|
9: ['clean text index', txt_clean_index],
|
|
10: ['make web dir struct', manual_index],
|
|
11: ['create search embeddings', create_embeddings],
|
|
12: ['create search index', create_search_index],
|
|
13: ['do an index search', search_index],
|
|
14: ['do a vector search', search_embeddings],
|
|
15: ['test priority', test_priority],
|
|
16: ['test embed', test_embed],
|
|
17: ['repair ezproxy links', repair_ezproxy_links],
|
|
18: ['create pages from html files', make_pages_from_folder],
|
|
19: ['fetch support page', fetch_support_page],
|
|
20: ['create support page', create_support_page],
|
|
21: ['add support page to all shells in semester', add_support_page_full_semester],
|
|
22: ['fetch all modules / items', check_modules_for_old_orientation]
|
|
}
|
|
|
|
if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
|
|
resp = int(sys.argv[1])
|
|
print("\n\nPerforming: %s\n\n" % options[resp][0])
|
|
|
|
else:
|
|
print ('')
|
|
for key in options:
|
|
print(str(key) + '.\t' + options[key][0])
|
|
|
|
print('')
|
|
resp = input('Choose: ')
|
|
|
|
# Call the function in the options dict
|
|
options[ int(resp)][1]()
|
|
|