Skip to main content

scripts-session-extractor

#!/usr/bin/env python3
"""
CP-12: Session Node Extractor (ADR-151)

Extracts session entities from messages table in sessions.db:

  • node_type: 'session'
  • Properties: session_id, message_count, start_time, end_time, llm_provider

Derives session nodes from distinct session_id values in messages.

Source: sessions.db messages table
Target: org.db kg_nodes table

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.4.2
"""

import json
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Generator, Optional, Tuple

from .base_extractor import SQLiteSourceExtractor

# Module-level logger named after this module, so log records can be filtered
# per extractor. (The bare `name` previously used here is undefined and raised
# NameError at import time.)
logger = logging.getLogger(__name__)

class SessionExtractor(SQLiteSourceExtractor):
    """Extract session entities from the messages table into kg_nodes.

    Sessions are derived from the distinct, non-empty ``session_id`` values
    found in ``messages``; each one is yielded as a node of type ``session``.
    """

    @property
    def node_type(self) -> str:
        """Node type emitted by this extractor (always ``"session"``)."""
        return "session"

    def extract_entities(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
        """Extract sessions from the messages table.

        Runs one aggregate query grouped by ``session_id`` and yields one
        entity per session, ordered by the session's earliest timestamp.
        If the ``messages`` table does not exist, a warning is logged and
        nothing is yielded.

        Yields:
            Tuple of (node_id, name, subtype, properties, source_table,
            source_id). ``subtype`` is the detected LLM provider,
            ``source_table`` is always ``"messages"`` and ``source_id`` is
            the session id. ``properties`` holds session_id, message_count,
            start_time, end_time, per-role message counts and, when both
            timestamps parse as ISO-8601, duration_seconds and
            duration_minutes; keys whose value is None are omitted.
        """
        conn = self.connect_source()

        # Only the table's existence is verified here; the query below still
        # requires the session_id, timestamp and role columns.
        if not self._session_table_exists(conn, "messages"):
            logger.warning("messages table not found in sessions.db")
            return

        # One row per session with aggregated stats. The per-role counts use
        # SUM(CASE ...): COUNT(DISTINCT CASE ... THEN 1 END) would count the
        # distinct values of the constant 1 and could never exceed 1.
        cursor = conn.execute("""
            SELECT
                session_id,
                COUNT(*) AS message_count,
                MIN(timestamp) AS start_time,
                MAX(timestamp) AS end_time,
                SUM(CASE WHEN role = 'user' THEN 1 ELSE 0 END) AS user_message_count,
                SUM(CASE WHEN role = 'assistant' THEN 1 ELSE 0 END) AS assistant_message_count
            FROM messages
            WHERE session_id IS NOT NULL AND session_id != ''
            GROUP BY session_id
            ORDER BY MIN(timestamp)
        """)

        for row in cursor:
            session_id = row['session_id']
            start_time = row['start_time']
            end_time = row['end_time']

            node_id = self.generate_node_id(session_id)

            # Display name: short id prefix plus the leading YYYY-MM-DD part
            # of the start timestamp. Non-string timestamps cannot be sliced,
            # so they fall back to the undated form instead of raising.
            if isinstance(start_time, str) and start_time:
                date_part = start_time[:10] if len(start_time) >= 10 else "unknown"
                name = f"Session {session_id[:8]}... ({date_part})"
            else:
                name = f"Session {session_id[:8]}..."

            # The provider doubles as the node subtype.
            subtype = self._detect_llm_provider(session_id, conn)

            properties = {
                "session_id": session_id,
                "message_count": row['message_count'],
                "start_time": start_time,
                "end_time": end_time,
                "user_message_count": row['user_message_count'],
                "assistant_message_count": row['assistant_message_count'],
            }

            duration_seconds = self._session_duration_seconds(start_time, end_time)
            if duration_seconds is not None:
                properties['duration_seconds'] = duration_seconds
                properties['duration_minutes'] = round(duration_seconds / 60, 1)

            # Drop None values so absent data is simply missing from the node.
            properties = {k: v for k, v in properties.items() if v is not None}

            yield (
                node_id,
                name,
                subtype,
                properties,
                "messages",
                session_id,
            )

    def _detect_llm_provider(self, session_id: str, conn) -> str:
        """Detect the LLM provider for a session from metadata tables.

        Lookup order:
          1. ``session_metadata.llm_provider`` for the given session_id.
          2. The ``llm_provider`` key of the JSON ``metadata`` column in
             ``agent_sessions`` (row matched on ``id``).

        Both lookups are best-effort: a missing table, a database error or
        malformed metadata falls through to the next step.

        Args:
            session_id: Session identifier to look up.
            conn: Open connection to the source database.

        Returns:
            The lower-cased provider name when a non-empty string is found,
            otherwise the default ``"claude"``.
        """
        try:
            if self._session_table_exists(conn, "session_metadata"):
                row = conn.execute(
                    "SELECT llm_provider FROM session_metadata WHERE session_id = ?",
                    (session_id,)
                ).fetchone()
                if row and isinstance(row[0], str) and row[0]:
                    return row[0].lower()
        except sqlite3.Error as exc:
            logger.debug(
                "session_metadata lookup failed for session %s: %s", session_id, exc
            )

        try:
            # Check for the table first so a database without agent_sessions
            # does not raise (and swallow) an error once per session.
            if self._session_table_exists(conn, "agent_sessions"):
                row = conn.execute(
                    "SELECT metadata FROM agent_sessions WHERE id = ? LIMIT 1",
                    (session_id,)
                ).fetchone()
                provider = self._provider_from_metadata(row[0]) if row else None
                if provider:
                    return provider
        except sqlite3.Error as exc:
            logger.debug(
                "agent_sessions lookup failed for session %s: %s", session_id, exc
            )

        # Default to claude (most common)
        return "claude"

    @staticmethod
    def _session_table_exists(conn, table_name: str) -> bool:
        """Return True if a table named ``table_name`` is listed in sqlite_master."""
        cursor = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
            (table_name,)
        )
        return cursor.fetchone() is not None

    @staticmethod
    def _session_duration_seconds(start_time, end_time) -> Optional[float]:
        """Return seconds between two ISO-8601 timestamps, or None if unparseable."""
        if not (start_time and end_time):
            return None
        try:
            # fromisoformat() on older Python versions rejects a trailing 'Z',
            # so it is rewritten as an explicit UTC offset first.
            start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
            end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
            return (end_dt - start_dt).total_seconds()
        except (ValueError, TypeError, AttributeError):
            # ValueError: not ISO-8601; TypeError: naive/aware mix;
            # AttributeError: timestamp is not a string.
            return None

    @staticmethod
    def _provider_from_metadata(raw_metadata) -> Optional[str]:
        """Return the lower-cased ``llm_provider`` from a JSON metadata blob, or None."""
        if not raw_metadata:
            return None
        try:
            meta = json.loads(raw_metadata)
        except (ValueError, TypeError):
            # json.JSONDecodeError is a ValueError; TypeError covers non-text input.
            return None
        if not isinstance(meta, dict):
            return None
        provider = meta.get('llm_provider')
        if isinstance(provider, str) and provider:
            return provider.lower()
        return None