# scripts/quality — Quality Judge agent

""" Quality Judge Agent

Validates the quality of analyst votes and classifications. Checks for:

  • Minimum confidence thresholds
  • Reasoning quality and specificity
  • Vote completeness
  • Suspicious patterns (all same score, no reasoning, etc.) """

from typing import List, Dict, Set import re import sys from pathlib import Path

sys.path.insert(0, str(Path(file).parent.parent))

from core.models import Document, AnalystVote, JudgeDecision from judges.base import BaseJudge

class QualityJudge(BaseJudge): """Judge that validates the quality of analyst votes.

Ensures votes meet quality standards including confidence
levels, reasoning quality, and detection of suspicious patterns.
"""

name = "quality"
description = "Validates vote quality, confidence thresholds, and reasoning"
has_veto_authority = True
weight = 1.0

# Thresholds
MIN_WEIGHTED_CONFIDENCE = 0.55 # Minimum weighted confidence for consensus
MIN_INDIVIDUAL_CONFIDENCE = 0.40 # Flag votes below this
MIN_REASONING_LENGTH = 10 # Minimum characters for reasoning
MAX_EXECUTION_TIME_MS = 5000 # Flag slow analysts

# Suspicious patterns
SUSPICIOUS_CONFIDENCE_VALUES = {0.0, 0.5, 1.0} # Exact values that might indicate lazy defaults
MIN_VOTES_REQUIRED = 3 # Minimum analysts that must vote

def evaluate(
self,
document: Document,
votes: List[AnalystVote]
) -> JudgeDecision:
"""Evaluate quality of analyst votes."""

if not votes:
return self._create_decision(
approved=False,
reason="No analyst votes to evaluate",
confidence=1.0,
metadata={'error': 'no_votes'}
)

issues = []
warnings = []
quality_scores = []

# Check 1: Minimum votes
if len(votes) < self.MIN_VOTES_REQUIRED:
issues.append(
f"Insufficient votes: {len(votes)} < {self.MIN_VOTES_REQUIRED} required"
)

# Check 2: Overall confidence threshold
weighted_confidence = self._get_weighted_confidence(votes)
if weighted_confidence < self.MIN_WEIGHTED_CONFIDENCE:
issues.append(
f"Weighted confidence {weighted_confidence:.1%} below threshold {self.MIN_WEIGHTED_CONFIDENCE:.0%}"
)

# Check 3: Individual vote quality
for vote in votes:
vote_quality = self._assess_vote_quality(vote)
quality_scores.append(vote_quality)

if vote_quality['score'] < 0.50:
warnings.append(
f"{vote.agent}: low quality vote ({vote_quality['score']:.0%})"
)

for issue in vote_quality.get('issues', []):
warnings.append(f"{vote.agent}: {issue}")

# Check 4: Suspicious patterns
suspicious = self._detect_suspicious_patterns(votes)
if suspicious:
for pattern in suspicious:
warnings.append(f"Suspicious pattern: {pattern}")
if len(suspicious) >= 2:
issues.append("Multiple suspicious voting patterns detected")

# Check 5: Reasoning quality
reasoning_quality = self._assess_reasoning_quality(votes)
if reasoning_quality < 0.50:
warnings.append(f"Overall reasoning quality is low ({reasoning_quality:.0%})")

# Calculate overall quality score
avg_vote_quality = sum(q['score'] for q in quality_scores) / len(quality_scores) if quality_scores else 0
overall_quality = (
avg_vote_quality * 0.40 +
weighted_confidence * 0.30 +
reasoning_quality * 0.20 +
(1.0 if not issues else 0.5) * 0.10
)

# Determine approval
if issues:
return self._create_decision(
approved=False,
reason=f"Quality check failed: {'; '.join(issues)}",
confidence=overall_quality,
metadata={
'issues': issues,
'warnings': warnings,
'weighted_confidence': round(weighted_confidence, 3),
'reasoning_quality': round(reasoning_quality, 3),
'overall_quality': round(overall_quality, 3),
'vote_count': len(votes)
}
)

return self._create_decision(
approved=True,
reason=f"Quality approved: {overall_quality:.0%} overall quality",
confidence=overall_quality,
metadata={
'warnings': warnings,
'weighted_confidence': round(weighted_confidence, 3),
'reasoning_quality': round(reasoning_quality, 3),
'overall_quality': round(overall_quality, 3),
'vote_quality_scores': [
{'agent': votes[i].agent, 'score': round(q['score'], 3)}
for i, q in enumerate(quality_scores)
]
}
)

def _assess_vote_quality(self, vote: AnalystVote) -> Dict:
"""Assess quality of an individual vote."""
score = 1.0
issues = []

# Check confidence
if vote.confidence < self.MIN_INDIVIDUAL_CONFIDENCE:
score -= 0.20
issues.append(f"low confidence ({vote.confidence:.0%})")

# Check for suspicious exact values
if vote.confidence in self.SUSPICIOUS_CONFIDENCE_VALUES:
score -= 0.10
issues.append(f"suspicious confidence value ({vote.confidence})")

# Check reasoning
if not vote.reasoning or len(vote.reasoning.strip()) < self.MIN_REASONING_LENGTH:
score -= 0.25
issues.append("insufficient reasoning")
elif 'default' in vote.reasoning.lower():
score -= 0.15
issues.append("reasoning mentions 'default'")

# Check execution time (if available)
if vote.duration_ms > self.MAX_EXECUTION_TIME_MS:
score -= 0.10
issues.append(f"slow execution ({vote.duration_ms}ms)")

return {
'score': max(0.0, score),
'issues': issues
}

def _detect_suspicious_patterns(self, votes: List[AnalystVote]) -> List[str]:
"""Detect suspicious voting patterns that might indicate issues."""
patterns = []

# All same confidence
confidences = [v.confidence for v in votes]
if len(set(confidences)) == 1 and len(votes) > 2:
patterns.append(f"all analysts have identical confidence ({confidences[0]:.2f})")

# All same classification with very different confidences
classifications = [v.classification for v in votes]
if len(set(classifications)) == 1:
conf_range = max(confidences) - min(confidences)
if conf_range > 0.50:
patterns.append(f"unanimous but confidence spread is {conf_range:.0%}")

# Check for copy-paste reasoning
reasonings = [v.reasoning for v in votes]
unique_reasonings = set(reasonings)
if len(unique_reasonings) < len(votes) * 0.7:
patterns.append("duplicate reasoning detected across votes")

return patterns

def _assess_reasoning_quality(self, votes: List[AnalystVote]) -> float:
"""Assess overall reasoning quality across all votes."""
if not votes:
return 0.0

scores = []
for vote in votes:
reasoning = vote.reasoning or ""
score = 0.0

# Length check
if len(reasoning) >= 50:
score += 0.30
elif len(reasoning) >= 20:
score += 0.15

# Contains specific indicators
specificity_patterns = [
r'\d+', # Contains numbers
r'match(es|ed|ing)?', # Match terminology
r'pattern', # Pattern terminology
r'section', # Section references
r'\.md|\.py', # File references
]
for pattern in specificity_patterns:
if re.search(pattern, reasoning, re.IGNORECASE):
score += 0.14

scores.append(min(1.0, score))

return sum(scores) / len(scores)