Sequence Diagram: Cloud KMS License Signing Flow

Purpose: RSA-4096 asymmetric key creation, license token signing, public key distribution, and signature verification for tamper-proof offline licensing.

Actors:

  • License API (Django application)
  • Cloud KMS (key management service)
  • Secret Manager (key metadata storage)
  • CODITECT Client (local framework)
  • Cloud Monitoring (audit logging)

Flow: Key creation → License signing → Public key distribution → Local verification


Mermaid Sequence Diagram


Step-by-Step Breakdown

1. Key Creation and Setup (Steps 1-5)

Create KMS key ring and signing key:

#!/bin/bash
# scripts/setup-kms.sh
# One-time setup for Cloud KMS license signing

set -e

PROJECT_ID="coditect-cloud-infra"
REGION="us-central1"
KEY_RING="license-signing"
KEY_NAME="license-signer"

echo "Creating Cloud KMS key ring and signing key..."

# Step 1: Create key ring
gcloud kms keyrings create "$KEY_RING" \
--location="$REGION" \
--project="$PROJECT_ID"

echo "✅ Key ring created: $KEY_RING"

# Step 2: Create asymmetric signing key (RSA-4096)
gcloud kms keys create "$KEY_NAME" \
--keyring="$KEY_RING" \
--location="$REGION" \
--purpose=asymmetric-signing \
--default-algorithm=rsa-sign-pkcs1-4096-sha256 \
--protection-level=hsm \
--project="$PROJECT_ID"

echo "✅ Signing key created: $KEY_NAME"
echo " - Algorithm: RSA_SIGN_PKCS1_4096_SHA256"
echo " - Key size: 4096 bits"
echo " - Protection: HSM (Hardware Security Module)"

# Step 3: Get public key
PUBLIC_KEY=$(gcloud kms keys versions get-public-key 1 \
--key="$KEY_NAME" \
--keyring="$KEY_RING" \
--location="$REGION" \
--output-file=- \
--project="$PROJECT_ID")

echo "✅ Public key exported"

# Step 4: Store public key in Secret Manager
echo "$PUBLIC_KEY" | gcloud secrets create kms-public-key \
--data-file=- \
--replication-policy=automatic \
--project="$PROJECT_ID"

echo "✅ Public key stored in Secret Manager: kms-public-key"

# Step 5: Grant License API service account permission to sign
SERVICE_ACCOUNT="license-api@$PROJECT_ID.iam.gserviceaccount.com"

gcloud kms keys add-iam-policy-binding "$KEY_NAME" \
--keyring="$KEY_RING" \
--location="$REGION" \
--member="serviceAccount:$SERVICE_ACCOUNT" \
--role="roles/cloudkms.signerVerifier" \
--project="$PROJECT_ID"

echo "✅ Granted signing permission to: $SERVICE_ACCOUNT"

echo ""
echo "🎉 Cloud KMS setup complete!"
echo "Key path: projects/$PROJECT_ID/locations/$REGION/keyRings/$KEY_RING/cryptoKeys/$KEY_NAME"
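The key path printed at the end of the script follows the standard Cloud KMS resource-name layout. As a minimal sketch, the helper below assembles the same string from its components, with an optional version suffix of the kind a signing call needs. The function name `kms_key_path` is hypothetical and not part of the project.

```python
from typing import Optional


def kms_key_path(project_id: str, region: str, key_ring: str,
                 key_name: str, version: Optional[int] = None) -> str:
    """Build a Cloud KMS resource name (hypothetical helper for illustration)."""
    path = (
        f"projects/{project_id}/locations/{region}"
        f"/keyRings/{key_ring}/cryptoKeys/{key_name}"
    )
    # Signing calls address a specific key version, not the key itself
    if version is not None:
        path += f"/cryptoKeyVersions/{version}"
    return path


key_path = kms_key_path("coditect-cloud-infra", "us-central1",
                        "license-signing", "license-signer")
version_path = kms_key_path("coditect-cloud-infra", "us-central1",
                            "license-signing", "license-signer", version=1)
print(key_path)
print(version_path)
```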

OpenTofu configuration:

# opentofu/modules/kms/main.tf
resource "google_kms_key_ring" "license_signing" {
name = var.key_ring_name
location = var.region
}

resource "google_kms_crypto_key" "license_signer" {
name = var.key_name
key_ring = google_kms_key_ring.license_signing.id

purpose = "ASYMMETRIC_SIGN"

version_template {
algorithm = "RSA_SIGN_PKCS1_4096_SHA256"
protection_level = "HSM"
}

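# Cloud KMS rotates only symmetric keys automatically, so rotation_period
# must not be set on an ASYMMETRIC_SIGN key. New versions are created by
# the rotation script in section 5.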

lifecycle {
prevent_destroy = true
}
}

# Grant License API permission to sign
resource "google_kms_crypto_key_iam_member" "license_api_signer" {
crypto_key_id = google_kms_crypto_key.license_signer.id
role = "roles/cloudkms.signerVerifier"
member = "serviceAccount:${var.service_account_email}"
}

# Output key path
output "key_name" {
description = "Full resource name of the KMS crypto key"
value = google_kms_crypto_key.license_signer.id
}

2. License Signing (Steps 6-11)

Django REST Framework license validation endpoint:

# app/api/v1/licenses.py
import logging

from django.conf import settings
from django.utils import timezone
from rest_framework import serializers, status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response

from apps.core.services.kms_service import KMSService
from apps.core.services.license_service import LicenseService

logger = logging.getLogger(__name__)


# Request/Response Serializers
class LicenseValidationRequestSerializer(serializers.Serializer):
    """License validation request from CODITECT client."""
    license_key = serializers.CharField(max_length=255)
    hardware_id = serializers.CharField(max_length=255)


class LicensePayloadSerializer(serializers.Serializer):
    """License payload to be signed."""
    license_key = serializers.CharField(max_length=255)
    tenant_id = serializers.CharField()
    tier = serializers.CharField()
    max_seats = serializers.IntegerField()
    hardware_id = serializers.CharField()
    expires_at = serializers.DateTimeField()
    issued_at = serializers.DateTimeField()


class SignedLicenseResponseSerializer(serializers.Serializer):
    """Signed license response."""
    license = LicensePayloadSerializer()
    signature = serializers.CharField()
    algorithm = serializers.CharField()


@api_view(['POST'])
@permission_classes([AllowAny])  # License validation doesn't require auth
def validate_license(request):
    """
    Validate license and return signed license token.

    Flow:
        1. Validate license in PostgreSQL
        2. Build license payload
        3. Sign payload with Cloud KMS
        4. Return signed license

    Returns:
        SignedLicenseResponse: License payload + Cloud KMS signature

    Raises:
        400: Invalid request data
        403: License invalid or expired
        409: No seats available
        500: KMS signing failed
    """
    # Validate request data
    serializer = LicenseValidationRequestSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(
            {"detail": "Invalid request data", "errors": serializer.errors},
            status=status.HTTP_400_BAD_REQUEST
        )

    license_key = serializer.validated_data['license_key']
    hardware_id = serializer.validated_data['hardware_id']

    license_service = LicenseService()
    # KMSService reads the project and region from the settings object
    kms_service = KMSService(settings)

    # Step 1: Validate license
    license_data = license_service.validate_license(
        license_key,
        hardware_id
    )

    if not license_data:
        return Response(
            {"detail": "License invalid or expired"},
            status=status.HTTP_403_FORBIDDEN
        )

    # Step 2: Build payload
    payload = {
        'license_key': license_key,
        'tenant_id': str(license_data['tenant_id']),
        'tier': license_data['tier'],
        'max_seats': license_data['max_seats'],
        'hardware_id': hardware_id,
        'expires_at': license_data['expires_at'].isoformat(),
        'issued_at': timezone.now().isoformat()
    }

    # Step 3: Sign with Cloud KMS
    try:
        signature = kms_service.sign_license(payload)
    except Exception:
        logger.exception("KMS signing failed")
        return Response(
            {"detail": "License signing failed"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )

    # Step 4: Return signed license
    return Response(
        {
            'license': payload,
            'signature': signature,
            'algorithm': "RSA_SIGN_PKCS1_4096_SHA256"
        },
        status=status.HTTP_200_OK
    )

KMS signing service:

# app/services/kms_service.py
import base64
import hashlib
import json
import logging

from google.cloud import kms, secretmanager

logger = logging.getLogger(__name__)


class KMSService:
    """
    Cloud KMS service for signing license tokens.

    Configuration:
        - Key ring: license-signing
        - Key name: license-signer
        - Algorithm: RSA_SIGN_PKCS1_4096_SHA256
        - Protection: HSM (Hardware Security Module)
    """

    def __init__(self, settings):
        self.settings = settings
        self.client = kms.KeyManagementServiceClient()

        # Full resource name of the key version used for signing.
        # Django exposes only uppercase settings, so this assumes
        # GCP_PROJECT_ID and GCP_REGION are defined there.
        # The version is pinned to 1; update it after a key rotation.
        self.key_name = (
            f"projects/{settings.GCP_PROJECT_ID}"
            f"/locations/{settings.GCP_REGION}"
            f"/keyRings/license-signing"
            f"/cryptoKeys/license-signer"
            f"/cryptoKeyVersions/1"
        )

    def sign_license(self, payload: dict) -> str:
        """
        Sign license payload with Cloud KMS.

        Synchronous, because the KMS client and the DRF view that calls
        this method are both synchronous.

        Args:
            payload: License payload to sign

        Returns:
            Base64-encoded signature (the raw RSA-4096 signature is
            512 bytes, which is 684 base64 characters)

        Raises:
            Exception: KMS signing failed

        Process:
            1. Serialize payload to JSON (sorted keys for determinism)
            2. Compute SHA-256 digest
            3. Send digest to Cloud KMS for signing
            4. Cloud KMS signs with private key (HSM)
            5. Return base64-encoded signature
        """
        try:
            # Step 1: Serialize payload (deterministic)
            payload_json = json.dumps(payload, sort_keys=True, separators=(',', ':'))

            # Step 2: Compute SHA-256 digest
            digest = hashlib.sha256(payload_json.encode()).digest()

            logger.info(f"Signing license: key={payload.get('license_key')}, digest={digest.hex()[:16]}...")

            # Step 3: Call Cloud KMS asymmetricSign
            request = kms.AsymmetricSignRequest(
                name=self.key_name,
                digest=kms.Digest(sha256=digest)
            )

            response = self.client.asymmetric_sign(request=request)

            # Step 4: Base64-encode signature
            signature = base64.b64encode(response.signature).decode('utf-8')

            logger.info(f"License signed successfully: signature_length={len(response.signature)}")

            return signature

        except Exception:
            logger.exception("KMS signing failed")
            raise

    def get_public_key(self) -> str:
        """
        Get public key for signature verification.

        Returns:
            Public key in PEM format

        Note:
            Public key is cached in Secret Manager for client distribution.
            Clients download once and cache locally.
        """
        client = secretmanager.SecretManagerServiceClient()

        name = f"projects/{self.settings.GCP_PROJECT_ID}/secrets/kms-public-key/versions/latest"

        response = client.access_secret_version(request={"name": name})

        return response.payload.data.decode("UTF-8")
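Signer and verifier must serialize the payload identically, or their digests will not match. The sketch below, using only the standard library, shows that `sort_keys=True` with compact separators yields the same SHA-256 digest regardless of dict insertion order, and that changing any field changes the digest. The helper name `canonical_digest` and the sample values are illustrative assumptions.

```python
import hashlib
import json


def canonical_digest(payload: dict) -> bytes:
    """SHA-256 over the deterministic JSON form used for signing (illustrative helper)."""
    payload_json = json.dumps(payload, sort_keys=True, separators=(',', ':'))
    return hashlib.sha256(payload_json.encode()).digest()


# Same fields, different insertion order (sample values are made up)
a = {"license_key": "ABC-123", "tier": "pro", "max_seats": 5}
b = {"max_seats": 5, "tier": "pro", "license_key": "ABC-123"}

same_digest = canonical_digest(a) == canonical_digest(b)

# Any change to a field produces a different digest, so the signature no longer matches
tampered = dict(a, max_seats=500)
differs = canonical_digest(a) != canonical_digest(tampered)

print(same_digest, differs)
```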

3. Public Key Distribution (Step 13)

Public key endpoint:

# app/api/v1/public_key.py
from django.conf import settings
from django.http import HttpResponse
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny

from apps.core.services.kms_service import KMSService


@api_view(['GET'])
@permission_classes([AllowAny])  # Public key is public information
def get_public_key(request):
    """
    Get Cloud KMS public key for license signature verification.

    Returns PEM-formatted RSA-4096 public key.

    Clients should:
        1. Download this once
        2. Cache locally (~/.coditect/kms-public-key.pem)
        3. Use for offline signature verification
        4. Refresh periodically (weekly recommended)

    Returns:
        str: Public key in PEM format
    """
    kms_service = KMSService(settings)

    public_key = kms_service.get_public_key()

    return HttpResponse(
        public_key,
        content_type='text/plain',
        status=200
    )
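The weekly refresh recommended in the docstring can be driven by the cached file's modification time. A minimal sketch, assuming a hypothetical helper `needs_refresh` and a seven-day threshold taken from that recommendation; a temporary file stands in for the real cache path.

```python
import os
import tempfile
import time
from datetime import timedelta
from pathlib import Path


def needs_refresh(path: Path, max_age: timedelta = timedelta(days=7)) -> bool:
    """Return True when the cached key is missing or older than max_age (hypothetical helper)."""
    if not path.exists():
        return True
    age_seconds = time.time() - path.stat().st_mtime
    return age_seconds > max_age.total_seconds()


# Demonstration with a temporary file standing in for ~/.coditect/kms-public-key.pem
with tempfile.TemporaryDirectory() as tmp:
    cached = Path(tmp) / "kms-public-key.pem"
    missing = needs_refresh(cached)      # no file yet

    cached.write_text("-----BEGIN PUBLIC KEY-----\n")
    fresh = needs_refresh(cached)        # just written

    # Backdate the file's modification time by eight days
    eight_days_ago = time.time() - 8 * 24 * 3600
    os.utime(cached, (eight_days_ago, eight_days_ago))
    stale = needs_refresh(cached)        # older than a week

print(missing, fresh, stale)
```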

4. Client-Side Signature Verification (Steps 15-16)

CODITECT client verification:

# coditect/licensing/verifier.py
import base64
import hashlib
import json
import logging
from datetime import datetime, timezone
from pathlib import Path

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, utils

logger = logging.getLogger(__name__)


class LicenseVerifier:
    """
    Verify signed license tokens offline using Cloud KMS public key.

    Security:
        - RSA-4096 public key verification
        - SHA-256 digest
        - PKCS#1 v1.5 padding
        - Offline-capable (no network required after initial download)
    """

    def __init__(self):
        self.public_key_path = Path.home() / ".coditect" / "kms-public-key.pem"
        self.public_key = None

    def load_public_key(self):
        """
        Load Cloud KMS public key from local cache.

        Public key downloaded from /api/v1/public-key and cached locally.

        Raises:
            FileNotFoundError: Public key not cached (need to download)
        """
        if not self.public_key_path.exists():
            raise FileNotFoundError(
                "Cloud KMS public key not found. "
                "Run: coditect license download-key"
            )

        with open(self.public_key_path, 'rb') as f:
            pem_data = f.read()

        self.public_key = serialization.load_pem_public_key(pem_data)

        logger.info("Cloud KMS public key loaded")

    def verify_license(self, license_payload: dict, signature: str) -> bool:
        """
        Verify license signature (offline).

        Args:
            license_payload: License payload (dict)
            signature: Base64-encoded signature from Cloud KMS

        Returns:
            True if signature valid and license not expired, False otherwise

        Process:
            1. Serialize payload (same as server: sorted keys)
            2. Compute SHA-256 digest
            3. Decode signature from base64
            4. Verify signature with public key
            5. Check expiration timestamp
        """
        if not self.public_key:
            self.load_public_key()

        try:
            # Step 1: Serialize payload (deterministic)
            payload_json = json.dumps(license_payload, sort_keys=True, separators=(',', ':'))

            # Step 2: Compute SHA-256 digest
            digest = hashlib.sha256(payload_json.encode()).digest()

            # Step 3: Decode signature
            signature_bytes = base64.b64decode(signature)

            # Step 4: Verify signature with public key.
            # The data passed in is already a SHA-256 digest, so Prehashed
            # tells the library not to hash it a second time. Passing plain
            # hashes.SHA256() here would reject every valid signature.
            self.public_key.verify(
                signature_bytes,
                digest,
                padding.PKCS1v15(),
                utils.Prehashed(hashes.SHA256())
            )

            logger.info("License signature verified successfully")

            # Step 5: Check expiration
            expires_at = datetime.fromisoformat(license_payload['expires_at'])
            if expires_at.tzinfo is None:
                # Assumption: a timestamp without an offset is UTC
                expires_at = expires_at.replace(tzinfo=timezone.utc)

            if datetime.now(timezone.utc) > expires_at:
                logger.warning("License expired")
                return False

            return True

        except InvalidSignature:
            logger.error("License signature verification failed (tampered)")
            return False

        except Exception as e:
            logger.error(f"License verification error: {e}")
            return False

    async def download_public_key(self, api_url: str):
        """
        Download Cloud KMS public key from License API.

        Args:
            api_url: License API base URL (e.g., https://api.coditect.dev)

        Saves to: ~/.coditect/kms-public-key.pem
        """
        import httpx

        url = f"{api_url}/api/v1/public-key"

        logger.info(f"Downloading public key from: {url}")

        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            response.raise_for_status()

        public_key_pem = response.text

        # Save to cache
        self.public_key_path.parent.mkdir(parents=True, exist_ok=True)

        with open(self.public_key_path, 'w') as f:
            f.write(public_key_pem)

        logger.info(f"Public key saved to: {self.public_key_path}")
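The expiration check in step 5 compares an ISO 8601 string against the current UTC time. The sketch below isolates that comparison so it can be exercised with a fixed reference time. The function name `is_expired` is hypothetical, and treating a timestamp without an offset as UTC is an assumption, not something the source specifies.

```python
from datetime import datetime, timezone
from typing import Optional


def is_expired(expires_at_iso: str, now: Optional[datetime] = None) -> bool:
    """Return True if the ISO 8601 expiry time has passed (hypothetical helper)."""
    expires_at = datetime.fromisoformat(expires_at_iso)
    if expires_at.tzinfo is None:
        # Assumption: timestamps without an offset are UTC
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    current = now if now is not None else datetime.now(timezone.utc)
    return current > expires_at


# Fixed reference time so the results do not depend on when this runs
reference = datetime(2025, 6, 1, tzinfo=timezone.utc)
past = is_expired("2025-01-01T00:00:00+00:00", now=reference)
future = is_expired("2026-01-01T00:00:00+00:00", now=reference)
naive = is_expired("2025-01-01T00:00:00", now=reference)
print(past, future, naive)
```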

CLI command for public key download:

# coditect/cli/license.py
import asyncio

import click


@click.group()
def license():
    """Manage CODITECT licenses."""


@license.command()
@click.option('--api-url', default='https://api.coditect.dev', help='License API URL')
def download_key(api_url):
    """Download Cloud KMS public key for offline verification."""
    from coditect.licensing.verifier import LicenseVerifier

    verifier = LicenseVerifier()

    # Click commands are synchronous, so run the coroutine to completion here
    asyncio.run(verifier.download_public_key(api_url))

    click.echo("✅ Public key downloaded and cached")
    click.echo(f" Path: {verifier.public_key_path}")
    click.echo(" You can now verify licenses offline")

# Usage:
# coditect license download-key
# coditect license download-key --api-url https://staging.coditect.dev

5. Key Rotation (Step 17)

Automatic key rotation script:

#!/bin/bash
# scripts/rotate-kms-key.sh
# Rotate Cloud KMS signing key (run every 90 days)

set -e

PROJECT_ID="coditect-cloud-infra"
REGION="us-central1"
KEY_RING="license-signing"
KEY_NAME="license-signer"

echo "Rotating Cloud KMS signing key..."

# Step 1: Create new key version.
# Asymmetric keys have no primary version, so --primary is not used;
# the new version's resource name is captured from the create command.
echo "Creating new key version..."
NEW_VERSION_NAME=$(gcloud kms keys versions create \
  --key="$KEY_NAME" \
  --keyring="$KEY_RING" \
  --location="$REGION" \
  --format="value(name)" \
  --project="$PROJECT_ID")

# Keep only the trailing version number (e.g. "2")
NEW_VERSION="${NEW_VERSION_NAME##*/}"

echo "✅ New key version created: $NEW_VERSION"

# Step 2: Export new public key.
# HSM key generation is not instant; the version must reach the ENABLED
# state before its public key can be fetched.
PUBLIC_KEY=$(gcloud kms keys versions get-public-key "$NEW_VERSION" \
  --key="$KEY_NAME" \
  --keyring="$KEY_RING" \
  --location="$REGION" \
  --output-file=- \
  --project="$PROJECT_ID")

echo "✅ New public key exported"

# Step 3: Add new version to Secret Manager (keep old)
echo "$PUBLIC_KEY" | gcloud secrets versions add kms-public-key \
  --data-file=- \
  --project="$PROJECT_ID"

echo "✅ New public key version added to Secret Manager"

# Step 4: Notify clients (optional)
# Clients will gradually update public keys on next license validation

echo ""
echo "🎉 Key rotation complete!"
echo "New version: $NEW_VERSION"
echo "Old signatures remain valid with previous public key version"
echo "Clients will update public keys on next license validation"

Scheduled key rotation (Cloud Scheduler):

# opentofu/modules/scheduler/main.tf
resource "google_cloud_scheduler_job" "kms_key_rotation" {
name = "kms-key-rotation"
description = "Rotate Cloud KMS signing key quarterly"
schedule = "0 0 1 */3 *" # 00:00 on the 1st of Jan, Apr, Jul, Oct (quarterly, roughly every 90 days)
time_zone = "America/Los_Angeles"
attempt_deadline = "320s"

http_target {
http_method = "POST"
uri = "https://cloudbuild.googleapis.com/v1/projects/${var.project_id}/triggers/${var.rotation_trigger_id}:run"

oauth_token {
service_account_email = var.scheduler_service_account
}
}
}
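The cron expression fires at midnight on the first day of every third month, so the gap between runs varies slightly rather than being exactly 90 days. A small standard-library sketch that lists the gaps for one year (2025 is an arbitrary example year):

```python
from datetime import date

# Months matched by the "*/3" month field: January, April, July, October
run_months = [1, 4, 7, 10]
year = 2025  # arbitrary example year (not a leap year)

# Scheduled run dates for the year, plus the first run of the next year
runs = [date(year, m, 1) for m in run_months] + [date(year + 1, 1, 1)]

# Days between consecutive scheduled runs
gaps = [(later - earlier).days for earlier, later in zip(runs, runs[1:])]
print(gaps)  # [90, 91, 92, 92]
```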

Security Considerations

Key Security

Private Key Protection:

  • Stored in Cloud KMS HSM (Hardware Security Module)
  • Never exposed or exported
  • FIPS 140-2 Level 3 validated
  • Automatic backup and recovery

Public Key Distribution:

  • Cached in Secret Manager with versioning
  • Distributed to clients via HTTPS
  • Multiple versions supported (for key rotation)
  • Cached locally for offline verification

Signature Verification

Client-Side Verification:

  • RSA-4096 provides more than 128 bits of security (NIST rates RSA-3072 at 128 bits and RSA-2048 at 112 bits)
  • SHA-256 digest (256 bits)
  • PKCS#1 v1.5 padding
  • Offline-capable (no network required)
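The sizes involved follow directly from the algorithm parameters. This sketch uses zero-filled placeholder bytes, not a real signature, to show the arithmetic:

```python
import base64
import hashlib

KEY_BITS = 4096

# An RSA signature is as long as the modulus: 4096 bits = 512 bytes
signature_len = KEY_BITS // 8
placeholder_signature = bytes(signature_len)  # zero bytes standing in for a real signature

# Base64 encodes every 3 bytes as 4 characters, padding the final group
encoded_len = len(base64.b64encode(placeholder_signature))

# SHA-256 always produces a 32-byte (256-bit) digest
digest_len = len(hashlib.sha256(b"example payload").digest())

print(signature_len, encoded_len, digest_len)  # 512 684 32
```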

Tamper Detection:

  • Any modification to license payload invalidates signature
  • Clients reject tampered licenses immediately
  • Server logs rejected signatures for fraud detection

Audit Logging

Cloud KMS Audit Logs:

  • All signing operations logged
  • Principal (service account) recorded
  • Timestamp and key version tracked
  • Digest logged for forensics

Query audit logs:

gcloud logging read \
'resource.type="cloudkms_cryptokeyversion"
AND protoPayload.methodName="AsymmetricSign"' \
--limit=50 \
--format=json

Performance Characteristics

Cloud KMS Performance:

| Operation | Latency (p50) | Latency (p99) | Throughput |
|---|---|---|---|
| AsymmetricSign (RSA-4096) | 15 ms | 50 ms | 100 ops/sec |
| GetPublicKey | 5 ms | 15 ms | 500 ops/sec |

Client-Side Verification:

| Operation | Latency | Memory |
|---|---|---|
| Load public key | 1 ms | 4 KB |
| Verify signature (RSA-4096) | 2 ms | 16 KB |
| Total verification | 3 ms | 20 KB |

Caching Strategy:

  • Public key cached locally (never expires)
  • Refresh recommended weekly (check for key rotation)
  • Signature verification 100% offline

Error Scenarios

Scenario 1: KMS Signing Failure

Symptoms:

  • HTTP 500 response: "License signing failed"
  • Cloud KMS unavailable or rate limited

Detection:

try:
    # sign_license is called synchronously from the DRF view
    signature = kms_service.sign_license(payload)
except Exception as e:
    logger.error(f"KMS signing failed: {e}")
    # Retry with exponential backoff

Recovery:

  • Retry with exponential backoff (3 attempts)
  • If persistent: Return 503 Service Unavailable
  • Monitor Cloud KMS quota and availability
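The retry policy above can be sketched as a small wrapper. The names `retry_with_backoff`, `base_delay`, and the injected `sleep` parameter are hypothetical; the three-attempt limit comes from the recovery steps, while the 0.5-second starting delay is an assumed value.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(fn: Callable[[], T], attempts: int = 3,
                       base_delay: float = 0.5,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying on any exception with delays of base_delay * 2**n (hypothetical helper)."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            # Out of attempts: let the caller turn this into a 503 response
            if attempt == attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError("attempts must be at least 1")


# Demonstration with a fake signer that fails twice, then succeeds
calls = {"count": 0}
delays = []  # records the requested delays instead of actually sleeping


def flaky_sign() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("KMS unavailable")
    return "signature"


result = retry_with_backoff(flaky_sign, sleep=delays.append)
print(result, calls["count"], delays)
```

In the endpoint, `fn` would wrap the signing call, for example `lambda: kms_service.sign_license(payload)`.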

Scenario 2: Public Key Not Found

Symptoms:

  • Client: FileNotFoundError ("Cloud KMS public key not found")

Detection:

if not self.public_key_path.exists():
    raise FileNotFoundError("Public key not found")

Recovery:

# Download public key
coditect license download-key

# Retry license validation
coditect license validate

Scenario 3: Signature Verification Failed (Tampered License)

Symptoms:

  • Client: InvalidSignature exception
  • License payload modified by attacker

Detection:

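try:
    # Prehashed: the digest was computed already, so it must not be hashed again
    self.public_key.verify(
        signature_bytes,
        digest,
        padding.PKCS1v15(),
        utils.Prehashed(hashes.SHA256())
    )
except InvalidSignature:
    logger.error("Tampered license detected")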

Response:

  • Reject license immediately
  • Block CODITECT startup
  • Log incident for fraud investigation
  • Require user to re-validate online

Cost Analysis

Cloud KMS Pricing:

| Component | Price | Monthly Cost |
|---|---|---|
| Active key versions | $0.06/version/month | $0.12 (2 versions) |
| Signing operations | $0.03/10,000 ops | $3.00 (1M ops) |
| Total | | $3.12/month |

Assumptions:

  • 2 active key versions (primary + previous for rotation)
  • 1 million signatures/month (~33K/day, ~23/minute)
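  • 10,000 public key retrievals/month (clients cache)
  • Unit prices shown are the software-protection rates; HSM-protected key versions and operations, which this design uses, are billed at higher rates, so confirm against the current Cloud KMS pricing page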

Cost scaling:

  • Linear with signing volume
  • Fixed cost for key storage ($0.06/version)
  • No charge for signature verification (client-side)
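The monthly total can be reproduced from the two unit prices in the table. A sketch using `Decimal` to avoid floating-point rounding; the function name `monthly_kms_cost` is hypothetical, and the prices are the ones quoted above rather than values fetched from any pricing API.

```python
from decimal import Decimal

PRICE_PER_KEY_VERSION = Decimal("0.06")  # per active version per month (from the table)
PRICE_PER_10K_OPS = Decimal("0.03")      # per 10,000 signing operations (from the table)


def monthly_kms_cost(active_versions: int, signing_ops: int) -> Decimal:
    """Estimate monthly Cloud KMS cost from the quoted unit prices (hypothetical helper)."""
    storage = PRICE_PER_KEY_VERSION * active_versions
    signing = PRICE_PER_10K_OPS * Decimal(signing_ops) / Decimal(10_000)
    return storage + signing


# Baseline from the assumptions, then double the signing volume
baseline = monthly_kms_cost(active_versions=2, signing_ops=1_000_000)
doubled = monthly_kms_cost(active_versions=2, signing_ops=2_000_000)
print(baseline, doubled)
```

Doubling the signing volume adds exactly one more signing charge while the key storage charge stays fixed, which is the linear scaling described above.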

Related Documentation:

  • ADR-004: Cloud KMS License Signing (architecture decision)
  • ADR-020: Security Hardening (overall security)
  • 01-license-validation-flow.md: License validation sequence

Last Updated: 2025-11-30
Diagram Type: Sequence (Mermaid)
Scope: Infrastructure - Cloud KMS license signing