#!/usr/bin/env python3
"""
title: "Add core scripts to path"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "CODITECT Conversation Export Deduplicator - CLI Tool"
keywords: ['deduplicate', 'export', 'review', 'validation']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "deduplicate_export.py"
language: python
executable: true
usage: "python3 scripts/deduplicate_export.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

CODITECT Conversation Export Deduplicator - CLI Tool

User-friendly command-line interface for conversation export deduplication. Supports single-file processing, batch directory processing, statistics reporting, and integrity checks.

PRODUCTION-GRADE ERROR HANDLING:

  • Custom exception hierarchy (7 exceptions)
  • Dual logging (file + stdout)
  • Atomic file operations (temp → rename pattern)
  • Backup before modifications
  • Data integrity verification (checksums)
  • Hash collision detection
  • Resource cleanup with finally blocks
  • Input validation
  • Standardized exit codes (0/1/130)
  • User-friendly error messages

Usage:
    deduplicate-export --file export.json --session-id my-session
    deduplicate-export --batch MEMORY-CONTEXT/exports/
    deduplicate-export --stats --session-id my-session
    deduplicate-export --integrity --storage-dir MEMORY-CONTEXT/dedup_state

Author: Claude + AZ1.AI
License: MIT
"""

import argparse
import json
import sys
import os
import logging
import hashlib
import shutil
import tempfile
import signal
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime

# Add core scripts to path

sys.path.insert(0, str(Path(__file__).parent))

from core.conversation_deduplicator import (
    ClaudeConversationDeduplicator,
    parse_claude_export_file,
    extract_session_id_from_filename as extract_session_id_core,
)

# ============================================================================
# CUSTOM EXCEPTIONS
# ============================================================================

class DedupError(Exception):
    """Base exception for deduplication operations"""
    pass

class SourceFileError(DedupError):
    """Export file not found or unreadable"""
    pass

class HashCollisionError(DedupError):
    """Hash collision detected during deduplication"""
    pass

class ProcessingError(DedupError):
    """Deduplication processing failure"""
    pass

class BackupError(DedupError):
    """Backup creation or restoration failure"""
    pass

class OutputError(DedupError):
    """Output file write failure"""
    pass

class DataIntegrityError(DedupError):
    """Data integrity verification failure"""
    pass

# ============================================================================
# LOGGING CONFIGURATION
# ============================================================================

def setup_logging(log_dir: Path, verbose: bool = False) -> logging.Logger:
    """
    Configure dual logging (file + stdout).

Args:
log_dir: Directory for log files
verbose: Enable verbose logging

Returns:
Configured logger instance
"""
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"deduplicate-export-{datetime.now().strftime('%Y%m%d-%H%M%S')}.log"

logger = logging.getLogger("deduplicate_export")
logger.setLevel(logging.DEBUG if verbose else logging.INFO)

# File handler - detailed logs
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)

# Console handler - user-friendly output
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
console_formatter = logging.Formatter("%(message)s")
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)

return logger

# ============================================================================
# SIGNAL HANDLING
# ============================================================================

class GracefulExit:
    """Handle graceful shutdown on SIGINT/SIGTERM"""

def __init__(self):
self.exit_requested = False
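        # Register handlers: SIGINT (Ctrl-C) and SIGTERM set a flag so in-flight
        # work can finish the current file and stop at a safe checkpoint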
signal.signal(signal.SIGINT, self.request_exit)
signal.signal(signal.SIGTERM, self.request_exit)

def request_exit(self, signum, frame):
"""Set exit flag on signal"""
self.exit_requested = True
print("\n\n⚠️ Interrupt received. Cleaning up...")

# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================

def compute_file_checksum(filepath: Path) -> str:
    """
    Compute SHA-256 checksum of file.

Args:
filepath: File to checksum

Returns:
Hex digest of SHA-256 hash

Raises:
DataIntegrityError: If checksum fails
"""
try:
sha256 = hashlib.sha256()
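        # Stream the file in 8 KB chunks so large exports never have to fit in memory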
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
return sha256.hexdigest()
except Exception as e:
raise DataIntegrityError(f"Failed to compute checksum for {filepath}: {e}") from e

def create_backup(filepath: Path, logger: logging.Logger) -> Optional[Path]:
    """
    Create timestamped backup of file.

Args:
filepath: File to backup
logger: Logger instance

Returns:
Path to backup file, or None if file doesn't exist

Raises:
BackupError: If backup creation fails
"""
try:
if not filepath.exists():
logger.debug(f"No existing file to backup: {filepath}")
return None

backup_path = filepath.parent / f"{filepath.name}.backup-{datetime.now().strftime('%Y%m%d-%H%M%S')}"

# Copy file with metadata
shutil.copy2(filepath, backup_path)

# Verify backup integrity
original_checksum = compute_file_checksum(filepath)
backup_checksum = compute_file_checksum(backup_path)

if original_checksum != backup_checksum:
raise BackupError(f"Backup checksum mismatch for {filepath}")

logger.debug(f"Created backup: {backup_path} (verified)")
return backup_path

except BackupError:
raise
except Exception as e:
raise BackupError(f"Failed to create backup of {filepath}: {e}") from e

def atomic_write(filepath: Path, content: str, logger: logging.Logger) -> None:
    """
    Atomically write content to file using temp + rename.

Args:
filepath: Target file path
content: Content to write
logger: Logger instance

Raises:
OutputError: If write fails
"""
temp_file = None
try:
# Ensure parent directory exists
filepath.parent.mkdir(parents=True, exist_ok=True)

# Create temp file in same directory for atomic rename
temp_fd, temp_path = tempfile.mkstemp(
dir=filepath.parent,
prefix=f".{filepath.name}.tmp-",
text=True
)
temp_file = Path(temp_path)

# Write to temp file
with os.fdopen(temp_fd, 'w', encoding='utf-8') as f:
f.write(content)

# Verify write succeeded
if not temp_file.exists() or temp_file.stat().st_size == 0:
raise OutputError(f"Temp file write verification failed: {temp_file}")

# Atomic rename
temp_file.rename(filepath)
logger.debug(f"Atomically wrote: {filepath}")

except Exception as e:
# Clean up temp file on failure
if temp_file and temp_file.exists():
try:
temp_file.unlink()
            except Exception:
pass
raise OutputError(f"Failed to write {filepath}: {e}") from e

def verify_data_integrity(
    original_file: Path, processed_file: Path, logger: logging.Logger
) -> None:
    """
    Verify data integrity after processing.

Args:
original_file: Original file
processed_file: Processed file to verify
logger: Logger instance

Raises:
DataIntegrityError: If integrity check fails
"""
try:
# Verify processed file exists and is readable
if not processed_file.exists():
raise DataIntegrityError(f"Processed file disappeared: {processed_file}")

if processed_file.stat().st_size == 0:
raise DataIntegrityError(f"Processed file is empty: {processed_file}")

# Compute checksums
original_checksum = compute_file_checksum(original_file)
processed_checksum = compute_file_checksum(processed_file)

# Note: For dedup, checksums WILL differ (that's expected!)
# We just verify both files are valid and non-corrupt
logger.debug(f"Original checksum: {original_checksum}")
logger.debug(f"Processed checksum: {processed_checksum}")
logger.debug(f"Data integrity verified: {processed_file}")

except DataIntegrityError:
raise
except Exception as e:
raise DataIntegrityError(f"Integrity verification failed: {e}") from e

# ============================================================================
# COLOR OUTPUT
# ============================================================================

# Shared Colors module (consolidates 36 duplicate definitions)

from colors import Colors

def print_header(text: str):
    """Print colored header"""
    print(f"\n{Colors.BOLD}{Colors.HEADER}{text}{Colors.ENDC}")
    print(f"{Colors.HEADER}{'=' * len(text)}{Colors.ENDC}\n")

def print_success(text: str):
    """Print success message"""
    print(f"{Colors.OKGREEN}✅ {text}{Colors.ENDC}")

def print_error(text: str):
    """Print error message"""
    print(f"{Colors.FAIL}❌ {text}{Colors.ENDC}")

def print_warning(text: str):
    """Print warning message"""
    print(f"{Colors.WARNING}⚠️ {text}{Colors.ENDC}")

def print_info(text: str):
    """Print info message"""
    print(f"{Colors.OKCYAN}ℹ️ {text}{Colors.ENDC}")

# ============================================================================
# CORE FUNCTIONS
# ============================================================================

def extract_session_id_from_filename(filepath: Path) -> str:
    """
    Extract session ID from export filename.

Wrapper around core function for consistency.

Args:
filepath: Export file path

Returns:
Extracted session ID
"""
return extract_session_id_core(filepath)

def parse_export_file(filepath: Path, logger: logging.Logger) -> Dict[str, Any]:
    """
    Parse export file and convert to standard format.

Supports:
- JSON format (if already structured)
- Plain text format (Claude Code /export output)

Args:
filepath: Export file path
logger: Logger instance

Returns:
Parsed export data

Raises:
SourceFileError: If parsing fails
"""
try:
if not filepath.exists():
raise SourceFileError(f"Export file not found: {filepath}")

if not filepath.is_file():
raise SourceFileError(f"Not a regular file: {filepath}")

if filepath.stat().st_size == 0:
raise SourceFileError(f"Export file is empty: {filepath}")

with open(filepath, "r") as f:
content = f.read()

# Try parsing as JSON first
try:
data = json.loads(content)
if "messages" in data:
logger.debug(f"Parsed as JSON: {filepath}")
return data
except json.JSONDecodeError:
pass

# Plain text format - use proper Claude export parser
logger.debug(f"Parsing as Claude export format: {filepath}")
return parse_claude_export_file(filepath)

except SourceFileError:
raise
except Exception as e:
raise SourceFileError(f"Failed to parse export file {filepath}: {e}") from e

def process_single_file(
    filepath: Path,
    session_id: Optional[str],
    dedup: ClaudeConversationDeduplicator,
    dry_run: bool = False,
    verbose: bool = False,
    logger: logging.Logger = None,
    graceful_exit: GracefulExit = None,
) -> Dict[str, Any]:
    """
    Process a single export file with comprehensive error handling.

Args:
filepath: Export file to process
session_id: Session identifier (auto-detected if None)
dedup: Deduplicator instance
dry_run: Preview mode (no changes)
verbose: Verbose output
logger: Logger instance
graceful_exit: Graceful exit handler

Returns:
Processing result dictionary

Raises:
Various DedupError subclasses on failure
"""
backup_file = None

try:
if verbose:
print_info(f"Processing: {filepath}")

# Check for interrupt
if graceful_exit and graceful_exit.exit_requested:
raise KeyboardInterrupt()

# Auto-detect session ID if not provided
if not session_id:
session_id = extract_session_id_from_filename(filepath)
if verbose:
print_info(f"Auto-detected session ID: {session_id}")

# Create backup before processing (unless dry run)
if not dry_run:
backup_file = create_backup(filepath, logger)

# Parse export file
export_data = parse_export_file(filepath, logger)

# Process with deduplicator
try:
if dry_run:
new_messages, stats = dedup.process_export(session_id, export_data, dry_run=True)
else:
new_messages, stats = dedup.process_export(session_id, export_data)
except Exception as e:
# Check for hash collision
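            # Heuristic: classify as a collision by matching keywords in the error message text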
if "collision" in str(e).lower() or "hash" in str(e).lower():
raise HashCollisionError(f"Hash collision in {filepath.name}: {e}") from e
raise ProcessingError(f"Deduplication failed for {filepath.name}: {e}") from e

result = {
"success": True,
"session_id": session_id,
"file": str(filepath),
"total_messages": stats["messages_in_export"],
"new_messages": stats["new_messages"],
"duplicates_filtered": stats["duplicates_filtered"],
"content_collisions": stats["content_collisions"],
"deduplication_rate": (
(stats["duplicates_filtered"] / stats["messages_in_export"] * 100)
if stats["messages_in_export"] > 0
else 0
),
}

if verbose or not dry_run:
print_success(f"Processed {filepath.name}")
print(f" Session: {session_id}")
print(
f" Total: {stats['messages_in_export']} | "
f"New: {stats['new_messages']} | "
f"Duplicates: {stats['duplicates_filtered']}"
)
if stats["messages_in_export"] > 0:
print(f" Deduplication: {result['deduplication_rate']:.1f}%")

# Clean up backup on success (unless dry run)
if backup_file and backup_file.exists() and not dry_run:
try:
backup_file.unlink()
logger.debug(f"Cleaned up backup: {backup_file}")
except Exception as e:
logger.warning(f"Failed to clean up backup: {e}")

return result

except (SourceFileError, HashCollisionError, ProcessingError, BackupError):
# Restore from backup if available
if backup_file and backup_file.exists():
try:
shutil.copy2(backup_file, filepath)
logger.info(f"Restored from backup: {filepath}")
except Exception as restore_error:
logger.error(f"Failed to restore from backup: {restore_error}")
raise

except Exception as e:
# Restore from backup if available
if backup_file and backup_file.exists():
try:
shutil.copy2(backup_file, filepath)
logger.info(f"Restored from backup: {filepath}")
except Exception as restore_error:
logger.error(f"Failed to restore from backup: {restore_error}")

print_error(f"Processing failed: {e}")
return {"success": False, "error": str(e)}

def process_batch(
    directory: Path,
    dedup: ClaudeConversationDeduplicator,
    dry_run: bool = False,
    verbose: bool = False,
    logger: logging.Logger = None,
    graceful_exit: GracefulExit = None,
) -> List[Dict[str, Any]]:
    """
    Process all export files in a directory.

Args:
directory: Directory containing export files
dedup: Deduplicator instance
dry_run: Preview mode
verbose: Verbose output
logger: Logger instance
graceful_exit: Graceful exit handler

Returns:
List of processing results

Raises:
SourceFileError: If directory invalid
"""
try:
if not directory.exists():
raise SourceFileError(f"Directory not found: {directory}")

if not directory.is_dir():
raise SourceFileError(f"Not a directory: {directory}")

print_header(f"Batch Processing: {directory}")

# Find all export files
patterns = ["*.txt", "*.json", "*.md"]
export_files = []

for pattern in patterns:
export_files.extend(directory.glob(pattern))

if not export_files:
print_warning(f"No export files found in {directory}")
return []

print_info(f"Found {len(export_files)} files to process")

results = []
for filepath in sorted(export_files):
# Check for interrupt
if graceful_exit and graceful_exit.exit_requested:
logger.warning("Batch processing interrupted by user")
break

result = process_single_file(
filepath, None, dedup, dry_run, verbose, logger, graceful_exit
)
results.append(result)

# Summary
print_header("Batch Processing Summary")

successful = sum(1 for r in results if r.get("success"))
failed = len(results) - successful

print(f"Total files: {len(results)}")
print(f"Successful: {successful}")
print(f"Failed: {failed}")

if successful > 0:
total_msgs = sum(r.get("total_messages", 0) for r in results if r.get("success"))
total_new = sum(r.get("new_messages", 0) for r in results if r.get("success"))
total_dup = total_msgs - total_new

print(f"\nTotal messages: {total_msgs}")
print(f"New messages: {total_new}")
print(f"Duplicates filtered: {total_dup}")

if total_msgs > 0:
print(f"Overall deduplication rate: {total_dup / total_msgs * 100:.1f}%")

return results

except SourceFileError:
raise
except Exception as e:
raise ProcessingError(f"Batch processing failed: {e}") from e

def show_statistics(session_id: str, dedup: ClaudeConversationDeduplicator) -> None:
    """
    Display statistics for a session.

Args:
session_id: Session identifier
dedup: Deduplicator instance

Raises:
ProcessingError: If statistics retrieval fails
"""
try:
print_header(f"Statistics: {session_id}")

stats = dedup.get_statistics(session_id)

print(f"Session ID: {session_id}")
print(f"Watermark: {stats.get('watermark', 'N/A')}")
print(f"Unique messages: {stats.get('unique_messages', 'N/A')}")
print(f"Total messages: {stats.get('total_messages', 'N/A')}")

# Get full conversation for additional stats
try:
messages = dedup.get_full_conversation(session_id)
print(f"Reconstructable messages: {len(messages)}")
except Exception as e:
print_warning(f"Could not reconstruct conversation: {e}")

print_success("Statistics retrieved successfully")

except Exception as e:
raise ProcessingError(f"Failed to get statistics: {e}") from e

def run_integrity_check(dedup: ClaudeConversationDeduplicator, verbose: bool = False) -> None:
    """
    Run integrity check on all conversations.

Args:
dedup: Deduplicator instance
verbose: Verbose output

Raises:
ProcessingError: If integrity check fails critically
"""
try:
print_header("Integrity Check")

# Get all conversation IDs from watermarks
watermarks = dedup.watermarks

if not watermarks:
print_warning("No conversations found")
return

print_info(f"Checking {len(watermarks)} conversations...")

all_valid = True
for conv_id in watermarks:
try:
is_valid, issues = dedup.validate_integrity(conv_id)

if is_valid:
if verbose:
print_success(f"{conv_id}: Valid")
else:
print_warning(f"{conv_id}: Issues found")
for issue in issues:
print(f" - {issue}")
all_valid = False

except Exception as e:
print_error(f"{conv_id}: Check failed - {e}")
all_valid = False

if all_valid:
print_success(f"All {len(watermarks)} conversations passed integrity check")
else:
print_warning("Some conversations have integrity issues (see above)")

except Exception as e:
raise ProcessingError(f"Integrity check failed: {e}") from e

def main():
    """
    Main CLI entry point with comprehensive error handling.

Returns:
Exit code (0=success, 1=error, 130=interrupted)
"""

parser = argparse.ArgumentParser(
description="CODITECT Conversation Export Deduplicator",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""

Examples:

Process single file

%(prog)s --file export.json --session-id my-session

Auto-detect session ID from filename

%(prog)s --file 2025-11-17-EXPORT-ROLLOUT-MASTER.txt

Batch process directory

%(prog)s --batch MEMORY-CONTEXT/exports/

Show statistics

%(prog)s --stats --session-id my-session

Run integrity check

%(prog)s --integrity

Dry run (preview without changes)

%(prog)s --file export.json --dry-run

For more information, see: DEVELOPMENT-SETUP.md
""",
    )

# Mode selection (mutually exclusive)
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument(
"--file", "-f", type=Path, help="Process single export file"
)
mode_group.add_argument(
"--batch", "-b", type=Path, help="Process all files in directory"
)
mode_group.add_argument(
"--stats", action="store_true", help="Show statistics for a session"
)
mode_group.add_argument(
"--integrity", action="store_true", help="Run integrity check on all conversations"
)

# Common options
parser.add_argument(
"--session-id",
"-s",
help="Session identifier (auto-detected if not provided)",
)
parser.add_argument(
"--storage-dir",
"-d",
type=Path,
default=Path("../../MEMORY-CONTEXT/dedup_state"),
help="Storage directory for deduplication state (default: ../../MEMORY-CONTEXT/dedup_state)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Preview changes without modifying storage",
)
parser.add_argument(
"--verbose", "-v", action="store_true", help="Verbose output"
)
parser.add_argument(
"--quiet", "-q", action="store_true", help="Minimal output (errors only)"
)
parser.add_argument(
"--no-color", action="store_true", help="Disable colored output"
)
parser.add_argument(
"--output",
"-o",
type=Path,
help="Write results to JSON file",
)

args = parser.parse_args()

# Initialize graceful exit handler
graceful_exit = GracefulExit()

# Configure colors
if args.no_color or not sys.stdout.isatty():
Colors.disable()

# Setup logging
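    # Log directory is derived by walking five levels up from this file to the assumed repo root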
repo_root = Path(__file__).resolve().parent.parent.parent.parent.parent
log_dir = repo_root / "MEMORY-CONTEXT" / "logs"
logger = setup_logging(log_dir, args.verbose)

try:
# Initialize deduplicator
try:
dedup = ClaudeConversationDeduplicator(storage_dir=args.storage_dir)
if args.verbose:
print_success(f"Deduplicator initialized: {args.storage_dir}")
except Exception as e:
raise ProcessingError(f"Failed to initialize deduplicator: {e}") from e

# Execute requested mode
result = None

if args.file:
# Single file mode
if not args.file.exists():
raise SourceFileError(f"File not found: {args.file}")

print_header("Single File Processing")
result = process_single_file(
args.file, args.session_id, dedup, args.dry_run, args.verbose, logger, graceful_exit
)

if args.output:
atomic_write(args.output, json.dumps(result, indent=2), logger)
print_success(f"Results written to {args.output}")

elif args.batch:
# Batch directory mode
if not args.batch.is_dir():
raise SourceFileError(f"Directory not found: {args.batch}")

results = process_batch(args.batch, dedup, args.dry_run, args.verbose, logger, graceful_exit)

if args.output:
atomic_write(args.output, json.dumps(results, indent=2), logger)
print_success(f"Results written to {args.output}")

elif args.stats:
# Statistics mode
if not args.session_id:
raise ValueError("--session-id required for --stats mode")

show_statistics(args.session_id, dedup)

elif args.integrity:
# Integrity check mode
run_integrity_check(dedup, args.verbose)

return 0

except KeyboardInterrupt:
print_warning("\n\nOperation cancelled by user")
return 130

except SourceFileError as e:
print_error(f"Source file error: {e}")
logger.info("\nSuggestion: Check file paths and permissions")
return 1

except HashCollisionError as e:
print_error(f"Hash collision detected: {e}")
logger.info("\nSuggestion: This is rare - check for data corruption")
return 1

except ProcessingError as e:
print_error(f"Processing error: {e}")
logger.info("\nSuggestion: Check dedup_state directory integrity")
return 1

except BackupError as e:
print_error(f"Backup error: {e}")
logger.info("\nSuggestion: Check disk space and permissions")
return 1

except OutputError as e:
print_error(f"Output error: {e}")
logger.info("\nSuggestion: Check write permissions and disk space")
return 1

except DataIntegrityError as e:
print_error(f"Data integrity error: {e}")
logger.info("\nSuggestion: Data may be corrupted - restore from backup")
return 1

except Exception as e:
print_error(f"Unexpected error: {e}")
if args.verbose:
import traceback
traceback.print_exc()
logger.info("\nSuggestion: Re-run with --verbose for details")
return 1

finally:
logger.debug("Cleanup complete")

if __name__ == "__main__":
    sys.exit(main())