#!/usr/bin/env python3
"""Enhanced MoE Orchestrator - Integrates all enhancement modules.

This orchestrator extends the base MoE system with:

  - Semantic embeddings for improved similarity
  - Historical learning with dynamic weights
  - Memory system integration
  - Adaptive thresholds
  - Confidence calibration
  - Additional specialized judges
  - Batch corpus analysis
"""

import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from .adaptive_thresholds import AdaptiveThresholdManager, get_threshold_manager
from .calibration import ConfidenceCalibrator, get_calibrator
from .embeddings import SemanticEmbeddingService, get_embedding_service
from .learning import ClassificationLearner, get_learner
from .memory_integration import MemoryEnhancedClassifier, get_memory_classifier
from .models import AnalystVote, ApprovalType, ClassificationResult, ConsensusResult, Document
from .orchestrator import MoEOrchestrator, create_default_orchestrator

# Module-level logger; must use the __name__ dunder (the bare `name` in the
# original raised NameError at import time).
logger = logging.getLogger(__name__)

@dataclass
class EnhancedOrchestratorConfig:
    """Configuration for enhanced orchestrator.

    Each ``use_*`` flag toggles one enhancement module; the ``max_parallel_*``
    values bound concurrency for analysts and judges.
    """

    use_embeddings: bool = True          # semantic-embedding similarity hints
    use_learning: bool = True            # historical learning / dynamic weights
    use_memory: bool = True              # memory-system integration
    use_adaptive_thresholds: bool = True # self-adjusting approval thresholds
    use_calibration: bool = True         # confidence calibration
    use_enhanced_judges: bool = True     # additional specialized judges
    max_parallel_analysts: int = 5
    max_parallel_judges: int = 3

class EnhancedMoEOrchestrator:
    """Enhanced MoE Orchestrator integrating all improvement modules.

    Provides a unified interface for document classification with:
    - Pre-classification: Embedding similarity, memory hints
    - Classification: Base analysts with dynamic weights
    - Post-classification: Calibration, learning, adaptive thresholds
    """

    def __init__(self, config: Optional[EnhancedOrchestratorConfig] = None):
        """Build the orchestrator and eagerly initialize enabled modules.

        Args:
            config: Feature toggles; a default EnhancedOrchestratorConfig is
                used when None.
        """
        self.config = config or EnhancedOrchestratorConfig()

        # Initialize base orchestrator for actual classification
        self.base_orchestrator: MoEOrchestrator = create_default_orchestrator()

        # Enhancement modules: each stays None when disabled or when its
        # initialization fails (see _init_modules).
        self.embedding_service: Optional[SemanticEmbeddingService] = None
        self.learner: Optional[ClassificationLearner] = None
        self.memory_classifier: Optional[MemoryEnhancedClassifier] = None
        self.threshold_manager: Optional[AdaptiveThresholdManager] = None
        self.calibrator: Optional[ConfidenceCalibrator] = None

        self._init_modules()

    def _init_modules(self):
        """Initialize enhancement modules based on config.

        Initialization is best-effort: a failing module is logged as a
        warning and its attribute remains None, so the orchestrator still
        works with the remaining modules.
        """
        if self.config.use_embeddings:
            try:
                self.embedding_service = get_embedding_service()
                logger.info(f"Embeddings: {'available' if self.embedding_service.is_available() else 'fallback mode'}")
            except Exception as e:
                logger.warning(f"Failed to init embeddings: {e}")

        if self.config.use_learning:
            try:
                self.learner = get_learner()
                logger.info("Learning system initialized")
            except Exception as e:
                logger.warning(f"Failed to init learning: {e}")

        if self.config.use_memory:
            try:
                self.memory_classifier = get_memory_classifier()
                logger.info(f"Memory: {'available' if self.memory_classifier.is_available() else 'unavailable'}")
            except Exception as e:
                logger.warning(f"Failed to init memory: {e}")

        if self.config.use_adaptive_thresholds:
            try:
                self.threshold_manager = get_threshold_manager()
                logger.info("Adaptive thresholds initialized")
            except Exception as e:
                logger.warning(f"Failed to init thresholds: {e}")

        if self.config.use_calibration:
            try:
                self.calibrator = get_calibrator()
                logger.info("Calibration initialized")
            except Exception as e:
                logger.warning(f"Failed to init calibration: {e}")

    def get_pre_classification_hints(self, document: Document) -> Dict[str, Any]:
        """Get hints from embeddings and memory before classification.

        Returns:
            Dict with 'embedding_suggestion'/'embedding_confidence',
            'memory_suggestion'/'memory_confidence', and 'combined_hint'
            (set only when at least one source produced a suggestion; when
            both exist but disagree, 'combined_hint' stays None).
        """
        hints = {
            'embedding_suggestion': None,
            'embedding_confidence': 0.0,
            'memory_suggestion': None,
            'memory_confidence': 0.0,
            'combined_hint': None
        }

        # Embedding-based hint (best-effort; failures only logged at debug)
        if self.embedding_service and self.embedding_service.is_available():
            try:
                result = self.embedding_service.classify(document.content)
                hints['embedding_suggestion'] = result.classification
                hints['embedding_confidence'] = result.confidence
            except Exception as e:
                logger.debug(f"Embedding hint failed: {e}")

        # Memory-based hint (best-effort)
        if self.memory_classifier and self.memory_classifier.is_available():
            try:
                mem_hints = self.memory_classifier.get_classification_hints(
                    str(document.path), document.content)
                hints['memory_suggestion'] = mem_hints.get('suggested_type')
                hints['memory_confidence'] = mem_hints.get('confidence_boost', 0.0)
            except Exception as e:
                logger.debug(f"Memory hint failed: {e}")

        # Combine hints: agreement wins; otherwise whichever single source
        # produced a suggestion.
        if hints['embedding_suggestion'] and hints['memory_suggestion']:
            if hints['embedding_suggestion'] == hints['memory_suggestion']:
                hints['combined_hint'] = hints['embedding_suggestion']
        elif hints['embedding_suggestion']:
            hints['combined_hint'] = hints['embedding_suggestion']
        elif hints['memory_suggestion']:
            hints['combined_hint'] = hints['memory_suggestion']

        return hints

    def classify(self, document: Document) -> ClassificationResult:
        """
        Classify a document using enhanced classification with embeddings.

        This method:
        1. Gets pre-classification hints from embeddings/memory
        2. Runs base classification through the MoE pipeline
        3. Enhances the result with embedding confidence if applicable
        4. Records the classification for learning
        5. Returns the enhanced result

        Args:
            document: The document to classify

        Returns:
            ClassificationResult with enhanced confidence
        """
        # Get pre-classification hints
        hints = self.get_pre_classification_hints(document)

        # Run base classification
        result = self.base_orchestrator.classify(document)

        # Enhance result with embedding confidence if available
        if hints.get('embedding_suggestion') and result.result:
            embedding_conf = hints.get('embedding_confidence', 0.0)

            # If embedding agrees with classification, boost confidence slightly
            if hints['embedding_suggestion'] == result.result.classification:
                # Blend: 70% base confidence + 30% embedding confidence
                blended = (result.result.confidence * 0.7) + (embedding_conf * 0.3)
                # Only boost, never reduce
                if blended > result.result.confidence:
                    result.result.confidence = min(blended, 1.0)
                    logger.debug(f"Boosted confidence with embedding: {result.result.confidence:.2%}")

            # Store embedding info in result metadata
            if not hasattr(result, 'enhancement_metadata'):
                result.enhancement_metadata = {}
            result.enhancement_metadata['embedding_hint'] = hints['embedding_suggestion']
            result.enhancement_metadata['embedding_confidence'] = embedding_conf
            result.enhancement_metadata['hints_agreed'] = (
                hints['embedding_suggestion'] == result.result.classification
            )

        # Apply calibration if available
        if self.calibrator and result.result:
            calibrated = self.calibrate_confidence(result.result.confidence)
            result.result.confidence = calibrated

        # Record for learning
        self.record_classification(result)

        return result

    def get_dynamic_analyst_weights(self) -> Dict[str, float]:
        """Get dynamic weights for analysts based on historical accuracy.

        Returns an empty dict when no learner is configured or lookup fails.
        """
        if self.learner:
            try:
                return self.learner.get_analyst_weights()
            except Exception as e:
                logger.debug(f"Failed to get weights: {e}")
        return {}

    def get_adaptive_thresholds(self) -> Dict[str, float]:
        """Get current adaptive thresholds, or static defaults without a manager."""
        if self.threshold_manager:
            return self.threshold_manager.get_thresholds()
        return {
            'auto_approval': 0.90,
            'judge_approval': 0.85,
            'agreement': 0.60
        }

    def calibrate_confidence(self, raw_confidence: float) -> float:
        """Apply calibration to confidence score; pass through on failure."""
        if self.calibrator:
            try:
                return self.calibrator.calibrate(raw_confidence)
            except Exception:
                pass
        return raw_confidence

    def record_classification(self, result: ClassificationResult):
        """Record classification for learning and threshold adjustment.

        No-op when the result carries no inner classification. Recording in
        each subsystem is best-effort (failures logged at debug).
        """
        if not result.result:
            return

        # Record in learning system
        if self.learner:
            try:
                self.learner.record_classification(
                    document_path=result.document_path,
                    predicted_type=result.result.classification or "unknown",
                    confidence=result.result.confidence,
                    agreement_ratio=result.result.agreement_ratio,
                    analyst_votes=[{
                        'agent': v.agent,
                        'classification': v.classification,
                        'confidence': v.confidence
                    } for v in result.result.votes],
                    approval_type=result.result.approval_type.value if result.result.approval_type else None
                )
            except Exception as e:
                logger.debug(f"Failed to record learning: {e}")

        # Record in threshold manager
        if self.threshold_manager:
            try:
                was_escalated = result.result.approval_type in [
                    ApprovalType.ESCALATED,
                    ApprovalType.HUMAN_REVIEW_REQUIRED
                ]
                self.threshold_manager.record_classification(
                    confidence=result.result.confidence,
                    was_escalated=was_escalated,
                    approval_type=result.result.approval_type.value if result.result.approval_type else "UNKNOWN"
                )
            except Exception as e:
                logger.debug(f"Failed to record threshold: {e}")

    def confirm_classification(self, document_path: str, actual_type: str) -> bool:
        """Confirm a classification for learning.

        On a successful learner confirmation, also feeds the outcome to the
        calibrator and threshold manager. Returns True only when the learner
        confirmed the classification.
        """
        success = False

        if self.learner:
            try:
                success = self.learner.confirm_classification(document_path, actual_type)
            except Exception as e:
                logger.warning(f"Failed to confirm in learner: {e}")

        if self.calibrator and success:
            try:
                # Get the original prediction to score the calibrator
                history = self.learner.get_classification_history(document_path, limit=1)
                if history:
                    was_correct = history[0].predicted_type == actual_type
                    self.calibrator.record(history[0].confidence, was_correct)
            except Exception as e:
                logger.debug(f"Failed to record calibration: {e}")

        if self.threshold_manager and success:
            try:
                history = self.learner.get_classification_history(document_path, limit=1)
                if history:
                    was_correct = history[0].predicted_type == actual_type
                    self.threshold_manager.record_confirmation(was_correct)
            except Exception as e:
                logger.debug(f"Failed to record threshold confirmation: {e}")

        return success

    def get_stats(self) -> Dict[str, Any]:
        """Get statistics from all enhancement modules.

        Always includes 'modules_enabled'; per-module stats appear only for
        modules that were successfully initialized.
        """
        stats = {
            'modules_enabled': {
                'embeddings': self.config.use_embeddings,
                'learning': self.config.use_learning,
                'memory': self.config.use_memory,
                'adaptive_thresholds': self.config.use_adaptive_thresholds,
                'calibration': self.config.use_calibration
            }
        }

        if self.embedding_service:
            stats['embeddings'] = self.embedding_service.get_stats()

        if self.learner:
            stats['learning'] = self.learner.get_stats()

        if self.memory_classifier:
            stats['memory'] = {'available': self.memory_classifier.is_available()}

        if self.threshold_manager:
            stats['thresholds'] = self.threshold_manager.get_stats()

        if self.calibrator:
            stats['calibration'] = self.calibrator.get_stats()

        return stats

# Singleton instance

# Module-level singleton; managed by get_enhanced_orchestrator() and
# reset_enhanced_orchestrator() below.
_enhanced_orchestrator: Optional[EnhancedMoEOrchestrator] = None

def get_enhanced_orchestrator(config: Optional[EnhancedOrchestratorConfig] = None) -> EnhancedMoEOrchestrator:
    """Get or create singleton enhanced orchestrator.

    Args:
        config: When provided, the singleton is REPLACED with a new instance
            built from this config (useful for testing/comparison).

    Returns:
        The current singleton EnhancedMoEOrchestrator.
    """
    global _enhanced_orchestrator
    # Create new instance if config is provided (for testing/comparison)
    if config is not None:
        _enhanced_orchestrator = EnhancedMoEOrchestrator(config)
    elif _enhanced_orchestrator is None:
        _enhanced_orchestrator = EnhancedMoEOrchestrator()
    return _enhanced_orchestrator

def reset_enhanced_orchestrator() -> None:
    """Reset the singleton (useful for testing)."""
    global _enhanced_orchestrator
    _enhanced_orchestrator = None

# Script entry point: the original compared bare `name` to "main", which both
# raised NameError and used the wrong values; the correct guard is the
# __name__ == "__main__" dunder comparison.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    orchestrator = get_enhanced_orchestrator()
    print("Enhanced Orchestrator Stats:")
    import json
    # default=str so non-JSON-serializable stat values are stringified
    print(json.dumps(orchestrator.get_stats(), indent=2, default=str))