C4 Architecture Diagram - Level 4: Code
CODITECT Context Intelligence Platform
Diagram Level: 4 (Code)
Abstraction: Shows actual class structure, methods, and relationships for a specific component
Audience: Developers implementing the code
Purpose: Understand implementation details and class design
Code Diagram Overview
This diagram provides the detailed class structure for the SearchService component, showing attributes, methods, dependencies, and design patterns. This level is typically only created for complex or critical components.
C4 Level 4 Diagram (Mermaid) - SearchService Class
Class Descriptions
1. SearchService (Primary Focus)
Purpose: Orchestrate hybrid search combining keyword (PostgreSQL) and semantic (Weaviate) search results.
Design Patterns:
- Dependency Injection: Receives repositories via constructor
- Strategy Pattern: RRF fusion algorithm encapsulated in private methods
- Repository Pattern: Delegates data access to repositories
Attributes:
class SearchService:
    def __init__(
        self,
        conversation_repo: ConversationRepository,
        weaviate_client: WeaviateClient,
        embedding_service: "EmbeddingService" = None,
        rrf_k: int = 60
    ):
        """
        Initialize search service with dependencies.
        Args:
            conversation_repo: Repository for conversation data access
            weaviate_client: Client for vector search operations
            embedding_service: Service that embeds queries (used by semantic_search)
            rrf_k: RRF constant (default: 60, higher = less rank-biased)
        """
        self.conversation_repo = conversation_repo
        self.weaviate_client = weaviate_client
        self.embedding_service = embedding_service
        self.rrf_k = rrf_k
Public Methods:
hybrid_search(org_id, query, limit, alpha) -> List[Conversation]
Purpose: Perform hybrid search combining keyword and semantic results
Algorithm:
- Run keyword and semantic searches in parallel
- Fuse results using Reciprocal Rank Fusion (RRF)
- Sort by fused score
- Return top N results
Parameters:
- org_id (UUID): Organization ID for multi-tenant isolation
- query (str): Search query text
- alpha (float): Weight for keyword search (0.0 = semantic only, 1.0 = keyword only)
- limit (int): Maximum results to return
Returns: List of Conversation objects with relevance_score attribute
Example:
# Search for "authentication bug" with balanced hybrid search
results = await search_service.hybrid_search(
org_id=org_123,
query="authentication bug",
alpha=0.5, # 50% keyword, 50% semantic
limit=20
)
for conversation in results:
print(f"{conversation.title} - Score: {conversation.relevance_score:.3f}")
Implementation:
async def hybrid_search(
self,
organization_id: UUID,
query: str,
limit: int = 20,
alpha: float = 0.5
) -> List[Conversation]:
"""
Perform hybrid search using RRF fusion.
Combines keyword search (PostgreSQL full-text) with semantic search
(Weaviate vector similarity) using Reciprocal Rank Fusion algorithm.
Time Complexity: O(n log n) where n = limit * 2
Space Complexity: O(n)
"""
# Run both searches in parallel (async)
keyword_task = self.keyword_search(organization_id, query, limit * 2)
semantic_task = self.semantic_search(organization_id, query, limit * 2)
keyword_results, semantic_results = await asyncio.gather(
keyword_task,
semantic_task
)
# Fuse results using RRF
fused_results = self._rrf_fusion(
keyword_results,
semantic_results,
alpha
)
# Return top N results
return fused_results[:limit]
keyword_search(org_id, query, limit) -> List[Conversation]
Purpose: Perform full-text keyword search using PostgreSQL
Implementation:
async def keyword_search(
self,
organization_id: UUID,
query: str,
limit: int = 20
) -> List[Conversation]:
"""
Perform keyword search using PostgreSQL full-text search.
Uses tsvector index on (title, content) for fast search.
Time Complexity: O(log n) with GIN index
Space Complexity: O(limit)
"""
return await self.conversation_repo.search_by_keyword(
organization_id,
query,
limit
)
semantic_search(org_id, query, limit) -> List[Conversation]
Purpose: Perform semantic similarity search using Weaviate
Implementation:
async def semantic_search(
self,
organization_id: UUID,
query: str,
limit: int = 20
) -> List[Conversation]:
"""
Perform semantic search using Weaviate vector similarity.
Converts query to embedding vector using OpenAI API, then performs
cosine similarity search in Weaviate.
Time Complexity: O(n) with HNSW index approximation
Space Complexity: O(limit)
"""
# Generate embedding for query (via background service)
query_vector = await self.embedding_service.generate_embedding(query)
# Search Weaviate with vector
return await self.weaviate_client.search_semantic(
organization_id,
query_vector,
limit
)
Private Methods:
_rrf_fusion(keyword_results, semantic_results, alpha) -> List[Conversation]
Purpose: Fuse keyword and semantic results using RRF algorithm
Algorithm (Reciprocal Rank Fusion):
For each conversation c:
if c in keyword_results:
keyword_score = 1 / (k + rank_in_keyword)
else:
keyword_score = 0
if c in semantic_results:
semantic_score = 1 / (k + rank_in_semantic)
else:
semantic_score = 0
final_score = alpha * keyword_score + (1 - alpha) * semantic_score
Sort conversations by final_score descending
Return sorted list
Implementation:
def _rrf_fusion(
self,
keyword_results: List[Conversation],
semantic_results: List[Conversation],
alpha: float
) -> List[Conversation]:
"""
Fuse keyword and semantic results using RRF.
Weighted RRF: score(c) = alpha / (k + rank_kw(c)) + (1 - alpha) / (k + rank_sem(c))
where k = rrf_k (default: 60); a list that does not contain c contributes 0
Args:
keyword_results: Results from keyword search (ranked)
semantic_results: Results from semantic search (ranked)
alpha: Weight for keyword (0.0-1.0)
Returns:
Fused results sorted by score descending
"""
# Build score map
scores: Dict[UUID, float] = {}
# Calculate keyword scores
for rank, conversation in enumerate(keyword_results, start=1):
keyword_score = self._calculate_rrf_score(rank, self.rrf_k)
scores[conversation.id] = alpha * keyword_score
# Calculate semantic scores
for rank, conversation in enumerate(semantic_results, start=1):
semantic_score = self._calculate_rrf_score(rank, self.rrf_k)
if conversation.id in scores:
scores[conversation.id] += (1 - alpha) * semantic_score
else:
scores[conversation.id] = (1 - alpha) * semantic_score
# Combine all unique conversations
all_conversations = {c.id: c for c in keyword_results}
all_conversations.update({c.id: c for c in semantic_results})
# Attach scores and sort
for conv_id, score in scores.items():
all_conversations[conv_id].relevance_score = score
sorted_conversations = sorted(
all_conversations.values(),
key=lambda c: c.relevance_score,
reverse=True
)
return sorted_conversations
_calculate_rrf_score(rank, k) -> float
Purpose: Calculate RRF score for a single rank
Formula: score = 1 / (k + rank)
Implementation:
@staticmethod
def _calculate_rrf_score(rank: int, k: int) -> float:
"""
Calculate RRF score for a given rank.
Args:
rank: Position in ranked list (1-indexed)
k: RRF constant (default: 60)
Returns:
RRF score (higher is better)
Example:
rank=1, k=60 → score = 1/61 ≈ 0.0164
rank=2, k=60 → score = 1/62 ≈ 0.0161
rank=10, k=60 → score = 1/70 ≈ 0.0143
"""
return 1.0 / (k + rank)
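As a sanity check on the formulas above, the weighted fusion arithmetic can be exercised outside the class (a minimal standalone sketch; plain string IDs stand in for conversation objects):

```python
# Standalone sketch of the weighted RRF scoring used by _rrf_fusion.
# Reproduces the formulas above for quick verification.
from typing import Dict, List

def rrf_score(rank: int, k: int = 60) -> float:
    """RRF score for a 1-indexed rank: 1 / (k + rank)."""
    return 1.0 / (k + rank)

def fuse(keyword_ids: List[str], semantic_ids: List[str],
         alpha: float = 0.5, k: int = 60) -> Dict[str, float]:
    """Weighted RRF fusion over two ranked lists of IDs."""
    scores: Dict[str, float] = {}
    for rank, cid in enumerate(keyword_ids, start=1):
        scores[cid] = scores.get(cid, 0.0) + alpha * rrf_score(rank, k)
    for rank, cid in enumerate(semantic_ids, start=1):
        scores[cid] = scores.get(cid, 0.0) + (1 - alpha) * rrf_score(rank, k)
    return scores

# "a" appears in both lists, so it outranks "b" and "c".
scores = fuse(["a", "b"], ["c", "a"])
ranked = sorted(scores, key=scores.get, reverse=True)
print(ranked[0])  # "a"
```

This also illustrates why items appearing in both result sets dominate: their two partial scores sum, while single-source items keep only one term.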
2. ConversationRepository (Interface)
Purpose: Abstract data access interface for conversations
Design Pattern: Repository Pattern (interface/implementation separation)
Why Interface?:
- Allows swapping implementations (PostgreSQL → MySQL → DynamoDB)
- Enables mock implementations for unit testing
- Enforces consistent API across implementations
Interface Definition:
from abc import ABC, abstractmethod
from typing import List, Optional
from uuid import UUID
class ConversationRepository(ABC):
"""Abstract repository for conversation data access."""
@abstractmethod
async def create(self, conversation: Conversation) -> Conversation:
"""Create a new conversation."""
pass
@abstractmethod
async def get_by_id(
self,
conversation_id: UUID,
organization_id: UUID
) -> Optional[Conversation]:
"""Get conversation by ID (multi-tenant safe)."""
pass
@abstractmethod
async def list(
self,
organization_id: UUID,
page: int = 1,
limit: int = 20
) -> List[Conversation]:
"""List conversations with pagination."""
pass
@abstractmethod
async def search_by_keyword(
self,
organization_id: UUID,
query: str,
limit: int = 20
) -> List[Conversation]:
"""Full-text keyword search."""
pass
@abstractmethod
async def update(self, conversation: Conversation) -> Conversation:
"""Update existing conversation."""
pass
@abstractmethod
async def delete(
self,
conversation_id: UUID,
organization_id: UUID
) -> bool:
"""Delete conversation (soft delete)."""
pass
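To make the testability benefit concrete, here is a minimal in-memory implementation sketch. The class and model names are illustrative stand-ins; a real mock would subclass the ConversationRepository ABC above:

```python
# Minimal in-memory stand-in for ConversationRepository (illustrative
# sketch). Kept dependency-free so the idea is visible at a glance.
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional
from uuid import UUID, uuid4

@dataclass
class StubConversation:
    # Abbreviated stand-in for the Conversation domain model.
    id: UUID
    organization_id: UUID
    title: str

class InMemoryConversationRepository:
    def __init__(self) -> None:
        self._store: Dict[UUID, StubConversation] = {}

    async def create(self, conversation: StubConversation) -> StubConversation:
        self._store[conversation.id] = conversation
        return conversation

    async def get_by_id(self, conversation_id: UUID,
                        organization_id: UUID) -> Optional[StubConversation]:
        conv = self._store.get(conversation_id)
        # Multi-tenant safety: never leak another organization's data.
        if conv is not None and conv.organization_id != organization_id:
            return None
        return conv

    async def search_by_keyword(self, organization_id: UUID, query: str,
                                limit: int = 20) -> List[StubConversation]:
        # Naive substring match stands in for full-text search.
        return [c for c in self._store.values()
                if c.organization_id == organization_id
                and query.lower() in c.title.lower()][:limit]

repo = InMemoryConversationRepository()
org = uuid4()
conv = StubConversation(id=uuid4(), organization_id=org, title="Auth bug fix")
asyncio.run(repo.create(conv))
hits = asyncio.run(repo.search_by_keyword(org, "auth"))
```

Because SearchService only depends on the interface, this stand-in can be injected in unit tests with no database running.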
3. PostgreSQLConversationRepository (Implementation)
Purpose: PostgreSQL implementation of ConversationRepository
Attributes:
class PostgreSQLConversationRepository(ConversationRepository):
def __init__(self, db: AsyncSession):
"""
Initialize repository with database session.
Args:
db: SQLAlchemy async session
"""
self.db = db
Key Method:
search_by_keyword(org_id, query, limit)
Purpose: Implement full-text search using PostgreSQL
Implementation:
async def search_by_keyword(
self,
organization_id: UUID,
query: str,
limit: int = 20
) -> List[Conversation]:
"""
Search conversations using PostgreSQL full-text search.
Uses GIN index on tsvector for fast search:
CREATE INDEX idx_conversations_fts ON conversations
USING GIN (to_tsvector('english', title || ' ' || content));
Time Complexity: O(log n) with index
"""
# Convert query to tsquery format
tsquery = ' & '.join(query.split())
sql = """
SELECT
id, organization_id, user_id, title, created_at, updated_at,
message_count, token_count,
ts_rank(
to_tsvector('english', title || ' ' || content),
to_tsquery('english', :tsquery)
) AS relevance_score
FROM conversations
WHERE
organization_id = :org_id
AND to_tsvector('english', title || ' ' || content)
@@ to_tsquery('english', :tsquery)
ORDER BY relevance_score DESC
LIMIT :limit
"""
result = await self.db.execute(
text(sql),
{"org_id": organization_id, "tsquery": tsquery, "limit": limit}
)
rows = result.fetchall()
# SQLAlchemy 2.x Row objects are not directly dict()-able; use ._mapping
return [Conversation.from_dict(dict(row._mapping)) for row in rows]
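One caveat with the `' & '.join(query.split())` conversion above: user input containing tsquery operators (`&`, `|`, `!`, parentheses) raises a syntax error inside `to_tsquery`. A common hardening, shown here as an alternative sketch rather than the documented implementation, is to let PostgreSQL parse the raw text itself with `plainto_tsquery`:

```python
# Hardening sketch: delegate query parsing to PostgreSQL instead of
# building the tsquery string by hand. plainto_tsquery() ignores
# operators in user input, so "foo & bar)" cannot break the query.
safe_sql = """
    SELECT id, title,
           ts_rank(
               to_tsvector('english', title || ' ' || content),
               plainto_tsquery('english', :query)
           ) AS relevance_score
    FROM conversations
    WHERE organization_id = :org_id
      AND to_tsvector('english', title || ' ' || content)
          @@ plainto_tsquery('english', :query)
    ORDER BY relevance_score DESC
    LIMIT :limit
"""
# The raw user string is bound directly; no ' & '.join(...) step needed.
params = {"org_id": "org-uuid-here", "query": "authentication bug", "limit": 20}
```

`plainto_tsquery` AND-joins the lexemes internally, matching the intent of the manual join without the failure mode.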
Private Method:
_apply_rls_filter(query, org_id)
Purpose: Enforce Row-Level Security on all queries
Implementation:
def _apply_rls_filter(self, query, organization_id: UUID):
"""
Apply organization_id filter to query (RLS enforcement).
CRITICAL: This ensures multi-tenant data isolation.
MUST be called on every query.
"""
return query.filter(Conversation.organization_id == organization_id)
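The isolation rule can be illustrated with a tiny stand-in for the query object (a sketch; the real method filters a SQLAlchemy query, not a Python list):

```python
# Minimal illustration of the RLS rule: narrow every query to one tenant
# before anything else happens. The Query class is an illustrative
# stand-in for a SQLAlchemy query object.
from uuid import UUID, uuid4

class Query:
    def __init__(self, rows):
        self.rows = rows

    def filter(self, predicate):
        return Query([r for r in self.rows if predicate(r)])

def apply_rls_filter(query: Query, organization_id: UUID) -> Query:
    # Mirrors _apply_rls_filter: keep only rows owned by this tenant.
    return query.filter(lambda r: r["organization_id"] == organization_id)

org_a, org_b = uuid4(), uuid4()
q = Query([
    {"organization_id": org_a, "title": "mine"},
    {"organization_id": org_b, "title": "theirs"},
])
visible = apply_rls_filter(q, org_a).rows
```

The invariant is the same in both worlds: no result set ever crosses an organization boundary, regardless of what the rest of the query does.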
4. WeaviateClient (External Service Wrapper)
Purpose: Wrapper for Weaviate Cloud API with multi-tenancy support
Attributes:
class WeaviateClient:
def __init__(
self,
url: str,
api_key: str,
class_name: str = "Conversation"
):
"""
Initialize Weaviate client.
Args:
url: Weaviate Cloud URL
api_key: API key for authentication
class_name: Weaviate class name
"""
self.client = weaviate.Client(
url=url,
auth_client_secret=weaviate.AuthApiKey(api_key)
)
self.class_name = class_name
Key Method:
search_semantic(org_id, query_vector, limit)
Purpose: Perform vector similarity search with multi-tenant isolation
Implementation:
async def search_semantic(
self,
organization_id: UUID,
query_vector: List[float],
limit: int = 20
) -> List[Conversation]:
"""
Search by vector similarity using Weaviate.
Uses cosine distance with HNSW index for fast approximate search.
Args:
organization_id: Tenant ID for isolation
query_vector: 1536-dimensional embedding vector
limit: Maximum results
Returns:
List of conversations sorted by similarity descending
"""
        # Perform vector search scoped to this tenant.
        # In the v3 Python client, multi-tenancy is applied per query by
        # chaining .with_tenant() onto the query builder.
        result = (
            self.client
            .query
            .get(self.class_name, ["id", "title", "content", "created_at"])
            .with_tenant(str(organization_id))
            .with_near_vector({"vector": query_vector})
            .with_limit(limit)
            .with_additional(["distance"])
            .do()
        )
        # Convert to Conversation objects
        conversations = []
        for item in result["data"]["Get"][self.class_name]:
            conversation = Conversation(
                id=UUID(item["id"]),
                organization_id=organization_id,
                title=item["title"],
                created_at=datetime.fromisoformat(item["created_at"]),
                relevance_score=1 - item["_additional"]["distance"]  # Convert distance to score
            )
            conversations.append(conversation)
        return conversations
Multi-Tenancy Note:
The v3 Weaviate Python client does not expose a per-tenant client object; isolation is enforced by chaining .with_tenant(str(organization_id)) onto every query builder, as shown above. Every query path must include this call, mirroring the Row-Level Security rule on the PostgreSQL side.
5. Conversation (Domain Model)
Purpose: Domain model representing a conversation entity
Design Pattern: Anemic Domain Model (data + conversion methods only)
Attributes:
@dataclass
class Conversation:
"""Domain model for AI conversation."""
id: UUID
organization_id: UUID
user_id: UUID
title: str
created_at: datetime
updated_at: datetime
message_count: int
token_count: int
relevance_score: float = 0.0 # Populated by search
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
"id": str(self.id),
"organization_id": str(self.organization_id),
"user_id": str(self.user_id),
"title": self.title,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"message_count": self.message_count,
"token_count": self.token_count,
"relevance_score": self.relevance_score
}
@classmethod
def from_dict(cls, data: dict) -> "Conversation":
"""Create from dictionary (deserialization)."""
return cls(
id=UUID(data["id"]),
organization_id=UUID(data["organization_id"]),
user_id=UUID(data["user_id"]),
title=data["title"],
created_at=datetime.fromisoformat(data["created_at"]),
updated_at=datetime.fromisoformat(data["updated_at"]),
message_count=data["message_count"],
token_count=data["token_count"],
relevance_score=data.get("relevance_score", 0.0)
)
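The two conversion methods are intended to be exact inverses: to_dict() emits JSON-safe primitives (string UUIDs, ISO-8601 timestamps) and from_dict() reverses them losslessly. A quick round-trip check on an abbreviated copy of the model:

```python
# Round-trip sketch on an abbreviated copy of the Conversation model:
# serialize to JSON-safe primitives, then reconstruct and compare.
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID, uuid4

@dataclass
class Conversation:  # abbreviated copy of the model above
    id: UUID
    created_at: datetime

    def to_dict(self) -> dict:
        return {"id": str(self.id), "created_at": self.created_at.isoformat()}

    @classmethod
    def from_dict(cls, data: dict) -> "Conversation":
        return cls(id=UUID(data["id"]),
                   created_at=datetime.fromisoformat(data["created_at"]))

original = Conversation(id=uuid4(), created_at=datetime(2025, 11, 26, 12, 0))
# Dataclass equality compares fields, so this verifies losslessness.
assert Conversation.from_dict(original.to_dict()) == original
```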
Class Relationships
Dependency Injection Pattern
SearchService
├─ depends on → ConversationRepository (interface)
│ ├─ injected → PostgreSQLConversationRepository (implementation)
│ └─ injected → MockConversationRepository (test implementation)
└─ depends on → WeaviateClient (concrete class)
Benefits:
- Testability: Can inject mocks for unit tests
- Flexibility: Can swap implementations without changing SearchService
- Decoupling: SearchService doesn't know about PostgreSQL specifics
Dependency Injection Container (FastAPI):
# api/dependencies.py
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
def get_db() -> AsyncSession:
"""Get database session."""
# Implementation omitted
pass
def get_conversation_repository(
db: AsyncSession = Depends(get_db)
) -> ConversationRepository:
"""Get conversation repository."""
return PostgreSQLConversationRepository(db)
def get_weaviate_client() -> WeaviateClient:
"""Get Weaviate client."""
return WeaviateClient(
url=settings.WEAVIATE_URL,
api_key=settings.WEAVIATE_API_KEY
)
def get_search_service(
conversation_repo: ConversationRepository = Depends(get_conversation_repository),
weaviate_client: WeaviateClient = Depends(get_weaviate_client)
) -> SearchService:
"""Get search service with injected dependencies."""
return SearchService(conversation_repo, weaviate_client, rrf_k=60)
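The same provider chain can also be wired by hand, which is how integration tests often construct the service outside FastAPI's Depends machinery (a sketch; the Stub classes are illustrative stand-ins for the real dependencies):

```python
# Hand-wiring the provider chain that FastAPI resolves via Depends():
# db session -> repository -> (plus Weaviate client) -> SearchService.
# The Stub* classes are illustrative stand-ins, not real implementations.
class StubSession:
    """Stands in for a SQLAlchemy AsyncSession."""

class StubConversationRepository:
    def __init__(self, db: StubSession) -> None:
        self.db = db

class StubWeaviateClient:
    def __init__(self, url: str, api_key: str) -> None:
        self.url, self.api_key = url, api_key

class SearchService:
    def __init__(self, conversation_repo, weaviate_client, rrf_k: int = 60):
        self.conversation_repo = conversation_repo
        self.weaviate_client = weaviate_client
        self.rrf_k = rrf_k

db = StubSession()
service = SearchService(
    StubConversationRepository(db),
    StubWeaviateClient("https://example.weaviate.network", "api-key"),
    rrf_k=60,
)
```

The point of the container is exactly this composition: each provider builds one layer, and swapping any Stub for a real implementation changes nothing downstream.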
Performance Considerations
Time Complexity Analysis
| Method | Best Case | Average Case | Worst Case | Notes |
|---|---|---|---|---|
| hybrid_search() | O(n log n) | O(n log n) | O(n log n) | n = limit * 2 (sorting) |
| keyword_search() | O(log m) | O(log m + k) | O(m) | m = total docs, k = results, with GIN index |
| semantic_search() | O(log m) | O(log m + k) | O(m) | With HNSW index approximation |
| _rrf_fusion() | O(n) | O(n log n) | O(n log n) | n = combined unique results |
Space Complexity Analysis
| Method | Space | Notes |
|---|---|---|
| hybrid_search() | O(n) | n = limit * 2 (2 result sets) |
| _rrf_fusion() | O(n) | n = combined unique results (dictionary) |
Optimization Strategies
- Parallel Execution: Keyword and semantic searches run concurrently
- Index Usage: PostgreSQL GIN index, Weaviate HNSW index
- Result Limiting: Fetch 2x results to account for overlap before fusion
- Caching: Cache search results in Redis (60s TTL)
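The caching bullet can be sketched as a cache-aside wrapper around the search call (illustrative; a dict with expiry timestamps stands in for Redis, and the key scheme is an assumption):

```python
# Cache-aside sketch for search results. A dict with expiry timestamps
# stands in for Redis; in production this would be a Redis SETEX with
# the same 60-second TTL. Names and key format are illustrative.
import time
from typing import Callable, Dict, List, Tuple

CACHE_TTL_SECONDS = 60
_cache: Dict[str, Tuple[float, List[dict]]] = {}

def cache_key(org_id: str, query: str, alpha: float, limit: int) -> str:
    # One entry per (tenant, query, weighting, page size) combination;
    # including org_id preserves multi-tenant isolation in the cache.
    return f"search:{org_id}:{alpha}:{limit}:{query}"

def cached_search(org_id: str, query: str, alpha: float, limit: int,
                  run_search: Callable[[], List[dict]]) -> List[dict]:
    key = cache_key(org_id, query, alpha, limit)
    entry = _cache.get(key)
    if entry is not None and entry[0] > time.monotonic():
        return entry[1]                      # fresh cache hit
    results = run_search()                   # miss: run the real search
    _cache[key] = (time.monotonic() + CACHE_TTL_SECONDS, results)
    return results

calls = []
def fake_search() -> List[dict]:
    calls.append(1)
    return [{"title": "Auth bug fix"}]

first = cached_search("org-1", "auth", 0.5, 20, fake_search)
second = cached_search("org-1", "auth", 0.5, 20, fake_search)
# fake_search ran once; the second call was served from cache.
```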
Unit Testing Example
# tests/unit/services/test_search_service.py
import pytest
from unittest.mock import Mock, AsyncMock
from uuid import uuid4
from core.services.search_service import SearchService
from core.models.conversation import Conversation
class TestSearchService:
@pytest.fixture
def mock_conversation_repo(self):
return Mock()
@pytest.fixture
def mock_weaviate_client(self):
return Mock()
@pytest.fixture
def search_service(self, mock_conversation_repo, mock_weaviate_client):
return SearchService(
conversation_repo=mock_conversation_repo,
weaviate_client=mock_weaviate_client,
rrf_k=60
)
@pytest.mark.asyncio
async def test_hybrid_search_fuses_results_correctly(
self,
search_service,
mock_conversation_repo,
mock_weaviate_client
):
# ARRANGE
org_id = uuid4()
query = "authentication bug"
conv1 = Conversation(id=uuid4(), title="Auth bug fix", ...)
conv2 = Conversation(id=uuid4(), title="Security issue", ...)
mock_conversation_repo.search_by_keyword = AsyncMock(return_value=[conv1])
mock_weaviate_client.search_semantic = AsyncMock(return_value=[conv2, conv1])
# ACT
results = await search_service.hybrid_search(org_id, query, alpha=0.5, limit=10)
# ASSERT
assert len(results) == 2
assert results[0].id == conv1.id # conv1 appears in both, should rank first
mock_conversation_repo.search_by_keyword.assert_called_once()
mock_weaviate_client.search_semantic.assert_called_once()
Next Steps: Workflow Diagrams
The Code diagram shows how individual classes are structured. The next set of diagrams (C4 Workflow) will show user workflows and system workflows at different abstraction levels.
See: ../workflows/c4-l1-workflow-system.md
Diagram Maintained By: Engineering Team Last Updated: 2025-11-26 Review Cycle: Monthly (or when SearchService changes) Related Documents: