###
###
### Text / Knowledge Base
###
### How about downloading all possible info / webpages / sources
### related to Gavilan and creating a master search index?
###
### Goals:
### - Scripted approach to allow re-indexing / updating
### - Break everything down into paragraphs
###
### - Script to extract keywords, topics, entities, summaries, questions answered
### from each paragraph or chunk.
### - Use spacy, gensim, nltk, or gpt-3, or a combination of all of them
###
### - Create vector / embeddings for each paragraph
###
### - Enable a vector search engine and connect to front page of gavilan.cc
### - Use that to feed a handful of source paragraphs (& a prompt) into GPT and
###   receive text answers to questions.
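###
### Pipeline overview (functions in this file):
###   crawl()                - spider www.gavilan.edu and save extracted text into cache/crawl
###   txt_clean_index()      - strip boilerplate lines that repeat across pages into cache/cleancrawl
###   create_search_index()  - build a whoosh keyword index in cache/searchindex
###   search_index()         - interactive keyword search over that index
###   create_embeddings()    - sentence-transformer vectors stored in an annoy index
###   search_embeddings()    - interactive vector (semantic) search
###   answer_with_gpt()      - sketch of the last step: retrieved paragraphs + prompt -> GPT answer
###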
import re, os, sys, codecs, requests, trafilatura, pickle, pypandoc
from collections import defaultdict
from pdfminer.high_level import extract_text
from sentence_transformers import SentenceTransformer, util
from util import clean_fn
def demo_vector_search():
from gensim.models import Word2Vec
from gensim.utils import simple_preprocess
import nltk.data
import spacy
# (might have to upgrade pip first...)
# pip install --upgrade click
#
    # python -m spacy download en_core_web_sm
    # python -m spacy download en_core_web_md
    # the nltk sentence tokenizer below also needs: python -c "import nltk; nltk.download('punkt')"
def is_complete_sentence(text):
        # `nlp` is the spaCy pipeline loaded later in demo_vector_search, before this helper runs
doc = nlp(text)
sentences = list(doc.sents)
if len(sentences) == 1 and text.strip() == sentences[0].text.strip():
return True
return False
sentences = [
"This is an example sentence.",
"Here is another sentence for training."
]
paragraph = """Financial Aid services are available in person! We are happy to assist you with your financial aid needs. If you are interested in visiting the office in person, please review the guidelines for visiting campus and schedule your appointment:
Guidelines for In-Person Financial Aid Services
Due to FERPA regulations, no student information will be given to anyone other than the student without authorization from the student.
We continue to offer virtual services. Financial Aid staff may be reached by email, phone, text, and zoom! Please refer to the contact information and schedules below.
Gavilan-WelcomeCenter_Peer_Mentors.jpg
Do you need assistance filing the FAFSA or California Dream Act Application? Friendly and knowledgeable Peer Mentors are available to assist you virtually and in person! Details below for an online Zoom visit, phone call, or in-person visit with Peer Mentors.
Monday - Friday 8am - 5pm, Student Center
Join Zoom to Connect with a Peer Mentor
Or call (669) 900-6833 and use meeting ID 408 848 4800
MicrosoftTeams-image.png
Do you need assistance with an existing financial aid application, financial aid document submission, or review of your financial aid package? Schedule an in-person, phone, or zoom appointment with our Financial Aid counter.
Mon - Thurs: 9am - 1:00pm, 2:00pm - 5:00pm
Fri: 10am - 2pm
Office: (408) 848-4727 Email: finaid@gavilan.edu
Schedule an In-Person, Phone or Zoom Appointment"""
tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
sentences1 = tokenizer.tokenize(paragraph)
for i,s in enumerate(sentences1):
print(i, "\t", s)
print("\n\n")
#nlp = spacy.load('en_core_web_sm')
nlp = spacy.load('en_core_web_md')
doc = nlp(paragraph)
sentences2 = list(doc.sents)
for i,s in enumerate(sentences2):
t = re.sub(r'\n+',' ',s.text)
is_sentence = 'yes' if is_complete_sentence(t) else 'no '
print(i, " ", is_sentence, " ", t)
print("\n\n")
#for text in sentences2:
# print(text, "is a complete sentence?" , is_complete_sentence(text))
    return  # NOTE: the Word2Vec demo below is skipped by this early return
tokenized_sentences = [simple_preprocess(s) for s in sentences]
model = Word2Vec(tokenized_sentences, min_count=1, vector_size=100)
example_word = "example"
vector = model.wv[example_word]
print(f"Vector for the word '{example_word}': {vector}")
def makedir():
files = os.listdir('cache/crawl')
#print(files)
files.sort()
for f in files:
m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$',f)
if m:
name = m.groups()[0]
parts = name.split('+')
print(parts)
def manual_index():
files = os.listdir('cache/crawl')
#print(files)
ii = codecs.open('cache/crawl/index.html','w','utf-8')
    ii.write('<html><body><h1>Site index</h1>\n')
files.sort()
for f in files:
m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$',f)
if m:
name = m.groups()[0]
parts = name.split('+')
            ii.write('<a href="' + f + '">' + f + '</a><br>\n')
    ii.write('</body></html>\n')
    ii.close()
def my_site():
files = os.listdir('cache/crawl')
output = []
files.sort()
for f in files:
m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$',f)
if m:
name = m.groups()[0]
parts = name.split('+')
output.append(parts)
return output
## TODO site scraper
## TODO find package that extracts text from web page
### TODO master list of what to index.
## TODO PDFs and DOCXs
## TODO fix urls w/ anchors
def crawl():
import scrapy, logging
from scrapy.crawler import CrawlerProcess
logger = logging.getLogger()
logger.setLevel(level=logging.CRITICAL)
logging.basicConfig(level=logging.CRITICAL)
logger.disabled = True
avoid = ['ezproxy','community\.gavilan\.edu','archive\/tag','archive\/category', 'my\.gavilan\.edu', 'augusoft',
'eis-prod', 'ilearn\.gavilan', 'mailto', 'cgi-bin', 'edu\/old\/schedule',
'admit\/search\.php', 'GavilanTrusteeAreaMaps2022\.pdf', 'schedule\/2019', 'schedule\/2020', 'schedule\/2021',
'schedule\/2022', 'schedule\/previous', ]
class MySpider(scrapy.Spider):
name = 'myspider'
#start_urls = ['https://gavilan.curriqunet.com/catalog/iq/1826']
start_urls = ['https://www.gavilan.edu']
"""
logging.getLogger("scrapy").setLevel(logging.CRITICAL)
logging.getLogger("scrapy.utils.log").setLevel(logging.CRITICAL)
logging.getLogger("scrapy.extensions.telnet").setLevel(logging.CRITICAL)
logging.getLogger("scrapy.middleware").setLevel(logging.CRITICAL)
logging.getLogger("scrapy.core.engine").setLevel(logging.CRITICAL)
logging.getLogger("scrapy.middleware").setLevel(logging.CRITICAL)
logger.disabled = True"""
def parse(self, response):
print('visited:', repr(response.url), 'status:', response.status)
done = 0
if re.search(r'\.pdf$', response.url):
m = re.search(r'\/([^\/]+\.pdf)$', response.url)
if m:
print("saving to ", save_folder + '/' + clean_fn(response.url))
pdf_response = requests.get(response.url)
with open(save_folder + '/' + clean_fn(response.url), 'wb') as f:
f.write(pdf_response.content)
text = extract_text(save_folder + '/' + clean_fn(response.url))
codecs.open(save_folder + '/' + clean_fn(response.url) + '.txt','w','utf-8').write(text)
done = 1
for ext in ['doc','docx','ppt','pptx','rtf','xls','xlsx']:
if re.search(r'\.'+ext+'$', response.url):
m = re.search(r'\/([^\/]+\.'+ext+')$', response.url)
if m:
print("saving to ", save_folder + '/' + clean_fn(response.url))
pdf_response = requests.get(response.url)
with open(save_folder + '/' + clean_fn(response.url), 'wb') as f:
f.write(pdf_response.content)
#text = extract_text(save_folder + '/' + clean_fn(response.url) + '.txt')
pandoc_infile = save_folder + '/' + clean_fn(response.url)
pandoc_outfile = save_folder + '/' + clean_fn(response.url) + '.html'
print("pandoc in file: %s" % pandoc_infile)
print("pandoc outfile: %s" % pandoc_outfile)
pypandoc.convert_file(pandoc_infile, 'html', outputfile=pandoc_outfile, extra_args=['--from=%s' % ext, '--extract-media=%s' % save_folder + '/img' ])
pandoc_output = codecs.open(pandoc_outfile,'r','utf-8').read()
txt_output = trafilatura.extract(pandoc_output,include_links=True, deduplicate=True, include_images=True, include_formatting=True)
if txt_output:
codecs.open(save_folder + '/' + clean_fn(response.url) + '.txt','w','utf-8').write(txt_output)
done = 1
for ext in ['jpg','jpeg','gif','webp','png','svg','bmp','tiff','tif','ico']:
if re.search(r'\.'+ext+'$', response.url):
m = re.search(r'\/([^\/]+\.'+ext+')$', response.url)
if m:
print("saving to ", save_folder + '/img/' + clean_fn(response.url))
pdf_response = requests.get(response.url)
with open(save_folder + '/img/' + clean_fn(response.url), 'wb') as f:
f.write(pdf_response.content)
done = 1
if not done:
f_out = codecs.open(save_folder + '/' + clean_fn(response.url) + '.txt','w','utf-8')
this_output = trafilatura.extract(response.text,include_links=True, deduplicate=True, include_images=True, include_formatting=True)
if this_output:
f_out.write(this_output)
f_out.close()
links = response.css('a::attr(href)').getall()
# Follow each link and parse its contents
for link in links:
go = 1
full_link = response.urljoin(link)
print('++++++ trying ', full_link)
if not re.search(r'gavilan\.edu',full_link):
go = 0
print('--- not gav edu')
else:
if re.search(r'hhh\.gavilan\.edu',full_link):
pass
elif not re.search(r'^https?:\/\/www\.gavilan\.edu',full_link):
# need to add www to gavilan.edu
m = re.search(r'^(https?:\/\/)gavilan\.edu(\/.*)$',full_link)
if m:
                            full_link = m.group(1) + 'www.gavilan.edu' + m.group(2)
for a in avoid:
if re.search(a,full_link):
go = 0
print('--- avoid ', a)
if go: yield scrapy.Request(full_link, callback=self.parse,
headers={"User-Agent": "Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"})
else:
print("------ avoiding ", full_link)
    # Make sure the output folders exist, then instantiate a CrawlerProcess object
    os.makedirs(save_folder + '/img', exist_ok=True)
    process = CrawlerProcess()
# Add the MySpider spider to the process
process.crawl(MySpider)
# Start the process
logging.basicConfig(level=logging.CRITICAL)
logging.getLogger('scrapy').propagate = False
logging.getLogger("trafilatura").setLevel(logging.CRITICAL)
logging.getLogger("trafilatura").propagate = False
logging.getLogger("pdfminer").setLevel(logging.CRITICAL)
logging.getLogger("pdfminer").propagate = False
logging.getLogger("urllib3").setLevel(logging.CRITICAL)
logging.getLogger("urllib3").propagate = False
logging.basicConfig(level=logging.CRITICAL)
process.start()
save_folder = 'cache/crawl'
clean_folder = 'cache/cleancrawl'
def txt_clean_index():
    # only process the extracted .txt files (skip index.html and the img/ subfolder)
    files = [f for f in os.listdir(save_folder) if f.endswith('.txt')]
    os.makedirs(clean_folder, exist_ok=True)
    line_freq = defaultdict(int)
    # first pass: count how often each exact line appears across the whole crawl
for f in files:
lines = codecs.open(save_folder + '/' + f,'r','utf-8').readlines()
for L in lines:
L = L.strip()
line_freq[L] += 1
    # second pass: drop boilerplate lines that appear more than 3 times (menus, footers, etc.)
for f in files:
print("\n\n",f)
lines = codecs.open(save_folder + '/' + f,'r','utf-8').readlines()
out = codecs.open(clean_folder + '/' + f,'w','utf-8')
for L in lines:
L = L.strip()
if L in line_freq and line_freq[L] > 3:
continue
print(L)
out.write(L + '\n')
out.close()
from whoosh import fields, columns
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID, STORED, NUMERIC
from whoosh.qparser import QueryParser
from whoosh.analysis import StemmingAnalyzer
def priority_from_url(url):
priority = 1
# url is like this: https++www.gavilan.edu+news+Newsletters.php.txt
m = re.search(r'gavilan\.edu\+(.*)\.\w\w\w\w?$',url)
if m:
address = m.group(1)
parts = address.split('+')
if parts[0] in ['accreditation','curriculum','senate','research','old','committee','board','styleguide']:
priority += 20
if parts[0] in ['news','IT','HOM','administration']:
priority += 10
if parts[0] == 'admit' and parts[1] == 'schedule':
priority += 10
if 'accreditation' in parts:
priority += 50
if re.search(r'hhh\.gavilan\.edu',url):
priority += 100
priority *= len(parts)
#print(priority, parts)
else:
priority *= 50
#print(priority, url)
return priority
def test_priority():
ff = os.listdir('cache/crawl')
for f in ff:
priority_from_url(f)
def displayfile(f,aslist=0):
lines = codecs.open('cache/crawl/' + f,'r','utf-8').readlines()
lines = [L.strip() for L in lines]
lines = [L for L in lines if L and not re.search(r'^\|$',L)]
if aslist:
return lines
return "\n".join(lines)
def any_match(line, words):
# true if any of the words are in line
for w in words:
if re.search(w, line, re.IGNORECASE):
return True
return False
def find_match_line(filename, query):
q_words = query.split(" ")
lines = codecs.open('cache/crawl/' + filename,'r','utf-8').readlines()
lines = [L.strip() for L in lines]
lines = [L for L in lines if L and not re.search(r'^\|$',L)]
lines = [L for L in lines if any_match(L, q_words)]
return "\n".join(lines)
def search_index():
s = ''
    # the schema stored with the index itself (ix.schema) is used for query parsing below
ix = open_dir("cache/searchindex")
#with ix.reader() as reader:
#print(reader.doc_count()) # number of documents in the index
#print(reader.doc_frequency("content", "example")) # number of documents that contain the term "example" in the "content" field
#print(reader.field_length("content")) # total number of terms in the "content" field
#print(reader.term_info("content", "example")) # information about the term "example" in the "content" field
#print(reader.dump()) # overview of the entire index
while s != 'q':
s = input("search or 'q' to quit: ")
if s == 'q':
return
# Define the query parser for the index
with ix.searcher() as searcher:
            query_parser = QueryParser("content", schema=ix.schema)
# Parse the user's query
query = query_parser.parse(s)
print(query)
# Search the index for documents matching the query
results = searcher.search(query, sortedby="priority")
# Print the results
i = 1
for result in results:
print(i, result) # result["url"], result["content"])
print(find_match_line(result['url'], s))
print()
i += 1
def create_search_index():
# Define the schema for the index
    stem_ana = StemmingAnalyzer()
    schema = Schema(url=STORED, title=TEXT(stored=True), content=TEXT(analyzer=stem_ana), priority=fields.COLUMN(columns.NumericColumn("i")))
    # Create a new index in the directory "cache/searchindex" (creating the folder if needed)
    os.makedirs("cache/searchindex", exist_ok=True)
    ix = create_in("cache/searchindex", schema)
# Open an existing index
#ix = open_dir("cache/searchindex")
# Define the writer for the index
writer = ix.writer()
# Index some documents
files = os.listdir('cache/crawl')
files.sort()
for f in files:
m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$',f)
if m:
print(f)
writer.add_document(url=f, title=m.group(1), content=displayfile(f), priority=priority_from_url(f))
writer.commit()
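# One possible way to meet the "re-indexing / updating" goal from the header: whoosh can
# replace documents in place when the schema has a unique field. This is a minimal sketch,
# not wired into the menu below; it assumes a separate cache/searchindex2 directory and a
# url field declared as ID(stored=True, unique=True) so update_document() can match entries.
def update_search_index():
    stem_ana = StemmingAnalyzer()
    schema = Schema(url=ID(stored=True, unique=True), title=TEXT(stored=True),
                    content=TEXT(analyzer=stem_ana),
                    priority=fields.COLUMN(columns.NumericColumn("i")))
    if not os.path.exists("cache/searchindex2"):
        os.makedirs("cache/searchindex2")
        ix = create_in("cache/searchindex2", schema)
    else:
        ix = open_dir("cache/searchindex2")
    writer = ix.writer()
    files = os.listdir('cache/crawl')
    files.sort()
    for f in files:
        m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$', f)
        if m:
            # update_document() deletes any existing entry with the same unique url, then re-adds it
            writer.update_document(url=f, title=m.group(1), content=displayfile(f),
                                   priority=priority_from_url(f))
    writer.commit()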
from annoy import AnnoyIndex
import random
def test_embed():
model = SentenceTransformer('all-MiniLM-L6-v2')
sample = "What is this world coming to? What happens in the data and the research?"
embed = model.encode(sample)
print("\nSample sentence:", sample)
print("\nEmbedding:", embed)
print("\nEmbedding size:", len(embed))
def create_embeddings():
model = SentenceTransformer('all-MiniLM-L6-v2')
vecsize = 384 # sentence transformer embedding size
t = AnnoyIndex(vecsize, 'angular')
files = os.listdir('cache/crawl')
output = [] # ['index', 'file','sentence']
index = 0
save_embeds = []
files.sort()
for f in files:
print(f)
m = re.match(r'https?..www\.gavilan\.edu\+(.*)\.\w\w\w\w?\.txt$',f)
if m:
lines = displayfile(f,1)
embeddings = model.encode(lines)
print("\n-----", index, f)
for sentence, embedding in zip(lines, embeddings):
if len(sentence.split(' ')) > 5:
print(index, "Sentence:", sentence)
print(embedding[:8])
t.add_item(index, embedding)
output.append( [index,f,sentence] )
index += 1
                if index > 500:
                    break
        # stop iterating over further files too once the 500-sentence demo cap is reached
        if index > 500:
            break
t.build(30) # 30 trees
t.save('cache/sentences.ann')
pickle.dump( output, open( "cache/embedding_index.p", "wb" ) )
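# The header notes call for paragraph-sized chunks rather than single lines. This is one
# possible chunker, a sketch only: it packs the cleaned lines of a crawled page into chunks
# of roughly `max_words` words, which could be passed to model.encode() in create_embeddings()
# in place of the per-line embeddings above.
def paragraph_chunks(f, max_words=120):
    chunks, current, count = [], [], 0
    for line in displayfile(f, 1):
        words = len(line.split())
        if current and count + words > max_words:
            chunks.append(' '.join(current))
            current, count = [], 0
        current.append(line)
        count += words
    if current:
        chunks.append(' '.join(current))
    return chunks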
def search_embeddings():
f = 384 # sentence transformer embedding size
n = 10 # how many results
u = AnnoyIndex(f, 'angular')
u.load('cache/sentences.ann') # super fast, will just mmap the file
print(u.get_n_items(), "items in index")
model = SentenceTransformer('all-MiniLM-L6-v2')
search_index = pickle.load( open( "cache/embedding_index.p", "rb" ) )
print(search_index)
s = ''
while s != 'q':
s = input("search or 'q' to quit: ")
if s == 'q':
return
query_embedding = model.encode(s)
results = u.get_nns_by_vector(query_embedding, n)
        # Print the top n results
for i, r in enumerate(results):
print(f'Top {i+1}: {r}, {search_index[r]}') #{file} - {sentence} - (Score: {score})')
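# Final step from the header notes: feed a handful of retrieved paragraphs plus the user's
# question into GPT and return a text answer. A minimal sketch, not a finished feature: it
# assumes the `openai` package (v1 client) is installed, OPENAI_API_KEY is set in the
# environment, and the annoy index from create_embeddings() exists; the model name is a
# placeholder.
def answer_with_gpt(question, n=5, model_name='gpt-3.5-turbo'):
    from openai import OpenAI
    u = AnnoyIndex(384, 'angular')
    u.load('cache/sentences.ann')
    embed_model = SentenceTransformer('all-MiniLM-L6-v2')
    lookup = pickle.load(open("cache/embedding_index.p", "rb"))
    # retrieve the n sentences closest to the question in embedding space
    hits = u.get_nns_by_vector(embed_model.encode(question), n)
    context = "\n".join(lookup[h][2] for h in hits)
    prompt = ("Answer the question using only the context below.\n\n"
              "Context:\n" + context + "\n\nQuestion: " + question)
    client = OpenAI()
    resp = client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": prompt}])
    return resp.choices[0].message.content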
if __name__ == "__main__":
print ('')
options = { 1: ['demo vector search', demo_vector_search],
8: ['crawl',crawl],
9: ['clean text index', txt_clean_index],
10: ['make web dir struct', manual_index],
11: ['create search embeddings', create_embeddings],
12: ['create search index', create_search_index],
13: ['do an index search', search_index],
14: ['do a vector search', search_embeddings],
15: ['test priority', test_priority],
16: ['test embed', test_embed]
}
if len(sys.argv) > 1 and re.search(r'^\d+',sys.argv[1]):
resp = int(sys.argv[1])
print("\n\nPerforming: %s\n\n" % options[resp][0])
else:
print ('')
for key in options:
print(str(key) + '.\t' + options[key][0])
print('')
resp = input('Choose: ')
# Call the function in the options dict
options[ int(resp)][1]()