
""" J.4.7.5: Cross-Table Correlation Query Module (ADR-149 Phase 1)

Provides cross-table correlation queries for /cxq command.

Usage: /cxq --correlate "messages,token_economics" --by session_id /cxq --correlate "decisions,messages" --by message_id /cxq --correlate "tool_analytics,token_economics" --by session_id,task_id

Supported Tables: sessions.db (Tier 3): - messages: Session messages - token_economics: Token usage and costs - tool_analytics: Tool execution metrics - message_component_invocations: Component usage per message

org.db (Tier 2):
- decisions: Architectural decisions
- skill_learnings: Skill effectiveness records
- error_solutions: Error-solution pairs

ADR References: - ADR-149: Query Language Evolution Strategy (Phase 1) - ADR-118: Four-Tier Database Architecture

Created: 2026-02-04 Author: Claude (Opus 4.5) Track: J (Memory Intelligence) Task: J.4.7.5 """

import sqlite3
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# Valid tables and their databases.
# Maps each queryable table to the database file it lives in:
# "sessions" -> sessions.db (Tier 3), "org" -> org.db (Tier 2).
TABLE_DATABASE_MAP = {
    # sessions.db (Tier 3)
    "messages": "sessions",
    "token_economics": "sessions",
    "tool_analytics": "sessions",
    "message_component_invocations": "sessions",
    "activity_feed": "sessions",
    "session_insights": "sessions",
    # org.db (Tier 2)
    "decisions": "org",
    "skill_learnings": "org",
    "error_solutions": "org",
}

# Valid join keys per table.
# A correlation's --by key(s) must be present in EVERY participating
# table's set (enforced by CorrelationQuery.validate).
TABLE_JOIN_KEYS = {
    "messages": {"session_id", "id"},
    "token_economics": {"session_id", "task_id", "entry_id"},
    "tool_analytics": {"session_id", "task_id", "entry_id"},
    "message_component_invocations": {"message_id", "session_id"},
    "activity_feed": {"session_id"},
    "session_insights": {"session_id"},
    "decisions": {"message_id", "id"},
    "skill_learnings": {"session_id", "id"},
    "error_solutions": {"id"},
}

# Column subsets for display (most useful columns).
# Tables absent from this map (activity_feed, session_insights) fall
# back to SELECT * in the query builders below.
TABLE_DISPLAY_COLUMNS = {
    "messages": ["id", "session_id", "role", "content", "timestamp"],
    "token_economics": ["id", "session_id", "model_name", "token_input", "token_output", "cost_total_usd", "task_id"],
    "tool_analytics": ["id", "session_id", "tool_name", "status", "execution_time_ms", "task_id"],
    "message_component_invocations": ["id", "message_id", "session_id", "component_name", "component_type"],
    "decisions": ["id", "message_id", "decision_type", "decision", "confidence", "created_at"],
    "skill_learnings": ["id", "session_id", "skill_name", "outcome", "effectiveness_score"],
    "error_solutions": ["id", "error_type", "error_message", "solution"],
}

# Predefined correlation shortcuts.
# Each entry expands to a tables list plus a default join key; the
# shortcut name can be passed to parse_correlation in place of a
# comma-separated table list.
CORRELATION_SHORTCUTS = {
    "cost-by-session": {
        "tables": ["messages", "token_economics"],
        "join_key": "session_id",
        "description": "Message content with associated token costs",
    },
    "tool-performance": {
        "tables": ["tool_analytics", "token_economics"],
        "join_key": "session_id",
        "description": "Tool execution metrics with token costs",
    },
    "decisions-context": {
        "tables": ["decisions", "messages"],
        "join_key": "message_id",
        "description": "Decisions with their source message context",
    },
    "skill-sessions": {
        "tables": ["skill_learnings", "messages"],
        "join_key": "session_id",
        "description": "Skill learnings with session messages",
    },
}

@dataclass
class CorrelationQuery:
    """Parsed correlation query specification.

    Holds the tables and join keys requested by a /cxq --correlate
    invocation, plus the raw specification string for diagnostics.
    """

    # Tables to correlate (2-4 entries; names checked against TABLE_DATABASE_MAP).
    tables: List[str]
    # Join key column names that must exist in every table.
    join_keys: List[str]
    # Raw --correlate argument as supplied by the user.
    raw: str
    # Name of the predefined shortcut used, if any.
    shortcut: Optional[str] = None

    def validate(self) -> Tuple[bool, Optional[str]]:
        """Validate the correlation query.

        Returns:
            Tuple of (is_valid, error_message); error_message is None
            when the query is valid.
        """
        # Check table count
        if len(self.tables) < 2:
            return False, "Correlation requires at least 2 tables"
        if len(self.tables) > 4:
            return False, "Correlation supports maximum 4 tables"

        # Check tables exist
        for table in self.tables:
            if table not in TABLE_DATABASE_MAP:
                valid_tables = ", ".join(sorted(TABLE_DATABASE_MAP.keys()))
                return False, f"Unknown table '{table}'. Valid tables: {valid_tables}"

        # Check join keys are valid for all tables
        for join_key in self.join_keys:
            for table in self.tables:
                if join_key not in TABLE_JOIN_KEYS.get(table, set()):
                    valid_keys = ", ".join(sorted(TABLE_JOIN_KEYS.get(table, set())))
                    return False, f"Join key '{join_key}' not valid for table '{table}'. Valid keys: {valid_keys}"

        # Check all tables can be joined (have common database or cross-db support)
        databases = {TABLE_DATABASE_MAP[t] for t in self.tables}
        if len(databases) > 1:
            # Cross-database correlation - only allowed for specific combinations
            cross_db_allowed = self._check_cross_db_allowed()
            if not cross_db_allowed:
                return False, f"Cross-database correlation between {databases} requires session_id join key"

        return True, None

    def _check_cross_db_allowed(self) -> bool:
        """Check if cross-database correlation is allowed."""
        # Cross-database joins are allowed if using session_id
        # (since all tables have tenant isolation)
        return "session_id" in self.join_keys

    def get_databases(self) -> Set[str]:
        """Get the set of databases involved in this correlation."""
        return {TABLE_DATABASE_MAP[t] for t in self.tables}

@dataclass
class CorrelationResult:
    """Result of a correlation query."""

    # Tables that participated in the correlation.
    tables: List[str]
    # Join keys that were used.
    join_keys: List[str]
    # Joined rows; keys are "<table>__<column>".
    rows: List[Dict[str, Any]]
    # Convenience count of rows (len(rows)).
    row_count: int
    # Ordered output column names ("<table>__<column>").
    columns: List[str]
    # SQL that was executed, or a placeholder for Python-side joins.
    query_sql: str
    # Wall-time of execution; filled in by execute_correlation.
    execution_time_ms: float = 0.0

def parse_correlation(tables_spec: str, join_key_spec: str) -> CorrelationQuery:
    """Parse correlation specification.

    Args:
        tables_spec: Comma-separated table names or shortcut name
        join_key_spec: Comma-separated join key names

    Returns:
        CorrelationQuery object

    Raises:
        ValueError: If specification is invalid
    """
    # Check for shortcut
    if tables_spec in CORRELATION_SHORTCUTS:
        shortcut = CORRELATION_SHORTCUTS[tables_spec]
        if join_key_spec == "":
            # Fall back to the shortcut's default join key when --by is omitted.
            join_keys = [shortcut["join_key"]]
        else:
            # Normalize user-supplied overrides the same way as the
            # non-shortcut path (strip whitespace, lowercase).
            join_keys = [k.strip().lower() for k in join_key_spec.split(",") if k.strip()]
        query = CorrelationQuery(
            tables=shortcut["tables"],
            join_keys=join_keys,
            raw=tables_spec,
            shortcut=tables_spec,
        )
    else:
        # Parse tables
        tables = [t.strip().lower() for t in tables_spec.split(",") if t.strip()]
        if not tables:
            raise ValueError("No tables specified for correlation")

        # Parse join keys
        join_keys = [k.strip().lower() for k in join_key_spec.split(",") if k.strip()]
        if not join_keys:
            raise ValueError("No join key specified (use --by)")

        query = CorrelationQuery(
            tables=tables,
            join_keys=join_keys,
            raw=tables_spec,
        )

    # Validate both paths. (Previously the shortcut path returned
    # unvalidated, letting a bad --by override slip through to
    # execution time.)
    is_valid, error = query.validate()
    if not is_valid:
        raise ValueError(error)

    return query

def execute_correlation(
    query: CorrelationQuery,
    sessions_conn: sqlite3.Connection,
    org_conn: Optional[sqlite3.Connection] = None,
    limit: int = 20,
    where_clause: Optional[str] = None,
) -> CorrelationResult:
    """Execute a correlation query.

    Args:
        query: Parsed correlation query
        sessions_conn: Connection to sessions.db
        org_conn: Connection to org.db (optional, for cross-db queries)
        limit: Maximum rows to return
        where_clause: Optional WHERE clause to filter results

    Returns:
        CorrelationResult with joined data

    Raises:
        ValueError: If a required database connection is missing.
    """
    # perf_counter is monotonic, so the measured duration is immune to
    # wall-clock adjustments (unlike time.time()).
    start_time = time.perf_counter()

    databases = query.get_databases()

    if len(databases) == 1:
        # Single database: let SQLite perform the JOIN directly.
        db = next(iter(databases))
        conn = sessions_conn if db == "sessions" else org_conn
        if conn is None:
            raise ValueError(f"No connection provided for {db}.db")

        result = _execute_single_db_correlation(query, conn, limit, where_clause)
    else:
        # Cross-database: separate connections, so fall back to a
        # Python-side join.
        result = _execute_cross_db_correlation(query, sessions_conn, org_conn, limit, where_clause)

    result.execution_time_ms = (time.perf_counter() - start_time) * 1000
    return result

def _execute_single_db_correlation(
    query: CorrelationQuery,
    conn: sqlite3.Connection,
    limit: int,
    where_clause: Optional[str],
) -> CorrelationResult:
    """Execute correlation within a single database.

    Uses subquery optimization to limit base table first before JOIN.
    This is critical for performance with large tables (e.g., 32M+ rows).

    NOTE(review): table names are whitelisted by query.validate(), but
    where_clause is interpolated verbatim into the SQL -- it must never
    come from untrusted input.
    """
    # Build SELECT columns with table prefixes
    # Use "__" separator to avoid confusion with table names containing "_"
    select_parts = []
    all_columns = []
    for i, table in enumerate(query.tables):
        alias = f"t{i}"
        columns = TABLE_DISPLAY_COLUMNS.get(table, ["*"])
        for col in columns:
            select_parts.append(f"{alias}.{col} AS \"{table}__{col}\"")
            all_columns.append(f"{table}__{col}")

    select_clause = ", ".join(select_parts)

    # Build base table subquery with LIMIT to optimize JOIN
    # This is critical for performance with large tables
    base_table = query.tables[0]
    base_columns = TABLE_DISPLAY_COLUMNS.get(base_table, ["*"])
    base_col_list = ", ".join(base_columns)

    # Apply where_clause to base table if provided
    base_where = f"WHERE {where_clause}" if where_clause else ""

    # Use subquery to limit base table BEFORE joining
    # This prevents expensive full table scans on large tables
    # Fetch 5x limit so LEFT JOIN misses still leave enough rows.
    base_subquery = f"""
    (SELECT {base_col_list}
     FROM {base_table}
     {base_where}
     ORDER BY id DESC
     LIMIT {limit * 5}) t0"""

    # Build JOINs to other tables
    join_clauses = ""
    for i, table in enumerate(query.tables[1:], start=1):
        alias = f"t{i}"
        join_conditions = " AND ".join([
            f"t0.{key} = {alias}.{key}" for key in query.join_keys
        ])
        join_clauses += f"\nLEFT JOIN {table} {alias} ON {join_conditions}"

    # Build full query with optimized base subquery
    sql = f"""
    SELECT {select_clause}
    FROM {base_subquery}{join_clauses}
    ORDER BY t0.id DESC
    LIMIT {limit}
    """

    # Assumes conn.row_factory is sqlite3.Row so dict(row) works -- TODO confirm at call site.
    cursor = conn.execute(sql)
    rows = [dict(row) for row in cursor.fetchall()]

    return CorrelationResult(
        tables=query.tables,
        join_keys=query.join_keys,
        rows=rows,
        row_count=len(rows),
        columns=all_columns,
        query_sql=sql.strip(),
    )

def _execute_cross_db_correlation(
    query: CorrelationQuery,
    sessions_conn: sqlite3.Connection,
    org_conn: Optional[sqlite3.Connection],
    limit: int,
    where_clause: Optional[str],
) -> CorrelationResult:
    """Execute correlation across multiple databases using Python-side join.

    NOTE(review): where_clause is currently ignored on this path (parity
    gap with the single-db path) -- kept as-is to preserve behavior.
    """
    if org_conn is None:
        raise ValueError("Cross-database correlation requires org.db connection")

    # Fetch recent rows from each table out of its own database.
    # Over-fetch (10x limit) so the Python-side join has candidates.
    # Assumes row_factory is sqlite3.Row for dict(row) -- TODO confirm.
    all_data: Dict[str, List[Dict[str, Any]]] = {}
    for table in query.tables:
        conn = sessions_conn if TABLE_DATABASE_MAP[table] == "sessions" else org_conn
        columns = TABLE_DISPLAY_COLUMNS.get(table, ["*"])
        col_list = ", ".join(columns)
        sql = f"SELECT {col_list} FROM {table} ORDER BY id DESC LIMIT {limit * 10}"
        cursor = conn.execute(sql)
        all_data[table] = [dict(row) for row in cursor.fetchall()]

    def _key(row: Dict[str, Any]) -> Tuple[Any, ...]:
        # Composite join-key value for a row (missing keys become None).
        return tuple(row.get(k) for k in query.join_keys)

    # Pre-index secondary tables by join-key tuple, keeping the FIRST
    # row per key to preserve the original first-match semantics.
    # Replaces the previous O(base * other) nested scan with O(base + other).
    indexes: Dict[str, Dict[Tuple[Any, ...], Dict[str, Any]]] = {}
    for other_table in query.tables[1:]:
        index: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
        for row in all_data.get(other_table, []):
            index.setdefault(_key(row), row)
        indexes[other_table] = index

    base_table = query.tables[0]
    result_rows = []
    for base_row in all_data.get(base_table, []):
        # Use "__" separator to avoid confusion with table names containing "_"
        combined_row = {f"{base_table}__{k}": v for k, v in base_row.items()}

        for other_table in query.tables[1:]:
            match = indexes[other_table].get(_key(base_row))
            if match is not None:
                combined_row.update({f"{other_table}__{k}": v for k, v in match.items()})
            else:
                # LEFT JOIN behavior - include NULLs
                for col in TABLE_DISPLAY_COLUMNS.get(other_table, []):
                    combined_row[f"{other_table}__{col}"] = None

        result_rows.append(combined_row)
        if len(result_rows) >= limit:
            break

    # Build column list
    all_columns = []
    for table in query.tables:
        for col in TABLE_DISPLAY_COLUMNS.get(table, []):
            all_columns.append(f"{table}__{col}")

    return CorrelationResult(
        tables=query.tables,
        join_keys=query.join_keys,
        rows=result_rows,
        row_count=len(result_rows),
        columns=all_columns,
        query_sql="[Cross-database Python-side join]",
    )

def format_correlation_result(
    result: CorrelationResult,
    format_type: str = "text",
    max_content_length: int = 100,
) -> str:
    """Format correlation result for display.

    Args:
        result: Correlation query result
        format_type: Output format (text, json, markdown)
        max_content_length: Max length for content fields

    Returns:
        Formatted string
    """
    import json

    if format_type == "json":
        # default=str covers non-JSON-native values (e.g. datetimes).
        return json.dumps({
            "tables": result.tables,
            "join_keys": result.join_keys,
            "row_count": result.row_count,
            "columns": result.columns,
            "rows": result.rows,
            "execution_time_ms": result.execution_time_ms,
        }, indent=2, default=str)

    lines = []

    # Header
    tables_str = " × ".join(result.tables)
    keys_str = ", ".join(result.join_keys)
    lines.append(f"=== Correlation: {tables_str} (by {keys_str}) ===")
    lines.append(f"Rows: {result.row_count} | Execution: {result.execution_time_ms:.1f}ms")
    lines.append("")

    if not result.rows:
        lines.append("No correlated data found.")
        return "\n".join(lines)

    if format_type == "markdown":
        # Markdown table
        lines.append("| " + " | ".join(result.columns) + " |")
        lines.append("| " + " | ".join(["---"] * len(result.columns)) + " |")

        for row in result.rows:
            values = []
            for col in result.columns:
                val = row.get(col, "")
                val_str = str(val) if val is not None else ""
                if len(val_str) > max_content_length:
                    val_str = val_str[:max_content_length] + "..."
                # Escape pipes and flatten newlines so the table stays intact.
                values.append(val_str.replace("|", "\\|").replace("\n", " "))
            lines.append("| " + " | ".join(values) + " |")
    else:
        # Text format - show each row as a block
        for i, row in enumerate(result.rows, 1):
            lines.append(f"--- Row {i} ---")
            for col in result.columns:
                val = row.get(col, "")
                val_str = str(val) if val is not None else "NULL"
                if len(val_str) > max_content_length:
                    val_str = val_str[:max_content_length] + "..."
                # Group by table - use "__" separator
                if "__" in col:
                    table_name, col_name = col.split("__", 1)
                else:
                    table_name = ""
                    col_name = col
                lines.append(f"  [{table_name}] {col_name}: {val_str}")
            lines.append("")

    return "\n".join(lines)

def format_correlation_help() -> str:
    """Generate help text for --correlate command."""
    # Single-quoted entries reconstruct the embedded double quotes that
    # the usage examples require.
    lines = [
        "=== Cross-Table Correlation Help (--correlate) ===",
        "",
        "Correlate data across tables using common keys.",
        "",
        "Usage:",
        '  /cxq --correlate "table1,table2" --by <join_key>',
        "",
        "Examples:",
        '  /cxq --correlate "messages,token_economics" --by session_id',
        '  /cxq --correlate "decisions,messages" --by message_id',
        '  /cxq --correlate "tool_analytics,token_economics" --by session_id,task_id',
        "",
        "Shortcuts:",
    ]

    for name, config in CORRELATION_SHORTCUTS.items():
        tables = " × ".join(config["tables"])
        lines.append(f"  {name}: {tables}")
        lines.append(f"    {config['description']}")

    lines.extend([
        "",
        "Valid Tables:",
    ])

    for db in ["sessions", "org"]:
        tables = [t for t, d in TABLE_DATABASE_MAP.items() if d == db]
        lines.append(f"  {db}.db: {', '.join(sorted(tables))}")

    lines.extend([
        "",
        "Join Keys by Table:",
    ])

    for table in sorted(TABLE_JOIN_KEYS.keys()):
        keys = ", ".join(sorted(TABLE_JOIN_KEYS[table]))
        lines.append(f"  {table}: {keys}")

    return "\n".join(lines)

# Export public interface.
# NOTE: was previously `all = [...]`, which shadowed the builtin and
# did not actually control `from module import *`.
__all__ = [
    "parse_correlation",
    "execute_correlation",
    "format_correlation_result",
    "format_correlation_help",
    "CorrelationQuery",
    "CorrelationResult",
    "CORRELATION_SHORTCUTS",
    "TABLE_DATABASE_MAP",
    "TABLE_JOIN_KEYS",
]