scripts-defines-edge-builder
#!/usr/bin/env python3
"""
CP-20: DEFINES Edge Builder (ADR-151)

Creates DEFINES edges from ADR nodes to Decision nodes.

Edge: adr:ADR-XXX -> decision:Y
Source: Cross-reference ADR numbers mentioned in decision text
Properties: governance_level, reference_context

Links architecture decision records to the runtime decisions they govern.

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.5.5
"""
import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Set, Tuple

from .base_edge_builder import SQLiteSourceEdgeBuilder
# Module-level logger named after this module, so log records can be filtered
# per module. (`name` was an undefined identifier and raised NameError at
# import time; the dunder `__name__` is what is meant.)
logger = logging.getLogger(__name__)
class DefinesEdgeBuilder(SQLiteSourceEdgeBuilder):
    """
    Build DEFINES edges linking ADRs to decisions they govern.

    Parses decision text for ADR references (ADR-XXX patterns) and
    creates edges from the referenced ADRs to those decisions.
    """

    # Pattern to match ADR references: "ADR-151", "ADR 151" and "ADR151" in
    # any letter case. The whole digit run is captured so that a longer number
    # such as "ADR-1512" is never truncated into a false reference to ADR-151,
    # and short numbers such as "ADR-15" are still recognised.
    ADR_PATTERN = re.compile(r'ADR[- ]?(\d+)', re.IGNORECASE)

    # Phrase templates used to infer the governance level. "{adr}" marks the
    # position of the ADR reference inside the phrase.
    _MANDATORY_PHRASES = (
        'per {adr}',
        'as per {adr}',
        'following {adr}',
        'according to {adr}',
        'required by {adr}',
        '{adr} requires',
        '{adr} mandates',
    )
    _RECOMMENDED_PHRASES = (
        '{adr} recommends',
        'based on {adr}',
        'guided by {adr}',
        'see {adr}',
    )

    @property
    def edge_type(self) -> str:
        """Edge type label produced by this builder."""
        return "DEFINES"

    def _extract_adr_references(self, text: str) -> List[str]:
        """
        Extract ADR numbers from text.

        Args:
            text: Free text to scan; may be empty or None.

        Returns:
            List of ADR IDs in format "ADR-XXX", one entry per mention and in
            order of appearance (duplicates are kept). Numbers are normalised
            to at least three digits, so "ADR 15" and "adr-0015" both become
            "ADR-015".
        """
        if not text:
            return []

        # Strip leading zeros before padding so every spelling of the same
        # number normalises to the same ID.
        return [
            f"ADR-{digits.lstrip('0').zfill(3)}"
            for digits in self.ADR_PATTERN.findall(text)
        ]

    def _get_existing_adrs(self) -> Set[str]:
        """
        Get set of ADR node IDs that exist in kg_nodes.

        Returns:
            IDs of all rows in kg_nodes whose node_type is 'adr'. An empty set
            is returned when the query fails; the failure is logged so that an
            unavailable table is distinguishable from "no ADR nodes".
        """
        conn = self.connect_target()
        try:
            cursor = conn.execute("""
                SELECT id FROM kg_nodes WHERE node_type = 'adr'
            """)
            return {row[0] for row in cursor}
        except Exception as exc:
            # Best-effort lookup: keep going with no known ADRs, but say so.
            logger.warning("Could not load ADR nodes from kg_nodes: %s", exc)
            return set()

    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract DEFINES edges by finding ADR references in decisions.

        Scans decision text and rationale for ADR-XXX patterns. References to
        ADRs that have no node in kg_nodes are skipped, and each
        (ADR, decision) pair is yielded at most once even when the ADR is
        mentioned several times in the same decision.

        Yields:
            Tuple of (from_node_id, to_node_id, properties)
        """
        source_conn = self.connect_source()

        # Get existing ADR nodes to validate references
        existing_adrs = self._get_existing_adrs()
        logger.info(f"Found {len(existing_adrs)} existing ADR nodes")

        try:
            cursor = source_conn.execute("""
                SELECT
                    id,
                    decision,
                    rationale,
                    decision_type,
                    confidence
                FROM decisions
                ORDER BY id
            """)

            for row in cursor:
                decision_id = row['id']
                decision_text = row['decision'] or ''
                rationale = row['rationale'] or ''

                # Combine text to search
                full_text = f"{decision_text} {rationale}"

                # Find ADR references
                adr_refs = self._extract_adr_references(full_text)
                if not adr_refs:
                    continue

                # dict.fromkeys de-duplicates while preserving first-seen
                # order, so a repeated mention does not emit the edge twice.
                for adr_id in dict.fromkeys(adr_refs):
                    adr_node_id = f"adr:{adr_id}"

                    # Skip if ADR node doesn't exist
                    if adr_node_id not in existing_adrs:
                        logger.debug(f"Skipping non-existent ADR: {adr_id}")
                        continue

                    from_node = adr_node_id
                    to_node = f"decision:{decision_id}"

                    # Determine governance level based on context
                    governance_level = self._infer_governance_level(
                        adr_id, decision_text, rationale
                    )

                    properties = {
                        'governance_level': governance_level,
                        'decision_type': row['decision_type'],
                        'confidence': row['confidence'],
                    }

                    # Clean None values
                    properties = {k: v for k, v in properties.items() if v is not None}

                    yield (from_node, to_node, properties)

        except Exception as e:
            # Extraction stops at the first failure; edges already yielded
            # stay with the consumer. logger.exception keeps the traceback.
            logger.exception(f"Error extracting DEFINES edges: {e}")
            return

    def _infer_governance_level(
        self,
        adr_id: str,
        decision_text: str,
        rationale: str
    ) -> str:
        """
        Infer governance level based on how ADR is referenced.

        The ADR is recognised in every spelling ADR_PATTERN accepts
        ("ADR-151", "ADR 151", "ADR151", any case, optional leading zeros),
        not only in its normalised form.

        Args:
            adr_id: Normalised ADR ID, e.g. "ADR-151".
            decision_text: Text of the decision.
            rationale: Rationale recorded for the decision.

        Returns:
            One of:
            - mandatory: Decision explicitly follows ADR
            - recommended: Decision references ADR as guidance
            - informational: ADR mentioned for context only
        """
        full_text = f"{decision_text} {rationale}".lower()
        adr_fragment = self._adr_spelling_regex(adr_id)

        # Mandatory language wins over recommended language.
        if self._has_phrase(full_text, self._MANDATORY_PHRASES, adr_fragment):
            return 'mandatory'

        if self._has_phrase(full_text, self._RECOMMENDED_PHRASES, adr_fragment):
            return 'recommended'

        return 'informational'

    @staticmethod
    def _adr_spelling_regex(adr_id: str) -> str:
        """Return a regex fragment matching any lower-case spelling of adr_id."""
        match = re.fullmatch(r'adr[- ]?(\d+)', adr_id.strip().lower())
        if match is None:
            # Not an ADR-number ID: fall back to matching it literally.
            return re.escape(adr_id.lower())

        number = match.group(1).lstrip('0') or '0'
        # (?!\d) stops "adr-15" from matching inside "adr-151".
        return rf'adr[- ]?0*{number}(?!\d)'

    @staticmethod
    def _has_phrase(text: str, templates: Tuple[str, ...], adr_fragment: str) -> bool:
        """Return True if text contains any template with "{adr}" filled in."""
        alternatives = [
            r'\s+'.join(
                adr_fragment if token == '{adr}' else re.escape(token)
                for token in template.split()
            )
            for template in templates
        ]
        # The leading \b keeps "per" from matching inside words like "paper".
        return re.search(r'\b(?:' + '|'.join(alternatives) + r')', text) is not None