ADR-028: Map-Reduce Agent Orchestration

Status

PROPOSED

Date

2026-01-15

Context

Sequential document processing creates unacceptable latency for enterprise workloads. Processing 50 documents sequentially with Claude Code's approach requires ~30-60 minutes. Enterprise customers expect batch analysis of 1000+ documents to complete in minutes, not hours.

The Map-Reduce pattern, proven in distributed systems (Hadoop, Spark), can be adapted for LLM agent orchestration:

  • Map Phase: Independent agents process documents in parallel
  • Reduce Phase: Synthesis agent aggregates partial results
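
The shape of the pattern can be sketched in a few lines of asyncio; `map_fn` and `reduce_fn` below are hypothetical stand-ins for real agent calls, and the semaphore plays the role of the rate limiter:

```python
# Minimal sketch of the two phases. map_fn/reduce_fn are hypothetical
# placeholders for real LLM agent invocations.
import asyncio
from typing import Callable, List

async def map_reduce(
    documents: List[str],
    map_fn: Callable[[str], str],
    reduce_fn: Callable[[List[str]], str],
    max_concurrency: int = 4,
) -> str:
    sem = asyncio.Semaphore(max_concurrency)

    async def run_map(doc: str) -> str:
        async with sem:  # bound parallelism, e.g. per API rate limits
            return map_fn(doc)

    # Map phase: process every document concurrently
    partials = await asyncio.gather(*(run_map(d) for d in documents))
    # Reduce phase: one synthesis step over all partial results
    return reduce_fn(list(partials))

# Usage with trivial stand-in functions:
docs = ["alpha beta", "beta gamma", "gamma delta"]
result = asyncio.run(map_reduce(docs, lambda d: d.split()[0], ", ".join))
print(result)  # alpha, beta, gamma
```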

Constraints

| Constraint | Value | Source |
|---|---|---|
| Max concurrent API calls | 50 | Anthropic rate limits |
| Token budget per agent | 100K | Context window |
| 15x token multiplier | Active | Multi-agent overhead |
| Checkpoint frequency | 10K tokens | ADR-004 |
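
As a back-of-envelope check, the effective map parallelism implied by these constraints is the minimum of the pending work, the rate limit, and the number of full-budget agents the job's token allocation can fund (figures below are illustrative):

```python
# Back-of-envelope check of the constraints above. Effective parallelism
# is bounded by the rate limit and by how many 100K-token agents the
# total job budget can fund concurrently.
RATE_LIMIT = 50          # max concurrent API calls
AGENT_BUDGET = 100_000   # token budget per agent

def effective_concurrency(pending_units: int, total_budget: int) -> int:
    budget_permits = total_budget // AGENT_BUDGET
    return min(pending_units, RATE_LIMIT, budget_permits)

print(effective_concurrency(200, 2_000_000))   # 20 -> budget-bound
print(effective_concurrency(200, 10_000_000))  # 50 -> rate-limit-bound
```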

Decision

Implement a Map-Reduce Agent Orchestrator as a core Coditect capability.

Agent Roles

map_reduce_agents:
  orchestrator:
    role: MapReduceOrchestrator
    responsibilities:
      - Task decomposition into map units
      - Agent pool management
      - Token budget allocation
      - Checkpoint coordination
      - Result aggregation triggering

  map_agent:
    role: MapWorker
    responsibilities:
      - Process single document/chunk
      - Extract structured data per schema
      - Emit partial results
      - Report token usage
    capabilities:
      - Stateless (no cross-document state)
      - Idempotent (safe to retry)
      - Schema-driven extraction

  reduce_agent:
    role: ReduceSynthesizer
    responsibilities:
      - Aggregate map outputs
      - Deduplicate findings
      - Resolve conflicts
      - Generate unified output
    capabilities:
      - Handles partial results (streaming aggregation)
      - Conflict resolution strategies
      - Citation preservation
Orchestration Flow

┌─────────────────────────────────────────────────────────────────┐
│ MAP-REDUCE ORCHESTRATION │
├─────────────────────────────────────────────────────────────────┤
│ │
│ INPUT: List[Document], ExtractionSchema, TokenBudget │
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ PHASE 1: DECOMPOSITION │ │
│ │ │ │
│ │ documents.chunk(max_tokens=50K) → map_units[] │ │
│ │ allocate_budget(map_units, total_budget) → unit_budgets[] │ │
│ │ create_task_graph(map_units, dependencies) → DAG │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ PHASE 2: MAP EXECUTION (PARALLEL) │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │MapAgent │ │MapAgent │ │MapAgent │ ... │MapAgent │ │ │
│ │ │ #1 │ │ #2 │ │ #3 │ │ #N │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │ │ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Result │ │ Result │ │ Result │ │ Result │ │ │
│ │ │ #1 │ │ #2 │ │ #3 │ │ #N │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ │ Concurrency: min(N, rate_limit, budget_permits) │ │
│ │ Retry: exponential backoff with circuit breaker │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ PHASE 3: REDUCE EXECUTION │ │
│ │ │ │
│ │ if len(results) <= context_window: │ │
│ │ single_reduce(results) → final_output │ │
│ │ else: │ │
│ │ hierarchical_reduce(results, levels=ceil(log(N))) │ │
│ │ → intermediate[] → final_output │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ OUTPUT: AggregatedResult, TokenMetrics, AuditTrail │
│ │
└─────────────────────────────────────────────────────────────────┘
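
The `levels=ceil(log(N))` estimate in Phase 3 can be checked directly: with batch size B, each hierarchical level shrinks N partial results to ceil(N / B), so the number of levels is ceil(log_B(N)).

```python
# Worked example of the reduce-depth estimate from Phase 3: each level
# collapses N results into ceil(N / batch_size) intermediate results.
import math

def reduce_levels(n_results: int, batch_size: int) -> int:
    levels = 0
    while n_results > 1:
        n_results = math.ceil(n_results / batch_size)
        levels += 1
    return levels

print(reduce_levels(1000, 10))  # 3 levels: 1000 -> 100 -> 10 -> 1
print(reduce_levels(50, 10))    # 2 levels: 50 -> 5 -> 1
```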

Implementation

# /coditect/agents/map_reduce/orchestrator.py

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
import asyncio
import json

class MapReducePhase(Enum):
    DECOMPOSITION = "decomposition"
    MAPPING = "mapping"
    REDUCING = "reducing"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class MapUnit:
    """Single unit of work for a map agent"""
    unit_id: str
    document_id: str
    content: str
    token_count: int
    token_budget: int
    extraction_schema: Dict[str, Any]
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class MapResult:
    """Output from a single map agent"""
    unit_id: str
    document_id: str
    extracted_data: Dict[str, Any]
    tokens_used: int
    confidence_score: float
    citations: List[Dict[str, Any]]
    errors: List[str] = field(default_factory=list)

@dataclass
class ReduceResult:
    """Aggregated output from the reduce phase"""
    aggregated_data: Dict[str, Any]
    source_units: List[str]
    total_tokens: int
    deduplication_stats: Dict[str, int]
    conflict_resolutions: List[Dict[str, Any]]

class MapReduceOrchestrator:
    """
    Orchestrates parallel document processing using the map-reduce pattern.

    Integrates with:
    - FoundationDB for checkpoint persistence (ADR-003)
    - Token budget management (ADR-005)
    - Circuit breaker patterns (ADR-012)
    - Audit logging (ADR-015)
    """

    def __init__(
        self,
        fdb_client: 'FoundationDBClient',
        agent_pool: 'AgentPool',
        config: 'MapReduceConfig'
    ):
        self.fdb = fdb_client
        self.agent_pool = agent_pool
        self.config = config
        self.circuit_breakers: Dict[str, 'CircuitBreaker'] = {}
        self.metrics = MetricsCollector()

    async def execute(
        self,
        documents: List['Document'],
        extraction_schema: 'ExtractionSchema',
        token_budget: 'TokenBudget',
        on_progress: Optional[Callable] = None
    ) -> ReduceResult:
        """
        Execute the map-reduce pipeline on a document set.

        Args:
            documents: Input documents to process
            extraction_schema: Schema defining what to extract
            token_budget: Total token allocation
            on_progress: Optional callback for progress updates

        Returns:
            ReduceResult with aggregated findings
        """
        job_id = self._generate_job_id()
        map_results: List[MapResult] = []

        try:
            # Phase 1: Decomposition
            await self._emit_phase(job_id, MapReducePhase.DECOMPOSITION)
            map_units = await self._decompose(
                documents,
                extraction_schema,
                token_budget
            )

            # Phase 2: Parallel Map Execution
            await self._emit_phase(job_id, MapReducePhase.MAPPING)
            map_results = await self._execute_map_phase(
                map_units,
                on_progress
            )

            # Phase 3: Reduce Execution
            await self._emit_phase(job_id, MapReducePhase.REDUCING)
            reduce_result = await self._execute_reduce_phase(
                map_results,
                extraction_schema
            )

            await self._emit_phase(job_id, MapReducePhase.COMPLETE)
            return reduce_result

        except Exception as e:
            await self._emit_phase(job_id, MapReducePhase.FAILED, error=str(e))
            # Persist whatever map results completed so the job can resume
            await self._save_checkpoint(job_id, map_results)
            raise

    async def _decompose(
        self,
        documents: List['Document'],
        schema: 'ExtractionSchema',
        budget: 'TokenBudget'
    ) -> List[MapUnit]:
        """Decompose documents into map units with budget allocation"""

        map_units = []
        per_unit_budget = budget.total // len(documents)

        for doc in documents:
            # Chunk documents that exceed the per-unit token ceiling
            if doc.token_count > self.config.max_tokens_per_unit:
                chunks = self._chunk_document(
                    doc,
                    max_tokens=self.config.max_tokens_per_unit,
                    overlap=self.config.chunk_overlap
                )
                for i, chunk in enumerate(chunks):
                    map_units.append(MapUnit(
                        unit_id=f"{doc.id}_chunk_{i}",
                        document_id=doc.id,
                        content=chunk.content,
                        token_count=chunk.token_count,
                        token_budget=per_unit_budget // len(chunks),
                        extraction_schema=schema.to_dict(),
                        metadata={"chunk_index": i, "total_chunks": len(chunks)}
                    ))
            else:
                map_units.append(MapUnit(
                    unit_id=doc.id,
                    document_id=doc.id,
                    content=doc.content,
                    token_count=doc.token_count,
                    token_budget=per_unit_budget,
                    extraction_schema=schema.to_dict()
                ))

        return map_units

    async def _execute_map_phase(
        self,
        units: List[MapUnit],
        on_progress: Optional[Callable]
    ) -> List[MapResult]:
        """Execute map agents in parallel with rate limiting"""

        semaphore = asyncio.Semaphore(self.config.max_concurrent_agents)
        completed = 0

        async def process_unit(unit: MapUnit) -> MapResult:
            nonlocal completed
            async with semaphore:
                circuit_breaker = self._get_circuit_breaker(unit.unit_id)

                try:
                    result = await circuit_breaker.call(
                        self._invoke_map_agent,
                        unit
                    )
                    completed += 1
                    if on_progress:
                        on_progress(completed, len(units))
                    return result

                except CircuitBreakerOpen:
                    # Return a partial result with an error flag
                    return MapResult(
                        unit_id=unit.unit_id,
                        document_id=unit.document_id,
                        extracted_data={},
                        tokens_used=0,
                        confidence_score=0.0,
                        citations=[],
                        errors=["Circuit breaker open - skipped"]
                    )

        # Execute all units in parallel (bounded by the semaphore)
        results = await asyncio.gather(
            *[process_unit(unit) for unit in units],
            return_exceptions=True
        )

        # Convert any raised exceptions into error results
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append(MapResult(
                    unit_id=units[i].unit_id,
                    document_id=units[i].document_id,
                    extracted_data={},
                    tokens_used=0,
                    confidence_score=0.0,
                    citations=[],
                    errors=[str(result)]
                ))
            else:
                processed_results.append(result)

        return processed_results

    async def _invoke_map_agent(self, unit: MapUnit) -> MapResult:
        """Invoke a single map agent on the given unit"""

        agent = await self.agent_pool.acquire(AgentRole.MAP_WORKER)

        try:
            prompt = self._build_map_prompt(unit)
            response = await agent.execute(
                prompt=prompt,
                max_tokens=unit.token_budget,
                tools=["extract_structured_data"]
            )

            return MapResult(
                unit_id=unit.unit_id,
                document_id=unit.document_id,
                extracted_data=response.structured_output,
                tokens_used=response.token_usage,
                confidence_score=response.confidence,
                citations=response.citations
            )
        finally:
            await self.agent_pool.release(agent)

    async def _execute_reduce_phase(
        self,
        map_results: List[MapResult],
        schema: 'ExtractionSchema'
    ) -> ReduceResult:
        """Aggregate map results, using hierarchical reduce if needed"""

        # Only aggregate results that completed without errors
        valid_results = [r for r in map_results if not r.errors]
        total_tokens = sum(r.tokens_used for r in valid_results)

        # Single-pass reduce when everything fits in one context window
        if total_tokens <= self.config.reduce_context_limit:
            return await self._single_reduce(valid_results, schema)
        return await self._hierarchical_reduce(valid_results, schema)

    async def _single_reduce(
        self,
        results: List[MapResult],
        schema: 'ExtractionSchema'
    ) -> ReduceResult:
        """Single-pass aggregation when results fit in context"""

        agent = await self.agent_pool.acquire(AgentRole.REDUCE_SYNTHESIZER)

        try:
            prompt = self._build_reduce_prompt(results, schema)
            response = await agent.execute(
                prompt=prompt,
                tools=["aggregate_findings", "resolve_conflicts", "deduplicate"]
            )

            return ReduceResult(
                aggregated_data=response.structured_output,
                source_units=[r.unit_id for r in results],
                total_tokens=sum(r.tokens_used for r in results) + response.token_usage,
                deduplication_stats=response.metadata.get("dedup_stats", {}),
                conflict_resolutions=response.metadata.get("conflicts", [])
            )
        finally:
            await self.agent_pool.release(agent)

    async def _hierarchical_reduce(
        self,
        results: List[MapResult],
        schema: 'ExtractionSchema'
    ) -> ReduceResult:
        """Multi-level reduce for large result sets"""

        current_level = results
        level_num = 0

        while len(current_level) > self.config.reduce_batch_size:
            # Group the current level into batches
            batches = self._batch_results(
                current_level,
                self.config.reduce_batch_size
            )

            # Reduce each batch in parallel
            current_level = await asyncio.gather(*[
                self._reduce_batch(batch, schema, level_num)
                for batch in batches
            ])
            level_num += 1

        # Final reduce over the last level
        return await self._single_reduce(current_level, schema)

    def _build_map_prompt(self, unit: MapUnit) -> str:
        """Construct the prompt for a map agent"""
        return f"""
ROLE: Document Extraction Agent
TASK: Extract structured data from the following document according to the schema.

EXTRACTION SCHEMA:
{json.dumps(unit.extraction_schema, indent=2)}

DOCUMENT CONTENT:
{unit.content}

INSTRUCTIONS:
1. Extract all data matching the schema fields
2. Include exact quotes as citations for each extracted item
3. Assign a confidence score (0.0-1.0) to each extraction
4. If a field is not found, return null with an explanation

OUTPUT FORMAT: JSON matching the extraction schema with a citations array
"""

    def _build_reduce_prompt(
        self,
        results: List[MapResult],
        schema: 'ExtractionSchema'
    ) -> str:
        """Construct the prompt for the reduce agent"""

        formatted_results = "\n\n".join(
            f"--- Result from {r.unit_id} ---\n{json.dumps(r.extracted_data, indent=2)}"
            for r in results
        )

        return f"""
ROLE: Synthesis Agent
TASK: Aggregate and synthesize findings from multiple document analyses.

EXTRACTION SCHEMA (for reference):
{json.dumps(schema.to_dict(), indent=2)}

MAP RESULTS:
{formatted_results}

INSTRUCTIONS:
1. Merge findings across all results
2. Deduplicate identical or near-identical items
3. Resolve conflicts when the same item has different values
4. Preserve citation chains to the original documents
5. Calculate aggregate statistics

OUTPUT FORMAT: JSON with:
- aggregated_data: Merged findings per schema
- dedup_stats: Count of duplicates removed per field
- conflicts: List of resolved conflicts with resolution rationale
"""


@dataclass
class MapReduceConfig:
    """Configuration for map-reduce orchestration"""
    max_tokens_per_unit: int = 50_000
    chunk_overlap: int = 500
    max_concurrent_agents: int = 20
    reduce_context_limit: int = 100_000
    reduce_batch_size: int = 10
    checkpoint_interval: int = 10
    circuit_breaker_threshold: int = 3
    circuit_breaker_timeout: float = 60.0
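
The orchestrator references `CircuitBreaker` and `CircuitBreakerOpen` (ADR-012) without defining them. A minimal sketch of the assumed interface, with the threshold/timeout semantics matching the config defaults above; the real implementation lives in ADR-012 and may differ:

```python
# Sketch of the CircuitBreaker interface assumed by _execute_map_phase.
# Interface (call, threshold, timeout) is an assumption; ADR-012 owns
# the real implementation.
import time

class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the breaker is open."""

class CircuitBreaker:
    def __init__(self, threshold: int = 3, timeout: float = 60.0):
        self.threshold = threshold   # consecutive failures before opening
        self.timeout = timeout       # seconds before a probe is allowed
        self.failures = 0
        self.opened_at = None

    async def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.timeout:
                raise CircuitBreakerOpen("breaker open - call rejected")
            self.opened_at = None    # half-open: allow one probe call
        try:
            result = await fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0            # any success resets the count
        return result
```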

CLI Commands

# New Coditect CLI commands for map-reduce operations

# Process a document corpus with map-reduce
coditect mapreduce run \
  --input ./documents/ \
  --schema ./extraction_schema.yaml \
  --output ./results/ \
  --parallelism 20 \
  --budget 500000

# Resume an interrupted job
coditect mapreduce resume \
  --job-id mr_20260115_abc123

# View job status
coditect mapreduce status \
  --job-id mr_20260115_abc123

# List recent jobs
coditect mapreduce list \
  --status [running|completed|failed] \
  --limit 20

FoundationDB Schema

# Map-Reduce job state in FoundationDB
mapreduce_schema = {
    "jobs": {
        # Key: ('mapreduce', 'jobs', job_id)
        "job_id": "string",
        "status": "enum[decomposition|mapping|reducing|complete|failed]",
        "created_at": "timestamp",
        "updated_at": "timestamp",
        "config": "json",
        "total_units": "int",
        "completed_units": "int",
        "total_tokens": "int",
        "error": "string|null"
    },
    "units": {
        # Key: ('mapreduce', 'units', job_id, unit_id)
        "unit_id": "string",
        "document_id": "string",
        "status": "enum[pending|processing|complete|failed]",
        "result": "json|null",
        "tokens_used": "int",
        "error": "string|null"
    },
    "checkpoints": {
        # Key: ('mapreduce', 'checkpoints', job_id, checkpoint_id)
        "checkpoint_id": "string",
        "timestamp": "timestamp",
        "completed_units": "list[string]",
        "partial_results": "json"
    }
}
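
A sketch of how the `units` keyspace supports resume, with a plain dict standing in for FoundationDB; real code would pack these tuples with the `fdb` tuple layer inside a transaction, and the helper names here are hypothetical:

```python
# In-memory stand-in for the 'units' keyspace above; a real
# implementation would use fdb.tuple keys inside a transaction.
# Helper names (mark_unit_complete, pending_units) are hypothetical.
import json

store = {}  # tuple key -> bytes value, mimicking the FDB keyspace

def mark_unit_complete(job_id, unit_id, result, tokens):
    key = ("mapreduce", "units", job_id, unit_id)
    store[key] = json.dumps(
        {"status": "complete", "result": result, "tokens_used": tokens}
    ).encode()

def pending_units(job_id, all_units):
    # Resume: any unit without a 'complete' record must be re-mapped
    done = {
        key[3]
        for key, value in store.items()
        if key[:3] == ("mapreduce", "units", job_id)
        and json.loads(value)["status"] == "complete"
    }
    return [u for u in all_units if u not in done]

mark_unit_complete("mr_1", "doc_a", {"findings": []}, 1200)
print(pending_units("mr_1", ["doc_a", "doc_b"]))  # ['doc_b']
```

Because map units are idempotent (see Agent Roles), re-running every unit returned by `pending_units` after a crash is safe.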

Consequences

Positive

  • 10-50x speedup for batch processing (parallel vs sequential)
  • Fault tolerance: Individual unit failures don't block entire job
  • Scalability: Horizontal scaling by adding more concurrent agents
  • Visibility: Per-unit progress tracking and status

Negative

  • Complexity: More moving parts than sequential processing
  • Resource spikes: Parallel execution creates burst API usage
  • Coordination overhead: Checkpoint management adds latency
  • Debugging difficulty: Distributed failures harder to trace

Metrics

| Metric | Target | Measurement |
|---|---|---|
| Documents per minute | 100+ | document_count / job_duration |
| Agent utilization | >80% | active_time / total_time |
| Failure rate | <5% | failed_units / total_units |
| Token efficiency | <1000/doc | total_tokens / document_count |

Related ADRs

  • ADR-027: Hybrid Document Processing Architecture (parent)
  • ADR-004: Agent Checkpoint and Recovery
  • ADR-005: Token Budget Management
  • ADR-012: Circuit Breaker Patterns