#!/usr/bin/env python3 """ Unit tests for CODITECT Priority Queue Router
Tests cover:
- Queue configuration and management
- Routing rules and task routing
- Dequeue strategies (strict, weighted, round-robin, fair-share)
- Priority boosting for aging tasks
- Statistics and monitoring
- Local fallback mode
Run with: python -m pytest scripts/core/test_priority_queue_router.py -v
Author: CODITECT Framework Created: January 8, 2026 """
import asyncio import pytest import time from unittest.mock import patch, MagicMock
Import module under test
import sys from pathlib import Path sys.path.insert(0, str(Path(file).parent.parent.parent))
from scripts.core.priority_queue_router import ( PriorityQueueRouter, LocalPriorityQueueRouter, QueueConfig, RoutingRule, DequeueStrategy, BoostPolicy, QueueStats, RouterStats, create_priority_router, create_routing_rule, ) from scripts.core.task_queue_manager import ( Task, TaskStatus, TaskPriority, )
============================================================================
Fixtures
============================================================================
@pytest.fixture def local_router(): """Create a local priority queue router for testing.""" return LocalPriorityQueueRouter()
@pytest.fixture def sample_task(): """Create a sample task.""" return Task( description="Test task", agent="test-agent", priority=TaskPriority.NORMAL.value, metadata={"test": True}, )
@pytest.fixture def critical_task(): """Create a critical priority task.""" return Task( description="Critical task", agent="critical-agent", priority=TaskPriority.CRITICAL.value, )
@pytest.fixture def background_task(): """Create a background priority task.""" return Task( description="Background task", agent="background-agent", priority=TaskPriority.BACKGROUND.value, )
============================================================================
Test: Queue Configuration
============================================================================
class TestQueueConfig: """Tests for QueueConfig dataclass."""
def test_default_queue_config(self):
"""Test default queue configuration values."""
config = QueueConfig(name="test-queue")
assert config.name == "test-queue"
assert config.weight == 1.0
assert config.max_capacity == 0 # Unlimited
assert config.priority_floor == 1
assert config.priority_ceiling == 10
assert config.boost_policy == BoostPolicy.NONE.value
def test_custom_queue_config(self):
"""Test custom queue configuration."""
config = QueueConfig(
name="high-priority",
weight=5.0,
max_capacity=100,
priority_floor=7,
priority_ceiling=10,
boost_policy=BoostPolicy.LINEAR.value,
dedicated_agents=["agent-a", "agent-b"],
)
assert config.weight == 5.0
assert config.max_capacity == 100
assert config.priority_floor == 7
assert config.dedicated_agents == ["agent-a", "agent-b"]
def test_queue_config_serialization(self):
"""Test queue config to_dict and from_dict."""
original = QueueConfig(
name="test",
weight=3.0,
boost_policy=BoostPolicy.EXPONENTIAL.value,
)
data = original.to_dict()
restored = QueueConfig.from_dict(data)
assert restored.name == original.name
assert restored.weight == original.weight
assert restored.boost_policy == original.boost_policy
============================================================================
Test: Routing Rules
============================================================================
class TestRoutingRules: """Tests for RoutingRule matching logic."""
def test_rule_with_lambda_condition(self, sample_task):
"""Test rule matching with lambda condition."""
rule = RoutingRule(
name="high-priority-rule",
target_queue="high",
condition=lambda t: t.priority >= 7,
)
sample_task.priority = 8
assert rule.matches(sample_task) is True
sample_task.priority = 5
assert rule.matches(sample_task) is False
def test_rule_with_expression(self, sample_task):
"""Test rule matching with string expression."""
rule = RoutingRule(
name="agent-rule",
target_queue="agent-specific",
condition_expr='agent == "test-agent"',
)
assert rule.matches(sample_task) is True
sample_task.agent = "other-agent"
assert rule.matches(sample_task) is False
def test_rule_with_metadata_expression(self, sample_task):
"""Test rule matching with metadata condition."""
rule = RoutingRule(
name="metadata-rule",
target_queue="special",
condition_expr='metadata.get("urgent", False) == True',
)
assert rule.matches(sample_task) is False
sample_task.metadata["urgent"] = True
assert rule.matches(sample_task) is True
def test_disabled_rule(self, sample_task):
"""Test that disabled rules never match."""
rule = RoutingRule(
name="disabled-rule",
target_queue="test",
condition=lambda t: True,
enabled=False,
)
assert rule.matches(sample_task) is False
def test_rule_serialization(self):
"""Test rule to_dict and from_dict."""
original = RoutingRule(
name="test-rule",
target_queue="test-queue",
priority=50,
condition_expr='priority > 5',
enabled=True,
)
data = original.to_dict()
restored = RoutingRule.from_dict(data)
assert restored.name == original.name
assert restored.target_queue == original.target_queue
assert restored.priority == original.priority
assert restored.condition_expr == original.condition_expr
============================================================================
Test: Local Router - Enqueueing
============================================================================
class TestLocalRouterEnqueue: """Tests for task enqueueing in local router."""
def test_enqueue_basic(self, local_router, sample_task):
"""Test basic task enqueueing."""
task_id, queue_name = local_router.enqueue(sample_task)
assert task_id == sample_task.id
assert queue_name == "normal" # Default for priority 5
def test_enqueue_critical_task(self, local_router, critical_task):
"""Test critical task routes to critical queue."""
task_id, queue_name = local_router.enqueue(critical_task)
assert queue_name == "critical"
def test_enqueue_background_task(self, local_router, background_task):
"""Test background task routes to background queue."""
task_id, queue_name = local_router.enqueue(background_task)
assert queue_name == "background"
def test_enqueue_to_specific_queue(self, local_router, sample_task):
"""Test forcing task to specific queue."""
task_id, queue_name = local_router.enqueue(
sample_task,
target_queue="high",
)
assert queue_name == "high"
def test_enqueue_updates_stats(self, local_router, sample_task):
"""Test that enqueueing updates statistics."""
initial_stats = local_router.get_stats()
initial_total = initial_stats.total_tasks
local_router.enqueue(sample_task)
new_stats = local_router.get_stats()
assert new_stats.total_tasks == initial_total + 1
============================================================================
Test: Local Router - Dequeueing
============================================================================
class TestLocalRouterDequeue: """Tests for task dequeueing in local router."""
def test_dequeue_empty(self, local_router):
"""Test dequeueing from empty queues."""
# Clear any default tasks
local_router._queues = {name: [] for name in local_router._queues}
result = local_router.dequeue()
assert result is None
def test_dequeue_strict_priority(self, local_router):
"""Test strict priority dequeue strategy."""
local_router.set_strategy(DequeueStrategy.STRICT_PRIORITY)
# Enqueue tasks at different priorities
low_task = Task(description="Low", agent="a", priority=2)
high_task = Task(description="High", agent="a", priority=8)
local_router.enqueue(low_task)
local_router.enqueue(high_task)
# Should get high priority first (from 'high' queue)
result = local_router.dequeue()
assert result is not None
task, queue = result
assert task.id == high_task.id
assert queue == "high"
def test_dequeue_round_robin(self, local_router):
"""Test round-robin dequeue strategy."""
local_router.set_strategy(DequeueStrategy.ROUND_ROBIN)
# Add tasks to multiple queues
tasks = []
for i, priority in enumerate([2, 5, 8]):
task = Task(description=f"Task {i}", agent="a", priority=priority)
local_router.enqueue(task)
tasks.append(task)
# Dequeue should rotate through queues
seen_queues = set()
for _ in range(3):
result = local_router.dequeue()
if result:
_, queue = result
seen_queues.add(queue)
# Should have hit multiple queues
assert len(seen_queues) >= 2
def test_dequeue_from_specific_queue(self, local_router, sample_task):
"""Test dequeueing from specific queue."""
local_router.enqueue(sample_task, target_queue="normal")
task = local_router.dequeue_from("normal")
assert task is not None
assert task.id == sample_task.id
assert task.status == TaskStatus.IN_PROGRESS.value
def test_dequeue_updates_task_status(self, local_router, sample_task):
"""Test that dequeue updates task status."""
local_router.enqueue(sample_task)
result = local_router.dequeue()
task, _ = result
assert task.status == TaskStatus.IN_PROGRESS.value
assert task.started_at is not None
============================================================================
Test: Local Router - Task Completion
============================================================================
class TestLocalRouterCompletion: """Tests for task completion and failure."""
def test_complete_task(self, local_router, sample_task):
"""Test marking task as complete."""
local_router.enqueue(sample_task)
result = local_router.dequeue()
task, _ = result
local_router.complete(task.id, result={"output": "success"})
stored_task = local_router.get_task(task.id)
assert stored_task.status == TaskStatus.COMPLETED.value
assert stored_task.result == {"output": "success"}
assert stored_task.completed_at is not None
def test_fail_task(self, local_router, sample_task):
"""Test marking task as failed."""
local_router.enqueue(sample_task)
result = local_router.dequeue()
task, _ = result
local_router.fail(task.id, error="Something went wrong")
stored_task = local_router.get_task(task.id)
assert stored_task.status == TaskStatus.FAILED.value
assert stored_task.error == "Something went wrong"
============================================================================
Test: Custom Routing Rules
============================================================================
class TestCustomRouting: """Tests for custom routing rules."""
def test_add_custom_rule(self, local_router):
"""Test adding custom routing rule."""
rule = RoutingRule(
name="security-tasks",
target_queue="critical",
priority=5, # High priority rule
condition=lambda t: "security" in t.description.lower(),
)
local_router.add_queue(QueueConfig(name="security", weight=8.0))
rule.target_queue = "security"
local_router.add_rule(rule)
# Create a security-related task
security_task = Task(
description="Security audit required",
agent="security-agent",
priority=5, # Normal priority
)
task_id, queue = local_router.enqueue(security_task)
assert queue == "security"
def test_rule_priority_ordering(self, local_router):
"""Test that rules are evaluated in priority order."""
# Add two competing rules
rule1 = RoutingRule(
name="rule-high",
target_queue="high",
priority=10, # Lower number = higher priority
condition=lambda t: True,
)
rule2 = RoutingRule(
name="rule-normal",
target_queue="normal",
priority=20,
condition=lambda t: True,
)
local_router.add_rule(rule1)
local_router.add_rule(rule2)
task = Task(description="Test", agent="a", priority=5)
_, queue = local_router.enqueue(task)
# Should match rule1 first (priority 10)
assert queue == "high"
def test_remove_rule(self, local_router):
"""Test removing a routing rule."""
rule = RoutingRule(
name="temp-rule",
target_queue="high",
priority=1,
condition=lambda t: True,
)
local_router.add_rule(rule)
assert local_router.remove_rule("temp-rule") is True
assert local_router.remove_rule("nonexistent") is False
============================================================================
Test: Queue Management
============================================================================
class TestQueueManagement: """Tests for queue configuration and management."""
def test_default_queues_exist(self, local_router):
"""Test that default queues are initialized."""
assert "critical" in local_router._queue_configs
assert "high" in local_router._queue_configs
assert "normal" in local_router._queue_configs
assert "background" in local_router._queue_configs
def test_add_custom_queue(self, local_router):
"""Test adding a custom queue."""
config = QueueConfig(
name="custom-queue",
weight=4.0,
max_capacity=50,
priority_floor=5,
priority_ceiling=7,
)
local_router.add_queue(config)
assert "custom-queue" in local_router._queue_configs
assert local_router._queue_configs["custom-queue"].weight == 4.0
def test_remove_custom_queue(self, local_router):
"""Test removing a custom queue."""
config = QueueConfig(name="removable", weight=1.0)
local_router.add_queue(config)
task = Task(description="Test", agent="a", priority=5)
local_router.enqueue(task, target_queue="removable")
# Remove queue - tasks should move to normal
result = local_router.remove_queue("removable")
assert result is True
assert "removable" not in local_router._queue_configs
def test_cannot_remove_core_queues(self, local_router):
"""Test that core queues cannot be removed."""
assert local_router.remove_queue("critical") is False
assert local_router.remove_queue("high") is False
assert local_router.remove_queue("normal") is False
assert local_router.remove_queue("background") is False
============================================================================
Test: Priority Boosting
============================================================================
class TestPriorityBoosting: """Tests for priority boosting of aging tasks."""
def test_linear_boost(self, local_router):
"""Test linear priority boosting."""
# Configure queue with linear boost
local_router._queue_configs["background"].boost_policy = BoostPolicy.LINEAR.value
local_router._queue_configs["background"].boost_interval_seconds = 0.1
local_router._queue_configs["background"].boost_amount = 1
local_router._queue_configs["background"].max_boost = 3
task = Task(description="Test", agent="a", priority=2)
local_router.enqueue(task)
# Wait for boost interval
time.sleep(0.2)
# Apply boost
local_router.boost_aging_tasks()
# Check task was boosted
boosted_task = local_router.get_task(task.id)
assert boosted_task.priority > 2
assert boosted_task.metadata.get("boosted") is True
def test_boost_respects_max(self, local_router):
"""Test that boosting respects max_boost limit and max priority ceiling."""
local_router._queue_configs["background"].boost_policy = BoostPolicy.LINEAR.value
local_router._queue_configs["background"].boost_interval_seconds = 0.01
local_router._queue_configs["background"].boost_amount = 2
local_router._queue_configs["background"].max_boost = 3
task = Task(description="Test", agent="a", priority=2)
local_router.enqueue(task)
time.sleep(0.1)
# Apply multiple boosts
for _ in range(5):
local_router.boost_aging_tasks()
time.sleep(0.02)
boosted_task = local_router.get_task(task.id)
# Priority should be boosted but not exceed absolute max (10)
assert boosted_task.priority <= 10
# Should have been boosted
assert boosted_task.priority > 2
# Max boost should be applied correctly
assert boosted_task.metadata.get("boost_amount") <= 3
============================================================================
Test: Statistics
============================================================================
class TestStatistics: """Tests for router statistics."""
def test_router_stats(self, local_router):
"""Test getting router statistics."""
# Enqueue some tasks
for i in range(5):
task = Task(description=f"Task {i}", agent="a", priority=5)
local_router.enqueue(task)
stats = local_router.get_stats()
assert isinstance(stats, RouterStats)
assert stats.total_queues == 4
assert stats.total_tasks == 5
assert stats.strategy == DequeueStrategy.STRICT_PRIORITY.value
assert "normal" in stats.tasks_per_queue
assert stats.tasks_per_queue["normal"] == 5
def test_queue_stats(self, local_router):
"""Test getting individual queue statistics."""
task = Task(description="Test", agent="a", priority=5)
local_router.enqueue(task)
stats = local_router.get_queue_stats("normal")
assert isinstance(stats, QueueStats)
assert stats.name == "normal"
assert stats.size == 1
def test_routing_hits_tracking(self, local_router):
"""Test that routing hits are tracked."""
for i in range(3):
task = Task(description=f"Task {i}", agent="a", priority=9)
local_router.enqueue(task)
stats = local_router.get_stats()
assert stats.routing_hits.get("critical", 0) == 3
============================================================================
Test: Async Router (Local Mode)
============================================================================
class TestAsyncRouter: """Tests for async PriorityQueueRouter."""
@pytest.mark.asyncio
async def test_connect_local_mode(self):
"""Test connecting in local fallback mode."""
router = PriorityQueueRouter()
await router.connect(use_local_fallback=True)
assert router.is_connected
assert router.is_local_mode
await router.disconnect()
assert not router.is_connected
@pytest.mark.asyncio
async def test_route_and_enqueue(self):
"""Test async route and enqueue."""
router = PriorityQueueRouter()
await router.connect(use_local_fallback=True)
task = Task(description="Async test", agent="test-agent", priority=7)
task_id, queue = await router.route_and_enqueue(task)
assert task_id == task.id
assert queue == "high" # Priority 7 goes to high queue
await router.disconnect()
@pytest.mark.asyncio
async def test_async_dequeue(self):
"""Test async dequeue."""
router = PriorityQueueRouter()
await router.connect(use_local_fallback=True)
task = Task(description="Async test", agent="test-agent", priority=5)
await router.route_and_enqueue(task)
result = await router.dequeue()
assert result is not None
dequeued_task, queue = result
assert dequeued_task.id == task.id
await router.disconnect()
@pytest.mark.asyncio
async def test_async_complete(self):
"""Test async task completion."""
router = PriorityQueueRouter()
await router.connect(use_local_fallback=True)
task = Task(description="Async test", agent="test-agent", priority=5)
await router.route_and_enqueue(task)
result = await router.dequeue()
dequeued_task, _ = result
await router.complete(dequeued_task.id, result={"done": True})
stored = await router.get_task(dequeued_task.id)
assert stored.status == TaskStatus.COMPLETED.value
await router.disconnect()
@pytest.mark.asyncio
async def test_async_stats(self):
"""Test async statistics."""
router = PriorityQueueRouter()
await router.connect(use_local_fallback=True)
for i in range(3):
task = Task(description=f"Task {i}", agent="a", priority=5)
await router.route_and_enqueue(task)
stats = await router.get_stats()
assert stats.total_tasks == 3
await router.disconnect()
============================================================================
Test: Dequeue Strategies
============================================================================
class TestDequeueStrategies: """Tests for different dequeue strategies."""
def test_weighted_strategy_distribution(self, local_router):
"""Test weighted strategy roughly follows weights."""
local_router.set_strategy(DequeueStrategy.WEIGHTED)
# Add tasks to different queues
for _ in range(100):
# Critical queue (weight 10)
local_router.enqueue(
Task(description="c", agent="a", priority=10),
target_queue="critical"
)
# Background queue (weight 1)
local_router.enqueue(
Task(description="b", agent="a", priority=1),
target_queue="background"
)
# Dequeue many times and track which queue was selected
queue_counts = {"critical": 0, "background": 0}
for _ in range(100):
result = local_router.dequeue()
if result:
_, queue = result
queue_counts[queue] = queue_counts.get(queue, 0) + 1
# Critical should be selected more often (roughly 10:1 ratio)
# Allow some variance due to randomness
assert queue_counts["critical"] > queue_counts["background"]
def test_fair_share_strategy(self, local_router):
"""Test fair share strategy drains largest queue first."""
local_router.set_strategy(DequeueStrategy.FAIR_SHARE)
# Add more tasks to critical queue
for _ in range(10):
local_router.enqueue(
Task(description="c", agent="a", priority=10),
target_queue="critical"
)
# Add fewer to normal
for _ in range(2):
local_router.enqueue(
Task(description="n", agent="a", priority=5),
target_queue="normal"
)
# First dequeue should come from critical (largest queue)
result = local_router.dequeue()
assert result is not None
_, queue = result
assert queue == "critical"
============================================================================
Test: Convenience Functions
============================================================================
class TestConvenienceFunctions: """Tests for convenience functions."""
@pytest.mark.asyncio
async def test_create_priority_router(self):
"""Test create_priority_router convenience function."""
router = await create_priority_router()
assert router.is_connected
assert router.is_local_mode # No Redis in test env
await router.disconnect()
def test_create_routing_rule(self):
"""Test create_routing_rule convenience function."""
rule = create_routing_rule(
name="test-rule",
target_queue="test",
priority=50,
condition=lambda t: t.priority > 5,
)
assert rule.name == "test-rule"
assert rule.target_queue == "test"
assert rule.priority == 50
assert rule.condition is not None
============================================================================
Test: Edge Cases
============================================================================
class TestEdgeCases: """Tests for edge cases and error handling."""
def test_enqueue_to_nonexistent_queue(self, local_router):
"""Test enqueueing to nonexistent queue falls back to normal."""
task = Task(description="Test", agent="a", priority=5)
_, queue = local_router.enqueue(task, target_queue="nonexistent")
assert queue == "normal"
def test_dequeue_from_empty_specific_queue(self, local_router):
"""Test dequeueing from specific empty queue."""
# Clear the queue
local_router._queues["critical"] = []
result = local_router.dequeue_from("critical")
assert result is None
def test_complete_nonexistent_task(self, local_router):
"""Test completing nonexistent task doesn't error."""
# Should not raise
local_router.complete("nonexistent-id", result={})
def test_fail_nonexistent_task(self, local_router):
"""Test failing nonexistent task doesn't error."""
# Should not raise
local_router.fail("nonexistent-id", error="test")
def test_get_nonexistent_task(self, local_router):
"""Test getting nonexistent task returns None."""
result = local_router.get_task("nonexistent-id")
assert result is None
def test_invalid_expression_in_rule(self, local_router):
"""Test rule with invalid expression doesn't crash."""
rule = RoutingRule(
name="bad-rule",
target_queue="normal",
condition_expr="invalid syntax !!!",
)
task = Task(description="Test", agent="a", priority=5)
# Should return False, not crash
assert rule.matches(task) is False
============================================================================
Run tests
============================================================================
if name == "main": pytest.main([file, "-v", "--tb=short"])