Architecture Decision Records
ADR Index
| ADR | Title | Status | Date |
|---|---|---|---|
| ADR-001 | Use FastAPI for Backend Framework | Accepted | 2025-10-31 |
| ADR-002 | WebSocket for Real-Time Communication | Accepted | 2025-10-31 |
| ADR-003 | Deploy on Google Kubernetes Engine | Accepted | 2025-10-31 |
| ADR-004 | Use Anthropic Claude for AI Analysis | Accepted | 2025-10-31 |
| ADR-005 | Event-Driven Architecture with Pub/Sub | Accepted | 2025-10-31 |
| ADR-006 | React with TypeScript for Frontend | Accepted | 2025-10-31 |
| ADR-007 | Zustand for State Management | Accepted | 2025-10-31 |
| ADR-008 | PostgreSQL for Primary Database | Accepted | 2025-10-31 |
| ADR-009 | Document Versioning Strategy | Accepted | 2025-11-01 |
| ADR-010 | Multi-Tenancy Approach | Accepted | 2025-11-01 |
| ADR-011 | Backup and Disaster Recovery | Accepted | 2025-11-01 |
| ADR-012 | API Rate Limiting Implementation | Accepted | 2025-11-01 |
| ADR-013 | Logging and Audit Trail Format | Accepted | 2025-11-01 |
| ADR-014 | CI/CD Pipeline Architecture | Accepted | 2025-11-01 |
| ADR-015 | Secret Management Approach | Accepted | 2025-11-01 |
ADR-001: Use FastAPI for Backend Framework
Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team
Context
Need to select a Python web framework for building REST API and WebSocket endpoints with high performance and modern async support.
Decision
Use FastAPI as the primary backend framework.
Rationale
Pros:
- Native async/await support for high concurrency
- Automatic OpenAPI documentation generation
- Built-in request validation with Pydantic
- WebSocket support out of the box
- Excellent performance (comparable to Node.js/Go)
- Type hints improve code quality and IDE support
- Large ecosystem and active community
Cons:
- Newer than Django and Flask
- Less mature ecosystem for some use cases
- Smaller pool of developers familiar with it
Alternatives Considered
- Django + Django Channels
  - Pros: Mature, full-featured, ORM
  - Cons: Heavier, synchronous by default, more complex setup
- Flask + Flask-SocketIO
  - Pros: Lightweight, flexible
  - Cons: Manual setup for async, no auto-documentation
- Sanic
  - Pros: Very fast, async-first
  - Cons: Smaller community, less tooling
Consequences
Positive:
- Fast development with automatic API docs
- High performance for concurrent operations
- Type safety reduces bugs
- Easy testing with async support
Negative:
- Team needs to learn FastAPI patterns
- Some third-party integrations may be immature
Implementation Notes
```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(
    title="PDF Analysis API",
    version="1.0.0",
    docs_url="/api/docs"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # restrict to known origins in production
    allow_methods=["*"],
    allow_headers=["*"]
)
```
ADR-002: WebSocket for Real-Time Communication
Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team
Context
Need real-time updates for PDF processing progress and analysis results without client polling.
Decision
Use WebSocket protocol for bidirectional real-time communication between frontend and backend.
Rationale
Pros:
- True real-time bidirectional communication
- Lower latency than polling
- Reduced server load compared to HTTP polling
- Native browser support
- Works well with FastAPI
Cons:
- More complex than REST
- Requires connection management
- Scaling requires sticky sessions or message broker
- Debugging more difficult than HTTP
Alternatives Considered
- HTTP Polling
  - Pros: Simple, stateless
  - Cons: High latency, inefficient, server load
- Server-Sent Events (SSE)
  - Pros: Simple, HTTP-based
  - Cons: One-way only, no binary support
- gRPC Streaming
  - Pros: Efficient, bidirectional
  - Cons: Complex setup, limited browser support
Consequences
Positive:
- Instant progress updates for users
- Better user experience with real-time feedback
- Reduced API calls and server load
Negative:
- Need connection recovery logic
- Requires Redis pub/sub for multi-instance scaling
- More complex error handling
Implementation Strategy
```python
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Set
import json

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        if user_id not in self.active_connections:
            self.active_connections[user_id] = set()
        self.active_connections[user_id].add(websocket)

    def disconnect(self, websocket: WebSocket, user_id: str):
        if user_id in self.active_connections:
            self.active_connections[user_id].discard(websocket)

    async def send_personal_message(self, message: dict, user_id: str):
        if user_id in self.active_connections:
            for connection in self.active_connections[user_id]:
                await connection.send_text(json.dumps(message))
```
ADR-003: Deploy on Google Kubernetes Engine
Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team, DevOps Team
Context
Need scalable, managed container orchestration platform for production deployment.
Decision
Deploy application on Google Kubernetes Engine (GKE) with Autopilot mode.
Rationale
Pros:
- Managed Kubernetes reduces operational overhead
- Autopilot mode eliminates node management
- Native integration with GCP services (Cloud SQL, GCS, etc.)
- Built-in monitoring and logging
- Auto-scaling capabilities
- High availability across zones
- Cost-effective with commitment discounts
Cons:
- Vendor lock-in to GCP
- Learning curve for Kubernetes
- More complex than simpler PaaS options
Alternatives Considered
- AWS EKS
  - Pros: AWS ecosystem, mature
  - Cons: More expensive, manual node management
- Cloud Run
  - Pros: Simpler, serverless
  - Cons: Limited for WebSocket, less control
- Google Compute Engine VMs
  - Pros: Full control
  - Cons: Manual scaling, no container orchestration
- Azure AKS
  - Pros: Good pricing, Azure integration
  - Cons: Less mature than GKE
Consequences
Positive:
- Easy horizontal scaling
- High availability and fault tolerance
- Integrated monitoring and logging
- GitOps-friendly deployment
- Resource efficiency with Autopilot
Negative:
- Kubernetes complexity for team
- Need to manage YAML configurations
- Debugging can be challenging
Implementation Approach
```yaml
# GKE Autopilot cluster (Config Connector resource)
apiVersion: container.cnrm.cloud.google.com/v1beta1
kind: ContainerCluster
metadata:
  name: pdf-analysis-cluster
spec:
  location: us-central1
  enableAutopilot: true
  releaseChannel:
    channel: REGULAR
  networkingMode: VPC_NATIVE
  workloadIdentityConfig:
    workloadPool: project-id.svc.id.goog
```
ADR-004: Use Anthropic Claude for AI Analysis
Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team, AI Team
Context
Need powerful LLM for intelligent PDF content analysis, extraction, and cross-validation.
Decision
Use Anthropic Claude (Sonnet 4) as the primary AI model for document analysis.
Rationale
Pros:
- Excellent at structured data extraction
- Strong reasoning capabilities for validation
- Large context window (200K tokens)
- JSON mode for structured outputs
- Reliable API with good uptime
- Strong safety and alignment
- Competitive pricing
Cons:
- API dependency and cost
- Rate limits for high volume
- No self-hosted option
Alternatives Considered
- OpenAI GPT-4
  - Pros: Well-known, good performance
  - Cons: Shorter context, more expensive
- Google Vertex AI (PaLM)
  - Pros: GCP integration, lower cost
  - Cons: Less capable for extraction tasks
- Open-Source Models (Llama, Mistral)
  - Pros: Self-hosted, no API costs
  - Cons: Require GPU infrastructure, lower quality
Consequences
Positive:
- High-quality extraction and analysis
- Large context handles full documents
- Structured output reduces parsing errors
- Reliable service with good support
Negative:
- Ongoing API costs scale with usage
- Latency depends on external service
- Need fallback for API outages
Cost Analysis
```python
# Estimated costs per document
COST_PER_1M_INPUT_TOKENS = 3.00    # USD
COST_PER_1M_OUTPUT_TOKENS = 15.00  # USD

# Average 10-page PDF
avg_input_tokens = 8_000   # ~800 tokens/page
avg_output_tokens = 2_000  # structured extraction

cost_per_document = (
    (avg_input_tokens / 1_000_000 * COST_PER_1M_INPUT_TOKENS) +
    (avg_output_tokens / 1_000_000 * COST_PER_1M_OUTPUT_TOKENS)
)
# ~$0.05 per document
```
Token Budget Strategy
- Structure analysis: 2,000 tokens
- Component extraction: 4,000 tokens/page
- Cross-validation: 3,000 tokens
- Total budget: 100,000 tokens/document max
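As a sketch, the budget rules above can be encoded and capped per document. Names here are illustrative, not taken from the codebase:

```python
# Token budget per analysis stage (from the strategy above); names are illustrative.
TOKEN_BUDGETS = {
    "structure_analysis": 2_000,
    "component_extraction_per_page": 4_000,
    "cross_validation": 3_000,
}
MAX_TOKENS_PER_DOCUMENT = 100_000  # hard cap per document

def estimate_budget(page_count: int) -> int:
    """Estimate the token budget for a document, capped at the per-document max."""
    total = (
        TOKEN_BUDGETS["structure_analysis"]
        + page_count * TOKEN_BUDGETS["component_extraction_per_page"]
        + TOKEN_BUDGETS["cross_validation"]
    )
    return min(total, MAX_TOKENS_PER_DOCUMENT)
```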
ADR-005: Event-Driven Architecture with Pub/Sub
Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team
Context
Need decoupled architecture for processing pipeline with fault tolerance and scalability.
Decision
Implement event-driven architecture using Redis Pub/Sub for inter-service communication.
Rationale
Pros:
- Loose coupling between services
- Easy to add new consumers
- Fault tolerance through retry mechanisms
- Scalable processing pipeline
- Asynchronous processing
- Event sourcing enables audit trails
Cons:
- More complex than direct calls
- Debugging distributed events harder
- Need event schema management
- Eventual consistency
Alternatives Considered
- Direct Service Calls (gRPC)
  - Pros: Simple, synchronous
  - Cons: Tight coupling, cascading failures
- Cloud Pub/Sub
  - Pros: Managed, scalable
  - Cons: Higher cost, external dependency
- RabbitMQ
  - Pros: Feature-rich, mature
  - Cons: More infrastructure, heavier
Event Schema
```python
from enum import Enum
from datetime import datetime
from pydantic import BaseModel

class EventType(str, Enum):
    DOCUMENT_UPLOADED = "document.uploaded"
    PROCESSING_STARTED = "processing.started"
    PROCESSING_COMPLETED = "processing.completed"
    PROCESSING_FAILED = "processing.failed"
    ANALYSIS_REQUESTED = "analysis.requested"
    ANALYSIS_COMPLETED = "analysis.completed"

class Event(BaseModel):
    event_id: str
    event_type: EventType
    timestamp: datetime
    user_id: str
    document_id: str
    data: dict
    metadata: dict = {}
```
Implementation
```python
import redis.asyncio as redis

class EventBus:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def publish(self, event: Event):
        """Publish event to its type-specific channel."""
        # Use .value: formatting of str-based Enum members varies across
        # Python versions, while .value is always the plain string.
        channel = f"events.{event.event_type.value}"
        await self.redis.publish(channel, event.model_dump_json())

    async def subscribe(self, event_type: EventType, handler):
        """Subscribe to an event channel and dispatch messages to handler."""
        channel = f"events.{event_type.value}"
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)
        async for message in pubsub.listen():
            if message["type"] == "message":
                event = Event.model_validate_json(message["data"])
                await handler(event)
```
ADR-006: React with TypeScript for Frontend
Status: Accepted
Date: 2025-10-31
Deciders: Frontend Team
Context
Need modern, performant frontend framework for building interactive file management UI.
Decision
Use React 18 with TypeScript for frontend development.
Rationale
Pros:
- Component-based architecture
- Large ecosystem and community
- Excellent TypeScript support
- Virtual DOM for performance
- Concurrent rendering features
- Strong tooling (ESLint, Prettier, etc.)
- Server components (future)
Cons:
- Boilerplate compared to Vue
- Need additional libraries for routing, state
- Learning curve for new developers
Alternatives Considered
- Vue 3
  - Pros: Simpler, less boilerplate
  - Cons: Smaller ecosystem, less TypeScript maturity
- Svelte
  - Pros: No virtual DOM, smaller bundles
  - Cons: Smaller community, fewer libraries
- Angular
  - Pros: Full-featured, TypeScript-first
  - Cons: Heavy, steep learning curve
Technology Stack
```json
{
  "dependencies": {
    "react": "^18.2.0",
    "react-dom": "^18.2.0",
    "react-router-dom": "^6.18.0",
    "zustand": "^4.4.0",
    "@mui/material": "^5.14.0",
    "axios": "^1.5.0"
  },
  "devDependencies": {
    "@types/react": "^18.2.0",
    "@types/react-dom": "^18.2.0",
    "typescript": "^5.2.0",
    "vite": "^5.0.0",
    "vitest": "^1.0.0"
  }
}
```
ADR-007: Zustand for State Management
Status: Accepted
Date: 2025-10-31
Deciders: Frontend Team
Context
Need lightweight state management solution for React application.
Decision
Use Zustand for global state management.
Rationale
Pros:
- Minimal boilerplate
- No Provider wrapper needed
- TypeScript support
- Small bundle size (1KB)
- Hook-based API
- Middleware support
- Easy to test
Cons:
- Less features than Redux
- Smaller community
- Fewer DevTools integrations
Alternatives Considered
- Redux Toolkit
  - Pros: Industry standard, DevTools
  - Cons: More boilerplate, larger bundle
- Recoil
  - Pros: Atom-based, fine-grained updates
  - Cons: More complex, Facebook dependency
- Context API + useReducer
  - Pros: Built-in, no dependencies
  - Cons: Performance issues, verbose
Example Store
```typescript
import { create } from 'zustand';
import { persist } from 'zustand/middleware';

// User and Document are application-defined types
interface AppState {
  user: User | null;
  documents: Document[];
  setUser: (user: User | null) => void;
  addDocument: (doc: Document) => void;
}

export const useAppStore = create<AppState>()(
  persist(
    (set) => ({
      user: null,
      documents: [],
      setUser: (user) => set({ user }),
      addDocument: (doc) => set((state) => ({
        documents: [...state.documents, doc]
      })),
    }),
    {
      name: 'app-storage',
    }
  )
);
```
ADR-008: PostgreSQL for Primary Database
Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team, Database Team
Context
Need relational database for storing document metadata, user data, and analysis results.
Decision
Use PostgreSQL 15 via Cloud SQL as the primary database.
Rationale
Pros:
- ACID compliance
- JSON/JSONB support for flexible schemas
- Full-text search capabilities
- Excellent performance
- Strong consistency
- Mature ecosystem
- GCP managed service
- Point-in-time recovery
Cons:
- Vertical scaling limitations
- Sharding complexity for huge scale
- Not ideal for time-series data
Alternatives Considered
- MySQL
  - Pros: Simple, popular
  - Cons: Less feature-rich, JSON support inferior
- MongoDB
  - Pros: Flexible schema, horizontal scaling
  - Cons: No transactions (older versions), consistency issues
- Cloud Spanner
  - Pros: Global scale, strong consistency
  - Cons: Very expensive, overkill for MVP
Schema Design Principles
- Use UUIDs for primary keys
- JSONB for flexible metadata
- Proper indexing for performance
- Foreign keys for referential integrity
- Timestamps for all records
Connection Pooling
```python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker
)

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)
```
ADR-009: Document Versioning Strategy
Status: Accepted
Date: 2025-11-01
Deciders: Architecture Team
Context
Documents may be updated or reprocessed with improved AI models. Need strategy for managing document versions and analysis history.
Decision
Implement append-only versioning with immutable document records and soft deletes.
Rationale
Pros:
- Complete audit trail of changes
- Easy rollback to previous versions
- No data loss
- Supports compliance requirements
- Can compare analysis results across versions
Cons:
- Increased storage requirements
- More complex queries
- Need cleanup strategy for old versions
Alternatives Considered
- Overwrite Strategy
  - Pros: Simple, less storage
  - Cons: No history, no rollback
- Snapshot-Based Versioning
  - Pros: Full snapshots at each version
  - Cons: Very high storage cost
- Delta-Based Versioning
  - Pros: Storage efficient
  - Cons: Complex, slower retrieval
Implementation
```python
import uuid
from datetime import datetime
from sqlalchemy import Column, Integer, Boolean, DateTime, Text, ForeignKey, Index
from sqlalchemy.dialects.postgresql import UUID

class Document(Base):
    __tablename__ = "documents"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    version = Column(Integer, nullable=False, default=1)
    parent_id = Column(UUID, ForeignKey("documents.id"))

    # Soft delete
    deleted_at = Column(DateTime)
    is_current = Column(Boolean, default=True, index=True)

    # Version metadata
    version_created_at = Column(DateTime, default=datetime.utcnow)
    version_created_by = Column(UUID, ForeignKey("users.id"))
    version_notes = Column(Text)

    __table_args__ = (
        Index('idx_document_parent_version', 'parent_id', 'version'),
        Index('idx_document_current', 'parent_id', 'is_current'),
    )
```
Version Management Rules
- New upload creates version 1
- Reprocessing creates new version with same parent_id
- Only latest version has is_current=True
- Soft delete sets deleted_at, preserves record
- Hard delete after 90 days retention period
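The rules above can be sketched in plain Python. This is a hypothetical in-memory model to make the invariants concrete, not the actual ORM code:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

RETENTION_DAYS = 90  # hard-delete window after soft delete

@dataclass
class DocVersion:
    version: int
    is_current: bool = True
    deleted_at: Optional[datetime] = None

def reprocess(versions: list) -> "DocVersion":
    """Create a new version; only the newest keeps is_current=True."""
    for v in versions:
        v.is_current = False
    next_num = max((v.version for v in versions), default=0) + 1
    new = DocVersion(version=next_num)
    versions.append(new)
    return new

def due_for_hard_delete(v: DocVersion, now: datetime) -> bool:
    """Soft-deleted records are purged once the retention period has elapsed."""
    return v.deleted_at is not None and (now - v.deleted_at) > timedelta(days=RETENTION_DAYS)
```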
API Endpoints
```
GET  /api/v1/documents/{id}/versions      # List all versions
GET  /api/v1/documents/{id}/versions/{v}  # Get specific version
POST /api/v1/documents/{id}/reprocess     # Create new version
```
ADR-010: Multi-Tenancy Approach
Status: Accepted
Date: 2025-11-01
Deciders: Architecture Team
Context
Platform needs to support multiple organizations/teams with data isolation and resource quotas.
Decision
Implement shared-database, row-level multi-tenancy with organization-based partitioning.
Rationale
Pros:
- Cost-effective (shared infrastructure)
- Easy to manage and deploy
- Simple scaling
- Row-level security ensures isolation
- Good balance of isolation and efficiency
Cons:
- Less isolation than separate databases
- Potential for data leakage bugs
- Noisy neighbor issues
- Schema changes affect all tenants
Alternatives Considered
- Separate Database Per Tenant
  - Pros: Complete isolation, custom schemas
  - Cons: High cost, complex management
- Separate Schema Per Tenant
  - Pros: Better isolation, same DB
  - Cons: Migration complexity, connection pool issues
- Kubernetes Namespace Per Tenant
  - Pros: Strong isolation
  - Cons: Very expensive, complex
Implementation
```python
class Organization(Base):
    __tablename__ = "organizations"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    name = Column(String(255), nullable=False)
    slug = Column(String(100), unique=True, nullable=False, index=True)

    # Quotas
    max_users = Column(Integer, default=10)
    max_documents = Column(Integer, default=1000)
    max_storage_bytes = Column(BigInteger, default=10_737_418_240)  # 10 GB
    max_api_calls_per_month = Column(Integer, default=100_000)

    # Billing
    plan_tier = Column(String(50), default="free")
    stripe_customer_id = Column(String(255))

    # Status
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

class User(Base):
    __tablename__ = "users"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    organization_id = Column(UUID, ForeignKey("organizations.id"), nullable=False, index=True)
    # ... other fields

class Document(Base):
    __tablename__ = "documents"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    organization_id = Column(UUID, ForeignKey("organizations.id"), nullable=False, index=True)
    # ... other fields

    __table_args__ = (
        Index('idx_documents_org', 'organization_id'),
    )
```
Security Measures
- All queries automatically filter by organization_id
- Middleware extracts org from JWT token
- API endpoints validate org access
- Background jobs include org context
```python
async def get_current_organization(
    current_user: User = Depends(get_current_user)
) -> Organization:
    # db: request-scoped AsyncSession (injected elsewhere in the app)
    org = await db.get(Organization, current_user.organization_id)
    if not org or not org.is_active:
        raise HTTPException(status_code=403, detail="Organization not active")
    return org
```
ADR-011: Backup and Disaster Recovery
Status: Accepted
Date: 2025-11-01
Deciders: Architecture Team, DevOps Team
Context
Need reliable backup strategy and disaster recovery plan to prevent data loss and ensure business continuity.
Decision
Implement automated backups with point-in-time recovery and multi-region disaster recovery.
Rationale
Pros:
- Automated daily backups
- Point-in-time recovery for last 7 days
- Cross-region replication for DR
- Encryption at rest and in transit
- Compliance with data protection regulations
Cons:
- Additional storage costs
- Slightly higher latency for cross-region writes
- Complexity in failover procedures
Backup Strategy
PostgreSQL (Cloud SQL)
- Automated daily backups at 2 AM UTC
- 30-day retention period
- Point-in-time recovery enabled (7 days)
- Transaction log backups every 5 minutes
- Cross-region replica in us-east1 (async)
Redis
- RDB snapshots every 6 hours
- AOF (Append-Only File) enabled
- Snapshots stored in GCS
- 7-day retention
Google Cloud Storage (GCS)
- Versioning enabled on all buckets
- 90-day retention for deleted objects
- Cross-region bucket replication
- Object lifecycle management
Application Data
- Kubernetes manifests in Git (GitOps)
- Secrets in Google Secret Manager with versioning
- Configuration in ConfigMaps (backed up to Git)
Recovery Time Objectives (RTO/RPO)
| Component | RTO | RPO |
|---|---|---|
| Database | 1 hour | 5 minutes |
| Redis | 30 minutes | 6 hours |
| GCS Objects | Immediate | 0 (versioned) |
| Application | 30 minutes | 0 (stateless) |
Disaster Recovery Procedures
```markdown
# Disaster Recovery Runbook

## Database Failure
1. Promote read replica to primary
2. Update application connection strings
3. Verify data integrity
4. Monitor replication lag

## Complete Region Failure
1. Update DNS to point to DR region
2. Promote DR database to primary
3. Start application pods in DR region
4. Verify critical services
5. Notify stakeholders

## Data Corruption
1. Identify corruption timestamp
2. Stop application writes
3. Restore from point-in-time backup
4. Verify data integrity
5. Resume operations
```
Backup Testing
- Monthly restore drills
- Quarterly DR failover tests
- Automated backup validation
- Recovery time measurement
Implementation
```bash
#!/bin/bash
# Automated backup script
set -euo pipefail

# PostgreSQL backup
gcloud sql backups create \
  --instance=pdf-analysis-db \
  --project=$PROJECT_ID

# Export to GCS for long-term storage
gcloud sql export sql pdf-analysis-db \
  gs://$BACKUP_BUCKET/sql/$(date +%Y%m%d)/backup.sql \
  --database=pdfanalysis

# Redis snapshot (BGSAVE is asynchronous; wait for it to finish before copying)
redis-cli BGSAVE
gsutil cp /var/lib/redis/dump.rdb \
  gs://$BACKUP_BUCKET/redis/$(date +%Y%m%d)/

# Verify backups
python verify_backups.py
```
ADR-012: API Rate Limiting Implementation
Status: Accepted
Date: 2025-11-01
Deciders: Architecture Team
Context
Need to prevent API abuse, ensure fair usage, and protect infrastructure from overload.
Decision
Implement sliding window rate limiting using Redis with tiered limits based on user plan.
Rationale
Pros:
- Precise rate limiting with sliding window
- Fast (Redis-based)
- Distributed across multiple instances
- User-friendly with clear error messages
- Supports different tiers
Cons:
- Redis dependency
- Slightly higher latency per request
- Need to handle Redis failures gracefully
Alternatives Considered
- Fixed Window Rate Limiting
  - Pros: Simple, less memory
  - Cons: Burst traffic at window boundaries
- Token Bucket
  - Pros: Allows bursts
  - Cons: More complex, harder to explain to users
- Application-Level In-Memory
  - Pros: Very fast
  - Cons: Not distributed, lost on restart
Rate Limit Tiers
```python
RATE_LIMITS = {
    "free": {
        "requests_per_minute": 10,
        "requests_per_hour": 100,
        "requests_per_day": 1000,
        "documents_per_day": 10,
    },
    "pro": {
        "requests_per_minute": 100,
        "requests_per_hour": 5000,
        "requests_per_day": 50000,
        "documents_per_day": 500,
    },
    "enterprise": {
        "requests_per_minute": 1000,
        "requests_per_hour": 50000,
        "requests_per_day": 1000000,
        "documents_per_day": 10000,
    }
}
```
Implementation
```python
from datetime import datetime
from uuid import uuid4

from fastapi import Request, HTTPException
import redis.asyncio as redis

class RateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def check_rate_limit(
        self,
        user_id: str,
        tier: str,
        window: str = "minute"
    ) -> bool:
        """
        Sliding window rate limiter.
        Uses Redis sorted sets with timestamps as scores.
        Removes expired entries and counts the remainder.
        """
        now = datetime.utcnow().timestamp()
        window_seconds = {"minute": 60, "hour": 3600, "day": 86400}[window]
        window_start = now - window_seconds
        key = f"rate_limit:{user_id}:{window}"

        # Remove expired entries
        await self.redis.zremrangebyscore(key, 0, window_start)

        # Count requests in window
        count = await self.redis.zcard(key)
        limit = RATE_LIMITS[tier][f"requests_per_{window}"]

        if count >= limit:
            # Retry once the oldest request in the window expires
            oldest = await self.redis.zrange(key, 0, 0, withscores=True)
            retry_after = (
                max(1, int(oldest[0][1] + window_seconds - now))
                if oldest else window_seconds
            )
            raise HTTPException(
                status_code=429,
                detail={
                    "error": "Rate limit exceeded",
                    "limit": limit,
                    "window": window,
                    "retry_after": retry_after,
                },
                headers={
                    "X-RateLimit-Limit": str(limit),
                    "X-RateLimit-Remaining": "0",
                    "X-RateLimit-Reset": str(int(now + retry_after)),
                    "Retry-After": str(retry_after),
                }
            )

        # Record current request; unique member avoids collisions at equal timestamps
        await self.redis.zadd(key, {f"{now}:{uuid4().hex}": now})
        await self.redis.expire(key, window_seconds)
        return True

# Middleware
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    user = await get_current_user(request)
    limiter = request.app.state.rate_limiter
    await limiter.check_rate_limit(user.id, user.tier, "minute")
    await limiter.check_rate_limit(user.id, user.tier, "hour")
    return await call_next(request)
```
Response Headers
```http
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 47
X-RateLimit-Reset: 1698765432
Retry-After: 60
```
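On the client side, a minimal retry loop honoring the Retry-After header might look like this. This is a sketch; the response shape (`status`, `headers` keys) is illustrative rather than taken from any particular HTTP client:

```python
import time

def call_with_retry(do_request, max_attempts: int = 3):
    """Retry a request when rate-limited, sleeping for the Retry-After hint."""
    for _ in range(max_attempts):
        resp = do_request()  # expected to return {"status": int, "headers": dict, ...}
        if resp["status"] != 429:
            return resp
        time.sleep(int(resp["headers"].get("Retry-After", "1")))
    raise RuntimeError("rate limit retries exhausted")
```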
ADR-013: Logging and Audit Trail Format
Status: Accepted
Date: 2025-11-01
Deciders: Architecture Team, Security Team
Context
Need structured logging for debugging, monitoring, and compliance. Audit trail required for security and regulatory compliance.
Decision
Implement structured JSON logging with comprehensive audit trail for all sensitive operations.
Rationale
Pros:
- Easy to parse and analyze
- Works well with Cloud Logging
- Searchable and filterable
- Consistent format across services
- Machine-readable
- Supports correlation IDs
Cons:
- Larger log size than plain text
- Less human-readable in raw form
- Need log aggregation tool
Log Levels
- DEBUG: Detailed diagnostic information
- INFO: General informational messages
- WARNING: Warning messages, degraded functionality
- ERROR: Error messages, operation failed
- CRITICAL: Critical errors, service disruption
Log Format
```json
{
  "timestamp": "2025-11-01T12:34:56.789Z",
  "level": "INFO",
  "service": "backend",
  "logger": "api.documents",
  "message": "Document uploaded successfully",
  "trace_id": "550e8400-e29b-41d4-a716-446655440000",
  "span_id": "7fb8a3c2",
  "user_id": "user-123",
  "organization_id": "org-456",
  "request_id": "req-789",
  "context": {
    "document_id": "doc-001",
    "filename": "report.pdf",
    "size_bytes": 1048576,
    "ip_address": "192.168.1.1",
    "user_agent": "Mozilla/5.0..."
  },
  "performance": {
    "duration_ms": 245,
    "memory_mb": 128
  },
  "environment": "production",
  "version": "1.0.0"
}
```
Implementation
```python
import logging
from contextvars import ContextVar
from typing import Optional

import structlog

# Context variables for request tracking
trace_id_var: ContextVar[Optional[str]] = ContextVar("trace_id", default=None)
user_id_var: ContextVar[Optional[str]] = ContextVar("user_id", default=None)

# Configure structured logging
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Usage
logger.info(
    "document_uploaded",
    document_id=doc.id,
    filename=doc.filename,
    size_bytes=doc.size_bytes,
    user_id=user.id
)
```
Audit Trail
```python
class AuditLog(Base):
    __tablename__ = "audit_logs"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    timestamp = Column(DateTime, default=datetime.utcnow, index=True)

    # Who
    user_id = Column(UUID, ForeignKey("users.id"), index=True)
    organization_id = Column(UUID, ForeignKey("organizations.id"), index=True)
    ip_address = Column(String(45))
    user_agent = Column(String(500))

    # What
    action = Column(String(100), nullable=False, index=True)
    resource_type = Column(String(50), nullable=False, index=True)
    resource_id = Column(UUID, index=True)

    # Details
    old_values = Column(JSONB)
    new_values = Column(JSONB)
    # "metadata" is reserved by SQLAlchemy's Declarative API, so map the
    # column name explicitly to a differently named attribute
    meta = Column("metadata", JSONB, default=dict)

    # Outcome
    status = Column(String(20))  # success, failure
    error_message = Column(Text)

    # Tracing
    request_id = Column(UUID)
    trace_id = Column(String(100))

    __table_args__ = (
        Index('idx_audit_user_action', 'user_id', 'action'),
        Index('idx_audit_resource', 'resource_type', 'resource_id'),
        Index('idx_audit_timestamp', 'timestamp'),
    )
```
Audited Actions
- User login/logout
- Document upload/delete
- API key creation/revocation
- Permission changes
- Configuration updates
- Data exports
- Admin actions
Retention Policy
- Application logs: 30 days
- Audit logs: 7 years (compliance)
- Error logs: 90 days
- Access logs: 90 days
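A sketch of how the retention policy above could drive pruning. The log-type labels and the helper function are illustrative, not from the codebase:

```python
from datetime import datetime, timedelta

# Retention windows in days, from the policy above
RETENTION_DAYS = {
    "application": 30,
    "audit": 365 * 7,  # 7 years for compliance
    "error": 90,
    "access": 90,
}

def is_expired(log_type: str, created_at: datetime, now: datetime) -> bool:
    """True once a record has outlived its retention window."""
    return (now - created_at) > timedelta(days=RETENTION_DAYS[log_type])
```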
ADR-014: CI/CD Pipeline Architecture
Status: Accepted
Date: 2025-11-01
Deciders: DevOps Team, Architecture Team
Context
Need automated pipeline for building, testing, and deploying application safely and reliably.
Decision
Implement GitHub Actions-based CI/CD with multi-stage pipeline and environment promotions.
Rationale
Pros:
- Native GitHub integration
- Free for public repos, affordable for private
- YAML-based configuration
- Large marketplace of actions
- Parallel job execution
- Secrets management built-in
- Self-hosted runners supported
Cons:
- Less powerful than Jenkins
- Vendor lock-in to GitHub
- Limited debugging capabilities
- Concurrent job limits
Alternatives Considered
- GitLab CI/CD
  - Pros: Integrated, powerful
  - Cons: Need GitLab migration
- CircleCI
  - Pros: Fast, good caching
  - Cons: Cost, separate tool
- Jenkins
  - Pros: Most powerful, flexible
  - Cons: Self-hosted, complex
Pipeline Stages
```yaml
name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main, develop]

jobs:
  # Stage 1: Code Quality
  lint-and-format:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Lint Python
        run: pylint **/*.py
      - name: Lint TypeScript
        run: npm run lint

  # Stage 2: Security
  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Trivy scan
        uses: aquasecurity/trivy-action@master
      - name: Snyk scan
        uses: snyk/actions/python@master

  # Stage 3: Test
  test:
    runs-on: ubuntu-latest
    needs: [lint-and-format]
    steps:
      - uses: actions/checkout@v4
      - name: Run tests
        run: pytest --cov=.
      - name: Upload coverage
        uses: codecov/codecov-action@v4

  # Stage 4: Build
  build:
    runs-on: ubuntu-latest
    needs: [test, security-scan]
    steps:
      - uses: actions/checkout@v4
      - name: Build Docker image
        run: docker build -t app:${{ github.sha }} .
      - name: Push to registry
        run: docker push gcr.io/$PROJECT/app:${{ github.sha }}

  # Stage 5: Deploy to Staging
  deploy-staging:
    runs-on: ubuntu-latest
    needs: [build]
    if: github.ref == 'refs/heads/develop'
    environment: staging
    steps:
      - name: Deploy to GKE
        run: kubectl set image deployment/app app=gcr.io/$PROJECT/app:${{ github.sha }}

  # Stage 6: Deploy to Production
  deploy-production:
    runs-on: ubuntu-latest
    needs: [build]
    if: github.event_name == 'release'
    environment: production
    steps:
      - name: Canary deployment
        run: ./scripts/canary-deploy.sh
      - name: Monitor metrics
        run: ./scripts/monitor-canary.sh
      - name: Promote canary
        run: ./scripts/promote-canary.sh
```
Deployment Strategy
Environments:
- Development: Auto-deploy on every commit to develop
- Staging: Auto-deploy on PR merge to develop
- Production: Manual approval + canary deployment
Deployment Steps:
- Build and push Docker images
- Update Kubernetes manifests
- Deploy to canary (10% traffic)
- Monitor for 5 minutes
- Promote to full deployment
- Run smoke tests
Rollback Procedure
```bash
# Automatic rollback if:
# - Pod crash rate > 10%
# - Error rate > 5%
# - p95 latency > 2x baseline

kubectl rollout undo deployment/backend -n production
kubectl rollout undo deployment/frontend -n production
```
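The automatic-rollback thresholds listed above can be expressed as a simple decision function. This is a sketch; how the metrics are collected is out of scope:

```python
def should_rollback(crash_rate: float, error_rate: float,
                    p95_latency_ms: float, baseline_p95_ms: float) -> bool:
    """Mirror the thresholds above: crash rate > 10%, error rate > 5%, p95 > 2x baseline."""
    return (
        crash_rate > 0.10
        or error_rate > 0.05
        or p95_latency_ms > 2 * baseline_p95_ms
    )
```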
Metrics to Track
- Build success rate
- Deploy frequency
- Mean time to recovery (MTTR)
- Change failure rate
- Lead time for changes
ADR-015: Secret Management Approach
Status: Accepted
Date: 2025-11-01
Deciders: Security Team, DevOps Team
Context
Need secure storage and access control for API keys, database credentials, and other secrets.
Decision
Use Google Secret Manager for secret storage with Kubernetes Workload Identity for access.
Rationale
Pros:
- Centralized secret management
- Automatic rotation support
- Versioning and audit trail
- IAM-based access control
- Encryption at rest and in transit
- Integration with GKE
- No secrets in Git or environment variables
Cons:
- GCP vendor lock-in
- Slight latency for secret retrieval
- Cost per secret access
- Learning curve for team
Alternatives Considered
- Kubernetes Secrets
  - Pros: Native to K8s, simple
  - Cons: Base64 only, no rotation, visible in etcd
- HashiCorp Vault
  - Pros: Very powerful, multi-cloud
  - Cons: Self-hosted, complex, high cost
- AWS Secrets Manager
  - Pros: AWS native
  - Cons: Vendor lock-in, need AWS account
Secret Categories
Application Secrets:
- Anthropic API key
- JWT signing key
- OAuth client secrets
- Webhook signing secrets
Infrastructure Secrets:
- Database passwords
- Redis passwords
- GCS service account keys
- Docker registry credentials
Third-Party Integrations:
- Stripe API keys
- SendGrid API keys
- Monitoring API keys
- Logging API keys
Implementation
```python
from functools import lru_cache
from google.cloud import secretmanager

class SecretManager:
    def __init__(self, project_id: str):
        self.client = secretmanager.SecretManagerServiceClient()
        self.project_id = project_id

    @lru_cache(maxsize=100)
    def get_secret(self, secret_name: str, version: str = "latest") -> str:
        """
        Retrieve a secret from Google Secret Manager.
        Cached to reduce API calls; note that caching "latest" means a
        rotated value is not seen until the cache entry is evicted.
        """
        name = f"projects/{self.project_id}/secrets/{secret_name}/versions/{version}"
        response = self.client.access_secret_version(request={"name": name})
        return response.payload.data.decode("UTF-8")

    def create_secret(self, secret_name: str, secret_value: str):
        """Create a new secret and add its first version."""
        parent = f"projects/{self.project_id}"

        # Create the secret container
        secret = self.client.create_secret(
            request={
                "parent": parent,
                "secret_id": secret_name,
                "secret": {"replication": {"automatic": {}}},
            }
        )

        # Add the secret payload as a version
        self.client.add_secret_version(
            request={
                "parent": secret.name,
                "payload": {"data": secret_value.encode("UTF-8")},
            }
        )

# Usage in application
secrets = SecretManager(project_id="my-project")
ANTHROPIC_API_KEY = secrets.get_secret("anthropic-api-key")
DATABASE_PASSWORD = secrets.get_secret("database-password")
```
Kubernetes Integration
```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: pdf-analysis-sa
  annotations:
    iam.gke.io/gcp-service-account: pdf-analysis@project.iam.gserviceaccount.com
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: backend
spec:
  template:
    spec:
      serviceAccountName: pdf-analysis-sa
      containers:
        - name: backend
          env:
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: anthropic-api-key
```
Secret Rotation Policy
- API Keys: Rotate every 90 days
- Database Passwords: Rotate every 180 days
- JWT Signing Keys: Rotate every 30 days
- Service Account Keys: Rotate every 90 days
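As a sketch, the rotation schedule above can drive a due-date check. The secret-kind labels are illustrative names, not identifiers from the codebase:

```python
from datetime import date, timedelta

# Rotation intervals in days, from the policy above
ROTATION_DAYS = {
    "api_key": 90,
    "database_password": 180,
    "jwt_signing_key": 30,
    "service_account_key": 90,
}

def rotation_due(kind: str, last_rotated: date, today: date) -> bool:
    """True if the secret's rotation interval has elapsed."""
    return today - last_rotated >= timedelta(days=ROTATION_DAYS[kind])
```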
Access Control
```bash
# Grant access to secret
gcloud secrets add-iam-policy-binding anthropic-api-key \
  --member="serviceAccount:pdf-analysis@project.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

# Audit secret access
gcloud logging read "resource.type=secretmanager.googleapis.com/Secret" \
  --limit=50 --format=json
```
Best Practices
- Never commit secrets to Git
- Use different secrets per environment
- Enable audit logging
- Rotate regularly
- Use least-privilege access
- Monitor secret usage
- Encrypt secrets at application layer for extra security