Skip to main content

C4 Architecture Diagram - Level 3: Component

CODITECT Context Intelligence Platform

Diagram Level: 3 (Component) Abstraction: Shows internal components within each container Audience: Developers, architects Purpose: Understand component responsibilities and internal architecture


Component Overview

This diagram zooms into the API Layer and Business Logic containers to show their internal components and how they interact. Components are organized following clean architecture principles (separation of concerns, dependency inversion).


C4 Level 3 Diagram (Mermaid) - API Layer Components


Component Descriptions - API Layer

1. API Endpoints Component

Responsibilities:

  • Expose RESTful API endpoints
  • Request validation (Pydantic models or Django serializers)
  • Response formatting (JSON)
  • OpenAPI documentation generation

Endpoints Breakdown:

Endpoint GroupCountComponent ClassDescription
Conversations8ConversationEndpointsCRUD operations
Search4SearchEndpointsKeyword, semantic, hybrid
Commits6CommitEndpointsWebhook handling, queries
Analytics10AnalyticsEndpointsTeam metrics, insights
Users6UserEndpointsUser management
Organizations4OrganizationEndpointsOrg settings
Webhooks2WebhookEndpointsGitHub, GitLab

FastAPI Example:

# api/endpoints/conversation_endpoints.py

from fastapi import APIRouter, Depends, HTTPException, status
from uuid import UUID
from typing import List

from core.services.search_service import SearchService
from core.schemas.conversation import ConversationCreate, ConversationResponse
from api.dependencies import get_current_user, get_search_service

router = APIRouter(prefix="/conversations", tags=["conversations"])

@router.post("", response_model=ConversationResponse, status_code=status.HTTP_201_CREATED)
async def create_conversation(
conversation: ConversationCreate,
current_user: User = Depends(get_current_user),
conversation_service: ConversationService = Depends(get_conversation_service)
):
"""Create a new conversation."""
return await conversation_service.create(
organization_id=current_user.organization_id,
user_id=current_user.id,
data=conversation
)

@router.get("/search", response_model=List[ConversationResponse])
async def search_conversations(
q: str,
alpha: float = 0.5,
limit: int = 20,
current_user: User = Depends(get_current_user),
search_service: SearchService = Depends(get_search_service)
):
"""Hybrid search for conversations."""
return await search_service.hybrid_search(
organization_id=current_user.organization_id,
query=q,
alpha=alpha,
limit=limit
)

2. WebSocket Handler Component

Responsibilities:

  • Maintain persistent WebSocket connections
  • Broadcast real-time updates to connected clients
  • Handle connection lifecycle (connect, disconnect, heartbeat)

Use Cases:

  • Notify when new message added to conversation
  • Notify when commit linked to conversation
  • Notify when team analytics updated

FastAPI WebSocket Example:

# api/websocket/conversation_updates.py

from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Set
from uuid import UUID

class ConversationConnectionManager:
def __init__(self):
self.active_connections: Dict[UUID, Set[WebSocket]] = {}

async def connect(self, websocket: WebSocket, conversation_id: UUID):
await websocket.accept()
if conversation_id not in self.active_connections:
self.active_connections[conversation_id] = set()
self.active_connections[conversation_id].add(websocket)

async def disconnect(self, websocket: WebSocket, conversation_id: UUID):
self.active_connections[conversation_id].remove(websocket)
if not self.active_connections[conversation_id]:
del self.active_connections[conversation_id]

async def broadcast(self, conversation_id: UUID, message: dict):
if conversation_id in self.active_connections:
for connection in self.active_connections[conversation_id]:
await connection.send_json(message)

manager = ConversationConnectionManager()

@router.websocket("/ws/conversations/{conversation_id}")
async def conversation_websocket(websocket: WebSocket, conversation_id: UUID):
await manager.connect(websocket, conversation_id)
try:
while True:
await websocket.receive_text() # Keep connection alive
except WebSocketDisconnect:
await manager.disconnect(websocket, conversation_id)

3. Authentication Middleware Component

Responsibilities:

  • Extract and validate JWT tokens (standalone) or Django sessions (integrated)
  • Set user context for downstream components
  • Return 401 Unauthorized for invalid/expired tokens

Flow:

1. Extract token from Authorization header
2. Decode JWT and verify signature
3. Check token expiration
4. Load user from database
5. Set current_user in request context
6. Pass request to next middleware/endpoint

FastAPI Dependency Example:

# api/dependencies/auth.py

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt

from core.config import settings
from core.models.user import User
from core.repositories.user_repository import UserRepository

security = HTTPBearer()

async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
user_repo: UserRepository = Depends(get_user_repository)
) -> User:
"""Extract and validate JWT token, return current user."""
try:
payload = jwt.decode(
credentials.credentials,
settings.JWT_SECRET_KEY,
algorithms=[settings.JWT_ALGORITHM]
)
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials"
)

user = await user_repo.get_by_id(UUID(user_id))
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)

return user

4. Rate Limiter Component

Responsibilities:

  • Track API request counts per user/IP
  • Enforce tier-based rate limits
  • Return 429 Too Many Requests when limits exceeded

Rate Limits by Tier:

TierLimitWindowBurst
Starter60 req/min1 minute10 req
Pro300 req/min1 minute50 req
Enterprise1000 req/min1 minute200 req

Implementation (Redis-backed):

# api/middleware/rate_limiter.py

from fastapi import Request, HTTPException, status
from redis import Redis
from datetime import datetime, timedelta

class RateLimiter:
def __init__(self, redis: Redis):
self.redis = redis

async def check_limit(self, user_id: str, tier: str, endpoint: str):
limits = {
"starter": 60,
"pro": 300,
"enterprise": 1000
}

key = f"ratelimit:{user_id}:{endpoint}:{datetime.utcnow().minute}"
current = self.redis.incr(key)

if current == 1:
self.redis.expire(key, 60) # Expire after 1 minute

if current > limits[tier]:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Rate limit exceeded. Limit: {limits[tier]} requests/minute"
)

5. Error Handler Component

Responsibilities:

  • Catch all exceptions from endpoints/services
  • Format errors consistently (RFC 7807 Problem Details)
  • Log errors with stack traces
  • Hide sensitive details from users

Error Response Format:

{
"type": "https://api.context-intelligence.com/errors/not-found",
"title": "Resource Not Found",
"status": 404,
"detail": "Conversation with ID abc-123 does not exist",
"instance": "/conversations/abc-123",
"trace_id": "7b3f0a2c-8d1e-4f5b-9c3a-1e2d3f4g5h6i"
}

FastAPI Global Exception Handler:

# api/middleware/error_handler.py

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from uuid import uuid4

app = FastAPI()

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
trace_id = str(uuid4())

logger.error(
f"Unhandled exception: {exc}",
exc_info=True,
extra={"trace_id": trace_id, "path": request.url.path}
)

return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"type": "https://api.context-intelligence.com/errors/internal-server-error",
"title": "Internal Server Error",
"status": 500,
"detail": "An unexpected error occurred. Please try again later.",
"instance": str(request.url),
"trace_id": trace_id
}
)

6. Request Logger Component

Responsibilities:

  • Log all incoming requests (method, path, user, latency)
  • Generate correlation IDs for distributed tracing
  • Redact sensitive data (passwords, tokens) from logs

Structured Logging Example (JSON format for Loki/ELK):

# api/middleware/request_logger.py

import structlog
from fastapi import Request
from time import time
from uuid import uuid4

logger = structlog.get_logger()

async def log_request_middleware(request: Request, call_next):
correlation_id = request.headers.get("X-Correlation-ID", str(uuid4()))
request.state.correlation_id = correlation_id

start_time = time()

logger.info(
"request_started",
method=request.method,
path=request.url.path,
correlation_id=correlation_id,
user_agent=request.headers.get("User-Agent")
)

response = await call_next(request)

latency = (time() - start_time) * 1000 # ms

logger.info(
"request_completed",
method=request.method,
path=request.url.path,
status_code=response.status_code,
latency_ms=latency,
correlation_id=correlation_id
)

response.headers["X-Correlation-ID"] = correlation_id
return response

7. CORS Middleware Component

Responsibilities:

  • Handle cross-origin resource sharing (CORS) headers
  • Allow requests from approved origins (web app domain)
  • Reject requests from unknown origins

Configuration:

# api/middleware/cors.py

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://app.context-intelligence.com", # Production frontend
"https://staging.context-intelligence.com", # Staging frontend
"http://localhost:3000" # Local development
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["Authorization", "Content-Type", "X-Organization-ID"],
max_age=3600 # Cache preflight for 1 hour
)

Component Descriptions - Business Logic Layer

1. SearchService Component

Responsibilities:

  • Keyword search (PostgreSQL full-text search)
  • Semantic search (Weaviate vector similarity)
  • Hybrid search (RRF fusion of keyword + semantic)

Methods:

class SearchService:
async def keyword_search(org_id: UUID, query: str, limit: int) -> List[Conversation]
async def semantic_search(org_id: UUID, query: str, limit: int) -> List[Conversation]
async def hybrid_search(org_id: UUID, query: str, alpha: float, limit: int) -> List[Conversation]
def _rrf_fusion(keyword_results, semantic_results, alpha: float) -> List[Conversation]

Algorithm (RRF Fusion):

For each conversation c:
keyword_score = 1 / (k + keyword_rank(c))
semantic_score = 1 / (k + semantic_rank(c))
final_score = alpha * keyword_score + (1 - alpha) * semantic_score

Sort by final_score descending
Return top N

2. CorrelationService Component

Responsibilities:

  • Find conversations that likely led to commits
  • Multi-signal scoring (temporal proximity + semantic similarity + explicit links)

Correlation Algorithm:

For each commit:
1. Find conversations within ±6 hours (temporal window)
2. Calculate semantic similarity (commit message vs conversation content)
3. Calculate final score:
score = 0.6 * temporal_score + 0.3 * semantic_score + 0.1 * explicit_link_score
4. If score > 0.7, create link

Methods:

class CorrelationService:
async def correlate_commit(commit: Commit) -> List[Tuple[Conversation, float]]
async def correlate_conversation(conversation: Conversation) -> List[Tuple[Commit, float]]
def _calculate_temporal_score(time_diff: timedelta) -> float
def _calculate_semantic_score(commit_msg: str, conversation_text: str) -> float

3. AnalyticsService Component

Responsibilities:

  • Team productivity metrics
  • Conversation patterns analysis
  • Usage reports

Metrics Calculated:

  • Team Velocity: Conversations → Commits ratio
  • AI Adoption: % of commits with linked conversations
  • Topic Trends: Most discussed topics (clustering)
  • User Activity: Conversations per user per day

Methods:

class AnalyticsService:
async def team_velocity(org_id: UUID, start_date, end_date) -> Dict
async def topic_trends(org_id: UUID, limit: int) -> List[Dict]
async def user_activity(org_id: UUID, start_date, end_date) -> List[Dict]
async def conversation_patterns(org_id: UUID) -> Dict

4. AuthenticationService Component

Responsibilities:

  • Generate JWT tokens
  • Validate OAuth tokens
  • Handle password hashing/verification

Methods:

class AuthenticationService:
async def login(email: str, password: str) -> Tuple[str, str] # (access_token, refresh_token)
async def oauth_callback(provider: str, code: str) -> User
def generate_jwt(user: User) -> str
def verify_password(plain: str, hashed: str) -> bool
def hash_password(plain: str) -> str

5. AuthorizationService Component

Responsibilities:

  • RBAC enforcement (owner, admin, member, viewer)
  • Feature gating by tier
  • Quota checking

Methods:

class AuthorizationService:
async def check_permission(user: User, action: str, resource: str) -> bool
async def check_feature_access(organization: Organization, feature: str) -> bool
async def check_quota(organization: Organization, resource: str, count: int) -> bool

RBAC Matrix:

ActionOwnerAdminMemberViewer
Create Conversation
View Own Conversations
View All Conversations
Delete ConversationOwn only
Manage Users
Change Subscription

Component Descriptions - Repository Layer

1. ConversationRepository Component

Responsibilities:

  • Abstract database access for conversations
  • Implement RLS (Row-Level Security) enforcement
  • Query optimization (indexes, joins)

Interface:

class ConversationRepository:
async def create(conversation: Conversation) -> Conversation
async def get_by_id(conversation_id: UUID, org_id: UUID) -> Optional[Conversation]
async def list(org_id: UUID, page: int, limit: int) -> List[Conversation]
async def search_by_keyword(org_id: UUID, query: str, limit: int) -> List[Conversation]
async def update(conversation: Conversation) -> Conversation
async def delete(conversation_id: UUID, org_id: UUID) -> bool

RLS Enforcement (every query):

async def get_by_id(self, conversation_id: UUID, org_id: UUID):
# ALWAYS filter by organization_id (multi-tenant isolation)
query = """
SELECT * FROM conversations
WHERE id = $1 AND organization_id = $2
"""
return await self.db.fetchrow(query, conversation_id, org_id)

2. CommitRepository Component

Responsibilities:

  • Store git commits from webhooks
  • Query commits by repository, user, date range

Interface:

class CommitRepository:
async def create(commit: Commit) -> Commit
async def get_by_sha(sha: str, org_id: UUID) -> Optional[Commit]
async def list_by_repository(repo: str, org_id: UUID, limit: int) -> List[Commit]
async def list_by_date_range(org_id: UUID, start: datetime, end: datetime) -> List[Commit]

3. UserRepository Component

Responsibilities:

  • User CRUD operations
  • Role management

Interface:

class UserRepository:
async def create(user: User) -> User
async def get_by_id(user_id: UUID) -> Optional[User]
async def get_by_email(email: str, org_id: UUID) -> Optional[User]
async def update_role(user_id: UUID, role: str) -> User
async def list_by_organization(org_id: UUID) -> List[User]

Component Interaction Flow

Example: Hybrid Search Request

1. Web App → API Endpoints: GET /conversations/search?q=auth&alpha=0.5
2. API Endpoints → Auth Middleware: Validate token
3. Auth Middleware → Auth Service: decode_jwt()
4. Auth Service → User Repository: get_by_id()
5. User Repository → PostgreSQL: SELECT * FROM users WHERE id = ?
6. Auth Middleware → API Endpoints: Set current_user
7. API Endpoints → Rate Limiter: check_limit(user, "pro", "/search")
8. Rate Limiter → Redis: INCR ratelimit:user123:search:minute
9. API Endpoints → Authorization Service: check_feature_access(org, "semantic_search")
10. Authorization Service → Redis: GET feature_gate:org123:semantic_search
11. API Endpoints → Search Service: hybrid_search(org_id, "auth", 0.5, 20)
12. Search Service → [PARALLEL]:
a. Conversation Repository → PostgreSQL: Full-text search
b. Search Service → Weaviate: Vector similarity search
13. Search Service: RRF fusion (merge results)
14. Search Service → API Endpoints: Return results
15. API Endpoints → Request Logger: Log request (200, 85ms)
16. API Endpoints → Web App: 200 OK [{conversations}]

Total Components Involved: 9 Database Queries: 3 (user lookup, keyword search, check cache) External API Calls: 1 (Weaviate)


Component Dependencies

API Endpoints
├─ depends on → Auth Middleware
├─ depends on → Rate Limiter
├─ depends on → Authorization Service
├─ depends on → Search Service
├─ depends on → Correlation Service
├─ depends on → Analytics Service
└─ depends on → Error Handler

Search Service
├─ depends on → Conversation Repository
└─ depends on → Weaviate Client

Correlation Service
├─ depends on → Conversation Repository
└─ depends on → Commit Repository

Analytics Service
└─ depends on → Conversation Repository

Authorization Service
├─ depends on → User Repository
└─ depends on → Redis (feature gates)

Conversation Repository
└─ depends on → PostgreSQL

Commit Repository
└─ depends on → PostgreSQL

User Repository
└─ depends on → PostgreSQL

Testing Strategy by Component

ComponentTest TypeCoverageMocking
API EndpointsIntegration70%+Mock services
Search ServiceUnit90%+Mock repos, Weaviate
Correlation ServiceUnit90%+Mock repos
Analytics ServiceUnit85%+Mock repos
Auth ServiceUnit95%+Mock user repo, JWT lib
Authz ServiceUnit95%+Mock repos, Redis
RepositoriesIntegration85%+Real testcontainers DB

Next Level: Code Diagram (C4 Level 4)

The Component diagram shows what components exist and how they interact. The next level (Code Diagram) will show the actual class structure for a specific component (e.g., SearchService) with methods, attributes, and relationships.

See: c4-l4-code.md


Diagram Maintained By: Engineering Team Last Updated: 2025-11-26 Review Cycle: Monthly Related Documents: