Skip to main content

scripts-decision-extractor

#!/usr/bin/env python3
"""
CP-13: Decision Node Extractor (ADR-151)

Migrates decision entities from the org.db decisions table:

- node_type: 'decision'
- Properties: decision_text, decision_type, rationale, confidence,
  message_id, project_path, created_at, plus alternatives, tags and
  tenant/project/user ids when present

Source: org.db decisions table
Target: org.db kg_nodes table

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.4.3
"""

import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, Generator, Optional, Tuple

from .base_extractor import SQLiteSourceExtractor

# Module-level logger named after this module (``__name__``), so log records
# can be filtered per module. The bare ``name`` previously used here is not
# defined anywhere and raised NameError at import time.
logger = logging.getLogger(__name__)

class DecisionExtractor(SQLiteSourceExtractor):
    """
    Migrate decisions from org.db decisions table into kg_nodes.

    This class only reads the ``decisions`` table and yields one entity tuple
    per row; persisting the tuples is presumably done by the base class
    (not visible here -- confirm in ``SQLiteSourceExtractor``).
    """

    # Keyword -> canonical subtype. Dict order is the match priority: when
    # several keywords match a decision type, the earliest entry wins.
    _TYPE_MAP = {
        "architecture": "architecture",
        "arch": "architecture",
        "api": "api",
        "database": "database",
        "db": "database",
        "security": "security",
        "ui": "ui",
        "frontend": "ui",
        "backend": "backend",
        "deployment": "deployment",
        "testing": "testing",
        "performance": "performance",
        "design": "design",
        "process": "process",
        "tool": "tool",
        "library": "tool",
        "general": "general",
    }

    # Splits a lower-cased decision type into alphanumeric words.
    _TOKEN_SPLIT = re.compile(r"[^a-z0-9]+")

    # Longest decision text shown in a display name (including the ellipsis).
    _MAX_NAME_LEN = 80

    @property
    def node_type(self) -> str:
        """Return the node type label for entities produced by this extractor."""
        return "decision"

    def extract_entities(
        self,
    ) -> Generator[
        Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]],
        None,
        None,
    ]:
        """
        Extract decisions from decisions table.

        Rows are read in ``created_at`` order. For each row the properties
        dict carries the decision text, type, rationale, confidence, message
        id, project path and creation time, plus parsed ``alternatives`` /
        ``tags`` and any non-empty tenant/project/user/team ids. Entries
        whose value is ``None`` are dropped before yielding.

        Yields:
            Tuple of (node_id, name, subtype, properties, source_table, source_id)
        """
        # NOTE(review): the connection is not closed here; its lifetime is
        # presumably managed by the base class -- verify.
        conn = self.connect_source()

        cursor = conn.execute("""
            SELECT
                id,
                message_id,
                project_path,
                decision_type,
                decision,
                rationale,
                alternatives_considered,
                confidence,
                tags,
                created_at,
                tenant_id,
                user_id,
                team_id,
                project_id
            FROM decisions
            ORDER BY created_at
        """)

        # Rows are accessed by column name, which assumes connect_source()
        # configures a name-addressable row type (e.g. sqlite3.Row) -- confirm.
        for row in cursor:
            decision_id = row['id']
            decision_text = row['decision'] or ""
            decision_type = row['decision_type'] or "general"

            properties = {
                "decision_text": decision_text,
                "decision_type": decision_type,
                "rationale": row['rationale'],
                "confidence": row['confidence'],
                "message_id": row['message_id'],
                "project_path": row['project_path'],
                "created_at": row['created_at'],
            }

            # JSON-encoded columns: store the parsed value, or the raw text
            # under "<key>_raw" when it cannot be parsed.
            self._store_json_field(
                properties, "alternatives", row['alternatives_considered']
            )
            self._store_json_field(properties, "tags", row['tags'])

            # Scope identifiers are only recorded when non-empty. team_id is
            # selected by the query and is kept alongside its siblings.
            for scope_key in ("tenant_id", "project_id", "user_id", "team_id"):
                scope_value = row[scope_key]
                if scope_value:
                    properties[scope_key] = scope_value

            # Clean None values
            properties = {k: v for k, v in properties.items() if v is not None}

            yield (
                self.generate_node_id(str(decision_id)),
                self._generate_name(decision_text, decision_type),
                self._normalize_decision_type(decision_type),
                properties,
                "decisions",
                str(decision_id),
            )

    @staticmethod
    def _store_json_field(properties: Dict[str, Any], key: str, raw: Any) -> None:
        """
        Parse ``raw`` as JSON into ``properties[key]``.

        Empty/None input is ignored. If parsing fails, the original value is
        preserved under ``properties[key + "_raw"]`` so no data is lost.
        """
        if not raw:
            return
        try:
            properties[key] = json.loads(raw)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError and undecodable bytes;
            # TypeError covers values that are not str/bytes at all.
            properties[f"{key}_raw"] = raw

    def _generate_name(self, decision_text: str, decision_type: str) -> str:
        """
        Generate display name from decision text.

        All whitespace runs (including ``\\r\\n`` and tabs) are collapsed to
        single spaces, the text is truncated to 80 characters with a trailing
        "..." when longer, and the capitalized normalized type is prefixed
        in square brackets.

        Args:
            decision_text: Full decision text; may be empty.
            decision_type: Raw decision type used for the prefix.

        Returns:
            A string of the form ``"[Type] text"``.
        """
        text = " ".join((decision_text or "").split())

        max_len = self._MAX_NAME_LEN
        if len(text) > max_len:
            text = text[:max_len - 3] + "..."

        type_prefix = self._normalize_decision_type(decision_type).capitalize()
        return f"[{type_prefix}] {text}"

    def _normalize_decision_type(self, decision_type: str) -> str:
        """
        Normalize decision type for consistent subtype values.

        The type is lower-cased and split into alphanumeric words; a keyword
        from ``_TYPE_MAP`` matches when some word *starts with* it. Matching
        on word prefixes (rather than anywhere in the string) avoids false
        positives such as "build" -> ui, "research" -> architecture or
        "feedback" -> database, while still accepting variants like
        "architectural", "apis" or "db_schema".

        Args:
            decision_type: Raw decision type; may be empty or None.

        Returns:
            The canonical category, or "general" when nothing matches.
        """
        if not decision_type:
            return "general"

        words = [
            word
            for word in self._TOKEN_SPLIT.split(str(decision_type).lower())
            if word
        ]

        for keyword, category in self._TYPE_MAP.items():
            if any(word.startswith(keyword) for word in words):
                return category

        return "general"