# (export-tool file-stats banner removed: "2253 lines / 83 KiB / Python")
import requests,json,os,re, bisect, csv, codecs
|
|
import sortedcontainers as sc
|
|
from collections import defaultdict
|
|
from toolz.itertoolz import groupby
|
|
#from docx.shared import Inches
|
|
#from docx import Document
|
|
#import docx
|
|
from durable.lang import *
|
|
from durable.engine import *
|
|
from pampy import match, _
|
|
from bs4 import BeautifulSoup as bs
|
|
import pandas as pd
|
|
import sys, locale, re
|
|
from pipelines import getSemesterSchedule
|
|
|
|
from canvas_secrets import cq_url, cq_user, cq_pasw
|
|
|
|
|
|
#sys.stdout = codecs.getwriter(locale.getpreferredencoding())(sys.stdout)
|
|
|
|
# Module-wide debug trace file; dbg() and another_request() append to it.
TRACING = codecs.open('cache/progdebug.txt','w','utf-8')
# Default API query string (courses endpoint); fetch_all_programs builds its own locally.
param = "?method=getCourses"
|
|
|
|
|
|
def dbg(x):
    """Append *x* (stringified) as a ' + ...' line to the module trace file, if open."""
    if not TRACING:
        return
    TRACING.write(' + %s\n' % str(x))
|
|
|
|
# Semesters of interest, newest first.
sems = ['sp20','fa19', 'su19','sp19']

# Running output-file counter (the fetch_* functions also keep a local copy).
filen = 1
|
|
def another_request(url,startat):
    """Fetch one page of results from the CurricUNET-style API.

    Appends "&skip=<startat>" to *url*, performs a basic-auth GET (credentials
    from canvas_secrets), mirrors the raw response into the trace file, and
    parses the JSON body.

    Returns (result_set_size, end_result_num, entity_instances).  On any
    parse/response error it returns (0, 0, []) so callers' paging loops
    (which continue while size > 99) terminate.
    """
    global cq_user, cq_pasw, TRACING
    newparam = "&skip=" + str(startat)
    print((url+newparam))
    r = requests.get(url+newparam, auth=(cq_user,cq_pasw))
    try:
        # Mirror the raw response into the debug trace before parsing.
        TRACING.write(r.text + "\n\n")
        TRACING.flush()
        mydata = json.loads(r.text)
    except Exception as e:
        print("Couldn't read that last bit")
        print((r.text))
        print(e)
        return 0,0,[]

    size = mydata['resultSetMetadata']['ResultSetSize']
    endn = mydata['resultSetMetadata']['EndResultNum']
    items = mydata['entityInstances']
    print((' Got ' + str(size) + ' instances, ending at item number ' + str(endn)))
    return size,endn,items
|
|
def fetch_all_classes():
    """Page through the getCourses API and cache each page as JSON.

    Pages are written to cache/courses/classes_<n>.txt.  Paging stops when a
    page returns fewer than 100 items (the API page size), which also covers
    the error case where another_request returns (0, 0, []).

    Fix vs. original: output files are now opened with a context manager so
    they are closed even if json serialization fails.
    """
    global cq_url, param
    size = 100   # sentinel so the loop body runs at least once
    endn = 0     # running offset passed through as the API 'skip' value
    filen = 1
    while size > 99:
        size, endn, items = another_request(cq_url + param, endn)
        with open('cache/courses/classes_' + str(filen) + '.txt', 'w') as out:
            out.write(json.dumps(items, indent=2))
        filen += 1
    print("Written to 'cache/classes....")
|
|
|
|
def fetch_all_programs():
    """Page through the getPrograms API and cache each page as JSON.

    Pages are written to cache/programs/programs_<n>.txt.  Paging stops when
    a page returns fewer than 100 items (the API page size).

    Fix vs. original: output files are now opened with a context manager so
    they are closed even if json serialization fails (consistent with
    fetch_all_classes).
    """
    global cq_url
    size = 100   # sentinel so the loop body runs at least once
    endn = 0     # running offset passed through as the API 'skip' value
    filen = 1
    # Local query string: this endpoint filters to Active programs only.
    param = "?returnFormat=json&method=getPrograms&status=Active"
    while size > 99:
        size, endn, items = another_request(cq_url + param, endn)
        with open('cache/programs/programs_' + str(filen) + '.txt', 'w') as out:
            out.write(json.dumps(items, indent=4))
        filen += 1
    print("Written to 'cache/programs....")
|
|
def sortable_class(li):
    """Build a sortable key "<dept><3-digit-number><suffix>" from a course row.

    li[1] is the department code and li[2] the course number, which may be
    plain digits ("5"), letters-then-digits ("C1000"), or digits-then-letters
    ("12B").  Note: mutates li[2] in place for one known bad record
    ("ASTR 1L").
    """
    print(li)
    # another dumb special case / error
    if li[2] == "ASTR 1L":
        li[2] = "1L"

    department = li[1]
    suffix = ''
    # little error case here
    prefixed = re.match(r'([A-Za-z]+)(\d+)', li[2])
    if prefixed:
        number = int(prefixed.group(2))
    else:
        trailing = re.match(r'(\d+)([A-Za-z]+)$', li[2])
        if trailing:
            number = int(trailing.group(1))
            suffix = trailing.group(2)
        else:
            number = int(li[2])

    # Zero-pad to three digits so lexical order matches numeric order.
    return department + ('%03d' % number) + suffix
|
|
|
|
def c_name(c):
    """Extract a flat course tuple from one CurricUNET course entity.

    Walks the nested rootSections/subsections/fields form data and returns
    (id, dept, num, active, title, min_units, max_units, delivery,
     hybrid_pct, desc, slos).

    NOTE(review): dept/num/title/desc are only bound when the matching form
    fields are present; a record missing any of them would raise NameError
    at the return statement -- presumably all records have them.
    """
    delivery = set()      # distinct delivery methods seen (e.g. 'Online')
    units = []            # [min_units, max_units] as encountered
    slos = []             # student learning outcome descriptions
    hybridPct = ''
    active = 'Active'
    id = c['entityMetadata']['entityId']
    if c['entityMetadata']['status'] != 'Active':
        active = 'Inactive'
        #return ()
    for r in c['entityFormData']['rootSections']:

        # Identity fields: discipline, number, title, description.
        if r['attributes']['sectionName'] == 'Course Description':

            for ss in r['subsections']:
                for f in ss['fields']:

                    if f['attributes']['fieldName'] == 'Course Discipline':
                        dept = f['lookUpDisplay']
                    if f['attributes']['fieldName'] == 'Course Number':
                        num = f['fieldValue']
                    if f['attributes']['fieldName'] == 'Course Title':
                        title = f['fieldValue']
                    if f['attributes']['fieldName'] == 'Course Description':
                        # Flatten newlines so the description is one line.
                        desc = re.sub(r'\n',' ', f['fieldValue'])

        if r['attributes']['sectionName'] == 'Units/Hours/Status':
            for ss in r['subsections']:
                if ss['attributes']['sectionName'] == '':
                    for f in ss['fields']:
                        # Min goes to the front, max to the back; skip dups.
                        if f['attributes']['fieldName'] == 'Minimum Units' and f['fieldValue'] not in units:
                            units.insert(0,f['fieldValue'])
                        if f['attributes']['fieldName'] == 'Maximum Units' and f['fieldValue'] and f['fieldValue'] not in units:
                            units.append(f['fieldValue'])

        # Newer entered courses have this filled out
        if r['attributes']['sectionName'] == 'Distance Education Delivery':
            for ss in r['subsections']:
                if ss['attributes']['sectionName'] == 'Distance Education Delivery':
                    for ssa in ss['subsections']:
                        for f in ssa['fields']:
                            if f['attributes']['fieldName'] == 'Delivery Method':
                                delivery.add(f['lookUpDisplay'])
                if ss['attributes']['sectionName'] == "":
                    if ss['fields'][0]['attributes']['fieldName'] == "If this course is Hybrid, what percent is online?":
                        hybridPct = str(ss['fields'][0]['fieldValue'])

        # Older ones seem to have it this way
        if r['attributes']['sectionName'] == 'Distance Education':
            for ss in r['subsections']:
                for f2 in ss['fields']:
                    if 'fieldName' in f2['attributes'] and f2['attributes']['fieldName'] == 'Methods of Instruction':
                        if f2['fieldValue'] == 'Dist. Ed Internet Delayed':
                            delivery.add('Online')

        # SLO
        if r['attributes']['sectionName'] == 'Student Learning Outcomes':
            for ss in r['subsections']:
                if 'subsections' in ss:
                    if ss['attributes']['sectionName'] == 'Learning Outcomes':
                        for s3 in ss['subsections']:
                            for ff in s3['fields']:
                                if ff['attributes']['fieldName'] == 'Description':
                                    slos.append(ff['fieldValue'])

    # Normalize: always two unit slots, at least one (possibly empty) delivery.
    if len(units)==1: units.append('')
    if len(delivery)==0: delivery.add('')
    u0 = 0
    try:
        u0 = units[0]
    except:
        pass

    u1 = 0
    try:
        # NOTE(review): units[2] looks like an off-by-one.  After the padding
        # above, units holds at most indexes 0 and 1, so this IndexError
        # always fires and u1 (max units) stays 0.  Probably meant units[1];
        # confirm before changing -- downstream cached data reflects the
        # current behavior.
        u1 = units[2]
    except:
        pass

    return id,dept,num,active,title,u0,u1,'/'.join(delivery),hybridPct,desc,slos
|
|
|
|
|
|
def show_classes(createoutput=1):
    """Merge the cached course pages into per-department JSON files.

    Reads cache/courses/classes_*.txt, keeps only the highest-id version of
    each course key, and (when *createoutput* is truthy) writes:
      - cache/courses/index.json  (sorted department list)
      - cache/courses/<dept>.json (course rows per department)
      - cache/courses/slos.json   (SLO lists keyed by course id)
    """
    max_active = {} # hold the id of the class if seen. only include the highest id class in main list.
    used_course = {} # hold the actual course info, the version we'll actually use.
    slo_by_id = {} # values are a list of slos.
    slo_by_id_included = {} # just the ids of active or most recent versions.
    for f in os.listdir('cache/courses'):
        if re.search('classes_',f):
            print(f)
            cls = json.loads(open('cache/courses/'+f,'r').read())
            for c in cls:
                dir_data = list(c_name(c))
                slo_by_id[dir_data[0]] = dir_data[10]
                # Stringify everything except the SLO list (index 10).
                info = list(map(str,dir_data[:10]))
                info.append(dir_data[10])
                course_key = sortable_class(info)
                curqnt_id = int(info[0])
                # Keep only the newest (highest CurricUNET id) version.
                if course_key in max_active:
                    if curqnt_id < max_active[course_key]:
                        continue
                max_active[course_key] = curqnt_id
                used_course[course_key] = info

    if not createoutput: return 1

    # now we have the ideal version of each course
    all = sc.SortedList(key=sortable_class)
    for key,crs in list(used_course.items()): all.add(crs)

    # Group course rows by department (row index 1).
    by_dept = groupby(1,all)

    t = open('cache/courses/index.json','w')
    t.write(json.dumps(sorted(by_dept.keys())))

    u = open('cache/courses/slos.json','w')

    for d in list(by_dept.keys()):
        s = open('cache/courses/' + d.lower() + '.json','w')
        for course in by_dept[d]:
            try:
                # NOTE(review): int(d[0]) takes the first character of the
                # department *name*, which raises ValueError (silently
                # swallowed below), so this append never happens.  The
                # pattern at the bottom of this function suggests
                # int(course[0]) was intended -- confirm before fixing.
                course.append(slo_by_id[int(d[0])])
            except:
                pass
        s.write(json.dumps(by_dept[d], indent=2))
        s.close()
        for c in by_dept[d]:
            ss = slo_by_id[int(c[0])]
            slo_by_id_included[int(c[0])] = ss

    u.write( json.dumps(slo_by_id_included, indent=2))
|
|
|
|
def clean_d_name(d):
    """Normalize a department/program name into a filename-safe slug:
    lowercase, the characters &().:/  removed, whitespace/hyphen runs -> '_'."""
    slug = d.lower()
    slug = re.sub(r'[\&\(\)\.\/\:]','',slug)
    return re.sub(r'[\s\-]+','_',slug)
|
|
|
|
def show_programs():
    """Merge cached program pages into per-department JSON files plus an index.

    Reads cache/programs/programs_*.txt, normalizes each program via
    prog_take_4 (defined elsewhere in this project), then writes
    cache/programs/<dept>.json and cache/programs/index.json, and finally
    runs the stage-2 organizer.
    """
    allprogs = defaultdict(list)
    # Seeded with Liberal Arts, which otherwise only appears as a fallback.
    dept_index = set([('Liberal Arts','liberal_arts'),])
    prog_index = defaultdict(list)
    for f in os.listdir('cache/programs'):
        if re.search('programs_',f):
            print(f)
            pro = json.loads(open('cache/programs/'+f,'r').read())
            for c in pro:
                this_prog = prog_take_4(c)
                # Programs without a department land in Liberal Arts.
                if not 'dept' in this_prog: this_prog['dept'] = 'Liberal Arts'
                if not 'type' in this_prog:
                    this_prog['type'] = ''
                this_prog['key'] = clean_d_name(this_prog['title']+'_'+this_prog['type'])
                dept_index.add( (this_prog['dept'],clean_d_name(this_prog['dept'] )) )
                # Keep the per-dept (title, type, path) list sorted on insert.
                bisect.insort(prog_index[this_prog['dept']], (this_prog['title'], this_prog['type'], clean_d_name(this_prog['dept'])+'/'+clean_d_name(this_prog['title'])+'/'+clean_d_name(this_prog['type'])))
                allprogs[this_prog['dept']].append( this_prog )
    for D,li in list(allprogs.items()):
        dept = clean_d_name(D)
        s = open('cache/programs/'+dept+'.json','w')
        s.write( json.dumps(sorted(li,key=lambda x: x['title']),indent=2) )
        s.close()

    s = open('cache/programs/index.json','w')
    s.write( json.dumps({'departments':sorted(list(dept_index)), 'programs':prog_index}, indent=2) )
    s.close()

    organize_programs_stage2( )
|
|
|
|
def dd():
    """An arbitrarily-nestable defaultdict: any missing key yields another dd()."""
    return defaultdict(dd)
|
|
|
|
def organize_courses():
    """Re-shape the per-dept course list files into one nested dict and cache it.

    Reads every cache/courses/*.json list file (skipping the two index
    files), zips each positional row with its column names, and writes the
    result to cache/courses_org.json as {dept: {num: course_dict}}.
    """
    keys = "id,dept,num,active,title,low_unit,hi_unit,is_online,hybrid_pct,desc,slos".split(",")

    depts = defaultdict(dd)

    for f in os.listdir('cache/courses'):
        if f == 'index.json': continue
        if f == 'slos.json': continue
        u = open('cache/courses/' + f,'r')
        w = json.loads(u.read())
        for A in w:
            # Pair positional row values with their column names.
            course = {}
            i = 0
            for k in keys:
                course[k] = A[i]
                i += 1
            depts[ course['dept'] ][ course['num'] ] = course

            # Debug echo: delivery mode / hybrid pct / title (same line as
            # check_de prints).
            print((A[7], "\t", A[8], "\t",A[4]))
    o = open('cache/courses_org.json','w')
    o.write(json.dumps(depts,indent=2))
|
|
|
|
def check_de():
    """Print delivery mode, hybrid percent and title for every cached course row."""
    skip = ('index.json', 'slos.json')
    for fname in os.listdir('cache/courses'):
        if fname in skip:
            continue
        with open('cache/courses/' + fname, 'r') as fh:
            rows = json.loads(fh.read())
        for row in rows:
            print((row[7], "\t", row[8], "\t", row[4]))
|
|
|
|
def clean_programs():
    """Interactively classify program requirement phrases.

    Reads the known phrase->action table from req_phrases.txt, the degrees
    attained CSV, and cache/programs.json, then walks every program block.
    Any rule phrase whose recorded action is 'q' is shown to the operator
    together with the block's courses so it can be classified by hand.

    Fixes vs. the original: `Set()` (a Python-2 `sets.Set` leftover that
    raised NameError) is now `set()`, and the 2to3 artifact `eval(input())`
    is now plain `input()` -- the answer is only used as a pause, and
    eval'ing operator input was both fragile and unsafe.
    """
    #rewrite_list = 0  # Careful you don't overwrite the existing file!

    # req_phrases.txt lines: "<phrase>|<action code>|<times seen>"
    re_list = open('req_phrases.txt','r').readlines()
    req_what_do = {}
    last_times_seen = {}
    req_times_seen = defaultdict(int)
    for L in re_list:
        L = L.strip()
        parts = L.split('|')
        req_what_do[parts[0]] = parts[1]
        req_times_seen[parts[0]] = 0
        last_times_seen[parts[0]] = parts[2]

    attained = csv.DictReader(open("cache/degrees_attained.csv"))
    att_keys = []
    for row in attained:
        att_keys.append(row['Program'])

    progs = json.loads(open('cache/programs.json','r').read())

    # list of phrases that describe requirements
    reqs = set()

    prog_keys = []
    for k in progs:
        if not 'title' in k or not 'type' in k or not 'dept' in k:
            pass   # malformed program record; skip silently
        else:
            ty = re.sub('Degree','',k['type'])
            ty = re.sub(r'\.','',ty)
            prog_title = k['dept'] + ": " + k['title'] + " " + ty
            prog_keys.append(prog_title)
            for b in k['blocks']:
                rule = ''
                if 'courses' in b and len(b['courses']):
                    # Block-level rule phrase.
                    if 'rule' in b and not b['rule']==' ':
                        reqs.add(b['rule'])
                        rule = b['rule']
                        req_times_seen[rule] += 1
                        if req_what_do[rule] == 'q':
                            print(("\nIn Program: " + prog_title))
                            print(("What does this rule mean? " + rule))
                            print(("(I see it " + last_times_seen[rule] + " times.)"))
                            for C in b['courses']: print((" " + C))
                            z = input()
                    # Embedded "header2" rule phrases inside the course list.
                    for c in b['courses']:
                        if re.search('header2',c):
                            parts = c.split('|')
                            reqs.add(parts[1])
                            rule = parts[1]
                            req_times_seen[rule] += 1

                            if req_what_do[rule] == 'q':
                                print(("\nIn Program: " + prog_title))
                                print(("What does this rule mean? " + rule))
                                print(("(I see it " + last_times_seen[rule] + " times.)"))
                                for C in b['courses']: print((" " + C))
                                z = input()

    # Key for the list
    # q - ask whats up with this rule
    # n1 - single class required
    # n2 - two classes required
    # n3
    # n4
    # u1 - u99 - that many units required (minimum. ignore max)
    # a - all of them
    # x - ignore
    # s - special or more logic needed
    # e - recommended electives
|
|
def course_lil_format(s):
    """Parse "02-125706|THEA12B - Acting II 3.000 *Historical*" into
    (course_code, name_words); the last three space-split tokens (units,
    status, padding) are dropped from the name.  Known limitation: names
    containing ' - ' parse wrong."""
    after_pipe = s.split('|')[1]
    pieces = after_pipe.split(' - ')
    code = pieces[0]
    name_words = pieces[1].split(' ')[:-3]
    return code, name_words ### code, name
|
|
|
|
def header_lil_format(s):
    """Return the header text after the '|' in a line such as
    "04-125802header2|Choose 2 courses from following list:"."""
    return s.split('|')[1]
|
|
|
|
|
|
def organize_programs():
    """Walk every cached program and write a human-readable worksheet.

    Loads the phrase->action table from req_phrases.txt, then for each
    program in cache/programs.json renders its rule blocks and
    header-delimited course "miniblocks" into program_worksheet.txt, and
    prints counts of programs with/without special ('s') logic.
    """
    re_list = open('req_phrases.txt','r').readlines()
    req_what_do = {'':'x'}   # unlabeled rules default to 'x' (ignore)
    last_times_seen = {}
    req_times_seen = defaultdict(int)

    num_programs = 0
    num_w_special_logic = 0
    num_okay = 0

    fout = open('program_worksheet.txt','w')

    # req_phrases.txt lines: "<phrase>|<action code>|<times seen>"
    for L in re_list:
        L = L.strip()
        parts = L.split('|')
        req_what_do[parts[0]] = parts[1]
        req_times_seen[parts[0]] = 0
        last_times_seen[parts[0]] = parts[2]

    progs = json.loads(open('cache/programs.json','r').read())
    prog_keys = []
    output = ''
    for k in progs:
        rule_sequence = []   # action codes seen for this program, in order
        if not 'title' in k or not 'type' in k or not 'dept' in k:
            pass   # malformed record; skip
        else:
            num_programs += 1
            ty = re.sub('Degree','',k['type'])
            ty = re.sub('\.','',ty)
            prog_title = k['dept'] + ": " + k['title'] + " " + ty
            output += "\n" + prog_title + "\n"
            prog_keys.append(prog_title)
            for b in k['blocks']:
                rule = ''
                if 'courses' in b and len(b['courses']):
                    if 'rule' in b and not b['rule']==' ':
                        rule = b['rule']
                        output += " Rule: ("+ req_what_do[rule]+ ") " + b['rule'] + "\n"
                        rule_sequence.append(req_what_do[rule])
                        req_times_seen[rule] += 1
                        if req_what_do[rule] == 'q':
                            # Unknown rule: ask the operator.
                            print(("\nIn Program: " + prog_title))
                            print(("What does this rule mean? " + rule))
                            print(("(I see it " + last_times_seen[rule] + " times.)"))
                            for C in b['courses']: print((" " + C))
                            # SECURITY/BUG(review): 2to3 turned Py2
                            # raw_input() into eval(input()); this evaluates
                            # whatever the operator types and crashes on
                            # plain text.  Should be plain input().
                            z = eval(input())

                    # Split the flat course list into header-delimited
                    # miniblocks (iterating the sorted list in reverse).
                    miniblocks = []
                    this_miniblock = {'courses':[], 'header':''}
                    for c in sorted(b['courses'])[::-1]:
                        if re.search('header2',c):
                            parts = c.split('|')
                            if this_miniblock['courses'] or req_what_do[this_miniblock['header']]!='x':
                                miniblocks.append(this_miniblock)
                                rule_sequence.append(req_what_do[this_miniblock['header']])
                            rule = parts[1]
                            this_miniblock = {'header':rule,'courses':[] }
                            req_times_seen[rule] += 1

                            if req_what_do[rule] == 'q':
                                print(("\nIn Program: " + prog_title))
                                print(("What does this rule mean? " + rule))
                                print(("(I see it " + last_times_seen[rule] + " times.)"))
                                for C in b['courses']: print((" " + C))
                                # SECURITY/BUG(review): same eval-of-input
                                # artifact as above.
                                z = eval(input())
                        else:
                            code,name = course_lil_format(c)

                            this_miniblock['courses'].append(code)
                    # Emit the headerless leading group, if any.
                    if not this_miniblock['header']:
                        output += " "
                        for ccc in this_miniblock['courses']:
                            output += ccc + " "
                        output += "\n"
                    # final course, final mb append
                    if this_miniblock['courses']:
                        miniblocks.append(this_miniblock)
                        rule_sequence.append(req_what_do[this_miniblock['header']])

                    if miniblocks:
                        for m in miniblocks:
                            if m['header']:
                                output += " Miniblock rule: ("+ req_what_do[rule] + ") " + m['header'] + "\n"
                            output += " "
                            for c in m['courses']:
                                output += c + " "
                            output += "\n"
            if 's' in rule_sequence:
                num_w_special_logic += 1
            else:
                num_okay += 1
            output += " Summary: [" + " ".join(rule_sequence) + " ]" + "\n"

    fout.write(output)
    print(("Number of programs: " + str(num_programs)))
    print(("Number without special logic: " + str(num_okay)))
    print(("Number with special logic: " + str(num_w_special_logic)))
    # Key for the list
    # q - ask whats up with this rule
    # n1 - single class required
    # n2 - two classes required
    # n3
    # n4
    # u1 - u99 - that many units required (minimum. ignore max)
    # a - all of them
    # x - ignore
    # s - special or more logic needed
    # e - recommended electives
|
|
|
|
def divide_courses_list(li, rwd, online):
    """Split a sorted course/header line list into groups, one per header.

    Headers become "<action-code>: <header text>" via the rwd lookup;
    courses become their bare code, suffixed with their online marker when
    online[code] is truthy.  Entries whose first character is 'x' (the
    'ignore' action code) are dropped.  Returns a list of lists.
    """
    groups = []
    current = []

    for entry in sorted(li):
        if re.search('header2', entry):
            # A header starts a new group; flush the previous one first.
            if current:
                groups.append(current)
            current = []
            text = header_lil_format(entry)
            item = rwd[text] + ": " + text
        else:
            code, _ = course_lil_format(entry)
            item = code + " " + online[code] if online[code] else code
        if item[0] != 'x':
            current.append(item)
    groups.append(current)
    return groups
|
|
|
|
def organize_programs2():
    """Render each program's non-ignored rule blocks into a worksheet file.

    Also dumps a {course_code: delivery_mode} map to
    cache/classes_online.json for later passes, and prints summary counts.

    Cleanup vs. the original: a ~90-line Python-2 dead-code blob that was
    parked inside a triple-quoted string has been deleted (it was a stale
    copy of organize_programs), the '\\.' pattern is now a raw string (same
    regex, no invalid-escape warning), and unused locals were removed.
    Note: num_okay / num_w_special_logic are never incremented on this
    code path, so those two summary lines always print 0.
    """
    re_list = open('req_phrases.txt','r').readlines()
    classes = json.loads(open('cache/courses_org.json','r').read())
    classes_bycode = {}

    # Index delivery mode both as "DEPT NUM" and "DEPTNUM".
    for d in list(classes.keys()):
        for c in list(classes[d].keys()):
            classes_bycode[d+" "+c] = classes[d][c]['is_online']
            classes_bycode[d+c] = classes[d][c]['is_online']

    req_what_do = {'':'x', ' ':'x'}   # blank rules default to 'x' (ignore)
    last_times_seen = {}
    req_times_seen = defaultdict(int)

    num_programs = 0
    num_w_special_logic = 0
    num_okay = 0

    fout = open('cache/program_worksheet.txt','w')

    cout = open('cache/classes_online.json','w')
    cout.write(json.dumps(classes_bycode))
    cout.close()

    # req_phrases.txt lines: "<phrase>|<action code>|<times seen>"
    for L in re_list:
        L = L.strip()
        parts = L.split('|')
        req_what_do[parts[0]] = parts[1]
        req_times_seen[parts[0]] = 0
        last_times_seen[parts[0]] = parts[2]

    progs = json.loads(open('cache/programs.json','r').read())
    output = ''
    for k in progs:
        if not 'title' in k or not 'type' in k or not 'dept' in k:
            pass   # malformed record; skip
        else:
            num_programs += 1
            ty = re.sub('Degree','',k['type'])
            ty = re.sub(r'\.','',ty)
            prog_title = k['dept'] + ": " + k['title'] + " " + ty
            output += "\n" + prog_title + "\n"
            for b in sorted(k['blocks'], key=lambda x: x['order'] ):
                if 'courses' in b and len(b['courses']) and 'rule' in b and req_what_do[b['rule']]!='x':
                    output += " "+req_what_do[b['rule']]+": " + b['rule'] + "\n"
                    output += json.dumps(divide_courses_list(b['courses'],req_what_do,classes_bycode),indent=2) + "\n"

    fout.write(output)
    print(("Number of programs: " + str(num_programs)))
    print(("Number without special logic: " + str(num_okay)))
    print(("Number with special logic: " + str(num_w_special_logic)))
    # Key for the action codes:
    # q - ask whats up with this rule
    # n1 - n4 - that many classes required
    # u1 - u99 - that many units required (minimum. ignore max)
    # a - all of them
    # x - ignore
    # s - special or more logic needed
    # e - recommended electives
|
|
# sorting by order key of dict
def cmp_2(a):
    """Sort key: a block dict's 'order' value."""
    return a['order']
|
|
def cmp_order(a, b):
    """Old-style cmp for block dicts: -1/0/1 by their 'order' values.
    (Not usable directly as a Python-3 sort key; kept for compatibility.)"""
    left, right = a['order'], b['order']
    if left > right:
        return 1
    if left < right:
        return -1
    if left == right:
        return 0
|
|
|
|
# decipher the grouped up courses line
def split_course(st):
    """Parse a course line like
    "01-127153|SOC1A - Introduction to Sociology 3.000 *Active*"
    into a dict with cn_code/code/name/units/status/or keys.

    Header lines (containing 'header2') return just their text; anything
    unparseable prints a note and returns 0.
    """
    if 'header2' in st:
        return st.split("|")[1]

    parts = re.search( r'^(.*)\|(.+?)\s-\s(.+?)\s([\d|\.|\s|\-]+)\s+(\*.+\*)([\s\|\sOR]*)$', st)
    if not parts:
        print("*** Didn't match that class")
        return 0

    course_name = parts.group(3)
    raw_units = parts.group(4)
    # "3.000" -> "3", ".500" -> ".5"
    raw_units = re.sub( r'(\d)\.000', r'\1', raw_units)
    raw_units = re.sub( r'\.500', r'.5', raw_units)

    if raw_units == '1500 3 ':
        raw_units = 3
        course_name += " 1500" # hack for HIST 4
    return {'cn_code': parts.group(1), 'code': parts.group(2), 'name': course_name,
            'units': raw_units, 'status': parts.group(5), 'or': parts.group(6)}
|
|
|
|
# Any number gets an X (checked). Blank or zero gets no check.
def units_to_x(u):
    """Render truthy *u* as a checked cell 'X', falsy as a blank cell ' '."""
    return 'X' if u else ' '
|
|
|
|
def p_block_rule(r,printme,doc,out=0):
    """Emit a block-rule row into the current python-docx table (and the
    optional tab-separated text stream *out*).

    When *printme* is truthy this writes the 5-column header row (rule,
    Units, and the three semesters), creating the table if none exists yet;
    otherwise it appends rows to the last table, with the rule text in the
    first cell of the final row.
    """
    if printme:
        if out: out.write("\t".join([r,'Units','Spring 19','Summer 19','Fall 19']) + "\n")
        if not len(doc.tables):
            t = doc.add_table(1, 5, style='Table Grid')
        else:
            t = doc.tables[-1]
        t.rows[0].cells[0].text = r
        t.rows[0].cells[1].text = 'Units'
        t.rows[0].cells[2].text = 'Spring 19'
        t.rows[0].cells[3].text = 'Summer 19'
        t.rows[0].cells[4].text = 'Fall 19'
    else:
        if out: out.write("\t" + r + "\n")
        # Two add_row calls: the first appears to be a deliberate blank
        # spacer row before the rule row -- confirm before "fixing".
        t = doc.tables[-1].add_row()
        t = doc.tables[-1].add_row()
        t.cells[0].text = r
|
|
|
|
def p_cert_header(type,doc,r='',out=0):
    """Start a new degree/cert section: a level-2 heading plus a fresh
    5-column table (and a "DEGREE:" line in the text stream *out*).

    NOTE(review): `Inches` is undefined at runtime -- the
    `from docx.shared import Inches` import at the top of this file is
    commented out, so the cells[0].width line raises NameError when this
    function runs.  Re-enable that import before using this.
    """
    if out: out.write("DEGREE: " + type + " (" + r + ")" + "\n")
    if r: doc.add_heading(type + " (" + r + ")", 2)
    else: doc.add_heading(type , 2)
    t = doc.add_table(1, 5, style='Table Grid')
    t.rows[0].cells[0].width = Inches(3.0)
|
|
#print(type)
|
|
|
|
|
|
def p_block_header(r, doc, out=0):
    """Append a blank spacer row, then a row whose first cell holds the
    block header text; mirror the header to *out* when given."""
    table = doc.tables[-1]
    table.add_row()            # spacer row
    row = table.add_row()
    row.cells[0].text = r
    if out:
        out.write("\t"+r+"\n" )
|
|
|
|
def p_cert_course_missing(cd, doc, out=0):
    """Append a row for a course with no section history: code+name and
    units only, semester cells left blank."""
    label = cd['code'] + " - " + cd['name']
    if out:
        out.write(label + "\t" + cd['units'] + "\n")
    row = doc.tables[-1].add_row()
    row.cells[0].text = label
    row.cells[1].text = cd['units']
|
|
|
|
|
|
def p_cert_course(cd, history, doc, out=0):
    """Append a row for one course: code+name (with a trailing ' OR ' when
    the parsed 'or' flag is set), units, and an X per semester it ran."""
    label = cd['code'] + " - " + cd['name']
    sp, su, fa = (units_to_x(history[s]) for s in ('sp19', 'su19', 'fa19'))

    if out:
        line = "\t" + sp + "\t" + su + "\t" + fa
        out.write(label + "\t" + cd['units'] + line + "\n")

    row = doc.tables[-1].add_row()
    row.cells[0].text = label
    if cd['or']:
        row.cells[0].text += " OR "

    row.cells[1].text = str(cd['units'])
    row.cells[2].text = sp
    row.cells[3].text = su
    row.cells[4].text = fa
|
|
|
|
|
|
def p_end_block(out=0):
    """Blank separator line in the text output (no-op without *out*)."""
    if not out:
        return
    out.write("\n")
|
|
|
|
def p_end_cert(bigdoc, out=0):
    """Close out one cert: three blank lines in the text report (if any)
    and a page break in the Word document."""
    if out:
        out.write("\n" * 3)
    bigdoc.add_page_break()
|
|
|
|
|
|
def ask_for_rule(r):
    """Interactively classify an unknown rule phrase.

    Prints the cheat-sheet of action codes, prompts the operator, appends
    "<rule>|<answer>|1" to cache/req_phrases.txt, and returns the answer.
    """
    print(("Can't find this rule: " + r))
    print("""Possible answers:
# q - ask whats up with this rule # u1 - u99 - that many units required (minimum. ignore max)
# n1 - single class required a - all of them
# n2 - two classes required x - ignore
# n3 s - special or more logic needed
# n4 e - recommended electives""")
    answer = input("What should it be? ").strip()
    f= open("cache/req_phrases.txt","a+",encoding="utf-8")
    f.write("\n" + r + "|" + answer + "|1")
    f.close()
    return answer
|
|
|
|
def action_to_english(a):
    """Translate a rule action code into display text.

    Fixed codes map to fixed strings; 'uN' / 'nN' expand with their number.
    'x' (ignore) and anything unrecognized return 0.
    """
    fixed = {
        'x': 0,
        'e': 'Electives',
        's': 'More logic needed / Special rule',
        'a': "Required - Complete ALL of the following courses:",
    }
    if a in fixed:
        return fixed[a]
    m = re.search(r'^([a-z])([\d\.]+)$',a)
    if m:
        letter, number = m.group(1), m.group(2)
        if letter == 'u':
            return "Choose %s units from the following courses: " % number
        if letter == 'n':
            return "Choose %s courses from the following: " % number
    return 0
|
|
|
|
|
|
# block = { rule, num } and courses is a DataFrame
# Return True if the courses satisfy the rule
def check_a_block(b, courses, verbose=False):
    """Check whether the course rows in b['courses'] satisfy b['rule'].

    Row layout (positional): C[2] = units, C[3] = was-online flag.  The
    *courses* parameter is unused here -- the rows travel inside *b*.
    Unknown rules print a note and return True.
    """
    indent = "  "
    if verbose:
        print((indent+"Trying the rule: " + b['englrule']))

    rule = b['rule']
    if rule == 'all':
        # Every row must have its flag set.
        for row in b['courses']:
            if verbose: print(row)
            if not row[3]:
                if verbose: print((indent+"Failed."))
                return False
        return True
    elif rule == 'min_units':
        needed = float(b['num'])
        total = 0.0
        for row in b['courses']:
            if row[3]:
                total += row[2]
        return total >= needed
    elif rule == 'min_courses':
        needed = float(b['num'])
        hits = 0
        for row in b['courses']:
            if row[3]:
                hits += 1
        if not hits >= needed:
            if verbose: print((indent+"Failed."))
        return hits >= needed
    if rule in [ 'elective', 'special' ]: return 1

    print("I didn't understand the rule")
    return True
|
|
|
|
def read_block_english_to_code():
    """Load cache/req_phrases.txt ("phrase|action|count" lines) into a
    {phrase: [action_code, times_seen]} dict."""
    table = {}
    with open('cache/req_phrases.txt','r',encoding='utf-8') as fh:
        for raw in fh:
            fields = raw.strip().split('|')
            table[fields[0]] = [fields[1], fields[2]]
    return table
|
|
|
|
|
|
def read_section_online_history():
    """Load the one-year per-course online-section counts and add a
    'was_online' 0/1 column (1 when any of sp19/su19/fa19 is truthy).

    Returns a DataFrame indexed (and sorted) by course code ('Unnamed: 0').

    NOTE(review): Python truthiness treats NaN as True, so a course with a
    missing semester value would count as online -- confirm the CSV has no
    missing values before relying on this.
    """
    sections = pd.read_csv('cache/one_year_course_modes.csv') # todo: this file depends on other fxns. which?
    sections.set_index('Unnamed: 0',inplace=True)
    sections.sort_values('Unnamed: 0', inplace=True)
    for i, course in sections.iterrows():
        if course['sp19'] or course['su19'] or course['fa19']:
            sections.loc[i,'was_online'] = 1
        else:
            sections.loc[i,'was_online'] = 0
    return sections
|
|
|
|
|
|
# Use an easy data structure (dataframes and dicts) and functions that operate on them.
# This is the 3rd attempt.
def simple_find_online_programs():
    """First pass of the 'which programs can be completed online' analysis.

    Step 1 gathers inputs: the per-course online-section history, the
    rule-phrase -> action-code table, and the department list from
    cache/programs/index.json.

    Step 2 walks every cached program, parses each rule block's course
    lines into dicts (tagging each with was_online from the section
    history), and prints the resulting per-block DataFrame.  The
    cert-level conclusion and output steps (Step 3) are still TODO.
    """
    # 1. Gathering data
    section_history = read_section_online_history() # a dataframe indexed by course codename.
    blockrules = read_block_english_to_code()
    alldepts = [x[1] for x in json.loads( open('cache/programs/index.json','r').read() )['departments']]

    # todo: courses with a HYPHEN in NAME get parsed wrong.

    # 2. First pass: Process blocks, certs.
    for prog in alldepts:
        fname = 'cache/programs/'+prog+'.json'
        print(("Reading %s" % fname))
        inp = open(fname,'r')
        filedata = inp.read()
        p_info = json.loads(filedata)
        for p in p_info:
            print((" "+p['dept'] + "\t" + p['type'] + "\t" + p['title']))
            b = p['blocks']
            # Order blocks by their 'order' field before processing.
            b.sort(key=cmp_2)
            for block in b:
                if 'rule' in block:
                    ### RIGHT HERE - fxn to extract block to DF
                    print((" " + block['rule']))
                    for_df = []
                    for crs in block['courses']:
                        c_data = split_course(crs)
                        if type(c_data) is dict:
                            # Strip spaces from the code to match the
                            # section-history index format.
                            c_data['code'] = re.sub(r'\s','',c_data['code'])
                            try:
                                c_data['was_online'] = section_history.loc[ c_data['code'] , 'was_online' ]
                            except KeyError:
                                # No section history for this course; assume
                                # it was not offered online.
                                c_data['was_online'] = 0
                            for_df.append(c_data)
                        else:
                            # Header text (str) or unparseable line (0).
                            print((" ", c_data))
                    if len(for_df):
                        this_df = pd.DataFrame(for_df)
                        print(this_df)
                        #input("\n\nPress enter to continue...\n\n")
|
|
|
|
|
|
|
|
def check_a_block_a(b, verbose=False):
    """Check whether the course rows in b['courses'] satisfy b['rule'].

    Row layout (positional): C[2] = units, C[3] = was-online flag.
    Unknown rules print a note and return True.  (Near-duplicate of
    check_a_block, minus its unused *courses* parameter.)
    """
    indent = "  "
    if verbose:
        print((indent+"Trying the rule: " + b['englrule']))

    rows = b['courses']
    rule = b['rule']
    if rule == 'all':
        for C in rows:
            if verbose: print(C)
            if not C[3]:
                if verbose: print((indent+"Failed."))
                return False
        return True
    elif rule == 'min_units':
        total = 0.0
        for C in rows:
            if C[3]:
                total += C[2]
        return total >= float(b['num'])
    elif rule == 'min_courses':
        hits = 0
        for C in rows:
            if C[3]:
                hits += 1
        ok = hits >= float(b['num'])
        if not ok:
            if verbose: print((indent+"Failed."))
        return ok
    if rule in [ 'elective', 'special' ]: return 1

    print("I didn't understand the rule")
    return True
|
|
|
|
def smart_find_online_programs():
    """Work out which programs can be completed fully online, using the
    durable-rules engine, and render the qualifying Certificates of
    Achievement into a single Word document.

    Pipeline:
      1. Declare rules on the 'curriculum' ruleset.  The rule callbacks
         append rows into ``big_block_list`` as facts combine.
      2. Assert facts: per-course online history (from
         cache/one_year_course_modes.csv), per-block rules (from
         cache/req_phrases.txt + the cached program JSON), and
         block-contains-course relationships.
      3. Reshape ``big_block_list`` into ``degs_main`` (degree -> blocks
         -> courses), then filter with check_a_block_a() and print the
         survivors via the p_cert_* helpers.

    Depends on module-level helpers defined elsewhere in this file:
    cmp_2, ask_for_rule, action_to_english, split_course, p_cert_header,
    p_block_rule, p_block_header, p_cert_course, p_end_cert.

    NOTE(review): ``Document`` comes from python-docx, whose import is
    commented out at the top of this file — confirm it is imported
    before running this.
    """

    # Rows appended by the rule callbacks below; reshaped into degs_main later.
    big_block_list = []

    with ruleset('curriculum'):

        # COURSES in BLOCKS
        @when_all( (m.relationship == 'contains') & (+m.course) )
        def show_contained_class(c):
            #print( str(c.m.block) + " is a block that contains " \
            #  + str(c.m.course) + " with " + str(c.m.units) + " units" )
            pass

        # BLOCK Rule/Condition with and without numbers
        @when_all( (+m.blockrule) & (+m.number) )
        def show_block(c):
            #print( str(c.m.block) + " is a block that needs " + str(c.m.blockrule) + " of " + c.m.number )
            big_block_list.append( [ c.m.block, "rule", c.m.blockrule, c.m.number, c.m.englrule ] )

        # Same name re-used deliberately: the decorator registers each handler,
        # so redefining `show_block` here is harmless.
        @when_all( (+m.blockrule) & (-m.number) )
        def show_block(c):
            #print( str(c.m.block) + " is a block that needs " + str(c.m.blockrule) )
            print(("++RULE: " + str(c.m)))
            big_block_list.append( [ c.m.block, "rule", c.m.blockrule, 0, c.m.englrule ] )

        # Has course historically been OFFERED ONLINE
        # NOTE(review): Python `or` on rule expressions short-circuits to the
        # first operand's truthiness — durable-rules normally composes
        # conditions with `|`/`&`.  This may effectively test only m.sem1;
        # confirm against the durable-rules docs.
        @when_all(m.sem1>0 or m.sem2>0 or m.sem3>0)
        def is_online(c):
            #print("Class counts as online: " + str(c.m.course))
            c.assert_fact('curriculum', { 'course': c.m.course, 'status': 'was_offered_online', 'value': True })

        # Or NEVER ONLINE
        # NOTE(review): same `and` concern as above — confirm.
        @when_all(m.sem1==0 and m.sem2==0 and m.sem3==0)
        def is_online(c):
            #print("Class was never online: " + str(c.m.course))
            c.assert_fact('curriculum', { 'course': c.m.course, 'status': 'was_offered_online', 'value': False })

        # Has course in the block OFFERED ONLINE?
        @when_all( c.zero << +m.blockrule,
            c.first << (m.relationship == 'contains') & (m.block==c.zero.block),
            c.second << (m.course == c.first.course ) & (m.status == 'was_offered_online') & (m.value==True) )
        def is_online_inblock(c):
            #print("  and it was online! " + c.first.block + " / " + c.second.course)
            #print(c.first.block + "\t" + c.first.course['code'] + "\t Yes online")
            print(" Yes online")
            big_block_list.append( [ c.first.block, c.first.course, c.first.units, True, c.first ] )

        # Has course in the block *NOT OFFERED ONLINE?
        @when_all( c.three << +m.blockrule,
            c.four << (m.relationship == 'contains') & (m.block==c.three.block),
            c.five << (m.course == c.four.course ) & (m.status == 'was_offered_online') & (m.value==False) )
        def is_online_inblock(c):
            #print("  and it was online! " + c.four.block + " / " + c.five.course)
            #print(c.first.block + "\t" + c.first.course['code'] + "\t NOT online")
            print(" NOT online")
            big_block_list.append( [ c.four.block, c.four.course, c.four.units, False, c.four ] )

    # Online-offering history per course code, indexed by code.
    sections = pd.read_csv('cache/one_year_course_modes.csv')
    sections.set_index('Unnamed: 0',inplace=True)
    sections.sort_values('Unnamed: 0', inplace=True)
    alldepts = [x[1] for x in json.loads( open('cache/programs/index.json','r').read() )['departments']]

    #history = sections.df.to_dict('index')

    print('starting...')
    # Assert one fact per course with its per-semester online section counts.
    for i, course in sections.iterrows():
        try:
            assert_fact('curriculum', { 'course': str(i), 'sem1': int(course['sp19']), 'sem2': int(course['su19']), 'sem3':int(course['fa19']) })
        except Exception as e:
            # Duplicate/ill-formed facts are silently skipped.
            pass

    # Map raw requirement phrase -> [action code, seen flag].
    blockrules = {}
    for L in open('cache/req_phrases.txt','r',encoding='utf-8').readlines():
        parts = L.strip().split('|')
        blockrules[ parts[0] ] = [ parts[1], parts[2] ]

    blockindex = 0

    for prog in alldepts:
        p_info = json.loads(open('cache/programs/'+prog+'.json','r').read())
        for p in p_info:
            deg_longname = p['dept'] + ' - ' + p['type'] + ' - ' + p['title']
            print(deg_longname)
            # A bare [longname] row marks the start of a new degree in the list.
            big_block_list.append( [ deg_longname ] )

            for block in sorted(p['blocks'],key=cmp_2):
                if not 'rule' in block: continue

                # Look up code for what is needed with this block of classes.
                the_rule = block['rule'].strip()
                if not the_rule in blockrules:
                    # Unknown phrase: ask interactively and remember the answer.
                    blockrules[ the_rule ] = [ ask_for_rule( the_rule ), 1 ]

                action = blockrules[ the_rule][0]
                engl = action_to_english(action)
                if not engl: continue
                print((" + " + engl))

                blockindex += 1
                blocklabel = 'block_' + str(blockindex)

                # Assert if the courses make the block qualify
                #print(action)
                # needs to be a rule too....... # Required - Complete ALL of the following courses:

                #print("\n\n")
                try:
                    # Action codes: a=all, x=ignore, e=elective, s=special,
                    # u<n>=min units, n<n>=min courses.
                    # NOTE(review): this local `match` shadows pampy's `match`
                    # imported at the top of the file.
                    match = re.search(r'^([a-z])([\d\.]+)$',action)
                    if action == 'a':
                        assert_fact('curriculum', { 'block':blocklabel, 'degree': deg_longname, 'blockrule': 'all', 'englrule':engl})
                    elif action == 'x':
                        pass
                    elif action == 'e':
                        assert_fact('curriculum', { 'block':blocklabel, 'degree': deg_longname, 'blockrule': 'elective', 'englrule':engl})
                    elif action == 's':
                        assert_fact('curriculum', { 'block':blocklabel, 'degree': deg_longname, 'blockrule': 'special', 'englrule':engl})
                    elif match and match.group(1) == 'u':
                        assert_fact('curriculum', { 'block':blocklabel, 'degree': deg_longname, 'blockrule': 'min_units', 'number': match.group(2), 'englrule':engl })
                    elif match and match.group(1) == 'n':
                        assert_fact('curriculum', { 'block':blocklabel, 'degree': deg_longname, 'blockrule': 'min_courses', 'number': match.group(2), 'englrule':engl })
                except MessageNotHandledException as e:
                    pass
                    #print(e)

                for crs in block['courses']:
                    # 'header2' lines are display headers inside a block, not courses.
                    if re.search(r'header2',crs):
                        descr = crs.split("|")[1]
                        big_block_list.append( [ 'header', descr ] )
                        continue
                    c_data = split_course(crs)
                    #c_data['code'] = re.sub(r'\s','',c_data['code'])

                    try:
                        if 'code' in c_data and c_data['code']:
                            # Normalize "CSIS 10" -> "CSIS10" to match the sections index.
                            fixed_code = re.sub(r'\s','',c_data['code'])
                            history = sections.loc[fixed_code]
                        else:
                            msg = "BAD COURSE DATA: " + str(crs)
                            data = {'code':'?','name':'?','units':'?'}
                            continue
                    except Exception as e:
                        # No online-history row for this code: skip the course.
                        msg = "COULDNT FIND ONLINE DATA for " + c_data['code'] + " - " + c_data['name']
                        continue
                    #p_cert_course(c_data,history,output,doc)

                    # Handle the class
                    #print("\t" + str(c_data))
                    try:
                        print((" Asserting " + blocklabel + "\t" + json.dumps({ 'block':blocklabel, 'course': fixed_code,
                            'relationship': 'contains', 'units':float(c_data['units']),
                            'code': fixed_code, 'name': c_data['name'],
                            'status': c_data['status'], 'or': c_data['or'] })))
                        assert_fact('curriculum', { 'block':blocklabel, 'course': fixed_code,
                            'relationship': 'contains', 'units':float(c_data['units']),
                            'code': fixed_code, 'name': c_data['name'],
                            'status': c_data['status'], 'or': c_data['or'] })
                    except Exception as e:
                        pass
                        #print(e)
                # END block of courses

            #print("Finished reading "+deg_longname)
            # END cert or degree
    # NOTE(review): eval() of raw user input is dangerous and crashes on an
    # empty line — presumably a debugging pause; confirm and replace with
    # a plain input().
    eval(input('hit return...'))

    # Big Structure of all degrees
    degs_main = {}
    this_deg = ''
    for R in big_block_list:
        if R[0] == 'header': # its a funny header, not quite a rule....
            #print(R)
            degs_main[this_deg]['blocks'].append( {'rule':'', 'englrule':'', 'courses':[], 'header':R[1] } )
        elif not R[0].startswith('block'): # everything starts with block except new degrees
            degs_main[R[0]] = { 'deg':R[0], 'blocks':[] }
            this_deg = R[0]
            #print(this_deg)
        elif R[1] == 'rule':
            degs_main[this_deg]['blocks'].append( {'rule':R[2], 'englrule':R[4], 'courses':[], 'header':'' } )
            #print("  "+R[4])
            if len(R) > 3:
                degs_main[this_deg]['blocks'][-1]['num'] = R[3]
        else:
            # A course row: attach to the most recent block of the current degree.
            degs_main[this_deg]['blocks'][-1]['courses'].append(R)
            #print("    "+str(R))

    # Print them
    bigdoc = Document()
    for k,v in list(degs_main.items()):

        print((v['deg']))
        qualifies = True
        if not re.search(r'chievement',v['deg']):
            qualifies = False ## JUST DOING CAs
            print(" Skipping because not a CA")
        if not qualifies: continue
        # Every block must pass its rule for the program to qualify.
        for vv in v['blocks']:
            for CC in vv['courses']:
                print((" " + "\t".join([ CC[0], CC[1], str(CC[3]), CC[4]['name']])))
            if not check_a_block_a(vv,1):
                qualifies = False
                break
        if not qualifies: continue

        print(" + OK, including this one.")

        bigdoc.add_heading('Gavilan College', 2)
        #bigdoc.add_heading(v['deg'], 2)
        p_cert_header(v['deg'],bigdoc)
        print_headers = 1

        for vv in v['blocks']:
            p_block_rule(vv['englrule'],print_headers,bigdoc)
            print_headers = 0

            more = ''
            if 'num' in vv: more = ' / ' + str( vv['num'] )

            #print( " " + vv['rule'] + more )
            if vv['header']:
                p_block_header(vv['header'],bigdoc)
                #print(" ("+vv['header']+")")
            for vvv in vv['courses']:
                #print(vvv[4])
                #print(vvv)
                #print(" " + json.dumps(vvv))
                p_cert_course(vvv[4], sections.loc[ vvv[1] ],bigdoc)
        p_end_cert(bigdoc)
    bigdoc.save('output/onlinecerts/all_cert_achievement.docx')
|
|
|
|
|
|
|
|
|
|
# 9/2021 clean programs to good json
|
|
def organize_programs_stage2():
    """Flatten every cached department program file into one clean JSON.

    Reads cache/programs/<dept>.json for every department listed in
    cache/programs/index.json, normalizes each program into
    {title, dept, desc?, type?, groups: [{header, courses: [[code, name,
    units], ...]}]}, and writes the combined list to cache/deg_certs.json.

    Depends on module-level helpers cmp_2 (block sort key) and
    split_course (parses one course line into a dict or a string).

    NOTE(review): the output handle is never closed/flushed explicitly —
    relies on interpreter exit; confirm that is acceptable.
    """
    alldepts = [x[1] for x in json.loads( open('cache/programs/index.json','r').read() )['departments']]
    output = codecs.open('cache/deg_certs.json','w','utf-8')

    all_progs = []

    for prog in alldepts:
        fname = 'cache/programs/'+prog+'.json'
        print(("Reading %s" % fname))
        filedata = open(fname,'r').read()
        p_info = json.loads(filedata)

        for p in p_info:
            pretty_p = {}
            print(p['dept'] + "\t" + p['type'] + "\t" + p['title'])
            pretty_p['title'] = p['title']
            pretty_p['dept'] = p['dept']
            # desc/type are optional in the source data.
            if 'desc' in p: pretty_p['desc'] = p['desc']
            if 'type' in p: pretty_p['type'] = p['type']
            print(" - %s\n - %s\n" % (p['dept'],p['title']))
            pretty_p['groups'] = []

            b = p['blocks']
            b.sort(key=cmp_2)
            for block in b:
                this_block = {'courses':[],'header':""}
                if 'rule' in block:
                    #print("\t"+block['order'] + "\t" + block['rule'])
                    #p_block_rule(block['rule'],output,print_headers,doc)
                    # The block's rule text doubles as the group header.
                    this_block['header'] = block['rule']

                for crs in sorted(block['courses']):
                    # 'header2' lines start a new sub-group within the block.
                    if re.search(r'header2',crs):
                        if len(this_block['courses']):
                            # Flush the group accumulated so far before starting a new one.
                            pretty_p['groups'].append(this_block)
                            this_block = {'courses':[],'header':""}
                        parts = crs.split("|")
                        #print(parts)
                        this_block['header'] = parts[1]
                        continue
                    c_data = split_course(crs)
                    # split_course returns a dict on success, a string on failure.
                    if type({})==type(c_data) and 'code' in c_data:
                        code = c_data['code']
                        # Mark "or" alternatives directly in the code string.
                        if type({})==type(c_data) and 'or' in c_data and c_data['or']: code += " or"

                        if c_data:
                            this_block['courses'].append( [ code,c_data['name'],c_data['units'] ])
                    # a string or a dict
                    # {'cn_code':parts.group(1), 'code':parts.group(2), 'name':parts.group(3),
                    #   'units':parts.group(4), 'status':parts.group(5), 'or':parts.group(6) }
                pretty_p['groups'].append(this_block)
            all_progs.append(pretty_p)
    output.write(json.dumps( all_progs,indent=2))
|
|
|
|
|
|
# of all the programs, what can be accomplished online?
|
|
def find_online_programs():
    """Of all the cached programs, report what can be accomplished online.

    Walks every department's program JSON, cross-references each course
    against the online-offering history (cache/one_year_course_modes.csv),
    writes a per-department text report, and collects every Certificate of
    Achievement into one Word document (output/onlinecerts/all_ca.docx).

    Depends on module-level helpers cmp_2, split_course, p_cert_header,
    p_block_rule, p_block_header, p_cert_course, p_cert_course_missing,
    p_end_cert (defined elsewhere in this file).

    NOTE(review): ``Document`` requires python-docx, whose import is
    commented out at the top of this file — confirm before running.
    """
    #sections = summarize_online_sections()
    # Per-course online section counts, indexed by whitespace-free course code.
    sections = pd.read_csv('cache/one_year_course_modes.csv')
    sections.set_index('Unnamed: 0',inplace=True)

    bigdoc = Document()
    #bigdoc.styles.add_style('Table Grid', docx.styles.style._TableStyle, builtin=True)

    alldepts = [x[1] for x in json.loads( open('cache/programs/index.json','r').read() )['departments']]

    for prog in alldepts:

        #prog = 'administration_of_justice'
        fname = 'cache/programs/'+prog+'.json'
        print(("Reading %s" % fname))
        # NOTE(review): `input` shadows the builtin here; harmless in this
        # scope but worth renaming.
        input = open(fname,'r')
        filedata = input.read()
        p_info = json.loads(filedata)
        #print p_info

        output = open('output/onlinecerts/'+prog+'.txt','w')
        for p in p_info:
            #print(p['dept'] + "\t" + p['type'] + "\t" + p['title'])

            # Only Certificates of Achievement go into the big document.
            if re.search(r'chievement',p['type']):
                use_bigdoc = 1
                bigdoc.add_heading('Gavilan College', 2)
                bigdoc.add_heading(p['dept'], 2)
                p_cert_header(p['type'],bigdoc,p['title'],output)

            else:
                use_bigdoc = 0

            #doc = Document()
            #doc.add_heading('Gavilan College', 2)
            #p_cert_header(p['type'],p['title'],output,doc)
            b = p['blocks']
            b.sort(key=cmp_2)

            print_headers = 1

            for block in b:
                if 'rule' in block:
                    #print("\t"+block['order'] + "\t" + block['rule'])
                    #p_block_rule(block['rule'],output,print_headers,doc)
                    if use_bigdoc: p_block_rule(block['rule'],output,print_headers,bigdoc)

                print_headers = 0
                for crs in block['courses']:
                    # 'header2' lines are display headers inside a block, not courses.
                    if re.search(r'header2',crs):
                        parts = crs.split("|")
                        #p_block_header(parts[1],output,doc)
                        if use_bigdoc: p_block_header(parts[1],output,bigdoc)
                        continue

                    c_data = split_course(crs)
                    try:
                        if 'code' in c_data and c_data['code']:
                            # Normalize "CSIS 10" -> "CSIS10" to match the index.
                            fixed_code = re.sub(r'\s','',c_data['code'])
                            history = sections.loc[fixed_code]
                        else:
                            print(("BAD COURSE DATA: " + str(crs)))
                            #p_cert_course_missing({'code':'?','name':'?','units':'?'},output,doc)
                            if use_bigdoc: p_cert_course_missing({'code':'?','name':'?','units':'?'},output,bigdoc)
                            continue
                    except Exception as e:
                        # No online-history row for this code: render it as "missing".
                        #print("COULDNT FIND ONLINE DATA for " + c_data['code'] + " - " + c_data['name'])
                        #p_cert_course_missing(c_data,output,doc)
                        if use_bigdoc: p_cert_course_missing(c_data,output,bigdoc)
                        #print(e)
                        continue
                    #print("\t\t[", crs, "]")
                    #print("\t\t", c_data)
                    #p_cert_course(c_data,history,output,doc)
                    if use_bigdoc: p_cert_course(c_data,history,output,bigdoc)
                #p_end_block(output)
            if use_bigdoc: p_end_cert(output,bigdoc)
            #doc_title = re.sub(r'\/','_',p['title'])
            #doc.save('output/onlinecerts/'+prog+'_' + doc_title + '.docx')
    bigdoc.save('output/onlinecerts/all_ca.docx')
|
|
|
|
|
|
# take a string of all the types of classes offered, return a vector of [tot,lec,hyb,onl]
|
|
# take a string of all the types of classes offered, return a vector of [tot,lec,hyb,onl]
def string_to_types(st):
    """Summarize a comma-joined string of delivery modes.

    Every comma-separated token counts toward the total; the recognized
    modes 'face to face', 'hybrid' and 'online' are tallied separately.
    Returns [total, face-to-face, hybrid, online].
    """
    modes = st.split(',')
    return [
        len(modes),
        modes.count('face to face'),
        modes.count('hybrid'),
        modes.count('online'),
    ]
|
|
|
|
def my_default_counter():
    """Build a fresh zeroed counter keyed by the module-level `sems` list.

    Used as the defaultdict factory in summarize_online_sections(); a new
    dict per call so entries are never shared between course codes.
    """
    return {semester: 0 for semester in sems}
|
|
#return {'units':'','Spring 19':0,'Summer 19':0,'Fall 19',0}
|
|
|
|
# Of the recent schedules, what was actually offered online?
|
|
# Of the recent schedules, what was actually offered online?
def summarize_online_sections():
    """Summarize recent semester schedules into per-course delivery-mode CSVs.

    Pulls each semester in the module-level ``sems`` list via
    getSemesterSchedule, then writes three caches:
      - cache/one_year_course_sections.csv: raw (code, type, sem) rows
      - cache/one_year_course_modes.csv: online section counts per semester
      - cache/one_year_online_courses.csv: [sections, lec, hyb, online] per code

    Returns the final DataFrame, or None when a schedule is missing the
    'type' column.
    """
    scheds = list(map(getSemesterSchedule,sems))
    # NOTE(review): `all` shadows the builtin for the rest of this function.
    all = pd.concat(scheds,sort=True)
    selected = all[['code','type','sem']]
    selected.to_csv('cache/one_year_course_sections.csv')

    # Count the online sections offered by semester
    counter = defaultdict(my_default_counter)
    for index,row in selected.iterrows():
        # print(row)
        code = row['code']
        # Normalize "CSIS 10" -> "CSIS10" so codes line up across files.
        code = re.sub('\s','',code)
        entry = counter[code]
        if row['type'] == 'online':
            entry[ row['sem'] ] += 1
    df_counter = pd.DataFrame.from_dict(counter,orient='index')
    #print(df_counter)
    df_counter.to_csv('cache/one_year_course_modes.csv')
    #return df_counter

    # Second pass: per-code totals across all delivery modes.
    bycode = selected.groupby('code')
    try:
        # string_to_types folds the comma-joined mode list into [tot,lec,hyb,onl].
        ff = bycode.agg( lambda x: string_to_types(','.join(x)) )
    except Exception as e:
        print("There was a problem with the schedules. One may not have the 'type' column.")
        print("Check 'cache/one_year_course_modes.csv' for details")
        return

    types_by_course = {}
    for row_index, row in ff.iterrows():
        types_by_course[row_index.replace(" ","")] = row['type']

    df = pd.DataFrame.from_dict(types_by_course,orient='index',columns=['sections','lec','hyb','online'])
    #print(df)
    df.to_csv('cache/one_year_online_courses.csv')
    print("Saved to cache/one_year_online_courses.csv")
    return df
|
|
|
|
def fibonacci(n):
    """Return the n-th Fibonacci number (1-indexed: fib(1) == fib(2) == 1).

    Rewritten iteratively: the previous pampy-based double recursion was
    exponential-time and recursed without bound (RecursionError) for
    n < 1.  The iterative form is O(n); values of n below 3 simply fall
    through to the base value 1.
    """
    a, b = 1, 1
    for _ in range(n - 2):
        a, b = b, a + b
    return b
|
|
|
|
|
|
def test_pampy():
    """Smoke-test the pattern-matched fibonacci() on a handful of indices."""
    for sample in [1, 2, 3, 4, 5, 7, 9, 15]:
        print(("fib(%i) is: %i" % (sample, fibonacci(sample))))
|
|
|
|
|
|
def cq_parse_experiment(root=0, indent=''):
    """Recursively render an arbitrary JSON-ish structure as a crude string.

    Dicts become "{key: value, ...}" with the current indent marker woven
    in; lists become "[item][item]..."; every scalar is stringified.
    Early experiment for exploring the curricunet dumps.
    """
    if type(root) is dict:
        out = indent + "{"
        for key, val in root.items():
            out += key + ": " + cq_parse_experiment(val, indent + " ") + ", " + indent
        return out + "}"

    if type(root) is list:
        return "".join("[" + cq_parse_experiment(el, indent + " ") + "]" for el in root)

    # Scalars (str, int, float, anything else) all collapse to str().
    return str(root)
|
|
|
|
def cq_start():
    """Render the first cached curricunet program dump to a debug text file.

    Reads cache/programs/programs_1.txt (JSON), formats it with
    cq_parse_experiment(), and writes the result to cache/test_prog.txt.

    Fix: both file handles were previously left unclosed; `with` blocks
    now guarantee they are released.
    """
    with open('cache/programs/programs_1.txt', 'r') as src:
        root = json.loads(src.read())
    with open('cache/test_prog.txt', 'w') as outt:
        outt.write(cq_parse_experiment(root, '\n'))
|
|
|
|
"""my first pattern
|
|
|
|
"dataTypeDetails": {
|
|
"scale": 2,
|
|
"type": "numeric",
|
|
"precision": 6
|
|
},
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def cq_pattern_backup1(root=0, indent=''):
|
|
# call this on anything that's a list. It'll recurse on each element of it.
|
|
|
|
# if the value was false, roll it back up and dont
|
|
# display
|
|
|
|
ret = ''
|
|
|
|
# xxxx Rules here catches them top-down
|
|
|
|
if type(root) == type({}):
|
|
ret += indent + "{"
|
|
for K,V in list(root.items()):
|
|
ret += '"'+K+'"' + ": " + \
|
|
str(cq_pattern(V,indent+" "))+ ", " +indent
|
|
ret += "}"
|
|
|
|
elif type(root) == type([]):
|
|
for K in root:
|
|
ret += "[" + str(cq_pattern(K, indent+" ")) + "]"
|
|
|
|
elif type(root) == type("abc"): ret += '"'+root+'"'
|
|
elif type(root) == type(55): ret += str(root)
|
|
elif type(root) == type(5.5): ret += str(root)
|
|
elif type(root) == type(False):
|
|
if root == False: return "False"
|
|
elif root == True: return "True"
|
|
else:
|
|
result = lookForMatch(pat,rule)
|
|
if result: ret = str(result)
|
|
else: ret += '"'+str(root)+'"'
|
|
|
|
return ret
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
def found(*x):
    """Pattern-match debug hook: echo the captured arguments, return their repr."""
    print(x)
    return str(x)
|
|
|
|
def lookForMatch(rules, item):
    """Try `item` against a flat [pattern, action, pattern, action, ...] list.

    `rules` alternates pampy patterns (even indices) with their actions
    (odd indices).  On the first hit, increments the module-level
    `labels` counter for that rule slot and returns the match result;
    returns False when nothing matches.

    Fixes: the slot index was computed as `i-1 / 2` — operator precedence
    made that `i - 0.5`, producing float keys (0.5, 2.5, ...) instead of
    slot numbers; it is now `(i - 1) // 2`.  `a` is also initialized so
    an empty rule list no longer raises NameError.
    """
    a = ''
    var1 = ''
    for i, x in enumerate(rules):
        if i % 2 == 1:
            # Odd index: x is the action for the pattern saved in var1.
            a = match(item, var1, x, default='')
            if a:
                labels[(i - 1) // 2] += 1  # tally hits per rule slot
                break
        else:
            # Even index: remember the pattern for the next iteration.
            var1 = x

    #a = match(root,*curic_patterns,default='')
    if a:
        #print("Matched: " + str(a))
        return a
    #print("Didn't match: " + str(item) + "\n")
    return False
|
|
|
|
|
|
|
|
#from curriculum_patterns import pat
|
|
from patterns_topdown import pat
|
|
|
|
labels = defaultdict(int)
|
|
|
|
def cq_pattern(root=0, indent=''):
    # call this on anything that's a list. It'll recurse on each element of it.

    # if the value was false, roll it back up and dont
    # display

    # Bottom-up rewrite: first recurse into every child of the structure,
    # THEN try to pattern-match the whole (already-rewritten) node against
    # the `pat` rule list via lookForMatch().
    ret = ''

    # xxxx Rules here catches them top-down
    # instead we'll do each element of data structure, and then try to match the whole thing.

    if type(root) == type({}):
        ret = {}
        for K,V in list(root.items()):
            ret[K] = cq_pattern(V,indent+" ")

    elif type(root) == type([]):
        ret = []
        for K in root:
            ret.append(cq_pattern(K, indent+" "))

    # Scalars pass through unchanged.
    elif type(root) == type("abc"): ret = root # ret += '"'+root+'"'
    elif type(root) == type(55): ret = root # ret += str(root)
    elif type(root) == type(5.5): ret = root # ret += str(root)
    elif type(root) == type(False):
        if root == False: ret = root # return "False"
        elif root == True: ret = root # return "True"

    # A whole-node match (if any) replaces the rewritten children.
    result = lookForMatch(pat,root)
    if result: ret = result

    # NOTE(review): falsy results (0, '', False, empty dict/list) fall
    # through to `return root`, discarding any rewriting of children —
    # presumably harmless since falsy containers have no children, but
    # a falsy match result would be lost; confirm.
    if ret: return ret
    return root
|
|
|
|
|
|
|
|
def myprinter(item, indent=''):
    """Render a matched structure as a loosely-indented debug string.

    A (name, dict) pair becomes "[[name: k:> v ...]]"; plain dicts and
    lists have their members joined on indented newlines; everything
    else is quoted as "|value|".  Exact types are required (subclasses
    like defaultdict fall through to the quoted form).
    """
    deeper = indent + " "
    joiner = '\n' + indent + ' '
    if type(item) is tuple and len(item) == 2 and type(item[0]) is str and type(item[1]) is dict:
        body = joiner.join([k + ":> " + myprinter(v, deeper) for k, v in item[1].items()])
        return "[[" + item[0] + ": " + body + "]]"
    if type(item) is dict:
        return "{" + joiner.join([k + ": " + myprinter(v, deeper) for k, v in item.items()]) + "}"
    if type(item) is list:
        return "[" + joiner.join([myprinter(el, deeper) for el in item]) + "]"
    return '"|' + str(item) + '|"'
|
|
|
|
|
|
def cq_pattern_start():
    """Run the bottom-up pattern rewriter over the second curricunet dump.

    Writes the myprinter()-rendered results to cache/test_prog.txt, then
    prints per-slot hit counts from the module-level `labels` counter.

    NOTE(review): lookForMatch() computes `labels` keys as `i-1 / 2`
    (i.e. i - 0.5), so the keys here may be floats and `k/2` is a float
    fed to %i — confirm the intended slot numbering.
    """
    root = json.loads( open('cache/programs/programs_2.txt','r').read())
    outt = open('cache/test_prog.txt','w')

    result = cq_pattern(root,'\n')
    for R in result:
        outt.write(myprinter(R)+"\n")

    k_srt = sorted(labels.keys())

    for k in k_srt:
        v = labels[k]
        print(" Slot %i:\t%i hits" % (k/2,v))
|
|
|
|
def baby_int(j):
    """int() that maps the empty string to 0; everything else goes through int()."""
    return 0 if j == '' else int(j)
|
|
|
|
def find_deg_in_cluster( clusters, deg ):
    """Return the pathway key whose member list contains `deg`.

    `clusters` maps pathway name -> list of degree names.  Falls back to
    the sentinel string "pathway_not_found" when no cluster lists it.
    """
    return next(
        (pathway for pathway, members in clusters.items() if deg in members),
        "pathway_not_found",
    )
|
|
|
|
def try_match_deg_programs():
    """Reconcile three naming schemes for degrees/certificates into one CSV.

    Sources:
      - curricunet cache (cache/programs/*.json): canonical "longname"
        built as "dept | title | type"
      - cache/deg_name_variants.csv: hand-maintained crosswalk between
        shortname / mediumname / guided-pathways name / longname
      - cache/degrees_attained.csv: graduation counts keyed by medium name
      - cache/g_path_cluster2020.txt: "# Cluster" headers followed by
        member degree names, one per line

    Writes the merged result to cache/attainment_masterlist.csv and prints
    match/miss diagnostics along the way.
    """
    # my index, from curricunet, is the "longname". The 'attained' file has medium. kind of.

    type_lookup = { "Certificate of Proficiency":"CP", "A.A. Degree":"AA", "A.S. Degree":"AS", "Certificate of Achievement":"CA", "A.S.-T Degree":"AS_T", "A.A.-T Degree":"AA_T", "NC-Cmptncy: NC Certificate of Competency":"COMP", "NC-Complet: NC Certificate of Completion":"COMP" }

    # Curricunet
    # longname -> 0/1 flag: 1 once a variants row claims it (leftovers printed later).
    curicunet_version = {}
    for f in os.listdir('cache/programs'):
        if not re.search('index|programs',f):
            #print(f)
            pro = json.loads(open('cache/programs/'+f,'r').read()) # blocks title dept key type desc
            for c in pro:
                longname = c['dept'] + " | " + c['title'] + " | " + c['type']
                curicunet_version[longname] = 0
                abbrev = "??"
                if c['type'] in type_lookup:
                    abbrev = type_lookup[ c['type'] ]
                #print(" " + abbrev + ": " + longname)

    # for each in 'attained' list, try to match to correspondent in variants/long list.
    #
    #

    # Parse the guided-pathways cluster file: "# Name" starts a cluster,
    # subsequent non-blank lines are its member degrees.
    gp_clusters = {}
    current_cluster = "X"
    gp_file = open('cache/g_path_cluster2020.txt','r')
    for L in gp_file:
        L = L.strip()
        if L:
            if L.startswith('#'):
                mch = re.search(r'^\#\s(.*)$',L)
                if mch:
                    current_cluster = mch.group(1)
                    gp_clusters[ current_cluster ] = []
            else:
                gp_clusters[ current_cluster ].append( L )

    #print( gp_clusters )
    #x = input('paused')

    # Index the name-variants crosswalk several ways.
    matchers = csv.reader(open('cache/deg_name_variants.csv','r'),delimiter=",")
    by_long = {}
    by_medium = {}
    by_med_unmatched = {}
    by_gp_name = {}
    line = 0
    for row in matchers: # variants
        if line==0:
            # Skip the header row.
            pass
        else:
            by_long[ row[3] ] = row
            by_gp_name[ row[2] ] = row
            by_medium[ row[1] ] = row # # # **
            by_med_unmatched[ row[1] ] = row # # # **
            #if row[1]: print(row[1])

            # remove from curricunet list so i can see whats left
            if row[3] in curicunet_version:
                curicunet_version[ row[3] ] = 1
        line += 1

    # Sentinel so rows with an empty medium name match a dummy record.
    by_medium[''] = [0,0,0,0,0,0,0,0,0,0]
    #print(by_medium)
    # Attained List
    # NOTE(review): backslash in this path ('cache\degrees_attained.csv')
    # is an invalid escape sequence and only works as a separator on
    # Windows — should be a forward slash; confirm.
    attained = csv.reader(open('cache\degrees_attained.csv','r'),delimiter=",") # 1 6 22 17

    line = 0
    matched = {}
    unmatched = {}
    #print("These ones I can't match.")

    for row in attained:
        if line==0:
            # Extend the header: four new leading columns + longname at 5 + total at end.
            attained_columns = row
            attained_columns.append("total")
            attained_columns.insert(0,"shortname")
            attained_columns.insert(0,"pathway")
            attained_columns.insert(0,"dept")
            attained_columns.insert(0,"type")

            attained_columns.insert(5,"longname")
        else:
            # Insert placeholders so data rows line up with the widened header;
            # after the inserts the original medium name sits at row[4].
            row.insert(0,'sn')
            row.insert(0,'p')
            row.insert(0,'d')
            row.insert(0,'t')
            row.insert(5,'')
            #print("Matching by medium name: %s" % str(row))
            #print("Matched to: %s" % str(by_medium[ row[4] ]) )

            # NOTE(review): raises KeyError when the medium name is not in
            # the variants file — presumably the crosswalk is complete;
            # confirm before rerunning on new data.
            matching_longname = by_medium[row[4]][3]

            if len(matching_longname):
                #print("matching longname: %s" % matching_longname)
                row[5] = matching_longname ### THE matching longname
                m_parts = matching_longname.split(" | ")
                dept = m_parts[0]
                ttype = m_parts[2]
                row[1] = dept
                row[0] = ttype
                matched[row[4]] = row

                row[3] = by_medium[row[4]][0] # shortname

                row[2] = find_deg_in_cluster(gp_clusters, by_medium[row[4]][2])

                print("OK: " + str(row))
            else:
                # No longname: blank out the derived columns and park the row.
                row[0] = ''
                row[1] = ''
                row[2] = ''
                row[3] = ''
                row[5] = ''
                print("XX: " + str(row))
                unmatched[row[4]] = row
        line += 1
    print("matched %i and missed %i." % (len(matched),len(unmatched)))
    #print("\nactually missed %i." % len(by_med_unmatched))

    print("\nLeftover degrees:")
    for k,v in curicunet_version.items():
        if not v: print(k)

    mash_cols = "type dept pathway shortname mediumname longname grad09 10 11 12 13 14 15 16 17 18 total".split(" ")
    mash_rows = []

    # attained / matched
    for xrow in matched.values():
        mash_rows.append(xrow)

    # attained / unmatched
    for xrow in unmatched.values():
        mash_rows.append(xrow)

    # curricunet leftovers

    mydf = pd.DataFrame(mash_rows, columns=mash_cols)
    mydf.to_csv('cache/attainment_masterlist.csv',index=False)
    return
|
|
|
|
|
|
|
|
# open('cache/programs/programs_1.txt','r').read()
|
|
|
|
""" SEE serve.py .... i mean ... interactive.py
|
|
def dict_generator(indict, pre=None):
|
|
pre = pre[:] if pre else []
|
|
if isinstance(indict, dict):
|
|
for key, value in indict.items():
|
|
if isinstance(value, dict):
|
|
for d in dict_generator(value, pre + [key]):
|
|
yield d
|
|
elif isinstance(value, list) or isinstance(value, tuple):
|
|
for v in value:
|
|
for d in dict_generator(v, pre + [key]):
|
|
yield d
|
|
else:
|
|
yield str(pre) + " " + str([key, value]) + "\n"
|
|
else:
|
|
yield pre + [indict]
|
|
yield str(pre) + " " + str([indict]) + "\n"
|
|
|
|
|
|
|
|
def print_dict(v, prefix='',indent=''):
|
|
if isinstance(v, dict):
|
|
return [ print_dict(v2, "{}['{}']".format(prefix, k) + "<br />", indent+" " ) for k, v2 in v.items() ]
|
|
elif isinstance(v, list):
|
|
return [ print_dict( v2, "{}[{}]".format(prefix , i) + "<br />", indent+" ") for i, v2 in enumerate(v) ]
|
|
else:
|
|
return '{} = {}'.format(prefix, repr(v)) + "\n"
|
|
|
|
|
|
def walk_file():
|
|
j = json.loads(open('cache/programs/programs_2.txt','r').read())
|
|
|
|
return print_dict(j)
|
|
|
|
from flask import Flask
|
|
from flask import request
|
|
|
|
def tag(x,y): return "<%s>%s</%s>" % (x,y,x)
|
|
|
|
def tagc(x,c,y): return '<%s class="%s">%s</%s>' % (x,c,y,x)
|
|
|
|
def a(t,h): return '<a href="%s">%s</a>' % (h,t)
|
|
|
|
def server_save(key,value):
|
|
codecs.open('cache/server_data.txt','a').write( "%s=%s\n" % (str(key),str(value)))
|
|
|
|
def flask_thread(q):
|
|
app = Flask(__name__)
|
|
|
|
@app.route("/")
|
|
def home():
|
|
return tag('h1','This is my server.') + "<br />" + a('want to shut down?','/sd')
|
|
|
|
@app.route("/save/<key>/<val>")
|
|
def s(key,val):
|
|
server_save(key,val)
|
|
return tag('h1','Saved.') + "<br />" + tag('p', 'Saved: %s = %s' % (str(key),str(val)))
|
|
|
|
@app.route("/crazy")
|
|
def hello():
|
|
r = '<link rel="stylesheet" href="static/bootstrap.min.css">'
|
|
r += tag('style', 'textarea { white-space:nowrap; }')
|
|
r += tag('body', \
|
|
tagc('div','container-fluid', \
|
|
tagc('div','row', \
|
|
tagc( 'div', 'col-md-6', tag('pre', walk_file() ) ) + \
|
|
tagc( 'div', 'col-md-6', 'Column 2' + a('Shut Down','/shutdown' ) ) ) ) )
|
|
|
|
|
|
|
|
return r
|
|
|
|
@app.route("/sd")
|
|
def sd():
|
|
print('SIGINT or CTRL-C detected. Exiting gracefully')
|
|
func = request.environ.get('werkzeug.server.shutdown')
|
|
if func is None:
|
|
raise RuntimeError('Not running with the Werkzeug Server')
|
|
func()
|
|
return "Server has shut down."
|
|
app.run()
|
|
|
|
|
|
from queue import Queue
|
|
|
|
q = Queue()
|
|
|
|
def serve():
|
|
import webbrowser
|
|
import threading
|
|
x = threading.Thread(target=flask_thread, args=(q,))
|
|
x.start()
|
|
webbrowser.open_new_tab("http://localhost:5000")
|
|
|
|
|
|
|
|
|
|
#s = open('cache/programs/index.json','w')
|
|
#s.write( json.dumps({'departments':sorted(list(dept_index)), 'programs':prog_index}, indent=2) )
|
|
#s.close()
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
# feb 2020 goal:
|
|
# - treemap of graduates in each division, dept, pathway, degree type, major
|
|
# - rollover or other interactive explorer of pathways
|
|
|
|
|
|
# sept short term goals:
|
|
# 1. viable presentation on web pages w/ good overview
|
|
# 1a. - will necessarily include courses, learning outcomes cause it depends on them.
|
|
# 2. show progs close to 50% limit
|
|
# 3. foundation for visualization, model degree attainment, and simulation
|
|
# 4. prep for work on iLearn -> SLO -> any useful contributions I can make
|
|
#
|
|
|
|
|
|
# sept 8, 2020 approach:
|
|
# 1 hr: pull labels, types, interesting notes, discern structures that are most
|
|
# important. The 20% that gets me 80%.
|
|
#
|
|
# 1 hr: 2-3 short experiments for different ways of pattern matching them.
|
|
#
|
|
# 1 hr: best (rushed) effort to condense it all into accurate (if incomplete)
|
|
# compact data structure.
|
|
#
|
|
# 1 hr: php to fetch and display for a given prog, deg, dept, cert, or overview.
|
|
#
|
|
# show draft live on wed sept 10.
|
|
#
|
|
|
|
|
|
|
|
|
|
"""
|
|
def attempt_match8020(rules,item):
|
|
var1 = ''
|
|
for i,x in enumerate(rules):
|
|
if i % 2 == 1:
|
|
a = match(item, var1, x, default='')
|
|
if a:
|
|
labels8020[i-1 / 2] += 1
|
|
|
|
else:
|
|
var1 = x
|
|
|
|
|
|
#a = match(root,*curic_patterns,default='')
|
|
if a:
|
|
print("Matched: " + str(a))
|
|
return a
|
|
print("Didn't match: " + str(item) + "\n")
|
|
return False
|
|
"""
|
|
|
|
|
|
def clever_printer(item, indent=''):
    """Render a matched (name, dict) pair / dict / list as an indented string.

    NOTE(review): apart from the name this is a copy of myprinter(), and
    the recursive calls below go to myprinter rather than back into
    clever_printer — presumably a stale duplicate kept during
    experimentation; confirm before relying on it.
    """
    if type(item) == type( ('a','b') ) and len(item)==2 and type(item[0])==type('a') and type(item[1])==type( {"a":2} ):
        return "[[" + item[0] + ": " + ('\n'+indent+' ').join( [ K+":> "+ myprinter(V,indent+" ") for K,V in item[1].items() ] ) + "]]"
    if type(item) == type( {} ):
        return "{" + ('\n'+indent+' ').join( [ K+": "+ myprinter(V,indent+" ") for K,V in item.items() ] )+"}"
    if type(item) == type( [] ):
        return "[" + ('\n'+indent+' ').join( [ myprinter(I,indent+" ") for I in item ] )+"]"
    return '"|'+str(item)+'|"'
|
|
|
|
|
|
def print_return(x):
    """Identity function with a debug trace: announce the hit, then hand `x` back."""
    print('got a hit')
    print()
    return x
|
|
|
|
|
|
from patterns_8020 import pat8020
|
|
|
|
labels8020 = defaultdict(int)
|
|
|
|
def cq_8020(root=0, indent=''):
    """Collect every pat8020 pattern that matches the given node.

    Tries each pampy pattern in `pat8020` against `root` (with
    print_return as the action), pausing interactively on each hit.
    Returns the list of match results; empty list when nothing matched.
    The recursive descent into dicts/lists is currently disabled (see the
    quoted-out block below).
    """
    # Try to match the root, and if no match, try to break it up ( dicts, lists ) and
    # recurse on those parts.

    # if no matches below this point in the tree, return false
    ret = []

    for pattern in pat8020:
        m = match( root, pattern, print_return )
        if m:
            print('case 1')
            print('this: ' + str(m))
            print('matched this pattern: ' + str(pattern))
            print(print_return)

            # Interactive pause so hits can be inspected one at a time.
            xyz = input('enter to continue')
            ret.append(m)

    """
    if type(root) == type({}):
        for K,V in list(root.items()):
            m = cq_8020(V)
            if m:
                print('case 2')
                ret.append(m)

    elif type(root) == type([]):
        for V in root:
            m = cq_8020(V)
            if m:
                print('case 3')
                ret.append(m)"""

    return ret
|
|
|
|
|
|
|
|
|
|
def cq_8020_start():
    """ (programs) entityType entityTitle status proposalType sectionName lastUpdated lastUpdatedBy
         fieldName displayName lookUpDisplay fieldValue instanceSortOrder
         lookUpDataset (array of dicts, each has keys: name, value, and corresponding values.)

        subsections or fields (arrays) - ignore for now just takem in order

        (courses) same as above?

        html values: markdown convert?

    """
    # Driver for cq_8020(): run the 80/20 pattern set over the second
    # curricunet dump and dump the matches as JSON.
    root = json.loads( open('cache/programs/programs_2.txt','r').read())
    outt = open('cache/test_prog8020.txt','w')

    result = cq_8020(root,'\n')

    outt.write( json.dumps( result, indent=2 ) )
|
|
|
|
|
|
|
|
|
|
|
|
##### Restored from an earlier version
|
|
|
|
def recurse3(sec,path=''):
    """Flatten a curricunet section tree into "path | field | value" lines.

    Depth-first walk: each subsection extends `path` with its name and a
    sort-order/id tag from get_id_sortorder(); each non-empty field (via
    handleField()) contributes one output line.  Field errors are printed
    and pause for keyboard input rather than aborting the walk.
    Returns the accumulated multi-line string.
    """
    output = ''
    if 'subsections' in sec and len(sec['subsections']):
        for subsec in sec['subsections']:
            #pdb.set_trace()
            # NOTE: `id` shadows the builtin within this loop.
            id = get_id_sortorder(subsec)
            output += recurse3(subsec, path + subsec['attributes']['sectionName'] + " ("+id+") | ")
    if 'fields' in sec and len(sec['fields']):
        for subfld in sec['fields']:
            try:
                fld = handleField(subfld)
                # handleField returns False for empty fields — skip those.
                if fld:
                    dbg('Field: %s' % str(fld))
                    output += path + subfld['attributes']['fieldName'] + " | " + fld + "\n"
            except Exception as e:
                print("Problem in field: %s"% str(e))
                print(subfld)
                # Interactive pause so the bad field can be inspected.
                x = input('enter to continue')
    return output
|
|
|
|
|
|
def get_id_sortorder(sec):
    """Build a sortable "order-id" tag from a section's attributes dict.

    sectionSortOrder wins over instanceSortOrder when both are present;
    single-digit orders are zero-padded.  The instanceId (or, failing
    that, the sectionSortOrder again) is appended after a dash.  Returns
    just the (possibly empty) order string when neither id is available.
    """
    attrs = sec['attributes']
    order = ''
    if 'instanceSortOrder' in attrs:
        order = str(attrs['instanceSortOrder'])
    if 'sectionSortOrder' in attrs:
        # Deliberately overrides instanceSortOrder when both exist.
        order = str(attrs['sectionSortOrder'])
    if order and int(order) < 10:
        order = '0' + order
    if 'instanceId' in attrs:
        return order + '-' + str(attrs['instanceId'])
    if 'sectionSortOrder' in attrs:
        return order + '-' + str(attrs['sectionSortOrder'])
    return order
|
|
|
|
|
|
def include_exclude(str, inc, exc=()):
    """Return True iff *str* contains at least one substring from *inc* and
    none of the substrings in *exc*.

    Fixes the mutable default argument (exc=[] -> exc=()) and replaces the
    manual flag loops with any(). Parameter name 'str' shadows the builtin
    but is kept for caller compatibility.
    """
    if not any(i in str for i in inc):
        return False
    return not any(e in str for e in exc)
|
|
|
|
def pbd3(str):
    """Extract the 'Program Block Definitions' id from the 3rd '|'-separated
    segment of *str* (or from the whole string when there are too few
    segments). Returns the id as a string, or 0 when absent or '0'.
    """
    segments = str.split("|")
    if len(segments) > 3:
        str = segments[2]
    found = re.search(r'Program\sBlock\sDefinitions\s\(([\-\d]+)\)', str)
    if found and found.group(1) != '0':
        return found.group(1)
    return 0
|
|
|
|
def handleField(f):
    """Render a field dict as 'fieldName: lookUpDisplay / fieldValue'.

    Returns False when both the lookup display and the (newline-stripped)
    field value are empty, so callers can skip blank fields.
    """
    display = ''
    if 'lookUpDisplay' in f:
        display = f['lookUpDisplay']
        # Normalize booleans to their string form (inlined boolToStr);
        # non-bool values pass through unchanged.
        if isinstance(display, bool):
            display = "True" if display else "False"
    value = str(f['fieldValue']).replace('\n', ' ').replace('\r', '')
    if not (display or value):
        return False
    return f['attributes']['fieldName'] + ': ' + display + " / " + value
|
|
|
|
def boolToStr(b):
    """Convert a bool to its string form; pass any other value through unchanged."""
    if not isinstance(b, bool):
        return b
    return "True" if b else "False"
|
|
|
|
# Almost final formatting
|
|
def prog_info_to_entry(c):
    """Translate one flattened program-info line into a partial entry dict.

    *c* is a ' | '-separated line produced by recurse3 (path segments, then
    'FieldName: lookUpDisplay / fieldValue'). The numeric slices strip the
    'FieldName: ' prefix from the value segment — presumably [18:] skips
    'Program Title: ...' and so on; TODO confirm offsets against the actual
    field names. Returns {} for lines carrying none of the recognized fields.
    """
    out = {}
    p1 = c.split(" | ")
    if p1[2]=="Program Title":
        print(p1[3][18:])
        return {'title':p1[3][18:]}
    if p1[2]=="Department":
        d = p1[3][12:]
        # Keep only the lookup-display half of 'display / value'.
        prt = d.split(" / ")
        return {'dept':prt[0]}
    if p1[2]=="Award Type":
        return {'type':p1[3][12:].split(' /')[0]}
    if p1[2]=="Description":
        desc = p1[3][16:]
        # Sanitize markup: unwrap <span>s, then strip every attribute from
        # the remaining tags so only bare structural HTML survives.
        soup = bs(desc, 'html.parser')
        for s in soup.find_all('span'): s.unwrap()
        for e in soup.find_all(True):
            e.attrs = {}
        dd = str(soup)
        # Replace non-breaking spaces with plain spaces.
        dd = re.sub('\u00a0',' ',dd)
        return {'desc':dd}
    return {}
|
|
|
|
def cbd_to_entry(c):
    """Extract the block rule from a 'Course Block Definition' line.

    Expects ' | '-separated segments with the literal marker in position 3
    and 'display / rule' in position 4; returns {} for any other line.
    """
    fields = c.split(" | ")
    if fields[3] != 'Course Block Definition':
        return {}
    value_parts = fields[4].split(" / ")
    return {'rule': value_parts[1]}
|
|
|
|
def pc5(str):
    """Extract the 'Program Courses' id from the 5th '|'-separated segment
    of *str* (or from the whole string when there are too few segments).
    Returns the id as a string, or 0 when absent or '0'.
    """
    segments = str.split("|")
    if len(segments) > 5:
        str = segments[4]
    found = re.search(r'Program\sCourses\s\(([\-\d]+)\)', str)
    if found and found.group(1) != '0':
        return found.group(1)
    return 0
|
|
|
|
def remove_prefix(str, i):
    """Drop the first *i* ' | '-separated segments of *str*; return *str*
    unchanged when it has *i* or fewer segments.
    """
    segments = str.split(" | ")
    return " | ".join(segments[i:]) if len(segments) > i else str
|
|
|
|
def course_to_entry(c,order="0"):
    """Convert one flattened course line into a display fragment.

    Returns, depending on the line type:
      - '<order>|<course text>' for a 'Course' line (the [8:] slice strips a
        fixed 'Course: '-style prefix — TODO confirm offset),
      - ' | OR ' for a Condition line whose [11:13] slice reads 'or',
      - '<order>header2|<text>' for a Non-Course Requirements header,
      - '' for anything unrecognized.
    """
    p1 = c.split(" | ")
    dbg(" c2e: %s" % str(c))
    if p1[1] == "Course":
        p2 = p1[2].split(" / ")
        origname = order+"|"+p2[0][8:]
        # id is extracted but currently unused (see the commented-out dict return).
        id = p2[1]
        #return {'id':id,'origname':origname}
        dbg(" c2e is course: %s" % str(origname))
        return origname
    if p1[1] == "Condition":
        #print p1[2][11:13]
        if p1[2][11:13] == 'or':
            #return {'ornext':1}
            dbg(" c2e is OR")
            return " | OR "
    if p1[0] == "Non-Course Requirements":
        #pdb.set_trace()
        # [28:] presumably strips a fixed header prefix — TODO confirm.
        dbg(" c2e is header: %s" % str(p1[1][28:]))
        return order + "header2" + "|" + p1[1][28:]
    return ''
|
|
|
|
def courseline_to_pretty(line):
    """Format a condensed course line as HTML.

    NOTE(review): the bare 'return line' below short-circuits the function,
    so all of the HTML formatting that follows is currently dead code — the
    line is passed through unchanged. Remove the early return to re-enable.
    """
    # from this: 01-125780|THEA1 - Theatre History: Greece to Restoration 3.000 *Active*
    # 09-125764|THEA19 - Acting and Voice for TV/Film/Media 3.000 *Historical* | OR
    # 11-129282header2|Choose additional units from the courses below to complete the unit minimum requirement:
    # to decent looking
    return line
    out = ''
    oor = 0
    #pdb.set_trace()
    parts = line.split("|")
    if re.search('header2',parts[0]):
        out = "<div class='secondary'>" + parts[1] + "</div>"
    elif len(parts) > 2 and parts[2]==" OR":
        oor = 1
    # NOTE(review): '{1,3}' sits inside a character class, so it matches the
    # literal characters '{', '1', ',', '3', '}' rather than acting as a
    # repetition count — harmless while this code is unreachable, but fix
    # the pattern before re-enabling this function.
    m = re.search(r'(.*)\s\-\s(.*)\s([0-9{1,3}\.\s\-]+)\s\*(\w*)\*',parts[1])
    if m:
        code = m.group(1)
        name = m.group(2)
        units = m.group(3)
        active = m.group(4)
        if oor: name += "<strong>OR</strong>"
        out = "<div class='aclass'><span class='cl_code'>"+code+"</span><span class='cl_name'>"+name+"<span class='cl_unit'>"+units+"</span></div>"
    return out
|
|
|
|
|
|
|
|
# restarted oct 2019 and try to simplify
|
|
def prog_take_4(program):
    """Convert one raw CurricUNET program record into a condensed dict.

    Flattens the record's section tree with recurse3, keeps only the lines
    of interest, then groups them by Program Block Definition id (pbd3) and,
    within each block, by Program Courses id (pc5), producing roughly:
        {'title'/'dept'/'type'/'desc': ..., 'blocks': [{order, rule, courses}, ...]}
    """
    # Flatten every root section into 'path | field | value' lines.
    fullyProcessed = ''
    for r in program['entityFormData']['rootSections']:
        dbg('a recurse3 call...')
        fullyProcessed += recurse3(r,program['entityMetadata']['entityTitle']+" | ")
    taken = []

    # Keep only interpretable lines; drop SLO-mapping noise.
    for L in (program['entityMetadata']['entityTitle'] + fullyProcessed).split('\n'):
        if include_exclude(L,['Description','Department','Award Type','Program Title','Course Block Definition','Program Courses','Outcome | Outcome | Outcome | Outcome'], ['Map SLO to']):
            taken.append(L)
    program_struct = { 'blocks':[]}
    # start dividing up course blocks
    blocks = groupby(pbd3,taken)
    for k,v in blocks.items(): # each of the PDBs
        block = { 'order':str(k) }
        for a in v:
            dbg('block: ' + str(k))
            course_list = []
            if k == 0:
                # Lines outside any block carry program-level info (title, dept, ...).
                program_struct.update(prog_info_to_entry(a))
            else:
                #pdb.set_trace()
                block.update(cbd_to_entry(a))
                courses = groupby(pc5,blocks[k])
                for C,cval in courses.items(): # each of the courses
                    # Strip the 5 path segments so only the course text remains.
                    df = [remove_prefix(x,5) for x in cval]
                    #my_c = { 'order':str(C) }
                    courseline = ''
                    for K in df:
                        c2e = course_to_entry(K,C)
                        dbg(" c2e: %s" % str(c2e))
                        if re.search('header2',c2e):
                            # A header starts a new group: flush the line built so far.
                            course_list.append( courseline_to_pretty(courseline))
                            courseline = c2e
                            continue
                        if re.search('3\sUnit\sMin',c2e):
                            # Special-case rewrite of a '1.000 - 2.000' unit range.
                            dbg(" --courseline: %s" % str(courseline))
                            courseline = re.sub('1\.000\s+\-\s+2\.000','3.000',courseline)
                            dbg(" ---courseline changed: %s" % str(courseline))
                            continue # hack for span non native opt 2
                        # TODO
                        courseline += c2e
                        dbg(" courseline: %s" % str(courseline))
                    #if courseline:
                    # my_c.update(courseline)
                    # #if 'id' in my_c and my_c['id'] in ids:
                    # # my_c['reference'] = ids[my_c['id']]
                    dbg('--new courseline--')
                    if courseline: course_list.append( courseline_to_pretty(courseline))
                block['courses'] = sorted(course_list)
        if block: program_struct['blocks'].append(block)
    #jsonout.write( json.dumps(program_struct,indent=2) )
    #return '\n'.join(taken)
    return program_struct
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":

    #cq_8020_start()
    #exit()

    print ('')
    # Menu of pipeline stages: number -> [description, handler function].
    # NOTE(review): several handlers (show_classes, show_programs, cq_start,
    # organize_programs_stage2, ...) are defined elsewhere in this file.
    options = { 1: ['Fetch all class data from curricunet',fetch_all_classes] ,
        2: ['Fetch all program data from curricunet', fetch_all_programs] ,
        3: ['Translate class data to condensed json files', show_classes] ,
        4: ['Translate program data to condensed json files', show_programs] ,
        5: ['Try to consolidate lists of programs and degrees and # attained', try_match_deg_programs] ,
        #5: ['Check DE', check_de] ,
        #6: ['Sort courses', organize_courses] ,
        #7: ['Clean up degree/program entries', clean_programs] ,
        #8: ['Reorganize degree/program entries', organize_programs] ,
        #9: ['Reorganize degree/program entries, take 2', organize_programs2] ,
        10:['Find online programs', find_online_programs],
        11:['Which courses were scheduled as online?', summarize_online_sections],
        12:['First try with logic rules', smart_find_online_programs],
        13:['Another try, simplified', simple_find_online_programs],
        14:['Parse programs with pattern matching', cq_start],
        15:['Parse programs with pattern matching, take 2', cq_pattern_start],
        #16:['Baby web server', serve],
        16:['80 20 effort. Sept 2020', cq_8020_start],
        17:['Organize programs stage 2 (2021)', organize_programs_stage2],
        }

    # A numeric command-line argument selects a stage non-interactively.
    if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
        resp = int(sys.argv[1])
        print("\n\nPerforming: %s\n\n" % options[resp][0])

    else:
        # No argument: show the menu and prompt for a choice.
        print ('')
        for key in options:
            print(str(key) + '.\t' + options[key][0])

        print('')
        resp = input('Choose: ')

    # Call the function in the options dict
    options[ int(resp)][1]()
|
|
|