MoE Consensus Algorithm Specification
Overview
The consensus algorithm aggregates votes from 5 analyst agents to determine document classification with confidence scoring and optional judge validation.
Constants
# Threshold Configuration
ANALYST_COUNT = 5
JUDGE_COUNT = 3

# Analyst dispatch order. Index i of the dispatch results maps to
# ANALYST_NAMES[i]; dispatch_analysts relies on this when building a
# null vote for a failed analyst. (Referenced but never defined in the
# original spec.)
ANALYST_NAMES = ["structural", "content", "metadata", "semantic", "pattern"]

# Confidence Thresholds
MIN_ANALYST_CONFIDENCE = 0.70    # Reject votes below this
AGREEMENT_THRESHOLD = 0.60       # Minimum 3/5 analysts must agree
AUTO_APPROVAL_CONFIDENCE = 0.90  # Skip judges if above
JUDGE_APPROVAL_CONFIDENCE = 0.85 # Accept with judge approval
# Below this requires human review. Intentionally equal to
# JUDGE_APPROVAL_CONFIDENCE: the 0.85-0.90 band goes to judges,
# everything under 0.85 escalates.
ESCALATION_CONFIDENCE = 0.85

# Timeout Configuration
ANALYST_TIMEOUT_MS = 30000  # 30 seconds per analyst
JUDGE_TIMEOUT_MS = 10000    # 10 seconds per judge
TOTAL_TIMEOUT_MS = 60000    # 1 minute total

# Valid Classification Types (ADR-018)
VALID_TYPES = {
    "agent", "command", "skill", "script", "hook",
    "guide", "reference", "adr", "workflow", "config",
}
Data Structures
@dataclass
class AnalystVote:
    """One analyst's classification vote for a single document."""
    agent: str  # "structural", "content", etc.
    classification: str  # Document type
    confidence: float  # 0.0 - 1.0
    reasoning: str  # Human-readable explanation
    duration_ms: int  # Processing time
@dataclass
class JudgeDecision:
    """A single judge's approve/veto verdict on a proposed classification."""
    judge: str  # "consistency", "quality", "domain"
    approved: bool  # True = approve, False = veto
    reason: str  # Explanation
    duration_ms: int  # Judge processing time
@dataclass
class ConsensusResult:
    """Aggregated outcome of the analyst vote, plus any judge review.

    Defaults are provided for the trailing fields because
    calculate_consensus constructs this object before judges run and
    before an approval_type is decided; the original declaration had no
    defaults, so those partial constructions would have raised TypeError.
    """
    # Final document type; None when no valid votes survived filtering.
    classification: Optional[str]
    # Weighted confidence (mean confidence of the agreeing analysts).
    confidence: float
    # Proportion of agreeing analysts, measured against the full panel.
    agreement_ratio: float
    # Values actually produced by the pipeline: "AUTO_APPROVED",
    # "JUDGE_APPROVED", "ESCALATED". "PENDING" until the decision logic runs.
    approval_type: str = "PENDING"
    # All analyst votes that fed the consensus.
    votes: List[AnalystVote] = field(default_factory=list)
    # Populated only if judges were invoked.
    judge_decisions: List[JudgeDecision] = field(default_factory=list)
    # Set only when approval_type == "ESCALATED".
    escalation_reason: Optional[str] = None
@dataclass
class ClassificationResult:
    """Top-level output of classify_document: consensus plus run metadata."""
    document_path: str  # Path of the classified document
    result: ConsensusResult  # Final consensus outcome
    timestamp: datetime  # When classification completed
    processing_time_ms: int  # End-to-end pipeline duration
Algorithm Pseudocode
Main Classification Flow
def classify_document(document: Document) -> ClassificationResult:
    """
    Main entry point for document classification.

    Runs the full pipeline: parallel analyst dispatch, vote filtering,
    consensus calculation, threshold-based decision logic, and audit
    logging. Kept synchronous for callers; the async analyst fan-out is
    driven to completion internally.
    """
    start_time = now()
    # Step 1: Dispatch to all analysts in parallel.
    # dispatch_analysts is a coroutine; the original called it without
    # awaiting, which would have handed a coroutine object to the filter.
    votes = asyncio.run(dispatch_analysts(document))
    # Step 2: Filter invalid votes
    valid_votes = filter_valid_votes(votes)
    # Step 3: Calculate consensus
    consensus = calculate_consensus(valid_votes)
    # Step 4: Apply decision logic
    result = apply_decision_logic(document, consensus, valid_votes)
    # Step 5: Log audit trail
    log_audit_trail(document, result)
    return ClassificationResult(
        document_path=document.path,
        result=result,
        timestamp=now(),
        # assumes now() yields a millisecond timestamp so the difference
        # is an int of ms — TODO confirm against the now() helper
        processing_time_ms=now() - start_time,
    )
Step 1: Dispatch Analysts
async def dispatch_analysts(document: Document) -> List[AnalystVote]:
    """
    Dispatch document to all 5 analysts in parallel.

    Each analyst is bounded by ANALYST_TIMEOUT_MS. A timeout or any other
    failure yields a null vote for that analyst instead of failing the
    whole batch.
    """
    analyses = [
        analyze_structural(document),
        analyze_content(document),
        analyze_metadata(document),
        analyze_semantic(document),
        analyze_pattern(document),
    ]
    # Enforce the per-analyst timeout. The original comment promised a
    # timeout, but asyncio.gather alone never applies one; a TimeoutError
    # now surfaces as an exception result and becomes a null vote below.
    tasks = [
        asyncio.wait_for(coro, timeout=ANALYST_TIMEOUT_MS / 1000)
        for coro in analyses
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    votes = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            # Log error, create null vote
            log_error(f"Analyst {i} failed: {result}")
            votes.append(create_null_vote(ANALYST_NAMES[i]))
        else:
            votes.append(result)
    return votes
Step 2: Filter Invalid Votes
def filter_valid_votes(votes: List[AnalystVote]) -> List[AnalystVote]:
    """
    Remove votes below minimum confidence threshold.

    A vote survives only if its confidence meets MIN_ANALYST_CONFIDENCE
    and its classification is one of VALID_TYPES; every rejection is logged.
    """
    kept = []
    for vote in votes:
        # Guard 1: confidence floor.
        if vote.confidence < MIN_ANALYST_CONFIDENCE:
            log_info(f"Low confidence vote ({vote.confidence}) from {vote.agent}")
            continue
        # Guard 2: classification must be a known type.
        if vote.classification not in VALID_TYPES:
            log_warning(f"Invalid type '{vote.classification}' from {vote.agent}")
            continue
        kept.append(vote)
    return kept
Step 3: Calculate Consensus
def calculate_consensus(votes: List[AnalystVote]) -> ConsensusResult:
    """
    Calculate weighted consensus from analyst votes.

    Returns a PENDING result carrying the majority classification and its
    weighted confidence; apply_decision_logic assigns the final
    approval_type. With no valid votes, returns an immediate escalation.
    """
    if not votes:
        # No analyst produced a usable vote - escalate immediately.
        # All fields are passed explicitly: the original omitted votes and
        # judge_decisions, which fails for a dataclass without defaults.
        return ConsensusResult(
            classification=None,
            confidence=0.0,
            agreement_ratio=0.0,
            approval_type="ESCALATED",
            votes=[],
            judge_decisions=[],
            escalation_reason="NO_VALID_VOTES",
        )
    # Count votes per classification
    vote_counts = Counter(v.classification for v in votes)
    # Find majority classification
    majority_type, majority_count = vote_counts.most_common(1)[0]
    # Agreement is measured against the full analyst panel (ANALYST_COUNT),
    # not just the surviving votes, so filtered-out analysts count as dissent
    # (matches Edge Case 2 in the spec).
    agreement_ratio = majority_count / ANALYST_COUNT
    # Weighted confidence: average over analysts that voted with the majority.
    majority_votes = [v for v in votes if v.classification == majority_type]
    weighted_confidence = sum(v.confidence for v in majority_votes) / len(majority_votes)
    # approval_type stays "PENDING" here; apply_decision_logic overwrites it
    # on every path. (The original omitted approval_type entirely.)
    return ConsensusResult(
        classification=majority_type,
        confidence=weighted_confidence,
        agreement_ratio=agreement_ratio,
        approval_type="PENDING",
        votes=votes,
        judge_decisions=[],
        escalation_reason=None,
    )
Step 4: Apply Decision Logic
def apply_decision_logic(
    document: Document,
    consensus: ConsensusResult,
    votes: List[AnalystVote]
) -> ConsensusResult:
    """
    Apply thresholds and determine final decision.

    Check order: agreement gate, auto-approval gate, judge gate; anything
    below JUDGE_APPROVAL_CONFIDENCE escalates to a human.
    """
    # Check 1: Sufficient agreement?
    if consensus.agreement_ratio < AGREEMENT_THRESHOLD:
        return escalate_to_human(
            consensus,
            reason="NO_CONSENSUS",
            detail=f"Only {consensus.agreement_ratio*100:.0f}% agreement (need {AGREEMENT_THRESHOLD*100:.0f}%)"
        )
    # Check 2: Auto-approval threshold?
    if consensus.confidence >= AUTO_APPROVAL_CONFIDENCE:
        consensus.approval_type = "AUTO_APPROVED"
        return consensus
    # Check 3: Judge approval threshold?
    if consensus.confidence >= JUDGE_APPROVAL_CONFIDENCE:
        # invoke_judges is a coroutine; the original called it without
        # awaiting and would have received a coroutine object here instead
        # of a JudgeResult. This runs outside the analyst event loop, so
        # asyncio.run is safe.
        judge_result = asyncio.run(invoke_judges(document, consensus))
        if judge_result.all_approved:
            consensus.approval_type = "JUDGE_APPROVED"
            consensus.judge_decisions = judge_result.decisions
            return consensus
        return escalate_to_human(
            consensus,
            reason="JUDGE_VETO",
            detail=judge_result.veto_reasons
        )
    # Check 4: Below minimum threshold
    return escalate_to_human(
        consensus,
        reason="LOW_CONFIDENCE",
        detail=f"Confidence {consensus.confidence*100:.0f}% below {JUDGE_APPROVAL_CONFIDENCE*100:.0f}% threshold"
    )
Judge Invocation (called from Step 4 decision logic)
async def invoke_judges(
    document: Document,
    consensus: ConsensusResult
) -> JudgeResult:
    """
    Invoke all 3 judges for validation.

    Any veto — including a judge error or timeout — blocks approval and
    forces escalation. Each judge is bounded by JUDGE_TIMEOUT_MS.
    NOTE(review): JudgeResult is assumed declared elsewhere in the spec —
    it is not among the dataclasses above; confirm its definition.
    """
    # Enforce the per-judge timeout (JUDGE_TIMEOUT_MS was declared but never
    # applied in the original); a timeout surfaces as an exception result
    # and is treated as an automatic veto below.
    tasks = [
        asyncio.wait_for(coro, timeout=JUDGE_TIMEOUT_MS / 1000)
        for coro in (
            judge_consistency(document, consensus),
            judge_quality(consensus),
            judge_domain(document, consensus),
        )
    ]
    decisions = await asyncio.gather(*tasks, return_exceptions=True)
    vetos = []
    valid_decisions = []
    for decision in decisions:
        if isinstance(decision, Exception):
            # Judge error = automatic veto
            vetos.append(("error", str(decision)))
            continue
        valid_decisions.append(decision)
        if not decision.approved:
            vetos.append((decision.judge, decision.reason))
    return JudgeResult(
        all_approved=len(vetos) == 0,
        decisions=valid_decisions,
        veto_reasons=vetos
    )
Individual Judge Implementations
async def judge_consistency(document: Document, consensus: ConsensusResult) -> JudgeDecision:
    """
    Consistency Judge: Check cross-references and related documents.

    Vetoes the first related document whose type is incompatible with the
    proposed classification. Declared async because invoke_judges consumes
    the judges through asyncio.gather, which requires awaitables (the
    original sync `def` would have broken that call).
    """
    start = now()
    # Get related documents (same directory, linked docs)
    related_docs = get_related_documents(document)
    for related in related_docs:
        related_type = get_document_type(related)
        # Veto on the first incompatible related type.
        if not types_compatible(consensus.classification, related_type):
            return JudgeDecision(
                judge="consistency",
                approved=False,
                reason=f"Incompatible with related doc {related.path} (type: {related_type})",
                duration_ms=now() - start
            )
    return JudgeDecision(
        judge="consistency",
        approved=True,
        reason="Consistent with related documents",
        duration_ms=now() - start
    )
async def judge_quality(consensus: ConsensusResult) -> JudgeDecision:
    """
    Quality Judge: Validate vote distribution and confidence scores.

    Vetoes when a single high-confidence outlier props up an otherwise
    low-confidence majority. Declared async because invoke_judges consumes
    the judges through asyncio.gather, which requires awaitables.
    """
    start = now()
    # Confidences of the analysts that voted with the winning classification.
    confidences = [v.confidence for v in consensus.votes if v.classification == consensus.classification]
    if len(confidences) >= 2:
        # Remove exactly ONE maximal vote. The original filtered by value
        # (`c != max_conf`), which drops ALL ties at the max while still
        # dividing by len-1 — e.g. two 0.96 votes averaged to 0.0 and
        # triggered a spurious outlier veto.
        ranked = sorted(confidences)
        max_conf = ranked[-1]
        others = ranked[:-1]
        others_avg = sum(others) / len(others)
        # Outlier detection: one vote much higher than others
        if max_conf > 0.95 and others_avg < 0.75:
            return JudgeDecision(
                judge="quality",
                approved=False,
                reason=f"Single outlier ({max_conf}) driving decision (others avg: {others_avg:.2f})",
                duration_ms=now() - start
            )
    # Check confidence spread
    if confidences:
        spread = max(confidences) - min(confidences)
        if spread > 0.30:
            # High spread but still approve with warning
            pass
    return JudgeDecision(
        judge="quality",
        approved=True,
        reason="Vote quality acceptable",
        duration_ms=now() - start
    )
async def judge_domain(document: Document, consensus: ConsensusResult) -> JudgeDecision:
    """
    Domain Judge: Enforce CODITECT-specific rules.

    Rules: agents must live under /agents/; commands and ADRs must contain
    their required template sections. Declared async because invoke_judges
    consumes the judges through asyncio.gather, which requires awaitables.
    """
    start = now()
    # Rule 1: Agents must be in agents/ directory
    if consensus.classification == "agent":
        if "/agents/" not in document.path:
            return JudgeDecision(
                judge="domain",
                approved=False,
                reason="Agent classification but not in /agents/ directory",
                duration_ms=now() - start
            )
    # Rule 2: Commands must have specific sections
    if consensus.classification == "command":
        required_sections = ["## Invocation", "## Arguments"]
        if not all(section in document.content for section in required_sections):
            return JudgeDecision(
                judge="domain",
                approved=False,
                reason="Command missing required sections",
                duration_ms=now() - start
            )
    # Rule 3: ADRs must follow ADR template
    if consensus.classification == "adr":
        required_sections = ["## Status", "## Context", "## Decision"]
        if not all(section in document.content for section in required_sections):
            return JudgeDecision(
                judge="domain",
                approved=False,
                reason="ADR missing required sections (Status/Context/Decision)",
                duration_ms=now() - start
            )
    return JudgeDecision(
        judge="domain",
        approved=True,
        reason="CODITECT domain rules satisfied",
        duration_ms=now() - start
    )
Escalation Handler
def escalate_to_human(
    consensus: ConsensusResult,
    reason: str,
    detail: str
) -> ConsensusResult:
    """
    Mark classification for human review.

    Stamps the consensus as ESCALATED, records why, and opens a review
    ticket before handing the (mutated) consensus back to the caller.
    """
    consensus.approval_type = "ESCALATED"
    consensus.escalation_reason = f"{reason}: {detail}"
    # Open a ticket so a human reviewer picks this document up.
    ticket_fields = {
        "classification": consensus.classification,
        "confidence": consensus.confidence,
        "reason": reason,
        "detail": detail,
        "votes": consensus.votes,
    }
    create_escalation_ticket(**ticket_fields)
    return consensus
Edge Cases
Case 1: All Analysts Timeout
# All 5 analysts return null votes
votes = [null_vote] * 5
# → Escalate with reason "NO_VALID_VOTES"
Case 2: Perfect Split (2-2-1)
votes = ["agent", "agent", "command", "command", "guide"]
# Agreement ratio = 2/5 = 0.40 < 0.60
# → Escalate with reason "NO_CONSENSUS"
Case 3: High Confidence Minority
votes = [
("agent", 0.95), # High confidence minority
("command", 0.72), # Lower confidence majority
("command", 0.75),
("command", 0.71),
("command", 0.73)
]
# Majority: "command" (4/5 = 0.80 agreement)
# Weighted confidence: (0.72+0.75+0.71+0.73)/4 = 0.7275
# → Below JUDGE_APPROVAL (0.85) → Escalate
Case 4: Semantic Analyst Disagrees
votes = [
("structural", "agent", 0.85),
("content", "agent", 0.90),
("metadata", "agent", 0.98),
("semantic", "command", 0.88), # LLM disagrees
("pattern", "agent", 0.91)
]
# Majority: "agent" (4/5 = 0.80)
# Weighted confidence: 0.91
# → AUTO_APPROVED (≥0.90)
Case 5: Frontmatter Has Explicit Type
# metadata analyst: confidence 0.99 because type: "reference" in YAML
votes = [
("structural", "guide", 0.70),
("content", "guide", 0.75),
("metadata", "reference", 0.99), # Explicit frontmatter
("semantic", "reference", 0.85),
("pattern", "guide", 0.72)
]
# Split: guide=3, reference=2 → agreement 3/5 = 0.60 meets the threshold,
# but weighted confidence (0.70+0.75+0.72)/3 ≈ 0.72 is below the 0.85 judge
# gate, so this escalates as LOW_CONFIDENCE and judges are never invoked.
# If frontmatter is to be authoritative, it must be handled upstream (e.g.
# by boosting the metadata vote), not by the domain judge.
Performance Characteristics
| Metric | Target | Notes |
|---|---|---|
| Parallel Analysis | <3s | 5 analysts run concurrently |
| Consensus Calculation | <10ms | In-memory computation |
| Judge Validation | <1s | Only invoked for 85-90% confidence |
| Total Pipeline | <5s | End-to-end classification |
| Throughput | 100/min | Batch processing mode |
Testing Strategy
Unit Tests
def test_consensus_unanimous():
    """Five identical votes yield full agreement and their shared confidence."""
    # The original passed bare (type, confidence) tuples, but
    # calculate_consensus reads .classification/.confidence attributes;
    # build real AnalystVote instances instead.
    votes = [AnalystVote("analyst", "type_a", 0.95, "reason", 0) for _ in range(5)]
    result = calculate_consensus(votes)
    assert result.agreement_ratio == 1.0
    assert result.confidence == 0.95
def test_consensus_majority():
    """A 3-2 split picks the majority type at 3/5 agreement."""
    # Use AnalystVote instances; the original tuples lack the attributes
    # calculate_consensus accesses.
    votes = (
        [AnalystVote("analyst", "type_a", 0.90, "reason", 0)] * 3
        + [AnalystVote("analyst", "type_b", 0.85, "reason", 0)] * 2
    )
    result = calculate_consensus(votes)
    assert result.classification == "type_a"
    assert result.agreement_ratio == 0.6
def test_escalation_no_consensus():
    """A five-way split (1/5 agreement) escalates before judges run."""
    # Use AnalystVote instances; the original tuples lack the attributes
    # calculate_consensus accesses.
    votes = [
        AnalystVote("analyst", t, 0.8, "reason", 0)
        for t in ("a", "b", "c", "d", "e")
    ]
    # The original referenced an undefined `doc`. The document is never
    # touched on the no-consensus path (judges are not invoked), so None
    # suffices here.
    result = apply_decision_logic(None, calculate_consensus(votes), votes)
    assert result.approval_type == "ESCALATED"
Integration Tests
- Test with 100 manually-labeled documents
- Measure accuracy vs ground truth
- Target: 99.9%+ accuracy
Specification Version: 1.0.0 Created: December 27, 2025 Author: AI Specialist Agent