#!/usr/bin/env python3
"""
CP-27 & CP-28: Call Graph Migration Verification Script (ADR-151 Phase 4)

Verifies that call_graph_functions and call_graph_edges from sessions.db
have been properly migrated to kg_nodes and kg_edges in org.db.

This script:

  1. Compares counts between source and target tables
  2. Verifies node_id mappings
  3. Reports any missing functions or edges
  4. Generates migration status report

Usage:
    python3 scripts/knowledge_graph/verify_call_graph_migration.py
    python3 scripts/knowledge_graph/verify_call_graph_migration.py --verbose
    python3 scripts/knowledge_graph/verify_call_graph_migration.py --fix

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.6.1, J.3.6.2
Author: Claude (Opus 4.5)
"""

import argparse
import json
import logging
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

Add coditect-core to path

_script_dir = Path(file).resolve().parent.parent.parent if str(_script_dir) not in sys.path: sys.path.insert(0, str(_script_dir))

try: from scripts.core.paths import get_org_db_path, get_sessions_db_path PATHS_AVAILABLE = True except ImportError: PATHS_AVAILABLE = False # Legacy fallback _coditect_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" get_org_db_path = lambda: _coditect_data / "org.db" get_sessions_db_path = lambda: _coditect_data / "sessions.db"

logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(name)

class CallGraphMigrationVerifier: """ Verify call graph migration from sessions.db to org.db kg_* tables.

ADR-151 Phase 4: Unifies call_graph_* tables with kg_nodes/kg_edges.
"""

def __init__(self, sessions_db: Path, org_db: Path, verbose: bool = False):
self.sessions_db = sessions_db
self.org_db = org_db
self.verbose = verbose

self.results = {
"timestamp": datetime.now().isoformat(),
"sessions_db": str(sessions_db),
"org_db": str(org_db),
"functions": {},
"edges": {},
"memory": {},
"status": "unknown",
"issues": [],
}

def _connect(self, db_path: Path) -> Optional[sqlite3.Connection]:
"""Connect to a database with error handling."""
if not db_path.exists():
logger.error(f"Database not found: {db_path}")
return None

try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
return conn
except Exception as e:
logger.error(f"Error connecting to {db_path}: {e}")
return None

def verify_functions(self) -> Dict:
"""
CP-27: Verify call_graph_functions migrated to kg_nodes.

Returns:
Dict with migration status and any discrepancies
"""
logger.info("CP-27: Verifying function migration...")

sessions_conn = self._connect(self.sessions_db)
org_conn = self._connect(self.org_db)

result = {
"source_count": 0,
"target_count": 0,
"matched": 0,
"missing_in_target": 0,
"status": "unknown",
"sample_missing": [],
}

if not sessions_conn or not org_conn:
result["status"] = "error"
result["error"] = "Could not connect to databases"
return result

try:
# Count source functions
try:
source_count = sessions_conn.execute(
"SELECT COUNT(*) FROM call_graph_functions"
).fetchone()[0]
result["source_count"] = source_count
except sqlite3.OperationalError:
# Table doesn't exist
result["source_count"] = 0
result["status"] = "no_source"
logger.warning("call_graph_functions table not found in sessions.db")
return result

# Count target function nodes
target_count = org_conn.execute(
"SELECT COUNT(*) FROM kg_nodes WHERE node_type = 'function'"
).fetchone()[0]
result["target_count"] = target_count

if source_count == 0:
result["status"] = "no_source_data"
return result

# Get all source function IDs
source_functions = set()
cursor = sessions_conn.execute("SELECT node_id FROM call_graph_functions")
for row in cursor:
source_functions.add(row['node_id'])

# Check which are in target (via call_graph_node_id in properties)
# The kg_nodes stores original node_id in properties.call_graph_node_id
matched = 0
missing = []

# Build set of migrated node_ids from kg_nodes
target_cursor = org_conn.execute("""
SELECT id, properties FROM kg_nodes
WHERE node_type = 'function' AND properties IS NOT NULL
""")

migrated_cg_node_ids = set()
for row in target_cursor:
try:
props = json.loads(row['properties']) if row['properties'] else {}
cg_node_id = props.get('call_graph_node_id')
if cg_node_id:
migrated_cg_node_ids.add(cg_node_id)
except json.JSONDecodeError:
continue

# Check each source function
for cg_node_id in source_functions:
if cg_node_id in migrated_cg_node_ids:
matched += 1
else:
missing.append(cg_node_id)

result["matched"] = matched
result["missing_in_target"] = len(missing)
result["sample_missing"] = missing[:10] # First 10 missing

# Determine status
if matched == source_count:
result["status"] = "complete"
elif matched > 0:
result["status"] = "partial"
self.results["issues"].append(
f"Functions migration partial: {matched}/{source_count} migrated"
)
else:
result["status"] = "not_migrated"
self.results["issues"].append(
"Functions not migrated to kg_nodes"
)

if self.verbose:
logger.info(f" Source functions: {source_count}")
logger.info(f" Target function nodes: {target_count}")
logger.info(f" Matched: {matched}")
logger.info(f" Missing: {len(missing)}")

except Exception as e:
result["status"] = "error"
result["error"] = str(e)
logger.error(f"Error verifying functions: {e}")
finally:
sessions_conn.close()
org_conn.close()

return result

def verify_edges(self) -> Dict:
"""
CP-28: Verify call_graph_edges migrated to kg_edges with edge_type='CALLS'.

Returns:
Dict with migration status and any discrepancies
"""
logger.info("CP-28: Verifying edge migration...")

sessions_conn = self._connect(self.sessions_db)
org_conn = self._connect(self.org_db)

result = {
"source_count": 0,
"target_count": 0,
"status": "unknown",
}

if not sessions_conn or not org_conn:
result["status"] = "error"
result["error"] = "Could not connect to databases"
return result

try:
# Count source edges
try:
source_count = sessions_conn.execute(
"SELECT COUNT(*) FROM call_graph_edges"
).fetchone()[0]
result["source_count"] = source_count
except sqlite3.OperationalError:
result["source_count"] = 0
result["status"] = "no_source"
logger.warning("call_graph_edges table not found in sessions.db")
return result

# Count target CALLS edges
target_count = org_conn.execute(
"SELECT COUNT(*) FROM kg_edges WHERE edge_type = 'CALLS'"
).fetchone()[0]
result["target_count"] = target_count

if source_count == 0:
result["status"] = "no_source_data"
return result

# Note: Source edges may be more than target due to:
# 1. Aggregation (multiple calls between same functions -> 1 edge)
# 2. Filtering (built-in functions, unresolved callees)
# 3. Self-calls are skipped

# Get unique source pairs to compare against aggregated target
source_pairs = sessions_conn.execute(
"SELECT COUNT(DISTINCT caller_id || ':' || callee_name) FROM call_graph_edges"
).fetchone()[0]
result["source_unique_pairs"] = source_pairs

if target_count > 0:
# Consider migration complete if we have CALLS edges
# The count won't match 1:1 due to aggregation
result["status"] = "complete"

if self.verbose:
logger.info(f" Source edges: {source_count}")
logger.info(f" Source unique pairs: {source_pairs}")
logger.info(f" Target CALLS edges: {target_count}")
else:
result["status"] = "not_migrated"
self.results["issues"].append(
"CALLS edges not found in kg_edges"
)

except Exception as e:
result["status"] = "error"
result["error"] = str(e)
logger.error(f"Error verifying edges: {e}")
finally:
sessions_conn.close()
org_conn.close()

return result

def verify_memory_split(self) -> Dict:
"""
CP-30: Document that call_graph_memory stays in sessions.db.

call_graph_memory is ephemeral session context that links functions
to session messages. It stays in sessions.db (Tier 3) because:
1. It's regenerable from session data
2. It's session-specific, not global knowledge
3. kg_nodes/kg_edges are for persistent knowledge

Returns:
Dict with memory table status
"""
logger.info("CP-30: Verifying call_graph_memory split...")

sessions_conn = self._connect(self.sessions_db)

result = {
"table_exists": False,
"row_count": 0,
"location": "sessions.db",
"rationale": "Session-specific ephemeral context (regenerable)",
"status": "documented",
}

if not sessions_conn:
result["status"] = "error"
return result

try:
# Check if table exists
cursor = sessions_conn.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='call_graph_memory'
""")

if cursor.fetchone():
result["table_exists"] = True

row_count = sessions_conn.execute(
"SELECT COUNT(*) FROM call_graph_memory"
).fetchone()[0]
result["row_count"] = row_count

if self.verbose:
logger.info(f" call_graph_memory exists: {result['table_exists']}")
logger.info(f" Row count: {row_count}")
logger.info(f" Location: sessions.db (correct)")
else:
result["status"] = "table_not_created"

except Exception as e:
result["status"] = "error"
result["error"] = str(e)
finally:
sessions_conn.close()

return result

def run_verification(self) -> Dict:
"""
Run complete migration verification.

Returns:
Complete verification results
"""
logger.info("=" * 60)
logger.info("ADR-151 Phase 4: Call Graph Migration Verification")
logger.info("=" * 60)

self.results["functions"] = self.verify_functions()
self.results["edges"] = self.verify_edges()
self.results["memory"] = self.verify_memory_split()

# Determine overall status
func_status = self.results["functions"].get("status", "unknown")
edge_status = self.results["edges"].get("status", "unknown")

if func_status == "complete" and edge_status == "complete":
self.results["status"] = "MIGRATION_COMPLETE"
elif func_status in ("complete", "partial") and edge_status == "complete":
self.results["status"] = "MIGRATION_PARTIAL"
elif func_status == "no_source_data" and edge_status == "no_source_data":
self.results["status"] = "NO_SOURCE_DATA"
else:
self.results["status"] = "MIGRATION_INCOMPLETE"

# Print summary
print("\n" + "=" * 60)
print("VERIFICATION SUMMARY")
print("=" * 60)
print(f"Overall Status: {self.results['status']}")
print(f"\nFunctions (CP-27):")
print(f" - Source (call_graph_functions): {self.results['functions'].get('source_count', 'N/A')}")
print(f" - Target (kg_nodes function): {self.results['functions'].get('target_count', 'N/A')}")
print(f" - Status: {func_status}")

print(f"\nEdges (CP-28):")
print(f" - Source (call_graph_edges): {self.results['edges'].get('source_count', 'N/A')}")
print(f" - Target (kg_edges CALLS): {self.results['edges'].get('target_count', 'N/A')}")
print(f" - Status: {edge_status}")

print(f"\nMemory Split (CP-30):")
print(f" - call_graph_memory stays in: sessions.db")
print(f" - Rationale: Session-specific, ephemeral, regenerable")
print(f" - Status: {self.results['memory'].get('status', 'N/A')}")

if self.results["issues"]:
print(f"\nIssues Found:")
for issue in self.results["issues"]:
print(f" - {issue}")

print("=" * 60)

return self.results

def main(): parser = argparse.ArgumentParser( description="Verify call graph migration (ADR-151 Phase 4)" ) parser.add_argument( "--verbose", "-v", action="store_true", help="Show detailed verification output" ) parser.add_argument( "--json", action="store_true", help="Output results as JSON" ) parser.add_argument( "--sessions-db", type=Path, help="Path to sessions.db (default: auto-detect)" ) parser.add_argument( "--org-db", type=Path, help="Path to org.db (default: auto-detect)" )

args = parser.parse_args()

# Resolve database paths
sessions_db = args.sessions_db or get_sessions_db_path()
org_db = args.org_db or get_org_db_path()

# Run verification
verifier = CallGraphMigrationVerifier(
sessions_db=sessions_db,
org_db=org_db,
verbose=args.verbose
)

results = verifier.run_verification()

if args.json:
print(json.dumps(results, indent=2))

# Exit code based on status
if results["status"] == "MIGRATION_COMPLETE":
sys.exit(0)
elif results["status"] == "NO_SOURCE_DATA":
sys.exit(0) # No source data is not an error
else:
sys.exit(1)

if name == "main": main()