Skip to main content

Agent Skills Framework Extension (Optional)

Compliance Frameworks Skill

When to Use This Skill

Use this skill when implementing compliance frameworks patterns in your codebase.

How to Use This Skill

  1. Review the patterns and examples below
  2. Apply the relevant patterns to your implementation
  3. Follow the best practices outlined in this skill

SOC2, GDPR, HIPAA compliance implementation with audit logging and certification preparation.

Core Capabilities

  1. SOC2 Type II - Security, availability, processing integrity
  2. GDPR - Data protection, privacy rights, consent management
  3. HIPAA - Healthcare data security, BAA requirements
  4. Audit Logging - Comprehensive activity tracking
  5. Evidence Collection - Automated compliance artifacts

SOC2 Control Implementation

// src/compliance/soc2-controls.ts

/**
* SOC2 Trust Services Criteria Implementation
*/

// CC1 - Control Environment
export interface ControlEnvironment {
accessControl: {
mfaEnabled: boolean;
roleBasedAccess: boolean;
leastPrivilege: boolean;
periodicReview: boolean;
};
policies: {
securityPolicy: string;
acceptableUse: string;
incidentResponse: string;
};
}

// CC2 - Communication and Information
export interface CommunicationControls {
securityAwareness: {
trainingCompleted: Date;
testScores: number[];
};
changeManagement: {
approvalRequired: boolean;
documentationRequired: boolean;
};
}

// CC3 - Risk Assessment
export interface RiskControls {
vulnerabilityScanning: {
frequency: 'daily' | 'weekly' | 'monthly';
lastScan: Date;
criticalFindings: number;
};
penetrationTesting: {
frequency: 'annual' | 'semi-annual';
lastTest: Date;
reportLocation: string;
};
}

// CC6 - Logical and Physical Access
export class AccessControlService {
async enforceAccessPolicy(userId: string, resource: string): Promise<boolean> {
const user = await this.userRepository.findById(userId);
const policy = await this.policyEngine.evaluate({
subject: user,
resource,
action: 'access',
});

await this.auditLogger.log({
event: 'ACCESS_ATTEMPT',
userId,
resource,
allowed: policy.allowed,
reason: policy.reason,
});

return policy.allowed;
}

async revokeAccess(userId: string, reason: string): Promise<void> {
await this.sessionManager.invalidateAll(userId);
await this.auditLogger.log({
event: 'ACCESS_REVOKED',
userId,
reason,
timestamp: new Date(),
});
}
}

// CC7 - System Operations
export class SystemOperationsControls {
async detectSecurityEvents(): Promise<SecurityEvent[]> {
const events = await this.securityMonitor.getEvents({
severity: ['critical', 'high'],
since: new Date(Date.now() - 24 * 60 * 60 * 1000),
});

for (const event of events) {
if (event.severity === 'critical') {
await this.alerting.sendAlert({
channel: 'security-incidents',
message: `Critical security event: ${event.description}`,
priority: 'P0',
});
}
}

return events;
}
}

// CC8 - Change Management
export class ChangeManagementControls {
async requestChange(change: ChangeRequest): Promise<ChangeTicket> {
const ticket = await this.ticketSystem.create({
type: 'change',
title: change.title,
description: change.description,
riskAssessment: change.riskAssessment,
rollbackPlan: change.rollbackPlan,
requester: change.requesterId,
status: 'pending_approval',
});

await this.notifyApprovers(ticket);
return ticket;
}

async approveChange(ticketId: string, approverId: string): Promise<void> {
const ticket = await this.ticketSystem.get(ticketId);

if (!this.isAuthorizedApprover(approverId, ticket)) {
throw new UnauthorizedError('Not authorized to approve this change');
}

await this.ticketSystem.update(ticketId, {
status: 'approved',
approvedBy: approverId,
approvedAt: new Date(),
});

await this.auditLogger.log({
event: 'CHANGE_APPROVED',
ticketId,
approverId,
});
}
}

GDPR Data Protection

// src/compliance/gdpr/data-protection.ts

export interface PersonalData {
id: string;
dataSubjectId: string;
category: DataCategory;
processingBasis: ProcessingBasis;
retentionPeriod: number; // days
encryptionStatus: 'encrypted' | 'pseudonymized' | 'plain';
}

export enum DataCategory {
BASIC_IDENTITY = 'basic_identity',
CONTACT = 'contact',
FINANCIAL = 'financial',
SENSITIVE = 'sensitive',
BEHAVIORAL = 'behavioral',
}

export enum ProcessingBasis {
CONSENT = 'consent',
CONTRACT = 'contract',
LEGAL_OBLIGATION = 'legal_obligation',
VITAL_INTERESTS = 'vital_interests',
PUBLIC_TASK = 'public_task',
LEGITIMATE_INTERESTS = 'legitimate_interests',
}

export class GDPRComplianceService {
// Article 15 - Right of Access
async handleAccessRequest(dataSubjectId: string): Promise<DataExport> {
const personalData = await this.dataInventory.findBySubject(dataSubjectId);

await this.auditLogger.log({
event: 'DSAR_ACCESS_REQUEST',
dataSubjectId,
dataCategories: personalData.map(d => d.category),
});

return {
exportedAt: new Date(),
format: 'json',
data: this.formatForExport(personalData),
};
}

// Article 17 - Right to Erasure
async handleErasureRequest(dataSubjectId: string): Promise<ErasureReport> {
const personalData = await this.dataInventory.findBySubject(dataSubjectId);
const report: ErasureReport = { deleted: [], retained: [] };

for (const data of personalData) {
if (this.canErase(data)) {
await this.dataRepository.delete(data.id);
report.deleted.push(data.id);
} else {
report.retained.push({
id: data.id,
reason: this.getRetentionReason(data),
});
}
}

await this.auditLogger.log({
event: 'DSAR_ERASURE_REQUEST',
dataSubjectId,
deletedCount: report.deleted.length,
retainedCount: report.retained.length,
});

return report;
}

// Article 20 - Right to Data Portability
async handlePortabilityRequest(dataSubjectId: string): Promise<PortableData> {
const data = await this.dataInventory.findBySubject(dataSubjectId, {
processingBasis: [ProcessingBasis.CONSENT, ProcessingBasis.CONTRACT],
});

return {
format: 'application/json',
data: JSON.stringify(data, null, 2),
checksum: this.calculateChecksum(data),
};
}

// Consent Management
async recordConsent(consent: ConsentRecord): Promise<void> {
await this.consentRepository.save({
...consent,
recordedAt: new Date(),
ipAddress: consent.ipAddress,
userAgent: consent.userAgent,
});

await this.auditLogger.log({
event: 'CONSENT_RECORDED',
dataSubjectId: consent.dataSubjectId,
purposes: consent.purposes,
version: consent.privacyPolicyVersion,
});
}

async withdrawConsent(dataSubjectId: string, purposes: string[]): Promise<void> {
await this.consentRepository.withdraw(dataSubjectId, purposes);

// Stop processing for withdrawn purposes
await this.processingController.stopForPurposes(dataSubjectId, purposes);

await this.auditLogger.log({
event: 'CONSENT_WITHDRAWN',
dataSubjectId,
purposes,
});
}

private canErase(data: PersonalData): boolean {
// Check legal retention requirements
if (data.processingBasis === ProcessingBasis.LEGAL_OBLIGATION) {
return false;
}

// Check if within mandatory retention period
const retentionEnd = new Date(data.createdAt);
retentionEnd.setDate(retentionEnd.getDate() + data.retentionPeriod);

return new Date() > retentionEnd;
}
}

Audit Logging System

// src/compliance/audit/audit-logger.ts
import { Pool } from 'pg';
import { getCurrentTenant } from '../../middleware/tenant-context';

export interface AuditEntry {
id: string;
timestamp: Date;
tenantId: string;
userId: string;
event: string;
resource?: string;
resourceId?: string;
action: string;
outcome: 'success' | 'failure';
details: Record<string, unknown>;
metadata: {
ip?: string;
userAgent?: string;
sessionId?: string;
requestId?: string;
};
}

export class ComplianceAuditLogger {
private pool: Pool;
private buffer: AuditEntry[] = [];
private flushInterval: NodeJS.Timer;

constructor(pool: Pool) {
this.pool = pool;
this.flushInterval = setInterval(() => this.flush(), 5000);
}

async log(entry: Omit<AuditEntry, 'id' | 'timestamp' | 'tenantId'>): Promise<void> {
const context = getCurrentTenant();

const auditEntry: AuditEntry = {
id: crypto.randomUUID(),
timestamp: new Date(),
tenantId: context.tenantId,
...entry,
};

this.buffer.push(auditEntry);

// Immediate flush for security events
if (this.isSecurityEvent(entry.event)) {
await this.flush();
}
}

private async flush(): Promise<void> {
if (this.buffer.length === 0) return;

const entries = this.buffer.splice(0, this.buffer.length);

const client = await this.pool.connect();
try {
await client.query('BEGIN');

for (const entry of entries) {
await client.query(
`INSERT INTO audit_log (
id, timestamp, tenant_id, user_id, event, resource,
resource_id, action, outcome, details, metadata
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`,
[
entry.id,
entry.timestamp,
entry.tenantId,
entry.userId,
entry.event,
entry.resource,
entry.resourceId,
entry.action,
entry.outcome,
JSON.stringify(entry.details),
JSON.stringify(entry.metadata),
]
);
}

await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
// Re-add to buffer for retry
this.buffer.unshift(...entries);
throw error;
} finally {
client.release();
}
}

private isSecurityEvent(event: string): boolean {
return [
'LOGIN_FAILED',
'ACCESS_DENIED',
'PERMISSION_CHANGED',
'SENSITIVE_DATA_ACCESS',
'API_KEY_CREATED',
'MFA_DISABLED',
].includes(event);
}
}

// Audit log schema
/*
CREATE TABLE audit_log (
id UUID PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
tenant_id UUID NOT NULL,
user_id UUID NOT NULL,
event VARCHAR(100) NOT NULL,
resource VARCHAR(100),
resource_id UUID,
action VARCHAR(50) NOT NULL,
outcome VARCHAR(20) NOT NULL,
details JSONB,
metadata JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (timestamp);

-- Create monthly partitions
CREATE TABLE audit_log_2025_01 PARTITION OF audit_log
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

-- Index for compliance queries
CREATE INDEX idx_audit_log_tenant_event ON audit_log (tenant_id, event, timestamp);
CREATE INDEX idx_audit_log_user ON audit_log (user_id, timestamp);
CREATE INDEX idx_audit_log_resource ON audit_log (resource, resource_id, timestamp);
*/

Evidence Collection Automation

#!/usr/bin/env python3
# scripts/collect-compliance-evidence.py

import json
import subprocess
from datetime import datetime, timedelta
from pathlib import Path

class ComplianceEvidenceCollector:
def __init__(self, output_dir: str = "compliance-evidence"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.period_start = datetime.now() - timedelta(days=30)
self.period_end = datetime.now()

def collect_all(self) -> dict:
"""Collect all compliance evidence."""
evidence = {
"collection_date": datetime.now().isoformat(),
"period": {
"start": self.period_start.isoformat(),
"end": self.period_end.isoformat(),
},
"controls": {},
}

# CC6 - Access Controls
evidence["controls"]["CC6"] = {
"access_reviews": self.collect_access_reviews(),
"mfa_status": self.collect_mfa_status(),
"terminated_access": self.collect_terminated_access(),
}

# CC7 - System Operations
evidence["controls"]["CC7"] = {
"security_events": self.collect_security_events(),
"vulnerability_scans": self.collect_vulnerability_scans(),
"incident_reports": self.collect_incident_reports(),
}

# CC8 - Change Management
evidence["controls"]["CC8"] = {
"change_tickets": self.collect_change_tickets(),
"deployment_logs": self.collect_deployment_logs(),
}

# Save evidence
evidence_file = self.output_dir / f"evidence_{datetime.now().strftime('%Y%m%d')}.json"
with open(evidence_file, 'w') as f:
json.dump(evidence, f, indent=2, default=str)

return evidence

def collect_access_reviews(self) -> dict:
"""Collect evidence of access reviews."""
# Query access review completions
result = subprocess.run([
"psql", "-c",
f"""
SELECT
reviewer_id,
review_date,
users_reviewed,
changes_made
FROM access_reviews
WHERE review_date >= '{self.period_start.isoformat()}'
""",
"-t", "-A", "-F", ","
], capture_output=True, text=True)

return {
"raw_data": result.stdout,
"summary": "Access reviews completed as scheduled",
}

def collect_mfa_status(self) -> dict:
"""Collect MFA enrollment evidence."""
result = subprocess.run([
"psql", "-c",
"""
SELECT
COUNT(*) as total_users,
SUM(CASE WHEN mfa_enabled THEN 1 ELSE 0 END) as mfa_enabled,
SUM(CASE WHEN mfa_enabled THEN 0 ELSE 1 END) as mfa_disabled
FROM users
WHERE status = 'active'
""",
"-t", "-A", "-F", ","
], capture_output=True, text=True)

return {"mfa_statistics": result.stdout.strip()}

def collect_security_events(self) -> dict:
"""Collect security event logs."""
result = subprocess.run([
"psql", "-c",
f"""
SELECT
event_type,
COUNT(*) as count,
MAX(timestamp) as last_occurrence
FROM security_events
WHERE timestamp >= '{self.period_start.isoformat()}'
GROUP BY event_type
ORDER BY count DESC
""",
"-t", "-A", "-F", ","
], capture_output=True, text=True)

return {"event_summary": result.stdout}

def collect_vulnerability_scans(self) -> dict:
"""Collect vulnerability scan reports."""
scan_dir = Path("security-reports")
scans = []

for scan_file in scan_dir.glob("*.json"):
with open(scan_file) as f:
scan_data = json.load(f)
scans.append({
"date": scan_file.stem,
"critical": scan_data.get("summary", {}).get("critical", 0),
"high": scan_data.get("summary", {}).get("high", 0),
})

return {"scans": scans}

def collect_change_tickets(self) -> dict:
"""Collect change management tickets."""
result = subprocess.run([
"gh", "issue", "list",
"--label", "change-request",
"--state", "all",
"--json", "number,title,state,createdAt,closedAt",
"--limit", "100"
], capture_output=True, text=True)

return {"tickets": json.loads(result.stdout) if result.stdout else []}

def collect_deployment_logs(self) -> dict:
"""Collect deployment history."""
result = subprocess.run([
"gcloud", "run", "revisions", "list",
"--format", "json",
"--filter", f"metadata.creationTimestamp>={self.period_start.isoformat()}"
], capture_output=True, text=True)

return {"deployments": json.loads(result.stdout) if result.stdout else []}

if __name__ == "__main__":
collector = ComplianceEvidenceCollector()
evidence = collector.collect_all()
print(f"Evidence collected: {len(evidence['controls'])} control categories")

Compliance Dashboard Metrics

# prometheus/rules/compliance-metrics.yml
groups:
- name: compliance
interval: 1m
rules:
# Access Control Metrics
- record: compliance:mfa_enabled_ratio
expr: |
sum(user_mfa_enabled) / count(user_mfa_enabled)

- alert: MFAComplianceLow
expr: compliance:mfa_enabled_ratio < 0.95
for: 1h
labels:
severity: warning
compliance: soc2
annotations:
summary: "MFA adoption below 95%"

# Data Retention Metrics
- record: compliance:data_retention_violations
expr: |
count(data_records{retention_expired="true"})

- alert: DataRetentionViolation
expr: compliance:data_retention_violations > 0
for: 5m
labels:
severity: critical
compliance: gdpr
annotations:
summary: "Data exceeding retention period found"

# Encryption Metrics
- record: compliance:unencrypted_sensitive_data
expr: |
count(sensitive_data{encrypted="false"})

- alert: UnencryptedSensitiveData
expr: compliance:unencrypted_sensitive_data > 0
for: 5m
labels:
severity: critical
compliance: hipaa
annotations:
summary: "Unencrypted sensitive data detected"

Usage Examples

Implement SOC2 Controls

Apply compliance-frameworks skill to implement SOC2 Type II controls for access management and change control

Create GDPR Data Processing

Apply compliance-frameworks skill to build GDPR-compliant data subject request handling with right to erasure

Set Up Audit Logging

Apply compliance-frameworks skill to implement comprehensive audit logging with tenant isolation and retention

Integration Points

  • multi-tenant-security - Tenant-aware compliance controls
  • vulnerability-assessment - Security scanning for compliance
  • monitoring-observability - Compliance metrics and dashboards
  • audit-logging - Detailed activity tracking

Success Output

When successful, this skill MUST output:

✅ SKILL COMPLETE: compliance-frameworks

Completed:
- [x] SOC2 controls implemented (CC1-CC8)
- [x] GDPR data processing handlers created
- [x] Audit logging system deployed
- [x] Compliance evidence collection automated
- [x] Prometheus metrics configured

Outputs:
- src/compliance/soc2-controls.ts
- src/compliance/gdpr/data-protection.ts
- src/compliance/audit/audit-logger.ts
- scripts/collect-compliance-evidence.py
- prometheus/rules/compliance-metrics.yml
- Compliance controls: 100% coverage
- Audit log retention: 7 years configured

Completion Checklist

Before marking this skill as complete, verify:

  • All SOC2 Trust Services Criteria implemented (CC1-CC8)
  • GDPR data subject rights handlers working (access, erasure, portability)
  • Audit logging captures all security-critical events
  • Evidence collection script tested and functional
  • Compliance metrics dashboards created in Prometheus/Grafana
  • Multi-tenant isolation validated for all compliance controls
  • Retention policies configured correctly (GDPR, SOC2, HIPAA)

Failure Indicators

This skill has FAILED if:

  • ❌ SOC2 control implementation incomplete or missing required checks
  • ❌ GDPR data subject requests fail or return incomplete data
  • ❌ Audit logs not capturing security events (login failures, access denials)
  • ❌ Evidence collection script errors or produces invalid JSON
  • ❌ Compliance metrics not appearing in dashboards
  • ❌ Tenant isolation broken (cross-tenant data leakage)

When NOT to Use

Do NOT use this skill when:

  • Application doesn't handle sensitive data (no compliance requirements)
  • Early prototype/MVP phase (implement later when approaching production)
  • Compliance requirements already satisfied by third-party services
  • Non-commercial/personal projects without legal obligations
  • Compliance needs are country-specific and not covered (SOC2/GDPR/HIPAA only)

Use alternatives:

  • Third-party compliance platforms (Vanta, Drata) for automated compliance
  • Manual compliance for small-scale, low-risk applications
  • Consult legal/compliance experts for custom frameworks
  • Use cloud provider's compliance features (AWS Config, Azure Policy)

Anti-Patterns (Avoid)

Anti-PatternProblemSolution
Compliance as afterthoughtExpensive retrofittingDesign with compliance from day 1
Incomplete audit loggingCan't prove complianceLog all security-critical events
Ignoring data retentionGDPR violationsAutomate retention policy enforcement
Copy-paste controlsContext mismatchAdapt controls to your specific system
No evidence automationManual collection burdenAutomate evidence collection scripts
Missing tenant isolationCross-tenant data leakageValidate isolation for every control
Weak consent managementGDPR non-complianceGranular consent with withdrawal capability

Principles

This skill embodies:

  • #4 Separation of Concerns - Each compliance framework isolated (SOC2, GDPR, HIPAA)
  • #5 Eliminate Ambiguity - Clear control implementation with acceptance criteria
  • #6 Clear, Understandable, Explainable - Audit logs and evidence are human-readable
  • #7 Validate Continuously - Compliance metrics monitored in real-time
  • #8 No Assumptions - Verify all controls with automated tests
  • #9 Automate Repetitive Tasks - Evidence collection and metric tracking automated
  • Trust & Transparency - All compliance controls documented and auditable