Skip to main content

Token Revocation & Admin Controls Design

Classification: Internal — Engineering Date: 2026-02-16 Status: Proposed Related ADRs: ADR-196 (NDA-Gated Conditional Access)


1. Executive Summary

This document specifies the complete architecture for token revocation, administrative controls, and self-service token management for the BIO-QMS NDA-gated access control system. The design ensures immediate token invalidation (<5s propagation), comprehensive admin tooling, bulk operations support, and compliant audit trails meeting 21 CFR Part 11 requirements.

Core Requirements:

  • Immediate revocation: Redis cache invalidation + database update in single transaction
  • Admin panel: Token management, NDA lifecycle, access log viewer
  • Bulk operations: Revoke by company, project, or custom filter
  • Self-service: Token holders can view, request extension, or self-revoke
  • Audit trail: Every action logged with performer, reason, timestamp

Non-Goals:

  • Token generation/issuance (covered in ADR-196 core spec)
  • DocuSign NDA signature flow (external system integration)
  • Document access enforcement (handled by document service middleware)

2. System Architecture

2.1 Component Overview

┌─────────────────────────────────────────────────────────────┐
│ Admin Web UI (React) │
│ ┌────────────┬──────────────┬────────────┬───────────────┐│
│ │ Token Mgmt │ NDA Mgmt │ Access Log │ Self-Service ││
│ │ Dashboard │ Dashboard │ Viewer │ Portal ││
│ └────────────┴──────────────┴────────────┴───────────────┘│
└───────────────────────────┬─────────────────────────────────┘
│ HTTPS/REST
┌───────────────────────────▼─────────────────────────────────┐
│ Admin API Service (Django/DRF) │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ ViewSets: TokenAdmin, NDAAdmin, AuditLog, BulkOps │ │
│ └────────────────────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Permissions: IsAdmin, IsSuperAdmin, IsProjectAdmin │ │
│ └────────────────────────────────────────────────────────┘ │
└───────────┬─────────────────┬────────────────────┬──────────┘
│ │ │
▼ ▼ ▼
┌───────────────────┐ ┌──────────────┐ ┌─────────────────────┐
│ Redis Cache │ │ PostgreSQL │ │ Celery Task Queue │
│ ┌─────────────┐ │ │ ┌──────────┐ │ │ ┌────────────────┐ │
│ │ dvt:{uuid} │ │ │ │ dvt_ │ │ │ │ cleanup_ │ │
│ │ blacklist │ │ │ │ tokens │ │ │ │ expired_tokens │ │
│ │ │ │ │ │ audit_log│ │ │ │ send_ │ │
│ └─────────────┘ │ │ └──────────┘ │ │ │ notifications │ │
└───────────────────┘ └──────────────┘ │ └────────────────┘ │
└─────────────────────┘


┌─────────────────────┐
│ Notification Service│
│ ┌─────────────────┐ │
│ │ Email (SES) │ │
│ │ Slack Webhooks │ │
│ │ Custom Webhooks │ │
│ └─────────────────┘ │
└─────────────────────┘

2.2 Data Flow: Token Revocation

Admin clicks "Revoke Token"

├──> Admin API: POST /api/v1/admin/tokens/{id}/revoke
│ {
│ "reason": "NDA termination - employee departure",
│ "cascade_nda": false
│ }


┌────────────────────────────────────────────────────┐
│ 1. Permission Check: IsAdmin + tenant scope │
│ 2. Atomic DB Transaction: │
│ - UPDATE dvt_tokens SET │
│ revoked_at = NOW(), │
│ revoked_by = request.user.id, │
│ revocation_reason = $reason │
│ - INSERT INTO audit_log (...) │
│ 3. Redis Invalidation: │
│ - DEL dvt:{token_uuid} │
│ - SADD blacklist:revoked {token_uuid} │
│ - EXPIRE blacklist:revoked {expiry_seconds} │
│ 4. Emit Event: token.revoked │
└────────────────────────────────────────────────────┘

├──> Celery Task: send_revocation_notification
│ ├──> Email to token holder
│ ├──> Slack webhook to #access-control channel
│ └──> Custom webhook (if configured)

└──> Response 200 OK
{
"status": "revoked",
"token_id": "dvt_abc123...",
"revoked_at": "2026-02-16T10:30:00Z",
"propagation_time_ms": 1234
}

2.3 Data Flow: Scheduled Cleanup

Celery Beat: Every 1 hour


cleanup_expired_tokens() task

├──> SELECT id, token_uuid FROM dvt_tokens
│ WHERE expires_at < NOW() AND revoked_at IS NULL

├──> Batch UPDATE (chunks of 1000):
│ UPDATE dvt_tokens SET
│ revoked_at = NOW(),
│ revoked_by = 'SYSTEM',
│ revocation_reason = 'Automatic expiry'
│ WHERE id IN (...)

├──> Redis cleanup:
│ FOR EACH expired_token:
│ DEL dvt:{token_uuid}
│ SADD blacklist:revoked {token_uuid}

├──> Archive to cold storage (if expiry > 90 days ago):
│ INSERT INTO dvt_tokens_archive SELECT * FROM dvt_tokens
│ WHERE revoked_at < NOW() - INTERVAL '90 days'
│ DELETE FROM dvt_tokens WHERE id IN (...)

└──> Metrics emission:
tokens_expired_total{tenant}: count
tokens_archived_total{tenant}: count

3. Database Schema

3.1 Core Token Model (Extended from ADR-196)

# apps/access_control/models.py

from django.db import models
from django.contrib.auth import get_user_model
from django.utils import timezone
import uuid

User = get_user_model()

class DocumentVerificationToken(models.Model):
"""Digital Verification Token for NDA-gated document access."""

# Identity
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
token_uuid = models.UUIDField(unique=True, default=uuid.uuid4, db_index=True)

# Multi-tenancy
tenant_id = models.CharField(max_length=191, db_index=True)

# Token metadata
project_id = models.CharField(max_length=191, db_index=True)
project_name = models.CharField(max_length=255)
company_name = models.CharField(max_length=255, db_index=True)
recipient_email = models.EmailField(db_index=True)
recipient_name = models.CharField(max_length=255)

# NDA linkage
nda_envelope_id = models.CharField(max_length=191, unique=True, db_index=True,
help_text="DocuSign envelope ID")
nda_signed_at = models.DateTimeField(null=True, blank=True)
nda_signer_email = models.EmailField()

# Token lifecycle
issued_at = models.DateTimeField(default=timezone.now)
expires_at = models.DateTimeField(db_index=True)
last_used_at = models.DateTimeField(null=True, blank=True)
usage_count = models.IntegerField(default=0)

# Revocation
revoked_at = models.DateTimeField(null=True, blank=True, db_index=True)
revoked_by = models.CharField(max_length=191, null=True, blank=True,
help_text="User ID or 'SYSTEM' for auto-expiry")
revocation_reason = models.TextField(null=True, blank=True)

# Access scope
allowed_documents = models.JSONField(
default=list,
help_text="List of document IDs or '*' for project-wide"
)

# Rate limiting
rate_limit_per_hour = models.IntegerField(default=100)

# Compliance
compliance_metadata = models.JSONField(
default=dict,
help_text="GxP metadata: validation_status, risk_level, etc."
)

# Versioning
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
version = models.IntegerField(default=1)

class Meta:
db_table = 'dvt_tokens'
indexes = [
models.Index(fields=['tenant_id', 'project_id']),
models.Index(fields=['tenant_id', 'company_name']),
models.Index(fields=['tenant_id', 'expires_at']),
models.Index(fields=['tenant_id', 'revoked_at']),
]
verbose_name = 'Document Verification Token'
verbose_name_plural = 'Document Verification Tokens'

def __str__(self):
return f"DVT {self.token_uuid.hex[:8]} - {self.recipient_email}"

@property
def is_valid(self):
"""Check if token is currently valid."""
now = timezone.now()
return (
self.revoked_at is None and
self.expires_at > now and
self.nda_signed_at is not None
)

@property
def status(self):
"""Human-readable status."""
if self.revoked_at:
return 'revoked'
elif self.expires_at < timezone.now():
return 'expired'
elif not self.nda_signed_at:
return 'pending_nda_signature'
else:
return 'active'


class TokenAccessLog(models.Model):
"""Audit log for token usage (21 CFR Part 11 compliance)."""

id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
tenant_id = models.CharField(max_length=191, db_index=True)

# Reference
token = models.ForeignKey(
DocumentVerificationToken,
on_delete=models.CASCADE,
related_name='access_logs'
)

# Action details
action_type = models.CharField(
max_length=50,
choices=[
('VALIDATE', 'Token Validation'),
('DOCUMENT_ACCESS', 'Document Access'),
('REVOKE', 'Token Revocation'),
('EXTEND', 'Expiry Extension'),
('SELF_REVOKE', 'Self-Service Revocation'),
],
db_index=True
)

# Context
document_id = models.CharField(max_length=191, null=True, blank=True)
document_name = models.CharField(max_length=255, null=True, blank=True)

# Request metadata
ip_address = models.GenericIPAddressField()
user_agent = models.TextField()
request_id = models.CharField(max_length=191, db_index=True,
help_text="Correlation ID for distributed tracing")

# Result
success = models.BooleanField(default=True)
error_message = models.TextField(null=True, blank=True)

# Timing
timestamp = models.DateTimeField(default=timezone.now, db_index=True)

# Part 11 compliance: immutable audit trail
# Enforced via DB trigger (see migration)

class Meta:
db_table = 'dvt_access_log'
indexes = [
models.Index(fields=['tenant_id', 'timestamp']),
models.Index(fields=['tenant_id', 'token', 'action_type']),
models.Index(fields=['request_id']),
]
ordering = ['-timestamp']
verbose_name = 'Token Access Log'
verbose_name_plural = 'Token Access Logs'


class NDARecord(models.Model):
"""NDA metadata and lifecycle tracking."""

id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
tenant_id = models.CharField(max_length=191, db_index=True)

# DocuSign integration
envelope_id = models.CharField(max_length=191, unique=True, db_index=True)
envelope_status = models.CharField(
max_length=50,
choices=[
('sent', 'Sent'),
('delivered', 'Delivered'),
('completed', 'Completed'),
('declined', 'Declined'),
('voided', 'Voided'),
]
)

# NDA metadata
project_id = models.CharField(max_length=191, db_index=True)
project_name = models.CharField(max_length=255)
company_name = models.CharField(max_length=255, db_index=True)

# Signer details
signer_email = models.EmailField(db_index=True)
signer_name = models.CharField(max_length=255)

# Lifecycle
sent_at = models.DateTimeField()
signed_at = models.DateTimeField(null=True, blank=True, db_index=True)
expires_at = models.DateTimeField(db_index=True)

# Revocation
revoked_at = models.DateTimeField(null=True, blank=True)
revoked_by = models.ForeignKey(
User,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='revoked_ndas'
)
revocation_reason = models.TextField(null=True, blank=True)

# Document storage
signed_pdf_url = models.URLField(null=True, blank=True,
help_text="GCS signed URL or DocuSign URI")

# Timestamps
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)

class Meta:
db_table = 'dvt_nda_records'
indexes = [
models.Index(fields=['tenant_id', 'project_id']),
models.Index(fields=['tenant_id', 'company_name']),
models.Index(fields=['tenant_id', 'signed_at']),
]
verbose_name = 'NDA Record'
verbose_name_plural = 'NDA Records'

def __str__(self):
return f"NDA {self.envelope_id[:8]} - {self.signer_email}"

@property
def is_active(self):
"""Check if NDA is currently active."""
now = timezone.now()
return (
self.revoked_at is None and
self.signed_at is not None and
self.expires_at > now
)


class AdminAction(models.Model):
"""Audit log for all admin actions (separate from access log)."""

id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
tenant_id = models.CharField(max_length=191, db_index=True)

# Actor
performer = models.ForeignKey(
User,
on_delete=models.SET_NULL,
null=True,
related_name='admin_actions'
)
performer_email = models.EmailField() # Denormalized for retention

# Action
action_type = models.CharField(
max_length=50,
choices=[
('REVOKE_TOKEN', 'Revoke Token'),
('EXTEND_TOKEN', 'Extend Token Expiry'),
('REVOKE_NDA', 'Revoke NDA'),
('BULK_REVOKE', 'Bulk Token Revocation'),
('GENERATE_REPORT', 'Generate Compliance Report'),
('EMERGENCY_LOCKOUT', 'Emergency Project Lockout'),
],
db_index=True
)

# Target
target_type = models.CharField(max_length=50,
help_text="'token', 'nda', 'project', 'company'")
target_id = models.CharField(max_length=191)
target_count = models.IntegerField(default=1,
help_text="Number of records affected (for bulk ops)")

# Details
reason = models.TextField()
metadata = models.JSONField(
default=dict,
help_text="Action-specific context (filters, parameters, etc.)"
)

# Context
ip_address = models.GenericIPAddressField()
user_agent = models.TextField()

# Timing
timestamp = models.DateTimeField(default=timezone.now, db_index=True)

class Meta:
db_table = 'dvt_admin_actions'
indexes = [
models.Index(fields=['tenant_id', 'timestamp']),
models.Index(fields=['tenant_id', 'action_type']),
models.Index(fields=['performer']),
]
ordering = ['-timestamp']
verbose_name = 'Admin Action'
verbose_name_plural = 'Admin Actions'

3.2 Database Migrations

# apps/access_control/migrations/0002_add_revocation_and_audit.py

from django.db import migrations

class Migration(migrations.Migration):

dependencies = [
('access_control', '0001_initial'),
]

operations = [
# Add revocation fields to existing token table
migrations.AddField(
model_name='documentverificationtoken',
name='revoked_at',
field=models.DateTimeField(null=True, blank=True, db_index=True),
),
migrations.AddField(
model_name='documentverificationtoken',
name='revoked_by',
field=models.CharField(max_length=191, null=True, blank=True),
),
migrations.AddField(
model_name='documentverificationtoken',
name='revocation_reason',
field=models.TextField(null=True, blank=True),
),

# Create immutability trigger for access log (Part 11 compliance)
migrations.RunSQL(
sql="""
CREATE OR REPLACE FUNCTION prevent_access_log_modification()
RETURNS TRIGGER AS $$
BEGIN
RAISE EXCEPTION 'Access log records cannot be modified or deleted (21 CFR Part 11 §11.10(e))';
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER access_log_immutable
BEFORE UPDATE OR DELETE ON dvt_access_log
FOR EACH ROW EXECUTE FUNCTION prevent_access_log_modification();
""",
reverse_sql="""
DROP TRIGGER IF EXISTS access_log_immutable ON dvt_access_log;
DROP FUNCTION IF EXISTS prevent_access_log_modification();
"""
),

# Create immutability trigger for admin actions
migrations.RunSQL(
sql="""
CREATE TRIGGER admin_actions_immutable
BEFORE UPDATE OR DELETE ON dvt_admin_actions
FOR EACH ROW EXECUTE FUNCTION prevent_access_log_modification();
""",
reverse_sql="""
DROP TRIGGER IF EXISTS admin_actions_immutable ON dvt_admin_actions;
"""
),
]

4. Immediate Revocation Mechanism

4.1 Revocation Service

# apps/access_control/services/revocation.py

from django.db import transaction
from django.utils import timezone
from django.core.cache import cache
from datetime import timedelta
import logging

from apps.access_control.models import (
DocumentVerificationToken,
AdminAction,
TokenAccessLog
)
from apps.access_control.tasks import send_revocation_notification

logger = logging.getLogger(__name__)

class TokenRevocationService:
"""Handles immediate token revocation with Redis cache invalidation."""

REDIS_KEY_PREFIX = 'dvt'
BLACKLIST_KEY = 'blacklist:revoked'
BLACKLIST_TTL = 60 * 60 * 24 * 90 # 90 days

@classmethod
@transaction.atomic
def revoke_token(cls, token_id, performer, reason, cascade_nda=False):
"""
Revoke a single token with immediate Redis invalidation.

Args:
token_id: Token UUID or ID
performer: User instance performing revocation
reason: Textual reason for revocation
cascade_nda: If True, revoke all tokens linked to same NDA

Returns:
dict: {
'status': 'revoked',
'token_id': str,
'revoked_at': datetime,
'propagation_time_ms': int,
'cascade_count': int
}

Raises:
DocumentVerificationToken.DoesNotExist: Token not found
PermissionDenied: Performer lacks permission
"""
start_time = timezone.now()

# Fetch token with row lock
token = DocumentVerificationToken.objects.select_for_update().get(
id=token_id
)

# Idempotency: already revoked
if token.revoked_at:
logger.info(f"Token {token.token_uuid} already revoked at {token.revoked_at}")
return {
'status': 'already_revoked',
'token_id': str(token.id),
'revoked_at': token.revoked_at,
'propagation_time_ms': 0,
'cascade_count': 0
}

# Determine targets
tokens_to_revoke = [token]
if cascade_nda and token.nda_envelope_id:
related_tokens = DocumentVerificationToken.objects.select_for_update().filter(
nda_envelope_id=token.nda_envelope_id,
revoked_at__isnull=True
).exclude(id=token.id)
tokens_to_revoke.extend(related_tokens)

# Database update
revoked_at = timezone.now()
for t in tokens_to_revoke:
t.revoked_at = revoked_at
t.revoked_by = str(performer.id)
t.revocation_reason = reason
t.save(update_fields=['revoked_at', 'revoked_by', 'revocation_reason', 'updated_at'])

# Redis invalidation
cls._invalidate_cache(t.token_uuid)

# Access log
TokenAccessLog.objects.create(
tenant_id=t.tenant_id,
token=t,
action_type='REVOKE',
ip_address=performer.last_login_ip or '0.0.0.0',
user_agent=f'Admin:{performer.email}',
request_id=f'admin-revoke-{t.id}',
success=True
)

# Admin action audit
AdminAction.objects.create(
tenant_id=token.tenant_id,
performer=performer,
performer_email=performer.email,
action_type='REVOKE_TOKEN',
target_type='token',
target_id=str(token.id),
target_count=len(tokens_to_revoke),
reason=reason,
metadata={
'token_uuid': str(token.token_uuid),
'cascade_nda': cascade_nda,
'nda_envelope_id': token.nda_envelope_id if cascade_nda else None,
'cascade_count': len(tokens_to_revoke) - 1
},
ip_address=performer.last_login_ip or '0.0.0.0',
user_agent='Admin Panel'
)

# Async notification
for t in tokens_to_revoke:
send_revocation_notification.delay(
token_id=str(t.id),
reason=reason
)

propagation_time_ms = int((timezone.now() - start_time).total_seconds() * 1000)

logger.info(
f"Revoked {len(tokens_to_revoke)} token(s) in {propagation_time_ms}ms "
f"(performer: {performer.email}, reason: {reason[:50]}...)"
)

return {
'status': 'revoked',
'token_id': str(token.id),
'revoked_at': revoked_at,
'propagation_time_ms': propagation_time_ms,
'cascade_count': len(tokens_to_revoke) - 1
}

@classmethod
def _invalidate_cache(cls, token_uuid):
"""
Invalidate Redis cache for a token.

1. DELETE dvt:{uuid}
2. SADD blacklist:revoked {uuid}
3. EXPIRE blacklist:revoked {ttl}
"""
cache_key = f'{cls.REDIS_KEY_PREFIX}:{token_uuid}'

# Delete cached token data
cache.delete(cache_key)

# Add to blacklist (Redis SET)
# This uses Django cache backend; for direct Redis:
# redis_client.sadd(cls.BLACKLIST_KEY, str(token_uuid))
# redis_client.expire(cls.BLACKLIST_KEY, cls.BLACKLIST_TTL)

# Via django-redis:
from django_redis import get_redis_connection
redis_conn = get_redis_connection("default")
redis_conn.sadd(cls.BLACKLIST_KEY, str(token_uuid))
redis_conn.expire(cls.BLACKLIST_KEY, cls.BLACKLIST_TTL)

logger.debug(f"Invalidated cache for token {token_uuid}")

@classmethod
@transaction.atomic
def bulk_revoke(cls, performer, reason, filters):
"""
Bulk revocation with filter support.

Args:
performer: User performing action
reason: Reason for bulk revocation
filters: dict of filter criteria:
- company_name: str
- project_id: str
- expires_before: datetime
- custom_filter: Q object

Returns:
dict: {
'status': 'revoked',
'count': int,
'propagation_time_ms': int
}
"""
start_time = timezone.now()

# Build queryset
qs = DocumentVerificationToken.objects.filter(
tenant_id=performer.tenant_id,
revoked_at__isnull=True
)

if 'company_name' in filters:
qs = qs.filter(company_name=filters['company_name'])
if 'project_id' in filters:
qs = qs.filter(project_id=filters['project_id'])
if 'expires_before' in filters:
qs = qs.filter(expires_at__lt=filters['expires_before'])
if 'custom_filter' in filters:
qs = qs.filter(filters['custom_filter'])

# Batch update (chunks of 1000)
token_ids = list(qs.values_list('id', flat=True))
total_count = len(token_ids)

logger.info(f"Bulk revoking {total_count} tokens (filters: {filters})")

revoked_at = timezone.now()
for i in range(0, total_count, 1000):
chunk_ids = token_ids[i:i+1000]
tokens = DocumentVerificationToken.objects.select_for_update().filter(
id__in=chunk_ids
)

for token in tokens:
token.revoked_at = revoked_at
token.revoked_by = str(performer.id)
token.revocation_reason = f"[BULK] {reason}"
token.save(update_fields=['revoked_at', 'revoked_by', 'revocation_reason'])

# Redis invalidation
cls._invalidate_cache(token.token_uuid)

# Access log
TokenAccessLog.objects.create(
tenant_id=token.tenant_id,
token=token,
action_type='REVOKE',
ip_address=performer.last_login_ip or '0.0.0.0',
user_agent=f'BulkOp:{performer.email}',
request_id=f'bulk-revoke-{i//1000}',
success=True
)

# Admin action
AdminAction.objects.create(
tenant_id=performer.tenant_id,
performer=performer,
performer_email=performer.email,
action_type='BULK_REVOKE',
target_type='token',
target_id='bulk',
target_count=total_count,
reason=reason,
metadata={'filters': filters},
ip_address=performer.last_login_ip or '0.0.0.0',
user_agent='Admin Panel'
)

propagation_time_ms = int((timezone.now() - start_time).total_seconds() * 1000)

return {
'status': 'revoked',
'count': total_count,
'propagation_time_ms': propagation_time_ms
}

@classmethod
@transaction.atomic
def extend_expiry(cls, token_id, performer, new_expiry_date, reason):
"""
Extend token expiry with audit trail.

Args:
token_id: Token UUID or ID
performer: User performing extension
new_expiry_date: New expiry datetime
reason: Reason for extension

Returns:
dict: Token data with new expiry
"""
token = DocumentVerificationToken.objects.select_for_update().get(id=token_id)

old_expiry = token.expires_at
token.expires_at = new_expiry_date
token.save(update_fields=['expires_at', 'updated_at'])

# Invalidate cache to force re-fetch with new expiry
cls._invalidate_cache(token.token_uuid)

# Admin action
AdminAction.objects.create(
tenant_id=token.tenant_id,
performer=performer,
performer_email=performer.email,
action_type='EXTEND_TOKEN',
target_type='token',
target_id=str(token.id),
reason=reason,
metadata={
'old_expiry': old_expiry.isoformat(),
'new_expiry': new_expiry_date.isoformat()
},
ip_address=performer.last_login_ip or '0.0.0.0',
user_agent='Admin Panel'
)

logger.info(f"Extended token {token.token_uuid} expiry from {old_expiry} to {new_expiry_date}")

return {
'status': 'extended',
'token_id': str(token.id),
'old_expiry': old_expiry,
'new_expiry': new_expiry_date
}

4.2 Token Validation with Blacklist Check

# apps/access_control/middleware.py

from django.core.cache import cache
from django.http import JsonResponse
from django.utils import timezone
from django_redis import get_redis_connection

from apps.access_control.models import DocumentVerificationToken, TokenAccessLog

class TokenValidationMiddleware:
"""Middleware to validate DVT tokens with Redis cache and blacklist check."""

def __init__(self, get_response):
self.get_response = get_response
self.redis_conn = get_redis_connection("default")

def __call__(self, request):
# Only process document access endpoints
if not request.path.startswith('/api/v1/documents/'):
return self.get_response(request)

# Extract token from header
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({'error': 'Missing or invalid authorization header'}, status=401)

token_uuid = auth_header[7:] # Remove 'Bearer '

# 1. Check blacklist first (fastest rejection)
if self.redis_conn.sismember('blacklist:revoked', token_uuid):
self._log_access(request, token_uuid, 'VALIDATE', False, 'Token revoked')
return JsonResponse({'error': 'Token has been revoked'}, status=403)

# 2. Check cache
cache_key = f'dvt:{token_uuid}'
cached_token = cache.get(cache_key)

if cached_token:
token_data = cached_token
else:
# 3. Database lookup
try:
token = DocumentVerificationToken.objects.get(token_uuid=token_uuid)
except DocumentVerificationToken.DoesNotExist:
self._log_access(request, token_uuid, 'VALIDATE', False, 'Token not found')
return JsonResponse({'error': 'Invalid token'}, status=401)

# Check validity
if not token.is_valid:
reason = 'Token expired' if token.expires_at < timezone.now() else 'Token revoked'
self._log_access(request, token_uuid, 'VALIDATE', False, reason)
return JsonResponse({'error': reason}, status=403)

# Cache for 5 minutes
token_data = {
'id': str(token.id),
'tenant_id': token.tenant_id,
'project_id': token.project_id,
'recipient_email': token.recipient_email,
'allowed_documents': token.allowed_documents,
'expires_at': token.expires_at.isoformat()
}
cache.set(cache_key, token_data, timeout=300)

# Attach token data to request
request.dvt_token = token_data

# Log successful validation
self._log_access(request, token_uuid, 'VALIDATE', True)

return self.get_response(request)

def _log_access(self, request, token_uuid, action_type, success, error_message=None):
"""Log access attempt to audit trail."""
TokenAccessLog.objects.create(
tenant_id=request.dvt_token.get('tenant_id', 'unknown') if hasattr(request, 'dvt_token') else 'unknown',
token_id=request.dvt_token.get('id') if hasattr(request, 'dvt_token') else None,
action_type=action_type,
ip_address=request.META.get('REMOTE_ADDR', '0.0.0.0'),
user_agent=request.META.get('HTTP_USER_AGENT', 'Unknown'),
request_id=request.META.get('HTTP_X_REQUEST_ID', 'unknown'),
success=success,
error_message=error_message
)

5. Scheduled Cleanup Tasks

5.1 Celery Tasks

# apps/access_control/tasks.py

from celery import shared_task
from django.db import transaction
from django.utils import timezone
from datetime import timedelta
import logging

from apps.access_control.models import (
DocumentVerificationToken,
TokenAccessLog,
AdminAction
)
from apps.access_control.services.revocation import TokenRevocationService
from apps.access_control.services.notifications import NotificationService

logger = logging.getLogger(__name__)

@shared_task(name='access_control.cleanup_expired_tokens')
def cleanup_expired_tokens():
"""
Hourly task to revoke expired tokens and cleanup old records.

Steps:
1. Find all tokens with expires_at < NOW() and revoked_at IS NULL
2. Batch revoke (chunks of 1000)
3. Invalidate Redis cache for each
4. Archive tokens expired >90 days ago
"""
logger.info("Starting expired token cleanup")

now = timezone.now()

# Find expired tokens
expired_tokens = DocumentVerificationToken.objects.filter(
expires_at__lt=now,
revoked_at__isnull=True
)

total_count = expired_tokens.count()
if total_count == 0:
logger.info("No expired tokens to clean up")
return {'status': 'no_action', 'count': 0}

logger.info(f"Found {total_count} expired tokens to revoke")

# Batch revoke
token_ids = list(expired_tokens.values_list('id', flat=True))
revoked_count = 0

for i in range(0, total_count, 1000):
chunk_ids = token_ids[i:i+1000]

with transaction.atomic():
tokens = DocumentVerificationToken.objects.select_for_update().filter(
id__in=chunk_ids,
revoked_at__isnull=True # Re-check in case concurrent revocation
)

for token in tokens:
token.revoked_at = now
token.revoked_by = 'SYSTEM'
token.revocation_reason = 'Automatic expiry'
token.save(update_fields=['revoked_at', 'revoked_by', 'revocation_reason'])

# Redis invalidation
TokenRevocationService._invalidate_cache(token.token_uuid)

revoked_count += 1

logger.info(f"Revoked chunk {i//1000 + 1}: {len(tokens)} tokens")

# Archive old tokens (>90 days expired)
archive_cutoff = now - timedelta(days=90)
archivable_tokens = DocumentVerificationToken.objects.filter(
revoked_at__lt=archive_cutoff
)

archive_count = archivable_tokens.count()
if archive_count > 0:
# Move to archive table (separate table for cold storage)
# This is a placeholder - actual implementation depends on archival strategy
logger.info(f"Archiving {archive_count} tokens (revoked >90 days ago)")
# archivable_tokens.delete() # DELETE after archival

logger.info(
f"Cleanup complete: revoked {revoked_count} expired tokens, "
f"archived {archive_count} old tokens"
)

return {
'status': 'success',
'revoked_count': revoked_count,
'archived_count': archive_count
}


@shared_task(name='access_control.send_expiry_warnings')
def send_expiry_warnings():
"""
Daily task to send expiry warning emails.

Sends warnings at:
- 7 days before expiry
- 1 day before expiry
"""
now = timezone.now()
seven_days = now + timedelta(days=7)
one_day = now + timedelta(days=1)

# 7-day warning
tokens_7d = DocumentVerificationToken.objects.filter(
expires_at__gte=now,
expires_at__lte=seven_days,
revoked_at__isnull=True,
# Check if warning already sent (add field or check sent_notifications table)
)

for token in tokens_7d:
NotificationService.send_expiry_warning(
token=token,
days_remaining=7
)

# 1-day warning
tokens_1d = DocumentVerificationToken.objects.filter(
expires_at__gte=now,
expires_at__lte=one_day,
revoked_at__isnull=True
)

for token in tokens_1d:
NotificationService.send_expiry_warning(
token=token,
days_remaining=1
)

logger.info(
f"Sent expiry warnings: {tokens_7d.count()} (7 days), "
f"{tokens_1d.count()} (1 day)"
)

return {
'status': 'success',
'warnings_7d': tokens_7d.count(),
'warnings_1d': tokens_1d.count()
}


@shared_task(name='access_control.send_revocation_notification')
def send_revocation_notification(token_id, reason):
"""
Send email and webhook notifications for token revocation.

Args:
token_id: Token UUID
reason: Revocation reason
"""
try:
token = DocumentVerificationToken.objects.get(id=token_id)
except DocumentVerificationToken.DoesNotExist:
logger.error(f"Token {token_id} not found for revocation notification")
return {'status': 'error', 'message': 'Token not found'}

# Email notification
NotificationService.send_revocation_email(token, reason)

# Slack webhook
NotificationService.send_slack_notification(
channel='#access-control',
message=f"Token revoked: {token.recipient_email} ({token.project_name}) - {reason}"
)

# Custom webhook (if configured)
if token.compliance_metadata.get('webhook_url'):
NotificationService.send_webhook(
url=token.compliance_metadata['webhook_url'],
payload={
'event': 'token.revoked',
'token_id': str(token.id),
'recipient_email': token.recipient_email,
'project_id': token.project_id,
'reason': reason,
'revoked_at': token.revoked_at.isoformat()
}
)

return {'status': 'sent'}

5.2 Celery Beat Schedule

# config/celery.py

from celery import Celery
from celery.schedules import crontab

app = Celery('bio_qms')

app.conf.beat_schedule = {
'cleanup-expired-tokens': {
'task': 'access_control.cleanup_expired_tokens',
'schedule': crontab(minute=0), # Every hour
},
'send-expiry-warnings': {
'task': 'access_control.send_expiry_warnings',
'schedule': crontab(hour=9, minute=0), # Daily at 9 AM
},
}

app.conf.timezone = 'UTC'

6. Admin API Endpoints

6.1 Token Management ViewSet

# apps/access_control/api/views.py

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from django.db.models import Q, Count
from django.utils import timezone
from datetime import timedelta

from apps.access_control.models import (
DocumentVerificationToken,
NDARecord,
TokenAccessLog,
AdminAction
)
from apps.access_control.api.serializers import (
TokenAdminSerializer,
NDAAdminSerializer,
AccessLogSerializer,
AdminActionSerializer
)
from apps.access_control.api.permissions import IsAdmin, IsSuperAdmin
from apps.access_control.services.revocation import TokenRevocationService
from apps.access_control.services.reporting import ComplianceReportService


class TokenAdminViewSet(viewsets.ReadOnlyModelViewSet):
"""
Admin API for token management.

Endpoints:
- GET /api/v1/admin/tokens/ - List tokens with filters
- GET /api/v1/admin/tokens/{id}/ - Token detail
- POST /api/v1/admin/tokens/{id}/revoke/ - Revoke single token
- POST /api/v1/admin/tokens/bulk-revoke/ - Bulk revocation
- POST /api/v1/admin/tokens/{id}/extend/ - Extend expiry
- GET /api/v1/admin/tokens/stats/ - Dashboard statistics
"""

queryset = DocumentVerificationToken.objects.all()
serializer_class = TokenAdminSerializer
permission_classes = [IsAuthenticated, IsAdmin]

def get_queryset(self):
"""Filter by tenant and apply search/filter parameters."""
qs = super().get_queryset().filter(tenant_id=self.request.user.tenant_id)

# Filters
status_filter = self.request.query_params.get('status')
if status_filter == 'active':
qs = qs.filter(revoked_at__isnull=True, expires_at__gt=timezone.now())
elif status_filter == 'revoked':
qs = qs.filter(revoked_at__isnull=False)
elif status_filter == 'expired':
qs = qs.filter(revoked_at__isnull=True, expires_at__lte=timezone.now())

project_id = self.request.query_params.get('project_id')
if project_id:
qs = qs.filter(project_id=project_id)

company_name = self.request.query_params.get('company_name')
if company_name:
qs = qs.filter(company_name__icontains=company_name)

recipient_email = self.request.query_params.get('email')
if recipient_email:
qs = qs.filter(recipient_email__icontains=recipient_email)

# Search
search = self.request.query_params.get('search')
if search:
qs = qs.filter(
Q(recipient_email__icontains=search) |
Q(recipient_name__icontains=search) |
Q(company_name__icontains=search) |
Q(project_name__icontains=search)
)

return qs.order_by('-created_at')

@action(detail=True, methods=['post'])
def revoke(self, request, pk=None):
"""
Revoke a single token.

POST /api/v1/admin/tokens/{id}/revoke/
{
"reason": "NDA terminated - employee departure",
"cascade_nda": false
}
"""
token = self.get_object()
reason = request.data.get('reason', '').strip()
cascade_nda = request.data.get('cascade_nda', False)

if not reason:
return Response(
{'error': 'Reason is required'},
status=status.HTTP_400_BAD_REQUEST
)

try:
result = TokenRevocationService.revoke_token(
token_id=token.id,
performer=request.user,
reason=reason,
cascade_nda=cascade_nda
)
return Response(result, status=status.HTTP_200_OK)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)

@action(detail=False, methods=['post'], permission_classes=[IsAuthenticated, IsSuperAdmin])
def bulk_revoke(self, request):
"""
Bulk token revocation.

POST /api/v1/admin/tokens/bulk-revoke/
{
"reason": "Project decommissioned",
"filters": {
"project_id": "proj_abc123",
"company_name": "Acme Corp"
}
}
"""
reason = request.data.get('reason', '').strip()
filters = request.data.get('filters', {})

if not reason:
return Response(
{'error': 'Reason is required'},
status=status.HTTP_400_BAD_REQUEST
)

try:
result = TokenRevocationService.bulk_revoke(
performer=request.user,
reason=reason,
filters=filters
)
return Response(result, status=status.HTTP_200_OK)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)

@action(detail=True, methods=['post'])
def extend(self, request, pk=None):
"""
Extend token expiry.

POST /api/v1/admin/tokens/{id}/extend/
{
"new_expiry": "2026-12-31T23:59:59Z",
"reason": "NDA renewed for 1 year"
}
"""
token = self.get_object()
new_expiry_str = request.data.get('new_expiry')
reason = request.data.get('reason', '').strip()

if not new_expiry_str or not reason:
return Response(
{'error': 'new_expiry and reason are required'},
status=status.HTTP_400_BAD_REQUEST
)

try:
from dateutil import parser
new_expiry = parser.isoparse(new_expiry_str)
except ValueError:
return Response(
{'error': 'Invalid date format for new_expiry'},
status=status.HTTP_400_BAD_REQUEST
)

try:
result = TokenRevocationService.extend_expiry(
token_id=token.id,
performer=request.user,
new_expiry_date=new_expiry,
reason=reason
)
return Response(result, status=status.HTTP_200_OK)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)

@action(detail=False, methods=['get'])
def stats(self, request):
"""
Dashboard statistics.

GET /api/v1/admin/tokens/stats/
Returns:
{
"total_tokens": 1234,
"active_tokens": 456,
"expired_tokens": 678,
"revoked_tokens": 100,
"tokens_by_project": {...},
"tokens_by_company": {...},
"expiring_soon": 23
}
"""
tenant_id = request.user.tenant_id
now = timezone.now()

qs = DocumentVerificationToken.objects.filter(tenant_id=tenant_id)

stats = {
'total_tokens': qs.count(),
'active_tokens': qs.filter(
revoked_at__isnull=True,
expires_at__gt=now
).count(),
'expired_tokens': qs.filter(
revoked_at__isnull=True,
expires_at__lte=now
).count(),
'revoked_tokens': qs.filter(revoked_at__isnull=False).count(),
'expiring_soon': qs.filter(
revoked_at__isnull=True,
expires_at__gt=now,
expires_at__lte=now + timedelta(days=7)
).count(),
}

# Tokens by project
stats['tokens_by_project'] = dict(
qs.values('project_name').annotate(count=Count('id')).values_list('project_name', 'count')
)

# Tokens by company
stats['tokens_by_company'] = dict(
qs.values('company_name').annotate(count=Count('id')).values_list('company_name', 'count')
)

return Response(stats, status=status.HTTP_200_OK)


class NDAAdminViewSet(viewsets.ReadOnlyModelViewSet):
"""
Admin API for NDA management.

Endpoints:
- GET /api/v1/admin/ndas/ - List NDAs
- GET /api/v1/admin/ndas/{id}/ - NDA detail with linked tokens
- POST /api/v1/admin/ndas/{id}/revoke/ - Revoke NDA (cascades to tokens)
- POST /api/v1/admin/ndas/{id}/send-reminder/ - Send renewal reminder
"""

queryset = NDARecord.objects.all()
serializer_class = NDAAdminSerializer
permission_classes = [IsAuthenticated, IsAdmin]

def get_queryset(self):
qs = super().get_queryset().filter(tenant_id=self.request.user.tenant_id)

# Filters
status_filter = self.request.query_params.get('status')
if status_filter == 'active':
qs = qs.filter(
revoked_at__isnull=True,
signed_at__isnull=False,
expires_at__gt=timezone.now()
)
elif status_filter == 'expired':
qs = qs.filter(expires_at__lte=timezone.now())

return qs.order_by('-created_at')

@action(detail=True, methods=['post'], permission_classes=[IsAuthenticated, IsSuperAdmin])
def revoke(self, request, pk=None):
"""
Revoke NDA and cascade to all linked tokens.

POST /api/v1/admin/ndas/{id}/revoke/
{
"reason": "Company terminated relationship"
}
"""
nda = self.get_object()
reason = request.data.get('reason', '').strip()

if not reason:
return Response(
{'error': 'Reason is required'},
status=status.HTTP_400_BAD_REQUEST
)

# Revoke NDA
from django.db import transaction
with transaction.atomic():
nda.revoked_at = timezone.now()
nda.revoked_by = request.user
nda.revocation_reason = reason
nda.save()

# Cascade to all tokens
tokens = DocumentVerificationToken.objects.filter(
nda_envelope_id=nda.envelope_id,
revoked_at__isnull=True
)

cascade_count = 0
for token in tokens:
TokenRevocationService.revoke_token(
token_id=token.id,
performer=request.user,
reason=f"NDA revoked: {reason}",
cascade_nda=False
)
cascade_count += 1

return Response({
'status': 'revoked',
'nda_id': str(nda.id),
'cascade_token_count': cascade_count
}, status=status.HTTP_200_OK)


class AccessLogViewSet(viewsets.ReadOnlyModelViewSet):
"""
Admin API for access log viewer.

Endpoints:
- GET /api/v1/admin/audit-log/ - List access logs with filters
- GET /api/v1/admin/audit-log/export/ - Export logs as CSV/JSON
- GET /api/v1/admin/audit-log/stream/ - Real-time stream (SSE)
"""

queryset = TokenAccessLog.objects.all()
serializer_class = AccessLogSerializer
permission_classes = [IsAuthenticated, IsAdmin]

def get_queryset(self):
qs = super().get_queryset().filter(tenant_id=self.request.user.tenant_id)

# Date range filter
start_date = self.request.query_params.get('start_date')
end_date = self.request.query_params.get('end_date')
if start_date:
from dateutil import parser
qs = qs.filter(timestamp__gte=parser.isoparse(start_date))
if end_date:
from dateutil import parser
qs = qs.filter(timestamp__lte=parser.isoparse(end_date))

# Action type filter
action_type = self.request.query_params.get('action_type')
if action_type:
qs = qs.filter(action_type=action_type)

# Success/failure filter
success_filter = self.request.query_params.get('success')
if success_filter in ['true', 'false']:
qs = qs.filter(success=(success_filter == 'true'))

# Token filter
token_id = self.request.query_params.get('token_id')
if token_id:
qs = qs.filter(token_id=token_id)

return qs.order_by('-timestamp')

@action(detail=False, methods=['get'])
def export(self, request):
"""
Export access logs.

GET /api/v1/admin/audit-log/export/?format=csv&start_date=...&end_date=...
"""
export_format = request.query_params.get('format', 'json')
qs = self.get_queryset()

if export_format == 'csv':
import csv
from django.http import HttpResponse

response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = 'attachment; filename="access_log.csv"'

writer = csv.writer(response)
writer.writerow([
'Timestamp', 'Token ID', 'Action Type', 'IP Address',
'Success', 'Error Message', 'Document ID'
])

for log in qs.iterator(chunk_size=1000):
writer.writerow([
log.timestamp.isoformat(),
str(log.token_id),
log.action_type,
log.ip_address,
log.success,
log.error_message or '',
log.document_id or ''
])

return response
else:
# JSON export
serializer = self.get_serializer(qs, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)


class ReportViewSet(viewsets.ViewSet):
"""
Admin API for compliance reporting.

Endpoints:
- POST /api/v1/admin/reports/generate/ - Generate compliance report
- GET /api/v1/admin/reports/{id}/ - Download generated report
"""

permission_classes = [IsAuthenticated, IsAdmin]

@action(detail=False, methods=['post'])
def generate(self, request):
"""
Generate compliance report.

POST /api/v1/admin/reports/generate/
{
"report_type": "audit_trail",
"start_date": "2026-01-01T00:00:00Z",
"end_date": "2026-12-31T23:59:59Z",
"filters": {
"project_id": "proj_abc123"
}
}
"""
report_type = request.data.get('report_type')
start_date = request.data.get('start_date')
end_date = request.data.get('end_date')
filters = request.data.get('filters', {})

if not all([report_type, start_date, end_date]):
return Response(
{'error': 'report_type, start_date, and end_date are required'},
status=status.HTTP_400_BAD_REQUEST
)

# Async report generation
from apps.access_control.tasks import generate_compliance_report
task = generate_compliance_report.delay(
tenant_id=request.user.tenant_id,
report_type=report_type,
start_date=start_date,
end_date=end_date,
filters=filters,
requester_id=str(request.user.id)
)

return Response({
'status': 'generating',
'task_id': task.id,
'message': 'Report generation started. You will receive an email when complete.'
}, status=status.HTTP_202_ACCEPTED)

6.2 Serializers

# apps/access_control/api/serializers.py

from rest_framework import serializers
from apps.access_control.models import (
DocumentVerificationToken,
NDARecord,
TokenAccessLog,
AdminAction
)

class TokenAdminSerializer(serializers.ModelSerializer):
status = serializers.CharField(read_only=True)
is_valid = serializers.BooleanField(read_only=True)

class Meta:
model = DocumentVerificationToken
fields = [
'id', 'token_uuid', 'project_name', 'company_name',
'recipient_email', 'recipient_name', 'nda_envelope_id',
'issued_at', 'expires_at', 'last_used_at', 'usage_count',
'revoked_at', 'revoked_by', 'revocation_reason',
'status', 'is_valid', 'allowed_documents', 'created_at'
]
read_only_fields = ['id', 'token_uuid', 'created_at']


class NDAAdminSerializer(serializers.ModelSerializer):
is_active = serializers.BooleanField(read_only=True)
linked_tokens_count = serializers.SerializerMethodField()

class Meta:
model = NDARecord
fields = [
'id', 'envelope_id', 'envelope_status', 'project_name',
'company_name', 'signer_email', 'signer_name',
'sent_at', 'signed_at', 'expires_at', 'revoked_at',
'revoked_by', 'revocation_reason', 'is_active',
'linked_tokens_count', 'signed_pdf_url'
]

def get_linked_tokens_count(self, obj):
return DocumentVerificationToken.objects.filter(
nda_envelope_id=obj.envelope_id
).count()


class AccessLogSerializer(serializers.ModelSerializer):
token_email = serializers.CharField(source='token.recipient_email', read_only=True)

class Meta:
model = TokenAccessLog
fields = [
'id', 'token', 'token_email', 'action_type', 'document_id',
'document_name', 'ip_address', 'user_agent', 'success',
'error_message', 'timestamp', 'request_id'
]


class AdminActionSerializer(serializers.ModelSerializer):
performer_email = serializers.EmailField(read_only=True)

class Meta:
model = AdminAction
fields = [
'id', 'performer', 'performer_email', 'action_type',
'target_type', 'target_id', 'target_count', 'reason',
'metadata', 'timestamp'
]

6.3 Permissions

# apps/access_control/api/permissions.py

from rest_framework.permissions import BasePermission

class IsAdmin(BasePermission):
"""Permission check for admin users."""

def has_permission(self, request, view):
return (
request.user and
request.user.is_authenticated and
hasattr(request.user, 'role') and
request.user.role in ['admin', 'super_admin', 'project_admin', 'audit_viewer']
)


class IsSuperAdmin(BasePermission):
"""Permission check for super admin only (destructive operations)."""

def has_permission(self, request, view):
return (
request.user and
request.user.is_authenticated and
hasattr(request.user, 'role') and
request.user.role == 'super_admin'
)


class IsProjectAdmin(BasePermission):
"""Permission check for project-scoped admin."""

def has_permission(self, request, view):
return (
request.user and
request.user.is_authenticated and
hasattr(request.user, 'role') and
request.user.role in ['super_admin', 'project_admin']
)

def has_object_permission(self, request, view, obj):
# Project admin can only manage tokens for their assigned projects
if request.user.role == 'super_admin':
return True
return obj.project_id in request.user.assigned_projects

7. Self-Service Portal

7.1 Self-Service ViewSet

# apps/access_control/api/self_service_views.py

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from django.utils import timezone

from apps.access_control.models import (
DocumentVerificationToken,
TokenAccessLog
)
from apps.access_control.services.revocation import TokenRevocationService


class SelfServiceTokenViewSet(viewsets.ReadOnlyModelViewSet):
"""
Self-service API for token holders.

Endpoints:
- GET /api/v1/self-service/tokens/ - My active tokens
- GET /api/v1/self-service/tokens/{id}/ - Token detail
- POST /api/v1/self-service/tokens/{id}/request-extension/ - Request extension
- POST /api/v1/self-service/tokens/{id}/revoke/ - Self-revoke
- GET /api/v1/self-service/tokens/{id}/access-history/ - My access history
"""

permission_classes = [IsAuthenticated]

def get_queryset(self):
# Token holder can only see their own tokens
return DocumentVerificationToken.objects.filter(
recipient_email=self.request.user.email,
tenant_id=self.request.user.tenant_id
).order_by('-created_at')

@action(detail=True, methods=['post'])
def request_extension(self, request, pk=None):
"""
Request token expiry extension (sends to admin for approval).

POST /api/v1/self-service/tokens/{id}/request-extension/
{
"reason": "Need continued access for ongoing project work",
"requested_expiry": "2027-06-30T23:59:59Z"
}
"""
token = self.get_object()
reason = request.data.get('reason', '').strip()
requested_expiry = request.data.get('requested_expiry')

if not reason or not requested_expiry:
return Response(
{'error': 'reason and requested_expiry are required'},
status=status.HTTP_400_BAD_REQUEST
)

# Create extension request (notify admin)
from apps.access_control.models import ExtensionRequest
extension_req = ExtensionRequest.objects.create(
token=token,
requester_email=request.user.email,
reason=reason,
requested_expiry=requested_expiry,
status='pending'
)

# Send notification to admins
from apps.access_control.services.notifications import NotificationService
NotificationService.send_extension_request_notification(extension_req)

return Response({
'status': 'request_submitted',
'request_id': str(extension_req.id),
'message': 'Your extension request has been submitted to the admin team.'
}, status=status.HTTP_202_ACCEPTED)

@action(detail=True, methods=['post'])
def revoke(self, request, pk=None):
"""
Self-revoke token.

POST /api/v1/self-service/tokens/{id}/revoke/
{
"reason": "No longer need access"
}
"""
token = self.get_object()
reason = request.data.get('reason', '').strip()

if not reason:
return Response(
{'error': 'Reason is required'},
status=status.HTTP_400_BAD_REQUEST
)

# Self-revocation uses SYSTEM as performer
result = TokenRevocationService.revoke_token(
token_id=token.id,
performer=request.user, # Logged as self-revoke
reason=f"[SELF-REVOKE] {reason}",
cascade_nda=False
)

# Log as self-service action
TokenAccessLog.objects.create(
tenant_id=token.tenant_id,
token=token,
action_type='SELF_REVOKE',
ip_address=request.META.get('REMOTE_ADDR', '0.0.0.0'),
user_agent=request.META.get('HTTP_USER_AGENT', 'Unknown'),
request_id=f'self-revoke-{token.id}',
success=True
)

return Response(result, status=status.HTTP_200_OK)

@action(detail=True, methods=['get'])
def access_history(self, request, pk=None):
"""
View access history for this token.

GET /api/v1/self-service/tokens/{id}/access-history/
"""
token = self.get_object()

logs = TokenAccessLog.objects.filter(
token=token
).order_by('-timestamp')[:100] # Last 100 accesses

from apps.access_control.api.serializers import AccessLogSerializer
serializer = AccessLogSerializer(logs, many=True)

return Response(serializer.data, status=status.HTTP_200_OK)

8. Notification Service

8.1 Notification Service Implementation

# apps/access_control/services/notifications.py

from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.conf import settings
import requests
import logging

logger = logging.getLogger(__name__)

class NotificationService:
"""Handles email, Slack, and webhook notifications."""

@classmethod
def send_revocation_email(cls, token, reason):
"""Send email notification for token revocation."""
subject = f"Access Token Revoked - {token.project_name}"

context = {
'recipient_name': token.recipient_name,
'project_name': token.project_name,
'reason': reason,
'revoked_at': token.revoked_at.strftime('%Y-%m-%d %H:%M UTC'),
'support_email': settings.SUPPORT_EMAIL
}

html_message = render_to_string(
'access_control/emails/token_revoked.html',
context
)

send_mail(
subject=subject,
message=f"Your access token for {token.project_name} has been revoked. Reason: {reason}",
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[token.recipient_email],
html_message=html_message,
fail_silently=False
)

logger.info(f"Sent revocation email to {token.recipient_email}")

@classmethod
def send_expiry_warning(cls, token, days_remaining):
"""Send expiry warning email."""
subject = f"Access Token Expiring in {days_remaining} Days - {token.project_name}"

context = {
'recipient_name': token.recipient_name,
'project_name': token.project_name,
'days_remaining': days_remaining,
'expires_at': token.expires_at.strftime('%Y-%m-%d'),
'portal_url': f"{settings.FRONTEND_URL}/self-service/tokens"
}

html_message = render_to_string(
'access_control/emails/token_expiry_warning.html',
context
)

send_mail(
subject=subject,
message=f"Your access token will expire in {days_remaining} days.",
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[token.recipient_email],
html_message=html_message,
fail_silently=False
)

@classmethod
def send_slack_notification(cls, channel, message):
"""Send Slack webhook notification."""
webhook_url = settings.SLACK_WEBHOOK_URL
if not webhook_url:
logger.warning("Slack webhook URL not configured")
return

payload = {
'channel': channel,
'text': message,
'username': 'BIO-QMS Access Control',
'icon_emoji': ':lock:'
}

try:
response = requests.post(webhook_url, json=payload, timeout=5)
response.raise_for_status()
logger.info(f"Sent Slack notification to {channel}")
except requests.RequestException as e:
logger.error(f"Failed to send Slack notification: {e}")

@classmethod
def send_webhook(cls, url, payload):
"""Send custom webhook notification."""
try:
response = requests.post(
url,
json=payload,
headers={'Content-Type': 'application/json'},
timeout=10
)
response.raise_for_status()
logger.info(f"Sent webhook to {url}")
except requests.RequestException as e:
logger.error(f"Failed to send webhook to {url}: {e}")

@classmethod
def send_extension_request_notification(cls, extension_request):
"""Notify admins of extension request."""
subject = f"Token Extension Request - {extension_request.token.project_name}"

context = {
'requester_email': extension_request.requester_email,
'project_name': extension_request.token.project_name,
'current_expiry': extension_request.token.expires_at.strftime('%Y-%m-%d'),
'requested_expiry': extension_request.requested_expiry.strftime('%Y-%m-%d'),
'reason': extension_request.reason,
'admin_url': f"{settings.FRONTEND_URL}/admin/tokens/{extension_request.token.id}"
}

html_message = render_to_string(
'access_control/emails/extension_request.html',
context
)

# Send to all admins
admin_emails = settings.ADMIN_EMAILS
send_mail(
subject=subject,
message=f"Extension request from {extension_request.requester_email}",
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=admin_emails,
html_message=html_message,
fail_silently=False
)

9. Emergency Procedures

9.1 Kill Switch Service

# apps/access_control/services/emergency.py

from django.db import transaction
from django.utils import timezone
from apps.access_control.models import DocumentVerificationToken, AdminAction
from apps.access_control.services.revocation import TokenRevocationService
import logging

logger = logging.getLogger(__name__)

class EmergencyService:
"""Handles emergency lockout and kill switch operations."""

@classmethod
@transaction.atomic
def project_kill_switch(cls, project_id, performer, reason):
"""
Emergency kill switch: revoke ALL tokens for a project instantly.

Use case: Security breach, data leak, immediate lockout required.

Args:
project_id: Project to lock out
performer: Admin performing kill switch
reason: Emergency reason

Returns:
dict: {
'status': 'locked_out',
'project_id': str,
'revoked_count': int,
'propagation_time_ms': int
}
"""
logger.critical(
f"KILL SWITCH ACTIVATED: Project {project_id} by {performer.email}. "
f"Reason: {reason}"
)

start_time = timezone.now()

# Find all active tokens for project
tokens = DocumentVerificationToken.objects.select_for_update().filter(
project_id=project_id,
revoked_at__isnull=True
)

token_count = tokens.count()

# Revoke all
revoked_at = timezone.now()
for token in tokens:
token.revoked_at = revoked_at
token.revoked_by = str(performer.id)
token.revocation_reason = f"[EMERGENCY KILL SWITCH] {reason}"
token.save(update_fields=['revoked_at', 'revoked_by', 'revocation_reason'])

# Redis invalidation
TokenRevocationService._invalidate_cache(token.token_uuid)

# Admin action
AdminAction.objects.create(
tenant_id=performer.tenant_id,
performer=performer,
performer_email=performer.email,
action_type='EMERGENCY_LOCKOUT',
target_type='project',
target_id=project_id,
target_count=token_count,
reason=f"[KILL SWITCH] {reason}",
metadata={'emergency': True},
ip_address=performer.last_login_ip or '0.0.0.0',
user_agent='Emergency Kill Switch'
)

propagation_time_ms = int((timezone.now() - start_time).total_seconds() * 1000)

logger.critical(
f"KILL SWITCH COMPLETE: Revoked {token_count} tokens for project {project_id} "
f"in {propagation_time_ms}ms"
)

# Immediate Slack alert
from apps.access_control.services.notifications import NotificationService
NotificationService.send_slack_notification(
channel='#security-alerts',
message=f":rotating_light: KILL SWITCH ACTIVATED :rotating_light:\n"
f"Project: {project_id}\n"
f"Tokens revoked: {token_count}\n"
f"Performer: {performer.email}\n"
f"Reason: {reason}"
)

return {
'status': 'locked_out',
'project_id': project_id,
'revoked_count': token_count,
'propagation_time_ms': propagation_time_ms
}

@classmethod
def global_lockout(cls, performer, reason):
"""
NUCLEAR OPTION: Disable ALL token validation globally.

This sets a Redis flag that causes ALL token validations to fail.
Use only for critical incidents (security breach, system compromise).

Args:
performer: Super admin performing lockout
reason: Emergency reason

Returns:
dict: {'status': 'global_lockout_active'}
"""
logger.critical(
f"GLOBAL LOCKOUT ACTIVATED by {performer.email}. Reason: {reason}"
)

# Set Redis flag
from django_redis import get_redis_connection
redis_conn = get_redis_connection("default")
redis_conn.set('global_lockout:active', '1', ex=3600) # 1 hour TTL
redis_conn.set('global_lockout:reason', reason, ex=3600)
redis_conn.set('global_lockout:performer', performer.email, ex=3600)

# Admin action
AdminAction.objects.create(
tenant_id=performer.tenant_id,
performer=performer,
performer_email=performer.email,
action_type='EMERGENCY_LOCKOUT',
target_type='global',
target_id='*',
target_count=0,
reason=f"[GLOBAL LOCKOUT] {reason}",
metadata={'global': True, 'ttl_seconds': 3600},
ip_address=performer.last_login_ip or '0.0.0.0',
user_agent='Global Lockout'
)

# Immediate Slack + Email alert
from apps.access_control.services.notifications import NotificationService
NotificationService.send_slack_notification(
channel='#security-alerts',
message=f":rotating_light::rotating_light: GLOBAL LOCKOUT ACTIVE :rotating_light::rotating_light:\n"
f"ALL TOKEN VALIDATION DISABLED\n"
f"Performer: {performer.email}\n"
f"Reason: {reason}\n"
f"TTL: 1 hour (auto-restore)"
)

return {
'status': 'global_lockout_active',
'ttl_seconds': 3600,
'restore_at': (timezone.now() + timezone.timedelta(hours=1)).isoformat()
}

@classmethod
def disable_global_lockout(cls, performer):
"""Manually disable global lockout."""
from django_redis import get_redis_connection
redis_conn = get_redis_connection("default")
redis_conn.delete('global_lockout:active')
redis_conn.delete('global_lockout:reason')
redis_conn.delete('global_lockout:performer')

logger.critical(f"GLOBAL LOCKOUT DISABLED by {performer.email}")

return {'status': 'global_lockout_disabled'}

9.2 Global Lockout Middleware

# apps/access_control/middleware.py (add to existing TokenValidationMiddleware)

class TokenValidationMiddleware:
# ... existing code ...

def __call__(self, request):
# Check global lockout FIRST
if self._is_global_lockout_active():
reason = self.redis_conn.get('global_lockout:reason')
return JsonResponse({
'error': 'System maintenance in progress. All access temporarily disabled.',
'lockout_reason': reason.decode() if reason else 'Emergency lockout active'
}, status=503)

# ... rest of validation logic ...

def _is_global_lockout_active(self):
"""Check if global lockout is active."""
return self.redis_conn.exists('global_lockout:active')

10. Admin Panel UI Design (React)

10.1 Token Management Dashboard Component

// components/admin/TokenManagementDashboard.tsx

import React, { useState, useEffect } from 'react';
import { Card, Table, Input, Select, Button, Tag, Space, Modal, message } from 'antd';
import { SearchOutlined, ReloadOutlined, DeleteOutlined, ClockCircleOutlined } from '@ant-design/icons';
import axios from 'axios';

interface Token {
id: string;
token_uuid: string;
project_name: string;
company_name: string;
recipient_email: string;
recipient_name: string;
issued_at: string;
expires_at: string;
status: 'active' | 'revoked' | 'expired';
usage_count: number;
last_used_at: string | null;
}

interface TokenStats {
total_tokens: number;
active_tokens: number;
expired_tokens: number;
revoked_tokens: number;
expiring_soon: number;
}

const TokenManagementDashboard: React.FC = () => {
const [tokens, setTokens] = useState<Token[]>([]);
const [stats, setStats] = useState<TokenStats | null>(null);
const [loading, setLoading] = useState(false);
const [filters, setFilters] = useState({
status: 'all',
project_id: '',
company_name: '',
search: ''
});

useEffect(() => {
fetchTokens();
fetchStats();
}, [filters]);

const fetchTokens = async () => {
setLoading(true);
try {
const params = new URLSearchParams();
if (filters.status !== 'all') params.append('status', filters.status);
if (filters.project_id) params.append('project_id', filters.project_id);
if (filters.company_name) params.append('company_name', filters.company_name);
if (filters.search) params.append('search', filters.search);

const response = await axios.get(`/api/v1/admin/tokens/?${params.toString()}`);
setTokens(response.data.results);
} catch (error) {
message.error('Failed to load tokens');
} finally {
setLoading(false);
}
};

const fetchStats = async () => {
try {
const response = await axios.get('/api/v1/admin/tokens/stats/');
setStats(response.data);
} catch (error) {
message.error('Failed to load statistics');
}
};

const handleRevoke = (tokenId: string) => {
Modal.confirm({
title: 'Revoke Token',
content: (
<div>
<p>Are you sure you want to revoke this token?</p>
<Input.TextArea
rows={3}
placeholder="Reason for revocation (required)"
id="revoke-reason"
/>
</div>
),
onOk: async () => {
const reason = (document.getElementById('revoke-reason') as HTMLTextAreaElement)?.value;
if (!reason) {
message.error('Reason is required');
return Promise.reject();
}

try {
await axios.post(`/api/v1/admin/tokens/${tokenId}/revoke/`, { reason });
message.success('Token revoked successfully');
fetchTokens();
fetchStats();
} catch (error) {
message.error('Failed to revoke token');
throw error;
}
}
});
};

const handleExtend = (tokenId: string) => {
Modal.confirm({
title: 'Extend Token Expiry',
content: (
<div>
<p>New expiry date:</p>
<Input
type="datetime-local"
id="new-expiry"
/>
<p style={{ marginTop: 16 }}>Reason:</p>
<Input.TextArea
rows={2}
placeholder="Reason for extension"
id="extend-reason"
/>
</div>
),
onOk: async () => {
const newExpiry = (document.getElementById('new-expiry') as HTMLInputElement)?.value;
const reason = (document.getElementById('extend-reason') as HTMLTextAreaElement)?.value;

if (!newExpiry || !reason) {
message.error('Both new expiry date and reason are required');
return Promise.reject();
}

try {
await axios.post(`/api/v1/admin/tokens/${tokenId}/extend/`, {
new_expiry: newExpiry,
reason
});
message.success('Token expiry extended successfully');
fetchTokens();
} catch (error) {
message.error('Failed to extend token');
throw error;
}
}
});
};

const columns = [
{
title: 'Recipient',
dataIndex: 'recipient_email',
key: 'recipient_email',
render: (email: string, record: Token) => (
<div>
<div>{record.recipient_name}</div>
<div style={{ fontSize: 12, color: '#888' }}>{email}</div>
</div>
)
},
{
title: 'Company',
dataIndex: 'company_name',
key: 'company_name'
},
{
title: 'Project',
dataIndex: 'project_name',
key: 'project_name'
},
{
title: 'Status',
dataIndex: 'status',
key: 'status',
render: (status: string) => {
const color = status === 'active' ? 'green' : status === 'expired' ? 'orange' : 'red';
return <Tag color={color}>{status.toUpperCase()}</Tag>;
}
},
{
title: 'Expires',
dataIndex: 'expires_at',
key: 'expires_at',
render: (date: string) => new Date(date).toLocaleDateString()
},
{
title: 'Usage',
dataIndex: 'usage_count',
key: 'usage_count',
render: (count: number, record: Token) => (
<div>
<div>{count} accesses</div>
{record.last_used_at && (
<div style={{ fontSize: 11, color: '#888' }}>
Last: {new Date(record.last_used_at).toLocaleDateString()}
</div>
)}
</div>
)
},
{
title: 'Actions',
key: 'actions',
render: (_: any, record: Token) => (
<Space>
<Button
size="small"
icon={<ClockCircleOutlined />}
onClick={() => handleExtend(record.id)}
disabled={record.status === 'revoked'}
>
Extend
</Button>
<Button
size="small"
danger
icon={<DeleteOutlined />}
onClick={() => handleRevoke(record.id)}
disabled={record.status === 'revoked'}
>
Revoke
</Button>
</Space>
)
}
];

return (
<div style={{ padding: 24 }}>
{/* Statistics Cards */}
{stats && (
<div style={{ display: 'grid', gridTemplateColumns: 'repeat(5, 1fr)', gap: 16, marginBottom: 24 }}>
<Card>
<div style={{ fontSize: 24, fontWeight: 'bold' }}>{stats.total_tokens}</div>
<div style={{ color: '#888' }}>Total Tokens</div>
</Card>
<Card>
<div style={{ fontSize: 24, fontWeight: 'bold', color: '#52c41a' }}>{stats.active_tokens}</div>
<div style={{ color: '#888' }}>Active</div>
</Card>
<Card>
<div style={{ fontSize: 24, fontWeight: 'bold', color: '#faad14' }}>{stats.expired_tokens}</div>
<div style={{ color: '#888' }}>Expired</div>
</Card>
<Card>
<div style={{ fontSize: 24, fontWeight: 'bold', color: '#f5222d' }}>{stats.revoked_tokens}</div>
<div style={{ color: '#888' }}>Revoked</div>
</Card>
<Card>
<div style={{ fontSize: 24, fontWeight: 'bold', color: '#ff7a45' }}>{stats.expiring_soon}</div>
<div style={{ color: '#888' }}>Expiring Soon</div>
</Card>
</div>
)}

{/* Filters */}
<Card style={{ marginBottom: 16 }}>
<Space style={{ width: '100%', justifyContent: 'space-between' }}>
<Space>
<Input
placeholder="Search by email, company, project..."
prefix={<SearchOutlined />}
style={{ width: 300 }}
value={filters.search}
onChange={(e) => setFilters({ ...filters, search: e.target.value })}
/>
<Select
style={{ width: 150 }}
value={filters.status}
onChange={(value) => setFilters({ ...filters, status: value })}
>
<Select.Option value="all">All Status</Select.Option>
<Select.Option value="active">Active</Select.Option>
<Select.Option value="expired">Expired</Select.Option>
<Select.Option value="revoked">Revoked</Select.Option>
</Select>
</Space>
<Button icon={<ReloadOutlined />} onClick={fetchTokens}>
Refresh
</Button>
</Space>
</Card>

{/* Tokens Table */}
<Card>
<Table
dataSource={tokens}
columns={columns}
loading={loading}
rowKey="id"
pagination={{
pageSize: 20,
showSizeChanger: true,
showTotal: (total) => `Total ${total} tokens`
}}
/>
</Card>
</div>
);
};

export default TokenManagementDashboard;

11. Testing

11.1 Unit Tests

# apps/access_control/tests/test_revocation.py

from django.test import TestCase
from django.utils import timezone
from datetime import timedelta
from unittest.mock import patch

from apps.access_control.models import DocumentVerificationToken
from apps.access_control.services.revocation import TokenRevocationService
from apps.core.models import User


class TokenRevocationServiceTest(TestCase):

def setUp(self):
self.admin_user = User.objects.create_user(
email='admin@example.com',
tenant_id='tenant_001',
role='super_admin'
)

self.token = DocumentVerificationToken.objects.create(
tenant_id='tenant_001',
project_id='proj_001',
project_name='Test Project',
company_name='Acme Corp',
recipient_email='user@acme.com',
recipient_name='John Doe',
nda_envelope_id='env_123',
nda_signed_at=timezone.now(),
expires_at=timezone.now() + timedelta(days=365)
)

@patch('apps.access_control.services.revocation.TokenRevocationService._invalidate_cache')
def test_revoke_token_success(self, mock_invalidate):
"""Test successful token revocation."""
result = TokenRevocationService.revoke_token(
token_id=self.token.id,
performer=self.admin_user,
reason='Test revocation',
cascade_nda=False
)

self.assertEqual(result['status'], 'revoked')
self.assertEqual(result['cascade_count'], 0)
self.assertIsNotNone(result['revoked_at'])

# Verify database update
self.token.refresh_from_db()
self.assertIsNotNone(self.token.revoked_at)
self.assertEqual(self.token.revoked_by, str(self.admin_user.id))
self.assertEqual(self.token.revocation_reason, 'Test revocation')

# Verify cache invalidation called
mock_invalidate.assert_called_once_with(self.token.token_uuid)

def test_revoke_token_idempotent(self):
"""Test that revoking already-revoked token is idempotent."""
# First revocation
TokenRevocationService.revoke_token(
token_id=self.token.id,
performer=self.admin_user,
reason='First revocation'
)

# Second revocation
result = TokenRevocationService.revoke_token(
token_id=self.token.id,
performer=self.admin_user,
reason='Second revocation'
)

self.assertEqual(result['status'], 'already_revoked')

@patch('apps.access_control.services.revocation.TokenRevocationService._invalidate_cache')
def test_revoke_with_cascade(self, mock_invalidate):
"""Test cascading revocation to all tokens with same NDA."""
# Create additional token with same NDA
token2 = DocumentVerificationToken.objects.create(
tenant_id='tenant_001',
project_id='proj_001',
project_name='Test Project',
company_name='Acme Corp',
recipient_email='user2@acme.com',
recipient_name='Jane Doe',
nda_envelope_id='env_123', # Same NDA
nda_signed_at=timezone.now(),
expires_at=timezone.now() + timedelta(days=365)
)

result = TokenRevocationService.revoke_token(
token_id=self.token.id,
performer=self.admin_user,
reason='NDA revoked',
cascade_nda=True
)

self.assertEqual(result['cascade_count'], 1)

# Verify both tokens revoked
self.token.refresh_from_db()
token2.refresh_from_db()
self.assertIsNotNone(self.token.revoked_at)
self.assertIsNotNone(token2.revoked_at)

12. Performance Metrics

MetricTargetMeasurement
Revocation Propagation<5s from admin action to enforcementTime from API call to Redis invalidation complete
Cache Hit Rate>95% for token validationRedis cache hits / total validations
Bulk Revocation Throughput>1000 tokens/secondTokens revoked per second in batch operation
Admin Dashboard Load Time<2s for 10K tokensTime to render token list page
Cleanup Task Duration<5 minutes for 100K expired tokensHourly cleanup task execution time
API Response Time (p95)<200ms95th percentile for all admin API endpoints

13. Security Considerations

13.1 Access Control Matrix

RoleView TokensRevoke SingleBulk RevokeExtend ExpiryKill SwitchGlobal Lockout
Super Admin✓ All
Project Admin✓ Assigned projects only✓ Own projects✓ Own projects
Audit Viewer✓ Read-only
Token Holder✓ Own tokens✓ Self-revokeRequest only

13.2 Audit Trail Requirements (21 CFR Part 11)

Every revocation, extension, and admin action MUST generate audit entries with:

  • Who: Performer ID + email (denormalized for retention)
  • What: Action type + target entity + reason
  • When: Timestamp (UTC, ISO 8601)
  • Where: IP address + user agent
  • Why: Textual reason (mandatory for regulatory actions)
  • Context: Metadata (filters for bulk ops, cascade flag, etc.)

Audit entries are immutable (enforced via database trigger) and retained for 7 years per Part 11 §11.10(e).


14. Deployment Checklist

  • Database migrations applied (revocation fields, audit tables, triggers)
  • Redis cache configured with appropriate TTLs
  • Celery beat scheduler configured for hourly cleanup
  • Email templates created for revocation/expiry notifications
  • Slack webhook URL configured in settings
  • Admin panel UI deployed and accessible
  • Self-service portal deployed
  • Permission roles configured in identity provider
  • Monitoring dashboards created (Grafana)
  • Alert rules configured (Prometheus)
  • Runbook created for kill switch/emergency procedures
  • Load testing completed (1M+ tokens, 10K+ concurrent validations)
  • Security review completed (OWASP Top 10, access control)
  • Documentation published (admin user guide, API docs)

15. Appendix: API Reference

Complete Endpoint List

MethodEndpointDescriptionPermission
Token Management
GET/api/v1/admin/tokens/List tokens with filtersIsAdmin
GET/api/v1/admin/tokens/{id}/Token detailIsAdmin
POST/api/v1/admin/tokens/{id}/revoke/Revoke single tokenIsAdmin
POST/api/v1/admin/tokens/bulk-revoke/Bulk revocationIsSuperAdmin
POST/api/v1/admin/tokens/{id}/extend/Extend expiryIsAdmin
GET/api/v1/admin/tokens/stats/Dashboard statisticsIsAdmin
NDA Management
GET/api/v1/admin/ndas/List NDAsIsAdmin
GET/api/v1/admin/ndas/{id}/NDA detailIsAdmin
POST/api/v1/admin/ndas/{id}/revoke/Revoke NDA (cascade)IsSuperAdmin
Access Log
GET/api/v1/admin/audit-log/List access logsIsAdmin
GET/api/v1/admin/audit-log/export/Export logs (CSV/JSON)IsAdmin
Self-Service
GET/api/v1/self-service/tokens/My tokensIsAuthenticated
GET/api/v1/self-service/tokens/{id}/Token detailIsAuthenticated
POST/api/v1/self-service/tokens/{id}/request-extension/Request extensionIsAuthenticated
POST/api/v1/self-service/tokens/{id}/revoke/Self-revokeIsAuthenticated
GET/api/v1/self-service/tokens/{id}/access-history/My access historyIsAuthenticated
Emergency
POST/api/v1/admin/emergency/kill-switch/Project kill switchIsSuperAdmin
POST/api/v1/admin/emergency/global-lockout/Global lockoutIsSuperAdmin
DELETE/api/v1/admin/emergency/global-lockout/Disable lockoutIsSuperAdmin

Copyright 2026 AZ1.AI Inc. All rights reserved. Developer: Hal Casteel, CEO/CTO Product: CODITECT-BIO-QMS | Part of the CODITECT Product Suite Classification: Internal - Confidential