scripts-invokes-edge-builder
#!/usr/bin/env python3
"""
CP-17: INVOKES Edge Builder (ADR-151)

Creates INVOKES edges from Session nodes to Component nodes.

Edge: session:X -> component:Y
Source: sessions.db message_component_invocations table
Properties: invocation_count, last_invoked, component_type

Captures which components (agents, skills, commands, hooks) were used in each
session, enabling session-to-component lineage tracking.

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.5.1
"""
import logging from collections import defaultdict from pathlib import Path from typing import Any, Dict, Generator, Optional, Tuple
from .base_edge_builder import SQLiteSourceEdgeBuilder
# Module-level logger, named after this module via ``__name__``.
# (The previous ``getLogger(name)`` referenced an undefined name and raised
# NameError as soon as the module was imported.)
logger = logging.getLogger(__name__)
class InvokesEdgeBuilder(SQLiteSourceEdgeBuilder):
    """
    Build INVOKES edges from the message_component_invocations table.

    Aggregates multiple invocations of the same component within a session
    into a single edge with count and timing information.

    Node ID formats produced by this builder:
        from node: ``session:{session_id}``
        to node:   ``component:{component_type}/{component_name}``

    NOTE(review): both extraction methods read rows by column name
    (``row['session_id']``), so they assume ``connect_source()`` returns a
    connection whose rows support name-based lookup - confirm in the base
    class. The connection is not closed here; presumably its lifetime is
    managed by the base class - confirm.
    """

    @property
    def edge_type(self) -> str:
        """Return the edge type label for edges built by this class."""
        return "INVOKES"

    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract INVOKES edges from message_component_invocations.

        Aggregation is done in SQL, one row per session-component pair.
        Rows whose session_id or component_name is NULL are skipped. A NULL
        or empty component_type is normalised to 'unknown' *before* grouping,
        so both cases collapse into one edge instead of yielding the same
        (from, to) pair twice.

        Any exception raised while querying or iterating is logged (with
        traceback) and ends the generator; edges already yielded stay
        yielded.

        Yields:
            Tuple of (from_node_id, to_node_id, properties). ``properties``
            holds ``invocation_count`` and ``component_type``, plus
            ``last_invoked`` (the greatest timestamp in the group) when at
            least one invocation in the group has a timestamp.
        """
        source_conn = self.connect_source()

        try:
            # MAX(timestamp) replaces the former bare ``timestamp`` column:
            # a non-aggregated column in a GROUP BY query takes its value
            # from an arbitrary row of the group, and it was never used.
            cursor = source_conn.execute("""
                SELECT
                    session_id,
                    component_name,
                    COALESCE(NULLIF(component_type, ''), 'unknown') AS component_type,
                    MAX(timestamp) AS last_invoked,
                    COUNT(*) AS invocation_count
                FROM message_component_invocations
                WHERE session_id IS NOT NULL
                  AND component_name IS NOT NULL
                GROUP BY
                    session_id,
                    component_name,
                    COALESCE(NULLIF(component_type, ''), 'unknown')
                ORDER BY session_id
            """)

            for row in cursor:
                component_type = row['component_type']

                from_node = f"session:{row['session_id']}"
                # Component node IDs use format: component:{type}/{name}
                to_node = f"component:{component_type}/{row['component_name']}"

                properties: Dict[str, Any] = {
                    'invocation_count': row['invocation_count'],
                    'component_type': component_type,
                }
                # Omit the key entirely when no timestamp was recorded,
                # matching extract_edges_with_timestamps (which drops None).
                last_invoked = row['last_invoked']
                if last_invoked:
                    properties['last_invoked'] = last_invoked

                yield (from_node, to_node, properties)

        except Exception as e:
            # Best-effort extraction: log and stop rather than propagate.
            logger.exception("Error extracting INVOKES edges: %s", e)
            return

    def extract_edges_with_timestamps(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Alternative extraction that includes timestamp tracking.

        Reads individual invocation rows and aggregates them in Python so
        that both ``first_invoked`` and ``last_invoked`` are captured for
        each session-component pair. Rows whose session_id or component_name
        is NULL are skipped; a falsy component_type becomes 'unknown'.

        If any exception occurs while querying or aggregating, it is logged
        (with traceback) and the generator ends without yielding anything,
        because edges are only emitted after the full scan succeeds.

        Yields:
            Tuple of (from_node_id, to_node_id, properties). ``properties``
            holds ``invocation_count`` and ``component_type``, plus
            ``first_invoked`` / ``last_invoked`` when at least one row for
            the pair had a truthy timestamp.
        """
        source_conn = self.connect_source()

        # Per-edge accumulator keyed by (from_node, to_node).
        edge_aggregation: Dict[Tuple[str, str], Dict[str, Any]] = defaultdict(lambda: {
            'invocation_count': 0,
            'first_invoked': None,
            'last_invoked': None,
            'component_type': None,
        })

        try:
            cursor = source_conn.execute("""
                SELECT
                    session_id,
                    component_name,
                    component_type,
                    timestamp
                FROM message_component_invocations
                WHERE session_id IS NOT NULL
                  AND component_name IS NOT NULL
                ORDER BY session_id, timestamp
            """)

            for row in cursor:
                component_type = row['component_type'] or 'unknown'
                timestamp = row['timestamp']

                from_node = f"session:{row['session_id']}"
                to_node = f"component:{component_type}/{row['component_name']}"

                agg = edge_aggregation[(from_node, to_node)]
                agg['invocation_count'] += 1
                agg['component_type'] = component_type

                # Timestamps are compared with plain </>; this presumably
                # relies on them being stored in a sortable form such as
                # ISO-8601 text - confirm against the table schema.
                if timestamp:
                    if agg['first_invoked'] is None or timestamp < agg['first_invoked']:
                        agg['first_invoked'] = timestamp
                    if agg['last_invoked'] is None or timestamp > agg['last_invoked']:
                        agg['last_invoked'] = timestamp

        except Exception as e:
            # Best-effort extraction: log and stop rather than propagate.
            logger.exception("Error extracting INVOKES edges: %s", e)
            return

        # Yield aggregated edges, dropping properties that were never set.
        for (from_node, to_node), properties in edge_aggregation.items():
            cleaned = {k: v for k, v in properties.items() if v is not None}
            yield (from_node, to_node, cleaned)