CODITECT Priority Queue Router - Intelligent Task Routing System
Part of Track H.2.3: Inter-Agent Communication Infrastructure
Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications
This module provides:
- PriorityQueueRouter: Multi-queue router with configurable routing rules
- RoutingRule: Condition-based routing to named queues
- QueueConfig: Per-queue configuration (weights, limits, policies)
- DequeueStrategy: Strategies for selecting the next task to dequeue
Features:
- Multiple named queues (critical, high, normal, background, agent-specific)
- Configurable routing rules based on agent, priority, metadata, tenant
- Dequeue strategies: strict priority, weighted, round-robin, fair share
- Priority boosting for aging tasks (starvation prevention)
- Queue capacity limits and rate limiting
- Load balancing across agent-specific queues
Usage:
from scripts.core.priority_queue_router import PriorityQueueRouter, RoutingRule
router = PriorityQueueRouter()
await router.connect()
# Add routing rules
router.add_rule(RoutingRule(
    name="critical-tasks",
    condition=lambda t: t.priority >= 9,
    target_queue="critical"
))
# Route and enqueue task
await router.route_and_enqueue(task)
# Dequeue from all queues based on strategy
next_task = await router.dequeue()
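The usage snippet above relies on await, so it has to run inside a coroutine. The following is a runnable sketch of the same connect, enqueue, dequeue flow using a hypothetical in-memory stand-in. StubRouter and Task are assumptions made for illustration and are not the module's classes; the real connect() and routing behavior are not described here.

```python
import asyncio
import heapq
import itertools
from dataclasses import dataclass


@dataclass
class Task:
    task_id: str
    priority: int


class StubRouter:
    """Hypothetical in-memory stand-in; not the real PriorityQueueRouter."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker keeps FIFO order per priority

    async def connect(self):
        # Nothing to connect to in this stub.
        return None

    async def route_and_enqueue(self, task):
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-task.priority, next(self._counter), task))

    async def dequeue(self):
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]


async def demo():
    router = StubRouter()
    await router.connect()
    for task in (Task("low", 2), Task("urgent", 9), Task("mid", 5)):
        await router.route_and_enqueue(task)
    order = []
    while (task := await router.dequeue()) is not None:
        order.append(task.task_id)
    return order
```

Calling asyncio.run(demo()) drives the coroutine from synchronous code and returns the task ids in the order they were dequeued.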
Author: CODITECT Framework
Created: January 8, 2026
Version: 1.0.0
File: priority_queue_router.py
Classes
DequeueStrategy
Strategy for selecting which queue to dequeue from.
BoostPolicy
Policy for boosting task priority over time.
QueueConfig
Configuration for a named queue.
RoutingRule
Rule for routing tasks to specific queues.
QueueStats
Statistics for a single queue.
RouterStats
Aggregate statistics for the router.
LocalPriorityQueueRouter
Local in-memory priority queue router for development.
PriorityQueueRouter
Priority Queue Router with intelligent task routing.
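To make the role of a dequeue strategy concrete, here is a minimal synchronous sketch contrasting strict priority with round-robin across named queues. Strategy and MiniRouter are hypothetical names, the sketch covers only two of the strategies mentioned in this document, and the enum members of the real DequeueStrategy are an assumption.

```python
from collections import deque
from enum import Enum


class Strategy(Enum):
    STRICT_PRIORITY = "strict_priority"
    ROUND_ROBIN = "round_robin"


class MiniRouter:
    """Hypothetical illustration of queue selection; not the module's router."""

    def __init__(self, queue_names, strategy=Strategy.STRICT_PRIORITY):
        # queue_names is ordered from most to least urgent.
        self.queues = {name: deque() for name in queue_names}
        self.strategy = strategy
        self._next_index = 0  # where round-robin scanning starts

    def enqueue(self, item, queue_name):
        self.queues[queue_name].append(item)

    def dequeue(self):
        names = list(self.queues)
        if self.strategy is Strategy.ROUND_ROBIN:
            # Start scanning at the queue after the one served last.
            order = names[self._next_index:] + names[:self._next_index]
        else:
            # Strict priority always scans from the most urgent queue.
            order = names
        for name in order:
            if self.queues[name]:
                if self.strategy is Strategy.ROUND_ROBIN:
                    self._next_index = (names.index(name) + 1) % len(names)
                return self.queues[name].popleft()
        return None  # every queue is empty
```

Strict priority drains the most urgent queue before touching the next one, while round-robin alternates between non-empty queues so that lower-priority queues still make progress.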
Functions
create_routing_rule(name, target_queue, priority, condition, condition_expr)
Create a routing rule.
to_dict()
Convert to dictionary.
from_dict(cls, data)
Create from dictionary.
matches(task)
Check if task matches this rule.
to_dict()
Convert to dictionary (without callable).
from_dict(cls, data)
Create from dictionary.
add_queue(config)
Add or update a queue configuration.
remove_queue(name)
Remove a queue (its tasks are moved to the 'normal' queue).
add_rule(rule)
Add a routing rule.
remove_rule(name)
Remove a routing rule by name.
set_strategy(strategy)
Set the dequeue strategy.
enqueue(task, target_queue)
Enqueue a task, optionally to a specific queue.
dequeue()
Dequeue the next task based on current strategy.
dequeue_from(queue_name)
Dequeue from a specific queue.
complete(task_id, result)
Mark task as complete.
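The rule-related functions above (matches, to_dict without the callable, from_dict) can be sketched as follows. This is an illustration under stated assumptions, not the module's implementation: Rule, Task and route are hypothetical names, evaluating rules from highest to lowest rule priority is an assumption, falling back to a 'normal' queue is an assumption, and the condition_expr parameter of create_routing_rule is left out because its semantics are not described here.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Task:
    task_id: str
    priority: int
    agent: str = ""


@dataclass
class Rule:
    name: str
    target_queue: str
    priority: int = 0  # rule priority; higher is evaluated first (assumption)
    condition: Optional[Callable[[Any], bool]] = None

    def matches(self, task):
        """True if the condition accepts the task; a missing or failing condition never matches."""
        if self.condition is None:
            return False
        try:
            return bool(self.condition(task))
        except Exception:
            return False

    def to_dict(self):
        # A callable cannot be serialized portably, so it is omitted.
        return {"name": self.name, "target_queue": self.target_queue, "priority": self.priority}

    @classmethod
    def from_dict(cls, data):
        # The condition is not restored; the caller has to attach one again.
        return cls(data["name"], data["target_queue"], data.get("priority", 0))


def route(task, rules, default_queue="normal"):
    """Return the target queue of the first matching rule, else default_queue."""
    for rule in sorted(rules, key=lambda r: r.priority, reverse=True):
        if rule.matches(task):
            return rule.target_queue
    return default_queue
```

Because to_dict drops the callable, a rule restored with from_dict in this sketch matches nothing until a condition is attached again.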
Usage
python priority_queue_router.py