Skip to main content

C4-02: License Acquisition Flow (Code Level)

Overview

This C4 Code diagram provides a detailed view of the license acquisition flow at the code level, showing specific classes, methods, and interactions within the Django backend.

Purpose: Detailed code-level understanding of license acquisition Audience: Backend developers implementing license endpoints Abstraction Level: C4 Level 4 (Code)


Mermaid Class Diagram


Detailed Code Flow

Step 1: Request Reception & Validation

# backend/licenses/views.py

from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework import status

@api_view(['POST'])
def acquire_license(request):
"""
Acquire a license seat for a CODITECT session.

**Endpoint:** POST /api/v1/license/acquire

**Request Body:**
{
"license_key": "CODITECT-XXXX-XXXX-XXXX",
"hardware_id": "a1b2c3d4e5f6...",
"user_email": "user@example.com"
}

**Response (200 OK):**
{
"session_id": "uuid",
"license_data": {...signed token...},
"expires_at": "2026-12-31T23:59:59Z",
"seats_used": 5,
"seats_total": 10
}
"""
# Step 1.1: Validate request data
serializer = LicenseAcquireRequestSerializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

validated_data = serializer.validated_data

# Step 1.2: Call service layer
try:
result = LicenseService.acquire_license(
user=request.user,
license_key=validated_data['license_key'],
hardware_id=validated_data['hardware_id'],
)
return Response(result, status=status.HTTP_200_OK)

except LicenseService.NoSeatsAvailableError:
return Response(
{"detail": "No seats available"},
status=status.HTTP_402_PAYMENT_REQUIRED
)

except LicenseService.LicenseInvalidError as e:
return Response(
{"detail": str(e)},
status=status.HTTP_403_FORBIDDEN
)

Step 2: Request Serialization & Validation

# backend/licenses/serializers.py

from rest_framework import serializers
import re

class LicenseAcquireRequestSerializer(serializers.Serializer):
"""Validate license acquisition request."""

license_key = serializers.CharField(
max_length=255,
required=True,
help_text="CODITECT license key (format: CODITECT-XXXX-XXXX-XXXX)"
)

hardware_id = serializers.CharField(
max_length=255,
required=True,
help_text="SHA256 hash of hardware fingerprint"
)

user_email = serializers.EmailField(
required=True,
help_text="User email for session tracking"
)

def validate_license_key(self, value):
"""Validate license key format."""
# Must match: CODITECT-XXXX-XXXX-XXXX
pattern = r'^CODITECT-[A-Z0-9]{4}-[A-Z0-9]{4}-[A-Z0-9]{4}$'
if not re.match(pattern, value):
raise serializers.ValidationError(
"Invalid license key format. Expected: CODITECT-XXXX-XXXX-XXXX"
)
return value

def validate_hardware_id(self, value):
"""Validate hardware ID is SHA256 hash."""
if len(value) != 64: # SHA256 produces 64-character hex string
raise serializers.ValidationError(
"Hardware ID must be a 64-character SHA256 hash"
)

# Check if valid hex
try:
int(value, 16)
except ValueError:
raise serializers.ValidationError(
"Hardware ID must be a valid hexadecimal string"
)

return value

Step 3: Service Layer - License Acquisition Logic

# backend/licenses/services.py

from django.db import transaction
from django.utils import timezone
from datetime import timedelta
import logging

logger = logging.getLogger('licenses')

class LicenseService:
"""Business logic for license operations."""

class NoSeatsAvailableError(Exception):
"""Raised when no seats are available."""
pass

class LicenseInvalidError(Exception):
"""Raised when license is invalid or expired."""
pass

@classmethod
def acquire_license(cls, user, license_key, hardware_id):
"""
Acquire a license seat with atomic seat counting.

Returns:
dict: License acquisition response
"""
# Step 3.1: Verify license exists and is valid
try:
license_obj = License.objects.select_for_update().get(
license_key=license_key
)
except License.DoesNotExist:
raise cls.LicenseInvalidError("License key not found")

# Step 3.2: Verify license validity
if not cls._verify_license_validity(license_obj):
raise cls.LicenseInvalidError(
f"License is {cls._get_invalid_reason(license_obj)}"
)

# Step 3.3: Check for existing active session (same hardware)
existing_session = Session.objects.filter(
license_id=license_obj,
hardware_id=hardware_id,
status='active'
).first()

if existing_session:
logger.info(f"Reusing existing session {existing_session.id}")
return cls._build_response(existing_session, license_obj)

# Step 3.4: Atomic seat availability check (Redis Lua script)
seats_available, seats_used = RedisAtomicService.atomic_seat_check(
license_key=license_key,
seats_total=license_obj.seats_total
)

if not seats_available:
logger.warning(
f"No seats available for license {license_key} "
f"({seats_used}/{license_obj.seats_total} used)"
)
raise cls.NoSeatsAvailableError(
f"All {license_obj.seats_total} seats are in use. "
f"Please wait for a seat to become available."
)

# Step 3.5: Create session (transaction ensures atomicity)
with transaction.atomic():
session = cls._create_session(license_obj, hardware_id, user)

# Add session to Redis set
RedisAtomicService.add_session_to_set(
license_key=license_key,
session_id=session.id
)

# Set TTL (6 minutes)
RedisAtomicService.set_ttl(
key=f"license:{license_key}:active_sessions",
seconds=360
)

# Log audit trail
AuditLog.log_license_acquire(session)

# Step 3.6: Sign license token with Cloud KMS
license_data = cls._build_license_data(license_obj, session)
signed_token = CloudKMSService.sign_license_data(license_data)
license_data['signature'] = signed_token

# Step 3.7: Build and return response
return {
'session_id': str(session.id),
'license_data': license_data,
'expires_at': license_obj.expires_at.isoformat(),
'seats_used': seats_used + 1,
'seats_total': license_obj.seats_total,
}

@staticmethod
def _verify_license_validity(license_obj):
"""Check if license is valid."""
if not license_obj.is_active:
return False
if license_obj.is_suspended:
return False
if license_obj.expires_at < timezone.now():
return False
return True

@staticmethod
def _get_invalid_reason(license_obj):
"""Get reason why license is invalid."""
if not license_obj.is_active:
return "inactive"
if license_obj.is_suspended:
return "suspended"
if license_obj.expires_at < timezone.now():
return "expired"
return "unknown"

@staticmethod
def _create_session(license_obj, hardware_id, user):
"""Create a new session."""
session = Session.objects.create(
tenant_id=license_obj.tenant_id,
license_id=license_obj,
user=user,
hardware_id=hardware_id,
status='active',
last_heartbeat_at=timezone.now(),
heartbeat_count=0,
started_at=timezone.now(),
)

logger.info(f"Created session {session.id} for license {license_obj.license_key}")

return session

@staticmethod
def _build_license_data(license_obj, session):
"""Build license data for signing."""
return {
'license_key': license_obj.license_key,
'tenant_id': str(license_obj.tenant_id.id),
'tier': license_obj.license_tier,
'seats_total': license_obj.seats_total,
'expires_at': license_obj.expires_at.isoformat(),
'session_id': str(session.id),
'issued_at': timezone.now().isoformat(),
}

Step 4: Redis Atomic Seat Counting

# backend/core/redis_service.py

from django.core.cache import cache
import logging

logger = logging.getLogger('redis')

class RedisAtomicService:
"""Redis operations for atomic seat counting."""

# Lua script for atomic seat availability check
ACQUIRE_SEAT_LUA = """
local license_key = KEYS[1]
local seats_total = tonumber(ARGV[1])
local ttl = tonumber(ARGV[2])

local active_sessions_key = "license:" .. license_key .. ":active_sessions"
local seats_used = redis.call("SCARD", active_sessions_key)

if seats_used >= seats_total then
return {0, seats_used} -- No seats available
end

return {1, seats_used} -- Seat available
"""

@classmethod
def atomic_seat_check(cls, license_key, seats_total):
"""
Atomically check if a seat is available.

Returns:
tuple: (seats_available: bool, seats_used: int)
"""
redis_client = cache.client.get_client()

try:
result = redis_client.eval(
cls.ACQUIRE_SEAT_LUA,
1, # Number of KEYS
f"license:{license_key}", # KEYS[1]
seats_total, # ARGV[1]
360, # ARGV[2] (TTL: 6 minutes)
)

seats_available = bool(result[0])
seats_used = int(result[1])

logger.debug(
f"License {license_key}: {seats_used}/{seats_total} seats used, "
f"available={seats_available}"
)

return (seats_available, seats_used)

except Exception as e:
logger.error(f"Redis atomic seat check failed: {e}")
raise

@classmethod
def add_session_to_set(cls, license_key, session_id):
"""Add session ID to active sessions set."""
redis_client = cache.client.get_client()

redis_key = f"license:{license_key}:active_sessions"
redis_client.sadd(redis_key, str(session_id))

logger.debug(f"Added session {session_id} to {redis_key}")

@classmethod
def set_ttl(cls, key, seconds):
"""Set TTL on a Redis key."""
redis_client = cache.client.get_client()
redis_client.expire(key, seconds)

logger.debug(f"Set TTL {seconds}s on {key}")

Step 5: Cloud KMS License Signing

# backend/core/kms_service.py

from google.cloud import kms
import json
import hashlib
import base64
import logging

logger = logging.getLogger('kms')

class CloudKMSService:
"""Cloud KMS operations for license signing."""

def __init__(self):
self.client = kms.KeyManagementServiceClient()
self.key_name = (
"projects/coditect-citus-prod/locations/us-central1/"
"keyRings/coditect-license-keys/cryptoKeys/license-signing-key/"
"cryptoKeyVersions/1"
)

def sign_license_data(self, payload: dict) -> str:
"""
Sign license data with Cloud KMS RSA-4096 key.

Args:
payload: License data dictionary

Returns:
str: Base64-encoded signature
"""
# Step 5.1: Serialize payload (deterministic JSON)
payload_bytes = json.dumps(payload, sort_keys=True).encode('utf-8')

# Step 5.2: Create SHA-256 digest
digest = self._create_digest(payload_bytes)

# Step 5.3: Sign with Cloud KMS
try:
request = {
'name': self.key_name,
'digest': {'sha256': digest},
}

response = self.client.asymmetric_sign(request)

# Step 5.4: Base64-encode signature
signature_b64 = base64.b64encode(response.signature).decode('utf-8')

logger.info(f"Signed license with Cloud KMS key {self.key_name}")

return signature_b64

except Exception as e:
logger.error(f"Cloud KMS signing failed: {e}")
raise

def get_public_key(self) -> bytes:
"""
Get public key for local signature verification.

Returns:
bytes: Public key in PEM format
"""
request = {'name': self.key_name}
public_key = self.client.get_public_key(request)
return public_key.pem.encode('utf-8')

@staticmethod
def _create_digest(payload_bytes: bytes) -> bytes:
"""Create SHA-256 digest of payload."""
return hashlib.sha256(payload_bytes).digest()

Step 6: Audit Logging

# backend/core/audit.py

from django.utils import timezone
import json

class AuditLog:
"""Audit logging for compliance and security."""

@staticmethod
def log_license_acquire(session):
"""Log license acquisition event."""
from licenses.models import AuditLog as AuditLogModel

audit_entry = AuditLogModel.objects.create(
tenant_id=session.tenant_id,
user_id=session.user,
action='LICENSE_ACQUIRE',
resource_type='session',
resource_id=session.id,
metadata={
'license_id': str(session.license_id.id),
'license_key': session.license_id.license_key,
'hardware_id': session.hardware_id,
'status': session.status,
'started_at': session.started_at.isoformat(),
}
)

logger.info(
f"Audit log created: {audit_entry.id} "
f"(action={audit_entry.action}, session={session.id})"
)

return audit_entry

Method Call Sequence


Data Models (Detailed)

License Model

# backend/licenses/models.py

from django.db import models
from django_multitenant.models import TenantModel
from django.utils import timezone
import uuid

class License(TenantModel):
"""
Floating concurrent license with multi-tenant isolation.

Supports multiple license tiers:
- trial: 1 seat, 14 days
- starter: 3 seats, monthly
- professional: 10 seats, monthly
- enterprise: unlimited seats, annual
"""

tenant_column = 'tenant_id'

id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
tenant_id = models.ForeignKey(
'tenants.Tenant',
on_delete=models.CASCADE,
related_name='licenses'
)

license_key = models.CharField(
max_length=255,
unique=True,
db_index=True,
help_text="Unique license key (format: CODITECT-XXXX-XXXX-XXXX)"
)

license_tier = models.CharField(
max_length=50,
choices=[
('trial', 'Trial'),
('starter', 'Starter'),
('professional', 'Professional'),
('enterprise', 'Enterprise'),
],
default='trial'
)

seats_total = models.PositiveIntegerField(
default=1,
help_text="Maximum concurrent seats allowed"
)

expires_at = models.DateTimeField(
help_text="License expiration timestamp"
)

is_active = models.BooleanField(
default=True,
help_text="Whether license is currently active"
)

is_suspended = models.BooleanField(
default=False,
help_text="Whether license has been suspended (e.g., payment failure)"
)

issued_at = models.DateTimeField(
auto_now_add=True,
help_text="When license was first issued"
)

created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)

class Meta:
db_table = 'licenses'
indexes = [
models.Index(fields=['tenant_id', 'is_active']),
models.Index(fields=['license_key']),
models.Index(fields=['expires_at']),
]

def is_valid(self) -> bool:
"""Check if license is currently valid."""
if not self.is_active:
return False
if self.is_suspended:
return False
if self.expires_at < timezone.now():
return False
return True

def has_available_seats(self) -> bool:
"""
Check if license has available seats (via Redis).

NOTE: This is an estimate. Use atomic_seat_check for accuracy.
"""
from core.redis_service import RedisAtomicService

seats_available, _ = RedisAtomicService.atomic_seat_check(
license_key=self.license_key,
seats_total=self.seats_total
)
return seats_available

def get_seat_utilization(self) -> float:
"""Get current seat utilization percentage."""
from django.core.cache import cache

redis_key = f"license:{self.license_key}:active_sessions"
redis_client = cache.client.get_client()

seats_used = redis_client.scard(redis_key)

return (seats_used / self.seats_total) * 100.0

def days_until_expiration(self) -> int:
"""Get number of days until license expires."""
delta = self.expires_at - timezone.now()
return delta.days

def __str__(self):
return f"{self.license_key} ({self.license_tier})"

Session Model

# backend/licenses/models.py

class Session(TenantModel):
"""
Active CODITECT session with heartbeat tracking.

Sessions expire after 6 minutes without heartbeat.
Zombie sessions are automatically cleaned up by Celery task.
"""

tenant_column = 'tenant_id'

id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
tenant_id = models.ForeignKey(
'tenants.Tenant',
on_delete=models.CASCADE,
related_name='sessions'
)

license_id = models.ForeignKey(
License,
on_delete=models.CASCADE,
related_name='sessions'
)

user = models.ForeignKey(
'users.User',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='sessions',
help_text="User who acquired the license (optional)"
)

hardware_id = models.CharField(
max_length=255,
help_text="SHA256 hash of hardware fingerprint"
)

hostname = models.CharField(
max_length=255,
blank=True,
null=True,
help_text="Machine hostname (optional)"
)

status = models.CharField(
max_length=50,
choices=[
('active', 'Active'),
('expired', 'Expired'),
('released', 'Released'),
('zombie', 'Zombie'),
],
default='active'
)

last_heartbeat_at = models.DateTimeField(
help_text="Timestamp of last heartbeat"
)

heartbeat_count = models.PositiveIntegerField(
default=0,
help_text="Total heartbeats received"
)

started_at = models.DateTimeField(
auto_now_add=True,
help_text="Session start time"
)

ended_at = models.DateTimeField(
null=True,
blank=True,
help_text="Session end time (if released)"
)

created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)

class Meta:
db_table = 'sessions'
indexes = [
models.Index(fields=['tenant_id', 'license_id', 'status']),
models.Index(fields=['hardware_id']),
models.Index(fields=['last_heartbeat_at']),
models.Index(fields=['status', 'last_heartbeat_at']),
]
constraints = [
models.UniqueConstraint(
fields=['license_id', 'hardware_id', 'status'],
condition=models.Q(status='active'),
name='unique_active_session_per_hardware'
)
]

def is_heartbeat_alive(self, timeout_seconds=360) -> bool:
"""Check if session is still alive (heartbeat within timeout)."""
if self.status != 'active':
return False

elapsed = (timezone.now() - self.last_heartbeat_at).total_seconds()
return elapsed < timeout_seconds

def mark_zombie(self):
"""Mark session as zombie (missed heartbeat)."""
self.status = 'zombie'
self.ended_at = timezone.now()
self.save()

def release(self):
"""Release session (graceful shutdown)."""
self.status = 'released'
self.ended_at = timezone.now()
self.save()

# Remove from Redis active sessions set
from core.redis_service import RedisAtomicService
RedisAtomicService.remove_session_from_set(
license_key=self.license_id.license_key,
session_id=self.id
)

def record_heartbeat(self):
"""Record a heartbeat."""
self.last_heartbeat_at = timezone.now()
self.heartbeat_count += 1
self.save(update_fields=['last_heartbeat_at', 'heartbeat_count', 'updated_at'])

def __str__(self):
return f"Session {self.id} ({self.status})"

Error Handling

Exception Hierarchy

# backend/licenses/exceptions.py

class LicenseError(Exception):
"""Base exception for all license errors."""
pass

class LicenseNotFoundError(LicenseError):
"""License key not found."""
status_code = 404

class LicenseInvalidError(LicenseError):
"""License is invalid (expired, suspended, inactive)."""
status_code = 403

class NoSeatsAvailableError(LicenseError):
"""No seats available."""
status_code = 402

class RedisUnavailableError(LicenseError):
"""Redis is unavailable."""
status_code = 503

class CloudKMSUnavailableError(LicenseError):
"""Cloud KMS is unavailable."""
status_code = 503

Error Response Format

# backend/core/exception_handler.py

from rest_framework.views import exception_handler as drf_exception_handler
from rest_framework.response import Response

def custom_exception_handler(exc, context):
"""Custom exception handler for consistent error responses."""

# Call DRF's default exception handler first
response = drf_exception_handler(exc, context)

if response is not None:
# Customize error response format
error_response = {
'error': {
'code': exc.__class__.__name__,
'message': str(exc),
'status_code': response.status_code,
}
}

# Add field-specific errors if validation error
if hasattr(exc, 'detail') and isinstance(exc.detail, dict):
error_response['error']['fields'] = exc.detail

response.data = error_response

return response

Testing

Unit Tests

# backend/licenses/tests/test_license_acquisition.py

import pytest
from django.test import TestCase
from licenses.services import LicenseService
from licenses.models import License, Session, Tenant
from core.redis_service import RedisAtomicService

@pytest.mark.django_db
class TestLicenseAcquisition(TestCase):
"""Test license acquisition flow."""

def setUp(self):
# Create test tenant
self.tenant = Tenant.objects.create(
name="Test Tenant",
license_tier="professional",
seats_total=10
)

# Create test license
self.license = License.objects.create(
tenant_id=self.tenant,
license_key="CODITECT-TEST-1234-ABCD",
license_tier="professional",
seats_total=3,
expires_at=timezone.now() + timedelta(days=365),
is_active=True
)

# Create test user
self.user = User.objects.create(
tenant_id=self.tenant,
email="test@example.com",
role="admin"
)

def test_acquire_license_success(self):
"""Test successful license acquisition."""
hardware_id = "a1b2c3d4e5f6"

result = LicenseService.acquire_license(
user=self.user,
license_key=self.license.license_key,
hardware_id=hardware_id
)

# Verify response
assert 'session_id' in result
assert 'license_data' in result
assert result['seats_used'] == 1
assert result['seats_total'] == 3

# Verify session created
session = Session.objects.get(id=result['session_id'])
assert session.status == 'active'
assert session.hardware_id == hardware_id
assert session.license_id == self.license

def test_acquire_license_no_seats_available(self):
"""Test license acquisition when no seats available."""
# Create 3 active sessions (all seats used)
for i in range(3):
Session.objects.create(
tenant_id=self.tenant,
license_id=self.license,
hardware_id=f"hardware_{i}",
status='active',
last_heartbeat_at=timezone.now()
)

# Attempt to acquire 4th seat
with pytest.raises(LicenseService.NoSeatsAvailableError):
LicenseService.acquire_license(
user=self.user,
license_key=self.license.license_key,
hardware_id="hardware_4"
)

def test_acquire_license_expired(self):
"""Test license acquisition with expired license."""
# Set license to expired
self.license.expires_at = timezone.now() - timedelta(days=1)
self.license.save()

with pytest.raises(LicenseService.LicenseInvalidError) as exc_info:
LicenseService.acquire_license(
user=self.user,
license_key=self.license.license_key,
hardware_id="hardware_1"
)

assert "expired" in str(exc_info.value).lower()

def test_acquire_license_reuse_existing_session(self):
"""Test reusing existing session for same hardware."""
hardware_id = "a1b2c3d4e5f6"

# First acquisition
result1 = LicenseService.acquire_license(
user=self.user,
license_key=self.license.license_key,
hardware_id=hardware_id
)

# Second acquisition with same hardware_id
result2 = LicenseService.acquire_license(
user=self.user,
license_key=self.license.license_key,
hardware_id=hardware_id
)

# Should return same session
assert result1['session_id'] == result2['session_id']

# Should only have 1 active session
active_sessions = Session.objects.filter(
license_id=self.license,
status='active'
).count()
assert active_sessions == 1

Status: Specification Complete ✅ Implementation: Backend 60% complete (views/models/serializers done) Dependencies: Redis Memorystore, Cloud KMS Estimated Remaining: 2-3 hours (services layer + tests)


Last Updated: November 30, 2025 Owner: Backend Team Reviewed By: Architecture Team