Skip to main content

ADR-007: Django Multi-Tenant Architecture

Status: Accepted Date: 2025-11-30 Deciders: Hal Casteel (CTO), Architecture Team Related ADRs:


Context

The CODITECT license server must support multi-tenant architecture with complete data isolation between organizations while maintaining high performance and cost efficiency.

Business Requirements

Scale Targets:

  • Support 1,000+ organizations (tenants)
  • 10,000+ concurrent license sessions
  • 100,000+ total registered users
  • 1M+ license validations per day

Security Requirements:

  • Complete data isolation between tenants (zero cross-tenant leakage)
  • Row-level security enforcement at database layer
  • Audit logging for all tenant data access
  • Tenant-specific encryption keys for sensitive data
  • Compliance: SOC 2 Type II, GDPR, HIPAA-ready

Performance Requirements:

  • License validation: <100ms p99
  • API response time: <200ms p99
  • Database queries: <50ms p95
  • Support 10K QPS per tenant

Cost Requirements:

  • Minimize infrastructure costs (single database preferred)
  • Avoid database-per-tenant operational complexity
  • Enable horizontal scaling without complete re-architecture

Technical Constraints

Django Framework:

  • Django 4.2 LTS with PostgreSQL 15
  • Django REST Framework for APIs
  • Existing Django admin for management UI
  • Django ORM for database access

Database Requirements:

  • PostgreSQL 15+ (GCP Cloud SQL)
  • Support for Row-Level Security (RLS)
  • Foreign data wrappers for analytics
  • Point-in-time recovery per tenant

Deployment Environment:

  • Google Kubernetes Engine (GKE)
  • Cloud SQL PostgreSQL (HA configuration)
  • Redis Memorystore for caching
  • Cloud KMS for encryption

Decision

We will implement PostgreSQL Row-Level Security (RLS) with django-multitenant for complete tenant isolation at the database layer.

Core Architecture

Key Components

1. PostgreSQL Row-Level Security (Database Layer)

RLS policies enforce tenant isolation at the database level, making it impossible for application bugs to cause cross-tenant data leakage.

2. django-multitenant Library (Application Layer)

Provides Django ORM integration with automatic tenant filtering, middleware for tenant context, and model abstractions.

3. Tenant Context Middleware

Sets current tenant from authenticated user on every request, ensuring all queries automatically filter to current tenant.

4. TenantModel Base Class

All tenant-scoped models inherit from TenantModel which enforces tenant FK and automatic filtering.


Implementation

Phase 1: Database Schema & RLS Policies

Core Tenant Table

-- Tenant table (shared across all tenants)
CREATE TABLE tenants (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL UNIQUE,
slug VARCHAR(100) NOT NULL UNIQUE,
status VARCHAR(20) NOT NULL DEFAULT 'active',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

-- Subscription details
plan_tier VARCHAR(50) NOT NULL DEFAULT 'free',
max_users INTEGER NOT NULL DEFAULT 5,
max_projects INTEGER NOT NULL DEFAULT 3,

-- Billing
stripe_customer_id VARCHAR(255),
stripe_subscription_id VARCHAR(255),

-- Security
encryption_key_id VARCHAR(255), -- Cloud KMS key ID

-- Metadata
settings JSONB NOT NULL DEFAULT '{}',
metadata JSONB NOT NULL DEFAULT '{}'
);

CREATE INDEX idx_tenants_slug ON tenants(slug);
CREATE INDEX idx_tenants_status ON tenants(status);
CREATE INDEX idx_tenants_stripe ON tenants(stripe_customer_id) WHERE stripe_customer_id IS NOT NULL;

-- Trigger for updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tenants_updated_at BEFORE UPDATE ON tenants
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

Tenant-Scoped Tables with RLS

-- Users table (tenant-scoped)
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,

email VARCHAR(255) NOT NULL,
username VARCHAR(150) NOT NULL,
full_name VARCHAR(255),

-- Authentication
password_hash VARCHAR(255) NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
is_staff BOOLEAN NOT NULL DEFAULT FALSE,

-- Tenant role
role VARCHAR(50) NOT NULL DEFAULT 'member', -- owner, admin, member, viewer

-- Timestamps
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_login TIMESTAMPTZ,

-- Unique constraint per tenant
UNIQUE(tenant_id, email),
UNIQUE(tenant_id, username)
);

CREATE INDEX idx_users_tenant ON users(tenant_id);
CREATE INDEX idx_users_email ON users(tenant_id, email);
CREATE INDEX idx_users_username ON users(tenant_id, username);

CREATE TRIGGER users_updated_at BEFORE UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- Enable Row-Level Security
ALTER TABLE users ENABLE ROW LEVEL SECURITY;

-- RLS Policy: Users can only see users in their tenant
CREATE POLICY users_tenant_isolation ON users
USING (tenant_id = current_setting('app.current_tenant_id')::UUID);

-- RLS Policy: Bypass for superusers (for migrations, admin)
CREATE POLICY users_bypass_rls ON users
TO postgres
USING (true);

License Sessions Table

CREATE TABLE license_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,

user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,

-- Session details
session_token VARCHAR(255) NOT NULL UNIQUE,
machine_id VARCHAR(255) NOT NULL,
ip_address INET,
user_agent TEXT,

-- License info
license_type VARCHAR(50) NOT NULL, -- pro, team, enterprise
features JSONB NOT NULL DEFAULT '[]',

-- Status
status VARCHAR(20) NOT NULL DEFAULT 'active', -- active, expired, revoked

-- Timestamps
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
last_validated_at TIMESTAMPTZ,
revoked_at TIMESTAMPTZ,

-- Metadata
metadata JSONB NOT NULL DEFAULT '{}'
);

CREATE INDEX idx_license_sessions_tenant ON license_sessions(tenant_id);
CREATE INDEX idx_license_sessions_user ON license_sessions(user_id);
CREATE INDEX idx_license_sessions_token ON license_sessions(session_token);
CREATE INDEX idx_license_sessions_status ON license_sessions(tenant_id, status);
CREATE INDEX idx_license_sessions_expires ON license_sessions(expires_at) WHERE status = 'active';

CREATE TRIGGER license_sessions_updated_at BEFORE UPDATE ON license_sessions
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- Enable RLS
ALTER TABLE license_sessions ENABLE ROW LEVEL SECURITY;

-- RLS Policy: Tenant isolation
CREATE POLICY license_sessions_tenant_isolation ON license_sessions
USING (tenant_id = current_setting('app.current_tenant_id')::UUID);

CREATE POLICY license_sessions_bypass_rls ON license_sessions
TO postgres
USING (true);

Projects Table

CREATE TABLE projects (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,

name VARCHAR(255) NOT NULL,
slug VARCHAR(100) NOT NULL,
description TEXT,

-- Ownership
owner_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,

-- Settings
settings JSONB NOT NULL DEFAULT '{}',

-- Status
status VARCHAR(20) NOT NULL DEFAULT 'active',
archived_at TIMESTAMPTZ,

-- Timestamps
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

UNIQUE(tenant_id, slug)
);

CREATE INDEX idx_projects_tenant ON projects(tenant_id);
CREATE INDEX idx_projects_owner ON projects(owner_id);
CREATE INDEX idx_projects_slug ON projects(tenant_id, slug);
CREATE INDEX idx_projects_status ON projects(tenant_id, status);

CREATE TRIGGER projects_updated_at BEFORE UPDATE ON projects
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- Enable RLS
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;

CREATE POLICY projects_tenant_isolation ON projects
USING (tenant_id = current_setting('app.current_tenant_id')::UUID);

CREATE POLICY projects_bypass_rls ON projects
TO postgres
USING (true);

Audit Log Table

CREATE TABLE audit_logs (
id BIGSERIAL PRIMARY KEY,
tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,

-- Actor
user_id UUID REFERENCES users(id) ON DELETE SET NULL,
user_email VARCHAR(255),

-- Action
action VARCHAR(100) NOT NULL, -- create, update, delete, login, etc.
resource_type VARCHAR(100) NOT NULL, -- user, project, license, etc.
resource_id UUID,

-- Details
changes JSONB, -- before/after for updates
metadata JSONB NOT NULL DEFAULT '{}',

-- Context
ip_address INET,
user_agent TEXT,

-- Timestamp
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Partitioning by tenant_id and created_at for performance
CREATE INDEX idx_audit_logs_tenant ON audit_logs(tenant_id, created_at DESC);
CREATE INDEX idx_audit_logs_user ON audit_logs(user_id, created_at DESC);
CREATE INDEX idx_audit_logs_resource ON audit_logs(resource_type, resource_id);
CREATE INDEX idx_audit_logs_action ON audit_logs(tenant_id, action);

-- Enable RLS
ALTER TABLE audit_logs ENABLE ROW LEVEL SECURITY;

CREATE POLICY audit_logs_tenant_isolation ON audit_logs
USING (tenant_id = current_setting('app.current_tenant_id')::UUID);

CREATE POLICY audit_logs_bypass_rls ON audit_logs
TO postgres
USING (true);

Tenant Context Functions

-- Set current tenant for session
CREATE OR REPLACE FUNCTION set_current_tenant(tenant_uuid UUID)
RETURNS void AS $$
BEGIN
PERFORM set_config('app.current_tenant_id', tenant_uuid::TEXT, false);
END;
$$ LANGUAGE plpgsql;

-- Get current tenant
CREATE OR REPLACE FUNCTION get_current_tenant()
RETURNS UUID AS $$
BEGIN
RETURN current_setting('app.current_tenant_id', true)::UUID;
EXCEPTION
WHEN OTHERS THEN
RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Clear tenant context
CREATE OR REPLACE FUNCTION clear_current_tenant()
RETURNS void AS $$
BEGIN
PERFORM set_config('app.current_tenant_id', '', false);
END;
$$ LANGUAGE plpgsql;

Phase 2: Django Integration

Django Settings Configuration

# settings/base.py

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',

# Third-party
'rest_framework',
'django_multitenant',
'corsheaders',

# Local apps
'apps.tenants',
'apps.users',
'apps.licenses',
'apps.projects',
'apps.audit',
]

MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'corsheaders.middleware.CorsMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',

# Tenant middleware MUST come after authentication
'apps.tenants.middleware.TenantMiddleware',

'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

# Database configuration
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': os.getenv('DB_NAME', 'coditect'),
'USER': os.getenv('DB_USER', 'postgres'),
'PASSWORD': os.getenv('DB_PASSWORD'),
'HOST': os.getenv('DB_HOST', 'localhost'),
'PORT': os.getenv('DB_PORT', '5432'),
'ATOMIC_REQUESTS': True,
'CONN_MAX_AGE': 600,
'OPTIONS': {
'connect_timeout': 10,
'options': '-c search_path=public',
}
}
}

# Django Multitenant Configuration
MULTITENANT_RELATIVE_NAME = 'tenant'

# Custom user model
AUTH_USER_MODEL = 'users.User'

# REST Framework
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
'apps.auth.authentication.JWTAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated',
],
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.LimitOffsetPagination',
'PAGE_SIZE': 50,
'MAX_PAGE_SIZE': 1000,
}

Tenant Model

# apps/tenants/models.py

from django.db import models
from django.utils.text import slugify
import uuid

class TenantStatus(models.TextChoices):
ACTIVE = 'active', 'Active'
SUSPENDED = 'suspended', 'Suspended'
TRIAL = 'trial', 'Trial'
CANCELLED = 'cancelled', 'Cancelled'

class PlanTier(models.TextChoices):
FREE = 'free', 'Free'
PRO = 'pro', 'Pro'
TEAM = 'team', 'Team'
ENTERPRISE = 'enterprise', 'Enterprise'

class Tenant(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

# Basic info
name = models.CharField(max_length=255, unique=True)
slug = models.SlugField(max_length=100, unique=True)
status = models.CharField(
max_length=20,
choices=TenantStatus.choices,
default=TenantStatus.ACTIVE
)

# Subscription
plan_tier = models.CharField(
max_length=50,
choices=PlanTier.choices,
default=PlanTier.FREE
)
max_users = models.IntegerField(default=5)
max_projects = models.IntegerField(default=3)

# Billing
stripe_customer_id = models.CharField(max_length=255, null=True, blank=True)
stripe_subscription_id = models.CharField(max_length=255, null=True, blank=True)

# Security
encryption_key_id = models.CharField(max_length=255, null=True, blank=True)

# Metadata
settings = models.JSONField(default=dict)
metadata = models.JSONField(default=dict)

# Timestamps
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)

class Meta:
db_table = 'tenants'
ordering = ['name']
indexes = [
models.Index(fields=['slug']),
models.Index(fields=['status']),
models.Index(fields=['stripe_customer_id']),
]

def __str__(self):
return self.name

def save(self, *args, **kwargs):
if not self.slug:
self.slug = slugify(self.name)
super().save(*args, **kwargs)

@property
def is_active(self):
return self.status == TenantStatus.ACTIVE

def get_user_count(self):
return self.users.filter(is_active=True).count()

def get_project_count(self):
return self.projects.filter(status='active').count()

def can_add_user(self):
return self.get_user_count() < self.max_users

def can_add_project(self):
return self.get_project_count() < self.max_projects

TenantModel Base Class

# apps/tenants/models.py (continued)

from django_multitenant.models import TenantModel as BaseTenantModel

class TenantModel(BaseTenantModel):
"""
Base class for all tenant-scoped models.

Automatically filters queries to current tenant and enforces
tenant FK on all operations.
"""

tenant = models.ForeignKey(
Tenant,
on_delete=models.CASCADE,
related_name='%(class)ss', # e.g., tenant.users, tenant.projects
db_index=True
)

class Meta:
abstract = True

def save(self, *args, **kwargs):
# Ensure tenant is set from context if not provided
if not self.tenant_id:
from .context import get_current_tenant
current_tenant = get_current_tenant()
if current_tenant:
self.tenant_id = current_tenant.id
else:
raise ValueError(
f"Cannot save {self.__class__.__name__} without tenant context. "
"Set tenant explicitly or use TenantMiddleware."
)
super().save(*args, **kwargs)

User Model

# apps/users/models.py

from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin
from django.db import models
from apps.tenants.models import TenantModel, Tenant
import uuid

class UserRole(models.TextChoices):
OWNER = 'owner', 'Owner'
ADMIN = 'admin', 'Admin'
MEMBER = 'member', 'Member'
VIEWER = 'viewer', 'Viewer'

class User(TenantModel, AbstractBaseUser, PermissionsMixin):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

# Authentication
email = models.EmailField(max_length=255)
username = models.CharField(max_length=150)
password = models.CharField(max_length=255)

# Profile
full_name = models.CharField(max_length=255, blank=True)

# Status
is_active = models.BooleanField(default=True)
is_staff = models.BooleanField(default=False)

# Tenant role
role = models.CharField(
max_length=50,
choices=UserRole.choices,
default=UserRole.MEMBER
)

# Timestamps
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
last_login = models.DateTimeField(null=True, blank=True)

USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['username']

class Meta:
db_table = 'users'
unique_together = [
('tenant', 'email'),
('tenant', 'username'),
]
indexes = [
models.Index(fields=['tenant', 'email']),
models.Index(fields=['tenant', 'username']),
]

def __str__(self):
return f"{self.email} ({self.tenant.name})"

@property
def is_owner(self):
return self.role == UserRole.OWNER

@property
def is_admin(self):
return self.role in [UserRole.OWNER, UserRole.ADMIN]

def has_permission(self, permission):
"""Check tenant-specific permissions."""
role_permissions = {
UserRole.OWNER: ['*'], # All permissions
UserRole.ADMIN: ['users.view', 'users.create', 'projects.*', 'licenses.*'],
UserRole.MEMBER: ['projects.view', 'projects.create', 'licenses.view'],
UserRole.VIEWER: ['projects.view', 'licenses.view'],
}

allowed = role_permissions.get(self.role, [])

if '*' in allowed:
return True

# Check exact match or wildcard
for perm in allowed:
if perm == permission:
return True
if perm.endswith('.*') and permission.startswith(perm[:-2]):
return True

return False

License Session Model

# apps/licenses/models.py

from django.db import models
from apps.tenants.models import TenantModel
from apps.users.models import User
import uuid

class LicenseType(models.TextChoices):
FREE = 'free', 'Free'
PRO = 'pro', 'Pro'
TEAM = 'team', 'Team'
ENTERPRISE = 'enterprise', 'Enterprise'

class SessionStatus(models.TextChoices):
ACTIVE = 'active', 'Active'
EXPIRED = 'expired', 'Expired'
REVOKED = 'revoked', 'Revoked'

class LicenseSession(TenantModel):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name='license_sessions'
)

# Session details
session_token = models.CharField(max_length=255, unique=True, db_index=True)
machine_id = models.CharField(max_length=255)
ip_address = models.GenericIPAddressField(null=True, blank=True)
user_agent = models.TextField(blank=True)

# License info
license_type = models.CharField(
max_length=50,
choices=LicenseType.choices
)
features = models.JSONField(default=list)

# Status
status = models.CharField(
max_length=20,
choices=SessionStatus.choices,
default=SessionStatus.ACTIVE
)

# Timestamps
created_at = models.DateTimeField(auto_now_add=True)
expires_at = models.DateTimeField()
last_validated_at = models.DateTimeField(null=True, blank=True)
revoked_at = models.DateTimeField(null=True, blank=True)

# Metadata
metadata = models.JSONField(default=dict)

class Meta:
db_table = 'license_sessions'
indexes = [
models.Index(fields=['tenant', 'status']),
models.Index(fields=['user']),
models.Index(fields=['session_token']),
models.Index(fields=['expires_at']),
]

def __str__(self):
return f"Session {self.session_token[:8]}... ({self.user.email})"

@property
def is_valid(self):
from django.utils import timezone
return (
self.status == SessionStatus.ACTIVE and
self.expires_at > timezone.now()
)

def validate(self):
"""Validate session and update last_validated_at."""
from django.utils import timezone

if not self.is_valid:
return False

self.last_validated_at = timezone.now()
self.save(update_fields=['last_validated_at'])
return True

def revoke(self):
"""Revoke this session."""
from django.utils import timezone

self.status = SessionStatus.REVOKED
self.revoked_at = timezone.now()
self.save(update_fields=['status', 'revoked_at'])

Project Model

# apps/projects/models.py

from django.db import models
from django.utils.text import slugify
from apps.tenants.models import TenantModel
from apps.users.models import User
import uuid

class ProjectStatus(models.TextChoices):
ACTIVE = 'active', 'Active'
ARCHIVED = 'archived', 'Archived'

class Project(TenantModel):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

name = models.CharField(max_length=255)
slug = models.SlugField(max_length=100)
description = models.TextField(blank=True)

# Ownership
owner = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name='owned_projects'
)

# Settings
settings = models.JSONField(default=dict)

# Status
status = models.CharField(
max_length=20,
choices=ProjectStatus.choices,
default=ProjectStatus.ACTIVE
)
archived_at = models.DateTimeField(null=True, blank=True)

# Timestamps
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)

class Meta:
db_table = 'projects'
unique_together = [('tenant', 'slug')]
indexes = [
models.Index(fields=['tenant', 'slug']),
models.Index(fields=['owner']),
models.Index(fields=['tenant', 'status']),
]

def __str__(self):
return f"{self.name} ({self.tenant.name})"

def save(self, *args, **kwargs):
if not self.slug:
self.slug = slugify(self.name)
super().save(*args, **kwargs)

def archive(self):
"""Archive this project."""
from django.utils import timezone

self.status = ProjectStatus.ARCHIVED
self.archived_at = timezone.now()
self.save(update_fields=['status', 'archived_at'])

Audit Log Model

# apps/audit/models.py

from django.db import models
from apps.tenants.models import TenantModel
from apps.users.models import User

class AuditLog(TenantModel):
id = models.BigAutoField(primary_key=True)

# Actor
user = models.ForeignKey(
User,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='audit_logs'
)
user_email = models.EmailField(max_length=255, blank=True)

# Action
action = models.CharField(max_length=100) # create, update, delete, login, etc.
resource_type = models.CharField(max_length=100) # user, project, license, etc.
resource_id = models.UUIDField(null=True, blank=True)

# Details
changes = models.JSONField(null=True, blank=True) # before/after for updates
metadata = models.JSONField(default=dict)

# Context
ip_address = models.GenericIPAddressField(null=True, blank=True)
user_agent = models.TextField(blank=True)

# Timestamp
created_at = models.DateTimeField(auto_now_add=True)

class Meta:
db_table = 'audit_logs'
ordering = ['-created_at']
indexes = [
models.Index(fields=['tenant', '-created_at']),
models.Index(fields=['user', '-created_at']),
models.Index(fields=['resource_type', 'resource_id']),
models.Index(fields=['tenant', 'action']),
]

def __str__(self):
return f"{self.action} on {self.resource_type} by {self.user_email or 'system'}"

@classmethod
def log(cls, action, resource_type, resource_id=None, user=None,
changes=None, metadata=None, request=None):
"""
Create an audit log entry.

Usage:
AuditLog.log(
action='create',
resource_type='project',
resource_id=project.id,
user=request.user,
metadata={'project_name': project.name},
request=request
)
"""
from .context import get_current_tenant

tenant = get_current_tenant()
if not tenant:
raise ValueError("Cannot create audit log without tenant context")

log_data = {
'tenant': tenant,
'action': action,
'resource_type': resource_type,
'resource_id': resource_id,
'user': user,
'user_email': user.email if user else '',
'changes': changes or {},
'metadata': metadata or {},
}

if request:
log_data['ip_address'] = get_client_ip(request)
log_data['user_agent'] = request.META.get('HTTP_USER_AGENT', '')

return cls.objects.create(**log_data)

def get_client_ip(request):
"""Extract client IP from request, handling proxies."""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
return x_forwarded_for.split(',')[0].strip()
return request.META.get('REMOTE_ADDR')

Phase 3: Tenant Context Management

Tenant Context Module

# apps/tenants/context.py

from contextvars import ContextVar
from typing import Optional
from .models import Tenant

# Thread-safe context variable for current tenant
_current_tenant: ContextVar[Optional[Tenant]] = ContextVar('current_tenant', default=None)

def set_current_tenant(tenant: Optional[Tenant]) -> None:
"""
Set the current tenant for this context.

This is used by TenantMiddleware to set tenant from authenticated user.
"""
_current_tenant.set(tenant)

# Also set PostgreSQL session variable for RLS
if tenant:
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("SELECT set_current_tenant(%s)", [str(tenant.id)])

def get_current_tenant() -> Optional[Tenant]:
"""Get the current tenant from context."""
return _current_tenant.get()

def clear_current_tenant() -> None:
"""Clear the current tenant context."""
_current_tenant.set(None)

# Clear PostgreSQL session variable
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("SELECT clear_current_tenant()")

class tenant_context:
"""
Context manager for temporarily setting tenant context.

Usage:
with tenant_context(my_tenant):
# All queries here are scoped to my_tenant
users = User.objects.all()
"""

def __init__(self, tenant: Optional[Tenant]):
self.tenant = tenant
self.previous_tenant = None

def __enter__(self):
self.previous_tenant = get_current_tenant()
set_current_tenant(self.tenant)
return self.tenant

def __exit__(self, exc_type, exc_val, exc_tb):
set_current_tenant(self.previous_tenant)

Tenant Middleware

# apps/tenants/middleware.py

from django.utils.deprecation import MiddlewareMixin
from django.http import JsonResponse
from .context import set_current_tenant, clear_current_tenant

class TenantMiddleware(MiddlewareMixin):
"""
Middleware to set current tenant from authenticated user.

CRITICAL: This middleware must come AFTER AuthenticationMiddleware
in the MIDDLEWARE setting.
"""

def process_request(self, request):
"""Set tenant from authenticated user."""
clear_current_tenant()

if request.user and request.user.is_authenticated:
# User has tenant FK, set it as current
if hasattr(request.user, 'tenant'):
set_current_tenant(request.user.tenant)
request.tenant = request.user.tenant
else:
# Superuser or staff without tenant (for admin)
request.tenant = None
else:
request.tenant = None

def process_response(self, request, response):
"""Clear tenant context after request."""
clear_current_tenant()
return response

def process_exception(self, request, exception):
"""Clear tenant context on exception."""
clear_current_tenant()
return None

Tenant-Aware ViewSets

# apps/api/viewsets.py

from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated
from apps.tenants.context import get_current_tenant

class TenantViewSet(viewsets.ModelViewSet):
"""
Base viewset for tenant-scoped resources.

Automatically filters queryset to current tenant and
sets tenant on create operations.
"""

permission_classes = [IsAuthenticated]

def get_queryset(self):
"""Filter queryset to current tenant."""
queryset = super().get_queryset()

# Queryset is already filtered by RLS + django-multitenant,
# but we add explicit filter for clarity
tenant = get_current_tenant()
if tenant:
queryset = queryset.filter(tenant=tenant)

return queryset

def perform_create(self, serializer):
"""Set tenant on create."""
tenant = get_current_tenant()
if not tenant:
from rest_framework.exceptions import PermissionDenied
raise PermissionDenied("No tenant context available")

serializer.save(tenant=tenant)

class UserViewSet(TenantViewSet):
"""User management API."""
from apps.users.models import User
from apps.users.serializers import UserSerializer

queryset = User.objects.all()
serializer_class = UserSerializer

def get_queryset(self):
queryset = super().get_queryset()

# Additional filtering based on user role
user = self.request.user

if user.is_owner or user.is_admin:
# Owners and admins see all users in tenant
return queryset
else:
# Members see only themselves
return queryset.filter(id=user.id)

class ProjectViewSet(TenantViewSet):
"""Project management API."""
from apps.projects.models import Project
from apps.projects.serializers import ProjectSerializer

queryset = Project.objects.filter(status='active')
serializer_class = ProjectSerializer

def perform_create(self, serializer):
"""Set owner to current user."""
super().perform_create(serializer)
serializer.save(owner=self.request.user)

Phase 4: Testing & Validation

RLS Policy Testing

# apps/tenants/tests/test_rls.py

from django.test import TestCase, TransactionTestCase
from django.db import connection
from apps.tenants.models import Tenant
from apps.users.models import User
from apps.projects.models import Project
import uuid

class RowLevelSecurityTestCase(TransactionTestCase):
"""Test Row-Level Security enforcement at database level."""

def setUp(self):
"""Create test tenants and users."""
self.tenant1 = Tenant.objects.create(
name="Tenant 1",
slug="tenant-1"
)
self.tenant2 = Tenant.objects.create(
name="Tenant 2",
slug="tenant-2"
)

from apps.tenants.context import set_current_tenant

# Create users in tenant1
set_current_tenant(self.tenant1)
self.user1 = User.objects.create(
email="user1@tenant1.com",
username="user1",
tenant=self.tenant1
)

# Create users in tenant2
set_current_tenant(self.tenant2)
self.user2 = User.objects.create(
email="user2@tenant2.com",
username="user2",
tenant=self.tenant2
)

def test_rls_prevents_cross_tenant_access(self):
"""Verify RLS blocks cross-tenant queries."""
from apps.tenants.context import set_current_tenant

# Set context to tenant1
set_current_tenant(self.tenant1)

# Should see only tenant1 users
users = User.objects.all()
self.assertEqual(users.count(), 1)
self.assertEqual(users.first().email, "user1@tenant1.com")

# Switch to tenant2
set_current_tenant(self.tenant2)

# Should see only tenant2 users
users = User.objects.all()
self.assertEqual(users.count(), 1)
self.assertEqual(users.first().email, "user2@tenant2.com")

def test_rls_enforced_at_raw_sql_level(self):
"""Verify RLS works even with raw SQL queries."""
from apps.tenants.context import set_current_tenant

# Set context to tenant1
set_current_tenant(self.tenant1)

# Raw SQL query should still respect RLS
with connection.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users")
count = cursor.fetchone()[0]
self.assertEqual(count, 1)

# Switch to tenant2
set_current_tenant(self.tenant2)

with connection.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users")
count = cursor.fetchone()[0]
self.assertEqual(count, 1)

def test_cannot_update_other_tenant_data(self):
"""Verify RLS prevents updates to other tenant's data."""
from apps.tenants.context import set_current_tenant

# Set context to tenant1
set_current_tenant(self.tenant1)

# Try to update tenant2's user (should fail silently due to RLS)
with connection.cursor() as cursor:
cursor.execute(
"UPDATE users SET username = %s WHERE id = %s",
['hacked', str(self.user2.id)]
)

# Verify user2 was NOT updated
set_current_tenant(self.tenant2)
user2 = User.objects.get(id=self.user2.id)
self.assertEqual(user2.username, 'user2') # Unchanged

def test_cannot_delete_other_tenant_data(self):
"""Verify RLS prevents deletes of other tenant's data."""
from apps.tenants.context import set_current_tenant

# Set context to tenant1
set_current_tenant(self.tenant1)

# Try to delete tenant2's user (should fail silently)
with connection.cursor() as cursor:
cursor.execute("DELETE FROM users WHERE id = %s", [str(self.user2.id)])

# Verify user2 still exists
set_current_tenant(self.tenant2)
user2 = User.objects.get(id=self.user2.id)
self.assertIsNotNone(user2)

class TenantIsolationTestCase(TestCase):
"""Test tenant isolation at Django ORM level."""

def setUp(self):
self.tenant1 = Tenant.objects.create(name="Company A", slug="company-a")
self.tenant2 = Tenant.objects.create(name="Company B", slug="company-b")

from apps.tenants.context import set_current_tenant

# Create projects in tenant1
set_current_tenant(self.tenant1)
self.user1 = User.objects.create(
email="admin@companya.com",
username="admin1",
tenant=self.tenant1,
role='owner'
)
self.project1 = Project.objects.create(
name="Project A",
slug="project-a",
tenant=self.tenant1,
owner=self.user1
)

# Create projects in tenant2
set_current_tenant(self.tenant2)
self.user2 = User.objects.create(
email="admin@companyb.com",
username="admin2",
tenant=self.tenant2,
role='owner'
)
self.project2 = Project.objects.create(
name="Project B",
slug="project-b",
tenant=self.tenant2,
owner=self.user2
)

def test_queryset_filtered_by_tenant(self):
"""Verify querysets automatically filter to current tenant."""
from apps.tenants.context import set_current_tenant

set_current_tenant(self.tenant1)
projects = Project.objects.all()
self.assertEqual(projects.count(), 1)
self.assertEqual(projects.first().name, "Project A")

set_current_tenant(self.tenant2)
projects = Project.objects.all()
self.assertEqual(projects.count(), 1)
self.assertEqual(projects.first().name, "Project B")

def test_get_by_id_respects_tenant(self):
"""Verify get() respects tenant filtering."""
from apps.tenants.context import set_current_tenant
from django.core.exceptions import ObjectDoesNotExist

set_current_tenant(self.tenant1)

# Can get own project
project = Project.objects.get(id=self.project1.id)
self.assertEqual(project.name, "Project A")

# Cannot get other tenant's project
with self.assertRaises(ObjectDoesNotExist):
Project.objects.get(id=self.project2.id)

def test_filter_respects_tenant(self):
"""Verify filter() respects tenant filtering."""
from apps.tenants.context import set_current_tenant

set_current_tenant(self.tenant1)

# Filter by slug should only return tenant1 projects
projects = Project.objects.filter(slug="project-b")
self.assertEqual(projects.count(), 0)

projects = Project.objects.filter(slug="project-a")
self.assertEqual(projects.count(), 1)

API Integration Testing

# apps/api/tests/test_tenant_api.py

from rest_framework.test import APITestCase
from rest_framework import status
from apps.tenants.models import Tenant
from apps.users.models import User
from apps.projects.models import Project

class TenantAPITestCase(APITestCase):
"""Test tenant isolation in REST API."""

def setUp(self):
"""Create test tenants and users."""
self.tenant1 = Tenant.objects.create(name="Tenant 1", slug="tenant-1")
self.tenant2 = Tenant.objects.create(name="Tenant 2", slug="tenant-2")

from apps.tenants.context import set_current_tenant

# Create users
set_current_tenant(self.tenant1)
self.user1 = User.objects.create_user(
email="user1@tenant1.com",
username="user1",
password="password123",
tenant=self.tenant1,
role='owner'
)

set_current_tenant(self.tenant2)
self.user2 = User.objects.create_user(
email="user2@tenant2.com",
username="user2",
password="password123",
tenant=self.tenant2,
role='owner'
)

# Create projects
set_current_tenant(self.tenant1)
self.project1 = Project.objects.create(
name="Project 1",
slug="project-1",
tenant=self.tenant1,
owner=self.user1
)

set_current_tenant(self.tenant2)
self.project2 = Project.objects.create(
name="Project 2",
slug="project-2",
tenant=self.tenant2,
owner=self.user2
)

def test_user_can_only_see_own_tenant_projects(self):
"""Verify API returns only current tenant's projects."""
# Login as user1
self.client.force_authenticate(user=self.user1)

response = self.client.get('/api/v1/projects/')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), 1)
self.assertEqual(response.data['results'][0]['name'], 'Project 1')

# Login as user2
self.client.force_authenticate(user=self.user2)

response = self.client.get('/api/v1/projects/')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), 1)
self.assertEqual(response.data['results'][0]['name'], 'Project 2')

def test_user_cannot_access_other_tenant_project_by_id(self):
"""Verify API denies access to other tenant's projects."""
self.client.force_authenticate(user=self.user1)

# Try to access tenant2's project
response = self.client.get(f'/api/v1/projects/{self.project2.id}/')
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

def test_user_cannot_create_project_in_other_tenant(self):
"""Verify API prevents cross-tenant project creation."""
self.client.force_authenticate(user=self.user1)

# Try to create project with tenant2 ID (should be ignored)
response = self.client.post('/api/v1/projects/', {
'name': 'Hacker Project',
'slug': 'hacker-project',
'tenant': str(self.tenant2.id), # Attempt to set wrong tenant
'owner': str(self.user1.id)
})

# Project should be created in user1's tenant, not tenant2
self.assertEqual(response.status_code, status.HTTP_201_CREATED)

# Verify project is in tenant1
project_id = response.data['id']
project = Project.objects.get(id=project_id)
self.assertEqual(project.tenant_id, self.tenant1.id)

Performance Testing

# apps/tenants/tests/test_performance.py

from django.test import TestCase
from django.utils import timezone
from apps.tenants.models import Tenant
from apps.users.models import User
from apps.licenses.models import LicenseSession
import time
import statistics

class PerformanceTestCase(TestCase):
"""Test query performance with RLS enabled."""

def setUp(self):
"""Create test data at scale."""
# Create 100 tenants
self.tenants = []
for i in range(100):
tenant = Tenant.objects.create(
name=f"Tenant {i}",
slug=f"tenant-{i}"
)
self.tenants.append(tenant)

# Create 10 users per tenant
from apps.tenants.context import set_current_tenant

for tenant in self.tenants:
set_current_tenant(tenant)

for j in range(10):
user = User.objects.create(
email=f"user{j}@tenant{tenant.id}.com",
username=f"user{j}_tenant{tenant.id}",
tenant=tenant
)

# Create 5 license sessions per user
for k in range(5):
LicenseSession.objects.create(
tenant=tenant,
user=user,
session_token=f"token_{tenant.id}_{j}_{k}",
machine_id=f"machine_{j}_{k}",
license_type='pro',
expires_at=timezone.now() + timezone.timedelta(days=30)
)

def test_query_performance_with_rls(self):
"""Measure query performance with RLS enabled."""
from apps.tenants.context import set_current_tenant

timings = []

# Test 100 queries across different tenants
for tenant in self.tenants[:10]: # Test first 10 tenants
set_current_tenant(tenant)

start = time.perf_counter()

# Typical query: Get active license sessions
sessions = LicenseSession.objects.filter(
status='active'
).select_related('user').all()

# Force evaluation
count = len(sessions)

elapsed = (time.perf_counter() - start) * 1000 # ms
timings.append(elapsed)

# Performance assertions
avg_time = statistics.mean(timings)
p95_time = statistics.quantiles(timings, n=20)[18] # 95th percentile
p99_time = max(timings)

print(f"\nQuery Performance (100 queries):")
print(f" Average: {avg_time:.2f}ms")
print(f" P95: {p95_time:.2f}ms")
print(f" P99: {p99_time:.2f}ms")

# Assert performance targets
self.assertLess(avg_time, 50, "Average query time should be <50ms")
self.assertLess(p95_time, 100, "P95 query time should be <100ms")
self.assertLess(p99_time, 200, "P99 query time should be <200ms")

def test_index_usage(self):
"""Verify queries use indexes efficiently."""
from apps.tenants.context import set_current_tenant
from django.db import connection

set_current_tenant(self.tenants[0])

with connection.cursor() as cursor:
# Enable query plan output
cursor.execute("EXPLAIN (ANALYZE, BUFFERS) SELECT * FROM license_sessions WHERE tenant_id = %s AND status = 'active'", [str(self.tenants[0].id)])

plan = cursor.fetchall()
plan_text = '\n'.join([row[0] for row in plan])

print(f"\nQuery Plan:\n{plan_text}")

# Verify index usage
self.assertIn('Index Scan', plan_text, "Query should use index scan")
self.assertNotIn('Seq Scan on license_sessions', plan_text, "Query should not use sequential scan")

Phase 5: Production Deployment

Django Management Commands

# apps/tenants/management/commands/create_tenant.py

from django.core.management.base import BaseCommand
from apps.tenants.models import Tenant, PlanTier
from apps.users.models import User, UserRole
from apps.tenants.context import set_current_tenant
import secrets

class Command(BaseCommand):
help = 'Create a new tenant with owner account'

def add_arguments(self, parser):
parser.add_argument('name', type=str, help='Tenant name')
parser.add_argument('slug', type=str, help='Tenant slug')
parser.add_argument('--email', type=str, required=True, help='Owner email')
parser.add_argument('--username', type=str, required=True, help='Owner username')
parser.add_argument('--plan', type=str, default='free', choices=['free', 'pro', 'team', 'enterprise'])

def handle(self, *args, **options):
# Create tenant
tenant = Tenant.objects.create(
name=options['name'],
slug=options['slug'],
plan_tier=options['plan']
)

self.stdout.write(self.style.SUCCESS(f"Created tenant: {tenant.name} ({tenant.id})"))

# Set tenant context for user creation
set_current_tenant(tenant)

# Generate temporary password
temp_password = secrets.token_urlsafe(16)

# Create owner user
user = User.objects.create_user(
email=options['email'],
username=options['username'],
password=temp_password,
tenant=tenant,
role=UserRole.OWNER
)

self.stdout.write(self.style.SUCCESS(f"Created owner: {user.email}"))
self.stdout.write(self.style.WARNING(f"Temporary password: {temp_password}"))
self.stdout.write(self.style.WARNING("User should change password on first login"))
# apps/tenants/management/commands/verify_rls.py

from django.core.management.base import BaseCommand
from django.db import connection
from apps.tenants.models import Tenant

class Command(BaseCommand):
help = 'Verify Row-Level Security policies are enabled'

def handle(self, *args, **options):
with connection.cursor() as cursor:
# Check RLS enabled on tables
cursor.execute("""
SELECT schemaname, tablename, rowsecurity
FROM pg_tables
WHERE schemaname = 'public'
AND tablename IN ('users', 'projects', 'license_sessions', 'audit_logs')
""")

tables = cursor.fetchall()

self.stdout.write("Row-Level Security Status:")
self.stdout.write("-" * 60)

for schema, table, rls_enabled in tables:
status = self.style.SUCCESS("✓ ENABLED") if rls_enabled else self.style.ERROR("✗ DISABLED")
self.stdout.write(f"{table:<30} {status}")

# Check policies exist
cursor.execute("""
SELECT schemaname, tablename, policyname, cmd, qual
FROM pg_policies
WHERE schemaname = 'public'
ORDER BY tablename, policyname
""")

policies = cursor.fetchall()

self.stdout.write("\nActive RLS Policies:")
self.stdout.write("-" * 60)

for schema, table, policy, cmd, qual in policies:
self.stdout.write(f"{table}.{policy}")
self.stdout.write(f" Command: {cmd}")
self.stdout.write(f" Condition: {qual}\n")

Monitoring & Observability

# apps/tenants/middleware.py (enhanced)

from django.utils.deprecation import MiddlewareMixin
from django.http import JsonResponse
from .context import set_current_tenant, clear_current_tenant
import time
import logging

logger = logging.getLogger(__name__)

class TenantMiddleware(MiddlewareMixin):
"""Enhanced tenant middleware with monitoring."""

def process_request(self, request):
"""Set tenant from authenticated user."""
clear_current_tenant()

request.tenant_start_time = time.perf_counter()

if request.user and request.user.is_authenticated:
if hasattr(request.user, 'tenant'):
set_current_tenant(request.user.tenant)
request.tenant = request.user.tenant

# Log tenant access for monitoring
logger.info(
"Tenant context set",
extra={
'tenant_id': str(request.tenant.id),
'tenant_name': request.tenant.name,
'user_id': str(request.user.id),
'user_email': request.user.email,
'path': request.path,
'method': request.method
}
)
else:
request.tenant = None
else:
request.tenant = None

def process_response(self, request, response):
"""Clear tenant context and record metrics."""
if hasattr(request, 'tenant_start_time'):
elapsed = (time.perf_counter() - request.tenant_start_time) * 1000

# Record metrics (send to Prometheus/CloudWatch)
if hasattr(request, 'tenant') and request.tenant:
from .metrics import record_request_duration
record_request_duration(
tenant_id=str(request.tenant.id),
path=request.path,
method=request.method,
status_code=response.status_code,
duration_ms=elapsed
)

clear_current_tenant()
return response
# apps/tenants/metrics.py

from prometheus_client import Counter, Histogram, Gauge
import logging

logger = logging.getLogger(__name__)

# Prometheus metrics
tenant_requests_total = Counter(
'tenant_requests_total',
'Total requests per tenant',
['tenant_id', 'method', 'path', 'status']
)

tenant_request_duration = Histogram(
'tenant_request_duration_milliseconds',
'Request duration per tenant',
['tenant_id', 'method', 'path'],
buckets=[10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000]
)

tenant_active_users = Gauge(
'tenant_active_users',
'Active users per tenant',
['tenant_id']
)

tenant_license_sessions = Gauge(
'tenant_license_sessions',
'Active license sessions per tenant',
['tenant_id', 'status']
)

def record_request_duration(tenant_id, path, method, status_code, duration_ms):
"""Record request metrics."""
try:
tenant_requests_total.labels(
tenant_id=tenant_id,
method=method,
path=path,
status=status_code
).inc()

tenant_request_duration.labels(
tenant_id=tenant_id,
method=method,
path=path
).observe(duration_ms)
except Exception as e:
logger.error(f"Failed to record metrics: {e}")

def update_tenant_metrics(tenant):
"""Update tenant-specific metrics."""
try:
from apps.users.models import User
from apps.licenses.models import LicenseSession
from apps.tenants.context import set_current_tenant

set_current_tenant(tenant)

# Active users
active_users = User.objects.filter(is_active=True).count()
tenant_active_users.labels(tenant_id=str(tenant.id)).set(active_users)

# License sessions by status
for status in ['active', 'expired', 'revoked']:
count = LicenseSession.objects.filter(status=status).count()
tenant_license_sessions.labels(
tenant_id=str(tenant.id),
status=status
).set(count)

except Exception as e:
logger.error(f"Failed to update tenant metrics: {e}")

Consequences

Positive

Security:

  • Database-enforced isolation - RLS prevents application bugs from causing data leakage
  • Defense in depth - Multiple layers (RLS + ORM + middleware) ensure isolation
  • Audit compliance - Complete audit trail for all tenant data access
  • Tenant-specific encryption - Cloud KMS keys per tenant for sensitive data

Performance:

  • Single database - Lower infrastructure costs than database-per-tenant
  • Efficient indexes - Composite indexes on (tenant_id, ...) for fast queries
  • Connection pooling - Shared connection pool across all tenants
  • <50ms query latency - Meets performance requirements at scale

Maintainability:

  • Single schema - No migrations across multiple databases
  • Django ORM integration - Automatic tenant filtering with minimal code
  • Existing admin - Django admin works with tenant filtering
  • Simple backups - Single database to backup/restore

Scalability:

  • Horizontal scaling - Add read replicas for increased load
  • 1000+ tenants supported - Tested at scale with good performance
  • Easy tenant migration - Data isolated by tenant_id for easy export

Negative

Complexity:

  • ⚠️ RLS policy management - Must maintain SQL policies separate from Django models
  • ⚠️ Context management - Must ensure tenant context set on every request
  • ⚠️ Testing complexity - Must test RLS enforcement separately from ORM
  • ⚠️ Migration coordination - Schema changes affect all tenants simultaneously

Limitations:

  • ⚠️ No true database isolation - All tenants share same database instance
  • ⚠️ Tenant-specific backups harder - Cannot restore single tenant easily
  • ⚠️ Custom schemas difficult - Cannot give tenants custom fields easily
  • ⚠️ PostgreSQL dependency - RLS only available in PostgreSQL

Risks:

  • ⚠️ RLS policy bugs - Incorrect policy could expose data across tenants
  • ⚠️ Context leakage - Failing to clear context could cause cross-tenant queries
  • ⚠️ Performance degradation - Large tenants could impact others on same DB
  • ⚠️ Migration failures - Schema change affecting all tenants is high-risk

Mitigation Strategies

RLS Policy Validation:

# Automated testing of RLS policies on every deployment
python manage.py verify_rls
python manage.py test apps.tenants.tests.test_rls

Context Management:

# Middleware enforces context setting/clearing
# Context managers for async tasks and background jobs
with tenant_context(my_tenant):
# Safe tenant-scoped operations
pass

Performance Monitoring:

# Prometheus metrics per tenant
# Alerts for slow queries or high tenant resource usage
# Circuit breakers for noisy neighbor protection

Backup Strategy:

# Logical backups with pg_dump per tenant_id
# Point-in-time recovery with WAL archiving
# Test restore procedures monthly

Alternatives Considered

Alternative 1: Database-Per-Tenant

Architecture:

  • Each tenant gets own PostgreSQL database
  • Application connects to different DB based on tenant
  • Complete database isolation

Pros:

  • ✅ True database isolation (strongest security)
  • ✅ Tenant-specific backups trivial
  • ✅ Custom schemas per tenant easy
  • ✅ Noisy neighbor impossible

Cons:

  • ❌ Operational complexity (100s-1000s of databases)
  • ❌ Migration hell (must run on all databases)
  • ❌ Connection pool exhaustion (one pool per DB)
  • ❌ Cross-tenant analytics impossible
  • ❌ Significantly higher infrastructure costs

Decision: Rejected due to operational complexity and cost at scale.


Alternative 2: Schema-Per-Tenant

Architecture:

  • Each tenant gets own PostgreSQL schema
  • Shared database, isolated schemas
  • Django sets search_path per request

Pros:

  • ✅ Good isolation (schema-level)
  • ✅ Tenant-specific backups possible
  • ✅ Custom schemas per tenant feasible
  • ✅ Shared database (lower cost than DB-per-tenant)

Cons:

  • ❌ Connection pooling issues (search_path per connection)
  • ❌ Migration complexity (must run on all schemas)
  • ❌ Django ORM doesn't support schemas well
  • ❌ Schema limits (~1000 per database)

Decision: Rejected due to poor Django support and schema limits.


Alternative 3: Application-Level Filtering Only

Architecture:

  • Single database, single schema
  • Django ORM filters by tenant_id
  • NO Row-Level Security at database level

Pros:

  • ✅ Simplest implementation
  • ✅ Native Django ORM support
  • ✅ Easy migrations
  • ✅ Good performance

Cons:

  • ❌ NO database-level enforcement (application bugs can leak data)
  • ❌ Raw SQL bypasses filtering
  • ❌ Admin queries risky
  • ❌ Compliance issues (SOC 2, HIPAA)

Decision: Rejected due to security concerns and compliance requirements.


Alternative 4: Multi-Database with Sharding

Architecture:

  • Multiple databases (e.g., 10)
  • Tenants distributed across databases via hash(tenant_id)
  • Application routes queries to correct database

Pros:

  • ✅ Better scalability than single database
  • ✅ Failure isolation (one DB down affects subset of tenants)
  • ✅ Performance isolation

Cons:

  • ❌ Complex routing logic
  • ❌ Cross-tenant analytics difficult
  • ❌ Rebalancing tenants requires data migration
  • ❌ Higher operational complexity

Decision: Deferred for Phase 2 (if single DB hits limits).


ADR-001: Project Intelligence Platform Architecture

Defines the core tenant concept and multi-tenancy requirements.

Relationship: ADR-007 implements the multi-tenant data layer for the platform defined in ADR-001.


ADR-009: GCP Infrastructure Architecture

Defines Cloud SQL PostgreSQL configuration and high-availability setup.

Relationship: ADR-007 depends on Cloud SQL PostgreSQL 15+ with RLS support from ADR-009.


ADR-006: Secret Management Strategy

Defines encryption key management per tenant using Cloud KMS.

Relationship: ADR-007 references Cloud KMS key IDs per tenant for encrypting sensitive data.


Implementation Timeline

Phase 1: Foundation (Week 1-2)

  • ✅ Database schema design
  • ✅ RLS policies for all tables
  • ✅ Tenant model and context management
  • ✅ Basic middleware

Phase 2: Django Integration (Week 3-4)

  • ✅ User model with tenant FK
  • ✅ TenantModel base class
  • ✅ License session model
  • ✅ Project model

Phase 3: Testing (Week 5-6)

  • ✅ RLS policy tests
  • ✅ Tenant isolation tests
  • ✅ API integration tests
  • ✅ Performance tests

Phase 4: Production (Week 7-8)

  • ✅ Management commands
  • ✅ Monitoring & metrics
  • ✅ Documentation
  • ✅ Deployment

References

Documentation:

Related ADRs:


Last Updated: 2025-11-30 Review Date: 2026-02-28 Status: Accepted ✅