#!/usr/bin/env python3
"""ADR-to-Rubric Generator for MoE Verification Layer.

Automatically extracts evaluation rubrics from Architecture Decision Records
(ADRs) based on MUST/SHOULD/MAY constraint patterns (H.3.2.1).

Features:
- Parses ADR markdown files with YAML frontmatter
- Extracts MUST/SHOULD/MAY constraints from Decision sections
- Generates evaluation rubrics with 1-3 scoring scales
- Outputs to config/generated-rubrics/ as JSON
- Supports batch processing of entire ADR directories

Usage:
    python scripts/adr-rubric-generator.py --adr ADR-001.md
    python scripts/adr-rubric-generator.py --adr-dir --output config/generated-rubrics/
    python scripts/adr-rubric-generator.py --adr-dir --dry-run

Note: ADR-213 moved ADRs to coditect-documentation/coditect-core/adrs/

Research Basis:
- judge-persona-design-methodology.md Part 4: ADR-to-Rubric Pipeline
- G-EVAL pattern (Liu et al., 2023)
"""

import argparse
import hashlib
import json
import os
import re
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

class ConstraintLevel(str, Enum):
    """RFC 2119 constraint levels with weights."""

    MUST = "MUST"      # Mandatory - failure is critical
    SHOULD = "SHOULD"  # Recommended - failure is significant
    MAY = "MAY"        # Optional - failure is minor

    @property
    def weight(self) -> float:
        """Get the default weight for this constraint level.

        Weights sum to 1.0 across the three levels; they are used as the
        pre-normalization dimension weight when generating rubrics.
        """
        weights = {
            ConstraintLevel.MUST: 0.50,
            ConstraintLevel.SHOULD: 0.35,
            ConstraintLevel.MAY: 0.15,
        }
        return weights[self]

    @property
    def failure_severity(self) -> str:
        """Get the failure severity label for this constraint level."""
        severities = {
            ConstraintLevel.MUST: "CRITICAL",
            ConstraintLevel.SHOULD: "MAJOR",
            ConstraintLevel.MAY: "MINOR",
        }
        return severities[self]

@dataclass
class Constraint:
    """A single extracted constraint from an ADR."""

    # The cleaned constraint sentence/line text.
    text: str
    # RFC 2119 level detected for this constraint.
    level: ConstraintLevel
    # Name of the ADR section the constraint was found in.
    source_section: str
    # 1-based line number within that section, if known.
    source_line: Optional[int] = None
    # Technical terms (e.g. "TLS", "Kafka") matched in the constraint text.
    technical_terms: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "text": self.text,
            "level": self.level.value,
            "source_section": self.source_section,
            "source_line": self.source_line,
            "technical_terms": self.technical_terms,
            "weight": self.level.weight,
            "failure_severity": self.level.failure_severity,
        }

@dataclass
class ScoreDescription:
    """Description for a single score level."""

    # The numeric score this description applies to (1-3).
    score: int
    # Human-readable description of what the score means.
    description: str
    # What level of compliance this score represents.
    constraint_level: str

@dataclass
class RubricDimension:
    """A single evaluation dimension in a rubric."""

    # Unique dimension identifier (e.g. "dim_001_tls_compliance").
    id: str
    # Human-readable dimension name.
    name: str
    # The original constraint text this dimension was derived from.
    source_constraint: str
    # RFC 2119 level of the source constraint.
    constraint_level: ConstraintLevel
    # Dimension weight (may be normalized across the rubric).
    weight: float
    # The scoring scale, e.g. [1, 2, 3].
    scale: List[int]
    # Mapping from score value to its description.
    score_descriptions: Dict[int, str]
    # Ordered steps a judge follows to evaluate this dimension.
    evaluation_steps: List[str]

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "id": self.id,
            "name": self.name,
            "source_constraint": self.source_constraint,
            "constraint_level": self.constraint_level.value,
            "weight": self.weight,
            "scale": self.scale,
            "score_descriptions": self.score_descriptions,
            "evaluation_steps": self.evaluation_steps,
        }

@dataclass
class GeneratedRubric:
    """A complete evaluation rubric generated from an ADR."""

    # Unique rubric identifier (e.g. "rubric_adr_001").
    rubric_id: str
    # Path to the source ADR file.
    source_adr: str
    # ADR title from frontmatter (or "Unknown ADR").
    adr_title: str
    # ADR status from frontmatter (or "unknown").
    adr_status: str
    # ISO-8601 UTC timestamp of generation.
    generated_at: str
    # Semantic version of the rubric format.
    version: str
    # Evaluation dimensions derived from constraints.
    dimensions: List[RubricDimension]
    # Sum of dimension weights (1.0 when normalized).
    total_weight: float
    # Count of constraints by level (keys: MUST/SHOULD/MAY).
    constraint_summary: Dict[str, int]

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "rubric_id": self.rubric_id,
            "source_adr": self.source_adr,
            "adr_title": self.adr_title,
            "adr_status": self.adr_status,
            "generated_at": self.generated_at,
            "version": self.version,
            "dimensions": [d.to_dict() for d in self.dimensions],
            "total_weight": self.total_weight,
            "constraint_summary": self.constraint_summary,
            "dimension_count": len(self.dimensions),
        }

class ADRParser:
    """
    Parses Architecture Decision Records to extract constraints.

    Supports MADR (Markdown Architectural Decision Records) format with:
    - YAML frontmatter
    - Standard sections: Context, Decision, Consequences
    - RFC 2119 keywords: MUST, SHALL, SHOULD, MAY, etc.
    """

    # RFC 2119 + common variants constraint patterns
    CONSTRAINT_PATTERNS = {
        ConstraintLevel.MUST: [
            r'\b(must|shall|will|required|mandatory|always)\b',
            r'\b(have to|need to|has to|needs to)\b',
            r'\b(is required|are required)\b',
        ],
        ConstraintLevel.SHOULD: [
            r'\b(should|recommended|preferred|preferably)\b',
            r'\b(ought to|expected to)\b',
            r'\b(is recommended|are recommended)\b',
        ],
        ConstraintLevel.MAY: [
            r'\b(may|can|could|optional|optionally)\b',
            r'\b(is optional|are optional)\b',
            r'\b(is allowed|are allowed)\b',
        ],
    }

    # Technical term patterns to extract
    TECHNICAL_PATTERNS = [
        # Security
        r'\b(encryption|TLS|SSL|AES|RSA|SHA|HMAC|JWT|OAuth|OIDC)\b',
        r'\b(HIPAA|SOC\s*2|PCI-DSS|GDPR|FHIR|HL7)\b',
        r'\b(authentication|authorization|RBAC|ABAC)\b',
        # Data
        r'\b(FoundationDB|PostgreSQL|Redis|Kafka|RabbitMQ)\b',
        r'\b(event\s*sourcing|CQRS|ACID|eventual\s*consistency)\b',
        r'\b(immutable|idempotent|transactional)\b',
        # Architecture
        r'\b(microservices?|monolith|serverless|containerized?)\b',
        r'\b(API|REST|GraphQL|gRPC|WebSocket)\b',
        r'\b(Kubernetes|Docker|GKE|Cloud\s*Run)\b',
        # Quality
        r'\b(latency|throughput|availability|durability)\b',
        r'\b(SLA|SLO|SLI|uptime)\b',
    ]

    def __init__(self):
        """Initialize the parser with compiled regex patterns."""
        self._compile_patterns()

    def _compile_patterns(self):
        """Compile regex patterns once for performance (hoisted out of loops)."""
        self.compiled_constraints = {}
        for level, patterns in self.CONSTRAINT_PATTERNS.items():
            combined = '|'.join(f'({p})' for p in patterns)
            self.compiled_constraints[level] = re.compile(combined, re.IGNORECASE)

        self.compiled_technical = re.compile(
            '|'.join(f'({p})' for p in self.TECHNICAL_PATTERNS),
            re.IGNORECASE
        )

    def parse_adr(self, file_path: Path) -> Tuple[Dict[str, Any], str]:
        """
        Parse an ADR file and return frontmatter and content.

        Args:
            file_path: Path to the ADR markdown file

        Returns:
            Tuple of (frontmatter dict, content string)
        """
        content = file_path.read_text(encoding='utf-8')

        # Parse YAML frontmatter delimited by '---' markers
        frontmatter = {}
        body = content

        if content.startswith('---'):
            parts = content.split('---', 2)
            if len(parts) >= 3:
                try:
                    # Simple YAML parsing for frontmatter
                    frontmatter = self._parse_simple_yaml(parts[1])
                    body = parts[2]
                except Exception:
                    # Malformed frontmatter: fall back to treating the whole
                    # file as body rather than failing the parse.
                    pass

        return frontmatter, body

    def _parse_simple_yaml(self, yaml_str: str) -> Dict[str, Any]:
        """Simple YAML parser for frontmatter (avoids external dependency).

        Supports scalar key-value pairs and flat lists only; nested
        structures beyond one list level are not handled.
        """
        result = {}
        current_list = None

        for line in yaml_str.strip().split('\n'):
            line = line.rstrip()
            if not line or line.startswith('#'):
                continue

            # List item belonging to the most recent empty-valued key
            if line.startswith('- '):
                if current_list is not None:
                    current_list.append(line[2:].strip().strip("'\""))
                continue

            # Key-value pair
            if ':' in line:
                key, _, value = line.partition(':')
                key = key.strip()
                value = value.strip().strip("'\"")

                if value:
                    result[key] = value
                    current_list = None
                else:
                    # Start of list or nested structure
                    result[key] = []
                    current_list = result[key]

        return result

    def extract_sections(self, content: str) -> Dict[str, str]:
        """
        Extract named sections from ADR content.

        Args:
            content: The markdown content body

        Returns:
            Dict mapping lowercased section names to their content
        """
        sections = {}
        current_section = "preamble"
        current_content = []

        for line in content.split('\n'):
            # Match markdown headers (# through ###)
            header_match = re.match(r'^#{1,3}\s+(.+)$', line)
            if header_match:
                # Save previous section before starting the new one
                if current_content:
                    sections[current_section.lower()] = '\n'.join(current_content)
                current_section = header_match.group(1).strip()
                current_content = []
            else:
                current_content.append(line)

        # Save last section
        if current_content:
            sections[current_section.lower()] = '\n'.join(current_content)

        return sections

    def extract_constraints(
        self,
        content: str,
        sections_to_search: Optional[List[str]] = None
    ) -> List[Constraint]:
        """
        Extract RFC 2119 constraints from ADR content.

        Args:
            content: The ADR content
            sections_to_search: Optional list of section names to limit search
                Default: decision/consequences/requirements-style sections

        Returns:
            List of extracted Constraint objects
        """
        sections = self.extract_sections(content)

        # Default sections most likely to contain constraints
        if sections_to_search is None:
            sections_to_search = [
                "decision", "decisions",
                "consequences",
                "requirements",
                "constraints",
                "technical decision",
                "decision outcome",
            ]

        constraints = []

        for section_name, section_content in sections.items():
            # Substring match so e.g. "decision outcome" matches "decision"
            should_search = any(
                search_term in section_name.lower()
                for search_term in sections_to_search
            )

            if not should_search:
                continue

            # Process each line of the section
            for line_num, line in enumerate(section_content.split('\n'), 1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue

                # Extract constraints from this line
                line_constraints = self._extract_line_constraints(
                    line, section_name, line_num
                )
                constraints.extend(line_constraints)

        return constraints

    def _extract_line_constraints(
        self,
        line: str,
        section_name: str,
        line_num: int
    ) -> List[Constraint]:
        """
        Extract constraints from a single line.

        Args:
            line: The line to analyze
            section_name: The section this line is from
            line_num: Line number within the section

        Returns:
            List of Constraint objects found in this line (0 or 1 entries)
        """
        constraints = []

        # Determine the highest constraint level in this line; MUST outranks
        # SHOULD outranks MAY, so stop at the first (strongest) match.
        found_level = None
        for level in [ConstraintLevel.MUST, ConstraintLevel.SHOULD, ConstraintLevel.MAY]:
            if self.compiled_constraints[level].search(line):
                found_level = level
                break  # Take the highest level found

        if found_level is None:
            return constraints

        # Extract technical terms, de-duplicated in first-seen order
        technical_terms = []
        for match in self.compiled_technical.finditer(line):
            term = match.group(0)
            if term and term not in technical_terms:
                technical_terms.append(term)

        # Clean up the constraint text
        clean_text = self._clean_constraint_text(line)

        if clean_text:
            constraints.append(Constraint(
                text=clean_text,
                level=found_level,
                source_section=section_name,
                source_line=line_num,
                technical_terms=technical_terms,
            ))

        return constraints

    def _clean_constraint_text(self, text: str) -> str:
        """
        Clean constraint text for use in rubrics.

        Args:
            text: Raw constraint text

        Returns:
            Cleaned text suitable for rubric
        """
        # Remove markdown formatting
        text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)   # Bold
        text = re.sub(r'\*(.+?)\*', r'\1', text)       # Italic
        text = re.sub(r'`(.+?)`', r'\1', text)         # Code
        text = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', text)  # Links

        # Remove list markers
        text = re.sub(r'^[-*+]\s+', '', text)
        text = re.sub(r'^\d+\.\s+', '', text)

        # Normalize whitespace
        text = ' '.join(text.split())

        return text.strip()

class RubricGenerator:
    """
    Generates evaluation rubrics from extracted ADR constraints.

    Uses the G-EVAL pattern to create:
    - 1-3 scoring scales for each dimension
    - Clear score descriptions based on compliance level
    - Evaluation steps for judges to follow
    """

    def __init__(self, normalize_weights: bool = True):
        """
        Initialize the generator.

        Args:
            normalize_weights: Whether to normalize dimension weights to sum to 1.0
        """
        self.normalize_weights = normalize_weights

    def generate_rubric(
        self,
        constraints: List[Constraint],
        adr_info: Dict[str, Any],
        source_file: str
    ) -> GeneratedRubric:
        """
        Generate a complete rubric from constraints.

        Args:
            constraints: List of extracted constraints
            adr_info: ADR metadata (title, status, etc.)
            source_file: Path to the source ADR file

        Returns:
            GeneratedRubric object
        """
        # Generate dimensions from constraints (1-indexed for readable IDs)
        dimensions = []
        for i, constraint in enumerate(constraints):
            dimension = self._constraint_to_dimension(constraint, i + 1)
            dimensions.append(dimension)

        # Normalize weights if requested so they sum to ~1.0
        if self.normalize_weights and dimensions:
            total_weight = sum(d.weight for d in dimensions)
            if total_weight > 0:
                for dimension in dimensions:
                    dimension.weight = round(dimension.weight / total_weight, 4)

        # Calculate constraint summary (counts by RFC 2119 level)
        constraint_summary = {
            "MUST": sum(1 for c in constraints if c.level == ConstraintLevel.MUST),
            "SHOULD": sum(1 for c in constraints if c.level == ConstraintLevel.SHOULD),
            "MAY": sum(1 for c in constraints if c.level == ConstraintLevel.MAY),
        }

        # Generate rubric ID from source file
        rubric_id = self._generate_rubric_id(source_file)

        return GeneratedRubric(
            rubric_id=rubric_id,
            source_adr=source_file,
            adr_title=adr_info.get('title', 'Unknown ADR'),
            adr_status=adr_info.get('status', 'unknown'),
            # Fix: isoformat() on an aware datetime already emits "+00:00";
            # appending 'Z' produced an invalid "...+00:00Z" timestamp.
            generated_at=datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
            version="1.0.0",
            dimensions=dimensions,
            total_weight=sum(d.weight for d in dimensions),
            constraint_summary=constraint_summary,
        )

    def _constraint_to_dimension(
        self,
        constraint: Constraint,
        index: int
    ) -> RubricDimension:
        """
        Convert a constraint to an evaluation dimension.

        Args:
            constraint: The constraint to convert
            index: Index for generating unique dimension ID

        Returns:
            RubricDimension object
        """
        # Generate dimension name from constraint
        name = self._generate_dimension_name(constraint)

        # Generate dimension ID (slug truncated to keep IDs short)
        dimension_id = f"dim_{index:03d}_{self._slugify(name)[:20]}"

        # Generate score descriptions based on constraint level
        score_descriptions = self._generate_score_descriptions(constraint)

        # Generate evaluation steps
        evaluation_steps = self._generate_evaluation_steps(constraint)

        return RubricDimension(
            id=dimension_id,
            name=name,
            source_constraint=constraint.text,
            constraint_level=constraint.level,
            weight=constraint.level.weight,
            scale=[1, 2, 3],
            score_descriptions=score_descriptions,
            evaluation_steps=evaluation_steps,
        )

    def _generate_dimension_name(self, constraint: Constraint) -> str:
        """
        Generate a descriptive name for the dimension.

        Args:
            constraint: The source constraint

        Returns:
            Human-readable dimension name
        """
        text = constraint.text

        # If there are technical terms, use the first one as the anchor
        if constraint.technical_terms:
            primary_term = constraint.technical_terms[0]
            return f"{primary_term} Compliance"

        # Extract key verb/noun phrase:
        # look for patterns like "use X", "implement Y", "ensure Z"
        patterns = [
            r'\b(use|implement|ensure|provide|maintain|support)\s+(\w+(?:\s+\w+)?)',
            r'\b(\w+)\s+(must|should|shall|will)',
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                groups = match.groups()
                # Take the first group that is not an RFC 2119 keyword
                for group in groups:
                    if group and group.lower() not in ['must', 'should', 'shall', 'will', 'be']:
                        return f"{group.title()} Requirement"

        # Fallback: use first few words
        words = text.split()[:4]
        return ' '.join(words).title() + " Compliance"

    def _generate_score_descriptions(self, constraint: Constraint) -> Dict[int, str]:
        """
        Generate score descriptions for a constraint.

        The G-EVAL pattern uses clear descriptions of what each score means.

        Args:
            constraint: The source constraint

        Returns:
            Dict mapping scores (1, 2, 3) to descriptions
        """
        text = constraint.text
        level = constraint.level

        # Template-based score descriptions, stricter for stronger levels
        if level == ConstraintLevel.MUST:
            return {
                3: f"Fully compliant: {text}",
                2: f"Partially compliant with documented exceptions: {text}",
                1: f"Non-compliant or missing implementation: {text}",
            }
        elif level == ConstraintLevel.SHOULD:
            return {
                3: f"Fully implemented as recommended: {text}",
                2: f"Partially implemented with acceptable trade-offs: {text}",
                1: f"Not implemented (may be acceptable with justification): {text}",
            }
        else:  # MAY
            return {
                3: f"Implemented with best practices: {text}",
                2: f"Basic implementation present: {text}",
                1: f"Not implemented (acceptable for optional requirement): {text}",
            }

    def _generate_evaluation_steps(self, constraint: Constraint) -> List[str]:
        """
        Generate evaluation steps for judges to follow.

        Args:
            constraint: The source constraint

        Returns:
            List of evaluation step strings
        """
        steps = []

        # Step 1: Locate relevant code/configuration
        if constraint.technical_terms:
            terms = ', '.join(constraint.technical_terms[:3])
            steps.append(f"1. Search codebase for implementations related to: {terms}")
        else:
            steps.append("1. Identify code sections relevant to this requirement")

        # Step 2: Verify compliance (text truncated to keep steps concise)
        steps.append(f"2. Verify compliance with: {constraint.text[:100]}...")

        # Step 3: Check for documented exceptions (not relevant for MAY)
        if constraint.level in [ConstraintLevel.MUST, ConstraintLevel.SHOULD]:
            steps.append("3. Check for documented exceptions or alternative approaches")

        # Step 4: Assess implementation quality
        steps.append("4. Evaluate implementation quality and completeness")

        # Step 5: Assign score
        steps.append("5. Assign score (1-3) based on compliance level")

        return steps

    def _generate_rubric_id(self, source_file: str) -> str:
        """
        Generate a unique rubric ID from the source file.

        Args:
            source_file: Path to the source ADR

        Returns:
            Unique rubric ID string
        """
        # Extract ADR number from filename (e.g. "ADR-001.md" -> "001")
        match = re.search(r'ADR-(\d+)', source_file, re.IGNORECASE)
        if match:
            adr_num = match.group(1)
            return f"rubric_adr_{adr_num}"

        # Fallback: hash the filename (md5 used only as a stable name hash,
        # not for security)
        hash_str = hashlib.md5(source_file.encode()).hexdigest()[:8]
        return f"rubric_{hash_str}"

    def _slugify(self, text: str) -> str:
        """
        Convert text to a URL-safe slug.

        Args:
            text: Text to slugify

        Returns:
            Slugified string (lowercase, underscore-separated)
        """
        text = text.lower()
        text = re.sub(r'[^a-z0-9]+', '_', text)
        text = text.strip('_')
        return text

class ADRRubricCLI:
    """Command-line interface for the ADR rubric generator."""

    def __init__(self):
        """Initialize the CLI with parser and generator."""
        self.parser = ADRParser()
        self.generator = RubricGenerator()

    def process_single_adr(
        self,
        adr_path: Path,
        output_dir: Optional[Path] = None,
        dry_run: bool = False
    ) -> Optional[GeneratedRubric]:
        """
        Process a single ADR file.

        Args:
            adr_path: Path to the ADR file
            output_dir: Optional output directory
            dry_run: If True, don't write files

        Returns:
            Generated rubric or None if no constraints found
        """
        print(f"Processing: {adr_path.name}")

        # Parse ADR
        frontmatter, content = self.parser.parse_adr(adr_path)

        # Extract constraints
        constraints = self.parser.extract_constraints(content)

        if not constraints:
            print(f" No constraints found in {adr_path.name}")
            return None

        print(f" Found {len(constraints)} constraints:")
        for level in ConstraintLevel:
            count = sum(1 for c in constraints if c.level == level)
            if count > 0:
                print(f" - {level.value}: {count}")

        # Generate rubric
        rubric = self.generator.generate_rubric(
            constraints=constraints,
            adr_info=frontmatter,
            source_file=str(adr_path)
        )

        # Write output (skipped entirely in dry-run mode)
        if output_dir and not dry_run:
            output_path = output_dir / f"{rubric.rubric_id}.json"
            output_path.parent.mkdir(parents=True, exist_ok=True)

            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(rubric.to_dict(), f, indent=2)

            print(f" Output: {output_path}")
        elif dry_run:
            print(f" [DRY RUN] Would write: {rubric.rubric_id}.json")
            print(f" Dimensions: {len(rubric.dimensions)}")

        return rubric

    def process_directory(
        self,
        adr_dir: Path,
        output_dir: Path,
        dry_run: bool = False
    ) -> Dict[str, Any]:
        """
        Process all ADR files in a directory.

        Args:
            adr_dir: Directory containing ADR files
            output_dir: Directory for output files
            dry_run: If True, don't write files

        Returns:
            Summary statistics dict
        """
        # Find all ADR files, sorted for deterministic processing order
        adr_files = sorted(adr_dir.glob('ADR-*.md'))

        print(f"\nProcessing {len(adr_files)} ADR files from {adr_dir}")
        print("=" * 60)

        stats = {
            "total_adrs": len(adr_files),
            "adrs_with_constraints": 0,
            "total_constraints": 0,
            "total_dimensions": 0,
            "by_level": {"MUST": 0, "SHOULD": 0, "MAY": 0},
            "rubrics_generated": [],
        }

        for adr_path in adr_files:
            rubric = self.process_single_adr(adr_path, output_dir, dry_run)

            if rubric:
                stats["adrs_with_constraints"] += 1
                stats["total_dimensions"] += len(rubric.dimensions)
                stats["rubrics_generated"].append(rubric.rubric_id)

                for level, count in rubric.constraint_summary.items():
                    stats["total_constraints"] += count
                    stats["by_level"][level] += count

        # Write index file listing all generated rubrics
        if not dry_run and stats["rubrics_generated"]:
            index_path = output_dir / "_index.json"
            index_data = {
                # Fix: isoformat() on an aware datetime already emits
                # "+00:00"; appending 'Z' produced an invalid timestamp.
                "generated_at": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
                "source_directory": str(adr_dir),
                "rubric_count": len(stats["rubrics_generated"]),
                "rubrics": stats["rubrics_generated"],
                "statistics": {
                    "total_adrs_processed": stats["total_adrs"],
                    "adrs_with_constraints": stats["adrs_with_constraints"],
                    "total_constraints": stats["total_constraints"],
                    "total_dimensions": stats["total_dimensions"],
                    "constraints_by_level": stats["by_level"],
                }
            }

            with open(index_path, 'w', encoding='utf-8') as f:
                json.dump(index_data, f, indent=2)

            print(f"\nIndex written to: {index_path}")

        return stats

    def print_summary(self, stats: Dict[str, Any]):
        """Print processing summary to stdout."""
        print("\n" + "=" * 60)
        print("SUMMARY")
        print("=" * 60)
        print(f"ADRs processed: {stats['total_adrs']}")
        print(f"ADRs with constraints: {stats['adrs_with_constraints']}")
        print(f"Total constraints: {stats['total_constraints']}")
        print(f"Total dimensions: {stats['total_dimensions']}")
        print(f"\nConstraints by level:")
        print(f" MUST: {stats['by_level']['MUST']}")
        print(f" SHOULD: {stats['by_level']['SHOULD']}")
        print(f" MAY: {stats['by_level']['MAY']}")

        if stats['rubrics_generated']:
            print(f"\nRubrics generated: {len(stats['rubrics_generated'])}")

def main(): """Main entry point for the CLI.""" parser = argparse.ArgumentParser( description="Generate evaluation rubrics from Architecture Decision Records", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples:

Process single ADR (ADR-213: ADRs now in coditect-documentation)

python scripts/adr-rubric-generator.py --adr path/to/adrs/ADR-001.md

Process entire directory

python scripts/adr-rubric-generator.py --adr-dir path/to/adrs/ --output config/generated-rubrics/

Dry run (preview without writing)

python scripts/adr-rubric-generator.py --adr-dir path/to/adrs/ --dry-run

Verbose output

python scripts/adr-rubric-generator.py --adr-dir path/to/adrs/ -v """ )

parser.add_argument(
'--adr',
type=Path,
help='Path to a single ADR file to process'
)

parser.add_argument(
'--adr-dir',
type=Path,
help='Directory containing ADR files to process'
)

parser.add_argument(
'--output', '-o',
type=Path,
default=Path('config/generated-rubrics'),
help='Output directory for generated rubrics (default: config/generated-rubrics/)'
)

parser.add_argument(
'--dry-run', '-n',
action='store_true',
help='Preview processing without writing files'
)

parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)

args = parser.parse_args()

# Validate arguments
if not args.adr and not args.adr_dir:
parser.error("Either --adr or --adr-dir must be specified")

if args.adr and args.adr_dir:
parser.error("Cannot specify both --adr and --adr-dir")

# Initialize CLI
cli = ADRRubricCLI()

# Process
if args.adr:
if not args.adr.exists():
print(f"Error: ADR file not found: {args.adr}")
sys.exit(1)

rubric = cli.process_single_adr(args.adr, args.output, args.dry_run)

if rubric:
print(f"\nGenerated rubric with {len(rubric.dimensions)} dimensions")
else:
print("\nNo rubric generated (no constraints found)")

else: # --adr-dir
if not args.adr_dir.exists():
print(f"Error: ADR directory not found: {args.adr_dir}")
sys.exit(1)

stats = cli.process_directory(args.adr_dir, args.output, args.dry_run)
cli.print_summary(stats)

# Script entry guard: the copy/paste that produced this file stripped the
# double underscores, leaving `if name == 'main':`, which raises NameError.
if __name__ == '__main__':
    main()