#!/usr/bin/env python3
"""
Session Processing Watermark Tracker

Tracks processing progress for JSONL session files to enable resume
capability after crashes or interruptions.

Features:
- Per-session watermark tracking
- Chunk completion tracking
- Status management (pending, in_progress, completed, failed)
- Resume from last successful point
- Atomic updates

Author: Claude + AZ1.AI
License: MIT
"""
import json
import logging
from dataclasses import asdict, dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
# Setup logging
# Root logging is configured at import time (INFO level, timestamped format).
# The module logger must be keyed on __name__ so records carry this module's
# name; a bare `name` is undefined and raises NameError on import.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
class ProcessingStatus(Enum):
    """Session processing status.

    The member values are the strings written to and read back from the
    watermark JSON file, so they must stay stable.
    """

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
@dataclass
class SessionWatermark:
    """Watermark for session processing progress.

    Field order is part of the interface (positional construction) and the
    field names are the keys used in the persisted JSON, so neither may change.
    """

    # Identifier of the session this watermark belongs to.
    session_id: str
    # Last line number that was successfully processed.
    last_processed_line: int
    # ISO-8601 timestamp string of the last progress update.
    last_processed_timestamp: str
    # Total number of lines in the session file.
    total_lines: int
    # IDs of chunks that have been completed.
    chunks_completed: List[int]
    # IDs of chunks still outstanding (presumably filled by callers; confirm).
    chunks_pending: List[int]
    # Current processing state.
    status: ProcessingStatus
    # Failure description; None when there is no error.
    error_message: Optional[str] = None
    # ISO-8601 creation timestamp string, if recorded.
    created: Optional[str] = None
    # ISO-8601 last-modification timestamp string, if recorded.
    updated: Optional[str] = None
class WatermarkTracker:
    """
    Track session processing watermarks for resume capability.

    All watermarks are held in memory in ``self.watermarks`` and written to a
    single JSON file after every mutation.

    Storage:
        {storage_dir}/session_watermarks.json
    """

    def __init__(self, storage_dir: Path):
        """
        Initialize watermark tracker.

        Creates the storage directory if needed and loads any watermarks
        already persisted there.

        Args:
            storage_dir: Directory for watermark storage
        """
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        self.watermarks_file = self.storage_dir / "session_watermarks.json"

        # Load watermarks
        self.watermarks: Dict[str, SessionWatermark] = self._load_watermarks()
        logger.info(f"WatermarkTracker initialized: {len(self.watermarks)} sessions tracked")

    def _load_watermarks(self) -> Dict[str, SessionWatermark]:
        """
        Load watermarks from disk.

        Returns:
            Mapping of session ID to watermark. Empty when the file does not
            exist, or when it cannot be parsed into watermarks (a warning is
            logged in that case).
        """
        if not self.watermarks_file.exists():
            return {}

        try:
            with open(self.watermarks_file, 'r', encoding='utf-8') as f:
                data = json.load(f)

            watermarks = {}
            for session_id, wm_data in data.items():
                fields = dict(wm_data)
                # Convert status string to enum
                fields['status'] = ProcessingStatus(fields['status'])
                watermarks[session_id] = SessionWatermark(**fields)
            return watermarks
        except (json.JSONDecodeError, KeyError, ValueError, TypeError, AttributeError) as e:
            # TypeError / AttributeError cover structurally wrong files
            # (top level not an object, unexpected or missing fields); these
            # are treated like corrupt JSON instead of crashing the constructor.
            logger.warning(f"Failed to load watermarks (using empty): {e}")
            return {}

    def _save_watermarks(self) -> None:
        """
        Save watermarks to disk (atomic).

        The data is written to a sibling ``.tmp`` file which is then moved
        over the real file, so readers never see a half-written file.

        Raises:
            OSError: If writing or replacing the file fails (logged first).
        """
        # Convert to plain dicts, with the enum flattened to its string value
        data = {}
        for session_id, watermark in self.watermarks.items():
            wm_dict = asdict(watermark)
            wm_dict['status'] = watermark.status.value
            data[session_id] = wm_dict

        temp_file = self.watermarks_file.with_suffix('.tmp')
        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)

            # replace() overwrites an existing target on every platform;
            # rename() raises on Windows once the watermark file exists.
            temp_file.replace(self.watermarks_file)
        except OSError as e:
            logger.error(f"Failed to save watermarks: {e}")
            raise

    def get_watermark(self, session_id: str) -> Optional[SessionWatermark]:
        """
        Get watermark for session.

        Args:
            session_id: Session identifier

        Returns:
            SessionWatermark or None if not tracked
        """
        return self.watermarks.get(session_id)

    def update_watermark(
        self,
        session_id: str,
        last_processed_line: int,
        total_lines: int,
        chunk_id: Optional[int] = None,
        status: ProcessingStatus = ProcessingStatus.IN_PROGRESS,
        error_message: Optional[str] = None
    ) -> SessionWatermark:
        """
        Update watermark for session, creating it on first use, and persist.

        Note that ``status`` and ``error_message`` are overwritten on every
        call, so omitting ``error_message`` clears a previously stored error.

        Args:
            session_id: Session identifier
            last_processed_line: Last line successfully processed
            total_lines: Total lines in session
            chunk_id: Chunk just completed (if applicable); 0 is a valid ID
            status: Processing status
            error_message: Error message (if failed)

        Returns:
            Updated SessionWatermark

        Raises:
            OSError: If the watermark file cannot be written.
        """
        now = datetime.now().isoformat()

        watermark = self.watermarks.get(session_id)
        if watermark is not None:
            # Update existing
            watermark.last_processed_line = last_processed_line
            watermark.last_processed_timestamp = now
            watermark.total_lines = total_lines
            watermark.status = status
            watermark.error_message = error_message
            watermark.updated = now

            # Update chunk tracking
            if chunk_id is not None:
                if chunk_id not in watermark.chunks_completed:
                    watermark.chunks_completed.append(chunk_id)
                    watermark.chunks_completed.sort()
                if chunk_id in watermark.chunks_pending:
                    watermark.chunks_pending.remove(chunk_id)
        else:
            # Create new watermark. Compare against None explicitly: a
            # truthiness test would silently drop chunk 0.
            watermark = SessionWatermark(
                session_id=session_id,
                last_processed_line=last_processed_line,
                last_processed_timestamp=now,
                total_lines=total_lines,
                chunks_completed=[chunk_id] if chunk_id is not None else [],
                chunks_pending=[],
                status=status,
                error_message=error_message,
                created=now,
                updated=now
            )
            self.watermarks[session_id] = watermark

        # Save to disk
        self._save_watermarks()
        logger.debug(f"Watermark updated: {session_id} @ line {last_processed_line} ({status.value})")
        return watermark

    def mark_completed(self, session_id: str) -> None:
        """
        Mark session as completed and move its line pointer to the end.

        Does nothing when the session is not tracked.
        """
        watermark = self.watermarks.get(session_id)
        if watermark is None:
            return

        watermark.status = ProcessingStatus.COMPLETED
        watermark.last_processed_line = watermark.total_lines
        watermark.updated = datetime.now().isoformat()
        self._save_watermarks()
        logger.info(f"Session marked completed: {session_id}")

    def mark_failed(self, session_id: str, error_message: str) -> None:
        """
        Mark session as failed and record the error message.

        Does nothing when the session is not tracked.
        """
        watermark = self.watermarks.get(session_id)
        if watermark is None:
            return

        watermark.status = ProcessingStatus.FAILED
        watermark.error_message = error_message
        watermark.updated = datetime.now().isoformat()
        self._save_watermarks()
        logger.warning(f"Session marked failed: {session_id} - {error_message}")

    def reset_watermark(self, session_id: str) -> None:
        """
        Reset watermark for session (start over).

        Removes the session's watermark entirely; does nothing when the
        session is not tracked.
        """
        if session_id not in self.watermarks:
            return

        del self.watermarks[session_id]
        self._save_watermarks()
        logger.info(f"Watermark reset: {session_id}")

    def get_progress(self, session_id: str) -> Dict[str, Any]:
        """
        Get progress statistics for session.

        Args:
            session_id: Session identifier

        Returns:
            Dict with progress info. For an untracked session only the
            'session_id', 'status' ('not_started'), 'progress_percent',
            'lines_processed' and 'lines_remaining' keys are present.
            For a tracked session 'chunks_completed' and 'chunks_pending'
            are counts, not the chunk ID lists.
        """
        watermark = self.watermarks.get(session_id)
        if watermark is None:
            return {
                'session_id': session_id,
                'status': 'not_started',
                'progress_percent': 0,
                'lines_processed': 0,
                'lines_remaining': 0
            }

        # Guard against division by zero for empty sessions
        if watermark.total_lines > 0:
            progress_percent = watermark.last_processed_line / watermark.total_lines * 100
        else:
            progress_percent = 0

        return {
            'session_id': session_id,
            'status': watermark.status.value,
            'progress_percent': round(progress_percent, 1),
            'lines_processed': watermark.last_processed_line,
            'lines_remaining': watermark.total_lines - watermark.last_processed_line,
            'chunks_completed': len(watermark.chunks_completed),
            'chunks_pending': len(watermark.chunks_pending),
            'last_updated': watermark.last_processed_timestamp,
            'error': watermark.error_message
        }

    def list_in_progress(self) -> List[SessionWatermark]:
        """Get all in-progress sessions"""
        return [
            wm for wm in self.watermarks.values()
            if wm.status == ProcessingStatus.IN_PROGRESS
        ]

    def list_failed(self) -> List[SessionWatermark]:
        """Get all failed sessions"""
        return [
            wm for wm in self.watermarks.values()
            if wm.status == ProcessingStatus.FAILED
        ]

    def list_completed(self) -> List[SessionWatermark]:
        """Get all completed sessions"""
        return [
            wm for wm in self.watermarks.values()
            if wm.status == ProcessingStatus.COMPLETED
        ]
# Command-line entry point: inspect, list or reset stored watermarks.
# Exit status is 0 on success and 1 on any error.
if __name__ == "__main__":
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="Session watermark tracker")
    parser.add_argument("--storage-dir", default="dedup_state", help="Storage directory")
    parser.add_argument("--session", help="Session ID to check")
    parser.add_argument("--list", choices=['all', 'in-progress', 'completed', 'failed'], help="List sessions")
    parser.add_argument("--reset", metavar="SESSION_ID", help="Reset watermark for session")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    args = parser.parse_args()

    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        tracker = WatermarkTracker(Path(args.storage_dir))

        # --reset takes precedence over every other option
        if args.reset:
            tracker.reset_watermark(args.reset)
            print(f"✅ Watermark reset: {args.reset}")
            sys.exit(0)

        if args.session:
            # Show progress for specific session
            progress = tracker.get_progress(args.session)
            if args.json:
                print(json.dumps(progress, indent=2))
            else:
                print(f"\nSession: {progress['session_id']}")
                print(f"{'='*70}")
                print(f" Status: {progress['status']}")
                print(f" Progress: {progress['progress_percent']}%")
                print(f" Lines processed: {progress['lines_processed']:,}")
                print(f" Lines remaining: {progress['lines_remaining']:,}")
                # The optional keys are absent for untracked sessions, and
                # zero counts / empty values are deliberately not printed.
                if progress.get('chunks_completed'):
                    print(f" Chunks completed: {progress['chunks_completed']}")
                if progress.get('chunks_pending'):
                    print(f" Chunks pending: {progress['chunks_pending']}")
                if progress.get('last_updated'):
                    print(f" Last updated: {progress['last_updated']}")
                if progress.get('error'):
                    print(f" Error: {progress['error']}")
                print()
        elif args.list:
            # List sessions
            if args.list == 'all':
                sessions = list(tracker.watermarks.values())
            elif args.list == 'in-progress':
                sessions = tracker.list_in_progress()
            elif args.list == 'completed':
                sessions = tracker.list_completed()
            elif args.list == 'failed':
                sessions = tracker.list_failed()

            if args.json:
                output = [asdict(s) for s in sessions]
                # asdict() leaves the enum object in place; JSON needs its string value
                for item in output:
                    item['status'] = item['status'].value
                print(json.dumps(output, indent=2))
            else:
                print(f"\n{args.list.title()} Sessions: {len(sessions)}")
                print(f"{'='*70}")
                for session in sessions:
                    progress_percent = (session.last_processed_line / session.total_lines * 100) if session.total_lines > 0 else 0
                    print(f" {session.session_id}")
                    print(f" Status: {session.status.value} ({progress_percent:.1f}%)")
                    print(f" Lines: {session.last_processed_line:,} / {session.total_lines:,}")
                    print(f" Chunks: {len(session.chunks_completed)} completed, {len(session.chunks_pending)} pending")
                    if session.error_message:
                        print(f" Error: {session.error_message}")
                    print()
        else:
            # Show summary
            total = len(tracker.watermarks)
            in_progress = len(tracker.list_in_progress())
            completed = len(tracker.list_completed())
            failed = len(tracker.list_failed())
            print("\nWatermark Tracker Summary")
            print(f"{'='*70}")
            print(f" Storage: {tracker.watermarks_file}")
            print(f" Total sessions: {total}")
            print(f" In progress: {in_progress}")
            print(f" Completed: {completed}")
            print(f" Failed: {failed}")
            print()

        sys.exit(0)
    except Exception as e:
        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so the normal exits above are not intercepted here.
        print(f"\n❌ Error: {e}", file=sys.stderr)
        if args.verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)