# canvasapp/util.py
# (205 lines, 6.3 KiB, Python — scraped page header converted to a comment)
import csv
import datetime
import json
import re
from collections import defaultdict
from datetime import timedelta

import dateutil.parser
import pytz
from bs4 import BeautifulSoup as bs
from dateutil import tz
def stripper(s):
    """Parse HTML *s* and strip presentational/scripting attributes from every tag.

    Returns the prettified HTML with only attributes not listed in
    REMOVE_ATTRIBUTES kept on each element.
    """
    REMOVE_ATTRIBUTES = [
        'lang','language','onmouseover','onmouseout','script','style','font',
        'dir','face','size','color','style','class','width','height','hspace',
        'border','valign','align','background','bgcolor','text','link','vlink',
        'alink','cellpadding','cellspacing']
    soup = bs(s, features='lxml')
    for tag in soup.recursiveChildGenerator():
        try:
            # BUG FIX: dict.iteritems() is Python 2 only; on Python 3 it raised
            # AttributeError, which the handler below silently swallowed, so
            # attributes were never actually removed. .items() works on both.
            tag.attrs = {key: value for key, value in tag.attrs.items()
                         if key not in REMOVE_ATTRIBUTES}
        except AttributeError:
            # NavigableString nodes have no .attrs; skip them.
            pass
    return soup.prettify()
def mycleaner(s):
    """Collapse simple HTML into plain text: <br/> becomes a newline, <b>
    tags are dropped, runs of spaces are squeezed, whitespace-only lines are
    blanked, and a single leading space is trimmed."""
    substitutions = (
        (r'<br\s?\/>', '\n', 0),
        (r'<\/?b>', '', 0),
        (r' +', ' ', 0),
        (r'^[\s\t\r\n]+$', '', re.MULTILINE),
        ('^ ', '', 0),
    )
    for pattern, replacement, flags in substitutions:
        s = re.sub(pattern, replacement, s, flags=flags)
    return s
def print_table(table):
longest_cols = [
(max([len(str(row[i])) for row in table]) + 3)
for i in range(len(table[0]))
]
row_format = "".join(["{:>" + str(longest_col) + "}" for longest_col in longest_cols])
for row in table:
print(row_format.format(*row))
def remove_nl(s):
    """Return *s* with trailing whitespace (including newlines) removed.

    The parameter was renamed from ``str``, which shadowed the builtin.
    """
    return s.rstrip()
def UnicodeDictReader(utf8_data, **kwargs):
    """Yield each row of *utf8_data* as a plain dict via csv.DictReader.

    BUG FIX: the Python-2 version decoded byte keys/values with
    ``str(value, 'utf-8')``, which on Python 3 raises
    ``TypeError: decoding str is not supported`` because DictReader already
    yields str. Rows are now yielded unchanged (coerced to plain dict).
    """
    csv_reader = csv.DictReader(utf8_data, **kwargs)
    for row in csv_reader:
        yield dict(row)
def minimal_string(s):
    """Normalize *s*: lower-case, replace non-alphanumerics with spaces, and
    collapse whitespace to single spaces with no leading/trailing space."""
    lowered = s.lower()
    alnum_only = re.sub(r'[^a-zA-Z0-9]', ' ', lowered)
    collapsed = re.sub(r'(\s+)', ' ', alnum_only)
    return collapsed.strip()
def to_file_friendly(st):
    """Lower-case *st* and squash every run of non-[a-z0-9] characters into a
    single underscore, yielding a filesystem-safe token."""
    return re.sub(r"[^a-z0-9]+", "_", st.lower())
def clean_title(st):
    """Replace characters outside [a-zA-Z0-9.!-] with spaces and cap the
    result at 50 characters (appending '...'). Falls back to the original
    string when the substitution produces an empty one."""
    cleaned = re.sub( r"[^a-zA-Z0-9\.\-\!]"," ",st )
    if cleaned:
        st = cleaned
    return st[:50] + '...' if len(st) > 50 else st
def int_or_zero(x):
    """Return ``int(x)``, or 0 when *x* is None.

    Fixed ``x == None`` to the idiomatic identity test ``x is None``.
    """
    return 0 if x is None else int(x)
def float_or_zero(x):
    """Return ``float(x)``, or 0 when *x* is None.

    Fixed ``x == None`` to the idiomatic identity test ``x is None``.
    """
    return 0 if x is None else float(x)
def match59(x):
    """Return True iff the record's links.context id equals 7959."""
    return x['links']['context'] == 7959
def item_2(x): return x[2]
def unix_time_millis(dt):
    """Milliseconds between aware datetime *dt* and an epoch anchor localized
    to US/Pacific.

    NOTE(review): the anchor is ``datetime.fromtimestamp(0)`` — the Unix
    epoch rendered in the host's local zone — then labeled as Pacific, which
    is only the true epoch when the host runs in US/Pacific; confirm before
    reusing elsewhere.
    """
    pacific = pytz.timezone("US/Pacific")
    anchor = pacific.localize(datetime.datetime.fromtimestamp(0))
    return (dt - anchor).total_seconds() * 1000.0
def dept_from_name(n):
    """Extract the department prefix from a course name: 'ENGL250' -> 'ENGL'.

    Prints a warning and returns '' when the name does not match."""
    match = re.search('^([a-zA-Z]+)\s?[\d\/]+', n)
    if not match:
        print(("Couldn't find dept from: " + n))
        return ''
    return match.group(1)
def num_from_name(n):
    """Extract the course number from a course name: 'ENGL250' -> '250'.

    Prints a warning and returns '' when the name does not match."""
    match = re.search('^([a-zA-Z]+)\s?([\d\/]+[A-Z]?)', n)
    if not match:
        print(("Couldn't find num from: " + n))
        return ''
    return match.group(2)
def most_common_item(li):
    """Return the most frequent element of *li*; ties are broken in favor of
    the larger value (rank on (count, value) descending).

    Raises IndexError on an empty list, matching the original behavior."""
    counts = defaultdict(int)
    for item in li:
        counts[item] += 1
    ranked = sorted(counts.items(), key=lambda kv: (kv[1], kv[0]), reverse=True)
    return ranked[0][0]
def srt_times(a, b):
    """cmp-style comparator for ISO timestamp strings: positive when *a* is
    later than *b*, negative when earlier, measured in whole seconds after
    converting both to the local timezone."""
    local = tz.tzlocal()
    first = dateutil.parser.parse(a).astimezone(local)
    second = dateutil.parser.parse(b).astimezone(local)
    gap = first - second
    return gap.seconds + gap.days * 24 * 3600
def how_long_ago(a): # number of hours ago 'a' was...
    """Return roughly how many hours ago timestamp string *a* was.

    Returns 9999 as a sentinel when *a* is falsy (missing timestamp).
    NOTE(review): both datetimes are stripped naive and a fixed +8 hours is
    added to shift from UTC — this ignores DST and assumes *a* is UTC while
    the host is UTC-8; confirm against the data source.
    """
    if not a: return 9999
    HERE = tz.tzlocal()  # computed but unused: the astimezone calls below are commented out
    d_now = datetime.datetime.now()
    d_now = d_now.replace(tzinfo=None)
    #d_now = d_now.astimezone(HERE)
    d_then = dateutil.parser.parse(a)
    d_then = d_then.replace(tzinfo=None)
    #d_then = d_then.astimezone(HERE)
    diff = d_now - d_then
    # diff.seconds is only the intra-day remainder (0..86399); days are added separately.
    return (diff.seconds/3600) + (diff.days * 24) + 8 # add 8 hours to get back from UTC timezone
def partition(times_list):
    """Break a list of ISO timestamps ('2017-02-14T17:01:46Z') into sessions.

    A session is a run of hits separated by gaps of at most 26 minutes; only
    sessions with more than 2 hits are kept. Each session is returned as
    ``[formatted_start, hit_count, minutes, [formatted_hit_times]]``.

    Side effect: writes a JSON timeline (one 1-minute interval per hit) to the
    module-global file-like ``dd``. NOTE(review): ``dd`` is never defined in
    this module — it must be injected by the caller or the final write raises
    NameError; confirm against the calling code.
    """
    from functools import cmp_to_key  # local import keeps this fix self-contained

    minutes_till_new_session = 26
    global dd
    mm = ['x','Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
    start = ""
    last = ""
    hits = 0
    delta = timedelta(minutes=26)
    HERE = tz.tzlocal()
    sessions = []
    # BUG FIX: Python 3's sorted() no longer accepts a cmp function
    # positionally (key is keyword-only), so sorted(times_list, srt_times)
    # raised TypeError. Wrap the old comparator with cmp_to_key.
    sorted_times_list = sorted(times_list, key=cmp_to_key(srt_times))
    current_set = []
    timeline_times = []
    for T in sorted_times_list:
        dt_naive = dateutil.parser.parse(T)
        dt = dt_naive.astimezone(HERE)
        timeline_st = unix_time_millis(dt)
        timeline_et = timeline_st + (1 * 60 * 1000) # always end 1 minute later....
        timeline_dict = {}
        timeline_dict['starting_time'] = timeline_st
        timeline_dict['ending_time'] = timeline_et
        timeline_times.append(timeline_dict)
        month = mm[ int(dt.strftime("%m"))]
        formatted = month + " " + dt.strftime("%d %H:%M")
        if not start: # first hit: open a session
            start = dt
            start_f = formatted
            last = dt
            current_set.append(formatted)
            hits = 1
        else:
            if dt > last + delta: # gap too long: save session (if >2 hits), open another
                minutes = (last - start)
                minutes = (minutes.seconds / 60) + 5  # +5 minutes grace for the final hit
                if hits > 2:
                    sessions.append( [start_f, hits, minutes,current_set] )
                start = dt
                start_f = formatted
                last = dt
                hits = 1
                current_set = [formatted]
            else: # within the gap: extend the current session
                last = dt
                current_set.append(formatted)
                hits += 1
    # flush the final session
    if (last):
        minutes = (last - start)
        minutes = (minutes.seconds / 60) + 5
        if hits > 2:
            sessions.append( [start_f,hits,minutes,current_set] )
    dd.write(json.dumps(timeline_times))
    return sessions