Skip to main content

Graceful License Release - CODITECT Licensing Platform

Overview

Graceful license release occurs when a user explicitly exits CODITECT, allowing the session to be properly terminated and the license seat to be immediately reclaimed for other users. This is the preferred termination method compared to waiting for session expiry.

Sequence Diagram

Implementation

Client-Side (CODITECT CLI)

Signal Handlers for Graceful Shutdown:

import signal
import sys
import asyncio
from typing import Optional

class GracefulShutdown:
"""Handle graceful shutdown on SIGINT, SIGTERM."""

def __init__(self, license_client, heartbeat_manager):
self.license_client = license_client
self.heartbeat_manager = heartbeat_manager
self.shutting_down = False

def setup_handlers(self):
"""Register signal handlers."""
signal.signal(signal.SIGINT, self._handle_signal) # Ctrl+C
signal.signal(signal.SIGTERM, self._handle_signal) # Docker stop

# Windows compatibility
if sys.platform == 'win32':
signal.signal(signal.SIGBREAK, self._handle_signal)

def _handle_signal(self, signum, frame):
"""Handle shutdown signal."""
if self.shutting_down:
print("\nForce shutdown...")
sys.exit(1)

self.shutting_down = True
print("\nShutting down gracefully...")

# Run async shutdown
asyncio.create_task(self.shutdown())

def shutdown(self):
"""Perform graceful shutdown."""
try:
# 1. Stop heartbeat
print("Stopping heartbeat...")
await self.heartbeat_manager.stop()

# 2. Release license
print("Releasing license...")
await self.license_client.release_license()

print("Goodbye! 👋")
sys.exit(0)

except Exception as e:
print(f"Error during shutdown: {e}")
sys.exit(1)


# Usage in main application
def main():
# Initialize license
license_client = LicenseClient()
session_id = await license_client.acquire_license()

# Start heartbeat
heartbeat = HeartbeatManager(session_id)
await heartbeat.start()

# Setup graceful shutdown
shutdown_handler = GracefulShutdown(license_client, heartbeat)
shutdown_handler.setup_handlers()

# Run CODITECT
try:
await run_coditect()
finally:
# Ensure cleanup even if not via signal
await shutdown_handler.shutdown()

License Client SDK - Release Method:

class LicenseClient:
"""Client SDK for license operations."""

def release_license(self, max_retries: int = 3) -> bool:
"""
Release license and free up seat.

Returns:
True if released successfully
False if failed (session will auto-expire)
"""
if not self.session_id:
logging.warning("No active session to release")
return False

url = f"{self.api_url}/api/v1/licenses/release"

for attempt in range(1, max_retries + 1):
try:
async with aiohttp.ClientSession() as session:
async with session.delete(
url,
json={
"session_id": self.session_id,
"hardware_id": self.hardware_id
},
timeout=aiohttp.ClientTimeout(total=10)
) as resp:
if resp.status == 200:
data = await resp.json()
logging.info(
f"License released successfully "
f"(available seats: {data.get('available_seats', 'unknown')})"
)
self.session_id = None
return True

elif resp.status == 404:
# Session already expired
logging.info("Session was already expired")
self.session_id = None
return True

else:
error = await resp.text()
logging.error(f"Release failed: {resp.status} - {error}")

except asyncio.TimeoutError:
logging.warning(f"Release timeout (attempt {attempt}/{max_retries})")
if attempt < max_retries:
await asyncio.sleep(2 ** attempt) # Exponential backoff
continue

except aiohttp.ClientError as e:
logging.warning(f"Release network error: {e}")
if attempt < max_retries:
await asyncio.sleep(2 ** attempt)
continue

# All retries failed
logging.warning(
"Failed to release license - session will auto-expire in 6 minutes"
)
return False

Server-Side (License API)

Django REST Framework Endpoint:

import logging
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status, serializers
from django_redis import get_redis_connection
from django.db import connection
from django.utils import timezone

class LicenseReleaseRequestSerializer(serializers.Serializer):
session_id = serializers.CharField()
hardware_id = serializers.CharField(required=False)

@api_view(['DELETE'])
@permission_classes([IsAuthenticated])
def release_license(request):
"""
Gracefully release license and free up seat.

Request body:
{
"session_id": "abc123-session-id",
"hardware_id": "1a2b3c4d5e6f7890" // Optional verification
}

Response:
{
"status": "released",
"session_id": "abc123-session-id",
"tenant_id": "550e8400-...",
"available_seats": 4,
"timestamp": "2025-11-23T20:00:00Z"
}
"""
# Validate request data
serializer = LicenseReleaseRequestSerializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

session_id = serializer.validated_data['session_id']
hardware_id = serializer.validated_data.get('hardware_id')

# Get Redis connection
redis_client = get_redis_connection()

session_key = f"session:{session_id}"

# Check if session exists
if not redis_client.exists(session_key):
logging.warning(f"Release attempt for non-existent session: {session_id}")
return Response(
{"detail": "Session not found or already released"},
status=status.HTTP_404_NOT_FOUND
)

# Get session metadata
tenant_id = redis_client.hget(f"session_metadata:{session_id}", "tenant_id")

if not tenant_id:
logging.error(f"Session metadata missing for {session_id}")
return Response(
{"detail": "Session metadata corrupted"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)

tenant_id = tenant_id.decode('utf-8') if isinstance(tenant_id, bytes) else tenant_id

# Optional: Verify hardware fingerprint
if hardware_id:
stored_hw_id = redis_client.hget(f"session_metadata:{session_id}", "hardware_id")
if stored_hw_id and stored_hw_id.decode('utf-8') != hardware_id:
logging.warning(f"Hardware ID mismatch for session {session_id}")
return Response(
{"detail": "Hardware fingerprint mismatch"},
status=status.HTTP_403_FORBIDDEN
)

# Use Lua script for atomic release
release_result = redis_client.eval(
RELEASE_SEAT_SCRIPT,
1, # Number of keys
tenant_id, # KEYS[1]
session_id # ARGV[1]
)

if not release_result:
return Response(
{"detail": "Failed to release seat"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)

new_seat_count, available_seats = release_result

# Audit log
from apps.licenses.models import AuditLog
with connection.cursor() as cursor:
cursor.execute(
"""
INSERT INTO audit_logs (tenant_id, action, resource_type, resource_id, metadata, created_at)
VALUES (%s, %s, %s, %s, %s, %s)
""",
[
tenant_id,
"LICENSE_RELEASED",
"session",
session_id,
{
"session_id": session_id,
"new_seat_count": new_seat_count,
"available_seats": available_seats,
"timestamp": timezone.now().isoformat()
},
timezone.now()
]
)

# Metrics
from prometheus_client import Counter, Gauge
session_released_counter.inc()
active_sessions_gauge.dec()

logging.info(
f"License released: session={session_id}, "
f"tenant={tenant_id}, available={available_seats}"
)

return Response(
{
"status": "released",
"session_id": session_id,
"tenant_id": tenant_id,
"available_seats": available_seats,
"timestamp": timezone.now().isoformat()
},
status=status.HTTP_200_OK
)

Redis Lua Script for Atomic Release:

-- release_seat.lua
-- Atomically release a session and decrement seat count

local tenant_id = KEYS[1]
local session_id = ARGV[1]

-- Remove from active sessions set
local removed = redis.call('SREM', 'tenant:' .. tenant_id .. ':active_sessions', session_id)

if removed == 1 then
-- Decrement seat count
local new_count = redis.call('DECR', 'tenant:' .. tenant_id .. ':seat_count')

-- Delete session key
redis.call('DEL', 'session:' .. session_id)

-- Delete session metadata
redis.call('DEL', 'session_metadata:' .. session_id)

-- Calculate available seats
local max_seats = tonumber(redis.call('GET', 'tenant:' .. tenant_id .. ':max_seats') or '1')
local available = max_seats - new_count

return {new_count, available}
else
-- Session was not in active set (already released or expired)
return nil
end

Edge Cases

1. Double Release

Scenario: User exits twice rapidly (e.g., Ctrl+C twice)

Handling:

class LicenseClient:
def __init__(self):
self._releasing = False

def release_license(self):
if self._releasing:
logging.info("Release already in progress")
return True

self._releasing = True

try:
# ... release logic ...
finally:
self._releasing = False

2. Release During Heartbeat

Scenario: Release request sent while heartbeat is being sent

Handling:

class GracefulShutdown:
def shutdown(self):
# 1. Stop heartbeat first (prevents race)
await self.heartbeat_manager.stop()

# 2. Wait for any in-flight heartbeat to complete
await asyncio.sleep(0.5)

# 3. Now safe to release
await self.license_client.release_license()

3. Network Failure During Release

Scenario: Network down when trying to release

Client behavior:

  • Retry 3 times with exponential backoff
  • If all fail: Accept failure, session will auto-expire via TTL
  • Exit cleanly (don't block user)
def release_license(self, max_retries: int = 3):
for attempt in range(1, max_retries + 1):
try:
# ... attempt release ...
return True
except NetworkError:
if attempt < max_retries:
await asyncio.sleep(2 ** attempt)
continue

# All retries failed - non-fatal
logging.warning("Could not release license - will auto-expire")
return False # But still exit cleanly

4. Crash During Release

Scenario: Client crashes while sending release request

Result:

  • Release request may or may not reach server
  • If reached: Session deleted immediately
  • If not reached: Session expires via TTL in 6 minutes
  • Either way: Seat reclaimed

No data loss or seat leak possible.

Comparison: Graceful vs Auto-Expiry

AspectGraceful ReleaseAuto-Expiry (Zombie Cleanup)
Seat Reclaim TimeImmediate (<1 second)6 minutes (TTL duration)
Server LoadMinimal (1 DELETE request)Higher (keyspace notifications)
User Experience"License released" messageSilent (user unaware)
Network Required?YesNo (works offline)
Audit Trail"LICENSE_RELEASED""SESSION_EXPIRED"
Preferred?✅ Yes (instant reclaim)Fallback only

Recommendation: Always attempt graceful release, fall back to auto-expiry.

Monitoring

Prometheus Metrics

from prometheus_client import Counter, Gauge, Histogram

# Release counter
session_released = Counter(
'license_sessions_released_total',
'Total sessions gracefully released',
['result'] # success, failed, not_found
)

# Release latency
release_latency = Histogram(
'license_release_latency_seconds',
'Time to process license release'
)

# Active sessions gauge
active_sessions = Gauge(
'license_active_sessions',
'Current number of active license sessions',
['tenant_id']
)

# Usage
with release_latency.time():
result = await release_license(req, db, redis)
session_released.labels(result='success').inc()
active_sessions.labels(tenant_id=tenant_id).dec()

Grafana Dashboard

panels:
- title: "License Releases (Last Hour)"
metric: rate(license_sessions_released_total{result="success"}[5m])

- title: "Release Success Rate"
metric: |
sum(rate(license_sessions_released_total{result="success"}[5m]))
/
sum(rate(license_sessions_released_total[5m]))

- title: "Release Latency (p95)"
metric: histogram_quantile(0.95, license_release_latency_seconds)

Testing

Unit Tests

@pytest.mark.asyncio
def test_release_license_success(redis_mock, db):
"""Test successful license release."""
# Arrange
session_id = "test-session-123"
tenant_id = "test-tenant-456"

redis_mock.exists.return_value = True
redis_mock.hget.return_value = tenant_id.encode()
redis_mock.eval.return_value = [4, 1] # new_count, available

req = LicenseReleaseRequest(session_id=session_id)

# Act
response = await release_license(req, db, redis_mock)

# Assert
assert response["status"] == "released"
assert response["available_seats"] == 1
redis_mock.eval.assert_called_once()


@pytest.mark.asyncio
def test_release_nonexistent_session(redis_mock, db):
"""Test releasing non-existent session returns 404."""
# Arrange
redis_mock.exists.return_value = False

req = LicenseReleaseRequest(session_id="nonexistent")

# Act & Assert
with pytest.raises(Response(status=status.HTTP_400_BAD_REQUEST)) as exc_info:
await release_license(req, db, redis_mock)

assert exc_info.value.status_code == 404

Integration Tests

@pytest.mark.integration
def test_graceful_shutdown_releases_license(client, real_redis):
"""Test that graceful shutdown releases license."""
# 1. Acquire license
session_id = await acquire_license(client)

# 2. Verify session exists
assert real_redis.exists(f"session:{session_id}") == 1

# 3. Release license
response = await client.delete(
"/api/v1/licenses/release",
json={"session_id": session_id}
)
assert response.status_code == 200

# 4. Verify session deleted
assert real_redis.exists(f"session:{session_id}") == 0

# 5. Verify seat count decremented
seat_count = real_redis.get("tenant:test-tenant:seat_count")
assert seat_count == 0

Status: Specification Complete ✅ Implementation: Pending (Phase 2) Dependencies: License API, Redis Lua scripts ETA: 2 hours


Last Updated: November 23, 2025 Owner: Backend Team Reviewed By: QA Team