Skip to main content

#!/usr/bin/env python3 """ CODITECT Task Dispatch Latency Validation

Part of Track H.2.7: Validate <5s task dispatch latency Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications

This module validates that the task dispatch latency meets the <5s SLA across all H.2 components in various scenarios.

Target SLA: <5000ms (5 seconds) end-to-end task dispatch latency

Metrics Collected: - End-to-end dispatch latency (task submission → agent receives) - Component-level latencies (Discovery, MessageBus, TaskQueue, Router) - Percentiles: P50, P95, P99, P99.9 - Latency under load (concurrent requests) - Latency variance and standard deviation

Test Scenarios: 1. Single task dispatch (baseline) 2. Sequential task dispatch (N tasks) 3. Concurrent task dispatch (parallel load) 4. Component-level latency breakdown 5. Stress test (sustained load) 6. Burst test (sudden spike)

Run: pytest scripts/core/test_dispatch_latency.py -v python scripts/core/test_dispatch_latency.py --benchmark

Author: CODITECT Framework Created: January 8, 2026 Version: 1.0.0 """

import asyncio import argparse import json import logging import os import statistics import sys import time from dataclasses import dataclass, field, asdict from datetime import datetime from pathlib import Path from typing import Dict, List, Any, Optional, Callable, Tuple from concurrent.futures import ThreadPoolExecutor import pytest

Add project root to path for imports

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(file))))

Import H.2 components

from scripts.core.message_bus import ( LocalMessageQueue, AgentMessage, MessageType, MessagePriority, ) from scripts.core.task_queue_manager import ( LocalTaskQueue, Task, TaskStatus, TaskPriority, ) from scripts.core.priority_queue_router import ( LocalPriorityQueueRouter, RoutingRule, ) from scripts.core.discovery_service import ( DiscoveryService, Component, Capability, ComponentStatus, DiscoveryResult, ) from scripts.core.circuit_breaker import ( CircuitBreaker, CircuitBreakerConfig, ) from scripts.core.retry_engine import ( RetryEngine, RetryConfig, JitterType, )

=============================================================================

Constants

=============================================================================

Target SLA: 5 seconds (5000ms)

TARGET_LATENCY_MS = 5000

Warning threshold: 80% of SLA

WARNING_LATENCY_MS = 4000

Component latency budgets (should sum to < TARGET_LATENCY_MS)

LATENCY_BUDGET = { "discovery": 500, # 500ms for agent discovery "routing": 100, # 100ms for queue routing "task_queue": 200, # 200ms for task queue operations "message_bus": 300, # 300ms for message delivery "overhead": 100, # 100ms for orchestration overhead # Total budget: 1200ms (well under 5000ms target) }

Queue names

QUEUE_CRITICAL = "critical" QUEUE_HIGH = "high" QUEUE_NORMAL = "normal" QUEUE_BACKGROUND = "background"

Configure logging

logging.basicConfig(level=logging.INFO) logger = logging.getLogger(name)

=============================================================================

Data Classes

=============================================================================

@dataclass class LatencyMeasurement: """Single latency measurement with breakdown.""" total_ms: float discovery_ms: float = 0.0 routing_ms: float = 0.0 task_queue_ms: float = 0.0 message_bus_ms: float = 0.0 overhead_ms: float = 0.0 timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat()) task_id: str = "" success: bool = True error: Optional[str] = None

@dataclass class LatencyStats: """Statistical summary of latency measurements.""" count: int min_ms: float max_ms: float mean_ms: float median_ms: float std_dev_ms: float p50_ms: float p95_ms: float p99_ms: float p999_ms: float within_sla: int sla_percentage: float

def to_dict(self) -> Dict[str, Any]:
return asdict(self)

@dataclass class BenchmarkResult: """Complete benchmark result.""" scenario: str target_sla_ms: float stats: LatencyStats component_stats: Dict[str, LatencyStats] measurements: List[LatencyMeasurement] passed: bool timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat()) duration_seconds: float = 0.0 throughput_per_second: float = 0.0

def to_dict(self) -> Dict[str, Any]:
return {
"scenario": self.scenario,
"target_sla_ms": self.target_sla_ms,
"stats": self.stats.to_dict(),
"component_stats": {k: v.to_dict() for k, v in self.component_stats.items()},
"measurement_count": len(self.measurements),
"passed": self.passed,
"timestamp": self.timestamp,
"duration_seconds": self.duration_seconds,
"throughput_per_second": self.throughput_per_second,
}

=============================================================================

Latency Measurement Utilities

=============================================================================

def calculate_stats(measurements: List[float]) -> LatencyStats: """Calculate statistical summary of latency measurements.""" if not measurements: return LatencyStats( count=0, min_ms=0, max_ms=0, mean_ms=0, median_ms=0, std_dev_ms=0, p50_ms=0, p95_ms=0, p99_ms=0, p999_ms=0, within_sla=0, sla_percentage=0.0 )

sorted_measurements = sorted(measurements)
n = len(sorted_measurements)

within_sla = sum(1 for m in measurements if m <= TARGET_LATENCY_MS)

def percentile(data: List[float], p: float) -> float:
"""Calculate percentile value."""
k = (len(data) - 1) * (p / 100)
f = int(k)
c = f + 1
if c >= len(data):
return data[-1]
return data[f] + (k - f) * (data[c] - data[f])

return LatencyStats(
count=n,
min_ms=min(measurements),
max_ms=max(measurements),
mean_ms=statistics.mean(measurements),
median_ms=statistics.median(measurements),
std_dev_ms=statistics.stdev(measurements) if n > 1 else 0.0,
p50_ms=percentile(sorted_measurements, 50),
p95_ms=percentile(sorted_measurements, 95),
p99_ms=percentile(sorted_measurements, 99),
p999_ms=percentile(sorted_measurements, 99.9),
within_sla=within_sla,
sla_percentage=(within_sla / n) * 100 if n > 0 else 0.0,
)

class LatencyTimer: """Context manager for measuring latency with component breakdown."""

def __init__(self):
self.start_time: float = 0
self.end_time: float = 0
self.checkpoints: Dict[str, float] = {}
self._checkpoint_order: List[str] = []

def __enter__(self):
self.start_time = time.perf_counter()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.end_time = time.perf_counter()
return False

def checkpoint(self, name: str):
"""Record a checkpoint time."""
self.checkpoints[name] = time.perf_counter()
self._checkpoint_order.append(name)

@property
def total_ms(self) -> float:
"""Total elapsed time in milliseconds."""
return (self.end_time - self.start_time) * 1000

def get_segment_ms(self, name: str) -> float:
"""Get time for a specific segment in milliseconds."""
if name not in self.checkpoints:
return 0.0

idx = self._checkpoint_order.index(name)
if idx == 0:
start = self.start_time
else:
prev_name = self._checkpoint_order[idx - 1]
start = self.checkpoints[prev_name]

return (self.checkpoints[name] - start) * 1000

=============================================================================

Latency Validator

=============================================================================

class DispatchLatencyValidator: """ Validates task dispatch latency against SLA targets.

Measures the full dispatch flow:
1. Discovery: Find agent by capability
2. Routing: Route task to appropriate queue
3. TaskQueue: Enqueue task for tracking
4. MessageBus: Send message to agent
"""

def __init__(
self,
message_queue: Optional[LocalMessageQueue] = None,
task_queue: Optional[LocalTaskQueue] = None,
discovery: Optional[DiscoveryService] = None,
router: Optional[LocalPriorityQueueRouter] = None,
):
self.message_queue = message_queue or LocalMessageQueue()
self.task_queue = task_queue or LocalTaskQueue()
self.discovery = discovery or DiscoveryService(force_local=True)
self.router = router or LocalPriorityQueueRouter()
self.measurements: List[LatencyMeasurement] = []

def _get_queue_for_priority(self, priority: int) -> str:
"""Determine queue name based on priority."""
if priority >= TaskPriority.CRITICAL.value:
return QUEUE_CRITICAL
elif priority >= TaskPriority.HIGH.value:
return QUEUE_HIGH
elif priority >= TaskPriority.LOW.value:
return QUEUE_NORMAL
else:
return QUEUE_BACKGROUND

async def measure_dispatch(
self,
capability: str,
task_description: str,
priority: int = TaskPriority.NORMAL.value,
) -> LatencyMeasurement:
"""
Measure latency of a single task dispatch.

Returns detailed breakdown of component latencies.
"""
timer = LatencyTimer()
measurement = LatencyMeasurement(total_ms=0)

try:
with timer:
# 1. Discovery: Find agent by capability
result = await self.discovery.find_by_capability(capability)
timer.checkpoint("discovery")

agents = result.components if isinstance(result, DiscoveryResult) else result
if not agents:
raise ValueError(f"No agent found with capability: {capability}")

target_agent = agents[0]

# 2. Create task and determine queue
task = Task(
description=task_description,
agent=target_agent.name,
priority=priority,
)
queue_name = self._get_queue_for_priority(priority)

# 3. Routing: Enqueue to priority router
self.router.enqueue(task, queue_name)
timer.checkpoint("routing")

# 4. TaskQueue: Add to task queue for tracking
self.task_queue.enqueue(task, priority=priority)
timer.checkpoint("task_queue")

# 5. MessageBus: Send message to agent
message = AgentMessage(
from_agent="latency-validator",
to_agent=target_agent.name,
task_id=task.id,
message_type=MessageType.TASK_REQUEST.value,
payload={"task": task.to_dict()},
priority=priority,
)
self.message_queue.publish(target_agent.name, message)
timer.checkpoint("message_bus")

# Calculate component latencies
measurement = LatencyMeasurement(
total_ms=timer.total_ms,
discovery_ms=timer.get_segment_ms("discovery"),
routing_ms=timer.get_segment_ms("routing"),
task_queue_ms=timer.get_segment_ms("task_queue"),
message_bus_ms=timer.get_segment_ms("message_bus"),
overhead_ms=timer.total_ms - sum([
timer.get_segment_ms("discovery"),
timer.get_segment_ms("routing"),
timer.get_segment_ms("task_queue"),
timer.get_segment_ms("message_bus"),
]),
task_id=task.id,
success=True,
)

except Exception as e:
measurement = LatencyMeasurement(
total_ms=timer.total_ms if timer.end_time else 0,
success=False,
error=str(e),
)

self.measurements.append(measurement)
return measurement

async def run_benchmark(
self,
scenario: str,
num_tasks: int,
capability: str,
concurrent: bool = False,
concurrency: int = 10,
) -> BenchmarkResult:
"""
Run a latency benchmark scenario.

Args:
scenario: Name of the benchmark scenario
num_tasks: Number of tasks to dispatch
capability: Capability to search for
concurrent: Whether to dispatch concurrently
concurrency: Number of concurrent tasks

Returns:
BenchmarkResult with statistics
"""
self.measurements = [] # Reset measurements
start_time = time.time()

if concurrent:
# Concurrent dispatch
semaphore = asyncio.Semaphore(concurrency)

async def dispatch_with_limit(i: int):
async with semaphore:
return await self.measure_dispatch(
capability=capability,
task_description=f"Benchmark task {i}",
)

await asyncio.gather(*[dispatch_with_limit(i) for i in range(num_tasks)])
else:
# Sequential dispatch
for i in range(num_tasks):
await self.measure_dispatch(
capability=capability,
task_description=f"Benchmark task {i}",
)

duration = time.time() - start_time

# Calculate statistics
total_latencies = [m.total_ms for m in self.measurements if m.success]
stats = calculate_stats(total_latencies)

# Calculate component statistics
component_stats = {}
for component in ["discovery", "routing", "task_queue", "message_bus", "overhead"]:
component_latencies = [
getattr(m, f"{component}_ms")
for m in self.measurements if m.success
]
component_stats[component] = calculate_stats(component_latencies)

# Determine if benchmark passed
passed = stats.p99_ms <= TARGET_LATENCY_MS and stats.sla_percentage >= 99.0

return BenchmarkResult(
scenario=scenario,
target_sla_ms=TARGET_LATENCY_MS,
stats=stats,
component_stats=component_stats,
measurements=self.measurements,
passed=passed,
duration_seconds=duration,
throughput_per_second=num_tasks / duration if duration > 0 else 0,
)

def get_stats(self) -> LatencyStats:
"""Get statistics for all measurements."""
total_latencies = [m.total_ms for m in self.measurements if m.success]
return calculate_stats(total_latencies)

def reset(self):
"""Reset all measurements."""
self.measurements = []

=============================================================================

Test Fixtures

=============================================================================

@pytest.fixture def message_queue(): """Local message queue for testing.""" return LocalMessageQueue()

@pytest.fixture def task_queue(): """Local task queue for testing.""" return LocalTaskQueue()

@pytest.fixture def discovery(): """Local discovery service for testing.""" return DiscoveryService(force_local=True)

@pytest.fixture def router(): """Local priority queue router for testing.""" return LocalPriorityQueueRouter()

@pytest.fixture def sample_agent(): """Sample agent component for testing.""" return Component( id="agent/benchmark-agent", name="benchmark-agent", component_type="agent", capabilities=[ Capability(name="benchmark", description="Benchmark testing"), Capability(name="code_review", description="Code review"), ], status=ComponentStatus.AVAILABLE, max_concurrency=100, )

@pytest.fixture async def validator(message_queue, task_queue, discovery, router, sample_agent): """Configured latency validator with sample agent.""" await discovery.register(sample_agent) return DispatchLatencyValidator( message_queue=message_queue, task_queue=task_queue, discovery=discovery, router=router, )

=============================================================================

Test Cases

=============================================================================

class TestDispatchLatencySLA: """Tests for task dispatch latency SLA compliance."""

@pytest.mark.asyncio
async def test_single_dispatch_under_sla(self, validator):
"""Test single task dispatch is under SLA."""
measurement = await validator.measure_dispatch(
capability="benchmark",
task_description="Single dispatch test",
)

assert measurement.success, f"Dispatch failed: {measurement.error}"
assert measurement.total_ms < TARGET_LATENCY_MS, \
f"Latency {measurement.total_ms:.2f}ms exceeds SLA {TARGET_LATENCY_MS}ms"

@pytest.mark.asyncio
async def test_sequential_dispatch_p99_under_sla(self, validator):
"""Test P99 latency for sequential dispatch is under SLA."""
result = await validator.run_benchmark(
scenario="sequential_100",
num_tasks=100,
capability="benchmark",
concurrent=False,
)

assert result.stats.p99_ms < TARGET_LATENCY_MS, \
f"P99 latency {result.stats.p99_ms:.2f}ms exceeds SLA {TARGET_LATENCY_MS}ms"
assert result.stats.sla_percentage >= 99.0, \
f"SLA compliance {result.stats.sla_percentage:.1f}% below 99%"

@pytest.mark.asyncio
async def test_concurrent_dispatch_p99_under_sla(self, validator):
"""Test P99 latency for concurrent dispatch is under SLA."""
result = await validator.run_benchmark(
scenario="concurrent_100",
num_tasks=100,
capability="benchmark",
concurrent=True,
concurrency=20,
)

assert result.stats.p99_ms < TARGET_LATENCY_MS, \
f"P99 latency {result.stats.p99_ms:.2f}ms exceeds SLA {TARGET_LATENCY_MS}ms"
assert result.stats.sla_percentage >= 99.0, \
f"SLA compliance {result.stats.sla_percentage:.1f}% below 99%"

@pytest.mark.asyncio
async def test_high_concurrency_p99_under_sla(self, validator):
"""Test P99 latency under high concurrency is under SLA."""
result = await validator.run_benchmark(
scenario="high_concurrency_200",
num_tasks=200,
capability="benchmark",
concurrent=True,
concurrency=50,
)

assert result.stats.p99_ms < TARGET_LATENCY_MS, \
f"P99 latency {result.stats.p99_ms:.2f}ms exceeds SLA {TARGET_LATENCY_MS}ms"

@pytest.mark.asyncio
async def test_burst_load_p99_under_sla(self, validator):
"""Test P99 latency under burst load is under SLA."""
# Simulate burst: 50 concurrent requests
result = await validator.run_benchmark(
scenario="burst_50",
num_tasks=50,
capability="benchmark",
concurrent=True,
concurrency=50, # All at once
)

assert result.stats.p99_ms < TARGET_LATENCY_MS, \
f"Burst P99 latency {result.stats.p99_ms:.2f}ms exceeds SLA {TARGET_LATENCY_MS}ms"

class TestComponentLatencyBudgets: """Tests for individual component latency budgets."""

@pytest.mark.asyncio
async def test_discovery_latency_budget(self, validator):
"""Test discovery latency is within budget."""
result = await validator.run_benchmark(
scenario="discovery_budget",
num_tasks=50,
capability="benchmark",
concurrent=False,
)

discovery_p99 = result.component_stats["discovery"].p99_ms
budget = LATENCY_BUDGET["discovery"]

assert discovery_p99 < budget, \
f"Discovery P99 {discovery_p99:.2f}ms exceeds budget {budget}ms"

@pytest.mark.asyncio
async def test_routing_latency_budget(self, validator):
"""Test routing latency is within budget."""
result = await validator.run_benchmark(
scenario="routing_budget",
num_tasks=50,
capability="benchmark",
concurrent=False,
)

routing_p99 = result.component_stats["routing"].p99_ms
budget = LATENCY_BUDGET["routing"]

assert routing_p99 < budget, \
f"Routing P99 {routing_p99:.2f}ms exceeds budget {budget}ms"

@pytest.mark.asyncio
async def test_task_queue_latency_budget(self, validator):
"""Test task queue latency is within budget."""
result = await validator.run_benchmark(
scenario="task_queue_budget",
num_tasks=50,
capability="benchmark",
concurrent=False,
)

task_queue_p99 = result.component_stats["task_queue"].p99_ms
budget = LATENCY_BUDGET["task_queue"]

assert task_queue_p99 < budget, \
f"Task queue P99 {task_queue_p99:.2f}ms exceeds budget {budget}ms"

@pytest.mark.asyncio
async def test_message_bus_latency_budget(self, validator):
"""Test message bus latency is within budget."""
result = await validator.run_benchmark(
scenario="message_bus_budget",
num_tasks=50,
capability="benchmark",
concurrent=False,
)

message_bus_p99 = result.component_stats["message_bus"].p99_ms
budget = LATENCY_BUDGET["message_bus"]

assert message_bus_p99 < budget, \
f"Message bus P99 {message_bus_p99:.2f}ms exceeds budget {budget}ms"

class TestLatencyPercentiles: """Tests for latency percentile targets."""

@pytest.mark.asyncio
async def test_p50_well_under_sla(self, validator):
"""Test P50 (median) latency is well under SLA."""
result = await validator.run_benchmark(
scenario="percentiles_p50",
num_tasks=100,
capability="benchmark",
concurrent=False,
)

# P50 should be well under SLA (< 50% of target)
assert result.stats.p50_ms < TARGET_LATENCY_MS * 0.5, \
f"P50 {result.stats.p50_ms:.2f}ms exceeds 50% of SLA"

@pytest.mark.asyncio
async def test_p95_under_sla(self, validator):
"""Test P95 latency is under SLA."""
result = await validator.run_benchmark(
scenario="percentiles_p95",
num_tasks=100,
capability="benchmark",
concurrent=False,
)

assert result.stats.p95_ms < TARGET_LATENCY_MS, \
f"P95 {result.stats.p95_ms:.2f}ms exceeds SLA {TARGET_LATENCY_MS}ms"

@pytest.mark.asyncio
async def test_p999_reasonable(self, validator):
"""Test P99.9 latency is reasonable (< 2x SLA)."""
result = await validator.run_benchmark(
scenario="percentiles_p999",
num_tasks=200,
capability="benchmark",
concurrent=False,
)

# P99.9 can be higher but should still be reasonable
assert result.stats.p999_ms < TARGET_LATENCY_MS * 2, \
f"P99.9 {result.stats.p999_ms:.2f}ms exceeds 2x SLA"

class TestLatencyVariance: """Tests for latency variance and stability."""

@pytest.mark.asyncio
async def test_low_variance(self, validator):
"""Test latency has low variance (predictable)."""
result = await validator.run_benchmark(
scenario="variance",
num_tasks=100,
capability="benchmark",
concurrent=False,
)

# Standard deviation should be less than 50% of mean
if result.stats.mean_ms > 0:
cv = result.stats.std_dev_ms / result.stats.mean_ms # Coefficient of variation
assert cv < 0.5, f"Latency coefficient of variation {cv:.2f} too high"

@pytest.mark.asyncio
async def test_consistent_under_load(self, validator):
"""Test latency consistency under sustained load."""
# Run three batches and compare
results = []
for i in range(3):
result = await validator.run_benchmark(
scenario=f"consistency_batch_{i}",
num_tasks=50,
capability="benchmark",
concurrent=False,
)
results.append(result.stats.median_ms)

# Batches should have similar median latencies (within 50%)
max_median = max(results)
min_median = min(results)
if min_median > 0:
variance_ratio = max_median / min_median
assert variance_ratio < 1.5, \
f"Batch variance ratio {variance_ratio:.2f} too high"

class TestPriorityLatency: """Tests for latency by task priority."""

@pytest.mark.asyncio
async def test_critical_priority_fastest(self, validator):
"""Test critical priority tasks have lowest latency."""
# Dispatch tasks with different priorities
priorities = [
(TaskPriority.CRITICAL.value, []),
(TaskPriority.HIGH.value, []),
(TaskPriority.NORMAL.value, []),
]

for priority, measurements in priorities:
for _ in range(20):
m = await validator.measure_dispatch(
capability="benchmark",
task_description=f"Priority {priority} task",
priority=priority,
)
measurements.append(m.total_ms)

# Critical should be fastest or comparable
critical_median = statistics.median(priorities[0][1])
normal_median = statistics.median(priorities[2][1])

# Critical shouldn't be significantly slower than normal
assert critical_median <= normal_median * 1.2, \
f"Critical priority slower than normal: {critical_median:.2f}ms vs {normal_median:.2f}ms"

class TestThroughput: """Tests for throughput under latency SLA."""

@pytest.mark.asyncio
async def test_minimum_throughput(self, validator):
"""Test minimum throughput while meeting SLA."""
result = await validator.run_benchmark(
scenario="throughput",
num_tasks=100,
capability="benchmark",
concurrent=True,
concurrency=20,
)

# Should achieve at least 50 tasks/second while meeting SLA
assert result.throughput_per_second >= 50, \
f"Throughput {result.throughput_per_second:.1f}/s below minimum 50/s"
assert result.passed, "SLA not met while measuring throughput"

@pytest.mark.asyncio
async def test_high_throughput(self, validator):
"""Test high throughput capability."""
result = await validator.run_benchmark(
scenario="high_throughput",
num_tasks=500,
capability="benchmark",
concurrent=True,
concurrency=50,
)

# Should achieve at least 100 tasks/second
assert result.throughput_per_second >= 100, \
f"Throughput {result.throughput_per_second:.1f}/s below target 100/s"

=============================================================================

Benchmark CLI

=============================================================================

async def run_full_benchmark(output_file: Optional[str] = None): """Run full latency benchmark suite.""" print("=" * 70) print("CODITECT Task Dispatch Latency Benchmark") print(f"Target SLA: <{TARGET_LATENCY_MS}ms") print("=" * 70) print()

# Setup
discovery = DiscoveryService(force_local=True)
agent = Component(
id="agent/benchmark-agent",
name="benchmark-agent",
component_type="agent",
capabilities=[
Capability(name="benchmark", description="Benchmark testing"),
],
status=ComponentStatus.AVAILABLE,
max_concurrency=100,
)
await discovery.register(agent)

validator = DispatchLatencyValidator(
discovery=discovery,
)

results = []

# Benchmark scenarios
scenarios = [
("Single Dispatch (Baseline)", 1, False, 1),
("Sequential 100 Tasks", 100, False, 1),
("Concurrent 100 (20 parallel)", 100, True, 20),
("Concurrent 200 (50 parallel)", 200, True, 50),
("Burst 50 (all at once)", 50, True, 50),
("High Throughput 500", 500, True, 50),
]

for name, num_tasks, concurrent, concurrency in scenarios:
print(f"\n{name}")
print("-" * 50)

result = await validator.run_benchmark(
scenario=name.lower().replace(" ", "_").replace("(", "").replace(")", ""),
num_tasks=num_tasks,
capability="benchmark",
concurrent=concurrent,
concurrency=concurrency,
)
results.append(result)

# Print results
print(f" Tasks: {result.stats.count}")
print(f" Duration: {result.duration_seconds:.2f}s")
print(f" Throughput: {result.throughput_per_second:.1f} tasks/s")
print()
print(f" Latency (ms):")
print(f" Min: {result.stats.min_ms:.2f}")
print(f" P50: {result.stats.p50_ms:.2f}")
print(f" P95: {result.stats.p95_ms:.2f}")
print(f" P99: {result.stats.p99_ms:.2f}")
print(f" Max: {result.stats.max_ms:.2f}")
print()
print(f" SLA Compliance: {result.stats.sla_percentage:.1f}% within {TARGET_LATENCY_MS}ms")

status = "PASS" if result.passed else "FAIL"
color = "\033[92m" if result.passed else "\033[91m"
print(f" Status: {color}{status}\033[0m")

# Component breakdown
print("\n" + "=" * 70)
print("Component Latency Breakdown (P99)")
print("=" * 70)

last_result = results[-2] # Use concurrent 200 for breakdown
for component, budget in LATENCY_BUDGET.items():
if component in last_result.component_stats:
stats = last_result.component_stats[component]
status = "OK" if stats.p99_ms < budget else "OVER"
color = "\033[92m" if stats.p99_ms < budget else "\033[91m"
print(f" {component:15} P99: {stats.p99_ms:8.2f}ms Budget: {budget:6}ms [{color}{status}\033[0m]")

# Summary
print("\n" + "=" * 70)
print("Summary")
print("=" * 70)

passed = sum(1 for r in results if r.passed)
total = len(results)

print(f" Scenarios Passed: {passed}/{total}")
print(f" Target SLA: <{TARGET_LATENCY_MS}ms (99th percentile)")

if passed == total:
print("\n \033[92mALL BENCHMARKS PASSED - SLA VALIDATED\033[0m")
else:
print(f"\n \033[91m{total - passed} BENCHMARKS FAILED\033[0m")

# Save results
if output_file:
output_data = {
"timestamp": datetime.utcnow().isoformat(),
"target_sla_ms": TARGET_LATENCY_MS,
"latency_budgets": LATENCY_BUDGET,
"results": [r.to_dict() for r in results],
"summary": {
"passed": passed,
"total": total,
"sla_validated": passed == total,
},
}

with open(output_file, "w") as f:
json.dump(output_data, f, indent=2)
print(f"\n Results saved to: {output_file}")

return passed == total

def main(): """CLI entry point.""" parser = argparse.ArgumentParser( description="CODITECT Task Dispatch Latency Validation" ) parser.add_argument( "--benchmark", action="store_true", help="Run full benchmark suite", ) parser.add_argument( "--output", "-o", type=str, help="Output file for benchmark results (JSON)", ) parser.add_argument( "--quick", action="store_true", help="Run quick validation (fewer tasks)", )

args = parser.parse_args()

if args.benchmark:
success = asyncio.run(run_full_benchmark(args.output))
sys.exit(0 if success else 1)
else:
# Run pytest
pytest.main([__file__, "-v", "--tb=short"])

if name == "main": main()