Skip to main content

scripts-similar-to-edge-builder

#!/usr/bin/env python3
"""
CP-24: SIMILAR_TO Edge Builder (ADR-151, J.31.4)

Creates SIMILAR_TO edges between nodes with high embedding similarity.

Edge: any:X <-> any:Y (same type)
Source: Compute from embedding vectors in kg_nodes
Properties: similarity_score (0.0-1.0)

Uses numpy for efficient batch cosine similarity computation.
Groups nodes by type to avoid cross-type comparisons.

Created: 2026-02-03
Updated: 2026-02-11
Track: J (Memory Intelligence)
Task: J.3.5.6, J.31.4
"""

import logging
import struct
from collections import Counter
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple

from .base_edge_builder import BaseEdgeBuilder

# Module-level logger named after this module so log records can be filtered
# per module through the standard logging hierarchy.
logger = logging.getLogger(__name__)

class SimilarToEdgeBuilder(BaseEdgeBuilder):
    """
    Build SIMILAR_TO edges based on embedding similarity.

    Groups nodes by type, loads embeddings into numpy arrays,
    computes pairwise cosine similarity, and yields edges for
    pairs above the configured threshold (top-k per node).

    Every unordered pair is yielded at most once. A pair is yielded when it
    is in the top-k of *either* endpoint, so the result does not depend on
    the order in which rows come back from the database.
    """

    # Number of similarity-matrix rows computed per matrix product. Working
    # in row blocks keeps peak memory at (block x n) instead of (n x n).
    _SIMILARITY_BLOCK_ROWS = 512

    # Size in bytes of one stored embedding component (float32).
    _FLOAT32_BYTES = 4

    def __init__(
        self,
        target_db_path: Path,
        similarity_threshold: float = 0.85,
        max_similar_per_node: int = 5,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        validate_nodes: bool = True,
        embedding_model: str = "all-MiniLM-L6-v2",
    ):
        """
        Configure the builder.

        Args:
            target_db_path: Forwarded unchanged to ``BaseEdgeBuilder``.
            similarity_threshold: Minimum cosine similarity (inclusive) for
                a pair to become an edge.
            max_similar_per_node: Maximum number of neighbours considered
                per node (top-k). Values <= 0 produce no edges.
            dry_run: Forwarded unchanged to ``BaseEdgeBuilder``.
            tenant_id: Forwarded unchanged to ``BaseEdgeBuilder``.
            validate_nodes: Forwarded unchanged to ``BaseEdgeBuilder``.
            embedding_model: Name recorded in each edge's ``model``
                property. It is metadata only; no model is loaded here.
        """
        super().__init__(target_db_path, dry_run, tenant_id, validate_nodes)
        self.similarity_threshold = similarity_threshold
        self.max_similar_per_node = max_similar_per_node
        self.embedding_model = embedding_model

    @property
    def edge_type(self) -> str:
        """Edge type label produced by this builder."""
        return "SIMILAR_TO"

    def _check_embeddings_available(self) -> bool:
        """
        Check if any nodes have embeddings.

        Returns:
            True when at least one ``kg_nodes`` row has a non-NULL
            embedding. False when there are none or when the query fails
            (best-effort: the failure is logged, not raised).
        """
        conn = self.connect_target()
        try:
            cursor = conn.execute("""
                SELECT COUNT(*) FROM kg_nodes WHERE embedding IS NOT NULL
            """)
            count = cursor.fetchone()[0]
        except Exception:
            # Keep the best-effort contract but leave a trace of the real
            # cause (e.g. missing table) instead of hiding it.
            logger.warning(
                "SIMILAR_TO edge builder: could not query kg_nodes for embeddings",
                exc_info=True,
            )
            return False
        return count > 0

    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract SIMILAR_TO edges based on embedding similarity.

        Groups nodes by type, computes pairwise cosine similarity within
        each group, and yields edges for pairs above threshold.

        Yields:
            ``(node_id_a, node_id_b, properties)`` tuples, where
            ``properties`` holds ``similarity_score``, ``model`` and
            ``node_type``. Each unordered pair is yielded once, with the
            node that appears earlier in the query result first.
        """
        if not self._check_embeddings_available():
            logger.warning(
                "SIMILAR_TO edge builder: No embeddings found in kg_nodes. "
                "Run embedding population first."
            )
            return

        try:
            import numpy as np  # noqa: F401  (availability check; helpers import it)
        except ImportError:
            logger.error("numpy required for similarity computation. Install: pip install numpy")
            return

        conn = self.connect_target()

        # Get distinct node types that have embeddings
        cursor = conn.execute("""
            SELECT DISTINCT node_type FROM kg_nodes
            WHERE embedding IS NOT NULL
        """)
        node_types = [row[0] for row in cursor.fetchall()]

        logger.info(f"Computing similarity edges for {len(node_types)} node types...")
        total_edges = 0

        for node_type in node_types:
            # Load all nodes with embeddings for this type
            cursor = conn.execute("""
                SELECT node_id, embedding FROM kg_nodes
                WHERE node_type = ? AND embedding IS NOT NULL
            """, (node_type,))
            rows = cursor.fetchall()

            if len(rows) < 2:
                continue

            node_ids, matrix = self._load_embedding_matrix(node_type, rows)
            if len(node_ids) < 2:
                continue

            for src, dst, score in self._iter_similar_pairs(matrix):
                yield (
                    node_ids[src],
                    node_ids[dst],
                    {
                        "similarity_score": score,
                        "model": self.embedding_model,
                        "node_type": node_type,
                    },
                )
                total_edges += 1

            logger.info(f" {node_type}: {len(rows)} nodes, computed similarities")

        logger.info(f"Total SIMILAR_TO edges: {total_edges}")

    def _load_embedding_matrix(self, node_type: Any, rows: List[Any]) -> Tuple[List[Any], Any]:
        """
        Turn ``(node_id, embedding_bytes)`` rows into ids plus a float32 matrix.

        The embedding size is taken to be the most common blob length that
        is a non-zero multiple of four bytes. Rows with any other length
        are skipped with a warning, so one bad row cannot abort the run.

        Returns:
            ``(node_ids, matrix)`` where ``matrix`` has one row per kept
            node, or ``([], None)`` when no row has a usable embedding.
        """
        import numpy as np  # extract_edges has already verified numpy imports

        width = self._FLOAT32_BYTES
        length_counts = Counter(
            len(blob) for _, blob in rows if len(blob) > 0 and len(blob) % width == 0
        )
        if not length_counts:
            logger.warning(
                "SIMILAR_TO edge builder: no usable float32 embeddings for node type %s",
                node_type,
            )
            return [], None

        blob_len = length_counts.most_common(1)[0][0]
        usable = [(node_id, blob) for node_id, blob in rows if len(blob) == blob_len]

        skipped = len(rows) - len(usable)
        if skipped:
            logger.warning(
                "SIMILAR_TO edge builder: skipped %d node(s) of type %s whose "
                "embedding is not %d bytes",
                skipped, node_type, blob_len,
            )

        node_ids = [node_id for node_id, _ in usable]
        # One frombuffer over the concatenated blobs avoids a per-row copy loop.
        matrix = np.frombuffer(
            b"".join(blob for _, blob in usable), dtype=np.float32
        ).reshape(len(usable), blob_len // width)
        return node_ids, matrix

    def _iter_similar_pairs(self, matrix: Any) -> Generator[Tuple[int, int, float], None, None]:
        """
        Yield ``(i, j, score)`` with ``i < j`` for similar rows of ``matrix``.

        For every row, the up-to-``max_similar_per_node`` most similar other
        rows with cosine similarity >= ``similarity_threshold`` are taken.
        A pair is yielded the first time either endpoint selects it.
        """
        import numpy as np  # extract_edges has already verified numpy imports

        top_k = max(self.max_similar_per_node, 0)
        if top_k == 0:
            return

        # Normalize rows for cosine similarity (dot product of normalized = cosine)
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # Avoid division by zero
        unit = matrix / norms

        n_rows = unit.shape[0]
        emitted = set()  # unordered pairs already yielded, stored as (low, high)

        for start in range(0, n_rows, self._SIMILARITY_BLOCK_ROWS):
            block = unit[start:start + self._SIMILARITY_BLOCK_ROWS] @ unit.T
            for offset, scores in enumerate(block):
                i = start + offset
                # Exclude self-similarity; -inf stays excluded for any threshold.
                scores[i] = -np.inf

                candidates = np.flatnonzero(scores >= self.similarity_threshold)
                if candidates.size == 0:
                    continue

                # Sort by score descending (stable for reproducible ties), take top-k
                ranked = candidates[np.argsort(-scores[candidates], kind="stable")]
                for j in ranked[:top_k].tolist():
                    pair = (i, j) if i < j else (j, i)
                    if pair in emitted:
                        continue
                    emitted.add(pair)
                    # float32 rounding can overshoot 1.0 by an ulp; cap it.
                    yield pair[0], pair[1], min(1.0, float(scores[j]))

    def _compute_cosine_similarity(self, vec1: bytes, vec2: bytes) -> float:
        """
        Compute cosine similarity between two embedding vectors.

        Args:
            vec1: First embedding as bytes (float32 array)
            vec2: Second embedding as bytes (float32 array)

        Returns:
            Cosine similarity in the range -1.0 to 1.0, or 0.0 when either
            vector has zero norm (including empty input).

        Raises:
            struct.error: If ``vec1`` is not a whole number of float32
                values or ``vec2`` has a different byte length.
        """
        n_dims = len(vec1) // self._FLOAT32_BYTES
        v1 = struct.unpack(f'{n_dims}f', vec1)
        v2 = struct.unpack(f'{n_dims}f', vec2)

        dot_product = sum(a * b for a, b in zip(v1, v2))
        norm1 = sum(a * a for a in v1) ** 0.5
        norm2 = sum(b * b for b in v2) ** 0.5

        if norm1 == 0 or norm2 == 0:
            return 0.0
        # Clamp away floating-point drift just outside the valid range.
        return max(-1.0, min(1.0, dot_product / (norm1 * norm2)))