
ADR-020: Security Hardening

Status: Accepted
Date: 2025-11-30
Deciders: Security Team, Engineering Team
Tags: security, authentication, encryption, rate-limiting, compliance


Context

Security Requirements

The CODITECT license server handles sensitive data and must be hardened against the attack vectors listed below:

Data at Risk:

  • License keys (customer IP)
  • User credentials and sessions
  • Billing information (Stripe integration)
  • Hardware IDs (device fingerprints)
  • Usage data (metering)

Attack Vectors:

  • Brute force attacks on license validation
  • DDoS attacks overwhelming API
  • SQL injection via API inputs
  • XSS attacks in web dashboard
  • Token theft (session hijacking)
  • Man-in-the-middle (MITM) attacks

Compliance Requirements:

  • GDPR: Encryption at rest/transit, data minimization
  • SOC 2: Access control, audit logging, encryption
  • HIPAA: PHI protection (applies only if healthcare customers are onboarded)
  • PCI DSS: Secure payment data handling (Stripe)

Decision

We will implement defense-in-depth security with:

  1. JWT Authentication for API access control
  2. Encryption at Rest (AES-256) and in Transit (TLS 1.3)
  3. Rate Limiting to prevent brute force and DDoS
  4. Input Validation to prevent injection attacks
  5. Security Headers (CSP, HSTS, X-Frame-Options)
  6. Audit Logging for compliance and forensics

Security Architecture

Security layers, from the network edge inward:

Layer 1: Network Security (GCP)

  • Cloud Armor (DDoS protection)
  • VPC firewall rules
  • TLS 1.3 (Load Balancer)

Layer 2: Application Gateway

  • Rate limiting (Redis)
  • IP whitelisting/blacklisting
  • Request size limits

Layer 3: Authentication & Authorization

  • JWT token validation (RS256)
  • API key authentication
  • Role-based access control (RBAC)

Layer 4: Input Validation

  • Request schema validation (Pydantic)
  • SQL injection prevention (parameterized queries)
  • XSS prevention (output encoding)

Layer 5: Data Protection

  • Encryption at rest (AES-256)
  • Encryption in transit (TLS 1.3)
  • Key management (Cloud KMS)

Layer 6: Audit & Monitoring

  • Structured audit logs
  • Security event alerting
  • Anomaly detection
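At the application level, the ordering of these layers shows up mostly as Django middleware order. The fragment below is a minimal sketch, not the project's actual settings: the `backend.middleware.*` paths are assumed from the file names used in the Implementation section, and `check_order` is a hypothetical helper. It checks one constraint that follows from the rate limiter reading `request.user`: authentication middleware has to run first.

```python
# settings.py fragment (sketch). Module paths under "backend." are assumed
# from the file names in this ADR, not confirmed project layout.
AUTH_MIDDLEWARE = "django.contrib.auth.middleware.AuthenticationMiddleware"
RATE_LIMIT_MIDDLEWARE = "backend.middleware.rate_limiter.RateLimitMiddleware"

MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware",
    "backend.middleware.security_headers.SecurityHeadersMiddleware",
    "django.contrib.sessions.middleware.SessionMiddleware",
    AUTH_MIDDLEWARE,
    RATE_LIMIT_MIDDLEWARE,
]


def check_order(middleware):
    """Return True if authentication runs before the rate limiter."""
    return middleware.index(AUTH_MIDDLEWARE) < middleware.index(RATE_LIMIT_MIDDLEWARE)
```

Note that Django REST Framework authentication classes run inside the view, so for JWT-authenticated API calls `request.user` in middleware is still the session user or an anonymous user. Per-user limits for those calls would need to be enforced at the view level or by decoding the token in the middleware.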

Implementation

1. JWT Authentication

File: backend/authentication/jwt_auth.py

import jwt
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from django.conf import settings
from django.core.cache import cache
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed


class JWTAuthentication(BaseAuthentication):
    """
    JWT token authentication for API requests.

    Algorithm: RS256 (asymmetric)
    Expiration: 1 hour
    Refresh: 7 days
    """

    PUBLIC_KEY_CACHE_KEY = 'jwt_public_key_pem'
    PUBLIC_KEY_CACHE_TTL = 3600  # 1 hour

    def authenticate(self, request):
        """
        Authenticate request using JWT token.

        Token location: Authorization: Bearer <token>
        """
        auth_header = request.META.get('HTTP_AUTHORIZATION')

        if not auth_header or not auth_header.startswith('Bearer '):
            return None

        token = auth_header.split(' ', 1)[1].strip()

        try:
            payload = self._decode_token(token)
        except jwt.ExpiredSignatureError:
            raise AuthenticationFailed('Token expired')
        except jwt.InvalidTokenError:
            raise AuthenticationFailed('Invalid token')

        # Refresh tokens share issuer and audience with access tokens,
        # so they must be rejected explicitly for API access.
        if payload.get('type') == 'refresh':
            raise AuthenticationFailed('Invalid token')

        user = self._get_user_from_payload(payload)
        return (user, token)

    def _decode_token(self, token):
        """
        Decode and validate JWT token.

        Validation:
        - Signature (RS256 public key)
        - Expiration
        - Issuer
        - Audience
        """
        # Load public key from Cloud KMS (cached)
        public_key = self._get_public_key()

        payload = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            issuer='coditect-license-server',
            audience='coditect-api',
            options={
                'verify_signature': True,
                'verify_exp': True,
                'verify_iat': True,
                'verify_iss': True,
                'verify_aud': True,
                'require': ['exp', 'iat', 'sub', 'iss', 'aud']
            }
        )

        return payload

    def _get_public_key(self):
        """
        Fetch RSA public key from Cloud KMS.

        The PEM text is cached for 1 hour to reduce KMS API calls.
        """
        pem = cache.get(self.PUBLIC_KEY_CACHE_KEY)

        if pem is None:
            from google.cloud import kms

            client = kms.KeyManagementServiceClient()
            key_name = settings.JWT_PUBLIC_KEY_NAME

            public_key = client.get_public_key(request={'name': key_name})
            pem = public_key.pem
            cache.set(self.PUBLIC_KEY_CACHE_KEY, pem, self.PUBLIC_KEY_CACHE_TTL)

        # Parse PEM format
        return serialization.load_pem_public_key(
            pem.encode(),
            backend=default_backend()
        )

    def _get_user_from_payload(self, payload):
        """Get user from JWT subject claim."""
        from django.contrib.auth import get_user_model

        User = get_user_model()
        user_id = payload.get('sub')

        try:
            return User.objects.get(id=user_id)
        except User.DoesNotExist:
            raise AuthenticationFailed('User not found')


import base64
import hashlib
import json
from datetime import timezone


def _b64url(data):
    """Base64url-encode bytes without padding, as JWS requires."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def _sign_with_kms(payload):
    """
    Build an RS256 JWT whose signature is produced by Cloud KMS.

    The private key never leaves KMS: only the SHA-256 digest of the
    signing input is sent. settings.JWT_PRIVATE_KEY_NAME must name a key
    version that uses an RSA PKCS#1 v1.5 SHA-256 signing algorithm.
    """
    from google.cloud import kms

    header = {'alg': 'RS256', 'typ': 'JWT'}
    signing_input = '.'.join(
        _b64url(json.dumps(part, separators=(',', ':')).encode())
        for part in (header, payload)
    )
    digest = hashlib.sha256(signing_input.encode('ascii')).digest()

    client = kms.KeyManagementServiceClient()
    response = client.asymmetric_sign(
        request={
            'name': settings.JWT_PRIVATE_KEY_NAME,
            'digest': {'sha256': digest},
        }
    )

    return f'{signing_input}.{_b64url(response.signature)}'


def generate_jwt_token(user):
    """
    Generate JWT access token.

    Claims:
    - sub: User ID
    - email: User email
    - roles: User roles
    - iat: Issued at
    - exp: Expiration (1 hour)
    - iss: Issuer
    - aud: Audience
    """
    now = datetime.now(timezone.utc)

    payload = {
        'sub': str(user.id),
        'email': user.email,
        'roles': list(user.roles.values_list('name', flat=True)),
        'iat': int(now.timestamp()),
        'exp': int((now + timedelta(hours=1)).timestamp()),
        'iss': 'coditect-license-server',
        'aud': 'coditect-api'
    }

    # Sign through Cloud KMS; the private key is never retrieved
    return _sign_with_kms(payload)


def generate_refresh_token(user):
    """
    Generate JWT refresh token.

    Expiration: 7 days
    Single use: Invalidated after refresh
    """
    from .models import RefreshToken

    now = datetime.now(timezone.utc)
    expires_at = now + timedelta(days=7)

    payload = {
        'sub': str(user.id),
        'type': 'refresh',
        'iat': int(now.timestamp()),
        'exp': int(expires_at.timestamp()),
        'iss': 'coditect-license-server',
        'aud': 'coditect-api'
    }

    token = _sign_with_kms(payload)

    # Store only a hash of the refresh token in the database
    token_hash = hashlib.sha256(token.encode()).hexdigest()

    RefreshToken.objects.create(
        user=user,
        token_hash=token_hash,
        expires_at=expires_at
    )

    return token
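The refresh token docstring states that a token is single use and invalidated after refresh. The sketch below illustrates that rotation rule in isolation. It is not the project's implementation: `RefreshTokenStore` is a hypothetical in-memory stand-in for the `RefreshToken` table, and it issues opaque random tokens instead of signed JWTs to stay self-contained.

```python
import hashlib
import secrets
import time


class RefreshTokenStore:
    """In-memory stand-in for the RefreshToken table (hypothetical)."""

    def __init__(self):
        self._rows = {}  # token_hash -> (user_id, expires_at)

    @staticmethod
    def _hash(token: str) -> str:
        # Only the hash is stored, mirroring generate_refresh_token()
        return hashlib.sha256(token.encode()).hexdigest()

    def issue(self, user_id: str, ttl_seconds: int = 7 * 24 * 3600) -> str:
        token = secrets.token_urlsafe(32)
        self._rows[self._hash(token)] = (user_id, time.time() + ttl_seconds)
        return token

    def rotate(self, token: str) -> str:
        """Consume `token` and return a replacement.

        Raises PermissionError if the token is unknown, already used,
        or expired.
        """
        row = self._rows.pop(self._hash(token), None)
        if row is None:
            raise PermissionError("unknown or already used refresh token")
        user_id, expires_at = row
        if time.time() >= expires_at:
            raise PermissionError("refresh token expired")
        return self.issue(user_id)
```

Because the old hash is removed before the new token is issued, presenting the same refresh token twice fails the second time, which is also a useful signal of token theft.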

2. Rate Limiting

File: backend/middleware/rate_limiter.py

import hashlib
import math
import time

import redis
from django.http import JsonResponse
from django.conf import settings


# Atomic token bucket. Returns {allowed (0 or 1), whole tokens left}.
TOKEN_BUCKET_LUA = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- Get current bucket state (a missing key means a full bucket)
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or limit
local last_refill = tonumber(bucket[2]) or now

-- Calculate refill, capped at the bucket size
local elapsed = math.max(0, now - last_refill)
local refill_rate = limit / window
local new_tokens = math.min(limit, tokens + (elapsed * refill_rate))

-- Try to consume 1 token
local allowed = 0
if new_tokens >= 1 then
    new_tokens = new_tokens - 1
    allowed = 1
end

redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now)
redis.call('EXPIRE', key, window * 2)
return {allowed, math.floor(new_tokens)}
"""


class RateLimitMiddleware:
    """
    Token bucket rate limiting using Redis.

    Must be listed after the authentication middleware, because it
    reads request.user.

    Limits:
    - Anonymous: 100 req/minute
    - Authenticated: 1000 req/minute
    - Admin: Unlimited

    Headers returned:
    - X-RateLimit-Limit: Total limit
    - X-RateLimit-Remaining: Remaining requests
    - X-RateLimit-Reset: Reset timestamp
    - Retry-After: Seconds to wait (429 responses only)
    """

    def __init__(self, get_response):
        self.get_response = get_response
        self.redis_client = redis.StrictRedis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_RATE_LIMIT_DB,
            decode_responses=True
        )
        self._token_bucket = self.redis_client.register_script(TOKEN_BUCKET_LUA)

    def __call__(self, request):
        """
        Check rate limit before processing request.
        """
        # Get identifier (user ID, API key or IP)
        identifier = self._get_identifier(request)

        # Check if rate limited
        allowed, limit_info = self._check_rate_limit(identifier, request.user)

        if allowed:
            response = self.get_response(request)
        else:
            response = JsonResponse({
                'error': 'Rate limit exceeded',
                'retry_after': limit_info['retry_after']
            }, status=429)
            response['Retry-After'] = str(limit_info['retry_after'])

        # Add rate limit headers (omitted for unlimited admin traffic)
        if limit_info['limit'] is not None:
            response['X-RateLimit-Limit'] = str(limit_info['limit'])
            response['X-RateLimit-Remaining'] = str(limit_info['remaining'])
            response['X-RateLimit-Reset'] = str(limit_info['reset'])

        return response

    def _get_identifier(self, request):
        """
        Get unique identifier for rate limiting.

        Priority:
        1. User ID (if authenticated)
        2. API key (if present)
        3. IP address (fallback)
        """
        if request.user.is_authenticated:
            return f"user:{request.user.id}"

        api_key = request.META.get('HTTP_X_API_KEY')
        if api_key:
            # Hash the key so the raw secret is not stored in Redis
            key_hash = hashlib.sha256(api_key.encode()).hexdigest()
            return f"api_key:{key_hash}"

        # Get real IP (behind load balancer). The leftmost entry can be
        # forged by the client unless the load balancer overwrites it.
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            ip = x_forwarded_for.split(',')[0].strip()
        else:
            ip = request.META.get('REMOTE_ADDR')

        return f"ip:{ip}"

    def _check_rate_limit(self, identifier, user):
        """
        Token bucket rate limiting.

        Algorithm:
        1. Check current token count
        2. Refill tokens based on elapsed time
        3. Consume 1 token if available
        4. Return allowed/denied
        """
        # Determine limit based on user type
        if user.is_authenticated and user.is_staff:
            # Unlimited for admins
            return True, {
                'limit': None,
                'remaining': None,
                'reset': None,
                'retry_after': 0
            }

        if user.is_authenticated:
            limit = 1000  # 1000 req/min
        else:
            limit = 100   # 100 req/min
        window = 60       # 1 minute

        now = time.time()

        allowed, remaining = self._token_bucket(
            keys=[f"rate_limit:{identifier}"],
            args=[limit, window, now]
        )

        refill_rate = limit / window  # tokens per second

        # Seconds until the bucket is full again
        reset_in = (limit - remaining) / refill_rate

        # Upper bound on the wait until one token is available
        retry_after = 0 if allowed else math.ceil(1 / refill_rate)

        return bool(allowed), {
            'limit': limit,
            'remaining': remaining,
            'reset': int(now + reset_in),
            'retry_after': retry_after
        }
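The refill arithmetic is easier to follow outside Redis. The class below is a minimal in-process sketch of the same token bucket steps (refill by elapsed time, cap at the limit, consume one token). `TokenBucket` is an illustrative name, and time is passed in explicitly so the behaviour is deterministic.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TokenBucket:
    limit: int                      # bucket capacity (max burst)
    window: float                   # seconds needed to refill an empty bucket
    tokens: Optional[float] = None  # current tokens; starts full
    last_refill: float = 0.0

    def __post_init__(self):
        if self.tokens is None:
            self.tokens = float(self.limit)

    def allow(self, now: float) -> bool:
        """Refill for the elapsed time, then try to consume one token."""
        rate = self.limit / self.window
        elapsed = max(0.0, now - self.last_refill)
        self.tokens = min(float(self.limit), self.tokens + elapsed * rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

With `limit=10, window=10` the bucket allows a burst of 10 requests and then one more request per second. Unlike a fixed-window counter, it does not allow a double burst across a window boundary.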

3. Encryption at Rest

File: backend/encryption/field_encryption.py

import base64
import logging

from cryptography.fernet import Fernet, InvalidToken
from django.conf import settings
from django.db import models
from django.utils.functional import cached_property

logger = logging.getLogger(__name__)


class EncryptedField(models.TextField):
    """
    Encrypted database field using Fernet.

    Encryption: Fernet (AES-128-CBC with HMAC-SHA256 authentication).
    Fernet does not provide AES-256; a 256-bit requirement at the field
    level needs a different primitive such as AES-256-GCM.
    Key storage: Google Cloud KMS
    """

    @cached_property
    def cipher(self):
        # Built on first use so that importing models does not call KMS
        return self._get_cipher()

    def _get_cipher(self):
        """
        Get Fernet cipher from Cloud KMS.

        Key rotation: Automatic via Cloud KMS
        """
        from google.cloud import kms

        client = kms.KeyManagementServiceClient()
        key_name = settings.FIELD_ENCRYPTION_KEY_NAME

        # Decrypt data encryption key (DEK) using KMS
        encrypted_dek = settings.ENCRYPTED_DEK

        response = client.decrypt(
            request={
                'name': key_name,
                'ciphertext': base64.b64decode(encrypted_dek)
            }
        )

        # Fernet expects a 32-byte key encoded as URL-safe base64
        dek = response.plaintext

        # Create Fernet cipher
        return Fernet(dek)

    def from_db_value(self, value, expression, connection):
        """
        Decrypt value from database.
        """
        if value is None:
            return value

        try:
            return self.cipher.decrypt(value.encode()).decode()
        except InvalidToken:
            # Corrupted ciphertext or wrong key. Fail loudly: returning
            # None here would let a later save() overwrite the data.
            logger.error('Failed to decrypt field %s', self.name)
            raise

    def get_prep_value(self, value):
        """
        Encrypt value before saving to database.
        """
        if value is None:
            return value

        encrypted = self.cipher.encrypt(value.encode())
        return encrypted.decode()


# Example usage in models
class License(models.Model):
    """License model with encrypted fields."""

    # Encrypted fields. Fernet output is randomized, so equality lookups
    # such as filter(license_key=...) cannot match on these columns.
    license_key = EncryptedField(max_length=500)
    stripe_customer_id = EncryptedField(max_length=500, null=True)

    # Non-encrypted fields
    tier = models.CharField(max_length=20)
    expires_at = models.DateTimeField()
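Fernet ciphertexts include a random IV, so encrypting the same value twice yields different output and an equality filter on an encrypted column cannot find a row. One common workaround, which this ADR does not specify, is to store a keyed hash (a "blind index") next to the ciphertext and query on that instead. The sketch below assumes a separate secret `index_key` and a hypothetical `license_key_index` column.

```python
import hashlib
import hmac


def blind_index(value: str, index_key: bytes) -> str:
    """Deterministic keyed hash of `value`, suitable for an indexed column.

    `index_key` is a secret distinct from the encryption key (assumption);
    without it the hash cannot be recomputed from guessed inputs.
    """
    return hmac.new(index_key, value.encode("utf-8"), hashlib.sha256).hexdigest()


# Hypothetical usage: store blind_index(license_key, key) in
# license_key_index when saving, then look up with
# License.objects.filter(license_key_index=blind_index(candidate, key))
```

The trade-off is that equal plaintexts produce equal index values, so the index reveals which rows share a value. That is usually acceptable for unique identifiers such as license keys.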

4. Input Validation

File: backend/validation/request_validators.py

import re
import uuid
from typing import Optional

from pydantic import BaseModel, validator, EmailStr, constr

# Written against the Pydantic v1 API. Under Pydantic v2, use
# field_validator instead of validator and constr(pattern=...) instead
# of constr(regex=...).


class LicenseValidationRequest(BaseModel):
    """
    License validation request schema.

    Rejects malformed or oversized input before it reaches business
    logic. Injection defenses still rely on parameterized queries (SQL)
    and output encoding (XSS); schema validation narrows what gets that far.
    """

    license_key: constr(min_length=32, max_length=64)
    hardware_id: constr(min_length=64, max_length=64)
    version: constr(regex=r'^\d+\.\d+\.\d+$', max_length=20)

    @validator('license_key')
    def validate_license_key(cls, v):
        """
        Validate license key format.

        Format: Alphanumeric + hyphens only
        """
        if not re.match(r'^[A-Za-z0-9\-]+$', v):
            raise ValueError('Invalid license key format')
        return v

    @validator('hardware_id')
    def validate_hardware_id(cls, v):
        """
        Validate hardware ID format.

        Format: SHA256 hex string
        """
        if not re.match(r'^[a-f0-9]{64}$', v):
            raise ValueError('Invalid hardware ID format')
        return v


class SeatAcquisitionRequest(BaseModel):
    """Seat acquisition request schema."""

    license_key: constr(min_length=32, max_length=64)
    user_email: EmailStr
    session_id: constr(min_length=36, max_length=36)

    @validator('session_id')
    def validate_session_id(cls, v):
        """Validate session ID is valid UUID."""
        try:
            uuid.UUID(v)
            return v
        except ValueError:
            raise ValueError('Invalid session ID format')


class UserRegistrationRequest(BaseModel):
    """User registration request schema."""

    email: EmailStr
    password: constr(min_length=12, max_length=128)
    company_name: Optional[constr(max_length=200)] = None

    @validator('password')
    def validate_password_strength(cls, v):
        """
        Validate password strength.

        Requirements:
        - Minimum 12 characters
        - At least 1 uppercase
        - At least 1 lowercase
        - At least 1 digit
        - At least 1 special character
        """
        if not re.search(r'[A-Z]', v):
            raise ValueError('Password must contain uppercase letter')

        if not re.search(r'[a-z]', v):
            raise ValueError('Password must contain lowercase letter')

        if not re.search(r'\d', v):
            raise ValueError('Password must contain digit')

        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
            raise ValueError('Password must contain special character')

        return v

    @validator('email')
    def validate_email_not_disposable(cls, v):
        """
        Prevent disposable email addresses.

        Check against known disposable domains.
        """
        disposable_domains = {
            'guerrillamail.com',
            'mailinator.com',
            'tempmail.com',
            '10minutemail.com'
        }

        domain = v.rsplit('@', 1)[-1].lower()

        if domain in disposable_domains:
            raise ValueError('Disposable email addresses not allowed')

        return v


# Django REST Framework endpoint with validation
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework import status, serializers


class LicenseValidationRequestSerializer(serializers.Serializer):
    # Same constraints as the Pydantic LicenseValidationRequest schema
    license_key = serializers.RegexField(
        r'^[A-Za-z0-9\-]+$', min_length=32, max_length=64
    )
    hardware_id = serializers.RegexField(r'^[a-f0-9]{64}$')
    version = serializers.RegexField(r'^\d+\.\d+\.\d+$', max_length=20)


@api_view(['POST'])
@permission_classes([AllowAny])  # License validation doesn't require authentication
def validate_license(request):
    """
    Validate license with automatic input validation.

    The Django REST Framework serializer validates:
    - Field types
    - String lengths
    - Regex patterns

    Returns 400 if validation fails.
    """
    # Validate request data
    serializer = LicenseValidationRequestSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    # Request is already validated by the DRF serializer.
    # Queries go through the Django ORM, which parameterizes values.

    from backend.license_service import LicenseService

    service = LicenseService()
    result = service.validate_license(
        license_key=serializer.validated_data['license_key'],
        hardware_id=serializer.validated_data['hardware_id'],
        version=serializer.validated_data['version']
    )

    return Response(result, status=status.HTTP_200_OK)
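To make the format rules concrete without depending on Pydantic or Django REST Framework, the following standard-library sketch applies the same three constraints from the license validation schema. `validation_errors` is an illustrative helper, not part of the codebase.

```python
import re

# Field rules mirroring the license validation schema:
# (pattern that must match the whole value, maximum length, description)
RULES = {
    "license_key": (re.compile(r"[A-Za-z0-9\-]{32,64}"), 64,
                    "32-64 alphanumeric characters or hyphens"),
    "hardware_id": (re.compile(r"[a-f0-9]{64}"), 64,
                    "a 64-character lowercase hex string"),
    "version": (re.compile(r"\d+\.\d+\.\d+"), 20,
                "a MAJOR.MINOR.PATCH version of at most 20 characters"),
}


def validation_errors(data: dict) -> dict:
    """Return {field: message} for every field that is missing or malformed."""
    errors = {}
    for field, (pattern, max_length, description) in RULES.items():
        value = data.get(field)
        if (
            not isinstance(value, str)
            or len(value) > max_length
            or not pattern.fullmatch(value)
        ):
            errors[field] = f"must be {description}"
    return errors
```

Using `fullmatch` avoids the subtle case where `$` also matches before a trailing newline, which can matter when raw request bodies are validated.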

5. Security Headers

File: backend/middleware/security_headers.py

class SecurityHeadersMiddleware:
    """
    Add security headers to all responses.

    Headers:
    - Content-Security-Policy (CSP)
    - Strict-Transport-Security (HSTS)
    - X-Content-Type-Options
    - X-Frame-Options
    - X-XSS-Protection
    - Referrer-Policy
    - Permissions-Policy
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)

        # Content Security Policy.
        # 'unsafe-inline' weakens XSS protection; replace it with nonces
        # or hashes where the dashboard allows.
        response['Content-Security-Policy'] = (
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline' https://js.stripe.com; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:; "
            "font-src 'self'; "
            "connect-src 'self' https://api.stripe.com; "
            "frame-src https://js.stripe.com; "
            "object-src 'none'; "
            "base-uri 'self'; "
            "form-action 'self';"
        )

        # HSTS - Force HTTPS
        response['Strict-Transport-Security'] = (
            'max-age=31536000; includeSubDomains; preload'
        )

        # Prevent MIME sniffing
        response['X-Content-Type-Options'] = 'nosniff'

        # Prevent clickjacking
        response['X-Frame-Options'] = 'DENY'

        # Disable the legacy browser XSS auditor. Current browsers have
        # removed it and its block mode could itself be abused, so the
        # CSP above is the XSS control.
        response['X-XSS-Protection'] = '0'

        # Referrer policy
        response['Referrer-Policy'] = 'strict-origin-when-cross-origin'

        # Permissions policy
        response['Permissions-Policy'] = (
            'geolocation=(), microphone=(), camera=()'
        )

        return response
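A long hand-concatenated CSP string is easy to break with a missing space or semicolon. As a sketch of one alternative, the policy can be kept as a mapping and serialized in one place. `build_csp` is a hypothetical helper; the directive values are copied from the middleware above.

```python
CSP_DIRECTIVES = {
    "default-src": ["'self'"],
    "script-src": ["'self'", "'unsafe-inline'", "https://js.stripe.com"],
    "style-src": ["'self'", "'unsafe-inline'"],
    "img-src": ["'self'", "data:", "https:"],
    "font-src": ["'self'"],
    "connect-src": ["'self'", "https://api.stripe.com"],
    "frame-src": ["https://js.stripe.com"],
    "object-src": ["'none'"],
    "base-uri": ["'self'"],
    "form-action": ["'self'"],
}


def build_csp(directives: dict) -> str:
    """Serialize {directive: [sources]} into a Content-Security-Policy value."""
    return "; ".join(
        f"{name} {' '.join(sources)}" for name, sources in directives.items()
    )
```

Keeping the directives as data also makes it straightforward to assert in a unit test that, for example, `object-src` stays `'none'` as the policy evolves.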

6. Audit Logging

File: backend/audit/audit_logger.py

import logging
import json
from datetime import datetime, timezone
from django.conf import settings


def _utc_timestamp():
    """Current UTC time in ISO 8601 format with a trailing 'Z'."""
    return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')


class AuditLogger:
    """
    Structured audit logging for compliance.

    Logs:
    - User actions (login, logout, password change)
    - License operations (validation, seat acquisition)
    - Admin actions (license creation, user management)
    - Security events (failed login, rate limit)

    Format: JSON (Cloud Logging compatible)
    Retention: 90 days (compliance requirement)
    """

    def __init__(self):
        self.logger = logging.getLogger('audit')
        self.logger.setLevel(logging.INFO)

        # Cloud Logging handler. The 'audit' logger is shared, so attach
        # the handler only once to avoid duplicate log entries.
        if settings.ENVIRONMENT == 'production' and not self.logger.handlers:
            from google.cloud import logging as cloud_logging
            client = cloud_logging.Client()
            handler = client.get_default_handler()
            self.logger.addHandler(handler)

    def log_event(
        self,
        event_type: str,
        user_id: str,
        resource_type: str,
        resource_id: str,
        action: str,
        status: str,
        ip_address: str,
        user_agent: str,
        metadata: dict = None
    ):
        """
        Log audit event.

        Args:
            event_type: 'security', 'license', 'admin', 'user'
            user_id: User ID or 'anonymous'
            resource_type: 'license', 'seat', 'user', 'api_key'
            resource_id: Resource identifier
            action: 'create', 'read', 'update', 'delete', 'validate'
            status: 'success', 'failure', 'denied'
            ip_address: Client IP
            user_agent: Client user agent
            metadata: Additional context
        """
        audit_entry = {
            'timestamp': _utc_timestamp(),
            'event_type': event_type,
            'user_id': user_id,
            'resource_type': resource_type,
            'resource_id': resource_id,
            'action': action,
            'status': status,
            'ip_address': ip_address,
            'user_agent': user_agent,
            'metadata': metadata or {}
        }

        self.logger.info(json.dumps(audit_entry))

    def log_security_event(
        self,
        event_subtype: str,
        user_id: str,
        ip_address: str,
        user_agent: str,
        severity: str = 'medium',
        metadata: dict = None
    ):
        """
        Log security event.

        Events:
        - failed_login
        - suspicious_activity
        - rate_limit_exceeded
        - invalid_token
        - brute_force_detected
        """
        security_entry = {
            'timestamp': _utc_timestamp(),
            'event_type': 'security',
            'event_subtype': event_subtype,
            'user_id': user_id,
            'ip_address': ip_address,
            'user_agent': user_agent,
            'severity': severity,
            'metadata': metadata or {}
        }

        self.logger.warning(json.dumps(security_entry))


# Example usage
audit_logger = AuditLogger()

# Log license validation
audit_logger.log_event(
    event_type='license',
    user_id='user-123',
    resource_type='license',
    resource_id='lic-abc123',
    action='validate',
    status='success',
    ip_address='203.0.113.1',
    user_agent='CODITECT-Client/1.0',
    metadata={
        'hardware_id': 'abc123...',
        'version': '2.0.0'
    }
)

# Log failed login
audit_logger.log_security_event(
    event_subtype='failed_login',
    user_id='user@example.com',
    ip_address='203.0.113.1',
    user_agent='Mozilla/5.0...',
    severity='high',
    metadata={
        'attempt_count': 5,
        'last_attempt': datetime.utcnow().isoformat() + 'Z'
    }
)
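Audit metadata is free-form, so it can easily end up carrying the same sensitive values the other layers protect. In line with the data minimization requirement, a redaction step could run before an entry is logged. This is a minimal sketch: `SENSITIVE_KEYS` and `redact` are illustrative names, and the choice of fields is an assumption.

```python
import hashlib

# Metadata keys whose values should never appear in logs (assumed list)
SENSITIVE_KEYS = {"license_key", "password", "token", "hardware_id"}


def redact(metadata: dict) -> dict:
    """Return a copy of `metadata` with sensitive values fingerprinted.

    A truncated SHA-256 digest still lets operators correlate events
    that involve the same value without storing the value itself.
    """
    cleaned = {}
    for key, value in metadata.items():
        if key in SENSITIVE_KEYS:
            digest = hashlib.sha256(str(value).encode("utf-8")).hexdigest()
            cleaned[key] = f"sha256:{digest[:12]}"
        else:
            cleaned[key] = value
    return cleaned
```

An unkeyed hash of a low-entropy value can be reversed by guessing, so for fields like passwords it is safer to drop the value entirely or use a keyed hash.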

7. DDoS Protection

File: infrastructure/cloud-armor-policy.yaml

# Google Cloud Armor security policy
# Protects against DDoS and application attacks

name: coditect-license-api-policy
description: Cloud Armor policy for license API

# Default rule
defaultRuleAction: allow

# Security rules
# Rules are evaluated in ascending priority order; the first match wins.
rules:
  # Block known bad IPs
  - priority: 1000
    description: "Block malicious IPs"
    match:
      versionedExpr: SRC_IPS_V1
      config:
        srcIpRanges:
          - "192.0.2.0/24"  # Example malicious range
    action: deny(403)

  # Block SQL injection attempts
  # (the listed rule IDs are signatures excluded from the check)
  - priority: 3000
    description: "Block SQL injection"
    match:
      expr:
        expression: |
          evaluatePreconfiguredExpr('sqli-stable',
            ['owasp-crs-v030001-id942251-sqli',
             'owasp-crs-v030001-id942420-sqli',
             'owasp-crs-v030001-id942431-sqli'])
    action: deny(403)

  # Block XSS attempts
  - priority: 4000
    description: "Block XSS"
    match:
      expr:
        expression: |
          evaluatePreconfiguredExpr('xss-stable',
            ['owasp-crs-v030001-id941150-xss',
             'owasp-crs-v030001-id941320-xss'])
    action: deny(403)

  # Block scanners
  - priority: 5000
    description: "Block security scanners"
    match:
      expr:
        expression: |
          evaluatePreconfiguredExpr('scannerdetection-stable',
            ['owasp-crs-v030001-id913101-scannerdetection'])
    action: deny(403)

  # Geographic restrictions (optional)
  # region_code holds ISO 3166-1 alpha-2 country codes; 'EU' is not one,
  # so each allowed country must be listed. DE and FR are illustrative.
  - priority: 6000
    description: "Allow only US/EU traffic"
    match:
      expr:
        expression: |
          origin.region_code != 'US' && origin.region_code != 'DE' && origin.region_code != 'FR'
    action: deny(403)

  # Rate limit per IP
  # Placed after the deny rules: a throttle rule that matches all traffic
  # allows conforming requests, so later rules are not evaluated for them.
  - priority: 7000
    description: "Rate limit 100 req/min per IP"
    match:
      expr:
        expression: "true"
    action: throttle
    rateLimitOptions:
      conformAction: allow
      exceedAction: deny(429)
      enforceOnKey: IP
      rateLimitThreshold:
        count: 100
        intervalSec: 60

  # Default allow
  - priority: 2147483647
    description: "Default allow rule"
    match:
      versionedExpr: SRC_IPS_V1
      config:
        srcIpRanges:
          - "*"
    action: allow
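Cloud Armor evaluates rules from the lowest priority number to the highest and applies the first rule that matches. The toy evaluator below is a simplified model, not the Cloud Armor engine: `Rule` and `evaluate` are illustrative names, and a throttle rule is modelled as a plain allow for conforming traffic. It shows why a match-everything rule shadows every rule with a higher priority number.

```python
from typing import Callable, List, NamedTuple


class Rule(NamedTuple):
    priority: int
    matches: Callable[[dict], bool]
    action: str


def evaluate(rules: List[Rule], request: dict) -> str:
    """Return the action of the first matching rule in priority order."""
    for rule in sorted(rules, key=lambda r: r.priority):
        if rule.matches(request):
            return rule.action
    return "allow"


def looks_like_sqli(request: dict) -> bool:
    # Crude stand-in for a preconfigured WAF expression
    return "' OR " in request.get("query", "")


def match_all(request: dict) -> bool:
    return True


attack = {"query": "id=1' OR '1'='1"}

# Throttle-style rule ahead of the WAF rule: the attack is allowed
shadowed = [Rule(2000, match_all, "allow"), Rule(3000, looks_like_sqli, "deny(403)")]

# WAF rule first, throttle-style rule last: the attack is denied
ordered = [Rule(3000, looks_like_sqli, "deny(403)"), Rule(7000, match_all, "allow")]
```

In the first ordering, `evaluate(shadowed, attack)` returns the allow action because the priority-2000 rule matches before the SQL injection rule is reached. In the second, the deny rule wins.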

8. Database Security

File: backend/database/security_settings.py

# PostgreSQL security settings

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'coditect_licenses',
        'USER': 'app_user',  # Not superuser!
        'PASSWORD': '',  # From Cloud Secret Manager
        'HOST': '/cloudsql/project:region:instance',  # Unix socket
        'PORT': '',
        'OPTIONS': {
            # SSL/TLS required. libpq ignores sslmode on Unix-domain
            # socket connections such as the HOST above; these options
            # apply when connecting over TCP.
            'sslmode': 'require',
            'sslrootcert': '/path/to/server-ca.pem',
            'sslcert': '/path/to/client-cert.pem',
            'sslkey': '/path/to/client-key.pem',

            # Connection limits
            'connect_timeout': 10,
            'options': '-c statement_timeout=30000',  # 30 seconds
        },
        'CONN_MAX_AGE': 600,  # Persistent connections (reused for 10 minutes)
    }
}

# Row-Level Security (RLS)
# Enforces tenant isolation at database level

# Migration to enable RLS
RLS_MIGRATION = """
-- Enable RLS on licenses table.
-- Table owners bypass RLS unless FORCE ROW LEVEL SECURITY is also set.
ALTER TABLE licenses ENABLE ROW LEVEL SECURITY;

-- Create policy: Users can only see their own tenant's licenses.
-- With missing_ok = true an unset tenant yields NULL, which matches no rows.
CREATE POLICY tenant_isolation_policy ON licenses
    FOR ALL
    USING (
        tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::uuid
    );

-- Create policy: Admins can see all
CREATE POLICY admin_full_access ON licenses
    FOR ALL
    TO admin_role
    USING (true);

-- Apply to all tenant-scoped tables.
-- seats and usage_events need equivalent policies: with RLS enabled and
-- no policy, non-owner roles see no rows at all.
ALTER TABLE license_events ENABLE ROW LEVEL SECURITY;
ALTER TABLE seats ENABLE ROW LEVEL SECURITY;
ALTER TABLE usage_events ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation_policy ON license_events
    FOR ALL
    USING (
        EXISTS (
            SELECT 1 FROM licenses
            WHERE licenses.id = license_events.license_id
            AND licenses.tenant_id =
                NULLIF(current_setting('app.current_tenant_id', true), '')::uuid
        )
    );
"""

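# Set tenant context for each request
class TenantIsolationMiddleware:
    """
    Set tenant context for Row-Level Security.

    Prevents cross-tenant data access at database level.

    Uses set_config() at session scope: SET LOCAL only lasts until the
    end of the current transaction, which under Django's default
    autocommit is the SET statement itself.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        if not request.user.is_authenticated:
            return self.get_response(request)

        from django.db import connection

        self._set_tenant(connection, str(request.user.tenant_id))
        try:
            return self.get_response(request)
        finally:
            # Connections are reused (CONN_MAX_AGE), so clear the tenant
            # before the connection serves another request.
            self._set_tenant(connection, '')

    @staticmethod
    def _set_tenant(connection, tenant_id):
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT set_config('app.current_tenant_id', %s, false)",
                [tenant_id]
            )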


# Prepared statements (prevent SQL injection)
# ALWAYS use Django ORM or parameterized queries
# NEVER use raw SQL with string formatting

# ❌ WRONG - SQL injection vulnerability
def get_license_wrong(license_key):
    query = f"SELECT * FROM licenses WHERE license_key = '{license_key}'"
    return License.objects.raw(query)

# ✅ CORRECT - Parameterized query
def get_license_correct(license_key):
    return License.objects.filter(license_key=license_key).first()

# ✅ CORRECT - Raw SQL with parameters
def get_license_raw_correct(license_key):
    query = "SELECT * FROM licenses WHERE license_key = %s"
    return License.objects.raw(query, [license_key])
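The difference between the wrong and correct variants can be demonstrated end to end with the standard-library `sqlite3` module. This sketch uses an in-memory table with made-up rows. SQLite uses `?` placeholders where PostgreSQL drivers use `%s`, but the principle is the same.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE licenses (license_key TEXT, tier TEXT)")
conn.executemany(
    "INSERT INTO licenses VALUES (?, ?)",
    [("KEY-A", "pro"), ("KEY-B", "free")],  # illustrative rows
)


def find_unsafe(license_key):
    # String formatting: the input becomes part of the SQL text
    query = f"SELECT tier FROM licenses WHERE license_key = '{license_key}'"
    return conn.execute(query).fetchall()


def find_safe(license_key):
    # Parameter binding: the input is only ever treated as a value
    query = "SELECT tier FROM licenses WHERE license_key = ?"
    return conn.execute(query, (license_key,)).fetchall()


payload = "x' OR '1'='1"
```

With this payload, `find_unsafe` returns every row in the table because the injected `OR '1'='1'` becomes part of the `WHERE` clause. `find_safe` returns nothing, since no license key equals the payload string.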

Security Checklist

Production Deployment Checklist

Before deploying to production:

  • TLS 1.3 enabled on load balancer
  • HTTPS redirect (HTTP → HTTPS)
  • Cloud Armor configured with rules
  • Rate limiting enabled (Redis)
  • JWT authentication configured
  • Encryption at rest enabled (Cloud KMS)
  • Database SSL enforced
  • Row-Level Security enabled
  • Security headers middleware active
  • Input validation on all endpoints
  • Audit logging configured
  • Secret rotation scheduled
  • Dependency scanning automated (Dependabot)
  • SAST scanning configured (GitHub Code Scanning)
  • Container scanning enabled (GCR vulnerability scanning)
  • Firewall rules configured (VPC)
  • IAM least privilege enforced
  • Backup encryption enabled
  • Incident response plan documented
  • Security training completed

Regular Security Tasks

Weekly:

  • Review audit logs for anomalies
  • Check failed login attempts
  • Monitor rate limit violations

Monthly:

  • Review Cloud Armor blocked IPs
  • Update dependency versions
  • Review IAM permissions
  • Test backup restoration

Quarterly:

  • Penetration testing
  • Security audit
  • Update security documentation
  • Review incident response plan

Annually:

  • SOC 2 Type II audit
  • Rotate KMS keys
  • Security awareness training
  • Disaster recovery drill

Consequences

Positive

Defense in Depth

  • Multiple security layers protect against breaches
  • No single point of failure
  • Attack surface minimized

Compliance Ready

  • GDPR: Encryption, audit trails, data minimization
  • SOC 2: Access control, logging, encryption
  • HIPAA: PHI protection for healthcare customers
  • PCI DSS: Secure payment handling (Stripe)

Automated Protection

  • Cloud Armor: Automatic DDoS mitigation
  • Rate limiting: Prevent brute force
  • Input validation: Block injection attacks
  • Audit logging: Forensics and compliance

Secure by Default

  • TLS 1.3 enforced
  • Security headers automatic
  • Encryption at rest/transit
  • Least privilege IAM

Incident Response

  • Comprehensive audit logs
  • Real-time security alerts
  • Automated threat detection
  • Clear escalation path

Negative

⚠️ Performance Overhead

  • Rate limiting: ~5ms latency
  • Encryption: ~10ms latency
  • JWT validation: ~15ms latency
  • Total: ~30ms per request

⚠️ Operational Complexity

  • Key rotation procedures
  • Certificate management
  • Security monitoring
  • Incident response drills

⚠️ Development Friction

  • Input validation strictness
  • Security testing requirements
  • Compliance documentation
  • Security reviews

⚠️ Cost

  • Cloud Armor: $200/month
  • Cloud KMS: $50/month (key operations)
  • Audit logging storage: $100/month
  • Security tools: $300/month
  • Total: $650/month

Related ADRs

  • ADR-001: License Key Generation (KMS encryption)
  • ADR-009: GCP Infrastructure Architecture (Cloud Armor, VPC)
  • ADR-013: Stripe Integration (PCI DSS compliance)
  • ADR-019: Monitoring and Observability (security event monitoring)

Last Updated: 2025-11-30
Owner: Security Team
Review Cycle: Quarterly (penetration testing)