# scripts-unified-message-extractor-v3.0.0-backup
#!/usr/bin/env python3
"""
title: "Unified Message Extractor"
component_type: script
version: "3.0.0"
audience: contributor
status: stable
summary: "Extract ALL data from Claude Code sessions - zero data loss"
keywords: ['extractor', 'database', 'session', 'deduplication', 'indexing']
tokens: ~500
created: 2025-12-22
updated: 2025-12-23
script_name: "unified-message-extractor.py"
language: python
executable: true
usage: "python3 scripts/unified-message-extractor.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: true
network_access: false
requires_auth: false
Unified Message Extractor for CODITECT v3.0
Extracts ALL data from:
- Native JSONL session files (~/.claude/projects/*.jsonl)
- Export TXT files (from /export command)
Captures ALL 6 entry types:
- user: User messages with todos, tool results, thinking metadata
- assistant: AI responses with token usage, model info, errors
- system: Compaction events, retries, errors
- queue-operation: Command queue operations
- summary: Conversation summaries
- file-history-snapshot: File backup data
Zero data loss - raw JSON preserved for all entries.
Usage:
    python3 scripts/unified-message-extractor.py --jsonl SESSION.jsonl
    python3 scripts/unified-message-extractor.py --export EXPORT.txt
    python3 scripts/unified-message-extractor.py --batch --min-size 10
    python3 scripts/unified-message-extractor.py --merge   # Merge both stores
"""
# Standard-library imports only; this script has no third-party dependencies.
import json
import hashlib
import re
import sys
import os
import subprocess
from pathlib import Path
from datetime import datetime, timezone
from typing import Iterator, Dict, Any, Optional, List
def find_context_storage() -> Path:
    """Find the real context-storage directory, preventing duplicate databases.

    Checks a prioritized list of candidate paths and returns the first one
    whose unified_messages.jsonl already exceeds 1 MB (treated as "contains
    real data"). Falls back to the context-storage directory next to this
    script so coditect-core's own storage is always used.
    """
    script_dir = Path(__file__).resolve().parent      # .../coditect-core/scripts
    core_root = script_dir.parent                     # .../coditect-core
    # Candidate locations, highest priority first.
    candidates = (
        Path("context-storage"),                # already inside coditect-core
        Path(".coditect/context-storage"),      # symlinked from a parent repo
        core_root / "context-storage",          # relative to this script
    )
    # Prefer a candidate whose JSONL store already holds substantial data.
    for candidate in candidates:
        store = candidate / "unified_messages.jsonl"
        if not store.exists():
            continue
        try:
            if store.stat().st_size > 1_000_000:  # >1MB means real data
                return candidate.resolve()
        except (OSError, PermissionError):
            continue
    # Nothing sizable found: default to the script's own repository storage,
    # which prevents creating stores in arbitrary working directories.
    return core_root / "context-storage"
# Dynamic paths based on actual location (resolved once at import time).
_CONTEXT_STORAGE = find_context_storage()
DEFAULT_OUTPUT_DIR = str(_CONTEXT_STORAGE)
DEFAULT_ARCHIVE_DIR = str(_CONTEXT_STORAGE / "exports-archive")
def is_claude_export_file(file_path: str) -> bool:
    """Detect whether *file_path* is a Claude Code /export transcript.

    Claude Code exports carry distinctive markers: a "Claude Code v" version
    string, Unicode box-drawing banner art, a model identifier (Opus/Sonnet),
    a ════ separator, and message markers ("> " user, "⏺ " assistant).
    A file qualifies when its 2 KB header shows at least one signature AND
    the first ~10 KB contain a user or assistant message marker.

    Returns False for unreadable files or anything without the markers.
    """
    signatures = (
        'Claude Code v',   # Version string
        '▐▛███▜▌',         # Banner art (top)
        '▝▜█████▛▘',       # Banner art (middle)
        'Opus 4',          # Model identifier
        'Sonnet 4',        # Model identifier
        'Sonnet 3.5',      # Model identifier
        '═══════',         # Separator line
    )
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as fh:
            head = fh.read(2048)  # header window for signature detection
            if not any(sig in head for sig in signatures):
                return False
            # Signature found — widen the window and look for message markers.
            body = head + fh.read(8192)
            found_user = body.startswith('> ') or '\n> ' in body
            found_assistant = '⏺ ' in body
            return found_user or found_assistant
    except Exception:
        # Treat any read failure as "not an export".
        return False
def detect_file_type(file_path: str) -> Optional[str]:
    """Auto-detect file type: 'jsonl', 'export', or None (unknown).

    Detection order:
      1. A .jsonl extension is trusted without opening the file.
      2. .txt / .md / extensionless files are content-checked for the
         Claude export format.
      3. As a last resort the first line is parsed as a JSON object
         (JSONL heuristic, mainly for extensionless files).
    """
    path = Path(file_path)
    # Check extension first — cheapest signal.
    if path.suffix.lower() == '.jsonl':
        return 'jsonl'
    # For .txt or other candidate files, check content signatures.
    if path.suffix.lower() in ['.txt', '.md', '']:
        if is_claude_export_file(file_path):
            return 'export'
    # JSONL heuristic: first line is a complete JSON object.
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            first_line = f.readline().strip()
            if first_line.startswith('{') and first_line.endswith('}'):
                json.loads(first_line)  # Validate it really parses
                return 'jsonl'
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        # Unreadable or not JSON — fall through to "unknown".
        # (Previously a bare `except:` that also swallowed KeyboardInterrupt.)
        pass
    return None
# Unified output format
def create_unified_message(
    content: str,
    role: str,
    source_type: str,                       # "jsonl" or "export"
    source_file: str,
    source_line: Optional[int] = None,
    session_id: Optional[str] = None,       # JSONL sources only
    checkpoint: Optional[str] = None,       # export sources only
    timestamp: Optional[str] = None,        # ISO timestamp; defaults to "now"
    metadata: Optional[Dict] = None,
    token_usage: Optional[Dict] = None,     # Token accounting (ADR-005)
    agent_context: Optional[Dict] = None    # Agent metadata for MoE/Task outputs
) -> Dict[str, Any]:
    """Create a unified message format from any source.

    Builds the canonical record stored in unified_messages.jsonl:
    a SHA-256 content hash (used for deduplication), the message body and
    role, provenance (where it came from), occurrence/extraction timestamps,
    and lightweight content metadata. token_usage and agent_context are
    attached only when provided and truthy.
    """
    # Generate content hash — sole deduplication key for this legacy format.
    content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()
    now = datetime.now(timezone.utc).isoformat()
    msg = {
        "hash": content_hash,
        "content": content,
        "role": role,
        "provenance": {
            "source_type": source_type,
            "source_file": source_file,
            "source_line": source_line,
            "session_id": session_id,
            "checkpoint": checkpoint,
        },
        "timestamps": {
            # When no original timestamp is known, extraction time stands in.
            "occurred": timestamp or now,
            "extracted_at": now,
        },
        # NOTE(review): `metadata or {...}` means a caller passing an empty
        # dict ({}) silently gets the defaults below — confirm intended.
        "metadata": metadata or {
            "content_length": len(content),
            # NOTE(review): second operand looks like an indented-code check;
            # the literal's exact spacing may have been mangled in transit —
            # confirm against the canonical source.
            "has_code": "```" in content or "    " in content,
            "has_markdown": any(c in content for c in ["#", "**", "- ", "| "]),
        }
    }
    # Add token_usage if provided (ADR-005: Token Accounting)
    if token_usage:
        msg["token_usage"] = token_usage
    # Add agent_context if provided (agent session metadata)
    if agent_context:
        msg["agent_context"] = agent_context
    return msg
def extract_token_usage(entry: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Pull token-accounting data out of a JSONL assistant entry (ADR-005).

    Claude Code assistant entries nest counters under message.usage
    (input_tokens, output_tokens, cache_read_input_tokens,
    cache_creation_input_tokens) with costUSD / durationMs at the top level.

    Returns a normalized dict of counters, cost, duration and model, or
    None when the entry carries neither usage data nor a costUSD field.
    """
    message = entry.get('message', {})
    usage = message.get('usage', {})
    # No usage block and no cost figure — nothing worth reporting.
    if not usage and 'costUSD' not in entry:
        return None
    counter_keys = (
        'input_tokens',
        'output_tokens',
        'cache_read_input_tokens',
        'cache_creation_input_tokens',
    )
    report = {key: usage.get(key, 0) for key in counter_keys}
    report['cost_usd'] = entry.get('costUSD', 0.0)
    report['duration_ms'] = entry.get('durationMs', 0)
    # Model may live on the message or (fallback) on the entry itself.
    report['model'] = message.get('model', entry.get('model'))
    return report
def extract_agent_context(entry: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Extract agent-session metadata from a JSONL entry.

    MoE/Task agent entries carry extra identity fields (agentId, parentUuid,
    isSidechain, slug, sessionId, cwd, version, gitBranch, userType).
    An entry without an agentId is not an agent session.

    Returns a snake_case dict of those fields, or None for non-agent entries.
    """
    if not entry.get('agentId'):
        return None
    # Map output keys to the camelCase keys used in the raw entry.
    field_map = {
        'agent_id': 'agentId',
        'parent_uuid': 'parentUuid',
        'uuid': 'uuid',
        'slug': 'slug',
        'parent_session_id': 'sessionId',  # parent session, not the agent's own
        'cwd': 'cwd',
        'version': 'version',
        'git_branch': 'gitBranch',
        'user_type': 'userType',
    }
    context = {out_key: entry.get(raw_key) for out_key, raw_key in field_map.items()}
    context['is_sidechain'] = entry.get('isSidechain', False)
    return context
def create_comprehensive_entry(
    entry: Dict[str, Any],
    source_file: str,
    source_line: int
) -> Dict[str, Any]:
    """Create a comprehensive entry that captures ALL data from any JSONL entry type.

    Supports all 6 entry types:
    - user, assistant: Conversational messages
    - system: System events (compaction, errors)
    - queue-operation: Command queue
    - summary: Conversation summaries
    - file-history-snapshot: File backups

    Zero data loss - the original JSON line is stored verbatim in 'raw_json'.

    Args:
        entry: One decoded JSONL object from a session file.
        source_file: Path of the JSONL file the entry came from.
        source_line: 1-based line number within that file.

    Returns:
        Flat dict of common identity/session/environment fields plus one
        type-specific sub-dict (message_data / system_data / queue_data /
        summary_data / snapshot_data).
    """
    entry_type = entry.get('type', 'unknown')
    now = datetime.now(timezone.utc).isoformat()
    # Extract a human-readable 'content' string per entry type; remains None
    # when the entry carries no extractable text.
    content = None
    if entry_type in ('user', 'assistant'):
        message = entry.get('message', {})
        if isinstance(message.get('content'), str):
            content = message['content']
        elif isinstance(message.get('content'), list):
            # Content-block list: flatten text / tool_result / tool_use blocks.
            texts = []
            for block in message['content']:
                if isinstance(block, dict):
                    if block.get('type') == 'text':
                        texts.append(block.get('text', ''))
                    elif block.get('type') == 'tool_result':
                        # NOTE(review): block['content'] can itself be a list in
                        # some payloads; [:500] would then slice the list, not a
                        # string — confirm upstream shape.
                        texts.append(f"[Tool Result: {block.get('content', '')[:500]}]")
                    elif block.get('type') == 'tool_use':
                        texts.append(f"[Tool: {block.get('name', 'unknown')}]")
            content = '\n'.join(texts)
        role = message.get('role', entry_type)
    elif entry_type == 'system':
        content = entry.get('content', '')
        role = 'system'
    elif entry_type == 'queue-operation':
        content = entry.get('content', '')
        role = 'queue'
    elif entry_type == 'summary':
        content = entry.get('summary', '')
        role = 'summary'
    elif entry_type == 'file-history-snapshot':
        snapshot = entry.get('snapshot', {})
        files = list(snapshot.get('trackedFileBackups', {}).keys())
        # Summarize: first 5 filenames, then a count of the remainder.
        content = f"File snapshot: {len(files)} files - {', '.join(files[:5])}"
        if len(files) > 5:
            content += f"... and {len(files) - 5} more"
        role = 'snapshot'
    else:
        # Unknown type: keep a truncated JSON dump so nothing is invisible.
        content = json.dumps(entry)[:1000]
        role = 'unknown'
    # Dedup hash mixes type, uuid, timestamp and content so identical text in
    # different entries does not collide. Do not reorder these components —
    # existing stored hashes depend on this exact layout.
    hash_input = f"{entry_type}:{entry.get('uuid', '')}:{entry.get('timestamp', '')}:{content or ''}"
    content_hash = hashlib.sha256(hash_input.encode('utf-8')).hexdigest()
    # Build comprehensive entry — common fields shared by every type.
    result = {
        "hash": content_hash,
        "entry_type": entry_type,
        "content": content,
        "role": role,
        # Identity fields
        "uuid": entry.get('uuid'),
        "parent_uuid": entry.get('parentUuid'),
        "logical_parent_uuid": entry.get('logicalParentUuid'),
        # Session context
        "session_id": entry.get('sessionId'),
        "agent_id": entry.get('agentId'),
        "slug": entry.get('slug'),
        # Environment
        "cwd": entry.get('cwd'),
        "git_branch": entry.get('gitBranch'),
        "version": entry.get('version'),
        # Timestamps
        "timestamp": entry.get('timestamp'),
        "extracted_at": now,
        # Flags
        "is_sidechain": entry.get('isSidechain', False),
        "is_meta": entry.get('isMeta', False),
        "user_type": entry.get('userType'),
        # Source tracking
        "source_file": source_file,
        "source_line": source_line,
        # CRITICAL: Preserve raw JSON for zero data loss
        "raw_json": json.dumps(entry),
    }
    # Type-specific fields
    if entry_type == 'assistant':
        message = entry.get('message', {})
        usage = message.get('usage', {})
        result['message_data'] = {
            'message_id': message.get('id'),
            'request_id': entry.get('requestId'),
            'model': message.get('model'),
            'stop_reason': message.get('stop_reason'),
            'stop_sequence': message.get('stop_sequence'),
            'input_tokens': usage.get('input_tokens', 0),
            'output_tokens': usage.get('output_tokens', 0),
            'cache_read_tokens': usage.get('cache_read_input_tokens', 0),
            'cache_creation_tokens': usage.get('cache_creation_input_tokens', 0),
            'is_api_error': entry.get('isApiErrorMessage', False),
            'error': entry.get('error'),
        }
        # Token cost calculation — only attached when at least one is non-zero.
        cost_usd = entry.get('costUSD', 0.0)
        duration_ms = entry.get('durationMs', 0)
        if cost_usd or duration_ms:
            result['message_data']['cost_usd'] = cost_usd
            result['message_data']['duration_ms'] = duration_ms
    elif entry_type == 'user':
        message = entry.get('message', {})
        # Nested structures are serialized to JSON strings (or None) so the
        # record stays flat and storage-friendly.
        result['message_data'] = {
            'source_tool_use_id': entry.get('sourceToolUseID'),
            'tool_use_result': json.dumps(entry.get('toolUseResult')) if entry.get('toolUseResult') else None,
            'thinking_metadata': json.dumps(entry.get('thinkingMetadata')) if entry.get('thinkingMetadata') else None,
            'todos': json.dumps(entry.get('todos')) if entry.get('todos') else None,
            'is_compact_summary': entry.get('isCompactSummary', False),
            'is_visible_transcript_only': entry.get('isVisibleInTranscriptOnly', False),
        }
    elif entry_type == 'system':
        # Compaction events, retries and errors.
        compact_meta = entry.get('compactMetadata', {})
        result['system_data'] = {
            'subtype': entry.get('subtype'),
            'level': entry.get('level'),
            'compact_trigger': compact_meta.get('trigger'),
            'compact_pre_tokens': compact_meta.get('preTokens'),
            'cause': entry.get('cause'),
            'error': entry.get('error'),
            'max_retries': entry.get('maxRetries'),
            'retry_attempt': entry.get('retryAttempt'),
            'retry_in_ms': entry.get('retryInMs'),
        }
    elif entry_type == 'queue-operation':
        result['queue_data'] = {
            'operation': entry.get('operation'),
        }
    elif entry_type == 'summary':
        result['summary_data'] = {
            'summary_text': entry.get('summary'),
            'leaf_uuid': entry.get('leafUuid'),
        }
    elif entry_type == 'file-history-snapshot':
        snapshot = entry.get('snapshot', {})
        result['snapshot_data'] = {
            'message_id': entry.get('messageId'),
            'is_snapshot_update': entry.get('isSnapshotUpdate', False),
            'snapshot_timestamp': snapshot.get('timestamp'),
            'tracked_files': snapshot.get('trackedFileBackups', {}),
        }
    return result
class JSONLExtractor:
    """Extract messages from native Claude Code JSONL session files.

    The session id is derived from the filename stem. Two extraction modes
    exist: extract() emits the legacy unified-message format for
    conversational entries only; extract_comprehensive() emits every entry
    type with its raw JSON preserved.
    """

    def __init__(self, file_path: str):
        self.file_path = Path(file_path)
        # Claude Code names session files "<session-id>.jsonl".
        self.session_id = self.file_path.stem

    def extract(self) -> Iterator[Dict[str, Any]]:
        """Yield unified messages from JSONL file (legacy format).

        Blank and malformed lines are skipped silently; entry types not
        handled by _parse_entry are dropped.
        """
        with open(self.file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    # Skip corrupt lines rather than aborting the whole file.
                    continue
                # Extract based on entry type
                msg = self._parse_entry(entry, line_num)
                if msg:
                    yield msg

    def extract_comprehensive(self) -> Iterator[Dict[str, Any]]:
        """
        Yield comprehensive entries from JSONL file.
        Captures ALL 6 entry types with ALL fields.
        Zero data loss - raw JSON preserved.
        """
        with open(self.file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Create comprehensive entry for ALL types
                yield create_comprehensive_entry(
                    entry=entry,
                    source_file=str(self.file_path),
                    source_line=line_num
                )

    def _parse_entry(self, entry: Dict, line_num: int) -> Optional[Dict[str, Any]]:
        """Parse a single JSONL entry into unified format.

        Handles user/assistant messages plus top-level tool_use/tool_result
        entries; returns None for anything else (system, summary, etc. —
        those are covered only by extract_comprehensive()).
        """
        entry_type = entry.get('type', '')
        # Extract agent context (None unless the entry carries an agentId).
        agent_context = extract_agent_context(entry)
        # User message
        if entry_type == 'user' or entry.get('role') == 'user':
            content = self._extract_content(entry)
            if content:
                return create_unified_message(
                    content=content,
                    role="user",
                    source_type="jsonl",
                    source_file=str(self.file_path),
                    source_line=line_num,
                    session_id=self.session_id,
                    timestamp=entry.get('timestamp'),
                    agent_context=agent_context,
                )
        # Assistant message
        elif entry_type == 'assistant' or entry.get('role') == 'assistant':
            content = self._extract_content(entry)
            if content:
                # Extract token usage for assistant messages (ADR-005)
                token_usage = extract_token_usage(entry)
                return create_unified_message(
                    content=content,
                    role="assistant",
                    source_type="jsonl",
                    source_file=str(self.file_path),
                    source_line=line_num,
                    session_id=self.session_id,
                    timestamp=entry.get('timestamp'),
                    token_usage=token_usage,
                    agent_context=agent_context,
                )
        # Tool use
        elif entry_type == 'tool_use':
            tool_name = entry.get('name', 'unknown')
            tool_input = entry.get('input', {})
            content = f"[Tool: {tool_name}]\n{json.dumps(tool_input, indent=2)}"
            return create_unified_message(
                content=content,
                role="tool_use",
                source_type="jsonl",
                source_file=str(self.file_path),
                source_line=line_num,
                session_id=self.session_id,
                timestamp=entry.get('timestamp'),
                metadata={"tool_name": tool_name},
                agent_context=agent_context,
            )
        # Tool result
        elif entry_type == 'tool_result':
            content = str(entry.get('content', ''))[:5000]  # Truncate large results
            return create_unified_message(
                content=content,
                role="tool_result",
                source_type="jsonl",
                source_file=str(self.file_path),
                source_line=line_num,
                session_id=self.session_id,
                timestamp=entry.get('timestamp'),
                agent_context=agent_context,
            )
        return None

    def _extract_content(self, entry: Dict) -> Optional[str]:
        """Extract text content from various entry formats.

        Checks, in order: a direct 'content' field (string or block list),
        a nested 'message' object (handled recursively), then a bare 'text'
        field. Returns None when nothing textual is found.
        """
        # Direct content field
        if 'content' in entry:
            content = entry['content']
            if isinstance(content, str):
                return content.strip() if content.strip() else None
            elif isinstance(content, list):
                # Handle content blocks
                texts = []
                for block in content:
                    if isinstance(block, dict):
                        if block.get('type') == 'text':
                            texts.append(block.get('text', ''))
                        elif block.get('type') == 'tool_use':
                            texts.append(f"[Tool: {block.get('name', 'unknown')}]")
                    elif isinstance(block, str):
                        texts.append(block)
                return '\n'.join(texts).strip() if texts else None
        # Message field — recurse into the nested message object.
        if 'message' in entry:
            return self._extract_content(entry['message'])
        # Text field
        if 'text' in entry:
            return entry['text'].strip() if entry['text'] else None
        return None
class ExportTXTExtractor:
    """Extract messages from Claude Code /export TXT files.

    Export transcripts mark user turns with "> ", assistant turns with "⏺ "
    and tool output with an indented "⎿". The checkpoint name is taken from
    the filename stem.
    """

    # Patterns for parsing export format.
    # NOTE(review): these class-level patterns are not referenced by the
    # methods below (extract() uses the line-based _split_into_blocks);
    # kept as-is in case external code uses them.
    USER_PATTERN = re.compile(r'^> (.+?)(?=\n\n⏺|\n\n>|\Z)', re.MULTILINE | re.DOTALL)
    ASSISTANT_PATTERN = re.compile(r'^⏺ (.+?)(?=\n\n⏺|\n\n>|\Z)', re.MULTILINE | re.DOTALL)
    TOOL_PATTERN = re.compile(r'^⏺ (Bash|Read|Write|Edit|Grep|Glob|Task)\((.+?)\)\s*\n\s*⎿\s*(.+?)(?=\n\n⏺|\n\n>|\Z)', re.MULTILINE | re.DOTALL)

    def __init__(self, file_path: str):
        self.file_path = Path(file_path)
        # Exports are identified by checkpoint (filename stem), not session id.
        self.checkpoint = self.file_path.stem

    def extract(self) -> Iterator[Dict[str, Any]]:
        """Yield unified messages from export TXT file."""
        with open(self.file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Remove header (everything before first > or ⏺)
        header_end = min(
            content.find('\n> ') if content.find('\n> ') > 0 else len(content),
            content.find('\n⏺ ') if content.find('\n⏺ ') > 0 else len(content)
        )
        content = content[header_end:]
        # Split into message blocks
        blocks = self._split_into_blocks(content)
        for i, (role, text, line_approx) in enumerate(blocks):
            if not text.strip():
                continue
            yield create_unified_message(
                content=text.strip(),
                role=role,
                source_type="export",
                source_file=str(self.file_path),
                source_line=line_approx,
                checkpoint=self.checkpoint,
            )

    def _split_into_blocks(self, content: str) -> List[tuple]:
        """Split export content into (role, text, line_num) tuples.

        Line-oriented state machine: each marker line starts a new block and
        flushes the previous one; unmarked lines continue the current block.
        line_num is the 0-based index where the block began.
        """
        blocks = []
        lines = content.split('\n')
        current_role = None
        current_text = []
        current_start = 0
        for i, line in enumerate(lines):
            # User message starts with >
            if line.startswith('> '):
                if current_text and current_role:
                    blocks.append((current_role, '\n'.join(current_text), current_start))
                current_role = 'user'
                current_text = [line[2:]]  # Remove "> "
                current_start = i
            # Assistant message/action starts with ⏺
            elif line.startswith('⏺ '):
                if current_text and current_role:
                    blocks.append((current_role, '\n'.join(current_text), current_start))
                # Check if it's a tool call (known tool name followed by "(")
                tool_match = re.match(r'⏺ (Bash|Read|Write|Edit|Grep|Glob|Task|WebFetch|WebSearch)\(', line)
                if tool_match:
                    current_role = 'tool_use'
                else:
                    current_role = 'assistant'
                current_text = [line[2:]]  # Remove "⏺ "
                current_start = i
            # Tool output starts with ⎿
            elif line.startswith(' ⎿'):
                if current_role == 'tool_use':
                    # Tool output belonging to the tool call above — keep inline.
                    current_text.append(line)
                else:
                    # Standalone tool result
                    if current_text and current_role:
                        blocks.append((current_role, '\n'.join(current_text), current_start))
                    current_role = 'tool_result'
                    # NOTE(review): [4:] assumes a 4-character prefix before the
                    # payload; the exact indentation emitted by /export may have
                    # been mangled in transit — confirm against a real export.
                    current_text = [line[4:]]  # Remove " ⎿ "
                    current_start = i
            # Continuation of current block
            elif current_role:
                current_text.append(line)
        # Don't forget the last block
        if current_text and current_role:
            blocks.append((current_role, '\n'.join(current_text), current_start))
        return blocks
class UnifiedMessageStore:
    """Unified storage for deduplicated messages from all sources.

    Persists three files under *store_path*:
      - unified_hashes.json:    all content hashes ever stored (dedup set)
      - unified_messages.jsonl: append-only log of unique messages
      - unified_stats.json:     processing counters for the current run
    """

    def __init__(self, store_path: str):
        self.store_path = Path(store_path)
        self.store_path.mkdir(parents=True, exist_ok=True)
        self.hashes_file = self.store_path / 'unified_hashes.json'
        self.messages_file = self.store_path / 'unified_messages.jsonl'
        self.stats_file = self.store_path / 'unified_stats.json'
        self.hashes = self._load_hashes()
        # Per-run counters; unique_count reflects all-time uniques on disk.
        self.stats = {
            'total_processed': 0,
            'unique_count': len(self.hashes),
            'duplicates_filtered': 0,
            'sources': {'jsonl': 0, 'export': 0}
        }

    def _load_hashes(self) -> set:
        """Load existing hashes; accepts both the legacy bare-list format
        and the current {'hashes': [...]} wrapper."""
        if self.hashes_file.exists():
            with open(self.hashes_file, 'r') as f:
                data = json.load(f)
                if isinstance(data, list):
                    return set(data)
                return set(data.get('hashes', []))
        return set()

    def add_message(self, msg: Dict[str, Any]) -> bool:
        """Add message if unique. Returns True if new.

        Robust to records without provenance (e.g. comprehensive-format
        entries): previously `msg['provenance']['source_type']` raised
        KeyError for them; they are now counted under source 'unknown'.
        """
        msg_hash = msg['hash']
        self.stats['total_processed'] += 1
        provenance = msg.get('provenance') or {}
        source_type = provenance.get('source_type', 'unknown')
        self.stats['sources'][source_type] = self.stats['sources'].get(source_type, 0) + 1
        if msg_hash in self.hashes:
            self.stats['duplicates_filtered'] += 1
            return False
        self.hashes.add(msg_hash)
        self.stats['unique_count'] = len(self.hashes)
        # Append to messages file
        with open(self.messages_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(msg) + '\n')
        return True

    def save(self):
        """Persist the hash set and run statistics to disk."""
        with open(self.hashes_file, 'w') as f:
            json.dump({
                'updated': datetime.now(timezone.utc).isoformat(),
                'total_unique': len(self.hashes),
                'hashes': list(self.hashes)
            }, f, indent=2)
        with open(self.stats_file, 'w') as f:
            json.dump(self.stats, f, indent=2)
        print(f"\n✅ Saved {len(self.hashes):,} unique messages to {self.store_path}")
def process_jsonl(file_path: str, store: UnifiedMessageStore) -> Dict:
    """Run the legacy JSONL extractor over one session file.

    Every extracted message is offered to *store* (which deduplicates).
    Returns a summary dict: total messages seen, how many were new, and
    the processed file path.
    """
    seen = 0
    added = 0
    for message in JSONLExtractor(file_path).extract():
        seen += 1
        if store.add_message(message):
            added += 1
    return {'total': seen, 'new': added, 'file': file_path}
def get_file_hash(file_path: str) -> str:
    """Return the SHA-256 hex digest of a file's bytes.

    Reads in 64 KiB chunks so arbitrarily large files can be hashed without
    loading them into memory. (The redundant function-local `import hashlib`
    was removed — hashlib is already imported at module level.)
    """
    hasher = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            hasher.update(chunk)
    return hasher.hexdigest()
def archive_export_file(file_path: str, archive_dir: str = None) -> Optional[str]:
    """Move a processed export file into the archive, deduplicating by content hash.

    Uses DEFAULT_ARCHIVE_DIR when *archive_dir* is not given.

    Args:
        file_path: Path to the export file to archive.
        archive_dir: Archive destination directory.

    Returns:
        The new path if archived, the string 'duplicate' if identical content
        already exists (the source is deleted), or None on failure.
    """
    if archive_dir is None:
        archive_dir = DEFAULT_ARCHIVE_DIR
    try:
        source = Path(file_path)
        target_dir = Path(archive_dir)
        target_dir.mkdir(parents=True, exist_ok=True)
        # Hash the source once; compare against every archived .txt file.
        source_hash = get_file_hash(str(source))
        for archived in target_dir.glob('*.txt'):
            if get_file_hash(str(archived)) == source_hash:
                # Identical content already archived — drop the source.
                source.unlink()
                return 'duplicate'
        # Same name but different content: pick the next free "-N" suffix.
        destination = target_dir / source.name
        serial = 0
        while destination.exists():
            serial += 1
            destination = target_dir / f"{source.stem}-{serial}{source.suffix}"
        import shutil
        shutil.move(str(source), str(destination))
        return str(destination)
    except Exception as e:
        print(f" ⚠️ Failed to archive {file_path}: {e}")
        return None
def process_export(file_path: str, store: UnifiedMessageStore, archive: bool = True, archive_dir: str = None) -> Dict:
    """Process a single export TXT file.

    Args:
        file_path: Path to export file.
        store: UnifiedMessageStore instance receiving the messages.
        archive: If True, move the file into the archive after processing.
        archive_dir: Archive destination (DEFAULT_ARCHIVE_DIR when None).

    Returns:
        Dict with 'total', 'new', 'file' and 'archived' (new path, 'duplicate',
        or None when archiving was skipped or failed).
    """
    if archive_dir is None:
        archive_dir = DEFAULT_ARCHIVE_DIR
    seen = 0
    added = 0
    for message in ExportTXTExtractor(file_path).extract():
        seen += 1
        if store.add_message(message):
            added += 1
    outcome = {'total': seen, 'new': added, 'file': file_path, 'archived': None}
    # Only archive files that actually yielded messages.
    if archive and seen > 0:
        outcome['archived'] = archive_export_file(file_path, archive_dir)
    return outcome
def find_jsonl_files(min_size_mb: float = 0) -> List[Path]:
    """Return all JSONL session files under ~/.claude/projects, largest first.

    Args:
        min_size_mb: Minimum file size in megabytes; 0 keeps everything.
    """
    projects_dir = Path.home() / '.claude' / 'projects'
    if not projects_dir.exists():
        return []
    matches = [
        candidate
        for candidate in projects_dir.rglob('*.jsonl')
        if candidate.stat().st_size / (1024 * 1024) >= min_size_mb
    ]
    return sorted(matches, key=lambda p: p.stat().st_size, reverse=True)
def find_git_root() -> Optional[Path]:
    """Return the git repository root for the current directory, or None.

    None is returned both when the cwd is not inside a git work tree
    (non-zero exit) and when the git binary is not installed.
    """
    try:
        proc = subprocess.run(
            ['git', 'rev-parse', '--show-toplevel'],
            capture_output=True, text=True, check=True
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None
    return Path(proc.stdout.strip())
def find_export_files(search_paths: List[str] = None, verify_content: bool = True) -> List[Path]:
    """
    Find all Claude Code export TXT files.

    Searches from the ACTUAL current working directory (where the command was
    invoked), not from where the script is located. Also searches parent
    directories up to the git root to find exports anywhere in the project tree.

    Args:
        search_paths: Directories to search (if None, auto-discovers from cwd)
        verify_content: If True, verify each file is actually a Claude export
            by checking for signature patterns (slower but accurate)

    Returns:
        List of verified export file paths, newest (by mtime) first.

    NOTE(review): the auto-discovered *search_paths* list is built but the
    recursive scan below walks *search_root* instead — presumably search_paths
    feeds a different consumer or is vestigial; confirm.
    """
    if search_paths is None:
        # Use ACTUAL current working directory (where user ran command)
        cwd = Path(os.getcwd()).resolve()
        git_root = find_git_root()
        search_paths = [
            str(cwd),  # Current directory where command was run
        ]
        # Add parent directories up to 5 levels (handles submodule case).
        # This ensures we find exports in parent repos even when in a submodule.
        current = cwd
        for _ in range(5):
            if current.parent != current:
                current = current.parent
                if str(current) not in search_paths:
                    search_paths.append(str(current))
            else:
                break
        # Also check common export locations relative to cwd, git root, and parents
        bases = [cwd]
        if git_root:
            bases.append(git_root)
        # Add parents as bases too
        current = cwd
        for _ in range(3):
            if current.parent != current:
                current = current.parent
                bases.append(current)
        for base in bases:
            if base:
                for subdir in ['MEMORY-CONTEXT/exports-archive', 'MEMORY-CONTEXT',
                               'context-storage/exports-archive']:
                    candidate = base / subdir
                    if candidate.exists() and str(candidate) not in search_paths:
                        search_paths.append(str(candidate))
    candidates = []
    # Directories to exclude from recursive search (performance + irrelevant)
    exclude_dirs = {
        'node_modules', '.git', '.venv', 'venv', '__pycache__', '.mypy_cache',
        '.pytest_cache', 'dist', 'build', '.tox', '.eggs', '*.egg-info',
        'exports-archive', 'archive',  # Already processed
    }
    # First pass: find candidate files by recursive search.
    # Search from git root to find exports anywhere in the project tree.
    git_root = find_git_root()
    search_root = git_root if git_root else Path(os.getcwd())
    # Also check rollout-master parent if we're in a submodule (detected by a
    # 'submodules' directory two levels up).
    potential_rollout = search_root.parent.parent
    if (potential_rollout / 'submodules').exists():
        search_root = potential_rollout

    def should_exclude(path: Path) -> bool:
        """Check if path should be excluded from search (any component matches
        the exclusion set or looks like an .egg-info directory)."""
        parts = path.parts
        for part in parts:
            if part in exclude_dirs or part.endswith('.egg-info'):
                return True
        return False

    # Recursive search for .txt files
    try:
        for f in search_root.rglob('*.txt'):
            if not should_exclude(f):
                candidates.append(f)
    except PermissionError:
        pass  # Skip directories we can't access
    # Remove duplicates
    candidates = list(set(candidates))
    # Filter out files already in archive directories to prevent re-processing.
    # This prevents the -1-1-1... filename proliferation bug.
    archive_patterns = ['exports-archive', 'archive']
    filtered_candidates = []
    for f in candidates:
        path_str = str(f.resolve())
        if not any(pattern in path_str for pattern in archive_patterns):
            filtered_candidates.append(f)
    candidates = filtered_candidates
    # Second pass: verify each file is actually a Claude export
    if verify_content:
        verified = []
        for f in candidates:
            try:
                if is_claude_export_file(str(f)):
                    verified.append(f)
            except Exception:
                continue
        return sorted(verified, key=lambda f: f.stat().st_mtime, reverse=True)
    return sorted(candidates, key=lambda f: f.stat().st_mtime, reverse=True)
def main():
    """CLI entry point for the unified message extractor.

    Modes, chosen from the parsed arguments:
      * positional FILE     -- auto-detect JSONL vs export TXT and process it
      * --jsonl / --export  -- process one file of an explicitly stated type
      * (no file args)      -- batch-process every discovered JSONL + export
      * --verify FILE       -- only report export-detection results, then exit
      * --merge             -- additionally import the legacy dedup stores

    Unless --no-index is given, the function then shells out to the sibling
    context-db.py script to rebuild the search indexes (messages, entries,
    knowledge, components, docs, and optionally embeddings).
    """
    import argparse

    class CustomFormatter(argparse.RawDescriptionHelpFormatter):
        """RawDescription formatter with a wider column for long option names."""

        def __init__(self, prog):
            super().__init__(prog, max_help_position=35, width=100)

    epilog = f"""
EXAMPLES:
  %(prog)s                      Process ALL JSONL + exports (default)
  %(prog)s session.txt          Process single file (auto-detect)
  %(prog)s --min-size 10        Only JSONL files >10MB
  %(prog)s --dry-run            Preview what would be processed
  %(prog)s file.txt --verify    Test if file is Claude export
  %(prog)s --no-archive         Keep exports in place after processing

STORAGE LOCATIONS:
  Extracted messages: {DEFAULT_OUTPUT_DIR}/unified_messages.jsonl
  Export archive:     {DEFAULT_ARCHIVE_DIR}/
  JSONL sessions:     ~/.claude/projects/ (READ-ONLY)

WORKFLOW:
  /export          Export current session to TXT
  /cx              Extract all + archive exports (this command)
  /cxq "search"    Search the database
"""
    parser = argparse.ArgumentParser(
        description='Context Extraction - Extract messages from JSONL sessions and export TXT files',
        epilog=epilog,
        formatter_class=CustomFormatter
    )
    # Positional file argument
    parser.add_argument('file', nargs='?', metavar='FILE',
                        help='Single file to process (auto-detects JSONL or export TXT)')
    # Processing options
    proc_group = parser.add_argument_group('Processing Options')
    proc_group.add_argument('--jsonl', metavar='FILE',
                            help='Process single JSONL file (explicit type)')
    proc_group.add_argument('--export', metavar='FILE',
                            help='Process single export TXT file (explicit type)')
    proc_group.add_argument('--min-size', type=float, default=0, metavar='MB',
                            help='Min JSONL size in MB for batch (default: 0 = all)')
    proc_group.add_argument('--merge', action='store_true',
                            help='Merge existing legacy dedup stores')
    # Archive options
    archive_group = parser.add_argument_group('Archive Options')
    archive_group.add_argument('--no-archive', action='store_true',
                               help='Keep export files in place (do not archive)')
    archive_group.add_argument('--archive-dir', metavar='DIR',
                               default=DEFAULT_ARCHIVE_DIR,
                               help=f'Archive directory (default: {DEFAULT_ARCHIVE_DIR})')
    # Output options
    output_group = parser.add_argument_group('Output Options')
    output_group.add_argument('--output', '-o', metavar='DIR',
                              default=DEFAULT_OUTPUT_DIR,
                              help=f'Output directory (default: {DEFAULT_OUTPUT_DIR})')
    output_group.add_argument('--dry-run', '-n', action='store_true',
                              help='Show what would be processed without changes')
    output_group.add_argument('--verify', '-v', action='store_true',
                              help='Test if file is Claude export (detection test)')
    # Auto-indexing options
    index_group = parser.add_argument_group('Indexing Options')
    index_group.add_argument('--no-index', action='store_true',
                             help='Skip auto-indexing after extraction (indexing is ON by default)')
    index_group.add_argument('--with-embeddings', action='store_true',
                             help='Also generate semantic embeddings for RAG search (slower)')
    args = parser.parse_args()

    # Batch is the default mode when no specific file was given.
    do_batch = not args.file and not args.jsonl and not args.export

    # --verify: only run the file-type detection, print the verdict, and exit.
    if args.verify:
        file_path = args.file
        if not file_path:
            print("Error: --verify requires a file argument")
            print("Usage: python3 unified-message-extractor.py FILE --verify")
            sys.exit(1)
        file_type = detect_file_type(file_path)
        is_export = is_claude_export_file(file_path)
        print(f"File: {file_path}")
        print(f"Detected type: {file_type or 'unknown'}")
        print(f"Is Claude export: {is_export}")
        if is_export:
            # Show the header so the user can see what the detector matched on.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                header = f.read(500)
            print(f"\nHeader preview:\n{header[:300]}...")
        return

    # Initialize the dedup store (loads previously seen message hashes).
    store = UnifiedMessageStore(args.output)
    print("=" * 60)
    print("CODITECT Unified Message Extractor")
    print("=" * 60)
    print()
    print("Storage Locations:")
    print(f" Extracted messages: {args.output}/unified_messages.jsonl")
    print(f" Export archive: {args.archive_dir}/")
    print(f" JSONL sessions: ~/.claude/projects/ (READ-ONLY, never moved)")
    print()
    print(f"Existing unique messages: {len(store.hashes):,}")
    print()
    if args.dry_run:
        print("DRY RUN - No changes will be made\n")

    results = []

    # --- Single file with auto-detection (positional argument) ---
    if args.file and not args.verify:
        file_type = detect_file_type(args.file)
        if args.dry_run:
            print(f"Would process {args.file} (detected: {file_type or 'unknown'})")
        elif file_type == 'jsonl':
            print(f"Processing JSONL (auto-detected): {args.file}")
            result = process_jsonl(args.file, store)
            results.append(result)
            print(f" → {result['new']:,} new / {result['total']:,} total")
        elif file_type == 'export':
            archive = not args.no_archive
            print(f"Processing export (auto-detected): {args.file}")
            result = process_export(args.file, store, archive=archive, archive_dir=args.archive_dir)
            results.append(result)
            print(f" → {result['new']:,} new / {result['total']:,} total")
            if result.get('archived'):
                if result['archived'] == 'duplicate':
                    print(f" → Deleted (duplicate content already in archive)")
                else:
                    print(f" → Archived to: {result['archived']}")
        else:
            print(f"⚠️ Unknown file type: {args.file}")
            print(" Not a Claude Code export or JSONL session file")

    # --- Single JSONL file (explicit type) ---
    if args.jsonl:
        if args.dry_run:
            print(f"Would process JSONL: {args.jsonl}")
        else:
            print(f"Processing JSONL: {args.jsonl}")
            result = process_jsonl(args.jsonl, store)
            results.append(result)
            print(f" → {result['new']:,} new / {result['total']:,} total")

    # --- Single export file (explicit type) ---
    if args.export:
        archive = not args.no_archive
        if args.dry_run:
            print(f"Would process export: {args.export}")
            if archive:
                print(f" Would archive to: {args.archive_dir}")
        else:
            print(f"Processing export: {args.export}")
            result = process_export(args.export, store, archive=archive, archive_dir=args.archive_dir)
            results.append(result)
            print(f" → {result['new']:,} new / {result['total']:,} total")
            if result.get('archived'):
                if result['archived'] == 'duplicate':
                    print(f" → Deleted (duplicate content already in archive)")
                else:
                    print(f" → Archived to: {result['archived']}")

    # --- Batch mode: process everything (default when no file specified) ---
    if do_batch:
        jsonl_files = find_jsonl_files(args.min_size)
        export_files = find_export_files()
        # Show where exports are being searched for.
        cwd = Path(os.getcwd()).resolve()
        git_root = find_git_root()
        print(f"Search context:")
        print(f" Current directory: {cwd}")
        if git_root:
            print(f" Git root: {git_root}")
        print()
        print(f"Found {len(jsonl_files)} JSONL files (>= {args.min_size} MB)")
        print(f"Found {len(export_files)} export files")
        print()
        if args.dry_run:
            for f in jsonl_files[:10]:
                size = f.stat().st_size / (1024*1024)
                print(f" JSONL: {f.name} ({size:.1f} MB)")
            if len(jsonl_files) > 10:
                print(f" ... and {len(jsonl_files) - 10} more")
            print()
            for f in export_files[:10]:
                print(f" Export: {f.name}")
            if len(export_files) > 10:
                print(f" ... and {len(export_files) - 10} more")
        else:
            # Process JSONL files.
            print("Processing JSONL files...")
            for i, f in enumerate(jsonl_files, 1):
                size = f.stat().st_size / (1024*1024)
                print(f" [{i}/{len(jsonl_files)}] {f.name} ({size:.1f} MB)", end='', flush=True)
                result = process_jsonl(str(f), store)
                results.append(result)
                print(f" → {result['new']:,} new")
            # Process export files.
            archive = not args.no_archive
            archived_count = 0
            duplicate_count = 0
            print("\nProcessing export files...")
            if archive:
                print(f" (archiving to: {args.archive_dir})")
            for i, f in enumerate(export_files, 1):
                print(f" [{i}/{len(export_files)}] {f.name}", end='', flush=True)
                result = process_export(str(f), store, archive=archive, archive_dir=args.archive_dir)
                results.append(result)
                if result.get('archived'):
                    if result['archived'] == 'duplicate':
                        duplicate_count += 1
                    else:
                        archived_count += 1
                print(f" → {result['new']:,} new")
            if archive and (archived_count > 0 or duplicate_count > 0):
                print(f"\n ✓ Archived {archived_count} export files to {args.archive_dir}")
                if duplicate_count > 0:
                    print(f" ✓ Deleted {duplicate_count} duplicate files (content already in archive)")

    # --- Merge legacy dedup stores into the unified store ---
    if args.merge:
        print("Merging existing stores...")
        # Import from the legacy JSONL-processor store.
        jsonl_store = Path('MEMORY-CONTEXT/dedup_state/unique_messages.jsonl')
        if jsonl_store.exists():
            print(f" Importing from {jsonl_store}...")
            count = 0
            with open(jsonl_store, 'r', encoding='utf-8') as f:
                for line in f:
                    try:
                        msg = json.loads(line)
                        # Convert the legacy record to the unified format.
                        unified = create_unified_message(
                            content=msg.get('content', ''),
                            role=msg.get('entry_type', 'unknown'),
                            source_type='jsonl',
                            source_file=msg.get('source_file', ''),
                            source_line=msg.get('source_line'),
                            session_id=msg.get('source_session'),
                            timestamp=msg.get('timestamp'),
                        )
                        if store.add_message(unified):
                            count += 1
                    except Exception:
                        # Best-effort import: skip malformed lines.
                        continue
            print(f" → {count:,} new from JSONL store")
        # Import from the legacy export-dedup store.
        export_store = Path('../../../MEMORY-CONTEXT/dedup_state/unique_messages.jsonl')
        if export_store.exists():
            print(f" Importing from {export_store}...")
            count = 0
            with open(export_store, 'r', encoding='utf-8') as f:
                for line in f:
                    try:
                        msg = json.loads(line)
                        inner = msg.get('message', {})
                        unified = create_unified_message(
                            content=inner.get('content', ''),
                            role=inner.get('role', 'unknown'),
                            source_type='export',
                            source_file='',
                            checkpoint=msg.get('checkpoint'),
                            timestamp=msg.get('first_seen'),
                        )
                        if store.add_message(unified):
                            count += 1
                    except Exception:
                        # Best-effort import: skip malformed lines.
                        continue
            print(f" → {count:,} new from export store")

    # --- Persist results and print the summary ---
    if not args.dry_run and (results or args.merge):
        store.save()
    print("\n" + "=" * 60)
    print("Summary")
    print("=" * 60)
    print(f"Total processed: {store.stats['total_processed']:,}")
    print(f"Unique messages: {store.stats['unique_count']:,}")
    print(f"Duplicates filtered: {store.stats['duplicates_filtered']:,}")
    print(f"From JSONL: {store.stats['sources']['jsonl']:,}")
    print(f"From exports: {store.stats['sources']['export']:,}")
    if store.stats['total_processed'] > 0:
        dedup_rate = store.stats['duplicates_filtered'] / store.stats['total_processed'] * 100
        print(f"Deduplication rate: {dedup_rate:.1f}%")

    # --- Auto-indexing (default ON, skip with --no-index) ---
    if not args.no_index:
        print("\n" + "=" * 60)
        print("Auto-Indexing")
        print("=" * 60)
        script_dir = Path(__file__).parent
        context_db = script_dir / "context-db.py"
        if not context_db.exists():
            print("⚠️ context-db.py not found, skipping auto-index")
        else:
            def _run_db_step(flag, keywords, timeout, ok_msg, fail_label,
                             timeout_msg, err_label):
                """Run one context-db.py sub-command and echo its key stat lines.

                flag        -- CLI flag passed to context-db.py (e.g. '--index')
                keywords    -- lowercase substrings selecting stdout lines to echo
                timeout     -- seconds before the subprocess is killed
                ok_msg      -- message printed on exit code 0
                fail_label  -- label used in the non-zero-exit-code message
                timeout_msg -- message printed on TimeoutExpired
                err_label   -- label used in the generic-exception message
                """
                try:
                    result = subprocess.run(
                        ["python3", str(context_db), flag],
                        capture_output=True,
                        text=True,
                        timeout=timeout
                    )
                    # Echo only the stats lines the user cares about.
                    for line in result.stdout.split('\n'):
                        if any(x in line.lower() for x in keywords):
                            print(f" {line.strip()}")
                    if result.returncode == 0:
                        print(f" {ok_msg}")
                    else:
                        print(f" ⚠️ {fail_label} returned code {result.returncode}")
                except subprocess.TimeoutExpired:
                    print(f" {timeout_msg}")
                except Exception as e:
                    print(f" ⚠️ {err_label} error: {e}")

            # Step 1: index messages into SQLite (legacy messages table).
            print("\n📥 Indexing messages into SQLite...")
            _run_db_step('--index', ['inserted', 'total', 'skipped'], 300,
                         "✅ Index complete", "Index",
                         "⚠️ Index timed out after 5 minutes", "Index")

            # Step 1b: index comprehensive entries (ALL 6 types - ADR-025).
            print("\n📥 Indexing comprehensive entries (all 6 types)...")
            _run_db_step('--index-comprehensive',
                         ['user:', 'assistant:', 'system:', 'summary:',
                          'queue', 'file-history', 'total', 'indexed', 'skipped'],
                         600,
                         "✅ Comprehensive index complete (all 6 entry types)",
                         "Comprehensive index",
                         # Fixed message: this step's timeout is 600 s (10 min),
                         # but the old message claimed "5 minutes".
                         "⚠️ Index timed out after 10 minutes", "Index")

            # Step 2: extract knowledge (decisions, patterns, errors).
            print("\n🧠 Extracting knowledge (decisions, patterns, errors)...")
            _run_db_step('--extract',
                         ['decision', 'pattern', 'error', 'extracted', 'total'],
                         600,
                         "✅ Extraction complete", "Extraction",
                         "⚠️ Extraction timed out after 10 minutes", "Extraction")

            # Step 3: index components (agents, commands, skills, scripts).
            print("\n🧩 Indexing components...")
            _run_db_step('--index-components',
                         ['agent', 'command', 'skill', 'script',
                          'indexed', 'total', 'component'],
                         300,
                         "✅ Components indexed", "Component index",
                         "⚠️ Component index timed out", "Component index")

            # Step 4: index documentation.
            print("\n📚 Indexing documentation...")
            _run_db_step('--index-docs',
                         ['doc', 'indexed', 'total', 'guide', 'reference'],
                         300,
                         "✅ Documentation indexed", "Doc index",
                         "⚠️ Doc index timed out", "Doc index")

            # Step 5: generate embeddings (only with --with-embeddings).
            if args.with_embeddings:
                print("\n🔮 Generating semantic embeddings...")
                print(" (This may take several minutes for large databases)")
                _run_db_step('--embeddings',
                             ['embedding', 'progress', 'complete', 'coverage'],
                             1800,
                             "✅ Embeddings complete", "Embeddings",
                             "⚠️ Embeddings timed out after 30 minutes", "Embeddings")
            else:
                print("\n💡 Tip: Use --with-embeddings to also generate semantic embeddings")

            print("\n" + "=" * 60)
            print("✅ AUTO-INDEXING COMPLETE")
            print("=" * 60)
            print("\nIndexed tables:")
            print(" ✓ messages (conversation history)")
            print(" ✓ entries (all 6 JSONL types)")
            print(" ✓ decisions, patterns, errors (knowledge)")
            print(" ✓ components (agents, commands, skills)")
            print(" ✓ docs (documentation)")
            print("\nQuery commands:")
            print(" /cxq --recent 50 # Recent messages")
            print(" /cxq --unified \"query\" # Search ALL tables")
            print(" /cxq --decisions # View decisions")
            print(" /cxq --patterns # View patterns")
            print(" /cxq --components # View components")
            print(" /cxq --graph-stats # Knowledge graph stats")
    else:
        print("\n⏭️ Skipping auto-index (--no-index specified)")
        print(" Run manually: /cxq --index && /cxq --extract")
# Script entry point: run the CLI only when executed directly, not on import.
# (Restored the dunders: `if name == 'main'` raises NameError at runtime.)
if __name__ == '__main__':
    main()