ADR-020: Security Hardening
Status: Accepted
Date: 2025-11-30
Deciders: Security Team, Engineering Team
Tags: security, authentication, encryption, rate-limiting, compliance
Context
Security Requirements
The CODITECT license server handles sensitive data and must be hardened against attack:
Data at Risk:
- License keys (customer IP)
- User credentials and sessions
- Billing information (Stripe integration)
- Hardware IDs (device fingerprints)
- Usage data (metering)
Attack Vectors:
- Brute force attacks on license validation
- DDoS attacks overwhelming API
- SQL injection via API inputs
- XSS attacks in web dashboard
- Token theft (session hijacking)
- Man-in-the-middle (MITM) attacks
Compliance Requirements:
- GDPR: Encryption at rest/transit, data minimization
- SOC 2: Access control, audit logging, encryption
- HIPAA: PHI protection (applies only if we serve healthcare customers)
- PCI DSS: Secure payment data handling (Stripe)
Decision
We will implement defense-in-depth security with:
- JWT Authentication for API access control
- Encryption at Rest (AES-256) and in Transit (TLS 1.3)
- Rate Limiting to prevent brute force and DDoS
- Input Validation to prevent injection attacks
- Security Headers (CSP, HSTS, X-Frame-Options)
- Audit Logging for compliance and forensics
Security Architecture
┌───────────────────────────────────────────────────────────────┐
│ Security Layers │
└───────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Layer 1: Network Security (GCP) │
│ • Cloud Armor (DDoS protection) │
│ • VPC firewall rules │
│ • TLS 1.3 (Load Balancer) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 2: Application Gateway │
│ • Rate limiting (Redis) │
│ • IP whitelisting/blacklisting │
│ • Request size limits │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 3: Authentication & Authorization │
│ • JWT token validation (RS256) │
│ • API key authentication │
│ • Role-based access control (RBAC) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 4: Input Validation │
│ • Request schema validation (Pydantic) │
│ • SQL injection prevention (parameterized queries) │
│ • XSS prevention (output encoding) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 5: Data Protection │
│ • Encryption at rest (AES-256) │
│ • Encryption in transit (TLS 1.3) │
│ • Key management (Cloud KMS) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Layer 6: Audit & Monitoring │
│ • Structured audit logs │
│ • Security event alerting │
│ • Anomaly detection │
└─────────────────────────────────────────────────────────────┘
Implementation
1. JWT Authentication
File: backend/authentication/jwt_auth.py
import jwt
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from django.conf import settings
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
class JWTAuthentication(BaseAuthentication):
"""
JWT token authentication for API requests.
Algorithm: RS256 (asymmetric)
Expiration: 1 hour
Refresh: 7 days
"""
def authenticate(self, request):
"""
Authenticate request using JWT token.
Token location: Authorization: Bearer <token>
"""
auth_header = request.META.get('HTTP_AUTHORIZATION')
if not auth_header or not auth_header.startswith('Bearer '):
return None
token = auth_header.split(' ')[1]
try:
payload = self._decode_token(token)
user = self._get_user_from_payload(payload)
return (user, token)
except jwt.ExpiredSignatureError:
raise AuthenticationFailed('Token expired')
except jwt.InvalidTokenError:
raise AuthenticationFailed('Invalid token')
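    def _decode_token(self, token):
        """
        Decode and validate JWT token.
        Validation:
        - Signature (RS256 public key)
        - Expiration
        - Issuer
        - Audience
        - Token type (refresh tokens are rejected)
        """
        # Load public key from Cloud KMS
        public_key = self._get_public_key()
        payload = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            issuer='coditect-license-server',
            audience='coditect-api',
            options={
                'verify_signature': True,
                'verify_exp': True,
                'verify_iat': True,
                'verify_iss': True,
                'verify_aud': True,
                'require': ['exp', 'iat', 'sub', 'iss', 'aud']
            }
        )
        # Refresh tokens carry the same issuer and audience, so without this
        # check a 7-day refresh token would be accepted as an access token.
        if payload.get('type') == 'refresh':
            raise jwt.InvalidTokenError('Refresh token used as access token')
        return payload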
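    # Process-wide cache shared by all instances: (key, fetched_at)
    _public_key_cache = None
    _PUBLIC_KEY_TTL = 3600  # seconds

    def _get_public_key(self):
        """
        Fetch RSA public key from Cloud KMS.
        Cached in process memory for 1 hour to reduce KMS API calls.
        """
        import time
        from google.cloud import kms

        cached = JWTAuthentication._public_key_cache
        if cached and time.monotonic() - cached[1] < self._PUBLIC_KEY_TTL:
            return cached[0]

        client = kms.KeyManagementServiceClient()
        # JWT_PUBLIC_KEY_NAME must be the full resource name of a key version
        key_name = settings.JWT_PUBLIC_KEY_NAME
        public_key = client.get_public_key(request={'name': key_name})
        # Parse PEM format
        pem_key = serialization.load_pem_public_key(
            public_key.pem.encode(),
            backend=default_backend()
        )
        JWTAuthentication._public_key_cache = (pem_key, time.monotonic())
        return pem_key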
def _get_user_from_payload(self, payload):
"""Get user from JWT subject claim."""
from django.contrib.auth import get_user_model
User = get_user_model()
user_id = payload.get('sub')
try:
user = User.objects.get(id=user_id)
return user
except User.DoesNotExist:
raise AuthenticationFailed('User not found')
import base64
import hashlib
import json
import uuid
from datetime import timezone


def generate_jwt_token(user):
    """
    Generate JWT access token.
    Claims:
    - sub: User ID
    - email: User email
    - roles: User roles
    - iat: Issued at
    - exp: Expiration (1 hour)
    - iss: Issuer
    - aud: Audience
    """
    now = datetime.now(timezone.utc)
    payload = {
        'sub': str(user.id),
        'email': user.email,
        'roles': list(user.roles.values_list('name', flat=True)),
        'iat': int(now.timestamp()),
        'exp': int((now + timedelta(hours=1)).timestamp()),
        'iss': 'coditect-license-server',
        'aud': 'coditect-api'
    }
    # Sign inside Cloud KMS; the private key is never available to this process
    return _sign_jwt_with_kms(payload)


def generate_refresh_token(user):
    """
    Generate JWT refresh token.
    Expiration: 7 days
    Single use: Invalidated after refresh
    """
    now = datetime.now(timezone.utc)
    expires_at = now + timedelta(days=7)
    payload = {
        'sub': str(user.id),
        'type': 'refresh',
        'jti': str(uuid.uuid4()),  # keeps tokens issued in the same second distinct
        'iat': int(now.timestamp()),
        'exp': int(expires_at.timestamp()),
        'iss': 'coditect-license-server',
        'aud': 'coditect-api'
    }
    token = _sign_jwt_with_kms(payload)
    # Store refresh token hash in database
    from .models import RefreshToken
    token_hash = hashlib.sha256(token.encode()).hexdigest()
    RefreshToken.objects.create(
        user=user,
        token_hash=token_hash,
        expires_at=expires_at
    )
    return token


def _b64url(data):
    """Base64url-encode bytes without padding, as the JWS format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b'=')


def _sign_jwt_with_kms(payload):
    """
    Build an RS256 JWT whose signature is produced by Cloud KMS.
    The private key never leaves KMS; only the SHA-256 digest of the
    signing input is sent. Assumes settings.JWT_PRIVATE_KEY_NAME is the
    full resource name of a key version that uses one of the
    RSA_SIGN_PKCS1_*_SHA256 algorithms.
    """
    from google.cloud import kms
    header = {'alg': 'RS256', 'typ': 'JWT'}
    signing_input = b'.'.join([
        _b64url(json.dumps(header, separators=(',', ':')).encode()),
        _b64url(json.dumps(payload, separators=(',', ':')).encode()),
    ])
    digest = hashlib.sha256(signing_input).digest()
    client = kms.KeyManagementServiceClient()
    response = client.asymmetric_sign(
        request={
            'name': settings.JWT_PRIVATE_KEY_NAME,
            'digest': {'sha256': digest},
        }
    )
    return (signing_input + b'.' + _b64url(response.signature)).decode()
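The "single use" property of refresh tokens is stated above but not shown. A minimal sketch of the rotation step follows, assuming an in-memory dictionary as a stand-in for the RefreshToken table and a random opaque string in place of the signed JWT. The names `issue_refresh_token` and `rotate_refresh_token` are hypothetical.

```python
import hashlib
import secrets
import time

# Hypothetical stand-in for the RefreshToken table: token hash -> (user_id, expires_at)
_refresh_store = {}


def _hash(token):
    """Only the SHA-256 hash of a token is ever stored."""
    return hashlib.sha256(token.encode()).hexdigest()


def issue_refresh_token(user_id, ttl_seconds=7 * 24 * 3600, now=None):
    """Create a refresh token and record its hash with an expiry time."""
    now = time.time() if now is None else now
    token = secrets.token_urlsafe(32)
    _refresh_store[_hash(token)] = (user_id, now + ttl_seconds)
    return token


def rotate_refresh_token(token, now=None):
    """Exchange a refresh token for a new one; the old token stops working."""
    now = time.time() if now is None else now
    # pop() removes the record, so presenting the same token again fails
    record = _refresh_store.pop(_hash(token), None)
    if record is None:
        raise ValueError('Unknown or already used refresh token')
    user_id, expires_at = record
    if now >= expires_at:
        raise ValueError('Refresh token expired')
    return user_id, issue_refresh_token(user_id, now=now)
```

In the real service the pop would be a delete inside a database transaction, so that two concurrent refresh requests cannot both succeed.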
2. Rate Limiting
File: backend/middleware/rate_limiter.py
import redis
from django.http import JsonResponse
from django.conf import settings
class RateLimitMiddleware:
"""
Token bucket rate limiting using Redis.
Limits:
- Anonymous: 100 req/minute
- Authenticated: 1000 req/minute
- Admin: Unlimited
Headers returned:
- X-RateLimit-Limit: Total limit
- X-RateLimit-Remaining: Remaining requests
- X-RateLimit-Reset: Reset timestamp
"""
def __init__(self, get_response):
self.get_response = get_response
self.redis_client = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_RATE_LIMIT_DB,
decode_responses=True
)
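    def __call__(self, request):
        """
        Check rate limit before processing request.
        """
        import time
        # request.user is whatever earlier middleware resolved (session auth).
        # DRF runs JWTAuthentication later, inside the view, so JWT-only
        # clients are still anonymous at this point.
        # Get identifier (user ID or IP)
        identifier = self._get_identifier(request)
        # Check if rate limited
        allowed, limit_info = self._check_rate_limit(identifier, request.user)
        if not allowed:
            retry_after = max(1, int(limit_info['reset'] - time.time()))
            response = JsonResponse({
                'error': 'Rate limit exceeded',
                'retry_after': retry_after
            }, status=429)
            response['Retry-After'] = str(retry_after)
            return response
        # Process request
        response = self.get_response(request)
        # Add rate limit headers (admins are unlimited, so nothing to report)
        if limit_info['reset'] is not None:
            response['X-RateLimit-Limit'] = str(limit_info['limit'])
            response['X-RateLimit-Remaining'] = str(limit_info['remaining'])
            response['X-RateLimit-Reset'] = str(limit_info['reset'])
        return response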
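    def _get_identifier(self, request):
        """
        Get unique identifier for rate limiting.
        Priority:
        1. User ID (if authenticated)
        2. API key (if present)
        3. IP address (fallback)
        """
        import hashlib
        if request.user.is_authenticated:
            return f"user:{request.user.id}"
        api_key = request.META.get('HTTP_X_API_KEY')
        if api_key:
            # Hash the key so the secret never appears in Redis key names.
            # The key should be verified first: an unverified value would let
            # a client get a fresh bucket with every random key it sends.
            digest = hashlib.sha256(api_key.encode()).hexdigest()
            return f"api_key:{digest}"
        # Get real IP (behind load balancer).
        # The left-most X-Forwarded-For entries are supplied by the client and
        # can be spoofed to dodge the limit. Assuming a single Google Cloud
        # external Application Load Balancer in front, which appends
        # "<client-ip>, <lb-ip>", the client is the second entry from the right.
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        hops = [h.strip() for h in x_forwarded_for.split(',')] if x_forwarded_for else []
        if len(hops) >= 2:
            ip = hops[-2]
        else:
            ip = request.META.get('REMOTE_ADDR')
        return f"ip:{ip}"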
    def _check_rate_limit(self, identifier, user):
        """
        Token bucket rate limiting.
        Algorithm:
        1. Check current token count
        2. Refill tokens based on elapsed time
        3. Consume 1 token if available
        4. Return allowed/denied
        """
        import time
        # Determine limit based on user type
        if user.is_authenticated and user.is_staff:
            # Unlimited for admins
            return True, {
                'limit': 'unlimited',
                'remaining': 'unlimited',
                'reset': None,
                'reset_in': None
            }
        if user.is_authenticated:
            limit = 1000  # 1000 req/min
            window = 60   # 1 minute
        else:
            limit = 100   # 100 req/min
            window = 60   # 1 minute
        # Lua script for atomic token bucket
        lua_script = """
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        -- Get current bucket state
        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1]) or limit
        local last_refill = tonumber(bucket[2]) or now
        -- Calculate refill
        local elapsed = math.max(0, now - last_refill)
        local refill_rate = limit / window
        local new_tokens = math.min(limit, tokens + (elapsed * refill_rate))
        -- Try to consume 1 token
        local allowed = 0
        if new_tokens >= 1 then
            allowed = 1
            new_tokens = new_tokens - 1
            -- HSET with several fields replaces the deprecated HMSET (Redis 4.0+)
            redis.call('HSET', key, 'tokens', new_tokens, 'last_refill', now)
            redis.call('EXPIRE', key, window * 2)
        end
        -- Redis truncates Lua numbers to integers, so round explicitly
        return {allowed, math.floor(new_tokens), math.ceil(limit - new_tokens)}
        """
        now = time.time()
        result = self.redis_client.eval(
            lua_script,
            1,
            f"rate_limit:{identifier}",
            limit,
            window,
            now
        )
        allowed = result[0] == 1
        remaining = int(result[1])
        consumed = int(result[2])
        # Seconds until the bucket is full again: consumed tokens / refill rate
        reset_in = consumed / (limit / window)
        return allowed, {
            'limit': limit,
            'remaining': remaining,
            'reset': int(now + reset_in),
            'reset_in': int(reset_in)
        }
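The refill arithmetic in the Lua script is easier to check outside Redis. Below is a minimal pure-Python sketch of the same token bucket steps, with explicit timestamps so the behaviour is reproducible. The `Bucket` class and `take_token` function are hypothetical names used only for illustration.

```python
from dataclasses import dataclass


@dataclass
class Bucket:
    tokens: float       # tokens currently available
    last_refill: float  # timestamp of the last update, in seconds


def take_token(bucket, limit, window, now):
    """Refill by elapsed time, then try to consume one token.

    Returns (allowed, remaining, seconds_until_full).
    """
    refill_rate = limit / window  # tokens added per second
    elapsed = max(0.0, now - bucket.last_refill)
    tokens = min(limit, bucket.tokens + elapsed * refill_rate)
    allowed = tokens >= 1
    if allowed:
        tokens -= 1
    bucket.tokens, bucket.last_refill = tokens, now
    seconds_until_full = (limit - tokens) / refill_rate
    return allowed, int(tokens), seconds_until_full


# 60 requests per 60 seconds: one token is refilled every second
bucket = Bucket(tokens=60, last_refill=0.0)
burst = [take_token(bucket, 60, 60, now=0.0)[0] for _ in range(61)]
later = take_token(bucket, 60, 60, now=1.0)
```

Here the first 60 calls in the burst are allowed and the 61st is denied; one second later a single token has been refilled, so the next call succeeds again.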
3. Encryption at Rest
File: backend/encryption/field_encryption.py
from cryptography.fernet import Fernet
from django.conf import settings
from django.db import models
import base64
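class EncryptedField(models.TextField):
    """
    Encrypted database field.
    Encryption: Fernet (AES-128-CBC with HMAC-SHA256 authentication)
    Note: Fernet does not provide AES-256. Meeting the AES-256 requirement
    in the Decision section needs a different primitive, such as AES-256-GCM.
    Key storage: Google Cloud KMS
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cipher = None  # built lazily by the cipher property

    @property
    def cipher(self):
        """Create the cipher on first use so that importing models or running
        migrations does not trigger a KMS call."""
        if self._cipher is None:
            self._cipher = self._get_cipher()
        return self._cipher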
def _get_cipher(self):
"""
Get Fernet cipher from Cloud KMS.
Key rotation: Automatic via Cloud KMS
"""
from google.cloud import kms
client = kms.KeyManagementServiceClient()
key_name = settings.FIELD_ENCRYPTION_KEY_NAME
# Decrypt data encryption key (DEK) using KMS
encrypted_dek = settings.ENCRYPTED_DEK
response = client.decrypt(
request={
'name': key_name,
'ciphertext': base64.b64decode(encrypted_dek)
}
)
dek = response.plaintext
# Create Fernet cipher
return Fernet(dek)
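    def from_db_value(self, value, expression, connection):
        """
        Decrypt value from database.
        """
        from cryptography.fernet import InvalidToken
        if value is None:
            return value
        try:
            decrypted = self.cipher.decrypt(value.encode())
        except InvalidToken as exc:
            # Corrupted ciphertext or wrong key. Fail loudly: returning None
            # would hide the problem, and a later save() would overwrite the
            # stored value with NULL.
            raise ValueError('Could not decrypt field value') from exc
        return decrypted.decode()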
def get_prep_value(self, value):
"""
Encrypt value before saving to database.
"""
if value is None:
return value
encrypted = self.cipher.encrypt(value.encode())
return encrypted.decode()
# Example usage in models
class License(models.Model):
"""License model with encrypted fields."""
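    # Encrypted fields. Fernet output is randomized, so equality lookups such
    # as filter(license_key=...) cannot match on these columns.
    license_key = EncryptedField(max_length=500)
    stripe_customer_id = EncryptedField(max_length=500, null=True)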
# Non-encrypted fields
tier = models.CharField(max_length=20)
expires_at = models.DateTimeField()
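Because Fernet ciphertext is randomized, an encrypted column cannot be searched by equality. One common workaround is a keyed hash (a "blind index") stored in a separate column. The sketch below assumes a hypothetical `license_key_index` column and a separate index key; neither is part of the design above.

```python
import hashlib
import hmac


def blind_index(value: str, index_key: bytes) -> str:
    """Return a deterministic HMAC-SHA256 of the value.

    The same value and key always give the same hex digest, so the digest
    can be stored next to the ciphertext and used for equality lookups.
    """
    return hmac.new(index_key, value.encode(), hashlib.sha256).hexdigest()


# Hypothetical key for illustration; in practice it would come from a secret
# store and be distinct from the data encryption key.
EXAMPLE_INDEX_KEY = b'\x01' * 32
digest = blind_index('ABCD-1234', EXAMPLE_INDEX_KEY)
```

A lookup would then filter on the index column, for example `License.objects.filter(license_key_index=blind_index(key, index_key))`, and decrypt only the matching row.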
4. Input Validation
File: backend/validation/request_validators.py
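# Pydantic v1 API: validator and constr(regex=...) are renamed to
# field_validator and constr(pattern=...) in Pydantic v2
from pydantic import BaseModel, validator, EmailStr, constr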
from typing import Optional
from datetime import datetime
class LicenseValidationRequest(BaseModel):
"""
License validation request schema.
Prevents:
- SQL injection (no raw SQL)
- XSS (output encoding)
- Excessive input (max lengths)
"""
license_key: constr(min_length=32, max_length=64)
hardware_id: constr(min_length=64, max_length=64)
version: constr(regex=r'^\d+\.\d+\.\d+$', max_length=20)
@validator('license_key')
def validate_license_key(cls, v):
"""
Validate license key format.
Format: Alphanumeric + hyphens only
"""
import re
if not re.match(r'^[A-Za-z0-9\-]+$', v):
raise ValueError('Invalid license key format')
return v
@validator('hardware_id')
def validate_hardware_id(cls, v):
"""
Validate hardware ID format.
Format: SHA256 hex string
"""
import re
if not re.match(r'^[a-f0-9]{64}$', v):
raise ValueError('Invalid hardware ID format')
return v
class SeatAcquisitionRequest(BaseModel):
"""Seat acquisition request schema."""
license_key: constr(min_length=32, max_length=64)
user_email: EmailStr
session_id: constr(min_length=36, max_length=36)
@validator('session_id')
def validate_session_id(cls, v):
"""Validate session ID is valid UUID."""
import uuid
try:
uuid.UUID(v)
return v
except ValueError:
raise ValueError('Invalid session ID format')
class UserRegistrationRequest(BaseModel):
"""User registration request schema."""
email: EmailStr
password: constr(min_length=12, max_length=128)
company_name: Optional[constr(max_length=200)]
@validator('password')
def validate_password_strength(cls, v):
"""
Validate password strength.
Requirements:
- Minimum 12 characters
- At least 1 uppercase
- At least 1 lowercase
- At least 1 digit
- At least 1 special character
"""
import re
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Password must contain digit')
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
raise ValueError('Password must contain special character')
return v
@validator('email')
def validate_email_not_disposable(cls, v):
"""
Prevent disposable email addresses.
Check against known disposable domains.
"""
disposable_domains = [
'guerrillamail.com',
'mailinator.com',
'tempmail.com',
'10minutemail.com'
]
domain = v.split('@')[1].lower()
if domain in disposable_domains:
raise ValueError('Disposable email addresses not allowed')
return v
# Django REST Framework endpoint with validation
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework import status, serializers
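class LicenseValidationRequestSerializer(serializers.Serializer):
    # Mirror the constraints of LicenseValidationRequest above so the endpoint
    # enforces the same lengths and patterns
    license_key = serializers.RegexField(
        r'^[A-Za-z0-9\-]+$', min_length=32, max_length=64
    )
    hardware_id = serializers.RegexField(r'^[a-f0-9]{64}$')
    version = serializers.RegexField(r'^\d+\.\d+\.\d+$', max_length=20)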
@api_view(['POST'])
@permission_classes([AllowAny]) # License validation doesn't require authentication
def validate_license(request):
"""
Validate license with automatic input validation.
Django REST Framework serializer automatically validates:
- Field types
- String lengths
- Regex patterns
- Custom validators
Returns 400 if validation fails.
"""
# Validate request data
serializer = LicenseValidationRequestSerializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# Request is already validated by DRF serializer
# No SQL injection possible - using Django ORM
from backend.license_service import LicenseService
service = LicenseService()
result = service.validate_license(
license_key=serializer.validated_data['license_key'],
hardware_id=serializer.validated_data['hardware_id'],
version=serializer.validated_data['version']
)
return Response(result, status=status.HTTP_200_OK)
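The format rules for a license validation request can be checked in isolation. This is a minimal sketch using only the standard library, with the patterns and length limits taken from the schema above; `check_license_request` is a hypothetical helper, not part of the service.

```python
import re

# (field name, pattern, maximum length)
LICENSE_REQUEST_RULES = (
    ('license_key', re.compile(r'[A-Za-z0-9\-]{32,64}'), 64),
    ('hardware_id', re.compile(r'[a-f0-9]{64}'), 64),
    ('version', re.compile(r'\d+\.\d+\.\d+'), 20),
)


def check_license_request(data):
    """Return a dict of field name -> error message; empty means valid."""
    errors = {}
    for field, pattern, max_length in LICENSE_REQUEST_RULES:
        value = data.get(field)
        if not isinstance(value, str) or len(value) > max_length:
            errors[field] = 'Missing, wrong type, or too long'
        # fullmatch must cover the whole string, so a trailing newline or
        # appended SQL fragment is rejected
        elif not pattern.fullmatch(value):
            errors[field] = 'Invalid format'
    return errors


valid_request = {
    'license_key': 'A' * 32,
    'hardware_id': 'a' * 64,
    'version': '2.0.0',
}
injection_attempt = dict(valid_request, license_key="x' OR '1'='1")
```

Using `fullmatch` instead of `match` with a trailing `$` avoids accepting a value that ends in a newline.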
5. Security Headers
File: backend/middleware/security_headers.py
class SecurityHeadersMiddleware:
"""
Add security headers to all responses.
Headers:
- Content-Security-Policy (CSP)
- Strict-Transport-Security (HSTS)
- X-Content-Type-Options
- X-Frame-Options
- X-XSS-Protection
- Referrer-Policy
"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
# Content Security Policy
response['Content-Security-Policy'] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://js.stripe.com; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"font-src 'self'; "
"connect-src 'self' https://api.stripe.com; "
"frame-src https://js.stripe.com; "
"object-src 'none'; "
"base-uri 'self'; "
"form-action 'self';"
)
# HSTS - Force HTTPS
response['Strict-Transport-Security'] = (
'max-age=31536000; includeSubDomains; preload'
)
# Prevent MIME sniffing
response['X-Content-Type-Options'] = 'nosniff'
# Prevent clickjacking
response['X-Frame-Options'] = 'DENY'
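        # Disable the legacy browser XSS auditor: current browsers have removed
        # it and its filtering mode could itself be abused. CSP is the XSS
        # control here.
        response['X-XSS-Protection'] = '0'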
# Referrer policy
response['Referrer-Policy'] = 'strict-origin-when-cross-origin'
# Permissions policy
response['Permissions-Policy'] = (
'geolocation=(), microphone=(), camera=()'
)
return response
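A deployment smoke test can confirm that the middleware is active by checking a response for the expected headers. The following is a minimal sketch that works on any mapping of header names to values; `missing_security_headers` is a hypothetical helper.

```python
REQUIRED_SECURITY_HEADERS = (
    'Content-Security-Policy',
    'Strict-Transport-Security',
    'X-Content-Type-Options',
    'X-Frame-Options',
    'Referrer-Policy',
    'Permissions-Policy',
)


def missing_security_headers(headers):
    """Return the required header names absent from the given headers.

    Header names are compared case-insensitively, as HTTP requires.
    """
    present = {name.lower() for name in headers}
    return [
        name for name in REQUIRED_SECURITY_HEADERS
        if name.lower() not in present
    ]


example_headers = {
    'content-security-policy': "default-src 'self'",
    'Strict-Transport-Security': 'max-age=31536000',
    'X-Content-Type-Options': 'nosniff',
    'X-Frame-Options': 'DENY',
}
missing = missing_security_headers(example_headers)
```

With the example headers, the two names still reported are `Referrer-Policy` and `Permissions-Policy`.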
6. Audit Logging
File: backend/audit/audit_logger.py
import logging
import json
from datetime import datetime
from django.conf import settings
class AuditLogger:
"""
Structured audit logging for compliance.
Logs:
- User actions (login, logout, password change)
- License operations (validation, seat acquisition)
- Admin actions (license creation, user management)
- Security events (failed login, rate limit)
Format: JSON (Cloud Logging compatible)
Retention: 90 days (compliance requirement)
"""
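    def __init__(self):
        self.logger = logging.getLogger('audit')
        self.logger.setLevel(logging.INFO)
        # Cloud Logging handler. logging.getLogger returns a shared logger, so
        # attach the handler only once; otherwise every AuditLogger() instance
        # would duplicate each entry.
        if settings.ENVIRONMENT == 'production' and not self.logger.handlers:
            from google.cloud import logging as cloud_logging
            client = cloud_logging.Client()
            handler = client.get_default_handler()
            self.logger.addHandler(handler)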
def log_event(
self,
event_type: str,
user_id: str,
resource_type: str,
resource_id: str,
action: str,
status: str,
ip_address: str,
user_agent: str,
metadata: dict = None
):
"""
Log audit event.
Args:
event_type: 'security', 'license', 'admin', 'user'
user_id: User ID or 'anonymous'
resource_type: 'license', 'seat', 'user', 'api_key'
resource_id: Resource identifier
action: 'create', 'read', 'update', 'delete', 'validate'
status: 'success', 'failure', 'denied'
ip_address: Client IP
user_agent: Client user agent
metadata: Additional context
"""
audit_entry = {
'timestamp': datetime.utcnow().isoformat() + 'Z',
'event_type': event_type,
'user_id': user_id,
'resource_type': resource_type,
'resource_id': resource_id,
'action': action,
'status': status,
'ip_address': ip_address,
'user_agent': user_agent,
'metadata': metadata or {}
}
self.logger.info(json.dumps(audit_entry))
def log_security_event(
self,
event_subtype: str,
user_id: str,
ip_address: str,
user_agent: str,
severity: str = 'medium',
metadata: dict = None
):
"""
Log security event.
Events:
- failed_login
- suspicious_activity
- rate_limit_exceeded
- invalid_token
- brute_force_detected
"""
security_entry = {
'timestamp': datetime.utcnow().isoformat() + 'Z',
'event_type': 'security',
'event_subtype': event_subtype,
'user_id': user_id,
'ip_address': ip_address,
'user_agent': user_agent,
'severity': severity,
'metadata': metadata or {}
}
self.logger.warning(json.dumps(security_entry))
# Example usage
audit_logger = AuditLogger()
# Log license validation
audit_logger.log_event(
event_type='license',
user_id='user-123',
resource_type='license',
resource_id='lic-abc123',
action='validate',
status='success',
ip_address='203.0.113.1',
user_agent='CODITECT-Client/1.0',
metadata={
'hardware_id': 'abc123...',
'version': '2.0.0'
}
)
# Log failed login
audit_logger.log_security_event(
event_subtype='failed_login',
user_id='user@example.com',
ip_address='203.0.113.1',
user_agent='Mozilla/5.0...',
severity='high',
metadata={
'attempt_count': 5,
'last_attempt': datetime.utcnow().isoformat()
}
)
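The `brute_force_detected` event listed above needs some rule that decides when repeated `failed_login` events cross a line. A minimal sliding-window sketch follows; the threshold of 5 failures in 300 seconds and the `FailedLoginTracker` name are assumptions for illustration, not values from this ADR.

```python
from collections import defaultdict, deque


class FailedLoginTracker:
    """Count failed logins per IP address inside a sliding time window."""

    def __init__(self, threshold=5, window_seconds=300):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._attempts = defaultdict(deque)  # ip -> timestamps of failures

    def record_failure(self, ip_address, now):
        """Record one failure; return True once the threshold is reached."""
        attempts = self._attempts[ip_address]
        attempts.append(now)
        # Drop failures that have fallen out of the window
        while attempts and now - attempts[0] > self.window_seconds:
            attempts.popleft()
        return len(attempts) >= self.threshold
```

When `record_failure` returns True, the caller would emit `log_security_event(event_subtype='brute_force_detected', ...)`. In a multi-process deployment the counters would have to live in shared storage such as Redis rather than in process memory.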
7. DDoS Protection
File: infrastructure/cloud-armor-policy.yaml
# Google Cloud Armor security policy
# Protects against DDoS and application attacks
name: coditect-license-api-policy
description: Cloud Armor policy for license API
# Default rule
defaultRuleAction: allow
# Security rules
rules:
# Block known bad IPs
- priority: 1000
description: "Block malicious IPs"
match:
versionedExpr: SRC_IPS_V1
config:
srcIpRanges:
- "192.0.2.0/24" # Example malicious range
action: deny(403)
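  # Rate limit per IP
  # Cloud Armor applies the first matching rule in priority order and stops.
  # This rule matches every request, so it needs a higher priority number than
  # the SQLi, XSS, scanner, and geo rules, or they would never be evaluated.
  - priority: 7000
    description: "Rate limit 100 req/min per IP"
    match:
      expr:
        expression: "true"
    action: throttle
    rateLimitOptions:
      conformAction: allow
      exceedAction: deny(429)
      enforceOnKey: IP
      rateLimitThreshold:
        count: 100
        intervalSec: 60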
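# Block SQL injection attempts
# (the rule IDs listed in the expression are signatures excluded from the set, not the ones enforced)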
- priority: 3000
description: "Block SQL injection"
match:
expr:
expression: |
evaluatePreconfiguredExpr('sqli-stable',
['owasp-crs-v030001-id942251-sqli',
'owasp-crs-v030001-id942420-sqli',
'owasp-crs-v030001-id942431-sqli'])
action: deny(403)
# Block XSS attempts
- priority: 4000
description: "Block XSS"
match:
expr:
expression: |
evaluatePreconfiguredExpr('xss-stable',
['owasp-crs-v030001-id941150-xss',
'owasp-crs-v030001-id941320-xss'])
action: deny(403)
# Block scanners
- priority: 5000
description: "Block security scanners"
match:
expr:
expression: |
evaluatePreconfiguredExpr('scannerdetection-stable',
['owasp-crs-v030001-id913101-scannerdetection'])
action: deny(403)
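  # Geographic restrictions (optional)
  # origin.region_code holds ISO 3166-1 alpha-2 country codes. 'EU' is not one
  # of them, so member states must be listed individually; the countries below
  # are examples only.
  - priority: 6000
    description: "Allow only US and selected EU countries"
    match:
      expr:
        expression: |
          origin.region_code != 'US' && origin.region_code != 'DE' &&
          origin.region_code != 'FR' && origin.region_code != 'IE'
    action: deny(403)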
# Default allow
- priority: 2147483647
description: "Default allow rule"
match:
versionedExpr: SRC_IPS_V1
config:
srcIpRanges:
- "*"
action: allow
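Cloud Armor evaluates rules in ascending priority number and applies the first one that matches. The toy model below illustrates why the position of a catch-all rule matters; the rule dictionaries and the `sqli` flag are hypothetical stand-ins for real match expressions.

```python
import ipaddress

BLOCKED_RANGE = ipaddress.ip_network('192.0.2.0/24')


def first_match(rules, request):
    """Return the action of the lowest-numbered rule that matches."""
    for rule in sorted(rules, key=lambda r: r['priority']):
        if rule['match'](request):
            return rule['action']
    return 'allow'


def make_rules(rate_limit_priority):
    """Build a three-rule policy with the catch-all throttle at a given priority."""
    return [
        {'priority': 1000, 'action': 'deny(403)',
         'match': lambda r: ipaddress.ip_address(r['ip']) in BLOCKED_RANGE},
        {'priority': rate_limit_priority, 'action': 'throttle',
         'match': lambda r: True},  # matches every request
        {'priority': 3000, 'action': 'deny(403)',
         'match': lambda r: r.get('sqli', False)},
    ]


attack = {'ip': '203.0.113.9', 'sqli': True}
shadowed = first_match(make_rules(2000), attack)   # throttle rule wins
enforced = first_match(make_rules(7000), attack)   # SQLi rule wins
```

With the throttle rule at priority 2000 the SQL injection rule is never reached; moving the throttle rule behind it lets the injection attempt be denied.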
8. Database Security
File: backend/database/security_settings.py
# PostgreSQL security settings
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'coditect_licenses',
'USER': 'app_user', # Not superuser!
'PASSWORD': '', # From Cloud Secret Manager
'HOST': '/cloudsql/project:region:instance', # Unix socket
'PORT': '',
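        'OPTIONS': {
            # SSL/TLS settings apply to TCP connections only; libpq ignores
            # sslmode on a Unix socket such as the /cloudsql/ path above.
            # 'verify-ca' states explicitly that the server certificate must
            # chain to sslrootcert.
            'sslmode': 'verify-ca',
            'sslrootcert': '/path/to/server-ca.pem',
            'sslcert': '/path/to/client-cert.pem',
            'sslkey': '/path/to/client-key.pem',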
# Connection limits
'connect_timeout': 10,
'options': '-c statement_timeout=30000', # 30 seconds
},
        'CONN_MAX_AGE': 600,  # Persistent connections, reused for up to 10 minutes (not a pool)
}
}
# Row-Level Security (RLS)
# Enforces tenant isolation at database level
# Migration to enable RLS
RLS_MIGRATION = """
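-- Enable RLS on licenses table
ALTER TABLE licenses ENABLE ROW LEVEL SECURITY;
-- Table owners bypass RLS unless it is forced; required if app_user owns the tables
ALTER TABLE licenses FORCE ROW LEVEL SECURITY;
-- Create policy: Users can only see their own tenant's licenses
-- (the second argument makes current_setting return NULL instead of raising
-- when the setting is missing; NULLIF covers a setting reset to an empty string)
CREATE POLICY tenant_isolation_policy ON licenses
FOR ALL
USING (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::uuid);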
-- Create policy: Admins can see all
CREATE POLICY admin_full_access ON licenses
FOR ALL
TO admin_role
USING (true);
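-- Apply to all tenant-scoped tables
-- (seats and usage_events also need a policy of their own: with RLS enabled
-- and no policy defined, non-owner roles see no rows at all)
ALTER TABLE license_events ENABLE ROW LEVEL SECURITY;
ALTER TABLE seats ENABLE ROW LEVEL SECURITY;
ALTER TABLE usage_events ENABLE ROW LEVEL SECURITY;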
CREATE POLICY tenant_isolation_policy ON license_events
FOR ALL
USING (
EXISTS (
SELECT 1 FROM licenses
WHERE licenses.id = license_events.license_id
AND licenses.tenant_id = current_setting('app.current_tenant_id')::uuid
)
);
"""
# Set tenant context before each query
class TenantIsolationMiddleware:
"""
Set tenant context for Row-Level Security.
Prevents cross-tenant data access at database level.
"""
def __init__(self, get_response):
self.get_response = get_response
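    def __call__(self, request):
        from django.db import connection
        if not request.user.is_authenticated:
            return self.get_response(request)
        # SET LOCAL lasts only for the current transaction and has no effect in
        # Django's default autocommit mode, so set the value for the session
        # and clear it afterwards. Clearing matters because CONN_MAX_AGE
        # reuses the connection for later requests.
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT set_config('app.current_tenant_id', %s, false)",
                [str(request.user.tenant_id)]
            )
        try:
            return self.get_response(request)
        finally:
            with connection.cursor() as cursor:
                cursor.execute(
                    "SELECT set_config('app.current_tenant_id', '', false)"
                )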
# Prepared statements (prevent SQL injection)
# ALWAYS use Django ORM or parameterized queries
# NEVER use raw SQL with string formatting
# ❌ WRONG - SQL injection vulnerability
def get_license_wrong(license_key):
query = f"SELECT * FROM licenses WHERE license_key = '{license_key}'"
return License.objects.raw(query)
# ✅ CORRECT - Parameterized query
def get_license_correct(license_key):
return License.objects.filter(license_key=license_key).first()
# ✅ CORRECT - Raw SQL with parameters
def get_license_raw_correct(license_key):
query = "SELECT * FROM licenses WHERE license_key = %s"
return License.objects.raw(query, [license_key])
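The difference between the wrong and correct variants can be reproduced without a PostgreSQL server. The sketch below uses the standard library's `sqlite3` with an in-memory table as a stand-in; note that `sqlite3` uses `?` placeholders where the Django examples above use `%s`.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE licenses (license_key TEXT, tier TEXT)')
conn.executemany(
    'INSERT INTO licenses VALUES (?, ?)',
    [('KEY-A', 'pro'), ('KEY-B', 'team')],
)


def find_unsafe(license_key):
    """String formatting: the input becomes part of the SQL text."""
    query = f"SELECT * FROM licenses WHERE license_key = '{license_key}'"
    return conn.execute(query).fetchall()


def find_safe(license_key):
    """Parameterized query: the input is passed as data, never as SQL."""
    query = 'SELECT * FROM licenses WHERE license_key = ?'
    return conn.execute(query, (license_key,)).fetchall()


payload = "x' OR '1'='1"
leaked_rows = find_unsafe(payload)  # the OR clause matches every row
safe_rows = find_safe(payload)      # no license has this literal key
```

The formatted query returns both rows for the injected payload, while the parameterized query returns none.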
Security Checklist
Production Deployment Checklist
Before deploying to production:
- TLS 1.3 enabled on load balancer
- HTTPS redirect (HTTP → HTTPS)
- Cloud Armor configured with rules
- Rate limiting enabled (Redis)
- JWT authentication configured
- Encryption at rest enabled (Cloud KMS)
- Database SSL enforced
- Row-Level Security enabled
- Security headers middleware active
- Input validation on all endpoints
- Audit logging configured
- Secret rotation scheduled
- Dependency scanning automated (Dependabot)
- SAST scanning configured (GitHub Code Scanning)
- Container scanning enabled (GCR vulnerability scanning)
- Firewall rules configured (VPC)
- IAM least privilege enforced
- Backup encryption enabled
- Incident response plan documented
- Security training completed
Regular Security Tasks
Weekly:
- Review audit logs for anomalies
- Check failed login attempts
- Monitor rate limit violations
Monthly:
- Review Cloud Armor blocked IPs
- Update dependency versions
- Review IAM permissions
- Test backup restoration
Quarterly:
- Penetration testing
- Security audit
- Update security documentation
- Review incident response plan
Annually:
- SOC 2 Type II audit
- Rotate KMS keys
- Security awareness training
- Disaster recovery drill
Consequences
Positive
✅ Defense in Depth
- Multiple security layers protect against breaches
- No single point of failure
- Attack surface minimized
✅ Compliance Ready
- GDPR: Encryption, audit trails, data minimization
- SOC 2: Access control, logging, encryption
- HIPAA: PHI protection for healthcare customers
- PCI DSS: Secure payment handling (Stripe)
✅ Automated Protection
- Cloud Armor: Automatic DDoS mitigation
- Rate limiting: Prevent brute force
- Input validation: Block injection attacks
- Audit logging: Forensics and compliance
✅ Secure by Default
- TLS 1.3 enforced
- Security headers automatic
- Encryption at rest/transit
- Least privilege IAM
✅ Incident Response
- Comprehensive audit logs
- Real-time security alerts
- Automated threat detection
- Clear escalation path
Negative
⚠️ Performance Overhead
- Rate limiting: ~5ms latency
- Encryption: ~10ms latency
- JWT validation: ~15ms latency
- Total: ~30ms per request
⚠️ Operational Complexity
- Key rotation procedures
- Certificate management
- Security monitoring
- Incident response drills
⚠️ Development Friction
- Input validation strictness
- Security testing requirements
- Compliance documentation
- Security reviews
⚠️ Cost
- Cloud Armor: $200/month
- Cloud KMS: $50/month (key operations)
- Audit logging storage: $100/month
- Security tools: $300/month
- Total: $650/month
Related ADRs
- ADR-001: License Key Generation (KMS encryption)
- ADR-009: GCP Infrastructure Architecture (Cloud Armor, VPC)
- ADR-013: Stripe Integration (PCI DSS compliance)
- ADR-019: Monitoring and Observability (security event monitoring)
Last Updated: 2025-11-30
Owner: Security Team
Review Cycle: Quarterly (penetration testing)