#!/usr/bin/env python3
# scripts-claude-extractor
"""
title: "Claude Session Extractor"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Extract session data from Claude Code JSONL files"
keywords: ['claude', 'extractor', 'session', 'jsonl', 'anthropic']
tokens: ~500
created: 2026-01-28
updated: 2026-01-28
script_name: "claude_extractor.py"
language: python
executable: true
usage: "from scripts.extractors.claude_extractor import ClaudeExtractor"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Claude Session Extractor for CODITECT /sx command.

Extracts session data from Claude Code native JSONL files and export TXT files.

Session file locations:
- Native: ~/.claude/projects/<project_hash>/<session_uuid>.jsonl
- Exports: ~/Claude-Exports/claude-export-*.txt

Track: J.13 (Memory - Generic Session Export)
Task: J.13.2.1
"""
from future import annotations
import json import os import re import sys from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Iterator, List, Optional, Tuple
Add parent paths for imports
_script_dir = Path(file).resolve().parent _scripts_dir = _script_dir.parent _coditect_root = _scripts_dir.parent if str(_coditect_root) not in sys.path: sys.path.insert(0, str(_coditect_root)) if str(_scripts_dir) not in sys.path: sys.path.insert(0, str(_scripts_dir))
from core.session_extractor import ( SessionExtractor, SessionMetadata, ExtractedEntry, ExtractionResult )
class ClaudeExtractor(SessionExtractor):
    """
    Extracts session data from Claude Code sessions.

    Supports:
    - Native JSONL session files (~/.claude/projects/*.jsonl)
    - Export TXT files (from /export command)

    Entry types extracted:
    - user: User messages with tool results
    - assistant: AI responses with token usage
    - system: Compaction events, retries
    - tool_use: Tool invocations
    - tool_result: Tool outputs
    """

    # Message type constants from Claude Code JSONL "type" field
    TYPE_USER = "user"
    TYPE_ASSISTANT = "assistant"
    TYPE_SYSTEM = "system"
    TYPE_SUMMARY = "summary"
    @property
    def llm_name(self) -> str:
        """Identifier for this extractor's LLM source (always "claude")."""
        return "claude"
def can_extract(self, source: Path) -> bool:
"""Check if this extractor can handle the source."""
if not source.exists():
return False
# Native JSONL file
if source.suffix == ".jsonl":
# Check for Claude Code format markers
try:
with open(source, 'r', encoding='utf-8') as f:
first_line = f.readline()
if first_line:
data = json.loads(first_line)
# Claude Code uses specific entry types
return data.get("type") in ("user", "assistant", "system", "summary")
except (json.JSONDecodeError, IOError):
pass
# Export TXT file
if source.suffix == ".txt":
try:
with open(source, 'r', encoding='utf-8') as f:
header = f.read(500)
# Claude exports have specific header format
return "Claude Code" in header or "claude code export" in header.lower()
except IOError:
pass
return False
def extract(
self,
source: Path,
session_id: Optional[str] = None,
include_tool_results: bool = True,
include_thinking: bool = True,
**kwargs
) -> ExtractionResult:
"""
Extract session data from Claude Code source.
Args:
source: Path to JSONL or TXT file
session_id: Session ID (extracted from filename if not provided)
include_tool_results: Include tool result content
include_thinking: Include extended thinking content
Returns:
ExtractionResult with all extracted data
"""
if not source.exists():
return ExtractionResult(
success=False,
metadata=SessionMetadata(
session_id=session_id or "unknown",
llm_source=self.llm_name
),
errors=[f"Source file not found: {source}"]
)
# Determine format and extract
if source.suffix == ".jsonl":
return self._extract_jsonl(source, session_id, include_tool_results, include_thinking)
elif source.suffix == ".txt":
return self._extract_export(source, session_id, include_tool_results, include_thinking)
else:
return ExtractionResult(
success=False,
metadata=SessionMetadata(
session_id=session_id or "unknown",
llm_source=self.llm_name
),
errors=[f"Unsupported file format: {source.suffix}"]
)
    def _extract_jsonl(
        self,
        source: Path,
        session_id: Optional[str],
        include_tool_results: bool,
        include_thinking: bool
    ) -> ExtractionResult:
        """Extract from a native JSONL session file.

        Each line is an independent JSON object with a "type" field
        (user / assistant / system / summary). Unparseable lines are
        recorded as warnings and skipped; only a file read error makes
        the overall result unsuccessful.

        Args:
            source: Path to the .jsonl session file.
            session_id: Session ID; defaults to the file stem (the
                session UUID for native Claude Code files).
            include_tool_results: Forwarded to user-entry extraction.
            include_thinking: Forwarded to assistant-entry extraction.

        Returns:
            ExtractionResult with entries, token totals, and timestamps.
        """
        entries: List[ExtractedEntry] = []
        errors: List[str] = []
        warnings: List[str] = []

        # Session ID from filename
        if session_id is None:
            session_id = source.stem

        # Initialize metadata
        metadata = SessionMetadata(
            session_id=session_id,
            llm_source=self.llm_name,
            source_path=source
        )

        first_timestamp: Optional[datetime] = None
        last_timestamp: Optional[datetime] = None
        total_input = 0
        total_output = 0
        message_count = 0
        # Most recent message id, passed as parent_id to chain entries.
        last_message_id: Optional[str] = None

        try:
            with open(source, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    if not line.strip():
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError as e:
                        # Tolerate corrupt lines: note and keep going.
                        warnings.append(f"Line {line_num}: JSON parse error: {e}")
                        continue

                    # Extract timestamp. NOTE: _parse_timestamp falls
                    # back to "now", so timestamp is never None and
                    # last_timestamp tracks the final line seen.
                    timestamp = self._parse_timestamp(data)
                    if timestamp:
                        if first_timestamp is None:
                            first_timestamp = timestamp
                        last_timestamp = timestamp

                    # Process by entry type (TYPE_SUMMARY lines are not
                    # handled here and are skipped).
                    entry_type = data.get("type", "")
                    if entry_type == self.TYPE_USER:
                        extracted = self._extract_user_entry(
                            data, timestamp, last_message_id, include_tool_results
                        )
                        entries.extend(extracted)
                        message_count += 1
                        if extracted:
                            last_message_id = extracted[0].data.get("message_id")
                    elif entry_type == self.TYPE_ASSISTANT:
                        extracted = self._extract_assistant_entry(
                            data, timestamp, last_message_id, include_thinking
                        )
                        entries.extend(extracted)
                        message_count += 1

                        # Track token usage
                        usage = data.get("message", {}).get("usage", {})
                        total_input += usage.get("input_tokens", 0)
                        total_output += usage.get("output_tokens", 0)

                        if extracted:
                            last_message_id = extracted[0].data.get("message_id")

                        # Extract model info (first model seen wins)
                        if metadata.llm_model is None:
                            model = data.get("message", {}).get("model")
                            if model:
                                metadata.llm_model = model
                    elif entry_type == self.TYPE_SYSTEM:
                        # System entries (compaction, retries)
                        entry = self._create_message(
                            role="system",
                            content=data.get("message", ""),
                            timestamp=timestamp or datetime.now(timezone.utc),
                            raw=data
                        )
                        entries.append(entry)
        except IOError as e:
            errors.append(f"File read error: {e}")
            return ExtractionResult(
                success=False,
                metadata=metadata,
                errors=errors
            )

        # Update metadata
        metadata.started_at = first_timestamp
        metadata.ended_at = last_timestamp
        metadata.total_messages = message_count
        metadata.total_tokens_input = total_input
        metadata.total_tokens_output = total_output

        # Try to extract project/cwd from session directory
        self._extract_path_metadata(source, metadata)

        return ExtractionResult(
            success=True,
            metadata=metadata,
            entries=entries,
            errors=errors,
            warnings=warnings
        )
    def _extract_user_entry(
        self,
        data: Dict[str, Any],
        timestamp: datetime,
        parent_id: Optional[str],
        include_tool_results: bool
    ) -> List[ExtractedEntry]:
        """Extract entries from a user message.

        Handles both plain-string messages and structured messages with
        content blocks. tool_result blocks each become their own entry
        (when include_tool_results is True); text blocks are joined into
        a single user message placed first in the returned list.

        Returns:
            List of ExtractedEntry; may be empty when the message had no
            text and tool results were excluded.
        """
        entries: List[ExtractedEntry] = []
        message = data.get("message", {})

        if isinstance(message, str):
            # Simple string message
            entry = self._create_message(
                role="user",
                content=message,
                timestamp=timestamp,
                parent_id=parent_id,
                raw=data
            )
            entries.append(entry)
        elif isinstance(message, dict):
            # Complex message with content blocks
            content_parts = []
            content = message.get("content", [])

            if isinstance(content, str):
                content_parts.append(content)
            elif isinstance(content, list):
                for block in content:
                    if isinstance(block, str):
                        content_parts.append(block)
                    elif isinstance(block, dict):
                        block_type = block.get("type", "")
                        if block_type == "text":
                            content_parts.append(block.get("text", ""))
                        elif block_type == "tool_result" and include_tool_results:
                            # Create tool_result entry
                            tool_id = block.get("tool_use_id", "")
                            result_content = block.get("content", "")
                            # Result content may itself be a list of
                            # blocks; flatten each to text.
                            if isinstance(result_content, list):
                                result_content = "\n".join(
                                    b.get("text", str(b)) if isinstance(b, dict) else str(b)
                                    for b in result_content
                                )
                            entries.append(self._create_tool_result(
                                tool_id=tool_id,
                                result=str(result_content),
                                timestamp=timestamp,
                                is_error=block.get("is_error", False),
                                raw=block
                            ))

            # Create main user message
            if content_parts:
                entry = self._create_message(
                    role="user",
                    content="\n".join(content_parts),
                    timestamp=timestamp,
                    parent_id=parent_id,
                    raw=data
                )
                entries.insert(0, entry)  # Message first, then tool results

        return entries
    def _extract_assistant_entry(
        self,
        data: Dict[str, Any],
        timestamp: datetime,
        parent_id: Optional[str],
        include_thinking: bool
    ) -> List[ExtractedEntry]:
        """Extract entries from an assistant message.

        Splits content blocks into: text (joined into the main assistant
        entry), thinking (attached to that entry when include_thinking is
        True), and tool_use blocks (each becomes its own entry). The main
        assistant message is always first in the returned list, followed
        by any tool_use entries.
        """
        entries: List[ExtractedEntry] = []
        message = data.get("message", {})

        content_parts = []
        thinking_parts = []

        content = message.get("content", [])
        usage = message.get("usage", {})

        if isinstance(content, str):
            content_parts.append(content)
        elif isinstance(content, list):
            for block in content:
                if isinstance(block, str):
                    content_parts.append(block)
                elif isinstance(block, dict):
                    block_type = block.get("type", "")
                    if block_type == "text":
                        content_parts.append(block.get("text", ""))
                    elif block_type == "thinking" and include_thinking:
                        thinking_parts.append(block.get("thinking", ""))
                    elif block_type == "tool_use":
                        # Create tool_use entry
                        entries.append(self._create_tool_use(
                            tool_name=block.get("name", ""),
                            tool_input=block.get("input", {}),
                            tool_id=block.get("id", ""),
                            timestamp=timestamp,
                            parent_id=parent_id,
                            raw=block
                        ))

        # Create main assistant message — emitted even with empty text
        # so model / usage / stop_reason are still captured.
        entry = self._create_message(
            role="assistant",
            content="\n".join(content_parts),
            timestamp=timestamp,
            parent_id=parent_id,
            model=message.get("model"),
            usage={
                "input": usage.get("input_tokens", 0),
                "output": usage.get("output_tokens", 0),
                "cache_read": usage.get("cache_read_input_tokens", 0),
                "cache_write": usage.get("cache_creation_input_tokens", 0)
            } if usage else None,
            thinking="\n".join(thinking_parts) if thinking_parts else None,
            stop_reason=message.get("stop_reason"),
            raw=data
        )
        entries.insert(0, entry)

        return entries
def _extract_export(
self,
source: Path,
session_id: Optional[str],
include_tool_results: bool,
include_thinking: bool
) -> ExtractionResult:
"""Extract from Claude Code export TXT file."""
# Use state machine parser for TXT exports
# This is a simplified version - full implementation would use
# the parsing logic from unified-message-extractor.py
entries: List[ExtractedEntry] = []
errors: List[str] = []
if session_id is None:
# Extract from filename: claude-export-YYYYMMDD-HHMMSS.txt
match = re.search(r'claude-export-(\d{8})-(\d{6})', source.name)
if match:
session_id = f"export-{match.group(1)}-{match.group(2)}"
else:
session_id = source.stem
metadata = SessionMetadata(
session_id=session_id,
llm_source=self.llm_name,
source_path=source
)
try:
content = source.read_text(encoding='utf-8')
# Parse header for metadata
header_match = re.search(
r'Claude Code v([\d.]+).*?Model: ([^\n]+).*?Working directory: ([^\n]+)',
content[:1000],
re.DOTALL
)
if header_match:
metadata.llm_model = header_match.group(2).strip()
metadata.cwd = header_match.group(3).strip()
# Parse conversation turns
# Pattern: > (user message) or (assistant response)
turn_pattern = re.compile(
r'^>\s*(.+?)(?=^>|\Z|^───)',
re.MULTILINE | re.DOTALL
)
timestamp = datetime.now(timezone.utc)
role = "user" # Alternating
for i, match in enumerate(turn_pattern.finditer(content)):
turn_content = match.group(1).strip()
entry = self._create_message(
role=role,
content=turn_content,
timestamp=timestamp
)
entries.append(entry)
# Alternate roles
role = "assistant" if role == "user" else "user"
metadata.total_messages = len(entries)
except IOError as e:
errors.append(f"File read error: {e}")
return ExtractionResult(
success=False,
metadata=metadata,
errors=errors
)
return ExtractionResult(
success=True,
metadata=metadata,
entries=entries,
errors=errors
)
def _parse_timestamp(self, data: Dict[str, Any]) -> datetime:
"""Parse timestamp from entry data."""
# Try common timestamp fields
for field in ("timestamp", "ts", "time", "created_at"):
ts = data.get(field)
if ts:
try:
if isinstance(ts, (int, float)):
return datetime.fromtimestamp(ts, tz=timezone.utc)
elif isinstance(ts, str):
# Try ISO format
return datetime.fromisoformat(ts.replace('Z', '+00:00'))
except (ValueError, OSError):
pass
return datetime.now(timezone.utc)
def _extract_path_metadata(self, source: Path, metadata: SessionMetadata) -> None:
"""Extract project/cwd from session file path."""
# Claude Code sessions are in ~/.claude/projects/<hash>/<uuid>.jsonl
parts = source.parts
if "projects" in parts:
idx = parts.index("projects")
if idx + 1 < len(parts):
# Project hash is the directory name
project_hash = parts[idx + 1]
metadata.project_path = project_hash
def list_sessions(self, source: Path) -> List[SessionMetadata]:
"""List available sessions in source directory."""
sessions: List[SessionMetadata] = []
if source.is_file():
# Single file - return its metadata
result = self.extract(source)
if result.success:
sessions.append(result.metadata)
elif source.is_dir():
# Scan for JSONL files
for jsonl_file in source.glob("**/*.jsonl"):
try:
stat = jsonl_file.stat()
sessions.append(SessionMetadata(
session_id=jsonl_file.stem,
llm_source=self.llm_name,
source_path=jsonl_file,
started_at=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
))
except OSError:
pass
return sessions
if __name__ == "__main__":
    import argparse
    import os

    parser = argparse.ArgumentParser(description="Claude Session Extractor")
    parser.add_argument("source", help="Session file or directory")
    parser.add_argument("--list", action="store_true", help="List sessions only")
    parser.add_argument("--output", "-o", help="Output directory for CUSF files")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    # ADR-156: Project-scoped context
    parser.add_argument("--project", help="Project ID for attribution (e.g., CUST-avivatec-fpa)")
    args = parser.parse_args()

    # Get project from args or environment (ADR-156)
    project_id = args.project or os.environ.get('CODITECT_PROJECT')

    extractor = ClaudeExtractor()
    source = Path(args.source)

    if args.list:
        for s in extractor.list_sessions(source):
            print(f"{s.session_id}: {s.source_path}")
    else:
        result = extractor.extract(source)

        # Apply project attribution (ADR-156)
        if project_id:
            result.metadata.project_id = project_id
            # Determine scope based on project pattern: a CUST- prefix
            # marks customer-scoped work.
            if project_id.startswith('CUST-'):
                result.metadata.scope = 'customer'
            else:
                result.metadata.scope = 'project'

        if args.json:
            import json
            output_data = {
                "metadata": result.metadata.to_dict(),
                "entries": [e.to_dict() for e in result.entries],
                "success": result.success,
                "errors": result.errors
            }
            if args.output:
                # Write a timestamped JSON file into the output directory.
                output_dir = Path(args.output)
                output_dir.mkdir(parents=True, exist_ok=True)
                timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H-%M-%SZ')
                output_file = output_dir / f"{timestamp}-claude-{result.metadata.session_id[:8]}.json"
                with open(output_file, 'w', encoding='utf-8') as f:
                    # default=str stringifies non-serializable values
                    # (Paths, datetimes) rather than raising.
                    json.dump(output_data, f, indent=2, default=str)
                print(f"✓ Exported to: {output_file}")
            else:
                print(json.dumps(output_data, indent=2, default=str))
        else:
            # Human-readable summary
            print(f"Success: {result.success}")
            print(f"Session: {result.metadata.session_id}")
            print(f"Model: {result.metadata.llm_model}")
            print(f"Messages: {result.metadata.total_messages}")
            print(f"Entries: {result.entry_count}")
            if project_id:
                print(f"Project: {result.metadata.project_id}")
                print(f"Scope: {result.metadata.scope}")
            if result.errors:
                print(f"Errors: {result.errors}")