services/vector_service.py
Status
Accepted | YYYY-MM-DD (placeholder — record the actual decision date)

Context
The current situation requires a decision because:
- Requirement 1
- Constraint 2
- Need 3
services/vector_service.py
import logging
from typing import Any, Dict, List, Optional
from uuid import UUID

import numpy as np
class VectorService:
    """Handles vector operations and pgvector cosine-similarity search."""

    # Class-level logger: the original referenced a module-level ``logger``
    # that was never defined, which would raise NameError on the error path.
    _logger = logging.getLogger(__name__)

    def __init__(self, db_connection, embedding_model):
        self.db = db_connection
        self.embedding_model = embedding_model

    async def create_embedding(self, text: str) -> np.ndarray:
        """Create an embedding for *text* using the embedding model.

        Raises:
            Exception: re-raises whatever the embedding model raised,
                after logging it.
        """
        try:
            embedding = await self.embedding_model.embed_text(text)
            return np.array(embedding)
        except Exception as exc:
            # Lazy %-formatting: message is only built if the record is emitted.
            self._logger.error("Embedding creation failed: %s", exc)
            raise

    async def similarity_search(
        self,
        query_text: str,
        threshold: float = 0.7,
        limit: int = 10,
        doc_uuid: Optional[UUID] = None,
    ) -> List[Dict[str, Any]]:
        """Perform similarity search using vector embeddings.

        Args:
            query_text: text to embed and compare against stored chunks.
            threshold: minimum cosine similarity (1 - pgvector ``<=>``
                distance) for a chunk to be returned.
            limit: maximum number of rows.
            doc_uuid: optional, backward-compatible filter restricting the
                search to a single document.

        Returns:
            Rows as dicts, most similar first.
        """
        query_embedding = await self.create_embedding(query_text)
        # The only interpolated fragment is a fixed literal chosen here —
        # never user input — so this f-string stays injection-safe.
        doc_filter = "AND c.doc_uuid = $4" if doc_uuid is not None else ""
        params = [query_embedding.tolist(), threshold, limit]
        if doc_uuid is not None:
            params.append(doc_uuid)
        async with self.db.transaction() as conn:
            results = await conn.fetch(f"""
                SELECT
                    c.chunk_uuid,
                    c.content,
                    c.doc_uuid,
                    c.sequence_num,
                    1 - (c.embedding <=> $1) as similarity
                FROM chunks c
                WHERE 1 - (c.embedding <=> $1) > $2
                {doc_filter}
                ORDER BY similarity DESC
                LIMIT $3
            """, *params)
        return [dict(r) for r in results]
services/graph_service.py
class GraphService:
    """Manages graph relationships between chunks."""

    def __init__(self, db_connection):
        self.db = db_connection

    async def create_relationships(
        self,
        source_uuid: UUID,
        relationships: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Create multiple relationships for a chunk.

        Each entry must provide ``target_uuid``, ``type`` and ``weight``;
        ``metadata`` defaults to an empty dict.

        Returns:
            The inserted rows, one dict per relationship.
        """
        async with self.db.transaction() as conn:
            results = []
            for rel in relationships:
                # NOTE(review): ``metadata`` is passed as a plain dict;
                # asyncpg needs a registered JSON codec (or json.dumps)
                # for a jsonb column — confirm the connection setup.
                row = await conn.fetchrow("""
                    INSERT INTO chunk_relationships (
                        source_chunk_uuid,
                        target_chunk_uuid,
                        relationship_type,
                        weight,
                        metadata
                    ) VALUES ($1, $2, $3, $4, $5)
                    RETURNING *
                """, source_uuid, rel['target_uuid'],
                    rel['type'], rel['weight'],
                    rel.get('metadata', {}))
                results.append(dict(row))
            return results

    async def find_related_chunks(
        self,
        chunk_uuid: UUID,
        max_depth: int = 2,
        min_weight: float = 0.5,
    ) -> List[Dict[str, Any]]:
        """Find related chunks by graph traversal up to *max_depth* hops.

        Path weight is the product of edge weights; edges below
        *min_weight* are pruned at every level.
        """
        async with self.db.transaction() as conn:
            rows = await conn.fetch("""
                WITH RECURSIVE chunk_tree AS (
                    -- Base case: direct relationships
                    SELECT
                        cr.source_chunk_uuid,
                        cr.target_chunk_uuid,
                        cr.relationship_type,
                        cr.weight,
                        1 as depth
                    FROM chunk_relationships cr
                    WHERE cr.source_chunk_uuid = $1
                    AND cr.weight >= $3
                    UNION
                    -- Recursive case: follow relationships
                    SELECT
                        cr.source_chunk_uuid,
                        cr.target_chunk_uuid,
                        cr.relationship_type,
                        cr.weight * ct.weight as weight,
                        ct.depth + 1
                    FROM chunk_relationships cr
                    JOIN chunk_tree ct
                    ON cr.source_chunk_uuid = ct.target_chunk_uuid
                    WHERE ct.depth < $2
                    AND cr.weight >= $3
                )
                SELECT
                    c.*,
                    ct.depth,
                    ct.weight as relationship_weight
                FROM chunk_tree ct
                JOIN chunks c ON ct.target_chunk_uuid = c.chunk_uuid
                ORDER BY ct.depth, ct.weight DESC
            """, chunk_uuid, max_depth, min_weight)
            # Convert driver Record objects to dicts so the return value
            # matches the declared List[Dict[str, Any]] annotation.
            return [dict(r) for r in rows]
api/routes/document.py
from fastapi import APIRouter, UploadFile, File, BackgroundTasks from typing import List, Optional
# Shared FastAPI router; the document endpoints below attach to it.
router = APIRouter()
class DocumentProcessor:
    """Orchestrates ingestion: create doc, chunk, embed, link chunks."""

    # Original read ``def init`` — the __init__ dunder underscores were
    # lost in formatting, leaving the class unconstructible. Restored.
    def __init__(
        self,
        document_service,
        vector_service,
        graph_service,
    ):
        self.document_service = document_service
        self.vector_service = vector_service
        self.graph_service = graph_service

    async def process_document(
        self,
        content: str,
        filename: str,
        chunk_size: int = 1000,
        overlap: int = 100,
    ) -> Dict[str, Any]:
        """Process one document end to end.

        Creates the document record, splits it into overlapping chunks,
        embeds each chunk, and links consecutive chunks with a
        'sequential' relationship.

        Returns:
            Summary dict with doc_uuid, total_chunks and status.
        """
        # Create document and chunks
        doc = await self.document_service.create_document(
            content=content,
            filename=filename,
        )
        chunks = await self.document_service.create_chunks(
            doc_uuid=doc['doc_uuid'],
            content=content,
            chunk_size=chunk_size,
            overlap=overlap,
        )

        # Embed sequentially; could be gathered concurrently if the
        # embedding backend supports parallel calls.
        for chunk in chunks:
            embedding = await self.vector_service.create_embedding(
                chunk['content']
            )
            await self.document_service.update_chunk_embedding(
                chunk['chunk_uuid'],
                embedding,
            )

        # Link each chunk to its successor so graph traversal can follow
        # document order.
        for current, successor in zip(chunks, chunks[1:]):
            await self.graph_service.create_relationships(
                current['chunk_uuid'],
                [{
                    'target_uuid': successor['chunk_uuid'],
                    'type': 'sequential',
                    'weight': 1.0,
                    'metadata': {'overlap_size': overlap},
                }],
            )

        return {
            'doc_uuid': doc['doc_uuid'],
            'total_chunks': len(chunks),
            'status': 'processed',
        }
@router.post("/documents/")
async def upload_document(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    chunk_size: Optional[int] = 1000,
    overlap: Optional[int] = 100,
):
    """Accept an uploaded file and schedule its processing in the background."""
    # Read the upload and decode bytes payloads to text.
    raw = await file.read()
    content = raw.decode('utf-8') if isinstance(raw, bytes) else raw

    # NOTE(review): document_service / vector_service / graph_service are
    # assumed to be module-level singletons — confirm they are defined
    # (they are not visible in this module).
    processor = DocumentProcessor(
        document_service=document_service,
        vector_service=vector_service,
        graph_service=graph_service,
    )

    # Processing runs after the response has been sent.
    background_tasks.add_task(
        processor.process_document,
        content=content,
        filename=file.filename,
        chunk_size=chunk_size,
        overlap=overlap,
    )

    return {
        "status": "processing",
        "filename": file.filename,
    }
@router.get("/documents/{doc_uuid}/status")
async def get_document_status(doc_uuid: UUID):
    """Report processing progress for one document.

    ``processed_chunks`` counts chunks whose embedding is non-NULL
    (COUNT skips NULLs), so it lags ``total_chunks`` until done.
    """
    # NOTE(review): ``db_connection`` is assumed to be a module-level
    # connection/pool — confirm where it is defined.
    async with db_connection.transaction() as conn:
        status = await conn.fetchrow("""
            SELECT
                d.status,
                d.processed_at,
                COUNT(c.chunk_uuid) as total_chunks,
                COUNT(c.embedding) as processed_chunks
            FROM documents d
            LEFT JOIN chunks c ON d.doc_uuid = c.doc_uuid
            WHERE d.doc_uuid = $1
            GROUP BY d.doc_uuid, d.status, d.processed_at
        """, doc_uuid)

    # fetchrow yields None for an unknown doc_uuid; dict(None) would raise
    # a TypeError (surfacing as a 500), so report not_found explicitly.
    if status is None:
        return {"status": "not_found", "doc_uuid": str(doc_uuid)}
    return dict(status)
@router.get("/documents/{doc_uuid}/search")
async def search_document(
    doc_uuid: UUID,
    query: str,
    threshold: Optional[float] = 0.7,
    limit: Optional[int] = 10,
):
    """Vector-search a document's chunks and attach graph neighbours."""
    # NOTE(review): services are rebuilt per request from module-level
    # db_connection / embedding_model — confirm those globals exist.
    # NOTE(review): doc_uuid is not applied as a search filter, so results
    # may span other documents — confirm intent.
    vector_service = VectorService(db_connection, embedding_model)
    graph_service = GraphService(db_connection)

    # Vector similarity pass.
    similar_chunks = await vector_service.similarity_search(
        query_text=query,
        threshold=threshold,
        limit=limit,
    )

    # Enrich each hit with its graph neighbourhood.
    enhanced_results = []
    for chunk in similar_chunks:
        neighbours = await graph_service.find_related_chunks(
            chunk_uuid=chunk['chunk_uuid'],
            max_depth=2,
            min_weight=0.5,
        )
        enhanced_results.append(
            {**chunk, 'related_chunks': [dict(r) for r in neighbours]}
        )

    return {"query": query, "results": enhanced_results}
api/middleware/error_handler.py
from fastapi import Request from fastapi.responses import JSONResponse from typing import Union import logging
async def error_handler(
    request: Request,
    exc: Exception,
) -> JSONResponse:
    """Global error handler: log the failure and return a JSON error body.

    Uses the exception's own status code when it carries one (e.g. a
    fastapi/starlette HTTPException exposes ``status_code``); otherwise 500.
    """
    # Log the error (lazy %-args; message text unchanged).
    logging.error("Error processing request: %s", exc)

    # The original did ``isinstance(exc, HTTPException)`` but never
    # imported HTTPException (NameError). Duck-typing on status_code
    # preserves the behaviour without the missing import.
    status_code = getattr(exc, "status_code", 500)
    if not isinstance(status_code, int):
        status_code = 500

    # Create error response
    error_response = {
        "status": "error",
        "message": str(exc),
        "type": exc.__class__.__name__,
        "path": request.url.path,
    }

    # Correlate with request logs when middleware set a request id.
    if hasattr(request.state, 'request_id'):
        error_response['request_id'] = request.state.request_id

    return JSONResponse(
        status_code=status_code,
        content=error_response,
    )
main.py
from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(title="Document Processing API")

# Add CORS middleware. The original read allow_origins=[""] and
# allow_methods=[""] — the "*" wildcards were eaten by formatting
# (allow_headers=["*"] survived). Restored; wildcard origins with
# credentials should still be tightened for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register the global error handler for unhandled exceptions.
app.exception_handler(Exception)(error_handler)

# Mount the document routes under /api/v1.
app.include_router(document_router, prefix="/api/v1")

# Guard restored: original read ``if name == "main"`` (stripped dunders).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)