Files
CryoLens/embed-001.py
2026-01-10 03:54:14 -05:00

267 lines
9.9 KiB
Python
Executable File

#!/usr/bin/env python3
import os
from pathlib import Path
from tqdm import tqdm
import requests
import json
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import hashlib
from typing import List, Dict, Optional
import time
class AnythingLLMDocumentProcessor:
def __init__(
self,
anythingllm_url: str = "http://10.100.50.16:30239",
anythingllm_api_key: str = "TN4TAY5-BB6M8AP-KF5PWWF-E3TFJ1E",
workspace_slug: str = "dialogue",
qdrant_url: str = "http://10.100.50.16:6333",
collection_name: str = "dialogue"
):
"""
Initialize the document processor for AnythingLLM.
Args:
anythingllm_url: URL of your local AnythingLLM instance
anythingllm_api_key: API key for AnythingLLM
workspace_slug: The slug of your workspace (e.g., 'my-workspace')
qdrant_url: URL of your local Qdrant instance
collection_name: Name of the Qdrant collection
"""
self.anythingllm_url = anythingllm_url.rstrip('/')
self.anythingllm_api_key = anythingllm_api_key
self.workspace_slug = workspace_slug
self.qdrant_client = QdrantClient(url=qdrant_url)
self.collection_name = collection_name
self.headers = {
"Accept": "application/json",
}
if self.anythingllm_api_key:
self.headers["Authorization"] = f"Bearer {self.anythingllm_api_key}"
# Verify workspace exists
self._verify_workspace()
def _verify_workspace(self):
"""Verify that the workspace exists."""
try:
url = f"{self.anythingllm_url}/api/v1/workspace/{self.workspace_slug}"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
workspace_data = response.json()
print(f"✓ Connected to workspace: {workspace_data.get('workspace', {}).get('name', self.workspace_slug)}")
else:
print(f"⚠ Warning: Could not verify workspace '{self.workspace_slug}'")
print(f" Status code: {response.status_code}")
except Exception as e:
print(f"⚠ Warning: Could not verify workspace: {e}")
def list_workspaces(self):
"""List all available workspaces."""
try:
url = f"{self.anythingllm_url}/api/v1/workspaces"
response = requests.get(url, headers=self.headers)
response.raise_for_status()
workspaces = response.json().get('workspaces', [])
if workspaces:
print("\nAvailable workspaces:")
print("=" * 60)
for ws in workspaces:
print(f" Name: {ws.get('name')}")
print(f" Slug: {ws.get('slug')}")
print("-" * 60)
else:
print("No workspaces found")
return workspaces
except Exception as e:
print(f"Error listing workspaces: {e}")
return []
def _upload_file_to_anythingllm(self, file_path: Path) -> Dict:
"""Upload a file to AnythingLLM for processing."""
url = f"{self.anythingllm_url}/api/v1/document/upload"
try:
with open(file_path, 'rb') as f:
files = {
'file': (file_path.name, f, self._get_mime_type(file_path))
}
response = requests.post(
url,
headers={"Authorization": f"Bearer {self.anythingllm_api_key}"} if self.anythingllm_api_key else {},
files=files
)
response.raise_for_status()
return response.json()
except Exception as e:
raise Exception(f"Failed to upload file: {e}")
def _embed_document_in_workspace(self, document_location: str) -> Dict:
"""Add an uploaded document to the workspace for embedding."""
url = f"{self.anythingllm_url}/api/v1/workspace/{self.workspace_slug}/update-embeddings"
payload = {
"adds": [document_location]
}
try:
response = requests.post(
url,
headers=self.headers,
json=payload
)
response.raise_for_status()
return response.json()
except Exception as e:
raise Exception(f"Failed to embed document in workspace: {e}")
def _get_workspace_embeddings(self) -> List[Dict]:
"""Get all embeddings from the workspace (via Qdrant)."""
try:
# Get all points from Qdrant collection associated with this workspace
points = self.qdrant_client.scroll(
collection_name=self.collection_name,
limit=1000, # Adjust as needed
with_payload=True,
with_vectors=True
)
return points[0] if points else []
except Exception as e:
print(f"Error retrieving embeddings from Qdrant: {e}")
return []
def _get_mime_type(self, file_path: Path) -> str:
"""Get MIME type based on file extension."""
mime_types = {
'.txt': 'text/plain',
'.pdf': 'application/pdf',
'.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'.doc': 'application/msword',
'.md': 'text/markdown',
'.csv': 'text/csv',
'.json': 'application/json',
'.html': 'text/html',
}
return mime_types.get(file_path.suffix.lower(), 'application/octet-stream')
def process_folder(self, folder_path: str, file_extensions: List[str] = None):
"""
Process all files in a folder and upload to AnythingLLM workspace.
Args:
folder_path: Path to folder containing files
file_extensions: List of file extensions to process (e.g., ['.txt', '.md'])
If None, process all files
"""
folder = Path(folder_path)
if not folder.exists():
raise ValueError(f"Folder not found: {folder_path}")
# Get list of files
if file_extensions:
files = [f for f in folder.rglob('*') if f.is_file() and f.suffix in file_extensions]
else:
files = [f for f in folder.rglob('*') if f.is_file()]
if not files:
print("No files found to process")
return
print(f"\nFound {len(files)} file(s) to process")
print(f"Target workspace: {self.workspace_slug}\n")
# Process each file with progress bar
successful = 0
failed = 0
for file_path in tqdm(files, desc="Processing files", unit="file"):
try:
self._process_single_file(file_path)
successful += 1
except Exception as e:
failed += 1
tqdm.write(f"✗ Failed to process {file_path.name}: {str(e)}")
print(f"\n{'='*60}")
print(f"Processing complete!")
print(f"✓ Successful: {successful}/{len(files)}")
print(f"✗ Failed: {failed}/{len(files)}")
print(f"{'='*60}\n")
# Show how to access in UI
print(f"You can now access these documents in AnythingLLM UI:")
print(f"{self.anythingllm_url}/workspace/{self.workspace_slug}")
def _process_single_file(self, file_path: Path):
"""Process a single file: upload and embed in workspace."""
steps = ['Uploading', 'Embedding']
with tqdm(total=len(steps), desc=f" {file_path.name}",
leave=False, unit="step", position=1) as pbar:
# Step 1: Upload file to AnythingLLM
pbar.set_description(f" {file_path.name} - Uploading")
upload_result = self._upload_file_to_anythingllm(file_path)
# Extract document location from upload result
documents = upload_result.get('documents', [])
if not documents:
raise Exception("No document location returned from upload")
document_location = documents[0].get('location')
if not document_location:
raise Exception("Document location not found in upload response")
pbar.update(1)
# Step 2: Embed document in workspace
pbar.set_description(f" {file_path.name} - Embedding")
embed_result = self._embed_document_in_workspace(document_location)
pbar.update(1)
# Small delay to avoid rate limiting
time.sleep(0.1)
def main():
"""Example usage"""
# Configuration
FOLDER_PATH = "./documents" # Change to your folder path
ANYTHINGLLM_URL = "http://10.100.50.16:30239" # Your AnythingLLM URL
ANYTHINGLLM_API_KEY = "TN4TAY5-BB6M8AP-KF5PWWF-E3TFJ1E" # Get from AnythingLLM Settings > API Keys
WORKSPACE_SLUG = "dialogue" # Your workspace slug
QDRANT_URL = "http://10.100.50.16:6333" # Your Qdrant URL
COLLECTION_NAME = "dialogue" # Default collection name used by AnythingLLM
# Specify file types to process
FILE_EXTENSIONS = ['.txt', '.md', '.pdf', '.docx', '.csv', '.json']
# Initialize processor
processor = AnythingLLMDocumentProcessor(
anythingllm_url=ANYTHINGLLM_URL,
anythingllm_api_key=ANYTHINGLLM_API_KEY,
workspace_slug=WORKSPACE_SLUG,
qdrant_url=QDRANT_URL,
collection_name=COLLECTION_NAME
)
# Optional: List available workspaces
processor.list_workspaces()
# Process folder
processor.process_folder(FOLDER_PATH, file_extensions=FILE_EXTENSIONS)
if __name__ == "__main__":
main()