# Prompt 08: Architecture Decision Record - Integration Framework

## Context

You are a principal architect designing the integration framework for CODITECT-COMPLIANCE. This ADR establishes how external systems are connected, how credentials are managed, and how data is synchronized across 15+ integration types.

## Output Specification

Generate a comprehensive Architecture Decision Record (ADR) following the standard ADR format. The document should be 3,000-4,500 words (9,000-14,000 tokens).

## Document Structure
# ADR-004: Integration Framework Architecture
## Status
Proposed | Accepted | Deprecated | Superseded
## Date
[Current Date]
## Decision Makers
- [Role: Chief Architect]
- [Role: Security Architect]
- [Role: Platform Engineering Lead]
## Context
### Problem Statement
CODITECT-COMPLIANCE must integrate with 15+ external systems at launch, expanding to 300+ integrations over time:
**Tier 1 (Launch)**: AWS, GCP, Azure, Okta, GitHub, Jira, Slack
**Tier 2 (Growth)**: GitLab, Linear, Azure AD, Google Workspace, Datadog
**Tier 3 (Enterprise)**: ServiceNow, Workday, SAP, Salesforce, custom APIs
### Integration Challenges
| Challenge | Description | Impact |
|-----------|-------------|--------|
| **Credential Security** | Store OAuth tokens, API keys, IAM roles | Data breach risk |
| **Rate Limiting** | Each provider has different limits | Service disruption |
| **Schema Variance** | No standard across providers | Complex normalization |
| **Authentication Diversity** | OAuth 2.0, API keys, SAML, IAM roles | Implementation complexity |
| **Webhook Reliability** | Providers may retry, deliver out of order | Duplicate/missing events |
| **Versioning** | APIs evolve, breaking changes | Maintenance burden |
| **Multi-tenancy** | Each org has own credentials | Isolation requirements |
### Integration Patterns Required
1. **Pull (Polling)**: Scheduled data fetches (IAM users, configs)
2. **Push (Webhooks)**: Real-time event notifications (code pushes, tickets)
3. **Bidirectional**: Query + write back (create tickets, update configs)
4. **Streaming**: Continuous log forwarding (SIEM, monitoring)
### Technical Constraints
- Credentials encrypted at rest (AES-256)
- Network isolation between tenants
- Support for customer-managed credentials (BYOC)
- Integration health monitoring required
- Must handle provider outages gracefully
## Decision Drivers
1. **Security**: Zero credential exposure in logs/errors
2. **Reliability**: Graceful degradation during outages
3. **Extensibility**: New integrations without core changes
4. **Performance**: Efficient polling without overwhelming providers
5. **Observability**: Monitor integration health and usage
6. **Developer Experience**: Easy to add new connectors
## Options Considered
### Option 1: Monolithic Integration Service
**Description**: Single service handles all integrations with provider-specific logic.
**Pros**:
- Simple deployment
- Shared infrastructure code
- Easy debugging
**Cons**:
- Single point of failure
- Blast radius of bugs affects all integrations
- Difficult to scale hot integrations independently
### Option 2: Plugin-Based Connector Architecture
**Description**: Core integration service with dynamically-loaded connector plugins.
**Pros**:
- Modular development
- Independent connector testing
- Shared core infrastructure
**Cons**:
- Plugin interface design critical
- Dynamic loading complexity
- Still single deployment unit
### Option 3: Microservice per Integration Type
**Description**: Each integration type deployed as separate microservice.
**Pros**:
- Independent scaling
- Failure isolation
- Team ownership clarity
**Cons**:
- Infrastructure overhead (15+ services)
- Cross-cutting concerns duplication
- Complex orchestration
### Option 4: Hybrid - Core Service + Connector Pool
**Description**: Core service handles orchestration; connector pool executes provider-specific logic.
```
┌─────────────────────┐
│ Integration Service │ ← Orchestration, scheduling, credentials
└─────────┬───────────┘
          │
          ▼
┌─────────────────────────────────────────────┐
│           Connector Worker Pool             │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐      │
│  │AWS      │  │Okta     │  │GitHub   │ ...  │
│  │Workers  │  │Workers  │  │Workers  │      │
│  └─────────┘  └─────────┘  └─────────┘      │
└─────────────────────────────────────────────┘
```
**Pros**:
- Centralized orchestration
- Scalable worker pool
- Failure isolation per provider
- Shared infrastructure
**Cons**:
- Worker pool management complexity
- Need robust job distribution
## Decision
**Chosen Option**: Option 4 - Hybrid Core Service + Connector Pool
### Rationale
1. **Centralized orchestration** simplifies credential management and scheduling
2. **Worker pool** allows scaling by provider demand
3. **Connector isolation** prevents one provider's issues from affecting others
4. **Shared infrastructure** reduces duplication
## Detailed Design
### Architecture Overview
```
┌──────────────────────────────────────────────────────────────────┐
│                      Integration Framework                       │
└──────────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────────┐
│                            API Layer                             │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐   │
│  │ Integration     │  │ Credential      │  │ Webhook         │   │
│  │ Management API  │  │ Management API  │  │ Receiver API    │   │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘   │
└──────────────────────────────────────────────────────────────────┘
          │
          ▼
┌──────────────────────────────────────────────────────────────────┐
│                   Integration Service (Core)                     │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐   │
│  │ Scheduler       │  │ Credential      │  │ Health          │   │
│  │ (Polling Jobs)  │  │ Vault           │  │ Monitor         │   │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘   │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐   │
│  │ Rate Limiter    │  │ Circuit         │  │ Metrics         │   │
│  │                 │  │ Breaker         │  │ Collector       │   │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘   │
└──────────────────────────────────────────────────────────────────┘
          │
          ▼
┌──────────────────────────────────────────────────────────────────┐
│                    Job Queue (Redis Streams)                     │
│  ┌────────────────────────────────────────────────────────────┐  │
│  │        integrations.jobs.{provider_type}.{priority}        │  │
│  └────────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────┘
          │
          ├───────────────┬───────────────┐
          ▼               ▼               ▼
   ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
   │ AWS Worker  │ │ Okta Worker │ │ GitHub      │
   │ Pool (N)    │ │ Pool (N)    │ │ Worker Pool │
   └─────────────┘ └─────────────┘ └─────────────┘
          │               │               │
          ▼               ▼               ▼
┌─────────────────────────────────────────────────┐
│               External Providers                │
│   ┌───────┐  ┌───────┐  ┌───────┐  ┌───────┐    │
│   │ AWS   │  │ Okta  │  │ GitHub│  │ Jira  │    │
│   └───────┘  └───────┘  └───────┘  └───────┘    │
└─────────────────────────────────────────────────┘
```
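The scheduler publishes polling jobs onto the Redis Stream named in the job queue above, and each provider's worker pool consumes them via a consumer group. A minimal sketch of job construction; the helper name `build_polling_job` and the payload fields are assumptions, not part of the spec:

```python
import json
import time
from typing import Any, Dict, Tuple


def build_polling_job(
    provider_type: str,
    priority: str,
    organization_id: str,
    integration_id: str,
) -> Tuple[str, Dict[str, Any]]:
    """Build the stream key and field map for a scheduled polling job."""
    # Stream naming follows the pattern in the diagram above
    stream_key = f"integrations.jobs.{provider_type}.{priority}"
    fields = {
        "organization_id": organization_id,
        "integration_id": integration_id,
        "enqueued_at": str(time.time()),
        "payload": json.dumps({"action": "collect_evidence"}),
    }
    return stream_key, fields


# The scheduler would then publish with redis-py:
#   await redis.xadd(stream_key, fields)
# and an AWS worker would consume via a consumer group:
#   await redis.xreadgroup("aws-workers", consumer_name, {stream_key: ">"})
```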
### Connector Interface
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, AsyncIterator, Dict, List, Optional

# CollectionConfig, RawEvidenceItem, and ActionResult are defined by the
# evidence-collection module (see the Evidence Collection ADR) and are
# referenced here as forward references.


class AuthType(Enum):
    OAUTH2 = "oauth2"
    API_KEY = "api_key"
    IAM_ROLE = "iam_role"
    SAML = "saml"
    BASIC = "basic"
    CUSTOM = "custom"


class IntegrationCapability(Enum):
    EVIDENCE_COLLECTION = "evidence_collection"
    WEBHOOK_RECEIVER = "webhook_receiver"
    BIDIRECTIONAL = "bidirectional"
    STREAMING = "streaming"


@dataclass
class ConnectorMetadata:
    """Metadata about a connector implementation."""
    provider_type: str                         # "aws", "okta", "github"
    display_name: str                          # "Amazon Web Services"
    description: str
    version: str
    auth_types: List[AuthType]
    capabilities: List[IntegrationCapability]
    required_scopes: List[str]                 # OAuth scopes or IAM permissions
    rate_limits: Dict[str, int]                # {"requests_per_second": 10}
    webhook_events: List[str]                  # Events this connector handles
    documentation_url: str


@dataclass
class IntegrationCredentials:
    """Encrypted credentials for an integration."""
    id: str
    organization_id: str
    provider_type: str
    auth_type: AuthType
    # Encrypted credential data (decrypted at runtime)
    encrypted_data: bytes
    encryption_key_id: str
    # Metadata (fields without defaults must precede the optional ones)
    created_at: datetime
    # OAuth-specific
    access_token: Optional[str] = None
    refresh_token: Optional[str] = None
    token_expires_at: Optional[datetime] = None
    last_used_at: Optional[datetime] = None
    last_rotated_at: Optional[datetime] = None


@dataclass
class ConnectionTestResult:
    """Result of testing integration connectivity."""
    success: bool
    provider_type: str
    latency_ms: int
    permissions_verified: List[str]
    permissions_missing: List[str]
    error_message: Optional[str] = None
    error_code: Optional[str] = None


class BaseConnector(ABC):
    """Base class for all integration connectors."""

    @property
    @abstractmethod
    def metadata(self) -> ConnectorMetadata:
        """Return connector metadata."""

    @abstractmethod
    async def test_connection(
        self,
        credentials: IntegrationCredentials
    ) -> ConnectionTestResult:
        """
        Test connectivity and permissions.

        Should verify:
        1. Credentials are valid
        2. Required permissions are granted
        3. API is reachable
        """

    @abstractmethod
    async def collect_evidence(
        self,
        credentials: IntegrationCredentials,
        config: "CollectionConfig"
    ) -> AsyncIterator["RawEvidenceItem"]:
        """
        Collect evidence from the provider.

        Must:
        - Handle pagination internally
        - Respect rate limits
        - Yield items as they're collected
        - Handle partial failures gracefully
        """

    async def handle_webhook(
        self,
        payload: Dict[str, Any],
        headers: Dict[str, str],
        credentials: IntegrationCredentials
    ) -> List["RawEvidenceItem"]:
        """
        Process incoming webhook.

        Default implementation raises NotImplementedError.
        Override in connectors that support webhooks.
        """
        raise NotImplementedError(
            f"{self.metadata.provider_type} does not support webhooks"
        )

    async def execute_action(
        self,
        action: str,
        parameters: Dict[str, Any],
        credentials: IntegrationCredentials
    ) -> "ActionResult":
        """
        Execute an action on the provider (e.g., create ticket).

        Default implementation raises NotImplementedError.
        Override in connectors that support bidirectional operations.
        """
        raise NotImplementedError(
            f"{self.metadata.provider_type} does not support actions"
        )
```
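OAuth credentials in the interface carry a `token_expires_at`. A hedged sketch of how the core service might decide when to proactively refresh a token before handing it to a worker; the helper name and the five-minute skew are assumptions:

```python
from datetime import datetime, timedelta
from typing import Optional

# Refresh slightly before expiry so a long-running collection job never
# starts with an almost-expired access token (skew value is an assumption).
REFRESH_SKEW = timedelta(minutes=5)


def needs_refresh(
    token_expires_at: Optional[datetime],
    now: Optional[datetime] = None,
) -> bool:
    """Return True when the access token should be refreshed before use."""
    if token_expires_at is None:
        return False  # non-expiring credential (e.g. API key)
    now = now or datetime.utcnow()
    return now >= token_expires_at - REFRESH_SKEW
```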
### Credential Management

```python
import json
import uuid
from datetime import datetime
from typing import Any, Dict

from cryptography.fernet import Fernet
from google.cloud import secretmanager


class CredentialVault:
    """
    Secure credential storage and retrieval.

    Architecture:
    - Master encryption key stored in GCP Secret Manager
    - Credentials encrypted with per-organization keys
    - Keys rotated automatically every 90 days
    - Audit log for all credential access
    """

    def __init__(
        self,
        secret_manager: secretmanager.SecretManagerServiceClient,
        fdb_client: "FoundationDBClient",  # internal FoundationDB wrapper
        audit_logger: "AuditLogger"        # internal audit-logging service
    ):
        self.secrets = secret_manager
        self.fdb = fdb_client
        self.audit = audit_logger

    async def store_credentials(
        self,
        organization_id: str,
        provider_type: str,
        credentials: Dict[str, Any],
        auth_type: AuthType
    ) -> IntegrationCredentials:
        """
        Store encrypted credentials.

        Steps:
        1. Get or create organization encryption key
        2. Encrypt credential data
        3. Store encrypted blob in FoundationDB
        4. Log credential creation
        """
        # Get organization's encryption key
        key_id, key = await self._get_or_create_org_key(organization_id)

        # Encrypt credentials
        fernet = Fernet(key)
        encrypted_data = fernet.encrypt(json.dumps(credentials).encode())

        # Create credential record
        cred = IntegrationCredentials(
            id=str(uuid.uuid4()),
            organization_id=organization_id,
            provider_type=provider_type,
            auth_type=auth_type,
            encrypted_data=encrypted_data,
            encryption_key_id=key_id,
            created_at=datetime.utcnow()
        )

        # Store in FoundationDB
        await self._store_credential_record(cred)

        # Audit log
        await self.audit.log(
            event_type="credential_created",
            organization_id=organization_id,
            details={
                "credential_id": cred.id,
                "provider_type": provider_type,
                "auth_type": auth_type.value
            }
        )

        return cred

    async def retrieve_credentials(
        self,
        credential_id: str,
        requester: str
    ) -> Dict[str, Any]:
        """
        Retrieve and decrypt credentials.

        Access is logged for audit trail.
        """
        # Fetch encrypted record
        cred = await self._get_credential_record(credential_id)

        # Get decryption key
        key = await self._get_org_key(
            cred.organization_id,
            cred.encryption_key_id
        )

        # Decrypt
        fernet = Fernet(key)
        decrypted = fernet.decrypt(cred.encrypted_data)
        credentials = json.loads(decrypted)

        # Audit log
        await self.audit.log(
            event_type="credential_accessed",
            organization_id=cred.organization_id,
            details={
                "credential_id": credential_id,
                "provider_type": cred.provider_type,
                "requester": requester
            }
        )

        # Update last used
        await self._update_last_used(credential_id)

        return credentials

    async def rotate_credentials(
        self,
        credential_id: str
    ) -> IntegrationCredentials:
        """Rotate credentials (re-encrypt with new key)."""
        # Implementation for key rotation
        ...

    async def _get_or_create_org_key(
        self,
        organization_id: str
    ) -> tuple[str, bytes]:
        """Get or create organization encryption key from Secret Manager."""
        secret_name = f"projects/coditect/secrets/org-{organization_id}-key"
        try:
            # Try to get the latest existing key version
            response = self.secrets.access_secret_version(
                request={"name": f"{secret_name}/versions/latest"}
            )
            return response.name.split("/")[-1], response.payload.data
        except Exception:
            # Create new key
            key = Fernet.generate_key()

            # Store in Secret Manager
            self.secrets.create_secret(
                request={
                    "parent": "projects/coditect",
                    "secret_id": f"org-{organization_id}-key",
                    "secret": {"replication": {"automatic": {}}}
                }
            )
            version = self.secrets.add_secret_version(
                request={
                    "parent": secret_name,
                    "payload": {"data": key}
                }
            )
            return version.name.split("/")[-1], key

    # FoundationDB persistence helpers (_store_credential_record,
    # _get_credential_record, _get_org_key, _update_last_used) are elided.
```
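A mitigation noted later in this ADR is caching decrypted credentials in memory with a short TTL, which cuts Secret Manager round-trips for hot integrations at the cost of a small exposure window. A minimal sketch that would sit in front of the vault; the class name and default TTL are assumptions:

```python
import time
from typing import Any, Dict, Optional, Tuple


class TTLCredentialCache:
    """In-memory credential cache with a short TTL (hypothetical sketch)."""

    def __init__(self, ttl_seconds: float = 60.0):
        self.ttl = ttl_seconds
        # credential_id -> (expiry timestamp, decrypted credentials)
        self._entries: Dict[str, Tuple[float, Dict[str, Any]]] = {}

    def get(
        self, credential_id: str, now: Optional[float] = None
    ) -> Optional[Dict[str, Any]]:
        now = now if now is not None else time.monotonic()
        entry = self._entries.get(credential_id)
        if entry is None:
            return None
        expires_at, creds = entry
        if now >= expires_at:
            # Expired: drop the plaintext immediately
            del self._entries[credential_id]
            return None
        return creds

    def put(
        self,
        credential_id: str,
        creds: Dict[str, Any],
        now: Optional[float] = None,
    ) -> None:
        now = now if now is not None else time.monotonic()
        self._entries[credential_id] = (now + self.ttl, creds)
```

On a cache miss the caller falls through to `retrieve_credentials`, so the audit trail still records every vault access.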
### Rate Limiting

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Dict

from redis.asyncio import Redis


@dataclass
class RateLimitConfig:
    """Rate limit configuration per provider."""
    requests_per_second: float
    requests_per_minute: int
    requests_per_hour: int
    burst_size: int  # Max tokens the bucket can hold (burst allowance)


class TokenBucketRateLimiter:
    """
    Token bucket rate limiter with Redis backing.

    Supports:
    - Per-organization limits
    - Per-provider limits
    - Burst allowance
    - Distributed coordination via Redis
    """

    # Lua script for an atomic token-bucket refill-and-decrement
    TOKEN_BUCKET_SCRIPT = """
    local key = KEYS[1]
    local rate = tonumber(ARGV[1])
    local capacity = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local requested = tonumber(ARGV[4])

    local data = redis.call('HMGET', key, 'tokens', 'last_update')
    local tokens = tonumber(data[1]) or capacity
    local last_update = tonumber(data[2]) or now

    -- Add tokens based on time elapsed
    local elapsed = now - last_update
    tokens = math.min(capacity, tokens + elapsed * rate)

    -- Check if we have enough tokens
    if tokens >= requested then
        tokens = tokens - requested
        redis.call('HSET', key, 'tokens', tokens, 'last_update', now)
        redis.call('EXPIRE', key, 3600)
        return 1
    else
        return 0
    end
    """

    def __init__(
        self,
        redis: Redis,
        config: Dict[str, RateLimitConfig]
    ):
        self.redis = redis
        self.config = config

    def _default_config(self) -> RateLimitConfig:
        """Conservative fallback for providers without an explicit config."""
        return RateLimitConfig(
            requests_per_second=1.0,
            requests_per_minute=60,
            requests_per_hour=3600,
            burst_size=5
        )

    async def acquire(
        self,
        organization_id: str,
        provider_type: str,
        weight: int = 1
    ) -> bool:
        """
        Acquire rate limit tokens.

        Returns True if the request can proceed, False if rate limited.
        """
        config = self.config.get(provider_type, self._default_config())
        key = f"ratelimit:{provider_type}:{organization_id}"

        # redis-py signature: eval(script, numkeys, *keys_and_args)
        result = await self.redis.eval(
            self.TOKEN_BUCKET_SCRIPT,
            1,
            key,
            config.requests_per_second,
            config.burst_size,
            time.time(),
            weight
        )
        return bool(result)

    async def wait_for_token(
        self,
        organization_id: str,
        provider_type: str,
        timeout: float = 30.0
    ) -> bool:
        """Wait until a rate limit token is available."""
        start = time.time()
        while time.time() - start < timeout:
            if await self.acquire(organization_id, provider_type):
                return True
            await asyncio.sleep(0.1)
        return False


class AdaptiveRateLimiter:
    """
    Adaptive rate limiter that adjusts based on provider responses.

    Learns the optimal rate from:
    - 429 responses (reduce rate)
    - Successful responses (increase rate)
    - Response latency (adjust for congestion)
    """

    def __init__(self, base_limiter: TokenBucketRateLimiter):
        self.base = base_limiter
        self.rate_adjustments: Dict[str, float] = {}

    async def record_response(
        self,
        provider_type: str,
        status_code: int,
        latency_ms: int
    ):
        """Adjust the rate multiplier based on a provider response."""
        key = f"adjustment:{provider_type}"
        current = self.rate_adjustments.get(key, 1.0)

        if status_code == 429:
            # Rate limited - reduce by 50%
            self.rate_adjustments[key] = current * 0.5
        elif status_code >= 500:
            # Server error - reduce by 25%
            self.rate_adjustments[key] = current * 0.75
        elif status_code == 200 and latency_ms < 500:
            # Success and fast - recover slowly toward the 1.0x baseline
            self.rate_adjustments[key] = min(1.0, current * 1.1)
```
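The core service pairs this rate limiter with a circuit breaker (shown in the architecture overview) so a failing provider is short-circuited rather than hammered during an outage. A minimal per-provider sketch; the state machine is standard, but the thresholds and class names are assumptions:

```python
import time
from enum import Enum
from typing import Optional


class BreakerState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # provider failing; short-circuit calls
    HALF_OPEN = "half_open"  # allow a single probe request


class CircuitBreaker:
    """Per-provider circuit breaker (hypothetical thresholds)."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = 0.0
        self.state = BreakerState.CLOSED

    def allow_request(self, now: Optional[float] = None) -> bool:
        now = now if now is not None else time.monotonic()
        if self.state is BreakerState.OPEN:
            if now - self.opened_at >= self.recovery_timeout:
                # Cool-down elapsed: let one probe through
                self.state = BreakerState.HALF_OPEN
                return True
            return False
        return True

    def record_success(self) -> None:
        self.failures = 0
        self.state = BreakerState.CLOSED

    def record_failure(self, now: Optional[float] = None) -> None:
        now = now if now is not None else time.monotonic()
        self.failures += 1
        if self.state is BreakerState.HALF_OPEN or self.failures >= self.failure_threshold:
            self.state = BreakerState.OPEN
            self.opened_at = now
            self.failures = 0
```

Workers would consult `allow_request` before each provider call and feed back `record_success`/`record_failure`, alongside `AdaptiveRateLimiter.record_response`.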
### Health Monitoring

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class IntegrationHealth:
    """Health status for an integration."""
    integration_id: str
    provider_type: str
    organization_id: str
    status: str  # "healthy", "degraded", "unhealthy"
    last_check: datetime
    last_success: Optional[datetime]
    last_failure: Optional[datetime]
    # Metrics
    success_rate_1h: float
    avg_latency_ms: int
    error_count_1h: int
    # Recent errors
    recent_errors: List[Dict[str, Any]]


class IntegrationHealthMonitor:
    """Monitor and report integration health."""

    def __init__(
        self,
        metrics: "MetricsCollector",  # internal metrics service
        alerting: "AlertingService"   # internal alerting service
    ):
        self.metrics = metrics
        self.alerting = alerting

    async def record_request(
        self,
        integration_id: str,
        provider_type: str,
        organization_id: str,
        success: bool,
        latency_ms: int,
        error: Optional[str] = None
    ):
        """Record integration request for health tracking."""
        labels = {
            "integration_id": integration_id,
            "provider_type": provider_type,
            "organization_id": organization_id
        }

        # Record metrics
        self.metrics.increment(
            "integration_requests_total",
            labels=labels,
            value=1
        )
        self.metrics.observe(
            "integration_latency_ms",
            labels=labels,
            value=latency_ms
        )

        if not success:
            self.metrics.increment(
                "integration_errors_total",
                labels={**labels, "error": error or "unknown"}
            )

        # Check if alerting threshold reached
        await self._check_alert_threshold(
            integration_id,
            provider_type,
            organization_id
        )

    async def get_health(
        self,
        integration_id: str
    ) -> IntegrationHealth:
        """Get current health status for integration."""
        # Query metrics for health calculation
        ...

    async def _check_alert_threshold(
        self,
        integration_id: str,
        provider_type: str,
        organization_id: str
    ):
        """Check if error rate exceeds alert threshold."""
        # Metrics-backed helper; implementation elided
        error_rate = await self._get_error_rate_1h(integration_id)

        if error_rate > 0.5:  # 50% error rate
            await self.alerting.send_alert(
                severity="critical",
                title=f"Integration {provider_type} critically degraded",
                message=f"Error rate: {error_rate:.1%}",
                organization_id=organization_id,
                integration_id=integration_id
            )
        elif error_rate > 0.1:  # 10% error rate
            await self.alerting.send_alert(
                severity="warning",
                title=f"Integration {provider_type} degraded",
                message=f"Error rate: {error_rate:.1%}",
                organization_id=organization_id,
                integration_id=integration_id
            )
```
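The stubbed health calculation reduces to two pure functions over the counters that `record_request` emits; the thresholds mirror those used in `_check_alert_threshold`, but the function names are assumptions:

```python
def error_rate(requests_total: int, errors_total: int) -> float:
    """Error rate over a window; 0.0 when there was no traffic."""
    if requests_total <= 0:
        return 0.0
    return errors_total / requests_total


def health_status(rate: float) -> str:
    """Map an error rate onto the status values used by IntegrationHealth."""
    if rate > 0.5:    # critical alert threshold
        return "unhealthy"
    if rate > 0.1:    # warning alert threshold
        return "degraded"
    return "healthy"
```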
### Webhook Handling

```python
import hashlib
import hmac
import json
from typing import Any, Callable, Dict, Optional

from fastapi import HTTPException, Request


class WebhookRouter:
    """Route incoming webhooks to appropriate handlers."""

    def __init__(self):
        self.handlers: Dict[str, Callable] = {}
        self.signature_validators: Dict[str, Callable] = {}

    def register_handler(
        self,
        provider_type: str,
        handler: Callable,
        signature_validator: Optional[Callable] = None
    ):
        """Register webhook handler for provider."""
        self.handlers[provider_type] = handler
        if signature_validator:
            self.signature_validators[provider_type] = signature_validator

    async def handle_webhook(
        self,
        provider_type: str,
        request: Request
    ) -> Dict[str, Any]:
        """
        Handle incoming webhook.

        Steps:
        1. Validate signature (provider-specific)
        2. Parse payload
        3. Route to handler
        4. Return acknowledgment
        """
        if provider_type not in self.handlers:
            raise HTTPException(404, f"Unknown provider: {provider_type}")

        body = await request.body()
        headers = dict(request.headers)

        # Validate signature
        if provider_type in self.signature_validators:
            validator = self.signature_validators[provider_type]
            if not await validator(body, headers):
                raise HTTPException(401, "Invalid webhook signature")

        # Parse and handle
        payload = json.loads(body)
        handler = self.handlers[provider_type]
        return await handler(payload, headers)


# Provider-specific signature validation

async def validate_github_signature(body: bytes, headers: Dict) -> bool:
    """Validate GitHub webhook signature."""
    signature = headers.get("x-hub-signature-256", "")
    if not signature.startswith("sha256="):
        return False

    # Fetched from the CredentialVault; implementation elided
    secret = await get_github_webhook_secret()
    expected = hmac.new(
        secret.encode(),
        body,
        hashlib.sha256
    ).hexdigest()

    return hmac.compare_digest(f"sha256={expected}", signature)


async def validate_okta_signature(body: bytes, headers: Dict) -> bool:
    """
    Validate an Okta event hook request.

    Okta uses a different verification method: event hooks are verified
    once via a one-time verification challenge, and ongoing deliveries
    carry a customer-configured authorization header.
    """
    ...
```
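Because providers may retry deliveries and deliver out of order (the webhook reliability challenge above), Phase 3 calls for idempotency handling. A minimal deduplication sketch: the in-memory set stands in for what would be a Redis key with `SET ... NX EX` in production, and only `X-GitHub-Delivery` is a real provider header here, the rest is an assumption:

```python
import hashlib
import json
from typing import Any, Dict, Set


class WebhookDeduplicator:
    """Drop webhook deliveries that have already been processed (sketch)."""

    def __init__(self):
        self._seen: Set[str] = set()

    def event_key(
        self,
        provider_type: str,
        payload: Dict[str, Any],
        headers: Dict[str, str],
    ) -> str:
        # Prefer the provider's delivery id (e.g. GitHub's X-GitHub-Delivery);
        # fall back to a stable hash of the payload.
        delivery_id = headers.get("x-github-delivery")
        if delivery_id:
            return f"{provider_type}:{delivery_id}"
        digest = hashlib.sha256(
            json.dumps(payload, sort_keys=True).encode()
        ).hexdigest()
        return f"{provider_type}:{digest}"

    def is_duplicate(self, key: str) -> bool:
        """Return True if this delivery was already seen; record it otherwise."""
        if key in self._seen:
            return True
        self._seen.add(key)
        return False
```

The router would call `is_duplicate` after signature validation and acknowledge duplicates with a 200 without re-invoking the handler.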
## Consequences

### Positive

- **Security**: Credentials encrypted with per-org keys in Secret Manager
- **Reliability**: Circuit breakers and health monitoring prevent cascading failures
- **Scalability**: Worker pool scales with demand per provider
- **Extensibility**: New connectors follow a standard interface
- **Observability**: Comprehensive metrics and health tracking

### Negative

- **Complexity**: Multiple components to coordinate
- **Latency**: Rate limiting adds potential delays
- **Cost**: Secret Manager and Redis infrastructure costs

### Mitigations

- Implement a comprehensive integration dashboard
- Pre-warm rate limit tokens for scheduled jobs
- Cache credentials in memory with a short TTL
## Implementation Plan

### Phase 1: Core Framework (Weeks 1-2)

- Implement BaseConnector interface
- Set up CredentialVault with Secret Manager
- Create rate limiter infrastructure

### Phase 2: First Connectors (Weeks 3-4)

- AWS Connector
- Okta Connector
- GitHub Connector

### Phase 3: Webhook Support (Week 5)

- Webhook router implementation
- Signature validation per provider
- Idempotency handling

### Phase 4: Health Monitoring (Week 6)

- Metrics collection
- Health dashboard
- Alerting integration

## Validation Criteria

- **Security**: Zero credential exposure in logs or errors
- **Reliability**: 99.9% webhook delivery success
- **Performance**: < 100 ms rate limit check latency
- **Coverage**: All Tier 1 integrations functional
- **Monitoring**: Real-time health status for all integrations
## References
## Acceptance Criteria
1. **Interface Design**: Complete connector interface with all methods
2. **Credential Security**: Encryption and vault patterns documented
3. **Rate Limiting**: Token bucket implementation with adaptive learning
4. **Webhook Handling**: Signature validation per provider type
5. **Health Monitoring**: Metrics and alerting integration
## Token Budget
- Target: 10,000-16,000 tokens
- Priority: Connector interface and credential management sections
## Dependencies
- Input: PRD integration requirements (FR-IN-*)
- Input: SDD integration service container
- Output: Feeds into component build prompts for Integration Connectors
## Integration Points
This ADR establishes patterns used by:
- Evidence Collection ADR (connector data flow)
- All connector build prompts
- Agent tools for integration queries