CODITECT Priority Queue Router
Implements intelligent task routing based on:
- Task priority (CRITICAL > HIGH > MEDIUM > LOW)
- Agent capabilities (skill matching)
- Agent availability (status, health, load)
- Load balancing (least-loaded first)
Part of Track H.2: Inter-Agent Communication Infrastructure
Components:
- AgentDiscoveryService: Registry of agents with capabilities
- PriorityRouter: Routes tasks to optimal agents
- RoutingStrategy: Different routing algorithms
Usage: from priority_router import PriorityRouter, AgentDiscoveryService
# Initialize services
discovery = AgentDiscoveryService()
router = PriorityRouter(discovery)
# Register an agent
await discovery.register_agent(agent)
# Route a task
decision = await router.route_task(task)
Author: CODITECT Framework Version: 1.0.0 Created: January 8, 2026
File: priority_router.py
Classes
AgentStatus
Agent availability status
RoutingStrategy
Task routing strategies
RoutingResult
Outcome of routing attempt
AgentCapability
What an agent can do
Agent
Registered agent with capabilities
RoutingDecision
Result of routing a task
AgentDiscoveryConfig
Configuration for Agent Discovery Service
LocalAgentRegistry
In-memory agent registry for development/testing
AgentDiscoveryService
Registry and discovery service for all agents.
PriorityRouter
Intelligent task router that selects the optimal agent for each task.
Functions
create_agent(agent_id, name, capabilities, agent_type, max_concurrency)
Helper to create an Agent with basic capabilities.
to_dict()
No description
from_dict(cls, data)
No description
load_ratio()
Current load as fraction of max
is_available()
Check if agent can accept tasks
available_slots()
Number of tasks agent can accept
has_capability(capability_name)
Check if agent has a specific capability
get_capability(capability_name)
Get capability by name
to_dict()
No description
to_json()
No description
from_dict(cls, data)
No description
from_json(cls, json_str)
No description
to_dict()
No description
from_env(cls)
Create config from environment variables
get_stats()
Get registry statistics
Usage
python priority_router.py