import json
import os
import re
import time

import docx
import PyPDF2
import yaml
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

INDEX_FILE = "kb_index.json"
WORD_THRESHOLD = 500    # Only generate summary/tags if text > 500 words
KB_DIR = "kb"           # Directory where your Markdown notes are stored
UPLOAD_DIR = "uploads"  # Staging area for uploads (not watched below)

os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(KB_DIR, exist_ok=True)


def load_index():
    """Load the JSON index from disk, or start with an empty one."""
    if os.path.exists(INDEX_FILE):
        with open(INDEX_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    return {}


def save_index(index):
    """Write the index back to disk as pretty-printed JSON."""
    with open(INDEX_FILE, "w", encoding="utf-8") as f:
        json.dump(index, f, indent=2)
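
# On disk the index maps note IDs to metadata dicts, e.g. (illustrative):
#   {
#     "meeting-notes.md":   {"id": "meeting-notes.md", "tags": [...], "body": "..."},
#     "report.pdf_chunk_0": {"id": "report.pdf_chunk_0", "source": "kb/report.pdf",
#                            "chunk_index": 0, "body": "..."}
#   }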


# --- Helper Functions for File Extraction ---

def extract_text_from_md(file_path):
    """Extract YAML front matter (if any) and body text from a Markdown file."""
    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()
    metadata = {}
    body = content
    if content.startswith("---"):
        # Look for the closing '---' delimiter of the front-matter block.
        match = re.search(r"^---\s*\n(.*?)\n---\s*\n", content, re.DOTALL)
        if match:
            frontmatter_str = match.group(1)
            try:
                metadata = yaml.safe_load(frontmatter_str) or {}
            except yaml.YAMLError as e:
                print(f"Error parsing YAML in {file_path}: {e}")
            body = content[match.end():].strip()
    return metadata, body
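
# The front matter this expects looks like (illustrative):
#
#   ---
#   id: my-note
#   tags: [python, indexing]
#   ---
#   Body of the note starts here...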


def extract_text_from_pdf(file_path):
    """Extract text from a PDF file using PyPDF2."""
    text = ""
    try:
        with open(file_path, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            for page in reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"
    except Exception as e:
        print(f"Error reading PDF {file_path}: {e}")
    return text
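
# Note: PyPDF2's extract_text() yields little or nothing for scanned
# (image-only) PDFs; those would need an OCR pass first.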


def extract_text_from_docx(file_path):
    """Extract text from a DOCX file using python-docx."""
    text = ""
    try:
        doc = docx.Document(file_path)
        for para in doc.paragraphs:
            text += para.text + "\n"
    except Exception as e:
        print(f"Error reading DOCX {file_path}: {e}")
    return text
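
# Note: python-docx opens only .docx files. A legacy .doc file, although
# routed here by process_file() below, will land in the except branch.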


def chunk_text(text, max_words=WORD_THRESHOLD):
    """
    Break text into chunks of up to max_words each.

    Splitting on raw word count keeps chunk sizes bounded, but it can cut
    mid-sentence; it trades coherence for simplicity.
    """
    words = text.split()
    chunks = []
    for i in range(0, len(words), max_words):
        chunks.append(" ".join(words[i:i + max_words]))
    return chunks
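
# Quick illustration of the chunking behavior:
#   >>> chunk_text("one two three four five", max_words=2)
#   ['one two', 'three four', 'five']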


# --- LLM Integration ---

def llm_summarize_and_tag(text):
    """
    Stub for the LLM call that produces a summary and tags.
    Replace this with an actual API call.

    Returns a tuple: (summary, [tags])
    """
    # For demonstration, take the first 100 characters as a "summary"
    # and return a couple of dummy tags.
    summary = f"Summary: {text[:100]}..."
    tags = ["auto-tag1", "auto-tag2"]
    return summary, tags
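
# A sketch of how a real backend might slot in, using the OpenAI Python
# client purely as an example (the client, model name, and prompt are
# assumptions -- swap in whatever LLM API you actually use):
#
#   from openai import OpenAI
#
#   def llm_summarize_and_tag(text):
#       client = OpenAI()  # reads OPENAI_API_KEY from the environment
#       resp = client.chat.completions.create(
#           model="gpt-4o-mini",
#           messages=[
#               {"role": "system",
#                "content": "Summarize the text in two sentences, then list "
#                           "3-5 topical tags, one per line, prefixed with '- '."},
#               {"role": "user", "content": text},
#           ],
#       )
#       lines = resp.choices[0].message.content.strip().splitlines()
#       summary = lines[0]
#       tags = [ln.lstrip("- ").strip() for ln in lines[1:] if ln.strip()]
#       return summary, tags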


# --- Processing Function ---

def process_file(file_path, index):
    ext = os.path.splitext(file_path)[1].lower()

    # Process Markdown notes.
    if ext == ".md":
        metadata, body = extract_text_from_md(file_path)
        # If the text is long and a summary is missing, generate one with tags.
        if len(body.split()) > WORD_THRESHOLD and "summary" not in metadata:
            summary, auto_tags = llm_summarize_and_tag(body)
            metadata["summary"] = summary
            # Merge with any existing tags.
            if "tags" in metadata:
                metadata["tags"] = list(set(metadata["tags"] + auto_tags))
            else:
                metadata["tags"] = auto_tags
        # Ensure an ID exists (use the filename as a fallback).
        if "id" not in metadata:
            metadata["id"] = os.path.basename(file_path)
        metadata["body"] = body
        index[metadata["id"]] = metadata
        print(f"Indexed Markdown note: {metadata['id']}")

    # Process PDFs and Word documents: extract text, then chunk and index it.
    elif ext in (".pdf", ".doc", ".docx"):
        if ext == ".pdf":
            text = extract_text_from_pdf(file_path)
        else:
            text = extract_text_from_docx(file_path)
        chunks = chunk_text(text, max_words=WORD_THRESHOLD)
        for i, chunk in enumerate(chunks):
            note_id = f"{os.path.basename(file_path)}_chunk_{i}"
            note_data = {
                "id": note_id,
                "source": file_path,
                "chunk_index": i,
                "body": chunk,
            }
            # chunk_text() caps chunks at exactly WORD_THRESHOLD words, so
            # test with ">=": full-size chunks get a summary and tags, while
            # a short final chunk is stored as-is.
            if len(chunk.split()) >= WORD_THRESHOLD:
                summary, auto_tags = llm_summarize_and_tag(chunk)
                note_data["summary"] = summary
                note_data["tags"] = auto_tags
            index[note_id] = note_data
            print(f"Indexed {ext[1:].upper()} chunk: {note_id}")

    else:
        print(f"Unsupported file type: {file_path}")


# --- Watchdog Event Handler ---

class KBEventHandler(FileSystemEventHandler):
    def __init__(self, index):
        self.index = index

    def process(self, file_path):
        # Only process supported file types.
        if file_path.lower().endswith((".md", ".pdf", ".doc", ".docx")):
            process_file(file_path, self.index)
            save_index(self.index)

    def on_created(self, event):
        if not event.is_directory:
            self.process(event.src_path)

    def on_modified(self, event):
        if not event.is_directory:
            self.process(event.src_path)
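
# Note: many editors fire several on_modified events per save, and
# re-indexing a file that shrank leaves its old chunk entries behind.
# A debounce timer and a "purge old entries for this source" pass would
# harden this; both are omitted to keep the watcher minimal.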


def main():
    index = load_index()
    event_handler = KBEventHandler(index)
    observer = Observer()
    observer.schedule(event_handler, path=KB_DIR, recursive=False)
    observer.start()
    print("Monitoring KB directory for changes...")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    main()
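
# To run (package names inferred from the imports above):
#   pip install watchdog PyYAML python-docx PyPDF2
#   python kb_watcher.py    # script name is illustrative
# Then drop .md, .pdf, or .docx files into kb/ and watch kb_index.json update.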