')
def mirror_file(filename):
return markdown.markdown( codecs.open('cache/crawl/'+filename,'r','utf-8').read() ) + \
"" + codecs.open('cache/crawl/'+filename,'r','utf-8').read() + ""
# Smart home routes removed; handled in smarthome.py
@app.route('/image/', methods=['GET','POST'])
def do_image(filename):
return image_edit(filename)
@app.route('/imagecrop//////', methods=['GET','POST'])
def do_image_crop(filename,x,y,w,h,newname):
return image_crop(filename,x,y,w,h,newname)
@app.route('/md', methods=['GET', 'POST'])
def convert_markdown():
if request.method == 'POST':
markdown_input = request.form.get('markdown_input', '')
html_output = markdown.markdown(markdown_input)
return render_template('basic.html', title='md -> html', html_output=html_output)
return render_template('convert.html')
#
# SAVING STUFF
#
@app.route('/save', methods=['POST'])
def save_post():
now = datetime.now().strftime('%Y%m%dT%H%M')
path = request.form['path']
txt = request.form['content']
try:
o3 = codecs.open(server.writing_path + path, 'r', 'utf-8')
orig_text = o3.read()
o3.close()
except Exception as e:
orig_text = ''
bu_filename = server.writing_path + 'older_copies/' + path + '_' + now + '.md'
o2 = codecs.open( bu_filename, 'w', 'utf-8' )
o2.write(orig_text)
o2.close()
print('wrote backup to %s.' % bu_filename)
o1 = codecs.open(server.writing_path+path, 'w', 'utf-8')
o1.write(txt)
o1.close()
return "Successfully Saved
" + a('back to writing folder','/x/writing/index') + \
" " + a('back to home','/')
@app.route('/x/writing/images/')
def writing_img(fname):
# TODO
img_path = "/media/hd2/peter_home/Documents/writing_img/"
print(img_path + fname + " - writing images folder")
img_ext = fname.split('.')[-1]
if img_ext == "gif":
return send_from_directory(img_path, fname)
if img_ext == "jpg":
return send_from_directory(img_path, fname)
if img_ext == "png":
return send_from_directory(img_path, fname)
return send_from_directory(img_path, fname)
#
# SERVER maintenance type stuff
@app.route('/rl')
def restart():
reload(server)
reload(localcache)
return "Server code reloaded"
@app.route("/x///")
def dispatch3(func,arg,arrg):
print("2 args")
return "" + server_dispatch(func, arg, arrg)
@app.route("/x//")
def dispatch2(func,arg):
print("1 arg")
return "" + server_dispatch(func, arg)
@app.route("/x/")
def dispatch(func):
print("0 arg")
return server_dispatch(func)
@app.route("/api///")
def dispatch3j(func,arg,arrg):
print("json, 3 args")
return Response(server_dispatch(func, arg, arrg), mimetype='text/json')
@app.route("/api//")
def dispatch2j(func,arg):
print("json, 1 arg")
return Response(server_dispatch(func, arg), mimetype='text/json')
@app.route("/api/")
def dispatch1j(func):
print("json, 0 arg")
return Response(server_dispatch(func), mimetype='text/json')
@app.route("/")
def home():
return server.homepage()
# Bridge new server routes so this app serves them too
@app.route('/useful-info')
def useful_info_page_bridge():
return server.useful_info_page()
@app.route('/api/useful-info')
def useful_info_api_bridge():
return server.useful_info_api()
# Bridge roster change APIs and simple pages
@app.route('/api/rosters/changes')
def roster_changes_bridge():
return server.api_roster_changes()
@app.route('/api/rosters/changes/user/')
def roster_changes_user_bridge(user_id):
return server.api_roster_changes_by_user(user_id)
@app.route('/api/rosters/changes/course/')
def roster_changes_course_bridge(course_id):
return server.api_roster_changes_by_course(course_id)
@app.route('/api/rosters/terms')
def roster_terms_bridge():
return server.api_roster_terms()
@app.route('/api/rosters/users')
def roster_users_bridge():
return server.api_roster_users()
@app.route('/api/rosters/courses')
def roster_courses_bridge():
return server.api_roster_courses()
@app.route('/courses')
def courses_page_bridge():
return server.courses_page()
@app.route('/users')
def users_page_bridge():
return server.users_page()
@app.route('/courses/')
def courses_page_deeplink_bridge(course_id):
return server.courses_page_deeplink(course_id)
@app.route('/users/')
def users_page_deeplink_bridge(user_id):
return server.users_page_deeplink(user_id)
@app.route('/health')
def health():
return jsonify({'app': 'interactive.py', 'status': 'ok'})
#
# STATIC ROUTES
#
@app.route('/data/')
def send_cachedata(path):
#myfile = os.path.join('cache', path).replace('\\','/')
print(path)
#return app.send_static_file(myfile)
return send_from_directory('cache', path)
# Departments, classes in each, and students (with hits) in each of those.
"""@app.route('/iii/')
def send_js(path):
return send_from_directory('gui/dist', path)"""
"""@app.route('/lib/')
def send_jslib(path):
return send_from_directory('gui/lib', path)"""
#@app.route('/hello/')
#@app.route('/hello/')
@app.route("/save//")
def s(key,val):
server_save(key,val)
return tag('h1','Saved.') + "
" + tag('p', 'Saved: %s = %s' % (str(key),str(val)))
@app.route("/sample")
def do_sample():
return sample()
@app.route('/podcast/media/')
def media(file_id):
return send_file(LECPATH + urllib.parse.unquote(file_id), attachment_filename=urllib.parse.unquote(file_id))
@app.route("/podcast")
def podcast():
return lectures()
@app.route("/lectures")
def weblec():
return web_lectures()
@app.route("/crazy")
def hello():
r = ''
r += tag('style', 'textarea { white-space:nowrap; }')
r += tag('body', \
tagc('div','container-fluid', \
tagc('div','row', \
tagc( 'div', 'col-md-6', tag('pre', walk_file() ) ) + \
tagc( 'div', 'col-md-6', 'Column 2' + a('Shut Down','/shutdown' ) ) ) ) )
return r
@app.route("/sd")
def sd():
print('SIGINT or CTRL-C detected. Exiting gracefully')
func = request.environ.get('werkzeug.server.shutdown')
if func is None:
raise RuntimeError('Not running with the Werkzeug Server')
func()
return "Server has shut down."
@socketio.on('my event', namespace='/test')
def test_message(message):
print('received and event: "my event" from page. message is: %s' % message)
emit('my response', {'data': 'got it! it is MYEVENT'})
socketio.run(app, host= '0.0.0.0') # , debug=True
def serve():
x = threading.Thread(target=flask_thread, args=(q,))
x.start()
#webbrowser.open_new_tab("http://localhost:5000")
# mqtt disabled in this app
'''
from random import randint
import asciimatics
from asciimatics.scene import Scene
from asciimatics.widgets import Frame, Layout, Button, ListBox, TextBox, Widget, Divider
from asciimatics.exceptions import StopApplication, ResizeScreenError
from asciimatics.event import MouseEvent, KeyboardEvent
from asciimatics.screen import Screen
import traceback
def update():
pass
def enroll_ori():
pass
class OutputView(Frame):
def __init__(self, screen):
super(OutputView, self).__init__(screen,
screen.height,
(screen.width * 2 // 3),
on_load=self.nothing,
hover_focus=True,
has_border=True,
title="Output")
class MenuView(Frame):
def __init__(self, screen):
super(MenuView, self).__init__(screen,
screen.height,
screen.width,
on_load=self.nothing,
hover_focus=False,
has_border=False,
title="Actions")
# Save off the model that accesses the contacts database.
self.model = [["Update database","update"], ["Enroll Orientation Shell","enroll_ori"]]
# Main UI: Actions and their output
self._list_view = ListBox(
height=screen.height - 5,
options=self.model, name="actions", on_select=self.nothing)
self._output_view = TextBox(height=screen.height - 5, name="output", readonly=False)
self._output_view.value = ['default']
self._edit_button = Button("Edit", self.nothing)
self._delete_button = Button("Delete", self.a)
layout = Layout([1,2], fill_frame=False)
self.add_layout(layout)
layout.add_widget(self._list_view)
layout.add_widget(self._output_view,1)
#layout.add_widget(Divider())
layout2 = Layout([1, 1, 1, 1])
self.add_layout(layout2)
layout2.add_widget(Button("Add", self.nothing), 0)
layout2.add_widget(self._edit_button, 1)
layout2.add_widget(self._delete_button, 2)
layout2.add_widget(Button("Quit", self._quit), 3)
self.fix()
def nothing(self):
pass
def a(self):
self.update_output('delete button pressed')
def update_output(self,value):
li = self._output_view.value.copy()
li.append(value)
self._output_view.value = li
def _quit(self):
raise StopApplication("User pressed quit")
def text_app():
while True:
Screen.wrapper(demo, catch_interrupt=False)
sys.exit(0)
def demo(screen):
scene1 = Scene([MenuView(screen)],-1)
screen.play([scene1], stop_on_resize=True, start_scene=scene1, allow_int=True )
def demo2(screen):
last_event = KeyboardEvent(0)
while True:
screen.print_at('Hello world!',
randint(0, screen.width), randint(1, screen.height),
colour=randint(0, screen.colours - 1),
bg=randint(0, screen.colours - 1))
ev = screen.get_event()
if type(ev) == KeyboardEvent:
last_event = ev
screen.print_at(ev,
0, 0,
colour=randint(0, screen.colours - 1),
bg=randint(0, screen.colours - 1))
if last_event.key_code in (ord('Q'), ord('q')):
return
screen.refresh()
'''
# Most common actions
## Courses
# enroll orientation / stem
# add/remove course evals
# add/remove gav_connect
# bulk add calendar events
## Outcomes
# term status report
# pull fresh course / program list from cq
# build gavilan.cc catalog
#
## Users
# new teachers this semester
# download and format a user's logs
# show all convos for a user
# fetch/update employee list
## Stats pipelines - highlight outliers
# user hits
# user messaging
# user writing sample - is discussion, message, or assignment available to create a writing sample? How is it?
# course activity
# finished course: does it appear to be used for grades?
# in progress course: has this teacher/class combo had valid-looking gredes in the past?
## Semester setup
# cross list / merge from file
# merge ad-hoc
# Move courses to winter term
# x-ref schedule and check published courses
# create a docx semester weekly calendar
## Logistics
# cross reference training status of users
# enroll into gott courses
# create sandbox shells
# build gott certificates
## Content
# build ppt from md
# download course content to a folder / word file
## Background actions on/off, exceptions
# fetch rosters hourly
# update database every 5 hours
if __name__ == "__main__":
options = { 1: ['start web server',serve] ,
#2: ['start text app', text_app],
}
print ('')
if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
resp = int(sys.argv[1])
print("\n\nPerforming: %s\n\n" % options[resp][0])
else:
print ('')
for key in options:
print(str(key) + '.\t' + options[key][0])
print('')
resp = input('Choose: ')
# Call the function in the options dict
options[ int(resp)][1]()