""" Random tasks to classes and canvas, Making everyone a teacher Fixing closed class Bulk enrolling people Positive attendance / 20 - 60 calculation Bulk crosslisting sftp upload to web badgr stuff """ import pysftp, os, datetime, requests, re, json, sqlite3, codecs, csv, sys import funcy, os.path, datetime, calendar, time, shutil, urllib from datetime import datetime from collections import defaultdict from time import gmtime, strftime from time import mktime from semesters import human_to_short from canvas_secrets import badgr_target, badgr_hd if os.name != 'posix': import win32com.client import win32com.client as win32 import pypandoc from docxtpl import DocxTemplate import xlwt from pipelines import header, url, fetch, convert_roster_files, move_to_folder from courses import course_enrollment from users import teacherRolesCache from util import match59, partition #from localcache import local_data_folder, sqlite_file, db, user_goo_to_email ######### ######### BOOKSTORE ######### ######### def scrape_bookstore(): big_courselist_url = "https://svc.bkstr.com/courseMaterial/courses?storeId=10190&termId=100058761" bcu_cached = json.loads( open('cache/bookstore_courses.json','r').read() ) one_section = "https://svc.bkstr.com/courseMaterial/results?storeId=10190&langId=-1&catalogId=11077&requestType=DDCSBrowse" # NO TEXT another_section = "https://svc.bkstr.com/courseMaterial/results?storeId=10190&langId=-1&catalogId=11077&requestType=DDCSBrowse" # 3 REQUIRED at: # [""0""].courseSectionDTO[""0""].courseMaterialResultsList # # and also: # # [""0""].courseSectionDTO[""0""].sectionAdoptionDTO.materialAdoptions def survey_answer(q=0): if not q: q = int( input("which column? ") ) fff = csv.reader( codecs.open('cache/sp21_survey_answers.csv','r','utf-8'), delimiter=',') for row in fff: print(row[q]) def survey_organize(): fff = csv.reader( codecs.open('cache/sp21_survey_answers.csv','r','utf-8'), delimiter=',') ans = [] for i in range(27): ans.append([]) for row in fff: for i,item in enumerate(row): print(item) ans[i].append(item) for i in range(27): ans[i].sort() outp = codecs.open('cache/sp21_answersinorder.txt','w','utf-8') for i in range(27): outp.write( "\n".join(ans[i]) ) outp.write("\n\n\n\n\n") def build_quiz(filename=""): if not filename: filename = 'cache/he2.txt' quiz_id = "33285" course_id = "10179" quiz_group = 15096 input_lines = codecs.open(filename,'r', 'utf-8').readlines() qs = [] qs_post_data = [] this_q = "" this_as = { } correct_answer = "" state = "q_text" for L in input_lines: if state == "q_text": this_q = L.strip() state = "answers" elif state =="answers": m = re.search( '^Answer\:\s(\w)$', L) if m: correct_answer = m.group(1) qs.append( [this_q, this_as, correct_answer ] ) state = "q_text" this_as = { } correct_answer = "" continue m = re.search( '^(\w)\)\s(.*)$', L) if m: print(m.group(1)) print(m.group(2)) this_as[m.group(1)] = m.group(2) print(json.dumps( qs, indent=2 )) i = 1 for Q in qs: answers = [] for k,v in Q[1].items(): answers.append({"answer_text": v, "answer_weight": 100 if k==Q[2] else 0, }) this_q = { "question": {"question_name": "q"+str(i), "position": i, "question_text": Q[0], "question_type": "multiple_choice_question", "points_possible": 1, "answers": answers}} qs_post_data.append(this_q) i += 1 for Q in qs_post_data: print(json.dumps(Q, indent=2)) if input("enter to upload, or s to skip: ") != "s": u = url + "/api/v1/courses/%s/quizzes/%s/questions" % (course_id, quiz_id) print(u) resp = requests.post( u, headers=header, json=Q ) print ( resp ) print ( resp.text ) print() # Send an email def send_email(fullname, firstname, addr, subj, content): outlook = win32.Dispatch('outlook.application') #get a reference to Outlook mail = outlook.CreateItem(0) #create a new mail item mail.To = addr mail.Subject = subj mail.HTMLBody = content mail.Display() def convert_to_pdf(name1, name2): wd = 'C:\\Users\\peter\\Documents\\gavilan\\canvasapp\\' wd = 'I:/canvasapp/' wd = 'C:/Users/phowell/source/repos/canvasapp/' print( wd + name1 ) try: word = win32.DispatchEx("Word.Application") worddoc = word.Documents.Open(wd+name1) worddoc.SaveAs(wd+name2, FileFormat = 17) worddoc.Close() except Exception as e: print(e) return e finally: word.Quit() # Build (docx/pdf) certificates for gott graduates def certificates_gott_build(): course = "gott_1_su25" coursedate = "Summer 2025" certificate = "gott 1 template.docx" #course = "gott_4_su25" #certificate = "gott 4 template.docx" i = 0 for row in csv.reader( open(f'cache/completers_{course}.csv','r'), delimiter=','): i += 1 if i < 2: continue print(row[0]) try: lname,fname = row[0].split(",") name = fname.strip() + " " + lname.strip() except Exception as e: name = row[0].strip() doc = DocxTemplate(f"cache/certificates/{certificate}") doc.render({ 'name' : name, 'coursedate': coursedate }) name_as_filename = re.sub('\s', '_', name.lower()) fn = f"cache/certificates/{course}_{name_as_filename}." print(fn+'docx') doc.save(fn+'docx') convert_to_pdf(fn+'docx', fn+'pdf') # Change LTI Settings. Experimental def modify_x_tool(): u2 = "https://gavilan.instructure.com:443/api/v1/accounts/1/external_tools/1462" params = {'course_navigation[default]':'false', "course_navigation[enabled]": "true", "course_navigation[text]": "NameCoach", "course_navigation[url]": "https://www.name-coach.com/lti/single_page/participants/init", "course_navigation[visibility]": "public", "course_navigation[label]": "NameCoach", "course_navigation[selection_width]": 800, "course_navigation[selection_height]": 400} r2 = requests.put(u2, headers=header, data=params) print ( r2.text ) # Upload with sftp to www website folder: student/online/srt/classfoldername def put_file(classfoldername): folder = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') cnopts = pysftp.CnOpts() cnopts.hostkeys = None with pysftp.Connection('www.gavilan.edu',username='cms', password='TODO',cnopts=cnopts) as sftp: sftp.chdir('student/online/srt/'+classfoldername) files = sftp.listdir() print ( folder + "\tI see these files on remote: ", files, "\n" ) localf = os.listdir('video_srt/'+classfoldername) print ( "I see these local: ", localf ) # copy files and directories from local static, to remote static, # preserving modification times on the files for f in localf: print ( "This local file: " + f + " ", ) if not f in files: sftp.put('video_srt/'+classfoldername+'/'+f, f, preserve_mtime=True) print ( "Uploaded." ) else: print ( "Skipped." ) if len(files)==3 and 'users.csv' in files: sftp.get('courses.csv','rosters/courses-'+folder+'.csv') sftp.get('users.csv','rosters/users-'+folder+'.csv') sftp.get('enrollments.csv','rosters/enrollments-'+folder+'.csv') print ( folder + '\tSaved three data files in rosters folder.' ) courses = open('rosters/courses-'+folder+'.csv','r') courses.readline() a = courses.readline() print ( a ) courses.close() parts = a.split(',') year = parts[1][0:4] ss = parts[1][4:6] #print ( parts[1] ) sem = {'30':'spring', '50':'summer', '70':'fall' } this_sem = sem[ss] #print ( this_sem, "", year ) print ( folder + '\tbuilding data file...' ) convert_roster_files(this_sem,year,folder) print ( folder + '\tmoving files...' ) move_to_folder(this_sem,year,folder) else: print ( folder + "\tDon't see all three files." ) sftp.close() # Switch everyone in a class to a teacher def switch_enrol(): global results, header results = [] id = input("Id of course? ") url = "https://gavilan.instructure.com:443/api/v1/courses/"+id+"/enrollments?type[]=StudentEnrollment" while (url): url = fetch(url) all_stud = results for S in all_stud: print ( S['user']['name'] ) print( "Switching to teacher." ) u2 = "https://gavilan.instructure.com:443/api/v1/courses/"+id+"/enrollments" params = {'course_id':id, 'enrollment[user_id]':S['user_id'],'enrollment[type]':'TeacherEnrollment'} r2 = requests.post(u2, headers=header, data=params) #print( "Response: ", r2.text ) #res = json.loads(r2.text) #if input('continue? y/n ') == 'y': u3 = "https://gavilan.instructure.com:443/api/v1/courses/"+id+"/enrollments/"+str(S['id']) params = {'course_id':id, 'id':S['id'],'task':'delete'} r3 = requests.delete(u3, headers=header, data=params) #print( "Response: ", r3.text ) # Change dates & term of a class to unrestrict enrollment def unrestrict_course(): id = input('the course id? ') t1 = url + '/api/v1/courses/' + id course = fetch(t1) # CHANGE DATES CHANGE TERM print( str(course['id']) + "\t", course['name'], "\t", course['workflow_state'] ) t2 = url + '/api/v1/courses/' + str(course['id']) data = {'course[end_at]':'','course[restrict_enrollments_to_course_dates]': 'false', 'course[term_id]':'27'} r2 = requests.put(t2, headers=header, params=data) print( "\t", r2 ) print( 'ok' ) print( "\tEnd at: " + str(course['end_at']) ) print( "\tRestricted enrollment: " + str(course['restrict_enrollments_to_course_dates']) ) # ADD ENROLLMENT t3 = url + '/api/v1/courses/' + id + '/enrollments' form = {'enrollment[user_id]':'30286', 'enrollment[type]':'TaEnrollment', 'enrollment[enrollment_state]':'active' } #r3 = requests.post(t3, headers=header, params=form) #print( "\t", r3.text ) print( '\tok' ) # Bulk enroll users into a course """ def enroll_accred(): global results, results_dict,header # enroll this account in every published course in the semester r = url + '/api/v1/accounts/1/courses?enrollment_term_id=23&perpage=100' all_courses = fetch(r) i = 0 #print( "These courses have custom dates and restricted enrollment:" ) for k in all_courses: if k['workflow_state'] in ['completed','available']: i += 1 ### Handle courses with custom end date and restricted entry. Turn that off. print ( str(i) + ".\t", str(k['id']) + "\t", k['name'], "\t", k['workflow_state'] ) t3 = url + '/api/v1/courses/' + str(k['id']) + '/enrollments' form = {'enrollment[user_id]':'30286', 'enrollment[type]':'TeacherEnrollment', 'enrollment[enrollment_state]':'active' } r3 = requests.post(t3, headers=header, params=form) print ( "\t", r2.text ) print ( '\tok' ) """ # Calculate attendance stats based on enrollment/participation at 20% of term progressed, then 60% of term progressed. def twenty_sixty_stats(li): # actual calcs core. li is a list of lines. cen1_only = [] cen2_only = [] neither = [] both = [] for L in li: L = L.strip() parts = L.split(",") # id, lname, fname, before_class_start, before_1st_cen, before_2nd_cen, after_2nd_cen, after_class_end, final_score cen1_yes = int(parts[3]) + int(parts[4]) cen2_yes = int(parts[5]) if cen1_yes and not cen2_yes: cen1_only.append(parts) elif cen2_yes and not cen1_yes: cen2_only.append(parts) elif not cen1_yes and not cen2_yes: neither.append(parts) elif cen1_yes and cen2_yes: both.append(parts) else: print ( "Error: " + L ) #fout = codecs.open('pa_census_'+m.group(1)+'.txt', 'w','utf-8') ret = [] ret.append("cen 1 = " + str(len(cen1_only)+len(both)) + ", cen2 = "+ str(len(cen2_only)+len(both)) + ", AVERAGE = " + str( ( len(cen1_only) +len(both) + len(cen2_only)+len(both) ) / 2.0 ) + "\n\n") ret.append("Census 1 Only: " + str(len(cen1_only)) + "\n") for L in cen1_only: ret.append(",".join(L)+"\n") ret.append("\nCensus 2 Only: " + str(len(cen2_only)) + "\n") for L in cen2_only: ret.append(",".join(L)+"\n") ret.append("\nBoth: " + str(len(both)) + "\n") for L in both: ret.append(",".join(L)+"\n") ret.append("\nNeither: " + str(len(neither)) + "\n") for L in neither: ret.append(",".join(L)+"\n") return ''.join(ret) # Older positive attendance hours calculation. def hours_calc(): # open and read enrollments enrol = json.loads( open('semesters/2018fall/roster_fall18.json','r').read() )[2] # {"course_id": "201870-10001", "status": "active", "role": "student", "user_id": "G00256034"} my_sections = '10689,10977,10978,10979,10980,10981,10982,10983,10985,11074,11075,11076'.split(",") enrollments = defaultdict(list) for E in enrol: id = E['course_id'][7:] if id in my_sections: enrollments[id].append(E['user_id']) #print ( json.dumps(enrollments,indent=2) ) allout = codecs.open('pa_de_noncred.txt','w','utf-8') for f in os.listdir('.'): m = re.match('pa(\d+)\.txt',f) if m: sec = m.group(1) # split up the combined sections if sec == '10977': possible = '10977,10978,10979'.split(',') elif sec == '10980': possible = '10980,10981,10982,10983'.split(',') elif sec == '10985': possible = '10985,11074,11075,11076'.split(',') else: possible = ['10689',] lines_by_sec = {} for s in possible: lines_by_sec[s] = [] fin = codecs.open(f,'r','utf-8').readlines()[1:] for L in fin: parts = L.split(",") # id, lname, fname, before_class_start, before_1st_cen, before_2nd_cen, after_2nd_cen, after_class_end, final_score for s in possible: if parts[0] in enrollments[s]: lines_by_sec[s].append(L) #break #print ( "Split up section " + sec + json.dumps(lines_by_sec,indent=2) ) for S,v in lines_by_sec.items(): allout.write("\n\nSection " + S + "\n") allout.write(twenty_sixty_stats(v) + "\n - - - - - \n\n") def course_2060_dates(crn=""): schedfile = 'su20_sched.json' # TODO schedule = json.loads(open(schedfile,'r').read()) ok = 0 if not crn: crn = input("What is the CRN? ") for s in schedule: if s['crn']== crn: ok = 1 break if not ok: print ( 'I couldn\'t find that CRN in ' + schedfile ) else: a = s['date'].split(' - ') beginT = datetime.strptime(a[0],"%b %d, %Y") endT = datetime.strptime(a[1],"%b %d, %Y") # Begin and end dates - direct from schedule # Calculate 20% / 60% dates. beginDT = datetime.datetime.fromtimestamp(mktime(beginT)) endDT = datetime.datetime.fromtimestamp(mktime(endT)) seconds_length = mktime(endT) - mktime(beginT) length = datetime.timedelta( seconds=seconds_length ) first_cen_date = datetime.timedelta( seconds=(0.2 * seconds_length)) + beginDT second_cen_date = datetime.timedelta( seconds=(0.6 * seconds_length)) + beginDT print ( "Begin: " + str(beginDT) ) print ( "End: " + str(endDT) ) print ( "The length is: " + str(length) ) print ( "First census date is: " + str(first_cen_date) ) print ( "Second census date is: " + str(second_cen_date) ) return (first_cen_date, second_cen_date) def course_update_all_users_locallogs(course_id=''): if not course_id: course_id = input("ID of course to calculate hours? ") emts = course_enrollment(course_id) #print(emts) def hours_calc_pulldata(course_id=''): # broken... endDT = 0 second_cen_date = 0 first_cen_date = 0 beginDT = 0 if not course_id: course_id = input("ID of course to calculate hours? ") emts = course_enrollment(course_id) #print(emts) # all users in this course results = [] t = '/api/v1/courses/' + str(course_id) + '/users' my_users = fetch(t) count = 0 pa = codecs.open('cache/pa.txt','w','utf-8') pa.write("id, lname, fname, before_class_start, before_1st_cen, before_2nd_cen, after_2nd_cen, after_class_end, final_score\n") for S in my_users: try: results = [] #if count > 3: break count += 1 #print ( count ) target = url + '/api/v1/users/' + str(S['id']) + '/page_views?per_page=200' while target: target = fetch(target) # have all student's hits. Filter to only this class results = filter(lambda x: str(x['links']['context']) == id,results) bag = { 0:0, 1:0, 2:0, 3:0, 4:0 } if results: for hit in results: hitT = datetime.strptime(str(hit['created_at']),"%Y-%m-%dT%H:%M:%SZ") hittime = datetime.datetime.fromtimestamp( mktime(hitT) ) if hittime > endDT: bag[4] += 1 elif hittime > second_cen_date: bag[3] += 1 elif hittime > first_cen_date: bag[2] += 1 elif hittime > beginDT: bag[1] += 1 else: bag[0] += 1 record = emts[S['id']]['user']['login_id'] + ", " +emts[S['id']]['user']['sortable_name'] + ", " + str(bag[0]) + ", " + str(bag[1]) + ", " + str(bag[2]) + ", " + str(bag[3]) + ", " + str(bag[4]) + ", " + str(emts[S['id']]['grades']['final_score']) print ( record ) pa.write( record + "\n" ) pa.flush() except Exception as exp: #errors += S['firstname'] + " " + S['lastname'] + " " + S['id'] + "\n" print ( 'exception with ', ) print ( S ) print ( exp ) pass #t = url + '/api/v1/accounts/1/courses/' + str(id) #print ( t ) #while(t): t = fetch_dict(t) #print ( "These are the results: " ) #print ( results_dict ) #prettydate = X.strptime("%b %d, %Y") def pos_atten(): global f, url, results, count, pa, users_by_id, dd errors = "" wr = csv.writer(f,quoting=csv.QUOTE_ALL) pa_wr = csv.writer(pa,quoting=csv.QUOTE_MINIMAL) teacherRolesCache() # get users in course 59 target = url + '/api/v1/courses/3295/users?per_page=100' while target: print ( target ) target = fetch(target) students = results results = [] count = 1 wb = xlwt.Workbook() ws = wb.add_sheet('Course Attendance') ws.write(0,0, "Positive Attendance Report") ws.write(1,0, "2018 Spring Semester") ws.write(2,0, "LIB 732 Lawrence") col = 0 row = 5 for label in "ID,Lastname Firstname,Hits Total,Sessions Total,Minutes Total,Session Date,Session Hits,Session Minutes".split(","): ws.write(row,col) col +=1 col = 0 f.write("userid,time,ip,url,context,browser,action\n") pa.write("id,lastname,firstname,hits total,sessions total, minutes total,session date,session hits, session minutes\n") dd.write("[") for S in students: try: results = [] ###################### if count > 10: break count += 1 print ( count ) target = url + '/api/v1/users/' + str(S['id']) + '/page_views?per_page=200' while target: print ( target ) target = fetch(target) # have all student's hits. Filter to only this class results = filter(match59,results) if results: times = [] for hit in results: L = [hit['links']['user'],hit['updated_at'],hit['remote_ip'],hit['url'],hit['context_type'],hit['user_agent'],hit['action']] times.insert(0,hit['updated_at']) wr.writerow(L) print ( times ) uu = users_by_id[ S['id'] ] dd.write("{ label: '" + uu['sortable_name'] + "', times: ") part = partition(times) # also writes to dd dd.write("},\n") # print ( students list of sessions ) hits_total = sum( [h[1] for h in part] ) mins_total = sum( [h[2] for h in part] ) lname,fname = uu['sortable_name'].split(",") pa_wr.writerow( [ uu['login_id'], lname,fname, hits_total, str(len(part)),mins_total ] ) for xxy in [ uu['login_id'], lname,fname, hits_total, str(len(part)),mins_total ]: ws.write(row,col,xxy) col += 1 row +=1 col = 0 for P in part: pa_wr.writerow([ '','','','','','',P[0],P[1],P[2]]) for xxy in [ '','','','','','',P[0],P[1],P[2]]: ws.write(row,col,xxy) col +=1 row += 1 col = 0 print ( part ) print ( "\n\n" ) except Exception as exp: #errors += S['firstname'] + " " + S['lastname'] + " " + S['id'] + "\n" pass dd.write("];\n") wb.save('pos_atn.xls') err = codecs.open('pa_error.txt','w', encoding='utf-8') err.write(errors) ## ## Images - profile photos - can exist as: ## ## - picsStaffdir ... or the images_sm dir on www/staff. ## + alternative copies have 2..3..etc appended ## - the new badge photo folder ## - ilearn profile pics ## first_name_subs = """Analisa,Analisa (Lisa) Angelic,Angie Beatriz,Bea Christopher,Chris Conception,Connie Cynthia,Cindy David,Dave Deborah,Debbie Debra,Debbie Desiree,Desiree (Danelle) Diana,Diane Doug,Douglas Frank,Frank (Nick) Herbert,Herb Irving,Irv Isabel,Izzy Isela, Isela M. Janet,Jan Jeffrey,Jeff Jiayin,Jiayain Joanne,Jo Anne Jolynda,JoLynda Jonathan,Jon Josefina,Josie Juan,Juan Esteban Kathryn,Katie Kenneth,Ken Kim,Kimberly Lori,Lorraine Lucy,Lucila Margaret,Margie Maria,Maggie Maria,Mari Maria,Maria (Lupe) Mathew,Matthew Miriam,Mayra Nicholas,Nick Osvaldo,Ozzy Pam,Pamela Ronald,Ron Rosangela,Rose Sandra,Sandy Silvia,Sylvia Tamara,Tammy Timothy,Craig Wong-Lane,Wong van Tuyl,Vantuyl""".split("\n") last_name_subs = """Besson,Besson-Silvia Bernabe Perez,Bernabe Chargin,Bernstein Chargin Dequin,Dequin Bena Dufresne,Dufresne Reyes Gonzalez,Gonzalez Mireles Haehl,Lawton-Haehl Hooper,Hooper-Fernandes Lacarra,LaCarra Larose,LaRose MacEdo,Macedo Miller,Miller Young Najar-Santoyo,Najar Rocha-Gaso,Rocha Smith,Keys Vargas-Padilla,Vargas de Reza,DeReza del Carmen,Del Carmen""".split("\n") def lname(x): return x.split(' ')[-1] def l_initial(x): return x.split(' ')[-1][0] def job_titles2(): inn = open('cache/2020_job_titles.csv','r').readlines() inn = [ x.split(',')[1].strip() for x in inn ] inn = list(funcy.distinct(inn)) inn.sort() if 0: ioo = open('cache/2020_job_title_to_ix.csv','w') for x in inn: ioo.write("%s,\n" % x) u1 = "https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=get/jobtitles" sss = json.loads( requests.get(u1).text ) ibb = [] for s in sss: ibb.append(s['name']) #print(ibb) print(json.dumps(inn,indent=2)) def job_titles(): title_ids = open('cache/2020_job_title_to_ix.csv','r').readlines() t_ids = [ x.strip().split(',') for x in title_ids ] title_to_id = {} for x in t_ids: #print(x) #ttii = x.split(',') title_to_id[ x[0] ] = x[1] #print(title_to_id) inn = open('cache/2020_job_titles.csv','r').readlines() inn = [ x.split(',') for x in inn ] name_to_title = {} for x in inn: #print(x[0].strip()) parts = x[0].strip().split(' ') fl_name = "%s %s" % ( parts[0], parts[-1] ) name_to_title[ x[0] ] = x[1].strip() name_to_title[ fl_name ] = x[1].strip() firstname_variations = [] first = parts[0] lastname = " ".join(parts[1:]) for fns in first_name_subs: fns_parts = fns.split(',') subbed = re.sub('^'+fns_parts[0]+'$',fns_parts[1].strip(), first) if first != subbed: #print("Subbed %s %s for %s %s" % (subbed,lastname, first, lastname)) name_to_title[ subbed + " " + lastname ] = x[1].strip() subbed = re.sub('^'+fns_parts[1].strip()+'$',fns_parts[0], first) if first != subbed: #print("Subbed %s %s for %s %s" % (subbed,lastname, first, lastname)) name_to_title[ subbed + " " + lastname ] = x[1].strip() for lns in last_name_subs: fns_parts = lns.split(',') subbed = re.sub('^'+fns_parts[0]+'$',fns_parts[1].strip(), lastname) if lastname != subbed: #print("L Subbed %s %s for %s %s" % (first, subbed, first, lastname)) name_to_title[ first + " " + subbed ] = x[1].strip() subbed = re.sub('^'+fns_parts[1].strip()+'$',fns_parts[0], lastname) if lastname != subbed: #print("L Subbed %s %s for %s %s" % (first, subbed, first, lastname)) name_to_title[ first + " " + subbed ] = x[1].strip() unmatched_dir_names = [] m1 = "https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=menus" menus = json.loads( requests.get(m1).text ) id_to_title = {} for m in menus['titles']: id_to_title[ m['id'] ] = m['name'] id_to_dept = {} for m in menus['departments']: id_to_dept[ m['id'] ] = m['name'] u1 = "https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=list/staffsemester" sss = json.loads( requests.get(u1).text ) count1 = 0 count2 = 0 warning = open('cache/missing_ext_row.txt','w') for s in sss: easy_name = "%s %s" % (s['first_name'].strip(), s['last_name'].strip()) if easy_name in name_to_title: print( " + %s is %s" % (easy_name, name_to_title[easy_name]) ) p1 = "https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=get/user/%s" % str(s['id']) uuu = json.loads( requests.get(p1).text ) print("\nFound: %s" % easy_name) print("\tDepartment: %s" % uuu['department']) if not 'ext_id' in uuu: print('\tWARNING no personnel_ext row found!') warning.write("%s,%s\n" % (easy_name, str(uuu['id']))) if 'dept1' in uuu and uuu['dept1']: print("\tDept1: %s" % id_to_dept[ uuu['dept1'] ]) if 'gtitle' in uuu and uuu['gtitle']: print("\tTitle: %s" % id_to_title[ uuu['gtitle'] ]) print("\tDo you want to change the title to %s? y/n " % name_to_title[easy_name]) new_title = name_to_title[easy_name] new_title_id = title_to_id[ new_title ] yn = input("\tid: %s " % str(new_title_id)) if yn == 'y': print("...gonna change...") uppy = "https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=update_xt&cols=%s&vals=%s&id=%s" % ( "gtitle", str(new_title_id), str(uuu['ext_id']) ) print(uppy) #res = json.loads( requests.get(uppy).text ) res = requests.get(uppy).text print("") print(res) print("") #xyz = input() count1 += 1 else: print( " - %s " % easy_name ) unmatched_dir_names.append(easy_name) count2 += 1 #print( json.dumps(s,indent=2) ) print("\nMatched %i names, with %i remaining unmatched" % (count1, count2) ) print(menus['titles']) return cola = funcy.group_by( l_initial, t_names ) colb = funcy.group_by( l_initial, unmatched_dir_names ) initials = list(funcy.concat(cola.keys(), colb.keys())) initials = list(funcy.distinct(initials)) initials.sort() for i in initials: if i in cola: print('-> title file') for a in cola[i]: print("\t"+a) if i in colb: print('-> dir db') for b in colb[i]: print("\t"+b) print() """longer = max(len(t_names), len(unmatched_dir_names)) for i in range(longer): cola = '' colb = '' if len(t_names) > i: cola = t_names[i] if len(unmatched_dir_names) > i: colb = unmatched_dir_names[i] print(" %s\t\t%s" % (cola,colb)) """ # an early version, before tearing up... def job_titles3(): inn = open('cache/2020_job_titles.csv','r').readlines() inn = [ x.split(',') for x in inn ] t_names = [] fl_names = [] name_to_title = {} fl_to_title = {} for x in inn: parts = x[0].strip().split(' ') fl_name = "%s %s" % ( parts[0], parts[-1] ) t_names.append( x[0] ) fl_names.append( fl_name) name_to_title[ x[0] ] = x[1].strip() fl_to_title[ fl_name ] = x[1].strip() #print( json.dumps(name_to_title,indent=2) ) # t_names has the "state list" t_names.sort( key=lname ) unmatched_dir_names = [] u1 = "https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=list/staffsemester" sss = json.loads( requests.get(u1).text ) count1 = 0 count2 = 0 count3 = 0 for s in sss: easy_name = "%s %s" % (s['first_name'].strip(), s['last_name'].strip()) if easy_name in t_names: print( " + %s is %s" % (easy_name, name_to_title[easy_name]) ) t_names.remove(easy_name) count1 += 1 elif easy_name in fl_names: print( " + %s is %s" % (easy_name, fl_to_title[easy_name]) ) fl_names.remove(easy_name) count3 += 1 else: print( " . %s " % easy_name ) unmatched_dir_names.append(easy_name) count2 += 1 #print( json.dumps(s,indent=2) ) print("\nMatched %i names, %i F->L only, with %i remaining unmatched" % (count1,count3, count2) ) print() cola = funcy.group_by( l_initial, t_names ) colb = funcy.group_by( l_initial, unmatched_dir_names ) initials = list(funcy.concat(cola.keys(), colb.keys())) initials = list(funcy.distinct(initials)) initials.sort() for i in initials: if i in cola: print('-> title file') for a in cola[i]: print("\t"+a) if i in colb: print('-> dir db') for b in colb[i]: print("\t"+b) print() """longer = max(len(t_names), len(unmatched_dir_names)) for i in range(longer): cola = '' colb = '' if len(t_names) > i: cola = t_names[i] if len(unmatched_dir_names) > i: colb = unmatched_dir_names[i] print(" %s\t\t%s" % (cola,colb)) """ def index_pics(): dir_staff = 'cache/picsStaffdir/' # peter_howell dir_ilearn = 'cache/picsCanvas/' # g00102586 dir_badge = 'cache/picsId/2021crop/' # 102586 u1 = "https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=list/staffsemester" sss = json.loads( requests.get(u1).text ) by_goo = {} by_fl = {} count1 = 0 count2 = 0 for s in sss: myflag = 0 user = s['first_name'] + " " + s['last_name'] fl = s['first_name'].lower() + "_" + s['last_name'].lower() goo_short = '' if 'conf_goo' in s: goo_short = str(s['conf_goo']) goo_long_p = 'g00' + str(goo_short) + ".png" goo_long_j = 'g00' + str(goo_short) + ".jpg" dir1 = fl + ".jpg" dir2 = fl + "2.jpg" dir3 = fl + "3.jpg" if os.path.isfile( dir_staff + dir1 ): print( "%s \t %s" % (user, dir_staff+dir1)) count2 += 1 myflag = 1 if os.path.isfile( dir_staff + dir2 ): print( "%s \t %s" % (user, dir_staff+dir2)) count2 += 1 myflag = 1 if os.path.isfile( dir_staff + dir3 ): print( "%s \t %s" % (user, dir_staff+dir3)) count2 += 1 myflag = 1 if os.path.isfile( dir_ilearn + goo_long_p ): print( "%s \t %s" % (user, dir_ilearn + goo_long_p)) #try: # shutil.copyfile(dir_ilearn + goo_long_p, "cache/picsUpload/"+ goo_long_p) # print("File copied successfully.") #except Exception as e: # print("Failed to copy...") count2 += 1 myflag = 1 if os.path.isfile( dir_ilearn + goo_long_j ): print( "%s \t %s" % (user, dir_ilearn + goo_long_j)) #try: # shutil.copyfile(dir_ilearn + goo_long_j, "cache/picsUpload/"+ goo_long_j) # print("File copied successfully.") #except Exception as e: # print("Failed to copy...") count2 += 1 myflag = 1 if os.path.isfile( dir_badge + goo_short + '.jpg' ): print( "%s \t %s" % (user, dir_badge + goo_short + '.jpg')) count2 += 1 myflag = 1 count1 += myflag by_goo[ goo_short ] = s by_fl[fl] = s print("Found pics for %i users, a total of %s pics" % (count1,count2)) def cmtes(): ii = codecs.open('cache/committees-survey.csv','r','utf-8').readlines() ii = [ x.split(',') for x in ii ] print( json.dumps(ii,indent=2) ) ### ### ### CALENDAR RELATED FUNCTIONS ### def print_a_calendar(): year = 2024 cur_week = datetime.date.today().isocalendar()[1] print(f"Current week number: {cur_week}") if 0: # Create a calendar for the entire year cal = calendar.Calendar() # Iterate over each month of the year for month in range(1, 13): # Print the month name month_name = calendar.month_name[month] print(f"\n{month_name} ({year})") # Print the weekday abbreviation weekdays = ['w ', 'M ', 'T ', 'W ', 'Th', 'F ', 'Sa', 'S '] print(' '.join([ f"{day:<3}" for day in weekdays] )) # Get the month's calendar month_calendar = cal.monthdatescalendar(year, month) # Iterate over each week in the month for week in month_calendar: # Extract the week number and days of the week week_number = week[0].isocalendar()[1] week_days = [day.day if day.month == month else '' for day in week] # Print the week number and days print(f"{week_number:<4}", end=' ') print(' '.join([ f"{day:<2}" for day in week_days])) ### ### ### ### def generate_custom_calendar(year, semesters): # Create a calendar for the entire year cal = calendar.Calendar() # Iterate over each month of the year for month in range(1, 13): # Print the month name month_name = calendar.month_name[month] print(f"\n{month_name} {year}") # Print the weekday abbreviation weekdays = ['w ', 'M ', 'T ', 'W ', 'Th', 'F ', 'Sa', 'S ', 'sem' ] print(' '.join([f"{day:<3}" for day in weekdays])) # Get the month's calendar month_calendar = cal.monthdatescalendar(year, month) # Iterate over each week in the month for week in month_calendar: # Extract the week number and days of the week week_number = week[0].isocalendar()[1] week_days = [day.day if day.month == month else '' for day in week] #print("week: ", week) # Determine the column value for the 'sem' column sem_value = ' ' for (label, start_week, num_weeks) in semesters: if week_number >= start_week and week_number < start_week + num_weeks: sem_value = (week_number - start_week) + 1 # Print the week number, days, and the 'fa23' column value print(f"{week_number:<4}", end=' ') print(' '.join([f"{day:<2}" for day in week_days]) + f" {sem_value:<2}") # Example usage semesters = [ "fa25,08/25,16", ] l_semesters = [] for sem in semesters: column_label, start_date, num_weeks = sem.split(',') start_dt = datetime.datetime.strptime(start_date + "/" + str(year), "%m/%d/%Y") start_wk = start_dt.isocalendar()[1] l_semesters.append( (column_label, start_wk, int(num_weeks)) ) generate_custom_calendar(year, l_semesters) # task list calendar for a semester def word_calendar(): from docx import Document from docx.shared import Inches import datetime # Define the start date of semester start_date = datetime.date(2025, 8, 25) # Prepare a list of 18 weeks beginning from the start date dates = [start_date + datetime.timedelta(weeks=x) for x in range(18)] # Initialize an instance of a word document doc = Document() table = doc.add_table(rows=1, cols=3) # Set the headers hdr_cells = table.rows[0].cells hdr_cells[0].text = 'Week' hdr_cells[1].text = 'Date' hdr_cells[2].text = 'Events/Notes' # Iterate through the list of dates for i, date in enumerate(dates): cells = table.add_row().cells cells[0].text = str(i+1) cells[1].text = date.strftime("%B %d") cells[2].text = '' # Save the document doc.save('cache/tasks_schedule.docx') # more general purpose def word_calendar_v2(): from docx import Document from docx.shared import Inches import datetime # Define the start date of semester start_date = datetime.date(2024, 7, 1) # Prepare a list of 18 weeks beginning from the start date dates = [start_date + datetime.timedelta(weeks=x) for x in range(40)] # Initialize an instance of a word document doc = Document() table = doc.add_table(rows=1, cols=3) # Set the headers hdr_cells = table.rows[0].cells hdr_cells[0].text = 'Week of' hdr_cells[1].text = 'Events' hdr_cells[2].text = 'Notes' # Iterate through the list of dates for i, date in enumerate(dates): end_date = date + datetime.timedelta(days=6) # Calculate the end date cells = table.add_row().cells #cells[0].text = str(i+1) cells[0].text = f"{date.strftime('%B %d')} - {end_date.strftime('%B %d')}" cells[1].text = '' cells[2].text = '' # Save the document doc.save('cache/weekly_calendar.docx') # TODO some weird hour offset issue w/ these activities def cal(): from ics import Calendar u1 = "https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=get/sessions" gav_activities = json.loads( requests.get(u1).text ) g_by_uid = {} for g in gav_activities: print("\t" + str(g['cal_uid'])) if g['cal_uid']: g_by_uid[ g['cal_uid'] ] = g for g in gav_activities: pass #print(g) #return print(g_by_uid) url = "https://calendar.google.com/calendar/ical/4aq36obt0q5jjr5p82p244qs7c%40group.calendar.google.com/public/basic.ics" # the plwc cal url = "https://calendar.google.com/calendar/ical/if2r74sfiitva2ko9chn2v9qso%40group.calendar.google.com/public/basic.ics" c = Calendar(requests.get(url).text) for e in list(c.timeline): #print(e) #print() print(e.name) #continue if not str(e.uid) in g_by_uid.keys(): year = str(e.begin) year = year[:4] if not year == "2021": continue print("Not in conf_sessions db: \n\t%s\n\t%s" % ( e.name, e.begin )) addit = input("Do you want to add it? (y/n) ") if addit=='y': payload = { 'title':str(e.name) , 'length': 1, 'starttime':str(e.begin) , 'desc': str(e.description), 'type':220, 'location':str(e.location) , 'cal_uid':str(e.uid) } print(json.dumps(payload,indent=2)) print() r = requests.post("https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=set/newsession", data=payload) print("RESPONSE --> ") print(r.text) #print("\t%s" % e.uid) #print("\t%s\n\t%s\n\t%s\n\t%s\n" % ( str(e.begin), e.description, e.location, str(e.last_modified))) #c # #print(c.events) # {, # , # ...} #e = list(c.timeline)[0] #print("Event '{}' started {}".format(e.name, e.begin.humanize())) def file_renamer(): where = 'cache/picsStaffdir/cropped/' ff = os.listdir(where) for F in ff: nn = re.sub("\.jpg$","",F) print("Old name: %s. New name: %s" % (F, nn)) os.rename( where+F, where+nn ) print("ok") # Use api to fix ilearn's authentication method when we can't log in. List. def list_auth(): r = fetch( url + '/api/v1/accounts/1/authentication_providers') print(json.dumps(r,indent=2)) # Use api to fix ilearn's authentication method when we can't log in. Modify. def update_auth(): #r = fetch( url + '/api/v1/accounts/1/authentication_providers') u = url + '/api/v1/accounts/1/authentication_providers/104' opt = {"metadata_uri": r'https://eis-prod.ec.gavilan.edu/saml/idp-metadataxxx.xml'} r2 = requests.put(u, headers=header, data=opt) print ( r2.text ) #print(json.dumps(r,indent=2)) if __name__ == "__main__": options = { 1: ['Print answers to a single survey question',survey_answer] , 2: ['Collate survey answers',survey_organize] , 4: ['parse committees survey',cmtes] , 5: ['job titles',job_titles] , 6: ['fetch calendar events to conf_sessions db',cal] , 7: ['job titles workings....',job_titles2] , 8: ['collate all profile pics for db',index_pics] , #9: ['process schedule csv file from web',parse_schedule] , 10: ['dumb rename images mistake',file_renamer] , 11: ['list auth', list_auth], 12: ['update auth', update_auth], 13: ['print a calendar', print_a_calendar], 14: ['create a week calendar in word (semester)', word_calendar], 15: ['create a week calendar in word (general purpose)', word_calendar_v2], 16: ['create GOTT certificates', certificates_gott_build], 20: ['build_quiz', build_quiz], #21: ['certificates_gott_build, certificates_gott_build'] } if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]): resp = int(sys.argv[1]) print("\n\nPerforming: %s\n\n" % options[resp][0]) else: print ('') for key in options: print(str(key) + '.\t' + options[key][0]) print('') resp = input('Choose: ') # Call the function in the options dict options[ int(resp)][1]() ################################ if (0): out = open('cache/badgr.txt','w') resp = requests.post(badgr_target, data = badgr_hd) print ( resp ) print ( resp.text ) out.write(resp.text) auth = json.loads(resp.text) if (0): auth = json.loads(open('cache/badgr.txt','r').read()) print ( auth )