RLM Technical Implementation Guide
For CODITECT Engineering Team
Version: 1.0
Date: January 13, 2026
Table of Contents
- Architecture Overview
- Core Components
- Token Economics
- Error Handling Patterns
- Production Code Examples
- Quality Gates
- Deployment Checklist
Architecture Overview
System Diagram
┌─────────────────────────────────────────────────────────────────┐
│ RLM Root Agent │
│ (Claude Sonnet 4) │
│ │
│ - Receives user query │
│ - Initializes REPL environment │
│ - Plans decomposition strategy │
│ - Routes to specialist agents │
└────────────┬────────────────────────────────────────────────────┘
│
├──────────────┬──────────────┬──────────────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌────────────┐ ┌─────────────┐ ┌──────────┐ ┌────────────┐
│ Document │ │ Code │ │ Workflow │ │ Research │
│ Processor │ │ Analyzer │ │ Executor │ │ Agent │
│ │ │ │ │ │ │ │
│ (Haiku 4) │ │ (Qwen3-480B)│ │(Sonnet 4)│ │ (Sonnet 4) │
└────────────┘ └─────────────┘ └──────────┘ └────────────┘
│ │ │ │
└──────────────┴──────────────┴──────────────────┘
│
▼
┌──────────────────┐
│ REPL Environment │
│ │
│ - Python 3.11 │
│ - Sandboxed │
│ - No network │
│ - 10GB memory │
└──────────────────┘
│
├─────────────┬──────────────┐
▼ ▼ ▼
┌────────────┐ ┌─────────┐ ┌──────────┐
│ Checkpoint │ │ Circuit │ │ Metrics │
│ Store │ │ Breaker │ │ Monitor │
│ (Redis) │ │ │ │ │
└────────────┘ └─────────┘ └──────────┘
Core Components
1. REPL Environment Manager
import ast
import sys
from io import StringIO
from typing import Any, Dict, List, Optional
import subprocess
import json
class SecureREPL:
    """
    Sandboxed Python REPL for RLM execution.

    Security features:
    - No network access
    - Limited system calls
    - Memory constraints (10GB)
    - CPU timeout (60s per execution)
    - Whitelist of allowed modules

    NOTE(review): code runs via exec() in *this* process — the memory limit
    and CPU timeout listed above are not enforced by this class and must be
    provided by the surrounding container/cgroup. Confirm deployment does so.
    """

    # Only these top-level packages may be imported by sandboxed code.
    ALLOWED_MODULES = {
        'json', 're', 'math', 'itertools', 'functools',
        'collections', 'datetime', 'typing', 'dataclasses'
    }

    # Cheap substring denylist; defense-in-depth in front of the AST check.
    FORBIDDEN_OPERATIONS = [
        'import os', 'import sys', 'import subprocess',
        '__import__', 'eval', 'exec', 'compile',
        'open(', 'file(', 'input('
    ]

    def __init__(self, memory_limit_gb: int = 10):
        self.namespace: Dict[str, Any] = {}     # shared exec() namespace
        self.memory_limit_gb = memory_limit_gb  # advisory only (see class note)
        self.execution_count = 0                # successful execute() calls

    def validate_code(self, code: str) -> tuple[bool, str]:
        """Validate code for security before execution.

        Returns:
            (ok, message): ok is False when a forbidden construct is found;
            message names the offending construct, or is "OK".
        """
        # Fast substring screen for obviously forbidden operations.
        for forbidden in self.FORBIDDEN_OPERATIONS:
            if forbidden in code:
                return False, f"Forbidden operation: {forbidden}"

        # Parse so imports can be checked structurally, not textually.
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return False, f"Syntax error: {e}"

        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                # Check EVERY alias: checking only names[0] let
                # "import json, os" slip through the whitelist.
                for alias in node.names:
                    if alias.name.split('.')[0] not in self.ALLOWED_MODULES:
                        return False, f"Module not allowed: {alias.name}"
            elif isinstance(node, ast.ImportFrom):
                # node.module is None for relative imports ("from . import x");
                # reject those explicitly instead of comparing None.
                if node.module is None or node.level:
                    return False, "Module not allowed: relative import"
                if node.module.split('.')[0] not in self.ALLOWED_MODULES:
                    return False, f"Module not allowed: {node.module}"

        return True, "OK"

    def execute(self, code: str, timeout: int = 60) -> Dict[str, Any]:
        """
        Execute code in the sandboxed namespace.

        Args:
            code: Python source to run.
            timeout: Advertised CPU limit in seconds. NOTE(review): not
                enforced here — exec() has no portable in-process timeout;
                enforce at the container level.

        Returns:
            {
                'output': str,               # Printed output
                'error': Optional[str],      # Error message if failed
                'variables': Dict[str, Any], # Non-underscore namespace entries
                'truncated': bool            # If output was truncated
            }
        """
        # Refuse anything that fails the security screen.
        is_valid, msg = self.validate_code(code)
        if not is_valid:
            return {
                'output': '',
                'error': f"Security validation failed: {msg}",
                'variables': self.namespace,
                'truncated': False
            }

        # Capture stdout while the code runs. NOTE: sys.stdout is
        # process-global, so this is not thread-safe.
        old_stdout = sys.stdout
        sys.stdout = captured_output = StringIO()
        error_msg = None
        try:
            exec(code, self.namespace)
            self.execution_count += 1
        except Exception as e:
            error_msg = f"{type(e).__name__}: {str(e)}"
        finally:
            # Always restore stdout, even on failure.
            sys.stdout = old_stdout

        output = captured_output.getvalue()
        # Keep tool output bounded so it can be fed back to the LLM.
        truncated = False
        if len(output) > 10_000:
            output = output[:10_000] + "\n... [truncated]"
            truncated = True

        return {
            'output': output,
            'error': error_msg,
            'variables': {
                k: v for k, v in self.namespace.items()
                if not k.startswith('_')
            },
            'truncated': truncated
        }

    def set_variable(self, name: str, value: Any) -> None:
        """Set a variable in the namespace."""
        self.namespace[name] = value

    def get_variable(self, name: str) -> Optional[Any]:
        """Get a variable from the namespace (None if absent)."""
        return self.namespace.get(name)

    def reset(self) -> None:
        """Reset the namespace and execution counter."""
        self.namespace = {}
        self.execution_count = 0
2. Recursive LLM Query Function
from anthropic import Anthropic
from typing import Optional, Dict, Any
class RecursiveLLMCaller:
    """
    Wraps the Anthropic client with recursion-depth limiting and cumulative
    token accounting for recursive (sub-agent) LLM calls.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "claude-sonnet-4-20250514",
        max_tokens: int = 4000,
        max_depth: int = 3
    ):
        self.client = Anthropic(api_key=api_key)
        self.model = model
        self.max_tokens = max_tokens
        self.max_depth = max_depth
        self.current_depth = 0
        self.total_tokens_used = 0
        self.call_count = 0

    def _assemble_prompt(self, prompt: str, context: Optional[str]) -> str:
        """Append (length-capped) context to the prompt, if any was given."""
        if not context:
            return prompt
        # Cap context at 100K characters to bound request size.
        if len(context) > 100_000:
            context = context[:100_000] + "\n... [truncated]"
        return f"{prompt}\n\nContext:\n{context}"

    def query(
        self,
        prompt: str,
        context: Optional[str] = None,
        temperature: float = 0.7
    ) -> Dict[str, Any]:
        """
        Issue one LLM call, honoring the recursion-depth budget.

        Args:
            prompt: The query to send to the LLM.
            context: Optional context appended to the prompt (capped at
                100K characters).
            temperature: Sampling temperature.

        Returns:
            Dict with 'response', 'tokens_used', 'depth', 'model'; an
            'error' key is present on API failure or when the depth
            budget is exhausted.
        """
        # Refuse to recurse past the configured budget.
        if self.current_depth >= self.max_depth:
            return {
                'response': f"Max recursion depth ({self.max_depth}) reached",
                'tokens_used': 0,
                'depth': self.current_depth,
                'model': self.model,
                'error': 'MAX_DEPTH_EXCEEDED'
            }

        full_prompt = self._assemble_prompt(prompt, context)

        self.current_depth += 1
        self.call_count += 1
        try:
            reply = self.client.messages.create(
                model=self.model,
                max_tokens=self.max_tokens,
                temperature=temperature,
                messages=[{"role": "user", "content": full_prompt}]
            )
            # Account for both directions of token spend.
            spent = reply.usage.input_tokens + reply.usage.output_tokens
            self.total_tokens_used += spent
            return {
                'response': reply.content[0].text,
                'tokens_used': spent,
                'depth': self.current_depth,
                'model': self.model
            }
        except Exception as e:
            return {
                'response': '',
                'tokens_used': 0,
                'depth': self.current_depth,
                'model': self.model,
                'error': str(e)
            }
        finally:
            # Unwind the depth counter whether the call succeeded or not.
            self.current_depth -= 1
3. RLM Orchestrator
from typing import Optional, Dict, Any, List
import time
class RLMOrchestrator:
    """
    Main RLM orchestration class that combines the sandboxed REPL with
    recursive LLM calls: a root LLM iteratively emits ```repl``` code blocks
    (executed against the loaded context) until it produces a final answer.
    """

    def __init__(
        self,
        anthropic_api_key: str,
        root_model: str = "claude-sonnet-4-20250514",
        sub_model: str = "claude-haiku-4-20250514",
        max_iterations: int = 20
    ):
        self.repl = SecureREPL()
        self.root_llm = RecursiveLLMCaller(
            anthropic_api_key,
            model=root_model,
            max_depth=1  # Root doesn't recurse
        )
        self.sub_llm = RecursiveLLMCaller(
            anthropic_api_key,
            model=sub_model,
            max_depth=3  # Sub-agents can recurse
        )
        self.max_iterations = max_iterations
        self.iteration_count = 0
        self.start_time = None
        # Per-run metrics; re-initialized at the start of every execute().
        self.metrics = self._fresh_metrics()

    @staticmethod
    def _fresh_metrics() -> Dict[str, Any]:
        """Return a zeroed metrics dict for a single execute() run."""
        return {
            'total_tokens': 0,
            'tool_calls': 0,
            'sub_llm_calls': 0,
            'execution_time': 0,
            'error_count': 0
        }

    @staticmethod
    def _parse_final(response_text: str) -> Optional[tuple]:
        """
        Look for a final-answer marker in the root LLM's reply.

        Returns:
            ('final', answer_text) for FINAL(...),
            ('var', variable_name) for FINAL_VAR(...),
            None when no marker is present.
        """
        if 'FINAL(' in response_text:
            start = response_text.index('FINAL(') + 6
            # Match to the LAST ')' so answers that themselves contain
            # parentheses (e.g. FINAL(f(x))) are not truncated; the previous
            # index(')') cut at the first closing paren.
            end = response_text.rfind(')')
            if end < start:
                end = len(response_text)  # unbalanced marker: take the rest
            return ('final', response_text[start:end])
        if 'FINAL_VAR(' in response_text:
            start = response_text.index('FINAL_VAR(') + 10
            end = response_text.index(')', start)
            return ('var', response_text[start:end])
        return None

    def _create_system_prompt(self, context_info: Dict[str, Any]) -> str:
        """Build the root-LLM system prompt describing the REPL contract."""
        return f"""You are tasked with answering a query with associated context. You can access, transform, and analyze this context interactively in a REPL environment that can recursively query sub-LLMs.
Your context is a {context_info['type']} with {context_info['total_length']:,} total characters.
The REPL environment is initialized with:
1. A 'context' variable containing extremely important information about your query
2. A 'llm_query' function that allows you to query a sub-LLM (handles ~100K chars)
3. The ability to use 'print()' statements to view output
CRITICAL: Be strategic with 'llm_query' calls - they are expensive. Batch information when possible (aim for ~50K characters per call). If you have 1000 items to process, chunk into groups of 10-20 rather than making 1000 individual calls.
Make sure to explicitly look through the entire context before answering your query. An example strategy:
1. Examine context structure
2. Chunk strategically
3. Query sub-LLMs on chunks
4. Synthesize results
When you want to execute Python code, wrap it in triple backticks with 'repl' language identifier.
When done, provide your final answer inside FINAL(your answer here) or FINAL_VAR(variable_name).
Think step by step and execute immediately - don't just plan."""

    def execute(
        self,
        query: str,
        context: Any,
        context_type: str = "string"
    ) -> Dict[str, Any]:
        """
        Execute RLM on a query with context.

        Args:
            query: User's question.
            context: Data to analyze (string, list, dict, etc.).
            context_type: Description of context type (used in the prompt).

        Returns:
            {
                'answer': str,
                'metrics': Dict,           # per-run counters
                'trajectory': List[Dict],  # step-by-step execution log
                'success': bool
            }
        """
        self.start_time = time.time()
        self.iteration_count = 0
        # Reset per-run counters: previously metrics accumulated across
        # execute() calls, inflating every run after the first.
        self.metrics = self._fresh_metrics()

        # Load context into the REPL namespace.
        self.repl.set_variable('context', context)

        def llm_query_wrapper(prompt: str) -> str:
            """Bridge from sandboxed code to the sub-LLM, with accounting."""
            self.metrics['sub_llm_calls'] += 1
            result = self.sub_llm.query(prompt)
            self.metrics['total_tokens'] += result['tokens_used']
            return result['response']

        self.repl.set_variable('llm_query', llm_query_wrapper)

        context_info = {
            'type': context_type,
            'total_length': len(str(context))
        }
        system_prompt = self._create_system_prompt(context_info)

        trajectory: List[Dict[str, Any]] = []
        # Roles here are informational only: the root LLM receives the
        # joined *contents* of this history as a single prompt.
        conversation_history = [
            {"role": "assistant", "content": system_prompt},
            {"role": "user", "content": f"Query: {query}\n\nAnalyze the 'context' variable to answer this query."}
        ]
        final_answer = None

        while self.iteration_count < self.max_iterations:
            self.iteration_count += 1

            # Get next action from the root LLM.
            root_response = self.root_llm.query(
                "\n".join([m["content"] for m in conversation_history])
            )
            self.metrics['total_tokens'] += root_response['tokens_used']
            response_text = root_response['response']

            # 1) Did the model declare a final answer?
            parsed = self._parse_final(response_text)
            if parsed is not None:
                kind, payload = parsed
                if kind == 'final':
                    final_answer = payload
                else:  # 'var': the answer lives in a REPL variable
                    final_answer = str(self.repl.get_variable(payload))
                trajectory.append({
                    'iteration': self.iteration_count,
                    'action': 'final_answer',
                    'content': final_answer
                })
                break

            # 2) Did the model emit code to execute?
            if '```repl' in response_text:
                start = response_text.index('```repl') + 7
                end = response_text.index('```', start)
                code = response_text[start:end].strip()

                execution_result = self.repl.execute(code)
                self.metrics['tool_calls'] += 1
                if execution_result['error']:
                    self.metrics['error_count'] += 1

                trajectory.append({
                    'iteration': self.iteration_count,
                    'action': 'repl_execution',
                    'code': code,
                    'output': execution_result['output'],
                    'error': execution_result['error']
                })
                # Feed the execution result back into the conversation.
                conversation_history.append({
                    "role": "assistant",
                    "content": response_text
                })
                conversation_history.append({
                    "role": "user",
                    "content": f"Execution output:\n{execution_result['output']}\n\nContinue or provide final answer."
                })
            else:
                # 3) Pure reasoning turn — record it and nudge the model on.
                trajectory.append({
                    'iteration': self.iteration_count,
                    'action': 'reasoning',
                    'content': response_text
                })
                conversation_history.append({
                    "role": "assistant",
                    "content": response_text
                })
                conversation_history.append({
                    "role": "user",
                    "content": "Continue your analysis or provide final answer."
                })

        # Finalize run metrics.
        self.metrics['execution_time'] = time.time() - self.start_time

        return {
            'answer': final_answer or "No final answer provided",
            'metrics': self.metrics,
            'trajectory': trajectory,
            'success': final_answer is not None
        }
Token Economics
Budget Calculator
from dataclasses import dataclass
from typing import Literal
@dataclass
class TokenBudget:
    """Token budget configuration for different task types."""

    # Base multipliers from RLM paper
    CHAT_BASELINE = 1_000
    SINGLE_AGENT_MULTIPLIER = 4
    MULTI_AGENT_MULTIPLIER = 15

    # CODITECT task budgets. Class-level and shared: treat as read-only
    # (calculate_budget copies before adjusting).
    BUDGETS = {
        'simple_query': {
            'lead_agent_tokens': 5_000,
            'sub_agent_tokens': 2_000,
            'max_agents': 2,
            'expected_cost_usd': 0.05
        },
        'document_processing': {
            'lead_agent_tokens': 15_000,
            'sub_agent_tokens': 5_000,
            'max_agents': 5,
            'expected_cost_usd': 0.25
        },
        'workflow_automation': {
            'lead_agent_tokens': 50_000,
            'sub_agent_tokens': 10_000,
            'max_agents': 10,
            'expected_cost_usd': 1.50
        },
        'deep_research': {
            'lead_agent_tokens': 100_000,
            'sub_agent_tokens': 20_000,
            'max_agents': 15,
            'expected_cost_usd': 5.00
        }
    }

    @classmethod
    def calculate_budget(
        cls,
        task_type: Literal['simple_query', 'document_processing', 'workflow_automation', 'deep_research'],
        input_size_tokens: int
    ) -> dict:
        """
        Calculate token budget for a task.

        Args:
            task_type: One of the BUDGETS keys.
            input_size_tokens: Size of the input; inputs over 100K tokens
                get a 20% lead-agent budget bump.

        Returns:
            {
                'total_budget': int,
                'lead_agent_budget': int,
                'sub_agent_budget': int,
                'max_sub_calls': int,
                'estimated_cost': float
            }
        """
        # Copy so the shared class-level table is never mutated: the
        # previous in-place update compounded the 20% large-input bump
        # on every call for the same task type.
        budget = dict(cls.BUDGETS[task_type])

        # Adjust for input size: add 20% for large inputs.
        if input_size_tokens > 100_000:
            budget['lead_agent_tokens'] = int(budget['lead_agent_tokens'] * 1.2)

        max_sub_calls = (
            budget['lead_agent_tokens'] // budget['sub_agent_tokens']
        )
        total_budget = (
            budget['lead_agent_tokens'] +
            (budget['sub_agent_tokens'] * max_sub_calls)
        )
        return {
            'total_budget': total_budget,
            'lead_agent_budget': budget['lead_agent_tokens'],
            'sub_agent_budget': budget['sub_agent_tokens'],
            'max_sub_calls': max_sub_calls,
            'estimated_cost': budget['expected_cost_usd']
        }

    @classmethod
    def calculate_roi(
        cls,
        task_type: str,
        time_saved_hours: float,
        hourly_rate: float = 50.0
    ) -> float:
        """
        Calculate ROI for RLM vs manual work.

        Args:
            task_type: Type of task (a BUDGETS key).
            time_saved_hours: Hours saved by automation.
            hourly_rate: Cost of manual labor per hour.

        Returns:
            ROI multiplier (e.g., 400.0 means 400x ROI); infinity when the
            RLM cost is zero.
        """
        rlm_cost = cls.BUDGETS[task_type]['expected_cost_usd']
        manual_cost = time_saved_hours * hourly_rate
        return manual_cost / rlm_cost if rlm_cost > 0 else float('inf')
Error Handling Patterns
Circuit Breaker Implementation
import time
from enum import Enum
from typing import Callable, Any
class CircuitState(Enum):
    """Lifecycle states of a CircuitBreaker."""
    CLOSED = "closed"        # Normal operation: calls pass through
    OPEN = "open"            # Failing: calls are rejected
    HALF_OPEN = "half_open"  # Probing whether the downstream recovered


class CircuitBreaker:
    """
    Prevent cascading failures across agent calls.

    After `failure_threshold` failures the breaker opens and rejects calls
    until `recovery_timeout` seconds have passed; it then lets
    `half_open_requests` probe calls through before fully closing again.
    """

    def __init__(
        self,
        failure_threshold: int = 3,
        recovery_timeout: float = 60.0,
        half_open_requests: int = 1
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = CircuitState.CLOSED
        self.success_count = 0

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """
        Execute `func` under circuit breaker protection.

        Raises:
            CircuitBreakerOpen: if the circuit is open and the recovery
                timeout has not yet elapsed.
        """
        if self.state is CircuitState.OPEN:
            # Has the recovery timeout passed since the last failure?
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise CircuitBreakerOpen(
                    f"Circuit breaker is open. Try again in "
                    f"{self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
                )

        try:
            result = await func(*args, **kwargs)
        except Exception:
            await self._on_failure()
            raise
        await self._on_success()
        return result

    async def _on_success(self):
        """Reset failure tracking; close the circuit once enough probes pass."""
        self.failure_count = 0
        if self.state is CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.half_open_requests:
                self.state = CircuitState.CLOSED

    async def _on_failure(self):
        """Record a failure and open the circuit at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""
    pass
Checkpoint System
import redis
import pickle
from typing import Any, Optional
from dataclasses import dataclass, asdict
import time
@dataclass
class Checkpoint:
    """State checkpoint for recovery.

    Point-in-time snapshot of an RLM session; CheckpointStore pickles it
    into Redis so an interrupted run can resume.
    """
    agent_state: dict           # agent/orchestrator state to restore
    completed_tasks: list       # tasks finished before the snapshot
    pending_tasks: list         # tasks still to run on resume
    token_usage: int            # tokens consumed so far
    timestamp: float            # creation time (epoch seconds)
    error_count: int = 0        # errors observed so far
    recovery_attempts: int = 0  # times recovery has already been attempted
    partial_results: Optional[dict] = None  # results gathered so far, if any
class CheckpointStore:
    """Redis-backed checkpoint storage.

    Checkpoints are pickled under ``rlm:checkpoint:<session_id>`` with a
    24-hour TTL.

    NOTE(review): pickle deserialization executes arbitrary code — this is
    only safe if the Redis instance is fully trusted; confirm deployment.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.checkpoint_ttl = 86400  # seconds (24 hours)

    @staticmethod
    def _key(session_id: str) -> str:
        """Build the Redis key for a session's checkpoint."""
        return f"rlm:checkpoint:{session_id}"

    def save(self, session_id: str, checkpoint: Checkpoint) -> None:
        """Persist *checkpoint*, resetting its TTL."""
        payload = pickle.dumps(checkpoint)
        self.redis_client.setex(self._key(session_id), self.checkpoint_ttl, payload)

    def get_latest(self, session_id: str) -> Optional[Checkpoint]:
        """Return the stored checkpoint, or None if absent or expired."""
        raw = self.redis_client.get(self._key(session_id))
        return pickle.loads(raw) if raw else None

    def clear(self, session_id: str) -> None:
        """Delete the checkpoint after successful completion."""
        self.redis_client.delete(self._key(session_id))
Production Code Examples
Document Processing with RLM
class CODITECTDocumentProcessor:
    """
    Process long documents with RLM.

    Example: 500-page contracts, comprehensive reports.
    """

    def __init__(self, orchestrator: RLMOrchestrator):
        self.orchestrator = orchestrator

    def analyze_contract(
        self,
        contract_text: str,
        query: str
    ) -> Dict[str, Any]:
        """
        Run *query* against the full text of a contract.

        Args:
            contract_text: Full contract text (500K+ tokens).
            query: Analysis question (e.g., "Find all liability clauses").

        Returns:
            {'answer': str, 'metrics': Dict, 'success': bool}
        """
        outcome = self.orchestrator.execute(
            query=query,
            context=contract_text,
            context_type="legal contract"
        )
        return {
            'answer': outcome['answer'],
            'metrics': outcome['metrics'],
            'success': outcome['success']
        }

    def batch_process_documents(
        self,
        documents: List[Dict[str, str]],
        query_template: str
    ) -> List[Dict[str, Any]]:
        """
        Analyze each document in turn — one sequential orchestrator run per
        document (not parallel).

        Args:
            documents: List of {'id': str, 'text': str} records.
            query_template: Query string with a {doc_id} placeholder.

        Returns:
            One result dict per document, tagged with 'document_id'.
        """
        outcomes: List[Dict[str, Any]] = []
        for doc in documents:
            analysis = self.analyze_contract(
                contract_text=doc['text'],
                query=query_template.format(doc_id=doc['id'])
            )
            outcomes.append({'document_id': doc['id'], **analysis})
        return outcomes
Multi-Repository Code Analysis
class CODITECTCodebaseAnalyzer:
    """
    Analyze multiple codebases with RLM.

    Example: Integration analysis, migration planning.
    """
    def __init__(self, orchestrator: RLMOrchestrator):
        self.orchestrator = orchestrator

    def analyze_integration(
        self,
        repo_a: Dict[str, str],  # {filepath: content}
        repo_b: Dict[str, str],
        integration_goal: str
    ) -> Dict[str, Any]:
        """
        Analyze integration between two repositories.

        Args:
            repo_a: First repository files ({filepath: content}).
            repo_b: Second repository files.
            integration_goal: What to integrate.

        Returns:
            {'strategy': str, 'metrics': Dict, 'success': bool}

        NOTE(review): the original doc promised keys 'strategy',
        'affected_files', 'implementation_steps', 'risks', but only
        'strategy' (the raw answer), 'metrics', 'success' are returned —
        confirm the intended contract with callers.
        """
        # Combine repos into structured context
        context = {
            'repo_a': repo_a,
            'repo_b': repo_b
        }
        query = f"""
        Analyze integration between repo_a and repo_b.
        Goal: {integration_goal}
        Provide:
        1. Integration strategy
        2. List of files that need modification
        3. Step-by-step implementation plan
        4. Potential risks and mitigations
        """
        result = self.orchestrator.execute(
            query=query,
            context=context,
            context_type="multi-repository codebase"
        )
        # Parse structured output
        # (In production, use structured outputs or JSON parsing)
        return {
            'strategy': result['answer'],
            'metrics': result['metrics'],
            'success': result['success']
        }
Quality Gates
from dataclasses import dataclass
from typing import List, Tuple
@dataclass
class QualityGate:
    """Quality gates applied to RLM execution trajectories.

    `evaluate` checks a run's metrics dict against the configured thresholds
    and reports every gate that was violated.
    """

    # Token efficiency: maximum average tokens per tool call
    max_tokens_per_call: int = 1000
    # Maximum tolerated fraction of tool calls that errored
    max_error_rate: float = 0.3
    # Recursion depth ceiling (configuration; not checked in evaluate)
    max_recursion_depth: int = 3
    # Verification cycle ceiling (configuration; not checked in evaluate)
    max_verification_cycles: int = 2
    # Circuit breaker threshold (configuration; not checked in evaluate)
    max_failures_before_open: int = 3

    def evaluate(
        self,
        metrics: Dict[str, Any]
    ) -> Tuple[bool, List[str]]:
        """
        Evaluate an RLM run's metrics against every quality gate.

        Returns:
            (passed, violations): passed is True iff no gate was violated.
        """
        violations: List[str] = []
        calls = metrics['tool_calls']

        if calls > 0:
            # Token efficiency gate: average tokens spent per tool call.
            tokens_per_call = metrics['total_tokens'] / calls
            if tokens_per_call > self.max_tokens_per_call:
                violations.append(
                    f"Inefficient token usage: {tokens_per_call:.0f} tokens/call "
                    f"(max: {self.max_tokens_per_call})"
                )
            # Error-propagation gate: fraction of tool calls that failed.
            failure_fraction = metrics['error_count'] / calls
            if failure_fraction > self.max_error_rate:
                violations.append(
                    f"High error rate: {failure_fraction:.1%} "
                    f"(max: {self.max_error_rate:.1%})"
                )

        # Fan-out gate: avoid Qwen3-style one-call-per-item explosions.
        if metrics['sub_llm_calls'] > 100:
            violations.append(
                f"Excessive sub-LLM calls: {metrics['sub_llm_calls']} "
                f"(consider batching)"
            )

        return not violations, violations
Deployment Checklist
Pre-Production
- Security audit of REPL sandbox
- Load testing with 10M+ token inputs
- Circuit breaker validation
- Checkpoint/recovery testing
- Cost monitoring alerts configured
- Quality gates implemented
- Logging and observability setup
Production Rollout
- Deploy to 3 pilot customers
- Monitor 95th percentile costs
- Track quality gate violations
- Measure customer satisfaction
- Document failure patterns
- Optimize based on trajectory analysis
Monitoring Metrics
# Dashboard metric keys mapped to human-readable descriptions, for
# production monitoring of RLM deployments.
PRODUCTION_METRICS = {
    'latency_p50': 'RLM response time (median)',
    'latency_p95': 'RLM response time (95th percentile)',
    'cost_per_query_p50': 'Median cost per query',
    'cost_per_query_p95': '95th percentile cost',
    'success_rate': 'Percentage of successful completions',
    'quality_gate_pass_rate': 'Percentage passing quality gates',
    'circuit_breaker_trips': 'Number of circuit breaker activations',
    'checkpoint_recoveries': 'Number of checkpoint recoveries'
}
Performance Optimization Tips
1. Model Selection Strategy
# Task category -> preferred model, trading capability against cost.
MODEL_ROUTING = {
    'strategic_planning': 'claude-sonnet-4',  # Most capable
    'routine_processing': 'claude-haiku-4',  # Cost-efficient
    'code_analysis': 'qwen3-coder-480b',  # Specialized
}
2. Async Sub-Calls (3x Speedup)
import asyncio
async def parallel_sub_calls(queries: List[str]) -> List[str]:
    """
    Execute multiple sub-LLM queries concurrently and return their responses.

    ``sub_llm`` is assumed to be a module-level RecursiveLLMCaller
    (not defined in this snippet — confirm against the deployment module).
    Its ``query`` method is a *blocking* synchronous call, so each one is
    dispatched to a worker thread via ``asyncio.to_thread``; the previous
    version handed the plain dict results of ``sub_llm.query(...)`` straight
    to ``asyncio.gather``, which raises TypeError because dicts are not
    awaitable.
    """
    tasks = [
        asyncio.to_thread(sub_llm.query, query)
        for query in queries
    ]
    results = await asyncio.gather(*tasks)
    return [r['response'] for r in results]
3. Intelligent Chunking
def smart_chunk(text: str, target_size: int = 50_000) -> List[str]:
    """
    Split *text* into chunks of roughly ``target_size`` characters, breaking
    only at blank-line (paragraph) boundaries — better than fixed slices
    like text[:50000], text[50000:100000], ...

    Note: a single paragraph longer than ``target_size`` is still emitted
    as one oversized chunk; paragraphs are never split internally.
    """
    chunks: List[str] = []
    pending: List[str] = []
    pending_len = 0

    for paragraph in text.split('\n\n'):
        # Flush the accumulated chunk before it would exceed the target.
        if pending and pending_len + len(paragraph) > target_size:
            chunks.append('\n\n'.join(pending))
            pending = []
            pending_len = 0
        pending.append(paragraph)
        pending_len += len(paragraph)

    if pending:
        chunks.append('\n\n'.join(pending))
    return chunks
Next Steps
- Review architecture with engineering team
- Set up development environment (Redis, sandboxed Python)
- Implement core components (REPL, orchestrator)
- Run benchmarks on 20 customer documents
- Iterate based on quality gate violations
Document Version: 1.0
Last Updated: January 13, 2026
For Questions: Contact Engineering Lead