ADR-030: Compliance-Aware RAG System

Status

PROPOSED

Date

2026-01-15

Context

Coditect targets regulated industries (healthcare, fintech, life sciences) where document retrieval and AI-generated responses require:

  1. 21 CFR Part 11: Electronic records with audit trails, electronic signatures
  2. HIPAA: Protected health information access logging
  3. SOC2: Security controls and access management
  4. GDPR: Data subject access and deletion rights

Standard RAG implementations lack these compliance controls. Every query, retrieval, and generation must be auditable, attributable, and access-controlled.

Regulatory Requirements Summary

| Regulation | Key Requirement | RAG Impact |
|---|---|---|
| 21 CFR Part 11 | Audit trails for all record access | Log every retrieval |
| 21 CFR Part 11 | Electronic signatures | Sign generated reports |
| HIPAA | Access logging for PHI | Track who accessed what |
| HIPAA | Minimum necessary standard | Filter results by need |
| SOC2 | Access controls | Role-based retrieval |
| GDPR | Right to be forgotten | Selective deletion from index |

Decision

Implement a Compliance-Aware RAG System with built-in audit, access control, and regulatory controls.

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                   COMPLIANCE-AWARE RAG SYSTEM                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  LAYER 1: ACCESS CONTROL                                  │  │
│  │  • User authentication (SSO/SAML)                         │  │
│  │  • Role-based permissions (RBAC)                          │  │
│  │  • Document classification levels                         │  │
│  │  • Query authorization                                    │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  LAYER 2: QUERY PROCESSING                                │  │
│  │  • Intent classification                                  │  │
│  │  • Query expansion                                        │  │
│  │  • PII detection and handling                             │  │
│  │  • Scope restriction based on permissions                 │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  LAYER 3: RETRIEVAL WITH AUDIT                            │  │
│  │  • Hybrid search (BM25 + Vector + Graph)                  │  │
│  │  • Access-filtered results                                │  │
│  │  • Retrieval event logging                                │  │
│  │  • Source document tracking                               │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  LAYER 4: GENERATION WITH CITATIONS                       │  │
│  │  • Citation-required generation                           │  │
│  │  • Hallucination detection                                │  │
│  │  • Confidence scoring                                     │  │
│  │  • Response signing (optional)                            │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                │                                │
│                                ▼                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  LAYER 5: AUDIT PERSISTENCE                               │  │
│  │  • Immutable audit log (FoundationDB)                     │  │
│  │  • Electronic signatures (ECDSA)                          │  │
│  │  • Retention policy enforcement                           │  │
│  │  • Audit report generation                                │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
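
To make the interaction between Layer 1 and Layer 3 concrete, the sketch below filters candidate chunks by classification level and department scope before anything reaches generation. It is only an illustration: the `Chunk` type, the `ROLE_LEVELS` table, and the role and scope names are hypothetical and are not part of the Coditect codebase.

```python
from dataclasses import dataclass

# Hypothetical role table: which classification levels each role may read.
ROLE_LEVELS = {
    "researcher": {"public", "internal"},
    "clinician": {"public", "internal", "confidential", "phi"},
}


@dataclass(frozen=True)
class Chunk:
    chunk_id: str
    classification: str
    scope: str


def filter_chunks(chunks, role, scopes):
    """Keep only chunks the role may read and that fall inside the allowed scopes."""
    levels = ROLE_LEVELS.get(role, set())  # unknown roles see nothing
    return [c for c in chunks if c.classification in levels and c.scope in scopes]


chunks = [
    Chunk("c1", "public", "oncology"),
    Chunk("c2", "phi", "oncology"),
    Chunk("c3", "internal", "cardiology"),
]
visible = filter_chunks(chunks, "researcher", {"oncology"})
print([c.chunk_id for c in visible])  # ['c1']
```

Denying by default for unknown roles keeps a misconfigured role from silently widening access.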

Audit Event Schema

import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class RAGAuditEvent:
    """
    Audit record for RAG operations; treated as immutable once persisted.
    Compliant with 21 CFR Part 11 electronic records requirements.
    """
    # Event identification
    event_id: str  # UUID
    event_type: str  # query|retrieval|generation|export
    timestamp: datetime

    # User context (21 CFR Part 11 §11.10(e))
    user_id: str
    user_role: str
    session_id: str
    client_ip: str

    # Query context
    query_text: str
    query_hash: str  # SHA-256 for deduplication
    intent_classification: str = ""

    # Retrieval details (defaults let the event be created before retrieval runs)
    documents_accessed: List[str] = field(default_factory=list)  # Document IDs
    chunks_retrieved: List[str] = field(default_factory=list)  # Chunk IDs
    retrieval_scores: Dict[str, float] = field(default_factory=dict)
    access_filter_applied: Dict[str, Any] = field(default_factory=dict)

    # Generation details
    response_text: str = ""
    response_hash: str = ""
    citations: List[Dict[str, Any]] = field(default_factory=list)
    confidence_score: float = 0.0
    hallucination_flags: List[str] = field(default_factory=list)

    # Compliance metadata
    data_classification: str = ""  # public|internal|confidential|restricted|phi
    pii_detected: bool = False
    pii_types: List[str] = field(default_factory=list)
    retention_category: str = ""

    # Failure details, set when the pipeline raises
    error: Optional[str] = None

    # Electronic signature (21 CFR Part 11 §11.50)
    signature: Optional[str] = None
    signature_timestamp: Optional[datetime] = None
    signature_meaning: Optional[str] = None  # "Reviewed"|"Approved"|"Created"
    signer_id: Optional[str] = None

    def sign(self, signer: 'Signer', meaning: str) -> None:
        """Apply electronic signature to audit event"""
        self.signer_id = signer.user_id
        self.signature_meaning = meaning
        self.signature_timestamp = datetime.utcnow()

        # Create signature over event content
        content_hash = self._compute_content_hash()
        self.signature = signer.sign(content_hash)

    def verify_signature(self, signer: 'Signer') -> bool:
        """Verify electronic signature; an unsigned event never verifies"""
        if self.signature is None:
            return False
        content_hash = self._compute_content_hash()
        return signer.verify(content_hash, self.signature)

    def _compute_content_hash(self) -> str:
        """Compute hash of auditable content"""
        content = f"{self.event_id}|{self.timestamp}|{self.user_id}|{self.query_hash}|{self.response_hash}"
        return hashlib.sha256(content.encode()).hexdigest()
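
The schema refers to a `Signer` type without defining it. The sketch below shows one possible shape for the interface that `sign()` and `verify_signature()` rely on: a `user_id` attribute plus `sign` and `verify` methods over a hex digest. It deliberately uses HMAC-SHA256 from the standard library as a stand-in so that it runs without dependencies. The architecture calls for ECDSA, which would require an asymmetric-key library and proper key management; the class name and key below are hypothetical.

```python
import hashlib
import hmac


class HmacSigner:
    """Illustrative signer; HMAC is a stand-in for the ECDSA signatures in Layer 5."""

    def __init__(self, user_id: str, secret_key: bytes):
        self.user_id = user_id
        self._key = secret_key

    def sign(self, content_hash: str) -> str:
        return hmac.new(self._key, content_hash.encode(), hashlib.sha256).hexdigest()

    def verify(self, content_hash: str, signature: str) -> bool:
        # compare_digest avoids leaking information through timing differences
        return hmac.compare_digest(self.sign(content_hash), signature)


signer = HmacSigner("qa-reviewer-01", b"demo-key-not-for-production")
digest = hashlib.sha256(b"event-123|2026-01-15|user-7").hexdigest()
signature = signer.sign(digest)

print(signer.verify(digest, signature))        # True
print(signer.verify(digest + "0", signature))  # False
```

A shared-secret scheme like this cannot show which key holder produced a signature, which is one reason an asymmetric scheme such as ECDSA is the better fit for attributable electronic signatures.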

Implementation

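# /coditect/rag/compliance_rag.py

import asyncio
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from uuid import uuid4


class DataClassification(Enum):
    # Members are listed from least to most sensitive (assumed ordering)
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    PHI = "phi"  # Protected Health Information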

class ComplianceRAG:
    """
    RAG system with built-in compliance controls for regulated industries.

    Implements:
    - 21 CFR Part 11: Audit trails, electronic signatures
    - HIPAA: PHI access logging, minimum necessary
    - SOC2: Access controls, security logging
    """

    def __init__(
        self,
        knowledge_store: 'HierarchicalKnowledgeStore',
        vector_store: 'VectorStore',
        fdb_client: 'FoundationDBClient',
        config: 'ComplianceRAGConfig',
        embedder: 'Embedder',
        agent_pool: 'AgentPool'
    ):
        self.knowledge_store = knowledge_store
        self.vector_store = vector_store
        self.fdb = fdb_client
        self.config = config
        # Used by _vector_retrieve and _generate_with_citations respectively
        self.embedder = embedder
        self.agent_pool = agent_pool
        self.audit_logger = AuditLogger(fdb_client)
        self.access_controller = AccessController(config.rbac_config)
        self.pii_detector = PIIDetector()

    async def query(
        self,
        query: str,
        user_context: 'UserContext',
        options: Optional['QueryOptions'] = None
    ) -> 'ComplianceRAGResponse':
        """
        Execute RAG query with full compliance controls.

        Flow:
        1. Authenticate and authorize user
        2. Process and classify query
        3. Retrieve with access filtering
        4. Generate with mandatory citations
        5. Log audit event
        """
        options = options or QueryOptions()

        # Start audit event
        audit_event = RAGAuditEvent(
            event_id=str(uuid4()),
            event_type="query",
            timestamp=datetime.utcnow(),
            user_id=user_context.user_id,
            user_role=user_context.role,
            session_id=user_context.session_id,
            client_ip=user_context.client_ip,
            query_text=query,
            query_hash=hashlib.sha256(query.encode()).hexdigest()
        )

        try:
            # Layer 1: Access Control
            await self._authorize_query(user_context, query)

            # Layer 2: Query Processing
            processed_query = await self._process_query(
                query,
                user_context,
                audit_event
            )

            # Layer 3: Retrieval with Audit
            retrieval_result = await self._retrieve_with_audit(
                processed_query,
                user_context,
                audit_event
            )

            # Layer 4: Generation with Citations
            response = await self._generate_with_citations(
                query,
                retrieval_result,
                audit_event
            )

            # Layer 5: Persist Audit
            await self._persist_audit(audit_event)

            return ComplianceRAGResponse(
                answer=response.text,
                citations=response.citations,
                confidence=response.confidence,
                audit_id=audit_event.event_id,
                data_classification=audit_event.data_classification
            )

        except Exception as e:
            # Failed and denied queries are audited too, then re-raised
            audit_event.error = str(e)
            await self._persist_audit(audit_event)
            raise

    # ==================== Layer 1: Access Control ====================

    async def _authorize_query(
        self,
        user_context: 'UserContext',
        query: str
    ) -> None:
        """Verify user has permission to execute this query"""

        # Check basic query permission
        if not self.access_controller.can_query(user_context):
            raise AuthorizationError(
                f"User {user_context.user_id} not authorized to query"
            )

        # Check for restricted topic access
        restricted_topics = self.access_controller.get_restricted_topics(
            user_context.role
        )

        # Case-insensitive substring match against each restricted topic
        for topic in restricted_topics:
            if topic.lower() in query.lower():
                raise AuthorizationError(
                    f"User role {user_context.role} cannot query topic: {topic}"
                )

    # ==================== Layer 2: Query Processing ====================

    async def _process_query(
        self,
        query: str,
        user_context: 'UserContext',
        audit_event: 'RAGAuditEvent'
    ) -> 'ProcessedQuery':
        """Process query with PII detection and scope restriction"""

        # Detect PII in query
        pii_results = await self.pii_detector.detect(query)
        audit_event.pii_detected = pii_results.has_pii
        audit_event.pii_types = pii_results.types

        if pii_results.has_pii and not user_context.can_access_pii:
            raise AuthorizationError(
                "Query contains PII but user lacks PII access permission"
            )

        # Classify intent
        intent = await self._classify_intent(query)
        audit_event.intent_classification = intent

        # Apply scope restrictions based on role
        scope_filter = self.access_controller.get_scope_filter(user_context)

        # Expand query for better retrieval
        expanded = await self._expand_query(query)

        return ProcessedQuery(
            original=query,
            expanded=expanded,
            intent=intent,
            scope_filter=scope_filter,
            pii_types=pii_results.types
        )

    # ==================== Layer 3: Retrieval with Audit ====================

    async def _retrieve_with_audit(
        self,
        query: 'ProcessedQuery',
        user_context: 'UserContext',
        audit_event: 'RAGAuditEvent'
    ) -> 'RetrievalResult':
        """Hybrid retrieval with access filtering and logging"""

        # Build access filter
        access_filter = self._build_access_filter(user_context)
        audit_event.access_filter_applied = access_filter.to_dict()

        # Hybrid retrieval: BM25 + Vector + Graph (run concurrently)
        results = await asyncio.gather(
            self._bm25_retrieve(query, access_filter),
            self._vector_retrieve(query, access_filter),
            self._graph_retrieve(query, access_filter)
        )

        # Merge and re-rank
        merged = self._merge_results(results)
        reranked = await self._rerank(query.original, merged)

        # Log accessed documents (de-duplicated, order preserved)
        audit_event.documents_accessed = list(
            dict.fromkeys(r.document_id for r in reranked)
        )
        audit_event.chunks_retrieved = [r.chunk_id for r in reranked]
        audit_event.retrieval_scores = {
            r.chunk_id: r.score for r in reranked
        }

        # Record the most sensitive classification among the results.
        # Assumes chunk classifications are DataClassification values and that
        # the enum is declared from least to most sensitive. Comparing the raw
        # string values would sort alphabetically, and an empty result set
        # would make max() raise, hence the explicit order and default.
        sensitivity_order = [c.value for c in DataClassification]
        audit_event.data_classification = max(
            (r.classification for r in reranked),
            key=sensitivity_order.index,
            default=DataClassification.PUBLIC.value
        )

        return RetrievalResult(
            chunks=reranked,
            total_retrieved=len(reranked),
            classification=audit_event.data_classification
        )

    def _build_access_filter(
        self,
        user_context: 'UserContext'
    ) -> 'AccessFilter':
        """Build retrieval filter based on user permissions"""

        # Get user's allowed classification levels
        allowed_levels = self.access_controller.get_allowed_classifications(
            user_context
        )

        # Get user's department/project scope
        allowed_scopes = self.access_controller.get_allowed_scopes(
            user_context
        )

        return AccessFilter(
            classification_levels=allowed_levels,
            department_scopes=allowed_scopes,
            user_id=user_context.user_id,
            include_shared=True
        )

    async def _vector_retrieve(
        self,
        query: 'ProcessedQuery',
        access_filter: 'AccessFilter'
    ) -> List['RetrievalChunk']:
        """Vector similarity search with access filtering"""

        # Generate query embedding
        query_embedding = await self.embedder.embed(query.expanded)

        # Search with metadata filter so unauthorized chunks are never returned
        results = await self.vector_store.search(
            embedding=query_embedding,
            top_k=self.config.retrieval_top_k,
            filter={
                "classification": {"$in": access_filter.classification_levels},
                "scope": {"$in": access_filter.department_scopes}
            }
        )

        return [
            RetrievalChunk(
                chunk_id=r.id,
                document_id=r.metadata["document_id"],
                content=r.content,
                score=r.score,
                classification=r.metadata["classification"],
                source="vector"
            )
            for r in results
        ]

    # ==================== Layer 4: Generation with Citations ====================

    async def _generate_with_citations(
        self,
        query: str,
        retrieval_result: 'RetrievalResult',
        audit_event: 'RAGAuditEvent'
    ) -> 'GenerationResult':
        """Generate response with mandatory citations and hallucination check"""

        # Build context from retrieved chunks
        context = self._build_context(retrieval_result.chunks)

        # Generate with citation requirement
        generation_prompt = f"""
        You are answering a query using ONLY the provided context.
        Every factual claim MUST include a citation to the source chunk.

        CONTEXT:
        {context}

        QUERY: {query}

        REQUIREMENTS:
        1. Only use information from the provided context
        2. Cite sources using [chunk_id] format
        3. If information is not in context, say "Not found in available documents"
        4. Never make claims without citation

        RESPONSE:
        """

        # Always return the agent to the pool, even if generation fails
        agent = await self.agent_pool.acquire(AgentRole.GENERATOR)
        try:
            response = await agent.execute(
                prompt=generation_prompt,
                tools=["cite_source"]
            )
        finally:
            await self.agent_pool.release(agent)

        # Parse citations
        citations = self._parse_citations(response.text, retrieval_result.chunks)

        # Hallucination detection
        hallucination_check = await self._check_hallucinations(
            response.text,
            citations,
            retrieval_result.chunks
        )

        audit_event.response_text = response.text
        audit_event.response_hash = hashlib.sha256(response.text.encode()).hexdigest()
        audit_event.citations = [c.to_dict() for c in citations]
        audit_event.confidence_score = hallucination_check.confidence
        audit_event.hallucination_flags = hallucination_check.flags

        return GenerationResult(
            text=response.text,
            citations=citations,
            confidence=hallucination_check.confidence,
            hallucination_flags=hallucination_check.flags
        )

    async def _check_hallucinations(
        self,
        response: str,
        citations: List['Citation'],
        chunks: List['RetrievalChunk']
    ) -> 'HallucinationCheck':
        """Verify all claims are supported by cited sources"""

        # Extract claims from response
        claims = self._extract_claims(response)

        flags = []
        supported_count = 0

        for claim in claims:
            # Find citation for this claim
            citation = self._find_citation_for_claim(claim, citations)

            if not citation:
                flags.append(f"Uncited claim: {claim[:50]}...")
                continue

            # Verify claim is supported by cited chunk
            chunk = next(
                (c for c in chunks if c.chunk_id == citation.chunk_id),
                None
            )

            if not chunk:
                flags.append(f"Citation references missing chunk: {citation.chunk_id}")
                continue

            # Semantic similarity check
            support_score = await self._compute_support_score(claim, chunk.content)

            if support_score < 0.7:
                flags.append(f"Weak support ({support_score:.2f}): {claim[:50]}...")
            else:
                supported_count += 1

        # Confidence is the share of supported claims; a response with no
        # extractable claims is scored 1.0
        confidence = supported_count / len(claims) if claims else 1.0

        return HallucinationCheck(
            confidence=confidence,
            flags=flags,
            total_claims=len(claims),
            supported_claims=supported_count
        )

    # ==================== Layer 5: Audit Persistence ====================

    async def _persist_audit(self, audit_event: 'RAGAuditEvent') -> None:
        """Persist audit event to immutable log"""

        # Determine retention category
        audit_event.retention_category = self._determine_retention(audit_event)

        # Store in FoundationDB with immutability guarantee
        key = (
            'audit',
            'rag',
            audit_event.timestamp.strftime('%Y%m%d'),
            audit_event.event_id
        )

        # Compute integrity hash
        integrity_hash = audit_event._compute_content_hash()

        record = {
            **audit_event.__dict__,
            "integrity_hash": integrity_hash,
            "stored_at": datetime.utcnow().isoformat()
        }

        await self.fdb.set(key, record)

        # Index by user for access reporting
        user_idx_key = ('audit', 'idx', 'user', audit_event.user_id, audit_event.event_id)
        await self.fdb.set(user_idx_key, {"event_id": audit_event.event_id})

        # Index by document for document access reporting
        for doc_id in audit_event.documents_accessed:
            doc_idx_key = ('audit', 'idx', 'document', doc_id, audit_event.event_id)
            await self.fdb.set(doc_idx_key, {"event_id": audit_event.event_id})

    # ==================== Audit Reporting ====================

    async def generate_audit_report(
        self,
        start_date: datetime,
        end_date: datetime,
        report_type: str = "access_summary"
    ) -> 'AuditReport':
        """Generate compliance audit report"""

        events = await self._query_audit_events(start_date, end_date)

        if report_type == "access_summary":
            return self._generate_access_summary(events)
        elif report_type == "document_access":
            return self._generate_document_access_report(events)
        elif report_type == "user_activity":
            return self._generate_user_activity_report(events)
        elif report_type == "phi_access":
            return self._generate_phi_access_report(events)
        else:
            raise ValueError(f"Unknown report type: {report_type}")

    async def export_audit_trail(
        self,
        event_ids: List[str],
        format: str = "json",
        signer: Optional['Signer'] = None
    ) -> bytes:
        """Export audit trail with optional electronic signature"""

        events = await self._load_audit_events(event_ids)

        export_data = {
            "export_id": str(uuid4()),
            "exported_at": datetime.utcnow().isoformat(),
            "event_count": len(events),
            "events": [e.__dict__ for e in events]
        }

        if signer:
            # Sign the export (21 CFR Part 11). The hash covers the export
            # before the signature fields are added, so a verifier must strip
            # those fields and re-serialize the same way. default=str is needed
            # because events contain datetime values, which json cannot encode.
            export_hash = hashlib.sha256(
                json.dumps(export_data, sort_keys=True, default=str).encode()
            ).hexdigest()

            export_data["signature"] = signer.sign(export_hash)
            export_data["signer_id"] = signer.user_id
            export_data["signature_timestamp"] = datetime.utcnow().isoformat()

        if format == "json":
            return json.dumps(export_data, indent=2, default=str).encode()
        elif format == "csv":
            return self._events_to_csv(events)
        else:
            raise ValueError(f"Unknown export format: {format}")
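
`_merge_results` is called above but not defined in this ADR. One common way to combine ranked lists from BM25, vector, and graph retrievers is reciprocal rank fusion (RRF). The sketch below is an assumption about how the merge could work, not the chosen implementation; the function name, the use of plain chunk-ID lists, and the constant `k=60` are illustrative.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked lists of chunk IDs; each list contributes 1 / (k + rank)."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, chunk_id in enumerate(ranking, start=1):
            scores[chunk_id] += 1.0 / (k + rank)
    # Sort by fused score (descending), then by ID so ties are deterministic
    return sorted(scores, key=lambda cid: (-scores[cid], cid))


bm25 = ["c1", "c2", "c3"]
vector = ["c2", "c1", "c4"]
graph = ["c2", "c5"]

fused = reciprocal_rank_fusion([bm25, vector, graph])
print(fused)  # ['c2', 'c1', 'c5', 'c3', 'c4']
```

Because RRF works on ranks rather than raw scores, it avoids having to normalize BM25 scores against cosine similarities. The access filter still has to be applied inside each retriever, as the code above does, so that fusion never sees a chunk the user may not read.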

CLI Commands

# Query with compliance
coditect rag query \
  --query "What are the adverse events for drug X?" \
  --corpus-id clinical_corpus \
  --user-role researcher \
  --output-format json

# Generate audit report
coditect audit report \
  --type phi_access \
  --start-date 2026-01-01 \
  --end-date 2026-01-15 \
  --output ./reports/phi_audit.pdf \
  --sign

# Export audit trail
coditect audit export \
  --event-ids event1,event2,event3 \
  --format json \
  --sign \
  --output ./exports/audit_trail.json

# Validate audit integrity
coditect audit validate \
  --start-date 2026-01-01 \
  --check-signatures \
  --check-integrity
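
What `--check-integrity` does internally is not specified in this ADR. A plausible sketch, assuming each stored record carries the `integrity_hash` written by `_persist_audit`, is to recompute the hash over the same five fields and compare. The plain-dict record layout and the `make_record` and `find_tampered` helpers are hypothetical.

```python
import hashlib
from typing import Dict, List

HASHED_FIELDS = ("event_id", "timestamp", "user_id", "query_hash", "response_hash")


def content_hash(record: Dict[str, str]) -> str:
    """Recompute the hash over the fields used by _compute_content_hash."""
    content = "|".join(str(record[name]) for name in HASHED_FIELDS)
    return hashlib.sha256(content.encode()).hexdigest()


def make_record(event_id: str, user_id: str) -> Dict[str, str]:
    """Build a demo record and stamp it with its integrity hash."""
    record = {
        "event_id": event_id,
        "timestamp": "2026-01-15 09:30:00",
        "user_id": user_id,
        "query_hash": "aa11",
        "response_hash": "bb22",
    }
    record["integrity_hash"] = content_hash(record)
    return record


def find_tampered(records: List[Dict[str, str]]) -> List[str]:
    """Return event IDs whose stored integrity_hash no longer matches."""
    return [r["event_id"] for r in records if content_hash(r) != r["integrity_hash"]]


good = make_record("evt-1", "user-7")
bad = make_record("evt-2", "user-7")
bad["user_id"] = "user-8"  # simulate an edit after the record was stored

print(find_tampered([good, bad]))  # ['evt-2']
```

A per-record hash only detects edits by someone who cannot also rewrite the hash, and it only covers the five hashed fields. Signatures or a hash chain across records would be needed for stronger tamper evidence.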

Consequences

Positive

  • Regulatory compliance: Built-in 21 CFR Part 11, HIPAA, SOC2 controls
  • Auditability: Complete record of all RAG operations
  • Access control: Role-based filtering prevents unauthorized access
  • Quality assurance: Hallucination detection improves response reliability

Negative

  • Performance overhead: Audit logging adds latency (~10-20ms)
  • Storage growth: Audit logs grow with usage
  • Complexity: More configuration for RBAC and classification
  • User friction: May restrict some queries based on role

Metrics

| Metric | Target | Measurement |
|---|---|---|
| Audit log completeness | 100% | Events logged / operations |
| Citation accuracy | >95% | Valid citations / total citations |
| Hallucination rate | <5% | Unsupported claims / total claims |
| Access violation rate | 0% | Unauthorized access attempts |
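
The ratio metrics above can be computed directly from counters. The sketch below assumes the counters are collected elsewhere; the `RAGCounters` name, its fields, and the sample numbers are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class RAGCounters:
    operations: int
    events_logged: int
    total_citations: int
    valid_citations: int
    total_claims: int
    unsupported_claims: int


def ratio(numerator: int, denominator: int) -> float:
    """Safe division: an empty denominator yields 0.0 instead of raising."""
    return numerator / denominator if denominator else 0.0


def compliance_metrics(c: RAGCounters) -> dict:
    return {
        "audit_log_completeness": ratio(c.events_logged, c.operations),
        "citation_accuracy": ratio(c.valid_citations, c.total_citations),
        "hallucination_rate": ratio(c.unsupported_claims, c.total_claims),
    }


metrics = compliance_metrics(RAGCounters(200, 200, 400, 388, 500, 20))
print(metrics)
```

With these sample counters, citation accuracy is 388/400 and the hallucination rate is 20/500, so both fall inside the targets in the table.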

Related ADRs

  • ADR-027: Hybrid Document Processing Architecture (parent)
  • ADR-029: Hierarchical Knowledge Store (data source)
  • ADR-015: Compliance Audit Framework (audit infrastructure)