552 lines
16 KiB
Python
552 lines
16 KiB
Python
import re, datetime, urllib, sys
|
|
import markdown, json, threading
|
|
|
|
#import heapq, csv, os, shutil, itertools, time, os.path, webbrowser
|
|
#from functools import wraps
|
|
# from content import my_site
|
|
|
|
from flask import Flask, request, send_from_directory, Response, render_template
|
|
from flask import send_file
|
|
from flask_socketio import SocketIO, emit
|
|
from werkzeug.routing import PathConverter
|
|
from queue import Queue
|
|
from importlib import reload
|
|
|
|
import server
|
|
import localcache
|
|
from server import *
|
|
from canvas_secrets import flask_secretkey
|
|
|
|
|
|
|
|
|
|
|
|
### SET THIS ENV VAR TO GET RELOADING
|
|
# (win)
|
|
# set FLASK_ENV=development
|
|
# (linux)
|
|
# export FLASK_ENV=development
|
|
|
|
import socket
|
|
# Name of the machine this process runs on; it decides whether curses is loaded.
this_host = socket.gethostname()

print('\n\n' + this_host, '\n\n')

# The curses import is skipped on the one host named below.
if this_host == 'ROGDESKTOP':
    has_curses = 0
    print("Skipping curses stuff")
else:
    import curses
    has_curses = 1

# Queue handed to the web-server thread by serve().
q = Queue()

HOST_NAME = '192.168.1.6' #
PORT_NUMBER = 8080 # Maybe set this to 9000.

datafile = 'lambda.csv'
|
|
|
|
|
|
|
|
####
|
|
#### This little web server is going to work with the "gui" folder / vue app
|
|
####
|
|
####
|
|
|
|
|
|
|
|
def dict_generator(indict, pre=None):
    """Walk a nested structure and yield one entry per leaf that is reached.

    Args:
        indict: The value to walk. Dicts are descended into; lists and tuples
            found as dict values have each element walked in turn.
        pre: Keys already traversed on the way to ``indict`` (copied, never
            mutated). Defaults to an empty path.

    Yields:
        For a scalar dict value: the string ``"<path> [key, value]\\n"``.
        For a non-dict ``indict``: first the list ``path + [indict]``, then the
        string ``"<path> [indict]\\n"``.
    """
    path = pre[:] if pre else []

    # Anything that is not a dict is a leaf: emit it in both forms and stop.
    if not isinstance(indict, dict):
        yield path + [indict]
        yield str(path) + " " + str([indict]) + "\n"
        return

    for key, value in indict.items():
        deeper = path + [key]
        if isinstance(value, dict):
            yield from dict_generator(value, deeper)
        elif isinstance(value, (list, tuple)):
            for element in value:
                yield from dict_generator(element, deeper)
        else:
            yield str(path) + " " + str([key, value]) + "\n"
|
|
|
|
|
|
|
|
def print_dict(v, prefix='', indent=''):
    """Render a nested dict/list as nested lists of ``"<prefix> = <repr>\\n"`` strings.

    Args:
        v: Value to render. Dicts and lists recurse; anything else is a leaf.
        prefix: Accessor text accumulated so far (each level appends its
            subscript followed by ``"<br />"``).
        indent: Grows by one space per level; it is passed down but does not
            appear in the output.

    Returns:
        A string for a leaf, otherwise a list with one rendered entry per child.
    """
    if isinstance(v, dict):
        subscript = "{}['{}']"
        children = v.items()
    elif isinstance(v, list):
        subscript = "{}[{}]"
        children = enumerate(v)
    else:
        return '{} = {}'.format(prefix, repr(v)) + "\n"

    return [
        print_dict(child, subscript.format(prefix, key) + "<br />", indent + " ")
        for key, child in children
    ]
|
|
|
|
|
|
def walk_file():
    """Load ``cache/programs/programs_2.txt`` as JSON and render it with ``print_dict``.

    Returns:
        Whatever ``print_dict`` produces for the parsed document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # The context manager closes the handle even when parsing fails.
    with open('cache/programs/programs_2.txt', 'r') as infile:
        j = json.load(infile)

    return print_dict(j)
|
|
|
|
|
|
def tag(x, y):
    """Return ``y`` wrapped in an opening and closing ``<x>`` element (no escaping)."""
    template = "<%s>%s</%s>"
    return template % (x, y, x)
|
|
|
|
def tagc(x, c, y):
    """Return ``y`` wrapped in an ``<x>`` element carrying ``class="c"`` (no escaping)."""
    template = '<%s class="%s">%s</%s>'
    return template % (x, c, y, x)
|
|
|
|
def a(t, h):
    """Return an anchor element with link text ``t`` pointing at href ``h`` (no escaping)."""
    template = '<a href="%s">%s</a>'
    return template % (h, t)
|
|
|
|
def server_save(key, value):
    """Append one ``key=value`` line to ``cache/server_data.txt``.

    Args:
        key: Converted with ``str()`` and written left of the ``=``.
        value: Converted with ``str()`` and written right of the ``=``.

    Raises:
        OSError: If the file cannot be opened for appending.
    """
    # Builtin open() in append mode with the platform default encoding; the
    # with-block guarantees the line is flushed and the handle is closed.
    with open('cache/server_data.txt', 'a') as outfile:
        outfile.write("%s=%s\n" % (str(key), str(value)))
|
|
|
|
def flask_thread(q):
    """Create the Flask / Socket.IO app, register every route, and serve it.

    The call blocks inside ``socketio.run`` (listening on 0.0.0.0), so it is
    meant to be used as a thread target.

    Args:
        q: Queue supplied by the caller. Nothing in this function reads it.
    """
    #app = Flask(__name__, static_url_path='/cache',
    #            static_folder='cache',)
    app = Flask(__name__)
    app.config['SECRET_KEY'] = flask_secretkey
    app.jinja_env.auto_reload = True
    socketio = SocketIO(app)

    app.config['TEMPLATES_AUTO_RELOAD'] = True

    def read_utf8(path):
        """Return the complete UTF-8 decoded contents of ``path``."""
        # newline='' disables newline translation so the text comes back
        # exactly as stored on disk.
        with open(path, 'r', encoding='utf-8', newline='') as infile:
            return infile.read()

    def write_utf8(path, text):
        """Replace the contents of ``path`` with ``text`` encoded as UTF-8."""
        # newline='' writes line endings exactly as they appear in ``text``.
        with open(path, 'w', encoding='utf-8', newline='') as outfile:
            outfile.write(text)

    def before_request():
        """Empty the Jinja template cache so every request re-reads templates."""
        app.jinja_env.cache = {}

    app.before_request(before_request)

    @app.route('/mirror')
    def mirror():
        """Serve the crawl index page verbatim."""
        return read_utf8('cache/crawl/index.html')

    @app.route('/mirror/<filename>')
    def mirror_file(filename):
        """Serve a crawled file rendered as markdown, followed by its raw text."""
        # NOTE(review): the raw text is placed inside <pre> unescaped — confirm
        # that is intended for files containing HTML.
        raw = read_utf8('cache/crawl/' + filename)
        return markdown.markdown(raw) + "<pre>" + raw + "</pre>"

    @app.route('/clearscreens')
    def clears():
        """Call clearscreens() and show the home page."""
        clearscreens()
        return homepage()

    @app.route('/displaypi/on')
    def dpi():
        """Call displaypi_on() and show the home page."""
        displaypi_on()
        return homepage()

    @app.route('/displaypi/off')
    def dpi2():
        """Call displaypi_off() and show the home page."""
        displaypi_off()
        return homepage()

    @app.route('/screensoff')
    def screenoff_a():
        """Call screenoff() and show the home page."""
        screenoff()
        return homepage()

    @app.route('/light')
    def light():
        """Call desklight() and show the home page."""
        desklight()
        return homepage()

    @app.route('/image/<filename>', methods=['GET', 'POST'])
    def do_image(filename):
        """Delegate to image_edit() for the named file."""
        return image_edit(filename)

    @app.route('/imagecrop/<filename>/<x>/<y>/<w>/<h>/<newname>', methods=['GET', 'POST'])
    def do_image_crop(filename, x, y, w, h, newname):
        """Delegate to image_crop(); all URL parts arrive as strings."""
        return image_crop(filename, x, y, w, h, newname)

    @app.route('/md', methods=['GET', 'POST'])
    def convert_markdown():
        """Show the markdown form (GET) or the converted HTML (POST)."""
        if request.method == 'POST':
            markdown_input = request.form.get('markdown_input', '')
            html_output = markdown.markdown(markdown_input)
            return render_template('basic.html', title='md -> html', html_output=html_output)
        return render_template('convert.html')

    #
    # SAVING STUFF
    #

    @app.route('/save', methods=['POST'])
    def save_post():
        """Back up the current file under older_copies/, then overwrite it.

        Reads the form fields ``path`` and ``content``. The previous contents
        (or an empty string when the file cannot be read) are written to a
        timestamped backup before the new text replaces the file.
        """
        # Imported here under a private name: this file's top-level
        # `import datetime` binds the module, which has no `.now()`.
        from datetime import datetime as _datetime

        now = _datetime.now().strftime('%Y%m%dT%H%M')
        path = request.form['path']
        txt = request.form['content']
        # NOTE(review): `path` comes straight from the request and is joined to
        # writing_path unchecked — consider rejecting values containing "..".
        target = server.writing_path + path

        try:
            orig_text = read_utf8(target)
        except (OSError, ValueError):
            # Missing or undecodable file: back up an empty document instead.
            orig_text = ''

        bu_filename = server.writing_path + 'older_copies/' + path + '_' + now + '.md'
        write_utf8(bu_filename, orig_text)
        print('wrote backup to %s.' % bu_filename)

        write_utf8(target, txt)
        return "<h1>Successfully Saved</h1><br>" + a('back to writing folder', '/x/writing/index') + \
            " " + a('back to home', '/')

    @app.route('/x/writing/images/<fname>')
    def writing_img(fname):
        """Serve an image from the writing-images folder."""
        # TODO
        img_path = "/media/hd2/peter_home/Documents/writing_img/"
        print(img_path + fname + " - writing images folder")
        # gif, jpg, png and everything else were all served by the same call,
        # so a single return covers every extension.
        return send_from_directory(img_path, fname)

    #
    # SERVER maintenance type stuff
    @app.route('/rl')
    def restart():
        """Reload the server and localcache modules in place."""
        reload(server)
        reload(localcache)
        return "Server code reloaded"

    @app.route("/x/<func>/<arg>/<arrg>")
    def dispatch3(func, arg, arrg):
        """HTML dispatch with two arguments."""
        print("2 args")
        return "" + server_dispatch(func, arg, arrg)

    @app.route("/x/<func>/<arg>")
    def dispatch2(func, arg):
        """HTML dispatch with one argument."""
        print("1 arg")
        return "" + server_dispatch(func, arg)

    @app.route("/x/<func>")
    def dispatch(func):
        """HTML dispatch without arguments."""
        print("0 arg")
        return server_dispatch(func)

    @app.route("/api/<func>/<arg>/<arrg>")
    def dispatch3j(func, arg, arrg):
        """JSON dispatch with two arguments."""
        # Message corrected: this route carries two arguments, not three.
        print("json, 2 args")
        return Response(server_dispatch(func, arg, arrg), mimetype='text/json')

    @app.route("/api/<func>/<arg>")
    def dispatch2j(func, arg):
        """JSON dispatch with one argument."""
        print("json, 1 arg")
        return Response(server_dispatch(func, arg), mimetype='text/json')

    @app.route("/api/<func>")
    def dispatch1j(func):
        """JSON dispatch without arguments."""
        print("json, 0 arg")
        return Response(server_dispatch(func), mimetype='text/json')

    @app.route("/")
    def home():
        """Show the home page."""
        return server.homepage()

    #
    # STATIC ROUTES
    #

    @app.route('/data/<path:path>')
    def send_cachedata(path):
        """Serve a file from the cache directory."""
        #myfile = os.path.join('cache', path).replace('\\','/')
        print(path)
        #return app.send_static_file(myfile)
        return send_from_directory('cache', path)

    # Departments, classes in each, and students (with hits) in each of those.

    # Disabled routes, kept for reference:
    #@app.route('/iii/<path:path>')
    #def send_js(path):
    #    return send_from_directory('gui/dist', path)
    #@app.route('/lib/<path:path>')
    #def send_jslib(path):
    #    return send_from_directory('gui/lib', path)

    #@app.route('/hello/')
    #@app.route('/hello/<name>')

    @app.route("/save/<key>/<val>")
    def s(key, val):
        """Persist a key/value pair with server_save() and confirm it."""
        server_save(key, val)
        return tag('h1', 'Saved.') + "<br />" + tag('p', 'Saved: %s = %s' % (str(key), str(val)))

    @app.route("/sample")
    def do_sample():
        """Delegate to sample()."""
        return sample()

    @app.route('/podcast/media/<string:file_id>')
    def media(file_id):
        """Send the lecture file named by the URL-unquoted ``file_id``."""
        media_name = urllib.parse.unquote(file_id)
        # NOTE(review): Flask 2.0 renamed `attachment_filename` to
        # `download_name`; switch once the installed Flask version is confirmed.
        return send_file(LECPATH + media_name, attachment_filename=media_name)

    @app.route("/podcast")
    def podcast():
        """Delegate to lectures()."""
        return lectures()

    @app.route("/lectures")
    def weblec():
        """Delegate to web_lectures()."""
        return web_lectures()

    @app.route("/crazy")
    def hello():
        """Two-column page: the walk_file() dump beside a shut-down link."""
        r = '<link rel="stylesheet" href="static/bootstrap.min.css">'
        r += tag('style', 'textarea { white-space:nowrap; }')
        left_column = tagc('div', 'col-md-6', tag('pre', walk_file()))
        # Points at '/sd', the shutdown route registered below; no '/shutdown'
        # route is registered in this function.
        right_column = tagc('div', 'col-md-6', 'Column 2' + a('Shut Down', '/sd'))
        r += tag('body',
                 tagc('div', 'container-fluid',
                      tagc('div', 'row', left_column + right_column)))
        return r

    @app.route("/sd")
    def sd():
        """Stop the development server through the Werkzeug shutdown hook.

        Raises:
            RuntimeError: If the hook is absent from the request environment.
        """
        print('SIGINT or CTRL-C detected. Exiting gracefully')
        func = request.environ.get('werkzeug.server.shutdown')
        if func is None:
            raise RuntimeError('Not running with the Werkzeug Server')
        func()
        return "Server has shut down."

    @socketio.on('my event', namespace='/test')
    def test_message(message):
        """Log an incoming 'my event' message and acknowledge it."""
        print('received and event: "my event" from page. message is: %s' % message)
        emit('my response', {'data': 'got it! it is MYEVENT'})

    socketio.run(app, host= '0.0.0.0') # , debug=True
|
|
|
|
|
|
|
|
def serve():
    """Start the web server and the MQTT loop, each on its own thread."""
    web_worker = threading.Thread(target=flask_thread, args=(q,))
    web_worker.start()
    #webbrowser.open_new_tab("http://localhost:5000")

    mqtt_worker = threading.Thread(target=mqtt_loop)
    mqtt_worker.start()
|
|
|
|
|
|
|
|
from random import randint
|
|
import asciimatics
|
|
from asciimatics.scene import Scene
|
|
from asciimatics.widgets import Frame, Layout, Button, ListBox, TextBox, Widget, Divider
|
|
from asciimatics.exceptions import StopApplication, ResizeScreenError
|
|
from asciimatics.event import MouseEvent, KeyboardEvent
|
|
from asciimatics.screen import Screen
|
|
import traceback
|
|
|
|
def update():
    """No-op stub; returns None."""
    return None
|
|
def enroll_ori():
    """No-op stub; returns None."""
    return None
|
|
|
|
class OutputView(Frame):
    """Bordered frame titled "Output": full screen height, two thirds of the width."""

    def __init__(self, screen):
        """Build the frame for ``screen`` (its height and width size the frame)."""
        super().__init__(screen,
                         screen.height,
                         (screen.width * 2 // 3),
                         on_load=self.nothing,
                         hover_focus=True,
                         has_border=True,
                         title="Output")

    def nothing(self):
        """No-op callback handed to the frame as its ``on_load`` hook."""
        # Defined here because __init__ looks up ``self.nothing`` on this class.
        pass
|
|
|
|
|
|
class MenuView(Frame):
    """Borderless full-screen frame: action list beside an output box, buttons below."""

    def __init__(self, screen):
        """Create all widgets and lay them out for ``screen``."""
        super().__init__(
            screen,
            screen.height,
            screen.width,
            on_load=self.nothing,
            hover_focus=False,
            has_border=False,
            title="Actions",
        )
        # Entries offered in the action list, as [label, value] pairs.
        self.model = [
            ["Update database", "update"],
            ["Enroll Orientation Shell", "enroll_ori"],
        ]

        # Main UI: Actions and their output
        pane_height = screen.height - 5
        self._list_view = ListBox(
            height=pane_height,
            options=self.model,
            name="actions",
            on_select=self.nothing,
        )
        self._output_view = TextBox(height=pane_height, name="output", readonly=False)
        self._output_view.value = ['default']

        self._edit_button = Button("Edit", self.nothing)
        self._delete_button = Button("Delete", self.a)

        # Upper area: list in column 0, output box in the wider column 1.
        panes = Layout([1, 2], fill_frame=False)
        self.add_layout(panes)
        panes.add_widget(self._list_view)
        panes.add_widget(self._output_view, 1)
        #panes.add_widget(Divider())

        # Lower area: four equally wide button columns.
        button_row = Layout([1, 1, 1, 1])
        self.add_layout(button_row)
        buttons = (
            Button("Add", self.nothing),
            self._edit_button,
            self._delete_button,
            Button("Quit", self._quit),
        )
        for column, button in enumerate(buttons):
            button_row.add_widget(button, column)
        self.fix()

    def nothing(self):
        """No-op callback for hooks and buttons that have no action yet."""
        return None

    def a(self):
        """Delete-button handler: append a notice to the output box."""
        self.update_output('delete button pressed')

    def update_output(self, value):
        """Append ``value`` as a new entry of the output box."""
        history = self._output_view.value.copy()
        self._output_view.value = history + [value]

    def _quit(self):
        """Quit-button handler: stop the application."""
        raise StopApplication("User pressed quit")
|
|
|
|
def text_app():
    """Run the text UI until the user quits, then exit the process.

    ``demo`` plays its scene with ``stop_on_resize=True``, so a terminal resize
    is expected to surface here as ``ResizeScreenError``. The loop then builds
    a fresh screen instead of letting the exception end the program.
    """
    while True:
        try:
            Screen.wrapper(demo, catch_interrupt=False)
            sys.exit(0)
        except ResizeScreenError:
            # Terminal was resized: go round again with a new Screen.
            continue
|
|
#mv = MenuView(demo)
|
|
#Screen.wrapper()
|
|
#Screen.wrapper(demo)
|
|
|
|
def demo(screen):
    """Play a single scene containing the MenuView frame on ``screen``."""
    menu_scene = Scene([MenuView(screen)], -1)
    screen.play(
        [menu_scene],
        stop_on_resize=True,
        start_scene=menu_scene,
        allow_int=True,
    )
|
|
|
|
def demo2(screen):
    """Scatter 'Hello world!' in random colours until Q or q is pressed.

    Args:
        screen: Object providing ``width``, ``height``, ``colours``,
            ``print_at``, ``get_event`` and ``refresh`` (presumably an
            asciimatics Screen — confirm against the caller).
    """
    last_event = KeyboardEvent(0)
    while True:
        screen.print_at('Hello world!',
                        randint(0, screen.width), randint(1, screen.height),
                        colour=randint(0, screen.colours - 1),
                        bg=randint(0, screen.colours - 1))
        ev = screen.get_event()
        # isinstance also accepts subclasses of KeyboardEvent.
        if isinstance(ev, KeyboardEvent):
            last_event = ev
        # get_event() does not return text, so convert explicitly before
        # handing the value to print_at as the string to draw.
        screen.print_at(str(ev),
                        0, 0,
                        colour=randint(0, screen.colours - 1),
                        bg=randint(0, screen.colours - 1))
        if last_event.key_code in (ord('Q'), ord('q')):
            return
        screen.refresh()
|
|
|
|
|
|
|
|
|
|
# Most common actions
|
|
|
|
## Courses
|
|
# enroll orientation / stem
|
|
# add/remove course evals
|
|
# add/remove gav_connect
|
|
# bulk add calendar events
|
|
|
|
## Outcomes
|
|
# term status report
|
|
# pull fresh course / program list from cq
|
|
# build gavilan.cc catalog
|
|
#
|
|
|
|
## Users
|
|
# new teachers this semester
|
|
# download and format a user's logs
|
|
# show all convos for a user
|
|
# fetch/update employee list
|
|
|
|
## Stats pipelines - highlight outliers
|
|
# user hits
|
|
# user messaging
|
|
# user writing sample - is discussion, message, or assignment available to create a writing sample? How is it?
|
|
# course activity
|
|
# finished course: does it appear to be used for grades?
|
|
# in progress course: has this teacher/class combo had valid-looking grades in the past?
|
|
|
|
|
|
## Semester setup
|
|
# cross list / merge from file
|
|
# merge ad-hoc
|
|
# Move courses to winter term
|
|
# x-ref schedule and check published courses
|
|
# create a docx semester weekly calendar
|
|
|
|
|
|
## Logistics
|
|
# cross reference training status of users
|
|
# enroll into gott courses
|
|
# create sandbox shells
|
|
# build gott certificates
|
|
|
|
## Content
|
|
# build ppt from md
|
|
# download course content to a folder / word file
|
|
|
|
|
|
## Background actions on/off, exceptions
|
|
# fetch rosters hourly
|
|
# update database every 5 hours
|
|
|
|
|
|
if __name__ == "__main__":
    # Menu number -> [label, function to run].
    options = {
        1: ['start web server', serve],
        2: ['start text app', text_app],
    }
    print('')

    # A purely numeric first argument selects an option directly; anything
    # else (including values like "1abc") falls back to the interactive menu.
    from_command_line = len(sys.argv) > 1 and re.fullmatch(r'\d+', sys.argv[1])

    if from_command_line:
        resp = sys.argv[1]
    else:
        print('')
        for key in options:
            print(str(key) + '.\t' + options[key][0])

        print('')
        resp = input('Choose: ')

    # Reject non-numeric or unknown choices with a message instead of a traceback.
    try:
        label, action = options[int(resp)]
    except (ValueError, KeyError):
        sys.exit('Unknown option: %r' % (resp,))

    if from_command_line:
        print("\n\nPerforming: %s\n\n" % label)

    # Call the function in the options dict
    action()
|
|
|