ADR-003: Multi-Agent LLM Orchestration Pattern
Date: 2026-01-19
Status: Proposed
Decision Makers: System Architect, ML Engineer
Related: ADR-001 (Vision Model), SDD Section 2.2.3
Context
The synthesis phase must correlate audio transcripts with visual frame analysis to generate unified insights. This requires:
- Topic Identification: Segment transcript into coherent topics
- Key Moment Extraction: Identify important visual content
- Cross-Modal Correlation: Match audio context with visual frames
- Insight Generation: Synthesize findings into structured output
Complexity Factors
- Data Volume: 100-200 frames + 500-2000 transcript segments
- Context Window: Need to process large amounts of multimodal data
- Reasoning Depth: Requires multi-step analysis and synthesis
- Token Economics: Long prompts combining vision and text are expensive
Decision Drivers
- Quality: Need sophisticated reasoning to correlate modalities
- Cost: Single monolithic prompt would exceed 100K tokens
- Modularity: Different agents can specialize in different tasks
- Parallelization: Independent tasks should run concurrently
- Error Isolation: Failure in one stage shouldn't fail entire pipeline
Options Considered
Option 1: Monolithic Single-Agent Approach
Approach: One large LLM call with all data
prompt = f"""
Analyze this video:
- Transcript: {full_transcript} # 50K tokens
- Frame analyses: {all_frames} # 30K tokens
- Generate: topics, insights, timeline
"""
Pros:
- Simple implementation
- Holistic reasoning across all data
- Single API call
Cons:
- Token limit: Easily exceeds 100K context window
- Cost: $0.30+ per video in a single call
- Latency: 30-60s response time
- Error handling: All-or-nothing failure
- No parallelization: Sequential only
Token Analysis:
monolithic_tokens = {
    'transcript': 50000,          # 1 word ≈ 1.3 tokens
    'frame_descriptions': 30000,  # 150 frames × 200 tokens
    'instructions': 2000,
    'total_input': 82000,
    'estimated_output': 10000,
    'total': 92000,
    'cost': 92000 * 0.000003  # $0.28 at the input rate; ~$0.40 if output tokens are priced at $15/M as in the later options
}
Verdict: ❌ Inefficient, expensive, fragile
Option 2: Sequential Pipeline (Chain)
Approach: Sequential stages, each producing input for next
# Stage 1: Topic identification
topics = await identify_topics(transcript)
# Stage 2: Visual analysis (depends on topics)
visual_context = await analyze_visuals(frames, topics)
# Stage 3: Correlation (depends on both)
correlations = await correlate(transcript, frames, topics)
# Stage 4: Synthesis (depends on all)
insights = await synthesize(topics, visual_context, correlations)
Pros:
- Modular: Clear separation of concerns
- Token efficiency: Smaller prompts per stage
- Debuggable: Inspect intermediate outputs
- Incremental: Can checkpoint between stages
Cons:
- Sequential latency: 4 stages × 5s = 20s minimum
- No parallelization: Each stage blocks next
- Error propagation: Early errors cascade through pipeline
- Rigid: Fixed execution order
Token Analysis:
sequential_tokens = {
    'stage_1_input': 50000,   # Full transcript
    'stage_1_output': 2000,   # Topics
    'stage_2_input': 32000,   # Frames + topics
    'stage_2_output': 5000,   # Visual analysis
    'stage_3_input': 10000,   # Correlations
    'stage_3_output': 3000,
    'stage_4_input': 15000,   # Synthesis
    'stage_4_output': 5000,
    'total_input': 107000,
    'total_output': 15000,
    'total_cost': (107000 * 0.000003) + (15000 * 0.000015)  # $0.55
}
Verdict: ⚠️ Better, but sequential bottleneck
Option 3: Parallel Multi-Agent with Orchestrator (Selected)
Approach: Specialized agents run in parallel, an orchestrator coordinates
# LangGraph state machine
class AnalysisState(TypedDict):
    transcript: List[Segment]
    frames: Dict[str, FrameAnalysis]
    topics: List[Topic]            # Written by topic_agent
    key_moments: List[Moment]      # Written by moment_agent
    correlations: List[Corr]       # Written by correlation_agent
    insights: List[Insight]        # Written by synthesis_agent

workflow = StateGraph(AnalysisState)
# Independent agents (can run in parallel)
workflow.add_node("identify_topics", topic_agent)
workflow.add_node("extract_moments", moment_agent)
# Dependent agents
workflow.add_node("correlate", correlation_agent)
workflow.add_node("synthesize", synthesis_agent)
# Parallel execution where possible
workflow.add_edge(START, "identify_topics")
workflow.add_edge(START, "extract_moments")
workflow.add_edge(["identify_topics", "extract_moments"], "correlate")
workflow.add_edge("correlate", "synthesize")
workflow.add_edge("synthesize", END)
Pros:
- Parallelization: Independent agents run concurrently (2x speedup)
- Specialization: Each agent optimized for specific task
- Token efficiency: Smaller, focused prompts
- Error isolation: One agent failure doesn't cascade
- Observability: Track each agent's performance
- Flexibility: Easy to add/remove agents
Cons:
- Complexity: More code to maintain (LangGraph state management)
- Coordination overhead: State management adds complexity
- Multiple API calls: More calls, but each with a smaller prompt
- Learning curve: Team must understand LangGraph
Token Analysis:
parallel_tokens = {
    # Parallel stage
    'topic_agent_input': 50000,   # Full transcript
    'topic_agent_output': 2000,
    'moment_agent_input': 30000,  # Frame analyses
    'moment_agent_output': 3000,
    # Sequential stages
    'correlation_agent_input': 15000,
    'correlation_agent_output': 4000,
    'synthesis_agent_input': 20000,
    'synthesis_agent_output': 5000,
    'total_input': 115000,
    'total_output': 14000,
    'total_cost': (115000 * 0.000003) + (14000 * 0.000015),  # $0.56
    'latency_reduction': '40% vs sequential'  # Parallel execution
}
Verdict: ✅ SELECTED - Best balance of quality, cost, and performance
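To make the selected fan-out/fan-in shape concrete before introducing LangGraph specifics, it can be sketched framework-free with plain asyncio; the agent bodies below are placeholders, not real LLM calls:

```python
import asyncio

# Placeholder agents standing in for the real LLM-backed nodes
async def topic_agent(transcript):
    return ["topic-1", "topic-2"]

async def moment_agent(frames):
    return ["moment-1"]

async def correlation_agent(topics, moments):
    # Cross every topic with every moment (toy correlation)
    return [(t, m) for t in topics for m in moments]

async def synthesis_agent(correlations):
    return {"insights": len(correlations)}

async def run_pipeline(transcript, frames):
    # Fan-out: the two independent agents run concurrently
    topics, moments = await asyncio.gather(
        topic_agent(transcript),
        moment_agent(frames),
    )
    # Fan-in: dependent stages run sequentially on the merged results
    correlations = await correlation_agent(topics, moments)
    return await synthesis_agent(correlations)

result = asyncio.run(run_pipeline("transcript text", {}))
```

LangGraph adds shared-state management, checkpointing, and observability on top of this basic shape, which is why it was chosen over hand-rolled `asyncio.gather`.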
Option 4: Dynamic Agent Routing (Future)
Approach: An intelligent orchestrator decides which agents to invoke
class SmartOrchestrator:
    def route_task(self, state: State) -> List[Agent]:
        # Analyze state and determine required agents
        if state.video_type == 'lecture':
            return [topic_agent, slide_agent, synthesis_agent]
        elif state.video_type == 'interview':
            return [speaker_agent, quote_agent, synthesis_agent]
        return [topic_agent, synthesis_agent]  # generic fallback
Pros:
- Adaptive: Adjusts to video type
- Cost optimization: Only runs necessary agents
- Intelligence: Learns which agents work for which content
Cons:
- High complexity: Requires meta-reasoning
- Unpredictable costs: Variable agent counts
- Harder to test: Non-deterministic behavior
Verdict: 🔮 Future consideration (6-12 months)
Decision
Parallel Multi-Agent Orchestration with LangGraph
Architecture
from langgraph.graph import StateGraph, START, END
from typing import Annotated, Dict, List, Tuple, TypedDict
import operator

class AnalysisState(TypedDict):
    # Inputs
    video_metadata: VideoMetadata
    audio_segments: List[AudioSegment]
    frame_analyses: Dict[str, FrameAnalysis]
    # Outputs (accumulated via operator.add)
    topics: Annotated[List[TopicSegment], operator.add]
    insights: Annotated[List[SynthesizedInsight], operator.add]
    # Control
    current_task: str
    errors: Annotated[List[str], operator.add]

class SynthesisOrchestrator:
    def __init__(self, config: PipelineConfig):
        self.config = config
        self.graph = self._build_graph()

    def _build_graph(self) -> StateGraph:
        workflow = StateGraph(AnalysisState)
        # Agent nodes
        workflow.add_node("identify_topics", self._identify_topics)
        workflow.add_node("extract_key_moments", self._extract_key_moments)
        workflow.add_node("correlate_modalities", self._correlate_modalities)
        workflow.add_node("generate_insights", self._generate_insights)
        # Execution graph: two parallel entry branches from START
        workflow.add_edge(START, "identify_topics")
        workflow.add_edge(START, "extract_key_moments")
        workflow.add_edge("identify_topics", "correlate_modalities")
        workflow.add_edge("extract_key_moments", "correlate_modalities")
        workflow.add_edge("correlate_modalities", "generate_insights")
        workflow.add_edge("generate_insights", END)
        return workflow.compile()

    async def synthesize(self, ...) -> Tuple[Topics, Insights]:
        initial_state = {...}
        final_state = await self.graph.ainvoke(initial_state)
        return final_state["topics"], final_state["insights"]
Agent Responsibilities
Agent 1: Topic Identifier
Input: Full transcript
Output: 5-10 topic segments with timestamps
Specialization: Semantic segmentation, topic modeling
Token Budget: ~52K input, ~2K output
Agent 2: Key Moment Extractor
Input: Frame analyses (presentation frames only)
Output: Significant moments with importance scores
Specialization: Visual content understanding
Token Budget: ~32K input, ~3K output
Agent 3: Multimodal Correlator
Input: Topics + key moments + temporal overlaps
Output: Audio-visual correlations
Specialization: Cross-modal reasoning
Token Budget: ~15K input, ~4K output
Agent 4: Insight Synthesizer
Input: All previous outputs
Output: Final structured insights
Specialization: Synthesis, summarization
Token Budget: ~20K input, ~5K output
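To illustrate the "small, focused prompt" idea, a hypothetical node for Agent 1 might look like the sketch below; the prompt wording, `State` shape, and `call_llm` helper are illustrative assumptions, not the production implementation:

```python
from typing import List, TypedDict

class State(TypedDict):
    transcript: str
    topics: List[dict]

async def call_llm(prompt: str) -> List[dict]:
    # Stand-in for the real model call; returns a canned topic here.
    return [{"title": "Introduction", "start": 0.0, "end": 95.0}]

async def identify_topics(state: State) -> dict:
    # Focused prompt: only the transcript, only the topic task.
    prompt = (
        "Segment this transcript into 5-10 coherent topics.\n"
        "Return JSON: [{title, start, end}]\n\n"
        f"Transcript:\n{state['transcript']}"
    )
    topics = await call_llm(prompt)
    # A node returns only the keys it owns; the orchestrator
    # merges them into the shared state.
    return {"topics": topics}
```

Each of the four agents follows this pattern with its own focused prompt and its own slice of the shared state.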
Cost Comparison
# Per 60-minute video
cost_comparison = {
    'monolithic': {
        'tokens': 92000,
        'cost': 0.28,
        'latency_seconds': 45,
        'quality_score': 7  # Lower due to attention dilution
    },
    'sequential': {
        'tokens': 122000,
        'cost': 0.55,
        'latency_seconds': 20,
        'quality_score': 9
    },
    'parallel_multi_agent': {
        'tokens': 129000,  # Slightly more due to overlap
        'cost': 0.56,
        'latency_seconds': 12,  # 40% faster
        'quality_score': 9.5  # Best due to specialization
    }
}
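The sequential and parallel cost figures follow directly from the token counts and the same illustrative rates used throughout ($3 per million input tokens, $15 per million output tokens):

```python
# Illustrative rates from the per-option token analyses
INPUT_PRICE = 0.000003   # $3 / 1M input tokens
OUTPUT_PRICE = 0.000015  # $15 / 1M output tokens

def video_cost(input_tokens: int, output_tokens: int) -> float:
    # Same arithmetic as the 'total_cost' entries above
    return input_tokens * INPUT_PRICE + output_tokens * OUTPUT_PRICE

sequential = video_cost(107_000, 15_000)  # ≈ $0.55
parallel = video_cost(115_000, 14_000)    # ≈ $0.56
```

The parallel design costs roughly one cent more per video than the sequential one; the decision trades that for the latency reduction.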
Consequences
Positive
- Performance: 40% latency reduction via parallelization
- Quality: Specialized agents perform better on focused tasks
- Maintainability: Clear separation of concerns
- Observability: Can monitor each agent independently
- Error Handling: Isolated failures, graceful degradation
- Flexibility: Easy to add/remove/modify agents
Negative
- Complexity: LangGraph adds learning curve and mental overhead
- State Management: Must carefully manage shared state
- Debugging: Parallel execution harder to debug
- Dependencies: Adds LangGraph as a critical dependency
- Cost: Slightly higher token usage due to some redundancy
Mitigation Strategies
# Error handling with circuit breakers
class ResilientAgent:
    def __init__(self):
        self.circuit_breaker = CircuitBreaker(threshold=3)

    async def execute(self, state):
        try:
            return await self.circuit_breaker.call(self._core_logic, state)
        except CircuitBreakerOpen:
            logger.error("Agent circuit open, returning partial results")
            return state  # No-op, allow pipeline to continue
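`CircuitBreaker` above is a project-level abstraction rather than a specific library; a minimal sketch of the behavior it implies (count consecutive failures, fail fast past a threshold) might be:

```python
class CircuitBreakerOpen(Exception):
    """Raised when the breaker refuses further calls."""

class CircuitBreaker:
    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.failures = 0  # consecutive failures

    async def call(self, fn, *args, **kwargs):
        # Fail fast once the failure threshold is reached
        if self.failures >= self.threshold:
            raise CircuitBreakerOpen("failure threshold reached")
        try:
            result = await fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            raise
        self.failures = 0  # any success resets the breaker
        return result
```

A production breaker would also add a cooldown/half-open state before retrying; that is omitted here for brevity.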
# Checkpointing for long-running workflows
async def synthesis_with_checkpointing(state, checkpoint_store):
    checkpointer = Checkpointer(checkpoint_store)
    async for event in graph.astream(state, checkpointer=checkpointer):
        if event['type'] == 'checkpoint':
            await checkpointer.save(event['state'])
Observability Implementation
from prometheus_client import Histogram, Counter
agent_latency = Histogram(
    'agent_execution_seconds',
    'Time spent in agent execution',
    ['agent_name']
)
agent_errors = Counter(
    'agent_errors_total',
    'Number of agent errors',
    ['agent_name', 'error_type']
)

@agent_latency.labels(agent_name='topic_identifier').time()
async def _identify_topics(self, state):
    try:
        # Agent logic
        pass
    except Exception as e:
        agent_errors.labels(
            agent_name='topic_identifier',
            error_type=type(e).__name__
        ).inc()
        raise
Validation Metrics
Track these metrics to validate the decision:
metrics = {
    'performance': {
        'total_synthesis_time': Histogram(),
        'agent_execution_times': Dict[str, Histogram],
        'parallelization_speedup': Gauge()  # vs sequential
    },
    'quality': {
        'topic_accuracy': float,  # Human evaluation
        'correlation_quality': float,
        'insight_relevance': float
    },
    'cost': {
        'tokens_per_agent': Dict[str, Counter],
        'cost_per_video': Histogram(),
        'cost_vs_sequential': float
    },
    'reliability': {
        'agent_success_rate': Dict[str, float],
        'circuit_breaker_trips': Counter(),
        'partial_completion_rate': float
    }
}
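For instance, the parallelization speedup can be derived from per-agent latencies: the parallel critical path is the slower of the two independent agents plus the dependent stages. The latencies below are assumptions chosen to reproduce the ~40% reduction cited earlier, not measurements:

```python
def parallel_speedup(latencies: dict) -> float:
    # Sequential: every stage runs back-to-back
    sequential = sum(latencies.values())
    # Parallel: independent agents overlap; dependent stages still serialize
    parallel = (max(latencies["identify_topics"], latencies["extract_moments"])
                + latencies["correlate"] + latencies["synthesize"])
    return sequential / parallel

# Illustrative per-agent latencies in seconds (assumed)
latencies = {"identify_topics": 8.0, "extract_moments": 8.0,
             "correlate": 2.0, "synthesize": 2.0}
speedup = parallel_speedup(latencies)  # 20s vs 12s → ~1.67x
```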
Alternative Approaches
LangChain LCEL (LangChain Expression Language)
# Alternative implementation without LangGraph
from langchain.schema.runnable import RunnableParallel, RunnableLambda
analysis_chain = RunnableParallel({
    'topics': identify_topics_chain,
    'moments': extract_moments_chain
}) | RunnableLambda(lambda x: correlate(x['topics'], x['moments'])) | synthesize_chain
Why not selected: Less flexible for complex state management, harder to add checkpointing
CrewAI Framework
# Alternative: CrewAI for agent orchestration
from crewai import Agent, Task, Crew
topic_agent = Agent(role='Topic Analyzer', ...)
moment_agent = Agent(role='Moment Extractor', ...)
crew = Crew(agents=[topic_agent, moment_agent], tasks=[...])
result = crew.kickoff()
Why not selected: Less control over execution order, newer/less proven
Review Schedule
- 1 month: Evaluate parallelization gains and quality metrics
- 3 months: Assess agent performance, consider specialization improvements
- 6 months: Review LangGraph vs alternatives, consider dynamic routing
References
- LangGraph Documentation
- Multi-Agent Systems in LLMs
- "ReAct: Synergizing Reasoning and Acting in Language Models"
- Internal: Agent Performance Analysis Dashboard