#!/usr/bin/env python3 """ ADR-118 Migration Analyzer (J.1.4.0.8)
Analyzes Python files for context.db usage and categorizes migration needs.
Usage: python3 scripts/migrations/adr118_migration_analyzer.py python3 scripts/migrations/adr118_migration_analyzer.py --json python3 scripts/migrations/adr118_migration_analyzer.py --file scripts/memory-retrieval.py
Task: J.1.4.0.8 ADR: ADR-118 Four-Tier Database Architecture Created: 2026-02-04 """
import argparse import json import os import re import sys from dataclasses import dataclass, field, asdict from pathlib import Path from typing import Dict, List, Optional, Set
Add project root to path
SCRIPT_DIR = Path(file).resolve().parent CORE_ROOT = SCRIPT_DIR.parent.parent if str(CORE_ROOT) not in sys.path: sys.path.insert(0, str(CORE_ROOT))
@dataclass class FileAnalysis: """Analysis result for a single file.""" file_path: str migration_status: str = "unknown" # 'migrated', 'partial', 'needs_migration', 'reference_only' has_db_router_import: bool = False uses_get_context_db_path: bool = False uses_context_db_literal: bool = False uses_CONTEXT_DB: bool = False has_adr118_comment: bool = False tables_accessed: List[str] = field(default_factory=list) target_tier: str = "" # 'org', 'sessions', 'platform', 'projects', 'mixed' line_count: int = 0 patterns_found: List[str] = field(default_factory=list) migration_notes: List[str] = field(default_factory=list)
Table to tier mapping (from db_router.py)
TABLE_TIERS = { # Tier 2: org.db (IRREPLACEABLE) "decisions": "org", "skill_learnings": "org", "error_solutions": "org", "adr_references": "org", "knowledge_graph": "org", "entity_mentions": "org", "kg_nodes": "org", "kg_edges": "org", "kg_nodes_fts": "org",
# Tier 3: sessions.db (regenerable)
"sessions": "sessions",
"messages": "sessions",
"message_search": "sessions",
"tool_analytics": "sessions",
"token_economics": "sessions",
"activity_associations": "sessions",
"message_component_invocations": "sessions",
"code_patterns": "sessions",
"session_insights": "sessions",
"embeddings": "sessions",
"sync_queue": "sessions",
"task_tracking": "sessions",
"task_messages": "sessions",
"call_graph_functions": "sessions",
"call_graph_edges": "sessions",
"call_graph_memory": "sessions",
"call_graph_fts": "sessions",
"context_graphs": "sessions",
"context_graph_nodes": "sessions",
"context_graph_usage": "sessions",
"context_graph_checkpoints": "sessions",
"work_items": "sessions",
# Tier 1: platform.db
"components": "platform",
"capabilities": "platform",
"component_search": "platform",
"component_dependencies": "platform",
"component_health": "platform",
# Tier 4: projects.db
"projects": "projects",
"content_hashes": "projects",
"project_embeddings": "projects",
"exclude_patterns": "projects",
}
def analyze_file(file_path: Path) -> FileAnalysis: """Analyze a single file for context.db usage patterns.""" analysis = FileAnalysis(file_path=str(file_path))
try:
content = file_path.read_text(encoding='utf-8')
except Exception as e:
analysis.migration_notes.append(f"Could not read file: {e}")
analysis.migration_status = "error"
return analysis
lines = content.split('\n')
analysis.line_count = len(lines)
# Check for db_router import
if re.search(r'from scripts\.core\.db_router import|import.*db_router', content):
analysis.has_db_router_import = True
analysis.patterns_found.append("has_db_router_import")
# Check for get_context_db_path usage
if 'get_context_db_path' in content:
analysis.uses_get_context_db_path = True
analysis.patterns_found.append("uses_get_context_db_path")
# Check for context.db literal
if re.search(r'["\']context\.db["\']|context\.db', content):
analysis.uses_context_db_literal = True
analysis.patterns_found.append("uses_context_db_literal")
# Check for CONTEXT_DB variable
if re.search(r'\bCONTEXT_DB\b', content):
analysis.uses_CONTEXT_DB = True
analysis.patterns_found.append("uses_CONTEXT_DB")
# Check for ADR-118 comments
if re.search(r'ADR-118|context\.db.*DEPRECATED|NO FALLBACK', content, re.IGNORECASE):
analysis.has_adr118_comment = True
analysis.patterns_found.append("has_adr118_comment")
# Detect table access patterns
tables_found = set()
# FROM table patterns
from_matches = re.findall(r'FROM\s+(\w+)', content, re.IGNORECASE)
tables_found.update(from_matches)
# INSERT INTO patterns
insert_matches = re.findall(r'INSERT\s+INTO\s+(\w+)', content, re.IGNORECASE)
tables_found.update(insert_matches)
# CREATE TABLE patterns
create_matches = re.findall(r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(\w+)', content, re.IGNORECASE)
tables_found.update(create_matches)
# UPDATE table patterns
update_matches = re.findall(r'UPDATE\s+(\w+)\s+SET', content, re.IGNORECASE)
tables_found.update(update_matches)
# DELETE FROM patterns
delete_matches = re.findall(r'DELETE\s+FROM\s+(\w+)', content, re.IGNORECASE)
tables_found.update(delete_matches)
# Filter to known tables
analysis.tables_accessed = sorted([t for t in tables_found if t.lower() in [k.lower() for k in TABLE_TIERS.keys()]])
# Determine target tier
tiers_used = set()
for table in analysis.tables_accessed:
table_lower = table.lower()
for known_table, tier in TABLE_TIERS.items():
if table_lower == known_table.lower():
tiers_used.add(tier)
break
if len(tiers_used) == 0:
analysis.target_tier = "unknown"
elif len(tiers_used) == 1:
analysis.target_tier = list(tiers_used)[0]
else:
analysis.target_tier = "mixed"
# Determine migration status
if analysis.has_db_router_import:
if analysis.uses_get_context_db_path or analysis.uses_context_db_literal:
analysis.migration_status = "partial"
analysis.migration_notes.append("Has db_router import but still uses legacy patterns")
else:
analysis.migration_status = "migrated"
elif analysis.has_adr118_comment and not (analysis.uses_get_context_db_path or analysis.uses_context_db_literal or analysis.uses_CONTEXT_DB):
analysis.migration_status = "reference_only"
analysis.migration_notes.append("Has ADR-118 awareness comments but no direct usage")
elif analysis.uses_get_context_db_path or analysis.uses_context_db_literal or analysis.uses_CONTEXT_DB:
analysis.migration_status = "needs_migration"
if analysis.has_adr118_comment:
analysis.migration_notes.append("Has ADR-118 comments but still uses legacy patterns")
else:
analysis.migration_status = "no_database"
analysis.migration_notes.append("No database usage detected")
return analysis
def find_python_files(root: Path, exclude_patterns: List[str] = None) -> List[Path]: """Find all Python files, excluding specified patterns.""" exclude_patterns = exclude_patterns or [ 'pycache', '.venv', 'test_', 'conftest', '.git', 'node_modules', ]
files = []
for py_file in root.rglob('*.py'):
path_str = str(py_file)
if any(pattern in path_str for pattern in exclude_patterns):
continue
files.append(py_file)
return sorted(files)
def categorize_by_status(analyses: List[FileAnalysis]) -> Dict[str, List[FileAnalysis]]: """Group analyses by migration status.""" categories = { 'needs_migration': [], 'partial': [], 'migrated': [], 'reference_only': [], 'no_database': [], 'error': [], }
for analysis in analyses:
status = analysis.migration_status
if status in categories:
categories[status].append(analysis)
else:
categories['error'].append(analysis)
return categories
def print_report(categories: Dict[str, List[FileAnalysis]], verbose: bool = False): """Print migration analysis report.""" print("=" * 70) print("ADR-118 Migration Analysis Report") print("=" * 70)
total = sum(len(files) for files in categories.values())
print(f"\nTotal files analyzed: {total}")
print()
# Summary
print("Summary by Status:")
print("-" * 40)
for status, files in categories.items():
if files:
emoji = {
'needs_migration': 'š“',
'partial': 'š”',
'migrated': 'ā
',
'reference_only': 'š',
'no_database': 'āŖ',
'error': 'ā',
}.get(status, 'ā')
print(f" {emoji} {status}: {len(files)} files")
# Detail for files needing migration
if categories['needs_migration']:
print("\n" + "=" * 70)
print("Files REQUIRING Migration (needs_migration):")
print("=" * 70)
for analysis in categories['needs_migration']:
print(f"\nš {analysis.file_path}")
print(f" Target tier: {analysis.target_tier}")
print(f" Tables: {', '.join(analysis.tables_accessed) or 'unknown'}")
print(f" Patterns: {', '.join(analysis.patterns_found)}")
if analysis.migration_notes:
print(f" Notes: {'; '.join(analysis.migration_notes)}")
# Detail for partial migrations
if categories['partial']:
print("\n" + "=" * 70)
print("Files with PARTIAL Migration:")
print("=" * 70)
for analysis in categories['partial']:
print(f"\nš {analysis.file_path}")
print(f" Patterns: {', '.join(analysis.patterns_found)}")
if analysis.migration_notes:
print(f" Notes: {'; '.join(analysis.migration_notes)}")
if verbose:
# Show all files
for status in ['migrated', 'reference_only', 'no_database']:
if categories[status]:
print(f"\n{status.upper()} files:")
for analysis in categories[status]:
print(f" - {analysis.file_path}")
def main(): parser = argparse.ArgumentParser( description="ADR-118 Migration Analyzer" ) parser.add_argument( '--file', '-f', help="Analyze a specific file" ) parser.add_argument( '--json', '-j', action='store_true', help="Output as JSON" ) parser.add_argument( '--verbose', '-v', action='store_true', help="Show all file categories" ) parser.add_argument( '--dir', '-d', default=str(CORE_ROOT), help="Directory to analyze" )
args = parser.parse_args()
if args.file:
# Analyze single file
file_path = Path(args.file)
if not file_path.is_absolute():
file_path = CORE_ROOT / file_path
analysis = analyze_file(file_path)
if args.json:
print(json.dumps(asdict(analysis), indent=2))
else:
print(f"File: {analysis.file_path}")
print(f"Status: {analysis.migration_status}")
print(f"Target Tier: {analysis.target_tier}")
print(f"Tables: {', '.join(analysis.tables_accessed) or 'none detected'}")
print(f"Patterns: {', '.join(analysis.patterns_found)}")
if analysis.migration_notes:
print(f"Notes: {'; '.join(analysis.migration_notes)}")
else:
# Analyze all files
root = Path(args.dir)
# Find files with context.db patterns
files_to_analyze = []
for py_file in find_python_files(root):
try:
content = py_file.read_text(encoding='utf-8')
if re.search(r'context\.db|get_context_db_path|CONTEXT_DB', content, re.IGNORECASE):
files_to_analyze.append(py_file)
except:
pass
analyses = [analyze_file(f) for f in files_to_analyze]
categories = categorize_by_status(analyses)
if args.json:
output = {
'total': len(analyses),
'summary': {k: len(v) for k, v in categories.items()},
'files': {k: [asdict(a) for a in v] for k, v in categories.items()}
}
print(json.dumps(output, indent=2))
else:
print_report(categories, verbose=args.verbose)
if name == "main": main()