Skip to main content

DocumentViewToken Django Model Design

Executive Summary

The DocumentViewToken model provides secure, time-limited access to BIO-QMS platform documents after NDA verification. It implements a two-tier validation system: Redis caching for high-performance token validation (sub-millisecond) and PostgreSQL for authoritative record storage with comprehensive audit trails.

Key Capabilities:

  • Token-based access control with UUID v4 entropy (122 bits)
  • Per-project TTL configuration (default 90 days, configurable)
  • Redis caching layer with 5-minute TTL for validation lookups
  • Grace period access (7 days read-only after expiry)
  • IP whitelisting and scope-based access control
  • Comprehensive audit logging (access count, last accessed, revocation tracking)

Performance Targets:

  • Token validation: <2ms (Redis cache hit)
  • Token creation: <50ms
  • Bulk revocation: <100ms per 1000 tokens
  • Concurrent token validations: 10,000 req/s

Table of Contents

  1. Model Definition
  2. TTL Configuration System
  3. Redis Caching Layer
  4. Token Validation Logic
  5. Token Lifecycle Management
  6. Manager and QuerySet
  7. Django Admin Interface
  8. Serializers and Views
  9. Database Design
  10. Security Considerations
  11. Testing Strategy
  12. Deployment Configuration

1. Model Definition

1.1 Core Model

# backend/access_control/models/document_view_token.py

import uuid
from datetime import timedelta
from typing import Optional, Dict, List

from django.db import models
from django.utils import timezone
from django.core.exceptions import ValidationError
from django.contrib.postgres.indexes import HashIndex

from ..managers import DocumentViewTokenManager


class DocumentViewToken(models.Model):
"""
Time-limited token for accessing BIO-QMS documents after NDA verification.

Tokens are created after successful NDA signing and grant access to specific
document categories within a project. Validation is cached in Redis for
performance, with PostgreSQL as the authoritative source.

Lifecycle:
1. Created after NDA verification (granted_at)
2. Validated on each document access (last_accessed, access_count)
3. Optionally refreshed when nearing expiry
4. Revoked explicitly or expired automatically (revoked_at, expires_at)
5. Archived after grace period (7 days post-expiry)

Access Control:
- project_id: which BIO-QMS project this token grants access to
- scope: JSON field specifying document categories/audiences
- ip_whitelist: optional list of allowed IP addresses

Caching:
- Redis key: dvt:{token}
- Cache TTL: 5 minutes
- Invalidated immediately on revocation
"""

# Primary Key and Core Relations
token = models.UUIDField(
primary_key=True,
default=uuid.uuid4,
editable=False,
help_text="UUID v4 token (122 bits entropy)"
)

nda_record = models.ForeignKey(
'access_control.NDARecord',
on_delete=models.PROTECT,
related_name='view_tokens',
db_index=True,
help_text="NDA record that authorized this token"
)

project_id = models.CharField(
max_length=100,
db_index=True,
help_text="BIO-QMS project identifier (e.g., 'bio-qms-core', 'clinical-trials-mgmt')"
)

# Temporal Fields
granted_at = models.DateTimeField(
auto_now_add=True,
db_index=True,
help_text="When token was created/granted"
)

expires_at = models.DateTimeField(
db_index=True,
help_text="Token expiry timestamp (computed from project TTL)"
)

revoked_at = models.DateTimeField(
null=True,
blank=True,
db_index=True,
help_text="When token was explicitly revoked (NULL = not revoked)"
)

last_accessed = models.DateTimeField(
null=True,
blank=True,
help_text="Most recent token validation timestamp"
)

# Usage Tracking
access_count = models.IntegerField(
default=0,
help_text="Number of times token has been validated"
)

# Access Control
ip_whitelist = models.JSONField(
null=True,
blank=True,
help_text="Optional list of allowed IP addresses (CIDR notation supported)"
)

scope = models.JSONField(
default=dict,
help_text="Access scope: document categories, audiences, permissions"
)

# Metadata
created_by = models.ForeignKey(
'auth.User',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='created_tokens',
help_text="User who granted this token (system user if automated)"
)

notes = models.TextField(
blank=True,
help_text="Admin notes (e.g., reason for manual revocation)"
)

# Custom manager
objects = DocumentViewTokenManager()

class Meta:
db_table = 'access_control_document_view_token'
verbose_name = 'Document View Token'
verbose_name_plural = 'Document View Tokens'
ordering = ['-granted_at']

indexes = [
# Primary lookup patterns
models.Index(fields=['nda_record', 'project_id'], name='dvt_nda_project_idx'),
models.Index(fields=['expires_at', 'revoked_at'], name='dvt_expiry_idx'),

# Cleanup/archival queries
models.Index(fields=['granted_at'], name='dvt_granted_idx'),

# Hash index for UUID lookups (PostgreSQL-specific)
HashIndex(fields=['token'], name='dvt_token_hash_idx'),
]

constraints = [
# Token cannot be revoked before it was granted
models.CheckConstraint(
check=models.Q(revoked_at__gte=models.F('granted_at')) | models.Q(revoked_at__isnull=True),
name='dvt_revoked_after_granted'
),

# Token must expire after it was granted
models.CheckConstraint(
check=models.Q(expires_at__gt=models.F('granted_at')),
name='dvt_expires_after_granted'
),
]

def __str__(self) -> str:
return f"Token {self.token} (Project: {self.project_id}, NDA: {self.nda_record_id})"

def __repr__(self) -> str:
return (
f"<DocumentViewToken token={self.token} "
f"project={self.project_id} "
f"expires={self.expires_at.isoformat()} "
f"revoked={self.revoked_at is not None}>"
)

# Validation methods in Section 4
# Lifecycle methods in Section 5

1.2 Scope Field Schema

The scope JSON field defines what documents the token grants access to:

# Example scope structure
{
"document_categories": [
"regulatory-documents",
"clinical-trials",
"quality-records"
],
"audience_levels": [
"internal",
"external-partner"
],
"permissions": {
"read": True,
"download": True,
"print": False,
"share": False
},
"document_ids": [
# Optional: explicit document whitelist
"doc-uuid-1",
"doc-uuid-2"
],
"excluded_categories": [
# Optional: categories to exclude
"financial-records"
]
}

1.3 IP Whitelist Field Schema

The ip_whitelist JSON field supports CIDR notation:

# Example IP whitelist
[
"192.168.1.100", # Single IP
"10.0.0.0/8", # CIDR block
"2001:db8::/32" # IPv6 CIDR
]

# NULL = no IP restrictions (default)

2. TTL Configuration System

2.1 ProjectTokenConfig Model

Per-project TTL overrides and access policies:

# backend/access_control/models/project_token_config.py

from django.db import models
from django.core.validators import MinValueValidator


class ProjectTokenConfig(models.Model):
"""
Per-project configuration for DocumentViewToken TTL and access policies.

If no config exists for a project, defaults from settings are used:
- DEFAULT_TOKEN_TTL_DAYS = 90
- DEFAULT_GRACE_PERIOD_DAYS = 7
- DEFAULT_AUTO_RENEWAL = False
"""

project_id = models.CharField(
max_length=100,
unique=True,
primary_key=True,
help_text="BIO-QMS project identifier"
)

# TTL Settings
token_ttl_days = models.IntegerField(
validators=[MinValueValidator(1)],
default=90,
help_text="Token validity period in days (default: 90)"
)

grace_period_days = models.IntegerField(
validators=[MinValueValidator(0)],
default=7,
help_text="Read-only access period after expiry (default: 7)"
)

# Auto-Renewal
auto_renewal_enabled = models.BooleanField(
default=False,
help_text="Automatically renew tokens when within renewal_threshold_days"
)

renewal_threshold_days = models.IntegerField(
validators=[MinValueValidator(1)],
default=10,
help_text="Renew when this many days before expiry (if auto_renewal enabled)"
)

max_renewals = models.IntegerField(
validators=[MinValueValidator(0)],
default=3,
help_text="Maximum number of auto-renewals (0 = unlimited)"
)

# Rate Limiting
max_requests_per_minute = models.IntegerField(
validators=[MinValueValidator(1)],
default=100,
help_text="Max token validation requests per minute"
)

max_concurrent_sessions = models.IntegerField(
validators=[MinValueValidator(1)],
default=3,
help_text="Max concurrent active tokens per NDA record"
)

# Access Policies
require_ip_whitelist = models.BooleanField(
default=False,
help_text="Whether IP whitelisting is mandatory for this project"
)

allowed_document_categories = models.JSONField(
default=list,
help_text="List of document categories available for this project"
)

# Metadata
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
created_by = models.ForeignKey(
'auth.User',
on_delete=models.SET_NULL,
null=True,
related_name='created_token_configs'
)

notes = models.TextField(blank=True)

class Meta:
db_table = 'access_control_project_token_config'
verbose_name = 'Project Token Configuration'
verbose_name_plural = 'Project Token Configurations'

def __str__(self) -> str:
return f"TokenConfig: {self.project_id} (TTL: {self.token_ttl_days}d)"

def get_expiry_date(self, from_datetime=None):
"""Calculate expiry date based on TTL."""
from datetime import timedelta
from django.utils import timezone

base = from_datetime or timezone.now()
return base + timedelta(days=self.token_ttl_days)

def get_grace_expiry_date(self, from_datetime=None):
"""Calculate final expiry including grace period."""
expiry = self.get_expiry_date(from_datetime)
return expiry + timedelta(days=self.grace_period_days)

def is_renewal_eligible(self, token):
"""Check if token is eligible for auto-renewal."""
if not self.auto_renewal_enabled:
return False

if token.revoked_at:
return False

# Check renewal count (stored in token metadata)
renewal_count = token.scope.get('_renewal_count', 0)
if self.max_renewals > 0 and renewal_count >= self.max_renewals:
return False

# Check if within renewal threshold
from django.utils import timezone
days_until_expiry = (token.expires_at - timezone.now()).days
return days_until_expiry <= self.renewal_threshold_days


# Helper function for getting config
def get_project_token_config(project_id: str) -> ProjectTokenConfig:
"""
Get or create token config for project with defaults from settings.
"""
from django.conf import settings

config, created = ProjectTokenConfig.objects.get_or_create(
project_id=project_id,
defaults={
'token_ttl_days': getattr(settings, 'DEFAULT_TOKEN_TTL_DAYS', 90),
'grace_period_days': getattr(settings, 'DEFAULT_GRACE_PERIOD_DAYS', 7),
'auto_renewal_enabled': getattr(settings, 'DEFAULT_AUTO_RENEWAL', False),
}
)
return config

2.2 TTL Calculation Logic

# backend/access_control/utils/ttl.py

from datetime import timedelta
from django.utils import timezone
from ..models import ProjectTokenConfig


def calculate_token_expiry(project_id: str, granted_at=None):
"""
Calculate token expiry based on project configuration.

Args:
project_id: BIO-QMS project identifier
granted_at: Base datetime (default: now)

Returns:
datetime: Token expiry timestamp
"""
config = ProjectTokenConfig.objects.filter(project_id=project_id).first()

if config:
ttl_days = config.token_ttl_days
else:
from django.conf import settings
ttl_days = getattr(settings, 'DEFAULT_TOKEN_TTL_DAYS', 90)

base = granted_at or timezone.now()
return base + timedelta(days=ttl_days)


def get_grace_period_expiry(token):
"""
Calculate final expiry including grace period.

Args:
token: DocumentViewToken instance

Returns:
datetime: Grace period expiry timestamp
"""
config = ProjectTokenConfig.objects.filter(project_id=token.project_id).first()

if config:
grace_days = config.grace_period_days
else:
from django.conf import settings
grace_days = getattr(settings, 'DEFAULT_GRACE_PERIOD_DAYS', 7)

return token.expires_at + timedelta(days=grace_days)


def is_within_grace_period(token) -> bool:
"""
Check if token is in grace period (read-only access allowed).
"""
if not token.is_expired():
return False # Not expired yet

now = timezone.now()
grace_expiry = get_grace_period_expiry(token)
return now <= grace_expiry

3. Redis Caching Layer

3.1 Cache Key Format

# backend/access_control/cache/token_cache.py

import json
import hashlib
from typing import Optional, Dict
from datetime import timedelta

from django.core.cache import cache
from django.conf import settings
from django.utils import timezone


# Cache key prefix
TOKEN_CACHE_PREFIX = "dvt" # DocumentViewToken
TOKEN_CACHE_TTL = 300 # 5 minutes


def get_cache_key(token: str) -> str:
"""
Generate Redis cache key for token.

Format: dvt:{token_uuid}
Example: dvt:550e8400-e29b-41d4-a716-446655440000
"""
return f"{TOKEN_CACHE_PREFIX}:{token}"


def get_rate_limit_key(token: str) -> str:
"""
Generate Redis cache key for rate limiting.

Format: dvt:rl:{token_uuid}
"""
return f"{TOKEN_CACHE_PREFIX}:rl:{token}"


def get_session_count_key(nda_record_id: int, project_id: str) -> str:
"""
Generate Redis cache key for concurrent session tracking.

Format: dvt:sessions:{nda_record_id}:{project_id}
"""
return f"{TOKEN_CACHE_PREFIX}:sessions:{nda_record_id}:{project_id}"

3.2 Cache Warming

# backend/access_control/cache/token_cache.py (continued)

def warm_token_cache(token_obj) -> bool:
"""
Populate Redis cache with token validation data.

Called on token creation and after database updates.

Args:
token_obj: DocumentViewToken instance

Returns:
bool: True if cached successfully
"""
cache_data = serialize_token_for_cache(token_obj)
cache_key = get_cache_key(str(token_obj.token))

try:
cache.set(cache_key, cache_data, timeout=TOKEN_CACHE_TTL)
return True
except Exception as e:
# Log but don't fail - database is authoritative
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Failed to warm cache for token {token_obj.token}: {e}")
return False


def serialize_token_for_cache(token_obj) -> Dict:
"""
Convert DocumentViewToken to cache-friendly dict.

Only includes fields needed for validation (minimal payload).
"""
return {
'token': str(token_obj.token),
'nda_record_id': token_obj.nda_record_id,
'project_id': token_obj.project_id,
'expires_at': token_obj.expires_at.isoformat(),
'revoked_at': token_obj.revoked_at.isoformat() if token_obj.revoked_at else None,
'ip_whitelist': token_obj.ip_whitelist,
'scope': token_obj.scope,
'nda_status': token_obj.nda_record.status, # Denormalized for performance
'grace_period_days': get_grace_period_days(token_obj.project_id),
}


def get_grace_period_days(project_id: str) -> int:
"""Get grace period for project (with caching)."""
cache_key = f"project_grace:{project_id}"
grace_days = cache.get(cache_key)

if grace_days is None:
from ..models import ProjectTokenConfig
config = ProjectTokenConfig.objects.filter(project_id=project_id).first()

if config:
grace_days = config.grace_period_days
else:
from django.conf import settings
grace_days = getattr(settings, 'DEFAULT_GRACE_PERIOD_DAYS', 7)

# Cache for 1 hour
cache.set(cache_key, grace_days, timeout=3600)

return grace_days

3.3 Cache Validation

# backend/access_control/cache/token_cache.py (continued)

def get_cached_token(token: str) -> Optional[Dict]:
"""
Retrieve token validation data from Redis cache.

Args:
token: Token UUID string

Returns:
Dict with token data or None if cache miss
"""
cache_key = get_cache_key(token)
return cache.get(cache_key)


def validate_from_cache(token: str, ip_address: Optional[str] = None) -> Dict:
"""
Validate token from Redis cache.

Returns:
Dict with validation result:
{
'valid': bool,
'reason': str (if invalid),
'grace_period': bool (if in grace period),
'scope': dict,
'cache_hit': bool
}
"""
cache_data = get_cached_token(token)

if not cache_data:
return {'valid': False, 'cache_hit': False, 'reason': 'cache_miss'}

now = timezone.now()

# Check revocation
if cache_data['revoked_at']:
return {
'valid': False,
'cache_hit': True,
'reason': 'token_revoked',
'revoked_at': cache_data['revoked_at']
}

# Check NDA status
if cache_data['nda_status'] != 'active':
return {
'valid': False,
'cache_hit': True,
'reason': 'nda_inactive',
'nda_status': cache_data['nda_status']
}

# Check expiry
expires_at = timezone.datetime.fromisoformat(cache_data['expires_at'])
is_expired = now > expires_at

if is_expired:
# Check grace period
grace_days = cache_data.get('grace_period_days', 7)
grace_expiry = expires_at + timedelta(days=grace_days)

if now > grace_expiry:
return {
'valid': False,
'cache_hit': True,
'reason': 'token_expired',
'expires_at': cache_data['expires_at']
}

# Within grace period - read-only access
return {
'valid': True,
'cache_hit': True,
'grace_period': True,
'scope': {**cache_data['scope'], 'permissions': {'read': True, 'download': False, 'print': False}},
'expires_at': cache_data['expires_at']
}

# Check IP whitelist
if cache_data['ip_whitelist'] and ip_address:
if not is_ip_whitelisted(ip_address, cache_data['ip_whitelist']):
return {
'valid': False,
'cache_hit': True,
'reason': 'ip_not_whitelisted',
'ip_address': ip_address
}

# Token is valid
return {
'valid': True,
'cache_hit': True,
'grace_period': False,
'scope': cache_data['scope'],
'expires_at': cache_data['expires_at']
}


def is_ip_whitelisted(ip_address: str, whitelist: list) -> bool:
"""
Check if IP address matches whitelist (supports CIDR).
"""
import ipaddress

try:
ip = ipaddress.ip_address(ip_address)
except ValueError:
return False

for allowed in whitelist:
try:
# Try CIDR notation first
if '/' in allowed:
network = ipaddress.ip_network(allowed, strict=False)
if ip in network:
return True
else:
# Exact IP match
if ip == ipaddress.ip_address(allowed):
return True
except ValueError:
continue

return False

3.4 Cache Invalidation

# backend/access_control/cache/token_cache.py (continued)

def invalidate_token_cache(token: str) -> bool:
"""
Remove token from Redis cache immediately.

Called on revocation or manual invalidation.

Args:
token: Token UUID string

Returns:
bool: True if invalidated successfully
"""
cache_key = get_cache_key(token)
try:
cache.delete(cache_key)
return True
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"Failed to invalidate cache for token {token}: {e}")
return False


def invalidate_project_tokens(project_id: str) -> int:
"""
Bulk invalidate all tokens for a project.

Used when project configuration changes or project access is suspended.

Returns:
int: Number of tokens invalidated
"""
from ..models import DocumentViewToken

tokens = DocumentViewToken.objects.filter(
project_id=project_id,
revoked_at__isnull=True
).values_list('token', flat=True)

count = 0
for token in tokens:
if invalidate_token_cache(str(token)):
count += 1

return count


def invalidate_nda_tokens(nda_record_id: int) -> int:
"""
Bulk invalidate all tokens for an NDA record.

Used when NDA is revoked or expires.

Returns:
int: Number of tokens invalidated
"""
from ..models import DocumentViewToken

tokens = DocumentViewToken.objects.filter(
nda_record_id=nda_record_id,
revoked_at__isnull=True
).values_list('token', flat=True)

count = 0
for token in tokens:
if invalidate_token_cache(str(token)):
count += 1

return count

3.5 Redis Sentinel Configuration

# backend/config/redis.py

from django.conf import settings
from redis.sentinel import Sentinel


def get_redis_sentinel():
"""
Get Redis Sentinel connection for high availability.

Sentinel monitors multiple Redis instances and provides automatic failover.
"""
sentinels = getattr(settings, 'REDIS_SENTINELS', [
('redis-sentinel-1.internal', 26379),
('redis-sentinel-2.internal', 26379),
('redis-sentinel-3.internal', 26379),
])

sentinel = Sentinel(
sentinels,
socket_timeout=0.5,
socket_connect_timeout=0.5,
sentinel_kwargs={
'password': settings.REDIS_SENTINEL_PASSWORD
}
)

return sentinel


def get_redis_master():
"""Get master Redis connection from Sentinel."""
sentinel = get_redis_sentinel()
master = sentinel.master_for(
'mymaster',
socket_timeout=0.5,
password=settings.REDIS_PASSWORD,
db=settings.REDIS_TOKEN_DB
)
return master


def get_redis_slave():
"""Get slave Redis connection for read-only operations."""
sentinel = get_redis_sentinel()
slave = sentinel.slave_for(
'mymaster',
socket_timeout=0.5,
password=settings.REDIS_PASSWORD,
db=settings.REDIS_TOKEN_DB
)
return slave


# Django cache configuration
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': [
f'redis://{host}:{port}/0'
for host, port in settings.REDIS_SENTINELS
],
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.SentinelClient',
'SENTINEL_KWARGS': {
'password': settings.REDIS_SENTINEL_PASSWORD
},
'PASSWORD': settings.REDIS_PASSWORD,
'MASTER_NAME': 'mymaster',
'SOCKET_CONNECT_TIMEOUT': 0.5,
'SOCKET_TIMEOUT': 0.5,
'CONNECTION_POOL_KWARGS': {
'max_connections': 50,
'retry_on_timeout': True
}
}
}
}

4. Token Validation Logic

4.1 Core Validation Methods

# backend/access_control/models/document_view_token.py (methods)

class DocumentViewToken(models.Model):
# ... (model definition from Section 1)

def is_valid(self, ip_address: Optional[str] = None, check_rate_limit: bool = True) -> bool:
"""
Check if token is currently valid.

Validation criteria:
1. Not revoked (revoked_at is NULL)
2. Not expired (expires_at > now OR within grace period)
3. NDA record is active
4. IP address matches whitelist (if configured)
5. Rate limit not exceeded (if check_rate_limit=True)

Args:
ip_address: Client IP address (for whitelist check)
check_rate_limit: Whether to enforce rate limiting

Returns:
bool: True if valid
"""
from django.utils import timezone
from .cache.token_cache import is_ip_whitelisted

# Check revocation
if self.revoked_at:
return False

# Check NDA status
if self.nda_record.status != 'active':
return False

# Check expiry (with grace period)
now = timezone.now()
if now > self.expires_at:
# Check grace period
from .utils.ttl import get_grace_period_expiry
grace_expiry = get_grace_period_expiry(self)
if now > grace_expiry:
return False

# Check IP whitelist
if self.ip_whitelist and ip_address:
if not is_ip_whitelisted(ip_address, self.ip_whitelist):
return False

# Check rate limit
if check_rate_limit:
if not self._check_rate_limit():
return False

return True

def is_expired(self) -> bool:
"""Check if token is past expiry (not including grace period)."""
from django.utils import timezone
return timezone.now() > self.expires_at

def is_revoked(self) -> bool:
"""Check if token has been explicitly revoked."""
return self.revoked_at is not None

def is_within_grace_period(self) -> bool:
"""Check if token is in grace period (read-only access)."""
if not self.is_expired():
return False

from .utils.ttl import is_within_grace_period
return is_within_grace_period(self)

def get_validation_status(self, ip_address: Optional[str] = None) -> Dict:
"""
Get detailed validation status for diagnostics.

Returns:
Dict with status fields:
- valid: bool
- expired: bool
- revoked: bool
- grace_period: bool
- nda_active: bool
- ip_allowed: bool
- rate_limit_ok: bool
- reason: str (if invalid)
"""
from django.utils import timezone
from .cache.token_cache import is_ip_whitelisted

status = {
'valid': True,
'expired': self.is_expired(),
'revoked': self.is_revoked(),
'grace_period': False,
'nda_active': self.nda_record.status == 'active',
'ip_allowed': True,
'rate_limit_ok': True,
'reason': None
}

# Check each validation criterion
if status['revoked']:
status['valid'] = False
status['reason'] = 'token_revoked'
elif not status['nda_active']:
status['valid'] = False
status['reason'] = 'nda_inactive'
elif status['expired']:
if self.is_within_grace_period():
status['grace_period'] = True
# Still valid but limited permissions
else:
status['valid'] = False
status['reason'] = 'token_expired'

# IP whitelist
if self.ip_whitelist and ip_address:
if not is_ip_whitelisted(ip_address, self.ip_whitelist):
status['valid'] = False
status['ip_allowed'] = False
status['reason'] = 'ip_not_whitelisted'

# Rate limit
if not self._check_rate_limit():
status['valid'] = False
status['rate_limit_ok'] = False
status['reason'] = 'rate_limit_exceeded'

return status

4.2 Validate and Refresh

# backend/access_control/models/document_view_token.py (continued)

def validate_and_refresh(self, ip_address: Optional[str] = None) -> Dict:
"""
Validate token and update usage metrics.

This is the main entry point for token validation on document access.

Flow:
1. Check Redis cache for validation
2. If cache miss, validate from database
3. Update last_accessed and access_count
4. Refresh Redis cache
5. Check if auto-renewal is needed

Args:
ip_address: Client IP address

Returns:
Dict with validation result:
{
'valid': bool,
'grace_period': bool,
'scope': dict,
'cache_hit': bool,
'renewed': bool
}
"""
from django.utils import timezone
from .cache.token_cache import validate_from_cache, warm_token_cache

# Try cache first
cache_result = validate_from_cache(str(self.token), ip_address)

if cache_result['cache_hit']:
# Cache hit - use cached validation
if cache_result['valid']:
# Update metrics asynchronously
self._update_usage_metrics_async()

# Check auto-renewal
if self._should_auto_renew():
self._auto_renew()
cache_result['renewed'] = True

return cache_result

# Cache miss - validate from database
is_valid = self.is_valid(ip_address=ip_address)

if is_valid:
# Update metrics synchronously (cache miss = DB hit anyway)
self.last_accessed = timezone.now()
self.access_count += 1
self.save(update_fields=['last_accessed', 'access_count'])

# Warm cache for next request
warm_token_cache(self)

# Check auto-renewal
renewed = False
if self._should_auto_renew():
self._auto_renew()
renewed = True

return {
'valid': True,
'grace_period': self.is_within_grace_period(),
'scope': self.get_effective_scope(),
'cache_hit': False,
'renewed': renewed
}
else:
# Invalid - get detailed reason
status = self.get_validation_status(ip_address)
return {
'valid': False,
'reason': status['reason'],
'cache_hit': False,
'renewed': False
}

def _update_usage_metrics_async(self):
"""Update last_accessed and access_count asynchronously."""
from django_q.tasks import async_task

async_task(
'access_control.tasks.update_token_usage',
str(self.token)
)

def get_effective_scope(self) -> Dict:
"""
Get effective scope considering grace period.

During grace period, permissions are restricted to read-only.
"""
scope = self.scope.copy()

if self.is_within_grace_period():
scope['permissions'] = {
'read': True,
'download': False,
'print': False,
'share': False
}

return scope

4.3 Rate Limiting

# backend/access_control/models/document_view_token.py (continued)

def _check_rate_limit(self) -> bool:
"""
Check if token has exceeded rate limit.

Uses Redis counter with 1-minute sliding window.
Default limit: 100 requests/minute (configurable per project).

Returns:
bool: True if under limit
"""
from django.core.cache import cache
from .cache.token_cache import get_rate_limit_key
from .models import ProjectTokenConfig

# Get project rate limit
config = ProjectTokenConfig.objects.filter(project_id=self.project_id).first()
if config:
max_requests = config.max_requests_per_minute
else:
from django.conf import settings
max_requests = getattr(settings, 'DEFAULT_MAX_REQUESTS_PER_MINUTE', 100)

# Redis key for rate limiting
rl_key = get_rate_limit_key(str(self.token))

# Increment counter (expires after 60 seconds)
try:
current = cache.get(rl_key)
if current is None:
cache.set(rl_key, 1, timeout=60)
return True
elif current >= max_requests:
return False
else:
cache.incr(rl_key)
return True
except Exception:
# Redis failure - allow request (fail open)
return True

def _check_concurrent_sessions(self) -> bool:
"""
Check if NDA record has too many concurrent active tokens.

Prevents token sharing by limiting concurrent sessions per NDA.

Returns:
bool: True if under limit
"""
from django.core.cache import cache
from .cache.token_cache import get_session_count_key
from .models import ProjectTokenConfig

# Get project concurrent session limit
config = ProjectTokenConfig.objects.filter(project_id=self.project_id).first()
if config:
max_sessions = config.max_concurrent_sessions
else:
from django.conf import settings
max_sessions = getattr(settings, 'DEFAULT_MAX_CONCURRENT_SESSIONS', 3)

# Count active tokens for this NDA + project
session_key = get_session_count_key(self.nda_record_id, self.project_id)

try:
active_tokens = cache.get(session_key)
if active_tokens is None:
# Compute from database and cache for 5 minutes
from django.utils import timezone
active_count = DocumentViewToken.objects.filter(
nda_record=self.nda_record,
project_id=self.project_id,
revoked_at__isnull=True,
expires_at__gt=timezone.now()
).count()
cache.set(session_key, active_count, timeout=300)
return active_count <= max_sessions
else:
return active_tokens <= max_sessions
except Exception:
# Redis failure - allow request (fail open)
return True

4.4 Auto-Renewal Logic

# backend/access_control/models/document_view_token.py (continued)

def _should_auto_renew(self) -> bool:
"""
Check if token should be auto-renewed.

Criteria:
1. Auto-renewal enabled for project
2. Token not revoked
3. Within renewal threshold
4. Under max renewal count

Returns:
bool: True if should renew
"""
from .models import ProjectTokenConfig

config = ProjectTokenConfig.objects.filter(project_id=self.project_id).first()

if not config or not config.is_renewal_eligible(self):
return False

from django.utils import timezone
days_until_expiry = (self.expires_at - timezone.now()).days

return days_until_expiry <= config.renewal_threshold_days

def _auto_renew(self):
"""
Automatically renew token expiry.

Extends expires_at by one TTL period and increments renewal count.
"""
from datetime import timedelta
from .models import ProjectTokenConfig
from .cache.token_cache import warm_token_cache

config = ProjectTokenConfig.objects.filter(project_id=self.project_id).first()

if config:
ttl_days = config.token_ttl_days
else:
from django.conf import settings
ttl_days = getattr(settings, 'DEFAULT_TOKEN_TTL_DAYS', 90)

# Extend expiry
self.expires_at += timedelta(days=ttl_days)

# Increment renewal count in scope metadata
renewal_count = self.scope.get('_renewal_count', 0)
self.scope['_renewal_count'] = renewal_count + 1

self.save(update_fields=['expires_at', 'scope'])

# Refresh cache
warm_token_cache(self)

# Log renewal
import logging
logger = logging.getLogger(__name__)
logger.info(f"Auto-renewed token {self.token} (renewal #{renewal_count + 1})")

5. Token Lifecycle Management

5.1 Token Creation

# backend/access_control/services/token_service.py

from typing import Optional, Dict
from datetime import timedelta
from django.utils import timezone
from django.db import transaction

from ..models import DocumentViewToken, NDARecord, ProjectTokenConfig
from ..cache.token_cache import warm_token_cache
from ..utils.ttl import calculate_token_expiry


def create_view_token(
nda_record: NDARecord,
project_id: str,
scope: Dict,
ip_whitelist: Optional[list] = None,
created_by=None
) -> DocumentViewToken:
"""
Create new document view token after NDA verification.

Args:
nda_record: Verified NDA record
project_id: BIO-QMS project identifier
scope: Access scope (categories, audiences, permissions)
ip_whitelist: Optional list of allowed IPs
created_by: User who granted the token

Returns:
DocumentViewToken instance

Raises:
ValueError: If NDA is not active or project config is invalid
"""
if nda_record.status != 'active':
raise ValueError(f"NDA record {nda_record.id} is not active (status: {nda_record.status})")

# Validate project configuration
config = ProjectTokenConfig.objects.filter(project_id=project_id).first()
if config and config.require_ip_whitelist and not ip_whitelist:
raise ValueError(f"Project {project_id} requires IP whitelist")

# Check concurrent session limit
if config:
active_count = DocumentViewToken.objects.filter(
nda_record=nda_record,
project_id=project_id,
revoked_at__isnull=True,
expires_at__gt=timezone.now()
).count()

if active_count >= config.max_concurrent_sessions:
raise ValueError(
f"NDA record {nda_record.id} has reached max concurrent sessions "
f"({config.max_concurrent_sessions}) for project {project_id}"
)

with transaction.atomic():
# Create token
token = DocumentViewToken.objects.create(
nda_record=nda_record,
project_id=project_id,
scope=scope,
ip_whitelist=ip_whitelist,
expires_at=calculate_token_expiry(project_id),
created_by=created_by
)

# Warm Redis cache
warm_token_cache(token)

# Log creation
import logging
logger = logging.getLogger(__name__)
logger.info(
f"Created view token {token.token} for NDA {nda_record.id}, "
f"project {project_id}, expires {token.expires_at.isoformat()}"
)

return token


def create_token_from_nda_verification(nda_verification_result: Dict) -> DocumentViewToken:
"""
Create token from NDA verification workflow result.

Called by NDA verification service after successful verification.

Args:
nda_verification_result: Dict with:
- nda_record: NDARecord instance
- project_id: str
- scope: dict (from NDA document metadata)
- user: User instance (who verified)

Returns:
DocumentViewToken instance
"""
return create_view_token(
nda_record=nda_verification_result['nda_record'],
project_id=nda_verification_result['project_id'],
scope=nda_verification_result['scope'],
created_by=nda_verification_result.get('user')
)

5.2 Token Revocation

# backend/access_control/services/token_service.py (continued)

def revoke_token(token: DocumentViewToken, reason: str = '', revoked_by=None) -> bool:
"""
Explicitly revoke a token.

Args:
token: DocumentViewToken instance
reason: Reason for revocation (logged in notes)
revoked_by: User who revoked the token

Returns:
bool: True if revoked successfully
"""
from django.utils import timezone
from ..cache.token_cache import invalidate_token_cache

if token.revoked_at:
return False # Already revoked

with transaction.atomic():
token.revoked_at = timezone.now()

if reason:
timestamp = timezone.now().isoformat()
revoked_by_info = f" by {revoked_by}" if revoked_by else ""
token.notes += f"\n[{timestamp}] Revoked{revoked_by_info}: {reason}"

token.save(update_fields=['revoked_at', 'notes'])

# Invalidate cache immediately
invalidate_token_cache(str(token.token))

# Log revocation
import logging
logger = logging.getLogger(__name__)
logger.info(f"Revoked token {token.token}: {reason}")

return True


def bulk_revoke_by_nda(nda_record: NDARecord, reason: str = '') -> int:
"""
Revoke all active tokens for an NDA record.

Used when NDA is revoked or company relationship ends.

Args:
nda_record: NDARecord instance
reason: Reason for bulk revocation

Returns:
int: Number of tokens revoked
"""
from django.utils import timezone
from ..cache.token_cache import invalidate_nda_tokens

active_tokens = DocumentViewToken.objects.filter(
nda_record=nda_record,
revoked_at__isnull=True
)

count = active_tokens.count()

if count == 0:
return 0

with transaction.atomic():
now = timezone.now()
timestamp = now.isoformat()
note = f"\n[{timestamp}] Bulk revoked: {reason}"

# Update all tokens
active_tokens.update(
revoked_at=now,
notes=models.F('notes') + note
)

# Invalidate cache for all tokens
invalidate_nda_tokens(nda_record.id)

# Log bulk revocation
import logging
logger = logging.getLogger(__name__)
logger.info(f"Bulk revoked {count} tokens for NDA {nda_record.id}: {reason}")

return count


def bulk_revoke_by_project(project_id: str, reason: str = '') -> int:
"""
Revoke all active tokens for a project.

Used when project access is suspended or project is archived.

Args:
project_id: BIO-QMS project identifier
reason: Reason for bulk revocation

Returns:
int: Number of tokens revoked
"""
from django.utils import timezone
from ..cache.token_cache import invalidate_project_tokens

active_tokens = DocumentViewToken.objects.filter(
project_id=project_id,
revoked_at__isnull=True
)

count = active_tokens.count()

if count == 0:
return 0

with transaction.atomic():
now = timezone.now()
timestamp = now.isoformat()
note = f"\n[{timestamp}] Project revoked: {reason}"

active_tokens.update(
revoked_at=now,
notes=models.F('notes') + note
)

invalidate_project_tokens(project_id)

import logging
logger = logging.getLogger(__name__)
logger.info(f"Bulk revoked {count} tokens for project {project_id}: {reason}")

return count

5.3 Token Expiration & Cleanup

# backend/access_control/management/commands/cleanup_expired_tokens.py

from django.core.management.base import BaseCommand
from django.utils import timezone
from datetime import timedelta

from access_control.models import DocumentViewToken
from access_control.utils.ttl import get_grace_period_expiry


class Command(BaseCommand):
help = 'Archive expired tokens past grace period'

def add_arguments(self, parser):
parser.add_argument(
'--dry-run',
action='store_true',
help='Show what would be archived without actually archiving'
)
parser.add_argument(
'--batch-size',
type=int,
default=1000,
help='Number of tokens to process per batch'
)

def handle(self, *args, **options):
dry_run = options['dry_run']
batch_size = options['batch_size']

now = timezone.now()

# Find tokens past grace period
expired_tokens = DocumentViewToken.objects.filter(
revoked_at__isnull=True
).select_related('nda_record')

to_archive = []
for token in expired_tokens:
grace_expiry = get_grace_period_expiry(token)
if now > grace_expiry:
to_archive.append(token)

total = len(to_archive)

if dry_run:
self.stdout.write(
self.style.WARNING(f'[DRY RUN] Would archive {total} expired tokens')
)
return

# Archive in batches
archived = 0
for i in range(0, total, batch_size):
batch = to_archive[i:i+batch_size]

# Update to mark as archived (revoked_at = grace_expiry)
for token in batch:
grace_expiry = get_grace_period_expiry(token)
token.revoked_at = grace_expiry
token.notes += f"\n[{now.isoformat()}] Auto-archived after grace period"

DocumentViewToken.objects.bulk_update(
batch,
['revoked_at', 'notes'],
batch_size=batch_size
)

archived += len(batch)
self.stdout.write(f'Archived batch {i//batch_size + 1}: {len(batch)} tokens')

self.stdout.write(
self.style.SUCCESS(f'Successfully archived {archived} expired tokens')
)


# Celery task for scheduled cleanup
from celery import shared_task

@shared_task
def cleanup_expired_tokens_task():
"""
Scheduled task to clean up expired tokens.

Run daily via Celery Beat:
CELERY_BEAT_SCHEDULE = {
'cleanup-expired-tokens': {
'task': 'access_control.tasks.cleanup_expired_tokens_task',
'schedule': crontab(hour=2, minute=0), # 2 AM daily
},
}
"""
from django.core.management import call_command

call_command('cleanup_expired_tokens', '--batch-size=5000')

5.4 Token Rotation

# backend/access_control/services/token_service.py (continued)

def rotate_token(old_token: DocumentViewToken, reason: str = 'security_rotation') -> DocumentViewToken:
"""
Rotate token by creating new token and revoking old one.

Used for periodic security rotation or after suspicious activity.

Args:
old_token: Existing DocumentViewToken instance
reason: Reason for rotation

Returns:
DocumentViewToken: New token with same permissions
"""
with transaction.atomic():
# Create new token with same configuration
new_token = create_view_token(
nda_record=old_token.nda_record,
project_id=old_token.project_id,
scope=old_token.scope,
ip_whitelist=old_token.ip_whitelist,
created_by=old_token.created_by
)

# Revoke old token
revoke_token(old_token, reason=f"Rotated to {new_token.token}: {reason}")

# Log rotation
import logging
logger = logging.getLogger(__name__)
logger.info(f"Rotated token {old_token.token} -> {new_token.token}: {reason}")

return new_token


def rotate_tokens_by_age(max_age_days: int = 180) -> int:
"""
Rotate tokens older than specified age.

Used for periodic security rotation policy.

Args:
max_age_days: Maximum token age before rotation

Returns:
int: Number of tokens rotated
"""
from datetime import timedelta

cutoff_date = timezone.now() - timedelta(days=max_age_days)

old_tokens = DocumentViewToken.objects.filter(
granted_at__lt=cutoff_date,
revoked_at__isnull=True
)

count = 0
for token in old_tokens:
try:
rotate_token(token, reason=f'age_rotation_{max_age_days}d')
count += 1
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f"Failed to rotate token {token.token}: {e}")

return count

6. Manager and QuerySet

6.1 Custom Manager

# backend/access_control/managers.py

from django.db import models
from django.utils import timezone
from django.db.models import Q, Count, Max


class DocumentViewTokenQuerySet(models.QuerySet):
"""Custom queryset with common token filters."""

def valid(self):
"""Get currently valid tokens (not revoked, not expired)."""
now = timezone.now()
return self.filter(
revoked_at__isnull=True,
expires_at__gt=now
).select_related('nda_record')

def expired(self, include_grace_period: bool = False):
"""Get expired tokens."""
now = timezone.now()

if include_grace_period:
# Expired but possibly within grace period
return self.filter(expires_at__lt=now)
else:
# Fully expired (past grace period)
from ..utils.ttl import get_grace_period_expiry

# This requires annotation - handle in view layer
return self.filter(expires_at__lt=now)

def revoked(self):
"""Get explicitly revoked tokens."""
return self.filter(revoked_at__isnull=False)

def for_project(self, project_id: str):
"""Get tokens for specific project."""
return self.filter(project_id=project_id)

def for_nda(self, nda_record):
"""Get tokens for specific NDA record."""
return self.filter(nda_record=nda_record)

def for_company(self, company):
"""Get tokens for all NDAs from a company."""
return self.filter(nda_record__company=company)

def active_in_grace_period(self):
"""Get tokens currently in grace period."""
now = timezone.now()

# Tokens that are expired but within grace period
# Requires project config lookup - better as annotated query
return self.filter(
expires_at__lt=now,
revoked_at__isnull=True
)

def with_usage_stats(self):
"""Annotate with usage statistics."""
return self.annotate(
days_active=models.ExpressionWrapper(
models.F('expires_at') - models.F('granted_at'),
output_field=models.DurationField()
),
days_until_expiry=models.ExpressionWrapper(
models.F('expires_at') - timezone.now(),
output_field=models.DurationField()
)
)

def recently_accessed(self, days: int = 7):
"""Get tokens accessed in last N days."""
cutoff = timezone.now() - timezone.timedelta(days=days)
return self.filter(last_accessed__gte=cutoff)

def never_accessed(self):
"""Get tokens that have never been used."""
return self.filter(access_count=0, last_accessed__isnull=True)

def high_usage(self, min_count: int = 100):
"""Get tokens with high access count."""
return self.filter(access_count__gte=min_count)


class DocumentViewTokenManager(models.Manager):
"""Custom manager for DocumentViewToken."""

def get_queryset(self):
return DocumentViewTokenQuerySet(self.model, using=self._db)

def valid(self):
return self.get_queryset().valid()

def expired(self, include_grace_period=False):
return self.get_queryset().expired(include_grace_period)

def revoked(self):
return self.get_queryset().revoked()

def for_project(self, project_id):
return self.get_queryset().for_project(project_id)

def for_nda(self, nda_record):
return self.get_queryset().for_nda(nda_record)

def for_company(self, company):
return self.get_queryset().for_company(company)

def create_from_nda(self, nda_record, project_id, scope, **kwargs):
"""
Create token from NDA verification.

Convenience method that handles TTL calculation and cache warming.
"""
from ..services.token_service import create_view_token
return create_view_token(nda_record, project_id, scope, **kwargs)

def bulk_revoke_by_nda(self, nda_record, reason=''):
"""Bulk revoke all tokens for NDA."""
from ..services.token_service import bulk_revoke_by_nda
return bulk_revoke_by_nda(nda_record, reason)

def bulk_revoke_by_project(self, project_id, reason=''):
"""Bulk revoke all tokens for project."""
from ..services.token_service import bulk_revoke_by_project
return bulk_revoke_by_project(project_id, reason)

def get_usage_summary(self, project_id=None):
"""
Get usage summary statistics.

Returns:
Dict with token counts by status
"""
qs = self.get_queryset()
if project_id:
qs = qs.for_project(project_id)

now = timezone.now()

return {
'total': qs.count(),
'valid': qs.valid().count(),
'expired': qs.filter(expires_at__lt=now).count(),
'revoked': qs.revoked().count(),
'never_used': qs.never_accessed().count(),
'recent_access': qs.recently_accessed(days=7).count(),
}

7. Django Admin Interface

7.1 TokenAdmin Configuration

# backend/access_control/admin.py

from django.contrib import admin
from django.utils.html import format_html
from django.utils import timezone
from django.urls import reverse
from django.db.models import Count, Q

from .models import DocumentViewToken, ProjectTokenConfig
from .services.token_service import revoke_token, rotate_token


@admin.register(DocumentViewToken)
class DocumentViewTokenAdmin(admin.ModelAdmin):
"""Django admin for DocumentViewToken."""

list_display = [
'token_short',
'project_id',
'nda_record_link',
'status_badge',
'granted_at',
'expires_at',
'access_count',
'last_accessed',
]

list_filter = [
'project_id',
'granted_at',
'expires_at',
('revoked_at', admin.EmptyFieldListFilter),
'nda_record__status',
]

search_fields = [
'token',
'project_id',
'nda_record__id',
'nda_record__company__name',
'notes',
]

readonly_fields = [
'token',
'granted_at',
'last_accessed',
'access_count',
'status_display',
'days_until_expiry',
'usage_stats',
]

fieldsets = [
('Token Information', {
'fields': ['token', 'status_display', 'days_until_expiry']
}),
('Access Control', {
'fields': ['nda_record', 'project_id', 'scope', 'ip_whitelist']
}),
('Temporal', {
'fields': ['granted_at', 'expires_at', 'revoked_at']
}),
('Usage', {
'fields': ['access_count', 'last_accessed', 'usage_stats']
}),
('Metadata', {
'fields': ['created_by', 'notes'],
'classes': ['collapse']
}),
]

actions = [
'revoke_selected',
'rotate_selected',
'extend_expiry_30d',
'extend_expiry_90d',
]

def token_short(self, obj):
"""Display first 8 chars of token."""
return f"{str(obj.token)[:8]}..."
token_short.short_description = 'Token'

def nda_record_link(self, obj):
"""Link to NDA record."""
url = reverse('admin:access_control_ndarecord_change', args=[obj.nda_record_id])
return format_html('<a href="{}">{}</a>', url, obj.nda_record_id)
nda_record_link.short_description = 'NDA Record'

def status_badge(self, obj):
"""Visual status badge."""
if obj.revoked_at:
color = 'red'
text = 'REVOKED'
elif obj.expires_at < timezone.now():
if obj.is_within_grace_period():
color = 'orange'
text = 'GRACE PERIOD'
else:
color = 'darkred'
text = 'EXPIRED'
else:
color = 'green'
text = 'VALID'

return format_html(
'<span style="background:{}; color:white; padding:3px 8px; border-radius:3px;">{}</span>',
color, text
)
status_badge.short_description = 'Status'

def status_display(self, obj):
"""Detailed status for readonly field."""
status = obj.get_validation_status()

lines = [
f"Valid: {status['valid']}",
f"Expired: {status['expired']}",
f"Revoked: {status['revoked']}",
f"Grace Period: {status['grace_period']}",
f"NDA Active: {status['nda_active']}",
]

if status['reason']:
lines.append(f"Reason: {status['reason']}")

return '\n'.join(lines)
status_display.short_description = 'Validation Status'

def days_until_expiry(self, obj):
"""Days until expiry (negative if expired)."""
delta = obj.expires_at - timezone.now()
days = delta.days

if days < 0:
return format_html('<span style="color:red;">{} days ago</span>', abs(days))
elif days < 7:
return format_html('<span style="color:orange;">{} days</span>', days)
else:
return f"{days} days"
days_until_expiry.short_description = 'Days Until Expiry'

def usage_stats(self, obj):
"""Display usage statistics."""
if obj.access_count == 0:
return "Never used"

if obj.last_accessed:
time_since = timezone.now() - obj.last_accessed
hours = int(time_since.total_seconds() / 3600)

if hours < 1:
recency = "< 1 hour ago"
elif hours < 24:
recency = f"{hours} hours ago"
else:
recency = f"{hours // 24} days ago"
else:
recency = "Unknown"

return f"{obj.access_count} accesses, last: {recency}"
usage_stats.short_description = 'Usage Statistics'

# Admin actions

@admin.action(description='Revoke selected tokens')
def revoke_selected(self, request, queryset):
"""Revoke selected tokens."""
count = 0
for token in queryset.filter(revoked_at__isnull=True):
if revoke_token(token, reason=f'Admin revocation by {request.user}', revoked_by=request.user):
count += 1

self.message_user(request, f'Revoked {count} tokens.')

@admin.action(description='Rotate selected tokens')
def rotate_selected(self, request, queryset):
"""Rotate selected tokens (create new, revoke old)."""
count = 0
for token in queryset.filter(revoked_at__isnull=True):
try:
new_token = rotate_token(token, reason=f'Admin rotation by {request.user}')
count += 1
except Exception as e:
self.message_user(request, f'Failed to rotate {token.token}: {e}', level='error')

self.message_user(request, f'Rotated {count} tokens.')

@admin.action(description='Extend expiry by 30 days')
def extend_expiry_30d(self, request, queryset):
"""Extend token expiry by 30 days."""
from datetime import timedelta

count = queryset.filter(revoked_at__isnull=True).update(
expires_at=models.F('expires_at') + timedelta(days=30)
)

# Invalidate cache for updated tokens
from .cache.token_cache import invalidate_token_cache
for token in queryset:
invalidate_token_cache(str(token.token))

self.message_user(request, f'Extended expiry for {count} tokens by 30 days.')

@admin.action(description='Extend expiry by 90 days')
def extend_expiry_90d(self, request, queryset):
"""Extend token expiry by 90 days."""
from datetime import timedelta

count = queryset.filter(revoked_at__isnull=True).update(
expires_at=models.F('expires_at') + timedelta(days=90)
)

from .cache.token_cache import invalidate_token_cache
for token in queryset:
invalidate_token_cache(str(token.token))

self.message_user(request, f'Extended expiry for {count} tokens by 90 days.')

def get_queryset(self, request):
"""Optimize queryset with select_related."""
qs = super().get_queryset(request)
return qs.select_related('nda_record', 'nda_record__company', 'created_by')


@admin.register(ProjectTokenConfig)
class ProjectTokenConfigAdmin(admin.ModelAdmin):
"""Django admin for ProjectTokenConfig."""

list_display = [
'project_id',
'token_ttl_days',
'grace_period_days',
'auto_renewal_enabled',
'max_requests_per_minute',
'max_concurrent_sessions',
]

list_filter = [
'auto_renewal_enabled',
'require_ip_whitelist',
]

search_fields = ['project_id', 'notes']

fieldsets = [
('Project', {
'fields': ['project_id']
}),
('TTL Settings', {
'fields': ['token_ttl_days', 'grace_period_days']
}),
('Auto-Renewal', {
'fields': [
'auto_renewal_enabled',
'renewal_threshold_days',
'max_renewals'
]
}),
('Rate Limiting', {
'fields': [
'max_requests_per_minute',
'max_concurrent_sessions'
]
}),
('Access Policies', {
'fields': [
'require_ip_whitelist',
'allowed_document_categories'
]
}),
('Metadata', {
'fields': ['created_by', 'notes'],
'classes': ['collapse']
}),
]

8. Serializers and Views

8.1 DRF Serializers

# backend/access_control/serializers.py

from rest_framework import serializers
from django.utils import timezone

from .models import DocumentViewToken, ProjectTokenConfig


class TokenValidationSerializer(serializers.Serializer):
"""
Lightweight serializer for token validation (used on every request).

Minimal payload for performance.
"""
token = serializers.UUIDField()
ip_address = serializers.IPAddressField(required=False, allow_null=True)

def validate(self, data):
"""Validate that token exists and is valid."""
token_uuid = data['token']
ip_address = data.get('ip_address')

try:
token = DocumentViewToken.objects.get(token=token_uuid)
except DocumentViewToken.DoesNotExist:
raise serializers.ValidationError({'token': 'Invalid token'})

# Validate and refresh
result = token.validate_and_refresh(ip_address=ip_address)

if not result['valid']:
raise serializers.ValidationError({
'token': f"Token validation failed: {result.get('reason', 'unknown')}"
})

data['validation_result'] = result
data['token_obj'] = token

return data


class TokenGrantSerializer(serializers.Serializer):
"""
Serializer for token creation flow.
"""
nda_record_id = serializers.IntegerField()
project_id = serializers.CharField(max_length=100)
scope = serializers.JSONField()
ip_whitelist = serializers.ListField(
child=serializers.CharField(),
required=False,
allow_null=True
)

def validate_nda_record_id(self, value):
"""Validate NDA record exists and is active."""
from .models import NDARecord

try:
nda = NDARecord.objects.get(id=value)
except NDARecord.DoesNotExist:
raise serializers.ValidationError("NDA record not found")

if nda.status != 'active':
raise serializers.ValidationError(f"NDA record is not active (status: {nda.status})")

return value

def validate_project_id(self, value):
"""Validate project exists."""
# TODO: Add project registry validation
return value

def validate_scope(self, value):
"""Validate scope structure."""
required_keys = ['document_categories', 'audience_levels', 'permissions']

for key in required_keys:
if key not in value:
raise serializers.ValidationError(f"Missing required key: {key}")

# Validate permissions
valid_permissions = {'read', 'download', 'print', 'share'}
for perm in value['permissions'].keys():
if perm not in valid_permissions:
raise serializers.ValidationError(f"Invalid permission: {perm}")

return value

def create(self, validated_data):
"""Create token from validated data."""
from .services.token_service import create_view_token
from .models import NDARecord

nda = NDARecord.objects.get(id=validated_data['nda_record_id'])

return create_view_token(
nda_record=nda,
project_id=validated_data['project_id'],
scope=validated_data['scope'],
ip_whitelist=validated_data.get('ip_whitelist'),
created_by=self.context.get('request').user
)


class TokenInfoSerializer(serializers.ModelSerializer):
"""
Detailed serializer for token status and metadata.
"""
status = serializers.SerializerMethodField()
days_until_expiry = serializers.SerializerMethodField()
nda_company = serializers.CharField(source='nda_record.company.name', read_only=True)
validation_status = serializers.SerializerMethodField()

class Meta:
model = DocumentViewToken
fields = [
'token',
'project_id',
'nda_company',
'status',
'granted_at',
'expires_at',
'revoked_at',
'days_until_expiry',
'access_count',
'last_accessed',
'scope',
'ip_whitelist',
'validation_status',
]
read_only_fields = fields

def get_status(self, obj):
"""Get human-readable status."""
if obj.revoked_at:
return 'revoked'
elif obj.is_expired():
if obj.is_within_grace_period():
return 'grace_period'
else:
return 'expired'
else:
return 'valid'

def get_days_until_expiry(self, obj):
"""Days until expiry (negative if expired)."""
delta = obj.expires_at - timezone.now()
return delta.days

def get_validation_status(self, obj):
"""Detailed validation status."""
return obj.get_validation_status()


class ProjectTokenConfigSerializer(serializers.ModelSerializer):
"""Serializer for project token configuration."""

class Meta:
model = ProjectTokenConfig
fields = '__all__'
read_only_fields = ['created_at', 'updated_at']

8.2 API Views

# backend/access_control/views.py

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated

from .models import DocumentViewToken, ProjectTokenConfig
from .serializers import (
TokenValidationSerializer,
TokenGrantSerializer,
TokenInfoSerializer,
ProjectTokenConfigSerializer
)
from .services.token_service import revoke_token, rotate_token


class DocumentViewTokenViewSet(viewsets.ReadOnlyModelViewSet):
"""
API endpoints for DocumentViewToken.

List and retrieve are read-only.
Creation handled via /grant endpoint.
"""
queryset = DocumentViewToken.objects.all().select_related('nda_record', 'created_by')
serializer_class = TokenInfoSerializer
permission_classes = [IsAuthenticated]
filterset_fields = ['project_id', 'nda_record']
search_fields = ['token', 'project_id', 'notes']
ordering_fields = ['granted_at', 'expires_at', 'access_count']
ordering = ['-granted_at']

@action(detail=False, methods=['post'])
def validate(self, request):
"""
Validate token.

POST /api/tokens/validate/
Body: {"token": "uuid", "ip_address": "1.2.3.4"}

Returns validation result.
"""
serializer = TokenValidationSerializer(data=request.data)
serializer.is_valid(raise_exception=True)

result = serializer.validated_data['validation_result']

return Response({
'valid': result['valid'],
'grace_period': result.get('grace_period', False),
'scope': result.get('scope'),
'cache_hit': result.get('cache_hit', False)
})

@action(detail=False, methods=['post'])
def grant(self, request):
"""
Create new token after NDA verification.

POST /api/tokens/grant/
Body: {
"nda_record_id": 123,
"project_id": "bio-qms-core",
"scope": {...},
"ip_whitelist": ["1.2.3.4"]
}

Returns created token.
"""
serializer = TokenGrantSerializer(data=request.data, context={'request': request})
serializer.is_valid(raise_exception=True)

token = serializer.save()

return Response(
TokenInfoSerializer(token).data,
status=status.HTTP_201_CREATED
)

@action(detail=True, methods=['post'])
def revoke(self, request, pk=None):
"""
Revoke token.

POST /api/tokens/{uuid}/revoke/
Body: {"reason": "Security incident"}
"""
token = self.get_object()
reason = request.data.get('reason', '')

if revoke_token(token, reason=reason, revoked_by=request.user):
return Response({'status': 'revoked'})
else:
return Response(
{'error': 'Token already revoked'},
status=status.HTTP_400_BAD_REQUEST
)

@action(detail=True, methods=['post'])
def rotate(self, request, pk=None):
"""
Rotate token (create new, revoke old).

POST /api/tokens/{uuid}/rotate/
Body: {"reason": "Periodic rotation"}
"""
old_token = self.get_object()
reason = request.data.get('reason', 'API rotation')

try:
new_token = rotate_token(old_token, reason=reason)
return Response(TokenInfoSerializer(new_token).data)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_400_BAD_REQUEST
)

@action(detail=False, methods=['get'])
def usage_summary(self, request):
"""
Get usage summary statistics.

GET /api/tokens/usage_summary/?project_id=bio-qms-core
"""
project_id = request.query_params.get('project_id')
summary = DocumentViewToken.objects.get_usage_summary(project_id=project_id)
return Response(summary)


class ProjectTokenConfigViewSet(viewsets.ModelViewSet):
"""
API endpoints for ProjectTokenConfig.

CRUD operations for project token configurations.
"""
queryset = ProjectTokenConfig.objects.all()
serializer_class = ProjectTokenConfigSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'project_id'

9. Database Design

9.1 Migration

# backend/access_control/migrations/0001_initial.py

from django.db import migrations, models
import django.db.models.deletion
import uuid


class Migration(migrations.Migration):

initial = True

dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
# Assumes NDARecord exists
('access_control', '0001_create_nda_record'),
]

operations = [
migrations.CreateModel(
name='ProjectTokenConfig',
fields=[
('project_id', models.CharField(max_length=100, primary_key=True, serialize=False)),
('token_ttl_days', models.IntegerField(default=90)),
('grace_period_days', models.IntegerField(default=7)),
('auto_renewal_enabled', models.BooleanField(default=False)),
('renewal_threshold_days', models.IntegerField(default=10)),
('max_renewals', models.IntegerField(default=3)),
('max_requests_per_minute', models.IntegerField(default=100)),
('max_concurrent_sessions', models.IntegerField(default=3)),
('require_ip_whitelist', models.BooleanField(default=False)),
('allowed_document_categories', models.JSONField(default=list)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('notes', models.TextField(blank=True)),
('created_by', models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='created_token_configs',
to='auth.user'
)),
],
options={
'db_table': 'access_control_project_token_config',
'verbose_name': 'Project Token Configuration',
'verbose_name_plural': 'Project Token Configurations',
},
),

migrations.CreateModel(
name='DocumentViewToken',
fields=[
('token', models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False
)),
('project_id', models.CharField(db_index=True, max_length=100)),
('granted_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('expires_at', models.DateTimeField(db_index=True)),
('revoked_at', models.DateTimeField(blank=True, db_index=True, null=True)),
('last_accessed', models.DateTimeField(blank=True, null=True)),
('access_count', models.IntegerField(default=0)),
('ip_whitelist', models.JSONField(blank=True, null=True)),
('scope', models.JSONField(default=dict)),
('notes', models.TextField(blank=True)),
('created_by', models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='created_tokens',
to='auth.user'
)),
('nda_record', models.ForeignKey(
on_delete=django.db.models.deletion.PROTECT,
related_name='view_tokens',
to='access_control.ndarecord'
)),
],
options={
'db_table': 'access_control_document_view_token',
'verbose_name': 'Document View Token',
'verbose_name_plural': 'Document View Tokens',
'ordering': ['-granted_at'],
},
),

# Indexes
migrations.AddIndex(
model_name='documentviewtoken',
index=models.Index(fields=['nda_record', 'project_id'], name='dvt_nda_project_idx'),
),
migrations.AddIndex(
model_name='documentviewtoken',
index=models.Index(fields=['expires_at', 'revoked_at'], name='dvt_expiry_idx'),
),
migrations.AddIndex(
model_name='documentviewtoken',
index=models.Index(fields=['granted_at'], name='dvt_granted_idx'),
),

# PostgreSQL-specific hash index
migrations.RunSQL(
sql="CREATE INDEX dvt_token_hash_idx ON access_control_document_view_token USING hash (token);",
reverse_sql="DROP INDEX IF EXISTS dvt_token_hash_idx;"
),

# Check constraints
migrations.AddConstraint(
model_name='documentviewtoken',
constraint=models.CheckConstraint(
check=models.Q(
models.Q(revoked_at__gte=models.F('granted_at')),
models.Q(revoked_at__isnull=True),
_connector='OR'
),
name='dvt_revoked_after_granted'
),
),
migrations.AddConstraint(
model_name='documentviewtoken',
constraint=models.CheckConstraint(
check=models.Q(expires_at__gt=models.F('granted_at')),
name='dvt_expires_after_granted'
),
),
]

9.2 Indexes and Performance

Index Strategy:

  1. Primary Lookup (token UUID): Hash index for O(1) lookup
  2. NDA + Project Lookup: Composite B-tree for filtering by NDA and project
  3. Expiry Queries: Composite B-tree on expires_at + revoked_at for cleanup
  4. Time-Series: B-tree on granted_at for archival queries

Query Patterns:

-- Token validation (most frequent)
SELECT * FROM access_control_document_view_token WHERE token = $1;
-- Uses: dvt_token_hash_idx (hash index)

-- Active tokens for NDA + project
SELECT * FROM access_control_document_view_token
WHERE nda_record_id = $1 AND project_id = $2 AND revoked_at IS NULL;
-- Uses: dvt_nda_project_idx

-- Expired tokens cleanup
SELECT * FROM access_control_document_view_token
WHERE expires_at < $1 AND revoked_at IS NULL;
-- Uses: dvt_expiry_idx

-- Archival queries
SELECT * FROM access_control_document_view_token
WHERE granted_at < $1;
-- Uses: dvt_granted_idx

9.3 Partitioning Strategy

For high-volume deployments (>10M tokens), partition by granted_at:

-- Convert to partitioned table
CREATE TABLE access_control_document_view_token_new (
LIKE access_control_document_view_token INCLUDING ALL
) PARTITION BY RANGE (granted_at);

-- Create monthly partitions
CREATE TABLE access_control_document_view_token_2026_01
PARTITION OF access_control_document_view_token_new
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');

CREATE TABLE access_control_document_view_token_2026_02
PARTITION OF access_control_document_view_token_new
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');

-- Migrate data
INSERT INTO access_control_document_view_token_new
SELECT * FROM access_control_document_view_token;

-- Swap tables
BEGIN;
ALTER TABLE access_control_document_view_token RENAME TO access_control_document_view_token_old;
ALTER TABLE access_control_document_view_token_new RENAME TO access_control_document_view_token;
COMMIT;

-- Auto-create partitions with pg_partman extension
SELECT create_parent(
'public.access_control_document_view_token',
'granted_at',
'native',
'monthly'
);

9.4 Archival Policy

# backend/access_control/management/commands/archive_old_tokens.py

from django.core.management.base import BaseCommand
from django.utils import timezone
from datetime import timedelta

class Command(BaseCommand):
help = 'Archive tokens older than retention period to cold storage'

def add_arguments(self, parser):
parser.add_argument(
'--retention-days',
type=int,
default=365,
help='Archive tokens older than this many days'
)

def handle(self, *args, **options):
cutoff = timezone.now() - timedelta(days=options['retention_days'])

old_tokens = DocumentViewToken.objects.filter(
granted_at__lt=cutoff
)

# Export to S3/GCS before deletion
from .utils.archive import export_tokens_to_cold_storage
export_tokens_to_cold_storage(old_tokens)

# Delete from active table
count = old_tokens.delete()[0]

self.stdout.write(
self.style.SUCCESS(f'Archived {count} tokens older than {options["retention_days"]} days')
)

10. Security Considerations

10.1 Token Entropy Analysis

UUID v4 Entropy:

  • 128 bits total
  • 6 bits reserved for version/variant
  • 122 bits effective entropy
  • Collision probability: ~2^-61 for 1 billion tokens

Brute Force Resistance:

  • At 1M attempts/second: 10^20 years to exhaustion
  • At rate limit (100 req/min): effectively infinite

Recommendation: UUID v4 provides sufficient entropy for document access tokens.

10.2 Token Transmission Security

# backend/access_control/middleware.py

class SecureTokenMiddleware:
"""
Middleware to enforce secure token transmission.

Sets security headers for token cookies.
"""

def __init__(self, get_response):
self.get_response = get_response

def __call__(self, request):
response = self.get_response(request)

# If setting token cookie, add security flags
if 'dvt' in response.cookies:
response.cookies['dvt']['httponly'] = True
response.cookies['dvt']['secure'] = True # HTTPS only
response.cookies['dvt']['samesite'] = 'Strict' # CSRF protection
response.cookies['dvt']['max_age'] = 3600 # 1 hour

return response


# View that sets token cookie
from django.http import HttpResponse

def set_token_cookie(request, token):
"""
Set DocumentViewToken in httpOnly cookie.

Cookie flags:
- httpOnly: JavaScript cannot access (XSS protection)
- Secure: HTTPS only
- SameSite=Strict: CSRF protection
- Max-Age: 1 hour (short-lived)
"""
response = HttpResponse()
response.set_cookie(
key='dvt',
value=str(token.token),
httponly=True,
secure=True,
samesite='Strict',
max_age=3600
)
return response

10.3 Brute Force Protection

# backend/access_control/middleware.py (continued)

from django.core.cache import cache
from django.http import HttpResponseForbidden

class TokenBruteForceProtection:
"""
Middleware to detect and block token brute force attempts.

Tracks failed validation attempts per IP address.
Blocks IP after threshold (10 failures in 5 minutes).
"""

def __init__(self, get_response):
self.get_response = get_response

def __call__(self, request):
ip = self.get_client_ip(request)

# Check if IP is blocked
if self.is_blocked(ip):
return HttpResponseForbidden('Rate limit exceeded. Try again later.')

response = self.get_response(request)

# Track failed validation attempts
if hasattr(request, '_token_validation_failed'):
self.record_failure(ip)

return response

def get_client_ip(self, request):
"""Get client IP address from request."""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
return x_forwarded_for.split(',')[0]
return request.META.get('REMOTE_ADDR')

def is_blocked(self, ip):
"""Check if IP is currently blocked."""
block_key = f'bf_block:{ip}'
return cache.get(block_key) is not None

def record_failure(self, ip):
"""Record failed validation attempt."""
fail_key = f'bf_fail:{ip}'

failures = cache.get(fail_key, 0)
failures += 1

# Store for 5 minutes
cache.set(fail_key, failures, timeout=300)

# Block if threshold exceeded
if failures >= 10:
block_key = f'bf_block:{ip}'
cache.set(block_key, True, timeout=3600) # Block for 1 hour

import logging
logger = logging.getLogger(__name__)
logger.warning(f'Blocked IP {ip} for brute force attempts (failures: {failures})')

10.4 Token Binding (Optional)

# backend/access_control/models/document_view_token.py (optional feature)

class DocumentViewToken(models.Model):
# ... (existing fields)

# Optional token binding fields
bound_user_agent = models.CharField(
max_length=500,
blank=True,
help_text="User-Agent string token is bound to"
)

bound_ip_address = models.GenericIPAddressField(
null=True,
blank=True,
help_text="IP address token is bound to"
)

def is_binding_valid(self, request):
"""
Check if request matches token binding.

Token binding prevents token theft by tying token to specific context.

Args:
request: Django request object

Returns:
bool: True if binding matches or no binding set
"""
# Check User-Agent binding
if self.bound_user_agent:
request_ua = request.META.get('HTTP_USER_AGENT', '')
if request_ua != self.bound_user_agent:
return False

# Check IP binding
if self.bound_ip_address:
from ..middleware import TokenBruteForceProtection
request_ip = TokenBruteForceProtection().get_client_ip(request)
if request_ip != self.bound_ip_address:
return False

return True


# Enable token binding in create_view_token
def create_view_token(
nda_record,
project_id,
scope,
bind_to_request=None, # New parameter
**kwargs
):
"""Create token with optional binding."""

binding_fields = {}

if bind_to_request:
binding_fields['bound_user_agent'] = bind_to_request.META.get('HTTP_USER_AGENT', '')

from ..middleware import TokenBruteForceProtection
binding_fields['bound_ip_address'] = TokenBruteForceProtection().get_client_ip(bind_to_request)

token = DocumentViewToken.objects.create(
nda_record=nda_record,
project_id=project_id,
scope=scope,
**binding_fields,
**kwargs
)

return token

10.5 Audit Logging

# backend/access_control/models/token_access_log.py

from django.db import models

class TokenAccessLog(models.Model):
"""
Audit log for token access attempts.

Separate table for high-volume writes without affecting token table.
"""

token = models.UUIDField(db_index=True)
timestamp = models.DateTimeField(auto_now_add=True, db_index=True)
ip_address = models.GenericIPAddressField()
user_agent = models.CharField(max_length=500)
result = models.CharField(
max_length=20,
choices=[
('success', 'Success'),
('expired', 'Expired'),
('revoked', 'Revoked'),
('invalid_ip', 'Invalid IP'),
('rate_limited', 'Rate Limited'),
]
)
document_id = models.CharField(max_length=100, blank=True)

class Meta:
db_table = 'access_control_token_access_log'
indexes = [
models.Index(fields=['token', 'timestamp']),
models.Index(fields=['timestamp']),
models.Index(fields=['ip_address', 'timestamp']),
]
# Partition by timestamp for time-series data

@classmethod
def log_access(cls, token, request, result, document_id=''):
"""Log token access attempt."""
from ..middleware import TokenBruteForceProtection

cls.objects.create(
token=token.token if hasattr(token, 'token') else token,
ip_address=TokenBruteForceProtection().get_client_ip(request),
user_agent=request.META.get('HTTP_USER_AGENT', ''),
result=result,
document_id=document_id
)

11. Testing Strategy

11.1 Unit Tests

# backend/access_control/tests/test_document_view_token.py

from django.test import TestCase
from django.utils import timezone
from datetime import timedelta

from ..models import DocumentViewToken, NDARecord, ProjectTokenConfig
from ..services.token_service import create_view_token, revoke_token


class DocumentViewTokenTestCase(TestCase):

def setUp(self):
"""Set up test fixtures."""
# Create test NDA record
self.nda = NDARecord.objects.create(
company_id=1,
status='active'
)

# Create test project config
self.config = ProjectTokenConfig.objects.create(
project_id='test-project',
token_ttl_days=90,
grace_period_days=7
)

# Create test token
self.token = create_view_token(
nda_record=self.nda,
project_id='test-project',
scope={
'document_categories': ['regulatory'],
'audience_levels': ['internal'],
'permissions': {'read': True, 'download': True}
}
)

def test_token_creation(self):
"""Test token is created with correct attributes."""
self.assertIsNotNone(self.token.token)
self.assertEqual(self.token.project_id, 'test-project')
self.assertEqual(self.token.nda_record, self.nda)
self.assertIsNone(self.token.revoked_at)
self.assertEqual(self.token.access_count, 0)

def test_token_expiry_calculation(self):
"""Test expiry date is correctly calculated from TTL."""
expected_expiry = self.token.granted_at + timedelta(days=90)

# Allow 1 second tolerance for test execution time
self.assertAlmostEqual(
self.token.expires_at.timestamp(),
expected_expiry.timestamp(),
delta=1
)

def test_is_valid_fresh_token(self):
"""Test is_valid returns True for fresh token."""
self.assertTrue(self.token.is_valid())

def test_is_valid_revoked_token(self):
"""Test is_valid returns False for revoked token."""
revoke_token(self.token, reason='test')
self.assertFalse(self.token.is_valid())

def test_is_valid_expired_token(self):
"""Test is_valid returns False for expired token past grace period."""
# Set expiry to 8 days ago (past grace period)
self.token.expires_at = timezone.now() - timedelta(days=8)
self.token.save()

self.assertFalse(self.token.is_valid())

def test_is_within_grace_period(self):
"""Test grace period detection."""
# Set expiry to 3 days ago (within 7-day grace period)
self.token.expires_at = timezone.now() - timedelta(days=3)
self.token.save()

self.assertTrue(self.token.is_within_grace_period())
# Token should still be valid during grace period
self.assertTrue(self.token.is_valid())

def test_ip_whitelist_validation(self):
"""Test IP whitelist enforcement."""
self.token.ip_whitelist = ['192.168.1.100', '10.0.0.0/8']
self.token.save()

# Valid IPs
self.assertTrue(self.token.is_valid(ip_address='192.168.1.100'))
self.assertTrue(self.token.is_valid(ip_address='10.0.50.100'))

# Invalid IP
self.assertFalse(self.token.is_valid(ip_address='1.2.3.4'))

def test_validate_and_refresh_updates_metrics(self):
"""Test validate_and_refresh updates access metrics."""
initial_count = self.token.access_count

result = self.token.validate_and_refresh()

self.assertTrue(result['valid'])
self.token.refresh_from_db()
self.assertEqual(self.token.access_count, initial_count + 1)
self.assertIsNotNone(self.token.last_accessed)

def test_grace_period_scope_restriction(self):
"""Test permissions are restricted during grace period."""
# Expire token to trigger grace period
self.token.expires_at = timezone.now() - timedelta(days=3)
self.token.save()

effective_scope = self.token.get_effective_scope()

# Permissions should be read-only
self.assertTrue(effective_scope['permissions']['read'])
self.assertFalse(effective_scope['permissions']['download'])

def test_revocation(self):
"""Test token revocation."""
self.assertTrue(revoke_token(self.token, reason='Test revocation'))

self.token.refresh_from_db()
self.assertIsNotNone(self.token.revoked_at)
self.assertIn('Test revocation', self.token.notes)

def test_bulk_revoke_by_nda(self):
"""Test bulk revocation by NDA record."""
from ..services.token_service import bulk_revoke_by_nda

# Create multiple tokens
for _ in range(5):
create_view_token(
nda_record=self.nda,
project_id='test-project',
scope={}
)

count = bulk_revoke_by_nda(self.nda, reason='Test bulk revoke')

# 6 tokens total (1 from setUp + 5 new)
self.assertEqual(count, 6)

11.2 Integration Tests

# backend/access_control/tests/test_token_views.py

from rest_framework.test import APITestCase
from rest_framework import status
from django.contrib.auth import get_user_model

from ..models import NDARecord, DocumentViewToken


User = get_user_model()


class TokenAPITestCase(APITestCase):

def setUp(self):
"""Set up test client and fixtures."""
self.user = User.objects.create_user(
username='testuser',
password='testpass'
)
self.client.force_authenticate(user=self.user)

self.nda = NDARecord.objects.create(
company_id=1,
status='active'
)

def test_grant_token(self):
"""Test POST /api/tokens/grant/ creates token."""
data = {
'nda_record_id': self.nda.id,
'project_id': 'test-project',
'scope': {
'document_categories': ['regulatory'],
'audience_levels': ['internal'],
'permissions': {'read': True, 'download': True}
}
}

response = self.client.post('/api/tokens/grant/', data, format='json')

self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertIn('token', response.data)

# Verify token was created
token_uuid = response.data['token']
token = DocumentViewToken.objects.get(token=token_uuid)
self.assertEqual(token.project_id, 'test-project')

def test_validate_token(self):
"""Test POST /api/tokens/validate/ validates token."""
from ..services.token_service import create_view_token

token = create_view_token(
nda_record=self.nda,
project_id='test-project',
scope={}
)

data = {
'token': str(token.token),
'ip_address': '192.168.1.100'
}

response = self.client.post('/api/tokens/validate/', data, format='json')

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertTrue(response.data['valid'])

def test_revoke_token(self):
"""Test POST /api/tokens/{uuid}/revoke/ revokes token."""
from ..services.token_service import create_view_token

token = create_view_token(
nda_record=self.nda,
project_id='test-project',
scope={}
)

response = self.client.post(
f'/api/tokens/{token.token}/revoke/',
{'reason': 'Test revocation'},
format='json'
)

self.assertEqual(response.status_code, status.HTTP_200_OK)

token.refresh_from_db()
self.assertIsNotNone(token.revoked_at)

11.3 Cache Tests

# backend/access_control/tests/test_token_cache.py

from django.test import TestCase
from django.core.cache import cache

from ..models import DocumentViewToken, NDARecord
from ..cache.token_cache import (
warm_token_cache,
get_cached_token,
validate_from_cache,
invalidate_token_cache
)
from ..services.token_service import create_view_token


class TokenCacheTestCase(TestCase):

def setUp(self):
"""Set up test fixtures."""
cache.clear()

self.nda = NDARecord.objects.create(
company_id=1,
status='active'
)

self.token = create_view_token(
nda_record=self.nda,
project_id='test-project',
scope={}
)

def test_warm_token_cache(self):
"""Test token cache warming."""
result = warm_token_cache(self.token)
self.assertTrue(result)

# Verify cache entry exists
cached = get_cached_token(str(self.token.token))
self.assertIsNotNone(cached)
self.assertEqual(cached['project_id'], 'test-project')

def test_validate_from_cache_hit(self):
"""Test cache validation with cache hit."""
warm_token_cache(self.token)

result = validate_from_cache(str(self.token.token))

self.assertTrue(result['cache_hit'])
self.assertTrue(result['valid'])

def test_validate_from_cache_miss(self):
"""Test cache validation with cache miss."""
# Don't warm cache
result = validate_from_cache(str(self.token.token))

self.assertFalse(result['cache_hit'])
self.assertEqual(result['reason'], 'cache_miss')

def test_invalidate_token_cache(self):
"""Test cache invalidation."""
warm_token_cache(self.token)

# Verify cached
self.assertIsNotNone(get_cached_token(str(self.token.token)))

# Invalidate
invalidate_token_cache(str(self.token.token))

# Verify removed
self.assertIsNone(get_cached_token(str(self.token.token)))

12. Deployment Configuration

12.1 Django Settings

# backend/config/settings.py

# Token configuration
DEFAULT_TOKEN_TTL_DAYS = 90
DEFAULT_GRACE_PERIOD_DAYS = 7
DEFAULT_AUTO_RENEWAL = False
DEFAULT_MAX_REQUESTS_PER_MINUTE = 100
DEFAULT_MAX_CONCURRENT_SESSIONS = 3

# Redis configuration
REDIS_SENTINELS = [
('redis-sentinel-1.internal', 26379),
('redis-sentinel-2.internal', 26379),
('redis-sentinel-3.internal', 26379),
]
REDIS_SENTINEL_PASSWORD = env('REDIS_SENTINEL_PASSWORD')
REDIS_PASSWORD = env('REDIS_PASSWORD')
REDIS_TOKEN_DB = 1 # Dedicated DB for tokens

# Cache configuration
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': [
f'redis://{host}:{port}/0'
for host, port in REDIS_SENTINELS
],
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.SentinelClient',
'SENTINEL_KWARGS': {
'password': REDIS_SENTINEL_PASSWORD
},
'PASSWORD': REDIS_PASSWORD,
'MASTER_NAME': 'mymaster',
'CONNECTION_POOL_KWARGS': {
'max_connections': 50
}
}
}
}

# Middleware
MIDDLEWARE = [
# ... other middleware
'access_control.middleware.SecureTokenMiddleware',
'access_control.middleware.TokenBruteForceProtection',
]

# Celery Beat schedule
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
'cleanup-expired-tokens': {
'task': 'access_control.tasks.cleanup_expired_tokens_task',
'schedule': crontab(hour=2, minute=0), # 2 AM daily
},
}

12.2 Kubernetes Configuration

# k8s/deployments/django-backend.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
name: django-backend
spec:
replicas: 3
template:
spec:
containers:
- name: django
image: django-backend:latest
env:
- name: DEFAULT_TOKEN_TTL_DAYS
value: "90"
- name: REDIS_SENTINEL_PASSWORD
valueFrom:
secretKeyRef:
name: redis-credentials
key: sentinel-password
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-credentials
key: password
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"

12.3 Monitoring and Alerts

# backend/access_control/monitoring.py

from prometheus_client import Counter, Histogram, Gauge

# Metrics
token_validations = Counter(
'token_validations_total',
'Total token validation attempts',
['project_id', 'result']
)

token_validation_duration = Histogram(
'token_validation_duration_seconds',
'Token validation duration',
['cache_hit']
)

active_tokens = Gauge(
'active_tokens',
'Number of active tokens',
['project_id']
)

# Usage
def record_validation_metrics(project_id, result, duration, cache_hit):
"""Record token validation metrics."""
token_validations.labels(
project_id=project_id,
result=result
).inc()

token_validation_duration.labels(
cache_hit=cache_hit
).observe(duration)

Appendix A: Configuration Examples

Example 1: High-Security Project

ProjectTokenConfig.objects.create(
project_id='clinical-trials-sensitive',
token_ttl_days=30, # Short TTL
grace_period_days=0, # No grace period
auto_renewal_enabled=False,
max_requests_per_minute=50, # Strict rate limit
max_concurrent_sessions=1, # Single session only
require_ip_whitelist=True, # Mandatory IP restriction
allowed_document_categories=['clinical-trials']
)

Example 2: Partner Collaboration Project

ProjectTokenConfig.objects.create(
project_id='external-partner-collab',
token_ttl_days=180, # Long TTL
grace_period_days=14, # Extended grace
auto_renewal_enabled=True,
renewal_threshold_days=30,
max_renewals=5,
max_requests_per_minute=200, # Generous limit
max_concurrent_sessions=10, # Multiple users
require_ip_whitelist=False,
allowed_document_categories=[
'regulatory-documents',
'quality-records',
'training-materials'
]
)

Appendix B: Migration Checklist

Pre-Deployment:

  • Run migrations: python manage.py migrate access_control
  • Create default ProjectTokenConfig for existing projects
  • Verify Redis Sentinel connectivity
  • Test cache warming on staging environment

Deployment:

  • Deploy Django backend with new models
  • Deploy Celery workers for background tasks
  • Configure Celery Beat schedule for cleanup
  • Enable monitoring dashboards

Post-Deployment:

  • Verify token creation flow works end-to-end
  • Test cache hit rate (target: >95%)
  • Monitor validation latency (target: <2ms cache hit)
  • Run archival command on old test data

Document Revision History

VersionDateAuthorChanges
1.0.02026-02-16Claude (Opus 4.6)Initial comprehensive design document

End of Document