#!/usr/bin/env python3
"""
Base Extractor for ADR-151 Knowledge Graph Entity Population

Provides common functionality for all entity extractors:

  - Database connection management
  - Batch upsert operations
  - Logging and progress tracking
  - Dry-run support

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: CP-07 through CP-15
"""

import json
import logging
import sqlite3
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple

# Configure module-wide logging; extractors report progress/errors through `logger`.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# NOTE: the scraped source read `getLogger(name)`, which is a NameError;
# the conventional (and clearly intended) argument is `__name__`.
logger = logging.getLogger(__name__)

class BaseExtractor(ABC):
    """
    Abstract base class for all KG entity extractors.

    Subclasses must implement:
    - node_type: The type of node being extracted
    - extract_entities(): Generator yielding
      (node_id, name, subtype, properties, source_table, source_id)
    """

    # Rows buffered before each commit in run().
    BATCH_SIZE = 500

    def __init__(
        self,
        target_db_path: Path,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        project_id: Optional[str] = None,
    ):
        """
        Initialize the extractor.

        Args:
            target_db_path: Path to org.db containing kg_nodes
            dry_run: If True, don't write to database
            tenant_id: Optional tenant ID for multi-tenant isolation
            project_id: Optional project ID
        """
        self.target_db_path = target_db_path
        self.dry_run = dry_run
        self.tenant_id = tenant_id
        self.project_id = project_id
        self._target_conn: Optional[sqlite3.Connection] = None

        # Run statistics. NOTE: "inserted" counts successful upserts, which
        # includes rows resolved via ON CONFLICT updates; "updated" is kept
        # for interface compatibility but is not currently incremented.
        self.stats: Dict[str, int] = {
            "total_extracted": 0,
            "inserted": 0,
            "updated": 0,
            "skipped": 0,
            "errors": 0,
        }

    @property
    @abstractmethod
    def node_type(self) -> str:
        """The node_type value for extracted entities."""
        raise NotImplementedError

    @abstractmethod
    def extract_entities(self) -> Generator[
        Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]],
        None,
        None,
    ]:
        """
        Extract entities from source.

        Yields:
            Tuple of (node_id, name, subtype, properties, source_table, source_id)
        """
        raise NotImplementedError

    def connect_target(self) -> sqlite3.Connection:
        """Get or create connection to target database (lazily opened, cached)."""
        if self._target_conn is None:
            self._target_conn = sqlite3.connect(str(self.target_db_path))
            self._target_conn.row_factory = sqlite3.Row
            self._target_conn.execute("PRAGMA foreign_keys = ON;")
        return self._target_conn

    def close(self):
        """Close database connections (safe to call repeatedly)."""
        if self._target_conn:
            self._target_conn.close()
            self._target_conn = None

    def generate_node_id(self, source_id: str) -> str:
        """
        Generate deterministic node ID.

        Format: {node_type}:{source_id}
        Examples:
            component:agent/senior-architect
            function:scripts/core/paths.py:get_org_db_path
            decision:42
        """
        return f"{self.node_type}:{source_id}"

    def upsert_node(
        self,
        conn: sqlite3.Connection,
        node_id: str,
        name: str,
        subtype: Optional[str],
        properties: Dict[str, Any],
        source_table: Optional[str] = None,
        source_id: Optional[str] = None,
    ) -> bool:
        """
        Insert or update a node in kg_nodes.

        Uses UPSERT pattern (INSERT ... ON CONFLICT DO UPDATE) for idempotency.
        Does NOT commit; callers control transaction boundaries.

        Returns:
            True if node was inserted/updated, False if error
        """
        now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        # Empty/None properties are stored as NULL rather than "{}".
        properties_json = json.dumps(properties, ensure_ascii=False) if properties else None

        try:
            conn.execute("""
                INSERT INTO kg_nodes (
                    id, node_type, subtype, name, properties,
                    tenant_id, project_id, source_table, source_id,
                    created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    name = excluded.name,
                    subtype = excluded.subtype,
                    properties = excluded.properties,
                    updated_at = excluded.updated_at
            """, (
                node_id, self.node_type, subtype, name, properties_json,
                self.tenant_id, self.project_id, source_table, source_id,
                now, now,
            ))
            return True
        except sqlite3.Error as e:
            logger.error(f"Error upserting node {node_id}: {e}")
            return False

    def batch_upsert(
        self,
        conn: sqlite3.Connection,
        entities: List[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]]],
    ) -> Tuple[int, int]:
        """
        Batch upsert entities.

        Delegates each row to upsert_node() so the UPSERT SQL lives in exactly
        one place (previously duplicated verbatim here). Does NOT commit.

        Args:
            conn: Database connection
            entities: List of (node_id, name, subtype, properties, source_table, source_id)

        Returns:
            Tuple of (success_count, error_count)
        """
        success = 0
        errors = 0

        for node_id, name, subtype, properties, source_table, source_id in entities:
            if self.upsert_node(conn, node_id, name, subtype, properties, source_table, source_id):
                success += 1
            else:
                errors += 1

        return success, errors

    def run(self) -> Dict[str, int]:
        """
        Run the extraction process.

        Streams entities from extract_entities(), upserting in batches of
        BATCH_SIZE with a commit after each batch. In dry-run mode nothing is
        written and the first few candidate IDs are logged.

        Returns:
            Statistics dictionary
        """
        logger.info(f"Starting {self.node_type} extraction...")
        logger.info(f"Mode: {'DRY RUN' if self.dry_run else 'EXECUTE'}")

        # NOTE(review): sqlite3.connect() creates the file if missing, so even
        # a dry run may create an empty target DB — confirm this is acceptable.
        conn = self.connect_target()
        batch: List[Tuple] = []

        try:
            for entity in self.extract_entities():
                self.stats["total_extracted"] += 1

                if self.dry_run:
                    self.stats["skipped"] += 1
                    # Show only the first 5 candidates to keep output readable.
                    if self.stats["total_extracted"] <= 5:
                        logger.info(f"[DRY RUN] Would upsert: {entity[0]}")
                    continue

                batch.append(entity)

                if len(batch) >= self.BATCH_SIZE:
                    success, errors = self.batch_upsert(conn, batch)
                    conn.commit()
                    self.stats["inserted"] += success
                    self.stats["errors"] += errors
                    batch = []

                if self.stats["total_extracted"] % 1000 == 0:
                    logger.info(f"Progress: {self.stats['total_extracted']} entities processed")

            # Process remaining partial batch
            if batch and not self.dry_run:
                success, errors = self.batch_upsert(conn, batch)
                conn.commit()
                self.stats["inserted"] += success
                self.stats["errors"] += errors

            logger.info(f"Extraction complete: {self.stats}")
            return self.stats

        finally:
            self.close()

    def get_node_count(self) -> int:
        """Get current count of nodes of this type in kg_nodes (0 on DB error)."""
        try:
            conn = self.connect_target()
            cursor = conn.execute(
                "SELECT COUNT(*) FROM kg_nodes WHERE node_type = ?",
                (self.node_type,)
            )
            return cursor.fetchone()[0]
        except sqlite3.Error:
            return 0

class SQLiteSourceExtractor(BaseExtractor):
    """
    Base class for extractors that read from a SQLite source database.

    Adds a lazily opened, cached read connection to a source DB alongside
    the target connection managed by BaseExtractor.
    """

    def __init__(
        self,
        source_db_path: Path,
        target_db_path: Path,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        project_id: Optional[str] = None,
    ):
        """
        Initialize the extractor.

        Args:
            source_db_path: Path to the SQLite database entities are read from
            target_db_path: Path to org.db containing kg_nodes
            dry_run: If True, don't write to database
            tenant_id: Optional tenant ID for multi-tenant isolation
            project_id: Optional project ID
        """
        super().__init__(target_db_path, dry_run, tenant_id, project_id)
        self.source_db_path = source_db_path
        self._source_conn: Optional[sqlite3.Connection] = None

    def connect_source(self) -> sqlite3.Connection:
        """Get or create connection to source database (lazily opened, cached)."""
        if self._source_conn is None:
            self._source_conn = sqlite3.connect(str(self.source_db_path))
            self._source_conn.row_factory = sqlite3.Row
        return self._source_conn

    def close(self):
        """Close all database connections (target via super, then source)."""
        super().close()
        if self._source_conn:
            self._source_conn.close()
            self._source_conn = None