#!/usr/bin/env python3
"""
H.1.5: Discovery Service Health Check

Monitors the component discovery service and reports health metrics.
Can be used standalone or integrated into monitoring systems.

Usage:
    python3 scripts/core/discovery_health_check.py           # Quick health check
    python3 scripts/core/discovery_health_check.py --json    # JSON output
    python3 scripts/core/discovery_health_check.py --watch   # Continuous monitoring

Exit codes:
    0 - Healthy
    1 - Degraded (warnings)
    2 - Unhealthy (errors)
"""

import argparse
import asyncio
import json
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

# Add parent to path: this script lives at scripts/core/, so three levels up
# is the directory from which the `scripts.core...` package import resolves.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from scripts.core.discovery_service import ( DiscoveryService, ComponentStatus, )

class HealthStatus:
    """Health check status levels.

    Plain string constants (not an Enum) so they can be placed directly in
    result dictionaries and serialized with ``json.dumps`` unchanged.
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

async def check_backend_connectivity(service: DiscoveryService) -> Dict[str, Any]:
    """Check if backend is reachable.

    Initializes the service and reads its stats; any exception raised while
    doing so is reported as an unhealthy backend rather than propagated.

    Args:
        service: Discovery service instance to probe.

    Returns:
        A dict with ``status`` (a ``HealthStatus`` value), ``backend`` (the
        ``"backend"`` entry of the stats, or ``"unknown"``) and a
        human-readable ``message``.
    """
    try:
        # NOTE(review): relies on a private method of DiscoveryService.
        await service._ensure_initialized()
        stats = await service.get_stats()
        backend = stats.get("backend", "unknown")
        return {
            "status": HealthStatus.HEALTHY,
            "backend": backend,
            "message": f"Connected to {backend} backend",
        }
    except Exception as e:
        return {
            "status": HealthStatus.UNHEALTHY,
            "backend": "unknown",
            "message": f"Backend connection failed: {str(e)}",
        }

async def check_component_health(service: DiscoveryService) -> Dict[str, Any]:
    """Check overall component health distribution.

    Components are counted per ``status.value`` and bucketed by
    ``health_score``: high (>= 0.9), medium (>= 0.7), low (>= 0.5),
    critical (below 0.5).

    Overall status:
        * HEALTHY   - at least 90% "available" and under 5% critical
        * DEGRADED  - at least 70% "available" and under 10% critical,
          or no components registered at all
        * UNHEALTHY - anything else, or any exception while checking

    Args:
        service: Discovery service instance to query.

    Returns:
        A dict with ``status`` plus the counts and ratios, or ``status`` and
        ``message`` when there is nothing to report or the check failed.
    """
    try:
        components = await service.list_all()

        if not components:
            return {
                "status": HealthStatus.DEGRADED,
                "message": "No components registered",
                "total": 0,
            }

        by_status: Dict[str, int] = {}
        by_health = {"high": 0, "medium": 0, "low": 0, "critical": 0}

        for comp in components:
            status_name = comp.status.value
            by_status[status_name] = by_status.get(status_name, 0) + 1

            if comp.health_score >= 0.9:
                by_health["high"] += 1
            elif comp.health_score >= 0.7:
                by_health["medium"] += 1
            elif comp.health_score >= 0.5:
                by_health["low"] += 1
            else:
                by_health["critical"] += 1

        # Determine overall status
        total = len(components)
        available_ratio = by_status.get("available", 0) / total
        critical_ratio = by_health["critical"] / total

        if available_ratio >= 0.9 and critical_ratio < 0.05:
            status = HealthStatus.HEALTHY
        elif available_ratio >= 0.7 and critical_ratio < 0.1:
            status = HealthStatus.DEGRADED
        else:
            status = HealthStatus.UNHEALTHY

        return {
            "status": status,
            "total": total,
            "by_status": by_status,
            "by_health": by_health,
            "available_ratio": round(available_ratio, 3),
            "critical_ratio": round(critical_ratio, 3),
        }

    except Exception as e:
        return {
            "status": HealthStatus.UNHEALTHY,
            "message": f"Failed to check components: {str(e)}",
        }

async def check_stale_components(
    service: DiscoveryService,
    stale_threshold_seconds: int = 600,
) -> Dict[str, Any]:
    """Check for stale components (no recent heartbeat).

    A component is stale when the time since its ``last_seen`` exceeds
    ``stale_threshold_seconds``.

    Overall status by share of stale components:
        * HEALTHY   - under 5% (including when no components exist)
        * DEGRADED  - under 20%
        * UNHEALTHY - 20% or more, or any exception while checking

    Args:
        service: Discovery service instance to query.
        stale_threshold_seconds: Maximum allowed age of ``last_seen``.

    Returns:
        A dict with ``status``, counts, ratio, the threshold used and at
        most the first 10 stale components; or ``status`` and ``message``
        when the check failed.
    """
    try:
        components = await service.list_all()
        now = datetime.now(timezone.utc)

        stale = []
        for comp in components:
            # NOTE(review): assumes comp.last_seen is timezone-aware; a naive
            # datetime makes this subtraction raise TypeError, which the
            # handler below reports as UNHEALTHY - confirm against the service.
            age_seconds = (now - comp.last_seen).total_seconds()
            if age_seconds > stale_threshold_seconds:
                stale.append({
                    "id": comp.id,
                    "last_seen": comp.last_seen.isoformat(),
                    "age_seconds": int(age_seconds),
                })

        # Guard against division by zero when nothing is registered.
        stale_ratio = len(stale) / len(components) if components else 0

        if stale_ratio < 0.05:
            status = HealthStatus.HEALTHY
        elif stale_ratio < 0.2:
            status = HealthStatus.DEGRADED
        else:
            status = HealthStatus.UNHEALTHY

        return {
            "status": status,
            "stale_count": len(stale),
            "total_count": len(components),
            "stale_ratio": round(stale_ratio, 3),
            "threshold_seconds": stale_threshold_seconds,
            "stale_components": stale[:10],  # Only first 10
        }

    except Exception as e:
        return {
            "status": HealthStatus.UNHEALTHY,
            "message": f"Failed to check stale components: {str(e)}",
        }

async def check_load_distribution(service: DiscoveryService) -> Dict[str, Any]:
    """Check load distribution across components.

    Uses each component's ``load_ratio``; a component counts as overloaded
    when its ratio is above 0.8.

    Overall status:
        * HEALTHY   - average load under 0.5 and under 10% overloaded
        * DEGRADED  - average load under 0.7 and under 20% overloaded,
          or no components to inspect
        * UNHEALTHY - anything else, or any exception while checking

    Args:
        service: Discovery service instance to query.

    Returns:
        A dict with ``status`` and the load figures, or ``status`` and
        ``message`` when there is nothing to inspect or the check failed.
    """
    try:
        components = await service.list_all()

        if not components:
            return {
                "status": HealthStatus.DEGRADED,
                "message": "No components to check load",
            }

        load_ratios = [comp.load_ratio for comp in components]
        avg_load = sum(load_ratios) / len(load_ratios)
        max_load = max(load_ratios)
        overloaded = sum(1 for r in load_ratios if r > 0.8)

        overloaded_ratio = overloaded / len(components)

        if avg_load < 0.5 and overloaded_ratio < 0.1:
            status = HealthStatus.HEALTHY
        elif avg_load < 0.7 and overloaded_ratio < 0.2:
            status = HealthStatus.DEGRADED
        else:
            status = HealthStatus.UNHEALTHY

        return {
            "status": status,
            "avg_load_ratio": round(avg_load, 3),
            "max_load_ratio": round(max_load, 3),
            "overloaded_count": overloaded,
            "overloaded_ratio": round(overloaded_ratio, 3),
        }

    except Exception as e:
        return {
            "status": HealthStatus.UNHEALTHY,
            "message": f"Failed to check load: {str(e)}",
        }

async def run_health_check(
    redis_url: Optional[str] = None,
    force_local: bool = False,
) -> Dict[str, Any]:
    """Run comprehensive health check.

    Builds a ``DiscoveryService`` and runs the backend, component, stale and
    load checks one after another. The overall status is the worst status
    reported by any individual check.

    Args:
        redis_url: Passed through to ``DiscoveryService`` as ``redis_url``.
        force_local: Passed through to ``DiscoveryService`` as ``force_local``.

    Returns:
        A dict with ``overall_status``, an ISO-8601 UTC ``timestamp``,
        ``duration_ms`` and the per-check results under ``checks``.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or go negative) if the wall clock is adjusted while checks run.
    start_time = time.perf_counter()

    service = DiscoveryService(redis_url=redis_url, force_local=force_local)

    checks = {
        "backend": await check_backend_connectivity(service),
        "components": await check_component_health(service),
        "stale": await check_stale_components(service),
        "load": await check_load_distribution(service),
    }

    # Determine overall status: worst individual result wins.
    statuses = [c["status"] for c in checks.values()]
    if HealthStatus.UNHEALTHY in statuses:
        overall = HealthStatus.UNHEALTHY
    elif HealthStatus.DEGRADED in statuses:
        overall = HealthStatus.DEGRADED
    else:
        overall = HealthStatus.HEALTHY

    return {
        "overall_status": overall,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "duration_ms": round((time.perf_counter() - start_time) * 1000, 2),
        "checks": checks,
    }

def print_health_report(result: Dict[str, Any], json_output: bool = False) -> None:
    """Print health report to console.

    Args:
        result: Report dict with ``overall_status``, ``timestamp``,
            ``duration_ms`` and ``checks`` keys, as built by
            ``run_health_check``.
        json_output: When true, dump ``result`` as indented JSON and print
            nothing else.
    """
    if json_output:
        print(json.dumps(result, indent=2))
        return

    status_icons = {
        HealthStatus.HEALTHY: "✅",
        HealthStatus.DEGRADED: "⚠️",
        HealthStatus.UNHEALTHY: "❌"
    }

    print()
    print("=" * 60)
    print("CODITECT Discovery Service Health Check")
    print("=" * 60)
    print()

    overall = result["overall_status"]
    icon = status_icons.get(overall, "?")
    print(f"Overall Status: {icon} {overall.upper()}")
    print(f"Timestamp: {result['timestamp']}")
    print(f"Duration: {result['duration_ms']}ms")
    print()

    for check_name, check_result in result["checks"].items():
        status = check_result.get("status", "unknown")
        icon = status_icons.get(status, "?")
        print(f" {icon} {check_name.upper()}")

        for key, value in check_result.items():
            if key == "status":
                # Already conveyed by the icon on the heading line.
                continue
            if isinstance(value, list) and len(value) > 3:
                # Summarize long lists instead of flooding the console.
                print(f" {key}: [{len(value)} items]")
            else:
                print(f" {key}: {value}")
        print()

async def watch_health(
    redis_url: Optional[str] = None,
    force_local: bool = False,
    interval: int = 30,
) -> None:
    """Continuously monitor health.

    Runs a full health check, prints the human-readable report and sleeps
    for ``interval`` seconds, forever. The loop only ends when the coroutine
    is cancelled or interrupted.

    Args:
        redis_url: Passed through to ``run_health_check``.
        force_local: Passed through to ``run_health_check``.
        interval: Seconds to sleep between consecutive checks.
    """
    print(f"Watching discovery service health (interval: {interval}s)")
    print("Press Ctrl+C to stop")
    print()

    while True:
        result = await run_health_check(redis_url=redis_url, force_local=force_local)
        print_health_report(result)
        print("-" * 60)
        await asyncio.sleep(interval)

def main() -> None:
    """Parse command-line arguments and run the health check.

    With ``--watch`` the check repeats until Ctrl+C, then exits with code 0.
    Otherwise a single check is run and reported, and the process exits with
    0 (healthy), 1 (degraded) or 2 (unhealthy).
    """
    parser = argparse.ArgumentParser(description="Discovery Service Health Check")
    parser.add_argument("--redis", help="Redis URL")
    parser.add_argument("--local", action="store_true", help="Force local backend")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--watch", action="store_true", help="Continuous monitoring")
    parser.add_argument("--interval", type=int, default=30, help="Watch interval in seconds")

    args = parser.parse_args()

    if args.watch:
        try:
            asyncio.run(watch_health(
                redis_url=args.redis,
                force_local=args.local,
                interval=args.interval
            ))
        except KeyboardInterrupt:
            print("\nStopped")
            sys.exit(0)
    else:
        result = asyncio.run(run_health_check(
            redis_url=args.redis,
            force_local=args.local
        ))
        print_health_report(result, json_output=args.json)

        # Exit code based on status
        if result["overall_status"] == HealthStatus.HEALTHY:
            sys.exit(0)
        elif result["overall_status"] == HealthStatus.DEGRADED:
            sys.exit(1)
        else:
            sys.exit(2)

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()