# scripts/consistency
""" Consistency Judge Agent
Validates cross-analyst agreement and ensures classification consistency. Checks for:
- Minimum agreement threshold across analysts
- Confidence distribution consistency
- No conflicting strong opinions """
import sys
from pathlib import Path
from typing import Dict, List, Set

# Make the project root importable when this module is run as a script.
# NOTE: must use __file__ (the module's path), not the undefined name `file`.
sys.path.insert(0, str(Path(__file__).parent.parent))

from core.models import AnalystVote, Document, JudgeDecision
from judges.base import BaseJudge
class ConsistencyJudge(BaseJudge):
    """Judge that validates consistency across analyst votes.

    Ensures analysts reach sufficient agreement and that there
    are no irreconcilable conflicts in classifications.
    """

    name = "consistency"
    description = "Validates cross-analyst agreement and classification consistency"
    has_veto_authority = True
    weight = 1.0

    # Thresholds
    MIN_AGREEMENT_RATIO = 0.60          # At least 60% of analysts must agree
    MIN_CONFIDENCE_FOR_DISSENT = 0.80   # High-confidence dissent triggers review
    MAX_CLASSIFICATION_SPREAD = 3       # Max different classifications allowed
    UNANIMOUS_BONUS = 0.10              # Confidence boost for unanimous decisions

    def evaluate(
        self,
        document: Document,
        votes: List[AnalystVote]
    ) -> JudgeDecision:
        """Evaluate consistency of analyst votes.

        Runs four checks (agreement threshold, classification spread,
        high-confidence dissent, bimodal distribution). Any failed check
        becomes an "issue" and causes rejection; soft signals become
        "warnings" carried in the decision metadata.

        Args:
            document: The document under review (not inspected directly;
                consistency is judged purely from the votes).
            votes: Analyst votes to check for agreement.

        Returns:
            A JudgeDecision — rejected if any consistency issue is found,
            otherwise approved with a confidence derived from agreement.
        """
        if not votes:
            # No votes at all: reject with full confidence so the
            # pipeline surfaces the upstream failure.
            return self._create_decision(
                approved=False,
                reason="No analyst votes to evaluate",
                confidence=1.0,
                metadata={'error': 'no_votes'}
            )

        # Calculate agreement metrics (consensus/agreement helpers are
        # provided by BaseJudge).
        consensus = self._get_consensus_classification(votes)
        agreement_ratio = self._get_agreement_ratio(votes)
        classifications = self._get_classification_distribution(votes)

        issues = []
        warnings = []

        # Check 1: Minimum agreement threshold
        if agreement_ratio < self.MIN_AGREEMENT_RATIO:
            issues.append(
                f"Agreement ratio {agreement_ratio:.1%} below threshold {self.MIN_AGREEMENT_RATIO:.0%}"
            )

        # Check 2: Too many different classifications
        if len(classifications) > self.MAX_CLASSIFICATION_SPREAD:
            issues.append(
                f"Too many classifications ({len(classifications)}) - max allowed is {self.MAX_CLASSIFICATION_SPREAD}"
            )

        # Check 3: High-confidence dissent — one dissenter is a warning,
        # two or more escalate to a blocking issue.
        dissenting_votes = self._get_high_confidence_dissent(votes, consensus)
        if dissenting_votes:
            for vote in dissenting_votes:
                warnings.append(
                    f"{vote.agent} strongly disagrees ({vote.confidence:.0%} confidence for '{vote.classification}')"
                )
            if len(dissenting_votes) >= 2:
                issues.append(f"Multiple high-confidence dissenters ({len(dissenting_votes)})")

        # Check 4: Bimodal distribution (two strong camps)
        bimodal = self._check_bimodal_distribution(votes)
        if bimodal:
            issues.append(f"Bimodal distribution detected: '{bimodal[0]}' vs '{bimodal[1]}'")

        # Determine approval
        if issues:
            return self._create_decision(
                approved=False,
                reason=f"Consistency check failed: {'; '.join(issues)}",
                # Confidence decays 0.2 per issue; clamp at 0.0 so five or
                # more issues cannot produce a negative confidence.
                confidence=max(0.0, 1.0 - (len(issues) * 0.2)),
                metadata={
                    'issues': issues,
                    'warnings': warnings,
                    'agreement_ratio': agreement_ratio,
                    'classification_count': len(classifications),
                    'consensus': consensus,
                    'distribution': {k: round(v, 3) for k, v in classifications.items()}
                }
            )

        # Approved with possible warnings; unanimous votes get a small
        # confidence bonus, capped at 1.0.
        confidence = min(1.0, agreement_ratio + (self.UNANIMOUS_BONUS if agreement_ratio == 1.0 else 0))
        reason_parts = [f"Consistent: {agreement_ratio:.0%} agreement on '{consensus}'"]
        if warnings:
            reason_parts.append(f"Warnings: {len(warnings)}")

        return self._create_decision(
            approved=True,
            reason="; ".join(reason_parts),
            confidence=confidence,
            metadata={
                'agreement_ratio': agreement_ratio,
                'consensus': consensus,
                'warnings': warnings,
                'is_unanimous': agreement_ratio == 1.0,
                'distribution': {k: round(v, 3) for k, v in classifications.items()}
            }
        )

    def _get_classification_distribution(self, votes: List[AnalystVote]) -> Dict[str, float]:
        """Get confidence-weighted distribution of classifications.

        Each vote contributes its confidence as weight to its
        classification; the result is normalized to sum to 1.0
        (unless every vote has zero confidence).
        """
        distribution: Dict[str, float] = {}
        total_weight = 0.0
        for vote in votes:
            cls = vote.classification
            weight = vote.confidence
            distribution[cls] = distribution.get(cls, 0.0) + weight
            total_weight += weight
        # Normalize (skip when all weights are zero to avoid division by zero)
        if total_weight > 0:
            distribution = {k: v / total_weight for k, v in distribution.items()}
        return distribution

    def _get_high_confidence_dissent(
        self,
        votes: List[AnalystVote],
        consensus: str
    ) -> List[AnalystVote]:
        """Find votes that disagree with consensus at or above the
        MIN_CONFIDENCE_FOR_DISSENT threshold."""
        return [
            v for v in votes
            if v.classification != consensus
            and v.confidence >= self.MIN_CONFIDENCE_FOR_DISSENT
        ]

    def _check_bimodal_distribution(self, votes: List[AnalystVote]) -> tuple:
        """Check if there are two strong competing classifications.

        Bimodal means the runner-up holds at least 30% of the weight and
        trails the leader by less than 20 percentage points.

        Returns:
            Tuple of (class1, class2) if bimodal, else None.
        """
        distribution = self._get_classification_distribution(votes)
        # Sort by weight descending
        sorted_classes = sorted(distribution.items(), key=lambda x: x[1], reverse=True)
        if len(sorted_classes) < 2:
            return None
        first, second = sorted_classes[0], sorted_classes[1]
        # Bimodal if top two are close and both significant
        if second[1] >= 0.30 and (first[1] - second[1]) < 0.20:
            return (first[0], second[0])
        return None