""" Judge Persona Loader for MoE Verification Layer.

Loads and manages judge personas from config/judge-personas.json for the Constitutional Court architecture (H.3.1.7).

Features:

  • Load personas by ID or get all enabled personas
  • Generate prompt templates with artifact/context injection
  • Validate persona schema completeness
  • Model routing configuration (configurable, not hard-coded)
  • Environment variable overrides for models
  • Diversity verification across model families
  • Provider-aware model selection (ADR-073)

Model Configuration Priority (highest to lowest):

  1. Runtime override via get_model_for_persona(override_model=...)
  2. Environment variables: CODITECT_JUDGE_MODEL_<PERSONA_ID>
  3. Provider mode detection (single/dual/multi) - ADR-073
  4. Model routing config file: config/judge-model-routing.json
  5. Persona-specific config in judge-personas.json """

import json
import os
import re
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple

# Import provider detector for provider-aware model selection (ADR-073)
from .provider_detector import (
    ProviderDetector,
    ProviderMode,
    Provider,
    ProviderDetectionResult,
    get_default_detector,
)

class Verdict(str, Enum):
    """Unified verdict types for consensus.

    Inherits from ``str`` so members compare equal to their plain string
    values and serialize cleanly to JSON.
    """

    PASS = "PASS"
    FAIL = "FAIL"
    CONDITIONAL = "CONDITIONAL"

class StrictnessLevel(str, Enum):
    """Persona strictness levels.

    String-valued so configuration files can use the raw names directly.
    """

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    VERY_HIGH = "VERY_HIGH"
    ADVERSARIAL = "ADVERSARIAL"
    METHODICAL = "METHODICAL"

@dataclass
class ModelRouting:
    """Model routing configuration for a persona."""

    primary_model: str   # preferred model identifier
    backup_model: str    # fallback model identifier
    model_family: str    # provider family, e.g. "anthropic"

@dataclass
class EvaluationDimension:
    """A single evaluation dimension with scoring."""

    id: str
    name: str
    weight: float                                      # relative weight in overall score
    scale: List[Any]                                   # e.g. [1, 2, 3]
    score_descriptions: Optional[Dict[int, str]] = None
    evaluation_steps: Optional[List[str]] = None
    evaluation_criteria: Optional[List[str]] = None
    owasp_ref: Optional[str] = None                    # OWASP reference for security dimensions
    types: Optional[List[str]] = None

@dataclass
class RedFlag:
    """A red flag pattern to identify during evaluation."""

    pattern: str   # textual description or pattern to look for
    severity: str  # e.g. "HIGH", "MEDIUM"

@dataclass
class Demographics:
    """Persona demographic information."""

    name: str
    title: str
    experience_years: int
    credentials: List[str]
    background: str

@dataclass
class EvaluationStyle:
    """Evaluation style configuration.

    Only ``strictness`` and ``focus`` are required; the remaining fields
    are persona-specific tuning knobs that may be absent in config.
    """

    strictness: str
    focus: str
    risk_tolerance: Optional[str] = None
    documentation_expectation: Optional[str] = None
    technical_debt_tolerance: Optional[str] = None
    gap_tolerance: Optional[str] = None
    assumption: Optional[str] = None
    input_assumption: Optional[str] = None

@dataclass
class JudgePersona:
    """Complete judge persona with all attributes.

    Aggregates demographics, expertise, evaluation style, model routing,
    and the evaluation dimensions parsed from judge-personas.json.
    """

    persona_id: str
    version: str
    enabled: bool
    demographics: Demographics
    expertise: Dict[str, List[str]]
    evaluation_style: EvaluationStyle
    model_routing: ModelRouting
    weight: float                                  # panel weight (0 < weight <= 1)
    trigger_conditions: List[str]
    evaluation_dimensions: List[EvaluationDimension]
    output_schema: Dict[str, str]
    red_flags: Optional[List[RedFlag]] = None
    regulatory_references: Optional[Dict] = None
    severity_classification: Optional[Dict[str, str]] = None
    patient_safety_red_flags: Optional[List[RedFlag]] = None  # healthcare-domain personas
    testing_gaps_to_identify: Optional[List[str]] = None

@dataclass
class PersonaConfig:
    """Global persona configuration with consensus thresholds."""

    approval_threshold: float = 0.67      # weighted approval needed for PASS
    confidence_floor: float = 0.60        # minimum acceptable judge confidence
    max_debate_rounds: int = 3
    convergence_threshold: float = 0.80
    min_model_families: int = 3           # diversity requirement (multi-provider mode)
    max_single_model_weight: float = 0.40 # cap on any single family's weight

@dataclass
class PersonaRegistry:
    """Registry of all loaded personas with configuration."""

    version: str
    created: str
    updated: str
    description: str
    configuration: PersonaConfig
    personas: Dict[str, JudgePersona]          # keyed by persona_id
    diversity_requirements: Dict[str, Any]
    verdict_types: Dict[str, List[str]]        # persona_id -> allowed verdict strings
    consensus_mapping: Dict[str, List[str]]    # e.g. "PASS_equivalent" -> [...]

class PersonaValidationError(Exception):
    """Raised when persona validation fails."""
    pass

class PersonaNotFoundError(Exception):
    """Raised when requested persona is not found."""
    pass

class PersonaLoader:
    """Loads and manages judge personas from configuration.

    Usage:
        loader = PersonaLoader()
        persona = loader.get_persona("technical_architect")
        prompt = loader.get_prompt_template(persona, artifact, context)
    """

    # Default config path: four levels up from this module, then config/.
    DEFAULT_CONFIG_PATH = Path(__file__).parent.parent.parent.parent / "config" / "judge-personas.json"

def __init__(self, config_path: Optional[Path] = None):
"""Initialize loader with optional custom config path."""
self.config_path = config_path or self.DEFAULT_CONFIG_PATH
self._registry: Optional[PersonaRegistry] = None
self._raw_config: Optional[Dict] = None
self._load_config()

def _load_config(self) -> None:
"""Load and parse the persona configuration file."""
if not self.config_path.exists():
raise FileNotFoundError(f"Persona config not found: {self.config_path}")

with open(self.config_path, 'r', encoding='utf-8') as f:
self._raw_config = json.load(f)

self._registry = self._parse_registry(self._raw_config)

def _parse_registry(self, data: Dict) -> PersonaRegistry:
"""Parse raw JSON into PersonaRegistry dataclass."""
config_data = data.get("configuration", {})
configuration = PersonaConfig(
approval_threshold=config_data.get("approval_threshold", 0.67),
confidence_floor=config_data.get("confidence_floor", 0.60),
max_debate_rounds=config_data.get("max_debate_rounds", 3),
convergence_threshold=config_data.get("convergence_threshold", 0.80),
min_model_families=config_data.get("min_model_families", 3),
max_single_model_weight=config_data.get("max_single_model_weight", 0.40)
)

personas = {}
for persona_id, persona_data in data.get("personas", {}).items():
personas[persona_id] = self._parse_persona(persona_id, persona_data)

return PersonaRegistry(
version=data.get("version", "1.0.0"),
created=data.get("created", ""),
updated=data.get("updated", ""),
description=data.get("description", ""),
configuration=configuration,
personas=personas,
diversity_requirements=data.get("diversity_requirements", {}),
verdict_types=data.get("verdict_types", {}),
consensus_mapping=data.get("consensus_mapping", {})
)

def _parse_persona(self, persona_id: str, data: Dict) -> JudgePersona:
"""Parse a single persona from JSON data."""
# Parse demographics
demo_data = data.get("demographics", {})
demographics = Demographics(
name=demo_data.get("name", ""),
title=demo_data.get("title", ""),
experience_years=demo_data.get("experience_years", 0),
credentials=demo_data.get("credentials", []),
background=demo_data.get("background", "")
)

# Parse evaluation style
style_data = data.get("evaluation_style", {})
evaluation_style = EvaluationStyle(
strictness=style_data.get("strictness", "MEDIUM"),
focus=style_data.get("focus", ""),
risk_tolerance=style_data.get("risk_tolerance"),
documentation_expectation=style_data.get("documentation_expectation"),
technical_debt_tolerance=style_data.get("technical_debt_tolerance"),
gap_tolerance=style_data.get("gap_tolerance"),
assumption=style_data.get("assumption"),
input_assumption=style_data.get("input_assumption")
)

# Parse model routing
routing_data = data.get("model_routing", {})
model_routing = ModelRouting(
primary_model=routing_data.get("primary_model", "claude-sonnet-4"),
backup_model=routing_data.get("backup_model", "claude-opus-4-5"),
model_family=routing_data.get("model_family", "anthropic")
)

# Parse evaluation dimensions
dimensions = []
for dim_data in data.get("evaluation_dimensions", []):
dim = EvaluationDimension(
id=dim_data.get("id", ""),
name=dim_data.get("name", ""),
weight=dim_data.get("weight", 0.0),
scale=dim_data.get("scale", [1, 2, 3]),
score_descriptions=dim_data.get("score_descriptions"),
evaluation_steps=dim_data.get("evaluation_steps"),
evaluation_criteria=dim_data.get("evaluation_criteria") or dim_data.get("criteria"),
owasp_ref=dim_data.get("owasp_ref"),
types=dim_data.get("types")
)
dimensions.append(dim)

# Parse red flags
red_flags = None
if "red_flags" in data:
red_flags = [
RedFlag(pattern=rf.get("pattern", ""), severity=rf.get("severity", "MEDIUM"))
for rf in data["red_flags"]
]

# Parse patient safety red flags (for healthcare domain)
patient_safety_red_flags = None
if "patient_safety_red_flags" in data:
patient_safety_red_flags = [
RedFlag(pattern=rf.get("pattern", ""), severity=rf.get("severity", "HIGH"))
for rf in data["patient_safety_red_flags"]
]

return JudgePersona(
persona_id=persona_id,
version=data.get("version", "1.0.0"),
enabled=data.get("enabled", True),
demographics=demographics,
expertise=data.get("expertise", {}),
evaluation_style=evaluation_style,
model_routing=model_routing,
weight=data.get("weight", 0.15),
trigger_conditions=data.get("trigger_conditions", []),
evaluation_dimensions=dimensions,
output_schema=data.get("output_schema", {}),
red_flags=red_flags,
regulatory_references=data.get("regulatory_references"),
severity_classification=data.get("severity_classification"),
patient_safety_red_flags=patient_safety_red_flags,
testing_gaps_to_identify=data.get("testing_gaps_to_identify")
)

def get_persona(self, persona_id: str) -> JudgePersona:
"""
Get a persona by ID.

Args:
persona_id: The persona identifier (e.g., "technical_architect")

Returns:
JudgePersona instance

Raises:
PersonaNotFoundError: If persona_id not found
"""
if persona_id not in self._registry.personas:
available = list(self._registry.personas.keys())
raise PersonaNotFoundError(
f"Persona '{persona_id}' not found. Available: {available}"
)
return self._registry.personas[persona_id]

def get_all_personas(self, enabled_only: bool = True) -> List[JudgePersona]:
"""
Get all personas, optionally filtered by enabled status.

Args:
enabled_only: If True, return only enabled personas

Returns:
List of JudgePersona instances
"""
personas = list(self._registry.personas.values())
if enabled_only:
personas = [p for p in personas if p.enabled]
return personas

def get_personas_for_artifact(self, artifact_tags: List[str]) -> List[JudgePersona]:
"""
Get personas relevant for a given artifact based on trigger conditions.

Args:
artifact_tags: Tags/labels for the artifact (e.g., ["HIPAA", "API", "healthcare"])

Returns:
List of matching personas
"""
matching = []
for persona in self.get_all_personas():
# Check if any trigger condition matches
for trigger in persona.trigger_conditions:
trigger_lower = trigger.lower()
# "All code artifacts" matches everything
if "all code" in trigger_lower or "all artifact" in trigger_lower:
matching.append(persona)
break
# Check for tag match
for tag in artifact_tags:
if tag.lower() in trigger_lower or trigger_lower in tag.lower():
matching.append(persona)
break
# Deduplicate by persona_id (JudgePersona is not hashable)
seen = {}
for p in matching:
if p.persona_id not in seen:
seen[p.persona_id] = p
return list(seen.values())

def get_prompt_template(
self,
persona: JudgePersona,
artifact: str,
context: Optional[Dict[str, Any]] = None
) -> str:
"""
Generate the evaluation prompt for a persona with artifact injection.

Args:
persona: The judge persona
artifact: The code/document artifact to evaluate
context: Additional context (ADRs, requirements, tech_stack, etc.)

Returns:
Formatted prompt string ready for LLM
"""
context = context or {}

# Build persona introduction
intro = self._build_persona_intro(persona)

# Build evaluation style section
style = self._build_evaluation_style(persona)

# Build dimensions section
dimensions = self._build_dimensions_section(persona)

# Build red flags section
red_flags = self._build_red_flags_section(persona)

# Build output schema section
output = self._build_output_schema(persona)

# Build context sections
context_sections = self._build_context_sections(context)

# Assemble full prompt
prompt = f"""{intro}

{style}

{dimensions}

{red_flags}

ARTIFACT UNDER REVIEW:

{artifact}

{context_sections}

{output} """ return prompt.strip()

def _build_persona_intro(self, persona: JudgePersona) -> str:
"""Build the persona introduction section."""
demo = persona.demographics
credentials = ", ".join(demo.credentials) if demo.credentials else "N/A"

expertise_primary = ", ".join(persona.expertise.get("primary_domains", []))

return f"""You are {demo.name}, {demo.title} with {demo.experience_years} years of experience.

CREDENTIALS: {credentials}

BACKGROUND: {demo.background}

PRIMARY EXPERTISE: {expertise_primary}"""

def _build_evaluation_style(self, persona: JudgePersona) -> str:
"""Build the evaluation style section."""
style = persona.evaluation_style

lines = [
"YOUR EVALUATION STYLE:",
f"- Strictness: {style.strictness}"
]

if style.focus:
lines.append(f"- Focus: {style.focus}")
if style.risk_tolerance:
lines.append(f"- Risk Tolerance: {style.risk_tolerance}")
if style.documentation_expectation:
lines.append(f"- Documentation: {style.documentation_expectation}")
if style.assumption:
lines.append(f"- Assumption: {style.assumption}")
if style.input_assumption:
lines.append(f"- Input Assumption: {style.input_assumption}")
if style.technical_debt_tolerance:
lines.append(f"- Technical Debt: {style.technical_debt_tolerance} tolerance")

return "\n".join(lines)

def _build_dimensions_section(self, persona: JudgePersona) -> str:
"""Build the evaluation dimensions section."""
if not persona.evaluation_dimensions:
return ""

lines = ["EVALUATION DIMENSIONS:"]

for i, dim in enumerate(persona.evaluation_dimensions, 1):
lines.append(f"\n{i}. {dim.name}")
if dim.weight:
lines.append(f" Weight: {dim.weight:.0%}")

# Score descriptions
if dim.score_descriptions:
for score, desc in sorted(dim.score_descriptions.items(), reverse=True):
lines.append(f" - {score}: {desc}")

# Evaluation criteria/steps
criteria = dim.evaluation_criteria or dim.evaluation_steps or dim.types
if criteria:
for criterion in criteria:
lines.append(f" - {criterion}")

# OWASP reference
if dim.owasp_ref:
lines.append(f" OWASP Ref: {dim.owasp_ref}")

return "\n".join(lines)

def _build_red_flags_section(self, persona: JudgePersona) -> str:
"""Build the red flags section."""
flags = persona.red_flags or persona.patient_safety_red_flags

if not flags:
return ""

section_name = "PATIENT SAFETY RED FLAGS:" if persona.patient_safety_red_flags else "RED FLAGS TO IDENTIFY:"
lines = [section_name]

for flag in flags:
lines.append(f"- [{flag.severity}] {flag.pattern}")

# Add testing gaps if present
if persona.testing_gaps_to_identify:
lines.append("\nTESTING GAPS TO IDENTIFY:")
for gap in persona.testing_gaps_to_identify:
lines.append(f"- {gap}")

return "\n".join(lines)

def _build_output_schema(self, persona: JudgePersona) -> str:
"""Build the expected output schema section."""
schema = persona.output_schema
if not schema:
return ""

# Get verdict types for this persona
verdict_types = self._registry.verdict_types.get(persona.persona_id, ["PASS", "FAIL", "CONDITIONAL"])
verdict_enum = "|".join(verdict_types)

lines = [
"Provide your evaluation in the following JSON format:",
"{",
f' "persona": "{persona.demographics.name}",',
f' "overall_verdict": "{verdict_enum}",',
' "confidence": 0.0-1.0,'
]

# Add dimension-specific fields based on persona type
if persona.persona_id == "technical_architect":
lines.extend([
' "dimension_scores": {',
' "<dimension_id>": {"score": 1-3, "evidence": "...", "issues": []}',
' },',
' "red_flags": [],',
' "strengths": [],',
' "remediation_required": [],'
])
elif persona.persona_id == "compliance_auditor":
lines.extend([
' "framework_assessments": {',
' "hipaa": {"status": "PASS|FAIL", "findings": [...]},',
' "fda_part_11": {...},',
' "soc2": {...}',
' },',
' "critical_findings": [],',
' "remediation_required": [],'
])
elif persona.persona_id == "security_analyst":
lines.extend([
' "vulnerability_findings": [',
' {"id": "VULN-001", "severity": "...", "title": "...", "cwe_id": "CWE-XXX"}',
' ],',
' "critical_count": 0,',
' "high_count": 0,',
' "attack_surface_assessment": "...",'
])
elif "healthcare" in persona.persona_id or "domain_expert" in persona.persona_id:
lines.extend([
' "dimension_scores": {...},',
' "patient_safety_concerns": [],',
' "terminology_errors": [],',
' "clinical_review_recommended": true|false,'
])
elif persona.persona_id == "qa_evaluator":
lines.extend([
' "coverage_assessment": {',
' "unit_test_coverage": "X%",',
' "critical_paths_covered": true|false',
' },',
' "missing_test_cases": [],',
' "regression_risks": [],'
])

lines.extend([
' "rationale": "..."',
'}'
])

return "\n".join(lines)

def _build_context_sections(self, context: Dict[str, Any]) -> str:
"""Build context sections from provided context dict."""
sections = []

if context.get("adrs"):
sections.append(f"APPLICABLE ADRs:\n{context['adrs']}")

if context.get("requirements"):
sections.append(f"REQUIREMENTS:\n{context['requirements']}")

if context.get("tech_stack"):
sections.append(f"TECHNOLOGY STACK:\n{context['tech_stack']}")

if context.get("compliance_requirements"):
sections.append(f"COMPLIANCE REQUIREMENTS:\n{context['compliance_requirements']}")

if context.get("security_requirements"):
sections.append(f"SECURITY REQUIREMENTS:\n{context['security_requirements']}")

if context.get("clinical_requirements"):
sections.append(f"CLINICAL REQUIREMENTS:\n{context['clinical_requirements']}")

if context.get("clinical_standards"):
sections.append(f"CLINICAL STANDARDS:\n{context['clinical_standards']}")

if context.get("existing_tests"):
sections.append(f"EXISTING TEST COVERAGE:\n{context['existing_tests']}")

if context.get("frameworks"):
sections.append(f"APPLICABLE FRAMEWORKS:\n{context['frameworks']}")

return "\n\n".join(sections)

def validate_persona_schema(self, persona: JudgePersona) -> Tuple[bool, List[str]]:
"""
Validate that a persona has all required fields.

Args:
persona: The persona to validate

Returns:
Tuple of (is_valid, list of validation errors)
"""
errors = []

# Required fields
if not persona.persona_id:
errors.append("Missing persona_id")
if not persona.demographics.name:
errors.append("Missing demographics.name")
if not persona.evaluation_style.strictness:
errors.append("Missing evaluation_style.strictness")
if not persona.model_routing.primary_model:
errors.append("Missing model_routing.primary_model")
if persona.weight <= 0 or persona.weight > 1:
errors.append(f"Invalid weight: {persona.weight} (must be 0 < weight <= 1)")
if not persona.evaluation_dimensions:
errors.append("No evaluation_dimensions defined")
if not persona.trigger_conditions:
errors.append("No trigger_conditions defined")

# Validate dimensions have required fields
for dim in persona.evaluation_dimensions:
if not dim.id:
errors.append(f"Dimension missing id")
if not dim.name:
errors.append(f"Dimension {dim.id} missing name")

return len(errors) == 0, errors

def validate_all_personas(self) -> Dict[str, Tuple[bool, List[str]]]:
"""
Validate all loaded personas.

Returns:
Dict mapping persona_id to (is_valid, errors)
"""
results = {}
for persona_id, persona in self._registry.personas.items():
results[persona_id] = self.validate_persona_schema(persona)
return results

def verify_panel_diversity(
self,
persona_ids: Optional[List[str]] = None,
provider_aware: bool = True
) -> Tuple[bool, Dict[str, Any]]:
"""
Verify that a panel of personas meets diversity requirements.

In provider-aware mode (ADR-073), diversity requirements are adjusted
based on the detected provider mode:
- Single provider: Model tier diversity (flagship/balanced/fast)
- Dual provider: Provider alternation diversity
- Multi provider: Full model family diversity

Args:
persona_ids: Specific personas to check, or all enabled if None
provider_aware: If True, adjust requirements based on provider mode

Returns:
Tuple of (meets_requirements, details_dict)
"""
if persona_ids:
personas = [self.get_persona(pid) for pid in persona_ids]
else:
personas = self.get_all_personas()

# Detect provider mode if provider-aware
provider_mode = ProviderMode.MULTI
provider_detection_result: Optional[ProviderDetectionResult] = None
if provider_aware:
try:
detector = get_default_detector()
provider_detection_result = detector.detect_mode()
provider_mode = provider_detection_result.mode
except Exception:
pass # Default to MULTI mode requirements

# Count families and weights
family_weights: Dict[str, float] = {}
for persona in personas:
family = persona.model_routing.model_family
family_weights[family] = family_weights.get(family, 0) + persona.weight

# Normalize weights
total_weight = sum(family_weights.values())
if total_weight > 0:
family_weights = {k: v/total_weight for k, v in family_weights.items()}

# Check requirements based on provider mode
config = self._registry.configuration
diversity_req = self._registry.diversity_requirements

num_families = len(family_weights)
max_family_weight = max(family_weights.values()) if family_weights else 0

# Adjust requirements based on provider mode (ADR-073)
if provider_mode == ProviderMode.SINGLE:
# Single provider: Only require model tier diversity, not family diversity
min_families_required = 1
max_allowed_weight = 1.0 # Single family is acceptable
required_families = set()
diversity_strategy = "model_tier_diversity"
elif provider_mode == ProviderMode.DUAL:
# Dual provider: Require at least 2 families
min_families_required = 2
max_allowed_weight = 0.60 # Allow up to 60% from one provider
required_families = set() # Any two providers are acceptable
diversity_strategy = "provider_alternation"
else: # MULTI
# Full diversity requirements
min_families_required = config.min_model_families
max_allowed_weight = config.max_single_model_weight
required_families = set(diversity_req.get("required_families", []))
diversity_strategy = "full_diversity"

meets_min_families = num_families >= min_families_required
meets_max_weight = max_family_weight <= max_allowed_weight

# Check required families (only in multi mode)
present_families = set(family_weights.keys())
missing_families = required_families - present_families
has_required = len(missing_families) == 0

details = {
"num_families": num_families,
"min_required": min_families_required,
"family_weights": family_weights,
"max_family_weight": max_family_weight,
"max_allowed_weight": max_allowed_weight,
"required_families": list(required_families),
"missing_families": list(missing_families),
"meets_min_families": meets_min_families,
"meets_max_weight": meets_max_weight,
"has_required_families": has_required,
# Provider mode info (ADR-073)
"provider_mode": provider_mode.value,
"diversity_strategy": diversity_strategy,
"provider_count": provider_detection_result.provider_count if provider_detection_result else len(family_weights),
"confidence_adjustment": provider_detection_result.confidence_adjustment if provider_detection_result else 0.0
}

is_valid = meets_min_families and meets_max_weight and has_required

return is_valid, details

def adjust_confidence_for_provider_mode(
self,
confidence: float
) -> float:
"""
Adjust confidence based on detected provider mode (ADR-073).

Single provider mode: -10% adjustment (less diversity)
Dual provider mode: -5% adjustment
Multi provider mode: No adjustment (full diversity)

Args:
confidence: Original confidence score (0.0-1.0)

Returns:
Adjusted confidence score (clamped to 0.0-1.0)
"""
try:
detector = get_default_detector()
return detector.adjust_confidence(confidence)
except Exception:
return confidence # Return unchanged if detector fails

def get_provider_mode(self) -> ProviderMode:
"""
Get the current provider mode based on available API keys.

Returns:
ProviderMode enum (SINGLE, DUAL, or MULTI)
"""
try:
detector = get_default_detector()
result = detector.detect_mode()
return result.mode
except Exception:
return ProviderMode.MULTI # Assume full diversity by default

def get_diversity_report(self) -> Dict[str, Any]:
"""
Get a comprehensive diversity report including provider mode info.

Returns:
Dict with mode, provider count, recommendations, etc.
"""
try:
detector = get_default_detector()
return detector.get_diversity_report()
except Exception:
return {
"mode": "unknown",
"provider_count": 0,
"error": "Provider detector unavailable"
}

def get_model_for_persona(
self,
persona_id: str,
use_backup: bool = False,
override_model: Optional[str] = None,
provider_aware: bool = True
) -> str:
"""
Get the model to use for a persona with configuration priority.

Priority (highest to lowest):
1. override_model parameter (runtime override)
2. Environment variable: CODITECT_JUDGE_MODEL_<PERSONA_ID>
3. Provider mode detection (single/dual/multi) - ADR-073
4. Model routing config file: config/judge-model-routing.json
5. Persona-specific config in judge-personas.json

Args:
persona_id: The persona identifier
use_backup: If True, return backup model instead of primary
override_model: Runtime override for model selection
provider_aware: If True, use ProviderDetector for mode-aware selection

Returns:
Model identifier string

Raises:
PersonaNotFoundError: If persona_id not found
ValueError: If no model configuration found
"""
# Priority 1: Runtime override
if override_model:
return override_model

# Priority 2: Environment variable
env_key = f"CODITECT_JUDGE_MODEL_{persona_id.upper()}"
env_model = os.environ.get(env_key)
if env_model:
return env_model

# Also check backup env var
if use_backup:
env_backup_key = f"CODITECT_JUDGE_MODEL_{persona_id.upper()}_BACKUP"
env_backup = os.environ.get(env_backup_key)
if env_backup:
return env_backup

# Priority 3: Provider mode detection (ADR-073)
if provider_aware:
try:
detector = get_default_detector()
result = detector.detect_mode()

# In single or dual provider mode, use provider detector's model selection
if result.mode in (ProviderMode.SINGLE, ProviderMode.DUAL):
model = detector.get_model_for_persona(persona_id)
if model:
return model
except Exception:
# Fall through to config-based selection if detector fails
pass

# Priority 4: Model routing config file
model_from_routing = self._get_model_from_routing_config(persona_id, use_backup)
if model_from_routing:
return model_from_routing

# Priority 5: Persona-specific config
persona = self.get_persona(persona_id)
if use_backup:
if persona.model_routing.backup_model:
return persona.model_routing.backup_model
# Fall through to primary if no backup configured

if persona.model_routing.primary_model:
return persona.model_routing.primary_model

raise ValueError(
f"No model configuration found for persona '{persona_id}'. "
f"Set CODITECT_JUDGE_MODEL_{persona_id.upper()} or configure in judge-personas.json"
)

def _get_model_from_routing_config(
self,
persona_id: str,
use_backup: bool
) -> Optional[str]:
"""
Load model from separate routing config file if it exists.

Args:
persona_id: The persona identifier
use_backup: If True, return backup model

Returns:
Model string or None if not configured
"""
routing_path = self.config_path.parent / "judge-model-routing.json"
if not routing_path.exists():
return None

try:
with open(routing_path, 'r', encoding='utf-8') as f:
routing_config = json.load(f)

persona_routing = routing_config.get("routing", {}).get(persona_id)
if not persona_routing:
return None

if use_backup:
return persona_routing.get("backup_model")
return persona_routing.get("primary_model")
except (json.JSONDecodeError, IOError):
return None

def get_all_model_mappings(self) -> Dict[str, Dict[str, str]]:
"""
Get all persona-to-model mappings with resolved priorities.

Returns:
Dict mapping persona_id to {primary_model, backup_model, source}
"""
mappings = {}
for persona_id in self._registry.personas.keys():
try:
primary = self.get_model_for_persona(persona_id, use_backup=False)
backup = self.get_model_for_persona(persona_id, use_backup=True)

# Determine source
env_key = f"CODITECT_JUDGE_MODEL_{persona_id.upper()}"
if os.environ.get(env_key):
source = "environment"
elif self._get_model_from_routing_config(persona_id, False):
source = "routing_config"
else:
source = "persona_config"

mappings[persona_id] = {
"primary_model": primary,
"backup_model": backup,
"source": source
}
except ValueError:
mappings[persona_id] = {
"primary_model": None,
"backup_model": None,
"source": "not_configured"
}

return mappings

def map_verdict_to_consensus(self, persona_id: str, verdict: str) -> Verdict:
"""
Map a persona-specific verdict to unified consensus verdict.

Args:
persona_id: The persona that issued the verdict
verdict: The persona-specific verdict string

Returns:
Unified Verdict enum value
"""
mapping = self._registry.consensus_mapping

if verdict in mapping.get("PASS_equivalent", []):
return Verdict.PASS
elif verdict in mapping.get("FAIL_equivalent", []):
return Verdict.FAIL
elif verdict in mapping.get("CONDITIONAL_equivalent", []):
return Verdict.CONDITIONAL
else:
# Default based on verdict string
verdict_upper = verdict.upper()
if "PASS" in verdict_upper or "SAFE" in verdict_upper or "COMPLIANT" in verdict_upper or "SECURE" in verdict_upper:
return Verdict.PASS
elif "FAIL" in verdict_upper or "UNSAFE" in verdict_upper or "NON_COMPLIANT" in verdict_upper or "VULNERABLE" in verdict_upper:
return Verdict.FAIL
else:
return Verdict.CONDITIONAL

@property
def configuration(self) -> PersonaConfig:
"""Get the global persona configuration."""
return self._registry.configuration

@property
def registry(self) -> PersonaRegistry:
"""Get the full persona registry."""
return self._registry

def reload(self) -> None:
"""Reload the configuration from disk."""
self._load_config()

# Convenience functions for direct usage

# Module-level singleton, lazily created by get_default_loader().
_default_loader: Optional[PersonaLoader] = None

def get_default_loader() -> PersonaLoader:
    """Get or create the default (singleton) persona loader."""
    global _default_loader
    if _default_loader is None:
        _default_loader = PersonaLoader()
    return _default_loader

def load_persona(persona_id: str) -> JudgePersona:
    """Load a persona by ID using the default loader."""
    return get_default_loader().get_persona(persona_id)

def get_prompt_template(
    persona: JudgePersona,
    artifact: str,
    context: Optional[Dict[str, Any]] = None
) -> str:
    """Generate prompt template using the default loader."""
    return get_default_loader().get_prompt_template(persona, artifact, context)

def validate_persona_schema(persona: JudgePersona) -> Tuple[bool, List[str]]:
    """Validate a persona schema using the default loader."""
    return get_default_loader().validate_persona_schema(persona)

def verify_panel_diversity(
    persona_ids: Optional[List[str]] = None,
    provider_aware: bool = True
) -> Tuple[bool, Dict[str, Any]]:
    """Verify panel diversity using the default loader."""
    return get_default_loader().verify_panel_diversity(persona_ids, provider_aware)

def adjust_confidence_for_provider_mode(confidence: float) -> float:
    """Adjust confidence based on provider mode using the default loader."""
    return get_default_loader().adjust_confidence_for_provider_mode(confidence)

def get_provider_mode() -> ProviderMode:
    """Get the current provider mode using the default loader."""
    return get_default_loader().get_provider_mode()

def get_provider_diversity_report() -> Dict[str, Any]:
    """Get a diversity report using the default loader."""
    return get_default_loader().get_diversity_report()