Agent Skills Framework Extension
Database Design Patterns Skill
When to Use This Skill
Use this skill when implementing database design patterns in your codebase.
How to Use This Skill
- Review the patterns and examples below
- Apply the relevant patterns to your implementation
- Follow the best practices outlined in this skill
Production-grade database schema design, migrations, optimization, and scaling patterns.
Core Capabilities
- Schema Design - Normalization, denormalization, relationship modeling
- Migrations - Zero-downtime schema changes, rollback strategies
- Query Optimization - Indexing, query planning, performance tuning
- Connection Pooling - Resource management, connection lifecycle
- Scaling Patterns - Replication, sharding, partitioning
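As a sketch of the partitioning capability above, declarative range partitioning in PostgreSQL looks like the following (table and column names are illustrative, not part of the schema below):

```sql
-- Parent table, range-partitioned by month on created_at
CREATE TABLE events (
    id UUID NOT NULL,
    payload JSONB,
    created_at TIMESTAMPTZ NOT NULL,
    -- the partition key must be part of any primary key
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- One partition per month; create upcoming partitions ahead of time
CREATE TABLE events_2025_01 PARTITION OF events
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
```

Queries filtered on `created_at` are pruned to the matching partitions, and old data can be dropped by detaching a partition instead of running a bulk DELETE.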
PostgreSQL Schema Design
-- migrations/001_initial_schema.sql
-- Production schema with proper normalization and indexing
BEGIN;
-- Enable extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pg_trgm"; -- Trigram indexing for text search
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
-- Users table with authentication
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
email VARCHAR(255) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
email_verified BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ, -- Soft delete
CONSTRAINT email_format CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$')
);
-- Index for auth queries
CREATE INDEX idx_users_email ON users(email) WHERE deleted_at IS NULL;
CREATE INDEX idx_users_created_at ON users(created_at);
-- Organizations (multi-tenancy)
CREATE TABLE organizations (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
name VARCHAR(255) NOT NULL,
slug VARCHAR(100) NOT NULL UNIQUE,
owner_id UUID NOT NULL REFERENCES users(id) ON DELETE RESTRICT,
plan VARCHAR(50) NOT NULL DEFAULT 'free',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT slug_format CHECK (slug ~* '^[a-z0-9-]+$')
);
CREATE INDEX idx_organizations_owner ON organizations(owner_id);
-- Organization membership
CREATE TABLE organization_members (
organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role VARCHAR(50) NOT NULL DEFAULT 'member',
invited_by UUID REFERENCES users(id),
joined_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (organization_id, user_id)
);
CREATE INDEX idx_org_members_user ON organization_members(user_id);
-- Projects (scoped to organization)
CREATE TABLE projects (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
description TEXT,
status VARCHAR(50) NOT NULL DEFAULT 'active',
created_by UUID NOT NULL REFERENCES users(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
archived_at TIMESTAMPTZ
);
-- Composite index for tenant isolation
CREATE INDEX idx_projects_org_status ON projects(organization_id, status)
WHERE archived_at IS NULL;
-- Full-text search index
CREATE INDEX idx_projects_search ON projects
USING gin(to_tsvector('english', name || ' ' || COALESCE(description, '')));
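-- Example query served by the GIN index above (illustrative):
--   SELECT id, name FROM projects
--   WHERE to_tsvector('english', name || ' ' || COALESCE(description, ''))
--         @@ plainto_tsquery('english', 'onboarding roadmap');
-- The to_tsvector expression must match the index definition exactly,
-- otherwise the planner falls back to a sequential scan.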
-- Tasks (hierarchical with parent-child relationships)
CREATE TABLE tasks (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
parent_task_id UUID REFERENCES tasks(id) ON DELETE CASCADE,
title VARCHAR(500) NOT NULL,
description TEXT,
status VARCHAR(50) NOT NULL DEFAULT 'todo',
priority INTEGER DEFAULT 0,
assigned_to UUID REFERENCES users(id) ON DELETE SET NULL,
created_by UUID NOT NULL REFERENCES users(id),
due_date TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Guard against direct self-reference (deeper cycles need an application-level check)
CONSTRAINT no_self_reference CHECK (id != parent_task_id)
);
-- Indexes for common queries
CREATE INDEX idx_tasks_project ON tasks(project_id);
CREATE INDEX idx_tasks_assigned ON tasks(assigned_to) WHERE assigned_to IS NOT NULL;
CREATE INDEX idx_tasks_status ON tasks(status);
CREATE INDEX idx_tasks_due_date ON tasks(due_date) WHERE due_date IS NOT NULL;
-- Index for walking the parent-child hierarchy
CREATE INDEX idx_tasks_parent ON tasks(parent_task_id) WHERE parent_task_id IS NOT NULL;
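-- Example: fetch a whole task subtree with a recursive CTE (illustrative):
--   WITH RECURSIVE subtree AS (
--     SELECT * FROM tasks WHERE id = $1
--     UNION ALL
--     SELECT t.* FROM tasks t JOIN subtree s ON t.parent_task_id = s.id
--   )
--   SELECT * FROM subtree;
-- Each recursion step uses idx_tasks_parent to find the children.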
-- Task comments with versioning
CREATE TABLE task_comments (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id),
content TEXT NOT NULL,
content_history JSONB, -- Store edit history
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ
);
CREATE INDEX idx_comments_task ON task_comments(task_id) WHERE deleted_at IS NULL;
CREATE INDEX idx_comments_created ON task_comments(created_at DESC);
-- Audit log for compliance
CREATE TABLE audit_logs (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
user_id UUID REFERENCES users(id),
action VARCHAR(100) NOT NULL,
entity_type VARCHAR(100) NOT NULL,
entity_id UUID NOT NULL,
changes JSONB, -- Store before/after state
ip_address INET,
user_agent TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- High-volume table: consider range-partitioning by created_at as it grows
CREATE INDEX idx_audit_logs_org_created ON audit_logs(organization_id, created_at DESC);
CREATE INDEX idx_audit_logs_entity ON audit_logs(entity_type, entity_id);
-- Triggers for updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE TRIGGER update_organizations_updated_at BEFORE UPDATE ON organizations
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE TRIGGER update_projects_updated_at BEFORE UPDATE ON projects
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE TRIGGER update_tasks_updated_at BEFORE UPDATE ON tasks
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
-- Row-level security for multi-tenancy
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
ALTER TABLE tasks ENABLE ROW LEVEL SECURITY;
-- RLS policies (example)
CREATE POLICY org_isolation ON projects
USING (organization_id = current_setting('app.current_organization_id')::UUID);
CREATE POLICY task_isolation ON tasks
USING (project_id IN (
SELECT id FROM projects
WHERE organization_id = current_setting('app.current_organization_id')::UUID
));
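-- The application must set the tenant id per transaction for the policies
-- above to take effect (illustrative; '<tenant-uuid>' is a placeholder):
--   BEGIN;
--   SET LOCAL app.current_organization_id = '<tenant-uuid>';
--   -- all queries in this transaction now see only that tenant's rows
--   COMMIT;
-- NOTE: RLS does not apply to table owners or superusers unless
-- FORCE ROW LEVEL SECURITY is enabled on the table.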
COMMIT;
Zero-Downtime Migration Pattern
// src/db/migrations/migration_runner.rs
use sqlx::{PgPool, Postgres, Transaction};
use std::time::Duration;
use anyhow::{Result, Context};
/// Zero-downtime migration strategies
pub struct MigrationRunner {
pool: PgPool,
}
impl MigrationRunner {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
/// Add a NOT NULL column with a default (zero downtime)
/// On PostgreSQL 11+, ADD COLUMN ... DEFAULT is a metadata-only change.
/// The backfill runs in batches OUTSIDE a long-lived transaction so locks
/// stay short and replicas keep up.
pub async fn add_column_with_default(&self) -> Result<()> {
// Step 1: Add column as nullable (fast, metadata-only)
sqlx::query(
"ALTER TABLE users ADD COLUMN IF NOT EXISTS phone_number VARCHAR(20)"
)
.execute(&self.pool)
.await?;
// Step 2: Backfill in batches; each batch commits on its own.
// PostgreSQL has no UPDATE ... LIMIT, so bound the batch with a keyed subquery.
self.backfill_in_batches(
"UPDATE users SET phone_number = ''
 WHERE id IN (SELECT id FROM users WHERE phone_number IS NULL LIMIT 1000)"
).await?;
// Step 3: Add NOT NULL constraint (scans the table; schedule for a quiet window)
sqlx::query(
"ALTER TABLE users ALTER COLUMN phone_number SET NOT NULL"
)
.execute(&self.pool)
.await?;
Ok(())
}
/// Rename column via expand/contract (zero downtime).
/// In production, the steps below span several deploys; they are shown
/// in sequence here for illustration.
pub async fn rename_column_safe(&self) -> Result<()> {
// Step 1: Add new column
sqlx::query(
"ALTER TABLE tasks ADD COLUMN IF NOT EXISTS assignee_id UUID"
)
.execute(&self.pool)
.await?;
// Step 2: Backfill data
sqlx::query(
"UPDATE tasks SET assignee_id = assigned_to WHERE assignee_id IS NULL"
)
.execute(&self.pool)
.await?;
// Step 3: Deploy application code that writes to BOTH columns
tracing::info!("Deploy app version that writes to both assigned_to and assignee_id");
// Step 4: Verify consistency (IS DISTINCT FROM also catches NULL mismatches)
let inconsistent = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM tasks WHERE assigned_to IS DISTINCT FROM assignee_id"
)
.fetch_one(&self.pool)
.await?;
if inconsistent > 0 {
anyhow::bail!("Data inconsistency detected: {} rows", inconsistent);
}
// Step 5: Deploy app version that only uses assignee_id
tracing::info!("Deploy app version that only uses assignee_id");
// Step 6: Drop old column
sqlx::query("ALTER TABLE tasks DROP COLUMN assigned_to")
.execute(&self.pool)
.await?;
Ok(())
}
/// Add index concurrently (non-blocking)
pub async fn add_index_concurrent(&self) -> Result<()> {
// CONCURRENTLY avoids blocking writes, but it cannot run inside a
// transaction block, so execute it directly against the pool
sqlx::query(
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tasks_priority
ON tasks(priority) WHERE status != 'completed'"
)
.execute(&self.pool)
.await
.context("Failed to create index")?;
Ok(())
}
/// Run a self-limiting UPDATE repeatedly until no rows remain.
/// The query must bound its own batch (e.g. an id IN (... LIMIT n)
/// subquery), since PostgreSQL does not support LIMIT on UPDATE.
async fn backfill_in_batches(&self, batch_query: &str) -> Result<()> {
loop {
let updated = sqlx::query(batch_query)
.execute(&self.pool)
.await?
.rows_affected();
if updated == 0 {
break;
}
// Small delay so other transactions and replication can keep up
tokio::time::sleep(Duration::from_millis(100)).await;
}
Ok(())
}
}
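On very large tables, the `SET NOT NULL` in the pattern above takes an ACCESS EXCLUSIVE lock while it scans every row. On PostgreSQL 12+ the scan can be avoided by validating a CHECK constraint first; a sketch, reusing the table and column names from the example above:

```sql
-- NOT VALID makes the ALTER instant; existing rows are not checked yet
ALTER TABLE users ADD CONSTRAINT phone_not_null
    CHECK (phone_number IS NOT NULL) NOT VALID;
-- VALIDATE scans with only a light lock and does not block writes
ALTER TABLE users VALIDATE CONSTRAINT phone_not_null;
-- PostgreSQL 12+ proves NOT NULL from the validated constraint: no scan
ALTER TABLE users ALTER COLUMN phone_number SET NOT NULL;
-- The helper constraint is no longer needed
ALTER TABLE users DROP CONSTRAINT phone_not_null;
```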
Query Optimization Patterns
// src/db/query_optimizer.rs
use sqlx::{PgPool, QueryBuilder, Postgres};
use uuid::Uuid;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Task {
pub id: Uuid,
pub title: String,
pub status: String,
pub assigned_to: Option<Uuid>,
pub created_at: chrono::DateTime<chrono::Utc>,
}
pub struct TaskRepository {
pool: PgPool,
}
impl TaskRepository {
/// Optimized query with proper indexing
pub async fn find_by_project_paginated(
&self,
project_id: Uuid,
page: i64,
page_size: i64
) -> Result<Vec<Task>, sqlx::Error> {
// Uses idx_tasks_project; OFFSET is fine for shallow pages but re-scans
// skipped rows, so deep pagination degrades
let offset = (page.max(1) - 1) * page_size; // clamp so page 0 cannot go negative
sqlx::query_as!(
Task,
r#"
SELECT id, title, status, assigned_to, created_at
FROM tasks
WHERE project_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
"#,
project_id,
page_size,
offset
)
.fetch_all(&self.pool)
.await
}
/// Aggregate task counts per status in one grouped query
pub async fn count_by_status(
&self,
project_id: Uuid
) -> Result<std::collections::HashMap<String, i64>, sqlx::Error> {
// The filter uses idx_tasks_project; the grouping itself runs in memory
let rows = sqlx::query!(
let rows = sqlx::query!(
r#"
SELECT status, COUNT(*) as count
FROM tasks
WHERE project_id = $1
GROUP BY status
"#,
project_id
)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter()
.map(|r| (r.status, r.count.unwrap_or(0)))
.collect())
}
/// Bulk insert with a single multi-row INSERT (much faster than row-by-row;
/// for very large loads, prefer PostgreSQL COPY)
pub async fn bulk_insert(&self, tasks: Vec<Task>) -> Result<u64, sqlx::Error> {
// NOTE: the real tasks table also requires project_id and created_by
// (NOT NULL); they are elided because the illustrative Task struct omits them
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
"INSERT INTO tasks (id, title, status, assigned_to, created_at) "
);
query_builder.push_values(tasks, |mut b, task| {
b.push_bind(task.id)
.push_bind(task.title)
.push_bind(task.status)
.push_bind(task.assigned_to)
.push_bind(task.created_at);
});
let result = query_builder.build().execute(&self.pool).await?;
Ok(result.rows_affected())
}
/// N+1 query prevention with JOIN
pub async fn find_with_assignees(
&self,
project_id: Uuid
) -> Result<Vec<TaskWithUser>, sqlx::Error> {
// Single query instead of N+1
sqlx::query_as!(
TaskWithUser,
r#"
SELECT
t.id as task_id,
t.title,
t.status,
u.id as assignee_id,
u.email as assignee_email
FROM tasks t
LEFT JOIN users u ON t.assigned_to = u.id
WHERE t.project_id = $1
ORDER BY t.created_at DESC
"#,
project_id
)
.fetch_all(&self.pool)
.await
}
/// Inspect the planner with EXPLAIN ANALYZE.
/// CAUTION: EXPLAIN ANALYZE executes the statement (wrap writes in a
/// rolled-back transaction), and the query string is interpolated,
/// so only pass trusted input.
pub async fn explain_query(&self, query: &str) -> Result<String, sqlx::Error> {
// The plan comes back as one row per line; collect and join them
let lines = sqlx::query_scalar::<_, String>(
&format!("EXPLAIN ANALYZE {}", query)
)
.fetch_all(&self.pool)
.await?;
Ok(lines.join("\n"))
}
}
#[derive(Debug, Serialize)]
struct TaskWithUser {
task_id: Uuid,
title: String,
status: String,
assignee_id: Option<Uuid>,
assignee_email: Option<String>,
}
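The OFFSET pagination in `find_by_project_paginated` re-scans every skipped row, so deep pages get progressively slower. A keyset (cursor) variant stays fast at any depth; a SQL sketch, where the cursor columns must match the ORDER BY exactly:

```sql
-- First page
SELECT id, title, status, assigned_to, created_at
FROM tasks
WHERE project_id = $1
ORDER BY created_at DESC, id DESC
LIMIT 50;

-- Next page: pass the last row's (created_at, id) as the cursor.
-- Row-wise comparison with < matches the descending sort order.
SELECT id, title, status, assigned_to, created_at
FROM tasks
WHERE project_id = $1
  AND (created_at, id) < ($2, $3)
ORDER BY created_at DESC, id DESC
LIMIT 50;
```

The `id` tiebreaker keeps the ordering deterministic when two tasks share a `created_at`.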
Connection Pool Configuration
// src/db/pool.rs
use sqlx::postgres::{PgPoolOptions, PgConnectOptions};
use std::time::Duration;
pub async fn create_pool(database_url: &str) -> Result<sqlx::PgPool, sqlx::Error> {
let connect_options = database_url
.parse::<PgConnectOptions>()?
.application_name("myapp")
.statement_cache_capacity(100); // Prepared statement cache
PgPoolOptions::new()
.max_connections(20) // Tune based on DB and workload
.min_connections(5) // Keep warm connections
.acquire_timeout(Duration::from_secs(30))
.idle_timeout(Duration::from_secs(600)) // 10 min
.max_lifetime(Duration::from_secs(1800)) // 30 min
.test_before_acquire(true) // Validate connections
.after_connect(|conn, _meta| {
Box::pin(async move {
// Set connection-level settings
sqlx::query("SET statement_timeout = '30s'")
.execute(conn)
.await?;
sqlx::query("SET lock_timeout = '10s'")
.execute(conn)
.await?;
Ok(())
})
})
.connect_with(connect_options)
.await
}
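A quick way to watch connection usage from the database side, useful when verifying the pool sizing above (illustrative; requires read access to `pg_stat_activity`):

```sql
-- Connections by state; lingering 'idle in transaction' sessions are
-- the usual sign of a connection leak
SELECT state, COUNT(*)
FROM pg_stat_activity
WHERE datname = current_database()
GROUP BY state;
```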
Success Output
When successful, this skill MUST output:
✅ SKILL COMPLETE: database-design-patterns
Completed:
- [x] Database schema designed with proper normalization
- [x] Indexes created for common query patterns
- [x] Migration strategy defined (zero-downtime if required)
- [x] Connection pooling configured
- [x] Row-level security policies implemented (if multi-tenant)
- [x] Query optimization verified with EXPLAIN ANALYZE
Outputs:
- migrations/*.sql (schema and migration files)
- src/db/pool.rs or equivalent connection config
- Database documentation in docs/architecture/database/
- Query optimization report
Verification:
- All migrations run successfully
- Query performance meets SLA (<100ms for p95)
- Connection pool metrics within limits
- Multi-tenant isolation verified (if applicable)
Completion Checklist
Before marking this skill as complete, verify:
- Schema normalized to 3NF (unless denormalization justified)
- All foreign keys have proper ON DELETE/ON UPDATE actions
- Indexes created for all frequent query patterns
- Unique constraints and check constraints in place
- Migrations tested with rollback capability
- Connection pool sized appropriately for workload
- Row-level security policies tested (if multi-tenant)
- Query performance profiled with EXPLAIN ANALYZE
- Backup/restore procedures documented
- Database documentation updated
Failure Indicators
This skill has FAILED if:
- ❌ Schema violates normalization principles without justification
- ❌ Missing indexes cause full table scans on large tables
- ❌ Migration causes downtime in production
- ❌ Connection pool exhausted under normal load
- ❌ N+1 query patterns present
- ❌ No rollback strategy for migrations
- ❌ Multi-tenant data leakage possible
- ❌ Query timeouts under normal load
- ❌ Missing foreign key constraints allow orphaned records
- ❌ No monitoring/alerting for database health
When NOT to Use
Do NOT use this skill when:
- Using NoSQL databases (use document-design-patterns instead)
- Simple CRUD with <1000 records (over-engineering)
- Prototyping/MVP phase (premature optimization)
- In-memory data structures suffice
- Third-party schema cannot be modified
- Alternative: Use ORM defaults for simple cases
Use alternative approaches for:
- Time-series data → TimescaleDB or InfluxDB patterns
- Graph data → Neo4j or graph database patterns
- Document storage → MongoDB or DynamoDB patterns
- Cache layer → Redis patterns
- Search → Elasticsearch patterns
Anti-Patterns (Avoid)
| Anti-Pattern | Problem | Solution |
|---|---|---|
| UUIDs without proper indexing | Slow lookups, fragmentation | Use UUID v7 (time-ordered) or ULID |
| ENUM columns | Schema changes require migrations | Use lookup tables instead |
| Over-normalization | Excessive JOINs, slow queries | Denormalize high-read, low-write tables |
| Generic JSONB columns | No schema validation, query complexity | Use proper columns, JSONB for metadata only |
| Missing connection pool | Exhausted connections, slow app | Use PgPool with proper sizing |
| Synchronous migrations in CI | Deployment downtime | Use blue-green deployment with async backfill |
| No query timeout | Runaway queries lock tables | Set statement_timeout and lock_timeout |
| Text primary keys | Storage bloat, slow joins | Use BIGINT or UUID |
| Soft deletes everywhere | Table bloat, complex queries | Use hard deletes with audit log |
| SELECT * queries | Network overhead, brittleness | Specify columns explicitly |
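The lookup-table alternative to ENUM columns from the table above, as a sketch (table names and status values are illustrative):

```sql
CREATE TABLE task_statuses (
    id SMALLINT PRIMARY KEY,
    name VARCHAR(50) NOT NULL UNIQUE
);
INSERT INTO task_statuses (id, name)
VALUES (1, 'todo'), (2, 'in_progress'), (3, 'done');

-- Reference the lookup table instead of an ENUM; adding a new status
-- is a plain INSERT rather than a schema migration
ALTER TABLE tasks
    ADD COLUMN status_id SMALLINT REFERENCES task_statuses(id);
```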
Principles
This skill embodies these CODITECT principles:
- #5 Eliminate Ambiguity - Explicit schema constraints eliminate data ambiguity
- #6 Clear, Understandable, Explainable - Schema is self-documenting with proper naming
- #8 No Assumptions - Verify query performance with EXPLAIN ANALYZE
- #10 Iterative Refinement - Optimize schema based on real query patterns
- Data Integrity - Foreign keys, constraints, and RLS ensure correctness
- Performance - Proper indexing and pooling for production workloads
Usage Examples
Schema Design
Apply database-design-patterns skill to design normalized PostgreSQL schema for multi-tenant SaaS application
Zero-Downtime Migration
Apply database-design-patterns skill to add new column with backfill without downtime
Query Optimization
Apply database-design-patterns skill to optimize slow queries with proper indexing and query planning
Integration Points
- foundationdb-patterns - Distributed transactions
- cloud-infrastructure-patterns - Cloud SQL setup
- data-engineering-patterns - ETL pipelines