#!/usr/bin/env python3
"""
ADR-156: Project-Scoped Context Databases Migration

Adds project scoping capabilities to org.db and sessions.db:
- Adds 'scope' column to decisions, skill_learnings, error_solutions
- Creates project-scoped views
- Creates additional indexes for project-scoped queries

Usage:
    python3 scripts/migrations/adr156_project_scoped_context.py
    python3 scripts/migrations/adr156_project_scoped_context.py --dry-run
    python3 scripts/migrations/adr156_project_scoped_context.py --rollback

Created: 2026-02-04
ADR: ADR-156 Project-Scoped Context Databases Architecture
"""
import argparse
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path

# Add the directory three levels above this file to sys.path so that the
# ``scripts`` package can be imported when this file is executed directly.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

try:
    from scripts.core.paths import get_org_db_path, get_sessions_db_path
except ImportError:
    # Fallback paths, used only when the shared path helpers cannot be imported.
    def get_org_db_path() -> Path:
        """Return the default location of org.db under the user's home."""
        return Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "org.db"

    def get_sessions_db_path() -> Path:
        """Return the default location of sessions.db under the user's home."""
        return Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "sessions.db"


# Identity of this migration as written to the schema_migrations table.
MIGRATION_VERSION = "156.1.0"
MIGRATION_NAME = "adr156_project_scoped_context"
def get_iso_timestamp() -> str:
    """Return the current UTC time as an ISO 8601 string with a ``Z`` suffix.

    The result has second precision, e.g. ``2026-02-04T12:34:56Z``.
    """
    now_utc = datetime.now(tz=timezone.utc)
    iso_format = "%Y-%m-%dT%H:%M:%SZ"
    return now_utc.strftime(iso_format)
def column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool:
    """Check if a column exists in a table.

    The column name is compared case-insensitively because SQLite treats
    column names as case-insensitive: adding ``scope`` to a table that
    already has ``Scope`` fails with "duplicate column name".

    Args:
        conn: Open SQLite connection.
        table: Name of the table to inspect.
        column: Name of the column to look for.

    Returns:
        True if the table has such a column; False if it does not, or if
        the table itself does not exist (``table_info`` yields no rows).
    """
    # PRAGMA arguments cannot be bound as parameters, so the table name is
    # embedded as a double-quoted identifier with embedded quotes doubled.
    quoted_table = '"' + table.replace('"', '""') + '"'
    cursor = conn.execute(f"PRAGMA table_info({quoted_table})")
    wanted = column.lower()
    # Each table_info row is (cid, name, type, notnull, dflt_value, pk).
    return any(row[1].lower() == wanted for row in cursor.fetchall())
def index_exists(conn: sqlite3.Connection, index_name: str) -> bool:
    """Report whether sqlite_master lists an index called ``index_name``."""
    query = "SELECT name FROM sqlite_master WHERE type='index' AND name=?"
    match = conn.execute(query, (index_name,)).fetchone()
    return match is not None
def view_exists(conn: sqlite3.Connection, view_name: str) -> bool:
    """Report whether sqlite_master lists a view called ``view_name``."""
    query = "SELECT name FROM sqlite_master WHERE type='view' AND name=?"
    match = conn.execute(query, (view_name,)).fetchone()
    return match is not None
def table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """Report whether sqlite_master lists a table called ``table_name``.

    Views are stored with a different ``type`` and therefore do not count.
    """
    query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
    match = conn.execute(query, (table_name,)).fetchone()
    return match is not None
def migrate_org_db(dry_run: bool = False) -> dict:
    """
    Migrate org.db for project scoping.

    Adds:
    - scope column to decisions, skill_learnings, error_solutions
    - Indexes on the scope and project_id columns of those tables
    - Views joining each of those tables to the projects table

    Every step first checks whether its object already exists, so the
    migration can be re-run safely. All schema changes are executed inside
    one explicit transaction and are rolled back together on failure.

    Args:
        dry_run: When True, only report what would be changed; nothing is
            written to the database.

    Returns:
        A dict with a ``status`` key:
        - ``{"status": "skipped", "reason": ...}`` if org.db does not exist
        - ``{"status": "success", "changes": [...]}`` on success
        - ``{"status": "error", "error": ..., "changes": [...]}`` on failure
    """
    org_db = get_org_db_path()
    if not org_db.exists():
        return {"status": "skipped", "reason": "org.db does not exist"}

    changes = []
    conn = sqlite3.connect(str(org_db))

    def apply(sql: str, planned: str, done: str) -> None:
        """Execute ``sql`` (or only log it in dry-run mode) and record it."""
        if dry_run:
            changes.append(planned)
        else:
            conn.execute(sql)
            changes.append(done)

    try:
        if not dry_run:
            # The sqlite3 module only opens implicit transactions before
            # INSERT/UPDATE/DELETE/REPLACE. Without an explicit BEGIN each DDL
            # statement below would be committed on its own and the
            # rollback() in the error handler would undo nothing.
            conn.execute("BEGIN")

        # Tables that need scope column
        tables_needing_scope = ["decisions", "skill_learnings", "error_solutions"]
        for table in tables_needing_scope:
            if not table_exists(conn, table):
                changes.append(f"SKIP: Table '{table}' does not exist")
                continue

            # Add scope column if missing
            if not column_exists(conn, table, "scope"):
                apply(
                    f"""
                    ALTER TABLE {table}
                    ADD COLUMN scope TEXT DEFAULT 'global'
                    CHECK (scope IN ('global', 'project', 'customer'))
                    """,
                    f"WOULD ADD: scope column to {table}",
                    f"ADDED: scope column to {table}",
                )

            # Create scope index if missing
            idx_name = f"idx_{table}_scope"
            if not index_exists(conn, idx_name):
                apply(
                    f"CREATE INDEX IF NOT EXISTS {idx_name} ON {table}(scope)",
                    f"WOULD CREATE: index {idx_name}",
                    f"CREATED: index {idx_name}",
                )

            # Indexing a missing column raises and would abort the whole
            # migration, so skip the project_id index when the column is absent.
            if not column_exists(conn, table, "project_id"):
                changes.append(f"SKIP: Table '{table}' has no project_id column")
                continue

            # Create project_id index if missing (may already exist from cloud sync)
            idx_proj_name = f"idx_{table}_project_id"
            if not index_exists(conn, idx_proj_name):
                apply(
                    f"CREATE INDEX IF NOT EXISTS {idx_proj_name} ON {table}(project_id)",
                    f"WOULD CREATE: index {idx_proj_name}",
                    f"CREATED: index {idx_proj_name}",
                )

        # (view name, source table, alias used for the source table)
        view_specs = [
            ("project_decisions", "decisions", "d"),
            ("project_learnings", "skill_learnings", "sl"),
            ("project_error_solutions", "error_solutions", "es"),
        ]
        has_projects = table_exists(conn, "projects")
        for view, table, alias in view_specs:
            if view_exists(conn, view):
                continue

            # SQLite does not resolve a view's SELECT at CREATE VIEW time, so
            # a view over missing tables/columns would be created and then
            # fail whenever it is queried. Skip it instead.
            if not (
                has_projects
                and table_exists(conn, table)
                and column_exists(conn, table, "project_id")
            ):
                changes.append(
                    f"SKIP: view {view} (requires table 'projects' and "
                    f"table '{table}' with a project_id column)"
                )
                continue

            apply(
                f"""
                CREATE VIEW IF NOT EXISTS {view} AS
                SELECT
                    {alias}.*,
                    p.name AS project_name,
                    p.scope AS project_scope
                FROM {table} {alias}
                LEFT JOIN projects p ON {alias}.project_id = p.project_id
                """,
                f"WOULD CREATE: view {view}",
                f"CREATED: view {view}",
            )

        if not dry_run:
            conn.commit()

        return {"status": "success", "changes": changes}

    except Exception as e:
        conn.rollback()
        if changes and not dry_run:
            # The entries above describe statements that have been undone.
            changes.append("ROLLED BACK: changes listed above were not kept")
        return {"status": "error", "error": str(e), "changes": changes}
    finally:
        conn.close()
def migrate_sessions_db(dry_run: bool = False) -> dict:
    """
    Migrate sessions.db for project scoping.

    Adds:
    - project_id indexes on messages, tool_analytics, activity_feed if missing
    - Views exposing the rows of those tables that have a project_id,
      together with a derived date column

    Every step first checks whether its object already exists, so the
    migration can be re-run safely. All schema changes are executed inside
    one explicit transaction and are rolled back together on failure.

    Args:
        dry_run: When True, only report what would be changed; nothing is
            written to the database.

    Returns:
        A dict with a ``status`` key:
        - ``{"status": "skipped", "reason": ...}`` if sessions.db does not exist
        - ``{"status": "success", "changes": [...]}`` on success
        - ``{"status": "error", "error": ..., "changes": [...]}`` on failure
    """
    sessions_db = get_sessions_db_path()
    if not sessions_db.exists():
        return {"status": "skipped", "reason": "sessions.db does not exist"}

    changes = []
    conn = sqlite3.connect(str(sessions_db))

    def apply(sql: str, planned: str, done: str) -> None:
        """Execute ``sql`` (or only log it in dry-run mode) and record it."""
        if dry_run:
            changes.append(planned)
        else:
            conn.execute(sql)
            changes.append(done)

    try:
        if not dry_run:
            # The sqlite3 module only opens implicit transactions before
            # INSERT/UPDATE/DELETE/REPLACE. Without an explicit BEGIN each DDL
            # statement below would be committed on its own and the
            # rollback() in the error handler would undo nothing.
            conn.execute("BEGIN")

        # Tables that need project_id index
        tables_needing_index = ["messages", "tool_analytics", "activity_feed"]
        for table in tables_needing_index:
            if not table_exists(conn, table):
                changes.append(f"SKIP: Table '{table}' does not exist")
                continue

            # Check if project_id column exists
            if not column_exists(conn, table, "project_id"):
                changes.append(f"SKIP: Table '{table}' has no project_id column")
                continue

            # Create project_id index if missing
            idx_name = f"idx_{table}_project_id"
            if not index_exists(conn, idx_name):
                apply(
                    f"CREATE INDEX IF NOT EXISTS {idx_name} ON {table}(project_id)",
                    f"WOULD CREATE: index {idx_name}",
                    f"CREATED: index {idx_name}",
                )

        # (view name, source table, alias, timestamp column, derived date column)
        view_specs = [
            ("project_messages", "messages", "m", "timestamp", "message_date"),
            ("project_tool_analytics", "tool_analytics", "ta", "created_at", "analytics_date"),
            ("project_activity", "activity_feed", "af", "occurred_at", "activity_date"),
        ]
        for view, table, alias, ts_column, date_column in view_specs:
            if not table_exists(conn, table) or view_exists(conn, view):
                continue

            # SQLite does not resolve a view's SELECT at CREATE VIEW time, so
            # a view over missing columns would be created and then fail
            # whenever it is queried. Skip it instead.
            missing = [
                col for col in ("project_id", ts_column)
                if not column_exists(conn, table, col)
            ]
            if missing:
                changes.append(
                    f"SKIP: view {view} (table '{table}' has no "
                    f"{', '.join(missing)} column)"
                )
                continue

            apply(
                f"""
                CREATE VIEW IF NOT EXISTS {view} AS
                SELECT
                    {alias}.*,
                    strftime('%Y-%m-%d', {alias}.{ts_column}) AS {date_column}
                FROM {table} {alias}
                WHERE {alias}.project_id IS NOT NULL
                """,
                f"WOULD CREATE: view {view}",
                f"CREATED: view {view}",
            )

        if not dry_run:
            conn.commit()

        return {"status": "success", "changes": changes}

    except Exception as e:
        conn.rollback()
        if changes and not dry_run:
            # The entries above describe statements that have been undone.
            changes.append("ROLLED BACK: changes listed above were not kept")
        return {"status": "error", "error": str(e), "changes": changes}
    finally:
        conn.close()
def record_migration(dry_run: bool = False) -> None:
    """Write this migration's name, version and timestamp to org.db.

    The ``schema_migrations`` table is created on demand. An existing row
    with the same migration name is replaced. In dry-run mode only a
    message is printed; if org.db does not exist nothing happens.
    """
    if dry_run:
        print(f"WOULD RECORD: migration {MIGRATION_NAME} v{MIGRATION_VERSION}")
        return

    db_path = get_org_db_path()
    if not db_path.exists():
        return

    create_table_sql = """
        CREATE TABLE IF NOT EXISTS schema_migrations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            migration_name TEXT NOT NULL UNIQUE,
            version TEXT NOT NULL,
            applied_at TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'applied'
        )
    """
    upsert_sql = """
        INSERT OR REPLACE INTO schema_migrations (migration_name, version, applied_at, status)
        VALUES (?, ?, ?, 'applied')
    """

    connection = sqlite3.connect(str(db_path))
    try:
        # Make sure the bookkeeping table is there before writing to it.
        connection.execute(create_table_sql)

        # Record this migration.
        record = (MIGRATION_NAME, MIGRATION_VERSION, get_iso_timestamp())
        connection.execute(upsert_sql, record)
        connection.commit()

        print(f"RECORDED: migration {MIGRATION_NAME} v{MIGRATION_VERSION}")
    finally:
        connection.close()
def rollback_org_db() -> dict:
    """Undo the org.db part of the migration by dropping its views.

    Columns and indexes added by the migration are deliberately left in
    place. Returns a result dict with ``status`` and, unless skipped,
    the list of ``changes`` performed.
    """
    db_path = get_org_db_path()
    if not db_path.exists():
        return {"status": "skipped", "reason": "org.db does not exist"}

    log = []
    connection = sqlite3.connect(str(db_path))
    try:
        for view_name in ("project_decisions", "project_learnings", "project_error_solutions"):
            if not view_exists(connection, view_name):
                continue
            connection.execute(f"DROP VIEW IF EXISTS {view_name}")
            log.append(f"DROPPED: view {view_name}")

        # Columns and indexes are kept on purpose: they may hold data.
        log.append("NOTE: Columns and indexes preserved (contain data)")

        connection.commit()
        return {"status": "success", "changes": log}
    except Exception as exc:
        connection.rollback()
        return {"status": "error", "error": str(exc), "changes": log}
    finally:
        connection.close()
def rollback_sessions_db() -> dict:
    """Undo the sessions.db part of the migration by dropping its views.

    Returns a result dict with ``status`` and, unless skipped, the list
    of ``changes`` performed.
    """
    db_path = get_sessions_db_path()
    if not db_path.exists():
        return {"status": "skipped", "reason": "sessions.db does not exist"}

    log = []
    connection = sqlite3.connect(str(db_path))
    try:
        for view_name in ("project_messages", "project_tool_analytics", "project_activity"):
            if not view_exists(connection, view_name):
                continue
            connection.execute(f"DROP VIEW IF EXISTS {view_name}")
            log.append(f"DROPPED: view {view_name}")

        connection.commit()
        return {"status": "success", "changes": log}
    except Exception as exc:
        connection.rollback()
        return {"status": "error", "error": str(exc), "changes": log}
    finally:
        connection.close()
def _print_result(result: dict) -> None:
    """Print one step result: status, optional reason/error, then changes."""
    print(f"Status: {result['status']}")
    if result.get('reason'):
        print(f"Reason: {result['reason']}")
    if result.get('error'):
        print(f"Error: {result['error']}")
    for change in result.get('changes') or []:
        print(f" {change}")


def main() -> int:
    """Command-line entry point for the ADR-156 migration.

    Parses the command line, then applies, dry-runs, or rolls back the
    migration on org.db and/or sessions.db and prints a report. The
    migration is recorded only when it was really applied and no step
    reported an error.

    ``--dry-run`` and ``--rollback`` are mutually exclusive because the
    rollback has no dry-run mode and would drop views for real.
    ``--org-only`` and ``--sessions-only`` are mutually exclusive because
    together they would select no database at all.

    Returns:
        0 if no step reported an error, 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="ADR-156: Project-Scoped Context Databases Migration"
    )
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without making changes"
    )
    mode_group.add_argument(
        "--rollback",
        action="store_true",
        help="Rollback the migration (drops views, preserves columns)"
    )
    target_group = parser.add_mutually_exclusive_group()
    target_group.add_argument(
        "--org-only",
        action="store_true",
        help="Only migrate org.db"
    )
    target_group.add_argument(
        "--sessions-only",
        action="store_true",
        help="Only migrate sessions.db"
    )

    args = parser.parse_args()

    print("=" * 60)
    print("ADR-156: Project-Scoped Context Databases Migration")
    print(f"Version: {MIGRATION_VERSION}")
    print(f"Timestamp: {get_iso_timestamp()}")
    if args.dry_run:
        print("Mode: DRY RUN (no changes will be made)")
    elif args.rollback:
        print("Mode: ROLLBACK")
    else:
        print("Mode: APPLY")
    print("=" * 60)

    results = []

    if args.rollback:
        # Rollback mode
        if not args.sessions_only:
            print("\n--- Rollback org.db ---")
            results.append(rollback_org_db())
            _print_result(results[-1])

        if not args.org_only:
            print("\n--- Rollback sessions.db ---")
            results.append(rollback_sessions_db())
            _print_result(results[-1])
    else:
        # Apply mode
        if not args.sessions_only:
            print("\n--- Migrating org.db ---")
            print(f"Path: {get_org_db_path()}")
            results.append(migrate_org_db(args.dry_run))
            _print_result(results[-1])

        if not args.org_only:
            print("\n--- Migrating sessions.db ---")
            print(f"Path: {get_sessions_db_path()}")
            results.append(migrate_sessions_db(args.dry_run))
            _print_result(results[-1])

    failed = any(result['status'] == "error" for result in results)

    if not args.rollback and not args.dry_run:
        if failed:
            # Never mark the migration as applied when a step failed.
            print("\n--- Migration NOT recorded (errors above) ---")
        else:
            print("\n--- Recording migration ---")
            record_migration(args.dry_run)

    print("\n" + "=" * 60)
    print("Migration finished with errors" if failed else "Migration complete")
    print("=" * 60)

    return 1 if failed else 0
# Run the migration only when executed as a script, not when imported.
# sys.exit passes main()'s return value on as the process exit status
# (a None return exits with status 0).
if __name__ == "__main__":
    sys.exit(main())