#!/usr/bin/env python3
"""
Unit Tests for CODITECT Priority Queue Router

Tests:
  - AgentStatus enum
  - AgentCapability dataclass
  - Agent dataclass
  - LocalAgentRegistry
  - AgentDiscoveryService
  - PriorityRouter (all strategies)
  - IntegratedRouter

Author: CODITECT Framework
Version: 1.0.0
Created: January 8, 2026
"""

import asyncio
import json
import sys
import unittest
from datetime import datetime, timedelta
from typing import List

from priority_router import (
    # Enums
    AgentStatus,
    RoutingStrategy,
    RoutingResult,
    # Data classes
    AgentCapability,
    Agent,
    RoutingDecision,
    AgentDiscoveryConfig,
    # Classes
    LocalAgentRegistry,
    AgentDiscoveryService,
    PriorityRouter,
    IntegratedRouter,
    # Helpers
    create_agent,
)

# =============================================================================
# Test AgentStatus Enum
# =============================================================================

class TestAgentStatus(unittest.TestCase):
    """Checks for the AgentStatus enumeration."""

    def test_status_values(self):
        """Each status member exposes the expected string value."""
        expected_pairs = (
            (AgentStatus.AVAILABLE, "available"),
            (AgentStatus.BUSY, "busy"),
            (AgentStatus.OFFLINE, "offline"),
            (AgentStatus.MAINTENANCE, "maintenance"),
            (AgentStatus.DRAINING, "draining"),
        )
        for member, text in expected_pairs:
            self.assertEqual(member.value, text)

    def test_status_from_string(self):
        """Looking a status up by its string value yields the matching member."""
        self.assertEqual(AgentStatus("available"), AgentStatus.AVAILABLE)

# =============================================================================
# Test RoutingStrategy Enum
# =============================================================================

class TestRoutingStrategy(unittest.TestCase):
    """Checks for the RoutingStrategy enumeration."""

    def test_strategy_values(self):
        """Each strategy member exposes the expected string value."""
        expected_pairs = (
            (RoutingStrategy.LEAST_LOADED, "least_loaded"),
            (RoutingStrategy.ROUND_ROBIN, "round_robin"),
            (RoutingStrategy.PRIORITY_FIRST, "priority_first"),
            (RoutingStrategy.CAPABILITY_MATCH, "capability_match"),
            (RoutingStrategy.COST_OPTIMIZED, "cost_optimized"),
            (RoutingStrategy.LATENCY_OPTIMIZED, "latency_optimized"),
        )
        for member, text in expected_pairs:
            self.assertEqual(member.value, text)

# =============================================================================
# Test RoutingResult Enum
# =============================================================================

class TestRoutingResult(unittest.TestCase):
    """Checks for the RoutingResult enumeration."""

    def test_result_values(self):
        """Each result member exposes the expected string value."""
        expected_pairs = (
            (RoutingResult.SUCCESS, "success"),
            (RoutingResult.NO_CAPABLE_AGENTS, "no_capable_agents"),
            (RoutingResult.ALL_AGENTS_BUSY, "all_agents_busy"),
            (RoutingResult.ROUTING_ERROR, "routing_error"),
        )
        for member, text in expected_pairs:
            self.assertEqual(member.value, text)

# =============================================================================
# Test AgentCapability
# =============================================================================

class TestAgentCapability(unittest.TestCase):
    """Checks for the AgentCapability dataclass."""

    def test_create_default(self):
        """A capability built from a name alone carries the default field values."""
        capability = AgentCapability(name="code-review")
        self.assertEqual(capability.name, "code-review")
        self.assertEqual(capability.description, "")
        self.assertEqual(capability.cost_per_invocation, 0.0)
        self.assertEqual(capability.avg_duration_seconds, 60.0)
        self.assertEqual(capability.success_rate, 0.95)

    def test_create_custom(self):
        """Explicitly supplied field values are stored on the capability."""
        spec = {
            "name": "security-audit",
            "description": "Security code audit",
            "cost_per_invocation": 0.05,
            "avg_duration_seconds": 120.0,
            "success_rate": 0.98,
            "required_tools": ["bandit", "semgrep"],
        }
        capability = AgentCapability(**spec)
        self.assertEqual(capability.name, "security-audit")
        self.assertEqual(capability.description, "Security code audit")
        self.assertEqual(capability.cost_per_invocation, 0.05)
        self.assertEqual(capability.avg_duration_seconds, 120.0)
        self.assertEqual(capability.success_rate, 0.98)
        self.assertEqual(len(capability.required_tools), 2)

    def test_to_dict(self):
        """to_dict() exposes the name and description under matching keys."""
        exported = AgentCapability(name="testing", description="Unit testing").to_dict()
        self.assertEqual(exported["name"], "testing")
        self.assertEqual(exported["description"], "Unit testing")

    def test_from_dict(self):
        """from_dict() builds a capability from a plain mapping."""
        payload = {
            "name": "documentation",
            "description": "Write docs",
            "cost_per_invocation": 0.01,
        }
        capability = AgentCapability.from_dict(payload)
        self.assertEqual(capability.name, "documentation")
        self.assertEqual(capability.cost_per_invocation, 0.01)

# =============================================================================
# Test Agent
# =============================================================================

class TestAgent(unittest.TestCase):
    """Checks for the Agent dataclass."""

    @staticmethod
    def _make_agent(**overrides):
        """Build an Agent from the shared fixture fields, with *overrides* applied on top."""
        fields = {
            "id": "agent-1",
            "name": "Test",
            "type": "claude",
            "capabilities": [],
        }
        fields.update(overrides)
        return Agent(**fields)

    def test_create_default(self):
        """An agent built with only the required fields carries the default values."""
        subject = self._make_agent(name="Test Agent")
        self.assertEqual(subject.id, "agent-1")
        self.assertEqual(subject.name, "Test Agent")
        self.assertEqual(subject.status, AgentStatus.AVAILABLE)
        self.assertEqual(subject.current_load, 0)
        self.assertEqual(subject.max_concurrency, 5)
        self.assertEqual(subject.health_score, 1.0)

    def test_load_ratio(self):
        """load_ratio is 0.3 for a load of 3 with a max concurrency of 10."""
        subject = self._make_agent(current_load=3, max_concurrency=10)
        self.assertEqual(subject.load_ratio, 0.3)

    def test_load_ratio_zero_concurrency(self):
        """load_ratio is 1.0 when max_concurrency is zero."""
        subject = self._make_agent(max_concurrency=0)
        self.assertEqual(subject.load_ratio, 1.0)

    def test_is_available(self):
        """is_available follows both the load and the status of the agent."""
        subject = self._make_agent(
            status=AgentStatus.AVAILABLE,
            current_load=2,
            max_concurrency=5,
        )
        self.assertTrue(subject.is_available)

        # Load equal to max_concurrency
        subject.current_load = 5
        self.assertFalse(subject.is_available)

        # No load, but status is OFFLINE
        subject.current_load = 0
        subject.status = AgentStatus.OFFLINE
        self.assertFalse(subject.is_available)

    def test_available_slots(self):
        """available_slots is 3 for a load of 2 with a max concurrency of 5."""
        subject = self._make_agent(current_load=2, max_concurrency=5)
        self.assertEqual(subject.available_slots, 3)

    def test_has_capability(self):
        """has_capability() is true only for capability names the agent holds."""
        held = [AgentCapability(name="code-review"), AgentCapability(name="testing")]
        subject = self._make_agent(capabilities=held)
        self.assertTrue(subject.has_capability("code-review"))
        self.assertTrue(subject.has_capability("testing"))
        self.assertFalse(subject.has_capability("security-audit"))

    def test_get_capability(self):
        """get_capability() returns the named capability, or None when absent."""
        subject = self._make_agent(
            capabilities=[AgentCapability(name="code-review", cost_per_invocation=0.05)]
        )
        present = subject.get_capability("code-review")
        self.assertIsNotNone(present)
        self.assertEqual(present.cost_per_invocation, 0.05)

        absent = subject.get_capability("nonexistent")
        self.assertIsNone(absent)

    def test_to_dict(self):
        """to_dict() exports id, name, status value, capabilities and tags."""
        subject = self._make_agent(
            name="Test Agent",
            capabilities=[AgentCapability(name="testing")],
            tags={"fast", "reliable"},
        )
        exported = subject.to_dict()
        self.assertEqual(exported["id"], "agent-1")
        self.assertEqual(exported["name"], "Test Agent")
        self.assertEqual(exported["status"], "available")
        self.assertEqual(len(exported["capabilities"]), 1)
        self.assertIn("fast", exported["tags"])

    def test_to_json(self):
        """to_json() output contains the agent id as a JSON key/value pair."""
        serialized = self._make_agent().to_json()
        self.assertIn('"id": "agent-1"', serialized)

    def test_from_dict(self):
        """from_dict() rebuilds an agent, including status, load and tags."""
        payload = {
            "id": "agent-2",
            "name": "From Dict",
            "type": "gpt-4",
            "capabilities": [{"name": "code-review"}],
            "status": "busy",
            "current_load": 3,
            "max_concurrency": 5,
            "health_score": 0.9,
            "tags": ["premium"],
        }
        subject = Agent.from_dict(payload)
        self.assertEqual(subject.id, "agent-2")
        self.assertEqual(subject.status, AgentStatus.BUSY)
        self.assertEqual(subject.current_load, 3)
        self.assertEqual(len(subject.capabilities), 1)
        self.assertIn("premium", subject.tags)

    def test_from_json(self):
        """from_json() rebuilds an agent from a JSON document."""
        document = '{"id": "agent-3", "name": "JSON Agent", "type": "claude", "capabilities": []}'
        subject = Agent.from_json(document)
        self.assertEqual(subject.id, "agent-3")
        self.assertEqual(subject.name, "JSON Agent")

    def test_round_trip_serialization(self):
        """An agent serialized with to_json() and reloaded keeps the compared fields."""
        source = self._make_agent(
            id="agent-rt",
            name="Round Trip",
            capabilities=[
                AgentCapability(name="cap1", cost_per_invocation=0.1),
                AgentCapability(name="cap2", avg_duration_seconds=30),
            ],
            current_load=2,
            max_concurrency=10,
            health_score=0.95,
            tags={"tag1", "tag2"},
        )
        reloaded = Agent.from_json(source.to_json())

        self.assertEqual(reloaded.id, source.id)
        self.assertEqual(reloaded.name, source.name)
        self.assertEqual(reloaded.current_load, source.current_load)
        self.assertEqual(len(reloaded.capabilities), len(source.capabilities))
        self.assertEqual(reloaded.tags, source.tags)

# =============================================================================
# Test LocalAgentRegistry
# =============================================================================

class TestLocalAgentRegistry(unittest.TestCase):
    """Test LocalAgentRegistry"""

    def setUp(self):
        """Create a fresh registry and a dedicated event loop for each test."""
        self.registry = LocalAgentRegistry()
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def tearDown(self):
        """Close the per-test event loop and uninstall it as the current loop."""
        self.loop.close()
        # Do not leave a closed loop registered as the thread's current loop:
        # code running after this test would otherwise pick up an unusable loop.
        asyncio.set_event_loop(None)

    def test_register_agent(self):
        """Test registering an agent"""
        agent = create_agent("test-1", "Test Agent", ["code-review"])

        async def run():
            agent_id = await self.registry.register(agent)
            self.assertEqual(agent_id, "test-1")

            retrieved = await self.registry.get("test-1")
            self.assertIsNotNone(retrieved)
            self.assertEqual(retrieved.name, "Test Agent")

        self.loop.run_until_complete(run())

    def test_unregister_agent(self):
        """Test unregistering an agent"""
        agent = create_agent("test-2", "Test Agent", ["code-review"])

        async def run():
            await self.registry.register(agent)
            success = await self.registry.unregister("test-2")
            self.assertTrue(success)

            retrieved = await self.registry.get("test-2")
            self.assertIsNone(retrieved)

        self.loop.run_until_complete(run())

    def test_unregister_nonexistent(self):
        """Test unregistering nonexistent agent"""
        async def run():
            success = await self.registry.unregister("nonexistent")
            self.assertFalse(success)

        self.loop.run_until_complete(run())

    def test_find_by_capability(self):
        """Test finding agents by capability"""
        agents = [
            create_agent("agent-1", "Agent 1", ["code-review", "testing"]),
            create_agent("agent-2", "Agent 2", ["code-review", "documentation"]),
            create_agent("agent-3", "Agent 3", ["security-audit"]),
        ]

        async def run():
            for agent in agents:
                await self.registry.register(agent)

            # Find code-review agents
            found = await self.registry.find_by_capability("code-review")
            self.assertEqual(len(found), 2)
            ids = {a.id for a in found}
            self.assertIn("agent-1", ids)
            self.assertIn("agent-2", ids)

            # Find security-audit agents
            found = await self.registry.find_by_capability("security-audit")
            self.assertEqual(len(found), 1)
            self.assertEqual(found[0].id, "agent-3")

            # Find nonexistent capability
            found = await self.registry.find_by_capability("nonexistent")
            self.assertEqual(len(found), 0)

        self.loop.run_until_complete(run())

    def test_update_status(self):
        """Test updating agent status"""
        agent = create_agent("test-update", "Test", ["testing"])

        async def run():
            await self.registry.register(agent)

            success = await self.registry.update_status(
                "test-update",
                AgentStatus.BUSY,
                3,
            )
            self.assertTrue(success)

            updated = await self.registry.get("test-update")
            self.assertEqual(updated.status, AgentStatus.BUSY)
            self.assertEqual(updated.current_load, 3)

        self.loop.run_until_complete(run())

    def test_update_health(self):
        """Test updating agent health score"""
        agent = create_agent("test-health", "Test", ["testing"])

        async def run():
            await self.registry.register(agent)

            await self.registry.update_health("test-health", 0.75)

            updated = await self.registry.get("test-health")
            self.assertEqual(updated.health_score, 0.75)

        self.loop.run_until_complete(run())

    def test_get_stats(self):
        """Test getting registry statistics"""
        agents = [
            create_agent("agent-1", "Agent 1", ["cap1"]),
            create_agent("agent-2", "Agent 2", ["cap1", "cap2"]),
        ]
        agents[1].status = AgentStatus.BUSY

        async def run():
            for agent in agents:
                await self.registry.register(agent)

            stats = self.registry.get_stats()
            self.assertEqual(stats["total_agents"], 2)
            self.assertEqual(stats["available"], 1)
            self.assertEqual(stats["busy"], 1)
            self.assertEqual(stats["capabilities"], 2)

        self.loop.run_until_complete(run())

# =============================================================================
# Test AgentDiscoveryService (Local Mode)
# =============================================================================

class TestAgentDiscoveryServiceLocalMode(unittest.TestCase):
    """Test AgentDiscoveryService in local mode"""

    def setUp(self):
        """Create a service and a dedicated loop, then connect with use_local_fallback=True."""
        self.service = AgentDiscoveryService()
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.service.connect(use_local_fallback=True))

    def tearDown(self):
        """Disconnect the service, then always close and uninstall the loop."""
        try:
            self.loop.run_until_complete(self.service.disconnect())
        finally:
            # Close the loop even if disconnect() raised, and do not leave the
            # closed loop registered as the thread's current event loop.
            self.loop.close()
            asyncio.set_event_loop(None)

    def test_connect_local_mode(self):
        """Test connecting in local mode"""
        self.assertTrue(self.service._connected)
        self.assertTrue(self.service._use_local)

    def test_register_and_get_agent(self):
        """Test registering and retrieving agent"""
        agent = create_agent("local-1", "Local Agent", ["testing"])

        async def run():
            agent_id = await self.service.register_agent(agent)
            self.assertEqual(agent_id, "local-1")

            retrieved = await self.service.get_agent("local-1")
            self.assertIsNotNone(retrieved)
            self.assertEqual(retrieved.name, "Local Agent")

        self.loop.run_until_complete(run())

    def test_find_agents_by_capability(self):
        """Test finding agents by capability"""
        agents = [
            create_agent("cap-1", "Agent 1", ["code-review"], health_score=0.9),
            create_agent("cap-2", "Agent 2", ["code-review"], health_score=0.6),
            create_agent("cap-3", "Agent 3", ["testing"]),
        ]
        # Set one agent as busy
        agents[1].status = AgentStatus.BUSY

        async def run():
            for agent in agents:
                await self.service.register_agent(agent)

            # Find available code-review agents with good health
            found = await self.service.find_agents_by_capability(
                "code-review",
                min_health_score=0.7,
            )
            self.assertEqual(len(found), 1)
            self.assertEqual(found[0].id, "cap-1")

        self.loop.run_until_complete(run())

    def test_find_available_agents(self):
        """Test finding available agents"""
        agents = [
            create_agent("avail-1", "Agent 1", ["cap1", "cap2"]),
            create_agent("avail-2", "Agent 2", ["cap1"]),
            create_agent("avail-3", "Agent 3", ["cap2"]),
        ]
        agents[1].status = AgentStatus.OFFLINE

        async def run():
            for agent in agents:
                await self.service.register_agent(agent)

            # Find agents with both capabilities
            found = await self.service.find_available_agents(
                capabilities=["cap1", "cap2"]
            )
            self.assertEqual(len(found), 1)
            self.assertEqual(found[0].id, "avail-1")

        self.loop.run_until_complete(run())

    def test_heartbeat(self):
        """Test agent heartbeat"""
        agent = create_agent("heartbeat-test", "Test", ["testing"])

        async def run():
            await self.service.register_agent(agent)

            success = await self.service.heartbeat(
                "heartbeat-test",
                AgentStatus.BUSY,
                3,
                health_score=0.85,
            )
            self.assertTrue(success)

            updated = await self.service.get_agent("heartbeat-test")
            self.assertEqual(updated.status, AgentStatus.BUSY)
            self.assertEqual(updated.current_load, 3)
            self.assertEqual(updated.health_score, 0.85)

        self.loop.run_until_complete(run())

    def test_increment_decrement_load(self):
        """Test incrementing and decrementing load"""
        agent = create_agent("load-test", "Test", ["testing"])

        async def run():
            await self.service.register_agent(agent)

            await self.service.increment_load("load-test")
            updated = await self.service.get_agent("load-test")
            self.assertEqual(updated.current_load, 1)

            await self.service.increment_load("load-test")
            updated = await self.service.get_agent("load-test")
            self.assertEqual(updated.current_load, 2)

            await self.service.decrement_load("load-test")
            updated = await self.service.get_agent("load-test")
            self.assertEqual(updated.current_load, 1)

        self.loop.run_until_complete(run())

# =============================================================================
# Test PriorityRouter
# =============================================================================

class TestPriorityRouter(unittest.TestCase):
    """Test PriorityRouter with all strategies"""

    def setUp(self):
        """Create a router over a discovery service connected with use_local_fallback=True."""
        self.discovery = AgentDiscoveryService()
        self.router = PriorityRouter(self.discovery)
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.discovery.connect(use_local_fallback=True))

    def tearDown(self):
        """Disconnect the discovery service, then always close and uninstall the loop."""
        try:
            self.loop.run_until_complete(self.discovery.disconnect())
        finally:
            # Close the loop even if disconnect() raised, and do not leave the
            # closed loop registered as the thread's current event loop.
            self.loop.close()
            asyncio.set_event_loop(None)

    def _register_test_agents(self):
        """Register three fixture agents with the discovery service and return them.

        router-1 and router-2 both offer "code-review"; router-3 offers only
        "security-audit".
        """
        agents = [
            create_agent("router-1", "Agent 1", ["code-review", "testing"],
                         max_concurrency=5),
            create_agent("router-2", "Agent 2", ["code-review"],
                         max_concurrency=3),
            create_agent("router-3", "Agent 3", ["security-audit"],
                         max_concurrency=2),
        ]

        async def run():
            for agent in agents:
                await self.discovery.register_agent(agent)

        self.loop.run_until_complete(run())
        return agents

    def test_route_task_success(self):
        """Test successful task routing"""
        self._register_test_agents()

        async def run():
            decision = await self.router.route_task(
                task_id="task-1",
                required_capability="code-review",
                priority=5,
            )
            self.assertEqual(decision.result, RoutingResult.SUCCESS)
            self.assertIsNotNone(decision.selected_agent)
            self.assertIn(decision.selected_agent.id, ["router-1", "router-2"])

        self.loop.run_until_complete(run())

    def test_route_task_no_capability(self):
        """Test routing when no agents have capability"""
        self._register_test_agents()

        async def run():
            decision = await self.router.route_task(
                task_id="task-2",
                required_capability="machine-learning",
                priority=5,
            )
            self.assertEqual(decision.result, RoutingResult.NO_CAPABLE_AGENTS)
            self.assertIsNone(decision.selected_agent)

        self.loop.run_until_complete(run())

    def test_route_task_all_busy(self):
        """Test routing when all capable agents are busy"""
        self._register_test_agents()

        async def run():
            # Make all code-review agents busy
            await self.discovery.heartbeat("router-1", AgentStatus.BUSY, 5)
            await self.discovery.heartbeat("router-2", AgentStatus.BUSY, 3)

            decision = await self.router.route_task(
                task_id="task-3",
                required_capability="code-review",
                priority=5,
            )
            self.assertEqual(decision.result, RoutingResult.ALL_AGENTS_BUSY)
            self.assertIsNotNone(decision.retry_after)

        self.loop.run_until_complete(run())

    def test_route_task_least_loaded_strategy(self):
        """Test LEAST_LOADED routing strategy"""
        self._register_test_agents()

        async def run():
            # Set different loads
            await self.discovery.heartbeat("router-1", AgentStatus.AVAILABLE, 4)  # 80% loaded
            await self.discovery.heartbeat("router-2", AgentStatus.AVAILABLE, 1)  # 33% loaded

            decision = await self.router.route_task(
                task_id="task-ll",
                required_capability="code-review",
                priority=5,
                strategy=RoutingStrategy.LEAST_LOADED,
            )
            self.assertEqual(decision.result, RoutingResult.SUCCESS)
            self.assertEqual(decision.selected_agent.id, "router-2")

        self.loop.run_until_complete(run())

    def test_route_task_round_robin_strategy(self):
        """Test ROUND_ROBIN routing strategy"""
        self._register_test_agents()

        async def run():
            selected_ids = []
            for i in range(6):  # More than number of agents
                decision = await self.router.route_task(
                    task_id=f"task-rr-{i}",
                    required_capability="code-review",
                    priority=5,
                    strategy=RoutingStrategy.ROUND_ROBIN,
                )
                self.assertEqual(decision.result, RoutingResult.SUCCESS)
                selected_ids.append(decision.selected_agent.id)

            # Should cycle through agents
            # Both router-1 and router-2 have code-review
            self.assertIn("router-1", selected_ids)
            self.assertIn("router-2", selected_ids)

        self.loop.run_until_complete(run())

    def test_route_task_priority_first_strategy(self):
        """Test PRIORITY_FIRST routing strategy"""
        async def run():
            # Create agents with different health scores
            agent1 = create_agent("pf-1", "Healthy Agent", ["analysis"],
                                  health_score=0.95)
            agent2 = create_agent("pf-2", "Less Healthy", ["analysis"],
                                  health_score=0.6)

            await self.discovery.register_agent(agent1)
            await self.discovery.register_agent(agent2)

            # High priority task should get healthier agent
            decision = await self.router.route_task(
                task_id="task-pf",
                required_capability="analysis",
                priority=9,  # CRITICAL
                strategy=RoutingStrategy.PRIORITY_FIRST,
            )
            self.assertEqual(decision.result, RoutingResult.SUCCESS)
            self.assertEqual(decision.selected_agent.id, "pf-1")

        self.loop.run_until_complete(run())

    def test_route_task_cost_optimized_strategy(self):
        """Test COST_OPTIMIZED routing strategy"""
        async def run():
            # Create agents with different costs
            agent1 = Agent(
                id="cost-1",
                name="Expensive Agent",
                type="claude",
                capabilities=[AgentCapability(name="processing", cost_per_invocation=0.10)],
            )
            agent2 = Agent(
                id="cost-2",
                name="Cheap Agent",
                type="claude",
                capabilities=[AgentCapability(name="processing", cost_per_invocation=0.01)],
            )

            await self.discovery.register_agent(agent1)
            await self.discovery.register_agent(agent2)

            decision = await self.router.route_task(
                task_id="task-cost",
                required_capability="processing",
                priority=5,
                strategy=RoutingStrategy.COST_OPTIMIZED,
            )
            self.assertEqual(decision.result, RoutingResult.SUCCESS)
            self.assertEqual(decision.selected_agent.id, "cost-2")

        self.loop.run_until_complete(run())

    def test_route_task_latency_optimized_strategy(self):
        """Test LATENCY_OPTIMIZED routing strategy"""
        async def run():
            # Create agents with different latencies
            agent1 = Agent(
                id="lat-1",
                name="Slow Agent",
                type="claude",
                capabilities=[AgentCapability(name="query", avg_duration_seconds=120)],
            )
            agent2 = Agent(
                id="lat-2",
                name="Fast Agent",
                type="claude",
                capabilities=[AgentCapability(name="query", avg_duration_seconds=10)],
            )

            await self.discovery.register_agent(agent1)
            await self.discovery.register_agent(agent2)

            decision = await self.router.route_task(
                task_id="task-lat",
                required_capability="query",
                priority=5,
                strategy=RoutingStrategy.LATENCY_OPTIMIZED,
            )
            self.assertEqual(decision.result, RoutingResult.SUCCESS)
            self.assertEqual(decision.selected_agent.id, "lat-2")

        self.loop.run_until_complete(run())

    def test_route_task_preferred_agent(self):
        """Test routing with preferred agent"""
        self._register_test_agents()

        async def run():
            decision = await self.router.route_task(
                task_id="task-pref",
                required_capability="code-review",
                priority=5,
                preferred_agent_id="router-2",
            )
            self.assertEqual(decision.result, RoutingResult.SUCCESS)
            self.assertEqual(decision.selected_agent.id, "router-2")

        self.loop.run_until_complete(run())

    def test_route_task_excluded_agents(self):
        """Test routing with excluded agents"""
        self._register_test_agents()

        async def run():
            decision = await self.router.route_task(
                task_id="task-excl",
                required_capability="code-review",
                priority=5,
                excluded_agents={"router-1"},
            )
            self.assertEqual(decision.result, RoutingResult.SUCCESS)
            self.assertEqual(decision.selected_agent.id, "router-2")

        self.loop.run_until_complete(run())

    def test_routing_stats(self):
        """Test routing statistics"""
        self._register_test_agents()

        async def run():
            # Make some routing decisions
            for i in range(5):
                await self.router.route_task(
                    task_id=f"stats-{i}",
                    required_capability="code-review",
                    priority=5,
                )

            # One failure
            await self.router.route_task(
                task_id="stats-fail",
                required_capability="nonexistent",
                priority=5,
            )

            stats = self.router.get_routing_stats()
            self.assertEqual(stats["total_routes"], 6)
            self.assertGreater(stats["success_rate"], 0.8)
            self.assertIn("success", stats["by_result"])
            self.assertIn("no_capable_agents", stats["by_result"])

        self.loop.run_until_complete(run())

    def test_clear_history(self):
        """Test clearing routing history"""
        self._register_test_agents()

        async def run():
            await self.router.route_task(
                task_id="clear-test",
                required_capability="code-review",
                priority=5,
            )
            self.assertEqual(len(self.router.routing_history), 1)

            self.router.clear_history()
            self.assertEqual(len(self.router.routing_history), 0)

        self.loop.run_until_complete(run())

# =============================================================================
# Test IntegratedRouter
# =============================================================================

class TestIntegratedRouter(unittest.TestCase):
    """Test IntegratedRouter"""

    def setUp(self):
        """Create an IntegratedRouter and a dedicated event loop; each test connects itself."""
        self.integrated = IntegratedRouter()
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def tearDown(self):
        """Disconnect the router, then always close and uninstall the loop."""
        try:
            self.loop.run_until_complete(self.integrated.disconnect())
        finally:
            # Close the loop even if disconnect() raised, and do not leave the
            # closed loop registered as the thread's current event loop.
            self.loop.close()
            asyncio.set_event_loop(None)

    def test_connect(self):
        """Test connecting integrated router"""
        async def run():
            connected = await self.integrated.connect(use_local_fallback=True)
            self.assertTrue(connected)
            self.assertTrue(self.integrated._connected)

        self.loop.run_until_complete(run())

    def test_register_and_route(self):
        """Test registering agent and routing task"""
        async def run():
            await self.integrated.connect(use_local_fallback=True)

            # Register agent
            agent = create_agent("int-1", "Integrated Agent", ["processing"])
            await self.integrated.register_agent(agent)

            # Route task
            decision = await self.integrated.route_and_dispatch(
                task_id="int-task-1",
                capability="processing",
                payload={"data": "test"},
                priority=7,
            )
            self.assertEqual(decision.result, RoutingResult.SUCCESS)

        self.loop.run_until_complete(run())

    def test_heartbeat(self):
        """Test heartbeat through integrated router"""
        async def run():
            await self.integrated.connect(use_local_fallback=True)

            agent = create_agent("int-hb", "Heartbeat Test", ["testing"])
            await self.integrated.register_agent(agent)

            success = await self.integrated.heartbeat(
                "int-hb",
                AgentStatus.BUSY,
                2,
            )
            self.assertTrue(success)

        self.loop.run_until_complete(run())

    def test_get_stats(self):
        """Test getting combined statistics"""
        async def run():
            await self.integrated.connect(use_local_fallback=True)

            stats = self.integrated.get_stats()
            self.assertIn("discovery", stats)
            self.assertIn("routing", stats)
            self.assertTrue(stats["connected"])

        self.loop.run_until_complete(run())

# =============================================================================
# Test Helper Functions
# =============================================================================

class TestHelperFunctions(unittest.TestCase):
    """Checks for the create_agent helper."""

    def test_create_agent_simple(self):
        """create_agent with only id, name and capability names."""
        built = create_agent(
            agent_id="helper-1",
            name="Helper Agent",
            capabilities=["cap1", "cap2", "cap3"],
        )
        self.assertEqual(built.id, "helper-1")
        self.assertEqual(built.name, "Helper Agent")
        self.assertEqual(built.type, "coditect")
        self.assertEqual(len(built.capabilities), 3)
        self.assertTrue(built.has_capability("cap1"))

    def test_create_agent_with_options(self):
        """create_agent forwards the optional keyword settings onto the agent."""
        options = {
            "agent_type": "claude",
            "max_concurrency": 10,
            "health_score": 0.9,
            "priority_boost": 2,
        }
        built = create_agent(
            agent_id="helper-2",
            name="Custom Agent",
            capabilities=["cap1"],
            **options,
        )
        self.assertEqual(built.type, "claude")
        self.assertEqual(built.max_concurrency, 10)
        self.assertEqual(built.health_score, 0.9)
        self.assertEqual(built.priority_boost, 2)

# =============================================================================
# Test RoutingDecision
# =============================================================================

class TestRoutingDecision(unittest.TestCase):
    """Checks for the RoutingDecision dataclass."""

    def test_create_success_decision(self):
        """A SUCCESS decision keeps the task id, result, agent and score it was given."""
        chosen = create_agent("rd-agent", "Test", ["testing"])
        outcome = RoutingDecision(
            task_id="rd-task-1",
            result=RoutingResult.SUCCESS,
            selected_agent=chosen,
            routing_strategy=RoutingStrategy.LEAST_LOADED,
            score=0.85,
            reason="Selected via least_loaded strategy",
        )
        self.assertEqual(outcome.task_id, "rd-task-1")
        self.assertEqual(outcome.result, RoutingResult.SUCCESS)
        self.assertEqual(outcome.selected_agent.id, "rd-agent")
        self.assertEqual(outcome.score, 0.85)

    def test_to_dict(self):
        """to_dict() exports the result value and a None agent id when no agent was selected."""
        exported = RoutingDecision(
            task_id="rd-task-2",
            result=RoutingResult.NO_CAPABLE_AGENTS,
            reason="No agents found",
        ).to_dict()
        self.assertEqual(exported["task_id"], "rd-task-2")
        self.assertEqual(exported["result"], "no_capable_agents")
        self.assertIsNone(exported["selected_agent_id"])

# =============================================================================
# Test AgentDiscoveryConfig
# =============================================================================

class TestAgentDiscoveryConfig(unittest.TestCase):
    """Test AgentDiscoveryConfig"""

    def test_default_config(self):
        """A config built with no arguments carries the expected default values."""
        config = AgentDiscoveryConfig()
        self.assertEqual(config.redis_host, "localhost")
        self.assertEqual(config.redis_port, 6379)
        self.assertEqual(config.agent_ttl_seconds, 300)
        self.assertEqual(config.min_health_score, 0.5)

    def test_custom_config(self):
        """Every explicitly supplied setting is stored on the config."""
        config = AgentDiscoveryConfig(
            redis_host="redis.example.com",
            redis_port=6380,
            redis_password="secret",
            agent_ttl_seconds=600,
        )
        self.assertEqual(config.redis_host, "redis.example.com")
        self.assertEqual(config.redis_port, 6380)
        self.assertEqual(config.redis_password, "secret")
        # The TTL override was passed in but previously never verified.
        self.assertEqual(config.agent_ttl_seconds, 600)

# =============================================================================
# Main
# =============================================================================

def run_tests():
    """Run every test case class in this module and print a summary.

    Returns:
        True when the run was successful, False otherwise.
    """
    # Test case classes, in the order they are loaded into the suite
    case_classes = (
        TestAgentStatus,
        TestRoutingStrategy,
        TestRoutingResult,
        TestAgentCapability,
        TestAgent,
        TestLocalAgentRegistry,
        TestAgentDiscoveryServiceLocalMode,
        TestPriorityRouter,
        TestIntegratedRouter,
        TestHelperFunctions,
        TestRoutingDecision,
        TestAgentDiscoveryConfig,
    )

    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for case_class in case_classes:
        suite.addTests(loader.loadTestsFromTestCase(case_class))

    # Execute with verbose per-test output
    outcome = unittest.TextTestRunner(verbosity=2).run(suite)
    passed = outcome.wasSuccessful()

    # Summary block
    rule = "=" * 70
    print("\n" + rule)
    print("TEST SUMMARY")
    print(rule)
    print(f"Tests Run: {outcome.testsRun}")
    print(f"Failures: {len(outcome.failures)}")
    print(f"Errors: {len(outcome.errors)}")
    print(f"Skipped: {len(outcome.skipped)}")
    print(f"\nOverall: {'PASSED' if passed else 'FAILED'}")
    print(rule)

    return passed

# Script entry point: run the suite and report the outcome via the exit status
# (0 when every test passed, 1 otherwise).
if __name__ == "__main__":
    success = run_tests()
    # sys.exit rather than the bare exit() helper, which is installed by the
    # site module and is not available in every interpreter configuration.
    sys.exit(0 if success else 1)