Zombie Session Cleanup - CODITECT Licensing Platform
Overview
Zombie sessions occur when CODITECT clients crash, lose network connectivity, or terminate abnormally without sending a graceful license release. The cleanup mechanism automatically reclaims seats after session TTL expiry using Redis keyspace notifications.
Sequence Diagram
Redis Keyspace Notifications
Configuration
Enable keyspace notifications in Redis:
# In redis.conf
notify-keyspace-events "Ex"
# Or via CLI
redis-cli CONFIG SET notify-keyspace-events Ex
What this enables:
- `E` — Keyevent events (notifications published to the `__keyevent@<db>__:<event>` channel)
- `x` — Expired events
Subscription Pattern
Subscribe to expired events:
import redis

# Connect to the Redis instance that emits keyspace notifications.
client = redis.Redis(host='redis-host', port=6379, db=0)
listener = client.pubsub()
# Subscribe to expired events on database 0
listener.psubscribe('__keyevent@0__:expired')

for message in listener.listen():
    # Pattern subscriptions deliver their payloads as 'pmessage'.
    if message['type'] != 'pmessage':
        continue
    expired_key = message['data'].decode('utf-8')
    # Only session keys are relevant to seat reclamation.
    if expired_key.startswith('session:'):
        handle_session_expiry(expired_key)
Implementation
Cleanup Worker (Python)
import asyncio
import logging
from datetime import datetime, timezone

import redis.asyncio as redis
from sqlalchemy.ext.asyncio import AsyncSession
class ZombieSessionCleanup:
    """Background worker that reclaims seats when session keys expire.

    Listens on Redis keyspace notifications (pattern
    ``__keyevent@<db>__:expired``) and, for each expired ``session:<id>``
    key, removes the session from its tenant's active set, decrements the
    tenant's seat count, and writes an audit-log row.
    """

    def __init__(
        self,
        redis_url: str,
        db: AsyncSession,
        pattern: str = '__keyevent@0__:expired'
    ):
        """
        Args:
            redis_url: Connection URL of the Redis instance emitting events.
            db: Async SQLAlchemy session used for audit logging.
            pattern: Pub/sub pattern for expired-key events (db 0 by default).
        """
        self.redis_url = redis_url
        self.db = db
        self.pattern = pattern
        self.running = False

    async def start(self):
        """Connect, subscribe to expired-key events, and run the listen loop."""
        # redis.asyncio.from_url() returns a client synchronously; the
        # original `await` on it was a bug (the client is not awaitable).
        self.redis = redis.from_url(self.redis_url)
        self.pubsub = self.redis.pubsub()
        await self.pubsub.psubscribe(self.pattern)
        self.running = True
        logging.info(f"Zombie session cleanup started (pattern: {self.pattern})")
        await self._listen_loop()

    async def stop(self):
        """Stop the cleanup worker and release Redis resources."""
        self.running = False
        # psubscribe() must be paired with punsubscribe(); plain
        # unsubscribe() does not remove pattern subscriptions.
        await self.pubsub.punsubscribe(self.pattern)
        await self.redis.close()
        logging.info("Zombie session cleanup stopped")

    async def _listen_loop(self):
        """Poll for pattern messages until stop() clears the running flag."""
        while self.running:
            try:
                message = await self.pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=1.0
                )
                if message and message['type'] == 'pmessage':
                    expired_key = message['data'].decode('utf-8')
                    await self._handle_expired_key(expired_key)
            except Exception as e:
                logging.error(f"Error in cleanup loop: {e}", exc_info=True)
                await asyncio.sleep(1)  # Brief pause before retry

    async def _handle_expired_key(self, key: str):
        """Reclaim the seat held by an expired ``session:<id>`` key."""
        # Only process session keys; other expirations are not ours.
        if not key.startswith('session:'):
            return
        session_id = key.replace('session:', '')
        logging.info(f"Processing expired session: {session_id}")
        try:
            # The session key itself is already gone, so the owning tenant
            # must be located by scanning the tenant active_sessions sets.
            tenant_id = await self._find_tenant_for_session(session_id)
            if not tenant_id:
                logging.warning(f"No tenant found for expired session: {session_id}")
                return
            # Remove from tenant's active sessions set.
            removed = await self.redis.srem(
                f"tenant:{tenant_id}:active_sessions",
                session_id
            )
            if removed:
                # Decrement seat count only when the member was actually
                # removed, so duplicate events cannot double-decrement.
                new_count = await self.redis.decr(f"tenant:{tenant_id}:seat_count")
                logging.info(
                    f"Seat reclaimed for tenant {tenant_id} "
                    f"(session: {session_id}, new count: {new_count})"
                )
                # Audit log (best effort relative to the Redis cleanup).
                await self._log_session_expiry(tenant_id, session_id)
                # Metrics: the original imported Counter locally and then
                # incremented an undefined name (NameError). The counter must
                # be created once at module level (re-registering raises), so
                # increment defensively here.
                try:
                    session_expired_counter.labels(reason='timeout').inc()
                except NameError:
                    logging.debug("session_expired_counter not defined; metric skipped")
            else:
                logging.warning(
                    f"Session {session_id} not found in tenant {tenant_id} active set"
                )
        except Exception as e:
            logging.error(f"Error handling expired session {session_id}: {e}", exc_info=True)

    async def _find_tenant_for_session(self, session_id: str) -> "str | None":
        """Return the tenant_id owning *session_id*, or None if not found.

        O(tenants) SCAN + SISMEMBER. Acceptable at moderate tenant counts;
        a reverse index (session_id -> tenant_id) would make this O(1).
        """
        # Pattern: tenant:*:active_sessions
        cursor = 0
        while True:
            cursor, keys = await self.redis.scan(
                cursor=cursor,
                match="tenant:*:active_sessions",
                count=100
            )
            for raw_key in keys:
                key_str = raw_key.decode('utf-8') if isinstance(raw_key, bytes) else raw_key
                # Check if session_id is in this set.
                if await self.redis.sismember(key_str, session_id):
                    # Key format: tenant:{tenant_id}:active_sessions
                    return key_str.split(':')[1]
            if cursor == 0:
                break
        return None

    async def _log_session_expiry(self, tenant_id: str, session_id: str):
        """Persist a SESSION_EXPIRED audit row for the reclaimed session."""
        from models import AuditLog
        audit = AuditLog(
            tenant_id=tenant_id,
            action="SESSION_EXPIRED",
            resource_type="session",
            resource_id=session_id,
            metadata={
                "session_id": session_id,
                "reason": "ttl_expired",
                # Timezone-aware UTC; datetime.utcnow() is deprecated.
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
        )
        self.db.add(audit)
        await self.db.commit()
# Start worker on application startup (Django AppConfig)
# apps/licenses/apps.py
from django.apps import AppConfig
from django.conf import settings
import threading
class LicensesConfig(AppConfig):
    """Django app config that boots the zombie-session cleanup worker."""
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'apps.licenses'
    # Shared handles so shutdown hooks can reach the running worker.
    cleanup_worker = None
    cleanup_thread = None

    def ready(self):
        """Initialize cleanup worker on application startup."""
        if settings.ENABLE_ZOMBIE_CLEANUP:
            from apps.licenses.services.zombie_cleanup import ZombieSessionCleanup
            # NOTE(review): ZombieSessionCleanup.__init__ also takes a
            # required db session — confirm how the worker obtains one.
            self.cleanup_worker = ZombieSessionCleanup(
                redis_url=settings.REDIS_URL
            )

            def _run_worker():
                # start() is a coroutine. The original passed it directly as
                # the Thread target, which only creates a coroutine object
                # and never executes it — drive it with an event loop owned
                # by this background thread instead.
                import asyncio
                asyncio.run(self.cleanup_worker.start())

            # Daemon thread: must not block interpreter shutdown.
            self.cleanup_thread = threading.Thread(
                target=_run_worker,
                daemon=True
            )
            self.cleanup_thread.start()
# Shutdown handling via Django signals
# apps/licenses/signals.py
from django.core.signals import request_finished
from django.dispatch import receiver
from django.apps import apps
import atexit


def cleanup_on_shutdown(sender=None, **kwargs):
    """Stop the cleanup worker exactly once, at process shutdown.

    NOTE(review): this was previously wired to Django's request_finished
    signal, which fires after EVERY request — it would have stopped the
    worker right after the first request was served. atexit runs once at
    interpreter exit. stop() is a coroutine, so it must be driven by an
    event loop rather than called bare (the original call just created an
    un-awaited coroutine).
    """
    config = apps.get_app_config('licenses')
    if config.cleanup_worker:
        import asyncio
        asyncio.run(config.cleanup_worker.stop())


atexit.register(cleanup_on_shutdown)
Alternative: Lua Script for Atomic Cleanup
More efficient approach using Redis Lua script:
-- cleanup_expired_session.lua
-- Called when a session key expires. KEYS[1] is the full Redis key,
-- e.g. "session:abc123", but the tenant active_sessions sets store the
-- BARE session id ("abc123"), so the prefix must be stripped before the
-- SISMEMBER lookup — the original compared the full key and never matched.
local session_id = string.match(KEYS[1], "^session:(.+)$")
if not session_id then
  return nil
end

-- Find the owning tenant by scanning the per-tenant active_sessions sets.
-- SCAN returns the next cursor as a string, so track it as a string.
local cursor = "0"
local found_tenant = nil
repeat
  local result = redis.call('SCAN', cursor, 'MATCH', 'tenant:*:active_sessions', 'COUNT', 100)
  cursor = result[1]
  for _, key in ipairs(result[2]) do
    if redis.call('SISMEMBER', key, session_id) == 1 then
      -- Extract tenant_id from key (format: tenant:{id}:active_sessions)
      found_tenant = string.match(key, "tenant:([^:]+):active_sessions")
      break
    end
  end
until cursor == "0" or found_tenant

if found_tenant then
  -- Remove from active sessions; only decrement the seat count when the
  -- member was actually removed, so duplicate invocations cannot
  -- double-decrement.
  local removed = redis.call('SREM', 'tenant:' .. found_tenant .. ':active_sessions', session_id)
  if removed == 1 then
    local new_count = redis.call('DECR', 'tenant:' .. found_tenant .. ':seat_count')
    return {found_tenant, new_count}
  end
end
return nil
Load and use script:
# Load script
with open('cleanup_expired_session.lua', 'r') as f:
cleanup_script = f.read()
script_sha = redis.script_load(cleanup_script)
# On keyspace notification
async def handle_expired_key(key: str):
if key.startswith('session:'):
result = await redis.evalsha(
script_sha,
1, # Number of keys
key # KEYS[1]
)
if result:
tenant_id, new_count = result
logging.info(f"Cleaned session {key}: tenant={tenant_id}, seats={new_count}")
Edge Cases
1. Session Expires During Heartbeat
Scenario:
- Client sends heartbeat at T=359s (1 second before expiry)
- Network delay causes heartbeat to arrive at T=361s
- Session already expired and cleaned up
Handling:
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from django_redis import get_redis_connection
@api_view(['PUT'])
@permission_classes([IsAuthenticated])
def heartbeat(request):
    """Extend session TTL via heartbeat (handles expired sessions gracefully).

    Returns 200 when the TTL was refreshed, 400 when no session_id was
    supplied, and 404 when the session has already expired (the client
    should reacquire a license).
    """
    # Get session_id from request; reject missing input explicitly instead
    # of letting it fall through to a misleading 404.
    session_id = request.data.get('session_id')
    if not session_id:
        return Response(
            {"detail": "session_id is required"},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Get Redis connection
    redis_client = get_redis_connection()

    # EXPIRE returns falsy when the key does not exist, so one call both
    # refreshes the TTL and detects expiry. The original EXISTS-then-EXPIRE
    # pair had a race: the key could expire between the two commands,
    # silently returning 200 without extending anything.
    if not redis_client.expire(f"session:{session_id}", 360):
        # Session expired - return 404
        return Response(
            {"detail": "Session expired"},
            status=status.HTTP_404_NOT_FOUND
        )
    return Response(
        {"status": "alive"},
        status=status.HTTP_200_OK
    )
Client recovery:
async def _send_heartbeat(self):
    """Ping the server to keep the session alive; reacquire on expiry."""
    response = await self.api.heartbeat(self.session_id)
    if response.status != 404:
        return
    logging.warning("Session expired - reacquiring license")
    await self._reacquire_license()
2. Rapid Session Churn
Scenario: Many sessions expiring simultaneously (e.g., datacenter outage)
Problem: Cleanup worker overwhelmed
Solution:
import asyncio
from asyncio import Queue
# NOTE: illustrative sketch — `...` marks code elided from the base
# implementation earlier in this document.
class ZombieSessionCleanup:
    def __init__(self, ...):
        # ...
        # Bounded queue applies backpressure when many sessions expire at
        # once (e.g. a datacenter outage) so the listener cannot grow
        # memory without limit.
        self.cleanup_queue = Queue(maxsize=1000)
        # Number of concurrent consumer tasks draining the queue.
        self.worker_count = 5

    async def start(self):
        # Start multiple worker tasks
        self.workers = [
            asyncio.create_task(self._cleanup_worker(i))
            for i in range(self.worker_count)
        ]
        # Listen for expired events
        await self._listen_loop()

    async def _listen_loop(self):
        # Producer side: enqueue expired keys as fast as they arrive.
        while self.running:
            message = await self.pubsub.get_message(...)
            if message:
                expired_key = message['data'].decode('utf-8')
                # Add to queue instead of processing directly
                await self.cleanup_queue.put(expired_key)

    async def _cleanup_worker(self, worker_id: int):
        """Worker task that processes cleanup queue."""
        while self.running:
            try:
                # The timeout lets each worker re-check self.running
                # periodically instead of blocking forever on an empty queue.
                key = await asyncio.wait_for(
                    self.cleanup_queue.get(),
                    timeout=1.0
                )
                await self._handle_expired_key(key)
            except asyncio.TimeoutError:
                continue
3. Network Partition During Cleanup
Scenario: Redis available but PostgreSQL down
Problem: Session cleaned from Redis but audit log fails
Solution:
async def _handle_expired_key(self, key: str):
    """Reclaim an expired session, treating the audit log as best-effort.

    Redis cleanup (seat reclamation) is the critical path; the PostgreSQL
    audit write must never block or fail it.
    """
    # The original referenced session_id without ever deriving it from
    # `key` — derive it here and skip non-session keys.
    if not key.startswith('session:'):
        return
    session_id = key.replace('session:', '')
    try:
        # 1. Clean Redis (critical)
        tenant_id = await self._find_tenant_for_session(session_id)
        if not tenant_id:
            return
        removed = await self.redis.srem(f"tenant:{tenant_id}:active_sessions", session_id)
        if removed:
            # Decrement only when the member was actually removed, matching
            # the base implementation and avoiding double-decrements.
            await self.redis.decr(f"tenant:{tenant_id}:seat_count")
        # 2. Audit log (best effort)
        try:
            await self._log_session_expiry(tenant_id, session_id)
        except Exception as e:
            # Don't fail cleanup if audit log fails
            logging.error(f"Audit log failed (non-critical): {e}")
            # Add to retry queue for later
            await self.audit_retry_queue.put({
                "tenant_id": tenant_id,
                "session_id": session_id,
                "timestamp": datetime.utcnow()
            })
    except Exception as e:
        logging.error(f"Critical error in cleanup: {e}", exc_info=True)
        # Alert ops team
        await send_alert("Zombie cleanup failed", str(e))
Monitoring & Alerts
Prometheus Metrics
from prometheus_client import Counter, Gauge, Histogram

# Session expiry counter — labelled by why the session ended.
# Session expiry counter
session_expired = Counter(
    'license_sessions_expired_total',
    'Total expired sessions cleaned up',
    ['reason']  # timeout, manual_release, error
)
# Active cleanup queue size (rises under backlog, falls as workers drain it)
cleanup_queue_size = Gauge(
    'cleanup_queue_size',
    'Number of sessions waiting for cleanup'
)
# Cleanup latency (histogram feeds the p99 alert)
cleanup_latency = Histogram(
    'cleanup_latency_seconds',
    'Time to process session cleanup'
)
# Usage (illustrative: `await` requires an enclosing async function)
with cleanup_latency.time():
    await handle_expired_key(key)
session_expired.labels(reason='timeout').inc()
Grafana Dashboard
panels:
- title: "Session Expirations (Last Hour)"
metric: rate(license_sessions_expired_total[5m])
alert_threshold: > 100 per minute
- title: "Cleanup Queue Depth"
metric: cleanup_queue_size
alert_threshold: > 500
- title: "Cleanup Latency (p99)"
metric: histogram_quantile(0.99, cleanup_latency_seconds)
alert_threshold: > 5 seconds
Alerts
# PagerDuty / Alertmanager
alerts:
- name: HighSessionExpirationRate
condition: rate(license_sessions_expired_total[5m]) > 100
severity: warning
message: "High session expiration rate detected (possible outage)"
- name: CleanupQueueBacklog
condition: cleanup_queue_size > 1000
severity: critical
message: "Cleanup queue backlog - worker may be stuck"
- name: CleanupWorkerDown
condition: up{job="cleanup-worker"} == 0
severity: critical
message: "Zombie session cleanup worker is down"
Testing
Unit Tests
@pytest.mark.asyncio
async def test_session_expiry_decrements_seat_count(redis_mock, db):
    """Test that expired session decrements tenant seat count.

    NOTE(review): ZombieSessionCleanup.__init__ takes (redis_url, db), but a
    client mock is passed positionally as redis_url, and _handle_expired_key
    reads self.redis, which only start() assigns — presumably the fixture
    patches this; confirm. A real redis GET also returns bytes, so the
    `== 4` comparison relies on the mock returning ints.
    """
    # Arrange
    tenant_id = "test-tenant-123"
    session_id = "session-abc"
    # Setup initial state
    redis_mock.set(f"tenant:{tenant_id}:seat_count", 5)
    redis_mock.sadd(f"tenant:{tenant_id}:active_sessions", session_id)
    # Act
    cleanup = ZombieSessionCleanup(redis_mock, db)
    await cleanup._handle_expired_key(f"session:{session_id}")
    # Assert
    new_count = redis_mock.get(f"tenant:{tenant_id}:seat_count")
    assert new_count == 4
    is_member = redis_mock.sismember(
        f"tenant:{tenant_id}:active_sessions",
        session_id
    )
    assert not is_member
@pytest.mark.asyncio
async def test_cleanup_creates_audit_log(redis_mock, db):
    """Test that cleanup creates audit log entry.

    NOTE(review): `select` (sqlalchemy) and `AuditLog` must be imported in
    the test module; the `.astext` accessor assumes a JSON/JSONB metadata
    column — confirm against the model definition.
    """
    # Arrange
    tenant_id = "test-tenant-123"
    session_id = "session-abc"
    # Act
    cleanup = ZombieSessionCleanup(redis_mock, db)
    await cleanup._log_session_expiry(tenant_id, session_id)
    # Assert
    audit = await db.execute(
        select(AuditLog).where(
            AuditLog.action == "SESSION_EXPIRED",
            AuditLog.metadata['session_id'].astext == session_id
        )
    )
    assert audit.scalar_one_or_none() is not None
Integration Tests
@pytest.mark.integration
async def test_full_zombie_cleanup_flow(real_redis, real_db):
    """Test complete zombie session cleanup with real Redis.

    Requires keyspace notifications enabled and the cleanup worker running
    against the same Redis instance; session TTL is shortened to 6s here.
    NOTE(review): tenant_id must come from a fixture or from
    acquire_license() — it was undefined in the original.
    """
    # 1. Acquire license (creates session)
    session_id = await acquire_license()
    # 2. Get initial seat count. redis GET returns bytes — the original
    #    compared bytes to an int below, which can never be equal.
    initial_count = int(await real_redis.get(f"tenant:{tenant_id}:seat_count"))
    # 3. Simulate client crash (no heartbeat)
    # Wait for session to expire (TTL = 6 seconds for test)
    await asyncio.sleep(7)
    # 4. Verify session cleaned up
    exists = await real_redis.exists(f"session:{session_id}")
    assert not exists
    # 5. Verify seat count decremented (compare as ints)
    new_count = int(await real_redis.get(f"tenant:{tenant_id}:seat_count"))
    assert new_count == initial_count - 1
    # 6. Verify audit log created
    audit = await real_db.execute(
        select(AuditLog).where(AuditLog.action == "SESSION_EXPIRED")
    )
    assert audit.scalar_one_or_none() is not None
Status: Specification Complete ✅ Implementation: Pending (Phase 2) Dependencies: Redis keyspace notifications, Cleanup worker ETA: 3 hours
Last Updated: November 23, 2025 Owner: Backend Team Reviewed By: DevOps Team