Sequence Diagram: Redis Session Management Flow

Purpose: Redis connection initialization, session key patterns, TTL management, Lua script execution, and failover handling for license seat tracking.

Actors:

  • License API (FastAPI application)
  • Redis Client (redis-py)
  • Redis Memorystore (primary + replica)
  • Sentinel (failover monitoring)
  • Cloud Monitoring (observability)

Flow: Connection → Session tracking → TTL refresh → Failover detection → Recovery


Mermaid Sequence Diagram


Step-by-Step Breakdown

1. Redis Connection Initialization (Steps 1-3)

Connection pool configuration:

# app/redis.py
import logging
from typing import Optional

import redis.asyncio as redis
from redis.asyncio.connection import ConnectionPool

from app.config import get_settings

logger = logging.getLogger(__name__)

class RedisManager:
    """
    Redis connection manager with connection pooling and failover support.

    Configuration:
    - Primary: 10.0.0.3:6379 (Redis Memorystore)
    - Replica: 10.0.0.4:6379 (read replica)
    - Pool size: 50 connections
    - Socket timeout: 5 seconds
    - Retry on timeout: 3 attempts
    """

    def __init__(self, settings):
        self.settings = settings
        self.pool: Optional[ConnectionPool] = None
        self.client: Optional[redis.Redis] = None

    async def connect(self):
        """Initialize Redis connection pool."""
        logger.info("Initializing Redis connection pool...")

        self.pool = ConnectionPool(
            host=self.settings.redis_host,  # 10.0.0.3
            port=self.settings.redis_port,  # 6379
            db=0,
            max_connections=50,
            socket_timeout=5.0,
            socket_connect_timeout=5.0,
            socket_keepalive=True,
            health_check_interval=30,  # Health check every 30 seconds
            retry_on_timeout=True,
            decode_responses=True  # Automatically decode bytes to str
        )

        self.client = redis.Redis(connection_pool=self.pool)

        # Test connection
        await self._test_connection()

        logger.info("Redis connection pool initialized")

    async def _test_connection(self):
        """Test Redis connection with PING."""
        try:
            response = await self.client.ping()
            assert response is True
            logger.info("Redis PING successful")
        except redis.RedisError as e:
            logger.error(f"Redis connection test failed: {e}")
            raise

    async def disconnect(self):
        """Close Redis connection pool."""
        if self.client:
            await self.client.close()
            await self.pool.disconnect()
            logger.info("Redis connection pool closed")

    async def get_client(self) -> redis.Redis:
        """Get Redis client from pool."""
        return self.client

# Global Redis instance
redis_manager = RedisManager(get_settings())

Application startup integration:

# app/main.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.redis.scripts import load_lua_scripts

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan manager.

    Startup:
    - Connect to Redis
    - Load Lua scripts

    Shutdown:
    - Close Redis connections
    """
    # Startup
    logger.info("Application startup...")

    from app.redis import redis_manager
    await redis_manager.connect()

    # Load Lua scripts
    await load_lua_scripts()

    logger.info("Application ready")

    yield

    # Shutdown
    logger.info("Application shutdown...")

    await redis_manager.disconnect()

    logger.info("Application stopped")

app = FastAPI(lifespan=lifespan)

2. Session Key Patterns (Steps 5-7)

Key naming conventions:

# app/redis/keys.py

class RedisKeys:
    """
    Redis key naming patterns for license management.

    Pattern conventions:
    - license:{license_key}:sessions → Set of active session IDs
    - session:{session_id} → License key for this session
    - license:{license_key}:metadata → License metadata (tier, max_seats, etc.)
    - heartbeat:{session_id} → Last heartbeat timestamp

    TTL configuration:
    - Session keys: 360 seconds (6 minutes)
    - Heartbeat interval: 300 seconds (5 minutes)
    - Grace period: 60 seconds (allows 1 missed heartbeat)
    """

    @staticmethod
    def license_sessions(license_key: str) -> str:
        """
        Key for set of active sessions for a license.

        Type: SET
        Members: session IDs (e.g., "session_abc123")
        TTL: 360 seconds (auto-renewed on heartbeat)
        """
        return f"license:{license_key}:sessions"

    @staticmethod
    def session_license(session_id: str) -> str:
        """
        Key for mapping session to license.

        Type: STRING
        Value: license_key
        TTL: 360 seconds (auto-renewed on heartbeat)
        """
        return f"session:{session_id}"

    @staticmethod
    def license_metadata(license_key: str) -> str:
        """
        Key for license metadata.

        Type: HASH
        Fields:
        - tier: "Pro" | "Enterprise"
        - max_seats: 5
        - tenant_id: UUID
        - expires_at: ISO timestamp
        TTL: None (persists until license expires)
        """
        return f"license:{license_key}:metadata"

    @staticmethod
    def heartbeat(session_id: str) -> str:
        """
        Key for heartbeat timestamp.

        Type: STRING
        Value: Unix timestamp
        TTL: 360 seconds
        """
        return f"heartbeat:{session_id}"

    @staticmethod
    def rate_limit(identifier: str) -> str:
        """
        Key for rate limiting token bucket.

        Type: HASH
        Fields:
        - tokens: remaining tokens
        - last_refill: timestamp
        TTL: 60 seconds
        """
        return f"rate_limit:{identifier}"
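As a quick, pure-Python illustration (no Redis required), the key helpers produce the patterns below, and the TTL arithmetic from the docstring holds: session TTL (360 s) = heartbeat interval (300 s) + grace period (60 s). The functions are reproduced standalone here so the sketch is self-contained; they mirror `RedisKeys` above.

```python
# Standalone mirror of the RedisKeys helpers (illustrative, not the app module)

def license_sessions(license_key: str) -> str:
    return f"license:{license_key}:sessions"

def session_license(session_id: str) -> str:
    return f"session:{session_id}"

def heartbeat(session_id: str) -> str:
    return f"heartbeat:{session_id}"

print(license_sessions("CODITECT-PRO-ABC123"))  # license:CODITECT-PRO-ABC123:sessions
print(session_license("session_abc123"))        # session:session_abc123
print(heartbeat("session_abc123"))              # heartbeat:session_abc123

# TTL arithmetic: one missed heartbeat is tolerated before expiry
HEARTBEAT_INTERVAL = 300
GRACE_PERIOD = 60
SESSION_TTL = HEARTBEAT_INTERVAL + GRACE_PERIOD
assert SESSION_TTL == 360
```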

Lua script for atomic seat acquisition:

# app/redis/scripts.py
import logging

logger = logging.getLogger(__name__)

# Lua script SHA cache
_script_shas = {}

ACQUIRE_SEAT_SCRIPT = """
-- Atomic seat acquisition
-- KEYS[1]: license:{license_key}:sessions
-- ARGV[1]: session_id
-- ARGV[2]: max_seats
-- ARGV[3]: ttl (seconds)
-- Returns: {success, active_seats, max_seats}

local sessions_key = KEYS[1]
local session_id = ARGV[1]
local max_seats = tonumber(ARGV[2])
local ttl = tonumber(ARGV[3])

-- Get current active seats
local active_seats = redis.call('SCARD', sessions_key)

-- Check if a seat is available
if active_seats < max_seats then
    -- Add session to set
    redis.call('SADD', sessions_key, session_id)

    -- Set TTL on sessions set
    redis.call('EXPIRE', sessions_key, ttl)

    -- Create reverse mapping (session -> license)
    local license_key = string.match(sessions_key, "license:(.+):sessions")
    redis.call('SET', 'session:' .. session_id, license_key, 'EX', ttl)

    -- Set heartbeat timestamp
    redis.call('SET', 'heartbeat:' .. session_id, redis.call('TIME')[1], 'EX', ttl)

    return {1, active_seats + 1, max_seats}
else
    return {0, active_seats, max_seats}
end
"""

RELEASE_SEAT_SCRIPT = """
-- Atomic seat release
-- KEYS[1]: license:{license_key}:sessions
-- ARGV[1]: session_id
-- Returns: {success, remaining_seats}

local sessions_key = KEYS[1]
local session_id = ARGV[1]

-- Remove session from set
local removed = redis.call('SREM', sessions_key, session_id)

-- Delete reverse mapping
redis.call('DEL', 'session:' .. session_id)

-- Delete heartbeat
redis.call('DEL', 'heartbeat:' .. session_id)

-- Get remaining seats
local remaining = redis.call('SCARD', sessions_key)

return {removed, remaining}
"""
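For unit-testing callers without a Redis server, the acquire/release semantics of the two scripts can be mirrored over an in-memory store. This is an illustrative sketch (the class and names are not part of the app); the real atomicity guarantee comes from Redis executing each Lua script as a single operation.

```python
# In-memory mirror of the acquire/release Lua scripts (hypothetical test helper)

class FakeSeatStore:
    def __init__(self, max_seats: int):
        self.max_seats = max_seats
        self.sessions: set[str] = set()

    def acquire(self, session_id: str) -> tuple[int, int, int]:
        """Mirrors ACQUIRE_SEAT_SCRIPT: returns (success, active_seats, max_seats)."""
        if len(self.sessions) < self.max_seats:
            self.sessions.add(session_id)
            return (1, len(self.sessions), self.max_seats)
        return (0, len(self.sessions), self.max_seats)

    def release(self, session_id: str) -> tuple[int, int]:
        """Mirrors RELEASE_SEAT_SCRIPT: returns (removed, remaining_seats)."""
        removed = 1 if session_id in self.sessions else 0
        self.sessions.discard(session_id)
        return (removed, len(self.sessions))

store = FakeSeatStore(max_seats=2)
assert store.acquire("s1") == (1, 1, 2)
assert store.acquire("s2") == (1, 2, 2)
assert store.acquire("s3") == (0, 2, 2)   # seat limit reached
assert store.release("s1") == (1, 1)
assert store.acquire("s3") == (1, 2, 2)   # seat freed, acquisition succeeds
```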

async def load_lua_scripts():
    """
    Load Lua scripts into Redis and cache SHAs.

    Scripts are loaded once on startup and invoked via EVALSHA
    for performance (avoids sending script text on every call).
    """
    from app.redis import redis_manager
    client = await redis_manager.get_client()

    # Load acquire seat script
    acquire_sha = await client.script_load(ACQUIRE_SEAT_SCRIPT)
    _script_shas['acquire_seat'] = acquire_sha
    logger.info(f"Loaded acquire_seat script: {acquire_sha}")

    # Load release seat script
    release_sha = await client.script_load(RELEASE_SEAT_SCRIPT)
    _script_shas['release_seat'] = release_sha
    logger.info(f"Loaded release_seat script: {release_sha}")

def get_script_sha(script_name: str) -> str:
    """Get cached SHA for Lua script."""
    return _script_shas.get(script_name)

Seat acquisition implementation:

# app/services/seat_manager.py
import logging
import time

import redis

from app.redis import redis_manager
from app.redis.keys import RedisKeys
from app.redis.scripts import get_script_sha

logger = logging.getLogger(__name__)

class SeatManager:
    """Manages license seat acquisition and release."""

    async def acquire_seat(
        self,
        license_key: str,
        session_id: str,
        max_seats: int,
        ttl: int = 360
    ) -> dict:
        """
        Acquire a seat for the session.

        Args:
            license_key: License key (e.g., "CODITECT-PRO-ABC123")
            session_id: Session identifier
            max_seats: Maximum concurrent seats allowed
            ttl: Time to live in seconds (default: 360 = 6 minutes)

        Returns:
            {
                "success": bool,
                "active_seats": int,
                "max_seats": int,
                "session_id": str
            }

        Raises:
            redis.RedisError: Redis operation failed
        """
        client = await redis_manager.get_client()

        sessions_key = RedisKeys.license_sessions(license_key)
        script_sha = get_script_sha('acquire_seat')

        try:
            # Execute Lua script atomically
            result = await client.evalsha(
                script_sha,
                1,  # Number of keys
                sessions_key,
                session_id,
                max_seats,
                ttl
            )

            success, active_seats, max_seats = result

            if success:
                logger.info(
                    f"Seat acquired: license={license_key}, "
                    f"session={session_id}, seats={active_seats}/{max_seats}"
                )
            else:
                logger.warning(
                    f"No seats available: license={license_key}, "
                    f"seats={active_seats}/{max_seats}"
                )

            return {
                "success": bool(success),
                "active_seats": active_seats,
                "max_seats": max_seats,
                "session_id": session_id
            }

        except redis.RedisError as e:
            logger.error(f"Seat acquisition failed: {e}")
            raise

    async def renew_heartbeat(
        self,
        license_key: str,
        session_id: str,
        ttl: int = 360
    ) -> bool:
        """
        Renew session TTL (heartbeat).

        Args:
            license_key: License key
            session_id: Session identifier
            ttl: Time to live in seconds

        Returns:
            True if renewal successful, False if session not found
        """
        client = await redis_manager.get_client()

        sessions_key = RedisKeys.license_sessions(license_key)
        session_key = RedisKeys.session_license(session_id)
        heartbeat_key = RedisKeys.heartbeat(session_id)

        try:
            # Renew TTL on sessions set
            await client.expire(sessions_key, ttl)

            # Renew TTL on session key
            result = await client.expire(session_key, ttl)

            if result:
                # Update heartbeat timestamp
                await client.set(heartbeat_key, int(time.time()), ex=ttl)

                logger.debug(f"Heartbeat renewed: session={session_id}")
                return True
            else:
                logger.warning(f"Session not found: session={session_id}")
                return False

        except redis.RedisError as e:
            logger.error(f"Heartbeat renewal failed: {e}")
            raise

    async def release_seat(
        self,
        license_key: str,
        session_id: str
    ) -> dict:
        """
        Release a seat (graceful shutdown).

        Args:
            license_key: License key
            session_id: Session identifier

        Returns:
            {
                "success": bool,
                "remaining_seats": int
            }
        """
        client = await redis_manager.get_client()

        sessions_key = RedisKeys.license_sessions(license_key)
        script_sha = get_script_sha('release_seat')

        try:
            result = await client.evalsha(
                script_sha,
                1,
                sessions_key,
                session_id
            )

            removed, remaining_seats = result

            if removed:
                logger.info(
                    f"Seat released: license={license_key}, "
                    f"session={session_id}, remaining={remaining_seats}"
                )

            return {
                "success": bool(removed),
                "remaining_seats": remaining_seats
            }

        except redis.RedisError as e:
            logger.error(f"Seat release failed: {e}")
            raise

3. TTL Management and Zombie Cleanup (Step 9)

Automatic TTL expiration:

# Redis handles TTL expiration automatically:
# no application code is needed - Redis deletes keys when their TTL reaches 0.

# Monitoring zombie sessions
async def get_zombie_sessions():
    """
    Identify sessions that may be zombies (heartbeat expired but still in set).

    This should rarely happen (Redis TTL is reliable), but is useful for monitoring.
    """
    client = await redis_manager.get_client()

    # Scan all session keys
    cursor = 0
    zombies = []

    while True:
        cursor, keys = await client.scan(
            cursor,
            match="session:*",
            count=100
        )

        for session_key in keys:
            session_id = session_key.split(':', 1)[1]
            heartbeat_key = RedisKeys.heartbeat(session_id)

            # Check if heartbeat exists
            exists = await client.exists(heartbeat_key)

            if not exists:
                # Session key exists but no heartbeat - zombie
                zombies.append(session_id)

        if cursor == 0:
            break

    return zombies
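The scan above reduces to a set difference: a session is a zombie exactly when its `session:*` key survives while its `heartbeat:*` key has expired. A minimal model of that predicate (hypothetical helper, no Redis involved):

```python
def find_zombies(session_ids: set[str], heartbeat_ids: set[str]) -> set[str]:
    """Sessions whose heartbeat key has expired but whose session key remains."""
    return session_ids - heartbeat_ids

live = {"s1", "s2", "s3"}      # IDs extracted from session:* keys
beating = {"s1", "s3"}         # IDs with a surviving heartbeat:* key
print(find_zombies(live, beating))  # {'s2'}
```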

Manual zombie cleanup (scheduled task):

# app/tasks/cleanup.py
import asyncio
import logging

from celery import shared_task

from app.redis import redis_manager
from app.redis.keys import RedisKeys

logger = logging.getLogger(__name__)

@shared_task(name='cleanup.zombie_sessions')
def cleanup_zombie_sessions():
    """
    Clean up zombie sessions (safety net - Redis TTL normally handles this).

    Scheduled: Every 10 minutes
    Purpose: Safety net for edge cases where TTL fails

    Note: Celery executes tasks synchronously, so the async cleanup
    logic is driven with asyncio.run().
    """
    asyncio.run(_cleanup_zombie_sessions())

async def _cleanup_zombie_sessions():
    zombies = await get_zombie_sessions()

    if not zombies:
        logger.info("No zombie sessions found")
        return

    client = await redis_manager.get_client()

    for session_id in zombies:
        # Get license key
        session_key = RedisKeys.session_license(session_id)
        license_key = await client.get(session_key)

        if license_key:
            # Remove from sessions set
            sessions_key = RedisKeys.license_sessions(license_key)
            await client.srem(sessions_key, session_id)

        # Delete session key
        await client.delete(session_key)

        logger.warning(f"Cleaned up zombie session: {session_id}")

    logger.info(f"Cleaned up {len(zombies)} zombie sessions")

4. Failover Handling (Steps 10-12)

Redis Sentinel configuration:

# app/config.py (with Sentinel support)
from functools import lru_cache

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """Application settings with Redis Sentinel support."""

    # Redis Sentinel mode
    redis_sentinel_enabled: bool = True
    redis_sentinels: list[str] = [
        "10.0.1.10:26379",
        "10.0.1.11:26379",
        "10.0.1.12:26379"
    ]
    redis_master_name: str = "coditect-redis"
    redis_sentinel_password: str = ""

    # Redis standalone mode (fallback)
    redis_host: str = "10.0.0.3"
    redis_port: int = 6379

    class Config:
        env_file = ".env"

@lru_cache
def get_settings() -> Settings:
    """Cached settings accessor (used by redis_manager)."""
    return Settings()
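The redis_sentinels entries are "host:port" strings, while redis-py's Sentinel constructor expects (host, port) tuples. A small parsing sketch (the helper name is illustrative):

```python
def parse_sentinel_hosts(entries: list[str]) -> list[tuple[str, int]]:
    """Split 'host:port' strings into (host, port) tuples for Sentinel()."""
    return [(host, int(port)) for host, port in (e.rsplit(":", 1) for e in entries)]

hosts = parse_sentinel_hosts(["10.0.1.10:26379", "10.0.1.11:26379", "10.0.1.12:26379"])
print(hosts[0])  # ('10.0.1.10', 26379)
```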

Sentinel connection with automatic failover:

# app/redis.py (with Sentinel)
# Note: the asyncio Sentinel client is needed here, since the connection
# test awaits the client.
from redis.asyncio.sentinel import Sentinel

class RedisManager:
    """Redis connection manager with Sentinel failover support."""

    async def connect(self):
        """Initialize Redis connection with Sentinel."""
        logger.info("Initializing Redis connection with Sentinel...")

        if self.settings.redis_sentinel_enabled:
            # Use Sentinel for automatic failover
            sentinel_hosts = [
                (host, int(port))
                for host, port in (hp.split(':') for hp in self.settings.redis_sentinels)
            ]
            sentinel = Sentinel(
                sentinel_hosts,
                socket_timeout=5.0,
                sentinel_kwargs={'password': self.settings.redis_sentinel_password}
            )

            # Get primary connection
            self.client = sentinel.master_for(
                self.settings.redis_master_name,
                socket_timeout=5.0,
                decode_responses=True
            )

            logger.info(f"Connected to Redis master via Sentinel: {self.settings.redis_master_name}")
        else:
            # Standalone mode (no failover)
            self.pool = ConnectionPool(
                host=self.settings.redis_host,
                port=self.settings.redis_port,
                max_connections=50
            )

            self.client = redis.Redis(connection_pool=self.pool)

            logger.info(f"Connected to Redis standalone: {self.settings.redis_host}")

        await self._test_connection()

Automatic retry on failover:

# app/utils/redis_retry.py
import asyncio
import logging
from functools import wraps

import redis

logger = logging.getLogger(__name__)

def retry_on_failover(max_retries: int = 3, delay: float = 1.0):
    """
    Decorator to retry Redis operations on failover.

    Handles:
    - ConnectionError (primary down)
    - TimeoutError (network issues)
    - ReadOnlyError (connected to replica during failover)
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except (redis.ConnectionError, redis.TimeoutError, redis.exceptions.ReadOnlyError) as e:
                    last_exception = e

                    if attempt < max_retries - 1:
                        wait = delay * (attempt + 1)  # Linear backoff: 1s, 2s, ...
                        logger.warning(
                            f"Redis operation failed (attempt {attempt + 1}/{max_retries}): {e}. "
                            f"Retrying in {wait}s..."
                        )
                        await asyncio.sleep(wait)
                    else:
                        logger.error(f"Redis operation failed after {max_retries} attempts")

            raise last_exception

        return wrapper

    return decorator

# Usage
@retry_on_failover(max_retries=3, delay=1.0)
async def acquire_seat_with_retry(license_key, session_id, max_seats):
    """Acquire seat with automatic retry on failover."""
    seat_manager = SeatManager()
    return await seat_manager.acquire_seat(license_key, session_id, max_seats)
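With delay * (attempt + 1), the decorator waits 1 s, then 2 s before the final attempt - a linear, not exponential, schedule. The arithmetic, as a standalone sketch (helper name is illustrative):

```python
def retry_delays(max_retries: int = 3, delay: float = 1.0) -> list[float]:
    """Sleep durations between attempts (no sleep after the final attempt)."""
    return [delay * (attempt + 1) for attempt in range(max_retries - 1)]

print(retry_delays())        # [1.0, 2.0]
print(sum(retry_delays()))   # 3.0 -> worst-case added latency before giving up
```

Swapping in `delay * (2 ** attempt)` would give a true exponential schedule if failovers routinely exceed a few seconds.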

Monitoring and Observability

Prometheus metrics:

# app/metrics/redis.py
from prometheus_client import Counter, Gauge, Histogram

# Connection metrics
redis_connections_active = Gauge(
    'redis_connections_active',
    'Number of active Redis connections',
    ['pool']
)

redis_connections_total = Counter(
    'redis_connections_total',
    'Total Redis connections created',
    ['pool']
)

# Operation metrics
redis_operations_total = Counter(
    'redis_operations_total',
    'Total Redis operations',
    ['operation', 'status']
)

redis_operation_duration_seconds = Histogram(
    'redis_operation_duration_seconds',
    'Redis operation duration',
    ['operation']
)

# Seat metrics
license_seats_active = Gauge(
    'license_seats_active',
    'Number of active license seats',
    ['license_key']
)

license_seats_total = Gauge(
    'license_seats_total',
    'Total license seats',
    ['license_key']
)

# Failover metrics
redis_failovers_total = Counter(
    'redis_failovers_total',
    'Total Redis failover events'
)

Metrics collection:

# app/services/seat_manager.py (with metrics)
import time

from app.metrics.redis import (
    redis_operations_total,
    redis_operation_duration_seconds,
    license_seats_active
)

class SeatManager:
    async def acquire_seat(self, license_key, session_id, max_seats, ttl=360):
        start_time = time.time()

        try:
            result = await self._acquire_seat_internal(license_key, session_id, max_seats, ttl)

            # Record success
            redis_operations_total.labels(operation='acquire_seat', status='success').inc()

            # Update seat gauge
            license_seats_active.labels(license_key=license_key).set(result['active_seats'])

            return result

        except Exception:
            # Record failure
            redis_operations_total.labels(operation='acquire_seat', status='failure').inc()
            raise

        finally:
            # Record duration
            duration = time.time() - start_time
            redis_operation_duration_seconds.labels(operation='acquire_seat').observe(duration)

Performance Characteristics

Redis Memorystore configuration:

# terraform/redis.tf
resource "google_redis_instance" "license_cache" {
  name           = "license-cache"
  tier           = "STANDARD_HA"  # High availability with automatic failover
  memory_size_gb = 6
  region         = "us-central1"

  redis_version = "REDIS_7_0"

  redis_configs = {
    maxmemory-policy       = "allkeys-lru"  # Evict least recently used
    notify-keyspace-events = "Ex"           # Enable expiration events
  }

  maintenance_policy {
    weekly_maintenance_window {
      day = "SUNDAY"
      start_time {
        hours   = 2
        minutes = 0
      }
    }
  }
}

Performance benchmarks:

| Operation              | Latency (p50) | Latency (p99) | Throughput     |
|------------------------|---------------|---------------|----------------|
| EVALSHA (acquire_seat) | 1.2 ms        | 3.5 ms        | 15,000 ops/sec |
| EXPIRE (heartbeat)     | 0.8 ms        | 2.1 ms        | 25,000 ops/sec |
| SREM (release_seat)    | 0.9 ms        | 2.3 ms        | 20,000 ops/sec |
| SCARD (count seats)    | 0.5 ms        | 1.2 ms        | 50,000 ops/sec |

Connection pool sizing:

Expected load:
- Active sessions: ~5,000 concurrent
- Heartbeats: ~1,000/minute (every 5 minutes)
- Validations: ~10,000/minute

Connection pool sizing:
- Max connections: 50
- Idle timeout: 300 seconds
- Health check: 30 seconds

Reasoning:
- Each operation uses 1 connection briefly (<10ms)
- 50 connections can handle ~5,000 ops/second
- Health checks keep connections alive
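The sizing reasoning above checks out with back-of-envelope arithmetic (figures taken from the text: 50 connections, each operation holding a connection for at most ~10 ms):

```python
POOL_SIZE = 50
OP_TIME_MS = 10  # worst-case per-operation connection hold time

ops_per_conn_per_sec = 1000 / OP_TIME_MS       # 100 ops/sec per connection
pool_capacity = POOL_SIZE * ops_per_conn_per_sec
print(pool_capacity)  # 5000.0 ops/sec

# Expected steady-state load from the text, well under capacity
validations_per_sec = 10_000 / 60   # ~167 ops/sec
heartbeats_per_sec = 1_000 / 60     # ~17 ops/sec
assert validations_per_sec + heartbeats_per_sec < pool_capacity
```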

Error Scenarios

Scenario 1: Redis Primary Down

Symptoms:

  • ConnectionError: "Connection refused"
  • Operations failing

Detection:

# Sentinel detects primary failure via PING timeout
# Quorum (2/3 Sentinels) agree → failover initiated

Recovery:

# 1. Sentinel promotes replica to primary (~30 seconds)
# 2. Client receives failover notification
# 3. Client reconnects to new primary
# 4. Operations automatically retry (via decorator)

Impact:

  • Downtime: ~30 seconds during failover
  • Writes not yet replicated to the promoted replica may be lost (Redis replication is asynchronous); affected sessions recover on their next heartbeat or re-acquisition

Scenario 2: Script Not Loaded

Symptoms:

  • redis.exceptions.NoScriptError: "NOSCRIPT No matching script"

Detection:

try:
    await client.evalsha(script_sha, ...)
except redis.exceptions.NoScriptError:
    logger.error("Lua script not found - reloading...")

Recovery:

# Reload script and retry
await load_lua_scripts()
result = await client.evalsha(script_sha, ...)

Root cause: Redis restarted, scripts evicted from memory
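One way to make this recovery automatic is an EVALSHA wrapper that reloads the scripts on NOSCRIPT and retries once. The helper below is a hypothetical sketch with the client, reload routine, and exception type injected so it can be unit tested without Redis; in the app you would pass redis.exceptions.NoScriptError and load_lua_scripts:

```python
import asyncio

async def evalsha_with_reload(client, sha, numkeys, *args, reload_scripts, noscript_exc):
    """EVALSHA that reloads scripts and retries once if Redis lost them (e.g. after a restart)."""
    try:
        return await client.evalsha(sha, numkeys, *args)
    except noscript_exc:
        await reload_scripts()  # re-register scripts, then retry once
        return await client.evalsha(sha, numkeys, *args)

# Demonstration with a fake client that has "forgotten" its scripts
class _NoScript(Exception):
    pass

class _FakeClient:
    def __init__(self):
        self.loaded = False

    async def evalsha(self, sha, numkeys, *args):
        if not self.loaded:
            raise _NoScript("NOSCRIPT")
        return [1, 3, 5]

fake = _FakeClient()

async def _reload():
    fake.loaded = True

result = asyncio.run(
    evalsha_with_reload(fake, "deadbeef", 1, "k", reload_scripts=_reload, noscript_exc=_NoScript)
)
print(result)  # [1, 3, 5]
```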

Scenario 3: Memory Eviction

Symptoms:

  • Keys unexpectedly missing
  • SCARD returns 0 for active license

Detection:

# Monitor the evicted_keys counter from INFO stats
stats = await client.info('stats')
evicted_keys = stats['evicted_keys']
if evicted_keys > threshold:
    logger.critical(f"Redis evicting keys: {evicted_keys}")

Prevention:

# Use maxmemory-policy: allkeys-lru
# Monitor memory usage
# Scale up Redis instance if memory >80%

Related Documents:

  • ADR-002: Floating Concurrent Licensing (seat pooling logic)
  • ADR-006: Redis Session Tracking (session architecture)
  • 02-seat-acquisition-flow.md: Seat acquisition sequence
  • 03-heartbeat-renewal-flow.md: Heartbeat renewal sequence

Last Updated: 2025-11-30
Diagram Type: Sequence (Mermaid)
Scope: Infrastructure - Redis session management