Agent Skills Framework Extension

Database Design Patterns Skill

When to Use This Skill

Use this skill when designing relational schemas, planning migrations, optimizing queries, configuring connection pools, or scaling a production database.

How to Use This Skill

  1. Review the patterns and examples below
  2. Apply the relevant patterns to your implementation
  3. Follow the best practices outlined in this skill

Production-grade database schema design, migrations, optimization, and scaling patterns.

Core Capabilities

  1. Schema Design - Normalization, denormalization, relationship modeling
  2. Migrations - Zero-downtime schema changes, rollback strategies
  3. Query Optimization - Indexing, query planning, performance tuning
  4. Connection Pooling - Resource management, connection lifecycle
  5. Scaling Patterns - Replication, sharding, partitioning

PostgreSQL Schema Design

-- migrations/001_initial_schema.sql
-- Production schema with proper normalization and indexing

BEGIN;

-- Enable extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pg_trgm"; -- Trigram indexing for text search
CREATE EXTENSION IF NOT EXISTS "pgcrypto";

-- Users table with authentication
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    email VARCHAR(255) NOT NULL UNIQUE,
    password_hash VARCHAR(255) NOT NULL,
    email_verified BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    deleted_at TIMESTAMPTZ, -- Soft delete

    CONSTRAINT email_format CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$')
);

-- Index for auth queries
CREATE INDEX idx_users_email ON users(email) WHERE deleted_at IS NULL;
CREATE INDEX idx_users_created_at ON users(created_at);
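-- Note: a partial index is only usable when the query repeats its
-- predicate, e.g. (illustrative):
--   SELECT id FROM users WHERE email = $1 AND deleted_at IS NULL;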

-- Organizations (multi-tenancy)
CREATE TABLE organizations (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(100) NOT NULL UNIQUE,
    owner_id UUID NOT NULL REFERENCES users(id) ON DELETE RESTRICT,
    plan VARCHAR(50) NOT NULL DEFAULT 'free',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Case-sensitive match (~, not ~*) so uppercase slugs are rejected
    CONSTRAINT slug_format CHECK (slug ~ '^[a-z0-9-]+$')
);

CREATE INDEX idx_organizations_owner ON organizations(owner_id);

-- Organization membership
CREATE TABLE organization_members (
    organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    role VARCHAR(50) NOT NULL DEFAULT 'member',
    invited_by UUID REFERENCES users(id),
    joined_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    PRIMARY KEY (organization_id, user_id)
);

CREATE INDEX idx_org_members_user ON organization_members(user_id);

-- Projects (scoped to organization)
CREATE TABLE projects (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    status VARCHAR(50) NOT NULL DEFAULT 'active',
    created_by UUID NOT NULL REFERENCES users(id),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    archived_at TIMESTAMPTZ
);

-- Composite index for tenant isolation
CREATE INDEX idx_projects_org_status ON projects(organization_id, status)
WHERE archived_at IS NULL;

-- Full-text search index
CREATE INDEX idx_projects_search ON projects
USING gin(to_tsvector('english', name || ' ' || COALESCE(description, '')));
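-- Example query served by idx_projects_search (illustrative; the
-- to_tsvector expression must match the index definition exactly):
--   SELECT id, name FROM projects
--   WHERE to_tsvector('english', name || ' ' || COALESCE(description, ''))
--         @@ plainto_tsquery('english', 'billing dashboard');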

-- Tasks (hierarchical with parent-child relationships)
CREATE TABLE tasks (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    parent_task_id UUID REFERENCES tasks(id) ON DELETE CASCADE,
    title VARCHAR(500) NOT NULL,
    description TEXT,
    status VARCHAR(50) NOT NULL DEFAULT 'todo',
    priority INTEGER DEFAULT 0,
    assigned_to UUID REFERENCES users(id) ON DELETE SET NULL,
    created_by UUID NOT NULL REFERENCES users(id),
    due_date TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Prevent direct self-reference; deeper cycles still need a trigger
    -- or application-level check
    CONSTRAINT no_self_reference CHECK (id != parent_task_id)
);

-- Indexes for common queries
CREATE INDEX idx_tasks_project ON tasks(project_id);
CREATE INDEX idx_tasks_assigned ON tasks(assigned_to) WHERE assigned_to IS NOT NULL;
CREATE INDEX idx_tasks_status ON tasks(status);
CREATE INDEX idx_tasks_due_date ON tasks(due_date) WHERE due_date IS NOT NULL;

-- Parent index for walking the task hierarchy (e.g. via recursive CTEs)
CREATE INDEX idx_tasks_parent ON tasks(parent_task_id) WHERE parent_task_id IS NOT NULL;
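-- Example: fetch a task and all of its descendants with a recursive CTE
-- (illustrative, not part of the migration):
--   WITH RECURSIVE subtasks AS (
--     SELECT * FROM tasks WHERE id = $1
--     UNION ALL
--     SELECT t.* FROM tasks t JOIN subtasks s ON t.parent_task_id = s.id
--   )
--   SELECT * FROM subtasks;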

-- Task comments with versioning
CREATE TABLE task_comments (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
    user_id UUID NOT NULL REFERENCES users(id),
    content TEXT NOT NULL,
    content_history JSONB, -- Store edit history
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    deleted_at TIMESTAMPTZ
);

CREATE INDEX idx_comments_task ON task_comments(task_id) WHERE deleted_at IS NULL;
CREATE INDEX idx_comments_created ON task_comments(created_at DESC);

-- Audit log for compliance
CREATE TABLE audit_logs (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
    user_id UUID REFERENCES users(id),
    action VARCHAR(100) NOT NULL,
    entity_type VARCHAR(100) NOT NULL,
    entity_id UUID NOT NULL,
    changes JSONB, -- Store before/after state
    ip_address INET,
    user_agent TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- For high write volumes, consider range-partitioning audit_logs by month
CREATE INDEX idx_audit_logs_org_created ON audit_logs(organization_id, created_at DESC);
CREATE INDEX idx_audit_logs_entity ON audit_logs(entity_type, entity_id);

-- Triggers for updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_organizations_updated_at BEFORE UPDATE ON organizations
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_projects_updated_at BEFORE UPDATE ON projects
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_tasks_updated_at BEFORE UPDATE ON tasks
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- Row-level security for multi-tenancy
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
ALTER TABLE tasks ENABLE ROW LEVEL SECURITY;

-- RLS policies (example). Note: the table owner and superusers bypass RLS
-- unless FORCE ROW LEVEL SECURITY is enabled on the table.
CREATE POLICY org_isolation ON projects
    USING (organization_id = current_setting('app.current_organization_id')::UUID);

CREATE POLICY task_isolation ON tasks
    USING (project_id IN (
        SELECT id FROM projects
        WHERE organization_id = current_setting('app.current_organization_id')::UUID
    ));
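-- Example: before querying, the application scopes the connection to a
-- tenant (illustrative; the UUID is a placeholder, SET LOCAL lasts until
-- the end of the transaction):
--   BEGIN;
--   SET LOCAL app.current_organization_id = '00000000-0000-0000-0000-000000000000';
--   SELECT id, name FROM projects;  -- returns only this tenant's rows
--   COMMIT;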

COMMIT;
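The partitioning note on audit_logs can be realized with PostgreSQL declarative partitioning. A sketch under assumed names (an existing plain table cannot be converted in place, so in practice the table would be created this way from the start; columns abbreviated for brevity):

```sql
-- Range-partition audit logs by month; each partition can be indexed,
-- archived, or dropped independently.
CREATE TABLE audit_logs_partitioned (
    id UUID DEFAULT uuid_generate_v4(),
    organization_id UUID NOT NULL,
    action VARCHAR(100) NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (id, created_at) -- the partition key must be part of the PK
) PARTITION BY RANGE (created_at);

CREATE TABLE audit_logs_2025_01 PARTITION OF audit_logs_partitioned
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
```

Old months can then be removed with `ALTER TABLE ... DETACH PARTITION` followed by `DROP TABLE`, instead of expensive bulk DELETEs.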

Zero-Downtime Migration Pattern

// src/db/migrations/migration_runner.rs
use sqlx::{PgPool, Postgres, Transaction};
use std::time::Duration;
use anyhow::{Result, Context};

/// Zero-downtime migration strategies
pub struct MigrationRunner {
    pool: PgPool,
}

impl MigrationRunner {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    /// Add a NOT NULL column with a default (zero downtime)
    pub async fn add_column_with_default(&self) -> Result<()> {
        // Step 1: Add the column as nullable (metadata-only change, fast)
        sqlx::query(
            "ALTER TABLE users ADD COLUMN IF NOT EXISTS phone_number VARCHAR(20)"
        )
        .execute(&self.pool)
        .await?;

        // Step 2: Backfill in small batches, each in its own implicit
        // transaction, so no statement holds row locks for long.
        // PostgreSQL has no UPDATE ... LIMIT, so each batch is bounded
        // with a ctid subquery.
        loop {
            let updated = sqlx::query(
                "UPDATE users SET phone_number = '' WHERE ctid IN \
                 (SELECT ctid FROM users WHERE phone_number IS NULL LIMIT 1000)"
            )
            .execute(&self.pool)
            .await?
            .rows_affected();

            if updated == 0 {
                break;
            }
            // Brief pause so concurrent queries can make progress
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // Step 3: Add the NOT NULL constraint (brief exclusive lock while
        // existing rows are validated)
        sqlx::query("ALTER TABLE users ALTER COLUMN phone_number SET NOT NULL")
            .execute(&self.pool)
            .await?;

        Ok(())
    }

    /// Rename column (zero downtime, expand/contract)
    pub async fn rename_column_safe(&self) -> Result<()> {
        // Step 1: Add new column
        sqlx::query(
            "ALTER TABLE tasks ADD COLUMN IF NOT EXISTS assignee_id UUID"
        )
        .execute(&self.pool)
        .await?;

        // Step 2: Backfill data
        sqlx::query(
            "UPDATE tasks SET assignee_id = assigned_to WHERE assignee_id IS NULL"
        )
        .execute(&self.pool)
        .await?;

        // Step 3: Deploy application code that writes to BOTH columns
        tracing::info!("Deploy app version that writes to both assigned_to and assignee_id");

        // Step 4: Verify data consistency. IS DISTINCT FROM also catches
        // rows where exactly one side is NULL, which != would silently skip.
        let inconsistent = sqlx::query_scalar::<_, i64>(
            "SELECT COUNT(*) FROM tasks WHERE assigned_to IS DISTINCT FROM assignee_id"
        )
        .fetch_one(&self.pool)
        .await?;

        if inconsistent > 0 {
            anyhow::bail!("Data inconsistency detected: {} rows", inconsistent);
        }

        // Step 5: Deploy app version that only uses assignee_id
        tracing::info!("Deploy app version that only uses assignee_id");

        // Step 6: Drop old column
        sqlx::query("ALTER TABLE tasks DROP COLUMN assigned_to")
            .execute(&self.pool)
            .await?;

        Ok(())
    }

    /// Add index concurrently (non-blocking)
    pub async fn add_index_concurrent(&self) -> Result<()> {
        // CONCURRENTLY avoids blocking writes, but cannot run inside a
        // transaction block; executing directly on the pool uses autocommit
        sqlx::query(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tasks_priority \
             ON tasks(priority) WHERE status != 'completed'"
        )
        .execute(&self.pool)
        .await
        .context("Failed to create index")?;

        Ok(())
    }

    /// Backfill data in batches to avoid long locks. PostgreSQL does not
    /// support UPDATE ... LIMIT, so `update_query` must bound its own batch,
    /// e.g. `... WHERE ctid IN (SELECT ctid FROM t WHERE ... LIMIT {limit})`;
    /// the `{limit}` placeholder is replaced with `batch_size`. Caveat: all
    /// batches here share one transaction, so locks are held until commit;
    /// for very large tables, commit each batch separately instead.
    async fn backfill_in_batches(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        update_query: &str,
        batch_size: i64
    ) -> Result<()> {
        let query = update_query.replace("{limit}", &batch_size.to_string());
        loop {
            let updated = sqlx::query(&query)
                .execute(&mut **tx)
                .await?
                .rows_affected();

            if updated == 0 {
                break;
            }

            // Small delay to allow other queries
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        Ok(())
    }
}
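Another common zero-downtime step is adding a CHECK or foreign-key constraint without a long full-table scan under an exclusive lock: add it as NOT VALID, then validate separately. Sketch (table and constraint names are illustrative):

```sql
-- Step 1: register the constraint for new writes only (brief lock, no scan)
ALTER TABLE tasks
    ADD CONSTRAINT priority_range CHECK (priority BETWEEN 0 AND 10) NOT VALID;

-- Step 2: validate existing rows later; holds only a weaker lock that
-- does not block normal reads and writes
ALTER TABLE tasks VALIDATE CONSTRAINT priority_range;
```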

Query Optimization Patterns

// src/db/query_optimizer.rs
use sqlx::{PgPool, QueryBuilder, Postgres};
use uuid::Uuid;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Task {
    pub id: Uuid,
    pub title: String,
    pub status: String,
    pub assigned_to: Option<Uuid>,
    pub created_at: chrono::DateTime<chrono::Utc>,
}

pub struct TaskRepository {
    pool: PgPool,
}

impl TaskRepository {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    /// Optimized query with proper indexing
    pub async fn find_by_project_paginated(
        &self,
        project_id: Uuid,
        page: i64,
        page_size: i64
    ) -> Result<Vec<Task>, sqlx::Error> {
        // Uses idx_tasks_project index
        let offset = (page - 1) * page_size;

        sqlx::query_as!(
            Task,
            r#"
            SELECT id, title, status, assigned_to, created_at
            FROM tasks
            WHERE project_id = $1
            ORDER BY created_at DESC
            LIMIT $2 OFFSET $3
            "#,
            project_id,
            page_size,
            offset
        )
        .fetch_all(&self.pool)
        .await
    }
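    /// OFFSET pagination scans and discards every skipped row, so deep pages
    /// get progressively slower. Keyset (cursor) pagination stays fast at any
    /// depth: the client passes the last created_at it saw. Illustrative
    /// sketch; this method name is an addition, not part of the original API.
    pub async fn find_by_project_before(
        &self,
        project_id: Uuid,
        before: chrono::DateTime<chrono::Utc>,
        page_size: i64
    ) -> Result<Vec<Task>, sqlx::Error> {
        sqlx::query_as!(
            Task,
            r#"
            SELECT id, title, status, assigned_to, created_at
            FROM tasks
            WHERE project_id = $1 AND created_at < $2
            ORDER BY created_at DESC
            LIMIT $3
            "#,
            project_id,
            before,
            page_size
        )
        .fetch_all(&self.pool)
        .await
    }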

    /// Aggregate task counts per status for a project
    pub async fn count_by_status(
        &self,
        project_id: Uuid
    ) -> Result<std::collections::HashMap<String, i64>, sqlx::Error> {
        // Served by idx_tasks_project; verify with EXPLAIN
        let rows = sqlx::query!(
            r#"
            SELECT status, COUNT(*) as count
            FROM tasks
            WHERE project_id = $1
            GROUP BY status
            "#,
            project_id
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(rows.into_iter()
            .map(|r| (r.status, r.count.unwrap_or(0)))
            .collect())
    }

    /// Bulk insert with a single multi-row INSERT (far faster than one
    /// INSERT per row; for very large loads, COPY is faster still)
    pub async fn bulk_insert(&self, tasks: Vec<Task>) -> Result<u64, sqlx::Error> {
        // Note: the schema above also requires project_id and created_by
        // (NOT NULL); a real insert must bind those columns as well
        let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
            "INSERT INTO tasks (id, title, status, assigned_to, created_at) "
        );

        query_builder.push_values(tasks, |mut b, task| {
            b.push_bind(task.id)
                .push_bind(task.title)
                .push_bind(task.status)
                .push_bind(task.assigned_to)
                .push_bind(task.created_at);
        });

        let result = query_builder.build().execute(&self.pool).await?;
        Ok(result.rows_affected())
    }
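    /// COPY sketch for very large loads, assuming sqlx's copy_in_raw API
    /// (requires `use sqlx::postgres::PgPoolCopyExt;`; verify the call
    /// signatures against the sqlx version in use). `rows_csv` must be CSV
    /// matching the listed columns. Illustrative addition, not original API.
    pub async fn bulk_insert_copy(&self, rows_csv: &str) -> Result<u64, sqlx::Error> {
        let mut copy = self.pool
            .copy_in_raw(
                "COPY tasks (id, title, status, assigned_to, created_at) \
                 FROM STDIN WITH (FORMAT csv)"
            )
            .await?;
        copy.send(rows_csv.as_bytes()).await?;
        // finish() reports the number of rows copied
        copy.finish().await
    }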

    /// N+1 query prevention with JOIN
    pub async fn find_with_assignees(
        &self,
        project_id: Uuid
    ) -> Result<Vec<TaskWithUser>, sqlx::Error> {
        // Single query instead of N+1. The "?" suffix tells the query_as!
        // macro the LEFT JOINed columns are nullable.
        sqlx::query_as!(
            TaskWithUser,
            r#"
            SELECT
                t.id as task_id,
                t.title,
                t.status,
                u.id as "assignee_id?",
                u.email as "assignee_email?"
            FROM tasks t
            LEFT JOIN users u ON t.assigned_to = u.id
            WHERE t.project_id = $1
            ORDER BY t.created_at DESC
            "#,
            project_id
        )
        .fetch_all(&self.pool)
        .await
    }

    /// Inspect the query plan. Caution: EXPLAIN ANALYZE actually executes
    /// the query, and EXPLAIN returns one row per plan line, so fetch all.
    pub async fn explain_query(&self, query: &str) -> Result<String, sqlx::Error> {
        let plan: Vec<String> = sqlx::query_scalar(
            &format!("EXPLAIN ANALYZE {}", query)
        )
        .fetch_all(&self.pool)
        .await?;

        Ok(plan.join("\n"))
    }
}

#[derive(Debug, Serialize)]
struct TaskWithUser {
    task_id: Uuid,
    title: String,
    status: String,
    assignee_id: Option<Uuid>,
    assignee_email: Option<String>,
}

Connection Pool Configuration

// src/db/pool.rs
use sqlx::postgres::{PgPoolOptions, PgConnectOptions};
use std::time::Duration;

pub async fn create_pool(database_url: &str) -> Result<sqlx::PgPool, sqlx::Error> {
    let connect_options = database_url
        .parse::<PgConnectOptions>()?
        .application_name("myapp")
        .statement_cache_capacity(100); // Prepared statement cache

    PgPoolOptions::new()
        .max_connections(20) // Tune based on DB and workload
        .min_connections(5) // Keep warm connections
        .acquire_timeout(Duration::from_secs(30))
        .idle_timeout(Duration::from_secs(600)) // 10 min
        .max_lifetime(Duration::from_secs(1800)) // 30 min
        .test_before_acquire(true) // Validate connections
        .after_connect(|conn, _meta| {
            Box::pin(async move {
                // Set connection-level settings; reborrow (&mut *conn) so
                // the connection can be used by both queries
                sqlx::query("SET statement_timeout = '30s'")
                    .execute(&mut *conn)
                    .await?;

                sqlx::query("SET lock_timeout = '10s'")
                    .execute(&mut *conn)
                    .await?;

                Ok(())
            })
        })
        .connect_with(connect_options)
        .await
}
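Wiring the pool at startup might look like the following sketch. The DATABASE_URL variable name and the fail-fast probe are assumptions for illustration, not part of the configuration above:

```rust
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // Read the connection string from the environment (name assumed)
    let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let pool = create_pool(&url).await?;

    // Fail fast at boot if the database is unreachable
    sqlx::query("SELECT 1").execute(&pool).await?;

    // Hand the pool to repositories / the web framework from here
    Ok(())
}
```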

Success Output

When successful, this skill MUST output:

✅ SKILL COMPLETE: database-design-patterns

Completed:
- [x] Database schema designed with proper normalization
- [x] Indexes created for common query patterns
- [x] Migration strategy defined (zero-downtime if required)
- [x] Connection pooling configured
- [x] Row-level security policies implemented (if multi-tenant)
- [x] Query optimization verified with EXPLAIN ANALYZE

Outputs:
- migrations/*.sql (schema and migration files)
- src/db/pool.rs or equivalent connection config
- Database documentation in docs/architecture/database/
- Query optimization report

Verification:
- All migrations run successfully
- Query performance meets SLA (<100ms for p95)
- Connection pool metrics within limits
- Multi-tenant isolation verified (if applicable)

Completion Checklist

Before marking this skill as complete, verify:

  • Schema normalized to 3NF (unless denormalization justified)
  • All foreign keys have proper ON DELETE/ON UPDATE actions
  • Indexes created for all frequent query patterns
  • Unique constraints and check constraints in place
  • Migrations tested with rollback capability
  • Connection pool sized appropriately for workload
  • Row-level security policies tested (if multi-tenant)
  • Query performance profiled with EXPLAIN ANALYZE
  • Backup/restore procedures documented
  • Database documentation updated

Failure Indicators

This skill has FAILED if:

  • ❌ Schema violates normalization principles without justification
  • ❌ Missing indexes cause full table scans on large tables
  • ❌ Migration causes downtime in production
  • ❌ Connection pool exhausted under normal load
  • ❌ N+1 query patterns present
  • ❌ No rollback strategy for migrations
  • ❌ Multi-tenant data leakage possible
  • ❌ Query timeouts under normal load
  • ❌ Missing foreign key constraints allow orphaned records
  • ❌ No monitoring/alerting for database health

When NOT to Use

Do NOT use this skill when:

  • Using NoSQL databases (use document-design-patterns instead)
  • Simple CRUD with <1000 records (over-engineering)
  • Prototyping/MVP phase (premature optimization)
  • In-memory data structures suffice
  • Third-party schema cannot be modified
  • Alternative: Use ORM defaults for simple cases

Use alternative approaches for:

  • Time-series data → TimescaleDB or InfluxDB patterns
  • Graph data → Neo4j or graph database patterns
  • Document storage → MongoDB or DynamoDB patterns
  • Cache layer → Redis patterns
  • Search → Elasticsearch patterns

Anti-Patterns (Avoid)

| Anti-Pattern | Problem | Solution |
| --- | --- | --- |
| UUIDs without proper indexing | Slow lookups, fragmentation | Use UUID v7 (time-ordered) or ULID |
| ENUM columns | Schema changes require migrations | Use lookup tables instead |
| Over-normalization | Excessive JOINs, slow queries | Denormalize high-read, low-write tables |
| Generic JSONB columns | No schema validation, query complexity | Use proper columns, JSONB for metadata only |
| Missing connection pool | Exhausted connections, slow app | Use PgPool with proper sizing |
| Synchronous migrations in CI | Deployment downtime | Use blue-green deployment with async backfill |
| No query timeout | Runaway queries lock tables | Set statement_timeout and lock_timeout |
| Text primary keys | Storage bloat, slow joins | Use BIGINT or UUID |
| Soft deletes everywhere | Table bloat, complex queries | Use hard deletes with audit log |
| SELECT * queries | Network overhead, brittleness | Specify columns explicitly |

Principles

This skill embodies these CODITECT principles:

  • #5 Eliminate Ambiguity - Explicit schema constraints eliminate data ambiguity
  • #6 Clear, Understandable, Explainable - Schema is self-documenting with proper naming
  • #8 No Assumptions - Verify query performance with EXPLAIN ANALYZE
  • #10 Iterative Refinement - Optimize schema based on real query patterns
  • Data Integrity - Foreign keys, constraints, and RLS ensure correctness
  • Performance - Proper indexing and pooling for production workloads


Usage Examples

Schema Design

Apply database-design-patterns skill to design normalized PostgreSQL schema for multi-tenant SaaS application

Zero-Downtime Migration

Apply database-design-patterns skill to add new column with backfill without downtime

Query Optimization

Apply database-design-patterns skill to optimize slow queries with proper indexing and query planning

Integration Points

  • foundationdb-patterns - Distributed transactions
  • cloud-infrastructure-patterns - Cloud SQL setup
  • data-engineering-patterns - ETL pipelines