
Architecture Decision Records


ADR Index

ADR     | Title                                  | Status   | Date
--------|----------------------------------------|----------|-----------
ADR-001 | Use FastAPI for Backend Framework      | Accepted | 2025-10-31
ADR-002 | WebSocket for Real-Time Communication  | Accepted | 2025-10-31
ADR-003 | Deploy on Google Kubernetes Engine     | Accepted | 2025-10-31
ADR-004 | Use Anthropic Claude for AI Analysis   | Accepted | 2025-10-31
ADR-005 | Event-Driven Architecture with Pub/Sub | Accepted | 2025-10-31
ADR-006 | React with TypeScript for Frontend     | Accepted | 2025-10-31
ADR-007 | Zustand for State Management           | Accepted | 2025-10-31
ADR-008 | PostgreSQL for Primary Database        | Accepted | 2025-10-31
ADR-009 | Document Versioning Strategy           | Accepted | 2025-11-01
ADR-010 | Multi-Tenancy Approach                 | Accepted | 2025-11-01
ADR-011 | Backup and Disaster Recovery           | Accepted | 2025-11-01
ADR-012 | API Rate Limiting Implementation       | Accepted | 2025-11-01
ADR-013 | Logging and Audit Trail Format         | Accepted | 2025-11-01
ADR-014 | CI/CD Pipeline Architecture            | Accepted | 2025-11-01
ADR-015 | Secret Management Approach             | Accepted | 2025-11-01

ADR-001: Use FastAPI for Backend Framework

Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team

Context

Need to select a Python web framework for building REST API and WebSocket endpoints with high performance and modern async support.

Decision

Use FastAPI as the primary backend framework.

Rationale

Pros:

  • Native async/await support for high concurrency
  • Automatic OpenAPI documentation generation
  • Built-in request validation with Pydantic
  • WebSocket support out of the box
  • Excellent performance (comparable to Node.js/Go)
  • Type hints improve code quality and IDE support
  • Large ecosystem and active community

Cons:

  • Relatively newer than Django/Flask
  • Less mature ecosystem for some use cases
  • Smaller pool of developers familiar with it

Alternatives Considered

  1. Django + Django Channels

    • Pros: Mature, full-featured, ORM
    • Cons: Heavier, synchronous by default, more complex setup
  2. Flask + Flask-SocketIO

    • Pros: Lightweight, flexible
    • Cons: Manual setup for async, no auto-documentation
  3. Sanic

    • Pros: Very fast, async-first
    • Cons: Smaller community, less tooling

Consequences

Positive:

  • Fast development with automatic API docs
  • High performance for concurrent operations
  • Type safety reduces bugs
  • Easy testing with async support

Negative:

  • Team needs to learn FastAPI patterns
  • Some third-party integrations may be immature

Implementation Notes

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(
    title="PDF Analysis API",
    version="1.0.0",
    docs_url="/api/docs"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Restrict to known origins in production
    allow_methods=["*"],
    allow_headers=["*"]
)

ADR-002: WebSocket for Real-Time Communication

Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team

Context

Need real-time updates for PDF processing progress and analysis results without client polling.

Decision

Use WebSocket protocol for bidirectional real-time communication between frontend and backend.

Rationale

Pros:

  • True real-time bidirectional communication
  • Lower latency than polling
  • Reduced server load compared to HTTP polling
  • Native browser support
  • Works well with FastAPI

Cons:

  • More complex than REST
  • Requires connection management
  • Scaling requires sticky sessions or message broker
  • Debugging more difficult than HTTP

Alternatives Considered

  1. HTTP Polling

    • Pros: Simple, stateless
    • Cons: High latency, inefficient, server load
  2. Server-Sent Events (SSE)

    • Pros: Simple, HTTP-based
    • Cons: One-way only, no binary support
  3. gRPC Streaming

    • Pros: Efficient, bidirectional
    • Cons: Complex setup, limited browser support

Consequences

Positive:

  • Instant progress updates for users
  • Better user experience with real-time feedback
  • Reduced API calls and server load

Negative:

  • Need connection recovery logic
  • Requires Redis pub/sub for multi-instance scaling
  • More complex error handling

Implementation Strategy

from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Set
import json

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        if user_id not in self.active_connections:
            self.active_connections[user_id] = set()
        self.active_connections[user_id].add(websocket)

    def disconnect(self, websocket: WebSocket, user_id: str):
        if user_id in self.active_connections:
            self.active_connections[user_id].discard(websocket)

    async def send_personal_message(
        self,
        message: dict,
        user_id: str
    ):
        if user_id in self.active_connections:
            for connection in self.active_connections[user_id]:
                await connection.send_text(json.dumps(message))

ADR-003: Deploy on Google Kubernetes Engine

Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team, DevOps Team

Context

Need scalable, managed container orchestration platform for production deployment.

Decision

Deploy application on Google Kubernetes Engine (GKE) with Autopilot mode.

Rationale

Pros:

  • Managed Kubernetes reduces operational overhead
  • Autopilot mode eliminates node management
  • Native integration with GCP services (Cloud SQL, GCS, etc.)
  • Built-in monitoring and logging
  • Auto-scaling capabilities
  • High availability across zones
  • Cost-effective with commitment discounts

Cons:

  • Vendor lock-in to GCP
  • Learning curve for Kubernetes
  • More complex than simpler PaaS options

Alternatives Considered

  1. AWS EKS

    • Pros: AWS ecosystem, mature
    • Cons: More expensive, manual node management
  2. Cloud Run

    • Pros: Simpler, serverless
    • Cons: Limited WebSocket support, less control
  3. Google Compute Engine VMs

    • Pros: Full control
    • Cons: Manual scaling, no container orchestration
  4. Azure AKS

    • Pros: Good pricing, Azure integration
    • Cons: Less mature than GKE

Consequences

Positive:

  • Easy horizontal scaling
  • High availability and fault tolerance
  • Integrated monitoring and logging
  • GitOps-friendly deployment
  • Resource efficiency with Autopilot

Negative:

  • Kubernetes complexity for team
  • Need to manage YAML configurations
  • Debugging can be challenging

Implementation Approach

# GKE Autopilot cluster
apiVersion: container.cnrm.cloud.google.com/v1beta1
kind: ContainerCluster
metadata:
  name: pdf-analysis-cluster
spec:
  location: us-central1
  enableAutopilot: true
  releaseChannel:
    channel: REGULAR
  networkingMode: VPC_NATIVE
  workloadIdentityConfig:
    workloadPool: project-id.svc.id.goog

ADR-004: Use Anthropic Claude for AI Analysis

Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team, AI Team

Context

Need powerful LLM for intelligent PDF content analysis, extraction, and cross-validation.

Decision

Use Anthropic Claude (Sonnet 4) as the primary AI model for document analysis.

Rationale

Pros:

  • Excellent at structured data extraction
  • Strong reasoning capabilities for validation
  • Large context window (200K tokens)
  • JSON mode for structured outputs
  • Reliable API with good uptime
  • Strong safety and alignment
  • Competitive pricing

Cons:

  • API dependency and cost
  • Rate limits for high volume
  • No self-hosted option

Alternatives Considered

  1. OpenAI GPT-4

    • Pros: Well-known, good performance
    • Cons: Shorter context, more expensive
  2. Google Vertex AI (PaLM)

    • Pros: GCP integration, lower cost
    • Cons: Less capable for extraction tasks
  3. Open-Source Models (Llama, Mistral)

    • Pros: Self-hosted, no API costs
    • Cons: Require GPU infrastructure, lower quality

Consequences

Positive:

  • High-quality extraction and analysis
  • Large context handles full documents
  • Structured output reduces parsing errors
  • Reliable service with good support

Negative:

  • Ongoing API costs scale with usage
  • Latency depends on external service
  • Need fallback for API outages

Cost Analysis

# Estimated costs per document
COST_PER_1M_INPUT_TOKENS = 3.00    # USD
COST_PER_1M_OUTPUT_TOKENS = 15.00  # USD

# Average 10-page PDF
avg_input_tokens = 8_000   # 800 tokens/page
avg_output_tokens = 2_000  # Structured extraction

cost_per_document = (
    (avg_input_tokens / 1_000_000 * COST_PER_1M_INPUT_TOKENS)
    + (avg_output_tokens / 1_000_000 * COST_PER_1M_OUTPUT_TOKENS)
)
# ~$0.05 per document

Token Budget Strategy

  • Structure analysis: 2,000 tokens
  • Component extraction: 4,000 tokens/page
  • Cross-validation: 3,000 tokens
  • Total budget: 100,000 tokens/document max
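The budget above can be enforced with a small helper consulted before each pipeline stage. A sketch; the function and constant names are illustrative, not the production implementation:

```python
# Per-stage token budgets from the strategy above (illustrative names)
STAGE_BUDGETS = {
    "structure_analysis": 2_000,
    "component_extraction": 4_000,  # per page
    "cross_validation": 3_000,
}
MAX_TOKENS_PER_DOCUMENT = 100_000


def stage_budget(stage: str, pages: int = 1) -> int:
    """Tokens allotted to one pipeline stage."""
    per_unit = STAGE_BUDGETS[stage]
    return per_unit * pages if stage == "component_extraction" else per_unit


def within_budget(tokens_used: int, stage: str, pages: int = 1) -> bool:
    """True if running the stage keeps the document under its overall cap."""
    return tokens_used + stage_budget(stage, pages) <= MAX_TOKENS_PER_DOCUMENT
```

A caller would check `within_budget` before issuing the next API request and abort or truncate the stage when it returns False.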

ADR-005: Event-Driven Architecture with Pub/Sub

Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team

Context

Need decoupled architecture for processing pipeline with fault tolerance and scalability.

Decision

Implement event-driven architecture using Redis Pub/Sub for inter-service communication.

Rationale

Pros:

  • Loose coupling between services
  • Easy to add new consumers
  • Fault tolerance through retry mechanisms
  • Scalable processing pipeline
  • Asynchronous processing
  • Event sourcing enables audit trails

Cons:

  • More complex than direct calls
  • Debugging distributed events harder
  • Need event schema management
  • Eventual consistency

Alternatives Considered

  1. Direct Service Calls (gRPC)

    • Pros: Simple, synchronous
    • Cons: Tight coupling, cascading failures
  2. Cloud Pub/Sub

    • Pros: Managed, scalable
    • Cons: Higher cost, external dependency
  3. RabbitMQ

    • Pros: Feature-rich, mature
    • Cons: More infrastructure, heavier

Event Schema

from enum import Enum
from pydantic import BaseModel
from datetime import datetime

class EventType(str, Enum):
DOCUMENT_UPLOADED = "document.uploaded"
PROCESSING_STARTED = "processing.started"
PROCESSING_COMPLETED = "processing.completed"
PROCESSING_FAILED = "processing.failed"
ANALYSIS_REQUESTED = "analysis.requested"
ANALYSIS_COMPLETED = "analysis.completed"

class Event(BaseModel):
event_id: str
event_type: EventType
timestamp: datetime
user_id: str
document_id: str
data: dict
metadata: dict = {}

Implementation

import redis.asyncio as redis
import json

class EventBus:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def publish(self, event: Event):
        """Publish event to channel"""
        channel = f"events.{event.event_type}"
        await self.redis.publish(
            channel,
            event.model_dump_json()
        )

    async def subscribe(self, event_type: EventType, handler):
        """Subscribe to event channel"""
        channel = f"events.{event_type}"
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)

        async for message in pubsub.listen():
            if message['type'] == 'message':
                event = Event.model_validate_json(message['data'])
                await handler(event)
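To get a feel for the flow without a running Redis, the same publish/subscribe contract can be mirrored in memory. A stdlib-only sketch for illustration; the Redis-backed EventBus remains the real implementation:

```python
import json
from collections import defaultdict


class InMemoryEventBus:
    """Mirrors the Redis EventBus contract for local experimentation."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type: str, handler):
        self._handlers[f"events.{event_type}"].append(handler)

    def publish(self, event_type: str, payload: dict):
        # Serialize and deserialize to mimic the wire format of the real bus
        message = json.dumps({"event_type": event_type, "data": payload})
        for handler in self._handlers[f"events.{event_type}"]:
            handler(json.loads(message))
```

A consumer subscribes to `"document.uploaded"` and only ever sees the deserialized message, exactly as with the Redis channel, which is what keeps producers and consumers decoupled.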

ADR-006: React with TypeScript for Frontend

Status: Accepted
Date: 2025-10-31
Deciders: Frontend Team

Context

Need modern, performant frontend framework for building interactive file management UI.

Decision

Use React 18 with TypeScript for frontend development.

Rationale

Pros:

  • Component-based architecture
  • Large ecosystem and community
  • Excellent TypeScript support
  • Virtual DOM for performance
  • Concurrent rendering features
  • Strong tooling (ESLint, Prettier, etc.)
  • Server components (future)

Cons:

  • Boilerplate compared to Vue
  • Need additional libraries for routing, state
  • Learning curve for new developers

Alternatives Considered

  1. Vue 3

    • Pros: Simpler, less boilerplate
    • Cons: Smaller ecosystem, less TypeScript maturity
  2. Svelte

    • Pros: No virtual DOM, smaller bundles
    • Cons: Smaller community, fewer libraries
  3. Angular

    • Pros: Full-featured, TypeScript-first
    • Cons: Heavy, steep learning curve

Technology Stack

{
  "dependencies": {
    "react": "^18.2.0",
    "react-dom": "^18.2.0",
    "react-router-dom": "^6.18.0",
    "zustand": "^4.4.0",
    "@mui/material": "^5.14.0",
    "axios": "^1.5.0"
  },
  "devDependencies": {
    "@types/react": "^18.2.0",
    "@types/react-dom": "^18.2.0",
    "typescript": "^5.2.0",
    "vite": "^5.0.0",
    "vitest": "^1.0.0"
  }
}

ADR-007: Zustand for State Management

Status: Accepted
Date: 2025-10-31
Deciders: Frontend Team

Context

Need lightweight state management solution for React application.

Decision

Use Zustand for global state management.

Rationale

Pros:

  • Minimal boilerplate
  • No Provider wrapper needed
  • TypeScript support
  • Small bundle size (1KB)
  • Hook-based API
  • Middleware support
  • Easy to test

Cons:

  • Fewer features than Redux
  • Smaller community
  • Fewer DevTools integrations

Alternatives Considered

  1. Redux Toolkit

    • Pros: Industry standard, DevTools
    • Cons: More boilerplate, larger bundle
  2. Recoil

    • Pros: Atom-based, fine-grained updates
    • Cons: More complex, Facebook dependency
  3. Context API + useReducer

    • Pros: Built-in, no dependencies
    • Cons: Performance issues, verbose

Example Store

import { create } from 'zustand';
import { persist } from 'zustand/middleware';

interface AppState {
  user: User | null;
  documents: Document[];
  setUser: (user: User | null) => void;
  addDocument: (doc: Document) => void;
}

export const useAppStore = create<AppState>()(
  persist(
    (set) => ({
      user: null,
      documents: [],
      setUser: (user) => set({ user }),
      addDocument: (doc) => set((state) => ({
        documents: [...state.documents, doc]
      })),
    }),
    {
      name: 'app-storage',
    }
  )
);

ADR-008: PostgreSQL for Primary Database

Status: Accepted
Date: 2025-10-31
Deciders: Architecture Team, Database Team

Context

Need relational database for storing document metadata, user data, and analysis results.

Decision

Use PostgreSQL 15 via Cloud SQL as the primary database.

Rationale

Pros:

  • ACID compliance
  • JSON/JSONB support for flexible schemas
  • Full-text search capabilities
  • Excellent performance
  • Strong consistency
  • Mature ecosystem
  • GCP managed service
  • Point-in-time recovery

Cons:

  • Vertical scaling limitations
  • Sharding complexity for huge scale
  • Not ideal for time-series data

Alternatives Considered

  1. MySQL

    • Pros: Simple, popular
    • Cons: Less feature-rich, weaker JSON support
  2. MongoDB

    • Pros: Flexible schema, horizontal scaling
    • Cons: No transactions (older versions), consistency issues
  3. Cloud Spanner

    • Pros: Global scale, strong consistency
    • Cons: Very expensive, overkill for MVP

Schema Design Principles

  • Use UUIDs for primary keys
  • JSONB for flexible metadata
  • Proper indexing for performance
  • Foreign keys for referential integrity
  • Timestamps for all records

Connection Pooling

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker
)

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

ADR-009: Document Versioning Strategy

Status: Accepted
Date: 2025-11-01
Deciders: Architecture Team

Context

Documents may be updated or reprocessed with improved AI models. Need strategy for managing document versions and analysis history.

Decision

Implement append-only versioning with immutable document records and soft deletes.

Rationale

Pros:

  • Complete audit trail of changes
  • Easy rollback to previous versions
  • No data loss
  • Supports compliance requirements
  • Can compare analysis results across versions

Cons:

  • Increased storage requirements
  • More complex queries
  • Need cleanup strategy for old versions

Alternatives Considered

  1. Overwrite Strategy

    • Pros: Simple, less storage
    • Cons: No history, no rollback
  2. Snapshot-Based Versioning

    • Pros: Full snapshots at each version
    • Cons: Very high storage cost
  3. Delta-Based Versioning

    • Pros: Storage efficient
    • Cons: Complex, slower retrieval

Implementation

class Document(Base):
    __tablename__ = "documents"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    version = Column(Integer, nullable=False, default=1)
    parent_id = Column(UUID, ForeignKey("documents.id"))

    # Soft delete
    deleted_at = Column(DateTime)
    is_current = Column(Boolean, default=True, index=True)

    # Version metadata
    version_created_at = Column(DateTime, default=datetime.utcnow)
    version_created_by = Column(UUID, ForeignKey("users.id"))
    version_notes = Column(Text)

    __table_args__ = (
        Index('idx_document_parent_version', 'parent_id', 'version'),
        Index('idx_document_current', 'parent_id', 'is_current'),
    )

Version Management Rules

  • New upload creates version 1
  • Reprocessing creates new version with same parent_id
  • Only latest version has is_current=True
  • Soft delete sets deleted_at, preserves record
  • Hard delete after 90 days retention period
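The rules above amount to a small state transition. A stdlib sketch using plain dicts in place of ORM rows; field names follow the model, the logic is illustrative:

```python
from datetime import datetime, timedelta


def new_version(versions: list[dict], parent_id: str) -> dict:
    """Append an immutable new version and mark it as the only current one."""
    family = [v for v in versions if v["parent_id"] == parent_id]
    for v in family:
        v["is_current"] = False  # only the latest version stays current
    version = {
        "parent_id": parent_id,
        "version": max((v["version"] for v in family), default=0) + 1,
        "is_current": True,
        "deleted_at": None,
    }
    versions.append(version)
    return version


def purge_eligible(version: dict, now: datetime) -> bool:
    """Hard delete is allowed only after the 90-day soft-delete retention."""
    deleted_at = version["deleted_at"]
    return deleted_at is not None and now - deleted_at >= timedelta(days=90)
```

Reprocessing simply calls `new_version` with the original document's `parent_id`, so history is never mutated, only appended to.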

API Endpoints

GET  /api/v1/documents/{id}/versions     # List all versions
GET  /api/v1/documents/{id}/versions/{v} # Get specific version
POST /api/v1/documents/{id}/reprocess    # Create new version

ADR-010: Multi-Tenancy Approach

Status: Accepted
Date: 2025-11-01
Deciders: Architecture Team

Context

Platform needs to support multiple organizations/teams with data isolation and resource quotas.

Decision

Implement shared-database, row-level multi-tenancy with organization-based partitioning.

Rationale

Pros:

  • Cost-effective (shared infrastructure)
  • Easy to manage and deploy
  • Simple scaling
  • Row-level security ensures isolation
  • Good balance of isolation and efficiency

Cons:

  • Less isolation than separate databases
  • Potential for data leakage bugs
  • Noisy neighbor issues
  • Schema changes affect all tenants

Alternatives Considered

  1. Separate Database Per Tenant

    • Pros: Complete isolation, custom schemas
    • Cons: High cost, complex management
  2. Separate Schema Per Tenant

    • Pros: Better isolation, same DB
    • Cons: Migration complexity, connection pool issues
  3. Kubernetes Namespace Per Tenant

    • Pros: Strong isolation
    • Cons: Very expensive, complex

Implementation

class Organization(Base):
    __tablename__ = "organizations"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    name = Column(String(255), nullable=False)
    slug = Column(String(100), unique=True, nullable=False, index=True)

    # Quotas
    max_users = Column(Integer, default=10)
    max_documents = Column(Integer, default=1000)
    max_storage_bytes = Column(BigInteger, default=10_737_418_240)  # 10GB
    max_api_calls_per_month = Column(Integer, default=100_000)

    # Billing
    plan_tier = Column(String(50), default="free")
    stripe_customer_id = Column(String(255))

    # Status
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

class User(Base):
    __tablename__ = "users"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    organization_id = Column(UUID, ForeignKey("organizations.id"), nullable=False, index=True)
    # ... other fields

class Document(Base):
    __tablename__ = "documents"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    organization_id = Column(UUID, ForeignKey("organizations.id"), nullable=False, index=True)
    # ... other fields

    __table_args__ = (
        Index('idx_documents_org', 'organization_id'),
    )

Security Measures

  • All queries automatically filter by organization_id
  • Middleware extracts org from JWT token
  • API endpoints validate org access
  • Background jobs include org context

async def get_current_organization(
    current_user: User = Depends(get_current_user)
) -> Organization:
    org = await db.get(Organization, current_user.organization_id)
    if not org or not org.is_active:
        raise HTTPException(status_code=403, detail="Organization not active")
    return org
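The "all queries filter by organization_id" rule is easiest to audit when it lives in one chokepoint. A pure-Python sketch of the idea using plain dicts; the real code applies the same filter at the ORM layer:

```python
def scope_to_org(records: list[dict], organization_id: str) -> list[dict]:
    """Single chokepoint for tenant filtering.

    Every read path routes through here, so a missing organization_id
    filter becomes a code-review-visible bug instead of a data leak.
    """
    return [r for r in records if r.get("organization_id") == organization_id]
```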

ADR-011: Backup and Disaster Recovery

Status: Accepted
Date: 2025-11-01
Deciders: Architecture Team, DevOps Team

Context

Need reliable backup strategy and disaster recovery plan to prevent data loss and ensure business continuity.

Decision

Implement automated backups with point-in-time recovery and multi-region disaster recovery.

Rationale

Pros:

  • Automated daily backups
  • Point-in-time recovery for last 7 days
  • Cross-region replication for DR
  • Encryption at rest and in transit
  • Compliance with data protection regulations

Cons:

  • Additional storage costs
  • Slightly higher latency for cross-region writes
  • Complexity in failover procedures

Backup Strategy

PostgreSQL (Cloud SQL)

  • Automated daily backups at 2 AM UTC
  • 30-day retention period
  • Point-in-time recovery enabled (7 days)
  • Transaction log backups every 5 minutes
  • Cross-region replica in us-east1 (async)

Redis

  • RDB snapshots every 6 hours
  • AOF (Append-Only File) enabled
  • Snapshots stored in GCS
  • 7-day retention

Google Cloud Storage (GCS)

  • Versioning enabled on all buckets
  • 90-day retention for deleted objects
  • Cross-region bucket replication
  • Object lifecycle management

Application Data

  • Kubernetes manifests in Git (GitOps)
  • Secrets in Google Secret Manager with versioning
  • Configuration in ConfigMaps (backed up to Git)

Recovery Time Objectives (RTO/RPO)

Component   | RTO        | RPO
------------|------------|--------------
Database    | 1 hour     | 5 minutes
Redis       | 30 minutes | 6 hours
GCS Objects | Immediate  | 0 (versioned)
Application | 30 minutes | 0 (stateless)

Disaster Recovery Procedures

# Disaster Recovery Runbook

## Database Failure
1. Promote read replica to primary
2. Update application connection strings
3. Verify data integrity
4. Monitor replication lag

## Complete Region Failure
1. Update DNS to point to DR region
2. Promote DR database to primary
3. Start application pods in DR region
4. Verify critical services
5. Notify stakeholders

## Data Corruption
1. Identify corruption timestamp
2. Stop application writes
3. Restore from point-in-time backup
4. Verify data integrity
5. Resume operations

Backup Testing

  • Monthly restore drills
  • Quarterly DR failover tests
  • Automated backup validation
  • Recovery time measurement

Implementation

#!/bin/bash
# Automated backup script

# PostgreSQL backup
gcloud sql backups create \
  --instance=pdf-analysis-db \
  --project=$PROJECT_ID

# Export to GCS for long-term storage
gcloud sql export sql pdf-analysis-db \
  gs://$BACKUP_BUCKET/sql/$(date +%Y%m%d)/backup.sql \
  --database=pdfanalysis

# Redis snapshot
redis-cli BGSAVE
gsutil cp /var/lib/redis/dump.rdb \
  gs://$BACKUP_BUCKET/redis/$(date +%Y%m%d)/

# Verify backups
python verify_backups.py
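The final step calls verify_backups.py, whose contents aren't shown here. A minimal sketch of the kind of freshness and size checks such a script might perform; the thresholds are hypothetical:

```python
from datetime import datetime, timedelta


def backup_is_healthy(backup: dict, now: datetime,
                      max_age_hours: int = 26,
                      min_size_bytes: int = 1024) -> bool:
    """Flag a backup as suspect if it is stale or implausibly small.

    26 hours leaves slack around the daily 2 AM UTC schedule; both
    thresholds are illustrative defaults, not values from the runbook.
    """
    fresh = now - backup["created_at"] <= timedelta(hours=max_age_hours)
    nonempty = backup["size_bytes"] >= min_size_bytes
    return fresh and nonempty
```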

ADR-012: API Rate Limiting Implementation

Status: Accepted
Date: 2025-11-01
Deciders: Architecture Team

Context

Need to prevent API abuse, ensure fair usage, and protect infrastructure from overload.

Decision

Implement sliding window rate limiting using Redis with tiered limits based on user plan.

Rationale

Pros:

  • Precise rate limiting with sliding window
  • Fast (Redis-based)
  • Distributed across multiple instances
  • User-friendly with clear error messages
  • Supports different tiers

Cons:

  • Redis dependency
  • Slightly higher latency per request
  • Need to handle Redis failures gracefully

Alternatives Considered

  1. Fixed Window Rate Limiting

    • Pros: Simple, less memory
    • Cons: Burst traffic at window boundaries
  2. Token Bucket

    • Pros: Allows bursts
    • Cons: More complex, harder to explain to users
  3. Application-Level In-Memory

    • Pros: Very fast
    • Cons: Not distributed, lost on restart

Rate Limit Tiers

RATE_LIMITS = {
    "free": {
        "requests_per_minute": 10,
        "requests_per_hour": 100,
        "requests_per_day": 1000,
        "documents_per_day": 10,
    },
    "pro": {
        "requests_per_minute": 100,
        "requests_per_hour": 5000,
        "requests_per_day": 50000,
        "documents_per_day": 500,
    },
    "enterprise": {
        "requests_per_minute": 1000,
        "requests_per_hour": 50000,
        "requests_per_day": 1000000,
        "documents_per_day": 10000,
    }
}
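Stripped of Redis, the sliding-window mechanics look like this. A stdlib-only, single-process sketch for illustration; the distributed version is what actually ships:

```python
from collections import deque


class SlidingWindowCounter:
    """Allow at most `limit` events in any trailing `window_seconds` span."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window_seconds = window_seconds
        self._events: deque[float] = deque()  # timestamps, oldest first

    def allow(self, now: float) -> bool:
        # Drop timestamps that have slid out of the window
        while self._events and self._events[0] <= now - self.window_seconds:
            self._events.popleft()
        if len(self._events) >= self.limit:
            return False
        self._events.append(now)
        return True
```

Because the window trails continuously instead of resetting at fixed boundaries, a burst straddling a boundary cannot double the effective limit, which is the advantage over fixed-window limiting noted above.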

Implementation

from fastapi import Request, HTTPException
from datetime import datetime
import redis.asyncio as redis
import uuid

class RateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def check_rate_limit(
        self,
        user_id: str,
        tier: str,
        window: str = "minute"
    ) -> bool:
        """
        Sliding window rate limiter.

        Uses Redis sorted sets with timestamps as scores.
        Removes expired entries and counts remaining.
        """
        now = datetime.utcnow().timestamp()
        window_seconds = {
            "minute": 60,
            "hour": 3600,
            "day": 86400
        }[window]

        window_start = now - window_seconds
        key = f"rate_limit:{user_id}:{window}"

        # Remove expired entries
        await self.redis.zremrangebyscore(key, 0, window_start)

        # Count requests in window
        count = await self.redis.zcard(key)
        limit = RATE_LIMITS[tier][f"requests_per_{window}"]

        if count >= limit:
            # Retry once the oldest request slides out of the window
            oldest = await self.redis.zrange(key, 0, 0, withscores=True)
            retry_after = (
                int(oldest[0][1] + window_seconds - now) + 1
                if oldest else window_seconds
            )
            raise HTTPException(
                status_code=429,
                detail={
                    "error": "Rate limit exceeded",
                    "limit": limit,
                    "window": window,
                    "retry_after": retry_after
                },
                headers={
                    "X-RateLimit-Limit": str(limit),
                    "X-RateLimit-Remaining": "0",
                    "X-RateLimit-Reset": str(int(now + window_seconds)),
                    "Retry-After": str(retry_after)
                }
            )

        # Add current request (unique member so simultaneous requests
        # sharing a timestamp are all counted)
        member = f"{now}:{uuid.uuid4().hex}"
        await self.redis.zadd(key, {member: now})
        await self.redis.expire(key, window_seconds)

        return True

# Middleware
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    user = await get_current_user(request)
    limiter = request.app.state.rate_limiter

    await limiter.check_rate_limit(user.id, user.tier, "minute")
    await limiter.check_rate_limit(user.id, user.tier, "hour")

    response = await call_next(request)
    return response

Response Headers

X-RateLimit-Limit: 100
X-RateLimit-Remaining: 47
X-RateLimit-Reset: 1698765432
Retry-After: 60

ADR-013: Logging and Audit Trail Format

Status: Accepted
Date: 2025-11-01
Deciders: Architecture Team, Security Team

Context

Need structured logging for debugging, monitoring, and compliance. Audit trail required for security and regulatory compliance.

Decision

Implement structured JSON logging with comprehensive audit trail for all sensitive operations.

Rationale

Pros:

  • Easy to parse and analyze
  • Works well with Cloud Logging
  • Searchable and filterable
  • Consistent format across services
  • Machine-readable
  • Supports correlation IDs

Cons:

  • Larger log size than plain text
  • Less human-readable in raw form
  • Need log aggregation tool

Log Levels

  • DEBUG: Detailed diagnostic information
  • INFO: General informational messages
  • WARNING: Warning messages, degraded functionality
  • ERROR: Error messages, operation failed
  • CRITICAL: Critical errors, service disruption

Log Format

{
  "timestamp": "2025-11-01T12:34:56.789Z",
  "level": "INFO",
  "service": "backend",
  "logger": "api.documents",
  "message": "Document uploaded successfully",
  "trace_id": "550e8400-e29b-41d4-a716-446655440000",
  "span_id": "7fb8a3c2",
  "user_id": "user-123",
  "organization_id": "org-456",
  "request_id": "req-789",
  "context": {
    "document_id": "doc-001",
    "filename": "report.pdf",
    "size_bytes": 1048576,
    "ip_address": "192.168.1.1",
    "user_agent": "Mozilla/5.0..."
  },
  "performance": {
    "duration_ms": 245,
    "memory_mb": 128
  },
  "environment": "production",
  "version": "1.0.0"
}

Implementation

import logging
from contextvars import ContextVar
import structlog

# Context variables for request tracking
trace_id_var: ContextVar[str | None] = ContextVar("trace_id", default=None)
user_id_var: ContextVar[str | None] = ContextVar("user_id", default=None)

# Configure structured logging
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Usage
logger.info(
    "document_uploaded",
    document_id=doc.id,
    filename=doc.filename,
    size_bytes=doc.size_bytes,
    user_id=user.id
)

Audit Trail

class AuditLog(Base):
    __tablename__ = "audit_logs"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    timestamp = Column(DateTime, default=datetime.utcnow, index=True)

    # Who
    user_id = Column(UUID, ForeignKey("users.id"), index=True)
    organization_id = Column(UUID, ForeignKey("organizations.id"), index=True)
    ip_address = Column(String(45))
    user_agent = Column(String(500))

    # What
    action = Column(String(100), nullable=False, index=True)
    resource_type = Column(String(50), nullable=False, index=True)
    resource_id = Column(UUID, index=True)

    # Details
    old_values = Column(JSONB)
    new_values = Column(JSONB)
    # "metadata" is reserved on SQLAlchemy declarative models,
    # so map that column under a different attribute name
    extra_metadata = Column("metadata", JSONB, default=dict)

    # Outcome
    status = Column(String(20))  # success, failure
    error_message = Column(Text)

    # Tracing
    request_id = Column(UUID)
    trace_id = Column(String(100))

    __table_args__ = (
        Index('idx_audit_user_action', 'user_id', 'action'),
        Index('idx_audit_resource', 'resource_type', 'resource_id'),
        Index('idx_audit_timestamp', 'timestamp'),
    )

Audited Actions

  • User login/logout
  • Document upload/delete
  • API key creation/revocation
  • Permission changes
  • Configuration updates
  • Data exports
  • Admin actions

Retention Policy

  • Application logs: 30 days
  • Audit logs: 7 years (compliance)
  • Error logs: 90 days
  • Access logs: 90 days

ADR-014: CI/CD Pipeline Architecture

Status: Accepted
Date: 2025-11-01
Deciders: DevOps Team, Architecture Team

Context

Need automated pipeline for building, testing, and deploying application safely and reliably.

Decision

Implement GitHub Actions-based CI/CD with multi-stage pipeline and environment promotions.

Rationale

Pros:

  • Native GitHub integration
  • Free for public repos, affordable for private
  • YAML-based configuration
  • Large marketplace of actions
  • Parallel job execution
  • Secrets management built-in
  • Self-hosted runners supported

Cons:

  • Less powerful than Jenkins
  • Vendor lock-in to GitHub
  • Limited debugging capabilities
  • Concurrent job limits

Alternatives Considered

  1. GitLab CI/CD

    • Pros: Integrated, powerful
    • Cons: Need GitLab migration
  2. CircleCI

    • Pros: Fast, good caching
    • Cons: Cost, separate tool
  3. Jenkins

    • Pros: Most powerful, flexible
    • Cons: Self-hosted, complex

Pipeline Stages

name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main, develop]

jobs:
  # Stage 1: Code Quality
  lint-and-format:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Lint Python
        run: pylint **/*.py
      - name: Lint TypeScript
        run: npm run lint

  # Stage 2: Security
  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Trivy scan
        uses: aquasecurity/trivy-action@master
      - name: Snyk scan
        uses: snyk/actions/python@master

  # Stage 3: Test
  test:
    runs-on: ubuntu-latest
    needs: [lint-and-format]
    steps:
      - uses: actions/checkout@v4
      - name: Run tests
        run: pytest --cov=.
      - name: Upload coverage
        uses: codecov/codecov-action@v4

  # Stage 4: Build
  build:
    runs-on: ubuntu-latest
    needs: [test, security-scan]
    steps:
      - uses: actions/checkout@v4
      - name: Build Docker image
        run: docker build -t app:${{ github.sha }} .
      - name: Push to registry
        run: docker push gcr.io/$PROJECT/app:${{ github.sha }}

  # Stage 5: Deploy to Staging
  deploy-staging:
    runs-on: ubuntu-latest
    needs: [build]
    if: github.ref == 'refs/heads/develop'
    environment: staging
    steps:
      - name: Deploy to GKE
        run: kubectl set image deployment/app app=gcr.io/$PROJECT/app:${{ github.sha }}

  # Stage 6: Deploy to Production
  deploy-production:
    runs-on: ubuntu-latest
    needs: [build]
    if: github.event_name == 'release'
    environment: production
    steps:
      - name: Canary deployment
        run: ./scripts/canary-deploy.sh
      - name: Monitor metrics
        run: ./scripts/monitor-canary.sh
      - name: Promote canary
        run: ./scripts/promote-canary.sh

Deployment Strategy

Environments:

  • Development: Auto-deploy on every commit to develop
  • Staging: Auto-deploy on PR merge to develop
  • Production: Manual approval + canary deployment

Deployment Steps:

  1. Build and push Docker images
  2. Update Kubernetes manifests
  3. Deploy to canary (10% traffic)
  4. Monitor for 5 minutes
  5. Promote to full deployment
  6. Run smoke tests
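
Steps 3-5 can be sketched as a driver with injected callables, so the control flow is testable without a cluster (all names here are hypothetical stand-ins for the `canary-deploy.sh`, `monitor-canary.sh`, and `promote-canary.sh` scripts):

```python
import time

CANARY_TRAFFIC = 0.10      # step 3: route 10% of traffic to the canary
MONITOR_WINDOW_S = 5 * 60  # step 4: observe for 5 minutes

def run_canary(deploy, healthy, promote, rollback, sleep=time.sleep):
    """Drive one canary rollout; returns True if promoted."""
    deploy(traffic=CANARY_TRAFFIC)  # step 3
    sleep(MONITOR_WINDOW_S)         # step 4
    if not healthy():               # metrics check after the window
        rollback()
        return False
    promote()                       # step 5
    return True
```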

Rollback Procedure

# Automatic rollback if:
# - Pod crash rate > 10%
# - Error rate > 5%
# - p95 latency > 2x baseline

kubectl rollout undo deployment/backend -n production
kubectl rollout undo deployment/frontend -n production
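
The automatic-rollback conditions above reduce to a simple predicate (illustrative sketch; in practice these signals would come from the monitoring stack):

```python
def should_rollback(crash_rate: float, error_rate: float,
                    p95_ms: float, baseline_p95_ms: float) -> bool:
    """Apply the automatic-rollback gates from the policy above."""
    return (
        crash_rate > 0.10                # pod crash rate > 10%
        or error_rate > 0.05             # error rate > 5%
        or p95_ms > 2 * baseline_p95_ms  # p95 latency > 2x baseline
    )
```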

Metrics to Track

  • Build success rate
  • Deploy frequency
  • Mean time to recovery (MTTR)
  • Change failure rate
  • Lead time for changes
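
Two of these metrics can be computed directly from deployment records; a minimal sketch, assuming each record carries a timestamp and a failure flag:

```python
from datetime import datetime, timedelta

def dora_summary(deploys):
    """Deploy frequency and change failure rate from a list of
    {'at': datetime, 'failed': bool} records (illustrative)."""
    if not deploys:
        return {"deploys_per_week": 0.0, "change_failure_rate": 0.0}
    span = max(d["at"] for d in deploys) - min(d["at"] for d in deploys)
    weeks = max(span / timedelta(weeks=1), 1.0)  # avoid divide-by-zero
    failures = sum(d["failed"] for d in deploys)
    return {
        "deploys_per_week": len(deploys) / weeks,
        "change_failure_rate": failures / len(deploys),
    }
```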

ADR-015: Secret Management Approach

Status: Accepted
Date: 2025-11-01
Deciders: Security Team, DevOps Team

Context

Need secure storage and access control for API keys, database credentials, and other secrets.

Decision

Use Google Secret Manager for secret storage with Kubernetes Workload Identity for access.

Rationale

Pros:

  • Centralized secret management
  • Automatic rotation support
  • Versioning and audit trail
  • IAM-based access control
  • Encryption at rest and in transit
  • Integration with GKE
  • No secrets in Git or environment variables

Cons:

  • GCP vendor lock-in
  • Slight latency for secret retrieval
  • Cost per secret access
  • Learning curve for team

Alternatives Considered

  1. Kubernetes Secrets

    • Pros: Native to K8s, simple
    • Cons: Base64-encoded (not encrypted by default), no built-in rotation, readable from etcd
  2. HashiCorp Vault

    • Pros: Very powerful, multi-cloud
    • Cons: Self-hosted, complex, high cost
  3. AWS Secrets Manager

    • Pros: AWS native
    • Cons: Vendor lock-in, need AWS account

Secret Categories

Application Secrets:

  • Anthropic API key
  • JWT signing key
  • OAuth client secrets
  • Webhook signing secrets

Infrastructure Secrets:

  • Database passwords
  • Redis passwords
  • GCS service account keys
  • Docker registry credentials

Third-Party Integrations:

  • Stripe API keys
  • SendGrid API keys
  • Monitoring API keys
  • Logging API keys
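
One way to keep these categories and per-environment separation enforceable is a naming convention for secret ids; a hypothetical helper (the convention itself is not part of the ADR):

```python
CATEGORIES = {"app", "infra", "thirdparty"}

def secret_id(env: str, category: str, name: str) -> str:
    """Build a namespaced Secret Manager id like 'prod-app-jwt-signing-key'
    (hypothetical convention: one secret per environment)."""
    if category not in CATEGORIES:
        raise ValueError(f"unknown category: {category}")
    return f"{env}-{category}-{name}"

print(secret_id("staging", "app", "anthropic-api-key"))
# staging-app-anthropic-api-key
```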

Implementation

from functools import lru_cache

from google.cloud import secretmanager


class SecretManager:
    def __init__(self, project_id: str):
        self.client = secretmanager.SecretManagerServiceClient()
        self.project_id = project_id

    @lru_cache(maxsize=100)
    def get_secret(self, secret_name: str, version: str = "latest") -> str:
        """
        Retrieve a secret from Google Secret Manager.
        Cached to reduce API calls.
        """
        name = f"projects/{self.project_id}/secrets/{secret_name}/versions/{version}"
        response = self.client.access_secret_version(request={"name": name})
        return response.payload.data.decode("UTF-8")

    def create_secret(self, secret_name: str, secret_value: str):
        """Create a new secret and add its first version."""
        parent = f"projects/{self.project_id}"

        # Create the secret container
        secret = self.client.create_secret(
            request={
                "parent": parent,
                "secret_id": secret_name,
                "secret": {"replication": {"automatic": {}}},
            }
        )

        # Add the initial secret version
        self.client.add_secret_version(
            request={
                "parent": secret.name,
                "payload": {"data": secret_value.encode("UTF-8")},
            }
        )

# Usage in application
secrets = SecretManager(project_id="my-project")
ANTHROPIC_API_KEY = secrets.get_secret("anthropic-api-key")
DATABASE_PASSWORD = secrets.get_secret("database-password")

Kubernetes Integration

apiVersion: v1
kind: ServiceAccount
metadata:
  name: pdf-analysis-sa
  annotations:
    iam.gke.io/gcp-service-account: pdf-analysis@project.iam.gserviceaccount.com

---

apiVersion: apps/v1
kind: Deployment
metadata:
  name: backend
spec:
  template:
    spec:
      serviceAccountName: pdf-analysis-sa
      containers:
        - name: backend
          env:
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: anthropic-api-key

Secret Rotation Policy

  • API Keys: Rotate every 90 days
  • Database Passwords: Rotate every 180 days
  • JWT Signing Keys: Rotate every 30 days
  • Service Account Keys: Rotate every 90 days
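
The policy above can be enforced by a periodic job that compares each secret version's creation time against its window; a minimal sketch (the kinds and windows mirror the policy, the rest is hypothetical):

```python
from datetime import datetime, timedelta, timezone

# Rotation windows from the policy above, in days
ROTATION_DAYS = {
    "api-key": 90,
    "database-password": 180,
    "jwt-signing-key": 30,
    "service-account-key": 90,
}

def rotation_due(kind: str, created_at: datetime, now: datetime) -> bool:
    """True once a secret of `kind` has outlived its rotation window."""
    return now - created_at > timedelta(days=ROTATION_DAYS[kind])
```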

Access Control

# Grant access to the secret
gcloud secrets add-iam-policy-binding anthropic-api-key \
  --member="serviceAccount:pdf-analysis@project.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

# Audit secret access
gcloud logging read "resource.type=secretmanager.googleapis.com/Secret" \
  --limit=50 --format=json

Best Practices

  1. Never commit secrets to Git
  2. Use different secrets per environment
  3. Enable audit logging
  4. Rotate regularly
  5. Use least-privilege access
  6. Monitor secret usage
  7. Encrypt secrets at application layer for extra security