scripts-references-edge-builder
#!/usr/bin/env python3
"""
CP-21: REFERENCES Edge Builder (ADR-151)

Creates REFERENCES edges from Decision nodes to File or ADR nodes.

Edge: decision:X -> file:Y OR decision:X -> adr:Y
Source: Parse file paths and ADR-XXX patterns in decision text
Properties: reference_type (file_path, adr_number), context

Links decisions to the files and documents they reference.

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.5.7
"""
import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Set, Tuple

from .base_edge_builder import SQLiteSourceEdgeBuilder
# Module-level logger named after this module, so records emitted by the
# builder can be filtered and attributed by logger name.
logger = logging.getLogger(__name__)
class ReferencesEdgeBuilder(SQLiteSourceEdgeBuilder):
    """
    Build REFERENCES edges linking decisions to files and ADRs they mention.

    Parses decision text for:
    - File paths (e.g., /path/to/file.py, scripts/foo.py)
    - ADR references (ADR-XXX)

    An edge is only emitted when the referenced file/ADR already exists as a
    node in the target ``kg_nodes`` table.
    """

    # Pattern for file paths
    FILE_PATH_PATTERN = re.compile(
        r'(?:^|[\s\(`\'"])'                     # Start or whitespace/delimiter
        r'((?:\.{1,2}/|/|~/)[\w\-./]+\.\w+|'    # Unix paths with extension
        r'[\w\-]+/[\w\-./]+\.\w+)',             # Relative paths like scripts/foo.py
        re.MULTILINE
    )

    # Pattern for ADR references. The negative lookahead rejects longer digit
    # runs, so "ADR-1234" is not mis-read as a reference to "ADR-123".
    ADR_PATTERN = re.compile(r'ADR[- ]?(\d{3})(?!\d)', re.IGNORECASE)

    # Substrings that mark a match as a likely false positive (URLs, domains).
    # NOTE(review): 'test.' also rejects paths such as "tests/foo_test.py";
    # kept as-is to preserve existing behaviour - confirm this is intended.
    _SKIP_MARKERS = (
        'http://', 'https://', '.com/', '.org/',
        'example.', 'test.',
    )

    # Only paths ending in one of these extensions are treated as file references.
    _ALLOWED_EXTENSIONS = frozenset({
        '.py', '.md', '.yaml', '.yml', '.json', '.sh',
        '.js', '.ts', '.tsx', '.jsx', '.sql', '.html', '.css',
    })

    @property
    def edge_type(self) -> str:
        """Return the edge type produced by this builder."""
        return "REFERENCES"

    def _extract_file_references(self, text: str) -> List[str]:
        """
        Extract file paths from text.

        Args:
            text: Free text to scan (may be empty or None).

        Returns:
            Unique file paths in order of first appearance. Matches that
            contain a skip marker or lack an allowed extension are dropped.
        """
        if not text:
            return []

        # A dict is used as an ordered set so the result is deterministic
        # (a plain set would reorder paths between runs).
        unique_paths: Dict[str, None] = {}
        for match in self.FILE_PATH_PATTERN.findall(text):
            path = match.strip()
            lowered = path.lower()

            # Skip common false positives
            if any(marker in lowered for marker in self._SKIP_MARKERS):
                continue

            # Must have a reasonable file extension
            if Path(path).suffix.lower() in self._ALLOWED_EXTENSIONS:
                unique_paths[path] = None

        return list(unique_paths)

    def _extract_adr_references(self, text: str) -> List[str]:
        """
        Extract ADR identifiers from text.

        Args:
            text: Free text to scan (may be empty or None).

        Returns:
            Unique identifiers of the form ``ADR-NNN`` in order of first
            appearance, so a decision mentioning the same ADR several times
            yields a single reference.
        """
        if not text:
            return []

        numbers = self.ADR_PATTERN.findall(text)
        return list(dict.fromkeys(f"ADR-{number}" for number in numbers))

    def _get_node_ids(self, node_type: str) -> Set[str]:
        """
        Load the IDs of all ``kg_nodes`` rows with the given ``node_type``.

        Best-effort: if the query fails, the failure is logged and an empty
        set is returned, which means no edges of that kind will be emitted.
        """
        conn = self.connect_target()
        try:
            cursor = conn.execute(
                "SELECT id FROM kg_nodes WHERE node_type = ?",
                (node_type,),
            )
            return {row[0] for row in cursor}
        except Exception:
            # Previously swallowed silently; surface it so an empty result
            # caused by a broken target DB is distinguishable from "no nodes".
            logger.warning(
                "Could not load '%s' node IDs from kg_nodes; "
                "treating as empty",
                node_type,
                exc_info=True,
            )
            return set()

    def _get_existing_files(self) -> Set[str]:
        """Get set of file node IDs that exist in kg_nodes."""
        return self._get_node_ids('file')

    def _get_existing_adrs(self) -> Set[str]:
        """Get set of ADR node IDs that exist in kg_nodes."""
        return self._get_node_ids('adr')

    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract REFERENCES edges from decision text.

        Scans the ``decision`` and ``rationale`` columns of every row in the
        source ``decisions`` table for file paths and ADR references, and
        emits an edge for each reference that resolves to an existing node.

        Yields:
            Tuple of (from_node_id, to_node_id, properties)

        Any error raised while reading or processing decisions is logged and
        ends the generator early; it is not propagated to the caller.
        """
        source_conn = self.connect_source()

        # Get existing nodes to validate references
        existing_files = self._get_existing_files()
        existing_adrs = self._get_existing_adrs()

        logger.info(
            "Found %d file nodes, %d ADR nodes",
            len(existing_files),
            len(existing_adrs),
        )

        try:
            cursor = source_conn.execute("""
                SELECT
                    id,
                    decision,
                    rationale,
                    project_path
                FROM decisions
                ORDER BY id
            """)

            for row in cursor:
                decision_text = row['decision'] or ''
                rationale = row['rationale'] or ''
                full_text = f"{decision_text} {rationale}"
                from_node = f"decision:{row['id']}"

                # Extract and create file references
                for file_path in self._extract_file_references(full_text):
                    # Try to find matching file node
                    # File nodes use format: file:{absolute_path}
                    file_node_id = self._find_file_node(file_path, existing_files)
                    if file_node_id:
                        yield (
                            from_node,
                            file_node_id,
                            {
                                'reference_type': 'file_path',
                                'referenced_path': file_path,
                            },
                        )

                # Extract and create ADR references
                for adr_id in self._extract_adr_references(full_text):
                    adr_node_id = f"adr:{adr_id}"
                    if adr_node_id in existing_adrs:
                        yield (
                            from_node,
                            adr_node_id,
                            {
                                'reference_type': 'adr_number',
                                'referenced_adr': adr_id,
                            },
                        )

        except Exception as e:
            # Deliberately best-effort: log (with traceback) and stop.
            logger.exception("Error extracting REFERENCES edges: %s", e)
            return

    @staticmethod
    def _strip_relative_prefix(file_path: str) -> str:
        """
        Remove leading ``./``, ``../`` and ``/`` segments from a path.

        Unlike ``str.lstrip('./')`` this removes whole prefixes, not a
        character set, so the leading dot of a hidden directory or file
        (e.g. ``./.github/ci.yml`` -> ``.github/ci.yml``) is preserved.
        """
        remainder = file_path
        while True:
            if remainder.startswith('./'):
                remainder = remainder[2:]
            elif remainder.startswith('../'):
                remainder = remainder[3:]
            elif remainder.startswith('/'):
                remainder = remainder[1:]
            else:
                return remainder

    def _find_file_node(self, file_path: str, existing_files: Set[str]) -> Optional[str]:
        """
        Find the kg_node ID for a file path reference.

        Args:
            file_path: Path as it appeared in the decision text.
            existing_files: IDs of all known file nodes.

        Returns:
            The exact ``file:{file_path}`` node ID if present; otherwise a
            node whose ID ends with the path (after dropping relative/root
            prefixes); otherwise None.
        """
        # Direct match
        full_node_id = f"file:{file_path}"
        if full_node_id in existing_files:
            return full_node_id

        # Try suffix matching for relative paths
        normalized = self._strip_relative_prefix(file_path)
        if not normalized:
            return None

        suffixes = (f"/{normalized}", f":{normalized}")
        candidates = [
            node_id for node_id in existing_files if node_id.endswith(suffixes)
        ]
        if not candidates:
            return None

        # Several nodes can share the same suffix; pick the shortest ID (ties
        # broken alphabetically) so the result does not depend on set order.
        return min(candidates, key=lambda node_id: (len(node_id), node_id))