#!/usr/bin/env python3
"""
CODITECT Priority Queue Router

Implements intelligent task routing based on:
- Task priority (CRITICAL > HIGH > MEDIUM > LOW; numeric 1-10, higher = more urgent)
- Agent capabilities (skill matching)
- Agent availability (status, health, load)
- Load balancing (least-loaded first)

Part of Track H.2: Inter-Agent Communication Infrastructure

Components:
- AgentDiscoveryService: Registry of agents with capabilities
- PriorityRouter: Routes tasks to optimal agents
- RoutingStrategy: Enumeration of the available routing algorithms
- IntegratedRouter: Combines discovery, routing and (optional) dispatch

Usage:
    from priority_router import PriorityRouter, AgentDiscoveryService

    # Initialize services
    discovery = AgentDiscoveryService()
    router = PriorityRouter(discovery)

    # Register an agent
    await discovery.register_agent(agent)

    # Route a task
    decision = await router.route_task(
        task_id="task-001",
        required_capability="code-review",
    )

Author: CODITECT Framework
Version: 1.0.0
Created: January 8, 2026
"""
import asyncio
import heapq
import json
import logging
import os
from collections import defaultdict
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set
# Configure logging
# NOTE(review): basicConfig at import time configures the root logger for any
# program importing this module (it is a no-op if the root logger already has
# handlers); consider moving it into main().
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# =============================================================================
# Enums
# =============================================================================
class AgentStatus(Enum):
    """Availability state an agent reports to the discovery service."""

    AVAILABLE = "available"
    BUSY = "busy"
    OFFLINE = "offline"
    MAINTENANCE = "maintenance"
    # Finishing in-flight work; not accepting new tasks.
    DRAINING = "draining"
class RoutingStrategy(Enum):
    """Algorithms the router can use to pick one agent among capable candidates."""

    # Pick the agent with the lowest load ratio.
    LEAST_LOADED = "least_loaded"
    # Rotate through candidates so work is spread evenly.
    ROUND_ROBIN = "round_robin"
    # Give high-priority tasks the healthiest / least-loaded agents.
    PRIORITY_FIRST = "priority_first"
    # Prefer the best historical success rate for the capability.
    CAPABILITY_MATCH = "capability_match"
    # Minimize estimated cost per invocation.
    COST_OPTIMIZED = "cost_optimized"
    # Minimize expected duration.
    LATENCY_OPTIMIZED = "latency_optimized"
class RoutingResult(Enum):
    """Possible outcomes of a single routing attempt."""

    SUCCESS = "success"
    NO_CAPABLE_AGENTS = "no_capable_agents"
    ALL_AGENTS_BUSY = "all_agents_busy"
    TASK_EXPIRED = "task_expired"
    ROUTING_ERROR = "routing_error"
    QUEUED_FOR_RETRY = "queued_for_retry"
# =============================================================================
# Data Classes
# =============================================================================
@dataclass
class AgentCapability:
    """A named skill an agent offers, with cost, latency and reliability estimates."""

    name: str
    description: str = ""
    input_schema: Dict = field(default_factory=dict)
    output_schema: Dict = field(default_factory=dict)
    required_tools: List[str] = field(default_factory=list)
    cost_per_invocation: float = 0.0  # Estimated cost in $
    avg_duration_seconds: float = 60.0
    success_rate: float = 0.95  # Historical success rate

    def to_dict(self) -> Dict:
        """Return all fields as a (deep-copied) dict, suitable for JSON encoding."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict) -> "AgentCapability":
        """Build a capability from a dict such as one produced by ``to_dict``.

        Keys that are not fields of this dataclass are ignored, so records
        written with extra fields (e.g. by a newer version) still load.

        Raises:
            TypeError: if the required ``name`` key is missing.
        """
        from dataclasses import fields  # local: only this method needs it

        known = {f.name for f in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})
@dataclass
class Agent:
    """An agent registered for routing: identity, capabilities and live state."""

    id: str
    name: str
    type: str  # "claude", "gpt-4", "gemini", "custom", "coditect"
    capabilities: List[AgentCapability]
    status: AgentStatus = AgentStatus.AVAILABLE
    current_load: int = 0
    max_concurrency: int = 5
    health_score: float = 1.0  # 0.0 to 1.0
    last_seen: datetime = field(default_factory=datetime.utcnow)
    metadata: Dict = field(default_factory=dict)
    priority_boost: int = 0  # Bonus for routing priority
    tags: Set[str] = field(default_factory=set)

    @property
    def load_ratio(self) -> float:
        """Share of concurrency slots in use (1.0 when max_concurrency is 0)."""
        if self.max_concurrency == 0:
            return 1.0
        return self.current_load / self.max_concurrency

    @property
    def is_available(self) -> bool:
        """True only for AVAILABLE agents that still have a free slot."""
        if self.status != AgentStatus.AVAILABLE:
            return False
        return self.current_load < self.max_concurrency

    @property
    def available_slots(self) -> int:
        """How many more tasks the agent can take (never negative)."""
        free = self.max_concurrency - self.current_load
        return free if free > 0 else 0

    def has_capability(self, capability_name: str) -> bool:
        """Whether any capability of this agent carries the given name."""
        return self.get_capability(capability_name) is not None

    def get_capability(self, capability_name: str) -> Optional[AgentCapability]:
        """First capability with the given name, or None."""
        matches = (cap for cap in self.capabilities if cap.name == capability_name)
        return next(matches, None)

    def to_dict(self) -> Dict:
        """Serialize to JSON-friendly types (enum -> value, datetime -> ISO, set -> list)."""
        serialized_caps = [cap.to_dict() for cap in self.capabilities]
        return {
            "id": self.id,
            "name": self.name,
            "type": self.type,
            "capabilities": serialized_caps,
            "status": self.status.value,
            "current_load": self.current_load,
            "max_concurrency": self.max_concurrency,
            "health_score": self.health_score,
            "last_seen": self.last_seen.isoformat(),
            "metadata": self.metadata,
            "priority_boost": self.priority_boost,
            "tags": list(self.tags),
        }

    def to_json(self) -> str:
        """JSON text of ``to_dict()``."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_dict(cls, data: Dict) -> "Agent":
        """Inverse of ``to_dict``; ``id``, ``name`` and ``type`` are required keys."""
        agent_id = data["id"]
        agent_name = data["name"]
        agent_type = data["type"]
        caps = [AgentCapability.from_dict(raw) for raw in data.get("capabilities", [])]
        status = AgentStatus(data.get("status", "available"))
        load = data.get("current_load", 0)
        concurrency = data.get("max_concurrency", 5)
        health = data.get("health_score", 1.0)
        if "last_seen" in data:
            seen = datetime.fromisoformat(data["last_seen"])
        else:
            seen = datetime.utcnow()
        meta = data.get("metadata", {})
        boost = data.get("priority_boost", 0)
        tag_set = set(data.get("tags", []))
        return cls(
            id=agent_id,
            name=agent_name,
            type=agent_type,
            capabilities=caps,
            status=status,
            current_load=load,
            max_concurrency=concurrency,
            health_score=health,
            last_seen=seen,
            metadata=meta,
            priority_boost=boost,
            tags=tag_set,
        )

    @classmethod
    def from_json(cls, json_str: str) -> "Agent":
        """Parse JSON text produced by ``to_json``."""
        return cls.from_dict(json.loads(json_str))
@dataclass
class RoutingDecision:
    """Outcome of routing one task: the chosen agent (if any), candidates and why."""

    task_id: str
    result: RoutingResult
    selected_agent: Optional[Agent] = None
    candidate_agents: List[Agent] = field(default_factory=list)
    routing_strategy: RoutingStrategy = RoutingStrategy.LEAST_LOADED
    score: float = 0.0  # Routing score for selected agent
    reason: str = ""
    timestamp: datetime = field(default_factory=datetime.utcnow)
    retry_after: Optional[float] = None  # Seconds until retry if queued

    def to_dict(self) -> Dict:
        """Compact, JSON-friendly summary (agents reduced to an id and a count)."""
        chosen = self.selected_agent
        chosen_id = chosen.id if chosen else None
        return {
            "task_id": self.task_id,
            "result": self.result.value,
            "selected_agent_id": chosen_id,
            "candidate_count": len(self.candidate_agents),
            "routing_strategy": self.routing_strategy.value,
            "score": self.score,
            "reason": self.reason,
            "timestamp": self.timestamp.isoformat(),
            "retry_after": self.retry_after,
        }
@dataclass
class AgentDiscoveryConfig:
    """Configuration for Agent Discovery Service (Redis connection, key layout, filters)."""

    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_db: int = 0
    redis_password: Optional[str] = None
    agent_ttl_seconds: int = 300  # Auto-expire if no heartbeat
    capability_index_prefix: str = "capability:"
    agent_prefix: str = "agent:"
    health_check_interval: int = 30
    min_health_score: float = 0.5
    max_load_ratio: float = 0.9

    @classmethod
    def from_env(cls) -> "AgentDiscoveryConfig":
        """Create config from environment variables.

        Every field can be overridden; unset variables fall back to the
        dataclass defaults:
        REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD, AGENT_TTL_SECONDS,
        CAPABILITY_INDEX_PREFIX, AGENT_PREFIX, HEALTH_CHECK_INTERVAL,
        MIN_HEALTH_SCORE, MAX_LOAD_RATIO.

        Raises:
            ValueError: if a numeric variable cannot be parsed.
        """
        env = os.environ.get
        return cls(
            redis_host=env("REDIS_HOST", "localhost"),
            redis_port=int(env("REDIS_PORT", "6379")),
            redis_db=int(env("REDIS_DB", "0")),
            # Treat an empty REDIS_PASSWORD as "no password" rather than "".
            redis_password=env("REDIS_PASSWORD") or None,
            agent_ttl_seconds=int(env("AGENT_TTL_SECONDS", "300")),
            capability_index_prefix=env("CAPABILITY_INDEX_PREFIX", "capability:"),
            agent_prefix=env("AGENT_PREFIX", "agent:"),
            health_check_interval=int(env("HEALTH_CHECK_INTERVAL", "30")),
            min_health_score=float(env("MIN_HEALTH_SCORE", "0.5")),
            max_load_ratio=float(env("MAX_LOAD_RATIO", "0.9")),
        )
# =============================================================================
# Local Agent Registry (Fallback)
# =============================================================================
class LocalAgentRegistry:
    """In-memory agent registry for development/testing.

    Stores agents by id plus two reverse indexes (capability name -> agent
    ids, tag -> agent ids). Mutating coroutines run under ``_lock``.
    """

    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.capability_index: Dict[str, Set[str]] = defaultdict(set)
        self.tag_index: Dict[str, Set[str]] = defaultdict(set)
        self._lock = asyncio.Lock()

    def _remove_from_indexes(self, agent_id: str) -> None:
        """Remove ``agent_id`` from every index set, deleting sets left empty.

        Scans the indexes rather than the stored agent's capability/tag lists,
        so entries are removed even if that object was mutated after it was
        registered. Caller must hold ``_lock``.
        """
        for index in (self.capability_index, self.tag_index):
            for key in list(index):
                ids = index[key]
                ids.discard(agent_id)
                if not ids:
                    del index[key]

    async def register(self, agent: Agent) -> str:
        """Register (or replace) an agent and index it; returns the agent id."""
        async with self._lock:
            if agent.id in self.agents:
                # Re-registration replaces the agent: drop old index entries so
                # capabilities/tags it no longer has stop matching it.
                self._remove_from_indexes(agent.id)
            self.agents[agent.id] = agent
            for capability in agent.capabilities:
                self.capability_index[capability.name].add(agent.id)
            for tag in agent.tags:
                self.tag_index[tag].add(agent.id)
            logger.info(f"Registered agent: {agent.id} ({agent.name})")
        return agent.id

    async def unregister(self, agent_id: str) -> bool:
        """Remove an agent and its index entries; False if it was not registered."""
        async with self._lock:
            if agent_id not in self.agents:
                return False
            del self.agents[agent_id]
            self._remove_from_indexes(agent_id)
            logger.info(f"Unregistered agent: {agent_id}")
        return True

    async def get(self, agent_id: str) -> Optional[Agent]:
        """Get agent by ID (None if unknown)."""
        return self.agents.get(agent_id)

    async def get_all(self) -> List[Agent]:
        """Get all registered agents."""
        return list(self.agents.values())

    async def find_by_capability(self, capability_name: str) -> List[Agent]:
        """Find agents indexed under a capability name."""
        agent_ids = self.capability_index.get(capability_name, set())
        return [self.agents[aid] for aid in agent_ids if aid in self.agents]

    async def find_by_tag(self, tag: str) -> List[Agent]:
        """Find agents indexed under a tag."""
        agent_ids = self.tag_index.get(tag, set())
        return [self.agents[aid] for aid in agent_ids if aid in self.agents]

    async def update_status(self, agent_id: str, status: AgentStatus, load: int) -> bool:
        """Update status/load and refresh ``last_seen``; False if the agent is unknown."""
        async with self._lock:
            # Look up under the lock: the agent may be unregistered while we
            # wait to acquire it.
            agent = self.agents.get(agent_id)
            if agent is None:
                return False
            agent.status = status
            agent.current_load = load
            agent.last_seen = datetime.utcnow()
        return True

    async def update_health(self, agent_id: str, health_score: float) -> bool:
        """Set the health score, clamped to [0.0, 1.0]; False if the agent is unknown."""
        async with self._lock:
            agent = self.agents.get(agent_id)
            if agent is None:
                return False
            agent.health_score = max(0.0, min(1.0, health_score))
        return True

    def get_stats(self) -> Dict:
        """Counts of agents by state and of distinct indexed capabilities/tags."""
        agents = list(self.agents.values())
        return {
            "total_agents": len(agents),
            "available": sum(1 for a in agents if a.is_available),
            "busy": sum(1 for a in agents if a.status == AgentStatus.BUSY),
            "offline": sum(1 for a in agents if a.status == AgentStatus.OFFLINE),
            "capabilities": len(self.capability_index),
            "tags": len(self.tag_index),
        }
# =============================================================================
# Agent Discovery Service
# =============================================================================
class AgentDiscoveryService:
    """
    Registry and discovery service for all agents.

    Features:
    - Register/unregister agents with capabilities
    - Find agents by capability
    - Heartbeat mechanism for health tracking
    - Load balancing information
    - Redis backend (with local fallback)

    Each public coroutine checks ``_use_local`` and delegates to the
    in-memory ``LocalAgentRegistry`` when Redis is not in use.
    """

    def __init__(self, config: Optional[AgentDiscoveryConfig] = None):
        self.config = config or AgentDiscoveryConfig()
        self.redis = None
        self.local_registry = LocalAgentRegistry()
        self._connected = False
        # Start on the local registry; connect() switches to Redis on success.
        self._use_local = True

    @staticmethod
    async def _close_quietly(client) -> None:
        """Best-effort close of a Redis client whose connection attempt failed."""
        try:
            await client.close()
        except Exception as close_error:
            # The original connection failure is what gets reported.
            logger.debug(f"Ignoring error while closing Redis client: {close_error}")

    async def connect(self, use_local_fallback: bool = True) -> bool:
        """Connect to Redis, optionally falling back to the local registry.

        Args:
            use_local_fallback: When Redis is unusable (package missing or
                connection failure), keep serving from the in-memory registry.

        Returns:
            True if Redis is connected or the local fallback is active;
            False if Redis is unusable and the fallback is disabled.
        """
        if self._connected:
            return True

        try:
            import redis.asyncio as redis
        except ImportError:
            logger.warning("redis package not installed. Using local agent registry.")
            self._use_local = True
            # Same rule as a connection failure: only "connected" if falling back.
            self._connected = use_local_fallback
            return use_local_fallback

        client = None
        try:
            client = redis.Redis(
                host=self.config.redis_host,
                port=self.config.redis_port,
                db=self.config.redis_db,
                password=self.config.redis_password,
                decode_responses=False
            )
            # Test connection
            await client.ping()
        except Exception as e:
            logger.warning(f"Failed to connect to Redis: {e}. Using local agent registry.")
            if client is not None:
                # Do not leak the half-open client (a later connect() makes a new one).
                await self._close_quietly(client)
            self.redis = None
            self._use_local = True
            self._connected = use_local_fallback
            return use_local_fallback

        self.redis = client
        self._connected = True
        self._use_local = False
        logger.info(f"Connected to Redis at {self.config.redis_host}:{self.config.redis_port}")
        return True

    async def disconnect(self):
        """Close the Redis client (if any) and mark the service disconnected."""
        if self.redis:
            await self.redis.close()
            self.redis = None
        self._connected = False
        logger.info("Disconnected from Agent Discovery Service")

    async def register_agent(self, agent: Agent) -> str:
        """Register an agent and return agent_id.

        In Redis mode the record expires after ``agent_ttl_seconds`` unless
        refreshed by ``heartbeat``.
        """
        if self._use_local:
            return await self.local_registry.register(agent)

        agent_key = f"{self.config.agent_prefix}{agent.id}"
        # Store agent data
        await self.redis.hset(agent_key, mapping={
            b"data": agent.to_json().encode()
        })
        # Add to capability indexes
        for capability in agent.capabilities:
            await self.redis.sadd(
                f"{self.config.capability_index_prefix}{capability.name}",
                agent.id
            )
        # Set TTL for auto-cleanup
        await self.redis.expire(agent_key, self.config.agent_ttl_seconds)
        logger.info(f"Registered agent: {agent.id} ({agent.name})")
        return agent.id

    async def unregister_agent(self, agent_id: str) -> bool:
        """Unregister an agent; False if no record for it exists."""
        if self._use_local:
            return await self.local_registry.unregister(agent_id)

        agent = await self.get_agent(agent_id)
        if not agent:
            return False
        # Remove from capability indexes
        for capability in agent.capabilities:
            await self.redis.srem(
                f"{self.config.capability_index_prefix}{capability.name}",
                agent_id
            )
        # Delete agent data
        await self.redis.delete(f"{self.config.agent_prefix}{agent_id}")
        logger.info(f"Unregistered agent: {agent_id}")
        return True

    async def get_agent(self, agent_id: str) -> Optional[Agent]:
        """Get agent by ID (None if unknown or expired)."""
        if self._use_local:
            return await self.local_registry.get(agent_id)

        agent_key = f"{self.config.agent_prefix}{agent_id}"
        data = await self.redis.hget(agent_key, b"data")
        if not data:
            return None
        return Agent.from_json(data.decode())

    async def get_all_agents(self) -> List[Agent]:
        """Get all registered agents (Redis: SCAN over the agent key prefix)."""
        if self._use_local:
            return await self.local_registry.get_all()

        agents = []
        pattern = f"{self.config.agent_prefix}*"
        async for key in self.redis.scan_iter(match=pattern):
            data = await self.redis.hget(key, b"data")
            if data:
                agents.append(Agent.from_json(data.decode()))
        return agents

    async def find_agents_by_capability(
        self,
        capability_name: str,
        min_health_score: Optional[float] = None,
        max_load_ratio: Optional[float] = None,
        status_filter: Optional[List[AgentStatus]] = None,
        skip_load_filter: bool = False
    ) -> List[Agent]:
        """
        Find all agents that have a specific capability.

        Args:
            capability_name: Required capability
            min_health_score: Minimum health score (0.0-1.0); defaults to config
            max_load_ratio: Maximum load ratio (0.0-1.0); defaults to config
            status_filter: Only include agents with these statuses
                (an empty/None value means [AgentStatus.AVAILABLE])
            skip_load_filter: If True, skip BOTH the health and load-ratio
                filters (for existence checks)

        Returns:
            List of matching agents, sorted by load (least loaded first)
        """
        if self._use_local:
            agents = await self.local_registry.find_by_capability(capability_name)
        else:
            agent_ids = await self.redis.smembers(
                f"{self.config.capability_index_prefix}{capability_name}"
            )
            agents = []
            for agent_id in agent_ids:
                agent = await self.get_agent(
                    agent_id.decode() if isinstance(agent_id, bytes) else agent_id
                )
                if agent:
                    agents.append(agent)

        min_health = min_health_score if min_health_score is not None else self.config.min_health_score
        max_load = max_load_ratio if max_load_ratio is not None else self.config.max_load_ratio
        allowed_statuses = status_filter or [AgentStatus.AVAILABLE]

        filtered = []
        for a in agents:
            # Indexes can be stale (e.g. an agent re-registered with fewer
            # capabilities), so confirm against the agent record itself.
            if not a.has_capability(capability_name):
                continue
            if a.status not in allowed_statuses:
                continue
            if not skip_load_filter:
                if a.health_score < min_health:
                    continue
                if a.load_ratio > max_load:
                    continue
            filtered.append(a)

        # Sort by load ratio (least loaded first); sort is stable for ties.
        filtered.sort(key=lambda a: a.load_ratio)
        return filtered

    async def find_available_agents(
        self,
        capabilities: Optional[List[str]] = None,
        min_slots: int = 1
    ) -> List[Agent]:
        """
        Find all available agents, optionally filtered by capabilities.

        Args:
            capabilities: Optional list of required capabilities (agent must have ALL)
            min_slots: Minimum available slots required

        Returns:
            List of available agents
        """
        available = []
        for agent in await self.get_all_agents():
            if not agent.is_available or agent.available_slots < min_slots:
                continue
            if capabilities and not all(agent.has_capability(c) for c in capabilities):
                continue
            available.append(agent)
        return available

    async def heartbeat(
        self,
        agent_id: str,
        status: AgentStatus,
        current_load: int,
        health_score: Optional[float] = None
    ) -> bool:
        """
        Agent heartbeat to keep registration alive.

        Args:
            agent_id: Agent identifier
            status: Current status
            current_load: Number of active tasks
            health_score: Optional health score update (clamped to 0.0-1.0)

        Returns:
            True if heartbeat successful, False if the agent is unknown/expired
        """
        if self._use_local:
            success = await self.local_registry.update_status(agent_id, status, current_load)
            if success and health_score is not None:
                await self.local_registry.update_health(agent_id, health_score)
            return success

        agent = await self.get_agent(agent_id)
        if not agent:
            return False

        agent.status = status
        agent.current_load = current_load
        agent.last_seen = datetime.utcnow()
        if health_score is not None:
            agent.health_score = max(0.0, min(1.0, health_score))

        # Re-store the updated record and refresh its TTL.
        # NOTE(review): read-modify-write is not atomic across processes.
        agent_key = f"{self.config.agent_prefix}{agent_id}"
        await self.redis.hset(agent_key, mapping={
            b"data": agent.to_json().encode()
        })
        await self.redis.expire(agent_key, self.config.agent_ttl_seconds)
        return True

    async def increment_load(self, agent_id: str) -> bool:
        """Increment agent's current load (keeps its current status)."""
        agent = await self.get_agent(agent_id)
        if not agent:
            return False
        return await self.heartbeat(agent_id, agent.status, agent.current_load + 1)

    async def decrement_load(self, agent_id: str) -> bool:
        """Decrement agent's current load, never below zero."""
        agent = await self.get_agent(agent_id)
        if not agent:
            return False
        return await self.heartbeat(agent_id, agent.status, max(0, agent.current_load - 1))

    def get_stats(self) -> Dict:
        """Get discovery service statistics (detailed counts only for the local backend)."""
        if self._use_local:
            stats = self.local_registry.get_stats()
        else:
            # Would need async call for Redis stats
            stats = {"backend": "redis"}
        stats["connected"] = self._connected
        stats["using_local"] = self._use_local
        return stats
# =============================================================================
# Priority Router
# =============================================================================
class PriorityRouter:
    """
    Intelligent task router that selects the optimal agent for each task.

    Routing factors:
    - Task priority (higher priority = better agents)
    - Agent capabilities (must match required capability)
    - Agent load (prefer less loaded agents)
    - Agent health (prefer healthier agents)
    - Cost optimization (optional)
    - Latency optimization (optional)

    Every routing attempt, successful or not, is appended to
    ``routing_history`` (capped at ``max_history`` entries).
    """

    def __init__(
        self,
        discovery_service: AgentDiscoveryService,
        default_strategy: RoutingStrategy = RoutingStrategy.LEAST_LOADED
    ):
        self.discovery = discovery_service
        self.default_strategy = default_strategy
        self.routing_history: List[RoutingDecision] = []
        self.max_history = 1000
        # Round-robin state: next rotation index per capability name
        self._round_robin_index: Dict[str, int] = defaultdict(int)
        # Retry queue for tasks that couldn't be routed
        # NOTE(review): nothing in this module reads or writes it yet.
        self._retry_queue: List[tuple] = []  # (retry_time, task_id, capability)

    async def route_task(
        self,
        task_id: str,
        required_capability: str,
        priority: int = 5,
        strategy: Optional[RoutingStrategy] = None,
        preferred_agent_id: Optional[str] = None,
        excluded_agents: Optional[Set[str]] = None,
        metadata: Optional[Dict] = None
    ) -> RoutingDecision:
        """
        Route a task to the optimal agent.

        Args:
            task_id: Unique task identifier
            required_capability: Capability the agent must have
            priority: Task priority (1-10, higher = more urgent)
            strategy: Routing strategy override
            preferred_agent_id: Prefer this agent if available
            excluded_agents: Do not route to these agents
            metadata: Additional routing metadata

        Returns:
            RoutingDecision with selected agent or failure reason. When capable
            agents exist but none can take the task now, the result is
            ALL_AGENTS_BUSY with ``retry_after`` set; excluded agents never
            count towards that.
        """
        strategy = strategy or self.default_strategy
        excluded = excluded_agents or set()

        # Find capable agents (available, healthy, under the load cap)
        candidates = await self.discovery.find_agents_by_capability(
            required_capability,
            status_filter=[AgentStatus.AVAILABLE]
        )
        candidates = [a for a in candidates if a.id not in excluded]

        if not candidates:
            # Do agents with the capability exist at all (ignoring health/load)?
            all_capable = await self.discovery.find_agents_by_capability(
                required_capability,
                status_filter=[AgentStatus.AVAILABLE, AgentStatus.BUSY],
                skip_load_filter=True
            )
            # Excluded agents will never receive this task; letting them count
            # as "busy" would schedule retries that can never succeed.
            all_capable = [a for a in all_capable if a.id not in excluded]

            if all_capable:
                decision = RoutingDecision(
                    task_id=task_id,
                    result=RoutingResult.ALL_AGENTS_BUSY,
                    candidate_agents=[],
                    routing_strategy=strategy,
                    reason=f"All {len(all_capable)} capable agents are busy",
                    retry_after=5.0  # Retry in 5 seconds
                )
            else:
                decision = RoutingDecision(
                    task_id=task_id,
                    result=RoutingResult.NO_CAPABLE_AGENTS,
                    candidate_agents=[],
                    routing_strategy=strategy,
                    reason=f"No agents found with capability: {required_capability}"
                )
            self._record_decision(decision)
            return decision

        # Check for preferred agent
        if preferred_agent_id:
            preferred = next((a for a in candidates if a.id == preferred_agent_id), None)
            if preferred and preferred.is_available:
                decision = RoutingDecision(
                    task_id=task_id,
                    result=RoutingResult.SUCCESS,
                    selected_agent=preferred,
                    candidate_agents=candidates,
                    routing_strategy=strategy,
                    score=1.0,
                    reason="Preferred agent selected"
                )
                self._record_decision(decision)
                return decision

        # Apply routing strategy
        selected, score = await self._select_agent(
            candidates, strategy, priority, required_capability, metadata
        )

        if selected:
            decision = RoutingDecision(
                task_id=task_id,
                result=RoutingResult.SUCCESS,
                selected_agent=selected,
                candidate_agents=candidates,
                routing_strategy=strategy,
                score=score,
                reason=f"Selected via {strategy.value} strategy"
            )
        else:
            decision = RoutingDecision(
                task_id=task_id,
                result=RoutingResult.ROUTING_ERROR,
                candidate_agents=candidates,
                routing_strategy=strategy,
                reason="Strategy failed to select agent"
            )
        self._record_decision(decision)
        return decision

    async def _select_agent(
        self,
        candidates: List[Agent],
        strategy: RoutingStrategy,
        priority: int,
        capability: str,
        metadata: Optional[Dict]
    ) -> tuple[Optional[Agent], float]:
        """Dispatch to the strategy-specific selector; returns (agent, score)."""
        if not candidates:
            return None, 0.0

        if strategy == RoutingStrategy.LEAST_LOADED:
            return self._select_least_loaded(candidates)
        elif strategy == RoutingStrategy.ROUND_ROBIN:
            return self._select_round_robin(candidates, capability)
        elif strategy == RoutingStrategy.PRIORITY_FIRST:
            return self._select_priority_first(candidates, priority)
        elif strategy == RoutingStrategy.CAPABILITY_MATCH:
            return self._select_best_capability_match(candidates, capability)
        elif strategy == RoutingStrategy.COST_OPTIMIZED:
            return self._select_cost_optimized(candidates, capability)
        elif strategy == RoutingStrategy.LATENCY_OPTIMIZED:
            return self._select_latency_optimized(candidates, capability)
        else:
            # Default to least loaded
            return self._select_least_loaded(candidates)

    def _select_least_loaded(self, candidates: List[Agent]) -> tuple[Agent, float]:
        """Select agent with lowest load ratio; score = 1 - load_ratio."""
        # Already sorted by load ratio from find_agents_by_capability
        selected = candidates[0]
        return selected, 1.0 - selected.load_ratio

    def _select_round_robin(
        self,
        candidates: List[Agent],
        capability: str
    ) -> tuple[Agent, float]:
        """Distribute tasks evenly across agents (neutral score 0.5).

        Candidates arrive sorted by load, and that order shifts as loads
        change, so rotate over a stable ordering by agent id instead.
        """
        ordered = sorted(candidates, key=lambda a: a.id)
        index = self._round_robin_index[capability]
        selected = ordered[index % len(ordered)]
        self._round_robin_index[capability] = index + 1
        return selected, 0.5

    def _select_priority_first(
        self,
        candidates: List[Agent],
        priority: int
    ) -> tuple[Agent, float]:
        """High priority tasks get agents with best health/performance.

        score = health * (1 - load_ratio) + 0.1 * priority_boost, multiplied
        by (1 + health) when priority >= 8.
        """
        scored = []
        for agent in candidates:
            score = (
                agent.health_score * (1 - agent.load_ratio) +
                agent.priority_boost * 0.1
            )
            if priority >= 8:  # CRITICAL or HIGH
                score *= (1 + agent.health_score)
            scored.append((score, agent))
        scored.sort(reverse=True, key=lambda x: x[0])
        return scored[0][1], scored[0][0]

    def _select_best_capability_match(
        self,
        candidates: List[Agent],
        capability: str
    ) -> tuple[Agent, float]:
        """Select agent with best success_rate * health for this capability."""
        scored = []
        for agent in candidates:
            cap = agent.get_capability(capability)
            if cap:
                scored.append((cap.success_rate * agent.health_score, agent))
        if not scored:
            return candidates[0], 0.5
        scored.sort(reverse=True, key=lambda x: x[0])
        return scored[0][1], scored[0][0]

    def _select_cost_optimized(
        self,
        candidates: List[Agent],
        capability: str
    ) -> tuple[Agent, float]:
        """Select cheapest agent for the task; score is 1/cost capped at 1.0."""
        scored = []
        for agent in candidates:
            cap = agent.get_capability(capability)
            if cap:
                # Invert cost for scoring (lower cost = higher score); free wins.
                if cap.cost_per_invocation > 0:
                    score = 1.0 / cap.cost_per_invocation
                else:
                    score = float('inf')
                scored.append((score, cap.cost_per_invocation, agent))
        if not scored:
            return candidates[0], 0.5
        scored.sort(reverse=True, key=lambda x: x[0])
        return scored[0][2], min(1.0, scored[0][0])

    def _select_latency_optimized(
        self,
        candidates: List[Agent],
        capability: str
    ) -> tuple[Agent, float]:
        """Select fastest agent for this capability; score is 60/avg_duration capped at 1.0."""
        scored = []
        for agent in candidates:
            cap = agent.get_capability(capability)
            if cap:
                # Invert duration for scoring (lower duration = higher score)
                if cap.avg_duration_seconds > 0:
                    score = 60.0 / cap.avg_duration_seconds  # ~1.0 for 60s
                else:
                    score = float('inf')
                scored.append((score, cap.avg_duration_seconds, agent))
        if not scored:
            return candidates[0], 0.5
        scored.sort(reverse=True, key=lambda x: x[0])
        return scored[0][2], min(1.0, scored[0][0])

    def _record_decision(self, decision: RoutingDecision):
        """Append a decision to history, dropping the oldest beyond max_history."""
        self.routing_history.append(decision)
        # Trim in place (keeps the same list object); also correct when
        # max_history is 0, where a [-0:] slice would keep everything.
        excess = len(self.routing_history) - self.max_history
        if excess > 0:
            del self.routing_history[:excess]

    def get_routing_stats(self) -> Dict:
        """Get routing statistics over the retained history."""
        if not self.routing_history:
            return {
                "total_routes": 0,
                "success_rate": 0.0,
                "by_result": {},
                "by_strategy": {}
            }

        total = len(self.routing_history)
        successes = sum(1 for d in self.routing_history if d.result == RoutingResult.SUCCESS)
        by_result = defaultdict(int)
        by_strategy = defaultdict(int)
        for d in self.routing_history:
            by_result[d.result.value] += 1
            by_strategy[d.routing_strategy.value] += 1

        return {
            "total_routes": total,
            "success_rate": successes / total if total > 0 else 0.0,
            "by_result": dict(by_result),
            "by_strategy": dict(by_strategy)
        }

    def clear_history(self):
        """Clear routing history and round-robin state."""
        self.routing_history.clear()
        self._round_robin_index.clear()
# =============================================================================
# Integrated Router (combines MessageBus + TaskQueue + Discovery)
# =============================================================================
class IntegratedRouter:
    """
    High-level router that integrates:
    - AgentDiscoveryService (find agents)
    - PriorityRouter (select best agent)
    - MessageBus (send tasks) - optional
    - TaskQueueManager (manage queue) - optional
    """

    def __init__(
        self,
        discovery: Optional[AgentDiscoveryService] = None,
        router: Optional[PriorityRouter] = None,
        message_bus=None,  # MessageBus from message_bus.py
        task_queue=None  # TaskQueueManager from task_queue_manager.py
    ):
        self.discovery = discovery or AgentDiscoveryService()
        self.router = router or PriorityRouter(self.discovery)
        self.message_bus = message_bus
        self.task_queue = task_queue
        self._connected = False

    async def connect(self, use_local_fallback: bool = True) -> bool:
        """Connect all services; success is determined by the discovery service only."""
        discovery_ok = await self.discovery.connect(use_local_fallback)

        # Optional services: failures are logged, not fatal.
        if self.message_bus:
            try:
                await self.message_bus.connect(use_local_fallback)
            except Exception as e:
                logger.warning(f"MessageBus connection failed: {e}")

        if self.task_queue:
            try:
                await self.task_queue.connect(use_local_fallback)
            except Exception as e:
                logger.warning(f"TaskQueue connection failed: {e}")

        self._connected = discovery_ok
        return self._connected

    async def disconnect(self):
        """Disconnect all services; a failing optional service does not stop the others."""
        await self.discovery.disconnect()
        for label, service in (("MessageBus", self.message_bus), ("TaskQueue", self.task_queue)):
            if service:
                try:
                    await service.disconnect()
                except Exception as e:
                    logger.warning(f"{label} disconnect failed: {e}")
        self._connected = False

    async def route_and_dispatch(
        self,
        task_id: str,
        capability: str,
        payload: Dict,
        priority: int = 5,
        strategy: Optional[RoutingStrategy] = None,
        preferred_agent_id: Optional[str] = None,
        excluded_agents: Optional[Set[str]] = None,
        metadata: Optional[Dict] = None
    ) -> RoutingDecision:
        """
        Route a task and dispatch it to the selected agent.

        This is the main entry point for task routing:
        1. Find capable agents
        2. Select best agent via routing strategy
        3. Dispatch task via message bus (if connected)
        4. Update agent load (only after a successful dispatch)

        The optional ``preferred_agent_id``, ``excluded_agents`` and
        ``metadata`` are passed through to ``PriorityRouter.route_task``.

        Returns:
            RoutingDecision with dispatch status. A send failure turns the
            decision into ROUTING_ERROR; a failure to update the load after
            a successful send is only logged.
        """
        decision = await self.router.route_task(
            task_id=task_id,
            required_capability=capability,
            priority=priority,
            strategy=strategy,
            preferred_agent_id=preferred_agent_id,
            excluded_agents=excluded_agents,
            metadata=metadata
        )

        if decision.result != RoutingResult.SUCCESS:
            return decision

        if self.message_bus and decision.selected_agent:
            agent_id = decision.selected_agent.id
            try:
                await self.message_bus.send_task(
                    from_agent="router",
                    to_agent=agent_id,
                    task_id=task_id,
                    payload=payload,
                    priority=priority
                )
            except Exception as e:
                logger.error(f"Failed to dispatch task: {e}")
                decision.result = RoutingResult.ROUTING_ERROR
                decision.reason = f"Dispatch failed: {e}"
                return decision

            decision.reason = f"Dispatched to {agent_id} via message bus"
            # The task is already on the bus, so a load-tracking failure must
            # not be reported as a dispatch failure.
            try:
                await self.discovery.increment_load(agent_id)
            except Exception as e:
                logger.warning(f"Dispatched {task_id} but failed to update load for {agent_id}: {e}")

        return decision

    async def register_agent(self, agent: Agent) -> str:
        """Register an agent with the discovery service."""
        return await self.discovery.register_agent(agent)

    async def unregister_agent(self, agent_id: str) -> bool:
        """Unregister an agent."""
        return await self.discovery.unregister_agent(agent_id)

    async def heartbeat(
        self,
        agent_id: str,
        status: AgentStatus,
        current_load: int,
        health_score: Optional[float] = None
    ) -> bool:
        """Send agent heartbeat (optionally updating its health score)."""
        return await self.discovery.heartbeat(agent_id, status, current_load, health_score)

    def get_stats(self) -> Dict:
        """Get combined statistics."""
        return {
            "discovery": self.discovery.get_stats(),
            "routing": self.router.get_routing_stats(),
            "connected": self._connected
        }
# =============================================================================
# Helper Functions
# =============================================================================
def create_agent(
    agent_id: str,
    name: str,
    capabilities: List[str],
    agent_type: str = "coditect",
    max_concurrency: int = 5,
    **kwargs
) -> Agent:
    """
    Build an Agent whose capabilities are given by name only.

    Each name becomes an AgentCapability with description "Capability: <name>"
    and default cost/latency/success estimates.

    Args:
        agent_id: Unique identifier
        name: Display name
        capabilities: Capability names
        agent_type: Agent type (claude, gpt-4, coditect, etc.)
        max_concurrency: Max concurrent tasks
        **kwargs: Any other Agent field (status, tags, metadata, ...)

    Returns:
        The new Agent instance
    """
    capability_objects = []
    for cap_name in capabilities:
        capability_objects.append(
            AgentCapability(name=cap_name, description=f"Capability: {cap_name}")
        )

    return Agent(
        id=agent_id,
        name=name,
        type=agent_type,
        capabilities=capability_objects,
        max_concurrency=max_concurrency,
        **kwargs,
    )
async def create_discovery_service(
    use_redis: bool = True
) -> AgentDiscoveryService:
    """
    Create an AgentDiscoveryService configured from the environment and ready to use.

    Args:
        use_redis: Whether to attempt a Redis connection. When False, the
            service is put directly on its in-memory registry without any
            connection attempt.

    Returns:
        Connected AgentDiscoveryService (Redis-backed or local fallback)
    """
    config = AgentDiscoveryConfig.from_env()
    service = AgentDiscoveryService(config)
    if use_redis:
        await service.connect(use_local_fallback=True)
    else:
        # Same state connect() leaves behind when it falls back to the local
        # registry, without paying for (or logging) a failed Redis attempt.
        service._use_local = True
        service._connected = True
    return service
async def create_router(
    strategy: RoutingStrategy = RoutingStrategy.LEAST_LOADED
) -> tuple[AgentDiscoveryService, PriorityRouter]:
    """
    Build a connected discovery service plus a PriorityRouter on top of it.

    Args:
        strategy: Default routing strategy for the router

    Returns:
        (discovery_service, router)
    """
    service = await create_discovery_service()
    return service, PriorityRouter(service, default_strategy=strategy)
# =============================================================================
# Main (Demo/Testing)
# =============================================================================
async def main():
    """Demo the priority router system (in-memory fallback if Redis is unavailable)."""
    print("=" * 60)
    print("CODITECT Priority Router Demo")
    print("=" * 60)

    # Create services
    discovery = AgentDiscoveryService()
    await discovery.connect(use_local_fallback=True)
    # Always disconnect, even if a demo step raises.
    try:
        router = PriorityRouter(discovery)

        # Register some test agents
        agents = [
            create_agent(
                "claude-1", "Claude Sonnet",
                ["code-review", "documentation", "testing"],
                agent_type="claude",
                max_concurrency=3
            ),
            create_agent(
                "claude-2", "Claude Opus",
                ["code-review", "architecture", "planning"],
                agent_type="claude",
                max_concurrency=2
            ),
            create_agent(
                "gpt-1", "GPT-4",
                ["code-review", "documentation"],
                agent_type="gpt-4",
                max_concurrency=5
            ),
            create_agent(
                "specialist-1", "Security Specialist",
                ["security-audit", "penetration-testing"],
                agent_type="coditect",
                max_concurrency=2
            )
        ]

        print("\nRegistering agents...")
        for agent in agents:
            await discovery.register_agent(agent)
            print(f" Registered: {agent.id} ({agent.name})")
            print(f" Capabilities: {[c.name for c in agent.capabilities]}")

        # Test routing
        print("\n" + "-" * 60)
        print("Testing Routing Strategies")
        print("-" * 60)

        # Route a code review task
        print("\n1. Route 'code-review' task (LEAST_LOADED):")
        decision = await router.route_task(
            task_id="task-001",
            required_capability="code-review",
            priority=7,
            strategy=RoutingStrategy.LEAST_LOADED
        )
        print(f" Result: {decision.result.value}")
        print(f" Selected: {decision.selected_agent.id if decision.selected_agent else 'None'}")
        print(f" Candidates: {len(decision.candidate_agents)}")

        # Update load and route again
        if decision.selected_agent:
            await discovery.heartbeat(decision.selected_agent.id, AgentStatus.AVAILABLE, 2)

        # Route with ROUND_ROBIN
        print("\n2. Route 'code-review' task (ROUND_ROBIN):")
        for i in range(3):
            decision = await router.route_task(
                task_id=f"task-rr-{i}",
                required_capability="code-review",
                priority=5,
                strategy=RoutingStrategy.ROUND_ROBIN
            )
            print(f" Round {i+1}: {decision.selected_agent.id if decision.selected_agent else 'None'}")

        # Route a capability with no agents
        print("\n3. Route 'machine-learning' task (no agents):")
        decision = await router.route_task(
            task_id="task-002",
            required_capability="machine-learning",
            priority=5
        )
        print(f" Result: {decision.result.value}")
        print(f" Reason: {decision.reason}")

        # Show statistics
        print("\n" + "-" * 60)
        print("Statistics")
        print("-" * 60)

        print("\nDiscovery Service:")
        for key, value in discovery.get_stats().items():
            print(f" {key}: {value}")

        print("\nRouting:")
        for key, value in router.get_routing_stats().items():
            print(f" {key}: {value}")
    finally:
        # Cleanup
        await discovery.disconnect()

    print("\n" + "=" * 60)
    print("Demo complete!")
# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())