Multi-Agent Orchestration Patterns
Designing Collaborative AI Systems
Document ID: A4-MULTI-AGENT-PATTERNS
Version: 1.0
Category: P1 - Implementation Guides
Topology Overview
| Topology | Best For | Coordination | Complexity |
|---|---|---|---|
| Hierarchical | Structured workflows | Top-down | Medium |
| Distributed | Creative tasks, debate | Peer-to-peer | High |
| Pipeline | Sequential processing | Linear handoff | Low |
| Hybrid | Complex enterprise | Mixed | High |
Hierarchical Topology
                ┌─────────────────┐
                │  ORCHESTRATOR   │
                └────────┬────────┘
       ┌─────────────────┼─────────────────┐
┌──────▼──────┐   ┌──────▼──────┐   ┌──────▼──────┐
│  RESEARCH   │   │  ANALYSIS   │   │  EXECUTION  │
└─────────────┘   └─────────────┘   └─────────────┘
Implementation Pattern
class HierarchicalOrchestrator:
    async def execute_task(self, goal):
        # 1. Decompose goal into subtasks
        subtasks = await self._decompose_goal(goal)
        # 2. Assign subtasks to agents
        assignments = await self._assign_tasks(subtasks)
        # 3. Execute with dependency coordination
        results = await self._execute_with_coordination(assignments)
        # 4. Synthesize results
        return await self._synthesize_results(goal, results)
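The four steps above can be sketched end to end. This is a minimal illustration under stated assumptions, not the platform's implementation: the `Subtask` record, the `agents` mapping (role name to async callable), the fixed three-step plan, and the `echo_agent` stand-in are all hypothetical. Dependency coordination is shown as waves: every subtask whose dependencies already have results runs concurrently.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Subtask:
    # Hypothetical subtask record; not part of the original pattern.
    name: str
    role: str
    depends_on: list = field(default_factory=list)


class SketchOrchestrator:
    def __init__(self, agents):
        # agents: role name -> async callable(subtask, upstream_results)
        self.agents = agents

    async def execute_task(self, goal):
        subtasks = self._decompose_goal(goal)
        results = await self._execute_with_coordination(subtasks)
        return {"goal": goal, "results": results}

    def _decompose_goal(self, goal):
        # Assumption: a fixed research -> analysis -> execution plan.
        return [
            Subtask("gather", "research"),
            Subtask("assess", "analysis", depends_on=["gather"]),
            Subtask("act", "execution", depends_on=["assess"]),
        ]

    async def _execute_with_coordination(self, subtasks):
        results, pending = {}, list(subtasks)
        while pending:
            # A subtask is ready once all of its dependencies have results.
            ready = [s for s in pending if all(d in results for d in s.depends_on)]
            if not ready:
                raise RuntimeError("Unsatisfiable or circular dependencies")
            outputs = await asyncio.gather(*(
                self.agents[s.role](s, {d: results[d] for d in s.depends_on})
                for s in ready
            ))
            for s, out in zip(ready, outputs):
                results[s.name] = out
            pending = [s for s in pending if s.name not in results]
        return results


async def echo_agent(subtask, upstream):
    # Stand-in for a real agent call; reports what it was given.
    return f"{subtask.role}:{subtask.name} (inputs: {sorted(upstream)})"
```

Running independent subtasks in the same wave keeps latency down, while the `ready` check guarantees that an agent never starts before its inputs exist.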
Distributed Topology (Debate)
┌─────────┐     ┌─────────┐     ┌─────────┐
│ Agent 1 │◄───►│ Agent 2 │◄───►│ Agent 3 │
└────┬────┘     └────┬────┘     └────┬────┘
     └───────────────┴───────────────┘
                     │
         ┌───────────▼───────────┐
         │   CONSENSUS MODULE    │
         └───────────────────────┘
Debate Flow
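class DistributedDebate:
    def __init__(self, max_rounds, threshold):
        self.max_rounds = max_rounds  # upper bound on debate rounds
        self.threshold = threshold    # convergence score that ends the debate early

    async def debate(self, question):
        positions, critiques = None, None
        for _ in range(self.max_rounds):
            # Generate positions in the first round; revise them with the latest critiques afterwards
            positions = await self._generate_positions(question, positions, critiques)
            # Check convergence
            if self._calculate_convergence(positions) >= self.threshold:
                break
            # Generate critiques to feed the next round
            critiques = await self._generate_critiques(positions)
        return await self._build_consensus(question, positions)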
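The debate loop above can be made concrete with a toy example. Everything here is an assumption for illustration: the convergence score is taken to be the share of agents holding the most common position, the critique step is collapsed into each agent seeing its peers' positions, and `make_agent` builds a deliberately simple agent that changes its mind only when all peers agree.

```python
import asyncio
from collections import Counter


def convergence(positions):
    # Share of agents holding the most common position (1.0 means unanimous).
    counts = Counter(positions.values())
    return counts.most_common(1)[0][1] / len(positions)


async def run_debate(agents, question, max_rounds, threshold):
    # agents: agent id -> async callable(question, own_position, peer_positions)
    positions = {agent_id: None for agent_id in agents}
    rounds_used = 0
    for rounds_used in range(1, max_rounds + 1):
        snapshot = dict(positions)
        revised = await asyncio.gather(*(
            agent(question, snapshot[agent_id],
                  {k: v for k, v in snapshot.items() if k != agent_id})
            for agent_id, agent in agents.items()
        ))
        positions = dict(zip(agents, revised))
        if convergence(positions) >= threshold:
            break
    return positions, rounds_used


def make_agent(opening_position):
    # Toy agent: states its opening position, then yields only to unanimous peers.
    async def agent(question, own, peers):
        if own is None:
            return opening_position
        peer_views = set(peers.values())
        return peer_views.pop() if len(peer_views) == 1 else own
    return agent
```

Taking a snapshot before each round means every agent revises against the same set of positions, so the outcome does not depend on the order in which agents respond.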
Pipeline Topology
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  INTAKE  │───►│ PROCESS  │───►│ VALIDATE │───►│  OUTPUT  │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
Stage Definition
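from dataclasses import dataclass
from typing import Any, Dict

@dataclass
class PipelineStage:
    name: str
    agent_config: "AgentConfig"   # agent settings type, defined elsewhere
    input_schema: Dict[str, Any]
    output_schema: Dict[str, Any]
    retry_on_failure: bool = True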
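One way a runner might consume such stage definitions is sketched below. It is a simplified illustration, not the guide's implementation: `Stage` replaces `agent_config` with a plain synchronous `handler` callable, schemas are assumed to map field names to Python types, and `retry_on_failure` is interpreted as a single extra attempt.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class Stage:
    # Simplified stand-in for PipelineStage: a callable replaces agent_config.
    name: str
    handler: Callable[[Dict[str, Any]], Dict[str, Any]]
    input_schema: Dict[str, type]
    output_schema: Dict[str, type]
    retry_on_failure: bool = True


def check_schema(payload, schema, where):
    # Assumed schema format: field name -> expected Python type.
    for key, expected in schema.items():
        if not isinstance(payload.get(key), expected):
            raise TypeError(f"{where}: field {key!r} must be {expected.__name__}")


def run_pipeline(stages, payload):
    for stage in stages:
        check_schema(payload, stage.input_schema, f"{stage.name} input")
        attempts = 2 if stage.retry_on_failure else 1
        for attempt in range(1, attempts + 1):
            try:
                result = stage.handler(payload)
                break
            except Exception:
                # Re-raise once the last allowed attempt has failed.
                if attempt == attempts:
                    raise
        check_schema(result, stage.output_schema, f"{stage.name} output")
        payload = result  # linear handoff to the next stage
    return payload
```

Validating both sides of every handoff localizes a failure to the stage that produced the bad payload instead of letting it surface several stages later.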
Hybrid Topology
class HybridOrchestrator:
    # Maps a task type to the topology that handles it
    routing_rules = {
        "research": "hierarchical",
        "creative": "distributed",
        "workflow": "pipeline",
    }

    async def execute(self, task, task_type=None):
        # Classify the task only when the caller did not supply a type
        if not task_type:
            task_type = await self._classify_task(task)
        topology = self.routing_rules.get(task_type)
        if topology == "hierarchical":
            return await self.hierarchical.execute_task(task)
        elif topology == "distributed":
            return await self.distributed.debate(task)
        elif topology == "pipeline":
            return await self.pipeline.process({"task": task})
        # Without this, an unknown task type would silently return None
        raise ValueError(f"No topology for task type: {task_type!r}")
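The same routing can also be expressed as a dispatch table, which avoids growing the `if`/`elif` chain as topologies are added. The sketch below is a hypothetical variant: `RoutingSketch`, its `handlers` mapping, and the injected `classify` callable are assumptions made so the example runs on its own.

```python
import asyncio


class RoutingSketch:
    def __init__(self, handlers, classify):
        # handlers: topology name -> async callable(task)
        # classify: async callable(task) -> task type, used when none is given
        self.handlers = handlers
        self.classify = classify
        self.routing_rules = {
            "research": "hierarchical",
            "creative": "distributed",
            "workflow": "pipeline",
        }

    async def execute(self, task, task_type=None):
        if not task_type:
            task_type = await self.classify(task)
        topology = self.routing_rules.get(task_type)
        if topology not in self.handlers:
            # Covers both unknown task types and unregistered topologies.
            raise ValueError(f"No topology registered for task type {task_type!r}")
        return await self.handlers[topology](task)
```

Failing loudly on an unroutable task is a deliberate choice here: a silent `None` result is hard to distinguish from an agent that produced nothing.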
Coordination Mechanisms
Message Passing
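class MessageBus:
    # Interface sketch; method bodies are left to the implementation
    async def send(self, to, message): ...            # deliver to one agent
    async def broadcast(self, topic, message): ...    # deliver to every subscriber of a topic
    async def receive(self, agent_id, timeout): ...   # wait up to `timeout` for the next message
    def subscribe(self, agent_id, topic): ...         # register an agent for a topic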
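A minimal in-process version of this interface might look like the sketch below. It assumes one `asyncio.Queue` inbox per agent, a timeout measured in seconds, and `None` as the result of a timed-out `receive`; none of these choices come from the original interface.

```python
import asyncio
from collections import defaultdict


class InMemoryMessageBus:
    def __init__(self):
        # Queues are created lazily, on first use inside a running event loop.
        self._inboxes = defaultdict(asyncio.Queue)  # agent_id -> inbox
        self._subscribers = defaultdict(set)         # topic -> agent ids

    def subscribe(self, agent_id, topic):
        self._subscribers[topic].add(agent_id)

    async def send(self, to, message):
        await self._inboxes[to].put(message)

    async def broadcast(self, topic, message):
        for agent_id in self._subscribers[topic]:
            await self._inboxes[agent_id].put(message)

    async def receive(self, agent_id, timeout):
        # Returns None if nothing arrives within `timeout` seconds.
        try:
            return await asyncio.wait_for(self._inboxes[agent_id].get(), timeout)
        except asyncio.TimeoutError:
            return None
```

Per-agent inboxes preserve message order for each recipient. A production bus would also need delivery guarantees and persistence, which this sketch leaves out.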
Shared Memory
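class SharedMemory:
    # Interface sketch; method bodies are left to the implementation
    async def get(self, key): ...
    async def set(self, key, value): ...  # returns the new version of the key
    async def compare_and_set(self, key, expected_version, value): ...  # writes only if the version still matches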
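The versioned interface above supports optimistic concurrency: read a value with its version, compute an update, and write only if the version is unchanged. The sketch below assumes that `get` returns a `(version, value)` pair, that a missing key reads as version 0, and that `compare_and_set` returns a boolean; the `increment` helper is hypothetical.

```python
class InMemorySharedMemory:
    # No lock is used: no method awaits internally, so the event loop
    # cannot interleave two of these operations.
    def __init__(self):
        self._data = {}  # key -> (version, value)

    async def get(self, key):
        # A missing key reads as version 0 with value None.
        return self._data.get(key, (0, None))

    async def set(self, key, value):
        version = self._data.get(key, (0, None))[0] + 1
        self._data[key] = (version, value)
        return version

    async def compare_and_set(self, key, expected_version, value):
        # Write only if nobody has written since `expected_version` was read.
        current_version = self._data.get(key, (0, None))[0]
        if current_version != expected_version:
            return False
        self._data[key] = (current_version + 1, value)
        return True


async def increment(memory, key):
    # Optimistic retry loop: re-read and retry if another writer got in first.
    while True:
        version, value = await memory.get(key)
        if await memory.compare_and_set(key, version, (value or 0) + 1):
            return
```

Agents that lose a race simply retry with fresh data, so no agent ever overwrites an update it has not seen.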
Consensus Protocol
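class ConsensusProtocol:
    def __init__(self, agents, quorum_size):
        self.agents = agents            # IDs of the voting agents
        self.quorum_size = quorum_size  # approvals needed to accept a proposal

    async def propose(self, proposal, collect_votes):
        # collect_votes(agent_id, proposal) returns a vote object with an `approved` flag
        votes = {}
        for agent_id in self.agents:
            votes[agent_id] = await collect_votes(agent_id, proposal)
        approvals = sum(1 for v in votes.values() if v.approved)
        return approvals >= self.quorum_size, votes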
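The quorum check above can be exercised with a small standalone example. The `Vote` record and the `majority_quorum` helper are hypothetical additions, and this variant gathers votes concurrently instead of one agent at a time, which is a design alternative and not the original behavior.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Vote:
    # Hypothetical vote record; the pattern only requires an `approved` attribute.
    approved: bool
    reason: str = ""


async def propose(agents, quorum_size, proposal, collect_vote):
    # Ask every agent concurrently instead of one at a time.
    ballots = await asyncio.gather(*(collect_vote(a, proposal) for a in agents))
    votes = dict(zip(agents, ballots))
    approvals = sum(1 for v in votes.values() if v.approved)
    return approvals >= quorum_size, votes


def majority_quorum(n_agents):
    # Smallest strict majority: 2 of 3, 3 of 4, 3 of 5.
    return n_agents // 2 + 1
```

Concurrent collection bounds the wait by the slowest voter, where the sequential loop waits for the sum of all voters.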
Failure Handling
Circuit Breaker
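import time

class AgentCircuitBreaker:
    # Per-agent state is one of: "closed", "open", "half_open"
    def __init__(self, threshold, recovery_timeout):
        self.threshold = threshold                # failures before the circuit opens
        self.recovery_timeout = recovery_timeout  # seconds before a trial call is allowed
        self.failures, self.state, self.last_failure = {}, {}, {}

    def record_failure(self, agent_id):
        self.failures[agent_id] = self.failures.get(agent_id, 0) + 1
        self.last_failure[agent_id] = time.monotonic()
        if self.failures[agent_id] >= self.threshold:
            self.state[agent_id] = "open"

    def can_execute(self, agent_id):
        if self.state.get(agent_id, "closed") == "open":
            if time.monotonic() - self.last_failure[agent_id] > self.recovery_timeout:
                self.state[agent_id] = "half_open"
                return True
            return False
        return True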
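The breaker above never returns to the closed state, because the original has no success path. The sketch below adds one as an assumption: a hypothetical `record_success` method closes the circuit, a failure while half-open reopens it immediately, and the clock is injectable so the state transitions can be tested without sleeping.

```python
import time


class CircuitBreakerSketch:
    def __init__(self, threshold, recovery_timeout, clock=time.monotonic):
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock  # injectable so tests can control time
        self.failures = {}
        self.state = {}
        self.opened_at = {}

    def can_execute(self, agent_id):
        state = self.state.get(agent_id, "closed")
        if state == "open":
            if self.clock() - self.opened_at[agent_id] >= self.recovery_timeout:
                self.state[agent_id] = "half_open"  # allow trial calls
                return True
            return False
        return True

    def record_success(self, agent_id):
        # Any success closes the circuit and clears the failure count.
        self.failures[agent_id] = 0
        self.state[agent_id] = "closed"

    def record_failure(self, agent_id):
        self.failures[agent_id] = self.failures.get(agent_id, 0) + 1
        half_open = self.state.get(agent_id) == "half_open"
        if half_open or self.failures[agent_id] >= self.threshold:
            # Reopen and restart the recovery timer.
            self.state[agent_id] = "open"
            self.opened_at[agent_id] = self.clock()
```

Callers would wrap each agent call: check `can_execute`, run the call, then report the outcome with `record_success` or `record_failure`.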
Graceful Degradation
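class GracefulDegradation:
    def __init__(self, required, optional):
        self.required = set(required)  # agents the task cannot run without
        self.optional = set(optional)  # agents whose absence only degrades the result

    async def execute_with_degradation(self, task_func, available_agents):
        # Check required agents present
        missing_required = self.required - set(available_agents)
        if missing_required:
            raise RuntimeError(f"Missing: {missing_required}")
        result = await task_func(available_agents)
        # Note missing optional capabilities (assumes task_func returns a dict)
        if self.optional - set(available_agents):
            result["degraded"] = True
        return result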
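A short standalone example shows the three outcomes: a full run, a degraded run, and a hard failure. The agent names, the `summarize` task, and the extra `missing_optional` field are assumptions for illustration. This variant always sets `degraded`, so callers can read the flag without checking whether it exists.

```python
import asyncio


class DegradationSketch:
    def __init__(self, required, optional):
        self.required = set(required)
        self.optional = set(optional)

    async def execute_with_degradation(self, task_func, available_agents):
        available = set(available_agents)
        missing_required = self.required - available
        if missing_required:
            raise RuntimeError(f"Missing: {sorted(missing_required)}")
        result = await task_func(available_agents)
        # Record which optional capabilities were unavailable for this run.
        missing_optional = self.optional - available
        result["degraded"] = bool(missing_optional)
        result["missing_optional"] = sorted(missing_optional)
        return result


async def summarize(agents):
    # Stand-in task: reports which agents took part.
    return {"used": sorted(agents)}
```

Listing the missing optional agents, and not only a boolean, lets downstream consumers decide whether the degraded result is still good enough for their purpose.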
Topology Selection Guide
| Scenario | Recommended | Why |
|---|---|---|
| Structured research | Hierarchical | Clear decomposition |
| Creative brainstorming | Distributed | Diverse perspectives |
| Document processing | Pipeline | Sequential, predictable |
| Strategic decision | Distributed | Multiple viewpoints |
| Complex enterprise | Hybrid | Different subtask needs |
Token Economics
| Topology | Multiplier | Typical Range |
|---|---|---|
| Single agent | 4x | 1K-10K tokens |
| Hierarchical | 10-15x | 5K-50K tokens |
| Distributed | 15-20x | 10K-100K tokens |
| Pipeline | 5-8x | 3K-30K tokens |
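The multipliers can be turned into a rough budget estimate. The table does not state its baseline, so the sketch below assumes each multiplier is relative to the token count of a single chat-style exchange; treat `baseline_tokens` and the `estimate_tokens` helper as hypothetical.

```python
# (low, high) multipliers copied from the table above.
MULTIPLIERS = {
    "single_agent": (4, 4),
    "hierarchical": (10, 15),
    "distributed": (15, 20),
    "pipeline": (5, 8),
}


def estimate_tokens(topology, baseline_tokens):
    # Assumption: the multiplier scales a single-exchange baseline.
    low, high = MULTIPLIERS[topology]
    return baseline_tokens * low, baseline_tokens * high
```

Under that assumption, a task whose single exchange costs 1,000 tokens would be budgeted at 10,000 to 15,000 tokens on a hierarchical topology.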
Document maintained by CODITECT Platform Team