Skip to main content

scripts-created-by-edge-builder

#!/usr/bin/env python3
"""
CP-26: CREATED_BY Edge Builder (ADR-151)

Creates CREATED_BY edges from entities to their originating Session nodes.

Edge:
    decision:X -> session:Y
    error_solution:X -> session:Y
    skill_learning:X -> session:Y
Source: session_id fields in org.db tables
Properties: created_at

Establishes provenance by linking knowledge entities back to the sessions that created them.

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.5.11
"""

import json
import logging
import sqlite3
from pathlib import Path
from typing import Any, Dict, Generator, Optional, Set, Tuple

from .base_edge_builder import SQLiteSourceEdgeBuilder

# Module-level logger named after this module (``__name__``), so log records
# can be filtered per module through the standard logging hierarchy.
logger = logging.getLogger(__name__)

class CreatedByEdgeBuilder(SQLiteSourceEdgeBuilder):
    """
    Build CREATED_BY edges linking entities to their originating sessions.

    Two source tables are queried:

    * ``decisions`` - resolved indirectly: ``decisions.message_id`` is mapped
      to a session through the ``messages`` table of sessions.db.
    * ``skill_learnings`` - carries a ``session_id`` column directly.

    ``error_solutions`` are not extracted by this builder; no extraction
    path is implemented for them here.

    An edge is only emitted when its target ``session:<id>`` node is already
    present in ``kg_nodes`` of the target database.
    """

    def __init__(
        self,
        org_db_path: Path,
        sessions_db_path: Path,
        target_db_path: Path,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        validate_nodes: bool = True,
    ):
        """
        Initialize edge builder.

        Args:
            org_db_path: Path to org.db containing decisions, etc.
            sessions_db_path: Path to sessions.db containing messages
            target_db_path: Path to org.db for kg_edges
            dry_run: If True, don't write to database
            tenant_id: Optional tenant ID
            validate_nodes: If True, verify nodes exist
        """
        super().__init__(org_db_path, target_db_path, dry_run, tenant_id, validate_nodes)
        self.sessions_db_path = sessions_db_path
        # Opened lazily by connect_sessions(); released in close().
        self._sessions_conn: Optional[sqlite3.Connection] = None
        # messages.id -> messages.session_id; None means "not built yet".
        self._message_to_session: Optional[Dict[int, str]] = None

    @property
    def edge_type(self) -> str:
        """Edge type label produced by this builder."""
        return "CREATED_BY"

    def connect_sessions(self) -> sqlite3.Connection:
        """
        Get connection to sessions.db.

        The connection is created on first use, configured with
        ``sqlite3.Row`` as row factory, and reused on later calls.

        Returns:
            The cached connection to ``sessions_db_path``.
        """
        if self._sessions_conn is None:
            conn = sqlite3.connect(str(self.sessions_db_path))
            conn.row_factory = sqlite3.Row
            self._sessions_conn = conn
        return self._sessions_conn

    def _build_message_session_map(self) -> None:
        """
        Build mapping from message_id to session_id.

        Reads every row of ``messages`` that has a non-empty ``session_id``.
        The result is cached on the instance; later calls are no-ops. Any
        failure is logged as a warning and leaves an empty map, so decision
        edges are simply skipped instead of aborting the build.
        """
        if self._message_to_session is not None:
            return

        mapping: Dict[int, str] = {}
        try:
            # sqlite3.connect() creates an empty database file when the path
            # does not exist. This builder only reads sessions.db, so refuse
            # to connect rather than leave a stray file behind. An already
            # open connection is used as-is.
            db_path = Path(self.sessions_db_path)
            if self._sessions_conn is None and not db_path.exists():
                raise FileNotFoundError(f"sessions database not found: {db_path}")

            cursor = self.connect_sessions().execute("""
                SELECT id, session_id
                FROM messages
                WHERE session_id IS NOT NULL AND session_id != ''
            """)
            for row in cursor:
                mapping[row['id']] = row['session_id']
        except Exception as e:
            # Best effort: a partial map is discarded, not used.
            logger.warning("Could not build message-session map: %s", e)
            self._message_to_session = {}
            return

        self._message_to_session = mapping
        logger.info("Built message->session map with %d entries", len(mapping))

    def close(self) -> None:
        """Close all database connections."""
        try:
            super().close()
        finally:
            # Release the sessions connection even if the base class fails.
            if self._sessions_conn is not None:
                self._sessions_conn.close()
                self._sessions_conn = None

    def _get_existing_sessions(self) -> Set[str]:
        """
        Get set of session node IDs that exist in kg_nodes.

        Returns:
            IDs of all ``kg_nodes`` rows with ``node_type = 'session'``, or
            an empty set if the query fails (the failure is logged).
        """
        conn = self.connect_target()
        try:
            cursor = conn.execute("""
                SELECT id FROM kg_nodes WHERE node_type = 'session'
            """)
            return {row[0] for row in cursor}
        except Exception as e:
            # An empty set suppresses every edge, so make the cause visible.
            logger.warning("Could not load session nodes from kg_nodes: %s", e)
            return set()

    @staticmethod
    def _edge_properties(created_at: Any, via: str) -> Dict[str, Any]:
        """Assemble edge properties, dropping entries whose value is None."""
        candidates = {
            'created_at': created_at,
            'via': via,
        }
        return {k: v for k, v in candidates.items() if v is not None}

    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract CREATED_BY edges from org.db tables.

        Links decisions and skill_learnings to sessions. Decision edges are
        yielded first, then skill_learning edges.

        Yields:
            Tuple of (from_node_id, to_node_id, properties)
        """
        self._build_message_session_map()
        existing_sessions = self._get_existing_sessions()

        logger.info("Found %d session nodes", len(existing_sessions))

        # Extract from decisions (via message_id)
        yield from self._extract_decision_edges(existing_sessions)

        # Extract from skill_learnings (direct session_id)
        yield from self._extract_skill_learning_edges(existing_sessions)

        # Note: error_solutions typically don't have direct session links
        # but we can try to link via context if available

    def _extract_decision_edges(
        self,
        existing_sessions: Set[str],
    ) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract CREATED_BY edges for decisions.

        Args:
            existing_sessions: Session node IDs known to exist in kg_nodes.

        Yields:
            ``("decision:<id>", "session:<session_id>", properties)`` for
            every decision whose message resolves to an existing session
            node. Errors are logged and end the iteration.
        """
        # Ensure the lookup table exists even when this method is called
        # without going through extract_edges(); no-op if already built.
        self._build_message_session_map()
        message_to_session = self._message_to_session or {}

        source_conn = self.connect_source()

        try:
            cursor = source_conn.execute("""
                SELECT
                    id,
                    message_id,
                    created_at
                FROM decisions
                WHERE message_id IS NOT NULL
                ORDER BY id
            """)

            for row in cursor:
                # Resolve message_id to session_id
                # NOTE(review): assumes decisions.message_id uses the same
                # type as messages.id - confirm against the schemas.
                session_id = message_to_session.get(row['message_id'])
                if not session_id:
                    continue

                session_node = f"session:{session_id}"
                if session_node not in existing_sessions:
                    continue

                yield (
                    f"decision:{row['id']}",
                    session_node,
                    self._edge_properties(row['created_at'], 'message_id'),
                )

        except Exception as e:
            logger.error("Error extracting decision CREATED_BY edges: %s", e)

    def _extract_skill_learning_edges(
        self,
        existing_sessions: Set[str],
    ) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract CREATED_BY edges for skill_learnings.

        Args:
            existing_sessions: Session node IDs known to exist in kg_nodes.

        Yields:
            ``("skill_learning:<id>", "session:<session_id>", properties)``
            for every skill_learning with a non-empty ``session_id`` that
            points at an existing session node. ``analyzed_at`` is stored as
            the edge's ``created_at``. Errors are logged and end the
            iteration.
        """
        source_conn = self.connect_source()

        try:
            cursor = source_conn.execute("""
                SELECT
                    id,
                    session_id,
                    analyzed_at
                FROM skill_learnings
                WHERE session_id IS NOT NULL AND session_id != ''
                ORDER BY id
            """)

            for row in cursor:
                session_node = f"session:{row['session_id']}"
                if session_node not in existing_sessions:
                    continue

                yield (
                    f"skill_learning:{row['id']}",
                    session_node,
                    self._edge_properties(row['analyzed_at'], 'direct_session_id'),
                )

        except Exception as e:
            logger.error("Error extracting skill_learning CREATED_BY edges: %s", e)