Sequence Diagram: Cloud KMS License Signing Flow
Purpose: RSA-4096 asymmetric key creation, license token signing, public key distribution, and signature verification for tamper-proof offline licensing.
Actors:
- License API (Django application)
- Cloud KMS (key management service)
- Secret Manager (key metadata storage)
- CODITECT Client (local framework)
- Cloud Monitoring (audit logging)
Flow: Key creation → License signing → Public key distribution → Local verification
Mermaid Sequence Diagram
Step-by-Step Breakdown
1. Key Creation and Setup (Steps 1-5)
Create KMS key ring and signing key:
#!/bin/bash
# scripts/setup-kms.sh
# One-time setup for Cloud KMS license signing
set -e
PROJECT_ID="coditect-cloud-infra"
REGION="us-central1"
KEY_RING="license-signing"
KEY_NAME="license-signer"
echo "Creating Cloud KMS key ring and signing key..."
# Step 1: Create key ring
gcloud kms keyrings create "$KEY_RING" \
--location="$REGION" \
--project="$PROJECT_ID"
echo "✅ Key ring created: $KEY_RING"
# Step 2: Create asymmetric signing key (RSA-4096)
gcloud kms keys create "$KEY_NAME" \
--keyring="$KEY_RING" \
--location="$REGION" \
--purpose=asymmetric-signing \
--default-algorithm=rsa-sign-pkcs1-4096-sha256 \
--protection-level=hsm \
--project="$PROJECT_ID"
echo "✅ Signing key created: $KEY_NAME"
echo " - Algorithm: RSA_SIGN_PKCS1_4096_SHA256"
echo " - Key size: 4096 bits"
echo " - Protection: HSM (Hardware Security Module)"
# Step 3: Get public key
PUBLIC_KEY=$(gcloud kms keys versions get-public-key 1 \
--key="$KEY_NAME" \
--keyring="$KEY_RING" \
--location="$REGION" \
--output-file=- \
--project="$PROJECT_ID")
echo "✅ Public key exported"
# Step 4: Store public key in Secret Manager
echo "$PUBLIC_KEY" | gcloud secrets create kms-public-key \
--data-file=- \
--replication-policy=automatic \
--project="$PROJECT_ID"
echo "✅ Public key stored in Secret Manager: kms-public-key"
# Step 5: Grant License API service account permission to sign
SERVICE_ACCOUNT="license-api@$PROJECT_ID.iam.gserviceaccount.com"
gcloud kms keys add-iam-policy-binding "$KEY_NAME" \
--keyring="$KEY_RING" \
--location="$REGION" \
--member="serviceAccount:$SERVICE_ACCOUNT" \
--role="roles/cloudkms.signerVerifier" \
--project="$PROJECT_ID"
echo "✅ Granted signing permission to: $SERVICE_ACCOUNT"
echo ""
echo "🎉 Cloud KMS setup complete!"
echo "Key path: projects/$PROJECT_ID/locations/$REGION/keyRings/$KEY_RING/cryptoKeys/$KEY_NAME"
OpenTofu configuration:
# opentofu/modules/kms/main.tf
resource "google_kms_key_ring" "license_signing" {
name = var.key_ring_name
location = var.region
}
resource "google_kms_crypto_key" "license_signer" {
name = var.key_name
key_ring = google_kms_key_ring.license_signing.id
purpose = "ASYMMETRIC_SIGN"
version_template {
algorithm = "RSA_SIGN_PKCS1_4096_SHA256"
protection_level = "HSM"
}
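# rotation_period is omitted: Cloud KMS auto-rotates only symmetric keys.
# Asymmetric signing keys are rotated by creating a new key version manually.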
lifecycle {
prevent_destroy = true
}
}
# Grant License API permission to sign
resource "google_kms_crypto_key_iam_member" "license_api_signer" {
crypto_key_id = google_kms_crypto_key.license_signer.id
role = "roles/cloudkms.signerVerifier"
member = "serviceAccount:${var.service_account_email}"
}
# Output key path
output "key_name" {
description = "Full resource name of the KMS crypto key"
value = google_kms_crypto_key.license_signer.id
}
2. License Signing (Steps 6-11)
Django REST Framework license validation endpoint:
# app/api/v1/licenses.py
from django.conf import settings
from django.utils import timezone
from rest_framework import serializers, status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response

from apps.core.services.kms_service import KMSService
from apps.core.services.license_service import LicenseService


# Request/Response Serializers
class LicenseValidationRequestSerializer(serializers.Serializer):
    """License validation request from CODITECT client."""
    license_key = serializers.CharField(max_length=255)
    hardware_id = serializers.CharField(max_length=255)


class LicensePayloadSerializer(serializers.Serializer):
    """License payload to be signed."""
    license_key = serializers.CharField(max_length=255)
    tenant_id = serializers.CharField()
    tier = serializers.CharField()
    max_seats = serializers.IntegerField()
    hardware_id = serializers.CharField()
    expires_at = serializers.DateTimeField()
    issued_at = serializers.DateTimeField()


class SignedLicenseResponseSerializer(serializers.Serializer):
    """Signed license response."""
    license = LicensePayloadSerializer()
    signature = serializers.CharField()
    algorithm = serializers.CharField()


@api_view(['POST'])
@permission_classes([AllowAny])  # License validation doesn't require auth
def validate_license(request):
    """
    Validate license and return signed license token.

    Flow:
        1. Validate license in PostgreSQL
        2. Build license payload
        3. Sign payload with Cloud KMS
        4. Return signed license

    Returns:
        SignedLicenseResponse: License payload + Cloud KMS signature

    Raises:
        400: Malformed request
        403: License invalid or expired
        409: No seats available
        500: KMS signing failed
    """
    # Validate request data
    serializer = LicenseValidationRequestSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(
            {"detail": "Invalid request data", "errors": serializer.errors},
            status=status.HTTP_400_BAD_REQUEST
        )
    license_key = serializer.validated_data['license_key']
    hardware_id = serializer.validated_data['hardware_id']

    license_service = LicenseService()
    # KMSService takes the settings object in its constructor
    kms_service = KMSService(settings)

    # Step 1: Validate license
    license_data = license_service.validate_license(
        license_key,
        hardware_id
    )
    if not license_data:
        return Response(
            {"detail": "License invalid or expired"},
            status=status.HTTP_403_FORBIDDEN
        )

    # Step 2: Build payload
    payload = {
        'license_key': license_key,
        'tenant_id': str(license_data['tenant_id']),
        'tier': license_data['tier'],
        'max_seats': license_data['max_seats'],
        'hardware_id': hardware_id,
        'expires_at': license_data['expires_at'].isoformat(),
        'issued_at': timezone.now().isoformat()
    }

    # Step 3: Sign with Cloud KMS (KMSService logs the underlying error)
    try:
        signature = kms_service.sign_license(payload)
    except Exception:
        return Response(
            {"detail": "License signing failed"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )

    # Step 4: Return signed license
    return Response(
        {
            'license': payload,
            'signature': signature,
            'algorithm': "RSA_SIGN_PKCS1_4096_SHA256"
        },
        status=status.HTTP_200_OK
    )
KMS signing service:
# app/services/kms_service.py
import base64
import hashlib
import json
import logging

from google.cloud import kms, secretmanager

logger = logging.getLogger(__name__)


class KMSService:
    """
    Cloud KMS service for signing license tokens.

    Configuration:
        - Key ring: license-signing
        - Key name: license-signer
        - Algorithm: RSA_SIGN_PKCS1_4096_SHA256
        - Protection: HSM (Hardware Security Module)
    """

    def __init__(self, settings):
        # django.conf.settings only exposes UPPERCASE names, so the project and
        # region are read as GCP_PROJECT_ID / GCP_REGION (assumed setting names;
        # adjust them to match your settings module).
        self.settings = settings
        self.client = kms.KeyManagementServiceClient()
        # Key path. The version is pinned, so update it after a key rotation.
        self.key_name = (
            f"projects/{settings.GCP_PROJECT_ID}"
            f"/locations/{settings.GCP_REGION}"
            f"/keyRings/license-signing"
            f"/cryptoKeys/license-signer"
            f"/cryptoKeyVersions/1"
        )

    def sign_license(self, payload: dict) -> str:
        """
        Sign license payload with Cloud KMS.

        Synchronous on purpose: the KMS client call blocks, and the DRF view
        calls this method directly.

        Args:
            payload: License payload to sign

        Returns:
            Base64-encoded signature (the raw RSA-4096 signature is 512 bytes)

        Raises:
            Exception: KMS signing failed

        Process:
            1. Serialize payload to JSON (sorted keys for determinism)
            2. Compute SHA-256 digest
            3. Send digest to Cloud KMS for signing
            4. Cloud KMS signs with private key (HSM)
            5. Return base64-encoded signature
        """
        try:
            # Step 1: Serialize payload (deterministic)
            payload_json = json.dumps(payload, sort_keys=True, separators=(',', ':'))

            # Step 2: Compute SHA-256 digest
            digest = hashlib.sha256(payload_json.encode()).digest()
            logger.info(f"Signing license: key={payload.get('license_key')}, digest={digest.hex()[:16]}...")

            # Step 3: Call Cloud KMS asymmetricSign
            request = kms.AsymmetricSignRequest(
                name=self.key_name,
                digest=kms.Digest(sha256=digest)
            )
            response = self.client.asymmetric_sign(request=request)

            # Step 4: Base64-encode signature
            signature = base64.b64encode(response.signature).decode('utf-8')
            logger.info(f"License signed successfully: signature_length={len(response.signature)}")
            return signature
        except Exception as e:
            logger.error(f"KMS signing failed: {e}")
            raise

    def get_public_key(self) -> str:
        """
        Get public key for signature verification.

        Returns:
            Public key in PEM format

        Note:
            Public key is cached in Secret Manager for client distribution.
            Clients download once and cache locally.
        """
        client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{self.settings.GCP_PROJECT_ID}/secrets/kms-public-key/versions/latest"
        response = client.access_secret_version(request={"name": name})
        return response.payload.data.decode("UTF-8")
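The signature covers the exact bytes of the serialized payload, so the server and the client have to produce identical JSON before hashing. The following is a minimal standard-library sketch of that canonical form; `canonical_digest` is a hypothetical helper name and the sample field values are made up for illustration.

```python
import hashlib
import json


def canonical_digest(payload: dict) -> bytes:
    """Return the SHA-256 digest of the payload's canonical JSON form."""
    # sort_keys fixes the key order; compact separators drop optional whitespace.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).digest()


# Same fields, different insertion order (illustrative values only).
server_view = {"tier": "pro", "license_key": "ABC-123", "max_seats": 5}
client_view = {"max_seats": 5, "license_key": "ABC-123", "tier": "pro"}

# Both sides arrive at the same 32-byte digest despite the different ordering.
assert canonical_digest(server_view) == canonical_digest(client_view)
print(len(canonical_digest(server_view)))  # 32
```

If either side changed the separators or dropped `sort_keys`, the digests would diverge and every signature would fail verification, even for untouched licenses.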
3. Public Key Distribution (Step 13)
Public key endpoint:
# app/api/v1/public_key.py
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from django.http import HttpResponse
from django.conf import settings

from apps.core.services.kms_service import KMSService


@api_view(['GET'])
@permission_classes([AllowAny])  # Public key is public information
def get_public_key(request):
    """
    Get Cloud KMS public key for license signature verification.

    Returns PEM-formatted RSA-4096 public key.

    Clients should:
        1. Download this once
        2. Cache locally (~/.coditect/kms-public-key.pem)
        3. Use for offline signature verification
        4. Refresh periodically (weekly recommended)

    Returns:
        str: Public key in PEM format
    """
    kms_service = KMSService(settings)
    public_key = kms_service.get_public_key()
    return HttpResponse(
        public_key,
        content_type='text/plain',
        status=200
    )
4. Client-Side Signature Verification (Steps 15-16)
CODITECT client verification:
# coditect/licensing/verifier.py
import base64
import hashlib
import json
import logging
from datetime import datetime, timezone
from pathlib import Path

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, utils

logger = logging.getLogger(__name__)


class LicenseVerifier:
    """
    Verify signed license tokens offline using Cloud KMS public key.

    Security:
        - RSA-4096 public key verification
        - SHA-256 digest
        - PKCS#1 v1.5 padding
        - Offline-capable (no network required after initial download)
    """

    def __init__(self):
        self.public_key_path = Path.home() / ".coditect" / "kms-public-key.pem"
        self.public_key = None

    def load_public_key(self):
        """
        Load Cloud KMS public key from local cache.

        Public key downloaded from /api/v1/public-key and cached locally.

        Raises:
            FileNotFoundError: Public key not cached (need to download)
        """
        if not self.public_key_path.exists():
            raise FileNotFoundError(
                "Cloud KMS public key not found. "
                "Run: coditect license download-key"
            )
        with open(self.public_key_path, 'rb') as f:
            pem_data = f.read()
        self.public_key = serialization.load_pem_public_key(pem_data)
        logger.info("Cloud KMS public key loaded")

    def verify_license(self, license_payload: dict, signature: str) -> bool:
        """
        Verify license signature (offline).

        Args:
            license_payload: License payload (dict)
            signature: Base64-encoded signature from Cloud KMS

        Returns:
            True if signature valid, False otherwise

        Process:
            1. Serialize payload (same as server: sorted keys)
            2. Compute SHA-256 digest
            3. Decode signature from base64
            4. Verify signature with public key
            5. Check expiration timestamp
        """
        if not self.public_key:
            self.load_public_key()
        try:
            # Step 1: Serialize payload (deterministic)
            payload_json = json.dumps(license_payload, sort_keys=True, separators=(',', ':'))

            # Step 2: Compute SHA-256 digest
            digest = hashlib.sha256(payload_json.encode()).digest()

            # Step 3: Decode signature
            signature_bytes = base64.b64decode(signature)

            # Step 4: Verify signature with public key.
            # The payload is already hashed, so Prehashed tells the library
            # not to hash the digest a second time.
            self.public_key.verify(
                signature_bytes,
                digest,
                padding.PKCS1v15(),
                utils.Prehashed(hashes.SHA256())
            )
            logger.info("License signature verified successfully")

            # Step 5: Check expiration
            expires_at = datetime.fromisoformat(license_payload['expires_at'])
            if datetime.now(timezone.utc) > expires_at:
                logger.warning("License expired")
                return False
            return True
        except InvalidSignature:
            logger.error("License signature verification failed (tampered)")
            return False
        except Exception as e:
            logger.error(f"License verification error: {e}")
            return False

    async def download_public_key(self, api_url: str):
        """
        Download Cloud KMS public key from License API.

        Args:
            api_url: License API base URL (e.g., https://api.coditect.dev)

        Saves to: ~/.coditect/kms-public-key.pem
        """
        import httpx

        url = f"{api_url}/api/v1/public-key"
        logger.info(f"Downloading public key from: {url}")
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            response.raise_for_status()
            public_key_pem = response.text

        # Save to cache
        self.public_key_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.public_key_path, 'w') as f:
            f.write(public_key_pem)
        logger.info(f"Public key saved to: {self.public_key_path}")
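The expiration check in step 5 compares a timezone-aware "now" with the parsed `expires_at`. If the timestamp arrives without a UTC offset, Python raises `TypeError` on that comparison, which the verifier's generic handler would report as a failed verification. The sketch below shows one way to make the check explicit; `is_expired` is a hypothetical helper, and treating offset-less timestamps as UTC is an assumption, not something the source specifies.

```python
from datetime import datetime, timezone
from typing import Optional


def is_expired(expires_at: str, now: Optional[datetime] = None) -> bool:
    """Return True if the ISO 8601 timestamp lies in the past."""
    expiry = datetime.fromisoformat(expires_at)
    if expiry.tzinfo is None:
        # Assumption: timestamps without an offset are meant as UTC.
        expiry = expiry.replace(tzinfo=timezone.utc)
    current = now if now is not None else datetime.now(timezone.utc)
    return current > expiry


fixed_now = datetime(2025, 6, 1, tzinfo=timezone.utc)
print(is_expired("2025-01-01T00:00:00+00:00", now=fixed_now))  # True
print(is_expired("2026-01-01T00:00:00+00:00", now=fixed_now))  # False
```

Passing `now` explicitly keeps the function easy to test without patching the clock.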
CLI command for public key download:
# coditect/cli/license.py
import asyncio

import click


@click.group()
def license():
    """Manage CODITECT licenses."""


@license.command()
@click.option('--api-url', default='https://api.coditect.dev', help='License API URL')
def download_key(api_url):
    """Download Cloud KMS public key for offline verification."""
    from coditect.licensing.verifier import LicenseVerifier

    verifier = LicenseVerifier()
    # Click commands are plain functions, so run the coroutine explicitly.
    asyncio.run(verifier.download_public_key(api_url))
    click.echo("✅ Public key downloaded and cached")
    click.echo(f"   Path: {verifier.public_key_path}")
    click.echo("   You can now verify licenses offline")
# Usage:
# coditect license download-key
# coditect license download-key --api-url https://staging.coditect.dev
5. Key Rotation (Step 17)
Automatic key rotation script:
#!/bin/bash
# scripts/rotate-kms-key.sh
# Rotate Cloud KMS signing key (run every 90 days)
set -e
PROJECT_ID="coditect-cloud-infra"
REGION="us-central1"
KEY_RING="license-signing"
KEY_NAME="license-signer"
echo "Rotating Cloud KMS signing key..."
# Step 1: Create new key version
# Asymmetric keys have no primary version, so --primary is not used here.
# The signing service selects a version explicitly in its key path.
echo "Creating new key version..."
NEW_VERSION=$(gcloud kms keys versions create \
--key="$KEY_NAME" \
--keyring="$KEY_RING" \
--location="$REGION" \
--format="value(name)" \
--project="$PROJECT_ID")
# Keep only the trailing version number from the full resource name
NEW_VERSION="${NEW_VERSION##*/}"
echo "✅ New key version created: $NEW_VERSION"
# Note: HSM key versions can take a short while to leave PENDING_GENERATION;
# the public key export below fails until the version is ENABLED.
# Step 2: Export new public key
PUBLIC_KEY=$(gcloud kms keys versions get-public-key "$NEW_VERSION" \
--key="$KEY_NAME" \
--keyring="$KEY_RING" \
--location="$REGION" \
--output-file=- \
--project="$PROJECT_ID")
echo "✅ New public key exported"
# Step 3: Add new version to Secret Manager (keep old)
echo "$PUBLIC_KEY" | gcloud secrets versions add kms-public-key \
--data-file=- \
--project="$PROJECT_ID"
echo "✅ New public key version added to Secret Manager"
# Step 4: Notify clients (optional)
# Clients will gradually update public keys on next license validation
echo ""
echo "🎉 Key rotation complete!"
echo "New version: $NEW_VERSION"
echo "Old signatures remain valid with previous public key version"
echo "Clients will update public keys on next license validation"
Scheduled key rotation (Cloud Scheduler):
# opentofu/modules/scheduler/main.tf
resource "google_cloud_scheduler_job" "kms_key_rotation" {
name = "kms-key-rotation"
description = "Rotate Cloud KMS signing key quarterly (about every 90 days)"
schedule = "0 0 1 */3 *" # Midnight on the 1st of every third month (90 to 92 days apart)
time_zone = "America/Los_Angeles"
attempt_deadline = "320s"
http_target {
http_method = "POST"
uri = "https://cloudbuild.googleapis.com/v1/projects/${var.project_id}/triggers/${var.rotation_trigger_id}:run"
oauth_token {
service_account_email = var.scheduler_service_account
}
}
}
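The cron expression `0 0 1 */3 *` fires on calendar quarters, so the interval between runs is close to, but not exactly, 90 days. A short sketch that lists the run dates and the gaps between them, using only the standard library (`quarterly_run_dates` is a hypothetical helper and 2025 is just an example year):

```python
from datetime import date


def quarterly_run_dates(year: int) -> list[date]:
    """Dates matched by the cron expression '0 0 1 */3 *' in a given year."""
    # '*/3' in the month field (range 1-12) matches months 1, 4, 7 and 10.
    return [date(year, month, 1) for month in range(1, 13, 3)]


# Four runs in 2025 plus the first run of 2026, to close the last interval.
runs = quarterly_run_dates(2025) + quarterly_run_dates(2026)[:1]
gaps = [(later - earlier).days for earlier, later in zip(runs, runs[1:])]
print(gaps)  # [90, 91, 92, 92]
```

If a strict 90-day policy matters for compliance, the schedule would need a different mechanism than a monthly cron field.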
Security Considerations
Key Security
Private Key Protection:
- Stored in Cloud KMS HSM (Hardware Security Module)
- Never exposed or exported
- FIPS 140-2 Level 3 validated
- Automatic backup and recovery
Public Key Distribution:
- Cached in Secret Manager with versioning
- Distributed to clients via HTTPS
- Multiple versions supported (for key rotation)
- Cached locally for offline verification
Signature Verification
Client-Side Verification:
- RSA-4096 provides roughly 140 bits of security (RSA-2048 provides about 112)
- SHA-256 digest (256 bits)
- PKCS#1 v1.5 padding
- Offline-capable (no network required)
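For sizing API responses and log fields, it helps to know how large a signature is on the wire. An RSA signature is as long as the key modulus, and base64 expands every 3 bytes into 4 characters. A small arithmetic sketch, using zero bytes as a stand-in for a real signature:

```python
import base64

KEY_BITS = 4096
signature_len = KEY_BITS // 8  # an RSA signature is as long as the modulus

# Placeholder bytes, not a real signature; only the length matters here.
encoded = base64.b64encode(bytes(signature_len))

print(signature_len)  # 512
print(len(encoded))   # 684
```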
Tamper Detection:
- Any modification to license payload invalidates signature
- Clients reject tampered licenses immediately
- Server logs rejected signatures for fraud detection
Audit Logging
Cloud KMS Audit Logs:
- All signing operations logged (requires Data Access audit logs to be enabled for Cloud KMS; they are off by default)
- Principal (service account) recorded
- Timestamp and key version tracked
- Digest logged for forensics
Query audit logs:
gcloud logging read \
'resource.type="cloudkms_cryptokeyversion"
AND protoPayload.methodName="AsymmetricSign"' \
--limit=50 \
--format=json
Performance Characteristics
Cloud KMS Performance:
| Operation | Latency (p50) | Latency (p99) | Throughput |
|---|---|---|---|
| AsymmetricSign (RSA-4096) | 15ms | 50ms | 100 ops/sec |
| GetPublicKey | 5ms | 15ms | 500 ops/sec |
Client-Side Verification:
| Operation | Latency | Memory |
|---|---|---|
| Load public key | 1ms | 4 KB |
| Verify signature (RSA-4096) | 2ms | 16 KB |
| Total verification | 3ms | 20 KB |
Caching Strategy:
- Public key cached locally (never expires)
- Refresh recommended weekly (check for key rotation)
- Signature verification 100% offline
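The weekly refresh recommendation can be implemented as a simple age check on the cached key file. The sketch below is one possible approach, assuming the file's modification time reflects the last download; `needs_refresh` and `REFRESH_INTERVAL_SECONDS` are hypothetical names.

```python
import time
from pathlib import Path
from typing import Optional

REFRESH_INTERVAL_SECONDS = 7 * 24 * 60 * 60  # one week


def needs_refresh(key_path: Path, now: Optional[float] = None) -> bool:
    """Return True if the cached key is missing or older than the interval."""
    if not key_path.exists():
        return True
    current = time.time() if now is None else now
    # st_mtime is updated whenever the key file is rewritten by a download.
    age = current - key_path.stat().st_mtime
    return age > REFRESH_INTERVAL_SECONDS


cache_path = Path.home() / ".coditect" / "kms-public-key.pem"
print(needs_refresh(cache_path))
```

A client could call this at startup and trigger a background download when it returns `True`, while still verifying with the cached key if the network is unavailable.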
Error Scenarios
Scenario 1: KMS Signing Failure
Symptoms:
- HTTP 500 response: "License signing failed"
- Cloud KMS unavailable or rate limited
Detection:
try:
    signature = kms_service.sign_license(payload)
except Exception as e:
    logger.error(f"KMS signing failed: {e}")
    # Retry with exponential backoff
Recovery:
- Retry with exponential backoff (3 attempts)
- If persistent: Return 503 Service Unavailable
- Monitor Cloud KMS quota and availability
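The recovery steps above can be sketched as a small retry helper. This is an illustration under stated assumptions, not the project's implementation: `retry_with_backoff` is a hypothetical name, the delays and jitter are arbitrary defaults, and it retries on any exception, whereas production code would normally retry only transient errors such as timeouts or rate limiting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, retrying failed attempts with exponential backoff."""
    for attempt in range(attempts):
        try:
            return operation()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: the caller can map this to a 503
            # Delay doubles each attempt, plus jitter to avoid synchronized retries.
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            sleep(delay)
    raise RuntimeError("attempts must be at least 1")
```

In the view, the signing call could then be wrapped as `retry_with_backoff(lambda: kms_service.sign_license(payload))`. The injectable `sleep` parameter exists so tests can run without real delays.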
Scenario 2: Public Key Not Found
Symptoms:
- Client: FileNotFoundError ("Public key not cached")
Detection:
if not self.public_key_path.exists():
    raise FileNotFoundError("Public key not found")
Recovery:
# Download public key
coditect license download-key
# Retry license validation
coditect license validate
Scenario 3: Signature Verification Failed (Tampered License)
Symptoms:
- Client: InvalidSignature exception
- License payload modified by attacker
Detection:
try:
    self.public_key.verify(
        signature_bytes,
        digest,
        padding.PKCS1v15(),
        utils.Prehashed(hashes.SHA256())
    )
except InvalidSignature:
    logger.error("Tampered license detected")
Response:
- Reject license immediately
- Block CODITECT startup
- Log incident for fraud investigation
- Require user to re-validate online
Cost Analysis
Cloud KMS Pricing:
| Component | Price | Monthly Cost |
|---|---|---|
| Active key versions | $0.06/version/month | $0.12 (2 versions) |
| Signing operations | $0.03/10,000 ops | $3.00 (1M ops) |
| Total | | $3.12 |
Assumptions:
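- 2 active key versions (current + previous for rotation)
- 1 million signatures/month (~33K/day, ~23/minute)
- 10,000 public key retrievals/month (clients cache)
- Rates shown are the software-protection rates; HSM-protected keys such as the one created above are billed at higher rates, so confirm against the current Cloud KMS pricing page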
Cost scaling:
- Linear with signing volume
- Fixed cost for key storage ($0.06/version)
- No charge for signature verification (client-side)
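The cost model above is a fixed per-version charge plus a linear per-operation charge. A minimal sketch that reproduces the table's total from the stated rates; `monthly_kms_cost` is a hypothetical helper, and the default prices are the figures quoted in this document rather than live pricing.

```python
def monthly_kms_cost(
    active_versions: int,
    signing_ops: int,
    version_price: float = 0.06,      # USD per active key version per month
    price_per_10k_ops: float = 0.03,  # USD per 10,000 signing operations
) -> float:
    """Estimate the monthly Cloud KMS bill in USD."""
    storage = active_versions * version_price
    operations = signing_ops / 10_000 * price_per_10k_ops
    return round(storage + operations, 2)


print(monthly_kms_cost(2, 1_000_000))  # 3.12
```

Passing different rates as keyword arguments makes it easy to re-run the estimate for another protection level or signing volume.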
Related Documentation
- ADR-004: Cloud KMS License Signing (architecture decision)
- ADR-020: Security Hardening (overall security)
- 01-license-validation-flow.md: License validation sequence
Last Updated: 2025-11-30
Diagram Type: Sequence (Mermaid)
Scope: Infrastructure - Cloud KMS license signing