C4 Architecture Diagram - Level 3: Component
CODITECT Context Intelligence Platform
Diagram Level: 3 (Component) Abstraction: Shows internal components within each container Audience: Developers, architects Purpose: Understand component responsibilities and internal architecture
Component Overview
This diagram zooms into the API Layer and Business Logic containers to show their internal components and how they interact. Components are organized following clean architecture principles (separation of concerns, dependency inversion).
C4 Level 3 Diagram (Mermaid) - API Layer Components
Component Descriptions - API Layer
1. API Endpoints Component
Responsibilities:
- Expose RESTful API endpoints
- Request validation (Pydantic models or Django serializers)
- Response formatting (JSON)
- OpenAPI documentation generation
Endpoints Breakdown:
| Endpoint Group | Count | Component Class | Description |
|---|---|---|---|
| Conversations | 8 | ConversationEndpoints | CRUD operations |
| Search | 4 | SearchEndpoints | Keyword, semantic, hybrid |
| Commits | 6 | CommitEndpoints | Webhook handling, queries |
| Analytics | 10 | AnalyticsEndpoints | Team metrics, insights |
| Users | 6 | UserEndpoints | User management |
| Organizations | 4 | OrganizationEndpoints | Org settings |
| Webhooks | 2 | WebhookEndpoints | GitHub, GitLab |
FastAPI Example:
# api/endpoints/conversation_endpoints.py
from fastapi import APIRouter, Depends, HTTPException, status
from uuid import UUID
from typing import List
from core.services.search_service import SearchService
from core.schemas.conversation import ConversationCreate, ConversationResponse
from api.dependencies import get_current_user, get_search_service
router = APIRouter(prefix="/conversations", tags=["conversations"])
@router.post("", response_model=ConversationResponse, status_code=status.HTTP_201_CREATED)
async def create_conversation(
conversation: ConversationCreate,
current_user: User = Depends(get_current_user),
conversation_service: ConversationService = Depends(get_conversation_service)
):
"""Create a new conversation."""
return await conversation_service.create(
organization_id=current_user.organization_id,
user_id=current_user.id,
data=conversation
)
@router.get("/search", response_model=List[ConversationResponse])
async def search_conversations(
q: str,
alpha: float = 0.5,
limit: int = 20,
current_user: User = Depends(get_current_user),
search_service: SearchService = Depends(get_search_service)
):
"""Hybrid search for conversations."""
return await search_service.hybrid_search(
organization_id=current_user.organization_id,
query=q,
alpha=alpha,
limit=limit
)
2. WebSocket Handler Component
Responsibilities:
- Maintain persistent WebSocket connections
- Broadcast real-time updates to connected clients
- Handle connection lifecycle (connect, disconnect, heartbeat)
Use Cases:
- Notify when new message added to conversation
- Notify when commit linked to conversation
- Notify when team analytics updated
FastAPI WebSocket Example:
# api/websocket/conversation_updates.py
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Set
from uuid import UUID
class ConversationConnectionManager:
def __init__(self):
self.active_connections: Dict[UUID, Set[WebSocket]] = {}
async def connect(self, websocket: WebSocket, conversation_id: UUID):
await websocket.accept()
if conversation_id not in self.active_connections:
self.active_connections[conversation_id] = set()
self.active_connections[conversation_id].add(websocket)
async def disconnect(self, websocket: WebSocket, conversation_id: UUID):
self.active_connections[conversation_id].remove(websocket)
if not self.active_connections[conversation_id]:
del self.active_connections[conversation_id]
async def broadcast(self, conversation_id: UUID, message: dict):
if conversation_id in self.active_connections:
for connection in self.active_connections[conversation_id]:
await connection.send_json(message)
manager = ConversationConnectionManager()
@router.websocket("/ws/conversations/{conversation_id}")
async def conversation_websocket(websocket: WebSocket, conversation_id: UUID):
await manager.connect(websocket, conversation_id)
try:
while True:
await websocket.receive_text() # Keep connection alive
except WebSocketDisconnect:
await manager.disconnect(websocket, conversation_id)
3. Authentication Middleware Component
Responsibilities:
- Extract and validate JWT tokens (standalone) or Django sessions (integrated)
- Set user context for downstream components
- Return 401 Unauthorized for invalid/expired tokens
Flow:
1. Extract token from Authorization header
2. Decode JWT and verify signature
3. Check token expiration
4. Load user from database
5. Set current_user in request context
6. Pass request to next middleware/endpoint
FastAPI Dependency Example:
# api/dependencies/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from core.config import settings
from core.models.user import User
from core.repositories.user_repository import UserRepository
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
user_repo: UserRepository = Depends(get_user_repository)
) -> User:
"""Extract and validate JWT token, return current user."""
try:
payload = jwt.decode(
credentials.credentials,
settings.JWT_SECRET_KEY,
algorithms=[settings.JWT_ALGORITHM]
)
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials"
)
user = await user_repo.get_by_id(UUID(user_id))
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
return user
4. Rate Limiter Component
Responsibilities:
- Track API request counts per user/IP
- Enforce tier-based rate limits
- Return 429 Too Many Requests when limits exceeded
Rate Limits by Tier:
| Tier | Limit | Window | Burst |
|---|---|---|---|
| Starter | 60 req/min | 1 minute | 10 req |
| Pro | 300 req/min | 1 minute | 50 req |
| Enterprise | 1000 req/min | 1 minute | 200 req |
Implementation (Redis-backed):
# api/middleware/rate_limiter.py
from fastapi import Request, HTTPException, status
from redis import Redis
from datetime import datetime, timedelta
class RateLimiter:
def __init__(self, redis: Redis):
self.redis = redis
async def check_limit(self, user_id: str, tier: str, endpoint: str):
limits = {
"starter": 60,
"pro": 300,
"enterprise": 1000
}
key = f"ratelimit:{user_id}:{endpoint}:{datetime.utcnow().minute}"
current = self.redis.incr(key)
if current == 1:
self.redis.expire(key, 60) # Expire after 1 minute
if current > limits[tier]:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Rate limit exceeded. Limit: {limits[tier]} requests/minute"
)
5. Error Handler Component
Responsibilities:
- Catch all exceptions from endpoints/services
- Format errors consistently (RFC 7807 Problem Details)
- Log errors with stack traces
- Hide sensitive details from users
Error Response Format:
{
"type": "https://api.context-intelligence.com/errors/not-found",
"title": "Resource Not Found",
"status": 404,
"detail": "Conversation with ID abc-123 does not exist",
"instance": "/conversations/abc-123",
"trace_id": "7b3f0a2c-8d1e-4f5b-9c3a-1e2d3f4g5h6i"
}
FastAPI Global Exception Handler:
# api/middleware/error_handler.py
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from uuid import uuid4
app = FastAPI()
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
trace_id = str(uuid4())
logger.error(
f"Unhandled exception: {exc}",
exc_info=True,
extra={"trace_id": trace_id, "path": request.url.path}
)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"type": "https://api.context-intelligence.com/errors/internal-server-error",
"title": "Internal Server Error",
"status": 500,
"detail": "An unexpected error occurred. Please try again later.",
"instance": str(request.url),
"trace_id": trace_id
}
)
6. Request Logger Component
Responsibilities:
- Log all incoming requests (method, path, user, latency)
- Generate correlation IDs for distributed tracing
- Redact sensitive data (passwords, tokens) from logs
Structured Logging Example (JSON format for Loki/ELK):
# api/middleware/request_logger.py
import structlog
from fastapi import Request
from time import time
from uuid import uuid4
logger = structlog.get_logger()
async def log_request_middleware(request: Request, call_next):
correlation_id = request.headers.get("X-Correlation-ID", str(uuid4()))
request.state.correlation_id = correlation_id
start_time = time()
logger.info(
"request_started",
method=request.method,
path=request.url.path,
correlation_id=correlation_id,
user_agent=request.headers.get("User-Agent")
)
response = await call_next(request)
latency = (time() - start_time) * 1000 # ms
logger.info(
"request_completed",
method=request.method,
path=request.url.path,
status_code=response.status_code,
latency_ms=latency,
correlation_id=correlation_id
)
response.headers["X-Correlation-ID"] = correlation_id
return response
7. CORS Middleware Component
Responsibilities:
- Handle cross-origin resource sharing (CORS) headers
- Allow requests from approved origins (web app domain)
- Reject requests from unknown origins
Configuration:
# api/middleware/cors.py
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://app.context-intelligence.com", # Production frontend
"https://staging.context-intelligence.com", # Staging frontend
"http://localhost:3000" # Local development
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["Authorization", "Content-Type", "X-Organization-ID"],
max_age=3600 # Cache preflight for 1 hour
)
Component Descriptions - Business Logic Layer
1. SearchService Component
Responsibilities:
- Keyword search (PostgreSQL full-text search)
- Semantic search (Weaviate vector similarity)
- Hybrid search (RRF fusion of keyword + semantic)
Methods:
class SearchService:
async def keyword_search(org_id: UUID, query: str, limit: int) -> List[Conversation]
async def semantic_search(org_id: UUID, query: str, limit: int) -> List[Conversation]
async def hybrid_search(org_id: UUID, query: str, alpha: float, limit: int) -> List[Conversation]
def _rrf_fusion(keyword_results, semantic_results, alpha: float) -> List[Conversation]
Algorithm (RRF Fusion):
For each conversation c:
keyword_score = 1 / (k + keyword_rank(c))
semantic_score = 1 / (k + semantic_rank(c))
final_score = alpha * keyword_score + (1 - alpha) * semantic_score
Sort by final_score descending
Return top N
2. CorrelationService Component
Responsibilities:
- Find conversations that likely led to commits
- Multi-signal scoring (temporal proximity + semantic similarity + explicit links)
Correlation Algorithm:
For each commit:
1. Find conversations within ±6 hours (temporal window)
2. Calculate semantic similarity (commit message vs conversation content)
3. Calculate final score:
score = 0.6 * temporal_score + 0.3 * semantic_score + 0.1 * explicit_link_score
4. If score > 0.7, create link
Methods:
class CorrelationService:
async def correlate_commit(commit: Commit) -> List[Tuple[Conversation, float]]
async def correlate_conversation(conversation: Conversation) -> List[Tuple[Commit, float]]
def _calculate_temporal_score(time_diff: timedelta) -> float
def _calculate_semantic_score(commit_msg: str, conversation_text: str) -> float
3. AnalyticsService Component
Responsibilities:
- Team productivity metrics
- Conversation patterns analysis
- Usage reports
Metrics Calculated:
- Team Velocity: Conversations → Commits ratio
- AI Adoption: % of commits with linked conversations
- Topic Trends: Most discussed topics (clustering)
- User Activity: Conversations per user per day
Methods:
class AnalyticsService:
async def team_velocity(org_id: UUID, start_date, end_date) -> Dict
async def topic_trends(org_id: UUID, limit: int) -> List[Dict]
async def user_activity(org_id: UUID, start_date, end_date) -> List[Dict]
async def conversation_patterns(org_id: UUID) -> Dict
4. AuthenticationService Component
Responsibilities:
- Generate JWT tokens
- Validate OAuth tokens
- Handle password hashing/verification
Methods:
class AuthenticationService:
async def login(email: str, password: str) -> Tuple[str, str] # (access_token, refresh_token)
async def oauth_callback(provider: str, code: str) -> User
def generate_jwt(user: User) -> str
def verify_password(plain: str, hashed: str) -> bool
def hash_password(plain: str) -> str
5. AuthorizationService Component
Responsibilities:
- RBAC enforcement (owner, admin, member, viewer)
- Feature gating by tier
- Quota checking
Methods:
class AuthorizationService:
async def check_permission(user: User, action: str, resource: str) -> bool
async def check_feature_access(organization: Organization, feature: str) -> bool
async def check_quota(organization: Organization, resource: str, count: int) -> bool
RBAC Matrix:
| Action | Owner | Admin | Member | Viewer |
|---|---|---|---|---|
| Create Conversation | ✅ | ✅ | ✅ | ❌ |
| View Own Conversations | ✅ | ✅ | ✅ | ✅ |
| View All Conversations | ✅ | ✅ | ❌ | ❌ |
| Delete Conversation | ✅ | ✅ | Own only | ❌ |
| Manage Users | ✅ | ✅ | ❌ | ❌ |
| Change Subscription | ✅ | ❌ | ❌ | ❌ |
Component Descriptions - Repository Layer
1. ConversationRepository Component
Responsibilities:
- Abstract database access for conversations
- Implement RLS (Row-Level Security) enforcement
- Query optimization (indexes, joins)
Interface:
class ConversationRepository:
async def create(conversation: Conversation) -> Conversation
async def get_by_id(conversation_id: UUID, org_id: UUID) -> Optional[Conversation]
async def list(org_id: UUID, page: int, limit: int) -> List[Conversation]
async def search_by_keyword(org_id: UUID, query: str, limit: int) -> List[Conversation]
async def update(conversation: Conversation) -> Conversation
async def delete(conversation_id: UUID, org_id: UUID) -> bool
RLS Enforcement (every query):
async def get_by_id(self, conversation_id: UUID, org_id: UUID):
# ALWAYS filter by organization_id (multi-tenant isolation)
query = """
SELECT * FROM conversations
WHERE id = $1 AND organization_id = $2
"""
return await self.db.fetchrow(query, conversation_id, org_id)
2. CommitRepository Component
Responsibilities:
- Store git commits from webhooks
- Query commits by repository, user, date range
Interface:
class CommitRepository:
async def create(commit: Commit) -> Commit
async def get_by_sha(sha: str, org_id: UUID) -> Optional[Commit]
async def list_by_repository(repo: str, org_id: UUID, limit: int) -> List[Commit]
async def list_by_date_range(org_id: UUID, start: datetime, end: datetime) -> List[Commit]
3. UserRepository Component
Responsibilities:
- User CRUD operations
- Role management
Interface:
class UserRepository:
async def create(user: User) -> User
async def get_by_id(user_id: UUID) -> Optional[User]
async def get_by_email(email: str, org_id: UUID) -> Optional[User]
async def update_role(user_id: UUID, role: str) -> User
async def list_by_organization(org_id: UUID) -> List[User]
Component Interaction Flow
Example: Hybrid Search Request
1. Web App → API Endpoints: GET /conversations/search?q=auth&alpha=0.5
2. API Endpoints → Auth Middleware: Validate token
3. Auth Middleware → Auth Service: decode_jwt()
4. Auth Service → User Repository: get_by_id()
5. User Repository → PostgreSQL: SELECT * FROM users WHERE id = ?
6. Auth Middleware → API Endpoints: Set current_user
7. API Endpoints → Rate Limiter: check_limit(user, "pro", "/search")
8. Rate Limiter → Redis: INCR ratelimit:user123:search:minute
9. API Endpoints → Authorization Service: check_feature_access(org, "semantic_search")
10. Authorization Service → Redis: GET feature_gate:org123:semantic_search
11. API Endpoints → Search Service: hybrid_search(org_id, "auth", 0.5, 20)
12. Search Service → [PARALLEL]:
a. Conversation Repository → PostgreSQL: Full-text search
b. Search Service → Weaviate: Vector similarity search
13. Search Service: RRF fusion (merge results)
14. Search Service → API Endpoints: Return results
15. API Endpoints → Request Logger: Log request (200, 85ms)
16. API Endpoints → Web App: 200 OK [{conversations}]
Total Components Involved: 9 Database Queries: 3 (user lookup, keyword search, check cache) External API Calls: 1 (Weaviate)
Component Dependencies
API Endpoints
├─ depends on → Auth Middleware
├─ depends on → Rate Limiter
├─ depends on → Authorization Service
├─ depends on → Search Service
├─ depends on → Correlation Service
├─ depends on → Analytics Service
└─ depends on → Error Handler
Search Service
├─ depends on → Conversation Repository
└─ depends on → Weaviate Client
Correlation Service
├─ depends on → Conversation Repository
└─ depends on → Commit Repository
Analytics Service
└─ depends on → Conversation Repository
Authorization Service
├─ depends on → User Repository
└─ depends on → Redis (feature gates)
Conversation Repository
└─ depends on → PostgreSQL
Commit Repository
└─ depends on → PostgreSQL
User Repository
└─ depends on → PostgreSQL
Testing Strategy by Component
| Component | Test Type | Coverage | Mocking |
|---|---|---|---|
| API Endpoints | Integration | 70%+ | Mock services |
| Search Service | Unit | 90%+ | Mock repos, Weaviate |
| Correlation Service | Unit | 90%+ | Mock repos |
| Analytics Service | Unit | 85%+ | Mock repos |
| Auth Service | Unit | 95%+ | Mock user repo, JWT lib |
| Authz Service | Unit | 95%+ | Mock repos, Redis |
| Repositories | Integration | 85%+ | Real testcontainers DB |
Next Level: Code Diagram (C4 Level 4)
The Component diagram shows what components exist and how they interact. The next level (Code Diagram) will show the actual class structure for a specific component (e.g., SearchService) with methods, attributes, and relationships.
See: c4-l4-code.md
Diagram Maintained By: Engineering Team Last Updated: 2025-11-26 Review Cycle: Monthly Related Documents: