""" Tests for ADR Rubric Generator and Rubric Merger (H.3.2.6).
Tests cover:
- ADR parsing and frontmatter extraction
- Constraint extraction (MUST/SHOULD/MAY patterns)
- Technical term detection
- Rubric generation logic
- Weight normalization
- Batch processing
- Rubric merger functionality
- Edge cases """
import json import os import sys import tempfile import unittest from pathlib import Path from unittest.mock import Mock, patch, MagicMock
Add parent to path for imports
sys.path.insert(0, str(Path(file).parent.parent.parent))
from scripts.moe_classifier.core.rubric_merger import ( RubricMerger, MergeStrategy, MergeConfig, MergeResult, MergedDimension, ConflictResolution, merge_rubrics, merge_persona_with_adrs, get_default_merger, )
Import from the script directly
script_path = Path(file).parent.parent / "adr-rubric-generator.py" spec = import("importlib.util").util.spec_from_file_location("adr_rubric_generator", script_path) adr_module = import("importlib.util").util.module_from_spec(spec) spec.loader.exec_module(adr_module)
ADRParser = adr_module.ADRParser RubricGenerator = adr_module.RubricGenerator ConstraintLevel = adr_module.ConstraintLevel Constraint = adr_module.Constraint GeneratedRubric = adr_module.GeneratedRubric ADRRubricCLI = adr_module.ADRRubricCLI
class TestADRParser(unittest.TestCase): """Tests for ADRParser class."""
def setUp(self):
"""Set up test fixtures."""
self.parser = ADRParser()
def test_parser_initialization(self):
"""Test parser initializes with compiled patterns."""
self.assertIsNotNone(self.parser.compiled_constraints)
self.assertEqual(len(self.parser.compiled_constraints), 3)
self.assertIn(ConstraintLevel.MUST, self.parser.compiled_constraints)
self.assertIn(ConstraintLevel.SHOULD, self.parser.compiled_constraints)
self.assertIn(ConstraintLevel.MAY, self.parser.compiled_constraints)
def test_parse_simple_yaml_frontmatter(self):
"""Test parsing simple YAML frontmatter."""
yaml_str = """
title: Test ADR status: accepted version: 1.0.0 """ result = self.parser._parse_simple_yaml(yaml_str) self.assertEqual(result.get("title"), "Test ADR") self.assertEqual(result.get("status"), "accepted") self.assertEqual(result.get("version"), "1.0.0")
def test_parse_yaml_with_list(self):
"""Test parsing YAML with list values."""
yaml_str = """
title: Test ADR keywords:
-
security
-
authentication
-
oauth """ result = self.parser._parse_simple_yaml(yaml_str) self.assertEqual(result.get("title"), "Test ADR") self.assertIn("keywords", result) self.assertEqual(len(result["keywords"]), 3)
def test_extract_sections(self): """Test section extraction from markdown.""" content = """
Title
Some preamble text.
Context
This is the context section.
Decision
We will implement X.
Consequences
This has consequences. """ sections = self.parser.extract_sections(content) self.assertIn("context", sections) self.assertIn("decision", sections) self.assertIn("consequences", sections)
class TestConstraintExtraction(unittest.TestCase): """Tests for constraint extraction patterns."""
def setUp(self):
"""Set up test fixtures."""
self.parser = ADRParser()
def test_extract_must_constraints(self):
"""Test extraction of MUST-level constraints."""
content = """
Decision
We must use FoundationDB for state management. The system shall implement encryption at rest. All data will be validated before storage. """ constraints = self.parser.extract_constraints(content) must_constraints = [c for c in constraints if c.level == ConstraintLevel.MUST] self.assertGreaterEqual(len(must_constraints), 2)
def test_extract_should_constraints(self):
"""Test extraction of SHOULD-level constraints."""
content = """
Decision
Components should follow the single responsibility principle. It is recommended to use async/await patterns. """ constraints = self.parser.extract_constraints(content) should_constraints = [c for c in constraints if c.level == ConstraintLevel.SHOULD] self.assertGreaterEqual(len(should_constraints), 1)
def test_extract_may_constraints(self):
"""Test extraction of MAY-level constraints."""
content = """
Decision
Developers may use optional caching for performance. This feature can be disabled in development environments. """ constraints = self.parser.extract_constraints(content) may_constraints = [c for c in constraints if c.level == ConstraintLevel.MAY] self.assertGreaterEqual(len(may_constraints), 1)
def test_extract_technical_terms(self):
"""Test extraction of technical terms."""
content = """
Decision
We must use TLS 1.3 for all API communications. The system will implement OAuth 2.0 with JWT tokens. All data must be encrypted using AES-256. """ constraints = self.parser.extract_constraints(content) all_terms = [] for c in constraints: all_terms.extend(c.technical_terms)
# Check for expected technical terms
terms_lower = [t.lower() for t in all_terms]
self.assertTrue(any('tls' in t for t in terms_lower) or
any('oauth' in t for t in terms_lower) or
any('jwt' in t for t in terms_lower) or
any('aes' in t for t in terms_lower))
def test_constraint_source_tracking(self):
"""Test that constraints track their source section."""
content = """
Decision
We must implement feature X.
Consequences
Teams should document all changes. """ constraints = self.parser.extract_constraints(content) sources = {c.source_section.lower() for c in constraints} self.assertTrue(len(sources) >= 1)
def test_no_constraints_in_context(self):
"""Test that context section is not searched by default."""
content = """
Context
We must consider legacy requirements. The system should support old clients.
Decision
Use REST APIs. """ constraints = self.parser.extract_constraints(content) # Context section constraints should not be extracted by default context_constraints = [c for c in constraints if "context" in c.source_section.lower()] self.assertEqual(len(context_constraints), 0)
class TestRubricGeneration(unittest.TestCase): """Tests for rubric generation logic."""
def setUp(self):
"""Set up test fixtures."""
self.generator = RubricGenerator()
def test_generate_rubric_from_constraints(self):
"""Test rubric generation from constraints."""
constraints = [
Constraint(
text="Must use FoundationDB",
level=ConstraintLevel.MUST,
source_section="Decision",
technical_terms=["FoundationDB"]
),
Constraint(
text="Should implement caching",
level=ConstraintLevel.SHOULD,
source_section="Decision",
technical_terms=[]
),
]
rubric = self.generator.generate_rubric(
constraints=constraints,
adr_info={"title": "Test ADR", "status": "accepted"},
source_file="ADR-001.md"
)
self.assertEqual(len(rubric.dimensions), 2)
self.assertEqual(rubric.constraint_summary["MUST"], 1)
self.assertEqual(rubric.constraint_summary["SHOULD"], 1)
def test_dimension_has_required_fields(self):
"""Test that generated dimensions have all required fields."""
constraints = [
Constraint(
text="Must encrypt data at rest",
level=ConstraintLevel.MUST,
source_section="Decision",
technical_terms=["encryption"]
),
]
rubric = self.generator.generate_rubric(
constraints=constraints,
adr_info={},
source_file="ADR-001.md"
)
dim = rubric.dimensions[0]
self.assertIsNotNone(dim.id)
self.assertIsNotNone(dim.name)
self.assertEqual(dim.scale, [1, 2, 3])
self.assertIn(1, dim.score_descriptions)
self.assertIn(2, dim.score_descriptions)
self.assertIn(3, dim.score_descriptions)
self.assertGreater(len(dim.evaluation_steps), 0)
def test_score_descriptions_vary_by_level(self):
"""Test that score descriptions differ by constraint level."""
must_constraint = Constraint(
text="Must do X",
level=ConstraintLevel.MUST,
source_section="Decision",
technical_terms=[]
)
may_constraint = Constraint(
text="May do Y",
level=ConstraintLevel.MAY,
source_section="Decision",
technical_terms=[]
)
must_rubric = self.generator.generate_rubric([must_constraint], {}, "test.md")
may_rubric = self.generator.generate_rubric([may_constraint], {}, "test.md")
# Score 1 descriptions should differ
must_score1 = must_rubric.dimensions[0].score_descriptions[1]
may_score1 = may_rubric.dimensions[0].score_descriptions[1]
self.assertIn("Non-compliant", must_score1)
self.assertIn("acceptable", may_score1.lower())
class TestWeightNormalization(unittest.TestCase): """Tests for weight normalization."""
def setUp(self):
"""Set up test fixtures."""
self.generator = RubricGenerator(normalize_weights=True)
def test_weights_sum_to_one(self):
"""Test that normalized weights sum to 1.0."""
constraints = [
Constraint("Must X", ConstraintLevel.MUST, "Decision", []),
Constraint("Should Y", ConstraintLevel.SHOULD, "Decision", []),
Constraint("May Z", ConstraintLevel.MAY, "Decision", []),
]
rubric = self.generator.generate_rubric(constraints, {}, "test.md")
total_weight = sum(d.weight for d in rubric.dimensions)
self.assertAlmostEqual(total_weight, 1.0, places=3)
def test_must_has_higher_weight(self):
"""Test that MUST constraints have higher weight than MAY."""
constraints = [
Constraint("Must X", ConstraintLevel.MUST, "Decision", []),
Constraint("May Y", ConstraintLevel.MAY, "Decision", []),
]
rubric = self.generator.generate_rubric(constraints, {}, "test.md")
must_dim = next(d for d in rubric.dimensions if d.constraint_level == ConstraintLevel.MUST)
may_dim = next(d for d in rubric.dimensions if d.constraint_level == ConstraintLevel.MAY)
self.assertGreater(must_dim.weight, may_dim.weight)
class TestEdgeCases(unittest.TestCase): """Tests for edge cases."""
def setUp(self):
"""Set up test fixtures."""
self.parser = ADRParser()
self.generator = RubricGenerator()
def test_no_constraints_adr(self):
"""Test handling ADR with no constraints."""
content = """
Context
Some background information.
Decision
We chose option A.
Consequences
This affects the system. """ constraints = self.parser.extract_constraints(content) # Should return empty list, not error self.assertEqual(len(constraints), 0)
def test_empty_content(self):
"""Test handling empty content."""
constraints = self.parser.extract_constraints("")
self.assertEqual(len(constraints), 0)
def test_malformed_yaml_frontmatter(self):
"""Test handling malformed YAML frontmatter."""
content = """---
title: Test invalid yaml here: : :
Decision
We must do X. """ # Should not raise, just skip frontmatter with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: f.write(content) f.flush() try: frontmatter, body = self.parser.parse_adr(Path(f.name)) # Should still extract constraints from body constraints = self.parser.extract_constraints(body) self.assertGreaterEqual(len(constraints), 0) finally: os.unlink(f.name)
def test_constraint_with_special_characters(self):
"""Test constraints containing special characters."""
content = """
Decision
We must use regex patterns like ^[a-zA-Z0-9]+$ for validation.
The API must return JSON with keys like "user_id" and "created_at".
"""
constraints = self.parser.extract_constraints(content)
self.assertGreaterEqual(len(constraints), 1)
def test_very_long_constraint(self):
"""Test handling very long constraint text."""
long_text = "We must " + "implement very detailed requirements " * 50
content = f"""
Decision
{long_text} """ constraints = self.parser.extract_constraints(content) self.assertGreaterEqual(len(constraints), 1)
class TestRubricMerger(unittest.TestCase): """Tests for RubricMerger class."""
def setUp(self):
"""Set up test fixtures."""
self.merger = RubricMerger()
def test_merger_initialization(self):
"""Test merger initializes correctly."""
self.assertIsNotNone(self.merger.config)
self.assertIsNotNone(self.merger.persona_loader)
self.assertIsNotNone(self.merger.rubrics_dir)
def test_load_rubric_index(self):
"""Test loading rubric index."""
index = self.merger.load_rubric_index()
self.assertIn("rubrics", index)
def test_load_existing_rubric(self):
"""Test loading an existing rubric."""
# This depends on rubrics being generated
index = self.merger.load_rubric_index()
if index.get("rubrics"):
rubric_id = index["rubrics"][0]
rubric = self.merger.load_rubric(rubric_id)
self.assertIsNotNone(rubric)
self.assertIn("dimensions", rubric)
def test_load_nonexistent_rubric(self):
"""Test loading a non-existent rubric."""
rubric = self.merger.load_rubric("nonexistent_rubric_xyz")
self.assertIsNone(rubric)
class TestMergeStrategies(unittest.TestCase): """Tests for different merge strategies."""
def setUp(self):
"""Set up test fixtures."""
self.merger = RubricMerger()
def test_weighted_append_strategy(self):
"""Test weighted append merge strategy."""
config = MergeConfig(
strategy=MergeStrategy.WEIGHTED_APPEND,
adr_weight_factor=0.5
)
result = merge_rubrics("technical_architect", ["ADR-009"], MergeStrategy.WEIGHTED_APPEND)
self.assertIsInstance(result, MergeResult)
self.assertEqual(result.persona_id, "technical_architect")
self.assertGreater(result.merged_dimension_count, 0)
def test_append_strategy(self):
"""Test simple append merge strategy."""
result = merge_rubrics("technical_architect", ["ADR-009"], MergeStrategy.APPEND)
self.assertIsInstance(result, MergeResult)
# Append should include all dimensions
self.assertEqual(
result.merged_dimension_count,
result.base_dimension_count + result.adr_dimension_count
)
def test_merge_normalizes_weights(self):
"""Test that merge normalizes weights to sum to 1.0."""
result = merge_rubrics("technical_architect", ["ADR-009"])
total = sum(d.weight for d in result.merged_dimensions)
self.assertAlmostEqual(total, 1.0, places=2)
class TestMergeConvenienceFunctions(unittest.TestCase): """Tests for merge convenience functions."""
def test_merge_rubrics_function(self):
"""Test merge_rubrics convenience function."""
result = merge_rubrics("technical_architect")
self.assertIsInstance(result, MergeResult)
self.assertEqual(result.persona_id, "technical_architect")
def test_merge_persona_with_adrs_function(self):
"""Test merge_persona_with_adrs convenience function."""
result = merge_persona_with_adrs("security_analyst")
self.assertIsInstance(result, dict)
self.assertIn("persona_id", result)
self.assertIn("merged_dimensions", result)
def test_get_default_merger(self):
"""Test get_default_merger returns singleton."""
merger1 = get_default_merger()
merger2 = get_default_merger()
self.assertIs(merger1, merger2)
class TestMergedDimension(unittest.TestCase): """Tests for MergedDimension class."""
def test_from_adr_dimension(self):
"""Test creating MergedDimension from ADR dimension dict."""
adr_dim = {
"id": "dim_001",
"name": "Test Dimension",
"weight": 0.25,
"scale": [1, 2, 3],
"score_descriptions": {"1": "Bad", "2": "Ok", "3": "Good"},
"evaluation_steps": ["Step 1", "Step 2"],
"constraint_level": "MUST",
}
merged = MergedDimension.from_adr_dimension(adr_dim, "ADR-001.md")
self.assertEqual(merged.id, "dim_001")
self.assertEqual(merged.name, "Test Dimension")
self.assertEqual(merged.source, "adr")
self.assertEqual(merged.source_adr, "ADR-001.md")
self.assertEqual(merged.constraint_level, "MUST")
def test_to_dict(self):
"""Test MergedDimension serialization."""
merged = MergedDimension(
id="test_id",
name="Test",
weight=0.5,
scale=[1, 2, 3],
score_descriptions={1: "Low", 2: "Med", 3: "High"},
evaluation_steps=["Evaluate"],
source="base",
)
result = merged.to_dict()
self.assertIsInstance(result, dict)
self.assertEqual(result["id"], "test_id")
self.assertEqual(result["weight"], 0.5)
class TestMergeConfig(unittest.TestCase): """Tests for MergeConfig class."""
def test_default_config(self):
"""Test default merge configuration."""
config = MergeConfig()
self.assertEqual(config.strategy, MergeStrategy.WEIGHTED_APPEND)
self.assertEqual(config.conflict_resolution, ConflictResolution.MERGE_SCORES)
self.assertEqual(config.adr_weight_factor, 0.6)
self.assertEqual(config.max_total_dimensions, 15)
def test_custom_config(self):
"""Test custom merge configuration."""
config = MergeConfig(
strategy=MergeStrategy.ADR_PRIORITY,
adr_weight_factor=0.8,
max_total_dimensions=20,
)
self.assertEqual(config.strategy, MergeStrategy.ADR_PRIORITY)
self.assertEqual(config.adr_weight_factor, 0.8)
self.assertEqual(config.max_total_dimensions, 20)
class TestConstraintLevel(unittest.TestCase): """Tests for ConstraintLevel enum."""
def test_constraint_weights(self):
"""Test constraint level weights."""
self.assertEqual(ConstraintLevel.MUST.weight, 0.50)
self.assertEqual(ConstraintLevel.SHOULD.weight, 0.35)
self.assertEqual(ConstraintLevel.MAY.weight, 0.15)
def test_failure_severity(self):
"""Test constraint level failure severities."""
self.assertEqual(ConstraintLevel.MUST.failure_severity, "CRITICAL")
self.assertEqual(ConstraintLevel.SHOULD.failure_severity, "MAJOR")
self.assertEqual(ConstraintLevel.MAY.failure_severity, "MINOR")
class TestBatchProcessing(unittest.TestCase): """Tests for batch processing functionality."""
def test_process_real_adrs_directory(self):
"""Test processing the actual ADRs directory."""
cli = ADRRubricCLI()
adr_dir = Path(__file__).parent.parent.parent / "internal" / "architecture" / "adrs"
if adr_dir.exists():
with tempfile.TemporaryDirectory() as tmpdir:
stats = cli.process_directory(adr_dir, Path(tmpdir), dry_run=True)
self.assertIn("total_adrs", stats)
self.assertIn("adrs_with_constraints", stats)
self.assertIn("total_constraints", stats)
self.assertGreater(stats["total_adrs"], 0)
class TestRubricSerialization(unittest.TestCase): """Tests for rubric serialization."""
def test_generated_rubric_to_dict(self):
"""Test GeneratedRubric serialization."""
generator = RubricGenerator()
constraints = [
Constraint("Must test", ConstraintLevel.MUST, "Decision", []),
]
rubric = generator.generate_rubric(constraints, {"title": "Test"}, "test.md")
result = rubric.to_dict()
self.assertIsInstance(result, dict)
self.assertIn("rubric_id", result)
self.assertIn("dimensions", result)
self.assertIn("constraint_summary", result)
def test_rubric_json_serializable(self):
"""Test that rubric can be serialized to JSON."""
generator = RubricGenerator()
constraints = [
Constraint("Must test", ConstraintLevel.MUST, "Decision", ["API"]),
]
rubric = generator.generate_rubric(constraints, {}, "test.md")
# Should not raise
json_str = json.dumps(rubric.to_dict())
self.assertIsInstance(json_str, str)
# Should be parseable
parsed = json.loads(json_str)
self.assertIn("dimensions", parsed)
if name == 'main': unittest.main()