CODITECT Message Bus - RabbitMQ-based Inter-Agent Communication
Part of Track H.2: Inter-Agent Communication Infrastructure
Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications
This module provides:
- AgentMessage: Structured message format for inter-agent communication
- MessageBus: RabbitMQ client for sending/receiving messages
- MessageBusConfig: Configuration management
- Local fallback for development without RabbitMQ
Usage:
    # Async usage
    from scripts.core.message_bus import MessageBus, AgentMessage

    bus = MessageBus()
    await bus.connect()

    # Send task to another agent
    correlation_id = await bus.send_task(
        from_agent="orchestrator",
        to_agent="code-review-agent",
        task_id="task-123",
        payload={"code": "def foo(): pass"}
    )

    # Subscribe to messages
    async def handler(message: AgentMessage):
        print(f"Received: {message}")

    await bus.subscribe("my-agent", handler)
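The usage above relies on top-level await, which only works inside a coroutine or an async REPL. The following is a minimal, self-contained sketch of the same connect / subscribe / send_task flow driven by asyncio.run(). It does not use the real MessageBus: InMemoryBus and the message dictionary layout are hypothetical stand-ins chosen so the example runs without RabbitMQ.

```python
import asyncio
import uuid
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

Message = Dict[str, Any]
Handler = Callable[[Message], Awaitable[None]]


class InMemoryBus:
    """Hypothetical stand-in for MessageBus: delivers messages in-process."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    async def connect(self) -> None:
        # Nothing to connect to; kept so the call sequence matches the usage.
        return None

    async def subscribe(self, agent: str, handler: Handler) -> None:
        # Register a coroutine that receives messages addressed to `agent`.
        self._handlers[agent].append(handler)

    async def send_task(self, from_agent: str, to_agent: str,
                        task_id: str, payload: Message) -> str:
        # Build a message (assumed layout) and hand it to each subscriber.
        correlation_id = str(uuid.uuid4())
        message = {
            "from_agent": from_agent,
            "to_agent": to_agent,
            "task_id": task_id,
            "payload": payload,
            "correlation_id": correlation_id,
        }
        for handler in list(self._handlers[to_agent]):
            await handler(message)
        return correlation_id


async def main() -> List[Message]:
    received: List[Message] = []

    async def handler(message: Message) -> None:
        received.append(message)

    bus = InMemoryBus()
    await bus.connect()
    # Subscribe before sending so the task is delivered to the handler.
    await bus.subscribe("code-review-agent", handler)
    await bus.send_task(
        from_agent="orchestrator",
        to_agent="code-review-agent",
        task_id="task-123",
        payload={"code": "def foo(): pass"},
    )
    return received


if __name__ == "__main__":
    for message in asyncio.run(main()):
        print(f"Received: {message}")
```

In this sketch the handler is registered on the same name the task is sent to; a handler subscribed under a different agent name would not see the message.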
Author: CODITECT Framework
Created: January 8, 2026
Version: 1.0.0
File: message_bus.py
Classes
MessageType
Types of messages that can be sent between agents.
MessagePriority
Message priority levels (1-10, higher = more urgent).
AgentMessage
Structured message format for inter-agent communication.
MessageBusConfig
Configuration for MessageBus connection.
LocalMessageQueue
Local in-memory message queue for development without RabbitMQ.
MessageBus
RabbitMQ-based message bus for inter-agent communication.
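To make the message classes above more concrete, here is a rough sketch of how a structured message with a type, a 1-10 priority, and JSON round-tripping might look. The enum members, field names, and defaults are assumptions for illustration only; the actual definitions in message_bus.py may differ.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from enum import Enum, IntEnum
from typing import Any, Dict


class MessageType(str, Enum):
    # Hypothetical members; the real enum may define others.
    TASK_REQUEST = "task_request"
    TASK_RESPONSE = "task_response"


class MessagePriority(IntEnum):
    # 1-10 scale, higher = more urgent (member names are assumptions).
    LOW = 1
    NORMAL = 5
    CRITICAL = 10


@dataclass
class AgentMessage:
    from_agent: str
    to_agent: str
    task_id: str
    payload: Dict[str, Any]
    message_type: MessageType = MessageType.TASK_REQUEST
    priority: int = int(MessagePriority.NORMAL)
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def to_dict(self) -> Dict[str, Any]:
        # Store the enum as its plain string value so the dict is JSON-safe.
        data = asdict(self)
        data["message_type"] = self.message_type.value
        return data

    def to_json(self) -> str:
        return json.dumps(self.to_dict())

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "AgentMessage":
        # Copy first so the caller's dict is not mutated.
        fields = dict(data)
        fields["message_type"] = MessageType(fields["message_type"])
        return cls(**fields)

    @classmethod
    def from_json(cls, json_str: str) -> "AgentMessage":
        return cls.from_dict(json.loads(json_str))


def create_task_message(from_agent: str, to_agent: str, task_id: str,
                        payload: Dict[str, Any],
                        priority: int = int(MessagePriority.NORMAL)) -> AgentMessage:
    # Convenience constructor for a task request (default priority is assumed).
    return AgentMessage(from_agent, to_agent, task_id, payload,
                        MessageType.TASK_REQUEST, priority)


if __name__ == "__main__":
    msg = create_task_message("orchestrator", "code-review-agent",
                              "task-123", {"code": "def foo(): pass"})
    print(AgentMessage.from_json(msg.to_json()) == msg)
```

Serializing the enum by value keeps the wire format independent of Python, which matters if agents written in other languages consume the same queue.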
Functions
create_task_message(from_agent, to_agent, task_id, payload, priority)
Create a task request message.
to_dict()
Convert to dictionary for JSON serialization.
to_json()
Convert to JSON string.
from_dict(cls, data)
Create AgentMessage from dictionary.
from_json(cls, json_str)
Create AgentMessage from JSON string.
url()
Generate AMQP URL from config.
from_env(cls)
Create config from environment variables.
from_file(cls, path)
Load config from JSON file.
get_queue(queue_name)
Get or create a queue by name.
publish(queue_name, message)
Publish message to a queue.
subscribe(queue_name, callback)
Subscribe to messages on a queue.
start_processing()
Start background message processing.
stop_processing()
Stop background message processing.
is_connected()
Check if connected to message bus.
is_local_mode()
Check if using local fallback mode.
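As an illustration of the url() and from_env() entries above, the following sketch builds an AMQP URL from a small config object and populates that object from environment variables. BusConfig, its field names, its defaults, and the RABBITMQ_* variable names are all assumptions; the source does not state which variables or defaults MessageBusConfig actually uses.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional
from urllib.parse import quote


@dataclass
class BusConfig:
    """Hypothetical stand-in for MessageBusConfig."""
    host: str = "localhost"
    port: int = 5672
    username: str = "guest"
    password: str = "guest"
    virtual_host: str = "/"

    def url(self) -> str:
        # Percent-encode credentials and vhost so characters such as
        # "/" or "@" cannot break the URL structure.
        user = quote(self.username, safe="")
        password = quote(self.password, safe="")
        vhost = quote(self.virtual_host, safe="")
        return f"amqp://{user}:{password}@{self.host}:{self.port}/{vhost}"

    @classmethod
    def from_env(cls, environ: Optional[Mapping[str, str]] = None) -> "BusConfig":
        # Variable names are assumptions; pass a mapping to avoid
        # touching the real process environment (useful in tests).
        env = os.environ if environ is None else environ
        return cls(
            host=env.get("RABBITMQ_HOST", "localhost"),
            port=int(env.get("RABBITMQ_PORT", "5672")),
            username=env.get("RABBITMQ_USER", "guest"),
            password=env.get("RABBITMQ_PASSWORD", "guest"),
            virtual_host=env.get("RABBITMQ_VHOST", "/"),
        )


if __name__ == "__main__":
    print(BusConfig.from_env().url())
```

The default virtual host "/" is encoded as %2F in the URL path, so the default configuration in this sketch yields amqp://guest:guest@localhost:5672/%2F.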
Usage
python message_bus.py