smorgasbord of changes

This commit is contained in:
Peter Howell 2025-08-26 11:46:33 -07:00
parent 1a0daf82b5
commit f4c82c237c
8 changed files with 534 additions and 172 deletions

View File

@ -996,6 +996,250 @@ def repair_ezproxy_links():
pass
def download_web():
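# Small single-site crawler: starting from one URL, follow links that stay in the same
# folder (or below), extract the fragment matched by an XPath expression on each page,
# convert it to Markdown with pandoc, and merge everything into one .md plus a .docx
# with a table of contents. Images inside the fragment are saved to an assets/ folder
# and their src attributes are rewritten to point at the local copies.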
import argparse, os, re, time, hashlib, mimetypes, subprocess
from collections import deque
from urllib.parse import urlsplit, urlunsplit, urljoin
import posixpath as ppath
import requests
from lxml import html
SESSION = requests.Session()
SESSION.headers.update({
"User-Agent": "MiniXPathCrawler/1.0 (+for personal archiving; contact admin if issues)"
})
def normalize_path(path: str) -> str:
np = ppath.normpath(path or "/")
if not np.startswith("/"):
np = "/" + np
return np
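# Folder containing the given path, normalized and with a trailing slash
# (e.g. "/a/b/page.html" -> "/a/b/").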
def base_dir_of(path: str) -> str:
# Ensure trailing slash for folder comparison
if not path or path.endswith("/"):
bd = path or "/"
else:
bd = ppath.dirname(path) + "/"
bd = normalize_path(bd)
if not bd.endswith("/"):
bd += "/"
return bd
def canonical_url(u: str, drop_query=True) -> str:
sp = urlsplit(u)
path = normalize_path(sp.path)
if drop_query:
sp = sp._replace(path=path, query="", fragment="")
else:
sp = sp._replace(path=path, fragment="")
return urlunsplit(sp)
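# Crawl-scope test: follow a link only if it shares the start URL's scheme and host
# and its path sits in the start URL's folder or deeper.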
def same_folder_or_below(start_url: str, link_url: str) -> bool:
su = urlsplit(start_url); lu = urlsplit(link_url)
if su.scheme != lu.scheme or su.netloc != lu.netloc:
return False
bd = base_dir_of(su.path) # e.g., "/a/b/"
tp = normalize_path(lu.path) # e.g., "/a/b/page.html"
return (tp == bd[:-1]) or tp.startswith(bd)
def is_html_response(resp: requests.Response) -> bool:
ctype = resp.headers.get("Content-Type", "")
return "html" in ctype.lower()
def fetch_html(url: str, timeout=20):
try:
r = SESSION.get(url, timeout=timeout, allow_redirects=True)
except requests.RequestException:
return None, None
if r.status_code != 200 or not is_html_response(r):
return None, None
try:
doc = html.fromstring(r.content)
except Exception:
return None, None
# make links absolute for easier handling of images and hrefs
doc.make_links_absolute(r.url)
return r, doc
def safe_filename_from_url(u: str, default_ext=".bin") -> str:
# hash + best-effort extension
h = hashlib.sha1(u.encode("utf-8")).hexdigest()[:16]
ext = ""
path = urlsplit(u).path
if "." in path:
ext = "." + path.split(".")[-1].split("?")[0].split("#")[0]
if not re.match(r"^\.[A-Za-z0-9]{1,5}$", ext):
ext = ""
return h + (ext or default_ext)
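# Save one image into assets_dir under a hashed filename; the extension comes from the
# Content-Type header when possible, otherwise from the URL. Returns the local path,
# or None on any network or write error.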
def download_image(img_url: str, assets_dir: str) -> str | None:
try:
r = SESSION.get(img_url, timeout=20, stream=True)
except requests.RequestException:
return None
if r.status_code != 200:
return None
# extension: prefer from Content-Type
ext = None
ctype = r.headers.get("Content-Type", "")
if "/" in ctype:
ext_guess = mimetypes.guess_extension(ctype.split(";")[0].strip())
if ext_guess:
ext = ext_guess
fname = safe_filename_from_url(img_url, default_ext=ext or ".img")
os.makedirs(assets_dir, exist_ok=True)
fpath = os.path.join(assets_dir, fname)
try:
with open(fpath, "wb") as f:
for chunk in r.iter_content(65536):
if chunk:
f.write(chunk)
except Exception:
return None
return fpath
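# Extract the nodes matched by xpath_expr, strip script/style/noscript, localize images,
# and return the serialized HTML fragment plus a title (document <title>, else the first
# heading in the fragment, else "Untitled").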
def html_fragment_from_xpath(doc, xpath_expr: str, assets_dir: str):
nodes = doc.xpath(xpath_expr)
if not nodes:
return None, None # (html_fragment, title)
# Remove <script>/<style> inside nodes
for n in nodes:
for bad in n.xpath(".//script|.//style|.//noscript"):
bad.getparent().remove(bad)
# Download images and rewrite src
for n in nodes:
for img in n.xpath(".//img[@src]"):
src = img.get("src")
if not src:
continue
local = download_image(src, assets_dir)
if local:
# Use relative path from markdown file location later (we'll keep md in parent of assets)
rel = os.path.join("assets", os.path.basename(local)).replace("\\", "/")
img.set("src", rel)
frag_html = "".join(html.tostring(n, encoding="unicode") for n in nodes)
# Title from <title> or first heading in fragment
doc_title = (doc.xpath("string(//title)") or "").strip()
if not doc_title:
h = html.fromstring(frag_html)
t2 = (h.xpath("string(//h1)") or h.xpath("string(//h2)") or "").strip()
doc_title = t2 or "Untitled"
return frag_html, doc_title
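# Convert an HTML fragment to GitHub-flavored Markdown via the pandoc CLI; if pandoc is
# missing or conversion fails, the raw HTML is returned so the merge step can still proceed.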
def html_to_markdown_with_pandoc(html_str: str) -> str:
try:
p = subprocess.run(
["pandoc", "-f", "html", "-t", "gfm"],
input=html_str.encode("utf-8"),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
if p.returncode == 0:
return p.stdout.decode("utf-8", errors="ignore")
# fallback to raw HTML if conversion failed
return html_str
except FileNotFoundError:
# pandoc missing; return raw HTML
return html_str
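# Run pandoc again to turn the merged Markdown into a .docx with a table of contents;
# --resource-path lets it resolve the relative assets/ image links.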
def build_docx_from_markdown(md_path: str, out_docx: str, resource_path: str):
# Create .docx with ToC
cmd = [
"pandoc",
"-s",
md_path,
"-o",
out_docx,
"--toc",
"--toc-depth=3",
f"--resource-path={resource_path}",
"--from=markdown+raw_html",
]
subprocess.run(cmd, check=True)
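# Breadth-first crawl: URLs are deduplicated by canonical form (query and fragment
# dropped), only in-scope links are enqueued, and a politeness delay separates requests.
# Each page with a matching fragment contributes one "# <title>" section to the merged Markdown.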
def crawl(start_url: str, xpath_expr: str, out_dir: str, max_pages: int, delay: float):
os.makedirs(out_dir, exist_ok=True)
assets_dir = os.path.join(out_dir, "assets")
os.makedirs(assets_dir, exist_ok=True)
visited = set()
q = deque([start_url])
md_sections = []
base_folder = base_dir_of(urlsplit(start_url).path)
while q and len(visited) < max_pages:
url = q.popleft()
canon = canonical_url(url)
if canon in visited:
continue
visited.add(canon)
resp, doc = fetch_html(url)
if doc is None:
print(f"[skip] Non-HTML or fetch failed: {url}")
continue
# Extract and rewrite images for the chosen XPath fragment
frag_html, title = html_fragment_from_xpath(doc, xpath_expr, assets_dir)
if frag_html:
md = html_to_markdown_with_pandoc(frag_html)
section = f"# {title}\n\n_Source: {resp.url}_\n\n{md}\n"
md_sections.append(section)
print(f"[ok] {resp.url}")
# Enqueue in-scope links (from the whole page)
for a in doc.xpath("//a[@href]"):
href = a.get("href")
if not href:
continue
absu = urljoin(resp.url, href)
# Drop fragments for comparison/enqueue
absu_nf = urlunsplit(urlsplit(absu)._replace(fragment=""))
if absu_nf in visited:
continue
if same_folder_or_below(start_url, absu_nf):
q.append(absu_nf)
time.sleep(delay)
merged_md = os.path.join(out_dir, "merged.md")
with open(merged_md, "w", encoding="utf-8") as f:
f.write("\n\n".join(md_sections))
out_docx = os.path.join(out_dir, "merged.docx")
try:
build_docx_from_markdown(merged_md, out_docx, out_dir)
except subprocess.CalledProcessError as e:
print("[warn] pandoc failed to create .docx:", e)
print(f"\nDone.\nMarkdown: {merged_md}\nWord: {out_docx}\nPages: {len(md_sections)} (in scope)")
myurl = "https://govt.westlaw.com/calregs/Browse/Home/California/CaliforniaCodeofRegulations?guid=I2A5DA5204C6911EC93A8000D3A7C4BC3&originationContext=documenttoc&transitionType=Default&contextData=(sc.Default)"
crawl(myurl, '//*[@id="co_contentColumn"]', "cache/content", 600, 0.65)
if __name__ == "__main__":
print ('')
@ -1005,6 +1249,7 @@ if __name__ == "__main__":
4: ['convert md to html', md_to_course ],
5: ['course download tester', test_forums ],
6: ['download all a courses pages', grab_course_pages],
7: ['quick site downloader', download_web],
17: ['repair ezproxy links', repair_ezproxy_links],
18: ['create pages from html files', make_pages_from_folder],
19: ['fetch support page', fetch_support_page],

View File

@ -12,6 +12,7 @@ from schedules import get_semester_schedule
#from pipelines import sems
from localcache import course_quick_stats, get_courses_in_term_local, course_student_stats, all_sem_courses_teachers, full_reload
from localcache2 import db, users_new_this_semester, users_new_this_2x_semester, course_from_id, user_ids_in_shell
from localcache2 import student_count, teacher_list, course_from_id, course_sched_entry_from_id
from collections import defaultdict
from semesters import find_term
@ -398,99 +399,6 @@ def course_term_summary_local(term="180",term_label="FA23"):
#print(info)
oo.write('\n</ul>\n')
from localcache2 import student_count, teacher_list, course_from_id, course_sched_entry_from_id
# Relevant stuff trying to see if its even being used or not
# relies on schedule being in database
def course_term_summary(term="289",term_label="FA25"):
print("Summary of %s" % term_label)
get_fresh = 1
courses = getCoursesInTerm(term, get_fresh, 0)
print("output to cache/term_summary.txt")
outp = codecs.open('cache/term_summary.txt','w','utf-8')
outp.write('id,name,view,type,state,sched_start,ilearn_start,sched_students,ilearn_students,num_teachers,teacher1,teacher2,teacher2\n')
for c in courses:
c_db = course_from_id(c['id'])
try:
ilearn_start = c_db['start_at']
s_db = course_sched_entry_from_id(c['id'])
except:
print(f"problem with this course: {c_db}")
continue
sched_start = ''
sched_students = ''
type = ''
if (s_db):
sched_start = s_db['start']
sched_students =s_db['act']
type = s_db['type']
#print(s_db)
num_students = student_count(c['id'])
tchr = teacher_list(c['id'])
tt = ','.join([x[1] for x in tchr])
line = f"{c['id']},{c['course_code']},{c['default_view']},{type},{c['workflow_state']},{sched_start},{ilearn_start},{sched_students},{num_students},{len(tchr)},{tt}"
print(line)
outp.write(line + "\n")
return
tup = tuple("id course_code default_view workflow_state".split(" "))
smaller = [ funcy.project(x , tup) for x in courses ]
#print(json.dumps(smaller, indent=2))
by_code = {}
(connection,cursor) = db()
(pub, not_pub) = funcy.split( lambda x: x['workflow_state'] == "available", smaller)
for S in smaller:
print(S)
by_code[ S['course_code'] ] = str(S) + "\n"
outp.write( str(S) + "\n" )
q = """SELECT c.id AS courseid, c.code, tt.name, c.state, COUNT(u.id) AS student_count FROM courses AS c
JOIN enrollment AS e ON e.course_id=c.id
JOIN users AS u ON u.id=e.user_id
JOIN ( SELECT c.id AS courseid, u.id AS userid, c.code, u.name FROM courses AS c
JOIN enrollment AS e ON e.course_id=c.id
JOIN users AS u ON u.id=e.user_id
WHERE c.canvasid=%s
AND e."type"="TeacherEnrollment" ) AS tt ON c.id=tt.courseid
WHERE c.canvasid=%s
AND e."type"="StudentEnrollment"
GROUP BY c.code ORDER BY c.state, c.code""" % (S['id'],S['id'])
result = cursor.execute(q)
for R in result:
print(R)
by_code[ S['course_code'] ] += str(R) + "\n"
outp.write( str(R) + "\n\n" )
pages = fetch(url + "/api/v1/courses/%s/pages" % S['id'])
by_code[ S['course_code'] ] += json.dumps(pages, indent=2) + "\n\n"
modules = fetch(url + "/api/v1/courses/%s/modules" % S['id'])
by_code[ S['course_code'] ] += json.dumps(modules, indent=2) + "\n\n"
print()
out2 = codecs.open('cache/summary2.txt','w', 'utf-8')
for K in sorted(by_code.keys()):
out2.write('\n------ ' + K + '\n' + by_code[K])
out2.flush()
return
#published = list(funcy.where( smaller, workflow_state="available" ))
#notpub = list(filter( lambda x: x['workflow_state'] != "available", smaller))
notpub_ids = [ x['id'] for x in notpub ]
#for ix in notpub_ids:
# # print(course_quick_stats(ix))
outp.write(json.dumps(courses, indent=2))
outp2 = codecs.open('cache/term_summary_pub.txt','w','utf-8')
outp2.write("PUBLISHED\n\n" + json.dumps(published, indent=2))
outp2.write("\n\n---------\nNOT PUBLISHED\n\n" + json.dumps(notpub, indent=2))
# Fetch all courses in a given term
def getCoursesInTerm(term=0,get_fresh=1,show=1,active=0): # a list
@ -594,8 +502,15 @@ def all_equal2(iterator):
def semester_cross_lister():
sem = "fa25"
term = 289
tt = find_term( input("term? (ex: fa25) ") )
if not tt or (not 'canvas_term_id' in tt) or (not 'code' in tt):
print(f"Couldn't find term.")
return
term = tt['canvas_term_id']
sem = tt['code']
xlist_filename = f"cache/{sem}_crosslist.csv"
checkfile = codecs.open('cache/xlist_check.html','w','utf-8')
checkfile.write('<html><body><table>\n')
@ -787,6 +702,108 @@ def xlist_ii(parasite_id,host_id,new_name,new_code):
print("\n\n")
# Relevant stuff trying to see if its even being used or not
# relies on schedule being in database
def course_term_summary():
term = find_term( input("term? (ex: fa25) ") )
if not term or (not 'canvas_term_id' in term) or (not 'code' in term):
print(f"Couldn't find term.")
return
SEM = term['code']
term = term['canvas_term_id']
print(f"Summary of {SEM}")
get_fresh = 1
courses = getCoursesInTerm(term, get_fresh, 0)
print(f"output to cache/term_summary_{term}.csv")
outp = codecs.open(f'cache/term_summary_{term}.csv','w','utf-8')
outp.write('id,name,view,type,state,sched_start,ilearn_start,sched_students,ilearn_students,num_teachers,teacher1,teacher2,teacher3\n')
for c in courses:
c_db = course_from_id(c['id'])
try:
ilearn_start = c_db['start_at']
s_db = course_sched_entry_from_id(c['id'])
except:
print(f"problem with this course: {c_db}")
continue
sched_start = ''
sched_students = ''
type = ''
if (s_db):
sched_start = s_db['start']
sched_students =s_db['act']
type = s_db['type']
#print(s_db)
num_students = student_count(c['id'])
tchr = teacher_list(c['id'])
tt = ','.join([x[1] for x in tchr])
line = f"{c['id']},{c['course_code']},{c['default_view']},{type},{c['workflow_state']},{sched_start},{ilearn_start},{sched_students},{num_students},{len(tchr)},{tt}"
print(line)
outp.write(line + "\n")
return
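# NOTE: everything below this return is never reached; it is older exploratory code
# (raw SQL summary, pages/modules dumps) kept for reference.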
tup = tuple("id course_code default_view workflow_state".split(" "))
smaller = [ funcy.project(x , tup) for x in courses ]
#print(json.dumps(smaller, indent=2))
by_code = {}
(connection,cursor) = db()
(pub, not_pub) = funcy.split( lambda x: x['workflow_state'] == "available", smaller)
for S in smaller:
print(S)
by_code[ S['course_code'] ] = str(S) + "\n"
outp.write( str(S) + "\n" )
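# Student count per course from the local mirror, joined to a subselect that supplies
# the teacher's name; both halves are keyed on the Canvas course id.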
q = """SELECT c.id AS courseid, c.code, tt.name, c.state, COUNT(u.id) AS student_count FROM courses AS c
JOIN enrollment AS e ON e.course_id=c.id
JOIN users AS u ON u.id=e.user_id
JOIN ( SELECT c.id AS courseid, u.id AS userid, c.code, u.name FROM courses AS c
JOIN enrollment AS e ON e.course_id=c.id
JOIN users AS u ON u.id=e.user_id
WHERE c.canvasid=%s
AND e."type"="TeacherEnrollment" ) AS tt ON c.id=tt.courseid
WHERE c.canvasid=%s
AND e."type"="StudentEnrollment"
GROUP BY c.code ORDER BY c.state, c.code""" % (S['id'],S['id'])
result = cursor.execute(q)
for R in result:
print(R)
by_code[ S['course_code'] ] += str(R) + "\n"
outp.write( str(R) + "\n\n" )
pages = fetch(url + "/api/v1/courses/%s/pages" % S['id'])
by_code[ S['course_code'] ] += json.dumps(pages, indent=2) + "\n\n"
modules = fetch(url + "/api/v1/courses/%s/modules" % S['id'])
by_code[ S['course_code'] ] += json.dumps(modules, indent=2) + "\n\n"
print()
out2 = codecs.open('cache/summary2.txt','w', 'utf-8')
for K in sorted(by_code.keys()):
out2.write('\n------ ' + K + '\n' + by_code[K])
out2.flush()
return
#published = list(funcy.where( smaller, workflow_state="available" ))
#notpub = list(filter( lambda x: x['workflow_state'] != "available", smaller))
notpub_ids = [ x['id'] for x in notpub ]
#for ix in notpub_ids:
# # print(course_quick_stats(ix))
outp.write(json.dumps(courses, indent=2))
outp2 = codecs.open('cache/term_summary_pub.txt','w','utf-8')
outp2.write("PUBLISHED\n\n" + json.dumps(published, indent=2))
outp2.write("\n\n---------\nNOT PUBLISHED\n\n" + json.dumps(notpub, indent=2))
def course_term_summary_2():
lines = codecs.open('cache/term_summary.txt','r','utf-8').readlines()
output = codecs.open('cache/term_summary.html','w','utf-8')
@ -806,29 +823,8 @@ def course_term_summary_2():
def course_term_summary_3():
# doesn't work cause of single, not double quotes?!?!
lines = codecs.open('cache/term_summary.txt','r','utf-8').readlines()
output = codecs.open('cache/term_summary.html','w','utf-8')
for L in lines:
try:
L = L.strip()
print(L)
ll = json.loads(L)
print(ll)
print(ll['course_code'])
if ll['workflow_state'] == 'unpublished':
ss = "<br />Course: <a href='%s' target='_blank'>%s</a><br />" % ("https://ilearn.gavilan.edu/courses/"+str(ll['id']), ll['course_code'] )
output.write( ss )
print(ss+"\n")
except Exception as e:
print(e)
# check number of students and publish state of all shells in a term
'''
def all_semester_course_sanity_check():
term = "su25"
target_start = "6-14"
@ -895,7 +891,7 @@ def all_semester_course_sanity_check():
htmlout.write(h)
htmlout.write('</table></body></html>\n')
print(f"wrote to {outputfile}")
'''
def eslCrosslister():
@ -1596,6 +1592,11 @@ def overview_start_dates():
output = codecs.open(f"cache/overview_semester_shells_{SEM}.csv","w","utf-8")
get_fresh = 0
if not get_fresh:
gf = input('Fetch new list of semester courses? (y/n) ')
if gf=='y':
get_fresh = 1
# get list of online course shells
c = getCoursesInTerm(TERM,get_fresh,0)
@ -1609,7 +1610,7 @@ def overview_start_dates():
else:
print( f"---NO CRN IN: {C['name']} -> {C}" )
header = f"id,shell_shortname,sched_start,shell_start,shell_end,shell_restrict_view_dates,shell_restrict_view_dates,shell_state,shell_numstudents,shell_numsections"
header = f"id,shell_shortname,type,enrolled,max,sched_start,shell_start,shell_end,shell_restrict_view_dates,shell_restrict_view_dates,shell_state,shell_numstudents,shell_numsections"
output.write(header + "\n")
print("\n\n" + header)
@ -1644,12 +1645,17 @@ def overview_start_dates():
enrollments = fetch(ss, params={"enrollment_type[]":"student"})
shell_numstudents = len(enrollments)
# get teachers
s2 = f"{url}/api/v1/courses/{this_id}/users"
teachers = fetch(s2, params={"enrollment_type[]":"teacher"})
shell_teachers = [(x['id'],x['name']) for x in teachers]
# cross-listed?
sec = f"{url}/api/v1/courses/{this_id}/sections"
sections = fetch(sec, params={"include[]":"total_students"})
shell_numsections = len(sections)
content = f"{this_id},{shell_shortname},{d_start},{shell_start},{shell_end},{shell_restrict_view_dates},{shell_restrict_view_dates},{shell_state},{shell_numstudents},{shell_numsections}"
content = f"{this_id},{shell_shortname},{S['type']},{S['act']},{S['cap']},{d_start},{shell_start},{shell_end},{shell_restrict_view_dates},{shell_restrict_view_dates},{shell_state},{shell_numstudents},{shell_numsections},{S['teacher']},{shell_teachers}"
output.write(content + "\n")
print(content)
@ -2314,24 +2320,33 @@ def my_nav_filter(row):
def clean_course_nav_setup_semester(section=0):
t = find_term( input("term? (ex: fa25) ") )
if not t or (not 'canvas_term_id' in t) or (not 'code' in t):
print(f"Couldn't find term.")
return
term = t['canvas_term_id']
SEM = t['code']
print("Fetching list of all active courses")
term = 289
c = getCoursesInTerm(term,1,0)
print(c)
#print(c)
ids = []
courses = {}
data = {'hidden':True}
pause = 1
nav_out = codecs.open(f'cache/course_nav_summary_{term}.csv','w','utf-8')
nav_out = codecs.open(f'cache/course_nav_summary_{SEM}.csv','w','utf-8')
nav_writer = csv.writer(nav_out)
columns = "id name code start state label position hidden visibility type url".split(" ")
nav_writer.writerow(columns)
for C in c:
try:
print( f'Fetching course {json.dumps(C,indent=2)}' )
#print( f'Fetching course {json.dumps(C,indent=2)}' )
parts = C['sis_course_id'].split('-')
print(C['name'])
courses[str(C['id'])] = C
@ -2350,7 +2365,6 @@ def clean_course_nav_setup_semester(section=0):
nav_out.flush()
except Exception as err:
print(f"Exception: {err}")
exit()
def fetch_rubric_scores(course_id=16528, assignment_id=1):
@ -2743,7 +2757,7 @@ if __name__ == "__main__":
9: ['Simple list of course data, search by sis_id', course_search_by_sis],
10: ['Overview of a term', course_term_summary],
20: ['process the semester overview output (10)', course_term_summary_2],
55: ['Check all courses & their sections in semester', all_semester_course_sanity_check],
##55: ['Check all courses & their sections in semester', all_semester_course_sanity_check],
11: ['Enroll ORIENTATION and STEM student shells after catching up database.', enroll_o_s_students],
12: ['Enroll stem students', enroll_stem_students_live],
@ -2762,7 +2776,7 @@ if __name__ == "__main__":
56: ['Remove course evals all sections', remove_evals_all_sections],
52: ['Cleanup semester / course nav', clean_course_nav_setup_semester], # not done, just lists nav right now
29: ['Overview summer start dates',overview_start_dates],
29: ['* Overview semester start dates',overview_start_dates],
31: ['Fine tune term dates and winter session', course_by_depts_terms],
32: ['Set summer start dates', set_custom_start_dates],
#32: ['Cross-list classes', xlist ],

View File

@ -79,7 +79,7 @@ def clean(st):
if not T.name in ok:
if not T.name in seen:
seen.append(T.name)
print("- %s" % T.name)
#print("- %s" % T.name)
#print(seen)
T.unwrap()
else:
@ -159,7 +159,7 @@ def single_program_path_parse(c):
print(c["attributes"]["entityId"])
return (c["attributes"]["entityId"], pathstyle(c))
else:
print("I couldn't recognize a program in that")
print(f"I couldn't recognize a program in: {json.dumps(c,indent=2)}")
ooops = codecs.open('cache/programs/failedcourse_%i.json' % num_failed_course, 'w', 'utf-8')
ooops.write(json.dumps(c,indent=2))
ooops.close()
@ -875,6 +875,7 @@ def extract_digits(input_string):
:param input_string: The string to process.
:return: An integer containing only the digits from the input string.
"""
#return input_string
digits_only = ''.join(char for char in input_string if char.isdigit())
return int(digits_only) if digits_only else 0

View File

@ -24,25 +24,14 @@ import pandas as pd
from pipelines import fetch, url, header
from outcomes import quick_add_course_outcomes, code_from_ilearn_name, all_linked_outcomes_in_term
from courses import getCoursesInTerm, getCourses
from semesters import find_term
import codecs, json, sys, re, csv, requests, textwrap
from path_dict import PathDict
outputfile = ''
csvwriter = ''
# 289 2025 Fall
# 288 2025 Summer
# 287 2025 Spring
# 286 2025 Winter
# 184 2024 Fall
# 183 2024 Summer
# 181 2024 Spring
# 182 2024 Winter
TERM = 287
# TERM = 286 # fall = 287
TERM = 0
def escape_commas(s):
if ',' in s:
@ -151,7 +140,7 @@ def outcomes_in_shell(course_id):
return root_og, the_outcomes, g2
def ilearn_shell_slo_to_csv(shell_slos):
def ilearn_shell_slo_to_csv(shell_slos,TERM):
L = ['canvasid','name','crn','has_outcomes',]
for i in range(1,11):
L.append("o%i_id" % i)
@ -198,11 +187,12 @@ def ilearn_shell_slo_to_csv(shell_slos):
except Exception as e:
print(f"*** Exception {e} with {S}\n\n")
df.to_csv(f'cache/outcome_{TERM}.csv')
print(df)
df.to_csv(f'cache/outcome_{TERM}.csv')
print(f'Wrote {TERM} shell slo contents to: cache/outcome_{TERM}.csv')
def get_outcomes_term_index():
def get_outcomes_term_index(TERM):
global outputfile, csvwriter
NUM_THREADS = 20
get_fresh = 0
@ -235,7 +225,7 @@ def get_outcomes_term_index():
print(future.result())
raw_log.write( json.dumps(future.result(),indent=2) + "\n" )
csvfile.close()
ilearn_shell_slo_to_csv(output)
ilearn_shell_slo_to_csv(output,TERM)
@ -373,7 +363,7 @@ def add_o_dept_dry_run():
add_o_dept(1)
def add_o_whole_term():
course_groups = full_term_overview(0)
course_groups = full_term_overview(term, 0)
dept_shells_to_add = [ a for a in course_groups['no outcomes'] ]
sorted_dept_shells_to_add = sorted(dept_shells_to_add, key=lambda x: f"{x['dept']}{x['code']}")
@ -388,10 +378,20 @@ def add_o_whole_term():
print(f"Failed on {shell['id']}: {e}")
def add_o_dept(dry_run=0):
def add_o_dept(term=0, dry_run=0):
if not term:
tt = find_term( input("term? (ex: fa25) ") )
if not tt or (not 'canvas_term_id' in tt) or (not 'code' in tt):
print(f"Couldn't find term.")
return
term = tt['canvas_term_id']
sem = tt['code']
d = input("Enter dept or deps separated with a space > ")
d_list = d.split(' ')
course_groups = full_term_overview(0)
course_groups = full_term_overview(term, 0)
dept_shells_to_add = [ a for a in course_groups['no outcomes'] if a['dept'] in d_list ]
sorted_dept_shells_to_add = sorted(dept_shells_to_add, key=lambda x: f"{x['dept']}{x['code']}")
@ -417,7 +417,7 @@ def remove_all_bad_points():
remove_old_outcomes(shell['id'])
def full_term_overview(verbose=1):
def full_term_overview(TERM, verbose=1):
out2 = codecs.open(f'cache/slo/slo_status_{TERM}.json','w','utf-8')
out3 = codecs.open(f'cache/slo/slo_status_{TERM}.csv','w','utf-8')
csv_fields = 'outcome_count,id,name,dept,code,crn,assessed_count,points_ok'.split(',')
@ -512,8 +512,20 @@ def full_term_overview(verbose=1):
return course_groups
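# Prompt for a term if the module-level TERM isn't set, then refresh the per-shell
# outcome index and build the term-wide overview report.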
def fetch_term_outcomes_and_report():
get_outcomes_term_index()
full_term_overview()
global TERM
if not TERM:
tt = find_term( input("term? (ex: fa25) ") )
if not tt or (not 'canvas_term_id' in tt) or (not 'code' in tt):
print(f"Couldn't find term.")
return
TERM = tt['canvas_term_id']
sem = tt['code']
get_outcomes_term_index(TERM)
full_term_overview(TERM)
if __name__ == "__main__":
options = { 1: ['Refresh term outcome list & report', fetch_term_outcomes_and_report],

View File

@ -2,10 +2,9 @@
#from time import strptime
#from util import UnicodeDictReader
import codecs, json, requests, re, csv, datetime, pysftp, os, jsondiff, os.path
import sys, shutil, hmac, hashlib, base64, schedule, time, pathlib, datetime
import sys, shutil, hmac, hashlib, base64, schedule, time, pathlib
#import pdb
from datetime import timedelta
import datetime
#from collections import defaultdict
from canvas_secrets import apiKey, apiSecret, FTP_SITE, FTP_USER, FTP_PW, url, domain, account_id, header, header_media, g_id, g_secret
@ -221,7 +220,7 @@ async def canvas_data_2024():
#connection_string: str = "postgresql://postgres:rolley34@192.168.1.6/db"
# todo: use secrets
connection_string: str = "postgresql://postgres:rolley34@deep1/db"
connection_string: str = "postgresql://postgres:rolley34@192.168.1.199/db"
desired_tables = "users,courses,communication_channels,context_modules,conversation_message_participants,conversation_messages,conversation_participants,conversations,course_sections,enrollment_states,enrollment_dates_overrides,enrollment_terms,enrollments,learning_outcome_groups,learning_outcome_question_results,learning_outcomes,pseudonyms,quizzes,scores,submissions,submission_versions,wiki_pages,wikis".split(',')
credentials = Credentials.create(client_id=client_id, client_secret=client_secret)
@ -1130,7 +1129,7 @@ def get_doc_generic(docid, bracket=1, verbose=0):
def process_reg_history():
def process_reg_history(term='fa25'):
from collections import defaultdict
from itertools import groupby
from operator import itemgetter
@ -1182,10 +1181,45 @@ def process_reg_history():
changes[crn].append((dt, f"Waitlist exceeds 10: {n['waitlisted']}."))
return changes
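# Snapshot timestamps in the reg_history CSVs use the pattern "%Y-%m-%dT%H-%M";
# normalize them to ISO 8601 for the structured output.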
def time_to_iso(s):
return datetime.datetime.strptime(s, "%Y-%m-%dT%H-%M").isoformat()
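# Structured counterpart to detect_changes: compare two snapshots keyed by CRN and emit
# event dicts (section added/removed, teacher change, enrollment milestone, waitlist
# over 10), each stamped with an ISO time.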
def detect_changes_structured(prev, curr):
changes = defaultdict(list)
all_crns = prev.keys() | curr.keys()
for crn in all_crns:
o, n = prev.get(crn), curr.get(crn)
if not o:
changes[crn].append({'time':time_to_iso(n['datetime']), "type":'section update', 'message': "Section was added."})
elif not n:
changes[crn].append(
{'time':time_to_iso(o['datetime']), "type":'section update', 'message': "Section was removed.",
'value': o['enrolled'], 'capacity': o['max'], })
else:
dt = time_to_iso(n['datetime'])
if o['teacher'] != n['teacher']:
changes[crn].append({'time':dt, "type":'teacher_change',
'message': f"Teacher changed from {o['teacher']} to {n['teacher']}.",
'old_teacher': o['teacher'], 'new_teacher': n['teacher'], })
if o['enrolled'] != n['enrolled']:
crossed, percent = crossed_threshold(o['enrolled'], n['enrolled'], n['max'])
if crossed:
changes[crn].append({'time':dt, "type":'enrollment_milestone',
'message': f"Enrollment crossed {percent}% ({n['enrolled']}/{n['max']}).",
'percent':percent,'value':n['enrolled'],'capacity':n['max'] })
if int(n['waitlisted']) > 10 and int(o['waitlisted']) < int(n['waitlisted']):
changes[crn].append({'time':dt, "type":'enrollment_milestone',
'message': f"Waitlist exceeds 10: {n['waitlisted']}.",
'value':n['waitlisted']})
return changes
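# Walk the snapshots in time order, diffing consecutive pairs; returns per-CRN changes
# in both the plain (datetime, message) form and the structured-event form, plus the
# latest known course name for each CRN.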
def process_diff_timeline(path):
snapshots = read_grouped_csv(path)
timeline = sorted(snapshots.keys())
timeline_diffs = []
timeline_diffs_structured = []
course_names = {} # crn -> latest known course name
for i in range(1, len(timeline)):
@ -1199,30 +1233,75 @@ def process_reg_history():
delta = detect_changes(prev, curr)
timeline_diffs.append(delta)
delta_structured = detect_changes_structured(prev,curr)
timeline_diffs_structured.append(delta_structured)
# Flatten and group by crn
crn_changes = defaultdict(list)
for delta in timeline_diffs:
for crn, changes in delta.items():
crn_changes[crn].extend(changes)
# Flatten and group by crn
crn_changes_structured = defaultdict(list)
for delta in timeline_diffs_structured:
for crn, changes in delta.items():
crn_changes_structured[crn].extend(changes)
# Sort changes for each CRN by datetime
for crn in crn_changes:
crn_changes[crn].sort(key=lambda x: x[0])
return crn_changes, course_names
# Sort changes for each CRN by datetime
for crn in crn_changes_structured:
crn_changes_structured[crn].sort(key=lambda x: x['time'])
return crn_changes, crn_changes_structured, course_names
output1 = codecs.open('cache/reg_timeline_fa25.txt','w','utf-8')
changes, course_names = process_diff_timeline("cache/reg_history_fa25.csv")
fresh_history = requests.get(f"http://gavilan.cc/schedule/reg_history_{term}.csv").text
fresh_file = codecs.open(f'cache/reg_history_{term}.csv','w','utf-8')
fresh_file.write(fresh_history)
fresh_file.close()
output1 = codecs.open(f'cache/reg_timeline_{term}.txt','w','utf-8')
output2 = codecs.open(f'cache/reg_timeline_{term}.json','w','utf-8')
changes, changes_structured, course_names = process_diff_timeline(f"cache/reg_history_{term}.csv")
# once for plain text
for crn in sorted(changes, key=lambda c: course_names.get(c, "")):
course = course_names.get(crn, "")
course_output = {'code': course, 'crn':crn,'events':[]}
print(f"\n{course} (CRN {crn}):")
output1.write(f"\n{course} (CRN {crn}):\n")
for dt, msg in changes[crn]:
print(f" [{dt}] {msg}")
output1.write(f" [{dt}] {msg}\n")
course_output['events'].append({'message':msg, 'time':time_to_iso(dt)})
def recreate_reg_data():
# again for structured
crn_list = []
for crn in sorted(changes_structured, key=lambda c: course_names.get(c, "")):
course = course_names.get(crn, "")
course_output = {'code': course, 'crn':crn,'events':changes_structured[crn]}
crn_list.append(course_output)
output2.write( json.dumps(crn_list,indent=2) )
output2.close()
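# Rebuild the pivoted reg_data CSV for every archived semester; a failure on one term
# is reported and the loop continues.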
def recreate_all():
for x in 'sp20 su20 fa20 sp21 su21 fa21 sp22 su22 fa22 sp23 su23 fa23 sp24'.split(' '):
try:
recreate_reg_data(x)
except Exception as e:
print(f'Failed on {x} with: {e}')
def recreate_reg_data(term="fa25"):
from collections import defaultdict
from datetime import datetime
@ -1268,20 +1347,30 @@ def recreate_reg_data():
return ['crn'] + headers, table
with open("cache/reg_history_fa25.csv", newline='') as f:
fieldnames = ['datetime', 'crn', 'course', 'teacher', 'max', 'enrolled', 'waitlistmax', 'waitlisted']
reader = csv.DictReader(f, fieldnames=fieldnames)
rows = list(reader)
#with open(f"cache/reg_history_{term}.csv", newline='') as f:
from io import StringIO
url = f"https://gavilan.cc/schedule/reg_history_{term}.csv"
# Download
resp = requests.get(url)
resp.raise_for_status() # raises if bad status
# Wrap the text in a file-like object
f = StringIO(resp.text)
fieldnames = ['datetime', 'crn', 'course', 'teacher', 'max', 'enrolled', 'waitlistmax', 'waitlisted']
reader = csv.DictReader(f, fieldnames=fieldnames)
rows = list(reader)
latest, headers = reduce_latest_per_day(rows)
header_row, table = pivot_table(latest, headers)
with open("cache/reg_data_fa25.csv", "w", newline='') as f:
with open(f"cache/reg_data_{term}.csv", "w", newline='') as f:
writer = csv.writer(f)
writer.writerow(header_row)
writer.writerows(table)
if __name__ == "__main__":
print ('')
@ -1289,7 +1378,8 @@ if __name__ == "__main__":
2: ['Get canvas data 2024 style', canvas_data_2024_run ],
3: ['Set up canvas data 2024 style', setup_canvas_data_2024_run],
4: ['Narrative timeline of section updates', process_reg_history],
5: ['Recreate reg_data from full reg history', recreate_reg_data],
5: ['Create narrative format all semesters', recreate_all],
6: ['Recreate reg_data from full reg history', recreate_reg_data],
}
'''1: ['Re-create schedule csv and json files from raw html',recent_schedules] ,

View File

@ -160,7 +160,7 @@ def dump():
print(json.dumps(sems_by_short_name,indent=2))
GET_FRESH_TERMS = 0
GET_FRESH_TERMS = 1
if (GET_FRESH_TERMS):
from pipelines import url, fetch_collapse

View File

@ -1201,13 +1201,13 @@ def visualize_course_modes_multi_semester():
import plotly.express as px
from plotly.subplots import make_subplots
seasons = {'sp':'30','su':'50','fa':'70'}
semcodes = "sp18 su18 fa18 sp19 su19 fa19 sp20 su20 fa20 sp21 su21 fa21 sp22 su22 fa22 sp23 su23 fa23 sp24 su24 fa24".split(" ")
semcodes = "sp18 su18 fa18 sp19 su19 fa19 sp20 su20 fa20 sp21 su21 fa21 sp22 su22 fa22 sp23 su23 fa23 sp24 su24 fa24 sp25 su25 fa25".split(" ")
sems = { x:'20' + x[2:] + seasons[x[:2]] for x in semcodes }
sem_dfs = []
sem_dfs_depts = []
for s in sems.keys():
print(f"fetching {s}")
sched = requests.get(f"http://gavilan.cc/schedule/{s}_sched_expanded.json").json()
sched = requests.get(f"https://gavilan.cc/schedule/{s}_sched_expanded.json").json()
for crs in sched:
if 'extra' in crs: del crs['extra']
crs['dept'] = crs['code'].split(' ')[0]

View File

@ -1138,7 +1138,7 @@ def print_a_calendar():
print(' '.join([f"{day:<2}" for day in week_days]) + f" {sem_value:<2}")
# Example usage
semesters = [ "su23,06/12,6", "fa23,08/28,16" ]
semesters = [ "fa25,08/25,16", ]
l_semesters = []
for sem in semesters:
column_label, start_date, num_weeks = sem.split(',')
@ -1155,7 +1155,7 @@ def word_calendar():
import datetime
# Define the start date of semester
start_date = datetime.date(2024, 7, 1)
start_date = datetime.date(2025, 8, 25)
# Prepare a list of 18 weeks beginning from the start date
dates = [start_date + datetime.timedelta(weeks=x) for x in range(18)]