Skip to main content

#!/usr/bin/env python3 """ ADR-149 Phase 1 - Knowledge Graph Traversal Strategies (J.4.7.3)

Provides different traversal strategies for exploring the knowledge graph:

  • bfs: Breadth-first search (default, existing behavior)
  • dfs: Depth-first search (prioritizes depth over breadth)
  • shortest: Find shortest path between two nodes
  • policy_first: Start from policy/ADR nodes, then expand
  • semantic: Use semantic relevance scoring for traversal order

Created: 2026-02-03 Author: Claude (Opus 4.5) Track: J (Memory Intelligence) Task: J.4.7.3 """

import json import logging import sqlite3 from collections import deque from dataclasses import dataclass from typing import Any, Dict, List, Optional, Set, Tuple

logger = logging.getLogger(name)

@dataclass class TraversalResult: """Result of a graph traversal operation.""" success: bool nodes: List[Dict[str, Any]] edges: List[Dict[str, Any]] strategy: str start_node: Optional[str] = None end_node: Optional[str] = None depth: int = 0 error: Optional[str] = None

Valid traversal strategies

TRAVERSAL_STRATEGIES = ['bfs', 'dfs', 'shortest', 'policy_first', 'semantic']

def validate_traversal_strategy(strategy: str) -> Tuple[bool, Optional[str]]: """ Validate that the strategy is valid.

Args:
strategy: Strategy name to validate

Returns:
Tuple of (is_valid, error_message)
"""
if strategy not in TRAVERSAL_STRATEGIES:
return False, f"Invalid strategy '{strategy}'. Valid options: {', '.join(TRAVERSAL_STRATEGIES)}"
return True, None

def traverse_graph( conn: sqlite3.Connection, start_node: str, strategy: str = 'bfs', max_depth: int = 3, max_nodes: int = 50, end_node: Optional[str] = None, edge_types: Optional[List[str]] = None, node_types: Optional[List[str]] = None, ) -> TraversalResult: """ Traverse the knowledge graph using the specified strategy.

Args:
conn: Connection to org.db containing kg_nodes and kg_edges
start_node: Node ID or name to start from
strategy: Traversal strategy ('bfs', 'dfs', 'shortest', 'policy_first', 'semantic')
max_depth: Maximum traversal depth
max_nodes: Maximum nodes to return
end_node: Target node for 'shortest' strategy
edge_types: Filter edges by type
node_types: Filter nodes by type

Returns:
TraversalResult with nodes and edges
"""
# Validate strategy
valid, error = validate_traversal_strategy(strategy)
if not valid:
return TraversalResult(
success=False,
nodes=[],
edges=[],
strategy=strategy,
error=error
)

# Resolve start node (can be ID or name)
resolved_start = _resolve_node(conn, start_node)
if not resolved_start:
return TraversalResult(
success=False,
nodes=[],
edges=[],
strategy=strategy,
start_node=start_node,
error=f"Start node not found: {start_node}"
)

# Resolve end node for shortest path
resolved_end = None
if strategy == 'shortest' and end_node:
resolved_end = _resolve_node(conn, end_node)
if not resolved_end:
return TraversalResult(
success=False,
nodes=[],
edges=[],
strategy=strategy,
start_node=start_node,
end_node=end_node,
error=f"End node not found: {end_node}"
)

# Execute strategy
if strategy == 'bfs':
return _traverse_bfs(conn, resolved_start, max_depth, max_nodes, edge_types, node_types)
elif strategy == 'dfs':
return _traverse_dfs(conn, resolved_start, max_depth, max_nodes, edge_types, node_types)
elif strategy == 'shortest':
if not resolved_end:
return TraversalResult(
success=False,
nodes=[],
edges=[],
strategy=strategy,
error="'shortest' strategy requires --end-node"
)
return _traverse_shortest(conn, resolved_start, resolved_end, max_depth, edge_types)
elif strategy == 'policy_first':
return _traverse_policy_first(conn, resolved_start, max_depth, max_nodes, edge_types)
elif strategy == 'semantic':
return _traverse_semantic(conn, resolved_start, max_depth, max_nodes, edge_types, node_types)
else:
return TraversalResult(
success=False,
nodes=[],
edges=[],
strategy=strategy,
error=f"Unimplemented strategy: {strategy}"
)

def _resolve_node(conn: sqlite3.Connection, node_identifier: str) -> Optional[str]: """ Resolve a node identifier (ID or name) to a node ID.

Args:
conn: Database connection
node_identifier: Node ID or name

Returns:
Node ID if found, None otherwise
"""
# Try direct ID match first
cursor = conn.execute("SELECT id FROM kg_nodes WHERE id = ?", (node_identifier,))
row = cursor.fetchone()
if row:
return row[0]

# Try name match
cursor = conn.execute("SELECT id FROM kg_nodes WHERE name = ?", (node_identifier,))
row = cursor.fetchone()
if row:
return row[0]

# Try partial name match
cursor = conn.execute(
"SELECT id FROM kg_nodes WHERE name LIKE ? LIMIT 1",
(f"%{node_identifier}%",)
)
row = cursor.fetchone()
if row:
return row[0]

return None

def _fetch_node_dict(conn: sqlite3.Connection, node_id: str) -> Optional[Dict[str, Any]]: """Fetch a node as a dictionary.""" cursor = conn.execute(""" SELECT id, node_type, name, subtype, properties, created_at, updated_at FROM kg_nodes WHERE id = ? """, (node_id,)) row = cursor.fetchone() if not row: return None

properties = None
if row[4]:
try:
properties = json.loads(row[4])
except json.JSONDecodeError:
pass

return {
'id': row[0],
'node_type': row[1],
'name': row[2],
'subtype': row[3],
'properties': properties,
'created_at': row[5],
'updated_at': row[6],
}

def _get_neighbors( conn: sqlite3.Connection, node_id: str, edge_types: Optional[List[str]] = None, direction: str = 'both' ) -> List[Tuple[str, str, Dict[str, Any]]]: """ Get neighbors of a node.

Args:
conn: Database connection
node_id: Node ID
edge_types: Filter by edge types
direction: 'outgoing', 'incoming', or 'both'

Returns:
List of (neighbor_id, edge_type, edge_dict) tuples
"""
neighbors = []

# Outgoing edges
if direction in ('outgoing', 'both'):
query = "SELECT to_node, edge_type, properties FROM kg_edges WHERE from_node = ?"
params = [node_id]

if edge_types:
placeholders = ",".join("?" * len(edge_types))
query += f" AND edge_type IN ({placeholders})"
params.extend(edge_types)

cursor = conn.execute(query, params)
for row in cursor:
edge_dict = {
'from_node': node_id,
'to_node': row[0],
'edge_type': row[1],
'direction': 'outgoing'
}
neighbors.append((row[0], row[1], edge_dict))

# Incoming edges
if direction in ('incoming', 'both'):
query = "SELECT from_node, edge_type, properties FROM kg_edges WHERE to_node = ?"
params = [node_id]

if edge_types:
placeholders = ",".join("?" * len(edge_types))
query += f" AND edge_type IN ({placeholders})"
params.extend(edge_types)

cursor = conn.execute(query, params)
for row in cursor:
edge_dict = {
'from_node': row[0],
'to_node': node_id,
'edge_type': row[1],
'direction': 'incoming'
}
neighbors.append((row[0], row[1], edge_dict))

return neighbors

def _traverse_bfs( conn: sqlite3.Connection, start_node: str, max_depth: int, max_nodes: int, edge_types: Optional[List[str]], node_types: Optional[List[str]], ) -> TraversalResult: """Breadth-first search traversal.""" nodes = [] edges = [] visited: Set[str] = set() queue: deque = deque([(start_node, 0)]) # (node_id, depth)

while queue and len(nodes) < max_nodes:
node_id, depth = queue.popleft()

if node_id in visited:
continue
visited.add(node_id)

# Fetch node
node = _fetch_node_dict(conn, node_id)
if not node:
continue

# Filter by node type
if node_types and node['node_type'] not in node_types:
continue

node['depth'] = depth
nodes.append(node)

# Stop expanding if at max depth
if depth >= max_depth:
continue

# Get neighbors
for neighbor_id, edge_type, edge_dict in _get_neighbors(conn, node_id, edge_types):
if neighbor_id not in visited:
queue.append((neighbor_id, depth + 1))
edges.append(edge_dict)

return TraversalResult(
success=True,
nodes=nodes,
edges=edges,
strategy='bfs',
start_node=start_node,
depth=max(n.get('depth', 0) for n in nodes) if nodes else 0,
)

def _traverse_dfs( conn: sqlite3.Connection, start_node: str, max_depth: int, max_nodes: int, edge_types: Optional[List[str]], node_types: Optional[List[str]], ) -> TraversalResult: """Depth-first search traversal.""" nodes = [] edges = [] visited: Set[str] = set() stack = [(start_node, 0)] # (node_id, depth)

while stack and len(nodes) < max_nodes:
node_id, depth = stack.pop()

if node_id in visited:
continue
visited.add(node_id)

# Fetch node
node = _fetch_node_dict(conn, node_id)
if not node:
continue

# Filter by node type
if node_types and node['node_type'] not in node_types:
continue

node['depth'] = depth
nodes.append(node)

# Stop expanding if at max depth
if depth >= max_depth:
continue

# Get neighbors (add in reverse to maintain order)
neighbors = _get_neighbors(conn, node_id, edge_types)
for neighbor_id, edge_type, edge_dict in reversed(neighbors):
if neighbor_id not in visited:
stack.append((neighbor_id, depth + 1))
edges.append(edge_dict)

return TraversalResult(
success=True,
nodes=nodes,
edges=edges,
strategy='dfs',
start_node=start_node,
depth=max(n.get('depth', 0) for n in nodes) if nodes else 0,
)

def _traverse_shortest( conn: sqlite3.Connection, start_node: str, end_node: str, max_depth: int, edge_types: Optional[List[str]], ) -> TraversalResult: """Find shortest path between two nodes using BFS.""" visited: Set[str] = set() queue: deque = deque([(start_node, [start_node], [])]) # (node_id, path, edges)

while queue:
node_id, path, path_edges = queue.popleft()

if len(path) > max_depth + 1:
continue

if node_id == end_node:
# Found the shortest path
nodes = []
for i, nid in enumerate(path):
node = _fetch_node_dict(conn, nid)
if node:
node['depth'] = i
nodes.append(node)

return TraversalResult(
success=True,
nodes=nodes,
edges=path_edges,
strategy='shortest',
start_node=start_node,
end_node=end_node,
depth=len(path) - 1,
)

if node_id in visited:
continue
visited.add(node_id)

# Get neighbors
for neighbor_id, edge_type, edge_dict in _get_neighbors(conn, node_id, edge_types):
if neighbor_id not in visited:
queue.append((
neighbor_id,
path + [neighbor_id],
path_edges + [edge_dict]
))

return TraversalResult(
success=False,
nodes=[],
edges=[],
strategy='shortest',
start_node=start_node,
end_node=end_node,
error=f"No path found from '{start_node}' to '{end_node}' within depth {max_depth}"
)

def _traverse_policy_first( conn: sqlite3.Connection, start_node: str, max_depth: int, max_nodes: int, edge_types: Optional[List[str]], ) -> TraversalResult: """ Policy-first traversal: prioritize policy and ADR nodes.

This traversal is useful for understanding governance and architectural
context around a concept.
"""
nodes = []
edges = []
visited: Set[str] = set()

# Priority queue: (priority, depth, node_id)
# Lower priority number = higher priority
# Policy nodes get priority 0, ADRs get 1, others get 2
import heapq
priority_queue = [(2, 0, start_node)] # Start node

while priority_queue and len(nodes) < max_nodes:
priority, depth, node_id = heapq.heappop(priority_queue)

if node_id in visited:
continue
visited.add(node_id)

# Fetch node
node = _fetch_node_dict(conn, node_id)
if not node:
continue

node['depth'] = depth
node['priority'] = priority
nodes.append(node)

# Stop expanding if at max depth
if depth >= max_depth:
continue

# Get neighbors and assign priorities
for neighbor_id, edge_type, edge_dict in _get_neighbors(conn, node_id, edge_types):
if neighbor_id not in visited:
# Determine priority based on node type
neighbor_node = _fetch_node_dict(conn, neighbor_id)
if neighbor_node:
neighbor_type = neighbor_node.get('node_type', '')
if neighbor_type == 'policy':
neighbor_priority = 0
elif neighbor_type == 'adr':
neighbor_priority = 1
elif neighbor_type == 'decision':
neighbor_priority = 1
else:
neighbor_priority = 2

heapq.heappush(priority_queue, (neighbor_priority, depth + 1, neighbor_id))
edges.append(edge_dict)

return TraversalResult(
success=True,
nodes=nodes,
edges=edges,
strategy='policy_first',
start_node=start_node,
depth=max(n.get('depth', 0) for n in nodes) if nodes else 0,
)

def _traverse_semantic( conn: sqlite3.Connection, start_node: str, max_depth: int, max_nodes: int, edge_types: Optional[List[str]], node_types: Optional[List[str]], ) -> TraversalResult: """ Semantic traversal: use node importance/relevance scoring.

This traversal prioritizes nodes with:
- More connections (higher degree)
- More properties (richer content)
- More recent updates
"""
nodes = []
edges = []
visited: Set[str] = set()

# Priority queue: (negative_score, depth, node_id)
# Higher score = more relevant, use negative for min-heap
import heapq

start_score = _compute_node_score(conn, start_node)
priority_queue = [(-start_score, 0, start_node)]

while priority_queue and len(nodes) < max_nodes:
neg_score, depth, node_id = heapq.heappop(priority_queue)
score = -neg_score

if node_id in visited:
continue
visited.add(node_id)

# Fetch node
node = _fetch_node_dict(conn, node_id)
if not node:
continue

# Filter by node type
if node_types and node['node_type'] not in node_types:
continue

node['depth'] = depth
node['semantic_score'] = round(score, 4)
nodes.append(node)

# Stop expanding if at max depth
if depth >= max_depth:
continue

# Get neighbors and compute scores
for neighbor_id, edge_type, edge_dict in _get_neighbors(conn, node_id, edge_types):
if neighbor_id not in visited:
neighbor_score = _compute_node_score(conn, neighbor_id)
# Apply depth decay
decayed_score = neighbor_score * (0.8 ** (depth + 1))
heapq.heappush(priority_queue, (-decayed_score, depth + 1, neighbor_id))
edges.append(edge_dict)

return TraversalResult(
success=True,
nodes=nodes,
edges=edges,
strategy='semantic',
start_node=start_node,
depth=max(n.get('depth', 0) for n in nodes) if nodes else 0,
)

def _compute_node_score(conn: sqlite3.Connection, node_id: str) -> float: """ Compute a relevance score for a node.

Factors:
- Degree (number of connections)
- Property richness
- Node type importance
"""
score = 0.0

# Degree centrality (count edges)
cursor = conn.execute("""
SELECT
(SELECT COUNT(*) FROM kg_edges WHERE from_node = ?) +
(SELECT COUNT(*) FROM kg_edges WHERE to_node = ?) as degree
""", (node_id, node_id))
row = cursor.fetchone()
degree = row[0] if row else 0
score += min(0.4, degree * 0.05) # Cap at 0.4

# Property richness
cursor = conn.execute("SELECT properties FROM kg_nodes WHERE id = ?", (node_id,))
row = cursor.fetchone()
if row and row[0]:
try:
props = json.loads(row[0])
score += min(0.2, len(props) * 0.04)
except json.JSONDecodeError:
pass

# Node type importance
cursor = conn.execute("SELECT node_type FROM kg_nodes WHERE id = ?", (node_id,))
row = cursor.fetchone()
if row:
type_scores = {
'decision': 0.3,
'adr': 0.3,
'policy': 0.25,
'error_solution': 0.2,
'skill_learning': 0.15,
'component': 0.1,
'file': 0.05,
}
score += type_scores.get(row[0], 0.1)

return min(1.0, score)

def format_traversal_result(result: TraversalResult, format: str = 'text') -> str: """ Format traversal result for display.

Args:
result: TraversalResult to format
format: 'text', 'markdown', or 'json'

Returns:
Formatted string
"""
if format == 'json':
return json.dumps({
'success': result.success,
'strategy': result.strategy,
'start_node': result.start_node,
'end_node': result.end_node,
'depth': result.depth,
'node_count': len(result.nodes),
'edge_count': len(result.edges),
'nodes': result.nodes,
'edges': result.edges,
'error': result.error,
}, indent=2, default=str)

lines = []

if not result.success:
lines.append(f"ERROR: {result.error}")
return "\n".join(lines)

# Header
if format == 'markdown':
lines.append(f"# Graph Traversal: {result.strategy.upper()}")
lines.append("")
lines.append(f"**Start:** {result.start_node}")
if result.end_node:
lines.append(f"**End:** {result.end_node}")
lines.append(f"**Depth:** {result.depth}")
lines.append(f"**Nodes:** {len(result.nodes)} | **Edges:** {len(result.edges)}")
lines.append("")
lines.append("## Nodes")
lines.append("")
else:
lines.append(f"Graph Traversal: {result.strategy.upper()}")
lines.append("-" * 50)
lines.append(f"Start: {result.start_node}")
if result.end_node:
lines.append(f"End: {result.end_node}")
lines.append(f"Depth: {result.depth}")
lines.append(f"Nodes: {len(result.nodes)}, Edges: {len(result.edges)}")
lines.append("-" * 50)

# Nodes
for node in result.nodes:
depth_indent = " " * node.get('depth', 0)
node_type = node.get('node_type', 'unknown')
name = node.get('name', node.get('id', 'unnamed'))

if format == 'markdown':
score_str = ""
if 'semantic_score' in node:
score_str = f" (score: {node['semantic_score']:.2f})"
elif 'priority' in node:
score_str = f" (priority: {node['priority']})"
lines.append(f"{depth_indent}- **[{node_type}]** {name}{score_str}")
else:
score_str = ""
if 'semantic_score' in node:
score_str = f" [score:{node['semantic_score']:.2f}]"
elif 'priority' in node:
score_str = f" [pri:{node['priority']}]"
lines.append(f"{depth_indent}[{node_type}] {name}{score_str}")

return "\n".join(lines)