"""Assorted helpers: list-of-dict lookups, HTML/text cleanup, course-name
parsing, and timestamp/session utilities."""

import csv
import datetime
import functools
import json
import re
from collections import Counter, defaultdict
from datetime import timedelta
from functools import reduce

import dateutil
import dateutil.parser  # `import dateutil` alone does not guarantee the parser submodule is loaded
import pytz
from bs4 import BeautifulSoup as bs
from dateutil import tz


def contains_key_value(lst, x, y):
    """
    Checks if a list contains a dictionary with a specific key-value pair.

    :param lst: List of dictionaries to search through (non-dict items are skipped).
    :param x: The key to look for.
    :param y: The value associated with the key.
    :return: True if a dictionary in the list contains the key-value pair, otherwise False.
    """
    # any() stops at the first match instead of folding over the whole list.
    return any(isinstance(item, dict) and item.get(x) == y for item in lst)


def find_dict_with_key_value(lst, x, y):
    """
    Finds the first dictionary in a list where the key x has the value y.

    :param lst: List of dictionaries to search through (non-dict items are skipped).
    :param x: The key to look for.
    :param y: The value associated with the key.
    :return: The first dictionary containing the key-value pair, or None if not found.
    """
    return next((d for d in lst if isinstance(d, dict) and d.get(x) == y), None)


def extract_key_values(lst, x):
    """
    Extracts the values of the given key from a list of dictionaries.

    :param lst: List of dictionaries to search through (non-dict items are skipped).
    :param x: The key to look for.
    :return: A list of values corresponding to the key, in input order.
    """
    return [item[x] for item in lst if isinstance(item, dict) and x in item]


def stripper(s):
    """
    Parses HTML and removes presentational attributes from every tag.

    :param s: HTML markup to clean.
    :return: The prettified markup with the listed attributes removed.
    """
    REMOVE_ATTRIBUTES = frozenset([
        'lang', 'language', 'onmouseover', 'onmouseout', 'script', 'style', 'font',
        'dir', 'face', 'size', 'color', 'class', 'width', 'height', 'hspace',
        'border', 'valign', 'align', 'background', 'bgcolor', 'text', 'link', 'vlink',
        'alink', 'cellpadding', 'cellspacing',
    ])
    soup = bs(s, features='lxml')
    # find_all(True) yields only Tag objects, so text nodes (which have no
    # attrs) never need to be skipped via exception handling.
    for tag in soup.find_all(True):
        tag.attrs = {
            key: value
            for key, value in tag.attrs.items()
            if key not in REMOVE_ATTRIBUTES
        }
    return soup.prettify()


def mycleaner(s):
    """
    Light-weight text cleanup of an HTML fragment.

    Turns ``<br>`` tags into newlines, drops ``<b>``/``</b>`` tags, collapses
    runs of spaces, empties whitespace-only lines, and removes one leading
    space at the very start of the string.

    :param s: The text to clean.
    :return: The cleaned text.
    """
    s = re.sub(r'<br\s*/?>', '\n', s)
    s = re.sub(r'<\/?b>', '', s)
    s = re.sub(r' +', ' ', s)
    s = re.sub(r'^[\s\t\r\n]+$', '', s, flags=re.MULTILINE)
    s = re.sub('^ ', '', s)
    return s


def print_table(table):
    """
    Prints a list of rows as right-aligned columns.

    Each column is as wide as its longest cell plus three characters of
    padding. The column count is taken from the first row.

    :param table: Sequence of rows; each row is a sequence of cells.
    :return: None. Nothing is printed for an empty table.
    """
    if not table:
        return
    longest_cols = [
        max(len(str(row[i])) for row in table) + 3
        for i in range(len(table[0]))
    ]
    row_format = "".join("{:>" + str(width) + "}" for width in longest_cols)
    for row in table:
        # Format the str() of each cell: the widths were measured that way, and
        # values such as None reject an alignment format spec.
        print(row_format.format(*(str(cell) for cell in row)))


def remove_nl(str):  # parameter name kept for backward compatibility
    """
    Strips trailing whitespace (including newlines) from a string.

    :param str: The string to trim.
    :return: The string without trailing whitespace.
    """
    return str.rstrip()


def UnicodeDictReader(utf8_data, **kwargs):
    """
    Yields CSV rows as dictionaries of text.

    :param utf8_data: Iterable of lines; UTF-8 encoded bytes lines are decoded,
        text lines are passed through unchanged.
    :param kwargs: Extra keyword arguments forwarded to ``csv.DictReader``.
    :return: A generator of ``{column: value}`` dictionaries.
    """
    # csv.DictReader only accepts text on Python 3, so decode up front.
    text_lines = (
        line.decode('utf-8') if isinstance(line, bytes) else line
        for line in utf8_data
    )
    for row in csv.DictReader(text_lines, **kwargs):
        yield dict(row)


def minimal_string(s):
    """
    Normalizes a string to lowercase ASCII letters/digits separated by single spaces.

    :param s: The string to normalize.
    :return: The normalized string, without leading or trailing spaces.
    """
    s = s.lower()
    s = re.sub(r'[^a-zA-Z0-9]', ' ', s)
    s = re.sub(r'(\s+)', ' ', s)
    return s.strip()


def to_file_friendly(st):
    """
    Lowercases a string and replaces each run of characters outside a-z/0-9 with "_".

    :param st: The string to convert.
    :return: The converted string.
    """
    return re.sub(r"[^a-z0-9]+", "_", st.lower())


def clean_title(st):
    """
    Replaces characters other than letters, digits, ".", "-" and "!" with
    spaces, and truncates the result to 50 characters followed by "...".

    :param st: The title to clean.
    :return: The cleaned (and possibly truncated) title.
    """
    st = re.sub(r"[^a-zA-Z0-9\.\-\!]", " ", st)
    if len(st) > 50:
        return st[:50] + '...'
    return st


def int_or_zero(x):
    """
    :param x: A value accepted by ``int()``, or None.
    :return: ``int(x)``, or 0 when x is None.
    """
    return 0 if x is None else int(x)


def float_or_zero(x):
    """
    :param x: A value accepted by ``float()``, or None.
    :return: ``float(x)``, or 0 when x is None.
    """
    return 0 if x is None else float(x)


def match59(x):
    """
    :param x: Mapping with a nested ``x['links']['context']`` entry.
    :return: True if that entry equals 7959, otherwise False.
    """
    return x['links']['context'] == 7959


def item_2(x):
    """
    :param x: An indexable sequence.
    :return: The element at index 2 (handy as a sort key).
    """
    return x[2]


def unix_time_millis(dt):
    """
    Converts a timezone-aware datetime to milliseconds since the Unix epoch.

    :param dt: A timezone-aware datetime (a naive one raises TypeError).
    :return: Milliseconds since 1970-01-01T00:00:00Z, as a float.
    """
    # A fixed UTC epoch gives the same answer on any host, whatever its
    # local timezone is.
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    return (dt - epoch).total_seconds() * 1000.0


def dept_from_name(n):
    """
    Extracts the leading department letters from a course name
    (e.g. ENGL250 returns ENGL).

    :param n: The course name.
    :return: The department, or '' (after printing a message) if none is found.
    """
    m = re.search(r'^([a-zA-Z]+)\s?[\d\/]+', n)
    if m:
        return m.group(1)
    print("Couldn't find dept from: " + n)
    return ''


def num_from_name(n):
    """
    Extracts the course number from a course name (e.g. ENGL250 returns 250).

    :param n: The course name.
    :return: The number part (digits/slashes plus an optional trailing capital
        letter), or '' (after printing a message) if none is found.
    """
    m = re.search(r'^([a-zA-Z]+)\s?([\d\/]+[A-Z]?)', n)
    if m:
        return m.group(2)
    print("Couldn't find num from: " + n)
    return ''


def most_common_item(li):
    """
    Finds the most frequent item in an iterable.

    :param li: Iterable of hashable, mutually comparable items.
    :return: The item with the highest count; ties go to the largest item.
        None if the iterable is empty.
    """
    counts = Counter(li)
    if not counts:
        return None
    item, _count = max(counts.items(), key=lambda k_v: (k_v[1], k_v[0]))
    return item


def srt_times(a, b):
    """
    Comparison function for two timestamp strings.

    :param a: First timestamp string (anything ``dateutil.parser.parse`` accepts).
    :param b: Second timestamp string.
    :return: Whole seconds from b to a; negative when a is earlier than b.
    """
    HERE = tz.tzlocal()
    da = dateutil.parser.parse(a).astimezone(HERE)
    db = dateutil.parser.parse(b).astimezone(HERE)
    diff = da - db
    return diff.seconds + diff.days * 24 * 3600


def how_long_ago(a):
    """
    Number of hours ago that timestamp 'a' was.

    :param a: Timestamp string; a timestamp without an offset is treated as UTC.
    :return: Elapsed hours as a float, or 9999 when a is empty/None.
    """
    if not a:
        return 9999
    d_then = dateutil.parser.parse(a)
    if d_then.tzinfo is None:
        d_then = d_then.replace(tzinfo=datetime.timezone.utc)
    # Compare aware datetimes so the result does not depend on the host's
    # timezone or on daylight saving time.
    d_now = datetime.datetime.now(datetime.timezone.utc)
    return (d_now - d_then).total_seconds() / 3600


def _session_minutes(start, last):
    """Whole minutes between two datetimes, plus a fixed 5-minute allowance."""
    return int((last - start).total_seconds() // 60) + 5


def _session_label(dt):
    """Formats a datetime like 'Feb 14 17:01' using fixed English month names."""
    mm = ['x', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
          'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    return mm[dt.month] + " " + dt.strftime("%d %H:%M")


def partition(times_list):
    """
    Breaks a list of timestamps into sessions.

    Takes times in this format: 2017-02-14T17:01:46Z. Times are sorted and
    grouped; a gap of more than 26 minutes starts a new session. Only sessions
    with more than 2 hits are reported.

    Side effect: writes a JSON list of ``{'starting_time', 'ending_time'}``
    entries (epoch milliseconds, each one minute long) to the module-level
    ``dd`` object, which is not defined in this part of the file and must
    provide a ``write`` method.

    :param times_list: Iterable of timestamp strings.
    :return: List of sessions, each ``[start_label, hits, minutes, labels]``.
    """
    global dd

    minutes_till_new_session = 26
    delta = timedelta(minutes=minutes_till_new_session)
    HERE = tz.tzlocal()

    # Parse once and sort the datetimes directly; Python 3's sorted() does not
    # accept a positional comparison function.
    moments = sorted(dateutil.parser.parse(T).astimezone(HERE) for T in times_list)

    sessions = []
    timeline_times = []
    start = None
    last = None
    start_f = ""
    hits = 0
    current_set = []

    for dt in moments:
        timeline_st = unix_time_millis(dt)
        timeline_times.append({
            'starting_time': timeline_st,
            'ending_time': timeline_st + (1 * 60 * 1000),  # always end 1 minute later
        })
        formatted = _session_label(dt)

        if start is not None and dt <= last + delta:
            # Close enough to the previous hit: extend the current session.
            last = dt
            current_set.append(formatted)
            hits += 1
            continue

        # Too long since the previous hit (or first hit): save the finished
        # session if it is big enough, then start another.
        if start is not None and hits > 2:
            sessions.append([start_f, hits, _session_minutes(start, last), current_set])
        start = dt
        start_f = formatted
        last = dt
        hits = 1
        current_set = [formatted]

    # Save the last session.
    if start is not None and hits > 2:
        sessions.append([start_f, hits, _session_minutes(start, last), current_set])

    dd.write(json.dumps(timeline_times))
    return sessions