domain/entities.py
Status
Context
The current situation requires a decision because:
- Requirement 1
- Constraint 2
- Need 3
Status: Accepted | Date: YYYY-MM-DD
domain/entities.py
from dataclasses import dataclass from datetime import datetime from typing import Optional, List, Dict, Any from uuid import UUID
@dataclass
class Document:
    """Domain entity representing one ingested document."""

    doc_uuid: UUID              # stable identifier for the document
    filename: str               # original upload filename
    content: str                # full decoded text content
    mime_type: str              # content type reported at upload
    created_at: datetime        # ingestion timestamp
    metadata: Dict[str, Any]    # free-form extra attributes
@dataclass
class Chunk:
    """Domain entity for one contiguous slice of a document's content."""

    chunk_uuid: UUID        # identifier of this chunk
    doc_uuid: UUID          # owning document's identifier
    content: str            # the slice of document text
    sequence_num: int       # position of the chunk within the document
    start_offset: int       # inclusive start index into the document content
    end_offset: int         # exclusive end index into the document content
    # Embedding vector; filled in after generation, None until then.
    embedding: Optional[List[float]] = None
@dataclass
class ChunkRelationship:
    """Domain entity for a weighted, typed edge between two chunks."""

    source_uuid: UUID           # chunk the relationship points from
    target_uuid: UUID           # chunk the relationship points to
    relationship_type: str      # label for the edge kind
    weight: float               # strength of the relationship
    metadata: Dict[str, Any]    # free-form extra attributes
domain/value_objects.py
@dataclass
class ProcessingOptions:
    """Value object bundling the knobs for document processing."""

    chunk_size: int             # target chunk length in characters
    overlap_percentage: float   # fraction of chunk_size shared with the previous chunk
    store_embeddings: bool      # whether to generate and persist embeddings
    # NOTE(review): not consumed anywhere in the visible processing code — confirm intent.
    create_relationships: bool
@dataclass
class SearchQuery:
    """Value object describing one semantic-search request."""

    query_text: str             # raw text to embed and match against chunks
    similarity_threshold: float # minimum similarity score for a hit
    max_results: int            # cap on the number of returned matches
    # Optional metadata restrictions; None means unfiltered.
    filters: Optional[Dict[str, Any]] = None
domain/ports/repositories.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID
class DocumentRepository(ABC):
    """Port (repository interface) for CRUD persistence of Document entities."""

    @abstractmethod
    async def create(self, document: Document) -> Document:
        """Persist a new document and return the stored entity."""

    @abstractmethod
    async def get(self, doc_uuid: UUID) -> Optional[Document]:
        """Return the document with the given id, or None if absent."""

    @abstractmethod
    async def update(self, document: Document) -> Document:
        """Persist changes to an existing document and return it."""

    @abstractmethod
    async def delete(self, doc_uuid: UUID) -> bool:
        """Remove the document; True if something was deleted."""
class ChunkRepository(ABC):
    """Port (repository interface) for persisting and querying document chunks."""

    @abstractmethod
    async def create_batch(self, chunks: List[Chunk]) -> List[Chunk]:
        """Persist a batch of chunks in one operation and return the stored entities."""

    @abstractmethod
    async def get_by_document(self, doc_uuid: UUID) -> List[Chunk]:
        """Return all chunks belonging to the given document."""

    @abstractmethod
    async def update_embedding(
        self,
        chunk_uuid: UUID,
        embedding: List[float]
    ) -> bool:
        """Attach an embedding vector to a stored chunk; True on success."""

    @abstractmethod
    async def find_similar(
        self,
        embedding: List[float],
        threshold: float,
        limit: int
    ) -> List[Tuple[Chunk, float]]:
        """Return up to *limit* (chunk, similarity) pairs scoring above *threshold*.

        Fix: previously annotated ``List[Chunk]``, but the only visible
        consumer (SearchService.semantic_search) unpacks each element as a
        ``(chunk, similarity)`` pair; the annotation now states the actual
        contract implementations must honour.
        """
domain/ports/services.py
from abc import ABC, abstractmethod from typing import List, Dict, Any from uuid import UUID
class EmbeddingService(ABC):
    """Port for a text-embedding provider."""

    @abstractmethod
    async def embed_text(self, text: str) -> List[float]:
        """Return one embedding vector for *text*."""

    @abstractmethod
    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding vector per input text, in the same order."""
class ProcessingService(ABC):
    """Port for the document-processing pipeline."""

    @abstractmethod
    async def process_document(
        self,
        document: Document,
        options: ProcessingOptions
    ) -> Document:
        """Run the processing pipeline on *document* using *options*."""
infrastructure/persistence/postgresql/repositories.py
class PostgresDocumentRepository(DocumentRepository):
    """PostgreSQL implementation of document repository"""

    def __init__(self, connection_pool):
        # NOTE(review): assumes an asyncpg-style pool whose transaction() is an
        # async context manager yielding a connection — confirm the adapter.
        self.db = connection_pool

    async def create(self, document: Document) -> Document:
        # Insert one document row and rebuild the entity from RETURNING *.
        async with self.db.transaction() as conn:
            row = await conn.fetchrow("""
                INSERT INTO documents (
                    doc_uuid, filename, content,
                    mime_type, created_at, metadata
                ) VALUES ($1, $2, $3, $4, $5, $6)
                RETURNING *
            """, document.doc_uuid, document.filename,
                document.content, document.mime_type,
                document.created_at, document.metadata)
            # NOTE(review): Document(**row) assumes the table's columns match
            # the entity's field names exactly, and that the metadata dict maps
            # cleanly onto the column type (JSON codecs may be needed) — verify
            # against the schema and driver configuration.
            return Document(**row)
class PostgresChunkRepository(ChunkRepository):
    """PostgreSQL implementation of the chunk repository port.

    Writes a whole batch of chunks in a single INSERT ... SELECT FROM
    unnest(...) statement, i.e. one round trip per batch.
    """

    def __init__(self, connection_pool):
        # NOTE(review): assumes an asyncpg-style pool whose transaction() is an
        # async context manager yielding a connection — confirm the adapter.
        self.db = connection_pool

    async def create_batch(self, chunks: List[Chunk]) -> List[Chunk]:
        """Insert *chunks* in one statement and return the stored rows.

        Fix: an empty batch previously expanded ``*zip(*values)`` to zero bind
        arguments for a query that references $1..$7, which PostgreSQL rejects
        at execution time; we now short-circuit and return [] instead.
        """
        if not chunks:
            return []
        async with self.db.transaction() as conn:
            # One tuple per chunk; field order must match the INSERT column
            # list and the unnest() casts below.
            values = [(
                chunk.chunk_uuid,
                chunk.doc_uuid,
                chunk.content,
                chunk.sequence_num,
                chunk.start_offset,
                chunk.end_offset,
                chunk.embedding
            ) for chunk in chunks]
            # zip(*values) transposes row-tuples into one array per column,
            # as required by the unnest($n::type[]) parameters.
            rows = await conn.fetch("""
                INSERT INTO chunks (
                    chunk_uuid, doc_uuid, content,
                    sequence_num, start_offset, end_offset,
                    embedding
                )
                SELECT * FROM unnest($1::uuid[], $2::uuid[],
                    $3::text[], $4::int[], $5::int[],
                    $6::int[], $7::vector[])
                RETURNING *
            """, *zip(*values))
            # NOTE(review): Chunk(**row) assumes the table's columns match the
            # entity's field names exactly — confirm against the schema.
            return [Chunk(**row) for row in rows]
application/services/document_processor.py
class DocumentProcessorService(ProcessingService):
    """Application service that chunks a document, stores the chunks, and
    optionally generates and attaches embeddings.

    NOTE(review): options.create_relationships is currently not acted on here.
    """

    def __init__(
        self,
        document_repo: DocumentRepository,
        chunk_repo: ChunkRepository,
        embedding_service: EmbeddingService
    ):
        self.document_repo = document_repo
        self.chunk_repo = chunk_repo
        self.embedding_service = embedding_service

    async def process_document(
        self,
        document: Document,
        options: ProcessingOptions
    ) -> Document:
        """Chunk *document*, persist the chunks, optionally embed them.

        Fixes vs. the previous version:
        - chunks are now tagged with the owning document's doc_uuid (it was
          left as None and never back-filled before storage);
        - embeddings are produced with a single embed_batch call instead of
          one embed_text round trip per chunk.

        Returns the (unmodified) input document.
        """
        chunks = self._create_chunks(
            document.content,
            options.chunk_size,
            options.overlap_percentage,
            doc_uuid=document.doc_uuid
        )
        stored_chunks = await self.chunk_repo.create_batch(chunks)
        if options.store_embeddings and stored_chunks:
            embeddings = await self.embedding_service.embed_batch(
                [chunk.content for chunk in stored_chunks]
            )
            for chunk, embedding in zip(stored_chunks, embeddings):
                await self.chunk_repo.update_embedding(
                    chunk.chunk_uuid,
                    embedding
                )
        return document

    def _create_chunks(
        self,
        content: str,
        chunk_size: int,
        overlap_percentage: float,
        doc_uuid=None
    ) -> List[Chunk]:
        """Split *content* into word-aligned chunks of ~chunk_size characters,
        each overlapping its predecessor by overlap_percentage of chunk_size.

        Fix: once the final chunk reached end-of-content, the old loop set
        ``start = end - overlap_size`` which recomputed the identical tail
        chunk forever (infinite loop for any overlap_size > 0). We now break
        after the final chunk and force forward progress otherwise.
        """
        chunks: List[Chunk] = []
        overlap_size = int(chunk_size * overlap_percentage)
        start = 0
        sequence_num = 0
        while start < len(content):
            end = start + chunk_size
            if end < len(content):
                # Extend to the next whitespace so words are not split.
                while end < len(content) and not content[end].isspace():
                    end += 1
            else:
                end = len(content)
            chunks.append(Chunk(
                chunk_uuid=uuid4(),
                doc_uuid=doc_uuid,
                content=content[start:end],
                sequence_num=sequence_num,
                start_offset=start,
                end_offset=end
            ))
            sequence_num += 1
            if end >= len(content):
                break  # last chunk emitted; overlap must not restart the tail
            # Overlap with the previous chunk, but always advance at least 1.
            start = max(end - overlap_size, start + 1)
        return chunks
application/services/search_service.py
class SearchService:
    """Application service for search operations"""

    def __init__(
        self,
        chunk_repo: ChunkRepository,
        embedding_service: EmbeddingService
    ):
        self.chunk_repo = chunk_repo
        self.embedding_service = embedding_service

    async def semantic_search(
        self,
        query: SearchQuery
    ) -> List[Dict[str, Any]]:
        """Embed the query text and return the most similar chunks as dicts.

        NOTE(review): query.filters is never applied here — callers that pass
        filters (e.g. a doc_uuid restriction) silently get unfiltered results;
        ChunkRepository.find_similar would need filter support to honour them.

        NOTE(review): this unpacks (chunk, similarity) pairs from find_similar;
        confirm the repository implementation actually yields pairs.
        """
        # Embed the raw query text once.
        query_embedding = await self.embedding_service.embed_text(
            query.query_text
        )
        # Delegate the similarity search to the repository.
        similar_chunks = await self.chunk_repo.find_similar(
            query_embedding,
            query.similarity_threshold,
            query.max_results
        )
        # Flatten each (chunk, score) pair into a plain serializable dict.
        return [{
            'chunk_uuid': chunk.chunk_uuid,
            'content': chunk.content,
            'sequence_num': chunk.sequence_num,
            'similarity': similarity
        } for chunk, similarity in similar_chunks]
interfaces/api/routes.py
from fastapi import APIRouter, UploadFile, File, HTTPException from typing import Optional
# Module-level FastAPI router; the DocumentRoutes methods below register
# their handlers on it via decorators when the class body executes.
router = APIRouter()
class DocumentRoutes:
    """API routes for document operations"""

    # NOTE(review): decorating *instance methods* with @router.post/@router.get
    # registers the unbound functions at class-definition time, so FastAPI will
    # treat `self` as a required request parameter. The conventional fix is to
    # register module-level functions and inject the services as dependencies —
    # confirm this wiring actually works before shipping.

    def __init__(
        self,
        document_processor: DocumentProcessorService,
        search_service: SearchService
    ):
        self.processor = document_processor
        self.search = search_service

    @router.post("/documents")
    async def upload_document(
        self,
        file: UploadFile = File(...),
        chunk_size: Optional[int] = 1000,
        overlap_percentage: Optional[float] = 0.1
    ):
        """Upload a document, chunk it, and store it with embeddings."""
        try:
            content = await file.read()
            # Uploaded bytes are assumed to be UTF-8 text; a binary upload
            # raises UnicodeDecodeError and surfaces as a 500 below.
            if isinstance(content, bytes):
                content = content.decode('utf-8')
            document = Document(
                doc_uuid=uuid4(),
                filename=file.filename,
                content=content,
                mime_type=file.content_type,
                created_at=datetime.utcnow(),  # NOTE(review): naive timestamp; prefer timezone-aware UTC
                metadata={}
            )
            options = ProcessingOptions(
                chunk_size=chunk_size,
                overlap_percentage=overlap_percentage,
                store_embeddings=True,
                create_relationships=True
            )
            processed_doc = await self.processor.process_document(
                document,
                options
            )
            return {
                "status": "success",
                "doc_uuid": str(processed_doc.doc_uuid)
            }
        except Exception as e:
            # NOTE(review): broad catch maps every failure — including client
            # errors such as a bad encoding — onto a 500 response.
            raise HTTPException(
                status_code=500,
                detail=f"Error processing document: {str(e)}"
            )

    @router.get("/documents/{doc_uuid}/search")
    async def search_document(
        self,
        doc_uuid: UUID,
        query: str,
        threshold: Optional[float] = 0.7,
        limit: Optional[int] = 10
    ):
        """Semantic search scoped (nominally) to one document."""
        try:
            search_query = SearchQuery(
                query_text=query,
                similarity_threshold=threshold,
                max_results=limit,
                # NOTE(review): SearchService.semantic_search currently ignores
                # filters, so this doc_uuid restriction is not actually applied.
                filters={"doc_uuid": doc_uuid}
            )
            results = await self.search.semantic_search(search_query)
            return {
                "status": "success",
                "results": results
            }
        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Error performing search: {str(e)}"
            )