#!/usr/bin/env python3
import json
import time
from pathlib import Path
from typing import Dict, List, Optional

import requests
from qdrant_client import QdrantClient
from tqdm import tqdm

class AnythingLLMDocumentProcessor:
    def __init__(
        self,
        anythingllm_url: str = "http://10.100.50.16:30239",
        anythingllm_api_key: str = "TN4TAY5-BB6M8AP-KF5PWWF-E3TFJ1E",
        workspace_slug: str = "dialogue",
        qdrant_url: str = "http://10.100.50.16:6333",
        collection_name: str = "dialogue"
    ):
        """
        Initialize the document processor for AnythingLLM.

        Args:
            anythingllm_url: URL of your local AnythingLLM instance
            anythingllm_api_key: API key for AnythingLLM
            workspace_slug: The slug of your workspace (e.g., 'dialogue')
            qdrant_url: URL of your local Qdrant instance
            collection_name: Name of the Qdrant collection
        """
        self.anythingllm_url = anythingllm_url.rstrip('/')
        self.anythingllm_api_key = anythingllm_api_key
        self.workspace_slug = workspace_slug
        self.qdrant_client = QdrantClient(url=qdrant_url)
        self.collection_name = collection_name
        self.headers = {
            "Accept": "application/json",
        }
        if self.anythingllm_api_key:
            self.headers["Authorization"] = f"Bearer {self.anythingllm_api_key}"
        # Verify workspace exists
        self._verify_workspace()
    def _verify_workspace(self):
        """Verify that the workspace exists and get existing documents."""
        try:
            url = f"{self.anythingllm_url}/api/v1/workspace/{self.workspace_slug}"
            response = requests.get(url, headers=self.headers)
            if response.status_code == 200:
                workspace_data = response.json()
                print(f"✓ Connected to workspace: {workspace_data.get('workspace', {}).get('name', self.workspace_slug)}")
                # Get existing documents in workspace
                self.existing_docs = self._get_workspace_documents()
                if self.existing_docs:
                    print(f" Found {len(self.existing_docs)} existing document(s) in workspace")
            else:
                print(f"⚠ Warning: Could not verify workspace '{self.workspace_slug}'")
                print(f" Status code: {response.status_code}")
                self.existing_docs = set()
        except Exception as e:
            print(f"⚠ Warning: Could not verify workspace: {e}")
            self.existing_docs = set()
    def _get_workspace_documents(self) -> set:
        """Get list of documents already in the workspace."""
        try:
            url = f"{self.anythingllm_url}/api/v1/workspace/{self.workspace_slug}"
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            workspace_data = response.json()
            documents = workspace_data.get('workspace', {}).get('documents', [])
            # Debug: Print document structure
            if documents:
                print("\n[DEBUG] Sample document structure:")
                print(json.dumps(documents[0], indent=2))
            # Extract filenames from docpath - try multiple possible fields
            doc_names = set()
            for doc in documents:
                # Try different possible field names
                docpath = doc.get('docpath', '') or doc.get('name', '') or doc.get('filename', '')
                if docpath:
                    # Extract just the filename from the path
                    filename = Path(docpath).name
                    doc_names.add(filename)
                    print(f"[DEBUG] Found existing: {filename}")
            return doc_names
        except Exception as e:
            print(f"Warning: Could not retrieve workspace documents: {e}")
            return set()
    def _is_document_embedded(self, file_name: str) -> bool:
        """Check if a document with this name is already embedded."""
        is_embedded = file_name in self.existing_docs
        if is_embedded:
            print(f"[DEBUG] Skipping '{file_name}' - already embedded")
        return is_embedded
    def list_workspaces(self):
        """List all available workspaces."""
        try:
            url = f"{self.anythingllm_url}/api/v1/workspaces"
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            workspaces = response.json().get('workspaces', [])
            if workspaces:
                print("\nAvailable workspaces:")
                print("=" * 60)
                for ws in workspaces:
                    print(f" Name: {ws.get('name')}")
                    print(f" Slug: {ws.get('slug')}")
                    print("-" * 60)
            else:
                print("No workspaces found")
            return workspaces
        except Exception as e:
            print(f"Error listing workspaces: {e}")
            return []
    def _upload_file_to_anythingllm(self, file_path: Path) -> Dict:
        """Upload a file to AnythingLLM for processing."""
        url = f"{self.anythingllm_url}/api/v1/document/upload"
        try:
            with open(file_path, 'rb') as f:
                files = {
                    'file': (file_path.name, f, self._get_mime_type(file_path))
                }
                response = requests.post(
                    url,
                    headers={"Authorization": f"Bearer {self.anythingllm_api_key}"} if self.anythingllm_api_key else {},
                    files=files
                )
                response.raise_for_status()
                return response.json()
        except Exception as e:
            raise Exception(f"Failed to upload file: {e}")
    def _embed_document_in_workspace(self, document_location: str) -> Dict:
        """Add an uploaded document to the workspace for embedding."""
        url = f"{self.anythingllm_url}/api/v1/workspace/{self.workspace_slug}/update-embeddings"
        payload = {
            "adds": [document_location]
        }
        try:
            response = requests.post(
                url,
                headers=self.headers,
                json=payload
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            raise Exception(f"Failed to embed document in workspace: {e}")
    def _get_workspace_embeddings(self) -> List[Dict]:
        """Get all embeddings from the workspace (via Qdrant)."""
        try:
            # Get points from the Qdrant collection associated with this workspace.
            # scroll() returns a (records, next_page_offset) tuple.
            points, _next_offset = self.qdrant_client.scroll(
                collection_name=self.collection_name,
                limit=1000,  # Adjust as needed
                with_payload=True,
                with_vectors=True
            )
            return points
        except Exception as e:
            print(f"Error retrieving embeddings from Qdrant: {e}")
            return []
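    # A minimal pagination sketch (helper name is illustrative, not part of the
    # original script): Qdrant's scroll() hands back (records, next_page_offset),
    # so collections larger than a single limit can be walked page by page by
    # feeding the offset back in until it comes back as None.
    def _iter_all_points(self, page_size: int = 256):
        """Yield every point in the collection by following scroll offsets."""
        offset = None
        while True:
            records, offset = self.qdrant_client.scroll(
                collection_name=self.collection_name,
                limit=page_size,
                offset=offset,
                with_payload=True,
            )
            for record in records:
                yield record
            if offset is None:
                break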
    def _get_mime_type(self, file_path: Path) -> str:
        """Get MIME type based on file extension."""
        mime_types = {
            '.txt': 'text/plain',
            '.pdf': 'application/pdf',
            '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            '.doc': 'application/msword',
            '.md': 'text/markdown',
            '.csv': 'text/csv',
            '.json': 'application/json',
            '.html': 'text/html',
        }
        return mime_types.get(file_path.suffix.lower(), 'application/octet-stream')
    def process_folder(self, folder_path: str, file_extensions: Optional[List[str]] = None, skip_existing: bool = True):
        """
        Process all files in a folder and upload to AnythingLLM workspace.

        Args:
            folder_path: Path to folder containing files
            file_extensions: List of file extensions to process (e.g., ['.txt', '.md'])
                If None, process all files
            skip_existing: If True, skip files that are already embedded in the workspace
        """
        folder = Path(folder_path)
        if not folder.exists():
            raise ValueError(f"Folder not found: {folder_path}")
        # Get list of files
        if file_extensions:
            files = [f for f in folder.rglob('*') if f.is_file() and f.suffix in file_extensions]
        else:
            files = [f for f in folder.rglob('*') if f.is_file()]
        if not files:
            print("No files found to process")
            return
        # Filter out already embedded files if requested
        if skip_existing:
            original_count = len(files)
            files = [f for f in files if not self._is_document_embedded(f.name)]
            skipped = original_count - len(files)
            if skipped > 0:
                print(f"⊘ Skipping {skipped} already embedded file(s)")
            if not files:
                print("\n✓ All files are already embedded in the workspace!")
                return
        print(f"\nFound {len(files)} file(s) to process")
        print(f"Target workspace: {self.workspace_slug}\n")
        # Process each file with progress bar
        successful = 0
        failed = 0
        for file_path in tqdm(files, desc="Processing files", unit="file"):
            try:
                self._process_single_file(file_path)
                successful += 1
            except Exception as e:
                failed += 1
                tqdm.write(f"✗ Failed to process {file_path.name}: {str(e)}")
        print(f"\n{'='*60}")
        print("Processing complete!")
        print(f"✓ Successful: {successful}/{len(files)}")
        print(f"✗ Failed: {failed}/{len(files)}")
        print(f"{'='*60}\n")
        # Show how to access in UI
        print("You can now access these documents in the AnythingLLM UI:")
        print(f"{self.anythingllm_url}/workspace/{self.workspace_slug}")
    def _process_single_file(self, file_path: Path):
        """Process a single file: upload and embed in workspace."""
        steps = ['Uploading', 'Embedding']
        with tqdm(total=len(steps), desc=f" {file_path.name}",
                  leave=False, unit="step", position=1) as pbar:
            # Step 1: Upload file to AnythingLLM
            pbar.set_description(f" {file_path.name} - Uploading")
            upload_result = self._upload_file_to_anythingllm(file_path)
            # Extract document location from upload result
            documents = upload_result.get('documents', [])
            if not documents:
                raise Exception("No document location returned from upload")
            document_location = documents[0].get('location')
            if not document_location:
                raise Exception("Document location not found in upload response")
            pbar.update(1)
            # Step 2: Embed document in workspace
            pbar.set_description(f" {file_path.name} - Embedding")
            self._embed_document_in_workspace(document_location)
            pbar.update(1)
        # Small delay to avoid rate limiting
        time.sleep(0.1)
def main():
    """Example usage"""
    # Configuration - defaults are now set in the class
    FOLDER_PATH = "./documents"  # Change to your folder path
    # Specify file types to process
    FILE_EXTENSIONS = ['.txt', '.md', '.pdf', '.docx', '.csv', '.json']
    # Initialize processor with default values (already configured in __init__)
    processor = AnythingLLMDocumentProcessor()
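    # Example override (the values below are placeholders, not taken from this
    # script): point the processor at a different AnythingLLM/Qdrant instance or
    # workspace by passing the constructor arguments explicitly.
    # processor = AnythingLLMDocumentProcessor(
    #     anythingllm_url="http://localhost:3001",
    #     anythingllm_api_key="YOUR-API-KEY",
    #     workspace_slug="my-workspace",
    #     qdrant_url="http://localhost:6333",
    #     collection_name="my-workspace",
    # )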
    # Optional: List available workspaces
    # processor.list_workspaces()
    # Process folder
    processor.process_folder(FOLDER_PATH, file_extensions=FILE_EXTENSIONS)
    # Or to force re-embedding of all files (including existing ones):
    # processor.process_folder(FOLDER_PATH, file_extensions=FILE_EXTENSIONS, skip_existing=False)


if __name__ == "__main__":
    main()