TDD-001: API Layer - Technical Design Document
Version: 1.0.0 | Status: Approved | Last Updated: 2025-12-28 | Author: Hal Casteel
1. Overview
This Technical Design Document specifies the implementation details for the CODITECT Document Management System API layer built with FastAPI.
1.1 Scope
- REST API endpoint design
- Authentication and authorization
- Request/response validation
- Error handling patterns
- Middleware architecture
- Dependency injection
- Rate limiting
- OpenAPI documentation
1.2 Goals
- P95 latency < 100ms for standard operations
- P95 latency < 500ms for vector search operations
- 99.9% API availability
- Complete OpenAPI documentation
- Type-safe request/response handling
Context
The current situation requires a decision because:
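- Requirement: a type-safe, fully documented REST API for the Document Management System (see 1.2 Goals)
- Constraint: P95 latency < 100ms for standard operations and < 500ms for vector search, with 99.9% availability
- Need: one consistent approach to authentication, rate limiting, validation, and error handling across all endpoints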
Status
Approved | 2025-12-28
2. Project Structure
src/backend/
├── api/
│ ├── __init__.py
│ ├── main.py # FastAPI application factory
│ ├── config.py # API configuration
│ ├── dependencies.py # Dependency injection
│ ├── middleware/
│ │ ├── __init__.py
│ │ ├── cors.py # CORS middleware
│ │ ├── logging.py # Request logging
│ │ ├── metrics.py # Prometheus metrics
│ │ ├── rate_limit.py # Rate limiting
│ │ ├── tenant.py # Tenant context
│ │ └── tracing.py # OpenTelemetry
│ ├── routes/
│ │ ├── __init__.py
│ │ ├── auth.py # Authentication endpoints
│ │ ├── documents.py # Document CRUD
│ │ ├── search.py # Semantic search
│ │ ├── chunks.py # Chunk operations
│ │ ├── analytics.py # Metrics & analytics
│ │ ├── admin.py # Admin operations
│ │ └── health.py # Health checks
│ └── exceptions.py # Custom exceptions
├── schemas/
│ ├── __init__.py
│ ├── base.py # Base schemas
│ ├── auth.py # Auth schemas
│ ├── documents.py # Document schemas
│ ├── search.py # Search schemas
│ ├── chunks.py # Chunk schemas
│ └── responses.py # Standard responses
├── services/ # Business logic (existing)
└── models/ # SQLAlchemy models (existing)
3. Application Factory
3.1 Main Application
# src/backend/api/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .config import settings
from .middleware import setup_middleware
from .routes import setup_routes
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Starts the database pool, the Redis pool and Celery before the
    application begins serving, and releases the database and Redis pools
    when it stops.
    """
    # Startup
    await startup_db_pool()
    await startup_redis_pool()
    await startup_celery()
    try:
        yield
    finally:
        # Shutdown runs even when an error or cancellation is thrown into
        # the lifespan context, so the pools are always released.
        await shutdown_db_pool()
        await shutdown_redis_pool()
        # NOTE(review): startup_celery() has no shutdown counterpart here —
        # confirm whether one is needed.
def create_app() -> FastAPI:
    """Build the FastAPI application with middleware and routes attached."""
    application = FastAPI(
        lifespan=lifespan,
        title="CODITECT Document Management API",
        description="Enterprise document management with semantic search",
        version="1.0.0",
        # Documentation endpoints
        openapi_url="/openapi.json",
        docs_url="/docs",
        redoc_url="/redoc",
    )

    # Middleware is installed before the routers are registered.
    setup_middleware(application)
    setup_routes(application)
    return application


# Module-level instance for ASGI servers to import.
app = create_app()
3.2 Configuration
# src/backend/api/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class APISettings(BaseSettings):
    """API configuration."""

    # Values are read from environment variables prefixed with API_ and
    # from a local .env file.
    model_config = SettingsConfigDict(
        env_prefix="API_",
        env_file=".env",
    )

    # --- Server ---
    host: str = "0.0.0.0"
    port: int = 8000
    workers: int = 4
    debug: bool = False

    # --- CORS ---
    cors_origins: list[str] = ["http://localhost:3000"]
    cors_allow_credentials: bool = True

    # --- Rate limiting ---
    rate_limit_requests: int = 100
    rate_limit_window_seconds: int = 60

    # --- Authentication ---
    # No default: must be supplied by the environment.
    jwt_secret_key: str
    # NOTE(review): RS256 is an asymmetric algorithm, so verification needs a
    # public key rather than a shared secret — confirm what jwt_secret_key
    # is expected to hold.
    jwt_algorithm: str = "RS256"
    jwt_expire_minutes: int = 30

    # --- Database ---
    # No default: must be supplied by the environment.
    database_url: str
    database_pool_size: int = 20
    database_pool_max_overflow: int = 10

    # --- Redis ---
    redis_url: str = "redis://localhost:6379"

    # --- External APIs ---
    openai_api_key: str = ""
    voyage_api_key: str = ""


# Instantiated at import time; a missing required field raises here.
settings = APISettings()
4. Routing Architecture
4.1 Route Registration
# src/backend/api/routes/__init__.py
from fastapi import FastAPI
from .auth import router as auth_router
from .documents import router as documents_router
from .search import router as search_router
from .chunks import router as chunks_router
from .analytics import router as analytics_router
from .health import router as health_router
def setup_routes(app: FastAPI) -> None:
    """Attach every API router to *app* under its URL prefix and tag."""
    # (router, URL prefix, OpenAPI tag), in registration order.
    # NOTE(review): the project structure lists routes/admin.py, but no
    # admin router is imported or registered here — confirm intent.
    registrations = (
        (health_router, "/health", "Health"),
        (auth_router, "/api/v1/auth", "Authentication"),
        (documents_router, "/api/v1/documents", "Documents"),
        (search_router, "/api/v1/search", "Search"),
        (chunks_router, "/api/v1/chunks", "Chunks"),
        (analytics_router, "/api/v1/analytics", "Analytics"),
    )
    for router, prefix, tag in registrations:
        app.include_router(router, prefix=prefix, tags=[tag])
4.2 API Endpoints
4.2.1 Authentication
| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/auth/login | Authenticate user |
| POST | /api/v1/auth/logout | Invalidate token |
| POST | /api/v1/auth/refresh | Refresh JWT token |
| GET | /api/v1/auth/me | Get current user |
4.2.2 Documents
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/documents | List documents |
| POST | /api/v1/documents | Upload document |
| GET | /api/v1/documents/{id} | Get document |
| PUT | /api/v1/documents/{id} | Update document |
| DELETE | /api/v1/documents/{id} | Delete document |
| GET | /api/v1/documents/{id}/chunks | Get document chunks |
| POST | /api/v1/documents/{id}/reprocess | Reprocess document |
4.2.3 Search
| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/search | Semantic search |
| POST | /api/v1/search/hybrid | Hybrid search |
| POST | /api/v1/search/graphrag | GraphRAG search |
| GET | /api/v1/search/suggest | Query suggestions |
4.2.4 Analytics
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/analytics/documents | Document metrics |
| GET | /api/v1/analytics/search | Search metrics |
| GET | /api/v1/analytics/usage | Usage metrics |
| GET | /api/v1/analytics/dashboard | Dashboard data |
5. Request/Response Schemas
5.1 Base Schemas
# src/backend/schemas/base.py
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class BaseSchema(BaseModel):
    """Base schema with common configuration."""

    # from_attributes: allow building the schema from attribute-bearing objects.
    # populate_by_name: accept field names as well as aliases on input.
    model_config = ConfigDict(
        populate_by_name=True,
        from_attributes=True,
    )
class TimestampMixin(BaseModel):
    """Mixin for timestamp fields."""

    # Both timestamps are required.
    created_at: datetime
    updated_at: datetime
class PaginatedResponse(BaseModel):
    """Paginated response wrapper."""

    # NOTE(review): items is an untyped list, so element types are neither
    # validated nor documented in the generated schema.
    items: list
    # Pagination fields, all required.
    total: int
    page: int
    page_size: int
    total_pages: int
class ErrorResponse(BaseModel):
    """Standard error response."""

    # Required fields.
    error: str
    message: str
    # Optional fields, default to None.
    details: dict | None = None
    request_id: str | None = None
5.2 Document Schemas
# src/backend/schemas/documents.py
from datetime import datetime
from enum import Enum
from uuid import UUID
from pydantic import BaseModel, Field
from .base import BaseSchema, TimestampMixin
class DocumentStatus(str, Enum):
    """Document status values; members are also plain strings."""

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
class DocumentType(str, Enum):
    """Document type values; members are also plain strings."""

    REFERENCE = "reference"
    GUIDE = "guide"
    ADR = "adr"
    WORKFLOW = "workflow"
    UNKNOWN = "unknown"
class DocumentCreate(BaseModel):
    """Document upload request."""

    # Required; between 1 and 255 characters.
    filename: str = Field(..., min_length=1, max_length=255)
    # Defaults to "text/markdown" when the client omits it.
    content_type: str = Field(default="text/markdown")
    # default_factory gives every instance its own empty dict.
    metadata: dict = Field(default_factory=dict)
class DocumentResponse(BaseSchema, TimestampMixin):
    """Document response."""

    # File attributes
    id: UUID
    filename: str
    filepath: str
    mime_type: str
    file_size: int

    # Classification
    status: DocumentStatus
    document_type: DocumentType

    # Nullable but still required: no default is declared.
    title: str | None
    summary: str | None

    chunk_count: int = 0
class DocumentListResponse(BaseModel):
    """Paginated document list."""

    # NOTE(review): repeats the PaginatedResponse fields from base.py but
    # without total_pages — confirm whether the two should be unified.
    items: list[DocumentResponse]
    total: int
    page: int
    page_size: int
5.3 Search Schemas
# src/backend/schemas/search.py
from uuid import UUID
from pydantic import BaseModel, Field
from .base import BaseSchema
class SearchRequest(BaseModel):
    """Search request."""

    # Required; between 1 and 10000 characters.
    query: str = Field(..., min_length=1, max_length=10000)
    # Constrained to 1..100, default 10.
    top_k: int = Field(default=10, ge=1, le=100)
    # Constrained to 0.0..1.0, default 0.0.
    min_score: float = Field(default=0.0, ge=0.0, le=1.0)

    # Boolean options
    include_content: bool = True
    include_metadata: bool = True
    expand_context: bool = False

    # NOTE(review): typed as free-form strings although a DocumentType enum
    # exists in schemas/documents.py — confirm whether it should be reused.
    document_types: list[str] | None = None
class SearchResult(BaseSchema):
    """Single search result."""

    # Identifiers
    chunk_id: UUID
    doc_id: UUID

    score: float

    # Nullable but still required: no defaults are declared.
    content: str | None
    section_title: str | None
    document_title: str | None
    highlight: str | None
class SearchResponse(BaseModel):
    """Search response."""

    query: str
    results: list[SearchResult]
    total_results: int
    search_time_ms: int
    # NOTE(review): free-form string; the allowed values are not defined
    # in this schema.
    mode: str
6. Dependency Injection
6.1 Core Dependencies
# src/backend/api/dependencies.py
from typing import Annotated, AsyncGenerator
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from ..services.embedding_service import EmbeddingService
from ..services.search_service import VectorSearchService
from ..services.redis_service import RedisService
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
# Database session
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for the duration of one request.

    The session comes from ``async_session`` and is closed by its own
    context manager on exit, so no extra ``close()`` call is needed.  If the
    consumer raises, the pending transaction is rolled back explicitly
    before the error propagates.
    """
    async with async_session() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
# Redis client
async def get_redis() -> AsyncGenerator[RedisService, None]:
    """Yield a connected RedisService and close it afterwards."""
    # NOTE(review): this opens a new RedisService per use, while the
    # lifespan handler starts a Redis pool and the rate limiter reads
    # app.state.redis — confirm whether these should share one client.
    client = RedisService()
    try:
        await client.connect()
        yield client
    finally:
        # Runs on every exit path, including a failed connect().
        await client.close()
# Current user
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    """Get the authenticated user from a JWT bearer token.

    Raises:
        HTTPException: 401 when the token cannot be decoded, when its
            ``sub`` claim is missing or is not a valid UUID string, or when
            no user exists for that id.
    """
    invalid_token = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication token",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(
            token,
            settings.jwt_secret_key,
            algorithms=[settings.jwt_algorithm],
        )
    except JWTError:
        raise invalid_token from None

    # A missing or non-string "sub" would make UUID() raise TypeError or
    # AttributeError (not ValueError), so reject it before parsing.
    subject = payload.get("sub")
    if not isinstance(subject, str):
        raise invalid_token
    try:
        user_id = UUID(subject)
    except ValueError:
        raise invalid_token from None

    user = await user_service.get_by_id(db, user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
# Current tenant
async def get_current_tenant(
    user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> Tenant:
    """Resolve the tenant for the authenticated user, or respond with 403."""
    tenant = await tenant_service.get_by_user(db, user.id)
    if tenant:
        return tenant

    # The lookup returned nothing usable for this user.
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="No tenant access",
    )
# Services
async def get_embedding_service() -> EmbeddingService:
    """Create an EmbeddingService using the default embedding model."""
    service = EmbeddingService(default_model="text-embedding-3-small")
    return service
async def get_search_service(
    embedding_service: Annotated[EmbeddingService, Depends(get_embedding_service)],
    db: Annotated[AsyncSession, Depends(get_db)],
    redis: Annotated[RedisService, Depends(get_redis)],
) -> VectorSearchService:
    """Assemble a VectorSearchService from the injected collaborators."""
    # NOTE(review): an AsyncSession is passed as ``db_pool`` — confirm that
    # VectorSearchService expects a session rather than a connection pool.
    service = VectorSearchService(
        embedding_service=embedding_service,
        db_pool=db,
        redis_cache=redis,
    )
    return service
# Type aliases for cleaner signatures
# Each alias pairs a type with the dependency that provides it, so a route
# can declare e.g. ``db: DBSession`` instead of repeating Annotated[...].
DBSession = Annotated[AsyncSession, Depends(get_db)]
CurrentUser = Annotated[User, Depends(get_current_user)]
CurrentTenant = Annotated[Tenant, Depends(get_current_tenant)]
SearchService = Annotated[VectorSearchService, Depends(get_search_service)]
7. Middleware Architecture
7.1 Middleware Stack
# src/backend/api/middleware/__init__.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from .logging import RequestLoggingMiddleware
from .metrics import PrometheusMiddleware
from .rate_limit import RateLimitMiddleware
from .tenant import TenantContextMiddleware
from .tracing import TracingMiddleware
from ..config import settings
def setup_middleware(app: FastAPI) -> None:
    """Configure the middleware stack.

    Starlette wraps the application once per ``add_middleware`` call, so the
    middleware registered LAST becomes the outermost layer and sees each
    request first.  CORS is therefore registered last: it must wrap every
    other layer so that responses produced by them (for example the rate
    limiter's 429) still carry CORS headers.  The relative order of the
    other middleware is unchanged.
    """
    # Compression
    app.add_middleware(GZipMiddleware, minimum_size=1000)

    # Request logging
    app.add_middleware(RequestLoggingMiddleware)

    # Prometheus metrics
    app.add_middleware(PrometheusMiddleware)

    # Distributed tracing
    app.add_middleware(TracingMiddleware)

    # Rate limiting
    app.add_middleware(
        RateLimitMiddleware,
        requests_per_window=settings.rate_limit_requests,
        window_seconds=settings.rate_limit_window_seconds,
    )

    # Tenant context
    app.add_middleware(TenantContextMiddleware)

    # CORS: registered last so that it is the outermost layer.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=settings.cors_allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
    )
7.2 Rate Limiting Middleware
# src/backend/api/middleware/rate_limit.py
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Rate limiting middleware backed by Redis.

    The limit check is delegated to ``request.app.state.redis``
    (``check_rate_limit``); the windowing algorithm lives there and is not
    shown in this module.

    Args:
        app: The ASGI application being wrapped.
        requests_per_window: Maximum requests allowed per client per window.
        window_seconds: Length of the window in seconds.
    """

    def __init__(
        self,
        app,
        requests_per_window: int = 100,
        window_seconds: int = 60,
    ):
        super().__init__(app)
        self.requests = requests_per_window
        self.window = window_seconds

    async def dispatch(self, request: Request, call_next) -> Response:
        """Check the client's limit, then forward the request or reply 429."""
        # Skip rate limiting for health checks
        if request.url.path.startswith("/health"):
            return await call_next(request)

        client_id = self._get_client_id(request)

        redis = request.app.state.redis
        is_allowed, remaining = await redis.check_rate_limit(
            key=f"ratelimit:{client_id}",
            limit=self.requests,
            window=self.window,
        )

        if is_allowed:
            response = await call_next(request)
        else:
            response = JSONResponse(
                status_code=429,
                content={"error": "rate_limit_exceeded", "message": "Too many requests"},
            )
            # Tell well-behaved clients how long to back off.
            response.headers["Retry-After"] = str(self.window)

        # Rate limit headers are attached to allowed and rejected responses.
        response.headers["X-RateLimit-Limit"] = str(self.requests)
        response.headers["X-RateLimit-Remaining"] = str(remaining)
        # NOTE(review): this is the window length, not the time remaining
        # until the window resets — confirm what clients expect here.
        response.headers["X-RateLimit-Reset"] = str(self.window)
        return response

    def _get_client_id(self, request: Request) -> str:
        """Return the rate-limit key suffix for the caller.

        Prefers the authenticated user id, then the first
        ``X-Forwarded-For`` entry, then the socket peer address.
        """
        # request.state.user may be absent or explicitly None.
        user = getattr(request.state, "user", None)
        if user is not None:
            return f"user:{user.id}"

        # SECURITY: X-Forwarded-For is client-controlled unless a trusted
        # proxy overwrites it; a caller can rotate this value to evade the
        # limit.  Only rely on it behind such a proxy.
        forwarded = request.headers.get("X-Forwarded-For")
        if forwarded:
            return f"ip:{forwarded.split(',')[0].strip()}"

        # request.client is None when the server supplies no peer address.
        if request.client is not None:
            return f"ip:{request.client.host}"
        return "ip:unknown"
8. Error Handling
8.1 Exception Classes
# src/backend/api/exceptions.py
from fastapi import HTTPException, status
class APIException(HTTPException):
    """Base API exception.

    Subclasses override ``status_code``, ``error`` and ``message``; the
    constructor packs them into a structured ``detail`` dictionary.
    """

    status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
    error = "internal_error"
    message = "An unexpected error occurred"

    def __init__(self, message: str | None = None, details: dict | None = None):
        # A falsy message falls back to the class-level default.
        payload = {
            "error": self.error,
            "message": message or self.message,
            "details": details,
        }
        super().__init__(status_code=self.status_code, detail=payload)
class NotFoundError(APIException):
    """404 error: the requested resource was not found."""

    status_code = status.HTTP_404_NOT_FOUND
    error = "not_found"
    message = "Resource not found"


class ValidationError(APIException):
    """422 error: request validation failed."""

    status_code = status.HTTP_422_UNPROCESSABLE_ENTITY
    error = "validation_error"
    message = "Request validation failed"


class AuthenticationError(APIException):
    """401 error: authentication is required."""

    status_code = status.HTTP_401_UNAUTHORIZED
    error = "authentication_error"
    message = "Authentication required"


class AuthorizationError(APIException):
    """403 error: access is denied."""

    status_code = status.HTTP_403_FORBIDDEN
    error = "authorization_error"
    message = "Access denied"


class RateLimitError(APIException):
    """429 error: too many requests."""

    status_code = status.HTTP_429_TOO_MANY_REQUESTS
    error = "rate_limit_exceeded"
    message = "Too many requests"


class ServiceUnavailableError(APIException):
    """503 error: the service is temporarily unavailable."""

    status_code = status.HTTP_503_SERVICE_UNAVAILABLE
    error = "service_unavailable"
    message = "Service temporarily unavailable"
8.2 Exception Handlers
# In main.py
from fastapi import Request
from fastapi.responses import JSONResponse
from pydantic import ValidationError as PydanticValidationError
@app.exception_handler(APIException)
async def api_exception_handler(request: Request, exc: APIException):
    """Render an APIException as JSON using its structured detail.

    Any headers carried by the exception (for example ``WWW-Authenticate``)
    are forwarded rather than dropped.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content=exc.detail,
        headers=getattr(exc, "headers", None),
    )
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError


# FastAPI raises RequestValidationError for invalid request data; pydantic's
# own ValidationError only surfaces when application code validates a model
# itself.  Registering both makes every validation failure use this format.
@app.exception_handler(RequestValidationError)
@app.exception_handler(PydanticValidationError)
async def validation_exception_handler(
    request: Request,
    exc: RequestValidationError | PydanticValidationError,
):
    """Handle request and model validation errors with a 422 response."""
    return JSONResponse(
        status_code=422,
        content={
            "error": "validation_error",
            "message": "Request validation failed",
            # errors() may hold values that are not JSON-serialisable.
            "details": jsonable_encoder(exc.errors()),
        },
    )
@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
    """Log any otherwise-unhandled exception and return a generic 500."""
    logger.exception("Unhandled exception", exc_info=exc)

    # The body contains no exception details.
    body = {
        "error": "internal_error",
        "message": "An unexpected error occurred",
    }
    return JSONResponse(status_code=500, content=body)
9. Health Checks
# src/backend/api/routes/health.py
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from ..dependencies import get_db, get_redis
router = APIRouter()
class HealthStatus(BaseModel):
    # Response body of the readiness endpoint.
    # Overall result: "healthy" or "degraded".
    status: str
    version: str
    # Per-dependency results: "healthy" or "unhealthy".
    database: str
    redis: str
    celery: str
@router.get("")
async def health_check() -> dict:
    """Report a static healthy status without checking any dependency."""
    payload = {"status": "healthy"}
    return payload
@router.get("/ready")
async def readiness_check(
    db=Depends(get_db),
    redis=Depends(get_redis),
) -> HealthStatus:
    """Detailed readiness check.

    Probes the database, Redis and Celery independently.  A failing probe
    marks that dependency "unhealthy" and the overall status "degraded";
    the endpoint itself never raises because of a failed probe.
    """
    import asyncio

    from sqlalchemy import text

    # Check database.  SQLAlchemy 2.0 rejects raw SQL strings, so the
    # statement must be wrapped in text().
    try:
        await db.execute(text("SELECT 1"))
        db_status = "healthy"
    except Exception:
        db_status = "unhealthy"

    # Check Redis
    try:
        await redis.ping()
        redis_status = "healthy"
    except Exception:
        redis_status = "unhealthy"

    # Check Celery.  control.ping() is a blocking call, so it runs in a
    # worker thread to keep the event loop free.  It returns an empty list
    # instead of raising when no worker replies, so the reply is inspected.
    try:
        replies = await asyncio.to_thread(celery_app.control.ping, timeout=1.0)
        celery_status = "healthy" if replies else "unhealthy"
    except Exception:
        celery_status = "unhealthy"

    statuses = (db_status, redis_status, celery_status)
    overall = "healthy" if all(s == "healthy" for s in statuses) else "degraded"

    # NOTE(review): a degraded result is still sent with HTTP 200; confirm
    # whether readiness probes should receive 503 in that case.
    return HealthStatus(
        status=overall,
        version="1.0.0",
        database=db_status,
        redis=redis_status,
        celery=celery_status,
    )
@router.get("/live")
async def liveness_check() -> dict:
    """Kubernetes liveness probe; always reports the process as alive."""
    payload = {"status": "alive"}
    return payload
10. OpenAPI Documentation
10.1 Custom OpenAPI Schema
# In main.py
def custom_openapi():
    """Build the OpenAPI schema once and cache it on the application.

    Adds a ``BearerAuth`` security scheme and applies it globally.  Security
    schemes already present in the generated schema are kept, because
    operations may reference them.
    """
    from fastapi.openapi.utils import get_openapi

    if app.openapi_schema:
        return app.openapi_schema

    openapi_schema = get_openapi(
        title="CODITECT Document Management API",
        version="1.0.0",
        description="""
## Overview
Enterprise document management with semantic search capabilities.
## Authentication
All endpoints (except health checks) require JWT authentication.
Include the token in the `Authorization` header:
Authorization: Bearer <your_token>
## Rate Limiting
API requests are rate limited per user:
- Default: 100 requests per minute
- Headers: `X-RateLimit-Limit`, `X-RateLimit-Remaining`, `X-RateLimit-Reset`
""",
        routes=app.routes,
    )

    # Security scheme: merged into the existing mapping.  "components" may
    # be absent, and replacing "securitySchemes" wholesale would drop
    # schemes that were generated from the routes.
    components = openapi_schema.setdefault("components", {})
    components.setdefault("securitySchemes", {})["BearerAuth"] = {
        "type": "http",
        "scheme": "bearer",
        "bearerFormat": "JWT",
    }

    # Apply security globally
    # NOTE(review): this also marks the health endpoints as secured, which
    # contradicts the description above — confirm intent.
    openapi_schema["security"] = [{"BearerAuth": []}]

    app.openapi_schema = openapi_schema
    return app.openapi_schema


app.openapi = custom_openapi
11. Testing Strategy
11.1 Test Structure
tests/backend/api/
├── conftest.py # Fixtures
├── test_auth.py # Auth endpoint tests
├── test_documents.py # Document endpoint tests
├── test_search.py # Search endpoint tests
├── test_middleware.py # Middleware tests
└── integration/
└── test_full_flow.py # Integration tests
11.2 Test Fixtures
# tests/backend/api/conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from src.backend.api.main import create_app
from src.backend.models.base import Base
@pytest.fixture
async def db_session():
    """Create a test database session on a freshly created schema.

    All tables are created before the test and dropped afterwards; the
    engine is disposed at the end so its connections are not leaked.
    """
    # NOTE(review): an async generator under plain @pytest.fixture needs an
    # async pytest plugin configured to handle it — confirm the test setup.
    engine = create_async_engine("postgresql+asyncpg://test:test@localhost/test")
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        try:
            async with AsyncSession(engine) as session:
                yield session
        finally:
            # Drop the schema even if the session block exits with an error.
            async with engine.begin() as conn:
                await conn.run_sync(Base.metadata.drop_all)
    finally:
        await engine.dispose()
@pytest.fixture
def client(db_session):
    """Create a test client whose database dependency uses ``db_session``.

    TestClient is used as a context manager so that the application's
    lifespan (startup and shutdown) runs around the test; without it the
    lifespan handler is never executed.
    """
    app = create_app()
    app.dependency_overrides[get_db] = lambda: db_session
    try:
        with TestClient(app) as test_client:
            yield test_client
    finally:
        app.dependency_overrides.clear()
@pytest.fixture
def auth_headers(client):
    """Log in as the test user and return an Authorization header dict."""
    credentials = {
        "username": "test@example.com",
        "password": "testpassword",
    }
    login_response = client.post("/api/v1/auth/login", json=credentials)
    access_token = login_response.json()["access_token"]
    return {"Authorization": f"Bearer {access_token}"}
12. Related Documents
13. Revision History
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0.0 | 2025-12-28 | Hal Casteel | Initial TDD creation |