C4-02: License Acquisition Flow (Code Level)
Overview
This C4 Code diagram provides a detailed view of the license acquisition flow at the code level, showing specific classes, methods, and interactions within the Django backend.
- **Purpose:** Detailed code-level understanding of license acquisition
- **Audience:** Backend developers implementing license endpoints
- **Abstraction Level:** C4 Level 4 (Code)
Mermaid Class Diagram
Detailed Code Flow
Step 1: Request Reception & Validation
# backend/licenses/views.py
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework import status
@api_view(['POST'])
def acquire_license(request):
    """
    Acquire a license seat for a CODITECT session.

    **Endpoint:** POST /api/v1/license/acquire

    **Request Body:**
    {
        "license_key": "CODITECT-XXXX-XXXX-XXXX",
        "hardware_id": "a1b2c3d4e5f6...",
        "user_email": "user@example.com"
    }

    **Response (200 OK):**
    {
        "session_id": "uuid",
        "license_data": {...signed token...},
        "expires_at": "2026-12-31T23:59:59Z",
        "seats_used": 5,
        "seats_total": 10
    }

    **Error responses:**
    - 400: the request body failed serializer validation (field errors returned)
    - 402: the service reported that no seat is available
    - 403: the service rejected the license as invalid
    """
    # Reject malformed payloads before touching the service layer.
    incoming = LicenseAcquireRequestSerializer(data=request.data)
    if not incoming.is_valid():
        return Response(incoming.errors, status=status.HTTP_400_BAD_REQUEST)

    fields = incoming.validated_data

    # Delegate to the service layer and translate its domain errors into
    # HTTP responses; any other exception propagates to the framework.
    try:
        outcome = LicenseService.acquire_license(
            user=request.user,
            license_key=fields['license_key'],
            hardware_id=fields['hardware_id'],
        )
    except LicenseService.NoSeatsAvailableError:
        error_body = {"detail": "No seats available"}
        error_status = status.HTTP_402_PAYMENT_REQUIRED
    except LicenseService.LicenseInvalidError as invalid:
        error_body = {"detail": str(invalid)}
        error_status = status.HTTP_403_FORBIDDEN
    else:
        return Response(outcome, status=status.HTTP_200_OK)

    return Response(error_body, status=error_status)
Step 2: Request Serialization & Validation
# backend/licenses/serializers.py
from rest_framework import serializers
import re
class LicenseAcquireRequestSerializer(serializers.Serializer):
    """Validate license acquisition request.

    Fields:
        license_key: key in the form ``CODITECT-XXXX-XXXX-XXXX``.
        hardware_id: 64-character hexadecimal SHA256 digest.
        user_email: e-mail address of the requesting user.
    """

    # Both patterns are applied with re.fullmatch, so they need no anchors and
    # cannot be satisfied by a value that merely starts with a valid prefix.
    LICENSE_KEY_PATTERN = r'CODITECT-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}'
    HARDWARE_ID_PATTERN = r'[0-9a-fA-F]{64}'

    license_key = serializers.CharField(
        max_length=255,
        required=True,
        help_text="CODITECT license key (format: CODITECT-XXXX-XXXX-XXXX)"
    )
    hardware_id = serializers.CharField(
        max_length=255,
        required=True,
        help_text="SHA256 hash of hardware fingerprint"
    )
    user_email = serializers.EmailField(
        required=True,
        help_text="User email for session tracking"
    )

    def validate_license_key(self, value):
        """Validate license key format.

        Raises:
            serializers.ValidationError: if the key is not exactly
                ``CODITECT-XXXX-XXXX-XXXX`` (upper-case letters and digits).
        """
        # fullmatch (rather than match with a trailing "$") also rejects a
        # key followed by a newline.
        if not re.fullmatch(self.LICENSE_KEY_PATTERN, value):
            raise serializers.ValidationError(
                "Invalid license key format. Expected: CODITECT-XXXX-XXXX-XXXX"
            )
        return value

    def validate_hardware_id(self, value):
        """Validate hardware ID is SHA256 hash.

        Raises:
            serializers.ValidationError: if the value is not exactly 64
                hexadecimal characters.
        """
        if len(value) != 64:  # SHA256 produces 64-character hex string
            raise serializers.ValidationError(
                "Hardware ID must be a 64-character SHA256 hash"
            )
        # int(value, 16) is too permissive for this check: it accepts a "0x"
        # prefix, a leading sign, underscores and surrounding whitespace, so
        # strings that are not hex digests would pass. Match the digits
        # explicitly instead.
        if not re.fullmatch(self.HARDWARE_ID_PATTERN, value):
            raise serializers.ValidationError(
                "Hardware ID must be a valid hexadecimal string"
            )
        return value
Step 3: Service Layer - License Acquisition Logic
# backend/licenses/services.py
from django.db import transaction
from django.utils import timezone
from datetime import timedelta
import logging
logger = logging.getLogger('licenses')


class LicenseService:
    """Business logic for license operations."""

    # Lifetime (seconds) applied to the Redis active-session set on each new
    # acquisition: 6 minutes.
    SESSION_TTL_SECONDS = 360

    class NoSeatsAvailableError(Exception):
        """Raised when no seats are available."""

    class LicenseInvalidError(Exception):
        """Raised when license is invalid or expired."""

    @classmethod
    def acquire_license(cls, user, license_key, hardware_id):
        """
        Acquire a license seat, reusing an active session for the same hardware.

        Args:
            user: User to associate with a newly created session.
            license_key: Key of the license to acquire a seat on.
            hardware_id: Hardware fingerprint identifying the client machine.

        Returns:
            dict: License acquisition response with ``session_id``,
            ``license_data`` (including ``signature``), ``expires_at``,
            ``seats_used`` and ``seats_total``.

        Raises:
            LicenseService.LicenseInvalidError: The key is unknown, or the
                license is inactive, suspended or expired.
            LicenseService.NoSeatsAvailableError: Every seat is in use.
        """
        # select_for_update() may only be evaluated inside a transaction, so
        # the lookup and everything that relies on the row lock live in one
        # atomic block. Holding the lock also serialises concurrent
        # acquisitions for the same license between the seat check and the
        # seat registration.
        with transaction.atomic():
            # Step 3.1: Verify license exists
            try:
                license_obj = License.objects.select_for_update().get(
                    license_key=license_key
                )
            except License.DoesNotExist:
                raise cls.LicenseInvalidError("License key not found")

            # Step 3.2: Verify license validity
            if not cls._verify_license_validity(license_obj):
                raise cls.LicenseInvalidError(
                    f"License is {cls._get_invalid_reason(license_obj)}"
                )

            # Step 3.3: Check for existing active session (same hardware)
            session = Session.objects.filter(
                license_id=license_obj,
                hardware_id=hardware_id,
                status='active'
            ).first()

            if session:
                logger.info(f"Reusing existing session {session.id}")
                # Only the current usage is needed here; a reused session
                # must be returned even when the license is otherwise full.
                _, seats_used = RedisAtomicService.atomic_seat_check(
                    license_key=license_key,
                    seats_total=license_obj.seats_total
                )
            else:
                # Step 3.4: Seat availability check (Redis Lua script)
                seats_available, seats_used = RedisAtomicService.atomic_seat_check(
                    license_key=license_key,
                    seats_total=license_obj.seats_total
                )
                if not seats_available:
                    logger.warning(
                        f"No seats available for license {license_key} "
                        f"({seats_used}/{license_obj.seats_total} used)"
                    )
                    raise cls.NoSeatsAvailableError(
                        f"All {license_obj.seats_total} seats are in use. "
                        f"Please wait for a seat to become available."
                    )

                # Step 3.5: Create session and audit entry. Database writes
                # come first so that a failure rolls back before Redis is
                # touched and no phantom seat is left in the set.
                session = cls._create_session(license_obj, hardware_id, user)
                AuditLog.log_license_acquire(session)

                RedisAtomicService.add_session_to_set(
                    license_key=license_key,
                    session_id=session.id
                )
                RedisAtomicService.set_ttl(
                    key=f"license:{license_key}:active_sessions",
                    seconds=cls.SESSION_TTL_SECONDS
                )
                seats_used += 1

        # Steps 3.6-3.7: Sign and build the response after the transaction so
        # the row lock is not held during the Cloud KMS call.
        return cls._build_response(session, license_obj, seats_used)

    @classmethod
    def _build_response(cls, session, license_obj, seats_used):
        """Sign the license data for ``session`` and build the API response."""
        license_data = cls._build_license_data(license_obj, session)
        # sign_license_data is an instance method, so a service instance is
        # required; the signature covers the data before 'signature' is added.
        signed_token = CloudKMSService().sign_license_data(license_data)
        license_data['signature'] = signed_token

        return {
            'session_id': str(session.id),
            'license_data': license_data,
            'expires_at': license_obj.expires_at.isoformat(),
            'seats_used': seats_used,
            'seats_total': license_obj.seats_total,
        }

    @staticmethod
    def _verify_license_validity(license_obj):
        """Return True when the license is active, not suspended and not expired."""
        if not license_obj.is_active:
            return False
        if license_obj.is_suspended:
            return False
        return license_obj.expires_at >= timezone.now()

    @staticmethod
    def _get_invalid_reason(license_obj):
        """Return "inactive", "suspended" or "expired"; "unknown" if none applies."""
        if not license_obj.is_active:
            return "inactive"
        if license_obj.is_suspended:
            return "suspended"
        if license_obj.expires_at < timezone.now():
            return "expired"
        return "unknown"

    @staticmethod
    def _create_session(license_obj, hardware_id, user):
        """Create and return a new active session for the license."""
        session = Session.objects.create(
            tenant_id=license_obj.tenant_id,
            license_id=license_obj,
            user=user,
            hardware_id=hardware_id,
            status='active',
            last_heartbeat_at=timezone.now(),
            heartbeat_count=0,
            started_at=timezone.now(),
        )
        logger.info(f"Created session {session.id} for license {license_obj.license_key}")
        return session

    @staticmethod
    def _build_license_data(license_obj, session):
        """Build the (unsigned) license data dictionary that gets signed."""
        return {
            'license_key': license_obj.license_key,
            'tenant_id': str(license_obj.tenant_id.id),
            'tier': license_obj.license_tier,
            'seats_total': license_obj.seats_total,
            'expires_at': license_obj.expires_at.isoformat(),
            'session_id': str(session.id),
            'issued_at': timezone.now().isoformat(),
        }
Step 4: Redis Atomic Seat Counting
# backend/core/redis_service.py
from django.core.cache import cache
import logging
logger = logging.getLogger('redis')


class RedisAtomicService:
    """Redis operations for seat counting on the per-license active-session set."""

    # Lua script for the seat availability check.
    # KEYS[1] is the complete active-sessions set key, ARGV[1] the seat limit.
    # The key is used exactly as passed: the previous script prepended
    # "license:" to a key that already carried that prefix, so SCARD always
    # inspected a set that add_session_to_set never writes to.
    ACQUIRE_SEAT_LUA = """
    local seats_total = tonumber(ARGV[1])
    local seats_used = redis.call("SCARD", KEYS[1])
    if seats_used >= seats_total then
        return {0, seats_used}  -- No seats available
    end
    return {1, seats_used}  -- Seat available
    """

    @staticmethod
    def _sessions_key(license_key):
        """Return the Redis key of the active-session set for a license."""
        return f"license:{license_key}:active_sessions"

    @classmethod
    def atomic_seat_check(cls, license_key, seats_total):
        """
        Check whether a seat is available for the license.

        The count and comparison run inside one Lua script. The script only
        reads the set; it does not reserve a seat, so callers still have to
        add the session with add_session_to_set.

        Args:
            license_key: License whose active-session set is inspected.
            seats_total: Maximum number of concurrent seats.

        Returns:
            tuple: (seats_available: bool, seats_used: int)

        Raises:
            Exception: Any error from the Redis client is logged and re-raised.
        """
        redis_client = cache.client.get_client()
        try:
            result = redis_client.eval(
                cls.ACQUIRE_SEAT_LUA,
                1,  # Number of KEYS
                cls._sessions_key(license_key),  # KEYS[1]
                seats_total,  # ARGV[1]
            )
            seats_available = bool(result[0])
            seats_used = int(result[1])
            logger.debug(
                f"License {license_key}: {seats_used}/{seats_total} seats used, "
                f"available={seats_available}"
            )
            return (seats_available, seats_used)
        except Exception as e:
            logger.error(f"Redis atomic seat check failed: {e}")
            raise

    @classmethod
    def add_session_to_set(cls, license_key, session_id):
        """Add session ID to active sessions set."""
        redis_client = cache.client.get_client()
        redis_key = cls._sessions_key(license_key)
        redis_client.sadd(redis_key, str(session_id))
        logger.debug(f"Added session {session_id} to {redis_key}")

    @classmethod
    def remove_session_from_set(cls, license_key, session_id):
        """Remove session ID from active sessions set (frees the seat)."""
        redis_client = cache.client.get_client()
        redis_key = cls._sessions_key(license_key)
        redis_client.srem(redis_key, str(session_id))
        logger.debug(f"Removed session {session_id} from {redis_key}")

    @classmethod
    def set_ttl(cls, key, seconds):
        """Set TTL on a Redis key."""
        redis_client = cache.client.get_client()
        redis_client.expire(key, seconds)
        logger.debug(f"Set TTL {seconds}s on {key}")
Step 5: Cloud KMS License Signing
# backend/core/kms_service.py
from google.cloud import kms
import json
import hashlib
import base64
import logging
logger = logging.getLogger('kms')


class CloudKMSService:
    """Cloud KMS operations for license signing."""

    # Key version used when the caller does not supply one.
    DEFAULT_KEY_NAME = (
        "projects/coditect-citus-prod/locations/us-central1/"
        "keyRings/coditect-license-keys/cryptoKeys/license-signing-key/"
        "cryptoKeyVersions/1"
    )

    def __init__(self, key_name=None, client=None):
        """
        Create a signing service.

        Args:
            key_name: Full resource name of the crypto key version to sign
                with. Defaults to DEFAULT_KEY_NAME.
            client: Object exposing ``asymmetric_sign`` and ``get_public_key``.
                Defaults to a new ``kms.KeyManagementServiceClient()``;
                passing one avoids creating a client per instance and allows
                another environment's key to be used.
        """
        self.client = client if client is not None else kms.KeyManagementServiceClient()
        self.key_name = key_name or self.DEFAULT_KEY_NAME

    def sign_license_data(self, payload: dict) -> str:
        """
        Sign license data with the configured Cloud KMS key.

        Args:
            payload: License data dictionary (must be JSON-serializable)

        Returns:
            str: Base64-encoded signature

        Raises:
            Exception: Any error raised by the client is logged and re-raised.
        """
        # Step 5.1: Serialize payload (sorted keys make the bytes independent
        # of dictionary insertion order)
        payload_bytes = json.dumps(payload, sort_keys=True).encode('utf-8')

        # Step 5.2: Create SHA-256 digest
        digest = self._create_digest(payload_bytes)

        # Step 5.3: Sign with Cloud KMS
        try:
            request = {
                'name': self.key_name,
                'digest': {'sha256': digest},
            }
            response = self.client.asymmetric_sign(request)

            # Step 5.4: Base64-encode signature
            signature_b64 = base64.b64encode(response.signature).decode('utf-8')
            logger.info(f"Signed license with Cloud KMS key {self.key_name}")
            return signature_b64
        except Exception as e:
            logger.error(f"Cloud KMS signing failed: {e}")
            raise

    def get_public_key(self) -> bytes:
        """
        Get public key for local signature verification.

        Returns:
            bytes: Public key in PEM format
        """
        request = {'name': self.key_name}
        public_key = self.client.get_public_key(request)
        return public_key.pem.encode('utf-8')

    @staticmethod
    def _create_digest(payload_bytes: bytes) -> bytes:
        """Create SHA-256 digest of payload."""
        return hashlib.sha256(payload_bytes).digest()
Step 6: Audit Logging
# backend/core/audit.py
from django.utils import timezone
import json
class AuditLog:
    """Audit logging for compliance and security."""

    @staticmethod
    def log_license_acquire(session):
        """
        Log license acquisition event.

        Persists a ``LICENSE_ACQUIRE`` audit row describing the session and
        writes an info-level log line.

        Args:
            session: The session that was just created for the license.

        Returns:
            The created audit model instance.
        """
        # Imported here rather than at module level; the model shares this
        # class's name, hence the alias.
        import logging
        from licenses.models import AuditLog as AuditLogModel

        audit_entry = AuditLogModel.objects.create(
            tenant_id=session.tenant_id,
            user_id=session.user,
            action='LICENSE_ACQUIRE',
            resource_type='session',
            resource_id=session.id,
            metadata={
                'license_id': str(session.license_id.id),
                'license_key': session.license_id.license_key,
                'hardware_id': session.hardware_id,
                'status': session.status,
                'started_at': session.started_at.isoformat(),
            }
        )
        # This module defines no `logger`, so the original reference would
        # raise NameError after the row was written; obtain one explicitly.
        logging.getLogger('audit').info(
            f"Audit log created: {audit_entry.id} "
            f"(action={audit_entry.action}, session={session.id})"
        )
        return audit_entry
Method Call Sequence
Data Models (Detailed)
License Model
# backend/licenses/models.py
from django.db import models
from django_multitenant.models import TenantModel
from django.utils import timezone
import uuid
class License(TenantModel):
    """
    Floating concurrent license with multi-tenant isolation.

    Supports multiple license tiers:
    - trial: 1 seat, 14 days
    - starter: 3 seats, monthly
    - professional: 10 seats, monthly
    - enterprise: unlimited seats, annual
    """
    tenant_column = 'tenant_id'

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    tenant_id = models.ForeignKey(
        'tenants.Tenant',
        on_delete=models.CASCADE,
        related_name='licenses'
    )
    license_key = models.CharField(
        max_length=255,
        unique=True,
        db_index=True,
        help_text="Unique license key (format: CODITECT-XXXX-XXXX-XXXX)"
    )
    license_tier = models.CharField(
        max_length=50,
        choices=[
            ('trial', 'Trial'),
            ('starter', 'Starter'),
            ('professional', 'Professional'),
            ('enterprise', 'Enterprise'),
        ],
        default='trial'
    )
    seats_total = models.PositiveIntegerField(
        default=1,
        help_text="Maximum concurrent seats allowed"
    )
    expires_at = models.DateTimeField(
        help_text="License expiration timestamp"
    )
    is_active = models.BooleanField(
        default=True,
        help_text="Whether license is currently active"
    )
    is_suspended = models.BooleanField(
        default=False,
        help_text="Whether license has been suspended (e.g., payment failure)"
    )
    issued_at = models.DateTimeField(
        auto_now_add=True,
        help_text="When license was first issued"
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'licenses'
        indexes = [
            models.Index(fields=['tenant_id', 'is_active']),
            models.Index(fields=['license_key']),
            models.Index(fields=['expires_at']),
        ]

    def is_valid(self) -> bool:
        """Check if license is currently valid (active, not suspended, not expired)."""
        if not self.is_active:
            return False
        if self.is_suspended:
            return False
        if self.expires_at < timezone.now():
            return False
        return True

    def has_available_seats(self) -> bool:
        """
        Check if license has available seats (via Redis).

        NOTE: This is an estimate. Use atomic_seat_check for accuracy.
        """
        # Imported lazily inside the method rather than at module level.
        from core.redis_service import RedisAtomicService

        seats_available, _ = RedisAtomicService.atomic_seat_check(
            license_key=self.license_key,
            seats_total=self.seats_total
        )
        return seats_available

    def get_seat_utilization(self) -> float:
        """
        Get current seat utilization percentage.

        Returns:
            float: ``seats_used / seats_total * 100``, or 0.0 when the license
            has no seats at all.
        """
        # PositiveIntegerField accepts 0, which would otherwise raise
        # ZeroDivisionError below.
        if not self.seats_total:
            return 0.0

        from django.core.cache import cache

        redis_key = f"license:{self.license_key}:active_sessions"
        redis_client = cache.client.get_client()
        seats_used = redis_client.scard(redis_key)
        return (seats_used / self.seats_total) * 100.0

    def days_until_expiration(self) -> int:
        """Get number of whole days until license expires (negative once expired)."""
        delta = self.expires_at - timezone.now()
        return delta.days

    def __str__(self):
        return f"{self.license_key} ({self.license_tier})"
Session Model
# backend/licenses/models.py
class Session(TenantModel):
    """
    Active CODITECT session with heartbeat tracking.

    Sessions expire after 6 minutes without heartbeat.
    Zombie sessions are automatically cleaned up by Celery task.
    """
    tenant_column = 'tenant_id'

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    tenant_id = models.ForeignKey(
        'tenants.Tenant',
        on_delete=models.CASCADE,
        related_name='sessions'
    )
    license_id = models.ForeignKey(
        License,
        on_delete=models.CASCADE,
        related_name='sessions'
    )
    user = models.ForeignKey(
        'users.User',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='sessions',
        help_text="User who acquired the license (optional)"
    )
    hardware_id = models.CharField(
        max_length=255,
        help_text="SHA256 hash of hardware fingerprint"
    )
    hostname = models.CharField(
        max_length=255,
        blank=True,
        null=True,
        help_text="Machine hostname (optional)"
    )
    status = models.CharField(
        max_length=50,
        choices=[
            ('active', 'Active'),
            ('expired', 'Expired'),
            ('released', 'Released'),
            ('zombie', 'Zombie'),
        ],
        default='active'
    )
    last_heartbeat_at = models.DateTimeField(
        help_text="Timestamp of last heartbeat"
    )
    heartbeat_count = models.PositiveIntegerField(
        default=0,
        help_text="Total heartbeats received"
    )
    started_at = models.DateTimeField(
        auto_now_add=True,
        help_text="Session start time"
    )
    ended_at = models.DateTimeField(
        null=True,
        blank=True,
        help_text="Session end time (if released)"
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'sessions'
        indexes = [
            models.Index(fields=['tenant_id', 'license_id', 'status']),
            models.Index(fields=['hardware_id']),
            models.Index(fields=['last_heartbeat_at']),
            models.Index(fields=['status', 'last_heartbeat_at']),
        ]
        constraints = [
            models.UniqueConstraint(
                fields=['license_id', 'hardware_id', 'status'],
                condition=models.Q(status='active'),
                name='unique_active_session_per_hardware'
            )
        ]

    def is_heartbeat_alive(self, timeout_seconds=360) -> bool:
        """Return True for an active session whose last heartbeat is newer than the timeout."""
        if self.status == 'active':
            silence = timezone.now() - self.last_heartbeat_at
            return silence.total_seconds() < timeout_seconds
        return False

    def _finish(self, final_status):
        """Store the terminal status, stamp the end time and save the row."""
        self.status = final_status
        self.ended_at = timezone.now()
        self.save()

    def mark_zombie(self):
        """Flag the session as a zombie (missed heartbeat) and record when it ended."""
        self._finish('zombie')

    def release(self):
        """Release the session (graceful shutdown) and drop it from the Redis seat set."""
        self._finish('released')

        # The database row is saved first; the Redis entry is removed afterwards.
        from core.redis_service import RedisAtomicService

        RedisAtomicService.remove_session_from_set(
            license_key=self.license_id.license_key,
            session_id=self.id
        )

    def record_heartbeat(self):
        """Store a fresh heartbeat timestamp and bump the heartbeat counter."""
        self.last_heartbeat_at = timezone.now()
        self.heartbeat_count = self.heartbeat_count + 1
        # Only the heartbeat columns (plus updated_at) are written.
        touched = ['last_heartbeat_at', 'heartbeat_count', 'updated_at']
        self.save(update_fields=touched)

    def __str__(self):
        return f"Session {self.id} ({self.status})"
Error Handling
Exception Hierarchy
# backend/licenses/exceptions.py
class LicenseError(Exception):
    """Base exception for all license errors.

    Every license error carries an HTTP ``status_code``. The base class
    supplies a default so that code handling a plain ``LicenseError`` (or a
    subclass that forgets to set one) can read the attribute safely.
    """
    status_code = 500


class LicenseNotFoundError(LicenseError):
    """License key not found."""
    status_code = 404


class LicenseInvalidError(LicenseError):
    """License is invalid (expired, suspended, inactive)."""
    status_code = 403


class NoSeatsAvailableError(LicenseError):
    """No seats available."""
    status_code = 402


class RedisUnavailableError(LicenseError):
    """Redis is unavailable."""
    status_code = 503


class CloudKMSUnavailableError(LicenseError):
    """Cloud KMS is unavailable."""
    status_code = 503
Error Response Format
# backend/core/exception_handler.py
from rest_framework.views import exception_handler as drf_exception_handler
from rest_framework.response import Response
def custom_exception_handler(exc, context):
    """
    Custom exception handler for consistent error responses.

    Args:
        exc: The exception raised while handling the request.
        context: Context passed through to DRF's default handler.

    Returns:
        Response with an ``{"error": {...}}`` body, or None when the exception
        is neither handled by DRF nor carries an integer ``status_code``
        (it then surfaces as an unhandled server error).
    """
    # Call DRF's default exception handler first
    response = drf_exception_handler(exc, context)

    if response is None:
        # DRF returned nothing for this exception. Exceptions that declare
        # their own HTTP status through a `status_code` attribute are turned
        # into a response with that status instead of being dropped.
        status_code = getattr(exc, 'status_code', None)
        if not isinstance(status_code, int):
            return None
        response = Response(status=status_code)

    # Customize error response format
    error_response = {
        'error': {
            'code': exc.__class__.__name__,
            'message': str(exc),
            'status_code': response.status_code,
        }
    }

    # Add field-specific errors if validation error
    detail = getattr(exc, 'detail', None)
    if isinstance(detail, dict):
        error_response['error']['fields'] = detail

    response.data = error_response
    return response
Testing
Unit Tests
# backend/licenses/tests/test_license_acquisition.py
from datetime import timedelta

import pytest
from django.apps import apps
from django.core.cache import cache
from django.test import TestCase
from django.utils import timezone

from licenses.services import LicenseService
from licenses.models import License, Session, Tenant
from core.redis_service import RedisAtomicService
@pytest.mark.django_db
class TestLicenseAcquisition(TestCase):
    """Test license acquisition flow.

    NOTE(review): acquire_license signs the response through Cloud KMS;
    presumably the test environment stubs that service - confirm.
    """

    def setUp(self):
        # Create test tenant
        self.tenant = Tenant.objects.create(
            name="Test Tenant",
            license_tier="professional",
            seats_total=10
        )
        # Create test license
        self.license = License.objects.create(
            tenant_id=self.tenant,
            license_key="CODITECT-TEST-1234-ABCD",
            license_tier="professional",
            seats_total=3,
            expires_at=timezone.now() + timedelta(days=365),
            is_active=True
        )
        # Create test user; the model is resolved through the app registry
        # using the same 'users.User' label the Session foreign key uses.
        user_model = apps.get_model('users', 'User')
        self.user = user_model.objects.create(
            tenant_id=self.tenant,
            email="test@example.com",
            role="admin"
        )
        # Seat usage lives in Redis, which the test database rollback does
        # not reset: start from an empty set and clean up afterwards.
        self._clear_active_sessions()
        self.addCleanup(self._clear_active_sessions)

    def _clear_active_sessions(self):
        """Delete the Redis active-session set of the test license."""
        redis_client = cache.client.get_client()
        redis_client.delete(f"license:{self.license.license_key}:active_sessions")

    def test_acquire_license_success(self):
        """Test successful license acquisition."""
        hardware_id = "a1b2c3d4e5f6"
        result = LicenseService.acquire_license(
            user=self.user,
            license_key=self.license.license_key,
            hardware_id=hardware_id
        )
        # Verify response
        assert 'session_id' in result
        assert 'license_data' in result
        assert result['seats_used'] == 1
        assert result['seats_total'] == 3
        # Verify session created
        session = Session.objects.get(id=result['session_id'])
        assert session.status == 'active'
        assert session.hardware_id == hardware_id
        assert session.license_id == self.license

    def test_acquire_license_no_seats_available(self):
        """Test license acquisition when no seats available."""
        # Create 3 active sessions (all seats used). The service counts seats
        # in the Redis set, so each session must be registered there as well;
        # database rows alone would leave the license looking empty.
        for i in range(3):
            session = Session.objects.create(
                tenant_id=self.tenant,
                license_id=self.license,
                hardware_id=f"hardware_{i}",
                status='active',
                last_heartbeat_at=timezone.now()
            )
            RedisAtomicService.add_session_to_set(
                license_key=self.license.license_key,
                session_id=session.id
            )
        # Attempt to acquire 4th seat
        with pytest.raises(LicenseService.NoSeatsAvailableError):
            LicenseService.acquire_license(
                user=self.user,
                license_key=self.license.license_key,
                hardware_id="hardware_4"
            )

    def test_acquire_license_expired(self):
        """Test license acquisition with expired license."""
        # Set license to expired
        self.license.expires_at = timezone.now() - timedelta(days=1)
        self.license.save()
        with pytest.raises(LicenseService.LicenseInvalidError) as exc_info:
            LicenseService.acquire_license(
                user=self.user,
                license_key=self.license.license_key,
                hardware_id="hardware_1"
            )
        assert "expired" in str(exc_info.value).lower()

    def test_acquire_license_reuse_existing_session(self):
        """Test reusing existing session for same hardware."""
        hardware_id = "a1b2c3d4e5f6"
        # First acquisition
        result1 = LicenseService.acquire_license(
            user=self.user,
            license_key=self.license.license_key,
            hardware_id=hardware_id
        )
        # Second acquisition with same hardware_id
        result2 = LicenseService.acquire_license(
            user=self.user,
            license_key=self.license.license_key,
            hardware_id=hardware_id
        )
        # Should return same session
        assert result1['session_id'] == result2['session_id']
        # Should only have 1 active session
        active_sessions = Session.objects.filter(
            license_id=self.license,
            status='active'
        ).count()
        assert active_sessions == 1
- **Status:** Specification Complete ✅
- **Implementation:** Backend 60% complete (views/models/serializers done)
- **Dependencies:** Redis Memorystore, Cloud KMS
- **Estimated Remaining:** 2-3 hours (services layer + tests)
- **Last Updated:** November 30, 2025
- **Owner:** Backend Team
- **Reviewed By:** Architecture Team