# scripts/test_provenance.py
""" Tests for Provenance Enhancement (H.3.4).
Tests cover:
- JudgeDecision provenance fields (H.3.4.1)
- ConsensusResult provenance chain and dissent (H.3.4.2-H.3.4.4)
- ClassificationResult.to_dict() with provenance (H.3.4.5) """
import json
import os
import sys
import unittest
from datetime import datetime, timezone, timedelta
from typing import Dict, List

# Make the package root importable when this file is run directly
# (the tests live one directory below the package root).
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))

from moe_classifier.core.models import (
    JudgeDecision,
    ConsensusResult,
    ClassificationResult,
    DissentingView,
    ApprovalType,
    AnalystVote,
)
class TestJudgeDecisionProvenance(unittest.TestCase):
    """Tests for JudgeDecision provenance fields (H.3.4.1)."""

    def test_default_timestamp(self):
        """Test JudgeDecision gets a timestamp on creation."""
        decision = JudgeDecision(
            judge="technical_architect",
            approved=True,
            reason="Good architecture"
        )
        self.assertIsNotNone(decision.timestamp)
        self.assertIsInstance(decision.timestamp, datetime)

    def test_model_used_field(self):
        """Test model_used field."""
        decision = JudgeDecision(
            judge="security_analyst",
            approved=False,
            reason="Security issues",
            model_used="claude-sonnet-4"
        )
        self.assertEqual(decision.model_used, "claude-sonnet-4")

    def test_token_usage_field(self):
        """Test token_usage field."""
        decision = JudgeDecision(
            judge="qa_evaluator",
            approved=True,
            reason="Tests pass",
            token_usage=1500
        )
        self.assertEqual(decision.token_usage, 1500)

    def test_raw_response_field(self):
        """Test raw_response field for audit trail."""
        raw = '{"verdict": "PASS", "rationale": "..."}'
        decision = JudgeDecision(
            judge="compliance_auditor",
            approved=True,
            reason="Compliant",
            raw_response=raw
        )
        self.assertEqual(decision.raw_response, raw)

    def test_evaluation_timestamps(self):
        """Test evaluation start/end timestamps."""
        start = datetime.now(timezone.utc)
        end = start + timedelta(seconds=2)
        decision = JudgeDecision(
            judge="technical_architect",
            approved=True,
            reason="OK",
            evaluation_start_time=start,
            evaluation_end_time=end
        )
        self.assertEqual(decision.evaluation_start_time, start)
        self.assertEqual(decision.evaluation_end_time, end)

    def test_latency_ms_from_timestamps(self):
        """Test latency_ms calculated from start/end times."""
        start = datetime.now(timezone.utc)
        end = start + timedelta(milliseconds=1500)
        decision = JudgeDecision(
            judge="technical_architect",
            approved=True,
            reason="OK",
            evaluation_start_time=start,
            evaluation_end_time=end
        )
        self.assertEqual(decision.latency_ms, 1500)

    def test_latency_ms_fallback_to_duration(self):
        """Test latency_ms falls back to duration_ms when no timestamps given."""
        decision = JudgeDecision(
            judge="technical_architect",
            approved=True,
            reason="OK",
            duration_ms=2000
        )
        self.assertEqual(decision.latency_ms, 2000)

    def test_dimension_scores_field(self):
        """Test dimension_scores field."""
        decision = JudgeDecision(
            judge="technical_architect",
            approved=True,
            reason="Good design",
            dimension_scores={
                "architectural_soundness": 3,
                "design_patterns": 2,
                "error_handling": 3
            }
        )
        self.assertEqual(decision.dimension_scores["architectural_soundness"], 3)
        self.assertEqual(len(decision.dimension_scores), 3)

    def test_to_provenance_dict(self):
        """Test conversion to provenance dictionary."""
        decision = JudgeDecision(
            judge="security_analyst",
            approved=False,
            reason="Vulnerabilities found in authentication",
            confidence=0.85,
            model_used="gpt-4o",
            token_usage=1200,
            dimension_scores={"security": 1, "performance": 2}
        )
        prov = decision.to_provenance_dict()
        self.assertEqual(prov["judge"], "security_analyst")
        self.assertEqual(prov["model"], "gpt-4o")
        self.assertEqual(prov["approved"], False)
        self.assertEqual(prov["confidence"], 0.85)
        self.assertEqual(prov["token_usage"], 1200)
        self.assertIn("timestamp", prov)
        self.assertEqual(prov["dimension_scores"]["security"], 1)

    def test_to_provenance_dict_truncates_reason(self):
        """Test long reasons are truncated in provenance."""
        long_reason = "A" * 500
        decision = JudgeDecision(
            judge="judge1",
            approved=True,
            reason=long_reason
        )
        prov = decision.to_provenance_dict()
        self.assertTrue(prov["reason_preview"].endswith("..."))
        self.assertLessEqual(len(prov["reason_preview"]), 205)  # 200 + "..."
class TestDissentingView(unittest.TestCase):
    """Tests for DissentingView dataclass (H.3.4.4)."""

    def test_create_dissenting_view(self):
        """Test creating a dissenting view."""
        view = DissentingView(
            judge="security_analyst",
            model_used="gpt-4o",
            approved=False,
            confidence=0.9,
            reason="Security vulnerabilities detected",
            key_concerns=["SQL injection risk", "Missing auth"],
            dimension_scores={"security": 1}
        )
        self.assertEqual(view.judge, "security_analyst")
        self.assertFalse(view.approved)
        self.assertEqual(len(view.key_concerns), 2)

    def test_to_dict(self):
        """Test DissentingView serialization."""
        view = DissentingView(
            judge="compliance_auditor",
            model_used="claude-opus-4.5",
            approved=False,
            confidence=0.95,
            reason="HIPAA violation",
            key_concerns=["Missing encryption"]
        )
        d = view.to_dict()
        self.assertEqual(d["judge"], "compliance_auditor")
        self.assertEqual(d["model"], "claude-opus-4.5")
        self.assertFalse(d["approved"])
        self.assertEqual(d["confidence"], 0.95)
        self.assertIn("HIPAA", d["reason"])
class TestConsensusResultProvenance(unittest.TestCase):
    """Tests for ConsensusResult provenance (H.3.4.2-H.3.4.4)."""

    def setUp(self):
        """Set up test fixtures: two approvals and one rejection."""
        self.decisions = [
            JudgeDecision(
                judge="technical_architect",
                approved=True,
                reason="Good architecture",
                confidence=0.9,
                model_used="claude-sonnet-4",
                token_usage=1000
            ),
            JudgeDecision(
                judge="security_analyst",
                approved=True,
                reason="Secure implementation",
                confidence=0.85,
                model_used="gpt-4o",
                token_usage=1200
            ),
            JudgeDecision(
                judge="compliance_auditor",
                approved=False,
                reason="Missing audit trail. Data retention policy unclear.",
                confidence=0.9,
                model_used="claude-opus-4.5",
                token_usage=1500
            ),
        ]

    def test_build_provenance_chain(self):
        """Test building provenance chain (H.3.4.3)."""
        result = ConsensusResult(
            classification="agent",
            confidence=0.85,
            agreement_ratio=0.67,
            approval_type=ApprovalType.JUDGE_APPROVED,
            judge_decisions=self.decisions
        )
        result.build_provenance_chain()
        self.assertEqual(len(result.provenance_chain), 3)
        self.assertEqual(result.provenance_chain[0]["judge"], "technical_architect")
        self.assertEqual(result.provenance_chain[1]["model"], "gpt-4o")

    def test_total_token_usage(self):
        """Test total token usage calculation."""
        result = ConsensusResult(
            classification="agent",
            confidence=0.85,
            agreement_ratio=0.67,
            approval_type=ApprovalType.JUDGE_APPROVED,
            judge_decisions=self.decisions
        )
        result.build_provenance_chain()
        self.assertEqual(result.total_token_usage, 3700)  # 1000 + 1200 + 1500

    def test_total_latency_ms(self):
        """Test total latency calculation."""
        decisions = [
            JudgeDecision(
                judge="j1", approved=True, reason="OK",
                duration_ms=500
            ),
            JudgeDecision(
                judge="j2", approved=True, reason="OK",
                duration_ms=700
            ),
        ]
        result = ConsensusResult(
            classification="agent",
            confidence=0.9,
            agreement_ratio=1.0,
            approval_type=ApprovalType.JUDGE_APPROVED,
            judge_decisions=decisions
        )
        result.build_provenance_chain()
        self.assertEqual(result.total_latency_ms, 1200)

    def test_extract_dissent(self):
        """Test extracting dissenting views (H.3.4.4)."""
        result = ConsensusResult(
            classification="agent",
            confidence=0.85,
            agreement_ratio=0.67,
            approval_type=ApprovalType.JUDGE_APPROVED,
            judge_decisions=self.decisions
        )
        # Final verdict is approved (2/3 approved)
        result.extract_dissent(final_approved=True)
        self.assertEqual(len(result.dissenting_views), 1)
        self.assertEqual(result.dissenting_views[0].judge, "compliance_auditor")
        self.assertFalse(result.dissenting_views[0].approved)

    def test_dissent_extracts_key_concerns(self):
        """Test key concerns are extracted from reason."""
        result = ConsensusResult(
            classification="agent",
            confidence=0.85,
            agreement_ratio=0.67,
            approval_type=ApprovalType.JUDGE_APPROVED,
            judge_decisions=self.decisions
        )
        result.extract_dissent(final_approved=True)
        # Should extract concerns from "Missing audit trail. Data retention policy unclear."
        self.assertGreater(len(result.dissenting_views[0].key_concerns), 0)

    def test_get_provenance_summary(self):
        """Test provenance summary generation."""
        result = ConsensusResult(
            classification="agent",
            confidence=0.85,
            agreement_ratio=0.67,
            approval_type=ApprovalType.JUDGE_APPROVED,
            judge_decisions=self.decisions
        )
        result.build_provenance_chain()
        result.extract_dissent(final_approved=True)
        summary = result.get_provenance_summary()
        self.assertEqual(summary["judge_count"], 3)
        self.assertEqual(summary["approval_count"], 2)
        self.assertEqual(summary["rejection_count"], 1)
        self.assertEqual(summary["dissent_count"], 1)
        self.assertEqual(summary["total_token_usage"], 3700)
        self.assertIn("claude-sonnet-4", summary["models_used"])
        self.assertIn("gpt-4o", summary["models_used"])

    def test_debate_metadata(self):
        """Test debate metadata storage."""
        result = ConsensusResult(
            classification="agent",
            confidence=0.9,
            agreement_ratio=0.9,
            approval_type=ApprovalType.JUDGE_APPROVED,
            debate_metadata={
                "total_rounds": 2,
                "initial_agreement": 0.6,
                "final_agreement": 0.9,
                "convergence_achieved": True
            }
        )
        summary = result.get_provenance_summary()
        self.assertTrue(summary["had_debate"])
        self.assertEqual(result.debate_metadata["total_rounds"], 2)
class TestClassificationResultProvenance(unittest.TestCase):
    """Tests for ClassificationResult.to_dict() with provenance (H.3.4.5)."""

    def setUp(self):
        """Set up test fixtures: a split (1 approve / 1 reject) consensus."""
        self.decisions = [
            JudgeDecision(
                judge="technical_architect",
                approved=True,
                reason="Good",
                confidence=0.9,
                model_used="claude-sonnet-4",
                token_usage=1000,
                dimension_scores={"architecture": 3}
            ),
            JudgeDecision(
                judge="security_analyst",
                approved=False,
                reason="Security issues found",
                confidence=0.85,
                model_used="gpt-4o",
                token_usage=1200,
                dimension_scores={"security": 1}
            ),
        ]
        self.consensus = ConsensusResult(
            classification="agent",
            confidence=0.85,
            agreement_ratio=0.5,
            approval_type=ApprovalType.ESCALATED,
            judge_decisions=self.decisions
        )

    def test_to_dict_includes_provenance(self):
        """Test to_dict includes provenance data."""
        result = ClassificationResult(
            document_path="/path/to/doc.md",
            result=self.consensus
        )
        result.build_full_provenance()
        d = result.to_dict()
        self.assertIn("provenance", d)
        self.assertIn("chain", d["provenance"])
        self.assertIn("dissenting_views", d["provenance"])
        self.assertIn("total_token_usage", d["provenance"])
        self.assertIn("summary", d["provenance"])

    def test_to_dict_without_provenance(self):
        """Test to_dict can exclude provenance."""
        result = ClassificationResult(
            document_path="/path/to/doc.md",
            result=self.consensus
        )
        d = result.to_dict(include_provenance=False)
        self.assertNotIn("provenance", d)

    def test_to_dict_judge_decisions_enhanced(self):
        """Test judge_decisions include provenance fields."""
        result = ClassificationResult(
            document_path="/path/to/doc.md",
            result=self.consensus
        )
        d = result.to_dict()
        judge_decision = d["judge_decisions"][0]
        self.assertIn("model_used", judge_decision)
        self.assertIn("token_usage", judge_decision)
        self.assertIn("timestamp", judge_decision)
        self.assertIn("dimension_scores", judge_decision)

    def test_to_dict_provenance_chain(self):
        """Test provenance chain in output."""
        result = ClassificationResult(
            document_path="/path/to/doc.md",
            result=self.consensus
        )
        result.build_full_provenance()
        d = result.to_dict()
        self.assertEqual(len(d["provenance"]["chain"]), 2)
        self.assertEqual(d["provenance"]["chain"][0]["judge"], "technical_architect")

    def test_to_dict_dissenting_views(self):
        """Test dissenting views in output."""
        result = ClassificationResult(
            document_path="/path/to/doc.md",
            result=self.consensus
        )
        result.build_full_provenance()
        d = result.to_dict()
        # One dissent (security_analyst rejected, final = rejected due to 50/50)
        # Actually with 50/50, final_approved = False (not > 50%)
        # So technical_architect dissents
        dissents = d["provenance"]["dissenting_views"]
        self.assertGreater(len(dissents), 0)

    def test_to_dict_with_debate_metadata(self):
        """Test debate metadata in provenance."""
        self.consensus.debate_metadata = {
            "total_rounds": 2,
            "convergence_achieved": True
        }
        result = ClassificationResult(
            document_path="/path/to/doc.md",
            result=self.consensus
        )
        d = result.to_dict()
        self.assertIn("debate", d["provenance"])
        self.assertEqual(d["provenance"]["debate"]["total_rounds"], 2)

    def test_to_dict_json_serializable(self):
        """Test output is JSON serializable."""
        result = ClassificationResult(
            document_path="/path/to/doc.md",
            result=self.consensus
        )
        result.build_full_provenance()
        d = result.to_dict()
        # Should not raise
        json_str = json.dumps(d)
        self.assertIsInstance(json_str, str)

    def test_build_full_provenance(self):
        """Test build_full_provenance method."""
        result = ClassificationResult(
            document_path="/path/to/doc.md",
            result=self.consensus
        )
        result.build_full_provenance()
        self.assertEqual(len(self.consensus.provenance_chain), 2)
        self.assertEqual(self.consensus.total_token_usage, 2200)
        # Dissenting views should be extracted
        self.assertGreater(len(self.consensus.dissenting_views), 0)

    def test_timestamp_uses_utc(self):
        """Test ClassificationResult uses UTC (timezone-aware) timestamps."""
        result = ClassificationResult(
            document_path="/path/to/doc.md",
            result=self.consensus
        )
        self.assertIsNotNone(result.timestamp.tzinfo)
class TestProvenanceSummary(unittest.TestCase):
    """Tests for provenance summary functionality."""

    def test_summary_with_no_decisions(self):
        """Test summary handles empty decisions."""
        result = ConsensusResult(
            classification="agent",
            confidence=0.5,
            agreement_ratio=0.0,
            approval_type=ApprovalType.ESCALATED,
            judge_decisions=[]
        )
        summary = result.get_provenance_summary()
        self.assertEqual(summary["judge_count"], 0)
        self.assertEqual(summary["approval_count"], 0)
        self.assertEqual(summary["models_used"], [])

    def test_summary_models_deduplication(self):
        """Test models_used is deduplicated."""
        decisions = [
            JudgeDecision("j1", True, "OK", model_used="claude-sonnet-4"),
            JudgeDecision("j2", True, "OK", model_used="claude-sonnet-4"),
            JudgeDecision("j3", True, "OK", model_used="gpt-4o"),
        ]
        result = ConsensusResult(
            classification="agent",
            confidence=0.9,
            agreement_ratio=1.0,
            approval_type=ApprovalType.JUDGE_APPROVED,
            judge_decisions=decisions
        )
        summary = result.get_provenance_summary()
        # Should have 2 unique models
        self.assertEqual(len(summary["models_used"]), 2)
class TestProvenanceEdgeCases(unittest.TestCase):
    """Edge case tests for provenance functionality."""

    def test_decision_with_empty_model(self):
        """Test decision with empty model_used."""
        decision = JudgeDecision(
            judge="test",
            approved=True,
            reason="OK",
            model_used=""
        )
        prov = decision.to_provenance_dict()
        self.assertEqual(prov["model"], "")

    def test_decision_with_zero_tokens(self):
        """Test decision with zero token usage."""
        decision = JudgeDecision(
            judge="test",
            approved=True,
            reason="OK",
            token_usage=0
        )
        prov = decision.to_provenance_dict()
        self.assertEqual(prov["token_usage"], 0)

    def test_dissent_with_empty_reason(self):
        """Test dissent extraction with empty reason."""
        decisions = [
            JudgeDecision("j1", True, "", confidence=0.9),
            JudgeDecision("j2", False, "", confidence=0.85),
        ]
        result = ConsensusResult(
            classification="agent",
            confidence=0.85,
            agreement_ratio=0.5,
            approval_type=ApprovalType.ESCALATED,
            judge_decisions=decisions
        )
        result.extract_dissent(final_approved=True)
        # Should still create dissent, just with empty key_concerns
        self.assertEqual(len(result.dissenting_views), 1)
        self.assertEqual(result.dissenting_views[0].key_concerns, [])

    def test_unanimous_approval_no_dissent(self):
        """Test no dissent when unanimous."""
        decisions = [
            JudgeDecision("j1", True, "OK", confidence=0.9),
            JudgeDecision("j2", True, "OK", confidence=0.85),
            JudgeDecision("j3", True, "OK", confidence=0.95),
        ]
        result = ConsensusResult(
            classification="agent",
            confidence=0.9,
            agreement_ratio=1.0,
            approval_type=ApprovalType.JUDGE_APPROVED,
            judge_decisions=decisions
        )
        result.extract_dissent(final_approved=True)
        self.assertEqual(len(result.dissenting_views), 0)

    def test_unanimous_rejection_no_dissent(self):
        """Test no dissent when unanimously rejected."""
        decisions = [
            JudgeDecision("j1", False, "Bad", confidence=0.9),
            JudgeDecision("j2", False, "Bad", confidence=0.85),
        ]
        result = ConsensusResult(
            classification="agent",
            confidence=0.85,
            agreement_ratio=1.0,
            approval_type=ApprovalType.ESCALATED,
            judge_decisions=decisions
        )
        result.extract_dissent(final_approved=False)
        self.assertEqual(len(result.dissenting_views), 0)
# Run the full suite when executed directly (e.g. `python test_provenance.py`).
if __name__ == '__main__':
    unittest.main()