import os import builtins as _builtins import codecs as _codecs # --- Safe file I/O monkey patches --- # Ensure parent folders exist for write/append modes when using open/codecs.open _orig_open = _builtins.open _orig_codecs_open = _codecs.open def _ensure_parent_dir(path): try: if isinstance(path, (str, bytes, os.PathLike)): d = os.path.dirname(os.fspath(path)) if d and not os.path.exists(d): os.makedirs(d, exist_ok=True) except Exception: # Never block the open call due to directory check errors pass def _open_with_dirs(file, mode='r', *args, **kwargs): try: if isinstance(file, (str, bytes, os.PathLike)) and any(m in mode for m in ('w','a','x','+')): _ensure_parent_dir(file) finally: return _orig_open(file, mode, *args, **kwargs) def _codecs_open_with_dirs(filename, mode='r', encoding=None, errors='strict', buffering=1): try: if isinstance(filename, (str, bytes, os.PathLike)) and any(m in mode for m in ('w','a','x','+')): _ensure_parent_dir(filename) finally: return _orig_codecs_open(filename, mode, encoding, errors, buffering) # Apply patches once _builtins.open = _open_with_dirs _codecs.open = _codecs_open_with_dirs # Patch pandas to_csv to auto-create parent folder if available try: import pandas as _pd # noqa: F401 _orig_to_csv = _pd.DataFrame.to_csv def _to_csv_with_dirs(self, path_or_buf=None, *args, **kwargs): if isinstance(path_or_buf, (str, bytes, os.PathLike)): _ensure_parent_dir(path_or_buf) return _orig_to_csv(self, path_or_buf, *args, **kwargs) _pd.DataFrame.to_csv = _to_csv_with_dirs except Exception: pass import re, csv from collections import defaultdict import pytz, datetime, dateutil, json from datetime import timedelta from dateutil import tz from functools import reduce # Teacher name format changed. Remove commas and switch first to last def fix_t_name(str): str = str.strip() str = re.sub(r'\s+',' ',str) parts = str.split(', ') if len(parts)>1: return parts[1].strip() + " " + parts[0].strip() return str # Separate dept and code def split_class_dept(c): return c.split(' ')[0] def split_class_code(c): num = c.split(' ')[1] parts = re.match(r'(\d+)([a-zA-Z]+)',num) #ret = "Got %s, " % c if parts: r = int(parts.group(1)) #print(ret + "returning %i." % r) return r #print(ret + "returning %s." % num) return int(num) def split_class_code_letter(c): num = c.split(' ')[1] parts = re.match(r'(\d+)([A-Za-z]+)',num) if parts: return parts.group(2) return '' def nowAsStr(): #Get the current time, printed in the right format currentTime = datetime.datetime.utcnow() prettyTime = currentTime.strftime('%a, %d %b %Y %H:%M:%S GMT') return prettyTime def contains_key_value(lst, x, y): """ Checks if a list contains a dictionary with a specific key-value pair. :param lst: List of dictionaries to search through. :param x: The key to look for. :param y: The value associated with the key. :return: True if a dictionary in the list contains the key-value pair, otherwise False. """ return reduce(lambda acc, item: acc or (isinstance(item, dict) and item.get(x) == y), lst, False) def find_dict_with_key_value(lst, x, y): """ Finds the first dictionary in a list where the key x has the value y. :param lst: List of dictionaries to search through. :param x: The key to look for. :param y: The value associated with the key. :return: The first dictionary containing the key-value pair, or None if not found. """ return next((d for d in lst if isinstance(d, dict) and d.get(x) == y), None) def extract_key_values(lst, x): """ Extracts the values of the given key from a list of dictionaries. :param lst: List of dictionaries to search through. :param x: The key to look for. :return: A list of values corresponding to the key. """ return reduce(lambda acc, item: acc + [item[x]] if isinstance(item, dict) and x in item else acc, lst, []) def stripper(s): from bs4 import BeautifulSoup as bs REMOVE_ATTRIBUTES = [ 'lang','language','onmouseover','onmouseout','script','style','font', 'dir','face','size','color','style','class','width','height','hspace', 'border','valign','align','background','bgcolor','text','link','vlink', 'alink','cellpadding','cellspacing'] #doc = '''Page title

This is paragraph one.

This is paragraph two.''' soup = bs(s, features='lxml') for tag in soup.recursiveChildGenerator(): try: tag.attrs = {key:value for key,value in tag.attrs.iteritems() if key not in REMOVE_ATTRIBUTES} except AttributeError: # 'NavigableString' object has no attribute 'attrs' pass return soup.prettify() def mycleaner(s): s = re.sub(r'','\n',s) s = re.sub(r'<\/?b>','',s) s = re.sub(r' +',' ',s) s = re.sub(r'^[\s\t\r\n]+$','',s,flags=re.MULTILINE) s = re.sub('^ ','',s) return s def print_table(table): longest_cols = [ (max([len(str(row[i])) for row in table]) + 3) for i in range(len(table[0])) ] row_format = "".join(["{:>" + str(longest_col) + "}" for longest_col in longest_cols]) for row in table: print(row_format.format(*row)) def remove_nl(str): return str.rstrip() def UnicodeDictReader(utf8_data, **kwargs): csv_reader = csv.DictReader(utf8_data, **kwargs) for row in csv_reader: yield {str(key, 'utf-8'):str(value, 'utf-8') for key, value in iter(list(row.items()))} def minimal_string(s): s = s.lower() s = re.sub(r'[^a-zA-Z0-9]',' ',s) s = re.sub(r'(\s+)',' ',s) s = s.strip() return s def to_file_friendly(st): st = st.lower() st = re.sub( r"[^a-z0-9]+","_",st) return st def clean_title(st): sq = re.sub( r"[^a-zA-Z0-9\.\-\!]"," ",st ) if sq: st = sq if len(st)>50: return st[:50]+'...' return st def int_or_zero(x): if x == None: return 0 else: return int(x) def float_or_zero(x): if x == None: return 0 else: return float(x) def match59(x): if x['links']['context']==7959: return True return False def item_2(x): return x[2] def unix_time_millis(dt): wst = pytz.timezone("US/Pacific") epoch = datetime.datetime.fromtimestamp(0) epoch = wst.localize(epoch) return (dt - epoch).total_seconds() * 1000.0 # ENGL250 returns ENGL def dept_from_name(n): m = re.search(r'^([a-zA-Z]+)\s?[\d\/]+',n) if m: return m.group(1) print(("Couldn't find dept from: " + n)) return '' # ENGL250 returns 250 def num_from_name(n): m = re.search(r'^([a-zA-Z]+)\s?([\d\/]+[A-Z]?)',n) if m: return m.group(2) print(("Couldn't find num from: " + n)) return '' def most_common_item(li): d = defaultdict(int) for x in li: d[x] += 1 s = sorted(iter(list(d.items())), key=lambda k_v: (k_v[1],k_v[0]), reverse=True) #pdb.set_trace() return s[0][0] def srt_times(a,b): HERE = tz.tzlocal() da = dateutil.parser.parse(a) da = da.astimezone(HERE) db = dateutil.parser.parse(b) db = db.astimezone(HERE) diff = da - db return diff.seconds + diff.days * 24 * 3600 def how_long_ago(a): # number of hours ago 'a' was... if not a: return 9999 HERE = tz.tzlocal() d_now = datetime.datetime.now() d_now = d_now.replace(tzinfo=None) #d_now = d_now.astimezone(HERE) d_then = dateutil.parser.parse(a) d_then = d_then.replace(tzinfo=None) #d_then = d_then.astimezone(HERE) diff = d_now - d_then return (diff.seconds/3600) + (diff.days * 24) + 8 # add 8 hours to get back from UTC timezone def partition(times_list): # get a list of times in this format: 2017-02-14T17:01:46Z # and break them into a list of sessions, [start, hits, minutes] minutes_till_new_session = 26 global dd mm = ['x','Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec'] start = "" last = "" hits = 0 delta = timedelta(minutes=26) HERE = tz.tzlocal() sessions = [] sorted_times_list = sorted(times_list, srt_times) current_set = [] timeline_times = [] for T in sorted_times_list: dt_naive = dateutil.parser.parse(T) dt = dt_naive.astimezone(HERE) timeline_st = unix_time_millis(dt) timeline_et = timeline_st + (1 * 60 * 1000) # always end 1 minute later.... timeline_dict = {} timeline_dict['starting_time'] = timeline_st timeline_dict['ending_time'] = timeline_et timeline_times.append(timeline_dict) month = mm[ int(dt.strftime("%m"))] formatted = month + " " + dt.strftime("%d %H:%M") if not start: # start a new session start = dt start_f = formatted last = dt current_set.append(formatted) hits = 1 else: # if dt > last + delta: # too long. save sesh. start another, if hits > 2 minutes = (last - start) minutes = (minutes.seconds / 60) + 5 if hits > 2: sessions.append( [start_f, hits, minutes,current_set] ) start = dt start_f = formatted last = dt hits = 1 current_set = [formatted] else: # put in current session last = dt current_set.append(formatted) hits += 1 # save last sesh if (last): minutes = (last - start) minutes = (minutes.seconds / 60) + 5 if hits > 2: sessions.append( [start_f,hits,minutes,current_set] ) dd.write(json.dumps(timeline_times)) return sessions def clean_fn(s): s = re.sub(r'[\s:]+','',s) s = re.sub(r'\/','+',s) return s def format_html(html): from bs4 import BeautifulSoup as bs soup = bs(html, 'html.parser') return soup.prettify()