Skip to main content

Zombie Session Cleanup - CODITECT Licensing Platform

Overview

Zombie sessions occur when CODITECT clients crash, lose network connectivity, or terminate abnormally without sending a graceful license release. The cleanup mechanism automatically reclaims seats after session TTL expiry using Redis keyspace notifications.

Sequence Diagram

Redis Keyspace Notifications

Configuration

Enable keyspace notifications in Redis:

# In redis.conf
notify-keyspace-events "Ex"

# Or via CLI
redis-cli CONFIG SET notify-keyspace-events Ex

What this enables:

  • E - Keyevent events (events published to __keyevent@<db>__:<event> channel)
  • x - Expired events

Subscription Pattern

Subscribe to expired events:

import redis

# Connect to the Redis instance that emits keyspace notifications.
client = redis.Redis(host='redis-host', port=6379, db=0)
listener = client.pubsub()

# Subscribe to expired events on database 0
listener.psubscribe('__keyevent@0__:expired')

# Pattern subscriptions deliver 'pmessage' events; the payload is the
# name of the key that just expired.
for event in listener.listen():
    if event['type'] != 'pmessage':
        continue
    expired_key = event['data'].decode('utf-8')
    if expired_key.startswith('session:'):
        handle_session_expiry(expired_key)

Implementation

Cleanup Worker (Python)

import asyncio
import logging
from datetime import datetime, timezone
from typing import Optional

import redis.asyncio as redis
from sqlalchemy.ext.asyncio import AsyncSession

class ZombieSessionCleanup:
    """Background worker that reclaims seats when session keys expire.

    Subscribes to Redis keyspace notifications (``__keyevent@<db>__:expired``)
    and, for each expired ``session:*`` key, removes the session from its
    tenant's active-session set, decrements the tenant's seat count, and
    writes an audit-log row.
    """

    # Lazily-created Prometheus counter shared by all instances; created on
    # first use so the worker also runs where prometheus_client is absent.
    _session_expired_counter = None

    def __init__(
        self,
        redis_url: str,
        db: AsyncSession,
        pattern: str = '__keyevent@0__:expired'
    ):
        """
        Args:
            redis_url: Connection URL for the Redis instance emitting events.
            db: Async SQLAlchemy session used for audit logging.
            pattern: Pub/sub pattern for expired-key events (db 0 by default).
        """
        self.redis_url = redis_url
        self.db = db
        self.pattern = pattern
        self.running = False

    async def start(self):
        """Connect, subscribe to expired-key events, and block in the listen loop."""
        # redis.asyncio.from_url() is synchronous (it only constructs the
        # client object); awaiting it raises TypeError, so no `await` here.
        self.redis = redis.from_url(self.redis_url)
        self.pubsub = self.redis.pubsub()

        await self.pubsub.psubscribe(self.pattern)
        self.running = True

        logging.info(f"Zombie session cleanup started (pattern: {self.pattern})")

        await self._listen_loop()

    async def stop(self):
        """Stop the cleanup worker and release the pub/sub connection."""
        self.running = False
        # psubscribe() must be paired with punsubscribe(); unsubscribe() only
        # affects plain-channel subscriptions and would leave the pattern live.
        await self.pubsub.punsubscribe(self.pattern)
        await self.redis.close()
        logging.info("Zombie session cleanup stopped")

    async def _listen_loop(self):
        """Main event listening loop; runs until stop() clears self.running."""
        while self.running:
            try:
                # The timeout keeps the loop responsive to self.running flips.
                message = await self.pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=1.0
                )

                if message and message['type'] == 'pmessage':
                    expired_key = message['data'].decode('utf-8')
                    await self._handle_expired_key(expired_key)

            except Exception as e:
                logging.error(f"Error in cleanup loop: {e}", exc_info=True)
                await asyncio.sleep(1)  # Brief pause before retry

    def _record_expiry_metric(self, reason: str) -> None:
        """Best-effort Prometheus counter increment; never breaks cleanup.

        The counter name matches the one declared in the monitoring section
        (license_sessions_expired_total) so both code paths share a metric.
        """
        try:
            from prometheus_client import Counter

            cls = type(self)
            if cls._session_expired_counter is None:
                cls._session_expired_counter = Counter(
                    'license_sessions_expired_total',
                    'Total expired sessions cleaned up',
                    ['reason']  # timeout, manual_release, error
                )
            cls._session_expired_counter.labels(reason=reason).inc()
        except Exception:
            # Metrics are observability, not correctness — swallow and log.
            logging.debug("Prometheus metric unavailable", exc_info=True)

    async def _handle_expired_key(self, key: str):
        """Reclaim the seat held by an expired ``session:<id>`` key."""
        # Only process session keys; other expirations are not ours.
        if not key.startswith('session:'):
            return

        # removeprefix strips only the leading marker; str.replace would also
        # mangle session ids that happen to contain "session:" internally.
        session_id = key.removeprefix('session:')
        logging.info(f"Processing expired session: {session_id}")

        try:
            # The expired key itself is already gone, so ownership must be
            # recovered by scanning the tenant active-session sets.
            tenant_id = await self._find_tenant_for_session(session_id)

            if not tenant_id:
                logging.warning(f"No tenant found for expired session: {session_id}")
                return

            # Remove from tenant's active sessions set.
            removed = await self.redis.srem(
                f"tenant:{tenant_id}:active_sessions",
                session_id
            )

            if removed:
                # Decrement seat count only when we actually removed a member,
                # so duplicate events cannot double-free a seat.
                new_count = await self.redis.decr(f"tenant:{tenant_id}:seat_count")
                logging.info(
                    f"Seat reclaimed for tenant {tenant_id} "
                    f"(session: {session_id}, new count: {new_count})"
                )

                # Audit log
                await self._log_session_expiry(tenant_id, session_id)

                # Metrics (the original referenced an undefined
                # session_expired_counter and re-imported Counter per event).
                self._record_expiry_metric('timeout')

            else:
                logging.warning(
                    f"Session {session_id} not found in tenant {tenant_id} active set"
                )

        except Exception as e:
            logging.error(f"Error handling expired session {session_id}: {e}", exc_info=True)

    async def _find_tenant_for_session(self, session_id: str) -> Optional[str]:
        """Return the tenant_id owning *session_id*, or None if not found.

        Scans every ``tenant:*:active_sessions`` set — O(number of tenants)
        per expiry. Acceptable for cleanup volume; a reverse index
        (session_id -> tenant_id) would make this O(1) if it becomes hot.
        """
        cursor = 0
        while True:
            cursor, keys = await self.redis.scan(
                cursor=cursor,
                match="tenant:*:active_sessions",
                count=100
            )

            for key in keys:
                # Keys arrive as bytes unless decode_responses is enabled.
                key_str = key.decode('utf-8') if isinstance(key, bytes) else key

                if await self.redis.sismember(key_str, session_id):
                    # Key format: tenant:{tenant_id}:active_sessions
                    return key_str.split(':')[1]

            # SCAN signals completion with cursor 0.
            if cursor == 0:
                break

        return None

    async def _log_session_expiry(self, tenant_id: str, session_id: str):
        """Persist a SESSION_EXPIRED audit row for the reclaimed seat."""
        from models import AuditLog  # local import avoids a circular dependency

        audit = AuditLog(
            tenant_id=tenant_id,
            action="SESSION_EXPIRED",
            resource_type="session",
            resource_id=session_id,
            metadata={
                "session_id": session_id,
                "reason": "ttl_expired",
                # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+).
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
        )

        self.db.add(audit)
        await self.db.commit()


# Start worker on application startup (Django AppConfig)
# apps/licenses/apps.py
from django.apps import AppConfig
from django.conf import settings
import threading

class LicensesConfig(AppConfig):
    """App config that boots the zombie-session cleanup worker at startup."""

    default_auto_field = 'django.db.models.BigAutoField'
    name = 'apps.licenses'
    cleanup_worker = None   # ZombieSessionCleanup instance, set in ready()
    cleanup_thread = None   # daemon thread hosting the worker's event loop

    def ready(self):
        """Initialize cleanup worker on application startup."""
        # getattr guard: the flag may be absent from dev/test settings modules.
        if not getattr(settings, 'ENABLE_ZOMBIE_CLEANUP', False):
            return

        import asyncio

        from apps.licenses.services.zombie_cleanup import ZombieSessionCleanup

        # NOTE(review): ZombieSessionCleanup.__init__ also expects a db
        # session for audit logging — wire one in here; confirm against the
        # worker's actual signature.
        self.cleanup_worker = ZombieSessionCleanup(
            redis_url=settings.REDIS_URL
        )

        # start() is a coroutine. A bare Thread(target=self.cleanup_worker.start)
        # would merely create the coroutine object and never execute it — the
        # thread must drive it with its own event loop.
        self.cleanup_thread = threading.Thread(
            target=lambda: asyncio.run(self.cleanup_worker.start()),
            daemon=True
        )
        self.cleanup_thread.start()

# Shutdown handling via Django signals
# apps/licenses/signals.py
import asyncio
import atexit

from django.apps import apps


def _stop_cleanup_worker():
    """Stop the cleanup worker when the Python process exits.

    The original hooked django.core.signals.request_finished, but that signal
    fires after EVERY request — it would have torn the worker down as soon as
    the first request completed. Django has no shutdown signal, so atexit is
    the reliable process-exit hook. stop() is also a coroutine and must be
    driven by an event loop rather than called bare.
    """
    config = apps.get_app_config('licenses')
    if config.cleanup_worker:
        asyncio.run(config.cleanup_worker.stop())


atexit.register(_stop_cleanup_worker)

Alternative: Lua Script for Atomic Cleanup

More efficient approach using Redis Lua script:

-- cleanup_expired_session.lua
-- Invoked by the cleanup worker when a "session:<id>" key expires.
--
-- KEYS[1] is the full expired key (e.g. "session:abc123"). Members of the
-- tenant active_sessions sets are BARE session ids, so the "session:"
-- prefix must be stripped before SISMEMBER — the original passed the full
-- key and could never match.

local session_id = string.match(KEYS[1], "^session:(.+)") or KEYS[1]

-- Find the owning tenant by scanning the active_sessions sets.
-- SCAN returns the cursor as a string, so start from "0" and compare strings.
local cursor = "0"
local found_tenant = nil

repeat
    local result = redis.call('SCAN', cursor, 'MATCH', 'tenant:*:active_sessions', 'COUNT', 100)
    cursor = result[1]
    local keys = result[2]

    for _, key in ipairs(keys) do
        if redis.call('SISMEMBER', key, session_id) == 1 then
            -- Extract tenant_id from key (format: tenant:{id}:active_sessions)
            found_tenant = string.match(key, "tenant:([^:]+):active_sessions")
            break
        end
    end
until cursor == "0" or found_tenant

if found_tenant then
    -- Remove from active sessions and reclaim the seat in one script run.
    redis.call('SREM', 'tenant:' .. found_tenant .. ':active_sessions', session_id)

    -- Decrement seat count
    local new_count = redis.call('DECR', 'tenant:' .. found_tenant .. ':seat_count')

    return {found_tenant, new_count}
else
    return nil
end

Load and use script:

import logging

# Shared async Redis client and registered script SHA, populated once at
# startup. The original named the client `redis`, shadowing the module, and
# mixed a synchronous script_load with an awaited evalsha on the same object.
redis_client = None
script_sha = None


async def setup_cleanup_script(client):
    """Register cleanup_expired_session.lua with Redis and cache its SHA.

    EVALSHA then reuses the server-side cached script instead of resending
    the source on every expiry event.
    """
    global redis_client, script_sha
    redis_client = client
    with open('cleanup_expired_session.lua', 'r') as f:
        script_sha = await client.script_load(f.read())


async def handle_expired_key(key: str):
    """Run the atomic Lua cleanup for one expired ``session:*`` key."""
    if key.startswith('session:'):
        result = await redis_client.evalsha(
            script_sha,
            1,   # Number of keys
            key  # KEYS[1]
        )

        if result:
            tenant_id, new_count = result
            logging.info(f"Cleaned session {key}: tenant={tenant_id}, seats={new_count}")

Edge Cases

1. Session Expires During Heartbeat

Scenario:

  1. Client sends heartbeat at T=359s (1 second before expiry)
  2. Network delay causes heartbeat to arrive at T=361s
  3. Session already expired and cleaned up

Handling:

from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from django_redis import get_redis_connection

@api_view(['PUT'])
@permission_classes([IsAuthenticated])
def heartbeat(request):
    """Extend session TTL via heartbeat (handles expired sessions gracefully).

    Returns 200 while the session is alive, 404 once Redis has expired the
    key (the client should reacquire its license), and 400 when no
    session_id was supplied.
    """
    # Get session_id from request; reject malformed heartbeats early.
    session_id = request.data.get('session_id')
    if not session_id:
        return Response(
            {"detail": "session_id is required"},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Get Redis connection
    redis_client = get_redis_connection()

    # EXPIRE returns falsy when the key no longer exists, so using its return
    # value replaces the original EXISTS-then-EXPIRE pair and closes the race
    # where the key expires between the two calls.
    if not redis_client.expire(f"session:{session_id}", 360):
        # Session expired - return 404
        return Response(
            {"detail": "Session expired"},
            status=status.HTTP_404_NOT_FOUND
        )

    return Response(
        {"status": "alive"},
        status=status.HTTP_200_OK
    )

Client recovery:

async def _send_heartbeat(self):
    """Ping the server; a 404 means the session is gone and must be reacquired."""
    reply = await self.api.heartbeat(self.session_id)
    if reply.status != 404:
        return
    logging.warning("Session expired - reacquiring license")
    await self._reacquire_license()

2. Rapid Session Churn

Scenario: Many sessions expiring simultaneously (e.g., datacenter outage)

Problem: Cleanup worker overwhelmed

Solution:

import asyncio
from asyncio import Queue

class ZombieSessionCleanup:
    """Queue-based variant: one listener feeds N concurrent worker tasks.

    Decouples event ingestion from cleanup work so that a burst of
    expirations (e.g. a datacenter outage) fills a bounded queue instead of
    overwhelming a single inline handler.
    """

    def __init__(self, redis_url: str, worker_count: int = 5, queue_maxsize: int = 1000):
        # NOTE(review): remaining attributes (redis client, pubsub, running
        # flag wiring) are initialized as in the base implementation; the
        # original elided them with a literal `...`, which is not valid in a
        # parameter list.
        self.redis_url = redis_url
        self.running = False
        self.cleanup_queue = Queue(maxsize=queue_maxsize)  # bounded: backpressure
        self.worker_count = worker_count

    async def start(self):
        """Spawn the worker tasks, then block in the listener loop."""
        # Start multiple worker tasks
        self.workers = [
            asyncio.create_task(self._cleanup_worker(i))
            for i in range(self.worker_count)
        ]

        # Listen for expired events
        await self._listen_loop()

    async def _listen_loop(self):
        """Push expired keys onto the queue instead of processing inline."""
        while self.running:
            message = await self.pubsub.get_message(
                ignore_subscribe_messages=True,
                timeout=1.0
            )

            # Only pattern messages carry expired-key payloads; subscribe
            # confirmations have integer data and would break .decode().
            if message and message.get('type') == 'pmessage':
                expired_key = message['data'].decode('utf-8')
                # Add to queue instead of processing directly
                await self.cleanup_queue.put(expired_key)

    async def _cleanup_worker(self, worker_id: int):
        """Worker task that drains the cleanup queue."""
        while self.running:
            try:
                key = await asyncio.wait_for(
                    self.cleanup_queue.get(),
                    timeout=1.0
                )
            except asyncio.TimeoutError:
                continue  # periodic wake-up to re-check self.running
            try:
                await self._handle_expired_key(key)
            finally:
                # Keep queue accounting correct for join()-based shutdown.
                self.cleanup_queue.task_done()

3. Network Partition During Cleanup

Scenario: Redis available but PostgreSQL down

Problem: Session cleaned from Redis but audit log fails

Solution:

async def _handle_expired_key(self, key: str):
    """Clean Redis state first (critical), then audit-log (best effort).

    Redis holds the authoritative seat counts, so its cleanup must succeed;
    a PostgreSQL outage only delays the audit trail, which is replayed later
    from ``audit_retry_queue``.
    """
    if not key.startswith('session:'):
        return
    # The original referenced an undefined session_id; derive it from the key.
    session_id = key[len('session:'):]

    try:
        # 1. Clean Redis (critical)
        tenant_id = await self._find_tenant_for_session(session_id)
        if tenant_id is None:
            return  # nothing to reclaim
        await self.redis.srem(f"tenant:{tenant_id}:active_sessions", session_id)
        await self.redis.decr(f"tenant:{tenant_id}:seat_count")

        # 2. Audit log (best effort)
        try:
            await self._log_session_expiry(tenant_id, session_id)
        except Exception as e:
            # Don't fail cleanup if audit log fails
            logging.error(f"Audit log failed (non-critical): {e}")

            # Add to retry queue for later
            await self.audit_retry_queue.put({
                "tenant_id": tenant_id,
                "session_id": session_id,
                "timestamp": datetime.now(timezone.utc)
            })

    except Exception as e:
        logging.error(f"Critical error in cleanup: {e}", exc_info=True)
        # Alert ops team
        await send_alert("Zombie cleanup failed", str(e))

Monitoring & Alerts

Prometheus Metrics

from prometheus_client import Counter, Gauge, Histogram

# Session expiry counter
session_expired = Counter(
    'license_sessions_expired_total',
    'Total expired sessions cleaned up',
    ['reason']  # timeout, manual_release, error
)

# Active cleanup queue size
cleanup_queue_size = Gauge(
    'cleanup_queue_size',
    'Number of sessions waiting for cleanup'
)

# Cleanup latency
cleanup_latency = Histogram(
    'cleanup_latency_seconds',
    'Time to process session cleanup'
)


async def process_expired_key(key: str) -> None:
    """Example instrumentation for one cleanup.

    The original showed a bare module-level `await`, which is a SyntaxError —
    the timed call must live inside a coroutine.
    """
    with cleanup_latency.time():
        await handle_expired_key(key)
    session_expired.labels(reason='timeout').inc()

Grafana Dashboard

# Grafana dashboard panels for zombie-session cleanup.
# Threshold values beginning with ">" must be quoted: a bare ">" after the
# colon starts a YAML folded block scalar and breaks parsing.
panels:
  - title: "Session Expirations (Last Hour)"
    metric: rate(license_sessions_expired_total[5m])
    alert_threshold: "> 100 per minute"

  - title: "Cleanup Queue Depth"
    metric: cleanup_queue_size
    alert_threshold: "> 500"

  - title: "Cleanup Latency (p99)"
    metric: histogram_quantile(0.99, cleanup_latency_seconds)
    alert_threshold: "> 5 seconds"

Alerts

# PagerDuty / Alertmanager
# Alert rules for the zombie-session cleanup pipeline.
alerts:
  # Many simultaneous expirations usually means clients are crashing en masse.
  - name: HighSessionExpirationRate
    condition: rate(license_sessions_expired_total[5m]) > 100
    severity: warning
    message: "High session expiration rate detected (possible outage)"

  # Queue growth without drain indicates the worker tasks have stalled.
  - name: CleanupQueueBacklog
    condition: cleanup_queue_size > 1000
    severity: critical
    message: "Cleanup queue backlog - worker may be stuck"

  # Scrape target gone: the cleanup process itself is not running.
  - name: CleanupWorkerDown
    condition: up{job="cleanup-worker"} == 0
    severity: critical
    message: "Zombie session cleanup worker is down"

Testing

Unit Tests

@pytest.mark.asyncio
async def test_session_expiry_decrements_seat_count(redis_mock, db):
    """An expired session must free exactly one seat and leave the active set."""
    # Arrange: one tenant holding five seats, one of them our session.
    tenant_id = "test-tenant-123"
    session_id = "session-abc"
    seat_key = f"tenant:{tenant_id}:seat_count"
    active_key = f"tenant:{tenant_id}:active_sessions"

    redis_mock.set(seat_key, 5)
    redis_mock.sadd(active_key, session_id)

    # Act: feed the expired key through the worker's handler.
    worker = ZombieSessionCleanup(redis_mock, db)
    await worker._handle_expired_key(f"session:{session_id}")

    # Assert: the seat was reclaimed and the set membership is gone.
    assert redis_mock.get(seat_key) == 4
    assert not redis_mock.sismember(active_key, session_id)


@pytest.mark.asyncio
async def test_cleanup_creates_audit_log(redis_mock, db):
    """_log_session_expiry must persist a SESSION_EXPIRED audit row."""
    tenant_id = "test-tenant-123"
    session_id = "session-abc"

    worker = ZombieSessionCleanup(redis_mock, db)
    await worker._log_session_expiry(tenant_id, session_id)

    # Match on both the action and the session id stored in the JSON
    # metadata column.
    result = await db.execute(
        select(AuditLog).where(
            AuditLog.action == "SESSION_EXPIRED",
            AuditLog.metadata['session_id'].astext == session_id
        )
    )
    assert result.scalar_one_or_none() is not None

Integration Tests

@pytest.mark.integration
async def test_full_zombie_cleanup_flow(real_redis, real_db, tenant_id):
    """End-to-end: client crash (no heartbeat) -> TTL expiry -> seat reclaimed.

    ``tenant_id`` is supplied as a fixture — the original referenced it
    without defining it anywhere. The test environment uses a short 6-second
    session TTL so expiry happens quickly.
    """
    # 1. Acquire license (creates session)
    session_id = await acquire_license()

    # 2. Get initial seat count. real_redis.get returns bytes, so convert to
    #    int — otherwise the arithmetic comparison below can never hold.
    initial_count = int(await real_redis.get(f"tenant:{tenant_id}:seat_count"))

    # 3. Simulate client crash (no heartbeat): wait past the 6s test TTL.
    await asyncio.sleep(7)

    # 4. Verify session cleaned up
    exists = await real_redis.exists(f"session:{session_id}")
    assert not exists

    # 5. Verify seat count decremented by exactly one.
    new_count = int(await real_redis.get(f"tenant:{tenant_id}:seat_count"))
    assert new_count == initial_count - 1

    # 6. Verify audit log created
    audit = await real_db.execute(
        select(AuditLog).where(AuditLog.action == "SESSION_EXPIRED")
    )
    assert audit.scalar_one_or_none() is not None

Status: Specification Complete ✅ · Implementation: Pending (Phase 2) · Dependencies: Redis keyspace notifications, Cleanup worker · ETA: 3 hours


Last Updated: November 23, 2025 · Owner: Backend Team · Reviewed By: DevOps Team