#!/usr/bin/env python3
"""
CODITECT Agent Health Monitoring Layer (ADR-110)

Implements health monitoring for autonomous agent loops with stuck detection,
graduated intervention, circuit breakers, and self-healing recovery.

Key Insight from Gas Town Architecture:
    GUPP principle: "If there is work on your Hook, YOU MUST RUN IT."

Features:
    - 5-state health model (HEALTHY → DEGRADED → STUCK → FAILING → TERMINATED)
    - Heartbeat protocol with 5-minute intervals
    - Graduated intervention (nudge → escalate → terminate)
    - Circuit breaker pattern for fault tolerance
    - Self-healing recovery from checkpoints

Usage:
    from scripts.core.ralph_wiggum import HealthMonitoringService, HealthState

    service = HealthMonitoringService()
    health = await service.get_agent_health(agent_id)

    # AgentHealth.state stores the enum's string value, so compare with .value
    if health.state == HealthState.STUCK.value:
        await service.send_nudge(agent_id)

Author: CODITECT Framework
Version: 1.0.0
Created: January 24, 2026
ADR Reference: ADR-110-agent-health-monitoring.md
"""

import asyncio
import json
import logging
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, TypeVar

# Configure logging

# NOTE(review): basicConfig() at import time configures the root logger for
# every program that imports this module; kept for backward compatibility.
logging.basicConfig(level=logging.INFO)
# Module-scoped logger (the scraped source had `getLogger(name)`, a NameError).
logger = logging.getLogger(__name__)

# =============================================================================
# EXCEPTIONS
# =============================================================================

class HealthMonitoringError(Exception):
    """Root exception type for the health-monitoring layer."""


class AgentNotFoundError(HealthMonitoringError):
    """The requested agent ID has not been registered with the health monitor."""


class InterventionError(HealthMonitoringError):
    """Error type for failures while intervening on an agent."""


class RecoveryError(HealthMonitoringError):
    """Error type for failures while recovering an agent."""

# =============================================================================
# ENUMS
# =============================================================================

class HealthState(Enum):
    """Health states an agent can be in.

    AgentHealth stores the member's string ``.value`` rather than the member.
    """

    HEALTHY = "healthy"  # Making progress
    DEGRADED = "degraded"  # Warning signs
    STUCK = "stuck"  # Not making progress
    FAILING = "failing"  # In error loop
    TERMINATED = "terminated"  # Stopped

class InterventionLevel(Enum):
    """Intervention levels, listed from softest to most drastic."""

    NUDGE = "nudge"  # Soft reminder
    ESCALATE = "escalate"  # Orchestrator alert
    TERMINATE = "terminate"  # Force stop

class CircuitBreakerState(Enum):
    """Circuit breaker states."""

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Blocking requests
    HALF_OPEN = "half_open"  # Testing recovery

# =============================================================================
# DATA MODELS
# =============================================================================

@dataclass
class HeartbeatPayload:
    """Status report an agent sends to the health monitor."""

    agent_id: str
    task_id: str
    # ISO-8601 UTC time; filled in at construction when not supplied.
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    phase: str = ""
    last_tool_call: str = ""
    token_count: int = 0
    error_count: int = 0
    # Any non-empty value marks this heartbeat as reporting progress.
    progress_indicator: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the payload fields as a plain dict."""
        return asdict(self)

    def to_json(self) -> str:
        """Serialize the payload to JSON (unknown types fall back to ``str``)."""
        as_mapping = self.to_dict()
        return json.dumps(as_mapping, default=str)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "HeartbeatPayload":
        """Build a payload from ``data``, ignoring keys that are not fields."""
        known_fields = cls.__dataclass_fields__
        kwargs = {}
        for key, value in data.items():
            if key in known_fields:
                kwargs[key] = value
        return cls(**kwargs)

    @classmethod
    def from_json(cls, json_str: str) -> "HeartbeatPayload":
        """Parse a JSON object string into a payload."""
        decoded = json.loads(json_str)
        return cls.from_dict(decoded)

@dataclass
class HealthMetrics:
    """Counters and timestamps tracked for a monitored agent."""

    uptime_seconds: float = 0.0
    token_count: int = 0
    error_count: int = 0
    tool_call_count: int = 0
    intervention_count: int = 0
    checkpoint_count: int = 0
    # ISO-8601 timestamp of the last heartbeat that reported progress ("" if none).
    last_progress_timestamp: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the metrics as a plain dict."""
        return asdict(self)

@dataclass
class CircuitBreakerStatus:
    """Serializable snapshot of a circuit breaker."""

    name: str
    # String value of a CircuitBreakerState member.
    state: str = CircuitBreakerState.CLOSED.value
    failure_count: int = 0
    # ISO-8601 UTC timestamps, or None when not applicable.
    last_failure_time: Optional[str] = None
    next_attempt_time: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the snapshot as a plain dict."""
        return asdict(self)

@dataclass
class AgentHealth:
    """Full health record for one monitored agent."""

    agent_id: str
    task_id: str = ""
    # String value of a HealthState member.
    state: str = HealthState.HEALTHY.value
    state_changed_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    last_heartbeat: str = ""
    last_checkpoint: str = ""
    metrics: HealthMetrics = field(default_factory=HealthMetrics)
    circuit_breakers: List[CircuitBreakerStatus] = field(default_factory=list)
    nudge_count: int = 0
    escalation_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict, with nested models converted too."""
        result = asdict(self)
        result.update(
            metrics=self.metrics.to_dict(),
            circuit_breakers=[status.to_dict() for status in self.circuit_breakers],
        )
        return result

    def to_json(self) -> str:
        """Serialize the record to JSON (unknown types fall back to ``str``)."""
        as_mapping = self.to_dict()
        return json.dumps(as_mapping, default=str)

# =============================================================================
# CIRCUIT BREAKER
# =============================================================================

class CircuitBreaker:
    """
    Circuit breaker for fault tolerance.

    States:
    - CLOSED: Normal operation, failures counted
    - OPEN: Requests fail immediately, timer running
    - HALF_OPEN: Testing recovery, limited requests allowed

    Config:
    - failure_threshold: Consecutive failures to trip (default: 3)
    - recovery_timeout_seconds: Time in OPEN before HALF_OPEN (default: 60s)
    - half_open_requests: Trial calls admitted concurrently while HALF_OPEN,
      and the number of successes needed there to close again (default: 1)
    """

    def __init__(
        self,
        name: str,
        failure_threshold: int = 3,
        recovery_timeout_seconds: float = 60.0,
        half_open_requests: int = 1,
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout_seconds
        self.half_open_requests = half_open_requests

        self._state = CircuitBreakerState.CLOSED
        self._failure_count = 0
        self._last_failure_time: Optional[float] = None  # time.time() of last failure
        self._half_open_successes = 0
        # Trial calls admitted while HALF_OPEN that have not finished yet.
        self._half_open_in_flight = 0
        # Guards state bookkeeping only. It is never held while the wrapped
        # function runs, because asyncio.Lock is not reentrant and
        # _on_success/_on_failure acquire it themselves.
        self._lock = asyncio.Lock()

    @property
    def state(self) -> CircuitBreakerState:
        """Current state; an OPEN breaker whose recovery timeout elapsed becomes HALF_OPEN."""
        if (
            self._state == CircuitBreakerState.OPEN
            and self._last_failure_time is not None
            and time.time() - self._last_failure_time >= self.recovery_timeout
        ):
            self._state = CircuitBreakerState.HALF_OPEN
            self._half_open_successes = 0
        return self._state

    def get_status(self) -> CircuitBreakerStatus:
        """Return a reporting snapshot of this breaker."""
        # Resolve any pending OPEN -> HALF_OPEN transition first so the
        # reported state and next_attempt_time agree with each other.
        state = self.state

        last_failure = None
        next_attempt = None
        if self._last_failure_time is not None:
            last_failure = datetime.fromtimestamp(
                self._last_failure_time, tz=timezone.utc
            ).isoformat()
            if state == CircuitBreakerState.OPEN:
                next_attempt = datetime.fromtimestamp(
                    self._last_failure_time + self.recovery_timeout, tz=timezone.utc
                ).isoformat()

        return CircuitBreakerStatus(
            name=self.name,
            state=state.value,
            failure_count=self._failure_count,
            last_failure_time=last_failure,
            next_attempt_time=next_attempt,
        )

    async def call(
        self,
        func: Callable,
        *args,
        **kwargs,
    ) -> Any:
        """
        Execute ``func(*args, **kwargs)`` through the circuit breaker.

        Coroutine functions are awaited; plain callables are called directly.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: If the circuit is open, or it is half-open
                and all trial slots are already in use.
            Exception: Any exception raised by ``func`` is recorded as a
                failure and re-raised.
        """
        async with self._lock:
            state = self.state  # This may transition OPEN -> HALF_OPEN

            if state == CircuitBreakerState.OPEN:
                raise CircuitBreakerOpenError(
                    f"Circuit breaker '{self.name}' is open"
                )

            is_trial = state == CircuitBreakerState.HALF_OPEN
            if is_trial:
                # At least one trial must be admissible, or HALF_OPEN could never close.
                if self._half_open_in_flight >= max(1, self.half_open_requests):
                    raise CircuitBreakerOpenError(
                        f"Circuit breaker '{self.name}' is half-open and its trial requests are in use"
                    )
                self._half_open_in_flight += 1

        # The lock is released here. The wrapped call may be slow, and the
        # success/failure handlers re-acquire the (non-reentrant) lock.
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
        except Exception:
            await self._on_failure()
            raise
        else:
            await self._on_success()
            return result
        finally:
            # Free the trial slot on every exit path, including cancellation.
            if is_trial:
                self._half_open_in_flight -= 1

    async def _on_success(self) -> None:
        """Record a successful call."""
        async with self._lock:
            if self._state == CircuitBreakerState.HALF_OPEN:
                self._half_open_successes += 1
                if self._half_open_successes >= self.half_open_requests:
                    self._state = CircuitBreakerState.CLOSED
                    self._failure_count = 0
                    logger.info(f"Circuit breaker '{self.name}' closed after recovery")
            elif self._state == CircuitBreakerState.CLOSED:
                # Only a success while CLOSED resets the consecutive-failure count.
                # A late success while OPEN must not erase the history that tripped it.
                self._failure_count = 0

    async def _on_failure(self) -> None:
        """Record a failed call, tripping the breaker when warranted."""
        async with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.time()

            if self._state == CircuitBreakerState.HALF_OPEN:
                self._state = CircuitBreakerState.OPEN
                logger.warning(f"Circuit breaker '{self.name}' re-opened after failure in half-open")
            elif self._failure_count >= self.failure_threshold:
                self._state = CircuitBreakerState.OPEN
                logger.warning(
                    f"Circuit breaker '{self.name}' opened after {self._failure_count} failures"
                )

    async def reset(self) -> None:
        """Manually reset the circuit breaker to the CLOSED state."""
        async with self._lock:
            self._state = CircuitBreakerState.CLOSED
            self._failure_count = 0
            self._last_failure_time = None
            self._half_open_successes = 0
            # _half_open_in_flight is deliberately untouched: it counts calls that
            # are still running, and each releases its own slot when it finishes.
            logger.info(f"Circuit breaker '{self.name}' manually reset")

class CircuitBreakerOpenError(Exception):
    """A call was rejected because its circuit breaker is not accepting requests."""

# =============================================================================
# HEALTH MONITORING SERVICE
# =============================================================================

class HealthMonitoringService:
    """
    Service for monitoring agent health.

    Features:
    - Track health state for all agents
    - Process heartbeats
    - Detect stuck agents
    - Trigger interventions
    - Manage circuit breakers

    ``AgentHealth.state`` holds the *string value* of a ``HealthState``
    (e.g. ``HealthState.STUCK.value``), so compare against ``.value``.
    TERMINATED is terminal: heartbeats and staleness checks do not move an
    agent out of it.
    """

    # Configuration (seconds)
    HEARTBEAT_INTERVAL = 300  # 5 minutes
    HEARTBEAT_TIMEOUT = 600  # 10 minutes (2x interval)
    HEALTH_CHECK_INTERVAL = 60  # 1 minute
    CHECKPOINT_STALE_THRESHOLD = 1800  # 30 minutes

    # Intervention thresholds
    NUDGE_MAX_ATTEMPTS = 3
    NUDGE_INTERVAL = 600  # 10 minutes
    ESCALATION_TIMEOUT = 900  # 15 minutes

    # Circuit breakers created for every monitored agent:
    # (name suffix, failure_threshold, recovery_timeout_seconds)
    _AGENT_BREAKERS = (
        ("tool_execution", 5, 30),
        ("checkpoint_write", 3, 120),
    )

    def __init__(self):
        self._agents: Dict[str, AgentHealth] = {}
        self._circuit_breakers: Dict[str, CircuitBreaker] = {}
        # agent_id -> time.time() of recent nudges (used for rate limiting)
        self._nudge_timestamps: Dict[str, List[float]] = {}
        self._lock = asyncio.Lock()
        self._state_change_callbacks: List[Callable] = []
        self._message_bus = None

    # ------------------------------------------------------------------
    # Callbacks and events
    # ------------------------------------------------------------------

    def on_state_change(self, callback: Callable) -> None:
        """Register a callback for health state changes.

        Callback signature: callback(agent_id: str, old_state: str, new_state: str, health: AgentHealth)

        Callbacks run synchronously while the service lock is held. Exceptions
        they raise are logged and otherwise ignored.
        """
        self._state_change_callbacks.append(callback)

    def set_message_bus(self, bus) -> None:
        """Connect a SessionMessageBus for publishing health events."""
        self._message_bus = bus

    def _fire_state_change(self, agent_id: str, old_state: str, new_state: str, health: "AgentHealth") -> None:
        """Fire all registered state change callbacks, logging (not raising) their errors."""
        for cb in self._state_change_callbacks:
            try:
                cb(agent_id, old_state, new_state, health)
            except Exception as e:
                logger.warning(f"State change callback error: {e}")

    def _publish_health_event(self, event_type: str, agent_id: str, details: Dict[str, Any]) -> None:
        """Publish a health event to the message bus, if one is connected (best effort)."""
        if self._message_bus is None:
            return
        try:
            self._message_bus.publish(
                channel="operator_alert",
                message_type=event_type,
                task_id=details.get("task_id", ""),
                priority=2 if event_type == "agent_terminated" else 1,
                activity=details,
            )
        except Exception as e:
            logger.warning(f"Failed to publish health event to message bus: {e}")

    # ------------------------------------------------------------------
    # Internal helpers (callers hold self._lock)
    # ------------------------------------------------------------------

    @staticmethod
    def _seconds_since(iso_timestamp: str) -> Optional[float]:
        """Return seconds elapsed since an ISO-8601 timestamp, or None if it is empty or unparseable.

        A trailing ``Z`` is accepted. A timestamp without an offset is treated
        as UTC so it can be compared with the aware current time.
        """
        if not iso_timestamp:
            return None
        try:
            parsed = datetime.fromisoformat(iso_timestamp.replace('Z', '+00:00'))
        except (TypeError, ValueError):
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return (datetime.now(timezone.utc) - parsed).total_seconds()

    def _transition(self, health: AgentHealth, new_state: str, *, publish: bool = True) -> bool:
        """Move ``health`` to ``new_state``; return True if the state actually changed.

        On a real change: stamp ``state_changed_at``, log, fire callbacks and,
        if ``publish``, emit a ``health_state_changed`` event.
        """
        old_state = health.state
        if new_state == old_state:
            return False

        health.state = new_state
        health.state_changed_at = datetime.now(timezone.utc).isoformat()
        logger.info(f"Agent {health.agent_id} health state changed: {old_state} → {new_state}")
        self._fire_state_change(health.agent_id, old_state, new_state, health)
        if publish:
            self._publish_health_event("health_state_changed", health.agent_id, {
                "task_id": health.task_id,
                "old_state": old_state,
                "new_state": new_state,
                "timestamp": health.state_changed_at,
            })
        return True

    def _breaker_keys(self, agent_id: str) -> List[str]:
        """Return the exact circuit-breaker keys owned by ``agent_id``.

        Exact keys, rather than a ``startswith`` prefix match, keep agents whose
        IDs share a prefix (e.g. ``"a"`` and ``"a:b"``) from reading or
        deleting each other's breakers.
        """
        return [f"{agent_id}:{suffix}" for suffix, _, _ in self._AGENT_BREAKERS]

    def _create_agent_locked(self, agent_id: str, task_id: str) -> AgentHealth:
        """Create a fresh health record, nudge history and default circuit breakers for an agent."""
        health = AgentHealth(agent_id=agent_id, task_id=task_id)
        self._agents[agent_id] = health
        self._nudge_timestamps[agent_id] = []

        for suffix, threshold, timeout in self._AGENT_BREAKERS:
            key = f"{agent_id}:{suffix}"
            self._circuit_breakers[key] = CircuitBreaker(
                name=key,
                failure_threshold=threshold,
                recovery_timeout_seconds=timeout,
            )
        return health

    # ------------------------------------------------------------------
    # Registration and heartbeats
    # ------------------------------------------------------------------

    async def register_agent(
        self,
        agent_id: str,
        task_id: str,
    ) -> AgentHealth:
        """Register an agent for health monitoring.

        Re-registering an existing ``agent_id`` replaces its health record,
        nudge history and circuit breakers with fresh ones.
        """
        async with self._lock:
            health = self._create_agent_locked(agent_id, task_id)

        logger.info(f"Registered agent {agent_id} for health monitoring")
        return health

    async def unregister_agent(self, agent_id: str) -> None:
        """Unregister an agent and drop its nudge history and circuit breakers."""
        async with self._lock:
            self._agents.pop(agent_id, None)
            self._nudge_timestamps.pop(agent_id, None)
            for key in self._breaker_keys(agent_id):
                self._circuit_breakers.pop(key, None)

        logger.info(f"Unregistered agent {agent_id} from health monitoring")

    async def record_heartbeat(self, heartbeat: HeartbeatPayload) -> None:
        """Record a heartbeat, auto-registering unknown agents, then re-evaluate health."""
        async with self._lock:
            agent_id = heartbeat.agent_id

            health = self._agents.get(agent_id)
            if health is None:
                # Auto-register, with the same circuit breakers register_agent creates.
                health = self._create_agent_locked(agent_id, heartbeat.task_id)

            health.last_heartbeat = heartbeat.timestamp
            health.metrics.token_count = heartbeat.token_count
            health.metrics.error_count = heartbeat.error_count

            if heartbeat.progress_indicator:
                health.metrics.last_progress_timestamp = heartbeat.timestamp

            await self._evaluate_health_state(health, heartbeat)

        logger.debug(f"Recorded heartbeat from agent {agent_id}")

    async def _evaluate_health_state(
        self,
        health: AgentHealth,
        heartbeat: HeartbeatPayload,
    ) -> None:
        """Derive the health state from a heartbeat's error counts and apply it."""
        # TERMINATED is terminal; a late heartbeat must not revive a stopped agent.
        if health.state == HealthState.TERMINATED.value:
            return

        if heartbeat.error_count >= 3:
            new_state = HealthState.FAILING.value
        elif heartbeat.error_count > 0 and health.metrics.tool_call_count > 0:
            # NOTE(review): metrics.tool_call_count is never incremented in this
            # module, so this branch only fires if something else updates it.
            error_rate = heartbeat.error_count / health.metrics.tool_call_count
            new_state = HealthState.DEGRADED.value if error_rate > 0.25 else HealthState.HEALTHY.value
        else:
            new_state = HealthState.HEALTHY.value

        self._transition(health, new_state)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    async def get_agent_health(self, agent_id: str) -> AgentHealth:
        """
        Get health status for an agent, applying heartbeat-staleness checks.

        A heartbeat older than CHECKPOINT_STALE_THRESHOLD marks the agent STUCK.
        One older than HEARTBEAT_TIMEOUT downgrades a HEALTHY agent to DEGRADED.
        These transitions notify callbacks and the message bus like any other
        state change.

        Raises:
            AgentNotFoundError: If the agent is not registered.
        """
        async with self._lock:
            health = self._agents.get(agent_id)
            if health is None:
                raise AgentNotFoundError(f"Agent {agent_id} not registered")

            elapsed = self._seconds_since(health.last_heartbeat)
            if elapsed is not None and health.state != HealthState.TERMINATED.value:
                if elapsed > self.CHECKPOINT_STALE_THRESHOLD:
                    self._transition(health, HealthState.STUCK.value)
                elif elapsed > self.HEARTBEAT_TIMEOUT and health.state == HealthState.HEALTHY.value:
                    self._transition(health, HealthState.DEGRADED.value)

            # Attach circuit breaker statuses
            health.circuit_breakers = [
                self._circuit_breakers[key].get_status()
                for key in self._breaker_keys(agent_id)
                if key in self._circuit_breakers
            ]

            return health

    async def get_all_agent_health(self) -> List[AgentHealth]:
        """Get the stored health records for all agents (no staleness checks applied)."""
        async with self._lock:
            return list(self._agents.values())

    # ------------------------------------------------------------------
    # Interventions
    # ------------------------------------------------------------------

    async def send_nudge(self, agent_id: str, message: Optional[str] = None) -> bool:
        """
        Send a nudge intervention to an agent.

        At most NUDGE_MAX_ATTEMPTS nudges are allowed per agent within any
        NUDGE_INTERVAL-second window.

        Args:
            agent_id: Registered agent to nudge.
            message: Custom nudge text; defaults to a generated reminder.

        Returns:
            True if the nudge was sent, False if max attempts exceeded.

        Raises:
            AgentNotFoundError: If the agent is not registered.
        """
        async with self._lock:
            health = self._agents.get(agent_id)
            if health is None:
                raise AgentNotFoundError(f"Agent {agent_id} not registered")

            # Keep only nudges that fall inside the rate-limit window.
            now = time.time()
            recent_nudges = [
                t for t in self._nudge_timestamps.get(agent_id, [])
                if now - t < self.NUDGE_INTERVAL
            ]

            if len(recent_nudges) >= self.NUDGE_MAX_ATTEMPTS:
                logger.warning(
                    f"Max nudge attempts ({self.NUDGE_MAX_ATTEMPTS}) reached for agent {agent_id}"
                )
                return False

            # Record nudge
            self._nudge_timestamps[agent_id] = recent_nudges + [now]
            health.nudge_count += 1
            health.metrics.intervention_count += 1

            nudge_message = message or self._generate_nudge_message(health)

            logger.info(f"Sent nudge to agent {agent_id}: {nudge_message[:100]}...")

            # In a real implementation, this would inject the message into the agent's context
            # via the orchestrator or message bus

            return True

    def _generate_nudge_message(self, health: AgentHealth) -> str:
        """Build the default nudge text; elapsed time is measured from the last heartbeat."""
        elapsed = "unknown"
        elapsed_seconds = self._seconds_since(health.last_heartbeat)
        if elapsed_seconds is not None:
            elapsed = f"{int(elapsed_seconds / 60)} minutes"

        lines = [
            f"REMINDER: You have been working for {elapsed} without checkpoint update.",
            "",
            "Please either:",
            "- Update your progress with a checkpoint",
            "- Request handoff if you are stuck",
            "- Report any blockers you've encountered",
            "",
            f"Current state: {health.state}",
            f"Nudge count: {health.nudge_count}",
        ]
        return "\n".join(lines) + "\n"

    async def escalate_to_orchestrator(
        self,
        agent_id: str,
        reason: str,
    ) -> Dict[str, Any]:
        """
        Escalate agent issues to the orchestrator and publish an ``agent_escalated`` event.

        Returns:
            Escalation details.

        Raises:
            AgentNotFoundError: If the agent is not registered.
        """
        async with self._lock:
            health = self._agents.get(agent_id)
            if health is None:
                raise AgentNotFoundError(f"Agent {agent_id} not registered")

            health.escalation_count += 1
            health.metrics.intervention_count += 1

            escalation = {
                "agent_id": agent_id,
                "task_id": health.task_id,
                "reason": reason,
                "health_state": health.state,
                "nudge_count": health.nudge_count,
                "escalation_count": health.escalation_count,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "recommended_actions": [
                    "allow_more_time",
                    "force_handoff",
                    "terminate_and_recover",
                ],
            }

            logger.warning(f"Escalated agent {agent_id} to orchestrator: {reason}")

            self._publish_health_event("agent_escalated", agent_id, escalation)

            return escalation

    async def terminate_agent(
        self,
        agent_id: str,
        reason: str,
    ) -> Dict[str, Any]:
        """
        Mark an agent TERMINATED and publish an ``agent_terminated`` event.

        State-change callbacks fire only when the agent was not already TERMINATED.

        Returns:
            Termination details.

        Raises:
            AgentNotFoundError: If the agent is not registered.
        """
        async with self._lock:
            health = self._agents.get(agent_id)
            if health is None:
                raise AgentNotFoundError(f"Agent {agent_id} not registered")

            old_state = health.state
            # publish=False: termination is announced below as its own event type.
            self._transition(health, HealthState.TERMINATED.value, publish=False)

            termination = {
                "agent_id": agent_id,
                "task_id": health.task_id,
                "reason": reason,
                "previous_state": old_state,
                "timestamp": health.state_changed_at,
                "metrics": health.metrics.to_dict(),
            }

            logger.warning(f"Terminated agent {agent_id}: {reason}")

            self._publish_health_event("agent_terminated", agent_id, termination)

            return termination

    # ------------------------------------------------------------------
    # Circuit breakers
    # ------------------------------------------------------------------

    def get_circuit_breaker(self, name: str) -> Optional[CircuitBreaker]:
        """Get a circuit breaker by its full name (e.g. ``"<agent_id>:tool_execution"``)."""
        return self._circuit_breakers.get(name)

    async def reset_circuit_breaker(self, name: str) -> None:
        """Reset a circuit breaker by name; unknown names are ignored."""
        breaker = self._circuit_breakers.get(name)
        if breaker is not None:
            await breaker.reset()

# =============================================================================
# RECOVERY SERVICE
# =============================================================================

class RecoveryService:
    """
    Service for recovering failed agents.

    Features:
    - Checkpoint chain traversal
    - Agent respawn from checkpoint
    - Recovery limits and backoff
    - Human escalation on failure

    Agent spawning is currently a placeholder: a successful recovery returns a
    synthetic ``new_agent_id`` rather than starting a real agent.
    """

    # Recovery limits
    MAX_ATTEMPTS_PER_TASK = 3
    # NOTE(review): attempts are tracked per task, and MAX_ATTEMPTS_PER_TASK (3)
    # is below this value, so with these defaults the hourly cap can never
    # trigger. Confirm whether it was meant to be a global limit.
    MAX_ATTEMPTS_PER_HOUR = 5
    BACKOFF_SECONDS = [60, 120, 240]  # Exponential backoff (attempt 2 onwards)

    def __init__(self, checkpoint_service=None, health_service=None):
        self.checkpoint_service = checkpoint_service
        self.health_service = health_service
        self._recovery_attempts: Dict[str, List[float]] = {}  # task_id -> time.time() of each attempt
        self._lock = asyncio.Lock()

    async def initiate_recovery(
        self,
        task_id: str,
        failed_agent_id: str,
    ) -> Dict[str, Any]:
        """
        Initiate recovery for a failed agent.

        Args:
            task_id: Task whose agent failed.
            failed_agent_id: ID of the failed agent (currently not used).

        Returns:
            Recovery result. On success it includes ``new_agent_id``,
            ``checkpoint_id`` and ``attempt_number``. On failure it has
            ``success=False`` and a ``reason`` of ``recovery_limit_exceeded``
            or ``no_valid_checkpoint``.
        """
        # Hold the lock only for the limit check and attempt bookkeeping.
        # The checkpoint lookup and the backoff sleep (up to minutes) run
        # outside it, so one task's backoff does not block other tasks.
        async with self._lock:
            if not await self._can_recover(task_id):
                return {
                    "success": False,
                    "reason": "recovery_limit_exceeded",
                    "message": "Maximum recovery attempts exceeded. Human intervention required.",
                    "task_id": task_id,
                }

            # Record the attempt up front, so failures still count toward the limit.
            attempts = self._recovery_attempts.setdefault(task_id, [])
            attempts.append(time.time())
            attempt_number = len(attempts)

        logger.info(
            f"Initiating recovery for task {task_id}, attempt {attempt_number}"
        )

        checkpoint = await self._find_valid_checkpoint(task_id)
        if not checkpoint:
            return {
                "success": False,
                "reason": "no_valid_checkpoint",
                "message": "No valid checkpoint found. Human intervention required.",
                "task_id": task_id,
            }

        delay = self._get_backoff_delay(attempt_number)
        if delay > 0:
            logger.info(f"Waiting {delay}s before spawning recovery agent")
            await asyncio.sleep(delay)

        # Spawn new agent (placeholder - would integrate with orchestrator)
        new_agent_id = f"recovery-agent-{task_id}-{attempt_number}"

        return {
            "success": True,
            "task_id": task_id,
            "checkpoint_id": checkpoint.get("id", "unknown"),
            "new_agent_id": new_agent_id,
            "attempt_number": attempt_number,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    async def _can_recover(self, task_id: str) -> bool:
        """Return True if ``task_id`` is under both the per-task and the per-hour attempt limits."""
        attempts = self._recovery_attempts.get(task_id, [])

        if len(attempts) >= self.MAX_ATTEMPTS_PER_TASK:
            return False

        hour_ago = time.time() - 3600
        recent_attempts = [t for t in attempts if t > hour_ago]
        return len(recent_attempts) < self.MAX_ATTEMPTS_PER_HOUR

    async def _find_valid_checkpoint(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the checkpoint to recover from as a dict, or None if there is none.

        Without a checkpoint service, a mock checkpoint is returned.
        """
        if not self.checkpoint_service:
            # Placeholder - return mock checkpoint
            return {"id": "mock-checkpoint", "task_id": task_id}

        # Would call checkpoint service to get latest valid checkpoint
        # and traverse chain if needed
        checkpoint = await self.checkpoint_service.get_latest_checkpoint(task_id)
        return checkpoint.to_dict() if checkpoint else None

    def _get_backoff_delay(self, attempt_number: int) -> float:
        """Return the delay before a recovery attempt.

        The first attempt has no delay. Later attempts step through
        BACKOFF_SECONDS and stay at its last value.
        """
        if attempt_number <= 1:
            return 0

        index = min(attempt_number - 2, len(self.BACKOFF_SECONDS) - 1)
        return self.BACKOFF_SECONDS[index]

    async def get_recovery_status(self, task_id: str) -> Dict[str, Any]:
        """Get recovery attempt counts and eligibility for a task."""
        attempts = self._recovery_attempts.get(task_id, [])

        return {
            "task_id": task_id,
            "total_attempts": len(attempts),
            "max_attempts": self.MAX_ATTEMPTS_PER_TASK,
            "can_recover": await self._can_recover(task_id),
            "last_attempt": (
                datetime.fromtimestamp(attempts[-1], tz=timezone.utc).isoformat()
                if attempts else None
            ),
        }

# =============================================================================
# EXPORTS
# =============================================================================

# Public API of this module. The name must be __all__ for `from ... import *`
# to honor it; the scraped `all = [...]` also shadowed the builtin all().
__all__ = [
    # Exceptions
    "HealthMonitoringError",
    "AgentNotFoundError",
    "InterventionError",
    "RecoveryError",
    "CircuitBreakerOpenError",
    # Enums
    "HealthState",
    "InterventionLevel",
    "CircuitBreakerState",
    # Data Models
    "HeartbeatPayload",
    "HealthMetrics",
    "CircuitBreakerStatus",
    "AgentHealth",
    # Circuit Breaker
    "CircuitBreaker",
    # Services
    "HealthMonitoringService",
    "RecoveryService",
]