
# Technical Design Document: CODITECT Multi-Tenant SaaS Platform

**Document Version:** 1.0.0 | **Date:** December 24, 2025 | **Status:** Draft | **Owner:** AZ1.AI INC | **Lead Architect:** Hal Casteel


## 1. Introduction and Purpose

### 1.1 Overview

The CODITECT Multi-Tenant SaaS Platform provides an enterprise-grade development environment with intelligent AI agents, delivered through isolated cloud workstations. This document specifies the technical architecture, data models, API contracts, and integration patterns required to implement the production system.

### 1.2 Goals

- **Multi-Tenant Isolation:** Secure data separation across organizations using PostgreSQL Row-Level Security (RLS)
- **Scalable Infrastructure:** Serverless APIs with per-user Cloud Workstation provisioning
- **Subscription Management:** Stripe-integrated billing with tier-based access control
- **Authentication:** Firebase Auth with JWT-based API authorization
- **High Availability:** 99.9% uptime SLA for API and workstation services

### 1.3 Non-Goals

- On-premise deployment options
- Multi-cloud vendor support (GCP-only for MVP)
- Self-service admin console (Phase 2)

### 1.4 Assumptions

- Users have Google accounts for Firebase Auth
- Organizations accept credit card payments via Stripe
- GCP Cloud Workstations API is generally available in target regions

## 2. Technical Architecture

### 2.1 System Components

#### 6.1.11 context_messages (User Context Sync)

Architecture Reference: ADR-044 (Custom REST Sync)

```sql
CREATE TABLE context_messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES organizations(id),
    user_id UUID NOT NULL REFERENCES users(id),
    device_id VARCHAR(255) NOT NULL,

    -- Content
    message_type VARCHAR(50) NOT NULL, -- 'decision', 'pattern', 'error', 'insight'
    content TEXT NOT NULL,
    content_hash VARCHAR(64) NOT NULL, -- SHA256 for deduplication
    metadata JSONB DEFAULT '{}',

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    CONSTRAINT unique_user_content UNIQUE(user_id, content_hash)
);

CREATE INDEX idx_context_messages_user_id ON context_messages(user_id);
CREATE INDEX idx_context_messages_tenant_id ON context_messages(tenant_id);
CREATE INDEX idx_context_messages_created_at ON context_messages(created_at DESC);
CREATE INDEX idx_context_messages_type ON context_messages(message_type);
```
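
Deduplication then falls out of the `unique_user_content` constraint: the writer computes the SHA-256 hash and lets `ON CONFLICT DO NOTHING` absorb replays. A sketch under those assumptions (`save_context_message` is a hypothetical helper, `conn` an asyncpg-style connection):

```python
import hashlib

def content_hash(content: str) -> str:
    # SHA-256 hex digest (64 chars) matches the VARCHAR(64) content_hash column.
    return hashlib.sha256(content.encode("utf-8")).hexdigest()

async def save_context_message(conn, tenant_id, user_id, device_id,
                               message_type: str, content: str) -> None:
    # ON CONFLICT DO NOTHING makes re-sent messages a no-op, so clients
    # can retry uploads without creating duplicate rows.
    await conn.execute(
        """
        INSERT INTO context_messages
            (tenant_id, user_id, device_id, message_type, content, content_hash)
        VALUES ($1, $2, $3, $4, $5, $6)
        ON CONFLICT (user_id, content_hash) DO NOTHING
        """,
        tenant_id, user_id, device_id, message_type, content,
        content_hash(content),
    )
```

Because the hash is deterministic, the same message sent from two devices collapses to a single row per user.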

#### 6.1.12 team_contexts (Team Context Sync)

Architecture Reference: ADR-045 (Team/Project Context Sync)

```sql
CREATE TABLE team_contexts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES organizations(id),
    team_id UUID NOT NULL REFERENCES teams(id),

    -- Content
    message_type VARCHAR(50) NOT NULL, -- 'decision', 'pattern', 'error', 'insight'
    content TEXT NOT NULL,
    content_hash VARCHAR(64) NOT NULL, -- SHA256 for deduplication
    metadata JSONB DEFAULT '{}',

    -- Attribution
    contributed_by UUID REFERENCES users(id),

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    CONSTRAINT unique_team_content UNIQUE(team_id, content_hash)
);

CREATE INDEX idx_team_contexts_team_id ON team_contexts(team_id);
CREATE INDEX idx_team_contexts_tenant_id ON team_contexts(tenant_id);
CREATE INDEX idx_team_contexts_created_at ON team_contexts(created_at DESC);
CREATE INDEX idx_team_contexts_type ON team_contexts(message_type);
CREATE INDEX idx_team_contexts_contributor ON team_contexts(contributed_by);
```

#### 6.1.13 project_contexts (Project Context Sync)

Architecture Reference: ADR-045 (Team/Project Context Sync)

```sql
CREATE TABLE project_contexts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES organizations(id),
    project_id UUID NOT NULL REFERENCES projects(id),

    -- Content
    message_type VARCHAR(50) NOT NULL, -- 'decision', 'pattern', 'error', 'insight'
    content TEXT NOT NULL,
    content_hash VARCHAR(64) NOT NULL, -- SHA256 for deduplication
    metadata JSONB DEFAULT '{}',

    -- Attribution
    contributed_by UUID REFERENCES users(id),

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    CONSTRAINT unique_project_content UNIQUE(project_id, content_hash)
);

CREATE INDEX idx_project_contexts_project_id ON project_contexts(project_id);
CREATE INDEX idx_project_contexts_tenant_id ON project_contexts(tenant_id);
CREATE INDEX idx_project_contexts_created_at ON project_contexts(created_at DESC);
CREATE INDEX idx_project_contexts_type ON project_contexts(message_type);
CREATE INDEX idx_project_contexts_contributor ON project_contexts(contributed_by);
```

#### 6.1.14 sync_cursors (Multi-Level Sync Tracking)

Architecture Reference: ADR-044, ADR-045

```sql
-- User-level sync cursors (per-device)
CREATE TABLE user_sync_cursors (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    device_id VARCHAR(255) NOT NULL,
    last_sync_at TIMESTAMP WITH TIME ZONE,
    cursor_position BIGINT DEFAULT 0,

    UNIQUE(user_id, device_id)
);

-- Team-level sync cursors (per-user per-team per-device)
CREATE TABLE team_sync_cursors (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    team_id UUID NOT NULL REFERENCES teams(id),
    user_id UUID NOT NULL REFERENCES users(id),
    device_id VARCHAR(255) NOT NULL,
    last_sync_at TIMESTAMP WITH TIME ZONE,
    cursor_position BIGINT DEFAULT 0,

    UNIQUE(team_id, user_id, device_id)
);

-- Project-level sync cursors (per-user per-project per-device)
CREATE TABLE project_sync_cursors (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    project_id UUID NOT NULL REFERENCES projects(id),
    user_id UUID NOT NULL REFERENCES users(id),
    device_id VARCHAR(255) NOT NULL,
    last_sync_at TIMESTAMP WITH TIME ZONE,
    cursor_position BIGINT DEFAULT 0,

    UNIQUE(project_id, user_id, device_id)
);

CREATE INDEX idx_user_sync_cursors_user ON user_sync_cursors(user_id);
CREATE INDEX idx_team_sync_cursors_team ON team_sync_cursors(team_id);
CREATE INDEX idx_team_sync_cursors_user ON team_sync_cursors(user_id);
CREATE INDEX idx_project_sync_cursors_project ON project_sync_cursors(project_id);
CREATE INDEX idx_project_sync_cursors_user ON project_sync_cursors(user_id);
```
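
An incremental pull reads everything newer than the device's `last_sync_at`, then advances the cursor. One possible shape, as a sketch (assuming an asyncpg-style connection; `pull_user_context` is a hypothetical helper, and a production version would also need a tie-breaker for rows sharing one timestamp):

```python
async def pull_user_context(conn, user_id, device_id, limit: int = 500):
    """Fetch context rows created since this device's last sync, oldest first."""
    cursor = await conn.fetchrow(
        "SELECT last_sync_at FROM user_sync_cursors "
        "WHERE user_id = $1 AND device_id = $2",
        user_id, device_id,
    )
    since = cursor["last_sync_at"] if cursor else None
    rows = await conn.fetch(
        "SELECT * FROM context_messages "
        "WHERE user_id = $1 AND ($2::timestamptz IS NULL OR created_at > $2) "
        "ORDER BY created_at LIMIT $3",
        user_id, since, limit,
    )
    if rows:
        # Advance the cursor only after a successful read, so a failed
        # pull is simply retried from the same position.
        await conn.execute(
            "INSERT INTO user_sync_cursors (user_id, device_id, last_sync_at) "
            "VALUES ($1, $2, $3) "
            "ON CONFLICT (user_id, device_id) DO UPDATE "
            "SET last_sync_at = EXCLUDED.last_sync_at",
            user_id, device_id, rows[-1]["created_at"],
        )
    return rows
```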

#### 6.1.15 context_backups (GCS Backup Metadata)

Architecture Reference: ADR-045 (Backup System)

```sql
CREATE TABLE context_backups (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES organizations(id),

    -- Scope
    scope_type VARCHAR(20) NOT NULL, -- 'user', 'team', 'project', 'organization'
    scope_id UUID NOT NULL, -- References user_id, team_id, project_id, or org_id
    
    -- Backup metadata
    backup_type VARCHAR(20) NOT NULL, -- 'incremental', 'full', 'on_demand'
    gcs_path TEXT NOT NULL, -- gs://bucket/path/to/backup.jsonl.gz
    size_bytes BIGINT NOT NULL,
    record_count INTEGER NOT NULL,
    checksum VARCHAR(64) NOT NULL, -- SHA256 of backup file

    -- Status
    status VARCHAR(20) NOT NULL DEFAULT 'completed', -- 'pending', 'in_progress', 'completed', 'failed'
    error_message TEXT,

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,
    expires_at TIMESTAMP WITH TIME ZONE, -- For retention policy

    CONSTRAINT valid_backup_scope CHECK (scope_type IN ('user', 'team', 'project', 'organization')),
    CONSTRAINT valid_backup_type CHECK (backup_type IN ('incremental', 'full', 'on_demand')),
    CONSTRAINT valid_backup_status CHECK (status IN ('pending', 'in_progress', 'completed', 'failed'))
);

CREATE INDEX idx_context_backups_tenant ON context_backups(tenant_id);
CREATE INDEX idx_context_backups_scope ON context_backups(scope_type, scope_id);
CREATE INDEX idx_context_backups_created ON context_backups(created_at DESC);
CREATE INDEX idx_context_backups_expires ON context_backups(expires_at) WHERE status = 'completed';
```

### 6.2 Indexes and Constraints

#### 6.2.1 Performance Indexes

```sql
-- Composite index for org member lookups
CREATE INDEX idx_memberships_org_user_active
    ON memberships(org_id, user_id, role)
    WHERE revoked_at IS NULL;

-- Composite index for user's active workstations
CREATE INDEX idx_workstations_user_org_active
    ON workstations(user_id, org_id, status)
    WHERE deleted_at IS NULL;

-- Index for subscription period queries
CREATE INDEX idx_subscriptions_period
    ON subscriptions(current_period_end)
    WHERE status = 'active';

-- Partial index for active projects
CREATE INDEX idx_projects_org_active
    ON projects(org_id, created_at DESC)
    WHERE deleted_at IS NULL;
```

#### 6.2.2 Integrity Triggers

```sql
-- Ensure at least one owner per organization
CREATE OR REPLACE FUNCTION check_org_has_owner()
RETURNS TRIGGER AS $$
BEGIN
    IF NOT EXISTS (
        SELECT 1 FROM memberships
        WHERE org_id = NEW.org_id
          AND role = 'owner'
          AND revoked_at IS NULL
    ) THEN
        RAISE EXCEPTION 'Organization must have at least one owner';
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER ensure_org_owner
AFTER UPDATE ON memberships
FOR EACH ROW
EXECUTE FUNCTION check_org_has_owner();
```

### 6.3 Row-Level Security (RLS) Policies

#### 6.3.1 Enable RLS on All Tables

```sql
ALTER TABLE organizations ENABLE ROW LEVEL SECURITY;
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
ALTER TABLE memberships ENABLE ROW LEVEL SECURITY;
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
ALTER TABLE subscriptions ENABLE ROW LEVEL SECURITY;
ALTER TABLE workstations ENABLE ROW LEVEL SECURITY;
```

#### 6.3.2 Organization Access Policy

```sql
-- Users can only see organizations they belong to
CREATE POLICY org_member_access ON organizations
FOR SELECT
USING (
id IN (
SELECT org_id FROM memberships
WHERE user_id = current_setting('app.current_user_id')::UUID
AND revoked_at IS NULL
)
);

-- Only owners can update organizations
CREATE POLICY org_owner_update ON organizations
FOR UPDATE
USING (
id IN (
SELECT org_id FROM memberships
WHERE user_id = current_setting('app.current_user_id')::UUID
AND role = 'owner'
AND revoked_at IS NULL
)
);
```text

#### 6.3.3 Project Access Policy

```sql
-- Users can see projects in their organizations
CREATE POLICY project_org_access ON projects
FOR SELECT
USING (
org_id IN (
SELECT org_id FROM memberships
WHERE user_id = current_setting('app.current_user_id')::UUID
AND revoked_at IS NULL
)
);

-- Members can create projects
CREATE POLICY project_member_create ON projects
FOR INSERT
WITH CHECK (
org_id IN (
SELECT org_id FROM memberships
WHERE user_id = current_setting('app.current_user_id')::UUID
AND role IN ('owner', 'admin', 'member')
AND revoked_at IS NULL
)
);
```text

#### 6.3.4 Workstation Access Policy

```sql
-- Users can only see their own workstations
CREATE POLICY workstation_owner_access ON workstations
FOR ALL
USING (user_id = current_setting('app.current_user_id')::UUID);

-- Admins can see all org workstations
CREATE POLICY workstation_admin_access ON workstations
FOR SELECT
USING (
org_id IN (
SELECT org_id FROM memberships
WHERE user_id = current_setting('app.current_user_id')::UUID
AND role IN ('owner', 'admin')
AND revoked_at IS NULL
)
);
```text

#### 6.3.5 Setting RLS Context

```sql
-- API sets this at start of each request
SELECT set_config('app.current_user_id', '550e8400-e29b-41d4-a716-446655440000', true);
```
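
In the API this is typically wrapped in a small per-request helper so the setting lands inside the request's transaction; a minimal sketch (assuming an asyncpg-style connection; `set_rls_user` is a hypothetical name):

```python
async def set_rls_user(conn, user_id: str) -> None:
    # is_local=true scopes the setting to the current transaction, so a
    # pooled connection can never leak one user's identity into the next
    # request. Binding the UUID as a parameter avoids SQL injection.
    await conn.execute(
        "SELECT set_config('app.current_user_id', $1, true)", user_id
    )
```

Every query issued on `conn` within the same transaction then runs under the RLS policies above.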

---

## 7. Integration Points

### 7.1 Firebase Auth Integration

#### 7.1.1 Token Verification Flow

```python
import hashlib
import json

import redis.asyncio as redis
from fastapi import HTTPException
from firebase_admin import auth

async def verify_firebase_token(id_token: str) -> dict:
    """Verify Firebase ID token with caching."""

    # Check Redis cache first
    token_hash = hashlib.sha256(id_token.encode()).hexdigest()
    cached = await redis_client.get(f"token:{token_hash}")

    if cached:
        return json.loads(cached)

    # Verify with Firebase
    try:
        decoded_token = auth.verify_id_token(id_token)

        # Cache for 1 hour (tokens expire after 1 hour)
        await redis_client.setex(
            f"token:{token_hash}",
            3600,
            json.dumps(decoded_token)
        )

        return decoded_token
    except auth.InvalidIdTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")
```

#### 7.1.2 Custom Claims for Authorization

```python
# Set custom claims after user registration
auth.set_custom_user_claims(
    firebase_uid,
    {
        'user_id': str(user_id),
        'org_ids': [str(org_id)],
        'roles': {'org_1': 'owner'}
    }
)

# Claims available in JWT token
# Frontend can use without API call
```

### 7.2 Stripe Webhooks Integration

#### 7.2.1 Webhook Event Handler

```python
import json

import stripe
from google.cloud import pubsub_v1

async def handle_stripe_webhook(request: Request):
    """Process Stripe webhook events."""

    # Verify webhook signature
    payload = await request.body()
    sig_header = request.headers.get('stripe-signature')

    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, STRIPE_WEBHOOK_SECRET
        )
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid payload")
    except stripe.error.SignatureVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")

    # Publish to Pub/Sub for async processing
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, 'stripe-events')

    publisher.publish(
        topic_path,
        data=json.dumps(event).encode('utf-8'),
        event_type=event['type']
    )

    return {"received": True}
```

#### 7.2.2 Subscription Event Processing

```python
async def process_subscription_updated(event: dict):
    """Handle subscription.updated event."""

    subscription = event['data']['object']

    await db.execute("""
        UPDATE subscriptions
        SET status = $1,
            current_period_start = $2,
            current_period_end = $3
        WHERE stripe_subscription_id = $4
    """, [
        subscription['status'],
        datetime.fromtimestamp(subscription['current_period_start']),
        datetime.fromtimestamp(subscription['current_period_end']),
        subscription['id']
    ])

    # Update organization tier if downgraded
    if subscription['status'] == 'past_due':
        await downgrade_organization(subscription['metadata']['org_id'])
```

#### 7.2.3 Webhook Event Types

| Event | Handler | Action |
|-------|---------|--------|
| `checkout.session.completed` | `process_checkout_completed` | Create subscription record |
| `customer.subscription.updated` | `process_subscription_updated` | Update subscription status |
| `customer.subscription.deleted` | `process_subscription_deleted` | Cancel subscription |
| `invoice.payment_succeeded` | `process_payment_succeeded` | Record payment |
| `invoice.payment_failed` | `process_payment_failed` | Send notification |

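A sketch of how the table above could drive routing on the Pub/Sub consumer side (the handler bodies are stubs standing in for the real coroutines named in the table):

```python
# Hypothetical stubs for the handlers listed in the table.
async def process_checkout_completed(event): ...
async def process_subscription_updated(event): ...
async def process_subscription_deleted(event): ...
async def process_payment_succeeded(event): ...
async def process_payment_failed(event): ...

# One dispatch map keeps the consumer free of if/elif chains.
STRIPE_EVENT_HANDLERS = {
    "checkout.session.completed": process_checkout_completed,
    "customer.subscription.updated": process_subscription_updated,
    "customer.subscription.deleted": process_subscription_deleted,
    "invoice.payment_succeeded": process_payment_succeeded,
    "invoice.payment_failed": process_payment_failed,
}

async def dispatch_stripe_event(event: dict) -> bool:
    """Route a Pub/Sub-delivered Stripe event; False means unhandled."""
    handler = STRIPE_EVENT_HANDLERS.get(event.get("type"))
    if handler is None:
        return False  # ack and ignore event types we don't subscribe to
    await handler(event)
    return True
```

Unknown event types are acknowledged and dropped rather than nacked, so an unsubscribed event can never wedge the queue.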
### 7.3 GCP Cloud Workstations API Integration

#### 7.3.1 Workstation Creation

```python
from google.cloud import workstations_v1
from google.protobuf import duration_pb2

async def create_workstation(
    user_id: UUID,
    org_id: UUID,
    project_id: UUID,
    machine_type: str = "e2-standard-4"
) -> str:
    """Create GCP Cloud Workstation."""

    client = workstations_v1.WorkstationsClient()

    # Create workstation configuration
    config = workstations_v1.Workstation(
        display_name=f"coditect-{user_id}",
        machine_type=machine_type,
        disable_public_ip_addresses=False,
        idle_timeout=duration_pb2.Duration(seconds=3600),
        env={
            "USER_ID": str(user_id),
            "ORG_ID": str(org_id),
            "PROJECT_ID": str(project_id)
        },
        persistent_directories=[
            workstations_v1.PersistentDirectory(
                mount_path="/home/user/projects",
                gce_pd=workstations_v1.GceRegionalPersistentDisk(
                    size_gb=50,
                    fs_type="ext4"
                )
            )
        ]
    )

    # Create workstation (long-running operation)
    parent = f"projects/{GCP_PROJECT}/locations/{REGION}/workstationClusters/{CLUSTER}/workstationConfigs/{CONFIG}"

    operation = client.create_workstation(
        parent=parent,
        workstation=config,
        workstation_id=f"ws-{user_id}"
    )

    # Return operation ID for status polling
    return operation.operation.name
```

#### 7.3.2 Workstation Lifecycle Management

```python
async def start_workstation(workstation_id: str) -> str:
    """Start a stopped workstation."""
    client = workstations_v1.WorkstationsClient()

    operation = client.start_workstation(name=workstation_id)
    return operation.operation.name

async def stop_workstation(workstation_id: str) -> str:
    """Stop a running workstation."""
    client = workstations_v1.WorkstationsClient()

    operation = client.stop_workstation(name=workstation_id)
    return operation.operation.name

async def delete_workstation(workstation_id: str) -> str:
    """Delete a workstation permanently."""
    client = workstations_v1.WorkstationsClient()

    operation = client.delete_workstation(name=workstation_id)
    return operation.operation.name
```

### 7.4 Pub/Sub Event Processing

#### 7.4.1 Event Topics

```python
# Topic definitions
TOPICS = {
    'user-events': 'user.created, user.updated, user.deleted',
    'org-events': 'org.created, org.updated, org.deleted',
    'subscription-events': 'subscription.activated, subscription.canceled',
    'workstation-events': 'workstation.ready, workstation.error',
    'stripe-events': 'checkout.completed, subscription.updated'
}
```

#### 7.4.2 Subscriber Pattern

```python
from google.cloud import pubsub_v1

def create_subscriber(topic: str, handler: callable):
    """Create Pub/Sub subscriber."""

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        PROJECT_ID, f"{topic}-subscription"
    )

    def callback(message: pubsub_v1.subscriber.message.Message):
        try:
            event = json.loads(message.data.decode('utf-8'))
            handler(event)
            message.ack()
        except Exception as e:
            logger.error(f"Error processing message: {e}")
            message.nack()

    future = subscriber.subscribe(subscription_path, callback)
    return future
```

#### 7.4.3 Workstation Event Handler

```python
async def handle_workstation_event(event: dict):
    """Process workstation lifecycle events."""

    event_type = event['event_type']
    workstation_data = event['data']

    if event_type == 'workstation.ready':
        await db.execute("""
            UPDATE workstations
            SET status = 'ready',
                access_url = $1,
                last_started_at = NOW()
            WHERE gcp_workstation_id = $2
        """, [
            workstation_data['access_url'],
            workstation_data['id']
        ])

    elif event_type == 'workstation.error':
        await db.execute("""
            UPDATE workstations
            SET status = 'error'
            WHERE gcp_workstation_id = $1
        """, [workstation_data['id']])

        # Send notification to user
        await notify_user_workstation_error(workstation_data)
```

---

## 8. Error Handling Strategy

### 8.1 Error Classification

| Category | HTTP Code | Retry Strategy | User Impact |
|----------|-----------|----------------|-------------|
| **Client Errors** | 4xx | No retry | Show error message |
| **Server Errors** | 5xx | Exponential backoff | Retry automatically |
| **Rate Limits** | 429 | Wait + retry | Queue request |
| **Quota Exceeded** | 429 | Upgrade prompt | Block action |
| **External Service** | 503 | Circuit breaker | Fallback mode |
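
The table can be collapsed into one policy map consulted by the API's exception middleware; a sketch with illustrative exception names (not from the codebase):

```python
# Illustrative exception classes standing in for the platform's own.
class ClientError(Exception): ...
class RateLimitError(Exception): ...
class QuotaExceededError(Exception): ...
class ExternalServiceError(Exception): ...

# Maps each error category to (HTTP status, whether the client may retry).
ERROR_POLICY = {
    ClientError: (400, False),
    RateLimitError: (429, True),
    QuotaExceededError: (429, False),   # retrying won't help; upgrade instead
    ExternalServiceError: (503, True),
}

def classify(exc: Exception) -> tuple[int, bool]:
    """Return the (status_code, retryable) policy for an exception."""
    for exc_type, policy in ERROR_POLICY.items():
        if isinstance(exc, exc_type):
            return policy
    return (500, True)  # unknown server errors default to retryable 5xx
```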

### 8.2 Retry Logic

```python
import httpx
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type(ServiceUnavailableError)
)
async def call_external_api(endpoint: str, payload: dict):
    """Call external API with automatic retries."""

    try:
        # httpx.post is synchronous; async calls need an AsyncClient
        async with httpx.AsyncClient(timeout=30) as client:
            response = await client.post(endpoint, json=payload)
        response.raise_for_status()
        return response.json()

    except httpx.TimeoutException:
        logger.warning(f"Timeout calling {endpoint}")
        raise ServiceUnavailableError("External service timeout")

    except httpx.HTTPStatusError as e:
        if e.response.status_code >= 500:
            raise ServiceUnavailableError("External service error")
        raise
```

### 8.3 Circuit Breaker Pattern

```python
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=60)
async def create_stripe_checkout(org_id: UUID, price_id: str):
    """Create Stripe checkout with circuit breaker."""

    try:
        session = stripe.checkout.Session.create(
            customer_email=org.email,
            line_items=[{"price": price_id, "quantity": 1}],
            mode='subscription',
            metadata={'org_id': str(org_id)}
        )
        return session

    except stripe.error.StripeError as e:
        logger.error(f"Stripe error: {e}")
        raise ExternalServiceError("Payment service unavailable")
```

### 8.4 Graceful Degradation

```python
async def get_workstation_status(workstation_id: UUID):
    """Get workstation status with fallback."""

    try:
        # Try GCP Workstations API first
        gcp_status = await gcp_client.get_workstation(workstation_id)
        return gcp_status

    except GCPError:
        # Fallback to database cache
        logger.warning("GCP API unavailable, using cached status")

        db_status = await db.fetchrow("""
            SELECT status, access_url, last_started_at
            FROM workstations WHERE id = $1
        """, workstation_id)

        return {
            'status': db_status['status'],
            'access_url': db_status['access_url'],
            'cached': True
        }
```

### 8.5 Error Monitoring and Alerting

```python
from google.cloud import error_reporting

error_client = error_reporting.Client()

def report_error(e: Exception, context: dict = None):
    """Report error to Cloud Error Reporting."""

    context = context or {}  # guard against the default None

    error_client.report_exception(
        http_context={
            'method': context.get('method', 'UNKNOWN'),
            'url': context.get('url', ''),
            'userAgent': context.get('user_agent', ''),
            'responseStatusCode': context.get('status_code', 500)
        }
    )

    # Also log locally
    logger.error(f"Error: {e}", extra=context)
```

### 8.6 User-Facing Error Messages

```python
ERROR_MESSAGES = {
    'INSUFFICIENT_QUOTA': {
        'message': "You've reached your plan's limit for {resource}",
        'action': "Upgrade your plan to increase limits",
        'link': '/billing/upgrade'
    },
    'WORKSTATION_PROVISIONING_FAILED': {
        'message': "We couldn't create your workstation",
        'action': "Please try again or contact support",
        'link': '/support'
    },
    'PAYMENT_FAILED': {
        'message': "Your payment method was declined",
        'action': "Update your payment method to continue",
        'link': '/billing/payment-methods'
    }
}
```

---

## 9. Performance Considerations

### 9.1 Database Query Optimization

#### 9.1.1 N+1 Query Prevention

```python
# BAD: N+1 query
async def get_projects_with_members():
    projects = await db.fetch("SELECT * FROM projects")

    for project in projects:
        # N queries!
        members = await db.fetch("""
            SELECT * FROM memberships WHERE org_id = $1
        """, project['org_id'])
        project['members'] = members

# GOOD: Single query with JOIN
async def get_projects_with_members():
    return await db.fetch("""
        SELECT
            p.*,
            json_agg(json_build_object(
                'user_id', m.user_id,
                'role', m.role
            )) as members
        FROM projects p
        LEFT JOIN memberships m ON m.org_id = p.org_id
        WHERE p.deleted_at IS NULL
        GROUP BY p.id
    """)
```

#### 9.1.2 Query Result Caching

```python
import json

import redis.asyncio as redis

# Note: functools.lru_cache must not wrap an async function -- it would
# cache the coroutine object, not its result -- so Redis is the sole cache.
async def get_organization_cached(org_id: UUID) -> dict:
    """Get org with 5-minute cache."""

    # Check Redis first
    cached = await redis_client.get(f"org:{org_id}")
    if cached:
        return json.loads(cached)

    # Query database
    org = await db.fetchrow("""
        SELECT * FROM organizations WHERE id = $1
    """, org_id)

    # Cache for 5 minutes (default=str serializes UUID/datetime values)
    await redis_client.setex(
        f"org:{org_id}",
        300,
        json.dumps(dict(org), default=str)
    )

    return org
```

### 9.2 API Response Pagination

```python
from fastapi import Query

@app.get("/api/v1/projects")
async def list_projects(
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100)
):
    """List projects with offset pagination."""

    offset = (page - 1) * limit

    projects = await db.fetch("""
        SELECT * FROM projects
        WHERE deleted_at IS NULL
        ORDER BY created_at DESC
        LIMIT $1 OFFSET $2
    """, limit, offset)

    total = await db.fetchval("""
        SELECT COUNT(*) FROM projects WHERE deleted_at IS NULL
    """)

    return {
        'data': projects,
        'pagination': {
            'page': page,
            'limit': limit,
            'total': total,
            'pages': (total + limit - 1) // limit
        }
    }
```

### 9.3 Rate Limiting

```python
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post("/api/v1/workstations")
@limiter.limit("10/minute")
async def create_workstation(request: Request):
    """Create workstation with rate limiting."""

    # Check org-level quota
    org_id = request.state.user['org_id']

    workstation_count = await db.fetchval("""
        SELECT COUNT(*) FROM workstations
        WHERE org_id = $1 AND deleted_at IS NULL
    """, org_id)

    subscription = await get_subscription(org_id)

    if workstation_count >= subscription['max_workstations']:
        raise HTTPException(
            status_code=429,
            detail="INSUFFICIENT_QUOTA"
        )

    # Proceed with creation
    return await provision_workstation(...)
```

### 9.4 Connection Pooling

```python
import asyncpg

# Create connection pool at startup
async def create_db_pool():
    return await asyncpg.create_pool(
        dsn=DATABASE_URL,
        min_size=10,
        max_size=50,
        command_timeout=60,
        max_queries=50000,
        max_inactive_connection_lifetime=300
    )

# Use pool for queries
async def query_with_pool(query: str, *args):
    async with db_pool.acquire() as conn:
        return await conn.fetch(query, *args)
```

### 9.5 Workstation Provisioning Queue

```python
from google.cloud import tasks_v2

async def queue_workstation_provisioning(
    user_id: UUID,
    org_id: UUID,
    project_id: UUID
):
    """Queue workstation creation to avoid timeout."""

    client = tasks_v2.CloudTasksClient()
    parent = client.queue_path(PROJECT_ID, REGION, 'workstation-provisioning')

    task = {
        'http_request': {
            'http_method': tasks_v2.HttpMethod.POST,
            'url': f'{API_BASE_URL}/internal/provision-workstation',
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({
                'user_id': str(user_id),
                'org_id': str(org_id),
                'project_id': str(project_id)
            }).encode()
        }
    }

    client.create_task(parent=parent, task=task)
```

### 9.6 Performance Targets

| Metric | Target | Measurement |
|--------|--------|-------------|
| **API Response Time (p95)** | < 200ms | Cloud Monitoring |
| **Database Query Time (p95)** | < 50ms | pganalyze |
| **Workstation Provisioning** | < 5 minutes | Custom metric |
| **Cache Hit Rate** | > 80% | Redis INFO |
| **Connection Pool Utilization** | < 70% | asyncpg stats |
| **API Throughput** | 1000 req/sec | Load testing |

---

## 10. Monitoring and Observability

### 10.1 Logging Strategy

#### 10.1.1 Structured Logging

```python
import structlog

logger = structlog.get_logger()

async def create_project(org_id: UUID, name: str):
    """Create project with structured logging."""

    logger.info(
        "project.create.started",
        org_id=str(org_id),
        project_name=name
    )

    try:
        project = await db.fetchrow("""
            INSERT INTO projects (org_id, name)
            VALUES ($1, $2)
            RETURNING *
        """, org_id, name)

        logger.info(
            "project.create.success",
            project_id=str(project['id']),
            org_id=str(org_id),
            duration_ms=123
        )

        return project

    except Exception as e:
        logger.error(
            "project.create.failed",
            org_id=str(org_id),
            error=str(e),
            error_type=type(e).__name__
        )
        raise
```

#### 10.1.2 Log Levels

| Level | Usage | Examples |
|-------|-------|----------|
| **DEBUG** | Development only | SQL queries, cache hits |
| **INFO** | Normal operations | User login, resource created |
| **WARNING** | Recoverable issues | Rate limit hit, retry attempt |
| **ERROR** | Operation failed | API error, database timeout |
| **CRITICAL** | System failure | Database down, payment gateway unavailable |

### 10.2 Metrics Collection

#### 10.2.1 Application Metrics

```python
from prometheus_client import Counter, Histogram, Gauge

# Request metrics
request_count = Counter(
    'api_requests_total',
    'Total API requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'api_request_duration_seconds',
    'API request duration',
    ['method', 'endpoint']
)

# Business metrics
active_workstations = Gauge(
    'workstations_active',
    'Number of active workstations',
    ['org_id', 'tier']
)

subscription_count = Gauge(
    'subscriptions_total',
    'Total subscriptions',
    ['tier', 'status']
)

# Usage example
@app.post("/api/v1/projects")
async def create_project(request: Request):
    with request_duration.labels(
        method='POST',
        endpoint='/projects'
    ).time():
        try:
            result = await _create_project(...)
            request_count.labels(
                method='POST',
                endpoint='/projects',
                status='success'
            ).inc()
            return result
        except Exception:
            request_count.labels(
                method='POST',
                endpoint='/projects',
                status='error'
            ).inc()
            raise
```

#### 10.2.2 Custom Metrics

```python
import time

from google.cloud import monitoring_v3

def record_workstation_provisioning_time(duration_seconds: float):
    """Record custom metric to Cloud Monitoring."""

    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{PROJECT_ID}"

    series = monitoring_v3.TimeSeries()
    series.metric.type = 'custom.googleapis.com/workstation/provisioning_duration'
    series.resource.type = 'global'

    point = monitoring_v3.Point()
    point.value.double_value = duration_seconds
    point.interval.end_time.seconds = int(time.time())

    series.points = [point]

    client.create_time_series(name=project_name, time_series=[series])
```

### 10.3 Distributed Tracing

#### 10.3.1 OpenTelemetry Integration

```python
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Setup tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

cloud_trace_exporter = CloudTraceSpanExporter()
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(cloud_trace_exporter)
)

# Trace database queries
async def create_organization(name: str):
    with tracer.start_as_current_span("create_organization") as span:
        span.set_attribute("org.name", name)

        with tracer.start_as_current_span("db.insert_org"):
            org = await db.fetchrow("""
                INSERT INTO organizations (name, slug)
                VALUES ($1, $2) RETURNING *
            """, name, slugify(name))

        span.set_attribute("org.id", str(org['id']))

        return org
```

### 10.4 Health Checks

```python
from fastapi import status
from fastapi.responses import JSONResponse

@app.get("/health")
async def health_check():
    """Basic health check."""
    return {"status": "healthy"}

@app.get("/health/ready")
async def readiness_check():
    """Readiness check with dependency validation."""

    checks = {
        'database': False,
        'redis': False,
        'stripe': False
    }

    # Check database
    try:
        await db.fetchval("SELECT 1")
        checks['database'] = True
    except Exception as e:
        logger.error(f"Database health check failed: {e}")

    # Check Redis
    try:
        await redis_client.ping()
        checks['redis'] = True
    except Exception as e:
        logger.error(f"Redis health check failed: {e}")

    # Check Stripe
    try:
        stripe.Plan.list(limit=1)
        checks['stripe'] = True
    except Exception as e:
        logger.error(f"Stripe health check failed: {e}")

    all_healthy = all(checks.values())

    return JSONResponse(
        status_code=status.HTTP_200_OK if all_healthy else status.HTTP_503_SERVICE_UNAVAILABLE,
        content={
            'status': 'healthy' if all_healthy else 'degraded',
            'checks': checks
        }
    )
```

### 10.5 Alerting Rules

#### 10.5.1 Cloud Monitoring Alerts

```yaml
# Error rate alert
alert_policies:
  - display_name: "High API Error Rate"
    conditions:
      - display_name: "Error rate > 5%"
        condition_threshold:
          filter: 'resource.type="cloud_run_revision" AND metric.type="run.googleapis.com/request_count"'
          comparison: COMPARISON_GT
          threshold_value: 0.05
          duration: 300s
    notification_channels:
      - projects/${PROJECT_ID}/notificationChannels/${CHANNEL_ID}

  # Workstation provisioning failure
  - display_name: "Workstation Provisioning Failures"
    conditions:
      - display_name: "Failed provisioning > 3 in 10 min"
        condition_threshold:
          filter: 'metric.type="custom.googleapis.com/workstation/provisioning_status" AND metric.label.status="error"'
          comparison: COMPARISON_GT
          threshold_value: 3
          duration: 600s
```

#### 10.5.2 On-Call Rotation

```yaml
# PagerDuty escalation policy
escalation_policy:
  name: "CODITECT Platform"
  escalation_rules:
    - escalation_delay_in_minutes: 0
      targets:
        - type: schedule
          id: primary_oncall_schedule

    - escalation_delay_in_minutes: 15
      targets:
        - type: schedule
          id: backup_oncall_schedule

    - escalation_delay_in_minutes: 30
      targets:
        - type: user
          id: engineering_manager
```

### 10.6 Dashboards

#### 10.6.1 Operational Dashboard

**Widgets:**

- API request rate (requests/sec)
- API error rate (%)
- Database connection pool utilization (%)
- Active workstations (count)
- Workstation provisioning queue depth (count)
- Subscription creation rate (subs/hour)
- Stripe webhook processing latency (ms)

#### 10.6.2 Business Metrics Dashboard

**Widgets:**

- New user registrations (per day)
- Active subscriptions by tier (pie chart)
- Monthly recurring revenue (MRR)
- Workstation hours consumed (per org)
- Subscription churn rate (%)
- Average revenue per user (ARPU)
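
The revenue widgets above reduce to simple aggregations over subscription records. A hedged sketch follows; the tier names and prices are illustrative placeholders (the authoritative prices live in Stripe, not in this document).

```python
from dataclasses import dataclass

# Illustrative tier prices in USD/month; real prices are defined in Stripe.
TIER_PRICES = {'starter': 29, 'pro': 99, 'enterprise': 499}

@dataclass
class Subscription:
    tier: str
    active: bool

def mrr(subs: list[Subscription]) -> int:
    """Monthly recurring revenue: sum of tier prices over active subscriptions."""
    return sum(TIER_PRICES[s.tier] for s in subs if s.active)

def arpu(subs: list[Subscription]) -> float:
    """Average revenue per active user."""
    active = sum(1 for s in subs if s.active)
    return mrr(subs) / active if active else 0.0

def churn_rate(active_at_start: int, churned_in_period: int) -> float:
    """Fraction of period-start subscriptions cancelled during the period."""
    return churned_in_period / active_at_start if active_at_start else 0.0
```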

---

## 11. Security Considerations

### 11.1 Authentication Security

```python
# JWT token validation
import time

from fastapi import HTTPException
from firebase_admin import auth

async def validate_token(token: str) -> dict:
    """Validate Firebase JWT token."""

    try:
        # Verify signature and expiration; check_revoked also consults
        # Firebase for server-side revocation
        decoded = auth.verify_id_token(token, check_revoked=True)

        # Check token age (max 1 hour)
        issued_at = decoded.get('iat')
        if time.time() - issued_at > 3600:
            raise HTTPException(401, "Token expired")

        return decoded

    except auth.RevokedIdTokenError:
        raise HTTPException(401, "Token revoked")
    except auth.InvalidIdTokenError:
        raise HTTPException(401, "Invalid token")
```

### 11.2 Data Encryption

```python
# Encrypt sensitive data at rest
from cryptography.fernet import Fernet

def encrypt_sensitive_field(plaintext: str) -> str:
    """Encrypt sensitive data before storing."""
    cipher = Fernet(ENCRYPTION_KEY)
    return cipher.encrypt(plaintext.encode()).decode()

def decrypt_sensitive_field(ciphertext: str) -> str:
    """Decrypt sensitive data after retrieval."""
    cipher = Fernet(ENCRYPTION_KEY)
    return cipher.decrypt(ciphertext.encode()).decode()

# Use in database operations
async def store_api_key(user_id: UUID, api_key: str):
    encrypted = encrypt_sensitive_field(api_key)
    await db.execute("""
        INSERT INTO user_api_keys (user_id, encrypted_key)
        VALUES ($1, $2)
    """, user_id, encrypted)
```

### 11.3 SQL Injection Prevention

```python
# ALWAYS use parameterized queries

# BAD - SQL injection vulnerability
async def get_user_bad(email: str):
    query = f"SELECT * FROM users WHERE email = '{email}'"
    return await db.fetchrow(query)

# GOOD - parameterized query
async def get_user_good(email: str):
    query = "SELECT * FROM users WHERE email = $1"
    return await db.fetchrow(query, email)
```
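
The same binding discipline can be demonstrated end to end with the stdlib `sqlite3` driver (asyncpg's `$1` placeholder becomes `?`). The schema and data below are illustrative only:

```python
import sqlite3

def get_user(conn: sqlite3.Connection, email: str):
    """Parameterized lookup: the driver binds email as data, so quotes
    in the input can never terminate the SQL string."""
    return conn.execute(
        "SELECT email FROM users WHERE email = ?", (email,)
    ).fetchone()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT)")
conn.execute("INSERT INTO users VALUES ('alice@example.com')")

# A classic injection payload matches no row instead of every row
assert get_user(conn, "alice@example.com") == ("alice@example.com",)
assert get_user(conn, "x' OR '1'='1") is None
```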

### 11.4 Rate Limiting per User

```python
from redis import asyncio as aioredis

async def check_user_rate_limit(user_id: UUID, action: str) -> bool:
    """Check if user has exceeded rate limit."""

    key = f"ratelimit:{user_id}:{action}"

    # Increment counter
    count = await redis_client.incr(key)

    # Set expiry on first request
    if count == 1:
        await redis_client.expire(key, 60)  # 1 minute window

    # Check limit (100 requests per minute)
    if count > 100:
        raise HTTPException(429, "Rate limit exceeded")

    return True
```

---

## 12. Deployment Strategy

### 12.1 CI/CD Pipeline

```yaml
# .github/workflows/deploy.yml
name: Deploy to Production

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run tests
        run: pytest tests/

  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Cloud Run
        run: |
          gcloud run deploy coditect-api \
            --image gcr.io/${PROJECT_ID}/coditect-api:${GITHUB_SHA} \
            --region us-central1 \
            --platform managed \
            --allow-unauthenticated

      - name: Run database migrations
        run: |
          python scripts/run-migrations.py
```

### 12.2 Database Migration Strategy

```python
# Use Alembic for migrations
from alembic import command
from alembic.config import Config

def run_migrations():
    """Run pending database migrations."""
    alembic_cfg = Config("alembic.ini")
    command.upgrade(alembic_cfg, "head")

# Migration file example (versions/001_initial_schema.py)
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

def upgrade():
    op.create_table(
        'organizations',
        sa.Column('id', postgresql.UUID(), nullable=False),
        sa.Column('name', sa.String(255), nullable=False),
        sa.PrimaryKeyConstraint('id')
    )

def downgrade():
    op.drop_table('organizations')
```

### 12.3 Zero-Downtime Deployment

```bash
# Blue-green deployment: new revision of the same service, with no traffic
gcloud run deploy coditect-api \
  --image gcr.io/${PROJECT_ID}/coditect-api:${NEW_VERSION} \
  --no-traffic \
  --revision-suffix green

# Run smoke tests against the new revision
./scripts/smoke-test.sh https://coditect-api-green-xyz.run.app

# Gradual traffic migration
gcloud run services update-traffic coditect-api \
  --to-revisions=coditect-api-green=10

# Monitor for errors (wait 5 minutes)

# Complete migration
gcloud run services update-traffic coditect-api \
  --to-revisions=coditect-api-green=100
```
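
The manual migration steps above are straightforward to script. The helper below only formats the CLI calls (it does not invoke gcloud), and the service and revision names are caller-supplied assumptions:

```python
def rollout_commands(service: str, revision: str, steps: list[int]) -> list[str]:
    """Build the gcloud traffic-split commands for a gradual rollout.

    Purely illustrative string formatting; in practice, pair each step
    with smoke tests and a monitoring pause before issuing the next one.
    """
    return [
        f"gcloud run services update-traffic {service} "
        f"--to-revisions={revision}={pct}"
        for pct in steps
    ]
```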

---

## 13. Appendix

### 13.1 Glossary

| Term | Definition |
|------|------------|
| **RLS** | Row-Level Security - PostgreSQL feature for tenant isolation |
| **JWT** | JSON Web Token - Authentication token format |
| **LRO** | Long-Running Operation - GCP async operation pattern |
| **Pub/Sub** | Google Cloud Pub/Sub - Async messaging service |
| **Cloud Run** | GCP serverless container platform |
| **Cloud SQL** | GCP managed PostgreSQL service |
| **Workstation** | GCP Cloud Workstation - Managed development environment |

### 13.2 References

- [Firebase Auth Documentation](https://firebase.google.com/docs/auth)
- [Stripe API Reference](https://stripe.com/docs/api)
- [GCP Cloud Workstations](https://cloud.google.com/workstations/docs)
- [PostgreSQL RLS](https://www.postgresql.org/docs/current/ddl-rowsecurity.html)
- [Cloud Run Documentation](https://cloud.google.com/run/docs)
- [FastAPI Documentation](https://fastapi.tiangolo.com/)

### 13.3 Revision History

| Version | Date | Author | Changes |
|---------|------|--------|---------|
| 1.0.0 | 2025-12-24 | Hal Casteel | Initial TDD creation |

---

**Document Status:** Draft
**Next Review:** 2026-01-24
**Owner:** Hal Casteel (<hal@az1.ai>)
**Approvers:** Engineering Team, Security Team, Product Team