# scripts/debate.py
""" Debate Protocol for MoE Classification System.
Implements multi-round debate when judges disagree, based on MAJ-EVAL in-group debate protocol (Chen et al., 2025) and CODITECT research.
Key concepts:
- MAX_DEBATE_ROUNDS: Maximum rounds before forcing consensus (default: 3)
- CONVERGENCE_THRESHOLD: Agreement ratio to stop debate (default: 0.8)
- Debate context includes other judges' positions and evidence """
import statistics
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Optional
class Verdict(str, Enum):
    """The set of verdicts a judge may return for an artifact."""

    PASS = "PASS"
    FAIL = "FAIL"
    CONDITIONAL = "CONDITIONAL"
@dataclass
class JudgeEvaluation:
    """Evaluation produced by one judge persona for one artifact."""

    persona_id: str
    model_used: str
    verdict: Verdict
    confidence: float
    dimension_scores: Dict[str, float]
    critical_findings: List[str] = field(default_factory=list)
    remediation_required: List[str] = field(default_factory=list)
    rationale: str = ""
    raw_response: str = ""  # For audit trail
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    token_usage: int = 0
    debate_round: int = 0  # Which debate round this evaluation is from (0 = initial)
@dataclass
class Disagreement:
    """A specific area where judges' assessments diverge."""

    type: str  # "verdict" or "dimension"
    dimension: Optional[str] = None  # Only set for dimension-level disagreements
    positions: Dict[str, Any] = field(default_factory=dict)  # persona_id -> position
    severity: float = 0.0  # 0-1; how significant the disagreement is
@dataclass
class DebateRound:
    """Audit record of one completed debate round."""

    round_number: int
    initial_agreement: float
    final_agreement: float
    disagreements: List[Disagreement]
    evaluations: List[JudgeEvaluation]
    debate_context: str
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
@dataclass
class DebateConfig:
    """Tunable parameters controlling debate orchestration."""

    max_debate_rounds: int = 3
    convergence_threshold: float = 0.8  # Stop debate when agreement >= this
    dimension_disagreement_threshold: float = 1.5  # Score gap that triggers debate
    min_evaluations_for_debate: int = 2
    debate_timeout_seconds: int = 120
    # Relative weights per persona type (can be customized by callers).
    persona_weights: Dict[str, float] = field(default_factory=lambda: {
        "technical_architect": 0.25,
        "compliance_auditor": 0.25,
        "security_analyst": 0.20,
        "domain_expert": 0.15,
        "qa_evaluator": 0.15,
    })
@dataclass
class DebateResult:
    """Final outcome of the complete debate process."""

    final_evaluations: List[JudgeEvaluation]
    rounds: List[DebateRound]
    initial_agreement: float
    final_agreement: float
    convergence_achieved: bool
    total_debate_rounds: int
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
class DebateOrchestrator:
    """
    Orchestrates multi-round debate when judges disagree.

    Based on MAJ-EVAL in-group debate protocol (Chen et al., 2025).

    The debate process:
    1. Calculate initial agreement ratio
    2. If below convergence threshold, identify disagreements
    3. Prepare debate context with other judges' positions
    4. Have each judge re-evaluate with debate context
    5. Repeat until convergence or max rounds reached
    """

    def __init__(self, config: Optional[DebateConfig] = None):
        """Initialize debate orchestrator with configuration.

        Args:
            config: Debate tuning parameters; defaults to DebateConfig().
        """
        self.config = config or DebateConfig()
        # Awaited once per judge in _conduct_debate_round, so it must be an
        # async callable. Without it, rounds re-emit the prior evaluations.
        self._evaluation_callback: Optional[
            Callable[..., Awaitable[JudgeEvaluation]]
        ] = None

    def set_evaluation_callback(
        self,
        callback: Callable[[str, str, str, Dict], Awaitable[JudgeEvaluation]]
    ) -> None:
        """
        Set callback for conducting evaluations during debate.

        The callback is awaited by _conduct_debate_round, so it must be an
        async callable with the signature:
            callback(persona_id, artifact, debate_context, context) -> JudgeEvaluation
        """
        self._evaluation_callback = callback

    async def orchestrate_debate(
        self,
        evaluations: List[JudgeEvaluation],
        artifact: str,
        context: Optional[Dict] = None
    ) -> DebateResult:
        """
        Orchestrate debate rounds until convergence or max rounds.

        Args:
            evaluations: Initial judge evaluations
            artifact: The artifact being evaluated
            context: Additional context for evaluation

        Returns:
            DebateResult with final evaluations and debate history
        """
        # Too few judges to disagree: report trivial (perfect) agreement.
        if len(evaluations) < self.config.min_evaluations_for_debate:
            return DebateResult(
                final_evaluations=evaluations,
                rounds=[],
                initial_agreement=1.0,
                final_agreement=1.0,
                convergence_achieved=True,
                total_debate_rounds=0
            )

        context = context or {}
        current_evaluations = evaluations
        initial_agreement = self._calculate_agreement(current_evaluations)
        rounds: List[DebateRound] = []

        for round_num in range(self.config.max_debate_rounds):
            # Check for convergence before spending another round.
            agreement = self._calculate_agreement(current_evaluations)
            if agreement >= self.config.convergence_threshold:
                break

            # Identify disagreement areas; nothing specific to debate -> stop.
            disagreements = self._identify_disagreements(current_evaluations)
            if not disagreements:
                break

            # Generate the debate prompt shown to every judge this round.
            debate_context = self._prepare_debate_context(
                current_evaluations,
                disagreements,
                round_num
            )

            # Conduct debate round (re-evaluates each judge).
            updated_evaluations = await self._conduct_debate_round(
                current_evaluations,
                debate_context,
                artifact,
                context,
                round_num + 1
            )

            final_agreement = self._calculate_agreement(updated_evaluations)

            # Record round for the audit trail.
            rounds.append(DebateRound(
                round_number=round_num + 1,
                initial_agreement=agreement,
                final_agreement=final_agreement,
                disagreements=disagreements,
                evaluations=updated_evaluations,
                debate_context=debate_context
            ))

            current_evaluations = updated_evaluations

        final_agreement = self._calculate_agreement(current_evaluations)

        return DebateResult(
            final_evaluations=current_evaluations,
            rounds=rounds,
            initial_agreement=initial_agreement,
            final_agreement=final_agreement,
            convergence_achieved=final_agreement >= self.config.convergence_threshold,
            total_debate_rounds=len(rounds)
        )

    def _calculate_agreement(self, evaluations: List[JudgeEvaluation]) -> float:
        """
        Calculate agreement ratio among evaluations.

        Agreement is a weighted blend of:
        1. Verdict agreement (weighted 60%): share of judges holding the
           most common verdict.
        2. Dimension score agreement (weighted 40%): per-dimension score
           range, normalized against the 2-point max gap on the 1-3 scale.

        Returns:
            Agreement in [0, 1]; 1.0 when there are fewer than two evaluations.
        """
        if len(evaluations) < 2:
            return 1.0

        # Verdict agreement: fraction of judges on the modal verdict.
        verdicts = [e.verdict for e in evaluations]
        verdict_counts: Dict[Verdict, int] = {}
        for v in verdicts:
            verdict_counts[v] = verdict_counts.get(v, 0) + 1
        max_verdict_count = max(verdict_counts.values())
        verdict_agreement = max_verdict_count / len(evaluations)

        # Dimension score agreement (based on per-dimension score range,
        # not standard deviation).
        all_dimensions = set()
        for e in evaluations:
            all_dimensions.update(e.dimension_scores.keys())

        dimension_agreements = []
        for dim in all_dimensions:
            scores = [
                e.dimension_scores[dim]
                for e in evaluations
                if dim in e.dimension_scores
            ]
            if len(scores) >= 2:
                # Normalize by expected range (1-3 scale)
                score_range = max(scores) - min(scores)
                # Max disagreement on 1-3 scale is 2 points
                agreement = 1.0 - (score_range / 2.0)
                dimension_agreements.append(max(0.0, agreement))

        dimension_agreement = (
            statistics.mean(dimension_agreements)
            if dimension_agreements
            else 1.0
        )

        # Weighted combination
        return 0.6 * verdict_agreement + 0.4 * dimension_agreement

    def _identify_disagreements(
        self,
        evaluations: List[JudgeEvaluation]
    ) -> List[Disagreement]:
        """
        Identify specific areas where judges disagree.

        Checks the overall verdicts and each scored dimension.

        Returns:
            Disagreements sorted by severity (most severe first).
        """
        disagreements: List[Disagreement] = []

        # Check verdict-level disagreement
        verdicts = [e.verdict for e in evaluations]
        unique_verdicts = set(verdicts)
        if len(unique_verdicts) > 1:
            # Calculate severity based on verdict spread
            verdict_counts: Dict[Verdict, int] = {}
            for v in verdicts:
                verdict_counts[v] = verdict_counts.get(v, 0) + 1
            # Severity is higher when verdicts are split more evenly
            max_count = max(verdict_counts.values())
            severity = 1.0 - (max_count / len(evaluations))
            disagreements.append(Disagreement(
                type="verdict",
                positions={
                    e.persona_id: e.verdict.value
                    for e in evaluations
                },
                severity=severity
            ))

        # Check dimension-level disagreements
        all_dimensions: set = set()
        for e in evaluations:
            all_dimensions.update(e.dimension_scores.keys())

        for dim in all_dimensions:
            scores = {
                e.persona_id: e.dimension_scores.get(dim)
                for e in evaluations
                if dim in e.dimension_scores
            }
            if len(scores) < 2:
                continue
            score_values = [s for s in scores.values() if s is not None]
            score_gap = max(score_values) - min(score_values)
            if score_gap >= self.config.dimension_disagreement_threshold:
                # Severity based on score gap (max gap on 1-3 scale is 2)
                severity = min(1.0, score_gap / 2.0)
                disagreements.append(Disagreement(
                    type="dimension",
                    dimension=dim,
                    positions=scores,
                    severity=severity
                ))

        # Sort by severity (most severe first)
        disagreements.sort(key=lambda d: d.severity, reverse=True)
        return disagreements

    def _prepare_debate_context(
        self,
        evaluations: List[JudgeEvaluation],
        disagreements: List[Disagreement],
        round_num: int
    ) -> str:
        """
        Prepare the textual context handed to each judge for a debate round.

        Includes:
        - Other judges' positions and rationales
        - Specific areas of disagreement
        - Instructions for debate response

        Args:
            evaluations: Current evaluations from all judges.
            disagreements: Output of _identify_disagreements.
            round_num: Zero-based round index (displayed as round_num + 1).

        Returns:
            The assembled debate prompt as a single newline-joined string.
        """
        lines = [
            f"═══ DEBATE ROUND {round_num + 1} ═══",
            "",
            "Your fellow judges have provided their evaluations. There are areas of "
            "disagreement that require discussion. Review the positions below and "
            "reconsider your evaluation in light of the evidence presented.",
            "",
            "─── AREAS OF DISAGREEMENT ───",
            ""
        ]

        for d in disagreements:
            if d.type == "verdict":
                lines.append("VERDICT DISAGREEMENT:")
                for persona_id, verdict in d.positions.items():
                    eval_obj = next(
                        (e for e in evaluations if e.persona_id == persona_id),
                        None
                    )
                    if eval_obj:
                        confidence_str = f"{eval_obj.confidence:.0%}"
                        # Truncate long rationales to keep the prompt compact.
                        rationale_preview = eval_obj.rationale[:300]
                        if len(eval_obj.rationale) > 300:
                            rationale_preview += "..."
                        lines.extend([
                            f"  • {persona_id}: {verdict} (confidence: {confidence_str})",
                            f"    Rationale: {rationale_preview}",
                            ""
                        ])
            else:
                lines.append(f"DIMENSION DISAGREEMENT: {d.dimension}")
                for persona_id, score in d.positions.items():
                    if score is not None:
                        eval_obj = next(
                            (e for e in evaluations if e.persona_id == persona_id),
                            None
                        )
                        evidence = ""
                        if eval_obj and d.dimension in eval_obj.dimension_scores:
                            # Look for supporting evidence in critical findings
                            for finding in eval_obj.critical_findings:
                                if d.dimension.lower() in finding.lower():
                                    evidence = finding
                                    break
                        lines.append(f"  • {persona_id}: Score {score}/3")
                        if evidence:
                            lines.append(f"    Evidence: {evidence}")
                lines.append("")

        lines.extend([
            "─── DEBATE INSTRUCTIONS ───",
            "",
            "1. Review other judges' positions and supporting evidence carefully",
            "2. Consider whether their concerns reveal aspects you may have overlooked",
            "3. If their evidence is compelling, update your assessment accordingly",
            "4. If you maintain your position, cite specific evidence from the artifact",
            "5. Focus on objective, verifiable criteria rather than subjective opinion",
            "",
            "Provide your UPDATED evaluation considering the debate context.",
            "═════════════════════════════",
        ])

        return "\n".join(lines)

    async def _conduct_debate_round(
        self,
        evaluations: List[JudgeEvaluation],
        debate_context: str,
        artifact: str,
        context: Dict,
        round_number: int
    ) -> List[JudgeEvaluation]:
        """
        Conduct a single debate round.

        Each judge receives the debate context and provides an updated
        evaluation via the async evaluation callback. Without a callback,
        the prior evaluations are copied unchanged (only debate_round is
        stamped), so agreement cannot improve across such rounds.
        """
        if not self._evaluation_callback:
            # Without a callback, we can't conduct actual evaluations.
            # Return copies of the evaluations, marked with this round.
            return [
                JudgeEvaluation(
                    persona_id=e.persona_id,
                    model_used=e.model_used,
                    verdict=e.verdict,
                    confidence=e.confidence,
                    dimension_scores=e.dimension_scores.copy(),
                    critical_findings=e.critical_findings.copy(),
                    remediation_required=e.remediation_required.copy(),
                    rationale=e.rationale,
                    raw_response=e.raw_response,
                    token_usage=e.token_usage,
                    debate_round=round_number
                )
                for e in evaluations
            ]

        updated_evaluations: List[JudgeEvaluation] = []
        for evaluation in evaluations:
            # Call the async evaluation callback for each judge.
            updated = await self._evaluation_callback(
                evaluation.persona_id,
                artifact,
                debate_context,
                {
                    **context,
                    "previous_evaluation": evaluation,
                    "debate_round": round_number
                }
            )
            # Ensure round number is set regardless of callback behavior.
            updated.debate_round = round_number
            updated_evaluations.append(updated)

        return updated_evaluations

    def get_debate_summary(self, result: DebateResult) -> Dict:
        """
        Generate a summary of the debate process for the audit trail.

        Args:
            result: A completed DebateResult.

        Returns:
            A JSON-serializable dict of round metrics and final verdicts.
        """
        return {
            "total_rounds": result.total_debate_rounds,
            "initial_agreement": round(result.initial_agreement, 3),
            "final_agreement": round(result.final_agreement, 3),
            "convergence_achieved": result.convergence_achieved,
            "agreement_improvement": round(
                result.final_agreement - result.initial_agreement, 3
            ),
            "rounds": [
                {
                    "round": r.round_number,
                    "initial_agreement": round(r.initial_agreement, 3),
                    "final_agreement": round(r.final_agreement, 3),
                    "disagreement_count": len(r.disagreements),
                    "disagreement_types": [d.type for d in r.disagreements]
                }
                for r in result.rounds
            ],
            "final_verdicts": {
                e.persona_id: {
                    "verdict": e.verdict.value,
                    "confidence": round(e.confidence, 3)
                }
                for e in result.final_evaluations
            },
            "timestamp": result.timestamp
        }
def create_default_orchestrator() -> DebateOrchestrator:
    """Create a debate orchestrator with default configuration."""
    return DebateOrchestrator(config=DebateConfig())
def requires_debate(
    evaluations: List[JudgeEvaluation],
    threshold: float = 0.8
) -> bool:
    """
    Quick check to determine if debate is needed.

    Args:
        evaluations: List of judge evaluations
        threshold: Agreement threshold (default 0.8)

    Returns:
        True if debate is recommended
    """
    # A single evaluation (or none) cannot disagree with itself.
    if len(evaluations) < 2:
        return False
    # Reuse the orchestrator's agreement metric with default settings.
    return DebateOrchestrator()._calculate_agreement(evaluations) < threshold