#!/usr/bin/env python3
"""
CODITECT Agent-to-Agent Task Delegation Tests

Part of Track H.2.6: Test agent-to-agent task delegation.
Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications.

This module provides comprehensive integration tests for the full agent-to-agent task delegation flow using all H.2 components:

  • H.2.1: MessageBus (message_bus.py)
  • H.2.2: TaskQueueManager (task_queue_manager.py)
  • H.2.3: PriorityQueueRouter (priority_queue_router.py)
  • H.2.4: CircuitBreaker (circuit_breaker.py)
  • H.2.5: RetryEngine (retry_engine.py)
  • Supporting: DiscoveryService (discovery_service.py)

Test Categories:
    1. Message Bus Integration Tests
    2. Task Queue Integration Tests
    3. Discovery Service Integration Tests
    4. Priority Queue Router Integration Tests
    5. Full Delegation Flow Tests
    6. Circuit Breaker Integration Tests
    7. Retry Engine Integration Tests
    8. Fault Tolerance Tests
    9. Performance Tests
    10. End-to-End Scenarios

Run: pytest scripts/core/test_agent_delegation.py -v

Author: CODITECT Framework
Created: January 8, 2026
Version: 1.0.0
"""

import asyncio import pytest import time import uuid from datetime import datetime, timezone, timedelta from typing import Dict, List, Any, Optional, Tuple from unittest.mock import Mock, AsyncMock, patch from dataclasses import dataclass, field

# Import H.2 components

from message_bus import ( MessageBus, AgentMessage, MessageType, MessagePriority, MessageBusConfig, LocalMessageQueue, ) from task_queue_manager import ( TaskQueueManager, Task, TaskStatus, TaskPriority, TaskQueueConfig, LocalTaskQueue, ) from priority_queue_router import ( PriorityQueueRouter, LocalPriorityQueueRouter, RoutingRule, QueueConfig, DequeueStrategy, BoostPolicy, ) from circuit_breaker import ( CircuitBreaker, CircuitBreakerConfig, CircuitState, CircuitBreakerRegistry, CircuitOpenError, ) from retry_engine import ( RetryEngine, RetryConfig, RetryPolicy, BackoffStrategy, JitterType, MaxRetriesExceeded, retry, ) from discovery_service import ( DiscoveryService, Component, Capability, ComponentStatus, LocalDiscoveryBackend, DiscoveryResult, )

# =============================================================================
# Constants
# =============================================================================

# Queue names used by the router

QUEUE_CRITICAL = "critical"
QUEUE_HIGH = "high"
QUEUE_NORMAL = "normal"
QUEUE_BACKGROUND = "background"

# =============================================================================
# Test Fixtures
# =============================================================================

@pytest.fixture
def local_message_queue():
    """Provide a new LocalMessageQueue instance for each test."""
    queue = LocalMessageQueue()
    return queue

@pytest.fixture
def local_task_queue():
    """Provide a new LocalTaskQueue instance for each test."""
    queue = LocalTaskQueue()
    return queue

@pytest.fixture
def local_discovery():
    """Provide a DiscoveryService constructed with ``force_local=True``."""
    # force_local ensures the service uses LocalDiscoveryBackend.
    service = DiscoveryService(force_local=True)
    return service

@pytest.fixture
def local_router():
    """Provide a new LocalPriorityQueueRouter instance for each test."""
    router = LocalPriorityQueueRouter()
    return router

@pytest.fixture
def circuit_breaker():
    """Provide a CircuitBreaker named "test-service" with test-sized limits."""
    settings = {
        "fail_max": 3,
        "recovery_timeout": 1.0,  # Short for testing
        "half_open_max_calls": 2,
    }
    return CircuitBreaker("test-service", CircuitBreakerConfig(**settings))

@pytest.fixture
def retry_engine():
    """Provide a RetryEngine with three attempts, a 0.01 base delay, no jitter."""
    settings = {
        "max_attempts": 3,
        "base_delay": 0.01,
        "jitter": JitterType.NONE,
    }
    return RetryEngine(RetryConfig(**settings))

@pytest.fixture
def sample_agent_message():
    """Provide a TASK_REQUEST message from "orchestrator" to "code-review-agent"."""
    body = {"code": "def hello(): pass", "language": "python"}
    message = AgentMessage(
        from_agent="orchestrator",
        to_agent="code-review-agent",
        task_id="task-001",
        message_type=MessageType.TASK_REQUEST.value,
        payload=body,
        priority=MessagePriority.NORMAL.value,
    )
    return message

@pytest.fixture
def sample_task():
    """Provide a HIGH-priority Task addressed to the "security-scanner" agent."""
    task_inputs = {"code": "user_input = input()", "checks": ["injection", "xss"]}
    task = Task(
        description="Review code for security issues",
        agent="security-scanner",
        inputs=task_inputs,
        priority=TaskPriority.HIGH.value,
    )
    return task

@pytest.fixture
def sample_component():
    """Provide an AVAILABLE agent component exposing two capabilities.

    The component advertises ``code_review`` and ``security_scan`` and is
    created with ``max_concurrency=5``.
    """
    code_review = Capability(
        name="code_review",
        description="Reviews code for quality issues",
        tags=["code", "review", "quality"],
    )
    security_scan = Capability(
        name="security_scan",
        description="Scans code for security vulnerabilities",
        tags=["security", "scan"],
    )
    return Component(
        id="agent/code-review-agent",
        name="code-review-agent",
        component_type="agent",
        capabilities=[code_review, security_scan],
        status=ComponentStatus.AVAILABLE,
        max_concurrency=5,
    )

# =============================================================================
# Agent Orchestrator (Test Helper)
# =============================================================================

class AgentOrchestrator:
    """
    Orchestrates agent-to-agent task delegation for testing.

    This class combines all H.2 components into a unified delegation flow:
    1. Discover target agent by capability
    2. Route task to appropriate queue
    3. Send message via message queue
    4. Manage task in task queue
    5. Handle failures with circuit breaker and retry

    Each successful delegation appends one entry to ``delegation_log`` so
    tests can assert on what was delegated, to whom, and through which queue.
    """

    def __init__(
        self,
        message_queue: LocalMessageQueue,
        task_queue: LocalTaskQueue,
        discovery: DiscoveryService,
        router: LocalPriorityQueueRouter,
        circuit_breaker: Optional[CircuitBreaker] = None,
        retry_engine: Optional[RetryEngine] = None,
    ):
        """Store the collaborating components and start with an empty log.

        ``circuit_breaker`` and ``retry_engine`` are optional; the matching
        ``delegate_with_*`` method raises ``RuntimeError`` when its component
        was not supplied.
        """
        self.message_queue = message_queue
        self.task_queue = task_queue
        self.discovery = discovery
        self.router = router
        self.circuit_breaker = circuit_breaker
        self.retry_engine = retry_engine
        self.delegation_log: List[Dict[str, Any]] = []

    def _get_queue_for_priority(self, priority: int) -> str:
        """Determine queue name based on priority.

        The priority is compared numerically against ``TaskPriority`` values,
        highest threshold first: at or above CRITICAL maps to the critical
        queue, at or above HIGH to the high queue, at or above LOW to the
        normal queue, and anything lower to the background queue.
        """
        if priority >= TaskPriority.CRITICAL.value:
            return QUEUE_CRITICAL
        elif priority >= TaskPriority.HIGH.value:
            return QUEUE_HIGH
        elif priority >= TaskPriority.LOW.value:
            return QUEUE_NORMAL
        else:
            return QUEUE_BACKGROUND

    async def delegate_task(
        self,
        from_agent: str,
        capability: str,
        task_description: str,
        inputs: Dict[str, Any],
        priority: int = TaskPriority.NORMAL.value,
    ) -> str:
        """
        Delegate a task to an agent with the specified capability.

        Flow:
        1. Discover agent with capability
        2. Create task and route to queue
        3. Send message to agent
        4. Return task ID

        Args:
            from_agent: Name of the delegating agent (recorded in the message
                and in the delegation log).
            capability: Capability name used for discovery.
            task_description: Description stored on the created task.
            inputs: Input payload stored on the created task.
            priority: Numeric priority used for the task, the queue choice
                and the message.

        Returns:
            The ID of the newly created task.

        Raises:
            ValueError: If discovery returns no agent for ``capability``.
        """
        # 1. Discover target agent
        result = await self.discovery.find_by_capability(capability)
        # Discovery may hand back a DiscoveryResult wrapper or a plain sequence.
        agents = result.components if isinstance(result, DiscoveryResult) else result
        if not agents:
            raise ValueError(f"No agent found with capability: {capability}")

        # Select first available agent (load balancing could be added)
        target_agent = agents[0]

        # 2. Create task
        task = Task(
            description=task_description,
            agent=target_agent.name,
            inputs=inputs,
            priority=priority,
        )

        # 3. Determine queue based on priority
        queue_name = self._get_queue_for_priority(priority)

        # 4. Enqueue task using router (task, target_queue)
        self.router.enqueue(task, queue_name)

        # 5. Also add to task queue for tracking (synchronous)
        self.task_queue.enqueue(task, priority=priority)

        # 6. Send message to agent
        # NOTE(review): the task priority is passed straight through as the
        # message priority - confirm both enums share one numeric scale.
        message = AgentMessage(
            from_agent=from_agent,
            to_agent=target_agent.name,
            task_id=task.id,
            message_type=MessageType.TASK_REQUEST.value,
            payload={"task": task.to_dict()},
            priority=priority,
        )

        self.message_queue.publish(target_agent.name, message)

        # Log delegation
        self.delegation_log.append({
            "task_id": task.id,
            "from_agent": from_agent,
            "to_agent": target_agent.name,
            "capability": capability,
            "queue": queue_name,
            "priority": priority,
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            # and returns a naive value that is easy to misread as local time.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

        return task.id

    async def delegate_with_retry(
        self,
        from_agent: str,
        capability: str,
        task_description: str,
        inputs: Dict[str, Any],
        priority: int = TaskPriority.NORMAL.value,
    ) -> str:
        """Delegate task with retry logic.

        Wraps :meth:`delegate_task` in ``retry_engine.execute`` and returns
        whatever that call returns. ``priority`` is forwarded unchanged and
        defaults to the same value as :meth:`delegate_task`.

        Raises:
            RuntimeError: If no retry engine was configured.
        """
        if not self.retry_engine:
            raise RuntimeError("Retry engine not configured")

        async def _delegate():
            return await self.delegate_task(
                from_agent, capability, task_description, inputs, priority
            )

        return await self.retry_engine.execute(_delegate)

    async def delegate_with_circuit_breaker(
        self,
        from_agent: str,
        capability: str,
        task_description: str,
        inputs: Dict[str, Any],
        priority: int = TaskPriority.NORMAL.value,
    ) -> str:
        """Delegate task with circuit breaker protection.

        Wraps :meth:`delegate_task` in ``circuit_breaker.call`` and returns
        whatever that call returns. ``priority`` is forwarded unchanged and
        defaults to the same value as :meth:`delegate_task`.

        Raises:
            RuntimeError: If no circuit breaker was configured.
        """
        if not self.circuit_breaker:
            raise RuntimeError("Circuit breaker not configured")

        async def _delegate():
            return await self.delegate_task(
                from_agent, capability, task_description, inputs, priority
            )

        return await self.circuit_breaker.call(_delegate)

# =============================================================================
# 1. Message Queue Integration Tests
# =============================================================================

class TestMessageQueueIntegration:
    """Integration tests for LocalMessageQueue."""

    @staticmethod
    def _process_until(queue, done, timeout: float = 2.0) -> None:
        """Run queue processing until ``done()`` is true or ``timeout`` elapses.

        Polling replaces a fixed sleep so the test finishes as soon as the
        handlers have fired and still tolerates a slow machine. Processing is
        always stopped, even if the wait is interrupted.
        """
        deadline = time.monotonic() + timeout
        queue.start_processing()
        try:
            while not done() and time.monotonic() < deadline:
                time.sleep(0.01)
        finally:
            queue.stop_processing()

    def test_publish_subscribe(self, local_message_queue):
        """Test basic publish/subscribe functionality."""
        received_messages = []

        def handler(message: AgentMessage):
            received_messages.append(message)

        # Subscribe
        local_message_queue.subscribe("test-agent", handler)

        # Publish
        message = AgentMessage(
            from_agent="sender",
            to_agent="test-agent",
            task_id="task-1",
            payload={"data": "test"},
        )
        local_message_queue.publish("test-agent", message)

        # Process until the handler has seen the message (bounded wait)
        self._process_until(local_message_queue, lambda: len(received_messages) >= 1)

        assert len(received_messages) == 1
        assert received_messages[0].task_id == "task-1"

    def test_multiple_subscribers(self, local_message_queue):
        """Test multiple agents subscribing to different queues."""
        agent1_messages = []
        agent2_messages = []

        def handler1(msg):
            agent1_messages.append(msg)

        def handler2(msg):
            agent2_messages.append(msg)

        local_message_queue.subscribe("agent-1", handler1)
        local_message_queue.subscribe("agent-2", handler2)

        # Send to agent-1
        local_message_queue.publish("agent-1", AgentMessage(
            from_agent="sender", to_agent="agent-1", task_id="t1"
        ))

        # Send to agent-2
        local_message_queue.publish("agent-2", AgentMessage(
            from_agent="sender", to_agent="agent-2", task_id="t2"
        ))

        # Process until both handlers have fired (bounded wait)
        self._process_until(
            local_message_queue,
            lambda: len(agent1_messages) >= 1 and len(agent2_messages) >= 1,
        )

        assert len(agent1_messages) == 1
        assert len(agent2_messages) == 1
        assert agent1_messages[0].task_id == "t1"
        assert agent2_messages[0].task_id == "t2"

    def test_message_to_json_roundtrip(self, sample_agent_message):
        """Test message serialization roundtrip."""
        json_str = sample_agent_message.to_json()
        restored = AgentMessage.from_json(json_str)

        assert restored.from_agent == sample_agent_message.from_agent
        assert restored.to_agent == sample_agent_message.to_agent
        assert restored.task_id == sample_agent_message.task_id
        assert restored.payload == sample_agent_message.payload

    def test_publish_to_queue(self, local_message_queue):
        """Test publishing to queue creates entry."""
        for i in range(5):
            local_message_queue.publish("agent", AgentMessage(
                from_agent="sender", to_agent="agent", task_id=f"t{i}"
            ))

        # Verify queue has messages
        q = local_message_queue.get_queue("agent")
        assert q.qsize() == 5

# =============================================================================
# 2. Task Queue Integration Tests
# =============================================================================

class TestTaskQueueIntegration:
    """Integration tests for LocalTaskQueue (synchronous)."""

    def test_enqueue_dequeue(self, local_task_queue, sample_task):
        """A task that was enqueued comes back from dequeue as in-progress."""
        local_task_queue.enqueue(sample_task)

        popped = local_task_queue.dequeue()

        assert popped is not None
        assert popped.id == sample_task.id
        assert popped.status == TaskStatus.IN_PROGRESS.value

    def test_priority_ordering(self, local_task_queue):
        """Tasks come out of the queue in descending priority order."""
        # Submit in an order that differs from the expected dequeue order.
        submitted = [
            Task(description="Low", priority=TaskPriority.LOW.value),
            Task(description="High", priority=TaskPriority.HIGH.value),
            Task(description="Critical", priority=TaskPriority.CRITICAL.value),
        ]
        for item in submitted:
            local_task_queue.enqueue(item, priority=item.priority)

        # Highest priority is expected first.
        drained = [local_task_queue.dequeue() for _ in range(3)]
        first, second, third = drained

        assert first.priority == TaskPriority.CRITICAL.value
        assert second.priority == TaskPriority.HIGH.value
        assert third.priority == TaskPriority.LOW.value

    def test_task_completion(self, local_task_queue, sample_task):
        """Completing a task records the COMPLETED status and its result."""
        local_task_queue.enqueue(sample_task)
        running = local_task_queue.dequeue()

        outcome = {"status": "success", "issues_found": 0}
        local_task_queue.complete(running.id, result=outcome)

        stored = local_task_queue.get_task(running.id)
        assert stored.status == TaskStatus.COMPLETED.value
        assert stored.result == outcome

    def test_task_failure(self, local_task_queue, sample_task):
        """Failing a task without retry records FAILED and the error text."""
        local_task_queue.enqueue(sample_task)
        running = local_task_queue.dequeue()

        local_task_queue.fail(running.id, error="Something went wrong", retry=False)

        stored = local_task_queue.get_task(running.id)
        assert stored.status == TaskStatus.FAILED.value
        assert "Something went wrong" in stored.error

    def test_task_to_dict_roundtrip(self, sample_task):
        """A task survives a to_dict / from_dict round trip."""
        rebuilt = Task.from_dict(sample_task.to_dict())

        assert rebuilt.id == sample_task.id
        assert rebuilt.description == sample_task.description
        assert rebuilt.agent == sample_task.agent
        assert rebuilt.priority == sample_task.priority

    def test_queue_statistics(self, local_task_queue):
        """Queue statistics reflect the number of enqueued tasks."""
        # Enqueue five tasks with default settings.
        for index in range(5):
            local_task_queue.enqueue(Task(description=f"Task {index}"))

        snapshot = local_task_queue.get_stats()

        assert snapshot["ready"] == 5
        assert snapshot["total_tasks"] >= 5

# =============================================================================
# 3. Discovery Service Integration Tests
# =============================================================================

class TestDiscoveryIntegration:
    """Integration tests for DiscoveryService."""

    @pytest.mark.asyncio
    async def test_register_component(self, local_discovery, sample_component):
        """A registered component can be fetched back by its ID."""
        await local_discovery.register(sample_component)

        fetched = await local_discovery.get(sample_component.id)
        assert fetched is not None
        assert fetched.name == sample_component.name

    @pytest.mark.asyncio
    async def test_find_by_capability(self, local_discovery, sample_component):
        """Capability lookup finds registered agents and nothing for unknown names."""
        await local_discovery.register(sample_component)

        # A capability the component advertises.
        hits = await local_discovery.find_by_capability("code_review")
        assert hits.total_matches >= 1
        matched_ids = [component.id for component in hits.components]
        assert sample_component.id in matched_ids

        # A capability nobody advertises.
        misses = await local_discovery.find_by_capability("nonexistent")
        assert misses.total_matches == 0

    @pytest.mark.asyncio
    async def test_multiple_agents_same_capability(self, local_discovery):
        """Two agents sharing one capability are both returned."""

        def make_reviewer(component_id, name):
            return Component(
                id=component_id,
                name=name,
                component_type="agent",
                capabilities=[Capability(name="code_review")],
            )

        reviewers = [
            make_reviewer("agent/reviewer-1", "reviewer-1"),
            make_reviewer("agent/reviewer-2", "reviewer-2"),
        ]
        for reviewer in reviewers:
            await local_discovery.register(reviewer)

        found = await local_discovery.find_by_capability("code_review")
        assert found.total_matches == 2

    @pytest.mark.asyncio
    async def test_unregister_component(self, local_discovery, sample_component):
        """After unregistering, the component can no longer be fetched."""
        await local_discovery.register(sample_component)
        await local_discovery.unregister(sample_component.id)

        fetched = await local_discovery.get(sample_component.id)
        assert fetched is None

    @pytest.mark.asyncio
    async def test_heartbeat(self, local_discovery, sample_component):
        """A heartbeat updates the stored status and load of a component."""
        await local_discovery.register(sample_component)

        await local_discovery.heartbeat(
            sample_component.id,
            ComponentStatus.BUSY,
            load=3,
        )

        refreshed = await local_discovery.get(sample_component.id)
        assert refreshed.status == ComponentStatus.BUSY
        assert refreshed.current_load == 3

    @pytest.mark.asyncio
    async def test_list_all_components(self, local_discovery, sample_component):
        """Listing everything includes at least the registered component."""
        await local_discovery.register(sample_component)

        everything = await local_discovery.list_all()
        assert len(everything) >= 1

# =============================================================================
# 4. Priority Queue Router Integration Tests
# =============================================================================

class TestPriorityQueueRouterIntegration:
    """Integration tests for LocalPriorityQueueRouter."""

    def test_enqueue_to_named_queue(self, local_router):
        """Enqueueing with an explicit queue name lands in that queue."""
        critical = Task(description="Test task", priority=TaskPriority.CRITICAL.value)
        _, landed_in = local_router.enqueue(critical, QUEUE_CRITICAL)

        assert landed_in == QUEUE_CRITICAL
        router_stats = local_router.get_stats()
        assert router_stats.tasks_per_queue.get(QUEUE_CRITICAL, 0) >= 1

    def test_dequeue_priority_order(self, local_router):
        """A plain dequeue serves the critical queue before the background one."""
        # Fill the lowest queue first so insertion order cannot explain the result.
        local_router.enqueue(
            Task(description="Background", priority=TaskPriority.BACKGROUND.value),
            QUEUE_BACKGROUND,
        ) if False else None
        low = Task(description="Background", priority=TaskPriority.BACKGROUND.value)
        urgent = Task(description="Critical", priority=TaskPriority.CRITICAL.value)

        local_router.enqueue(low, QUEUE_BACKGROUND)
        local_router.enqueue(urgent, QUEUE_CRITICAL)

        # Strict priority should return critical first
        popped = local_router.dequeue()
        assert popped is not None
        served_task, served_queue = popped
        assert served_task.description == "Critical"
        assert served_queue == QUEUE_CRITICAL

    def test_custom_routing_rules(self, local_router):
        """A custom rule can redirect a normal-priority task to the critical queue."""

        def mentions_security(candidate):
            return "security" in candidate.description.lower()

        # Rule: anything mentioning security goes to the critical queue.
        local_router.add_rule(RoutingRule(
            name="security-to-critical",
            condition=mentions_security,
            target_queue=QUEUE_CRITICAL,
            priority=5,  # High priority rule
        ))

        scan = Task(description="Security scan needed", priority=TaskPriority.NORMAL.value)

        # No explicit queue: the router decides from its rules.
        _, landed_in = local_router.enqueue(scan)
        assert landed_in == QUEUE_CRITICAL

    def test_dequeue_from_specific_queue(self, local_router):
        """dequeue_from returns the task waiting in the named queue."""
        local_router.enqueue(
            Task(description="Normal task", priority=TaskPriority.NORMAL.value),
            QUEUE_NORMAL,
        )

        popped = local_router.dequeue_from(QUEUE_NORMAL)
        assert popped is not None
        assert popped.description == "Normal task"

    def test_queue_stats(self, local_router):
        """Router statistics count tasks and the default queues."""
        # One task in each of two queues.
        high = Task(description="High 1", priority=TaskPriority.HIGH.value)
        local_router.enqueue(high, QUEUE_HIGH)
        normal = Task(description="Normal 1", priority=TaskPriority.NORMAL.value)
        local_router.enqueue(normal, QUEUE_NORMAL)

        router_stats = local_router.get_stats()
        assert router_stats.total_tasks >= 2
        assert router_stats.total_queues >= 4  # Default queues

# =============================================================================
# 5. Full Delegation Flow Tests
# =============================================================================

class TestFullDelegationFlow:
    """End-to-end tests for agent-to-agent task delegation."""

    @pytest.fixture
    def orchestrator(
        self,
        local_message_queue,
        local_task_queue,
        local_discovery,
        local_router,
        circuit_breaker,
        retry_engine,
    ):
        """Create fully configured orchestrator."""
        return AgentOrchestrator(
            message_queue=local_message_queue,
            task_queue=local_task_queue,
            discovery=local_discovery,
            router=local_router,
            circuit_breaker=circuit_breaker,
            retry_engine=retry_engine,
        )

    @pytest.mark.asyncio
    async def test_simple_delegation(self, orchestrator, sample_component):
        """Test simple task delegation from one agent to another."""
        # Register target agent
        await orchestrator.discovery.register(sample_component)

        # Delegate task
        task_id = await orchestrator.delegate_task(
            from_agent="orchestrator",
            capability="code_review",
            task_description="Review PR #123",
            inputs={"pr_number": 123, "repo": "coditect/core"},
        )

        assert task_id is not None
        assert len(orchestrator.delegation_log) == 1
        entry = orchestrator.delegation_log[0]
        assert entry["to_agent"] == "code-review-agent"
        # The returned ID must be the one that was logged.
        assert entry["task_id"] == task_id

    @pytest.mark.asyncio
    async def test_delegation_no_agent_found(self, orchestrator):
        """Test delegation fails when no agent has capability."""
        with pytest.raises(ValueError, match="No agent found"):
            await orchestrator.delegate_task(
                from_agent="orchestrator",
                capability="nonexistent_capability",
                task_description="Test",
                inputs={},
            )

        # A failed delegation must not leave a log entry behind.
        assert orchestrator.delegation_log == []

    @pytest.mark.asyncio
    async def test_delegation_chain(self, orchestrator):
        """Test chain of delegations (A → B → C)."""
        # Register multiple agents
        agent_a = Component(
            id="agent/planner",
            name="planner",
            component_type="agent",
            capabilities=[Capability(name="planning")],
        )
        agent_b = Component(
            id="agent/coder",
            name="coder",
            component_type="agent",
            capabilities=[Capability(name="coding")],
        )
        agent_c = Component(
            id="agent/tester",
            name="tester",
            component_type="agent",
            capabilities=[Capability(name="testing")],
        )

        await orchestrator.discovery.register(agent_a)
        await orchestrator.discovery.register(agent_b)
        await orchestrator.discovery.register(agent_c)

        # Chain of delegations
        task1 = await orchestrator.delegate_task(
            from_agent="user",
            capability="planning",
            task_description="Plan feature X",
            inputs={"feature": "X"},
        )

        task2 = await orchestrator.delegate_task(
            from_agent="planner",
            capability="coding",
            task_description="Implement feature X",
            inputs={"plan_id": task1},
        )

        task3 = await orchestrator.delegate_task(
            from_agent="coder",
            capability="testing",
            task_description="Test feature X",
            inputs={"code_id": task2},
        )

        log = orchestrator.delegation_log
        assert len(log) == 3
        assert log[0]["to_agent"] == "planner"
        assert log[1]["to_agent"] == "coder"
        assert log[2]["to_agent"] == "tester"
        # Each hop is logged in order under the ID returned to the caller.
        assert [entry["task_id"] for entry in log] == [task1, task2, task3]

    @pytest.mark.asyncio
    async def test_priority_based_delegation(self, orchestrator, sample_component):
        """Test delegation respects priority."""
        await orchestrator.discovery.register(sample_component)

        # Delegate with critical priority
        task_id = await orchestrator.delegate_task(
            from_agent="orchestrator",
            capability="code_review",
            task_description="Emergency review",
            inputs={},
            priority=TaskPriority.CRITICAL.value,
        )

        log_entry = orchestrator.delegation_log[0]
        assert log_entry["task_id"] == task_id
        assert log_entry["priority"] == TaskPriority.CRITICAL.value
        assert log_entry["queue"] == QUEUE_CRITICAL

    @pytest.mark.asyncio
    async def test_multiple_tasks_to_same_agent(self, orchestrator, sample_component):
        """Test sending multiple tasks to the same agent."""
        await orchestrator.discovery.register(sample_component)

        tasks = []
        for i in range(5):
            task_id = await orchestrator.delegate_task(
                from_agent="batch-processor",
                capability="code_review",
                task_description=f"Review file {i}",
                inputs={"file_index": i},
            )
            tasks.append(task_id)

        assert len(tasks) == 5
        assert len(set(tasks)) == 5  # All unique IDs

# =============================================================================
# 6. Circuit Breaker Integration Tests
# =============================================================================

class TestCircuitBreakerIntegration:
    """Integration tests for circuit breaker with delegation."""

    @pytest.fixture
    def orchestrator_with_breaker(
        self,
        local_message_queue,
        local_task_queue,
        local_discovery,
        local_router,
    ):
        """Orchestrator whose breaker allows two failures and recovers after 0.5."""
        breaker_config = CircuitBreakerConfig(
            fail_max=2,
            recovery_timeout=0.5,
        )
        delegation_breaker = CircuitBreaker("delegation-breaker", breaker_config)
        return AgentOrchestrator(
            message_queue=local_message_queue,
            task_queue=local_task_queue,
            discovery=local_discovery,
            router=local_router,
            circuit_breaker=delegation_breaker,
        )

    @staticmethod
    async def _attempt_failing_delegations(orchestrator, attempts=3):
        """Issue delegations for an unregistered capability, ignoring the errors.

        Only ``ValueError`` and ``CircuitOpenError`` are tolerated; anything
        else propagates to the calling test.
        """
        remaining = attempts
        while remaining > 0:
            remaining -= 1
            try:
                await orchestrator.delegate_with_circuit_breaker(
                    from_agent="test",
                    capability="nonexistent",
                    task_description="Will fail",
                    inputs={},
                )
            except (ValueError, CircuitOpenError):
                continue

    @pytest.mark.asyncio
    async def test_circuit_opens_on_failures(self, orchestrator_with_breaker):
        """Repeated failed delegations leave the breaker open."""
        # Nothing is registered, so every attempt fails.
        await self._attempt_failing_delegations(orchestrator_with_breaker)

        # Circuit should be open now
        breaker = orchestrator_with_breaker.circuit_breaker
        assert breaker.state == CircuitState.OPEN

    @pytest.mark.asyncio
    async def test_circuit_half_open_recovery(self, orchestrator_with_breaker, sample_component):
        """After the recovery timeout a delegation to a real agent goes through."""
        # Fail often enough to open the circuit.
        await self._attempt_failing_delegations(orchestrator_with_breaker)

        # Sleep slightly longer than the configured recovery timeout.
        await asyncio.sleep(0.6)

        # Make a matching agent available before the next attempt.
        await orchestrator_with_breaker.discovery.register(sample_component)

        # Circuit should allow test call
        delegated_id = await orchestrator_with_breaker.delegate_with_circuit_breaker(
            from_agent="test",
            capability="code_review",
            task_description="Should work",
            inputs={},
        )

        assert delegated_id is not None

    def test_circuit_breaker_states(self, circuit_breaker):
        """A new breaker reports closed, not open and not half-open."""
        assert circuit_breaker.is_closed
        assert not circuit_breaker.is_open
        assert not circuit_breaker.is_half_open

    def test_circuit_breaker_metrics(self, circuit_breaker):
        """A new breaker reports zero calls and zero failures."""
        snapshot = circuit_breaker.get_metrics()
        assert snapshot.total_calls == 0
        assert snapshot.failure_count == 0

# =============================================================================
# 7. Retry Engine Integration Tests
# =============================================================================

class TestRetryEngineIntegration:
    """Integration tests for retry engine with delegation."""

    @pytest.fixture
    def orchestrator_with_retry(
        self,
        local_message_queue,
        local_task_queue,
        local_discovery,
        local_router,
    ):
        """Orchestrator wired with a three-attempt, jitter-free retry engine."""
        retry_config = RetryConfig(
            max_attempts=3,
            base_delay=0.01,
            jitter=JitterType.NONE,
        )
        engine = RetryEngine(retry_config)
        return AgentOrchestrator(
            message_queue=local_message_queue,
            task_queue=local_task_queue,
            discovery=local_discovery,
            router=local_router,
            retry_engine=engine,
        )

    @pytest.mark.asyncio
    async def test_successful_delegation_with_retry(
        self,
        orchestrator_with_retry,
        sample_component,
    ):
        """A delegation that succeeds immediately records no retries."""
        await orchestrator_with_retry.discovery.register(sample_component)

        delegated_id = await orchestrator_with_retry.delegate_with_retry(
            from_agent="test",
            capability="code_review",
            task_description="Test",
            inputs={},
        )

        assert delegated_id is not None
        # First attempt succeeded, so the retry counter stays at zero.
        engine_metrics = orchestrator_with_retry.retry_engine.get_metrics()
        assert engine_metrics.total_retries == 0

    @pytest.mark.asyncio
    async def test_retry_exhausted(self, orchestrator_with_retry):
        """A delegation that can never succeed ends in MaxRetriesExceeded."""
        with pytest.raises(MaxRetriesExceeded):
            await orchestrator_with_retry.delegate_with_retry(
                from_agent="test",
                capability="nonexistent",
                task_description="Will always fail",
                inputs={},
            )

    def test_retry_config_validation(self):
        """RetryConfig accepts sane values and rejects zero attempts."""
        # Accepted configuration
        accepted = RetryConfig(max_attempts=5, base_delay=1.0)
        assert accepted.max_attempts == 5

        # Rejected configuration
        with pytest.raises(ValueError):
            RetryConfig(max_attempts=0)

# =============================================================================
# 8. Fault Tolerance Tests
# =============================================================================

class TestFaultTolerance:
    """Tests for fault tolerance scenarios."""

    @pytest.mark.asyncio
    async def test_agent_unavailable_fallback(self, local_discovery):
        """Only the available agent is returned when the primary is offline."""

        def make_reviewer(component_id, name, status):
            return Component(
                id=component_id,
                name=name,
                component_type="agent",
                capabilities=[Capability(name="code_review")],
                status=status,
            )

        # Primary is offline, fallback is available.
        offline_primary = make_reviewer(
            "agent/primary-reviewer", "primary-reviewer", ComponentStatus.OFFLINE
        )
        ready_fallback = make_reviewer(
            "agent/fallback-reviewer", "fallback-reviewer", ComponentStatus.AVAILABLE
        )

        await local_discovery.register(offline_primary)
        await local_discovery.register(ready_fallback)

        # Find available agents - default filters to AVAILABLE status
        lookup = await local_discovery.find_by_capability("code_review")
        candidates = lookup.components

        assert len(candidates) == 1
        assert candidates[0].name == "fallback-reviewer"

    def test_task_queue_resilience(self, local_task_queue):
        """Completing an unknown task ID neither raises nor disturbs the queue."""
        local_task_queue.enqueue(Task(description="Test task"))

        # Try to complete non-existent task - should not crash
        local_task_queue.complete("non-existent-id")

        # The task enqueued above is still counted as ready.
        snapshot = local_task_queue.get_stats()
        assert snapshot["ready"] >= 1

    def test_message_queue_resilience(self, local_message_queue):
        """Publishing to a queue nobody subscribed to creates that queue."""
        orphan_message = AgentMessage(
            from_agent="test",
            to_agent="unknown-agent",
            task_id="test",
        )
        # Publish to non-subscribed queue - should not crash
        local_message_queue.publish("unknown-agent", orphan_message)

        # Should have created the queue
        created = local_message_queue.get_queue("unknown-agent")
        assert created is not None

# =============================================================================
# 9. Performance Tests
# =============================================================================

class TestPerformance:
    """Performance tests for delegation system.

    Intervals are measured with ``time.perf_counter()``: it is monotonic and
    high-resolution, whereas ``time.time()`` can jump with wall-clock
    adjustments and may be too coarse to register a short run at all.
    """

    @pytest.mark.asyncio
    async def test_throughput(
        self,
        local_message_queue,
        local_task_queue,
        local_discovery,
        local_router,
        sample_component,
    ):
        """Test system throughput."""
        orchestrator = AgentOrchestrator(
            message_queue=local_message_queue,
            task_queue=local_task_queue,
            discovery=local_discovery,
            router=local_router,
        )

        await local_discovery.register(sample_component)

        num_tasks = 100
        start_time = time.perf_counter()

        for i in range(num_tasks):
            await orchestrator.delegate_task(
                from_agent="perf-test",
                capability="code_review",
                task_description=f"Task {i}",
                inputs={"index": i},
            )

        elapsed = time.perf_counter() - start_time
        # Guard the division: a zero interval means "faster than measurable".
        throughput = num_tasks / elapsed if elapsed > 0 else float("inf")

        # Should handle at least 100 delegations per second locally
        assert throughput > 100, f"Throughput too low: {throughput:.2f} tasks/sec"

    @pytest.mark.asyncio
    async def test_concurrent_delegations(
        self,
        local_message_queue,
        local_task_queue,
        local_discovery,
        local_router,
        sample_component,
    ):
        """Test concurrent task delegations."""
        orchestrator = AgentOrchestrator(
            message_queue=local_message_queue,
            task_queue=local_task_queue,
            discovery=local_discovery,
            router=local_router,
        )

        await local_discovery.register(sample_component)

        async def delegate_one(i):
            return await orchestrator.delegate_task(
                from_agent="concurrent-test",
                capability="code_review",
                task_description=f"Concurrent task {i}",
                inputs={"index": i},
            )

        # Run 50 concurrent delegations
        tasks = [delegate_one(i) for i in range(50)]
        results = await asyncio.gather(*tasks)

        # All should succeed
        assert len(results) == 50
        assert all(r is not None for r in results)

    def test_router_performance(self, local_router):
        """Test router enqueue/dequeue performance."""
        num_tasks = 1000

        start_time = time.perf_counter()
        for i in range(num_tasks):
            task = Task(description=f"Task {i}", priority=TaskPriority.NORMAL.value)
            local_router.enqueue(task, QUEUE_NORMAL)
        enqueue_time = time.perf_counter() - start_time

        start_time = time.perf_counter()
        for _ in range(num_tasks):
            local_router.dequeue()
        dequeue_time = time.perf_counter() - start_time

        # Should be fast locally
        assert enqueue_time < 1.0, f"Enqueue too slow: {enqueue_time:.2f}s"
        assert dequeue_time < 1.0, f"Dequeue too slow: {dequeue_time:.2f}s"

# =============================================================================
# 10. End-to-End Scenarios
# =============================================================================

class TestEndToEndScenarios:
    """End-to-end test scenarios."""

    @pytest.mark.asyncio
    async def test_code_review_workflow(
        self,
        local_message_queue,
        local_task_queue,
        local_discovery,
        local_router,
    ):
        """Test complete code review workflow.

        Registers three agents, each with a single distinct capability,
        then chains three delegations (analysis -> security scan ->
        final review), feeding earlier results into later inputs. The
        delegation log must contain three entries whose ``to_agent``
        matches the agent registered for each capability, in order.
        """
        # Setup agents
        code_analyzer = Component(
            id="agent/code-analyzer",
            name="code-analyzer",
            component_type="agent",
            capabilities=[Capability(name="code_analysis")],
        )
        security_scanner = Component(
            id="agent/security-scanner",
            name="security-scanner",
            component_type="agent",
            capabilities=[Capability(name="security_scan")],
        )
        reviewer = Component(
            id="agent/reviewer",
            name="reviewer",
            component_type="agent",
            capabilities=[Capability(name="code_review")],
        )

        await local_discovery.register(code_analyzer)
        await local_discovery.register(security_scanner)
        await local_discovery.register(reviewer)

        orchestrator = AgentOrchestrator(
            message_queue=local_message_queue,
            task_queue=local_task_queue,
            discovery=local_discovery,
            router=local_router,
        )

        # Step 1: Analyze code
        analysis_task = await orchestrator.delegate_task(
            from_agent="user",
            capability="code_analysis",
            task_description="Analyze PR #456",
            inputs={"pr": 456, "repo": "coditect/core"},
        )

        # Step 2: Security scan
        security_task = await orchestrator.delegate_task(
            from_agent="code-analyzer",
            capability="security_scan",
            task_description="Security scan for PR #456",
            inputs={"analysis_id": analysis_task},
        )

        # Step 3: Final review (its return value is not needed afterwards)
        await orchestrator.delegate_task(
            from_agent="security-scanner",
            capability="code_review",
            task_description="Final review for PR #456",
            inputs={
                "analysis_id": analysis_task,
                "security_id": security_task,
            },
        )

        # Verify workflow
        assert len(orchestrator.delegation_log) == 3
        assert orchestrator.delegation_log[0]["to_agent"] == "code-analyzer"
        assert orchestrator.delegation_log[1]["to_agent"] == "security-scanner"
        assert orchestrator.delegation_log[2]["to_agent"] == "reviewer"

    @pytest.mark.asyncio
    async def test_multi_agent_parallel_processing(
        self,
        local_message_queue,
        local_task_queue,
        local_discovery,
        local_router,
    ):
        """Test parallel processing across multiple agents.

        Registers three agents that all offer ``data_processing`` and
        delegates ten chunks concurrently with ``asyncio.gather``, so
        the delegations actually overlap instead of being awaited one
        after another. Ten results and ten log entries are expected.
        """
        num_chunks = 10

        # Register multiple specialists
        for i in range(3):
            agent = Component(
                id=f"agent/processor-{i}",
                name=f"processor-{i}",
                component_type="agent",
                capabilities=[Capability(name="data_processing")],
            )
            await local_discovery.register(agent)

        orchestrator = AgentOrchestrator(
            message_queue=local_message_queue,
            task_queue=local_task_queue,
            discovery=local_discovery,
            router=local_router,
        )

        async def delegate_chunk(i):
            return await orchestrator.delegate_task(
                from_agent="coordinator",
                capability="data_processing",
                task_description=f"Process chunk {i}",
                inputs={"chunk": i, "total": num_chunks},
            )

        # Delegate multiple tasks in parallel
        results = await asyncio.gather(
            *(delegate_chunk(i) for i in range(num_chunks))
        )

        assert len(results) == 10
        assert len(orchestrator.delegation_log) == 10

    @pytest.mark.asyncio
    async def test_priority_escalation_flow(
        self,
        local_message_queue,
        local_task_queue,
        local_discovery,
        local_router,
        sample_component,
    ):
        """Test task priority escalation.

        Delegates one normal-priority task and then a critical-priority
        follow-up that references the first. The delegation log must
        record both priorities and the corresponding queues
        (``QUEUE_NORMAL`` then ``QUEUE_CRITICAL``).
        """
        await local_discovery.register(sample_component)

        orchestrator = AgentOrchestrator(
            message_queue=local_message_queue,
            task_queue=local_task_queue,
            discovery=local_discovery,
            router=local_router,
        )

        # Start with normal priority
        task1 = await orchestrator.delegate_task(
            from_agent="user",
            capability="code_review",
            task_description="Normal review",
            inputs={},
            priority=TaskPriority.NORMAL.value,
        )

        # Escalate to critical (only the logged entry is inspected below)
        await orchestrator.delegate_task(
            from_agent="user",
            capability="code_review",
            task_description="Urgent fix needed",
            inputs={"escalated_from": task1},
            priority=TaskPriority.CRITICAL.value,
        )

        # Verify priorities
        assert orchestrator.delegation_log[0]["priority"] == TaskPriority.NORMAL.value
        assert orchestrator.delegation_log[1]["priority"] == TaskPriority.CRITICAL.value
        assert orchestrator.delegation_log[0]["queue"] == QUEUE_NORMAL
        assert orchestrator.delegation_log[1]["queue"] == QUEUE_CRITICAL

# =============================================================================
# Run tests
# =============================================================================

# Allow running this test module directly as a script: hand this file's
# own path to pytest with verbose output and short tracebacks.
if __name__ == "__main__":
    pytest.main([__file__, "-v", "--tb=short"])