# (removed: "Skip to main content" — stray web-page export artifact, not part of the script)

#!/usr/bin/env python3
"""Trajectory Extractor - Extract tool calls from session files into sessions.db (ADR-118).

Part of /cx pipeline. Replaces real-time trajectory_logger_hook.py with batch
processing. Writes to tool_analytics table with hash-based deduplication.

Usage:
    # Called automatically by /cx
    python3 trajectory_extractor.py

    # Process specific session file
    python3 trajectory_extractor.py --file path/to/session.jsonl

    # Export to JSONL (optional, for debugging)
    python3 trajectory_extractor.py --export-jsonl

    # Dry run
    python3 trajectory_extractor.py --dry-run

Output:
    sessions.db -> tool_analytics table (ADR-118 Tier 3 - regenerable)
    Optional: ~/.coditect/logs/trajectory.jsonl (with --export-jsonl)
"""

# Stdlib only - grouped per PEP 8 (plain imports, then from-imports).
import argparse
import hashlib
import json
import os
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Set

# Paths - ADR-114 & ADR-118: Use centralized path discovery

CODITECT_HOME = Path.home() / ".coditect"
# NOTE: the export mangled this to Path(file); __file__ is the intended name.
SCRIPT_DIR = Path(__file__).parent
sys.path.insert(0, str(SCRIPT_DIR / "core"))

try:
    from paths import (
        get_context_storage_dir,
        get_sessions_db_path,
        SESSIONS_DB as _SESSIONS_DB,
    )
    SESSIONS_DB = _SESSIONS_DB
except ImportError:
    # ADR-118: tool_analytics goes to sessions.db (Tier 3 - regenerable).
    # Fall back to the user data dir when present, else CODITECT_HOME.
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        SESSIONS_DB = _user_data / "sessions.db"
    else:
        SESSIONS_DB = CODITECT_HOME / "context-storage" / "sessions.db"

TRAJECTORY_LOG = CODITECT_HOME / "logs" / "trajectory.jsonl"
CLAUDE_PROJECTS = Path.home() / ".claude" / "projects"

# Tool category mapping: tool name -> coarse analytics category.
# Tools not listed here are categorized as 'other' at insert time.

TOOL_CATEGORIES = {
    'Read': 'file_ops',
    'Write': 'file_ops',
    'Edit': 'file_ops',
    'Glob': 'search',
    'Grep': 'search',
    'Bash': 'execution',
    'Task': 'orchestration',
    'Skill': 'orchestration',
    'WebFetch': 'web',
    'WebSearch': 'web',
    'AskUserQuestion': 'interaction',
    'NotebookEdit': 'file_ops',
    'TaskCreate': 'task_management',
    'TaskGet': 'task_management',
    'TaskUpdate': 'task_management',
    'TaskList': 'task_management',
    'TaskOutput': 'task_management',
    'KillShell': 'execution',
    'EnterPlanMode': 'interaction',
    'ExitPlanMode': 'interaction',
}

def compute_tool_hash(session_id: str, tool_name: str, timestamp: str, tool_input: Dict) -> str:
    """Compute a unique, stable 32-char hash for a tool call.

    The hash keys on (session, tool, timestamp, canonical input JSON), so the
    same call re-extracted from the same session file always dedupes.
    """
    # sort_keys canonicalizes the dict; default=str tolerates non-JSON values.
    input_str = json.dumps(tool_input, sort_keys=True, default=str)
    hash_input = f"{session_id}:{tool_name}:{timestamp}:{input_str}"
    return hashlib.sha256(hash_input.encode()).hexdigest()[:32]

def get_existing_hashes(conn: sqlite3.Connection) -> Set[str]:
    """Load all existing trajectory hashes from the tool_analytics table.

    Used for deduplication: rows with a NULL hash predate hashing and are
    ignored.
    """
    cursor = conn.execute(
        "SELECT trajectory_hash FROM tool_analytics WHERE trajectory_hash IS NOT NULL"
    )
    return {row[0] for row in cursor.fetchall()}

def extract_task_id(tool_input: Dict) -> Optional[str]:
    """Extract a task ID (e.g. 'A.1.2' or 'A.1.2.3') from a tool input description.

    Returns None when there is no description or it does not begin with the
    task-ID pattern.
    """
    description = tool_input.get('description', '')
    if description:
        import re
        # Pattern: X.N.N[.N]: action
        # BUGFIX: dots must be escaped -- a bare '.' matched ANY character,
        # so strings like 'AX12Y34' were wrongly accepted as task IDs.
        match = re.match(r'^([A-Z]\.\d+\.\d+(?:\.\d+)?)', description)
        if match:
            return match.group(1)
    return None

def extract_tool_calls(file_path: Path) -> Iterator[Dict[str, Any]]:
    """Yield tool_use records found in a JSONL session transcript.

    Handles both top-level ``tool_use`` entries and assistant messages whose
    content blocks embed ``tool_use`` items. Malformed JSON lines are skipped;
    any file-level error is reported to stderr and iteration ends.
    """
    session_id = file_path.stem

    def make_record(name, payload, ts, lineno):
        # Every yielded record shares this shape.
        return {
            'session_id': session_id,
            'tool_name': name,
            'tool_input': payload,
            'timestamp': ts,
            'line_num': lineno,
            'source_file': str(file_path),
        }

    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            for lineno, raw in enumerate(handle, 1):
                raw = raw.strip()
                if not raw:
                    continue

                try:
                    entry = json.loads(raw)
                except json.JSONDecodeError:
                    # Skip malformed lines rather than aborting the file.
                    continue

                kind = entry.get('type', '')
                ts = entry.get('timestamp', '')

                if kind == 'tool_use':
                    # Top-level tool_use entry.
                    yield make_record(
                        entry.get('name', 'unknown'),
                        entry.get('input', {}),
                        ts, lineno,
                    )
                elif kind == 'assistant' or entry.get('role') == 'assistant':
                    # Assistant message: scan its content blocks for tool_use items.
                    blocks = entry.get('message', {}).get('content', [])
                    if isinstance(blocks, list):
                        for block in blocks:
                            if isinstance(block, dict) and block.get('type') == 'tool_use':
                                yield make_record(
                                    block.get('name', 'unknown'),
                                    block.get('input', {}),
                                    ts, lineno,
                                )
    except Exception as e:  # best-effort: one bad file must not kill the batch
        print(f" Warning: Error reading {file_path}: {e}", file=sys.stderr)

def find_session_files() -> Iterator[Path]:
    """Yield every *.jsonl session file under ~/.claude/projects."""
    if not CLAUDE_PROJECTS.exists():
        return
    for entry in CLAUDE_PROJECTS.iterdir():
        if entry.is_dir():
            # One project per directory; sessions are its *.jsonl files.
            yield from entry.glob("*.jsonl")

def insert_tool_call(conn: sqlite3.Connection, tool_call: Dict, tool_hash: str) -> bool:
    """Insert one extracted tool call into the tool_analytics table.

    Returns True on success; False when the hash already exists (duplicate
    row, via IntegrityError) or the insert fails for any other reason (a
    warning is printed to stderr).
    """
    tool_name = tool_call['tool_name']
    tool_input = tool_call['tool_input']

    # Build the row up front so the try-block only covers the insert itself.
    row = (
        tool_call['session_id'],
        tool_name,
        TOOL_CATEGORIES.get(tool_name, 'other'),
        extract_task_id(tool_input),
        'success',  # Default status for extracted calls
        len(json.dumps(tool_input)),
        # Fall back to "now" (UTC) when the entry carried no timestamp.
        tool_call['timestamp'] or datetime.now(timezone.utc).isoformat(),
        tool_hash,
    )

    try:
        conn.execute(
            """
            INSERT INTO tool_analytics (
                session_id, tool_name, tool_category, task_id,
                status, input_size_bytes, created_at, trajectory_hash
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            row,
        )
        return True
    except sqlite3.IntegrityError:
        # Duplicate hash - already exists
        return False
    except Exception as e:
        print(f" Warning: Failed to insert tool call: {e}", file=sys.stderr)
        return False

def process_trajectory(
    files: Optional[list] = None,
    dry_run: bool = False,
    verbose: bool = False,
    export_jsonl: bool = False
) -> Dict[str, int]:
    """Extract trajectory data from session files with deduplication.

    Writes to sessions.db tool_analytics table (ADR-118 Tier 3).

    Returns stats dict with counts.
    """
    stats = {
        'files_processed': 0,
        'tool_calls_found': 0,
        'new_entries': 0,
        'duplicates_skipped': 0,
    }

    # Connect to database (ADR-118: sessions.db for tool_analytics)
    conn = None
    if not dry_run:
        if not SESSIONS_DB.exists():
            print(f"Error: Database not found at {SESSIONS_DB}", file=sys.stderr)
            return stats
        conn = sqlite3.connect(str(SESSIONS_DB))

    # Dedup against existing rows; dry runs only dedup within this run.
    seen = get_existing_hashes(conn) if conn else set()
    fresh = set()

    # Determine which files to process.
    if files:
        targets = [Path(f) for f in files if Path(f).exists()]
    else:
        targets = list(find_session_files())

    if not targets:
        print("No session files found to process.", file=sys.stderr)
        if conn:
            conn.close()
        return stats

    # Optional JSONL export (append mode; never in a dry run).
    export_handle = None
    if export_jsonl and not dry_run:
        TRAJECTORY_LOG.parent.mkdir(parents=True, exist_ok=True)
        export_handle = open(TRAJECTORY_LOG, 'a', encoding='utf-8')

    try:
        for path in targets:
            stats['files_processed'] += 1
            added_here = 0

            for call in extract_tool_calls(path):
                stats['tool_calls_found'] += 1

                digest = compute_tool_hash(
                    call['session_id'],
                    call['tool_name'],
                    call['timestamp'],
                    call['tool_input'],
                )

                # Already recorded, either in the DB or earlier in this run?
                if digest in seen or digest in fresh:
                    stats['duplicates_skipped'] += 1
                    continue

                fresh.add(digest)

                if dry_run:
                    stats['new_entries'] += 1
                    added_here += 1
                elif insert_tool_call(conn, call, digest):
                    stats['new_entries'] += 1
                    added_here += 1
                    if export_handle:
                        export_handle.write(json.dumps({
                            'hash': digest,
                            'session_id': call['session_id'],
                            'tool_name': call['tool_name'],
                            'tool_input': call['tool_input'],
                            'timestamp': call['timestamp'],
                            'extracted_at': datetime.now(timezone.utc).isoformat(),
                        }) + '\n')
                else:
                    # Insert reported a pre-existing hash (or failed): count as duplicate.
                    stats['duplicates_skipped'] += 1

            if verbose and added_here > 0:
                print(f" {path.name}: {added_here} new tool calls")

        # Commit database changes
        if conn:
            conn.commit()
    finally:
        if export_handle:
            export_handle.close()
        if conn:
            conn.close()

    return stats

def main():
    """CLI entry point: parse arguments and run the extraction pipeline."""
    parser = argparse.ArgumentParser(
        description='Extract trajectory from session files into sessions.db (ADR-118)'
    )
    parser.add_argument('--file', '-f', help='Specific file to process')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be extracted')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    parser.add_argument('--export-jsonl', action='store_true', help='Also export to trajectory.jsonl')
    parser.add_argument('--stats', action='store_true', help='Show database stats')
    args = parser.parse_args()

    # --stats is a read-only query; report and exit without extracting.
    if args.stats:
        if SESSIONS_DB.exists():
            conn = sqlite3.connect(str(SESSIONS_DB))
            total = conn.execute("SELECT COUNT(*) FROM tool_analytics").fetchone()[0]
            with_hash = conn.execute(
                "SELECT COUNT(*) FROM tool_analytics WHERE trajectory_hash IS NOT NULL"
            ).fetchone()[0]
            conn.close()
            print(f"Tool analytics records: {total:,}")
            print(f"With trajectory_hash: {with_hash:,}")
        return

    files = [args.file] if args.file else None

    print("Extracting trajectory data to sessions.db (ADR-118)...")
    stats = process_trajectory(
        files=files,
        dry_run=args.dry_run,
        verbose=args.verbose,
        export_jsonl=args.export_jsonl,
    )

    print(f"\nTrajectory extraction complete:")
    print(f" Files processed: {stats['files_processed']}")
    print(f" Tool calls found: {stats['tool_calls_found']}")
    print(f" New entries: {stats['new_entries']}")
    print(f" Duplicates skipped: {stats['duplicates_skipped']}")
    print(f" Output: {SESSIONS_DB} (ADR-118 Tier 3)")

    if args.export_jsonl and not args.dry_run:
        print(f" JSONL export: {TRAJECTORY_LOG}")

# BUGFIX: the export stripped the dunder underscores; the standard script
# guard is __name__ == "__main__".
if __name__ == "__main__":
    main()