# scripts-audit-event-extractor
#!/usr/bin/env python3
"""
OPT-2: AuditEvent Node Extractor (ADR-151)

Extracts compliance-relevant audit events from sessions.db into kg_nodes.

Subtypes:
- tool_error: Aggregated tool failures per (session_id, tool_name, error_type)
- session_boundary: Session compaction/lifecycle events from system_events
- task_completion: Task tracking outcomes from task_tracking

Source: sessions.db (tool_analytics, system_events, task_tracking)
Target: org.db kg_nodes table

Created: 2026-02-05
Author: Claude (Opus 4.6)
Track: J (Memory Intelligence)
Task: J.3.4.11
"""
import logging
from pathlib import Path
from typing import Any, Dict, Generator, Optional, Tuple

from .base_extractor import SQLiteSourceExtractor
# Module-level logger; `__name__` (not the undefined `name`) scopes log
# records to this module per the standard logging convention.
logger = logging.getLogger(__name__)
class AuditEventExtractor(SQLiteSourceExtractor):
    """
    Extract compliance-relevant audit events from sessions.db.

    Aggregates tool_analytics errors (17M+ rows) into manageable node counts
    and extracts system events and task outcomes for compliance tracking.

    Subtypes emitted:
    - tool_error: aggregated tool failures per (session_id, tool_name, error_type)
    - session_boundary: session compaction/lifecycle events from system_events
    - task_completion: task tracking outcomes from task_tracking
    """

    @property
    def node_type(self) -> str:
        """kg_nodes node_type shared by every node this extractor yields."""
        return "audit_event"

    def extract_entities(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
        """
        Extract audit events from multiple sessions.db tables.

        Yields:
            Tuple of (node_id, name, subtype, properties, source_table, source_id)
        """
        # Extract from all three sources; each helper degrades gracefully
        # (yields nothing) when its source table is missing.
        yield from self._extract_tool_errors()
        yield from self._extract_session_boundaries()
        yield from self._extract_task_completions()

    def _extract_tool_errors(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
        """
        Extract aggregated tool error events from tool_analytics.

        Aggregates by (session_id, tool_name, error_type) to reduce
        17M+ rows into manageable audit nodes.

        Yields:
            Tuple of (node_id, name, "tool_error", properties,
            "tool_analytics", source_id)
        """
        conn = self.connect_source()
        try:
            cursor = conn.execute("""
                SELECT
                    session_id,
                    tool_name,
                    COALESCE(error_type, 'unknown') as error_type,
                    COUNT(*) as error_count,
                    MIN(created_at) as first_occurrence,
                    MAX(created_at) as last_occurrence,
                    GROUP_CONCAT(DISTINCT COALESCE(agent_name, '')) as agents,
                    GROUP_CONCAT(DISTINCT COALESCE(task_id, '')) as task_ids,
                    AVG(execution_time_ms) as avg_execution_ms
                FROM tool_analytics
                WHERE status = 'error'
                GROUP BY session_id, tool_name, error_type
                HAVING error_count >= 1
                ORDER BY last_occurrence DESC
            """)
        except Exception as e:
            # Best-effort: older databases may not have this table.
            logger.warning(f"tool_analytics query failed (table may not exist): {e}")
            return

        for row in cursor:
            session_id = row['session_id'] or "unknown"
            tool_name = row['tool_name'] or "unknown"
            error_type = row['error_type']
            error_count = row['error_count']

            # Deterministic source ID for aggregation
            source_id = f"{session_id}:{tool_name}:{error_type}"
            node_id = self.generate_node_id(f"err:{source_id}")

            # Clean agent and task lists (GROUP_CONCAT uses comma by default)
            agents = [a for a in (row['agents'] or "").split(",") if a]
            task_ids = [t for t in (row['task_ids'] or "").split(",") if t]

            name = f"[ToolError] {tool_name} ({error_type}) x{error_count}"
            if len(name) > 80:
                name = name[:77] + "..."

            properties = {
                "session_id": session_id,
                "tool_name": tool_name,
                "error_type": error_type,
                "error_count": error_count,
                "first_occurrence": row['first_occurrence'],
                "last_occurrence": row['last_occurrence'],
                # `is not None` (not truthiness) so a legitimate 0.0 ms
                # average is preserved instead of being dropped.
                "avg_execution_ms": round(row['avg_execution_ms'], 1) if row['avg_execution_ms'] is not None else None,
            }
            if agents:
                properties["agents"] = agents[:10]  # Cap at 10
            if task_ids:
                properties["task_ids"] = task_ids[:10]

            # Clean None values
            properties = {k: v for k, v in properties.items() if v is not None}

            yield (
                node_id,
                name,
                "tool_error",
                properties,
                "tool_analytics",
                source_id,
            )

    def _extract_session_boundaries(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
        """
        Extract session boundary/lifecycle events from system_events.

        These track when sessions are compacted, commands executed, etc.

        Yields:
            Tuple of (node_id, name, "session_boundary", properties,
            "system_events", event_id)
        """
        conn = self.connect_source()
        try:
            cursor = conn.execute("""
                SELECT
                    id,
                    entry_id,
                    subtype,
                    level,
                    content,
                    compact_trigger
                FROM system_events
                ORDER BY id
            """)
        except Exception as e:
            # Best-effort: older databases may not have this table.
            logger.warning(f"system_events query failed (table may not exist): {e}")
            return

        for row in cursor:
            event_id = row['id']
            subtype = row['subtype'] or "unknown"
            content = row['content'] or ""

            node_id = self.generate_node_id(f"sys:{event_id}")

            # Generate descriptive name
            name = self._generate_boundary_name(subtype, content, row['compact_trigger'])

            properties = {
                "entry_id": row['entry_id'],
                "event_subtype": subtype,
                "level": row['level'],
            }
            if content:
                # Truncate content for storage (keep first 500 chars)
                properties["content"] = content[:500] if len(content) > 500 else content
            if row['compact_trigger']:
                properties["compact_trigger"] = row['compact_trigger']

            # Clean None values
            properties = {k: v for k, v in properties.items() if v is not None}

            yield (
                node_id,
                name,
                "session_boundary",
                properties,
                "system_events",
                str(event_id),
            )

    def _extract_task_completions(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
        """
        Extract task completion/failure events from task_tracking.

        Each tracked task becomes an audit node capturing outcome and metrics.

        Yields:
            Tuple of (node_id, name, "task_completion", properties,
            "task_tracking", tracking_id)
        """
        conn = self.connect_source()
        try:
            cursor = conn.execute("""
                SELECT
                    id,
                    task_id,
                    task_description,
                    source,
                    status,
                    outcome,
                    tool_success_count,
                    tool_error_count,
                    created_at,
                    updated_at
                FROM task_tracking
                ORDER BY created_at
            """)
        except Exception as e:
            # Best-effort: older databases may not have this table.
            logger.warning(f"task_tracking query failed (table may not exist): {e}")
            return

        for row in cursor:
            tracking_id = row['id']
            task_id = row['task_id'] or f"task-{tracking_id}"
            description = row['task_description'] or ""
            status = row['status'] or "unknown"
            outcome = row['outcome'] or "unknown"

            node_id = self.generate_node_id(f"task:{tracking_id}")
            name = self._generate_task_name(task_id, description, outcome)

            properties = {
                "task_id": task_id,
                "task_description": description[:200] if len(description) > 200 else description,
                "source": row['source'],
                "status": status,
                "outcome": outcome,
                "tool_success_count": row['tool_success_count'],
                "tool_error_count": row['tool_error_count'],
                "created_at": row['created_at'],
                "updated_at": row['updated_at'],
            }

            # Clean None values
            properties = {k: v for k, v in properties.items() if v is not None}

            yield (
                node_id,
                name,
                "task_completion",
                properties,
                "task_tracking",
                str(tracking_id),
            )

    def _generate_boundary_name(self, subtype: str, content: str, compact_trigger: Optional[str]) -> str:
        """Generate display name for session boundary events."""
        if subtype == "compact_boundary":
            trigger = compact_trigger or "unknown"
            return f"[Boundary] Compaction ({trigger})"
        elif subtype == "local_command":
            cmd = content.strip()[:50] if content else "unknown"
            return f"[Command] {cmd}"
        else:
            text = content.strip()[:50] if content else subtype
            return f"[System] {text}"

    def _generate_task_name(self, task_id: str, description: str, outcome: str) -> str:
        """Generate display name for task completion events (capped at 80 chars)."""
        desc = description.strip().replace('\n', ' ')
        if len(desc) > 60:
            desc = desc[:57] + "..."
        outcome_icon = "+" if outcome in ("success", "completed") else "-"
        return f"[Task{outcome_icon}] {task_id}: {desc}"[:80]