import os
import builtins as _builtins
import codecs as _codecs

# --- Safe file I/O monkey patches ---
# Ensure parent folders exist for write/append modes when using open/codecs.open
_orig_open = _builtins.open
_orig_codecs_open = _codecs.open

def _ensure_parent_dir(path):
    try:
        if isinstance(path, (str, bytes, os.PathLike)):
            d = os.path.dirname(os.fspath(path))
            if d and not os.path.exists(d):
                os.makedirs(d, exist_ok=True)
    except Exception:
        # Never block the open call due to directory check errors
        pass

# Wrap builtins.open to auto-create parent folders and sanitize buffering.
def _open_with_dirs(file, mode='r', *args, **kwargs):
    try:
        if isinstance(file, (str, bytes, os.PathLike)) and any(m in mode for m in ('w', 'a', 'x', '+')):
            _ensure_parent_dir(file)
    finally:
        # Avoid "RuntimeWarning: line buffering (buffering=1) isn't supported in binary mode"
        args_list = None
        buffering_arg = None
        if args:
            args_list = list(args)
            buffering_arg = args_list[0]
        elif 'buffering' in kwargs:
            buffering_arg = kwargs['buffering']
        if 'b' in mode and buffering_arg == 1:
            if args_list is not None:
                args_list[0] = -1
            else:
                kwargs = dict(kwargs)
                kwargs['buffering'] = -1  # use default buffering for binary
    if args_list is not None:
        return _orig_open(file, mode, *tuple(args_list), **kwargs)
    return _orig_open(file, mode, *args, **kwargs)

# Wrap codecs.open the same way: ensure parent folders exist and fix buffering.
def _codecs_open_with_dirs(filename, mode='r', encoding=None, errors='strict', buffering=1):
    try:
        if isinstance(filename, (str, bytes, os.PathLike)) and any(m in mode for m in ('w', 'a', 'x', '+')):
            _ensure_parent_dir(filename)
    finally:
        # Avoid line-buffering with binary modes
        if 'b' in mode and buffering == 1:
            buffering = -1
    return _orig_codecs_open(filename, mode, encoding, errors, buffering)

# Apply patches once
_builtins.open = _open_with_dirs
_codecs.open = _codecs_open_with_dirs

# Patch pandas to_csv to auto-create parent folder if available
try:
    import pandas as _pd  # noqa: F401
    _orig_to_csv = _pd.DataFrame.to_csv

    def _to_csv_with_dirs(self, path_or_buf=None, *args, **kwargs):
        if isinstance(path_or_buf, (str, bytes, os.PathLike)):
            _ensure_parent_dir(path_or_buf)
        return _orig_to_csv(self, path_or_buf, *args, **kwargs)

    _pd.DataFrame.to_csv = _to_csv_with_dirs
except Exception:
    pass

import re, csv
from collections import defaultdict
import pytz, datetime, dateutil, json
import dateutil.parser  # "import dateutil" alone does not load dateutil.parser
from datetime import timedelta
from dateutil import tz
from functools import reduce, cmp_to_key  # cmp_to_key: srt_times below is a Python 2 comparator

# Teacher name format changed: remove the comma and switch "Last, First" to "First Last".
def fix_t_name(name):
    name = name.strip()
    name = re.sub(r'\s+', ' ', name)
    parts = name.split(', ')
    if len(parts) > 1:
        return parts[1].strip() + " " + parts[0].strip()
    return name

# Separate dept and code: "ENGL 250A" -> "ENGL"
def split_class_dept(c):
    return c.split(' ')[0]

# "ENGL 250A" -> 250
def split_class_code(c):
    num = c.split(' ')[1]
    parts = re.match(r'(\d+)([a-zA-Z]+)', num)
    if parts:
        return int(parts.group(1))
    return int(num)

# "ENGL 250A" -> "A"
def split_class_code_letter(c):
    num = c.split(' ')[1]
    parts = re.match(r'(\d+)([A-Za-z]+)', num)
    if parts:
        return parts.group(2)
    return ''

def nowAsStr():
    # Get the current time, printed in the right format
    currentTime = datetime.datetime.utcnow()
    prettyTime = currentTime.strftime('%a, %d %b %Y %H:%M:%S GMT')
    return prettyTime

def contains_key_value(lst, x, y):
    """
    Checks if a list contains a dictionary with a specific key-value pair.

    :param lst: List of dictionaries to search through.
    :param x: The key to look for.
    :param y: The value associated with the key.
    :return: True if a dictionary in the list contains the key-value pair, otherwise False.
    """
    return reduce(lambda acc, item: acc or (isinstance(item, dict) and item.get(x) == y), lst, False)

def find_dict_with_key_value(lst, x, y):
    """
    Finds the first dictionary in a list where the key x has the value y.

    :param lst: List of dictionaries to search through.
    :param x: The key to look for.
    :param y: The value associated with the key.
    :return: The first dictionary containing the key-value pair, or None if not found.
    """
    return next((d for d in lst if isinstance(d, dict) and d.get(x) == y), None)

def extract_key_values(lst, x):
    """
    Extracts the values of the given key from a list of dictionaries.

    :param lst: List of dictionaries to search through.
    :param x: The key to look for.
    :return: A list of values corresponding to the key.
    """
    return reduce(lambda acc, item: (acc + [item[x]]) if isinstance(item, dict) and x in item else acc, lst, [])

def stripper(s):
    from bs4 import BeautifulSoup as bs
    REMOVE_ATTRIBUTES = [
        'lang', 'language', 'onmouseover', 'onmouseout', 'script', 'style', 'font',
        'dir', 'face', 'size', 'color', 'style', 'class', 'width', 'height', 'hspace',
        'border', 'valign', 'align', 'background', 'bgcolor', 'text', 'link', 'vlink',
        'alink', 'cellpadding', 'cellspacing']
    # Example input (the markup was lost from this comment in the source; the
    # tags below are illustrative):
    # doc = '''<p>This is paragraph one.</p>
    # <p>This is paragraph two.</p>'''
    soup = bs(s, features='lxml')
    for tag in soup.recursiveChildGenerator():
        try:
            # dict.iteritems() is Python 2; use items() under Python 3
            tag.attrs = {key: value for key, value in tag.attrs.items()
                         if key not in REMOVE_ATTRIBUTES}
        except AttributeError:
            # 'NavigableString' object has no attribute 'attrs'
            pass
    return soup.prettify()
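# Two hedged usage sketches. The first shows what the open() patch above buys
# you: writing through a not-yet-existing folder needs no explicit
# os.makedirs call (the path here is hypothetical). The second runs
# stripper() on a small illustrative snippet (requires bs4 and lxml).
def _demo_safe_open(tmp_root='demo_output'):
    nested = os.path.join(tmp_root, 'a', 'b', 'report.txt')
    with open(nested, 'w') as f:  # parent dirs auto-created by _open_with_dirs
        f.write('hello')
    return os.path.exists(nested)

def _demo_stripper():
    html = '<p class="x" align="center">Hi <b style="color:red">there</b></p>'
    return stripper(html)  # tags survive; class/align/style attributes do not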
def mycleaner(s):
    # The first pattern was mangled in the source; it most plausibly turned
    # HTML line breaks into newlines (an assumption, flagged here).
    s = re.sub(r'<br\s*/?>', '\n', s)
    s = re.sub(r'</?b>', '', s)
    s = re.sub(r' +', ' ', s)
    s = re.sub(r'^[\s\t\r\n]+$', '', s, flags=re.MULTILINE)
    s = re.sub('^ ', '', s)
    return s
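# A small before/after check for mycleaner(), under the assumption that the
# reconstructed first pattern targets <br> tags:
def _demo_mycleaner():
    raw = 'one<br/>two  <b>three</b>\n   \nfour'
    return mycleaner(raw)  # -> 'one\ntwo three\n\nfour'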
def print_table(table):
    # Pad each column to its widest cell, plus three spaces of breathing room.
    longest_cols = [
        (max([len(str(row[i])) for row in table]) + 3)
        for i in range(len(table[0]))
    ]
    row_format = "".join(["{:>" + str(longest_col) + "}" for longest_col in longest_cols])
    for row in table:
        print(row_format.format(*row))
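# print_table() right-aligns every column to its widest cell; a tiny
# illustrative table:
def _demo_print_table():
    print_table([
        ['dept', 'code', 'hits'],
        ['ENGL', 250, 12],
        ['MATH', 51, 3],
    ])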
def remove_nl(s):
    return s.rstrip()
def UnicodeDictReader(utf8_data, **kwargs):
    # str(value, 'utf-8') raises TypeError on str input under Python 3, so
    # decode only what is actually bytes; text rows pass through unchanged.
    def _dec(v):
        return v.decode('utf-8') if isinstance(v, bytes) else v
    csv_reader = csv.DictReader(utf8_data, **kwargs)
    for row in csv_reader:
        yield {_dec(key): _dec(value) for key, value in row.items()}
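# With an ordinary text stream, UnicodeDictReader() behaves like plain
# csv.DictReader; the decoding only matters for bytes input:
def _demo_unicode_dict_reader():
    import io
    data = io.StringIO('name,dept\nSmith,ENGL\n')
    return list(UnicodeDictReader(data))  # [{'name': 'Smith', 'dept': 'ENGL'}]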
def minimal_string(s):
    s = s.lower()
    s = re.sub(r'[^a-zA-Z0-9]', ' ', s)
    s = re.sub(r'\s+', ' ', s)
    s = s.strip()
    return s
def to_file_friendly(st):
    st = st.lower()
    st = re.sub(r"[^a-z0-9]+", "_", st)
    return st
def clean_title(st):
    sq = re.sub(r"[^a-zA-Z0-9\.\-\!]", " ", st)
    if sq:
        st = sq
    if len(st) > 50:
        return st[:50] + '...'
    return st
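# The three normalizers serve different targets: search keys, file names,
# and display titles. For example:
def _demo_normalizers():
    s = '  Intro to C.S.: Part 1!  '
    return (minimal_string(s),    # 'intro to c s part 1'
            to_file_friendly(s),  # '_intro_to_c_s_part_1_'
            clean_title(s))       # '  Intro to C.S.  Part 1!  '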
def int_or_zero(x):
    if x is None:
        return 0
    return int(x)

def float_or_zero(x):
    if x is None:
        return 0
    return float(x)
def match59(x):
    return x['links']['context'] == 7959
def item_2(x): return x[2]
def unix_time_millis(dt):
    # dt is expected to be timezone-aware. The original built the epoch with
    # fromtimestamp(0) localized to US/Pacific, which was only correct on a
    # Pacific-time machine; anchoring the epoch in UTC works everywhere.
    epoch = datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)
    return (dt - epoch).total_seconds() * 1000.0
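# Worked example: midnight UTC on 2017-02-15 is 1487116800 seconds after the
# epoch, so unix_time_millis() should return that times 1000.
def _demo_unix_time_millis():
    dt = datetime.datetime(2017, 2, 15, tzinfo=pytz.utc)
    return unix_time_millis(dt)  # 1487116800000.0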
# ENGL250 returns ENGL
def dept_from_name(n):
    m = re.search(r'^([a-zA-Z]+)\s?[\d\/]+', n)
    if m:
        return m.group(1)
    print("Couldn't find dept from: " + n)
    return ''
# ENGL250 returns 250
def num_from_name(n):
    m = re.search(r'^([a-zA-Z]+)\s?([\d\/]+[A-Z]?)', n)
    if m:
        return m.group(2)
    print("Couldn't find num from: " + n)
    return ''
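# Quick examples for the two course-name parsers, with and without a space:
def _demo_course_name_parsers():
    return (dept_from_name('ENGL250'),   # 'ENGL'
            dept_from_name('CS 106A'),   # 'CS'
            num_from_name('CS 106A'))    # '106A'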
def most_common_item(li):
    d = defaultdict(int)
    for x in li:
        d[x] += 1
    # Sort by count, breaking ties by the item itself
    s = sorted(d.items(), key=lambda k_v: (k_v[1], k_v[0]), reverse=True)
    return s[0][0]
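# most_common_item() picks the highest count and breaks count ties by taking
# the larger item:
def _demo_most_common_item():
    return most_common_item(['a', 'b', 'b', 'c', 'c'])  # 'c' (tie at 2; 'c' > 'b')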
def srt_times(a, b):
    # Python 2 style comparator: positive when a is later than b.
    # Under Python 3 it must go through functools.cmp_to_key (see partition below).
    HERE = tz.tzlocal()
    da = dateutil.parser.parse(a).astimezone(HERE)
    db = dateutil.parser.parse(b).astimezone(HERE)
    diff = da - db
    return diff.seconds + diff.days * 24 * 3600
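# Because srt_times() is a comparator rather than a key function, sorting
# with it looks like this:
def _demo_srt_times():
    times = ['2017-02-14T17:30:00Z', '2017-02-14T17:01:46Z']
    return sorted(times, key=cmp_to_key(srt_times))  # earliest first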
def how_long_ago(a):  # number of hours ago 'a' was...
    if not a:
        return 9999
    d_now = datetime.datetime.now()
    d_now = d_now.replace(tzinfo=None)
    d_then = dateutil.parser.parse(a)
    d_then = d_then.replace(tzinfo=None)
    diff = d_now - d_then
    # 'a' is assumed to be UTC while now() is local; the +8 hard-codes a
    # US/Pacific offset and ignores DST.
    return (diff.seconds / 3600) + (diff.days * 24) + 8
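# A timezone-aware alternative to the +8 hack above, offered as a sketch:
# keep both datetimes aware and let the subtraction handle offsets.
def hours_since(a):
    if not a:
        return 9999
    then = dateutil.parser.parse(a)
    if then.tzinfo is None:
        then = then.replace(tzinfo=pytz.utc)  # assume UTC, like the original
    now = datetime.datetime.now(pytz.utc)
    return (now - then).total_seconds() / 3600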
def partition(times_list):
    # Take a list of times in this format: 2017-02-14T17:01:46Z
    # and break them into a list of sessions: [start, hits, minutes, hit_times]
    minutes_till_new_session = 26
    global dd
    mm = ['x', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    start = ""
    last = ""
    hits = 0
    delta = timedelta(minutes=minutes_till_new_session)
    HERE = tz.tzlocal()
    sessions = []
    # sorted() lost its cmp argument in Python 3; adapt the comparator.
    sorted_times_list = sorted(times_list, key=cmp_to_key(srt_times))
    current_set = []
    timeline_times = []
    for T in sorted_times_list:
        dt_naive = dateutil.parser.parse(T)
        dt = dt_naive.astimezone(HERE)
        timeline_st = unix_time_millis(dt)
        timeline_et = timeline_st + (1 * 60 * 1000)  # always end 1 minute later
        timeline_dict = {}
        timeline_dict['starting_time'] = timeline_st
        timeline_dict['ending_time'] = timeline_et
        timeline_times.append(timeline_dict)
        month = mm[int(dt.strftime("%m"))]
        formatted = month + " " + dt.strftime("%d %H:%M")
        if not start:  # start a new session
            start = dt
            start_f = formatted
            last = dt
            current_set.append(formatted)
            hits = 1
        else:
            if dt > last + delta:  # gap too long: save this session and start another, if hits > 2
                minutes = (last - start)
                minutes = (minutes.seconds / 60) + 5
                if hits > 2:
                    sessions.append([start_f, hits, minutes, current_set])
                start = dt
                start_f = formatted
                last = dt
                hits = 1
                current_set = [formatted]
            else:  # put in current session
                last = dt
                current_set.append(formatted)
                hits += 1
    # save the last session
    if last:
        minutes = (last - start)
        minutes = (minutes.seconds / 60) + 5
        if hits > 2:
            sessions.append([start_f, hits, minutes, current_set])
    # 'dd' is a module-level output handle some callers set up elsewhere;
    # skip the timeline dump when it was never defined.
    if 'dd' in globals():
        dd.write(json.dumps(timeline_times))
    return sessions
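# Feeding partition() three hits close together and one after a long gap
# should yield a single 3-hit session; the lone trailing hit is dropped
# because sessions need hits > 2:
def _demo_partition():
    times = ['2017-02-14T17:01:46Z', '2017-02-14T17:05:00Z',
             '2017-02-14T17:20:00Z', '2017-02-14T19:00:00Z']
    return partition(times)  # one session: [start, 3, about 23 minutes, hit list]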
def clean_fn(s):
    s = re.sub(r'[\s:]+', '', s)
    s = re.sub(r'\/', '+', s)
    return s
def format_html(html):
    from bs4 import BeautifulSoup as bs
    soup = bs(html, 'html.parser')
    return soup.prettify()