ADR-029: Map-Reduce Agent Orchestrator
Status
PROPOSED
Date
2026-01-15
Context
Sequential document processing (as demonstrated in the "unlimited memory" video) has O(n) time complexity: for a corpus of 1,000 documents at 2 minutes each, sequential processing requires over 33 hours.
The map-reduce pattern enables parallel processing: with 50 mapper agents running simultaneously, the same corpus processes in ~40 minutes (plus reduce overhead).
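This arithmetic can be sanity-checked with a small timing model (a sketch only: function names are illustrative, and it assumes perfect parallelism with no coordination overhead):

```python
import math

def sequential_minutes(num_docs: int, minutes_per_doc: float = 2.0) -> float:
    """Wall-clock time when documents are processed one at a time."""
    return num_docs * minutes_per_doc

def parallel_minutes(num_docs: int, workers: int, minutes_per_doc: float = 2.0) -> float:
    """Wall-clock time with `workers` mappers running simultaneously:
    documents complete in ceil(num_docs / workers) waves."""
    waves = math.ceil(num_docs / workers)
    return waves * minutes_per_doc

# 1,000 documents sequentially: 2,000 minutes (~33 hours).
# The same corpus with 50 parallel mappers: 40 minutes, before reduce overhead.
```

At 50 mappers the map phase collapses from 2,000 minutes to 40; the gap to the one-hour P0 target is the budget left for shuffle, reduction, and synthesis.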
Requirements
| Requirement | Priority | Rationale |
|---|---|---|
| Process 1,000 docs in <1 hour | P0 | Competitive necessity |
| Automatic agent scaling | P1 | Resource efficiency |
| Partial failure recovery | P0 | Reliability |
| Token budget enforcement | P0 | Cost control |
| Deterministic aggregation | P1 | Reproducibility |
Decision
Implement a Map-Reduce Agent Orchestrator with the following architecture:
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ MAP-REDUCE ORCHESTRATOR │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ COORDINATOR │ │
│ │ - Job scheduling │ │
│ │ - Agent pool management │ │
│ │ - Token budget allocation │ │
│ │ - Checkpoint coordination │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ MAPPER POOL │ │ MAPPER POOL │ │ MAPPER POOL │ │
│ │ Agent 1 │ │ Agent 2 │ │ Agent N │ │
│ │ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │ │
│ │ │ Doc 1 │ │ │ │ Doc 2 │ │ │ │ Doc N │ │ │
│ │ │ Doc N+1 │ │ │ │ Doc N+2 │ │ │ │ Doc 2N │ │ │
│ │ │ ... │ │ │ │ ... │ │ │ │ ... │ │ │
│ │ └───────────┘ │ │ └───────────┘ │ │ └───────────┘ │ │
│ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ SHUFFLE/SORT │ │
│ │ - Group mapper outputs by key │ │
│ │ - Deduplication │ │
│ │ - Conflict resolution │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ REDUCER POOL │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Reducer 1 │ │ Reducer 2 │ │ Reducer M │ │ │
│ │ │ (Category A)│ │ (Category B)│ │ (Category M)│ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ FINAL SYNTHESIS │ │
│ │ - Combine reducer outputs │ │
│ │ - Generate final report │ │
│ │ - Create audit trail │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Core Components
1. Coordinator
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Literal
from uuid import UUID

@dataclass
class MapReduceJob:
    """Definition of a map-reduce job"""
    job_id: UUID
    corpus_id: str
    # Map configuration
    mapper_prompt: str
    mapper_schema: OutputSchema
    # Reduce configuration
    reducer_prompt: str
    reducer_schema: OutputSchema
    reduction_strategy: Literal["hierarchical", "flat", "category"]
    # Budget
    total_token_budget: int
    mapper_token_budget: int
    reducer_token_budget: int
    # Metadata
    created_at: datetime
    status: JobStatus
    # Fields with defaults must come last in a dataclass
    docs_per_mapper: int = 10
    max_parallel_mappers: int = 50
    checkpoint_interval: int = 10  # Checkpoint every N documents
```
```python
import asyncio
from collections import defaultdict
from itertools import cycle
from typing import Any, Dict, List
from uuid import UUID

class MapReduceCoordinator:
    """Orchestrates map-reduce job execution"""

    def __init__(
        self,
        agent_pool: AgentPool,
        checkpoint_store: CheckpointStore,
        event_bus: EventBus
    ):
        self.agent_pool = agent_pool
        self.checkpoint_store = checkpoint_store
        self.event_bus = event_bus
        self.active_jobs: Dict[UUID, MapReduceJob] = {}

    async def submit_job(self, job: MapReduceJob) -> JobHandle:
        """Submit a new map-reduce job"""
        # Validate budget
        estimated_tokens = self._estimate_tokens(job)
        if estimated_tokens > job.total_token_budget:
            raise TokenBudgetExceeded(
                f"Estimated {estimated_tokens} tokens exceeds budget {job.total_token_budget}"
            )
        # Create checkpoint
        checkpoint = await self.checkpoint_store.create(
            job_id=job.job_id,
            state=JobState.INITIALIZED
        )
        # Schedule execution
        self.active_jobs[job.job_id] = job
        asyncio.create_task(self._execute_job(job))
        return JobHandle(job_id=job.job_id, checkpoint_id=checkpoint.id)

    async def _execute_job(self, job: MapReduceJob):
        """Execute the map-reduce pipeline"""
        try:
            # Phase 1: MAP
            await self._update_status(job, JobStatus.MAPPING)
            map_results = await self._execute_map_phase(job)
            # Phase 2: SHUFFLE
            await self._update_status(job, JobStatus.SHUFFLING)
            shuffled = await self._execute_shuffle_phase(job, map_results)
            # Phase 3: REDUCE
            await self._update_status(job, JobStatus.REDUCING)
            reduce_results = await self._execute_reduce_phase(job, shuffled)
            # Phase 4: SYNTHESIZE
            await self._update_status(job, JobStatus.SYNTHESIZING)
            final_output = await self._execute_synthesis(job, reduce_results)
            # Complete
            await self._complete_job(job, final_output)
        except Exception as e:
            await self._handle_failure(job, e)

    async def _execute_map_phase(
        self,
        job: MapReduceJob
    ) -> List[MapperOutput]:
        """Execute parallel mappers"""
        # Get documents
        documents = await self._get_documents(job.corpus_id)
        # Partition into batches
        batches = self._partition(documents, job.docs_per_mapper)
        # Acquire mapper agents
        mappers = await self.agent_pool.acquire(
            role=AgentRole.MAPPER,
            count=min(len(batches), job.max_parallel_mappers)
        )
        # Execute in parallel with semaphore for rate limiting
        semaphore = asyncio.Semaphore(job.max_parallel_mappers)

        async def map_batch(batch: List[Document], mapper: Agent) -> MapperOutput:
            async with semaphore:
                return await self._run_mapper(job, batch, mapper)

        # Create tasks, round-robining batches over the acquired mappers
        tasks = []
        for i, batch in enumerate(batches):
            mapper = mappers[i % len(mappers)]
            tasks.append(map_batch(batch, mapper))
        # Execute with checkpoint updates
        results = []
        for i, coro in enumerate(asyncio.as_completed(tasks)):
            result = await coro
            results.append(result)
            # Checkpoint every N completions
            if (i + 1) % job.checkpoint_interval == 0:
                await self._checkpoint_progress(job, results)
        # Release mappers
        await self.agent_pool.release(mappers)
        return results

    async def _run_mapper(
        self,
        job: MapReduceJob,
        batch: List[Document],
        mapper: Agent
    ) -> MapperOutput:
        """Execute single mapper agent"""
        # Build prompt
        prompt = self._build_mapper_prompt(job, batch)
        # Execute with token tracking
        start_tokens = mapper.token_usage
        response = await mapper.execute(
            prompt=prompt,
            schema=job.mapper_schema,
            max_tokens=job.mapper_token_budget
        )
        actual_tokens = mapper.token_usage - start_tokens
        return MapperOutput(
            batch_id=batch[0].id,
            documents_processed=len(batch),
            extractions=response.parsed,
            tokens_used=actual_tokens,
            errors=response.errors
        )

    async def _execute_shuffle_phase(
        self,
        job: MapReduceJob,
        map_results: List[MapperOutput]
    ) -> Dict[str, List[Any]]:
        """Group and sort mapper outputs"""
        grouped: Dict[str, List[Any]] = defaultdict(list)
        for result in map_results:
            for extraction in result.extractions:
                # Group by category/key
                key = self._get_shuffle_key(extraction, job)
                grouped[key].append(extraction)
        # Deduplicate within groups
        for key in grouped:
            grouped[key] = self._deduplicate(grouped[key])
        return dict(grouped)

    async def _execute_reduce_phase(
        self,
        job: MapReduceJob,
        shuffled: Dict[str, List[Any]]
    ) -> List[ReducerOutput]:
        """Execute reducers on grouped data"""
        if job.reduction_strategy == "hierarchical":
            return await self._hierarchical_reduce(job, shuffled)
        elif job.reduction_strategy == "flat":
            return await self._flat_reduce(job, shuffled)
        else:  # category
            return await self._category_reduce(job, shuffled)

    async def _hierarchical_reduce(
        self,
        job: MapReduceJob,
        shuffled: Dict[str, List[Any]]
    ) -> List[ReducerOutput]:
        """Reduce in multiple levels until single output"""
        current_level = list(shuffled.values())
        level_num = 0
        while len(current_level) > 1:
            # Group into batches for this level
            batches = self._partition(current_level, size=5)
            # Reduce each batch
            reducers = await self.agent_pool.acquire(
                role=AgentRole.REDUCER,
                count=min(len(batches), 10)
            )
            next_level = []
            # itertools.cycle round-robins the reducer pool across batches
            for batch, reducer in zip(batches, cycle(reducers)):
                reduced = await self._run_reducer(job, batch, reducer, level_num)
                next_level.append(reduced)
            await self.agent_pool.release(reducers)
            current_level = next_level
            level_num += 1
        return current_level
```
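The depth of the hierarchical reduction is bounded: each level collapses groups by the `size=5` fan-in, so the level count grows logarithmically in the number of shuffle groups. A standalone sketch (function and parameter names here are illustrative, not part of the orchestrator API):

```python
import math

def reduction_levels(num_groups: int, fan_in: int = 5) -> int:
    """Levels needed to collapse num_groups intermediate outputs to one,
    merging fan_in outputs per reducer at each level."""
    levels = 0
    while num_groups > 1:
        num_groups = math.ceil(num_groups / fan_in)
        levels += 1
    return levels

# 100 shuffle groups with fan-in 5 collapse as 100 -> 20 -> 4 -> 1: 3 levels.
```

This is why the performance projections later in this ADR assume a 3-level reduction even for thousand-document corpora.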
2. Mapper Agent
```python
from typing import List

class MapperAgent:
    """Agent specialized for document mapping"""

    def __init__(
        self,
        agent_id: str,
        llm: LLM,
        tools: List[Tool]
    ):
        self.agent_id = agent_id
        self.llm = llm
        self.tools = tools
        self.token_usage = 0
        self.circuit_breaker = CircuitBreaker()

    async def map_document(
        self,
        document: ProcessedDocument,
        extraction_schema: ExtractionSchema,
        prompt_template: str
    ) -> MappedOutput:
        """Extract structured data from document"""
        prompt = prompt_template.format(
            document_content=document.content,
            entities=document.entities,
            schema=extraction_schema.to_json_schema()
        )
        async with self.circuit_breaker:
            response = await self.llm.generate(
                prompt=prompt,
                response_format={"type": "json_object"},
                max_tokens=extraction_schema.max_output_tokens
            )
        self.token_usage += response.usage.total_tokens
        # Parse and validate
        try:
            parsed = extraction_schema.parse(response.content)
            return MappedOutput(
                document_id=document.id,
                extractions=parsed,
                confidence=self._calculate_confidence(parsed),
                source_spans=self._extract_source_spans(parsed, document)
            )
        except ValidationError as e:
            return MappedOutput(
                document_id=document.id,
                extractions=[],
                confidence=0.0,
                errors=[str(e)]
            )
```
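The error-handling branch in `map_document` implements a "parse or degrade" policy: a malformed mapper response yields an empty, zero-confidence output instead of an exception, so one bad document never aborts a batch. A self-contained sketch of that policy using only the standard library (the list-of-objects schema and the trimmed `MappedOutput` shape are simplified stand-ins):

```python
import json
from dataclasses import dataclass, field
from typing import Any, List

@dataclass
class MappedOutput:
    document_id: str
    extractions: List[Any]
    confidence: float
    errors: List[str] = field(default_factory=list)

def parse_or_degrade(document_id: str, raw_response: str) -> MappedOutput:
    """Validate the LLM's JSON; on failure return an empty, zero-confidence
    output instead of raising, so downstream phases can keep going."""
    try:
        parsed = json.loads(raw_response)
        if not isinstance(parsed, list):
            raise ValueError("expected a JSON array of extractions")
        return MappedOutput(document_id, parsed, confidence=1.0)
    except (json.JSONDecodeError, ValueError) as e:
        return MappedOutput(document_id, [], confidence=0.0, errors=[str(e)])
```

The degraded outputs still carry their `document_id` and error strings, so the shuffle phase can count and report them rather than silently drop documents.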
3. Reducer Agent
```python
import json
from collections import defaultdict
from typing import List

class ReducerAgent:
    """Agent specialized for aggregating mapper outputs"""

    async def reduce(
        self,
        inputs: List[MappedOutput],
        aggregation_schema: AggregationSchema,
        prompt_template: str
    ) -> ReducedOutput:
        """Aggregate multiple mapper outputs"""
        # Prepare input summary
        input_summary = self._prepare_input_summary(inputs)
        prompt = prompt_template.format(
            inputs=input_summary,
            total_documents=len(inputs),
            schema=aggregation_schema.to_json_schema()
        )
        response = await self.llm.generate(
            prompt=prompt,
            response_format={"type": "json_object"},
            max_tokens=aggregation_schema.max_output_tokens
        )
        return ReducedOutput(
            aggregated=aggregation_schema.parse(response.content),
            source_documents=[i.document_id for i in inputs],
            tokens_used=response.usage.total_tokens
        )

    def _prepare_input_summary(
        self,
        inputs: List[MappedOutput]
    ) -> str:
        """Prepare inputs for reducer prompt"""
        # Deduplicate and merge
        merged = defaultdict(list)
        for inp in inputs:
            for extraction in inp.extractions:
                key = extraction.get("category", "default")
                merged[key].append(extraction)
        # Format for prompt
        sections = []
        for category, items in merged.items():
            sections.append(f"## {category}\n")
            for item in items[:50]:  # Limit per category
                sections.append(f"- {json.dumps(item)}")
        return "\n".join(sections)
```
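The grouping behaviour of `_prepare_input_summary` can be exercised on plain dicts. This minimal sketch mirrors the category grouping and the 50-item per-category cap outside the agent class (the function name is illustrative):

```python
import json
from collections import defaultdict

def summarize_extractions(extractions, per_category_limit=50):
    """Group extractions by 'category' and render a markdown-style summary,
    capping each category so the reducer prompt stays bounded."""
    merged = defaultdict(list)
    for item in extractions:
        merged[item.get("category", "default")].append(item)
    sections = []
    for category, items in merged.items():
        sections.append(f"## {category}\n")
        for item in items[:per_category_limit]:
            sections.append(f"- {json.dumps(item)}")
    return "\n".join(sections)
```

The cap is what keeps reducer input size independent of corpus size; raising it trades recall of individual extractions against reducer token budget.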
Token Budget Management
```python
@dataclass
class MapReduceTokenBudget:
    """Token budget allocation for map-reduce job"""
    total_budget: int
    # Allocation percentages
    map_allocation: float = 0.6        # 60% for mapping
    shuffle_allocation: float = 0.05   # 5% for shuffle (minimal)
    reduce_allocation: float = 0.25    # 25% for reduction
    synthesis_allocation: float = 0.1  # 10% for final synthesis

    @property
    def map_budget(self) -> int:
        return int(self.total_budget * self.map_allocation)

    @property
    def reduce_budget(self) -> int:
        return int(self.total_budget * self.reduce_allocation)

    def per_mapper_budget(self, num_mappers: int) -> int:
        return self.map_budget // num_mappers

    def per_reducer_budget(self, num_reducers: int) -> int:
        return self.reduce_budget // num_reducers

    def validate_job(self, job: MapReduceJob, num_documents: int) -> ValidationResult:
        """Validate that the job fits within budget. The document count is
        passed in because MapReduceJob holds only a corpus_id, not documents."""
        estimated_map = num_documents * 500     # Est. 500 tokens/doc to map
        estimated_reduce = num_documents * 100  # Est. 100 tokens/doc to reduce
        total_estimated = estimated_map + estimated_reduce
        if total_estimated > self.total_budget:
            return ValidationResult(
                valid=False,
                message=f"Estimated {total_estimated} exceeds budget {self.total_budget}",
                suggestions=[
                    "Increase preprocessing aggressiveness",
                    "Reduce docs_per_mapper",
                    "Increase total_budget"
                ]
            )
        return ValidationResult(valid=True)
```
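Under the default split, a 1,000,000-token job allocates 600k tokens to mapping and 250k to reduction. A trimmed, runnable stand-in for the dataclass above (`BudgetSketch` is an illustrative name, keeping only the fields needed for the arithmetic):

```python
from dataclasses import dataclass

@dataclass
class BudgetSketch:
    """Minimal stand-in for MapReduceTokenBudget with the default split."""
    total_budget: int
    map_allocation: float = 0.6
    reduce_allocation: float = 0.25

    @property
    def map_budget(self) -> int:
        return int(self.total_budget * self.map_allocation)

    @property
    def reduce_budget(self) -> int:
        return int(self.total_budget * self.reduce_allocation)

    def per_mapper_budget(self, num_mappers: int) -> int:
        return self.map_budget // num_mappers

budget = BudgetSketch(total_budget=1_000_000)
# 600,000 tokens for mapping; 250,000 for reduction;
# with 50 mappers, each mapper gets 12,000 tokens.
```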
Failure Recovery
```python
@dataclass
class MapReduceCheckpoint:
    """Checkpoint state for recovery"""
    job_id: UUID
    phase: Literal["map", "shuffle", "reduce", "synthesis"]
    # Map phase state
    completed_batches: List[str]
    pending_batches: List[str]
    map_results: List[MapperOutput]
    # Reduce phase state
    reduce_level: int
    reduce_results: List[ReducerOutput]
    # Metrics
    tokens_used: int
    errors: List[str]
    timestamp: datetime


class MapReduceRecovery:
    """Recovery manager for failed jobs"""

    def __init__(self, checkpoint_store: CheckpointStore):
        self.checkpoint_store = checkpoint_store

    async def recover_job(
        self,
        job_id: UUID,
        coordinator: MapReduceCoordinator
    ) -> JobHandle:
        """Recover a failed job from checkpoint"""
        # Load latest checkpoint
        checkpoint = await self.checkpoint_store.get_latest(job_id)
        if checkpoint is None:
            raise RecoveryError(f"No checkpoint found for job {job_id}")
        # Determine recovery strategy
        if checkpoint.phase == "map":
            # Resume mapping from pending batches
            return await self._recover_map_phase(checkpoint, coordinator)
        elif checkpoint.phase == "reduce":
            # Resume reduction from current level
            return await self._recover_reduce_phase(checkpoint, coordinator)
        else:
            # Re-run from last stable checkpoint
            return await self._recover_from_stable(checkpoint, coordinator)
```
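Map-phase recovery reduces to re-dispatching only what the last checkpoint did not record as completed. A minimal sketch of that set difference (the batch IDs are hypothetical; order is preserved for deterministic re-runs):

```python
def pending_after_recovery(all_batches, completed_batches):
    """Batches still to map after a crash: everything the checkpoint did
    not mark completed, kept in original submission order."""
    done = set(completed_batches)
    return [b for b in all_batches if b not in done]

# A crash with batches b0..b4 and only b0, b2 checkpointed
# re-dispatches b1, b3, b4 -- completed work is never repeated.
```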
Commands and Workflows
```yaml
map_reduce_commands:
  - command: "@corpus:analyze"
    description: Run map-reduce analysis on corpus
    handler: MapReduceCoordinator.submit_job
    parameters:
      - corpus_id: string (required)
      - analysis_type: enum[extract, summarize, compare, custom]
      - mapper_prompt: string (optional, has defaults per type)
      - reducer_prompt: string (optional, has defaults per type)
      - output_schema: string (optional)
      - max_parallel: int (default: 50)
      - token_budget: int (default: 1000000)

  - command: "@corpus:status"
    description: Check job status
    handler: MapReduceCoordinator.get_status
    parameters:
      - job_id: string (required)

  - command: "@corpus:cancel"
    description: Cancel running job
    handler: MapReduceCoordinator.cancel_job
    parameters:
      - job_id: string (required)
      - checkpoint: bool (default: true)

  - command: "@corpus:recover"
    description: Recover failed job
    handler: MapReduceRecovery.recover_job
    parameters:
      - job_id: string (required)
```
Consequences
Positive
- 10-50x faster than sequential processing
- Automatic scaling based on corpus size
- Fault tolerance through checkpointing
- Token budget enforcement prevents runaway costs
- Deterministic aggregation for reproducibility
Negative
- Coordination overhead (~5-10% of total time)
- Agent pool management complexity
- Partial results on failure may need manual review
Performance Projections
| Corpus Size | Sequential Time | Map-Reduce Time | Speedup |
|---|---|---|---|
| 100 docs | 3.3 hours | 12 minutes | 16x |
| 500 docs | 16.6 hours | 35 minutes | 28x |
| 1,000 docs | 33 hours | 55 minutes | 36x |
| 5,000 docs | 166 hours | 3.5 hours | 47x |
Assumes 2 min/doc sequential, 50 parallel mappers, 3-level reduction
References
- ADR-027: Corpus Processing Subsystem Architecture
- ADR-015: Multi-Agent Orchestration Framework
- LLM×MapReduce Paper
- Google Cloud Map-Reduce for LLMs
Approval
| Role | Name | Date | Decision |
|---|---|---|---|
| CTO | Hal Casteel | | |
| Platform Lead | | | |