#!/usr/bin/env python3
"""
Claude Conversation Export Deduplicator

Hybrid deduplication system for Claude Code conversation exports that combines:

  • Sequence number tracking (primary deduplication mechanism)
  • Content hashing (secondary, catches exact duplicates)
  • Append-only log (persistent storage with zero data loss)
  • Idempotent processing (safe to re-run on same exports)

Solves the rapid storage growth problem in multi-day sessions, where each
export repeats the full history of all earlier ones:

  • Day 1: 13KB export
  • Day 2: 51KB export (cumulative)
  • Day 3: 439KB export (cumulative with full history)

Expected storage reduction: 95%+ through deduplication.

Author: Claude + AZ1.AI
License: MIT
"""

import hashlib
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Configure logging: script-wide format with INFO as the default level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Module logger named after this module (``__name__``), used throughout.
logger = logging.getLogger(__name__)

class ClaudeConversationDeduplicator:
    """
    Hybrid deduplication for Claude conversation exports.

    Combines:
    - Sequence number tracking (primary)
    - Content hashing (secondary, for exact duplicates)
    - Append-only log (persistence)
    - Idempotent processing (safety)

    State lives in three files under ``storage_dir``:
    - ``watermarks.json``: highest accepted message index per conversation
    - ``content_hashes.json``: accepted message hashes per conversation
    - ``conversation_log.jsonl``: one JSON event per accepted message

    Usage:
        dedup = ClaudeConversationDeduplicator(storage_dir='dedup_state')

        # Process first export (returns new messages AND a stats dict)
        new_msgs, stats = dedup.process_export('session-1', export_data)
        print(f"Added {len(new_msgs)} new messages")

        # Process second export (with duplicates)
        new_msgs, stats = dedup.process_export('session-1', export_data_2)
        print(f"Added {len(new_msgs)} new messages (duplicates filtered)")

        # Get full conversation
        full = dedup.get_full_conversation('session-1')

        # Get statistics
        stats = dedup.get_statistics('session-1')
    """

    def __init__(self, storage_dir: str):
        """
        Initialize deduplicator with persistent storage directory.

        Args:
            storage_dir: Path to directory for state files; created
                (including parents) if it does not exist.
        """
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)

        # State files
        self.watermarks_file = self.storage_dir / "watermarks.json"
        self.content_hashes_file = self.storage_dir / "content_hashes.json"
        self.log_file = self.storage_dir / "conversation_log.jsonl"

        # Load state (missing or corrupted files fall back to empty dicts)
        self.watermarks = self._load_json(self.watermarks_file, default={})
        self.content_hashes = self._load_json(self.content_hashes_file, default={})

        logger.info(f"Deduplicator initialized with storage at {self.storage_dir}")
        logger.info(f"Loaded {len(self.watermarks)} conversation watermarks")

    def process_export(
        self, conversation_id: str, export_data: Dict[str, Any], dry_run: bool = False
    ) -> Tuple[List[Dict], Dict[str, Any]]:
        """
        Process a Claude conversation export, returning only new unique messages.

        Messages are visited in ascending ``index`` order (a missing index
        counts as 0). A message is skipped when its index is at or below the
        conversation's watermark, or when its content hash was already seen.
        Each accepted message is appended to the log immediately; watermark
        and hashes are persisted once, after the loop.

        Args:
            conversation_id: Unique identifier for the conversation/session
            export_data: Export dict with a 'messages' list
            dry_run: If True, neither the log nor the state files (nor the
                in-memory state) are updated

        Returns:
            Tuple of (new_messages, statistics)
            - new_messages: List of new unique messages not seen before
            - statistics: Dict with processing stats
        """
        messages = export_data.get("messages", [])
        new_messages: List[Dict] = []
        duplicates_filtered = 0
        content_collisions = 0

        # Work on local copies; self.* state is only touched after the loop.
        watermark = self.watermarks.get(conversation_id, -1)
        seen_hashes = set(self.content_hashes.get(conversation_id, []))
        original_seen_count = len(seen_hashes)

        logger.info(f"Processing export for '{conversation_id}'")
        logger.info(f" Current watermark: {watermark}")
        logger.info(f" Messages in export: {len(messages)}")
        logger.info(f" Known unique hashes: {len(seen_hashes)}")

        for msg in sorted(messages, key=lambda m: m.get("index", 0)):
            msg_index = msg.get("index", 0)

            # Check 1: Sequence number (primary deduplication)
            if msg_index <= watermark:
                duplicates_filtered += 1
                continue  # Already processed by sequence

            # Check 2: Content hash (catch exact duplicates)
            # NOTE(review): the hash includes the index, so this only fires
            # when stored hashes and watermark disagree (e.g. edited state).
            content_hash = self._create_message_hash(msg)
            if content_hash in seen_hashes:
                content_collisions += 1
                logger.warning(
                    f"Content collision at index {msg_index}: "
                    f"Same content as earlier message (hash: {content_hash[:8]}...)"
                )
                continue

            # New unique message - add to results
            new_messages.append(msg)
            seen_hashes.add(content_hash)
            watermark = max(watermark, msg_index)

            # Log first, state later: a crash between the two may re-log a
            # message on re-run, but never advances the watermark past it.
            if not dry_run:
                self._append_to_log(conversation_id, msg, content_hash)

        if new_messages and not dry_run:
            self.watermarks[conversation_id] = watermark
            # Sorted so the persisted JSON is deterministic across runs.
            self.content_hashes[conversation_id] = sorted(seen_hashes)
            self._save_state()

        stats = {
            "conversation_id": conversation_id,
            "messages_in_export": len(messages),
            "new_messages": len(new_messages),
            "duplicates_filtered": duplicates_filtered,
            "content_collisions": content_collisions,
            "new_watermark": watermark,
            "total_unique_messages": len(seen_hashes),
            "new_hashes_added": len(seen_hashes) - original_seen_count,
        }

        logger.info("Processing complete:")
        logger.info(f" New messages: {stats['new_messages']}")
        logger.info(f" Duplicates filtered: {stats['duplicates_filtered']}")
        logger.info(f" Content collisions: {stats['content_collisions']}")
        logger.info(f" New watermark: {stats['new_watermark']}")

        return new_messages, stats

    def _create_message_hash(self, message: Dict[str, Any]) -> str:
        """
        Create SHA-256 hash of message content for deduplication.

        Only role, content and index are hashed, so other fields (such as
        timestamps) do not affect the result. Role is read from "type",
        falling back to "role"; content from "message", falling back to
        "content".

        Args:
            message: Message dict

        Returns:
            Hex digest of SHA-256 hash
        """
        normalized = {
            "role": message.get("type", message.get("role")),
            "content": message.get("message", message.get("content")),
            "index": message.get("index", 0),
        }
        # sort_keys makes the serialization (and thus the hash) key-order independent
        content_str = json.dumps(normalized, sort_keys=True)
        return hashlib.sha256(content_str.encode()).hexdigest()

    def _append_to_log(
        self, conversation_id: str, message: Dict[str, Any], content_hash: str
    ) -> None:
        """
        Append one message event to the append-only JSONL log.

        The log provides:
        - Persistent storage of all unique messages
        - Audit trail of when messages were added (UTC, timezone-aware)
        - Recovery mechanism if state files are corrupted

        Args:
            conversation_id: Conversation identifier
            message: Message dict to append
            content_hash: Pre-computed content hash

        Raises:
            Any exception from serialization or file I/O, after logging it.
        """
        event = {
            "conversation_id": conversation_id,
            # Aware UTC timestamp; datetime.utcnow() is deprecated and naive.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "content_hash": content_hash,
            "message": message,
        }

        try:
            with open(self.log_file, "a", encoding="utf-8") as f:
                f.write(json.dumps(event) + "\n")
        except Exception as e:
            logger.error(f"Failed to append to log: {e}")
            raise

    def _save_state(self) -> None:
        """
        Persist watermarks and content hashes.

        Each file is written atomically (temp file, then rename), so neither
        is left half-written. The two files are written one after the other,
        not as a single transaction.
        """
        try:
            self._save_json(self.watermarks_file, self.watermarks)
            self._save_json(self.content_hashes_file, self.content_hashes)
            logger.debug("State saved successfully")
        except Exception as e:
            logger.error(f"Failed to save state: {e}")
            raise

    def _load_json(self, filepath: Path, default: Optional[Any] = None) -> Any:
        """
        Load JSON from file or return default.

        A file that fails to parse is logged and treated as missing. The
        next save overwrites it.

        Args:
            filepath: Path to JSON file
            default: Value returned if the file is missing or corrupted
                (``{}`` when None)

        Returns:
            Loaded JSON data or default value
        """
        if filepath.exists():
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    return json.load(f)
            except json.JSONDecodeError as e:
                logger.error(f"Corrupted JSON in {filepath}: {e}")
                return default if default is not None else {}
        return default if default is not None else {}

    def _save_json(self, filepath: Path, data: Any) -> None:
        """
        Save JSON to file atomically.

        Args:
            filepath: Path to JSON file
            data: Data to serialize
        """
        # Write to a sibling temp file first (same directory, so the rename
        # below does not cross filesystems).
        temp_file = filepath.with_suffix(".tmp")
        with open(temp_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        # Atomic rename over the target
        temp_file.replace(filepath)

    def get_full_conversation(self, conversation_id: str) -> List[Dict[str, Any]]:
        """
        Reconstruct full conversation from append-only log.

        This is the source of truth for all messages, providing:
        - Complete conversation history
        - Chronological ordering by message index
        - Validation that no messages were lost

        Blank lines in the log are ignored; any other malformed line raises.

        Args:
            conversation_id: Conversation identifier

        Returns:
            List of all messages for this conversation, sorted by index
        """
        messages: List[Dict[str, Any]] = []

        if not self.log_file.exists():
            logger.warning(f"Log file does not exist: {self.log_file}")
            return messages

        logger.info(f"Reconstructing conversation '{conversation_id}' from log")

        try:
            with open(self.log_file, "r", encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue  # tolerate blank lines (e.g. trailing newline)
                    event = json.loads(line)
                    if event["conversation_id"] == conversation_id:
                        messages.append(event["message"])
        except Exception as e:
            logger.error(f"Failed to read log file: {e}")
            raise

        # Sort by index for chronological order
        messages = sorted(messages, key=lambda m: m.get("index", 0))
        logger.info(f"Reconstructed {len(messages)} messages")

        return messages

    def get_statistics(self, conversation_id: str) -> Dict[str, Any]:
        """
        Get statistics for a conversation from in-memory state.

        Args:
            conversation_id: Conversation identifier

        Returns:
            Dict with watermark, unique message count, and
            ``total_messages_processed`` (watermark + 1; this assumes
            indices start at 0 and are contiguous).
        """
        watermark = self.watermarks.get(conversation_id, -1)
        unique_hashes = len(self.content_hashes.get(conversation_id, []))

        return {
            "conversation_id": conversation_id,
            "watermark": watermark,
            "unique_messages": unique_hashes,
            "total_messages_processed": watermark + 1 if watermark >= 0 else 0,
        }

    def get_all_conversations(self) -> List[str]:
        """
        Get list of all conversation IDs that have a saved watermark.

        Returns:
            List of conversation IDs
        """
        return list(self.watermarks.keys())

    def validate_integrity(self, conversation_id: str) -> Dict[str, Any]:
        """
        Validate data integrity for a conversation.

        Checks:
        - Hash count matches log message count
        - Indices are exactly 0..n-1 (no gaps or duplicates)
        - Watermark equals the highest logged index (-1 when empty)

        Args:
            conversation_id: Conversation identifier

        Returns:
            Dict with overall ``valid`` flag, per-check results and counts
        """
        logger.info(f"Validating integrity for '{conversation_id}'")

        # Get state
        watermark = self.watermarks.get(conversation_id, -1)
        hashes = self.content_hashes.get(conversation_id, [])

        # Get messages from log (already sorted by index)
        messages = self.get_full_conversation(conversation_id)

        # Check 1: Hash count matches message count
        hash_count_ok = len(hashes) == len(messages)

        # Check 2: No gaps in sequence
        if messages:
            indices = [m.get("index", 0) for m in messages]
            expected_indices = list(range(len(messages)))
            sequence_ok = indices == expected_indices
        else:
            sequence_ok = True

        # Check 3: Watermark matches highest index
        if messages:
            max_index = max(m.get("index", 0) for m in messages)
            watermark_ok = watermark == max_index
        else:
            watermark_ok = watermark == -1

        results = {
            "conversation_id": conversation_id,
            "valid": hash_count_ok and sequence_ok and watermark_ok,
            "checks": {
                "hash_count_matches": hash_count_ok,
                "no_sequence_gaps": sequence_ok,
                "watermark_correct": watermark_ok,
            },
            "stats": {
                "watermark": watermark,
                "hash_count": len(hashes),
                "message_count": len(messages),
            },
        }

        if results["valid"]:
            logger.info("✓ Integrity validation passed")
        else:
            logger.warning("✗ Integrity validation failed!")
            logger.warning(f" Details: {results['checks']}")

        return results

def _append_parsed_message(
    messages: List[Dict[str, Any]], role: Optional[str], content_lines: List[str]
) -> None:
    """Append the message being accumulated (if any) to ``messages``."""
    if role is None:
        return  # nothing accumulated yet (no marker seen so far)
    messages.append(
        {
            # Sequential, 0-based index in file order
            "index": len(messages),
            "role": role,
            "type": role,
            "content": "".join(content_lines).strip(),
        }
    )


def parse_claude_export_file(filepath: Path) -> Dict[str, Any]:
    """
    Parse a Claude Code conversation export file.

    Claude exports are text-based with special markers:
    - ⏺ at the very start of a line = user message/action
    - ⎿ (optionally indented) = assistant response/tool result

    Lines after a marker line, up to the next marker, are continuation lines
    of that message. Text before the first marker is ignored.

    Args:
        filepath: Path to export file (read as UTF-8)

    Returns:
        Dict with a 'messages' list. Each message has 'index' (0-based, file
        order), 'role' and 'type' ('user' or 'assistant'), and 'content'
        (the message text with surrounding whitespace stripped).
    """
    user_marker = "⏺"
    assistant_marker = "⎿"

    logger.info(f"Parsing Claude export: {filepath}")

    messages: List[Dict[str, Any]] = []
    current_role: Optional[str] = None
    current_content: List[str] = []

    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            stripped = line.lstrip()
            if line.startswith(user_marker):
                role = "user"
                # Drop exactly the marker; the final .strip() removes any
                # following space without eating real text.
                first_line = line[len(user_marker):]
            elif stripped.startswith(assistant_marker):
                role = "assistant"
                # lstrip only: keep the trailing newline so the next
                # continuation line does not get glued onto this one.
                first_line = stripped[len(assistant_marker):]
            else:
                # Continuation of current message (ignored before first marker)
                if current_role is not None:
                    current_content.append(line)
                continue

            # A new marker closes the previous message.
            _append_parsed_message(messages, current_role, current_content)
            current_role = role
            current_content = [first_line]

    # Save final message
    _append_parsed_message(messages, current_role, current_content)

    logger.info(f"Parsed {len(messages)} messages from export")

    return {"messages": messages}

def extract_session_id_from_filename(filepath: Path) -> str:
    """
    Derive a session ID from an export filename.

    The final suffix is dropped. Every "EXPORT-" and "export-" substring is
    removed, wherever it occurs; other casings such as "Export-" are kept.
    The result is lower-cased, with underscores and spaces turned into
    dashes.

    Examples:
        2025-11-16-EXPORT-CHECKPOINT.txt     → 2025-11-16-checkpoint
        2025-11-17-EXPORT-ROLLOUT-MASTER.txt → 2025-11-17-rollout-master

    The returned ID is used as the conversation_id for deduplication state,
    so the mapping is intentionally kept stable.

    Args:
        filepath: Path to export file

    Returns:
        Session ID string
    """
    stem = filepath.stem  # drops only the final suffix, e.g. ".txt"

    # Remove the export markers (in this order, as before)
    for marker in ("EXPORT-", "export-"):
        stem = stem.replace(marker, "")

    # Convert to lowercase and replace underscores/spaces with dashes
    return stem.lower().replace("_", "-").replace(" ", "-")

if __name__ == "__main__":
    # CLI interface for testing
    import argparse

    parser = argparse.ArgumentParser(description="Claude Conversation Deduplicator")
    parser.add_argument("export_file", help="Path to Claude export file")
    parser.add_argument(
        "--storage-dir", default="dedup_state", help="Directory for state files"
    )
    parser.add_argument(
        "--session-id", help="Session ID (auto-detected if not provided)"
    )
    parser.add_argument(
        "--dry-run", action="store_true", help="Process without saving state"
    )
    parser.add_argument("--verbose", action="store_true", help="Verbose logging")

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Parse export file; report a missing file as a usage error, not a traceback
    export_path = Path(args.export_file)
    if not export_path.is_file():
        parser.error(f"export file not found: {export_path}")
    export_data = parse_claude_export_file(export_path)

    # Detect or use provided session ID
    session_id = args.session_id or extract_session_id_from_filename(export_path)

    # Process with deduplicator
    dedup = ClaudeConversationDeduplicator(args.storage_dir)
    new_messages, stats = dedup.process_export(
        session_id, export_data, dry_run=args.dry_run
    )

    # Print results
    print(f"\n{'='*60}")
    print(f"Processing Results for: {session_id}")
    print(f"{'='*60}")
    print(f"Messages in export: {stats['messages_in_export']}")
    print(f"New messages: {stats['new_messages']}")
    print(f"Duplicates filtered: {stats['duplicates_filtered']}")
    print(f"Content collisions: {stats['content_collisions']}")
    print(f"New watermark: {stats['new_watermark']}")
    print(f"Total unique messages: {stats['total_unique_messages']}")

    if not args.dry_run:
        # Validate integrity (skipped in dry-run: nothing was persisted)
        validation = dedup.validate_integrity(session_id)
        print(f"\nIntegrity: {'✓ VALID' if validation['valid'] else '✗ INVALID'}")
        if not validation["valid"]:
            failed = [check for check, ok in validation["checks"].items() if not ok]
            print(f"Failed checks: {', '.join(failed)}")