# Skip to main content  <- stray web-page copy/paste artifact; safe to delete

""" Rubric Merger for MoE Verification Layer.

Merges generated ADR rubrics with base persona evaluation dimensions and renormalizes weights (H.3.2.4).

Features:

  • Merge ADR-generated rubrics with persona base dimensions
  • Smart matching based on persona expertise and trigger conditions
  • Weight renormalization to maintain sum = 1.0
  • Configurable merge strategies (append, replace, weighted)
  • Conflict resolution for overlapping dimensions

Usage: from moe_classifier.core.rubric_merger import RubricMerger, MergeStrategy

merger = RubricMerger()
merged = merger.merge_rubrics(persona, adr_rubrics)

# Or use convenience function
from moe_classifier.core import merge_persona_with_adrs
merged_persona = merge_persona_with_adrs("technical_architect", ["ADR-001", "ADR-009"])

"""

import json
import os
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# Import from persona_loader
from .persona_loader import (
    PersonaLoader,
    JudgePersona,
    EvaluationDimension,
    get_default_loader,
)

class MergeStrategy(str, Enum):
    """Strategy for merging rubrics with persona dimensions.

    Inherits from ``str`` so members compare equal to their string values
    and serialize cleanly to JSON.
    """

    APPEND = "append"                      # Add ADR dimensions to existing
    WEIGHTED_APPEND = "weighted_append"    # Add with reduced weight for ADR dims
    REPLACE_MATCHING = "replace_matching"  # Replace if dimension name matches
    ADR_PRIORITY = "adr_priority"          # ADR dimensions take priority

class ConflictResolution(str, Enum):
    """How to resolve conflicts between base and ADR dimensions.

    Inherits from ``str`` so members compare equal to their string values
    and serialize cleanly to JSON.
    """

    KEEP_BASE = "keep_base"            # Keep the base persona dimension
    KEEP_ADR = "keep_adr"              # Keep the ADR-generated dimension
    MERGE_SCORES = "merge_scores"      # Combine score descriptions
    HIGHEST_WEIGHT = "highest_weight"  # Keep whichever has higher weight

@dataclass
class MergeConfig:
    """Configuration for rubric merging."""

    strategy: MergeStrategy = MergeStrategy.WEIGHTED_APPEND
    conflict_resolution: ConflictResolution = ConflictResolution.MERGE_SCORES
    # ADR dimensions get 60% of their original weight under WEIGHTED_APPEND
    adr_weight_factor: float = 0.6
    # Cap total dimensions after merging
    max_total_dimensions: int = 15
    # Dimensions below this weight are dropped before renormalization
    min_dimension_weight: float = 0.02
    # Always include MUST-level ADR constraints, even past the cap
    preserve_must_constraints: bool = True

@dataclass
class MergedDimension:
    """A merged evaluation dimension combining base and ADR sources."""

    id: str
    name: str
    weight: float
    scale: List[int]
    score_descriptions: Dict[int, str]
    evaluation_steps: List[str]
    source: str  # "base", "adr", or "merged"
    source_adr: Optional[str] = None
    constraint_level: Optional[str] = None  # e.g. "MUST"/"SHOULD" for ADR dims
    original_weight: float = 0.0  # weight before any strategy scaling/renormalization

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return {
            "id": self.id,
            "name": self.name,
            "weight": self.weight,
            "scale": self.scale,
            "score_descriptions": self.score_descriptions,
            "evaluation_steps": self.evaluation_steps,
            "source": self.source,
            "source_adr": self.source_adr,
            "constraint_level": self.constraint_level,
            "original_weight": self.original_weight,
        }

    @classmethod
    def from_persona_dimension(cls, dim: EvaluationDimension) -> "MergedDimension":
        """Create from a persona EvaluationDimension.

        Falls back to a default [1, 2, 3] scale when the persona's scale is
        not a list, and to evaluation_criteria when evaluation_steps is empty.
        """
        return cls(
            id=dim.id,
            name=dim.name,
            weight=dim.weight,
            scale=dim.scale if isinstance(dim.scale, list) else [1, 2, 3],
            score_descriptions=dim.score_descriptions or {},
            evaluation_steps=dim.evaluation_steps or dim.evaluation_criteria or [],
            source="base",
            original_weight=dim.weight,
        )

    @classmethod
    def from_adr_dimension(cls, dim: Dict[str, Any], source_adr: str) -> "MergedDimension":
        """Create from an ADR-generated rubric dimension.

        Args:
            dim: Raw rubric dimension dictionary (missing keys get defaults;
                 weight defaults to 0.1).
            source_adr: Identifier of the ADR this dimension came from.
        """
        return cls(
            id=dim.get("id", ""),
            name=dim.get("name", ""),
            weight=dim.get("weight", 0.1),
            scale=dim.get("scale", [1, 2, 3]),
            score_descriptions=dim.get("score_descriptions", {}),
            evaluation_steps=dim.get("evaluation_steps", []),
            source="adr",
            source_adr=source_adr,
            constraint_level=dim.get("constraint_level"),
            original_weight=dim.get("weight", 0.1),
        )

@dataclass
class MergeResult:
    """Result of a rubric merge operation."""

    persona_id: str
    merged_dimensions: List[MergedDimension]
    total_weight: float  # sum of merged dimension weights (≈1.0 after normalization)
    base_dimension_count: int
    adr_dimension_count: int
    merged_dimension_count: int
    adrs_applied: List[str]
    merge_strategy: str
    merged_at: str  # ISO-8601 UTC timestamp
    warnings: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary (counts are nested under "statistics")."""
        return {
            "persona_id": self.persona_id,
            "merged_dimensions": [d.to_dict() for d in self.merged_dimensions],
            "total_weight": self.total_weight,
            "statistics": {
                "base_dimension_count": self.base_dimension_count,
                "adr_dimension_count": self.adr_dimension_count,
                "merged_dimension_count": self.merged_dimension_count,
            },
            "adrs_applied": self.adrs_applied,
            "merge_strategy": self.merge_strategy,
            "merged_at": self.merged_at,
            "warnings": self.warnings,
        }

class RubricMerger:
    """
    Merges ADR-generated rubrics with base persona evaluation dimensions.

    The merger:
    1. Loads persona base dimensions
    2. Loads relevant ADR rubrics based on persona expertise
    3. Merges dimensions using configured strategy
    4. Renormalizes weights to sum to 1.0
    """

    def __init__(
        self,
        config: Optional[MergeConfig] = None,
        persona_loader: Optional[PersonaLoader] = None,
        rubrics_dir: Optional[Path] = None,
    ):
        """
        Initialize the merger.

        Args:
            config: Merge configuration
            persona_loader: PersonaLoader instance (uses default if None)
            rubrics_dir: Directory containing generated rubrics
        """
        self.config = config or MergeConfig()
        self.persona_loader = persona_loader or get_default_loader()

        # Find rubrics directory
        if rubrics_dir:
            self.rubrics_dir = Path(rubrics_dir)
        else:
            # Default: config/generated-rubrics relative to coditect-core
            base = Path(__file__).parent.parent.parent.parent
            self.rubrics_dir = base / "config" / "generated-rubrics"

        # Per-instance caches: loaded rubric files and the rubric index
        self._rubric_cache: Dict[str, Dict] = {}
        self._index_cache: Optional[Dict] = None

    def load_rubric_index(self) -> Dict[str, Any]:
        """
        Load the rubrics index file.

        Returns:
            Index dictionary with rubric metadata ({"rubrics": []} if the
            index file does not exist)
        """
        # `is not None` (not truthiness): an empty cached dict must not
        # trigger a re-read of the index file on every call.
        if self._index_cache is not None:
            return self._index_cache

        index_path = self.rubrics_dir / "_index.json"
        if index_path.exists():
            with open(index_path, 'r', encoding='utf-8') as f:
                self._index_cache = json.load(f)
        else:
            self._index_cache = {"rubrics": []}

        return self._index_cache

    def load_rubric(self, rubric_id: str) -> Optional[Dict[str, Any]]:
        """
        Load a specific rubric by ID.

        Args:
            rubric_id: The rubric ID (e.g., "rubric_adr_001")

        Returns:
            Rubric dictionary or None if not found
        """
        if rubric_id in self._rubric_cache:
            return self._rubric_cache[rubric_id]

        rubric_path = self.rubrics_dir / f"{rubric_id}.json"
        if not rubric_path.exists():
            return None

        with open(rubric_path, 'r', encoding='utf-8') as f:
            rubric = json.load(f)
        self._rubric_cache[rubric_id] = rubric
        return rubric

    def get_relevant_adrs(
        self,
        persona: JudgePersona,
        adr_ids: Optional[List[str]] = None
    ) -> List[str]:
        """
        Get ADR rubric IDs relevant to a persona based on expertise.

        Args:
            persona: The judge persona
            adr_ids: Optional explicit list of ADR IDs to use

        Returns:
            List of rubric IDs to apply
        """
        if adr_ids:
            # Convert ADR IDs to rubric IDs, e.g. "ADR-001" -> "rubric_adr_001"
            return [f"rubric_adr_{adr.replace('ADR-', '').split('-')[0].zfill(3)}"
                    for adr in adr_ids]

        # Auto-detect based on persona expertise
        index = self.load_rubric_index()
        relevant = []

        # Get persona's domain and expertise keywords
        # (assumes persona.expertise maps domain -> iterable of terms — TODO confirm)
        keywords = set()
        for domain, terms in persona.expertise.items():
            keywords.add(domain.lower())
            keywords.update(t.lower() for t in terms)

        # Add trigger condition keywords
        for trigger in persona.trigger_conditions:
            keywords.update(trigger.lower().split())

        # Check each rubric for relevance
        # (assumes index["rubrics"] is a list of rubric-ID strings — TODO confirm)
        for rubric_id in index.get("rubrics", []):
            rubric = self.load_rubric(rubric_id)
            if not rubric:
                continue

            # A rubric is relevant if any dimension's name or source
            # constraint mentions one of the persona keywords.
            for dim in rubric.get("dimensions", []):
                dim_text = f"{dim.get('name', '')} {dim.get('source_constraint', '')}".lower()
                if any(kw in dim_text for kw in keywords):
                    relevant.append(rubric_id)
                    break

        return relevant

    def merge_rubrics(
        self,
        persona: JudgePersona,
        adr_rubric_ids: Optional[List[str]] = None,
        config: Optional[MergeConfig] = None
    ) -> MergeResult:
        """
        Merge ADR rubrics with persona base dimensions.

        Args:
            persona: The judge persona to augment
            adr_rubric_ids: Optional list of specific rubric IDs to use
            config: Optional override config for this merge

        Returns:
            MergeResult with merged dimensions
        """
        config = config or self.config
        warnings = []

        # Get base persona dimensions
        base_dimensions = [
            MergedDimension.from_persona_dimension(dim)
            for dim in persona.evaluation_dimensions
        ]

        # Get relevant ADR rubrics (auto-detect when not given explicitly)
        if adr_rubric_ids is None:
            adr_rubric_ids = self.get_relevant_adrs(persona)

        # Load ADR dimensions
        adr_dimensions = []
        adrs_applied = []

        for rubric_id in adr_rubric_ids:
            rubric = self.load_rubric(rubric_id)
            if not rubric:
                warnings.append(f"Rubric not found: {rubric_id}")
                continue

            adrs_applied.append(rubric.get("source_adr", rubric_id))

            for dim_data in rubric.get("dimensions", []):
                dim = MergedDimension.from_adr_dimension(
                    dim_data,
                    rubric.get("source_adr", rubric_id)
                )
                adr_dimensions.append(dim)

        # Apply merge strategy
        merged = self._apply_merge_strategy(
            base_dimensions,
            adr_dimensions,
            config
        )

        # Handle conflicts (dedupe dimensions with equivalent names)
        merged = self._resolve_conflicts(merged, config)

        # Apply constraints: MUST-level ADR dims are re-added if lost above
        if config.preserve_must_constraints:
            merged = self._ensure_must_constraints(merged, adr_dimensions)

        # Limit dimensions if needed
        if len(merged) > config.max_total_dimensions:
            warnings.append(f"Truncated from {len(merged)} to {config.max_total_dimensions} dimensions")
            merged = self._prioritize_dimensions(merged, config.max_total_dimensions)

        # Remove low-weight dimensions
        merged = [d for d in merged if d.weight >= config.min_dimension_weight]

        # Renormalize weights so they sum to 1.0
        merged = self._normalize_weights(merged)

        return MergeResult(
            persona_id=persona.persona_id,
            merged_dimensions=merged,
            total_weight=sum(d.weight for d in merged),
            base_dimension_count=len(base_dimensions),
            adr_dimension_count=len(adr_dimensions),
            merged_dimension_count=len(merged),
            adrs_applied=adrs_applied,
            merge_strategy=config.strategy.value,
            merged_at=datetime.now(timezone.utc).isoformat(),
            warnings=warnings,
        )

    def _apply_merge_strategy(
        self,
        base: List[MergedDimension],
        adr: List[MergedDimension],
        config: MergeConfig
    ) -> List[MergedDimension]:
        """
        Apply the configured merge strategy.

        Args:
            base: Base persona dimensions
            adr: ADR-generated dimensions
            config: Merge configuration

        Returns:
            Merged dimension list
        """
        if config.strategy == MergeStrategy.APPEND:
            # Simply combine all dimensions
            return base + adr

        elif config.strategy == MergeStrategy.WEIGHTED_APPEND:
            # Reduce ADR dimension weights (mutates the ADR dims in place)
            for dim in adr:
                dim.weight = dim.original_weight * config.adr_weight_factor
            return base + adr

        elif config.strategy == MergeStrategy.REPLACE_MATCHING:
            # Replace base dimensions if ADR has matching (case-insensitive) name
            adr_names = {d.name.lower(): d for d in adr}
            result = []
            replaced = set()

            for dim in base:
                if dim.name.lower() in adr_names:
                    result.append(adr_names[dim.name.lower()])
                    replaced.add(dim.name.lower())
                else:
                    result.append(dim)

            # Add non-matching ADR dimensions
            for dim in adr:
                if dim.name.lower() not in replaced:
                    result.append(dim)

            return result

        elif config.strategy == MergeStrategy.ADR_PRIORITY:
            # ADR dimensions come first with full weight
            return adr + base

        # Unknown strategy: fall back to plain append
        return base + adr

    def _resolve_conflicts(
        self,
        dimensions: List[MergedDimension],
        config: MergeConfig
    ) -> List[MergedDimension]:
        """
        Resolve conflicts between dimensions with similar names.

        Args:
            dimensions: List of all dimensions
            config: Merge configuration

        Returns:
            Deduplicated dimension list
        """
        # Group by normalized name
        by_name: Dict[str, List[MergedDimension]] = {}
        for dim in dimensions:
            key = self._normalize_dimension_name(dim.name)
            by_name.setdefault(key, []).append(dim)

        result = []
        for dims in by_name.values():
            if len(dims) == 1:
                result.append(dims[0])
            else:
                # Conflict - apply resolution strategy
                resolved = self._resolve_dimension_conflict(dims, config)
                result.append(resolved)

        return result

    def _normalize_dimension_name(self, name: str) -> str:
        """Normalize dimension name for comparison.

        Lowercases and collapses every run of non-alphanumeric characters
        to a single underscore, e.g. "Code Quality!" -> "code_quality".
        """
        name = name.lower()
        name = re.sub(r'[^a-z0-9]+', '_', name)
        name = name.strip('_')
        return name

    def _resolve_dimension_conflict(
        self,
        dims: List[MergedDimension],
        config: MergeConfig
    ) -> MergedDimension:
        """
        Resolve a conflict between multiple dimensions.

        Args:
            dims: Conflicting dimensions
            config: Merge configuration

        Returns:
            Single resolved dimension
        """
        resolution = config.conflict_resolution

        if resolution == ConflictResolution.KEEP_BASE:
            base_dims = [d for d in dims if d.source == "base"]
            return base_dims[0] if base_dims else dims[0]

        elif resolution == ConflictResolution.KEEP_ADR:
            adr_dims = [d for d in dims if d.source == "adr"]
            return adr_dims[0] if adr_dims else dims[0]

        elif resolution == ConflictResolution.HIGHEST_WEIGHT:
            # Compare pre-scaling weights, not the (possibly reduced) merged weight
            return max(dims, key=lambda d: d.original_weight)

        elif resolution == ConflictResolution.MERGE_SCORES:
            # Merge score descriptions from all sources
            # (mutates dims[0] in place and returns it)
            merged = dims[0]
            merged.source = "merged"

            # Combine score descriptions; semicolon-join differing texts
            all_scores = {}
            for dim in dims:
                for score, desc in dim.score_descriptions.items():
                    key = int(score) if isinstance(score, str) else score
                    if key not in all_scores:
                        all_scores[key] = desc
                    else:
                        # Append if different
                        if desc not in all_scores[key]:
                            all_scores[key] = f"{all_scores[key]}; {desc}"

            # NOTE(review): keys are re-serialized as strings here although the
            # dataclass annotates Dict[int, str] — confirm downstream consumers.
            merged.score_descriptions = {str(k): v for k, v in all_scores.items()}

            # Combine evaluation steps, preserving first-seen order
            all_steps = []
            for dim in dims:
                for step in dim.evaluation_steps:
                    if step not in all_steps:
                        all_steps.append(step)
            merged.evaluation_steps = all_steps[:10]  # Limit

            # Average weight
            merged.weight = sum(d.original_weight for d in dims) / len(dims)

            return merged

        # Unknown resolution mode: keep the first dimension
        return dims[0]

    def _ensure_must_constraints(
        self,
        merged: List[MergedDimension],
        adr_dims: List[MergedDimension]
    ) -> List[MergedDimension]:
        """
        Ensure all MUST-level ADR constraints are included.

        Args:
            merged: Current merged dimensions
            adr_dims: All ADR dimensions

        Returns:
            Updated dimension list with MUST constraints
        """
        merged_ids = {d.id for d in merged}

        for dim in adr_dims:
            if dim.constraint_level == "MUST" and dim.id not in merged_ids:
                merged.append(dim)
                merged_ids.add(dim.id)

        return merged

    def _prioritize_dimensions(
        self,
        dimensions: List[MergedDimension],
        max_count: int
    ) -> List[MergedDimension]:
        """
        Prioritize dimensions when over the limit.

        Priority order:
        1. MUST-level ADR constraints
        2. Base persona dimensions
        3. SHOULD-level ADR constraints
        4. Higher weight dimensions

        Args:
            dimensions: All dimensions
            max_count: Maximum to keep

        Returns:
            Prioritized and truncated list
        """
        def priority_key(dim: MergedDimension) -> Tuple:
            # Lower tuple sorts first; constraint level dominates, then
            # base-vs-ADR source, then descending original weight.
            level_priority = {
                "MUST": 0,
                "SHOULD": 2,
                "MAY": 3,
                None: 1,  # Base dimensions
            }
            return (
                level_priority.get(dim.constraint_level, 4),
                0 if dim.source == "base" else 1,
                -dim.original_weight,  # Higher weight = higher priority
            )

        sorted_dims = sorted(dimensions, key=priority_key)
        return sorted_dims[:max_count]

    def _normalize_weights(
        self,
        dimensions: List[MergedDimension]
    ) -> List[MergedDimension]:
        """
        Normalize weights to sum to 1.0 (in place).

        Args:
            dimensions: Dimensions to normalize

        Returns:
            Dimensions with normalized weights (rounded to 4 decimal places)
        """
        if not dimensions:
            return dimensions

        total = sum(d.weight for d in dimensions)
        if total <= 0:
            # Degenerate case: assign equal weights
            equal_weight = 1.0 / len(dimensions)
            for dim in dimensions:
                dim.weight = round(equal_weight, 4)
        else:
            for dim in dimensions:
                dim.weight = round(dim.weight / total, 4)

        return dimensions

    def save_merged_rubric(
        self,
        result: MergeResult,
        output_dir: Optional[Path] = None
    ) -> Path:
        """
        Save merged rubric to file.

        Args:
            result: The merge result
            output_dir: Output directory (default: config/merged-rubrics/)

        Returns:
            Path to saved file
        """
        if output_dir is None:
            output_dir = self.rubrics_dir.parent / "merged-rubrics"

        output_dir.mkdir(parents=True, exist_ok=True)

        output_path = output_dir / f"merged_{result.persona_id}.json"

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(result.to_dict(), f, indent=2)

        return output_path

# Convenience functions

# Lazily-created module-level singleton used by the convenience functions below.
_default_merger: Optional[RubricMerger] = None


def get_default_merger() -> RubricMerger:
    """Get or create the default RubricMerger instance."""
    global _default_merger
    if _default_merger is None:
        _default_merger = RubricMerger()
    return _default_merger

def merge_rubrics(
    persona_id: str,
    adr_ids: Optional[List[str]] = None,
    strategy: MergeStrategy = MergeStrategy.WEIGHTED_APPEND
) -> MergeResult:
    """
    Convenience function to merge rubrics for a persona.

    Args:
        persona_id: The persona ID
        adr_ids: Optional list of ADR IDs (e.g., ["ADR-001", "ADR-009"])
        strategy: Merge strategy to use

    Returns:
        MergeResult with merged dimensions

    Example:
        result = merge_rubrics("technical_architect", ["ADR-001", "ADR-009"])
        for dim in result.merged_dimensions:
            print(f"{dim.name}: {dim.weight}")
    """
    merger = get_default_merger()
    loader = merger.persona_loader

    persona = loader.get_persona(persona_id)

    config = MergeConfig(strategy=strategy)

    # Convert ADR IDs to rubric IDs if provided,
    # e.g. "ADR-001" -> "rubric_adr_001"
    rubric_ids = None
    if adr_ids:
        rubric_ids = [
            f"rubric_adr_{adr.replace('ADR-', '').split('-')[0].zfill(3)}"
            for adr in adr_ids
        ]

    return merger.merge_rubrics(persona, rubric_ids, config)

def merge_persona_with_adrs(
    persona_id: str,
    adr_ids: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Merge ADR rubrics with persona and return as dictionary.

    This is the main convenience function for integrating
    ADR-generated rubrics with judge personas.

    Args:
        persona_id: The persona ID (e.g., "technical_architect")
        adr_ids: Optional list of ADR IDs to apply

    Returns:
        Dictionary with merged persona data

    Example:
        merged = merge_persona_with_adrs("technical_architect")
        print(f"Merged {merged['statistics']['adr_dimension_count']} ADR dimensions")
    """
    result = merge_rubrics(persona_id, adr_ids)
    return result.to_dict()

def batch_merge_all_personas(
    output_dir: Optional[Path] = None
) -> Dict[str, MergeResult]:
    """
    Merge ADR rubrics with all enabled personas.

    Args:
        output_dir: Optional output directory for merged files
            (results are only written to disk when this is given)

    Returns:
        Dictionary mapping persona_id to MergeResult
    """
    merger = get_default_merger()
    loader = merger.persona_loader

    results = {}

    for persona in loader.get_all_personas(enabled_only=True):
        result = merger.merge_rubrics(persona)
        results[persona.persona_id] = result

        if output_dir:
            merger.save_merged_rubric(result, output_dir)

    return results