CODITECT Priority Queue Router - Intelligent Task Routing System

Part of Track H.2.3: Inter-Agent Communication Infrastructure.
Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications.

This module provides:

  • PriorityQueueRouter: Multi-queue router with configurable routing rules
  • RoutingRule: Condition-based routing to named queues
  • QueueConfig: Per-queue configuration (weights, limits, policies)
  • DequeueStrategy: Different strategies for selecting next task

Features:

  • Multiple named queues (critical, high, normal, background, agent-specific)
  • Configurable routing rules based on agent, priority, metadata, tenant
  • Dequeue strategies: strict priority, weighted, round-robin, fair share
  • Priority boosting for aging tasks (starvation prevention)
  • Queue capacity limits and rate limiting
  • Load balancing across agent-specific queues

Usage:

    from scripts.core.priority_queue_router import PriorityQueueRouter, RoutingRule

    # The awaits below must run inside a coroutine (e.g. one started with asyncio.run)
    router = PriorityQueueRouter()
    await router.connect()

    # Add routing rules
    router.add_rule(RoutingRule(
        name="critical-tasks",
        condition=lambda t: t.priority >= 9,
        target_queue="critical"
    ))

    # Route and enqueue task
    await router.route_and_enqueue(task)

    # Dequeue from all queues based on strategy
    next_task = await router.dequeue()

Author: CODITECT Framework
Created: January 8, 2026
Version: 1.0.0

File: priority_queue_router.py

Classes

DequeueStrategy

Strategy for selecting which queue to dequeue from.
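
The feature list names strict priority, weighted, round-robin, and fair share as dequeue strategies. The following is a minimal sketch of how the first three could choose a queue. The enum `DequeueStrategySketch`, its member names, and the helper `pick_queue` are hypothetical and are not taken from the module; fair share is left out because the source does not describe how it is computed.

```python
import random
from enum import Enum


class DequeueStrategySketch(Enum):
    # Member names are assumptions; the source lists the strategies only in prose.
    STRICT_PRIORITY = "strict_priority"
    WEIGHTED = "weighted"
    ROUND_ROBIN = "round_robin"


def pick_queue(strategy, queues, weights, last_index=-1, rng=random):
    """Return the name of the queue to dequeue from, or None if all are empty.

    queues:     dict of queue name -> list of pending tasks, ordered from the
                highest-priority queue to the lowest.
    weights:    dict of queue name -> positive weight (used by WEIGHTED only).
    last_index: position of the queue served last (used by ROUND_ROBIN only).
    """
    names = list(queues)
    non_empty = [n for n in names if queues[n]]
    if not non_empty:
        return None
    if strategy is DequeueStrategySketch.STRICT_PRIORITY:
        # Always drain the highest-priority non-empty queue first.
        return non_empty[0]
    if strategy is DequeueStrategySketch.WEIGHTED:
        # Pick at random, in proportion to each non-empty queue's weight.
        return rng.choices(non_empty, weights=[weights[n] for n in non_empty])[0]
    # ROUND_ROBIN: the next non-empty queue after the one served last.
    for step in range(1, len(names) + 1):
        candidate = names[(last_index + step) % len(names)]
        if queues[candidate]:
            return candidate
```

Strict priority can starve lower queues under sustained load, which is presumably why the module also offers weighted and round-robin selection alongside priority boosting.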

BoostPolicy

Policy for boosting task priority over time.
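
Priority boosting for aging tasks is described as starvation prevention. One common way to do this is a linear boost per waiting interval, capped at a maximum. The sketch below assumes that approach; the function name `boosted_priority`, its parameters, and the cap of 10 are illustrative and do not come from the module.

```python
def boosted_priority(base_priority, waited_seconds, interval_seconds=60.0,
                     boost_per_interval=1, max_priority=10):
    """Raise priority by `boost_per_interval` for every full interval waited.

    All defaults are assumptions for illustration; the real BoostPolicy
    may use different policies and limits.
    """
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    # Only completed intervals count, so a task is not boosted immediately.
    intervals = int(waited_seconds // interval_seconds)
    # The cap keeps long-waiting tasks from exceeding the top of the scale.
    return min(max_priority, base_priority + intervals * boost_per_interval)
```

For example, under these assumed defaults a task with priority 3 that has waited 125 seconds has completed two intervals and would be treated as priority 5.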

QueueConfig

Configuration for a named queue.
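
The module summary says each queue carries weights, limits, and policies, and the function list includes `to_dict()` and `from_dict(cls, data)`. A per-queue configuration with a dictionary round trip might look roughly like the sketch below. The class name `QueueConfigSketch` and every field name are assumptions, since the source does not list the real fields.

```python
from dataclasses import asdict, dataclass, fields
from typing import Optional


@dataclass
class QueueConfigSketch:
    # All field names are illustrative assumptions, not the module's real fields.
    name: str
    weight: int = 1                 # relative share under a weighted strategy
    max_size: Optional[int] = None  # None means the queue is unbounded

    def to_dict(self):
        """Convert to a plain dictionary."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data):
        """Create from a dictionary, ignoring keys this class does not know."""
        known_names = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known_names})


config = QueueConfigSketch(name="critical", weight=5, max_size=100)
restored = QueueConfigSketch.from_dict(config.to_dict())
```

Ignoring unknown keys in `from_dict` is a design choice of this sketch: it lets a stored configuration written by a different version still load.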

RoutingRule

Rule for routing tasks to specific queues.

QueueStats

Statistics for a single queue.

RouterStats

Aggregate statistics for the router.

LocalPriorityQueueRouter

Local in-memory priority queue router for development.
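
To illustrate what an in-memory router for development could look like, here is a minimal sketch built on the standard-library `heapq` module, with one heap per named queue. The class `LocalRouterSketch` and its method signatures are hypothetical, and the sketch is synchronous for brevity, whereas the usage example at the top of this page awaits the router's methods.

```python
import heapq
import itertools


class LocalRouterSketch:
    """Hypothetical in-memory router: one heap per named queue."""

    def __init__(self, queue_names=("critical", "high", "normal", "background")):
        # Assumption: the order of queue_names is also the strict dequeue order.
        self._heaps = {name: [] for name in queue_names}
        # A running counter breaks ties so equal priorities stay first-in, first-out.
        self._counter = itertools.count()

    def enqueue(self, task_id, priority, target_queue="normal"):
        if target_queue not in self._heaps:
            raise KeyError(f"unknown queue: {target_queue}")
        # heapq is a min-heap, so negate priority to pop the highest first.
        heapq.heappush(self._heaps[target_queue],
                       (-priority, next(self._counter), task_id))

    def dequeue(self):
        """Pop from the first non-empty queue, or return None if all are empty."""
        for heap in self._heaps.values():
            if heap:
                return heapq.heappop(heap)[2]
        return None
```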

PriorityQueueRouter

Priority Queue Router with intelligent task routing.

Functions

create_routing_rule(name, target_queue, priority, condition, condition_expr)

Create a routing rule.

to_dict()

Convert to dictionary.

from_dict(cls, data)

Create from dictionary.

matches(task)

Check if task matches this rule.
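
Rule matching can be sketched as evaluating each rule's condition against the task and using the first rule that matches. In the sketch below, `TaskSketch`, `RuleSketch`, and `route` are hypothetical stand-ins; falling back to a queue named "normal" and treating a raising condition as a non-match are assumptions, not documented behavior.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class TaskSketch:
    # Stand-in for the real task type; only `priority` appears in the source.
    priority: int
    agent: str = ""
    metadata: dict = field(default_factory=dict)


@dataclass
class RuleSketch:
    name: str
    target_queue: str
    condition: Callable[[TaskSketch], bool]

    def matches(self, task):
        # Assumption: a condition that raises counts as "no match", so one
        # faulty rule cannot block routing for every task.
        try:
            return bool(self.condition(task))
        except Exception:
            return False


def route(task, rules, default_queue="normal"):
    """Return the target queue of the first matching rule, else the default."""
    for rule in rules:
        if rule.matches(task):
            return rule.target_queue
    return default_queue


rules = [
    RuleSketch("critical-tasks", "critical", lambda t: t.priority >= 9),
    RuleSketch("faulty-rule", "high", lambda t: t.missing_attribute > 1),
]
```

With these rules, a task of priority 9 routes to "critical", while a task of priority 2 skips the faulty rule and lands in the default queue.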

to_dict()

Convert to dictionary (without callable).
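
A rule's callable has to be left out of the dictionary because functions and lambdas cannot be serialized to JSON. The sketch below shows one way that could work, reusing the parameter names from `create_routing_rule` (`name`, `target_queue`, `priority`, `condition`, `condition_expr`). The class `SerializableRuleSketch` is hypothetical, and how a stored `condition_expr` is turned back into a callable is deliberately left open because the source does not say.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class SerializableRuleSketch:
    name: str
    target_queue: str
    priority: int = 0
    condition: Optional[Callable[[Any], bool]] = None
    condition_expr: Optional[str] = None  # textual form that can be stored

    def to_dict(self):
        # The callable is dropped; only plain, JSON-friendly values remain.
        return {
            "name": self.name,
            "target_queue": self.target_queue,
            "priority": self.priority,
            "condition_expr": self.condition_expr,
        }

    @classmethod
    def from_dict(cls, data):
        # The restored rule has no callable until one is attached again.
        return cls(
            name=data["name"],
            target_queue=data["target_queue"],
            priority=data.get("priority", 0),
            condition_expr=data.get("condition_expr"),
        )


rule = SerializableRuleSketch(
    name="critical-tasks",
    target_queue="critical",
    condition=lambda t: t.priority >= 9,
    condition_expr="priority >= 9",
)
payload = json.dumps(rule.to_dict())
restored_rule = SerializableRuleSketch.from_dict(json.loads(payload))
```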

from_dict(cls, data)

Create from dictionary.

add_queue(config)

Add or update a queue configuration.

remove_queue(name)

Remove a queue (tasks are moved to 'normal' queue).
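
Moving pending tasks to the 'normal' queue means removing a queue does not drop work. A minimal sketch of that behavior over plain dictionaries and lists follows. The standalone function below and its return value are illustrative only, and refusing to remove the fallback queue itself is an assumption.

```python
def remove_queue(queues, name, fallback="normal"):
    """Delete queue `name`, appending its pending tasks to `fallback`.

    queues: dict of queue name -> list of pending tasks (modified in place).
    Returns the number of tasks that were moved.
    """
    if name == fallback:
        # Assumption: the fallback queue must always exist to receive tasks.
        raise ValueError("the fallback queue cannot be removed")
    if name not in queues:
        return 0
    orphaned = queues.pop(name)
    # Create the fallback queue on demand, then keep the moved tasks in order.
    queues.setdefault(fallback, []).extend(orphaned)
    return len(orphaned)
```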

add_rule(rule)

Add a routing rule.

remove_rule(name)

Remove a routing rule by name.

set_strategy(strategy)

Set the dequeue strategy.

enqueue(task, target_queue)

Enqueue a task, optionally to a specific queue.

dequeue()

Dequeue the next task based on current strategy.

dequeue_from(queue_name)

Dequeue from a specific queue.

complete(task_id, result)

Mark task as complete.

Usage

python priority_queue_router.py