Skip to main content

#!/usr/bin/env python3 """ ADR-151 Context Graph Evolution - Phase 2: Entity Node Population

Orchestrates all entity extractors to populate kg_nodes from existing tables.

Usage: python3 scripts/knowledge_graph/populate_nodes.py python3 scripts/knowledge_graph/populate_nodes.py --dry-run python3 scripts/knowledge_graph/populate_nodes.py --extractors component,track,adr python3 scripts/knowledge_graph/populate_nodes.py --stats

Critical Path Tasks: CP-07: Component nodes from platform.db CP-08: File nodes from sessions.db call_graph CP-09: Function nodes from sessions.db call_graph CP-10: Track nodes (static definition) CP-11: ADR nodes from coditect-documentation/coditect-core/adrs/ (ADR-213) CP-12: Session nodes from sessions.db messages CP-13: Decision nodes from org.db decisions CP-14: ErrorSolution nodes from org.db error_solutions CP-15: SkillLearning nodes from org.db skill_learnings

ADR References: - ADR-151: Context Graph Evolution Architecture - ADR-118: Four-Tier Database Architecture - ADR-054: Track Nomenclature

Created: 2026-02-03 Author: Claude (Opus 4.5) Track: J (Memory Intelligence) Task: J.3.4 """

import argparse import json import logging import sys from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional

Handle imports for both module and direct execution

try: from scripts.core.paths import ( get_org_db_path, get_sessions_db_path, get_context_storage_dir, get_adrs_dir, get_tracks_dir, get_docs_dir, get_session_logs_dir, get_diagrams_dir, FRAMEWORK_LOC, ) except ModuleNotFoundError: _script_dir = Path(file).resolve().parent _core_root = _script_dir.parent.parent if str(_core_root) not in sys.path: sys.path.insert(0, str(_core_root)) from scripts.core.paths import ( get_org_db_path, get_sessions_db_path, get_context_storage_dir, get_adrs_dir, get_tracks_dir, get_docs_dir, get_session_logs_dir, get_diagrams_dir, FRAMEWORK_LOC, )

Import extractors

from scripts.knowledge_graph.extractors.component_extractor import ComponentExtractor from scripts.knowledge_graph.extractors.file_extractor import FileExtractor from scripts.knowledge_graph.extractors.function_extractor import FunctionExtractor from scripts.knowledge_graph.extractors.track_extractor import TrackExtractor from scripts.knowledge_graph.extractors.adr_extractor import ADRExtractor from scripts.knowledge_graph.extractors.session_extractor import SessionExtractor from scripts.knowledge_graph.extractors.decision_extractor import DecisionExtractor from scripts.knowledge_graph.extractors.error_solution_extractor import ErrorSolutionExtractor from scripts.knowledge_graph.extractors.skill_learning_extractor import SkillLearningExtractor from scripts.knowledge_graph.extractors.policy_extractor import PolicyExtractor from scripts.knowledge_graph.extractors.audit_event_extractor import AuditEventExtractor from scripts.knowledge_graph.extractors.document_extractor import DocumentExtractor

Configure logging

logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(name)

Available extractors with their task IDs

EXTRACTOR_CONFIG = { "component": { "task_id": "CP-07", "description": "Component nodes from platform.db", "source_db": "platform.db", }, "file": { "task_id": "CP-08", "description": "File nodes from call_graph", "source_db": "sessions.db", }, "function": { "task_id": "CP-09", "description": "Function nodes from call_graph", "source_db": "sessions.db", }, "track": { "task_id": "CP-10", "description": "Track nodes (static)", "source_db": None, }, "adr": { "task_id": "CP-11", "description": "ADR nodes from markdown files", "source_db": None, }, "session": { "task_id": "CP-12", "description": "Session nodes from messages", "source_db": "sessions.db", }, "decision": { "task_id": "CP-13", "description": "Decision nodes from org.db", "source_db": "org.db", }, "error_solution": { "task_id": "CP-14", "description": "ErrorSolution nodes from org.db", "source_db": "org.db", }, "skill_learning": { "task_id": "CP-15", "description": "SkillLearning nodes from org.db", "source_db": "org.db", }, "policy": { "task_id": "OPT-1", "description": "Policy nodes from CLAUDE.md, standards, hooks", "source_db": None, }, "audit_event": { "task_id": "OPT-2", "description": "Audit event nodes from sessions.db", "source_db": "sessions.db", }, "document": { "task_id": "CP-16", "description": "Document nodes from coditect-documentation, session logs, diagrams (ADR-213)", "source_db": None, }, }

def get_platform_db_path() -> Path: """Get platform.db path.""" return get_context_storage_dir() / "platform.db"

def get_track_files_dir() -> Path: """Get track files directory path (delegates to paths.py ADR-213).""" return get_tracks_dir()

def create_extractor( extractor_name: str, dry_run: bool = False, tenant_id: Optional[str] = None, project_id: Optional[str] = None, ): """ Create an extractor instance by name.

Args:
extractor_name: Name of the extractor
dry_run: If True, don't write to database
tenant_id: Optional tenant ID
project_id: Optional project ID

Returns:
Extractor instance
"""
target_db = get_org_db_path()

if extractor_name == "component":
return ComponentExtractor(
source_db_path=get_platform_db_path(),
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "file":
return FileExtractor(
source_db_path=get_sessions_db_path(),
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "function":
return FunctionExtractor(
source_db_path=get_sessions_db_path(),
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "track":
return TrackExtractor(
target_db_path=target_db,
track_files_dir=get_track_files_dir(),
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "adr":
return ADRExtractor(
adrs_dir=get_adrs_dir(),
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "session":
return SessionExtractor(
source_db_path=get_sessions_db_path(),
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "decision":
return DecisionExtractor(
source_db_path=get_org_db_path(),
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "error_solution":
return ErrorSolutionExtractor(
source_db_path=get_org_db_path(),
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "skill_learning":
return SkillLearningExtractor(
source_db_path=get_org_db_path(),
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "policy":
return PolicyExtractor(
framework_dir=FRAMEWORK_LOC,
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "audit_event":
return AuditEventExtractor(
source_db_path=get_sessions_db_path(),
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
elif extractor_name == "document":
return DocumentExtractor(
docs_dir=get_docs_dir(),
session_logs_dir=get_session_logs_dir(),
diagrams_dir=get_diagrams_dir(),
target_db_path=target_db,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
else:
raise ValueError(f"Unknown extractor: {extractor_name}")

def run_extraction( extractors: Optional[List[str]] = None, dry_run: bool = False, tenant_id: Optional[str] = None, project_id: Optional[str] = None, ) -> Dict[str, Dict]: """ Run entity extraction for specified extractors.

Args:
extractors: List of extractor names (None = all)
dry_run: If True, don't write to database
tenant_id: Optional tenant ID
project_id: Optional project ID

Returns:
Dict mapping extractor name to stats
"""
if extractors is None:
extractors = list(EXTRACTOR_CONFIG.keys())

results = {}
total_nodes = 0

logger.info("=" * 60)
logger.info("ADR-151 Phase 2: Entity Node Population")
logger.info(f"Mode: {'DRY RUN' if dry_run else 'EXECUTE'}")
logger.info(f"Extractors: {', '.join(extractors)}")
logger.info(f"Target: {get_org_db_path()}")
logger.info("=" * 60)

for extractor_name in extractors:
if extractor_name not in EXTRACTOR_CONFIG:
logger.warning(f"Unknown extractor: {extractor_name}")
continue

config = EXTRACTOR_CONFIG[extractor_name]
task_id = config["task_id"]
description = config["description"]

logger.info("")
logger.info(f"[{task_id}] {description}")
logger.info("-" * 40)

try:
extractor = create_extractor(
extractor_name,
dry_run=dry_run,
tenant_id=tenant_id,
project_id=project_id,
)
stats = extractor.run()
results[extractor_name] = {
"task_id": task_id,
"stats": stats,
"success": True,
}
total_nodes += stats.get("inserted", 0) + stats.get("updated", 0)

except Exception as e:
logger.error(f"[{task_id}] Failed: {e}")
results[extractor_name] = {
"task_id": task_id,
"error": str(e),
"success": False,
}

logger.info("")
logger.info("=" * 60)
logger.info("SUMMARY")
logger.info("=" * 60)

for name, result in results.items():
status = "OK" if result.get("success") else "FAILED"
if result.get("success"):
stats = result.get("stats", {})
count = stats.get("inserted", 0) + stats.get("updated", 0)
logger.info(f" [{result['task_id']}] {name}: {status} ({count} nodes)")
else:
logger.info(f" [{result['task_id']}] {name}: {status} - {result.get('error', 'Unknown error')}")

logger.info(f"\nTotal nodes created/updated: {total_nodes}")

return results

def get_node_stats() -> Dict[str, int]: """ Get current node counts by type from kg_nodes.

Returns:
Dict mapping node_type to count
"""
import sqlite3

org_db = get_org_db_path()
if not org_db.exists():
return {}

conn = sqlite3.connect(str(org_db))
try:
cursor = conn.execute("""
SELECT node_type, COUNT(*) as count
FROM kg_nodes
GROUP BY node_type
ORDER BY count DESC
""")
return {row[0]: row[1] for row in cursor}
except sqlite3.OperationalError:
return {}
finally:
conn.close()

def print_stats(): """Print current kg_nodes statistics.""" stats = get_node_stats()

print("\nKG Node Statistics")
print("=" * 40)

if not stats:
print("No nodes found (kg_nodes table may not exist)")
return

total = 0
for node_type, count in stats.items():
print(f" {node_type:20} {count:>8,}")
total += count

print("-" * 40)
print(f" {'TOTAL':20} {total:>8,}")

def main(): parser = argparse.ArgumentParser( description="ADR-151 Phase 2: Entity Node Population", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Run all extractors python3 scripts/knowledge_graph/populate_nodes.py

# Dry run (preview without writing)
python3 scripts/knowledge_graph/populate_nodes.py --dry-run

# Run specific extractors
python3 scripts/knowledge_graph/populate_nodes.py --extractors component,track,adr

# Show current statistics
python3 scripts/knowledge_graph/populate_nodes.py --stats

Extractors: component CP-07: Components from platform.db file CP-08: Files from call_graph function CP-09: Functions from call_graph track CP-10: PILOT tracks (static) adr CP-11: ADRs from markdown session CP-12: Sessions from messages decision CP-13: Decisions from org.db error_solution CP-14: Error solutions from org.db skill_learning CP-15: Skill learnings from org.db policy OPT-1: Policies from CLAUDE.md, standards, hooks audit_event OPT-2: Audit events from sessions.db """ )

parser.add_argument(
"--dry-run",
action="store_true",
help="Preview changes without writing to database"
)
parser.add_argument(
"--extractors", "-e",
type=str,
help="Comma-separated list of extractors to run (default: all)"
)
parser.add_argument(
"--stats",
action="store_true",
help="Show current kg_nodes statistics and exit"
)
parser.add_argument(
"--tenant-id",
type=str,
help="Tenant ID for multi-tenant isolation"
)
parser.add_argument(
"--project-id",
type=str,
help="Project ID"
)
parser.add_argument(
"--json",
action="store_true",
help="Output results as JSON"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Enable verbose logging"
)

args = parser.parse_args()

if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)

if args.stats:
print_stats()
return 0

# Parse extractors list
extractors = None
if args.extractors:
extractors = [e.strip() for e in args.extractors.split(",")]
# Validate
for e in extractors:
if e not in EXTRACTOR_CONFIG:
print(f"Error: Unknown extractor '{e}'")
print(f"Available: {', '.join(EXTRACTOR_CONFIG.keys())}")
return 1

# Run extraction
results = run_extraction(
extractors=extractors,
dry_run=args.dry_run,
tenant_id=args.tenant_id,
project_id=args.project_id,
)

if args.json:
print(json.dumps(results, indent=2, default=str))
else:
# Print final stats
print_stats()

# Return success if all extractors succeeded
all_success = all(r.get("success", False) for r in results.values())
return 0 if all_success else 1

if name == "main": sys.exit(main())