# Listing metadata: 920 lines, 28 KiB, Python
import curses
|
|
import heapq, re, csv, os, shutil, datetime, urllib
|
|
import itertools, time, markdown, csv, json, os.path, webbrowser, threading
|
|
from functools import wraps
|
|
from flask import Flask, request, send_from_directory, Response, render_template
|
|
from flask import send_file
|
|
from flask_socketio import SocketIO, emit
|
|
from werkzeug.routing import PathConverter
|
|
from queue import Queue
|
|
|
|
from importlib import reload
|
|
|
|
import server
|
|
import localcache
|
|
from server import *
|
|
from secrets import flask_secretkey
|
|
|
|
# Queue object that serve() passes to the Flask worker thread.
q = Queue()

# Network settings.  An earlier value of '127.0.0.1' was superseded by the
# LAN address below.
HOST_NAME = '192.168.1.6'
PORT_NUMBER = 8080  # Maybe set this to 9000.

datafile = 'lambda.csv'
|
|
|
|
#writing_path = 'c:/users/peter/Nextcloud/Documents/writing/'
|
|
|
|
|
|
####
|
|
#### This little web server is going to work with the "gui" folder / vue app
|
|
####
|
|
####
|
|
|
|
|
|
|
|
def dict_generator(indict, pre=None):
    """Walk a nested structure depth-first and yield one item per leaf.

    ``pre`` is the list of keys leading to the current position; it is
    copied, never mutated.

    * A dict value is recursed into with its key appended to the path.
    * Each element of a list/tuple value is recursed into under that key.
    * Any other value yields the text ``str(path) + " " + str([key, value])``
      followed by a newline.
    * A non-dict ``indict`` yields two items: the list ``pre + [indict]`` and
      then the text form of the same leaf.
    """
    path = pre[:] if pre else []

    # Leaf reached through a list/tuple element (or a non-dict top-level call).
    if not isinstance(indict, dict):
        yield path + [indict]
        yield str(path) + " " + str([indict]) + "\n"
        return

    for key, value in indict.items():
        child_path = path + [key]
        if isinstance(value, dict):
            yield from dict_generator(value, child_path)
        elif isinstance(value, (list, tuple)):
            for element in value:
                yield from dict_generator(element, child_path)
        else:
            yield str(path) + " " + str([key, value]) + "\n"
|
|
|
|
|
|
|
|
def print_dict(v, prefix='',indent=''):
    """Render a nested dict/list as nested lists of ``"<path> = <repr>"`` lines.

    Dicts and lists each produce a list with one entry per child, so the
    result mirrors the nesting of ``v``.  Every path segment is followed by
    an HTML ``<br />``.  ``indent`` grows by one space per level but does not
    appear in the output.
    """
    deeper = indent + " "

    if isinstance(v, dict):
        rendered = []
        for k, child in v.items():
            label = "{}['{}']".format(prefix, k) + "<br />"
            rendered.append(print_dict(child, label, deeper))
        return rendered

    if isinstance(v, list):
        rendered = []
        for i, child in enumerate(v):
            label = "{}[{}]".format(prefix, i) + "<br />"
            rendered.append(print_dict(child, label, deeper))
        return rendered

    return '{} = {}'.format(prefix, repr(v)) + "\n"
|
|
|
|
|
|
def walk_file():
    """Load ``cache/programs/programs_2.txt`` as JSON and render it.

    Returns:
        Whatever ``print_dict`` produces for the parsed JSON document.
    """
    # Context manager so the handle is closed even if parsing fails.
    with open('cache/programs/programs_2.txt', 'r') as fh:
        j = json.load(fh)

    return print_dict(j)
|
|
|
|
|
|
def tag(x,y):
    """Wrap ``y`` in an HTML element named ``x`` (no escaping is applied)."""
    return "<{0}>{1}</{0}>".format(x, y)
|
|
|
|
def tagc(x,c,y):
    """Wrap ``y`` in an HTML element ``x`` with ``class="c"`` (no escaping)."""
    return '<{0} class="{1}">{2}</{0}>'.format(x, c, y)
|
|
|
|
def a(t,h):
    """Return an HTML anchor with link text ``t`` pointing at ``h``."""
    return '<a href="{href}">{text}</a>'.format(href=h, text=t)
|
|
|
|
def server_save(key,value):
    """Append one ``key=value`` line to ``cache/server_data.txt``.

    Both arguments are converted with ``str``.  The file is opened in append
    mode with the platform default encoding and is closed before returning.
    """
    with open('cache/server_data.txt', 'a') as fh:
        fh.write("%s=%s\n" % (str(key), str(value)))
|
|
|
|
def flask_thread(q):
    """Create the Flask + Socket.IO app, register its routes, and run it.

    The call blocks inside ``socketio.run`` until the server stops, so it is
    intended to be used as a thread target.

    Args:
        q: Queue supplied by the caller.  It is not read or written here.

    NOTE(review): many handlers call names that are not defined in the visible
    part of this file (``clearscreens``, ``homepage``, ``server_dispatch``,
    ``LECPATH``, ...); presumably they arrive via ``from server import *`` —
    confirm.
    """
    #app = Flask(__name__, static_url_path='/cache',
    #            static_folder='cache',)
    app = Flask(__name__)
    app.config['SECRET_KEY'] = flask_secretkey
    app.jinja_env.auto_reload = True
    socketio = SocketIO(app)

    app.config['TEMPLATES_AUTO_RELOAD'] = True

    def before_request():
        # Drop the compiled-template cache on every request.
        app.jinja_env.cache = {}

    app.before_request(before_request)

    @app.route('/clearscreens')
    def clears():
        clearscreens()
        return homepage()

    @app.route('/displaypi/on')
    def dpi():
        displaypi_on()
        return homepage()

    @app.route('/displaypi/off')
    def dpi2():
        displaypi_off()
        return homepage()

    @app.route('/screensoff')
    def screenoff_a():
        screenoff()
        return homepage()

    @app.route('/light')
    def light():
        desklight()
        return homepage()

    @app.route('/image/<filename>', methods=['GET','POST'])
    def do_image(filename):
        return image_edit(filename)

    @app.route('/imagecrop/<filename>/<x>/<y>/<w>/<h>/<newname>', methods=['GET','POST'])
    def do_image_crop(filename,x,y,w,h,newname):
        return image_crop(filename,x,y,w,h,newname)

    #
    # SAVING STUFF
    #

    @app.route('/save', methods=['POST'])
    def save_post():
        """Back up the current file, then overwrite it with the posted text."""
        # Local import: the module does ``import datetime``, so the class must
        # be reached explicitly rather than through a bare ``datetime.now()``.
        import datetime as _dt

        now = _dt.datetime.now().strftime('%Y%m%dT%H%M')
        path = request.form['path']
        txt = request.form['content']

        # ``path`` comes straight from the client: refuse anything that
        # normalizes to a location outside the writing folder ("../" etc.).
        root = os.path.abspath(server.writing_path)
        target = os.path.abspath(server.writing_path + path)
        if not target.startswith(os.path.join(root, '')):
            return "<h1>Invalid path</h1>", 400

        # newline='' keeps line endings untouched in both directions.
        with open(server.writing_path + path, 'r', encoding='utf-8', newline='') as src:
            orig_text = src.read()

        bu_filename = server.writing_path + 'older_copies/' + path + '_' + now + '.md'
        with open(bu_filename, 'w', encoding='utf-8', newline='') as backup:
            backup.write(orig_text)
        print('wrote backup to %s.' % bu_filename)

        with open(server.writing_path + path, 'w', encoding='utf-8', newline='') as dest:
            dest.write(txt)
        return "<h1>Successfully Saved</h1><br>" + a('back to writing folder','/x/writing/index') + \
            " " + a('back to home','/')

    @app.route('/x/writing/images/<fname>')
    def writing_img(fname):
        img_path = "/media/hd2/peter_home/Documents/writing_img/"
        print(img_path + fname + " - writing images folder")
        # Every extension is served the same way, so no per-type branching.
        return send_from_directory(img_path, fname)

    #
    # SERVER maintenance type stuff
    @app.route('/rl')
    def restart():
        reload(server)
        reload(localcache)
        return "Server code reloaded"

    @app.route("/x/<func>/<arg>/<arrg>")
    def dispatch3(func,arg,arrg):
        print("2 args")
        return "" + server_dispatch(func, arg, arrg)

    @app.route("/x/<func>/<arg>")
    def dispatch2(func,arg):
        print("1 arg")
        return "" + server_dispatch(func, arg)

    @app.route("/x/<func>")
    def dispatch(func):
        print("0 arg")
        return server_dispatch(func)

    @app.route("/api/<func>/<arg>/<arrg>")
    def dispatch3j(func,arg,arrg):
        print("json, 3 args")
        return Response(server_dispatch(func, arg, arrg), mimetype='text/json')

    @app.route("/api/<func>/<arg>")
    def dispatch2j(func,arg):
        print("json, 1 arg")
        return Response(server_dispatch(func, arg), mimetype='text/json')

    @app.route("/api/<func>")
    def dispatch1j(func):
        print("json, 0 arg")
        return Response(server_dispatch(func), mimetype='text/json')

    @app.route("/")
    def home():
        return server.homepage()

    #
    # STATIC ROUTES
    #

    @app.route('/data/<path:path>')
    def send_cachedata(path):
        #myfile = os.path.join('cache', path).replace('\\','/')
        print(path)
        #return app.send_static_file(myfile)
        return send_from_directory('cache', path)

    # Departments, classes in each, and students (with hits) in each of those.

    # Disabled routes, kept for reference:
    #@app.route('/iii/<path:path>')
    #def send_js(path):
    #    return send_from_directory('gui/dist', path)
    #@app.route('/lib/<path:path>')
    #def send_jslib(path):
    #    return send_from_directory('gui/lib', path)

    #@app.route('/hello/')
    #@app.route('/hello/<name>')

    @app.route("/save/<key>/<val>")
    def s(key,val):
        server_save(key,val)
        return tag('h1','Saved.') + "<br />" + tag('p', 'Saved: %s = %s' % (str(key),str(val)))

    @app.route("/sample")
    def do_sample():
        return sample()

    @app.route('/podcast/media/<string:file_id>')
    def media(file_id):
        # NOTE(review): ``attachment_filename`` was renamed ``download_name`` in
        # newer Flask releases — confirm against the installed version.
        # NOTE(review): the value is unquoted a second time here, so an encoded
        # "/" could survive routing — verify this cannot escape LECPATH.
        return send_file(LECPATH + urllib.parse.unquote(file_id), attachment_filename=urllib.parse.unquote(file_id))

    @app.route("/podcast")
    def podcast():
        return lectures()

    @app.route("/lectures")
    def weblec():
        return web_lectures()

    @app.route("/crazy")
    def hello():
        r = '<link rel="stylesheet" href="static/bootstrap.min.css">'
        r += tag('style', 'textarea { white-space:nowrap; }')
        # The shutdown link targets '/sd', the shutdown route registered below.
        left_column = tagc('div', 'col-md-6', tag('pre', walk_file()))
        right_column = tagc('div', 'col-md-6', 'Column 2' + a('Shut Down', '/sd'))
        r += tag('body',
                 tagc('div', 'container-fluid',
                      tagc('div', 'row', left_column + right_column)))
        return r

    @app.route("/sd")
    def sd():
        print('SIGINT or CTRL-C detected. Exiting gracefully')
        # NOTE(review): this environ hook only exists on older Werkzeug
        # development servers — confirm it is still provided.
        func = request.environ.get('werkzeug.server.shutdown')
        if func is None:
            raise RuntimeError('Not running with the Werkzeug Server')
        func()
        return "Server has shut down."

    @socketio.on('my event', namespace='/test')
    def test_message(message):
        print('received and event: "my event" from page. message is: %s' % message)
        emit('my response', {'data': 'got it! it is MYEVENT'})

    # Blocks until the server exits; listens on all interfaces.
    socketio.run(app, host= '0.0.0.0')
|
|
|
|
|
|
|
|
def serve():
    """Launch the web server and the MQTT loop, each on its own thread.

    The Flask thread is started first, then the ``mqtt_loop`` thread.
    Neither thread is joined; the function returns immediately.
    """
    threading.Thread(target=flask_thread, args=(q,)).start()
    #webbrowser.open_new_tab("http://localhost:5000")

    threading.Thread(target=mqtt_loop).start()
|
|
|
|
|
|
# Script entry point: start the server threads when run directly.
if __name__ == "__main__":
    serve()
|
|
|
|
|
|
|
|
|
|
|
|
"""class HelloWorldExample(object):
|
|
def make_teacher_rel(self, tchr, clss):
|
|
with self._driver.session() as tx:
|
|
tx.run("MERGE (tchr:Teacher {name: $tchr}) MERGE (tchr)-[:TEACHES]->(clss:Class {name: $clss})", \
|
|
tchr=tchr, clss=clss)
|
|
|
|
def __init__(self, uri, user, password):
|
|
self._driver = GraphDatabase.driver(uri, auth=(user, password))
|
|
|
|
def close(self):
|
|
self._driver.close()
|
|
|
|
|
|
|
|
def print_greeting(self, message):
|
|
with self._driver.session() as session:
|
|
greeting = session.write_transaction(self._create_and_return_greeting, message)
|
|
print(greeting)
|
|
|
|
@staticmethod
|
|
def _create_and_return_greeting(tx, message):
|
|
result = tx.run("CREATE (a:Greeting) "
|
|
"SET a.message = $message "
|
|
"RETURN a.message + ', from node ' + id(a)", message=message)
|
|
return result.single()[0]
|
|
"""
|
|
|
|
|
|
def make_teacher_rel(g, tchr, clss):
    """Run a MERGE query linking a Teacher node to a Class node.

    Args:
        g: Object exposing ``run(query, **params)``.
        tchr: Value bound to the ``$tchr`` query parameter (teacher name).
        clss: Value bound to the ``$clss`` query parameter (class name).
    """
    query = (
        "MERGE (tchr:Teacher {name: $tchr}) "
        "MERGE (tchr)-[:TEACHES]->(clss:Class {name: $clss})"
    )
    g.run(query, tchr=tchr, clss=clss)
|
|
|
|
|
|
def testgraph():
    """Load the spring-2020 schedule JSON into a graph database.

    For every schedule entry, create (once each) a Teacher, a course node and
    a Location node, then a Section node linked to all three.

    NOTE(review): ``Graph``, ``Node`` and ``Relationship`` are not imported in
    the visible part of this file; presumably they come from
    ``from server import *`` — confirm.
    """
    gg = Graph("bolt://localhost:7687", auth=("neo4j", "asdf"))

    #gg.run("DROP CONSTRAINT ON (tchr:Teacher) ASSERT tchr.name IS UNIQUE")
    #gg.run("DROP CONSTRAINT ON (clss:Class) ASSERT clss.name IS UNIQUE")

    #gg.run("CREATE INDEX ON :Teacher(name)")
    #gg.run("CREATE INDEX ON :Class(name)")

    # Context manager so the schedule file is closed after parsing.
    with open('output/semesters/2020spring/sp20_sched.json', 'r') as fh:
        stuff = json.load(fh)

    # make lists of unique course code+name, teacher, locations
    tch = {}
    crs = {}
    loc = {}
    # NOTE(review): this node is constructed but never added to the graph.
    sem = Node("Semester", name="sp20")
    for c in stuff:
        if c['teacher'] not in tch:
            tch[c['teacher']] = Node("Teacher", name=c['teacher'])
            gg.create(tch[c['teacher']])
        if c['code'] not in crs:
            crs[c['code']] = Node("Course section", name=c['name'], code=c['code'])
            gg.create(crs[c['code']])
        if c['loc'] not in loc:
            loc[c['loc']] = Node("Location", loc=c['loc'])
            gg.create(loc[c['loc']])
        sect = Node("Section", crn=int(c['crn']))
        gg.create(Relationship(tch[c['teacher']], "TEACHES", sect))
        gg.create(Relationship(sect, "CLASS OF", crs[c['code']]))
        gg.create(Relationship(sect, "LOCATED AT", loc[c['loc']]))

    # Earlier experiments, disabled:
    #for c in stuff:
    #    print(c['crn'])
    #    q = "CREATE (section:Section { Name: "+c['name']+", Code: "+c['code']+", Crn: "+c['crn']+", Teacher: "+c['teacher']+" })"
    #    q = 'CREATE (section:Section { Name: "%s", Code: "%s", Crn: "%s", Teacher: "%s" })' % \
    #        (c['name'], c['code'], c['crn'], c['teacher'])
    #    gg.run(q)
    #gg = HelloWorldExample("bolt://localhost:7687", "neo4j", "asdf")
    #gg.print_greeting("hi there world")
    #make_teacher_rel(gg, "Peter Howell","CSIS 42")
    #make_teacher_rel(gg, "Alex Stoykov","CSIS 42")
    #make_teacher_rel(gg, "Sabrina Lawrence","CSIS 85")
    #make_teacher_rel(gg, "Peter Howell","CSIS 85")
|
|
|
|
# Module-level handle for the curses screen; MyRepl methods declare it
# ``global`` and MyRepl.startup rebinds it to the window from curses.initscr().
screen = 0
|
|
|
|
def Memoize( func):
    """
    Decorator that caches results keyed on the positional-argument tuple.

    The wrapped function is called at most once per distinct argument tuple;
    later calls return the stored result.  Arguments must be hashable and
    keyword arguments are not supported.
    """
    results = {}

    @wraps(func)
    def wrapper(*args):
        try:
            return results[args]
        except KeyError:
            pass
        value = results[args] = func(*args)
        return value

    return wrapper
|
|
|
|
|
|
|
|
|
|
class MyRepl:
    """Curses-based line prompt with a suggestion box, tab completion and history.

    Typical use: ``set_my_dict`` (optional), ``startup(outfile)``, then
    ``inputloop()``.  Every entered line is written to ``outfile``; typing
    ``q`` saves the history and leaves the loop.

    The curses window lives in the module-level global ``screen``.
    """

    # Maps each candidate string to the description shown when it is the only
    # suggestion.  Replaced per instance by set_my_dict().
    description = {
        "switch ": "Switch stream. You can use either 'switch public' or 'switch mine'",
        "home " : "Show your timeline. 'home 7' will show 7 tweet.",
        "harry " : "a guys name.",
        "homo " : "means the same.",
        "view " : "'view @mdo' will show @mdo's home.",
        "h " : "Show help.",
        "t " : "'t opps' will tweet 'opps' immediately.",
        "s " : "'s #AKB48' will search for '#AKB48' and return 5 newest tweets."
    }

    def startup(self, outfile):
        """Initialise curses, key tables, colour helpers and command state.

        Args:
            outfile: Path of the file that entered commands are appended to.
        """
        global screen # make it self
        self.g = {}
        self.buf = {}
        screen = None
        self.enter_ary = [curses.KEY_ENTER,10]
        self.delete_ary = [curses.KEY_BACKSPACE,curses.KEY_DC,8,127,263]
        self.tab_ary = [9]
        self.up_ary = [curses.KEY_UP]
        self.down_ary = [curses.KEY_DOWN]

        # Init curses screen
        screen = curses.initscr()
        screen.keypad(1)
        curses.noecho()
        try:
            curses.start_color()
            curses.use_default_colors()
            for i in range(0, curses.COLORS):
                curses.init_pair(i + 1, i, -1)
        except curses.error:
            # Terminals without (enough) colour support: carry on uncoloured.
            pass
        curses.cbreak()
        self.g['height'] , self.g['width'] = screen.getmaxyx()
        #print("Width: %i" % self.g['width'])

        # Init color function.  The helpers go through the bound method; the
        # bare name ``curses_print_word`` does not exist at module level.
        self.white = lambda x: self.curses_print_word(x, 7) #0)
        self.grey = lambda x: self.curses_print_word(x, 3) #3)1)
        self.red = lambda x: self.curses_print_word(x, 7) #2)
        self.green = lambda x: self.curses_print_word(x, 3) #3)
        self.yellow = lambda x: self.curses_print_word(x, 7) #4)
        self.blue = lambda x: self.curses_print_word(x, 3)
        self.magenta = lambda x: self.curses_print_word(x, 7) #6)
        self.cyan = lambda x: self.curses_print_word(x, 7) #7)
        self.colors_shuffle = [self.grey, self.red, self.green, self.yellow,
                               self.blue, self.magenta, self.cyan]
        self.cyc = itertools.cycle(self.colors_shuffle[1:])
        self.index_cyc = itertools.cycle(range(1,8))
        self.setup_command(outfile)

    def set_my_dict(self,d):
        """Replace the candidate -> description mapping for this instance."""
        self.description = d

    def cycle_color(self, s):
        """
        Return the colour helper assigned to ``s``.

        The first request for a given ``s`` takes the next entry from the
        colour cycle; later requests return the same entry.  The cache is kept
        on the instance so it does not outlive it.
        """
        cache = getattr(self, '_color_cache', None)
        if cache is None:
            cache = self._color_cache = {}
        if s not in cache:
            cache[s] = next(self.cyc)
        return cache[s]

    def ascii_art(self, text):
        """
        Draw the Ascii Art
        """
        # NOTE(review): ``figlet_format`` is not imported in the visible part
        # of this file; presumably it arrives via ``from server import *``.
        fi = figlet_format(text, font='doom')
        for i in fi.split('\n'):
            self.curses_print_line(i,next(self.index_cyc))

    def close_window(self, ):
        """
        Close screen: undo keypad/cbreak/noecho and end curses mode.
        """
        global screen
        screen.keypad(0)
        curses.nocbreak()
        curses.echo()
        curses.endwin()

    def suggest(self, word):
        """
        Return the candidates having a space-separated word that starts with
        ``word`` (case-insensitive).  Each candidate appears at most once, in
        the mapping's iteration order.  An empty ``word`` yields ``[]``.
        """
        if not word:
            return []
        needle = word.lower()
        return [
            candidate
            for candidate in self.description
            if any(part.startswith(needle) for part in candidate.lower().split(" "))
        ]

    def curses_print_word(self, word,color_pair_code):
        """
        Print a word at the cursor using the given colour pair.
        """
        global screen
        word = word.encode('utf8')
        screen.addstr(word,curses.color_pair(color_pair_code))

    def curses_print_line(self, line,color_pair_code):
        """
        Print a line below the cursor, scrolling first when near the bottom.
        The line is also recorded in ``self.buf`` so it can be redrawn.
        """
        global screen
        line = line.encode('utf8')
        y,x = screen.getyx()
        rows_left = y - self.g['height']
        if rows_left == -3:
            self.scroll_down(2,y,x)
            target = y
        elif rows_left == -2:
            self.scroll_down(3,y,x)
            target = y - 1
        else:
            target = y + 1
        screen.addstr(target,0,line,curses.color_pair(color_pair_code))
        self.buf[target] = line, color_pair_code

    def redraw(self, start_y,end_y,fallback_y,fallback_x):
        """
        Redraw rows ``start_y`` .. ``end_y - 1`` from ``self.buf``, then move
        the cursor to the fallback position.
        """
        global screen
        for cursor in range(start_y,end_y):
            screen.move(cursor,0)
            screen.clrtoeol()
            try:
                line, color_pair_code = self.buf[cursor]
                screen.addstr(cursor,0,line,curses.color_pair(color_pair_code))
            except (KeyError, curses.error):
                # Nothing recorded for this row, or it cannot be drawn: leave blank.
                pass
        screen.move(fallback_y,fallback_x)

    def scroll_down(self, noredraw,fallback_y,fallback_x):
        """
        Shift the recorded lines up by ``noredraw - 1`` rows and repaint.
        """
        global screen
        # noredraw = n means that screen will scroll down n-1 line
        shift = noredraw - 1
        # The bottom ``shift`` rows have nothing left to pull up from.
        dropped = set(heapq.nlargest(shift, self.buf))
        shifted = {}
        for row in self.buf:
            if row in dropped:
                continue
            source = row + shift
            # A row whose source was never recorded becomes blank.
            if source in self.buf:
                shifted[row] = self.buf[source]
        self.buf.clear()
        self.buf.update(shifted)

        # Clear and redraw
        screen.clear()
        self.redraw(1,self.g['height']-noredraw,fallback_y,fallback_x)

    def clear_upside(self, n,y,x):
        """
        Clear the ``n`` rows above row ``y`` and restore the cursor to (y, x).
        """
        global screen
        for i in range(1,n+1):
            screen.move(y-i,0)
            screen.clrtoeol()
        screen.refresh()
        screen.move(y,x)

    def display_suggest(self, y,x,word):
        """
        Draw the suggestion box for ``word`` next to input row ``y``.

        The box goes above the input row when fewer than six rows remain
        below it, otherwise below.  Several suggestions are listed on one row
        (at most five); a single suggestion is shown with its description.
        """
        global screen
        g = self.g
        side = 2
        width = g['width']

        # Check if need to print upside
        upside = y+6 > int(g['height'])

        # Redraw if suggestion is not the same as previous display
        sug = self.suggest(word)
        # ``g['prev']`` starts out as None; treat that as "nothing shown".
        prev = g['prev'] or []
        if sug != g['prev']:
            # 0-line means there is no suggetions (height = 0)
            # 3-line means there are many suggetions (height = 3)
            # 5-line means there is only one suggetions (height = 5)
            if upside:
                n_now, n_prev = len(sug), len(prev)
                if n_now > 1 and not prev:
                    # now: 3-lines / previous : 0 line
                    self.clear_upside(3,y,x)
                elif not sug and n_prev > 1:
                    # now: 0-lines / previous :3 lines
                    self.redraw(y-3,y,y,x)
                elif n_now > 1 and n_prev == 1:
                    # now: 3-lines / previous :5 lines
                    self.redraw(y-5,y-3,y,x)
                    self.clear_upside(3,y,x)
                elif n_now == 1 and n_prev > 1:
                    # now: 5-lines / previous :3 lines
                    self.clear_upside(3,y,x)
                elif not sug and n_prev == 1:
                    # now: 0-lines / previous :5 lines
                    self.redraw(y-5,y,y,x)
                elif n_now == n_prev and n_prev > 1:
                    # now: 3-lines / previous :3 lines
                    self.clear_upside(3,y,x)
                elif n_now == n_prev == 1:
                    # now: 5-lines / previous :5 lines
                    self.clear_upside(5,y,x)
                screen.refresh()
            else:
                # Clear downside
                screen.clrtobot()
                screen.refresh()
        g['prev'] = sug

        if not sug:
            return

        if len(sug) > 1:
            # More than 1 suggestion: one boxed row listing up to five.
            shown = sug[:5]
            needed_lenth = sum(len(item) + side for item in shown) + side
            # Nearly full-width box, never wider than the screen itself.
            box_width = min(width, max(width - 5, needed_lenth))
            box_top, text_row = (y - 3, y - 2) if upside else (y + 1, y + 2)
            win = curses.newwin(3, box_width, box_top, 0)
            win.erase()
            win.box()
            win.refresh()
            cur_width = side
            for item in shown:
                # Stop before writing past the right edge.
                if cur_width + len(item) > width:
                    break
                screen.addstr(text_row, cur_width, item, curses.color_pair(4))
                cur_width += len(item) + side
        else:
            # Only 1 suggestion: show it together with its description.
            can = sug[0]
            text = self.description[can]
            if upside:
                box_top, name_row, text_row = y - 5, y - 4, y - 2
            else:
                box_top, name_row, text_row = y + 1, y + 2, y + 4
            win = curses.newwin(5, len(text) + 2 * side, box_top, 0)
            win.box()
            win.refresh()
            screen.addstr(name_row, side, can, curses.color_pair(4))
            screen.addstr(text_row, side, text, curses.color_pair(3))

    def inputloop(self, ):
        """
        Main loop input: read keys until the command ``q`` is entered, then
        restore the terminal.
        """
        global screen
        word = ''
        screen.addstr("\n" + self.g['prefix'],curses.color_pair(7))

        while True:
            # Current position
            y,x = screen.getyx()
            # Get char
            event = screen.getch()
            try:
                char = chr(event)
            except (ValueError, OverflowError):
                # e.g. -1 when no key is available
                char = ''

            # Test curses_print_line
            if char == '?':
                self.buf[y] = self.g['prefix'] + '?', 0
                self.ascii_art('dtvd88')

            # TAB to complete
            elif event in self.tab_ary:
                # First tab: start cycling through the current suggestions.
                if not self.g['tab_cycle']:
                    candidates = self.suggest(word)
                    if candidates:
                        self.g['tab_cycle'] = itertools.cycle(candidates)
                if self.g['tab_cycle']:
                    try:
                        suggestion = next(self.g['tab_cycle'])
                        # Clear current line
                        screen.move(y,len(self.g['prefix']))
                        screen.clrtoeol()
                        # Print out suggestion
                        word = suggestion
                        screen.addstr(y,len(self.g['prefix']),word)
                        self.display_suggest(y,x,word)
                        screen.move(y,len(word)+len(self.g['prefix']))
                    except curses.error:
                        # Drawing failed (e.g. off-screen); keep the loop alive.
                        pass

            # UP key
            elif event in self.up_ary:
                if self.g['hist']:
                    # Clear current line
                    screen.move(y,len(self.g['prefix']))
                    screen.clrtoeol()
                    # Print out previous history
                    if self.g['hist_index'] > 0 - len(self.g['hist']):
                        self.g['hist_index'] -= 1
                    word = self.g['hist'][self.g['hist_index']]
                    screen.addstr(y,len(self.g['prefix']),word)
                    self.display_suggest(y,x,word)
                    screen.move(y,len(word)+len(self.g['prefix']))

            # DOWN key
            elif event in self.down_ary:
                if self.g['hist']:
                    # clear current line
                    screen.move(y,len(self.g['prefix']))
                    screen.clrtoeol()
                    # print out previous history
                    if not self.g['hist_index']:
                        self.g['hist_index'] = -1
                    if self.g['hist_index'] < -1:
                        self.g['hist_index'] += 1
                    word = self.g['hist'][self.g['hist_index']]
                    screen.addstr(y,len(self.g['prefix']),word)
                    self.display_suggest(y,x,word)
                    screen.move(y,len(word)+len(self.g['prefix']))

            # Enter key #### I should get the command out of there?
            # #### Can I register a callback function?
            elif event in self.enter_ary:
                self.g['tab_cycle'] = None
                self.g['hist_index'] = 0
                self.g['hist'].append(word)
                if word == 'q':
                    self.cleanup_command()
                    break
                self.display_suggest(y,x,'')
                screen.clrtobot()
                self.handle_command(word)

                self.buf[y] = self.g['prefix'] + word, 0
                # Touch the screen's end
                if y - self.g['height'] > -3:
                    self.scroll_down(2,y,x)
                    screen.addstr(y,0,self.g['prefix'],curses.color_pair(7)) ## SHOW NEW PROMPT
                else:
                    screen.addstr(y+1,0,self.g['prefix'],curses.color_pair(7))
                word = ''

            # Delete / Backspace
            elif event in self.delete_ary:
                self.g['tab_cycle'] = None
                # Touch to line start
                if x < len(self.g['prefix']) + 1:
                    screen.move(y,x)
                    word = ''
                # Midle of line
                else:
                    word = word[:-1]
                    screen.move(y,x-1)
                    screen.clrtoeol()
                    self.display_suggest(y,x,word)
                    screen.move(y,x-1)

            # Another keys
            else:
                self.g['tab_cycle'] = None
                # Explicitly print char
                try:
                    screen.addstr(char)
                    word += char
                    self.display_suggest(y,x,word)
                    screen.move(y,x+1)
                except ValueError: # got errors here when i adjusted the volume....
                    pass

        # Reset
        self.close_window()

    def setup_command(self,outfile):
        """Open the output log and initialise prompt and history state.

        History is read from ``completer.hist`` in the current directory; if
        that file cannot be opened the history starts empty.
        """
        self.data = open(outfile,'a')

        self.g['prev'] = None
        self.g['tab_cycle'] = None
        self.g['prefix'] = '[gav]: '
        self.g['hist_index'] = 0
        # Load history from previous session
        try:
            with open('completer.hist') as o:
                self.g['hist'] = [i.strip() for i in o.readlines()]
        except OSError:
            self.g['hist'] = []
        # Remember how much was already on disk so cleanup_command only
        # appends what this session added.
        self._hist_loaded = len(self.g['hist'])

    def cleanup_command(self):
        """Append this session's new history entries and close the log.

        Only entries added after ``setup_command`` loaded the file are
        written, one per line, so earlier history is not duplicated.
        """
        already_saved = getattr(self, '_hist_loaded', 0)
        fresh = self.g['hist'][already_saved:]
        with open('completer.hist','a') as o:
            o.writelines(entry + "\n" for entry in fresh)
        self.data.close()

    def handle_command(self, cmd):
        """Record ``cmd`` in the output log.

        ``n <mode>`` switches the prompt to ``[<mode>]`` and writes a
        ``# <mode>`` heading; any other text is written as a line of its own.
        """
        r1 = re.search( r'^n\s(.*)$',cmd)
        if r1:
            # new data collection mode
            mode = r1.group(1)
            self.g['prefix'] = "[" + mode + "]"

            self.data.write("\n\n# %s\n" % mode)
        else:
            #winsound.Beep(440,300)
            self.data.write(cmd + "\n")
        self.data.flush()
|
|
|
|
|
|
|
|
def repl_staff():
    """Run the REPL with teacher names as completion candidates.

    Reads ``cache/teacherdata/teachers.json`` and maps each entry's ``name``
    to the text ``'teacher with id ' + login_id``.  Entered lines are logged
    to ``cache/people_logs.txt``.
    """
    # Context manager so the JSON file is closed before the REPL starts.
    with open('cache/teacherdata/teachers.json', 'r') as fh:
        tch = json.load(fh)

    newdict = {T['name']: 'teacher with id ' + T['login_id'] for T in tch}

    c = MyRepl()
    c.set_my_dict(newdict)
    c.startup('cache/people_logs.txt')
    c.inputloop()
|
|
|
|
|
|
def repl_degs():
    """Build a REPL whose candidates come from the attainment master list.

    Reads ``cache/attainment_masterlist.csv``, skips the first row, and maps
    column 4 of each remaining row to column 0 (or ``' '`` when column 0 is
    empty).  The REPL is constructed and given the mapping but is not
    started; the start-up calls are currently disabled.
    """
    newdict = {}
    # newline='' is what the csv module expects for files it reads.
    with open('cache/attainment_masterlist.csv', 'r', newline='') as fh:
        rows = csv.reader(fh, delimiter=",")
        next(rows, None)  # first row is skipped (header)
        for row in rows:
            # Blank or short rows have no column 4 to key on.
            if len(row) < 5:
                continue
            newdict[row[4]] = row[0] if row[0] else ' '

    #print(newdict)
    #input('ready')
    c = MyRepl()

    c.set_my_dict(newdict)

    #c.startup('cache/g_path_cluster2020_.txt')
    # c.inputloop()
|
|
|
|
def repl():
    """Entry point for the degree REPL; delegates to ``repl_degs``."""
    # The listing repeated the tail of repl_degs and this definition a second
    # time; the repeated statements used the undefined name ``newdict`` and
    # have been dropped.
    repl_degs()
|
|
|
|
|
|
|