
C4 Architecture Diagram - Level 4: Code

CODITECT Context Intelligence Platform

Diagram Level: 4 (Code)
Abstraction: Shows actual class structure, methods, and relationships for a specific component
Audience: Developers implementing the code
Purpose: Understand implementation details and class design


Code Diagram Overview

This diagram provides the detailed class structure for the SearchService component, showing attributes, methods, dependencies, and design patterns. This level is typically only created for complex or critical components.


C4 Level 4 Diagram (Mermaid) - SearchService Class


Class Descriptions

1. SearchService (Primary Focus)

Purpose: Orchestrate hybrid search combining keyword (PostgreSQL) and semantic (Weaviate) search results.

Design Patterns:

  • Dependency Injection: Receives repositories via constructor
  • Strategy Pattern: RRF fusion algorithm encapsulated in private methods
  • Repository Pattern: Delegates data access to repositories

Attributes:

class SearchService:
    def __init__(
        self,
        conversation_repo: ConversationRepository,
        weaviate_client: WeaviateClient,
        rrf_k: int = 60
    ):
        """
        Initialize search service with dependencies.

        Args:
            conversation_repo: Repository for conversation data access
            weaviate_client: Client for vector search operations
            rrf_k: RRF constant (default: 60, higher = less rank-biased)
        """
        self.conversation_repo = conversation_repo
        self.weaviate_client = weaviate_client
        self.rrf_k = rrf_k

Public Methods:

hybrid_search(org_id, query, alpha, limit) -> List[Conversation]

Purpose: Perform hybrid search combining keyword and semantic results

Algorithm:

  1. Run keyword and semantic searches in parallel
  2. Fuse results using Reciprocal Rank Fusion (RRF)
  3. Sort by fused score
  4. Return top N results

Parameters:

  • org_id (UUID): Organization ID for multi-tenant isolation
  • query (str): Search query text
  • alpha (float): Weight for keyword search (0.0 = semantic only, 1.0 = keyword only)
  • limit (int): Maximum results to return

Returns: List of Conversation objects with relevance_score attribute

Example:

# Search for "authentication bug" with balanced hybrid search
results = await search_service.hybrid_search(
    organization_id=org_123,
    query="authentication bug",
    alpha=0.5,  # 50% keyword, 50% semantic
    limit=20
)

for conversation in results:
    print(f"{conversation.title} - Score: {conversation.relevance_score:.3f}")

Implementation:

async def hybrid_search(
    self,
    organization_id: UUID,
    query: str,
    limit: int = 20,
    alpha: float = 0.5
) -> List[Conversation]:
    """
    Perform hybrid search using RRF fusion.

    Combines keyword search (PostgreSQL full-text) with semantic search
    (Weaviate vector similarity) using Reciprocal Rank Fusion algorithm.

    Time Complexity: O(n log n) where n = limit * 2
    Space Complexity: O(n)
    """
    # Run both searches in parallel (async)
    keyword_task = self.keyword_search(organization_id, query, limit * 2)
    semantic_task = self.semantic_search(organization_id, query, limit * 2)

    keyword_results, semantic_results = await asyncio.gather(
        keyword_task,
        semantic_task
    )

    # Fuse results using RRF
    fused_results = self._rrf_fusion(
        keyword_results,
        semantic_results,
        alpha
    )

    # Return top N results
    return fused_results[:limit]

keyword_search(org_id, query, limit) -> List[Conversation]

Purpose: Perform full-text keyword search using PostgreSQL

Implementation:

async def keyword_search(
    self,
    organization_id: UUID,
    query: str,
    limit: int = 20
) -> List[Conversation]:
    """
    Perform keyword search using PostgreSQL full-text search.

    Uses tsvector index on (title, content) for fast search.

    Time Complexity: O(log n) with GIN index
    Space Complexity: O(limit)
    """
    return await self.conversation_repo.search_by_keyword(
        organization_id,
        query,
        limit
    )

semantic_search(org_id, query, limit) -> List[Conversation]

Purpose: Perform semantic similarity search using Weaviate

Implementation:

async def semantic_search(
    self,
    organization_id: UUID,
    query: str,
    limit: int = 20
) -> List[Conversation]:
    """
    Perform semantic search using Weaviate vector similarity.

    Converts query to embedding vector using OpenAI API, then performs
    cosine similarity search in Weaviate.

    Time Complexity: O(n) with HNSW index approximation
    Space Complexity: O(limit)
    """
    # Generate embedding for query via the embedding service, a
    # dependency that must be injected alongside the repositories
    # (not shown in the constructor excerpt above)
    query_vector = await self.embedding_service.generate_embedding(query)

    # Search Weaviate with vector
    return await self.weaviate_client.search_semantic(
        organization_id,
        query_vector,
        limit
    )

Private Methods:

_rrf_fusion(keyword_results, semantic_results, alpha) -> List[Conversation]

Purpose: Fuse keyword and semantic results using RRF algorithm

Algorithm (Reciprocal Rank Fusion):

For each conversation c:
    if c in keyword_results:
        keyword_score = 1 / (k + rank_in_keyword)
    else:
        keyword_score = 0

    if c in semantic_results:
        semantic_score = 1 / (k + rank_in_semantic)
    else:
        semantic_score = 0

    final_score = alpha * keyword_score + (1 - alpha) * semantic_score

Sort conversations by final_score descending
Return sorted list

Implementation:

def _rrf_fusion(
    self,
    keyword_results: List[Conversation],
    semantic_results: List[Conversation],
    alpha: float
) -> List[Conversation]:
    """
    Fuse keyword and semantic results using RRF.

    RRF Formula: score(c) = Σ 1 / (k + rank(c))
    where k = rrf_k (default: 60)

    Args:
        keyword_results: Results from keyword search (ranked)
        semantic_results: Results from semantic search (ranked)
        alpha: Weight for keyword (0.0-1.0)

    Returns:
        Fused results sorted by score descending
    """
    # Build score map
    scores: Dict[UUID, float] = {}

    # Calculate keyword scores
    for rank, conversation in enumerate(keyword_results, start=1):
        keyword_score = self._calculate_rrf_score(rank, self.rrf_k)
        scores[conversation.id] = alpha * keyword_score

    # Calculate semantic scores
    for rank, conversation in enumerate(semantic_results, start=1):
        semantic_score = self._calculate_rrf_score(rank, self.rrf_k)
        if conversation.id in scores:
            scores[conversation.id] += (1 - alpha) * semantic_score
        else:
            scores[conversation.id] = (1 - alpha) * semantic_score

    # Combine all unique conversations
    all_conversations = {c.id: c for c in keyword_results}
    all_conversations.update({c.id: c for c in semantic_results})

    # Attach scores and sort
    for conv_id, score in scores.items():
        all_conversations[conv_id].relevance_score = score

    sorted_conversations = sorted(
        all_conversations.values(),
        key=lambda c: c.relevance_score,
        reverse=True
    )

    return sorted_conversations
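The fusion above can be traced numerically with a small standalone sketch (the `rrf_fuse` helper and the two-element ID lists are hypothetical, used only for illustration; the scoring mirrors `_rrf_fusion` without the domain objects):

```python
# Standalone sketch of weighted RRF fusion over two ranked ID lists.
def rrf_fuse(keyword_ids, semantic_ids, alpha=0.5, k=60):
    scores = {}
    for rank, doc_id in enumerate(keyword_ids, start=1):
        scores[doc_id] = scores.get(doc_id, 0.0) + alpha * (1.0 / (k + rank))
    for rank, doc_id in enumerate(semantic_ids, start=1):
        scores[doc_id] = scores.get(doc_id, 0.0) + (1 - alpha) * (1.0 / (k + rank))
    # Sort IDs by fused score, best first
    return sorted(scores, key=scores.get, reverse=True)

# "a" is ranked #1 by keyword and #2 by semantic, so it outranks "b",
# which is #1 in semantic only: 0.5/61 + 0.5/62 > 0.5/61.
print(rrf_fuse(["a", "c"], ["b", "a"]))  # → ['a', 'b', 'c']
```

Note how a document appearing in both result lists accumulates score from each, which is what pushes overlapping hits to the top.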

_calculate_rrf_score(rank, k) -> float

Purpose: Calculate RRF score for a single rank

Formula: score = 1 / (k + rank)

Implementation:

@staticmethod
def _calculate_rrf_score(rank: int, k: int) -> float:
    """
    Calculate RRF score for a given rank.

    Args:
        rank: Position in ranked list (1-indexed)
        k: RRF constant (default: 60)

    Returns:
        RRF score (higher is better)

    Example:
        rank=1, k=60 → score = 1/61 ≈ 0.0164
        rank=2, k=60 → score = 1/62 ≈ 0.0161
        rank=10, k=60 → score = 1/70 ≈ 0.0143
    """
    return 1.0 / (k + rank)
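The constructor docstring notes that a higher `rrf_k` is less rank-biased. A quick standalone check makes this concrete (`rrf_score` below mirrors `_calculate_rrf_score`):

```python
# How rrf_k affects rank bias: ratio of the score at rank 1 to rank 2.
def rrf_score(rank, k):
    return 1.0 / (k + rank)

for k in (1, 10, 60):
    ratio = rrf_score(1, k) / rrf_score(2, k)
    print(f"k={k}: rank-1 score is {ratio:.3f}x rank-2")
# A small k strongly favors the top rank (k=1 gives a 1.5x gap);
# at the default k=60 the gap narrows to ~1.016x, i.e. less rank-biased.
```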

2. ConversationRepository (Interface)

Purpose: Abstract data access interface for conversations

Design Pattern: Repository Pattern (interface/implementation separation)

Why Interface?:

  • Allows swapping implementations (PostgreSQL → MySQL → DynamoDB)
  • Enables mock implementations for unit testing
  • Enforces consistent API across implementations

Interface Definition:

from abc import ABC, abstractmethod
from typing import List, Optional
from uuid import UUID

class ConversationRepository(ABC):
    """Abstract repository for conversation data access."""

    @abstractmethod
    async def create(self, conversation: Conversation) -> Conversation:
        """Create a new conversation."""
        pass

    @abstractmethod
    async def get_by_id(
        self,
        conversation_id: UUID,
        organization_id: UUID
    ) -> Optional[Conversation]:
        """Get conversation by ID (multi-tenant safe)."""
        pass

    @abstractmethod
    async def list(
        self,
        organization_id: UUID,
        page: int = 1,
        limit: int = 20
    ) -> List[Conversation]:
        """List conversations with pagination."""
        pass

    @abstractmethod
    async def search_by_keyword(
        self,
        organization_id: UUID,
        query: str,
        limit: int = 20
    ) -> List[Conversation]:
        """Full-text keyword search."""
        pass

    @abstractmethod
    async def update(self, conversation: Conversation) -> Conversation:
        """Update existing conversation."""
        pass

    @abstractmethod
    async def delete(
        self,
        conversation_id: UUID,
        organization_id: UUID
    ) -> bool:
        """Delete conversation (soft delete)."""
        pass
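For unit tests, an in-memory implementation of this interface might look like the sketch below. This is a hypothetical illustration of the mock-implementation idea, not the project's actual test double; the trimmed one-method interface and stand-in model are repeated only so the snippet runs standalone.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
from uuid import UUID

# Trimmed copy of the interface above (one method) so the sketch is
# self-contained; a real mock would subclass the full interface.
class ConversationRepository(ABC):
    @abstractmethod
    async def search_by_keyword(self, organization_id: UUID, query: str,
                                limit: int = 20) -> List["Conversation"]: ...

@dataclass
class Conversation:  # minimal stand-in for the domain model
    id: UUID
    organization_id: UUID
    title: str

class InMemoryConversationRepository(ConversationRepository):
    """Test double: naive substring match over an in-memory list."""

    def __init__(self, conversations: List[Conversation]):
        self._conversations = conversations

    async def search_by_keyword(self, organization_id, query, limit=20):
        hits = [
            c for c in self._conversations
            if c.organization_id == organization_id  # tenant isolation
            and query.lower() in c.title.lower()     # crude "full-text"
        ]
        return hits[:limit]
```

Because `SearchService` only depends on the interface, injecting this class in place of `PostgreSQLConversationRepository` requires no changes to the service under test.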

3. PostgreSQLConversationRepository (Implementation)

Purpose: PostgreSQL implementation of ConversationRepository

Attributes:

class PostgreSQLConversationRepository(ConversationRepository):
    def __init__(self, db: AsyncSession):
        """
        Initialize repository with database session.

        Args:
            db: SQLAlchemy async session
        """
        self.db = db

Key Method:

search_by_keyword(org_id, query, limit)

Purpose: Implement full-text search using PostgreSQL

Implementation:

async def search_by_keyword(
    self,
    organization_id: UUID,
    query: str,
    limit: int = 20
) -> List[Conversation]:
    """
    Search conversations using PostgreSQL full-text search.

    Uses GIN index on tsvector for fast search:
        CREATE INDEX idx_conversations_fts ON conversations
        USING GIN (to_tsvector('english', title || ' ' || content));

    Time Complexity: O(log n) with index
    """
    # Convert query to tsquery format (joining on ' & ' requires all
    # terms to match; plainto_tsquery is a safer choice for raw user
    # input containing punctuation)
    tsquery = ' & '.join(query.split())

    sql = """
        SELECT
            id, organization_id, user_id, title, created_at, updated_at,
            message_count, token_count,
            ts_rank(
                to_tsvector('english', title || ' ' || content),
                to_tsquery('english', :tsquery)
            ) AS relevance_score
        FROM conversations
        WHERE
            organization_id = :org_id
            AND to_tsvector('english', title || ' ' || content)
                @@ to_tsquery('english', :tsquery)
        ORDER BY relevance_score DESC
        LIMIT :limit
    """

    result = await self.db.execute(
        text(sql),
        {"org_id": organization_id, "tsquery": tsquery, "limit": limit}
    )

    rows = result.fetchall()
    # Row objects expose their columns via ._mapping in SQLAlchemy 1.4+
    return [Conversation.from_dict(dict(row._mapping)) for row in rows]

Private Method:

_apply_rls_filter(query, org_id)

Purpose: Enforce Row-Level Security on all queries

Implementation:

def _apply_rls_filter(self, query, organization_id: UUID):
    """
    Apply organization_id filter to query (RLS enforcement).

    CRITICAL: This ensures multi-tenant data isolation.
    MUST be called on every query.
    """
    return query.filter(Conversation.organization_id == organization_id)

4. WeaviateClient (External Service Wrapper)

Purpose: Wrapper for Weaviate Cloud API with multi-tenancy support

Attributes:

class WeaviateClient:
    def __init__(
        self,
        url: str,
        api_key: str,
        class_name: str = "Conversation"
    ):
        """
        Initialize Weaviate client.

        Args:
            url: Weaviate Cloud URL
            api_key: API key for authentication
            class_name: Weaviate class name
        """
        self.client = weaviate.Client(
            url=url,
            auth_client_secret=weaviate.AuthApiKey(api_key)
        )
        self.class_name = class_name

Key Method:

search_semantic(org_id, query_vector, limit)

Purpose: Perform vector similarity search with multi-tenant isolation

Implementation:

async def search_semantic(
    self,
    organization_id: UUID,
    query_vector: List[float],
    limit: int = 20
) -> List[Conversation]:
    """
    Search by vector similarity using Weaviate.

    Uses cosine distance with HNSW index for fast approximate search.

    Args:
        organization_id: Tenant ID for isolation
        query_vector: 1536-dimensional embedding vector
        limit: Maximum results

    Returns:
        List of conversations sorted by similarity descending
    """
    # Get tenant-specific client
    tenant_client = self._get_tenant_client(organization_id)

    # Perform vector search (assumes these properties exist in the
    # Weaviate schema; all are required to build a full Conversation)
    result = (
        tenant_client
        .query
        .get(
            self.class_name,
            ["id", "title", "content", "created_at", "user_id",
             "updated_at", "message_count", "token_count"]
        )
        .with_near_vector({"vector": query_vector})
        .with_limit(limit)
        .with_additional(["distance"])
        .do()
    )

    # Convert to Conversation objects
    conversations = []
    for item in result["data"]["Get"][self.class_name]:
        conversation = Conversation(
            id=UUID(item["id"]),
            organization_id=organization_id,
            user_id=UUID(item["user_id"]),
            title=item["title"],
            created_at=datetime.fromisoformat(item["created_at"]),
            updated_at=datetime.fromisoformat(item["updated_at"]),
            message_count=item["message_count"],
            token_count=item["token_count"],
            relevance_score=1 - item["_additional"]["distance"]  # distance → score
        )
        conversations.append(conversation)

    return conversations

Private Method:

_get_tenant_client(org_id)

Purpose: Get Weaviate client for specific tenant

Implementation:

def _get_tenant_client(self, organization_id: UUID):
    """
    Get Weaviate client scoped to specific tenant.

    Weaviate multi-tenancy ensures complete data isolation.
    """
    return self.client.multi_tenancy.get_tenant(str(organization_id))

5. Conversation (Domain Model)

Purpose: Domain model representing a conversation entity

Design Pattern: Anemic Domain Model (data + conversion methods only)

Attributes:

@dataclass
class Conversation:
    """Domain model for AI conversation."""

    id: UUID
    organization_id: UUID
    user_id: UUID
    title: str
    created_at: datetime
    updated_at: datetime
    message_count: int
    token_count: int
    relevance_score: float = 0.0  # Populated by search

    def to_dict(self) -> dict:
        """Convert to dictionary for JSON serialization."""
        return {
            "id": str(self.id),
            "organization_id": str(self.organization_id),
            "user_id": str(self.user_id),
            "title": self.title,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "message_count": self.message_count,
            "token_count": self.token_count,
            "relevance_score": self.relevance_score
        }

    @classmethod
    def from_dict(cls, data: dict) -> "Conversation":
        """Create from dictionary (deserialization)."""
        return cls(
            id=UUID(data["id"]),
            organization_id=UUID(data["organization_id"]),
            user_id=UUID(data["user_id"]),
            title=data["title"],
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
            message_count=data["message_count"],
            token_count=data["token_count"],
            relevance_score=data.get("relevance_score", 0.0)
        )

Class Relationships

Dependency Injection Pattern

SearchService
├─ depends on → ConversationRepository (interface)
│ ├─ injected → PostgreSQLConversationRepository (implementation)
│ └─ injected → MockConversationRepository (test implementation)
└─ depends on → WeaviateClient (concrete class)

Benefits:

  • Testability: Can inject mocks for unit tests
  • Flexibility: Can swap implementations without changing SearchService
  • Decoupling: SearchService doesn't know about PostgreSQL specifics

Dependency Injection Container (FastAPI):

# api/dependencies.py

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

def get_db() -> AsyncSession:
    """Get database session."""
    # Implementation omitted
    pass

def get_conversation_repository(
    db: AsyncSession = Depends(get_db)
) -> ConversationRepository:
    """Get conversation repository."""
    return PostgreSQLConversationRepository(db)

def get_weaviate_client() -> WeaviateClient:
    """Get Weaviate client."""
    return WeaviateClient(
        url=settings.WEAVIATE_URL,
        api_key=settings.WEAVIATE_API_KEY
    )

def get_search_service(
    conversation_repo: ConversationRepository = Depends(get_conversation_repository),
    weaviate_client: WeaviateClient = Depends(get_weaviate_client)
) -> SearchService:
    """Get search service with injected dependencies."""
    return SearchService(conversation_repo, weaviate_client, rrf_k=60)

Performance Considerations

Time Complexity Analysis

| Method | Best Case | Average Case | Worst Case | Notes |
|--------|-----------|--------------|------------|-------|
| hybrid_search() | O(n log n) | O(n log n) | O(n log n) | n = limit * 2 (sorting) |
| keyword_search() | O(log m) | O(log m + k) | O(m) | m = total docs, k = results, with GIN index |
| semantic_search() | O(log m) | O(log m + k) | O(m) | With HNSW index approximation |
| _rrf_fusion() | O(n) | O(n log n) | O(n log n) | n = combined unique results |

Space Complexity Analysis

| Method | Space | Notes |
|--------|-------|-------|
| hybrid_search() | O(n) | n = limit * 2 (2 result sets) |
| _rrf_fusion() | O(n) | n = combined unique results (dictionary) |

Optimization Strategies

  1. Parallel Execution: Keyword and semantic searches run concurrently
  2. Index Usage: PostgreSQL GIN index, Weaviate HNSW index
  3. Result Limiting: Fetch 2x results to account for overlap before fusion
  4. Caching: Cache search results in Redis (60s TTL)
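Strategy 4 can be sketched without a live Redis instance: the key/TTL logic is the same whether the store is a shared Redis server or, as in this illustrative in-memory version, a plain dictionary (`TTLCache` and `search_cache_key` are hypothetical helpers, not part of the codebase):

```python
import time
from typing import Any, Dict, Tuple

class TTLCache:
    """In-memory stand-in for a Redis cache with expiring entries.

    Production code would use Redis so the cache is shared across
    workers; the get/set-with-TTL pattern is identical.
    """

    def __init__(self, ttl_seconds: float = 60.0):
        self.ttl = ttl_seconds
        self._store: Dict[Tuple, Tuple[float, Any]] = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() > expires_at:  # stale entry: evict and miss
            del self._store[key]
            return None
        return value

    def set(self, key, value):
        self._store[key] = (time.monotonic() + self.ttl, value)

# The cache key must include everything that changes the result set --
# crucially org_id, so cached results are never shared across tenants.
def search_cache_key(org_id, query, alpha, limit):
    return (str(org_id), query, alpha, limit)
```

In `hybrid_search`, the service would check the cache with this key before running the two searches and store the fused results afterwards.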

Unit Testing Example

# tests/unit/services/test_search_service.py

import pytest
from datetime import datetime
from unittest.mock import Mock, AsyncMock
from uuid import uuid4

from core.services.search_service import SearchService
from core.models.conversation import Conversation

def make_conversation(title: str) -> Conversation:
    """Build a Conversation with placeholder values for required fields."""
    now = datetime.utcnow()
    return Conversation(
        id=uuid4(),
        organization_id=uuid4(),
        user_id=uuid4(),
        title=title,
        created_at=now,
        updated_at=now,
        message_count=1,
        token_count=100
    )

class TestSearchService:
    @pytest.fixture
    def mock_conversation_repo(self):
        return Mock()

    @pytest.fixture
    def mock_weaviate_client(self):
        return Mock()

    @pytest.fixture
    def search_service(self, mock_conversation_repo, mock_weaviate_client):
        service = SearchService(
            conversation_repo=mock_conversation_repo,
            weaviate_client=mock_weaviate_client,
            rrf_k=60
        )
        # semantic_search uses an embedding service; stub it for unit tests
        service.embedding_service = Mock(
            generate_embedding=AsyncMock(return_value=[0.0] * 1536)
        )
        return service

    @pytest.mark.asyncio
    async def test_hybrid_search_fuses_results_correctly(
        self,
        search_service,
        mock_conversation_repo,
        mock_weaviate_client
    ):
        # ARRANGE
        org_id = uuid4()
        query = "authentication bug"

        conv1 = make_conversation("Auth bug fix")
        conv2 = make_conversation("Security issue")

        mock_conversation_repo.search_by_keyword = AsyncMock(return_value=[conv1])
        mock_weaviate_client.search_semantic = AsyncMock(return_value=[conv2, conv1])

        # ACT
        results = await search_service.hybrid_search(org_id, query, alpha=0.5, limit=10)

        # ASSERT
        assert len(results) == 2
        assert results[0].id == conv1.id  # conv1 appears in both, should rank first
        mock_conversation_repo.search_by_keyword.assert_called_once()
        mock_weaviate_client.search_semantic.assert_called_once()

Next Steps: Workflow Diagrams

The Code diagram shows how individual classes are structured. The next set of diagrams (C4 Workflow) will show user workflows and system workflows at different abstraction levels.

See: ../workflows/c4-l1-workflow-system.md


Diagram Maintained By: Engineering Team
Last Updated: 2025-11-26
Review Cycle: Monthly (or when SearchService changes)
Related Documents: