#!/usr/bin/env python3 """ CODITECT Message Bus - RabbitMQ-based Inter-Agent Communication
Part of Track H.2: Inter-Agent Communication Infrastructure Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications
This module provides:
- AgentMessage: Structured message format for inter-agent communication
- MessageBus: RabbitMQ client for sending/receiving messages
- MessageBusConfig: Configuration management
- Local fallback for development without RabbitMQ
Usage: # Async usage from scripts.core.message_bus import MessageBus, AgentMessage
bus = MessageBus()
await bus.connect()
# Send task to another agent
correlation_id = await bus.send_task(
from_agent="orchestrator",
to_agent="code-review-agent",
task_id="task-123",
payload={"code": "def foo(): pass"}
)
# Subscribe to messages
async def handler(message: AgentMessage):
print(f"Received: {message}")
await bus.subscribe("my-agent", handler)
Author: CODITECT Framework Created: January 8, 2026 Version: 1.0.0 """
import asyncio import json import logging import os import uuid from dataclasses import dataclass, field, asdict from datetime import datetime from enum import Enum from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Union import queue import threading
Configure logging
logger = logging.getLogger(name)
Try to import aio_pika, fall back to local mode if not available
try: import aio_pika from aio_pika import Message, DeliveryMode, ExchangeType from aio_pika.abc import AbstractRobustConnection, AbstractChannel AIO_PIKA_AVAILABLE = True except ImportError: AIO_PIKA_AVAILABLE = False logger.warning("aio_pika not installed. Using local message queue fallback.")
class MessageType(Enum): """Types of messages that can be sent between agents.""" TASK_REQUEST = "task_request" TASK_RESPONSE = "task_response" EVENT = "event" QUERY = "query" HEARTBEAT = "heartbeat" BROADCAST = "broadcast" ERROR = "error"
class MessagePriority(Enum): """Message priority levels (1-10, higher = more urgent).""" LOWEST = 1 LOW = 3 NORMAL = 5 HIGH = 7 URGENT = 9 CRITICAL = 10
@dataclass class AgentMessage: """ Structured message format for inter-agent communication.
Attributes:
id: Unique message identifier
from_agent: Sender agent ID
to_agent: Recipient agent ID (or "*" for broadcast)
task_id: Associated task ID
message_type: Type of message
payload: Message content
correlation_id: For request-response pattern matching
timestamp: Message creation time
reply_to: Queue name for responses
priority: Message priority (1-10)
ttl_ms: Time-to-live in milliseconds
metadata: Additional metadata
"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
from_agent: str = ""
to_agent: str = ""
task_id: str = ""
message_type: str = MessageType.TASK_REQUEST.value
payload: Dict[str, Any] = field(default_factory=dict)
correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
reply_to: Optional[str] = None
priority: int = MessagePriority.NORMAL.value
ttl_ms: Optional[int] = 3600000 # 1 hour default
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return asdict(self)
def to_json(self) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict(), default=str)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "AgentMessage":
"""Create AgentMessage from dictionary."""
return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
@classmethod
def from_json(cls, json_str: str) -> "AgentMessage":
"""Create AgentMessage from JSON string."""
return cls.from_dict(json.loads(json_str))
@dataclass class MessageBusConfig: """Configuration for MessageBus connection.""" host: str = "localhost" port: int = 5672 username: str = "coditect" password: str = "coditect_dev_2026" vhost: str = "coditect" heartbeat: int = 60 connection_timeout: int = 30 prefetch_count: int = 10 use_ssl: bool = False
# Exchange names
task_exchange: str = "agent.tasks"
broadcast_exchange: str = "agent.broadcasts"
response_exchange: str = "agent.responses"
event_exchange: str = "agent.events"
# Queue settings
queue_max_priority: int = 10
queue_message_ttl: int = 3600000 # 1 hour
@property
def url(self) -> str:
"""Generate AMQP URL from config."""
protocol = "amqps" if self.use_ssl else "amqp"
return f"{protocol}://{self.username}:{self.password}@{self.host}:{self.port}/{self.vhost}"
@classmethod
def from_env(cls) -> "MessageBusConfig":
"""Create config from environment variables."""
return cls(
host=os.getenv("RABBITMQ_HOST", "localhost"),
port=int(os.getenv("RABBITMQ_PORT", "5672")),
username=os.getenv("RABBITMQ_USER", "coditect"),
password=os.getenv("RABBITMQ_PASS", "coditect_dev_2026"),
vhost=os.getenv("RABBITMQ_VHOST", "coditect"),
use_ssl=os.getenv("RABBITMQ_SSL", "false").lower() == "true",
)
@classmethod
def from_file(cls, path: Union[str, Path]) -> "MessageBusConfig":
"""Load config from JSON file."""
with open(path) as f:
data = json.load(f)
return cls(**data)
class LocalMessageQueue: """ Local in-memory message queue for development without RabbitMQ.
This provides the same interface as the RabbitMQ-backed MessageBus
but uses Python queues for local testing.
"""
def __init__(self):
self._queues: Dict[str, queue.Queue] = {}
self._subscribers: Dict[str, List[Callable]] = {}
self._running = False
self._processor_thread: Optional[threading.Thread] = None
self._lock = threading.Lock()
logger.info("Using local in-memory message queue (no RabbitMQ)")
def get_queue(self, queue_name: str) -> queue.Queue:
"""Get or create a queue by name."""
with self._lock:
if queue_name not in self._queues:
self._queues[queue_name] = queue.Queue()
return self._queues[queue_name]
def publish(self, queue_name: str, message: AgentMessage):
"""Publish message to a queue."""
q = self.get_queue(queue_name)
q.put(message)
logger.debug(f"Published message {message.id} to {queue_name}")
def subscribe(self, queue_name: str, callback: Callable[[AgentMessage], None]):
"""Subscribe to messages on a queue."""
with self._lock:
if queue_name not in self._subscribers:
self._subscribers[queue_name] = []
self._subscribers[queue_name].append(callback)
logger.debug(f"Subscribed to {queue_name}")
def start_processing(self):
"""Start background message processing."""
self._running = True
self._processor_thread = threading.Thread(target=self._process_messages, daemon=True)
self._processor_thread.start()
def stop_processing(self):
"""Stop background message processing."""
self._running = False
if self._processor_thread:
self._processor_thread.join(timeout=5)
def _process_messages(self):
"""Background thread to process messages."""
while self._running:
for queue_name, q in list(self._queues.items()):
try:
message = q.get_nowait()
callbacks = self._subscribers.get(queue_name, [])
for callback in callbacks:
try:
callback(message)
except Exception as e:
logger.error(f"Callback error: {e}")
except queue.Empty:
pass
threading.Event().wait(0.1) # Small sleep to prevent busy loop
class MessageBus: """ RabbitMQ-based message bus for inter-agent communication.
This class provides a high-level interface for:
- Sending tasks between agents
- Broadcasting events to all agents
- Request-response patterns with correlation IDs
- Message subscription with callbacks
Falls back to local in-memory queue if RabbitMQ is unavailable.
"""
def __init__(self, config: Optional[MessageBusConfig] = None):
"""
Initialize MessageBus.
Args:
config: MessageBusConfig instance, or None to use env vars
"""
self.config = config or MessageBusConfig.from_env()
self._connection: Optional["AbstractRobustConnection"] = None
self._channel: Optional["AbstractChannel"] = None
self._exchanges: Dict[str, Any] = {}
self._queues: Dict[str, Any] = {}
self._local_queue: Optional[LocalMessageQueue] = None
self._connected = False
self._pending_responses: Dict[str, asyncio.Future] = {}
self._subscribers: Dict[str, Callable] = {}
@property
def is_connected(self) -> bool:
"""Check if connected to message bus."""
return self._connected
@property
def is_local_mode(self) -> bool:
"""Check if using local fallback mode."""
return self._local_queue is not None
async def connect(self, use_local_fallback: bool = True) -> bool:
"""
Establish connection to RabbitMQ.
Args:
use_local_fallback: If True, use local queue when RabbitMQ unavailable
Returns:
True if connected successfully
"""
if not AIO_PIKA_AVAILABLE:
if use_local_fallback:
self._local_queue = LocalMessageQueue()
self._local_queue.start_processing()
self._connected = True
logger.info("Connected to local message queue (aio_pika not installed)")
return True
else:
raise ImportError("aio_pika package is required. Install with: pip install aio-pika")
try:
logger.info(f"Connecting to RabbitMQ at {self.config.host}:{self.config.port}")
self._connection = await aio_pika.connect_robust(
self.config.url,
timeout=self.config.connection_timeout,
)
self._channel = await self._connection.channel()
await self._channel.set_qos(prefetch_count=self.config.prefetch_count)
# Declare exchanges
self._exchanges["tasks"] = await self._channel.declare_exchange(
self.config.task_exchange,
ExchangeType.TOPIC,
durable=True,
)
self._exchanges["broadcasts"] = await self._channel.declare_exchange(
self.config.broadcast_exchange,
ExchangeType.FANOUT,
durable=True,
)
self._exchanges["responses"] = await self._channel.declare_exchange(
self.config.response_exchange,
ExchangeType.DIRECT,
durable=True,
)
self._exchanges["events"] = await self._channel.declare_exchange(
self.config.event_exchange,
ExchangeType.TOPIC,
durable=True,
)
self._connected = True
logger.info("Successfully connected to RabbitMQ")
return True
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {e}")
if use_local_fallback:
logger.info("Falling back to local message queue")
self._local_queue = LocalMessageQueue()
self._local_queue.start_processing()
self._connected = True
return True
raise
async def disconnect(self):
"""Close connection to message bus."""
if self._local_queue:
self._local_queue.stop_processing()
self._local_queue = None
if self._connection and not self._connection.is_closed:
await self._connection.close()
self._connection = None
self._channel = None
self._connected = False
self._exchanges.clear()
self._queues.clear()
logger.info("Disconnected from message bus")
async def send_task(
self,
from_agent: str,
to_agent: str,
task_id: str,
payload: Dict[str, Any],
priority: int = MessagePriority.NORMAL.value,
ttl_ms: Optional[int] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> str:
"""
Send a task from one agent to another.
Args:
from_agent: Sender agent ID
to_agent: Recipient agent ID
task_id: Task identifier
payload: Task data
priority: Message priority (1-10)
ttl_ms: Time-to-live in milliseconds
metadata: Additional metadata
Returns:
Correlation ID for tracking the response
"""
if not self._connected:
raise RuntimeError("Not connected to message bus. Call connect() first.")
message = AgentMessage(
from_agent=from_agent,
to_agent=to_agent,
task_id=task_id,
message_type=MessageType.TASK_REQUEST.value,
payload=payload,
reply_to=f"agent.{from_agent}.responses",
priority=priority,
ttl_ms=ttl_ms or self.config.queue_message_ttl,
metadata=metadata or {},
)
await self._publish_message(
exchange_name="tasks",
routing_key=f"agent.{to_agent}",
message=message,
)
logger.info(f"Sent task {task_id} from {from_agent} to {to_agent} (correlation: {message.correlation_id})")
return message.correlation_id
async def send_response(
self,
original_message: AgentMessage,
payload: Dict[str, Any],
success: bool = True,
):
"""
Send a response back to the requesting agent.
Args:
original_message: The original task request message
payload: Response data
success: Whether the task succeeded
"""
if not self._connected:
raise RuntimeError("Not connected to message bus. Call connect() first.")
message_type = MessageType.TASK_RESPONSE.value if success else MessageType.ERROR.value
response = AgentMessage(
from_agent=original_message.to_agent,
to_agent=original_message.from_agent,
task_id=original_message.task_id,
message_type=message_type,
payload=payload,
correlation_id=original_message.correlation_id,
metadata={"original_message_id": original_message.id},
)
if original_message.reply_to:
await self._publish_message(
exchange_name="responses",
routing_key=original_message.reply_to,
message=response,
)
logger.info(f"Sent response for task {original_message.task_id} (success: {success})")
async def broadcast_event(
self,
from_agent: str,
event_type: str,
payload: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None,
):
"""
Broadcast an event to all agents.
Args:
from_agent: Sender agent ID
event_type: Type of event
payload: Event data
metadata: Additional metadata
"""
if not self._connected:
raise RuntimeError("Not connected to message bus. Call connect() first.")
message = AgentMessage(
from_agent=from_agent,
to_agent="*",
task_id="",
message_type=MessageType.BROADCAST.value,
payload={"event_type": event_type, **payload},
metadata=metadata or {},
)
await self._publish_message(
exchange_name="broadcasts",
routing_key="",
message=message,
)
logger.info(f"Broadcast event '{event_type}' from {from_agent}")
async def publish_event(
self,
from_agent: str,
event_type: str,
payload: Dict[str, Any],
routing_key: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
):
"""
Publish an event to the event exchange with optional routing.
Args:
from_agent: Sender agent ID
event_type: Type of event
payload: Event data
routing_key: Optional routing key for topic-based routing
metadata: Additional metadata
"""
if not self._connected:
raise RuntimeError("Not connected to message bus. Call connect() first.")
message = AgentMessage(
from_agent=from_agent,
to_agent="",
task_id="",
message_type=MessageType.EVENT.value,
payload={"event_type": event_type, **payload},
metadata=metadata or {},
)
await self._publish_message(
exchange_name="events",
routing_key=routing_key or f"event.{event_type}",
message=message,
)
logger.debug(f"Published event '{event_type}' from {from_agent}")
async def subscribe(
self,
agent_id: str,
callback: Callable[[AgentMessage], Any],
include_broadcasts: bool = True,
):
"""
Subscribe to messages for this agent.
Args:
agent_id: Agent identifier
callback: Async callback function to handle messages
include_broadcasts: Whether to receive broadcast messages
"""
if not self._connected:
raise RuntimeError("Not connected to message bus. Call connect() first.")
queue_name = f"agent.{agent_id}.tasks"
if self._local_queue:
# Local mode
def sync_callback(msg: AgentMessage):
asyncio.create_task(callback(msg))
self._local_queue.subscribe(queue_name, sync_callback)
if include_broadcasts:
self._local_queue.subscribe("broadcasts", sync_callback)
logger.info(f"Subscribed {agent_id} to local queue")
return
# RabbitMQ mode
queue = await self._channel.declare_queue(
queue_name,
durable=True,
arguments={
"x-max-priority": self.config.queue_max_priority,
"x-message-ttl": self.config.queue_message_ttl,
},
)
# Bind to task exchange
await queue.bind(self._exchanges["tasks"], routing_key=f"agent.{agent_id}")
await queue.bind(self._exchanges["tasks"], routing_key=f"agent.{agent_id}.#")
# Bind to broadcast exchange if requested
if include_broadcasts:
await queue.bind(self._exchanges["broadcasts"])
self._queues[queue_name] = queue
self._subscribers[agent_id] = callback
# Start consuming
async def message_handler(message: aio_pika.IncomingMessage):
async with message.process():
try:
agent_message = AgentMessage.from_json(message.body.decode())
await callback(agent_message)
except Exception as e:
logger.error(f"Error processing message: {e}")
await queue.consume(message_handler)
logger.info(f"Subscribed {agent_id} to RabbitMQ queue {queue_name}")
async def subscribe_to_responses(
self,
agent_id: str,
callback: Callable[[AgentMessage], Any],
):
"""
Subscribe to response messages for this agent.
Args:
agent_id: Agent identifier
callback: Async callback function to handle responses
"""
if not self._connected:
raise RuntimeError("Not connected to message bus. Call connect() first.")
queue_name = f"agent.{agent_id}.responses"
if self._local_queue:
def sync_callback(msg: AgentMessage):
asyncio.create_task(callback(msg))
self._local_queue.subscribe(queue_name, sync_callback)
logger.info(f"Subscribed {agent_id} to local response queue")
return
queue = await self._channel.declare_queue(
queue_name,
durable=True,
)
await queue.bind(self._exchanges["responses"], routing_key=queue_name)
self._queues[queue_name] = queue
async def response_handler(message: aio_pika.IncomingMessage):
async with message.process():
try:
agent_message = AgentMessage.from_json(message.body.decode())
# Check if there's a pending future for this correlation ID
if agent_message.correlation_id in self._pending_responses:
future = self._pending_responses.pop(agent_message.correlation_id)
if not future.done():
future.set_result(agent_message)
await callback(agent_message)
except Exception as e:
logger.error(f"Error processing response: {e}")
await queue.consume(response_handler)
logger.info(f"Subscribed {agent_id} to RabbitMQ response queue {queue_name}")
async def wait_for_response(
self,
correlation_id: str,
timeout: float = 300.0,
) -> AgentMessage:
"""
Wait for a response with a specific correlation ID.
Args:
correlation_id: The correlation ID to wait for
timeout: Maximum time to wait in seconds
Returns:
The response AgentMessage
Raises:
asyncio.TimeoutError: If no response received within timeout
"""
future = asyncio.get_event_loop().create_future()
self._pending_responses[correlation_id] = future
try:
return await asyncio.wait_for(future, timeout=timeout)
except asyncio.TimeoutError:
self._pending_responses.pop(correlation_id, None)
raise asyncio.TimeoutError(f"No response received for correlation_id: {correlation_id}")
async def _publish_message(
self,
exchange_name: str,
routing_key: str,
message: AgentMessage,
):
"""Internal method to publish a message to an exchange."""
if self._local_queue:
# Local mode - publish to queue by routing key
queue_name = routing_key if routing_key else exchange_name
self._local_queue.publish(queue_name, message)
return
exchange = self._exchanges.get(exchange_name)
if not exchange:
raise ValueError(f"Unknown exchange: {exchange_name}")
amqp_message = Message(
body=message.to_json().encode(),
priority=message.priority,
correlation_id=message.correlation_id,
reply_to=message.reply_to,
delivery_mode=DeliveryMode.PERSISTENT,
expiration=str(message.ttl_ms) if message.ttl_ms else None,
headers=message.metadata,
)
await exchange.publish(amqp_message, routing_key=routing_key)
async def get_queue_stats(self) -> Dict[str, Dict[str, int]]:
"""Get statistics for all declared queues."""
stats = {}
if self._local_queue:
for name, q in self._local_queue._queues.items():
stats[name] = {
"message_count": q.qsize(),
"consumer_count": len(self._local_queue._subscribers.get(name, [])),
}
return stats
for name, q in self._queues.items():
try:
declaration = await self._channel.declare_queue(name, passive=True)
stats[name] = {
"message_count": declaration.declaration_result.message_count,
"consumer_count": declaration.declaration_result.consumer_count,
}
except Exception as e:
logger.warning(f"Could not get stats for queue {name}: {e}")
return stats
Convenience functions for common operations
async def create_message_bus(config: Optional[MessageBusConfig] = None) -> MessageBus: """Create and connect a MessageBus instance.""" bus = MessageBus(config) await bus.connect() return bus
def create_task_message( from_agent: str, to_agent: str, task_id: str, payload: Dict[str, Any], priority: int = MessagePriority.NORMAL.value, ) -> AgentMessage: """Create a task request message.""" return AgentMessage( from_agent=from_agent, to_agent=to_agent, task_id=task_id, message_type=MessageType.TASK_REQUEST.value, payload=payload, priority=priority, reply_to=f"agent.{from_agent}.responses", )
CLI for testing
if name == "main": import argparse import sys
parser = argparse.ArgumentParser(description="CODITECT Message Bus CLI")
parser.add_argument("command", choices=["test", "stats", "send", "listen"],
help="Command to run")
parser.add_argument("--agent-id", default="test-agent",
help="Agent ID for subscribe/send operations")
parser.add_argument("--to-agent", help="Target agent for send command")
parser.add_argument("--message", default="Hello", help="Message payload for send")
parser.add_argument("--local", action="store_true",
help="Use local queue instead of RabbitMQ")
parser.add_argument("--host", default="localhost", help="RabbitMQ host")
parser.add_argument("--port", type=int, default=5672, help="RabbitMQ port")
args = parser.parse_args()
async def main():
config = MessageBusConfig(host=args.host, port=args.port)
bus = MessageBus(config)
try:
connected = await bus.connect(use_local_fallback=args.local)
if not connected:
print("Failed to connect to message bus")
sys.exit(1)
print(f"Connected to message bus (local mode: {bus.is_local_mode})")
if args.command == "test":
# Send a test message
correlation_id = await bus.send_task(
from_agent=args.agent_id,
to_agent=args.to_agent or "echo-agent",
task_id=f"test-{uuid.uuid4().hex[:8]}",
payload={"message": args.message, "timestamp": datetime.utcnow().isoformat()},
)
print(f"Sent test message with correlation_id: {correlation_id}")
elif args.command == "stats":
stats = await bus.get_queue_stats()
print("Queue Statistics:")
for queue_name, queue_stats in stats.items():
print(f" {queue_name}: {queue_stats}")
elif args.command == "send":
if not args.to_agent:
print("Error: --to-agent required for send command")
sys.exit(1)
correlation_id = await bus.send_task(
from_agent=args.agent_id,
to_agent=args.to_agent,
task_id=f"cli-{uuid.uuid4().hex[:8]}",
payload={"message": args.message},
)
print(f"Sent message to {args.to_agent}, correlation_id: {correlation_id}")
elif args.command == "listen":
print(f"Listening for messages as {args.agent_id}...")
async def handler(message: AgentMessage):
print(f"\n[{message.timestamp}] Received message:")
print(f" From: {message.from_agent}")
print(f" Type: {message.message_type}")
print(f" Task: {message.task_id}")
print(f" Payload: {json.dumps(message.payload, indent=2)}")
await bus.subscribe(args.agent_id, handler)
# Keep running
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\nShutting down...")
finally:
await bus.disconnect()
asyncio.run(main())