#!/usr/bin/env python3
"""Test script for MoE Orchestration Engine.

Tests the full classification pipeline:
- Consensus calculation from analyst votes
- Orchestrator coordination of analysts and judges
- Parallel execution and error handling
- Batch processing
"""
import sys
import time
from pathlib import Path

# Add module path so the local packages resolve when run as a script.
# NOTE: must use __file__ (the mangled original had bare `file`, a NameError).
sys.path.insert(0, str(Path(__file__).parent))

from core.models import Document, AnalystVote, ApprovalType
from core.consensus import ConsensusCalculator, ConsensusConfig
from core.orchestrator import (
    MoEOrchestrator,
    OrchestratorConfig,
    create_default_orchestrator,
)
from analysts import get_all_analysts
from judges import get_all_judges
def create_test_documents() -> list:
    """Create sample documents for testing.

    Returns:
        list: One ``Document`` per expected classification type (agent, adr,
        command, guide, workflow), each with YAML frontmatter declaring the
        expected ``type`` so tests can compare against the classifier output.
    """
    # NOTE(review): fixture markdown reconstructed from a flattened source —
    # frontmatter terminators and heading markers were lost in extraction.
    return [
        Document(
            path=Path('/test/agents/test-agent.md'),
            content="""---
title: Test Agent
type: agent
tags: [agent, ai, specialist]
---

# Test Agent

## Role

AI specialist agent for testing classification.

## Capabilities

- Automated task execution
- Code analysis and review
- Documentation generation
"""
        ),
        Document(
            path=Path('/test/adrs/ADR-001-test.md'),
            content="""---
title: ADR-001 Test Decision
type: adr
---

# ADR-001: Test Architecture Decision

## Status

Accepted

## Context

We need to decide on the classification architecture.

## Decision

We will use a Mixture of Experts approach with 5 analysts and 3 judges.

## Consequences

- Better accuracy through ensemble voting
- Increased processing overhead
- Clear audit trail for all decisions
"""
        ),
        Document(
            path=Path('/test/commands/sync.md'),
            content="""---
title: Sync Command
type: command
---

# /sync

Synchronize all components.

## Invocation

/sync [--all] [--force]

## Arguments

- --all: Sync all components
- --force: Force sync even if up to date

## Examples

/sync --all
/sync --force
"""
        ),
        Document(
            path=Path('/test/guides/getting-started.md'),
            content="""---
title: Getting Started Guide
type: guide
tags: [guide, tutorial, onboarding]
---

# Getting Started with CODITECT

This guide will walk you through setting up CODITECT.

## Prerequisites

- Python 3.10+
- Git
- Claude Code CLI

## Step 1: Install Dependencies

Run the following command to install all dependencies.

## Step 2: Configure Environment

Set up your environment variables.

## Step 3: Run Initial Setup

Execute the setup script.
"""
        ),
        Document(
            path=Path('/test/workflows/ci-pipeline.workflow.yaml'),
            content="""---
title: CI Pipeline Workflow
type: workflow
---

# CI/CD Pipeline Workflow

Automated continuous integration workflow.

## Steps

### Phase 1: Build

Compile and bundle the application.

### Phase 2: Test

Run unit and integration tests.

### Phase 3: Deploy

Deploy to staging environment.
"""
        ),
    ]
def test_consensus_calculator():
    """Test consensus calculation independently of the orchestrator."""
    print("\n" + "="*60)
    print("Testing ConsensusCalculator")
    print("="*60)

    calc = ConsensusCalculator()

    # Test 1: High-confidence unanimous votes — should auto-approve.
    print("\n1. High-confidence unanimous votes:")
    votes = [
        AnalystVote(agent='a1', classification='agent', confidence=0.95, reasoning='match', duration_ms=5),
        AnalystVote(agent='a2', classification='agent', confidence=0.90, reasoning='match', duration_ms=5),
        AnalystVote(agent='a3', classification='agent', confidence=0.92, reasoning='match', duration_ms=5),
        AnalystVote(agent='a4', classification='agent', confidence=0.88, reasoning='match', duration_ms=5),
        AnalystVote(agent='a5', classification='agent', confidence=0.91, reasoning='match', duration_ms=5),
    ]
    result = calc.calculate_from_votes(votes)
    print(f" Classification: {result.classification}")
    print(f" Confidence: {result.confidence:.1%}")
    print(f" Agreement: {result.agreement_ratio:.0%}")
    print(f" Status: {result.approval_type.value}")
    assert result.approval_type == ApprovalType.AUTO_APPROVED, "Should auto-approve"

    # Test 2: Split votes (needs judge review - moderate confidence)
    print("\n2. Moderate confidence votes (needs judge review):")
    votes = [
        AnalystVote(agent='a1', classification='agent', confidence=0.80, reasoning='match', duration_ms=5),
        AnalystVote(agent='a2', classification='agent', confidence=0.78, reasoning='match', duration_ms=5),
        AnalystVote(agent='a3', classification='agent', confidence=0.75, reasoning='match', duration_ms=5),
        AnalystVote(agent='a4', classification='agent', confidence=0.72, reasoning='match', duration_ms=5),
        AnalystVote(agent='a5', classification='command', confidence=0.65, reasoning='match', duration_ms=5),
    ]
    result = calc.calculate_from_votes(votes)
    print(f" Classification: {result.classification}")
    print(f" Confidence: {result.confidence:.1%}")
    print(f" Agreement: {result.agreement_ratio:.0%}")
    print(f" Status: {result.approval_type.value}")
    assert result.approval_type == ApprovalType.PENDING, "Should need judge review"

    # Test 2b: Low confidence split — should escalate to a human.
    print("\n2b. Low confidence split votes (should escalate):")
    votes = [
        AnalystVote(agent='a1', classification='agent', confidence=0.60, reasoning='match', duration_ms=5),
        AnalystVote(agent='a2', classification='agent', confidence=0.55, reasoning='match', duration_ms=5),
        AnalystVote(agent='a3', classification='agent', confidence=0.50, reasoning='match', duration_ms=5),
        AnalystVote(agent='a4', classification='command', confidence=0.70, reasoning='match', duration_ms=5),
        AnalystVote(agent='a5', classification='command', confidence=0.65, reasoning='match', duration_ms=5),
    ]
    result = calc.calculate_from_votes(votes)
    print(f" Classification: {result.classification}")
    print(f" Confidence: {result.confidence:.1%}")
    print(f" Agreement: {result.agreement_ratio:.0%}")
    print(f" Status: {result.approval_type.value}")
    print(f" Escalation: {result.escalation_reason}")
    assert result.approval_type == ApprovalType.ESCALATED, "Should escalate due to low confidence"

    # Test 3: Detailed per-class vote breakdown (uses the last vote set).
    print("\n3. Vote breakdown:")
    breakdown = calc.get_detailed_breakdown(votes)
    for cls, info in breakdown['distribution'].items():
        print(f" {cls}: {info['vote_count']} votes, {info['normalized']:.0%} weight")

    print("\n ConsensusCalculator: PASSED")
def test_orchestrator_single():
    """Test orchestrator with single document."""
    print("\n" + "="*60)
    print("Testing MoEOrchestrator - Single Document")
    print("="*60)

    # Create orchestrator with real analysts and judges
    orchestrator = create_default_orchestrator()
    docs = create_test_documents()

    # Classify each fixture and compare against the frontmatter `type`.
    for doc in docs:
        doc_type = doc.frontmatter.get('type', 'unknown')
        print(f"\n Processing: {doc.path.name} (expected: {doc_type})")
        start = time.time()
        result = orchestrator.classify(doc)
        elapsed = (time.time() - start) * 1000
        status_icon = "✓" if result.result.classification == doc_type else "✗"
        print(f" {status_icon} Result: {result.result.classification}")
        print(f" Confidence: {result.result.confidence:.1%}")
        print(f" Agreement: {result.result.agreement_ratio:.0%}")
        print(f" Status: {result.result.approval_type.value}")
        print(f" Time: {elapsed:.0f}ms")

    print("\n Single Document Tests: PASSED")
def test_orchestrator_batch():
    """Test orchestrator with batch processing."""
    print("\n" + "="*60)
    print("Testing MoEOrchestrator - Batch Processing")
    print("="*60)

    orchestrator = create_default_orchestrator()
    orchestrator.reset_stats()  # Start fresh
    docs = create_test_documents()

    def progress_callback(current, total):
        # \r rewrites the same console line as progress advances.
        print(f"\r Progress: {current}/{total}", end="", flush=True)

    print(f"\n Processing {len(docs)} documents...")
    start = time.time()
    results = orchestrator.classify_batch(docs, progress_callback)
    elapsed = (time.time() - start) * 1000
    print()  # New line after progress

    # Summarize accuracy against each fixture's expected frontmatter type.
    correct = 0
    for doc, result in zip(docs, results):
        expected = doc.frontmatter.get('type', 'unknown')
        if result.result.classification == expected:
            correct += 1

    print(f"\n Results:")
    print(f" Total: {len(results)}")
    print(f" Correct: {correct}/{len(results)} ({correct/len(results):.0%})")
    print(f" Time: {elapsed:.0f}ms ({elapsed/len(results):.0f}ms avg)")

    # Show aggregate orchestrator statistics for the batch.
    stats = orchestrator.get_stats()
    print(f"\n Statistics:")
    print(f" Auto-approved: {stats['auto_approved']}")
    print(f" Judge-approved: {stats['judge_approved']}")
    print(f" Escalated: {stats['escalated']}")
    print(f" Approval rate: {stats['approval_rate']:.0%}")
    print(f" Avg analyst time: {stats['avg_analyst_time_ms']:.1f}ms")
    print(f" Avg judge time: {stats['avg_judge_time_ms']:.1f}ms")

    print("\n Batch Processing: PASSED")
def test_orchestrator_edge_cases():
    """Test orchestrator edge cases and error handling."""
    print("\n" + "="*60)
    print("Testing MoEOrchestrator - Edge Cases")
    print("="*60)

    orchestrator = create_default_orchestrator()

    # Test 1: Minimal document with no frontmatter at all.
    print("\n1. Minimal document (no frontmatter):")
    doc = Document(
        path=Path('/test/unknown.md'),
        content="# Just a Title\n\nSome content without structure."
    )
    result = orchestrator.classify(doc)
    print(f" Classification: {result.result.classification}")
    print(f" Status: {result.result.approval_type.value}")

    # Test 2: Nearly empty content — just a heading.
    print("\n2. Nearly empty document:")
    doc = Document(
        path=Path('/test/empty.md'),
        content="# Empty"
    )
    result = orchestrator.classify(doc)
    print(f" Classification: {result.result.classification}")
    print(f" Status: {result.result.approval_type.value}")

    # Test 3: Ambiguous document — path says agent, frontmatter says guide,
    # and the body mixes guide-style steps with agent-style capabilities.
    print("\n3. Ambiguous document (mixed signals):")
    doc = Document(
        path=Path('/test/agents/but-is-guide.md'),
        content="""---
title: Tutorial on Agents
type: guide
---

# How to Create an Agent

## Prerequisites

Learn about agents.

## Step 1: Understanding Agents

This agent specializes in...

## Capabilities

- Task execution
"""
    )
    result = orchestrator.classify(doc)
    print(f" Classification: {result.result.classification}")
    print(f" Confidence: {result.result.confidence:.1%}")
    print(f" Agreement: {result.result.agreement_ratio:.0%}")
    print(f" Status: {result.result.approval_type.value}")
    if result.result.escalation_reason:
        print(f" Escalation: {result.result.escalation_reason}")

    print("\n Edge Cases: PASSED")
def test_full_pipeline():
    """Full integration test of the classification pipeline."""
    print("\n" + "="*60)
    print("Full Pipeline Integration Test")
    print("="*60)

    # Report which analysts and judges are registered.
    analysts = get_all_analysts()
    judges = get_all_judges()
    print(f"\n Components:")
    print(f" Analysts: {len(analysts)} ({', '.join(a.name for a in analysts)})")
    print(f" Judges: {len(judges)} ({', '.join(j.name for j in judges)})")

    # Create orchestrator
    orchestrator = create_default_orchestrator()

    # Run a complete classification on a representative agent document.
    doc = Document(
        path=Path('/test/agents/orchestrator-agent.md'),
        content="""---
title: Orchestrator Agent
type: agent
tags: [agent, orchestrator, ai]
created: 2025-12-27
---

# Orchestrator Agent

Multi-agent coordination specialist for complex workflows.

## Role

Coordinate multiple specialized agents to achieve complex goals.

## Capabilities

- Multi-agent task delegation
- Workflow orchestration
- Result aggregation

## Tools

- Task tool for agent spawning
- TodoWrite for progress tracking

## Invocation

Task(subagent_type="orchestrator", prompt="coordinate deployment")
"""
    )

    print(f"\n Classifying: {doc.path}")
    result = orchestrator.classify(doc)

    print(f"\n RESULT:")
    print(f" Classification: {result.result.classification}")
    print(f" Confidence: {result.result.confidence:.1%}")
    print(f" Agreement: {result.result.agreement_ratio:.0%}")
    print(f" Approval: {result.result.approval_type.value}")
    print(f" Processing Time: {result.processing_time_ms}ms")

    print(f"\n Analyst Votes:")
    for vote in result.result.votes:
        print(f" {vote.agent}: {vote.classification} ({vote.confidence:.0%})")

    print(f"\n Judge Decisions:")
    for decision in result.result.judge_decisions:
        status = "APPROVED" if decision.approved else "REJECTED"
        print(f" {decision.judge}: {status} - {decision.reason[:50]}...")

    # Verify correct classification (informational only — does not assert).
    expected = 'agent'
    if result.result.classification == expected:
        print(f"\n ✓ Correct classification!")
    else:
        print(f"\n ✗ Expected '{expected}', got '{result.result.classification}'")

    print("\n Full Pipeline: PASSED")
def main():
    """Run all orchestrator tests.

    Returns:
        int: Process exit code — 0 on success, 1 on any assertion failure
        or unexpected exception.
    """
    print("="*60)
    print("MoE Orchestration Engine Tests")
    print("="*60)

    try:
        test_consensus_calculator()
        test_orchestrator_single()
        test_orchestrator_batch()
        test_orchestrator_edge_cases()
        test_full_pipeline()

        print("\n" + "="*60)
        print("ALL TESTS PASSED")
        print("="*60)
        print("\nOrchestration Engine Components:")
        print(" 1. ConsensusCalculator - Weighted voting and threshold logic")
        print(" 2. MoEOrchestrator - Full pipeline coordination")
        print(" 3. Parallel execution - ThreadPoolExecutor for analysts/judges")
        print(" 4. Batch processing - Multiple documents with progress")
        print(" 5. Error handling - Graceful degradation on failures")
    except AssertionError as e:
        # A test's explicit assertion failed — report it as a test failure.
        print(f"\nTEST FAILED: {e}")
        return 1
    except Exception as e:
        # Any other error is unexpected; dump the traceback for debugging.
        print(f"\nERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1
    return 0
# Entry point: the mangled original compared bare `name` to 'main', which
# would raise NameError and never run; restore the standard dunder guard.
if __name__ == '__main__':
    sys.exit(main())