ADR-028: CODI2 Separation of Concerns Architecture (v4) - Part 3: Comprehensive Testing

Document Specification Block

Document: ADR-028-v4-codi2-separation-of-concerns-part3-testing
Version: 1.0.0
Purpose: Comprehensive testing strategy for CODI2 race-free architecture
Audience: QA Engineers, Developers, AI Test Agents
Date Created: 2025-09-06
Date Modified: 2025-09-06
Status: DRAFT

Table of Contents

  1. Testing Philosophy
  2. Unit Tests (100% Coverage)
  3. Integration Tests (100% Coverage)
  4. Critical Path Tests (100% Coverage)
  5. Race Condition Tests
  6. Chaos Engineering Tests
  7. Performance Tests
  8. Test Execution Strategy

1. Testing Philosophy

CODI2's test suite must demonstrate that all 23 identified race conditions have been eliminated. Each test should verify one or more of the following:

  • Correctness: Does it work as designed?
  • Concurrency: Is it race-free under load?
  • Resilience: Does it recover from failures?
  • Performance: Does it meet latency targets?

Coverage Requirements

  • Unit Tests: 100% coverage (no exceptions)
  • Integration Tests: 100% coverage (no exceptions)
  • Critical Path Tests: 100% coverage (already required)

Rationale: CODI2 is too critical to accept partial coverage. Every line of code must be tested.

2. Unit Tests (100% Coverage)

2.1 Audit Logger Tests

#[cfg(test)]
mod audit_tests {
    use super::*;
    use std::sync::Arc;

    #[tokio::test]
    async fn test_audit_buffer_management() {
        let logger = setup_test_logger();

        // Fill buffer to capacity - 1 (the test logger is expected to hold 100 events)
        for i in 0..99 {
            logger.log(create_test_event(i)).await.unwrap();
        }

        // Verify no flush yet
        assert_eq!(logger.buffer.lock().await.len(), 99);

        // Add one more to trigger flush
        logger.log(create_test_event(99)).await.unwrap();

        // Verify buffer was flushed
        assert_eq!(logger.buffer.lock().await.len(), 0);
    }

    #[tokio::test]
    async fn test_concurrent_audit_writes() {
        let logger = Arc::new(setup_test_logger());
        let handles: Vec<_> = (0..1000).map(|i| {
            let logger = logger.clone();
            tokio::spawn(async move {
                logger.log(create_test_event(i)).await.unwrap();
            })
        }).collect();

        // Surface panics from the spawned tasks instead of dropping the JoinError
        for result in futures::future::join_all(handles).await {
            result.expect("audit writer task panicked");
        }
        logger.flush().await.unwrap();

        // Verify all events were written
        let events = read_all_events().await;
        assert_eq!(events.len(), 1000);
    }

    #[tokio::test]
    async fn test_audit_persistence_on_crash() {
        let logger = setup_test_logger();

        // Add events (fewer than the flush threshold, so they are still buffered)
        for i in 0..50 {
            logger.log(create_test_event(i)).await.unwrap();
        }

        // Simulate crash (drop without flush).
        // This only passes if buffered events are made durable before `log` returns
        // or when the logger is dropped; a purely in-memory buffer would lose them.
        drop(logger);

        // Create new logger and force flush
        let new_logger = setup_test_logger();
        new_logger.flush().await.unwrap();

        // Verify events persisted
        assert!(read_all_events().await.len() >= 50);
    }
}
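
The buffer test above relies on a flush firing when the 100th event arrives. A minimal, synchronous sketch of that behavior follows; `BufferedLog`, its `u64` event ids, and the `flushed` vector standing in for durable storage are all hypothetical and are not the CODI2 audit logger.

```rust
/// Hypothetical stand-in for the audit logger's in-memory buffer.
struct BufferedLog {
    capacity: usize,
    buffer: Vec<u64>,
    flushed: Vec<u64>, // stands in for durable storage
}

impl BufferedLog {
    fn new(capacity: usize) -> Self {
        Self { capacity, buffer: Vec::with_capacity(capacity), flushed: Vec::new() }
    }

    /// Append one event and flush as soon as the buffer reaches capacity.
    fn log(&mut self, event_id: u64) {
        self.buffer.push(event_id);
        if self.buffer.len() >= self.capacity {
            self.flush();
        }
    }

    /// Move every buffered event to storage, preserving order.
    fn flush(&mut self) {
        self.flushed.append(&mut self.buffer);
    }
}

#[test]
fn flushes_exactly_at_capacity() {
    let mut log = BufferedLog::new(100);
    for i in 0..99 {
        log.log(i);
    }
    assert_eq!(log.buffer.len(), 99); // below capacity: nothing flushed yet
    log.log(99);
    assert!(log.buffer.is_empty()); // the 100th event triggered the flush
    assert_eq!(log.flushed.len(), 100);
}
```

Note that anything still in `buffer` is lost if the value is dropped, which is the situation the crash test is meant to rule out in the real logger.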

2.2 Message Bus Tests

#[cfg(test)]
mod message_bus_tests {
    use super::*;
    use std::time::Duration;
    use uuid::Uuid;

    #[tokio::test]
    async fn test_delivery_guarantees() {
        let bus = MessageBus::new();
        let agent = AgentId("test".into());
        let mut rx = bus.register(agent.clone()).await.unwrap();

        // Test at-least-once delivery
        let msg = create_test_message();
        bus.send_with_pattern(
            msg.clone(),
            RoutingPattern::Direct { target: agent.clone() },
            DeliveryGuarantee::AtLeastOnce {
                max_retries: 3,
                retry_delay: Duration::from_millis(10),
            },
        ).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received.id, msg.id);
    }

    #[tokio::test]
    async fn test_exactly_once_deduplication() {
        let bus = MessageBus::new();
        let agent = AgentId("test".into());
        let mut rx = bus.register(agent.clone()).await.unwrap();

        let msg = create_test_message();
        let idempotency_key = Uuid::new_v4();

        // Send same message twice with the same idempotency key
        for _ in 0..2 {
            bus.send_with_pattern(
                msg.clone(),
                RoutingPattern::Direct { target: agent.clone() },
                DeliveryGuarantee::ExactlyOnce {
                    idempotency_key,
                    dedup_window: Duration::from_secs(60),
                },
            ).await.unwrap();
        }

        // Should only receive once
        rx.recv().await.unwrap();
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_topic_subscription() {
        let bus = MessageBus::new();
        let mut receivers = vec![];

        // Register 3 agents to same topic
        for i in 0..3 {
            let agent = AgentId(format!("agent-{}", i));
            let rx = bus.register(agent.clone()).await.unwrap();
            bus.subscribe(agent, MessageTopic::StatusUpdate).await.unwrap();
            receivers.push(rx);
        }

        // Publish to topic
        let msg = create_status_message();
        bus.send_with_pattern(
            msg.clone(),
            RoutingPattern::Topic { topic: MessageTopic::StatusUpdate },
            DeliveryGuarantee::BestEffort,
        ).await.unwrap();

        // All should receive
        for mut rx in receivers {
            let received = rx.recv().await.unwrap();
            assert_eq!(received.id, msg.id);
        }
    }
}
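
The exactly-once test above depends on the bus remembering an idempotency key for the length of `dedup_window`. One way this could work is sketched below, under stated assumptions: `Deduplicator` is a hypothetical type, keys are plain `u128` values rather than UUIDs, and the current time is passed in so the window can be tested without sleeping. It does not describe how `MessageBus` is implemented.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical deduplicator: remembers each idempotency key for `window`.
struct Deduplicator {
    window: Duration,
    seen: HashMap<u128, Instant>,
}

impl Deduplicator {
    fn new(window: Duration) -> Self {
        Self { window, seen: HashMap::new() }
    }

    /// Returns true if the message should be delivered, false if the key was
    /// already seen inside the window.
    fn should_deliver(&mut self, key: u128, now: Instant) -> bool {
        // Forget keys whose window has expired.
        let window = self.window;
        self.seen.retain(|_, first_seen| now.duration_since(*first_seen) < window);
        if self.seen.contains_key(&key) {
            return false;
        }
        self.seen.insert(key, now);
        true
    }
}

#[test]
fn duplicate_inside_window_is_dropped() {
    let mut dedup = Deduplicator::new(Duration::from_secs(60));
    let t0 = Instant::now();
    assert!(dedup.should_deliver(42, t0)); // first send is delivered
    assert!(!dedup.should_deliver(42, t0 + Duration::from_secs(1))); // duplicate
    assert!(dedup.should_deliver(42, t0 + Duration::from_secs(61))); // window expired
}
```

In this sketch the window is measured from the first sighting of a key and is not refreshed by duplicates; a real bus might choose differently.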

2.3 State Store Tests

#[cfg(test)]
mod state_store_tests {
    use super::*;
    use chrono::Utc;

    #[tokio::test]
    async fn test_optimistic_locking() {
        let store = setup_test_store();
        let task = create_test_task("task-1", 0);

        // Initial write (bumps the stored version from 0 to 1)
        store.update_task(task.clone()).await.unwrap();

        // Two writers read the same snapshot
        let mut task1 = store.get_task("task-1").await.unwrap().unwrap();
        let mut task2 = task1.clone();

        // Both read version 1
        assert_eq!(task1.version, 1);
        assert_eq!(task2.version, 1);

        // First update succeeds
        task1.status = TaskStatus::InProgress { started_at: Utc::now() };
        store.update_task(task1).await.unwrap();

        // Second update fails because its version is now stale
        task2.status = TaskStatus::Completed { completed_at: Utc::now() };
        let result = store.update_task(task2).await;
        assert!(matches!(result, Err(Codi2Error::StateConflict { .. })));
    }
}
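
The version check that the optimistic-locking test exercises can be illustrated with an in-memory store. This is a sketch under assumptions: `Store`, `Task`, and `StoreError` are hypothetical simplified types, and the real state store presumably performs the same comparison inside a database transaction.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct Task {
    id: String,
    version: u64,
    status: String,
}

#[derive(Debug, PartialEq)]
enum StoreError {
    Conflict { expected: u64, found: u64 },
}

#[derive(Default)]
struct Store {
    tasks: HashMap<String, Task>,
}

impl Store {
    /// Accept the write only if the caller's version matches the stored one,
    /// then bump the version. A missing task counts as version 0.
    fn update(&mut self, mut task: Task) -> Result<u64, StoreError> {
        let current = self.tasks.get(&task.id).map_or(0, |t| t.version);
        if task.version != current {
            return Err(StoreError::Conflict { expected: current, found: task.version });
        }
        task.version = current + 1;
        let new_version = task.version;
        self.tasks.insert(task.id.clone(), task);
        Ok(new_version)
    }

    fn get(&self, id: &str) -> Option<Task> {
        self.tasks.get(id).cloned()
    }
}

#[test]
fn stale_writer_is_rejected() {
    let mut store = Store::default();
    let initial = Task { id: "task-1".into(), version: 0, status: "pending".into() };
    assert_eq!(store.update(initial), Ok(1));

    // Two writers read the same snapshot at version 1.
    let mut first = store.get("task-1").unwrap();
    let mut second = first.clone();
    first.status = "in_progress".into();
    second.status = "completed".into();

    assert_eq!(store.update(first), Ok(2));
    assert_eq!(store.update(second), Err(StoreError::Conflict { expected: 2, found: 1 }));
}
```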

3. Integration Tests (100% Coverage)

3.1 End-to-End Flow Tests

#[tokio::test]
async fn test_task_assignment_flow() {
    let system = setup_integrated_system().await;
    // Borrow the three components from the same handle that sections 5-7 use as `system.*`
    let (audit, bus, state) = (&system.audit, &system.bus, &system.state);

    // Register orchestrator and agent
    let orch = AgentId("orchestrator".into());
    let agent = AgentId("agent-1".into());
    let mut agent_rx = bus.register(agent.clone()).await.unwrap();

    // Create task in state
    let task = TaskState::new("task-1");
    state.update_task(task).await.unwrap();

    // Send assignment via message bus
    let assign_msg = AgentMessage {
        payload: MessagePayload::AssignTask {
            task_id: "task-1".into(),
            deadline: Utc::now() + Duration::from_secs(3600),
            requirements: Default::default(),
        },
        // ...
    };

    bus.send_with_pattern(
        assign_msg,
        RoutingPattern::Direct { target: agent.clone() },
        DeliveryGuarantee::AtLeastOnce { max_retries: 3, retry_delay: Duration::from_millis(100) },
    ).await.unwrap();

    // Agent receives message and checks that it is the assignment
    let msg = agent_rx.recv().await.unwrap();
    assert!(matches!(msg.payload, MessagePayload::AssignTask { .. }));

    // Agent updates state
    let mut task = state.get_task("task-1").await.unwrap().unwrap();
    task.status = TaskStatus::Assigned { agent_id: agent.0.clone() };
    state.update_task(task).await.unwrap();

    // Log audit event
    audit.log(AuditEvent::TaskCompleted {
        meta: EventMetadata::new("test".into(), "test".into()),
        task_id: "task-1".into(),
        duration_ms: 100,
        result: TaskResult::Success,
    }).await.unwrap();

    audit.flush().await.unwrap();

    // Verify complete flow
    let final_task = state.get_task("task-1").await.unwrap().unwrap();
    assert!(matches!(final_task.status, TaskStatus::Assigned { .. }));
}

4. Critical Path Tests (100% Coverage)

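#[tokio::test]
async fn critical_path_message_delivery() {
    // Test every delivery guarantee against every routing pattern.
    // `{ .. }` is only valid in patterns, so each value is built explicitly with the
    // field values used in section 2.2.
    let guarantees = [
        DeliveryGuarantee::BestEffort,
        DeliveryGuarantee::AtLeastOnce { max_retries: 3, retry_delay: Duration::from_millis(10) },
        DeliveryGuarantee::ExactlyOnce { idempotency_key: Uuid::new_v4(), dedup_window: Duration::from_secs(60) },
    ];
    let patterns = [
        RoutingPattern::Direct { target: AgentId("test".into()) },
        RoutingPattern::Topic { topic: MessageTopic::StatusUpdate },
        // Assumed helper: the fields of `Broadcast` are not shown in this document
        sample_broadcast_pattern(),
    ];

    for guarantee in &guarantees {
        for pattern in &patterns {
            // The helper is assumed to borrow its arguments so they can be reused
            test_delivery_scenario(guarantee, pattern).await;
        }
    }
}

#[tokio::test]
async fn critical_path_state_consistency() {
    // Test all state transitions maintain consistency
    let states = [
        TaskStatus::Pending,
        TaskStatus::Assigned { agent_id: "agent-1".into() },
        TaskStatus::InProgress { started_at: Utc::now() },
        TaskStatus::Completed { completed_at: Utc::now() },
        // Assumed helper: the fields of `Failed` are not shown in this document
        sample_failed_status(),
    ];

    for from in &states {
        for to in &states {
            test_state_transition(from, to).await;
        }
    }
}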
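
The all-pairs loop above presumably expects valid transitions to be applied and invalid ones to be rejected. The sketch below shows one way such a check could look. The fieldless `Status` enum is a stand-in for `TaskStatus`, and the allowed set (Pending to Assigned, Assigned to InProgress, InProgress to Completed or Failed) is an assumption made for illustration, not the lifecycle defined by this ADR.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Status {
    Pending,
    Assigned,
    InProgress,
    Completed,
    Failed,
}

const ALL: [Status; 5] = [
    Status::Pending,
    Status::Assigned,
    Status::InProgress,
    Status::Completed,
    Status::Failed,
];

/// Assumed lifecycle: only these four forward moves are legal.
fn is_allowed(from: Status, to: Status) -> bool {
    use Status::*;
    matches!(
        (from, to),
        (Pending, Assigned) | (Assigned, InProgress) | (InProgress, Completed) | (InProgress, Failed)
    )
}

/// Apply a transition, rejecting anything outside the allowed set.
fn transition(from: Status, to: Status) -> Result<Status, String> {
    if is_allowed(from, to) {
        Ok(to)
    } else {
        Err(format!("invalid transition {:?} -> {:?}", from, to))
    }
}

#[test]
fn every_pair_is_accepted_or_rejected() {
    let mut accepted = 0;
    for from in ALL {
        for to in ALL {
            match transition(from, to) {
                Ok(next) => {
                    assert_eq!(next, to);
                    accepted += 1;
                }
                Err(_) => assert!(!is_allowed(from, to)),
            }
        }
    }
    // 25 ordered pairs in total, 4 of them legal under the assumed lifecycle.
    assert_eq!(accepted, 4);
}
```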

5. Race Condition Tests

#[tokio::test]
async fn test_no_race_in_concurrent_writes() {
    let system = setup_integrated_system().await;

    // Launch 1000 concurrent operations
    let handles: Vec<_> = (0..1000).map(|i| {
        let system = system.clone();
        tokio::spawn(async move {
            // Interleave all operations
            let agent = AgentId(format!("agent-{}", i % 10));
            let task = format!("task-{}", i % 20);

            // Concurrent: register, send, update, audit.
            // Per-operation results are ignored: with 10 agents and 20 tasks shared by
            // 1000 operations, duplicate registrations and version conflicts are likely.
            let _ = tokio::join!(
                system.bus.register(agent.clone()),
                system.state.update_task(create_task(&task)),
                system.audit.log(create_event(i)),
                system.bus.send_with_pattern(create_msg(), create_pattern(), create_guarantee())
            );
        })
    }).collect();

    // A panicked task must fail the test rather than vanish in an ignored JoinError
    for result in futures::future::join_all(handles).await {
        result.expect("concurrent operation panicked");
    }

    // Verify no data loss or corruption
    verify_system_integrity(&system).await;
}
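
One invariant an integrity check of this kind can assert is that no update is lost under contention. The standard-library sketch below demonstrates the idea with a compare-and-swap retry loop on a shared counter. `concurrent_increments` is a hypothetical helper written for this illustration and says nothing about how `verify_system_integrity` is implemented.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Run `workers` threads that each increment a shared counter `per_worker`
/// times using compare-and-swap, and return the final value.
fn concurrent_increments(workers: u64, per_worker: u64) -> u64 {
    let counter = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_worker {
                    let mut seen = counter.load(Ordering::Acquire);
                    // Retry with the freshly observed value until our write wins.
                    while let Err(actual) = counter.compare_exchange(
                        seen,
                        seen + 1,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        seen = actual;
                    }
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    counter.load(Ordering::Acquire)
}

#[test]
fn no_update_is_lost() {
    // Every increment must be reflected in the final value.
    assert_eq!(concurrent_increments(8, 1_000), 8_000);
}
```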

6. Chaos Engineering Tests

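#[tokio::test]
async fn test_resilience_under_chaos() {
    use rand::Rng;

    let system = setup_integrated_system().await;

    // Start normal operations
    let ops_handle = tokio::spawn(run_normal_operations(system.clone()));

    // Inject chaos
    let chaos_handle = tokio::spawn(async move {
        loop {
            // Draw the delay in its own statement: `ThreadRng` is not `Send`,
            // so it must be dropped before the `.await` for `tokio::spawn` to accept this task
            let delay_ms: u64 = rand::thread_rng().gen_range(10..100);
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;

            let fault = rand::thread_rng().gen_range(0..5);
            match fault {
                0 => simulate_network_partition(),
                1 => simulate_process_crash(),
                2 => simulate_disk_full(),
                3 => simulate_cpu_spike(),
                4 => simulate_memory_pressure(),
                _ => unreachable!("gen_range(0..5) only yields 0 through 4"),
            }
        }
    });

    // Run for 60 seconds
    tokio::time::sleep(Duration::from_secs(60)).await;

    // Stop fault injection and the workload before checking the final state
    chaos_handle.abort();
    ops_handle.abort();

    // Verify system recovered and no data lost
    verify_system_integrity(&system).await;
}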

7. Performance Tests

#[tokio::test]
async fn test_performance_targets() {
    let system = setup_integrated_system().await;
    let mut latencies = PerformanceMetrics::new();

    // Register a recipient so Direct routing has a target
    let agent = AgentId("perf-agent".into());
    let mut rx = system.bus.register(agent.clone()).await.unwrap();

    // Warm up
    run_operations(&system, 1000).await;

    // Measure
    for i in 0..10000 {
        let start = Instant::now();
        let msg = create_test_message();
        system.bus.send_with_pattern(
            msg,
            RoutingPattern::Direct { target: agent.clone() },
            DeliveryGuarantee::BestEffort,
        ).await.unwrap();
        latencies.record_message(start.elapsed());
        // Drain outside the timed region so the receiver never backs up
        let _ = rx.try_recv();

        // Use a fresh task id per iteration to avoid version conflicts on repeated writes
        let start = Instant::now();
        system.state.update_task(create_task(&format!("perf-task-{}", i))).await.unwrap();
        latencies.record_state(start.elapsed());

        let start = Instant::now();
        system.audit.log(create_event(i)).await.unwrap();
        latencies.record_audit(start.elapsed());
    }

    // Verify targets
    assert!(latencies.message_p99() < Duration::from_micros(100));
    assert!(latencies.state_p99() < Duration::from_millis(5));
    assert!(latencies.audit_p99() < Duration::from_millis(10));
}
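
The targets above are expressed as 99th-percentile latencies. A minimal sketch of a nearest-rank percentile over recorded samples follows; the free function `percentile` is hypothetical, and `PerformanceMetrics` may compute its p99 values differently (for example with a histogram).

```rust
use std::time::Duration;

/// Nearest-rank percentile (`p` in 1..=100): the smallest sample such that at
/// least `p` percent of all samples are less than or equal to it.
fn percentile(samples: &[Duration], p: usize) -> Option<Duration> {
    if samples.is_empty() || p == 0 || p > 100 {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort();
    // Ceiling of p * n / 100, computed in integers to avoid float rounding.
    let rank = (p * sorted.len() + 99) / 100;
    Some(sorted[rank - 1])
}

#[test]
fn p99_of_one_to_hundred_microseconds() {
    // Samples of 1..=100 microseconds, deliberately unsorted.
    let samples: Vec<Duration> = (1..=100u64).rev().map(Duration::from_micros).collect();
    let p99 = percentile(&samples, 99).unwrap();
    assert_eq!(p99, Duration::from_micros(99));
    assert!(p99 < Duration::from_micros(100)); // same shape as the message bus target
}
```

Sorting a copy keeps the recorded samples untouched, at the cost of an allocation per query; that is acceptable for a test that computes each percentile once.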

8. Test Execution Strategy

CI/CD Pipeline

test-stages:
  - unit-tests:
      parallel: true
      timeout: 5m
      coverage-threshold: 100%
      fail-on-uncovered: true

  - integration-tests:
      parallel: false
      timeout: 15m
      coverage-threshold: 100%
      fail-on-uncovered: true

  - critical-path-tests:
      parallel: false
      timeout: 10m
      coverage-threshold: 100%
      must-pass: true

  - race-condition-tests:
      parallel: false
      timeout: 30m
      iterations: 10

  - chaos-tests:
      environment: staging
      timeout: 60m

  - performance-tests:
      environment: perf
      timeout: 30m
      benchmarks-required: true

Test Data Management

  • Use test containers for FDB
  • Isolated workspaces per test
  • Automatic cleanup after tests
  • Deterministic test data generation
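
Deterministic test data generation usually comes down to deriving all data from an explicit seed, so that a failing run can be replayed exactly. The sketch below uses the SplitMix64 generator for that purpose. `SeededRng` and `task_ids` are hypothetical names, and the `task-N` id format with 20 distinct tasks simply mirrors the race-condition test in section 5.

```rust
/// SplitMix64: a small seedable generator, used here so runs are reproducible.
struct SeededRng(u64);

impl SeededRng {
    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }
}

/// Generate `count` task ids drawn from 20 possible tasks, fully determined by `seed`.
fn task_ids(seed: u64, count: usize) -> Vec<String> {
    let mut rng = SeededRng(seed);
    (0..count)
        .map(|_| format!("task-{}", rng.next_u64() % 20))
        .collect()
}

#[test]
fn same_seed_gives_same_data() {
    // Replaying a seed reproduces the exact sequence.
    assert_eq!(task_ids(42, 50), task_ids(42, 50));
    assert_eq!(task_ids(42, 50).len(), 50);
}
```

Logging the seed at the start of each test run is what makes the replay possible in practice.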

Version History

  • 1.0.0 (2025-09-06): Initial comprehensive test suite

Approval

QA Lead: ___________________ Date: ___________
Technical Lead: ___________________ Date: ___________
Test Automation Team: ___________________ Date: ___________