canvasapp/util.py

355 lines
12 KiB
Python
Executable File

import os
import builtins as _builtins
import codecs as _codecs
# --- Safe file I/O monkey patches ---
# Ensure parent folders exist for write/append modes when using open/codecs.open
_orig_open = _builtins.open
_orig_codecs_open = _codecs.open
def _ensure_parent_dir(path):
try:
if isinstance(path, (str, bytes, os.PathLike)):
d = os.path.dirname(os.fspath(path))
if d and not os.path.exists(d):
os.makedirs(d, exist_ok=True)
except Exception:
# Never block the open call due to directory check errors
pass
# Wrap builtins.open to auto-create parent folders and sanitize buffering.
def _open_with_dirs(file, mode='r', *args, **kwargs):
try:
if isinstance(file, (str, bytes, os.PathLike)) and any(m in mode for m in ('w','a','x','+')):
_ensure_parent_dir(file)
finally:
# Avoid RuntimeWarning: line buffering (buffering=1) isn't supported in binary mode
args_list = None
buffering_arg = None
if args:
args_list = list(args)
buffering_arg = args_list[0]
elif 'buffering' in kwargs:
buffering_arg = kwargs['buffering']
if 'b' in mode and buffering_arg == 1:
if args_list is not None:
args_list[0] = -1
else:
kwargs = dict(kwargs)
kwargs['buffering'] = -1 # use default buffering for binary
if args_list is not None:
return _orig_open(file, mode, *tuple(args_list), **kwargs)
return _orig_open(file, mode, *args, **kwargs)
# Wrap codecs.open mirrors to ensure parent folders exist and fix buffering.
def _codecs_open_with_dirs(filename, mode='r', encoding=None, errors='strict', buffering=1):
try:
if isinstance(filename, (str, bytes, os.PathLike)) and any(m in mode for m in ('w','a','x','+')):
_ensure_parent_dir(filename)
finally:
# Avoid line-buffering with binary modes
if 'b' in mode and buffering == 1:
buffering = -1
return _orig_codecs_open(filename, mode, encoding, errors, buffering)
# Apply patches once
_builtins.open = _open_with_dirs
_codecs.open = _codecs_open_with_dirs
# Patch pandas to_csv to auto-create parent folder if available
try:
import pandas as _pd # noqa: F401
_orig_to_csv = _pd.DataFrame.to_csv
def _to_csv_with_dirs(self, path_or_buf=None, *args, **kwargs):
if isinstance(path_or_buf, (str, bytes, os.PathLike)):
_ensure_parent_dir(path_or_buf)
return _orig_to_csv(self, path_or_buf, *args, **kwargs)
_pd.DataFrame.to_csv = _to_csv_with_dirs
except Exception:
pass
import re, csv
from collections import defaultdict
import pytz, datetime, dateutil, json
from datetime import timedelta
from dateutil import tz
from functools import reduce
# Teacher name format changed. Remove commas and switch first to last
def fix_t_name(str):
str = str.strip()
str = re.sub(r'\s+',' ',str)
parts = str.split(', ')
if len(parts)>1:
return parts[1].strip() + " " + parts[0].strip()
return str
# Separate dept and code
def split_class_dept(c):
return c.split(' ')[0]
def split_class_code(c):
num = c.split(' ')[1]
parts = re.match(r'(\d+)([a-zA-Z]+)',num)
#ret = "Got %s, " % c
if parts:
r = int(parts.group(1))
#print(ret + "returning %i." % r)
return r
#print(ret + "returning %s." % num)
return int(num)
def split_class_code_letter(c):
num = c.split(' ')[1]
parts = re.match(r'(\d+)([A-Za-z]+)',num)
if parts:
return parts.group(2)
return ''
def nowAsStr():
#Get the current time, printed in the right format
currentTime = datetime.datetime.utcnow()
prettyTime = currentTime.strftime('%a, %d %b %Y %H:%M:%S GMT')
return prettyTime
def contains_key_value(lst, x, y):
"""
Checks if a list contains a dictionary with a specific key-value pair.
:param lst: List of dictionaries to search through.
:param x: The key to look for.
:param y: The value associated with the key.
:return: True if a dictionary in the list contains the key-value pair, otherwise False.
"""
return reduce(lambda acc, item: acc or (isinstance(item, dict) and item.get(x) == y), lst, False)
def find_dict_with_key_value(lst, x, y):
"""
Finds the first dictionary in a list where the key x has the value y.
:param lst: List of dictionaries to search through.
:param x: The key to look for.
:param y: The value associated with the key.
:return: The first dictionary containing the key-value pair, or None if not found.
"""
return next((d for d in lst if isinstance(d, dict) and d.get(x) == y), None)
def extract_key_values(lst, x):
"""
Extracts the values of the given key from a list of dictionaries.
:param lst: List of dictionaries to search through.
:param x: The key to look for.
:return: A list of values corresponding to the key.
"""
return reduce(lambda acc, item: acc + [item[x]] if isinstance(item, dict) and x in item else acc, lst, [])
def stripper(s):
from bs4 import BeautifulSoup as bs
REMOVE_ATTRIBUTES = [
'lang','language','onmouseover','onmouseout','script','style','font',
'dir','face','size','color','style','class','width','height','hspace',
'border','valign','align','background','bgcolor','text','link','vlink',
'alink','cellpadding','cellspacing']
#doc = '''<html><head><title>Page title</title></head><body><p id="firstpara" align="center">This is <i>paragraph</i> <a onmouseout="">one</a>.<p id="secondpara" align="blah">This is <i>paragraph</i> <b>two</b>.</html>'''
soup = bs(s, features='lxml')
for tag in soup.recursiveChildGenerator():
try:
tag.attrs = {key:value for key,value in tag.attrs.iteritems()
if key not in REMOVE_ATTRIBUTES}
except AttributeError:
# 'NavigableString' object has no attribute 'attrs'
pass
return soup.prettify()
def mycleaner(s):
s = re.sub(r'<br\s?\/>','\n',s)
s = re.sub(r'<\/?b>','',s)
s = re.sub(r' +',' ',s)
s = re.sub(r'^[\s\t\r\n]+$','',s,flags=re.MULTILINE)
s = re.sub('^ ','',s)
return s
def print_table(table):
longest_cols = [
(max([len(str(row[i])) for row in table]) + 3)
for i in range(len(table[0]))
]
row_format = "".join(["{:>" + str(longest_col) + "}" for longest_col in longest_cols])
for row in table:
print(row_format.format(*row))
def remove_nl(str):
return str.rstrip()
def UnicodeDictReader(utf8_data, **kwargs):
csv_reader = csv.DictReader(utf8_data, **kwargs)
for row in csv_reader:
yield {str(key, 'utf-8'):str(value, 'utf-8') for key, value in iter(list(row.items()))}
def minimal_string(s):
s = s.lower()
s = re.sub(r'[^a-zA-Z0-9]',' ',s)
s = re.sub(r'(\s+)',' ',s)
s = s.strip()
return s
def to_file_friendly(st):
st = st.lower()
st = re.sub( r"[^a-z0-9]+","_",st)
return st
def clean_title(st):
sq = re.sub( r"[^a-zA-Z0-9\.\-\!]"," ",st )
if sq: st = sq
if len(st)>50: return st[:50]+'...'
return st
def int_or_zero(x):
if x == None: return 0
else: return int(x)
def float_or_zero(x):
if x == None: return 0
else: return float(x)
def match59(x):
if x['links']['context']==7959: return True
return False
def item_2(x): return x[2]
def unix_time_millis(dt):
wst = pytz.timezone("US/Pacific")
epoch = datetime.datetime.fromtimestamp(0)
epoch = wst.localize(epoch)
return (dt - epoch).total_seconds() * 1000.0
# ENGL250 returns ENGL
def dept_from_name(n):
m = re.search(r'^([a-zA-Z]+)\s?[\d\/]+',n)
if m: return m.group(1)
print(("Couldn't find dept from: " + n))
return ''
# ENGL250 returns 250
def num_from_name(n):
m = re.search(r'^([a-zA-Z]+)\s?([\d\/]+[A-Z]?)',n)
if m: return m.group(2)
print(("Couldn't find num from: " + n))
return ''
def most_common_item(li):
d = defaultdict(int)
for x in li:
d[x] += 1
s = sorted(iter(list(d.items())), key=lambda k_v: (k_v[1],k_v[0]), reverse=True)
#pdb.set_trace()
return s[0][0]
def srt_times(a,b):
HERE = tz.tzlocal()
da = dateutil.parser.parse(a)
da = da.astimezone(HERE)
db = dateutil.parser.parse(b)
db = db.astimezone(HERE)
diff = da - db
return diff.seconds + diff.days * 24 * 3600
def how_long_ago(a): # number of hours ago 'a' was...
if not a: return 9999
HERE = tz.tzlocal()
d_now = datetime.datetime.now()
d_now = d_now.replace(tzinfo=None)
#d_now = d_now.astimezone(HERE)
d_then = dateutil.parser.parse(a)
d_then = d_then.replace(tzinfo=None)
#d_then = d_then.astimezone(HERE)
diff = d_now - d_then
return (diff.seconds/3600) + (diff.days * 24) + 8 # add 8 hours to get back from UTC timezone
def partition(times_list):
# get a list of times in this format: 2017-02-14T17:01:46Z
# and break them into a list of sessions, [start, hits, minutes]
minutes_till_new_session = 26
global dd
mm = ['x','Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
start = ""
last = ""
hits = 0
delta = timedelta(minutes=26)
HERE = tz.tzlocal()
sessions = []
sorted_times_list = sorted(times_list, srt_times)
current_set = []
timeline_times = []
for T in sorted_times_list:
dt_naive = dateutil.parser.parse(T)
dt = dt_naive.astimezone(HERE)
timeline_st = unix_time_millis(dt)
timeline_et = timeline_st + (1 * 60 * 1000) # always end 1 minute later....
timeline_dict = {}
timeline_dict['starting_time'] = timeline_st
timeline_dict['ending_time'] = timeline_et
timeline_times.append(timeline_dict)
month = mm[ int(dt.strftime("%m"))]
formatted = month + " " + dt.strftime("%d %H:%M")
if not start: # start a new session
start = dt
start_f = formatted
last = dt
current_set.append(formatted)
hits = 1
else: #
if dt > last + delta: # too long. save sesh. start another, if hits > 2
minutes = (last - start)
minutes = (minutes.seconds / 60) + 5
if hits > 2:
sessions.append( [start_f, hits, minutes,current_set] )
start = dt
start_f = formatted
last = dt
hits = 1
current_set = [formatted]
else: # put in current session
last = dt
current_set.append(formatted)
hits += 1
# save last sesh
if (last):
minutes = (last - start)
minutes = (minutes.seconds / 60) + 5
if hits > 2:
sessions.append( [start_f,hits,minutes,current_set] )
dd.write(json.dumps(timeline_times))
return sessions
def clean_fn(s):
s = re.sub(r'[\s:]+','',s)
s = re.sub(r'\/','+',s)
return s
def format_html(html):
from bs4 import BeautifulSoup as bs
soup = bs(html, 'html.parser')
return soup.prettify()