#!/usr/bin/env python3
"""
ADR-151 Context Graph Evolution - Phase 1 Schema Migration

Creates knowledge graph tables in org.db (Tier 2 - CRITICAL):

  - kg_nodes: Universal entity table for all CODITECT entities
  - kg_edges: Typed relationships between entities
  - kg_nodes_fts: FTS5 full-text search over nodes

This migration is IDEMPOTENT - safe to run multiple times.

Usage:
    python3 scripts/migrations/migrate-to-kg-schema.py
    python3 scripts/migrations/migrate-to-kg-schema.py --dry-run
    python3 scripts/migrations/migrate-to-kg-schema.py --verify-only

Tasks:
    CP-01: Create kg_nodes table
    CP-02: Create kg_edges table
    CP-03: Create kg_nodes_fts FTS5 virtual table
    CP-04: Add indices

ADR References:
    - ADR-151: Context Graph Evolution Architecture
    - ADR-118: Four-Tier Database Architecture
    - ADR-148: Database Schema Documentation Standard

Created: 2026-02-03
Author: Claude (Opus 4.5)
Track: J (Memory Intelligence)
Task: J.3.3.1-J.3.3.5
"""

import argparse
import json
import logging
import shutil
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Handle both module import and direct script execution: when run directly,
# the repository root is not on sys.path, so derive it from this file's
# location (scripts/migrations/ -> repo root) and retry the package import.
try:
    from scripts.core.paths import get_org_db_path, get_context_storage_dir
except ModuleNotFoundError:
    _script_dir = Path(__file__).resolve().parent
    _core_root = _script_dir.parent.parent
    if str(_core_root) not in sys.path:
        sys.path.insert(0, str(_core_root))
    from scripts.core.paths import get_org_db_path, get_context_storage_dir

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# =============================================================================
# Schema Definitions (ADR-151 Phase 1)
# =============================================================================

# CP-01: kg_nodes table
KG_NODES_SCHEMA = """
CREATE TABLE IF NOT EXISTS kg_nodes (
    -- Primary identification
    id TEXT PRIMARY KEY,            -- UUID format: "{type}:{uuid}" e.g., "decision:42"

    -- Node classification (ADR-151 Entity Types)
    node_type TEXT NOT NULL,        -- component, session, decision, error_solution,
                                    -- skill_learning, file, function, track, adr,
                                    -- policy, audit_event
    subtype TEXT,                   -- For component: agent/skill/command/script/hook
                                    -- For file: python/markdown/yaml/json

    -- Human-readable identification
    name TEXT NOT NULL,             -- Display name for the entity

    -- Flexible attributes (ADR-148 documentation)
    properties TEXT,                -- JSON properties bag for type-specific data
                                    -- decisions: decision_type, confidence, rationale
                                    -- components: capabilities, invocation_patterns
                                    -- files: path, language, lines_of_code

    -- Semantic search support
    embedding BLOB,                 -- 1536-dim float32 vector (6144 bytes)
                                    -- Generated via OpenAI ada-002 or local model

    -- Multi-tenant isolation (ADR-053)
    tenant_id TEXT,                 -- Tenant UUID for multi-org isolation
    project_id TEXT,                -- Project UUID within tenant

    -- Audit trail
    created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
    updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),

    -- Source tracking for provenance
    source_table TEXT,              -- Original table if migrated (decisions, error_solutions, etc.)
    source_id TEXT                  -- Original ID in source table
);
"""

# CP-02: kg_edges table
KG_EDGES_SCHEMA = """
CREATE TABLE IF NOT EXISTS kg_edges (
    -- Primary identification
    id TEXT PRIMARY KEY,            -- UUID format

    -- Edge classification (ADR-151 Edge Types)
    edge_type TEXT NOT NULL,        -- INVOKES, PRODUCES, SOLVES, BELONGS_TO, DEFINES,
                                    -- SIMILAR_TO, REFERENCES, CALLS, USES,
                                    -- GOVERNED_BY, CREATED_BY

    -- Graph structure
    from_node TEXT NOT NULL,        -- Source node_id
    to_node TEXT NOT NULL,          -- Target node_id

    -- Edge properties
    properties TEXT,                -- JSON properties bag
                                    -- weight: relevance/confidence score (0.0-1.0)
                                    -- context: why this relationship exists
                                    -- metadata: type-specific attributes

    -- Multi-tenant isolation
    tenant_id TEXT,

    -- Audit trail
    created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),

    -- Referential integrity
    FOREIGN KEY (from_node) REFERENCES kg_nodes(id) ON DELETE CASCADE,
    FOREIGN KEY (to_node) REFERENCES kg_nodes(id) ON DELETE CASCADE,

    -- Prevent duplicate edges of same type between same nodes
    UNIQUE(from_node, to_node, edge_type)
);
"""

# CP-03: kg_nodes_fts FTS5 virtual table.
# External-content table: rows are stored in kg_nodes; FTS indexes only
# name/properties and maps back via the kg_nodes rowid.
KG_NODES_FTS_SCHEMA = """
CREATE VIRTUAL TABLE IF NOT EXISTS kg_nodes_fts USING fts5(
    name,
    properties,
    content='kg_nodes',
    content_rowid='rowid'
);
"""

# FTS5 synchronization triggers: keep kg_nodes_fts in lockstep with kg_nodes.
# The external-content pattern requires inserting a special 'delete' record
# into the FTS table on DELETE/UPDATE of the content table.
KG_NODES_FTS_TRIGGERS = [
    # Insert trigger
    """
    CREATE TRIGGER IF NOT EXISTS kg_nodes_fts_insert AFTER INSERT ON kg_nodes BEGIN
        INSERT INTO kg_nodes_fts(rowid, name, properties)
        VALUES (NEW.rowid, NEW.name, NEW.properties);
    END;
    """,
    # Delete trigger
    """
    CREATE TRIGGER IF NOT EXISTS kg_nodes_fts_delete AFTER DELETE ON kg_nodes BEGIN
        INSERT INTO kg_nodes_fts(kg_nodes_fts, rowid, name, properties)
        VALUES ('delete', OLD.rowid, OLD.name, OLD.properties);
    END;
    """,
    # Update trigger: remove the old FTS entry, then add the new one
    """
    CREATE TRIGGER IF NOT EXISTS kg_nodes_fts_update AFTER UPDATE ON kg_nodes BEGIN
        INSERT INTO kg_nodes_fts(kg_nodes_fts, rowid, name, properties)
        VALUES ('delete', OLD.rowid, OLD.name, OLD.properties);
        INSERT INTO kg_nodes_fts(rowid, name, properties)
        VALUES (NEW.rowid, NEW.name, NEW.properties);
    END;
    """,
]

# CP-04: Indices, as (name, SQL) pairs so verification can check by name.
KG_INDICES = [
    # Node indices
    ("idx_kg_nodes_type", "CREATE INDEX IF NOT EXISTS idx_kg_nodes_type ON kg_nodes(node_type);"),
    ("idx_kg_nodes_subtype", "CREATE INDEX IF NOT EXISTS idx_kg_nodes_subtype ON kg_nodes(subtype);"),
    ("idx_kg_nodes_tenant", "CREATE INDEX IF NOT EXISTS idx_kg_nodes_tenant ON kg_nodes(tenant_id);"),
    ("idx_kg_nodes_project", "CREATE INDEX IF NOT EXISTS idx_kg_nodes_project ON kg_nodes(tenant_id, project_id);"),
    ("idx_kg_nodes_source", "CREATE INDEX IF NOT EXISTS idx_kg_nodes_source ON kg_nodes(source_table, source_id);"),
    ("idx_kg_nodes_updated", "CREATE INDEX IF NOT EXISTS idx_kg_nodes_updated ON kg_nodes(updated_at);"),

    # Edge indices
    ("idx_kg_edges_type", "CREATE INDEX IF NOT EXISTS idx_kg_edges_type ON kg_edges(edge_type);"),
    ("idx_kg_edges_from", "CREATE INDEX IF NOT EXISTS idx_kg_edges_from ON kg_edges(from_node);"),
    ("idx_kg_edges_to", "CREATE INDEX IF NOT EXISTS idx_kg_edges_to ON kg_edges(to_node);"),
    ("idx_kg_edges_tenant", "CREATE INDEX IF NOT EXISTS idx_kg_edges_tenant ON kg_edges(tenant_id);"),
    ("idx_kg_edges_pair", "CREATE INDEX IF NOT EXISTS idx_kg_edges_pair ON kg_edges(from_node, to_node);"),
]

# =============================================================================
# Migration Functions
# =============================================================================

def backup_database(db_path: Path, backup_dir: Optional[Path] = None) -> Path:
    """
    Create a timestamped backup copy of the database before migration.

    Args:
        db_path: Path to the database file
        backup_dir: Optional directory for the backup (defaults to the
            database's own directory)

    Returns:
        Path to the backup file

    Raises:
        FileNotFoundError: If db_path does not exist
    """
    if not db_path.exists():
        raise FileNotFoundError(f"Database not found: {db_path}")

    # UTC timestamp keeps backup names sortable and unambiguous
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    backup_name = f"{db_path.stem}_backup_{timestamp}{db_path.suffix}"

    if backup_dir:
        backup_dir.mkdir(parents=True, exist_ok=True)
        backup_path = backup_dir / backup_name
    else:
        backup_path = db_path.parent / backup_name

    # copy2 preserves file metadata (timestamps) along with contents
    shutil.copy2(db_path, backup_path)
    logger.info(f"Created backup: {backup_path}")

    return backup_path

def get_existing_tables(conn: sqlite3.Connection) -> List[str]:
    """Return names of user tables in the database (excludes sqlite_* internals)."""
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
    )
    return [row[0] for row in cursor.fetchall()]

def get_existing_indices(conn: sqlite3.Connection) -> List[str]:
    """Return names of user-created indices (excludes sqlite_* auto-indices)."""
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%'"
    )
    return [row[0] for row in cursor.fetchall()]

def get_existing_triggers(conn: sqlite3.Connection) -> List[str]:
    """Return names of all triggers defined in the database."""
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='trigger'"
    )
    return [row[0] for row in cursor.fetchall()]

def verify_kg_schema(conn: sqlite3.Connection) -> Dict[str, bool]:
    """
    Verify that the KG schema exists and is complete.

    Args:
        conn: Open connection to org.db

    Returns:
        Dict mapping each schema component (kg_nodes, kg_edges, kg_nodes_fts,
        fts_triggers, indices) to True if present
    """
    results = {
        "kg_nodes": False,
        "kg_edges": False,
        "kg_nodes_fts": False,
        "fts_triggers": False,
        "indices": False,
    }

    existing_tables = get_existing_tables(conn)
    existing_indices = get_existing_indices(conn)
    existing_triggers = get_existing_triggers(conn)

    # Check tables
    results["kg_nodes"] = "kg_nodes" in existing_tables
    results["kg_edges"] = "kg_edges" in existing_tables

    # Check FTS5 virtual table (shows as a table in sqlite_master)
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='kg_nodes_fts'"
    )
    results["kg_nodes_fts"] = cursor.fetchone() is not None

    # Check triggers: all three sync triggers must exist
    expected_triggers = {"kg_nodes_fts_insert", "kg_nodes_fts_delete", "kg_nodes_fts_update"}
    results["fts_triggers"] = expected_triggers.issubset(set(existing_triggers))

    # Check indices against the declared KG_INDICES names
    expected_indices = {name for name, _ in KG_INDICES}
    results["indices"] = expected_indices.issubset(set(existing_indices))

    return results

def create_kg_nodes_table(conn: sqlite3.Connection, dry_run: bool = False) -> bool:
    """
    CP-01: Create the kg_nodes table.

    Args:
        conn: Database connection
        dry_run: If True, only log what would be done

    Returns:
        True if the table was created (or would be, in dry-run mode);
        False on SQLite error
    """
    logger.info("CP-01: Creating kg_nodes table...")

    if dry_run:
        logger.info("[DRY RUN] Would execute kg_nodes schema")
        return True

    try:
        conn.execute(KG_NODES_SCHEMA)
        conn.commit()
        logger.info("CP-01: kg_nodes table created successfully")
        return True
    except sqlite3.Error as e:
        logger.error(f"CP-01: Failed to create kg_nodes table: {e}")
        return False

def create_kg_edges_table(conn: sqlite3.Connection, dry_run: bool = False) -> bool:
    """
    CP-02: Create the kg_edges table.

    Args:
        conn: Database connection
        dry_run: If True, only log what would be done

    Returns:
        True if the table was created (or would be, in dry-run mode);
        False on SQLite error
    """
    logger.info("CP-02: Creating kg_edges table...")

    if dry_run:
        logger.info("[DRY RUN] Would execute kg_edges schema")
        return True

    try:
        conn.execute(KG_EDGES_SCHEMA)
        conn.commit()
        logger.info("CP-02: kg_edges table created successfully")
        return True
    except sqlite3.Error as e:
        logger.error(f"CP-02: Failed to create kg_edges table: {e}")
        return False

def create_kg_nodes_fts(conn: sqlite3.Connection, dry_run: bool = False) -> bool:
    """
    CP-03: Create the kg_nodes_fts FTS5 virtual table and its sync triggers.

    Args:
        conn: Database connection
        dry_run: If True, only log what would be done

    Returns:
        True if the FTS5 table and all triggers were created;
        False on SQLite error (e.g. FTS5 not compiled in)
    """
    logger.info("CP-03: Creating kg_nodes_fts FTS5 virtual table...")

    if dry_run:
        logger.info("[DRY RUN] Would create FTS5 table and 3 sync triggers")
        return True

    try:
        # Create FTS5 virtual table
        conn.execute(KG_NODES_FTS_SCHEMA)

        # Create sync triggers (insert/delete/update)
        for trigger_sql in KG_NODES_FTS_TRIGGERS:
            conn.execute(trigger_sql)

        conn.commit()
        logger.info("CP-03: kg_nodes_fts and sync triggers created successfully")
        return True
    except sqlite3.Error as e:
        logger.error(f"CP-03: Failed to create FTS5 table: {e}")
        return False

def create_kg_indices(conn: sqlite3.Connection, dry_run: bool = False) -> bool:
    """
    CP-04: Create indices for kg_nodes and kg_edges.

    Args:
        conn: Database connection
        dry_run: If True, only log what would be done

    Returns:
        True if all indices were created; False on the first SQLite error
    """
    logger.info(f"CP-04: Creating {len(KG_INDICES)} indices...")

    if dry_run:
        for name, _ in KG_INDICES:
            logger.info(f"[DRY RUN] Would create index: {name}")
        return True

    try:
        for name, sql in KG_INDICES:
            conn.execute(sql)
            logger.debug(f"Created index: {name}")

        conn.commit()
        logger.info(f"CP-04: {len(KG_INDICES)} indices created successfully")
        return True
    except sqlite3.Error as e:
        logger.error(f"CP-04: Failed to create indices: {e}")
        return False

def run_migration(
    db_path: Optional[Path] = None,
    dry_run: bool = False,
    skip_backup: bool = False,
    verify_only: bool = False,
) -> Tuple[bool, Dict[str, bool]]:
    """
    Run the complete ADR-151 Phase 1 migration.

    The migration is idempotent: components that already exist are skipped,
    and a timestamped backup is taken before any change (unless skipped).

    Args:
        db_path: Path to org.db (defaults to ADR-118 location)
        dry_run: If True, only log what would be done
        skip_backup: If True, skip database backup
        verify_only: If True, only verify schema without changes

    Returns:
        Tuple of (success, verification_results)
    """
    # Get database path
    if db_path is None:
        db_path = get_org_db_path()

    logger.info(f"ADR-151 Phase 1 Migration: {db_path}")
    logger.info(f"Mode: {'DRY RUN' if dry_run else 'VERIFY ONLY' if verify_only else 'EXECUTE'}")

    # Create database directory if needed
    if not db_path.parent.exists():
        if dry_run:
            logger.info(f"[DRY RUN] Would create directory: {db_path.parent}")
        else:
            db_path.parent.mkdir(parents=True, exist_ok=True)
            logger.info(f"Created directory: {db_path.parent}")

    # Connect to database (connecting creates the file if it does not exist)
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row

    try:
        # Enable foreign keys for this connection (off by default in SQLite)
        conn.execute("PRAGMA foreign_keys = ON;")

        # Verify current state
        verification = verify_kg_schema(conn)

        if verify_only:
            logger.info("Verification results:")
            for component, exists in verification.items():
                status = "EXISTS" if exists else "MISSING"
                logger.info(f"  {component}: {status}")
            all_exist = all(verification.values())
            return all_exist, verification

        # Check if migration is needed
        if all(verification.values()):
            logger.info("KG schema already exists - migration not needed")
            return True, verification

        # Backup database before migration (only when actually executing)
        if not skip_backup and db_path.exists() and not dry_run:
            backup_path = backup_database(db_path)
            logger.info(f"Backup created: {backup_path}")

        # Run migrations; `and` short-circuits so later steps are skipped
        # once a step fails
        success = True

        if not verification["kg_nodes"]:
            success = success and create_kg_nodes_table(conn, dry_run)
        else:
            logger.info("CP-01: kg_nodes table already exists")

        if not verification["kg_edges"]:
            success = success and create_kg_edges_table(conn, dry_run)
        else:
            logger.info("CP-02: kg_edges table already exists")

        if not verification["kg_nodes_fts"] or not verification["fts_triggers"]:
            success = success and create_kg_nodes_fts(conn, dry_run)
        else:
            logger.info("CP-03: kg_nodes_fts already exists")

        if not verification["indices"]:
            success = success and create_kg_indices(conn, dry_run)
        else:
            logger.info("CP-04: All indices already exist")

        # Final verification
        final_verification = verify_kg_schema(conn)

        if not dry_run:
            logger.info("\nFinal verification:")
            for component, exists in final_verification.items():
                status = "OK" if exists else "FAILED"
                logger.info(f"  {component}: {status}")

        return success and all(final_verification.values()), final_verification

    finally:
        conn.close()

def print_schema_summary():
    """Print a human-readable summary of the KG schema being created."""
    # NOTE: counts below match the definitions above: 11 node types,
    # 11 edge types, 11 indices, 3 triggers.
    print("""
ADR-151 Context Graph Evolution - Phase 1 Schema

TABLES:
    kg_nodes     - Universal entity table (11 node types)
    kg_edges     - Typed relationships (11 edge types)
    kg_nodes_fts - Full-text search (FTS5)

NODE TYPES:
    component, session, decision, error_solution, skill_learning,
    file, function, track, adr, policy, audit_event

EDGE TYPES:
    INVOKES, PRODUCES, SOLVES, BELONGS_TO, DEFINES, SIMILAR_TO,
    REFERENCES, CALLS, USES, GOVERNED_BY, CREATED_BY

INDICES (11):
    kg_nodes: type, subtype, tenant, project, source, updated
    kg_edges: type, from, to, tenant, pair

TRIGGERS (3):
    kg_nodes_fts_insert, kg_nodes_fts_delete, kg_nodes_fts_update
""")

# =============================================================================
# CLI
# =============================================================================

def main():
    """CLI entry point. Returns a process exit code (0 success, 1 failure)."""
    parser = argparse.ArgumentParser(
        description="ADR-151 Context Graph Evolution - Phase 1 Schema Migration",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Run migration (with backup)
  python3 scripts/migrations/migrate-to-kg-schema.py

  # Preview changes without executing
  python3 scripts/migrations/migrate-to-kg-schema.py --dry-run

  # Only verify current schema state
  python3 scripts/migrations/migrate-to-kg-schema.py --verify-only

  # Show schema summary
  python3 scripts/migrations/migrate-to-kg-schema.py --show-schema
"""
    )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview changes without executing"
    )
    parser.add_argument(
        "--verify-only",
        action="store_true",
        help="Only verify schema state without changes"
    )
    parser.add_argument(
        "--skip-backup",
        action="store_true",
        help="Skip database backup (not recommended)"
    )
    parser.add_argument(
        "--db-path",
        type=Path,
        help="Path to org.db (defaults to ADR-118 location)"
    )
    parser.add_argument(
        "--show-schema",
        action="store_true",
        help="Show schema summary and exit"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON"
    )

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    if args.show_schema:
        print_schema_summary()
        return 0

    # Run migration
    success, verification = run_migration(
        db_path=args.db_path,
        dry_run=args.dry_run,
        skip_backup=args.skip_backup,
        verify_only=args.verify_only,
    )

    if args.json:
        result = {
            "success": success,
            "verification": verification,
            "db_path": str(args.db_path or get_org_db_path()),
            "mode": "dry_run" if args.dry_run else "verify_only" if args.verify_only else "execute",
        }
        print(json.dumps(result, indent=2))
    else:
        print()
        if success:
            print("Migration completed successfully")
        else:
            print("Migration failed - check logs for details")

    return 0 if success else 1

if __name__ == "__main__":
    sys.exit(main())