#!/usr/bin/env python3
"""
---
id: scripts-deduplicate-export
title: "CODITECT Conversation Export Deduplicator - CLI Tool"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "CODITECT Conversation Export Deduplicator - CLI Tool"
keywords: ['deduplicate', 'export', 'review', 'validation']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "deduplicate_export.py"
language: python
executable: true
usage: "python3 scripts/deduplicate_export.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

CODITECT Conversation Export Deduplicator - CLI Tool

User-friendly command-line interface for conversation export deduplication.
Supports single file, batch directory processing, statistics, and integrity
checks.

PRODUCTION-GRADE ERROR HANDLING:
- Custom exception hierarchy (7 exceptions)
- Dual logging (file + stdout)
- Atomic file operations (temp → rename pattern)
- Backup before modifications
- Data integrity verification (checksums)
- Hash collision detection
- Resource cleanup with finally blocks
- Input validation
- Standardized exit codes (0/1/130)
- User-friendly error messages

Usage:
    deduplicate-export --file export.json --session-id my-session
    deduplicate-export --batch MEMORY-CONTEXT/exports/
    deduplicate-export --stats --session-id my-session
    deduplicate-export --integrity --storage-dir MEMORY-CONTEXT/dedup_state

Author: Claude + AZ1.AI
License: MIT
"""
import argparse
import hashlib
import json
import logging
import os
import shutil
import signal
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# Add core scripts to path so the sibling `core` package is importable when
# this file is executed directly as a script.
# BUGFIX: the original used Path(file), which raises NameError — the module
# path variable is __file__.
sys.path.insert(0, str(Path(__file__).parent))

from core.conversation_deduplicator import (
    ClaudeConversationDeduplicator,
    parse_claude_export_file,
    extract_session_id_from_filename as extract_session_id_core,
)
# ============================================================================
# CUSTOM EXCEPTIONS
# ============================================================================
class DedupError(Exception):
    """Root of the deduplication error hierarchy."""
    pass


class SourceFileError(DedupError):
    """Raised when an export file is missing or unreadable."""
    pass


class HashCollisionError(DedupError):
    """Raised when a hash collision is detected during deduplication."""
    pass


class ProcessingError(DedupError):
    """Raised when deduplication processing fails."""
    pass


class BackupError(DedupError):
    """Raised when a backup cannot be created or restored."""
    pass


class OutputError(DedupError):
    """Raised when an output file cannot be written."""
    pass


class DataIntegrityError(DedupError):
    """Raised when a data integrity verification check fails."""
    pass
# ============================================================================
# LOGGING CONFIGURATION
# ============================================================================
def setup_logging(log_dir: Path, verbose: bool = False) -> logging.Logger:
    """
    Configure dual logging (file + stdout).

    Args:
        log_dir: Directory for log files (created if missing)
        verbose: Enable verbose (DEBUG) logging on the console

    Returns:
        Configured logger instance
    """
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / f"deduplicate-export-{datetime.now().strftime('%Y%m%d-%H%M%S')}.log"
    logger = logging.getLogger("deduplicate_export")
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
    # logging.getLogger returns a process-wide singleton: without this reset,
    # calling setup_logging twice would stack handlers and duplicate every line.
    logger.handlers.clear()
    # Keep messages off the root logger so they aren't printed twice.
    logger.propagate = False
    # File handler - detailed logs
    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(logging.DEBUG)
    file_formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    file_handler.setFormatter(file_formatter)
    logger.addHandler(file_handler)
    # Console handler - user-friendly output
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
    console_formatter = logging.Formatter("%(message)s")
    console_handler.setFormatter(console_formatter)
    logger.addHandler(console_handler)
    return logger
# ============================================================================
# SIGNAL HANDLING
# ============================================================================
class GracefulExit:
    """Coordinate a clean shutdown when SIGINT or SIGTERM arrives."""

    def __init__(self):
        # Start in the "keep running" state, then hook both termination
        # signals so long operations can poll exit_requested and stop early.
        self.exit_requested = False
        signal.signal(signal.SIGINT, self.request_exit)
        signal.signal(signal.SIGTERM, self.request_exit)

    def request_exit(self, signum, frame):
        """Record that the user asked us to stop (signal handler)."""
        self.exit_requested = True
        print("\n\n⚠️ Interrupt received. Cleaning up...")
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
def compute_file_checksum(filepath: Path) -> str:
    """
    Compute SHA-256 checksum of file.

    Args:
        filepath: File to checksum

    Returns:
        Hex digest of SHA-256 hash

    Raises:
        DataIntegrityError: If checksum fails
    """
    try:
        digest = hashlib.sha256()
        # Stream in fixed-size chunks so large files never load fully into memory.
        with open(filepath, "rb") as handle:
            while chunk := handle.read(8192):
                digest.update(chunk)
        return digest.hexdigest()
    except Exception as e:
        raise DataIntegrityError(f"Failed to compute checksum for {filepath}: {e}") from e
def create_backup(filepath: Path, logger: logging.Logger) -> Optional[Path]:
    """
    Copy *filepath* to a timestamped sibling and verify the copy.

    Args:
        filepath: File to backup
        logger: Logger instance

    Returns:
        Path to backup file, or None if file doesn't exist

    Raises:
        BackupError: If backup creation fails
    """
    try:
        if not filepath.exists():
            logger.debug(f"No existing file to backup: {filepath}")
            return None
        stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
        backup_path = filepath.parent / f"{filepath.name}.backup-{stamp}"
        # copy2 preserves metadata (timestamps, permissions) along with content.
        shutil.copy2(filepath, backup_path)
        # A checksum mismatch means the backup is unusable - fail loudly now
        # rather than during a later restore.
        if compute_file_checksum(filepath) != compute_file_checksum(backup_path):
            raise BackupError(f"Backup checksum mismatch for {filepath}")
        logger.debug(f"Created backup: {backup_path} (verified)")
        return backup_path
    except BackupError:
        raise
    except Exception as e:
        raise BackupError(f"Failed to create backup of {filepath}: {e}") from e
def atomic_write(filepath: Path, content: str, logger: logging.Logger) -> None:
    """
    Atomically write content to file using temp + rename.

    Args:
        filepath: Target file path
        content: Content to write (UTF-8)
        logger: Logger instance

    Raises:
        OutputError: If write fails
    """
    temp_file = None
    try:
        # Ensure parent directory exists
        filepath.parent.mkdir(parents=True, exist_ok=True)
        # Create temp file in the SAME directory: a rename is only atomic
        # within a single filesystem.
        temp_fd, temp_path = tempfile.mkstemp(
            dir=filepath.parent,
            prefix=f".{filepath.name}.tmp-",
            text=True
        )
        temp_file = Path(temp_path)
        # Write to temp file (fdopen takes ownership of the descriptor)
        with os.fdopen(temp_fd, 'w', encoding='utf-8') as f:
            f.write(content)
        # Verify write succeeded. The size check is skipped for intentionally
        # empty content — the original raised a false failure for "".
        if not temp_file.exists() or (content and temp_file.stat().st_size == 0):
            raise OutputError(f"Temp file write verification failed: {temp_file}")
        # Path.replace() atomically overwrites an existing target on every
        # platform; Path.rename() raises FileExistsError on Windows.
        temp_file.replace(filepath)
        logger.debug(f"Atomically wrote: {filepath}")
    except Exception as e:
        # Best-effort cleanup of the orphaned temp file on failure.
        if temp_file and temp_file.exists():
            try:
                temp_file.unlink()
            except OSError:
                pass
        raise OutputError(f"Failed to write {filepath}: {e}") from e
def verify_data_integrity(
    original_file: Path,
    processed_file: Path,
    logger: logging.Logger,
) -> None:
    """
    Verify data integrity after processing.

    Args:
        original_file: Original file
        processed_file: Processed file to verify
        logger: Logger instance

    Raises:
        DataIntegrityError: If integrity check fails
    """
    try:
        # The processed file must exist and contain data.
        if not processed_file.exists():
            raise DataIntegrityError(f"Processed file disappeared: {processed_file}")
        if processed_file.stat().st_size == 0:
            raise DataIntegrityError(f"Processed file is empty: {processed_file}")
        # Checksums of original vs. processed WILL differ after dedup -
        # that's expected. Computing them just proves both files are readable
        # and non-corrupt; the values are logged for the audit trail.
        logger.debug(f"Original checksum: {compute_file_checksum(original_file)}")
        logger.debug(f"Processed checksum: {compute_file_checksum(processed_file)}")
        logger.debug(f"Data integrity verified: {processed_file}")
    except DataIntegrityError:
        raise
    except Exception as e:
        raise DataIntegrityError(f"Integrity verification failed: {e}") from e
# ============================================================================
# COLOR OUTPUT
# ============================================================================

# Shared Colors module (consolidates 36 duplicate definitions)
from colors import Colors
def print_header(text: str):
    """Print a bold, underlined section header."""
    print(f"\n{Colors.BOLD}{Colors.HEADER}{text}{Colors.ENDC}")
    print(f"{Colors.HEADER}{'=' * len(text)}{Colors.ENDC}\n")


def print_success(text: str):
    """Print a green success line."""
    print(f"{Colors.OKGREEN}✅ {text}{Colors.ENDC}")


def print_error(text: str):
    """Print a red error line."""
    print(f"{Colors.FAIL}❌ {text}{Colors.ENDC}")


def print_warning(text: str):
    """Print a yellow warning line."""
    print(f"{Colors.WARNING}⚠️ {text}{Colors.ENDC}")


def print_info(text: str):
    """Print a cyan informational line."""
    print(f"{Colors.OKCYAN}ℹ️ {text}{Colors.ENDC}")
# ============================================================================
# CORE FUNCTIONS
# ============================================================================
def extract_session_id_from_filename(filepath: Path) -> str:
    """
    Extract session ID from export filename.

    Thin wrapper around the core implementation so all call sites in this
    module share one entry point.

    Args:
        filepath: Export file path

    Returns:
        Extracted session ID
    """
    return extract_session_id_core(filepath)
def parse_export_file(filepath: Path, logger: logging.Logger) -> Dict[str, Any]:
    """
    Parse export file and convert to standard format.

    Supports:
    - JSON format (if already structured)
    - Plain text format (Claude Code /export output)

    Args:
        filepath: Export file path
        logger: Logger instance

    Returns:
        Parsed export data

    Raises:
        SourceFileError: If parsing fails
    """
    try:
        if not filepath.exists():
            raise SourceFileError(f"Export file not found: {filepath}")
        if not filepath.is_file():
            raise SourceFileError(f"Not a regular file: {filepath}")
        if filepath.stat().st_size == 0:
            raise SourceFileError(f"Export file is empty: {filepath}")
        # Explicit encoding: exports may contain non-ASCII content and the
        # platform default encoding is not guaranteed to be UTF-8.
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()
        # Try parsing as JSON first
        try:
            data = json.loads(content)
            # isinstance guard: valid JSON may decode to a list or scalar,
            # where `"messages" in data` would be an element/substring test
            # instead of a key lookup.
            if isinstance(data, dict) and "messages" in data:
                logger.debug(f"Parsed as JSON: {filepath}")
                return data
        except json.JSONDecodeError:
            pass
        # Plain text format - use proper Claude export parser
        logger.debug(f"Parsing as Claude export format: {filepath}")
        return parse_claude_export_file(filepath)
    except SourceFileError:
        raise
    except Exception as e:
        raise SourceFileError(f"Failed to parse export file {filepath}: {e}") from e
def process_single_file(
    filepath: Path,
    session_id: Optional[str],
    dedup: ClaudeConversationDeduplicator,
    dry_run: bool = False,
    verbose: bool = False,
    logger: logging.Logger = None,
    graceful_exit: GracefulExit = None
) -> Dict[str, Any]:
    """
    Process a single export file with comprehensive error handling.

    Flow: optional backup → parse → deduplicate → report stats → delete
    backup on success, or restore from backup on failure.

    Args:
        filepath: Export file to process
        session_id: Session identifier (auto-detected if None)
        dedup: Deduplicator instance
        dry_run: Preview mode (no changes)
        verbose: Verbose output
        logger: Logger instance
        graceful_exit: Graceful exit handler

    Returns:
        Processing result dictionary

    Raises:
        Various DedupError subclasses on failure
    """
    # NOTE(review): logger defaults to None but is dereferenced on the backup
    # paths below; all callers in this file pass a real logger - confirm
    # before reusing this function elsewhere.
    backup_file = None
    try:
        if verbose:
            print_info(f"Processing: {filepath}")
        # Check for interrupt before doing any work
        if graceful_exit and graceful_exit.exit_requested:
            raise KeyboardInterrupt()
        # Auto-detect session ID if not provided
        if not session_id:
            session_id = extract_session_id_from_filename(filepath)
            if verbose:
                print_info(f"Auto-detected session ID: {session_id}")
        # Create backup before processing (unless dry run)
        if not dry_run:
            backup_file = create_backup(filepath, logger)
        # Parse export file
        export_data = parse_export_file(filepath, logger)
        # Process with deduplicator
        try:
            if dry_run:
                new_messages, stats = dedup.process_export(session_id, export_data, dry_run=True)
            else:
                new_messages, stats = dedup.process_export(session_id, export_data)
        except Exception as e:
            # Hash collisions are classified by substring match on the error
            # text, since the core layer does not raise a typed exception.
            if "collision" in str(e).lower() or "hash" in str(e).lower():
                raise HashCollisionError(f"Hash collision in {filepath.name}: {e}") from e
            raise ProcessingError(f"Deduplication failed for {filepath.name}: {e}") from e
        result = {
            "success": True,
            "session_id": session_id,
            "file": str(filepath),
            "total_messages": stats["messages_in_export"],
            "new_messages": stats["new_messages"],
            "duplicates_filtered": stats["duplicates_filtered"],
            "content_collisions": stats["content_collisions"],
            # Percentage; guarded against division by zero for empty exports.
            "deduplication_rate": (
                (stats["duplicates_filtered"] / stats["messages_in_export"] * 100)
                if stats["messages_in_export"] > 0
                else 0
            ),
        }
        if verbose or not dry_run:
            print_success(f"Processed {filepath.name}")
            print(f" Session: {session_id}")
            print(
                f" Total: {stats['messages_in_export']} | "
                f"New: {stats['new_messages']} | "
                f"Duplicates: {stats['duplicates_filtered']}"
            )
            if stats["messages_in_export"] > 0:
                print(f" Deduplication: {result['deduplication_rate']:.1f}%")
        # Clean up backup on success (unless dry run)
        if backup_file and backup_file.exists() and not dry_run:
            try:
                backup_file.unlink()
                logger.debug(f"Cleaned up backup: {backup_file}")
            except Exception as e:
                logger.warning(f"Failed to clean up backup: {e}")
        return result
    except (SourceFileError, HashCollisionError, ProcessingError, BackupError):
        # Typed errors: restore from backup if available, then re-raise so
        # the caller's specific handler (and exit code) applies.
        if backup_file and backup_file.exists():
            try:
                shutil.copy2(backup_file, filepath)
                logger.info(f"Restored from backup: {filepath}")
            except Exception as restore_error:
                logger.error(f"Failed to restore from backup: {restore_error}")
        raise
    except Exception as e:
        # Unexpected errors: restore from backup if available, but report a
        # failure dict instead of re-raising (keeps batch runs going).
        if backup_file and backup_file.exists():
            try:
                shutil.copy2(backup_file, filepath)
                logger.info(f"Restored from backup: {filepath}")
            except Exception as restore_error:
                logger.error(f"Failed to restore from backup: {restore_error}")
        print_error(f"Processing failed: {e}")
        return {"success": False, "error": str(e)}
def process_batch(
    directory: Path,
    dedup: ClaudeConversationDeduplicator,
    dry_run: bool = False,
    verbose: bool = False,
    logger: logging.Logger = None,
    graceful_exit: GracefulExit = None
) -> List[Dict[str, Any]]:
    """
    Process all export files in a directory.

    Files matching *.txt, *.json, or *.md are processed in sorted order;
    per-file session IDs are auto-detected from filenames. Processing stops
    early (partial results returned) if the user interrupts.

    Args:
        directory: Directory containing export files
        dedup: Deduplicator instance
        dry_run: Preview mode
        verbose: Verbose output
        logger: Logger instance
        graceful_exit: Graceful exit handler

    Returns:
        List of processing results

    Raises:
        SourceFileError: If directory invalid
    """
    try:
        if not directory.exists():
            raise SourceFileError(f"Directory not found: {directory}")
        if not directory.is_dir():
            raise SourceFileError(f"Not a directory: {directory}")
        print_header(f"Batch Processing: {directory}")
        # Find all export files (non-recursive glob per pattern)
        patterns = ["*.txt", "*.json", "*.md"]
        export_files = []
        for pattern in patterns:
            export_files.extend(directory.glob(pattern))
        if not export_files:
            print_warning(f"No export files found in {directory}")
            return []
        print_info(f"Found {len(export_files)} files to process")
        results = []
        for filepath in sorted(export_files):
            # Check for interrupt between files; already-collected results
            # are still summarized and returned.
            if graceful_exit and graceful_exit.exit_requested:
                logger.warning("Batch processing interrupted by user")
                break
            # session_id=None → auto-detect from each filename
            result = process_single_file(
                filepath, None, dedup, dry_run, verbose, logger, graceful_exit
            )
            results.append(result)
        # Summary
        print_header("Batch Processing Summary")
        successful = sum(1 for r in results if r.get("success"))
        failed = len(results) - successful
        print(f"Total files: {len(results)}")
        print(f"Successful: {successful}")
        print(f"Failed: {failed}")
        if successful > 0:
            # Aggregate counts over successful files only
            total_msgs = sum(r.get("total_messages", 0) for r in results if r.get("success"))
            total_new = sum(r.get("new_messages", 0) for r in results if r.get("success"))
            total_dup = total_msgs - total_new
            print(f"\nTotal messages: {total_msgs}")
            print(f"New messages: {total_new}")
            print(f"Duplicates filtered: {total_dup}")
            if total_msgs > 0:
                print(f"Overall deduplication rate: {total_dup / total_msgs * 100:.1f}%")
        return results
    except SourceFileError:
        raise
    except Exception as e:
        raise ProcessingError(f"Batch processing failed: {e}") from e
def show_statistics(
    session_id: str,
    dedup: ClaudeConversationDeduplicator,
) -> None:
    """
    Display statistics for a session.

    Args:
        session_id: Session identifier
        dedup: Deduplicator instance

    Raises:
        ProcessingError: If statistics retrieval fails
    """
    try:
        print_header(f"Statistics: {session_id}")
        stats = dedup.get_statistics(session_id)
        print(f"Session ID: {session_id}")
        # Table-driven printing of the watermark/count fields; missing keys
        # render as N/A.
        for label, key in (
            ("Watermark", "watermark"),
            ("Unique messages", "unique_messages"),
            ("Total messages", "total_messages"),
        ):
            print(f"{label}: {stats.get(key, 'N/A')}")
        # Reconstruction is best-effort: a failure here only warns.
        try:
            messages = dedup.get_full_conversation(session_id)
            print(f"Reconstructable messages: {len(messages)}")
        except Exception as e:
            print_warning(f"Could not reconstruct conversation: {e}")
        print_success("Statistics retrieved successfully")
    except Exception as e:
        raise ProcessingError(f"Failed to get statistics: {e}") from e
def run_integrity_check(
    dedup: ClaudeConversationDeduplicator,
    verbose: bool = False,
) -> None:
    """
    Run integrity check on all conversations.

    Args:
        dedup: Deduplicator instance
        verbose: Verbose output

    Raises:
        ProcessingError: If integrity check fails critically
    """
    try:
        print_header("Integrity Check")
        # Conversation IDs come from the deduplicator's watermark table.
        watermarks = dedup.watermarks
        if not watermarks:
            print_warning("No conversations found")
            return
        print_info(f"Checking {len(watermarks)} conversations...")
        failures = 0
        for conv_id in watermarks:
            try:
                is_valid, issues = dedup.validate_integrity(conv_id)
            except Exception as e:
                # A crashed check counts as a failure but does not stop the scan.
                print_error(f"{conv_id}: Check failed - {e}")
                failures += 1
                continue
            if is_valid:
                if verbose:
                    print_success(f"{conv_id}: Valid")
                continue
            print_warning(f"{conv_id}: Issues found")
            for issue in issues:
                print(f" - {issue}")
            failures += 1
        if failures == 0:
            print_success(f"All {len(watermarks)} conversations passed integrity check")
        else:
            print_warning("Some conversations have integrity issues (see above)")
    except Exception as e:
        raise ProcessingError(f"Integrity check failed: {e}") from e
def main():
    """
    Main CLI entry point with comprehensive error handling.

    Parses arguments, dispatches to one of four mutually exclusive modes
    (--file / --batch / --stats / --integrity), and maps every typed error
    to a user-facing message plus a suggestion.

    Returns:
        Exit code (0=success, 1=error, 130=interrupted)
    """
    parser = argparse.ArgumentParser(
        description="CODITECT Conversation Export Deduplicator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  Process single file
    %(prog)s --file export.json --session-id my-session

  Auto-detect session ID from filename
    %(prog)s --file 2025-11-17-EXPORT-ROLLOUT-MASTER.txt

  Batch process directory
    %(prog)s --batch MEMORY-CONTEXT/exports/

  Show statistics
    %(prog)s --stats --session-id my-session

  Run integrity check
    %(prog)s --integrity

  Dry run (preview without changes)
    %(prog)s --file export.json --dry-run

For more information, see: DEVELOPMENT-SETUP.md
""",
    )
    # Mode selection (mutually exclusive, exactly one required)
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument(
        "--file", "-f", type=Path, help="Process single export file"
    )
    mode_group.add_argument(
        "--batch", "-b", type=Path, help="Process all files in directory"
    )
    mode_group.add_argument(
        "--stats", action="store_true", help="Show statistics for a session"
    )
    mode_group.add_argument(
        "--integrity", action="store_true", help="Run integrity check on all conversations"
    )
    # Common options
    parser.add_argument(
        "--session-id",
        "-s",
        help="Session identifier (auto-detected if not provided)",
    )
    parser.add_argument(
        "--storage-dir",
        "-d",
        type=Path,
        default=Path("../../MEMORY-CONTEXT/dedup_state"),
        help="Storage directory for deduplication state (default: ../../MEMORY-CONTEXT/dedup_state)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview changes without modifying storage",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Verbose output"
    )
    parser.add_argument(
        "--quiet", "-q", action="store_true", help="Minimal output (errors only)"
    )
    parser.add_argument(
        "--no-color", action="store_true", help="Disable colored output"
    )
    parser.add_argument(
        "--output",
        "-o",
        type=Path,
        help="Write results to JSON file",
    )
    args = parser.parse_args()
    # Initialize graceful exit handler (installs SIGINT/SIGTERM hooks)
    graceful_exit = GracefulExit()
    # Configure colors: disable when requested or when stdout is piped
    if args.no_color or not sys.stdout.isatty():
        Colors.disable()
    # Setup logging under the repository's MEMORY-CONTEXT/logs directory.
    # NOTE(review): five .parent hops assume a fixed directory depth for this
    # script within the repo - confirm if the file is ever relocated.
    repo_root = Path(__file__).resolve().parent.parent.parent.parent.parent
    log_dir = repo_root / "MEMORY-CONTEXT" / "logs"
    logger = setup_logging(log_dir, args.verbose)
    try:
        # Initialize deduplicator
        try:
            dedup = ClaudeConversationDeduplicator(storage_dir=args.storage_dir)
            if args.verbose:
                print_success(f"Deduplicator initialized: {args.storage_dir}")
        except Exception as e:
            raise ProcessingError(f"Failed to initialize deduplicator: {e}") from e
        # Execute requested mode
        result = None
        if args.file:
            # Single file mode
            if not args.file.exists():
                raise SourceFileError(f"File not found: {args.file}")
            print_header("Single File Processing")
            result = process_single_file(
                args.file, args.session_id, dedup, args.dry_run, args.verbose, logger, graceful_exit
            )
            if args.output:
                atomic_write(args.output, json.dumps(result, indent=2), logger)
                print_success(f"Results written to {args.output}")
        elif args.batch:
            # Batch directory mode
            if not args.batch.is_dir():
                raise SourceFileError(f"Directory not found: {args.batch}")
            results = process_batch(args.batch, dedup, args.dry_run, args.verbose, logger, graceful_exit)
            if args.output:
                atomic_write(args.output, json.dumps(results, indent=2), logger)
                print_success(f"Results written to {args.output}")
        elif args.stats:
            # Statistics mode (requires an explicit session id)
            if not args.session_id:
                raise ValueError("--session-id required for --stats mode")
            show_statistics(args.session_id, dedup)
        elif args.integrity:
            # Integrity check mode
            run_integrity_check(dedup, args.verbose)
        return 0
    except KeyboardInterrupt:
        # Conventional exit code for SIGINT (128 + 2)
        print_warning("\n\nOperation cancelled by user")
        return 130
    except SourceFileError as e:
        print_error(f"Source file error: {e}")
        logger.info("\nSuggestion: Check file paths and permissions")
        return 1
    except HashCollisionError as e:
        print_error(f"Hash collision detected: {e}")
        logger.info("\nSuggestion: This is rare - check for data corruption")
        return 1
    except ProcessingError as e:
        print_error(f"Processing error: {e}")
        logger.info("\nSuggestion: Check dedup_state directory integrity")
        return 1
    except BackupError as e:
        print_error(f"Backup error: {e}")
        logger.info("\nSuggestion: Check disk space and permissions")
        return 1
    except OutputError as e:
        print_error(f"Output error: {e}")
        logger.info("\nSuggestion: Check write permissions and disk space")
        return 1
    except DataIntegrityError as e:
        print_error(f"Data integrity error: {e}")
        logger.info("\nSuggestion: Data may be corrupted - restore from backup")
        return 1
    except Exception as e:
        # Last-resort handler; full traceback only in verbose mode
        print_error(f"Unexpected error: {e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
        logger.info("\nSuggestion: Re-run with --verbose for details")
        return 1
    finally:
        logger.debug("Cleanup complete")
if name == "main": sys.exit(main())