#!/usr/bin/env python3
"""
Base Edge Builder for ADR-151 Knowledge Graph Edge Population (Phase 3).

Provides common functionality for all edge builders:
- Database connection management
- Batch upsert operations for edges
- Node existence validation
- Logging and progress tracking
- Dry-run support

Edge ID format: {edge_type}:{from_node_id}:{to_node_id}
This ensures deterministic IDs for idempotent upserts.

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: CP-16 through CP-26
"""
import json
import logging
import sqlite3
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Set, Tuple
# Configure logging for the edge-builder scripts.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger named after this module (was `getLogger(name)`,
# which raised NameError because `name` is undefined).
logger = logging.getLogger(__name__)
class BaseEdgeBuilder(ABC):
    """
    Abstract base class for all KG edge builders.

    Subclasses must implement:
    - edge_type: The type of edge being built
    - extract_edges(): Generator yielding edge tuples
    """

    # Number of edges buffered before each batch upsert + commit.
    BATCH_SIZE = 1000

    # Emit a progress log line every this many extracted edges.
    PROGRESS_INTERVAL = 10000

    # Maximum number of sample log lines per skip / dry-run category.
    SAMPLE_LOG_LIMIT = 5

    # Shared UPSERT used by both upsert_edge() and batch_upsert(). The
    # conflict target requires a UNIQUE(from_node, to_node, edge_type)
    # constraint on kg_edges. On conflict only `properties` is refreshed,
    # so the existing row's id, tenant_id and created_at are left as-is.
    _UPSERT_SQL = """
        INSERT INTO kg_edges (
            id, edge_type, from_node, to_node, properties,
            tenant_id, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(from_node, to_node, edge_type) DO UPDATE SET
            properties = excluded.properties
    """

    def __init__(
        self,
        target_db_path: Path,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        validate_nodes: bool = True,
    ):
        """
        Initialize the edge builder.

        Args:
            target_db_path: Path to org.db containing kg_nodes and kg_edges
            dry_run: If True, don't write to database
            tenant_id: Optional tenant ID stored in the tenant_id column of
                newly inserted edges (multi-tenant isolation)
            validate_nodes: If True, verify nodes exist before creating edges
        """
        self.target_db_path = target_db_path
        self.dry_run = dry_run
        self.tenant_id = tenant_id
        self.validate_nodes = validate_nodes
        # Lazily opened by connect_target(); reset to None by close().
        self._target_conn: Optional[sqlite3.Connection] = None
        # Cache of kg_nodes ids, filled on first load_existing_nodes() call.
        self._existing_nodes: Optional[Set[str]] = None

        # Statistics. Note: "inserted" counts every successful upsert (new
        # rows and conflict-updates alike); "updated" is not currently
        # incremented anywhere. "skipped_other" counts edges that were only
        # simulated in dry-run mode.
        self.stats = {
            "total_extracted": 0,
            "inserted": 0,
            "updated": 0,
            "skipped_missing_from": 0,
            "skipped_missing_to": 0,
            "skipped_other": 0,
            "errors": 0,
        }

    @property
    @abstractmethod
    def edge_type(self) -> str:
        """The edge_type value for edges built by this builder."""
        pass

    @abstractmethod
    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract edges from source.

        Yields:
            Tuple of (from_node_id, to_node_id, properties)
        """
        pass

    def connect_target(self) -> sqlite3.Connection:
        """Get or create the cached connection to the target database.

        Rows come back as sqlite3.Row and foreign-key enforcement is enabled.
        """
        if self._target_conn is None:
            self._target_conn = sqlite3.connect(str(self.target_db_path))
            self._target_conn.row_factory = sqlite3.Row
            self._target_conn.execute("PRAGMA foreign_keys = ON;")
        return self._target_conn

    def close(self):
        """Close the target connection (if open). Uncommitted work is discarded."""
        if self._target_conn is not None:
            self._target_conn.close()
            self._target_conn = None

    def generate_edge_id(self, from_node: str, to_node: str) -> str:
        """
        Generate deterministic edge ID.

        Format: {edge_type}:{from_node}:{to_node}
        This ensures idempotency for upserts.
        """
        return f"{self.edge_type}:{from_node}:{to_node}"

    def load_existing_nodes(self) -> Set[str]:
        """Load (once) and return all node IDs from kg_nodes for validation.

        If the query fails with sqlite3.OperationalError (e.g. the table is
        missing), a warning is logged and an empty set is cached, which makes
        every edge fail validation while validate_nodes is enabled.
        """
        if self._existing_nodes is not None:
            return self._existing_nodes

        conn = self.connect_target()
        try:
            cursor = conn.execute("SELECT id FROM kg_nodes")
            self._existing_nodes = {row[0] for row in cursor}
            logger.info("Loaded %d existing nodes for validation", len(self._existing_nodes))
        except sqlite3.OperationalError as e:
            logger.warning("Could not load existing nodes: %s", e)
            self._existing_nodes = set()
        return self._existing_nodes

    def node_exists(self, node_id: str) -> bool:
        """Check if a node exists in kg_nodes (always True when validation is off)."""
        if not self.validate_nodes:
            return True
        return node_id in self.load_existing_nodes()

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time formatted as YYYY-MM-DDTHH:MM:SSZ."""
        return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    def _edge_params(
        self,
        from_node: str,
        to_node: str,
        properties: Dict[str, Any],
        now: str,
    ) -> Tuple[Any, ...]:
        """Build the positional parameters for ``_UPSERT_SQL``.

        Empty/None properties are stored as NULL.

        Raises:
            TypeError / ValueError: if properties are not JSON-serializable.
        """
        properties_json = json.dumps(properties, ensure_ascii=False) if properties else None
        return (
            self.generate_edge_id(from_node, to_node),
            self.edge_type,
            from_node,
            to_node,
            properties_json,
            self.tenant_id,
            now,
        )

    def upsert_edge(
        self,
        conn: sqlite3.Connection,
        from_node: str,
        to_node: str,
        properties: Dict[str, Any],
    ) -> bool:
        """
        Insert or update an edge in kg_edges (does not commit).

        Uses UPSERT pattern with ON CONFLICT on UNIQUE(from_node, to_node, edge_type).

        Returns:
            True if edge was inserted/updated, False on a database error or
            non-JSON-serializable properties (the error is logged).
        """
        edge_id = self.generate_edge_id(from_node, to_node)
        try:
            params = self._edge_params(from_node, to_node, properties, self._utc_timestamp())
            conn.execute(self._UPSERT_SQL, params)
        except (sqlite3.Error, TypeError, ValueError) as e:
            logger.error("Error upserting edge %s: %s", edge_id, e)
            return False
        return True

    def batch_upsert(
        self,
        conn: sqlite3.Connection,
        edges: List[Tuple[str, str, Dict[str, Any]]],
    ) -> Tuple[int, int]:
        """
        Batch upsert edges (does not commit).

        All edges in the batch share one created_at timestamp. A failing edge
        (database error or non-JSON-serializable properties) is counted and
        skipped rather than aborting the batch.

        Args:
            conn: Database connection
            edges: List of (from_node, to_node, properties)

        Returns:
            Tuple of (success_count, error_count)
        """
        now = self._utc_timestamp()
        success = 0
        errors = 0

        for from_node, to_node, properties in edges:
            try:
                conn.execute(self._UPSERT_SQL, self._edge_params(from_node, to_node, properties, now))
            except (sqlite3.Error, TypeError, ValueError) as e:
                # Debug level to avoid flooding logs; the aggregate count
                # surfaces through stats["errors"] in run().
                logger.debug(
                    "Error upserting edge %s: %s",
                    self.generate_edge_id(from_node, to_node), e,
                )
                errors += 1
            else:
                success += 1

        return success, errors

    def _flush_batch(
        self,
        conn: sqlite3.Connection,
        batch: List[Tuple[str, str, Dict[str, Any]]],
    ) -> None:
        """Upsert *batch*, commit it, and add the results to self.stats."""
        success, errors = self.batch_upsert(conn, batch)
        conn.commit()
        self.stats["inserted"] += success
        self.stats["errors"] += errors

    def _missing_endpoint(self, from_node: str, to_node: str) -> bool:
        """Return True (recording the skip) if either endpoint is not in kg_nodes."""
        if not self.node_exists(from_node):
            self.stats["skipped_missing_from"] += 1
            if self.stats["skipped_missing_from"] <= self.SAMPLE_LOG_LIMIT:
                logger.debug("Skipped: from_node not found: %s", from_node)
            return True
        if not self.node_exists(to_node):
            self.stats["skipped_missing_to"] += 1
            if self.stats["skipped_missing_to"] <= self.SAMPLE_LOG_LIMIT:
                logger.debug("Skipped: to_node not found: %s", to_node)
            return True
        return False

    def run(self) -> Dict[str, int]:
        """
        Run the edge building process.

        Extracts edges, optionally validates both endpoints, and upserts them
        in committed batches of BATCH_SIZE (nothing is written in dry-run
        mode). Connections are always closed on exit.

        Returns:
            Statistics dictionary
        """
        logger.info("Starting %s edge building...", self.edge_type)
        logger.info("Mode: %s", 'DRY RUN' if self.dry_run else 'EXECUTE')

        conn = self.connect_target()
        batch: List[Tuple[str, str, Dict[str, Any]]] = []

        try:
            # Pre-load existing nodes inside the try so the connection is
            # still closed if loading fails unexpectedly.
            if self.validate_nodes:
                self.load_existing_nodes()

            for from_node, to_node, properties in self.extract_edges():
                self.stats["total_extracted"] += 1
                # Report progress for every extracted edge, including ones
                # that are later skipped or only simulated in dry-run mode.
                if self.stats["total_extracted"] % self.PROGRESS_INTERVAL == 0:
                    logger.info("Progress: %d edges processed", self.stats["total_extracted"])

                if self.validate_nodes and self._missing_endpoint(from_node, to_node):
                    continue

                if self.dry_run:
                    self.stats["skipped_other"] += 1
                    # Gate on dry-run count so samples appear even when the
                    # first extracted edges were skipped for missing nodes.
                    if self.stats["skipped_other"] <= self.SAMPLE_LOG_LIMIT:
                        logger.info(
                            "[DRY RUN] Would create: %s: %s -> %s",
                            self.edge_type, from_node, to_node,
                        )
                    continue

                batch.append((from_node, to_node, properties))
                if len(batch) >= self.BATCH_SIZE:
                    self._flush_batch(conn, batch)
                    batch = []

            # Process remaining batch (always empty in dry-run mode).
            if batch:
                self._flush_batch(conn, batch)

            logger.info("Edge building complete: %s", self.stats)
            return self.stats
        finally:
            self.close()

    def get_edge_count(self) -> int:
        """Get current count of edges of this type in kg_edges.

        Returns 0 if the query fails. Opens the target connection if needed
        and leaves it open; call close() afterwards.
        """
        try:
            conn = self.connect_target()
            cursor = conn.execute(
                "SELECT COUNT(*) FROM kg_edges WHERE edge_type = ?",
                (self.edge_type,)
            )
            return cursor.fetchone()[0]
        except sqlite3.Error as e:
            logger.debug("Could not count %s edges: %s", self.edge_type, e)
            return 0
class SQLiteSourceEdgeBuilder(BaseEdgeBuilder):
    """
    Edge builder base for builders whose source data lives in a SQLite file.

    Adds a lazily opened, cached connection to ``source_db_path`` alongside
    the target connection handled by the base class. Subclasses still
    provide ``edge_type`` and ``extract_edges()``.
    """

    def __init__(
        self,
        source_db_path: Path,
        target_db_path: Path,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        validate_nodes: bool = True,
    ):
        super().__init__(target_db_path, dry_run, tenant_id, validate_nodes)
        # Where to read source rows from; opened on first connect_source().
        self.source_db_path = source_db_path
        self._source_conn: Optional[sqlite3.Connection] = None

    def connect_source(self) -> sqlite3.Connection:
        """Return the cached source connection, opening it on first use.

        Rows are produced as sqlite3.Row so columns can be read by name.
        """
        if self._source_conn is not None:
            return self._source_conn
        self._source_conn = sqlite3.connect(str(self.source_db_path))
        self._source_conn.row_factory = sqlite3.Row
        return self._source_conn

    def close(self):
        """Close the target connection (via the base class), then the source one."""
        super().close()
        source = self._source_conn
        if source:
            source.close()
            self._source_conn = None