#!/usr/bin/env python3
"""
J.9.2: Automatic Checkpoint Creation - Session Auto-Preservation

Creates lightweight checkpoints automatically based on triggers:
- Time-based: After N minutes of active work
- Event-based: After significant task completion
- Decision-density: When many decisions are detected in session
- Context-threshold: Before auto-export triggers
This module integrates with:
- J.8.3: Session Linker (cross-session context linking)
- J.8.4: Session Resume (optimized resume context)
- J.9.1: Context Watcher (daemon monitoring)

Usage:
    from scripts.context_graph.auto_checkpoint import (
        AutoCheckpointer,
        CheckpointTrigger,
        CheckpointResult,
        should_create_checkpoint,
        create_auto_checkpoint,
    )

    # Check if checkpoint is needed. The call returns a
    # (should_create, reason) tuple, so unpack it before testing it.
    should_create, reason = should_create_checkpoint(
        session_id, trigger=CheckpointTrigger.TIME_BASED
    )
    if should_create:
        result = create_auto_checkpoint(session_id)

    # Use the AutoCheckpointer class for fine-grained control
    checkpointer = AutoCheckpointer()
    result = checkpointer.create_checkpoint(
        session_id=session_id,
        trigger=CheckpointTrigger.CONTEXT_THRESHOLD,
        context_percent=75.0,
        include_resume_context=True,
    )

CLI:
    python3 scripts/context_graph/auto_checkpoint.py --session SESSION_ID
    python3 scripts/context_graph/auto_checkpoint.py --session SESSION_ID --trigger context_threshold --context-percent 75
    python3 scripts/context_graph/auto_checkpoint.py --session SESSION_ID --check
    python3 scripts/context_graph/auto_checkpoint.py --help

ADR References:
    - ADR-118: Four-Tier Database Architecture
    - ADR-151: Context Graph Evolution Architecture

Created: 2026-02-04
Author: Claude (Opus 4.5)
Track: J (Memory Intelligence)
Task: J.9.2
"""
import argparse
import json
import logging
import os
import sqlite3
import sys
import uuid
from contextlib import closing
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Add parent to path for imports: the directory two levels above this file
# is put at the front of sys.path.
# NOTE(review): the optional J.8.x imports use the `scripts.context_graph`
# package prefix, which resolves only if the directory *containing*
# `scripts/` is importable - confirm this is the intended directory.
sys.path.insert(0, str(Path(__file__).parent.parent))

# Configure logging (module-level side effect: installs a root handler if
# none is configured yet).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Database path constants (ADR-118)
CODITECT_DATA_DIR = Path.home() / "PROJECTS" / ".coditect-data"
CONTEXT_STORAGE_DIR = CODITECT_DATA_DIR / "context-storage"
SESSIONS_DB_PATH = CONTEXT_STORAGE_DIR / "sessions.db"
CHECKPOINTS_DIR = CONTEXT_STORAGE_DIR / "checkpoints"
class CheckpointTrigger(Enum):
    """Reasons for which a checkpoint can be requested.

    Member values are plain strings; member order is the declaration order
    below and is what iteration over the enum yields.
    """

    # User-initiated
    MANUAL = "manual"
    # After N minutes of work
    TIME_BASED = "time_based"
    # After significant task completion
    TASK_COMPLETION = "task_completion"
    # High decision count in session
    DECISION_DENSITY = "decision_density"
    # Before context export
    CONTEXT_THRESHOLD = "context_threshold"
    # End of session
    SESSION_END = "session_end"
    # After error recovery
    ERROR_RECOVERY = "error_recovery"
@dataclass
class CheckpointConfig:
    """Tunable settings for automatic checkpoint creation.

    Every field has a default, so ``CheckpointConfig()`` yields a usable
    configuration; override individual fields by keyword.
    """

    # --- Time-based trigger ---
    # Minutes that must elapse before a time-based checkpoint is due.
    time_interval_minutes: int = 30
    # A time-based checkpoint needs at least this many messages.
    min_messages_for_checkpoint: int = 10

    # --- Decision-density trigger ---
    # Decision count that triggers a checkpoint ...
    decision_threshold: int = 5
    # ... when reached inside a window of this many minutes.
    decision_window_minutes: int = 15

    # --- Context-threshold trigger ---
    # Context usage percentage that triggers a checkpoint.
    context_percent_trigger: float = 70.0

    # --- Output settings ---
    # Include J.8.4 resume context in the checkpoint.
    include_resume_context: bool = True
    # Include J.8.3 session links in the checkpoint.
    include_session_links: bool = True
    # Maximum checkpoint size, in KB.
    max_checkpoint_size_kb: int = 50

    # --- Storage ---
    # Factories (not plain defaults) so the module constants are looked up
    # when an instance is created rather than when the class is defined.
    checkpoints_dir: Path = field(default_factory=lambda: CHECKPOINTS_DIR)
    sessions_db_path: Path = field(default_factory=lambda: SESSIONS_DB_PATH)
@dataclass
class CheckpointResult:
    """Outcome of a single checkpoint creation attempt.

    ``success`` tells whether a checkpoint was created. The remaining
    fields describe the attempt; ``created_at`` defaults to the current
    UTC time in ISO-8601 form when the result object is built.
    """

    success: bool
    checkpoint_id: str
    trigger: CheckpointTrigger
    session_id: str
    file_path: Optional[Path] = None
    summary: str = ""
    message_count: int = 0
    decision_count: int = 0
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dictionary of this result.

        The trigger is emitted as its string value and the file path as a
        string (or ``None`` when no path is set).
        """
        path_text = None
        if self.file_path:
            path_text = str(self.file_path)

        # Keyword order fixes the key order of the resulting dict.
        return dict(
            success=self.success,
            checkpoint_id=self.checkpoint_id,
            trigger=self.trigger.value,
            session_id=self.session_id,
            file_path=path_text,
            summary=self.summary,
            message_count=self.message_count,
            decision_count=self.decision_count,
            created_at=self.created_at,
            error=self.error,
        )
class AutoCheckpointer:
    """Create, list and prune automatic session checkpoints.

    Each checkpoint is a JSON file in ``config.checkpoints_dir`` plus a row
    in the ``auto_checkpoints`` table of ``config.sessions_db_path``.

    Integrates with J.8.3 (Session Linker) and J.8.4 (Session Resume)
    to create lightweight checkpoints based on various triggers. Both
    integrations are optional: when a module cannot be imported, resume
    context is recorded as null and session links are omitted.
    """

    # Minimum minutes between two checkpoints of the same session.
    COOLDOWN_MINUTES = 5
    # Task-completion trigger: look-back window and markers required.
    TASK_COMPLETION_WINDOW_MINUTES = 15
    TASK_COMPLETION_MIN_MARKERS = 3

    def __init__(self, config: Optional[CheckpointConfig] = None):
        """Initialize the auto checkpointer.

        Creates the checkpoint and database directories if needed and
        probes for the optional J.8.3 / J.8.4 components.

        Args:
            config: Optional configuration. Uses defaults if not provided.
        """
        self.config = config or CheckpointConfig()
        self._ensure_directories()

        # Optional session components (J.8.3, J.8.4); each stays None when
        # its import fails.
        self._session_linker = None
        self._session_resume = None
        self._import_session_components()

    def _ensure_directories(self) -> None:
        """Ensure the checkpoint directory and the database's parent exist."""
        self.config.checkpoints_dir.mkdir(parents=True, exist_ok=True)
        self.config.sessions_db_path.parent.mkdir(parents=True, exist_ok=True)

    def _import_session_components(self) -> None:
        """Import J.8.3 and J.8.4 components if available.

        The classes themselves (not instances) are stored; a failed import
        is logged at debug level and leaves the attribute as ``None``.
        """
        try:
            from scripts.context_graph.session_linker import SessionLinker
            self._session_linker = SessionLinker
            logger.debug("J.8.3 SessionLinker available")
        except ImportError:
            logger.debug("J.8.3 SessionLinker not available")

        try:
            from scripts.context_graph.session_resume import SessionResumeOptimizer
            self._session_resume = SessionResumeOptimizer
            logger.debug("J.8.4 SessionResumeOptimizer available")
        except ImportError:
            logger.debug("J.8.4 SessionResumeOptimizer not available")

    def _get_db_connection(self) -> sqlite3.Connection:
        """Open a new connection to sessions.db with name-addressable rows.

        The caller owns the connection and must close it.
        """
        conn = sqlite3.connect(str(self.config.sessions_db_path))
        conn.row_factory = sqlite3.Row
        return conn

    def _ensure_checkpoint_table(self, conn: sqlite3.Connection) -> None:
        """Create the auto_checkpoints table and its index if missing."""
        conn.execute("""
            CREATE TABLE IF NOT EXISTS auto_checkpoints (
                id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                trigger TEXT NOT NULL,
                file_path TEXT,
                summary TEXT,
                message_count INTEGER DEFAULT 0,
                decision_count INTEGER DEFAULT 0,
                context_percent REAL,
                resume_context TEXT,
                created_at TEXT NOT NULL,
                FOREIGN KEY (session_id) REFERENCES sessions(session_id)
            )
        """)
        # Index for session lookups
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_auto_checkpoints_session
            ON auto_checkpoints(session_id, created_at DESC)
        """)
        conn.commit()

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO-8601 string into a timezone-aware datetime.

        A trailing ``Z`` is accepted. A value carrying no UTC offset is
        assumed to be UTC, so the result can always be subtracted from
        ``datetime.now(timezone.utc)`` without raising ``TypeError``.
        """
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def _latest_checkpoint_time(
        self, conn: sqlite3.Connection, session_id: str
    ) -> Optional[datetime]:
        """Return when the session's newest checkpoint was created, if any."""
        row = conn.execute("""
            SELECT created_at FROM auto_checkpoints
            WHERE session_id = ?
            ORDER BY created_at DESC
            LIMIT 1
        """, (session_id,)).fetchone()
        if row is None:
            return None
        return self._parse_timestamp(row["created_at"])

    def should_create_checkpoint(
        self,
        session_id: str,
        trigger: CheckpointTrigger,
        context_percent: Optional[float] = None,
    ) -> Tuple[bool, str]:
        """Check if a checkpoint should be created for the given trigger.

        A cooldown applies to every trigger: if the session already has a
        checkpoint younger than ``COOLDOWN_MINUTES`` the answer is no.

        Args:
            session_id: Session to check
            trigger: Type of trigger to evaluate
            context_percent: Current context usage percentage (for threshold trigger)

        Returns:
            Tuple of (should_create, reason)
        """
        # closing() guarantees the connection is released even when table
        # creation or a trigger query raises.
        with closing(self._get_db_connection()) as conn:
            self._ensure_checkpoint_table(conn)

            # Check for recent checkpoint (avoid duplicates)
            last_time = self._latest_checkpoint_time(conn, session_id)
            if last_time is not None:
                elapsed = datetime.now(timezone.utc) - last_time
                if elapsed < timedelta(minutes=self.COOLDOWN_MINUTES):
                    # total_seconds() (clamped at 0) stays meaningful for
                    # a timestamp slightly in the future; timedelta.seconds
                    # would wrap to almost a full day.
                    minutes_ago = max(0, int(elapsed.total_seconds() // 60))
                    return False, f"Recent checkpoint exists ({minutes_ago}m ago)"

            # Evaluate trigger-specific conditions
            if trigger == CheckpointTrigger.TIME_BASED:
                return self._check_time_trigger(conn, session_id)
            if trigger == CheckpointTrigger.DECISION_DENSITY:
                return self._check_decision_trigger(conn, session_id)
            if trigger == CheckpointTrigger.CONTEXT_THRESHOLD:
                threshold = self.config.context_percent_trigger
                if context_percent is not None and context_percent >= threshold:
                    return True, f"Context at {context_percent:.1f}% (threshold: {threshold}%)"
                return False, "Context below threshold"
            if trigger == CheckpointTrigger.TASK_COMPLETION:
                return self._check_task_completion_trigger(conn, session_id)
            if trigger in (
                CheckpointTrigger.SESSION_END,
                CheckpointTrigger.ERROR_RECOVERY,
                CheckpointTrigger.MANUAL,
            ):
                return True, f"Trigger: {trigger.value}"

            return False, "Unknown trigger type"

    def _check_time_trigger(self, conn: sqlite3.Connection, session_id: str) -> Tuple[bool, str]:
        """Check if time-based checkpoint should be created.

        Requires at least ``min_messages_for_checkpoint`` messages and
        ``time_interval_minutes`` elapsed since the last checkpoint, or
        since the session's first message when there is no checkpoint yet.
        """
        # Get session start and message count
        row = conn.execute("""
            SELECT
                MIN(timestamp) as first_msg,
                MAX(timestamp) as last_msg,
                COUNT(*) as msg_count
            FROM messages
            WHERE session_id = ?
        """, (session_id,)).fetchone()

        if not row or not row["first_msg"]:
            return False, "No messages in session"

        msg_count = row["msg_count"]
        if msg_count < self.config.min_messages_for_checkpoint:
            return False, f"Only {msg_count} messages (need {self.config.min_messages_for_checkpoint})"

        # Check time since last checkpoint or session start
        reference_time = self._latest_checkpoint_time(conn, session_id)
        if reference_time is None:
            reference_time = self._parse_timestamp(row["first_msg"])

        elapsed = datetime.now(timezone.utc) - reference_time
        minutes_since = elapsed.total_seconds() / 60

        if minutes_since >= self.config.time_interval_minutes:
            return True, f"{minutes_since:.0f}m since last checkpoint (threshold: {self.config.time_interval_minutes}m)"

        return False, f"Only {minutes_since:.0f}m since last checkpoint"

    def _check_decision_trigger(self, conn: sqlite3.Connection, session_id: str) -> Tuple[bool, str]:
        """Check if decision-density checkpoint should be created.

        Counts messages in the last ``decision_window_minutes`` whose
        content matches one of the decision keywords.
        """
        # Count decisions in recent window
        window_start = (
            datetime.now(timezone.utc)
            - timedelta(minutes=self.config.decision_window_minutes)
        ).isoformat()

        row = conn.execute("""
            SELECT COUNT(*) as decision_count
            FROM messages
            WHERE session_id = ?
            AND timestamp >= ?
            AND (
                content LIKE '%decision%'
                OR content LIKE '%decided%'
                OR content LIKE '%choosing%'
                OR content LIKE '%selected%'
                OR content LIKE '%ADR%'
            )
        """, (session_id, window_start)).fetchone()
        decision_count = row["decision_count"] if row else 0

        if decision_count >= self.config.decision_threshold:
            return True, f"{decision_count} decisions in last {self.config.decision_window_minutes}m"

        return False, f"Only {decision_count} decisions (need {self.config.decision_threshold})"

    def _check_task_completion_trigger(self, conn: sqlite3.Connection, session_id: str) -> Tuple[bool, str]:
        """Check if task completion checkpoint should be created.

        Counts messages in the last ``TASK_COMPLETION_WINDOW_MINUTES``
        containing a completion marker and compares the count with
        ``TASK_COMPLETION_MIN_MARKERS``.
        """
        # The window start is computed in Python, like the decision
        # trigger does. SQLite's datetime('now', ...) yields
        # 'YYYY-MM-DD HH:MM:SS' (space separator), which sorts before any
        # same-day ISO 'T' timestamp and so matched the whole day.
        # Assumes messages.timestamp is stored as ISO-8601 with a 'T'
        # separator, as the other queries here do - TODO confirm.
        window_start = (
            datetime.now(timezone.utc)
            - timedelta(minutes=self.TASK_COMPLETION_WINDOW_MINUTES)
        ).isoformat()

        # Look for recent task completion markers
        row = conn.execute("""
            SELECT COUNT(*) as completion_count
            FROM messages
            WHERE session_id = ?
            AND timestamp >= ?
            AND (
                content LIKE '%✅%'
                OR content LIKE '%completed%'
                OR content LIKE '%COMPLETE%'
                OR content LIKE '%finished%'
                OR content LIKE '%done%'
            )
        """, (session_id, window_start)).fetchone()
        count = row["completion_count"] if row else 0

        if count >= self.TASK_COMPLETION_MIN_MARKERS:  # Multiple completion markers
            return True, f"{count} completion markers in last {self.TASK_COMPLETION_WINDOW_MINUTES}m"

        return False, f"Only {count} completion markers"

    @staticmethod
    def _discard_checkpoint_file(file_path: Optional[Path]) -> None:
        """Best-effort removal of a checkpoint file left by a failed attempt."""
        if file_path is None:
            return
        try:
            file_path.unlink()
        except FileNotFoundError:
            # The failure happened before anything was written.
            pass
        except OSError as e:
            logger.warning("Failed to remove incomplete checkpoint file %s: %s", file_path, e)

    def create_checkpoint(
        self,
        session_id: str,
        trigger: CheckpointTrigger = CheckpointTrigger.MANUAL,
        context_percent: Optional[float] = None,
        include_resume_context: Optional[bool] = None,
        force: bool = False,
    ) -> CheckpointResult:
        """Create an automatic checkpoint.

        Writes a JSON file to the checkpoints directory and records the
        checkpoint in the auto_checkpoints table. If anything fails after
        the trigger check, the error is logged, any file written for this
        attempt is removed, and a failed result is returned.

        Args:
            session_id: Session to checkpoint
            trigger: What triggered the checkpoint
            context_percent: Current context usage percentage
            include_resume_context: Include J.8.4 resume context
                (None means use the configured default)
            force: Create even if trigger conditions not met

        Returns:
            CheckpointResult with creation details. When the trigger check
            declines, ``success`` is False and ``summary`` holds the reason;
            on failure ``error`` holds the exception text.
        """
        checkpoint_id = str(uuid.uuid4())[:8]

        # Check if we should create
        if not force:
            should_create, reason = self.should_create_checkpoint(
                session_id, trigger, context_percent
            )
            if not should_create:
                return CheckpointResult(
                    success=False,
                    checkpoint_id=checkpoint_id,
                    trigger=trigger,
                    session_id=session_id,
                    summary=f"Skipped: {reason}",
                )

        if include_resume_context is None:
            include_resume_context = self.config.include_resume_context

        with closing(self._get_db_connection()) as conn:
            self._ensure_checkpoint_table(conn)

            # Set once the target path is known, so the error handler can
            # clean up a partial or untracked file.
            file_path: Optional[Path] = None
            try:
                # Gather session statistics
                stats = conn.execute("""
                    SELECT
                        COUNT(*) as msg_count,
                        MIN(timestamp) as first_msg,
                        MAX(timestamp) as last_msg
                    FROM messages
                    WHERE session_id = ?
                """, (session_id,)).fetchone()
                msg_count = stats["msg_count"] if stats else 0

                # Count decisions over the whole session.
                # NOTE(review): this keyword list is shorter than the one
                # used by the decision-density trigger - confirm intended.
                decision_row = conn.execute("""
                    SELECT COUNT(*) as decision_count
                    FROM messages
                    WHERE session_id = ?
                    AND (
                        content LIKE '%decision%'
                        OR content LIKE '%decided%'
                        OR content LIKE '%ADR%'
                    )
                """, (session_id,)).fetchone()
                decision_count = decision_row["decision_count"] if decision_row else 0

                # Generate resume context if requested (best effort)
                resume_context = None
                if include_resume_context and self._session_resume:
                    try:
                        optimizer = self._session_resume()
                        resume_ctx = optimizer.get_quick_resume(session_id)
                        resume_context = resume_ctx[:self.config.max_checkpoint_size_kb * 1024]
                    except Exception as e:
                        logger.warning("Failed to generate resume context: %s", e)

                # Create checkpoint file
                timestamp = datetime.now(timezone.utc)
                created_at = timestamp.isoformat()
                filename = f"auto-{timestamp.strftime('%Y-%m-%dT%H-%M-%SZ')}-{checkpoint_id}.json"
                file_path = self.config.checkpoints_dir / filename

                checkpoint_data = {
                    "checkpoint_id": checkpoint_id,
                    "session_id": session_id,
                    "trigger": trigger.value,
                    "created_at": created_at,
                    "statistics": {
                        "message_count": msg_count,
                        "decision_count": decision_count,
                        "context_percent": context_percent,
                        "first_message": stats["first_msg"] if stats else None,
                        "last_message": stats["last_msg"] if stats else None,
                    },
                    "resume_context": resume_context,
                }

                # Add session links if available (best effort)
                if self.config.include_session_links and self._session_linker:
                    try:
                        linker = self._session_linker()
                        links = linker.get_session_links(session_id)
                        checkpoint_data["session_links"] = [
                            {
                                "linked_session": link.linked_session_id,
                                "relationship": link.relationship,
                                "description": link.description,
                            }
                            for link in links[:5]  # Limit to 5 links
                        ]
                    except Exception as e:
                        logger.warning("Failed to get session links: %s", e)

                # Write checkpoint file
                with open(file_path, "w", encoding="utf-8") as f:
                    json.dump(checkpoint_data, f, indent=2)

                # Store in database
                summary = f"Auto-checkpoint: {msg_count} messages, {decision_count} decisions"
                conn.execute("""
                    INSERT INTO auto_checkpoints (
                        id, session_id, trigger, file_path, summary,
                        message_count, decision_count, context_percent,
                        resume_context, created_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    checkpoint_id,
                    session_id,
                    trigger.value,
                    str(file_path),
                    summary,
                    msg_count,
                    decision_count,
                    context_percent,
                    resume_context[:1000] if resume_context else None,  # Truncate for DB
                    created_at,
                ))
                conn.commit()

                logger.info("Created checkpoint %s for session %s...", checkpoint_id, session_id[:8])

                return CheckpointResult(
                    success=True,
                    checkpoint_id=checkpoint_id,
                    trigger=trigger,
                    session_id=session_id,
                    file_path=file_path,
                    summary=summary,
                    message_count=msg_count,
                    decision_count=decision_count,
                    # Same instant as the file and the database row.
                    created_at=created_at,
                )

            except Exception as e:
                logger.error("Failed to create checkpoint: %s", e)
                # Do not leave a truncated file, or one with no database
                # row (cleanup only finds files through the table).
                self._discard_checkpoint_file(file_path)
                return CheckpointResult(
                    success=False,
                    checkpoint_id=checkpoint_id,
                    trigger=trigger,
                    session_id=session_id,
                    error=str(e),
                )

    def get_checkpoints(
        self,
        session_id: Optional[str] = None,
        limit: int = 10,
    ) -> List[Dict[str, Any]]:
        """Get recent checkpoints, newest first.

        Args:
            session_id: Filter by session (optional)
            limit: Maximum number of checkpoints to return

        Returns:
            List of checkpoint records (one dict per auto_checkpoints row)
        """
        with closing(self._get_db_connection()) as conn:
            self._ensure_checkpoint_table(conn)

            if session_id:
                cursor = conn.execute("""
                    SELECT * FROM auto_checkpoints
                    WHERE session_id = ?
                    ORDER BY created_at DESC
                    LIMIT ?
                """, (session_id, limit))
            else:
                cursor = conn.execute("""
                    SELECT * FROM auto_checkpoints
                    ORDER BY created_at DESC
                    LIMIT ?
                """, (limit,))

            return [dict(row) for row in cursor.fetchall()]

    def cleanup_old_checkpoints(self, max_age_days: int = 30) -> int:
        """Remove checkpoints older than max_age_days.

        Deletes each expired checkpoint's file (best effort; failures are
        logged) and then its database row.

        Args:
            max_age_days: Maximum age of checkpoints to keep

        Returns:
            Number of checkpoints removed, i.e. database rows deleted.
            A checkpoint whose file was already gone is still counted.
        """
        with closing(self._get_db_connection()) as conn:
            self._ensure_checkpoint_table(conn)

            cutoff = (datetime.now(timezone.utc) - timedelta(days=max_age_days)).isoformat()

            # Get file paths to delete
            expired = conn.execute("""
                SELECT file_path FROM auto_checkpoints
                WHERE created_at < ?
            """, (cutoff,)).fetchall()

            files_deleted = 0
            for row in expired:
                stored_path = row["file_path"]
                if not stored_path:
                    continue
                try:
                    path = Path(stored_path)
                    if path.exists():
                        path.unlink()
                        files_deleted += 1
                except (OSError, ValueError) as e:
                    logger.warning("Failed to delete %s: %s", stored_path, e)

            # Remove from database
            removed = conn.execute("""
                DELETE FROM auto_checkpoints
                WHERE created_at < ?
            """, (cutoff,)).rowcount
            conn.commit()

            logger.info(
                "Cleaned up %d old checkpoints (%d files deleted)",
                removed,
                files_deleted,
            )
            return removed
# Convenience functions
def should_create_checkpoint(
    session_id: str,
    trigger: CheckpointTrigger = CheckpointTrigger.TIME_BASED,
    context_percent: Optional[float] = None,
) -> Tuple[bool, str]:
    """Evaluate a trigger for a session using a default-configured checkpointer.

    Builds a fresh ``AutoCheckpointer`` with default configuration and
    delegates to its ``should_create_checkpoint`` method.

    Returns:
        Tuple of (should_create, reason)
    """
    return AutoCheckpointer().should_create_checkpoint(
        session_id,
        trigger,
        context_percent,
    )
def create_auto_checkpoint(
    session_id: str,
    trigger: CheckpointTrigger = CheckpointTrigger.MANUAL,
    context_percent: Optional[float] = None,
    force: bool = False,
) -> CheckpointResult:
    """Create a checkpoint for a session using a default-configured checkpointer.

    Builds a fresh ``AutoCheckpointer`` with default configuration and
    delegates to its ``create_checkpoint`` method.

    Returns:
        The ``CheckpointResult`` produced by ``create_checkpoint``.
    """
    default_checkpointer = AutoCheckpointer()
    outcome = default_checkpointer.create_checkpoint(
        session_id,
        trigger,
        context_percent=context_percent,
        force=force,
    )
    return outcome
def format_checkpoint_output(result: CheckpointResult, verbose: bool = False) -> str:
    """Format checkpoint result for display.

    Three outcomes are distinguished: created (``success``), failed
    (``error`` is set) and skipped (neither; ``summary`` holds the reason).
    A failure used to be labelled "skipped", which hid real errors.

    Args:
        result: The checkpoint result to render
        verbose: Also append the result's creation timestamp

    Returns:
        The report as a single newline-joined string
    """
    lines = []

    if result.success:
        lines.append(f"✅ Checkpoint created: {result.checkpoint_id}")
        lines.append(f" Session: {result.session_id[:16]}...")
        lines.append(f" Trigger: {result.trigger.value}")
        lines.append(f" Messages: {result.message_count}")
        lines.append(f" Decisions: {result.decision_count}")
        if result.file_path:
            lines.append(f" File: {result.file_path.name}")
    elif result.error:
        # The attempt ran and raised; report it as a failure, not a skip.
        lines.append(f"❌ Checkpoint failed: {result.checkpoint_id}")
        lines.append(f" Error: {result.error}")
    else:
        lines.append(f"⏭ Checkpoint skipped: {result.checkpoint_id}")
        lines.append(f" Reason: {result.summary}")

    if verbose:
        lines.append(f" Created: {result.created_at}")

    return "\n".join(lines)
def format_checkpoint_help() -> str:
    """Return help text for auto checkpoint feature.

    The text is laid out one item per line (one trigger per bullet, one
    command per example) because it is printed verbatim; the figures
    quoted for the triggers are the default configuration values.
    """
    return """
Auto Checkpoint (J.9.2) - Session Auto-Preservation

Creates lightweight checkpoints automatically based on triggers:
  • time_based - After 30 minutes of active work
  • decision_density - When 5+ decisions detected in 15 minutes
  • context_threshold - When context usage exceeds 70%
  • task_completion - After multiple task completions
  • session_end - At end of session
  • error_recovery - After recovering from errors

Examples:
  # Check if checkpoint needed
  python3 auto_checkpoint.py --session SESSION_ID --check

  # Create checkpoint with specific trigger
  python3 auto_checkpoint.py --session SESSION_ID --trigger time_based

  # Force create checkpoint
  python3 auto_checkpoint.py --session SESSION_ID --force

  # List recent checkpoints
  python3 auto_checkpoint.py --list --limit 20

  # Cleanup old checkpoints
  python3 auto_checkpoint.py --cleanup --max-age 30

Integration:
  This module integrates with:
  - J.8.3 Session Linker: Cross-session context linking
  - J.8.4 Session Resume: Optimized resume context
  - J.9.1 Context Watcher: Daemon monitoring
"""
def main(argv: Optional[List[str]] = None) -> int:
    """Main entry point for CLI usage.

    Modes are checked in this order and the first match wins:
    ``--help-triggers``, ``--list``, ``--cleanup``, ``--check``, and
    finally checkpoint creation. The last two require ``--session``.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, so existing callers are
            unaffected; passing a list allows programmatic invocation.

    Returns:
        Process exit code: 0 on success; 1 when ``--session`` is missing,
        when ``--check`` answers no, or when no checkpoint was created.
    """
    parser = argparse.ArgumentParser(
        description="J.9.2: Automatic Checkpoint Creation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=format_checkpoint_help(),
    )

    parser.add_argument(
        "--session",
        metavar="ID",
        help="Session ID to checkpoint",
    )
    parser.add_argument(
        "--trigger",
        choices=[t.value for t in CheckpointTrigger],
        default="manual",
        help="Checkpoint trigger type (default: manual)",
    )
    parser.add_argument(
        "--context-percent",
        type=float,
        help="Current context usage percentage",
    )
    parser.add_argument(
        "--check",
        action="store_true",
        help="Only check if checkpoint needed (don't create)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force create checkpoint even if triggers not met",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List recent checkpoints",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=10,
        help="Limit for --list (default: 10)",
    )
    parser.add_argument(
        "--cleanup",
        action="store_true",
        help="Cleanup old checkpoints",
    )
    parser.add_argument(
        "--max-age",
        type=int,
        default=30,
        help="Max age in days for --cleanup (default: 30)",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Verbose output",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output as JSON",
    )
    parser.add_argument(
        "--help-triggers",
        action="store_true",
        help="Show help for checkpoint triggers",
    )

    args = parser.parse_args(argv)

    # Help triggers
    if args.help_triggers:
        print(format_checkpoint_help())
        return 0

    checkpointer = AutoCheckpointer()

    # List checkpoints
    if args.list:
        checkpoints = checkpointer.get_checkpoints(
            session_id=args.session,
            limit=args.limit,
        )
        if args.json:
            print(json.dumps(checkpoints, indent=2))
        else:
            print(f"Recent checkpoints ({len(checkpoints)}):\n")
            for cp in checkpoints:
                print(f" {cp['id']}: {cp['trigger']} - {cp['summary']}")
                print(f" Created: {cp['created_at']}")
                print()
        return 0

    # Cleanup
    if args.cleanup:
        deleted = checkpointer.cleanup_old_checkpoints(max_age_days=args.max_age)
        print(f"Cleaned up {deleted} checkpoints older than {args.max_age} days")
        return 0

    # Require session ID for other operations
    if not args.session:
        # Diagnostics go to stderr so they cannot pollute piped or
        # --json output on stdout.
        print("Error: --session required for checkpoint operations", file=sys.stderr)
        print("Use --list to see recent checkpoints without session ID", file=sys.stderr)
        return 1

    trigger = CheckpointTrigger(args.trigger)

    # Check only
    if args.check:
        should_create, reason = checkpointer.should_create_checkpoint(
            args.session, trigger, args.context_percent
        )
        if args.json:
            print(json.dumps({
                "should_create": should_create,
                "reason": reason,
                "trigger": trigger.value,
                "session_id": args.session,
            }, indent=2))
        else:
            status = "✅ Yes" if should_create else "❌ No"
            print(f"Should create checkpoint: {status}")
            print(f" Reason: {reason}")
        return 0 if should_create else 1

    # Create checkpoint
    result = checkpointer.create_checkpoint(
        session_id=args.session,
        trigger=trigger,
        context_percent=args.context_percent,
        force=args.force,
    )

    if args.json:
        print(json.dumps(result.to_dict(), indent=2))
    else:
        print(format_checkpoint_output(result, verbose=args.verbose))

    return 0 if result.success else 1
# Run the CLI only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())