Unit tests for CODITECT Priority Queue Router
Tests cover:
- Queue configuration and management
- Routing rules and task routing
- Dequeue strategies (strict, weighted, round-robin, fair-share)
- Priority boosting for aging tasks
- Statistics and monitoring
- Local fallback mode
Run with: python -m pytest scripts/core/test_priority_queue_router.py -v
Author: CODITECT Framework
Created: January 8, 2026
File: test_priority_queue_router.py
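To make the "strict" dequeue strategy and "priority boosting for aging tasks" concrete, here is a minimal sketch of the behaviour such tests would typically exercise. It is not the CODITECT implementation: `TinyRouter`, `Entry`, the numeric priority levels, and the `boost_after` parameter are all hypothetical stand-ins, and time is passed in explicitly so the example stays deterministic.

```python
import heapq
import itertools
from dataclasses import dataclass, field

# Hypothetical priority levels (assumption): a lower number is served first.
CRITICAL, NORMAL, BACKGROUND = 0, 1, 2


@dataclass(order=True)
class Entry:
    priority: int
    seq: int  # insertion order, breaks ties between equal priorities
    task_id: str = field(compare=False)
    enqueued_at: float = field(compare=False)


class TinyRouter:
    """Stand-in router: strict priority dequeue plus age-based boosting."""

    def __init__(self, boost_after: float = 60.0):
        self._heap = []
        self._seq = itertools.count()
        self.boost_after = boost_after

    def enqueue(self, task_id: str, priority: int, now: float) -> None:
        heapq.heappush(self._heap, Entry(priority, next(self._seq), task_id, now))

    def boost_aging(self, now: float) -> int:
        """Raise each task that has waited boost_after seconds by one level."""
        boosted = 0
        for entry in self._heap:
            waited = now - entry.enqueued_at
            if entry.priority > CRITICAL and waited >= self.boost_after:
                entry.priority -= 1
                entry.enqueued_at = now  # restart the aging clock
                boosted += 1
        heapq.heapify(self._heap)  # restore heap order after the changes
        return boosted

    def dequeue(self):
        """Strict strategy: always return the highest-priority task."""
        return heapq.heappop(self._heap).task_id if self._heap else None
```

In this sketch a background task enqueued at `now=0` overtakes a normal task enqueued at `now=50` once `boost_aging(now=60)` has run, because both then share a level and the older task has the lower sequence number.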
Classes
TestQueueConfig
Tests for QueueConfig dataclass.
TestRoutingRules
Tests for RoutingRule matching logic.
TestLocalRouterEnqueue
Tests for task enqueueing in local router.
TestLocalRouterDequeue
Tests for task dequeueing in local router.
TestLocalRouterCompletion
Tests for task completion and failure.
TestCustomRouting
Tests for custom routing rules.
TestQueueManagement
Tests for queue configuration and management.
TestPriorityBoosting
Tests for priority boosting of aging tasks.
TestStatistics
Tests for router statistics.
TestAsyncRouter
Tests for async PriorityQueueRouter.
Functions
local_router()
Create a local priority queue router for testing.
sample_task()
Create a sample task.
critical_task()
Create a critical priority task.
background_task()
Create a background priority task.
test_default_queue_config()
Test default queue configuration values.
test_custom_queue_config()
Test custom queue configuration.
test_queue_config_serialization()
Test queue config to_dict and from_dict.
test_rule_with_lambda_condition(sample_task)
Test rule matching with lambda condition.
test_rule_with_expression(sample_task)
Test rule matching with string expression.
test_rule_with_metadata_expression(sample_task)
Test rule matching with metadata condition.
test_disabled_rule(sample_task)
Test that disabled rules never match.
test_rule_serialization()
Test rule to_dict and from_dict.
test_enqueue_basic(local_router, sample_task)
Test basic task enqueueing.
test_enqueue_critical_task(local_router, critical_task)
Test critical task routes to critical queue.
test_enqueue_background_task(local_router, background_task)
Test background task routes to background queue.
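The rule-matching and routing tests listed above could be pictured roughly as follows. This is a sketch under stated assumptions, not the module's actual code: `Task`, `Rule`, `route`, and `make_rules` are hypothetical stand-ins for the real task, `RoutingRule`, and router objects, whose signatures are not shown in this listing.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Task:
    # Hypothetical task shape (assumption): an id, a priority name, metadata.
    task_id: str
    priority: str = "normal"
    metadata: Dict[str, str] = field(default_factory=dict)


@dataclass
class Rule:
    # Hypothetical stand-in for a routing rule with a lambda condition.
    name: str
    target_queue: str
    condition: Callable[[Task], bool]
    enabled: bool = True

    def matches(self, task: Task) -> bool:
        # A disabled rule never matches, whatever its condition returns.
        return self.enabled and bool(self.condition(task))


def route(task: Task, rules: List[Rule], default: str = "normal") -> str:
    """Return the target queue of the first matching rule, else the default."""
    for rule in rules:
        if rule.matches(task):
            return rule.target_queue
    return default


def make_rules() -> List[Rule]:
    return [
        Rule("critical", "critical", lambda t: t.priority == "critical"),
        Rule("batch", "background", lambda t: t.metadata.get("kind") == "batch"),
    ]


def test_rule_with_lambda_condition():
    rule = make_rules()[0]
    assert rule.matches(Task("t1", priority="critical"))
    assert not rule.matches(Task("t2"))


def test_disabled_rule():
    rule = Rule("always", "critical", lambda t: True, enabled=False)
    assert not rule.matches(Task("t3"))


def test_enqueue_critical_task():
    assert route(Task("t4", priority="critical"), make_rules()) == "critical"
```

Under pytest, the task objects built inline here would more naturally come from fixtures such as `sample_task` and `critical_task`; they are constructed directly in the sketch only to keep it free of third-party imports.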
Usage
python test_priority_queue_router.py