Security and Governance Framework
Securing Agentic AI Systems in Production
Document ID: C3-SECURITY-GOVERNANCE
Version: 1.0
Category: P3 - Technical Deep Dives
Audience: Security Engineers, Compliance Officers, Platform Architects
Executive Summary
Agentic AI systems introduce security challenges that conventional application security does not fully cover: autonomous decision-making, tool-execution privileges, and persistent memory each expand the attack surface. This guide covers authentication, authorization, input validation, output safety, and governance frameworks for production deployments.
Part 1: Threat Model
Attack Surface Analysis
| Attack Vector | Risk Level | Mitigation |
|---|---|---|
| Prompt injection | High | Input sanitization, instruction hierarchy |
| Jailbreaking | High | Constitutional AI, output filtering |
| Data exfiltration | High | Tool sandboxing, output monitoring |
| Privilege escalation | Medium | Least privilege, capability scoping |
| Denial of service | Medium | Rate limiting, resource quotas |
| Model extraction | Low | API rate limits, watermarking |
Threat Categories
threats:
  prompt_injection:
    description: Malicious prompts that override system instructions
    examples:
      - "Ignore previous instructions and..."
      - Hidden instructions in user data
      - Multi-turn manipulation
    mitigations:
      - Input sanitization
      - Instruction hierarchy enforcement
      - Output validation
  tool_abuse:
    description: Misuse of tool capabilities
    examples:
      - Unauthorized API calls
      - Data exfiltration via tools
      - Resource exhaustion
    mitigations:
      - Tool-level authorization
      - Sandboxed execution
      - Action audit logging
  memory_attacks:
    description: Poisoning or extracting from memory
    examples:
      - Injecting false memories
      - Extracting sensitive context
      - Memory overflow attacks
    mitigations:
      - Memory validation
      - Access controls
      - Retention policies
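The "instruction hierarchy enforcement" mitigation above can be made concrete. A minimal, illustrative sketch (the message shape follows the common chat-completion convention; `build_messages` is a hypothetical helper, not a library API): keep system instructions and untrusted user data in separate message roles, and wrap the user data in explicit delimiters so it is never spliced into the system prompt.

```python
def build_messages(system_prompt: str, user_data: str) -> list:
    """Assemble messages so untrusted input never joins the system role."""
    # Delimiters mark the payload as data, not instructions
    wrapped = f"<user_data>\n{user_data}\n</user_data>"
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": wrapped},
    ]

msgs = build_messages("You are a support agent.", "Ignore previous instructions")
```

Even if the user data contains injection text, it stays confined to the user role where downstream validation can inspect it.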
Part 2: Authentication and Authorization
Identity Management
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple
import time
import jwt  # PyJWT

class AuthenticationError(Exception): ...
class AuthorizationError(Exception): ...

class Permission(Enum):
    AGENT_EXECUTE = "agent:execute"
    AGENT_ADMIN = "agent:admin"
    TOOL_READ = "tool:read"
    TOOL_WRITE = "tool:write"
    TOOL_EXECUTE = "tool:execute"
    MEMORY_READ = "memory:read"
    MEMORY_WRITE = "memory:write"
    AUDIT_READ = "audit:read"

@dataclass
class Identity:
    """User or service identity."""
    id: str
    type: str  # user, service, agent
    roles: List[str]
    permissions: Set[Permission]
    metadata: dict

class AuthenticationService:
    """Handle authentication for agentic systems."""

    def __init__(self, jwt_secret: str, token_expiry: int = 3600):
        self.jwt_secret = jwt_secret
        self.token_expiry = token_expiry

    def create_token(self, identity: Identity) -> str:
        """Create a JWT for an identity."""
        payload = {
            "sub": identity.id,
            "type": identity.type,
            "roles": identity.roles,
            "permissions": [p.value for p in identity.permissions],
            "exp": time.time() + self.token_expiry
        }
        return jwt.encode(payload, self.jwt_secret, algorithm="HS256")

    def verify_token(self, token: str) -> Identity:
        """Verify and decode a JWT."""
        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
            return Identity(
                id=payload["sub"],
                type=payload["type"],
                roles=payload["roles"],
                permissions={Permission(p) for p in payload["permissions"]},
                metadata={}
            )
        except jwt.ExpiredSignatureError:
            raise AuthenticationError("Token expired")
        except jwt.InvalidTokenError:
            raise AuthenticationError("Invalid token")

class AuthorizationService:
    """Handle authorization for agentic operations."""

    def __init__(self):
        self.role_permissions = {
            "admin": {Permission.AGENT_ADMIN, Permission.AGENT_EXECUTE,
                      Permission.TOOL_READ, Permission.TOOL_WRITE, Permission.TOOL_EXECUTE,
                      Permission.MEMORY_READ, Permission.MEMORY_WRITE, Permission.AUDIT_READ},
            "operator": {Permission.AGENT_EXECUTE, Permission.TOOL_READ,
                         Permission.TOOL_EXECUTE, Permission.MEMORY_READ},
            "viewer": {Permission.TOOL_READ, Permission.MEMORY_READ, Permission.AUDIT_READ}
        }

    def check_permission(
        self,
        identity: Identity,
        required: Permission,
        resource: Optional[str] = None
    ) -> bool:
        """Check whether an identity holds a permission, directly or via a role."""
        # Direct permission check
        if required in identity.permissions:
            return True
        # Role-based check
        for role in identity.roles:
            if required in self.role_permissions.get(role, set()):
                return True
        return False

    def enforce_permission(
        self,
        identity: Identity,
        required: Permission,
        resource: Optional[str] = None
    ):
        """Enforce a permission; raise AuthorizationError if denied."""
        if not self.check_permission(identity, required, resource):
            raise AuthorizationError(
                f"Permission denied: {required.value} for {identity.id}"
            )
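The two-step resolution in `check_permission` (direct grants first, then role-derived permissions) can be seen in isolation in this condensed, self-contained sketch, with the role map trimmed to two roles for brevity:

```python
from enum import Enum

class Permission(Enum):
    AGENT_EXECUTE = "agent:execute"
    AUDIT_READ = "audit:read"

# Trimmed role-to-permission map
ROLE_PERMISSIONS = {
    "operator": {Permission.AGENT_EXECUTE},
    "viewer": {Permission.AUDIT_READ},
}

def check_permission(roles, direct_perms, required):
    # Direct grants are checked first, then role-derived permissions
    if required in direct_perms:
        return True
    return any(required in ROLE_PERMISSIONS.get(r, set()) for r in roles)
```

A viewer cannot execute agents through its role, but a direct grant of the same permission would still pass, which is why direct grants deserve their own review process.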
Tool-Level Authorization
@dataclass
class ToolPermission:
    """Permission configuration for a tool."""
    tool_name: str
    allowed_roles: List[str]
    allowed_identities: List[str]
    requires_approval: bool = False
    max_calls_per_minute: int = 60
    allowed_parameters: Optional[Dict[str, List]] = None

class ToolAuthorizationPolicy:
    """Define and enforce tool access policies."""

    def __init__(self):
        self.policies: Dict[str, ToolPermission] = {}

    def register_policy(self, policy: ToolPermission):
        """Register a tool permission policy."""
        self.policies[policy.tool_name] = policy

    def authorize_tool_call(
        self,
        identity: Identity,
        tool_name: str,
        parameters: Dict
    ) -> Tuple[bool, Optional[str]]:
        """Authorize a tool call; deny by default when no policy exists."""
        policy = self.policies.get(tool_name)
        if not policy:
            return False, "No policy defined for tool"
        # Check identity, then fall back to roles
        if identity.id not in policy.allowed_identities:
            if not any(r in policy.allowed_roles for r in identity.roles):
                return False, "Identity not authorized for tool"
        # Check parameter allow-lists
        if policy.allowed_parameters:
            for param, allowed_values in policy.allowed_parameters.items():
                if param in parameters:
                    if parameters[param] not in allowed_values:
                        return False, f"Parameter {param} value not allowed"
        return True, None
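A worked example of the policy evaluation order (identity allow-list, then roles, then parameter allow-lists), condensed so it stands alone; the `web_search` tool and `scope` parameter are hypothetical:

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

@dataclass
class ToolPermission:
    tool_name: str
    allowed_roles: List[str]
    allowed_identities: List[str]
    allowed_parameters: Optional[Dict[str, List]] = None

def authorize(policy: ToolPermission, identity_id: str,
              roles: List[str], params: Dict) -> Tuple[bool, Optional[str]]:
    # Identity allow-list first, then role fallback
    if identity_id not in policy.allowed_identities:
        if not any(r in policy.allowed_roles for r in roles):
            return False, "Identity not authorized for tool"
    # Parameter values must be on the allow-list when one exists
    if policy.allowed_parameters:
        for param, allowed in policy.allowed_parameters.items():
            if param in params and params[param] not in allowed:
                return False, f"Parameter {param} value not allowed"
    return True, None

policy = ToolPermission("web_search", ["operator"], [], {"scope": ["public"]})
```

Note that parameters absent from `allowed_parameters` pass unchecked, so allow-lists should cover every security-relevant parameter.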
Part 3: Input Validation and Sanitization
Prompt Injection Defense
import re
from typing import List, Tuple

class InputValidator:
    """Validate and sanitize user inputs."""

    # Patterns that may indicate injection attempts
    INJECTION_PATTERNS = [
        r"ignore\s+(all\s+)?previous\s+instructions",
        r"forget\s+(all\s+)?(your\s+)?instructions",
        r"you\s+are\s+now\s+a",
        r"act\s+as\s+(if\s+you\s+are|a)",
        r"pretend\s+(to\s+be|you\s+are)",
        r"system\s*:\s*",
        r"\[INST\]",
        r"<\|im_start\|>",
        r"###\s*(instruction|system)",
    ]

    def __init__(self, sensitivity: str = "medium"):
        self.sensitivity = sensitivity
        self.patterns = [re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS]

    def validate_input(self, text: str) -> Tuple[bool, List[str]]:
        """Validate user input for potential injection."""
        warnings = []
        # Check for injection patterns
        for pattern in self.patterns:
            if pattern.search(text):
                warnings.append(f"Potential injection pattern detected: {pattern.pattern}")
        # Check for unusual characters
        if self._has_unusual_characters(text):
            warnings.append("Unusual character sequences detected")
        # Check for hidden instructions
        if self._has_hidden_instructions(text):
            warnings.append("Hidden instruction markers detected")
        is_safe = len(warnings) == 0 or self.sensitivity == "low"
        return is_safe, warnings

    def sanitize_input(self, text: str) -> str:
        """Sanitize user input."""
        # Remove control characters
        text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)
        # Normalize whitespace
        text = re.sub(r'\s+', ' ', text)
        # Escape potential instruction markers
        text = text.replace("[INST]", "[_INST_]")
        text = text.replace("<|im_start|>", "<_im_start_>")
        return text.strip()

    def _has_unusual_characters(self, text: str) -> bool:
        """Check for unusual character patterns."""
        # Check for zero-width and direction-control characters
        if re.search(r'[\u200b-\u200f\u2028-\u202f\u2060-\u206f]', text):
            return True
        # Check for excessive special characters
        special_ratio = len(re.findall(r'[^\w\s]', text)) / max(len(text), 1)
        return special_ratio > 0.3

    def _has_hidden_instructions(self, text: str) -> bool:
        """Check for hidden instruction patterns."""
        # Long base64-looking runs may hide encoded content
        if re.search(r'[A-Za-z0-9+/]{50,}={0,2}', text):
            return True
        # Long hex runs likewise
        if re.search(r'(?:0x)?[0-9a-fA-F]{40,}', text):
            return True
        return False
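The pattern check at the heart of `validate_input`, reduced to two patterns so the example stands alone (the full pattern list and the sensitivity threshold live in the class above):

```python
import re

INJECTION_PATTERNS = [
    r"ignore\s+(all\s+)?previous\s+instructions",
    r"pretend\s+(to\s+be|you\s+are)",
]
PATTERNS = [re.compile(p, re.IGNORECASE) for p in INJECTION_PATTERNS]

def validate(text: str):
    """Return (is_safe, matched_patterns)."""
    hits = [p.pattern for p in PATTERNS if p.search(text)]
    return len(hits) == 0, hits
```

Pattern matching is a first-pass heuristic only: attackers can paraphrase around any fixed list, so treat a clean result as "no known marker found", not as proof of safety.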
Content Filtering
import re
from typing import Dict, List

class ContentFilter:
    """Filter sensitive content from inputs and outputs."""

    def __init__(self):
        self.pii_patterns = {
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            "phone": r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b",
            "api_key": r"\b[A-Za-z0-9_-]{32,}\b"
        }

    def detect_pii(self, text: str) -> Dict[str, List[str]]:
        """Detect PII in text."""
        findings = {}
        for pii_type, pattern in self.pii_patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                findings[pii_type] = matches
        return findings

    def redact_pii(self, text: str) -> str:
        """Redact PII from text."""
        for pii_type, pattern in self.pii_patterns.items():
            text = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", text)
        return text

    def filter_output(self, text: str, allowed_content: List[str]) -> str:
        """Filter output to only allowed content types."""
        # Remove code blocks if not allowed
        if "code" not in allowed_content:
            text = re.sub(r'```[\s\S]*?```', '[CODE REMOVED]', text)
        # Remove URLs if not allowed
        if "urls" not in allowed_content:
            text = re.sub(r'https?://\S+', '[URL REMOVED]', text)
        return text
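A self-contained sketch of `redact_pii` with two of the patterns. One caveat worth flagging: the broad `api_key` pattern in the class above will match any 32+ character token, so expect false positives and tune it per deployment.

```python
import re

PII_PATTERNS = {
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
}

def redact_pii(text: str) -> str:
    # Replace each match with a typed redaction marker
    for pii_type, pattern in PII_PATTERNS.items():
        text = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", text)
    return text

out = redact_pii("SSN 123-45-6789, contact alice@example.com")
```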
Part 4: Tool Sandboxing
Execution Sandbox
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Dict, List

class ToolSandbox:
    """Sandboxed execution environment for tools."""

    def __init__(self, config: Dict):
        self.config = config
        self.workspace = Path(tempfile.mkdtemp())
        self.resource_limits = config.get("resource_limits", {})

    async def execute_sandboxed(
        self,
        tool_name: str,
        code: str,
        timeout: int = 30
    ) -> Dict[str, Any]:
        """Execute code in the sandboxed environment."""
        # Write code to a temp file inside the workspace
        script_path = self.workspace / f"{tool_name}.py"
        script_path.write_text(code)
        # Build sandboxed command
        cmd = self._build_sandbox_command(script_path)
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                timeout=timeout,
                cwd=str(self.workspace),
                env=self._get_safe_env()
            )
            return {
                "success": result.returncode == 0,
                "stdout": result.stdout.decode()[:10000],  # Limit output size
                "stderr": result.stderr.decode()[:10000],
                "exit_code": result.returncode
            }
        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "error": "Execution timeout",
                "exit_code": -1
            }
        finally:
            # Cleanup
            script_path.unlink(missing_ok=True)

    def _build_sandbox_command(self, script_path: Path) -> List[str]:
        """Build command with sandboxing."""
        cmd = ["python", str(script_path)]
        # Add resource limits if available
        if self.resource_limits.get("memory_mb"):
            # Use ulimit or cgroups
            pass
        return cmd

    def _get_safe_env(self) -> Dict[str, str]:
        """Pass through only an allow-list of environment variables."""
        safe_vars = ["PATH", "PYTHONPATH", "HOME", "USER"]
        return {k: v for k, v in os.environ.items() if k in safe_vars}
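One way to fill the `memory_mb` placeholder in `_build_sandbox_command` on POSIX systems is to install limits in the child process via `subprocess.run(..., preexec_fn=...)`. A sketch under those assumptions (Linux/macOS only; `make_preexec` is an illustrative helper, and cgroups or containers are the more robust production option):

```python
import resource  # POSIX-only stdlib module

def make_preexec(memory_mb: int, cpu_seconds: int):
    """Build a preexec_fn that caps address space and CPU time in the child."""
    def set_limits():
        mem_bytes = memory_mb * 1024 * 1024
        # Hard and soft limits set to the same value
        resource.setrlimit(resource.RLIMIT_AS, (mem_bytes, mem_bytes))
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_seconds, cpu_seconds))
    return set_limits

# Would be passed as: subprocess.run(cmd, preexec_fn=make_preexec(256, 10), ...)
limiter = make_preexec(256, 10)
```

`preexec_fn` runs in the forked child before `exec`, so the limits apply only to the sandboxed process, not the agent host.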
class NetworkPolicy:
    """Control network access for tools."""

    def __init__(self):
        self.allowed_domains: Set[str] = set()
        self.blocked_domains: Set[str] = set()
        self.allow_all: bool = False

    def allow_domain(self, domain: str):
        """Allow access to a domain."""
        self.allowed_domains.add(domain)

    def block_domain(self, domain: str):
        """Block access to a domain."""
        self.blocked_domains.add(domain)

    def check_access(self, url: str) -> bool:
        """Check whether URL access is allowed; the block-list wins over the allow-list."""
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
        if domain in self.blocked_domains:
            return False
        if self.allow_all:
            return True
        return domain in self.allowed_domains
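One edge case in `check_access`: `urlparse(url).netloc` includes the port, so `https://api.example.com:8443` would not match an allow-list entry of `api.example.com`. Using `.hostname` instead strips the port and lowercases the host. A standalone sketch (the example domains are hypothetical):

```python
from urllib.parse import urlparse

ALLOWED = {"api.example.com"}
BLOCKED = {"evil.example.com"}

def check_access(url: str, allow_all: bool = False) -> bool:
    # .hostname strips any port and lowercases, unlike .netloc
    host = urlparse(url).hostname or ""
    if host in BLOCKED:
        return False
    return allow_all or host in ALLOWED
```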
Part 5: Audit and Compliance
Audit Logging
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

@dataclass
class AuditEvent:
    """Audit event record."""
    event_id: str
    timestamp: str
    event_type: str
    actor_id: str
    actor_type: str
    resource: str
    action: str
    outcome: str
    details: Dict[str, Any]
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None

class AuditLogger:
    """Immutable audit logging."""

    def __init__(self, storage_backend: str = "database"):
        self.storage = storage_backend
        self.buffer: List[AuditEvent] = []

    async def log_event(
        self,
        event_type: str,
        actor: Identity,
        resource: str,
        action: str,
        outcome: str,
        details: Dict = None
    ):
        """Log an audit event."""
        event = AuditEvent(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow().isoformat(),
            event_type=event_type,
            actor_id=actor.id,
            actor_type=actor.type,
            resource=resource,
            action=action,
            outcome=outcome,
            details=details or {}
        )
        await self._persist_event(event)

    async def log_agent_action(
        self,
        agent_id: str,
        action_type: str,
        action_details: Dict,
        outcome: str
    ):
        """Log agent action for the audit trail."""
        await self.log_event(
            event_type="agent_action",
            actor=Identity(id=agent_id, type="agent", roles=[], permissions=set(), metadata={}),
            resource=action_details.get("resource", "unknown"),
            action=action_type,
            outcome=outcome,
            details=action_details
        )

    async def query_audit_log(
        self,
        filters: Dict,
        start_time: str,
        end_time: str,
        limit: int = 1000
    ) -> List[AuditEvent]:
        """Query the audit log with filters."""
        # Implementation depends on storage backend
        pass
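`AuditLogger` is described as immutable, but immutability must come from the storage design. One common approach is hash chaining: each record stores a SHA-256 over its payload plus the previous record's hash, so any retroactive edit breaks verification. A standalone sketch (the real logger would persist these records rather than hold them in memory; `HashChainedLog` is an illustrative name):

```python
import hashlib
import json

class HashChainedLog:
    """Append-only log where each record hashes its predecessor."""

    def __init__(self):
        self.records = []
        self._last_hash = "0" * 64  # Genesis hash

    def append(self, event: dict) -> str:
        # Canonical JSON so the hash is stable across runs
        payload = json.dumps(event, sort_keys=True)
        digest = hashlib.sha256((self._last_hash + payload).encode()).hexdigest()
        self.records.append({"event": event, "prev": self._last_hash, "hash": digest})
        self._last_hash = digest
        return digest

    def verify(self) -> bool:
        """Recompute the chain; any tampered record breaks it."""
        prev = "0" * 64
        for rec in self.records:
            payload = json.dumps(rec["event"], sort_keys=True)
            expected = hashlib.sha256((prev + payload).encode()).hexdigest()
            if rec["prev"] != prev or rec["hash"] != expected:
                return False
            prev = rec["hash"]
        return True
```

Anchoring the latest hash in external storage (or a signed timestamp service) additionally protects against truncation of the whole tail.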
Compliance Reporting
class ComplianceReporter:
    """Generate compliance reports."""

    def __init__(self, audit_logger: AuditLogger):
        self.audit = audit_logger

    async def generate_access_report(
        self,
        start_date: str,
        end_date: str
    ) -> Dict:
        """Generate access control report."""
        events = await self.audit.query_audit_log(
            filters={"event_type": ["authentication", "authorization"]},
            start_time=start_date,
            end_time=end_date
        )
        return {
            "period": {"start": start_date, "end": end_date},
            "total_access_attempts": len(events),
            "successful_authentications": len([e for e in events if e.outcome == "success"]),
            "failed_authentications": len([e for e in events if e.outcome == "failure"]),
            "unique_users": len(set(e.actor_id for e in events)),
            "access_by_resource": self._group_by_resource(events)
        }

    async def generate_agent_activity_report(
        self,
        start_date: str,
        end_date: str
    ) -> Dict:
        """Generate agent activity report."""
        events = await self.audit.query_audit_log(
            filters={"event_type": "agent_action"},
            start_time=start_date,
            end_time=end_date
        )
        return {
            "period": {"start": start_date, "end": end_date},
            "total_actions": len(events),
            "actions_by_type": self._group_by_action(events),
            "tool_usage": self._analyze_tool_usage(events),
            "error_rate": len([e for e in events if e.outcome == "error"]) / max(len(events), 1)
        }
Part 6: Governance Framework
Policy Configuration
# governance_policy.yml
governance:
  data_retention:
    conversation_logs: 90_days
    audit_logs: 7_years
    memory_data: 30_days
  access_controls:
    default_deny: true
    require_mfa: true
    session_timeout: 3600
  agent_constraints:
    max_iterations: 20
    max_tool_calls: 50
    require_human_approval:
      - financial_transactions
      - data_deletion
      - external_communications
  content_policies:
    block_pii_in_logs: true
    redact_sensitive_output: true
    allowed_output_types:
      - text
      - code
      - structured_data
  monitoring:
    log_all_llm_calls: true
    log_all_tool_calls: true
    alert_on_anomalies: true
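Enforcing the `agent_constraints` section might look like the following sketch. The policy dict mirrors the YAML above (inlined here so the example stands alone; in practice `yaml.safe_load` on the file would produce it), and `needs_approval` / `within_budget` are hypothetical helper names:

```python
# Mirrors the agent_constraints section of governance_policy.yml
POLICY = {
    "agent_constraints": {
        "max_tool_calls": 50,
        "require_human_approval": [
            "financial_transactions",
            "data_deletion",
            "external_communications",
        ],
    }
}

def needs_approval(action_category: str) -> bool:
    """True when the policy gates this action category on a human."""
    gated = POLICY["agent_constraints"]["require_human_approval"]
    return action_category in gated

def within_budget(tool_calls_so_far: int) -> bool:
    """True while the agent is still under its tool-call quota."""
    return tool_calls_so_far < POLICY["agent_constraints"]["max_tool_calls"]
```

An agent runtime would consult these checks before every tool dispatch and pause for approval or halt when they fail.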
Quick Reference
Security Checklist
- Input validation enabled
- Output filtering configured
- Tool sandboxing active
- Audit logging enabled
- Access controls defined
- Network policies set
- Retention policies configured
- Compliance reporting scheduled
Security Controls by Paradigm
| Paradigm | Key Risks | Primary Controls |
|---|---|---|
| LSR | Hallucination | Output validation |
| GS | Data leakage | Source filtering |
| EP | Action abuse | Tool authorization |
| VE | Protocol bypass | Strict validation |
Document maintained by CODITECT Security Team. Feedback: security@coditect.com