ADR-032: Compliance Audit Layer for Corpus Processing
Status
PROPOSED
Date
2026-01-15
Context
Coditect targets regulated industries (healthcare, fintech) where corpus processing must comply with:
- 21 CFR Part 11: FDA electronic records requirements
- HIPAA: Healthcare data privacy
- SOC2: Security controls for service organizations
- GDPR: European data protection (where applicable)
The approach shown in the "unlimited memory" video has no compliance controls: files are written without audit trails, access control, or signatures. Enterprise adoption in regulated industries requires all three, together with PHI/PII handling.
Regulatory Requirements Summary
| Regulation | Key Requirements | Impact on Corpus Processing |
|---|---|---|
| 21 CFR Part 11 | Electronic signatures, audit trails, access control | Record-changing operations must be audited, and signed where a signature is required |
| HIPAA | PHI protection, minimum necessary, access logs | Filter PHI before processing, log all access |
| SOC2 | Security controls, availability, confidentiality | Encrypt at rest/transit, access control |
| GDPR | Data minimization, right to erasure | Support data deletion, minimize retention |
Decision
Implement a Compliance Audit Layer that wraps all corpus processing operations with comprehensive controls.
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│                       COMPLIANCE AUDIT LAYER                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                   ACCESS CONTROL GATEWAY                    │   │
│   │  - Authentication verification                              │   │
│   │  - Role-based access control (RBAC)                         │   │
│   │  - Attribute-based access control (ABAC)                    │   │
│   │  - Corpus-level permissions                                 │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                PHI/PII DETECTION & REDACTION                │   │
│   │  - Pre-processing PHI scan                                  │   │
│   │  - Configurable redaction policies                          │   │
│   │  - Safe harbor de-identification                            │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                     OPERATION PROCESSOR                     │   │
│   │                 (Actual corpus processing)                  │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                    AUDIT TRAIL RECORDER                     │   │
│   │  - Immutable event log                                      │   │
│   │  - Tamper-evident chain                                     │   │
│   │  - Timestamp authority                                      │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                ELECTRONIC SIGNATURE SERVICE                 │   │
│   │  - Cryptographic signatures                                 │   │
│   │  - Signature meaning capture                                │   │
│   │  - Certificate management                                   │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
Core Components
1. Access Control Gateway
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, List, Literal, Optional
from uuid import UUID


class Permission(Enum):
    """Corpus processing permissions"""
    CORPUS_CREATE = "corpus:create"
    CORPUS_READ = "corpus:read"
    CORPUS_UPDATE = "corpus:update"
    CORPUS_DELETE = "corpus:delete"
    CORPUS_EXPORT = "corpus:export"
    CORPUS_QUERY = "corpus:query"
    CORPUS_ADMIN = "corpus:admin"


# Defined before AccessPolicy so the annotation below resolves at class creation
@dataclass
class AccessCondition:
    """Attribute-based access condition"""
    attribute: str  # e.g., "user.department", "corpus.classification"
    operator: Literal["eq", "ne", "in", "not_in", "contains"]
    value: Any


@dataclass
class AccessPolicy:
    """Access policy definition"""
    policy_id: UUID
    name: str
    # RBAC
    allowed_roles: List[str]
    # Operations this policy grants (read by AccessControlGateway.check_access)
    allowed_permissions: List[Permission]
    # ABAC conditions
    conditions: List[AccessCondition]
    # Scope
    corpus_patterns: List[str]  # Glob patterns for corpus IDs
    # Temporal (None means unbounded)
    valid_from: Optional[datetime] = None
    valid_until: Optional[datetime] = None
    # Audit
    require_reason: bool = True
    require_approval: bool = False
class AccessControlGateway:
    """Enforce access control for corpus operations"""

    def __init__(
        self,
        policy_store: PolicyStore,
        audit_recorder: AuditRecorder
    ):
        self.policy_store = policy_store
        self.audit_recorder = audit_recorder

    async def check_access(
        self,
        user: UserContext,
        operation: Permission,
        corpus_id: UUID,
        access_reason: Optional[str] = None
    ) -> AccessDecision:
        """Check if user can perform operation on corpus"""
        # Get applicable policies
        policies = await self.policy_store.get_policies_for_corpus(corpus_id)
        for policy in policies:
            # Check role
            if not self._check_role(user, policy):
                continue
            # Check conditions
            if not await self._check_conditions(user, corpus_id, policy):
                continue
            # Check temporal validity
            if not self._check_temporal(policy):
                continue
            # Check operation permission
            if operation in policy.allowed_permissions:
                # Check if reason required
                if policy.require_reason and not access_reason:
                    # Denials are audited too, not only grants
                    await self.audit_recorder.record_access(
                        user=user,
                        operation=operation,
                        corpus_id=corpus_id,
                        access_reason=access_reason,
                        policy_id=policy.policy_id,
                        decision="DENIED",
                        denial_reason="Access reason required"
                    )
                    return AccessDecision(
                        allowed=False,
                        reason="Access reason required",
                        policy_id=policy.policy_id
                    )
                # Check if approval required
                if policy.require_approval:
                    approval = await self._get_approval(user, operation, corpus_id)
                    if not approval:
                        await self.audit_recorder.record_access(
                            user=user,
                            operation=operation,
                            corpus_id=corpus_id,
                            access_reason=access_reason,
                            policy_id=policy.policy_id,
                            decision="DENIED",
                            denial_reason="Approval required"
                        )
                        return AccessDecision(
                            allowed=False,
                            reason="Approval required",
                            policy_id=policy.policy_id
                        )
                # Log access
                await self.audit_recorder.record_access(
                    user=user,
                    operation=operation,
                    corpus_id=corpus_id,
                    access_reason=access_reason,
                    policy_id=policy.policy_id,
                    decision="ALLOWED"
                )
                return AccessDecision(
                    allowed=True,
                    policy_id=policy.policy_id
                )
        # No matching policy - deny by default
        await self.audit_recorder.record_access(
            user=user,
            operation=operation,
            corpus_id=corpus_id,
            access_reason=access_reason,
            decision="DENIED",
            denial_reason="No matching policy"
        )
        return AccessDecision(
            allowed=False,
            reason="No policy allows this operation"
        )
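The gateway delegates ABAC evaluation to `_check_conditions`, which is not shown. A minimal sketch of how the five operators might be evaluated follows. The names `Condition`, `evaluate_condition`, and `check_conditions` are hypothetical, and the flat attribute dictionary (keys such as "user.department") is an assumption about how user and corpus attributes are resolved.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Literal


@dataclass(frozen=True)
class Condition:
    """Stand-in for AccessCondition (hypothetical, for illustration only)."""
    attribute: str
    operator: Literal["eq", "ne", "in", "not_in", "contains"]
    value: Any


def evaluate_condition(cond: Condition, attributes: Dict[str, Any]) -> bool:
    """Evaluate one condition; a missing attribute never satisfies it (fail closed)."""
    if cond.attribute not in attributes:
        return False
    actual = attributes[cond.attribute]
    if cond.operator == "eq":
        return actual == cond.value
    if cond.operator == "ne":
        return actual != cond.value
    if cond.operator == "in":
        return actual in cond.value
    if cond.operator == "not_in":
        return actual not in cond.value
    if cond.operator == "contains":
        return cond.value in actual
    return False  # unknown operator: deny


def check_conditions(conditions: List[Condition], attributes: Dict[str, Any]) -> bool:
    """All conditions must hold; an empty list places no ABAC restriction."""
    return all(evaluate_condition(c, attributes) for c in conditions)
```

Failing closed on a missing attribute means a `ne` or `not_in` condition cannot be satisfied by simply omitting the attribute, which is usually the safer default for an access gateway.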
2. PHI/PII Detection and Redaction
class PHICategory(Enum):
    """The 18 HIPAA Safe Harbor identifier categories"""
    NAME = "name"
    ADDRESS = "address"
    DATES = "dates"  # Except year
    PHONE = "phone"
    FAX = "fax"
    EMAIL = "email"
    SSN = "ssn"
    MRN = "mrn"  # Medical record number
    HEALTH_PLAN = "health_plan"
    ACCOUNT = "account"
    LICENSE = "license"
    VEHICLE = "vehicle"
    DEVICE = "device"
    URL = "url"
    IP = "ip"
    BIOMETRIC = "biometric"
    PHOTO = "photo"
    OTHER_UNIQUE = "other_unique"


@dataclass
class RedactionPolicy:
    """Policy for PHI/PII redaction"""
    policy_id: UUID
    name: str
    # What to redact
    phi_categories: List[PHICategory]
    custom_patterns: Dict[str, str]  # Name -> regex
    # How to redact
    redaction_method: Literal["remove", "mask", "hash", "generalize"]
    mask_char: str = "X"
    # Safe harbor compliance
    enable_safe_harbor: bool = True
    retain_year: bool = True  # For dates, retain year per Safe Harbor
import hashlib
import re
from typing import List, Tuple

import spacy


class PHIRedactor:
    """Detect and redact PHI/PII from documents"""

    def __init__(
        self,
        policy: RedactionPolicy,
        ner_model: str = "en_core_web_lg"
    ):
        self.policy = policy
        self.nlp = spacy.load(ner_model)
        self.phi_detector = PHIDetector()

    async def process_document(
        self,
        document: Document,
        audit_context: AuditContext
    ) -> Tuple[Document, RedactionReport]:
        """Process document for PHI redaction"""
        # Detect PHI
        detections = await self._detect_phi(document.content)
        # Filter by policy
        to_redact = [
            d for d in detections
            if d.category in self.policy.phi_categories
        ]
        # Apply redaction from the end of the text backwards so that
        # earlier spans keep their original offsets
        redacted_content = document.content
        redaction_records = []
        for detection in sorted(to_redact, key=lambda x: x.span[0], reverse=True):
            # Redact
            replacement = self._get_replacement(detection)
            redacted_content = (
                redacted_content[:detection.span[0]] +
                replacement +
                redacted_content[detection.span[1]:]
            )
            # Record (with hash, not original value).
            # NOTE: an unsalted SHA-256 of a low-entropy value such as an SSN
            # can be brute-forced; a keyed HMAC is the safer choice here.
            redaction_records.append(RedactionRecord(
                category=detection.category,
                original_hash=hashlib.sha256(detection.text.encode()).hexdigest(),
                replacement=replacement,
                span=detection.span,  # Offsets into the original content
                confidence=detection.confidence
            ))
        # Create report
        report = RedactionReport(
            document_id=document.id,
            total_detections=len(detections),
            total_redactions=len(redaction_records),
            records=redaction_records,
            policy_applied=self.policy.policy_id
        )
        # Audit
        await audit_context.record_redaction(report)
        return Document(
            id=document.id,
            content=redacted_content,
            metadata={
                **document.metadata,
                'phi_redacted': True,
                'redaction_report_id': report.id
            }
        ), report

    async def _detect_phi(self, text: str) -> List[PHIDetection]:
        """Detect PHI in text"""
        detections = []
        # NER-based detection
        doc = self.nlp(text)
        for ent in doc.ents:
            category = self._map_ner_to_phi(ent.label_)
            if category:
                detections.append(PHIDetection(
                    text=ent.text,
                    category=category,
                    span=(ent.start_char, ent.end_char),
                    confidence=0.9,  # Fixed placeholder, not a model score
                    method="ner"
                ))
        # Pattern-based detection
        for name, pattern in self.policy.custom_patterns.items():
            for match in re.finditer(pattern, text):
                detections.append(PHIDetection(
                    text=match.group(),
                    category=PHICategory.OTHER_UNIQUE,
                    span=(match.start(), match.end()),
                    confidence=0.95,  # Fixed placeholder
                    method=f"pattern:{name}"
                ))
        # Specialized detectors
        detections.extend(await self.phi_detector.detect_ssn(text))
        detections.extend(await self.phi_detector.detect_mrn(text))
        detections.extend(await self.phi_detector.detect_dates(text))
        # Deduplicate overlapping spans (the redaction loop assumes none overlap)
        return self._deduplicate_detections(detections)
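The redaction loop splices replacements into the text by span, so it relies on `_deduplicate_detections` returning non-overlapping spans. That helper is not shown; the following is one possible sketch, assuming a greedy policy that prefers higher confidence and then longer spans. `Detection` and `deduplicate_detections` are hypothetical stand-ins for `PHIDetection` and the private method.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Detection:
    """Stand-in for PHIDetection (hypothetical, for illustration only)."""
    text: str
    span: Tuple[int, int]  # (start, end), end exclusive
    confidence: float


def deduplicate_detections(detections: List[Detection]) -> List[Detection]:
    """Keep a non-overlapping subset, preferring higher confidence, then longer spans."""
    ranked = sorted(
        detections,
        key=lambda d: (-d.confidence, -(d.span[1] - d.span[0]), d.span[0]),
    )
    kept: List[Detection] = []
    for cand in ranked:
        # Two half-open spans overlap when each starts before the other ends
        overlaps = any(
            cand.span[0] < k.span[1] and k.span[0] < cand.span[1] for k in kept
        )
        if not overlaps:
            kept.append(cand)
    # Return in document order
    return sorted(kept, key=lambda d: d.span[0])
```

A more conservative alternative would merge overlapping spans and redact their union, so that a partially overlapping lower-confidence match is never left exposed.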
3. Audit Trail Recorder
@dataclass
class AuditEvent:
    """Audit event designed for 21 CFR Part 11 audit-trail requirements"""
    # Identity
    event_id: UUID
    sequence_number: int  # Monotonic within corpus
    # Chain integrity
    previous_hash: str
    event_hash: str
    # Timestamp
    timestamp: datetime
    timestamp_authority: str  # e.g., "rfc3161.digicert.com"
    # Actor
    operator_id: str
    operator_role: str
    session_id: UUID
    # Action
    action: str  # e.g., "CORPUS_CREATED", "DOCUMENT_ADDED", "QUERY_EXECUTED"
    resource_type: str
    resource_id: UUID
    # Details
    details: Dict[str, Any]
    # Before/after for modifications
    before_state: Optional[str]  # JSON or hash
    after_state: Optional[str]
    # Reason
    reason: Optional[str]

    def compute_hash(self) -> str:
        """Compute tamper-evident hash over every field except event_hash"""
        content = json.dumps({
            'event_id': str(self.event_id),
            'sequence_number': self.sequence_number,
            'previous_hash': self.previous_hash,
            'timestamp': self.timestamp.isoformat(),
            'timestamp_authority': self.timestamp_authority,
            'operator_id': self.operator_id,
            'operator_role': self.operator_role,
            'session_id': str(self.session_id),
            'action': self.action,
            'resource_type': self.resource_type,
            'resource_id': str(self.resource_id),
            'details': self.details,
            'before_state': self.before_state,
            'after_state': self.after_state,
            'reason': self.reason
        }, sort_keys=True, default=str)  # default=str covers UUIDs/datetimes in details
        return hashlib.sha256(content.encode()).hexdigest()

    def verify_chain(self, previous_event: Optional['AuditEvent']) -> bool:
        """Verify chain integrity"""
        if previous_event is None:
            return self.previous_hash == "GENESIS"
        return self.previous_hash == previous_event.event_hash
class AuditRecorder:
    """Record immutable audit trail"""

    def __init__(
        self,
        fdb_cluster: str,
        timestamp_authority: str = "internal"
    ):
        # fdb.api_version(...) must be called once before fdb.open()
        self.db = fdb.open(fdb_cluster)
        self.timestamp_authority = timestamp_authority
        self._init_directories()

    # NOTE: the official FoundationDB Python binding is synchronous, and
    # @fdb.transactional is not designed to wrap coroutines. Treat the
    # async/await form below as pseudocode for the transactional flow.
    @fdb.transactional
    async def record(
        self,
        tr: fdb.Transaction,
        corpus_id: UUID,
        action: str,
        operator: UserContext,
        resource_type: str,
        resource_id: UUID,
        details: Dict[str, Any],
        before_state: Optional[str] = None,
        after_state: Optional[str] = None,
        reason: Optional[str] = None
    ) -> AuditEvent:
        """Record audit event atomically"""
        # Get previous event for chain
        previous = await self._get_latest_event(tr, corpus_id)
        # Get sequence number
        sequence = await self._get_next_sequence(tr, corpus_id)
        # Get timestamp
        timestamp = await self._get_trusted_timestamp()
        # Create event
        event = AuditEvent(
            event_id=uuid4(),
            sequence_number=sequence,
            previous_hash=previous.event_hash if previous else "GENESIS",
            event_hash="",  # Computed below
            timestamp=timestamp,
            timestamp_authority=self.timestamp_authority,
            operator_id=operator.user_id,
            operator_role=operator.role,
            session_id=operator.session_id,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            details=details,
            before_state=before_state,
            after_state=after_state,
            reason=reason
        )
        # Compute hash
        event.event_hash = event.compute_hash()
        # Store. The sequence is kept as an int so the tuple encoding sorts
        # keys numerically; string keys would order "10" before "9".
        key = self.dirs['audit'][str(corpus_id)][sequence]
        tr[key] = self._serialize(event)
        # Update latest pointer
        latest_key = self.dirs['audit_latest'][str(corpus_id)]
        tr[latest_key] = str(sequence).encode()
        return event
    async def verify_chain_integrity(
        self,
        corpus_id: UUID,
        start_sequence: int = 0,
        end_sequence: Optional[int] = None
    ) -> ChainVerificationResult:
        """Verify audit chain integrity"""
        events = await self._get_events_range(corpus_id, start_sequence, end_sequence)
        if not events:
            return ChainVerificationResult(valid=True, events_checked=0)
        previous = None
        for index, event in enumerate(events):
            # Verify hash
            computed_hash = event.compute_hash()
            if computed_hash != event.event_hash:
                return ChainVerificationResult(
                    valid=False,
                    error=f"Hash mismatch at sequence {event.sequence_number}",
                    invalid_event=event
                )
            # Verify chain. For a partial range the predecessor of the first
            # event lies outside the range, so its link cannot be checked
            # here (and must not be compared against "GENESIS").
            if (index > 0 or start_sequence == 0) and not event.verify_chain(previous):
                return ChainVerificationResult(
                    valid=False,
                    error=f"Chain break at sequence {event.sequence_number}",
                    invalid_event=event
                )
            previous = event
        return ChainVerificationResult(
            valid=True,
            events_checked=len(events),
            first_sequence=events[0].sequence_number,
            last_sequence=events[-1].sequence_number
        )

    async def export_audit_trail(
        self,
        corpus_id: UUID,
        format: Literal["json", "csv", "pdf"]
    ) -> bytes:
        """Export audit trail in specified format"""
        events = await self._get_all_events(corpus_id)
        if format == "json":
            return json.dumps([self._event_to_dict(e) for e in events], indent=2).encode()
        elif format == "csv":
            return self._events_to_csv(events)
        elif format == "pdf":
            return await self._events_to_pdf(events)
        # Fail loudly instead of silently returning None
        raise ValueError(f"Unsupported export format: {format}")
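To make the tamper-evidence property concrete, here is a small self-contained sketch of the same hash-chain idea using plain dictionaries instead of `AuditEvent` and FoundationDB. The helper names `event_hash`, `append_event`, and `first_invalid` are hypothetical and exist only for this illustration.

```python
import hashlib
import json
from typing import Dict, List, Optional

GENESIS = "GENESIS"


def event_hash(event: Dict) -> str:
    """Hash every field except the stored hash itself, in canonical key order."""
    body = {k: v for k, v in event.items() if k != "event_hash"}
    return hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()


def append_event(chain: List[Dict], action: str, operator_id: str) -> Dict:
    """Append an event linked to the hash of its predecessor."""
    event = {
        "sequence_number": len(chain),
        "previous_hash": chain[-1]["event_hash"] if chain else GENESIS,
        "action": action,
        "operator_id": operator_id,
    }
    event["event_hash"] = event_hash(event)
    chain.append(event)
    return event


def first_invalid(chain: List[Dict]) -> Optional[int]:
    """Return the sequence number of the first broken event, or None if intact."""
    expected_previous = GENESIS
    for event in chain:
        link_ok = event["previous_hash"] == expected_previous
        hash_ok = event_hash(event) == event["event_hash"]
        if not (link_ok and hash_ok):
            return event["sequence_number"]
        expected_previous = event["event_hash"]
    return None
```

Editing a stored event breaks its own hash; recomputing that hash to hide the edit breaks the link from the next event instead. Detecting a rewrite of the entire tail still requires anchoring the latest hash somewhere the writer cannot modify, such as an external timestamp authority.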
4. Electronic Signature Service
class SignatureMeaning(Enum):
    """21 CFR Part 11 signature meanings"""
    CREATED = "created"
    REVIEWED = "reviewed"
    APPROVED = "approved"
    VERIFIED = "verified"
    AUTHORIZED = "authorized"
    RESPONSIBLE = "responsible"


@dataclass
class ElectronicSignature:
    """Electronic signature record designed for 21 CFR Part 11"""
    signature_id: UUID
    # Signer identity
    signer_id: str
    signer_name: str
    signer_title: str
    # Signature components (21 CFR 11.200)
    # Non-biometric signatures must use at least two distinct components
    component_1: str  # User ID
    component_2_hash: str  # Password hash (never stored plain)
    # Meaning
    meaning: SignatureMeaning
    meaning_text: str  # Full text of what signature means
    # Timestamp
    timestamp: datetime
    timestamp_authority: str
    # Cryptographic signature
    signature_algorithm: str  # e.g., "RSA-SHA256"
    public_key_id: str  # Reference to PKI certificate
    signature_value: str  # Base64 encoded signature
    # What was signed
    signed_content_hash: str
    signed_content_type: str
    signed_content_id: UUID
import base64
from datetime import datetime, timezone

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding


class SignatureService:
    """Electronic signature service (designed for 21 CFR Part 11)"""

    def __init__(
        self,
        key_store: KeyStore,
        audit_recorder: AuditRecorder
    ):
        self.key_store = key_store
        self.audit_recorder = audit_recorder

    async def sign(
        self,
        content: bytes,
        content_type: str,
        content_id: UUID,
        signer: UserContext,
        meaning: SignatureMeaning,
        meaning_text: str,
        password: str  # For 2-component verification
    ) -> ElectronicSignature:
        """Create electronic signature"""
        # Verify signer identity (2 components per 21 CFR 11.200)
        if not await self._verify_password(signer.user_id, password):
            raise SignatureError("Authentication failed")
        # Get signer's private key
        private_key = await self.key_store.get_private_key(signer.user_id)
        # Compute content hash
        content_hash = hashlib.sha256(content).hexdigest()
        # Capture the signing time once: verify() rebuilds the payload from
        # the stored timestamp, so the signed and stored values must match
        signed_at = datetime.now(timezone.utc)
        # Create signature payload
        payload = json.dumps({
            'content_hash': content_hash,
            'signer_id': signer.user_id,
            'meaning': meaning.value,
            'timestamp': signed_at.isoformat()
        }, sort_keys=True)
        # Sign
        signature_value = private_key.sign(
            payload.encode(),
            padding.PKCS1v15(),
            hashes.SHA256()
        )
        # Create signature record
        signature = ElectronicSignature(
            signature_id=uuid4(),
            signer_id=signer.user_id,
            signer_name=signer.full_name,
            signer_title=signer.title,
            component_1=signer.user_id,
            # NOTE: a fast unsalted hash of a password is brute-forceable;
            # prefer a salted KDF digest, or record only that the second
            # component was verified.
            component_2_hash=hashlib.sha256(password.encode()).hexdigest(),
            meaning=meaning,
            meaning_text=meaning_text,
            timestamp=signed_at,
            timestamp_authority="internal",
            signature_algorithm="RSA-SHA256",
            # Assumes the KeyStore returns a key wrapper exposing public_key_id
            public_key_id=private_key.public_key_id,
            signature_value=base64.b64encode(signature_value).decode(),
            signed_content_hash=content_hash,
            signed_content_type=content_type,
            signed_content_id=content_id
        )
        # Audit.
        # NOTE: AuditRecorder.record is declared with a leading `tr`
        # parameter; the database or an open transaction must be supplied.
        await self.audit_recorder.record(
            corpus_id=content_id,  # Assuming content is corpus
            action="SIGNATURE_CREATED",
            operator=signer,
            resource_type="signature",
            resource_id=signature.signature_id,
            details={
                'meaning': meaning.value,
                'content_hash': content_hash
            }
        )
        return signature
    async def verify(
        self,
        signature: ElectronicSignature,
        content: bytes
    ) -> VerificationResult:
        """Verify electronic signature"""
        # Verify content hash
        content_hash = hashlib.sha256(content).hexdigest()
        if content_hash != signature.signed_content_hash:
            return VerificationResult(
                valid=False,
                error="Content hash mismatch"
            )
        # Get public key
        public_key = await self.key_store.get_public_key(signature.public_key_id)
        # Verify signature
        payload = json.dumps({
            'content_hash': signature.signed_content_hash,
            'signer_id': signature.signer_id,
            'meaning': signature.meaning.value,
            'timestamp': signature.timestamp.isoformat()
        }, sort_keys=True)
        try:
            public_key.verify(
                base64.b64decode(signature.signature_value),
                payload.encode(),
                padding.PKCS1v15(),
                hashes.SHA256()
            )
        except InvalidSignature:
            return VerificationResult(
                valid=False,
                error="Cryptographic signature invalid"
            )
        return VerificationResult(
            valid=True,
            signer_id=signature.signer_id,
            signer_name=signature.signer_name,
            meaning=signature.meaning,
            timestamp=signature.timestamp
        )
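The signing flow depends on `_verify_password` for the second signature component, but that check is not shown. One way it might be implemented with the standard library is sketched below, assuming a per-user random salt and PBKDF2-HMAC-SHA256. The function names, the iteration count, and the idea that salt and digest live in a credential store are all assumptions, not part of this design.

```python
import hashlib
import hmac
import os
from typing import Optional, Tuple

# Assumed work factor; tune to the deployment's latency budget
PBKDF2_ITERATIONS = 600_000


def hash_password(
    password: str,
    salt: Optional[bytes] = None,
    iterations: int = PBKDF2_ITERATIONS,
) -> Tuple[bytes, bytes]:
    """Derive a salted digest; returns (salt, digest) for the credential store."""
    if salt is None:
        salt = os.urandom(16)  # fresh random salt per credential
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return salt, digest


def verify_password(
    password: str,
    salt: bytes,
    expected_digest: bytes,
    iterations: int = PBKDF2_ITERATIONS,
) -> bool:
    """Recompute the digest and compare in constant time."""
    _, digest = hash_password(password, salt, iterations)
    return hmac.compare_digest(digest, expected_digest)
```

Unlike a bare SHA-256, the salt prevents precomputed lookups and the iteration count slows offline guessing. The constant-time comparison avoids leaking how many leading bytes matched.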
Commands
compliance_commands:
  - command: "@compliance:audit"
    description: View audit trail for corpus
    parameters:
      - corpus_id: string (required)
      - start_date: datetime (optional)
      - end_date: datetime (optional)
      - actions: "List[string] (optional, filter by action)"
      - format: "enum[json, table, pdf] (default: table)"
  - command: "@compliance:verify"
    description: Verify audit chain integrity
    parameters:
      - corpus_id: string (required)
    returns:
      - valid: bool
      - events_checked: int
      - errors: "List[string]"
  - command: "@compliance:sign"
    description: Sign corpus export
    parameters:
      - corpus_id: string (required)
      - meaning: "enum[approved, reviewed, authorized]"
      - meaning_text: string (optional)
  - command: "@compliance:export"
    description: Export corpus with audit trail
    parameters:
      - corpus_id: string (required)
      - format: "enum[json, pdf]"
      - include_audit: "bool (default: true)"
      - include_signatures: "bool (default: true)"
Consequences
Positive
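- Regulatory alignment: Technical controls supporting 21 CFR Part 11, HIPAA, and SOC2 (full compliance also depends on validation and organizational procedures)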
- Tamper-evident audit: Cryptographic chain integrity
- PHI protection: Automatic detection and redaction
- Non-repudiation: Cryptographic electronic signatures bound to signer identity, meaning, and content hash
- Access control: Fine-grained RBAC + ABAC
Negative
- Performance overhead: Estimated 10-20% additional latency for audit, to be confirmed by benchmarking
- Storage growth: Audit trail grows with operations
- Key management: PKI infrastructure required
- Training requirement: Users must understand signature meanings
Compliance Mapping
| Requirement | Component | Implementation |
|---|---|---|
| 21 CFR Part 11.10(a) - Validation | All | Test documentation, IQ/OQ/PQ |
| 21 CFR Part 11.10(b) - Legible copies | Export | PDF with signatures |
| 21 CFR Part 11.10(c) - Protection | Access Control | RBAC + encryption |
| 21 CFR Part 11.10(d) - Access limits | Access Control | ABAC conditions |
| 21 CFR Part 11.10(e) - Audit trails | Audit Recorder | Immutable chain |
| 21 CFR Part 11.10(g) - Authority checks | Access Control | Role verification |
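| 21 CFR Part 11.50 - Signature manifestation | Signature Service | Signer name + meaning + timestamp |
| 21 CFR Part 11.100 - Uniqueness | Signature Service | Unique signer ID per individual |
| 21 CFR Part 11.200 - Signature components | Signature Service | User ID + password |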
| HIPAA 164.312(b) - Audit controls | Audit Recorder | All access logged |
| HIPAA 164.312(c) - Integrity | Audit Recorder | Hash chain |
| HIPAA 164.312(e) - Transmission security | All | TLS + encryption |
References
- ADR-027: Corpus Processing Subsystem Architecture
- ADR-022: Compliance Framework (21 CFR Part 11)
- 21 CFR Part 11 Full Text
- HIPAA Security Rule
- SOC2 Trust Services Criteria
Approval
| Role | Name | Date | Decision |
|---|---|---|---|
| CTO | Hal Casteel | | |
| Security Lead | | | |
| Compliance Officer | | | |