#!/usr/bin/env python3
"""
ADR-151 Context Graph Evolution - Phase 5 Schema Migration

Creates context graph tables in sessions.db (Tier 3 - REGENERABLE):
- context_graphs: Task-specific subgraph projections
- context_graph_nodes: Junction table linking graphs to kg_nodes
- context_graph_usage: Analytics/audit table for context graph usage

This migration is IDEMPOTENT - safe to run multiple times.

Usage:
    python3 scripts/migrations/migrate-to-context-graph-schema.py
    python3 scripts/migrations/migrate-to-context-graph-schema.py --dry-run
    python3 scripts/migrations/migrate-to-context-graph-schema.py --verify-only

Tasks:
    CP-31: Create context_graphs table
    CP-32: Create context_graph_nodes junction table
    CP-33: Create context_graph_usage analytics table
    Update db_router.py with context_graph* table routing

ADR References:
    - ADR-151: Context Graph Evolution Architecture
    - ADR-118: Four-Tier Database Architecture
    - ADR-148: Database Schema Documentation Standard

Created: 2026-02-03
Author: Claude (Opus 4.5)
Track: J (Memory Intelligence)
Task: J.25.1.1-J.25.1.4
"""
import argparse
import json
import logging
import shutil
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Handle both module import and direct script execution.
try:
    from scripts.core.paths import get_sessions_db_path, get_context_storage_dir
except ModuleNotFoundError:
    # The `scripts` package is not importable (e.g. the file was launched
    # directly). Put the directory two levels above this file's directory on
    # sys.path and retry; a second failure propagates to the caller.
    _script_dir = Path(__file__).resolve().parent
    _core_root = _script_dir.parent.parent
    if str(_core_root) not in sys.path:
        sys.path.insert(0, str(_core_root))
    from scripts.core.paths import get_sessions_db_path, get_context_storage_dir

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# =============================================================================
# Schema Definitions (ADR-151 Phase 5)
# =============================================================================
# CP-31: context_graphs table
#
# DDL for the table holding one row per context graph (build parameters,
# statistics, tenancy and lifecycle columns).
#
# Layout rule: a SQL "--" comment runs to the end of its line, so every column
# definition must start on its own line. Placing a definition after a comment
# on the same line silently removes that column from the table.
CONTEXT_GRAPHS_SCHEMA = """
CREATE TABLE IF NOT EXISTS context_graphs (
    -- Primary identification
    id TEXT PRIMARY KEY,                    -- UUID format: "cg:{timestamp}:{hash}"

    -- Graph metadata
    name TEXT,                              -- Human-readable name (optional)
    task_description TEXT,                  -- Task/query that prompted this graph

    -- Seed configuration
    seed_nodes TEXT,                        -- JSON array of node IDs used as seeds
    seed_strategy TEXT DEFAULT 'anchor',    -- 'anchor', 'semantic', 'policy_first'

    -- Build parameters
    token_budget INTEGER DEFAULT 4000,      -- Max tokens for serialized context
    max_depth INTEGER DEFAULT 3,            -- BFS expansion depth limit
    max_nodes INTEGER DEFAULT 128,          -- Maximum nodes in graph
    relevance_threshold REAL DEFAULT 0.5,   -- Minimum relevance score for inclusion

    -- Statistics (populated after build)
    node_count INTEGER DEFAULT 0,           -- Actual nodes in graph
    edge_count INTEGER DEFAULT 0,           -- Edges included in graph
    tokens_estimated INTEGER DEFAULT 0,     -- Estimated token count

    -- Multi-tenant isolation (ADR-053)
    tenant_id TEXT,                         -- Tenant UUID for multi-org isolation
    project_id TEXT,                        -- Project UUID within tenant
    session_id TEXT,                        -- Session that created this graph

    -- Lifecycle
    created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
    expires_at TEXT,                        -- Optional expiration for cleanup

    -- Build metadata
    build_time_ms INTEGER,                  -- Time to build graph
    builder_version TEXT DEFAULT '1.0.0',   -- Version of builder used

    -- Governance overlay (J.25.4.4)
    policies_applied TEXT,                  -- JSON array of applied governance policies

    -- PHI compliance (J.25.4.3)
    phi_node_count INTEGER DEFAULT 0        -- Count of nodes with potential PHI indicators
);
"""
# CP-32: context_graph_nodes junction table
#
# DDL for the junction table that records which nodes belong to which context
# graph, keyed by (context_graph_id, node_id).
#
# Layout rule: a SQL "--" comment runs to the end of its line, so every column
# definition must start on its own line. Otherwise the key columns are
# commented out and the PRIMARY KEY clause refers to columns that do not exist.
CONTEXT_GRAPH_NODES_SCHEMA = """
CREATE TABLE IF NOT EXISTS context_graph_nodes (
    -- Junction keys
    context_graph_id TEXT NOT NULL,         -- References context_graphs.id
    node_id TEXT NOT NULL,                  -- References kg_nodes.id in org.db

    -- Node metadata within this graph
    relevance_score REAL DEFAULT 1.0,       -- Relevance to the task (0.0-1.0)
    depth INTEGER DEFAULT 0,                -- BFS depth from seed node
    is_seed INTEGER DEFAULT 0,              -- 1 if this was a seed node

    -- Serialization hints
    include_properties INTEGER DEFAULT 1,   -- Include node properties in context
    token_estimate INTEGER DEFAULT 0,       -- Estimated tokens for this node

    -- Audit
    included_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),

    -- Composite primary key
    PRIMARY KEY (context_graph_id, node_id),

    -- Foreign key to context_graphs (kg_nodes is in different DB, so no FK)
    FOREIGN KEY (context_graph_id) REFERENCES context_graphs(id) ON DELETE CASCADE
);
"""
# CP-33: context_graph_usage analytics table
#
# DDL for the table that records each use of a context graph (who used it,
# performance metrics and optional feedback).
#
# Layout rule: a SQL "--" comment runs to the end of its line, so every column
# definition must start on its own line. Placing a definition after a comment
# on the same line silently removes that column from the table.
CONTEXT_GRAPH_USAGE_SCHEMA = """
CREATE TABLE IF NOT EXISTS context_graph_usage (
    -- Primary identification
    id INTEGER PRIMARY KEY AUTOINCREMENT,

    -- Foreign key
    context_graph_id TEXT NOT NULL,         -- References context_graphs.id

    -- Usage context
    session_id TEXT,                        -- Session using this graph
    message_id TEXT,                        -- Message where graph was used
    agent_id TEXT,                          -- Agent that used this graph

    -- Performance metrics
    tokens_used INTEGER,                    -- Actual tokens consumed
    retrieval_time_ms INTEGER,              -- Time to retrieve/serialize

    -- Effectiveness (optional, for feedback loop)
    was_helpful INTEGER,                    -- User feedback: 1=helpful, 0=not, NULL=unknown
    feedback_text TEXT,                     -- Optional text feedback

    -- Audit
    used_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),

    -- Foreign key constraint
    FOREIGN KEY (context_graph_id) REFERENCES context_graphs(id) ON DELETE CASCADE
);
"""
# CP-33 (continued): Indices
#
# (index_name, CREATE INDEX statement) pairs. The name is what the schema
# verification looks up in sqlite_master, so it must match the name used
# inside the statement. Section comments sit on their own lines: a "#" comment
# on the same line as an entry would remove that entry from the list.
CONTEXT_GRAPH_INDICES = [
    # context_graphs indices
    ("idx_context_graphs_tenant", "CREATE INDEX IF NOT EXISTS idx_context_graphs_tenant ON context_graphs(tenant_id);"),
    ("idx_context_graphs_project", "CREATE INDEX IF NOT EXISTS idx_context_graphs_project ON context_graphs(tenant_id, project_id);"),
    ("idx_context_graphs_session", "CREATE INDEX IF NOT EXISTS idx_context_graphs_session ON context_graphs(session_id);"),
    ("idx_context_graphs_created", "CREATE INDEX IF NOT EXISTS idx_context_graphs_created ON context_graphs(created_at);"),
    ("idx_context_graphs_expires", "CREATE INDEX IF NOT EXISTS idx_context_graphs_expires ON context_graphs(expires_at);"),

    # context_graph_nodes indices
    ("idx_context_graph_nodes_graph", "CREATE INDEX IF NOT EXISTS idx_context_graph_nodes_graph ON context_graph_nodes(context_graph_id);"),
    ("idx_context_graph_nodes_node", "CREATE INDEX IF NOT EXISTS idx_context_graph_nodes_node ON context_graph_nodes(node_id);"),
    ("idx_context_graph_nodes_relevance", "CREATE INDEX IF NOT EXISTS idx_context_graph_nodes_relevance ON context_graph_nodes(context_graph_id, relevance_score DESC);"),
    ("idx_context_graph_nodes_depth", "CREATE INDEX IF NOT EXISTS idx_context_graph_nodes_depth ON context_graph_nodes(context_graph_id, depth);"),

    # context_graph_usage indices
    ("idx_context_graph_usage_graph", "CREATE INDEX IF NOT EXISTS idx_context_graph_usage_graph ON context_graph_usage(context_graph_id);"),
    ("idx_context_graph_usage_session", "CREATE INDEX IF NOT EXISTS idx_context_graph_usage_session ON context_graph_usage(session_id);"),
    ("idx_context_graph_usage_time", "CREATE INDEX IF NOT EXISTS idx_context_graph_usage_time ON context_graph_usage(used_at);"),
]
# =============================================================================
# Migration Functions
# =============================================================================
def backup_database(db_path: Path, backup_dir: Optional[Path] = None) -> Path:
    """
    Copy the database file to a timestamped backup before migration.

    The backup file is named ``<stem>_backup_<UTC timestamp><suffix>``, where
    the timestamp has one-second resolution.

    Args:
        db_path: Path to the database file
        backup_dir: Directory to write the backup into; it is created if it
            does not exist. When omitted, the backup is placed next to the
            database file.

    Returns:
        Path to the backup file

    Raises:
        FileNotFoundError: If ``db_path`` does not exist
    """
    if not db_path.exists():
        raise FileNotFoundError(f"Database not found: {db_path}")

    stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    file_name = f"{db_path.stem}_backup_{stamp}{db_path.suffix}"

    target_dir = db_path.parent
    if backup_dir:
        backup_dir.mkdir(parents=True, exist_ok=True)
        target_dir = backup_dir
    backup_path = target_dir / file_name

    # Plain file copy of the database file itself.
    shutil.copy2(db_path, backup_path)
    logger.info(f"Created backup: {backup_path}")
    return backup_path
def get_existing_tables(conn: sqlite3.Connection) -> List[str]:
    """
    List the user tables in the database.

    Tables whose names start with the literal prefix ``sqlite_`` are left out.

    Args:
        conn: Database connection

    Returns:
        Table names as reported by ``sqlite_master``
    """
    # In a LIKE pattern "_" matches any single character, so an unescaped
    # 'sqlite_%' would also hide user tables such as "sqlitex_data". Escaping
    # the underscore restricts the filter to the literal "sqlite_" prefix.
    cursor = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name NOT LIKE 'sqlite\\_%' ESCAPE '\\'"
    )
    return [row[0] for row in cursor]
def get_existing_indices(conn: sqlite3.Connection) -> List[str]:
    """
    List the user-created indices in the database.

    Indices whose names start with the literal prefix ``sqlite_`` are left out.

    Args:
        conn: Database connection

    Returns:
        Index names as reported by ``sqlite_master``
    """
    # In a LIKE pattern "_" matches any single character, so an unescaped
    # 'sqlite_%' would also hide user indices such as "sqlitex_idx". Escaping
    # the underscore restricts the filter to the literal "sqlite_" prefix.
    cursor = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='index' AND name NOT LIKE 'sqlite\\_%' ESCAPE '\\'"
    )
    return [row[0] for row in cursor]
def verify_context_graph_schema(conn: sqlite3.Connection) -> Dict[str, bool]:
    """
    Report which context graph schema components are present.

    Only presence is checked (table and index names); column definitions are
    not inspected.

    Args:
        conn: Database connection

    Returns:
        Dict with one boolean per table ("context_graphs",
        "context_graph_nodes", "context_graph_usage") plus "indices", which is
        True only when every index named in CONTEXT_GRAPH_INDICES exists
    """
    tables_present = get_existing_tables(conn)
    indices_present = set(get_existing_indices(conn))

    # Check tables
    report = {
        table: table in tables_present
        for table in ("context_graphs", "context_graph_nodes", "context_graph_usage")
    }

    # Check indices
    report["indices"] = all(
        index_name in indices_present for index_name, _ in CONTEXT_GRAPH_INDICES
    )
    return report
def create_context_graphs_table(conn: sqlite3.Connection, dry_run: bool = False) -> bool:
    """
    CP-31: Create the context_graphs table.

    Executes CONTEXT_GRAPHS_SCHEMA (a CREATE TABLE IF NOT EXISTS statement)
    and commits.

    Args:
        conn: Database connection
        dry_run: If True, only log what would be done

    Returns:
        True if the statement ran (or on a dry run), False if SQLite raised
        an error
    """
    logger.info("CP-31: Creating context_graphs table...")

    if dry_run:
        logger.info("[DRY RUN] Would execute context_graphs schema")
        return True

    try:
        conn.execute(CONTEXT_GRAPHS_SCHEMA)
        conn.commit()
    except sqlite3.Error as exc:
        # Errors are reported through the return value, not raised.
        logger.error(f"CP-31: Failed to create context_graphs table: {exc}")
        return False
    else:
        logger.info("CP-31: context_graphs table created successfully")
        return True
def create_context_graph_nodes_table(conn: sqlite3.Connection, dry_run: bool = False) -> bool:
    """
    CP-32: Create the context_graph_nodes junction table.

    Executes CONTEXT_GRAPH_NODES_SCHEMA (a CREATE TABLE IF NOT EXISTS
    statement) and commits.

    Args:
        conn: Database connection
        dry_run: If True, only log what would be done

    Returns:
        True if the statement ran (or on a dry run), False if SQLite raised
        an error
    """
    logger.info("CP-32: Creating context_graph_nodes table...")

    if dry_run:
        logger.info("[DRY RUN] Would execute context_graph_nodes schema")
        return True

    try:
        conn.execute(CONTEXT_GRAPH_NODES_SCHEMA)
        conn.commit()
    except sqlite3.Error as exc:
        # Errors are reported through the return value, not raised.
        logger.error(f"CP-32: Failed to create context_graph_nodes table: {exc}")
        return False
    else:
        logger.info("CP-32: context_graph_nodes table created successfully")
        return True
def create_context_graph_usage_table(conn: sqlite3.Connection, dry_run: bool = False) -> bool:
    """
    CP-33: Create the context_graph_usage analytics table.

    Executes CONTEXT_GRAPH_USAGE_SCHEMA (a CREATE TABLE IF NOT EXISTS
    statement) and commits.

    Args:
        conn: Database connection
        dry_run: If True, only log what would be done

    Returns:
        True if the statement ran (or on a dry run), False if SQLite raised
        an error
    """
    logger.info("CP-33: Creating context_graph_usage table...")

    if dry_run:
        logger.info("[DRY RUN] Would execute context_graph_usage schema")
        return True

    try:
        conn.execute(CONTEXT_GRAPH_USAGE_SCHEMA)
        conn.commit()
    except sqlite3.Error as exc:
        # Errors are reported through the return value, not raised.
        logger.error(f"CP-33: Failed to create context_graph_usage table: {exc}")
        return False
    else:
        logger.info("CP-33: context_graph_usage table created successfully")
        return True
def create_context_graph_indices(conn: sqlite3.Connection, dry_run: bool = False) -> bool:
    """
    Create the indices listed in CONTEXT_GRAPH_INDICES.

    Statements are executed in list order and committed once at the end. The
    first SQLite error stops the loop.

    Args:
        conn: Database connection
        dry_run: If True, only log the index names that would be created

    Returns:
        True if every statement ran (or on a dry run), False if SQLite raised
        an error
    """
    index_total = len(CONTEXT_GRAPH_INDICES)
    logger.info(f"Creating {index_total} indices...")

    if dry_run:
        for index_name, _ in CONTEXT_GRAPH_INDICES:
            logger.info(f"[DRY RUN] Would create index: {index_name}")
        return True

    try:
        for index_name, statement in CONTEXT_GRAPH_INDICES:
            conn.execute(statement)
            logger.debug(f"Created index: {index_name}")
        conn.commit()
    except sqlite3.Error as exc:
        # Errors are reported through the return value, not raised.
        logger.error(f"Failed to create indices: {exc}")
        return False
    else:
        logger.info(f"{index_total} indices created successfully")
        return True
def run_migration(
    db_path: Optional[Path] = None,
    dry_run: bool = False,
    skip_backup: bool = False,
    verify_only: bool = False,
) -> Tuple[bool, Dict[str, bool]]:
    """
    Run the complete ADR-151 Phase 5 migration.

    Flow: verify the current schema, stop early if nothing is missing, back up
    the database file, create whatever is missing, then verify again.

    Dry-run and verify-only never touch the filesystem. If the database file
    does not exist in those modes, an empty in-memory database is inspected
    instead, so every component is reported as missing. When both flags are
    set, verify-only wins.

    Args:
        db_path: Path to sessions.db (defaults to get_sessions_db_path())
        dry_run: If True, only log what would be done
        skip_backup: If True, skip database backup
        verify_only: If True, only verify schema without changes

    Returns:
        Tuple of (success, verification_results).
        - verify_only: success is True when every component exists.
        - dry_run: success is True when every planned step reported success;
          the results describe the unchanged database.
        - otherwise: success is True when every step succeeded and the final
          verification finds every component.
    """
    # Get database path
    if db_path is None:
        db_path = get_sessions_db_path()

    logger.info(f"ADR-151 Phase 5 Migration: {db_path}")
    logger.info(f"Mode: {'DRY RUN' if dry_run else 'VERIFY ONLY' if verify_only else 'EXECUTE'}")

    # Record this before connecting: sqlite3.connect() creates a missing file,
    # after which "does the database exist?" can no longer be answered.
    db_existed = db_path.exists()
    read_only_mode = dry_run or verify_only

    if read_only_mode and not db_existed:
        if dry_run and not verify_only and not db_path.parent.exists():
            logger.info(f"[DRY RUN] Would create directory: {db_path.parent}")
        logger.info(f"Database does not exist yet: {db_path}")
        # An empty in-memory database reports everything as missing without
        # creating a directory or file on disk.
        conn = sqlite3.connect(":memory:")
    else:
        # Create database directory if needed
        if not db_path.parent.exists():
            db_path.parent.mkdir(parents=True, exist_ok=True)
            logger.info(f"Created directory: {db_path.parent}")
        conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row

    try:
        # Enable foreign keys
        conn.execute("PRAGMA foreign_keys = ON;")

        # Verify current state
        verification = verify_context_graph_schema(conn)

        if verify_only:
            logger.info("Verification results:")
            for component, exists in verification.items():
                status = "EXISTS" if exists else "MISSING"
                logger.info(f"  {component}: {status}")
            return all(verification.values()), verification

        # Check if migration is needed
        if all(verification.values()):
            logger.info("Context graph schema already exists - migration not needed")
            return True, verification

        # Back up only a database that existed before this run; a file that
        # connect() has just created holds nothing worth preserving.
        if db_existed and not skip_backup and not dry_run:
            backup_path = backup_database(db_path)
            logger.info(f"Backup created: {backup_path}")

        # Run migrations in dependency order. After the first failure the
        # remaining creators are skipped ("success and ..." short-circuits).
        steps = (
            ("context_graphs", create_context_graphs_table,
             "CP-31: context_graphs table already exists"),
            ("context_graph_nodes", create_context_graph_nodes_table,
             "CP-32: context_graph_nodes table already exists"),
            ("context_graph_usage", create_context_graph_usage_table,
             "CP-33: context_graph_usage table already exists"),
            ("indices", create_context_graph_indices,
             "All indices already exist"),
        )
        success = True
        for component, create, already_present_msg in steps:
            if verification[component]:
                logger.info(already_present_msg)
            else:
                success = success and create(conn, dry_run)

        if dry_run:
            # Nothing was changed, so re-verifying would only repeat the
            # pre-migration state and turn every pending dry run into a
            # reported failure.
            return success, verification

        # Final verification
        final_verification = verify_context_graph_schema(conn)
        logger.info("\nFinal verification:")
        for component, exists in final_verification.items():
            status = "OK" if exists else "FAILED"
            logger.info(f"  {component}: {status}")

        return success and all(final_verification.values()), final_verification

    finally:
        conn.close()
def print_schema_summary():
    """
    Print a summary of the context graph schema being created.

    The text is static and must be kept in step with the schema constants by
    hand. The CONTEXT_GRAPHS field list includes policies_applied and
    phi_node_count, which the table definition declares.
    """
    print("""
ADR-151 Context Graph Evolution - Phase 5 Schema

DATABASE: sessions.db (Tier 3 - Regenerable)

TABLES:
  context_graphs       - Task-specific subgraph projections
  context_graph_nodes  - Junction table linking graphs to kg_nodes
  context_graph_usage  - Analytics/audit for graph usage

CONTEXT_GRAPHS FIELDS:
  id, name, task_description, seed_nodes, seed_strategy,
  token_budget, max_depth, max_nodes, relevance_threshold,
  node_count, edge_count, tokens_estimated,
  tenant_id, project_id, session_id,
  created_at, expires_at, build_time_ms, builder_version,
  policies_applied, phi_node_count

CONTEXT_GRAPH_NODES FIELDS:
  context_graph_id, node_id, relevance_score, depth, is_seed,
  include_properties, token_estimate, included_at

CONTEXT_GRAPH_USAGE FIELDS:
  id, context_graph_id, session_id, message_id, agent_id,
  tokens_used, retrieval_time_ms, was_helpful, feedback_text, used_at

INDICES (12):
  context_graphs:      tenant, project, session, created, expires
  context_graph_nodes: graph, node, relevance, depth
  context_graph_usage: graph, session, time

SEED STRATEGIES:
  - anchor:       Ego network around anchor nodes (default)
  - semantic:     Vector similarity + KG expansion
  - policy_first: Start from policies, intersect with entities
""")
# =============================================================================
# CLI
# =============================================================================
def main():
    """
    Command-line entry point.

    Parses the flags, optionally prints the schema summary, otherwise runs
    run_migration() and reports the outcome as plain text or JSON.

    Returns:
        Process exit code: 0 on success (or after --show-schema), 1 otherwise
    """
    parser = argparse.ArgumentParser(
        description="ADR-151 Context Graph Evolution - Phase 5 Schema Migration",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    # Run migration (with backup)
    python3 scripts/migrations/migrate-to-context-graph-schema.py

    # Preview changes without executing
    python3 scripts/migrations/migrate-to-context-graph-schema.py --dry-run

    # Only verify current schema state
    python3 scripts/migrations/migrate-to-context-graph-schema.py --verify-only

    # Show schema summary
    python3 scripts/migrations/migrate-to-context-graph-schema.py --show-schema
        """
    )

    # Arguments are registered in the order they should appear in --help.
    leading_switches = (
        (("--dry-run",), "Preview changes without executing"),
        (("--verify-only",), "Only verify schema state without changes"),
        (("--skip-backup",), "Skip database backup (not recommended)"),
    )
    trailing_switches = (
        (("--show-schema",), "Show schema summary and exit"),
        (("--verbose", "-v"), "Enable verbose logging"),
        (("--json",), "Output results as JSON"),
    )
    for names, help_text in leading_switches:
        parser.add_argument(*names, action="store_true", help=help_text)
    parser.add_argument(
        "--db-path",
        type=Path,
        help="Path to sessions.db (defaults to ADR-118 location)"
    )
    for names, help_text in trailing_switches:
        parser.add_argument(*names, action="store_true", help=help_text)

    options = parser.parse_args()

    if options.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    if options.show_schema:
        print_schema_summary()
        return 0

    # Run migration
    success, verification = run_migration(
        db_path=options.db_path,
        dry_run=options.dry_run,
        skip_backup=options.skip_backup,
        verify_only=options.verify_only,
    )

    if not options.json:
        print()
        if success:
            print("Migration completed successfully")
        else:
            print("Migration failed - check logs for details")
    else:
        # Resolve the path the same way run_migration() does when none is given.
        resolved_path = str(options.db_path or get_sessions_db_path())
        if options.dry_run:
            mode = "dry_run"
        elif options.verify_only:
            mode = "verify_only"
        else:
            mode = "execute"
        payload = {
            "success": success,
            "verification": verification,
            "db_path": resolved_path,
            "mode": mode,
        }
        print(json.dumps(payload, indent=2))

    return 0 if success else 1
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit code.
if __name__ == "__main__":
    sys.exit(main())