# scripts/consensus.py

""" Consensus Calculator for MoE Classification System.

Implements the consensus algorithm specified in ADR-019:

  • Weighted voting based on analyst confidence
  • Agreement ratio calculation
  • Threshold-based auto-approval
  • Judge aggregation for final decision
  • Debate protocol for judge disagreement (H.3.3) """

from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Callable, TYPE_CHECKING

from .models import AnalystVote, JudgeDecision, ConsensusResult, ApprovalType

if TYPE_CHECKING:
    from .debate import DebateOrchestrator, JudgeEvaluation, DebateResult

@dataclass
class ConsensusConfig:
    """Configuration for consensus calculation.

    All thresholds are ratios in [0, 1] unless noted otherwise.
    """

    # Auto-approval thresholds: both must be met for analyst-only approval
    auto_approve_confidence: float = 0.85
    auto_approve_agreement: float = 0.80

    # Minimum thresholds for any approval
    min_confidence: float = 0.60
    min_agreement: float = 0.60

    # Escalation thresholds
    escalation_confidence: float = 0.50
    max_disagreement: int = 3  # Max different classifications before escalation

    # Judge requirements
    min_judges_required: int = 2
    judge_approval_threshold: float = 0.67  # 2/3 judges must approve

    # Debate protocol settings (H.3.3)
    enable_debate: bool = True
    debate_agreement_threshold: float = 0.80  # Trigger debate below this
    max_debate_rounds: int = 3

class ConsensusCalculator:
    """Calculates consensus from analyst votes and judge decisions.

    Pipeline: ``calculate_from_votes()`` builds a preliminary
    ConsensusResult from analyst votes; ``apply_judge_decisions()`` (or the
    debate-aware async variant) finalizes the approval status from judge
    decisions.
    """

    def __init__(self, config: Optional[ConsensusConfig] = None):
        self.config = config or ConsensusConfig()
        # Debate-protocol collaborators; unset until enable_debate_protocol().
        self._debate_orchestrator: Optional['DebateOrchestrator'] = None
        self._evaluation_callback: Optional[Callable] = None

    def enable_debate_protocol(
        self,
        orchestrator: Optional['DebateOrchestrator'] = None,
        evaluation_callback: Optional[Callable] = None
    ) -> None:
        """
        Enable debate protocol for judge disagreement resolution.

        Args:
            orchestrator: Custom DebateOrchestrator instance (creates default if None)
            evaluation_callback: Callback for judge re-evaluations during debate
                Signature: callback(persona_id, artifact, debate_context, context) -> JudgeEvaluation
        """
        if orchestrator is None:
            # Deferred import: .debate is only needed when the protocol is used.
            from .debate import DebateOrchestrator, DebateConfig
            orchestrator = DebateOrchestrator(DebateConfig(
                max_debate_rounds=self.config.max_debate_rounds,
                convergence_threshold=self.config.debate_agreement_threshold
            ))

        self._debate_orchestrator = orchestrator
        self._evaluation_callback = evaluation_callback

        if evaluation_callback:
            orchestrator.set_evaluation_callback(evaluation_callback)

    def calculate_from_votes(self, votes: List[AnalystVote]) -> ConsensusResult:
        """Calculate initial consensus from analyst votes only.

        Args:
            votes: List of analyst votes

        Returns:
            ConsensusResult with preliminary classification
        """
        if not votes:
            return ConsensusResult(
                classification=None,
                confidence=0.0,
                agreement_ratio=0.0,
                approval_type=ApprovalType.ESCALATED,
                votes=votes,
                escalation_reason="No analyst votes received"
            )

        # Calculate weighted classification scores
        scores = self._calculate_weighted_scores(votes)

        # Defensive: with non-empty votes every vote contributes a key,
        # so this branch should be unreachable in practice.
        if not scores:
            return ConsensusResult(
                classification=None,
                confidence=0.0,
                agreement_ratio=0.0,
                approval_type=ApprovalType.ESCALATED,
                votes=votes,
                escalation_reason="No valid classifications in votes"
            )

        # Get top classification by confidence-weighted score
        classification, weighted_score = max(scores.items(), key=lambda x: x[1])

        # Calculate agreement ratio (non-empty: the winner has >= 1 vote)
        agreeing_votes = [v for v in votes if v.classification == classification]
        agreement_ratio = len(agreeing_votes) / len(votes)

        # Calculate confidence (average of agreeing votes weighted by agreement)
        avg_confidence = sum(v.confidence for v in agreeing_votes) / len(agreeing_votes)
        confidence = avg_confidence * agreement_ratio

        # Check for auto-approval: both thresholds must be met
        if (confidence >= self.config.auto_approve_confidence and
                agreement_ratio >= self.config.auto_approve_agreement):
            return ConsensusResult(
                classification=classification,
                confidence=confidence,
                agreement_ratio=agreement_ratio,
                approval_type=ApprovalType.AUTO_APPROVED,
                votes=votes
            )

        # Check for escalation conditions
        escalation_reason = self._check_escalation(votes, confidence, agreement_ratio, scores)
        if escalation_reason:
            return ConsensusResult(
                classification=classification,
                confidence=confidence,
                agreement_ratio=agreement_ratio,
                approval_type=ApprovalType.ESCALATED,
                votes=votes,
                escalation_reason=escalation_reason
            )

        # Neither auto-approved nor escalated: needs judge review
        return ConsensusResult(
            classification=classification,
            confidence=confidence,
            agreement_ratio=agreement_ratio,
            approval_type=ApprovalType.PENDING,
            votes=votes
        )

    def apply_judge_decisions(
        self,
        result: ConsensusResult,
        decisions: List[JudgeDecision]
    ) -> ConsensusResult:
        """Apply judge decisions to finalize consensus.

        Mutates and returns *result*.

        Args:
            result: Preliminary consensus result
            decisions: List of judge decisions

        Returns:
            Updated ConsensusResult with final approval status
        """
        if result.approval_type == ApprovalType.AUTO_APPROVED:
            # Already auto-approved, just add judge decisions for audit
            result.judge_decisions = decisions
            return result

        if not decisions:
            result.approval_type = ApprovalType.ESCALATED
            result.escalation_reason = "No judge decisions received"
            return result

        # Count approvals
        approvals = sum(1 for d in decisions if d.approved)
        approval_ratio = approvals / len(decisions)

        result.judge_decisions = decisions

        # Check if enough judges approved
        if approval_ratio >= self.config.judge_approval_threshold:
            result.approval_type = ApprovalType.JUDGE_APPROVED
            return result

        # Check for veto (any judge with veto authority rejected).
        # NOTE(review): 'has_veto' defaults to True, so a rejecting judge
        # with no metadata flag is treated as having veto authority —
        # confirm this default is intentional (a False default would route
        # such rejections to the "insufficient approval" reason below).
        vetoes = [d for d in decisions if not d.approved and d.metadata.get('has_veto', True)]
        if vetoes:
            result.approval_type = ApprovalType.ESCALATED
            result.escalation_reason = f"Vetoed by {vetoes[0].judge}: {vetoes[0].reason}"
            return result

        # Not enough approvals
        result.approval_type = ApprovalType.ESCALATED
        rejections = [d.reason for d in decisions if not d.approved]
        result.escalation_reason = f"Insufficient judge approval ({approvals}/{len(decisions)}): {rejections[0] if rejections else 'Unknown'}"
        return result

    def _calculate_weighted_scores(self, votes: List[AnalystVote]) -> Dict[str, float]:
        """Calculate weighted scores for each classification.

        Each vote contributes its confidence to its classification's score.
        """
        scores: Dict[str, float] = {}

        for vote in votes:
            cls = vote.classification
            # Weight by confidence
            scores[cls] = scores.get(cls, 0.0) + vote.confidence

        return scores

    def _check_escalation(
        self,
        votes: List[AnalystVote],
        confidence: float,
        agreement_ratio: float,
        scores: Dict[str, float]
    ) -> Optional[str]:
        """Check if result should be escalated for human review.

        Returns a human-readable escalation reason, or None if no
        escalation condition is met.
        """

        # Too many different classifications
        if len(scores) > self.config.max_disagreement:
            return f"Too many different classifications ({len(scores)})"

        # Confidence too low
        if confidence < self.config.escalation_confidence:
            return f"Confidence too low ({confidence:.0%})"

        # Agreement too low
        if agreement_ratio < self.config.min_agreement:
            return f"Agreement too low ({agreement_ratio:.0%})"

        # Check for strong dissent: runner-up within 80% of the winner
        sorted_scores = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        if len(sorted_scores) >= 2:
            top_score = sorted_scores[0][1]
            second_score = sorted_scores[1][1]
            # If second place is close to first, escalate
            if second_score >= top_score * 0.8:
                return f"Close competition between '{sorted_scores[0][0]}' and '{sorted_scores[1][0]}'"

        return None

    def get_detailed_breakdown(self, votes: List[AnalystVote]) -> Dict:
        """Get detailed breakdown of vote distribution.

        Returns a dict with totals, a per-classification distribution
        (weighted/normalized scores, counts, average confidence) and a
        per-analyst summary with reasoning truncated to 100 characters.
        """
        if not votes:
            return {'error': 'No votes'}

        scores = self._calculate_weighted_scores(votes)
        total_weight = sum(scores.values())

        breakdown = {
            'total_votes': len(votes),
            'unique_classifications': len(scores),
            'distribution': {},
            'by_analyst': {}
        }

        # Normalized distribution
        for cls, score in scores.items():
            vote_count = sum(1 for v in votes if v.classification == cls)
            breakdown['distribution'][cls] = {
                'weighted_score': round(score, 3),
                'normalized': round(score / total_weight, 3) if total_weight > 0 else 0,
                'vote_count': vote_count,
                'avg_confidence': round(
                    sum(v.confidence for v in votes if v.classification == cls) / vote_count, 3
                )
            }

        # By analyst (later votes from the same agent overwrite earlier ones)
        for vote in votes:
            breakdown['by_analyst'][vote.agent] = {
                'classification': vote.classification,
                'confidence': vote.confidence,
                'reasoning': vote.reasoning[:100] + '...' if len(vote.reasoning) > 100 else vote.reasoning
            }

        return breakdown

    # ========== Debate Protocol Integration (H.3.3.5) ==========

    def convert_decisions_to_evaluations(
        self,
        decisions: List[JudgeDecision]
    ) -> List['JudgeEvaluation']:
        """
        Convert JudgeDecision objects to JudgeEvaluation for debate protocol.

        Args:
            decisions: List of judge decisions

        Returns:
            List of JudgeEvaluation objects for debate
        """
        from .debate import JudgeEvaluation, Verdict

        evaluations = []
        for d in decisions:
            # Map approved boolean to verdict
            if d.approved:
                verdict = Verdict.PASS
            else:
                verdict = Verdict.FAIL

            # Extract dimension scores from metadata if present
            dimension_scores = d.metadata.get('dimension_scores', {})

            # Extract critical findings; fall back to the rejection reason
            critical_findings = d.metadata.get('critical_findings', [])
            if not critical_findings and not d.approved:
                critical_findings = [d.reason]

            evaluations.append(JudgeEvaluation(
                persona_id=d.judge,
                model_used=d.metadata.get('model_used', 'unknown'),
                verdict=verdict,
                confidence=d.confidence,
                dimension_scores=dimension_scores,
                critical_findings=critical_findings,
                remediation_required=d.metadata.get('remediation_required', []),
                rationale=d.reason,
                raw_response=d.metadata.get('raw_response', ''),
                token_usage=d.metadata.get('token_usage', 0)
            ))

        return evaluations

    def convert_evaluations_to_decisions(
        self,
        evaluations: List['JudgeEvaluation']
    ) -> List[JudgeDecision]:
        """
        Convert JudgeEvaluation objects back to JudgeDecision after debate.

        Args:
            evaluations: List of judge evaluations from debate

        Returns:
            List of JudgeDecision objects
        """
        from .debate import Verdict

        decisions = []
        for e in evaluations:
            # Map verdict to approved boolean
            approved = e.verdict == Verdict.PASS

            decisions.append(JudgeDecision(
                judge=e.persona_id,
                approved=approved,
                reason=e.rationale,
                confidence=e.confidence,
                duration_ms=0,  # Not tracked during debate
                metadata={
                    'model_used': e.model_used,
                    'dimension_scores': e.dimension_scores,
                    'critical_findings': e.critical_findings,
                    'remediation_required': e.remediation_required,
                    'raw_response': e.raw_response,
                    'token_usage': e.token_usage,
                    'debate_round': e.debate_round,
                    'verdict': e.verdict.value
                }
            ))

        return decisions

    async def apply_judge_decisions_with_debate(
        self,
        result: ConsensusResult,
        decisions: List[JudgeDecision],
        artifact: str,
        context: Optional[Dict] = None
    ) -> ConsensusResult:
        """
        Apply judge decisions with debate protocol for disagreement resolution.

        This method extends apply_judge_decisions by:
        1. Converting decisions to evaluations
        2. Checking if debate is needed (agreement below threshold)
        3. Running debate protocol if needed
        4. Converting back to decisions and applying

        Args:
            result: Preliminary consensus result
            decisions: List of judge decisions
            artifact: The artifact being evaluated (for debate context)
            context: Additional context for evaluation

        Returns:
            Updated ConsensusResult with debate results if applicable
        """
        from .debate import requires_debate

        # If debate is disabled or already auto-approved, use standard method
        if not self.config.enable_debate or result.approval_type == ApprovalType.AUTO_APPROVED:
            return self.apply_judge_decisions(result, decisions)

        # Debate needs at least two judges to disagree
        if not decisions or len(decisions) < 2:
            return self.apply_judge_decisions(result, decisions)

        # Convert to evaluations for debate analysis
        evaluations = self.convert_decisions_to_evaluations(decisions)

        # Check if debate is needed
        if not requires_debate(evaluations, self.config.debate_agreement_threshold):
            return self.apply_judge_decisions(result, decisions)

        # Run debate protocol if orchestrator is configured
        if self._debate_orchestrator:
            debate_result = await self._debate_orchestrator.orchestrate_debate(
                evaluations,
                artifact,
                context or {}
            )

            # Convert back to decisions
            updated_decisions = self.convert_evaluations_to_decisions(
                debate_result.final_evaluations
            )

            # Store debate metadata in result. Guard against a 'metadata'
            # attribute that exists but is None (e.g. a dataclass default),
            # which would previously crash on item assignment.
            if getattr(result, 'metadata', None) is None:
                result.metadata = {}
            result.metadata['debate'] = self._debate_orchestrator.get_debate_summary(debate_result)

            decisions = updated_decisions

        # Apply the (potentially updated) decisions
        return self.apply_judge_decisions(result, decisions)

    def check_debate_needed(self, decisions: List[JudgeDecision]) -> bool:
        """
        Check if debate is needed for the given judge decisions.

        Args:
            decisions: List of judge decisions

        Returns:
            True if debate is recommended
        """
        from .debate import requires_debate

        if len(decisions) < 2:
            return False

        evaluations = self.convert_decisions_to_evaluations(decisions)
        return requires_debate(evaluations, self.config.debate_agreement_threshold)