CODITECT Agent-to-Agent Task Delegation Tests

Part of Track H.2.6: Test agent-to-agent task delegation.

Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications.

This module provides comprehensive integration tests for the full agent-to-agent task delegation flow using all H.2 components:

  • H.2.1: MessageBus (message_bus.py)
  • H.2.2: TaskQueueManager (task_queue_manager.py)
  • H.2.3: PriorityQueueRouter (priority_queue_router.py)
  • H.2.4: CircuitBreaker (circuit_breaker.py)
  • H.2.5: RetryEngine (retry_engine.py)
  • Supporting: DiscoveryService (discovery_service.py)

Test Categories:

  1. Message Bus Integration Tests
  2. Task Queue Integration Tests
  3. Discovery Service Integration Tests
  4. Priority Queue Router Integration Tests
  5. Full Delegation Flow Tests
  6. Circuit Breaker Integration Tests
  7. Retry Engine Integration Tests
  8. Fault Tolerance Tests
  9. Performance Tests
  10. End-to-End Scenarios

Run: pytest scripts/core/test_agent_delegation.py -v

Author: CODITECT Framework
Created: January 8, 2026
Version: 1.0.0

File: test_agent_delegation.py

Classes

AgentOrchestrator

Orchestrates agent-to-agent task delegation for testing.

TestMessageQueueIntegration

Integration tests for LocalMessageQueue.

TestTaskQueueIntegration

Integration tests for LocalTaskQueue (synchronous).

TestDiscoveryIntegration

Integration tests for DiscoveryService.

TestPriorityQueueRouterIntegration

Integration tests for LocalPriorityQueueRouter.

TestFullDelegationFlow

End-to-end tests for agent-to-agent task delegation.

TestCircuitBreakerIntegration

Integration tests for circuit breaker with delegation.
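For readers unfamiliar with the pattern these tests exercise, the following is a minimal sketch of a generic circuit breaker. It is not the API of circuit_breaker.py: the names DemoCircuitBreaker, CircuitOpenError, failure_threshold, recovery_timeout, and the injectable clock are all assumptions made for illustration.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open (hypothetical name)."""


class DemoCircuitBreaker:
    """Illustrative stand-in only; the real CircuitBreaker may differ."""

    def __init__(self, failure_threshold=3, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "closed"
        self._failures = 0
        self._opened_at = 0.0
        self._clock = clock  # injectable so tests need not sleep

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            # After the recovery timeout, let one trial call through.
            if self._clock() - self._opened_at >= self.recovery_timeout:
                self.state = "half_open"
            else:
                raise CircuitOpenError("circuit is open; call rejected")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self.state == "half_open" or self._failures >= self.failure_threshold:
                self.state = "open"
                self._opened_at = self._clock()
            raise
        # Any success resets the breaker.
        self._failures = 0
        self.state = "closed"
        return result
```

Injecting the clock lets a test advance time deterministically instead of waiting for the recovery timeout to elapse.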

TestRetryEngineIntegration

Integration tests for retry engine with delegation.
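As a rough illustration of the behavior a retry engine test typically checks, here is a minimal sketch of retrying with exponential backoff. The function name retry_with_backoff and its parameters are hypothetical and are not taken from retry_engine.py; the backoff schedule (doubling from base_delay) is likewise an assumption.

```python
import time


def retry_with_backoff(func, max_attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call func until it succeeds or max_attempts is exhausted (illustrative only)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles each time: base, 2*base, 4*base, ...
            sleep(base_delay * 2 ** (attempt - 1))
```

Passing a fake sleep function lets a test record the delays without slowing the suite down.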

TestFaultTolerance

Tests for fault tolerance scenarios.

TestPerformance

Performance tests for delegation system.

Functions

local_message_queue()

Local message queue for testing.

local_task_queue()

Local task queue for testing.

local_discovery()

Local discovery service for testing.

local_router()

Local priority queue router for testing.

circuit_breaker()

Circuit breaker for testing.

retry_engine()

Retry engine for testing.

sample_agent_message()

Sample agent message for testing.

sample_task()

Sample task for testing.

sample_component()

Sample component for testing.

test_publish_subscribe(local_message_queue)

Test basic publish/subscribe functionality.

test_multiple_subscribers(local_message_queue)

Test multiple agents subscribing to different queues.
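The publish/subscribe behavior covered by these two tests can be pictured with a small in-memory queue. This is a sketch under assumptions: InMemoryMessageQueue and its subscribe and publish methods are hypothetical stand-ins, not the interface of LocalMessageQueue.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class InMemoryMessageQueue:
    """Hypothetical stand-in; not the module's LocalMessageQueue."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, queue_name: str, callback: Callable[[dict], None]) -> None:
        self._subscribers[queue_name].append(callback)

    def publish(self, queue_name: str, message: dict) -> int:
        # Deliver only to callbacks registered on this queue name.
        callbacks = self._subscribers.get(queue_name, [])
        for callback in callbacks:
            callback(message)
        return len(callbacks)  # number of subscribers reached


def demo_publish_subscribe():
    queue = InMemoryMessageQueue()
    inbox_a, inbox_b = [], []
    queue.subscribe("agent-a", inbox_a.append)
    queue.subscribe("agent-b", inbox_b.append)
    queue.publish("agent-a", {"task": "review"})
    return inbox_a, inbox_b
```

In this sketch a message published to one agent's queue reaches only that agent's subscribers, which is the isolation property a multiple-subscriber test would assert.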

test_message_to_json_roundtrip(sample_agent_message)

Test message serialization roundtrip.
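A serialization roundtrip test usually asserts that decoding an encoded message yields an equal object. The sketch below shows the idea with a hypothetical DemoMessage dataclass; its fields and the to_json and from_json methods are assumptions and may not match the real agent message type.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class DemoMessage:
    """Hypothetical message shape, for illustration only."""

    from_agent: str
    to_agent: str
    payload: dict = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_json(cls, raw: str) -> "DemoMessage":
        return cls(**json.loads(raw))


def roundtrip(message: DemoMessage) -> DemoMessage:
    # Encode to JSON text, then decode back into a new instance.
    return DemoMessage.from_json(message.to_json())
```

Because dataclasses compare by field values, `roundtrip(msg) == msg` holds whenever every field survives JSON encoding unchanged.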

test_publish_to_queue(local_message_queue)

Test publishing to queue creates entry.

test_enqueue_dequeue(local_task_queue, sample_task)

Test basic enqueue/dequeue functionality.

test_priority_ordering(local_task_queue)

Test tasks are dequeued by priority.
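Priority-ordered dequeuing can be sketched with the standard library heapq module. DemoTaskQueue is a hypothetical stand-in for LocalTaskQueue, and the convention that a larger number means higher priority is an assumption; the real module may order priorities differently.

```python
import heapq
import itertools
from typing import Any, List, Optional, Tuple


class DemoTaskQueue:
    """Illustrative priority queue; not the module's LocalTaskQueue."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        self._counter = itertools.count()  # tie-breaker keeps FIFO order

    def enqueue(self, task: Any, priority: int) -> None:
        # heapq is a min-heap, so negate priority to pop the largest first
        # (assumption: larger number = more urgent).
        heapq.heappush(self._heap, (-priority, next(self._counter), task))

    def dequeue(self) -> Optional[Any]:
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]
```

The counter guarantees that tasks with equal priority come out in insertion order and that the task objects themselves are never compared.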

Usage

python test_agent_delegation.py