#!/usr/bin/env python3
"""CODITECT Database Backend Abstraction Layer.

Provides unified interface for SQLite (local) and PostgreSQL (production)
backends. Automatically selects backend based on environment configuration.

Usage:
    from db_backend import get_database, DatabaseBackend

    db = get_database()  # Auto-selects based on CODITECT_DB_BACKEND env var
    db.execute("SELECT * FROM messages WHERE role = %s", ("user",))
    results = db.fetchall()

Environment Variables:
    CODITECT_DB_BACKEND: "sqlite" (default) or "postgresql"
    CODITECT_DB_PATH: SQLite database path (for sqlite backend)
    CODITECT_PG_HOST: PostgreSQL host
    CODITECT_PG_PORT: PostgreSQL port (default: 5432)
    CODITECT_PG_DATABASE: PostgreSQL database name
    CODITECT_PG_USER: PostgreSQL username
    CODITECT_PG_PASSWORD: PostgreSQL password (or use Secret Manager)
    CODITECT_PG_SSL_MODE: SSL mode (default: require)

    # For GKE/Cloud SQL:
    CODITECT_PG_SOCKET_DIR: Cloud SQL socket directory
    CODITECT_PG_INSTANCE: Cloud SQL instance connection name

Author: CODITECT Team
Version: 1.0.0
Created: 2025-12-23
"""

import os
import json
import sqlite3
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from datetime import datetime, timezone
from contextlib import contextmanager

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)  # was getLogger(name): NameError

# Try to import psycopg2 for PostgreSQL (optional production dependency)
try:
    import psycopg2
    import psycopg2.extras
    from psycopg2.pool import ThreadedConnectionPool
    POSTGRES_AVAILABLE = True
except ImportError:
    POSTGRES_AVAILABLE = False
    logger.warning(
        "psycopg2 not installed. PostgreSQL backend unavailable. "
        "Install with: pip install psycopg2-binary"
    )

# Try to import pgvector (optional, enables vector similarity search)
try:
    from pgvector.psycopg2 import register_vector
    PGVECTOR_AVAILABLE = True
except ImportError:
    PGVECTOR_AVAILABLE = False

class DatabaseBackend(ABC):
    """Abstract base class for database backends.

    Defines the cursor-style contract (execute/fetch/commit) that both the
    SQLite and PostgreSQL backends implement, so callers never need to know
    which engine is underneath.
    """

    @abstractmethod
    def connect(self) -> None:
        """Establish database connection."""

    @abstractmethod
    def close(self) -> None:
        """Close database connection."""

    @abstractmethod
    def execute(self, query: str, params: Optional[Tuple] = None) -> Any:
        """Execute a query with optional parameters."""

    @abstractmethod
    def executemany(self, query: str, params_list: List[Tuple]) -> Any:
        """Execute a query with multiple parameter sets."""

    @abstractmethod
    def fetchone(self) -> Optional[Tuple]:
        """Fetch one result from last query."""

    @abstractmethod
    def fetchall(self) -> List[Tuple]:
        """Fetch all results from last query."""

    @abstractmethod
    def fetchmany(self, size: int) -> List[Tuple]:
        """Fetch specified number of results."""

    @abstractmethod
    def commit(self) -> None:
        """Commit current transaction."""

    @abstractmethod
    def rollback(self) -> None:
        """Rollback current transaction."""

    @abstractmethod
    def get_placeholder(self) -> str:
        """Get parameter placeholder style ('?' for SQLite, '%s' for PostgreSQL)."""

    @abstractmethod
    def adapt_query(self, query: str) -> str:
        """Adapt query syntax for specific backend."""

    @contextmanager
    def transaction(self):
        """Context manager for transactions: commit on success, rollback on error."""
        try:
            yield self
            self.commit()
        except Exception:
            self.rollback()
            raise  # bare re-raise preserves the original traceback (was `raise e`)

    def table_exists(self, table_name: str) -> bool:
        """Check if a table exists.

        Base implementation is a no-op stub (returns None); each concrete
        backend overrides it with an engine-specific catalog query.
        """

    @property
    @abstractmethod
    def backend_type(self) -> str:
        """Return backend type identifier."""

class SQLiteBackend(DatabaseBackend):
    """SQLite database backend for local development."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn: Optional[sqlite3.Connection] = None
        self.cursor: Optional[sqlite3.Cursor] = None
        self._last_cursor: Optional[sqlite3.Cursor] = None

    def connect(self) -> None:
        """Establish SQLite connection."""
        # Ensure directory exists
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row  # rows are addressable by column name
        self.cursor = self.conn.cursor()

        # Performance pragmas for local use
        self.cursor.execute("PRAGMA journal_mode=WAL")
        self.cursor.execute("PRAGMA synchronous=NORMAL")
        self.cursor.execute("PRAGMA cache_size=-64000")  # 64MB cache
        self.cursor.execute("PRAGMA temp_store=MEMORY")

        logger.info(f"Connected to SQLite: {self.db_path}")

    def close(self) -> None:
        """Close SQLite connection (safe to call more than once)."""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
            logger.info("SQLite connection closed")
        # Reset handles so a later execute() reconnects instead of
        # touching a closed connection.
        self.conn = None
        self.cursor = None
        self._last_cursor = None

    def execute(self, query: str, params: Optional[Tuple] = None) -> sqlite3.Cursor:
        """Execute query with SQLite, auto-connecting on first use."""
        if not self.conn:
            self.connect()

        adapted_query = self.adapt_query(query)
        # `is not None` (not truthiness): an explicit empty tuple still counts
        # as "params supplied".
        if params is not None:
            self._last_cursor = self.conn.execute(adapted_query, params)
        else:
            self._last_cursor = self.conn.execute(adapted_query)
        return self._last_cursor

    def executemany(self, query: str, params_list: List[Tuple]) -> sqlite3.Cursor:
        """Execute query with multiple parameter sets."""
        if not self.conn:
            self.connect()

        adapted_query = self.adapt_query(query)
        self._last_cursor = self.conn.executemany(adapted_query, params_list)
        return self._last_cursor

    def fetchone(self) -> Optional[Tuple]:
        """Fetch one result from the last executed query."""
        if self._last_cursor:
            return self._last_cursor.fetchone()
        return None

    def fetchall(self) -> List[Tuple]:
        """Fetch all results from the last executed query."""
        if self._last_cursor:
            return self._last_cursor.fetchall()
        return []

    def fetchmany(self, size: int) -> List[Tuple]:
        """Fetch specified number of results."""
        if self._last_cursor:
            return self._last_cursor.fetchmany(size)
        return []

    def commit(self) -> None:
        """Commit transaction."""
        if self.conn:
            self.conn.commit()

    def rollback(self) -> None:
        """Rollback transaction."""
        if self.conn:
            self.conn.rollback()

    def get_placeholder(self) -> str:
        """SQLite uses ? placeholders."""
        return "?"

    def adapt_query(self, query: str) -> str:
        """Adapt PostgreSQL-style queries to SQLite.

        Conversions:
        - %s -> ?
        - NOW() / CURRENT_TIMESTAMP -> datetime('now')
        - ILIKE -> LIKE (SQLite LIKE is case-insensitive for ASCII)
        - PostgreSQL arrays are expected as JSON text in SQLite; complex
          arrays need manual conversion by the caller.
        """
        adapted = query.replace("%s", "?")
        adapted = adapted.replace("NOW()", "datetime('now')")
        adapted = adapted.replace("CURRENT_TIMESTAMP", "datetime('now')")
        adapted = adapted.replace(" ILIKE ", " LIKE ")
        return adapted

    def table_exists(self, table_name: str) -> bool:
        """Check if table exists in SQLite."""
        self.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,)
        )
        return self.fetchone() is not None

    @property
    def backend_type(self) -> str:
        return "sqlite"

    @property
    def lastrowid(self) -> Optional[int]:
        """Get last inserted row ID (None if nothing executed yet)."""
        if self._last_cursor:
            return self._last_cursor.lastrowid
        return None

class PostgreSQLBackend(DatabaseBackend):
    """PostgreSQL database backend for production.

    Uses a ThreadedConnectionPool; one connection is checked out at
    connect() and returned at close(). Results are RealDict rows (dicts).
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 5432,
        database: str = "coditect",
        user: str = "coditect",
        password: Optional[str] = None,
        ssl_mode: str = "require",
        socket_dir: Optional[str] = None,
        instance_connection: Optional[str] = None,
        pool_min: int = 1,
        pool_max: int = 10,
        org_id: Optional[str] = None  # For multi-tenant RLS
    ):
        if not POSTGRES_AVAILABLE:
            raise ImportError("psycopg2 not available. Install with: pip install psycopg2-binary")

        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.ssl_mode = ssl_mode
        self.socket_dir = socket_dir
        self.instance_connection = instance_connection
        self.pool_min = pool_min
        self.pool_max = pool_max
        self.org_id = org_id

        self.pool = None  # ThreadedConnectionPool once connected
        self.conn = None
        self._last_cursor = None

    def _get_connection_params(self) -> Dict[str, Any]:
        """Build psycopg2 connection parameters.

        Cloud SQL (socket_dir + instance_connection) connects via Unix
        socket and skips sslmode; TCP connections use host/port/sslmode.
        """
        params = {
            "database": self.database,
            "user": self.user,
        }

        if self.password:
            params["password"] = self.password

        if self.socket_dir and self.instance_connection:
            # Cloud SQL connection via Unix socket
            params["host"] = f"{self.socket_dir}/{self.instance_connection}"
        else:
            params["host"] = self.host
            params["port"] = self.port
            params["sslmode"] = self.ssl_mode

        return params

    def connect(self) -> None:
        """Establish PostgreSQL connection pool and check out one connection."""
        params = self._get_connection_params()

        try:
            self.pool = ThreadedConnectionPool(
                self.pool_min,
                self.pool_max,
                **params
            )

            self.conn = self.pool.getconn()
            self.conn.autocommit = False

            # Register pgvector adapter if available
            if PGVECTOR_AVAILABLE:
                register_vector(self.conn)
                logger.info("pgvector extension registered")

            # Set organization context for Row-Level Security if provided
            if self.org_id:
                with self.conn.cursor() as cur:
                    cur.execute("SET app.current_org_id = %s", (self.org_id,))

            logger.info(f"Connected to PostgreSQL: {self.database}@{self.host}")

        except Exception as e:
            logger.error(f"PostgreSQL connection failed: {e}")
            raise

    def close(self) -> None:
        """Return the connection and close the pool (safe to call twice)."""
        if self.conn and self.pool:
            self.pool.putconn(self.conn)
        if self.pool:
            self.pool.closeall()
            logger.info("PostgreSQL connection closed")
        # Reset so a later execute() reconnects cleanly.
        self.conn = None
        self.pool = None
        self._last_cursor = None

    def execute(self, query: str, params: Optional[Tuple] = None) -> Any:
        """Execute query; rows come back as dicts (RealDictCursor)."""
        if not self.conn:
            self.connect()

        self._last_cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

        try:
            if params is not None:
                self._last_cursor.execute(query, params)
            else:
                self._last_cursor.execute(query)
            return self._last_cursor
        except Exception as e:
            logger.error(f"Query execution failed: {e}\nQuery: {query[:200]}")
            raise

    def executemany(self, query: str, params_list: List[Tuple]) -> Any:
        """Execute query with multiple parameter sets."""
        if not self.conn:
            self.connect()

        self._last_cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

        try:
            self._last_cursor.executemany(query, params_list)
            return self._last_cursor
        except Exception as e:
            logger.error(f"Batch execution failed: {e}")
            raise

    def fetchone(self) -> Optional[Dict]:
        """Fetch one result as dict."""
        if self._last_cursor:
            return self._last_cursor.fetchone()
        return None

    def fetchall(self) -> List[Dict]:
        """Fetch all results as list of dicts."""
        if self._last_cursor:
            return self._last_cursor.fetchall()
        return []

    def fetchmany(self, size: int) -> List[Dict]:
        """Fetch specified number of results."""
        if self._last_cursor:
            return self._last_cursor.fetchmany(size)
        return []

    def commit(self) -> None:
        """Commit transaction."""
        if self.conn:
            self.conn.commit()

    def rollback(self) -> None:
        """Rollback transaction."""
        if self.conn:
            self.conn.rollback()

    def get_placeholder(self) -> str:
        """PostgreSQL uses %s placeholders."""
        return "%s"

    def adapt_query(self, query: str) -> str:
        """PostgreSQL queries don't need adaptation (native format)."""
        return query

    def table_exists(self, table_name: str) -> bool:
        """Check if table exists in the public schema."""
        self.execute(
            """
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_name = %s
            )
            """,
            (table_name,)
        )
        result = self.fetchone()
        # bool() so callers always get True/False, never None.
        return bool(result and result.get('exists', False))

    @property
    def backend_type(self) -> str:
        return "postgresql"

    def execute_returning(self, query: str, params: Optional[Tuple] = None) -> Optional[int]:
        """Execute INSERT with RETURNING clause and return the id."""
        self.execute(query, params)
        result = self.fetchone()
        if result:
            return result.get('id')
        return None

    def set_org_context(self, org_id: str) -> None:
        """Set organization context for Row-Level Security."""
        self.org_id = org_id
        if self.conn:
            with self.conn.cursor() as cur:
                cur.execute("SET app.current_org_id = %s", (org_id,))

    def semantic_search(
        self,
        embedding: List[float],
        table: str = "messages",
        limit: int = 10,
        threshold: float = 0.7
    ) -> List[Dict]:
        """Perform semantic search using pgvector cosine distance.

        SECURITY: `table` is interpolated into the SQL string — it must
        come from trusted code, never from user input.
        """
        if not PGVECTOR_AVAILABLE:
            logger.warning("pgvector not available for semantic search")
            return []

        query = f"""
            SELECT *, 1 - (embedding <=> %s::vector) as similarity
            FROM {table}
            WHERE embedding IS NOT NULL
            AND 1 - (embedding <=> %s::vector) > %s
            ORDER BY embedding <=> %s::vector
            LIMIT %s
        """

        self.execute(query, (embedding, embedding, threshold, embedding, limit))
        return self.fetchall()

def get_database(
    backend: Optional[str] = None,
    **kwargs
) -> DatabaseBackend:
    """Factory function to get appropriate database backend.

    Args:
        backend: "sqlite" or "postgresql". If None, uses CODITECT_DB_BACKEND env var.
        **kwargs: Backend-specific configuration parameters.

    Returns:
        DatabaseBackend instance (SQLite or PostgreSQL), not yet connected.

    Raises:
        ImportError: if "postgresql" is requested but psycopg2 is missing.

    Environment Variables:
        CODITECT_DB_BACKEND: "sqlite" (default) or "postgresql"

        For SQLite:
            CODITECT_DB_PATH: Database file path

        For PostgreSQL:
            CODITECT_PG_HOST: Host
            CODITECT_PG_PORT: Port (default 5432)
            CODITECT_PG_DATABASE: Database name
            CODITECT_PG_USER: Username
            CODITECT_PG_PASSWORD: Password
            CODITECT_PG_SSL_MODE: SSL mode
            CODITECT_PG_SOCKET_DIR: Cloud SQL socket dir
            CODITECT_PG_INSTANCE: Cloud SQL instance
            CODITECT_ORG_ID: Organization ID for RLS
    """
    backend_type = backend or os.environ.get("CODITECT_DB_BACKEND", "sqlite")

    if backend_type == "postgresql":
        if not POSTGRES_AVAILABLE:
            raise ImportError(
                "PostgreSQL backend requested but psycopg2 not installed. "
                "Install with: pip install psycopg2-binary pgvector"
            )

        # kwargs take precedence over environment variables.
        return PostgreSQLBackend(
            host=kwargs.get("host") or os.environ.get("CODITECT_PG_HOST", "localhost"),
            port=int(kwargs.get("port") or os.environ.get("CODITECT_PG_PORT", "5432")),
            database=kwargs.get("database") or os.environ.get("CODITECT_PG_DATABASE", "coditect"),
            user=kwargs.get("user") or os.environ.get("CODITECT_PG_USER", "coditect"),
            password=kwargs.get("password") or os.environ.get("CODITECT_PG_PASSWORD"),
            ssl_mode=kwargs.get("ssl_mode") or os.environ.get("CODITECT_PG_SSL_MODE", "require"),
            socket_dir=kwargs.get("socket_dir") or os.environ.get("CODITECT_PG_SOCKET_DIR"),
            instance_connection=kwargs.get("instance") or os.environ.get("CODITECT_PG_INSTANCE"),
            pool_min=kwargs.get("pool_min", 1),
            pool_max=kwargs.get("pool_max", 10),
            org_id=kwargs.get("org_id") or os.environ.get("CODITECT_ORG_ID")
        )

    # SQLite (default backend)
    # ADR-114 & ADR-118: Use centralized path discovery for user data.
    # Default to sessions.db (Tier 3) unless explicitly specified.
    script_dir = Path(__file__).parent
    try:
        import sys
        sys.path.insert(0, str(script_dir / "core"))
        # Both names imported on purpose: either missing should trigger the
        # backward-compatibility fallback below.
        from paths import get_sessions_db_path, SESSIONS_DB  # noqa: F401
        default_path = SESSIONS_DB
    except ImportError:
        # Fallback for backward compatibility
        _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
        if _user_data.exists():
            default_path = _user_data / "sessions.db"
        else:
            # Legacy fallback
            default_path = script_dir.parent / "context-storage" / "sessions.db"

    db_path = kwargs.get("db_path") or os.environ.get("CODITECT_DB_PATH", str(default_path))

    return SQLiteBackend(db_path=db_path)

class QueryBuilder:
    """Cross-backend SQL query builder.

    Generates SQL that works with both SQLite and PostgreSQL by using the
    backend's placeholder style and branching on backend_type where syntax
    differs (RETURNING, FTS, vectors).
    """

    def __init__(self, backend: "DatabaseBackend"):
        self.backend = backend
        self.placeholder = backend.get_placeholder()

    def insert(
        self,
        table: str,
        columns: List[str],
        returning: Optional[str] = None
    ) -> str:
        """Build INSERT query.

        `returning` is honored only on PostgreSQL; SQLite callers should use
        the backend's lastrowid instead.
        """
        placeholders = ", ".join([self.placeholder] * len(columns))
        cols = ", ".join(columns)

        query = f"INSERT INTO {table} ({cols}) VALUES ({placeholders})"

        if returning and self.backend.backend_type == "postgresql":
            query += f" RETURNING {returning}"

        return query

    def select(
        self,
        table: str,
        columns: Optional[List[str]] = None,
        where: Optional[str] = None,
        order_by: Optional[str] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None
    ) -> str:
        """Build SELECT query; columns defaults to * (None sentinel replaces
        the former mutable-default list, same output for callers)."""
        cols = ", ".join(columns) if columns else "*"
        query = f"SELECT {cols} FROM {table}"

        if where:
            query += f" WHERE {where}"
        if order_by:
            query += f" ORDER BY {order_by}"
        if limit:
            query += f" LIMIT {limit}"
        if offset:
            query += f" OFFSET {offset}"

        return query

    def update(
        self,
        table: str,
        columns: List[str],
        where: str
    ) -> str:
        """Build UPDATE query."""
        sets = ", ".join([f"{col} = {self.placeholder}" for col in columns])
        return f"UPDATE {table} SET {sets} WHERE {where}"

    def delete(self, table: str, where: str) -> str:
        """Build DELETE query."""
        return f"DELETE FROM {table} WHERE {where}"

    def fts_search(
        self,
        table: str,
        fts_table: str,
        query_column: str,
        columns: Optional[List[str]] = None
    ) -> str:
        """Build full-text search query (backend-specific).

        SQLite uses FTS5 MATCH against `fts_table`; PostgreSQL uses
        tsvector/tsquery ranking against `table`.
        """
        cols = ", ".join(columns) if columns else "*"

        if self.backend.backend_type == "sqlite":
            return f"""
            SELECT {cols}, rank
            FROM {fts_table}
            WHERE {query_column} MATCH {self.placeholder}
            ORDER BY rank
            """
        else:  # PostgreSQL
            return f"""
            SELECT {cols},
                ts_rank(to_tsvector('english', {query_column}),
                        plainto_tsquery('english', {self.placeholder})) as rank
            FROM {table}
            WHERE to_tsvector('english', {query_column}) @@
                plainto_tsquery('english', {self.placeholder})
            ORDER BY rank DESC
            """

    def vector_search(
        self,
        table: str,
        embedding_column: str = "embedding",
        columns: Optional[List[str]] = None,
        threshold: float = 0.7
    ) -> str:
        """Build vector similarity search query (PostgreSQL only).

        Raises:
            NotImplementedError: when the backend is not PostgreSQL.
        """
        if self.backend.backend_type != "postgresql":
            raise NotImplementedError("Vector search only available with PostgreSQL + pgvector")

        cols = ", ".join(columns) if columns else "*"
        return f"""
        SELECT {cols},
            1 - ({embedding_column} <=> {self.placeholder}::vector) as similarity
        FROM {table}
        WHERE {embedding_column} IS NOT NULL
        AND 1 - ({embedding_column} <=> {self.placeholder}::vector) > {threshold}
        ORDER BY {embedding_column} <=> {self.placeholder}::vector
        LIMIT {self.placeholder}
        """

# ---------------------------------------------------------------------------
# Convenience functions for common operations
# ---------------------------------------------------------------------------

def init_database(backend: Optional[str] = None, **kwargs) -> DatabaseBackend:
    """Initialize and return a connected database backend.

    Thin wrapper: builds the backend via get_database() and calls connect().
    """
    db = get_database(backend, **kwargs)
    db.connect()
    return db

def migrate_sqlite_to_postgres(
    sqlite_path: str,
    pg_config: Dict[str, Any],
    batch_size: int = 1000
) -> Dict[str, int]:
    """Migrate data from SQLite to PostgreSQL.

    Target tables must already exist in PostgreSQL; rows that conflict are
    skipped (ON CONFLICT DO NOTHING). Per-row failures are counted, not fatal.

    Args:
        sqlite_path: Path to SQLite database
        pg_config: PostgreSQL connection parameters
        batch_size: Number of rows per batch

    Returns:
        Dict with migration statistics ("tables", "rows", "errors")
    """
    stats = {"tables": 0, "rows": 0, "errors": 0}

    # Connect to both databases
    sqlite_db = SQLiteBackend(sqlite_path)
    sqlite_db.connect()

    pg_db = PostgreSQLBackend(**pg_config)
    pg_db.connect()

    try:
        # Get list of user tables from SQLite (skip internal sqlite_* tables)
        sqlite_db.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
        )
        tables = [row['name'] for row in sqlite_db.fetchall()]

        for table in tables:
            # Skip FTS virtual tables before doing any per-table work
            # (previously the skip ran after fetching column info).
            if table.endswith('_fts') or table.endswith('_content'):
                continue

            logger.info(f"Migrating table: {table}")

            try:
                # Get column info
                sqlite_db.execute(f"PRAGMA table_info({table})")
                columns = [row['name'] for row in sqlite_db.fetchall()]

                # Count rows
                sqlite_db.execute(f"SELECT COUNT(*) as count FROM {table}")
                total = sqlite_db.fetchone()['count']

                # Loop-invariant SQL fragments, hoisted out of the batch loop
                placeholders = ", ".join(["%s"] * len(columns))
                cols = ", ".join(columns)

                # Migrate in batches
                offset = 0
                while offset < total:
                    sqlite_db.execute(
                        f"SELECT * FROM {table} LIMIT {batch_size} OFFSET {offset}"
                    )
                    rows = sqlite_db.fetchall()

                    if not rows:
                        break

                    # Insert into PostgreSQL row by row so a single bad row
                    # doesn't abort the whole batch.
                    for row in rows:
                        try:
                            values = tuple(row[col] for col in columns)
                            pg_db.execute(
                                f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) ON CONFLICT DO NOTHING",
                                values
                            )
                            stats["rows"] += 1
                        except Exception as e:
                            logger.warning(f"Row insert error in {table}: {e}")
                            stats["errors"] += 1

                    pg_db.commit()
                    offset += batch_size
                    logger.info(f" Migrated {min(offset, total)}/{total} rows")

                stats["tables"] += 1

            except Exception as e:
                logger.error(f"Table migration error for {table}: {e}")
                stats["errors"] += 1
                pg_db.rollback()

        logger.info(f"Migration complete: {stats}")

    finally:
        sqlite_db.close()
        pg_db.close()

    return stats

if __name__ == "__main__":  # was `if name == "main":` — never ran
    import argparse

    parser = argparse.ArgumentParser(description="CODITECT Database Backend")
    parser.add_argument("--backend", choices=["sqlite", "postgresql"], default="sqlite",
                        help="Database backend to use")
    parser.add_argument("--test", action="store_true", help="Run connection test")
    parser.add_argument("--migrate", action="store_true", help="Migrate SQLite to PostgreSQL")
    parser.add_argument("--sqlite-path", help="SQLite database path for migration")

    args = parser.parse_args()

    if args.test:
        print(f"Testing {args.backend} backend...")
        try:
            db = get_database(args.backend)
            db.connect()

            # Version query differs per engine
            if db.backend_type == "sqlite":
                db.execute("SELECT sqlite_version()")
            else:
                db.execute("SELECT version()")

            result = db.fetchone()
            print(f"✅ Connection successful!")
            print(f"   Backend: {db.backend_type}")
            print(f"   Version: {result}")

            db.close()
        except Exception as e:
            print(f"❌ Connection failed: {e}")

    elif args.migrate:
        if not args.sqlite_path:
            print("❌ --sqlite-path required for migration")
        else:
            print(f"Migrating from SQLite to PostgreSQL...")
            stats = migrate_sqlite_to_postgres(
                args.sqlite_path,
                {
                    "host": os.environ.get("CODITECT_PG_HOST", "localhost"),
                    "database": os.environ.get("CODITECT_PG_DATABASE", "coditect"),
                    "user": os.environ.get("CODITECT_PG_USER", "coditect"),
                    "password": os.environ.get("CODITECT_PG_PASSWORD"),
                }
            )
            print(f"Migration stats: {json.dumps(stats, indent=2)}")

    else:
        print("CODITECT Database Backend")
        print(f"PostgreSQL available: {POSTGRES_AVAILABLE}")
        print(f"pgvector available: {PGVECTOR_AVAILABLE}")
        print(f"\nUsage:")
        print(f"  python db_backend.py --test --backend sqlite")
        print(f"  python db_backend.py --test --backend postgresql")
        print(f"  python db_backend.py --migrate --sqlite-path /path/to/sessions.db")