Skip to main content

C3 Component Diagram: Redis Components

Purpose: Component-level architecture of Redis Memorystore, showing atomic seat counting with Lua scripts, session TTL management, connection pooling, and failover strategy for multi-tenant license session tracking.

Scope: Redis Memorystore layer (6GB BASIC tier with RDB persistence)

Related Diagrams:


Mermaid Component Diagram


Component Descriptions

1. Connection Management

Connection Pool

  • Technology: django-redis with connection pooling
  • Configuration:
    CACHES = {
    'default': {
    'BACKEND': 'django_redis.cache.RedisCache',
    'LOCATION': 'redis://10.0.0.3:6379/0',
    'OPTIONS': {
    'CLIENT_CLASS': 'django_redis.client.DefaultClient',
    'CONNECTION_POOL_KWARGS': {
    'max_connections': 50,
    'retry_on_timeout': True,
    'socket_keepalive': True,
    'socket_keepalive_options': {
    1: 1, # TCP_KEEPIDLE
    2: 1, # TCP_KEEPINTVL
    3: 3, # TCP_KEEPCNT
    },
    },
    'SOCKET_CONNECT_TIMEOUT': 5, # seconds
    'SOCKET_TIMEOUT': 5, # seconds
    }
    }
    }
  • Max Connections: 50 (sufficient for 1,000+ requests/sec)
  • Connection Reuse: Persistent connections with keep-alive
  • Timeout Handling: 5-second timeout with automatic retry

django-redis Client

  • Purpose: Redis integration for Django Cache Framework
  • Features:
    • Native support for Redis data structures (SET, HASH, STRING)
    • Lua script execution via EVALSHA
    • Atomic operations for race condition prevention
    • Key serialization (pickle, JSON, msgpack)
  • Integration:
    from django.core.cache import cache

    # Django cache operations map to Redis
    cache.set('key', 'value', timeout=360) # → SET key value EX 360
    cache.get('key') # → GET key
    cache.delete('key') # → DEL key

2. Instance Configuration

Primary Instance

  • Location: us-central1-a (same zone as GKE for low latency)
  • Tier: BASIC (6GB memory, single instance)
  • Version: Redis 6.x (latest stable)
  • Network: Private IP (VPC peering, no public IP)
  • Performance: 12,000 ops/sec typical, 25,000 ops/sec burst
  • Latency: <1ms p50, <5ms p99 (same-zone access)

Why BASIC Tier:

  • Cost-effective for development ($30/month vs. $150/month for STANDARD HA)
  • Sufficient for MVP (10K concurrent sessions = 10K active keys)
  • Easy upgrade to STANDARD HA for production (no downtime)
  • RDB persistence provides crash recovery (daily snapshots)

RDB Persistence

  • Schedule: Daily snapshots at 2 AM UTC
  • Retention: 7 days (configurable)
  • Backup Destination: Google Cloud Storage (regional bucket)
  • Recovery: Automatic restore from latest snapshot on instance restart
  • Performance Impact: Minimal (snapshot during low-traffic hours)

RDB Configuration:

# Memorystore managed configuration
save 900 1 # Save after 900s (15min) if ≥1 key changed
save 300 10 # Save after 300s (5min) if ≥10 keys changed
save 60 10000 # Save after 60s if ≥10,000 keys changed
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes

AOF Disabled

  • Reason: Performance over durability (sessions are transient)
  • Trade-off: Up to 15 minutes data loss on crash (acceptable for license sessions)
  • Mitigation: RDB snapshots + PostgreSQL as source of truth
  • Production: Consider enabling AOF (appendonly yes) with everysec fsync for <1s data loss

3. Session Key Patterns

Tenant Namespace Isolation

tenant:{tenant_id}:*
  • Purpose: Isolate tenant data at key level (defense-in-depth)
  • Examples:
    • tenant:abc-123:seats - Active seat count
    • tenant:abc-123:licenses - License metadata cache
    • tenant:abc-123:rate_limit - API rate limiting

Benefits:

  • Clear ownership - Easy to identify tenant keys
  • Bulk operations - SCAN tenant:abc-123:* for tenant cleanup
  • Debugging - Filter keys by tenant in monitoring

License Session Set

license:{license_id}:sessions → SET of session_ids
  • Data Structure: SET (unordered collection of unique session IDs)
  • Operations:
    • SADD license:xyz:sessions session_123 - Add session to license
    • SREM license:xyz:sessions session_123 - Remove session
    • SCARD license:xyz:sessions - Count active sessions
    • SISMEMBER license:xyz:sessions session_123 - Check membership
  • TTL: 360 seconds (6 minutes)
  • Atomic Operations: Lua scripts prevent race conditions

Example:

# Lua script: acquire_seat.lua
local license_key = KEYS[1] # "license:xyz:sessions"
local session_id = ARGV[1] # "session_123"
local max_seats = tonumber(ARGV[2]) # 5
local ttl = tonumber(ARGV[3]) # 360

local current_seats = redis.call('SCARD', license_key)

if current_seats >= max_seats then
return {0, current_seats, max_seats} # Failure
end

redis.call('SADD', license_key, session_id)
redis.call('EXPIRE', license_key, ttl)

local new_seats = redis.call('SCARD', license_key)
return {1, new_seats, max_seats} # Success

Session Metadata Hash

session:{session_id} → HASH {field: value}
  • Data Structure: HASH (field-value pairs)
  • Fields:
    • user_id - User UUID
    • machine_id - Hardware fingerprint
    • ip_address - Client IP (for audit)
    • created_at - ISO timestamp
    • last_heartbeat - ISO timestamp
    • expires_at - ISO timestamp
  • TTL: 360 seconds (synchronized with license session SET)

Operations:

# Set session metadata
cache.hset('session:session_123', mapping={
'user_id': 'user-uuid',
'machine_id': 'hw-fingerprint',
'ip_address': '203.0.113.42',
'created_at': '2025-11-30T12:34:56Z',
'last_heartbeat': '2025-11-30T12:39:56Z',
'expires_at': '2025-11-30T12:40:56Z',
})

# Get session field
user_id = cache.hget('session:session_123', 'user_id')

# Get all session fields
session_data = cache.hgetall('session:session_123')

Seat Counter (Per Tenant)

tenant:{tenant_id}:seats → STRING (integer)
  • Data Structure: STRING (integer counter)
  • Operations:
    • INCR tenant:abc-123:seats - Increment active seats
    • DECR tenant:abc-123:seats - Decrement active seats
    • GET tenant:abc-123:seats - Get current seat count
  • Atomic: INCR/DECR operations are atomic (no race conditions)
  • No TTL: Counter persists (only modified by acquire/release)

Usage:

# Atomic seat increment
from django.core.cache import cache

# Acquire seat
current_seats = cache.incr(f'tenant:{tenant_id}:seats')
if current_seats > max_seats:
cache.decr(f'tenant:{tenant_id}:seats') # Rollback
raise SeatLimitExceeded

# Release seat
cache.decr(f'tenant:{tenant_id}:seats')

4. Lua Script Engine

Why Lua Scripts:

  • Atomicity: All operations execute as a single transaction (no race conditions)
  • Performance: Single network round-trip (vs. multiple Redis commands)
  • Correctness: Guaranteed seat limit enforcement under high concurrency

acquire_seat.lua

-- KEYS[1]: license:{license_id}:sessions
-- ARGV[1]: session_id
-- ARGV[2]: max_seats
-- ARGV[3]: ttl (seconds)

local license_key = KEYS[1]
local session_id = ARGV[1]
local max_seats = tonumber(ARGV[2])
local ttl = tonumber(ARGV[3])

-- Check current seat count
local current_seats = redis.call('SCARD', license_key)

-- Reject if at capacity
if current_seats >= max_seats then
return {0, current_seats, max_seats} -- {success=0, active, max}
end

-- Add session to set
redis.call('SADD', license_key, session_id)

-- Set TTL (refreshes on each heartbeat)
redis.call('EXPIRE', license_key, ttl)

-- Return success with updated count
local new_seats = redis.call('SCARD', license_key)
return {1, new_seats, max_seats} -- {success=1, active, max}

Django Integration:

# apps/licenses/services.py

import hashlib
from django.core.cache import cache

# Load Lua script and cache SHA
ACQUIRE_SEAT_SCRIPT = """
[Lua script content from above]
"""

ACQUIRE_SEAT_SHA = hashlib.sha1(ACQUIRE_SEAT_SCRIPT.encode()).hexdigest()

class SessionService:
@staticmethod
def acquire_seat(license_id, session_id, max_seats, ttl=360):
"""Atomically acquire a seat for session."""
from django_redis import get_redis_connection

redis_conn = get_redis_connection('default')

# Ensure script is loaded (idempotent)
try:
result = redis_conn.evalsha(
ACQUIRE_SEAT_SHA,
1, # Number of KEYS
f'license:{license_id}:sessions', # KEYS[1]
session_id, # ARGV[1]
max_seats, # ARGV[2]
ttl # ARGV[3]
)
except redis.exceptions.NoScriptError:
# Script not cached, load it
sha = redis_conn.script_load(ACQUIRE_SEAT_SCRIPT)
assert sha == ACQUIRE_SEAT_SHA
# Retry
result = redis_conn.evalsha(ACQUIRE_SEAT_SHA, 1,
f'license:{license_id}:sessions', session_id, max_seats, ttl)

success, active_seats, max_seats = result
return {
'acquired': bool(success),
'active_seats': active_seats,
'max_seats': max_seats,
}

release_seat.lua

-- KEYS[1]: license:{license_id}:sessions
-- ARGV[1]: session_id

local license_key = KEYS[1]
local session_id = ARGV[1]

-- Remove session from set
local removed = redis.call('SREM', license_key, session_id)

-- Get remaining active seats
local remaining_seats = redis.call('SCARD', license_key)

return {removed, remaining_seats} -- {1 if removed, remaining count}

Django Integration:

RELEASE_SEAT_SCRIPT = """
[Lua script content from above]
"""

RELEASE_SEAT_SHA = hashlib.sha1(RELEASE_SEAT_SCRIPT.encode()).hexdigest()

class SessionService:
@staticmethod
def release_seat(license_id, session_id):
"""Atomically release a seat."""
from django_redis import get_redis_connection

redis_conn = get_redis_connection('default')

try:
result = redis_conn.evalsha(
RELEASE_SEAT_SHA,
1,
f'license:{license_id}:sessions',
session_id
)
except redis.exceptions.NoScriptError:
sha = redis_conn.script_load(RELEASE_SEAT_SCRIPT)
assert sha == RELEASE_SEAT_SHA
result = redis_conn.evalsha(RELEASE_SEAT_SHA, 1,
f'license:{license_id}:sessions', session_id)

removed, remaining_seats = result
return {
'released': bool(removed),
'remaining_seats': remaining_seats,
}

heartbeat.lua

-- KEYS[1]: session:{session_id}
-- KEYS[2]: license:{license_id}:sessions
-- ARGV[1]: ttl (seconds)

local session_key = KEYS[1]
local license_key = KEYS[2]
local ttl = tonumber(ARGV[1])

-- Check session exists
if redis.call('EXISTS', session_key) == 0 then
return 0 -- Session expired
end

-- Refresh TTLs
redis.call('EXPIRE', session_key, ttl)
redis.call('EXPIRE', license_key, ttl)

-- Update last_heartbeat timestamp
redis.call('HSET', session_key, 'last_heartbeat', ARGV[2])

return 1 -- Heartbeat successful

Django Integration:

HEARTBEAT_SCRIPT = """
[Lua script content from above]
"""

HEARTBEAT_SHA = hashlib.sha1(HEARTBEAT_SCRIPT.encode()).hexdigest()

class SessionService:
@staticmethod
def heartbeat(session_id, license_id, ttl=360):
"""Refresh session TTL (heartbeat)."""
from django.utils import timezone
from django_redis import get_redis_connection

redis_conn = get_redis_connection('default')
now = timezone.now().isoformat()

try:
result = redis_conn.evalsha(
HEARTBEAT_SHA,
2, # Number of KEYS
f'session:{session_id}',
f'license:{license_id}:sessions',
ttl,
now
)
except redis.exceptions.NoScriptError:
sha = redis_conn.script_load(HEARTBEAT_SCRIPT)
assert sha == HEARTBEAT_SHA
result = redis_conn.evalsha(HEARTBEAT_SHA, 2,
f'session:{session_id}', f'license:{license_id}:sessions',
ttl, now)

return {'success': bool(result)}

Script Cache

  • Redis SCRIPT LOAD: Pre-load Lua scripts on application startup
  • SHA Hashing: Reference scripts by SHA-1 hash (saves bandwidth)
  • Persistence: Scripts survive Redis restarts (RDB snapshot includes scripts)
  • Cache Miss Handling: Auto-reload script if SHA not found (NoScriptError)

Application Startup:

# apps/licenses/apps.py

from django.apps import AppConfig
from django.core.cache import cache
from django_redis import get_redis_connection
import hashlib

class LicensesConfig(AppConfig):
name = 'apps.licenses'

def ready(self):
"""Load Lua scripts on application startup."""
from .services import (
ACQUIRE_SEAT_SCRIPT, ACQUIRE_SEAT_SHA,
RELEASE_SEAT_SCRIPT, RELEASE_SEAT_SHA,
HEARTBEAT_SCRIPT, HEARTBEAT_SHA
)

redis_conn = get_redis_connection('default')

# Load acquire_seat.lua
sha = redis_conn.script_load(ACQUIRE_SEAT_SCRIPT)
assert sha == ACQUIRE_SEAT_SHA

# Load release_seat.lua
sha = redis_conn.script_load(RELEASE_SEAT_SCRIPT)
assert sha == RELEASE_SEAT_SHA

# Load heartbeat.lua
sha = redis_conn.script_load(HEARTBEAT_SCRIPT)
assert sha == HEARTBEAT_SHA

print("✓ Redis Lua scripts loaded successfully")

5. TTL Management

Key Expiration (6-minute TTL)

  • Session TTL: 360 seconds (6 minutes)
  • Heartbeat Interval: 5 minutes (300 seconds)
  • Grace Period: 60 seconds (1 minute buffer for network latency)
  • Automatic Cleanup: Redis DEL expired keys automatically (no manual cleanup needed)

TTL Lifecycle:

Session Created → TTL=360s
↓ (5 minutes later)
Heartbeat → TTL=360s (refreshed)
↓ (5 minutes later)
Heartbeat → TTL=360s (refreshed)
↓ (If no heartbeat for 6 minutes)
Expired → Redis DEL (automatic seat release)

Django Code:

# Create session with TTL
cache.setex(
f'session:{session_id}',
timeout=360, # 6 minutes
value={'user_id': user_id, 'machine_id': machine_id}
)

# Refresh TTL on heartbeat
cache.expire(f'session:{session_id}', timeout=360)
cache.expire(f'license:{license_id}:sessions', timeout=360)

Keyspace Notifications

  • Configuration: notify-keyspace-events Ex
    • E - Keyevent events (e.g., __keyevent@0__:expired)
    • x - Expired events
  • Purpose: Cleanup workers listen for expired session keys
  • Channel: __keyevent@0__:expired (database 0)

Django Cleanup Worker (Celery Task):

# apps/licenses/tasks.py

import redis
from celery import shared_task
from django.core.cache import cache
from django_redis import get_redis_connection

@shared_task
def listen_for_expired_sessions():
"""
Background worker that listens for Redis keyspace notifications
and cleans up expired sessions.
"""
redis_conn = get_redis_connection('default')
pubsub = redis_conn.pubsub()

# Subscribe to expired key events
pubsub.subscribe('__keyevent@0__:expired')

for message in pubsub.listen():
if message['type'] == 'message':
expired_key = message['data'].decode()

# Handle session expiration
if expired_key.startswith('session:'):
session_id = expired_key.split(':', 1)[1]
handle_session_expiry(session_id)

# Handle license session set expiration
elif expired_key.endswith(':sessions'):
license_id = expired_key.split(':')[1]
handle_license_expiry(license_id)

def handle_session_expiry(session_id):
"""Handle cleanup when session expires."""
from apps.licenses.models import LicenseSession
from django.utils import timezone

try:
# Update PostgreSQL record
session = LicenseSession.objects.get(session_token=session_id)
session.status = 'expired'
session.expires_at = timezone.now()
session.save(update_fields=['status', 'expires_at'])

# Log audit event
logger.info(f"Session {session_id} expired (TTL cleanup)")
except LicenseSession.DoesNotExist:
pass # Already cleaned up

Celery Configuration:

# settings/base.py

CELERY_BEAT_SCHEDULE = {
'listen-for-expired-sessions': {
'task': 'apps.licenses.tasks.listen_for_expired_sessions',
'schedule': crontab(), # Run continuously
},
}

Eviction Policy (volatile-ttl)

  • Configuration: maxmemory-policy volatile-ttl
  • Behavior: When memory limit reached, evict keys with TTL (shortest TTL first)
  • Why: All license session keys have TTL, safe to evict oldest sessions first
  • Memory Limit: 6GB (Memorystore BASIC tier)
  • Memory Warning: 75% threshold (4.5GB) triggers alert

Eviction Priority (Low TTL → High Priority):

  1. Expired keys (TTL=0) - Evicted first
  2. Short TTL keys (e.g., TTL=60s) - Evicted next
  3. Long TTL keys (e.g., TTL=360s) - Evicted last

6. Data Structures

SET (Session Membership)

  • Use Case: License session tracking
  • Operations: O(1) for SADD, SREM, SISMEMBER; O(N) for SCARD
  • Memory: ~100 bytes per session ID
  • Max Size: Practically unlimited (6GB / 100 bytes = 60M sessions)

Example:

# Add session to license
cache.sadd(f'license:{license_id}:sessions', session_id)

# Check membership
is_member = cache.sismember(f'license:{license_id}:sessions', session_id)

# Count active sessions
active_count = cache.scard(f'license:{license_id}:sessions')

# Remove session
cache.srem(f'license:{license_id}:sessions', session_id)

HASH (Session Metadata)

  • Use Case: Session field-value storage
  • Operations: O(1) for HSET, HGET, HGETALL
  • Memory: ~200 bytes per session (6 fields × ~30 bytes each)
  • Encoding: Ziplist for small hashes (<512 bytes), hash table for large

Example:

# Set session metadata
cache.hset(f'session:{session_id}', 'user_id', user_id)
cache.hset(f'session:{session_id}', 'machine_id', machine_id)

# Get single field
user_id = cache.hget(f'session:{session_id}', 'user_id')

# Get all fields
session_data = cache.hgetall(f'session:{session_id}')
# Returns: {'user_id': '...', 'machine_id': '...', ...}

STRING (Seat Counters)

  • Use Case: Atomic seat counting per tenant
  • Operations: O(1) for INCR, DECR, GET
  • Memory: ~50 bytes per counter
  • Range: 64-bit signed integer (-2^63 to 2^63-1)

Example:

# Increment seat count
new_count = cache.incr(f'tenant:{tenant_id}:seats')

# Decrement seat count
new_count = cache.decr(f'tenant:{tenant_id}:seats')

# Get current count
current_count = cache.get(f'tenant:{tenant_id}:seats') or 0

7. Monitoring & Observability

INFO stats (Performance Metrics)

# Redis INFO command
$ redis-cli INFO stats

# Output:
total_connections_received:15432
total_commands_processed:234567
instantaneous_ops_per_sec:1234
total_net_input_bytes:45678901
total_net_output_bytes:89012345
keyspace_hits:123456
keyspace_misses:7890
evicted_keys:0
expired_keys:5432

Django Monitoring:

# apps/licenses/monitoring.py

from django_redis import get_redis_connection
from prometheus_client import Gauge

redis_ops_per_sec = Gauge('redis_ops_per_sec', 'Redis operations per second')
redis_memory_used = Gauge('redis_memory_used_bytes', 'Redis memory usage in bytes')
redis_connected_clients = Gauge('redis_connected_clients', 'Redis connected clients')

def collect_redis_metrics():
"""Collect Redis metrics for Prometheus."""
redis_conn = get_redis_connection('default')
info = redis_conn.info('stats')

redis_ops_per_sec.set(info['instantaneous_ops_per_sec'])
redis_memory_used.set(info['used_memory'])
redis_connected_clients.set(info['connected_clients'])

SLOWLOG (Query Performance)

  • Configuration: slowlog-log-slower-than 10000 (10ms threshold)
  • Size: slowlog-max-len 128 (keep last 128 slow queries)
  • Purpose: Identify performance bottlenecks

Django Slow Query Monitoring:

def check_slow_queries():
"""Check Redis SLOWLOG for performance issues."""
redis_conn = get_redis_connection('default')

# Get slow queries
slow_queries = redis_conn.slowlog_get(10) # Last 10 slow queries

for query in slow_queries:
duration_us = query['duration'] # Microseconds
command = ' '.join(query['command'])

if duration_us > 10000: # >10ms
logger.warning(
f"Slow Redis query: {command} took {duration_us/1000:.2f}ms"
)

Cloud Monitoring Integration

  • Metrics Export: Redis Exporter → Prometheus → Cloud Monitoring
  • Dashboards:
    • Operations: ops/sec, command distribution
    • Memory: used_memory, fragmentation ratio, evicted_keys
    • Connections: connected_clients, rejected_connections
    • Performance: p50/p95/p99 latency, slow queries
    • Availability: uptime, failover events

Example Dashboard Queries:

# Operations per second
rate(redis_commands_processed_total[5m])

# Memory usage percentage
(redis_memory_used_bytes / redis_memory_max_bytes) * 100

# Cache hit rate
rate(redis_keyspace_hits_total[5m]) /
(rate(redis_keyspace_hits_total[5m]) + rate(redis_keyspace_misses_total[5m]))

8. Persistence & Backup

RDB Snapshot Schedule

  • Frequency: Daily at 2 AM UTC (low-traffic window)
  • Trigger: Manual BGSAVE or automatic based on save configuration
  • Performance Impact: <5% CPU spike for 30-60 seconds during snapshot
  • Compression: gzip (reduces snapshot size by 70-80%)

Snapshot Configuration:

# Memorystore managed configuration
save 900 1 # Save after 15 minutes if ≥1 key changed
save 300 10 # Save after 5 minutes if ≥10 keys changed
save 60 10000 # Save after 1 minute if ≥10,000 keys changed

rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb

Manual Snapshot (Emergency Backup):

from django_redis import get_redis_connection

def create_backup():
"""Trigger manual Redis snapshot."""
redis_conn = get_redis_connection('default')

# Background save (non-blocking)
redis_conn.bgsave()

# Check save status
lastsave = redis_conn.lastsave()
print(f"Last snapshot: {lastsave}")

Backup Retention (7 Days)

  • Storage: Google Cloud Storage (regional bucket)
  • Location: gs://coditect-redis-backups/
  • Naming: dump-YYYY-MM-DD-HH-MM-SS.rdb
  • Lifecycle Policy: Auto-delete backups older than 7 days
  • Encryption: Google-managed encryption keys (default)

Backup Lifecycle:

# Cloud Storage lifecycle policy
{
"lifecycle": {
"rule": [
{
"action": {"type": "Delete"},
"condition": {"age": 7} # Delete after 7 days
}
]
}
}

Recovery Procedure

# 1. Stop Redis instance (maintenance window)
gcloud redis instances update licenses-redis \
--region=us-central1 \
--update-labels=maintenance=true

# 2. Download backup from Cloud Storage
gsutil cp gs://coditect-redis-backups/dump-2025-11-30-02-00-00.rdb /tmp/

# 3. Upload to Memorystore (via import)
gcloud redis instances import gs://coditect-redis-backups/dump-2025-11-30-02-00-00.rdb \
--instance=licenses-redis \
--region=us-central1

# 4. Verify data restored
redis-cli DBSIZE
# Expected: 15,000+ keys

# 5. Resume instance
gcloud redis instances update licenses-redis \
--region=us-central1 \
--remove-labels=maintenance

9. Security & Access Control

AUTH Token (Secret Manager)

  • Storage: Google Cloud Secret Manager
  • Secret Name: redis-auth-token
  • Rotation: Every 90 days (automated)
  • Access: Service account licenses-backend-sa@project.iam.gserviceaccount.com

Django Configuration:

# settings/production.py

from google.cloud import secretmanager

def get_redis_auth_token():
"""Retrieve Redis AUTH token from Secret Manager."""
client = secretmanager.SecretManagerServiceClient()
name = "projects/coditect-cloud-infra/secrets/redis-auth-token/versions/latest"
response = client.access_secret_version(request={"name": name})
return response.payload.data.decode('UTF-8')

CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': f'redis://:{get_redis_auth_token()}@10.0.0.3:6379/0',
# ... rest of config
}
}

Token Rotation (Celery Task):

# apps/core/tasks.py

from celery import shared_task
from google.cloud import secretmanager
import secrets

@shared_task
def rotate_redis_auth_token():
"""Rotate Redis AUTH token every 90 days."""
client = secretmanager.SecretManagerServiceClient()

# Generate new token
new_token = secrets.token_urlsafe(32)

# Create new secret version
parent = "projects/coditect-cloud-infra/secrets/redis-auth-token"
client.add_secret_version(
request={
"parent": parent,
"payload": {"data": new_token.encode('UTF-8')}
}
)

# Update Redis instance AUTH
from django_redis import get_redis_connection
redis_conn = get_redis_connection('default')
redis_conn.config_set('requirepass', new_token)

logger.info("Redis AUTH token rotated successfully")

TLS 1.3 (In-Transit Encryption)

  • Protocol: TLS 1.3 (latest standard)
  • Certificate: Google-managed SSL certificate
  • Cipher Suites: Strong ciphers only (AES-256-GCM, ChaCha20-Poly1305)
  • MITM Protection: Certificate pinning in client

Django Configuration (TLS):

CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': 'rediss://10.0.0.3:6379/0', # Note: rediss:// (TLS)
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'ssl_cert_reqs': 'required',
'ssl_ca_certs': '/etc/ssl/certs/ca-certificates.crt',
}
}
}
}

VPC Peering (Network Isolation)

  • Network: Private VPC (no public IP)
  • Peering: VPC peering between GKE VPC and Memorystore VPC
  • Firewall: Only GKE pods can reach Redis (IP whitelist)
  • IP Range: 10.0.0.0/24 (Memorystore authorized network)

Network Topology:

GKE Pods (10.1.0.0/16)
↓ VPC Peering
Redis Memorystore (10.0.0.3:6379)
↓ Private IP only
No Internet Access ✓

10. Failover & High Availability

Health Checks (PING every 30s)

  • Frequency: 30-second intervals
  • Command: PING
  • Expected Response: PONG
  • Timeout: 5 seconds
  • Failure Threshold: 3 consecutive failures → trigger failover

Django Health Check:

# apps/core/health.py

from django.core.cache import cache
from django.http import JsonResponse
import time

def redis_health_check(request):
"""Health check endpoint for Redis connectivity."""
try:
start = time.perf_counter()

# Ping Redis
cache.set('health_check', 'ok', timeout=10)
value = cache.get('health_check')

elapsed_ms = (time.perf_counter() - start) * 1000

if value != 'ok':
return JsonResponse({
'status': 'unhealthy',
'error': 'Redis value mismatch'
}, status=503)

return JsonResponse({
'status': 'healthy',
'latency_ms': round(elapsed_ms, 2)
})
except Exception as e:
return JsonResponse({
'status': 'unhealthy',
'error': str(e)
}, status=503)

Kubernetes Liveness Probe:

# kubernetes/base/deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
name: licenses-backend
spec:
template:
spec:
containers:
- name: django
livenessProbe:
httpGet:
path: /health/redis
port: 8000
initialDelaySeconds: 30
periodSeconds: 30
timeoutSeconds: 5
failureThreshold: 3

Auto-Reconnect (Exponential Backoff)

  • Initial Delay: 100ms
  • Max Delay: 30 seconds
  • Backoff Multiplier: 2x
  • Max Retries: Infinite (keep trying until connection restored)

django-redis Configuration:

CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'OPTIONS': {
'RETRY_ON_TIMEOUT': True,
'SOCKET_CONNECT_TIMEOUT': 5,
'SOCKET_TIMEOUT': 5,
'CONNECTION_POOL_KWARGS': {
'retry_on_timeout': True,
'socket_keepalive': True,
'socket_connect_timeout': 5,
'retry': {
'max_retries': 10,
'backoff_factor': 2, # Exponential backoff
'max_backoff': 30,
}
}
}
}
}

Custom Retry Logic:

# apps/core/redis_client.py

import time
from django_redis import get_redis_connection
import redis

def execute_with_retry(operation, max_retries=5, initial_delay=0.1):
"""Execute Redis operation with exponential backoff retry."""
delay = initial_delay

for attempt in range(max_retries):
try:
return operation()
except redis.ConnectionError as e:
if attempt == max_retries - 1:
raise # Final retry failed

logger.warning(
f"Redis connection error (attempt {attempt+1}/{max_retries}): {e}"
)
time.sleep(delay)
delay = min(delay * 2, 30) # Exponential backoff, max 30s
except redis.TimeoutError as e:
logger.error(f"Redis timeout: {e}")
raise # Don't retry timeouts (different issue)

# Usage
def acquire_seat(license_id, session_id):
def operation():
redis_conn = get_redis_connection('default')
return redis_conn.evalsha(ACQUIRE_SEAT_SHA, 1,
f'license:{license_id}:sessions', session_id, 5, 360)

return execute_with_retry(operation)

Failover Time (<60s with Replica)

  • BASIC Tier: ~5 minutes failover (no replica)
  • STANDARD HA Tier: <60 seconds failover (automatic promotion of replica to primary)
  • Data Loss: Zero (synchronous replication with RDB + AOF in STANDARD HA)

Production Upgrade to STANDARD HA:

# Upgrade to STANDARD HA (zero downtime)
gcloud redis instances update licenses-redis \
--region=us-central1 \
--tier=STANDARD_HA \
--replica-count=1 \
--read-replicas-mode=READ_REPLICAS_ENABLED

# Enable AOF for <1s data loss
gcloud redis instances update licenses-redis \
--region=us-central1 \
--persistence-mode=RDB_AOF \
--rdb-snapshot-period=ONE_HOUR \
--rdb-snapshot-start-time=02:00

Failover Monitoring (Celery Task):

# apps/core/tasks.py

from celery import shared_task
from django_redis import get_redis_connection

@shared_task
def monitor_redis_failover():
"""Monitor Redis for failover events."""
redis_conn = get_redis_connection('default')

try:
info = redis_conn.info('replication')
role = info['role']

if role == 'slave':
# Failover in progress
logger.critical("Redis failover detected: Instance is now slave")
# Alert on-call engineer
else:
logger.info(f"Redis health check: role={role}")
except Exception as e:
logger.error(f"Redis health check failed: {e}")

Performance Characteristics

Latency Benchmarks

Operationp50p95p99Max
PING<1ms<2ms<3ms<10ms
GET/SET<1ms<2ms<5ms<15ms
SADD/SREM<1ms<3ms<8ms<20ms
EVALSHA (Lua)<2ms<5ms<10ms<30ms
HGETALL (6 fields)<1ms<3ms<6ms<15ms

Testing Methodology:

# Load test with django-redis
import time
from django.core.cache import cache

def benchmark_redis_operations(iterations=1000):
"""Benchmark Redis operation latency."""
latencies = {'get': [], 'set': [], 'sadd': [], 'lua': []}

for i in range(iterations):
# Benchmark GET
start = time.perf_counter()
cache.get(f'test_key_{i}')
latencies['get'].append((time.perf_counter() - start) * 1000)

# Benchmark SET
start = time.perf_counter()
cache.set(f'test_key_{i}', 'value', timeout=300)
latencies['set'].append((time.perf_counter() - start) * 1000)

# Benchmark SADD
start = time.perf_counter()
cache.sadd(f'test_set_{i}', 'member')
latencies['sadd'].append((time.perf_counter() - start) * 1000)

# Calculate percentiles
for op, times in latencies.items():
times.sort()
p50 = times[len(times) // 2]
p95 = times[int(len(times) * 0.95)]
p99 = times[int(len(times) * 0.99)]
print(f"{op}: p50={p50:.2f}ms, p95={p95:.2f}ms, p99={p99:.2f}ms")

Throughput Benchmarks

MetricDevelopment (BASIC)Production (STANDARD HA)
Ops/sec (typical)12,00025,000
Ops/sec (burst)25,00050,000
Concurrent connections50200
Max keys60M60M
Memory6GB16GB

Load Testing:

# redis-benchmark against Memorystore
redis-benchmark -h 10.0.0.3 -p 6379 -a <auth-token> \
-c 50 \ # 50 concurrent connections
-n 100000 \ # 100K requests
-t get,set,sadd,evalsha \ # Test operations
-q # Quiet mode

# Results:
SET: 15234.56 requests per second
GET: 18567.89 requests per second
SADD: 14123.45 requests per second
EVALSHA: 11234.56 requests per second

Memory Usage

ComponentMemory per Item10K Sessions100K Sessions
Session SET100 bytes1 MB10 MB
Session HASH200 bytes2 MB20 MB
Seat Counter50 bytes500 KB5 MB
Lua Scripts5 KB5 KB5 KB
Total-3.5 MB35 MB

Capacity Planning:

  • 6GB Memorystore: Supports 170K+ concurrent sessions
  • 16GB Memorystore: Supports 450K+ concurrent sessions
  • Overhead: 20% for Redis internal structures, fragmentation

Memory Monitoring:

from django_redis import get_redis_connection

def check_redis_memory():
"""Check Redis memory usage and fragmentation."""
redis_conn = get_redis_connection('default')
info = redis_conn.info('memory')

used_memory_mb = info['used_memory'] / (1024 ** 2)
max_memory_mb = info['maxmemory'] / (1024 ** 2)
fragmentation_ratio = info['mem_fragmentation_ratio']

usage_percent = (used_memory_mb / max_memory_mb) * 100

print(f"Memory: {used_memory_mb:.2f} MB / {max_memory_mb:.2f} MB ({usage_percent:.1f}%)")
print(f"Fragmentation: {fragmentation_ratio:.2f}")

if usage_percent > 75:
logger.warning(f"Redis memory usage high: {usage_percent:.1f}%")

if fragmentation_ratio > 1.5:
logger.warning(f"Redis fragmentation high: {fragmentation_ratio:.2f}")

Integration with Django

Django Cache Configuration

# settings/base.py

CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': 'redis://10.0.0.3:6379/0',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'CONNECTION_POOL_KWARGS': {
'max_connections': 50,
'retry_on_timeout': True,
'socket_keepalive': True,
'socket_keepalive_options': {
1: 1, # TCP_KEEPIDLE
2: 1, # TCP_KEEPINTVL
3: 3, # TCP_KEEPCNT
},
},
'SOCKET_CONNECT_TIMEOUT': 5,
'SOCKET_TIMEOUT': 5,
'SERIALIZER': 'django_redis.serializers.json.JSONSerializer',
'COMPRESSOR': 'django_redis.compressors.zlib.ZlibCompressor',
},
'KEY_PREFIX': 'coditect', # Namespace for all keys
'VERSION': 1, # Cache version (increment to invalidate all)
}
}

# Use Redis for session storage (optional)
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'default'

Service Layer Integration

# apps/licenses/services.py

from django.core.cache import cache
from django_redis import get_redis_connection
from apps.tenants.context import get_current_tenant

class SessionService:
"""Service for license session management with Redis."""

@staticmethod
def acquire_seat(user, machine_id, license_type='pro'):
"""Acquire a license session with atomic seat counting."""
tenant = get_current_tenant()
max_seats = tenant.max_users # From tenant plan

# Generate session ID
import uuid
session_id = str(uuid.uuid4())

# Atomic seat acquisition via Lua script
result = SessionService._execute_acquire_seat_script(
license_id=tenant.id,
session_id=session_id,
max_seats=max_seats,
ttl=360
)

if not result['acquired']:
raise SeatLimitExceeded(
f"No available seats ({result['active_seats']}/{result['max_seats']})"
)

# Store session metadata in Redis
cache.hset(f'session:{session_id}', mapping={
'user_id': str(user.id),
'machine_id': machine_id,
'license_type': license_type,
'created_at': timezone.now().isoformat(),
'last_heartbeat': timezone.now().isoformat(),
})
cache.expire(f'session:{session_id}', timeout=360)

# Create PostgreSQL record
from apps.licenses.models import LicenseSession
session = LicenseSession.objects.create(
tenant=tenant,
user=user,
session_token=session_id,
machine_id=machine_id,
license_type=license_type,
status='active',
expires_at=timezone.now() + timezone.timedelta(seconds=360)
)

return session

@staticmethod
def heartbeat(session_id, license_id):
"""Refresh session TTL (heartbeat)."""
result = SessionService._execute_heartbeat_script(
session_id=session_id,
license_id=license_id,
ttl=360
)

if not result['success']:
raise SessionExpired(f"Session {session_id} expired")

# Update PostgreSQL last_validated_at
from apps.licenses.models import LicenseSession
LicenseSession.objects.filter(session_token=session_id).update(
last_validated_at=timezone.now()
)

@staticmethod
def release_seat(session_id, license_id):
"""Release a license seat."""
result = SessionService._execute_release_seat_script(
license_id=license_id,
session_id=session_id
)

# Delete session metadata
cache.delete(f'session:{session_id}')

# Update PostgreSQL status
from apps.licenses.models import LicenseSession
LicenseSession.objects.filter(session_token=session_id).update(
status='released',
revoked_at=timezone.now()
)

@staticmethod
def _execute_acquire_seat_script(license_id, session_id, max_seats, ttl):
"""Execute acquire_seat.lua via EVALSHA."""
from .lua_scripts import ACQUIRE_SEAT_SHA, ACQUIRE_SEAT_SCRIPT
redis_conn = get_redis_connection('default')

try:
result = redis_conn.evalsha(
ACQUIRE_SEAT_SHA, 1,
f'license:{license_id}:sessions',
session_id, max_seats, ttl
)
except redis.exceptions.NoScriptError:
sha = redis_conn.script_load(ACQUIRE_SEAT_SCRIPT)
assert sha == ACQUIRE_SEAT_SHA
result = redis_conn.evalsha(ACQUIRE_SEAT_SHA, 1,
f'license:{license_id}:sessions', session_id, max_seats, ttl)

success, active_seats, max_seats = result
return {
'acquired': bool(success),
'active_seats': active_seats,
'max_seats': max_seats,
}

# Similar methods for _execute_heartbeat_script, _execute_release_seat_script

ViewSet Integration

# apps/licenses/views.py

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from apps.tenants.context import get_current_tenant
from .services import SessionService

class LicenseSessionViewSet(viewsets.ModelViewSet):
"""License session management endpoints."""

@action(detail=False, methods=['post'])
def acquire(self, request):
"""Acquire a license session."""
machine_id = request.data.get('machine_id')
license_type = request.data.get('license_type', 'pro')

try:
session = SessionService.acquire_seat(
user=request.user,
machine_id=machine_id,
license_type=license_type
)

return Response({
'session_id': session.session_token,
'expires_at': session.expires_at.isoformat(),
'status': 'active',
}, status=status.HTTP_201_CREATED)

except SeatLimitExceeded as e:
return Response({
'error': 'no_seats_available',
'message': str(e)
}, status=status.HTTP_409_CONFLICT)

@action(detail=False, methods=['post'])
def heartbeat(self, request):
"""Refresh session TTL (heartbeat)."""
session_id = request.data.get('session_id')
tenant = get_current_tenant()

try:
SessionService.heartbeat(session_id, tenant.id)

return Response({
'status': 'renewed',
'ttl': 360
})

except SessionExpired as e:
return Response({
'error': 'session_expired',
'message': str(e)
}, status=status.HTTP_410_GONE)

@action(detail=False, methods=['post'])
def release(self, request):
"""Release a license session."""
session_id = request.data.get('session_id')
tenant = get_current_tenant()

SessionService.release_seat(session_id, tenant.id)

return Response({
'status': 'released'
})

Summary

C3-03: Redis Components provides the complete Redis Memorystore architecture for CODITECT license session tracking. Key highlights:

  1. Atomic Seat Counting: Lua scripts prevent race conditions under high concurrency
  2. TTL-Based Session Management: Automatic cleanup of zombie sessions (6-minute TTL)
  3. Connection Pooling: 50 connections with keep-alive for optimal performance
  4. Django Integration: Seamless integration with Django Cache Framework (django-redis)
  5. Production-Ready: RDB persistence, health checks, auto-reconnect, monitoring

Next Steps:

  • Implement Lua scripts in apps/licenses/lua_scripts.py
  • Create SessionService in apps/licenses/services.py
  • Add Redis endpoints to LicenseSessionViewSet
  • Configure Celery worker for keyspace notification listening
  • Set up Cloud Monitoring dashboards for Redis metrics

Related Documentation:


Status: ✅ COMPLETE Created: 2025-11-30 Phase: 7A - Critical Diagrams (6 of 6 complete) Next: Phase 7B - Core Conversions (FastAPI→Django)