Unit tests for CODITECT Priority Queue Router

Tests cover:

  • Queue configuration and management
  • Routing rules and task routing
  • Dequeue strategies (strict, weighted, round-robin, fair-share)
  • Priority boosting for aging tasks
  • Statistics and monitoring
  • Local fallback mode
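As a rough illustration of two of the behaviors listed above, the sketch below combines a strict dequeue strategy with priority boosting for aging tasks. It is not the router's actual code: `SketchTask`, `boost_aging`, `dequeue_strict`, the numeric priority scale (lower means more urgent), and the `max_age` threshold are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchTask:
    """Hypothetical task record; field names are assumptions."""
    task_id: str
    priority: int       # assumption: lower number = more urgent
    enqueued_at: float  # enqueue time in seconds


def boost_aging(tasks: List[SketchTask], now: float, max_age: float = 60.0) -> None:
    """Raise priority by one level for tasks that have waited longer than max_age."""
    for task in tasks:
        if now - task.enqueued_at > max_age and task.priority > 0:
            task.priority -= 1


def dequeue_strict(tasks: List[SketchTask]) -> Optional[SketchTask]:
    """Strict strategy: always take the most urgent task, oldest first on ties."""
    if not tasks:
        return None
    best = min(tasks, key=lambda t: (t.priority, t.enqueued_at))
    tasks.remove(best)
    return best
```

Without boosting, a strict strategy can starve low-priority work indefinitely; raising the priority of long-waiting tasks is one common way to bound that wait.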

Run with: python -m pytest scripts/core/test_priority_queue_router.py -v

Author: CODITECT Framework

Created: January 8, 2026

File: test_priority_queue_router.py

Classes

TestQueueConfig

Tests for QueueConfig dataclass.

TestRoutingRules

Tests for RoutingRule matching logic.

TestLocalRouterEnqueue

Tests for task enqueueing in local router.

TestLocalRouterDequeue

Tests for task dequeueing in local router.

TestLocalRouterCompletion

Tests for task completion and failure.

TestCustomRouting

Tests for custom routing rules.

TestQueueManagement

Tests for queue configuration and management.

TestPriorityBoosting

Tests for priority boosting of aging tasks.

TestStatistics

Tests for router statistics.

TestAsyncRouter

Tests for async PriorityQueueRouter.

Functions

local_router()

Create a local priority queue router for testing.

sample_task()

Create a sample task.

critical_task()

Create a critical priority task.

background_task()

Create a background priority task.

test_default_queue_config()

Test default queue configuration values.

test_custom_queue_config()

Test custom queue configuration.

test_queue_config_serialization()

Test queue config to_dict and from_dict.
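A serialization test of this kind typically checks that a config survives a round trip through `to_dict` and `from_dict`. The sketch below shows the idea with a stand-in dataclass; `SketchQueueConfig` and its fields (`name`, `weight`, `max_size`) are hypothetical and may not match the real `QueueConfig`.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class SketchQueueConfig:
    """Stand-in for QueueConfig; the fields here are assumptions."""
    name: str
    weight: int = 1
    max_size: int = 1000

    def to_dict(self) -> Dict[str, Any]:
        # asdict converts the dataclass fields into a plain dictionary.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "SketchQueueConfig":
        # Rebuild the config from the dictionary produced by to_dict.
        return cls(**data)


def check_round_trip() -> bool:
    """Return True if a config is unchanged after to_dict followed by from_dict."""
    original = SketchQueueConfig(name="critical", weight=10)
    restored = SketchQueueConfig.from_dict(original.to_dict())
    return restored == original
```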

test_rule_with_lambda_condition(sample_task)

Test rule matching with lambda condition.

test_rule_with_expression(sample_task)

Test rule matching with string expression.

test_rule_with_metadata_expression(sample_task)

Test rule matching with metadata condition.

test_disabled_rule(sample_task)

Test that disabled rules never match.
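The rule tests above can be pictured with a small stand-in: a rule holds a callable condition and an enabled flag, and a disabled rule never matches regardless of its condition. `SketchRule`, its fields, and the dictionary-shaped task are assumptions for illustration, not the real `RoutingRule` API; string expressions are left out of this sketch.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class SketchRule:
    """Stand-in for RoutingRule; names and fields are assumptions."""
    name: str
    target_queue: str
    condition: Callable[[Dict[str, Any]], bool]
    enabled: bool = True

    def matches(self, task: Dict[str, Any]) -> bool:
        # A disabled rule short-circuits before the condition is evaluated.
        return self.enabled and bool(self.condition(task))


# Hypothetical task shape: a plain dict with a nested metadata dict.
example_task = {"id": "t1", "metadata": {"urgent": True}}

urgent_rule = SketchRule(
    name="urgent-to-critical",
    target_queue="critical",
    condition=lambda task: task["metadata"].get("urgent", False),
)
```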

test_rule_serialization()

Test rule to_dict and from_dict.

test_enqueue_basic(local_router, sample_task)

Test basic task enqueueing.

test_enqueue_critical_task(local_router, critical_task)

Test critical task routes to critical queue.

test_enqueue_background_task(local_router, background_task)

Test background task routes to background queue.
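The enqueue tests above assert that a task's priority decides which queue receives it. A minimal sketch of that behavior, assuming string priority labels that double as queue names and a fallback to a `default` queue, might look as follows. `SketchLocalRouter` and its `enqueue` return value are hypothetical, not the real local router.

```python
from collections import deque
from typing import Any, Deque, Dict, Iterable


class SketchLocalRouter:
    """In-memory stand-in for a local router; the design is an assumption."""

    def __init__(self, queue_names: Iterable[str] = ("critical", "default", "background")):
        # One FIFO queue per configured name.
        self.queues: Dict[str, Deque[Dict[str, Any]]] = {
            name: deque() for name in queue_names
        }

    def enqueue(self, task: Dict[str, Any]) -> str:
        """Append the task to the queue named by its priority and return that name."""
        name = task.get("priority", "default")
        if name not in self.queues:
            # Unknown priorities fall back to the default queue.
            name = "default"
        self.queues[name].append(task)
        return name
```

Returning the chosen queue name keeps assertions short: a test can compare the return value against the expected queue and then check that queue's length.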

Usage

python test_priority_queue_router.py