#!/usr/bin/env python3
"""CODITECT PostgreSQL Deployment Script.

Deploys the production PostgreSQL schema to GKE or local PostgreSQL instance.
Includes pgvector extension for semantic search and multi-tenant RLS.

Usage:
    # Deploy to GKE (uses kubectl port-forward)
    python3 scripts/deploy-postgresql.py --target gke

    # Deploy to local PostgreSQL
    python3 scripts/deploy-postgresql.py --target local

    # Deploy and migrate data from SQLite
    python3 scripts/deploy-postgresql.py --target gke --migrate

    # Test connection
    python3 scripts/deploy-postgresql.py --test

Environment Variables:
    CODITECT_PG_HOST: PostgreSQL host (default: localhost)
    CODITECT_PG_PORT: PostgreSQL port (default: 5432)
    CODITECT_PG_DATABASE: Database name (default: coditect)
    CODITECT_PG_USER: Username (default: coditect)
    CODITECT_PG_PASSWORD: Password

    # For GKE:
    GKE_CLUSTER: GKE cluster name
    GKE_ZONE: GKE zone
    GKE_PROJECT: GCP project ID
    PG_POD_NAME: PostgreSQL pod name (default: postgres-0)
    PG_NAMESPACE: Kubernetes namespace (default: default)

Author: CODITECT Team
Version: 1.0.0
Created: 2025-12-23
"""

import os
import sys
import json
import subprocess
import argparse
import time
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional, Dict, Any

# Make sibling modules in the scripts directory importable (e.g. db_backend).
# Bug fix: the original used Path(file), which raises NameError; the module's
# own path is __file__.
sys.path.insert(0, str(Path(__file__).parent))

# psycopg2 is optional at import time so argument parsing / --help still work
# without it; main() refuses to run when it is missing.
try:
    import psycopg2
    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
    POSTGRES_AVAILABLE = True
except ImportError:
    POSTGRES_AVAILABLE = False
    print("āš ļø psycopg2 not installed. Install with: pip install psycopg2-binary")

def get_schema_path() -> Path:
    """Return the path to the PostgreSQL schema SQL file.

    Resolved relative to this script's parent directory:
    <repo>/internal/architecture/database/POSTGRESQL-PRODUCTION-SCHEMA.sql
    """
    # Bug fix: the original used Path(file), which raises NameError;
    # __file__ is this module's own path.
    script_dir = Path(__file__).parent
    return script_dir.parent / "internal" / "architecture" / "database" / "POSTGRESQL-PRODUCTION-SCHEMA.sql"

def read_schema() -> str:
    """Return the contents of the PostgreSQL schema SQL file.

    Raises:
        FileNotFoundError: if the schema file does not exist on disk.
    """
    schema_path = get_schema_path()
    if schema_path.exists():
        return schema_path.read_text()
    raise FileNotFoundError(f"Schema file not found: {schema_path}")

def get_connection_params(target: str = "local") -> Dict[str, Any]:
    """Build PostgreSQL connection parameters for the given target.

    Reads CODITECT_PG_* environment variables, falling back to local
    defaults. For the "gke" target, host/port are forced to the local end
    of the kubectl port-forward tunnel.
    """
    env = os.environ
    params: Dict[str, Any] = {
        "host": env.get("CODITECT_PG_HOST", "localhost"),
        "port": int(env.get("CODITECT_PG_PORT", "5432")),
        "database": env.get("CODITECT_PG_DATABASE", "coditect"),
        "user": env.get("CODITECT_PG_USER", "coditect"),
        "password": env.get("CODITECT_PG_PASSWORD", ""),
    }

    if target == "gke":
        # Traffic goes through the kubectl port-forward bound on localhost.
        params["host"] = "127.0.0.1"
        params["port"] = 5432

    return params

def setup_gke_port_forward() -> Optional[subprocess.Popen]:
    """Start `kubectl port-forward` to the GKE PostgreSQL pod.

    Returns:
        The background Popen handle on success (caller must terminate it),
        or None when kubectl is missing or the forward fails to start.
    """
    pod_name = os.environ.get("PG_POD_NAME", "postgres-0")
    namespace = os.environ.get("PG_NAMESPACE", "default")

    print(f"šŸ”— Setting up port-forward to {pod_name} in namespace {namespace}...")

    process = None
    try:
        # Start port-forward in background
        process = subprocess.Popen(
            ["kubectl", "port-forward", f"pod/{pod_name}", "5432:5432", "-n", namespace],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        # Wait for port-forward to establish (fixed delay; no readiness probe)
        time.sleep(3)

        # If kubectl already exited, the forward failed; surface its stderr.
        if process.poll() is not None:
            stderr = process.stderr.read().decode()
            raise RuntimeError(f"Port-forward failed: {stderr}")

        print("āœ… Port-forward established")
        return process

    except FileNotFoundError:
        print("āŒ kubectl not found. Please install kubectl and configure GKE access.")
        return None
    except Exception as e:
        print(f"āŒ Port-forward failed: {e}")
        # Bug fix: don't leak a half-started kubectl process on failure.
        if process is not None and process.poll() is None:
            process.terminate()
        return None

def test_connection(params: Dict[str, Any]) -> bool:
    """Test PostgreSQL connectivity and report pgvector availability.

    Returns:
        True when a connection and a basic query succeed, False otherwise.
    """
    print(f"\nšŸ” Testing connection to {params['host']}:{params['port']}...")

    conn = None
    try:
        conn = psycopg2.connect(**params)
        cursor = conn.cursor()

        # Test basic query
        cursor.execute("SELECT version();")
        version = cursor.fetchone()[0]
        print(f"āœ… Connected to PostgreSQL")
        print(f" Version: {version[:80]}...")

        # Check for pgvector extension
        cursor.execute("SELECT * FROM pg_extension WHERE extname = 'vector';")
        if cursor.fetchone():
            print("āœ… pgvector extension installed")
        else:
            print("āš ļø pgvector extension not installed")

        cursor.close()
        return True

    except Exception as e:
        print(f"āŒ Connection failed: {e}")
        return False
    finally:
        # Bug fix: the connection leaked when a query raised mid-way.
        if conn is not None:
            conn.close()

def create_database_if_not_exists(params: Dict[str, Any]) -> bool:
    """Create the target database if it doesn't exist.

    Connects to the maintenance "postgres" database with autocommit enabled
    (CREATE DATABASE cannot run inside a transaction block).
    """
    db_name = params["database"]

    # Connect to postgres database to create new database
    admin_params = params.copy()
    admin_params["database"] = "postgres"

    conn = None
    try:
        conn = psycopg2.connect(**admin_params)
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = conn.cursor()

        # Check if database exists
        cursor.execute(
            "SELECT 1 FROM pg_database WHERE datname = %s;",
            (db_name,)
        )

        if not cursor.fetchone():
            print(f"šŸ“¦ Creating database: {db_name}")
            # Identifier is double-quoted; db_name comes from trusted env/config,
            # not end-user input.
            cursor.execute(f'CREATE DATABASE "{db_name}";')
            print(f"āœ… Database created: {db_name}")
        else:
            print(f"āœ… Database exists: {db_name}")

        cursor.close()
        return True

    except Exception as e:
        print(f"āŒ Database creation failed: {e}")
        return False
    finally:
        # Bug fix: close the admin connection even when an error occurs.
        if conn is not None:
            conn.close()

def install_extensions(params: Dict[str, Any]) -> bool:
    """Install required PostgreSQL extensions (vector, pg_trgm, btree_gin).

    Individual extension failures are reported but do not abort the run.
    Returns False only when the connection itself fails.
    """
    print("\nšŸ“¦ Installing extensions...")

    conn = None
    try:
        conn = psycopg2.connect(**params)
        cursor = conn.cursor()

        extensions = ["vector", "pg_trgm", "btree_gin"]

        for ext in extensions:
            try:
                # Extension names come from the fixed list above, not user input.
                cursor.execute(f"CREATE EXTENSION IF NOT EXISTS {ext};")
                print(f" āœ… {ext}")
            except Exception as e:
                # Bug fix: roll back the aborted transaction so the remaining
                # extensions can still be attempted.
                conn.rollback()
                print(f" āš ļø {ext}: {e}")

        conn.commit()
        cursor.close()
        return True

    except Exception as e:
        print(f"āŒ Extension installation failed: {e}")
        return False
    finally:
        # Bug fix: close the connection even on failure.
        if conn is not None:
            conn.close()

def deploy_schema(params: Dict[str, Any]) -> bool:
    """Deploy the full schema to PostgreSQL.

    Tries a single bulk execute first; if that fails, falls back to a naive
    per-statement split on ';'. NOTE(review): the split breaks dollar-quoted
    function bodies and semicolons inside string literals; "already exists"
    errors are ignored so re-deploys stay quiet.
    """
    print("\nšŸ“‹ Deploying schema...")

    conn = None
    try:
        schema_sql = read_schema()
        print(f" Schema file: {get_schema_path()}")
        print(f" Size: {len(schema_sql):,} bytes")

        conn = psycopg2.connect(**params)
        cursor = conn.cursor()

        # Execute schema in smaller chunks (split by semicolon outside comments)
        # For simplicity, execute as one block with error handling
        try:
            cursor.execute(schema_sql)
            conn.commit()
            print("āœ… Schema deployed successfully")
        except Exception:
            conn.rollback()
            # Try to execute statements one by one
            print(f"āš ļø Bulk deploy failed, trying statement by statement...")

            statements = schema_sql.split(';')
            success = 0
            errors = 0

            for stmt in statements:
                stmt = stmt.strip()
                if not stmt or stmt.startswith('--'):
                    continue

                try:
                    cursor.execute(stmt + ';')
                    conn.commit()
                    success += 1
                except Exception as stmt_error:
                    conn.rollback()
                    # "already exists" is expected on re-deploys; don't count it.
                    if 'already exists' not in str(stmt_error).lower():
                        errors += 1
                        if errors <= 3:  # Only show first few errors
                            print(f" āš ļø {str(stmt_error)[:100]}")

            print(f" Statements executed: {success}, Errors: {errors}")

        cursor.close()
        return True

    except FileNotFoundError as e:
        print(f"āŒ Schema file not found: {e}")
        return False
    except Exception as e:
        print(f"āŒ Schema deployment failed: {e}")
        return False
    finally:
        # Bug fix: close the connection on all exit paths.
        if conn is not None:
            conn.close()

def verify_tables(params: Dict[str, Any]) -> Dict[str, int]:
    """Verify expected tables exist and return per-table row counts.

    Returns:
        Mapping of table name to row count; -1 marks a table that could
        not be queried.
    """
    print("\nšŸ“Š Verifying tables...")

    tables = [
        "organizations", "teams", "team_members", "users",
        "sessions", "messages", "entries", "summaries",
        "decisions", "code_patterns", "error_solutions",
        "doc_index", "knowledge_graph", "knowledge_entities"
    ]

    counts: Dict[str, int] = {}

    conn = None
    try:
        conn = psycopg2.connect(**params)
        cursor = conn.cursor()

        for table in tables:
            try:
                # Table names come from the fixed list above, not user input.
                cursor.execute(f"SELECT COUNT(*) FROM {table};")
                count = cursor.fetchone()[0]
                counts[table] = count
                # COUNT(*) is never negative, so the original `count >= 0`
                # status check was always true — simplified to a plain āœ….
                print(f" āœ… {table}: {count:,} rows")
            except Exception as e:
                # Bug fix: roll back the aborted transaction, otherwise every
                # subsequent table probe fails on the same connection.
                conn.rollback()
                counts[table] = -1
                print(f" āŒ {table}: {e}")

        cursor.close()

    except Exception as e:
        print(f"āŒ Verification failed: {e}")
    finally:
        # Bug fix: close the connection even when verification fails.
        if conn is not None:
            conn.close()

    return counts

def migrate_from_sqlite(params: Dict[str, Any], sqlite_path: str) -> Dict[str, int]:
    """Migrate data from SQLite to PostgreSQL via the db_backend helper.

    Returns:
        The migration stats dict from migrate_sqlite_to_postgres, or
        {"error": <message>} on failure.
    """
    print(f"\nšŸ”„ Migrating from SQLite: {sqlite_path}")

    # Imported lazily so the script runs without db_backend unless --migrate
    # is requested. (Fix: dropped the unused SQLiteBackend / PostgreSQLBackend
    # names from the import.)
    from db_backend import migrate_sqlite_to_postgres

    try:
        stats = migrate_sqlite_to_postgres(sqlite_path, params)
        print(f"\nāœ… Migration complete:")
        print(f" Tables: {stats.get('tables', 0)}")
        print(f" Rows: {stats.get('rows', 0):,}")
        print(f" Errors: {stats.get('errors', 0)}")
        return stats

    except Exception as e:
        print(f"āŒ Migration failed: {e}")
        return {"error": str(e)}

def create_initial_org(params: Dict[str, Any], org_name: str = "CODITECT") -> Optional[str]:
    """Create the initial organization and a default team.

    Idempotent: returns the existing organization id when one with the same
    name already exists. Returns None on failure.
    """
    print(f"\nšŸ‘„ Creating initial organization: {org_name}")

    conn = None
    try:
        conn = psycopg2.connect(**params)
        cursor = conn.cursor()

        # Check if org exists
        cursor.execute("SELECT id FROM organizations WHERE name = %s;", (org_name,))
        existing = cursor.fetchone()

        if existing:
            print(f" Organization exists: {existing[0]}")
            # Bug fix: this early return used to leak the connection;
            # the finally block now closes it.
            return existing[0]

        # Create organization
        cursor.execute("""
            INSERT INTO organizations (name, slug, plan_tier, settings)
            VALUES (%s, %s, 'enterprise', %s)
            RETURNING id;
        """, (
            org_name,
            org_name.lower().replace(" ", "-"),
            json.dumps({
                "features": ["pgvector", "knowledge_graph", "multi_tenant"],
                "created_by": "deploy-postgresql.py",
                "created_at": datetime.now(timezone.utc).isoformat()
            })
        ))

        org_id = cursor.fetchone()[0]
        conn.commit()

        print(f" āœ… Created organization: {org_id}")

        # Create default team
        cursor.execute("""
            INSERT INTO teams (org_id, name, slug)
            VALUES (%s, 'Default Team', 'default')
            RETURNING id;
        """, (org_id,))

        team_id = cursor.fetchone()[0]
        conn.commit()

        print(f" āœ… Created default team: {team_id}")

        cursor.close()
        return org_id

    except Exception as e:
        print(f"āŒ Organization creation failed: {e}")
        return None
    finally:
        # Bug fix: close the connection on every exit path.
        if conn is not None:
            conn.close()

def write_env_file(params: Dict[str, Any], target: str) -> None:
    """Write a sourceable .env.postgresql file for this deployment.

    The file lands next to the repo root (parent of the scripts directory).
    """
    # Bug fix: the original used Path(file), which raises NameError.
    env_path = Path(__file__).parent.parent / ".env.postgresql"

    # Bug fix: comment lines must carry a leading '#' or the file breaks
    # when sourced as shell; one KEY=VALUE assignment per line.
    env_content = f"""# CODITECT PostgreSQL Configuration
# Generated: {datetime.now(timezone.utc).isoformat()}
# Target: {target}

# Backend selection
CODITECT_DB_BACKEND=postgresql

# PostgreSQL connection
CODITECT_PG_HOST={params.get('host', 'localhost')}
CODITECT_PG_PORT={params.get('port', 5432)}
CODITECT_PG_DATABASE={params.get('database', 'coditect')}
CODITECT_PG_USER={params.get('user', 'coditect')}
CODITECT_PG_PASSWORD={params.get('password', '')}
CODITECT_PG_SSL_MODE=require

# Organization context (for RLS)
CODITECT_ORG_ID=

# GKE settings (if applicable)
GKE_CLUSTER={os.environ.get('GKE_CLUSTER', '')}
GKE_ZONE={os.environ.get('GKE_ZONE', '')}
GKE_PROJECT={os.environ.get('GKE_PROJECT', '')}
PG_POD_NAME={os.environ.get('PG_POD_NAME', 'postgres-0')}
PG_NAMESPACE={os.environ.get('PG_NAMESPACE', 'default')}
"""

    env_path.write_text(env_content)
    print(f"\nšŸ“ Environment file written: {env_path}")
    print(" Source it with: source .env.postgresql")

def main():
    """CLI entry point: parse arguments and run the deployment workflow."""
    parser = argparse.ArgumentParser(
        description="Deploy CODITECT PostgreSQL schema",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Test connection
  python3 scripts/deploy-postgresql.py --test

  # Deploy to local PostgreSQL
  python3 scripts/deploy-postgresql.py --target local

  # Deploy to GKE
  python3 scripts/deploy-postgresql.py --target gke

  # Full deployment with migration (ADR-118 Four-Tier Architecture)
  python3 scripts/deploy-postgresql.py --target gke --migrate --sqlite-path ~/PROJECTS/.coditect-data/context-storage/sessions.db
"""
    )

    parser.add_argument("--target", choices=["local", "gke"], default="local",
                        help="Deployment target")
    parser.add_argument("--test", action="store_true",
                        help="Test connection only")
    parser.add_argument("--migrate", action="store_true",
                        help="Migrate data from SQLite (ADR-118: sessions.db or org.db)")
    # ADR-114 & ADR-118: Default to sessions.db in user data directory
    _default_db = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "sessions.db"
    parser.add_argument("--sqlite-path", default=str(_default_db),
                        help="Path to SQLite database for migration (default: sessions.db)")
    parser.add_argument("--create-org", action="store_true",
                        help="Create initial organization")
    parser.add_argument("--org-name", default="CODITECT",
                        help="Organization name")
    parser.add_argument("--skip-schema", action="store_true",
                        help="Skip schema deployment")
    parser.add_argument("--write-env", action="store_true",
                        help="Write environment file")

    args = parser.parse_args()

    if not POSTGRES_AVAILABLE:
        print("āŒ psycopg2 required. Install with: pip install psycopg2-binary pgvector")
        sys.exit(1)

    print("=" * 60)
    print("CODITECT PostgreSQL Deployment")
    print("=" * 60)
    print(f"Target: {args.target}")
    print(f"Time: {datetime.now(timezone.utc).isoformat()}")

    # Set up port-forward for GKE
    port_forward_proc = None
    if args.target == "gke":
        port_forward_proc = setup_gke_port_forward()
        if not port_forward_proc:
            print("āŒ Failed to establish GKE connection")
            sys.exit(1)

    try:
        params = get_connection_params(args.target)

        # Bug fix: the original `if args.test or not test_connection(params)`
        # short-circuited, so --test exited 0 WITHOUT ever testing (and could
        # never report failure). Now --test actually runs the check and the
        # exit code reflects the result.
        connected = test_connection(params)
        if args.test:
            sys.exit(0 if connected else 1)
        if not connected:
            print("āŒ Connection test failed")
            sys.exit(1)

        # Create database
        if not create_database_if_not_exists(params):
            sys.exit(1)

        # Install extensions
        if not install_extensions(params):
            print("āš ļø Some extensions failed to install")

        # Deploy schema
        if not args.skip_schema:
            if not deploy_schema(params):
                print("āš ļø Schema deployment had issues")

        # Verify tables (result is printed; return value unused here)
        verify_tables(params)

        # Create initial organization
        if args.create_org:
            create_initial_org(params, args.org_name)

        # Migrate from SQLite
        if args.migrate:
            sqlite_path = Path(args.sqlite_path)
            if sqlite_path.exists():
                migrate_from_sqlite(params, str(sqlite_path))
            else:
                print(f"āš ļø SQLite database not found: {sqlite_path}")

        # Write environment file
        if args.write_env:
            write_env_file(params, args.target)

        print("\n" + "=" * 60)
        print("āœ… PostgreSQL deployment complete!")
        print("=" * 60)

        if args.target == "gke":
            print("\nšŸ“Œ To use PostgreSQL in your session:")
            print(" export CODITECT_DB_BACKEND=postgresql")
            print(" source .env.postgresql # if generated")

    finally:
        # Cleanup port-forward
        if port_forward_proc:
            print("\nšŸ”Œ Closing port-forward...")
            port_forward_proc.terminate()
            port_forward_proc.wait()

# Bug fix: was `if name == "main"`, which raises NameError at import time;
# the standard module-entry guard compares __name__ against "__main__".
if __name__ == "__main__":
    main()