#!/usr/bin/env python3
# scripts-governed-by-edge-builder
"""
CP-25: GOVERNED_BY Edge Builder (ADR-151)

Creates GOVERNED_BY edges from components/tracks to ADR nodes.

Edge: component:X -> adr:Y OR track:X -> adr:Y
Source: ADR frontmatter (governs:) or track governance.adrs:
Properties: governance_type (mandatory, recommended, referenced)

Links entities to the ADRs that govern their behavior.

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.5.10
"""

import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Set, Tuple

import yaml

from .base_edge_builder import BaseEdgeBuilder

# Module-level logger named after this module (the dunder name, not a bare
# ``name`` variable, which would raise NameError at import time).
logger = logging.getLogger(__name__)
class GovernedByEdgeBuilder(BaseEdgeBuilder):
    """
    Build GOVERNED_BY edges linking entities to governing ADRs.

    Sources:
    1. ADR frontmatter: governs: list of components/tracks
    2. Track files: governance.adrs: and related_adrs: lists of ADRs
    3. Component node properties: governed_by: and related: ADR references
    4. Track file content: ADR-NNN mentions in the markdown body

    Every yielded edge carries a ``governance_type`` property
    (``mandatory``, ``recommended`` or ``referenced``) and a ``source``
    property naming which of the sources above produced it.
    """

    # "ADR-151", "adr 7", "ADR42": any digit count is accepted and later
    # zero-padded to three digits, matching how ADR filenames are handled.
    _ADR_REF_PATTERN = re.compile(r'ADR[- ]?(\d+)', re.IGNORECASE)
    # Leading '---' fenced YAML block at the very start of a markdown file.
    _FRONTMATTER_PATTERN = re.compile(r'^---\s*\n(.*?)\n---', re.DOTALL)
    # Track letter(s) at the start of a "TRACK-X..." file stem.
    _TRACK_FILE_PATTERN = re.compile(r'TRACK-([A-Z]+)', re.IGNORECASE)
    # Component type prefixes tried when resolving a bare component name.
    _COMPONENT_TYPES = ('agent', 'skill', 'command', 'hook', 'script')

    def __init__(
        self,
        target_db_path: Path,
        framework_root: Path,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        validate_nodes: bool = True,
    ):
        """
        Initialize edge builder.

        Args:
            target_db_path: Path to org.db
            framework_root: Path to coditect-core root
            dry_run: If True, don't write to database
            tenant_id: Optional tenant ID
            validate_nodes: If True, verify nodes exist
        """
        super().__init__(target_db_path, dry_run, tenant_id, validate_nodes)
        self.framework_root = framework_root

    @property
    def edge_type(self) -> str:
        """Edge type label produced by this builder."""
        return "GOVERNED_BY"

    # ------------------------------------------------------------------
    # Small parsing / lookup helpers
    # ------------------------------------------------------------------

    def _parse_frontmatter(self, content: str) -> Optional[Dict]:
        """
        Extract YAML frontmatter from markdown content.

        Returns:
            The parsed mapping, or None when there is no frontmatter, the
            YAML is invalid, or the YAML document is not a mapping.
        """
        # A UTF-8 BOM would otherwise prevent the '^---' anchor from matching.
        match = self._FRONTMATTER_PATTERN.match(content.lstrip('\ufeff'))
        if not match:
            return None
        try:
            data = yaml.safe_load(match.group(1))
        except yaml.YAMLError:
            return None
        # Callers use .get(); a list or scalar document is not usable.
        return data if isinstance(data, dict) else None

    def _read_text(self, path: Path) -> Optional[str]:
        """Read a UTF-8 text file; log and return None if it is unreadable."""
        try:
            return path.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError) as e:
            logger.warning("Skipping unreadable file %s: %s", path, e)
            return None

    def _get_existing_nodes(self, node_type: str) -> Set[str]:
        """
        Get the set of kg_nodes IDs having the given node_type.

        Best-effort: a failing query is logged and yields an empty set, so
        the extractors simply find nothing to link against.
        """
        conn = self.connect_target()
        try:
            cursor = conn.execute(
                "SELECT id FROM kg_nodes WHERE node_type = ?", (node_type,)
            )
            return {row[0] for row in cursor}
        except Exception as e:
            logger.warning("Could not load '%s' nodes from kg_nodes: %s", node_type, e)
            return set()

    def _get_existing_adrs(self) -> Set[str]:
        """Get set of ADR node IDs that exist in kg_nodes."""
        return self._get_existing_nodes('adr')

    def _get_existing_components(self) -> Set[str]:
        """Get set of component node IDs that exist in kg_nodes."""
        return self._get_existing_nodes('component')

    def _get_existing_tracks(self) -> Set[str]:
        """Get set of track node IDs that exist in kg_nodes."""
        return self._get_existing_nodes('track')

    def _extract_adr_refs(self, text: str) -> List[str]:
        """
        Extract ADR references from text.

        Returns:
            Canonical ``ADR-NNN`` IDs (number zero-padded to at least three
            digits) in order of appearance; duplicates are kept.
        """
        return [
            f"ADR-{number.zfill(3)}"
            for number in self._ADR_REF_PATTERN.findall(str(text))
        ]

    def _normalize_adr_refs(self, value: Any, allow_bare_numbers: bool = True) -> Set[str]:
        """
        Turn a frontmatter/property value into a set of canonical ADR IDs.

        Args:
            value: None, a scalar (str/int) or a list/tuple/set of scalars.
            allow_bare_numbers: If True, an item consisting only of digits
                (e.g. ``151`` or ``"7"``) is treated as an ADR number.

        Returns:
            Set of ``ADR-NNN`` strings; empty when nothing is recognised.
        """
        if value is None:
            return set()
        # A bare string must not be iterated character by character.
        items = value if isinstance(value, (list, tuple, set)) else [value]
        refs: Set[str] = set()
        for item in items:
            text = str(item).strip()
            found = self._extract_adr_refs(text)
            if found:
                refs.update(found)
            elif allow_bare_numbers and text.isdigit():
                refs.add(f"ADR-{text.zfill(3)}")
        return refs

    def _track_node_from_file(self, track_file: Path) -> Optional[str]:
        """Derive the ``track:<LETTERS>`` node ID from a TRACK-*.md filename."""
        match = self._TRACK_FILE_PATTERN.match(track_file.stem)
        if not match:
            return None
        return f"track:{match.group(1).upper()}"

    # ------------------------------------------------------------------
    # Edge extraction
    # ------------------------------------------------------------------

    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract GOVERNED_BY edges from multiple sources.

        The same (from, to) pair may be yielded by more than one source with
        different properties; no de-duplication is done here.

        Yields:
            Tuple of (from_node_id, to_node_id, properties)
        """
        existing_adrs = self._get_existing_adrs()
        existing_components = self._get_existing_components()
        existing_tracks = self._get_existing_tracks()
        logger.info(
            "Found %d ADRs, %d components, %d tracks",
            len(existing_adrs), len(existing_components), len(existing_tracks),
        )

        # 1. Extract from ADR 'governs' frontmatter
        yield from self._extract_from_adr_governs(existing_adrs, existing_components, existing_tracks)
        # 2. Extract from track governance sections
        yield from self._extract_from_track_governance(existing_adrs, existing_tracks)
        # 3. Extract from component 'governed_by' frontmatter
        yield from self._extract_from_component_governed_by(existing_adrs, existing_components)
        # 4. J.25.4.2: Extract from ADR references in track file content
        yield from self._extract_from_track_content_refs(existing_adrs, existing_tracks)

    def _extract_from_adr_governs(
        self,
        existing_adrs: Set[str],
        existing_components: Set[str],
        existing_tracks: Set[str],
    ) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract governance edges from ADR 'governs:' frontmatter.

        Scans ``internal/architecture/adrs/ADR-*.md`` under the framework
        root. ADRs without a node in ``existing_adrs`` and governed entities
        that cannot be resolved to an existing node are skipped.
        """
        adrs_dir = self.framework_root / "internal" / "architecture" / "adrs"
        if not adrs_dir.exists():
            return

        for adr_file in sorted(adrs_dir.glob("ADR-*.md")):
            # Get ADR ID from filename
            adr_match = re.match(r'ADR-(\d+)', adr_file.stem)
            if not adr_match:
                continue
            adr_node = f"adr:ADR-{adr_match.group(1).zfill(3)}"
            if adr_node not in existing_adrs:
                continue

            content = self._read_text(adr_file)
            if content is None:
                continue
            frontmatter = self._parse_frontmatter(content)
            if not frontmatter:
                continue

            # 'governs:' may be missing, empty (None), a scalar or a list.
            governs = frontmatter.get('governs')
            if governs is None:
                continue
            if not isinstance(governs, (list, tuple, set)):
                governs = [governs]

            for governed_entity in governs:
                # Try to match to component or track
                from_node = self._resolve_governed_entity(
                    governed_entity, existing_components, existing_tracks
                )
                if from_node:
                    yield (
                        from_node,
                        adr_node,
                        {
                            'governance_type': 'mandatory',
                            'source': 'adr_governs',
                        },
                    )

    def _extract_from_track_governance(
        self,
        existing_adrs: Set[str],
        existing_tracks: Set[str],
    ) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract governance edges from track files.

        Reads the ``governance.adrs`` and ``related_adrs`` frontmatter fields
        of ``internal/project/plans/tracks/TRACK-*.md``. Values may be a
        string, a number or a list of either; they are normalised to
        ``ADR-NNN`` before being checked against ``existing_adrs``.
        """
        tracks_dir = self.framework_root / "internal" / "project" / "plans" / "tracks"
        if not tracks_dir.exists():
            return

        for track_file in sorted(tracks_dir.glob("TRACK-*.md")):
            # Get track letter from filename
            track_node = self._track_node_from_file(track_file)
            if track_node is None or track_node not in existing_tracks:
                continue

            content = self._read_text(track_file)
            if content is None:
                continue
            frontmatter = self._parse_frontmatter(content)
            if not frontmatter:
                continue

            # Check for governance ADRs
            governance = frontmatter.get('governance')
            adrs = governance.get('adrs') if isinstance(governance, dict) else None
            # Also check 'related_adrs' field
            all_adrs = (
                self._normalize_adr_refs(adrs)
                | self._normalize_adr_refs(frontmatter.get('related_adrs'))
            )

            for adr_id in sorted(all_adrs):
                adr_node = f"adr:{adr_id}"
                if adr_node in existing_adrs:
                    yield (
                        track_node,
                        adr_node,
                        {
                            'governance_type': 'recommended',
                            'source': 'track_governance',
                        },
                    )

    def _extract_from_component_governed_by(
        self,
        existing_adrs: Set[str],
        existing_components: Set[str],
    ) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract governance edges from component 'governed_by' frontmatter.

        Component frontmatter is read from the JSON ``properties`` column of
        kg_nodes rather than from files. ``existing_components`` is not
        consulted (the IDs come straight from kg_nodes); the parameter is
        kept so all extractors share a similar signature.
        """
        # Query kg_nodes for component properties containing ADR references
        conn = self.connect_target()
        try:
            rows = conn.execute("""
                SELECT id, properties
                FROM kg_nodes
                WHERE node_type = 'component'
                AND properties LIKE '%adr%'
            """).fetchall()
        except Exception as e:
            logger.error(f"Error extracting component governance: {e}")
            return

        for row in rows:
            # Positional access works for plain tuples and for mapping-style
            # rows alike, consistent with the other kg_nodes queries.
            component_id, properties_json = row[0], row[1]
            if not properties_json:
                continue
            try:
                properties = json.loads(properties_json)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; skip just this row.
                continue
            if not isinstance(properties, dict):
                continue

            # Check for governed_by field
            governed_by = self._normalize_adr_refs(properties.get('governed_by'))
            # Also check 'related' field for ADR references. 'related' is a
            # generic field, so only explicit "ADR-NNN" mentions count there.
            related = self._normalize_adr_refs(
                properties.get('related'), allow_bare_numbers=False
            )

            for adr_id in sorted(governed_by | related):
                adr_node = f"adr:{adr_id}"
                if adr_node in existing_adrs:
                    yield (
                        component_id,
                        adr_node,
                        {
                            'governance_type': 'recommended',
                            'source': 'component_properties',
                        },
                    )

    def _extract_from_track_content_refs(
        self,
        existing_adrs: Set[str],
        existing_tracks: Set[str],
    ) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        J.25.4.2: Extract GOVERNED_BY edges from ADR references in track content.

        Scans track file markdown content for ADR-NNN references and creates
        governance edges. This provides coverage when frontmatter fields are
        not yet populated.
        """
        plans_dir = self.framework_root / "internal" / "project" / "plans"

        # Check both pilot-tracks and tracks directories
        for subdir in ("pilot-tracks", "tracks"):
            track_dir = plans_dir / subdir
            if not track_dir.exists():
                continue

            for track_file in sorted(track_dir.glob("TRACK-*.md")):
                # Get track letter from filename
                track_node = self._track_node_from_file(track_file)
                if track_node is None or track_node not in existing_tracks:
                    continue

                content = self._read_text(track_file)
                if content is None:
                    continue

                # Find all ADR references in content
                for adr_id in sorted(set(self._extract_adr_refs(content))):
                    adr_node = f"adr:{adr_id}"
                    if adr_node in existing_adrs:
                        yield (
                            track_node,
                            adr_node,
                            {
                                'governance_type': 'referenced',
                                'source': 'track_content',
                            },
                        )

    def _resolve_governed_entity(
        self,
        entity_ref: str,
        existing_components: Set[str],
        existing_tracks: Set[str],
    ) -> Optional[str]:
        """
        Resolve an entity reference to a kg_node ID.

        Resolution order:
        1. The reference already is an existing track/component node ID.
        2. A "track-X" / "track X" / "trackX" style reference.
        3. A component path ("agent/foo") or bare component name ("foo"),
           tried against each known component type.

        Returns:
            The matching node ID, or None if nothing matches.
        """
        ref = str(entity_ref).strip()
        if not ref:
            return None

        # Already a full node ID such as "track:J" or "component:agent/foo".
        if ref in existing_tracks or ref in existing_components:
            return ref

        # Check if it's a track reference
        track_match = re.match(r'track[- ]?([a-z]+)', ref.lower())
        if track_match:
            track_node = f"track:{track_match.group(1).upper()}"
            if track_node in existing_tracks:
                return track_node

        # Check if it's a component reference
        candidates = [f"component:{ref}"]
        candidates.extend(
            f"component:{comp_type}/{ref}" for comp_type in self._COMPONENT_TYPES
        )
        for comp_node in candidates:
            if comp_node in existing_components:
                return comp_node

        return None