Skip to main content

TDD-001: API Layer - Technical Design Document

Version: 1.0.0 | Status: Approved | Last Updated: 2025-12-28 | Author: Hal Casteel


1. Overview

This Technical Design Document specifies the implementation details for the CODITECT Document Management System API layer built with FastAPI.

1.1 Scope

  • REST API endpoint design
  • Authentication and authorization
  • Request/response validation
  • Error handling patterns
  • Middleware architecture
  • Dependency injection
  • Rate limiting
  • OpenAPI documentation

1.2 Goals

  • P95 latency < 100ms for standard operations
  • P95 latency < 500ms for vector search operations
  • 99.9% API availability
  • Complete OpenAPI documentation
  • Type-safe request/response handling

Context

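The API layer needs an explicit design because:

  • Requirement: the Document Management System must expose REST endpoints for authentication, document CRUD, search, and analytics (Section 4).
  • Constraint: P95 latency must stay below 100 ms for standard operations and below 500 ms for vector search operations, with 99.9% API availability (Section 1.2).
  • Need: request/response handling must be type-safe and completely described by OpenAPI documentation (Section 1.2).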

Status

Approved | 2025-12-28

2. Project Structure

src/backend/
├── api/
│ ├── __init__.py
│ ├── main.py # FastAPI application factory
│ ├── config.py # API configuration
│ ├── dependencies.py # Dependency injection
│ ├── middleware/
│ │ ├── __init__.py
│ │ ├── cors.py # CORS middleware
│ │ ├── logging.py # Request logging
│ │ ├── metrics.py # Prometheus metrics
│ │ ├── rate_limit.py # Rate limiting
│ │ ├── tenant.py # Tenant context
│ │ └── tracing.py # OpenTelemetry
│ ├── routes/
│ │ ├── __init__.py
│ │ ├── auth.py # Authentication endpoints
│ │ ├── documents.py # Document CRUD
│ │ ├── search.py # Semantic search
│ │ ├── chunks.py # Chunk operations
│ │ ├── analytics.py # Metrics & analytics
│ │ ├── admin.py # Admin operations
│ │ └── health.py # Health checks
│ └── exceptions.py # Custom exceptions
├── schemas/
│ ├── __init__.py
│ ├── base.py # Base schemas
│ ├── auth.py # Auth schemas
│ ├── documents.py # Document schemas
│ ├── search.py # Search schemas
│ ├── chunks.py # Chunk schemas
│ └── responses.py # Standard responses
├── services/ # Business logic (existing)
└── models/ # SQLAlchemy models (existing)

3. Application Factory

3.1 Main Application

# src/backend/api/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from .config import settings
from .middleware import setup_middleware
from .routes import setup_routes


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage process-wide resources for the lifetime of the application.

    Startup opens the database pool, then the Redis pool, then Celery.
    Shutdown releases the Redis pool and then the database pool.

    Each pool is released in a ``finally`` clause, so a failure in a later
    startup step, or an exception propagating through the ``yield``, cannot
    leak a pool that was already opened.  Pools are released in the reverse
    of the order in which they were opened.

    Args:
        app: The FastAPI application whose lifespan is being managed.
    """
    await startup_db_pool()
    try:
        await startup_redis_pool()
        try:
            # NOTE(review): no Celery shutdown hook is visible in this
            # document; add one here if Celery holds resources.
            await startup_celery()
            yield
        finally:
            await shutdown_redis_pool()
    finally:
        await shutdown_db_pool()


def create_app() -> FastAPI:
    """Build a FastAPI instance with middleware and routes installed.

    Returns:
        The configured application.  Interactive docs are served at
        ``/docs`` and ``/redoc``; the schema at ``/openapi.json``.
    """
    api_metadata = {
        "title": "CODITECT Document Management API",
        "description": "Enterprise document management with semantic search",
        "version": "1.0.0",
        "docs_url": "/docs",
        "redoc_url": "/redoc",
        "openapi_url": "/openapi.json",
    }
    application = FastAPI(lifespan=lifespan, **api_metadata)

    # Middleware is installed before the routers are mounted.
    setup_middleware(application)
    setup_routes(application)

    return application


# Module-level application instance, built at import time.
app = create_app()

3.2 Configuration

# src/backend/api/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class APISettings(BaseSettings):
    """Runtime configuration for the API process.

    Values are loaded by pydantic-settings from environment variables that
    carry the ``API_`` prefix and from a local ``.env`` file.
    ``jwt_secret_key`` and ``database_url`` declare no default, so they must
    be supplied by the environment.
    """

    model_config = SettingsConfigDict(env_prefix="API_", env_file=".env")

    # --- Server ---
    host: str = "0.0.0.0"
    port: int = 8000
    workers: int = 4
    debug: bool = False

    # --- CORS ---
    cors_origins: list[str] = ["http://localhost:3000"]
    cors_allow_credentials: bool = True

    # --- Rate limiting: `rate_limit_requests` per `rate_limit_window_seconds` ---
    rate_limit_requests: int = 100
    rate_limit_window_seconds: int = 60

    # --- Authentication ---
    # NOTE(review): RS256 is an asymmetric algorithm, yet the only key setting
    # is a single `jwt_secret_key`; confirm which key material it holds.
    jwt_secret_key: str
    jwt_algorithm: str = "RS256"
    jwt_expire_minutes: int = 30

    # --- Database ---
    database_url: str
    database_pool_size: int = 20
    database_pool_max_overflow: int = 10

    # --- Redis ---
    redis_url: str = "redis://localhost:6379"

    # --- External APIs (empty string when not configured) ---
    openai_api_key: str = ""
    voyage_api_key: str = ""


# Shared settings instance, created at import time.
settings = APISettings()

4. Routing Architecture

4.1 Route Registration

# src/backend/api/routes/__init__.py
from fastapi import FastAPI

from .auth import router as auth_router
from .documents import router as documents_router
from .search import router as search_router
from .chunks import router as chunks_router
from .analytics import router as analytics_router
from .health import router as health_router


def setup_routes(app: FastAPI) -> None:
    """Mount every imported feature router on *app* under its URL prefix.

    Args:
        app: The application that receives the routers.
    """
    # (router, URL prefix, OpenAPI tag), in registration order.
    mounts = (
        (health_router, "/health", "Health"),
        (auth_router, "/api/v1/auth", "Authentication"),
        (documents_router, "/api/v1/documents", "Documents"),
        (search_router, "/api/v1/search", "Search"),
        (chunks_router, "/api/v1/chunks", "Chunks"),
        (analytics_router, "/api/v1/analytics", "Analytics"),
    )
    for router, prefix, tag in mounts:
        app.include_router(router, prefix=prefix, tags=[tag])

4.2 API Endpoints

4.2.1 Authentication

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /api/v1/auth/login | Authenticate user |
| POST | /api/v1/auth/logout | Invalidate token |
| POST | /api/v1/auth/refresh | Refresh JWT token |
| GET | /api/v1/auth/me | Get current user |

4.2.2 Documents

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | /api/v1/documents | List documents |
| POST | /api/v1/documents | Upload document |
| GET | /api/v1/documents/{id} | Get document |
| PUT | /api/v1/documents/{id} | Update document |
| DELETE | /api/v1/documents/{id} | Delete document |
| GET | /api/v1/documents/{id}/chunks | Get document chunks |
| POST | /api/v1/documents/{id}/reprocess | Reprocess document |

4.2.3 Search

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /api/v1/search | Semantic search |
| POST | /api/v1/search/hybrid | Hybrid search |
| POST | /api/v1/search/graphrag | GraphRAG search |
| GET | /api/v1/search/suggest | Query suggestions |

4.2.4 Analytics

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | /api/v1/analytics/documents | Document metrics |
| GET | /api/v1/analytics/search | Search metrics |
| GET | /api/v1/analytics/usage | Usage metrics |
| GET | /api/v1/analytics/dashboard | Dashboard data |

5. Request/Response Schemas

5.1 Base Schemas

# src/backend/schemas/base.py
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict


class BaseSchema(BaseModel):
    """Common parent for API schemas.

    Enables construction from attribute-bearing objects
    (``from_attributes``) and population by field name as well as by alias
    (``populate_by_name``).
    """

    model_config = ConfigDict(from_attributes=True, populate_by_name=True)


class TimestampMixin(BaseModel):
    """Adds required creation and last-update timestamps to a schema."""

    created_at: datetime
    updated_at: datetime


class PaginatedResponse(BaseModel):
    """Envelope for one page of a paginated collection."""

    # The element type is not constrained here.
    items: list
    total: int
    page: int
    page_size: int
    total_pages: int


class ErrorResponse(BaseModel):
    """Body shape shared by API error responses."""

    # Error code, e.g. "not_found" or "validation_error".
    error: str
    # Human-readable explanation.
    message: str
    # Optional structured context; omitted fields default to None.
    details: dict | None = None
    request_id: str | None = None

5.2 Document Schemas

# src/backend/schemas/documents.py
from datetime import datetime
from enum import Enum
from uuid import UUID
from pydantic import BaseModel, Field

from .base import BaseSchema, TimestampMixin


class DocumentStatus(str, Enum):
    """Processing state of a document; members are also plain strings."""

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class DocumentType(str, Enum):
    """Category assigned to a document; members are also plain strings."""

    REFERENCE = "reference"
    GUIDE = "guide"
    ADR = "adr"
    WORKFLOW = "workflow"
    UNKNOWN = "unknown"


class DocumentCreate(BaseModel):
    """Payload accepted when a document is uploaded."""

    # Required; between 1 and 255 characters.
    filename: str = Field(..., min_length=1, max_length=255)
    content_type: str = Field(default="text/markdown")
    # A fresh dict per instance, so instances never share metadata.
    metadata: dict = Field(default_factory=dict)


class DocumentResponse(BaseSchema, TimestampMixin):
    """Representation of a stored document returned by the API.

    Inherits ``created_at`` / ``updated_at`` from ``TimestampMixin`` and the
    shared model configuration from ``BaseSchema``.
    """

    id: UUID
    filename: str
    filepath: str
    mime_type: str
    file_size: int
    status: DocumentStatus
    document_type: DocumentType
    # In Pydantic v2 a nullable annotation without a default is still a
    # required field.  Defaulting to None makes these genuinely optional;
    # callers that already pass them are unaffected.
    title: str | None = None
    summary: str | None = None
    chunk_count: int = 0


class DocumentListResponse(BaseModel):
    """One page of documents together with paging counters."""

    # NOTE(review): carries the same fields as PaginatedResponse in base.py
    # except `total_pages`; confirm whether the two are meant to agree.
    items: list[DocumentResponse]
    total: int
    page: int
    page_size: int

5.3 Search Schemas

# src/backend/schemas/search.py
from uuid import UUID
from pydantic import BaseModel, Field

from .base import BaseSchema


class SearchRequest(BaseModel):
    """Parameters accepted by the search endpoints."""

    # Required query text, 1 to 10000 characters.
    query: str = Field(..., min_length=1, max_length=10000)
    # Result count, limited to the range 1..100.
    top_k: int = Field(default=10, ge=1, le=100)
    # Score threshold, limited to the range 0.0..1.0.
    min_score: float = Field(default=0.0, ge=0.0, le=1.0)
    include_content: bool = True
    include_metadata: bool = True
    expand_context: bool = False
    # None means no document-type list was supplied.
    document_types: list[str] | None = None


class SearchResult(BaseSchema):
    """A single hit returned by a search.

    The nullable fields default to ``None``.  In Pydantic v2 a nullable
    annotation without a default is still required, which would force every
    producer to pass ``content=None`` explicitly whenever content is not
    included.  Callers that already supply these fields are unaffected.
    """

    chunk_id: UUID
    doc_id: UUID
    score: float
    content: str | None = None
    section_title: str | None = None
    document_title: str | None = None
    highlight: str | None = None


class SearchResponse(BaseModel):
    """Envelope returned by the search endpoints."""

    # The query string this response answers.
    query: str
    results: list[SearchResult]
    total_results: int
    # Elapsed search time in milliseconds.
    search_time_ms: int
    mode: str

6. Dependency Injection

6.1 Core Dependencies

# src/backend/api/dependencies.py
from typing import Annotated, AsyncGenerator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession

from ..services.embedding_service import EmbeddingService
from ..services.search_service import VectorSearchService
from ..services.redis_service import RedisService


# OAuth2 bearer-token scheme used by `get_current_user` to obtain the token.
# `tokenUrl` is the login route: the auth router is mounted under
# /api/v1/auth by `setup_routes`.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")


# Database session
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for the duration of one dependency scope.

    The session is opened with ``async with``, which closes it on exit, so
    no separate ``close()`` call is needed.  If the consumer raises, the
    session is rolled back explicitly before the exception propagates.

    Yields:
        An open ``AsyncSession``.
    """
    async with async_session() as session:
        try:
            yield session
        except Exception:
            # Make the rollback explicit rather than relying on close().
            await session.rollback()
            raise


# Redis client
async def get_redis() -> AsyncGenerator[RedisService, None]:
    """Yield a connected ``RedisService`` and close it afterwards.

    A new ``RedisService`` is created on every call.  ``close()`` runs even
    when ``connect()`` or the consumer raises.

    Yields:
        The connected Redis service.
    """
    # NOTE(review): the rate-limit middleware reads `request.app.state.redis`
    # instead of this dependency; confirm that two clients are intended.
    client = RedisService()
    try:
        await client.connect()
        yield client
    finally:
        await client.close()


# Current user
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    """Resolve the authenticated user from a JWT bearer token.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        db: Database session used to load the user.

    Returns:
        The user whose id is stored in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, when ``sub`` is
            missing or is not a valid UUID string, or when no user has that id.
    """
    try:
        payload = jwt.decode(
            token,
            settings.jwt_secret_key,
            algorithms=[settings.jwt_algorithm],
        )
        subject = payload.get("sub")
        # UUID(None) raises TypeError and UUID(<non-str>) raises
        # AttributeError; neither was caught before, so a token without a
        # string `sub` produced a 500.  Route both cases to the 401 path.
        if not isinstance(subject, str):
            raise ValueError("token has no string 'sub' claim")
        user_id = UUID(subject)
    except (JWTError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from None

    user = await user_service.get_by_id(db, user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return user


# Current tenant
async def get_current_tenant(
    user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> Tenant:
    """Look up the tenant associated with the authenticated user.

    Args:
        user: The authenticated user.
        db: Database session used for the lookup.

    Returns:
        The tenant returned by ``tenant_service.get_by_user``.

    Raises:
        HTTPException: 403 when the lookup yields nothing.
    """
    found = await tenant_service.get_by_user(db, user.id)
    if found:
        return found

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="No tenant access",
    )


# Services
async def get_embedding_service() -> EmbeddingService:
    """Create an ``EmbeddingService`` using the default embedding model."""
    service = EmbeddingService(default_model="text-embedding-3-small")
    return service


async def get_search_service(
    embedding_service: Annotated[EmbeddingService, Depends(get_embedding_service)],
    db: Annotated[AsyncSession, Depends(get_db)],
    redis: Annotated[RedisService, Depends(get_redis)],
) -> VectorSearchService:
    """Assemble a ``VectorSearchService`` from its injected collaborators.

    Args:
        embedding_service: Embedding service handed to the search service.
        db: Database session, passed as the service's ``db_pool`` argument.
        redis: Redis service, passed as the service's ``redis_cache`` argument.

    Returns:
        The constructed search service.
    """
    # NOTE(review): a session is passed where the parameter is named
    # `db_pool`; confirm VectorSearchService accepts an AsyncSession.
    service = VectorSearchService(
        embedding_service=embedding_service,
        db_pool=db,
        redis_cache=redis,
    )
    return service


# Type aliases for cleaner signatures.
# Each alias bundles a type with its `Depends(...)` provider, so a route can
# declare e.g. `db: DBSession` instead of repeating the Annotated form.
DBSession = Annotated[AsyncSession, Depends(get_db)]
CurrentUser = Annotated[User, Depends(get_current_user)]
CurrentTenant = Annotated[Tenant, Depends(get_current_tenant)]
SearchService = Annotated[VectorSearchService, Depends(get_search_service)]

7. Middleware Architecture

7.1 Middleware Stack

# src/backend/api/middleware/__init__.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware

from .logging import RequestLoggingMiddleware
from .metrics import PrometheusMiddleware
from .rate_limit import RateLimitMiddleware
from .tenant import TenantContextMiddleware
from .tracing import TracingMiddleware
from ..config import settings


def setup_middleware(app: FastAPI) -> None:
    """Install the middleware stack on *app*.

    Each ``add_middleware`` call wraps everything added before it, so the
    middleware added LAST is the outermost one and sees the request first.
    CORS must be outermost so that responses produced by other middleware
    (for example a 429 from the rate limiter) still carry CORS headers.  It
    is therefore registered last.  The remaining middleware keep their
    original relative order.

    Resulting order for an incoming request:
    CORS, tenant context, rate limiting, tracing, metrics, logging, gzip.

    Args:
        app: The application to configure.
    """
    # Compression (innermost).
    app.add_middleware(GZipMiddleware, minimum_size=1000)

    # Request logging
    app.add_middleware(RequestLoggingMiddleware)

    # Prometheus metrics
    app.add_middleware(PrometheusMiddleware)

    # Distributed tracing
    app.add_middleware(TracingMiddleware)

    # Rate limiting
    app.add_middleware(
        RateLimitMiddleware,
        requests_per_window=settings.rate_limit_requests,
        window_seconds=settings.rate_limit_window_seconds,
    )

    # Tenant context
    app.add_middleware(TenantContextMiddleware)

    # CORS: added last so that it is the outermost layer.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=settings.cors_allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
    )

7.2 Rate Limiting Middleware

# src/backend/api/middleware/rate_limit.py
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse


class RateLimitMiddleware(BaseHTTPMiddleware):
    """Reject requests from clients that exceed a request quota.

    Counting is delegated to ``request.app.state.redis.check_rate_limit``.
    Paths starting with ``/health`` are never limited.
    """

    def __init__(
        self,
        app,
        requests_per_window: int = 100,
        window_seconds: int = 60,
    ):
        """Store the quota.

        Args:
            app: The wrapped ASGI application.
            requests_per_window: Requests allowed per window.
            window_seconds: Window length in seconds.
        """
        super().__init__(app)
        self.requests = requests_per_window
        self.window = window_seconds

    async def dispatch(self, request: Request, call_next) -> Response:
        """Forward the request, or answer 429 when the quota is exhausted.

        Every response that passes the limiter carries the
        ``X-RateLimit-*`` headers.  A 429 additionally carries
        ``Retry-After``.
        """
        # Skip rate limiting for health checks
        if request.url.path.startswith("/health"):
            return await call_next(request)

        client_id = self._get_client_id(request)

        redis = request.app.state.redis
        is_allowed, remaining = await redis.check_rate_limit(
            key=f"ratelimit:{client_id}",
            limit=self.requests,
            window=self.window,
        )

        if is_allowed:
            response = await call_next(request)
        else:
            response = JSONResponse(
                status_code=429,
                content={"error": "rate_limit_exceeded", "message": "Too many requests"},
                # Tell well-behaved clients when to retry.
                headers={"Retry-After": str(self.window)},
            )

        response.headers["X-RateLimit-Limit"] = str(self.requests)
        response.headers["X-RateLimit-Remaining"] = str(remaining)
        # NOTE(review): this is the window length, not the time remaining
        # until the window resets; the Redis helper would have to return that.
        response.headers["X-RateLimit-Reset"] = str(self.window)

        return response

    def _get_client_id(self, request: Request) -> str:
        """Return the key that identifies the caller for rate limiting.

        Prefers the authenticated user id, then the first address in
        ``X-Forwarded-For``, then the socket peer address.
        """
        # `user` may be absent or explicitly None for anonymous requests.
        user = getattr(request.state, "user", None)
        if user is not None:
            return f"user:{user.id}"

        # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
        # proxy overwrites it; a spoofed value sidesteps the per-IP limit.
        forwarded = request.headers.get("X-Forwarded-For")
        if forwarded:
            return f"ip:{forwarded.split(',')[0].strip()}"

        # `request.client` is None when the server supplies no peer address.
        if request.client is None:
            return "ip:unknown"
        return f"ip:{request.client.host}"

8. Error Handling

8.1 Exception Classes

# src/backend/api/exceptions.py
from fastapi import HTTPException, status


class APIException(HTTPException):
    """Root of the API error hierarchy.

    Subclasses override the class attributes ``status_code``, ``error`` and
    ``message``.  The exception's ``detail`` is a dict with the keys
    ``error``, ``message`` and ``details``.
    """

    status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    error = "internal_error"
    message = "An unexpected error occurred"

    def __init__(self, message: str | None = None, details: dict | None = None):
        """Build the error body.

        Args:
            message: Replaces the class-level message when truthy.
            details: Optional structured context placed under ``details``.
        """
        body = {
            "error": self.error,
            "message": message or self.message,
            "details": details,
        }
        super().__init__(status_code=self.status_code, detail=body)


class NotFoundError(APIException):
    """HTTP 404: the requested resource does not exist."""

    status_code = status.HTTP_404_NOT_FOUND
    error = "not_found"
    message = "Resource not found"


class ValidationError(APIException):
    """HTTP 422: the request failed validation."""

    status_code = status.HTTP_422_UNPROCESSABLE_ENTITY
    error = "validation_error"
    message = "Request validation failed"


class AuthenticationError(APIException):
    """HTTP 401: the caller is not authenticated."""

    status_code = status.HTTP_401_UNAUTHORIZED
    error = "authentication_error"
    message = "Authentication required"


class AuthorizationError(APIException):
    """HTTP 403: the caller is not allowed to perform the action."""

    status_code = status.HTTP_403_FORBIDDEN
    error = "authorization_error"
    message = "Access denied"


class RateLimitError(APIException):
    """HTTP 429: the caller sent too many requests."""

    status_code = status.HTTP_429_TOO_MANY_REQUESTS
    error = "rate_limit_exceeded"
    message = "Too many requests"


class ServiceUnavailableError(APIException):
    """HTTP 503: the service is temporarily unavailable."""

    status_code = status.HTTP_503_SERVICE_UNAVAILABLE
    error = "service_unavailable"
    message = "Service temporarily unavailable"

8.2 Exception Handlers

# In main.py
from fastapi import Request
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import ValidationError as PydanticValidationError


@app.exception_handler(APIException)
async def api_exception_handler(request: Request, exc: APIException):
    """Render an ``APIException`` as JSON.

    The exception's ``detail`` becomes the response body and its
    ``status_code`` becomes the HTTP status.
    """
    status_code, body = exc.status_code, exc.detail
    return JSONResponse(status_code=status_code, content=body)


@app.exception_handler(RequestValidationError)
@app.exception_handler(PydanticValidationError)
async def validation_exception_handler(
    request: Request,
    exc: RequestValidationError | PydanticValidationError,
):
    """Render validation failures in the standard error shape (HTTP 422).

    FastAPI reports invalid request data as ``RequestValidationError``,
    which is not a subclass of Pydantic's ``ValidationError``.  The handler
    is therefore registered for both types; registering it only for the
    Pydantic type would leave request validation errors to FastAPI's default
    handler and its different response shape.

    Args:
        request: The request that failed validation.
        exc: The validation error; both types expose ``errors()``.

    Returns:
        A 422 JSON response with ``error``, ``message`` and ``details``.
    """
    return JSONResponse(
        status_code=422,
        content={
            "error": "validation_error",
            "message": "Request validation failed",
            # Make the error list JSON-safe before serialising, as FastAPI's
            # documented override of this handler does.
            "details": jsonable_encoder(exc.errors()),
        },
    )


@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
    """Log an otherwise unhandled exception and answer with a generic 500.

    The exception itself is only logged; the response body carries a fixed
    message.
    """
    logger.exception("Unhandled exception", exc_info=exc)
    body = {
        "error": "internal_error",
        "message": "An unexpected error occurred",
    }
    return JSONResponse(status_code=500, content=body)

9. Health Checks

# src/backend/api/routes/health.py
from fastapi import APIRouter, Depends
from pydantic import BaseModel

from ..dependencies import get_db, get_redis


# Mounted under the "/health" prefix by `setup_routes`, so the route paths
# declared below are relative to that prefix.
router = APIRouter()


class HealthStatus(BaseModel):
    """Readiness report: overall status plus one entry per dependency."""

    # Overall result: "healthy" or "degraded".
    status: str
    version: str
    # Per-dependency results: "healthy" or "unhealthy".
    database: str
    redis: str
    celery: str


@router.get("")
async def health_check() -> dict:
    """Report that the process is serving requests; checks no dependencies."""
    payload = {"status": "healthy"}
    return payload


@router.get("/ready")
async def readiness_check(
    db=Depends(get_db),
    redis=Depends(get_redis),
) -> HealthStatus:
    """Probe the database, Redis and Celery and report each result.

    Args:
        db: Database session used for a trivial query.
        redis: Redis service that is pinged.

    Returns:
        A ``HealthStatus`` whose ``status`` is ``"healthy"`` only when all
        three probes succeed, otherwise ``"degraded"``.
    """
    # Function-scope imports keep this block self-contained.
    import asyncio

    from sqlalchemy import text

    # Check database.  SQLAlchemy 2.0 rejects raw SQL strings, so the
    # statement must be wrapped in text(); without it this probe always
    # failed and reported "unhealthy".
    try:
        await db.execute(text("SELECT 1"))
        db_status = "healthy"
    except Exception:
        db_status = "unhealthy"

    # Check Redis
    try:
        await redis.ping()
        redis_status = "healthy"
    except Exception:
        redis_status = "unhealthy"

    # Check Celery.  The ping is a blocking call, so it runs in a worker
    # thread to keep the event loop free.  An empty reply list means no
    # worker answered, which counts as unhealthy.
    try:
        replies = await asyncio.to_thread(celery_app.control.ping, timeout=1.0)
        celery_status = "healthy" if replies else "unhealthy"
    except Exception:
        celery_status = "unhealthy"

    # NOTE(review): a degraded result is still returned with HTTP 200;
    # probes that rely on the status code will not notice it.
    overall = "healthy" if all(
        s == "healthy" for s in [db_status, redis_status, celery_status]
    ) else "degraded"

    return HealthStatus(
        status=overall,
        version="1.0.0",
        database=db_status,
        redis=redis_status,
        celery=celery_status,
    )


@router.get("/live")
async def liveness_check() -> dict:
    """Answer the Kubernetes liveness probe with a fixed payload."""
    payload = {"status": "alive"}
    return payload

10. OpenAPI Documentation

10.1 Custom OpenAPI Schema

# In main.py
def custom_openapi():
    """Build the OpenAPI schema once, add bearer auth, and cache it.

    Returns:
        The schema dict stored on ``app.openapi_schema``.  Later calls
        return the cached value.
    """
    # Imported locally because this snippet does not import it otherwise.
    from fastapi.openapi.utils import get_openapi

    if app.openapi_schema:
        return app.openapi_schema

    openapi_schema = get_openapi(
        title="CODITECT Document Management API",
        version="1.0.0",
        description="""
## Overview

Enterprise document management with semantic search capabilities.

## Authentication

All endpoints (except health checks) require JWT authentication.
Include the token in the `Authorization` header:

Authorization: Bearer <your_token>


## Rate Limiting

API requests are rate limited per user:
- Default: 100 requests per minute
- Headers: `X-RateLimit-Limit`, `X-RateLimit-Remaining`, `X-RateLimit-Reset`
""",
        routes=app.routes,
    )

    # Security scheme.  "components" may be missing from the generated
    # schema, and "securitySchemes" may already hold schemes generated from
    # route dependencies.  Add to what exists instead of indexing blindly or
    # overwriting.
    components = openapi_schema.setdefault("components", {})
    components.setdefault("securitySchemes", {})["BearerAuth"] = {
        "type": "http",
        "scheme": "bearer",
        "bearerFormat": "JWT",
    }

    # Apply security globally.
    # NOTE(review): this also marks the health endpoints as secured in the
    # schema, although the description says they need no authentication.
    openapi_schema["security"] = [{"BearerAuth": []}]

    app.openapi_schema = openapi_schema
    return app.openapi_schema


app.openapi = custom_openapi

11. Testing Strategy

11.1 Test Structure

tests/backend/api/
├── conftest.py # Fixtures
├── test_auth.py # Auth endpoint tests
├── test_documents.py # Document endpoint tests
├── test_search.py # Search endpoint tests
├── test_middleware.py # Middleware tests
└── integration/
    └── test_full_flow.py # Integration tests

11.2 Test Fixtures

# tests/backend/api/conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

from src.backend.api.main import create_app
from src.backend.models.base import Base


@pytest.fixture
async def db_session():
    """Provide a session on a freshly created schema and clean up afterwards.

    All tables are created before the test.  They are dropped and the
    engine is disposed afterwards, including when the test raises, so a
    failing test leaves neither tables nor pooled connections behind.

    Yields:
        An ``AsyncSession`` bound to the test database.
    """
    # NOTE(review): an async generator fixture needs pytest-asyncio (auto
    # mode or its own fixture decorator); confirm the project's pytest setup.
    engine = create_async_engine("postgresql+asyncpg://test:test@localhost/test")
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        try:
            async with AsyncSession(engine) as session:
                yield session
        finally:
            async with engine.begin() as conn:
                await conn.run_sync(Base.metadata.drop_all)
    finally:
        await engine.dispose()


@pytest.fixture
def client(db_session):
    """Create a test client whose ``get_db`` dependency returns the test session.

    Args:
        db_session: The session provided by the ``db_session`` fixture.

    Returns:
        A ``TestClient`` wrapping a freshly built application.
    """
    # `get_db` was used below without being imported anywhere in this file,
    # which raised NameError.  It is imported locally from the dependencies
    # module of the API package.
    from src.backend.api.dependencies import get_db

    app = create_app()
    app.dependency_overrides[get_db] = lambda: db_session
    # NOTE(review): the client is not entered as a context manager, so the
    # application's lifespan handler presumably does not run; confirm that
    # this is intended for these tests.
    return TestClient(app)


@pytest.fixture
def auth_headers(client):
    """Log in as the test user and return an ``Authorization`` header dict.

    Args:
        client: The test client used to call the login endpoint.

    Returns:
        A dict with one ``Authorization: Bearer <token>`` entry.
    """
    credentials = {
        "username": "test@example.com",
        "password": "testpassword",
    }
    login = client.post("/api/v1/auth/login", json=credentials)
    access_token = login.json()["access_token"]
    return {"Authorization": f"Bearer {access_token}"}


13. Revision History

| Version | Date | Author | Changes |
|---------|------|--------|---------|
| 1.0.0 | 2025-12-28 | Hal Casteel | Initial TDD creation |