ADR-030: Compliance-Aware RAG System
Status
PROPOSED
Date
2026-01-15
Context
Coditect targets regulated industries (healthcare, fintech, life sciences) where document retrieval and AI-generated responses must satisfy:
- 21 CFR Part 11: Electronic records with audit trails, electronic signatures
- HIPAA: Protected health information access logging
- SOC2: Security controls and access management
- GDPR: Data subject access and deletion rights
Standard RAG implementations lack these compliance controls. Every query, retrieval, and generation must be auditable, attributable, and access-controlled.
Regulatory Requirements Summary
| Regulation | Key Requirement | RAG Impact |
|---|---|---|
| 21 CFR Part 11 | Audit trails for all record access | Log every retrieval |
| 21 CFR Part 11 | Electronic signatures | Sign generated reports |
| HIPAA | Access logging for PHI | Track who accessed what |
| HIPAA | Minimum necessary standard | Filter results by need |
| SOC2 | Access controls | Role-based retrieval |
| GDPR | Right to be forgotten | Selective deletion from index |
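The "Role-based retrieval" and "Filter results by need" rows can be illustrated with a minimal sketch. The role-to-classification mapping, the `Chunk` type, and `filter_for_user` are hypothetical names chosen for illustration; the actual policy would come from the RBAC configuration.

```python
from dataclasses import dataclass
from typing import Dict, List, Set

# Hypothetical policy: the labels mirror the ADR's classification levels,
# but which role may see which level is an assumption for this sketch.
ROLE_CLASSIFICATIONS: Dict[str, Set[str]] = {
    "researcher": {"public", "internal"},
    "clinician": {"public", "internal", "confidential", "phi"},
}


@dataclass(frozen=True)
class Chunk:
    chunk_id: str
    classification: str
    scope: str


def filter_for_user(chunks: List[Chunk], role: str, scopes: Set[str]) -> List[Chunk]:
    """Keep only chunks whose classification and scope the user may see."""
    allowed = ROLE_CLASSIFICATIONS.get(role, set())  # unknown role: deny everything
    return [c for c in chunks if c.classification in allowed and c.scope in scopes]


chunks = [
    Chunk("c1", "public", "oncology"),
    Chunk("c2", "phi", "oncology"),
    Chunk("c3", "internal", "cardiology"),
]
visible = filter_for_user(chunks, "researcher", {"oncology"})
print([c.chunk_id for c in visible])  # ['c1']
```

Filtering before ranking, rather than after, keeps restricted content out of the candidate set entirely, so it cannot leak through scores or citations.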
Decision
Implement a Compliance-Aware RAG System with built-in audit, access control, and regulatory controls.
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ COMPLIANCE-AWARE RAG SYSTEM │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ LAYER 1: ACCESS CONTROL │ │
│ │ • User authentication (SSO/SAML) │ │
│ │ • Role-based permissions (RBAC) │ │
│ │ • Document classification levels │ │
│ │ • Query authorization │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ LAYER 2: QUERY PROCESSING │ │
│ │ • Intent classification │ │
│ │ • Query expansion │ │
│ │ • PII detection and handling │ │
│ │ • Scope restriction based on permissions │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ LAYER 3: RETRIEVAL WITH AUDIT │ │
│ │ • Hybrid search (BM25 + Vector + Graph) │ │
│ │ • Access-filtered results │ │
│ │ • Retrieval event logging │ │
│ │ • Source document tracking │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ LAYER 4: GENERATION WITH CITATIONS │ │
│ │ • Citation-required generation │ │
│ │ • Hallucination detection │ │
│ │ • Confidence scoring │ │
│ │ • Response signing (optional) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ LAYER 5: AUDIT PERSISTENCE │ │
│ │ • Immutable audit log (FoundationDB) │ │
│ │ • Electronic signatures (ECDSA) │ │
│ │ • Retention policy enforcement │ │
│ │ • Audit report generation │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
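The layering above can be sketched as an ordered list of async steps that share a context, with the audit record written whether or not a step fails. This is a simplified illustration, not the proposed implementation: `run_pipeline`, the layer functions, and the in-memory `audit_log` list are all assumptions standing in for the real components.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Layer = Callable[[Dict[str, Any]], Awaitable[None]]


async def run_pipeline(
    layers: List[Layer], ctx: Dict[str, Any], audit_log: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """Run layers in order; append exactly one audit record, even on failure."""
    try:
        for layer in layers:
            await layer(ctx)
        return ctx
    except Exception as exc:
        ctx["error"] = str(exc)
        raise
    finally:
        # A snapshot of the context stands in for Layer 5 persistence.
        audit_log.append(dict(ctx))


async def access_control(ctx: Dict[str, Any]) -> None:
    if ctx["role"] not in {"researcher", "auditor"}:
        raise PermissionError(f"role {ctx['role']} may not query")


async def retrieve(ctx: Dict[str, Any]) -> None:
    ctx["documents_accessed"] = ["doc-1"]


async def generate(ctx: Dict[str, Any]) -> None:
    ctx["answer"] = "Not found in available documents"


async def demo() -> List[Dict[str, Any]]:
    log: List[Dict[str, Any]] = []
    layers = [access_control, retrieve, generate]
    await run_pipeline(layers, {"role": "researcher"}, log)
    try:
        await run_pipeline(layers, {"role": "guest"}, log)
    except PermissionError:
        pass  # the denied request is still audited
    return log


audit_log = asyncio.run(demo())
print(len(audit_log))  # 2
```

The denied request produces an audit record with an `error` entry and no `documents_accessed`, which is the behavior an access report needs in order to show rejected attempts.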
Audit Event Schema
import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional

@dataclass
class RAGAuditEvent:
    """
    Audit record for RAG operations; treated as immutable once persisted.
    Designed to support 21 CFR Part 11 electronic records requirements.
    """
    # Event identification
    event_id: str  # UUID
    event_type: str  # query|retrieval|generation|export
    timestamp: datetime
    # User context (21 CFR Part 11 §11.10(e))
    user_id: str
    user_role: str
    session_id: str
    client_ip: str
    # Query context
    query_text: str
    query_hash: str  # SHA-256 for deduplication
    # Everything below is filled in as the request moves through the layers,
    # so each field needs a default to allow construction at query start.
    intent_classification: str = ""
    # Retrieval details
    documents_accessed: List[str] = field(default_factory=list)  # Document IDs
    chunks_retrieved: List[str] = field(default_factory=list)  # Chunk IDs
    retrieval_scores: Dict[str, float] = field(default_factory=dict)
    access_filter_applied: Dict[str, Any] = field(default_factory=dict)
    # Generation details
    response_text: str = ""
    response_hash: str = ""
    citations: List[Dict[str, Any]] = field(default_factory=list)
    confidence_score: float = 0.0
    hallucination_flags: List[str] = field(default_factory=list)
    # Compliance metadata
    data_classification: str = ""  # public|internal|confidential|restricted
    pii_detected: bool = False
    pii_types: List[str] = field(default_factory=list)
    retention_category: str = ""
    # Failure details, set when the request raises
    error: Optional[str] = None
    # Electronic signature (21 CFR Part 11 §11.50)
    signature: Optional[str] = None
    signature_timestamp: Optional[datetime] = None
    signature_meaning: Optional[str] = None  # "Reviewed"|"Approved"|"Created"
    signer_id: Optional[str] = None

    def sign(self, signer: 'Signer', meaning: str) -> None:
        """Apply electronic signature to audit event"""
        self.signer_id = signer.user_id
        self.signature_meaning = meaning
        self.signature_timestamp = datetime.utcnow()
        # Create signature over event content
        content_hash = self._compute_content_hash()
        self.signature = signer.sign(content_hash)

    def verify_signature(self, signer: 'Signer') -> bool:
        """Verify electronic signature; an unsigned event never verifies"""
        if self.signature is None:
            return False
        content_hash = self._compute_content_hash()
        return signer.verify(content_hash, self.signature)

    def _compute_content_hash(self) -> str:
        """Compute hash of auditable content"""
        content = f"{self.event_id}|{self.timestamp}|{self.user_id}|{self.query_hash}|{self.response_hash}"
        return hashlib.sha256(content.encode()).hexdigest()
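The sign-then-verify flow of the audit event can be tried out with a stand-in signer. The sketch below assumes a hypothetical `HmacSigner` that uses HMAC-SHA256 from the standard library in place of ECDSA, purely so the example runs without third-party packages. A real deployment would use an asymmetric scheme so that verifiers do not hold the signing key. The `content_hash` helper mirrors the `|`-joined field hash of the schema.

```python
import hashlib
import hmac
from dataclasses import dataclass


@dataclass(frozen=True)
class HmacSigner:
    """Hypothetical signer: HMAC-SHA256 stands in for ECDSA in this sketch."""
    user_id: str
    key: bytes

    def sign(self, content_hash: str) -> str:
        return hmac.new(self.key, content_hash.encode(), hashlib.sha256).hexdigest()

    def verify(self, content_hash: str, signature: str) -> bool:
        # compare_digest avoids leaking information through comparison timing
        return hmac.compare_digest(self.sign(content_hash), signature)


def content_hash(event_id: str, timestamp: str, user_id: str,
                 query_hash: str, response_hash: str) -> str:
    """Hash the same five '|'-joined fields as the audit event schema."""
    content = f"{event_id}|{timestamp}|{user_id}|{query_hash}|{response_hash}"
    return hashlib.sha256(content.encode()).hexdigest()


signer = HmacSigner(user_id="qa-reviewer", key=b"demo-key-not-for-production")
original = content_hash("evt-1", "2026-01-15T12:00:00", "u-42", "qh", "rh")
signature = signer.sign(original)

# Changing any hashed field (here the response hash) invalidates the signature.
tampered = content_hash("evt-1", "2026-01-15T12:00:00", "u-42", "qh", "rh-modified")
print(signer.verify(original, signature))  # True
print(signer.verify(tampered, signature))  # False
```

Only the five hashed fields are protected by the signature. Fields outside the hash, such as the signature meaning or the list of accessed documents, could change without invalidating it, so the hashed field set deserves review before this ADR is accepted.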
Implementation
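# /coditect/rag/compliance_rag.py
import asyncio
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Dict, Any, Optional
from uuid import uuid4

class DataClassification(Enum):
    # Declaration order is assumed to run from least to most sensitive
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    PHI = "phi"  # Protected Health Information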
class ComplianceRAG:
    """
    RAG system with built-in compliance controls for regulated industries.
    Implements:
    - 21 CFR Part 11: Audit trails, electronic signatures
    - HIPAA: PHI access logging, minimum necessary
    - SOC2: Access controls, security logging
    """
    def __init__(
        self,
        knowledge_store: 'HierarchicalKnowledgeStore',
        vector_store: 'VectorStore',
        fdb_client: 'FoundationDBClient',
        config: 'ComplianceRAGConfig',
        embedder: 'Embedder',
        agent_pool: 'AgentPool'
    ):
        self.knowledge_store = knowledge_store
        self.vector_store = vector_store
        self.fdb = fdb_client
        self.config = config
        # Required by _vector_retrieve and _generate_with_citations;
        # the 'Embedder' and 'AgentPool' type names are assumed.
        self.embedder = embedder
        self.agent_pool = agent_pool
        self.audit_logger = AuditLogger(fdb_client)
        self.access_controller = AccessController(config.rbac_config)
        self.pii_detector = PIIDetector()
    async def query(
        self,
        query: str,
        user_context: 'UserContext',
        options: Optional['QueryOptions'] = None
    ) -> 'ComplianceRAGResponse':
        """
        Execute RAG query with full compliance controls.
        Flow:
        1. Authenticate and authorize user
        2. Process and classify query
        3. Retrieve with access filtering
        4. Generate with mandatory citations
        5. Log audit event
        """
        options = options or QueryOptions()
        # Start audit event
        audit_event = RAGAuditEvent(
            event_id=str(uuid4()),
            event_type="query",
            timestamp=datetime.utcnow(),
            user_id=user_context.user_id,
            user_role=user_context.role,
            session_id=user_context.session_id,
            client_ip=user_context.client_ip,
            query_text=query,
            query_hash=hashlib.sha256(query.encode()).hexdigest()
        )
        try:
            # Layer 1: Access Control
            await self._authorize_query(user_context, query)
            # Layer 2: Query Processing
            processed_query = await self._process_query(
                query,
                user_context,
                audit_event
            )
            # Layer 3: Retrieval with Audit
            retrieval_result = await self._retrieve_with_audit(
                processed_query,
                user_context,
                audit_event
            )
            # Layer 4: Generation with Citations
            response = await self._generate_with_citations(
                query,
                retrieval_result,
                audit_event
            )
            # Layer 5: Persist Audit
            await self._persist_audit(audit_event)
            return ComplianceRAGResponse(
                answer=response.text,
                citations=response.citations,
                confidence=response.confidence,
                audit_id=audit_event.event_id,
                data_classification=audit_event.data_classification
            )
        except Exception as e:
            # Failed and denied requests are audited too, then re-raised
            audit_event.error = str(e)
            await self._persist_audit(audit_event)
            raise
    # ==================== Layer 1: Access Control ====================
    async def _authorize_query(
        self,
        user_context: 'UserContext',
        query: str
    ) -> None:
        """Verify user has permission to execute this query"""
        # Check basic query permission
        if not self.access_controller.can_query(user_context):
            raise AuthorizationError(
                f"User {user_context.user_id} not authorized to query"
            )
        # Check for restricted topic access (case-insensitive substring match)
        restricted_topics = self.access_controller.get_restricted_topics(
            user_context.role
        )
        for topic in restricted_topics:
            if topic.lower() in query.lower():
                raise AuthorizationError(
                    f"User role {user_context.role} cannot query topic: {topic}"
                )
    # ==================== Layer 2: Query Processing ====================
    async def _process_query(
        self,
        query: str,
        user_context: 'UserContext',
        audit_event: 'RAGAuditEvent'
    ) -> 'ProcessedQuery':
        """Process query with PII detection and scope restriction"""
        # Detect PII in query
        pii_results = await self.pii_detector.detect(query)
        audit_event.pii_detected = pii_results.has_pii
        audit_event.pii_types = pii_results.types
        if pii_results.has_pii and not user_context.can_access_pii:
            raise AuthorizationError(
                "Query contains PII but user lacks PII access permission"
            )
        # Classify intent
        intent = await self._classify_intent(query)
        audit_event.intent_classification = intent
        # Apply scope restrictions based on role
        scope_filter = self.access_controller.get_scope_filter(user_context)
        # Expand query for better retrieval
        expanded = await self._expand_query(query)
        return ProcessedQuery(
            original=query,
            expanded=expanded,
            intent=intent,
            scope_filter=scope_filter,
            pii_types=pii_results.types
        )
    # ==================== Layer 3: Retrieval with Audit ====================
    async def _retrieve_with_audit(
        self,
        query: 'ProcessedQuery',
        user_context: 'UserContext',
        audit_event: 'RAGAuditEvent'
    ) -> 'RetrievalResult':
        """Hybrid retrieval with access filtering and logging"""
        # Build access filter
        access_filter = self._build_access_filter(user_context)
        audit_event.access_filter_applied = access_filter.to_dict()
        # Hybrid retrieval: BM25 + Vector + Graph
        results = await asyncio.gather(
            self._bm25_retrieve(query, access_filter),
            self._vector_retrieve(query, access_filter),
            self._graph_retrieve(query, access_filter)
        )
        # Merge and re-rank
        merged = self._merge_results(results)
        reranked = await self._rerank(query.original, merged)
        # Log accessed documents
        audit_event.documents_accessed = [r.document_id for r in reranked]
        audit_event.chunks_retrieved = [r.chunk_id for r in reranked]
        audit_event.retrieval_scores = {
            r.chunk_id: r.score for r in reranked
        }
        # Determine data classification of results: the most sensitive level wins.
        # Members are looked up by value ("public", "phi", ...) and ranked by
        # enum declaration order, assumed to run from least to most sensitive.
        sensitivity = list(DataClassification)
        audit_event.data_classification = max(
            (DataClassification(r.classification) for r in reranked),
            key=sensitivity.index,
            default=DataClassification.PUBLIC  # no results, nothing sensitive returned
        ).value
        return RetrievalResult(
            chunks=reranked,
            total_retrieved=len(reranked),
            classification=audit_event.data_classification
        )
    def _build_access_filter(
        self,
        user_context: 'UserContext'
    ) -> 'AccessFilter':
        """Build retrieval filter based on user permissions"""
        # Get user's allowed classification levels
        allowed_levels = self.access_controller.get_allowed_classifications(
            user_context
        )
        # Get user's department/project scope
        allowed_scopes = self.access_controller.get_allowed_scopes(
            user_context
        )
        return AccessFilter(
            classification_levels=allowed_levels,
            department_scopes=allowed_scopes,
            user_id=user_context.user_id,
            include_shared=True
        )
    async def _vector_retrieve(
        self,
        query: 'ProcessedQuery',
        access_filter: 'AccessFilter'
    ) -> List['RetrievalChunk']:
        """Vector similarity search with access filtering"""
        # Generate query embedding
        query_embedding = await self.embedder.embed(query.expanded)
        # Search with metadata filter
        results = await self.vector_store.search(
            embedding=query_embedding,
            top_k=self.config.retrieval_top_k,
            filter={
                "classification": {"$in": access_filter.classification_levels},
                "scope": {"$in": access_filter.department_scopes}
            }
        )
        return [
            RetrievalChunk(
                chunk_id=r.id,
                document_id=r.metadata["document_id"],
                content=r.content,
                score=r.score,
                classification=r.metadata["classification"],
                source="vector"
            )
            for r in results
        ]
    # ==================== Layer 4: Generation with Citations ====================
    async def _generate_with_citations(
        self,
        query: str,
        retrieval_result: 'RetrievalResult',
        audit_event: 'RAGAuditEvent'
    ) -> 'GenerationResult':
        """Generate response with mandatory citations and hallucination check"""
        # Build context from retrieved chunks
        context = self._build_context(retrieval_result.chunks)
        # Generate with citation requirement
        generation_prompt = f"""
        You are answering a query using ONLY the provided context.
        Every factual claim MUST include a citation to the source chunk.
        CONTEXT:
        {context}
        QUERY: {query}
        REQUIREMENTS:
        1. Only use information from the provided context
        2. Cite sources using [chunk_id] format
        3. If information is not in context, say "Not found in available documents"
        4. Never make claims without citation
        RESPONSE:
        """
        agent = await self.agent_pool.acquire(AgentRole.GENERATOR)
        try:
            response = await agent.execute(
                prompt=generation_prompt,
                tools=["cite_source"]
            )
        finally:
            # Always return the agent to the pool, even if generation fails
            await self.agent_pool.release(agent)
        # Parse citations
        citations = self._parse_citations(response.text, retrieval_result.chunks)
        # Hallucination detection
        hallucination_check = await self._check_hallucinations(
            response.text,
            citations,
            retrieval_result.chunks
        )
        audit_event.response_text = response.text
        audit_event.response_hash = hashlib.sha256(response.text.encode()).hexdigest()
        audit_event.citations = [c.to_dict() for c in citations]
        audit_event.confidence_score = hallucination_check.confidence
        audit_event.hallucination_flags = hallucination_check.flags
        return GenerationResult(
            text=response.text,
            citations=citations,
            confidence=hallucination_check.confidence,
            hallucination_flags=hallucination_check.flags
        )
    async def _check_hallucinations(
        self,
        response: str,
        citations: List['Citation'],
        chunks: List['RetrievalChunk']
    ) -> 'HallucinationCheck':
        """Verify all claims are supported by cited sources"""
        # Extract claims from response
        claims = self._extract_claims(response)
        flags = []
        supported_count = 0
        for claim in claims:
            # Find citation for this claim
            citation = self._find_citation_for_claim(claim, citations)
            if not citation:
                flags.append(f"Uncited claim: {claim[:50]}...")
                continue
            # Verify claim is supported by cited chunk
            chunk = next(
                (c for c in chunks if c.chunk_id == citation.chunk_id),
                None
            )
            if not chunk:
                flags.append(f"Citation references missing chunk: {citation.chunk_id}")
                continue
            # Semantic similarity check
            support_score = await self._compute_support_score(claim, chunk.content)
            if support_score < 0.7:
                flags.append(f"Weak support ({support_score:.2f}): {claim[:50]}...")
            else:
                supported_count += 1
        # Confidence is the fraction of supported claims (1.0 when there are none)
        confidence = supported_count / len(claims) if claims else 1.0
        return HallucinationCheck(
            confidence=confidence,
            flags=flags,
            total_claims=len(claims),
            supported_claims=supported_count
        )
    # ==================== Layer 5: Audit Persistence ====================
    async def _persist_audit(self, audit_event: 'RAGAuditEvent') -> None:
        """Persist audit event to the audit log"""
        # Determine retention category
        audit_event.retention_category = self._determine_retention(audit_event)
        # Store in FoundationDB under a per-event key. FoundationDB allows keys
        # to be overwritten, so immutability has to be enforced by policy
        # (write-once keys, restricted write access), not assumed from the store.
        key = (
            'audit',
            'rag',
            audit_event.timestamp.strftime('%Y%m%d'),
            audit_event.event_id
        )
        # Compute integrity hash
        integrity_hash = audit_event._compute_content_hash()
        record = {
            **audit_event.__dict__,
            "integrity_hash": integrity_hash,
            "stored_at": datetime.utcnow().isoformat()
        }
        await self.fdb.set(key, record)
        # Index by user for access reporting
        user_idx_key = ('audit', 'idx', 'user', audit_event.user_id, audit_event.event_id)
        await self.fdb.set(user_idx_key, {"event_id": audit_event.event_id})
        # Index by document for document access reporting
        for doc_id in audit_event.documents_accessed:
            doc_idx_key = ('audit', 'idx', 'document', doc_id, audit_event.event_id)
            await self.fdb.set(doc_idx_key, {"event_id": audit_event.event_id})
    # ==================== Audit Reporting ====================
    async def generate_audit_report(
        self,
        start_date: datetime,
        end_date: datetime,
        report_type: str = "access_summary"
    ) -> 'AuditReport':
        """Generate compliance audit report"""
        events = await self._query_audit_events(start_date, end_date)
        if report_type == "access_summary":
            return self._generate_access_summary(events)
        elif report_type == "document_access":
            return self._generate_document_access_report(events)
        elif report_type == "user_activity":
            return self._generate_user_activity_report(events)
        elif report_type == "phi_access":
            return self._generate_phi_access_report(events)
        else:
            raise ValueError(f"Unknown report type: {report_type}")
    async def export_audit_trail(
        self,
        event_ids: List[str],
        format: str = "json",
        signer: Optional['Signer'] = None
    ) -> bytes:
        """Export audit trail with optional electronic signature"""
        events = await self._load_audit_events(event_ids)
        export_data = {
            "export_id": str(uuid4()),
            "exported_at": datetime.utcnow().isoformat(),
            "event_count": len(events),
            "events": [e.__dict__ for e in events]
        }
        if signer:
            # Sign the export (21 CFR Part 11). default=str is needed because
            # events carry datetime fields, which json cannot serialize natively.
            export_hash = hashlib.sha256(
                json.dumps(export_data, sort_keys=True, default=str).encode()
            ).hexdigest()
            export_data["signature"] = signer.sign(export_hash)
            export_data["signer_id"] = signer.user_id
            export_data["signature_timestamp"] = datetime.utcnow().isoformat()
        if format == "json":
            return json.dumps(export_data, indent=2, default=str).encode()
        elif format == "csv":
            # CSV output carries only the events, not the signature block
            return self._events_to_csv(events)
        else:
            raise ValueError(f"Unknown export format: {format}")
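The implementation calls `_parse_citations` and `_compute_support_score` without showing them. The following is a minimal sketch of what they might look like, assuming citations appear as `[chunk_id]` (as the generation prompt requires) and using simple token overlap as a lexical stand-in for the semantic similarity check. The function names, the ID character set in the regex, and the overlap measure are all assumptions.

```python
import re
from typing import Dict, List, Tuple

# Assumed ID alphabet: letters, digits, underscore, hyphen.
CITATION_RE = re.compile(r"\[([A-Za-z0-9_\-]+)\]")


def parse_citations(text: str, known_chunks: Dict[str, str]) -> Tuple[List[str], List[str]]:
    """Return (valid, unknown) cited chunk IDs in first-seen order."""
    valid: List[str] = []
    unknown: List[str] = []
    # dict.fromkeys removes duplicates while preserving order
    for chunk_id in dict.fromkeys(CITATION_RE.findall(text)):
        (valid if chunk_id in known_chunks else unknown).append(chunk_id)
    return valid, unknown


def support_score(claim: str, chunk_text: str) -> float:
    """Fraction of the claim's tokens that also occur in the chunk."""
    claim_tokens = set(re.findall(r"[a-z0-9]+", claim.lower()))
    chunk_tokens = set(re.findall(r"[a-z0-9]+", chunk_text.lower()))
    if not claim_tokens:
        return 0.0
    return len(claim_tokens & chunk_tokens) / len(claim_tokens)


chunks = {"chunk_7": "Drug X caused headache in 12% of patients."}
answer = "Headache occurred in 12% of patients [chunk_7]. Nausea was common [chunk_9]."

valid, unknown = parse_citations(answer, chunks)
print(valid, unknown)  # ['chunk_7'] ['chunk_9']

score = support_score("Headache occurred in 12% of patients", chunks["chunk_7"])
print(score >= 0.7)  # True
```

Citations to IDs that were never retrieved (`chunk_9` here) are a useful hallucination signal on their own. Token overlap is cheap but misses paraphrase and negation, so an embedding-based or entailment-based score would likely be needed to meet the targets in the Metrics section.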
CLI Commands
# Query with compliance
coditect rag query \
--query "What are the adverse events for drug X?" \
--corpus-id clinical_corpus \
--user-role researcher \
--output-format json
# Generate audit report
coditect audit report \
--type phi_access \
--start-date 2026-01-01 \
--end-date 2026-01-15 \
--output ./reports/phi_audit.pdf \
--sign
# Export audit trail
coditect audit export \
--event-ids event1,event2,event3 \
--format json \
--sign \
--output ./exports/audit_trail.json
# Validate audit integrity
coditect audit validate \
--start-date 2026-01-01 \
--check-signatures \
--check-integrity
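This ADR does not specify what `--check-integrity` verifies. One common way to make an append-only log tamper-evident is to chain records by hash, so that changing any record breaks every later link. The sketch below illustrates that idea only; the record layout and the `append` and `first_invalid` helpers are hypothetical and do not describe the Coditect CLI.

```python
import hashlib
import json
from typing import Any, Dict, List

GENESIS = "0" * 64  # previous-hash value used for the first record


def record_hash(payload: Dict[str, Any], prev_hash: str) -> str:
    """Hash the canonical JSON of the payload together with the previous hash."""
    body = json.dumps(payload, sort_keys=True, default=str)
    return hashlib.sha256(f"{prev_hash}|{body}".encode()).hexdigest()


def append(log: List[Dict[str, Any]], payload: Dict[str, Any]) -> None:
    """Append a record linked to the hash of its predecessor."""
    prev_hash = log[-1]["hash"] if log else GENESIS
    log.append({
        "payload": payload,
        "prev_hash": prev_hash,
        "hash": record_hash(payload, prev_hash),
    })


def first_invalid(log: List[Dict[str, Any]]) -> int:
    """Return the index of the first record that fails validation, or -1."""
    prev_hash = GENESIS
    for i, rec in enumerate(log):
        if rec["prev_hash"] != prev_hash or rec["hash"] != record_hash(rec["payload"], prev_hash):
            return i
        prev_hash = rec["hash"]
    return -1


log: List[Dict[str, Any]] = []
append(log, {"event_id": "evt-1", "user_id": "u-42"})
append(log, {"event_id": "evt-2", "user_id": "u-7"})
append(log, {"event_id": "evt-3", "user_id": "u-42"})
print(first_invalid(log))  # -1

log[1]["payload"]["user_id"] = "u-99"  # simulate tampering with a stored record
print(first_invalid(log))  # 1
```

A chain like this detects modification but not truncation of the tail, so the latest hash would also need to be anchored somewhere outside the log, for example in a signed periodic checkpoint.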
Consequences
Positive
- Regulatory compliance: Built-in 21 CFR Part 11, HIPAA, SOC2 controls
- Auditability: Complete record of all RAG operations
- Access control: Role-based filtering prevents unauthorized access
- Quality assurance: Hallucination detection improves response reliability
Negative
- Performance overhead: Audit logging adds latency (estimated at ~10-20 ms)
- Storage growth: Audit logs grow with usage
- Complexity: More configuration for RBAC and classification
- User friction: May restrict some queries based on role
Metrics
| Metric | Target | Measurement |
|---|---|---|
| Audit log completeness | 100% | Events logged / operations |
| Citation accuracy | >95% | Valid citations / total citations |
| Hallucination rate | <5% | Unsupported claims / total claims |
| Access violation rate | 0% | Unauthorized retrievals served / total retrievals |
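The ratio metrics in the table can be computed directly from per-event counts. The sketch below assumes each audit event has been summarized into a dictionary with the hypothetical keys `citations_total`, `citations_valid`, `claims_total`, and `claims_unsupported`; those names are illustrative and not part of the audit schema.

```python
from typing import Dict, List


def compliance_metrics(events: List[Dict[str, int]], operations: int) -> Dict[str, float]:
    """Aggregate the table's ratio metrics over a set of audit events."""
    total_citations = sum(e["citations_total"] for e in events)
    valid_citations = sum(e["citations_valid"] for e in events)
    total_claims = sum(e["claims_total"] for e in events)
    unsupported = sum(e["claims_unsupported"] for e in events)
    return {
        # Events logged / operations
        "audit_completeness": len(events) / operations if operations else 1.0,
        # Valid citations / total citations
        "citation_accuracy": valid_citations / total_citations if total_citations else 1.0,
        # Unsupported claims / total claims
        "hallucination_rate": unsupported / total_claims if total_claims else 0.0,
    }


events = [
    {"citations_total": 4, "citations_valid": 4, "claims_total": 5, "claims_unsupported": 0},
    {"citations_total": 6, "citations_valid": 5, "claims_total": 5, "claims_unsupported": 1},
]
metrics = compliance_metrics(events, operations=2)
print(metrics)
# {'audit_completeness': 1.0, 'citation_accuracy': 0.9, 'hallucination_rate': 0.1}
```

In this example the citation accuracy of 0.9 and hallucination rate of 0.1 would both miss the targets above. Summing counts before dividing, instead of averaging per-event ratios, keeps long responses from being under-weighted.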
Related ADRs
- ADR-027: Hybrid Document Processing Architecture (parent)
- ADR-029: Hierarchical Knowledge Store (data source)
- ADR-015: Compliance Audit Framework (audit infrastructure)