# scripts-produces-edge-builder
#!/usr/bin/env python3
"""
CP-18: PRODUCES Edge Builder (ADR-151)

Creates PRODUCES edges from Session nodes to Decision nodes.

Edge: session:X -> decision:Y
Source: org.db decisions table (via message_id -> messages.session_id)
Properties: timestamp, decision_type

Establishes lineage from sessions to the decisions made during them.

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.5.2
"""
import logging
import sqlite3
from pathlib import Path
from typing import Any, Dict, Generator, Optional, Tuple

from .base_edge_builder import SQLiteSourceEdgeBuilder
# Module-level logger named after this module so records can be filtered
# per module; `__name__` replaces the undefined bare `name`.
logger = logging.getLogger(__name__)
class ProducesEdgeBuilder(SQLiteSourceEdgeBuilder):
    """
    Build PRODUCES edges linking sessions to decisions.

    A decision is linked to a session by resolving decisions.message_id
    (read from the source database) through messages.id -> messages.session_id
    (read from sessions.db). Because the two tables live in different
    databases, the message -> session mapping is loaded into memory once and
    the join is performed in Python.

    Only the message_id path is implemented here: decisions with no
    message_id, or whose message_id cannot be resolved to a non-empty
    session_id, produce no edge. A session_id stored directly in decision
    properties is not consulted.
    """

    def __init__(
        self,
        org_db_path: Path,
        sessions_db_path: Path,
        target_db_path: Path,
        dry_run: bool = False,
        tenant_id: Optional[str] = None,
        validate_nodes: bool = True,
    ):
        """
        Initialize with paths to both org.db and sessions.db.

        Args:
            org_db_path: Path to org.db containing decisions
            sessions_db_path: Path to sessions.db containing messages
            target_db_path: Path to org.db for kg_edges (same as org_db_path)
            dry_run: If True, don't write to database
            tenant_id: Optional tenant ID
            validate_nodes: If True, verify nodes exist
        """
        super().__init__(org_db_path, target_db_path, dry_run, tenant_id, validate_nodes)
        self.sessions_db_path = sessions_db_path
        # Opened lazily by connect_sessions(); released by close().
        self._sessions_conn: Optional[sqlite3.Connection] = None
        # messages.id -> messages.session_id; None means "not built yet".
        self._message_to_session: Optional[Dict[int, str]] = None

    @property
    def edge_type(self) -> str:
        """Edge type label for the edges this builder emits."""
        return "PRODUCES"

    def connect_sessions(self) -> sqlite3.Connection:
        """
        Return the connection to sessions.db, opening it on first use.

        The connection uses sqlite3.Row as its row factory so columns can be
        read by name, and is cached on the instance until close() is called.
        """
        if self._sessions_conn is None:
            conn = sqlite3.connect(str(self.sessions_db_path))
            conn.row_factory = sqlite3.Row
            self._sessions_conn = conn
        return self._sessions_conn

    def _build_message_session_map(self) -> None:
        """
        Build the in-memory mapping from message id to session id.

        Runs at most once per instance. Any failure (for example sessions.db
        lacking a messages table) is logged as a warning and leaves an empty
        mapping, so extract_edges() then yields nothing instead of raising.
        """
        if self._message_to_session is not None:
            return

        self._message_to_session = {}
        try:
            sessions_conn = self.connect_sessions()
            cursor = sessions_conn.execute("""
                SELECT id, session_id
                FROM messages
                WHERE session_id IS NOT NULL AND session_id != ''
            """)
            for row in cursor:
                self._message_to_session[row['id']] = row['session_id']
            logger.info(
                "Built message->session map with %s entries",
                len(self._message_to_session),
            )
        except Exception as e:
            # Best effort: drop any partially built mapping rather than
            # producing edges from incomplete data.
            logger.warning("Could not build message-session map: %s", e)
            self._message_to_session = {}

    def close(self):
        """
        Close all database connections.

        The sessions.db connection is released even if the parent class's
        close() raises, so the handle is never leaked.
        """
        try:
            super().close()
        finally:
            # Clear the cached handle first so a failing close() cannot leave
            # a stale connection object behind.
            conn, self._sessions_conn = self._sessions_conn, None
            if conn is not None:
                conn.close()

    def extract_edges(self) -> Generator[Tuple[str, str, Dict[str, Any]], None, None]:
        """
        Extract PRODUCES edges from the decisions table.

        Each decision row is linked to its originating session via
        message_id. Rows with no message_id, or whose message_id does not
        resolve to a session, are skipped. Errors raised while querying or
        iterating are logged and end the generator early.

        Yields:
            Tuple of (from_node_id, to_node_id, properties), where
            from_node_id is "session:<session_id>", to_node_id is
            "decision:<decision_id>", and properties holds the non-None
            values of decision_type, created_at and message_id.
        """
        self._build_message_session_map()
        message_to_session = self._message_to_session or {}

        source_conn = self.connect_source()

        try:
            cursor = source_conn.execute("""
                SELECT
                    id,
                    message_id,
                    decision_type,
                    created_at
                FROM decisions
                ORDER BY id
            """)

            for row in cursor:
                message_id = row['message_id']
                # Skip if no message_id link
                if message_id is None:
                    continue

                # Resolve message_id to session_id
                session_id = message_to_session.get(message_id)
                if not session_id:
                    continue

                candidate_properties = {
                    'decision_type': row['decision_type'],
                    'created_at': row['created_at'],
                    'message_id': message_id,
                }
                # Omit properties with no value instead of storing None.
                properties = {
                    key: value
                    for key, value in candidate_properties.items()
                    if value is not None
                }

                yield (f"session:{session_id}", f"decision:{row['id']}", properties)

        except Exception as e:
            logger.error("Error extracting PRODUCES edges: %s", e)