#!/usr/bin/env python3
"""
H.1.5: Component Discovery Service (Redis-Backed)

Enables agents and components to register themselves and be discovered by capability. Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications.
Features:
- Component/Agent registration with capabilities
- Capability-based discovery with load balancing
- Health monitoring with heartbeats
- TTL-based auto-cleanup for stale registrations
- Local fallback mode when Redis is unavailable

Usage:
    from scripts.core.discovery_service import DiscoveryService, Component, Capability

    # Initialize (auto-detects Redis or falls back to local)
    discovery = DiscoveryService()

    # Register a component
    component = Component(
        id="agent/orchestrator",
        name="orchestrator",
        component_type="agent",
        capabilities=[Capability(name="task_coordination", ...)],
        ...
    )
    await discovery.register(component)

    # Find components by capability (returns a DiscoveryResult)
    result = await discovery.find_by_capability("task_coordination")
    components = result.components
"""
import asyncio
import hashlib
import json
import logging
import os
import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import List, Dict, Optional, Any, Set
# Configure logging
# Module-level logging setup: INFO-level root configuration plus a logger
# named after this module (the dunder name, not a bare `name` variable).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ComponentStatus(Enum):
    """Component operational status.

    The string values are what gets written to storage (see
    ``Component.to_dict``) and parsed back with ``ComponentStatus(value)``.
    """

    AVAILABLE = "available"
    BUSY = "busy"
    OFFLINE = "offline"
    MAINTENANCE = "maintenance"
    DEGRADED = "degraded"
@dataclass
class Capability:
    """What a component can do."""

    name: str
    description: str = ""
    input_schema: Dict[str, Any] = field(default_factory=dict)
    output_schema: Dict[str, Any] = field(default_factory=dict)
    required_tools: List[str] = field(default_factory=list)
    cost_estimate: float = 0.0  # Estimated cost per invocation
    avg_duration_seconds: float = 0.0
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict copy of all fields (via ``dataclasses.asdict``)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Capability":
        """Build a Capability from a dictionary.

        Keys that are not Capability fields are ignored, so a record that
        carries extra keys still loads instead of raising ``TypeError``.

        Raises:
            TypeError: if the required ``name`` key is missing.
        """
        known = cls.__dataclass_fields__
        return cls(**{key: value for key, value in data.items() if key in known})
@dataclass
class Component:
    """Registered component (agent, skill, command, etc.)."""

    id: str  # Unique ID (e.g., "agent/orchestrator", "skill/moe-task-execution")
    name: str
    component_type: str  # "agent", "skill", "command", "script", "hook"
    capabilities: List[Capability] = field(default_factory=list)
    status: ComponentStatus = ComponentStatus.AVAILABLE
    current_load: int = 0
    max_concurrency: int = 10
    health_score: float = 1.0  # 0.0 to 1.0
    last_seen: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    registered_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: Dict[str, Any] = field(default_factory=dict)
    version: str = "1.0.0"
    path: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a JSON-serializable dictionary for storage.

        The status is stored as its string value and both timestamps as
        ISO-8601 strings.
        """
        return {
            "id": self.id,
            "name": self.name,
            "component_type": self.component_type,
            "capabilities": [c.to_dict() for c in self.capabilities],
            "status": self.status.value,
            "current_load": self.current_load,
            "max_concurrency": self.max_concurrency,
            "health_score": self.health_score,
            "last_seen": self.last_seen.isoformat(),
            "registered_at": self.registered_at.isoformat(),
            "metadata": self.metadata,
            "version": self.version,
            "path": self.path,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Component":
        """Create a Component from a dictionary as produced by ``to_dict``.

        ``id``, ``name`` and ``component_type`` are required (``KeyError`` if
        absent); every other key falls back to the field default, and missing
        timestamps default to the current UTC time.
        """
        return cls(
            id=data["id"],
            name=data["name"],
            component_type=data["component_type"],
            capabilities=[Capability.from_dict(c) for c in data.get("capabilities", [])],
            status=ComponentStatus(data.get("status", "available")),
            current_load=data.get("current_load", 0),
            max_concurrency=data.get("max_concurrency", 10),
            health_score=data.get("health_score", 1.0),
            last_seen=datetime.fromisoformat(data["last_seen"]) if "last_seen" in data else datetime.now(timezone.utc),
            registered_at=datetime.fromisoformat(data["registered_at"]) if "registered_at" in data else datetime.now(timezone.utc),
            metadata=data.get("metadata", {}),
            version=data.get("version", "1.0.0"),
            path=data.get("path", ""),
        )

    @property
    def load_ratio(self) -> float:
        """Current load as a ratio of max concurrency.

        A component with no capacity (``max_concurrency`` of zero or below)
        is reported as fully loaded (1.0). Treating negative values the same
        as zero avoids a negative ratio, which would otherwise pass load
        filters and sort ahead of genuinely idle components.
        """
        if self.max_concurrency <= 0:
            return 1.0
        return self.current_load / self.max_concurrency
@dataclass
class DiscoveryResult:
    """Result of a discovery query."""

    components: List[Component]  # Components that passed the filters
    query_capability: str  # Capability name that was searched for
    query_time_ms: float  # Elapsed query time in milliseconds
    total_matches: int  # Matches before filtering
    filtered_count: int  # Matches remaining after filtering
    source: str  # "redis" or "local"
class DiscoveryBackend(ABC):
    """Abstract backend for discovery service storage.

    Concrete backends implement every coroutine below; the class cannot be
    instantiated until all of them are overridden.
    """

    @abstractmethod
    async def register(self, component: Component) -> bool:
        """Register a component; return True on success."""

    @abstractmethod
    async def unregister(self, component_id: str) -> bool:
        """Unregister a component; return True if it was removed."""

    @abstractmethod
    async def get(self, component_id: str) -> Optional[Component]:
        """Get a component by ID, or None if it is not registered."""

    @abstractmethod
    async def find_by_capability(self, capability_name: str) -> List[Component]:
        """Find components by capability name."""

    @abstractmethod
    async def find_by_type(self, component_type: str) -> List[Component]:
        """Find components by type."""

    @abstractmethod
    async def list_all(self) -> List[Component]:
        """List all registered components."""

    @abstractmethod
    async def heartbeat(self, component_id: str, status: ComponentStatus, load: int) -> bool:
        """Update a component's heartbeat (status, load, last-seen time)."""

    @abstractmethod
    async def get_stats(self) -> Dict[str, Any]:
        """Get registry statistics."""
class LocalDiscoveryBackend(DiscoveryBackend):
    """In-memory backend for local development or when Redis is unavailable.

    Components are kept in a dict keyed by ID, with two secondary indexes
    (capability name -> IDs, component type -> IDs) for lookups.
    """

    def __init__(self):
        self._components: Dict[str, Component] = {}
        self._capability_index: Dict[str, Set[str]] = {}
        self._type_index: Dict[str, Set[str]] = {}

    @staticmethod
    def _discard_from_index(index: Dict[str, Set[str]], key: str, component_id: str) -> None:
        """Remove one ID from an index entry, dropping the entry once empty."""
        members = index.get(key)
        if members is None:
            return
        members.discard(component_id)
        if not members:
            # Empty sets would otherwise keep counting as live capabilities
            # in get_stats().
            del index[key]

    def _remove_from_indexes(self, component: Component) -> None:
        """Remove a component's ID from every index entry it appears in."""
        for cap in component.capabilities:
            self._discard_from_index(self._capability_index, cap.name, component.id)
        self._discard_from_index(self._type_index, component.component_type, component.id)

    async def register(self, component: Component) -> bool:
        """Register a component in local storage; always returns True.

        Registering an ID that already exists replaces the stored component.
        The previous object's index entries are removed first so capabilities
        or a type it no longer has do not keep matching lookups.
        """
        previous = self._components.get(component.id)
        if previous is not None:
            self._remove_from_indexes(previous)

        self._components[component.id] = component

        # Update capability index
        for cap in component.capabilities:
            self._capability_index.setdefault(cap.name, set()).add(component.id)

        # Update type index
        self._type_index.setdefault(component.component_type, set()).add(component.id)
        return True

    async def unregister(self, component_id: str) -> bool:
        """Unregister a component; return False if the ID is unknown."""
        component = self._components.get(component_id)
        if component is None:
            return False

        self._remove_from_indexes(component)
        del self._components[component_id]
        return True

    async def get(self, component_id: str) -> Optional[Component]:
        """Get a component by ID, or None if it is not registered."""
        return self._components.get(component_id)

    async def find_by_capability(self, capability_name: str) -> List[Component]:
        """Find components that list the given capability name."""
        component_ids = self._capability_index.get(capability_name, set())
        return [self._components[cid] for cid in component_ids if cid in self._components]

    async def find_by_type(self, component_type: str) -> List[Component]:
        """Find components of the given type."""
        component_ids = self._type_index.get(component_type, set())
        return [self._components[cid] for cid in component_ids if cid in self._components]

    async def list_all(self) -> List[Component]:
        """List all registered components."""
        return list(self._components.values())

    async def heartbeat(self, component_id: str, status: ComponentStatus, load: int) -> bool:
        """Update status, load and last-seen time; False if the ID is unknown."""
        component = self._components.get(component_id)
        if component is None:
            return False

        component.status = status
        component.current_load = load
        component.last_seen = datetime.now(timezone.utc)
        return True

    async def get_stats(self) -> Dict[str, Any]:
        """Return component counts (total, per type, per status) and capability count."""
        by_type: Dict[str, int] = {}
        by_status: Dict[str, int] = {}

        for comp in self._components.values():
            by_type[comp.component_type] = by_type.get(comp.component_type, 0) + 1
            by_status[comp.status.value] = by_status.get(comp.status.value, 0) + 1

        return {
            "total_components": len(self._components),
            "total_capabilities": len(self._capability_index),
            "by_type": by_type,
            "by_status": by_status,
            "backend": "local"
        }
class RedisDiscoveryBackend(DiscoveryBackend):
    """Redis-backed discovery service for production use.

    Each component is stored as a hash with a single ``data`` field holding
    its JSON form, under a key that expires after ``DEFAULT_TTL`` seconds
    unless refreshed. Capability and type indexes are Redis sets of
    component IDs.
    """

    # Key prefixes
    COMPONENT_PREFIX = "coditect:component:"
    CAPABILITY_PREFIX = "coditect:capability:"
    TYPE_PREFIX = "coditect:type:"
    STATS_KEY = "coditect:discovery:stats"

    # TTL for component registration (5 minutes without heartbeat = stale)
    DEFAULT_TTL = 300

    def __init__(self, redis_url: Optional[str] = None):
        """Store the connection URL; the connection itself is opened lazily.

        Args:
            redis_url: Redis URL. Falls back to the ``REDIS_URL`` environment
                variable, then to ``redis://localhost:6379``.
        """
        self.redis_url = redis_url or os.environ.get("REDIS_URL", "redis://localhost:6379")
        self._redis = None

    async def _get_redis(self):
        """Get or create the Redis connection.

        The client is cached only after a successful ``ping()``; caching it
        earlier would hand a never-verified client to every later call after
        one failed connection attempt.

        Raises:
            Exception: whatever the import, client construction or ping
                raised, after logging it.
        """
        if self._redis is None:
            try:
                import redis.asyncio as redis

                client = redis.from_url(self.redis_url, decode_responses=True)
                # Test connection
                await client.ping()
            except Exception as e:
                logger.error(f"Failed to connect to Redis: {e}")
                raise
            self._redis = client
        return self._redis

    async def _load_index_members(self, r, index_key: str) -> List[Component]:
        """Load every component referenced by an index set.

        IDs whose component key no longer exists (for example because its TTL
        expired) are removed from the index set so the sets do not accumulate
        dead entries. IDs whose key exists but could not be loaded are only
        skipped.
        """
        components = []
        for cid in await r.smembers(index_key):
            component = await self.get(cid)
            if component is not None:
                components.append(component)
            elif not await r.exists(f"{self.COMPONENT_PREFIX}{cid}"):
                await r.srem(index_key, cid)
        return components

    async def register(self, component: Component) -> bool:
        """Register a component in Redis; return False (and log) on any error.

        When the ID is already registered, index entries for capabilities or
        a type the new registration no longer has are removed.
        """
        try:
            r = await self._get_redis()
            component_key = f"{self.COMPONENT_PREFIX}{component.id}"

            # Drop index entries left over from a previous registration.
            previous = await self.get(component.id)
            if previous is not None:
                current_caps = {cap.name for cap in component.capabilities}
                for cap in previous.capabilities:
                    if cap.name not in current_caps:
                        await r.srem(f"{self.CAPABILITY_PREFIX}{cap.name}", component.id)
                if previous.component_type != component.component_type:
                    await r.srem(f"{self.TYPE_PREFIX}{previous.component_type}", component.id)

            # Store component data
            await r.hset(component_key, mapping={
                "data": json.dumps(component.to_dict())
            })

            # Set TTL
            await r.expire(component_key, self.DEFAULT_TTL)

            # Add to capability indexes
            for cap in component.capabilities:
                await r.sadd(f"{self.CAPABILITY_PREFIX}{cap.name}", component.id)

            # Add to type index
            await r.sadd(f"{self.TYPE_PREFIX}{component.component_type}", component.id)

            logger.debug(f"Registered component: {component.id}")
            return True
        except Exception as e:
            logger.error(f"Failed to register component {component.id}: {e}")
            return False

    async def unregister(self, component_id: str) -> bool:
        """Unregister a component; False if it is not found or on any error."""
        try:
            r = await self._get_redis()

            # Get component data first: it tells us which index sets to clean.
            component = await self.get(component_id)
            if component is None:
                return False

            # Remove from capability indexes
            for cap in component.capabilities:
                await r.srem(f"{self.CAPABILITY_PREFIX}{cap.name}", component_id)

            # Remove from type index
            await r.srem(f"{self.TYPE_PREFIX}{component.component_type}", component_id)

            # Delete component key
            await r.delete(f"{self.COMPONENT_PREFIX}{component_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to unregister component {component_id}: {e}")
            return False

    async def get(self, component_id: str) -> Optional[Component]:
        """Get a component by ID; None if absent or if loading fails (logged)."""
        try:
            r = await self._get_redis()
            data = await r.hget(f"{self.COMPONENT_PREFIX}{component_id}", "data")
            if not data:
                return None
            return Component.from_dict(json.loads(data))
        except Exception as e:
            logger.error(f"Failed to get component {component_id}: {e}")
            return None

    async def find_by_capability(self, capability_name: str) -> List[Component]:
        """Find components by capability; returns [] (and logs) on any error."""
        try:
            r = await self._get_redis()
            return await self._load_index_members(
                r, f"{self.CAPABILITY_PREFIX}{capability_name}"
            )
        except Exception as e:
            logger.error(f"Failed to find by capability {capability_name}: {e}")
            return []

    async def find_by_type(self, component_type: str) -> List[Component]:
        """Find components by type; returns [] (and logs) on any error."""
        try:
            r = await self._get_redis()
            return await self._load_index_members(
                r, f"{self.TYPE_PREFIX}{component_type}"
            )
        except Exception as e:
            logger.error(f"Failed to find by type {component_type}: {e}")
            return []

    async def list_all(self) -> List[Component]:
        """List all registered components; returns [] (and logs) on any error."""
        try:
            r = await self._get_redis()

            # Get all component keys
            keys = []
            async for key in r.scan_iter(f"{self.COMPONENT_PREFIX}*"):
                keys.append(key)

            prefix_len = len(self.COMPONENT_PREFIX)
            components = []
            for key in keys:
                # Strip only the leading prefix; str.replace would also remove
                # the same text if it occurred inside the component ID.
                component = await self.get(key[prefix_len:])
                if component is not None:
                    components.append(component)

            return components
        except Exception as e:
            logger.error(f"Failed to list all components: {e}")
            return []

    async def heartbeat(self, component_id: str, status: ComponentStatus, load: int) -> bool:
        """Update status, load and last-seen time, and refresh the key's TTL.

        Returns False if the component is not found or on any error.
        """
        try:
            r = await self._get_redis()
            component = await self.get(component_id)
            if component is None:
                return False

            # Update fields
            component.status = status
            component.current_load = load
            component.last_seen = datetime.now(timezone.utc)

            # Save and refresh TTL
            component_key = f"{self.COMPONENT_PREFIX}{component_id}"
            await r.hset(component_key, mapping={
                "data": json.dumps(component.to_dict())
            })
            await r.expire(component_key, self.DEFAULT_TTL)
            return True
        except Exception as e:
            logger.error(f"Failed to heartbeat component {component_id}: {e}")
            return False

    async def get_stats(self) -> Dict[str, Any]:
        """Get registry statistics.

        On failure returns ``{"error": ..., "backend": "redis"}`` instead of
        raising.
        """
        try:
            r = await self._get_redis()
            components = await self.list_all()

            by_type: Dict[str, int] = {}
            by_status: Dict[str, int] = {}

            for comp in components:
                by_type[comp.component_type] = by_type.get(comp.component_type, 0) + 1
                by_status[comp.status.value] = by_status.get(comp.status.value, 0) + 1

            # Count capabilities
            cap_count = 0
            async for _ in r.scan_iter(f"{self.CAPABILITY_PREFIX}*"):
                cap_count += 1

            return {
                "total_components": len(components),
                "total_capabilities": cap_count,
                "by_type": by_type,
                "by_status": by_status,
                "backend": "redis",
                # Report only the part after "@" so credentials are not exposed.
                "redis_url": self.redis_url.split("@")[-1] if "@" in self.redis_url else self.redis_url
            }
        except Exception as e:
            logger.error(f"Failed to get stats: {e}")
            return {"error": str(e), "backend": "redis"}
class DiscoveryService:
    """
    Main discovery service with automatic backend selection.

    Tries Redis first, falls back to local in-memory storage.
    """

    def __init__(self, redis_url: Optional[str] = None, force_local: bool = False):
        """
        Initialize discovery service.

        The backend is chosen lazily on the first operation.

        Args:
            redis_url: Redis connection URL (defaults to REDIS_URL env var)
            force_local: Force local backend even if Redis is available
        """
        self.redis_url = redis_url
        self.force_local = force_local
        self._backend: Optional[DiscoveryBackend] = None
        self._initialized = False

    async def _ensure_initialized(self):
        """Select the backend on first use; later calls return immediately."""
        if self._initialized:
            return

        if self.force_local:
            self._backend = LocalDiscoveryBackend()
            logger.info("Using local discovery backend (forced)")
        else:
            # Try Redis first
            try:
                redis_backend = RedisDiscoveryBackend(self.redis_url)
                await redis_backend._get_redis()  # Test connection
                self._backend = redis_backend
                # Log the URL the backend actually resolved (it may come from
                # the REDIS_URL env var), keeping only the part after "@" so
                # credentials never reach the logs.
                safe_url = redis_backend.redis_url.split("@")[-1]
                logger.info(f"Using Redis discovery backend: {safe_url}")
            except Exception as e:
                logger.warning(f"Redis unavailable ({e}), falling back to local backend")
                self._backend = LocalDiscoveryBackend()

        self._initialized = True

    async def register(self, component: Component) -> bool:
        """
        Register a component.

        Args:
            component: Component to register

        Returns:
            True if successful
        """
        await self._ensure_initialized()
        return await self._backend.register(component)

    async def unregister(self, component_id: str) -> bool:
        """
        Unregister a component.

        Args:
            component_id: ID of component to unregister

        Returns:
            True if successful
        """
        await self._ensure_initialized()
        return await self._backend.unregister(component_id)

    async def get(self, component_id: str) -> Optional[Component]:
        """
        Get component by ID.

        Args:
            component_id: Component ID (e.g., "agent/orchestrator")

        Returns:
            Component if found, None otherwise
        """
        await self._ensure_initialized()
        return await self._backend.get(component_id)

    async def find_by_capability(
        self,
        capability_name: str,
        min_health_score: float = 0.7,
        max_load_ratio: float = 0.8,
        status_filter: Optional[List[ComponentStatus]] = None
    ) -> DiscoveryResult:
        """
        Find components by capability with filtering and load balancing.

        Args:
            capability_name: Name of capability to search for
            min_health_score: Minimum health score (0.0-1.0)
            max_load_ratio: Maximum load ratio (0.0-1.0)
            status_filter: Only include these statuses (default: AVAILABLE)

        Returns:
            DiscoveryResult with matching components sorted by load (least loaded first)
        """
        import time

        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted.
        start = time.perf_counter()

        await self._ensure_initialized()

        if status_filter is None:
            status_filter = [ComponentStatus.AVAILABLE]

        # Get all components with capability
        all_components = await self._backend.find_by_capability(capability_name)

        # Filter by criteria
        filtered = []
        for comp in all_components:
            if comp.status not in status_filter:
                continue
            if comp.health_score < min_health_score:
                continue
            if comp.load_ratio > max_load_ratio:
                continue
            filtered.append(comp)

        # Sort by load (least loaded first)
        filtered.sort(key=lambda c: c.load_ratio)

        query_time = (time.perf_counter() - start) * 1000

        return DiscoveryResult(
            components=filtered,
            query_capability=capability_name,
            query_time_ms=query_time,
            total_matches=len(all_components),
            filtered_count=len(filtered),
            source="redis" if isinstance(self._backend, RedisDiscoveryBackend) else "local"
        )

    async def find_by_type(self, component_type: str) -> List[Component]:
        """
        Find components by type.

        Args:
            component_type: Type of component ("agent", "skill", etc.)

        Returns:
            List of matching components
        """
        await self._ensure_initialized()
        return await self._backend.find_by_type(component_type)

    async def list_all(self) -> List[Component]:
        """
        List all registered components.

        Returns:
            List of all components
        """
        await self._ensure_initialized()
        return await self._backend.list_all()

    async def heartbeat(self, component_id: str, status: ComponentStatus = ComponentStatus.AVAILABLE, load: int = 0) -> bool:
        """
        Send heartbeat to keep registration alive.

        Args:
            component_id: Component ID
            status: Current status
            load: Current load (number of active tasks)

        Returns:
            True if successful
        """
        await self._ensure_initialized()
        return await self._backend.heartbeat(component_id, status, load)

    async def get_stats(self) -> Dict[str, Any]:
        """
        Get registry statistics.

        Returns:
            Dictionary with stats
        """
        await self._ensure_initialized()
        return await self._backend.get_stats()

    async def register_from_activation_status(self, activation_file: Path) -> int:
        """
        Bulk register components from component-activation-status.json.

        Only entries whose "activated" flag is true are registered. Each is
        registered with an empty capability list.

        Args:
            activation_file: Path to activation status file

        Returns:
            Number of components registered
        """
        await self._ensure_initialized()

        # Read as UTF-8 explicitly so parsing does not depend on the
        # platform's default encoding.
        with open(activation_file, encoding="utf-8") as f:
            data = json.load(f)

        count = 0
        for comp_data in data.get("components", []):
            if not comp_data.get("activated", False):
                continue

            component = Component(
                id=f"{comp_data['type']}/{comp_data['name']}",
                name=comp_data["name"],
                component_type=comp_data["type"],
                capabilities=[],  # Will be populated by capability-based discovery (H.1.6)
                status=ComponentStatus.AVAILABLE,
                version=comp_data.get("version", "1.0.0"),
                path=comp_data.get("path", ""),
                metadata={
                    "activated_at": comp_data.get("activated_at"),
                    "reason": comp_data.get("reason"),
                }
            )

            if await self.register(component):
                count += 1

        logger.info(f"Registered {count} components from activation status")
        return count
# CLI interface
async def main():
    """CLI for discovery service.

    Subcommands: ``stats``, ``list [--type T]``, ``find CAPABILITY`` and
    ``register [--file F]``. With no subcommand the help text is printed.
    """
    import argparse

    parser = argparse.ArgumentParser(description="CODITECT Component Discovery Service")
    subparsers = parser.add_subparsers(dest="command")

    # Stats command
    subparsers.add_parser("stats", help="Show registry statistics")

    # List command
    list_parser = subparsers.add_parser("list", help="List registered components")
    list_parser.add_argument("--type", help="Filter by component type")

    # Find command
    find_parser = subparsers.add_parser("find", help="Find components by capability")
    find_parser.add_argument("capability", help="Capability name to search for")

    # Register command
    register_parser = subparsers.add_parser("register", help="Register components from activation status")
    register_parser.add_argument("--file", default="config/component-activation-status.json", help="Activation status file")

    # Common args
    parser.add_argument("--redis", help="Redis URL")
    parser.add_argument("--local", action="store_true", help="Force local backend")

    args = parser.parse_args()

    discovery = DiscoveryService(redis_url=args.redis, force_local=args.local)

    if args.command == "stats":
        stats = await discovery.get_stats()
        print(json.dumps(stats, indent=2))

    elif args.command == "list":
        if args.type:
            components = await discovery.find_by_type(args.type)
        else:
            components = await discovery.list_all()

        print(f"\nRegistered Components: {len(components)}\n")
        for comp in sorted(components, key=lambda c: c.id):
            # Icons are written as escapes (U+2705 check mark, U+23F8 U+FE0F
            # pause) so they survive any re-encoding of this source file.
            status_icon = "\u2705" if comp.status == ComponentStatus.AVAILABLE else "\u23f8\ufe0f"
            print(f"  {status_icon} {comp.id} (health: {comp.health_score:.2f}, load: {comp.load_ratio:.0%})")

    elif args.command == "find":
        result = await discovery.find_by_capability(args.capability)
        print(f"\nFound {result.filtered_count} components with capability '{args.capability}'")
        print(f"(Query time: {result.query_time_ms:.2f}ms, Backend: {result.source})\n")
        for comp in result.components:
            print(f"  - {comp.id} (load: {comp.load_ratio:.0%})")

    elif args.command == "register":
        file_path = Path(args.file)
        if not file_path.is_absolute():
            # Relative paths are resolved against the directory three levels
            # above this file.
            file_path = Path(__file__).parent.parent.parent / args.file

        count = await discovery.register_from_activation_status(file_path)
        print(f"\n\u2705 Registered {count} components")

        stats = await discovery.get_stats()
        print("\nRegistry stats:")
        print(json.dumps(stats, indent=2))

    else:
        parser.print_help()
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())