License Acquisition Workflow
Overview
This document describes the complete workflow for acquiring, validating, and maintaining a CODITECT license from the user's perspective and the system's technical implementation.
User Experience Flow
Detailed Workflow Steps
1. Initial Authentication
Trigger: User runs coditect init for the first time
Steps:
- CODITECT opens browser to
https://auth.coditect.ai/login - User selects sign-in method:
- Google OAuth
- GitHub OAuth
- Email/Password
- Identity Platform handles authentication
- Browser redirects to
coditect://auth/callback?token=JWT_TOKEN - CODITECT captures JWT token
- Token saved to
~/.coditect/credentials.json
Credential File Format:
{
"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "AEohG9x...",
"expires_at": "2025-11-24T01:30:00Z",
"user_email": "user@example.com",
"tenant_id": "550e8400-e29b-41d4-a716-446655440000"
}
2. Hardware Fingerprinting
Purpose: Prevent license sharing across machines
Collected Data:
import platform
import uuid
import hashlib
def generate_hardware_fingerprint():
components = [
platform.node(), # Hostname
str(uuid.getnode()), # MAC address
platform.processor(), # CPU info
platform.system(), # OS
]
# Hash to anonymize
fingerprint = hashlib.sha256(
"|".join(components).encode()
).hexdigest()
return fingerprint[:32] # First 32 chars
Example Output: a7f3b2c8d9e1f4a6b5c7d8e9f0a1b2c3
Privacy: Hashed on client-side, raw values never sent to server
3. License Acquisition Request
HTTP Request:
POST /api/v1/licenses/acquire HTTP/1.1
Host: auth.coditect.ai
Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...
Content-Type: application/json
{
"hardware_id": "a7f3b2c8d9e1f4a6b5c7d8e9f0a1b2c3",
"client_version": "1.0.0",
"platform": "darwin"
}
Server-Side Processing:
@action(detail=False, methods='post')("/api/v1/licenses/acquire")
def acquire_license(
req: LicenseAcquireRequest,
current_user: User = request.user,
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
kms_client: KMSClient = Depends(get_kms)
):
# 1. Get tenant's license configuration
license = await db.execute(
select(License)
.where(License.tenant_id == current_user.tenant_id)
.where(License.expiry_date > datetime.utcnow())
)
license = license.scalar_one_or_none()
if not license:
raise Response(status=status.HTTP_400_BAD_REQUEST)(404, "No active license found")
# 2. Atomic seat acquisition (Redis Lua script)
session_id = f"{current_user.id}:{req.hardware_id}:{int(time.time())}"
acquired = await redis.eval(
ACQUIRE_SEAT_SCRIPT,
keys=[current_user.tenant_id],
args=[session_id, license.max_seats]
)
if acquired == 0:
# Get active users for error message
active_sessions = await redis.smembers(
f"tenant:{current_user.tenant_id}:active_sessions"
)
raise Response(status=status.HTTP_400_BAD_REQUEST)(
429,
detail={
"error": "All license seats in use",
"max_seats": license.max_seats,
"active_sessions": len(active_sessions)
}
)
# 3. Build license payload
payload = {
"session_id": session_id,
"user_id": str(current_user.id),
"tenant_id": str(current_user.tenant_id),
"tier": license.tier,
"features": license.features,
"valid_until": (datetime.utcnow() + timedelta(hours=24)).isoformat(),
"issued_at": datetime.utcnow().isoformat()
}
# 4. Sign with Cloud KMS
digest = hashlib.sha256(
json.dumps(payload, sort_keys=True).encode()
).digest()
sign_response = kms_client.asymmetric_sign(
request={
"name": KMS_KEY_NAME,
"digest": {"sha256": digest}
}
)
signature = base64.b64encode(sign_response.signature).decode()
# 5. Log acquisition
await create_audit_log(
db,
tenant_id=current_user.tenant_id,
user_id=current_user.id,
action="LICENSE_ACQUIRED",
metadata={
"session_id": session_id,
"hardware_id": req.hardware_id
}
)
return {
"payload": payload,
"signature": signature,
"heartbeat_interval": 300 # 5 minutes
}
Success Response:
{
"payload": {
"session_id": "user123:a7f3b2c8:1732406400",
"user_id": "550e8400-e29b-41d4-a716-446655440000",
"tenant_id": "660e8400-e29b-41d4-a716-446655440000",
"tier": "PRO",
"features": ["marketplace", "analytics", "collaboration"],
"valid_until": "2025-11-25T01:30:00Z",
"issued_at": "2025-11-24T01:30:00Z"
},
"signature": "MEUCIQDx7Y...",
"heartbeat_interval": 300
}
4. Client-Side Signature Verification
Purpose: Ensure license wasn't tampered with
Implementation:
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
import base64
# Public key embedded in CODITECT client
PUBLIC_KEY_PEM = """
-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEA...
-----END PUBLIC KEY-----
"""
def verify_license_signature(payload: dict, signature: str) -> bool:
# Load public key
public_key = serialization.load_pem_public_key(
PUBLIC_KEY_PEM.encode()
)
# Reconstruct digest
payload_json = json.dumps(payload, sort_keys=True)
digest = hashlib.sha256(payload_json.encode()).digest()
# Verify signature
try:
public_key.verify(
base64.b64decode(signature),
digest,
padding.PKCS1v15(),
hashes.SHA256()
)
return True
except Exception:
return False
5. Heartbeat Mechanism
Purpose: Keep session alive, detect zombie sessions
Client Implementation:
import asyncio
import aiohttp
class LicenseClient:
def __init__(self):
self.session_id = None
self.heartbeat_task = None
def start_heartbeat(self, interval: int = 300):
"""Send heartbeat every 5 minutes"""
while True:
try:
await asyncio.sleep(interval)
await self.send_heartbeat()
except asyncio.CancelledError:
break
except Exception as e:
logging.error(f"Heartbeat failed: {e}")
# Retry with exponential backoff
await asyncio.sleep(min(interval * 2, 60))
def send_heartbeat(self):
async with aiohttp.ClientSession() as session:
async with session.put(
f"https://auth.coditect.ai/api/v1/licenses/heartbeat",
json={"session_id": self.session_id},
headers={"Authorization": f"Bearer {self.token}"}
) as resp:
if resp.status == 200:
data = await resp.json()
logging.debug(f"Heartbeat OK: {data}")
else:
logging.error(f"Heartbeat failed: {resp.status}")
Server Implementation:
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status, serializers
from django_redis import get_redis_connection
class HeartbeatRequestSerializer(serializers.Serializer):
session_id = serializers.CharField()
@api_view(['PUT'])
@permission_classes([IsAuthenticated])
def heartbeat(request):
# Validate request data
serializer = HeartbeatRequestSerializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
session_id = serializer.validated_data['session_id']
# Get Redis connection
redis_client = get_redis_connection()
# Extend session TTL
result = redis_client.expire(
f"session:{session_id}",
360 # 6 minutes
)
if result == 0:
# Session doesn't exist
return Response(
{"detail": "Session not found"},
status=status.HTTP_404_NOT_FOUND
)
return Response(
{
"status": "alive",
"expires_in": 360
},
status=status.HTTP_200_OK
)
6. Zombie Session Cleanup
Scenario: User closes CODITECT without graceful shutdown (crash, power loss)
Redis Keyspace Notification Setup:
# Enable keyspace notifications for expired keys
redis-cli CONFIG SET notify-keyspace-events Ex
Listener Implementation:
import asyncio
import aioredis
def listen_for_expired_sessions():
redis = await aioredis.from_url("redis://localhost")
pubsub = redis.pubsub()
# Subscribe to expired key events
await pubsub.psubscribe("__keyevent@0__:expired")
async for message in pubsub.listen():
if message["type"] == "pmessage":
expired_key = message["data"].decode()
if expired_key.startswith("session:"):
session_id = expired_key.split(":")[1]
await handle_session_expiry(session_id)
def handle_session_expiry(session_id: str):
"""Called when Redis expires a session key"""
# Parse session_id to get tenant_id
# Format: user_id:hardware_id:timestamp
parts = session_id.split(":")
# Get tenant_id from database
user_id = parts[0]
user = await db.query(User).filter(id=user_id).first()
if user:
# Decrement seat count
await redis.decr(f"tenant:{user.tenant_id}:seat_count")
# Remove from active sessions set
await redis.srem(
f"tenant:{user.tenant_id}:active_sessions",
session_id
)
# Log cleanup
await create_audit_log(
db,
tenant_id=user.tenant_id,
user_id=user.id,
action="SESSION_EXPIRED",
metadata={"session_id": session_id, "cleanup": "automatic"}
)
logging.info(f"Cleaned up expired session: {session_id}")
7. Graceful License Release
Trigger: User runs exit or presses Ctrl+C
Client Implementation:
import signal
import sys
class LicenseClient:
def setup_cleanup_handlers(self):
"""Register cleanup on exit"""
signal.signal(signal.SIGINT, self.handle_shutdown)
signal.signal(signal.SIGTERM, self.handle_shutdown)
atexit.register(self.release_license)
def handle_shutdown(self, signum, frame):
"""Handle Ctrl+C or kill signal"""
logging.info("Shutting down gracefully...")
asyncio.run(self.release_license())
sys.exit(0)
def release_license(self):
"""Explicit license release"""
if not self.session_id:
return
try:
async with aiohttp.ClientSession() as session:
async with session.delete(
f"https://auth.coditect.ai/api/v1/licenses/release",
json={"session_id": self.session_id},
headers={"Authorization": f"Bearer {self.token}"}
) as resp:
if resp.status == 200:
logging.info("License released successfully")
except Exception as e:
logging.error(f"Failed to release license: {e}")
finally:
self.session_id = None
Server Implementation:
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status, serializers
from django_redis import get_redis_connection
from apps.licenses.services.audit import create_audit_log
class LicenseReleaseRequestSerializer(serializers.Serializer):
session_id = serializers.CharField()
@api_view(['DELETE'])
@permission_classes([IsAuthenticated])
def release_license(request):
# Validate request data
serializer = LicenseReleaseRequestSerializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
session_id = serializer.validated_data['session_id']
current_user = request.user
# Get Redis connection
redis_client = get_redis_connection()
# Remove session
redis_client.delete(f"session:{session_id}")
# Decrement seat count
redis_client.decr(f"tenant:{current_user.tenant_id}:seat_count")
# Remove from active sessions
redis_client.srem(
f"tenant:{current_user.tenant_id}:active_sessions",
session_id
)
# Log release
create_audit_log(
tenant_id=current_user.tenant_id,
user_id=current_user.id,
action="LICENSE_RELEASED",
metadata={"session_id": session_id, "graceful": True}
)
return Response(
{"status": "released"},
status=status.HTTP_200_OK
)
Error Handling
1. License Expired
{
"error": "License expired",
"expiry_date": "2025-11-01T00:00:00Z",
"renewal_url": "https://auth.coditect.ai/billing"
}
Client Action: Show renewal prompt, open browser to billing page
2. All Seats In Use
{
"error": "All license seats in use",
"max_seats": 5,
"active_sessions": 5,
"active_users": [
{"email": "user1@example.com", "since": "2025-11-24T01:00:00Z"},
{"email": "user2@example.com", "since": "2025-11-24T01:15:00Z"}
],
"upgrade_url": "https://auth.coditect.ai/billing/upgrade"
}
Client Action: Show active users, offer upgrade option
3. Network Offline
Client Behavior:
- Grace Period: Continue working for 24 hours (configurable)
- Warning: Show notification "Working offline, license expires in X hours"
- Expiry: After grace period, prompt to reconnect
Implementation:
class LicenseClient:
def __init__(self):
self.offline_grace_period = timedelta(hours=24)
self.last_successful_validation = None
def is_license_valid_offline(self) -> bool:
"""Check if cached license is still valid"""
if not self.last_successful_validation:
return False
time_offline = datetime.utcnow() - self.last_successful_validation
return time_offline < self.offline_grace_period
Security Considerations
1. Token Signing
- Algorithm: RSA-4096 with SHA-256
- Key Storage: Cloud KMS (HSM-backed)
- Rotation: Automatic every 90 days
- Backward Compatibility: 30-day overlap for old keys
2. Hardware Fingerprinting
- Privacy: Hashed client-side, never raw values
- Stability: Uses stable components (CPU, MAC) not volatile (RAM)
- Binding: Max 3 hardware IDs per user (for laptop + desktop + CI)
3. Audit Logging
- All actions logged: Acquisition, heartbeat, release, expiry
- Retention: 90 days (compliance)
- Anonymization: PII scrubbed after 30 days
Performance Optimization
1. Redis Lua Scripts
- Atomic operations prevent race conditions
- Single network round-trip for seat check + increment
- Performance: <1ms for seat acquisition
2. Connection Pooling
- Client: Reuse HTTP connections (aiohttp connection pool)
- Server: PostgreSQL connection pooling (SQLAlchemy async pool)
3. Caching
- License metadata cached in Redis for 5 minutes
- Public key cached in client (never expires, rotated via client update)
Status: Infrastructure 100% Deployed ✅ Next: Implement Django REST Framework endpoints based on this spec ETA: 1-2 weeks to working license system