# scripts-calls-edge-builder
#!/usr/bin/env python3
"""
CP-16: CALLS Edge Builder (ADR-151)

Creates CALLS edges between function nodes based on the call_graph_edges table.

Edge: function:X -> function:Y
Source: sessions.db call_graph_edges table
Properties: call_count, call_contexts (file:line pairs)

Note: The call_graph_edges table uses caller_id (function node_id) and
callee_name (function name string), so we need to resolve callee_name
to the target function node.

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.5.8
"""
import logging
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, Generator, Optional, Tuple

from .base_edge_builder import SQLiteSourceEdgeBuilder

# Module-level logger, named after this module so its output can be
# filtered or configured independently of other builders.
logger = logging.getLogger(__name__)
class CallsEdgeBuilder(SQLiteSourceEdgeBuilder):
    """
    Build CALLS edges from call_graph_edges table.

    Maps function-to-function call relationships from the existing
    code analysis into the knowledge graph. Every distinct
    (caller, callee) pair becomes a single edge whose properties carry
    the number of calls seen and a capped list of call sites.
    """

    # Upper bound on the call sites stored per edge. ``call_count`` keeps
    # counting past this limit; only the ``call_contexts`` list is capped.
    MAX_CALL_CONTEXTS = 10

    @property
    def edge_type(self) -> str:
        """Edge type label emitted by this builder."""
        return "CALLS"

    def __init__(
        self,
        source_db_path: Path,
        target_db_path: Path,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        validate_nodes: bool = True,
    ):
        """
        Initialise the builder and its (initially empty) lookup caches.

        All arguments are forwarded unchanged, positionally, to
        ``SQLiteSourceEdgeBuilder.__init__``; their semantics are defined
        there.
        """
        super().__init__(source_db_path, target_db_path, dry_run, tenant_id, validate_nodes)
        # Both caches stay None until _build_function_lookups() has run once.
        # function name -> list of {'kg_node_id': ..., 'file_path': ...}
        self._function_name_to_nodes: Optional[Dict[str, list]] = None
        # call_graph_functions.node_id -> kg node id
        self._caller_id_to_kg_node: Optional[Dict[str, str]] = None

    def _build_function_lookups(self) -> None:
        """
        Build lookup tables for resolving function names and IDs to kg_nodes.

        The call_graph uses:
        - caller_id: The node_id in call_graph_functions (e.g., "7eec3b8a1815bc01")
        - callee_name: Just the function name (e.g., "hasattr")

        We need to map these to kg_nodes IDs which use format:
        - function:{file_path}:{function_name}

        The tables are built at most once per instance; later calls return
        immediately. If reading ``call_graph_functions`` fails, the error is
        logged with its traceback and both tables are left empty (and are
        not rebuilt on later calls).
        """
        if self._function_name_to_nodes is not None:
            return

        # Only the source database is needed here.
        source_conn = self.connect_source()

        caller_id_to_kg_node: Dict[str, str] = {}
        function_name_to_nodes: Dict[str, list] = defaultdict(list)

        try:
            # Get all functions from call_graph.
            # NOTE(review): rows are read by column name, so this assumes
            # connect_source() returns name-addressable rows (e.g.
            # sqlite3.Row) - confirm against the base class.
            cursor = source_conn.execute("""
                SELECT node_id, name, file_path
                FROM call_graph_functions
            """)

            for row in cursor:
                func_name = row['name']
                file_path = row['file_path']

                # Generate the kg_node_id format used by function_extractor
                kg_node_id = f"function:{file_path}:{func_name}"

                caller_id_to_kg_node[row['node_id']] = kg_node_id
                function_name_to_nodes[func_name].append({
                    'kg_node_id': kg_node_id,
                    'file_path': file_path,
                })
        except Exception as e:
            # Best-effort: a failed lookup build yields no edges instead of
            # aborting the run. Partially filled tables are discarded.
            logger.exception("Error building function lookups: %s", e)
            caller_id_to_kg_node = {}
            function_name_to_nodes = defaultdict(list)
        else:
            logger.info("Built lookup for %d caller IDs", len(caller_id_to_kg_node))
            logger.info("Built lookup for %d function names", len(function_name_to_nodes))

        self._caller_id_to_kg_node = caller_id_to_kg_node
        self._function_name_to_nodes = function_name_to_nodes

    def _resolve_callee(self, callee_name: str, call_file: str) -> Optional[str]:
        """
        Resolve callee function name to kg_node_id.

        Prioritizes functions in the same file, then any matching function.

        Args:
            callee_name: Bare function name as stored in call_graph_edges.
            call_file: File in which the call occurs; used to prefer a
                same-file definition when several functions share the name.

        Returns:
            The kg node id of the chosen candidate, or None if no match
            found (e.g., built-in functions).
        """
        # Make sure the tables exist even if this is called before
        # extract_edges(); this is a cheap no-op once they are built.
        self._build_function_lookups()

        # .get() avoids inserting empty entries into the defaultdict.
        candidates = self._function_name_to_nodes.get(callee_name)
        if not candidates:
            return None

        # Prefer same-file match
        for candidate in candidates:
            if candidate['file_path'] == call_file:
                return candidate['kg_node_id']

        # Fall back to first match
        return candidates[0]['kg_node_id']

    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract CALLS edges from call_graph_edges.

        Aggregates multiple calls between same function pair into single edge
        with call_count and call_contexts properties. At most
        ``MAX_CALL_CONTEXTS`` call sites are kept per edge.

        Rows are skipped when the caller id is unknown, when the callee name
        cannot be resolved, or when caller and callee resolve to the same
        node (self-calls).

        If reading call_graph_edges fails, the error is logged with its
        traceback and the generator ends without yielding any edge.

        Yields:
            Tuple of (from_node_id, to_node_id, properties)
        """
        self._build_function_lookups()

        source_conn = self.connect_source()
        max_contexts = self.MAX_CALL_CONTEXTS

        # Aggregate calls between same function pairs
        edge_aggregation: Dict[Tuple[str, str], Dict] = {}

        try:
            # The ORDER BY makes the retained call contexts and the yield
            # order deterministic for a given table.
            cursor = source_conn.execute("""
                SELECT
                    caller_id,
                    callee_name,
                    call_line,
                    call_file
                FROM call_graph_edges
                ORDER BY caller_id, callee_name
            """)

            for row in cursor:
                call_file = row['call_file']

                # Resolve caller to kg_node
                from_node = self._caller_id_to_kg_node.get(row['caller_id'])
                if not from_node:
                    continue

                # Resolve callee to kg_node
                to_node = self._resolve_callee(row['callee_name'], call_file)
                if not to_node:
                    # Skip built-in functions and unresolved callees
                    continue

                # Skip self-calls
                if from_node == to_node:
                    continue

                # Aggregate
                edge_key = (from_node, to_node)
                entry = edge_aggregation.get(edge_key)
                if entry is None:
                    entry = {
                        'call_count': 0,
                        'call_contexts': [],
                    }
                    edge_aggregation[edge_key] = entry

                entry['call_count'] += 1

                # Store only the first MAX_CALL_CONTEXTS call sites
                if len(entry['call_contexts']) < max_contexts:
                    entry['call_contexts'].append({
                        'file': call_file,
                        'line': row['call_line'],
                    })
        except Exception as e:
            # Best-effort: emit nothing, not a partial edge set.
            logger.exception("Error extracting CALLS edges: %s", e)
            return

        # Yield aggregated edges
        for (from_node, to_node), properties in edge_aggregation.items():
            yield (from_node, to_node, properties)