Legibility Controls for Long-Term Storage

Executive Summary

This document defines the comprehensive legibility controls for the CODITECT BIO-QMS platform, ensuring that regulated electronic records remain readable, accessible, and verifiable throughout their retention period (25+ years). The controls address format validation, rendering verification, format migration, accessibility, print management, storage architecture, and continuous monitoring in compliance with FDA 21 CFR Part 11, EU Annex 11, and WHO TRS 996 Annex 5.

Key Requirements:

  • Legibility: Records must remain readable for the duration of their retention period
  • Accuracy: Records must maintain their original content and meaning
  • Availability: Records must be retrievable within defined timeframes
  • Integrity: Records must be protected from corruption, degradation, and unauthorized modification

Architecture Overview:

  • Primary Archive Format: PDF/A-2b (ISO 19005-2)
  • Storage Tiers: Hot (SSD) → Warm (HDD) → Cold (GCS Archive)
  • Validation: Automated format compliance + periodic rendering verification
  • Migration: Technology obsolescence monitoring + controlled migration pipeline
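
The tier progression above can be sketched as a simple age-based routing rule. This is an illustrative sketch, not the platform's actual tiering code; the function name is hypothetical and the 90-day / 2-year cut-overs are taken from the tier definitions used for rendering tests in section 2.1.

```python
from datetime import date

# Assumed cut-overs, mirroring the tier definitions in section 2.1
# (hot: 0-90 days, warm: 90 days - 2 years, cold: 2+ years).
HOT_TO_WARM_DAYS = 90
WARM_TO_COLD_DAYS = 730

def storage_tier(archived_on: date, today: date) -> str:
    """Map a record's age to its storage tier: hot (SSD), warm (HDD), cold (GCS Archive)."""
    age_days = (today - archived_on).days
    if age_days < HOT_TO_WARM_DAYS:
        return "hot"
    if age_days < WARM_TO_COLD_DAYS:
        return "warm"
    return "cold"
```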

1. Format Validation

1.1 Supported Archival Formats

All regulated records must be archived in one of the following validated formats:

| Format   | Standard               | Use Case                                                | Retention Profile |
|----------|------------------------|---------------------------------------------------------|-------------------|
| PDF/A-2b | ISO 19005-2:2011       | Primary archive format for all document types           | Hot/Warm/Cold     |
| PDF/A-3b | ISO 19005-3:2012       | Documents with embedded source files (e.g., data files) | Hot/Warm/Cold     |
| TIFF 6.0 | TIFF Specification 6.0 | Legacy scanned documents, migration source              | Warm/Cold         |

Rationale:

  • PDF/A is the industry-standard long-term archival format
  • PDF/A-2b provides ISO standardization with wide renderer support
  • PDF/A-3b allows embedding of source data files while maintaining archival compliance
  • TIFF 6.0 is accepted for legacy scanned documents but will be migrated to PDF/A
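
At archival time, incoming files have to be routed to the appropriate validator. A minimal sketch of that routing, based on the supported-format table above (the mapping and function name are illustrative, not part of the platform API):

```python
from pathlib import Path

# Illustrative extension-to-format mapping for the approved archival formats.
ARCHIVAL_FORMATS = {
    ".pdf": "PDF/A",     # validated with veraPDF (section 1.2)
    ".tif": "TIFF 6.0",  # validated with LibTIFF
    ".tiff": "TIFF 6.0",
}

def archival_format(path: Path) -> str:
    """Return the archival format for a file, or raise if it is not approved."""
    fmt = ARCHIVAL_FORMATS.get(path.suffix.lower())
    if fmt is None:
        raise ValueError(f"Not an approved archival format: {path.suffix}")
    return fmt
```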

1.2 Automatic Format Integrity Check

Trigger: Every record archival event (document finalization, batch upload, migration)

Validation Pipeline:

Implementation:

# File: backend/qms/archival/format_validator.py

import subprocess
import hashlib
from pathlib import Path
from typing import Tuple, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
import xml.etree.ElementTree as ET

@dataclass
class ValidationResult:
    """Format validation result."""
    is_valid: bool
    format_type: str
    pdf_a_compliance: Optional[str]  # e.g., "PDF/A-2b"
    errors: list[str]
    warnings: list[str]
    metadata_complete: bool
    fonts_embedded: bool
    color_profiles_embedded: bool
    checksum_sha256: str
    validated_at: datetime
    validator_version: str


class ArchivalFormatValidator:
    """
    Validates archival format compliance for long-term storage.

    Compliance: FDA 21 CFR Part 11 §11.10(b) - legibility
    """

    VERAPDF_PATH = "/opt/verapdf/verapdf"
    LIBTIFF_TIFFINFO_PATH = "/usr/bin/tiffinfo"

    REQUIRED_METADATA_FIELDS = [
        "dc:title",
        "dc:creator",
        "dc:subject",
        "dc:description",
        "xmp:CreateDate",
        "xmp:ModifyDate",
        "pdf:Producer",
    ]

    def validate_pdf_a(self, file_path: Path) -> ValidationResult:
        """
        Validate PDF/A compliance using veraPDF.

        Args:
            file_path: Path to PDF file

        Returns:
            ValidationResult with compliance status
        """
        errors = []
        warnings = []

        # Run veraPDF validation
        try:
            result = subprocess.run(
                [
                    self.VERAPDF_PATH,
                    "--format", "xml",
                    "--flavour", "2b",  # PDF/A-2b
                    str(file_path)
                ],
                capture_output=True,
                text=True,
                timeout=60
            )

            # Parse XML output
            root = ET.fromstring(result.stdout)

            # Check compliance
            compliant_elem = root.find(".//compliant")
            is_compliant = compliant_elem.text.lower() == "true" if compliant_elem is not None else False

            # Extract errors
            for error in root.findall(".//error"):
                errors.append(error.find("message").text)

            # Extract warnings
            for warning in root.findall(".//warning"):
                warnings.append(warning.find("message").text)

            # Determine PDF/A flavor
            flavour = root.find(".//flavour")
            pdf_a_compliance = flavour.text if flavour is not None else "Unknown"

        except subprocess.TimeoutExpired:
            errors.append("veraPDF validation timeout (>60s)")
            is_compliant = False
            pdf_a_compliance = None
        except Exception as e:
            errors.append(f"veraPDF validation failed: {str(e)}")
            is_compliant = False
            pdf_a_compliance = None

        # Metadata verification
        metadata_complete = self._verify_metadata(file_path)
        if not metadata_complete:
            warnings.append("Incomplete XMP metadata")

        # Font embedding verification
        fonts_embedded = self._verify_fonts_embedded(file_path)
        if not fonts_embedded:
            errors.append("Not all fonts are embedded")

        # Color profile verification
        color_profiles_embedded = self._verify_color_profiles(file_path)
        if not color_profiles_embedded:
            errors.append("ICC color profiles not embedded")

        # Generate checksum
        checksum = self._generate_checksum(file_path)

        return ValidationResult(
            is_valid=is_compliant and fonts_embedded and color_profiles_embedded,
            format_type="PDF/A",
            pdf_a_compliance=pdf_a_compliance,
            errors=errors,
            warnings=warnings,
            metadata_complete=metadata_complete,
            fonts_embedded=fonts_embedded,
            color_profiles_embedded=color_profiles_embedded,
            checksum_sha256=checksum,
            validated_at=datetime.utcnow(),
            validator_version=self._get_verapdf_version()
        )

    def validate_tiff(self, file_path: Path) -> ValidationResult:
        """
        Validate TIFF 6.0 compliance using LibTIFF.

        Args:
            file_path: Path to TIFF file

        Returns:
            ValidationResult with compliance status
        """
        errors = []
        warnings = []

        try:
            result = subprocess.run(
                [self.LIBTIFF_TIFFINFO_PATH, str(file_path)],
                capture_output=True,
                text=True,
                timeout=30
            )

            # Parse tiffinfo output
            output = result.stdout

            # Check for required TIFF tags
            if "Image Width" not in output:
                errors.append("Missing Image Width tag")
            if "Image Length" not in output:
                errors.append("Missing Image Length tag")
            if "Bits/Sample" not in output:
                errors.append("Missing Bits/Sample tag")

            # Check compression
            if "Compression Scheme" in output:
                if "LZW" in output or "ZIP" in output or "None" in output:
                    pass  # Acceptable compression
                else:
                    warnings.append("Non-standard compression detected")

            is_valid = len(errors) == 0

        except subprocess.TimeoutExpired:
            errors.append("TIFF validation timeout (>30s)")
            is_valid = False
        except Exception as e:
            errors.append(f"TIFF validation failed: {str(e)}")
            is_valid = False

        # Generate checksum
        checksum = self._generate_checksum(file_path)

        return ValidationResult(
            is_valid=is_valid,
            format_type="TIFF",
            pdf_a_compliance=None,
            errors=errors,
            warnings=warnings,
            metadata_complete=False,        # TIFF has limited metadata
            fonts_embedded=True,            # N/A for raster
            color_profiles_embedded=False,  # Not required for TIFF 6.0
            checksum_sha256=checksum,
            validated_at=datetime.utcnow(),
            validator_version=self._get_libtiff_version()
        )

    def _verify_metadata(self, file_path: Path) -> bool:
        """Verify XMP metadata completeness using exiftool."""
        try:
            result = subprocess.run(
                ["exiftool", "-xmp:all", "-json", str(file_path)],
                capture_output=True,
                text=True,
                timeout=10
            )

            import json
            metadata = json.loads(result.stdout)[0]

            # Check for required fields
            for field in self.REQUIRED_METADATA_FIELDS:
                if field not in metadata or not metadata[field]:
                    return False

            return True
        except Exception:
            return False

    def _verify_fonts_embedded(self, file_path: Path) -> bool:
        """Verify all fonts are embedded using pdffonts."""
        try:
            result = subprocess.run(
                ["pdffonts", str(file_path)],
                capture_output=True,
                text=True,
                timeout=10
            )

            # Parse pdffonts output (skip the two header lines)
            lines = result.stdout.strip().split("\n")[2:]

            for line in lines:
                columns = line.split()
                if len(columns) >= 5:
                    # The "emb" column is 5th from the right (emb, sub, uni,
                    # then the two-token object ID). Counting from the right
                    # avoids miscounting when the type field contains a space
                    # (e.g. "Type 1").
                    if columns[-5].lower() != "yes":
                        return False

            return True
        except Exception:
            return False

    def _verify_color_profiles(self, file_path: Path) -> bool:
        """Verify ICC color profiles are embedded."""
        try:
            result = subprocess.run(
                ["exiftool", "-icc_profile:all", str(file_path)],
                capture_output=True,
                text=True,
                timeout=10
            )

            # Check if ICC profile info is present
            return "ICC" in result.stdout
        except Exception:
            return False

    def _generate_checksum(self, file_path: Path) -> str:
        """Generate SHA-256 checksum for file corruption detection."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def _get_verapdf_version(self) -> str:
        """Get veraPDF version for audit trail."""
        try:
            result = subprocess.run(
                [self.VERAPDF_PATH, "--version"],
                capture_output=True,
                text=True,
                timeout=5
            )
            return result.stdout.strip()
        except Exception:
            return "Unknown"

    def _get_libtiff_version(self) -> str:
        """Get LibTIFF version for audit trail."""
        try:
            result = subprocess.run(
                [self.LIBTIFF_TIFFINFO_PATH, "-v"],
                capture_output=True,
                text=True,
                timeout=5
            )
            # Extract version from output
            for line in result.stdout.split("\n"):
                if "LIBTIFF" in line.upper():
                    return line.strip()
            return "Unknown"
        except Exception:
            return "Unknown"

Database Schema:

-- Table: archival_validation_log
CREATE TABLE archival_validation_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    record_id UUID NOT NULL REFERENCES records(id),
    tenant_id UUID NOT NULL REFERENCES tenants(id),

    -- Validation metadata
    validated_at TIMESTAMP NOT NULL DEFAULT NOW(),
    validator_version VARCHAR(255) NOT NULL,

    -- Validation result
    is_valid BOOLEAN NOT NULL,
    format_type VARCHAR(50) NOT NULL,  -- PDF/A, TIFF
    pdf_a_compliance VARCHAR(50),      -- PDF/A-2b, PDF/A-3b, etc.

    -- Detailed checks
    metadata_complete BOOLEAN NOT NULL,
    fonts_embedded BOOLEAN NOT NULL,
    color_profiles_embedded BOOLEAN NOT NULL,

    -- Issues
    errors JSONB,    -- Array of error messages
    warnings JSONB,  -- Array of warning messages

    -- File integrity
    checksum_sha256 VARCHAR(64) NOT NULL,
    file_size_bytes BIGINT NOT NULL,

    -- Audit
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- PostgreSQL does not support inline INDEX clauses in CREATE TABLE;
-- indexes are created separately.
CREATE INDEX idx_validation_record ON archival_validation_log (record_id);
CREATE INDEX idx_validation_tenant_date ON archival_validation_log (tenant_id, validated_at);
CREATE INDEX idx_validation_invalid ON archival_validation_log (is_valid, validated_at) WHERE is_valid = FALSE;

2. Rendering Verification

2.1 Periodic Rendering Tests

Purpose: Verify that archived records can still be rendered accurately after storage.

Test Frequency:

  • Hot tier (0-90 days): Quarterly (every 90 days)
  • Warm tier (90 days - 2 years): Semi-annually (every 180 days)
  • Cold tier (2+ years): Annually (every 365 days)

Test Coverage:

  • Sample-based: 5% random sample of all records per test cycle
  • Critical records: 100% of GxP-critical records (batch records, validation protocols, audit reports)
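
The frequency and coverage rules above can be sketched as a small scheduling helper. The function names and the seeded sampling interface are illustrative assumptions, not the scheduler's actual API:

```python
import random
from datetime import datetime, timedelta

# Intervals mirror the per-tier frequencies listed above.
TEST_INTERVAL_DAYS = {"hot": 90, "warm": 180, "cold": 365}
SAMPLE_RATE = 0.05  # 5% random sample per test cycle

def next_test_date(tier: str, last_test: datetime) -> datetime:
    """Next rendering-test date for a record, based on its storage tier."""
    return last_test + timedelta(days=TEST_INTERVAL_DAYS[tier])

def select_test_set(record_ids, critical_ids, rng=random):
    """GxP-critical records are always tested; the remainder is sampled at 5%."""
    critical = set(critical_ids)
    sampled = [r for r in record_ids if r not in critical and rng.random() < SAMPLE_RATE]
    return sorted(critical) + sampled
```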

2.2 Automated Rendering Comparison

Pipeline:

Implementation:

# File: backend/qms/archival/rendering_verifier.py

import io
from pathlib import Path
from typing import Tuple, Optional
from dataclasses import dataclass
from datetime import datetime
from PIL import Image
import fitz # PyMuPDF
import numpy as np
from skimage.metrics import structural_similarity as ssim


@dataclass
class RenderingTestResult:
    """Rendering verification test result."""
    record_id: str
    test_date: datetime
    status: str                   # PASS, FAIL, MANUAL_REVIEW
    similarity_score: float       # 0.0 - 1.0
    pixel_match_percentage: float # 0.0 - 100.0
    reference_checksum: str
    current_checksum: str
    differences_detected: list[str]
    reviewer_notes: Optional[str] = None
    reviewed_by: Optional[str] = None
    reviewed_at: Optional[datetime] = None


class RenderingVerifier:
    """
    Verifies rendering accuracy of archived records.

    Compliance: FDA 21 CFR Part 11 §11.10(b) - accurate reproduction
    """

    SIMILARITY_THRESHOLD = 0.95  # 95% match required
    DPI = 300  # Render resolution

    def __init__(self, storage_client, database):
        self.storage = storage_client
        self.db = database

    def verify_rendering(
        self,
        record_id: str,
        archived_file_path: Path
    ) -> RenderingTestResult:
        """
        Verify rendering accuracy against reference.

        Args:
            record_id: Unique record identifier
            archived_file_path: Path to archived file

        Returns:
            RenderingTestResult with comparison metrics
        """
        # Retrieve or create reference rendering
        reference_img = self._get_reference_rendering(record_id)

        if reference_img is None:
            # First-time rendering - create reference
            reference_img = self._render_to_image(archived_file_path)
            self._store_reference_rendering(record_id, reference_img)

            return RenderingTestResult(
                record_id=record_id,
                test_date=datetime.utcnow(),
                status="PASS",
                similarity_score=1.0,
                pixel_match_percentage=100.0,
                reference_checksum=self._image_checksum(reference_img),
                current_checksum=self._image_checksum(reference_img),
                differences_detected=[]
            )

        # Render current version
        current_img = self._render_to_image(archived_file_path)

        # Compare images
        similarity, pixel_match, differences = self._compare_images(
            reference_img,
            current_img
        )

        # Sub-threshold renders are queued for human review rather than
        # auto-failed; FAIL is assigned only after reviewer confirmation.
        status = "PASS" if similarity >= self.SIMILARITY_THRESHOLD else "MANUAL_REVIEW"

        return RenderingTestResult(
            record_id=record_id,
            test_date=datetime.utcnow(),
            status=status,
            similarity_score=similarity,
            pixel_match_percentage=pixel_match * 100,
            reference_checksum=self._image_checksum(reference_img),
            current_checksum=self._image_checksum(current_img),
            differences_detected=differences
        )

    def _render_to_image(self, file_path: Path) -> Image.Image:
        """
        Render PDF/A or TIFF to PNG image at specified DPI.

        Args:
            file_path: Path to archival file

        Returns:
            PIL Image object
        """
        if file_path.suffix.lower() == '.pdf':
            # Render PDF using PyMuPDF
            doc = fitz.open(file_path)
            page = doc[0]  # First page

            # Calculate zoom for target DPI
            zoom = self.DPI / 72  # PDF default is 72 DPI
            mat = fitz.Matrix(zoom, zoom)

            pix = page.get_pixmap(matrix=mat)
            img_data = pix.tobytes("png")
            img = Image.open(io.BytesIO(img_data))

            doc.close()

        elif file_path.suffix.lower() in ['.tif', '.tiff']:
            # Open TIFF directly
            img = Image.open(file_path)

        else:
            raise ValueError(f"Unsupported format: {file_path.suffix}")

        return img.convert('RGB')  # Normalize to RGB

    def _compare_images(
        self,
        reference: Image.Image,
        current: Image.Image
    ) -> Tuple[float, float, list[str]]:
        """
        Compare two images using SSIM and pixel-level matching.

        Args:
            reference: Reference image
            current: Current rendering

        Returns:
            (similarity_score, pixel_match_ratio, differences_list)
        """
        differences = []

        # Convert to numpy arrays
        ref_array = np.array(reference)
        cur_array = np.array(current)

        # Check dimensions match
        if ref_array.shape != cur_array.shape:
            differences.append(
                f"Dimension mismatch: {ref_array.shape} vs {cur_array.shape}"
            )
            # Resize current to match reference for comparison
            current = current.resize(reference.size, Image.Resampling.LANCZOS)
            cur_array = np.array(current)

        # Convert to grayscale for SSIM
        ref_gray = np.mean(ref_array, axis=2) if len(ref_array.shape) == 3 else ref_array
        cur_gray = np.mean(cur_array, axis=2) if len(cur_array.shape) == 3 else cur_array

        # Calculate SSIM (Structural Similarity Index)
        similarity_score = ssim(ref_gray, cur_gray, data_range=255)

        # Pixel-level match: fraction of channel values within tolerance
        # (10 of 255 levels). The mean of the boolean mask already yields a
        # ratio in [0, 1], so no further scaling is needed.
        pixel_diff = np.abs(ref_array.astype(float) - cur_array.astype(float))
        pixel_match_ratio = np.mean(pixel_diff < 10)

        # Identify specific differences
        if similarity_score < self.SIMILARITY_THRESHOLD:
            # Find regions with significant differences
            diff_threshold = 50
            significant_diff = np.any(pixel_diff > diff_threshold, axis=2) if len(pixel_diff.shape) == 3 else pixel_diff > diff_threshold

            diff_count = np.sum(significant_diff)
            diff_percentage = (diff_count / significant_diff.size) * 100

            differences.append(
                f"Significant pixel differences: {diff_percentage:.2f}% of image"
            )

        return similarity_score, pixel_match_ratio, differences

    def _get_reference_rendering(self, record_id: str) -> Optional[Image.Image]:
        """Retrieve stored reference rendering from storage."""
        reference_key = f"rendering-references/{record_id}.png"

        try:
            img_bytes = self.storage.download_blob(reference_key)
            return Image.open(io.BytesIO(img_bytes))
        except Exception:
            return None

    def _store_reference_rendering(self, record_id: str, image: Image.Image):
        """Store reference rendering to storage."""
        reference_key = f"rendering-references/{record_id}.png"

        img_buffer = io.BytesIO()
        image.save(img_buffer, format='PNG')
        img_bytes = img_buffer.getvalue()

        self.storage.upload_blob(reference_key, img_bytes)

    def _image_checksum(self, image: Image.Image) -> str:
        """Calculate SHA-256 checksum of image data."""
        import hashlib

        img_buffer = io.BytesIO()
        image.save(img_buffer, format='PNG')
        img_bytes = img_buffer.getvalue()

        return hashlib.sha256(img_bytes).hexdigest()

Database Schema:

-- Table: rendering_test_results
CREATE TABLE rendering_test_results (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    record_id UUID NOT NULL REFERENCES records(id),
    tenant_id UUID NOT NULL REFERENCES tenants(id),

    -- Test metadata
    test_date TIMESTAMP NOT NULL DEFAULT NOW(),
    test_type VARCHAR(50) NOT NULL,  -- quarterly, annual, critical

    -- Test result
    status VARCHAR(50) NOT NULL,                   -- PASS, FAIL, MANUAL_REVIEW
    similarity_score NUMERIC(5,4) NOT NULL,        -- 0.0000 - 1.0000
    pixel_match_percentage NUMERIC(5,2) NOT NULL,  -- 0.00 - 100.00

    -- Checksums
    reference_checksum VARCHAR(64) NOT NULL,
    current_checksum VARCHAR(64) NOT NULL,

    -- Differences
    differences_detected JSONB,  -- Array of difference descriptions

    -- Human review (if needed)
    reviewer_notes TEXT,
    reviewed_by UUID REFERENCES users(id),
    reviewed_at TIMESTAMP,

    -- Audit
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- PostgreSQL does not support inline INDEX clauses in CREATE TABLE;
-- indexes are created separately.
CREATE INDEX idx_rendering_record ON rendering_test_results (record_id);
CREATE INDEX idx_rendering_status ON rendering_test_results (status, test_date);
CREATE INDEX idx_rendering_tenant_date ON rendering_test_results (tenant_id, test_date);

3. Format Migration

3.1 Technology Obsolescence Monitoring

Annual Format Viability Assessment:

| Assessment Criteria   | Green (No Action)   | Yellow (Monitor)      | Red (Migration Required) |
|-----------------------|---------------------|-----------------------|--------------------------|
| Renderer Availability | 5+ active renderers | 2-4 active renderers  | <2 active renderers      |
| Browser Support       | All major browsers  | 2-3 browsers          | <2 browsers              |
| Standard Status       | Active ISO standard | Stable ISO standard   | Deprecated standard      |
| Vendor Support        | Active development  | Maintenance mode      | End-of-life announced    |
| Prevalence            | >50% industry usage | 10-50% industry usage | <10% industry usage      |

Current Assessment (2026):

| Format   | Renderer Availability      | Browser Support    | Standard Status      | Vendor Support | Prevalence | Overall Status |
|----------|----------------------------|--------------------|----------------------|----------------|------------|----------------|
| PDF/A-2b | 10+ (Adobe, PyMuPDF, etc.) | All major browsers | Active (ISO 19005-2) | Active         | >80%       | Green          |
| PDF/A-3b | 8+                         | All major browsers | Active (ISO 19005-3) | Active         | 40%        | Green          |
| TIFF 6.0 | 5+                         | Limited (plugins)  | Stable (no updates)  | Maintenance    | 20%        | Yellow         |

Migration Schedule:

  • TIFF 6.0 → PDF/A-2b: Migrate legacy TIFF archives within 2 years (by 2028)
  • PDF (non-A) → PDF/A-2b: Immediate migration on detection
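
A worst-rating-wins rule is one plausible way to combine the per-criterion ratings into an overall status; the document does not specify the aggregation explicitly, so the rule and the function below are assumptions for illustration:

```python
# Assumed aggregation: the overall status is the worst rating among the
# five assessment criteria (Green < Yellow < Red).
SEVERITY = {"Green": 0, "Yellow": 1, "Red": 2}

def overall_status(ratings: dict) -> str:
    """Combine per-criterion ratings (Green/Yellow/Red) into an overall status."""
    return max(ratings.values(), key=SEVERITY.__getitem__)
```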

3.2 Migration Pipeline

Architecture:

Implementation:

# File: backend/qms/archival/format_migrator.py

from pathlib import Path
from typing import Optional
from dataclasses import dataclass
from datetime import datetime
import subprocess

# Import paths assume the package layout shown in the file headers above.
from qms.archival.format_validator import ArchivalFormatValidator
from qms.archival.rendering_verifier import RenderingVerifier


@dataclass
class MigrationResult:
    """Format migration result."""
    success: bool
    source_format: str
    target_format: str
    source_checksum: str
    target_checksum: str
    rendering_match: bool
    validation_passed: bool
    errors: list[str]
    migrated_at: datetime
    migration_tool: str
    migration_tool_version: str


class FormatMigrator:
    """
    Migrates archived records to current archival formats.

    Compliance: FDA 21 CFR Part 11 §11.10(b) - migration with validation
    """

    def migrate_tiff_to_pdfa(self, source_path: Path) -> MigrationResult:
        """
        Migrate TIFF to PDF/A-2b using ImageMagick + Ghostscript.

        Args:
            source_path: Path to source TIFF file

        Returns:
            MigrationResult with migration status
        """
        errors = []
        target_path = source_path.with_suffix('.pdf')

        try:
            # Convert TIFF to PDF using ImageMagick
            # -density 300: maintain 300 DPI
            # -compress zip: lossless compression
            # -quality 100: maximum quality
            result = subprocess.run(
                [
                    "convert",
                    str(source_path),
                    "-density", "300",
                    "-compress", "zip",
                    "-quality", "100",
                    str(target_path)
                ],
                capture_output=True,
                text=True,
                timeout=120
            )

            if result.returncode != 0:
                errors.append(f"ImageMagick conversion failed: {result.stderr}")
                return self._failed_migration(errors)

            # Convert PDF to PDF/A-2b using Ghostscript.
            # Note: a fully conformant PDF/A conversion typically also requires
            # a PDFA_def.ps definition file with an embedded ICC output intent;
            # the flags below are the minimal invocation.
            pdfa_path = source_path.with_suffix('.pdfa.pdf')

            result = subprocess.run(
                [
                    "gs",
                    "-dPDFA=2",
                    "-dBATCH",
                    "-dNOPAUSE",
                    "-dUseCIEColor",
                    "-sProcessColorModel=DeviceRGB",
                    "-sDEVICE=pdfwrite",
                    f"-sOutputFile={pdfa_path}",
                    "-dPDFACompatibilityPolicy=1",
                    str(target_path)
                ],
                capture_output=True,
                text=True,
                timeout=120
            )

            if result.returncode != 0:
                errors.append(f"Ghostscript PDF/A conversion failed: {result.stderr}")
                return self._failed_migration(errors)

            # Validate resulting PDF/A
            validator = ArchivalFormatValidator()
            validation = validator.validate_pdf_a(pdfa_path)

            if not validation.is_valid:
                errors.extend(validation.errors)
                return self._failed_migration(errors)

            # Rendering comparison
            verifier = RenderingVerifier(None, None)
            ref_img = verifier._render_to_image(source_path)
            new_img = verifier._render_to_image(pdfa_path)

            similarity, _, _ = verifier._compare_images(ref_img, new_img)
            rendering_match = similarity >= 0.95

            if not rendering_match:
                errors.append(f"Rendering mismatch: {similarity:.4f} < 0.95")

            return MigrationResult(
                success=rendering_match,
                source_format="TIFF 6.0",
                target_format="PDF/A-2b",
                source_checksum=self._checksum(source_path),
                target_checksum=validation.checksum_sha256,
                rendering_match=rendering_match,
                validation_passed=validation.is_valid,
                errors=errors,
                migrated_at=datetime.utcnow(),
                migration_tool="ImageMagick + Ghostscript",
                migration_tool_version=self._get_tool_versions()
            )

        except subprocess.TimeoutExpired:
            errors.append("Migration timeout (>120s)")
            return self._failed_migration(errors)
        except Exception as e:
            errors.append(f"Migration exception: {str(e)}")
            return self._failed_migration(errors)

    def migrate_pdf_to_pdfa(self, source_path: Path) -> MigrationResult:
        """
        Migrate non-PDF/A PDF to PDF/A-2b using Ghostscript.

        Args:
            source_path: Path to source PDF file

        Returns:
            MigrationResult with migration status
        """
        errors = []
        target_path = source_path.with_stem(f"{source_path.stem}_pdfa")

        try:
            # Convert to PDF/A-2b using Ghostscript
            result = subprocess.run(
                [
                    "gs",
                    "-dPDFA=2",
                    "-dBATCH",
                    "-dNOPAUSE",
                    "-dUseCIEColor",
                    "-sProcessColorModel=DeviceRGB",
                    "-sDEVICE=pdfwrite",
                    f"-sOutputFile={target_path}",
                    "-dPDFACompatibilityPolicy=1",
                    str(source_path)
                ],
                capture_output=True,
                text=True,
                timeout=120
            )

            if result.returncode != 0:
                errors.append(f"Ghostscript PDF/A conversion failed: {result.stderr}")
                return self._failed_migration(errors)

            # Validate resulting PDF/A
            validator = ArchivalFormatValidator()
            validation = validator.validate_pdf_a(target_path)

            if not validation.is_valid:
                errors.extend(validation.errors)
                return self._failed_migration(errors)

            # Rendering comparison
            verifier = RenderingVerifier(None, None)
            ref_img = verifier._render_to_image(source_path)
            new_img = verifier._render_to_image(target_path)

            similarity, _, _ = verifier._compare_images(ref_img, new_img)
            rendering_match = similarity >= 0.95

            if not rendering_match:
                errors.append(f"Rendering mismatch: {similarity:.4f} < 0.95")

            return MigrationResult(
                success=rendering_match,
                source_format="PDF",
                target_format="PDF/A-2b",
                source_checksum=self._checksum(source_path),
                target_checksum=validation.checksum_sha256,
                rendering_match=rendering_match,
                validation_passed=validation.is_valid,
                errors=errors,
                migrated_at=datetime.utcnow(),
                migration_tool="Ghostscript",
                migration_tool_version=self._get_tool_versions()
            )

        except subprocess.TimeoutExpired:
            errors.append("Migration timeout (>120s)")
            return self._failed_migration(errors)
        except Exception as e:
            errors.append(f"Migration exception: {str(e)}")
            return self._failed_migration(errors)

    def _failed_migration(self, errors: list[str]) -> MigrationResult:
        """Create a failed migration result."""
        return MigrationResult(
            success=False,
            source_format="Unknown",
            target_format="Unknown",
            source_checksum="",
            target_checksum="",
            rendering_match=False,
            validation_passed=False,
            errors=errors,
            migrated_at=datetime.utcnow(),
            migration_tool="Unknown",
            migration_tool_version="Unknown"
        )

    def _checksum(self, file_path: Path) -> str:
        """Calculate SHA-256 checksum."""
        import hashlib
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def _get_tool_versions(self) -> str:
        """Get migration tool versions for audit trail."""
        versions = []

        # ImageMagick version
        try:
            result = subprocess.run(
                ["convert", "-version"],
                capture_output=True,
                text=True,
                timeout=5
            )
            for line in result.stdout.split("\n"):
                if "ImageMagick" in line:
                    versions.append(line.strip())
                    break
        except Exception:
            pass

        # Ghostscript version
        try:
            result = subprocess.run(
                ["gs", "--version"],
                capture_output=True,
                text=True,
                timeout=5
            )
            versions.append(f"Ghostscript {result.stdout.strip()}")
        except Exception:
            pass

        return " | ".join(versions) if versions else "Unknown"

Database Schema:

-- Table: format_migration_log
CREATE TABLE format_migration_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    record_id UUID NOT NULL REFERENCES records(id),
    tenant_id UUID NOT NULL REFERENCES tenants(id),

    -- Migration metadata
    migrated_at TIMESTAMP NOT NULL DEFAULT NOW(),
    migration_reason VARCHAR(255) NOT NULL,  -- obsolescence, quality_improvement, etc.

    -- Source and target
    source_format VARCHAR(50) NOT NULL,
    target_format VARCHAR(50) NOT NULL,
    source_checksum VARCHAR(64) NOT NULL,
    target_checksum VARCHAR(64) NOT NULL,

    -- Validation
    validation_passed BOOLEAN NOT NULL,
    rendering_match BOOLEAN NOT NULL,

    -- Migration result
    success BOOLEAN NOT NULL,
    errors JSONB,  -- Array of error messages

    -- Tools
    migration_tool VARCHAR(255) NOT NULL,
    migration_tool_version VARCHAR(255) NOT NULL,

    -- Original preservation
    original_file_preserved BOOLEAN NOT NULL DEFAULT TRUE,
    original_file_location VARCHAR(500),

    -- Audit
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- PostgreSQL does not support inline INDEX clauses in CREATE TABLE;
-- indexes are created separately.
CREATE INDEX idx_migration_record ON format_migration_log (record_id);
CREATE INDEX idx_migration_tenant_date ON format_migration_log (tenant_id, migrated_at);
CREATE INDEX idx_migration_success ON format_migration_log (success, migrated_at);

4. Accessibility

4.1 WCAG 2.1 AA Compliance

All archived electronic records must meet WCAG 2.1 AA accessibility standards to ensure readability by users with disabilities.

Requirements:

| WCAG Criterion               | Implementation                           | Verification                      |
|------------------------------|------------------------------------------|-----------------------------------|
| 1.1.1 Non-text Content       | Alt text for all images, charts, graphs  | Automated check + manual review   |
| 1.3.1 Info and Relationships | Tagged PDF structure, reading order      | PDF accessibility checker         |
| 1.3.2 Meaningful Sequence    | Logical reading order in PDF tags        | Manual verification               |
| 1.4.3 Contrast (Minimum)     | 4.5:1 contrast ratio for text            | Automated color contrast analysis |
| 1.4.4 Resize Text            | Text readable at 200% zoom               | Manual testing                    |
| 2.1.1 Keyboard               | All navigation via keyboard              | Manual testing                    |
| 2.4.2 Page Titled            | Descriptive page/document titles         | Metadata verification             |
| 3.1.1 Language of Page       | Language identified in metadata          | Metadata verification             |
| 4.1.2 Name, Role, Value      | Form fields properly labeled             | PDF form checker                  |
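
The 4.5:1 minimum for criterion 1.4.3 follows directly from the WCAG 2.1 relative-luminance and contrast-ratio formulas. A self-contained sketch of the automated contrast analysis (function names are illustrative):

```python
def _linearize(channel: int) -> float:
    """sRGB channel (0-255) to a linear value, per the WCAG 2.1 definition."""
    s = channel / 255.0
    return s / 12.92 if s <= 0.03928 else ((s + 0.055) / 1.055) ** 2.4

def relative_luminance(rgb) -> float:
    """WCAG relative luminance of an (R, G, B) color."""
    r, g, b = (_linearize(c) for c in rgb)
    return 0.2126 * r + 0.7152 * g + 0.0722 * b

def contrast_ratio(fg, bg) -> float:
    """WCAG contrast ratio (1:1 to 21:1) between two sRGB colors."""
    lighter, darker = sorted((relative_luminance(fg), relative_luminance(bg)), reverse=True)
    return (lighter + 0.05) / (darker + 0.05)

def passes_aa_text(fg, bg) -> bool:
    """WCAG 2.1 AA minimum for normal-size text (criterion 1.4.3)."""
    return contrast_ratio(fg, bg) >= 4.5
```

Black on white yields the maximum 21:1 ratio, so any text rendered near those extremes comfortably clears the 4.5:1 bar.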

4.2 Tagged PDF Structure

Implementation:

# File: backend/qms/archival/accessibility_tagger.py

from pathlib import Path
from typing import Any, Dict, List
import fitz  # PyMuPDF


class AccessibilityTagger:
    """
    Adds accessibility tags to PDF/A documents.

    Compliance: WCAG 2.1 AA, Section 508
    """

    def tag_pdf_structure(self, pdf_path: Path) -> Dict[str, Any]:
        """
        Add structure tags to PDF for screen reader navigation.

        Args:
            pdf_path: Path to PDF file

        Returns:
            Dict with tagging results
        """
        doc = fitz.open(pdf_path)

        # Mark the document as tagged. Note: full structure tagging
        # (/MarkInfo, /StructTreeRoot) requires lower-level PDF manipulation;
        # this method is a simplified illustration.
        doc.set_metadata({
            'marked': 'true',    # Mark as tagged PDF
            'suspects': 'false'  # No suspects in tagging
        })

        tags_added = []

        for page_num, page in enumerate(doc):
            # Extract text blocks with position
            blocks = page.get_text("dict")["blocks"]

            for block in blocks:
                if block["type"] == 0:  # Text block
                    # Determine semantic role based on formatting
                    lines = block.get("lines", [])

                    for line in lines:
                        spans = line.get("spans", [])

                        for span in spans:
                            text = span.get("text", "")
                            font_size = span.get("size", 12)
                            font_flags = span.get("flags", 0)

                            # Classify as heading or paragraph
                            if font_size > 14 or (font_flags & 2 ** 4):  # Large or bold
                                tag_type = "H1" if font_size > 18 else "H2"
                            else:
                                tag_type = "P"

                            # Add structure tag (simplified - full implementation uses PDF libraries)
                            tags_added.append({
                                'page': page_num,
                                'type': tag_type,
                                'text': text[:50]  # First 50 chars
                            })

                elif block["type"] == 1:  # Image block
                    # Tag as Figure
                    tags_added.append({
                        'page': page_num,
                        'type': 'Figure',
                        'bbox': block.get("bbox")
                    })

        # Save updated PDF
        output_path = pdf_path.with_stem(f"{pdf_path.stem}_tagged")
        doc.save(output_path, garbage=4, deflate=True)
        doc.close()

        return {
            'success': True,
            'tags_added': len(tags_added),
            'output_path': str(output_path),
            'tags': tags_added
        }

    def add_alt_text_to_images(
        self,
        pdf_path: Path,
        alt_text_map: Dict[int, str]  # page_num -> alt text
    ) -> bool:
        """
        Add alt text to images in PDF.

        Args:
            pdf_path: Path to PDF file
            alt_text_map: Mapping of page numbers to alt text

        Returns:
            Success status
        """
        # Implementation uses pikepdf or other PDF library
        # that supports adding alt text to image XObjects

        # Simplified placeholder
        return True

    def verify_reading_order(self, pdf_path: Path) -> Dict[str, Any]:
        """
        Verify logical reading order in tagged PDF.

        Args:
            pdf_path: Path to PDF file

        Returns:
            Dict with verification results
        """
        doc = fitz.open(pdf_path)

        reading_order_issues = []

        for page_num, page in enumerate(doc):
            blocks = page.get_text("dict")["blocks"]

            # Sort blocks by vertical position (top to bottom)
            sorted_blocks = sorted(blocks, key=lambda b: b.get("bbox", [0, 0, 0, 0])[1])

            # Check for out-of-order blocks
            prev_y = -1
            for block in sorted_blocks:
                bbox = block.get("bbox", [0, 0, 0, 0])
                current_y = bbox[1]

                if current_y < prev_y - 10:  # Tolerance: 10 points
                    reading_order_issues.append({
                        'page': page_num,
                        'issue': 'Out-of-order block detected',
                        'position': bbox
                    })

                prev_y = bbox[3]  # Bottom of current block

        doc.close()

        return {
            'compliant': len(reading_order_issues) == 0,
            'issues': reading_order_issues
        }

5. Print Controls

5.1 Controlled Printing

Purpose: Maintain control over physical copies of regulated records with audit trail.

Implementation:

# File: backend/qms/archival/print_controller.py

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import fitz # PyMuPDF
from pathlib import Path


@dataclass
class PrintRequest:
    """Print request data."""
    record_id: str
    user_id: str
    tenant_id: str
    copy_type: str  # CONTROLLED, UNCONTROLLED
    num_copies: int
    reason: str
    requested_at: datetime


@dataclass
class PrintJob:
    """Print job result."""
    job_id: str
    record_id: str
    user_id: str
    watermarked_file_path: Path
    copy_type: str
    num_copies: int
    printed_at: datetime
    printer_name: str


class PrintController:
    """
    Manages controlled printing of archived records.

    Compliance: ISO 9001 §4.2.3 - control of documents
    """

    WATERMARK_CONTROLLED = "CONTROLLED COPY"
    WATERMARK_UNCONTROLLED = "UNCONTROLLED COPY - FOR INFORMATION ONLY"

    def create_print_job(self, request: PrintRequest) -> PrintJob:
        """
        Create a controlled print job with watermarking.

        Args:
            request: Print request details

        Returns:
            PrintJob with watermarked file ready for printing
        """
        # Retrieve archived file
        archived_file = self._retrieve_archived_file(request.record_id)

        # Apply watermark
        watermarked_file = self._apply_watermark(
            archived_file,
            request.copy_type,
            request.record_id,
            request.user_id
        )

        # Log print audit event
        job_id = self._log_print_event(request, watermarked_file)

        return PrintJob(
            job_id=job_id,
            record_id=request.record_id,
            user_id=request.user_id,
            watermarked_file_path=watermarked_file,
            copy_type=request.copy_type,
            num_copies=request.num_copies,
            printed_at=datetime.utcnow(),
            printer_name="default"  # Configured printer
        )

    def _apply_watermark(
        self,
        source_file: Path,
        copy_type: str,
        record_id: str,
        user_id: str
    ) -> Path:
        """
        Apply watermark and header/footer to PDF.

        Args:
            source_file: Path to source PDF
            copy_type: CONTROLLED or UNCONTROLLED
            record_id: Record identifier
            user_id: User requesting print

        Returns:
            Path to watermarked PDF
        """
        doc = fitz.open(source_file)

        watermark_text = (
            self.WATERMARK_CONTROLLED if copy_type == "CONTROLLED"
            else self.WATERMARK_UNCONTROLLED
        )

        for page_num, page in enumerate(doc, start=1):
            page_width = page.rect.width
            page_height = page.rect.height

            # Add diagonal watermark at the page center. insert_text() only
            # supports rotations in 90-degree steps, so a TextWriter with a
            # 45-degree rotation morph is used for the diagonal placement.
            center = fitz.Point(page_width / 2, page_height / 2)
            writer = fitz.TextWriter(page.rect)
            writer.append(
                fitz.Point(page_width / 2 - 180, page_height / 2),
                watermark_text,
                fontsize=48
            )
            writer.write_text(
                page,
                color=(0.9, 0.9, 0.9),  # Light gray
                morph=(center, fitz.Matrix(45))
            )

            # Add header with document ID
            header_text = f"Document ID: {record_id} | Page {page_num}/{len(doc)}"
            page.insert_text(
                point=(50, 30),
                text=header_text,
                fontsize=10,
                fontname="helv",
                color=(0, 0, 0)
            )

            # Add footer with print date and user
            footer_text = f"Printed: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')} by User {user_id}"
            page.insert_text(
                point=(50, page_height - 30),
                text=footer_text,
                fontsize=10,
                fontname="helv",
                color=(0, 0, 0)
            )

        # Save watermarked PDF
        output_path = source_file.with_stem(f"{source_file.stem}_print_{copy_type}")
        doc.save(output_path, garbage=4, deflate=True)
        doc.close()

        return output_path

def _retrieve_archived_file(self, record_id: str) -> Path:
"""Retrieve archived file from storage."""
# Implementation retrieves from GCS or local storage
pass

def _log_print_event(self, request: PrintRequest, watermarked_file: Path) -> str:
"""Log print event to audit trail."""
# Implementation logs to database
pass

Database Schema:

```sql
-- Table: print_audit_log
CREATE TABLE print_audit_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_id VARCHAR(50) UNIQUE NOT NULL,
    record_id UUID NOT NULL REFERENCES records(id),
    tenant_id UUID NOT NULL REFERENCES tenants(id),
    user_id UUID NOT NULL REFERENCES users(id),

    -- Print details
    copy_type VARCHAR(50) NOT NULL, -- CONTROLLED, UNCONTROLLED
    num_copies INTEGER NOT NULL,
    reason TEXT NOT NULL,

    -- Watermark details
    watermarked_file_path VARCHAR(500) NOT NULL,
    watermark_text VARCHAR(255) NOT NULL,

    -- Printer
    printer_name VARCHAR(255),
    print_queue VARCHAR(255),

    -- Timestamps
    requested_at TIMESTAMP NOT NULL,
    printed_at TIMESTAMP NOT NULL DEFAULT NOW(),

    -- Audit
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Indexes (PostgreSQL does not support inline INDEX clauses in CREATE TABLE)
CREATE INDEX idx_print_record ON print_audit_log (record_id);
CREATE INDEX idx_print_user ON print_audit_log (user_id, printed_at);
CREATE INDEX idx_print_tenant_date ON print_audit_log (tenant_id, printed_at);
```

6. Long-Term Storage Architecture

6.1 Storage Tier Management

Architecture: records move through three storage tiers (Hot → Warm → Cold) as they age, driven by automated lifecycle policies.

Storage Tier Specifications:

| Tier | Storage Medium | Access Time | Cost/GB/Month | Retention Period | Use Case |
|------|----------------|-------------|---------------|------------------|----------|
| Hot | GCS Standard (SSD) | <1 second | $0.020 | 0-90 days | Active records, frequent access |
| Warm | GCS Nearline (HDD) | <5 seconds | $0.010 | 90 days - 2 years | Occasional access |
| Cold | GCS Archive | <12 hours | $0.0012 | 2+ years | Long-term retention, rare access |
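The lifecycle thresholds in the table translate directly into a tier lookup by record age. A minimal sketch (the function name is illustrative; the 90-day and 2-year cutoffs match the boundaries above):

```python
from datetime import date

# Illustrative only: maps a record's age to the tier implied by the
# lifecycle thresholds above (90 days -> Warm, 730 days -> Cold).
def expected_tier(archived_on: date, today: date) -> str:
    age_days = (today - archived_on).days
    if age_days < 90:
        return "hot"    # GCS Standard
    if age_days < 730:
        return "warm"   # GCS Nearline
    return "cold"       # GCS Archive

print(expected_tier(date(2025, 1, 1), date(2025, 2, 1)))  # hot
```

This is also a useful monitoring cross-check: a record whose actual GCS storage class disagrees with its expected tier indicates a lifecycle-policy failure.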

GCS Lifecycle Policy:

```json
{
  "lifecycle": {
    "rule": [
      {
        "action": {
          "type": "SetStorageClass",
          "storageClass": "NEARLINE"
        },
        "condition": {
          "age": 90,
          "matchesPrefix": ["archived-records/"],
          "matchesSuffix": [".pdf", ".pdfa.pdf"]
        }
      },
      {
        "action": {
          "type": "SetStorageClass",
          "storageClass": "ARCHIVE"
        },
        "condition": {
          "age": 730,
          "matchesPrefix": ["archived-records/"],
          "matchesSuffix": [".pdf", ".pdfa.pdf"]
        }
      },
      {
        "action": {
          "type": "Delete"
        },
        "condition": {
          "age": 9125,
          "matchesPrefix": ["archived-records/non-regulated/"]
        }
      }
    ]
  }
}
```

Terraform Configuration:

```hcl
# File: infrastructure/terraform/storage.tf

resource "google_storage_bucket" "archival_storage" {
  name          = "${var.project_id}-archival-records"
  location      = "US" # Multi-region for high availability
  storage_class = "STANDARD"

  uniform_bucket_level_access = true

  versioning {
    enabled = true # Maintain version history
  }

  lifecycle_rule {
    action {
      type          = "SetStorageClass"
      storage_class = "NEARLINE"
    }
    condition {
      age            = 90
      matches_prefix = ["archived-records/"]
      matches_suffix = [".pdf", ".pdfa.pdf"]
    }
  }

  lifecycle_rule {
    action {
      type          = "SetStorageClass"
      storage_class = "ARCHIVE"
    }
    condition {
      age            = 730 # 2 years
      matches_prefix = ["archived-records/"]
      matches_suffix = [".pdf", ".pdfa.pdf"]
    }
  }

  # Retention policy for regulated records (25 years minimum)
  retention_policy {
    retention_period = 788400000 # 25 years (25 x 365 x 86400 seconds)
  }

  # Encryption
  encryption {
    default_kms_key_name = google_kms_crypto_key.archival_encryption_key.id
  }

  # Logging
  logging {
    log_bucket = google_storage_bucket.archival_access_logs.name
  }

  # Labels
  labels = {
    environment = var.environment
    compliance  = "fda-21cfr11"
    data_class  = "regulated-records"
  }
}

# Geographic redundancy - replicate to second region
resource "google_storage_bucket" "archival_storage_dr" {
  name          = "${var.project_id}-archival-records-dr"
  location      = "EU" # Different geography for DR
  storage_class = "STANDARD"

  uniform_bucket_level_access = true

  versioning {
    enabled = true
  }

  # Same lifecycle rules as primary
  lifecycle_rule {
    action {
      type          = "SetStorageClass"
      storage_class = "NEARLINE"
    }
    condition {
      age            = 90
      matches_prefix = ["archived-records/"]
    }
  }

  lifecycle_rule {
    action {
      type          = "SetStorageClass"
      storage_class = "ARCHIVE"
    }
    condition {
      age            = 730
      matches_prefix = ["archived-records/"]
    }
  }

  retention_policy {
    retention_period = 788400000 # 25 years
  }

  encryption {
    default_kms_key_name = google_kms_crypto_key.archival_encryption_key_eu.id
  }

  labels = {
    environment = var.environment
    compliance  = "fda-21cfr11"
    data_class  = "regulated-records"
    dr_replica  = "true"
  }
}

# KMS encryption key for archival storage
resource "google_kms_crypto_key" "archival_encryption_key" {
  name     = "archival-records-encryption-key"
  key_ring = google_kms_key_ring.qms_keyring.id

  rotation_period = "7776000s" # 90 days

  lifecycle {
    prevent_destroy = true # Never destroy encryption keys
  }
}

# Access logs bucket
resource "google_storage_bucket" "archival_access_logs" {
  name          = "${var.project_id}-archival-access-logs"
  location      = "US"
  storage_class = "STANDARD"

  uniform_bucket_level_access = true

  lifecycle_rule {
    action {
      type = "Delete"
    }
    condition {
      age = 2555 # 7 years retention for audit logs
    }
  }
}
```

6.2 Bit Rot Detection & Recovery

Purpose: Detect and recover from storage medium degradation (bit rot).

Implementation:

```python
# File: backend/qms/archival/integrity_checker.py

import hashlib
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path


@dataclass
class IntegrityCheckResult:
    """File integrity check result."""
    record_id: str
    file_path: str
    stored_checksum: str
    computed_checksum: str
    matches: bool
    checked_at: datetime
    file_size_bytes: int
    storage_tier: str


class StorageIntegrityChecker:
    """
    Periodically verifies file integrity using checksums.

    Compliance: FDA 21 CFR Part 11 §11.10(a) - data integrity
    """

    # Check frequency by storage tier
    CHECK_INTERVALS = {
        'hot': 30,    # days
        'warm': 90,   # days
        'cold': 365   # days
    }

    def verify_file_integrity(
        self,
        record_id: str,
        file_path: Path,
        stored_checksum: str,
        storage_tier: str
    ) -> IntegrityCheckResult:
        """
        Verify file integrity by comparing stored and computed checksums.

        Args:
            record_id: Record identifier
            file_path: Path to file
            stored_checksum: Previously stored SHA-256 checksum
            storage_tier: Storage tier (hot, warm, cold)

        Returns:
            IntegrityCheckResult with verification status
        """
        # Compute current checksum
        computed_checksum = self._compute_checksum(file_path)

        # Compare checksums
        matches = stored_checksum == computed_checksum

        # Get file size
        file_size = file_path.stat().st_size

        result = IntegrityCheckResult(
            record_id=record_id,
            file_path=str(file_path),
            stored_checksum=stored_checksum,
            computed_checksum=computed_checksum,
            matches=matches,
            checked_at=datetime.utcnow(),
            file_size_bytes=file_size,
            storage_tier=storage_tier
        )

        # If mismatch detected, initiate recovery
        if not matches:
            self._initiate_recovery(result)

        return result

    def _compute_checksum(self, file_path: Path) -> str:
        """Compute SHA-256 checksum of file."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def _initiate_recovery(self, result: IntegrityCheckResult):
        """
        Initiate recovery process for corrupted file.

        Recovery steps:
        1. Attempt to restore from geographic replica
        2. If replica also corrupted, restore from backup
        3. Log incident and alert administrators
        """
        # Implementation would:
        # - Check DR replica integrity
        # - Restore from DR replica if valid
        # - Escalate to backup restore if DR also corrupted
        # - Log to incident tracking system
        pass
```

Database Schema:

```sql
-- Table: storage_integrity_checks
CREATE TABLE storage_integrity_checks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    record_id UUID NOT NULL REFERENCES records(id),
    tenant_id UUID NOT NULL REFERENCES tenants(id),

    -- File details
    file_path VARCHAR(500) NOT NULL,
    storage_tier VARCHAR(50) NOT NULL, -- hot, warm, cold
    file_size_bytes BIGINT NOT NULL,

    -- Checksum verification
    stored_checksum VARCHAR(64) NOT NULL,
    computed_checksum VARCHAR(64) NOT NULL,
    matches BOOLEAN NOT NULL,

    -- Check metadata
    checked_at TIMESTAMP NOT NULL DEFAULT NOW(),
    check_type VARCHAR(50) NOT NULL, -- scheduled, on_demand, recovery

    -- Recovery (if needed)
    recovery_initiated BOOLEAN DEFAULT FALSE,
    recovery_successful BOOLEAN,
    recovery_source VARCHAR(100), -- dr_replica, backup, etc.

    -- Audit
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Indexes; the partial index keeps mismatch queries fast
CREATE INDEX idx_integrity_record ON storage_integrity_checks (record_id);
CREATE INDEX idx_integrity_tenant_date ON storage_integrity_checks (tenant_id, checked_at);
CREATE INDEX idx_integrity_mismatches ON storage_integrity_checks (matches, checked_at)
    WHERE matches = FALSE;
```
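The checksum round-trip behind StorageIntegrityChecker can be exercised end-to-end in isolation. A self-contained sketch (the sample file name and payloads are illustrative):

```python
import hashlib
import tempfile
from pathlib import Path

def sha256_of(path: Path) -> str:
    # Same streaming SHA-256 as StorageIntegrityChecker._compute_checksum
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(4096), b""):
            h.update(block)
    return h.hexdigest()

with tempfile.TemporaryDirectory() as tmp:
    record = Path(tmp) / "record.pdf"
    record.write_bytes(b"%PDF-1.7 example payload")  # illustrative content
    stored = sha256_of(record)           # captured at archival time
    assert sha256_of(record) == stored   # scheduled check: match -> pass
    record.write_bytes(b"corrupted")     # simulate bit rot
    assert sha256_of(record) != stored   # mismatch -> initiate recovery
print("integrity round-trip OK")
```

Streaming the file in 4 KiB blocks keeps memory use constant, which matters when verifying multi-gigabyte batch records pulled back from the Cold tier.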

7. Monitoring & Reporting

7.1 Archive Health Dashboard

Metrics:

| Metric | Calculation | Target | Alert Threshold |
|--------|-------------|--------|-----------------|
| Format Compliance % | (valid_records / total_records) × 100 | >99% | <95% |
| Rendering Success % | (passed_tests / total_tests) × 100 | >98% | <90% |
| Storage Utilization | used_capacity / total_capacity | <80% | >90% |
| Integrity Check Pass % | (matching_checksums / total_checks) × 100 | 100% | <99.9% |
| Migration Backlog | records_needing_migration | <100 | >500 |
| Access Latency (p95) | 95th percentile retrieval time | Hot <1s, Warm <5s, Cold <12h | 2× target |

Implementation:

```python
# File: backend/qms/archival/monitoring.py

from dataclasses import dataclass
from datetime import datetime
from typing import Dict


@dataclass
class ArchiveHealthMetrics:
    """Archive health dashboard metrics."""
    timestamp: datetime

    # Format compliance
    total_records: int
    valid_format_records: int
    format_compliance_percentage: float

    # Rendering verification
    total_rendering_tests: int
    passed_rendering_tests: int
    rendering_success_percentage: float

    # Storage utilization
    hot_tier_used_gb: float
    hot_tier_capacity_gb: float
    warm_tier_used_gb: float
    warm_tier_capacity_gb: float
    cold_tier_used_gb: float
    cold_tier_capacity_gb: float
    total_utilization_percentage: float

    # Integrity
    total_integrity_checks: int
    matching_checksums: int
    integrity_pass_percentage: float

    # Migration
    records_needing_migration: int

    # Access performance
    hot_access_latency_p95_ms: float
    warm_access_latency_p95_ms: float
    cold_access_latency_p95_ms: float


class ArchiveMonitoringService:
    """
    Collects and reports archive health metrics.

    Compliance: Continuous monitoring for data integrity
    """

    def collect_health_metrics(self) -> ArchiveHealthMetrics:
        """
        Collect current archive health metrics.

        Returns:
            ArchiveHealthMetrics snapshot
        """
        # Format compliance
        total_records = self._count_total_records()
        valid_format_records = self._count_valid_format_records()
        format_compliance_pct = (
            valid_format_records / total_records * 100 if total_records > 0 else 0
        )

        # Rendering verification
        total_rendering_tests = self._count_rendering_tests_last_30_days()
        passed_rendering_tests = self._count_passed_rendering_tests_last_30_days()
        rendering_success_pct = (
            passed_rendering_tests / total_rendering_tests * 100
            if total_rendering_tests > 0 else 0
        )

        # Storage utilization
        storage_stats = self._get_storage_utilization()

        # Integrity checks
        total_integrity_checks = self._count_integrity_checks_last_30_days()
        matching_checksums = self._count_matching_checksums_last_30_days()
        integrity_pass_pct = (
            matching_checksums / total_integrity_checks * 100
            if total_integrity_checks > 0 else 0
        )

        # Migration backlog
        records_needing_migration = self._count_records_needing_migration()

        # Access latency
        latency_stats = self._get_access_latency_stats()

        return ArchiveHealthMetrics(
            timestamp=datetime.utcnow(),
            total_records=total_records,
            valid_format_records=valid_format_records,
            format_compliance_percentage=format_compliance_pct,
            total_rendering_tests=total_rendering_tests,
            passed_rendering_tests=passed_rendering_tests,
            rendering_success_percentage=rendering_success_pct,
            hot_tier_used_gb=storage_stats['hot_used'],
            hot_tier_capacity_gb=storage_stats['hot_capacity'],
            warm_tier_used_gb=storage_stats['warm_used'],
            warm_tier_capacity_gb=storage_stats['warm_capacity'],
            cold_tier_used_gb=storage_stats['cold_used'],
            cold_tier_capacity_gb=storage_stats['cold_capacity'],
            total_utilization_percentage=storage_stats['total_utilization'],
            total_integrity_checks=total_integrity_checks,
            matching_checksums=matching_checksums,
            integrity_pass_percentage=integrity_pass_pct,
            records_needing_migration=records_needing_migration,
            hot_access_latency_p95_ms=latency_stats['hot_p95'],
            warm_access_latency_p95_ms=latency_stats['warm_p95'],
            cold_access_latency_p95_ms=latency_stats['cold_p95']
        )

    def _count_total_records(self) -> int:
        """Count total archived records."""
        pass

    def _count_valid_format_records(self) -> int:
        """Count records with valid archival format."""
        pass

    def _count_rendering_tests_last_30_days(self) -> int:
        """Count rendering tests in last 30 days."""
        pass

    def _count_passed_rendering_tests_last_30_days(self) -> int:
        """Count passed rendering tests in last 30 days."""
        pass

    def _get_storage_utilization(self) -> Dict[str, float]:
        """Get storage utilization stats for all tiers."""
        pass

    def _count_integrity_checks_last_30_days(self) -> int:
        """Count integrity checks in last 30 days."""
        pass

    def _count_matching_checksums_last_30_days(self) -> int:
        """Count integrity checks with matching checksums in last 30 days."""
        pass

    def _count_records_needing_migration(self) -> int:
        """Count records flagged for format migration."""
        pass

    def _get_access_latency_stats(self) -> Dict[str, float]:
        """Get access latency statistics by tier (p95)."""
        pass
```

7.2 Compliance Reporting

Automated Reports:

| Report | Frequency | Recipients | Purpose |
|--------|-----------|------------|---------|
| Archive Health Summary | Weekly | QA Manager, IT Director | Overall archive status |
| Format Compliance Report | Monthly | Quality Assurance | Format validation metrics |
| Migration Status Report | Quarterly | IT Director, Compliance Officer | Obsolescence tracking |
| Integrity Audit Report | Quarterly | QA Manager, Regulatory Affairs | Checksum verification results |
| Cost Optimization Report | Monthly | CFO, IT Director | Storage costs by tier |
| Access Audit Report | Monthly | Security Officer | Who accessed what records |
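A minimal sketch of how a report scheduler could derive the next due date from the frequencies above. The day counts are simplifying assumptions of this sketch; a production scheduler would use calendar-aware arithmetic for Monthly and Quarterly runs:

```python
from datetime import date, timedelta

# Illustrative frequency table (days are approximations, not SOP text)
FREQUENCY_DAYS = {"Weekly": 7, "Monthly": 30, "Quarterly": 91}

def next_due(last_run: date, frequency: str) -> date:
    # Next scheduled run, counted from the last successful run
    return last_run + timedelta(days=FREQUENCY_DAYS[frequency])

print(next_due(date(2026, 1, 1), "Weekly"))  # 2026-01-08
```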

8. Compliance Mapping

8.1 FDA 21 CFR Part 11

| Requirement | Control Implementation | Evidence |
|-------------|------------------------|----------|
| §11.10(b) - Legibility | PDF/A-2b archival format, rendering verification, print controls | archival_validation_log, rendering_test_results |
| §11.10(c) - Accurate and Complete Copies | Rendering comparison (95% threshold), format validation | rendering_test_results.similarity_score |
| §11.10(e) - Audit Trail | Print audit log, format migration log, integrity check log | print_audit_log, format_migration_log, storage_integrity_checks |

8.2 EU Annex 11

| Requirement | Control Implementation | Evidence |
|-------------|------------------------|----------|
| §7.1 - Data Storage | Multi-tier storage (Hot/Warm/Cold), geographic redundancy, encryption | GCS bucket configuration, KMS encryption |
| §7.2 - Data Protection | SHA-256 checksums, bit rot detection, automated recovery | storage_integrity_checks |
| §13 - Change and Configuration Management | Format migration with validation and chain of custody | format_migration_log |

8.3 WHO TRS 996 Annex 5

| Requirement | Control Implementation | Evidence |
|-------------|------------------------|----------|
| §8.3 - Data Integrity | Checksum verification, rendering tests, format validation | archival_validation_log, storage_integrity_checks |
| §13.2 - Electronic Records Retention | 25-year minimum retention, lifecycle policies, no deletion | GCS retention policy (788400000s) |

9. Validation & Testing

9.1 Installation Qualification (IQ)

Objective: Verify that archival infrastructure is installed according to specifications.

Test Cases:

| Test ID | Test Description | Expected Result | Evidence |
|---------|------------------|-----------------|----------|
| IQ-01 | Verify veraPDF installation | veraPDF version ≥ 1.24 | veraPDF --version output |
| IQ-02 | Verify LibTIFF installation | tiffinfo available | tiffinfo -h output |
| IQ-03 | Verify GCS bucket creation | Buckets exist in US and EU regions | gsutil ls output |
| IQ-04 | Verify lifecycle policies applied | Policies match specification | gsutil lifecycle get output |
| IQ-05 | Verify KMS encryption keys | Keys exist and rotation enabled | gcloud kms keys describe output |
| IQ-06 | Verify database schema | Tables created with correct indexes | SQL schema verification |

9.2 Operational Qualification (OQ)

Objective: Verify that archival system operates according to specifications.

Test Cases:

| Test ID | Test Description | Expected Result | Evidence |
|---------|------------------|-----------------|----------|
| OQ-01 | Format validation - PDF/A-2b compliant | Validation passes | ValidationResult.is_valid = True |
| OQ-02 | Format validation - PDF/A-2b non-compliant | Validation fails with errors | ValidationResult.errors populated |
| OQ-03 | Rendering verification - identical rendering | Similarity ≥ 95% | similarity_score ≥ 0.95 |
| OQ-04 | Rendering verification - different rendering | Similarity < 95%, queued for review | status = MANUAL_REVIEW |
| OQ-05 | Format migration - TIFF to PDF/A | Migration succeeds, rendering matches | MigrationResult.success = True |
| OQ-06 | Print watermarking - controlled copy | Watermark applied correctly | Visual verification of PDF |
| OQ-07 | Storage tier transition - Hot to Warm | File moved after 90 days | GCS storage class = NEARLINE |
| OQ-08 | Integrity check - matching checksum | Check passes | IntegrityCheckResult.matches = True |
| OQ-09 | Integrity check - corrupted file | Check fails, recovery initiated | recovery_initiated = True |

9.3 Performance Qualification (PQ)

Objective: Verify that archival system performs under production load.

Test Cases:

| Test ID | Test Description | Expected Result | Evidence |
|---------|------------------|-----------------|----------|
| PQ-01 | Validation throughput | ≥100 records/hour | Performance test log |
| PQ-02 | Rendering verification throughput | ≥50 comparisons/hour | Performance test log |
| PQ-03 | Hot storage access latency | p95 < 1 second | Latency metrics |
| PQ-04 | Warm storage access latency | p95 < 5 seconds | Latency metrics |
| PQ-05 | Cold storage retrieval time | p95 < 12 hours | Latency metrics |
| PQ-06 | Concurrent access | 100 concurrent requests without degradation | Load test results |

10. Standard Operating Procedures (SOPs)

SOP-001: Annual Format Viability Assessment

Purpose: Assess continued viability of archival formats and identify obsolescence risks.

Frequency: Annually (January)

Procedure:

  1. Review current archival format usage statistics
  2. Research industry format adoption trends
  3. Assess renderer availability for each format
  4. Evaluate browser/software support status
  5. Check ISO standard status (active, stable, deprecated)
  6. Assign viability status (Green/Yellow/Red)
  7. If Red status, create migration project plan
  8. Document findings in annual assessment report
  9. Present to Quality Assurance and IT leadership

Responsibilities:

  • Owner: IT Director
  • Reviewer: Quality Assurance Manager
  • Approver: Head of Regulatory Affairs
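Step 6's Green/Yellow/Red assignment can be expressed as a simple rule table. The specific criteria and thresholds below are illustrative assumptions for the sketch, not SOP text:

```python
# Illustrative scoring for SOP-001 step 6 (assign viability status).
# Criteria names and thresholds are assumptions, not part of the SOP.
def viability_status(renderers_available: int, iso_status: str,
                     vendor_support_years: float) -> str:
    if iso_status == "withdrawn" or renderers_available == 0:
        return "RED"     # triggers step 7: create migration project plan
    if (renderers_available < 2 or vendor_support_years < 5
            or iso_status == "deprecated"):
        return "YELLOW"  # monitor closely, reassess mid-year
    return "GREEN"       # format remains viable, no action needed

print(viability_status(3, "active", 10.0))  # GREEN
```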

SOP-002: Quarterly Rendering Verification

Purpose: Verify archived records remain accurately renderable.

Frequency: Quarterly (Hot tier), Semi-annually (Warm tier), Annually (Cold tier)

Procedure:

  1. Generate test sample (5% random + 100% critical records)
  2. Execute automated rendering comparison script
  3. Review MANUAL_REVIEW queue items
  4. Document pass/fail results
  5. Escalate failures to Quality Assurance
  6. Update rendering test database
  7. Generate quarterly report

Responsibilities:

  • Owner: QA Engineer
  • Reviewer: QA Manager
  • Approver: Quality Assurance Manager
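Step 1's sampling rule (100% of critical records plus a 5% random sample of the remainder) can be sketched as follows; the record IDs and helper name are illustrative:

```python
import random

# Sketch of SOP-002 step 1: all critical records plus a 5% random
# sample of the rest. A seed makes the sample reproducible for audit.
def build_test_sample(critical_ids, other_ids, rate=0.05, seed=None):
    rng = random.Random(seed)
    k = max(1, round(len(other_ids) * rate)) if other_ids else 0
    return list(critical_ids) + rng.sample(list(other_ids), k)

others = [f"DOC-{i:03d}" for i in range(2, 102)]  # 100 non-critical records
sample = build_test_sample(["DOC-001"], others, seed=42)
print(len(sample))  # 6 (1 critical + 5% of 100)
```

Recording the seed alongside the quarterly report lets an auditor regenerate the exact sample that was tested.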

SOP-003: Format Migration Execution

Purpose: Migrate records from obsolete format to current archival format.

Frequency: As needed (triggered by format assessment)

Procedure:

  1. Create migration project plan
  2. Identify records requiring migration
  3. Execute migration pipeline on test sample (10 records)
  4. Validate test migrations (format + rendering)
  5. If test successful, proceed to production migration
  6. Execute production migration in batches (100 records/batch)
  7. Validate each batch before proceeding
  8. Preserve original files in quarantine storage
  9. Log all migrations to audit trail
  10. Generate migration completion report

Responsibilities:

  • Owner: IT Operations Engineer
  • Reviewer: QA Engineer
  • Approver: Quality Assurance Manager
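Steps 6-7's batch-and-validate loop can be sketched as a small driver; the `migrate_batch` callable is an assumption standing in for the real migration pipeline (migrate plus format/rendering validation):

```python
# Sketch of SOP-003 steps 6-7: migrate in fixed-size batches and
# validate each batch before proceeding to the next.
def migrate_in_batches(record_ids, migrate_batch, batch_size=100):
    migrated = []
    for start in range(0, len(record_ids), batch_size):
        batch = record_ids[start:start + batch_size]
        if not migrate_batch(batch):  # halt on the first failed batch
            raise RuntimeError(f"Batch starting at index {start} failed validation")
        migrated.extend(batch)
    return migrated

done = migrate_in_batches(list(range(250)), lambda batch: True)
print(len(done))  # 250
```

Halting on the first failed batch preserves the SOP's intent: no further records are touched until the failure is investigated and the originals remain in quarantine storage.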

11. Appendices

Appendix A: veraPDF Integration Script

```bash
#!/bin/bash
# File: scripts/validate-pdfa.sh
# Purpose: Validate PDF/A compliance using veraPDF

set -euo pipefail

VERAPDF_PATH="/opt/verapdf/verapdf"
INPUT_FILE="$1"
OUTPUT_REPORT="${2:-validation-report.xml}"

if [ ! -f "$INPUT_FILE" ]; then
    echo "Error: Input file not found: $INPUT_FILE"
    exit 1
fi

# Run veraPDF validation
"$VERAPDF_PATH" \
    --format xml \
    --flavour 2b \
    --verbose \
    "$INPUT_FILE" > "$OUTPUT_REPORT"

# Parse result (veraPDF XML reports flag compliance via isCompliant="true")
if grep -q 'isCompliant="true"' "$OUTPUT_REPORT"; then
    echo "✓ PDF/A-2b validation PASSED"
    exit 0
else
    echo "✗ PDF/A-2b validation FAILED"
    echo "See report: $OUTPUT_REPORT"
    exit 1
fi
```

Appendix B: GCS Lifecycle Policy Deployment

```bash
#!/bin/bash
# File: scripts/deploy-gcs-lifecycle.sh
# Purpose: Deploy GCS lifecycle policies to archival buckets

set -euo pipefail

PROJECT_ID="coditect-qms-prod"
BUCKET_NAME="${PROJECT_ID}-archival-records"
POLICY_FILE="gcs-lifecycle-policy.json"

# Create lifecycle policy JSON
cat > "$POLICY_FILE" <<EOF
{
  "lifecycle": {
    "rule": [
      {
        "action": {
          "type": "SetStorageClass",
          "storageClass": "NEARLINE"
        },
        "condition": {
          "age": 90,
          "matchesPrefix": ["archived-records/"],
          "matchesSuffix": [".pdf", ".pdfa.pdf"]
        }
      },
      {
        "action": {
          "type": "SetStorageClass",
          "storageClass": "ARCHIVE"
        },
        "condition": {
          "age": 730,
          "matchesPrefix": ["archived-records/"],
          "matchesSuffix": [".pdf", ".pdfa.pdf"]
        }
      }
    ]
  }
}
EOF

# Apply lifecycle policy
gsutil lifecycle set "$POLICY_FILE" "gs://${BUCKET_NAME}"

echo "✓ Lifecycle policy applied to gs://${BUCKET_NAME}"

# Verify
gsutil lifecycle get "gs://${BUCKET_NAME}"
```

Appendix C: Monitoring Setup (Prometheus Metrics)

```python
# File: backend/qms/archival/metrics.py

from prometheus_client import Counter, Gauge, Histogram

# Format validation metrics
format_validation_total = Counter(
    'archival_format_validation_total',
    'Total format validations performed',
    ['tenant_id', 'format_type', 'result']
)

format_compliance_percentage = Gauge(
    'archival_format_compliance_percentage',
    'Percentage of records with valid format',
    ['tenant_id']
)

# Rendering verification metrics
rendering_test_total = Counter(
    'archival_rendering_test_total',
    'Total rendering tests performed',
    ['tenant_id', 'status']
)

rendering_similarity_score = Histogram(
    'archival_rendering_similarity_score',
    'Rendering similarity scores',
    ['tenant_id'],
    buckets=[0.5, 0.7, 0.8, 0.9, 0.95, 0.98, 0.99, 1.0]
)

# Storage integrity metrics
integrity_check_total = Counter(
    'archival_integrity_check_total',
    'Total integrity checks performed',
    ['tenant_id', 'storage_tier', 'result']
)

# Migration metrics
migration_total = Counter(
    'archival_migration_total',
    'Total format migrations performed',
    ['tenant_id', 'source_format', 'target_format', 'result']
)

# Storage utilization metrics
storage_utilization_bytes = Gauge(
    'archival_storage_utilization_bytes',
    'Storage utilization in bytes',
    ['tenant_id', 'storage_tier']
)

# Access latency metrics
access_latency_seconds = Histogram(
    'archival_access_latency_seconds',
    'Record access latency in seconds',
    ['tenant_id', 'storage_tier'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)
```
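A call-site sketch for the counters above. A tiny in-memory tally stands in for prometheus_client so the example runs anywhere; production code would instead call format_validation_total.labels(...).inc() from metrics.py:

```python
from collections import Counter as Tally

# In-memory stand-in for the Prometheus counter (illustrative only)
tallies = Tally()

def record_validation(tenant_id: str, format_type: str, is_valid: bool) -> None:
    # Mirrors: format_validation_total.labels(
    #     tenant_id=..., format_type=..., result="pass"/"fail").inc()
    result = "pass" if is_valid else "fail"
    tallies[(tenant_id, format_type, result)] += 1

record_validation("tenant-a", "pdfa-2b", True)
record_validation("tenant-a", "pdfa-2b", False)
print(tallies[("tenant-a", "pdfa-2b", "pass")])  # 1
```

Keeping result as a label (rather than separate pass/fail counters) lets the dashboard compute the Format Compliance % metric with a single PromQL ratio.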

Document Control

Version History:

| Version | Date | Author | Changes |
|---------|------|--------|---------|
| 1.0.0 | 2026-02-16 | CODITECT Compliance Framework Specialist | Initial release |

Approval:

| Role | Name | Signature | Date |
|------|------|-----------|------|
| Author | CODITECT Compliance Framework Specialist | Pending | 2026-02-16 |
| Reviewer | Quality Assurance Manager | Pending | Pending |
| Approver | Head of Regulatory Affairs | Pending | Pending |

Next Review Date: 2027-02-16 (annual review)


End of Document