Skip to main content

#!/usr/bin/env python3 """ Unit tests for CODITECT Priority Queue Router

Tests cover:

  • Queue configuration and management
  • Routing rules and task routing
  • Dequeue strategies (strict, weighted, round-robin, fair-share)
  • Priority boosting for aging tasks
  • Statistics and monitoring
  • Local fallback mode

Run with: python -m pytest scripts/core/test_priority_queue_router.py -v

Author: CODITECT Framework Created: January 8, 2026 """

import asyncio import pytest import time from unittest.mock import patch, MagicMock

Import module under test

import sys from pathlib import Path sys.path.insert(0, str(Path(file).parent.parent.parent))

from scripts.core.priority_queue_router import ( PriorityQueueRouter, LocalPriorityQueueRouter, QueueConfig, RoutingRule, DequeueStrategy, BoostPolicy, QueueStats, RouterStats, create_priority_router, create_routing_rule, ) from scripts.core.task_queue_manager import ( Task, TaskStatus, TaskPriority, )

============================================================================

Fixtures

============================================================================

@pytest.fixture def local_router(): """Create a local priority queue router for testing.""" return LocalPriorityQueueRouter()

@pytest.fixture def sample_task(): """Create a sample task.""" return Task( description="Test task", agent="test-agent", priority=TaskPriority.NORMAL.value, metadata={"test": True}, )

@pytest.fixture def critical_task(): """Create a critical priority task.""" return Task( description="Critical task", agent="critical-agent", priority=TaskPriority.CRITICAL.value, )

@pytest.fixture def background_task(): """Create a background priority task.""" return Task( description="Background task", agent="background-agent", priority=TaskPriority.BACKGROUND.value, )

============================================================================

Test: Queue Configuration

============================================================================

class TestQueueConfig: """Tests for QueueConfig dataclass."""

def test_default_queue_config(self):
"""Test default queue configuration values."""
config = QueueConfig(name="test-queue")

assert config.name == "test-queue"
assert config.weight == 1.0
assert config.max_capacity == 0 # Unlimited
assert config.priority_floor == 1
assert config.priority_ceiling == 10
assert config.boost_policy == BoostPolicy.NONE.value

def test_custom_queue_config(self):
"""Test custom queue configuration."""
config = QueueConfig(
name="high-priority",
weight=5.0,
max_capacity=100,
priority_floor=7,
priority_ceiling=10,
boost_policy=BoostPolicy.LINEAR.value,
dedicated_agents=["agent-a", "agent-b"],
)

assert config.weight == 5.0
assert config.max_capacity == 100
assert config.priority_floor == 7
assert config.dedicated_agents == ["agent-a", "agent-b"]

def test_queue_config_serialization(self):
"""Test queue config to_dict and from_dict."""
original = QueueConfig(
name="test",
weight=3.0,
boost_policy=BoostPolicy.EXPONENTIAL.value,
)

data = original.to_dict()
restored = QueueConfig.from_dict(data)

assert restored.name == original.name
assert restored.weight == original.weight
assert restored.boost_policy == original.boost_policy

============================================================================

Test: Routing Rules

============================================================================

class TestRoutingRules: """Tests for RoutingRule matching logic."""

def test_rule_with_lambda_condition(self, sample_task):
"""Test rule matching with lambda condition."""
rule = RoutingRule(
name="high-priority-rule",
target_queue="high",
condition=lambda t: t.priority >= 7,
)

sample_task.priority = 8
assert rule.matches(sample_task) is True

sample_task.priority = 5
assert rule.matches(sample_task) is False

def test_rule_with_expression(self, sample_task):
"""Test rule matching with string expression."""
rule = RoutingRule(
name="agent-rule",
target_queue="agent-specific",
condition_expr='agent == "test-agent"',
)

assert rule.matches(sample_task) is True

sample_task.agent = "other-agent"
assert rule.matches(sample_task) is False

def test_rule_with_metadata_expression(self, sample_task):
"""Test rule matching with metadata condition."""
rule = RoutingRule(
name="metadata-rule",
target_queue="special",
condition_expr='metadata.get("urgent", False) == True',
)

assert rule.matches(sample_task) is False

sample_task.metadata["urgent"] = True
assert rule.matches(sample_task) is True

def test_disabled_rule(self, sample_task):
"""Test that disabled rules never match."""
rule = RoutingRule(
name="disabled-rule",
target_queue="test",
condition=lambda t: True,
enabled=False,
)

assert rule.matches(sample_task) is False

def test_rule_serialization(self):
"""Test rule to_dict and from_dict."""
original = RoutingRule(
name="test-rule",
target_queue="test-queue",
priority=50,
condition_expr='priority > 5',
enabled=True,
)

data = original.to_dict()
restored = RoutingRule.from_dict(data)

assert restored.name == original.name
assert restored.target_queue == original.target_queue
assert restored.priority == original.priority
assert restored.condition_expr == original.condition_expr

============================================================================

Test: Local Router - Enqueueing

============================================================================

class TestLocalRouterEnqueue: """Tests for task enqueueing in local router."""

def test_enqueue_basic(self, local_router, sample_task):
"""Test basic task enqueueing."""
task_id, queue_name = local_router.enqueue(sample_task)

assert task_id == sample_task.id
assert queue_name == "normal" # Default for priority 5

def test_enqueue_critical_task(self, local_router, critical_task):
"""Test critical task routes to critical queue."""
task_id, queue_name = local_router.enqueue(critical_task)

assert queue_name == "critical"

def test_enqueue_background_task(self, local_router, background_task):
"""Test background task routes to background queue."""
task_id, queue_name = local_router.enqueue(background_task)

assert queue_name == "background"

def test_enqueue_to_specific_queue(self, local_router, sample_task):
"""Test forcing task to specific queue."""
task_id, queue_name = local_router.enqueue(
sample_task,
target_queue="high",
)

assert queue_name == "high"

def test_enqueue_updates_stats(self, local_router, sample_task):
"""Test that enqueueing updates statistics."""
initial_stats = local_router.get_stats()
initial_total = initial_stats.total_tasks

local_router.enqueue(sample_task)

new_stats = local_router.get_stats()
assert new_stats.total_tasks == initial_total + 1

============================================================================

Test: Local Router - Dequeueing

============================================================================

class TestLocalRouterDequeue: """Tests for task dequeueing in local router."""

def test_dequeue_empty(self, local_router):
"""Test dequeueing from empty queues."""
# Clear any default tasks
local_router._queues = {name: [] for name in local_router._queues}

result = local_router.dequeue()
assert result is None

def test_dequeue_strict_priority(self, local_router):
"""Test strict priority dequeue strategy."""
local_router.set_strategy(DequeueStrategy.STRICT_PRIORITY)

# Enqueue tasks at different priorities
low_task = Task(description="Low", agent="a", priority=2)
high_task = Task(description="High", agent="a", priority=8)

local_router.enqueue(low_task)
local_router.enqueue(high_task)

# Should get high priority first (from 'high' queue)
result = local_router.dequeue()
assert result is not None
task, queue = result
assert task.id == high_task.id
assert queue == "high"

def test_dequeue_round_robin(self, local_router):
"""Test round-robin dequeue strategy."""
local_router.set_strategy(DequeueStrategy.ROUND_ROBIN)

# Add tasks to multiple queues
tasks = []
for i, priority in enumerate([2, 5, 8]):
task = Task(description=f"Task {i}", agent="a", priority=priority)
local_router.enqueue(task)
tasks.append(task)

# Dequeue should rotate through queues
seen_queues = set()
for _ in range(3):
result = local_router.dequeue()
if result:
_, queue = result
seen_queues.add(queue)

# Should have hit multiple queues
assert len(seen_queues) >= 2

def test_dequeue_from_specific_queue(self, local_router, sample_task):
"""Test dequeueing from specific queue."""
local_router.enqueue(sample_task, target_queue="normal")

task = local_router.dequeue_from("normal")
assert task is not None
assert task.id == sample_task.id
assert task.status == TaskStatus.IN_PROGRESS.value

def test_dequeue_updates_task_status(self, local_router, sample_task):
"""Test that dequeue updates task status."""
local_router.enqueue(sample_task)

result = local_router.dequeue()
task, _ = result

assert task.status == TaskStatus.IN_PROGRESS.value
assert task.started_at is not None

============================================================================

Test: Local Router - Task Completion

============================================================================

class TestLocalRouterCompletion: """Tests for task completion and failure."""

def test_complete_task(self, local_router, sample_task):
"""Test marking task as complete."""
local_router.enqueue(sample_task)
result = local_router.dequeue()
task, _ = result

local_router.complete(task.id, result={"output": "success"})

stored_task = local_router.get_task(task.id)
assert stored_task.status == TaskStatus.COMPLETED.value
assert stored_task.result == {"output": "success"}
assert stored_task.completed_at is not None

def test_fail_task(self, local_router, sample_task):
"""Test marking task as failed."""
local_router.enqueue(sample_task)
result = local_router.dequeue()
task, _ = result

local_router.fail(task.id, error="Something went wrong")

stored_task = local_router.get_task(task.id)
assert stored_task.status == TaskStatus.FAILED.value
assert stored_task.error == "Something went wrong"

============================================================================

Test: Custom Routing Rules

============================================================================

class TestCustomRouting: """Tests for custom routing rules."""

def test_add_custom_rule(self, local_router):
"""Test adding custom routing rule."""
rule = RoutingRule(
name="security-tasks",
target_queue="critical",
priority=5, # High priority rule
condition=lambda t: "security" in t.description.lower(),
)

local_router.add_queue(QueueConfig(name="security", weight=8.0))
rule.target_queue = "security"
local_router.add_rule(rule)

# Create a security-related task
security_task = Task(
description="Security audit required",
agent="security-agent",
priority=5, # Normal priority
)

task_id, queue = local_router.enqueue(security_task)
assert queue == "security"

def test_rule_priority_ordering(self, local_router):
"""Test that rules are evaluated in priority order."""
# Add two competing rules
rule1 = RoutingRule(
name="rule-high",
target_queue="high",
priority=10, # Lower number = higher priority
condition=lambda t: True,
)
rule2 = RoutingRule(
name="rule-normal",
target_queue="normal",
priority=20,
condition=lambda t: True,
)

local_router.add_rule(rule1)
local_router.add_rule(rule2)

task = Task(description="Test", agent="a", priority=5)
_, queue = local_router.enqueue(task)

# Should match rule1 first (priority 10)
assert queue == "high"

def test_remove_rule(self, local_router):
"""Test removing a routing rule."""
rule = RoutingRule(
name="temp-rule",
target_queue="high",
priority=1,
condition=lambda t: True,
)

local_router.add_rule(rule)
assert local_router.remove_rule("temp-rule") is True
assert local_router.remove_rule("nonexistent") is False

============================================================================

Test: Queue Management

============================================================================

class TestQueueManagement: """Tests for queue configuration and management."""

def test_default_queues_exist(self, local_router):
"""Test that default queues are initialized."""
assert "critical" in local_router._queue_configs
assert "high" in local_router._queue_configs
assert "normal" in local_router._queue_configs
assert "background" in local_router._queue_configs

def test_add_custom_queue(self, local_router):
"""Test adding a custom queue."""
config = QueueConfig(
name="custom-queue",
weight=4.0,
max_capacity=50,
priority_floor=5,
priority_ceiling=7,
)

local_router.add_queue(config)

assert "custom-queue" in local_router._queue_configs
assert local_router._queue_configs["custom-queue"].weight == 4.0

def test_remove_custom_queue(self, local_router):
"""Test removing a custom queue."""
config = QueueConfig(name="removable", weight=1.0)
local_router.add_queue(config)

task = Task(description="Test", agent="a", priority=5)
local_router.enqueue(task, target_queue="removable")

# Remove queue - tasks should move to normal
result = local_router.remove_queue("removable")
assert result is True
assert "removable" not in local_router._queue_configs

def test_cannot_remove_core_queues(self, local_router):
"""Test that core queues cannot be removed."""
assert local_router.remove_queue("critical") is False
assert local_router.remove_queue("high") is False
assert local_router.remove_queue("normal") is False
assert local_router.remove_queue("background") is False

============================================================================

Test: Priority Boosting

============================================================================

class TestPriorityBoosting: """Tests for priority boosting of aging tasks."""

def test_linear_boost(self, local_router):
"""Test linear priority boosting."""
# Configure queue with linear boost
local_router._queue_configs["background"].boost_policy = BoostPolicy.LINEAR.value
local_router._queue_configs["background"].boost_interval_seconds = 0.1
local_router._queue_configs["background"].boost_amount = 1
local_router._queue_configs["background"].max_boost = 3

task = Task(description="Test", agent="a", priority=2)
local_router.enqueue(task)

# Wait for boost interval
time.sleep(0.2)

# Apply boost
local_router.boost_aging_tasks()

# Check task was boosted
boosted_task = local_router.get_task(task.id)
assert boosted_task.priority > 2
assert boosted_task.metadata.get("boosted") is True

def test_boost_respects_max(self, local_router):
"""Test that boosting respects max_boost limit and max priority ceiling."""
local_router._queue_configs["background"].boost_policy = BoostPolicy.LINEAR.value
local_router._queue_configs["background"].boost_interval_seconds = 0.01
local_router._queue_configs["background"].boost_amount = 2
local_router._queue_configs["background"].max_boost = 3

task = Task(description="Test", agent="a", priority=2)
local_router.enqueue(task)

time.sleep(0.1)

# Apply multiple boosts
for _ in range(5):
local_router.boost_aging_tasks()
time.sleep(0.02)

boosted_task = local_router.get_task(task.id)
# Priority should be boosted but not exceed absolute max (10)
assert boosted_task.priority <= 10
# Should have been boosted
assert boosted_task.priority > 2
# Max boost should be applied correctly
assert boosted_task.metadata.get("boost_amount") <= 3

============================================================================

Test: Statistics

============================================================================

class TestStatistics: """Tests for router statistics."""

def test_router_stats(self, local_router):
"""Test getting router statistics."""
# Enqueue some tasks
for i in range(5):
task = Task(description=f"Task {i}", agent="a", priority=5)
local_router.enqueue(task)

stats = local_router.get_stats()

assert isinstance(stats, RouterStats)
assert stats.total_queues == 4
assert stats.total_tasks == 5
assert stats.strategy == DequeueStrategy.STRICT_PRIORITY.value
assert "normal" in stats.tasks_per_queue
assert stats.tasks_per_queue["normal"] == 5

def test_queue_stats(self, local_router):
"""Test getting individual queue statistics."""
task = Task(description="Test", agent="a", priority=5)
local_router.enqueue(task)

stats = local_router.get_queue_stats("normal")

assert isinstance(stats, QueueStats)
assert stats.name == "normal"
assert stats.size == 1

def test_routing_hits_tracking(self, local_router):
"""Test that routing hits are tracked."""
for i in range(3):
task = Task(description=f"Task {i}", agent="a", priority=9)
local_router.enqueue(task)

stats = local_router.get_stats()
assert stats.routing_hits.get("critical", 0) == 3

============================================================================

Test: Async Router (Local Mode)

============================================================================

class TestAsyncRouter: """Tests for async PriorityQueueRouter."""

@pytest.mark.asyncio
async def test_connect_local_mode(self):
"""Test connecting in local fallback mode."""
router = PriorityQueueRouter()

await router.connect(use_local_fallback=True)

assert router.is_connected
assert router.is_local_mode

await router.disconnect()
assert not router.is_connected

@pytest.mark.asyncio
async def test_route_and_enqueue(self):
"""Test async route and enqueue."""
router = PriorityQueueRouter()
await router.connect(use_local_fallback=True)

task = Task(description="Async test", agent="test-agent", priority=7)
task_id, queue = await router.route_and_enqueue(task)

assert task_id == task.id
assert queue == "high" # Priority 7 goes to high queue

await router.disconnect()

@pytest.mark.asyncio
async def test_async_dequeue(self):
"""Test async dequeue."""
router = PriorityQueueRouter()
await router.connect(use_local_fallback=True)

task = Task(description="Async test", agent="test-agent", priority=5)
await router.route_and_enqueue(task)

result = await router.dequeue()
assert result is not None
dequeued_task, queue = result
assert dequeued_task.id == task.id

await router.disconnect()

@pytest.mark.asyncio
async def test_async_complete(self):
"""Test async task completion."""
router = PriorityQueueRouter()
await router.connect(use_local_fallback=True)

task = Task(description="Async test", agent="test-agent", priority=5)
await router.route_and_enqueue(task)

result = await router.dequeue()
dequeued_task, _ = result

await router.complete(dequeued_task.id, result={"done": True})

stored = await router.get_task(dequeued_task.id)
assert stored.status == TaskStatus.COMPLETED.value

await router.disconnect()

@pytest.mark.asyncio
async def test_async_stats(self):
"""Test async statistics."""
router = PriorityQueueRouter()
await router.connect(use_local_fallback=True)

for i in range(3):
task = Task(description=f"Task {i}", agent="a", priority=5)
await router.route_and_enqueue(task)

stats = await router.get_stats()
assert stats.total_tasks == 3

await router.disconnect()

============================================================================

Test: Dequeue Strategies

============================================================================

class TestDequeueStrategies: """Tests for different dequeue strategies."""

def test_weighted_strategy_distribution(self, local_router):
"""Test weighted strategy roughly follows weights."""
local_router.set_strategy(DequeueStrategy.WEIGHTED)

# Add tasks to different queues
for _ in range(100):
# Critical queue (weight 10)
local_router.enqueue(
Task(description="c", agent="a", priority=10),
target_queue="critical"
)
# Background queue (weight 1)
local_router.enqueue(
Task(description="b", agent="a", priority=1),
target_queue="background"
)

# Dequeue many times and track which queue was selected
queue_counts = {"critical": 0, "background": 0}
for _ in range(100):
result = local_router.dequeue()
if result:
_, queue = result
queue_counts[queue] = queue_counts.get(queue, 0) + 1

# Critical should be selected more often (roughly 10:1 ratio)
# Allow some variance due to randomness
assert queue_counts["critical"] > queue_counts["background"]

def test_fair_share_strategy(self, local_router):
"""Test fair share strategy drains largest queue first."""
local_router.set_strategy(DequeueStrategy.FAIR_SHARE)

# Add more tasks to critical queue
for _ in range(10):
local_router.enqueue(
Task(description="c", agent="a", priority=10),
target_queue="critical"
)

# Add fewer to normal
for _ in range(2):
local_router.enqueue(
Task(description="n", agent="a", priority=5),
target_queue="normal"
)

# First dequeue should come from critical (largest queue)
result = local_router.dequeue()
assert result is not None
_, queue = result
assert queue == "critical"

============================================================================

Test: Convenience Functions

============================================================================

class TestConvenienceFunctions: """Tests for convenience functions."""

@pytest.mark.asyncio
async def test_create_priority_router(self):
"""Test create_priority_router convenience function."""
router = await create_priority_router()

assert router.is_connected
assert router.is_local_mode # No Redis in test env

await router.disconnect()

def test_create_routing_rule(self):
"""Test create_routing_rule convenience function."""
rule = create_routing_rule(
name="test-rule",
target_queue="test",
priority=50,
condition=lambda t: t.priority > 5,
)

assert rule.name == "test-rule"
assert rule.target_queue == "test"
assert rule.priority == 50
assert rule.condition is not None

============================================================================

Test: Edge Cases

============================================================================

class TestEdgeCases: """Tests for edge cases and error handling."""

def test_enqueue_to_nonexistent_queue(self, local_router):
"""Test enqueueing to nonexistent queue falls back to normal."""
task = Task(description="Test", agent="a", priority=5)

_, queue = local_router.enqueue(task, target_queue="nonexistent")

assert queue == "normal"

def test_dequeue_from_empty_specific_queue(self, local_router):
"""Test dequeueing from specific empty queue."""
# Clear the queue
local_router._queues["critical"] = []

result = local_router.dequeue_from("critical")
assert result is None

def test_complete_nonexistent_task(self, local_router):
"""Test completing nonexistent task doesn't error."""
# Should not raise
local_router.complete("nonexistent-id", result={})

def test_fail_nonexistent_task(self, local_router):
"""Test failing nonexistent task doesn't error."""
# Should not raise
local_router.fail("nonexistent-id", error="test")

def test_get_nonexistent_task(self, local_router):
"""Test getting nonexistent task returns None."""
result = local_router.get_task("nonexistent-id")
assert result is None

def test_invalid_expression_in_rule(self, local_router):
"""Test rule with invalid expression doesn't crash."""
rule = RoutingRule(
name="bad-rule",
target_queue="normal",
condition_expr="invalid syntax !!!",
)

task = Task(description="Test", agent="a", priority=5)

# Should return False, not crash
assert rule.matches(task) is False

============================================================================

Run tests

============================================================================

if name == "main": pytest.main([file, "-v", "--tb=short"])