#!/usr/bin/env python3
"""
JSONL Session File Chunker

Splits large JSONL session files into processable chunks with smart
boundary detection and overlap for conversation continuity.

Features:
- Smart split point detection (via JSONLAnalyzer)
- Overlap window generation
- Chunk file creation
- Chunk index generation
- Progress tracking

Author: Claude + AZ1.AI
License: MIT
"""
import json
import logging
import shutil
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from jsonl_analyzer import JSONLAnalyzer, SessionStructure
# Setup logging
# Configure root logging once at import time; modules sharing this process
# inherit the format.  NOTE: original had getLogger(name) — a NameError;
# the module-dunder __name__ is the conventional, correct argument.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
@dataclass
class SessionChunk:
    """Represents a chunk of a session file."""

    chunk_id: int                  # sequential chunk number
    start_line: int                # first line of the chunk (1-indexed, inclusive)
    end_line: int                  # last line of the chunk (inclusive)
    overlap_start: Optional[int]   # first line of the overlap window, if any
    overlap_end: Optional[int]     # last line of the overlap window, if any
    file_path: Path                # path of the chunk .jsonl file (may not exist on dry runs)
    line_count: int                # total lines in the chunk
    estimated_messages: int        # rough message count (heuristic, not exact)
    split_quality: str             # quality rating of the chosen split point
    split_reason: str              # human-readable rationale for the split
@dataclass
class ChunkIndex:
    """Index of all chunks for a session."""

    session_id: str                # session identifier (source file stem)
    original_file: str             # path of the original session file
    original_size_mb: float        # original file size in MB (rounded)
    total_lines: int               # line count of the original file
    total_chunks: int              # number of chunks produced
    chunk_size_target: int         # target lines per chunk used for the split
    overlap_messages: int          # overlap setting used for the split
    chunks: List[Dict[str, Any]]   # per-chunk summary dicts
    created: str                   # ISO-8601 creation timestamp
    structure: Dict[str, Any]      # session structure summary from the analyzer
class SessionChunker:
    """
    Split large JSONL session files into manageable chunks.

    Uses JSONLAnalyzer to find safe split points and creates
    chunk files with overlap for conversation continuity.
    """

    def __init__(self, session_file: Path, chunk_dir: Path):
        """
        Initialize chunker with session file and output directory.

        Args:
            session_file: Path to JSONL session file
            chunk_dir: Directory to store chunk files

        Raises:
            FileNotFoundError: If the session file does not exist.
        """
        self.session_file = Path(session_file)
        self.chunk_dir = Path(chunk_dir)

        # Validate before any side effects so a bad path creates nothing.
        if not self.session_file.exists():
            raise FileNotFoundError(f"Session file not found: {session_file}")

        # Create chunk directory
        self.chunk_dir.mkdir(parents=True, exist_ok=True)

        # Initialize analyzer
        self.analyzer = JSONLAnalyzer(self.session_file)

        # Session ID from filename (stem = name without .jsonl suffix)
        self.session_id = self.session_file.stem
def split_session(
self,
target_chunk_lines: int = 1000,
overlap_messages: int = 10,
dry_run: bool = False
) -> List[SessionChunk]:
"""
Split session into chunks with overlap.
Args:
target_chunk_lines: Target lines per chunk
overlap_messages: Number of messages to overlap
dry_run: If True, don't create chunk files (just plan)
Returns:
List of SessionChunk objects
Process:
1. Analyze structure
2. Find safe split points
3. Create chunk files with overlap
4. Generate chunk metadata
"""
logger.info(f"Splitting session: {self.session_file.name}")
logger.info(f" Target chunk size: {target_chunk_lines} lines")
logger.info(f" Overlap: {overlap_messages} messages")
# Analyze structure
structure = self.analyzer.analyze_structure()
# Get recommended chunks
chunk_specs = self.analyzer.get_recommended_chunks(
target_chunk_size=target_chunk_lines,
overlap_messages=overlap_messages
)
logger.info(f" Recommended chunks: {len(chunk_specs)}")
# Create chunk files
chunks: List[SessionChunk] = []
for spec in chunk_specs:
chunk = self.create_chunk_file(
start_line=spec['start_line'],
end_line=spec['end_line'],
chunk_id=spec['chunk_id'],
overlap_start=spec.get('overlap_start'),
overlap_end=spec.get('overlap_end'),
split_quality=spec.get('split_quality', 'unknown'),
split_reason=spec.get('split_reason', 'Unknown'),
dry_run=dry_run
)
chunks.append(chunk)
# Create chunk index
if not dry_run:
self.create_chunk_index(chunks, structure, target_chunk_lines, overlap_messages)
logger.info(f" Chunking complete: {len(chunks)} chunks created")
return chunks
def create_chunk_file(
self,
start_line: int,
end_line: int,
chunk_id: int,
overlap_start: Optional[int] = None,
overlap_end: Optional[int] = None,
split_quality: str = "unknown",
split_reason: str = "Unknown",
dry_run: bool = False
) -> SessionChunk:
"""
Extract chunk from session file.
Args:
start_line: Starting line number (1-indexed)
end_line: Ending line number (inclusive)
chunk_id: Chunk number
overlap_start: Start of overlap window (if any)
overlap_end: End of overlap window
split_quality: Quality of split point
split_reason: Reason for split
dry_run: If True, don't create file
Returns:
SessionChunk object
Output:
{chunk_dir}/{session_id}-chunk-{chunk_id:03d}.jsonl
"""
chunk_filename = f"{self.session_id}-chunk-{chunk_id:03d}.jsonl"
chunk_path = self.chunk_dir / chunk_filename
# Count messages in chunk (estimate)
estimated_messages = int((end_line - start_line + 1) * 0.9) # ~90% of lines are messages
if not dry_run:
logger.info(f" Creating chunk {chunk_id}: lines {start_line:,}-{end_line:,}")
# Extract lines
with open(self.session_file, 'r', encoding='utf-8') as infile, \
open(chunk_path, 'w', encoding='utf-8') as outfile:
for line_num, line in enumerate(infile, 1):
if start_line <= line_num <= end_line:
outfile.write(line)
if line_num > end_line:
break
logger.debug(f" Chunk file created: {chunk_path.name}")
chunk = SessionChunk(
chunk_id=chunk_id,
start_line=start_line,
end_line=end_line,
overlap_start=overlap_start,
overlap_end=overlap_end,
file_path=chunk_path,
line_count=end_line - start_line + 1,
estimated_messages=estimated_messages,
split_quality=split_quality,
split_reason=split_reason
)
return chunk
def create_chunk_index(
self,
chunks: List[SessionChunk],
structure: SessionStructure,
chunk_size_target: int,
overlap_messages: int
) -> Path:
"""
Create index of all chunks.
Args:
chunks: List of SessionChunk objects
structure: SessionStructure from analyzer
chunk_size_target: Target chunk size used
overlap_messages: Overlap messages used
Returns:
Path to index file
Output JSON:
{chunk_dir}/{session_id}-chunk-index.json
"""
index_path = self.chunk_dir / f"{self.session_id}-chunk-index.json"
# Build chunk list
chunk_list = []
for chunk in chunks:
chunk_dict = {
'chunk_id': chunk.chunk_id,
'file': chunk.file_path.name,
'lines': f"{chunk.start_line}-{chunk.end_line}",
'line_count': chunk.line_count,
'estimated_messages': chunk.estimated_messages,
'overlap': f"{chunk.overlap_start}-{chunk.overlap_end}" if chunk.overlap_start else None,
'split_quality': chunk.split_quality,
'split_reason': chunk.split_reason
}
chunk_list.append(chunk_dict)
# Build index
index = ChunkIndex(
session_id=self.session_id,
original_file=str(self.session_file),
original_size_mb=round(structure.file_size_mb, 2),
total_lines=structure.total_lines,
total_chunks=len(chunks),
chunk_size_target=chunk_size_target,
overlap_messages=overlap_messages,
chunks=chunk_list,
created=datetime.now().isoformat(),
structure={
'file_snapshots': structure.file_snapshots,
'user_messages': structure.user_messages,
'assistant_messages': structure.assistant_messages,
'total_messages': structure.user_messages + structure.assistant_messages,
'tool_sequences': len(structure.tool_call_sequences),
'first_timestamp': structure.first_timestamp,
'last_timestamp': structure.last_timestamp
}
)
# Write index
with open(index_path, 'w', encoding='utf-8') as f:
json.dump(asdict(index), f, indent=2, default=str)
logger.info(f" Chunk index created: {index_path.name}")
return index_path
def cleanup_chunks(self) -> None:
"""Remove all chunk files for this session"""
logger.info(f"Cleaning up chunks for session: {self.session_id}")
pattern = f"{self.session_id}-chunk-*.jsonl"
chunks_removed = 0
for chunk_file in self.chunk_dir.glob(pattern):
chunk_file.unlink()
chunks_removed += 1
logger.debug(f" Removed: {chunk_file.name}")
# Remove index
index_path = self.chunk_dir / f"{self.session_id}-chunk-index.json"
if index_path.exists():
index_path.unlink()
logger.debug(f" Removed: {index_path.name}")
logger.info(f" Cleanup complete: {chunks_removed} chunks removed")
# CLI entry point.  NOTE: original had `if name == "main"` — a NameError;
# the dunder guard is the correct form.
if __name__ == "__main__":
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="Split JSONL session file into chunks")
    parser.add_argument("session_file", help="Path to JSONL session file")
    parser.add_argument("--chunk-dir", default="chunks", help="Directory for chunk files")
    parser.add_argument("--chunk-size", type=int, default=1000, help="Target chunk size in lines")
    parser.add_argument("--overlap", type=int, default=10, help="Number of messages to overlap")
    parser.add_argument("--dry-run", action="store_true", help="Don't create files (just plan)")
    parser.add_argument("--cleanup", action="store_true", help="Remove existing chunks for this session")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    args = parser.parse_args()

    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        # Initialize chunker (raises FileNotFoundError for a bad path)
        chunker = SessionChunker(
            session_file=Path(args.session_file),
            chunk_dir=Path(args.chunk_dir)
        )

        if args.cleanup:
            # Cleanup existing chunks
            chunker.cleanup_chunks()
            sys.exit(0)

        # Split session
        chunks = chunker.split_session(
            target_chunk_lines=args.chunk_size,
            overlap_messages=args.overlap,
            dry_run=args.dry_run
        )

        # Report
        print(f"\nChunking {'Plan' if args.dry_run else 'Complete'}: {chunker.session_file.name}")
        print(f"{'='*70}")
        print(f" Total chunks: {len(chunks)}")
        print(f" Chunk directory: {chunker.chunk_dir}")
        print()

        for chunk in chunks:
            overlap_info = ""
            # Compare against None so a 0-valued overlap bound still reports.
            if chunk.overlap_start is not None:
                overlap_size = chunk.overlap_end - chunk.overlap_start + 1
                overlap_info = f" (overlap: {overlap_size} lines)"

            print(f" Chunk {chunk.chunk_id:2}: {chunk.file_path.name}")
            print(f" Lines {chunk.start_line:6,}-{chunk.end_line:6,} ({chunk.line_count:5,} lines){overlap_info}")
            print(f" Split: {chunk.split_quality} - {chunk.split_reason}")
            print()

        if not args.dry_run:
            index_path = chunker.chunk_dir / f"{chunker.session_id}-chunk-index.json"
            print(f" Index: {index_path}")
            print()

        sys.exit(0)

    except FileNotFoundError as e:
        print(f"\n❌ Error: {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Catch-all at the process boundary: report and exit non-zero.
        print(f"\n❌ Unexpected error: {e}", file=sys.stderr)
        if args.verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)