#!/usr/bin/env python3
"""
ADR-151 Context Graph Evolution - Phase 3: Edge Population

Orchestrates all edge builders to populate kg_edges with relationships
between entities in the knowledge graph.

Usage:
    python3 scripts/knowledge_graph/populate_edges.py
    python3 scripts/knowledge_graph/populate_edges.py --dry-run
    python3 scripts/knowledge_graph/populate_edges.py --builders calls,invokes,belongs_to
    python3 scripts/knowledge_graph/populate_edges.py --stats

Critical Path Tasks:
    CP-16: CALLS edges from call_graph_edges
    CP-17: INVOKES edges from message_component_invocations
    CP-18: PRODUCES edges from decisions via message_id
    CP-19: BELONGS_TO edges from component frontmatter
    CP-20: DEFINES edges from ADR references in decisions
    CP-21: REFERENCES edges from decision text parsing
    CP-22: USES edges from component paths
    CP-23: SOLVES edges for error_solutions
    CP-24: SIMILAR_TO edges from embeddings [DEFERRED]
    CP-25: GOVERNED_BY edges from governance frontmatter
    CP-26: CREATED_BY edges from session_id fields

ADR References:
    - ADR-151: Context Graph Evolution Architecture
    - ADR-118: Four-Tier Database Architecture
    - ADR-054: Track Nomenclature

Created: 2026-02-03
Author: Claude (Opus 4.5)
Track: J (Memory Intelligence)
Task: J.3.5
"""
import argparse
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional

# Handle imports for both module and direct execution
try:
    from scripts.core.paths import (
        get_org_db_path,
        get_sessions_db_path,
        get_context_storage_dir,
        FRAMEWORK_LOC,
    )
except ModuleNotFoundError:
    # Direct execution: the `scripts` package is not importable yet, so put
    # the directory two levels above this file's directory on sys.path and
    # retry. `__file__` (not a bare `file`) is the path of this module.
    _script_dir = Path(__file__).resolve().parent
    _core_root = _script_dir.parent.parent
    if str(_core_root) not in sys.path:
        sys.path.insert(0, str(_core_root))
    from scripts.core.paths import (
        get_org_db_path,
        get_sessions_db_path,
        get_context_storage_dir,
        FRAMEWORK_LOC,
    )

# Import edge builders
from scripts.knowledge_graph.edge_builders.calls_edge_builder import CallsEdgeBuilder
from scripts.knowledge_graph.edge_builders.invokes_edge_builder import InvokesEdgeBuilder
from scripts.knowledge_graph.edge_builders.produces_edge_builder import ProducesEdgeBuilder
from scripts.knowledge_graph.edge_builders.belongs_to_edge_builder import BelongsToEdgeBuilder
from scripts.knowledge_graph.edge_builders.defines_edge_builder import DefinesEdgeBuilder
from scripts.knowledge_graph.edge_builders.references_edge_builder import ReferencesEdgeBuilder
from scripts.knowledge_graph.edge_builders.uses_edge_builder import UsesEdgeBuilder
from scripts.knowledge_graph.edge_builders.solves_edge_builder import SolvesEdgeBuilder
from scripts.knowledge_graph.edge_builders.similar_to_edge_builder import SimilarToEdgeBuilder
from scripts.knowledge_graph.edge_builders.governed_by_edge_builder import GovernedByEdgeBuilder
from scripts.knowledge_graph.edge_builders.created_by_edge_builder import CreatedByEdgeBuilder
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger named after this module (`__name__`), so records can be
# attributed and filtered per module.
logger = logging.getLogger(__name__)
# Available edge builders with their task IDs
# One row per edge builder: (name, task ID, description, source database).
# Row order is the order in which builders run when none are selected.
_BUILDER_ROWS = (
    ("calls", "CP-16", "CALLS edges from call_graph_edges", "sessions.db"),
    ("invokes", "CP-17", "INVOKES edges from message_component_invocations", "sessions.db"),
    ("produces", "CP-18", "PRODUCES edges from decisions via message_id", "org.db + sessions.db"),
    ("belongs_to", "CP-19", "BELONGS_TO edges from component frontmatter", "files"),
    ("defines", "CP-20", "DEFINES edges from ADR references in decisions", "org.db"),
    ("references", "CP-21", "REFERENCES edges from decision text parsing", "org.db"),
    ("uses", "CP-22", "USES edges from component paths", "kg_nodes"),
    ("solves", "CP-23", "SOLVES edges for error_solutions", "org.db"),
    ("similar_to", "CP-24", "SIMILAR_TO edges from embeddings [DEFERRED]", "kg_nodes"),
    ("governed_by", "CP-25", "GOVERNED_BY edges from governance frontmatter", "files + kg_nodes"),
    ("created_by", "CP-26", "CREATED_BY edges from session_id fields", "org.db + sessions.db"),
)

# Maps builder name -> {"task_id", "description", "source_db"}.
BUILDER_CONFIG = {
    builder: {
        "task_id": task_id,
        "description": description,
        "source_db": source_db,
    }
    for builder, task_id, description, source_db in _BUILDER_ROWS
}
def create_builder(
    builder_name: str,
    dry_run: bool = False,
    tenant_id: Optional[str] = None,
    validate_nodes: bool = True,
):
    """
    Instantiate the edge builder registered under ``builder_name``.

    Args:
        builder_name: Name of the builder (e.g. "calls", "invokes")
        dry_run: If True, don't write to database
        tenant_id: Optional tenant ID
        validate_nodes: If True, verify nodes exist before creating edges

    Returns:
        EdgeBuilder instance

    Raises:
        ValueError: If ``builder_name`` is not a known builder
    """
    # Both database paths are resolved up front, before the name is checked.
    target_db = get_org_db_path()
    sessions_db = get_sessions_db_path()

    # Options every builder receives unchanged.
    shared = {
        "dry_run": dry_run,
        "tenant_id": tenant_id,
        "validate_nodes": validate_nodes,
    }

    # Zero-argument factories: only the selected builder is constructed.
    factories = {
        "calls": lambda: CallsEdgeBuilder(
            source_db_path=sessions_db,
            target_db_path=target_db,
            **shared,
        ),
        "invokes": lambda: InvokesEdgeBuilder(
            source_db_path=sessions_db,
            target_db_path=target_db,
            **shared,
        ),
        "produces": lambda: ProducesEdgeBuilder(
            org_db_path=target_db,
            sessions_db_path=sessions_db,
            target_db_path=target_db,
            **shared,
        ),
        "belongs_to": lambda: BelongsToEdgeBuilder(
            framework_root=FRAMEWORK_LOC,
            target_db_path=target_db,
            **shared,
        ),
        "defines": lambda: DefinesEdgeBuilder(
            source_db_path=target_db,
            target_db_path=target_db,
            **shared,
        ),
        "references": lambda: ReferencesEdgeBuilder(
            source_db_path=target_db,
            target_db_path=target_db,
            **shared,
        ),
        "uses": lambda: UsesEdgeBuilder(
            target_db_path=target_db,
            framework_root=FRAMEWORK_LOC,
            **shared,
        ),
        "solves": lambda: SolvesEdgeBuilder(
            source_db_path=target_db,
            target_db_path=target_db,
            **shared,
        ),
        "similar_to": lambda: SimilarToEdgeBuilder(
            target_db_path=target_db,
            **shared,
        ),
        "governed_by": lambda: GovernedByEdgeBuilder(
            target_db_path=target_db,
            framework_root=FRAMEWORK_LOC,
            **shared,
        ),
        "created_by": lambda: CreatedByEdgeBuilder(
            org_db_path=target_db,
            sessions_db_path=sessions_db,
            target_db_path=target_db,
            **shared,
        ),
    }

    try:
        make_builder = factories[builder_name]
    except (KeyError, TypeError):
        # TypeError covers unhashable names, which also match no builder.
        raise ValueError(f"Unknown builder: {builder_name}") from None
    return make_builder()
def run_edge_population(
    builders: Optional[List[str]] = None,
    dry_run: bool = False,
    tenant_id: Optional[str] = None,
    validate_nodes: bool = True,
) -> Dict[str, Dict]:
    """
    Run edge population for specified builders.

    Builders run one after another in the order given. A builder that raises
    is recorded as failed and the remaining builders still run. Names that
    are not in BUILDER_CONFIG are logged as warnings and skipped.

    Args:
        builders: List of builder names (None = all)
        dry_run: If True, don't write to database
        tenant_id: Optional tenant ID
        validate_nodes: If True, verify nodes exist

    Returns:
        Dict mapping builder name to a result dict. Successful builders have
        the keys "task_id", "stats" and "success" (True); failed builders
        have "task_id", "error" and "success" (False).
    """
    if builders is None:
        builders = list(BUILDER_CONFIG.keys())

    results = {}
    total_edges = 0

    logger.info("=" * 60)
    logger.info("ADR-151 Phase 3: Edge Population")
    logger.info(f"Mode: {'DRY RUN' if dry_run else 'EXECUTE'}")
    logger.info(f"Builders: {', '.join(builders)}")
    logger.info(f"Target: {get_org_db_path()}")
    logger.info(f"Node validation: {'enabled' if validate_nodes else 'disabled'}")
    logger.info("=" * 60)

    for builder_name in builders:
        if builder_name not in BUILDER_CONFIG:
            logger.warning(f"Unknown builder: {builder_name}")
            continue

        config = BUILDER_CONFIG[builder_name]
        task_id = config["task_id"]
        description = config["description"]

        logger.info("")
        logger.info(f"[{task_id}] {description}")
        logger.info("-" * 40)

        try:
            builder = create_builder(
                builder_name,
                dry_run=dry_run,
                tenant_id=tenant_id,
                validate_nodes=validate_nodes,
            )
            stats = builder.run()

            results[builder_name] = {
                "task_id": task_id,
                "stats": stats,
                "success": True,
            }
            # If stats has no .get(), the handler below replaces the success
            # entry with a failure entry.
            total_edges += stats.get("inserted", 0)

        except Exception as e:
            # Deliberately broad: one failing builder must not abort the run.
            # logger.exception attaches the traceback to this ERROR record,
            # so it goes through the configured logging handlers.
            logger.exception(f"[{task_id}] Failed: {e}")
            results[builder_name] = {
                "task_id": task_id,
                "error": str(e),
                "success": False,
            }

    logger.info("")
    logger.info("=" * 60)
    logger.info("SUMMARY")
    logger.info("=" * 60)

    for name, result in results.items():
        status = "OK" if result.get("success") else "FAILED"
        if result.get("success"):
            stats = result.get("stats", {})
            inserted = stats.get("inserted", 0)
            skipped_from = stats.get("skipped_missing_from", 0)
            skipped_to = stats.get("skipped_missing_to", 0)
            logger.info(f" [{result['task_id']}] {name}: {status} ({inserted} edges, {skipped_from + skipped_to} skipped)")
        else:
            logger.info(f" [{result['task_id']}] {name}: {status} - {result.get('error', 'Unknown error')}")

    logger.info(f"\nTotal edges created: {total_edges}")

    return results
def get_edge_stats() -> Dict[str, int]:
    """
    Count the rows of kg_edges per edge type.

    Returns:
        Dict mapping edge_type to count, in descending count order. Empty if
        the org database file does not exist or the query raises
        sqlite3.OperationalError.
    """
    import sqlite3

    db_path = get_org_db_path()
    if not db_path.exists():
        return {}

    query = """
        SELECT edge_type, COUNT(*) as count
        FROM kg_edges
        GROUP BY edge_type
        ORDER BY count DESC
    """

    connection = sqlite3.connect(str(db_path))
    try:
        counts = {}
        for edge_type, edge_count in connection.execute(query):
            counts[edge_type] = edge_count
        return counts
    except sqlite3.OperationalError:
        # Reported as "no statistics" rather than an error
        # (e.g. when the kg_edges table is missing).
        return {}
    finally:
        connection.close()
def print_stats():
    """Print a table of kg_edges counts per edge type, followed by the total."""
    edge_counts = get_edge_stats()

    print("\nKG Edge Statistics")
    print("=" * 40)

    # Guard clause: nothing to tabulate.
    if not edge_counts:
        print("No edges found (kg_edges table may not exist or be empty)")
        return

    for edge_type, count in edge_counts.items():
        print(f" {edge_type:20} {count:>8,}")

    total = sum(edge_counts.values())
    print("-" * 40)
    print(f" {'TOTAL':20} {total:>8,}")
def main() -> int:
    """
    Command-line entry point for edge population.

    Parses the command line, then either prints the current kg_edges
    statistics (--stats) or runs the selected edge builders and reports the
    outcome as JSON (--json) or as a statistics table.

    Returns:
        Process exit code: 0 when --stats was requested or every builder
        succeeded; 1 when an unknown builder was named or any builder failed.
    """
    parser = argparse.ArgumentParser(
        description="ADR-151 Phase 3: Edge Population",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # RawDescriptionHelpFormatter prints the epilog verbatim, so the line
        # breaks and column alignment below are part of the --help output.
        epilog="""
Examples:
    # Run all edge builders
    python3 scripts/knowledge_graph/populate_edges.py

    # Dry run (preview without writing)
    python3 scripts/knowledge_graph/populate_edges.py --dry-run

    # Run specific builders
    python3 scripts/knowledge_graph/populate_edges.py --builders calls,invokes,belongs_to

    # Show current statistics
    python3 scripts/knowledge_graph/populate_edges.py --stats

    # Skip node validation (faster but may create orphan edges)
    python3 scripts/knowledge_graph/populate_edges.py --no-validate

Edge Builders:
    calls        CP-16: Function call relationships
    invokes      CP-17: Session → Component invocations
    produces     CP-18: Session → Decision relationships
    belongs_to   CP-19: Component → Track assignments
    defines      CP-20: ADR → Decision governance
    references   CP-21: Decision → File/ADR references
    uses         CP-22: Component → File source
    solves       CP-23: ErrorSolution patterns
    similar_to   CP-24: Embedding similarity [DEFERRED]
    governed_by  CP-25: Entity → ADR governance
    created_by   CP-26: Entity → Session provenance
"""
    )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview changes without writing to database"
    )
    parser.add_argument(
        "--builders", "-b",
        type=str,
        help="Comma-separated list of builders to run (default: all)"
    )
    parser.add_argument(
        "--stats",
        action="store_true",
        help="Show current kg_edges statistics and exit"
    )
    parser.add_argument(
        "--no-validate",
        action="store_true",
        help="Skip node existence validation (faster but may create orphan edges)"
    )
    parser.add_argument(
        "--tenant-id",
        type=str,
        help="Tenant ID for multi-tenant isolation"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )

    args = parser.parse_args()

    if args.verbose:
        # Raise verbosity on the root logger so every module's logger is affected.
        logging.getLogger().setLevel(logging.DEBUG)

    if args.stats:
        print_stats()
        return 0

    # Parse builders list
    builders = None
    if args.builders:
        builders = [b.strip() for b in args.builders.split(",")]
        # Validate before doing any work
        for b in builders:
            if b not in BUILDER_CONFIG:
                # Diagnostics go to stderr so stdout stays clean for callers
                # that consume the --json output.
                print(f"Error: Unknown builder '{b}'", file=sys.stderr)
                print(f"Available: {', '.join(BUILDER_CONFIG.keys())}", file=sys.stderr)
                return 1

    # Run edge population
    results = run_edge_population(
        builders=builders,
        dry_run=args.dry_run,
        tenant_id=args.tenant_id,
        validate_nodes=not args.no_validate,
    )

    if args.json:
        # default=str stringifies any value json cannot serialize natively.
        print(json.dumps(results, indent=2, default=str))
    else:
        # Print final stats
        print_stats()

    # Return success if all builders succeeded
    all_success = all(r.get("success", False) for r in results.values())
    return 0 if all_success else 1
# Run the CLI only when this file is executed as a script, and use main()'s
# return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())