License Acquisition Workflow

Overview

This document describes the complete workflow for acquiring, validating, and maintaining a CODITECT license, covering both the user's experience and the system's technical implementation.

User Experience Flow

Detailed Workflow Steps

1. Initial Authentication

Trigger: User runs coditect init for the first time

Steps:

  1. CODITECT opens browser to https://auth.coditect.ai/login
  2. User selects sign-in method:
    • Google OAuth
    • GitHub OAuth
    • Email/Password
  3. Identity Platform handles authentication
  4. Browser redirects to coditect://auth/callback?token=JWT_TOKEN
  5. CODITECT captures JWT token
  6. Token saved to ~/.coditect/credentials.json
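Steps 4 to 6 can be sketched as follows. This is a minimal illustration, not CODITECT's actual client code: the function names `extract_token` and `save_credentials` are hypothetical, and the sketch stores only the access token, whereas the real credentials file carries more fields.

```python
import json
import os
from pathlib import Path
from urllib.parse import parse_qs, urlsplit


def extract_token(callback_url: str) -> str:
    """Pull the `token` query parameter out of the callback URL."""
    parts = urlsplit(callback_url)
    if (parts.scheme, parts.netloc, parts.path) != ("coditect", "auth", "/callback"):
        raise ValueError(f"Unexpected callback URL: {callback_url}")
    tokens = parse_qs(parts.query).get("token")
    if not tokens:
        raise ValueError("Callback URL carries no token")
    return tokens[0]


def save_credentials(token: str, path: Path) -> None:
    """Write the token to `path`; a newly created file gets mode 0600."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Passing the mode to os.open avoids a window where the file is world-readable.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        json.dump({"access_token": token}, f)
```

A caller would pass the redirected URL to `extract_token` and then hand the result to `save_credentials` together with the path `~/.coditect/credentials.json` (expanded with `Path.home()`).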

Credential File Format:

{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "AEohG9x...",
  "expires_at": "2025-11-24T01:30:00Z",
  "user_email": "user@example.com",
  "tenant_id": "550e8400-e29b-41d4-a716-446655440000"
}
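Before reusing a stored token, a client would likely check `expires_at`. The sketch below shows one way to do that; the function name `is_token_expired` and the `leeway` parameter are assumptions made for illustration and do not appear in the original design.

```python
from datetime import datetime, timedelta, timezone


def is_token_expired(
    credentials: dict,
    now: datetime,
    leeway: timedelta = timedelta(seconds=60),
) -> bool:
    """Return True if the token has expired or will expire within `leeway`."""
    # Normalize the trailing "Z" so older Python versions can parse the timestamp.
    raw = credentials["expires_at"].replace("Z", "+00:00")
    expires_at = datetime.fromisoformat(raw)
    return now + leeway >= expires_at


creds = {"expires_at": "2025-11-24T01:30:00Z"}
half_hour_before = datetime(2025, 11, 24, 1, 0, tzinfo=timezone.utc)
needs_refresh = is_token_expired(creds, half_hour_before)
```

When the check returns True, the client would use `refresh_token` to obtain a new access token rather than sending the stale one.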

2. Hardware Fingerprinting

Purpose: Prevent license sharing across machines

Collected Data:

import platform
import uuid
import hashlib

def generate_hardware_fingerprint():
    components = [
        platform.node(),       # Hostname
        str(uuid.getnode()),   # MAC address (uuid.getnode() falls back to a random value if none is found)
        platform.processor(),  # CPU info
        platform.system(),     # OS
    ]

    # Hash to anonymize
    fingerprint = hashlib.sha256(
        "|".join(components).encode()
    ).hexdigest()

    return fingerprint[:32]  # First 32 hex chars

Example Output: a7f3b2c8d9e1f4a6b5c7d8e9f0a1b2c3

Privacy: The fingerprint is hashed client-side; raw values are never sent to the server.
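The hashing step can be tried in isolation with fixed inputs. The helper below, `fingerprint_from_components`, is a hypothetical variant that takes the component list as a parameter so the result is reproducible; the component values are made up.

```python
import hashlib


def fingerprint_from_components(components: list[str]) -> str:
    """Hash the joined components and keep the first 32 hex characters."""
    joined = "|".join(components)
    return hashlib.sha256(joined.encode()).hexdigest()[:32]


same_a = fingerprint_from_components(["host-a", "123456789", "arm", "Darwin"])
same_b = fingerprint_from_components(["host-a", "123456789", "arm", "Darwin"])
renamed = fingerprint_from_components(["host-b", "123456789", "arm", "Darwin"])
```

Identical inputs always give the same fingerprint, while changing any single component, such as the hostname, gives a different one. That second property matters in practice: renaming a machine would change its hardware ID.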

3. License Acquisition Request

HTTP Request:

POST /api/v1/licenses/acquire HTTP/1.1
Host: auth.coditect.ai
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
Content-Type: application/json

{
  "hardware_id": "a7f3b2c8d9e1f4a6b5c7d8e9f0a1b2c3",
  "client_version": "1.0.0",
  "platform": "darwin"
}
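On the client, the request above could be assembled roughly as follows. The function name `build_acquire_request` is hypothetical, and using `sys.platform` for the `platform` field is an assumption based on the `"darwin"` example value.

```python
import json
import sys

ACQUIRE_URL = "https://auth.coditect.ai/api/v1/licenses/acquire"


def build_acquire_request(
    access_token: str, hardware_id: str, client_version: str
) -> tuple[str, dict, bytes]:
    """Return the URL, headers, and JSON body for the acquire call."""
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    body = json.dumps({
        "hardware_id": hardware_id,
        "client_version": client_version,
        "platform": sys.platform,  # assumption: "darwin" on macOS, "linux" on Linux
    }).encode()
    return ACQUIRE_URL, headers, body
```

The three return values can be passed to any HTTP client as the target URL, the request headers, and the raw POST body.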

Server-Side Processing:

import base64
import hashlib
import json
import time
from datetime import datetime, timedelta

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# License, User, LicenseAcquireRequest, Redis, KMSClient, create_audit_log,
# ACQUIRE_SEAT_SCRIPT, KMS_KEY_NAME, and the get_* dependencies (including
# get_current_user, a name assumed here) come from the application.
router = APIRouter()

@router.post("/api/v1/licenses/acquire")
async def acquire_license(
    req: LicenseAcquireRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    redis: Redis = Depends(get_redis),
    kms_client: KMSClient = Depends(get_kms)
):
    # 1. Get tenant's license configuration
    result = await db.execute(
        select(License)
        .where(License.tenant_id == current_user.tenant_id)
        .where(License.expiry_date > datetime.utcnow())
    )
    license = result.scalar_one_or_none()

    if not license:
        raise HTTPException(status_code=404, detail="No active license found")

    # 2. Atomic seat acquisition (Redis Lua script)
    session_id = f"{current_user.id}:{req.hardware_id}:{int(time.time())}"

    # redis-py style call: eval(script, numkeys, *keys_and_args)
    acquired = await redis.eval(
        ACQUIRE_SEAT_SCRIPT,
        1,
        str(current_user.tenant_id),
        session_id,
        license.max_seats
    )

    if acquired == 0:
        # Get active users for error message
        active_sessions = await redis.smembers(
            f"tenant:{current_user.tenant_id}:active_sessions"
        )
        raise HTTPException(
            status_code=429,
            detail={
                "error": "All license seats in use",
                "max_seats": license.max_seats,
                "active_sessions": len(active_sessions)
            }
        )

    # 3. Build license payload
    payload = {
        "session_id": session_id,
        "user_id": str(current_user.id),
        "tenant_id": str(current_user.tenant_id),
        "tier": license.tier,
        "features": license.features,
        "valid_until": (datetime.utcnow() + timedelta(hours=24)).isoformat(),
        "issued_at": datetime.utcnow().isoformat()
    }

    # 4. Sign with Cloud KMS (the SHA-256 digest of the canonical JSON is signed)
    digest = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode()
    ).digest()

    sign_response = kms_client.asymmetric_sign(
        request={
            "name": KMS_KEY_NAME,
            "digest": {"sha256": digest}
        }
    )

    signature = base64.b64encode(sign_response.signature).decode()

    # 5. Log acquisition
    await create_audit_log(
        db,
        tenant_id=current_user.tenant_id,
        user_id=current_user.id,
        action="LICENSE_ACQUIRED",
        metadata={
            "session_id": session_id,
            "hardware_id": req.hardware_id
        }
    )

    return {
        "payload": payload,
        "signature": signature,
        "heartbeat_interval": 300  # 5 minutes
    }

Success Response:

{
  "payload": {
    "session_id": "user123:a7f3b2c8:1732406400",
    "user_id": "550e8400-e29b-41d4-a716-446655440000",
    "tenant_id": "660e8400-e29b-41d4-a716-446655440000",
    "tier": "PRO",
    "features": ["marketplace", "analytics", "collaboration"],
    "valid_until": "2025-11-25T01:30:00Z",
    "issued_at": "2025-11-24T01:30:00Z"
  },
  "signature": "MEUCIQDx7Y...",
  "heartbeat_interval": 300
}

4. Client-Side Signature Verification

Purpose: Ensure license wasn't tampered with

Implementation:

import base64
import hashlib
import json

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, utils

# Public key embedded in CODITECT client
PUBLIC_KEY_PEM = """
-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA...
-----END PUBLIC KEY-----
"""

def verify_license_signature(payload: dict, signature: str) -> bool:
    # Load public key
    public_key = serialization.load_pem_public_key(
        PUBLIC_KEY_PEM.encode()
    )

    # Reconstruct the digest exactly as the server computed it
    payload_json = json.dumps(payload, sort_keys=True)
    digest = hashlib.sha256(payload_json.encode()).digest()

    # Verify signature. The server signed the SHA-256 digest, so the input is
    # marked as prehashed; otherwise verify() would hash the digest a second time.
    try:
        public_key.verify(
            base64.b64decode(signature),
            digest,
            padding.PKCS1v15(),
            utils.Prehashed(hashes.SHA256())
        )
        return True
    except (InvalidSignature, ValueError):
        # ValueError also covers a signature that is not valid base64
        return False
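Verification only works if client and server serialize the payload identically before hashing. The sketch below isolates that step using the same `json.dumps(..., sort_keys=True)` call; the helper name `payload_digest` and the sample values (including the `ENTERPRISE` tier) are illustrative assumptions.

```python
import hashlib
import json


def payload_digest(payload: dict) -> bytes:
    """SHA-256 digest of the payload serialized with sorted keys."""
    canonical = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(canonical.encode()).digest()


original = {"tier": "PRO", "session_id": "user123:a7f3b2c8:1732406400"}
reordered = {"session_id": "user123:a7f3b2c8:1732406400", "tier": "PRO"}
tampered = {**original, "tier": "ENTERPRISE"}

order_independent = payload_digest(original) == payload_digest(reordered)
tamper_detected = payload_digest(original) != payload_digest(tampered)
```

Sorting the keys makes the digest independent of dictionary ordering, while any change to a value produces a different digest and therefore a signature mismatch. A client written in another language would need to reproduce Python's default separators and escaping byte for byte.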

5. Heartbeat Mechanism

Purpose: Keep session alive, detect zombie sessions

Client Implementation:

import asyncio
import logging

import aiohttp

class LicenseClient:
    def __init__(self):
        self.session_id = None
        self.token = None  # Access token loaded from ~/.coditect/credentials.json
        self.heartbeat_task = None

    async def start_heartbeat(self, interval: int = 300):
        """Send a heartbeat every `interval` seconds (default: 5 minutes)"""
        while True:
            try:
                await asyncio.sleep(interval)
                await self.send_heartbeat()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logging.error(f"Heartbeat failed: {e}")
                # Pause before the next cycle (fixed delay, capped at 60 seconds)
                await asyncio.sleep(min(interval * 2, 60))

    async def send_heartbeat(self):
        async with aiohttp.ClientSession() as session:
            async with session.put(
                "https://auth.coditect.ai/api/v1/licenses/heartbeat",
                json={"session_id": self.session_id},
                headers={"Authorization": f"Bearer {self.token}"}
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    logging.debug(f"Heartbeat OK: {data}")
                else:
                    logging.error(f"Heartbeat failed: {resp.status}")
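The retry comment in the client mentions exponential backoff. A true exponential schedule could look like the sketch below; the function name `backoff_delay` and the 5-second base are assumptions, while the 60-second cap matches the value used in the client code.

```python
def backoff_delay(failures: int, base: float = 5.0, cap: float = 60.0) -> float:
    """Delay before retry number `failures` (1-based): base, 2*base, 4*base, ... up to cap."""
    if failures < 1:
        raise ValueError("failures must be >= 1")
    return min(base * 2 ** (failures - 1), cap)


schedule = [backoff_delay(n) for n in range(1, 7)]
# schedule == [5.0, 10.0, 20.0, 40.0, 60.0, 60.0]
```

Starting with short delays gives a failed heartbeat several chances to succeed inside the 60 seconds of slack between the 300-second interval and the 360-second session TTL set by the server.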

Server Implementation:

from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status, serializers
from django_redis import get_redis_connection

class HeartbeatRequestSerializer(serializers.Serializer):
    session_id = serializers.CharField()

@api_view(['PUT'])
@permission_classes([IsAuthenticated])
def heartbeat(request):
    # Validate request data
    serializer = HeartbeatRequestSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    session_id = serializer.validated_data['session_id']

    # Get Redis connection
    redis_client = get_redis_connection()

    # Extend session TTL
    result = redis_client.expire(
        f"session:{session_id}",
        360  # 6 minutes
    )

    if result == 0:
        # Session doesn't exist
        return Response(
            {"detail": "Session not found"},
            status=status.HTTP_404_NOT_FOUND
        )

    return Response(
        {
            "status": "alive",
            "expires_in": 360
        },
        status=status.HTTP_200_OK
    )
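The interplay between the 300-second heartbeat and the 360-second TTL can be modeled without Redis. The class below is a toy stand-in driven by an explicit clock; `InMemorySessionStore` and its method names are hypothetical and exist only to illustrate the timing.

```python
class InMemorySessionStore:
    """Toy model of the Redis session keys, using caller-supplied timestamps."""

    def __init__(self):
        self._expires_at: dict[str, float] = {}

    def create(self, session_id: str, now: float, ttl: int = 360) -> None:
        self._expires_at[session_id] = now + ttl

    def is_alive(self, session_id: str, now: float) -> bool:
        deadline = self._expires_at.get(session_id)
        return deadline is not None and now < deadline

    def heartbeat(self, session_id: str, now: float, ttl: int = 360) -> bool:
        """Extend the TTL; return False if the session is missing or expired."""
        if not self.is_alive(session_id, now):
            self._expires_at.pop(session_id, None)
            return False
        self._expires_at[session_id] = now + ttl
        return True


store = InMemorySessionStore()
store.create("s1", now=0)
on_time = store.heartbeat("s1", now=300)   # arrives before the 360 s deadline
too_late = store.heartbeat("s1", now=700)  # new deadline was 660 s, so this fails
```

A heartbeat that arrives on schedule pushes the deadline out by another 360 seconds. A client that misses a full cycle finds its session gone, which corresponds to the 404 response in the endpoint.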

6. Zombie Session Cleanup

Scenario: User closes CODITECT without graceful shutdown (crash, power loss)

Redis Keyspace Notification Setup:

# Enable keyspace notifications for expired keys
redis-cli CONFIG SET notify-keyspace-events Ex

Listener Implementation:

import asyncio
import logging

import aioredis  # aioredis 2.x; the same API now ships in redis-py as redis.asyncio
from sqlalchemy import select

# db (an AsyncSession), User, and create_audit_log are assumed to come from the application.

async def listen_for_expired_sessions():
    redis = await aioredis.from_url("redis://localhost")
    pubsub = redis.pubsub()

    # Subscribe to expired key events
    await pubsub.psubscribe("__keyevent@0__:expired")

    async for message in pubsub.listen():
        if message["type"] == "pmessage":
            expired_key = message["data"].decode()

            if expired_key.startswith("session:"):
                # Split only once: the session ID itself contains colons
                session_id = expired_key.split(":", 1)[1]
                await handle_session_expiry(redis, session_id)

async def handle_session_expiry(redis, session_id: str):
    """Called when Redis expires a session key"""

    # Parse session_id to get the user ID
    # Format: user_id:hardware_id:timestamp
    parts = session_id.split(":")

    # Get tenant_id from database
    user_id = parts[0]
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()

    if user:
        # Decrement seat count
        await redis.decr(f"tenant:{user.tenant_id}:seat_count")

        # Remove from active sessions set
        await redis.srem(
            f"tenant:{user.tenant_id}:active_sessions",
            session_id
        )

        # Log cleanup
        await create_audit_log(
            db,
            tenant_id=user.tenant_id,
            user_id=user.id,
            action="SESSION_EXPIRED",
            metadata={"session_id": session_id, "cleanup": "automatic"}
        )

        logging.info(f"Cleaned up expired session: {session_id}")
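Because the session ID contains colons, recovering it from an expired key needs some care. The sketch below parses a key of the form `session:user_id:hardware_id:timestamp`; the names `SessionParts` and `parse_expired_key` are hypothetical, and the code assumes user IDs never contain a colon.

```python
from typing import NamedTuple, Optional


class SessionParts(NamedTuple):
    user_id: str
    hardware_id: str
    issued_at: int


def parse_expired_key(key: str) -> Optional[SessionParts]:
    """Parse 'session:<user_id>:<hardware_id>:<timestamp>'; return None for other keys."""
    prefix = "session:"
    if not key.startswith(prefix):
        return None
    # Strip only the prefix, keeping the colons inside the session ID.
    session_id = key[len(prefix):]
    parts = session_id.split(":")
    if len(parts) != 3 or not parts[2].isdigit():
        return None
    return SessionParts(parts[0], parts[1], int(parts[2]))


parsed = parse_expired_key("session:user123:a7f3b2c8:1732406400")
```

Keys that do not belong to sessions, such as `tenant:...:seat_count`, yield `None` and can be ignored by the listener.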

7. Graceful License Release

Trigger: User runs exit or presses Ctrl+C

Client Implementation:

import asyncio
import atexit
import logging
import signal
import sys

import aiohttp

class LicenseClient:
    def setup_cleanup_handlers(self):
        """Register cleanup on exit"""
        signal.signal(signal.SIGINT, self.handle_shutdown)
        signal.signal(signal.SIGTERM, self.handle_shutdown)
        # atexit callbacks are synchronous, so the coroutine is run via asyncio.run
        atexit.register(lambda: asyncio.run(self.release_license()))

    def handle_shutdown(self, signum, frame):
        """Handle Ctrl+C or kill signal"""
        logging.info("Shutting down gracefully...")
        # Assumes no event loop is currently running in this thread
        asyncio.run(self.release_license())
        sys.exit(0)

    async def release_license(self):
        """Explicit license release"""
        if not self.session_id:
            return  # Already released, or never acquired

        try:
            async with aiohttp.ClientSession() as session:
                async with session.delete(
                    "https://auth.coditect.ai/api/v1/licenses/release",
                    json={"session_id": self.session_id},
                    headers={"Authorization": f"Bearer {self.token}"}
                ) as resp:
                    if resp.status == 200:
                        logging.info("License released successfully")
        except Exception as e:
            logging.error(f"Failed to release license: {e}")
        finally:
            self.session_id = None

Server Implementation:

from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status, serializers
from django_redis import get_redis_connection
from apps.licenses.services.audit import create_audit_log

class LicenseReleaseRequestSerializer(serializers.Serializer):
    session_id = serializers.CharField()

@api_view(['DELETE'])
@permission_classes([IsAuthenticated])
def release_license(request):
    # Validate request data
    serializer = LicenseReleaseRequestSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    session_id = serializer.validated_data['session_id']
    current_user = request.user

    # Get Redis connection
    redis_client = get_redis_connection()

    # Remove session; delete() returns the number of keys removed
    removed = redis_client.delete(f"session:{session_id}")

    if removed:
        # Decrement seat count only if the session existed, so a repeated
        # release cannot free the same seat twice
        redis_client.decr(f"tenant:{current_user.tenant_id}:seat_count")

    # Remove from active sessions
    redis_client.srem(
        f"tenant:{current_user.tenant_id}:active_sessions",
        session_id
    )

    # Log release
    create_audit_log(
        tenant_id=current_user.tenant_id,
        user_id=current_user.id,
        action="LICENSE_RELEASED",
        metadata={"session_id": session_id, "graceful": True}
    )

    return Response(
        {"status": "released"},
        status=status.HTTP_200_OK
    )

Error Handling

1. License Expired

{
  "error": "License expired",
  "expiry_date": "2025-11-01T00:00:00Z",
  "renewal_url": "https://auth.coditect.ai/billing"
}

Client Action: Show renewal prompt, open browser to billing page

2. All Seats In Use

{
  "error": "All license seats in use",
  "max_seats": 5,
  "active_sessions": 5,
  "active_users": [
    {"email": "user1@example.com", "since": "2025-11-24T01:00:00Z"},
    {"email": "user2@example.com", "since": "2025-11-24T01:15:00Z"}
  ],
  "upgrade_url": "https://auth.coditect.ai/billing/upgrade"
}

Client Action: Show active users, offer upgrade option
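The two error bodies above could be turned into user-facing messages along these lines. This is a sketch only: the function name `describe_license_error` and the message wording are assumptions, and the field names are taken from the example responses.

```python
def describe_license_error(body: dict) -> str:
    """Map an error response body to a message the client can display."""
    error = body.get("error")
    if error == "License expired":
        return (
            f"Your license expired on {body['expiry_date']}. "
            f"Renew at {body['renewal_url']}"
        )
    if error == "All license seats in use":
        emails = ", ".join(u["email"] for u in body.get("active_users", []))
        return (
            f"All {body['max_seats']} seats are in use (active: {emails}). "
            f"Upgrade at {body['upgrade_url']}"
        )
    # Unknown errors are surfaced as-is rather than guessed at.
    return f"License request failed: {error}"


message = describe_license_error({
    "error": "License expired",
    "expiry_date": "2025-11-01T00:00:00Z",
    "renewal_url": "https://auth.coditect.ai/billing",
})
```

Keeping the mapping in one function makes it easy to add the browser-opening step for the renewal and upgrade URLs in a single place.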

3. Network Offline

Client Behavior:

  • Grace Period: Continue working for 24 hours (configurable)
  • Warning: Show notification "Working offline, license expires in X hours"
  • Expiry: After grace period, prompt to reconnect

Implementation:

from datetime import datetime, timedelta

class LicenseClient:
    def __init__(self):
        self.offline_grace_period = timedelta(hours=24)
        self.last_successful_validation = None

    def is_license_valid_offline(self) -> bool:
        """Check if cached license is still valid"""
        if not self.last_successful_validation:
            return False

        time_offline = datetime.utcnow() - self.last_successful_validation
        return time_offline < self.offline_grace_period
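The "license expires in X hours" warning needs the time remaining in the grace period. A minimal sketch, assuming the same 24-hour default; the function name `offline_hours_remaining` is hypothetical and the clock is passed in so the result is reproducible:

```python
from datetime import datetime, timedelta
from typing import Optional


def offline_hours_remaining(
    last_validation: Optional[datetime],
    now: datetime,
    grace: timedelta = timedelta(hours=24),
) -> float:
    """Hours left in the offline grace period, never below zero."""
    if last_validation is None:
        return 0.0
    remaining = grace - (now - last_validation)
    return max(remaining.total_seconds() / 3600, 0.0)


last_ok = datetime(2025, 11, 24, 1, 30)
hours_left = offline_hours_remaining(last_ok, last_ok + timedelta(hours=6))
warning = f"Working offline, license expires in {hours_left:.0f} hours"
```

Once the function returns 0.0, the grace period is over and the client should prompt the user to reconnect.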

Security Considerations

1. Token Signing

  • Algorithm: RSA-4096 with SHA-256
  • Key Storage: Cloud KMS (HSM-backed)
  • Rotation: Automatic every 90 days
  • Backward Compatibility: 30-day overlap for old keys

2. Hardware Fingerprinting

  • Privacy: Hashed client-side, never raw values
  • Stability: Uses stable components (CPU, MAC) not volatile (RAM)
  • Binding: Max 3 hardware IDs per user (for laptop + desktop + CI)
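The binding limit can be expressed as a small check. This is an illustrative sketch; the function name `can_bind_hardware` is an assumption, and how bound IDs are stored or retired is not specified in this document.

```python
def can_bind_hardware(bound_ids: set[str], hardware_id: str, limit: int = 3) -> bool:
    """A known ID is always accepted; a new one only while under the limit."""
    return hardware_id in bound_ids or len(bound_ids) < limit


bound = {"laptop-id", "desktop-id", "ci-id"}
known_ok = can_bind_hardware(bound, "laptop-id")
fourth_rejected = not can_bind_hardware(bound, "new-machine-id")
```

Checking membership first means a user at the limit can still acquire a license from any of their already-bound machines.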

3. Audit Logging

  • All actions logged: Acquisition, heartbeat, release, expiry
  • Retention: 90 days (compliance)
  • Anonymization: PII scrubbed after 30 days
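The two retention windows could be applied to a log entry as sketched below. The entry layout, the `PII_FIELDS` tuple, and the function name `apply_retention` are all assumptions made for illustration; the document does not define the audit log schema.

```python
from datetime import datetime, timedelta
from typing import Optional

# Hypothetical list of fields treated as personally identifiable.
PII_FIELDS = ("user_email", "hardware_id")


def apply_retention(entry: dict, now: datetime) -> Optional[dict]:
    """Drop entries older than 90 days; strip PII from entries older than 30 days."""
    age = now - entry["created_at"]
    if age > timedelta(days=90):
        return None
    if age > timedelta(days=30):
        return {k: v for k, v in entry.items() if k not in PII_FIELDS}
    return entry


now = datetime(2025, 11, 24)
entry = {
    "created_at": now - timedelta(days=45),
    "action": "LICENSE_ACQUIRED",
    "user_email": "user@example.com",
}
scrubbed = apply_retention(entry, now)
```

Here the 45-day-old entry keeps its action and timestamp but loses the email address. The original entry is left unmodified because the function returns a filtered copy.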

Performance Optimization

1. Redis Lua Scripts

  • Atomic operations prevent race conditions
  • Single network round-trip for seat check + increment
  • Performance: <1ms for seat acquisition
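The seat acquisition script itself is not shown in this document. The sketch below gives one plausible shape for it, together with a pure-Python model of the same logic. Everything here is an assumption: the constant `ACQUIRE_SEAT_LUA`, the ordering of `KEYS` and `ARGV`, and the choice to pass three keys explicitly, which differs from the single tenant key used in the server code above.

```python
# Hypothetical Lua script. KEYS[1] = seat counter, KEYS[2] = active session set,
# KEYS[3] = session key. ARGV[1] = session ID, ARGV[2] = max seats, ARGV[3] = TTL.
ACQUIRE_SEAT_LUA = """
local count = tonumber(redis.call('GET', KEYS[1]) or '0')
if count >= tonumber(ARGV[2]) then
    return 0
end
redis.call('INCR', KEYS[1])
redis.call('SADD', KEYS[2], ARGV[1])
redis.call('SET', KEYS[3], '1', 'EX', ARGV[3])
return 1
"""


def acquire_seat_model(tenant: dict, session_id: str, max_seats: int) -> int:
    """In-memory model of the script: check the counter, then increment and record."""
    if tenant.get("seat_count", 0) >= max_seats:
        return 0
    tenant["seat_count"] = tenant.get("seat_count", 0) + 1
    tenant.setdefault("active_sessions", set()).add(session_id)
    return 1


tenant_state: dict = {}
results = [acquire_seat_model(tenant_state, f"session-{i}", max_seats=2) for i in range(3)]
# results == [1, 1, 0]
```

Redis runs a Lua script as a single atomic unit, so the check and the increment cannot interleave with another client's request. The Python model has no such guarantee under concurrency and only illustrates the decision logic.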

2. Connection Pooling

  • Client: Reuse HTTP connections (aiohttp connection pool)
  • Server: PostgreSQL connection pooling (SQLAlchemy async pool)

3. Caching

  • License metadata cached in Redis for 5 minutes
  • Public key cached in client (never expires, rotated via client update)

Status: Infrastructure 100% Deployed ✅

Next: Implement Django REST Framework endpoints based on this spec

ETA: 1-2 weeks to working license system