Skip to main content

scripts-jsonl-session-processor

#!/usr/bin/env python3 """ JSONL Session Processor with Deduplication and Watermarking

Purpose:​

Process large Claude Code JSONL session files (>10MB) with:

  • Read-only access (original files never modified)
  • Streaming chunk processing (memory efficient)
  • SHA-256 deduplication against global hash pool
  • Watermark tracking for resume capability
  • Complete statistics and provenance tracking

Safety Guarantees:​

  1. Source files are READ-ONLY (no modifications ever)
  2. All processing uses streaming (no full file load)
  3. Watermarks enable resume after failures
  4. All output goes to MEMORY-CONTEXT/dedup_state/

Output Structure:​

MEMORY-CONTEXT/dedup_state/ ├── global_hashes.json (global dedup hash pool) ├── unique_messages.jsonl (append-only unique messages) ├── session_watermarks.json (resume tracking) └── processing_logs/ (detailed execution logs)

Author: Claude + AZ1.AI License: MIT """

import json import hashlib import logging import sys from pathlib import Path from datetime import datetime from typing import Dict, List, Optional, Set, Tuple, Any from dataclasses import dataclass, asdict

Setup logging

logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(name)

@dataclass class SessionWatermark: """Track processing progress for resume capability""" session_id: str session_path: str total_lines: int processed_lines: int processed_messages: int new_unique_messages: int duplicate_messages: int last_processed_timestamp: str status: str # 'in_progress', 'completed', 'failed' error_message: Optional[str] = None

@dataclass class ProcessingStats: """Processing statistics for a session""" session_id: str session_path: str file_size_mb: float total_lines: int total_messages: int user_messages: int assistant_messages: int new_unique_messages: int duplicate_messages: int deduplication_rate: float processing_time_seconds: float status: str

class JSONLSessionProcessor: """ Process JSONL session files with deduplication and watermarking.

Read-only operations on source files with streaming chunk processing.
"""

def __init__(self, memory_context_dir: Optional[Path] = None):
"""
Initialize processor.

Args:
memory_context_dir: Path to MEMORY-CONTEXT directory
"""
if memory_context_dir is None:
memory_context_dir = Path.cwd() / "MEMORY-CONTEXT"

self.memory_context = Path(memory_context_dir)
self.dedup_dir = self.memory_context / "dedup_state"
self.logs_dir = self.dedup_dir / "processing_logs"

# Create directories
self.dedup_dir.mkdir(parents=True, exist_ok=True)
self.logs_dir.mkdir(parents=True, exist_ok=True)

# State files
self.global_hashes_file = self.dedup_dir / "global_hashes.json"
self.unique_messages_file = self.dedup_dir / "unique_messages.jsonl"
self.watermarks_file = self.dedup_dir / "session_watermarks.json"

# Load state
self.global_hashes = self._load_global_hashes()
self.watermarks = self._load_watermarks()

logger.info(f"Initialized processor with {len(self.global_hashes)} existing unique hashes")

def _load_global_hashes(self) -> Set[str]:
"""Load global hash pool"""
if not self.global_hashes_file.exists():
logger.info("No existing global hashes found, starting fresh")
return set()

try:
with open(self.global_hashes_file, 'r', encoding='utf-8') as f:
data = json.load(f)

if isinstance(data, list):
return set(data)
elif isinstance(data, dict):
return set(data.get("hashes", []))
else:
logger.warning("Unexpected hash file format, starting fresh")
return set()

except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in global hashes: {e}")
return set()
except Exception as e:
logger.error(f"Failed to load global hashes: {e}")
return set()

def _save_global_hashes(self):
"""Save global hash pool atomically"""
temp_file = self.global_hashes_file.with_suffix('.json.tmp')

try:
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump({
"updated": datetime.now().isoformat(),
"total_unique": len(self.global_hashes),
"hashes": sorted(list(self.global_hashes))
}, f, indent=2)

temp_file.replace(self.global_hashes_file)
logger.debug(f"Saved {len(self.global_hashes)} global hashes")

except Exception as e:
logger.error(f"Failed to save global hashes: {e}")
if temp_file.exists():
temp_file.unlink()
raise

def _load_watermarks(self) -> Dict[str, SessionWatermark]:
"""Load session watermarks"""
if not self.watermarks_file.exists():
return {}

try:
with open(self.watermarks_file, 'r', encoding='utf-8') as f:
data = json.load(f)

return {
k: SessionWatermark(**v)
for k, v in data.items()
}

except Exception as e:
logger.error(f"Failed to load watermarks: {e}")
return {}

def _save_watermarks(self):
"""Save session watermarks atomically"""
temp_file = self.watermarks_file.with_suffix('.json.tmp')

try:
watermark_dict = {
session_id: asdict(watermark)
for session_id, watermark in self.watermarks.items()
}

with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(watermark_dict, f, indent=2)

temp_file.replace(self.watermarks_file)
logger.debug(f"Saved watermarks for {len(self.watermarks)} sessions")

except Exception as e:
logger.error(f"Failed to save watermarks: {e}")
if temp_file.exists():
temp_file.unlink()
raise

def _compute_hash(self, content: str) -> str:
"""Compute SHA-256 hash of content"""
return hashlib.sha256(content.encode('utf-8')).hexdigest()

def _extract_session_id(self, session_path: Path) -> str:
"""Extract session ID from filename"""
# Session files are named like: SESSION_ID.jsonl
return session_path.stem

def _extract_message_content(self, entry: Dict[str, Any]) -> Optional[str]:
"""
Extract message content from JSONL entry.

Returns normalized message content or None if not a message.
"""
entry_type = entry.get('type', '')

# User message
if entry_type == 'user':
message = entry.get('message', {})
content = message.get('content', '')

if isinstance(content, str):
return content
elif isinstance(content, list):
# Multi-part content - concatenate text blocks
text_parts = [
block.get('text', '')
for block in content
if isinstance(block, dict) and block.get('type') == 'text'
]
return ' '.join(text_parts)

# Assistant message
elif entry_type == 'assistant':
message = entry.get('message', {})
content = message.get('content', [])

if isinstance(content, list):
# Extract text blocks (ignore tool_use blocks for dedup)
text_parts = [
block.get('text', '')
for block in content
if isinstance(block, dict) and block.get('type') == 'text'
]
return ' '.join(text_parts)

return None

def process_session(
self,
session_file: Path,
chunk_size: int = 1000,
resume: bool = True
) -> ProcessingStats:
"""
Process a single JSONL session file.

Args:
session_file: Path to JSONL session file
chunk_size: Number of lines per processing chunk
resume: Resume from watermark if exists

Returns:
ProcessingStats with processing results
"""
session_id = self._extract_session_id(session_file)
start_time = datetime.now()

logger.info(f"\n{'='*70}")
logger.info(f"Processing session: {session_id}")
logger.info(f"File: {session_file}")
logger.info(f"Size: {session_file.stat().st_size / (1024*1024):.2f} MB")
logger.info(f"{'='*70}\n")

# Check for existing watermark
watermark = None
start_line = 1

if resume and session_id in self.watermarks:
watermark = self.watermarks[session_id]

if watermark.status == 'completed':
logger.info(f"Session already processed (status: completed)")
logger.info(f"Skipping. To reprocess, remove watermark first.")

# Return existing stats
return ProcessingStats(
session_id=session_id,
session_path=str(session_file),
file_size_mb=session_file.stat().st_size / (1024*1024),
total_lines=watermark.total_lines,
total_messages=watermark.processed_messages,
user_messages=0, # Not tracked in watermark
assistant_messages=0,
new_unique_messages=watermark.new_unique_messages,
duplicate_messages=watermark.duplicate_messages,
deduplication_rate=(
watermark.duplicate_messages / watermark.processed_messages * 100
if watermark.processed_messages > 0 else 0
),
processing_time_seconds=0,
status='already_completed'
)

elif watermark.status == 'in_progress':
start_line = watermark.processed_lines + 1
logger.info(f"Resuming from line {start_line} (progress: {watermark.processed_lines}/{watermark.total_lines})")

# Initialize watermark
if watermark is None:
watermark = SessionWatermark(
session_id=session_id,
session_path=str(session_file),
total_lines=0,
processed_lines=0,
processed_messages=0,
new_unique_messages=0,
duplicate_messages=0,
last_processed_timestamp=datetime.now().isoformat(),
status='in_progress'
)

# Processing counters
total_lines = 0
processed_lines = 0
user_messages = 0
assistant_messages = 0
new_unique = 0
duplicates = 0

# Process file in streaming chunks
try:
with open(session_file, 'r', encoding='utf-8') as f:
current_chunk = []

for line_num, line in enumerate(f, 1):
total_lines = line_num

# Skip lines before resume point
if line_num < start_line:
continue

current_chunk.append((line_num, line))

# Process chunk when full
if len(current_chunk) >= chunk_size:
chunk_stats = self._process_chunk(
current_chunk,
session_id,
session_file
)

processed_lines += len(current_chunk)
user_messages += chunk_stats['user_messages']
assistant_messages += chunk_stats['assistant_messages']
new_unique += chunk_stats['new_unique']
duplicates += chunk_stats['duplicates']

# Update watermark
watermark.processed_lines = processed_lines
watermark.processed_messages = user_messages + assistant_messages
watermark.new_unique_messages = new_unique
watermark.duplicate_messages = duplicates
watermark.last_processed_timestamp = datetime.now().isoformat()

self.watermarks[session_id] = watermark
self._save_watermarks()

# Log progress
progress_pct = (processed_lines / total_lines * 100) if total_lines > 0 else 0
logger.info(
f"Progress: {processed_lines:,}/{total_lines:,} lines ({progress_pct:.1f}%) | "
f"Messages: {user_messages + assistant_messages:,} | "
f"New unique: {new_unique:,} | "
f"Duplicates: {duplicates:,}"
)

current_chunk = []

# Process remaining lines
if current_chunk:
chunk_stats = self._process_chunk(
current_chunk,
session_id,
session_file
)

processed_lines += len(current_chunk)
user_messages += chunk_stats['user_messages']
assistant_messages += chunk_stats['assistant_messages']
new_unique += chunk_stats['new_unique']
duplicates += chunk_stats['duplicates']

# Update final watermark
watermark.total_lines = total_lines
watermark.processed_lines = total_lines
watermark.processed_messages = user_messages + assistant_messages
watermark.new_unique_messages = new_unique
watermark.duplicate_messages = duplicates
watermark.last_processed_timestamp = datetime.now().isoformat()
watermark.status = 'completed'

self.watermarks[session_id] = watermark
self._save_watermarks()

# Save updated global hashes
self._save_global_hashes()

# Calculate stats
end_time = datetime.now()
processing_time = (end_time - start_time).total_seconds()

total_messages = user_messages + assistant_messages
dedup_rate = (duplicates / total_messages * 100) if total_messages > 0 else 0

stats = ProcessingStats(
session_id=session_id,
session_path=str(session_file),
file_size_mb=session_file.stat().st_size / (1024*1024),
total_lines=total_lines,
total_messages=total_messages,
user_messages=user_messages,
assistant_messages=assistant_messages,
new_unique_messages=new_unique,
duplicate_messages=duplicates,
deduplication_rate=dedup_rate,
processing_time_seconds=processing_time,
status='completed'
)

# Log summary
logger.info(f"\n{'='*70}")
logger.info(f"Session processing complete: {session_id}")
logger.info(f"{'='*70}")
logger.info(f"Total lines: {total_lines:,}")
logger.info(f"Total messages: {total_messages:,} (user: {user_messages:,}, assistant: {assistant_messages:,})")
logger.info(f"New unique messages: {new_unique:,}")
logger.info(f"Duplicate messages: {duplicates:,}")
logger.info(f"Deduplication rate: {dedup_rate:.1f}%")
logger.info(f"Processing time: {processing_time:.1f}s")
logger.info(f"Global unique count: {len(self.global_hashes):,}")
logger.info(f"{'='*70}\n")

return stats

except Exception as e:
logger.error(f"Error processing session {session_id}: {e}")

# Update watermark with error
watermark.status = 'failed'
watermark.error_message = str(e)
self.watermarks[session_id] = watermark
self._save_watermarks()

raise

def _process_chunk(
self,
chunk: List[Tuple[int, str]],
session_id: str,
session_file: Path
) -> Dict[str, int]:
"""
Process a chunk of JSONL lines.

Args:
chunk: List of (line_number, line_content) tuples
session_id: Session identifier
session_file: Path to session file

Returns:
Dict with chunk statistics
"""
user_messages = 0
assistant_messages = 0
new_unique = 0
duplicates = 0

# Open unique messages file in append mode
with open(self.unique_messages_file, 'a', encoding='utf-8') as unique_file:
for line_num, line in chunk:
try:
entry = json.loads(line.strip())
entry_type = entry.get('type', '')

# Count message types
if entry_type == 'user':
user_messages += 1
elif entry_type == 'assistant':
assistant_messages += 1

# Extract message content
content = self._extract_message_content(entry)

if content:
# Compute hash
content_hash = self._compute_hash(content)

# Check for duplicate
if content_hash not in self.global_hashes:
# New unique message
self.global_hashes.add(content_hash)
new_unique += 1

# Write to unique messages file
unique_message = {
'hash': content_hash,
'content': content,
'source_session': session_id,
'source_file': str(session_file),
'source_line': line_num,
'entry_type': entry_type,
'timestamp': entry.get('timestamp'),
'extracted_at': datetime.now().isoformat()
}

unique_file.write(json.dumps(unique_message, ensure_ascii=False) + '\n')
else:
duplicates += 1

except json.JSONDecodeError as e:
logger.debug(f"Line {line_num}: Invalid JSON - {e}")
continue
except Exception as e:
logger.debug(f"Line {line_num}: Error processing - {e}")
continue

return {
'user_messages': user_messages,
'assistant_messages': assistant_messages,
'new_unique': new_unique,
'duplicates': duplicates
}

def batch_process_sessions(
self,
session_files: List[Path],
chunk_size: int = 1000
) -> List[ProcessingStats]:
"""
Process multiple session files in batch.

Args:
session_files: List of session file paths
chunk_size: Lines per processing chunk

Returns:
List of ProcessingStats for each session
"""
results = []

logger.info(f"\n{'='*70}")
logger.info(f"BATCH PROCESSING: {len(session_files)} sessions")
logger.info(f"{'='*70}\n")

for idx, session_file in enumerate(session_files, 1):
logger.info(f"\nSession {idx}/{len(session_files)}")

try:
stats = self.process_session(session_file, chunk_size=chunk_size)
results.append(stats)
except Exception as e:
logger.error(f"Failed to process {session_file.name}: {e}")
continue

# Summary
logger.info(f"\n{'='*70}")
logger.info(f"BATCH PROCESSING COMPLETE")
logger.info(f"{'='*70}")
logger.info(f"Sessions processed: {len(results)}")
logger.info(f"Total new unique messages: {sum(s.new_unique_messages for s in results):,}")
logger.info(f"Total duplicates: {sum(s.duplicate_messages for s in results):,}")
logger.info(f"Global unique count: {len(self.global_hashes):,}")
logger.info(f"{'='*70}\n")

return results

def find_large_sessions( projects_dir: Optional[Path] = None, min_size_mb: float = 10.0 ) -> List[Path]: """ Find all JSONL session files larger than threshold.

Args:
projects_dir: Path to ~/.claude/projects/ directory
min_size_mb: Minimum file size in MB

Returns:
List of session file paths sorted by size (largest first)
"""
if projects_dir is None:
projects_dir = Path.home() / ".claude" / "projects"

projects_dir = Path(projects_dir)

if not projects_dir.exists():
logger.warning(f"Projects directory not found: {projects_dir}")
return []

logger.info(f"Scanning for large sessions (>{min_size_mb} MB) in {projects_dir}")

large_sessions = []
min_size_bytes = min_size_mb * 1024 * 1024

# Find all .jsonl files
for jsonl_file in projects_dir.rglob("*.jsonl"):
if jsonl_file.is_file():
file_size = jsonl_file.stat().st_size

if file_size >= min_size_bytes:
large_sessions.append(jsonl_file)

# Sort by size (largest first)
large_sessions.sort(key=lambda f: f.stat().st_size, reverse=True)

logger.info(f"Found {len(large_sessions)} large sessions")

for session_file in large_sessions:
size_mb = session_file.stat().st_size / (1024 * 1024)
logger.info(f" {session_file.name}: {size_mb:.1f} MB")

return large_sessions

if name == "main": import argparse

parser = argparse.ArgumentParser(description="Process JSONL session files with deduplication")
parser.add_argument("session_files", nargs="*", help="Session files to process")
parser.add_argument("--batch", action="store_true", help="Process all large sessions")
parser.add_argument("--min-size", type=float, default=10.0, help="Minimum session size in MB (default: 10)")
parser.add_argument("--chunk-size", type=int, default=1000, help="Lines per processing chunk (default: 1000)")
parser.add_argument("--no-resume", action="store_true", help="Disable resume capability")
parser.add_argument("--memory-context", type=str, help="Path to MEMORY-CONTEXT directory")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")

args = parser.parse_args()

# Set logging level
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)

# Initialize processor
memory_context = Path(args.memory_context) if args.memory_context else None
processor = JSONLSessionProcessor(memory_context_dir=memory_context)

# Determine session files to process
session_files = []

if args.batch:
# Find all large sessions
session_files = find_large_sessions(min_size_mb=args.min_size)
elif args.session_files:
# Use provided session files
session_files = [Path(f) for f in args.session_files]
else:
print("Error: Provide session files or use --batch flag")
sys.exit(1)

if not session_files:
print("No session files found to process")
sys.exit(0)

# Process sessions
try:
stats = processor.batch_process_sessions(
session_files,
chunk_size=args.chunk_size
)

print("\nProcessing complete!")
print(f"Sessions processed: {len(stats)}")
print(f"Global unique messages: {len(processor.global_hashes):,}")

sys.exit(0)

except KeyboardInterrupt:
print("\nProcessing interrupted by user")
sys.exit(130)
except Exception as e:
print(f"\nError: {e}")
if args.verbose:
import traceback
traceback.print_exc()
sys.exit(1)