#!/usr/bin/env python3 """ Codanna MCP Security Wrapper
A secure wrapper for codanna MCP server that provides:
- P0: Multi-tenant index isolation
- P0: MCP input sanitization (JSON injection protection)
- P0: Network namespace isolation (optional)
- P0: Process resource limits
Architecture: Claude Code --> This Wrapper --> codanna serve --watch | +-- Input validation +-- Tenant isolation +-- Audit logging +-- Resource limits
Usage: # Direct invocation (for testing) python3 codanna-mcp-wrapper.py --tenant-id abc123
# MCP configuration (~/.claude/mcp.json)
{
"mcpServers": {
"codanna": {
"command": "python3",
"args": ["/path/to/codanna-mcp-wrapper.py", "--tenant-id", "${TENANT_ID}"]
}
}
}
Security (ADR-065): - All MCP messages validated before forwarding - Tenant-isolated index directories - JSON injection patterns blocked - Optional network/resource sandboxing
Author: CODITECT (ADR-065) Version: 1.0.0 """
import argparse import json import logging import os import re import subprocess import sys import threading import time from dataclasses import dataclass, field from pathlib import Path from typing import Any, Optional
Configure logging
logging.basicConfig( level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [codanna-wrapper] %(message)s', stream=sys.stderr ) logger = logging.getLogger(name)
=============================================================================
Configuration
=============================================================================
@dataclass class SecurityConfig: """Security configuration for the wrapper."""
# Input validation
max_query_length: int = 10000
max_message_size: int = 1_000_000 # 1MB
blocked_patterns: list = field(default_factory=lambda: [
r'[{}\[\]"\\]', # JSON control characters in queries
r'\x00', # Null bytes
r'\.\./', # Path traversal
r'~/', # Home directory references
])
# Rate limiting
max_requests_per_minute: int = 100
rate_limit_window_seconds: int = 60
# Resource limits (Linux only)
memory_limit_mb: int = 512
cpu_quota_percent: int = 50
# Network isolation (Linux only)
enable_network_isolation: bool = True
# Tenant isolation
index_base_path: str = ".coditect/.codanna"
index_permissions: int = 0o700
@dataclass class TenantContext: """Tenant context for multi-tenant isolation."""
tenant_id: str
user_id: Optional[str] = None
team_id: Optional[str] = None
project_id: Optional[str] = None
workspace_root: Optional[Path] = None
def __post_init__(self):
# Validate tenant_id format
if not re.match(r'^[a-zA-Z0-9_-]{1,64}$', self.tenant_id):
raise ValueError(f"Invalid tenant_id format: {self.tenant_id}")
@property
def index_path(self) -> Path:
"""Get the tenant-isolated index path."""
base = Path(self.workspace_root or Path.cwd())
return base / ".coditect" / ".codanna" / self.tenant_id
def ensure_index_directory(self, permissions: int = 0o700) -> Path:
"""Create and secure the tenant index directory."""
index_path = self.index_path
index_path.mkdir(parents=True, exist_ok=True)
# Set restrictive permissions
os.chmod(index_path, permissions)
logger.info(f"Tenant index directory: {index_path} (mode: {oct(permissions)})")
return index_path
=============================================================================
Input Validation
=============================================================================
class InputValidator: """Validates and sanitizes MCP input messages."""
def __init__(self, config: SecurityConfig):
self.config = config
self.compiled_patterns = [
re.compile(pattern) for pattern in config.blocked_patterns
]
def validate_message(self, message: dict) -> tuple[bool, Optional[str]]:
"""
Validate an MCP message.
Returns:
(is_valid, error_message)
"""
# Check message size
message_str = json.dumps(message)
if len(message_str) > self.config.max_message_size:
return False, f"Message exceeds size limit ({len(message_str)} > {self.config.max_message_size})"
# Validate method
method = message.get("method", "")
if not isinstance(method, str) or len(method) > 100:
return False, "Invalid method field"
# Validate params
params = message.get("params", {})
if not isinstance(params, dict):
return False, "Params must be a dictionary"
# Check for injection in query parameters
if "query" in params:
query = params["query"]
valid, error = self.validate_query(query)
if not valid:
return False, error
# Check for path traversal in any path parameters
for key in ["path", "file", "directory", "index_path"]:
if key in params:
valid, error = self.validate_path(params[key])
if not valid:
return False, f"Invalid {key}: {error}"
return True, None
def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
"""Validate a search query string."""
if not isinstance(query, str):
return False, "Query must be a string"
if len(query) > self.config.max_query_length:
return False, f"Query exceeds length limit ({len(query)} > {self.config.max_query_length})"
# Check for blocked patterns
for pattern in self.compiled_patterns:
if pattern.search(query):
logger.warning(f"Blocked pattern detected in query: {pattern.pattern}")
return False, f"Query contains blocked pattern"
return True, None
def validate_path(self, path: str) -> tuple[bool, Optional[str]]:
"""Validate a file path parameter."""
if not isinstance(path, str):
return False, "Path must be a string"
# Check for path traversal
if ".." in path:
return False, "Path traversal detected"
# Check for absolute paths outside workspace
if path.startswith("/") and not path.startswith("/workspace"):
return False, "Absolute path outside workspace"
# Check for home directory references
if path.startswith("~"):
return False, "Home directory reference not allowed"
return True, None
def sanitize_for_logging(self, message: dict) -> dict:
"""Create a sanitized copy of message for logging (redact sensitive data)."""
sanitized = message.copy()
# Redact potentially sensitive fields
if "params" in sanitized:
params = sanitized["params"].copy()
for key in ["content", "query"]:
if key in params and isinstance(params[key], str) and len(params[key]) > 100:
params[key] = params[key][:100] + "...[truncated]"
sanitized["params"] = params
return sanitized
=============================================================================
Rate Limiting
=============================================================================
class RateLimiter: """Simple token bucket rate limiter."""
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests: list[float] = []
self.lock = threading.Lock()
def allow_request(self) -> bool:
"""Check if a request should be allowed."""
now = time.time()
cutoff = now - self.window_seconds
with self.lock:
# Remove old requests
self.requests = [t for t in self.requests if t > cutoff]
# Check limit
if len(self.requests) >= self.max_requests:
return False
# Record request
self.requests.append(now)
return True
def get_stats(self) -> dict:
"""Get current rate limit stats."""
now = time.time()
cutoff = now - self.window_seconds
with self.lock:
current = len([t for t in self.requests if t > cutoff])
return {
"current_requests": current,
"max_requests": self.max_requests,
"window_seconds": self.window_seconds,
"remaining": self.max_requests - current
}
=============================================================================
Audit Logging
=============================================================================
class AuditLogger: """Structured audit logging for security events."""
def __init__(self, tenant: TenantContext):
self.tenant = tenant
self.log_file = tenant.index_path / "audit.jsonl" if tenant.workspace_root else None
def log_event(self, event_type: str, details: dict, success: bool = True):
"""Log a security event."""
event = {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"tenant_id": self.tenant.tenant_id,
"user_id": self.tenant.user_id,
"team_id": self.tenant.team_id,
"event_type": event_type,
"success": success,
"details": details
}
# Log to stderr
level = logging.INFO if success else logging.WARNING
logger.log(level, f"AUDIT: {event_type} - {json.dumps(details)}")
# Log to file if configured
if self.log_file:
try:
self.log_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.log_file, "a") as f:
f.write(json.dumps(event) + "\n")
except Exception as e:
logger.error(f"Failed to write audit log: {e}")
def log_request(self, method: str, params: dict, allowed: bool):
"""Log an MCP request."""
self.log_event("mcp_request", {
"method": method,
"param_keys": list(params.keys()) if params else [],
}, success=allowed)
def log_validation_failure(self, reason: str, message: dict):
"""Log a validation failure."""
self.log_event("validation_failure", {
"reason": reason,
"method": message.get("method", "unknown"),
}, success=False)
def log_rate_limit(self, stats: dict):
"""Log a rate limit event."""
self.log_event("rate_limit_exceeded", stats, success=False)
=============================================================================
Process Management
=============================================================================
class CodannaProcess: """Manages the codanna subprocess with security controls."""
def __init__(
self,
tenant: TenantContext,
config: SecurityConfig,
codanna_path: str = "codanna"
):
self.tenant = tenant
self.config = config
self.codanna_path = codanna_path
self.process: Optional[subprocess.Popen] = None
def build_command(self) -> list[str]:
"""Build the codanna command with security options."""
cmd = []
# Network isolation (Linux only)
if self.config.enable_network_isolation and sys.platform == "linux":
cmd.extend(["unshare", "--net"])
# Resource limits via systemd-run (Linux only)
if sys.platform == "linux":
cmd.extend([
"systemd-run",
"--user",
"--scope",
f"--property=MemoryMax={self.config.memory_limit_mb}M",
f"--property=CPUQuota={self.config.cpu_quota_percent}%",
"--quiet",
])
# Codanna command
cmd.extend([
self.codanna_path,
"serve",
"--watch",
])
return cmd
def build_environment(self) -> dict:
"""Build environment variables for codanna."""
env = os.environ.copy()
# Set tenant-specific index path
env["CODANNA_INDEX_PATH"] = str(self.tenant.index_path)
# Set logging level
env["RUST_LOG"] = "info"
# Disable any network features
env["CODANNA_DISABLE_HTTP"] = "1"
return env
def start(self):
"""Start the codanna process."""
# Ensure tenant index directory exists
self.tenant.ensure_index_directory(self.config.index_permissions)
cmd = self.build_command()
env = self.build_environment()
logger.info(f"Starting codanna: {' '.join(cmd)}")
logger.info(f"Index path: {self.tenant.index_path}")
self.process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=sys.stderr,
env=env,
cwd=str(self.tenant.workspace_root or Path.cwd()),
)
logger.info(f"Codanna started (PID: {self.process.pid})")
def stop(self):
"""Stop the codanna process."""
if self.process:
self.process.terminate()
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
logger.info("Codanna stopped")
def send_message(self, message: dict) -> Optional[dict]:
"""Send a message to codanna and get the response."""
if not self.process or self.process.poll() is not None:
raise RuntimeError("Codanna process is not running")
# Send message
message_bytes = (json.dumps(message) + "\n").encode("utf-8")
self.process.stdin.write(message_bytes)
self.process.stdin.flush()
# Read response
response_line = self.process.stdout.readline()
if response_line:
return json.loads(response_line.decode("utf-8"))
return None
=============================================================================
Main Wrapper
=============================================================================
class CodannaMCPWrapper: """Main MCP wrapper that ties everything together."""
def __init__(
self,
tenant: TenantContext,
config: Optional[SecurityConfig] = None
):
self.tenant = tenant
self.config = config or SecurityConfig()
self.validator = InputValidator(self.config)
self.rate_limiter = RateLimiter(
self.config.max_requests_per_minute,
self.config.rate_limit_window_seconds
)
self.audit = AuditLogger(tenant)
self.codanna = CodannaProcess(tenant, self.config)
def handle_message(self, message: dict) -> dict:
"""Handle an incoming MCP message with security checks."""
method = message.get("method", "unknown")
params = message.get("params", {})
msg_id = message.get("id")
# Rate limiting
if not self.rate_limiter.allow_request():
stats = self.rate_limiter.get_stats()
self.audit.log_rate_limit(stats)
return self._error_response(msg_id, -32000, "Rate limit exceeded")
# Input validation
valid, error = self.validator.validate_message(message)
if not valid:
self.audit.log_validation_failure(error, message)
return self._error_response(msg_id, -32602, f"Validation failed: {error}")
# Log the request
self.audit.log_request(method, params, allowed=True)
# Forward to codanna
try:
response = self.codanna.send_message(message)
return response or self._error_response(msg_id, -32603, "No response from codanna")
except Exception as e:
logger.error(f"Error forwarding to codanna: {e}")
return self._error_response(msg_id, -32603, f"Internal error: {str(e)}")
def _error_response(self, msg_id: Any, code: int, message: str) -> dict:
"""Create a JSON-RPC error response."""
return {
"jsonrpc": "2.0",
"id": msg_id,
"error": {
"code": code,
"message": message
}
}
def run(self):
"""Run the wrapper as an MCP stdio server."""
logger.info(f"Starting Codanna MCP Wrapper for tenant: {self.tenant.tenant_id}")
# Start codanna
self.codanna.start()
try:
# Process stdin/stdout
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
message = json.loads(line)
response = self.handle_message(message)
print(json.dumps(response), flush=True)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON: {e}")
error = self._error_response(None, -32700, "Parse error")
print(json.dumps(error), flush=True)
except KeyboardInterrupt:
logger.info("Interrupted")
finally:
self.codanna.stop()
=============================================================================
CLI Entry Point
=============================================================================
def main(): parser = argparse.ArgumentParser( description="Codanna MCP Security Wrapper", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Run with tenant isolation python3 codanna-mcp-wrapper.py --tenant-id tenant123
# Run with all context
python3 codanna-mcp-wrapper.py \\
--tenant-id tenant123 \\
--user-id user456 \\
--team-id team789 \\
--workspace /path/to/project
Security Features: - Input validation (JSON injection protection) - Rate limiting (100 req/min default) - Multi-tenant index isolation - Network namespace isolation (Linux) - Process resource limits (Linux) - Audit logging """ )
parser.add_argument(
"--tenant-id",
required=True,
help="Tenant ID for isolation"
)
parser.add_argument(
"--user-id",
help="User ID for audit logging"
)
parser.add_argument(
"--team-id",
help="Team ID for audit logging"
)
parser.add_argument(
"--project-id",
help="Project ID for context"
)
parser.add_argument(
"--workspace",
type=Path,
help="Workspace root directory"
)
parser.add_argument(
"--codanna-path",
default="codanna",
help="Path to codanna executable"
)
parser.add_argument(
"--max-requests-per-minute",
type=int,
default=100,
help="Rate limit (default: 100)"
)
parser.add_argument(
"--disable-network-isolation",
action="store_true",
help="Disable network namespace isolation"
)
parser.add_argument(
"--memory-limit-mb",
type=int,
default=512,
help="Memory limit in MB (default: 512)"
)
parser.add_argument(
"--cpu-quota-percent",
type=int,
default=50,
help="CPU quota percentage (default: 50)"
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable debug logging"
)
args = parser.parse_args()
# Configure logging level
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
# Create tenant context
tenant = TenantContext(
tenant_id=args.tenant_id,
user_id=args.user_id,
team_id=args.team_id,
project_id=args.project_id,
workspace_root=args.workspace or Path.cwd()
)
# Create security config
config = SecurityConfig(
max_requests_per_minute=args.max_requests_per_minute,
enable_network_isolation=not args.disable_network_isolation,
memory_limit_mb=args.memory_limit_mb,
cpu_quota_percent=args.cpu_quota_percent,
)
# Run wrapper
wrapper = CodannaMCPWrapper(tenant, config)
wrapper.run()
if name == "main": main()