Skip to main content

Security and Governance Framework

Securing Agentic AI Systems in Production

Document ID: C3-SECURITY-GOVERNANCE
Version: 1.0
Category: P3 - Technical Deep Dives
Audience: Security Engineers, Compliance Officers, Platform Architects


Executive Summary

Agentic AI systems introduce unique security challenges: autonomous decision-making, tool execution privileges, and memory persistence. This guide covers authentication, authorization, input validation, output safety, and governance frameworks for production deployments.


Part 1: Threat Model

Attack Surface Analysis

Attack VectorRisk LevelMitigation
Prompt injectionHighInput sanitization, instruction hierarchy
JailbreakingHighConstitutional AI, output filtering
Data exfiltrationHighTool sandboxing, output monitoring
Privilege escalationMediumLeast privilege, capability scoping
Denial of serviceMediumRate limiting, resource quotas
Model extractionLowAPI rate limits, watermarking

Threat Categories

threats:
prompt_injection:
description: Malicious prompts that override system instructions
examples:
- "Ignore previous instructions and..."
- Hidden instructions in user data
- Multi-turn manipulation
mitigations:
- Input sanitization
- Instruction hierarchy enforcement
- Output validation

tool_abuse:
description: Misuse of tool capabilities
examples:
- Unauthorized API calls
- Data exfiltration via tools
- Resource exhaustion
mitigations:
- Tool-level authorization
- Sandboxed execution
- Action audit logging

memory_attacks:
description: Poisoning or extracting from memory
examples:
- Injecting false memories
- Extracting sensitive context
- Memory overflow attacks
mitigations:
- Memory validation
- Access controls
- Retention policies

Part 2: Authentication and Authorization

Identity Management

from dataclasses import dataclass
from typing import List, Set, Optional
from enum import Enum
import jwt

class Permission(Enum):
AGENT_EXECUTE = "agent:execute"
AGENT_ADMIN = "agent:admin"
TOOL_READ = "tool:read"
TOOL_WRITE = "tool:write"
TOOL_EXECUTE = "tool:execute"
MEMORY_READ = "memory:read"
MEMORY_WRITE = "memory:write"
AUDIT_READ = "audit:read"

@dataclass
class Identity:
"""User or service identity."""
id: str
type: str # user, service, agent
roles: List[str]
permissions: Set[Permission]
metadata: dict


class AuthenticationService:
"""Handle authentication for agentic systems."""

def __init__(self, jwt_secret: str, token_expiry: int = 3600):
self.jwt_secret = jwt_secret
self.token_expiry = token_expiry

def create_token(self, identity: Identity) -> str:
"""Create JWT token for identity."""
payload = {
"sub": identity.id,
"type": identity.type,
"roles": identity.roles,
"permissions": [p.value for p in identity.permissions],
"exp": time.time() + self.token_expiry
}
return jwt.encode(payload, self.jwt_secret, algorithm="HS256")

def verify_token(self, token: str) -> Identity:
"""Verify and decode JWT token."""
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
return Identity(
id=payload["sub"],
type=payload["type"],
roles=payload["roles"],
permissions={Permission(p) for p in payload["permissions"]},
metadata={}
)
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token expired")
except jwt.InvalidTokenError:
raise AuthenticationError("Invalid token")


class AuthorizationService:
"""Handle authorization for agentic operations."""

def __init__(self):
self.role_permissions = {
"admin": {Permission.AGENT_ADMIN, Permission.AGENT_EXECUTE,
Permission.TOOL_READ, Permission.TOOL_WRITE, Permission.TOOL_EXECUTE,
Permission.MEMORY_READ, Permission.MEMORY_WRITE, Permission.AUDIT_READ},
"operator": {Permission.AGENT_EXECUTE, Permission.TOOL_READ,
Permission.TOOL_EXECUTE, Permission.MEMORY_READ},
"viewer": {Permission.TOOL_READ, Permission.MEMORY_READ, Permission.AUDIT_READ}
}

def check_permission(
self,
identity: Identity,
required: Permission,
resource: Optional[str] = None
) -> bool:
"""Check if identity has required permission."""
# Direct permission check
if required in identity.permissions:
return True

# Role-based check
for role in identity.roles:
if role in self.role_permissions:
if required in self.role_permissions[role]:
return True

return False

def enforce_permission(
self,
identity: Identity,
required: Permission,
resource: Optional[str] = None
):
"""Enforce permission, raise if denied."""
if not self.check_permission(identity, required, resource):
raise AuthorizationError(
f"Permission denied: {required.value} for {identity.id}"
)

Tool-Level Authorization

@dataclass
class ToolPermission:
"""Permission configuration for a tool."""
tool_name: str
allowed_roles: List[str]
allowed_identities: List[str]
requires_approval: bool = False
max_calls_per_minute: int = 60
allowed_parameters: Optional[Dict[str, List]] = None


class ToolAuthorizationPolicy:
"""Define and enforce tool access policies."""

def __init__(self):
self.policies: Dict[str, ToolPermission] = {}

def register_policy(self, policy: ToolPermission):
"""Register a tool permission policy."""
self.policies[policy.tool_name] = policy

def authorize_tool_call(
self,
identity: Identity,
tool_name: str,
parameters: Dict
) -> Tuple[bool, Optional[str]]:
"""Authorize a tool call."""
policy = self.policies.get(tool_name)

if not policy:
return False, "No policy defined for tool"

# Check identity
if identity.id not in policy.allowed_identities:
# Check roles
if not any(r in policy.allowed_roles for r in identity.roles):
return False, "Identity not authorized for tool"

# Check parameters
if policy.allowed_parameters:
for param, allowed_values in policy.allowed_parameters.items():
if param in parameters:
if parameters[param] not in allowed_values:
return False, f"Parameter {param} value not allowed"

return True, None

Part 3: Input Validation and Sanitization

Prompt Injection Defense

import re
from typing import Tuple

class InputValidator:
"""Validate and sanitize user inputs."""

# Patterns that may indicate injection attempts
INJECTION_PATTERNS = [
r"ignore\s+(all\s+)?previous\s+instructions",
r"forget\s+(all\s+)?(your\s+)?instructions",
r"you\s+are\s+now\s+a",
r"act\s+as\s+(if\s+you\s+are|a)",
r"pretend\s+(to\s+be|you\s+are)",
r"system\s*:\s*",
r"\[INST\]",
r"<\|im_start\|>",
r"###\s*(instruction|system)",
]

def __init__(self, sensitivity: str = "medium"):
self.sensitivity = sensitivity
self.patterns = [re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS]

def validate_input(self, text: str) -> Tuple[bool, List[str]]:
"""Validate user input for potential injection."""
warnings = []

# Check for injection patterns
for pattern in self.patterns:
if pattern.search(text):
warnings.append(f"Potential injection pattern detected")

# Check for unusual characters
if self._has_unusual_characters(text):
warnings.append("Unusual character sequences detected")

# Check for hidden instructions
if self._has_hidden_instructions(text):
warnings.append("Hidden instruction markers detected")

is_safe = len(warnings) == 0 or self.sensitivity == "low"
return is_safe, warnings

def sanitize_input(self, text: str) -> str:
"""Sanitize user input."""
# Remove control characters
text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)

# Normalize whitespace
text = re.sub(r'\s+', ' ', text)

# Escape potential instruction markers
text = text.replace("[INST]", "[_INST_]")
text = text.replace("<|im_start|>", "<_im_start_>")

return text.strip()

def _has_unusual_characters(self, text: str) -> bool:
"""Check for unusual character patterns."""
# Check for zero-width characters
if re.search(r'[\u200b-\u200f\u2028-\u202f\u2060-\u206f]', text):
return True
# Check for excessive special characters
special_ratio = len(re.findall(r'[^\w\s]', text)) / max(len(text), 1)
return special_ratio > 0.3

def _has_hidden_instructions(self, text: str) -> bool:
"""Check for hidden instruction patterns."""
# Check for base64 encoded content
if re.search(r'[A-Za-z0-9+/]{50,}={0,2}', text):
return True
# Check for hex encoded content
if re.search(r'(?:0x)?[0-9a-fA-F]{40,}', text):
return True
return False

Content Filtering

class ContentFilter:
"""Filter sensitive content from inputs and outputs."""

def __init__(self):
self.pii_patterns = {
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b",
"api_key": r"\b[A-Za-z0-9_-]{32,}\b"
}

def detect_pii(self, text: str) -> Dict[str, List[str]]:
"""Detect PII in text."""
findings = {}
for pii_type, pattern in self.pii_patterns.items():
matches = re.findall(pattern, text)
if matches:
findings[pii_type] = matches
return findings

def redact_pii(self, text: str) -> str:
"""Redact PII from text."""
for pii_type, pattern in self.pii_patterns.items():
text = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", text)
return text

def filter_output(self, text: str, allowed_content: List[str]) -> str:
"""Filter output to only allowed content types."""
# Remove code blocks if not allowed
if "code" not in allowed_content:
text = re.sub(r'```[\s\S]*?```', '[CODE REMOVED]', text)

# Remove URLs if not allowed
if "urls" not in allowed_content:
text = re.sub(r'https?://\S+', '[URL REMOVED]', text)

return text

Part 4: Tool Sandboxing

Execution Sandbox

import subprocess
import tempfile
import os
from pathlib import Path

class ToolSandbox:
"""Sandboxed execution environment for tools."""

def __init__(self, config: Dict):
self.config = config
self.workspace = Path(tempfile.mkdtemp())
self.resource_limits = config.get("resource_limits", {})

async def execute_sandboxed(
self,
tool_name: str,
code: str,
timeout: int = 30
) -> Dict[str, Any]:
"""Execute code in sandboxed environment."""

# Write code to temp file
script_path = self.workspace / f"{tool_name}.py"
script_path.write_text(code)

# Build sandboxed command
cmd = self._build_sandbox_command(script_path)

try:
result = subprocess.run(
cmd,
capture_output=True,
timeout=timeout,
cwd=str(self.workspace),
env=self._get_safe_env()
)

return {
"success": result.returncode == 0,
"stdout": result.stdout.decode()[:10000], # Limit output
"stderr": result.stderr.decode()[:10000],
"exit_code": result.returncode
}

except subprocess.TimeoutExpired:
return {
"success": False,
"error": "Execution timeout",
"exit_code": -1
}
finally:
# Cleanup
script_path.unlink(missing_ok=True)

def _build_sandbox_command(self, script_path: Path) -> List[str]:
"""Build command with sandboxing."""
cmd = ["python", str(script_path)]

# Add resource limits if available
if self.resource_limits.get("memory_mb"):
# Use ulimit or cgroups
pass

return cmd

def _get_safe_env(self) -> Dict[str, str]:
"""Get safe environment variables."""
safe_vars = ["PATH", "PYTHONPATH", "HOME", "USER"]
return {k: v for k, v in os.environ.items() if k in safe_vars}


class NetworkPolicy:
"""Control network access for tools."""

def __init__(self):
self.allowed_domains: Set[str] = set()
self.blocked_domains: Set[str] = set()
self.allow_all: bool = False

def allow_domain(self, domain: str):
"""Allow access to domain."""
self.allowed_domains.add(domain)

def block_domain(self, domain: str):
"""Block access to domain."""
self.blocked_domains.add(domain)

def check_access(self, url: str) -> bool:
"""Check if URL access is allowed."""
from urllib.parse import urlparse
domain = urlparse(url).netloc

if domain in self.blocked_domains:
return False

if self.allow_all:
return True

return domain in self.allowed_domains

Part 5: Audit and Compliance

Audit Logging

@dataclass
class AuditEvent:
"""Audit event record."""
event_id: str
timestamp: str
event_type: str
actor_id: str
actor_type: str
resource: str
action: str
outcome: str
details: Dict[str, Any]
ip_address: Optional[str] = None
user_agent: Optional[str] = None


class AuditLogger:
"""Immutable audit logging."""

def __init__(self, storage_backend: str = "database"):
self.storage = storage_backend
self.buffer: List[AuditEvent] = []

async def log_event(
self,
event_type: str,
actor: Identity,
resource: str,
action: str,
outcome: str,
details: Dict = None
):
"""Log an audit event."""
event = AuditEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.utcnow().isoformat(),
event_type=event_type,
actor_id=actor.id,
actor_type=actor.type,
resource=resource,
action=action,
outcome=outcome,
details=details or {}
)

await self._persist_event(event)

async def log_agent_action(
self,
agent_id: str,
action_type: str,
action_details: Dict,
outcome: str
):
"""Log agent action for audit trail."""
await self.log_event(
event_type="agent_action",
actor=Identity(id=agent_id, type="agent", roles=[], permissions=set(), metadata={}),
resource=action_details.get("resource", "unknown"),
action=action_type,
outcome=outcome,
details=action_details
)

async def query_audit_log(
self,
filters: Dict,
start_time: str,
end_time: str,
limit: int = 1000
) -> List[AuditEvent]:
"""Query audit log with filters."""
# Implementation depends on storage backend
pass

Compliance Reporting

class ComplianceReporter:
"""Generate compliance reports."""

def __init__(self, audit_logger: AuditLogger):
self.audit = audit_logger

async def generate_access_report(
self,
start_date: str,
end_date: str
) -> Dict:
"""Generate access control report."""
events = await self.audit.query_audit_log(
filters={"event_type": ["authentication", "authorization"]},
start_time=start_date,
end_time=end_date
)

return {
"period": {"start": start_date, "end": end_date},
"total_access_attempts": len(events),
"successful_authentications": len([e for e in events if e.outcome == "success"]),
"failed_authentications": len([e for e in events if e.outcome == "failure"]),
"unique_users": len(set(e.actor_id for e in events)),
"access_by_resource": self._group_by_resource(events)
}

async def generate_agent_activity_report(
self,
start_date: str,
end_date: str
) -> Dict:
"""Generate agent activity report."""
events = await self.audit.query_audit_log(
filters={"event_type": "agent_action"},
start_time=start_date,
end_time=end_date
)

return {
"period": {"start": start_date, "end": end_date},
"total_actions": len(events),
"actions_by_type": self._group_by_action(events),
"tool_usage": self._analyze_tool_usage(events),
"error_rate": len([e for e in events if e.outcome == "error"]) / max(len(events), 1)
}

Part 6: Governance Framework

Policy Configuration

# governance_policy.yml
governance:
data_retention:
conversation_logs: 90_days
audit_logs: 7_years
memory_data: 30_days

access_controls:
default_deny: true
require_mfa: true
session_timeout: 3600

agent_constraints:
max_iterations: 20
max_tool_calls: 50
require_human_approval:
- financial_transactions
- data_deletion
- external_communications

content_policies:
block_pii_in_logs: true
redact_sensitive_output: true
allowed_output_types:
- text
- code
- structured_data

monitoring:
log_all_llm_calls: true
log_all_tool_calls: true
alert_on_anomalies: true

Quick Reference

Security Checklist

  • Input validation enabled
  • Output filtering configured
  • Tool sandboxing active
  • Audit logging enabled
  • Access controls defined
  • Network policies set
  • Retention policies configured
  • Compliance reporting scheduled

Security Controls by Paradigm

ParadigmKey RisksPrimary Controls
LSRHallucinationOutput validation
GSData leakageSource filtering
EPAction abuseTool authorization
VEProtocol bypassStrict validation

Document maintained by CODITECT Security Team. Feedback: security@coditect.com