#!/usr/bin/env python3
"""
Base Edge Builder for ADR-151 Knowledge Graph Edge Population (Phase 3)

Provides common functionality for all edge builders:

  • Database connection management
  • Batch upsert operations for edges
  • Node existence validation
  • Logging and progress tracking
  • Dry-run support

Edge ID Format: {edge_type}:{from_node_id}:{to_node_id}
This ensures deterministic IDs for idempotent upserts.

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: CP-16 through CP-26
"""

import json
import logging
import sqlite3
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Set, Tuple

# Configure logging.
# NOTE: basicConfig runs at import time, so importing this module configures
# the root logger (INFO level, timestamped format) if it has no handlers yet.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Module-level logger named after this module (was the undefined name `name`).
logger = logging.getLogger(__name__)

class BaseEdgeBuilder(ABC):
    """
    Abstract base class for all KG edge builders.

    Subclasses must implement:
    - edge_type: The type of edge being built
    - extract_edges(): Generator yielding edge tuples

    The builder streams edges from ``extract_edges()``, optionally checks
    that both endpoints exist in ``kg_nodes``, and upserts the edges into
    ``kg_edges`` in batches of ``BATCH_SIZE``.

    Statistics (``self.stats``):
    - total_extracted: every edge yielded by ``extract_edges()``
    - inserted: successful upserts (new rows AND rows whose properties
      were updated; the two are not distinguished)
    - updated: reserved; never incremented by this class
    - skipped_missing_from / skipped_missing_to: edges dropped by node
      validation
    - skipped_other: edges not written because of dry-run mode
    - errors: edges whose upsert failed
    """

    BATCH_SIZE = 1000

    # How many dry-run edges are echoed to the log as a preview.
    _DRY_RUN_PREVIEW_LIMIT = 5

    # Single source of truth for the upsert statement. On conflict only
    # `properties` is replaced; id, tenant_id and created_at of the existing
    # row are left untouched.
    _UPSERT_SQL = """
        INSERT INTO kg_edges (
            id, edge_type, from_node, to_node, properties,
            tenant_id, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(from_node, to_node, edge_type) DO UPDATE SET
            properties = excluded.properties
    """

    def __init__(
        self,
        target_db_path: Path,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        validate_nodes: bool = True,
    ):
        """
        Initialize the edge builder.

        Args:
            target_db_path: Path to org.db containing kg_nodes and kg_edges
            dry_run: If True, don't write to database
            tenant_id: Optional tenant ID for multi-tenant isolation
            validate_nodes: If True, verify nodes exist before creating edges
        """
        self.target_db_path = target_db_path
        self.dry_run = dry_run
        self.tenant_id = tenant_id
        self.validate_nodes = validate_nodes
        self._target_conn: Optional[sqlite3.Connection] = None
        # Lazily populated cache of kg_nodes ids (see load_existing_nodes).
        self._existing_nodes: Optional[Set[str]] = None

        # Statistics
        self.stats = {
            "total_extracted": 0,
            "inserted": 0,
            "updated": 0,
            "skipped_missing_from": 0,
            "skipped_missing_to": 0,
            "skipped_other": 0,
            "errors": 0,
        }

    @property
    @abstractmethod
    def edge_type(self) -> str:
        """The edge_type value for edges built by this builder."""
        pass

    @abstractmethod
    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract edges from source.

        Yields:
            Tuple of (from_node_id, to_node_id, properties)
        """
        pass

    def connect_target(self) -> sqlite3.Connection:
        """
        Get or create connection to target database.

        The connection is created on first use, cached on the instance,
        configured with ``sqlite3.Row`` rows and has foreign keys enabled.
        """
        if self._target_conn is None:
            self._target_conn = sqlite3.connect(str(self.target_db_path))
            self._target_conn.row_factory = sqlite3.Row
            self._target_conn.execute("PRAGMA foreign_keys = ON;")
        return self._target_conn

    def close(self):
        """Close database connections."""
        if self._target_conn:
            self._target_conn.close()
            self._target_conn = None

    def generate_edge_id(self, from_node: str, to_node: str) -> str:
        """
        Generate deterministic edge ID.

        Format: {edge_type}:{from_node}:{to_node}
        This ensures idempotency for upserts.
        """
        return f"{self.edge_type}:{from_node}:{to_node}"

    def load_existing_nodes(self) -> Set[str]:
        """
        Load all existing node IDs from kg_nodes for validation.

        The result is cached, so the table is read at most once per
        builder. If the query fails with ``sqlite3.OperationalError`` a
        warning is logged and an empty set is cached, which makes every
        subsequent ``node_exists`` check fail while validation is enabled.
        """
        if self._existing_nodes is not None:
            return self._existing_nodes

        conn = self.connect_target()
        try:
            cursor = conn.execute("SELECT id FROM kg_nodes")
            self._existing_nodes = {row[0] for row in cursor}
            logger.info(f"Loaded {len(self._existing_nodes)} existing nodes for validation")
        except sqlite3.OperationalError as e:
            logger.warning(f"Could not load existing nodes: {e}")
            self._existing_nodes = set()
        return self._existing_nodes

    def node_exists(self, node_id: str) -> bool:
        """
        Check if a node exists in kg_nodes.

        Always returns True when node validation is disabled.
        """
        if not self.validate_nodes:
            return True
        return node_id in self.load_existing_nodes()

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
        return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    def _execute_upsert(
        self,
        conn: sqlite3.Connection,
        edge_id: str,
        from_node: str,
        to_node: str,
        properties: Dict[str, Any],
        created_at: str,
    ) -> None:
        """
        Serialize ``properties`` and execute the upsert for one edge.

        Falsy ``properties`` (None or an empty dict) are stored as NULL.
        Raises whatever ``json.dumps`` or ``conn.execute`` raise; callers
        decide how to account for the failure.
        """
        properties_json = json.dumps(properties, ensure_ascii=False) if properties else None
        conn.execute(
            self._UPSERT_SQL,
            (
                edge_id, self.edge_type, from_node, to_node,
                properties_json, self.tenant_id, created_at,
            ),
        )

    def upsert_edge(
        self,
        conn: sqlite3.Connection,
        from_node: str,
        to_node: str,
        properties: Dict[str, Any],
    ) -> bool:
        """
        Insert or update an edge in kg_edges.

        Uses UPSERT pattern with ON CONFLICT on UNIQUE(from_node, to_node, edge_type).
        The statement is executed but not committed; committing is left to
        the caller.

        Returns:
            True if edge was inserted/updated, False if error (database
            error or properties that cannot be serialized to JSON)
        """
        edge_id = self.generate_edge_id(from_node, to_node)
        try:
            self._execute_upsert(
                conn, edge_id, from_node, to_node, properties, self._utc_timestamp()
            )
        except (sqlite3.Error, TypeError, ValueError) as e:
            # TypeError/ValueError come from json.dumps on bad properties.
            logger.error(f"Error upserting edge {edge_id}: {e}")
            return False
        return True

    def batch_upsert(
        self,
        conn: sqlite3.Connection,
        edges: List[Tuple[str, str, Dict[str, Any]]],
    ) -> Tuple[int, int]:
        """
        Batch upsert edges.

        All edges in the batch share one ``created_at`` timestamp. A failing
        edge (database error or non-serializable properties) is counted and
        logged at debug level; it does not stop the rest of the batch.
        Nothing is committed here.

        Args:
            conn: Database connection
            edges: List of (from_node, to_node, properties)

        Returns:
            Tuple of (success_count, error_count)
        """
        now = self._utc_timestamp()
        success = 0
        errors = 0

        for from_node, to_node, properties in edges:
            edge_id = self.generate_edge_id(from_node, to_node)
            try:
                self._execute_upsert(conn, edge_id, from_node, to_node, properties, now)
            except (sqlite3.Error, TypeError, ValueError) as e:
                logger.debug(f"Error upserting edge {edge_id}: {e}")
                errors += 1
            else:
                success += 1

        return success, errors

    def _flush_batch(
        self,
        conn: sqlite3.Connection,
        batch: List[Tuple[str, str, Dict[str, Any]]],
    ) -> None:
        """Upsert ``batch``, commit, and fold the outcome into ``self.stats``."""
        success, errors = self.batch_upsert(conn, batch)
        conn.commit()
        self.stats["inserted"] += success
        self.stats["errors"] += errors

    def run(self) -> Dict[str, int]:
        """
        Run the edge building process.

        Edges are validated (if enabled), then either previewed (dry run,
        counted as ``skipped_other``) or upserted in batches of
        ``BATCH_SIZE`` with a commit after each batch. The target
        connection is always closed when this method exits.

        Returns:
            Statistics dictionary
        """
        logger.info(f"Starting {self.edge_type} edge building...")
        logger.info(f"Mode: {'DRY RUN' if self.dry_run else 'EXECUTE'}")

        batch: List[Tuple[str, str, Dict[str, Any]]] = []
        previews_logged = 0

        try:
            # Connect inside the try so a failure while pre-loading nodes
            # still releases the connection.
            conn = self.connect_target()

            # Pre-load existing nodes if validation enabled
            if self.validate_nodes:
                self.load_existing_nodes()

            for from_node, to_node, properties in self.extract_edges():
                self.stats["total_extracted"] += 1

                # Report progress for every extracted edge, including the
                # ones that end up skipped or dry-run only.
                if self.stats["total_extracted"] % 10000 == 0:
                    logger.info(f"Progress: {self.stats['total_extracted']} edges processed")

                # Validate nodes exist
                if self.validate_nodes:
                    if not self.node_exists(from_node):
                        self.stats["skipped_missing_from"] += 1
                        # Only the first few misses are logged to limit noise.
                        if self.stats["skipped_missing_from"] <= 5:
                            logger.debug(f"Skipped: from_node not found: {from_node}")
                        continue
                    if not self.node_exists(to_node):
                        self.stats["skipped_missing_to"] += 1
                        if self.stats["skipped_missing_to"] <= 5:
                            logger.debug(f"Skipped: to_node not found: {to_node}")
                        continue

                if self.dry_run:
                    self.stats["skipped_other"] += 1
                    # Preview the first few edges that WOULD be written,
                    # regardless of how many were skipped before them.
                    if previews_logged < self._DRY_RUN_PREVIEW_LIMIT:
                        previews_logged += 1
                        logger.info(f"[DRY RUN] Would create: {self.edge_type}: {from_node} -> {to_node}")
                    continue

                batch.append((from_node, to_node, properties))

                if len(batch) >= self.BATCH_SIZE:
                    self._flush_batch(conn, batch)
                    batch = []

            # Process remaining batch (always empty in dry-run mode)
            if batch:
                self._flush_batch(conn, batch)

            logger.info(f"Edge building complete: {self.stats}")
            return self.stats

        finally:
            self.close()

    def get_edge_count(self) -> int:
        """
        Get current count of edges of this type in kg_edges.

        Opens the target connection if needed and leaves it cached; call
        ``close()`` when done. Returns 0 if the query fails.
        """
        try:
            conn = self.connect_target()
            cursor = conn.execute(
                "SELECT COUNT(*) FROM kg_edges WHERE edge_type = ?",
                (self.edge_type,)
            )
            return cursor.fetchone()[0]
        except sqlite3.Error as e:
            logger.debug(f"Could not count {self.edge_type} edges: {e}")
            return 0

class SQLiteSourceEdgeBuilder(BaseEdgeBuilder):
    """
    Base class for edge builders that read from a SQLite source database.

    Adds a lazily opened, cached connection to the source database next to
    the target connection managed by ``BaseEdgeBuilder``.
    """

    def __init__(
        self,
        source_db_path: Path,
        target_db_path: Path,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        validate_nodes: bool = True,
    ):
        """
        Initialize the builder.

        Args:
            source_db_path: Path to the SQLite database edges are read from
            target_db_path: Path to the database edges are written to
            dry_run: If True, don't write to database
            tenant_id: Optional tenant ID for multi-tenant isolation
            validate_nodes: If True, verify nodes exist before creating edges
        """
        super().__init__(target_db_path, dry_run, tenant_id, validate_nodes)
        self.source_db_path = source_db_path
        self._source_conn: Optional[sqlite3.Connection] = None

    def connect_source(self) -> sqlite3.Connection:
        """
        Get or create connection to source database.

        The connection is created on first use, cached on the instance and
        configured to return ``sqlite3.Row`` rows.
        """
        if self._source_conn is None:
            self._source_conn = sqlite3.connect(str(self.source_db_path))
            self._source_conn.row_factory = sqlite3.Row
        return self._source_conn

    def close(self):
        """Close all database connections."""
        try:
            super().close()
        finally:
            # Release the source connection even if closing the target
            # raised, and drop the reference before closing so a failing
            # close() cannot leave a dead connection cached.
            source_conn, self._source_conn = self._source_conn, None
            if source_conn is not None:
                source_conn.close()