#!/usr/bin/env python3
"""Test script for MoE Orchestration Engine.

Tests the full classification pipeline:

1. Consensus calculation from analyst votes
2. Orchestrator coordination of analysts and judges
3. Parallel execution and error handling
4. Batch processing
"""

import sys
import time
from pathlib import Path

# Add module path so the package imports resolve when run as a script.
sys.path.insert(0, str(Path(__file__).parent))

from core.models import Document, AnalystVote, ApprovalType
from core.consensus import ConsensusCalculator, ConsensusConfig
from core.orchestrator import (
    MoEOrchestrator,
    OrchestratorConfig,
    create_default_orchestrator,
)
from analysts import get_all_analysts
from judges import get_all_judges

def create_test_documents() -> list:
    """Create sample documents for testing.

    Returns:
        A list of five Document fixtures, one per expected classification
        (agent, adr, command, guide, workflow). Each document carries YAML
        frontmatter with a `type` field that tests use as the expected label.
    """
    return [
        Document(
            path=Path('/test/agents/test-agent.md'),
            content="""---
title: Test Agent
type: agent
tags: [agent, ai, specialist]
---

# Test Agent

## Role

AI specialist agent for testing classification.

## Capabilities

- Automated task execution
- Code analysis and review
- Documentation generation
"""
        ),
        Document(
            path=Path('/test/adrs/ADR-001-test.md'),
            content="""---
title: ADR-001 Test Decision
type: adr
---

# ADR-001: Test Architecture Decision

## Status

Accepted

## Context

We need to decide on the classification architecture.

## Decision

We will use a Mixture of Experts approach with 5 analysts and 3 judges.

## Consequences

- Better accuracy through ensemble voting
- Increased processing overhead
- Clear audit trail for all decisions
"""
        ),
        Document(
            path=Path('/test/commands/sync.md'),
            content="""---
title: Sync Command
type: command
---

# /sync

Synchronize all components.

## Invocation

/sync [--all] [--force]

## Arguments

- --all: Sync all components
- --force: Force sync even if up to date

## Examples

/sync --all
/sync --force
"""
        ),
        Document(
            path=Path('/test/guides/getting-started.md'),
            content="""---
title: Getting Started Guide
type: guide
tags: [guide, tutorial, onboarding]
---

# Getting Started with CODITECT

This guide will walk you through setting up CODITECT.

## Prerequisites

- Python 3.10+
- Git
- Claude Code CLI

## Step 1: Install Dependencies

Run the following command to install all dependencies.

## Step 2: Configure Environment

Set up your environment variables.

## Step 3: Run Initial Setup

Execute the setup script.
"""
        ),
        Document(
            path=Path('/test/workflows/ci-pipeline.workflow.yaml'),
            content="""---
title: CI Pipeline Workflow
type: workflow
---

# CI/CD Pipeline Workflow

Automated continuous integration workflow.

## Steps

### Phase 1: Build

Compile and bundle the application.

### Phase 2: Test

Run unit and integration tests.

### Phase 3: Deploy

Deploy to staging environment.
"""
        ),
    ]

def test_consensus_calculator():
    """Test consensus calculation independently.

    Covers auto-approval (high-confidence unanimous votes), judge review
    (moderate confidence), escalation (low-confidence split), and the
    detailed vote-breakdown report.
    """
    print("\n" + "="*60)
    print("Testing ConsensusCalculator")
    print("="*60)

    calc = ConsensusCalculator()

    # Test 1: High-confidence unanimous votes -> should auto-approve.
    print("\n1. High-confidence unanimous votes:")
    votes = [
        AnalystVote(agent='a1', classification='agent', confidence=0.95, reasoning='match', duration_ms=5),
        AnalystVote(agent='a2', classification='agent', confidence=0.90, reasoning='match', duration_ms=5),
        AnalystVote(agent='a3', classification='agent', confidence=0.92, reasoning='match', duration_ms=5),
        AnalystVote(agent='a4', classification='agent', confidence=0.88, reasoning='match', duration_ms=5),
        AnalystVote(agent='a5', classification='agent', confidence=0.91, reasoning='match', duration_ms=5),
    ]
    result = calc.calculate_from_votes(votes)
    print(f" Classification: {result.classification}")
    print(f" Confidence: {result.confidence:.1%}")
    print(f" Agreement: {result.agreement_ratio:.0%}")
    print(f" Status: {result.approval_type.value}")
    assert result.approval_type == ApprovalType.AUTO_APPROVED, "Should auto-approve"

    # Test 2: Split votes (needs judge review - moderate confidence)
    print("\n2. Moderate confidence votes (needs judge review):")
    votes = [
        AnalystVote(agent='a1', classification='agent', confidence=0.80, reasoning='match', duration_ms=5),
        AnalystVote(agent='a2', classification='agent', confidence=0.78, reasoning='match', duration_ms=5),
        AnalystVote(agent='a3', classification='agent', confidence=0.75, reasoning='match', duration_ms=5),
        AnalystVote(agent='a4', classification='agent', confidence=0.72, reasoning='match', duration_ms=5),
        AnalystVote(agent='a5', classification='command', confidence=0.65, reasoning='match', duration_ms=5),
    ]
    result = calc.calculate_from_votes(votes)
    print(f" Classification: {result.classification}")
    print(f" Confidence: {result.confidence:.1%}")
    print(f" Agreement: {result.agreement_ratio:.0%}")
    print(f" Status: {result.approval_type.value}")
    assert result.approval_type == ApprovalType.PENDING, "Should need judge review"

    # Test 2b: Low confidence split (should escalate)
    print("\n2b. Low confidence split votes (should escalate):")
    votes = [
        AnalystVote(agent='a1', classification='agent', confidence=0.60, reasoning='match', duration_ms=5),
        AnalystVote(agent='a2', classification='agent', confidence=0.55, reasoning='match', duration_ms=5),
        AnalystVote(agent='a3', classification='agent', confidence=0.50, reasoning='match', duration_ms=5),
        AnalystVote(agent='a4', classification='command', confidence=0.70, reasoning='match', duration_ms=5),
        AnalystVote(agent='a5', classification='command', confidence=0.65, reasoning='match', duration_ms=5),
    ]
    result = calc.calculate_from_votes(votes)
    print(f" Classification: {result.classification}")
    print(f" Confidence: {result.confidence:.1%}")
    print(f" Agreement: {result.agreement_ratio:.0%}")
    print(f" Status: {result.approval_type.value}")
    print(f" Escalation: {result.escalation_reason}")
    assert result.approval_type == ApprovalType.ESCALATED, "Should escalate due to low confidence"

    # Test 3: Detailed breakdown of the last vote set.
    print("\n3. Vote breakdown:")
    breakdown = calc.get_detailed_breakdown(votes)
    for cls, info in breakdown['distribution'].items():
        print(f" {cls}: {info['vote_count']} votes, {info['normalized']:.0%} weight")

    print("\n ConsensusCalculator: PASSED")

def test_orchestrator_single():
    """Test orchestrator with single document.

    Classifies each fixture document individually and reports whether the
    result matches the frontmatter `type`, along with timing. Does not
    assert correctness (analyst accuracy may vary); it verifies the
    single-document pipeline runs end to end.
    """
    print("\n" + "="*60)
    print("Testing MoEOrchestrator - Single Document")
    print("="*60)

    # Create orchestrator with real analysts and judges
    orchestrator = create_default_orchestrator()
    docs = create_test_documents()

    # Test each document type
    for doc in docs:
        doc_type = doc.frontmatter.get('type', 'unknown')
        print(f"\n Processing: {doc.path.name} (expected: {doc_type})")

        start = time.time()
        result = orchestrator.classify(doc)
        elapsed = (time.time() - start) * 1000

        status_icon = "✓" if result.result.classification == doc_type else "✗"
        print(f" {status_icon} Result: {result.result.classification}")
        print(f" Confidence: {result.result.confidence:.1%}")
        print(f" Agreement: {result.result.agreement_ratio:.0%}")
        print(f" Status: {result.result.approval_type.value}")
        print(f" Time: {elapsed:.0f}ms")

    print("\n Single Document Tests: PASSED")

def test_orchestrator_batch():
    """Test orchestrator with batch processing.

    Runs classify_batch over all fixtures with a progress callback, then
    summarizes accuracy, timing, and the orchestrator's accumulated stats.
    """
    print("\n" + "="*60)
    print("Testing MoEOrchestrator - Batch Processing")
    print("="*60)

    orchestrator = create_default_orchestrator()
    orchestrator.reset_stats()  # Start fresh

    docs = create_test_documents()

    def progress_callback(current, total):
        # \r rewrites the same terminal line for an in-place progress counter.
        print(f"\r Progress: {current}/{total}", end="", flush=True)

    print(f"\n Processing {len(docs)} documents...")
    start = time.time()
    results = orchestrator.classify_batch(docs, progress_callback)
    elapsed = (time.time() - start) * 1000

    print()  # New line after progress

    # Summarize results against the expected type from frontmatter.
    correct = 0
    for doc, result in zip(docs, results):
        expected = doc.frontmatter.get('type', 'unknown')
        if result.result.classification == expected:
            correct += 1

    print(f"\n Results:")
    print(f" Total: {len(results)}")
    print(f" Correct: {correct}/{len(results)} ({correct/len(results):.0%})")
    print(f" Time: {elapsed:.0f}ms ({elapsed/len(results):.0f}ms avg)")

    # Show accumulated orchestrator statistics.
    stats = orchestrator.get_stats()
    print(f"\n Statistics:")
    print(f" Auto-approved: {stats['auto_approved']}")
    print(f" Judge-approved: {stats['judge_approved']}")
    print(f" Escalated: {stats['escalated']}")
    print(f" Approval rate: {stats['approval_rate']:.0%}")
    print(f" Avg analyst time: {stats['avg_analyst_time_ms']:.1f}ms")
    print(f" Avg judge time: {stats['avg_judge_time_ms']:.1f}ms")

    print("\n Batch Processing: PASSED")

def test_orchestrator_edge_cases():
    """Test orchestrator edge cases and error handling.

    Exercises degenerate inputs: a document with no frontmatter, a nearly
    empty document, and a document with conflicting signals (agent-like
    path/content but guide-type frontmatter). Verifies the pipeline
    degrades gracefully rather than raising.
    """
    print("\n" + "="*60)
    print("Testing MoEOrchestrator - Edge Cases")
    print("="*60)

    orchestrator = create_default_orchestrator()

    # Test 1: Minimal document
    print("\n1. Minimal document (no frontmatter):")
    doc = Document(
        path=Path('/test/unknown.md'),
        content="# Just a Title\n\nSome content without structure."
    )
    result = orchestrator.classify(doc)
    print(f" Classification: {result.result.classification}")
    print(f" Status: {result.result.approval_type.value}")

    # Test 2: Empty content
    print("\n2. Nearly empty document:")
    doc = Document(
        path=Path('/test/empty.md'),
        content="# Empty"
    )
    result = orchestrator.classify(doc)
    print(f" Classification: {result.result.classification}")
    print(f" Status: {result.result.approval_type.value}")

    # Test 3: Ambiguous document — lives under /agents/ and talks about
    # agent capabilities, but frontmatter declares it a guide.
    print("\n3. Ambiguous document (mixed signals):")
    doc = Document(
        path=Path('/test/agents/but-is-guide.md'),
        content="""---
title: Tutorial on Agents
type: guide
---

# How to Create an Agent

## Prerequisites

Learn about agents.

## Step 1: Understanding Agents

This agent specializes in...

## Capabilities

- Task execution
"""
    )
    result = orchestrator.classify(doc)
    print(f" Classification: {result.result.classification}")
    print(f" Confidence: {result.result.confidence:.1%}")
    print(f" Agreement: {result.result.agreement_ratio:.0%}")
    print(f" Status: {result.result.approval_type.value}")
    if result.result.escalation_reason:
        print(f" Escalation: {result.result.escalation_reason}")

    print("\n Edge Cases: PASSED")

def test_full_pipeline():
    """Full integration test of the classification pipeline.

    Enumerates the registered analysts and judges, classifies one
    well-formed agent document, and prints the complete decision trail
    (analyst votes and judge decisions).
    """
    print("\n" + "="*60)
    print("Full Pipeline Integration Test")
    print("="*60)

    # Get component counts
    analysts = get_all_analysts()
    judges = get_all_judges()

    print(f"\n Components:")
    print(f" Analysts: {len(analysts)} ({', '.join(a.name for a in analysts)})")
    print(f" Judges: {len(judges)} ({', '.join(j.name for j in judges)})")

    # Create orchestrator
    orchestrator = create_default_orchestrator()

    # Run a complete classification on a representative agent document.
    doc = Document(
        path=Path('/test/agents/orchestrator-agent.md'),
        content="""---
title: Orchestrator Agent
type: agent
tags: [agent, orchestrator, ai]
created: 2025-12-27
---

# Orchestrator Agent

Multi-agent coordination specialist for complex workflows.

## Role

Coordinate multiple specialized agents to achieve complex goals.

## Capabilities

- Multi-agent task delegation
- Workflow orchestration
- Result aggregation

## Tools

- Task tool for agent spawning
- TodoWrite for progress tracking

## Invocation

Task(subagent_type="orchestrator", prompt="coordinate deployment")
"""
    )

    print(f"\n   Classifying: {doc.path}")

    result = orchestrator.classify(doc)

    print(f"\n RESULT:")
    print(f" Classification: {result.result.classification}")
    print(f" Confidence: {result.result.confidence:.1%}")
    print(f" Agreement: {result.result.agreement_ratio:.0%}")
    print(f" Approval: {result.result.approval_type.value}")
    print(f" Processing Time: {result.processing_time_ms}ms")

    print(f"\n Analyst Votes:")
    for vote in result.result.votes:
        print(f" {vote.agent}: {vote.classification} ({vote.confidence:.0%})")

    print(f"\n Judge Decisions:")
    for decision in result.result.judge_decisions:
        status = "APPROVED" if decision.approved else "REJECTED"
        print(f" {decision.judge}: {status} - {decision.reason[:50]}...")

    # Verify correct classification (informational, not asserted).
    expected = 'agent'
    if result.result.classification == expected:
        print(f"\n ✓ Correct classification!")
    else:
        print(f"\n ✗ Expected '{expected}', got '{result.result.classification}'")

    print("\n Full Pipeline: PASSED")

def main():
    """Run all orchestrator tests.

    Returns:
        0 on success, 1 on assertion failure or unexpected error
        (suitable for sys.exit).
    """
    print("="*60)
    print("MoE Orchestration Engine Tests")
    print("="*60)

    try:
        test_consensus_calculator()
        test_orchestrator_single()
        test_orchestrator_batch()
        test_orchestrator_edge_cases()
        test_full_pipeline()

        print("\n" + "="*60)
        print("ALL TESTS PASSED")
        print("="*60)
        print("\nOrchestration Engine Components:")
        print(" 1. ConsensusCalculator - Weighted voting and threshold logic")
        print(" 2. MoEOrchestrator - Full pipeline coordination")
        print(" 3. Parallel execution - ThreadPoolExecutor for analysts/judges")
        print(" 4. Batch processing - Multiple documents with progress")
        print(" 5. Error handling - Graceful degradation on failures")

    except AssertionError as e:
        print(f"\nTEST FAILED: {e}")
        return 1
    except Exception as e:
        print(f"\nERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1

    return 0

# Script entry point: `name == 'main'` was paste-corrupted from the dunder form.
if __name__ == '__main__':
    sys.exit(main())