Agent Skills Framework Extension (Optional)
Compliance Frameworks Skill
When to Use This Skill
Use this skill when implementing compliance-framework patterns (SOC2, GDPR, HIPAA) in your codebase.
How to Use This Skill
- Review the patterns and examples below
- Apply the relevant patterns to your implementation
- Follow the best practices outlined in this skill
SOC2, GDPR, HIPAA compliance implementation with audit logging and certification preparation.
Core Capabilities
- SOC2 Type II - Security, availability, processing integrity
- GDPR - Data protection, privacy rights, consent management
- HIPAA - Healthcare data security, BAA requirements
- Audit Logging - Comprehensive activity tracking
- Evidence Collection - Automated compliance artifacts
SOC2 Control Implementation
// src/compliance/soc2-controls.ts
/**
* SOC2 Trust Services Criteria Implementation
*/
// CC1 - Control Environment
/**
 * Snapshot of the control-environment settings tracked for SOC2:
 * boolean access-control safeguards plus string-valued policy entries.
 */
export interface ControlEnvironment {
  /** One flag per access-control safeguard. */
  accessControl: Record<
    'mfaEnabled' | 'roleBasedAccess' | 'leastPrivilege' | 'periodicReview',
    boolean
  >;
  /** One string entry per governing policy. */
  policies: Record<'securityPolicy' | 'acceptableUse' | 'incidentResponse', string>;
}
// CC2 - Communication and Information
/**
 * Security-awareness training results and change-management requirements.
 */
export interface CommunicationControls {
  securityAwareness: {
    /** When the training was completed. */
    trainingCompleted: Date;
    /** Recorded test scores. */
    testScores: Array<number>;
  };
  /** Flags stating which steps a change must go through. */
  changeManagement: Record<'approvalRequired' | 'documentationRequired', boolean>;
}
// CC3 - Risk Assessment
/** Allowed cadences for vulnerability scanning. */
type VulnerabilityScanFrequency = 'daily' | 'weekly' | 'monthly';

/** Allowed cadences for penetration testing. */
type PenetrationTestFrequency = 'annual' | 'semi-annual';

/**
 * Risk-assessment state: vulnerability scanning and penetration testing,
 * each with its cadence and the date of the most recent run.
 */
export interface RiskControls {
  vulnerabilityScanning: {
    frequency: VulnerabilityScanFrequency;
    /** Date of the most recent scan. */
    lastScan: Date;
    /** Number of critical findings. */
    criticalFindings: number;
  };
  penetrationTesting: {
    frequency: PenetrationTestFrequency;
    /** Date of the most recent test. */
    lastTest: Date;
    /** Where the test report is stored. */
    reportLocation: string;
  };
}
// CC6 - Logical and Physical Access
export class AccessControlService {
  /**
   * Decide whether `userId` may access `resource` and audit the attempt.
   *
   * The user is loaded from the user repository and handed to the policy
   * engine as the subject of an `'access'` action. Every attempt is written
   * to the audit log as an `ACCESS_ATTEMPT` event.
   *
   * @param userId   Identifier of the user requesting access.
   * @param resource Identifier of the resource being accessed.
   * @returns The policy engine's `allowed` verdict, or `false` when the user
   *          cannot be found.
   */
  async enforceAccessPolicy(userId: string, resource: string): Promise<boolean> {
    const user = await this.userRepository.findById(userId);

    // Fail closed: never ask the policy engine to evaluate a missing subject.
    // The denied attempt is still audited so probing with unknown ids is visible.
    if (user == null) {
      await this.auditLogger.log({
        event: 'ACCESS_ATTEMPT',
        userId,
        resource,
        allowed: false,
        reason: 'user_not_found',
      });
      return false;
    }

    const policy = await this.policyEngine.evaluate({
      subject: user,
      resource,
      action: 'access',
    });

    await this.auditLogger.log({
      event: 'ACCESS_ATTEMPT',
      userId,
      resource,
      allowed: policy.allowed,
      reason: policy.reason,
    });

    return policy.allowed;
  }

  /**
   * Invalidate every session of `userId` and record an `ACCESS_REVOKED`
   * audit event carrying the supplied reason.
   *
   * @param userId Identifier of the user whose sessions are invalidated.
   * @param reason Explanation stored with the audit event.
   */
  async revokeAccess(userId: string, reason: string): Promise<void> {
    await this.sessionManager.invalidateAll(userId);

    await this.auditLogger.log({
      event: 'ACCESS_REVOKED',
      userId,
      reason,
      timestamp: new Date(),
    });
  }
}
// CC7 - System Operations
export class SystemOperationsControls {
  /**
   * Fetch the critical and high severity security events of the last
   * 24 hours and raise a P0 alert for each critical one.
   *
   * Alerts are sent one at a time, in the order the events were returned.
   *
   * @returns Every fetched event, critical or not.
   */
  async detectSecurityEvents(): Promise<SecurityEvent[]> {
    const lookbackMs = 24 * 60 * 60 * 1000;
    const windowStart = new Date(Date.now() - lookbackMs);

    const events = await this.securityMonitor.getEvents({
      severity: ['critical', 'high'],
      since: windowStart,
    });

    for (const incident of events) {
      // Only critical events page the security channel.
      if (incident.severity !== 'critical') {
        continue;
      }
      const alert = {
        channel: 'security-incidents',
        message: `Critical security event: ${incident.description}`,
        priority: 'P0',
      };
      await this.alerting.sendAlert(alert);
    }

    return events;
  }
}
// CC8 - Change Management
export class ChangeManagementControls {
  /**
   * Open a change ticket in `pending_approval` state and notify approvers.
   *
   * @param change The requested change; its title, description, risk
   *               assessment, rollback plan and requester id are copied
   *               onto the ticket.
   * @returns The ticket created by the ticket system.
   */
  async requestChange(change: ChangeRequest): Promise<ChangeTicket> {
    const { title, description, riskAssessment, rollbackPlan, requesterId } = change;

    const created = await this.ticketSystem.create({
      type: 'change',
      title,
      description,
      riskAssessment,
      rollbackPlan,
      requester: requesterId,
      status: 'pending_approval',
    });

    await this.notifyApprovers(created);
    return created;
  }

  /**
   * Mark a change ticket as approved and audit the approval.
   *
   * @param ticketId   Identifier of the ticket to approve.
   * @param approverId Identifier of the person approving.
   * @throws UnauthorizedError when `approverId` is not an authorized
   *         approver for the ticket; nothing is updated or logged then.
   */
  async approveChange(ticketId: string, approverId: string): Promise<void> {
    const ticket = await this.ticketSystem.get(ticketId);

    const authorized = this.isAuthorizedApprover(approverId, ticket);
    if (!authorized) {
      throw new UnauthorizedError('Not authorized to approve this change');
    }

    const approval = {
      status: 'approved',
      approvedBy: approverId,
      approvedAt: new Date(),
    };
    await this.ticketSystem.update(ticketId, approval);

    await this.auditLogger.log({ event: 'CHANGE_APPROVED', ticketId, approverId });
  }
}
GDPR Data Protection
// src/compliance/gdpr/data-protection.ts
/**
 * Inventory record describing one item of personal data held about a
 * data subject.
 */
export interface PersonalData {
  /** Identifier of this record. */
  id: string;
  /** Identifier of the data subject the record belongs to. */
  dataSubjectId: string;
  /** Classification of the data. */
  category: DataCategory;
  /** Basis under which the data is processed. */
  processingBasis: ProcessingBasis;
  /** Retention period, in days. */
  retentionPeriod: number;
  /** Protection applied to the stored value. */
  encryptionStatus: 'encrypted' | 'pseudonymized' | 'plain';
}
/**
 * Classification assigned to a `PersonalData` record through its `category`
 * field. Each member's runtime value is the snake_case string shown, so it
 * can be stored and compared as plain text.
 */
export enum DataCategory {
  BASIC_IDENTITY = 'basic_identity',
  CONTACT = 'contact',
  FINANCIAL = 'financial',
  SENSITIVE = 'sensitive',
  BEHAVIORAL = 'behavioral',
}
/**
 * Basis under which a `PersonalData` record is processed (its
 * `processingBasis` field).
 *
 * Within this file, `LEGAL_OBLIGATION` makes a record non-erasable in
 * `GDPRComplianceService.canErase`, and `CONSENT` / `CONTRACT` are the bases
 * requested by `handlePortabilityRequest`.
 *
 * NOTE(review): the member names appear to mirror the lawful bases of GDPR
 * Art. 6(1) — confirm with the compliance owner before adding or renaming.
 */
export enum ProcessingBasis {
  CONSENT = 'consent',
  CONTRACT = 'contract',
  LEGAL_OBLIGATION = 'legal_obligation',
  VITAL_INTERESTS = 'vital_interests',
  PUBLIC_TASK = 'public_task',
  LEGITIMATE_INTERESTS = 'legitimate_interests',
}
export class GDPRComplianceService {
  /**
   * Article 15 - Right of Access.
   *
   * Looks up every inventory record for the data subject, audits the request
   * (`DSAR_ACCESS_REQUEST`, with the categories found) and returns the
   * records formatted for export.
   *
   * @param dataSubjectId Identifier of the data subject making the request.
   * @returns A JSON-format export stamped with the export time.
   */
  async handleAccessRequest(dataSubjectId: string): Promise<DataExport> {
    const personalData = await this.dataInventory.findBySubject(dataSubjectId);

    await this.auditLogger.log({
      event: 'DSAR_ACCESS_REQUEST',
      dataSubjectId,
      dataCategories: personalData.map(d => d.category),
    });

    return {
      exportedAt: new Date(),
      format: 'json',
      data: this.formatForExport(personalData),
    };
  }

  /**
   * Article 17 - Right to Erasure.
   *
   * Deletes every record `canErase` allows and lists the others, with a
   * retention reason, under `retained`.
   *
   * The `DSAR_ERASURE_REQUEST` audit event is written even when a delete
   * fails part-way, so deletions that already happened are never left
   * unaudited; `completed` is `false` in that case and the delete error is
   * re-thrown.
   *
   * @param dataSubjectId Identifier of the data subject making the request.
   * @returns Ids of deleted records and the retained records with reasons.
   */
  async handleErasureRequest(dataSubjectId: string): Promise<ErasureReport> {
    const personalData = await this.dataInventory.findBySubject(dataSubjectId);
    const report: ErasureReport = { deleted: [], retained: [] };
    let completed = false;

    try {
      for (const data of personalData) {
        if (this.canErase(data)) {
          await this.dataRepository.delete(data.id);
          report.deleted.push(data.id);
        } else {
          report.retained.push({
            id: data.id,
            reason: this.getRetentionReason(data),
          });
        }
      }
      completed = true;
    } finally {
      await this.auditLogger.log({
        event: 'DSAR_ERASURE_REQUEST',
        dataSubjectId,
        deletedCount: report.deleted.length,
        retainedCount: report.retained.length,
        completed,
      });
    }

    return report;
  }

  /**
   * Article 20 - Right to Data Portability.
   *
   * Exports only the records processed under `CONSENT` or `CONTRACT`, and
   * audits the request (`DSAR_PORTABILITY_REQUEST`) like the other data
   * subject request handlers do.
   *
   * @param dataSubjectId Identifier of the data subject making the request.
   * @returns Pretty-printed JSON of the records plus a checksum.
   */
  async handlePortabilityRequest(dataSubjectId: string): Promise<PortableData> {
    const data = await this.dataInventory.findBySubject(dataSubjectId, {
      processingBasis: [ProcessingBasis.CONSENT, ProcessingBasis.CONTRACT],
    });

    await this.auditLogger.log({
      event: 'DSAR_PORTABILITY_REQUEST',
      dataSubjectId,
      recordCount: data.length,
    });

    return {
      format: 'application/json',
      data: JSON.stringify(data, null, 2),
      checksum: this.calculateChecksum(data),
    };
  }

  /**
   * Persist a consent record, stamped with the time of recording, and audit
   * it (`CONSENT_RECORDED`) with the purposes and privacy-policy version.
   *
   * @param consent The consent given by the data subject.
   */
  async recordConsent(consent: ConsentRecord): Promise<void> {
    await this.consentRepository.save({
      ...consent,
      recordedAt: new Date(),
      ipAddress: consent.ipAddress,
      userAgent: consent.userAgent,
    });

    await this.auditLogger.log({
      event: 'CONSENT_RECORDED',
      dataSubjectId: consent.dataSubjectId,
      purposes: consent.purposes,
      version: consent.privacyPolicyVersion,
    });
  }

  /**
   * Withdraw consent for the given purposes, stop the related processing and
   * audit the withdrawal (`CONSENT_WITHDRAWN`).
   *
   * @param dataSubjectId Identifier of the data subject.
   * @param purposes      Purposes for which consent is withdrawn.
   */
  async withdrawConsent(dataSubjectId: string, purposes: string[]): Promise<void> {
    await this.consentRepository.withdraw(dataSubjectId, purposes);

    // Stop processing for withdrawn purposes
    await this.processingController.stopForPurposes(dataSubjectId, purposes);

    await this.auditLogger.log({
      event: 'CONSENT_WITHDRAWN',
      dataSubjectId,
      purposes,
    });
  }

  /**
   * Decide whether a record may be deleted now.
   *
   * A record is retained when it is processed under `LEGAL_OBLIGATION`, when
   * its retention period (`retentionPeriod` days after `createdAt`) has not
   * yet elapsed, or when its creation date is missing or unparseable.
   *
   * `PersonalData` does not declare `createdAt`, so the parameter type adds
   * it as optional and its absence is handled explicitly instead of relying
   * on Invalid Date comparisons.
   */
  private canErase(data: PersonalData & { createdAt?: Date | string | number }): boolean {
    // Check legal retention requirements
    if (data.processingBasis === ProcessingBasis.LEGAL_OBLIGATION) {
      return false;
    }

    // Without a usable creation date the end of the retention period cannot
    // be computed; keep the record rather than guess.
    if (data.createdAt === undefined) {
      return false;
    }
    const retentionEnd = new Date(data.createdAt);
    if (Number.isNaN(retentionEnd.getTime())) {
      return false;
    }

    // Check if within mandatory retention period
    retentionEnd.setDate(retentionEnd.getDate() + data.retentionPeriod);
    return Date.now() > retentionEnd.getTime();
  }
}
Audit Logging System
// src/compliance/audit/audit-logger.ts
import { randomUUID } from 'node:crypto';

import { Pool } from 'pg';
import { getCurrentTenant } from '../../middleware/tenant-context';
/** Outcome recorded for an audited action. */
type AuditOutcome = 'success' | 'failure';

/** Optional request-level context attached to an audit entry. */
type AuditMetadata = Partial<Record<'ip' | 'userAgent' | 'sessionId' | 'requestId', string>>;

/**
 * One row of the audit trail: who did what, for which tenant, to which
 * resource, and how it ended.
 */
export interface AuditEntry {
  /** Identifier of the entry. */
  id: string;
  /** When the audited event happened. */
  timestamp: Date;
  /** Tenant the event belongs to. */
  tenantId: string;
  /** User who triggered the event. */
  userId: string;
  /** Event name, e.g. `LOGIN_FAILED`. */
  event: string;
  /** Kind of resource involved, if any. */
  resource?: string;
  /** Identifier of the resource involved, if any. */
  resourceId?: string;
  /** Action that was attempted. */
  action: string;
  outcome: AuditOutcome;
  /** Free-form, event-specific details. */
  details: { [key: string]: unknown };
  metadata: AuditMetadata;
}
/**
 * Buffers audit entries in memory and writes them to the `audit_log` table.
 *
 * Entries are flushed every five seconds in a single transaction; events
 * listed as security events are flushed immediately. A batch that cannot be
 * written is put back at the front of the buffer and retried on the next
 * flush. Call {@link ComplianceAuditLogger.close} on shutdown to stop the
 * timer and write what is still buffered.
 */
export class ComplianceAuditLogger {
  /** Delay between background flushes, in milliseconds. */
  private static readonly FLUSH_INTERVAL_MS = 5000;

  /** Events that bypass the buffer delay and are written right away. */
  private static readonly SECURITY_EVENTS: ReadonlySet<string> = new Set([
    'LOGIN_FAILED',
    'ACCESS_DENIED',
    'PERMISSION_CHANGED',
    'SENSITIVE_DATA_ACCESS',
    'API_KEY_CREATED',
    'MFA_DISABLED',
  ]);

  private pool: Pool;
  private buffer: AuditEntry[] = [];
  private flushInterval: ReturnType<typeof setInterval>;

  /**
   * @param pool PostgreSQL connection pool used for every flush.
   */
  constructor(pool: Pool) {
    this.pool = pool;
    this.flushInterval = setInterval(() => {
      // Nothing awaits the timer callback, so a rejected flush must be
      // handled here; the batch has already been re-queued by flush().
      this.flush().catch((error: unknown) => {
        console.error('Audit log flush failed; entries re-queued for retry', error);
      });
    }, ComplianceAuditLogger.FLUSH_INTERVAL_MS);
  }

  /**
   * Queue an audit entry for the current tenant.
   *
   * `id`, `timestamp` and `tenantId` are assigned here and always win over
   * same-named properties on `entry`, so a caller cannot forge another
   * tenant's audit record.
   *
   * @param entry The audited event, without the server-assigned fields.
   * @throws Whatever the database raised, when the entry is a security event
   *         and the immediate flush fails (the entry stays buffered).
   */
  async log(entry: Omit<AuditEntry, 'id' | 'timestamp' | 'tenantId'>): Promise<void> {
    const context = getCurrentTenant();

    const auditEntry: AuditEntry = {
      ...entry,
      id: randomUUID(),
      timestamp: new Date(),
      tenantId: context.tenantId,
    };

    this.buffer.push(auditEntry);

    // Immediate flush for security events
    if (this.isSecurityEvent(entry.event)) {
      await this.flush();
    }
  }

  /**
   * Stop the background timer and write any buffered entries.
   *
   * @throws Whatever the database raised if the final flush fails; the timer
   *         is stopped regardless.
   */
  async close(): Promise<void> {
    clearInterval(this.flushInterval);
    await this.flush();
  }

  /**
   * Write every buffered entry in one transaction. On any failure, including
   * failing to obtain a connection, the batch is returned to the front of
   * the buffer before the error is re-thrown.
   */
  private async flush(): Promise<void> {
    if (this.buffer.length === 0) return;

    const entries = this.buffer.splice(0, this.buffer.length);
    let client: Awaited<ReturnType<Pool['connect']>> | undefined;

    try {
      client = await this.pool.connect();
      await client.query('BEGIN');

      for (const entry of entries) {
        await client.query(
          `INSERT INTO audit_log (
            id, timestamp, tenant_id, user_id, event, resource,
            resource_id, action, outcome, details, metadata
          ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`,
          [
            entry.id,
            entry.timestamp,
            entry.tenantId,
            entry.userId,
            entry.event,
            entry.resource,
            entry.resourceId,
            entry.action,
            entry.outcome,
            JSON.stringify(entry.details),
            JSON.stringify(entry.metadata),
          ]
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      // Re-queue before attempting ROLLBACK: if the connection is broken the
      // rollback itself throws, and the entries must not be lost with it.
      this.buffer.unshift(...entries);
      if (client) {
        try {
          await client.query('ROLLBACK');
        } catch {
          // The original error is the useful one; it is re-thrown below.
        }
      }
      throw error;
    } finally {
      client?.release();
    }
  }

  /** Whether `event` must be written without waiting for the timer. */
  private isSecurityEvent(event: string): boolean {
    return ComplianceAuditLogger.SECURITY_EVENTS.has(event);
  }
}
// Audit log schema
/*
-- Audit trail written by ComplianceAuditLogger, range-partitioned by event time.
CREATE TABLE audit_log (
  id UUID NOT NULL,
  timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  tenant_id UUID NOT NULL,
  user_id UUID NOT NULL,
  event VARCHAR(100) NOT NULL,
  resource VARCHAR(100),
  resource_id UUID,
  action VARCHAR(50) NOT NULL,
  outcome VARCHAR(20) NOT NULL,
  details JSONB,
  metadata JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- A primary key on a partitioned table must include every partition key
  -- column; PRIMARY KEY (id) alone is rejected by PostgreSQL.
  PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);

-- Create monthly partitions
CREATE TABLE audit_log_2025_01 PARTITION OF audit_log
  FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

-- Index for compliance queries
CREATE INDEX idx_audit_log_tenant_event ON audit_log (tenant_id, event, timestamp);
CREATE INDEX idx_audit_log_user ON audit_log (user_id, timestamp);
CREATE INDEX idx_audit_log_resource ON audit_log (resource, resource_id, timestamp);
*/
Evidence Collection Automation
#!/usr/bin/env python3
# scripts/collect-compliance-evidence.py
import json
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
class ComplianceEvidenceCollector:
    """Collect SOC2 evidence for the trailing 30 days and save it as JSON.

    Evidence is gathered per control (CC6, CC7, CC8) by running ``psql``,
    ``gh`` and ``gcloud`` and by reading scan reports from
    ``security-reports/``. A collector that is missing or fails is recorded
    as an ``{"error": ...}`` entry so the remaining evidence is still
    collected and the gap is visible in the output file.
    """

    # Control id -> {evidence key: name of the collector method}.
    _COLLECTION_PLAN = {
        "CC6": {
            "access_reviews": "collect_access_reviews",
            "mfa_status": "collect_mfa_status",
            "terminated_access": "collect_terminated_access",
        },
        "CC7": {
            "security_events": "collect_security_events",
            "vulnerability_scans": "collect_vulnerability_scans",
            "incident_reports": "collect_incident_reports",
        },
        "CC8": {
            "change_tickets": "collect_change_tickets",
            "deployment_logs": "collect_deployment_logs",
        },
    }

    def __init__(self, output_dir: str = "compliance-evidence"):
        """Create the output directory and fix the 30-day collection period.

        Args:
            output_dir: Directory the evidence file is written to.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Take "now" once so both ends of the period are consistent.
        now = datetime.now()
        self.period_start = now - timedelta(days=30)
        self.period_end = now

    def collect_all(self) -> dict:
        """Collect all compliance evidence.

        Returns:
            The evidence document, which is also written to
            ``evidence_<YYYYMMDD>.json`` in the output directory.
        """
        evidence = {
            "collection_date": datetime.now().isoformat(),
            "period": {
                "start": self.period_start.isoformat(),
                "end": self.period_end.isoformat(),
            },
            "controls": {},
        }

        for control, collectors in self._COLLECTION_PLAN.items():
            evidence["controls"][control] = {
                key: self._collect(method_name)
                for key, method_name in collectors.items()
            }

        # Save evidence
        evidence_file = self.output_dir / f"evidence_{datetime.now().strftime('%Y%m%d')}.json"
        with open(evidence_file, 'w', encoding="utf-8") as f:
            json.dump(evidence, f, indent=2, default=str)

        return evidence

    def _collect(self, method_name: str) -> dict:
        """Run one collector, turning a missing or failing one into an error entry."""
        collector = getattr(self, method_name, None)
        if not callable(collector):
            return {"error": f"collector '{method_name}' is not implemented"}
        try:
            return collector()
        except (OSError, subprocess.SubprocessError, ValueError) as exc:
            # OSError: tool or file missing; ValueError covers malformed JSON.
            return {"error": f"{type(exc).__name__}: {exc}"}

    @staticmethod
    def _run_command(command: list) -> tuple:
        """Run a command and return ``(stdout, error)``; ``error`` is None on success."""
        result = subprocess.run(command, capture_output=True, text=True)
        stdout = result.stdout or ""
        if result.returncode != 0:
            detail = (result.stderr or "").strip() or f"exit status {result.returncode}"
            return stdout, f"{command[0]} failed: {detail}"
        return stdout, None

    def _psql(self, query: str) -> tuple:
        """Run a query through ``psql`` with unaligned, comma-separated, tuples-only output."""
        return self._run_command(["psql", "-c", query, "-t", "-A", "-F", ","])

    @staticmethod
    def _with_error(payload: dict, error) -> dict:
        """Attach the command error, if any, so failures are not mistaken for empty results."""
        if error:
            payload["error"] = error
        return payload

    def collect_access_reviews(self) -> dict:
        """Collect evidence of access reviews.

        The summary is derived from the rows actually returned rather than
        asserting that reviews took place.
        """
        # Query access review completions
        stdout, error = self._psql(
            f"""
            SELECT
                reviewer_id,
                review_date,
                users_reviewed,
                changes_made
            FROM access_reviews
            WHERE review_date >= '{self.period_start.isoformat()}'
            """
        )
        if error:
            summary = "Access review query failed; no conclusion can be drawn"
        else:
            review_count = sum(1 for line in stdout.splitlines() if line.strip())
            summary = (
                f"{review_count} access review(s) recorded since "
                f"{self.period_start.date().isoformat()}"
            )
        return self._with_error({"raw_data": stdout, "summary": summary}, error)

    def collect_mfa_status(self) -> dict:
        """Collect MFA enrollment evidence."""
        stdout, error = self._psql(
            """
            SELECT
                COUNT(*) as total_users,
                SUM(CASE WHEN mfa_enabled THEN 1 ELSE 0 END) as mfa_enabled,
                SUM(CASE WHEN mfa_enabled THEN 0 ELSE 1 END) as mfa_disabled
            FROM users
            WHERE status = 'active'
            """
        )
        return self._with_error({"mfa_statistics": stdout.strip()}, error)

    def collect_security_events(self) -> dict:
        """Collect security event logs."""
        stdout, error = self._psql(
            f"""
            SELECT
                event_type,
                COUNT(*) as count,
                MAX(timestamp) as last_occurrence
            FROM security_events
            WHERE timestamp >= '{self.period_start.isoformat()}'
            GROUP BY event_type
            ORDER BY count DESC
            """
        )
        return self._with_error({"event_summary": stdout}, error)

    def collect_vulnerability_scans(self) -> dict:
        """Collect vulnerability scan reports."""
        scan_dir = Path("security-reports")
        scans = []
        # Sorted so the evidence file is stable between runs.
        for scan_file in sorted(scan_dir.glob("*.json")):
            with open(scan_file, encoding="utf-8") as f:
                scan_data = json.load(f)
            summary = scan_data.get("summary", {})
            scans.append({
                "date": scan_file.stem,
                "critical": summary.get("critical", 0),
                "high": summary.get("high", 0),
            })
        return {"scans": scans}

    def collect_change_tickets(self) -> dict:
        """Collect change management tickets."""
        stdout, error = self._run_command([
            "gh", "issue", "list",
            "--label", "change-request",
            "--state", "all",
            "--json", "number,title,state,createdAt,closedAt",
            "--limit", "100"
        ])
        tickets = json.loads(stdout) if stdout.strip() and not error else []
        return self._with_error({"tickets": tickets}, error)

    def collect_deployment_logs(self) -> dict:
        """Collect deployment history."""
        stdout, error = self._run_command([
            "gcloud", "run", "revisions", "list",
            "--format", "json",
            "--filter", f"metadata.creationTimestamp>={self.period_start.isoformat()}"
        ])
        deployments = json.loads(stdout) if stdout.strip() and not error else []
        return self._with_error({"deployments": deployments}, error)
if __name__ == "__main__":
    # Run a full collection and report how many control categories it covered.
    evidence = ComplianceEvidenceCollector().collect_all()
    control_count = len(evidence["controls"])
    print(f"Evidence collected: {control_count} control categories")
Compliance Dashboard Metrics
# prometheus/rules/compliance-metrics.yml
groups:
  - name: compliance
    interval: 1m
    rules:
      # Access Control Metrics
      # Share of users with MFA enabled.
      - record: compliance:mfa_enabled_ratio
        expr: |
          sum(user_mfa_enabled) / count(user_mfa_enabled)

      - alert: MFAComplianceLow
        expr: compliance:mfa_enabled_ratio < 0.95
        for: 1h
        labels:
          severity: warning
          compliance: soc2
        annotations:
          summary: "MFA adoption below 95%"
          description: "MFA enabled ratio is {{ $value | humanizePercentage }}."

      # Data Retention Metrics
      # Number of records flagged as past their retention period.
      - record: compliance:data_retention_violations
        expr: |
          count(data_records{retention_expired="true"})

      - alert: DataRetentionViolation
        expr: compliance:data_retention_violations > 0
        for: 5m
        labels:
          severity: critical
          compliance: gdpr
        annotations:
          summary: "Data exceeding retention period found"
          description: "{{ $value }} record(s) are past their retention period."

      # Encryption Metrics
      # Number of sensitive data items reported as unencrypted.
      - record: compliance:unencrypted_sensitive_data
        expr: |
          count(sensitive_data{encrypted="false"})

      - alert: UnencryptedSensitiveData
        expr: compliance:unencrypted_sensitive_data > 0
        for: 5m
        labels:
          severity: critical
          compliance: hipaa
        annotations:
          summary: "Unencrypted sensitive data detected"
          description: "{{ $value }} sensitive data item(s) are stored unencrypted."
Usage Examples
Implement SOC2 Controls
Apply compliance-frameworks skill to implement SOC2 Type II controls for access management and change control
Create GDPR Data Processing
Apply compliance-frameworks skill to build GDPR-compliant data subject request handling with right to erasure
Set Up Audit Logging
Apply compliance-frameworks skill to implement comprehensive audit logging with tenant isolation and retention
Integration Points
- multi-tenant-security - Tenant-aware compliance controls
- vulnerability-assessment - Security scanning for compliance
- monitoring-observability - Compliance metrics and dashboards
- audit-logging - Detailed activity tracking
Success Output
When successful, this skill MUST output:
✅ SKILL COMPLETE: compliance-frameworks
Completed:
- [x] SOC2 controls implemented (CC1-CC8)
- [x] GDPR data processing handlers created
- [x] Audit logging system deployed
- [x] Compliance evidence collection automated
- [x] Prometheus metrics configured
Outputs:
- src/compliance/soc2-controls.ts
- src/compliance/gdpr/data-protection.ts
- src/compliance/audit/audit-logger.ts
- scripts/collect-compliance-evidence.py
- prometheus/rules/compliance-metrics.yml
- Compliance controls: 100% coverage
- Audit log retention: 7 years configured
Completion Checklist
Before marking this skill as complete, verify:
- All SOC2 Trust Services Criteria covered by this skill implemented (CC1-CC3 and CC6-CC8; CC4, CC5, and CC9 are not addressed by the examples above)
- GDPR data subject rights handlers working (access, erasure, portability)
- Audit logging captures all security-critical events
- Evidence collection script tested and functional
- Compliance metrics dashboards created in Prometheus/Grafana
- Multi-tenant isolation validated for all compliance controls
- Retention policies configured correctly (GDPR, SOC2, HIPAA)
Failure Indicators
This skill has FAILED if:
- ❌ SOC2 control implementation incomplete or missing required checks
- ❌ GDPR data subject requests fail or return incomplete data
- ❌ Audit logs not capturing security events (login failures, access denials)
- ❌ Evidence collection script errors or produces invalid JSON
- ❌ Compliance metrics not appearing in dashboards
- ❌ Tenant isolation broken (cross-tenant data leakage)
When NOT to Use
Do NOT use this skill when:
- Application doesn't handle sensitive data (no compliance requirements)
- Early prototype/MVP phase (implement later when approaching production)
- Compliance requirements already satisfied by third-party services
- Non-commercial/personal projects without legal obligations
- Compliance requirements come from a country-specific or other framework that this skill does not cover (it addresses SOC2, GDPR, and HIPAA only)
Use alternatives:
- Third-party compliance platforms (Vanta, Drata) for automated compliance
- Manual compliance for small-scale, low-risk applications
- Consult legal/compliance experts for custom frameworks
- Use cloud provider's compliance features (AWS Config, Azure Policy)
Anti-Patterns (Avoid)
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Compliance as afterthought | Expensive retrofitting | Design with compliance from day 1 |
| Incomplete audit logging | Can't prove compliance | Log all security-critical events |
| Ignoring data retention | GDPR violations | Automate retention policy enforcement |
| Copy-paste controls | Context mismatch | Adapt controls to your specific system |
| No evidence automation | Manual collection burden | Automate evidence collection scripts |
| Missing tenant isolation | Cross-tenant data leakage | Validate isolation for every control |
| Weak consent management | GDPR non-compliance | Granular consent with withdrawal capability |
Principles
This skill embodies:
- #4 Separation of Concerns - Each compliance framework isolated (SOC2, GDPR, HIPAA)
- #5 Eliminate Ambiguity - Clear control implementation with acceptance criteria
- #6 Clear, Understandable, Explainable - Audit logs and evidence are human-readable
- #7 Validate Continuously - Compliance metrics monitored in real-time
- #8 No Assumptions - Verify all controls with automated tests
- #9 Automate Repetitive Tasks - Evidence collection and metric tracking automated
- Trust & Transparency - All compliance controls documented and auditable