#!/usr/bin/env python3
"""CODITECT Agent Checkpoint and Handoff Protocol (ADR-108).

Implements structured checkpoint persistence for agent handoffs, enabling
fresh-context iterations while maintaining task continuity.

Key Insight from Ralph Wiggum: "Single-context loops degrade; fresh-context
iterations maintain quality."

Features:
    - Checkpoint schema with metadata, execution state, metrics
    - FoundationDB key structure for persistence
    - Handoff protocol with triggers and recovery
    - Compliance audit trail with cryptographic signing

Usage:
    from scripts.core.ralph_wiggum import CheckpointService, Checkpoint

    service = CheckpointService()
    checkpoint = await service.create_checkpoint(
        task_id="task-2026-01-24-001",
        agent_id="implementation-agent-3",
        state=ExecutionState(phase=ExecutionPhase.IMPLEMENTING, ...)
    )

Author: CODITECT Framework
Version: 1.0.0
Created: January 24, 2026
ADR Reference: ADR-108-agent-checkpoint-handoff-protocol.md
"""

import hashlib
import json
import logging
import os
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, TypeVar

# Configure module-level logging.
# NOTE: original had `logging.getLogger(name)` which raises NameError;
# the conventional form is `__name__`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# =============================================================================
# EXCEPTIONS
# =============================================================================

class CheckpointError(Exception):
    """Base exception for checkpoint operations."""
    pass

class CheckpointNotFoundError(CheckpointError):
    """Checkpoint does not exist."""
    pass

class CheckpointCorruptionError(CheckpointError):
    """Checkpoint data integrity check failed."""
    pass

class HandoffError(CheckpointError):
    """Error during agent handoff."""
    pass

# =============================================================================
# ENUMS
# =============================================================================

class ExecutionPhase(Enum):
    """Agent execution phases."""

    PLANNING = "planning"
    IMPLEMENTING = "implementing"
    TESTING = "testing"
    REVIEWING = "reviewing"
    HANDOFF = "handoff"
    COMPLETE = "complete"

class HandoffTrigger(Enum):
    """Events that trigger agent handoff."""

    CONTEXT_THRESHOLD = "context_threshold"  # Context > 70%
    PHASE_COMPLETE = "phase_complete"        # Phase boundary reached
    ERROR_THRESHOLD = "error_threshold"      # > 3 consecutive errors
    EXPLICIT_REQUEST = "explicit_request"    # Agent requests handoff
    TOKEN_BUDGET = "token_budget"            # > 80% of task budget
    TIMEOUT = "timeout"                      # Max time exceeded

class AgentType(Enum):
    """Types of agents in the system."""

    ARCHITECTURE = "architecture"
    IMPLEMENTATION = "implementation"
    QA = "qa"
    DOCUMENTATION = "documentation"
    ORCHESTRATOR = "orchestrator"

# =============================================================================
# DATA MODELS
# =============================================================================

@dataclass
class CheckpointMetadata:
    """Metadata for a checkpoint."""

    # Unique identifier for this checkpoint (random UUID4).
    checkpoint_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    task_id: str = ""
    agent_id: str = ""
    agent_type: str = AgentType.IMPLEMENTATION.value
    # Loop iteration number within the task (1-based).
    iteration: int = 1
    # ISO-8601 UTC timestamp of checkpoint creation.
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CheckpointMetadata":
        """Create from dictionary, silently ignoring unknown keys."""
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})

@dataclass
class ExecutionState:
    """Current execution state of the agent."""

    phase: str = ExecutionPhase.PLANNING.value
    completed_items: List[str] = field(default_factory=list)
    pending_items: List[str] = field(default_factory=list)
    blocked_items: List[str] = field(default_factory=list)
    # Free-text description of what the agent is currently working on.
    current_focus: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ExecutionState":
        """Create from dictionary, silently ignoring unknown keys."""
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})

@dataclass
class ContextSummary:
    """Summary of context for handoff."""

    key_decisions: List[str] = field(default_factory=list)
    assumptions: List[str] = field(default_factory=list)
    constraints: List[str] = field(default_factory=list)
    external_dependencies: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ContextSummary":
        """Create from dictionary, silently ignoring unknown keys."""
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})

@dataclass
class TestStatus:
    """Test execution status."""

    passed: int = 0
    failed: int = 0
    skipped: int = 0
    coverage_percent: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)

@dataclass
class CheckpointMetrics:
    """Metrics captured at checkpoint."""

    tokens_consumed: int = 0
    tools_invoked: int = 0
    files_modified: List[str] = field(default_factory=list)
    tests_status: TestStatus = field(default_factory=TestStatus)
    duration_seconds: float = 0.0
    error_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        # asdict() already recurses into nested dataclasses; the explicit
        # conversion is kept so the shape always matches TestStatus.to_dict().
        data = asdict(self)
        data["tests_status"] = self.tests_status.to_dict()
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CheckpointMetrics":
        """Create from dictionary, rehydrating the nested TestStatus.

        A shallow copy is made so the caller's dict is not mutated.
        """
        data = dict(data)
        if "tests_status" in data and isinstance(data["tests_status"], dict):
            data["tests_status"] = TestStatus(**data["tests_status"])
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})

@dataclass
class RecoveryInfo:
    """Information for recovery from checkpoint."""

    last_successful_state: str = ""    # Reference to prior checkpoint
    rollback_instructions: str = ""    # How to undo current iteration
    continuation_prompt: str = ""      # Prompt to resume work

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "RecoveryInfo":
        """Create from dictionary, silently ignoring unknown keys."""
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})

@dataclass
class ComplianceInfo:
    """Compliance and audit trail information."""

    event_log_ref: str = ""            # FoundationDB event stream ref
    hash: str = ""                     # SHA-256 of checkpoint content
    signature: str = ""                # Optional cryptographic signature
    retention_policy: str = "default"  # Compliance retention requirement

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ComplianceInfo":
        """Create from dictionary, silently ignoring unknown keys."""
        return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})

@dataclass
class Checkpoint:
    """Complete checkpoint for agent handoff.

    This is the primary data structure for persisting agent state between
    context windows, enabling fresh-context iterations.
    """

    version: str = "1.0"
    metadata: CheckpointMetadata = field(default_factory=CheckpointMetadata)
    execution_state: ExecutionState = field(default_factory=ExecutionState)
    context_summary: ContextSummary = field(default_factory=ContextSummary)
    metrics: CheckpointMetrics = field(default_factory=CheckpointMetrics)
    recovery: RecoveryInfo = field(default_factory=RecoveryInfo)
    compliance: ComplianceInfo = field(default_factory=ComplianceInfo)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "version": self.version,
            "metadata": self.metadata.to_dict(),
            "execution_state": self.execution_state.to_dict(),
            "context_summary": self.context_summary.to_dict(),
            "metrics": self.metrics.to_dict(),
            "recovery": self.recovery.to_dict(),
            "compliance": self.compliance.to_dict(),
        }

    def to_json(self) -> str:
        """Convert to JSON string."""
        # default=str stringifies any non-JSON-native values defensively.
        return json.dumps(self.to_dict(), default=str, indent=2)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Checkpoint":
        """Create Checkpoint from dictionary; missing sections get defaults."""
        return cls(
            version=data.get("version", "1.0"),
            metadata=CheckpointMetadata.from_dict(data.get("metadata", {})),
            execution_state=ExecutionState.from_dict(data.get("execution_state", {})),
            context_summary=ContextSummary.from_dict(data.get("context_summary", {})),
            metrics=CheckpointMetrics.from_dict(data.get("metrics", {})),
            recovery=RecoveryInfo.from_dict(data.get("recovery", {})),
            compliance=ComplianceInfo.from_dict(data.get("compliance", {})),
        )

    @classmethod
    def from_json(cls, json_str: str) -> "Checkpoint":
        """Create Checkpoint from JSON string."""
        return cls.from_dict(json.loads(json_str))

    def compute_hash(self) -> str:
        """Compute SHA-256 hash of checkpoint content.

        The stored compliance hash is blanked before hashing so the result
        is stable regardless of whether a hash was already recorded.
        """
        data = self.to_dict()
        data["compliance"]["hash"] = ""
        content = json.dumps(data, sort_keys=True, default=str)
        return hashlib.sha256(content.encode()).hexdigest()

    def verify_integrity(self) -> bool:
        """Verify checkpoint integrity using stored hash.

        Returns True when no hash is stored (verification is skipped).
        """
        if not self.compliance.hash:
            return True  # No hash stored, skip verification
        return self.compute_hash() == self.compliance.hash

# =============================================================================
# HANDOFF PROTOCOL
# =============================================================================

@dataclass
class HandoffRequest:
    """Request for agent handoff."""

    trigger: str = HandoffTrigger.CONTEXT_THRESHOLD.value
    checkpoint: Checkpoint = field(default_factory=Checkpoint)
    target_agent_type: str = ""  # Empty means same agent type
    priority: int = 5            # 1-10, higher = more urgent

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return {
            "trigger": self.trigger,
            "checkpoint": self.checkpoint.to_dict(),
            "target_agent_type": self.target_agent_type,
            "priority": self.priority,
        }

class HandoffProtocol:
    """Implements the handoff protocol between agent iterations.

    Protocol Steps:
        1. Detect handoff trigger (context > 70%, phase complete, errors, etc.)
        2. Generate continuation_prompt summarizing state
        3. Write final checkpoint with phase="handoff"
        4. Emit AGENT_HANDOFF event to orchestrator
        5. Orchestrator validates checkpoint integrity
        6. Orchestrator spawns new agent with fresh context
        7. New agent reads checkpoint and acknowledges
        8. New agent creates new checkpoint linking to parent
    """

    # Thresholds for handoff triggers
    CONTEXT_THRESHOLD = 0.70        # 70% context utilization
    ERROR_THRESHOLD = 3             # Consecutive errors
    TOKEN_BUDGET_THRESHOLD = 0.80   # 80% of task budget

    @staticmethod
    def should_handoff(
        context_utilization: float,
        consecutive_errors: int,
        token_budget_used: float,
        phase_complete: bool = False,
        explicit_request: bool = False,
    ) -> Optional[HandoffTrigger]:
        """
        Determine if handoff should be triggered.

        Triggers are checked in priority order: explicit request, phase
        completion, error threshold, context threshold, token budget.

        Returns the trigger type if handoff should occur, None otherwise.
        """
        if explicit_request:
            return HandoffTrigger.EXPLICIT_REQUEST

        if phase_complete:
            return HandoffTrigger.PHASE_COMPLETE

        if consecutive_errors >= HandoffProtocol.ERROR_THRESHOLD:
            return HandoffTrigger.ERROR_THRESHOLD

        if context_utilization >= HandoffProtocol.CONTEXT_THRESHOLD:
            return HandoffTrigger.CONTEXT_THRESHOLD

        if token_budget_used >= HandoffProtocol.TOKEN_BUDGET_THRESHOLD:
            return HandoffTrigger.TOKEN_BUDGET

        return None

    @staticmethod
    def generate_continuation_prompt(checkpoint: Checkpoint) -> str:
        """
        Generate a continuation prompt for the next agent.

        This prompt summarizes what was accomplished, what remains,
        and recommended next steps.
        """
        state = checkpoint.execution_state
        context = checkpoint.context_summary

        prompt_parts = [
            "## Continuation from Previous Agent",
            "",
            f"**Task ID:** {checkpoint.metadata.task_id}",
            f"**Iteration:** {checkpoint.metadata.iteration}",
            f"**Current Phase:** {state.phase}",
            "",
            "### Completed Work",
        ]

        if state.completed_items:
            for item in state.completed_items:
                prompt_parts.append(f"- {item}")
        else:
            prompt_parts.append("- No items completed yet")

        prompt_parts.extend([
            "",
            "### Pending Work",
        ])

        if state.pending_items:
            for item in state.pending_items:
                prompt_parts.append(f"- {item}")
        else:
            prompt_parts.append("- No pending items")

        if state.blocked_items:
            prompt_parts.extend([
                "",
                "### Blocked Items",
            ])
            for item in state.blocked_items:
                prompt_parts.append(f"- {item}")

        if context.key_decisions:
            prompt_parts.extend([
                "",
                "### Key Decisions Made",
            ])
            for decision in context.key_decisions:
                prompt_parts.append(f"- {decision}")

        if context.constraints:
            prompt_parts.extend([
                "",
                "### Active Constraints",
            ])
            for constraint in context.constraints:
                prompt_parts.append(f"- {constraint}")

        if state.current_focus:
            prompt_parts.extend([
                "",
                "### Current Focus",  # was an f-string with no placeholders
                f"Continue work on: {state.current_focus}",
            ])

        prompt_parts.extend([
            "",
            "### Instructions",
            "1. Review the completed and pending work above",
            "2. Continue from where the previous agent left off",
            "3. Create a new checkpoint when you complete significant work",
            "4. Request handoff if you approach context limits",
        ])

        return "\n".join(prompt_parts)

# =============================================================================
# CHECKPOINT SERVICE
# =============================================================================

class CheckpointService:
    """Service for managing checkpoints.

    Provides CRUD operations for checkpoints with optional FoundationDB
    persistence and local file fallback.

    FoundationDB Key Structure:
        - /coditect/checkpoints/{task_id}/{checkpoint_id}
        - /coditect/checkpoints/by-agent/{agent_id}/{timestamp}
        - /coditect/checkpoints/latest/{task_id}
    """

    def __init__(
        self,
        storage_path: Optional[Path] = None,
        use_fdb: bool = False,
    ):
        """
        Initialize checkpoint service.

        Args:
            storage_path: Local path for checkpoint storage (fallback)
            use_fdb: Whether to use FoundationDB (requires fdb module)
        """
        # ADR-114 & ADR-118: Checkpoints are Tier 3 data (regenerable)
        _user_data = Path.home() / "PROJECTS" / ".coditect-data"
        default_path = _user_data / "checkpoints" if _user_data.exists() else Path.home() / ".coditect" / "checkpoints"
        self.storage_path = storage_path or default_path
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.use_fdb = use_fdb
        self._fdb_client = None

        if use_fdb:
            try:
                import fdb
                fdb.api_version(710)
                self._fdb_client = fdb.open()
                logger.info("Connected to FoundationDB")
            except ImportError:
                logger.warning("FoundationDB module not available, using local storage")
                self.use_fdb = False
            except Exception as e:
                # Deliberate broad catch: any connection failure degrades to
                # local-file storage rather than crashing the caller.
                logger.warning(f"Failed to connect to FoundationDB: {e}, using local storage")
                self.use_fdb = False

    def _get_checkpoint_path(self, task_id: str, checkpoint_id: str) -> Path:
        """Get local file path for checkpoint (creates the task dir)."""
        task_dir = self.storage_path / task_id
        task_dir.mkdir(parents=True, exist_ok=True)
        return task_dir / f"{checkpoint_id}.json"

    def create_checkpoint(
        self,
        task_id: str,
        agent_id: str,
        execution_state: ExecutionState,
        context_summary: Optional[ContextSummary] = None,
        metrics: Optional[CheckpointMetrics] = None,
        agent_type: str = AgentType.IMPLEMENTATION.value,
        iteration: int = 1,
    ) -> Checkpoint:
        """
        Create a new checkpoint.

        Args:
            task_id: Parent task identifier
            agent_id: Executing agent identifier
            execution_state: Current execution state
            context_summary: Optional context summary
            metrics: Optional checkpoint metrics
            agent_type: Type of agent
            iteration: Loop iteration number

        Returns:
            Created Checkpoint with generated ID and hash
        """
        checkpoint = Checkpoint(
            metadata=CheckpointMetadata(
                task_id=task_id,
                agent_id=agent_id,
                agent_type=agent_type,
                iteration=iteration,
            ),
            execution_state=execution_state,
            context_summary=context_summary or ContextSummary(),
            metrics=metrics or CheckpointMetrics(),
        )

        # Generate continuation prompt
        checkpoint.recovery.continuation_prompt = HandoffProtocol.generate_continuation_prompt(checkpoint)

        # Set event_log_ref BEFORE computing hash (all fields must be set before hash)
        checkpoint.compliance.event_log_ref = f"fdb://events/{task_id}/checkpoint/{checkpoint.metadata.checkpoint_id}"

        # Compute and store hash for integrity (after all other fields are set)
        checkpoint.compliance.hash = checkpoint.compute_hash()

        # Persist checkpoint
        self._save_checkpoint(checkpoint)

        logger.info(
            f"Created checkpoint {checkpoint.metadata.checkpoint_id} "
            f"for task {task_id}, iteration {iteration}"
        )

        return checkpoint

    def _save_checkpoint(self, checkpoint: Checkpoint) -> None:
        """Save checkpoint to storage (FoundationDB or local file)."""
        if self.use_fdb and self._fdb_client:
            self._save_to_fdb(checkpoint)
        else:
            self._save_to_file(checkpoint)

    def _save_to_file(self, checkpoint: Checkpoint) -> None:
        """Save checkpoint to local file and update the latest pointer."""
        path = self._get_checkpoint_path(
            checkpoint.metadata.task_id,
            checkpoint.metadata.checkpoint_id,
        )
        path.write_text(checkpoint.to_json())

        # Update latest pointer
        latest_path = self.storage_path / checkpoint.metadata.task_id / "latest.json"
        latest_path.write_text(json.dumps({
            "checkpoint_id": checkpoint.metadata.checkpoint_id,
            "timestamp": checkpoint.metadata.timestamp,
        }))

    def _save_to_fdb(self, checkpoint: Checkpoint) -> None:
        """Save checkpoint to FoundationDB (primary key, agent index, latest pointer)."""
        if not self._fdb_client:
            raise CheckpointError("FoundationDB client not available")

        import fdb

        @fdb.transactional
        def save_txn(tr):
            task_id = checkpoint.metadata.task_id
            checkpoint_id = checkpoint.metadata.checkpoint_id
            agent_id = checkpoint.metadata.agent_id
            timestamp = checkpoint.metadata.timestamp

            # Primary key
            primary_key = f"/coditect/checkpoints/{task_id}/{checkpoint_id}"
            tr[primary_key.encode()] = checkpoint.to_json().encode()

            # By-agent index
            agent_key = f"/coditect/checkpoints/by-agent/{agent_id}/{timestamp}"
            tr[agent_key.encode()] = checkpoint_id.encode()

            # Latest pointer
            latest_key = f"/coditect/checkpoints/latest/{task_id}"
            tr[latest_key.encode()] = checkpoint_id.encode()

        save_txn(self._fdb_client)

    def get_checkpoint(self, task_id: str, checkpoint_id: str) -> Checkpoint:
        """
        Retrieve a checkpoint by ID.

        Args:
            task_id: Parent task identifier
            checkpoint_id: Checkpoint identifier

        Returns:
            Retrieved Checkpoint

        Raises:
            CheckpointNotFoundError: If checkpoint doesn't exist
            CheckpointCorruptionError: If integrity check fails
        """
        if self.use_fdb and self._fdb_client:
            return self._get_from_fdb(task_id, checkpoint_id)
        else:
            return self._get_from_file(task_id, checkpoint_id)

    def _get_from_file(self, task_id: str, checkpoint_id: str) -> Checkpoint:
        """Get checkpoint from local file and verify its integrity."""
        path = self._get_checkpoint_path(task_id, checkpoint_id)

        if not path.exists():
            raise CheckpointNotFoundError(
                f"Checkpoint {checkpoint_id} not found for task {task_id}"
            )

        checkpoint = Checkpoint.from_json(path.read_text())

        if not checkpoint.verify_integrity():
            raise CheckpointCorruptionError(
                f"Checkpoint {checkpoint_id} failed integrity check"
            )

        return checkpoint

    def _get_from_fdb(self, task_id: str, checkpoint_id: str) -> Checkpoint:
        """Get checkpoint from FoundationDB and verify its integrity."""
        if not self._fdb_client:
            raise CheckpointError("FoundationDB client not available")

        import fdb

        @fdb.transactional
        def get_txn(tr):
            key = f"/coditect/checkpoints/{task_id}/{checkpoint_id}"
            value = tr[key.encode()]
            if value.present():
                return value.decode()
            return None

        json_str = get_txn(self._fdb_client)

        if not json_str:
            raise CheckpointNotFoundError(
                f"Checkpoint {checkpoint_id} not found for task {task_id}"
            )

        checkpoint = Checkpoint.from_json(json_str)

        if not checkpoint.verify_integrity():
            raise CheckpointCorruptionError(
                f"Checkpoint {checkpoint_id} failed integrity check"
            )

        return checkpoint

    def get_latest_checkpoint(self, task_id: str) -> Optional[Checkpoint]:
        """
        Get the most recent checkpoint for a task.

        Args:
            task_id: Task identifier

        Returns:
            Latest Checkpoint or None if no checkpoints exist
        """
        if self.use_fdb and self._fdb_client:
            return self._get_latest_from_fdb(task_id)
        else:
            return self._get_latest_from_file(task_id)

    def _get_latest_from_file(self, task_id: str) -> Optional[Checkpoint]:
        """Resolve the latest.json pointer and load that checkpoint."""
        latest_path = self.storage_path / task_id / "latest.json"

        if not latest_path.exists():
            return None

        latest_data = json.loads(latest_path.read_text())
        checkpoint_id = latest_data.get("checkpoint_id")

        if not checkpoint_id:
            return None

        try:
            return self.get_checkpoint(task_id, checkpoint_id)
        except CheckpointNotFoundError:
            # Stale pointer: the referenced checkpoint file is gone.
            return None

    def _get_latest_from_fdb(self, task_id: str) -> Optional[Checkpoint]:
        """Resolve the FoundationDB latest pointer and load that checkpoint."""
        if not self._fdb_client:
            return None

        import fdb

        @fdb.transactional
        def get_latest_txn(tr):
            key = f"/coditect/checkpoints/latest/{task_id}"
            value = tr[key.encode()]
            if value.present():
                return value.decode()
            return None

        checkpoint_id = get_latest_txn(self._fdb_client)

        if not checkpoint_id:
            return None

        try:
            return self.get_checkpoint(task_id, checkpoint_id)
        except CheckpointNotFoundError:
            return None

    def get_checkpoint_history(
        self,
        task_id: str,
        limit: int = 10,
    ) -> List[Checkpoint]:
        """
        Get checkpoint history for a task.

        Args:
            task_id: Task identifier
            limit: Maximum number of checkpoints to return

        Returns:
            List of checkpoints, newest first
        """
        if self.use_fdb and self._fdb_client:
            return self._get_history_from_fdb(task_id, limit)
        else:
            return self._get_history_from_file(task_id, limit)

    def _get_history_from_file(self, task_id: str, limit: int) -> List[Checkpoint]:
        """Load all checkpoint files for a task, newest first."""
        task_dir = self.storage_path / task_id

        if not task_dir.exists():
            return []

        checkpoints = []
        for path in task_dir.glob("*.json"):
            if path.name == "latest.json":
                continue

            try:
                checkpoint = Checkpoint.from_json(path.read_text())
                checkpoints.append(checkpoint)
            except (json.JSONDecodeError, KeyError):
                logger.warning(f"Skipping invalid checkpoint file: {path}")
                continue

        # Sort by timestamp descending, then iteration descending as tiebreaker
        # (ISO-8601 timestamps sort correctly as strings).
        checkpoints.sort(
            key=lambda c: (c.metadata.timestamp, c.metadata.iteration),
            reverse=True,
        )

        return checkpoints[:limit]

    def _get_history_from_fdb(self, task_id: str, limit: int) -> List[Checkpoint]:
        """Range-read checkpoints for a task from FoundationDB, newest first."""
        if not self._fdb_client:
            return []

        import fdb

        @fdb.transactional
        def get_history_txn(tr):
            prefix = f"/coditect/checkpoints/{task_id}/"
            checkpoints = []

            # NOTE(review): begin is a raw key, end is a KeySelector just past
            # the prefix ("\xff" sentinel) — verify against the fdb bindings
            # that this range covers exactly the task's checkpoints.
            for key, value in tr.get_range(
                prefix.encode(),
                fdb.KeySelector.first_greater_or_equal((prefix + "\xff").encode()),
                limit=limit,
                reverse=True,
            ):
                try:
                    checkpoint = Checkpoint.from_json(value.decode())
                    checkpoints.append(checkpoint)
                except (json.JSONDecodeError, KeyError):
                    continue

            return checkpoints

        return get_history_txn(self._fdb_client)

    def link_checkpoints(
        self,
        parent_id: str,
        child_id: str,
        task_id: str,
    ) -> None:
        """
        Link a child checkpoint to its parent.

        Updates the child's recovery.last_successful_state to reference
        the parent checkpoint, then rehashes and re-persists the child.

        Args:
            parent_id: Parent checkpoint ID
            child_id: Child checkpoint ID
            task_id: Task identifier
        """
        child = self.get_checkpoint(task_id, child_id)
        child.recovery.last_successful_state = parent_id
        # Rehash after mutation so the integrity check still passes.
        child.compliance.hash = child.compute_hash()
        self._save_checkpoint(child)

        logger.info(f"Linked checkpoint {child_id} to parent {parent_id}")

# =============================================================================
# EXPORTS
# =============================================================================

# Public API of the module. Original assigned to `all`, which shadows the
# builtin and does NOT declare the export list; `__all__` is required.
__all__ = [
    # Exceptions
    "CheckpointError",
    "CheckpointNotFoundError",
    "CheckpointCorruptionError",
    "HandoffError",
    # Enums
    "ExecutionPhase",
    "HandoffTrigger",
    "AgentType",
    # Data Models
    "CheckpointMetadata",
    "ExecutionState",
    "ContextSummary",
    "TestStatus",
    "CheckpointMetrics",
    "RecoveryInfo",
    "ComplianceInfo",
    "Checkpoint",
    # Handoff
    "HandoffRequest",
    "HandoffProtocol",
    # Service
    "CheckpointService",
]