DocumentViewToken Django Model Design
Executive Summary
The DocumentViewToken model provides secure, time-limited access to BIO-QMS platform documents after NDA verification. It implements a two-tier validation system: Redis caching for high-performance token validation (sub-millisecond) and PostgreSQL for authoritative record storage with comprehensive audit trails.
Key Capabilities:
- Token-based access control with UUID v4 entropy (122 bits)
- Per-project TTL configuration (default 90 days, configurable)
- Redis caching layer with 5-minute TTL for validation lookups
- Grace period access (7 days read-only after expiry)
- IP whitelisting and scope-based access control
- Comprehensive audit logging (access count, last accessed, revocation tracking)
Performance Targets:
- Token validation: <2ms (Redis cache hit)
- Token creation: <50ms
- Bulk revocation: <100ms per 1000 tokens
- Concurrent token validations: 10,000 req/s
Table of Contents
- Model Definition
- TTL Configuration System
- Redis Caching Layer
- Token Validation Logic
- Token Lifecycle Management
- Manager and QuerySet
- Django Admin Interface
- Serializers and Views
- Database Design
- Security Considerations
- Testing Strategy
- Deployment Configuration
1. Model Definition
1.1 Core Model
# backend/access_control/models/document_view_token.py
import uuid
from datetime import timedelta
from typing import Optional, Dict, List
from django.db import models
from django.utils import timezone
from django.core.exceptions import ValidationError
from django.contrib.postgres.indexes import HashIndex
from ..managers import DocumentViewTokenManager
class DocumentViewToken(models.Model):
"""
Time-limited token for accessing BIO-QMS documents after NDA verification.
Tokens are created after successful NDA signing and grant access to specific
document categories within a project. Validation is cached in Redis for
performance, with PostgreSQL as the authoritative source.
Lifecycle:
1. Created after NDA verification (granted_at)
2. Validated on each document access (last_accessed, access_count)
3. Optionally refreshed when nearing expiry
4. Revoked explicitly or expired automatically (revoked_at, expires_at)
5. Archived after grace period (7 days post-expiry)
Access Control:
- project_id: which BIO-QMS project this token grants access to
- scope: JSON field specifying document categories/audiences
- ip_whitelist: optional list of allowed IP addresses
Caching:
- Redis key: dvt:{token}
- Cache TTL: 5 minutes
- Invalidated immediately on revocation
"""
# Primary Key and Core Relations
token = models.UUIDField(
primary_key=True,
default=uuid.uuid4,
editable=False,
help_text="UUID v4 token (122 bits entropy)"
)
nda_record = models.ForeignKey(
'access_control.NDARecord',
on_delete=models.PROTECT,
related_name='view_tokens',
db_index=True,
help_text="NDA record that authorized this token"
)
project_id = models.CharField(
max_length=100,
db_index=True,
help_text="BIO-QMS project identifier (e.g., 'bio-qms-core', 'clinical-trials-mgmt')"
)
# Temporal Fields
granted_at = models.DateTimeField(
auto_now_add=True,
db_index=True,
help_text="When token was created/granted"
)
expires_at = models.DateTimeField(
db_index=True,
help_text="Token expiry timestamp (computed from project TTL)"
)
revoked_at = models.DateTimeField(
null=True,
blank=True,
db_index=True,
help_text="When token was explicitly revoked (NULL = not revoked)"
)
last_accessed = models.DateTimeField(
null=True,
blank=True,
help_text="Most recent token validation timestamp"
)
# Usage Tracking
access_count = models.IntegerField(
default=0,
help_text="Number of times token has been validated"
)
# Access Control
ip_whitelist = models.JSONField(
null=True,
blank=True,
help_text="Optional list of allowed IP addresses (CIDR notation supported)"
)
scope = models.JSONField(
default=dict,
help_text="Access scope: document categories, audiences, permissions"
)
# Metadata
created_by = models.ForeignKey(
'auth.User',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='created_tokens',
help_text="User who granted this token (system user if automated)"
)
notes = models.TextField(
blank=True,
help_text="Admin notes (e.g., reason for manual revocation)"
)
# Custom manager
objects = DocumentViewTokenManager()
class Meta:
db_table = 'access_control_document_view_token'
verbose_name = 'Document View Token'
verbose_name_plural = 'Document View Tokens'
ordering = ['-granted_at']
indexes = [
# Primary lookup patterns
models.Index(fields=['nda_record', 'project_id'], name='dvt_nda_project_idx'),
models.Index(fields=['expires_at', 'revoked_at'], name='dvt_expiry_idx'),
# Cleanup/archival queries
models.Index(fields=['granted_at'], name='dvt_granted_idx'),
# Hash index for UUID lookups (PostgreSQL-specific)
HashIndex(fields=['token'], name='dvt_token_hash_idx'),
]
constraints = [
# Token cannot be revoked before it was granted
models.CheckConstraint(
check=models.Q(revoked_at__gte=models.F('granted_at')) | models.Q(revoked_at__isnull=True),
name='dvt_revoked_after_granted'
),
# Token must expire after it was granted
models.CheckConstraint(
check=models.Q(expires_at__gt=models.F('granted_at')),
name='dvt_expires_after_granted'
),
]
def __str__(self) -> str:
return f"Token {self.token} (Project: {self.project_id}, NDA: {self.nda_record_id})"
def __repr__(self) -> str:
return (
f"<DocumentViewToken token={self.token} "
f"project={self.project_id} "
f"expires={self.expires_at.isoformat()} "
f"revoked={self.revoked_at is not None}>"
)
# Validation methods in Section 4
# Lifecycle methods in Section 5
1.2 Scope Field Schema
The scope JSON field defines what documents the token grants access to:
# Example scope structure
{
"document_categories": [
"regulatory-documents",
"clinical-trials",
"quality-records"
],
"audience_levels": [
"internal",
"external-partner"
],
"permissions": {
"read": True,
"download": True,
"print": False,
"share": False
},
"document_ids": [
# Optional: explicit document whitelist
"doc-uuid-1",
"doc-uuid-2"
],
"excluded_categories": [
# Optional: categories to exclude
"financial-records"
]
}
1.3 IP Whitelist Field Schema
The ip_whitelist JSON field supports CIDR notation:
# Example IP whitelist
[
"192.168.1.100", # Single IP
"10.0.0.0/8", # CIDR block
"2001:db8::/32" # IPv6 CIDR
]
# NULL = no IP restrictions (default)
2. TTL Configuration System
2.1 ProjectTokenConfig Model
Per-project TTL overrides and access policies:
# backend/access_control/models/project_token_config.py
from django.db import models
from django.core.validators import MinValueValidator
class ProjectTokenConfig(models.Model):
"""
Per-project configuration for DocumentViewToken TTL and access policies.
If no config exists for a project, defaults from settings are used:
- DEFAULT_TOKEN_TTL_DAYS = 90
- DEFAULT_GRACE_PERIOD_DAYS = 7
- DEFAULT_AUTO_RENEWAL = False
"""
project_id = models.CharField(
max_length=100,
unique=True,
primary_key=True,
help_text="BIO-QMS project identifier"
)
# TTL Settings
token_ttl_days = models.IntegerField(
validators=[MinValueValidator(1)],
default=90,
help_text="Token validity period in days (default: 90)"
)
grace_period_days = models.IntegerField(
validators=[MinValueValidator(0)],
default=7,
help_text="Read-only access period after expiry (default: 7)"
)
# Auto-Renewal
auto_renewal_enabled = models.BooleanField(
default=False,
help_text="Automatically renew tokens when within renewal_threshold_days"
)
renewal_threshold_days = models.IntegerField(
validators=[MinValueValidator(1)],
default=10,
help_text="Renew when this many days before expiry (if auto_renewal enabled)"
)
max_renewals = models.IntegerField(
validators=[MinValueValidator(0)],
default=3,
help_text="Maximum number of auto-renewals (0 = unlimited)"
)
# Rate Limiting
max_requests_per_minute = models.IntegerField(
validators=[MinValueValidator(1)],
default=100,
help_text="Max token validation requests per minute"
)
max_concurrent_sessions = models.IntegerField(
validators=[MinValueValidator(1)],
default=3,
help_text="Max concurrent active tokens per NDA record"
)
# Access Policies
require_ip_whitelist = models.BooleanField(
default=False,
help_text="Whether IP whitelisting is mandatory for this project"
)
allowed_document_categories = models.JSONField(
default=list,
help_text="List of document categories available for this project"
)
# Metadata
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
created_by = models.ForeignKey(
'auth.User',
on_delete=models.SET_NULL,
null=True,
related_name='created_token_configs'
)
notes = models.TextField(blank=True)
class Meta:
db_table = 'access_control_project_token_config'
verbose_name = 'Project Token Configuration'
verbose_name_plural = 'Project Token Configurations'
def __str__(self) -> str:
return f"TokenConfig: {self.project_id} (TTL: {self.token_ttl_days}d)"
def get_expiry_date(self, from_datetime=None):
"""Calculate expiry date based on TTL."""
from datetime import timedelta
from django.utils import timezone
base = from_datetime or timezone.now()
return base + timedelta(days=self.token_ttl_days)
def get_grace_expiry_date(self, from_datetime=None):
"""Calculate final expiry including grace period."""
expiry = self.get_expiry_date(from_datetime)
return expiry + timedelta(days=self.grace_period_days)
def is_renewal_eligible(self, token):
"""Check if token is eligible for auto-renewal."""
if not self.auto_renewal_enabled:
return False
if token.revoked_at:
return False
# Check renewal count (stored in token metadata)
renewal_count = token.scope.get('_renewal_count', 0)
if self.max_renewals > 0 and renewal_count >= self.max_renewals:
return False
# Check if within renewal threshold
from django.utils import timezone
days_until_expiry = (token.expires_at - timezone.now()).days
return days_until_expiry <= self.renewal_threshold_days
# Helper function for getting config
def get_project_token_config(project_id: str) -> ProjectTokenConfig:
"""
Get or create token config for project with defaults from settings.
"""
from django.conf import settings
config, created = ProjectTokenConfig.objects.get_or_create(
project_id=project_id,
defaults={
'token_ttl_days': getattr(settings, 'DEFAULT_TOKEN_TTL_DAYS', 90),
'grace_period_days': getattr(settings, 'DEFAULT_GRACE_PERIOD_DAYS', 7),
'auto_renewal_enabled': getattr(settings, 'DEFAULT_AUTO_RENEWAL', False),
}
)
return config
2.2 TTL Calculation Logic
# backend/access_control/utils/ttl.py
from datetime import timedelta
from django.utils import timezone
from ..models import ProjectTokenConfig
def calculate_token_expiry(project_id: str, granted_at=None):
"""
Calculate token expiry based on project configuration.
Args:
project_id: BIO-QMS project identifier
granted_at: Base datetime (default: now)
Returns:
datetime: Token expiry timestamp
"""
config = ProjectTokenConfig.objects.filter(project_id=project_id).first()
if config:
ttl_days = config.token_ttl_days
else:
from django.conf import settings
ttl_days = getattr(settings, 'DEFAULT_TOKEN_TTL_DAYS', 90)
base = granted_at or timezone.now()
return base + timedelta(days=ttl_days)
def get_grace_period_expiry(token):
"""
Calculate final expiry including grace period.
Args:
token: DocumentViewToken instance
Returns:
datetime: Grace period expiry timestamp
"""
config = ProjectTokenConfig.objects.filter(project_id=token.project_id).first()
if config:
grace_days = config.grace_period_days
else:
from django.conf import settings
grace_days = getattr(settings, 'DEFAULT_GRACE_PERIOD_DAYS', 7)
return token.expires_at + timedelta(days=grace_days)
def is_within_grace_period(token) -> bool:
"""
Check if token is in grace period (read-only access allowed).
"""
if not token.is_expired():
return False # Not expired yet
now = timezone.now()
grace_expiry = get_grace_period_expiry(token)
return now <= grace_expiry
3. Redis Caching Layer
3.1 Cache Key Format
# backend/access_control/cache/token_cache.py
import json
import hashlib
from typing import Optional, Dict
from datetime import timedelta
from django.core.cache import cache
from django.conf import settings
from django.utils import timezone
# Cache key prefix
TOKEN_CACHE_PREFIX = "dvt" # DocumentViewToken
TOKEN_CACHE_TTL = 300 # 5 minutes
def get_cache_key(token: str) -> str:
"""
Generate Redis cache key for token.
Format: dvt:{token_uuid}
Example: dvt:550e8400-e29b-41d4-a716-446655440000
"""
return f"{TOKEN_CACHE_PREFIX}:{token}"
def get_rate_limit_key(token: str) -> str:
"""
Generate Redis cache key for rate limiting.
Format: dvt:rl:{token_uuid}
"""
return f"{TOKEN_CACHE_PREFIX}:rl:{token}"
def get_session_count_key(nda_record_id: int, project_id: str) -> str:
"""
Generate Redis cache key for concurrent session tracking.
Format: dvt:sessions:{nda_record_id}:{project_id}
"""
return f"{TOKEN_CACHE_PREFIX}:sessions:{nda_record_id}:{project_id}"
3.2 Cache Warming
# backend/access_control/cache/token_cache.py (continued)
def warm_token_cache(token_obj) -> bool:
"""
Populate Redis cache with token validation data.
Called on token creation and after database updates.
Args:
token_obj: DocumentViewToken instance
Returns:
bool: True if cached successfully
"""
cache_data = serialize_token_for_cache(token_obj)
cache_key = get_cache_key(str(token_obj.token))
try:
cache.set(cache_key, cache_data, timeout=TOKEN_CACHE_TTL)
return True
except Exception as e:
# Log but don't fail - database is authoritative
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Failed to warm cache for token {token_obj.token}: {e}")
return False
def serialize_token_for_cache(token_obj) -> Dict:
"""
Convert DocumentViewToken to cache-friendly dict.
Only includes fields needed for validation (minimal payload).
"""
return {
'token': str(token_obj.token),
'nda_record_id': token_obj.nda_record_id,
'project_id': token_obj.project_id,
'expires_at': token_obj.expires_at.isoformat(),
'revoked_at': token_obj.revoked_at.isoformat() if token_obj.revoked_at else None,
'ip_whitelist': token_obj.ip_whitelist,
'scope': token_obj.scope,
'nda_status': token_obj.nda_record.status, # Denormalized for performance
'grace_period_days': get_grace_period_days(token_obj.project_id),
}
def get_grace_period_days(project_id: str) -> int:
"""Get grace period for project (with caching)."""
cache_key = f"project_grace:{project_id}"
grace_days = cache.get(cache_key)
if grace_days is None:
from ..models import ProjectTokenConfig
config = ProjectTokenConfig.objects.filter(project_id=project_id).first()
if config:
grace_days = config.grace_period_days
else:
from django.conf import settings
grace_days = getattr(settings, 'DEFAULT_GRACE_PERIOD_DAYS', 7)
# Cache for 1 hour
cache.set(cache_key, grace_days, timeout=3600)
return grace_days
3.3 Cache Validation
# backend/access_control/cache/token_cache.py (continued)
def get_cached_token(token: str) -> Optional[Dict]:
"""
Retrieve token validation data from Redis cache.
Args:
token: Token UUID string
Returns:
Dict with token data or None if cache miss
"""
cache_key = get_cache_key(token)
return cache.get(cache_key)
def validate_from_cache(token: str, ip_address: Optional[str] = None) -> Dict:
"""
Validate token from Redis cache.
Returns:
Dict with validation result:
{
'valid': bool,
'reason': str (if invalid),
'grace_period': bool (if in grace period),
'scope': dict,
'cache_hit': bool
}
"""
cache_data = get_cached_token(token)
if not cache_data:
return {'valid': False, 'cache_hit': False, 'reason': 'cache_miss'}
now = timezone.now()
# Check revocation
if cache_data['revoked_at']:
return {
'valid': False,
'cache_hit': True,
'reason': 'token_revoked',
'revoked_at': cache_data['revoked_at']
}
# Check NDA status
if cache_data['nda_status'] != 'active':
return {
'valid': False,
'cache_hit': True,
'reason': 'nda_inactive',
'nda_status': cache_data['nda_status']
}
# Check expiry
expires_at = timezone.datetime.fromisoformat(cache_data['expires_at'])
is_expired = now > expires_at
if is_expired:
# Check grace period
grace_days = cache_data.get('grace_period_days', 7)
grace_expiry = expires_at + timedelta(days=grace_days)
if now > grace_expiry:
return {
'valid': False,
'cache_hit': True,
'reason': 'token_expired',
'expires_at': cache_data['expires_at']
}
# Within grace period - read-only access
return {
'valid': True,
'cache_hit': True,
'grace_period': True,
'scope': {**cache_data['scope'], 'permissions': {'read': True, 'download': False, 'print': False}},
'expires_at': cache_data['expires_at']
}
# Check IP whitelist
if cache_data['ip_whitelist'] and ip_address:
if not is_ip_whitelisted(ip_address, cache_data['ip_whitelist']):
return {
'valid': False,
'cache_hit': True,
'reason': 'ip_not_whitelisted',
'ip_address': ip_address
}
# Token is valid
return {
'valid': True,
'cache_hit': True,
'grace_period': False,
'scope': cache_data['scope'],
'expires_at': cache_data['expires_at']
}
def is_ip_whitelisted(ip_address: str, whitelist: list) -> bool:
"""
Check if IP address matches whitelist (supports CIDR).
"""
import ipaddress
try:
ip = ipaddress.ip_address(ip_address)
except ValueError:
return False
for allowed in whitelist:
try:
# Try CIDR notation first
if '/' in allowed:
network = ipaddress.ip_network(allowed, strict=False)
if ip in network:
return True
else:
# Exact IP match
if ip == ipaddress.ip_address(allowed):
return True
except ValueError:
continue
return False
3.4 Cache Invalidation
# backend/access_control/cache/token_cache.py (continued)
def invalidate_token_cache(token: str) -> bool:
"""
Remove token from Redis cache immediately.
Called on revocation or manual invalidation.
Args:
token: Token UUID string
Returns:
bool: True if invalidated successfully
"""
cache_key = get_cache_key(token)
try:
cache.delete(cache_key)
return True
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"Failed to invalidate cache for token {token}: {e}")
return False
def invalidate_project_tokens(project_id: str) -> int:
"""
Bulk invalidate all tokens for a project.
Used when project configuration changes or project access is suspended.
Returns:
int: Number of tokens invalidated
"""
from ..models import DocumentViewToken
tokens = DocumentViewToken.objects.filter(
project_id=project_id,
revoked_at__isnull=True
).values_list('token', flat=True)
count = 0
for token in tokens:
if invalidate_token_cache(str(token)):
count += 1
return count
def invalidate_nda_tokens(nda_record_id: int) -> int:
"""
Bulk invalidate all tokens for an NDA record.
Used when NDA is revoked or expires.
Returns:
int: Number of tokens invalidated
"""
from ..models import DocumentViewToken
tokens = DocumentViewToken.objects.filter(
nda_record_id=nda_record_id,
revoked_at__isnull=True
).values_list('token', flat=True)
count = 0
for token in tokens:
if invalidate_token_cache(str(token)):
count += 1
return count
3.5 Redis Sentinel Configuration
# backend/config/redis.py
from django.conf import settings
from redis.sentinel import Sentinel
def get_redis_sentinel():
"""
Get Redis Sentinel connection for high availability.
Sentinel monitors multiple Redis instances and provides automatic failover.
"""
sentinels = getattr(settings, 'REDIS_SENTINELS', [
('redis-sentinel-1.internal', 26379),
('redis-sentinel-2.internal', 26379),
('redis-sentinel-3.internal', 26379),
])
sentinel = Sentinel(
sentinels,
socket_timeout=0.5,
socket_connect_timeout=0.5,
sentinel_kwargs={
'password': settings.REDIS_SENTINEL_PASSWORD
}
)
return sentinel
def get_redis_master():
"""Get master Redis connection from Sentinel."""
sentinel = get_redis_sentinel()
master = sentinel.master_for(
'mymaster',
socket_timeout=0.5,
password=settings.REDIS_PASSWORD,
db=settings.REDIS_TOKEN_DB
)
return master
def get_redis_slave():
"""Get slave Redis connection for read-only operations."""
sentinel = get_redis_sentinel()
slave = sentinel.slave_for(
'mymaster',
socket_timeout=0.5,
password=settings.REDIS_PASSWORD,
db=settings.REDIS_TOKEN_DB
)
return slave
# Django cache configuration
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': [
f'redis://{host}:{port}/0'
for host, port in settings.REDIS_SENTINELS
],
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.SentinelClient',
'SENTINEL_KWARGS': {
'password': settings.REDIS_SENTINEL_PASSWORD
},
'PASSWORD': settings.REDIS_PASSWORD,
'MASTER_NAME': 'mymaster',
'SOCKET_CONNECT_TIMEOUT': 0.5,
'SOCKET_TIMEOUT': 0.5,
'CONNECTION_POOL_KWARGS': {
'max_connections': 50,
'retry_on_timeout': True
}
}
}
}
4. Token Validation Logic
4.1 Core Validation Methods
# backend/access_control/models/document_view_token.py (methods)
class DocumentViewToken(models.Model):
# ... (model definition from Section 1)
def is_valid(self, ip_address: Optional[str] = None, check_rate_limit: bool = True) -> bool:
"""
Check if token is currently valid.
Validation criteria:
1. Not revoked (revoked_at is NULL)
2. Not expired (expires_at > now OR within grace period)
3. NDA record is active
4. IP address matches whitelist (if configured)
5. Rate limit not exceeded (if check_rate_limit=True)
Args:
ip_address: Client IP address (for whitelist check)
check_rate_limit: Whether to enforce rate limiting
Returns:
bool: True if valid
"""
from django.utils import timezone
from .cache.token_cache import is_ip_whitelisted
# Check revocation
if self.revoked_at:
return False
# Check NDA status
if self.nda_record.status != 'active':
return False
# Check expiry (with grace period)
now = timezone.now()
if now > self.expires_at:
# Check grace period
from .utils.ttl import get_grace_period_expiry
grace_expiry = get_grace_period_expiry(self)
if now > grace_expiry:
return False
# Check IP whitelist
if self.ip_whitelist and ip_address:
if not is_ip_whitelisted(ip_address, self.ip_whitelist):
return False
# Check rate limit
if check_rate_limit:
if not self._check_rate_limit():
return False
return True
def is_expired(self) -> bool:
"""Check if token is past expiry (not including grace period)."""
from django.utils import timezone
return timezone.now() > self.expires_at
def is_revoked(self) -> bool:
"""Check if token has been explicitly revoked."""
return self.revoked_at is not None
def is_within_grace_period(self) -> bool:
"""Check if token is in grace period (read-only access)."""
if not self.is_expired():
return False
from .utils.ttl import is_within_grace_period
return is_within_grace_period(self)
def get_validation_status(self, ip_address: Optional[str] = None) -> Dict:
"""
Get detailed validation status for diagnostics.
Returns:
Dict with status fields:
- valid: bool
- expired: bool
- revoked: bool
- grace_period: bool
- nda_active: bool
- ip_allowed: bool
- rate_limit_ok: bool
- reason: str (if invalid)
"""
from django.utils import timezone
from .cache.token_cache import is_ip_whitelisted
status = {
'valid': True,
'expired': self.is_expired(),
'revoked': self.is_revoked(),
'grace_period': False,
'nda_active': self.nda_record.status == 'active',
'ip_allowed': True,
'rate_limit_ok': True,
'reason': None
}
# Check each validation criterion
if status['revoked']:
status['valid'] = False
status['reason'] = 'token_revoked'
elif not status['nda_active']:
status['valid'] = False
status['reason'] = 'nda_inactive'
elif status['expired']:
if self.is_within_grace_period():
status['grace_period'] = True
# Still valid but limited permissions
else:
status['valid'] = False
status['reason'] = 'token_expired'
# IP whitelist
if self.ip_whitelist and ip_address:
if not is_ip_whitelisted(ip_address, self.ip_whitelist):
status['valid'] = False
status['ip_allowed'] = False
status['reason'] = 'ip_not_whitelisted'
# Rate limit
if not self._check_rate_limit():
status['valid'] = False
status['rate_limit_ok'] = False
status['reason'] = 'rate_limit_exceeded'
return status
4.2 Validate and Refresh
# backend/access_control/models/document_view_token.py (continued)
def validate_and_refresh(self, ip_address: Optional[str] = None) -> Dict:
"""
Validate token and update usage metrics.
This is the main entry point for token validation on document access.
Flow:
1. Check Redis cache for validation
2. If cache miss, validate from database
3. Update last_accessed and access_count
4. Refresh Redis cache
5. Check if auto-renewal is needed
Args:
ip_address: Client IP address
Returns:
Dict with validation result:
{
'valid': bool,
'grace_period': bool,
'scope': dict,
'cache_hit': bool,
'renewed': bool
}
"""
from django.utils import timezone
from .cache.token_cache import validate_from_cache, warm_token_cache
# Try cache first
cache_result = validate_from_cache(str(self.token), ip_address)
if cache_result['cache_hit']:
# Cache hit - use cached validation
if cache_result['valid']:
# Update metrics asynchronously
self._update_usage_metrics_async()
# Check auto-renewal
if self._should_auto_renew():
self._auto_renew()
cache_result['renewed'] = True
return cache_result
# Cache miss - validate from database
is_valid = self.is_valid(ip_address=ip_address)
if is_valid:
# Update metrics synchronously (cache miss = DB hit anyway)
self.last_accessed = timezone.now()
self.access_count += 1
self.save(update_fields=['last_accessed', 'access_count'])
# Warm cache for next request
warm_token_cache(self)
# Check auto-renewal
renewed = False
if self._should_auto_renew():
self._auto_renew()
renewed = True
return {
'valid': True,
'grace_period': self.is_within_grace_period(),
'scope': self.get_effective_scope(),
'cache_hit': False,
'renewed': renewed
}
else:
# Invalid - get detailed reason
status = self.get_validation_status(ip_address)
return {
'valid': False,
'reason': status['reason'],
'cache_hit': False,
'renewed': False
}
def _update_usage_metrics_async(self):
"""Update last_accessed and access_count asynchronously."""
from django_q.tasks import async_task
async_task(
'access_control.tasks.update_token_usage',
str(self.token)
)
def get_effective_scope(self) -> Dict:
"""
Get effective scope considering grace period.
During grace period, permissions are restricted to read-only.
"""
scope = self.scope.copy()
if self.is_within_grace_period():
scope['permissions'] = {
'read': True,
'download': False,
'print': False,
'share': False
}
return scope
4.3 Rate Limiting
# backend/access_control/models/document_view_token.py (continued)
def _check_rate_limit(self) -> bool:
"""
Check if token has exceeded rate limit.
Uses Redis counter with 1-minute sliding window.
Default limit: 100 requests/minute (configurable per project).
Returns:
bool: True if under limit
"""
from django.core.cache import cache
from .cache.token_cache import get_rate_limit_key
from .models import ProjectTokenConfig
# Get project rate limit
config = ProjectTokenConfig.objects.filter(project_id=self.project_id).first()
if config:
max_requests = config.max_requests_per_minute
else:
from django.conf import settings
max_requests = getattr(settings, 'DEFAULT_MAX_REQUESTS_PER_MINUTE', 100)
# Redis key for rate limiting
rl_key = get_rate_limit_key(str(self.token))
# Increment counter (expires after 60 seconds)
try:
current = cache.get(rl_key)
if current is None:
cache.set(rl_key, 1, timeout=60)
return True
elif current >= max_requests:
return False
else:
cache.incr(rl_key)
return True
except Exception:
# Redis failure - allow request (fail open)
return True
def _check_concurrent_sessions(self) -> bool:
"""
Check if NDA record has too many concurrent active tokens.
Prevents token sharing by limiting concurrent sessions per NDA.
Returns:
bool: True if under limit
"""
from django.core.cache import cache
from .cache.token_cache import get_session_count_key
from .models import ProjectTokenConfig
# Get project concurrent session limit
config = ProjectTokenConfig.objects.filter(project_id=self.project_id).first()
if config:
max_sessions = config.max_concurrent_sessions
else:
from django.conf import settings
max_sessions = getattr(settings, 'DEFAULT_MAX_CONCURRENT_SESSIONS', 3)
# Count active tokens for this NDA + project
session_key = get_session_count_key(self.nda_record_id, self.project_id)
try:
active_tokens = cache.get(session_key)
if active_tokens is None:
# Compute from database and cache for 5 minutes
from django.utils import timezone
active_count = DocumentViewToken.objects.filter(
nda_record=self.nda_record,
project_id=self.project_id,
revoked_at__isnull=True,
expires_at__gt=timezone.now()
).count()
cache.set(session_key, active_count, timeout=300)
return active_count <= max_sessions
else:
return active_tokens <= max_sessions
except Exception:
# Redis failure - allow request (fail open)
return True
4.4 Auto-Renewal Logic
# backend/access_control/models/document_view_token.py (continued)
def _should_auto_renew(self) -> bool:
"""
Check if token should be auto-renewed.
Criteria:
1. Auto-renewal enabled for project
2. Token not revoked
3. Within renewal threshold
4. Under max renewal count
Returns:
bool: True if should renew
"""
from .models import ProjectTokenConfig
config = ProjectTokenConfig.objects.filter(project_id=self.project_id).first()
if not config or not config.is_renewal_eligible(self):
return False
from django.utils import timezone
days_until_expiry = (self.expires_at - timezone.now()).days
return days_until_expiry <= config.renewal_threshold_days
def _auto_renew(self):
"""
Automatically renew token expiry.
Extends expires_at by one TTL period and increments renewal count.
"""
from datetime import timedelta
from .models import ProjectTokenConfig
from .cache.token_cache import warm_token_cache
config = ProjectTokenConfig.objects.filter(project_id=self.project_id).first()
if config:
ttl_days = config.token_ttl_days
else:
from django.conf import settings
ttl_days = getattr(settings, 'DEFAULT_TOKEN_TTL_DAYS', 90)
# Extend expiry
self.expires_at += timedelta(days=ttl_days)
# Increment renewal count in scope metadata
renewal_count = self.scope.get('_renewal_count', 0)
self.scope['_renewal_count'] = renewal_count + 1
self.save(update_fields=['expires_at', 'scope'])
# Refresh cache
warm_token_cache(self)
# Log renewal
import logging
logger = logging.getLogger(__name__)
logger.info(f"Auto-renewed token {self.token} (renewal #{renewal_count + 1})")
5. Token Lifecycle Management
5.1 Token Creation
# backend/access_control/services/token_service.py
from typing import Optional, Dict
from datetime import timedelta
from django.utils import timezone
from django.db import transaction
from ..models import DocumentViewToken, NDARecord, ProjectTokenConfig
from ..cache.token_cache import warm_token_cache
from ..utils.ttl import calculate_token_expiry
def create_view_token(
nda_record: NDARecord,
project_id: str,
scope: Dict,
ip_whitelist: Optional[list] = None,
created_by=None
) -> DocumentViewToken:
"""
Create new document view token after NDA verification.
Args:
nda_record: Verified NDA record
project_id: BIO-QMS project identifier
scope: Access scope (categories, audiences, permissions)
ip_whitelist: Optional list of allowed IPs
created_by: User who granted the token
Returns:
DocumentViewToken instance
Raises:
ValueError: If NDA is not active or project config is invalid
"""
if nda_record.status != 'active':
raise ValueError(f"NDA record {nda_record.id} is not active (status: {nda_record.status})")
# Validate project configuration
config = ProjectTokenConfig.objects.filter(project_id=project_id).first()
if config and config.require_ip_whitelist and not ip_whitelist:
raise ValueError(f"Project {project_id} requires IP whitelist")
# Check concurrent session limit
if config:
active_count = DocumentViewToken.objects.filter(
nda_record=nda_record,
project_id=project_id,
revoked_at__isnull=True,
expires_at__gt=timezone.now()
).count()
if active_count >= config.max_concurrent_sessions:
raise ValueError(
f"NDA record {nda_record.id} has reached max concurrent sessions "
f"({config.max_concurrent_sessions}) for project {project_id}"
)
with transaction.atomic():
# Create token
token = DocumentViewToken.objects.create(
nda_record=nda_record,
project_id=project_id,
scope=scope,
ip_whitelist=ip_whitelist,
expires_at=calculate_token_expiry(project_id),
created_by=created_by
)
# Warm Redis cache
warm_token_cache(token)
# Log creation
import logging
logger = logging.getLogger(__name__)
logger.info(
f"Created view token {token.token} for NDA {nda_record.id}, "
f"project {project_id}, expires {token.expires_at.isoformat()}"
)
return token
def create_token_from_nda_verification(nda_verification_result: Dict) -> DocumentViewToken:
"""
Create token from NDA verification workflow result.
Called by NDA verification service after successful verification.
Args:
nda_verification_result: Dict with:
- nda_record: NDARecord instance
- project_id: str
- scope: dict (from NDA document metadata)
- user: User instance (who verified)
Returns:
DocumentViewToken instance
"""
return create_view_token(
nda_record=nda_verification_result['nda_record'],
project_id=nda_verification_result['project_id'],
scope=nda_verification_result['scope'],
created_by=nda_verification_result.get('user')
)
5.2 Token Revocation
# backend/access_control/services/token_service.py (continued)
def revoke_token(token: DocumentViewToken, reason: str = '', revoked_by=None) -> bool:
"""
Explicitly revoke a token.
Args:
token: DocumentViewToken instance
reason: Reason for revocation (logged in notes)
revoked_by: User who revoked the token
Returns:
bool: True if revoked successfully
"""
from django.utils import timezone
from ..cache.token_cache import invalidate_token_cache
if token.revoked_at:
return False # Already revoked
with transaction.atomic():
token.revoked_at = timezone.now()
if reason:
timestamp = timezone.now().isoformat()
revoked_by_info = f" by {revoked_by}" if revoked_by else ""
token.notes += f"\n[{timestamp}] Revoked{revoked_by_info}: {reason}"
token.save(update_fields=['revoked_at', 'notes'])
# Invalidate cache immediately
invalidate_token_cache(str(token.token))
# Log revocation
import logging
logger = logging.getLogger(__name__)
logger.info(f"Revoked token {token.token}: {reason}")
return True
def bulk_revoke_by_nda(nda_record: NDARecord, reason: str = '') -> int:
"""
Revoke all active tokens for an NDA record.
Used when NDA is revoked or company relationship ends.
Args:
nda_record: NDARecord instance
reason: Reason for bulk revocation
Returns:
int: Number of tokens revoked
"""
from django.utils import timezone
from ..cache.token_cache import invalidate_nda_tokens
active_tokens = DocumentViewToken.objects.filter(
nda_record=nda_record,
revoked_at__isnull=True
)
count = active_tokens.count()
if count == 0:
return 0
with transaction.atomic():
now = timezone.now()
timestamp = now.isoformat()
note = f"\n[{timestamp}] Bulk revoked: {reason}"
# Update all tokens
active_tokens.update(
revoked_at=now,
notes=models.F('notes') + note
)
# Invalidate cache for all tokens
invalidate_nda_tokens(nda_record.id)
# Log bulk revocation
import logging
logger = logging.getLogger(__name__)
logger.info(f"Bulk revoked {count} tokens for NDA {nda_record.id}: {reason}")
return count
def bulk_revoke_by_project(project_id: str, reason: str = '') -> int:
"""
Revoke all active tokens for a project.
Used when project access is suspended or project is archived.
Args:
project_id: BIO-QMS project identifier
reason: Reason for bulk revocation
Returns:
int: Number of tokens revoked
"""
from django.utils import timezone
from ..cache.token_cache import invalidate_project_tokens
active_tokens = DocumentViewToken.objects.filter(
project_id=project_id,
revoked_at__isnull=True
)
count = active_tokens.count()
if count == 0:
return 0
with transaction.atomic():
now = timezone.now()
timestamp = now.isoformat()
note = f"\n[{timestamp}] Project revoked: {reason}"
active_tokens.update(
revoked_at=now,
notes=models.F('notes') + note
)
invalidate_project_tokens(project_id)
import logging
logger = logging.getLogger(__name__)
logger.info(f"Bulk revoked {count} tokens for project {project_id}: {reason}")
return count
5.3 Token Expiration & Cleanup
# backend/access_control/management/commands/cleanup_expired_tokens.py
from django.core.management.base import BaseCommand
from django.utils import timezone
from datetime import timedelta
from access_control.models import DocumentViewToken
from access_control.utils.ttl import get_grace_period_expiry
class Command(BaseCommand):
help = 'Archive expired tokens past grace period'
def add_arguments(self, parser):
parser.add_argument(
'--dry-run',
action='store_true',
help='Show what would be archived without actually archiving'
)
parser.add_argument(
'--batch-size',
type=int,
default=1000,
help='Number of tokens to process per batch'
)
def handle(self, *args, **options):
dry_run = options['dry_run']
batch_size = options['batch_size']
now = timezone.now()
# Find tokens past grace period
expired_tokens = DocumentViewToken.objects.filter(
revoked_at__isnull=True
).select_related('nda_record')
to_archive = []
for token in expired_tokens:
grace_expiry = get_grace_period_expiry(token)
if now > grace_expiry:
to_archive.append(token)
total = len(to_archive)
if dry_run:
self.stdout.write(
self.style.WARNING(f'[DRY RUN] Would archive {total} expired tokens')
)
return
# Archive in batches
archived = 0
for i in range(0, total, batch_size):
batch = to_archive[i:i+batch_size]
# Update to mark as archived (revoked_at = grace_expiry)
for token in batch:
grace_expiry = get_grace_period_expiry(token)
token.revoked_at = grace_expiry
token.notes += f"\n[{now.isoformat()}] Auto-archived after grace period"
DocumentViewToken.objects.bulk_update(
batch,
['revoked_at', 'notes'],
batch_size=batch_size
)
archived += len(batch)
self.stdout.write(f'Archived batch {i//batch_size + 1}: {len(batch)} tokens')
self.stdout.write(
self.style.SUCCESS(f'Successfully archived {archived} expired tokens')
)
# Celery task for scheduled cleanup
from celery import shared_task
@shared_task
def cleanup_expired_tokens_task():
"""
Scheduled task to clean up expired tokens.
Run daily via Celery Beat:
CELERY_BEAT_SCHEDULE = {
'cleanup-expired-tokens': {
'task': 'access_control.tasks.cleanup_expired_tokens_task',
'schedule': crontab(hour=2, minute=0), # 2 AM daily
},
}
"""
from django.core.management import call_command
call_command('cleanup_expired_tokens', '--batch-size=5000')
5.4 Token Rotation
# backend/access_control/services/token_service.py (continued)
def rotate_token(old_token: DocumentViewToken, reason: str = 'security_rotation') -> DocumentViewToken:
"""
Rotate token by creating new token and revoking old one.
Used for periodic security rotation or after suspicious activity.
Args:
old_token: Existing DocumentViewToken instance
reason: Reason for rotation
Returns:
DocumentViewToken: New token with same permissions
"""
with transaction.atomic():
# Create new token with same configuration
new_token = create_view_token(
nda_record=old_token.nda_record,
project_id=old_token.project_id,
scope=old_token.scope,
ip_whitelist=old_token.ip_whitelist,
created_by=old_token.created_by
)
# Revoke old token
revoke_token(old_token, reason=f"Rotated to {new_token.token}: {reason}")
# Log rotation
import logging
logger = logging.getLogger(__name__)
logger.info(f"Rotated token {old_token.token} -> {new_token.token}: {reason}")
return new_token
def rotate_tokens_by_age(max_age_days: int = 180) -> int:
"""
Rotate tokens older than specified age.
Used for periodic security rotation policy.
Args:
max_age_days: Maximum token age before rotation
Returns:
int: Number of tokens rotated
"""
from datetime import timedelta
cutoff_date = timezone.now() - timedelta(days=max_age_days)
old_tokens = DocumentViewToken.objects.filter(
granted_at__lt=cutoff_date,
revoked_at__isnull=True
)
count = 0
for token in old_tokens:
try:
rotate_token(token, reason=f'age_rotation_{max_age_days}d')
count += 1
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"Failed to rotate token {token.token}: {e}")
return count
6. Manager and QuerySet
6.1 Custom Manager
# backend/access_control/managers.py
from django.db import models
from django.utils import timezone
from django.db.models import Q, Count, Max
class DocumentViewTokenQuerySet(models.QuerySet):
"""Custom queryset with common token filters."""
def valid(self):
"""Get currently valid tokens (not revoked, not expired)."""
now = timezone.now()
return self.filter(
revoked_at__isnull=True,
expires_at__gt=now
).select_related('nda_record')
def expired(self, include_grace_period: bool = False):
"""Get expired tokens."""
now = timezone.now()
if include_grace_period:
# Expired but possibly within grace period
return self.filter(expires_at__lt=now)
else:
# Fully expired (past grace period)
from ..utils.ttl import get_grace_period_expiry
# This requires annotation - handle in view layer
return self.filter(expires_at__lt=now)
def revoked(self):
"""Get explicitly revoked tokens."""
return self.filter(revoked_at__isnull=False)
def for_project(self, project_id: str):
"""Get tokens for specific project."""
return self.filter(project_id=project_id)
def for_nda(self, nda_record):
"""Get tokens for specific NDA record."""
return self.filter(nda_record=nda_record)
def for_company(self, company):
"""Get tokens for all NDAs from a company."""
return self.filter(nda_record__company=company)
def active_in_grace_period(self):
"""Get tokens currently in grace period."""
now = timezone.now()
# Tokens that are expired but within grace period
# Requires project config lookup - better as annotated query
return self.filter(
expires_at__lt=now,
revoked_at__isnull=True
)
def with_usage_stats(self):
"""Annotate with usage statistics."""
return self.annotate(
days_active=models.ExpressionWrapper(
models.F('expires_at') - models.F('granted_at'),
output_field=models.DurationField()
),
days_until_expiry=models.ExpressionWrapper(
models.F('expires_at') - timezone.now(),
output_field=models.DurationField()
)
)
def recently_accessed(self, days: int = 7):
"""Get tokens accessed in last N days."""
cutoff = timezone.now() - timezone.timedelta(days=days)
return self.filter(last_accessed__gte=cutoff)
def never_accessed(self):
"""Get tokens that have never been used."""
return self.filter(access_count=0, last_accessed__isnull=True)
def high_usage(self, min_count: int = 100):
"""Get tokens with high access count."""
return self.filter(access_count__gte=min_count)
class DocumentViewTokenManager(models.Manager):
"""Custom manager for DocumentViewToken."""
def get_queryset(self):
return DocumentViewTokenQuerySet(self.model, using=self._db)
def valid(self):
return self.get_queryset().valid()
def expired(self, include_grace_period=False):
return self.get_queryset().expired(include_grace_period)
def revoked(self):
return self.get_queryset().revoked()
def for_project(self, project_id):
return self.get_queryset().for_project(project_id)
def for_nda(self, nda_record):
return self.get_queryset().for_nda(nda_record)
def for_company(self, company):
return self.get_queryset().for_company(company)
def create_from_nda(self, nda_record, project_id, scope, **kwargs):
"""
Create token from NDA verification.
Convenience method that handles TTL calculation and cache warming.
"""
from ..services.token_service import create_view_token
return create_view_token(nda_record, project_id, scope, **kwargs)
def bulk_revoke_by_nda(self, nda_record, reason=''):
"""Bulk revoke all tokens for NDA."""
from ..services.token_service import bulk_revoke_by_nda
return bulk_revoke_by_nda(nda_record, reason)
def bulk_revoke_by_project(self, project_id, reason=''):
"""Bulk revoke all tokens for project."""
from ..services.token_service import bulk_revoke_by_project
return bulk_revoke_by_project(project_id, reason)
def get_usage_summary(self, project_id=None):
"""
Get usage summary statistics.
Returns:
Dict with token counts by status
"""
qs = self.get_queryset()
if project_id:
qs = qs.for_project(project_id)
now = timezone.now()
return {
'total': qs.count(),
'valid': qs.valid().count(),
'expired': qs.filter(expires_at__lt=now).count(),
'revoked': qs.revoked().count(),
'never_used': qs.never_accessed().count(),
'recent_access': qs.recently_accessed(days=7).count(),
}
7. Django Admin Interface
7.1 TokenAdmin Configuration
# backend/access_control/admin.py
from django.contrib import admin
from django.utils.html import format_html
from django.utils import timezone
from django.urls import reverse
from django.db.models import Count, Q
from .models import DocumentViewToken, ProjectTokenConfig
from .services.token_service import revoke_token, rotate_token
@admin.register(DocumentViewToken)
class DocumentViewTokenAdmin(admin.ModelAdmin):
"""Django admin for DocumentViewToken."""
list_display = [
'token_short',
'project_id',
'nda_record_link',
'status_badge',
'granted_at',
'expires_at',
'access_count',
'last_accessed',
]
list_filter = [
'project_id',
'granted_at',
'expires_at',
('revoked_at', admin.EmptyFieldListFilter),
'nda_record__status',
]
search_fields = [
'token',
'project_id',
'nda_record__id',
'nda_record__company__name',
'notes',
]
readonly_fields = [
'token',
'granted_at',
'last_accessed',
'access_count',
'status_display',
'days_until_expiry',
'usage_stats',
]
fieldsets = [
('Token Information', {
'fields': ['token', 'status_display', 'days_until_expiry']
}),
('Access Control', {
'fields': ['nda_record', 'project_id', 'scope', 'ip_whitelist']
}),
('Temporal', {
'fields': ['granted_at', 'expires_at', 'revoked_at']
}),
('Usage', {
'fields': ['access_count', 'last_accessed', 'usage_stats']
}),
('Metadata', {
'fields': ['created_by', 'notes'],
'classes': ['collapse']
}),
]
actions = [
'revoke_selected',
'rotate_selected',
'extend_expiry_30d',
'extend_expiry_90d',
]
def token_short(self, obj):
"""Display first 8 chars of token."""
return f"{str(obj.token)[:8]}..."
token_short.short_description = 'Token'
def nda_record_link(self, obj):
"""Link to NDA record."""
url = reverse('admin:access_control_ndarecord_change', args=[obj.nda_record_id])
return format_html('<a href="{}">{}</a>', url, obj.nda_record_id)
nda_record_link.short_description = 'NDA Record'
def status_badge(self, obj):
"""Visual status badge."""
if obj.revoked_at:
color = 'red'
text = 'REVOKED'
elif obj.expires_at < timezone.now():
if obj.is_within_grace_period():
color = 'orange'
text = 'GRACE PERIOD'
else:
color = 'darkred'
text = 'EXPIRED'
else:
color = 'green'
text = 'VALID'
return format_html(
'<span style="background:{}; color:white; padding:3px 8px; border-radius:3px;">{}</span>',
color, text
)
status_badge.short_description = 'Status'
def status_display(self, obj):
"""Detailed status for readonly field."""
status = obj.get_validation_status()
lines = [
f"Valid: {status['valid']}",
f"Expired: {status['expired']}",
f"Revoked: {status['revoked']}",
f"Grace Period: {status['grace_period']}",
f"NDA Active: {status['nda_active']}",
]
if status['reason']:
lines.append(f"Reason: {status['reason']}")
return '\n'.join(lines)
status_display.short_description = 'Validation Status'
def days_until_expiry(self, obj):
"""Days until expiry (negative if expired)."""
delta = obj.expires_at - timezone.now()
days = delta.days
if days < 0:
return format_html('<span style="color:red;">{} days ago</span>', abs(days))
elif days < 7:
return format_html('<span style="color:orange;">{} days</span>', days)
else:
return f"{days} days"
days_until_expiry.short_description = 'Days Until Expiry'
def usage_stats(self, obj):
"""Display usage statistics."""
if obj.access_count == 0:
return "Never used"
if obj.last_accessed:
time_since = timezone.now() - obj.last_accessed
hours = int(time_since.total_seconds() / 3600)
if hours < 1:
recency = "< 1 hour ago"
elif hours < 24:
recency = f"{hours} hours ago"
else:
recency = f"{hours // 24} days ago"
else:
recency = "Unknown"
return f"{obj.access_count} accesses, last: {recency}"
usage_stats.short_description = 'Usage Statistics'
# Admin actions
@admin.action(description='Revoke selected tokens')
def revoke_selected(self, request, queryset):
"""Revoke selected tokens."""
count = 0
for token in queryset.filter(revoked_at__isnull=True):
if revoke_token(token, reason=f'Admin revocation by {request.user}', revoked_by=request.user):
count += 1
self.message_user(request, f'Revoked {count} tokens.')
@admin.action(description='Rotate selected tokens')
def rotate_selected(self, request, queryset):
"""Rotate selected tokens (create new, revoke old)."""
count = 0
for token in queryset.filter(revoked_at__isnull=True):
try:
new_token = rotate_token(token, reason=f'Admin rotation by {request.user}')
count += 1
except Exception as e:
self.message_user(request, f'Failed to rotate {token.token}: {e}', level='error')
self.message_user(request, f'Rotated {count} tokens.')
@admin.action(description='Extend expiry by 30 days')
def extend_expiry_30d(self, request, queryset):
"""Extend token expiry by 30 days."""
from datetime import timedelta
count = queryset.filter(revoked_at__isnull=True).update(
expires_at=models.F('expires_at') + timedelta(days=30)
)
# Invalidate cache for updated tokens
from .cache.token_cache import invalidate_token_cache
for token in queryset:
invalidate_token_cache(str(token.token))
self.message_user(request, f'Extended expiry for {count} tokens by 30 days.')
@admin.action(description='Extend expiry by 90 days')
def extend_expiry_90d(self, request, queryset):
"""Extend token expiry by 90 days."""
from datetime import timedelta
count = queryset.filter(revoked_at__isnull=True).update(
expires_at=models.F('expires_at') + timedelta(days=90)
)
from .cache.token_cache import invalidate_token_cache
for token in queryset:
invalidate_token_cache(str(token.token))
self.message_user(request, f'Extended expiry for {count} tokens by 90 days.')
def get_queryset(self, request):
"""Optimize queryset with select_related."""
qs = super().get_queryset(request)
return qs.select_related('nda_record', 'nda_record__company', 'created_by')
@admin.register(ProjectTokenConfig)
class ProjectTokenConfigAdmin(admin.ModelAdmin):
"""Django admin for ProjectTokenConfig."""
list_display = [
'project_id',
'token_ttl_days',
'grace_period_days',
'auto_renewal_enabled',
'max_requests_per_minute',
'max_concurrent_sessions',
]
list_filter = [
'auto_renewal_enabled',
'require_ip_whitelist',
]
search_fields = ['project_id', 'notes']
fieldsets = [
('Project', {
'fields': ['project_id']
}),
('TTL Settings', {
'fields': ['token_ttl_days', 'grace_period_days']
}),
('Auto-Renewal', {
'fields': [
'auto_renewal_enabled',
'renewal_threshold_days',
'max_renewals'
]
}),
('Rate Limiting', {
'fields': [
'max_requests_per_minute',
'max_concurrent_sessions'
]
}),
('Access Policies', {
'fields': [
'require_ip_whitelist',
'allowed_document_categories'
]
}),
('Metadata', {
'fields': ['created_by', 'notes'],
'classes': ['collapse']
}),
]
8. Serializers and Views
8.1 DRF Serializers
# backend/access_control/serializers.py
from rest_framework import serializers
from django.utils import timezone
from .models import DocumentViewToken, ProjectTokenConfig
class TokenValidationSerializer(serializers.Serializer):
"""
Lightweight serializer for token validation (used on every request).
Minimal payload for performance.
"""
token = serializers.UUIDField()
ip_address = serializers.IPAddressField(required=False, allow_null=True)
def validate(self, data):
"""Validate that token exists and is valid."""
token_uuid = data['token']
ip_address = data.get('ip_address')
try:
token = DocumentViewToken.objects.get(token=token_uuid)
except DocumentViewToken.DoesNotExist:
raise serializers.ValidationError({'token': 'Invalid token'})
# Validate and refresh
result = token.validate_and_refresh(ip_address=ip_address)
if not result['valid']:
raise serializers.ValidationError({
'token': f"Token validation failed: {result.get('reason', 'unknown')}"
})
data['validation_result'] = result
data['token_obj'] = token
return data
class TokenGrantSerializer(serializers.Serializer):
"""
Serializer for token creation flow.
"""
nda_record_id = serializers.IntegerField()
project_id = serializers.CharField(max_length=100)
scope = serializers.JSONField()
ip_whitelist = serializers.ListField(
child=serializers.CharField(),
required=False,
allow_null=True
)
def validate_nda_record_id(self, value):
"""Validate NDA record exists and is active."""
from .models import NDARecord
try:
nda = NDARecord.objects.get(id=value)
except NDARecord.DoesNotExist:
raise serializers.ValidationError("NDA record not found")
if nda.status != 'active':
raise serializers.ValidationError(f"NDA record is not active (status: {nda.status})")
return value
def validate_project_id(self, value):
"""Validate project exists."""
# TODO: Add project registry validation
return value
def validate_scope(self, value):
"""Validate scope structure."""
required_keys = ['document_categories', 'audience_levels', 'permissions']
for key in required_keys:
if key not in value:
raise serializers.ValidationError(f"Missing required key: {key}")
# Validate permissions
valid_permissions = {'read', 'download', 'print', 'share'}
for perm in value['permissions'].keys():
if perm not in valid_permissions:
raise serializers.ValidationError(f"Invalid permission: {perm}")
return value
def create(self, validated_data):
"""Create token from validated data."""
from .services.token_service import create_view_token
from .models import NDARecord
nda = NDARecord.objects.get(id=validated_data['nda_record_id'])
return create_view_token(
nda_record=nda,
project_id=validated_data['project_id'],
scope=validated_data['scope'],
ip_whitelist=validated_data.get('ip_whitelist'),
created_by=self.context.get('request').user
)
class TokenInfoSerializer(serializers.ModelSerializer):
"""
Detailed serializer for token status and metadata.
"""
status = serializers.SerializerMethodField()
days_until_expiry = serializers.SerializerMethodField()
nda_company = serializers.CharField(source='nda_record.company.name', read_only=True)
validation_status = serializers.SerializerMethodField()
class Meta:
model = DocumentViewToken
fields = [
'token',
'project_id',
'nda_company',
'status',
'granted_at',
'expires_at',
'revoked_at',
'days_until_expiry',
'access_count',
'last_accessed',
'scope',
'ip_whitelist',
'validation_status',
]
read_only_fields = fields
def get_status(self, obj):
"""Get human-readable status."""
if obj.revoked_at:
return 'revoked'
elif obj.is_expired():
if obj.is_within_grace_period():
return 'grace_period'
else:
return 'expired'
else:
return 'valid'
def get_days_until_expiry(self, obj):
"""Days until expiry (negative if expired)."""
delta = obj.expires_at - timezone.now()
return delta.days
def get_validation_status(self, obj):
"""Detailed validation status."""
return obj.get_validation_status()
class ProjectTokenConfigSerializer(serializers.ModelSerializer):
"""Serializer for project token configuration."""
class Meta:
model = ProjectTokenConfig
fields = '__all__'
read_only_fields = ['created_at', 'updated_at']
8.2 API Views
# backend/access_control/views.py
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from .models import DocumentViewToken, ProjectTokenConfig
from .serializers import (
TokenValidationSerializer,
TokenGrantSerializer,
TokenInfoSerializer,
ProjectTokenConfigSerializer
)
from .services.token_service import revoke_token, rotate_token
class DocumentViewTokenViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoints for DocumentViewToken.
List and retrieve are read-only.
Creation handled via /grant endpoint.
"""
queryset = DocumentViewToken.objects.all().select_related('nda_record', 'created_by')
serializer_class = TokenInfoSerializer
permission_classes = [IsAuthenticated]
filterset_fields = ['project_id', 'nda_record']
search_fields = ['token', 'project_id', 'notes']
ordering_fields = ['granted_at', 'expires_at', 'access_count']
ordering = ['-granted_at']
@action(detail=False, methods=['post'])
def validate(self, request):
"""
Validate token.
POST /api/tokens/validate/
Body: {"token": "uuid", "ip_address": "1.2.3.4"}
Returns validation result.
"""
serializer = TokenValidationSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
result = serializer.validated_data['validation_result']
return Response({
'valid': result['valid'],
'grace_period': result.get('grace_period', False),
'scope': result.get('scope'),
'cache_hit': result.get('cache_hit', False)
})
@action(detail=False, methods=['post'])
def grant(self, request):
"""
Create new token after NDA verification.
POST /api/tokens/grant/
Body: {
"nda_record_id": 123,
"project_id": "bio-qms-core",
"scope": {...},
"ip_whitelist": ["1.2.3.4"]
}
Returns created token.
"""
serializer = TokenGrantSerializer(data=request.data, context={'request': request})
serializer.is_valid(raise_exception=True)
token = serializer.save()
return Response(
TokenInfoSerializer(token).data,
status=status.HTTP_201_CREATED
)
@action(detail=True, methods=['post'])
def revoke(self, request, pk=None):
"""
Revoke token.
POST /api/tokens/{uuid}/revoke/
Body: {"reason": "Security incident"}
"""
token = self.get_object()
reason = request.data.get('reason', '')
if revoke_token(token, reason=reason, revoked_by=request.user):
return Response({'status': 'revoked'})
else:
return Response(
{'error': 'Token already revoked'},
status=status.HTTP_400_BAD_REQUEST
)
@action(detail=True, methods=['post'])
def rotate(self, request, pk=None):
"""
Rotate token (create new, revoke old).
POST /api/tokens/{uuid}/rotate/
Body: {"reason": "Periodic rotation"}
"""
old_token = self.get_object()
reason = request.data.get('reason', 'API rotation')
try:
new_token = rotate_token(old_token, reason=reason)
return Response(TokenInfoSerializer(new_token).data)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
@action(detail=False, methods=['get'])
def usage_summary(self, request):
"""
Get usage summary statistics.
GET /api/tokens/usage_summary/?project_id=bio-qms-core
"""
project_id = request.query_params.get('project_id')
summary = DocumentViewToken.objects.get_usage_summary(project_id=project_id)
return Response(summary)
class ProjectTokenConfigViewSet(viewsets.ModelViewSet):
"""
API endpoints for ProjectTokenConfig.
CRUD operations for project token configurations.
"""
queryset = ProjectTokenConfig.objects.all()
serializer_class = ProjectTokenConfigSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'project_id'
9. Database Design
9.1 Migration
# backend/access_control/migrations/0001_initial.py
from django.db import migrations, models
import django.db.models.deletion
import uuid
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
# Assumes NDARecord exists
('access_control', '0001_create_nda_record'),
]
operations = [
migrations.CreateModel(
name='ProjectTokenConfig',
fields=[
('project_id', models.CharField(max_length=100, primary_key=True, serialize=False)),
('token_ttl_days', models.IntegerField(default=90)),
('grace_period_days', models.IntegerField(default=7)),
('auto_renewal_enabled', models.BooleanField(default=False)),
('renewal_threshold_days', models.IntegerField(default=10)),
('max_renewals', models.IntegerField(default=3)),
('max_requests_per_minute', models.IntegerField(default=100)),
('max_concurrent_sessions', models.IntegerField(default=3)),
('require_ip_whitelist', models.BooleanField(default=False)),
('allowed_document_categories', models.JSONField(default=list)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('notes', models.TextField(blank=True)),
('created_by', models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='created_token_configs',
to='auth.user'
)),
],
options={
'db_table': 'access_control_project_token_config',
'verbose_name': 'Project Token Configuration',
'verbose_name_plural': 'Project Token Configurations',
},
),
migrations.CreateModel(
name='DocumentViewToken',
fields=[
('token', models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False
)),
('project_id', models.CharField(db_index=True, max_length=100)),
('granted_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('expires_at', models.DateTimeField(db_index=True)),
('revoked_at', models.DateTimeField(blank=True, db_index=True, null=True)),
('last_accessed', models.DateTimeField(blank=True, null=True)),
('access_count', models.IntegerField(default=0)),
('ip_whitelist', models.JSONField(blank=True, null=True)),
('scope', models.JSONField(default=dict)),
('notes', models.TextField(blank=True)),
('created_by', models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='created_tokens',
to='auth.user'
)),
('nda_record', models.ForeignKey(
on_delete=django.db.models.deletion.PROTECT,
related_name='view_tokens',
to='access_control.ndarecord'
)),
],
options={
'db_table': 'access_control_document_view_token',
'verbose_name': 'Document View Token',
'verbose_name_plural': 'Document View Tokens',
'ordering': ['-granted_at'],
},
),
# Indexes
migrations.AddIndex(
model_name='documentviewtoken',
index=models.Index(fields=['nda_record', 'project_id'], name='dvt_nda_project_idx'),
),
migrations.AddIndex(
model_name='documentviewtoken',
index=models.Index(fields=['expires_at', 'revoked_at'], name='dvt_expiry_idx'),
),
migrations.AddIndex(
model_name='documentviewtoken',
index=models.Index(fields=['granted_at'], name='dvt_granted_idx'),
),
# PostgreSQL-specific hash index
migrations.RunSQL(
sql="CREATE INDEX dvt_token_hash_idx ON access_control_document_view_token USING hash (token);",
reverse_sql="DROP INDEX IF EXISTS dvt_token_hash_idx;"
),
# Check constraints
migrations.AddConstraint(
model_name='documentviewtoken',
constraint=models.CheckConstraint(
check=models.Q(
models.Q(revoked_at__gte=models.F('granted_at')),
models.Q(revoked_at__isnull=True),
_connector='OR'
),
name='dvt_revoked_after_granted'
),
),
migrations.AddConstraint(
model_name='documentviewtoken',
constraint=models.CheckConstraint(
check=models.Q(expires_at__gt=models.F('granted_at')),
name='dvt_expires_after_granted'
),
),
]
9.2 Indexes and Performance
Index Strategy:
- Primary Lookup (
tokenUUID): Hash index for O(1) lookup - NDA + Project Lookup: Composite B-tree for filtering by NDA and project
- Expiry Queries: Composite B-tree on
expires_at+revoked_atfor cleanup - Time-Series: B-tree on
granted_atfor archival queries
Query Patterns:
-- Token validation (most frequent)
SELECT * FROM access_control_document_view_token WHERE token = $1;
-- Uses: dvt_token_hash_idx (hash index)
-- Active tokens for NDA + project
SELECT * FROM access_control_document_view_token
WHERE nda_record_id = $1 AND project_id = $2 AND revoked_at IS NULL;
-- Uses: dvt_nda_project_idx
-- Expired tokens cleanup
SELECT * FROM access_control_document_view_token
WHERE expires_at < $1 AND revoked_at IS NULL;
-- Uses: dvt_expiry_idx
-- Archival queries
SELECT * FROM access_control_document_view_token
WHERE granted_at < $1;
-- Uses: dvt_granted_idx
9.3 Partitioning Strategy
For high-volume deployments (>10M tokens), partition by granted_at:
-- Convert to partitioned table
CREATE TABLE access_control_document_view_token_new (
LIKE access_control_document_view_token INCLUDING ALL
) PARTITION BY RANGE (granted_at);
-- Create monthly partitions
CREATE TABLE access_control_document_view_token_2026_01
PARTITION OF access_control_document_view_token_new
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE access_control_document_view_token_2026_02
PARTITION OF access_control_document_view_token_new
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
-- Migrate data
INSERT INTO access_control_document_view_token_new
SELECT * FROM access_control_document_view_token;
-- Swap tables
BEGIN;
ALTER TABLE access_control_document_view_token RENAME TO access_control_document_view_token_old;
ALTER TABLE access_control_document_view_token_new RENAME TO access_control_document_view_token;
COMMIT;
-- Auto-create partitions with pg_partman extension
SELECT create_parent(
'public.access_control_document_view_token',
'granted_at',
'native',
'monthly'
);
9.4 Archival Policy
# backend/access_control/management/commands/archive_old_tokens.py
from django.core.management.base import BaseCommand
from django.utils import timezone
from datetime import timedelta
class Command(BaseCommand):
help = 'Archive tokens older than retention period to cold storage'
def add_arguments(self, parser):
parser.add_argument(
'--retention-days',
type=int,
default=365,
help='Archive tokens older than this many days'
)
def handle(self, *args, **options):
cutoff = timezone.now() - timedelta(days=options['retention_days'])
old_tokens = DocumentViewToken.objects.filter(
granted_at__lt=cutoff
)
# Export to S3/GCS before deletion
from .utils.archive import export_tokens_to_cold_storage
export_tokens_to_cold_storage(old_tokens)
# Delete from active table
count = old_tokens.delete()[0]
self.stdout.write(
self.style.SUCCESS(f'Archived {count} tokens older than {options["retention_days"]} days')
)
10. Security Considerations
10.1 Token Entropy Analysis
UUID v4 Entropy:
- 128 bits total
- 6 bits reserved for version/variant
- 122 bits effective entropy
- Collision probability: ~2^-61 for 1 billion tokens
Brute Force Resistance:
- At 1M attempts/second: 10^20 years to exhaustion
- At rate limit (100 req/min): effectively infinite
Recommendation: UUID v4 provides sufficient entropy for document access tokens.
10.2 Token Transmission Security
# backend/access_control/middleware.py
class SecureTokenMiddleware:
"""
Middleware to enforce secure token transmission.
Sets security headers for token cookies.
"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
# If setting token cookie, add security flags
if 'dvt' in response.cookies:
response.cookies['dvt']['httponly'] = True
response.cookies['dvt']['secure'] = True # HTTPS only
response.cookies['dvt']['samesite'] = 'Strict' # CSRF protection
response.cookies['dvt']['max_age'] = 3600 # 1 hour
return response
# View that sets token cookie
from django.http import HttpResponse
def set_token_cookie(request, token):
"""
Set DocumentViewToken in httpOnly cookie.
Cookie flags:
- httpOnly: JavaScript cannot access (XSS protection)
- Secure: HTTPS only
- SameSite=Strict: CSRF protection
- Max-Age: 1 hour (short-lived)
"""
response = HttpResponse()
response.set_cookie(
key='dvt',
value=str(token.token),
httponly=True,
secure=True,
samesite='Strict',
max_age=3600
)
return response
10.3 Brute Force Protection
# backend/access_control/middleware.py (continued)
from django.core.cache import cache
from django.http import HttpResponseForbidden
class TokenBruteForceProtection:
"""
Middleware to detect and block token brute force attempts.
Tracks failed validation attempts per IP address.
Blocks IP after threshold (10 failures in 5 minutes).
"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
ip = self.get_client_ip(request)
# Check if IP is blocked
if self.is_blocked(ip):
return HttpResponseForbidden('Rate limit exceeded. Try again later.')
response = self.get_response(request)
# Track failed validation attempts
if hasattr(request, '_token_validation_failed'):
self.record_failure(ip)
return response
def get_client_ip(self, request):
"""Get client IP address from request."""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
return x_forwarded_for.split(',')[0]
return request.META.get('REMOTE_ADDR')
def is_blocked(self, ip):
"""Check if IP is currently blocked."""
block_key = f'bf_block:{ip}'
return cache.get(block_key) is not None
def record_failure(self, ip):
"""Record failed validation attempt."""
fail_key = f'bf_fail:{ip}'
failures = cache.get(fail_key, 0)
failures += 1
# Store for 5 minutes
cache.set(fail_key, failures, timeout=300)
# Block if threshold exceeded
if failures >= 10:
block_key = f'bf_block:{ip}'
cache.set(block_key, True, timeout=3600) # Block for 1 hour
import logging
logger = logging.getLogger(__name__)
logger.warning(f'Blocked IP {ip} for brute force attempts (failures: {failures})')
10.4 Token Binding (Optional)
# backend/access_control/models/document_view_token.py (optional feature)
class DocumentViewToken(models.Model):
# ... (existing fields)
# Optional token binding fields
bound_user_agent = models.CharField(
max_length=500,
blank=True,
help_text="User-Agent string token is bound to"
)
bound_ip_address = models.GenericIPAddressField(
null=True,
blank=True,
help_text="IP address token is bound to"
)
def is_binding_valid(self, request):
"""
Check if request matches token binding.
Token binding prevents token theft by tying token to specific context.
Args:
request: Django request object
Returns:
bool: True if binding matches or no binding set
"""
# Check User-Agent binding
if self.bound_user_agent:
request_ua = request.META.get('HTTP_USER_AGENT', '')
if request_ua != self.bound_user_agent:
return False
# Check IP binding
if self.bound_ip_address:
from ..middleware import TokenBruteForceProtection
request_ip = TokenBruteForceProtection().get_client_ip(request)
if request_ip != self.bound_ip_address:
return False
return True
# Enable token binding in create_view_token
def create_view_token(
nda_record,
project_id,
scope,
bind_to_request=None, # New parameter
**kwargs
):
"""Create token with optional binding."""
binding_fields = {}
if bind_to_request:
binding_fields['bound_user_agent'] = bind_to_request.META.get('HTTP_USER_AGENT', '')
from ..middleware import TokenBruteForceProtection
binding_fields['bound_ip_address'] = TokenBruteForceProtection().get_client_ip(bind_to_request)
token = DocumentViewToken.objects.create(
nda_record=nda_record,
project_id=project_id,
scope=scope,
**binding_fields,
**kwargs
)
return token
10.5 Audit Logging
# backend/access_control/models/token_access_log.py
from django.db import models
class TokenAccessLog(models.Model):
"""
Audit log for token access attempts.
Separate table for high-volume writes without affecting token table.
"""
token = models.UUIDField(db_index=True)
timestamp = models.DateTimeField(auto_now_add=True, db_index=True)
ip_address = models.GenericIPAddressField()
user_agent = models.CharField(max_length=500)
result = models.CharField(
max_length=20,
choices=[
('success', 'Success'),
('expired', 'Expired'),
('revoked', 'Revoked'),
('invalid_ip', 'Invalid IP'),
('rate_limited', 'Rate Limited'),
]
)
document_id = models.CharField(max_length=100, blank=True)
class Meta:
db_table = 'access_control_token_access_log'
indexes = [
models.Index(fields=['token', 'timestamp']),
models.Index(fields=['timestamp']),
models.Index(fields=['ip_address', 'timestamp']),
]
# Partition by timestamp for time-series data
@classmethod
def log_access(cls, token, request, result, document_id=''):
"""Log token access attempt."""
from ..middleware import TokenBruteForceProtection
cls.objects.create(
token=token.token if hasattr(token, 'token') else token,
ip_address=TokenBruteForceProtection().get_client_ip(request),
user_agent=request.META.get('HTTP_USER_AGENT', ''),
result=result,
document_id=document_id
)
11. Testing Strategy
11.1 Unit Tests
# backend/access_control/tests/test_document_view_token.py
from django.test import TestCase
from django.utils import timezone
from datetime import timedelta
from ..models import DocumentViewToken, NDARecord, ProjectTokenConfig
from ..services.token_service import create_view_token, revoke_token
class DocumentViewTokenTestCase(TestCase):
def setUp(self):
"""Set up test fixtures."""
# Create test NDA record
self.nda = NDARecord.objects.create(
company_id=1,
status='active'
)
# Create test project config
self.config = ProjectTokenConfig.objects.create(
project_id='test-project',
token_ttl_days=90,
grace_period_days=7
)
# Create test token
self.token = create_view_token(
nda_record=self.nda,
project_id='test-project',
scope={
'document_categories': ['regulatory'],
'audience_levels': ['internal'],
'permissions': {'read': True, 'download': True}
}
)
def test_token_creation(self):
"""Test token is created with correct attributes."""
self.assertIsNotNone(self.token.token)
self.assertEqual(self.token.project_id, 'test-project')
self.assertEqual(self.token.nda_record, self.nda)
self.assertIsNone(self.token.revoked_at)
self.assertEqual(self.token.access_count, 0)
def test_token_expiry_calculation(self):
"""Test expiry date is correctly calculated from TTL."""
expected_expiry = self.token.granted_at + timedelta(days=90)
# Allow 1 second tolerance for test execution time
self.assertAlmostEqual(
self.token.expires_at.timestamp(),
expected_expiry.timestamp(),
delta=1
)
def test_is_valid_fresh_token(self):
"""Test is_valid returns True for fresh token."""
self.assertTrue(self.token.is_valid())
def test_is_valid_revoked_token(self):
"""Test is_valid returns False for revoked token."""
revoke_token(self.token, reason='test')
self.assertFalse(self.token.is_valid())
def test_is_valid_expired_token(self):
"""Test is_valid returns False for expired token past grace period."""
# Set expiry to 8 days ago (past grace period)
self.token.expires_at = timezone.now() - timedelta(days=8)
self.token.save()
self.assertFalse(self.token.is_valid())
def test_is_within_grace_period(self):
"""Test grace period detection."""
# Set expiry to 3 days ago (within 7-day grace period)
self.token.expires_at = timezone.now() - timedelta(days=3)
self.token.save()
self.assertTrue(self.token.is_within_grace_period())
# Token should still be valid during grace period
self.assertTrue(self.token.is_valid())
def test_ip_whitelist_validation(self):
"""Test IP whitelist enforcement."""
self.token.ip_whitelist = ['192.168.1.100', '10.0.0.0/8']
self.token.save()
# Valid IPs
self.assertTrue(self.token.is_valid(ip_address='192.168.1.100'))
self.assertTrue(self.token.is_valid(ip_address='10.0.50.100'))
# Invalid IP
self.assertFalse(self.token.is_valid(ip_address='1.2.3.4'))
def test_validate_and_refresh_updates_metrics(self):
"""Test validate_and_refresh updates access metrics."""
initial_count = self.token.access_count
result = self.token.validate_and_refresh()
self.assertTrue(result['valid'])
self.token.refresh_from_db()
self.assertEqual(self.token.access_count, initial_count + 1)
self.assertIsNotNone(self.token.last_accessed)
def test_grace_period_scope_restriction(self):
"""Test permissions are restricted during grace period."""
# Expire token to trigger grace period
self.token.expires_at = timezone.now() - timedelta(days=3)
self.token.save()
effective_scope = self.token.get_effective_scope()
# Permissions should be read-only
self.assertTrue(effective_scope['permissions']['read'])
self.assertFalse(effective_scope['permissions']['download'])
def test_revocation(self):
"""Test token revocation."""
self.assertTrue(revoke_token(self.token, reason='Test revocation'))
self.token.refresh_from_db()
self.assertIsNotNone(self.token.revoked_at)
self.assertIn('Test revocation', self.token.notes)
def test_bulk_revoke_by_nda(self):
"""Test bulk revocation by NDA record."""
from ..services.token_service import bulk_revoke_by_nda
# Create multiple tokens
for _ in range(5):
create_view_token(
nda_record=self.nda,
project_id='test-project',
scope={}
)
count = bulk_revoke_by_nda(self.nda, reason='Test bulk revoke')
# 6 tokens total (1 from setUp + 5 new)
self.assertEqual(count, 6)
11.2 Integration Tests
# backend/access_control/tests/test_token_views.py
from rest_framework.test import APITestCase
from rest_framework import status
from django.contrib.auth import get_user_model
from ..models import NDARecord, DocumentViewToken
User = get_user_model()
class TokenAPITestCase(APITestCase):
def setUp(self):
"""Set up test client and fixtures."""
self.user = User.objects.create_user(
username='testuser',
password='testpass'
)
self.client.force_authenticate(user=self.user)
self.nda = NDARecord.objects.create(
company_id=1,
status='active'
)
def test_grant_token(self):
"""Test POST /api/tokens/grant/ creates token."""
data = {
'nda_record_id': self.nda.id,
'project_id': 'test-project',
'scope': {
'document_categories': ['regulatory'],
'audience_levels': ['internal'],
'permissions': {'read': True, 'download': True}
}
}
response = self.client.post('/api/tokens/grant/', data, format='json')
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertIn('token', response.data)
# Verify token was created
token_uuid = response.data['token']
token = DocumentViewToken.objects.get(token=token_uuid)
self.assertEqual(token.project_id, 'test-project')
def test_validate_token(self):
"""Test POST /api/tokens/validate/ validates token."""
from ..services.token_service import create_view_token
token = create_view_token(
nda_record=self.nda,
project_id='test-project',
scope={}
)
data = {
'token': str(token.token),
'ip_address': '192.168.1.100'
}
response = self.client.post('/api/tokens/validate/', data, format='json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue(response.data['valid'])
def test_revoke_token(self):
"""Test POST /api/tokens/{uuid}/revoke/ revokes token."""
from ..services.token_service import create_view_token
token = create_view_token(
nda_record=self.nda,
project_id='test-project',
scope={}
)
response = self.client.post(
f'/api/tokens/{token.token}/revoke/',
{'reason': 'Test revocation'},
format='json'
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
token.refresh_from_db()
self.assertIsNotNone(token.revoked_at)
11.3 Cache Tests
# backend/access_control/tests/test_token_cache.py
from django.test import TestCase
from django.core.cache import cache
from ..models import DocumentViewToken, NDARecord
from ..cache.token_cache import (
warm_token_cache,
get_cached_token,
validate_from_cache,
invalidate_token_cache
)
from ..services.token_service import create_view_token
class TokenCacheTestCase(TestCase):
def setUp(self):
"""Set up test fixtures."""
cache.clear()
self.nda = NDARecord.objects.create(
company_id=1,
status='active'
)
self.token = create_view_token(
nda_record=self.nda,
project_id='test-project',
scope={}
)
def test_warm_token_cache(self):
"""Test token cache warming."""
result = warm_token_cache(self.token)
self.assertTrue(result)
# Verify cache entry exists
cached = get_cached_token(str(self.token.token))
self.assertIsNotNone(cached)
self.assertEqual(cached['project_id'], 'test-project')
def test_validate_from_cache_hit(self):
"""Test cache validation with cache hit."""
warm_token_cache(self.token)
result = validate_from_cache(str(self.token.token))
self.assertTrue(result['cache_hit'])
self.assertTrue(result['valid'])
def test_validate_from_cache_miss(self):
"""Test cache validation with cache miss."""
# Don't warm cache
result = validate_from_cache(str(self.token.token))
self.assertFalse(result['cache_hit'])
self.assertEqual(result['reason'], 'cache_miss')
def test_invalidate_token_cache(self):
"""Test cache invalidation."""
warm_token_cache(self.token)
# Verify cached
self.assertIsNotNone(get_cached_token(str(self.token.token)))
# Invalidate
invalidate_token_cache(str(self.token.token))
# Verify removed
self.assertIsNone(get_cached_token(str(self.token.token)))
12. Deployment Configuration
12.1 Django Settings
# backend/config/settings.py
# Token configuration
DEFAULT_TOKEN_TTL_DAYS = 90
DEFAULT_GRACE_PERIOD_DAYS = 7
DEFAULT_AUTO_RENEWAL = False
DEFAULT_MAX_REQUESTS_PER_MINUTE = 100
DEFAULT_MAX_CONCURRENT_SESSIONS = 3
# Redis configuration
REDIS_SENTINELS = [
('redis-sentinel-1.internal', 26379),
('redis-sentinel-2.internal', 26379),
('redis-sentinel-3.internal', 26379),
]
REDIS_SENTINEL_PASSWORD = env('REDIS_SENTINEL_PASSWORD')
REDIS_PASSWORD = env('REDIS_PASSWORD')
REDIS_TOKEN_DB = 1 # Dedicated DB for tokens
# Cache configuration
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': [
f'redis://{host}:{port}/0'
for host, port in REDIS_SENTINELS
],
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.SentinelClient',
'SENTINEL_KWARGS': {
'password': REDIS_SENTINEL_PASSWORD
},
'PASSWORD': REDIS_PASSWORD,
'MASTER_NAME': 'mymaster',
'CONNECTION_POOL_KWARGS': {
'max_connections': 50
}
}
}
}
# Middleware
MIDDLEWARE = [
# ... other middleware
'access_control.middleware.SecureTokenMiddleware',
'access_control.middleware.TokenBruteForceProtection',
]
# Celery Beat schedule
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'cleanup-expired-tokens': {
'task': 'access_control.tasks.cleanup_expired_tokens_task',
'schedule': crontab(hour=2, minute=0), # 2 AM daily
},
}
12.2 Kubernetes Configuration
# k8s/deployments/django-backend.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: django-backend
spec:
replicas: 3
template:
spec:
containers:
- name: django
image: django-backend:latest
env:
- name: DEFAULT_TOKEN_TTL_DAYS
value: "90"
- name: REDIS_SENTINEL_PASSWORD
valueFrom:
secretKeyRef:
name: redis-credentials
key: sentinel-password
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-credentials
key: password
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
12.3 Monitoring and Alerts
# backend/access_control/monitoring.py
from prometheus_client import Counter, Histogram, Gauge
# Metrics
token_validations = Counter(
'token_validations_total',
'Total token validation attempts',
['project_id', 'result']
)
token_validation_duration = Histogram(
'token_validation_duration_seconds',
'Token validation duration',
['cache_hit']
)
active_tokens = Gauge(
'active_tokens',
'Number of active tokens',
['project_id']
)
# Usage
def record_validation_metrics(project_id, result, duration, cache_hit):
"""Record token validation metrics."""
token_validations.labels(
project_id=project_id,
result=result
).inc()
token_validation_duration.labels(
cache_hit=cache_hit
).observe(duration)
Appendix A: Configuration Examples
Example 1: High-Security Project
ProjectTokenConfig.objects.create(
project_id='clinical-trials-sensitive',
token_ttl_days=30, # Short TTL
grace_period_days=0, # No grace period
auto_renewal_enabled=False,
max_requests_per_minute=50, # Strict rate limit
max_concurrent_sessions=1, # Single session only
require_ip_whitelist=True, # Mandatory IP restriction
allowed_document_categories=['clinical-trials']
)
Example 2: Partner Collaboration Project
ProjectTokenConfig.objects.create(
project_id='external-partner-collab',
token_ttl_days=180, # Long TTL
grace_period_days=14, # Extended grace
auto_renewal_enabled=True,
renewal_threshold_days=30,
max_renewals=5,
max_requests_per_minute=200, # Generous limit
max_concurrent_sessions=10, # Multiple users
require_ip_whitelist=False,
allowed_document_categories=[
'regulatory-documents',
'quality-records',
'training-materials'
]
)
Appendix B: Migration Checklist
Pre-Deployment:
- Run migrations:
python manage.py migrate access_control - Create default ProjectTokenConfig for existing projects
- Verify Redis Sentinel connectivity
- Test cache warming on staging environment
Deployment:
- Deploy Django backend with new models
- Deploy Celery workers for background tasks
- Configure Celery Beat schedule for cleanup
- Enable monitoring dashboards
Post-Deployment:
- Verify token creation flow works end-to-end
- Test cache hit rate (target: >95%)
- Monitor validation latency (target: <2ms cache hit)
- Run archival command on old test data
Document Revision History
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0.0 | 2026-02-16 | Claude (Opus 4.6) | Initial comprehensive design document |
End of Document