ADR-028: Map-Reduce Agent Orchestration
Status
PROPOSED
Date
2026-01-15
Context
Sequential document processing creates unacceptable latency for enterprise workloads. Processing 50 documents sequentially with Claude Code's approach requires ~30-60 minutes. Enterprise customers expect batch analysis of 1000+ documents to complete in minutes, not hours.
The Map-Reduce pattern, proven in distributed systems (Hadoop, Spark), can be adapted for LLM agent orchestration:
- Map Phase: Independent agents process documents in parallel
- Reduce Phase: Synthesis agent aggregates partial results
Constraints
| Constraint | Value | Source |
|---|---|---|
| Max concurrent API calls | 50 | Anthropic rate limits |
| Token budget per agent | 100K | Context window |
| Token multiplier | 15x | Multi-agent overhead |
| Checkpoint frequency | 10K tokens | ADR-004 |
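These constraints jointly bound a job's shape: the per-unit token budget is capped by the 100K context window, and the rate limit slices N parallel calls into sequential "waves". A minimal sketch of that arithmetic (`plan_job` and its constants are illustrative, not Coditect APIs):

```python
# Illustrative only: plan_job mirrors the constraints table above; it is not a Coditect API.
MAX_CONCURRENT = 50             # Anthropic rate limit on concurrent API calls
MAX_TOKENS_PER_AGENT = 100_000  # per-agent context window budget

def plan_job(n_docs: int, total_budget: int) -> tuple:
    """Per-unit token budget and number of sequential 'waves' of parallel calls."""
    per_unit = min(total_budget // n_docs, MAX_TOKENS_PER_AGENT)
    waves = -(-n_docs // MAX_CONCURRENT)  # ceiling division
    return per_unit, waves

# 1000 documents under a 5M-token budget:
print(plan_job(1000, 5_000_000))  # (5000, 20): 5K tokens per unit, 20 waves of 50 calls
```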
Decision
Implement a Map-Reduce Agent Orchestrator as a core Coditect capability.
Agent Roles
map_reduce_agents:
  orchestrator:
    role: MapReduceOrchestrator
    responsibilities:
      - Task decomposition into map units
      - Agent pool management
      - Token budget allocation
      - Checkpoint coordination
      - Result aggregation triggering

  map_agent:
    role: MapWorker
    responsibilities:
      - Process single document/chunk
      - Extract structured data per schema
      - Emit partial results
      - Report token usage
    capabilities:
      - Stateless (no cross-document state)
      - Idempotent (safe to retry)
      - Schema-driven extraction

  reduce_agent:
    role: ReduceSynthesizer
    responsibilities:
      - Aggregate map outputs
      - Deduplicate findings
      - Resolve conflicts
      - Generate unified output
    capabilities:
      - Handles partial results (streaming aggregation)
      - Conflict resolution strategies
      - Citation preservation
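For simple schema fields, the reduce agent's deduplicate/resolve-conflicts duties can also be done deterministically in code rather than delegated to the model. A hedged sketch using majority voting (`merge_field` is a hypothetical helper, not part of the Coditect codebase):

```python
# Hypothetical sketch: deterministic dedup + majority-vote conflict resolution
# for one schema field, with citation preservation.
from collections import Counter

def merge_field(values_with_sources):
    """values_with_sources: list of (value, unit_id) pairs from map results."""
    counts = Counter(v for v, _ in values_with_sources)
    winner, votes = counts.most_common(1)[0]
    conflicts = [v for v in counts if v != winner]          # losing variants, for the audit trail
    citations = [u for v, u in values_with_sources if v == winner]
    return {"value": winner, "votes": votes, "conflicts": conflicts, "citations": citations}

merged = merge_field([("ACME Corp", "doc1"), ("ACME Corp", "doc2"), ("Acme Inc", "doc3")])
# merged["value"] == "ACME Corp", backed by citations ["doc1", "doc2"]
```

Ambiguous or near-duplicate values (fuzzy matches, paraphrases) would still go to the ReduceSynthesizer model; this only covers exact-match fields.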
Orchestration Flow
INPUT: List[Document], ExtractionSchema, TokenBudget
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│ PHASE 1: DECOMPOSITION                                   │
│                                                          │
│ documents.chunk(max_tokens=50K) → map_units[]            │
│ allocate_budget(map_units, total_budget) → unit_budgets[]│
│ create_task_graph(map_units, dependencies) → DAG         │
└──────────────────────────────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│ PHASE 2: MAP EXECUTION (PARALLEL)                        │
│                                                          │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐     ┌─────────┐      │
│ │MapAgent │ │MapAgent │ │MapAgent │ ... │MapAgent │      │
│ │   #1    │ │   #2    │ │   #3    │     │   #N    │      │
│ └────┬────┘ └────┬────┘ └────┬────┘     └────┬────┘      │
│      ▼           ▼           ▼               ▼           │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐     ┌─────────┐      │
│ │ Result  │ │ Result  │ │ Result  │     │ Result  │      │
│ │   #1    │ │   #2    │ │   #3    │     │   #N    │      │
│ └─────────┘ └─────────┘ └─────────┘     └─────────┘      │
│                                                          │
│ Concurrency: min(N, rate_limit, budget_permits)          │
│ Retry: exponential backoff with circuit breaker          │
└──────────────────────────────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│ PHASE 3: REDUCE EXECUTION                                │
│                                                          │
│ if len(results) <= context_window:                       │
│     single_reduce(results) → final_output                │
│ else:                                                    │
│     hierarchical_reduce(results, levels=ceil(log(N)))    │
│         → intermediate[] → final_output                  │
└──────────────────────────────────────────────────────────┘
                             │
                             ▼
OUTPUT: AggregatedResult, TokenMetrics, AuditTrail
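The hierarchical path in Phase 3 is worth quantifying: with reduce batch size b, N map results need roughly ceil(log_b(N)) reduce levels in total, including the final pass. A small sketch of the call count (`reduce_plan` is illustrative only, not Coditect code):

```python
# Illustrative: count hierarchical-reduce levels and total reduce calls for N map results.
import math

def reduce_plan(n_results: int, batch_size: int = 10):
    levels = 0
    calls = 0
    while n_results > batch_size:
        batches = math.ceil(n_results / batch_size)  # parallel batch reduces at this level
        calls += batches
        n_results = batches                          # each batch yields one intermediate result
        levels += 1
    return levels + 1, calls + 1                     # plus the final single reduce

# 1000 map results, batch size 10: 100 batch reduces, then 10, then 1 final
print(reduce_plan(1000))  # (3, 111)
```

So even a 1000-document job adds only ~11% extra reduce invocations over a flat merge, in exchange for never exceeding the reduce agent's context limit.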
Implementation
# /coditect/agents/map_reduce/orchestrator.py
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
import asyncio
import json

# CircuitBreaker, CircuitBreakerOpen, MetricsCollector, and AgentRole are assumed
# to be imported from Coditect's shared modules (ADR-012, ADR-015); elided here.

class MapReducePhase(Enum):
    DECOMPOSITION = "decomposition"
    MAPPING = "mapping"
    REDUCING = "reducing"
    COMPLETE = "complete"
    FAILED = "failed"
@dataclass
class MapUnit:
    """Single unit of work for a map agent"""
    unit_id: str
    document_id: str
    content: str
    token_count: int
    token_budget: int
    extraction_schema: Dict[str, Any]
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MapResult:
    """Output from a single map agent"""
    unit_id: str
    document_id: str
    extracted_data: Dict[str, Any]
    tokens_used: int
    confidence_score: float
    citations: List[Dict[str, Any]]
    errors: List[str] = field(default_factory=list)
@dataclass
class ReduceResult:
    """Aggregated output from reduce phase"""
    aggregated_data: Dict[str, Any]
    source_units: List[str]
    total_tokens: int
    deduplication_stats: Dict[str, int]
    conflict_resolutions: List[Dict[str, Any]]
class MapReduceOrchestrator:
    """
    Orchestrates parallel document processing using map-reduce pattern.

    Integrates with:
    - FoundationDB for checkpoint persistence (ADR-003)
    - Token budget management (ADR-005)
    - Circuit breaker patterns (ADR-012)
    - Audit logging (ADR-015)
    """

    def __init__(
        self,
        fdb_client: 'FoundationDBClient',
        agent_pool: 'AgentPool',
        config: 'MapReduceConfig'
    ):
        self.fdb = fdb_client
        self.agent_pool = agent_pool
        self.config = config
        self.circuit_breakers: Dict[str, 'CircuitBreaker'] = {}
        self.metrics = MetricsCollector()
    async def execute(
        self,
        documents: List['Document'],
        extraction_schema: 'ExtractionSchema',
        token_budget: 'TokenBudget',
        on_progress: Optional[Callable] = None
    ) -> ReduceResult:
        """
        Execute map-reduce pipeline on document set.

        Args:
            documents: Input documents to process
            extraction_schema: Schema defining what to extract
            token_budget: Total token allocation
            on_progress: Optional callback for progress updates

        Returns:
            ReduceResult with aggregated findings
        """
        job_id = self._generate_job_id()
        map_results: List[MapResult] = []
        try:
            # Phase 1: Decomposition
            await self._emit_phase(job_id, MapReducePhase.DECOMPOSITION)
            map_units = await self._decompose(
                documents,
                extraction_schema,
                token_budget
            )

            # Phase 2: Parallel Map Execution
            await self._emit_phase(job_id, MapReducePhase.MAPPING)
            map_results = await self._execute_map_phase(
                map_units,
                on_progress
            )

            # Phase 3: Reduce Execution
            await self._emit_phase(job_id, MapReducePhase.REDUCING)
            reduce_result = await self._execute_reduce_phase(
                map_results,
                extraction_schema
            )

            await self._emit_phase(job_id, MapReducePhase.COMPLETE)
            return reduce_result
        except Exception as e:
            await self._emit_phase(job_id, MapReducePhase.FAILED, error=str(e))
            # Persist whatever map results completed so the job can be resumed
            await self._save_checkpoint(job_id, map_results)
            raise
    async def _decompose(
        self,
        documents: List['Document'],
        schema: 'ExtractionSchema',
        budget: 'TokenBudget'
    ) -> List[MapUnit]:
        """Decompose documents into map units with budget allocation"""
        map_units = []
        per_unit_budget = budget.total // len(documents)
        for doc in documents:
            # Chunk large documents
            if doc.token_count > self.config.max_tokens_per_unit:
                chunks = self._chunk_document(
                    doc,
                    max_tokens=self.config.max_tokens_per_unit,
                    overlap=self.config.chunk_overlap
                )
                for i, chunk in enumerate(chunks):
                    map_units.append(MapUnit(
                        unit_id=f"{doc.id}_chunk_{i}",
                        document_id=doc.id,
                        content=chunk.content,
                        token_count=chunk.token_count,
                        token_budget=per_unit_budget // len(chunks),
                        extraction_schema=schema.to_dict(),
                        metadata={"chunk_index": i, "total_chunks": len(chunks)}
                    ))
            else:
                map_units.append(MapUnit(
                    unit_id=doc.id,
                    document_id=doc.id,
                    content=doc.content,
                    token_count=doc.token_count,
                    token_budget=per_unit_budget,
                    extraction_schema=schema.to_dict()
                ))
        return map_units
    async def _execute_map_phase(
        self,
        units: List[MapUnit],
        on_progress: Optional[Callable]
    ) -> List[MapResult]:
        """Execute map agents in parallel with rate limiting"""
        semaphore = asyncio.Semaphore(self.config.max_concurrent_agents)
        completed = 0

        async def process_unit(unit: MapUnit) -> MapResult:
            nonlocal completed
            async with semaphore:
                circuit_breaker = self._get_circuit_breaker(unit.unit_id)
                try:
                    result = await circuit_breaker.call(
                        self._invoke_map_agent,
                        unit
                    )
                    completed += 1
                    if on_progress:
                        on_progress(completed, len(units))
                    return result
                except CircuitBreakerOpen:
                    # Return partial result with error flag
                    return MapResult(
                        unit_id=unit.unit_id,
                        document_id=unit.document_id,
                        extracted_data={},
                        tokens_used=0,
                        confidence_score=0.0,
                        citations=[],
                        errors=["Circuit breaker open - skipped"]
                    )

        # Execute all units in parallel (bounded by semaphore)
        results = await asyncio.gather(
            *[process_unit(unit) for unit in units],
            return_exceptions=True
        )

        # Filter out exceptions, convert to error results
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append(MapResult(
                    unit_id=units[i].unit_id,
                    document_id=units[i].document_id,
                    extracted_data={},
                    tokens_used=0,
                    confidence_score=0.0,
                    citations=[],
                    errors=[str(result)]
                ))
            else:
                processed_results.append(result)
        return processed_results
    async def _invoke_map_agent(self, unit: MapUnit) -> MapResult:
        """Invoke a single map agent with the given unit"""
        agent = await self.agent_pool.acquire(AgentRole.MAP_WORKER)
        try:
            prompt = self._build_map_prompt(unit)
            response = await agent.execute(
                prompt=prompt,
                max_tokens=unit.token_budget,
                tools=["extract_structured_data"]
            )
            return MapResult(
                unit_id=unit.unit_id,
                document_id=unit.document_id,
                extracted_data=response.structured_output,
                tokens_used=response.token_usage,
                confidence_score=response.confidence,
                citations=response.citations
            )
        finally:
            await self.agent_pool.release(agent)
    async def _execute_reduce_phase(
        self,
        map_results: List[MapResult],
        schema: 'ExtractionSchema'
    ) -> ReduceResult:
        """Aggregate map results, using hierarchical reduce if needed"""
        # Filter successful results
        valid_results = [r for r in map_results if not r.errors]
        total_tokens = sum(r.tokens_used for r in valid_results)

        # Check if single-pass reduce is possible
        if total_tokens <= self.config.reduce_context_limit:
            return await self._single_reduce(valid_results, schema)
        else:
            return await self._hierarchical_reduce(valid_results, schema)
    async def _single_reduce(
        self,
        results: List[MapResult],
        schema: 'ExtractionSchema'
    ) -> ReduceResult:
        """Single-pass aggregation when results fit in context"""
        agent = await self.agent_pool.acquire(AgentRole.REDUCE_SYNTHESIZER)
        try:
            prompt = self._build_reduce_prompt(results, schema)
            response = await agent.execute(
                prompt=prompt,
                tools=["aggregate_findings", "resolve_conflicts", "deduplicate"]
            )
            return ReduceResult(
                aggregated_data=response.structured_output,
                source_units=[r.unit_id for r in results],
                total_tokens=sum(r.tokens_used for r in results) + response.token_usage,
                deduplication_stats=response.metadata.get("dedup_stats", {}),
                conflict_resolutions=response.metadata.get("conflicts", [])
            )
        finally:
            await self.agent_pool.release(agent)
    async def _hierarchical_reduce(
        self,
        results: List[MapResult],
        schema: 'ExtractionSchema'
    ) -> ReduceResult:
        """Multi-level reduce for large result sets"""
        current_level = results
        level_num = 0
        while len(current_level) > self.config.reduce_batch_size:
            # Group into batches
            batches = self._batch_results(
                current_level,
                self.config.reduce_batch_size
            )
            # Reduce each batch in parallel
            next_level = await asyncio.gather(*[
                self._reduce_batch(batch, schema, level_num)
                for batch in batches
            ])
            current_level = next_level
            level_num += 1
        # Final reduce
        return await self._single_reduce(current_level, schema)
    def _build_map_prompt(self, unit: MapUnit) -> str:
        """Construct prompt for map agent"""
        return f"""\
ROLE: Document Extraction Agent

TASK: Extract structured data from the following document according to the schema.

EXTRACTION SCHEMA:
{json.dumps(unit.extraction_schema, indent=2)}

DOCUMENT CONTENT:
{unit.content}

INSTRUCTIONS:
1. Extract all data matching the schema fields
2. Include exact quotes as citations for each extracted item
3. Assign confidence score (0.0-1.0) for each extraction
4. If a field is not found, return null with explanation

OUTPUT FORMAT: JSON matching the extraction schema with citations array
"""
    def _build_reduce_prompt(
        self,
        results: List[MapResult],
        schema: 'ExtractionSchema'
    ) -> str:
        """Construct prompt for reduce agent"""
        formatted_results = "\n\n".join([
            f"--- Result from {r.unit_id} ---\n{json.dumps(r.extracted_data, indent=2)}"
            for r in results
        ])
        return f"""\
ROLE: Synthesis Agent

TASK: Aggregate and synthesize findings from multiple document analyses.

EXTRACTION SCHEMA (for reference):
{json.dumps(schema.to_dict(), indent=2)}

MAP RESULTS:
{formatted_results}

INSTRUCTIONS:
1. Merge findings across all results
2. Deduplicate identical or near-identical items
3. Resolve conflicts when same item has different values
4. Preserve citation chains to original documents
5. Calculate aggregate statistics

OUTPUT FORMAT: JSON with:
- aggregated_data: Merged findings per schema
- dedup_stats: Count of duplicates removed per field
- conflicts: List of resolved conflicts with resolution rationale
"""
@dataclass
class MapReduceConfig:
    """Configuration for map-reduce orchestration"""
    max_tokens_per_unit: int = 50_000
    chunk_overlap: int = 500
    max_concurrent_agents: int = 20
    reduce_context_limit: int = 100_000
    reduce_batch_size: int = 10
    checkpoint_interval: int = 10
    circuit_breaker_threshold: int = 3
    circuit_breaker_timeout: float = 60.0
CLI Commands
# New Coditect CLI commands for map-reduce operations

# Process document corpus with map-reduce
coditect mapreduce run \
  --input ./documents/ \
  --schema ./extraction_schema.yaml \
  --output ./results/ \
  --parallelism 20 \
  --budget 500000

# Resume interrupted job
coditect mapreduce resume \
  --job-id mr_20260115_abc123

# View job status
coditect mapreduce status \
  --job-id mr_20260115_abc123

# List recent jobs
coditect mapreduce list \
  --status [running|completed|failed] \
  --limit 20
FoundationDB Schema
# Map-Reduce job state in FoundationDB
mapreduce_schema = {
    "jobs": {
        # Key: ('mapreduce', 'jobs', job_id)
        "job_id": "string",
        "status": "enum[decomposition|mapping|reducing|complete|failed]",
        "created_at": "timestamp",
        "updated_at": "timestamp",
        "config": "json",
        "total_units": "int",
        "completed_units": "int",
        "total_tokens": "int",
        "error": "string|null"
    },
    "units": {
        # Key: ('mapreduce', 'units', job_id, unit_id)
        "unit_id": "string",
        "document_id": "string",
        "status": "enum[pending|processing|complete|failed]",
        "result": "json|null",
        "tokens_used": "int",
        "error": "string|null"
    },
    "checkpoints": {
        # Key: ('mapreduce', 'checkpoints', job_id, checkpoint_id)
        "checkpoint_id": "string",
        "timestamp": "timestamp",
        "completed_units": "list[string]",
        "partial_results": "json"
    }
}
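This keyspace supports `mapreduce resume` by scanning the units subspace for incomplete work. A sketch of that query, using a plain dict keyed by Python tuples as a stand-in for FoundationDB's tuple layer (`record_unit` and `pending_units` are illustrative, not real Coditect functions; a real implementation would run inside FDB transactions):

```python
# Illustrative only: dict keyed by tuples stands in for FDB tuple-layer keys.
store = {}

def record_unit(job_id, unit_id, status, result=None):
    # Mirrors key shape ('mapreduce', 'units', job_id, unit_id) from the schema above
    store[('mapreduce', 'units', job_id, unit_id)] = {"status": status, "result": result}

def pending_units(job_id, all_unit_ids):
    """Units that still need a map agent after a crash or interruption."""
    done = {key[3] for key, row in store.items()
            if key[:3] == ('mapreduce', 'units', job_id) and row["status"] == "complete"}
    return [u for u in all_unit_ids if u not in done]

record_unit("mr_20260115_abc123", "doc_a", "complete", {"findings": 3})
record_unit("mr_20260115_abc123", "doc_b", "failed")
print(pending_units("mr_20260115_abc123", ["doc_a", "doc_b", "doc_c"]))  # ['doc_b', 'doc_c']
```

Because map agents are idempotent, re-running `doc_b` and `doc_c` on resume is safe.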
Consequences
Positive
- 10-50x speedup for batch processing (parallel vs sequential)
- Fault tolerance: Individual unit failures don't block entire job
- Scalability: Horizontal scaling by adding more concurrent agents
- Visibility: Per-unit progress tracking and status
Negative
- Complexity: More moving parts than sequential processing
- Resource spikes: Parallel execution creates burst API usage
- Coordination overhead: Checkpoint management adds latency
- Debugging difficulty: Distributed failures harder to trace
Metrics
| Metric | Target | Measurement |
|---|---|---|
| Documents per minute | 100+ | document_count / job_duration_minutes |
| Agent utilization | >80% | active_time / total_time |
| Failure rate | <5% | failed_units / total_units |
| Token efficiency | <1000/doc | total_tokens / document_count |
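The measurement column reduces to simple ratios over the job record. A sketch (`job_metrics` is a hypothetical helper; agent utilization is omitted since it needs per-agent active-time instrumentation) showing a job that meets the targets:

```python
# Hypothetical helper computing three of the four target metrics from job totals.
def job_metrics(document_count, duration_seconds, total_tokens, failed_units, total_units):
    return {
        "docs_per_minute": document_count / (duration_seconds / 60),
        "failure_rate": failed_units / total_units,
        "tokens_per_doc": total_tokens / document_count,
    }

# 1000 documents in 8 minutes, 850K tokens, 12 failed units out of 1000:
m = job_metrics(1000, 480, 850_000, 12, 1000)
# docs_per_minute=125.0 (target 100+), failure_rate=0.012 (<5%), tokens_per_doc=850.0 (<1000)
```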
Related ADRs
- ADR-027: Hybrid Document Processing Architecture (parent)
- ADR-004: Agent Checkpoint and Recovery
- ADR-005: Token Budget Management
- ADR-012: Circuit Breaker Patterns