#!/usr/bin/env python3
"""
JSONL Session Structure Analyzer

Analyzes Claude Code JSONL session files to identify safe split points,
message boundaries, and optimal chunking strategies for large sessions.

Features:
    - Stream processing (no full file load)
    - Safe split point detection (file snapshots, user messages, assistant end turns)
    - Tool call sequence tracking (prevents unsafe splits)
    - Overlap window calculation
    - Session metadata extraction

Author: Claude + AZ1.AI
License: MIT
"""
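
# Example CLI usage (a sketch; the script filename and session path are
# hypothetical, the flags are defined under the __main__ guard below):
#   python jsonl_analyzer.py session.jsonl --show-chunks --chunk-size 1000
#   python jsonl_analyzer.py session.jsonl --show-splits --verbose
#   python jsonl_analyzer.py session.jsonl --json > structure.json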

import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from enum import Enum
from dataclasses import dataclass
from datetime import datetime

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

class JournalEntryType(Enum):
    """JSONL entry types in Claude Code session files"""
    FILE_SNAPSHOT = "file-history-snapshot"
    USER_MESSAGE = "user"
    ASSISTANT_MESSAGE = "assistant"
    UNKNOWN = "unknown"

class SplitPointQuality(Enum):
    """Quality/safety level of split points"""
    HIGH = "high"        # File snapshot - safest
    MEDIUM = "medium"    # User message start - safe
    LOW = "low"          # Assistant end turn - acceptable
    UNSAFE = "unsafe"    # Mid-tool-sequence - never use

@dataclass
class SplitPoint:
    """Represents a potential split boundary"""
    line_number: int
    entry_type: str
    quality: SplitPointQuality
    reason: str
    message_index: int = 0

@dataclass
class SessionStructure:
    """Session file structure analysis"""
    total_lines: int
    file_snapshots: int
    user_messages: int
    assistant_messages: int
    unknown_entries: int
    safe_split_points: List[SplitPoint]
    message_boundaries: List[int]
    tool_call_sequences: List[Tuple[int, int]]  # (start_line, end_line)
    file_size_mb: float
    first_timestamp: Optional[str] = None
    last_timestamp: Optional[str] = None

class JSONLAnalyzer:
    """
    Analyze JSONL session file structure and find safe split points.

    Does NOT load the entire file into memory - uses streaming analysis.
    """

    def __init__(self, session_file: Path):
        """
        Initialize analyzer with session file.

        Args:
            session_file: Path to JSONL session file
        """
        self.session_file = Path(session_file)

        if not self.session_file.exists():
            raise FileNotFoundError(f"Session file not found: {session_file}")

        if not self.session_file.is_file():
            raise ValueError(f"Not a file: {session_file}")

        # Analysis results
        self.structure: Optional[SessionStructure] = None

    def analyze_structure(self, quick_mode: bool = False) -> SessionStructure:
        """
        Scan the file and analyze its structure without loading it into memory.

        Args:
            quick_mode: If True, sample the file instead of a full scan
                (faster, but counts become approximate)

        Returns:
            SessionStructure with complete analysis

        Raises:
            IOError: If the file cannot be read

        Note:
            Malformed JSONL lines are logged as warnings and counted as
            unknown entries; they do not raise.
        """
        logger.info(f"Analyzing session: {self.session_file.name}")

        # Counters
        total_lines = 0
        file_snapshots = 0
        user_messages = 0
        assistant_messages = 0
        unknown_entries = 0
        message_index = 0

        # Collections
        safe_split_points: List[SplitPoint] = []
        message_boundaries: List[int] = []
        tool_call_sequences: List[Tuple[int, int]] = []

        # State tracking
        in_tool_sequence = False
        tool_sequence_start = 0
        first_timestamp = None
        last_timestamp = None

        # File size
        file_size_mb = self.session_file.stat().st_size / (1024 * 1024)

        logger.info(f" File size: {file_size_mb:.2f} MB")

        try:
            with open(self.session_file, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    total_lines = line_num

                    # Quick mode: sample every 10th line after the first 1000
                    if quick_mode and line_num > 1000 and line_num % 10 != 0:
                        continue

                    try:
                        entry = json.loads(line.strip())
                        entry_type = entry.get('type', 'unknown')

                        # Track timestamps
                        timestamp = entry.get('timestamp')
                        if timestamp:
                            if not first_timestamp:
                                first_timestamp = timestamp
                            last_timestamp = timestamp

                        # Count entry types
                        if entry_type == JournalEntryType.FILE_SNAPSHOT.value:
                            file_snapshots += 1

                            # File snapshot = HIGHEST quality split point
                            split_point = SplitPoint(
                                line_number=line_num,
                                entry_type=entry_type,
                                quality=SplitPointQuality.HIGH,
                                reason="Natural session checkpoint",
                                message_index=message_index
                            )
                            safe_split_points.append(split_point)

                        elif entry_type == JournalEntryType.USER_MESSAGE.value:
                            user_messages += 1
                            message_index += 1
                            message_boundaries.append(line_num)

                            # User message = MEDIUM quality split point
                            # (only if not inside a tool sequence)
                            if not in_tool_sequence:
                                split_point = SplitPoint(
                                    line_number=line_num,
                                    entry_type=entry_type,
                                    quality=SplitPointQuality.MEDIUM,
                                    reason="Conversation turn boundary",
                                    message_index=message_index
                                )
                                safe_split_points.append(split_point)

                        elif entry_type == JournalEntryType.ASSISTANT_MESSAGE.value:
                            assistant_messages += 1
                            message_index += 1
                            message_boundaries.append(line_num)

                            # Check for tool calls
                            message_content = entry.get('message', {}).get('content', [])
                            has_tool_use = False

                            if isinstance(message_content, list):
                                has_tool_use = any(
                                    isinstance(item, dict) and item.get('type') == 'tool_use'
                                    for item in message_content
                                )

                            if has_tool_use:
                                # Start of tool sequence
                                in_tool_sequence = True
                                tool_sequence_start = line_num
                            else:
                                # Check for end of turn
                                stop_reason = entry.get('message', {}).get('stop_reason')

                                if stop_reason == 'end_turn' and in_tool_sequence:
                                    # End of tool sequence
                                    tool_call_sequences.append((tool_sequence_start, line_num))
                                    in_tool_sequence = False

                                    # Assistant end turn = LOW quality split point
                                    split_point = SplitPoint(
                                        line_number=line_num,
                                        entry_type=entry_type,
                                        quality=SplitPointQuality.LOW,
                                        reason="Response completion (after tool sequence)",
                                        message_index=message_index
                                    )
                                    safe_split_points.append(split_point)

                                elif stop_reason == 'end_turn' and not in_tool_sequence:
                                    # Simple assistant end turn
                                    split_point = SplitPoint(
                                        line_number=line_num,
                                        entry_type=entry_type,
                                        quality=SplitPointQuality.MEDIUM,
                                        reason="Response completion",
                                        message_index=message_index
                                    )
                                    safe_split_points.append(split_point)

                        else:
                            unknown_entries += 1

                    except json.JSONDecodeError as e:
                        logger.warning(f"Line {line_num}: Invalid JSON - {e}")
                        unknown_entries += 1
                        continue

        except IOError as e:
            logger.error(f"Failed to read session file: {e}")
            raise

        # Create structure
        self.structure = SessionStructure(
            total_lines=total_lines,
            file_snapshots=file_snapshots,
            user_messages=user_messages,
            assistant_messages=assistant_messages,
            unknown_entries=unknown_entries,
            safe_split_points=sorted(safe_split_points, key=lambda x: x.line_number),
            message_boundaries=message_boundaries,
            tool_call_sequences=tool_call_sequences,
            file_size_mb=file_size_mb,
            first_timestamp=first_timestamp,
            last_timestamp=last_timestamp
        )

        logger.info(" Analysis complete:")
        logger.info(f" Total lines: {total_lines}")
        logger.info(f" Messages: {user_messages + assistant_messages} (user: {user_messages}, assistant: {assistant_messages})")
        logger.info(f" File snapshots: {file_snapshots}")
        logger.info(f" Safe split points: {len(safe_split_points)}")
        logger.info(f" Tool call sequences: {len(tool_call_sequences)}")

        return self.structure
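
    # Entry shapes the scan above relies on (illustrative sketches inferred
    # from the fields this method reads - 'type', 'timestamp',
    # 'message.content', 'message.stop_reason' - not from a format spec):
    #   {"type": "file-history-snapshot", "timestamp": "..."}
    #   {"type": "user", "timestamp": "...", "message": {...}}
    #   {"type": "assistant", "timestamp": "...",
    #    "message": {"content": [{"type": "tool_use", ...}],
    #                "stop_reason": "end_turn"}}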

    def find_safe_split_points(
        self,
        target_chunk_size: int = 1000,
        min_chunk_size: int = 500,
        max_split_points: Optional[int] = None
    ) -> List[SplitPoint]:
        """
        Find optimal split points for chunking based on target chunk size.

        Args:
            target_chunk_size: Target lines per chunk
            min_chunk_size: Minimum chunk size (prevents tiny chunks)
            max_split_points: Maximum number of split points to return

        Returns:
            List of optimal split points
        """
        if not self.structure:
            self.analyze_structure()

        all_splits = self.structure.safe_split_points

        if not all_splits:
            logger.warning("No safe split points found")
            return []

        # Filter splits by quality and spacing
        optimal_splits: List[SplitPoint] = []
        last_split_line = 0

        for split in all_splits:
            # Skip if too close to the last split
            if split.line_number - last_split_line < min_chunk_size:
                continue

            # Prefer high-quality splits near the target chunk size
            distance_from_target = abs((split.line_number - last_split_line) - target_chunk_size)

            # Add the split if:
            # 1. High quality (file snapshot) - always add
            # 2. Medium/low quality and close to the target size
            if split.quality == SplitPointQuality.HIGH:
                optimal_splits.append(split)
                last_split_line = split.line_number
            elif distance_from_target < target_chunk_size * 0.3:  # Within 30% of target
                optimal_splits.append(split)
                last_split_line = split.line_number

        # Limit the number of splits if requested
        if max_split_points and len(optimal_splits) > max_split_points:
            # Prefer high-quality splits. Rank by explicit enum order rather
            # than by the string value (alphabetical order would incorrectly
            # put "low" before "medium").
            quality_rank = {
                SplitPointQuality.HIGH: 0,
                SplitPointQuality.MEDIUM: 1,
                SplitPointQuality.LOW: 2,
                SplitPointQuality.UNSAFE: 3,
            }
            optimal_splits = sorted(
                optimal_splits,
                key=lambda x: (quality_rank[x.quality], x.line_number)
            )[:max_split_points]
            optimal_splits = sorted(optimal_splits, key=lambda x: x.line_number)

        logger.info(f"Selected {len(optimal_splits)} optimal split points for target chunk size {target_chunk_size}")

        return optimal_splits
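
    # Worked example of the selection rules above (numbers are illustrative):
    # with target_chunk_size=1000 and min_chunk_size=500, a HIGH split at
    # line 400 is skipped (the min_chunk_size check runs first), a HIGH split
    # at line 900 is taken unconditionally, and a MEDIUM split at line 2100
    # is then taken because |(2100 - 900) - 1000| = 200 < 300 (30% of target).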

    def calculate_overlap(
        self,
        split_line: int,
        overlap_messages: int = 10
    ) -> Tuple[int, int]:
        """
        Calculate the overlap window for conversation continuity.

        Args:
            split_line: Line number where the split occurs
            overlap_messages: Number of messages to include in the overlap

        Returns:
            Tuple of (overlap_start_line, overlap_end_line)
        """
        if not self.structure:
            self.analyze_structure()

        # Find message boundaries before split_line
        boundaries_before_split = [
            b for b in self.structure.message_boundaries
            if b < split_line
        ]

        if not boundaries_before_split:
            return (split_line, split_line)

        # Take the last N message boundaries (slicing already handles lists
        # shorter than overlap_messages, and the list is non-empty here)
        overlap_boundaries = boundaries_before_split[-overlap_messages:]

        overlap_start = overlap_boundaries[0]
        overlap_end = split_line

        logger.debug(f"Overlap window: lines {overlap_start}-{overlap_end} ({len(overlap_boundaries)} messages)")

        return (overlap_start, overlap_end)
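
    # Worked example (illustrative): with message_boundaries [10, 55, 120, 300],
    # calculate_overlap(split_line=310, overlap_messages=2) keeps the last two
    # boundaries [120, 300] and returns (120, 310).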

    def get_recommended_chunks(
        self,
        target_chunk_size: int = 1000,
        overlap_messages: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Get the recommended chunking strategy for this session.

        Args:
            target_chunk_size: Target lines per chunk
            overlap_messages: Number of messages to overlap

        Returns:
            List of chunk specifications with overlap windows
        """
        if not self.structure:
            self.analyze_structure()

        split_points = self.find_safe_split_points(target_chunk_size)

        if not split_points:
            # No splits needed - single chunk
            return [{
                'chunk_id': 1,
                'start_line': 1,
                'end_line': self.structure.total_lines,
                'overlap_start': None,
                'overlap_end': None,
                'estimated_lines': self.structure.total_lines
            }]

        chunks = []
        last_end = 0

        for idx, split in enumerate(split_points, 1):
            # Calculate overlap for continuity (none for the first chunk)
            if idx > 1:
                overlap_start, overlap_end = self.calculate_overlap(last_end + 1, overlap_messages)
            else:
                overlap_start, overlap_end = None, None

            chunk = {
                'chunk_id': idx,
                'start_line': last_end + 1,
                'end_line': split.line_number,
                'overlap_start': overlap_start,
                'overlap_end': overlap_end,
                'estimated_lines': split.line_number - last_end,
                'split_quality': split.quality.value,
                'split_reason': split.reason
            }
            chunks.append(chunk)
            last_end = split.line_number

        # Final chunk (from the last split to end of file)
        if last_end < self.structure.total_lines:
            overlap_start, overlap_end = self.calculate_overlap(last_end + 1, overlap_messages)

            chunk = {
                'chunk_id': len(chunks) + 1,
                'start_line': last_end + 1,
                'end_line': self.structure.total_lines,
                'overlap_start': overlap_start,
                'overlap_end': overlap_end,
                'estimated_lines': self.structure.total_lines - last_end,
                'split_quality': 'end_of_file',
                'split_reason': 'End of session'
            }
            chunks.append(chunk)

        logger.info(f"Recommended chunking: {len(chunks)} chunks")

        return chunks
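
# Shape of one chunk spec returned by get_recommended_chunks()
# (a sketch with illustrative numbers, not real session data):
#   {'chunk_id': 2, 'start_line': 1001, 'end_line': 2100,
#    'overlap_start': 920, 'overlap_end': 1001, 'estimated_lines': 1100,
#    'split_quality': 'medium', 'split_reason': 'Conversation turn boundary'}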

if name == "main": import argparse import sys

parser = argparse.ArgumentParser(description="Analyze JSONL session file structure")
parser.add_argument("session_file", help="Path to JSONL session file")
parser.add_argument("--chunk-size", type=int, default=1000, help="Target chunk size in lines")
parser.add_argument("--overlap", type=int, default=10, help="Number of messages to overlap")
parser.add_argument("--quick", action="store_true", help="Quick mode (sample file)")
parser.add_argument("--show-splits", action="store_true", help="Show all safe split points")
parser.add_argument("--show-chunks", action="store_true", help="Show recommended chunks")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")

args = parser.parse_args()

# Set logging level
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)

try:
# Analyze session
analyzer = JSONLAnalyzer(args.session_file)
structure = analyzer.analyze_structure(quick_mode=args.quick)

if args.json:
# JSON output
output = {
'file': str(analyzer.session_file),
'size_mb': structure.file_size_mb,
'total_lines': structure.total_lines,
'file_snapshots': structure.file_snapshots,
'user_messages': structure.user_messages,
'assistant_messages': structure.assistant_messages,
'total_messages': structure.user_messages + structure.assistant_messages,
'safe_split_points': len(structure.safe_split_points),
'tool_sequences': len(structure.tool_call_sequences),
'first_timestamp': structure.first_timestamp,
'last_timestamp': structure.last_timestamp
}

if args.show_chunks:
chunks = analyzer.get_recommended_chunks(args.chunk_size, args.overlap)
output['recommended_chunks'] = chunks

print(json.dumps(output, indent=2))

else:
# Human-readable output
print(f"\nSession Analysis: {analyzer.session_file.name}")
print(f"{'='*70}")
print(f" File size: {structure.file_size_mb:.2f} MB")
print(f" Total lines: {structure.total_lines:,}")
print(f" Messages: {structure.user_messages + structure.assistant_messages:,}")
print(f" - User: {structure.user_messages:,}")
print(f" - Assistant: {structure.assistant_messages:,}")
print(f" File snapshots: {structure.file_snapshots:,}")
print(f" Tool call sequences: {len(structure.tool_call_sequences):,}")
print(f" Safe split points: {len(structure.safe_split_points):,}")

if structure.first_timestamp and structure.last_timestamp:
print(f" Time range: {structure.first_timestamp} → {structure.last_timestamp}")

if args.show_splits:
print(f"\n Safe Split Points:")
for split in structure.safe_split_points[:20]: # Show first 20
print(f" Line {split.line_number:6,}: {split.entry_type:20} ({split.quality.value:6}) - {split.reason}")
if len(structure.safe_split_points) > 20:
print(f" ... ({len(structure.safe_split_points) - 20} more)")

if args.show_chunks:
chunks = analyzer.get_recommended_chunks(args.chunk_size, args.overlap)
print(f"\n Recommended Chunking Strategy:")
print(f" Target chunk size: {args.chunk_size} lines")
print(f" Overlap: {args.overlap} messages")
print(f" Total chunks: {len(chunks)}")
print()

for chunk in chunks:
overlap_info = ""
if chunk['overlap_start']:
overlap_info = f" (overlap: {chunk['overlap_start']}-{chunk['overlap_end']})"

print(f" Chunk {chunk['chunk_id']:2}: Lines {chunk['start_line']:6,}-{chunk['end_line']:6,} ({chunk['estimated_lines']:5,} lines){overlap_info}")
print(f" Split: {chunk['split_quality']} - {chunk['split_reason']}")

print()

sys.exit(0)

except FileNotFoundError as e:
print(f"\nāŒ Error: {e}", file=sys.stderr)
sys.exit(1)

except Exception as e:
print(f"\nāŒ Unexpected error: {e}", file=sys.stderr)
if args.verbose:
import traceback
traceback.print_exc()
sys.exit(1)