scripts-session-recovery
#!/usr/bin/env python3
"""
Session Recovery - Detect and recover hung Claude CLI sessions.

This script provides utilities for:
- Detecting all Claude processes
- Mapping PIDs to TTYs to session files
- Identifying hung sessions
- Exporting sessions (LOSSLESS)
- Killing processes safely
- Generating resume commands

Usage:
    python3 session-recovery.py --status       # Show process and session status
    python3 session-recovery.py --export       # Export the 5 most recent sessions
    python3 session-recovery.py --kill s013    # Kill session on TTY s013
    python3 session-recovery.py --resume       # Show resume commands
    python3 session-recovery.py --interactive  # Interactive mode

Author: Claude (Opus 4.5)
Created: 2026-02-05
Track: J.26
"""
import argparse
import json
import os
import re
import shutil
import signal
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
@dataclass
class ProcessInfo:
    """Information about a running Claude process.

    Each instance mirrors one row of ``ps aux`` output.
    """

    pid: int            # process ID
    tty: str            # short TTY name as printed by ps (e.g. "s009")
    cpu_time: str       # raw TIME column, kept as text
    mem_percent: float  # %MEM column
    command: str        # full command line
    user: str           # owning user

    @property
    def cpu_minutes(self) -> float:
        """Parse the CPU time string to minutes.

        Accepted layouts:
            ``MM:SS``          minutes and seconds
            ``MM:SS.hh``       fractional seconds (BSD/macOS-style ps output)
            ``HH:MM:SS``       hours, minutes, seconds
            ``DD-HH:MM:SS``    with a leading day count

        Returns:
            Accumulated CPU time in minutes, or 0.0 when the string cannot
            be parsed.
        """
        text = self.cpu_time.strip()
        days = 0
        try:
            if '-' in text:
                day_part, text = text.split('-', 1)
                days = int(day_part)
            # float() rather than int(): the seconds field may carry a
            # fractional part such as "34.56".
            fields = [float(part) for part in text.split(':')]
        except ValueError:
            return 0.0

        if len(fields) == 2:
            hours = 0.0
            minutes, seconds = fields
        elif len(fields) == 3:
            hours, minutes, seconds = fields
        else:
            return 0.0
        return days * 24 * 60 + hours * 60 + minutes + seconds / 60
@dataclass
class SessionInfo:
    """Metadata describing one Claude session file on disk."""

    uuid: str                               # session identifier (file stem)
    path: Path                              # location of the .jsonl file
    project_hash: str                       # name of the containing project directory
    size_bytes: int                         # file size in bytes
    modified: datetime                      # last modification time (naive, local)
    process: Optional[ProcessInfo] = None   # process attributed to this session, if any

    @property
    def size_human(self) -> str:
        """Render the file size with a B / KB / MB suffix."""
        kib = 1024
        mib = 1024 * 1024
        if self.size_bytes < kib:
            return f"{self.size_bytes}B"
        if self.size_bytes < mib:
            return f"{self.size_bytes / kib:.1f}KB"
        return f"{self.size_bytes / mib:.1f}MB"

    @property
    def age_minutes(self) -> float:
        """Elapsed minutes between the last modification and now."""
        elapsed = datetime.now() - self.modified
        return elapsed.total_seconds() / 60

    @property
    def resume_command(self) -> str:
        """Shell command that resumes this session."""
        return f"claude --resume {self.uuid}"
class SessionRecovery:
    """Main class for session recovery operations.

    Attributes:
        timeout_minutes: Minimum age (minutes since last modification) a
            session file must have before it can be reported as hung.
        claude_dir: ``~/.claude/projects``, scanned for ``*.jsonl`` files.
        export_dir: Directory that receives rescued session copies.
        current_tty: Short name of the invoking terminal as ``ps`` prints it
            (e.g. ``s009`` or ``pts/3``), or ``''`` when it is unknown.
    """

    def __init__(self, timeout_minutes: int = 10):
        self.timeout_minutes = timeout_minutes
        self.claude_dir = Path.home() / '.claude' / 'projects'
        self.export_dir = Path.home() / 'PROJECTS' / '.coditect-data' / 'sessions-export-pending-anthropic'
        self.current_tty = self._get_current_tty()

    def _get_current_tty(self) -> str:
        """Get the TTY of the current terminal.

        Returns:
            The short TTY name, or '' when stdin is not a terminal or the
            ``tty`` command cannot be run.
        """
        try:
            result = subprocess.run(['tty'], capture_output=True, text=True)
        except Exception:
            # Best effort: construction must never fail because of this probe.
            return ''
        if result.returncode != 0:
            # `tty` exits non-zero and prints "not a tty" when stdin is not a
            # terminal; that text must not be mistaken for a TTY name.
            return ''
        tty = result.stdout.strip()
        # Extract short form (e.g., /dev/ttys009 -> s009)
        if '/dev/ttys' in tty:
            return 's' + tty.split('/dev/ttys')[1]
        if '/dev/pts/' in tty:
            return 'pts/' + tty.split('/dev/pts/')[1]
        if tty.startswith('/dev/'):
            # e.g. /dev/tty1 -> tty1, the form ps uses
            return tty[len('/dev/'):]
        return tty

    def _is_current_tty(self, tty: str) -> bool:
        """Return True only when *tty* is known to be the invoking terminal.

        An unknown current TTY ('') never matches, so processes without a
        controlling terminal are not mistaken for the current session.
        """
        return bool(self.current_tty) and tty == self.current_tty

    def detect_claude_processes(self) -> List[ProcessInfo]:
        """Detect all running Claude CLI processes.

        Parses ``ps aux`` and keeps rows whose command ends with ``claude``,
        contains `` claude `` or whose first command token is ``claude``.

        Returns:
            One ProcessInfo per matching row; empty if ``ps`` cannot be run.
        """
        processes: List[ProcessInfo] = []
        try:
            result = subprocess.run(
                ['ps', 'aux'],
                capture_output=True,
                text=True
            )
        except Exception as e:
            print(f"Error detecting processes: {e}", file=sys.stderr)
            return processes

        for line in result.stdout.strip().split('\n')[1:]:  # Skip header
            if 'claude' not in line or 'grep' in line:
                continue
            parts = line.split()
            if len(parts) < 11:
                continue
            # Check if this is actually a claude process (not a substring match)
            cmd = ' '.join(parts[10:])
            if not (cmd.endswith('claude') or ' claude ' in cmd or parts[10] == 'claude'):
                continue
            try:
                pid = int(parts[1])
                mem_percent = float(parts[3])
            except ValueError:
                # A single malformed row must not abort the whole scan.
                continue
            processes.append(ProcessInfo(
                pid=pid,
                user=parts[0],
                cpu_time=parts[9],
                mem_percent=mem_percent,
                # ps prints '??' or '?' for "no controlling terminal"
                tty='' if parts[6] in ('?', '??') else parts[6],
                command=cmd
            ))
        return processes

    def get_all_sessions(self) -> List[SessionInfo]:
        """Get all session files from the Claude projects directory.

        Returns:
            SessionInfo objects for every ``*.jsonl`` file one level below
            ``claude_dir``, most recently modified first. Files that cannot
            be stat'ed are skipped.
        """
        sessions: List[SessionInfo] = []
        if not self.claude_dir.exists():
            return sessions

        for project_dir in self.claude_dir.iterdir():
            if not project_dir.is_dir():
                continue
            for jsonl in project_dir.glob('*.jsonl'):
                try:
                    stat = jsonl.stat()
                    sessions.append(SessionInfo(
                        uuid=jsonl.stem,
                        path=jsonl,
                        project_hash=project_dir.name,
                        size_bytes=stat.st_size,
                        modified=datetime.fromtimestamp(stat.st_mtime)
                    ))
                except (OSError, OverflowError, ValueError):
                    # File vanished / unreadable / timestamp out of range
                    continue

        # Sort by modification time (most recent first)
        sessions.sort(key=lambda s: s.modified, reverse=True)
        return sessions

    def map_tty_to_session(self, tty: str, processes: List[ProcessInfo],
                           sessions: List[SessionInfo]) -> Optional[SessionInfo]:
        """
        Map a TTY to its corresponding session.

        This is heuristic-based since there's no direct link:
        - Find process on the TTY
        - Match to most recently modified session

        Args:
            tty: Short TTY name to look up
            processes: Candidate processes
            sessions: Sessions, expected most-recent-first

        Returns:
            The first session with its ``process`` attribute set to the
            process found on *tty*, or None if no process is on that TTY or
            there are no sessions.
        """
        # Find process on this TTY
        process = next((p for p in processes if p.tty == tty), None)
        if not process or not sessions:
            return None

        # Find most recently modified session (likely the active one)
        # This is a heuristic - Claude doesn't expose session-to-process mapping
        session = sessions[0]
        session.process = process
        return session

    def detect_hung_sessions(self, processes: List[ProcessInfo],
                             sessions: List[SessionInfo]) -> List[SessionInfo]:
        """
        Identify sessions that appear hung.

        A session is "hung" when:
        - There's an active Claude process (CPU time > 0) that is not on the
          current TTY
        - The session file hasn't been modified in > timeout_minutes

        Args:
            processes: Running Claude processes
            sessions: Known session files

        Returns:
            The stale sessions, each with ``process`` set to the suspect
            process. Empty when no suspect process exists.
        """
        # The suspect does not depend on the session, so find it once.
        # NOTE(review): every stale session is attributed to this same first
        # suspect because no PID-to-session mapping is available.
        suspect = next(
            (p for p in processes
             if p.cpu_minutes > 0 and not self._is_current_tty(p.tty)),
            None,
        )
        if suspect is None:
            return []

        hung = []
        for session in sessions:
            # Skip very recent sessions
            if session.age_minutes < self.timeout_minutes:
                continue
            session.process = suspect
            hung.append(session)
        return hung

    def export_session(self, session: SessionInfo, lossless: bool = True) -> Optional[Path]:
        """
        Export a session to the pending directory.

        Args:
            session: Session to export
            lossless: If True, copy raw JSONL (recommended). In this
                implementation the flag only controls the ``LOSSLESS-``
                filename prefix; the file is always copied verbatim.

        Returns:
            Path to exported file, or None on failure (including failure to
            create the export directory)
        """
        # Generate timestamped filename (UTC)
        timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H-%M-%SZ')
        prefix = 'LOSSLESS-' if lossless else ''
        dest_name = f"{timestamp}-{prefix}{session.uuid}-rescued.jsonl"
        dest_path = self.export_dir / dest_name

        try:
            # Ensure export directory exists
            self.export_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy2(session.path, dest_path)
        except OSError as e:
            print(f"Error exporting session: {e}", file=sys.stderr)
            return None
        return dest_path

    def kill_process(self, pid: int, force: bool = False) -> bool:
        """
        Kill a Claude process.

        Args:
            pid: Process ID to kill
            force: If True, use SIGKILL; otherwise SIGTERM

        Returns:
            True if the signal was delivered; False if the PID is invalid,
            belongs to the current session, does not exist, or may not be
            signalled.
        """
        # Safety: os.kill() treats 0 and negative PIDs as "process group" /
        # "every process", never as a single process.
        if pid <= 0:
            print(f"Process {pid} not found", file=sys.stderr)
            return False

        # Safety: Never kill current session
        processes = self.detect_claude_processes()
        target = next((p for p in processes if p.pid == pid), None)
        if target and self._is_current_tty(target.tty):
            print("ERROR: Cannot kill current session!", file=sys.stderr)
            return False

        try:
            sig = signal.SIGKILL if force else signal.SIGTERM
            os.kill(pid, sig)
            # Clean up message bus registration (H.13.5.4)
            self._cleanup_bus_session(pid)
            return True
        except ProcessLookupError:
            print(f"Process {pid} not found", file=sys.stderr)
            return False
        except PermissionError:
            print(f"Permission denied to kill {pid}", file=sys.stderr)
            return False

    def _cleanup_bus_session(self, pid: int) -> None:
        """
        Unregister a killed session from the inter-session message bus (H.13.5.4).

        Looks up the bus session whose ``pid`` matches and unregisters it.
        Non-fatal on any error, including the bus module being unavailable.
        """
        try:
            from scripts.core.session_message_bus import get_session_message_bus
            bus = get_session_message_bus()
            for s in bus.list_sessions(active_only=False):
                if s.pid == pid:
                    bus.unregister_session(s.session_id)
                    break
        except Exception:
            pass  # Non-fatal - stale cleanup will handle it eventually

    def generate_resume_command(self, session: SessionInfo) -> str:
        """Generate the command to resume a session."""
        return session.resume_command

    def log_rescue_event(self, action: str, sessions: List[SessionInfo],
                         details: Optional[str] = None) -> None:
        """
        Log a rescue event to the project-scoped session log (J.26.4.5).

        Silently does nothing when the session log manager cannot be
        imported or when writing the entry fails.

        Args:
            action: What happened (e.g., "exported", "killed", "export+kill")
            sessions: Sessions that were affected (at most the first five
                are listed individually)
            details: Optional additional details
        """
        try:
            from scripts.session_log_manager import append_entry
        except ImportError:
            # Retry with the script's parent directory on sys.path, for
            # direct script execution
            try:
                root = str(Path(__file__).resolve().parent.parent)
                if root not in sys.path:
                    sys.path.insert(0, root)
                from scripts.session_log_manager import append_entry
            except ImportError:
                return  # Silently skip if session log manager not available

        session_list = ", ".join(
            f"{s.uuid[:8]}({s.size_human})" for s in sessions[:5]
        )
        message = f"Session rescue: {action} {len(sessions)} session(s)"
        fix_text = f"Sessions: {session_list}"
        if details:
            fix_text += f"\n{details}"

        try:
            append_entry(
                message=message,
                fix=fix_text,
                tasks="J.26",
                author="session-recovery.py (automated)",
                classify=False,
            )
        except Exception:
            pass  # Never let logging failure break recovery

    def print_status(self, processes: List[ProcessInfo], sessions: List[SessionInfo]):
        """Print processes, the ten most recent sessions and any hung sessions."""
        print("=" * 60)
        print("CLAUDE SESSION STATUS")
        print("=" * 60)

        print(f"\nActive Processes: {len(processes)}")
        print("-" * 40)
        for proc in processes:
            current = " (CURRENT)" if self._is_current_tty(proc.tty) else ""
            print(f" PID: {proc.pid:6} TTY: {proc.tty:6} CPU: {proc.cpu_time:10}{current}")

        print(f"\nSession Files: {len(sessions)} (showing recent 10)")
        print("-" * 40)
        for session in sessions[:10]:
            age = f"{session.age_minutes:.0f}m ago"
            print(f" {session.uuid[:8]}... {session.size_human:>8} {age:>10}")

        hung = self.detect_hung_sessions(processes, sessions)
        if hung:
            print(f"\nPotentially Hung: {len(hung)}")
            print("-" * 40)
            for session in hung:
                print(f" {session.uuid[:8]}... Last activity: {session.age_minutes:.0f}m ago")

    def interactive_recovery(self):
        """Run interactive session recovery.

        Shows the status, then offers to export hung sessions, export and
        kill them, or print resume commands. A kill is only offered after
        the corresponding export succeeded, and each PID is prompted for at
        most once.
        """
        processes = self.detect_claude_processes()
        sessions = self.get_all_sessions()

        if not processes:
            print("No Claude processes found.")
            return

        self.print_status(processes, sessions)

        # Find hung sessions
        hung = self.detect_hung_sessions(processes, sessions)
        if not hung:
            print("\nNo hung sessions detected.")
            return

        print(f"\n{len(hung)} potentially hung session(s) found.")
        print("\nOptions:")
        print(" 1. Export all hung sessions")
        print(" 2. Export and kill all hung sessions")
        print(" 3. Show resume commands")
        print(" 4. Exit")

        try:
            choice = input("\nSelect option [1-4]: ").strip()
        except (KeyboardInterrupt, EOFError):
            print("\nCancelled.")
            return

        if choice == '1':
            exported = []
            for session in hung:
                path = self.export_session(session)
                if path:
                    print(f"Exported: {path}")
                    exported.append(session)
            if exported:
                self.log_rescue_event("exported", exported)

        elif choice == '2':
            exported = []
            killed = []
            prompted_pids = set()  # several sessions may share one process
            for session in hung:
                path = self.export_session(session)
                if not path:
                    # Never offer to kill before the data has been rescued
                    continue
                print(f"Exported: {path}")
                exported.append(session)

                if not session.process or session.process.pid in prompted_pids:
                    continue
                pid = session.process.pid
                prompted_pids.add(pid)
                try:
                    confirm = input(f"Kill PID {pid}? [y/N]: ").strip().lower()
                except (KeyboardInterrupt, EOFError):
                    print("\nCancelled.")
                    break  # still log what was exported so far
                if confirm == 'y' and self.kill_process(pid):
                    print(f"Killed PID {pid}")
                    killed.append(session)

            action = "export+kill" if killed else "exported"
            if exported:
                self.log_rescue_event(action, exported,
                                      f"Killed: {len(killed)}" if killed else None)

        elif choice == '3':
            print("\nResume commands:")
            for session in sessions[:5]:
                print(f" {session.resume_command}")

        else:
            print("Exiting.")
def main():
    """Command-line entry point.

    Parses the arguments, scans processes and session files once, then runs
    exactly one mode: JSON dump, status (also the default when no mode flag
    is given), export of the five most recent sessions, kill by TTY, resume
    command listing, or interactive recovery.

    Exits with status 1 when ``--kill`` finds no process on the TTY or the
    kill fails, and with status 0 when the kill confirmation is declined
    or interrupted.
    """
    parser = argparse.ArgumentParser(description='Claude Session Recovery')
    parser.add_argument('--status', action='store_true', help='Show session status')
    parser.add_argument('--export', action='store_true', help='Export all active sessions')
    parser.add_argument('--kill', metavar='TTY', help='Kill session on specified TTY')
    parser.add_argument('--resume', action='store_true', help='Show resume commands')
    parser.add_argument('--interactive', '-i', action='store_true', help='Interactive mode')
    parser.add_argument('--timeout', type=int, default=10, help='Hung timeout in minutes')
    parser.add_argument('--force', action='store_true', help='Force kill without confirmation')
    parser.add_argument('--json', action='store_true', help='Output in JSON format')

    args = parser.parse_args()

    recovery = SessionRecovery(timeout_minutes=args.timeout)
    processes = recovery.detect_claude_processes()
    sessions = recovery.get_all_sessions()

    if args.json:
        # JSON output mode
        data = {
            'processes': [vars(p) for p in processes],
            'sessions': [{
                'uuid': s.uuid,
                'path': str(s.path),
                'size': s.size_human,
                'age_minutes': s.age_minutes,
                'resume_command': s.resume_command
            } for s in sessions[:10]]
        }
        print(json.dumps(data, indent=2, default=str))
        return

    if args.status or (not any([args.export, args.kill, args.resume, args.interactive])):
        recovery.print_status(processes, sessions)

    elif args.export:
        print("Exporting all sessions...")
        exported = []
        for session in sessions[:5]:  # Recent 5
            path = recovery.export_session(session)
            if path:
                print(f" Exported: {path.name}")
                exported.append(session)
        if exported:
            recovery.log_rescue_event("exported", exported)

    elif args.kill:
        tty = args.kill
        target = next((p for p in processes if p.tty == tty), None)
        if not target:
            print(f"No Claude process found on TTY {tty}")
            sys.exit(1)

        if not args.force:
            try:
                confirm = input(f"Kill PID {target.pid} on {tty}? [y/N]: ").strip().lower()
            except (KeyboardInterrupt, EOFError):
                # Ctrl-C / closed stdin counts as "no", matching the
                # interactive mode's prompt handling.
                print()
                confirm = ''
            if confirm != 'y':
                print("Cancelled.")
                sys.exit(0)

        # --force both skips the prompt and selects SIGKILL
        if recovery.kill_process(target.pid, force=args.force):
            print(f"Killed PID {target.pid}")
            # Find session associated with this TTY for logging
            matched = recovery.map_tty_to_session(tty, processes, sessions)
            if matched:
                recovery.log_rescue_event("killed", [matched],
                                          f"PID {target.pid} on TTY {tty}")
        else:
            sys.exit(1)

    elif args.resume:
        print("Resume commands for recent sessions:")
        for session in sessions[:5]:
            print(f" {session.resume_command}")

    elif args.interactive:
        recovery.interactive_recovery()
# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()