canvasapp/server.py

981 lines
32 KiB
Python

import json, codecs, re, markdown, os, pypandoc, striprtf, sqlite3, random, urllib
import subprocess, html, time
from markdownify import markdownify as md
from striprtf.striprtf import rtf_to_text
from flask import Flask, render_template, Response, jsonify, request
from flask import send_from_directory
import hashlib, funcy, platform, requests
from datetime import datetime
#from orgpython import to_html
from localcache import sqlite_file, db # personnel_meta # personnel_fetch
from localcache import user_enrolled_in
from localcache import arrange_data_for_web, depts_with_classcounts, dept_with_studentviews, course_quick_stats
from yattag import Doc
import socket
this_host = socket.gethostname()
print('\n\n server host: ' + this_host, '\n\n')
datafile2 = "cache/datafile.txt"
LECPATH = "/media/hd2/peter_home_offload/lecture/"
host = 'http://deep1:5000'
news_path = '/media/hd2/peter_home/Documents/scripts/browser/'
writing_path = '/media/hd2/peter_home/Documents/writing/'
img_path = '/media/hd2/peter_home/Documents/writing_img/'
pics_path = '/media/hd2/peter_home/misc/'
if this_host == 'ROGDESKTOP':
LECPATH = "d:/peter_home_offload/lecture/"
host = 'http://192.168.1.7:5000'
news_path = 'd:/peter_home/Documents/scripts/browser/'
writing_path = 'd:/peter_home/Documents/writing/'
img_path = 'd:/peter_home/Documents/writing_img/'
pics_path = 'd:/peter_home/misc/'
import sys
app = Flask(__name__, template_folder='templates')
#################################################################################################################
#################################################################################################################
######
###### writing & knowledgebase
######
br = "<br />"
nl = "\n"
style = """<link rel="stylesheet" href="/data/gui/public/simplemde.min.css">
<style> body { line-height: 2.3em; margin: 4em; } #editor { line-height: 1em!important; }
</style>
<script src="/data/gui/public/simplemde.min.js"></script>"""
## I implement the backend of a web based GUI for the canvas functions
## and dashboard stuff.
## Stories / Next Steps
## 1. browse my writings. Follow links in markdown format to other files
## in this folder. html, md, rtf are handled so far.
## 2a. Sort by date, title, topic/tag
## 2b. Use w/ larger topic modeling project to suggest relations
## 2. Run through everything in the folder, and index the 'backlinks'.
## Append those to each file.
## 3 (interrupted) Use vue and implement editing. Q: convert back to
## original format? Or maintain new one. A: Convert back.
## 3. Do similar for tags.
## 4. Do similar but automatically, using nlp and keywords.
def tag(x,y): return "<%s>%s</%s>" % (x,y,x)
def tagc(x,c,y): return '<%s class="%s">%s</%s>' % (x,c,y,x)
def a(t,h): return '<a href="%s">%s</a>' % (h,t)
@app.route('/')
def homepage():
return tag('h1','Canvas Tools') + br + \
a('Useful Emails','/useful-info') + br + \
a('Course Changes','/courses') + br + \
a('User Changes','/users') + br + br + \
a('Reload server','/rl') + br + \
a('Shut down','/sd')
@app.route('/useful-info')
def useful_info_page():
return render_template('useful_info.html')
@app.route('/useful-info/<tag>')
def useful_info_page_with_tag(tag):
# Same template; Vue reads the tag from the URL path
return render_template('useful_info.html')
@app.route('/api/useful-info')
def useful_info_api():
# Filters: start, end (ISO date), tags (comma-separated)
start = request.args.get('start') or None
end = request.args.get('end') or None
tags = request.args.get('tags') or ''
tag_list = [t.strip() for t in tags.split(',') if t.strip()]
try:
from localcache2 import db
CON, CUR = db()
try:
params = []
where = []
if start:
where.append('s.created_at >= %s')
params.append(start)
if end:
where.append('s.created_at <= %s')
params.append(end)
where_sql = ('WHERE ' + ' AND '.join(where)) if where else ''
CUR.execute(
f"""
SELECT s.id, s.date_label, s.short_text, s.summary_text,
COALESCE(array_agg(DISTINCT t.name) FILTER (WHERE t.name IS NOT NULL), '{{}}') AS tags,
COALESCE(array_agg(DISTINCT a.path) FILTER (WHERE a.path IS NOT NULL), '{{}}') AS attachments
FROM useful_info_summary s
LEFT JOIN useful_info_summary_tag st ON st.summary_id=s.id
LEFT JOIN useful_info_tag t ON t.id=st.tag_id
LEFT JOIN useful_info_email e ON e.summary_id = s.id
LEFT JOIN useful_info_email_attachment ea ON ea.email_id = e.id
LEFT JOIN useful_info_attachment a ON a.id = ea.attachment_id
{where_sql}
GROUP BY s.id
ORDER BY to_date('01/' || s.date_label, 'DD/MM/YY') DESC NULLS LAST, s.created_at DESC
""",
tuple(params)
)
rows = CUR.fetchall()
# Filter by tags in python (any-match)
out = []
for r in rows:
rid, dlabel, short, summary, tgs, atts = r[0], r[1], r[2], r[3], r[4] or [], r[5] or []
tgs2 = list(tgs)
atts2 = list(atts)
if tag_list:
if not any(t in tgs2 for t in tag_list):
continue
out.append({'id': rid, 'date': dlabel, 'title': short, 'summary': summary, 'tags': tgs2, 'attachments': atts2})
return jsonify(out)
finally:
CUR.close(); CON.close()
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/data/<path:path>')
def send_cachedata(path):
# serve files under cache
return send_from_directory('cache', path)
@app.route('/api/useful-info/tag/<tag>')
def useful_info_by_tag(tag):
# Convenience endpoint to filter by a single tag
# Delegates to the main handler by injecting the tag query param
args = request.args.to_dict(flat=True)
args['tags'] = tag
with app.test_request_context('/api/useful-info', query_string=args):
return useful_info_api()
@app.route('/courses')
def courses_page():
return render_template('courses.html')
@app.route('/users')
def users_page():
return render_template('users.html')
@app.route('/courses/<course_id>')
def courses_page_deeplink(course_id):
# Client-side Vue app will read location.pathname and deep-link.
return render_template('courses.html')
@app.route('/users/<user_id>')
def users_page_deeplink(user_id):
# Client-side Vue app will read location.pathname and deep-link.
return render_template('users.html')
def _shutdown_server():
func = request.environ.get('werkzeug.server.shutdown')
if func is None:
raise RuntimeError('Not running with the Werkzeug Server')
func()
@app.route('/sd')
def shutdown():
try:
_shutdown_server()
return 'Server shutting down...'
except Exception as e:
return f'Error shutting down: {e}', 500
@app.route('/rl')
def reload_self():
# Attempt to restart the process in-place
try:
python = sys.executable
os.execv(python, [python] + sys.argv)
except Exception as e:
return f'Error reloading: {e}', 500
return 'Reloading...'
@app.route('/health')
def health():
return jsonify({'app': 'server.py', 'status': 'ok'}), 200
# Load enrollment change index JSON for a given term like '2025fall'.
def _load_enrollment_changes(term):
base = os.path.join('cache', 'rosters', term, 'enrollment_changes.json')
if not os.path.exists(base):
return None
try:
with open(base, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception:
return None
# Resolve a term from query args: accept 'term' directly, or 'year'+'sem'.
def _resolve_term(args):
term = args.get('term')
if term:
return term
year = args.get('year')
sem = args.get('sem') # expect 'spring'|'summer'|'fall'
if year and sem:
return f"{year}{sem}"
return None
def _list_terms_with_changes():
"""Scan cache/rosters for terms that contain enrollment_changes.json and return sorted list.
Sort order uses semesters.py by converting 'YYYY<season>' to 'Season YYYY' and then to sort key.
"""
base = os.path.join('cache', 'rosters')
out = []
if not os.path.isdir(base):
return out
# map lowercase season to title
title_map = {'spring': 'Spring', 'summer': 'Summer', 'fall': 'Fall', 'winter': 'Winter'}
try:
from semesters import season_to_number
except Exception:
season_to_number = {'Spring': '30', 'Summer': '50', 'Fall': '70', 'Winter': '10'}
for name in os.listdir(base):
term_path = os.path.join(base, name)
if not os.path.isdir(term_path):
continue
# accept both underscore and hyphen variants
f1 = os.path.join(term_path, 'enrollment_changes.json')
f2 = os.path.join(term_path, 'enrollment-changes.json')
if not (os.path.exists(f1) or os.path.exists(f2)):
continue
# parse 'YYYYseason'
yr = ''.join([c for c in name if c.isdigit()])
season = name[len(yr):]
if not (yr and season):
continue
season_title = title_map.get(season.lower())
if not season_title:
continue
human = f"{season_title} {yr}"
sis_code = f"{yr}{season_to_number.get(season_title, '00')}"
out.append({
'term': name,
'label': human,
'sis_code': sis_code,
'course_prefix': f"{sis_code}-",
'path': term_path,
})
# Sort descending by sis_code (most recent first)
out.sort(key=lambda x: x['sis_code'], reverse=True)
return out
@app.route('/api/rosters/terms')
def api_roster_terms():
return jsonify(_list_terms_with_changes())
def _read_csv_dicts(path):
"""Tolerant CSV reader: returns list of dicts with lowercase, stripped keys/values.
Tries utf-8-sig first, falls back to latin-1.
"""
import csv as _csv
rows = []
if not path or not os.path.exists(path):
return rows
for enc in ('utf-8-sig', 'utf-8', 'latin-1'):
try:
with open(path, 'r', encoding=enc, newline='') as f:
rdr = _csv.DictReader(f)
for r in rdr or []:
norm = {}
for k, v in (r.items() if r else []):
nk = (k.strip().lower() if isinstance(k, str) else k)
if isinstance(v, str):
v = v.strip()
norm[nk] = v
if norm:
rows.append(norm)
break
except Exception:
rows = []
continue
return rows
def _latest_snapshot_file(term, prefix):
base = os.path.join('cache', 'rosters', term)
if not os.path.isdir(base):
return None
try:
files = [f for f in os.listdir(base) if f.startswith(prefix + '.') and f.endswith('.csv')]
def ts_of(name):
label = name[len(prefix)+1:-4]
try:
return datetime.strptime(label, '%Y-%m-%dT%H-%M')
except Exception:
return datetime.min
files.sort(key=ts_of)
if not files:
return None
return os.path.join(base, files[-1])
except Exception:
return None
@app.route('/api/rosters/users')
def api_roster_users():
"""Return latest users list for the given term (from users.*.csv) with optional remote flags from login.*.csv."""
term = _resolve_term(request.args)
if not term:
return jsonify({'error': 'missing term'}), 400
term_path = os.path.join('cache', 'rosters', term)
users_path = _latest_snapshot_file(term, 'users')
if not users_path or not os.path.exists(users_path):
return jsonify([])
users = []
try:
for r in _read_csv_dicts(users_path):
users.append({
'user_id': r.get('user_id',''),
'login_id': r.get('login_id',''),
'first_name': r.get('first_name',''),
'last_name': r.get('last_name',''),
'email': r.get('email',''),
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# Attach remote info from login snapshots if present
try:
rem = {}
files = [f for f in os.listdir(term_path) if f.startswith('login.') and f.endswith('.csv')]
def key_fn(name):
label = name[len('login.'):-4]
try:
return datetime.strptime(label, '%Y-%m-%dT%H-%M')
except Exception:
return datetime.min
files.sort(key=key_fn)
for fname in files:
for r in _read_csv_dicts(os.path.join(term_path, fname)):
uid = r.get('user_id')
if uid:
rem[str(uid)] = r.get('root_account','') or 'remote'
if rem:
for u in users:
if u['user_id'] in rem:
u['remote'] = True
u['root_account'] = rem[u['user_id']]
except Exception:
pass
return jsonify(users)
@app.route('/api/rosters/courses')
def api_roster_courses():
"""Return latest courses list for the given term (from courses.*.csv)."""
term = _resolve_term(request.args)
if not term:
return jsonify({'error': 'missing term'}), 400
courses_path = _latest_snapshot_file(term, 'courses')
if not courses_path or not os.path.exists(courses_path):
return jsonify([])
rows = []
try:
for r in _read_csv_dicts(courses_path):
rows.append({
'course_id': r.get('course_id',''),
'short_name': r.get('short_name',''),
'long_name': r.get('long_name',''),
'term_id': r.get('term_id',''),
})
except Exception as e:
return jsonify({'error': str(e)}), 500
return jsonify(rows)
# Serve enrollment change data and optional filtered views by user_id or course_id.
@app.route('/api/rosters/changes')
def api_roster_changes():
"""Return enrollment change data; supports optional filters 'user_id' or 'course_id'.
Query params: term=2025fall (or year=2025&sem=fall), user_id=..., course_id=...
"""
term = _resolve_term(request.args)
if not term:
return jsonify({'error': 'missing term; supply term=YYYY<sem> or year and sem'}), 400
data = _load_enrollment_changes(term)
if data is None:
return jsonify({'error': f'no enrollment_changes.json for term {term}'}), 404
uid = request.args.get('user_id')
cid = request.args.get('course_id')
if uid:
events = data.get('by_user', {}).get(str(uid), [])
return jsonify({'term': term, 'user_id': str(uid), 'events': events})
if cid:
events = data.get('by_course', {}).get(str(cid), [])
return jsonify({'term': term, 'course_id': str(cid), 'events': events})
# No filter: return summary keys only
return jsonify({
'term': term,
'generated': data.get('generated'),
'by_course_keys': sorted(list(data.get('by_course', {}).keys())),
'by_user_keys': sorted(list(data.get('by_user', {}).keys())),
})
# Serve events for a given user_id within a term.
@app.route('/api/rosters/changes/user/<user_id>')
def api_roster_changes_by_user(user_id):
"""Get enrollment/presence events for a specific user_id for the given term."""
term = _resolve_term(request.args)
if not term:
return jsonify({'error': 'missing term; supply term=YYYY<sem> or year and sem'}), 400
data = _load_enrollment_changes(term)
if data is None:
return jsonify({'error': f'no enrollment_changes.json for term {term}'}), 404
events = data.get('by_user', {}).get(str(user_id), [])
return jsonify({'term': term, 'user_id': str(user_id), 'events': events})
# Serve events for a given course_id within a term.
@app.route('/api/rosters/changes/course/<course_id>')
def api_roster_changes_by_course(course_id):
"""Get enrollment/presence events for a specific course_id (CRN-like) for the given term."""
term = _resolve_term(request.args)
if not term:
return jsonify({'error': 'missing term; supply term=YYYY<sem> or year and sem'}), 400
data = _load_enrollment_changes(term)
if data is None:
return jsonify({'error': f'no enrollment_changes.json for term {term}'}), 404
events = data.get('by_course', {}).get(str(course_id), [])
return jsonify({'term': term, 'course_id': str(course_id), 'events': events})
if __name__ == '__main__':
host = os.environ.get('HOST', '0.0.0.0')
port = int(os.environ.get('PORT', '5000'))
app.run(host=host, port=port, debug=True)
def orgline(L):
L.strip()
if re.search(r"^\s*$", L): return ""
a = re.search( r'^\*\s(.*)$', L)
if a: return "<h2>%s</h2>\n" % a.group(1)
b = re.search( r'TODO\s\[\#A\](.*)$', L)
if b: return "<b><i>Todo - Priority 1</i>: %s</b>" % b.group(1) + br + nl
d = re.search( r'^\*\*\*\s(.*)$', L)
if d: return d.group(1)
d = re.search( r'^\*\*\s(.*)$', L)
if d: L = d.group(1)
return L + br + nl
def editor(src):
return br + br + br + """<textarea name='content' id='editor'>%s</textarea><script>
var simplemde = new SimpleMDE({ element: document.getElementById("editor") });
</script>""" % src
def in_form(txt,path):
return '<form method="post" action="/save"><input type="hidden" name="what" value="writing">' + \
'<input type="hidden" name="path" value="' + path + '" />' + \
txt + \
'<input type="submit" value="Save" name="Save" /></form>'
def mytime(fname):
return os.path.getmtime( os.path.join(writing_path, fname) )
def index():
#f = [ os.path.join(writing_path, x) for x in os.listdir(writing_path) ]
f = os.listdir(writing_path)
f.sort(key=mytime)
f.reverse()
return "<br /><br />\n".join( ["<a href='%s'>%s</a> (%s)" % (x,x,datetime.fromtimestamp(mytime(x)).strftime('%Y-%m-%d %H')) for x in f ] )
def writing(fname):
if fname == 'index': return index()
inp = codecs.open(writing_path + fname, 'r', 'utf-8')
ext = fname.split('.')[-1]
if ext == "py" or ext == "php":
src = inp.read()
return "<pre>" + html.escape(src) + "</pre>"
if ext == "html":
src = inp.read()
return src
if ext == "md":
src = inp.read()
return style + markdown.markdown(src) + in_form(editor(src),fname)
#if ext == "org":
# src = inp.read()
# return (src, toc=True, offset=0, highlight=True)
if ext == "rtto_htmlf":
text = "<br />\n".join( rtf_to_text(inp.read()).split('\n') )
return style + text
if ext == "docx":
hash = hashlib.sha1("my message".encode("UTF-8")).hexdigest()
hash = hash[:10]
#output = pypandoc.convert_file('C:/Users/peter/Nextcloud/Documents/writing/' + fname, 'html',
output = pypandoc.convert_file(writing_path + fname, 'html',
extra_args=['--extract-media=%s' % hash ]) # file:///c:/Users/peter/Nextcloud/Documents/writing
output2 = pypandoc.convert_file(writing_path + fname, 'markdown',
extra_args=['--extract-media=%s' % hash+'_md' ]) # file:///c:/Users/peter/Nextcloud/Documents/writing
new_fname = fname[:-5] + '.md'
#as_md = md(output)
return style + output + in_form(editor(output2),new_fname)
return style + markdown.markdown( "".join( [ orgline(x) for x in inp.readlines() ] ) )
#################################################################################################################
#################################################################################################################
######
###### kiosk display
######
def dashboard():
return 'kiosk moved'
def dash():
return 'kiosk moved'
def mycalendar():
return 'kiosk moved'
def most_recent_file_of(target, folder):
return ''
def news():
return ''
def randPic():
return ''
def do_img_crop(im):
return ''
#################################################################################################################
#################################################################################################################
######
###### db info helpers
######
def sample():
return "<h1>I am a sample</h1>"
def sample2(a=""):
return "I'm a placeholder"
# Filter a stream of loglines for those that match a course's url / id
def has_course(stream,courseid):
regex = '/courses/%i/' % int(courseid)
while True:
L = stream.readline()
if re.search(regex, L): yield L
def js(s):
return json.dumps(s, indent=2)
def sem_from_array_crn(crn):
if not crn[2]: return ""
if crn[2] == "": return ""
return crn[2][:6]
#################################################################################################################
#################################################################################################################
######
###### db ilearn course / user / hits
######
def user_courses(uid):
return js(user_enrolled_in(uid))
def user_course_history_summary(usr_id):
q = """SELECT r.timeblock, r.viewcount, c.sis, c.code, c.canvasid FROM requests_sum1 AS r
JOIN users AS u ON r.userid=u.id
JOIN courses AS c ON c.id=r.courseid
WHERE u.canvasid=%s
GROUP BY r.courseid ORDER BY r.viewcount DESC;""" % str(usr_id)
(conn,cur) = db()
cur.execute(q)
r = cur.fetchall()
return js(r)
groups = funcy.group_by(sem_from_array_crn, r)
g = {}
for K in groups.keys(): g[K] = [ x[3] for x in groups[K] ]
return js( g )
def roster(crn):
q = """SELECT u.name, u.sortablename, u.canvasid as user_id, c.canvasid as course_id, e.workflow, e."type" FROM enrollment AS e
JOIN users AS u ON e.user_id=u.id
JOIN courses AS c ON c.id=e.course_id
WHERE c.canvasid="%s" ;""" % str(crn)
(conn,cur) = db()
cur.execute(q)
return js(cur.fetchall())
def user_course_hits(usr,courseid):
return list(has_course( codecs.open('cache/users/logs/%s.csv' % usr, 'r', 'utf-8'), courseid))
#return "\n".join( [x for x in next(gen)] )
def profiles(id=1,b=2,c=3):
import os
pics = os.listdir('cache/picsCanvas')
return ''.join([ "<img height='45' width='45' hspace='5' vspace='5' src='/cache/picsCanvas/%s' />" % s for s in pics ])
# Departments, classes in each, and students (with hits) in each of those.
def enrollment(a):
return js(depts_with_classcounts())
# All the classes in this dept, w/ all the students in each, with count of their views.
def dept(d=''):
if not d: return js(dept_with_studentviews())
return js(dept_with_studentviews(d))
def user(canvas_id=None):
info = json.loads( codecs.open( 'cache/users/%s.txt' % canvas_id, 'r', 'utf-8').read() )
return render_template('hello.html', id=canvas_id, name=info['name'])
#################################################################################################################
#################################################################################################################
######
###### podcast feed
######
def lectures():
fi = os.listdir(LECPATH)
doc, tag, text = Doc().tagtext()
doc.asis('<?xml version="1.0" encoding="UTF-8"?>')
doc.asis('<rss xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd" version="2.0">')
with tag('channel'):
with tag('title'): text("Peter's Lecture Series")
with tag('description'): text("Since 2019")
with tag('link'): text(host)
for f in fi:
if f.endswith('.mp3'):
#print(f)
with tag('item'):
name = f.split('.')[0]
ff = re.sub(r'\s','%20',f)
with tag('title'): text(name)
with tag('guid'): text(f)
b = os.path.getsize(LECPATH+f)
doc.stag('enclosure', url=host+'/podcast/media/'+urllib.parse.quote(ff), type='audio/mpeg',length=b)
doc.asis('</rss>')
#doc.asis('</xml>')
return doc.getvalue()
def web_lectures():
fi = os.listdir(LECPATH)
output = "<h1>Lectures</h1>\n"
for f in fi:
if f.endswith('.mp3'):
name = f.split('.')[0]
ff = urllib.parse.quote(f)
#ff = re.sub('\s','%20',f)
output += '<a href="%s">%s</a><br />\n' % ( host + '/podcast/media/' + ff, name)
return output
#################################################################################################################
#################################################################################################################
######
###### editing personnel app
######
# personnel_fetch, personnel_meta
# todo: update: dept, title, any of the other fields.
# insert: new dept, new title,
# update a value: dept id of a personnel id
def update_pers_title(pid, tid):
q = "UPDATE personnel SET `title`='%s' WHERE `id`='%s'" % (str(tid), str(pid))
(conn,cur) = db()
result = cur.execute(q)
conn.commit()
return js( {'result': 'success'} )
# update a value: dept id of a personnel id
def update_pers_dept(pid, did):
q = "UPDATE personnel SET `dept1`='%s' WHERE `id`='%s'" % (str(did), str(pid))
(conn,cur) = db()
result = cur.execute(q)
conn.commit()
return js( {'result': 'success'} )
def user_edit(canvas_id='2'):
info = json.loads( codecs.open( 'cache/users/%s.txt' % str(canvas_id), 'r', 'utf-8').read() )
return render_template('personnel.html', id=canvas_id, name=info['name'])
def staff_dir(search=''):
return render_template('dir.html')
######
###### handling images
######
def find_goo(n):
g = re.search(r'00(\d{6})', n)
if g:
return g.groups()[0]
return ''
def byname(x):
if 'conf_name' in x:
return x['conf_name']
if 'first_name' in x and 'last_name' in x:
return x['first_name'] + " " + x['last_name']
return ''
def fn_to_struct( n, staff ):
g = find_goo(n)
if g:
#print(g)
for s in staff:
cg = s['conf_goo']
if cg == g:
#print("%s - %s - %s" % (n, g, cg) )
return s
return { "conf_goo":g, "conf_name":"unknown - " + n }
return 0
def image_edit(filename=''):
url = "https://hhh.gavilan.edu/phowell/map/dir_api_tester.php?a=list/staffsemester"
staff = json.loads( requests.get(url).text )
badges = 0
web = 1
if web:
files = sorted(os.listdir('cache/picsStaffdir') )
done_files = [ x[:-4] for x in sorted(os.listdir('cache/picsStaffdir/cropped') ) ]
if badges:
files = sorted(os.listdir('cache/picsId/originals_20211022') )
done_files = [ x[:6] for x in sorted(os.listdir('cache/picsId/2021crop') ) ]
files_match = []
files_no_match = []
raw_filenames = files
for f in files:
sa = fn_to_struct(f,staff)
if sa:
ss = sa.copy()
else:
ss = sa
if ss:
ss['filename'] = f
files_match.append(ss)
else: files_no_match.append(f)
fm = json.dumps( sorted(files_match,key=byname) )
fnm = json.dumps(files_no_match)
sm = json.dumps(staff)
return render_template('images.html', staff=sm, matches=fm, nomatches=fnm, checked=done_files)
def image_crop(filename,x,y,w,h,newname=''):
from PIL import Image
import piexif
badges = 0
web = 1
if not newname: newname = filename
if web:
im = Image.open('cache/picsStaffdir/%s' % filename)
savepath = 'cache/picsStaffdir/cropped/%s.jpg' % newname
if badges:
im = Image.open('cache/picsId/originals_20211022/%s' % filename)
savepath = 'cache/picsId/2021crop/%s.jpg' % newname
out = { 'im': str(im) }
x = int(x)
y = int(y)
w = int(w)
h = int(h)
if "exif" in im.info:
exif_dict = piexif.load(im.info['exif'])
#out['exif'] = exif_dict
#print(exif_dict)
if piexif.ImageIFD.Orientation in exif_dict['0th']:
#exif_dict['0th'][piexif.ImageIFD.Orientation] = 3
print(piexif.ImageIFD.Orientation)
print(exif_dict['0th'])
out['rotation'] = 'messed up'
if exif_dict['0th'][piexif.ImageIFD.Orientation] == 6:
im = im.rotate(270, expand=True)
#im.save('cache/picsId/originals_20211022/crotated_%s' % filename, quality=95)
im_crop = im.crop((x,y,x+w,y+h))
img_resize = im_crop.resize((250, 333))
img_resize.save(savepath, quality=95)
return json.dumps( out )
#if filename=='list':
# #return '<br />\n'.join([ "<a href='/data/picsId/originals_20211022/%s'>%s</a>" % ( x,x ) for x in
# return '<br />\n'.join([ "<a href='/image/%s'>%s</a>" % ( x,x ) for x in sorted(os.listdir('cache/picsId/originals_20211022')) ])
#################################################################################################################
#################################################################################################################
######
###### server infrastructure
######
def server_save(key,value):
codecs.open(datafile2,'a').write( "%s=%s\n" % (str(key),str(value)))
def server_dispatch_json(function_name,arg='', arg2=''):
print("Looking for function: %s. arg:%s. arg2:%s." % (function_name, arg, arg2))
try:
result = "" + globals()[function_name](arg, arg2)
print("doing 2 args")
return result
except Exception as e:
print("Error with that: %s" % str(e))
try:
result = "" + globals()[function_name](arg) #
print("doing 1 arg")
return result
except Exception as f:
print("Error with that: %s" % str(f))
try:
result = globals()[function_name]()
print("doing 0 arg")
return result
except Exception as gg:
print("Error with that: %s" % str(gg))
return json.dumps({'result':'failed: exception', 'e1':str(e), 'e2':str(f), 'e3':str(gg)}, indent=2)
def server_dispatch(function_name,arg='', arg2=''):
print("Looking for function: %s. arg:%s. arg2:%s." % (function_name, arg, arg2))
try:
result = "" + globals()[function_name](arg, arg2)
print("doing 2 args")
return result
except Exception as e:
print("Error with that: %s" % str(e))
try:
result = "" + globals()[function_name](arg) #
print("doing 1 arg")
return result
except Exception as f:
print("Error with that: %s" % str(f))
try:
result = globals()[function_name]()
print("doing 0 arg")
return result
except Exception as gg:
print("Error with that: %s" % str(gg))
return json.dumps({'result':'failed: exception', 'e1':str(e), 'e2':str(f), 'e3':str(gg)}, indent=2)