#!/usr/bin/env python3
"""
CODITECT Health Monitoring ↔ Context-Watcher Bridge (H.8.3.6)

Bridges the async HealthMonitoringService with the synchronous
context-watcher daemon via the SessionMessageBus.

Integration surfaces:
- Reads context-watcher state file → creates health heartbeats
- Registers context-watcher as a virtual agent in health monitoring
- Publishes health transitions to operator_alert channel
- Subscribes to health events for watcher-side actions

State file: ~/PROJECTS/.coditect-data/context-storage/auto-export-state.json

Author: CODITECT Framework
Task: H.8.3.6
Version: 1.0.0
Created: February 2026
"""
import asyncio import json import logging from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable, Dict, List, Optional
from .health_monitoring import ( AgentHealth, HealthMonitoringService, HealthState, HeartbeatPayload, )
# Module-level logger, named after this module so it follows the package's
# logging hierarchy.
logger = logging.getLogger(__name__)

# Virtual agent ID for the context-watcher daemon
WATCHER_AGENT_ID = "context-watcher-daemon"
WATCHER_TASK_ID = "J.10"  # Context auto-export task
@dataclass
class WatcherHealthState:
    """Point-in-time view of the context-watcher, built from its state file.

    Every field carries a default, so a bare instance describes a watcher
    that is not active and has reported nothing.
    """

    # Value of "watcher_active" in the state file.
    active: bool = False
    # Value of "last_context_percent"; compared against the bridge thresholds.
    context_percent: float = 0.0
    # Value of "last_tokens"; forwarded as the heartbeat token count.
    tokens: int = 0
    # Value of "last_check"; parsed as an ISO-8601 timestamp for staleness.
    last_check: str = ""
    # Value of "last_export".
    last_export: str = ""
    # Value of "session_file".
    session_file: str = ""
    # Number of entries under "pending_exports".
    pending_exports: int = 0
    # Set to 1 when the state file could not be read or parsed.
    error_count: int = 0
    # A HealthState value string; starts out as HEALTHY.
    health_state: str = HealthState.HEALTHY.value
@dataclass
class BridgeConfig:
    """Tunable settings for the health-watcher bridge."""

    # Location of the watcher's JSON state file. When left as None the bridge
    # fills in ~/PROJECTS/.coditect-data/context-storage/auto-export-state.json.
    state_file: Optional[Path] = None
    # Context usage (percent) at or above which the watcher counts as FAILING.
    context_critical_threshold: float = 90.0
    # Context usage (percent) at or above which the watcher counts as DEGRADED.
    context_warning_threshold: float = 80.0
    # Seconds since the watcher's last check beyond which it counts as STUCK.
    stale_check_seconds: float = 120.0
    # Whether the first sync registers the watcher with the health service.
    auto_register: bool = True
class HealthWatcherBridge:
    """Bridge between HealthMonitoringService and context-watcher daemon.

    Reads the watcher's state file, maps it to health heartbeats that are
    recorded on the health service, and keeps an in-memory log of watcher
    state transitions and health-service state-change callbacks.
    """

    def __init__(
        self,
        health_service: HealthMonitoringService,
        config: Optional[BridgeConfig] = None,
        message_bus=None,
    ):
        """Wire the bridge to a health service.

        Args:
            health_service: Service that receives the agent registration and
                the heartbeats produced by this bridge.
            config: Bridge settings; a default ``BridgeConfig`` is used when
                omitted.
            message_bus: Optional bus object. When given, it is handed to
                ``health_service.set_message_bus``; the bridge itself never
                calls it.
        """
        self._health_service = health_service
        self._config = config or BridgeConfig()
        self._message_bus = message_bus
        self._registered = False
        self._last_watcher_state: Optional[WatcherHealthState] = None
        self._event_log: List[Dict[str, Any]] = []

        # Resolve state file path.
        # NOTE: this writes the resolved path back onto the config object the
        # caller passed in (kept for backward compatibility).
        if self._config.state_file is None:
            coditect_data = Path.home() / "PROJECTS" / ".coditect-data"
            self._config.state_file = (
                coditect_data / "context-storage" / "auto-export-state.json"
            )

        # Connect message bus to health service if provided
        if self._message_bus is not None:
            self._health_service.set_message_bus(self._message_bus)

        # Register state change callback
        self._health_service.on_state_change(self._on_health_state_change)

    @staticmethod
    def _number_or_default(value: Any, default: Any) -> Any:
        """Return *value* if it is an int/float, otherwise *default*."""
        return value if isinstance(value, (int, float)) else default

    def read_watcher_state(self) -> WatcherHealthState:
        """Read the context-watcher's state file and map to WatcherHealthState.

        Returns:
            A default ``WatcherHealthState`` when no state file is configured
            or the file does not exist. When the file cannot be read, decoded,
            or parsed into a JSON object, a default state with
            ``error_count = 1``. Otherwise the state populated from the file,
            with ``health_state`` derived from its metrics.
        """
        state = WatcherHealthState()
        state_file = self._config.state_file
        if state_file is None:
            return state

        try:
            raw = json.loads(state_file.read_text(encoding="utf-8"))
        except (FileNotFoundError, NotADirectoryError):
            # A missing file is not an error. Reading directly instead of
            # testing exists() first avoids reporting a spurious failure if
            # the file disappears between the check and the read.
            return state
        except (ValueError, OSError) as e:
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError (undecodable bytes in the file).
            logger.warning("Failed to read watcher state: %s", e)
            state.error_count = 1
            return state

        if not isinstance(raw, dict):
            # Valid JSON whose root is not an object (e.g. a list or null)
            # carries none of the expected keys; treat it as unreadable.
            logger.warning(
                "Failed to read watcher state: %s",
                f"expected a JSON object, got {type(raw).__name__}",
            )
            state.error_count = 1
            return state

        state.active = raw.get("watcher_active", False)
        # Null or non-numeric metrics fall back to the defaults so the
        # threshold comparisons and heartbeat formatting cannot raise.
        state.context_percent = self._number_or_default(
            raw.get("last_context_percent", 0.0), 0.0
        )
        state.tokens = self._number_or_default(raw.get("last_tokens", 0), 0)
        state.last_check = raw.get("last_check", "")
        state.last_export = raw.get("last_export", "")
        state.session_file = raw.get("session_file", "")

        pending = raw.get("pending_exports", [])
        try:
            state.pending_exports = len(pending)
        except TypeError:
            # null / number instead of a list: nothing countable is pending.
            state.pending_exports = 0

        # Derive health state from watcher metrics
        state.health_state = self._derive_health_state(state)
        return state

    def _derive_health_state(self, ws: WatcherHealthState) -> str:
        """Map watcher metrics to a HealthState value.

        Checks run in this order and the first match wins: inactive →
        HEALTHY, errors → FAILING, context at/above the critical threshold →
        FAILING, context at/above the warning threshold → DEGRADED, last
        check older than ``stale_check_seconds`` → STUCK, otherwise HEALTHY.
        """
        if not ws.active:
            return HealthState.HEALTHY.value  # Inactive is not unhealthy

        if ws.error_count > 0:
            return HealthState.FAILING.value

        if ws.context_percent >= self._config.context_critical_threshold:
            return HealthState.FAILING.value

        if ws.context_percent >= self._config.context_warning_threshold:
            return HealthState.DEGRADED.value

        # Check staleness
        if ws.last_check:
            try:
                # "Z" is rewritten to an explicit offset because
                # datetime.fromisoformat only accepts it from Python 3.11.
                last = datetime.fromisoformat(ws.last_check.replace("Z", "+00:00"))
                elapsed = (datetime.now(timezone.utc) - last).total_seconds()
                if elapsed > self._config.stale_check_seconds:
                    return HealthState.STUCK.value
            except (ValueError, TypeError, AttributeError):
                # ValueError: not an ISO timestamp. TypeError: timestamp has
                # no UTC offset and cannot be compared with an aware "now".
                # AttributeError: last_check is not a string. In all three
                # cases staleness cannot be judged, so it is skipped.
                pass

        return HealthState.HEALTHY.value

    def create_heartbeat(self, ws: Optional[WatcherHealthState] = None) -> HeartbeatPayload:
        """Create a HeartbeatPayload from watcher state.

        Args:
            ws: State to report; when omitted the state file is read afresh.

        Returns:
            A heartbeat for the watcher's virtual agent stamped with the
            current UTC time. The progress indicator is empty while the
            watcher is inactive.
        """
        if ws is None:
            ws = self.read_watcher_state()

        return HeartbeatPayload(
            agent_id=WATCHER_AGENT_ID,
            task_id=WATCHER_TASK_ID,
            timestamp=datetime.now(timezone.utc).isoformat(),
            token_count=ws.tokens,
            error_count=ws.error_count,
            progress_indicator=f"context:{ws.context_percent:.1f}%" if ws.active else "",
        )

    async def sync(self) -> WatcherHealthState:
        """Read watcher state, register if needed, and send heartbeat.

        This is the main bridge operation — call periodically or on demand.

        Returns:
            The watcher state that was just read and reported.
        """
        ws = self.read_watcher_state()

        # Auto-register on first sync
        if not self._registered and self._config.auto_register:
            try:
                await self._health_service.register_agent(
                    WATCHER_AGENT_ID, WATCHER_TASK_ID
                )
            except Exception as exc:
                # Best effort: a failure is treated as "already registered"
                # and is not retried, but it is logged rather than hidden so
                # a genuine registration problem stays visible.
                logger.warning(
                    "Registering %s failed; assuming it is already registered: %s",
                    WATCHER_AGENT_ID,
                    exc,
                )
            self._registered = True

        # Send heartbeat
        heartbeat = self.create_heartbeat(ws)
        await self._health_service.record_heartbeat(heartbeat)

        # Track state for delta detection
        old_state = self._last_watcher_state
        self._last_watcher_state = ws

        # Log transitions (nothing to compare against on the first sync)
        if old_state is not None and old_state.health_state != ws.health_state:
            self._log_event("watcher_state_changed", {
                "old_state": old_state.health_state,
                "new_state": ws.health_state,
                "context_percent": ws.context_percent,
                "tokens": ws.tokens,
            })

        return ws

    def _on_health_state_change(
        self, agent_id: str, old_state: str, new_state: str, health: AgentHealth
    ) -> None:
        """Callback when any agent's health state changes.

        Records the transition in the event log; ``health`` is not used.
        """
        self._log_event("health_callback", {
            "agent_id": agent_id,
            "old_state": old_state,
            "new_state": new_state,
        })

    def _log_event(self, event_type: str, details: Dict[str, Any]) -> None:
        """Record an event in the bridge's event log and emit it at INFO.

        Keys in ``details`` are merged last, so they override ``type`` and
        ``timestamp`` if they collide.
        """
        entry = {
            "type": event_type,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **details,
        }
        self._event_log.append(entry)
        logger.info("Bridge event: %s - %s", event_type, details)

    def get_event_log(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the bridge's event log."""
        return list(self._event_log)

    def get_status(self) -> Dict[str, Any]:
        """Return current bridge status summary.

        Watcher fields come from the state seen by the most recent ``sync``;
        before the first sync they are the ``WatcherHealthState`` defaults.
        """
        ws = self._last_watcher_state or WatcherHealthState()
        return {
            "registered": self._registered,
            "watcher_active": ws.active,
            "watcher_health": ws.health_state,
            "context_percent": ws.context_percent,
            "tokens": ws.tokens,
            "pending_exports": ws.pending_exports,
            "events_logged": len(self._event_log),
            "message_bus_connected": self._message_bus is not None,
        }
# Public API of this module (controls `from ... import *`).
__all__ = [
    "WatcherHealthState",
    "BridgeConfig",
    "HealthWatcherBridge",
    "WATCHER_AGENT_ID",
    "WATCHER_TASK_ID",
]