#!/usr/bin/env python3
"""
CODITECT Message Bus Health Check

Part of Track H.2: Inter-Agent Communication Infrastructure

This module provides health checking and monitoring for the RabbitMQ message bus:
- Connection health verification
- Queue depth monitoring
- Consumer status checking
- Performance metrics collection
- Alerting thresholds

Usage:
    # Quick health check
    python3 scripts/core/message_bus_health.py check

    # Detailed status
    python3 scripts/core/message_bus_health.py status --verbose

    # Monitor continuously
    python3 scripts/core/message_bus_health.py monitor --interval 30

    # JSON output for automation
    python3 scripts/core/message_bus_health.py status --json

Author: CODITECT Framework
Created: January 8, 2026
Version: 1.0.0
"""
import argparse
import asyncio
import json
import logging
import os
import sys
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote, unquote
# Configure logging: timestamped, level-tagged lines at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Module-level logger named after this module (``__name__``), not a bare
# undefined ``name``.
logger = logging.getLogger(__name__)
# Try to import HTTP client for management API.
# httpx is optional: HTTPX_AVAILABLE records whether the import succeeded so
# the management-API based checks can be skipped when it is missing.
try:
    import httpx
except ImportError:
    HTTPX_AVAILABLE = False
    logger.debug("httpx not installed, using basic health checks only")
else:
    HTTPX_AVAILABLE = True
# Import our message bus module
try:
    from message_bus import MessageBus, MessageBusConfig
except ImportError:
    # Handle case where script is run directly: put this file's own directory
    # on sys.path and retry. ``__file__`` (not a bare ``file``) is the path of
    # this module; resolve() makes it independent of the working directory.
    script_dir = Path(__file__).resolve().parent
    sys.path.insert(0, str(script_dir))
    from message_bus import MessageBus, MessageBusConfig
class HealthStatus(Enum):
    """Possible outcomes of a health check.

    Each member's value is the lowercase string form of its name.
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"
@dataclass
class HealthCheckResult:
    """Outcome of one individual health check."""

    name: str
    status: HealthStatus
    message: str
    # Time the check took, in milliseconds.
    duration_ms: float = 0.0
    # Free-form, check-specific extra data.
    details: Dict[str, Any] = field(default_factory=dict)
    # Naive UTC time of creation, as an ISO-8601 string.
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict, with ``status`` as its string value."""
        # Overriding an existing key keeps its original position in the dict.
        return {**asdict(self), "status": self.status.value}
@dataclass
class HealthReport:
    """Aggregated health report for the message bus."""

    overall_status: HealthStatus
    checks: List[HealthCheckResult]
    summary: str
    # Naive UTC time of creation, as an ISO-8601 string.
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    # Connection settings the report was produced for.
    config: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the report as a plain dict with enum statuses as strings."""
        payload = asdict(self)
        # asdict() leaves enum members untouched, so replace the status and
        # re-serialize every check through its own to_dict().
        payload.update(
            overall_status=self.overall_status.value,
            checks=[item.to_dict() for item in self.checks],
        )
        return payload

    def to_json(self, indent: int = 2) -> str:
        """Serialize :meth:`to_dict` to a JSON string with the given indent."""
        as_dict = self.to_dict()
        return json.dumps(as_dict, indent=indent)
@dataclass
class QueueMetrics:
    """Per-queue metrics record."""

    name: str
    message_count: int
    consumer_count: int
    # Throughput in messages per second.
    message_rate: float = 0.0
    memory_bytes: int = 0
    state: str = "running"
@dataclass
class AlertThresholds:
    """Limits at which health checks raise warnings or failures."""

    # Queue depth (message count) limits.
    queue_depth_warning: int = 1000
    queue_depth_critical: int = 10000
    # Minimum number of consumers.
    consumer_min: int = 1
    # Connection timeout, in milliseconds.
    connection_timeout_ms: int = 5000
    # Throughput limit, in messages per second.
    message_rate_warning: float = 100.0
class MessageBusHealthChecker:
    """
    Health checker for RabbitMQ message bus.

    Performs comprehensive health checks including:
    - Connection health (connect/disconnect through ``MessageBus``)
    - Management API reachability
    - Queue depth
    - Required exchanges
    - Consumer availability
    - Memory usage and resource alarms

    Every check except the connection check queries the RabbitMQ management
    HTTP API and therefore needs ``httpx``; without it those checks report
    ``UNKNOWN``.
    """

    # Timeout, in seconds, applied to each management API request.
    _API_TIMEOUT_S = 5.0
    # Memory usage (percent of the node's limit) above which the memory
    # check is reported as DEGRADED.
    _MEM_PERCENT_WARNING = 80

    def __init__(
        self,
        config: Optional[MessageBusConfig] = None,
        thresholds: Optional[AlertThresholds] = None,
        management_port: int = 15672,
    ):
        """
        Args:
            config: Message bus settings; defaults to
                ``MessageBusConfig.from_env()``.
            thresholds: Alert limits; defaults to ``AlertThresholds()``.
            management_port: TCP port of the RabbitMQ management HTTP API
                on ``config.host``.
        """
        self.config = config or MessageBusConfig.from_env()
        self.thresholds = thresholds or AlertThresholds()
        self._management_url = f"http://{self.config.host}:{management_port}/api"
        self._management_auth = (self.config.username, self.config.password)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _elapsed_ms(start: float) -> float:
        """Return milliseconds elapsed since ``start`` (a perf_counter value)."""
        return (time.perf_counter() - start) * 1000

    @staticmethod
    def _httpx_missing(
        name: str, message: str = "httpx not installed"
    ) -> HealthCheckResult:
        """Build the UNKNOWN result used when httpx is not importable."""
        return HealthCheckResult(
            name=name, status=HealthStatus.UNKNOWN, message=message
        )

    @staticmethod
    def _mem_percent(node: Dict[str, Any]) -> float:
        """Return a node's memory usage as a percentage of its limit (0 if no limit)."""
        mem_limit = node.get("mem_limit", 0)
        return (node.get("mem_used", 0) / mem_limit * 100) if mem_limit > 0 else 0

    def _vhost_segment(self) -> str:
        """Return the vhost percent-encoded as a single URL path segment.

        The management API addresses a vhost as one path segment, so the
        default vhost "/" has to be sent as "%2F". The value is decoded first
        so that a vhost configured in already-encoded form is not encoded
        twice.
        """
        return quote(unquote(self.config.vhost), safe="")

    async def _api_get(self, path: str) -> Any:
        """GET ``path`` (relative to the management API root) and return the response."""
        async with httpx.AsyncClient() as client:
            return await client.get(
                f"{self._management_url}/{path}",
                auth=self._management_auth,
                timeout=self._API_TIMEOUT_S,
            )

    async def _disconnect_quietly(self, bus: Any) -> None:
        """Best-effort close of the probe connection; never raises a regular error.

        Cleanup problems are logged at debug level only, so they cannot turn
        a successful connection check into a failure.
        """
        if bus is None:
            return
        try:
            await asyncio.wait_for(
                bus.disconnect(),
                timeout=self.thresholds.connection_timeout_ms / 1000,
            )
        except Exception as e:
            logger.debug("Ignoring error while closing health-check connection: %s", e)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def run_all_checks(self) -> HealthReport:
        """Run all health checks and return a comprehensive report.

        The overall status is UNHEALTHY if any check is UNHEALTHY, DEGRADED
        if any check is DEGRADED or UNKNOWN, and HEALTHY otherwise.
        """
        # Checks run one at a time, in a fixed order.
        checks: List[HealthCheckResult] = [
            await self._check_connection(),
            await self._check_management_api(),
        ]
        if HTTPX_AVAILABLE:
            for run_check in (
                self._check_queues,
                self._check_exchanges,
                self._check_consumers,
                self._check_memory,
            ):
                checks.append(await run_check())

        # Determine overall status
        statuses = [check.status for check in checks]
        if HealthStatus.UNHEALTHY in statuses:
            overall_status = HealthStatus.UNHEALTHY
        elif HealthStatus.DEGRADED in statuses or HealthStatus.UNKNOWN in statuses:
            # A check that could not be evaluated degrades the report.
            overall_status = HealthStatus.DEGRADED
        else:
            overall_status = HealthStatus.HEALTHY

        # Generate summary
        healthy_count = statuses.count(HealthStatus.HEALTHY)
        summary = f"{healthy_count}/{len(checks)} checks passed"
        if overall_status != HealthStatus.HEALTHY:
            failed_checks = [c.name for c in checks if c.status != HealthStatus.HEALTHY]
            summary += f" (issues: {', '.join(failed_checks)})"

        return HealthReport(
            overall_status=overall_status,
            checks=checks,
            summary=summary,
            config={
                "host": self.config.host,
                "port": self.config.port,
                "vhost": self.config.vhost,
            },
        )

    # ------------------------------------------------------------------
    # Individual checks
    # ------------------------------------------------------------------

    async def _check_connection(self) -> HealthCheckResult:
        """Check if we can connect to RabbitMQ.

        The reported duration covers connecting only; the probe connection is
        then closed on every path (success, failure and timeout).
        """
        timeout_ms = self.thresholds.connection_timeout_ms
        start = time.perf_counter()
        bus = None
        try:
            bus = MessageBus(self.config)
            connected = await asyncio.wait_for(
                bus.connect(use_local_fallback=False),
                timeout=timeout_ms / 1000,
            )
            duration_ms = self._elapsed_ms(start)
            if connected:
                return HealthCheckResult(
                    name="connection",
                    status=HealthStatus.HEALTHY,
                    message=f"Connected successfully in {duration_ms:.1f}ms",
                    duration_ms=duration_ms,
                    details={"host": self.config.host, "port": self.config.port},
                )
            return HealthCheckResult(
                name="connection",
                status=HealthStatus.UNHEALTHY,
                message="Connection returned False",
                duration_ms=duration_ms,
            )
        except asyncio.TimeoutError:
            return HealthCheckResult(
                name="connection",
                status=HealthStatus.UNHEALTHY,
                message=f"Connection timed out after {timeout_ms}ms",
                duration_ms=float(timeout_ms),
            )
        except Exception as e:
            return HealthCheckResult(
                name="connection",
                status=HealthStatus.UNHEALTHY,
                message=f"Connection failed: {e}",
                duration_ms=self._elapsed_ms(start),
                details={"error": str(e)},
            )
        finally:
            await self._disconnect_quietly(bus)

    async def _check_management_api(self) -> HealthCheckResult:
        """Check if the RabbitMQ management API is accessible."""
        if not HTTPX_AVAILABLE:
            return self._httpx_missing(
                "management_api",
                "httpx not installed, skipping management API check",
            )

        start = time.perf_counter()
        try:
            response = await self._api_get("overview")
            duration_ms = self._elapsed_ms(start)
            if response.status_code != 200:
                return HealthCheckResult(
                    name="management_api",
                    status=HealthStatus.UNHEALTHY,
                    message=f"Management API returned status {response.status_code}",
                    duration_ms=duration_ms,
                )

            data = response.json()
            return HealthCheckResult(
                name="management_api",
                status=HealthStatus.HEALTHY,
                message=f"Management API responding (RabbitMQ {data.get('rabbitmq_version', 'unknown')})",
                duration_ms=duration_ms,
                details={
                    "rabbitmq_version": data.get("rabbitmq_version"),
                    "erlang_version": data.get("erlang_version"),
                    "cluster_name": data.get("cluster_name"),
                },
            )
        except Exception as e:
            # An unreachable management API is DEGRADED rather than UNHEALTHY.
            return HealthCheckResult(
                name="management_api",
                status=HealthStatus.DEGRADED,
                message=f"Management API check failed: {e}",
                duration_ms=self._elapsed_ms(start),
                details={"error": str(e)},
            )

    async def _check_queues(self) -> HealthCheckResult:
        """Check queue health and depths against the configured thresholds."""
        if not HTTPX_AVAILABLE:
            return self._httpx_missing("queues")

        start = time.perf_counter()
        try:
            response = await self._api_get(f"queues/{self._vhost_segment()}")
            duration_ms = self._elapsed_ms(start)
            if response.status_code != 200:
                return HealthCheckResult(
                    name="queues",
                    status=HealthStatus.DEGRADED,
                    message=f"Queue API returned status {response.status_code}",
                    duration_ms=duration_ms,
                )

            queues = response.json()
            depths = [(q.get("name", "unknown"), q.get("messages", 0)) for q in queues]
            critical_at = self.thresholds.queue_depth_critical
            warning_at = self.thresholds.queue_depth_warning

            # A queue is listed once: as critical, or else as warning.
            critical_queues = [f"{n}({m})" for n, m in depths if m >= critical_at]
            warning_queues = [
                f"{n}({m})" for n, m in depths if warning_at <= m < critical_at
            ]
            details: Dict[str, Any] = {
                "queue_count": len(queues),
                "total_messages": sum(m for _, m in depths),
            }

            if critical_queues:
                details["critical_queues"] = critical_queues
                details["warning_queues"] = warning_queues
                return HealthCheckResult(
                    name="queues",
                    status=HealthStatus.UNHEALTHY,
                    message=f"Critical queue depths: {', '.join(critical_queues)}",
                    duration_ms=duration_ms,
                    details=details,
                )
            if warning_queues:
                details["warning_queues"] = warning_queues
                return HealthCheckResult(
                    name="queues",
                    status=HealthStatus.DEGRADED,
                    message=f"High queue depths: {', '.join(warning_queues)}",
                    duration_ms=duration_ms,
                    details=details,
                )
            return HealthCheckResult(
                name="queues",
                status=HealthStatus.HEALTHY,
                message=f"{details['queue_count']} queues, {details['total_messages']} total messages",
                duration_ms=duration_ms,
                details=details,
            )
        except Exception as e:
            return HealthCheckResult(
                name="queues",
                status=HealthStatus.UNKNOWN,
                message=f"Queue check failed: {e}",
                duration_ms=self._elapsed_ms(start),
                details={"error": str(e)},
            )

    async def _check_exchanges(self) -> HealthCheckResult:
        """Check that required exchanges exist."""
        if not HTTPX_AVAILABLE:
            return self._httpx_missing("exchanges")

        start = time.perf_counter()
        required_exchanges = [
            self.config.task_exchange,
            self.config.broadcast_exchange,
            self.config.response_exchange,
            self.config.event_exchange,
        ]
        try:
            response = await self._api_get(f"exchanges/{self._vhost_segment()}")
            duration_ms = self._elapsed_ms(start)
            if response.status_code != 200:
                return HealthCheckResult(
                    name="exchanges",
                    status=HealthStatus.DEGRADED,
                    message=f"Exchange API returned status {response.status_code}",
                    duration_ms=duration_ms,
                )

            exchanges = response.json()
            exchange_names = {e.get("name") for e in exchanges}
            missing = [ex for ex in required_exchanges if ex not in exchange_names]

            if missing:
                return HealthCheckResult(
                    name="exchanges",
                    status=HealthStatus.UNHEALTHY,
                    message=f"Missing required exchanges: {', '.join(missing)}",
                    duration_ms=duration_ms,
                    details={
                        "required": required_exchanges,
                        "missing": missing,
                        # Sorted so repeated reports are comparable.
                        "found": sorted(exchange_names, key=str),
                    },
                )
            return HealthCheckResult(
                name="exchanges",
                status=HealthStatus.HEALTHY,
                message=f"All {len(required_exchanges)} required exchanges present",
                duration_ms=duration_ms,
                details={
                    "required": required_exchanges,
                    "total_exchanges": len(exchanges),
                },
            )
        except Exception as e:
            return HealthCheckResult(
                name="exchanges",
                status=HealthStatus.UNKNOWN,
                message=f"Exchange check failed: {e}",
                duration_ms=self._elapsed_ms(start),
                details={"error": str(e)},
            )

    async def _check_consumers(self) -> HealthCheckResult:
        """Check consumer status against ``thresholds.consumer_min``."""
        if not HTTPX_AVAILABLE:
            return self._httpx_missing("consumers")

        start = time.perf_counter()
        try:
            response = await self._api_get(f"consumers/{self._vhost_segment()}")
            duration_ms = self._elapsed_ms(start)
            if response.status_code != 200:
                return HealthCheckResult(
                    name="consumers",
                    status=HealthStatus.DEGRADED,
                    message=f"Consumer API returned status {response.status_code}",
                    duration_ms=duration_ms,
                )

            consumers = response.json()
            consumer_count = len(consumers)

            # Group by queue
            consumers_by_queue: Dict[str, int] = {}
            for c in consumers:
                queue_name = c.get("queue", {}).get("name", "unknown")
                consumers_by_queue[queue_name] = consumers_by_queue.get(queue_name, 0) + 1

            consumer_min = self.thresholds.consumer_min
            if consumer_count < consumer_min:
                if consumer_count == 0:
                    return HealthCheckResult(
                        name="consumers",
                        status=HealthStatus.DEGRADED,
                        message="No active consumers",
                        duration_ms=duration_ms,
                        details={"consumer_count": 0},
                    )
                return HealthCheckResult(
                    name="consumers",
                    status=HealthStatus.DEGRADED,
                    message=f"Only {consumer_count} active consumers (minimum {consumer_min})",
                    duration_ms=duration_ms,
                    details={
                        "consumer_count": consumer_count,
                        "consumer_min": consumer_min,
                        "queues_with_consumers": len(consumers_by_queue),
                        "consumers_by_queue": consumers_by_queue,
                    },
                )
            return HealthCheckResult(
                name="consumers",
                status=HealthStatus.HEALTHY,
                message=f"{consumer_count} active consumers across {len(consumers_by_queue)} queues",
                duration_ms=duration_ms,
                details={
                    "consumer_count": consumer_count,
                    "queues_with_consumers": len(consumers_by_queue),
                    "consumers_by_queue": consumers_by_queue,
                },
            )
        except Exception as e:
            return HealthCheckResult(
                name="consumers",
                status=HealthStatus.UNKNOWN,
                message=f"Consumer check failed: {e}",
                duration_ms=self._elapsed_ms(start),
                details={"error": str(e)},
            )

    async def _check_memory(self) -> HealthCheckResult:
        """Check RabbitMQ memory usage and resource alarms.

        An alarm on any node makes the check UNHEALTHY. Memory figures are
        reported for the node with the highest usage percentage; with a
        single node that is simply that node.
        """
        if not HTTPX_AVAILABLE:
            return self._httpx_missing("memory")

        start = time.perf_counter()
        try:
            response = await self._api_get("nodes")
            duration_ms = self._elapsed_ms(start)
            if response.status_code != 200:
                return HealthCheckResult(
                    name="memory",
                    status=HealthStatus.DEGRADED,
                    message=f"Nodes API returned status {response.status_code}",
                    duration_ms=duration_ms,
                )

            nodes = response.json()
            if not nodes:
                return HealthCheckResult(
                    name="memory",
                    status=HealthStatus.UNHEALTHY,
                    message="No RabbitMQ nodes found",
                    duration_ms=duration_ms,
                )

            mem_alarm = any(n.get("mem_alarm", False) for n in nodes)
            disk_alarm = any(n.get("disk_free_alarm", False) for n in nodes)

            # max() returns the first node on ties, i.e. nodes[0] for one node.
            node = max(nodes, key=self._mem_percent)
            mem_percent = self._mem_percent(node)
            mem_used_mb = node.get("mem_used", 0) / (1024 * 1024)
            mem_limit_mb = node.get("mem_limit", 0) / (1024 * 1024)

            details: Dict[str, Any] = {
                "mem_used_mb": round(mem_used_mb, 2),
                "mem_limit_mb": round(mem_limit_mb, 2),
                "mem_percent": round(mem_percent, 1),
            }
            if len(nodes) > 1:
                # Say which node the figures belong to when there are several.
                details["node"] = node.get("name")
                details["node_count"] = len(nodes)

            if mem_alarm or disk_alarm:
                details["mem_alarm"] = mem_alarm
                details["disk_alarm"] = disk_alarm
                return HealthCheckResult(
                    name="memory",
                    status=HealthStatus.UNHEALTHY,
                    message=f"Resource alarm active (memory: {mem_alarm}, disk: {disk_alarm})",
                    duration_ms=duration_ms,
                    details=details,
                )
            if mem_percent > self._MEM_PERCENT_WARNING:
                return HealthCheckResult(
                    name="memory",
                    status=HealthStatus.DEGRADED,
                    message=f"High memory usage: {mem_percent:.1f}%",
                    duration_ms=duration_ms,
                    details=details,
                )
            return HealthCheckResult(
                name="memory",
                status=HealthStatus.HEALTHY,
                message=f"Memory usage: {mem_used_mb:.1f}MB / {mem_limit_mb:.1f}MB ({mem_percent:.1f}%)",
                duration_ms=duration_ms,
                details=details,
            )
        except Exception as e:
            return HealthCheckResult(
                name="memory",
                status=HealthStatus.UNKNOWN,
                message=f"Memory check failed: {e}",
                duration_ms=self._elapsed_ms(start),
                details={"error": str(e)},
            )
async def quick_health_check(config: Optional[MessageBusConfig] = None) -> Tuple[bool, str]:
    """
    Perform a quick health check.

    Only the connection check is run.

    Returns:
        Tuple of (is_healthy, message)
    """
    outcome = await MessageBusHealthChecker(config)._check_connection()
    is_healthy = outcome.status == HealthStatus.HEALTHY
    return is_healthy, outcome.message
def print_report(report: HealthReport, verbose: bool = False):
    """Print health report to console.

    Args:
        report: The report to render.
        verbose: When true, also print each check's ``details`` entries.
    """
    # Status colors (ANSI)
    reset = "\033[0m"
    colors = {
        HealthStatus.HEALTHY: "\033[92m",  # Green
        HealthStatus.DEGRADED: "\033[93m",  # Yellow
        HealthStatus.UNHEALTHY: "\033[91m",  # Red
        HealthStatus.UNKNOWN: "\033[94m",  # Blue
    }
    # Per-status marker shown in front of each check line.
    icons = {
        HealthStatus.HEALTHY: "[OK]",
        HealthStatus.DEGRADED: "[!]",
        HealthStatus.UNHEALTHY: "[X]",
        HealthStatus.UNKNOWN: "[?]",
    }
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    overall = report.overall_status
    overall_color = colors.get(overall, "")
    host = report.config.get("host")
    port = report.config.get("port")
    vhost = report.config.get("vhost")

    header_lines = (
        f"\n{heavy_rule}",
        "CODITECT Message Bus Health Report",
        heavy_rule,
        f"Overall Status: {overall_color}{overall.value.upper()}{reset}",
        f"Summary: {report.summary}",
        f"Timestamp: {report.timestamp}",
        f"\nHost: {host}:{port}",
        f"VHost: {vhost}",
        f"\n{light_rule}",
        "Checks:",
    )
    for line in header_lines:
        print(line)

    for item in report.checks:
        tint = colors.get(item.status, "")
        marker = icons.get(item.status, "[?]")
        print(f" {tint}{marker}{reset} {item.name}: {item.message}")
        if not (verbose and item.details):
            continue
        for label, detail in item.details.items():
            print(f" {label}: {detail}")

    print(f"{heavy_rule}\n")
async def main():
    """Main entry point for CLI.

    Commands:
        check:   connection-only check; prints one line (or a JSON object).
        status:  full report, printed as text or JSON.
        monitor: full report every ``--interval`` seconds until interrupted.

    With ``--exit-code``, ``check`` and ``status`` exit with status 1 when
    the bus is unhealthy.
    """
    parser = argparse.ArgumentParser(
        description="CODITECT Message Bus Health Check",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s check                    Quick health check
  %(prog)s status --verbose         Detailed status report
  %(prog)s status --json            JSON output for automation
  %(prog)s monitor --interval 30    Continuous monitoring
""",
    )
    parser.add_argument(
        "command",
        choices=["check", "status", "monitor"],
        help="Command to run",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show detailed output",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output as JSON",
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=60,
        help="Monitoring interval in seconds (for monitor command)",
    )
    parser.add_argument(
        "--host",
        default="localhost",
        help="RabbitMQ host",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=5672,
        help="RabbitMQ port",
    )
    parser.add_argument(
        "--exit-code",
        action="store_true",
        help="Return non-zero exit code on unhealthy status",
    )

    args = parser.parse_args()

    # A zero or negative interval would make the monitor loop spin without
    # pausing, so reject it up front (argparse exits with status 2).
    if args.command == "monitor" and args.interval <= 0:
        parser.error("--interval must be a positive number of seconds")

    config = MessageBusConfig(host=args.host, port=args.port)
    checker = MessageBusHealthChecker(config)

    if args.command == "check":
        is_healthy, message = await quick_health_check(config)
        if args.json:
            print(json.dumps({
                "healthy": is_healthy,
                "message": message,
                "timestamp": datetime.utcnow().isoformat(),
            }))
        else:
            status = "HEALTHY" if is_healthy else "UNHEALTHY"
            print(f"Message Bus: {status} - {message}")
        if args.exit_code and not is_healthy:
            sys.exit(1)

    elif args.command == "status":
        report = await checker.run_all_checks()
        if args.json:
            print(report.to_json())
        else:
            print_report(report, verbose=args.verbose)
        if args.exit_code and report.overall_status == HealthStatus.UNHEALTHY:
            sys.exit(1)

    elif args.command == "monitor":
        print(f"Monitoring message bus health every {args.interval} seconds...")
        print("Press Ctrl+C to stop.\n")
        try:
            while True:
                report = await checker.run_all_checks()
                if args.json:
                    print(report.to_json())
                else:
                    print_report(report, verbose=args.verbose)
                await asyncio.sleep(args.interval)
        except (KeyboardInterrupt, asyncio.CancelledError):
            # Under asyncio.run() a Ctrl+C normally reaches this coroutine as
            # a task cancellation, not as KeyboardInterrupt, so handle both
            # to make the farewell message reachable.
            print("\nMonitoring stopped.")
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C: exit without a traceback, using the conventional
        # "terminated by SIGINT" status.
        sys.exit(130)