Celery Background Task Integration

Complete Celery integration for the CODITECT License Platform, enabling background task processing and scheduled jobs for production operations.

Overview

Celery provides asynchronous task processing for:

  • Zombie session cleanup - Hourly cleanup of expired sessions
  • License expiration notifications - Scheduled alerts for expiring licenses
  • Usage metrics aggregation - Periodic analytics processing

Architecture

┌─────────────────┐     ┌──────────────┐     ┌─────────────────┐
│   Django API    │────▶│    Redis     │◀────│  Celery Worker  │
│  (Web Server)   │     │   (Broker)   │     │     (Tasks)     │
└─────────────────┘     └──────┬───────┘     └─────────────────┘
                               │
                         ┌─────┴──────┐
                         │ Celery Beat│
                         │ (Scheduler)│
                         └────────────┘

Components

  1. Django Application - Enqueues tasks via task.delay() or task.apply_async()
  2. Redis - Message broker and result backend
  3. Celery Workers - Process background tasks asynchronously
  4. Celery Beat - Schedules periodic tasks (cron-like)
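The enqueue/consume flow above can be sketched with an in-process stand-in (a stdlib queue.Queue plays the role of Redis; delay and worker_step are hypothetical stand-ins for Celery's task.delay() and a worker's dispatch loop, not real Celery APIs):

```python
import queue

broker = queue.Queue()  # stands in for Redis (the message broker)
results = {}            # stands in for the Redis result backend

def delay(task_name, *args):
    """Enqueue a task message, as task.delay() does via the broker."""
    broker.put((task_name, args))
    return task_name  # Celery would instead return an AsyncResult with a task id

def worker_step(registry):
    """One worker iteration: pull a message, run the task, store the result."""
    name, args = broker.get()
    results[name] = registry[name](*args)
    return results[name]

# A registered "task" and one enqueue/consume cycle
registry = {"cleanup_zombie_sessions": lambda: {"sessions_cleaned": 0}}
delay("cleanup_zombie_sessions")
outcome = worker_step(registry)
```

The real system differs mainly in that the broker is a separate Redis process, so the web server and workers can run on different machines.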

Installation

Dependencies

# Install Celery with Redis support
pip install celery[redis]==5.3.4 django-redis==5.4.0

Added to requirements.txt:

  • celery[redis]==5.3.4 - Task queue with Redis transport
  • django-redis==5.4.0 - Django cache backend for Redis

Configuration Files

Created:

  • license_platform/celery.py - Celery application configuration
  • license_platform/__init__.py - Auto-load Celery on Django startup
  • licenses/tasks.py - Background task definitions
  • license_platform/settings/production.py - Celery settings (CELERY_BROKER_URL, etc.)
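license_platform/celery.py follows the standard Celery-with-Django bootstrap pattern; a minimal sketch (the settings module path is assumed from the file list above, the rest is the conventional layout from the Celery documentation, not this project's verbatim file):

```python
# license_platform/celery.py (sketch)
import os
from celery import Celery

# Point Celery at the Django settings before creating the app
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'license_platform.settings.production')

app = Celery('license_platform')
# Read every CELERY_* key from Django settings
app.config_from_object('django.conf:settings', namespace='CELERY')
# Discover tasks.py modules in installed apps (e.g. licenses/tasks.py)
app.autodiscover_tasks()
```

license_platform/__init__.py then re-exports the app (conventionally `from .celery import app as celery_app`) so it loads on Django startup.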

Task Definitions

1. Zombie Session Cleanup

Task: licenses.tasks.cleanup_zombie_sessions

Schedule: Hourly (via Celery Beat)

Purpose: Clean up sessions that have expired in Redis but remain marked active in the database

Logic:

from datetime import timedelta

from django.utils import timezone

from licenses.models import LicenseSession

threshold = timezone.now() - timedelta(minutes=6)
zombie_sessions = LicenseSession.objects.filter(
    last_heartbeat_at__lt=threshold,
    ended_at__isnull=True,
)
# Set ended_at for each zombie session

Returns:

{
    'sessions_found': 42,
    'sessions_cleaned': 42,
    'errors': 0,
    'duration_seconds': 1.23
}
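The detection rule itself can be exercised without Django; a stdlib sketch of the same filter (find_zombies and the dict fields are illustrative, mirroring the 6-minute grace period in the ORM query above):

```python
from datetime import datetime, timedelta, timezone

HEARTBEAT_GRACE = timedelta(minutes=6)

def find_zombies(sessions, now):
    """Sessions with no end time whose last heartbeat predates the threshold."""
    threshold = now - HEARTBEAT_GRACE
    return [s for s in sessions
            if s["ended_at"] is None and s["last_heartbeat_at"] < threshold]

now = datetime(2025, 11, 30, 20, 0, tzinfo=timezone.utc)
sessions = [
    {"id": 1, "last_heartbeat_at": now - timedelta(minutes=10), "ended_at": None},  # zombie
    {"id": 2, "last_heartbeat_at": now - timedelta(minutes=2), "ended_at": None},   # healthy
    {"id": 3, "last_heartbeat_at": now - timedelta(minutes=30), "ended_at": now},   # already ended
]
zombies = find_zombies(sessions, now)
# Only session 1 qualifies: stale heartbeat and still marked active
```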

Manual Execution:

from licenses.tasks import cleanup_zombie_sessions

# Synchronous execution (for testing)
result = cleanup_zombie_sessions()

# Asynchronous execution (production)
task = cleanup_zombie_sessions.delay()
print(task.id) # Task ID for tracking

2. License Expiration Checks (Future)

Task: licenses.tasks.check_license_expirations

Schedule: Daily at midnight UTC

Purpose: Send notifications for licenses expiring soon (30, 7, 1 days before expiration)

Status: Task defined, email integration pending
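The 30/7/1-day tiers reduce to plain date arithmetic; a hedged sketch (notification_due is a hypothetical helper for illustration, not the task's actual API):

```python
from datetime import date, timedelta

NOTIFY_DAYS = (30, 7, 1)  # tiers from the task description above

def notification_due(expires_on, today):
    """Return the tier (days remaining) if a notification is due today, else None."""
    remaining = (expires_on - today).days
    return remaining if remaining in NOTIFY_DAYS else None

today = date(2025, 11, 30)
due = notification_due(today + timedelta(days=7), today)       # 7-day tier fires
not_due = notification_due(today + timedelta(days=14), today)  # nothing fires
```

Running this once per day (the daily-at-midnight schedule) guarantees each license hits each tier exactly once.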

3. Usage Metrics Aggregation (Future)

Task: licenses.tasks.aggregate_usage_metrics

Schedule: TBD

Purpose: Process session data for analytics and reporting

Status: Placeholder task, implementation pending

Celery Beat Schedule

Periodic tasks configured in license_platform/celery.py:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-zombie-sessions-hourly': {
        'task': 'licenses.tasks.cleanup_zombie_sessions',
        'schedule': crontab(minute=0),  # Every hour on the hour
        'options': {
            'expires': 3300,  # Expires after 55 minutes
        },
    },
}
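With crontab(minute=0) the task fires at the top of every hour, and expires=3300 means a firing no worker has picked up within 55 minutes is discarded, so a stale run never overlaps the next one. A stdlib sketch of those two rules (next_hourly_run and is_expired are illustrative helpers, not Celery APIs):

```python
from datetime import datetime, timedelta

EXPIRES = timedelta(seconds=3300)  # 55 minutes, as in the schedule above

def next_hourly_run(now):
    """Next firing time for crontab(minute=0): the upcoming top of the hour."""
    return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)

def is_expired(fired_at, picked_up_at):
    """A message older than EXPIRES when a worker sees it is dropped."""
    return picked_up_at - fired_at > EXPIRES

run_at = next_hourly_run(datetime(2025, 11, 30, 19, 42))  # 20:00 sharp
```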

Local Development

Start Redis (Required)

# Using Docker
docker run -d -p 6379:6379 redis:7-alpine

# Or install locally
brew install redis
redis-server

Start Celery Worker

# From project root
celery -A license_platform worker --loglevel=info

Output:

 -------------- celery@hostname v5.3.4 (emerald-rush)
--- ***** -----
-- ******* ---- Linux-5.x-x86_64-with-glibc2.35 2025-11-30 20:00:00
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: license_platform:0x7f...
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

[tasks]
. licenses.tasks.cleanup_zombie_sessions
. licenses.tasks.check_license_expirations
. licenses.tasks.aggregate_usage_metrics
. license_platform.celery.debug_task

Start Celery Beat

# In separate terminal
celery -A license_platform beat --loglevel=info

Output:

celery beat v5.3.4 (emerald-rush) is starting.
__ - ... __ - _
LocalTime -> 2025-11-30 20:00:00
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)

[2025-11-30 20:00:00,000: INFO/MainProcess] beat: Starting...
[2025-11-30 20:00:00,000: INFO/MainProcess] Scheduler: Sending due task cleanup-zombie-sessions-hourly

Test Task Execution

# Django shell
python manage.py shell

# Import task
from licenses.tasks import cleanup_zombie_sessions

# Execute asynchronously
task = cleanup_zombie_sessions.delay()

# Check task status
print(task.id) # e.g., 'd7a5b2c1-3e4f-4a5b-8c6d-9e0f1a2b3c4d'
print(task.status) # 'PENDING', 'STARTED', 'SUCCESS', 'FAILURE'

# Get result (blocks until complete)
result = task.get(timeout=10)
print(result)
# {'sessions_found': 5, 'sessions_cleaned': 5, 'errors': 0, 'duration_seconds': 0.45}

Kubernetes Deployment

Deployment Manifests

Created:

  • k8s/celery-worker-deployment.yaml - Worker pods with HPA
  • k8s/celery-beat-deployment.yaml - Beat scheduler (single replica)

Deploy to GKE

# Apply worker deployment
kubectl apply -f k8s/celery-worker-deployment.yaml

# Apply beat deployment
kubectl apply -f k8s/celery-beat-deployment.yaml

# Verify deployments
kubectl get pods -n coditect-license-platform | grep celery

# Expected output:
# celery-worker-abc123-xyz 1/1 Running 0 10s
# celery-worker-def456-uvw 1/1 Running 0 10s
# celery-beat-ghi789-rst 1/1 Running 0 10s

Worker Configuration

Replicas: 2 (min) to 10 (max) via HPA

Resources:

  • Requests: 200m CPU, 256Mi memory
  • Limits: 500m CPU, 512Mi memory

Autoscaling Triggers:

  • CPU utilization > 70%
  • Memory utilization > 80%

Command:

celery -A license_platform worker \
  --loglevel=info \
  --concurrency=4 \
  --max-tasks-per-child=1000 \
  --time-limit=300 \
  --soft-time-limit=240

Beat Configuration

Replicas: 1 (fixed, no scaling)

Strategy: Recreate (ensures only one beat instance)

Resources:

  • Requests: 100m CPU, 128Mi memory
  • Limits: 200m CPU, 256Mi memory

Command:

celery -A license_platform beat \
  --loglevel=info \
  --scheduler=django_celery_beat.schedulers:DatabaseScheduler

Monitoring

Health Checks

Worker Liveness Probe:

celery -A license_platform inspect ping -d celery@$HOSTNAME

Worker Readiness Probe:

celery -A license_platform inspect ping -d celery@$HOSTNAME

Beat Liveness Probe:

pgrep -f "celery.*beat"

Logs

View worker logs:

kubectl logs -f deployment/celery-worker -n coditect-license-platform

View beat logs:

kubectl logs -f deployment/celery-beat -n coditect-license-platform

Metrics (Future)

Celery metrics can be exported to Prometheus via:

  • celery-exporter sidecar container
  • Custom metrics in tasks
  • Integration with Django logging

Troubleshooting

Workers Not Processing Tasks

Check Redis connection:

# In Django shell
from django.core.cache import cache
cache.set('test', 'value')
print(cache.get('test')) # Should print 'value'

Check worker status:

celery -A license_platform inspect active
celery -A license_platform inspect stats

Tasks Failing

View task traceback:

from celery.result import AsyncResult

task_id = 'd7a5b2c1-3e4f-4a5b-8c6d-9e0f1a2b3c4d'  # example task ID
result = AsyncResult(task_id)

if result.failed():
    print(result.traceback)

Retry failed task:

from licenses.tasks import cleanup_zombie_sessions

# Re-enqueue the task; retry/retry_policy govern broker publish retries
cleanup_zombie_sessions.apply_async(retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 60,
    'interval_step': 60,
})
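The retry_policy above produces a linear backoff; a sketch of the delays it implies (retry_intervals is illustrative, not a Celery API):

```python
def retry_intervals(max_retries, interval_start, interval_step):
    """Retry delays implied by a linear retry_policy."""
    return [interval_start + n * interval_step for n in range(max_retries)]

delays = retry_intervals(max_retries=3, interval_start=60, interval_step=60)
# Three retries, waiting 60s, 120s, then 180s
```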

Beat Not Scheduling Tasks

Check beat scheduler database:

# In Django shell
from django_celery_beat.models import PeriodicTask

# List scheduled tasks
for task in PeriodicTask.objects.all():
    print(f'{task.name}: {task.enabled}')

Verify beat is running:

ps aux | grep "celery.*beat"

Configuration Reference

Environment Variables

Variable               Description                    Default     Required
---------------------  -----------------------------  ----------  --------
REDIS_HOST             Redis hostname                 localhost   Yes
REDIS_PORT             Redis port                     6379        No
REDIS_DB               Redis database number          0           No
REDIS_PASSWORD         Redis password                 None        No
CELERY_BROKER_URL      Broker URL (auto-generated)    -           Auto
CELERY_RESULT_BACKEND  Result backend URL             -           Auto
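The auto-generated broker URL combines the Redis variables above; a hedged sketch of the assembly (broker_url is illustrative — the actual logic lives in the settings module):

```python
def broker_url(host="localhost", port=6379, db=0, password=None):
    """Build a redis:// URL from the connection settings in the table above."""
    auth = f":{password}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"

default_url = broker_url()                   # redis://localhost:6379/0
secured_url = broker_url(password="s3cret")  # embeds the password before the host
```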

Celery Settings

Configured in license_platform/celery.py:

app.conf.update(
    timezone='UTC',
    enable_utc=True,
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    task_track_started=True,
    task_time_limit=300,       # 5 minutes
    task_soft_time_limit=240,  # 4 minutes
    result_expires=86400,      # 24 hours
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
)
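Two of these knobs are worth unpacking: prefetching scales with pool size (each worker reserves concurrency x worker_prefetch_multiplier messages at once), and the gap between the soft and hard time limits is the window a task has to clean up after SoftTimeLimitExceeded is raised. A quick check of the numbers used above:

```python
def prefetch_limit(concurrency, prefetch_multiplier):
    """Messages one worker reserves from the broker at a time."""
    return concurrency * prefetch_multiplier

reserved = prefetch_limit(concurrency=4, prefetch_multiplier=4)
# Each worker holds up to 16 unacknowledged messages

grace_window = 300 - 240  # seconds between the soft limit and the hard kill
```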

Best Practices

Task Design

  1. Idempotent Tasks - Tasks should be safe to retry without side effects
  2. Timeouts - Set reasonable time limits to prevent hung tasks
  3. Error Handling - Use try/except and log errors explicitly
  4. Batch Processing - Process large datasets in batches (e.g., 100 records)
  5. Result Expiration - Clean up task results after 24 hours
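Batch processing (point 4 above) reduces to a simple slicing generator; a stdlib sketch:

```python
def batched(items, size=100):
    """Yield fixed-size slices so one task never holds the full dataset."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

sizes = [len(b) for b in batched(list(range(250)), size=100)]
# 250 records are processed as batches of 100, 100, and 50
```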

Production Guidelines

  1. Resource Limits - Always set CPU/memory limits in Kubernetes
  2. Autoscaling - Use HPA for workers, fixed replicas for beat
  3. Monitoring - Implement health checks and log aggregation
  4. Graceful Shutdown - Set terminationGracePeriodSeconds for in-flight tasks
  5. Task Priority - Use priority queues for critical tasks (future enhancement)

Security

  1. No Secrets in Tasks - Pass IDs, fetch secrets from environment/vault
  2. Input Validation - Validate all task arguments
  3. Audit Logging - Log all task executions for compliance
  4. Network Policies - Restrict worker network access in Kubernetes

Future Enhancements

Planned Features

  1. Task Priority Queues - Separate queues for critical tasks
  2. Distributed Locking - Prevent duplicate task execution
  3. Task Monitoring Dashboard - Web UI for task management
  4. Custom Metrics - Export task metrics to Prometheus
  5. Email Integration - Complete license expiration notifications
  6. Webhook Support - Trigger tasks via external webhooks
  7. Task Chains - Complex workflows with task dependencies

Scalability Improvements

  1. Worker Pools - Separate worker pools for different task types
  2. Result Backend Optimization - Use separate Redis instance for results
  3. Task Routing - Route tasks to specific workers based on capabilities
  4. Message Compression - Compress large task payloads
  5. Connection Pooling - Optimize Redis connection usage

Status: Production-Ready
Last Updated: November 30, 2025
Deployment: Kubernetes (GKE)
Maintainer: CODITECT Platform Team