# scripts/coordinator
"""Type Expert Coordinator.

Orchestrates all Type Expert agents to achieve consensus on document
classification. This is the core of Phase 2 - coordinating expert analysis,
vote reconciliation, and content enhancement to achieve near-100% autonomous
classification.

Key responsibilities:
- Run all relevant experts on a document
- Analyze analyst vote patterns to identify disagreements
- Select the most likely document type
- Generate targeted enhancements to improve classification
- Provide audit trail for decisions
"""
import json
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Make the package root importable when this module is run as a script.
# BUG FIX: original read Path(file) — 'file' is undefined; the module's own
# location is __file__.
sys.path.insert(0, str(Path(__file__).parent.parent))

from core.models import Document, AnalystVote, ClassificationResult

from .base import TypeExpert, TypeAnalysis, ContentEnhancement
from .guide_expert import GuideExpert
from .reference_expert import ReferenceExpert
from .workflow_expert import WorkflowExpert
from .agent_expert import AgentExpert
from .command_expert import CommandExpert
from .adr_expert import ADRExpert
from .skill_expert import SkillExpert
from .hook_expert import HookExpert
from .readme_expert import ReadmeExpert
from .template_expert import TemplateExpert
from .index_expert import IndexExpert
from .report_expert import ReportExpert
from .changelog_expert import ChangelogExpert
@dataclass
class VoteAnalysis:
    """Analysis of analyst voting patterns for a single document."""

    majority_type: str                    # most-voted classification
    majority_count: int                   # votes received by the majority type
    total_votes: int
    agreement_ratio: float                # majority_count / total_votes
    dissenting_analysts: Dict[str, str]   # {analyst_name: voted_type}
    confidence_spread: float              # max - min confidence across votes
    avg_confidence: float
@dataclass
class CoordinatorDecision:
    """Final decision emitted by the coordinator for one document."""

    recommended_type: str
    confidence: float
    expert_analysis: TypeAnalysis         # NOTE(review): coordinate() can pass None here — confirm callers handle it
    vote_analysis: VoteAnalysis
    enhancements: List[ContentEnhancement]
    reasoning: str
    audit_trail: List[str]
class TypeExpertCoordinator:
    """
    Coordinates Type Expert agents for optimal document classification.

    This is the brain of Phase 2 - it understands analyst disagreement patterns,
    selects the appropriate expert, and generates targeted content enhancements
    to improve classification confidence.
    """
def __init__(self):
"""Initialize with all type experts (13 total)."""
self.experts: Dict[str, TypeExpert] = {
# Original 7
'guide': GuideExpert(),
'reference': ReferenceExpert(),
'workflow': WorkflowExpert(),
'agent': AgentExpert(),
'command': CommandExpert(),
'adr': ADRExpert(),
'skill': SkillExpert(),
# Phase 2 Expansion (6 new)
'hook': HookExpert(),
'readme': ReadmeExpert(),
'template': TemplateExpert(),
'index': IndexExpert(),
'report': ReportExpert(),
'changelog': ChangelogExpert(),
}
self.audit_log: List[str] = []
    def coordinate(
        self,
        document: Document,
        analyst_votes: List[AnalystVote],
        initial_result: Optional[ClassificationResult] = None
    ) -> CoordinatorDecision:
        """
        Coordinate type expert analysis for a document.

        Pipeline: analyze vote patterns, choose candidate types, run the
        matching experts, select the best type, generate content
        enhancements for missing signals, and compute a final confidence.
        Every step is recorded in the audit trail.

        Args:
            document: The document to classify
            analyst_votes: Initial votes from all analysts
            initial_result: Optional initial classification result

        Returns:
            CoordinatorDecision with recommended type and enhancements
        """
        # Fresh audit trail per call — instance state, so this object is not
        # safe for concurrent coordination of multiple documents.
        self.audit_log = []
        self._log(f"Starting coordination for: {document.path}")

        # Step 1: Analyze vote patterns
        vote_analysis = self._analyze_votes(analyst_votes)
        self._log(f"Vote analysis: {vote_analysis.majority_type} ({vote_analysis.majority_count}/{vote_analysis.total_votes})")
        self._log(f"Agreement ratio: {vote_analysis.agreement_ratio:.2f}, Avg confidence: {vote_analysis.avg_confidence:.2f}")
        if vote_analysis.dissenting_analysts:
            self._log(f"Dissenters: {vote_analysis.dissenting_analysts}")

        # Step 2: Determine candidate types to analyze (majority, dissenting
        # votes, initial result, plus filename-pattern hints)
        candidate_types = self._get_candidate_types(vote_analysis, initial_result, document)
        self._log(f"Candidate types to analyze: {candidate_types}")

        # Step 3: Run expert analysis for each candidate that has an expert
        expert_analyses: Dict[str, TypeAnalysis] = {}
        for doc_type in candidate_types:
            if doc_type in self.experts:
                expert = self.experts[doc_type]
                analysis = expert.analyze(document, analyst_votes)
                expert_analyses[doc_type] = analysis
                self._log(f"Expert '{doc_type}' says is_this_type={analysis.is_this_type}, conf={analysis.confidence:.2f}")

        # Step 4: Select best type based on expert analyses
        recommended_type, reasoning = self._select_best_type(
            vote_analysis, expert_analyses
        )
        self._log(f"Selected type: {recommended_type}")
        self._log(f"Reasoning: {reasoning}")

        # Step 5: Generate enhancements only when the selected type's expert
        # reported missing classification signals
        enhancements = []
        if recommended_type in expert_analyses:
            expert = self.experts[recommended_type]
            analysis = expert_analyses[recommended_type]
            if analysis.missing_signals:
                self._log(f"Missing signals: {analysis.missing_signals}")
                enhancements = expert.generate_enhancements(document, analysis)
                self._log(f"Generated {len(enhancements)} enhancements")

        # Step 6: Calculate final confidence
        final_confidence = self._calculate_final_confidence(
            vote_analysis,
            expert_analyses.get(recommended_type),
            enhancements
        )
        self._log(f"Final confidence: {final_confidence:.2f}")

        # NOTE(review): expert_analysis is None when no expert ran for the
        # selected type (e.g. pure majority-vote fallback), even though the
        # dataclass annotation is non-Optional — confirm callers handle None.
        return CoordinatorDecision(
            recommended_type=recommended_type,
            confidence=final_confidence,
            expert_analysis=expert_analyses.get(recommended_type),
            vote_analysis=vote_analysis,
            enhancements=enhancements,
            reasoning=reasoning,
            audit_trail=self.audit_log.copy()
        )
def _analyze_votes(self, votes: List[AnalystVote]) -> VoteAnalysis:
"""Analyze voting patterns to identify disagreements."""
if not votes:
return VoteAnalysis(
majority_type='unknown',
majority_count=0,
total_votes=0,
agreement_ratio=0.0,
dissenting_analysts={},
confidence_spread=0.0,
avg_confidence=0.0
)
# Count votes by type
vote_counts: Dict[str, int] = {}
for vote in votes:
vote_counts[vote.classification] = vote_counts.get(vote.classification, 0) + 1
# Find majority
majority_type = max(vote_counts, key=vote_counts.get)
majority_count = vote_counts[majority_type]
# Find dissenters
dissenting = {
v.agent: v.classification
for v in votes
if v.classification != majority_type
}
# Calculate confidence stats
confidences = [v.confidence for v in votes]
avg_conf = sum(confidences) / len(confidences)
conf_spread = max(confidences) - min(confidences)
return VoteAnalysis(
majority_type=majority_type,
majority_count=majority_count,
total_votes=len(votes),
agreement_ratio=majority_count / len(votes),
dissenting_analysts=dissenting,
confidence_spread=conf_spread,
avg_confidence=avg_conf
)
def _get_candidate_types(
self,
vote_analysis: VoteAnalysis,
initial_result: Optional[ClassificationResult],
document: Optional[Document] = None
) -> List[str]:
"""Determine which types to analyze based on voting patterns and filename."""
candidates = set()
# Always include majority type
candidates.add(vote_analysis.majority_type)
# Include all dissenting votes
for voted_type in vote_analysis.dissenting_analysts.values():
candidates.add(voted_type)
# If initial result differs, include it
if initial_result and hasattr(initial_result, 'result') and initial_result.result:
if initial_result.result.classification:
candidates.add(initial_result.result.classification)
# Phase 2: Also check filename patterns for new document types
# This ensures new experts get triggered even if analysts don't vote for them
if document:
doc_path = str(document.path)
filename = Path(doc_path).stem.upper()
# Primary filename patterns (take precedence)
filename_type_map = {
'README': 'readme',
'CHANGELOG': 'changelog',
'HISTORY': 'changelog',
'TEMPLATE': 'template',
'INDEX': 'index',
'CATALOG': 'index',
'INVENTORY': 'index',
'REPORT': 'report',
'STATUS': 'report',
'SUMMARY': 'report',
}
for pattern, doc_type in filename_type_map.items():
if pattern in filename:
candidates.add(doc_type)
self._log(f"Filename pattern '{pattern}' suggests type: {doc_type}")
# Check path for hooks (only if not already matched by filename patterns)
# INDEX files in hooks/ should be classified as index, not hook
is_index_or_catalog = any(p in filename for p in ['INDEX', 'CATALOG', 'INVENTORY'])
if not is_index_or_catalog:
if '/hooks/' in doc_path.lower() or 'hook' in filename.lower():
candidates.add('hook')
self._log("Path/filename suggests type: hook")
# Filter to known types
return [t for t in candidates if t in self.experts]
def _select_best_type(
self,
vote_analysis: VoteAnalysis,
expert_analyses: Dict[str, TypeAnalysis]
) -> Tuple[str, str]:
"""Select the best document type based on votes and expert analysis."""
reasoning_parts = []
# If experts strongly agree with a type
confirming_experts = [
(doc_type, analysis)
for doc_type, analysis in expert_analyses.items()
if analysis.is_this_type and analysis.confidence > 0.7
]
if len(confirming_experts) == 1:
doc_type, analysis = confirming_experts[0]
reasoning_parts.append(
f"Expert '{doc_type}' strongly confirms (conf={analysis.confidence:.2f})"
)
# Check if this aligns with majority vote
if doc_type == vote_analysis.majority_type:
reasoning_parts.append("Expert agrees with analyst majority")
return doc_type, "; ".join(reasoning_parts)
else:
reasoning_parts.append(
f"Expert overrides analyst majority ({vote_analysis.majority_type})"
)
return doc_type, "; ".join(reasoning_parts)
# If multiple experts confirm, use vote count as tiebreaker
if len(confirming_experts) > 1:
# Sort by confidence, then by vote alignment
confirming_experts.sort(
key=lambda x: (
x[1].confidence,
1 if x[0] == vote_analysis.majority_type else 0
),
reverse=True
)
doc_type, analysis = confirming_experts[0]
reasoning_parts.append(
f"Multiple experts confirm; selected '{doc_type}' (highest confidence {analysis.confidence:.2f})"
)
return doc_type, "; ".join(reasoning_parts)
# No expert strongly confirms - fall back to majority vote
if vote_analysis.agreement_ratio >= 0.6:
reasoning_parts.append(
f"No strong expert confirmation; using majority vote "
f"({vote_analysis.majority_count}/{vote_analysis.total_votes})"
)
return vote_analysis.majority_type, "; ".join(reasoning_parts)
# Low agreement and no expert confirmation - need more analysis
reasoning_parts.append(
f"Low analyst agreement ({vote_analysis.agreement_ratio:.2f}) "
f"and no strong expert confirmation"
)
# Use expert with highest confidence even if not strongly confirming
if expert_analyses:
best_expert = max(
expert_analyses.items(),
key=lambda x: x[1].confidence
)
doc_type, analysis = best_expert
reasoning_parts.append(
f"Selected '{doc_type}' as best available (conf={analysis.confidence:.2f})"
)
return doc_type, "; ".join(reasoning_parts)
return vote_analysis.majority_type, "Fallback to majority vote"
def _calculate_final_confidence(
self,
vote_analysis: VoteAnalysis,
expert_analysis: Optional[TypeAnalysis],
enhancements: List[ContentEnhancement]
) -> float:
"""Calculate final confidence score."""
# Base confidence from votes
base_confidence = vote_analysis.avg_confidence * vote_analysis.agreement_ratio
# Expert boost
expert_boost = 0.0
if expert_analysis and expert_analysis.is_this_type:
expert_boost = expert_analysis.confidence * 0.3
# Enhancement boost (potential improvement)
enhancement_boost = 0.0
if enhancements:
for e in enhancements:
enhancement_boost += sum(e.expected_analyst_boost.values()) * 0.5
enhancement_boost = min(0.2, enhancement_boost) # Cap at 0.2
final = min(0.98, base_confidence + expert_boost + enhancement_boost)
return max(0.1, final)
def _log(self, message: str):
"""Add to audit log."""
self.audit_log.append(message)
def get_expert(self, doc_type: str) -> Optional[TypeExpert]:
"""Get a specific type expert."""
return self.experts.get(doc_type)
def get_all_experts(self) -> List[TypeExpert]:
"""Get all type experts."""
return list(self.experts.values())
    def format_decision_report(self, decision: CoordinatorDecision) -> str:
        """Format decision as human-readable report.

        Builds a sectioned plain-text report: header, reasoning, vote
        analysis, expert analysis (evidence capped at 5 for / 3 against),
        recommended enhancements, and the full audit trail.
        """
        # Header and always-present sections.
        lines = [
            "=" * 60,
            "TYPE EXPERT COORDINATOR DECISION",
            "=" * 60,
            "",
            f"Recommended Type: {decision.recommended_type}",
            f"Confidence: {decision.confidence:.2%}",
            "",
            "REASONING:",
            decision.reasoning,
            "",
            "VOTE ANALYSIS:",
            f"  Majority: {decision.vote_analysis.majority_type} ({decision.vote_analysis.majority_count}/{decision.vote_analysis.total_votes})",
            f"  Agreement: {decision.vote_analysis.agreement_ratio:.2%}",
            f"  Avg Confidence: {decision.vote_analysis.avg_confidence:.2%}",
        ]

        # Dissenting analysts, one line each.
        if decision.vote_analysis.dissenting_analysts:
            lines.append("  Dissenters:")
            for analyst, voted_type in decision.vote_analysis.dissenting_analysts.items():
                lines.append(f"    - {analyst}: voted {voted_type}")

        # Expert section — may be absent when no expert ran for the chosen type.
        if decision.expert_analysis:
            lines.extend([
                "",
                "EXPERT ANALYSIS:",
                f"  Type: {decision.expert_analysis.expert_type}",
                f"  Is this type: {decision.expert_analysis.is_this_type}",
                f"  Confidence: {decision.expert_analysis.confidence:.2%}",
                "",
                "  Evidence For:",
            ])
            # Cap at 5 supporting / 3 opposing items to keep the report short.
            for e in decision.expert_analysis.evidence_for[:5]:
                lines.append(f"    + {e}")
            if decision.expert_analysis.evidence_against:
                lines.append("  Evidence Against:")
                for e in decision.expert_analysis.evidence_against[:3]:
                    lines.append(f"    - {e}")
            if decision.expert_analysis.missing_signals:
                lines.append(f"  Missing Signals: {', '.join(decision.expert_analysis.missing_signals)}")

        # Enhancement recommendations with their expected vote boost.
        if decision.enhancements:
            lines.extend([
                "",
                "RECOMMENDED ENHANCEMENTS:",
            ])
            for i, e in enumerate(decision.enhancements, 1):
                lines.append(f"  {i}. [{e.signal_type}] (priority {e.priority})")
                lines.append(f"     Reason: {e.reason}")
                boost = sum(e.expected_analyst_boost.values())
                lines.append(f"     Expected boost: +{boost:.2%}")

        # Full step-by-step audit trail recorded during coordinate().
        lines.extend([
            "",
            "AUDIT TRAIL:",
        ])
        for log in decision.audit_trail:
            lines.append(f"  > {log}")

        lines.append("=" * 60)
        return "\n".join(lines)