# scripts-models
""" Core data models for MoE Classification System.
Enhanced with provenance tracking (H.3.4) for full audit trail:
- JudgeDecision: model_used, timestamps, token_usage, raw_response
- ConsensusResult: provenance_chain, dissenting_views, token/latency metrics """
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml
class DocumentType(str, Enum):
    """Document categories that a classification may take (per ADR-018).

    Every member also subclasses ``str``, so a member compares equal to its
    lowercase value (``DocumentType.AGENT == "agent"``) and a member can be
    looked up from that value (``DocumentType("agent")``).
    """

    AGENT = "agent"
    COMMAND = "command"
    SKILL = "skill"
    SCRIPT = "script"
    HOOK = "hook"
    GUIDE = "guide"
    REFERENCE = "reference"
    ADR = "adr"
    WORKFLOW = "workflow"
    CONFIG = "config"
class ApprovalType(str, Enum):
    """How a classification was approved, or why it has not been yet.

    Members subclass ``str``; each member's value is identical to its name.
    """

    AUTO_APPROVED = "AUTO_APPROVED"
    JUDGE_APPROVED = "JUDGE_APPROVED"
    # Resolved by the deep analysts.
    DEEP_ANALYSIS_APPROVED = "DEEP_ANALYSIS_APPROVED"
    # Deep analysis was inconclusive.
    HUMAN_REVIEW_REQUIRED = "HUMAN_REVIEW_REQUIRED"
    ESCALATED = "ESCALATED"
    PENDING = "PENDING"
@dataclass
class Document:
    """A document to be classified, with YAML frontmatter split from its body.

    Attributes:
        path: Location of the document. Only ``from_path`` reads the file;
            the properties below use the path for name/suffix/parent lookups.
        content: Full raw text of the document.
        frontmatter: Mapping parsed from a leading ``---`` delimited YAML
            block. Left as supplied when no such block is found; reset to
            ``{}`` when the block is not valid YAML or is not a mapping.
        body: ``content`` without the frontmatter block, or the whole
            ``content`` when no usable frontmatter block is found.
    """

    path: Path
    content: str
    frontmatter: Dict = field(default_factory=dict)
    body: str = ""

    def __post_init__(self) -> None:
        """Split ``content`` into ``frontmatter`` and ``body`` on creation."""
        self._parse_frontmatter()

    def _parse_frontmatter(self) -> None:
        """Parse YAML frontmatter from content.

        The block must open with ``---`` on the first non-blank line and be
        closed by a ``---`` line that is followed by a newline.
        """
        # Default outcome for every "no usable frontmatter" path.
        self.body = self.content

        # Detect and match on the same left-stripped text. Previously the
        # guard stripped whitespace but the regex did not, so documents with
        # leading blank lines passed the guard yet never matched.
        text = self.content.lstrip()
        if not text.startswith('---'):
            return

        match = re.match(r'^---\s*\n(.*?)\n---\s*\n', text, re.DOTALL)
        if match is None:
            return

        try:
            parsed = yaml.safe_load(match.group(1)) or {}
        except yaml.YAMLError:
            self.frontmatter = {}
            return

        if not isinstance(parsed, dict):
            # safe_load returns scalars/lists for non-mapping YAML (e.g. a
            # plain sentence between two ``---`` rules). That is not
            # frontmatter, so treat it like the unparseable case instead of
            # storing a non-dict where callers expect a mapping.
            self.frontmatter = {}
            return

        self.frontmatter = parsed
        self.body = text[match.end():]

    @classmethod
    def from_path(cls, path: Path) -> 'Document':
        """Load a document by reading ``path`` as UTF-8 text."""
        content = path.read_text(encoding='utf-8')
        return cls(path=path, content=content)

    @property
    def extension(self) -> str:
        """Lower-cased file suffix of ``path`` (including the dot)."""
        return self.path.suffix.lower()

    @property
    def filename(self) -> str:
        """Final component of ``path``."""
        return self.path.name

    @property
    def directory(self) -> str:
        """Parent directory of ``path`` as a string."""
        return str(self.path.parent)

    @property
    def size_bytes(self) -> int:
        """Length of ``content`` once encoded as UTF-8."""
        return len(self.content.encode('utf-8'))
@dataclass
class AnalystVote:
    """The classification verdict returned by one analyst agent."""

    agent: str
    classification: str
    confidence: float
    reasoning: str
    duration_ms: int = 0
    metadata: Dict = field(default_factory=dict)

    def is_valid(self, min_confidence: float = 0.70) -> bool:
        """Report whether this vote meets the minimum requirements.

        Args:
            min_confidence: Lowest confidence that still counts as valid.

        Returns:
            True when ``confidence`` reaches ``min_confidence`` and
            ``classification`` equals the value of a ``DocumentType`` member.
        """
        # Written as "not >=" so a NaN confidence is rejected here as well.
        if not self.confidence >= min_confidence:
            return False
        known_values = [member.value for member in DocumentType]
        return self.classification in known_values
@dataclass
class JudgeDecision:
    """Result from a judge agent with full provenance tracking (H.3.4.1).

    Provenance fields for audit trail:
    - model_used: Which LLM model rendered the decision
    - timestamp: When the evaluation was recorded (defaults to the current
      UTC time when not supplied)
    - token_usage: Input + output tokens consumed
    - raw_response: Complete model response for audit
    - evaluation_start_time: When evaluation began
    - evaluation_end_time: When evaluation completed
    - dimension_scores: Per-dimension scoring (1-3 scale)
    """

    judge: str
    approved: bool
    reason: str
    confidence: float = 1.0
    duration_ms: int = 0
    metadata: Dict = field(default_factory=dict)
    # Provenance fields (H.3.4.1)
    model_used: str = ""
    timestamp: Optional[datetime] = None
    token_usage: int = 0
    raw_response: str = ""
    evaluation_start_time: Optional[datetime] = None
    evaluation_end_time: Optional[datetime] = None
    dimension_scores: Dict[str, float] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Set timestamp to the current UTC time if not provided."""
        if self.timestamp is None:
            self.timestamp = datetime.now(timezone.utc)

    @property
    def latency_ms(self) -> int:
        """Whole milliseconds between start and end, else ``duration_ms``.

        Returns:
            The elapsed time truncated toward zero when both evaluation
            times are set; ``duration_ms`` otherwise.
        """
        if self.evaluation_start_time is None or self.evaluation_end_time is None:
            return self.duration_ms
        elapsed = self.evaluation_end_time - self.evaluation_start_time
        # Integer timedelta division is exact. The float route
        # int(total_seconds() * 1000) under-reports some values
        # (1.001 s * 1000 == 1000.9999999999999 -> 1000).
        whole_ms = abs(elapsed) // timedelta(milliseconds=1)
        return whole_ms if elapsed >= timedelta(0) else -whole_ms

    def to_provenance_dict(self) -> Dict[str, Any]:
        """Convert to a provenance record for the audit trail.

        Returns:
            A dict with judge, model, approval, confidence, ISO timestamp,
            token usage, latency, a copy of the dimension scores, and the
            reason shortened to 200 characters plus ``...`` when longer.
        """
        reason_preview = self.reason
        if len(reason_preview) > 200:
            reason_preview = reason_preview[:200] + "..."
        return {
            "judge": self.judge,
            "model": self.model_used,
            "approved": self.approved,
            "confidence": self.confidence,
            "timestamp": self.timestamp.isoformat() if self.timestamp else None,
            "token_usage": self.token_usage,
            "latency_ms": self.latency_ms,
            # Copy so later edits to this decision cannot change a record
            # that was already emitted.
            "dimension_scores": dict(self.dimension_scores),
            "reason_preview": reason_preview,
        }
@dataclass
class DissentingView:
    """A judge opinion that went against the final verdict (H.3.4.4).

    Kept in the audit trail for:
    - Transparency in decision-making
    - Audit compliance (regulated industries)
    - Analysis of recurring dissent patterns
    """

    judge: str
    model_used: str
    approved: bool  # The judge's own decision (opposite of the final one)
    confidence: float
    reason: str
    key_concerns: List[str] = field(default_factory=list)
    dimension_scores: Dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the view as a plain dictionary for serialization.

        ``model_used`` is exported under the key ``"model"``; every other
        field keeps its attribute name.
        """
        return dict(
            judge=self.judge,
            model=self.model_used,
            approved=self.approved,
            confidence=self.confidence,
            reason=self.reason,
            key_concerns=self.key_concerns,
            dimension_scores=self.dimension_scores,
        )
@dataclass
class ConsensusResult:
    """Result of consensus calculation with full provenance (H.3.4.2).

    Provenance fields for audit trail:
    - provenance_chain: Ordered list of all judge evaluations
    - dissenting_views: Judges who disagreed with final verdict
    - total_token_usage: Sum of all token consumption
    - total_latency_ms: Sum of all evaluation latencies
    - debate_metadata: Details if debate protocol was used

    The provenance fields are not filled automatically; they are populated
    by ``build_provenance_chain`` and ``extract_dissent``.
    """

    classification: Optional[str]
    confidence: float
    agreement_ratio: float
    approval_type: ApprovalType
    votes: List[AnalystVote] = field(default_factory=list)
    judge_decisions: List[JudgeDecision] = field(default_factory=list)
    escalation_reason: Optional[str] = None
    deep_analysis_reasoning: Optional[str] = None  # Reasoning from deep analysts
    # Provenance fields (H.3.4.2)
    provenance_chain: List[Dict[str, Any]] = field(default_factory=list)
    dissenting_views: List[DissentingView] = field(default_factory=list)
    total_token_usage: int = 0
    total_latency_ms: int = 0
    debate_metadata: Optional[Dict[str, Any]] = None
    # Provider adjustment fields (ADR-073)
    provider_adjustment_applied: bool = False
    raw_confidence: Optional[float] = None  # Confidence before adjustment
    provider_mode: Optional[str] = None  # single, dual, or multi

    def build_provenance_chain(self) -> None:
        """Build provenance chain from judge decisions (H.3.4.3).

        Replaces ``provenance_chain`` with one record per judge decision (in
        list order) and recomputes the token and latency totals.
        """
        self.provenance_chain = [
            decision.to_provenance_dict()
            for decision in self.judge_decisions
        ]
        # Calculate totals
        self.total_token_usage = sum(d.token_usage for d in self.judge_decisions)
        self.total_latency_ms = sum(d.latency_ms for d in self.judge_decisions)

    def extract_dissent(self, final_approved: bool) -> None:
        """Extract dissenting opinions from judges (H.3.4.4).

        Replaces ``dissenting_views`` with one entry for every judge whose
        ``approved`` flag differs from ``final_approved``.

        Args:
            final_approved: The final consensus decision (True = approved)
        """
        self.dissenting_views = []
        for decision in self.judge_decisions:
            if decision.approved != final_approved:
                key_concerns = []
                if decision.reason:
                    # Treat newlines as sentence breaks, then keep the first
                    # three fragments longer than 10 characters.
                    fragments = decision.reason.replace('\n', '. ').split('. ')
                    key_concerns = [
                        fragment.strip()
                        for fragment in fragments
                        if len(fragment.strip()) > 10
                    ][:3]
                self.dissenting_views.append(DissentingView(
                    judge=decision.judge,
                    model_used=decision.model_used,
                    approved=decision.approved,
                    confidence=decision.confidence,
                    reason=decision.reason,
                    key_concerns=key_concerns,
                    dimension_scores=decision.dimension_scores.copy()
                ))

    def get_provenance_summary(self) -> Dict[str, Any]:
        """Get summary of provenance data for logging.

        The token/latency totals and the dissent count are read from the
        stored fields, so they reflect the last ``build_provenance_chain`` /
        ``extract_dissent`` call rather than being recomputed here.
        """
        # De-duplicate in first-seen order. list(set(...)) gave an order that
        # changed between runs because of string hash randomization.
        models_used = list(dict.fromkeys(
            d.model_used for d in self.judge_decisions if d.model_used
        ))
        return {
            "judge_count": len(self.judge_decisions),
            "approval_count": sum(1 for d in self.judge_decisions if d.approved),
            "rejection_count": sum(1 for d in self.judge_decisions if not d.approved),
            "dissent_count": len(self.dissenting_views),
            "total_token_usage": self.total_token_usage,
            "total_latency_ms": self.total_latency_ms,
            "models_used": models_used,
            "had_debate": self.debate_metadata is not None
        }
@dataclass
class ClassificationResult:
    """Final classification result with full audit trail (H.3.4.5).

    Includes complete provenance chain for regulatory compliance:
    - Per-judge evaluation records with model/token/latency
    - Dissenting views for transparency
    - Debate protocol results if applicable
    """

    document_path: str
    result: ConsensusResult
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    processing_time_ms: int = 0
    deep_analysis: Optional[object] = None  # DeepAnalysisResult when applicable

    def to_dict(self, include_provenance: bool = True) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Args:
            include_provenance: Include full provenance data (default True)

        Returns:
            Dictionary suitable for JSON serialization. A ``"provenance"``
            entry is added when ``include_provenance`` is true, and a
            ``"deep_analysis"`` entry when ``deep_analysis`` is set.
        """
        result_dict: Dict[str, Any] = {
            "document_path": self.document_path,
            "classification": self.result.classification,
            "confidence": self.result.confidence,
            "agreement_ratio": self.result.agreement_ratio,
            "approval_type": self.result.approval_type.value,
            "votes": [
                {
                    "agent": v.agent,
                    "classification": v.classification,
                    "confidence": v.confidence,
                    "reasoning": v.reasoning,
                    "duration_ms": v.duration_ms
                }
                for v in self.result.votes
            ],
            "judge_decisions": [
                {
                    "judge": j.judge,
                    "approved": j.approved,
                    "reason": j.reason,
                    "confidence": j.confidence,
                    "duration_ms": j.duration_ms,
                    "metadata": j.metadata,
                    # Enhanced provenance fields (H.3.4.1)
                    "model_used": j.model_used,
                    "token_usage": j.token_usage,
                    "timestamp": j.timestamp.isoformat() if j.timestamp else None,
                    "dimension_scores": j.dimension_scores
                }
                for j in self.result.judge_decisions
            ],
            "escalation_reason": self.result.escalation_reason,
            "deep_analysis_reasoning": self.result.deep_analysis_reasoning,
            "timestamp": self.timestamp.isoformat(),
            "processing_time_ms": self.processing_time_ms
        }

        # Add provenance data (H.3.4.5)
        if include_provenance:
            result_dict["provenance"] = {
                "chain": self.result.provenance_chain,
                "dissenting_views": [
                    dv.to_dict() for dv in self.result.dissenting_views
                ],
                "total_token_usage": self.result.total_token_usage,
                "total_latency_ms": self.result.total_latency_ms,
                "summary": self.result.get_provenance_summary()
            }
            # Add debate metadata if present. The test is "is not None" so
            # it agrees with the summary's "had_debate" flag; an empty dict
            # used to be dropped here while the summary reported a debate.
            if self.result.debate_metadata is not None:
                result_dict["provenance"]["debate"] = self.result.debate_metadata

        # Add deep analysis details if present
        if self.deep_analysis:
            result_dict["deep_analysis"] = {
                "original_classification": self.deep_analysis.original_classification,
                "original_confidence": self.deep_analysis.original_confidence,
                "final_classification": self.deep_analysis.final_classification,
                "final_confidence": self.deep_analysis.final_confidence,
                "consensus_reached": self.deep_analysis.consensus_reached,
                "requires_human_review": self.deep_analysis.requires_human_review,
                "reasoning": self.deep_analysis.reasoning,
                "votes": [
                    {
                        "analyst": v.analyst_type.value,
                        "classification": v.classification,
                        "confidence": v.confidence,
                        "reasoning": v.reasoning,
                        "evidence": v.evidence
                    }
                    for v in self.deep_analysis.deep_votes
                ]
            }
        return result_dict

    def build_full_provenance(self) -> None:
        """Build complete provenance data for this classification.

        Call this after all judge decisions are collected to:
        1. Build the provenance chain
        2. Extract dissenting views
        3. Calculate token/latency totals

        Dissent is only extracted when there is at least one judge decision;
        otherwise ``result.dissenting_views`` is left as it was.
        """
        # Build provenance chain
        self.result.build_provenance_chain()

        # Determine final verdict (strict majority approval; a tie counts
        # as not approved)
        if self.result.judge_decisions:
            approvals = sum(1 for d in self.result.judge_decisions if d.approved)
            final_approved = approvals > len(self.result.judge_decisions) / 2
            self.result.extract_dissent(final_approved)