Sequence Diagram: Database Migration Flow
Purpose: PostgreSQL database connection initialization, schema migration execution with Alembic, and zero-downtime migration strategies.
Actors:
- License API (FastAPI application)
- Alembic (migration tool)
- Cloud SQL (PostgreSQL 16)
- Secret Manager (database credentials)
- Cloud SQL Proxy (secure connection)
Flow: App startup → Credential retrieval → Connection pool → Migration check → Schema update
Mermaid Sequence Diagram
Step-by-Step Breakdown
1. Database Configuration (Steps 1-2)
Application configuration:
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
    """Application settings with database configuration.

    Values are loaded from environment variables / the `.env` file by
    pydantic-settings. `database_password` has no default, so it is a
    required field; in production it is injected from Secret Manager.
    """

    # Database connection
    database_host: str = "127.0.0.1"
    database_port: int = 5432
    database_name: str = "coditect_licenses"
    database_user: str = "app_user"
    database_password: str  # Required — from Secret Manager

    # Connection pool sizing
    db_pool_min_size: int = 5
    db_pool_max_size: int = 20
    db_pool_timeout: int = 30  # seconds

    # Cloud SQL Proxy (Unix-socket) connection
    use_cloud_sql_proxy: bool = True
    cloud_sql_instance: str = "coditect-cloud-infra:us-central1:licenses-db"

    class Config:
        env_file = ".env"

    @property
    def database_url(self) -> str:
        """Build database connection string."""
        if self.use_cloud_sql_proxy:
            # Unix socket connection via Cloud SQL Proxy
            return (
                f"postgresql+asyncpg://{self.database_user}:{self.database_password}"
                f"@/{self.database_name}?host=/cloudsql/{self.cloud_sql_instance}"
            )
        # Direct TCP connection (local development)
        return (
            f"postgresql+asyncpg://{self.database_user}:{self.database_password}"
            f"@{self.database_host}:{self.database_port}/{self.database_name}"
        )
@lru_cache()
def get_settings() -> Settings:
    """Return a process-wide cached Settings instance (built once)."""
    return Settings()
Retrieve credentials from Secret Manager:
# app/secrets.py
from google.cloud import secretmanager
def get_database_password() -> str:
    """
    Retrieve database password from Secret Manager.

    Secret name: projects/PROJECT_ID/secrets/db-password/versions/latest

    Returns:
        The secret payload decoded as UTF-8 text.
    """
    client = secretmanager.SecretManagerServiceClient()
    project_id = "coditect-cloud-infra"
    secret_id = "db-password"
    name = f"projects/{project_id}/secrets/{secret_id}/versions/latest"
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode("UTF-8")
2. Connection Pool Initialization (Steps 4-5)
AsyncPG connection pool:
# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool, QueuePool
import logging
logger = logging.getLogger(__name__)
class Database:
    """Database connection manager: async engine plus session factory."""

    def __init__(self, settings: Settings):
        self.settings = settings
        self.engine = None
        self.session_factory = None

    async def connect(self):
        """
        Initialize database connection pool.

        Pool configuration:
        - min_size: 5 (minimum connections)
        - max_size: 20 (maximum connections)
        - timeout: 30s (connection timeout)
        - recycle: 3600s (recycle connections hourly)
        """
        logger.info("Initializing database connection pool...")
        self.engine = create_async_engine(
            self.settings.database_url,
            echo=False,
            pool_size=self.settings.db_pool_min_size,
            # max_overflow tops the pool up to db_pool_max_size total
            max_overflow=self.settings.db_pool_max_size - self.settings.db_pool_min_size,
            pool_timeout=self.settings.db_pool_timeout,
            pool_recycle=3600,  # Recycle connections every hour
            pool_pre_ping=True,  # Test connection before using
            connect_args={
                "server_settings": {
                    "application_name": "license-api",
                    "jit": "off"  # Disable JIT for predictable performance
                }
            }
        )
        # Create session factory
        self.session_factory = sessionmaker(
            self.engine,
            class_=AsyncSession,
            expire_on_commit=False
        )
        # Test connection
        await self._test_connection()
        logger.info("Database connection pool initialized")

    async def _test_connection(self):
        """Run `SELECT 1` to verify connectivity; raises on failure."""
        from sqlalchemy import text
        async with self.engine.begin() as conn:
            result = await conn.execute(text("SELECT 1"))
            # Explicit raise instead of `assert` — asserts vanish under -O
            if result.scalar() != 1:
                raise RuntimeError("Database connectivity check failed")
        logger.info("Database connection test successful")

    async def disconnect(self):
        """Close database connection pool."""
        if self.engine:
            await self.engine.dispose()
            logger.info("Database connection pool closed")

    async def get_session(self) -> AsyncSession:
        """Yield a session from the pool (async-generator dependency)."""
        async with self.session_factory() as session:
            yield session


# Global database instance
db = Database(get_settings())
3. Migration Execution (Steps 6-12)
Alembic configuration:
# alembic.ini
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os
sqlalchemy.url = postgresql://user:pass@localhost:5432/coditect_licenses
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = INFO
handlers = console
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
Alembic env.py:
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import asyncio
from app.config import get_settings
from app.models import Base

config = context.config

# Override database URL from application settings (wins over alembic.ini)
settings = get_settings()
config.set_main_option("sqlalchemy.url", settings.database_url)

fileConfig(config.config_file_name)

# Model metadata target (enables `alembic revision --autogenerate`)
target_metadata = Base.metadata
def run_migrations_offline():
    """
    Run migrations in 'offline' mode: emit the SQL script instead of
    executing anything against a live database.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,  # Inline parameter values into the SQL output
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()
async def run_migrations_online():
    """
    Run migrations in 'online' mode (execute against the database).
    """
    from sqlalchemy.ext.asyncio import create_async_engine

    connectable = create_async_engine(
        config.get_main_option("sqlalchemy.url"),
        poolclass=pool.NullPool,  # No pooling for migrations
    )
    async with connectable.connect() as connection:
        # Alembic's migration runner is synchronous; bridge via run_sync()
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()
def do_run_migrations(connection):
    """Configure the Alembic context on a sync connection and run migrations."""
    context.configure(
        connection=connection,
        target_metadata=target_metadata
    )
    with context.begin_transaction():
        context.run_migrations()


# Module entry point: Alembic imports env.py and picks the mode itself
if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())
Migration script example:
# alembic/versions/a7b8c9d0e1f2_add_device_activations_table.py
"""Add device_activations table

Revision ID: a7b8c9d0e1f2
Revises: e3f2a1b9c4d5
Create Date: 2025-11-30 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# Alembic revision identifiers
revision = 'a7b8c9d0e1f2'
down_revision = 'e3f2a1b9c4d5'
branch_labels = None
depends_on = None
def upgrade():
    """
    Zero-downtime migration strategy:
    1. Add new table (doesn't affect existing queries)
    2. Add indexes (build concurrently to avoid locks)
    3. Application code handles both old and new schema
    4. Next migration removes old schema (after deployment)
    """
    # Create table
    op.create_table(
        'device_activations',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('license_id', postgresql.UUID(as_uuid=True), nullable=False),
        sa.Column('hardware_id', sa.String(64), nullable=False),
        sa.Column('device_name', sa.String(200)),
        sa.Column('activated_at', sa.DateTime(timezone=True), nullable=False),
        sa.Column('deactivated_at', sa.DateTime(timezone=True)),
        sa.Column('is_active', sa.Boolean(), default=True),
        sa.ForeignKeyConstraint(
            ['license_id'],
            ['licenses.id'],
            ondelete='CASCADE'
        )
    )
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction block, and
    # Alembic wraps each migration in one by default — step outside it.
    with op.get_context().autocommit_block():
        op.create_index(
            'idx_device_activations_license',
            'device_activations',
            ['license_id'],
            postgresql_concurrently=True
        )
        op.create_index(
            'idx_device_activations_hardware',
            'device_activations',
            ['hardware_id'],
            postgresql_concurrently=True
        )
def downgrade():
    """Rollback migration (if needed): drop indexes first, then the table."""
    op.drop_index('idx_device_activations_hardware', table_name='device_activations')
    op.drop_index('idx_device_activations_license', table_name='device_activations')
    op.drop_table('device_activations')
4. Application Startup Integration (Step 13-14)
FastAPI lifespan management:
# app/main.py
# NOTE(review): reconstructed — this snippet was mangled by a bad find/replace
# (Django REST Framework imports pasted over FastAPI code). The
# @asynccontextmanager startup/shutdown generator below is FastAPI's
# lifespan pattern; confirm against the original source.
from contextlib import asynccontextmanager
from fastapi import FastAPI
import logging

logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan manager.

    Startup:
    - Connect to database
    - Run migrations
    - Initialize connection pool
    Shutdown:
    - Close database connections
    """
    # Startup
    logger.info("Application startup...")
    # Connect to database (local import avoids circular imports at load time)
    from app.database import db
    await db.connect()
    # Run migrations
    await run_migrations()
    logger.info("Application ready")
    yield
    # Shutdown
    logger.info("Application shutdown...")
    await db.disconnect()
    logger.info("Application stopped")


app = FastAPI(lifespan=lifespan)
async def run_migrations():
    """
    Run database migrations on startup, driving Alembic programmatically.

    NOTE: Alembic's command API is synchronous and blocks the event loop;
    that is acceptable here because it runs once, before serving traffic.
    """
    from alembic.config import Config
    from alembic import command

    logger.info("Checking database migrations...")
    alembic_cfg = Config("alembic.ini")
    # Log the current schema revision
    command.current(alembic_cfg, verbose=True)
    # Apply all pending revisions up to the latest
    command.upgrade(alembic_cfg, "head")
    logger.info("Database migrations complete")
Zero-Downtime Migration Strategies
Strategy 1: Expand-Contract Pattern
# Phase 1: EXPAND - Add new column (nullable, so existing writes still work)
def upgrade_phase1():
    op.add_column('licenses', sa.Column('new_field', sa.String(100), nullable=True))

# Deploy application v2 (writes to both old and new field)

# Phase 2: BACKFILL - Populate new column for pre-existing rows
def upgrade_phase2():
    op.execute("""
    UPDATE licenses
    SET new_field = old_field
    WHERE new_field IS NULL
    """)

# Phase 3: CONTRACT - Make new column NOT NULL, remove old column
def upgrade_phase3():
    op.alter_column('licenses', 'new_field', nullable=False)
    op.drop_column('licenses', 'old_field')
Strategy 2: Concurrent Index Creation
# Create index without locking the table against writes
op.create_index(
    'idx_licenses_tier',
    'licenses',
    ['tier'],
    postgresql_concurrently=True  # Zero-downtime
)
Strategy 3: Online ALTER TABLE
# Add column without downtime (PostgreSQL 11+)
op.execute("""
ALTER TABLE licenses
ADD COLUMN new_column VARCHAR(100) DEFAULT 'default_value'
""")
# Since PostgreSQL 11, adding a column with a constant default does NOT
# rewrite the table: the default is stored in the catalog and applied
# lazily when existing rows are read.
Related Documentation
- ADR-009: GCP Infrastructure Architecture
- 11-gke-deployment-flow.md: Application deployment
Last Updated: 2025-11-30 Diagram Type: Sequence (Mermaid) Scope: Infrastructure - Database migrations