Document Access Audit Trail Design
Document ID: BIO-QMS-AUDIT-TRAIL-001
Version: 1.0.0
Status: Active
Owner: Security Architecture Team
Last Updated: 2026-02-16
Table of Contents
- Executive Summary
- Architecture Overview
- AuditLog Model Specification
- Logging Middleware
- Analytics Capabilities
- Analytics API Endpoints
- Compliance Reporting
- Data Retention
- Real-Time Monitoring
- Privacy Considerations
- Performance Optimization
- Security of Audit Data
- Implementation Guide
- Testing Strategy
- Appendices
1. Executive Summary
1.1 Purpose
This document specifies the comprehensive audit trail system for the BIO-QMS Platform's NDA-gated document access control (ADR-196). The audit trail captures all document access events, provides analytics for usage patterns, and generates compliance reports required by FDA 21 CFR Part 11, HIPAA, SOC2, and ISO 13485.
1.2 Key Features
- Complete Access Logging: Every document view, search, download, and share attempt
- Tamper-Proof Storage: Append-only audit logs with cryptographic integrity verification
- Real-Time Analytics: Live dashboards showing access patterns, popular content, and anomalies
- Compliance Reports: Automated CSV/JSON/PDF exports for regulatory submissions
- Privacy-Aware: GDPR-compliant with data subject rights and anonymization
- High Performance: Async logging with batch writes, <10ms overhead per request
1.3 Regulatory Compliance
| Regulation | Requirement | Implementation |
|---|---|---|
| 21 CFR Part 11 | Audit trail for all PHI access | AuditLog model with user, timestamp, action, document |
| HIPAA Security Rule | Access logging (§164.312(b)) | IP address, user agent, session tracking |
| SOC2 Type II | CC6.2 (Monitoring) | Real-time monitoring, anomaly detection |
| ISO 13485 | 4.2.4 (Document Control) | Document access history, version tracking |
| GDPR Article 30 | Records of processing | User consent, retention policies, right to erasure |
2. Architecture Overview
2.1 System Context
┌─────────────────────────────────────────────────────────────────────┐
│ BIO-QMS Documentation Viewer │
│ │
│ ┌──────────────┐ ┌─────────────────┐ ┌──────────────┐ │
│ │ User Action │───────▶│ Audit Middleware│─────▶│ Event Queue │ │
│ │ (view/down) │ │ (intercept) │ │ (Celery) │ │
│ └──────────────┘ └─────────────────┘ └──────┬───────┘ │
│ │ │
│ ┌─────────────────▼───────┐ │
│ │ Batch Writer (async) │ │
│ │ - Groups events by 100 │ │
│ │ - Bulk insert to DB │ │
│ └─────────────┬───────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────▼───────────┐│
│ │ AuditLog Database (append-only) ││
│ │ ┌────────────┬────────────┬────────────┬────────────┐ ││
│ │ │ Current │ Archive │ Anonymized │ Hash Chain │ ││
│ │ │ (90 days) │ (7 years) │ (GDPR) │ (integrity)│ ││
│ │ └────────────┴────────────┴────────────┴────────────┘ ││
│ └──────────────────────────────────────────────────────────────────┘│
│ │
│ ┌──────────────────────────────────────────────────────────────────┐│
│ │ Analytics & Reporting Layer ││
│ │ ┌──────────────┬──────────────┬──────────────┬──────────────┐ ││
│ │ │ Real-Time │ Aggregation │ Compliance │ Anomaly │ ││
│ │ │ Dashboard │ Service │ Reports │ Detection │ ││
│ │ │ (WebSocket) │ (Celery) │ (CSV/PDF) │ (ML-based) │ ││
│ │ └──────────────┴──────────────┴──────────────┴──────────────┘ ││
│ └──────────────────────────────────────────────────────────────────┘│
└───────────────────────────────────────────────────────────────────────┘
2.2 Data Flow
Write Path (Logging):
User Request
↓
Audit Middleware (captures context)
↓
Event Queue (Redis-backed Celery)
↓
Batch Writer (groups 100 events or 5 seconds)
↓
Bulk Insert to AuditLog table
↓
Hash Chain Update (integrity verification)
↓
WebSocket Notification (real-time dashboard)
Read Path (Analytics):
Analytics API Request
↓
Cache Check (Redis, 60s TTL)
↓ (cache miss)
Read Replica Query (materialized views)
↓
Aggregation Service (compute metrics)
↓
Cache Result
↓
Return JSON Response
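The write-path batching rule (flush every 100 events or every 5 seconds, whichever comes first) can be sketched as a plain Python buffer. This is an illustrative stand-in for the Celery batch writer, not the production implementation; `BatchWriter` and `flush_fn` are hypothetical names:

```python
import time

class BatchWriter:
    """Buffers events and flushes when either threshold is hit:
    max_events accumulated, or max_age_s seconds since the last flush."""

    def __init__(self, flush_fn, max_events=100, max_age_s=5.0, clock=time.monotonic):
        self.flush_fn = flush_fn      # e.g. a bulk-insert callable
        self.max_events = max_events
        self.max_age_s = max_age_s
        self.clock = clock
        self.buffer = []
        self.last_flush = clock()

    def add(self, event):
        self.buffer.append(event)
        # Flush on count OR age, whichever threshold is crossed first
        if len(self.buffer) >= self.max_events or (self.clock() - self.last_flush) >= self.max_age_s:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flush_fn(self.buffer)
        self.buffer = []
        self.last_flush = self.clock()

# Feed 250 events: two full batches of 100 plus a drained remainder of 50
batches = []
writer = BatchWriter(batches.append)
for i in range(250):
    writer.add({"id": i})
writer.flush()  # drain whatever is left in the buffer
```

In production the same rule is enforced by the Celery worker; the sketch just makes the count-or-age contract concrete and testable.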
3. AuditLog Model Specification
3.1 Django Model
File: backend/audit/models.py
from django.db import models
from django.utils import timezone
import uuid
import hashlib
class AuditLog(models.Model):
"""
Comprehensive audit log for document access events.
Compliance: 21 CFR Part 11, HIPAA §164.312(b), SOC2 CC6.2
Retention: 7 years (configurable per regulation)
Storage: Append-only, no UPDATE/DELETE allowed
"""
# Primary Key
id = models.BigAutoField(
primary_key=True,
help_text="Sequential audit log entry ID"
)
# User Identity
user_email = models.EmailField(
max_length=255,
db_index=True,
help_text="Email of user performing action (from JWT or session)"
)
user_id = models.CharField(
max_length=128,
db_index=True,
null=True,
blank=True,
help_text="User ID from identity provider (GCP, Auth0, etc.)"
)
# Action Type
ACTION_CHOICES = [
('view', 'Document Viewed'),
('search', 'Search Performed'),
('download', 'Document Downloaded'),
('print', 'Document Printed'),
('share_attempt', 'Share Attempt (blocked or allowed)'),
('auth_failure', 'Authentication Failure'),
('nda_accept', 'NDA Accepted'),
('nda_decline', 'NDA Declined'),
('token_refresh', 'Access Token Refreshed'),
('session_start', 'Session Started'),
('session_end', 'Session Ended'),
]
action = models.CharField(
max_length=32,
choices=ACTION_CHOICES,
db_index=True,
help_text="Type of action performed"
)
# Document Context
document_id = models.CharField(
max_length=500,
db_index=True,
help_text="Document ID from publish.json (e.g., docs-architecture-system-design)"
)
document_title = models.CharField(
max_length=500,
db_index=True,
help_text="Denormalized document title for query performance"
)
document_category = models.CharField(
max_length=100,
db_index=True,
null=True,
blank=True,
help_text="Document category (Architecture, Compliance, etc.)"
)
# Temporal Context
    timestamp = models.DateTimeField(
        default=timezone.now,
        editable=False,
        db_index=True,
        help_text="UTC timestamp when action occurred (set via default rather than auto_now_add so the value is available when save() computes the record hash)"
    )
# Network Context
ip_address = models.GenericIPAddressField(
protocol='both', # IPv4 or IPv6
db_index=True,
help_text="Source IP address of the request"
)
user_agent = models.TextField(
help_text="Browser user agent string (for device fingerprinting)"
)
geolocation = models.JSONField(
null=True,
blank=True,
help_text="IP geolocation data (country, region, city) from GeoIP2"
)
# Token and Session Context
token_id = models.UUIDField(
db_index=True,
null=True,
blank=True,
help_text="DocumentViewToken UUID that authorized this access"
)
session_id = models.UUIDField(
db_index=True,
default=uuid.uuid4,
help_text="Session UUID grouping actions within a user session"
)
# Additional Metadata
metadata = models.JSONField(
default=dict,
blank=True,
help_text="""
Additional action-specific context:
- search_query: str (for search actions)
- download_format: str (pdf, docx, etc.)
- share_recipient: str (for share attempts)
- failure_reason: str (for auth_failure)
- nda_version: str (for nda_accept/decline)
- referrer_url: str (page that linked to document)
"""
)
# Performance Metrics
duration_ms = models.IntegerField(
null=True,
blank=True,
help_text="Time spent on page in milliseconds (for view actions)"
)
response_time_ms = models.IntegerField(
null=True,
blank=True,
help_text="Server response time for this action"
)
# Integrity Verification
previous_hash = models.CharField(
max_length=64,
null=True,
blank=True,
help_text="SHA-256 hash of previous audit log entry (hash chain)"
)
record_hash = models.CharField(
max_length=64,
unique=True,
help_text="SHA-256 hash of this record (for tamper detection)"
)
# Data Lifecycle
retention_until = models.DateTimeField(
db_index=True,
null=True,
blank=True,
help_text="Date when this record can be archived/anonymized (per retention policy)"
)
anonymized = models.BooleanField(
default=False,
db_index=True,
help_text="True if PII has been anonymized (GDPR right to erasure)"
)
archived = models.BooleanField(
default=False,
db_index=True,
help_text="True if moved to cold storage (S3 Glacier)"
)
class Meta:
db_table = 'audit_log'
ordering = ['-timestamp']
indexes = [
models.Index(fields=['user_email', 'timestamp']),
models.Index(fields=['document_id', 'timestamp']),
models.Index(fields=['action', 'timestamp']),
models.Index(fields=['session_id', 'timestamp']),
models.Index(fields=['ip_address', 'timestamp']),
models.Index(fields=['timestamp', 'archived']),
]
# Partitioning by month for performance
# Implemented via PostgreSQL table partitioning (see migration)
def __str__(self):
return f"{self.timestamp} | {self.user_email} | {self.action} | {self.document_id}"
    def save(self, *args, **kwargs):
        """
        Override save to compute the hash chain and record hash.

        Note: reading the previous record's hash is not safe under
        concurrent writers; inserts must be serialized (the async batch
        writer provides a single writer) or guarded with a lock.
        """
        if not self.record_hash:
            # Link to the previous record (genesis hash is 64 zeros)
            if not self.previous_hash:
                prev = AuditLog.objects.order_by('-id').first()
                self.previous_hash = prev.record_hash if prev else "0" * 64
            # Compute this record's hash over the chained fields
            hash_input = f"{self.previous_hash}|{self.user_email}|{self.action}|{self.document_id}|{self.timestamp.isoformat()}"
            self.record_hash = hashlib.sha256(hash_input.encode()).hexdigest()
        super().save(*args, **kwargs)
@classmethod
def verify_integrity(cls, start_id=None, end_id=None):
"""
Verify hash chain integrity for audit logs.
Returns:
dict: {
'valid': bool,
'total_checked': int,
'errors': [{'id': int, 'expected_hash': str, 'actual_hash': str}]
}
"""
queryset = cls.objects.all()
if start_id:
queryset = queryset.filter(id__gte=start_id)
if end_id:
queryset = queryset.filter(id__lte=end_id)
queryset = queryset.order_by('id')
        errors = []
        # Seed the chain: the genesis hash for a full scan, otherwise the
        # record_hash of the entry immediately before start_id
        if start_id:
            prior = cls.objects.filter(id__lt=start_id).order_by('-id').first()
            prev_hash = prior.record_hash if prior else "0" * 64
        else:
            prev_hash = "0" * 64
        total_checked = 0
        for log in queryset:
            total_checked += 1
            if log.previous_hash != prev_hash:
                errors.append({
                    'id': log.id,
                    'expected_previous_hash': prev_hash,
                    'actual_previous_hash': log.previous_hash
                })
            # Re-compute this record's hash to detect field tampering
            hash_input = f"{log.previous_hash}|{log.user_email}|{log.action}|{log.document_id}|{log.timestamp.isoformat()}"
            computed_hash = hashlib.sha256(hash_input.encode()).hexdigest()
            if computed_hash != log.record_hash:
                errors.append({
                    'id': log.id,
                    'expected_record_hash': computed_hash,
                    'actual_record_hash': log.record_hash
                })
            prev_hash = log.record_hash
        return {
            'valid': len(errors) == 0,
            'total_checked': total_checked,
            'errors': errors
        }
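The hash-chain scheme above can be exercised outside Django. This standalone sketch (helper names are illustrative) chains records with the same `previous_hash|email|action|document|timestamp` field order as `AuditLog.save()`, and shows that tampering with any logged field breaks verification:

```python
import hashlib

GENESIS = "0" * 64  # previous_hash of the first record

def record_hash(prev_hash, email, action, doc_id, ts_iso):
    # Same field order as AuditLog.save()
    payload = f"{prev_hash}|{email}|{action}|{doc_id}|{ts_iso}"
    return hashlib.sha256(payload.encode()).hexdigest()

def build_chain(events):
    prev, chain = GENESIS, []
    for e in events:
        h = record_hash(prev, e["email"], e["action"], e["doc"], e["ts"])
        chain.append({**e, "previous_hash": prev, "record_hash": h})
        prev = h
    return chain

def verify(chain):
    prev = GENESIS
    for rec in chain:
        if rec["previous_hash"] != prev:
            return False  # broken link in the chain
        expected = record_hash(prev, rec["email"], rec["action"], rec["doc"], rec["ts"])
        if expected != rec["record_hash"]:
            return False  # a logged field was altered
        prev = rec["record_hash"]
    return True

chain = build_chain([
    {"email": "a@example.com", "action": "view", "doc": "doc-1", "ts": "2026-02-16T10:00:00"},
    {"email": "b@example.com", "action": "download", "doc": "doc-2", "ts": "2026-02-16T10:01:00"},
])
ok_before = verify(chain)          # chain is intact
chain[0]["action"] = "download"    # tamper with a logged field
ok_after = verify(chain)           # record hash no longer matches
```

Any edit to a stored record, or any deleted or reordered entry, surfaces as a mismatch in `verify_integrity`.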
3.2 Database Migration
File: backend/audit/migrations/0001_initial.py
from django.db import migrations, models
import django.utils.timezone
import uuid
class Migration(migrations.Migration):
initial = True
dependencies = []
operations = [
migrations.CreateModel(
name='AuditLog',
fields=[
('id', models.BigAutoField(primary_key=True, serialize=False)),
('user_email', models.EmailField(db_index=True, max_length=255)),
('user_id', models.CharField(blank=True, db_index=True, max_length=128, null=True)),
('action', models.CharField(choices=[
('view', 'Document Viewed'),
('search', 'Search Performed'),
('download', 'Document Downloaded'),
('print', 'Document Printed'),
('share_attempt', 'Share Attempt'),
('auth_failure', 'Authentication Failure'),
('nda_accept', 'NDA Accepted'),
('nda_decline', 'NDA Declined'),
('token_refresh', 'Access Token Refreshed'),
('session_start', 'Session Started'),
('session_end', 'Session Ended'),
], db_index=True, max_length=32)),
('document_id', models.CharField(db_index=True, max_length=500)),
('document_title', models.CharField(db_index=True, max_length=500)),
('document_category', models.CharField(blank=True, db_index=True, max_length=100, null=True)),
('timestamp', models.DateTimeField(db_index=True, default=django.utils.timezone.now, editable=False)),
('ip_address', models.GenericIPAddressField(db_index=True, protocol='both')),
('user_agent', models.TextField()),
('geolocation', models.JSONField(blank=True, null=True)),
('token_id', models.UUIDField(blank=True, db_index=True, null=True)),
('session_id', models.UUIDField(db_index=True, default=uuid.uuid4)),
('metadata', models.JSONField(blank=True, default=dict)),
('duration_ms', models.IntegerField(blank=True, null=True)),
('response_time_ms', models.IntegerField(blank=True, null=True)),
('previous_hash', models.CharField(blank=True, max_length=64, null=True)),
('record_hash', models.CharField(max_length=64, unique=True)),
('retention_until', models.DateTimeField(blank=True, db_index=True, null=True)),
('anonymized', models.BooleanField(db_index=True, default=False)),
('archived', models.BooleanField(db_index=True, default=False)),
],
options={
'db_table': 'audit_log',
'ordering': ['-timestamp'],
},
),
# Create composite indexes
migrations.AddIndex(
model_name='auditlog',
index=models.Index(fields=['user_email', 'timestamp'], name='audit_user_time_idx'),
),
migrations.AddIndex(
model_name='auditlog',
index=models.Index(fields=['document_id', 'timestamp'], name='audit_doc_time_idx'),
),
migrations.AddIndex(
model_name='auditlog',
index=models.Index(fields=['action', 'timestamp'], name='audit_action_time_idx'),
),
migrations.AddIndex(
model_name='auditlog',
index=models.Index(fields=['session_id', 'timestamp'], name='audit_session_time_idx'),
),
migrations.AddIndex(
model_name='auditlog',
index=models.Index(fields=['ip_address', 'timestamp'], name='audit_ip_time_idx'),
),
migrations.AddIndex(
model_name='auditlog',
index=models.Index(fields=['timestamp', 'archived'], name='audit_time_arch_idx'),
),
]
3.3 PostgreSQL Table Partitioning
File: backend/audit/migrations/0002_partition_by_month.py
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('audit', '0001_initial'),
]
operations = [
migrations.RunSQL(
sql="""
-- Convert audit_log to a partitioned table.
-- Note: drop the single-column UNIQUE index on record_hash first;
-- a unique index on a partitioned table must include the partition
-- key, e.g. UNIQUE (record_hash, timestamp).
CREATE TABLE audit_log_partitioned (
LIKE audit_log INCLUDING ALL
) PARTITION BY RANGE (timestamp);
-- Create partitions for 2026 (monthly)
CREATE TABLE audit_log_2026_01 PARTITION OF audit_log_partitioned
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE audit_log_2026_02 PARTITION OF audit_log_partitioned
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
CREATE TABLE audit_log_2026_03 PARTITION OF audit_log_partitioned
FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');
-- Continue through 2026-12
-- (Generate programmatically in production)
-- Create default partition for future data
CREATE TABLE audit_log_default PARTITION OF audit_log_partitioned
DEFAULT;
-- Migrate existing data
INSERT INTO audit_log_partitioned SELECT * FROM audit_log;
-- Swap tables
ALTER TABLE audit_log RENAME TO audit_log_old;
ALTER TABLE audit_log_partitioned RENAME TO audit_log;
-- Drop old table (after verification)
-- DROP TABLE audit_log_old;
""",
reverse_sql="""
ALTER TABLE audit_log RENAME TO audit_log_partitioned;
ALTER TABLE audit_log_old RENAME TO audit_log;
DROP TABLE audit_log_partitioned CASCADE;
"""
),
]
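The migration above stubs out the remaining months ("generate programmatically in production"). A small helper can emit the monthly DDL; the function name is illustrative, while the table names mirror the SQL above:

```python
from datetime import date

def month_partition_ddl(year, parent="audit_log_partitioned"):
    """Emit one CREATE TABLE ... PARTITION OF statement per month of `year`."""
    stmts = []
    for month in range(1, 13):
        start = date(year, month, 1)
        # Upper bound is the first day of the following month
        # (December rolls into January of year + 1)
        end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        stmts.append(
            f"CREATE TABLE audit_log_{year}_{month:02d} PARTITION OF {parent} "
            f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
        )
    return stmts

ddl = month_partition_ddl(2026)
```

A scheduled job (or the migration itself) can execute these statements ahead of each year, keeping the default partition empty in normal operation.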
4. Logging Middleware
4.1 Django Middleware
File: backend/audit/middleware.py
import logging
import time
import uuid
from django.utils.deprecation import MiddlewareMixin
from django.conf import settings
from .tasks import log_audit_event_async
import geoip2.database

logger = logging.getLogger(__name__)

class AuditLoggingMiddleware(MiddlewareMixin):
    """
    Middleware to capture document access events for the audit trail.

    Intercepts all requests to /api/documents/* and /api/search endpoints.
    Logs asynchronously via Celery to avoid blocking requests.
    """
    def __init__(self, get_response):
        super().__init__(get_response)
        self.geoip_reader = None
        # Initialize GeoIP2 reader if available
        try:
            self.geoip_reader = geoip2.database.Reader(
                settings.GEOIP_DATABASE_PATH
            )
        except Exception as e:
            logger.warning("GeoIP2 database not available: %s", e)
    def process_request(self, request):
        """
        Capture request start time and session ID.
        """
        request._audit_start_time = time.time()
        # Get or create a session-scoped audit ID; fall back to a
        # per-request ID when the session middleware is not active
        if hasattr(request, 'session'):
            if 'audit_session_id' not in request.session:
                request.session['audit_session_id'] = str(uuid.uuid4())
            request._audit_session_id = request.session['audit_session_id']
        else:
            request._audit_session_id = str(uuid.uuid4())
def process_response(self, request, response):
"""
Log audit event after response is generated.
"""
# Only log document access and search endpoints
if not self._should_audit(request):
return response
# Extract user identity
user_email = self._get_user_email(request)
user_id = self._get_user_id(request)
if not user_email:
# Anonymous access — log with placeholder
user_email = "anonymous@bioqms.local"
# Determine action type
action = self._determine_action(request, response)
# Extract document context
document_id, document_title, document_category = self._get_document_context(request)
# Get network context
ip_address = self._get_client_ip(request)
user_agent = request.META.get('HTTP_USER_AGENT', '')
geolocation = self._get_geolocation(ip_address)
# Get token ID if present
token_id = self._get_token_id(request)
        # Calculate response time (fall back gracefully if process_request
        # did not run, e.g. on a short-circuited response)
        start_time = getattr(request, '_audit_start_time', time.time())
        response_time_ms = int((time.time() - start_time) * 1000)
# Build metadata
metadata = self._build_metadata(request, response)
# Queue async audit log
log_audit_event_async.delay(
user_email=user_email,
user_id=user_id,
action=action,
document_id=document_id,
document_title=document_title,
document_category=document_category,
ip_address=ip_address,
user_agent=user_agent,
geolocation=geolocation,
token_id=token_id,
session_id=request._audit_session_id,
metadata=metadata,
response_time_ms=response_time_ms,
)
return response
def _should_audit(self, request):
"""
Determine if this request should be audited.
"""
path = request.path
# Audit document access endpoints
if path.startswith('/api/v1/documents/'):
return True
# Audit search endpoints
if path.startswith('/api/v1/search'):
return True
# Audit download endpoints
if path.startswith('/api/v1/download/'):
return True
# Audit NDA acceptance
if path.startswith('/api/v1/nda/accept'):
return True
return False
def _get_user_email(self, request):
"""Extract user email from JWT or session."""
if hasattr(request, 'user') and request.user.is_authenticated:
return request.user.email
# Try JWT token
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
# Decode JWT to get email claim
# (Implementation depends on JWT library)
try:
from .auth import decode_jwt
payload = decode_jwt(token)
return payload.get('email')
except Exception:
pass
return None
def _get_user_id(self, request):
"""Extract user ID from JWT or session."""
if hasattr(request, 'user') and request.user.is_authenticated:
return str(request.user.id)
# Try JWT token
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
try:
from .auth import decode_jwt
payload = decode_jwt(token)
return payload.get('sub') # Subject claim
except Exception:
pass
return None
def _determine_action(self, request, response):
"""Determine audit action type based on request/response."""
path = request.path
method = request.method
if path.startswith('/api/v1/search'):
return 'search'
if path.startswith('/api/v1/download/'):
return 'download'
        if path.startswith('/api/v1/nda/accept'):
            # A non-200 response here is a failed acceptance; it is
            # recorded as 'nda_decline' for audit purposes
            return 'nda_accept' if response.status_code == 200 else 'nda_decline'
if path.startswith('/api/v1/documents/'):
if method == 'GET':
return 'view'
elif method == 'POST' and 'print' in request.POST:
return 'print'
elif method == 'POST' and 'share' in request.POST:
return 'share_attempt'
# Check for auth failures
if response.status_code in [401, 403]:
return 'auth_failure'
return 'view' # Default
def _get_document_context(self, request):
"""Extract document ID, title, and category from request."""
path = request.path
# Parse document ID from URL
# Example: /api/v1/documents/docs-architecture-system-design
if '/documents/' in path:
document_id = path.split('/documents/')[-1].split('/')[0]
elif '/download/' in path:
document_id = path.split('/download/')[-1].split('/')[0]
else:
document_id = 'unknown'
# Fetch document metadata from publish.json cache
# (Implementation depends on caching strategy)
try:
from .document_cache import get_document_metadata
doc_meta = get_document_metadata(document_id)
document_title = doc_meta.get('title', document_id)
document_category = doc_meta.get('category', 'Uncategorized')
except Exception:
document_title = document_id
document_category = 'Unknown'
return document_id, document_title, document_category
def _get_client_ip(self, request):
"""Get real client IP (handle proxies/load balancers)."""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0].strip()
else:
ip = request.META.get('REMOTE_ADDR')
return ip
def _get_geolocation(self, ip_address):
"""Get geolocation data from IP address using GeoIP2."""
if not self.geoip_reader:
return None
try:
response = self.geoip_reader.city(ip_address)
return {
'country': response.country.name,
'country_code': response.country.iso_code,
'region': response.subdivisions.most_specific.name if response.subdivisions else None,
'city': response.city.name,
'latitude': response.location.latitude,
'longitude': response.location.longitude,
}
except Exception:
return None
def _get_token_id(self, request):
"""Extract DocumentViewToken UUID from request."""
# Check for token_id in query params or headers
token_id = request.GET.get('token_id') or request.META.get('HTTP_X_TOKEN_ID')
if token_id:
try:
return uuid.UUID(token_id)
except ValueError:
pass
return None
def _build_metadata(self, request, response):
"""Build metadata dict with action-specific context."""
metadata = {}
# Add search query for search actions
if request.path.startswith('/api/v1/search'):
metadata['search_query'] = request.GET.get('q', '')
metadata['search_results_count'] = getattr(response, 'results_count', 0)
# Add download format
if request.path.startswith('/api/v1/download/'):
metadata['download_format'] = request.GET.get('format', 'pdf')
# Add referrer URL
referrer = request.META.get('HTTP_REFERER')
if referrer:
metadata['referrer_url'] = referrer
# Add failure reason for auth failures
if response.status_code in [401, 403]:
metadata['failure_reason'] = getattr(response, 'reason_phrase', 'Unknown')
return metadata
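The `_get_client_ip` rule (take the left-most `X-Forwarded-For` entry, else `REMOTE_ADDR`) is easy to check in isolation. A standalone sketch of that same logic; note that trusting the left-most hop assumes the edge proxy strips client-supplied `X-Forwarded-For` headers, which should be verified in the deployment:

```python
def client_ip(meta):
    """Resolve the client IP the same way the middleware does:
    left-most X-Forwarded-For entry, falling back to REMOTE_ADDR."""
    xff = meta.get("HTTP_X_FORWARDED_FOR")
    if xff:
        return xff.split(",")[0].strip()
    return meta.get("REMOTE_ADDR")

# Behind a proxy: the original client is the left-most entry
behind_proxy = client_ip({
    "HTTP_X_FORWARDED_FOR": "203.0.113.7, 10.0.0.2",
    "REMOTE_ADDR": "10.0.0.2",
})
# Direct connection: only REMOTE_ADDR is present
direct = client_ip({"REMOTE_ADDR": "198.51.100.4"})
```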
4.2 Celery Async Task
File: backend/audit/tasks.py
from celery import shared_task
from django.utils import timezone
from datetime import timedelta
from .models import AuditLog
@shared_task(bind=True, max_retries=3)
def log_audit_event_async(self, **kwargs):
"""
Asynchronously log an audit event.
Uses retry logic to ensure guaranteed delivery.
Batches are handled by Celery's task grouping.
"""
try:
# Calculate retention period based on regulation
retention_days = get_retention_period(kwargs.get('action'))
retention_until = timezone.now() + timedelta(days=retention_days)
AuditLog.objects.create(
user_email=kwargs['user_email'],
user_id=kwargs.get('user_id'),
action=kwargs['action'],
document_id=kwargs['document_id'],
document_title=kwargs['document_title'],
document_category=kwargs.get('document_category'),
ip_address=kwargs['ip_address'],
user_agent=kwargs['user_agent'],
geolocation=kwargs.get('geolocation'),
token_id=kwargs.get('token_id'),
session_id=kwargs['session_id'],
metadata=kwargs.get('metadata', {}),
response_time_ms=kwargs.get('response_time_ms'),
retention_until=retention_until,
)
except Exception as exc:
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
def get_retention_period(action):
"""
Get retention period in days based on action type and regulations.
Returns:
int: Number of days to retain this audit log
"""
# Default: 7 years (2557 days) per 21 CFR Part 11
default_retention = 2557
retention_policy = {
'view': 2557, # 7 years (FDA)
'download': 2557, # 7 years (FDA)
'search': 1095, # 3 years (less critical)
'print': 2557, # 7 years (FDA)
'share_attempt': 2557, # 7 years (security)
'auth_failure': 2557, # 7 years (security)
'nda_accept': 2557, # 7 years (legal)
'nda_decline': 2557, # 7 years (legal)
}
return retention_policy.get(action, default_retention)
@shared_task
def batch_log_audit_events(events):
"""
Batch insert multiple audit events for performance.
Args:
events: List of dict with audit event data
"""
audit_logs = []
for event in events:
retention_days = get_retention_period(event.get('action'))
retention_until = timezone.now() + timedelta(days=retention_days)
audit_logs.append(AuditLog(
user_email=event['user_email'],
user_id=event.get('user_id'),
action=event['action'],
document_id=event['document_id'],
document_title=event['document_title'],
document_category=event.get('document_category'),
ip_address=event['ip_address'],
user_agent=event['user_agent'],
geolocation=event.get('geolocation'),
token_id=event.get('token_id'),
session_id=event['session_id'],
metadata=event.get('metadata', {}),
response_time_ms=event.get('response_time_ms'),
retention_until=retention_until,
))
# Bulk insert (up to 1000 at a time)
AuditLog.objects.bulk_create(audit_logs, batch_size=1000)
5. Analytics Capabilities
5.1 Core Metrics
The audit trail system provides comprehensive analytics through aggregation queries and materialized views:
5.1.1 Document View Metrics
Total Views Per Document:
from django.db.models import Count, Q
from .models import AuditLog
def get_document_view_counts(document_id=None, start_date=None, end_date=None):
"""
Get view counts for documents.
Returns:
QuerySet with document_id, total_views, unique_users, unique_ips
"""
queryset = AuditLog.objects.filter(action='view')
if document_id:
queryset = queryset.filter(document_id=document_id)
if start_date:
queryset = queryset.filter(timestamp__gte=start_date)
if end_date:
queryset = queryset.filter(timestamp__lte=end_date)
return queryset.values('document_id', 'document_title').annotate(
total_views=Count('id'),
unique_users=Count('user_email', distinct=True),
unique_ips=Count('ip_address', distinct=True)
).order_by('-total_views')
5.1.2 User Engagement Metrics
Average Time on Page:
from django.db.models import Avg, Count, Q
def get_engagement_metrics(document_id=None):
"""
Calculate user engagement metrics.
Returns:
dict: {
'avg_duration_seconds': float,
'total_views': int,
'bounce_rate': float # % of sessions with <10s duration
}
"""
queryset = AuditLog.objects.filter(
action='view',
duration_ms__isnull=False
)
if document_id:
queryset = queryset.filter(document_id=document_id)
stats = queryset.aggregate(
avg_duration_ms=Avg('duration_ms'),
total_views=Count('id'),
short_sessions=Count('id', filter=Q(duration_ms__lt=10000))
)
return {
'avg_duration_seconds': stats['avg_duration_ms'] / 1000 if stats['avg_duration_ms'] else 0,
'total_views': stats['total_views'],
'bounce_rate': (stats['short_sessions'] / stats['total_views'] * 100) if stats['total_views'] > 0 else 0
}
5.1.3 Search Analytics
Popular Search Queries:
def get_search_analytics(limit=50):
"""
Analyze search behavior.
Returns:
list: Top search queries with counts and zero-result rates
"""
    # Note: iterates rows in Python for clarity; at scale, query the
    # audit_search_analytics materialized view (section 5.2) instead
    search_logs = AuditLog.objects.filter(action='search')
# Extract search queries from metadata
queries = {}
for log in search_logs:
query = log.metadata.get('search_query', '').strip().lower()
if not query:
continue
if query not in queries:
queries[query] = {
'query': query,
'count': 0,
'zero_results': 0
}
queries[query]['count'] += 1
if log.metadata.get('search_results_count', 0) == 0:
queries[query]['zero_results'] += 1
# Calculate zero-result rate
for query_data in queries.values():
query_data['zero_result_rate'] = (
query_data['zero_results'] / query_data['count'] * 100
)
# Sort by count and return top N
return sorted(
queries.values(),
key=lambda x: x['count'],
reverse=True
)[:limit]
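The per-query counting rule above can be checked against hand-built data. This standalone sketch applies the same normalization (strip + lowercase) and zero-result accounting to plain metadata dicts, with no ORM involved; `aggregate_searches` is an illustrative name:

```python
def aggregate_searches(rows, limit=50):
    """rows: iterable of metadata dicts with search_query / search_results_count."""
    queries = {}
    for meta in rows:
        q = meta.get("search_query", "").strip().lower()
        if not q:
            continue  # skip empty queries, as get_search_analytics does
        entry = queries.setdefault(q, {"query": q, "count": 0, "zero_results": 0})
        entry["count"] += 1
        if meta.get("search_results_count", 0) == 0:
            entry["zero_results"] += 1
    # Zero-result rate as a percentage of that query's searches
    for entry in queries.values():
        entry["zero_result_rate"] = entry["zero_results"] / entry["count"] * 100
    return sorted(queries.values(), key=lambda e: e["count"], reverse=True)[:limit]

# "CAPA" and "capa " normalize to the same query; one of the two had no results
top = aggregate_searches([
    {"search_query": "CAPA", "search_results_count": 3},
    {"search_query": "capa ", "search_results_count": 0},
    {"search_query": "design history file", "search_results_count": 0},
])
```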
5.1.4 Access Pattern Detection
Unusual Access Patterns:
from datetime import timedelta
from django.db.models import Count
from django.utils import timezone
def detect_anomalies(threshold_views_per_hour=100):
"""
Detect unusual access patterns that may indicate:
- Bulk downloading
- Rapid traversal (bot behavior)
- Unauthorized access attempts
Returns:
list: Anomalous sessions with details
"""
# Find sessions with >threshold_views_per_hour views
one_hour_ago = timezone.now() - timedelta(hours=1)
high_activity_sessions = AuditLog.objects.filter(
timestamp__gte=one_hour_ago
).values('session_id', 'user_email').annotate(
view_count=Count('id'),
unique_documents=Count('document_id', distinct=True),
ip_count=Count('ip_address', distinct=True)
).filter(
view_count__gte=threshold_views_per_hour
)
anomalies = []
for session in high_activity_sessions:
# Check for rapid traversal (many docs, short time)
if session['unique_documents'] > 50:
anomalies.append({
'type': 'rapid_traversal',
'session_id': session['session_id'],
'user_email': session['user_email'],
'view_count': session['view_count'],
'unique_documents': session['unique_documents'],
'severity': 'high'
})
# Check for IP hopping (same session, multiple IPs)
if session['ip_count'] > 3:
anomalies.append({
'type': 'ip_hopping',
'session_id': session['session_id'],
'user_email': session['user_email'],
'ip_count': session['ip_count'],
'severity': 'critical'
})
return anomalies
5.2 Materialized Views for Performance
SQL Migration: backend/audit/migrations/0003_materialized_views.py
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('audit', '0002_partition_by_month'),
]
operations = [
# Daily document stats materialized view
migrations.RunSQL(
sql="""
CREATE MATERIALIZED VIEW audit_daily_document_stats AS
SELECT
DATE(timestamp) as date,
document_id,
document_title,
document_category,
COUNT(*) as total_views,
COUNT(DISTINCT user_email) as unique_users,
COUNT(DISTINCT ip_address) as unique_ips,
AVG(duration_ms) as avg_duration_ms
FROM audit_log
WHERE action = 'view'
GROUP BY DATE(timestamp), document_id, document_title, document_category;
CREATE UNIQUE INDEX ON audit_daily_document_stats (date, document_id);
""",
reverse_sql="DROP MATERIALIZED VIEW IF EXISTS audit_daily_document_stats CASCADE;"
),
# Hourly user activity materialized view
migrations.RunSQL(
sql="""
CREATE MATERIALIZED VIEW audit_hourly_user_activity AS
SELECT
DATE_TRUNC('hour', timestamp) as hour,
user_email,
action,
COUNT(*) as action_count
FROM audit_log
GROUP BY DATE_TRUNC('hour', timestamp), user_email, action;
CREATE UNIQUE INDEX ON audit_hourly_user_activity (hour, user_email, action);
""",
reverse_sql="DROP MATERIALIZED VIEW IF EXISTS audit_hourly_user_activity CASCADE;"
),
# Search analytics materialized view
migrations.RunSQL(
sql="""
CREATE MATERIALIZED VIEW audit_search_analytics AS
SELECT
metadata->>'search_query' as search_query,
COUNT(*) as search_count,
SUM(CASE WHEN (metadata->>'search_results_count')::int = 0 THEN 1 ELSE 0 END) as zero_result_count,
AVG((metadata->>'search_results_count')::int) as avg_results
FROM audit_log
WHERE action = 'search' AND metadata->>'search_query' IS NOT NULL
            GROUP BY metadata->>'search_query';
            -- A unique index is required for REFRESH ... CONCURRENTLY
            CREATE UNIQUE INDEX ON audit_search_analytics (search_query);
            CREATE INDEX ON audit_search_analytics (search_count DESC);
""",
reverse_sql="DROP MATERIALIZED VIEW IF EXISTS audit_search_analytics CASCADE;"
),
]
Refresh Task (Celery Beat):
# backend/audit/tasks.py
from celery import shared_task
from django.db import connection
@shared_task
def refresh_materialized_views():
"""
Refresh all audit analytics materialized views.
Runs every hour via Celery Beat.
"""
with connection.cursor() as cursor:
cursor.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY audit_daily_document_stats;")
cursor.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY audit_hourly_user_activity;")
cursor.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY audit_search_analytics;")
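The hourly cadence mentioned in the docstring is configured through Celery Beat. A minimal settings sketch, using the plain interval form (seconds) rather than a crontab entry; the task path assumes the `audit.tasks` module layout shown above:

```python
# settings.py (sketch): run the materialized-view refresh hourly
CELERY_BEAT_SCHEDULE = {
    "refresh-audit-materialized-views": {
        "task": "audit.tasks.refresh_materialized_views",
        "schedule": 3600.0,  # interval in seconds (hourly)
    },
}
```

A `crontab(minute=0)` schedule would pin the refresh to the top of the hour instead; either form works with `REFRESH ... CONCURRENTLY`, which lets readers keep querying during the refresh.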
6. Analytics API Endpoints
6.1 Document Analytics Endpoint
File: backend/audit/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from django.core.cache import cache
from django.db import connection
from datetime import datetime, timedelta
from .models import AuditLog
class DocumentAnalyticsView(APIView):
"""
GET /api/v1/analytics/documents
Returns aggregated document access metrics.
Query Parameters:
- document_id: str (optional, filter by document)
- start_date: ISO date (optional, default: 30 days ago)
- end_date: ISO date (optional, default: now)
- category: str (optional, filter by category)
- limit: int (optional, default: 50, max: 500)
- order_by: str (optional, 'views'|'users'|'engagement', default: 'views')
Response:
{
"documents": [
{
"document_id": "docs-architecture-system-design",
"title": "System Design Document",
"category": "Architecture",
"total_views": 1234,
"unique_users": 45,
"unique_ips": 38,
"avg_duration_seconds": 180,
"last_viewed": "2026-02-16"
},
...
],
"total_count": 133,
"date_range": {
"start": "2026-01-17",
"end": "2026-02-16"
}
}
"""
permission_classes = [IsAuthenticated]
def get(self, request):
# Parse query parameters
document_id = request.GET.get('document_id')
start_date = request.GET.get('start_date')
end_date = request.GET.get('end_date')
category = request.GET.get('category')
limit = int(request.GET.get('limit', 50))
order_by = request.GET.get('order_by', 'views')
# Default date range: last 30 days
if not end_date:
end_date = datetime.now()
else:
end_date = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
if not start_date:
start_date = end_date - timedelta(days=30)
else:
start_date = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
# Limit validation
if limit > 500:
limit = 500
# Build cache key
cache_key = f"analytics:documents:{document_id}:{start_date.date()}:{end_date.date()}:{category}:{limit}:{order_by}"
# Try cache first (60 second TTL)
cached_result = cache.get(cache_key)
if cached_result:
return Response(cached_result)
# Query materialized view for performance
with connection.cursor() as cursor:
sql = """
SELECT
document_id,
document_title,
document_category,
SUM(total_views) as total_views,
SUM(unique_users) as unique_users,
SUM(unique_ips) as unique_ips,
AVG(avg_duration_ms) as avg_duration_ms,
MAX(date) as last_viewed
FROM audit_daily_document_stats
WHERE date BETWEEN %s AND %s
"""
params = [start_date.date(), end_date.date()]
if document_id:
sql += " AND document_id = %s"
params.append(document_id)
if category:
sql += " AND document_category = %s"
params.append(category)
sql += " GROUP BY document_id, document_title, document_category"
# Order by clause
if order_by == 'users':
sql += " ORDER BY unique_users DESC"
elif order_by == 'engagement':
sql += " ORDER BY avg_duration_ms DESC"
else: # default: views
sql += " ORDER BY total_views DESC"
sql += " LIMIT %s"
params.append(limit)
cursor.execute(sql, params)
columns = [col[0] for col in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
# Format response
documents = []
for row in results:
documents.append({
'document_id': row['document_id'],
'title': row['document_title'],
'category': row['document_category'],
'total_views': int(row['total_views']),
'unique_users': int(row['unique_users']),
'unique_ips': int(row['unique_ips']),
'avg_duration_seconds': round(row['avg_duration_ms'] / 1000, 1) if row['avg_duration_ms'] else 0,
'last_viewed': row['last_viewed'].isoformat() if row['last_viewed'] else None
})
response_data = {
'documents': documents,
'total_count': len(documents),
'date_range': {
'start': start_date.date().isoformat(),
'end': end_date.date().isoformat()
}
}
# Cache result
cache.set(cache_key, response_data, 60)
return Response(response_data)
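The date-range and limit parsing above is repeated verbatim in each analytics view. A small helper could centralize it and also guard against non-numeric `limit` values, which the inline `int(...)` call would otherwise raise on. This is a hypothetical refactoring sketch, not part of the API above:

```python
from datetime import datetime, timedelta

def parse_analytics_params(start_date, end_date, limit,
                           default_days=30, max_limit=500):
    """Parse optional ISO date strings and clamp the result limit.

    Mirrors the per-view logic: a missing end_date defaults to now,
    a missing start_date defaults to `default_days` before end_date.
    """
    end = (datetime.fromisoformat(end_date.replace('Z', '+00:00'))
           if end_date else datetime.now())
    start = (datetime.fromisoformat(start_date.replace('Z', '+00:00'))
             if start_date else end - timedelta(days=default_days))
    try:
        limit = int(limit)
    except (TypeError, ValueError):
        limit = 50  # fall back to the documented default
    return start, end, max(1, min(limit, max_limit))
```

Each view's `get()` would then reduce its parameter handling to a single call, keeping the default window and the 500-row cap in one place.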
6.2 User Activity Analytics Endpoint
class UserAnalyticsView(APIView):
"""
GET /api/v1/analytics/users
Returns per-user access summary.
Query Parameters:
- user_email: str (optional, filter by user)
- start_date: ISO date (optional)
- end_date: ISO date (optional)
- limit: int (optional, default: 100)
Response:
{
"users": [
{
"user_email": "researcher@bioqms.local",
"total_actions": 456,
"documents_viewed": 23,
"searches_performed": 12,
"downloads": 5,
"avg_session_duration_minutes": 15.3,
"last_active": "2026-02-16T14:22:00Z"
},
...
]
}
"""
permission_classes = [IsAuthenticated]
def get(self, request):
user_email = request.GET.get('user_email')
start_date = request.GET.get('start_date')
end_date = request.GET.get('end_date')
limit = int(request.GET.get('limit', 100))
# Default: last 30 days
if not end_date:
end_date = datetime.now()
else:
end_date = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
if not start_date:
start_date = end_date - timedelta(days=30)
else:
start_date = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
# Query
queryset = AuditLog.objects.filter(
timestamp__range=[start_date, end_date]
)
if user_email:
queryset = queryset.filter(user_email=user_email)
# Aggregate by user
user_stats = queryset.values('user_email').annotate(
total_actions=Count('id'),
documents_viewed=Count('id', filter=Q(action='view')),
searches_performed=Count('id', filter=Q(action='search')),
downloads=Count('id', filter=Q(action='download')),
unique_sessions=Count('session_id', distinct=True),
total_duration_ms=Sum('duration_ms', filter=Q(action='view')),
last_active=Max('timestamp')
).order_by('-total_actions')[:limit]
users = []
for stat in user_stats:
avg_session_duration = 0
if stat['unique_sessions'] > 0 and stat['total_duration_ms']:
avg_session_duration = stat['total_duration_ms'] / stat['unique_sessions'] / 60000 # to minutes
users.append({
'user_email': stat['user_email'],
'total_actions': stat['total_actions'],
'documents_viewed': stat['documents_viewed'],
'searches_performed': stat['searches_performed'],
'downloads': stat['downloads'],
'avg_session_duration_minutes': round(avg_session_duration, 1),
'last_active': stat['last_active'].isoformat()
})
return Response({
'users': users,
'total_count': len(users),
'date_range': {
'start': start_date.date().isoformat(),
'end': end_date.date().isoformat()
}
})
6.3 Access Patterns Analysis Endpoint
class AccessPatternsView(APIView):
"""
GET /api/v1/analytics/patterns
Returns access pattern analysis including:
- Peak usage hours
- Geographic distribution
- Device/browser breakdown
- Anomaly detection
Response:
{
"peak_hours": {
"hour": 14, // 2 PM UTC
"view_count": 234
},
"geographic_distribution": [
{"country": "United States", "count": 1234},
{"country": "United Kingdom", "count": 456},
...
],
"device_breakdown": {
"desktop": 78,
"mobile": 15,
"tablet": 7
},
"anomalies": [
{
"type": "rapid_traversal",
"session_id": "uuid",
"user_email": "user@example.com",
"severity": "high",
"detected_at": "2026-02-16T10:15:00Z"
}
]
}
"""
permission_classes = [IsAuthenticated]
def get(self, request):
start_date = request.GET.get('start_date')
end_date = request.GET.get('end_date')
if not end_date:
end_date = datetime.now()
else:
end_date = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
if not start_date:
start_date = end_date - timedelta(days=7)
else:
start_date = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
queryset = AuditLog.objects.filter(timestamp__range=[start_date, end_date])
# Peak hours analysis
from django.db.models.functions import ExtractHour
peak_hours = queryset.annotate(
hour=ExtractHour('timestamp')
).values('hour').annotate(
view_count=Count('id')
).order_by('-view_count').first()
# Geographic distribution
geo_dist = {}
for log in queryset.exclude(geolocation__isnull=True):
country = log.geolocation.get('country', 'Unknown')
geo_dist[country] = geo_dist.get(country, 0) + 1
geographic_distribution = [
{'country': k, 'count': v}
for k, v in sorted(geo_dist.items(), key=lambda x: x[1], reverse=True)
][:10]
# Device breakdown (from user agent)
device_counts = {'desktop': 0, 'mobile': 0, 'tablet': 0, 'unknown': 0}
for log in queryset:
ua = (log.user_agent or '').lower()
if 'mobile' in ua or 'android' in ua:
device_counts['mobile'] += 1
elif 'tablet' in ua or 'ipad' in ua:
device_counts['tablet'] += 1
elif 'mozilla' in ua or 'chrome' in ua:
device_counts['desktop'] += 1
else:
device_counts['unknown'] += 1
# Detect anomalies
from .analytics import detect_anomalies
anomalies = detect_anomalies()
return Response({
'peak_hours': peak_hours,
'geographic_distribution': geographic_distribution,
'device_breakdown': device_counts,
'anomalies': anomalies[:20] # Limit to top 20
})
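`detect_anomalies` is defined in the analytics module covered elsewhere in this document. As an illustration only, the rapid-traversal pattern it reports (many views in a short window within one session) can be sketched over raw `(session_id, timestamp)` events with a sliding window; the thresholds below are assumptions for the sketch, not the production configuration:

```python
from collections import defaultdict
from datetime import datetime, timedelta

def find_rapid_traversal(events, max_views=20, window_seconds=60):
    """Flag sessions whose view count within a sliding window exceeds max_views.

    events: iterable of (session_id, timestamp) pairs for 'view' actions.
    Returns anomaly dicts in the shape used by the patterns endpoint.
    """
    by_session = defaultdict(list)
    for session_id, ts in events:
        by_session[session_id].append(ts)

    anomalies = []
    window = timedelta(seconds=window_seconds)
    for session_id, stamps in by_session.items():
        stamps.sort()
        lo = 0  # left edge of the sliding window
        for hi, ts in enumerate(stamps):
            while ts - stamps[lo] > window:
                lo += 1
            if hi - lo + 1 > max_views:
                anomalies.append({
                    'type': 'rapid_traversal',
                    'session_id': session_id,
                    'severity': 'high',
                    'detected_at': ts.isoformat(),
                })
                break  # report each session at most once
    return anomalies
```

The production implementation would additionally resolve `user_email` for each flagged session and honor the endpoint's date-range filter.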
6.4 Search Analytics Endpoint
class SearchAnalyticsView(APIView):
"""
GET /api/v1/analytics/search
Returns search behavior analytics.
Response:
{
"popular_queries": [
{
"query": "hipaa compliance",
"count": 123,
"zero_result_rate": 5.2
},
...
],
"zero_result_queries": [
{
"query": "xyz protocol",
"count": 45
}
],
"avg_results_per_query": 8.3
}
"""
permission_classes = [IsAuthenticated]
def get(self, request):
limit = int(request.GET.get('limit', 50))
# Query materialized view
with connection.cursor() as cursor:
# Popular queries
cursor.execute("""
SELECT search_query, search_count, zero_result_count, avg_results
FROM audit_search_analytics
ORDER BY search_count DESC
LIMIT %s
""", [limit])
columns = [col[0] for col in cursor.description]
popular_queries = []
for row in cursor.fetchall():
data = dict(zip(columns, row))
zero_result_rate = (data['zero_result_count'] / data['search_count'] * 100) if data['search_count'] > 0 else 0
popular_queries.append({
'query': data['search_query'],
'count': data['search_count'],
'zero_result_rate': round(zero_result_rate, 1),
'avg_results': round(data['avg_results'], 1) if data['avg_results'] else 0
})
# Zero-result queries
cursor.execute("""
SELECT search_query, zero_result_count
FROM audit_search_analytics
WHERE zero_result_count > 0
ORDER BY zero_result_count DESC
LIMIT %s
""", [limit])
zero_result_queries = [
{'query': row[0], 'count': row[1]}
for row in cursor.fetchall()
]
# Overall stats
cursor.execute("""
SELECT AVG(avg_results) as overall_avg
FROM audit_search_analytics
""")
overall_avg = cursor.fetchone()[0] or 0
return Response({
'popular_queries': popular_queries,
'zero_result_queries': zero_result_queries,
'avg_results_per_query': round(overall_avg, 1)
})
7. Compliance Reporting
7.1 CSV Export
File: backend/audit/reports.py
import csv
from io import StringIO
from django.http import HttpResponse
from datetime import datetime, timedelta
from .models import AuditLog
def generate_csv_report(start_date, end_date, filters=None):
"""
Generate CSV audit report for compliance.
Args:
start_date: datetime
end_date: datetime
filters: dict with optional filters (user_email, document_id, action)
Returns:
HttpResponse with CSV content
"""
queryset = AuditLog.objects.filter(
timestamp__range=[start_date, end_date]
).order_by('timestamp')
if filters:
if filters.get('user_email'):
queryset = queryset.filter(user_email=filters['user_email'])
if filters.get('document_id'):
queryset = queryset.filter(document_id=filters['document_id'])
if filters.get('action'):
queryset = queryset.filter(action=filters['action'])
# Create CSV in memory
output = StringIO()
writer = csv.writer(output)
# Write header
writer.writerow([
'ID',
'Timestamp (UTC)',
'User Email',
'User ID',
'Action',
'Document ID',
'Document Title',
'Category',
'IP Address',
'Geolocation',
'Session ID',
'Token ID',
'Duration (seconds)',
'Metadata'
])
# Write data rows
for log in queryset:
writer.writerow([
log.id,
log.timestamp.isoformat(),
log.user_email,
log.user_id or '',
log.get_action_display(),
log.document_id,
log.document_title,
log.document_category or '',
log.ip_address,
f"{log.geolocation.get('country', '')} - {log.geolocation.get('city', '')}" if log.geolocation else '',
log.session_id,
log.token_id or '',
round(log.duration_ms / 1000, 1) if log.duration_ms else '',
str(log.metadata) if log.metadata else ''
])
# Create HTTP response
response = HttpResponse(output.getvalue(), content_type='text/csv')
response['Content-Disposition'] = f'attachment; filename="audit_report_{start_date.date()}_{end_date.date()}.csv"'
return response
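For long date ranges, building the whole file in a `StringIO` holds every row in memory at once. A generator that yields one encoded row at a time keeps memory flat and pairs naturally with Django's `StreamingHttpResponse`. The sketch below uses plain `csv` so it stands alone; the `rows` argument is a stand-in for values drawn from `queryset.iterator()`:

```python
import csv
import io

def stream_csv_rows(header, rows):
    """Yield CSV-encoded lines one at a time instead of buffering the file.

    header: list of column names
    rows: iterable of row lists (e.g. built from queryset.iterator())
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)

    def encode(row):
        # Write one row, hand back the encoded text, then reset the buffer.
        writer.writerow(row)
        data = buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)
        return data

    yield encode(header)
    for row in rows:
        yield encode(row)
```

In the Django view this generator would be passed to `StreamingHttpResponse(..., content_type='text/csv')`, with the queryset iterated via `.iterator()` so rows are never cached client-side of the database cursor.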
7.2 JSON Export
import json
from django.http import JsonResponse
def generate_json_report(start_date, end_date, filters=None):
"""
Generate structured JSON audit report.
Returns:
JsonResponse with full audit trail data
"""
queryset = AuditLog.objects.filter(
timestamp__range=[start_date, end_date]
).order_by('timestamp')
if filters:
if filters.get('user_email'):
queryset = queryset.filter(user_email=filters['user_email'])
if filters.get('document_id'):
queryset = queryset.filter(document_id=filters['document_id'])
if filters.get('action'):
queryset = queryset.filter(action=filters['action'])
# Build structured JSON
report = {
'report_metadata': {
'generated_at': datetime.now().isoformat(),
'date_range': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
},
'filters': filters or {},
'total_records': queryset.count()
},
'audit_logs': []
}
for log in queryset:
report['audit_logs'].append({
'id': log.id,
'timestamp': log.timestamp.isoformat(),
'user': {
'email': log.user_email,
'user_id': log.user_id
},
'action': {
'type': log.action,
'display_name': log.get_action_display()
},
'document': {
'id': log.document_id,
'title': log.document_title,
'category': log.document_category
},
'network': {
'ip_address': log.ip_address,
'geolocation': log.geolocation,
'user_agent': log.user_agent
},
'session': {
'session_id': str(log.session_id),
'token_id': str(log.token_id) if log.token_id else None
},
'performance': {
'duration_ms': log.duration_ms,
'response_time_ms': log.response_time_ms
},
'metadata': log.metadata,
'integrity': {
'record_hash': log.record_hash,
'previous_hash': log.previous_hash
}
})
return JsonResponse(report, json_dumps_params={'indent': 2})
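The `integrity` block on each exported record carries the `record_hash`/`previous_hash` pair from the tamper-proof storage design. An auditor can verify the chain linkage of a JSON export without recomputing any hashes (the hash algorithm itself is specified with the AuditLog model). A minimal linkage check over the exported `audit_logs` array:

```python
def verify_chain_linkage(audit_logs):
    """Check that each record's previous_hash matches the prior record's record_hash.

    audit_logs: exported records (dicts with an 'integrity' key) in
    chronological order. Returns (ok, index_of_first_break_or_None).
    """
    previous = None
    for i, record in enumerate(audit_logs):
        integrity = record['integrity']
        if previous is not None and integrity['previous_hash'] != previous:
            return False, i  # chain broken at record i
        previous = integrity['record_hash']
    return True, None
```

A full verification would also recompute each `record_hash` from the record's fields; this sketch only confirms that no record has been removed or reordered between export and review.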
7.3 PDF Report Generation
from reportlab.lib.pagesizes import letter, A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak
from reportlab.lib import colors
from django.http import HttpResponse
from io import BytesIO
def generate_pdf_report(start_date, end_date, filters=None):
"""
Generate formatted PDF compliance audit report.
Returns:
HttpResponse with PDF content
"""
buffer = BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=letter)
story = []
styles = getSampleStyleSheet()
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=24,
textColor=colors.HexColor('#1a1a1a'),
spaceAfter=30
)
# Title page
story.append(Paragraph("BIO-QMS Platform", title_style))
story.append(Paragraph("Document Access Audit Report", styles['Heading2']))
story.append(Spacer(1, 0.3 * inch))
# Report metadata
metadata_data = [
['Report Generated:', datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')],
['Date Range:', f"{start_date.date()} to {end_date.date()}"],
['Regulatory Framework:', '21 CFR Part 11, HIPAA §164.312(b)'],
]
if filters:
if filters.get('user_email'):
metadata_data.append(['Filtered by User:', filters['user_email']])
if filters.get('document_id'):
metadata_data.append(['Filtered by Document:', filters['document_id']])
metadata_table = Table(metadata_data, colWidths=[2 * inch, 4 * inch])
metadata_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
story.append(metadata_table)
story.append(Spacer(1, 0.5 * inch))
# Summary statistics
queryset = AuditLog.objects.filter(timestamp__range=[start_date, end_date])
if filters:
if filters.get('user_email'):
queryset = queryset.filter(user_email=filters['user_email'])
if filters.get('document_id'):
queryset = queryset.filter(document_id=filters['document_id'])
total_records = queryset.count()
unique_users = queryset.values('user_email').distinct().count()
unique_documents = queryset.values('document_id').distinct().count()
story.append(Paragraph("Summary Statistics", styles['Heading3']))
summary_data = [
['Total Audit Records', str(total_records)],
['Unique Users', str(unique_users)],
['Unique Documents Accessed', str(unique_documents)],
['Document Views', str(queryset.filter(action='view').count())],
['Downloads', str(queryset.filter(action='download').count())],
['Authentication Failures', str(queryset.filter(action='auth_failure').count())],
]
summary_table = Table(summary_data, colWidths=[3 * inch, 2 * inch])
summary_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (0, -1), colors.lightgrey),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
]))
story.append(summary_table)
story.append(PageBreak())
# Detailed audit log table
story.append(Paragraph("Detailed Audit Log", styles['Heading3']))
audit_data = [['Timestamp', 'User', 'Action', 'Document', 'IP Address']]
for log in queryset.order_by('timestamp')[:100]: # Limit to first 100 for PDF
audit_data.append([
log.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
log.user_email[:30], # Truncate long emails
log.get_action_display(),
log.document_title[:40], # Truncate long titles
log.ip_address
])
audit_table = Table(audit_data, colWidths=[1.2 * inch, 1.5 * inch, 1 * inch, 2 * inch, 1 * inch])
audit_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 10),
('FONTSIZE', (0, 1), (-1, -1), 8),
('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
('VALIGN', (0, 0), (-1, -1), 'TOP'),
]))
story.append(audit_table)
if total_records > 100:
story.append(Spacer(1, 0.2 * inch))
story.append(Paragraph(f"<i>Note: Showing first 100 of {total_records} total records. Download CSV for complete data.</i>", styles['Normal']))
# Build PDF
doc.build(story)
buffer.seek(0)
response = HttpResponse(buffer.getvalue(), content_type='application/pdf')
response['Content-Disposition'] = f'attachment; filename="audit_report_{start_date.date()}_{end_date.date()}.pdf"'
return response
7.4 Scheduled Reports (Celery Beat)
File: backend/audit/tasks.py
from celery import shared_task
from celery.schedules import crontab
from django.core.mail import EmailMessage
from datetime import datetime, timedelta
from .models import AuditLog
from .reports import generate_csv_report, generate_pdf_report
@shared_task
def send_daily_audit_report():
"""
Generate and email daily audit report.
Runs at 8:00 AM UTC every day via Celery Beat.
"""
yesterday = datetime.now() - timedelta(days=1)
start_date = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
end_date = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999)
# Generate CSV report
csv_response = generate_csv_report(start_date, end_date)
csv_content = csv_response.content
# Generate PDF report
pdf_response = generate_pdf_report(start_date, end_date)
pdf_content = pdf_response.content
# Email to compliance team
email = EmailMessage(
subject=f"BIO-QMS Daily Audit Report - {yesterday.date()}",
body=f"Attached is the daily document access audit report for {yesterday.date()}.\n\nTotal events logged: {AuditLog.objects.filter(timestamp__range=[start_date, end_date]).count()}",
from_email='noreply@bioqms.local',
to=['compliance@bioqms.local', 'security@bioqms.local']
)
email.attach(f'audit_report_{yesterday.date()}.csv', csv_content, 'text/csv')
email.attach(f'audit_report_{yesterday.date()}.pdf', pdf_content, 'application/pdf')
email.send()
@shared_task
def send_weekly_audit_report():
"""
Generate and email weekly audit report.
Runs every Monday at 9:00 AM UTC.
"""
today = datetime.now()
start_date = today - timedelta(days=7)
end_date = today
csv_response = generate_csv_report(start_date, end_date)
pdf_response = generate_pdf_report(start_date, end_date)
email = EmailMessage(
subject=f"BIO-QMS Weekly Audit Report - Week of {start_date.date()}",
body=f"Attached is the weekly document access audit report covering {start_date.date()} to {end_date.date()}.",
from_email='noreply@bioqms.local',
to=['compliance@bioqms.local', 'management@bioqms.local']
)
email.attach(f'weekly_audit_report_{start_date.date()}.csv', csv_response.content, 'text/csv')
email.attach(f'weekly_audit_report_{start_date.date()}.pdf', pdf_response.content, 'application/pdf')
email.send()
# Celery Beat schedule configuration
# backend/settings.py
CELERY_BEAT_SCHEDULE = {
'daily-audit-report': {
'task': 'audit.tasks.send_daily_audit_report',
'schedule': crontab(hour=8, minute=0), # 8:00 AM UTC
},
'weekly-audit-report': {
'task': 'audit.tasks.send_weekly_audit_report',
'schedule': crontab(day_of_week=1, hour=9, minute=0), # Monday 9:00 AM
},
'refresh-materialized-views': {
'task': 'audit.tasks.refresh_materialized_views',
'schedule': crontab(minute=0), # Every hour
},
}