import json, codecs, re, markdown, os, pypandoc, striprtf, sqlite3, random, urllib import subprocess, html, time from markdownify import markdownify as md from striprtf.striprtf import rtf_to_text from flask import Flask, render_template, Response, jsonify, request from flask import send_from_directory import hashlib, funcy, platform, requests from datetime import datetime #from orgpython import to_html from localcache import sqlite_file, db # personnel_meta # personnel_fetch from localcache import user_enrolled_in from localcache import arrange_data_for_web, depts_with_classcounts, dept_with_studentviews, course_quick_stats from yattag import Doc import socket this_host = socket.gethostname() print('\n\n server host: ' + this_host, '\n\n') datafile2 = "cache/datafile.txt" LECPATH = "/media/hd2/peter_home_offload/lecture/" host = 'http://deep1:5000' news_path = '/media/hd2/peter_home/Documents/scripts/browser/' writing_path = '/media/hd2/peter_home/Documents/writing/' img_path = '/media/hd2/peter_home/Documents/writing_img/' pics_path = '/media/hd2/peter_home/misc/' if this_host == 'ROGDESKTOP': LECPATH = "d:/peter_home_offload/lecture/" host = 'http://192.168.1.7:5000' news_path = 'd:/peter_home/Documents/scripts/browser/' writing_path = 'd:/peter_home/Documents/writing/' img_path = 'd:/peter_home/Documents/writing_img/' pics_path = 'd:/peter_home/misc/' import sys app = Flask(__name__, template_folder='templates') ################################################################################################################# ################################################################################################################# ###### ###### writing & knowledgebase ###### br = "
" nl = "\n" style = """ """ ## I implement the backend of a web based GUI for the canvas functions ## and dashboard stuff. ## Stories / Next Steps ## 1. browse my writings. Follow links in markdown format to other files ## in this folder. html, md, rtf are handled so far. ## 2a. Sort by date, title, topic/tag ## 2b. Use w/ larger topic modeling project to suggest relations ## 2. Run through everything in the folder, and index the 'backlinks'. ## Append those to each file. ## 3 (interrupted) Use vue and implement editing. Q: convert back to ## original format? Or maintain new one. A: Convert back. ## 3. Do similar for tags. ## 4. Do similar but automatically, using nlp and keywords. def tag(x,y): return "<%s>%s" % (x,y,x) def tagc(x,c,y): return '<%s class="%s">%s' % (x,c,y,x) def a(t,h): return '%s' % (h,t) @app.route('/') def homepage(): return tag('h1','Canvas Tools') + br + \ a('Useful Emails','/useful-info') + br + br + \ a('Reload server','/rl') + br + \ a('Shut down','/sd') @app.route('/useful-info') def useful_info_page(): return render_template('useful_info.html') @app.route('/useful-info/') def useful_info_page_with_tag(tag): # Same template; Vue reads the tag from the URL path return render_template('useful_info.html') @app.route('/api/useful-info') def useful_info_api(): # Filters: start, end (ISO date), tags (comma-separated) start = request.args.get('start') or None end = request.args.get('end') or None tags = request.args.get('tags') or '' tag_list = [t.strip() for t in tags.split(',') if t.strip()] try: from localcache2 import db CON, CUR = db() try: params = [] where = [] if start: where.append('s.created_at >= %s') params.append(start) if end: where.append('s.created_at <= %s') params.append(end) where_sql = ('WHERE ' + ' AND '.join(where)) if where else '' CUR.execute( f""" SELECT s.id, s.date_label, s.short_text, s.summary_text, COALESCE(array_agg(DISTINCT t.name) FILTER (WHERE t.name IS NOT NULL), '{{}}') AS tags, COALESCE(array_agg(DISTINCT a.path) FILTER (WHERE a.path IS NOT NULL), '{{}}') AS attachments FROM useful_info_summary s LEFT JOIN useful_info_summary_tag st ON st.summary_id=s.id LEFT JOIN useful_info_tag t ON t.id=st.tag_id LEFT JOIN useful_info_email e ON e.summary_id = s.id LEFT JOIN useful_info_email_attachment ea ON ea.email_id = e.id LEFT JOIN useful_info_attachment a ON a.id = ea.attachment_id {where_sql} GROUP BY s.id ORDER BY to_date('01/' || s.date_label, 'DD/MM/YY') DESC NULLS LAST, s.created_at DESC """, tuple(params) ) rows = CUR.fetchall() # Filter by tags in python (any-match) out = [] for r in rows: rid, dlabel, short, summary, tgs, atts = r[0], r[1], r[2], r[3], r[4] or [], r[5] or [] tgs2 = list(tgs) atts2 = list(atts) if tag_list: if not any(t in tgs2 for t in tag_list): continue out.append({'id': rid, 'date': dlabel, 'title': short, 'summary': summary, 'tags': tgs2, 'attachments': atts2}) return jsonify(out) finally: CUR.close(); CON.close() except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/data/') def send_cachedata(path): # serve files under cache return send_from_directory('cache', path) @app.route('/api/useful-info/tag/') def useful_info_by_tag(tag): # Convenience endpoint to filter by a single tag # Delegates to the main handler by injecting the tag query param args = request.args.to_dict(flat=True) args['tags'] = tag with app.test_request_context('/api/useful-info', query_string=args): return useful_info_api() def _shutdown_server(): func = request.environ.get('werkzeug.server.shutdown') if func is None: raise RuntimeError('Not running with the Werkzeug Server') func() @app.route('/sd') def shutdown(): try: _shutdown_server() return 'Server shutting down...' except Exception as e: return f'Error shutting down: {e}', 500 @app.route('/rl') def reload_self(): # Attempt to restart the process in-place try: python = sys.executable os.execv(python, [python] + sys.argv) except Exception as e: return f'Error reloading: {e}', 500 return 'Reloading...' @app.route('/health') def health(): return jsonify({'app': 'server.py', 'status': 'ok'}), 200 if __name__ == '__main__': host = os.environ.get('HOST', '0.0.0.0') port = int(os.environ.get('PORT', '5000')) app.run(host=host, port=port, debug=True) def orgline(L): L.strip() if re.search(r"^\s*$", L): return "" a = re.search( r'^\*\s(.*)$', L) if a: return "

%s

\n" % a.group(1) b = re.search( r'TODO\s\[\#A\](.*)$', L) if b: return "Todo - Priority 1: %s" % b.group(1) + br + nl d = re.search( r'^\*\*\*\s(.*)$', L) if d: return d.group(1) d = re.search( r'^\*\*\s(.*)$', L) if d: L = d.group(1) return L + br + nl def editor(src): return br + br + br + """""" % src def in_form(txt,path): return '
' + \ '' + \ txt + \ '
' def mytime(fname): return os.path.getmtime( os.path.join(writing_path, fname) ) def index(): #f = [ os.path.join(writing_path, x) for x in os.listdir(writing_path) ] f = os.listdir(writing_path) f.sort(key=mytime) f.reverse() return "

\n".join( ["%s (%s)" % (x,x,datetime.fromtimestamp(mytime(x)).strftime('%Y-%m-%d %H')) for x in f ] ) def writing(fname): if fname == 'index': return index() inp = codecs.open(writing_path + fname, 'r', 'utf-8') ext = fname.split('.')[-1] if ext == "py" or ext == "php": src = inp.read() return "
" + html.escape(src) + "
" if ext == "html": src = inp.read() return src if ext == "md": src = inp.read() return style + markdown.markdown(src) + in_form(editor(src),fname) #if ext == "org": # src = inp.read() # return (src, toc=True, offset=0, highlight=True) if ext == "rtto_htmlf": text = "
\n".join( rtf_to_text(inp.read()).split('\n') ) return style + text if ext == "docx": hash = hashlib.sha1("my message".encode("UTF-8")).hexdigest() hash = hash[:10] #output = pypandoc.convert_file('C:/Users/peter/Nextcloud/Documents/writing/' + fname, 'html', output = pypandoc.convert_file(writing_path + fname, 'html', extra_args=['--extract-media=%s' % hash ]) # file:///c:/Users/peter/Nextcloud/Documents/writing output2 = pypandoc.convert_file(writing_path + fname, 'markdown', extra_args=['--extract-media=%s' % hash+'_md' ]) # file:///c:/Users/peter/Nextcloud/Documents/writing new_fname = fname[:-5] + '.md' #as_md = md(output) return style + output + in_form(editor(output2),new_fname) return style + markdown.markdown( "".join( [ orgline(x) for x in inp.readlines() ] ) ) ################################################################################################################# ################################################################################################################# ###### ###### kiosk display ###### def dashboard(): return 'kiosk moved' def dash(): return 'kiosk moved' def mycalendar(): return 'kiosk moved' def most_recent_file_of(target, folder): return '' def news(): return '' def randPic(): return '' def do_img_crop(im): return '' ################################################################################################################# ################################################################################################################# ###### ###### db info helpers ###### def sample(): return "

I am a sample

" def sample2(a=""): return "I'm a placeholder" # Filter a stream of loglines for those that match a course's url / id def has_course(stream,courseid): regex = '/courses/%i/' % int(courseid) while True: L = stream.readline() if re.search(regex, L): yield L def js(s): return json.dumps(s, indent=2) def sem_from_array_crn(crn): if not crn[2]: return "" if crn[2] == "": return "" return crn[2][:6] ################################################################################################################# ################################################################################################################# ###### ###### db ilearn course / user / hits ###### def user_courses(uid): return js(user_enrolled_in(uid)) def user_course_history_summary(usr_id): q = """SELECT r.timeblock, r.viewcount, c.sis, c.code, c.canvasid FROM requests_sum1 AS r JOIN users AS u ON r.userid=u.id JOIN courses AS c ON c.id=r.courseid WHERE u.canvasid=%s GROUP BY r.courseid ORDER BY r.viewcount DESC;""" % str(usr_id) (conn,cur) = db() cur.execute(q) r = cur.fetchall() return js(r) groups = funcy.group_by(sem_from_array_crn, r) g = {} for K in groups.keys(): g[K] = [ x[3] for x in groups[K] ] return js( g ) def roster(crn): q = """SELECT u.name, u.sortablename, u.canvasid as user_id, c.canvasid as course_id, e.workflow, e."type" FROM enrollment AS e JOIN users AS u ON e.user_id=u.id JOIN courses AS c ON c.id=e.course_id WHERE c.canvasid="%s" ;""" % str(crn) (conn,cur) = db() cur.execute(q) return js(cur.fetchall()) def user_course_hits(usr,courseid): return list(has_course( codecs.open('cache/users/logs/%s.csv' % usr, 'r', 'utf-8'), courseid)) #return "\n".join( [x for x in next(gen)] ) def profiles(id=1,b=2,c=3): import os pics = os.listdir('cache/picsCanvas') return ''.join([ "" % s for s in pics ]) # Departments, classes in each, and students (with hits) in each of those. def enrollment(a): return js(depts_with_classcounts()) # All the classes in this dept, w/ all the students in each, with count of their views. def dept(d=''): if not d: return js(dept_with_studentviews()) return js(dept_with_studentviews(d)) def user(canvas_id=None): info = json.loads( codecs.open( 'cache/users/%s.txt' % canvas_id, 'r', 'utf-8').read() ) return render_template('hello.html', id=canvas_id, name=info['name']) ################################################################################################################# ################################################################################################################# ###### ###### podcast feed ###### def lectures(): fi = os.listdir(LECPATH) doc, tag, text = Doc().tagtext() doc.asis('') doc.asis('') with tag('channel'): with tag('title'): text("Peter's Lecture Series") with tag('description'): text("Since 2019") with tag('link'): text(host) for f in fi: if f.endswith('.mp3'): #print(f) with tag('item'): name = f.split('.')[0] ff = re.sub(r'\s','%20',f) with tag('title'): text(name) with tag('guid'): text(f) b = os.path.getsize(LECPATH+f) doc.stag('enclosure', url=host+'/podcast/media/'+urllib.parse.quote(ff), type='audio/mpeg',length=b) doc.asis('') #doc.asis('') return doc.getvalue() def web_lectures(): fi = os.listdir(LECPATH) output = "

Lectures

\n" for f in fi: if f.endswith('.mp3'): name = f.split('.')[0] ff = urllib.parse.quote(f) #ff = re.sub('\s','%20',f) output += '%s
\n' % ( host + '/podcast/media/' + ff, name) return output ################################################################################################################# ################################################################################################################# ###### ###### editing personnel app ###### # personnel_fetch, personnel_meta # todo: update: dept, title, any of the other fields. # insert: new dept, new title, # update a value: dept id of a personnel id def update_pers_title(pid, tid): q = "UPDATE personnel SET `title`='%s' WHERE `id`='%s'" % (str(tid), str(pid)) (conn,cur) = db() result = cur.execute(q) conn.commit() return js( {'result': 'success'} ) # update a value: dept id of a personnel id def update_pers_dept(pid, did): q = "UPDATE personnel SET `dept1`='%s' WHERE `id`='%s'" % (str(did), str(pid)) (conn,cur) = db() result = cur.execute(q) conn.commit() return js( {'result': 'success'} ) def user_edit(canvas_id='2'): info = json.loads( codecs.open( 'cache/users/%s.txt' % str(canvas_id), 'r', 'utf-8').read() ) return render_template('personnel.html', id=canvas_id, name=info['name']) def staff_dir(search=''): return render_template('dir.html') ###### ###### handling images ###### def find_goo(n): g = re.search(r'00(\d{6})', n) if g: return g.groups()[0] return '' def byname(x): if 'conf_name' in x: return x['conf_name'] if 'first_name' in x and 'last_name' in x: return x['first_name'] + " " + x['last_name'] return '' def fn_to_struct( n, staff ): g = find_goo(n) if g: #print(g) for s in staff: cg = s['conf_goo'] if cg == g: #print("%s - %s - %s" % (n, g, cg) ) return s return { "conf_goo":g, "conf_name":"unknown - " + n } return 0 def image_edit(filename=''): url = "https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=list/staffsemester" staff = json.loads( requests.get(url).text ) badges = 0 web = 1 if web: files = sorted(os.listdir('cache/picsStaffdir') ) done_files = [ x[:-4] for x in sorted(os.listdir('cache/picsStaffdir/cropped') ) ] if badges: files = sorted(os.listdir('cache/picsId/originals_20211022') ) done_files = [ x[:6] for x in sorted(os.listdir('cache/picsId/2021crop') ) ] files_match = [] files_no_match = [] raw_filenames = files for f in files: sa = fn_to_struct(f,staff) if sa: ss = sa.copy() else: ss = sa if ss: ss['filename'] = f files_match.append(ss) else: files_no_match.append(f) fm = json.dumps( sorted(files_match,key=byname) ) fnm = json.dumps(files_no_match) sm = json.dumps(staff) return render_template('images.html', staff=sm, matches=fm, nomatches=fnm, checked=done_files) def image_crop(filename,x,y,w,h,newname=''): from PIL import Image import piexif badges = 0 web = 1 if not newname: newname = filename if web: im = Image.open('cache/picsStaffdir/%s' % filename) savepath = 'cache/picsStaffdir/cropped/%s.jpg' % newname if badges: im = Image.open('cache/picsId/originals_20211022/%s' % filename) savepath = 'cache/picsId/2021crop/%s.jpg' % newname out = { 'im': str(im) } x = int(x) y = int(y) w = int(w) h = int(h) if "exif" in im.info: exif_dict = piexif.load(im.info['exif']) #out['exif'] = exif_dict #print(exif_dict) if piexif.ImageIFD.Orientation in exif_dict['0th']: #exif_dict['0th'][piexif.ImageIFD.Orientation] = 3 print(piexif.ImageIFD.Orientation) print(exif_dict['0th']) out['rotation'] = 'messed up' if exif_dict['0th'][piexif.ImageIFD.Orientation] == 6: im = im.rotate(270, expand=True) #im.save('cache/picsId/originals_20211022/crotated_%s' % filename, quality=95) im_crop = im.crop((x,y,x+w,y+h)) img_resize = im_crop.resize((250, 333)) img_resize.save(savepath, quality=95) return json.dumps( out ) #if filename=='list': # #return '
\n'.join([ "%s" % ( x,x ) for x in # return '
\n'.join([ "%s" % ( x,x ) for x in sorted(os.listdir('cache/picsId/originals_20211022')) ]) ################################################################################################################# ################################################################################################################# ###### ###### server infrastructure ###### def server_save(key,value): codecs.open(datafile2,'a').write( "%s=%s\n" % (str(key),str(value))) def server_dispatch_json(function_name,arg='', arg2=''): print("Looking for function: %s. arg:%s. arg2:%s." % (function_name, arg, arg2)) try: result = "" + globals()[function_name](arg, arg2) print("doing 2 args") return result except Exception as e: print("Error with that: %s" % str(e)) try: result = "" + globals()[function_name](arg) # print("doing 1 arg") return result except Exception as f: print("Error with that: %s" % str(f)) try: result = globals()[function_name]() print("doing 0 arg") return result except Exception as gg: print("Error with that: %s" % str(gg)) return json.dumps({'result':'failed: exception', 'e1':str(e), 'e2':str(f), 'e3':str(gg)}, indent=2) def server_dispatch(function_name,arg='', arg2=''): print("Looking for function: %s. arg:%s. arg2:%s." % (function_name, arg, arg2)) try: result = "" + globals()[function_name](arg, arg2) print("doing 2 args") return result except Exception as e: print("Error with that: %s" % str(e)) try: result = "" + globals()[function_name](arg) # print("doing 1 arg") return result except Exception as f: print("Error with that: %s" % str(f)) try: result = globals()[function_name]() print("doing 0 arg") return result except Exception as gg: print("Error with that: %s" % str(gg)) return json.dumps({'result':'failed: exception', 'e1':str(e), 'e2':str(f), 'e3':str(gg)}, indent=2)