
Multi-Agent Orchestration Patterns

Designing Collaborative AI Systems

Document ID: A4-MULTI-AGENT-PATTERNS
Version: 1.0
Category: P1 - Implementation Guides


Topology Overview

Topology      Best For                Coordination    Complexity
------------  ----------------------  --------------  ----------
Hierarchical  Structured workflows    Top-down        Medium
Distributed   Creative tasks, debate  Peer-to-peer    High
Pipeline      Sequential processing   Linear handoff  Low
Hybrid        Complex enterprise      Mixed           High

Hierarchical Topology

                    ┌─────────────────┐
                    │  ORCHESTRATOR   │
                    └────────┬────────┘
           ┌─────────────────┼─────────────────┐
    ┌──────▼──────┐   ┌──────▼──────┐   ┌──────▼──────┐
    │  RESEARCH   │   │  ANALYSIS   │   │  EXECUTION  │
    └─────────────┘   └─────────────┘   └─────────────┘

Implementation Pattern

class HierarchicalOrchestrator:
    async def execute_task(self, goal):
        # 1. Decompose goal into subtasks
        subtasks = await self._decompose_goal(goal)

        # 2. Assign subtasks to agents
        assignments = await self._assign_tasks(subtasks)

        # 3. Execute with dependency coordination
        results = await self._execute_with_coordination(assignments)

        # 4. Synthesize results
        return await self._synthesize_results(goal, results)
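
Step 3 is the least obvious of the four, so here is a minimal sketch of dependency-aware execution. It assumes each subtask is a hypothetical `Subtask` record with a name, an async callable, and a list of dependency names; none of these names come from the pattern above, and a production orchestrator would also need timeouts and error handling.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class Subtask:
    # Hypothetical record used only for this sketch.
    name: str
    run: Callable[[Dict[str, Any]], Awaitable[Any]]
    depends_on: List[str] = field(default_factory=list)


async def execute_with_coordination(subtasks: List[Subtask]) -> Dict[str, Any]:
    """Run subtasks in dependency order; independent subtasks run concurrently."""
    pending = {t.name: t for t in subtasks}
    results: Dict[str, Any] = {}
    while pending:
        # A subtask is ready once every dependency has produced a result.
        ready = [t for t in pending.values()
                 if all(dep in results for dep in t.depends_on)]
        if not ready:
            # Nothing can make progress: a cycle or an unknown dependency.
            raise ValueError(f"Unresolvable dependencies: {sorted(pending)}")
        outputs = await asyncio.gather(*(t.run(results) for t in ready))
        for task, output in zip(ready, outputs):
            results[task.name] = output
            del pending[task.name]
    return results
```

Each wave of ready subtasks runs through `asyncio.gather`, so research steps with no dependencies execute in parallel while an analysis step that depends on them waits for the next wave.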

Distributed Topology (Debate)

    ┌─────────┐     ┌─────────┐     ┌─────────┐
    │ Agent 1 │◄───►│ Agent 2 │◄───►│ Agent 3 │
    └────┬────┘     └────┬────┘     └────┬────┘
         └───────────────┼───────────────┘
                         │
             ┌───────────▼───────────┐
             │   CONSENSUS MODULE    │
             └───────────────────────┘

Debate Flow

class DistributedDebate:
    async def debate(self, question):
        positions, critiques = [], None
        for _ in range(self.max_rounds):
            # Generate positions, or revise them using the previous round's critiques
            positions = await self._generate_positions(question, critiques)

            # Stop early once the agents agree closely enough
            if self._calculate_convergence(positions) >= self.threshold:
                break

            # Generate critiques to feed into the next round
            critiques = await self._generate_critiques(positions)

        return await self._build_consensus(question, positions)
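
The convergence check is left abstract above. One simple way to compute it, assuming positions are short text answers and that convergence means the share of agents holding the most common answer (both are assumptions, as is the function name), might be:

```python
from collections import Counter
from typing import Sequence


def calculate_convergence(positions: Sequence[str]) -> float:
    """Return the fraction of agents that share the most common position."""
    if not positions:
        return 0.0
    # Normalize case and surrounding whitespace so trivial differences do not count.
    normalized = [p.strip().lower() for p in positions]
    _, top_count = Counter(normalized).most_common(1)[0]
    return top_count / len(normalized)
```

With three agents and a threshold of 0.66, two matching answers would end the debate. Free-form positions would need a semantic similarity measure instead of exact matching.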

Pipeline Topology

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  INTAKE  │───►│ PROCESS  │───►│ VALIDATE │───►│  OUTPUT  │
└──────────┘    └──────────┘    └──────────┘    └──────────┘

Stage Definition

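from dataclasses import dataclass
from typing import Dict

@dataclass
class PipelineStage:
    name: str
    agent_config: AgentConfig  # AgentConfig is assumed to be defined elsewhere
    input_schema: Dict
    output_schema: Dict
    retry_on_failure: bool = True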
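
To show how `retry_on_failure` could be honored, here is a minimal runner sketch. It uses a simplified, hypothetical `Stage` with an async `handler` in place of `agent_config`, and a hypothetical `max_attempts` parameter; schema validation is omitted.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class Stage:
    # Simplified stand-in for PipelineStage: `handler` replaces agent_config.
    name: str
    handler: Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]
    retry_on_failure: bool = True


async def run_pipeline(stages: List[Stage], payload: Dict[str, Any],
                       max_attempts: int = 3) -> Dict[str, Any]:
    """Pass the payload through each stage in order, retrying where allowed."""
    for stage in stages:
        attempts = max_attempts if stage.retry_on_failure else 1
        for attempt in range(1, attempts + 1):
            try:
                payload = await stage.handler(payload)
                break  # stage succeeded; hand off to the next one
            except Exception as exc:
                if attempt == attempts:
                    raise RuntimeError(
                        f"Stage {stage.name!r} failed after {attempt} attempt(s)"
                    ) from exc
    return payload
```

Each stage receives the previous stage's output, which is the linear handoff the diagram describes. A real runner would likely add backoff between attempts.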

Hybrid Topology

class HybridOrchestrator:
    # Maps a task type to the topology that handles it
    routing_rules = {
        "research": "hierarchical",
        "creative": "distributed",
        "workflow": "pipeline"
    }

    async def execute(self, task, task_type=None):
        # Classify the task only when the caller did not supply a type
        if not task_type:
            task_type = await self._classify_task(task)

        topology = self.routing_rules.get(task_type)

        if topology == "hierarchical":
            return await self.hierarchical.execute_task(task)
        elif topology == "distributed":
            return await self.distributed.debate(task)
        elif topology == "pipeline":
            return await self.pipeline.process({"task": task})

        # Fail loudly instead of silently returning None for unknown types
        raise ValueError(f"No topology configured for task type: {task_type!r}")
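
The classification step is not specified above. As a rough placeholder, a keyword-based classifier could look like the following sketch; the keyword lists, the `default` fallback, and the function name are all hypothetical, and a real system would more likely delegate this to a model call.

```python
from typing import Dict, Tuple

# Hypothetical keyword lists, keyed by the task types used in routing_rules.
KEYWORDS: Dict[str, Tuple[str, ...]] = {
    "research": ("research", "investigate", "compare"),
    "creative": ("brainstorm", "ideas", "name"),
    "workflow": ("process", "convert", "extract"),
}


def classify_task(task: str, default: str = "research") -> str:
    """Pick the task type whose keywords appear most often in the task text."""
    words = [w.strip(".,!?") for w in task.lower().split()]
    scores = {task_type: sum(w in keywords for w in words)
              for task_type, keywords in KEYWORDS.items()}
    best = max(scores, key=scores.get)
    # Fall back to the default when no keyword matched at all.
    return best if scores[best] > 0 else default
```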

Coordination Mechanisms

Message Passing

class MessageBus:
    # Interface only; method bodies are left to the implementation
    async def send(self, to, message): ...
    async def broadcast(self, topic, message): ...
    async def receive(self, agent_id, timeout): ...
    def subscribe(self, agent_id, topic): ...
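
A minimal in-process sketch of this interface, assuming one `asyncio.Queue` inbox per agent and that `receive` returns `None` on timeout (both assumptions; the interface above does not specify them):

```python
import asyncio
from typing import Any, Dict, Optional, Set


class InMemoryMessageBus:
    def __init__(self) -> None:
        self._inboxes: Dict[str, asyncio.Queue] = {}
        self._subscribers: Dict[str, Set[str]] = {}

    def _inbox(self, agent_id: str) -> asyncio.Queue:
        # Create the inbox lazily the first time an agent is addressed.
        if agent_id not in self._inboxes:
            self._inboxes[agent_id] = asyncio.Queue()
        return self._inboxes[agent_id]

    def subscribe(self, agent_id: str, topic: str) -> None:
        self._subscribers.setdefault(topic, set()).add(agent_id)

    async def send(self, to: str, message: Any) -> None:
        await self._inbox(to).put(message)

    async def broadcast(self, topic: str, message: Any) -> None:
        # Deliver a copy of the message to every subscriber of the topic.
        for agent_id in self._subscribers.get(topic, ()):
            await self._inbox(agent_id).put(message)

    async def receive(self, agent_id: str, timeout: float) -> Optional[Any]:
        try:
            return await asyncio.wait_for(self._inbox(agent_id).get(), timeout)
        except asyncio.TimeoutError:
            return None  # nothing arrived within the timeout
```

This only works within a single event loop. Agents running in separate processes would need a real broker behind the same interface.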

Shared Memory

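class SharedMemory:
    # Interface only; method bodies are left to the implementation
    async def get(self, key): ...
    async def set(self, key, value) -> int: ...  # returns the new version
    async def compare_and_set(self, key, expected_version, value): ...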
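
The version returned by `set` is what makes `compare_and_set` useful: an agent writes only if nobody else has written since it last read. A minimal in-memory sketch, assuming `get` returns a `(value, version)` pair and that unset keys have version 0 (assumptions not stated in the interface):

```python
import asyncio
from typing import Any, Dict, Tuple


class InMemorySharedMemory:
    def __init__(self) -> None:
        # key -> (value, version); no method awaits between read and write,
        # so each operation is atomic with respect to the event loop.
        self._data: Dict[str, Tuple[Any, int]] = {}

    async def get(self, key: str) -> Tuple[Any, int]:
        return self._data.get(key, (None, 0))

    async def set(self, key: str, value: Any) -> int:
        _, version = self._data.get(key, (None, 0))
        self._data[key] = (value, version + 1)
        return version + 1

    async def compare_and_set(self, key: str, expected_version: int, value: Any) -> bool:
        _, version = self._data.get(key, (None, 0))
        if version != expected_version:
            return False  # someone else wrote first; caller should re-read
        self._data[key] = (value, version + 1)
        return True
```

An agent that gets `False` back would typically re-read the key, merge its change, and try again.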

Consensus Protocol

class ConsensusProtocol:
    async def propose(self, proposal, collect_votes):
        # Ask each agent in turn for its vote on the proposal
        votes = {}
        for agent_id in self.agents:
            votes[agent_id] = await collect_votes(agent_id, proposal)

        # The proposal passes once approvals reach the quorum
        approvals = sum(1 for v in votes.values() if v.approved)
        return approvals >= self.quorum_size, votes
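
The loop above collects votes one agent at a time. When agents are independent, a variant that gathers votes concurrently is a natural alternative. The sketch below uses `asyncio.gather` for that and a hypothetical `Vote` dataclass exposing the `approved` attribute the protocol relies on; it is a standalone function, not the class above.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List, Tuple


@dataclass
class Vote:
    # Hypothetical vote record; only `approved` is required by the protocol.
    approved: bool
    reason: str = ""


async def propose(agents: List[str], quorum_size: int, proposal: str,
                  collect_vote: Callable[[str, str], Awaitable[Vote]]
                  ) -> Tuple[bool, Dict[str, Vote]]:
    """Collect all votes concurrently and report whether the quorum was met."""
    ballots = await asyncio.gather(*(collect_vote(a, proposal) for a in agents))
    votes = dict(zip(agents, ballots))
    approvals = sum(1 for v in votes.values() if v.approved)
    return approvals >= quorum_size, votes
```

With three agents and `quorum_size=2`, the proposal passes if any two approve.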

Failure Handling

Circuit Breaker

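import time
from collections import defaultdict

class AgentCircuitBreaker:
    # Per-agent states: "closed" | "open" | "half_open"
    def __init__(self, threshold=3, recovery_timeout=30.0):  # illustrative defaults
        self.threshold, self.recovery_timeout = threshold, recovery_timeout
        self.failures = defaultdict(int)
        self.state = defaultdict(lambda: "closed")
        self.last_failure = {}

    def record_failure(self, agent_id):
        self.failures[agent_id] += 1
        self.last_failure[agent_id] = time.monotonic()
        if self.failures[agent_id] >= self.threshold:
            self.state[agent_id] = "open"

    def can_execute(self, agent_id):
        if self.state[agent_id] == "open":
            if time.monotonic() - self.last_failure[agent_id] > self.recovery_timeout:
                self.state[agent_id] = "half_open"  # allow one trial call
                return True
            return False
        return True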
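
The three states follow the usual circuit-breaker cycle, which can be written down as a small transition table. The event names here are hypothetical labels chosen for this sketch:

```python
# (current state, event) -> next state. Event names are illustrative only.
TRANSITIONS = {
    ("closed", "failure_threshold_reached"): "open",
    ("open", "recovery_timeout_elapsed"): "half_open",
    ("half_open", "probe_succeeded"): "closed",
    ("half_open", "probe_failed"): "open",
}


def next_state(state: str, event: str) -> str:
    """Return the next breaker state; unknown combinations leave it unchanged."""
    return TRANSITIONS.get((state, event), state)
```

The half-open state exists so that a single trial call decides whether the agent has recovered, instead of sending full traffic to it at once.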

Graceful Degradation

class GracefulDegradation:
    async def execute_with_degradation(self, task_func, available_agents):
        # Check required agents present
        missing_required = self.required - set(available_agents)
        if missing_required:
            raise RuntimeError(f"Missing: {missing_required}")

        result = await task_func(available_agents)

        # Note missing optional capabilities
        if self.optional - set(available_agents):
            result["degraded"] = True

        return result
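
The same required/optional check can be factored into a standalone helper that also reports which optional agents are missing, which is often more useful to callers than a bare flag. This is a sketch; the function name and the `missing_optional` key are hypothetical.

```python
from typing import Dict, Iterable, Set


def degradation_report(required: Set[str], optional: Set[str],
                       available: Iterable[str]) -> Dict[str, object]:
    """Fail if a required agent is absent; otherwise describe any degradation."""
    present = set(available)
    missing_required = required - present
    if missing_required:
        raise RuntimeError(f"Missing: {sorted(missing_required)}")
    missing_optional = optional - present
    return {
        "degraded": bool(missing_optional),
        "missing_optional": sorted(missing_optional),
    }
```

For example, with `required={"research"}` and `optional={"fact_check"}`, running with only the research agent yields a degraded result, and running without it raises.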

Topology Selection Guide

Scenario                Recommended   Why
----------------------  ------------  -----------------------
Structured research     Hierarchical  Clear decomposition
Creative brainstorming  Distributed   Diverse perspectives
Document processing     Pipeline      Sequential, predictable
Strategic decision      Distributed   Multiple viewpoints
Complex enterprise      Hybrid        Different subtask needs

Token Economics

Topology      Multiplier  Typical Range
------------  ----------  ----------------
Single agent  4x          1K-10K tokens
Hierarchical  10-15x      5K-50K tokens
Distributed   15-20x      10K-100K tokens
Pipeline      5-8x        3K-30K tokens
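
The table does not state what the multiplier is applied to. Assuming it scales a baseline token count for the same task (an assumption, as are the names below), a rough budget estimate could be sketched like this:

```python
from typing import Dict, Tuple

# Multiplier ranges copied from the table; a single value is stored as (x, x).
MULTIPLIERS: Dict[str, Tuple[int, int]] = {
    "single": (4, 4),
    "hierarchical": (10, 15),
    "distributed": (15, 20),
    "pipeline": (5, 8),
}


def estimate_tokens(baseline_tokens: int, topology: str) -> Tuple[int, int]:
    """Return a (low, high) token estimate for a topology.

    Assumes the multiplier applies to `baseline_tokens`, which is a
    hypothetical per-task baseline, not something defined by the table.
    """
    low, high = MULTIPLIERS[topology]
    return baseline_tokens * low, baseline_tokens * high
```

Under that assumption, a 500-token baseline gives 5,000 to 7,500 tokens for a hierarchical run and 7,500 to 10,000 for a distributed one.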

Document maintained by CODITECT Platform Team