#!/usr/bin/env python3
"""CODITECT Ralph Wiggum MoE Evaluation (H.8.7.5)

Automated quality evaluation of all Ralph Wiggum components using
Mixture-of-Experts scoring across multiple dimensions.

Evaluates:
- API consistency across modules
- Error handling patterns
- Test coverage completeness
- ADR compliance (108, 109, 110, 111)
- Code documentation quality
- Cross-module integration coherence

Usage:
    python -m scripts.core.ralph_wiggum.moe_evaluation [--verbose] [--json]

Author: CODITECT Framework
Version: 1.0.0
Created: 2026-02-17
Task Reference: H.8.7.5
ADR References: ADR-108, ADR-109, ADR-110, ADR-111
"""
import ast
import importlib
import inspect
import json
import logging
import os
import re
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Module-level logger for evaluation diagnostics.
logger = logging.getLogger("ralph-moe-evaluation")

# ---------------------------------------------------------------------------
# Module registry — all Ralph Wiggum modules to evaluate
# ---------------------------------------------------------------------------

RALPH_MODULES = [
    "checkpoint_protocol",
    "health_monitoring",
    "token_economics",
    "loop_orchestrator",
    "termination_criteria",
    "pilot_integration",
    "browser_automation",
]

# Per-module compliance requirements: the governing ADR (or task reference),
# the classes that must be defined, and the source patterns (method/identifier
# substrings) that must appear in the module source.
ADR_REQUIREMENTS = {
    "checkpoint_protocol": {
        "adr": "ADR-108",
        "required_classes": [
            "Checkpoint",
            "CheckpointService",
            "HandoffProtocol",
        ],
        "required_patterns": [
            "compute_hash",
            "verify_integrity",
            "continuation_prompt",
        ],
    },
    "health_monitoring": {
        "adr": "ADR-110",
        "required_classes": [
            "HealthMonitoringService",
            "CircuitBreaker",
            "RecoveryService",
        ],
        "required_patterns": [
            "register_agent",
            "record_heartbeat",
            "evaluate_health",
            "circuit_breaker",
        ],
    },
    "token_economics": {
        "adr": "ADR-111",
        "required_classes": [
            "TokenEconomicsService",
            "Budget",
            "BudgetCheckResult",
        ],
        "required_patterns": [
            "record_consumption",
            "check_budget",
            "efficiency_metrics",
        ],
    },
    "browser_automation": {
        "adr": "ADR-109",
        "required_classes": [
            "QAAgentBrowserTools",
            "FlowStep",
        ],
        "required_patterns": [
            "verify_page",
            "flow_step",
        ],
    },
    "loop_orchestrator": {
        "adr": "H.8.6",
        "required_classes": [
            "LoopOrchestrator",
            "LoopConfig",
            "LoopStatus",
        ],
        "required_patterns": [
            "initialize",
            "plan_iteration",
            "record_iteration",
            "evaluate_termination",
            "check_handoff",
        ],
    },
    "termination_criteria": {
        "adr": "H.8.6.5",
        "required_classes": [
            "TerminationCriteria",
            "TerminationResult",
        ],
        "required_patterns": [
            "evaluate",
            "should_terminate",
        ],
    },
    "pilot_integration": {
        "adr": "H.8.6.4",
        "required_classes": [
            "PilotTaskExtractor",
            "PilotTask",
            "TaskGroup",
        ],
        "required_patterns": [
            "parse_track",
            "get_pending",
            "generate_loop",
        ],
    },
}
---------------------------------------------------------------------------
Scoring dataclasses
---------------------------------------------------------------------------
@dataclass
class DimensionScore:
    """Score for a single evaluation dimension.

    Attributes are populated by the expert evaluators: ``score`` starts
    at ``max_score`` and is reduced by deductions.
    """

    dimension: str
    score: float  # 0.0 to 1.0
    max_score: float  # 1.0
    findings: List[str] = field(default_factory=list)  # positive observations
    deductions: List[str] = field(default_factory=list)  # reasons score was reduced

    @property
    def percentage(self) -> int:
        """Score as an integer percent of max_score (0 when max_score is 0)."""
        if self.max_score == 0:
            return 0
        return int(self.score / self.max_score * 100)

    @property
    def grade(self) -> str:
        """Letter grade A–F derived from :attr:`percentage` (A >= 90, ... F < 60)."""
        pct = self.percentage
        if pct >= 90:
            return "A"
        if pct >= 80:
            return "B"
        if pct >= 70:
            return "C"
        if pct >= 60:
            return "D"
        return "F"
@dataclass
class ModuleScore:
    """Evaluation score for a single module across all expert dimensions."""

    module_name: str
    # Forward reference kept as a string so this class does not require
    # DimensionScore to be defined at class-creation time.
    dimensions: List["DimensionScore"] = field(default_factory=list)
    overall_score: float = 0.0  # weighted 0.0–1.0, set by compute_overall()
    overall_grade: str = ""  # letter grade, set by compute_overall()
    line_count: int = 0
    class_count: int = 0
    function_count: int = 0

    def compute_overall(self) -> None:
        """Aggregate dimension scores into overall_score and overall_grade.

        No-op when there are no dimensions (fields keep their defaults).
        """
        if not self.dimensions:
            return
        total = sum(d.score for d in self.dimensions)
        max_total = sum(d.max_score for d in self.dimensions)
        self.overall_score = total / max_total if max_total > 0 else 0.0
        pct = int(self.overall_score * 100)
        if pct >= 90:
            self.overall_grade = "A"
        elif pct >= 80:
            self.overall_grade = "B"
        elif pct >= 70:
            self.overall_grade = "C"
        elif pct >= 60:
            self.overall_grade = "D"
        else:
            self.overall_grade = "F"
@dataclass
class EvaluationReport:
    """Complete MoE evaluation report across all evaluated modules."""

    timestamp: str = ""  # ISO-8601 UTC timestamp of the run
    # Forward reference kept as a string so this class does not require
    # ModuleScore to be defined at class-creation time.
    modules: List["ModuleScore"] = field(default_factory=list)
    aggregate_score: float = 0.0  # unweighted mean of module scores
    aggregate_grade: str = ""
    total_lines: int = 0
    total_classes: int = 0
    total_functions: int = 0
    total_tests: int = 0
    recommendations: List[str] = field(default_factory=list)

    def compute_aggregate(self) -> None:
        """Roll module scores and stats up into the aggregate fields.

        No-op when there are no modules (fields keep their defaults).
        """
        if not self.modules:
            return
        self.aggregate_score = sum(
            m.overall_score for m in self.modules
        ) / len(self.modules)
        self.total_lines = sum(m.line_count for m in self.modules)
        self.total_classes = sum(m.class_count for m in self.modules)
        self.total_functions = sum(m.function_count for m in self.modules)
        pct = int(self.aggregate_score * 100)
        if pct >= 90:
            self.aggregate_grade = "A"
        elif pct >= 80:
            self.aggregate_grade = "B"
        elif pct >= 70:
            self.aggregate_grade = "C"
        elif pct >= 60:
            self.aggregate_grade = "D"
        else:
            self.aggregate_grade = "F"
# ---------------------------------------------------------------------------
# Evaluation dimensions (Experts)
# ---------------------------------------------------------------------------
def _get_module_path(module_name: str) -> Path: """Get filesystem path for a Ralph Wiggum module.""" return ( Path(file).parent / f"{module_name}.py" )
def _parse_ast(module_name: str) -> Optional[ast.Module]:
    """Parse a module's source into an AST; return None if the file is missing."""
    path = _get_module_path(module_name)
    if not path.exists():
        return None
    return ast.parse(path.read_text())
def _get_source(module_name: str) -> str:
    """Return a module's raw source text, or '' if the file is missing."""
    path = _get_module_path(module_name)
    if not path.exists():
        return ""
    return path.read_text()
class APIConsistencyExpert:
    """Expert 1: API Consistency.

    Evaluates naming conventions, parameter patterns, return types,
    and public API surface consistency across modules.
    """

    def evaluate(self, module_name: str) -> DimensionScore:
        """Score the module's API consistency.

        Starts at 1.0 and subtracts capped deductions per check; returns
        a zero score when the module file cannot be found.
        """
        score = DimensionScore(
            dimension="api_consistency",
            score=1.0,
            max_score=1.0,
        )
        tree = _parse_ast(module_name)
        if not tree:
            score.score = 0.0
            score.deductions.append("Module file not found")
            return score
        classes = [
            n for n in ast.walk(tree) if isinstance(n, ast.ClassDef)
        ]
        functions = [
            n for n in ast.walk(tree)
            if isinstance(n, ast.FunctionDef | ast.AsyncFunctionDef)
        ]
        # Check 1: All public methods have return-type annotations
        unannotated = []
        for func in functions:
            if func.name.startswith("_"):
                continue
            if not func.returns:
                unannotated.append(func.name)
        if unannotated:
            # Deduction is capped so many small misses don't zero the score.
            deduction = min(0.15, len(unannotated) * 0.03)
            score.score -= deduction
            score.deductions.append(
                f"{len(unannotated)} public functions without return type: "
                f"{', '.join(unannotated[:5])}"
            )
        # Check 2: Consistent naming (snake_case for functions)
        bad_names = []
        for func in functions:
            if func.name.startswith("__"):
                continue  # dunder methods are exempt
            if not re.match(r"^[a-z_][a-z0-9_]*$", func.name):
                bad_names.append(func.name)
        if bad_names:
            score.score -= 0.1
            score.deductions.append(
                f"Non-snake_case function names: {', '.join(bad_names[:5])}"
            )
        # Check 3: Dataclass usage (prefer dataclasses over plain dicts).
        # A class counts if decorated with @dataclass (bare name, attribute
        # access, or call form @dataclass(...)).
        dataclass_count = sum(
            1 for c in classes
            if any(
                isinstance(d, ast.Name) and d.id == "dataclass"
                for d in c.decorator_list
            )
            or any(
                isinstance(d, ast.Attribute) and d.attr == "dataclass"
                for d in c.decorator_list
            )
            or any(
                isinstance(d, ast.Call)
                and isinstance(d.func, ast.Name)
                and d.func.id == "dataclass"
                for d in c.decorator_list
            )
        )
        if classes and dataclass_count / len(classes) < 0.3:
            score.score -= 0.05
            score.deductions.append(
                f"Low dataclass usage: {dataclass_count}/{len(classes)} classes"
            )
        # Check 4: Consistent to_dict/from_dict patterns (finding only,
        # no deduction when absent)
        has_serialization = any(
            f.name in ("to_dict", "from_dict", "to_json", "from_json")
            for f in functions
        )
        if has_serialization:
            score.findings.append("Has serialization methods")
        # Clamp so stacked deductions never go negative.
        score.score = max(0.0, score.score)
        return score
class ErrorHandlingExpert:
    """Expert 2: Error Handling.

    Evaluates exception handling, logging patterns, and graceful
    degradation across modules.
    """

    def evaluate(self, module_name: str) -> DimensionScore:
        """Score the module's error-handling hygiene.

        Starts at 1.0 and subtracts deductions; returns a zero score when
        the module file cannot be found.
        """
        score = DimensionScore(
            dimension="error_handling",
            score=1.0,
            max_score=1.0,
        )
        source = _get_source(module_name)
        tree = _parse_ast(module_name)
        if not tree or not source:
            score.score = 0.0
            score.deductions.append("Module file not found")
            return score
        # Check 1: Has logging configured (simple substring heuristic)
        if "logging.getLogger" not in source:
            score.score -= 0.15
            score.deductions.append("No logging configured")
        else:
            score.findings.append("Logging configured")
        # Check 2: Try/except blocks present relative to function count
        try_blocks = [
            n for n in ast.walk(tree) if isinstance(n, ast.Try)
        ]
        functions = [
            n for n in ast.walk(tree)
            if isinstance(n, ast.FunctionDef | ast.AsyncFunctionDef)
        ]
        if functions and len(try_blocks) < len(functions) * 0.1:
            score.score -= 0.1
            score.deductions.append(
                f"Low try/except coverage: {len(try_blocks)} blocks "
                f"in {len(functions)} functions"
            )
        # Check 3: No bare except clauses (handler.type is None == `except:`)
        bare_excepts = 0
        for try_block in try_blocks:
            for handler in try_block.handlers:
                if handler.type is None:
                    bare_excepts += 1
        if bare_excepts:
            score.score -= bare_excepts * 0.1
            score.deductions.append(
                f"{bare_excepts} bare except clauses (catch-all)"
            )
        # Check 4: Custom exceptions defined (base class name contains "Error";
        # finding only, no deduction when absent)
        custom_exceptions = [
            c for c in ast.walk(tree)
            if isinstance(c, ast.ClassDef) and any(
                isinstance(b, ast.Name) and "Error" in (b.id or "")
                for b in c.bases
            )
        ]
        if custom_exceptions:
            score.findings.append(
                f"{len(custom_exceptions)} custom exception classes"
            )
        # Check 5: logger.warning/logger.error usage on error paths
        warn_count = source.count("logger.warning")
        error_count = source.count("logger.error")
        if warn_count + error_count == 0 and len(try_blocks) > 0:
            score.score -= 0.1
            score.deductions.append(
                "Try/except blocks present but no warning/error logging"
            )
        # Clamp so stacked deductions never go negative.
        score.score = max(0.0, score.score)
        return score
class DocumentationExpert:
    """Expert 3: Documentation Quality.

    Evaluates module docstrings, class docstrings, function docstrings,
    and inline comments.
    """

    def evaluate(self, module_name: str) -> DimensionScore:
        """Score the module's documentation completeness.

        Starts at 1.0 and subtracts capped deductions; returns a zero
        score when the module file cannot be found.
        """
        score = DimensionScore(
            dimension="documentation",
            score=1.0,
            max_score=1.0,
        )
        source = _get_source(module_name)
        tree = _parse_ast(module_name)
        if not tree or not source:
            score.score = 0.0
            score.deductions.append("Module file not found")
            return score
        # Check 1: Module docstring exists and is substantive (>= 50 chars)
        if not ast.get_docstring(tree):
            score.score -= 0.15
            score.deductions.append("Missing module docstring")
        else:
            docstring = ast.get_docstring(tree) or ""
            if len(docstring) < 50:
                score.score -= 0.05
                score.deductions.append("Module docstring too short")
            else:
                score.findings.append("Module docstring present")
        # Check 2: Class docstrings
        classes = [
            n for n in ast.walk(tree) if isinstance(n, ast.ClassDef)
        ]
        undocumented_classes = [
            c.name for c in classes
            if not ast.get_docstring(c)
        ]
        if undocumented_classes:
            # Deduction capped at 0.15 regardless of class count.
            deduction = min(0.15, len(undocumented_classes) * 0.05)
            score.score -= deduction
            score.deductions.append(
                f"{len(undocumented_classes)} undocumented classes: "
                f"{', '.join(undocumented_classes[:3])}"
            )
        # Check 3: Public function docstrings (>30% undocumented deducts)
        functions = [
            n for n in ast.walk(tree)
            if isinstance(n, ast.FunctionDef | ast.AsyncFunctionDef)
            and not n.name.startswith("_")
        ]
        undocumented_funcs = [
            f.name for f in functions
            if not ast.get_docstring(f)
        ]
        if functions and len(undocumented_funcs) / len(functions) > 0.3:
            deduction = min(0.2, len(undocumented_funcs) * 0.02)
            score.score -= deduction
            score.deductions.append(
                f"{len(undocumented_funcs)}/{len(functions)} public functions "
                f"undocumented"
            )
        # Check 4: ADR references in module docstring (only deduct when the
        # module is governed by a real ADR, not a task reference like H.8.6)
        module_doc = ast.get_docstring(tree) or ""
        if "ADR" in module_doc or "adr" in module_doc.lower():
            score.findings.append("ADR references in docstring")
        elif module_name in ADR_REQUIREMENTS:
            adr = ADR_REQUIREMENTS[module_name]["adr"]
            if adr.startswith("ADR"):
                score.score -= 0.05
                score.deductions.append(
                    f"Missing {adr} reference in module docstring"
                )
        # Clamp so stacked deductions never go negative.
        score.score = max(0.0, score.score)
        return score
class ADRComplianceExpert:
    """Expert 4: ADR Compliance.

    Evaluates whether each module implements the classes and patterns
    required by its governing ADR (see ADR_REQUIREMENTS).
    """

    def evaluate(self, module_name: str) -> DimensionScore:
        """Score the module's compliance with its ADR requirements.

        Modules without an entry in ADR_REQUIREMENTS pass with a full
        score and an explanatory finding.
        """
        score = DimensionScore(
            dimension="adr_compliance",
            score=1.0,
            max_score=1.0,
        )
        source = _get_source(module_name)
        tree = _parse_ast(module_name)
        if not tree or not source:
            score.score = 0.0
            score.deductions.append("Module file not found")
            return score
        reqs = ADR_REQUIREMENTS.get(module_name)
        if not reqs:
            score.findings.append("No ADR requirements defined")
            return score
        # Check 1: Required classes present (0.15 deduction each)
        classes = {
            n.name for n in ast.walk(tree) if isinstance(n, ast.ClassDef)
        }
        missing_classes = [
            c for c in reqs["required_classes"] if c not in classes
        ]
        if missing_classes:
            deduction = len(missing_classes) * 0.15
            score.score -= deduction
            score.deductions.append(
                f"Missing required classes: {', '.join(missing_classes)}"
            )
        else:
            score.findings.append(
                f"All {len(reqs['required_classes'])} required classes present"
            )
        # Check 2: Required patterns present as substrings of the source
        # (0.1 deduction each)
        missing_patterns = [
            p for p in reqs["required_patterns"] if p not in source
        ]
        if missing_patterns:
            deduction = len(missing_patterns) * 0.1
            score.score -= deduction
            score.deductions.append(
                f"Missing required patterns: {', '.join(missing_patterns)}"
            )
        else:
            score.findings.append(
                f"All {len(reqs['required_patterns'])} required patterns found"
            )
        # Clamp so stacked deductions never go negative.
        score.score = max(0.0, score.score)
        return score
class TestCoverageExpert:
    """Expert 5: Test Coverage.

    Evaluates test file existence, test count, and coverage of the
    public API surface.
    """

    # Repository test directory, resolved relative to this file.
    # NOTE(review): assumes this file sits 4 levels below the repo root
    # (scripts/core/ralph_wiggum/...) — verify if the layout changes.
    TEST_DIR = Path(__file__).parent.parent.parent.parent / "tests" / "core"

    def evaluate(self, module_name: str) -> DimensionScore:
        """Score the module's test coverage.

        Deducts 0.3 outright when no test file exists; otherwise compares
        the number of ``test_*`` functions to the module's public functions.
        """
        score = DimensionScore(
            dimension="test_coverage",
            score=1.0,
            max_score=1.0,
        )
        source = _get_source(module_name)
        tree = _parse_ast(module_name)
        if not tree or not source:
            score.score = 0.0
            score.deductions.append("Module file not found")
            return score
        # Find test files for this module
        test_files = self._find_test_files(module_name)
        # Check 1: Test file exists (no further checks possible without one)
        if not test_files:
            score.score -= 0.3
            score.deductions.append(f"No test file found for {module_name}")
            return score
        score.findings.append(
            f"Test files: {', '.join(f.name for f in test_files)}"
        )
        # Check 2: Count test functions across all matching test files
        total_tests = 0
        for tf in test_files:
            test_tree = ast.parse(tf.read_text())
            test_funcs = [
                n for n in ast.walk(test_tree)
                if isinstance(n, ast.FunctionDef | ast.AsyncFunctionDef)
                and n.name.startswith("test_")
            ]
            total_tests += len(test_funcs)
        score.findings.append(f"{total_tests} test functions")
        # Check 3: Coverage ratio — public functions vs tests (target >= 1x)
        public_funcs = [
            n for n in ast.walk(tree)
            if isinstance(n, ast.FunctionDef | ast.AsyncFunctionDef)
            and not n.name.startswith("_")
        ]
        if public_funcs:
            ratio = total_tests / len(public_funcs)
            if ratio < 1.0:
                # Deduction scales with the shortfall, capped at 0.2.
                deduction = min(0.2, (1.0 - ratio) * 0.3)
                score.score -= deduction
                score.deductions.append(
                    f"Test-to-function ratio: {ratio:.1f} "
                    f"({total_tests} tests / {len(public_funcs)} public funcs)"
                )
            else:
                score.findings.append(
                    f"Good test ratio: {ratio:.1f}x "
                    f"({total_tests} tests / {len(public_funcs)} public funcs)"
                )
        # Clamp so stacked deductions never go negative.
        score.score = max(0.0, score.score)
        return score

    def _find_test_files(self, module_name: str) -> List[Path]:
        """Find test files that cover a given module.

        Matches ``test_<module>.py`` directly, plus the shared integration
        test file when it appears to reference the module.
        """
        test_dir = self.TEST_DIR
        if not test_dir.exists():
            return []
        # Direct name match
        direct = test_dir / f"test_{module_name}.py"
        results = []
        if direct.exists():
            results.append(direct)
        # Check the shared integration test file
        integration = test_dir / "test_ralph_wiggum_integration.py"
        if integration.exists():
            source = integration.read_text()
            # Heuristic: module referenced if its name (underscores removed)
            # appears anywhere in the lowercased integration test source.
            if module_name.replace("_", "") in source.lower():
                results.append(integration)
        return results
class IntegrationCoherenceExpert:
    """Expert 6: Integration Coherence.

    Evaluates how well the module integrates with other Ralph Wiggum
    components — imports, cross-references, and shared patterns.
    """

    def evaluate(self, module_name: str) -> DimensionScore:
        """Score the module's integration with its sibling modules.

        Mostly records findings; the only deduction is for a module not
        exported via the package ``__init__.py``.
        """
        score = DimensionScore(
            dimension="integration_coherence",
            score=1.0,
            max_score=1.0,
        )
        source = _get_source(module_name)
        if not source:
            score.score = 0.0
            score.deductions.append("Module file not found")
            return score
        # Check 1: Imports from other Ralph Wiggum modules (finding only)
        cross_imports = []
        for other in RALPH_MODULES:
            if other == module_name:
                continue
            if f"from .{other}" in source or f"import {other}" in source:
                cross_imports.append(other)
        if cross_imports:
            score.findings.append(
                f"Cross-imports: {', '.join(cross_imports)}"
            )
        # Check 2: __init__.py exports this module
        init_path = Path(__file__).parent / "__init__.py"
        if init_path.exists():
            init_source = init_path.read_text()
            if f"from .{module_name}" in init_source:
                score.findings.append("Exported via __init__.py")
            else:
                score.score -= 0.1
                score.deductions.append("Not exported via __init__.py")
        # Check 3: Consistent state directory usage (findings only)
        if "STATE_DIR" in source or "state_dir" in source:
            score.findings.append("Uses state directory pattern")
        if "CODITECT_DATA" in source or ".coditect-data" in source:
            score.findings.append("Uses CODITECT data directory")
        # Check 4: Async consistency — if the module mixes sync and async
        # defs roughly evenly, surface it as a finding (no deduction:
        # sync helpers alongside async main methods are acceptable).
        async_count = source.count("async def ")
        sync_count = source.count("def ") - async_count
        if async_count > 0 and sync_count > 0:
            ratio = async_count / (async_count + sync_count)
            if 0.2 < ratio < 0.8:
                score.findings.append(
                    f"Mixed sync/async: {async_count} async, {sync_count} sync"
                )
        # Clamp (defensive; only one deduction is currently possible).
        score.score = max(0.0, score.score)
        return score
# ---------------------------------------------------------------------------
# Main evaluator
# ---------------------------------------------------------------------------
class RalphWiggumEvaluator:
    """MoE evaluator for Ralph Wiggum components.

    Runs 6 expert evaluations across all modules and produces a
    comprehensive quality report.
    """

    # Expert instances are stateless, so sharing them at class level is safe.
    EXPERTS = [
        APIConsistencyExpert(),
        ErrorHandlingExpert(),
        DocumentationExpert(),
        ADRComplianceExpert(),
        TestCoverageExpert(),
        IntegrationCoherenceExpert(),
    ]

    def evaluate_module(self, module_name: str) -> ModuleScore:
        """Evaluate a single module across all dimensions."""
        mod_score = ModuleScore(module_name=module_name)
        # Gather basic size stats (best effort; zeros if the file is missing)
        source = _get_source(module_name)
        tree = _parse_ast(module_name)
        if source:
            mod_score.line_count = len(source.splitlines())
        if tree:
            mod_score.class_count = sum(
                1 for n in ast.walk(tree) if isinstance(n, ast.ClassDef)
            )
            mod_score.function_count = sum(
                1 for n in ast.walk(tree)
                if isinstance(n, ast.FunctionDef | ast.AsyncFunctionDef)
            )
        # Run each expert and collect its dimension score
        for expert in self.EXPERTS:
            dim_score = expert.evaluate(module_name)
            mod_score.dimensions.append(dim_score)
        mod_score.compute_overall()
        return mod_score

    def evaluate_all(self) -> EvaluationReport:
        """Evaluate all Ralph Wiggum modules and build the full report."""
        report = EvaluationReport(
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        for module_name in RALPH_MODULES:
            mod_score = self.evaluate_module(module_name)
            report.modules.append(mod_score)
        # Count all test functions across the shared test directory
        test_dir = TestCoverageExpert.TEST_DIR
        if test_dir.exists():
            for tf in test_dir.glob("test_*.py"):
                source = tf.read_text()
                test_tree = ast.parse(source)
                report.total_tests += sum(
                    1 for n in ast.walk(test_tree)
                    if isinstance(n, ast.FunctionDef | ast.AsyncFunctionDef)
                    and n.name.startswith("test_")
                )
        report.compute_aggregate()
        report.recommendations = self._generate_recommendations(report)
        return report

    def _generate_recommendations(
        self, report: EvaluationReport
    ) -> List[str]:
        """Generate actionable recommendations from evaluation results."""
        recs = []
        # Per-dimension recommendations for anything below a C (70%)
        for mod in report.modules:
            for dim in mod.dimensions:
                if dim.percentage < 70:
                    recs.append(
                        f"{mod.module_name}/{dim.dimension}: "
                        f"Score {dim.percentage}% — "
                        f"{'; '.join(dim.deductions[:2])}"
                    )
        # Cross-module recommendation for modules below 70% overall
        low_modules = [m for m in report.modules if m.overall_score < 0.7]
        if low_modules:
            recs.append(
                f"Priority: Improve {', '.join(m.module_name for m in low_modules)}"
            )
        if not recs:
            recs.append("All modules meet quality thresholds. Maintain current standards.")
        return recs
# ---------------------------------------------------------------------------
# Report formatting
# ---------------------------------------------------------------------------
def format_report_text(report: EvaluationReport) -> str: """Format evaluation report as readable text.""" lines = [ "=" * 60, "RALPH WIGGUM MOE EVALUATION REPORT", f"Timestamp: {report.timestamp}", "=" * 60, "", f"AGGREGATE: {report.aggregate_grade} ({int(report.aggregate_score * 100)}%)", f"Modules: {len(report.modules)} | Lines: {report.total_lines:,} | " f"Classes: {report.total_classes} | Functions: {report.total_functions} | " f"Tests: {report.total_tests}", "", "-" * 60, "MODULE SCORES", "-" * 60, ]
for mod in report.modules:
lines.append(
f"\n {mod.module_name}: {mod.overall_grade} "
f"({int(mod.overall_score * 100)}%) "
f"— {mod.line_count} lines, {mod.class_count} classes, "
f"{mod.function_count} functions"
)
for dim in mod.dimensions:
indicator = "+" if dim.percentage >= 80 else "-" if dim.percentage < 70 else "~"
lines.append(
f" [{indicator}] {dim.dimension}: {dim.percentage}%"
)
for finding in dim.findings[:2]:
lines.append(f" {finding}")
for deduction in dim.deductions[:2]:
lines.append(f" ! {deduction}")
lines.extend([
"",
"-" * 60,
"RECOMMENDATIONS",
"-" * 60,
])
for i, rec in enumerate(report.recommendations, 1):
lines.append(f" {i}. {rec}")
lines.extend(["", "=" * 60])
return "\n".join(lines)
def format_report_json(report: EvaluationReport) -> str: """Format evaluation report as JSON.""" data = { "timestamp": report.timestamp, "aggregate_score": round(report.aggregate_score, 3), "aggregate_grade": report.aggregate_grade, "total_lines": report.total_lines, "total_classes": report.total_classes, "total_functions": report.total_functions, "total_tests": report.total_tests, "modules": [], "recommendations": report.recommendations, } for mod in report.modules: mod_data = { "name": mod.module_name, "score": round(mod.overall_score, 3), "grade": mod.overall_grade, "lines": mod.line_count, "classes": mod.class_count, "functions": mod.function_count, "dimensions": {}, } for dim in mod.dimensions: mod_data["dimensions"][dim.dimension] = { "score": round(dim.score, 3), "percentage": dim.percentage, "grade": dim.grade, "findings": dim.findings, "deductions": dim.deductions, } data["modules"].append(mod_data) return json.dumps(data, indent=2)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> int:
    """Run the MoE evaluation from the command line.

    Returns the process exit code: 0 when the aggregate score is at
    least a C (70%), 1 otherwise (or on an unknown --module name).
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Ralph Wiggum MoE Component Evaluation"
    )
    # NOTE(review): --verbose is accepted but not currently consumed by the
    # report formatters — confirm whether it should gate the detail output.
    parser.add_argument(
        "--verbose", "-v", action="store_true",
        help="Show detailed dimension scores",
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output as JSON",
    )
    parser.add_argument(
        "--module", "-m",
        help="Evaluate a single module (default: all)",
    )
    args = parser.parse_args()
    evaluator = RalphWiggumEvaluator()
    if args.module:
        if args.module not in RALPH_MODULES:
            print(f"Unknown module: {args.module}")
            print(f"Available: {', '.join(RALPH_MODULES)}")
            return 1
        mod_score = evaluator.evaluate_module(args.module)
        report = EvaluationReport(
            timestamp=datetime.now(timezone.utc).isoformat(),
            modules=[mod_score],
        )
        report.compute_aggregate()
    else:
        report = evaluator.evaluate_all()
    if args.json:
        print(format_report_json(report))
    else:
        print(format_report_text(report))
    # Exit code: 0 if aggregate >= C (70%), 1 otherwise
    return 0 if report.aggregate_score >= 0.7 else 1
# BUG FIX: original read ``if name == "main":`` — both names are undefined;
# the standard script-entry guard uses the ``__name__`` dunder.
if __name__ == "__main__":
    sys.exit(main())