#!/usr/bin/env python3
"""
CODITECT Priority Queue Router - Intelligent Task Routing System

Part of Track H.2.3: Inter-Agent Communication Infrastructure
Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications

This module provides:

  - PriorityQueueRouter: Multi-queue router with configurable routing rules
  - RoutingRule: Condition-based routing to named queues
  - QueueConfig: Per-queue configuration (weights, limits, policies)
  - DequeueStrategy: Different strategies for selecting next task

Features:

  - Multiple named queues (critical, high, normal, background, agent-specific)
  - Configurable routing rules based on agent, priority, metadata, tenant
  - Dequeue strategies: strict priority, weighted, round-robin, fair share
  - Priority boosting for aging tasks (starvation prevention)
  - Queue capacity limits and rate limiting
  - Load balancing across agent-specific queues

Usage:
    from scripts.core.priority_queue_router import PriorityQueueRouter, RoutingRule

    router = PriorityQueueRouter()
    await router.connect()

    # Add routing rules
    router.add_rule(RoutingRule(
        name="critical-tasks",
        condition=lambda t: t.priority >= 9,
        target_queue="critical"
    ))

    # Route and enqueue task
    await router.route_and_enqueue(task)

    # Dequeue from all queues based on strategy.
    # dequeue() returns a (task, queue_name) tuple, or None when nothing is queued.
    item = await router.dequeue()

Author: CODITECT Framework
Created: January 8, 2026
Version: 1.0.0
"""

import asyncio
import heapq
import json
import logging
import os
import random
import re
import threading
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

# Import from sibling module
from scripts.core.task_queue_manager import (
    REDIS_AVAILABLE,
    LocalTaskQueue,
    Task,
    TaskPriority,
    TaskQueueConfig,
    TaskQueueManager,
    TaskStatus,
)

# Configure logging
# Module-level logger named after this module so log records can be filtered
# per module by the application's logging configuration.
logger = logging.getLogger(__name__)

class DequeueStrategy(Enum):
    """Strategy for selecting which queue to dequeue from."""

    # Always highest priority queue first
    STRICT_PRIORITY = "strict_priority"

    # Probabilistic based on queue weights
    WEIGHTED = "weighted"

    # Fair rotation across queues
    ROUND_ROBIN = "round_robin"

    # Based on queue utilization
    FAIR_SHARE = "fair_share"

class BoostPolicy(Enum):
    """Policy for boosting task priority over time."""

    # No boosting
    NONE = "none"

    # Linear increase over time
    LINEAR = "linear"

    # Exponential increase
    EXPONENTIAL = "exponential"

    # Stepped increases at thresholds
    STEPPED = "stepped"

@dataclass
class QueueConfig:
    """
    Settings that describe one named queue.

    Attributes:
        name: Queue identifier
        weight: Relative weight for weighted dequeue (higher = more likely)
        max_capacity: Maximum tasks allowed in queue (0 = unlimited)
        rate_limit_per_second: Max dequeue rate (0 = unlimited)
        priority_floor: Minimum priority to route to this queue
        priority_ceiling: Maximum priority to route to this queue
        boost_policy: How to boost aging tasks (a BoostPolicy value string)
        boost_interval_seconds: How often to apply boost
        boost_amount: Priority increase per boost
        max_boost: Maximum total boost allowed
        dedicated_agents: List of agent IDs this queue serves (empty = all)
    """

    name: str
    weight: float = 1.0
    max_capacity: int = 0
    rate_limit_per_second: float = 0.0
    priority_floor: int = 1
    priority_ceiling: int = 10
    boost_policy: str = BoostPolicy.NONE.value
    boost_interval_seconds: int = 300  # 5 minutes
    boost_amount: int = 1
    max_boost: int = 3
    dedicated_agents: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return every field of this config as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "QueueConfig":
        """Build a config from a dictionary, ignoring keys that are not fields."""
        known_fields = cls.__dataclass_fields__
        kwargs: Dict[str, Any] = {}
        for key, value in data.items():
            if key in known_fields:
                kwargs[key] = value
        return cls(**kwargs)

@dataclass
class RoutingRule:
    """
    Rule for routing tasks to specific queues.

    A rule matches either through ``condition`` (a Python callable) or, when
    no callable is set, through ``condition_expr`` (a string expression).
    Only ``condition_expr`` survives ``to_dict`` / ``from_dict``.

    Attributes:
        name: Rule identifier
        target_queue: Queue to route matching tasks to
        priority: Rule evaluation priority (lower = evaluated first)
        condition: Function that takes a Task and returns True if rule matches
        condition_expr: String expression for serialization (optional)
        enabled: Whether rule is active
    """

    name: str
    target_queue: str
    priority: int = 100
    condition: Optional[Callable[[Task], bool]] = None
    condition_expr: Optional[str] = None
    enabled: bool = True

    def matches(self, task: Task) -> bool:
        """
        Check if task matches this rule.

        Returns:
            True when the rule is enabled and its condition (callable first,
            otherwise the string expression) is truthy for ``task``. Any
            exception raised by the condition is logged and treated as
            "no match". A rule with neither condition never matches.
        """
        if not self.enabled:
            return False

        if self.condition is not None:
            try:
                # Coerce so callers always get a real bool, matching
                # _evaluate_expr, even if the callable returns a truthy object.
                return bool(self.condition(task))
            except Exception as e:
                logger.warning(f"Rule {self.name} condition error: {e}")
                return False

        if self.condition_expr:
            return self._evaluate_expr(task)

        return False

    def _evaluate_expr(self, task: Task) -> bool:
        """
        Evaluate ``condition_expr`` against ``task``.

        The expression sees the names ``task``, ``priority``, ``agent``,
        ``status``, ``metadata`` and ``re``. Errors are logged and yield False.
        """
        try:
            context = {
                "task": task,
                "priority": task.priority,
                "agent": task.agent,
                "status": task.status,
                "metadata": task.metadata,
                "re": re,
            }
            # SECURITY: eval() with an emptied __builtins__ is NOT a sandbox.
            # Attribute access on the exposed objects is enough to reach
            # arbitrary code. condition_expr must only ever come from trusted
            # configuration, never from user- or network-supplied data
            # (note that from_dict() accepts it from arbitrary dictionaries).
            return bool(eval(self.condition_expr, {"__builtins__": {}}, context))
        except Exception as e:
            logger.warning(f"Rule {self.name} expression error: {e}")
            return False

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary (without callable)."""
        return {
            "name": self.name,
            "target_queue": self.target_queue,
            "priority": self.priority,
            "condition_expr": self.condition_expr,
            "enabled": self.enabled,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "RoutingRule":
        """
        Create from dictionary.

        Requires the keys ``name`` and ``target_queue`` (KeyError otherwise);
        ``priority`` defaults to 100 and ``enabled`` to True. The resulting
        rule has no callable ``condition``.
        """
        return cls(
            name=data["name"],
            target_queue=data["target_queue"],
            priority=data.get("priority", 100),
            condition_expr=data.get("condition_expr"),
            enabled=data.get("enabled", True),
        )

@dataclass
class QueueStats:
    """Statistics for a single queue."""

    # Identifier of the queue these counters belong to.
    name: str

    # Current counts.
    size: int = 0
    in_progress: int = 0

    # Lifetime totals.
    completed_total: int = 0
    failed_total: int = 0

    # Timing and throughput figures.
    avg_wait_time_ms: float = 0.0
    avg_processing_time_ms: float = 0.0
    current_rate: float = 0.0

    # Timestamp string of the most recent dequeue; None until one happens.
    last_dequeue_time: Optional[str] = None

@dataclass
class RouterStats:
    """Aggregate statistics for the router."""

    # Overall counts.
    total_queues: int = 0
    total_tasks: int = 0
    total_in_progress: int = 0

    # Per-queue breakdowns, keyed by queue name.
    tasks_per_queue: Dict[str, int] = field(default_factory=dict)
    routing_hits: Dict[str, int] = field(default_factory=dict)

    # Value string of the active dequeue strategy.
    strategy: str = ""

    # Seconds elapsed since the router was created.
    uptime_seconds: float = 0.0

class LocalPriorityQueueRouter:
    """
    Local in-memory priority queue router for development.

    Provides the same interface as Redis-backed router but uses
    Python data structures for local testing.

    Each queue is a ``heapq`` heap of ``(-priority, enqueue_time, task_id)``
    tuples, so the highest priority pops first and equal priorities pop in
    enqueue order. Mutating operations are serialised through one
    ``threading.Lock``.

    ``QueueConfig.rate_limit_per_second`` is not consulted by this class.
    """

    # Queues created at construction time; remove_queue() refuses to drop them.
    _CORE_QUEUES = ("critical", "high", "normal", "background")

    def __init__(self):
        """Create the router with the default queues and routing rules."""
        self._queues: Dict[str, List[Tuple[int, float, str]]] = {}  # name -> heap
        self._tasks: Dict[str, Task] = {}
        self._queue_configs: Dict[str, QueueConfig] = {}
        self._task_queues: Dict[str, str] = {}  # task_id -> queue_name
        self._routing_rules: List[RoutingRule] = []
        self._dequeue_strategy = DequeueStrategy.STRICT_PRIORITY
        self._round_robin_index = 0
        self._lock = threading.Lock()
        self._stats: Dict[str, QueueStats] = {}
        self._routing_hits: Dict[str, int] = {}
        self._start_time = time.time()

        # Initialize default queues
        self._init_default_queues()
        logger.info("Using local in-memory priority queue router")

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated ``datetime.utcnow()``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _init_default_queues(self):
        """Initialize default queue hierarchy and the matching routing rules."""
        default_queues = [
            QueueConfig(
                name="critical",
                weight=10.0,
                priority_floor=9,
                priority_ceiling=10,
                boost_policy=BoostPolicy.NONE.value,
            ),
            QueueConfig(
                name="high",
                weight=5.0,
                priority_floor=7,
                priority_ceiling=8,
                boost_policy=BoostPolicy.LINEAR.value,
                boost_interval_seconds=600,
            ),
            QueueConfig(
                name="normal",
                weight=3.0,
                priority_floor=4,
                priority_ceiling=6,
                boost_policy=BoostPolicy.LINEAR.value,
                boost_interval_seconds=300,
            ),
            QueueConfig(
                name="background",
                weight=1.0,
                priority_floor=1,
                priority_ceiling=3,
                boost_policy=BoostPolicy.STEPPED.value,
                boost_interval_seconds=900,
            ),
        ]

        for config in default_queues:
            self.add_queue(config)

        # Add default routing rules
        self._routing_rules = [
            RoutingRule(
                name="critical-priority",
                target_queue="critical",
                priority=10,
                condition=lambda t: t.priority >= 9,
            ),
            RoutingRule(
                name="high-priority",
                target_queue="high",
                priority=20,
                condition=lambda t: 7 <= t.priority <= 8,
            ),
            RoutingRule(
                name="normal-priority",
                target_queue="normal",
                priority=30,
                condition=lambda t: 4 <= t.priority <= 6,
            ),
            RoutingRule(
                name="background-priority",
                target_queue="background",
                priority=40,
                condition=lambda t: t.priority <= 3,
            ),
        ]

    def add_queue(self, config: QueueConfig):
        """Add or update a queue configuration.

        An existing queue keeps its tasks and statistics; only its config is
        replaced.
        """
        with self._lock:
            self._queue_configs[config.name] = config
            self._queues.setdefault(config.name, [])
            if config.name not in self._stats:
                self._stats[config.name] = QueueStats(name=config.name)

    def remove_queue(self, name: str) -> bool:
        """Remove a queue (tasks are moved to 'normal' queue).

        Returns:
            False if the queue does not exist or is one of the core queues,
            True once the queue has been removed.
        """
        with self._lock:
            if name not in self._queues or name in self._CORE_QUEUES:
                return False

            # Move tasks to normal queue
            normal_heap = self._queues["normal"]
            for entry in self._queues[name]:
                heapq.heappush(normal_heap, entry)
                self._task_queues[entry[2]] = "normal"
            # Keep the reported size in step with the heap that just grew.
            self._stats["normal"].size = len(normal_heap)

            del self._queues[name]
            del self._queue_configs[name]
            del self._stats[name]
            return True

    def add_rule(self, rule: RoutingRule):
        """Add a routing rule, replacing any existing rule with the same name."""
        with self._lock:
            # Remove existing rule with same name
            self._routing_rules = [r for r in self._routing_rules if r.name != rule.name]
            self._routing_rules.append(rule)
            # Sort by priority (lower = higher priority)
            self._routing_rules.sort(key=lambda r: r.priority)

    def remove_rule(self, name: str) -> bool:
        """Remove a routing rule by name. Returns True if a rule was removed."""
        with self._lock:
            initial_count = len(self._routing_rules)
            self._routing_rules = [r for r in self._routing_rules if r.name != name]
            return len(self._routing_rules) < initial_count

    def set_strategy(self, strategy: DequeueStrategy):
        """Set the dequeue strategy."""
        self._dequeue_strategy = strategy

    def _determine_queue(self, task: Task) -> str:
        """Determine which queue a task should be routed to.

        Order: first matching routing rule whose target exists, then a queue
        named ``agent:<task.agent>``, then a queue listing the agent in
        ``dedicated_agents``, then the first queue whose priority range
        contains the task priority, and finally ``"normal"``.

        NOTE(review): the default rules match every integer priority, so the
        agent-based steps are only reached when rules are removed or
        overridden — confirm this precedence is intended.
        """
        # Check routing rules in priority order
        for rule in self._routing_rules:
            if rule.matches(task) and rule.target_queue in self._queues:
                return rule.target_queue

        # Check for agent-specific queue
        agent_queue = f"agent:{task.agent}"
        if agent_queue in self._queues:
            return agent_queue

        # Check dedicated agent queues
        for name, config in self._queue_configs.items():
            if task.agent in config.dedicated_agents:
                return name

        # Default: route by priority floor/ceiling
        for name, config in self._queue_configs.items():
            # A queue reserved for specific agents must not absorb tasks of
            # other agents (the dedicated check above already failed here).
            if config.dedicated_agents:
                continue
            if config.priority_floor <= task.priority <= config.priority_ceiling:
                return name

        return "normal"

    def enqueue(
        self,
        task: Task,
        target_queue: Optional[str] = None,
    ) -> Tuple[str, str]:
        """
        Enqueue a task, optionally to a specific queue.

        An unknown ``target_queue`` falls back to ``"normal"`` with a warning.

        Returns:
            Tuple of (task_id, queue_name)

        Raises:
            ValueError: If the chosen queue has a max_capacity and is full.
        """
        with self._lock:
            # Determine target queue
            queue_name = target_queue or self._determine_queue(task)

            # Ensure queue exists
            if queue_name not in self._queues:
                logger.warning(f"Queue {queue_name} not found, using 'normal'")
                queue_name = "normal"

            heap = self._queues[queue_name]

            # Check capacity
            config = self._queue_configs.get(queue_name)
            if config and config.max_capacity > 0 and len(heap) >= config.max_capacity:
                logger.warning(f"Queue {queue_name} at capacity, rejecting task")
                raise ValueError(f"Queue {queue_name} is at capacity")

            # Store task
            self._tasks[task.id] = task
            self._task_queues[task.id] = queue_name

            # Add to queue (negative priority for max-heap behavior)
            heapq.heappush(heap, (-task.priority, time.time(), task.id))

            # Update stats; size mirrors the heap so it can never drift.
            self._stats[queue_name].size = len(heap)
            self._routing_hits[queue_name] = self._routing_hits.get(queue_name, 0) + 1

            logger.debug(f"Enqueued task {task.id} to queue {queue_name}")
            return task.id, queue_name

    def _select_queue_strict(self) -> Optional[str]:
        """Select queue using strict priority (highest weight with tasks)."""
        by_weight = sorted(self._queue_configs.items(), key=lambda x: -x[1].weight)
        for name, _config in by_weight:
            if self._queues[name]:
                return name
        return None

    def _select_queue_weighted(self) -> Optional[str]:
        """Select a non-empty queue at random, proportionally to its weight."""
        non_empty = [
            (name, config.weight)
            for name, config in self._queue_configs.items()
            if self._queues[name]
        ]

        if not non_empty:
            return None

        total_weight = sum(w for _, w in non_empty)
        r = random.random() * total_weight

        cumulative = 0
        for name, weight in non_empty:
            cumulative += weight
            if r <= cumulative:
                return name

        # Guards against floating-point shortfall in the running sum.
        return non_empty[-1][0]

    def _select_queue_round_robin(self) -> Optional[str]:
        """Select the next non-empty queue in rotation order."""
        queue_names = list(self._queue_configs.keys())
        if not queue_names:
            return None

        # Try each queue in order starting from current index
        count = len(queue_names)
        for offset in range(count):
            idx = (self._round_robin_index + offset) % count
            name = queue_names[idx]
            if self._queues[name]:
                self._round_robin_index = (idx + 1) % count
                return name

        return None

    def _select_queue_fair_share(self) -> Optional[str]:
        """Select the queue currently holding the most tasks (to drain it)."""
        non_empty = [
            (name, len(self._queues[name]))
            for name in self._queue_configs
            if self._queues[name]
        ]

        if not non_empty:
            return None

        return max(non_empty, key=lambda x: x[1])[0]

    def _select_queue(self) -> Optional[str]:
        """Pick a non-empty queue according to the active strategy."""
        selectors = {
            DequeueStrategy.STRICT_PRIORITY: self._select_queue_strict,
            DequeueStrategy.WEIGHTED: self._select_queue_weighted,
            DequeueStrategy.ROUND_ROBIN: self._select_queue_round_robin,
            DequeueStrategy.FAIR_SHARE: self._select_queue_fair_share,
        }
        # Unknown strategies behave like strict priority.
        return selectors.get(self._dequeue_strategy, self._select_queue_strict)()

    def _pop_ready_task(self, queue_name: str) -> Optional[Task]:
        """Pop entries from one queue until a runnable task is found.

        Entries whose task is unknown or not PENDING/READY are discarded.
        The returned task is marked IN_PROGRESS. Caller must hold the lock.
        """
        heap = self._queues[queue_name]
        stats = self._stats[queue_name]
        runnable = (TaskStatus.PENDING.value, TaskStatus.READY.value)

        try:
            while heap:
                _neg_priority, _timestamp, task_id = heapq.heappop(heap)
                task = self._tasks.get(task_id)
                if task is None or task.status not in runnable:
                    continue

                # Update task status
                task.status = TaskStatus.IN_PROGRESS.value
                task.started_at = self._utc_now_iso()

                stats.in_progress += 1
                stats.last_dequeue_time = task.started_at
                return task
            return None
        finally:
            # Discarded stale entries shrink the heap too, so resync here.
            stats.size = len(heap)

    def dequeue(self) -> Optional[Tuple[Task, str]]:
        """
        Dequeue the next task based on current strategy.

        Returns:
            Tuple of (task, queue_name) or None if all queues empty
        """
        with self._lock:
            while True:
                queue_name = self._select_queue()
                if not queue_name:
                    return None

                task = self._pop_ready_task(queue_name)
                if task is not None:
                    logger.debug(f"Dequeued task {task.id} from queue {queue_name}")
                    return task, queue_name
                # The selected queue held only stale entries and is now
                # empty; selectors skip empty queues, so this loop ends.

    def dequeue_from(self, queue_name: str) -> Optional[Task]:
        """Dequeue from a specific queue.

        Returns:
            The next runnable task, or None if the queue is unknown or has
            no runnable task.
        """
        with self._lock:
            if queue_name not in self._queues:
                return None
            return self._pop_ready_task(queue_name)

    def _settle(self, task_id: str, was_in_progress: bool, *, failed: bool):
        """Update queue counters after a task finished. Caller holds the lock."""
        stats = self._stats.get(self._task_queues.get(task_id, "normal"))
        if stats is None:
            return
        # Only tasks that were actually dequeued count as in progress;
        # otherwise the counter would go negative.
        if was_in_progress and stats.in_progress > 0:
            stats.in_progress -= 1
        if failed:
            stats.failed_total += 1
        else:
            stats.completed_total += 1

    def complete(self, task_id: str, result: Optional[Dict[str, Any]] = None):
        """Mark task as complete. Unknown task ids are ignored."""
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                return

            was_in_progress = task.status == TaskStatus.IN_PROGRESS.value
            task.status = TaskStatus.COMPLETED.value
            task.completed_at = self._utc_now_iso()
            task.result = result

            self._settle(task_id, was_in_progress, failed=False)

    def fail(self, task_id: str, error: str):
        """Mark task as failed. Unknown task ids are ignored."""
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None:
                return

            was_in_progress = task.status == TaskStatus.IN_PROGRESS.value
            task.status = TaskStatus.FAILED.value
            task.error = error
            task.completed_at = self._utc_now_iso()

            self._settle(task_id, was_in_progress, failed=True)

    def get_task(self, task_id: str) -> Optional[Task]:
        """Get task by ID."""
        return self._tasks.get(task_id)

    def get_queue_stats(self, queue_name: str) -> Optional[QueueStats]:
        """Get statistics for a specific queue."""
        return self._stats.get(queue_name)

    def get_stats(self) -> RouterStats:
        """Get aggregate router statistics."""
        with self._lock:
            return RouterStats(
                total_queues=len(self._queues),
                total_tasks=sum(len(q) for q in self._queues.values()),
                total_in_progress=sum(s.in_progress for s in self._stats.values()),
                tasks_per_queue={name: len(q) for name, q in self._queues.items()},
                routing_hits=self._routing_hits.copy(),
                strategy=self._dequeue_strategy.value,
                uptime_seconds=time.time() - self._start_time,
            )

    @staticmethod
    def _boost_for(config: QueueConfig, intervals: int) -> int:
        """Return the total boost a task deserves after ``intervals`` periods."""
        policy = config.boost_policy
        if policy == BoostPolicy.LINEAR.value:
            raw = intervals * config.boost_amount
        elif policy == BoostPolicy.EXPONENTIAL.value:
            # Cap the exponent so very old tasks do not build huge integers.
            raw = 2 ** min(intervals, 63) - 1
        elif policy == BoostPolicy.STEPPED.value:
            raw = intervals
        else:
            return 0
        return min(raw, config.max_boost)

    def boost_aging_tasks(self):
        """Apply priority boost to aging tasks.

        The boost is computed from the time a task has spent in its queue and
        capped at ``max_boost``; resulting priority is capped at 10. The boost
        already applied is remembered in ``task.metadata["boost_amount"]`` so
        repeated calls only add the difference instead of compounding.
        Entries whose task is unknown are dropped from the queue.
        """
        with self._lock:
            now = time.time()

            for queue_name, config in self._queue_configs.items():
                if config.boost_policy == BoostPolicy.NONE.value:
                    continue
                if config.boost_interval_seconds <= 0:
                    # Avoid division by zero; no interval means no aging.
                    continue

                rebuilt = []
                boosted_count = 0

                for neg_priority, enqueue_time, task_id in self._queues[queue_name]:
                    task = self._tasks.get(task_id)
                    if task is None:
                        continue

                    age_seconds = now - enqueue_time
                    intervals = max(0, int(age_seconds / config.boost_interval_seconds))
                    target_boost = self._boost_for(config, intervals) if intervals else 0

                    already_applied = task.metadata.get("boost_amount", 0)
                    if not isinstance(already_applied, int):
                        already_applied = 0

                    delta = target_boost - already_applied
                    if delta > 0:
                        task.priority = min(task.priority + delta, 10)
                        task.metadata["boosted"] = True
                        task.metadata["boost_amount"] = target_boost
                        rebuilt.append((-task.priority, enqueue_time, task_id))
                        boosted_count += 1
                    else:
                        rebuilt.append((neg_priority, enqueue_time, task_id))

                # Rebuild queue with boosted priorities
                heapq.heapify(rebuilt)
                self._queues[queue_name] = rebuilt
                self._stats[queue_name].size = len(rebuilt)

                if boosted_count:
                    logger.debug(f"Boosted {boosted_count} tasks in queue {queue_name}")

class PriorityQueueRouter:
    """
    Priority Queue Router with intelligent task routing.

    Features:
    - Multiple named queues with configurable weights
    - Routing rules for automatic queue selection
    - Multiple dequeue strategies (strict, weighted, round-robin, fair-share)
    - Priority boosting for aging tasks
    - Queue capacity limits
    - Comprehensive statistics

    Falls back to local in-memory implementation if Redis unavailable.

    In Redis mode each queue is a sorted set scored by task priority and each
    task is a hash; when a local router is active every call is delegated
    to it.
    """

    # Queues created on connect; remove_queue() refuses to drop them.
    _CORE_QUEUES = ("critical", "high", "normal", "background")

    def __init__(self, config: Optional[TaskQueueConfig] = None):
        """
        Initialize PriorityQueueRouter.

        Args:
            config: TaskQueueConfig for Redis connection
                (defaults to TaskQueueConfig.from_env())
        """
        self.config = config or TaskQueueConfig.from_env()
        self._redis = None
        self._local_router: Optional[LocalPriorityQueueRouter] = None
        self._connected = False
        self._dequeue_strategy = DequeueStrategy.STRICT_PRIORITY
        self._queue_configs: Dict[str, QueueConfig] = {}
        self._routing_rules: List[RoutingRule] = []
        self._start_time = time.time()

    @property
    def is_connected(self) -> bool:
        """Check if connected."""
        return self._connected

    @property
    def is_local_mode(self) -> bool:
        """Check if using local mode."""
        return self._local_router is not None

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated ``datetime.utcnow()``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _config_key(self, name: str) -> str:
        """Redis key of the hash holding a queue's configuration."""
        return f"{self.config.task_prefix}:queue_config:{name}"

    def _queue_key(self, name: str) -> str:
        """Redis key of the sorted set holding a queue's task ids."""
        return f"{self.config.task_prefix}:queue:{name}"

    def _task_key(self, task_id: str) -> str:
        """Redis key of the hash holding one task."""
        return f"{self.config.task_prefix}:{task_id}"

    def _stats_key(self) -> str:
        """Redis key of the hash holding router counters."""
        return f"{self.config.task_prefix}:router_stats"

    def _require_connection(self):
        """Raise RuntimeError unless connect() has succeeded."""
        if not self._connected:
            raise RuntimeError("Not connected")

    async def _queue_size(self, name: str) -> int:
        """Number of task ids currently in a Redis queue."""
        return await self._redis.zcard(self._queue_key(name))

    @staticmethod
    def _task_from_hash(task_id: str, task_data: Dict[str, Any]) -> Task:
        """Build a Task from the fields of its Redis hash."""
        return Task(
            id=task_data.get("id", task_id),
            description=task_data.get("description", ""),
            agent=task_data.get("agent", ""),
            inputs=json.loads(task_data.get("inputs", "{}")),
            status=task_data.get("status", TaskStatus.PENDING.value),
            priority=int(task_data.get("priority", 5)),
            created_at=task_data.get("created_at", ""),
            started_at=task_data.get("started_at"),
            completed_at=task_data.get("completed_at"),
            result=json.loads(task_data.get("result", "null")),
            error=task_data.get("error"),
            metadata=json.loads(task_data.get("metadata", "{}")),
        )

    def _activate_local_router(self):
        """Switch to the in-memory router, carrying over prior settings.

        Strategy and rules registered before connect() are forwarded so they
        are not silently lost.
        """
        local = LocalPriorityQueueRouter()
        local.set_strategy(self._dequeue_strategy)
        for rule in self._routing_rules:
            local.add_rule(rule)
        self._local_router = local
        self._connected = True

    async def _close_redis_client(self):
        """Close and forget the Redis client, if any."""
        client, self._redis = self._redis, None
        if client is None:
            return
        # Newer redis-py asyncio clients expose aclose() and deprecate close().
        closer = getattr(client, "aclose", None) or client.close
        await closer()

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------

    async def connect(self, use_local_fallback: bool = True) -> bool:
        """
        Connect to Redis.

        Args:
            use_local_fallback: Use local implementation if Redis unavailable

        Returns:
            True if connected

        Raises:
            ImportError: redis is not installed and no fallback was allowed.
            Exception: whatever the Redis connection attempt raised, when no
                fallback was allowed.
        """
        if not REDIS_AVAILABLE:
            if use_local_fallback:
                self._activate_local_router()
                logger.info("Connected to local priority queue router")
                return True
            raise ImportError("redis package required")

        try:
            import redis.asyncio as aioredis

            # NOTE(review): the Redis code paths below treat replies as str
            # (e.g. str keys in hgetall results) — presumably
            # config.decode_responses must be True; confirm.
            self._redis = aioredis.from_url(
                self.config.url,
                decode_responses=self.config.decode_responses,
            )

            await self._redis.ping()
            self._connected = True

            # Initialize default queues in Redis
            await self._init_redis_queues()

            logger.info("Connected to Redis priority queue router")
            return True

        except Exception as e:
            logger.error(f"Redis connection failed: {e}")
            # Do not leave a half-initialised client or a stale
            # "connected" flag behind.
            self._connected = False
            try:
                await self._close_redis_client()
            except Exception:
                logger.debug("Ignoring error while closing failed Redis client", exc_info=True)

            if use_local_fallback:
                self._activate_local_router()
                return True
            raise

    async def _init_redis_queues(self):
        """Initialize default queues in Redis and install default rules.

        Default rules are merged with rules registered earlier; a rule with
        the same name as a default is kept as registered.
        """
        default_queues = [
            QueueConfig(name="critical", weight=10.0, priority_floor=9, priority_ceiling=10),
            QueueConfig(name="high", weight=5.0, priority_floor=7, priority_ceiling=8),
            QueueConfig(name="normal", weight=3.0, priority_floor=4, priority_ceiling=6),
            QueueConfig(name="background", weight=1.0, priority_floor=1, priority_ceiling=3),
        ]

        for config in default_queues:
            await self.add_queue(config)

        # Add default routing rules
        default_rules = [
            RoutingRule(
                name="critical-priority",
                target_queue="critical",
                priority=10,
                condition=lambda t: t.priority >= 9,
            ),
            RoutingRule(
                name="high-priority",
                target_queue="high",
                priority=20,
                condition=lambda t: 7 <= t.priority <= 8,
            ),
            RoutingRule(
                name="normal-priority",
                target_queue="normal",
                priority=30,
                condition=lambda t: 4 <= t.priority <= 6,
            ),
            RoutingRule(
                name="background-priority",
                target_queue="background",
                priority=40,
                condition=lambda t: t.priority <= 3,
            ),
        ]

        registered = {rule.name for rule in self._routing_rules}
        self._routing_rules.extend(
            rule for rule in default_rules if rule.name not in registered
        )
        self._routing_rules.sort(key=lambda r: r.priority)

    async def disconnect(self):
        """Disconnect from queue backend."""
        try:
            await self._close_redis_client()
        finally:
            # Reset state even if closing the client raised.
            self._local_router = None
            self._connected = False
        logger.info("Disconnected from priority queue router")

    # ------------------------------------------------------------------
    # Queue and rule management
    # ------------------------------------------------------------------

    async def add_queue(self, config: QueueConfig):
        """Add or update a queue configuration.

        Raises:
            RuntimeError: If not connected.
        """
        self._require_connection()

        if self._local_router:
            self._local_router.add_queue(config)
            return

        # Store config in Redis
        await self._redis.hset(self._config_key(config.name), mapping={
            "name": config.name,
            "weight": config.weight,
            "max_capacity": config.max_capacity,
            "rate_limit_per_second": config.rate_limit_per_second,
            "priority_floor": config.priority_floor,
            "priority_ceiling": config.priority_ceiling,
            "boost_policy": config.boost_policy,
            "boost_interval_seconds": config.boost_interval_seconds,
            "boost_amount": config.boost_amount,
            "max_boost": config.max_boost,
            "dedicated_agents": json.dumps(config.dedicated_agents),
        })

        self._queue_configs[config.name] = config
        logger.debug(f"Added queue: {config.name}")

    async def remove_queue(self, name: str) -> bool:
        """Remove a queue; its queued tasks are moved to the 'normal' queue.

        Returns:
            False for the core queues (which cannot be removed), otherwise
            True in Redis mode, or the local router's result in local mode.

        Raises:
            RuntimeError: If not connected.
        """
        self._require_connection()

        if self._local_router:
            return self._local_router.remove_queue(name)

        # Prevent removing core queues
        if name in self._CORE_QUEUES:
            return False

        queue_key = self._queue_key(name)
        normal_key = self._queue_key("normal")

        # Re-home queued task ids instead of dropping them, matching the
        # local router. The union and the delete are two separate commands,
        # so a task enqueued in between can still be lost.
        await self._redis.zunionstore(normal_key, [normal_key, queue_key], aggregate="MAX")
        await self._redis.delete(self._config_key(name), queue_key)
        self._queue_configs.pop(name, None)

        return True

    def add_rule(self, rule: RoutingRule):
        """Add a routing rule, replacing any existing rule with the same name."""
        if self._local_router:
            self._local_router.add_rule(rule)
            return

        # Remove existing with same name
        self._routing_rules = [r for r in self._routing_rules if r.name != rule.name]
        self._routing_rules.append(rule)
        self._routing_rules.sort(key=lambda r: r.priority)

    def remove_rule(self, name: str) -> bool:
        """Remove a routing rule. Returns True if a rule was removed."""
        if self._local_router:
            return self._local_router.remove_rule(name)

        initial = len(self._routing_rules)
        self._routing_rules = [r for r in self._routing_rules if r.name != name]
        return len(self._routing_rules) < initial

    def set_strategy(self, strategy: DequeueStrategy):
        """Set dequeue strategy."""
        self._dequeue_strategy = strategy
        if self._local_router:
            self._local_router.set_strategy(strategy)

    def _determine_queue(self, task: Task) -> str:
        """Determine target queue for task (Redis mode).

        Order mirrors the local router: matching rule, ``agent:<agent>``
        queue, queue listing the agent in ``dedicated_agents``, priority
        range, then ``"normal"``.
        """
        # Check routing rules
        for rule in self._routing_rules:
            if rule.matches(task) and rule.target_queue in self._queue_configs:
                return rule.target_queue

        # Check for agent-specific queue
        agent_queue = f"agent:{task.agent}"
        if agent_queue in self._queue_configs:
            return agent_queue

        # Check dedicated agent queues
        for name, config in self._queue_configs.items():
            if task.agent in config.dedicated_agents:
                return name

        # Check by priority range
        for name, config in self._queue_configs.items():
            # Queues reserved for other agents must not absorb this task.
            if config.dedicated_agents:
                continue
            if config.priority_floor <= task.priority <= config.priority_ceiling:
                return name

        return "normal"

    # ------------------------------------------------------------------
    # Enqueue / dequeue
    # ------------------------------------------------------------------

    async def route_and_enqueue(
        self,
        task: Task,
        target_queue: Optional[str] = None,
    ) -> Tuple[str, str]:
        """
        Route task to appropriate queue and enqueue.

        Args:
            task: Task to enqueue
            target_queue: Force routing to specific queue

        Returns:
            Tuple of (task_id, queue_name)

        Raises:
            RuntimeError: If not connected.
            ValueError: If the chosen queue has a max_capacity and is full.
        """
        self._require_connection()

        if self._local_router:
            return self._local_router.enqueue(task, target_queue)

        # Determine queue
        queue_name = target_queue or self._determine_queue(task)

        # A queue is valid if this process knows it or its config exists in
        # Redis (it may have been created by another process). Anything else
        # would never be drained, so fall back like the local router does.
        if queue_name not in self._queue_configs and not await self._redis.exists(
            self._config_key(queue_name)
        ):
            logger.warning(f"Queue {queue_name} not found, using 'normal'")
            queue_name = "normal"

        # Check capacity (check-then-add is not atomic across processes).
        config = self._queue_configs.get(queue_name)
        if config and config.max_capacity > 0:
            if await self._queue_size(queue_name) >= config.max_capacity:
                logger.warning(f"Queue {queue_name} at capacity, rejecting task")
                raise ValueError(f"Queue {queue_name} is at capacity")

        # Store task
        await self._redis.hset(self._task_key(task.id), mapping={
            "id": task.id,
            "description": task.description,
            "agent": task.agent,
            "inputs": json.dumps(task.inputs),
            "status": TaskStatus.READY.value,
            "priority": task.priority,
            "created_at": task.created_at,
            "queue": queue_name,
            "metadata": json.dumps(task.metadata),
        })

        # Add to queue sorted set
        await self._redis.zadd(self._queue_key(queue_name), {task.id: task.priority})

        # Track routing
        await self._redis.hincrby(self._stats_key(), f"routing:{queue_name}", 1)

        logger.debug(f"Routed task {task.id} to queue {queue_name}")
        return task.id, queue_name

    async def dequeue(self) -> Optional[Tuple[Task, str]]:
        """
        Dequeue next task based on strategy.

        Returns:
            Tuple of (task, queue_name) or None

        Raises:
            RuntimeError: If not connected.
        """
        self._require_connection()

        if self._local_router:
            return self._local_router.dequeue()

        # A selected queue can turn out to hold only stale ids (or be drained
        # by another consumer), so retry a bounded number of times instead of
        # reporting "empty" while other queues still have work.
        for _attempt in range(max(1, len(self._queue_configs))):
            queue_name = await self._select_queue()
            if not queue_name:
                return None

            task = await self.dequeue_from(queue_name)
            if task:
                return task, queue_name

        return None

    async def _select_queue(self) -> Optional[str]:
        """Select queue based on current strategy."""
        selectors = {
            DequeueStrategy.STRICT_PRIORITY: self._select_queue_strict,
            DequeueStrategy.WEIGHTED: self._select_queue_weighted,
            DequeueStrategy.ROUND_ROBIN: self._select_queue_round_robin,
            DequeueStrategy.FAIR_SHARE: self._select_queue_fair_share,
        }
        # Unknown strategies behave like strict priority.
        selector = selectors.get(self._dequeue_strategy, self._select_queue_strict)
        return await selector()

    async def _select_queue_strict(self) -> Optional[str]:
        """Select highest weight non-empty queue."""
        by_weight = sorted(self._queue_configs.items(), key=lambda x: -x[1].weight)
        for name, _config in by_weight:
            if await self._queue_size(name) > 0:
                return name
        return None

    async def _select_queue_weighted(self) -> Optional[str]:
        """Select a non-empty queue at random, proportionally to its weight."""
        non_empty = []
        for name, config in self._queue_configs.items():
            if await self._queue_size(name) > 0:
                non_empty.append((name, config.weight))

        if not non_empty:
            return None

        total = sum(w for _, w in non_empty)
        r = random.random() * total
        cumulative = 0
        for name, weight in non_empty:
            cumulative += weight
            if r <= cumulative:
                return name
        # Guards against floating-point shortfall in the running sum.
        return non_empty[-1][0]

    async def _select_queue_round_robin(self) -> Optional[str]:
        """Select queue using round-robin.

        The rotation counter lives in the Redis stats hash and is incremented
        on every call.
        """
        idx = await self._redis.hincrby(self._stats_key(), "rr_index", 1)

        queue_names = list(self._queue_configs.keys())
        count = len(queue_names)
        for offset in range(count):
            name = queue_names[(idx + offset) % count]
            if await self._queue_size(name) > 0:
                return name
        return None

    async def _select_queue_fair_share(self) -> Optional[str]:
        """Select queue with most tasks (to drain it)."""
        best_queue = None
        best_size = 0

        for name in self._queue_configs:
            size = await self._queue_size(name)
            if size > best_size:
                best_size = size
                best_queue = name

        return best_queue

    async def dequeue_from(self, queue_name: str) -> Optional[Task]:
        """Dequeue from specific queue.

        Pops the highest-scored task id, marks the task IN_PROGRESS in Redis
        and returns it. Ids whose task hash no longer exists are skipped.

        Returns:
            The task, or None when the queue has no usable entry.

        Raises:
            RuntimeError: If not connected.
        """
        self._require_connection()

        if self._local_router:
            return self._local_router.dequeue_from(queue_name)

        queue_key = self._queue_key(queue_name)

        while True:
            # Get highest priority task
            popped = await self._redis.zpopmax(queue_key, count=1)
            if not popped:
                return None

            task_id, _score = popped[0]
            task_key = self._task_key(task_id)

            task_data = await self._redis.hgetall(task_key)
            if not task_data:
                # Stale id without a task hash: drop it and keep looking.
                logger.warning(f"Task {task_id} in queue {queue_name} has no data, skipping")
                continue

            # Update status
            started_at = self._utc_now_iso()
            await self._redis.hset(task_key, mapping={
                "status": TaskStatus.IN_PROGRESS.value,
                "started_at": started_at,
            })

            task = self._task_from_hash(task_id, task_data)
            # Reflect the update just written so the caller sees it.
            task.status = TaskStatus.IN_PROGRESS.value
            task.started_at = started_at
            return task

    # ------------------------------------------------------------------
    # Task lifecycle
    # ------------------------------------------------------------------

    async def complete(self, task_id: str, result: Optional[Dict[str, Any]] = None):
        """Mark task complete. Unknown task ids are ignored.

        Raises:
            RuntimeError: If not connected.
        """
        self._require_connection()

        if self._local_router:
            self._local_router.complete(task_id, result)
            return

        task_key = self._task_key(task_id)
        # Without this check hset would create a partial hash for an id that
        # never existed.
        if not await self._redis.exists(task_key):
            return

        await self._redis.hset(task_key, mapping={
            "status": TaskStatus.COMPLETED.value,
            "completed_at": self._utc_now_iso(),
            "result": json.dumps(result or {}),
        })

    async def fail(self, task_id: str, error: str):
        """Mark task failed. Unknown task ids are ignored.

        Raises:
            RuntimeError: If not connected.
        """
        self._require_connection()

        if self._local_router:
            self._local_router.fail(task_id, error)
            return

        task_key = self._task_key(task_id)
        # Same guard as complete(): never create a hash for an unknown id.
        if not await self._redis.exists(task_key):
            return

        await self._redis.hset(task_key, mapping={
            "status": TaskStatus.FAILED.value,
            "error": error,
            "completed_at": self._utc_now_iso(),
        })

    async def get_task(self, task_id: str) -> Optional[Task]:
        """Get task by ID, or None if it is unknown.

        Raises:
            RuntimeError: If not connected.
        """
        self._require_connection()

        if self._local_router:
            return self._local_router.get_task(task_id)

        task_data = await self._redis.hgetall(self._task_key(task_id))
        if not task_data:
            return None

        return self._task_from_hash(task_id, task_data)

    # ------------------------------------------------------------------
    # Statistics and maintenance
    # ------------------------------------------------------------------

    async def get_stats(self) -> RouterStats:
        """Get router statistics.

        In Redis mode ``total_in_progress`` is always 0 because it is not
        tracked.

        Raises:
            RuntimeError: If not connected.
        """
        self._require_connection()

        if self._local_router:
            return self._local_router.get_stats()

        tasks_per_queue = {}
        for name in self._queue_configs:
            tasks_per_queue[name] = await self._queue_size(name)

        routing_data = await self._redis.hgetall(self._stats_key())
        routing_hits = {
            k.replace("routing:", ""): int(v)
            for k, v in routing_data.items()
            if k.startswith("routing:")
        }

        return RouterStats(
            total_queues=len(self._queue_configs),
            total_tasks=sum(tasks_per_queue.values()),
            total_in_progress=0,  # Would need separate tracking
            tasks_per_queue=tasks_per_queue,
            routing_hits=routing_hits,
            strategy=self._dequeue_strategy.value,
            uptime_seconds=time.time() - self._start_time,
        )

    async def boost_aging_tasks(self):
        """Apply priority boost to aging tasks.

        Only implemented for local mode; in Redis mode this just logs.
        """
        if self._local_router:
            self._local_router.boost_aging_tasks()
            return

        # Redis implementation would scan queues and update priorities
        # This is a simplified version - production would use Lua script
        logger.info("Boost aging tasks (Redis implementation pending)")

# Convenience functions

async def create_priority_router(
    config: Optional[TaskQueueConfig] = None,
) -> PriorityQueueRouter:
    """Build a PriorityQueueRouter from *config* and connect it.

    Args:
        config: Optional queue configuration passed to the router constructor.

    Returns:
        The router, after ``connect()`` has been awaited on it.
    """
    connected_router = PriorityQueueRouter(config)
    await connected_router.connect()
    return connected_router

def create_routing_rule(
    name: str,
    target_queue: str,
    priority: int = 100,
    condition: Optional[Callable[[Task], bool]] = None,
    condition_expr: Optional[str] = None,
) -> RoutingRule:
    """Build a RoutingRule from the given fields.

    Args:
        name: Rule name.
        target_queue: Name of the queue the rule routes to.
        priority: Rule priority, forwarded unchanged (default 100).
        condition: Optional callable taking a Task and returning a bool.
        condition_expr: Optional condition given as an expression string.

    Returns:
        A RoutingRule constructed with exactly these five fields.
    """
    rule_fields = {
        "name": name,
        "target_queue": target_queue,
        "priority": priority,
        "condition": condition,
        "condition_expr": condition_expr,
    }
    return RoutingRule(**rule_fields)

# CLI for testing

# Run the CLI only when this file is executed as a script, never on import.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="CODITECT Priority Queue Router CLI")
    parser.add_argument(
        "command",
        choices=["enqueue", "dequeue", "stats", "list-queues", "list-rules"],
        help="Command to run"
    )
    parser.add_argument("--queue", help="Target queue name")
    parser.add_argument("--description", default="Test task", help="Task description")
    parser.add_argument("--agent", default="test-agent", help="Target agent")
    parser.add_argument("--priority", type=int, default=5, help="Task priority (1-10)")
    parser.add_argument(
        "--strategy",
        choices=["strict", "weighted", "round-robin", "fair-share"],
        default="strict",
        help="Dequeue strategy"
    )
    parser.add_argument("--local", action="store_true", help="Use local mode")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    async def main():
        """Connect a router, run the command selected on the command line, disconnect.

        Reads the parsed ``args`` from the enclosing scope. The router is
        always disconnected in the ``finally`` clause, including when the
        command raises.
        """
        router = PriorityQueueRouter()

        try:
            await router.connect(use_local_fallback=args.local)
            print(f"Connected (local mode: {router.is_local_mode})")

            # Set strategy: map the CLI spelling onto the enum member.
            strategy_map = {
                "strict": DequeueStrategy.STRICT_PRIORITY,
                "weighted": DequeueStrategy.WEIGHTED,
                "round-robin": DequeueStrategy.ROUND_ROBIN,
                "fair-share": DequeueStrategy.FAIR_SHARE,
            }
            router.set_strategy(strategy_map[args.strategy])

            if args.command == "enqueue":
                task = Task(
                    description=args.description,
                    agent=args.agent,
                    priority=args.priority,
                )
                task_id, queue_name = await router.route_and_enqueue(
                    task,
                    target_queue=args.queue,
                )
                print(f"Enqueued task {task_id} to queue '{queue_name}'")

            elif args.command == "dequeue":
                result = await router.dequeue()
                if result:
                    task, queue_name = result
                    print(f"Dequeued from '{queue_name}':")
                    print(f" ID: {task.id}")
                    print(f" Description: {task.description}")
                    print(f" Agent: {task.agent}")
                    print(f" Priority: {task.priority}")
                else:
                    print("No tasks available")

            elif args.command == "stats":
                stats = await router.get_stats()
                print("Router Statistics:")
                print(f" Total Queues: {stats.total_queues}")
                print(f" Total Tasks: {stats.total_tasks}")
                print(f" Strategy: {stats.strategy}")
                print(f" Uptime: {stats.uptime_seconds:.1f}s")
                print("\n Tasks per Queue:")
                for name, count in stats.tasks_per_queue.items():
                    print(f" {name}: {count}")
                if stats.routing_hits:
                    print("\n Routing Hits:")
                    for name, hits in stats.routing_hits.items():
                        print(f" {name}: {hits}")

            elif args.command == "list-queues":
                # In local mode the queue configs are held by the local router.
                if router._local_router:
                    for name, config in router._local_router._queue_configs.items():
                        print(f"\n {name}:")
                        print(f" Weight: {config.weight}")
                        print(f" Priority Range: {config.priority_floor}-{config.priority_ceiling}")
                        print(f" Boost Policy: {config.boost_policy}")
                else:
                    for name, config in router._queue_configs.items():
                        print(f"\n {name}:")
                        print(f" Weight: {config.weight}")
                        print(f" Priority Range: {config.priority_floor}-{config.priority_ceiling}")

            elif args.command == "list-rules":
                # Same split as list-queues: use the local router's rules when present.
                rules = (
                    router._local_router._routing_rules
                    if router._local_router
                    else router._routing_rules
                )
                for rule in rules:
                    status = "enabled" if rule.enabled else "disabled"
                    print(f"\n {rule.name} ({status}):")
                    print(f" Target Queue: {rule.target_queue}")
                    print(f" Priority: {rule.priority}")
                    if rule.condition_expr:
                        print(f" Expression: {rule.condition_expr}")

        finally:
            await router.disconnect()

    asyncio.run(main())