Skip to main content

CODITECT Message Bus - RabbitMQ-based Inter-Agent Communication

Part of Track H.2: Inter-Agent Communication Infrastructure Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications

This module provides:

  • AgentMessage: Structured message format for inter-agent communication
  • MessageBus: RabbitMQ client for sending/receiving messages
  • MessageBusConfig: Configuration management
  • Local fallback for development without RabbitMQ

Usage: # Async usage from scripts.core.message_bus import MessageBus, AgentMessage

bus = MessageBus()
await bus.connect()

# Send task to another agent
correlation_id = await bus.send_task(
from_agent="orchestrator",
to_agent="code-review-agent",
task_id="task-123",
payload={"code": "def foo(): pass"}
)

# Subscribe to messages
async def handler(message: AgentMessage):
print(f"Received: {message}")

await bus.subscribe("my-agent", handler)

Author: CODITECT Framework Created: January 8, 2026 Version: 1.0.0

File: message_bus.py

Classes

MessageType

Types of messages that can be sent between agents.

MessagePriority

Message priority levels (1-10, higher = more urgent).

AgentMessage

Structured message format for inter-agent communication.

MessageBusConfig

Configuration for MessageBus connection.

LocalMessageQueue

Local in-memory message queue for development without RabbitMQ.

MessageBus

RabbitMQ-based message bus for inter-agent communication.

Functions

create_task_message(from_agent, to_agent, task_id, payload, priority)

Create a task request message.

to_dict()

Convert to dictionary for JSON serialization.

to_json()

Convert to JSON string.

from_dict(cls, data)

Create AgentMessage from dictionary.

from_json(cls, json_str)

Create AgentMessage from JSON string.

url()

Generate AMQP URL from config.

from_env(cls)

Create config from environment variables.

from_file(cls, path)

Load config from JSON file.

get_queue(queue_name)

Get or create a queue by name.

publish(queue_name, message)

Publish message to a queue.

subscribe(queue_name, callback)

Subscribe to messages on a queue.

start_processing()

Start background message processing.

stop_processing()

Stop background message processing.

is_connected()

Check if connected to message bus.

is_local_mode()

Check if using local fallback mode.

Usage

python message_bus.py