Database Integration Guide
Executive Summary
This guide documents the integration patterns for CODITECT's local-first, cloud-synchronized database architecture. Claude Code maintains local SQLite databases that sync with the cloud PostgreSQL backend for persistence, cross-device continuity, and analytics.
Architecture Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ CODITECT DATABASE INTEGRATION │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ LOCAL ENVIRONMENT CLOUD ENVIRONMENT │
│ ───────────────── ───────────────── │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Claude Code │ │ Django REST │ │
│ │ Application │ │ Framework │ │
│ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ HTTPS/JWT ┌─────────────────┐ │
│ │ SQLite │◄─────────────────►│ PostgreSQL │ │
│ │ context.db │ Sync API │ Cloud SQL │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ Tables: Tables: │
│ • messages • context_messages │
│ • task_tracking • context_task_tracking │
│ • task_messages • context_task_messages │
│ • sync_queue • sync_cursors │
│ • sync_stats │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Key Principles
| Principle | Description |
|---|---|
| Local-First | All operations work offline; sync is opportunistic |
| Idempotent Sync | Content hash deduplication prevents duplicates |
| Cursor-Based | Monotonic sync_cursor for efficient polling |
| Multi-Tenant | Automatic tenant isolation via django-multitenant |
| Conflict Resolution | Last-write-wins based on synced_at timestamp |
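These principles combine into a small client loop: write locally first, push what is pending, pull past the last cursor, and treat being offline as a normal outcome. The sketch below is illustrative only — `db` and `client` stand in for the local store and the sync API wrapper, and none of these helper names come from the actual codebase:

```python
def sync_once(db, client):
    """One opportunistic sync pass: local writes never wait on the network."""
    try:
        pending = db.fetch_unsynced()          # rows where synced_at IS NULL
        if pending:
            result = client.push(pending)      # idempotent: server dedups by content_hash
            db.mark_synced(pending, result["cursor"])
        page = client.pull(cursor=db.get_sync_cursor())   # monotonic cursor
        db.apply_remote(page["messages"], page["tasks"])
        db.set_sync_cursor(page["next_cursor"])
        return {"online": True}
    except ConnectionError:
        return {"online": False}               # offline is a normal outcome, not an error
```

A scheduler would call `sync_once` on a timer; because every operation is idempotent, an interrupted pass is simply retried on the next tick.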
Table of Contents
- Local Database Schema
- Cloud Database Schema
- Synchronization Protocol
- API Endpoints
- Authentication Flow
- Connection Management
- Error Handling
- Monitoring & Observability
- Troubleshooting
- Security Considerations
1. Local Database Schema
Location
~/.coditect/context-storage/context.db # User's home directory
<project>/.coditect/context-storage/context.db # Project-specific
Schema Definition
-- Messages table: Session context messages
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
role TEXT NOT NULL, -- 'user', 'assistant', 'system'
content TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
content_hash TEXT UNIQUE, -- SHA256 for dedup
sync_cursor INTEGER, -- Cloud sync position
synced_at DATETIME -- NULL if not synced
);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id);
CREATE INDEX IF NOT EXISTS idx_messages_hash ON messages(content_hash);
CREATE INDEX IF NOT EXISTS idx_messages_cursor ON messages(sync_cursor);
-- Task tracking table: TodoWrite operations
CREATE TABLE IF NOT EXISTS task_tracking (
id TEXT PRIMARY KEY, -- UUID
task_id TEXT NOT NULL, -- e.g., "A.9.1.1"
description TEXT NOT NULL,
active_form TEXT,
session_id TEXT,
project_id TEXT,
status TEXT DEFAULT 'pending', -- pending, in_progress, completed
outcome TEXT, -- success, error, partial, skipped
outcome_score REAL,
tool_success_count INTEGER DEFAULT 0,
tool_error_count INTEGER DEFAULT 0,
user_corrections INTEGER DEFAULT 0,
started_at DATETIME,
completed_at DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
content_hash TEXT UNIQUE,
sync_cursor INTEGER,
synced_at DATETIME
);
CREATE INDEX IF NOT EXISTS idx_task_tracking_task_id ON task_tracking(task_id);
CREATE INDEX IF NOT EXISTS idx_task_tracking_status ON task_tracking(status);
CREATE INDEX IF NOT EXISTS idx_task_tracking_session ON task_tracking(session_id);
-- Task-message relationships
CREATE TABLE IF NOT EXISTS task_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
message_id INTEGER NOT NULL,
sequence INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (task_id) REFERENCES task_tracking(id),
FOREIGN KEY (message_id) REFERENCES messages(id),
UNIQUE (task_id, message_id)
);
-- Offline sync queue
CREATE TABLE IF NOT EXISTS sync_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
operation TEXT NOT NULL, -- 'push_message', 'push_task', etc.
payload TEXT NOT NULL, -- JSON serialized data
attempts INTEGER DEFAULT 0,
last_attempt DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_sync_queue_operation ON sync_queue(operation);
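The UNIQUE constraint on content_hash makes local writes naturally idempotent: with INSERT OR IGNORE, a replayed record is silently skipped. A self-contained sketch against an in-memory copy of the messages table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        session_id TEXT NOT NULL,
        role TEXT NOT NULL,
        content TEXT NOT NULL,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
        content_hash TEXT UNIQUE,
        sync_cursor INTEGER,
        synced_at DATETIME
    )
""")

def insert_message(conn, session_id, role, content, content_hash):
    """Idempotent insert: a replayed record with the same hash is a no-op."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO messages (session_id, role, content, content_hash) "
        "VALUES (?, ?, ?, ?)",
        (session_id, role, content, content_hash),
    )
    return cur.rowcount == 1   # True only when a row was actually inserted

assert insert_message(conn, "s1", "user", "hello", "abc123")
assert not insert_message(conn, "s1", "user", "hello", "abc123")  # duplicate skipped
```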
Content Hash Generation
import hashlib
import json
def generate_content_hash(tenant_id: str, session_id: str, content: dict) -> str:
"""Generate SHA256 hash for content deduplication."""
payload = f"{tenant_id}:{session_id}:{json.dumps(content, sort_keys=True)}"
return hashlib.sha256(payload.encode()).hexdigest()
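A quick check of the deduplication property: because `json.dumps` is called with `sort_keys=True`, logically identical content hashes to the same value regardless of key order. The function is repeated here so the snippet runs standalone:

```python
import hashlib
import json

def generate_content_hash(tenant_id: str, session_id: str, content: dict) -> str:
    """Generate SHA256 hash for content deduplication."""
    payload = f"{tenant_id}:{session_id}:{json.dumps(content, sort_keys=True)}"
    return hashlib.sha256(payload.encode()).hexdigest()

# Key order does not affect the hash, but tenant/session scoping does:
h1 = generate_content_hash("t1", "s1", {"role": "user", "text": "hi"})
h2 = generate_content_hash("t1", "s1", {"text": "hi", "role": "user"})
assert h1 == h2
assert len(h1) == 64   # hex-encoded SHA256
assert h1 != generate_content_hash("t2", "s1", {"role": "user", "text": "hi"})
```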
2. Cloud Database Schema
Models Overview
# context/models.py
import uuid

from django.db import models
from django_multitenant.fields import TenantForeignKey
from django_multitenant.models import TenantManager, TenantModel

class ContextMessage(TenantModel):
    """Synced context messages from Claude Code sessions."""
    tenant_id = 'tenant_id'
    id = models.BigAutoField(primary_key=True)
    tenant = models.ForeignKey('tenants.Tenant', on_delete=models.CASCADE)
    user = TenantForeignKey('users.User', on_delete=models.CASCADE)
    content_hash = models.CharField(max_length=64, db_index=True)
    message_type = models.CharField(max_length=50)
    content = models.JSONField()
    source_device_id = models.CharField(max_length=255)
    sequence_number = models.BigIntegerField()
    sync_cursor = models.BigIntegerField(null=True, db_index=True)
    synced_at = models.DateTimeField(auto_now_add=True, db_index=True)
    created_at = models.DateTimeField(auto_now_add=True)

    objects = TenantManager()

    class Meta:
        db_table = 'context_messages'
        unique_together = [('user', 'content_hash')]
        indexes = [
            models.Index(fields=['tenant_id', 'sync_cursor']),
            models.Index(fields=['tenant_id', 'user_id', 'synced_at']),
        ]
class TaskTracking(TenantModel):
"""Task execution tracking from TodoWrite operations."""
tenant_id = 'tenant_id'
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
tenant = TenantForeignKey('tenants.Tenant', on_delete=models.CASCADE)
user_id = models.CharField(max_length=63, db_index=True)
task_id = models.CharField(max_length=50, db_index=True)
description = models.TextField()
active_form = models.CharField(max_length=255, blank=True)
session_id = models.CharField(max_length=255, db_index=True, blank=True)
project_id = models.CharField(max_length=255, db_index=True, blank=True)
status = models.CharField(max_length=20, default='pending', db_index=True)
outcome = models.CharField(max_length=20, blank=True)
outcome_score = models.FloatField(null=True, blank=True)
tool_success_count = models.IntegerField(default=0)
tool_error_count = models.IntegerField(default=0)
user_corrections = models.IntegerField(default=0)
started_at = models.DateTimeField(null=True, blank=True)
completed_at = models.DateTimeField(null=True, blank=True)
synced_at = models.DateTimeField(auto_now_add=True, db_index=True)
updated_at = models.DateTimeField(auto_now=True)
content_hash = models.CharField(max_length=64, unique=True, db_index=True)
sync_cursor = models.BigIntegerField(unique=True, null=True, db_index=True)
client_version = models.CharField(max_length=50, blank=True)
hardware_id = models.CharField(max_length=255, blank=True)
objects = TenantManager()
class Meta:
db_table = 'context_task_tracking'
indexes = [
models.Index(fields=['tenant_id', 'sync_cursor']),
models.Index(fields=['tenant_id', 'user_id', 'status']),
models.Index(fields=['tenant_id', 'project_id', 'status']),
models.Index(fields=['session_id', 'synced_at']),
models.Index(fields=['task_id', 'status']),
]
3. Synchronization Protocol
Push Flow (Local → Cloud)
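The push pass collects rows where `synced_at IS NULL`, posts them as one batch, and stamps them with the server-issued cursor only after a successful response. A hedged sketch of the two local halves (the batch shape mirrors the push endpoint in section 4; helper names are illustrative):

```python
import sqlite3

def collect_push_batch(conn, limit=100):
    """Gather unsynced local messages into a push payload (illustrative shape)."""
    rows = conn.execute(
        "SELECT id, session_id, role, content, content_hash FROM messages "
        "WHERE synced_at IS NULL LIMIT ?", (limit,)
    ).fetchall()
    return [
        {
            "content_hash": r[4],
            "message_type": r[2],
            "content": {"role": r[2], "text": r[3]},
            "sequence_number": r[0],
        }
        for r in rows
    ]

def mark_batch_synced(conn, batch, cursor):
    """After a successful push, stamp synced_at and the server cursor."""
    conn.executemany(
        "UPDATE messages SET synced_at = CURRENT_TIMESTAMP, sync_cursor = ? "
        "WHERE content_hash = ?",
        [(cursor, m["content_hash"]) for m in batch],
    )
    conn.commit()
```

Because the stamp happens after the server acknowledges the batch, a crash between push and stamp only causes a harmless re-push that the server deduplicates by content_hash.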
Pull Flow (Cloud → Local)
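The pull pass applies remote records idempotently (INSERT OR IGNORE on content_hash) and only then advances the local cursor, so a crash mid-page is safe to replay. A sketch, assuming the page shape returned by the pull endpoint in section 4:

```python
import json
import sqlite3

def apply_pull_page(conn, page):
    """Apply one pull page idempotently, then report the advanced cursor."""
    for msg in page["messages"]:
        conn.execute(
            "INSERT OR IGNORE INTO messages "
            "(session_id, role, content, content_hash, sync_cursor, synced_at) "
            "VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)",
            (
                msg.get("session_id", ""),
                msg["message_type"],
                json.dumps(msg["content"]),
                msg["content_hash"],
                page["next_cursor"],
            ),
        )
    conn.commit()
    return page["next_cursor"], page["has_more"]
```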
Conflict Resolution
def resolve_conflict(local_record, cloud_record):
    """Last-write-wins conflict resolution.

    The two timestamps must be comparable: both datetimes, or both
    ISO-8601 strings in the same timezone.
    """
    local_time = local_record.get('updated_at') or local_record.get('created_at')
    cloud_time = cloud_record.get('synced_at')
    if local_time is None:
        return cloud_record
    if cloud_time is not None and cloud_time > local_time:
        # Cloud version wins
        return cloud_record
    # Local version wins and will be pushed on the next sync
    return local_record
4. API Endpoints
Base URL
Production: https://api.coditect.ai/api/v1/context/
Staging: https://api-staging.coditect.ai/api/v1/context/
Push Endpoint
POST /api/v1/context/push/
Authorization: Bearer <jwt_token>
Content-Type: application/json
{
"messages": [
{
"content_hash": "a1b2c3...",
"message_type": "assistant",
"content": {"role": "assistant", "text": "..."},
"source_device_id": "device-uuid",
"sequence_number": 1234
}
],
"tasks": [
{
"id": "uuid",
"task_id": "A.9.1.1",
"description": "Implement feature X",
"active_form": "Implementing feature X",
"status": "completed",
"outcome": "success",
"content_hash": "d4e5f6..."
}
],
"client_version": "1.5.0",
"hardware_id": "hw-fingerprint"
}
Response:
{
"success": true,
"synced_messages": 5,
"synced_tasks": 2,
"duplicates_skipped": 1,
"cursor": 12456,
"server_time": "2026-01-04T20:30:00Z"
}
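A minimal client call for this endpoint using only the standard library. Only the core body fields are shown, and the injectable `opener` argument is an illustrative testing seam, not part of any real client:

```python
import json
import urllib.request

API_BASE = "https://api.coditect.ai/api/v1/context"   # from the Base URL section

def push_batch(token, messages, tasks, opener=urllib.request.urlopen):
    """POST one push batch and return the parsed response body.

    `opener` defaults to urllib's opener; injecting a stub lets the call
    be exercised without a network.
    """
    body = json.dumps({"messages": messages, "tasks": tasks}).encode()
    req = urllib.request.Request(
        f"{API_BASE}/push/",
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with opener(req) as resp:
        return json.loads(resp.read())
```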
Pull Endpoint
GET /api/v1/context/pull/?cursor=12340&limit=100&types=message,task
Authorization: Bearer <jwt_token>
Response:
{
"messages": [...],
"tasks": [...],
"next_cursor": 12445,
"has_more": false,
"server_time": "2026-01-04T20:30:00Z"
}
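Because `has_more` signals further pages, a client drains the endpoint by looping until it clears, feeding each `next_cursor` back in. The `fetch` callable below abstracts the HTTP GET and is illustrative:

```python
def pull_all(fetch, start_cursor=0, limit=100):
    """Drain the pull endpoint page by page until has_more is false.

    `fetch(cursor, limit)` returns one response body as a dict in the
    shape shown above.
    """
    messages, tasks, cursor = [], [], start_cursor
    while True:
        page = fetch(cursor, limit)
        messages.extend(page["messages"])
        tasks.extend(page["tasks"])
        cursor = page["next_cursor"]
        if not page["has_more"]:
            return messages, tasks, cursor
```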
Status Endpoint
GET /api/v1/context/status/
Authorization: Bearer <jwt_token>
Response:
{
"tenant_id": "uuid",
"user_id": "uuid",
"total_messages": 1523,
"total_tasks": 89,
"last_sync": "2026-01-04T20:15:00Z",
"storage_bytes": 2457600,
"quota_bytes": 104857600,
"quota_percent": 2.3
}
Delete Endpoint
DELETE /api/v1/context/messages/?older_than=2025-01-01
Authorization: Bearer <jwt_token>
Response:
{
"deleted_messages": 1200,
"deleted_tasks": 45,
"freed_bytes": 1245678
}
5. Authentication Flow
JWT Token Structure
{
"sub": "user-uuid",
"tenant_id": "tenant-uuid",
"email": "user@example.com",
"roles": ["member"],
"iat": 1704393600,
"exp": 1704397200,
"iss": "https://auth.coditect.ai"
}
Token Refresh Flow
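The refresh endpoint itself is not specified in this guide, but the client-side trigger is simple: decode the token's payload locally (no signature check is needed just to read `exp`) and refresh shortly before expiry. A sketch:

```python
import base64
import json
import time

def token_expires_soon(jwt_token, leeway=60):
    """Return True when the token's exp claim is within `leeway` seconds.

    The payload is decoded without verifying the signature, which is fine
    client-side: only the server needs to trust the claims.
    """
    payload_b64 = jwt_token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)   # restore stripped base64 padding
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    return payload["exp"] - time.time() < leeway
```

A sync client would call this before each batch and obtain a fresh token when it returns True, avoiding mid-batch 401s.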
Middleware: Tenant Extraction
# context/middleware.py
import jwt
from django_multitenant.utils import set_current_tenant
from tenants.models import Tenant

class TenantMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Extract the tenant claim from the JWT. The signature is NOT verified
        # here; DRF's authentication class verifies it before any view runs,
        # so this middleware only routes the request to the right tenant.
        auth_header = request.headers.get('Authorization', '')
        if auth_header.startswith('Bearer '):
            token = auth_header[7:]
            try:
                payload = jwt.decode(token, options={"verify_signature": False})
                tenant_id = payload.get('tenant_id')
                if tenant_id:
                    tenant = Tenant.objects.get(id=tenant_id)
                    set_current_tenant(tenant)
            except (jwt.DecodeError, Tenant.DoesNotExist):
                pass
        return self.get_response(request)
6. Connection Management
Local SQLite
import sqlite3
from contextlib import contextmanager
from pathlib import Path

DATABASE_PATH = Path.home() / '.coditect' / 'context-storage' / 'context.db'

@contextmanager
def get_db_connection():
    """SQLite connection with WAL mode for concurrent readers.

    check_same_thread=False permits cross-thread use; callers must still
    serialize writes (busy_timeout covers short lock contention).
    """
    conn = sqlite3.connect(DATABASE_PATH, check_same_thread=False)
    conn.execute('PRAGMA journal_mode=WAL')
    conn.execute('PRAGMA synchronous=NORMAL')
    conn.execute('PRAGMA busy_timeout=5000')
    conn.row_factory = sqlite3.Row
    try:
        yield conn
    finally:
        conn.close()
Cloud PostgreSQL
# Django settings.py
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'coditect',
'USER': os.environ.get('DB_USER'),
'PASSWORD': os.environ.get('DB_PASSWORD'),
'HOST': '/cloudsql/coditect-citus-prod:us-central1:coditect-citus-dev',
'PORT': '5432',
'CONN_MAX_AGE': 600, # Keep connections alive 10 min
'OPTIONS': {
'connect_timeout': 10,
'options': '-c statement_timeout=30000', # 30s query timeout
},
}
}
PgBouncer Configuration (Production)
[databases]
coditect = host=127.0.0.1 port=5432 dbname=coditect
[pgbouncer]
listen_port = 6432
listen_addr = 127.0.0.1
auth_type = scram-sha-256
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 20
min_pool_size = 5
reserve_pool_size = 5
reserve_pool_timeout = 3
max_db_connections = 100
Rate Limiting
# context/throttling.py
from rest_framework.throttling import UserRateThrottle
class ContextSyncThrottle(UserRateThrottle):
"""Rate limiting for sync endpoints."""
scope = 'context_sync'
# settings.py
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': [
'context.throttling.ContextSyncThrottle',
],
'DEFAULT_THROTTLE_RATES': {
'context_sync': '100/minute', # 100 requests per minute per user
}
}
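On the client side, a 429 from this throttle should be absorbed rather than surfaced. A sketch that honors the `Retry-After` header when the server sends one and otherwise backs off exponentially (`send` is any zero-argument callable returning a response-like object; the function is an illustration, not part of the real client):

```python
import time

def request_with_throttle(send, max_retries=5):
    """Call `send()` and retry on HTTP 429, honoring Retry-After if present.

    `send` must return an object with .status_code and .headers, e.g. a
    requests.Response.
    """
    for attempt in range(max_retries):
        resp = send()
        if resp.status_code != 429:
            return resp
        # Prefer the server's hint; fall back to exponential backoff.
        delay = float(resp.headers.get("Retry-After", 2 ** attempt))
        time.sleep(delay)
    return resp
```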
7. Error Handling
Offline Mode
# scripts/core/cloud_sync_client.py
import json

from requests.exceptions import ConnectionError, Timeout

class CloudSyncClient:
    def push(self, records: list) -> dict:
        try:
            response = self._make_request('POST', '/push/', json=records)
            return response.json()
        except (ConnectionError, Timeout) as e:
            # Network unavailable: queue for a later sync pass
            self._queue_for_retry(records)
            return {'success': False, 'queued': True, 'error': str(e)}

    def _queue_for_retry(self, records: list):
        """Store records in the sync_queue table for later retry."""
        with get_db_connection() as conn:
            for record in records:
                conn.execute(
                    'INSERT INTO sync_queue (operation, payload) VALUES (?, ?)',
                    ('push', json.dumps(record))
                )
            conn.commit()
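When connectivity returns, the queue is drained oldest-first, and records that keep failing are retried up to a cap rather than forever. A sketch against the sync_queue schema above (`push` is an injected callable returning True on success, and MAX_ATTEMPTS is an assumed policy, not a documented constant):

```python
import json
import sqlite3

MAX_ATTEMPTS = 5   # assumed retry cap, not from the schema

def drain_sync_queue(conn, push):
    """Replay queued operations once connectivity returns.

    Successful records are deleted; failures bump the attempt counter so
    a poison record eventually ages out of the retry set.
    """
    rows = conn.execute(
        "SELECT id, payload FROM sync_queue WHERE attempts < ? ORDER BY id",
        (MAX_ATTEMPTS,),
    ).fetchall()
    replayed = 0
    for row_id, payload in rows:
        if push(json.loads(payload)):
            conn.execute("DELETE FROM sync_queue WHERE id = ?", (row_id,))
            replayed += 1
        else:
            conn.execute(
                "UPDATE sync_queue SET attempts = attempts + 1, "
                "last_attempt = CURRENT_TIMESTAMP WHERE id = ?", (row_id,)
            )
    conn.commit()
    return replayed
```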
Retry Logic
import time
from functools import wraps

from requests.exceptions import ConnectionError, Timeout

def retry_with_backoff(max_retries=3, base_delay=1.0, max_delay=60.0):
    """Exponential backoff retry decorator for transient network errors."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except (ConnectionError, Timeout):
                    retries += 1
                    if retries == max_retries:
                        raise
                    delay = min(base_delay * (2 ** retries), max_delay)
                    time.sleep(delay)
        return wrapper
    return decorator
Duplicate Detection
import hashlib
import logging
import time

logger = logging.getLogger('context.sync')

def handle_duplicate(content_hash: str, existing_record: dict, new_record: dict):
    """Handle a duplicate content_hash collision."""
    # Log the duplicate
    logger.info(f"Duplicate detected: {content_hash}")
    # Check whether the content actually differs (a true SHA256 collision
    # is astronomically rare)
    if existing_record['content'] != new_record['content']:
        logger.error(f"Hash collision detected for {content_hash}")
        # Re-hash with a timestamp salt so the value still fits the
        # 64-character content_hash column
        new_record['content_hash'] = hashlib.sha256(
            f"{content_hash}:{time.time()}".encode()
        ).hexdigest()
        return new_record
    # True duplicate - skip
    return None
8. Monitoring & Observability
Prometheus Metrics
from prometheus_client import Counter, Histogram, Gauge
# Counters
sync_push_total = Counter(
'context_sync_push_total',
'Total push sync operations',
['tenant_id', 'status']
)
sync_pull_total = Counter(
'context_sync_pull_total',
'Total pull sync operations',
['tenant_id', 'status']
)
# Histograms
sync_duration = Histogram(
'context_sync_duration_seconds',
'Sync operation duration',
['operation', 'tenant_id']
)
# Gauges
pending_sync_queue = Gauge(
'context_pending_sync_queue',
'Number of items in offline sync queue'
)
Logging Configuration
import google.cloud.logging

LOGGING = {
'version': 1,
'handlers': {
'cloud_logging': {
'class': 'google.cloud.logging.handlers.CloudLoggingHandler',
'client': google.cloud.logging.Client(),
},
},
'loggers': {
'context.sync': {
'handlers': ['cloud_logging'],
'level': 'INFO',
},
},
}
Health Check Endpoint
from rest_framework.decorators import api_view
from rest_framework.response import Response

@api_view(['GET'])
def health_check(request):
"""Database and sync health check."""
checks = {
'database': check_database(),
'redis': check_redis(),
'sync_queue': check_sync_queue(),
}
status = 200 if all(c['healthy'] for c in checks.values()) else 503
return Response(checks, status=status)
def check_database():
try:
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
return {'healthy': True}
except Exception as e:
return {'healthy': False, 'error': str(e)}
9. Troubleshooting
Common Issues
| Issue | Cause | Solution |
|---|---|---|
| 401 Unauthorized | Expired JWT | Refresh token and retry |
| 409 Conflict | Duplicate content_hash | Skip or regenerate hash |
| 429 Too Many Requests | Rate limit exceeded | Implement backoff |
| 503 Service Unavailable | Cloud SQL down | Use offline mode |
| Sync stuck | Corrupt sync_cursor | Reset cursor to 0 |
Diagnostic Queries
-- Check sync status
SELECT
COUNT(*) as total,
COUNT(synced_at) as synced,
COUNT(*) - COUNT(synced_at) as pending
FROM messages;
-- Find sync gaps
SELECT
sync_cursor,
LAG(sync_cursor) OVER (ORDER BY sync_cursor) as prev_cursor,
sync_cursor - LAG(sync_cursor) OVER (ORDER BY sync_cursor) as gap
FROM context_task_tracking
WHERE sync_cursor IS NOT NULL
ORDER BY sync_cursor
LIMIT 100;
-- Recent sync activity
SELECT
DATE(synced_at) as date,
COUNT(*) as records_synced
FROM context_messages
WHERE synced_at > NOW() - INTERVAL '7 days'
GROUP BY DATE(synced_at)
ORDER BY date DESC;
Reset Procedures
# Reset local sync state (re-sync everything)
sqlite3 ~/.coditect/context-storage/context.db "UPDATE messages SET synced_at = NULL, sync_cursor = NULL"
sqlite3 ~/.coditect/context-storage/context.db "UPDATE task_tracking SET synced_at = NULL, sync_cursor = NULL"
# Clear sync queue
sqlite3 ~/.coditect/context-storage/context.db "DELETE FROM sync_queue"
# Force full re-sync
curl -X POST https://api.coditect.ai/api/v1/context/reset-cursor/ \
-H "Authorization: Bearer $TOKEN"
10. Security Considerations
Data in Transit
| Layer | Protection |
|---|---|
| Transport | TLS 1.3 required |
| Authentication | JWT with RS256 signature |
| Authorization | Tenant isolation via middleware |
Data at Rest
| Layer | Protection |
|---|---|
| Local SQLite | OS-level file permissions |
| Cloud SQL | Google-managed encryption + CMEK |
| Backups | Encrypted with Cloud KMS |
Sensitive Data Handling
SENSITIVE_FIELDS = ['api_key', 'password', 'secret', 'token']
def sanitize_content(content: dict) -> dict:
    """Remove or mask sensitive fields before sync (recurses into nested dicts)."""
    sanitized = {}
    for key, value in content.items():
        if any(s in key.lower() for s in SENSITIVE_FIELDS):
            sanitized[key] = '[REDACTED]'
        elif isinstance(value, dict):
            sanitized[key] = sanitize_content(value)
        else:
            sanitized[key] = value
    return sanitized
Audit Logging
from django.utils import timezone

def log_sync_operation(user_id, tenant_id, operation, record_count):
"""Log sync operations for audit trail."""
AuditLog.objects.create(
user_id=user_id,
tenant_id=tenant_id,
action=f'context_sync_{operation}',
details={
'record_count': record_count,
'timestamp': timezone.now().isoformat(),
}
)
Related Documents
| Document | Purpose |
|---|---|
| database-architecture.md | Architecture overview |
| database-schema.md | Table definitions |
| ADR-052 | Intent-aware context |
| ADR-053 | Cloud sync architecture |
Document Version: 1.0.0 Last Updated: January 4, 2026 Author: CODITECT Engineering Team Status: Active