Skip to main content

scripts-uses-edge-builder

#!/usr/bin/env python3
"""
CP-22: USES Edge Builder (ADR-151)

Creates USES edges from Component nodes to File nodes.

Edge: component:X -> file:Y
Source: Component path determines primary file
Properties: usage_type (source, config, test)

Links components to their source files.

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.5.9
"""

import json
import logging
from pathlib import Path
from typing import Any, Dict, Generator, Optional, Set, Tuple

from .base_edge_builder import BaseEdgeBuilder

# Module-level logger named after this module so log records can be filtered
# per module.
logger = logging.getLogger(__name__)

class UsesEdgeBuilder(BaseEdgeBuilder):
    """
    Build USES edges linking components to their source files.

    Each component (agent, skill, command, script) has associated files.
    This builder creates edges based on component metadata: it reads the
    ``file_path`` / ``path`` entry of each component node's JSON properties
    and links the component to the matching ``file:`` node in ``kg_nodes``.
    """

    # Substrings that mark a path as configuration in _infer_usage_type.
    _CONFIG_MARKERS = ('config', 'settings', '.yaml', '.yml', '.json')

    def __init__(
        self,
        target_db_path: Path,
        framework_root: Optional[Path] = None,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        validate_nodes: bool = True,
    ):
        """
        Initialize edge builder.

        Args:
            target_db_path: Path to org.db
            framework_root: Optional path to coditect-core root; used to
                resolve relative component paths against absolute file nodes
            dry_run: If True, don't write to database
            tenant_id: Optional tenant ID
            validate_nodes: If True, verify nodes exist
        """
        super().__init__(target_db_path, dry_run, tenant_id, validate_nodes)
        # Coerce to Path so a caller passing a plain string still supports
        # the `/` join in _find_file_node; empty/None means "no root".
        self.framework_root = Path(framework_root) if framework_root else None

    @property
    def edge_type(self) -> str:
        """Edge type label produced by this builder."""
        return "USES"

    def _get_existing_files(self) -> Set[str]:
        """
        Get set of file node IDs that exist in kg_nodes.

        Returns:
            IDs of all rows with ``node_type = 'file'``. On any query
            failure an empty set is returned (best-effort), and the failure
            is logged instead of being silently dropped.
        """
        conn = self.connect_target()
        try:
            cursor = conn.execute("""
                SELECT id FROM kg_nodes WHERE node_type = 'file'
            """)
            return {row[0] for row in cursor}
        except Exception as exc:
            # Deliberately best-effort: without file nodes no edges are
            # produced, but the reason must be visible in the logs.
            logger.warning("Could not load file nodes from kg_nodes: %s", exc)
            return set()

    def _infer_usage_type(self, file_path: str, component_type: str) -> str:
        """
        Infer usage type from file path.

        Types returned:
        - test: path contains the substring ``test`` (case-insensitive)
        - config: path contains ``config``, ``settings``, ``.yaml``,
          ``.yml`` or ``.json``
        - source: everything else (main component file)

        Args:
            file_path: Component file path as stored in node properties.
            component_type: Component subtype; accepted for interface
                compatibility but not used by the current heuristic.

        Returns:
            One of ``'test'``, ``'config'`` or ``'source'``.
        """
        # NOTE(review): plain substring checks, so names such as "latest"
        # classify as 'test'; kept as-is because existing edges depend on it.
        path_lower = file_path.lower()

        if 'test' in path_lower:
            return 'test'
        if any(marker in path_lower for marker in self._CONFIG_MARKERS):
            return 'config'

        return 'source'

    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract USES edges from component nodes.

        Reads component node properties to find associated file paths.
        Components are skipped when their properties are empty, are not
        valid JSON, are not a JSON object, carry no string file path, or
        reference a file with no matching file node.

        Yields:
            Tuple of (from_node_id, to_node_id, properties), where
            properties holds ``usage_type`` and ``file_path``.
        """
        conn = self.connect_target()

        # Get existing file nodes for validation
        existing_files = self._get_existing_files()
        logger.info("Found %d existing file nodes", len(existing_files))

        try:
            # Query component nodes with their properties.
            # Rows are read by column name below, which assumes the
            # connection uses a name-addressable row factory
            # (e.g. sqlite3.Row) -- TODO confirm in BaseEdgeBuilder.
            cursor = conn.execute("""
                SELECT id, node_type, subtype, name, properties
                FROM kg_nodes
                WHERE node_type = 'component'
            """)

            for row in cursor:
                component_id = row['id']
                subtype = row['subtype']  # agent, skill, command, etc.
                properties_json = row['properties']

                if not properties_json:
                    continue

                try:
                    properties = json.loads(properties_json)
                except json.JSONDecodeError:
                    continue

                # Valid JSON that is not an object (list, string, number)
                # has no .get(); skip the row instead of aborting the run.
                if not isinstance(properties, dict):
                    continue

                # Extract file path from properties
                file_path = properties.get('file_path') or properties.get('path')
                if not file_path or not isinstance(file_path, str):
                    continue

                # Find matching file node
                file_node_id = self._find_file_node(file_path, existing_files)
                if not file_node_id:
                    continue

                usage_type = self._infer_usage_type(file_path, subtype or 'unknown')

                yield (
                    component_id,
                    file_node_id,
                    {
                        'usage_type': usage_type,
                        'file_path': file_path,
                    },
                )

        except Exception as e:
            # Top-level boundary: stop yielding but keep the traceback.
            logger.exception("Error extracting USES edges: %s", e)

    def _find_file_node(self, file_path: str, existing_files: Set[str]) -> Optional[str]:
        """
        Find the kg_node ID for a file path.

        Handles both absolute and relative paths by trying, in order:
        1. the exact ID ``file:<file_path>``;
        2. ``file:<framework_root>/<file_path>`` for non-absolute paths
           when a framework root is configured;
        3. a suffix match against every known file node ID.

        Args:
            file_path: Path taken from the component's properties.
            existing_files: IDs of all file nodes currently in kg_nodes.

        Returns:
            The matching node ID, or None when nothing matches. When
            several nodes share the suffix, the shortest ID (ties broken
            lexicographically) is returned so results are stable across
            runs.
        """
        # Try direct match
        direct_id = f"file:{file_path}"
        if direct_id in existing_files:
            return direct_id

        # Try with framework root prefix
        if self.framework_root and not file_path.startswith('/'):
            rooted_id = f"file:{Path(self.framework_root) / file_path}"
            if rooted_id in existing_files:
                return rooted_id

        # Try suffix matching. Remove only literal "./" prefixes and leading
        # slashes: str.lstrip('./') strips a character *set* and would also
        # eat the dot of ".github/..." or collapse "../x" to "x".
        normalized = file_path
        while normalized.startswith('./'):
            normalized = normalized[2:].lstrip('/')
        normalized = normalized.lstrip('/')
        if not normalized:
            # An empty suffix would match any ID ending in "/" or ":".
            return None

        suffixes = (f"/{normalized}", f":{normalized}")
        candidates = [node_id for node_id in existing_files if node_id.endswith(suffixes)]
        # Set iteration order is not stable between runs; pick deterministically.
        return min(candidates, key=lambda node_id: (len(node_id), node_id), default=None)