ADR-032: Compliance Audit Layer for Corpus Processing

Status

PROPOSED

Date

2026-01-15

Context

Coditect targets regulated industries (healthcare, fintech) where corpus processing must comply with:

  • 21 CFR Part 11: FDA electronic records requirements
  • HIPAA: Healthcare data privacy
  • SOC2: Security controls for service organizations
  • GDPR: European data protection (where applicable)

The "unlimited memory" video demonstrates zero compliance controls—files are written without audit trails, access control, or signatures. Enterprise adoption requires comprehensive compliance infrastructure.

Regulatory Requirements Summary

| Regulation | Key Requirements | Impact on Corpus Processing |
|---|---|---|
| 21 CFR Part 11 | Electronic signatures, audit trails, access control | All operations must be signed and audited |
| HIPAA | PHI protection, minimum necessary, access logs | Filter PHI before processing, log all access |
| SOC2 | Security controls, availability, confidentiality | Encrypt at rest/transit, access control |
| GDPR | Data minimization, right to erasure | Support data deletion, minimize retention |

Decision

Implement a Compliance Audit Layer that wraps all corpus processing operations with comprehensive controls.

Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                       COMPLIANCE AUDIT LAYER                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │  ACCESS CONTROL GATEWAY                                     │   │
│   │  - Authentication verification                              │   │
│   │  - Role-based access control (RBAC)                         │   │
│   │  - Attribute-based access control (ABAC)                    │   │
│   │  - Corpus-level permissions                                 │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │  PHI/PII DETECTION & REDACTION                              │   │
│   │  - Pre-processing PHI scan                                  │   │
│   │  - Configurable redaction policies                          │   │
│   │  - Safe harbor de-identification                            │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │  OPERATION PROCESSOR                                        │   │
│   │  (Actual corpus processing)                                 │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │  AUDIT TRAIL RECORDER                                       │   │
│   │  - Immutable event log                                      │   │
│   │  - Tamper-evident chain                                     │   │
│   │  - Timestamp authority                                      │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │  ELECTRONIC SIGNATURE SERVICE                               │   │
│   │  - Cryptographic signatures                                 │   │
│   │  - Signature meaning capture                                │   │
│   │  - Certificate management                                   │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
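
The layering in the diagram can be read as a fixed call order around the processor. The following is a minimal sketch of that ordering, assuming each stage is a plain callable. `CompliancePipeline`, `AccessDenied`, and the in-memory `audit_log` list are hypothetical names used only for illustration, not part of the proposed components.

```python
from dataclasses import dataclass, field
from typing import Callable, List


class AccessDenied(Exception):
    """Raised when the gateway rejects the request."""


@dataclass
class CompliancePipeline:
    """Hypothetical wrapper that runs the stages in diagram order."""
    check_access: Callable[[str, str], bool]   # (user_id, operation) -> allowed?
    redact: Callable[[str], str]               # raw text -> redacted text
    process: Callable[[str], str]              # the actual corpus operation
    sign: Callable[[str], str]                 # result -> signature string
    audit_log: List[dict] = field(default_factory=list)

    def run(self, user_id: str, operation: str, text: str) -> dict:
        # 1. Access control gateway: denied requests are still audited.
        if not self.check_access(user_id, operation):
            self.audit_log.append(
                {"user": user_id, "op": operation, "decision": "DENIED"}
            )
            raise AccessDenied(f"{user_id} may not perform {operation}")
        # 2. PHI/PII redaction runs before the processor sees any content.
        safe_text = self.redact(text)
        # 3. Operation processor.
        result = self.process(safe_text)
        # 4. Audit trail recorder.
        self.audit_log.append(
            {"user": user_id, "op": operation, "decision": "ALLOWED"}
        )
        # 5. Electronic signature over the result.
        return {"result": result, "signature": self.sign(result)}


pipeline = CompliancePipeline(
    check_access=lambda user, op: user == "alice",
    redact=lambda text: text.replace("123-45-6789", "XXX-XX-XXXX"),
    process=str.upper,
    sign=lambda result: f"signed:{len(result)}",
)
outcome = pipeline.run("alice", "corpus:query", "ssn 123-45-6789")
```

The point of the ordering is that the processor only ever receives redacted content, and that a denial leaves an audit entry even though no processing happens.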

Core Components

1. Access Control Gateway

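from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, List, Literal, Optional
from uuid import UUID


class Permission(Enum):
    """Corpus processing permissions"""
    CORPUS_CREATE = "corpus:create"
    CORPUS_READ = "corpus:read"
    CORPUS_UPDATE = "corpus:update"
    CORPUS_DELETE = "corpus:delete"
    CORPUS_EXPORT = "corpus:export"
    CORPUS_QUERY = "corpus:query"
    CORPUS_ADMIN = "corpus:admin"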


@dataclass
class AccessCondition:
    """Attribute-based access condition"""
    attribute: str  # e.g., "user.department", "corpus.classification"
    operator: Literal["eq", "ne", "in", "not_in", "contains"]
    value: Any


@dataclass
class AccessPolicy:
    """Access policy definition"""
    policy_id: UUID
    name: str

    # RBAC
    allowed_roles: List[str]

    # Operations this policy grants (read by AccessControlGateway.check_access)
    allowed_permissions: List[Permission]

    # ABAC conditions
    conditions: List[AccessCondition]

    # Scope
    corpus_patterns: List[str]  # Glob patterns for corpus IDs

    # Temporal
    valid_from: Optional[datetime]
    valid_until: Optional[datetime]

    # Audit
    require_reason: bool = True
    require_approval: bool = False


class AccessControlGateway:
    """Enforce access control for corpus operations"""

    def __init__(
        self,
        policy_store: PolicyStore,
        audit_recorder: AuditRecorder
    ):
        self.policy_store = policy_store
        self.audit_recorder = audit_recorder

    async def check_access(
        self,
        user: UserContext,
        operation: Permission,
        corpus_id: UUID,
        access_reason: Optional[str] = None
    ) -> AccessDecision:
        """Check if user can perform operation on corpus"""

        # Get applicable policies
        policies = await self.policy_store.get_policies_for_corpus(corpus_id)

        for policy in policies:
            # Check role
            if not self._check_role(user, policy):
                continue

            # Check conditions
            if not await self._check_conditions(user, corpus_id, policy):
                continue

            # Check temporal validity
            if not self._check_temporal(policy):
                continue

            # Check operation permission
            if operation in policy.allowed_permissions:
                # Check if reason required
                if policy.require_reason and not access_reason:
                    return AccessDecision(
                        allowed=False,
                        reason="Access reason required",
                        policy_id=policy.policy_id
                    )

                # Check if approval required
                if policy.require_approval:
                    approval = await self._get_approval(user, operation, corpus_id)
                    if not approval:
                        return AccessDecision(
                            allowed=False,
                            reason="Approval required",
                            policy_id=policy.policy_id
                        )

                # Log access
                await self.audit_recorder.record_access(
                    user=user,
                    operation=operation,
                    corpus_id=corpus_id,
                    access_reason=access_reason,
                    policy_id=policy.policy_id,
                    decision="ALLOWED"
                )

                return AccessDecision(
                    allowed=True,
                    policy_id=policy.policy_id
                )

        # No matching policy - deny
        await self.audit_recorder.record_access(
            user=user,
            operation=operation,
            corpus_id=corpus_id,
            access_reason=access_reason,
            decision="DENIED",
            denial_reason="No matching policy"
        )

        return AccessDecision(
            allowed=False,
            reason="No policy allows this operation"
        )
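
The gateway delegates ABAC evaluation to `_check_conditions`, which this ADR does not define. One plausible reading of the five operators and of the glob-based `corpus_patterns` scope is sketched below. `Condition`, `resolve`, `condition_holds`, and `policy_applies` are hypothetical names, the string corpus ID is illustrative (the ADR uses UUIDs), and the meaning given to `contains` is an assumption.

```python
from dataclasses import dataclass
from fnmatch import fnmatchcase
from typing import Any, Dict, List


@dataclass
class Condition:
    """Stand-in for AccessCondition with the same three fields."""
    attribute: str   # dotted path, e.g. "user.department"
    operator: str    # "eq", "ne", "in", "not_in", "contains"
    value: Any


def resolve(attribute: str, context: Dict[str, Any]) -> Any:
    """Walk a dotted path through nested dicts; None if any part is missing."""
    current: Any = context
    for part in attribute.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def condition_holds(cond: Condition, context: Dict[str, Any]) -> bool:
    actual = resolve(cond.attribute, context)
    if cond.operator == "eq":
        return actual == cond.value
    if cond.operator == "ne":
        return actual != cond.value
    if cond.operator == "in":
        return actual in cond.value
    if cond.operator == "not_in":
        return actual not in cond.value
    if cond.operator == "contains":
        # Assumed meaning: the attribute is a collection that holds `value`.
        return actual is not None and cond.value in actual
    raise ValueError(f"Unknown operator: {cond.operator}")


def policy_applies(patterns: List[str], conditions: List[Condition],
                   corpus_id: str, context: Dict[str, Any]) -> bool:
    """A policy applies when a glob matches and every condition holds."""
    in_scope = any(fnmatchcase(corpus_id, p) for p in patterns)
    return in_scope and all(condition_holds(c, context) for c in conditions)


context = {
    "user": {"department": "oncology", "clearances": ["phi", "internal"]},
    "corpus": {"classification": "phi"},
}
conditions = [
    Condition("user.department", "in", ["oncology", "cardiology"]),
    Condition("user.clearances", "contains", "phi"),
    Condition("corpus.classification", "ne", "restricted"),
]
allowed = policy_applies(["clinical-*"], conditions, "clinical-2026-01", context)
```

Conditions are combined with AND here, which matches the gateway skipping a policy as soon as any check fails.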

2. PHI/PII Detection and Redaction

import hashlib
import re
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Literal, Tuple
from uuid import UUID

import spacy


class PHICategory(Enum):
    """HIPAA PHI categories"""
    NAME = "name"
    ADDRESS = "address"
    DATES = "dates"  # Except year
    PHONE = "phone"
    FAX = "fax"
    EMAIL = "email"
    SSN = "ssn"
    MRN = "mrn"  # Medical record number
    HEALTH_PLAN = "health_plan"
    ACCOUNT = "account"
    LICENSE = "license"
    VEHICLE = "vehicle"
    DEVICE = "device"
    URL = "url"
    IP = "ip"
    BIOMETRIC = "biometric"
    PHOTO = "photo"
    OTHER_UNIQUE = "other_unique"


@dataclass
class RedactionPolicy:
    """Policy for PHI/PII redaction"""
    policy_id: UUID
    name: str

    # What to redact
    phi_categories: List[PHICategory]
    custom_patterns: Dict[str, str]  # Name -> regex

    # How to redact
    redaction_method: Literal["remove", "mask", "hash", "generalize"]
    mask_char: str = "X"

    # Safe harbor compliance
    enable_safe_harbor: bool = True
    retain_year: bool = True  # For dates, retain year per Safe Harbor


class PHIRedactor:
    """Detect and redact PHI/PII from documents"""

    def __init__(
        self,
        policy: RedactionPolicy,
        ner_model: str = "en_core_web_lg"
    ):
        self.policy = policy
        self.nlp = spacy.load(ner_model)
        self.phi_detector = PHIDetector()

    async def process_document(
        self,
        document: Document,
        audit_context: AuditContext
    ) -> Tuple[Document, RedactionReport]:
        """Process document for PHI redaction"""

        # Detect PHI
        detections = await self._detect_phi(document.content)

        # Filter by policy
        to_redact = [
            d for d in detections
            if d.category in self.policy.phi_categories
        ]

        # Apply redaction
        redacted_content = document.content
        redaction_records = []

        # Work from the end of the text so earlier spans keep their offsets
        for detection in sorted(to_redact, key=lambda x: x.span[0], reverse=True):
            # Redact
            replacement = self._get_replacement(detection)
            redacted_content = (
                redacted_content[:detection.span[0]] +
                replacement +
                redacted_content[detection.span[1]:]
            )

            # Record (with hash, not original value).
            # Note: an unsalted SHA-256 of a low-entropy value such as an SSN
            # can be brute-forced; a keyed hash (HMAC) is the safer choice.
            redaction_records.append(RedactionRecord(
                category=detection.category,
                original_hash=hashlib.sha256(detection.text.encode()).hexdigest(),
                replacement=replacement,
                span=detection.span,
                confidence=detection.confidence
            ))

        # Create report
        report = RedactionReport(
            document_id=document.id,
            total_detections=len(detections),
            total_redactions=len(redaction_records),
            records=redaction_records,
            policy_applied=self.policy.policy_id
        )

        # Audit
        await audit_context.record_redaction(report)

        return Document(
            id=document.id,
            content=redacted_content,
            metadata={
                **document.metadata,
                'phi_redacted': True,
                'redaction_report_id': report.id
            }
        ), report

    async def _detect_phi(self, text: str) -> List[PHIDetection]:
        """Detect PHI in text"""

        detections = []

        # NER-based detection
        doc = self.nlp(text)
        for ent in doc.ents:
            category = self._map_ner_to_phi(ent.label_)
            if category:
                detections.append(PHIDetection(
                    text=ent.text,
                    category=category,
                    span=(ent.start_char, ent.end_char),
                    confidence=0.9,
                    method="ner"
                ))

        # Pattern-based detection
        for name, pattern in self.policy.custom_patterns.items():
            for match in re.finditer(pattern, text):
                detections.append(PHIDetection(
                    text=match.group(),
                    category=PHICategory.OTHER_UNIQUE,
                    span=(match.start(), match.end()),
                    confidence=0.95,
                    method=f"pattern:{name}"
                ))

        # Specialized detectors
        detections.extend(await self.phi_detector.detect_ssn(text))
        detections.extend(await self.phi_detector.detect_mrn(text))
        detections.extend(await self.phi_detector.detect_dates(text))

        # Deduplicate overlapping
        return self._deduplicate_detections(detections)
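
`_deduplicate_detections` and `_get_replacement` are referenced above but not specified. The sketch below shows one way they might behave, assuming overlaps are resolved in favor of the higher-confidence detection and that `generalize` reduces dates to the year, in line with `retain_year`. `Detection`, `deduplicate`, `replacement_for`, and `redact` are hypothetical stand-ins, and categories are plain strings to keep the example self-contained.

```python
import hashlib
import re
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Detection:
    """Stand-in for PHIDetection, reduced to the fields used here."""
    text: str
    category: str
    span: Tuple[int, int]
    confidence: float


def deduplicate(detections: List[Detection]) -> List[Detection]:
    """Keep the highest-confidence detection among overlapping spans."""
    kept: List[Detection] = []
    # Highest confidence first; longer spans win ties.
    ranked = sorted(
        detections,
        key=lambda d: (-d.confidence, -(d.span[1] - d.span[0])),
    )
    for cand in ranked:
        overlaps = any(
            cand.span[0] < k.span[1] and k.span[0] < cand.span[1] for k in kept
        )
        if not overlaps:
            kept.append(cand)
    return sorted(kept, key=lambda d: d.span[0])


def replacement_for(det: Detection, method: str, mask_char: str = "X") -> str:
    """One plausible reading of the four redaction methods."""
    if method == "remove":
        return ""
    if method == "mask":
        # Mask letters and digits, keep punctuation so the shape stays visible.
        return re.sub(r"[A-Za-z0-9]", mask_char, det.text)
    if method == "hash":
        return "[" + hashlib.sha256(det.text.encode()).hexdigest()[:12] + "]"
    if method == "generalize":
        # For dates, keep only the year; otherwise fall back to a category tag.
        year = re.search(r"\b(?:19|20)\d{2}\b", det.text)
        if det.category == "dates" and year:
            return year.group()
        return f"[{det.category.upper()}]"
    raise ValueError(f"Unknown redaction method: {method}")


def redact(text: str, detections: List[Detection], method: str) -> str:
    # Replace from the end so earlier spans keep their offsets.
    ordered = sorted(deduplicate(detections), key=lambda d: d.span[0], reverse=True)
    for det in ordered:
        text = text[:det.span[0]] + replacement_for(det, method) + text[det.span[1]:]
    return text


text = "SSN 123-45-6789 seen 2024-03-15"
found = [
    Detection("123-45-6789", "ssn", (4, 15), 0.95),
    Detection("45-6789", "other_unique", (8, 15), 0.60),   # overlaps the SSN
    Detection("2024-03-15", "dates", (21, 31), 0.90),
]
masked = redact(text, found, "mask")
generalized = redact(text, found, "generalize")
```

Deduplicating before replacement matters because two overlapping spans applied in sequence would corrupt the offsets of whichever one is applied second.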

3. Audit Trail Recorder

import hashlib
import json
from uuid import UUID, uuid4

import fdb


@dataclass
class AuditEvent:
    """Immutable audit event (21 CFR Part 11 compliant)"""

    # Identity
    event_id: UUID
    sequence_number: int  # Monotonic within corpus

    # Chain integrity
    previous_hash: str
    event_hash: str

    # Timestamp
    timestamp: datetime
    timestamp_authority: str  # e.g., "rfc3161.digicert.com"

    # Actor
    operator_id: str
    operator_role: str
    session_id: UUID

    # Action
    action: str  # e.g., "CORPUS_CREATED", "DOCUMENT_ADDED", "QUERY_EXECUTED"
    resource_type: str
    resource_id: UUID

    # Details
    details: Dict[str, Any]

    # Before/after for modifications
    before_state: Optional[str]  # JSON or hash
    after_state: Optional[str]

    # Reason
    reason: Optional[str]

    def compute_hash(self) -> str:
        """Compute tamper-evident hash over every field except event_hash"""
        # Fields left out of the hash could be altered without detection,
        # so the actor, state, and reason fields are covered as well.
        content = json.dumps({
            'event_id': str(self.event_id),
            'sequence_number': self.sequence_number,
            'previous_hash': self.previous_hash,
            'timestamp': self.timestamp.isoformat(),
            'timestamp_authority': self.timestamp_authority,
            'operator_id': self.operator_id,
            'operator_role': self.operator_role,
            'session_id': str(self.session_id),
            'action': self.action,
            'resource_type': self.resource_type,
            'resource_id': str(self.resource_id),
            'details': self.details,
            'before_state': self.before_state,
            'after_state': self.after_state,
            'reason': self.reason
        }, sort_keys=True, default=str)

        return hashlib.sha256(content.encode()).hexdigest()

    def verify_chain(self, previous_event: Optional['AuditEvent']) -> bool:
        """Verify chain integrity"""
        if previous_event is None:
            return self.previous_hash == "GENESIS"

        return self.previous_hash == previous_event.event_hash


class AuditRecorder:
    """Record immutable audit trail"""

    def __init__(
        self,
        fdb_cluster: str,
        timestamp_authority: str = "internal"
    ):
        self.db = fdb.open(fdb_cluster)
        self.timestamp_authority = timestamp_authority
        self._init_directories()

    @fdb.transactional
    async def record(
        self,
        tr: fdb.Transaction,
        corpus_id: UUID,
        action: str,
        operator: UserContext,
        resource_type: str,
        resource_id: UUID,
        details: Dict[str, Any],
        before_state: Optional[str] = None,
        after_state: Optional[str] = None,
        reason: Optional[str] = None
    ) -> AuditEvent:
        """Record audit event atomically"""

        # Get previous event for chain
        previous = await self._get_latest_event(tr, corpus_id)

        # Get sequence number
        sequence = await self._get_next_sequence(tr, corpus_id)

        # Get timestamp
        timestamp = await self._get_trusted_timestamp()

        # Create event
        event = AuditEvent(
            event_id=uuid4(),
            sequence_number=sequence,
            previous_hash=previous.event_hash if previous else "GENESIS",
            event_hash="",  # Computed below
            timestamp=timestamp,
            timestamp_authority=self.timestamp_authority,
            operator_id=operator.user_id,
            operator_role=operator.role,
            session_id=operator.session_id,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            details=details,
            before_state=before_state,
            after_state=after_state,
            reason=reason
        )

        # Compute hash
        event.event_hash = event.compute_hash()

        # Store.
        # Note: str(sequence) keys sort lexicographically ("10" before "2");
        # range reads that rely on key order would need an integer or
        # zero-padded sequence in the key.
        key = self.dirs['audit'][str(corpus_id)][str(sequence)]
        tr[key] = self._serialize(event)

        # Update latest pointer
        latest_key = self.dirs['audit_latest'][str(corpus_id)]
        tr[latest_key] = str(sequence).encode()

        return event

    async def verify_chain_integrity(
        self,
        corpus_id: UUID,
        start_sequence: int = 0,
        end_sequence: Optional[int] = None
    ) -> ChainVerificationResult:
        """Verify audit chain integrity"""

        events = await self._get_events_range(corpus_id, start_sequence, end_sequence)

        if not events:
            return ChainVerificationResult(valid=True, events_checked=0)

        previous = None
        for event in events:
            # Verify hash
            computed_hash = event.compute_hash()
            if computed_hash != event.event_hash:
                return ChainVerificationResult(
                    valid=False,
                    error=f"Hash mismatch at sequence {event.sequence_number}",
                    invalid_event=event
                )

            # Verify chain. When the range starts mid-chain, the first event's
            # predecessor is outside `events`, so only its own hash is checked;
            # comparing it against "GENESIS" would report a false break.
            starts_mid_chain = previous is None and start_sequence > 0
            if not starts_mid_chain and not event.verify_chain(previous):
                return ChainVerificationResult(
                    valid=False,
                    error=f"Chain break at sequence {event.sequence_number}",
                    invalid_event=event
                )

            previous = event

        return ChainVerificationResult(
            valid=True,
            events_checked=len(events),
            first_sequence=events[0].sequence_number,
            last_sequence=events[-1].sequence_number
        )

    async def export_audit_trail(
        self,
        corpus_id: UUID,
        format: Literal["json", "csv", "pdf"]
    ) -> bytes:
        """Export audit trail in specified format"""

        events = await self._get_all_events(corpus_id)

        if format == "json":
            return json.dumps([self._event_to_dict(e) for e in events], indent=2).encode()

        elif format == "csv":
            return self._events_to_csv(events)

        elif format == "pdf":
            return await self._events_to_pdf(events)

        # Fail loudly instead of silently returning None
        raise ValueError(f"Unsupported export format: {format}")
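
To make the tamper-evidence property concrete, here is a small in-memory sketch of the same hash-chain idea, independent of FoundationDB. It assumes events are plain dicts and uses hypothetical helper names (`event_hash`, `append_event`, `first_invalid`); it illustrates the technique rather than the recorder's storage code.

```python
import hashlib
import json
from typing import Dict, List, Optional

GENESIS = "GENESIS"


def event_hash(event: Dict) -> str:
    """Hash every field except the stored hash itself, in canonical key order."""
    body = {k: v for k, v in event.items() if k != "event_hash"}
    return hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()


def append_event(chain: List[Dict], action: str, operator_id: str) -> Dict:
    """Link a new event to the hash of the previous one."""
    event = {
        "sequence_number": len(chain),
        "previous_hash": chain[-1]["event_hash"] if chain else GENESIS,
        "action": action,
        "operator_id": operator_id,
    }
    event["event_hash"] = event_hash(event)
    chain.append(event)
    return event


def first_invalid(chain: List[Dict]) -> Optional[int]:
    """Return the sequence number of the first broken event, or None."""
    expected_previous = GENESIS
    for event in chain:
        if event_hash(event) != event["event_hash"]:
            return event["sequence_number"]      # content was altered
        if event["previous_hash"] != expected_previous:
            return event["sequence_number"]      # link was broken
        expected_previous = event["event_hash"]
    return None


chain: List[Dict] = []
append_event(chain, "CORPUS_CREATED", "alice")
append_event(chain, "DOCUMENT_ADDED", "alice")
append_event(chain, "QUERY_EXECUTED", "bob")
intact = first_invalid(chain)

# Rewriting history in the middle is detected at the edited event.
chain[1]["operator_id"] = "mallory"
tampered = first_invalid(chain)
```

If an attacker also recomputes the edited event's own hash, the mismatch moves to the next event, whose `previous_hash` no longer matches. Hiding the edit would require rewriting every later event, which is why an externally anchored timestamp or periodic checkpoint is a useful complement.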

4. Electronic Signature Service

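import base64
import hashlib
import json
from datetime import datetime, timezone

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding


class SignatureMeaning(Enum):
    """21 CFR Part 11 signature meanings"""
    CREATED = "created"
    REVIEWED = "reviewed"
    APPROVED = "approved"
    VERIFIED = "verified"
    AUTHORIZED = "authorized"
    RESPONSIBLE = "responsible"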


@dataclass
class ElectronicSignature:
    """21 CFR Part 11 compliant electronic signature"""

    signature_id: UUID

    # Signer identity
    signer_id: str
    signer_name: str
    signer_title: str

    # Signature components (21 CFR Part 11.200)
    # Must include at least two components
    component_1: str  # User ID
    component_2_hash: str  # Password hash (never stored plain)

    # Meaning
    meaning: SignatureMeaning
    meaning_text: str  # Full text of what signature means

    # Timestamp
    timestamp: datetime
    timestamp_authority: str

    # Cryptographic signature
    signature_algorithm: str  # e.g., "RSA-SHA256"
    public_key_id: str  # Reference to PKI certificate
    signature_value: str  # Base64 encoded signature

    # What was signed
    signed_content_hash: str
    signed_content_type: str
    signed_content_id: UUID


class SignatureService:
    """Electronic signature service (21 CFR Part 11 compliant)"""

    def __init__(
        self,
        key_store: KeyStore,
        audit_recorder: AuditRecorder
    ):
        self.key_store = key_store
        self.audit_recorder = audit_recorder

    async def sign(
        self,
        content: bytes,
        content_type: str,
        content_id: UUID,
        signer: UserContext,
        meaning: SignatureMeaning,
        meaning_text: str,
        password: str  # For 2-component verification
    ) -> ElectronicSignature:
        """Create electronic signature"""

        # Verify signer identity (2 components per 21 CFR Part 11.200)
        if not await self._verify_password(signer.user_id, password):
            raise SignatureError("Authentication failed")

        # Get signer's private key
        private_key = await self.key_store.get_private_key(signer.user_id)

        # Compute content hash
        content_hash = hashlib.sha256(content).hexdigest()

        # Capture the timestamp once. The signed payload and the stored record
        # must carry the same value, otherwise verify() rebuilds a different
        # payload and rejects a genuine signature.
        timestamp = datetime.now(timezone.utc)

        # Create signature payload
        payload = json.dumps({
            'content_hash': content_hash,
            'signer_id': signer.user_id,
            'meaning': meaning.value,
            'timestamp': timestamp.isoformat()
        }, sort_keys=True)

        # Sign
        signature_value = private_key.sign(
            payload.encode(),
            padding.PKCS1v15(),
            hashes.SHA256()
        )

        # Create signature record
        signature = ElectronicSignature(
            signature_id=uuid4(),
            signer_id=signer.user_id,
            signer_name=signer.full_name,
            signer_title=signer.title,
            component_1=signer.user_id,
            # Note: an unsalted SHA-256 is weak for passwords; a salted KDF
            # (or a reference to the authentication event) is preferable.
            component_2_hash=hashlib.sha256(password.encode()).hexdigest(),
            meaning=meaning,
            meaning_text=meaning_text,
            timestamp=timestamp,
            timestamp_authority="internal",
            signature_algorithm="RSA-SHA256",
            # Assumes the key store returns a wrapper exposing public_key_id
            public_key_id=private_key.public_key_id,
            signature_value=base64.b64encode(signature_value).decode(),
            signed_content_hash=content_hash,
            signed_content_type=content_type,
            signed_content_id=content_id
        )

        # Audit
        await self.audit_recorder.record(
            corpus_id=content_id,  # Assuming content is corpus
            action="SIGNATURE_CREATED",
            operator=signer,
            resource_type="signature",
            resource_id=signature.signature_id,
            details={
                'meaning': meaning.value,
                'content_hash': content_hash
            }
        )

        return signature

    async def verify(
        self,
        signature: ElectronicSignature,
        content: bytes
    ) -> VerificationResult:
        """Verify electronic signature"""

        # Verify content hash
        content_hash = hashlib.sha256(content).hexdigest()
        if content_hash != signature.signed_content_hash:
            return VerificationResult(
                valid=False,
                error="Content hash mismatch"
            )

        # Get public key
        public_key = await self.key_store.get_public_key(signature.public_key_id)

        # Verify signature
        payload = json.dumps({
            'content_hash': signature.signed_content_hash,
            'signer_id': signature.signer_id,
            'meaning': signature.meaning.value,
            'timestamp': signature.timestamp.isoformat()
        }, sort_keys=True)

        try:
            public_key.verify(
                base64.b64decode(signature.signature_value),
                payload.encode(),
                padding.PKCS1v15(),
                hashes.SHA256()
            )
        except InvalidSignature:
            return VerificationResult(
                valid=False,
                error="Cryptographic signature invalid"
            )

        return VerificationResult(
            valid=True,
            signer_id=signature.signer_id,
            signer_name=signature.signer_name,
            meaning=signature.meaning,
            timestamp=signature.timestamp
        )
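
Signing and verification only agree if both sides rebuild byte-identical payloads from the stored record. The sketch below isolates that property using the standard library alone. HMAC-SHA256 is swapped in for the RSA signature purely so the example runs without a PKI; being symmetric, it does not provide non-repudiation. `build_payload`, `sign`, `verify`, and the demo key are hypothetical.

```python
import hashlib
import hmac
import json
from datetime import datetime, timezone


def build_payload(content_hash: str, signer_id: str, meaning: str,
                  timestamp: datetime) -> bytes:
    """Canonical bytes that both signing and verification must agree on."""
    return json.dumps({
        "content_hash": content_hash,
        "signer_id": signer_id,
        "meaning": meaning,
        "timestamp": timestamp.isoformat(),
    }, sort_keys=True).encode()


def sign(content: bytes, signer_id: str, meaning: str, key: bytes) -> dict:
    # Capture the timestamp once; the stored and signed values must match.
    timestamp = datetime.now(timezone.utc)
    content_hash = hashlib.sha256(content).hexdigest()
    payload = build_payload(content_hash, signer_id, meaning, timestamp)
    return {
        "signer_id": signer_id,
        "meaning": meaning,
        "timestamp": timestamp,
        "signed_content_hash": content_hash,
        # HMAC-SHA256 stands in for the RSA signature (stdlib only).
        "signature_value": hmac.new(key, payload, hashlib.sha256).hexdigest(),
    }


def verify(record: dict, content: bytes, key: bytes) -> bool:
    if hashlib.sha256(content).hexdigest() != record["signed_content_hash"]:
        return False  # content changed after signing
    payload = build_payload(
        record["signed_content_hash"], record["signer_id"],
        record["meaning"], record["timestamp"],
    )
    expected = hmac.new(key, payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, record["signature_value"])


key = b"demo-key-not-for-production"
record = sign(b"corpus export v1", "u-123", "approved", key)
ok = verify(record, b"corpus export v1", key)
content_changed = verify(record, b"corpus export v2", key)
meaning_changed = verify({**record, "meaning": "reviewed"}, b"corpus export v1", key)
```

Because the meaning and timestamp are part of the signed payload, changing either one in the stored record invalidates the signature, which is what binds the signature to its stated meaning.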

Commands

compliance_commands:
  - command: "@compliance:audit"
    description: View audit trail for corpus
    parameters:
      - corpus_id: string (required)
      - start_date: datetime (optional)
      - end_date: datetime (optional)
      - actions: List[string] (optional, filter by action)
      - format: enum[json, table, pdf] (default: table)

  - command: "@compliance:verify"
    description: Verify audit chain integrity
    parameters:
      - corpus_id: string (required)
    returns:
      - valid: bool
      - events_checked: int
      - errors: List[string]

  - command: "@compliance:sign"
    description: Sign corpus export
    parameters:
      - corpus_id: string (required)
      - meaning: enum[approved, reviewed, authorized]
      - meaning_text: string (optional)

  - command: "@compliance:export"
    description: Export corpus with audit trail
    parameters:
      - corpus_id: string (required)
      - format: enum[json, pdf]
      - include_audit: bool (default: true)
      - include_signatures: bool (default: true)
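
The parameter annotations above (required, enum, default) imply a validation step before a command runs. A minimal sketch of that step for `@compliance:export` follows. `EXPORT_SPEC` and `resolve_params` are hypothetical, and treating `format` as required is an assumption, since the listing gives it no default.

```python
from typing import Any, Dict

# Parameter rules transcribed from the @compliance:export entry.
# Treating `format` as required is an assumption (no default is listed).
EXPORT_SPEC: Dict[str, Dict[str, Any]] = {
    "corpus_id": {"type": str, "required": True},
    "format": {"type": str, "choices": ["json", "pdf"], "required": True},
    "include_audit": {"type": bool, "default": True},
    "include_signatures": {"type": bool, "default": True},
}


def resolve_params(spec: Dict[str, Dict[str, Any]],
                   supplied: Dict[str, Any]) -> Dict[str, Any]:
    """Validate supplied parameters against the spec and fill in defaults."""
    unknown = set(supplied) - set(spec)
    if unknown:
        raise ValueError(f"Unknown parameters: {sorted(unknown)}")

    resolved: Dict[str, Any] = {}
    for name, rule in spec.items():
        if name in supplied:
            value = supplied[name]
            if not isinstance(value, rule["type"]):
                raise ValueError(f"{name} must be of type {rule['type'].__name__}")
            if "choices" in rule and value not in rule["choices"]:
                raise ValueError(f"{name} must be one of {rule['choices']}")
            resolved[name] = value
        elif "default" in rule:
            resolved[name] = rule["default"]
        elif rule.get("required"):
            raise ValueError(f"Missing required parameter: {name}")
    return resolved


params = resolve_params(EXPORT_SPEC, {"corpus_id": "c-42", "format": "pdf"})
```

Rejecting unknown parameters up front keeps a typo such as `include_audits` from silently falling back to the default.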

Consequences

Positive

  • Regulatory coverage: Technical controls mapped to 21 CFR Part 11, HIPAA, and SOC2 (see Compliance Mapping)
  • Tamper-evident audit: Cryptographic chain integrity
  • PHI protection: Automatic detection and redaction
  • Non-repudiation: Electronic signatures bound to signer identity, meaning, and timestamp
  • Access control: Fine-grained RBAC + ABAC

Negative

  • Performance overhead: 10-20% additional latency for audit
  • Storage growth: Audit trail grows with operations
  • Key management: PKI infrastructure required
  • Training requirement: Users must understand signature meanings

Compliance Mapping

| Requirement | Component | Implementation |
|---|---|---|
| 21 CFR Part 11.10(a) - Validation | All | Test documentation, IQ/OQ/PQ |
| 21 CFR Part 11.10(b) - Legible copies | Export | PDF with signatures |
| 21 CFR Part 11.10(c) - Protection | Access Control | RBAC + encryption |
| 21 CFR Part 11.10(d) - Access limits | Access Control | ABAC conditions |
| 21 CFR Part 11.10(e) - Audit trails | Audit Recorder | Immutable chain |
| 21 CFR Part 11.10(g) - Authority checks | Access Control | Role verification |
| 21 CFR Part 11.50 - Signature manifestation | Signature Service | Meaning + timestamp |
| 21 CFR Part 11.100 - Uniqueness | Signature Service | User ID + password |
| HIPAA 164.312(b) - Audit controls | Audit Recorder | All access logged |
| HIPAA 164.312(c) - Integrity | Audit Recorder | Hash chain |
| HIPAA 164.312(e) - Transmission security | All | TLS + encryption |

Approval

| Role | Name | Date | Decision |
|---|---|---|---|
| CTO | Hal Casteel | | |
| Security Lead | | | |
| Compliance Officer | | | |