# Legibility Controls for Long-Term Storage

## Executive Summary
This document defines the comprehensive legibility controls for the CODITECT BIO-QMS platform, ensuring that regulated electronic records remain readable, accessible, and verifiable throughout their retention period (25+ years). The controls address format validation, rendering verification, format migration, accessibility, print management, storage architecture, and continuous monitoring in compliance with FDA 21 CFR Part 11, EU Annex 11, and WHO TRS 996 Annex 5.
Key Requirements:
- Legibility: Records must remain readable for the duration of their retention period
- Accuracy: Records must maintain their original content and meaning
- Availability: Records must be retrievable within defined timeframes
- Integrity: Records must be protected from corruption, degradation, and unauthorized modification
Architecture Overview:
- Primary Archive Format: PDF/A-2b (ISO 19005-2)
- Storage Tiers: Hot (SSD) → Warm (HDD) → Cold (GCS Archive)
- Validation: Automated format compliance + periodic rendering verification
- Migration: Technology obsolescence monitoring + controlled migration pipeline
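The hot/warm/cold routing implied above (with the age cutoffs that also drive the test frequencies in section 2.1) can be sketched as a pure age-based rule. This is an illustrative sketch only; the function name and record representation are not part of the platform design, while the 90-day and 2-year cutoffs come from this document:

```python
from datetime import date, timedelta

def storage_tier(archived_on: date, today: date) -> str:
    """Route a record to a storage tier based on its age (illustrative)."""
    age = today - archived_on
    if age < timedelta(days=90):
        return "hot"    # SSD: recently finalized records
    if age < timedelta(days=730):
        return "warm"   # HDD: 90 days to 2 years
    return "cold"       # GCS Archive: 2+ years

print(storage_tier(date(2026, 1, 1), date(2026, 2, 1)))  # hot
print(storage_tier(date(2024, 1, 1), date(2026, 2, 1)))  # cold
```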
## 1. Format Validation

### 1.1 Supported Archival Formats
All regulated records must be archived in one of the following validated formats:
| Format | Standard | Use Case | Retention Profile |
|---|---|---|---|
| PDF/A-2b | ISO 19005-2:2011 | Primary archive format for all document types | Hot/Warm/Cold |
| PDF/A-3b | ISO 19005-3:2012 | Documents with embedded source files (e.g., data files) | Hot/Warm/Cold |
| TIFF 6.0 | TIFF Specification 6.0 | Legacy scanned documents, migration source | Warm/Cold |
Rationale:
- PDF/A is the industry-standard long-term archival format
- PDF/A-2b provides ISO standardization with wide renderer support
- PDF/A-3b allows embedding of source data files while maintaining archival compliance
- TIFF 6.0 is accepted for legacy scanned documents but will be migrated to PDF/A
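At archival time the table above reduces to a small routing decision. The following is a minimal sketch under stated assumptions: the function name and boolean inputs are hypothetical, and the mapping simply restates the table (TIFF is a migration source only, so new scans still target PDF/A-2b):

```python
def select_archive_format(has_embedded_sources: bool, is_legacy_scan: bool) -> str:
    """Pick the target archival format per the supported-format table (sketch)."""
    if is_legacy_scan:
        # TIFF 6.0 is accepted only as a migration source; new archives of
        # scanned documents still go to PDF/A-2b.
        return "PDF/A-2b"
    if has_embedded_sources:
        return "PDF/A-3b"  # permits embedding source data files
    return "PDF/A-2b"      # default archive format

print(select_archive_format(True, False))  # PDF/A-3b
```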
### 1.2 Automatic Format Integrity Check
Trigger: Every record archival event (document finalization, batch upload, migration)
Validation Pipeline:
Implementation:
```python
# File: backend/qms/archival/format_validator.py
import hashlib
import json
import subprocess
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional


@dataclass
class ValidationResult:
    """Format validation result."""
    is_valid: bool
    format_type: str
    pdf_a_compliance: Optional[str]  # e.g., "PDF/A-2b"
    errors: list[str]
    warnings: list[str]
    metadata_complete: bool
    fonts_embedded: bool
    color_profiles_embedded: bool
    checksum_sha256: str
    validated_at: datetime
    validator_version: str


class ArchivalFormatValidator:
    """
    Validates archival format compliance for long-term storage.

    Compliance: FDA 21 CFR Part 11 §11.10(b) - legibility
    """

    VERAPDF_PATH = "/opt/verapdf/verapdf"
    LIBTIFF_TIFFINFO_PATH = "/usr/bin/tiffinfo"

    REQUIRED_METADATA_FIELDS = [
        "dc:title",
        "dc:creator",
        "dc:subject",
        "dc:description",
        "xmp:CreateDate",
        "xmp:ModifyDate",
        "pdf:Producer",
    ]

    def validate_pdf_a(self, file_path: Path) -> ValidationResult:
        """
        Validate PDF/A compliance using veraPDF.

        Args:
            file_path: Path to PDF file

        Returns:
            ValidationResult with compliance status
        """
        errors = []
        warnings = []

        # Run veraPDF validation
        try:
            result = subprocess.run(
                [
                    self.VERAPDF_PATH,
                    "--format", "xml",
                    "--flavour", "2b",  # PDF/A-2b
                    str(file_path),
                ],
                capture_output=True,
                text=True,
                timeout=60,
            )

            # Parse XML output
            root = ET.fromstring(result.stdout)

            # Check compliance
            compliant_elem = root.find(".//compliant")
            is_compliant = (
                compliant_elem.text.lower() == "true"
                if compliant_elem is not None
                else False
            )

            # Extract errors
            for error in root.findall(".//error"):
                errors.append(error.find("message").text)

            # Extract warnings
            for warning in root.findall(".//warning"):
                warnings.append(warning.find("message").text)

            # Determine PDF/A flavour
            flavour = root.find(".//flavour")
            pdf_a_compliance = flavour.text if flavour is not None else "Unknown"

        except subprocess.TimeoutExpired:
            errors.append("veraPDF validation timeout (>60s)")
            is_compliant = False
            pdf_a_compliance = None
        except Exception as e:
            errors.append(f"veraPDF validation failed: {str(e)}")
            is_compliant = False
            pdf_a_compliance = None

        # Metadata verification
        metadata_complete = self._verify_metadata(file_path)
        if not metadata_complete:
            warnings.append("Incomplete XMP metadata")

        # Font embedding verification
        fonts_embedded = self._verify_fonts_embedded(file_path)
        if not fonts_embedded:
            errors.append("Not all fonts are embedded")

        # Color profile verification
        color_profiles_embedded = self._verify_color_profiles(file_path)
        if not color_profiles_embedded:
            errors.append("ICC color profiles not embedded")

        # Generate checksum
        checksum = self._generate_checksum(file_path)

        return ValidationResult(
            is_valid=is_compliant and fonts_embedded and color_profiles_embedded,
            format_type="PDF/A",
            pdf_a_compliance=pdf_a_compliance,
            errors=errors,
            warnings=warnings,
            metadata_complete=metadata_complete,
            fonts_embedded=fonts_embedded,
            color_profiles_embedded=color_profiles_embedded,
            checksum_sha256=checksum,
            validated_at=datetime.utcnow(),
            validator_version=self._get_verapdf_version(),
        )

    def validate_tiff(self, file_path: Path) -> ValidationResult:
        """
        Validate TIFF 6.0 compliance using LibTIFF.

        Args:
            file_path: Path to TIFF file

        Returns:
            ValidationResult with compliance status
        """
        errors = []
        warnings = []

        try:
            result = subprocess.run(
                [self.LIBTIFF_TIFFINFO_PATH, str(file_path)],
                capture_output=True,
                text=True,
                timeout=30,
            )

            # Parse tiffinfo output
            output = result.stdout

            # Check for required TIFF tags
            if "Image Width" not in output:
                errors.append("Missing Image Width tag")
            if "Image Length" not in output:
                errors.append("Missing Image Length tag")
            if "Bits/Sample" not in output:
                errors.append("Missing Bits/Sample tag")

            # Check compression
            if "Compression Scheme" in output:
                if "LZW" in output or "ZIP" in output or "None" in output:
                    pass  # Acceptable compression
                else:
                    warnings.append("Non-standard compression detected")

            is_valid = len(errors) == 0

        except subprocess.TimeoutExpired:
            errors.append("TIFF validation timeout (>30s)")
            is_valid = False
        except Exception as e:
            errors.append(f"TIFF validation failed: {str(e)}")
            is_valid = False

        # Generate checksum
        checksum = self._generate_checksum(file_path)

        return ValidationResult(
            is_valid=is_valid,
            format_type="TIFF",
            pdf_a_compliance=None,
            errors=errors,
            warnings=warnings,
            metadata_complete=False,  # TIFF has limited metadata
            fonts_embedded=True,  # N/A for raster images
            color_profiles_embedded=False,  # Not required for TIFF 6.0
            checksum_sha256=checksum,
            validated_at=datetime.utcnow(),
            validator_version=self._get_libtiff_version(),
        )

    def _verify_metadata(self, file_path: Path) -> bool:
        """Verify XMP metadata completeness using exiftool."""
        try:
            result = subprocess.run(
                ["exiftool", "-xmp:all", "-json", str(file_path)],
                capture_output=True,
                text=True,
                timeout=10,
            )
            metadata = json.loads(result.stdout)[0]

            # Check for required fields
            for field in self.REQUIRED_METADATA_FIELDS:
                if field not in metadata or not metadata[field]:
                    return False
            return True
        except Exception:
            return False

    def _verify_fonts_embedded(self, file_path: Path) -> bool:
        """Verify all fonts are embedded using pdffonts."""
        try:
            result = subprocess.run(
                ["pdffonts", str(file_path)],
                capture_output=True,
                text=True,
                timeout=10,
            )

            # Parse pdffonts output (skip the two header lines). Font names
            # and type descriptions may contain spaces, so count fields from
            # the right: ... emb sub uni object ID -> "emb" is the fifth
            # field from the end.
            lines = result.stdout.strip().split("\n")[2:]
            for line in lines:
                columns = line.split()
                if len(columns) >= 5 and columns[-5].lower() != "yes":
                    return False
            return True
        except Exception:
            return False

    def _verify_color_profiles(self, file_path: Path) -> bool:
        """Verify ICC color profiles are embedded."""
        try:
            result = subprocess.run(
                ["exiftool", "-icc_profile:all", str(file_path)],
                capture_output=True,
                text=True,
                timeout=10,
            )
            # Check if ICC profile info is present
            return "ICC" in result.stdout
        except Exception:
            return False

    def _generate_checksum(self, file_path: Path) -> str:
        """Generate SHA-256 checksum for file corruption detection."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def _get_verapdf_version(self) -> str:
        """Get veraPDF version for the audit trail."""
        try:
            result = subprocess.run(
                [self.VERAPDF_PATH, "--version"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            return result.stdout.strip()
        except Exception:
            return "Unknown"

    def _get_libtiff_version(self) -> str:
        """Get LibTIFF version for the audit trail."""
        try:
            result = subprocess.run(
                [self.LIBTIFF_TIFFINFO_PATH, "-v"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            # Some builds print version info to stderr
            for line in (result.stdout + result.stderr).split("\n"):
                if "LIBTIFF" in line.upper():
                    return line.strip()
            return "Unknown"
        except Exception:
            return "Unknown"
```
Database Schema:
```sql
-- Table: archival_validation_log
CREATE TABLE archival_validation_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    record_id UUID NOT NULL REFERENCES records(id),
    tenant_id UUID NOT NULL REFERENCES tenants(id),

    -- Validation metadata
    validated_at TIMESTAMP NOT NULL DEFAULT NOW(),
    validator_version VARCHAR(255) NOT NULL,

    -- Validation result
    is_valid BOOLEAN NOT NULL,
    format_type VARCHAR(50) NOT NULL,  -- PDF/A, TIFF
    pdf_a_compliance VARCHAR(50),      -- PDF/A-2b, PDF/A-3b, etc.

    -- Detailed checks
    metadata_complete BOOLEAN NOT NULL,
    fonts_embedded BOOLEAN NOT NULL,
    color_profiles_embedded BOOLEAN NOT NULL,

    -- Issues
    errors JSONB,    -- Array of error messages
    warnings JSONB,  -- Array of warning messages

    -- File integrity
    checksum_sha256 VARCHAR(64) NOT NULL,
    file_size_bytes BIGINT NOT NULL,

    -- Audit
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- PostgreSQL does not support inline INDEX clauses in CREATE TABLE;
-- indexes are created separately.
CREATE INDEX idx_validation_record ON archival_validation_log (record_id);
CREATE INDEX idx_validation_tenant_date ON archival_validation_log (tenant_id, validated_at);
CREATE INDEX idx_validation_invalid ON archival_validation_log (validated_at)
    WHERE is_valid = FALSE;
```
## 2. Rendering Verification

### 2.1 Periodic Rendering Tests
Purpose: Verify that archived records can still be rendered accurately after storage.
Test Frequency:
- Hot tier (0-90 days): Quarterly (every 90 days)
- Warm tier (90 days - 2 years): Semi-annually (every 180 days)
- Cold tier (2+ years): Annually (every 365 days)
Test Coverage:
- Sample-based: 5% random sample of all records per test cycle
- Critical records: 100% of GxP-critical records (batch records, validation protocols, audit reports)
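The sampling policy above can be sketched as follows. The record representation and function name are illustrative; the seed parameter is shown only to make a sample reproducible for audit purposes:

```python
import random

def select_rendering_sample(records, sample_rate=0.05, seed=None):
    """All GxP-critical records plus a random sample of the rest (sketch)."""
    critical = [r for r in records if r["critical"]]
    rest = [r for r in records if not r["critical"]]
    rng = random.Random(seed)
    n = max(1, round(len(rest) * sample_rate)) if rest else 0
    return critical + rng.sample(rest, n)

records = [{"id": i, "critical": i % 20 == 0} for i in range(100)]
sample = select_rendering_sample(records, seed=42)
print(len(sample))  # 5 critical records + 5% of the remaining 95
```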
### 2.2 Automated Rendering Comparison
Pipeline:
Implementation:
```python
# File: backend/qms/archival/rendering_verifier.py
import hashlib
import io
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple

import fitz  # PyMuPDF
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim


@dataclass
class RenderingTestResult:
    """Rendering verification test result."""
    record_id: str
    test_date: datetime
    status: str  # PASS, FAIL, MANUAL_REVIEW
    similarity_score: float  # 0.0 - 1.0
    pixel_match_percentage: float  # 0.0 - 100.0
    reference_checksum: str
    current_checksum: str
    differences_detected: list[str]
    reviewer_notes: Optional[str] = None
    reviewed_by: Optional[str] = None
    reviewed_at: Optional[datetime] = None


class RenderingVerifier:
    """
    Verifies rendering accuracy of archived records.

    Compliance: FDA 21 CFR Part 11 §11.10(b) - accurate reproduction
    """

    SIMILARITY_THRESHOLD = 0.95  # 95% match required
    DPI = 300  # Render resolution

    def __init__(self, storage_client, database):
        self.storage = storage_client
        self.db = database

    def verify_rendering(
        self,
        record_id: str,
        archived_file_path: Path,
    ) -> RenderingTestResult:
        """
        Verify rendering accuracy against the stored reference.

        Args:
            record_id: Unique record identifier
            archived_file_path: Path to archived file

        Returns:
            RenderingTestResult with comparison metrics
        """
        # Retrieve or create reference rendering
        reference_img = self._get_reference_rendering(record_id)

        if reference_img is None:
            # First-time rendering - create the reference
            reference_img = self._render_to_image(archived_file_path)
            self._store_reference_rendering(record_id, reference_img)
            checksum = self._image_checksum(reference_img)
            return RenderingTestResult(
                record_id=record_id,
                test_date=datetime.utcnow(),
                status="PASS",
                similarity_score=1.0,
                pixel_match_percentage=100.0,
                reference_checksum=checksum,
                current_checksum=checksum,
                differences_detected=[],
            )

        # Render current version
        current_img = self._render_to_image(archived_file_path)

        # Compare images
        similarity, pixel_match, differences = self._compare_images(
            reference_img,
            current_img,
        )

        # Below-threshold results are queued for human review rather than
        # auto-failed
        status = (
            "PASS" if similarity >= self.SIMILARITY_THRESHOLD else "MANUAL_REVIEW"
        )

        return RenderingTestResult(
            record_id=record_id,
            test_date=datetime.utcnow(),
            status=status,
            similarity_score=similarity,
            pixel_match_percentage=pixel_match * 100,
            reference_checksum=self._image_checksum(reference_img),
            current_checksum=self._image_checksum(current_img),
            differences_detected=differences,
        )

    def _render_to_image(self, file_path: Path) -> Image.Image:
        """
        Render PDF/A or TIFF to an image at the configured DPI.

        Args:
            file_path: Path to archival file

        Returns:
            PIL Image object
        """
        if file_path.suffix.lower() == '.pdf':
            # Render PDF using PyMuPDF
            doc = fitz.open(file_path)
            page = doc[0]  # First page

            # Calculate zoom for target DPI
            zoom = self.DPI / 72  # PDF default is 72 DPI
            mat = fitz.Matrix(zoom, zoom)
            pix = page.get_pixmap(matrix=mat)
            img_data = pix.tobytes("png")
            img = Image.open(io.BytesIO(img_data))
            doc.close()
        elif file_path.suffix.lower() in ['.tif', '.tiff']:
            # Open TIFF directly
            img = Image.open(file_path)
        else:
            raise ValueError(f"Unsupported format: {file_path.suffix}")

        return img.convert('RGB')  # Normalize to RGB

    def _compare_images(
        self,
        reference: Image.Image,
        current: Image.Image,
    ) -> Tuple[float, float, list[str]]:
        """
        Compare two images using SSIM and pixel-level matching.

        Args:
            reference: Reference image
            current: Current rendering

        Returns:
            (similarity_score, pixel_match_ratio, differences_list)
        """
        differences = []

        # Convert to numpy arrays
        ref_array = np.array(reference)
        cur_array = np.array(current)

        # Check dimensions match
        if ref_array.shape != cur_array.shape:
            differences.append(
                f"Dimension mismatch: {ref_array.shape} vs {cur_array.shape}"
            )
            # Resize current to match reference for comparison
            current = current.resize(reference.size, Image.Resampling.LANCZOS)
            cur_array = np.array(current)

        # Convert to grayscale for SSIM
        ref_gray = np.mean(ref_array, axis=2) if ref_array.ndim == 3 else ref_array
        cur_gray = np.mean(cur_array, axis=2) if cur_array.ndim == 3 else cur_array

        # Calculate SSIM (Structural Similarity Index)
        similarity_score = ssim(ref_gray, cur_gray, data_range=255)

        # Calculate pixel-level match ratio: the fraction of channel values
        # within a tolerance of 10 (out of 255)
        pixel_diff = np.abs(ref_array.astype(float) - cur_array.astype(float))
        pixel_match_ratio = float(np.mean(pixel_diff < 10))

        # Identify specific differences
        if similarity_score < self.SIMILARITY_THRESHOLD:
            # Find regions with significant differences
            diff_threshold = 50
            significant_diff = (
                np.any(pixel_diff > diff_threshold, axis=2)
                if pixel_diff.ndim == 3
                else pixel_diff > diff_threshold
            )
            diff_count = np.sum(significant_diff)
            diff_percentage = (diff_count / significant_diff.size) * 100
            differences.append(
                f"Significant pixel differences: {diff_percentage:.2f}% of image"
            )

        return similarity_score, pixel_match_ratio, differences

    def _get_reference_rendering(self, record_id: str) -> Optional[Image.Image]:
        """Retrieve stored reference rendering from storage."""
        reference_key = f"rendering-references/{record_id}.png"
        try:
            img_bytes = self.storage.download_blob(reference_key)
            return Image.open(io.BytesIO(img_bytes))
        except Exception:
            return None

    def _store_reference_rendering(self, record_id: str, image: Image.Image):
        """Store reference rendering to storage."""
        reference_key = f"rendering-references/{record_id}.png"
        img_buffer = io.BytesIO()
        image.save(img_buffer, format='PNG')
        self.storage.upload_blob(reference_key, img_buffer.getvalue())

    def _image_checksum(self, image: Image.Image) -> str:
        """Calculate SHA-256 checksum of image data."""
        img_buffer = io.BytesIO()
        image.save(img_buffer, format='PNG')
        return hashlib.sha256(img_buffer.getvalue()).hexdigest()
```
Database Schema:
```sql
-- Table: rendering_test_results
CREATE TABLE rendering_test_results (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    record_id UUID NOT NULL REFERENCES records(id),
    tenant_id UUID NOT NULL REFERENCES tenants(id),

    -- Test metadata
    test_date TIMESTAMP NOT NULL DEFAULT NOW(),
    test_type VARCHAR(50) NOT NULL,  -- quarterly, annual, critical

    -- Test result
    status VARCHAR(50) NOT NULL,                   -- PASS, FAIL, MANUAL_REVIEW
    similarity_score NUMERIC(5,4) NOT NULL,        -- 0.0000 - 1.0000
    pixel_match_percentage NUMERIC(5,2) NOT NULL,  -- 0.00 - 100.00

    -- Checksums
    reference_checksum VARCHAR(64) NOT NULL,
    current_checksum VARCHAR(64) NOT NULL,

    -- Differences
    differences_detected JSONB,  -- Array of difference descriptions

    -- Human review (if needed)
    reviewer_notes TEXT,
    reviewed_by UUID REFERENCES users(id),
    reviewed_at TIMESTAMP,

    -- Audit
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- PostgreSQL does not support inline INDEX clauses in CREATE TABLE;
-- indexes are created separately.
CREATE INDEX idx_rendering_record ON rendering_test_results (record_id);
CREATE INDEX idx_rendering_status ON rendering_test_results (status, test_date);
CREATE INDEX idx_rendering_tenant_date ON rendering_test_results (tenant_id, test_date);
```
## 3. Format Migration

### 3.1 Technology Obsolescence Monitoring
Annual Format Viability Assessment:
| Assessment Criteria | Green (No Action) | Yellow (Monitor) | Red (Migration Required) |
|---|---|---|---|
| Renderer Availability | 5+ active renderers | 2-4 active renderers | <2 active renderers |
| Browser Support | All major browsers | 2-3 browsers | <2 browsers |
| Standard Status | Active ISO standard | Stable ISO standard | Deprecated standard |
| Vendor Support | Active development | Maintenance mode | End-of-life announced |
| Prevalence | >50% industry usage | 10-50% industry usage | <10% industry usage |
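One plausible way to combine the five criteria is worst-criterion-wins, which is consistent with the current assessments shown in this section. This aggregation rule is an assumption for illustration, not a stated policy, and the criterion keys are hypothetical:

```python
def overall_status(criteria: dict[str, str]) -> str:
    """Worst single criterion drives the overall rating (Red > Yellow > Green)."""
    rank = {"Green": 0, "Yellow": 1, "Red": 2}
    return max(criteria.values(), key=rank.__getitem__)

tiff_assessment = {
    "renderer_availability": "Green",   # 5+ active renderers
    "browser_support": "Yellow",        # plugins only
    "standard_status": "Yellow",        # stable, no updates
    "vendor_support": "Yellow",         # maintenance mode
    "prevalence": "Yellow",             # ~20% industry usage
}
print(overall_status(tiff_assessment))  # Yellow
```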
Current Assessment (2026):
| Format | Renderer Availability | Browser Support | Standard Status | Vendor Support | Prevalence | Overall Status |
|---|---|---|---|---|---|---|
| PDF/A-2b | 10+ (Adobe, PyMuPDF, etc.) | All major browsers | Active (ISO 19005-2) | Active | >80% | Green |
| PDF/A-3b | 8+ | All major browsers | Active (ISO 19005-3) | Active | 40% | Green |
| TIFF 6.0 | 5+ | Limited (plugins) | Stable (no updates) | Maintenance | 20% | Yellow |
Migration Schedule:
- TIFF 6.0 → PDF/A-2b: Migrate legacy TIFF archives within 2 years (by 2028)
- PDF (non-A) → PDF/A-2b: Immediate migration on detection
### 3.2 Migration Pipeline
Architecture:
Implementation:
```python
# File: backend/qms/archival/format_migrator.py
import hashlib
import subprocess
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

# Adjust import paths to the project layout
from qms.archival.format_validator import ArchivalFormatValidator
from qms.archival.rendering_verifier import RenderingVerifier


@dataclass
class MigrationResult:
    """Format migration result."""
    success: bool
    source_format: str
    target_format: str
    source_checksum: str
    target_checksum: str
    rendering_match: bool
    validation_passed: bool
    errors: list[str]
    migrated_at: datetime
    migration_tool: str
    migration_tool_version: str


class FormatMigrator:
    """
    Migrates archived records to current archival formats.

    Compliance: FDA 21 CFR Part 11 §11.10(b) - migration with validation
    """

    def migrate_tiff_to_pdfa(self, source_path: Path) -> MigrationResult:
        """
        Migrate TIFF to PDF/A-2b using ImageMagick + Ghostscript.

        Args:
            source_path: Path to source TIFF file

        Returns:
            MigrationResult with migration status
        """
        errors = []
        target_path = source_path.with_suffix('.pdf')

        try:
            # Convert TIFF to PDF using ImageMagick
            # -density 300: maintain 300 DPI
            # -compress zip: lossless compression
            # -quality 100: maximum quality
            # (ImageMagick 7 invokes this as "magick" instead of "convert")
            result = subprocess.run(
                [
                    "convert",
                    str(source_path),
                    "-density", "300",
                    "-compress", "zip",
                    "-quality", "100",
                    str(target_path),
                ],
                capture_output=True,
                text=True,
                timeout=120,
            )
            if result.returncode != 0:
                errors.append(f"ImageMagick conversion failed: {result.stderr}")
                return self._failed_migration(errors)

            # Convert PDF to PDF/A-2b using Ghostscript
            pdfa_path = source_path.with_suffix('.pdfa.pdf')
            result = subprocess.run(
                [
                    "gs",
                    "-dPDFA=2",
                    "-dBATCH",
                    "-dNOPAUSE",
                    "-dUseCIEColor",
                    "-sProcessColorModel=DeviceRGB",
                    "-sDEVICE=pdfwrite",
                    f"-sOutputFile={pdfa_path}",
                    "-dPDFACompatibilityPolicy=1",
                    str(target_path),
                ],
                capture_output=True,
                text=True,
                timeout=120,
            )
            if result.returncode != 0:
                errors.append(f"Ghostscript PDF/A conversion failed: {result.stderr}")
                return self._failed_migration(errors)

            # Validate the resulting PDF/A
            validator = ArchivalFormatValidator()
            validation = validator.validate_pdf_a(pdfa_path)
            if not validation.is_valid:
                errors.extend(validation.errors)
                return self._failed_migration(errors)

            # Rendering comparison (the render/compare helpers do not touch
            # storage or the database)
            verifier = RenderingVerifier(None, None)
            ref_img = verifier._render_to_image(source_path)
            new_img = verifier._render_to_image(pdfa_path)
            similarity, _, _ = verifier._compare_images(ref_img, new_img)
            rendering_match = similarity >= 0.95
            if not rendering_match:
                errors.append(f"Rendering mismatch: {similarity:.4f} < 0.95")

            return MigrationResult(
                success=rendering_match,
                source_format="TIFF 6.0",
                target_format="PDF/A-2b",
                source_checksum=self._checksum(source_path),
                target_checksum=validation.checksum_sha256,
                rendering_match=rendering_match,
                validation_passed=validation.is_valid,
                errors=errors,
                migrated_at=datetime.utcnow(),
                migration_tool="ImageMagick + Ghostscript",
                migration_tool_version=self._get_tool_versions(),
            )

        except subprocess.TimeoutExpired:
            errors.append("Migration timeout (>120s)")
            return self._failed_migration(errors)
        except Exception as e:
            errors.append(f"Migration exception: {str(e)}")
            return self._failed_migration(errors)

    def migrate_pdf_to_pdfa(self, source_path: Path) -> MigrationResult:
        """
        Migrate non-PDF/A PDF to PDF/A-2b using Ghostscript.

        Args:
            source_path: Path to source PDF file

        Returns:
            MigrationResult with migration status
        """
        errors = []
        target_path = source_path.with_stem(f"{source_path.stem}_pdfa")

        try:
            # Convert to PDF/A-2b using Ghostscript
            result = subprocess.run(
                [
                    "gs",
                    "-dPDFA=2",
                    "-dBATCH",
                    "-dNOPAUSE",
                    "-dUseCIEColor",
                    "-sProcessColorModel=DeviceRGB",
                    "-sDEVICE=pdfwrite",
                    f"-sOutputFile={target_path}",
                    "-dPDFACompatibilityPolicy=1",
                    str(source_path),
                ],
                capture_output=True,
                text=True,
                timeout=120,
            )
            if result.returncode != 0:
                errors.append(f"Ghostscript PDF/A conversion failed: {result.stderr}")
                return self._failed_migration(errors)

            # Validate the resulting PDF/A
            validator = ArchivalFormatValidator()
            validation = validator.validate_pdf_a(target_path)
            if not validation.is_valid:
                errors.extend(validation.errors)
                return self._failed_migration(errors)

            # Rendering comparison
            verifier = RenderingVerifier(None, None)
            ref_img = verifier._render_to_image(source_path)
            new_img = verifier._render_to_image(target_path)
            similarity, _, _ = verifier._compare_images(ref_img, new_img)
            rendering_match = similarity >= 0.95
            if not rendering_match:
                errors.append(f"Rendering mismatch: {similarity:.4f} < 0.95")

            return MigrationResult(
                success=rendering_match,
                source_format="PDF",
                target_format="PDF/A-2b",
                source_checksum=self._checksum(source_path),
                target_checksum=validation.checksum_sha256,
                rendering_match=rendering_match,
                validation_passed=validation.is_valid,
                errors=errors,
                migrated_at=datetime.utcnow(),
                migration_tool="Ghostscript",
                migration_tool_version=self._get_tool_versions(),
            )

        except subprocess.TimeoutExpired:
            errors.append("Migration timeout (>120s)")
            return self._failed_migration(errors)
        except Exception as e:
            errors.append(f"Migration exception: {str(e)}")
            return self._failed_migration(errors)

    def _failed_migration(self, errors: list[str]) -> MigrationResult:
        """Create a failed migration result."""
        return MigrationResult(
            success=False,
            source_format="Unknown",
            target_format="Unknown",
            source_checksum="",
            target_checksum="",
            rendering_match=False,
            validation_passed=False,
            errors=errors,
            migrated_at=datetime.utcnow(),
            migration_tool="Unknown",
            migration_tool_version="Unknown",
        )

    def _checksum(self, file_path: Path) -> str:
        """Calculate SHA-256 checksum."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def _get_tool_versions(self) -> str:
        """Get migration tool versions for the audit trail."""
        versions = []

        # ImageMagick version
        try:
            result = subprocess.run(
                ["convert", "-version"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            for line in result.stdout.split("\n"):
                if "ImageMagick" in line:
                    versions.append(line.strip())
                    break
        except Exception:
            pass

        # Ghostscript version
        try:
            result = subprocess.run(
                ["gs", "--version"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            versions.append(f"Ghostscript {result.stdout.strip()}")
        except Exception:
            pass

        return " | ".join(versions) if versions else "Unknown"
```
Database Schema:
```sql
-- Table: format_migration_log
CREATE TABLE format_migration_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    record_id UUID NOT NULL REFERENCES records(id),
    tenant_id UUID NOT NULL REFERENCES tenants(id),

    -- Migration metadata
    migrated_at TIMESTAMP NOT NULL DEFAULT NOW(),
    migration_reason VARCHAR(255) NOT NULL,  -- obsolescence, quality_improvement, etc.

    -- Source and target
    source_format VARCHAR(50) NOT NULL,
    target_format VARCHAR(50) NOT NULL,
    source_checksum VARCHAR(64) NOT NULL,
    target_checksum VARCHAR(64) NOT NULL,

    -- Validation
    validation_passed BOOLEAN NOT NULL,
    rendering_match BOOLEAN NOT NULL,

    -- Migration result
    success BOOLEAN NOT NULL,
    errors JSONB,  -- Array of error messages

    -- Tools
    migration_tool VARCHAR(255) NOT NULL,
    migration_tool_version VARCHAR(255) NOT NULL,

    -- Original preservation
    original_file_preserved BOOLEAN NOT NULL DEFAULT TRUE,
    original_file_location VARCHAR(500),

    -- Audit
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- PostgreSQL does not support inline INDEX clauses in CREATE TABLE;
-- indexes are created separately.
CREATE INDEX idx_migration_record ON format_migration_log (record_id);
CREATE INDEX idx_migration_tenant_date ON format_migration_log (tenant_id, migrated_at);
CREATE INDEX idx_migration_success ON format_migration_log (success, migrated_at);
```
## 4. Accessibility

### 4.1 WCAG 2.1 AA Compliance
All archived electronic records must meet WCAG 2.1 AA accessibility standards to ensure readability by users with disabilities.
Requirements:
| WCAG Criterion | Implementation | Verification |
|---|---|---|
| 1.1.1 Non-text Content | Alt text for all images, charts, graphs | Automated check + manual review |
| 1.3.1 Info and Relationships | Tagged PDF structure, reading order | PDF accessibility checker |
| 1.3.2 Meaningful Sequence | Logical reading order in PDF tags | Manual verification |
| 1.4.3 Contrast (Minimum) | 4.5:1 contrast ratio for text | Automated color contrast analysis |
| 1.4.4 Resize Text | Text readable at 200% zoom | Manual testing |
| 2.1.1 Keyboard | All navigation via keyboard | Manual testing |
| 2.4.2 Page Titled | Descriptive page/document titles | Metadata verification |
| 3.1.1 Language of Page | Language identified in metadata | Metadata verification |
| 4.1.2 Name, Role, Value | Form fields properly labeled | PDF form checker |
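The 1.4.3 minimum-contrast check can be automated using the WCAG 2.1 relative-luminance formula; a self-contained sketch (the function names are illustrative, the formula is WCAG's):

```python
def relative_luminance(rgb) -> float:
    """WCAG 2.1 relative luminance of an sRGB color with 0-255 channels."""
    def channel(c):
        c = c / 255
        return c / 12.92 if c <= 0.03928 else ((c + 0.055) / 1.055) ** 2.4
    r, g, b = (channel(c) for c in rgb)
    return 0.2126 * r + 0.7152 * g + 0.0722 * b

def contrast_ratio(fg, bg) -> float:
    """Contrast ratio (lighter + 0.05) / (darker + 0.05); AA text needs >= 4.5."""
    l1, l2 = sorted(
        (relative_luminance(fg), relative_luminance(bg)), reverse=True
    )
    return (l1 + 0.05) / (l2 + 0.05)

print(round(contrast_ratio((0, 0, 0), (255, 255, 255)), 2))  # 21.0
print(contrast_ratio((118, 118, 118), (255, 255, 255)) >= 4.5)  # True
```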
### 4.2 Tagged PDF Structure
Implementation:
```python
# File: backend/qms/archival/accessibility_tagger.py
from pathlib import Path
from typing import Any, Dict

import fitz  # PyMuPDF


class AccessibilityTagger:
    """
    Adds accessibility tags to PDF/A documents.

    Compliance: WCAG 2.1 AA, Section 508
    """

    def tag_pdf_structure(self, pdf_path: Path) -> Dict[str, Any]:
        """
        Add structure tags to PDF for screen reader navigation.

        Args:
            pdf_path: Path to PDF file

        Returns:
            Dict with tagging results
        """
        doc = fitz.open(pdf_path)

        # Mark the document as a tagged PDF by setting /MarkInfo in the
        # document catalog. (set_metadata() only covers the standard Info
        # dictionary keys, so this needs a low-level write; PyMuPDF 1.18+.)
        doc.xref_set_key(doc.pdf_catalog(), "MarkInfo", "<< /Marked true >>")

        tags_added = []

        for page_num, page in enumerate(doc):
            # Extract text blocks with position
            blocks = page.get_text("dict")["blocks"]

            for block in blocks:
                if block["type"] == 0:  # Text block
                    # Determine semantic role based on formatting
                    for line in block.get("lines", []):
                        for span in line.get("spans", []):
                            text = span.get("text", "")
                            font_size = span.get("size", 12)
                            font_flags = span.get("flags", 0)

                            # Classify as heading or paragraph
                            if font_size > 14 or (font_flags & 2 ** 4):  # Large or bold
                                tag_type = "H1" if font_size > 18 else "H2"
                            else:
                                tag_type = "P"

                            # Record the structure tag (simplified - a full
                            # implementation writes real PDF structure elements)
                            tags_added.append({
                                'page': page_num,
                                'type': tag_type,
                                'text': text[:50],  # First 50 chars
                            })
                elif block["type"] == 1:  # Image block
                    # Tag as Figure
                    tags_added.append({
                        'page': page_num,
                        'type': 'Figure',
                        'bbox': block.get("bbox"),
                    })

        # Save updated PDF
        output_path = pdf_path.with_stem(f"{pdf_path.stem}_tagged")
        doc.save(output_path, garbage=4, deflate=True)
        doc.close()

        return {
            'success': True,
            'tags_added': len(tags_added),
            'output_path': str(output_path),
            'tags': tags_added,
        }

    def add_alt_text_to_images(
        self,
        pdf_path: Path,
        alt_text_map: Dict[int, str],  # page_num -> alt text
    ) -> bool:
        """
        Add alt text to images in PDF.

        Args:
            pdf_path: Path to PDF file
            alt_text_map: Mapping of page numbers to alt text

        Returns:
            Success status
        """
        # Implementation uses pikepdf or another PDF library that supports
        # adding /Alt entries to image XObjects. Simplified placeholder.
        return True

    def verify_reading_order(self, pdf_path: Path) -> Dict[str, Any]:
        """
        Verify logical reading order in tagged PDF.

        Args:
            pdf_path: Path to PDF file

        Returns:
            Dict with verification results
        """
        doc = fitz.open(pdf_path)
        reading_order_issues = []

        for page_num, page in enumerate(doc):
            blocks = page.get_text("dict")["blocks"]

            # Sort blocks by vertical position (top to bottom)
            sorted_blocks = sorted(
                blocks, key=lambda b: b.get("bbox", [0, 0, 0, 0])[1]
            )

            # Check for out-of-order blocks
            prev_y = -1
            for block in sorted_blocks:
                bbox = block.get("bbox", [0, 0, 0, 0])
                current_y = bbox[1]
                if current_y < prev_y - 10:  # Tolerance: 10 points
                    reading_order_issues.append({
                        'page': page_num,
                        'issue': 'Out-of-order block detected',
                        'position': bbox,
                    })
                prev_y = bbox[3]  # Bottom of current block

        doc.close()
        return {
            'compliant': len(reading_order_issues) == 0,
            'issues': reading_order_issues,
        }
```
## 5. Print Controls

### 5.1 Controlled Printing
Purpose: Maintain control over physical copies of regulated records with audit trail.
Implementation:
```python
# File: backend/qms/archival/print_controller.py
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

import fitz  # PyMuPDF


@dataclass
class PrintRequest:
    """Print request data."""
    record_id: str
    user_id: str
    tenant_id: str
    copy_type: str  # CONTROLLED, UNCONTROLLED
    num_copies: int
    reason: str
    requested_at: datetime


@dataclass
class PrintJob:
    """Print job result."""
    job_id: str
    record_id: str
    user_id: str
    watermarked_file_path: Path
    copy_type: str
    num_copies: int
    printed_at: datetime
    printer_name: str


class PrintController:
    """
    Manages controlled printing of archived records.

    Compliance: ISO 9001 §4.2.3 - control of documents
    """

    WATERMARK_CONTROLLED = "CONTROLLED COPY"
    WATERMARK_UNCONTROLLED = "UNCONTROLLED COPY - FOR INFORMATION ONLY"

    def create_print_job(self, request: PrintRequest) -> PrintJob:
        """
        Create a controlled print job with watermarking.

        Args:
            request: Print request details

        Returns:
            PrintJob with watermarked file ready for printing
        """
        # Retrieve archived file
        archived_file = self._retrieve_archived_file(request.record_id)

        # Apply watermark
        watermarked_file = self._apply_watermark(
            archived_file,
            request.copy_type,
            request.record_id,
            request.user_id,
        )

        # Log print audit event
        job_id = self._log_print_event(request, watermarked_file)

        return PrintJob(
            job_id=job_id,
            record_id=request.record_id,
            user_id=request.user_id,
            watermarked_file_path=watermarked_file,
            copy_type=request.copy_type,
            num_copies=request.num_copies,
            printed_at=datetime.utcnow(),
            printer_name="default",  # Configured printer
        )

    def _apply_watermark(
        self,
        source_file: Path,
        copy_type: str,
        record_id: str,
        user_id: str,
    ) -> Path:
        """
        Apply watermark and header/footer to PDF.

        Args:
            source_file: Path to source PDF
            copy_type: CONTROLLED or UNCONTROLLED
            record_id: Record identifier
            user_id: User requesting print

        Returns:
            Path to watermarked PDF
        """
        doc = fitz.open(source_file)

        watermark_text = (
            self.WATERMARK_CONTROLLED if copy_type == "CONTROLLED"
            else self.WATERMARK_UNCONTROLLED
        )

        for page_num, page in enumerate(doc, start=1):
            page_width = page.rect.width
            page_height = page.rect.height

            # Add diagonal watermark around the page center. insert_text()
            # only supports rotations in multiples of 90 degrees, so write
            # the text with a TextWriter and rotate it via a morph matrix.
            center = fitz.Point(page_width / 2, page_height / 2)
            writer = fitz.TextWriter(page.rect)
            writer.append(
                fitz.Point(page_width * 0.2, page_height / 2),
                watermark_text,
                font=fitz.Font("helv"),
                fontsize=48,
            )
            writer.write_text(
                page,
                color=(0.9, 0.9, 0.9),  # Light gray
                morph=(center, fitz.Matrix(1, 1).prerotate(45)),
            )

            # Add header with document ID
            header_text = f"Document ID: {record_id} | Page {page_num}/{len(doc)}"
            page.insert_text(
                point=(50, 30),
                text=header_text,
                fontsize=10,
                fontname="helv",
                color=(0, 0, 0),
            )

            # Add footer with print date and user
            footer_text = (
                f"Printed: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')} "
                f"by User {user_id}"
            )
            page.insert_text(
                point=(50, page_height - 30),
                text=footer_text,
                fontsize=10,
                fontname="helv",
                color=(0, 0, 0),
            )

        # Save watermarked PDF
        output_path = source_file.with_stem(f"{source_file.stem}_print_{copy_type}")
        doc.save(output_path, garbage=4, deflate=True)
        doc.close()
        return output_path

    def _retrieve_archived_file(self, record_id: str) -> Path:
        """Retrieve archived file from storage (GCS or local)."""
        raise NotImplementedError

    def _log_print_event(self, request: PrintRequest, watermarked_file: Path) -> str:
        """Log print event to the audit trail database."""
        raise NotImplementedError
```
Database Schema:
-- Table: print_audit_log
CREATE TABLE print_audit_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_id VARCHAR(50) UNIQUE NOT NULL,
    record_id UUID NOT NULL REFERENCES records(id),
    tenant_id UUID NOT NULL REFERENCES tenants(id),
    user_id UUID NOT NULL REFERENCES users(id),

    -- Print details
    copy_type VARCHAR(50) NOT NULL, -- CONTROLLED, UNCONTROLLED
    num_copies INTEGER NOT NULL,
    reason TEXT NOT NULL,

    -- Watermark details
    watermarked_file_path VARCHAR(500) NOT NULL,
    watermark_text VARCHAR(255) NOT NULL,

    -- Printer
    printer_name VARCHAR(255),
    print_queue VARCHAR(255),

    -- Timestamps
    requested_at TIMESTAMP NOT NULL,
    printed_at TIMESTAMP NOT NULL DEFAULT NOW(),

    -- Audit
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- PostgreSQL does not support inline INDEX clauses; create indexes separately
CREATE INDEX idx_print_record ON print_audit_log (record_id);
CREATE INDEX idx_print_user ON print_audit_log (user_id, printed_at);
CREATE INDEX idx_print_tenant_date ON print_audit_log (tenant_id, printed_at);
6. Long-Term Storage Architecture
6.1 Storage Tier Management
Storage Tier Specifications:
| Tier | Storage Medium | Access Time | Cost/GB/Month | Retention Period | Use Case |
|---|---|---|---|---|---|
| Hot | GCS Standard (SSD) | <1 second | $0.020 | 0-90 days | Active records, frequent access |
| Warm | GCS Nearline (HDD) | <5 seconds | $0.010 | 90 days - 2 years | Occasional access |
| Cold | GCS Archive | <12 hours | $0.0012 | 2+ years | Long-term retention, rare access |
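The age thresholds in the table translate directly into a tier lookup rule. A minimal sketch (the function name `tier_for_age` is illustrative, not part of the platform code):

```python
def tier_for_age(age_days: int) -> str:
    """Map a record's age in days to its storage tier, per the table above."""
    if age_days < 90:
        return "hot"    # GCS Standard: 0-90 days
    if age_days < 730:
        return "warm"   # GCS Nearline: 90 days - 2 years
    return "cold"       # GCS Archive: 2+ years
```

The boundaries (90 and 730 days) deliberately match the transition ages in the lifecycle policy that follows.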
GCS Lifecycle Policy:
{
  "lifecycle": {
    "rule": [
      {
        "action": {
          "type": "SetStorageClass",
          "storageClass": "NEARLINE"
        },
        "condition": {
          "age": 90,
          "matchesPrefix": ["archived-records/"],
          "matchesSuffix": [".pdf", ".pdfa.pdf"]
        }
      },
      {
        "action": {
          "type": "SetStorageClass",
          "storageClass": "ARCHIVE"
        },
        "condition": {
          "age": 730,
          "matchesPrefix": ["archived-records/"],
          "matchesSuffix": [".pdf", ".pdfa.pdf"]
        }
      },
      {
        "action": {
          "type": "Delete"
        },
        "condition": {
          "age": 9125,
          "matchesPrefix": ["archived-records/non-regulated/"]
        }
      }
    ]
  }
}
Terraform Configuration:
# File: infrastructure/terraform/storage.tf

resource "google_storage_bucket" "archival_storage" {
  name                        = "${var.project_id}-archival-records"
  location                    = "US" # Multi-region for high availability
  storage_class               = "STANDARD"
  uniform_bucket_level_access = true

  # Note: GCS does not allow object versioning on a bucket with a retention
  # policy, so versioning is omitted; WORM-style immutability is provided by
  # the retention policy below.

  lifecycle_rule {
    action {
      type          = "SetStorageClass"
      storage_class = "NEARLINE"
    }
    condition {
      age            = 90
      matches_prefix = ["archived-records/"]
      matches_suffix = [".pdf", ".pdfa.pdf"]
    }
  }

  lifecycle_rule {
    action {
      type          = "SetStorageClass"
      storage_class = "ARCHIVE"
    }
    condition {
      age            = 730 # 2 years
      matches_prefix = ["archived-records/"]
      matches_suffix = [".pdf", ".pdfa.pdf"]
    }
  }

  # Retention policy for regulated records (25 years minimum)
  retention_policy {
    retention_period = 788400000 # 25 years in seconds
  }

  # Encryption
  encryption {
    default_kms_key_name = google_kms_crypto_key.archival_encryption_key.id
  }

  # Logging
  logging {
    log_bucket = google_storage_bucket.archival_access_logs.name
  }

  # Labels
  labels = {
    environment = var.environment
    compliance  = "fda-21cfr11"
    data_class  = "regulated-records"
  }
}

# Geographic redundancy - replicate to second region
resource "google_storage_bucket" "archival_storage_dr" {
  name                        = "${var.project_id}-archival-records-dr"
  location                    = "EU" # Different geography for DR
  storage_class               = "STANDARD"
  uniform_bucket_level_access = true

  # Same lifecycle rules as primary
  lifecycle_rule {
    action {
      type          = "SetStorageClass"
      storage_class = "NEARLINE"
    }
    condition {
      age            = 90
      matches_prefix = ["archived-records/"]
    }
  }

  lifecycle_rule {
    action {
      type          = "SetStorageClass"
      storage_class = "ARCHIVE"
    }
    condition {
      age            = 730
      matches_prefix = ["archived-records/"]
    }
  }

  retention_policy {
    retention_period = 788400000 # 25 years
  }

  encryption {
    default_kms_key_name = google_kms_crypto_key.archival_encryption_key_eu.id
  }

  labels = {
    environment = var.environment
    compliance  = "fda-21cfr11"
    data_class  = "regulated-records"
    dr_replica  = "true"
  }
}

# KMS encryption key for archival storage
resource "google_kms_crypto_key" "archival_encryption_key" {
  name            = "archival-records-encryption-key"
  key_ring        = google_kms_key_ring.qms_keyring.id
  rotation_period = "7776000s" # 90 days

  lifecycle {
    prevent_destroy = true # Never destroy encryption keys
  }
}

# Access logs bucket
resource "google_storage_bucket" "archival_access_logs" {
  name                        = "${var.project_id}-archival-access-logs"
  location                    = "US"
  storage_class               = "STANDARD"
  uniform_bucket_level_access = true

  lifecycle_rule {
    action {
      type = "Delete"
    }
    condition {
      age = 2555 # 7 years retention for audit logs
    }
  }
}
6.2 Bit Rot Detection & Recovery
Purpose: Detect and recover from storage medium degradation (bit rot).
Implementation:
# File: backend/qms/archival/integrity_checker.py

import hashlib
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path


@dataclass
class IntegrityCheckResult:
    """File integrity check result."""
    record_id: str
    file_path: str
    stored_checksum: str
    computed_checksum: str
    matches: bool
    checked_at: datetime
    file_size_bytes: int
    storage_tier: str


class StorageIntegrityChecker:
    """
    Periodically verifies file integrity using checksums.

    Compliance: FDA 21 CFR Part 11 §11.10(a) - data integrity
    """

    # Check frequency by storage tier (days)
    CHECK_INTERVALS = {
        'hot': 30,
        'warm': 90,
        'cold': 365
    }

    def verify_file_integrity(
        self,
        record_id: str,
        file_path: Path,
        stored_checksum: str,
        storage_tier: str
    ) -> IntegrityCheckResult:
        """
        Verify file integrity by comparing stored and computed checksums.

        Args:
            record_id: Record identifier
            file_path: Path to file
            stored_checksum: Previously stored SHA-256 checksum
            storage_tier: Storage tier (hot, warm, cold)

        Returns:
            IntegrityCheckResult with verification status
        """
        # Compute current checksum and compare against the stored value
        computed_checksum = self._compute_checksum(file_path)
        matches = stored_checksum == computed_checksum
        file_size = file_path.stat().st_size

        result = IntegrityCheckResult(
            record_id=record_id,
            file_path=str(file_path),
            stored_checksum=stored_checksum,
            computed_checksum=computed_checksum,
            matches=matches,
            checked_at=datetime.utcnow(),
            file_size_bytes=file_size,
            storage_tier=storage_tier
        )

        # If a mismatch is detected, initiate recovery
        if not matches:
            self._initiate_recovery(result)

        return result

    def _compute_checksum(self, file_path: Path) -> str:
        """Compute SHA-256 checksum of file."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def _initiate_recovery(self, result: IntegrityCheckResult):
        """
        Initiate recovery process for a corrupted file.

        Recovery steps:
        1. Attempt to restore from geographic replica
        2. If replica also corrupted, restore from backup
        3. Log incident and alert administrators
        """
        # Implementation would:
        # - Check DR replica integrity
        # - Restore from DR replica if valid
        # - Escalate to backup restore if DR also corrupted
        # - Log to incident tracking system
        pass
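The `CHECK_INTERVALS` schedule implies a simple due-date computation per record. A sketch of that scheduling logic, assuming last-check dates are tracked per record (the helper names are illustrative, not part of the checker):

```python
from datetime import date, timedelta

# Mirrors StorageIntegrityChecker.CHECK_INTERVALS (days between checks)
CHECK_INTERVALS = {"hot": 30, "warm": 90, "cold": 365}

def next_check_due(last_checked: date, storage_tier: str) -> date:
    """Date on which the next integrity check becomes due."""
    return last_checked + timedelta(days=CHECK_INTERVALS[storage_tier])

def is_check_overdue(last_checked: date, storage_tier: str, today: date) -> bool:
    """True if the record is due (or past due) for an integrity check."""
    return today >= next_check_due(last_checked, storage_tier)
```

A scheduled job could select all records where `is_check_overdue(...)` holds and feed them to `verify_file_integrity`.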
Database Schema:
-- Table: storage_integrity_checks
CREATE TABLE storage_integrity_checks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    record_id UUID NOT NULL REFERENCES records(id),
    tenant_id UUID NOT NULL REFERENCES tenants(id),

    -- File details
    file_path VARCHAR(500) NOT NULL,
    storage_tier VARCHAR(50) NOT NULL, -- hot, warm, cold
    file_size_bytes BIGINT NOT NULL,

    -- Checksum verification
    stored_checksum VARCHAR(64) NOT NULL,
    computed_checksum VARCHAR(64) NOT NULL,
    matches BOOLEAN NOT NULL,

    -- Check metadata
    checked_at TIMESTAMP NOT NULL DEFAULT NOW(),
    check_type VARCHAR(50) NOT NULL, -- scheduled, on_demand, recovery

    -- Recovery (if needed)
    recovery_initiated BOOLEAN DEFAULT FALSE,
    recovery_successful BOOLEAN,
    recovery_source VARCHAR(100), -- dr_replica, backup, etc.

    -- Audit
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- PostgreSQL does not support inline INDEX clauses; create indexes separately
CREATE INDEX idx_integrity_record ON storage_integrity_checks (record_id);
CREATE INDEX idx_integrity_tenant_date ON storage_integrity_checks (tenant_id, checked_at);
CREATE INDEX idx_integrity_mismatches ON storage_integrity_checks (matches, checked_at)
    WHERE matches = FALSE;
7. Monitoring & Reporting
7.1 Archive Health Dashboard
Metrics:
| Metric | Calculation | Target | Alert Threshold |
|---|---|---|---|
| Format Compliance % | (valid_records / total_records) × 100 | >99% | <95% |
| Rendering Success % | (passed_tests / total_tests) × 100 | >98% | <90% |
| Storage Utilization | used_capacity / total_capacity | <80% | >90% |
| Integrity Check Pass % | (matching_checksums / total_checks) × 100 | 100% | <99.9% |
| Migration Backlog | records_needing_migration | <100 | >500 |
| Access Latency (p95) | 95th percentile retrieval time | Hot <1s, Warm <5s, Cold <12h | 2x target |
Implementation:
# File: backend/qms/archival/monitoring.py

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict


@dataclass
class ArchiveHealthMetrics:
    """Archive health dashboard metrics."""
    timestamp: datetime

    # Format compliance
    total_records: int
    valid_format_records: int
    format_compliance_percentage: float

    # Rendering verification
    total_rendering_tests: int
    passed_rendering_tests: int
    rendering_success_percentage: float

    # Storage utilization
    hot_tier_used_gb: float
    hot_tier_capacity_gb: float
    warm_tier_used_gb: float
    warm_tier_capacity_gb: float
    cold_tier_used_gb: float
    cold_tier_capacity_gb: float
    total_utilization_percentage: float

    # Integrity
    total_integrity_checks: int
    matching_checksums: int
    integrity_pass_percentage: float

    # Migration
    records_needing_migration: int

    # Access performance
    hot_access_latency_p95_ms: float
    warm_access_latency_p95_ms: float
    cold_access_latency_p95_ms: float


class ArchiveMonitoringService:
    """
    Collects and reports archive health metrics.

    Compliance: Continuous monitoring for data integrity
    """

    def collect_health_metrics(self) -> ArchiveHealthMetrics:
        """
        Collect current archive health metrics.

        Returns:
            ArchiveHealthMetrics snapshot
        """
        # Format compliance
        total_records = self._count_total_records()
        valid_format_records = self._count_valid_format_records()
        format_compliance_pct = (
            valid_format_records / total_records * 100
        ) if total_records > 0 else 0

        # Rendering verification
        total_rendering_tests = self._count_rendering_tests_last_30_days()
        passed_rendering_tests = self._count_passed_rendering_tests_last_30_days()
        rendering_success_pct = (
            passed_rendering_tests / total_rendering_tests * 100
        ) if total_rendering_tests > 0 else 0

        # Storage utilization
        storage_stats = self._get_storage_utilization()

        # Integrity checks
        total_integrity_checks = self._count_integrity_checks_last_30_days()
        matching_checksums = self._count_matching_checksums_last_30_days()
        integrity_pass_pct = (
            matching_checksums / total_integrity_checks * 100
        ) if total_integrity_checks > 0 else 0

        # Migration backlog
        records_needing_migration = self._count_records_needing_migration()

        # Access latency
        latency_stats = self._get_access_latency_stats()

        return ArchiveHealthMetrics(
            timestamp=datetime.utcnow(),
            total_records=total_records,
            valid_format_records=valid_format_records,
            format_compliance_percentage=format_compliance_pct,
            total_rendering_tests=total_rendering_tests,
            passed_rendering_tests=passed_rendering_tests,
            rendering_success_percentage=rendering_success_pct,
            hot_tier_used_gb=storage_stats['hot_used'],
            hot_tier_capacity_gb=storage_stats['hot_capacity'],
            warm_tier_used_gb=storage_stats['warm_used'],
            warm_tier_capacity_gb=storage_stats['warm_capacity'],
            cold_tier_used_gb=storage_stats['cold_used'],
            cold_tier_capacity_gb=storage_stats['cold_capacity'],
            total_utilization_percentage=storage_stats['total_utilization'],
            total_integrity_checks=total_integrity_checks,
            matching_checksums=matching_checksums,
            integrity_pass_percentage=integrity_pass_pct,
            records_needing_migration=records_needing_migration,
            hot_access_latency_p95_ms=latency_stats['hot_p95'],
            warm_access_latency_p95_ms=latency_stats['warm_p95'],
            cold_access_latency_p95_ms=latency_stats['cold_p95']
        )

    def _count_total_records(self) -> int:
        """Count total archived records."""
        pass

    def _count_valid_format_records(self) -> int:
        """Count records with valid archival format."""
        pass

    def _count_rendering_tests_last_30_days(self) -> int:
        """Count rendering tests in last 30 days."""
        pass

    def _count_passed_rendering_tests_last_30_days(self) -> int:
        """Count passed rendering tests in last 30 days."""
        pass

    def _get_storage_utilization(self) -> Dict[str, float]:
        """Get storage utilization stats for all tiers."""
        pass

    def _count_integrity_checks_last_30_days(self) -> int:
        """Count integrity checks in last 30 days."""
        pass

    def _count_matching_checksums_last_30_days(self) -> int:
        """Count integrity checks with matching checksums in last 30 days."""
        pass

    def _count_records_needing_migration(self) -> int:
        """Count records flagged for format migration."""
        pass

    def _get_access_latency_stats(self) -> Dict[str, float]:
        """Get access latency statistics by tier (p95)."""
        pass
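The service above collects metrics but does not itself apply the alert thresholds from the table in §7.1. A sketch of that evaluation step (the `ALERT_THRESHOLDS` mapping and `evaluate_alerts` are illustrative, not part of the monitoring service):

```python
# Alert thresholds from §7.1; the direction says which side of the limit alerts.
ALERT_THRESHOLDS = {
    "format_compliance_percentage": ("below", 95.0),
    "rendering_success_percentage": ("below", 90.0),
    "total_utilization_percentage": ("above", 90.0),
    "integrity_pass_percentage": ("below", 99.9),
    "records_needing_migration": ("above", 500),
}

def evaluate_alerts(metrics: dict) -> list:
    """Return the metric names that breach their alert threshold."""
    alerts = []
    for name, (direction, limit) in ALERT_THRESHOLDS.items():
        value = metrics.get(name)
        if value is None:
            continue  # metric not collected in this snapshot; skip
        if (direction == "below" and value < limit) or \
           (direction == "above" and value > limit):
            alerts.append(name)
    return alerts
```

A snapshot with, say, 94% format compliance and a 600-record migration backlog would raise two alerts.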
7.2 Compliance Reporting
Automated Reports:
| Report | Frequency | Recipients | Purpose |
|---|---|---|---|
| Archive Health Summary | Weekly | QA Manager, IT Director | Overall archive status |
| Format Compliance Report | Monthly | Quality Assurance | Format validation metrics |
| Migration Status Report | Quarterly | IT Director, Compliance Officer | Obsolescence tracking |
| Integrity Audit Report | Quarterly | QA Manager, Regulatory Affairs | Checksum verification results |
| Cost Optimization Report | Monthly | CFO, IT Director | Storage costs by tier |
| Access Audit Report | Monthly | Security Officer | Who accessed what records |
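For scheduling, each report's cadence reduces to a next-run computation. A rough sketch using calendar-day approximations (the mapping and function names are illustrative):

```python
from datetime import date, timedelta

# Approximate cadences in days; a production scheduler would use true
# calendar months/quarters rather than fixed deltas.
REPORT_CADENCE_DAYS = {"weekly": 7, "monthly": 30, "quarterly": 91}

def next_report_date(last_run: date, frequency: str) -> date:
    """Approximate date of the next scheduled report run."""
    return last_run + timedelta(days=REPORT_CADENCE_DAYS[frequency])
```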
8. Compliance Mapping
8.1 FDA 21 CFR Part 11
| Requirement | Control Implementation | Evidence |
|---|---|---|
| §11.10(b) - Legibility | PDF/A-2b archival format, rendering verification, print controls | archival_validation_log, rendering_test_results |
| §11.10(c) - Accurate and Complete Copies | Rendering comparison (95% threshold), format validation | rendering_test_results.similarity_score |
| §11.10(e) - Audit Trail | Print audit log, format migration log, integrity check log | print_audit_log, format_migration_log, storage_integrity_checks |
8.2 EU Annex 11
| Requirement | Control Implementation | Evidence |
|---|---|---|
| §7.1 - Data Storage | Multi-tier storage (Hot/Warm/Cold), geographic redundancy, encryption | GCS bucket configuration, KMS encryption |
| §7.2 - Data Protection | SHA-256 checksums, bit rot detection, automated recovery | storage_integrity_checks |
| §13 - Change and Configuration Management | Format migration with validation and chain of custody | format_migration_log |
8.3 WHO TRS 996 Annex 5
| Requirement | Control Implementation | Evidence |
|---|---|---|
| §8.3 - Data Integrity | Checksum verification, rendering tests, format validation | archival_validation_log, storage_integrity_checks |
| §13.2 - Electronic Records Retention | 25-year minimum retention, lifecycle policies, no deletion | GCS retention policy (788400000s) |
9. Validation & Testing
9.1 Installation Qualification (IQ)
Objective: Verify that archival infrastructure is installed according to specifications.
Test Cases:
| Test ID | Test Description | Expected Result | Evidence |
|---|---|---|---|
| IQ-01 | Verify veraPDF installation | veraPDF version ≥ 1.24 | veraPDF --version output |
| IQ-02 | Verify LibTIFF installation | tiffinfo available | tiffinfo -h output |
| IQ-03 | Verify GCS bucket creation | Buckets exist in US and EU regions | gsutil ls output |
| IQ-04 | Verify lifecycle policies applied | Policies match specification | gsutil lifecycle get output |
| IQ-05 | Verify KMS encryption keys | Keys exist and rotation enabled | gcloud kms keys describe output |
| IQ-06 | Verify database schema | Tables created with correct indexes | SQL schema verification |
9.2 Operational Qualification (OQ)
Objective: Verify that archival system operates according to specifications.
Test Cases:
| Test ID | Test Description | Expected Result | Evidence |
|---|---|---|---|
| OQ-01 | Format validation - PDF/A-2b compliant | Validation passes | ValidationResult.is_valid = True |
| OQ-02 | Format validation - PDF/A-2b non-compliant | Validation fails with errors | ValidationResult.errors populated |
| OQ-03 | Rendering verification - identical rendering | Similarity ≥ 95% | similarity_score ≥ 0.95 |
| OQ-04 | Rendering verification - different rendering | Similarity < 95%, queued for review | status = MANUAL_REVIEW |
| OQ-05 | Format migration - TIFF to PDF/A | Migration succeeds, rendering matches | MigrationResult.success = True |
| OQ-06 | Print watermarking - controlled copy | Watermark applied correctly | Visual verification of PDF |
| OQ-07 | Storage tier transition - Hot to Warm | File moved after 90 days | GCS storage class = NEARLINE |
| OQ-08 | Integrity check - matching checksum | Check passes | IntegrityCheckResult.matches = True |
| OQ-09 | Integrity check - corrupted file | Check fails, recovery initiated | recovery_initiated = True |
9.3 Performance Qualification (PQ)
Objective: Verify that archival system performs under production load.
Test Cases:
| Test ID | Test Description | Expected Result | Evidence |
|---|---|---|---|
| PQ-01 | Validation throughput | ≥100 records/hour | Performance test log |
| PQ-02 | Rendering verification throughput | ≥50 comparisons/hour | Performance test log |
| PQ-03 | Hot storage access latency | p95 < 1 second | Latency metrics |
| PQ-04 | Warm storage access latency | p95 < 5 seconds | Latency metrics |
| PQ-05 | Cold storage retrieval time | p95 < 12 hours | Latency metrics |
| PQ-06 | Concurrent access | 100 concurrent requests without degradation | Load test results |
10. Standard Operating Procedures (SOPs)
SOP-001: Annual Format Viability Assessment
Purpose: Assess continued viability of archival formats and identify obsolescence risks.
Frequency: Annually (January)
Procedure:
- Review current archival format usage statistics
- Research industry format adoption trends
- Assess renderer availability for each format
- Evaluate browser/software support status
- Check ISO standard status (active, stable, deprecated)
- Assign viability status (Green/Yellow/Red)
- If Red status, create migration project plan
- Document findings in annual assessment report
- Present to Quality Assurance and IT leadership
Responsibilities:
- Owner: IT Director
- Reviewer: Quality Assurance Manager
- Approver: Head of Regulatory Affairs
SOP-002: Quarterly Rendering Verification
Purpose: Verify archived records remain accurately renderable.
Frequency: Quarterly (Hot tier), Semi-annually (Warm tier), Annually (Cold tier)
Procedure:
- Generate test sample (5% random + 100% critical records)
- Execute automated rendering comparison script
- Review MANUAL_REVIEW queue items
- Document pass/fail results
- Escalate failures to Quality Assurance
- Update rendering test database
- Generate quarterly report
Responsibilities:
- Owner: QA Engineer
- Reviewer: QA Manager
- Approver: Head of Regulatory Affairs
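Step 1's sampling rule (100% of critical records plus a 5% random draw of the remainder) can be sketched as follows; `build_test_sample` is a hypothetical helper, seeded so the draw is reproducible as audit evidence:

```python
import random

def build_test_sample(record_ids, critical_ids, rate=0.05, seed=None):
    """All critical records plus a random `rate` fraction of the rest."""
    rng = random.Random(seed)  # seeded for reproducible audit evidence
    critical = set(critical_ids)
    rest = [r for r in record_ids if r not in critical]
    n = max(1, round(len(rest) * rate)) if rest else 0
    return sorted(critical) + rng.sample(rest, n)
```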
SOP-003: Format Migration Execution
Purpose: Migrate records from obsolete format to current archival format.
Frequency: As needed (triggered by format assessment)
Procedure:
- Create migration project plan
- Identify records requiring migration
- Execute migration pipeline on test sample (10 records)
- Validate test migrations (format + rendering)
- If test successful, proceed to production migration
- Execute production migration in batches (100 records/batch)
- Validate each batch before proceeding
- Preserve original files in quarantine storage
- Log all migrations to audit trail
- Generate migration completion report
Responsibilities:
- Owner: IT Operations Engineer
- Reviewer: QA Engineer
- Approver: Quality Assurance Manager
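Steps 6 and 7 (migrate in batches of 100, validating each batch before proceeding) can be sketched as below; both helper names are illustrative, with the migration and validation steps passed in as callables:

```python
def migration_batches(record_ids, batch_size=100):
    """Yield batches of at most batch_size record IDs."""
    for i in range(0, len(record_ids), batch_size):
        yield record_ids[i:i + batch_size]

def run_migration(record_ids, migrate_batch, validate_batch, batch_size=100):
    """Migrate batch by batch; halt (for escalation) on a failed validation."""
    for batch in migration_batches(record_ids, batch_size):
        migrate_batch(batch)
        if not validate_batch(batch):
            return False  # stop and escalate per SOP
    return True
```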
11. Appendices
Appendix A: veraPDF Integration Script
#!/bin/bash
# File: scripts/validate-pdfa.sh
# Purpose: Validate PDF/A compliance using veraPDF

set -euo pipefail

VERAPDF_PATH="/opt/verapdf/verapdf"
INPUT_FILE="$1"
OUTPUT_REPORT="${2:-validation-report.xml}"

if [ ! -f "$INPUT_FILE" ]; then
    echo "Error: Input file not found: $INPUT_FILE"
    exit 1
fi

# Run veraPDF validation
"$VERAPDF_PATH" \
    --format xml \
    --flavour 2b \
    --verbose \
    "$INPUT_FILE" > "$OUTPUT_REPORT"

# Parse result: veraPDF's XML report flags the outcome in the isCompliant attribute
if grep -q 'isCompliant="true"' "$OUTPUT_REPORT"; then
    echo "✓ PDF/A-2b validation PASSED"
    exit 0
else
    echo "✗ PDF/A-2b validation FAILED"
    echo "See report: $OUTPUT_REPORT"
    exit 1
fi
Appendix B: GCS Lifecycle Policy Deployment
#!/bin/bash
# File: scripts/deploy-gcs-lifecycle.sh
# Purpose: Deploy GCS lifecycle policies to archival buckets

set -euo pipefail

PROJECT_ID="coditect-qms-prod"
BUCKET_NAME="${PROJECT_ID}-archival-records"
POLICY_FILE="gcs-lifecycle-policy.json"

# Create lifecycle policy JSON
cat > "$POLICY_FILE" <<EOF
{
  "lifecycle": {
    "rule": [
      {
        "action": {
          "type": "SetStorageClass",
          "storageClass": "NEARLINE"
        },
        "condition": {
          "age": 90,
          "matchesPrefix": ["archived-records/"],
          "matchesSuffix": [".pdf", ".pdfa.pdf"]
        }
      },
      {
        "action": {
          "type": "SetStorageClass",
          "storageClass": "ARCHIVE"
        },
        "condition": {
          "age": 730,
          "matchesPrefix": ["archived-records/"],
          "matchesSuffix": [".pdf", ".pdfa.pdf"]
        }
      }
    ]
  }
}
EOF

# Apply lifecycle policy
gsutil lifecycle set "$POLICY_FILE" "gs://${BUCKET_NAME}"
echo "✓ Lifecycle policy applied to gs://${BUCKET_NAME}"

# Verify
gsutil lifecycle get "gs://${BUCKET_NAME}"
Appendix C: Monitoring Setup (Prometheus Metrics)
# File: backend/qms/archival/metrics.py

from prometheus_client import Counter, Gauge, Histogram

# Format validation metrics
format_validation_total = Counter(
    'archival_format_validation_total',
    'Total format validations performed',
    ['tenant_id', 'format_type', 'result']
)

format_compliance_percentage = Gauge(
    'archival_format_compliance_percentage',
    'Percentage of records with valid format',
    ['tenant_id']
)

# Rendering verification metrics
rendering_test_total = Counter(
    'archival_rendering_test_total',
    'Total rendering tests performed',
    ['tenant_id', 'status']
)

rendering_similarity_score = Histogram(
    'archival_rendering_similarity_score',
    'Rendering similarity scores',
    ['tenant_id'],
    buckets=[0.5, 0.7, 0.8, 0.9, 0.95, 0.98, 0.99, 1.0]
)

# Storage integrity metrics
integrity_check_total = Counter(
    'archival_integrity_check_total',
    'Total integrity checks performed',
    ['tenant_id', 'storage_tier', 'result']
)

# Migration metrics
migration_total = Counter(
    'archival_migration_total',
    'Total format migrations performed',
    ['tenant_id', 'source_format', 'target_format', 'result']
)

# Storage utilization metrics
storage_utilization_bytes = Gauge(
    'archival_storage_utilization_bytes',
    'Storage utilization in bytes',
    ['tenant_id', 'storage_tier']
)

# Access latency metrics
access_latency_seconds = Histogram(
    'archival_access_latency_seconds',
    'Record access latency in seconds',
    ['tenant_id', 'storage_tier'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)
Document Control
Version History:
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0.0 | 2026-02-16 | CODITECT Compliance Framework Specialist | Initial release |
Approval:
| Role | Name | Signature | Date |
|---|---|---|---|
| Author | CODITECT Compliance Framework Specialist | Pending | 2026-02-16 |
| Reviewer | Quality Assurance Manager | Pending | Pending |
| Approver | Head of Regulatory Affairs | Pending | Pending |
Next Review Date: 2027-02-16 (annual review)
End of Document