Celery Background Task Integration
Complete Celery integration for the CODITECT License Platform, enabling background task processing and scheduled jobs in production.
Overview
Celery provides asynchronous task processing for:
- Zombie session cleanup - Hourly cleanup of expired sessions
- License expiration notifications - Scheduled alerts for expiring licenses
- Usage metrics aggregation - Periodic analytics processing
Architecture
┌─────────────────┐ ┌──────────────┐ ┌─────────────────┐
│ Django API │────▶│ Redis │◀────│ Celery Worker │
│ (Web Server) │ │ (Broker) │ │ (Tasks) │
└─────────────────┘ └──────────────┘ └─────────────────┘
▲
│
┌─────┴──────┐
│ Celery Beat│
│ (Scheduler)│
└────────────┘
Components
- Django Application - Enqueues tasks via task.delay() or task.apply_async()
- Redis - Message broker and result backend
- Celery Workers - Process background tasks asynchronously
- Celery Beat - Schedules periodic tasks (cron-like)
Installation
Dependencies
# Install Celery with Redis support
pip install celery[redis]==5.3.4 django-redis==5.4.0
Added to requirements.txt:
- celery[redis]==5.3.4 - Task queue with Redis transport
- django-redis==5.4.0 - Django cache backend for Redis
Configuration Files
Created:
- license_platform/celery.py - Celery application configuration
- license_platform/__init__.py - Auto-load Celery on Django startup
- licenses/tasks.py - Background task definitions
- license_platform/settings/production.py - Celery settings (CELERY_BROKER_URL, etc.)
Task Definitions
1. Zombie Session Cleanup
Task: licenses.tasks.cleanup_zombie_sessions
Schedule: Hourly (via Celery Beat)
Purpose: Clean up sessions that have expired in Redis but are still marked active in the database
Logic:
from datetime import timedelta
from django.utils import timezone
from licenses.models import LicenseSession  # model import path assumed

threshold = timezone.now() - timedelta(minutes=6)
zombie_sessions = LicenseSession.objects.filter(
    last_heartbeat_at__lt=threshold,
    ended_at__isnull=True
)
# Mark each zombie session as ended
zombie_sessions.update(ended_at=timezone.now())
Returns:
{
'sessions_found': 42,
'sessions_cleaned': 42,
'errors': 0,
'duration_seconds': 1.23
}
Manual Execution:
from licenses.tasks import cleanup_zombie_sessions
# Synchronous execution (for testing)
result = cleanup_zombie_sessions()
# Asynchronous execution (production)
task = cleanup_zombie_sessions.delay()
print(task.id) # Task ID for tracking
2. License Expiration Checks (Future)
Task: licenses.tasks.check_license_expirations
Schedule: Daily at midnight UTC
Purpose: Send notifications for licenses expiring soon (30, 7, 1 days before expiration)
Status: Task defined, email integration pending
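The 30/7/1-day window logic the task will need can be sketched as a pure function (the field names and notification hook are assumptions, not the platform's actual API):

```python
from datetime import date

# Days before expiration at which an alert fires, per the schedule above
NOTIFY_WINDOWS = (30, 7, 1)

def due_notifications(expires_on: date, today: date) -> list[int]:
    """Return the notification windows that fire today for one license."""
    days_left = (expires_on - today).days
    return [w for w in NOTIFY_WINDOWS if w == days_left]

# Example: a license expiring in exactly 7 days triggers the 7-day alert
print(due_notifications(date(2025, 12, 7), date(2025, 11, 30)))  # [7]
```

Running this once per day (rather than comparing ranges) keeps the task idempotent: each window fires on exactly one day.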
3. Usage Metrics Aggregation (Future)
Task: licenses.tasks.aggregate_usage_metrics
Schedule: TBD
Purpose: Process session data for analytics and reporting
Status: Placeholder task, implementation pending
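As a rough illustration of what this task might compute, here is a pure-Python aggregation over session records (the dict shape and metric names are assumptions; the real task would read from the database):

```python
from statistics import mean

sessions = [
    {"license_id": "lic-1", "duration_seconds": 120},
    {"license_id": "lic-1", "duration_seconds": 240},
    {"license_id": "lic-2", "duration_seconds": 60},
]

def aggregate(sessions):
    """Group session durations by license and compute simple metrics."""
    by_license = {}
    for s in sessions:
        by_license.setdefault(s["license_id"], []).append(s["duration_seconds"])
    return {
        lic: {"sessions": len(durations), "avg_duration": mean(durations)}
        for lic, durations in by_license.items()
    }

print(aggregate(sessions))
```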
Celery Beat Schedule
Periodic tasks configured in license_platform/celery.py:
app.conf.beat_schedule = {
'cleanup-zombie-sessions-hourly': {
'task': 'licenses.tasks.cleanup_zombie_sessions',
'schedule': crontab(minute=0), # Every hour on the hour
'options': {
'expires': 3300, # Expires after 55 minutes
},
},
}
Local Development
Start Redis (Required)
# Using Docker
docker run -d -p 6379:6379 redis:7-alpine
# Or install locally
brew install redis
redis-server
Start Celery Worker
# From project root
celery -A license_platform worker --loglevel=info
Output:
-------------- celery@hostname v5.3.4 (emerald-rush)
--- ***** -----
-- ******* ---- Linux-5.x-x86_64-with-glibc2.35 2025-11-30 20:00:00
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: license_platform:0x7f...
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. licenses.tasks.cleanup_zombie_sessions
. licenses.tasks.check_license_expirations
. licenses.tasks.aggregate_usage_metrics
. license_platform.celery.debug_task
Start Celery Beat
# In separate terminal
celery -A license_platform beat --loglevel=info
Output:
celery beat v5.3.4 (emerald-rush) is starting.
__ - ... __ - _
LocalTime -> 2025-11-30 20:00:00
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2025-11-30 20:00:00,000: INFO/MainProcess] beat: Starting...
[2025-11-30 20:00:00,000: INFO/MainProcess] Scheduler: Sending due task cleanup-zombie-sessions-hourly
Test Task Execution
# Django shell
python manage.py shell
# Import task
from licenses.tasks import cleanup_zombie_sessions
# Execute asynchronously
task = cleanup_zombie_sessions.delay()
# Check task status
print(task.id) # e.g., 'd7a5b2c1-3e4f-4a6b-8c9d-0e1f2a3b4c5d'
print(task.status) # 'PENDING', 'STARTED', 'SUCCESS', 'FAILURE'
# Get result (blocks until complete)
result = task.get(timeout=10)
print(result)
# {'sessions_found': 5, 'sessions_cleaned': 5, 'errors': 0, 'duration_seconds': 0.45}
Kubernetes Deployment
Deployment Manifests
Created:
- k8s/celery-worker-deployment.yaml - Worker pods with HPA
- k8s/celery-beat-deployment.yaml - Beat scheduler (single replica)
Deploy to GKE
# Apply worker deployment
kubectl apply -f k8s/celery-worker-deployment.yaml
# Apply beat deployment
kubectl apply -f k8s/celery-beat-deployment.yaml
# Verify deployments
kubectl get pods -n coditect-license-platform | grep celery
# Expected output:
# celery-worker-abc123-xyz 1/1 Running 0 10s
# celery-worker-def456-uvw 1/1 Running 0 10s
# celery-beat-ghi789-rst 1/1 Running 0 10s
Worker Configuration
Replicas: 2 (min) to 10 (max) via HPA
Resources:
- Requests: 200m CPU, 256Mi memory
- Limits: 500m CPU, 512Mi memory
Autoscaling Triggers:
- CPU utilization > 70%
- Memory utilization > 80%
Command:
celery -A license_platform worker \
--loglevel=info \
--concurrency=4 \
--max-tasks-per-child=1000 \
--time-limit=300 \
--soft-time-limit=240
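The replica bounds and autoscaling triggers above imply an HPA roughly like the following (resource names and namespace are taken from this document; the exact manifest lives in k8s/celery-worker-deployment.yaml and may differ):

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker
  namespace: coditect-license-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
```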
Beat Configuration
Replicas: 1 (fixed, no scaling)
Strategy: Recreate (ensures only one beat instance)
Resources:
- Requests: 100m CPU, 128Mi memory
- Limits: 200m CPU, 256Mi memory
Command:
celery -A license_platform beat \
--loglevel=info \
--scheduler=django_celery_beat.schedulers:DatabaseScheduler
Monitoring
Health Checks
Worker Liveness Probe:
celery -A license_platform inspect ping -d celery@$HOSTNAME
Worker Readiness Probe:
celery -A license_platform inspect ping -d celery@$HOSTNAME
Beat Liveness Probe:
pgrep -f "celery.*beat"
Logs
View worker logs:
kubectl logs -f deployment/celery-worker -n coditect-license-platform
View beat logs:
kubectl logs -f deployment/celery-beat -n coditect-license-platform
Metrics (Future)
Celery metrics can be exported to Prometheus via:
- celery-exporter sidecar container
- Custom metrics in tasks
- Integration with Django logging
Troubleshooting
Workers Not Processing Tasks
Check Redis connection:
# In Django shell
from django.core.cache import cache
cache.set('test', 'value')
print(cache.get('test')) # Should print 'value'
Check worker status:
celery -A license_platform inspect active
celery -A license_platform inspect stats
Tasks Failing
View task traceback:
from celery.result import AsyncResult
task_id = 'd7a5b2c1-3e4f-4a6b-8c9d-0e1f2a3b4c5d'
result = AsyncResult(task_id)
if result.failed():
print(result.traceback)
Retry failed task:
from licenses.tasks import cleanup_zombie_sessions
# Note: retry/retry_policy here govern re-publishing the message to the
# broker on connection errors, not task-level retries
cleanup_zombie_sessions.apply_async(retry=True, retry_policy={
'max_retries': 3,
'interval_start': 60,
'interval_step': 60,
})
Beat Not Scheduling Tasks
Check beat scheduler database:
# In Django shell
from django_celery_beat.models import PeriodicTask
# List scheduled tasks
for task in PeriodicTask.objects.all():
print(f'{task.name}: {task.enabled}')
Verify beat is running:
ps aux | grep "celery.*beat"
Configuration Reference
Environment Variables
| Variable | Description | Default | Required |
|---|---|---|---|
| REDIS_HOST | Redis hostname | localhost | Yes |
| REDIS_PORT | Redis port | 6379 | No |
| REDIS_DB | Redis database number | 0 | No |
| REDIS_PASSWORD | Redis password | None | No |
| CELERY_BROKER_URL | Broker URL (auto-generated) | - | Auto |
| CELERY_RESULT_BACKEND | Result backend URL | - | Auto |
Celery Settings
Configured in license_platform/celery.py:
app.conf.update(
timezone='UTC',
enable_utc=True,
task_serializer='json',
accept_content=['json'],
result_serializer='json',
task_track_started=True,
task_time_limit=300, # 5 minutes
task_soft_time_limit=240, # 4 minutes
result_expires=86400, # 24 hours
worker_prefetch_multiplier=4,
worker_max_tasks_per_child=1000,
)
Best Practices
Task Design
- Idempotent Tasks - Tasks should be safe to retry without side effects
- Timeouts - Set reasonable time limits to prevent hung tasks
- Error Handling - Use try/except and log errors explicitly
- Batch Processing - Process large datasets in batches (e.g., 100 records)
- Result Expiration - Clean up task results after 24 hours
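The batching guideline can be sketched with a small chunking helper (pure Python; in the real task the records would come from a queryset iterator, and each batch would be one transaction or bulk_update):

```python
from itertools import islice

def batched(iterable, size=100):
    """Yield successive lists of at most `size` items."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch

processed = 0
for batch in batched(range(250), size=100):
    # a failure mid-run loses at most one batch's worth of work
    processed += len(batch)

print(processed)  # 250
```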
Production Guidelines
- Resource Limits - Always set CPU/memory limits in Kubernetes
- Autoscaling - Use HPA for workers, fixed replicas for beat
- Monitoring - Implement health checks and log aggregation
- Graceful Shutdown - Set terminationGracePeriodSeconds for in-flight tasks
- Task Priority - Use priority queues for critical tasks (future enhancement)
Security
- No Secrets in Tasks - Pass IDs, fetch secrets from environment/vault
- Input Validation - Validate all task arguments
- Audit Logging - Log all task executions for compliance
- Network Policies - Restrict worker network access in Kubernetes
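The "no secrets in tasks" rule means the task body receives only an ID and resolves credentials at execution time, so nothing sensitive is serialized into the broker. A sketch (notify_license_owner and SMTP_PASSWORD are illustrative names):

```python
import os

def notify_license_owner(license_id: str) -> dict:
    # Secret fetched inside the worker from the environment/vault,
    # never passed as a task argument
    smtp_password = os.environ.get("SMTP_PASSWORD", "")
    return {"license_id": license_id, "has_credentials": bool(smtp_password)}

print(notify_license_owner("lic-123"))
```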
Future Enhancements
Planned Features
- Task Priority Queues - Separate queues for critical tasks
- Distributed Locking - Prevent duplicate task execution
- Task Monitoring Dashboard - Web UI for task management
- Custom Metrics - Export task metrics to Prometheus
- Email Integration - Complete license expiration notifications
- Webhook Support - Trigger tasks via external webhooks
- Task Chains - Complex workflows with task dependencies
Scalability Improvements
- Worker Pools - Separate worker pools for different task types
- Result Backend Optimization - Use separate Redis instance for results
- Task Routing - Route tasks to specific workers based on capabilities
- Message Compression - Compress large task payloads
- Connection Pooling - Optimize Redis connection usage
References
- Celery Documentation: https://docs.celeryproject.org/en/stable/
- Django-Celery Integration: https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html
- Celery Beat: https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
- Redis as Broker: https://docs.celeryproject.org/en/stable/getting-started/backends-and-brokers/redis.html
Status: Production-Ready
Last Updated: November 30, 2025
Deployment: Kubernetes (GKE)
Maintainer: CODITECT Platform Team