#!/usr/bin/env python3
"""
H.1.5: Tests for Component Discovery Service

Tests cover:
- Component registration and unregistration
- Capability-based discovery
- Type-based discovery
- Health filtering and load balancing
- Heartbeat mechanism
- Local backend fallback
- Bulk registration from activation status
"""

import asyncio
import json
import sys
import unittest
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

# Add parent to path for imports: the directory three levels above this file
# (presumably the repository root -- confirm) must be on sys.path so that the
# ``scripts.core.discovery_service`` import below resolves.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from scripts.core.discovery_service import (
    Component,
    Capability,
    ComponentStatus,
    DiscoveryService,
    LocalDiscoveryBackend,
    DiscoveryResult,
)

class TestCapability(unittest.TestCase):
    """Tests for Capability dataclass"""

    def test_capability_creation(self):
        """Test creating a capability"""
        cap = Capability(
            name="code_review",
            description="Review code for bugs",
            input_schema={"type": "object"},
            output_schema={"type": "object"},
            required_tools=["read", "grep"],
            cost_estimate=0.05,
            avg_duration_seconds=30.0,
            tags=["quality", "security"],
        )

        # Constructor arguments must be readable back unchanged.
        self.assertEqual(cap.name, "code_review")
        self.assertEqual(cap.description, "Review code for bugs")
        self.assertEqual(cap.required_tools, ["read", "grep"])
        self.assertEqual(cap.cost_estimate, 0.05)

    def test_capability_to_dict(self):
        """Test capability serialization"""
        cap = Capability(name="test", description="Test capability")
        d = cap.to_dict()

        self.assertIn("name", d)
        self.assertIn("description", d)
        self.assertEqual(d["name"], "test")

    def test_capability_from_dict(self):
        """Test capability deserialization"""
        data = {
            "name": "test",
            "description": "Test capability",
            "input_schema": {},
            "output_schema": {},
            "required_tools": ["read"],
            "cost_estimate": 0.1,
            "avg_duration_seconds": 10.0,
            "tags": ["test"],
        }
        cap = Capability.from_dict(data)

        self.assertEqual(cap.name, "test")
        self.assertEqual(cap.required_tools, ["read"])

class TestComponent(unittest.TestCase):
    """Tests for Component dataclass"""

    def test_component_creation(self):
        """Test creating a component"""
        comp = Component(
            id="agent/orchestrator",
            name="orchestrator",
            component_type="agent",
            capabilities=[Capability(name="task_coordination")],
            status=ComponentStatus.AVAILABLE,
            max_concurrency=5,
            health_score=0.95,
            path="agents/orchestrator.md",
        )

        self.assertEqual(comp.id, "agent/orchestrator")
        self.assertEqual(comp.name, "orchestrator")
        self.assertEqual(comp.component_type, "agent")
        self.assertEqual(len(comp.capabilities), 1)
        self.assertEqual(comp.status, ComponentStatus.AVAILABLE)

    def test_component_load_ratio(self):
        """Test load ratio calculation"""
        comp = Component(
            id="test",
            name="test",
            component_type="agent",
            current_load=3,
            max_concurrency=10,
        )

        # Compare with a tolerance: the ratio is a float and the test should
        # not depend on how the implementation happens to round 3/10.
        self.assertAlmostEqual(comp.load_ratio, 0.3)

    def test_component_load_ratio_zero_max(self):
        """Test load ratio with zero max concurrency"""
        comp = Component(
            id="test",
            name="test",
            component_type="agent",
            current_load=5,
            max_concurrency=0,
        )

        # With max_concurrency == 0 the ratio is pinned at 1.0.
        self.assertEqual(comp.load_ratio, 1.0)

    def test_component_to_dict(self):
        """Test component serialization"""
        comp = Component(
            id="agent/test",
            name="test",
            component_type="agent",
        )
        d = comp.to_dict()

        self.assertIn("id", d)
        self.assertIn("name", d)
        self.assertIn("component_type", d)
        self.assertIn("status", d)
        # No status was passed, so the default is expected to serialize as
        # the string "available".
        self.assertEqual(d["status"], "available")

    def test_component_from_dict(self):
        """Test component deserialization"""
        data = {
            "id": "agent/test",
            "name": "test",
            "component_type": "agent",
            "capabilities": [{"name": "cap1", "description": ""}],
            "status": "busy",
            "current_load": 2,
            "max_concurrency": 5,
            "health_score": 0.8,
            "last_seen": "2025-01-01T00:00:00+00:00",
            "registered_at": "2025-01-01T00:00:00+00:00",
            "metadata": {"key": "value"},
            "version": "2.0.0",
            "path": "agents/test.md",
        }
        comp = Component.from_dict(data)

        self.assertEqual(comp.id, "agent/test")
        # The "busy" string must come back as the enum member.
        self.assertEqual(comp.status, ComponentStatus.BUSY)
        self.assertEqual(comp.current_load, 2)
        self.assertEqual(len(comp.capabilities), 1)

class TestLocalDiscoveryBackend(unittest.TestCase):
    """Tests for LocalDiscoveryBackend

    The backend API is awaited, so each test wraps its body in a nested
    coroutine and drives it with ``asyncio.run``.
    """

    def setUp(self):
        """Set up test fixtures"""
        # A fresh backend per test keeps registrations from leaking between tests.
        self.backend = LocalDiscoveryBackend()

    def test_register_component(self):
        """Test registering a component"""
        async def run_test():
            comp = Component(
                id="agent/test",
                name="test",
                component_type="agent",
                capabilities=[Capability(name="cap1")],
            )

            result = await self.backend.register(comp)
            self.assertTrue(result)

            # Verify it's stored
            stored = await self.backend.get("agent/test")
            self.assertIsNotNone(stored)
            self.assertEqual(stored.name, "test")

        asyncio.run(run_test())

    def test_unregister_component(self):
        """Test unregistering a component"""
        async def run_test():
            comp = Component(
                id="agent/test",
                name="test",
                component_type="agent",
                capabilities=[Capability(name="cap1")],
            )

            await self.backend.register(comp)
            result = await self.backend.unregister("agent/test")
            self.assertTrue(result)

            # Verify it's removed
            stored = await self.backend.get("agent/test")
            self.assertIsNone(stored)

        asyncio.run(run_test())

    def test_unregister_nonexistent(self):
        """Test unregistering non-existent component"""
        async def run_test():
            result = await self.backend.unregister("nonexistent")
            self.assertFalse(result)

        asyncio.run(run_test())

    def test_find_by_capability(self):
        """Test finding components by capability"""
        async def run_test():
            # Register components with different capabilities
            comp1 = Component(
                id="agent/a",
                name="a",
                component_type="agent",
                capabilities=[Capability(name="code_review")],
            )
            comp2 = Component(
                id="agent/b",
                name="b",
                component_type="agent",
                capabilities=[Capability(name="code_review"), Capability(name="testing")],
            )
            comp3 = Component(
                id="agent/c",
                name="c",
                component_type="agent",
                capabilities=[Capability(name="documentation")],
            )

            await self.backend.register(comp1)
            await self.backend.register(comp2)
            await self.backend.register(comp3)

            # Find by capability
            results = await self.backend.find_by_capability("code_review")
            self.assertEqual(len(results), 2)

            results = await self.backend.find_by_capability("testing")
            self.assertEqual(len(results), 1)
            self.assertEqual(results[0].id, "agent/b")

            results = await self.backend.find_by_capability("nonexistent")
            self.assertEqual(len(results), 0)

        asyncio.run(run_test())

    def test_find_by_type(self):
        """Test finding components by type"""
        async def run_test():
            comp1 = Component(id="agent/a", name="a", component_type="agent")
            comp2 = Component(id="skill/b", name="b", component_type="skill")
            comp3 = Component(id="agent/c", name="c", component_type="agent")

            await self.backend.register(comp1)
            await self.backend.register(comp2)
            await self.backend.register(comp3)

            agents = await self.backend.find_by_type("agent")
            self.assertEqual(len(agents), 2)

            skills = await self.backend.find_by_type("skill")
            self.assertEqual(len(skills), 1)

        asyncio.run(run_test())

    def test_list_all(self):
        """Test listing all components"""
        async def run_test():
            comp1 = Component(id="agent/a", name="a", component_type="agent")
            comp2 = Component(id="skill/b", name="b", component_type="skill")

            await self.backend.register(comp1)
            await self.backend.register(comp2)

            all_components = await self.backend.list_all()
            self.assertEqual(len(all_components), 2)

        asyncio.run(run_test())

    def test_heartbeat(self):
        """Test heartbeat updates"""
        async def run_test():
            comp = Component(
                id="agent/test",
                name="test",
                component_type="agent",
                status=ComponentStatus.AVAILABLE,
                current_load=0,
            )

            await self.backend.register(comp)

            # Send heartbeat with updated status
            result = await self.backend.heartbeat(
                "agent/test",
                ComponentStatus.BUSY,
                load=3,
            )
            self.assertTrue(result)

            # Verify updates
            stored = await self.backend.get("agent/test")
            self.assertEqual(stored.status, ComponentStatus.BUSY)
            self.assertEqual(stored.current_load, 3)

        asyncio.run(run_test())

    def test_heartbeat_nonexistent(self):
        """Test heartbeat for non-existent component"""
        async def run_test():
            result = await self.backend.heartbeat(
                "nonexistent",
                ComponentStatus.AVAILABLE,
                load=0,
            )
            self.assertFalse(result)

        asyncio.run(run_test())

    def test_get_stats(self):
        """Test getting statistics"""
        async def run_test():
            comp1 = Component(
                id="agent/a",
                name="a",
                component_type="agent",
                status=ComponentStatus.AVAILABLE,
                capabilities=[Capability(name="cap1")],
            )
            comp2 = Component(
                id="skill/b",
                name="b",
                component_type="skill",
                status=ComponentStatus.BUSY,
            )

            await self.backend.register(comp1)
            await self.backend.register(comp2)

            stats = await self.backend.get_stats()

            self.assertEqual(stats["total_components"], 2)
            self.assertEqual(stats["by_type"]["agent"], 1)
            self.assertEqual(stats["by_type"]["skill"], 1)
            self.assertEqual(stats["by_status"]["available"], 1)
            self.assertEqual(stats["by_status"]["busy"], 1)
            self.assertEqual(stats["backend"], "local")

        asyncio.run(run_test())

class TestDiscoveryService(unittest.TestCase):
    """Tests for DiscoveryService (main interface)

    Every test builds the service with ``force_local=True`` and drives its
    awaited API through ``asyncio.run``.
    """

    def test_local_backend_fallback(self):
        """Test that service falls back to local when Redis unavailable"""
        async def run_test():
            # Force local backend
            service = DiscoveryService(force_local=True)

            comp = Component(
                id="agent/test",
                name="test",
                component_type="agent",
            )

            result = await service.register(comp)
            self.assertTrue(result)

            stats = await service.get_stats()
            self.assertEqual(stats["backend"], "local")

        asyncio.run(run_test())

    def test_find_by_capability_with_filtering(self):
        """Test capability search with health and load filtering"""
        async def run_test():
            service = DiscoveryService(force_local=True)

            # Register components with varying health and load
            comps = [
                Component(
                    id="agent/healthy_light",
                    name="healthy_light",
                    component_type="agent",
                    capabilities=[Capability(name="code_review")],
                    health_score=0.95,
                    current_load=1,
                    max_concurrency=10,
                ),
                Component(
                    id="agent/healthy_heavy",
                    name="healthy_heavy",
                    component_type="agent",
                    capabilities=[Capability(name="code_review")],
                    health_score=0.9,
                    current_load=9,
                    max_concurrency=10,
                ),
                Component(
                    id="agent/unhealthy",
                    name="unhealthy",
                    component_type="agent",
                    capabilities=[Capability(name="code_review")],
                    health_score=0.5,
                    current_load=0,
                    max_concurrency=10,
                ),
                Component(
                    id="agent/offline",
                    name="offline",
                    component_type="agent",
                    capabilities=[Capability(name="code_review")],
                    status=ComponentStatus.OFFLINE,
                    health_score=1.0,
                    current_load=0,
                    max_concurrency=10,
                ),
            ]

            for comp in comps:
                await service.register(comp)

            # Default filtering (min_health=0.7, max_load=0.8, status=AVAILABLE)
            result = await service.find_by_capability("code_review")

            # Should only return healthy_light (healthy_heavy exceeds load,
            # unhealthy below health, offline wrong status)
            self.assertEqual(result.total_matches, 4)
            self.assertEqual(result.filtered_count, 1)
            self.assertEqual(result.components[0].id, "agent/healthy_light")

        asyncio.run(run_test())

    def test_find_by_capability_load_balancing(self):
        """Test that results are sorted by load (least loaded first)"""
        async def run_test():
            service = DiscoveryService(force_local=True)

            # Register components with varying load, deliberately in
            # heaviest-first order so a passing test proves re-sorting.
            comps = [
                Component(
                    id="agent/heavy",
                    name="heavy",
                    component_type="agent",
                    capabilities=[Capability(name="test")],
                    current_load=7,
                    max_concurrency=10,
                ),
                Component(
                    id="agent/medium",
                    name="medium",
                    component_type="agent",
                    capabilities=[Capability(name="test")],
                    current_load=5,
                    max_concurrency=10,
                ),
                Component(
                    id="agent/light",
                    name="light",
                    component_type="agent",
                    capabilities=[Capability(name="test")],
                    current_load=2,
                    max_concurrency=10,
                ),
            ]

            for comp in comps:
                await service.register(comp)

            # max_load_ratio=1.0 keeps all three components in the result.
            result = await service.find_by_capability("test", max_load_ratio=1.0)

            # Should be sorted by load ratio
            self.assertEqual(result.components[0].id, "agent/light")
            self.assertEqual(result.components[1].id, "agent/medium")
            self.assertEqual(result.components[2].id, "agent/heavy")

        asyncio.run(run_test())

    def test_discovery_result_metadata(self):
        """Test that DiscoveryResult contains correct metadata"""
        async def run_test():
            service = DiscoveryService(force_local=True)

            comp = Component(
                id="agent/test",
                name="test",
                component_type="agent",
                capabilities=[Capability(name="test_cap")],
            )
            await service.register(comp)

            result = await service.find_by_capability("test_cap")

            self.assertEqual(result.query_capability, "test_cap")
            self.assertGreaterEqual(result.query_time_ms, 0)
            self.assertEqual(result.source, "local")
            self.assertEqual(result.total_matches, 1)

        asyncio.run(run_test())

class TestBulkRegistration(unittest.TestCase):
    """Tests for bulk registration from activation status"""

    def test_register_from_activation_status(self):
        """Test bulk registration from activation status file"""
        async def run_test():
            service = DiscoveryService(force_local=True)

            # Create mock activation status
            import tempfile
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump({
                    "components": [
                        {
                            "type": "agent",
                            "name": "test-agent",
                            "activated": True,
                            "version": "1.0.0",
                            "path": "agents/test-agent.md"
                        },
                        {
                            "type": "skill",
                            "name": "test-skill",
                            "activated": True,
                            "version": "2.0.0",
                            "path": "skills/test-skill/SKILL.md"
                        },
                        {
                            "type": "command",
                            "name": "inactive-cmd",
                            "activated": False,  # Should be skipped
                            "path": "commands/inactive.md"
                        }
                    ]
                }, f)
                temp_path = Path(f.name)

            # The file is created with delete=False, so it outlives the `with`
            # block (which flushes and closes it) and is removed in `finally`.
            try:
                count = await service.register_from_activation_status(temp_path)

                # Should register only activated components
                self.assertEqual(count, 2)

                # Verify they're registered
                agent = await service.get("agent/test-agent")
                self.assertIsNotNone(agent)
                self.assertEqual(agent.version, "1.0.0")

                skill = await service.get("skill/test-skill")
                self.assertIsNotNone(skill)

                # Inactive should not be registered
                inactive = await service.get("command/inactive-cmd")
                self.assertIsNone(inactive)

            finally:
                temp_path.unlink()

        asyncio.run(run_test())

def run_tests() -> int:
    """Run all tests

    Loads every test case class in this module into one suite and runs it
    with a verbose text runner.

    Returns:
        0 if the whole suite succeeded, 1 otherwise, so the value can be
        used directly as a process exit code.
    """
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()

    # Add test classes
    test_classes = (
        TestCapability,
        TestComponent,
        TestLocalDiscoveryBackend,
        TestDiscoveryService,
        TestBulkRegistration,
    )
    for test_class in test_classes:
        suite.addTests(loader.loadTestsFromTestCase(test_class))

    # Run tests
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)

    return 0 if result.wasSuccessful() else 1

# Script entry point: propagate the suite outcome as the process exit code.
if __name__ == "__main__":
    sys.exit(run_tests())