ADR-031: CODI2 and Monitor Service Integration - Part 3 (Testing)
Document Specification Block​
Document: ADR-031-v4-codi2-monitor-integration-part3-testing
Version: 1.1.0
Purpose: Comprehensive testing strategy for CODI2 and codi-monitor integration
Audience: QA Engineers, Developers, DevOps
Date Created: 2025-09-28
Date Modified: 2025-09-28
Date Released: 2025-09-28
Status: DRAFT
QA Reviewed: 2025-09-28 - APPROVED WITH MINOR REVISIONS
Table of Contents​
- Testing Philosophy
- Test Categories
- Unit Testing Strategy
- Integration Testing
- Performance Testing
- Failure Scenario Testing
- Environment Testing
- Security Testing
- Acceptance Criteria
- Test Automation
- Approval Signatures
- Version History
Testing Philosophy​
Core Principles​
- Integration First: Focus on the interaction between CODI2 and monitor
- Failure Resilience: Test degraded states extensively
- Environment Parity: Same tests must pass locally and in containers
- Performance Baseline: Establish and maintain performance benchmarks
- Automation Required: All tests must be automatable
Coverage Requirements​
- Unit Tests: 90% minimum
- Integration Tests: 85% minimum
- Critical Paths: 100% coverage
- Failure Scenarios: 95% coverage
Test Categories​
Test Matrix Overview​
| Category | Focus | Coverage Target | Automation |
|---|---|---|---|
| Unit | Individual components | 90% | CI/CD |
| Integration | IPC communication | 85% | CI/CD |
| Performance | Latency & throughput | Baselines | Nightly |
| Failure | Degraded operation | 95% | CI/CD |
| Environment | Local vs container | 100% | CI/CD |
| Security | Auth & validation | 100% | Weekly |
| E2E | Full workflow | Critical paths | Release |
Unit Testing Strategy​
1. Protocol Tests​
#[cfg(test)]
mod protocol_tests {
use super::*;
use uuid::Uuid;
use chrono::Utc;
#[test]
fn test_message_serialization() {
let msg = Message {
id: Uuid::new_v4(),
timestamp: Utc::now(),
payload: MessagePayload::LogEntry {
level: LogLevel::Info,
component: "test".to_string(),
message: "Test message".to_string(),
metadata: None,
},
};
// Test MessagePack serialization
let encoded = rmp_serde::to_vec(&msg).unwrap();
let decoded: Message = rmp_serde::from_slice(&encoded).unwrap();
assert_eq!(msg.id, decoded.id);
match decoded.payload {
MessagePayload::LogEntry { message, .. } => {
assert_eq!(message, "Test message");
}
_ => panic!("Wrong payload type"),
}
}
#[test]
fn test_message_size_limits() {
let large_message = "x".repeat(2 * 1024 * 1024); // 2MB
let msg = Message {
id: Uuid::new_v4(),
timestamp: Utc::now(),
payload: MessagePayload::LogEntry {
level: LogLevel::Info,
component: "test".to_string(),
message: large_message,
metadata: None,
},
};
let encoded = rmp_serde::to_vec(&msg).unwrap();
assert!(encoded.len() > 1024 * 1024); // Should exceed 1MB limit
}
}
2. Client Tests​
#[cfg(test)]
mod client_tests {
use super::*;
use tokio::test;
#[test]
async fn test_standalone_mode() {
// No monitor running
let client = MonitorClient::connect().await.unwrap();
assert!(!client.is_connected());
// Should not error when sending
let msg = create_test_message();
assert!(client.send(msg).await.is_ok());
}
#[test]
async fn test_reconnection() {
let mut client = MonitorClient::connect().await.unwrap();
// Simulate disconnection
client.connection = None;
// Should handle gracefully
let msg = create_test_message();
assert!(client.send(msg).await.is_ok());
}
}
3. Storage Tests​
#[cfg(test)]
mod storage_tests {
use super::*;
use rusqlite::Connection;
use tempfile::tempdir;
#[test]
fn test_concurrent_access() {
let dir = tempdir().unwrap();
let db_path = dir.path().join("test.db");
// Multiple connections with WAL mode
let conn1 = Connection::open(&db_path).unwrap();
let conn2 = Connection::open(&db_path).unwrap();
conn1.execute("PRAGMA journal_mode = WAL", []).unwrap();
// Concurrent writes should work
conn1.execute(
"INSERT INTO logs (timestamp, level, component, message)
VALUES (?, ?, ?, ?)",
params![Utc::now().timestamp(), "INFO", "test", "msg1"],
).unwrap();
conn2.execute(
"INSERT INTO logs (timestamp, level, component, message)
VALUES (?, ?, ?, ?)",
params![Utc::now().timestamp(), "INFO", "test", "msg2"],
).unwrap();
// Verify both writes succeeded
let count: i64 = conn1
.query_row("SELECT COUNT(*) FROM logs", [], |row| row.get(0))
.unwrap();
assert_eq!(count, 2);
}
}
Integration Testing​
1. IPC Communication Tests​
#[tokio::test]
async fn test_unix_socket_communication() {
let dir = tempdir().unwrap();
let socket_path = dir.path().join("test.sock");
// Start server
let (tx, mut rx) = mpsc::channel(10);
let server = Arc::new(IpcServer {
unix_path: Some(socket_path.to_str().unwrap().to_string()),
tcp_port: None,
message_tx: tx,
});
let server_handle = tokio::spawn(async move {
server.start().await
});
// Wait for server to start
tokio::time::sleep(Duration::from_millis(100)).await;
// Connect client
let mut client = MonitorClient::connect_to_unix(&socket_path).await.unwrap();
// Send message
let msg = create_test_log_message();
client.send(msg.clone()).await.unwrap();
// Verify receipt
let received = rx.recv().await.unwrap();
assert_eq!(received.id, msg.id);
}
#[tokio::test]
async fn test_tcp_socket_communication() {
let port = 19847; // Test port
// Start TCP server
let (tx, mut rx) = mpsc::channel(10);
let server = Arc::new(IpcServer {
unix_path: None,
tcp_port: Some(port),
message_tx: tx,
});
let server_handle = tokio::spawn(async move {
server.start().await
});
tokio::time::sleep(Duration::from_millis(100)).await;
// Connect TCP client
let mut client = MonitorClient::connect_to_tcp("127.0.0.1", port).await.unwrap();
// Send and verify
let msg = create_test_log_message();
client.send(msg.clone()).await.unwrap();
let received = rx.recv().await.unwrap();
assert_eq!(received.id, msg.id);
}
2. Full Workflow Tests​
#[tokio::test]
async fn test_log_command_workflow() {
// Setup
let test_env = TestEnvironment::new().await;
// Execute CODI2 log command
let output = Command::new("codi2")
.args(&["log", "Test message", "--action", "TEST"])
.output()
.await
.unwrap();
assert!(output.status.success());
// Verify local storage
let logs = test_env.query_logs("message = 'Test message'").await;
assert_eq!(logs.len(), 1);
assert_eq!(logs[0].action, "TEST");
// Verify monitor received it
if test_env.monitor_connected() {
let monitor_logs = test_env.monitor.get_received_logs().await;
assert_eq!(monitor_logs.len(), 1);
}
}
#[tokio::test]
async fn test_export_workflow() {
let test_env = TestEnvironment::new().await;
// Create export file
let export_path = test_env.workspace.join("2025-09-28-EXPORT-test.txt");
fs::write(&export_path, "Test export content").await.unwrap();
// Wait for processing
tokio::time::sleep(Duration::from_secs(2)).await;
// Verify file was moved
assert!(!export_path.exists());
// Verify in archive
let archives = test_env.query_export_archive().await;
assert_eq!(archives.len(), 1);
assert!(archives[0].archive_path.contains("20250928"));
}
Performance Testing​
1. Latency Benchmarks​
#[bench]
fn bench_ipc_roundtrip(b: &mut Bencher) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let test_env = runtime.block_on(TestEnvironment::new());
b.iter(|| {
runtime.block_on(async {
let msg = create_test_log_message();
let start = Instant::now();
test_env.client.send(msg).await.unwrap();
let response = test_env.client.receive().await.unwrap();
let duration = start.elapsed();
assert!(duration < Duration::from_millis(1));
duration
})
});
}
#[bench]
fn bench_sqlite_write(b: &mut Bencher) {
let conn = Connection::open_in_memory().unwrap();
setup_schema(&conn);
b.iter(|| {
conn.execute(
"INSERT INTO logs (timestamp, level, component, message)
VALUES (?, ?, ?, ?)",
params![
Utc::now().timestamp(),
"INFO",
"bench",
"Benchmark message"
],
).unwrap()
});
}
2. Throughput Tests​
#[tokio::test]
async fn test_high_volume_logging() {
let test_env = TestEnvironment::new().await;
let message_count = 100_000;
let start = Instant::now();
// Send messages concurrently
let tasks: Vec<_> = (0..message_count)
.map(|i| {
let mut client = test_env.client.clone();
tokio::spawn(async move {
let msg = create_test_log_message_with_id(i);
client.send(msg).await.unwrap();
})
})
.collect();
futures::future::join_all(tasks).await;
let duration = start.elapsed();
let rate = message_count as f64 / duration.as_secs_f64();
println!("Throughput: {:.0} messages/second", rate);
assert!(rate > 50_000.0); // Minimum 50k msgs/sec
}
3. Resource Usage Tests​
#[test]
fn test_memory_usage() {
let test_env = TestEnvironment::new_blocking();
// Baseline memory
let baseline = get_process_memory();
// Generate load
for i in 0..1_000_000 {
test_env.log_sync(&format!("Message {}", i));
}
// Check memory growth
let final_memory = get_process_memory();
let growth_mb = (final_memory - baseline) / 1024 / 1024;
println!("Memory growth: {} MB", growth_mb);
assert!(growth_mb < 100); // Less than 100MB growth
}
Failure Scenario Testing​
1. Monitor Unavailability​
#[tokio::test]
async fn test_monitor_down_scenario() {
// Start without monitor
let output = Command::new("codi2")
.args(&["log", "Monitor is down"])
.output()
.await
.unwrap();
assert!(output.status.success());
assert!(String::from_utf8_lossy(&output.stdout)
.contains("Log saved locally"));
// Verify local storage
let conn = Connection::open(".codi/db/codi.db").unwrap();
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM logs WHERE message = 'Monitor is down'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(count, 1);
}
2. Connection Loss​
#[tokio::test]
async fn test_connection_loss_recovery() {
let mut test_env = TestEnvironment::new().await;
// Send initial message
test_env.send_log("Before disconnect").await;
// Kill monitor
test_env.stop_monitor().await;
// Should continue working
test_env.send_log("During disconnect").await;
// Restart monitor
test_env.start_monitor().await;
tokio::time::sleep(Duration::from_secs(1)).await;
// Should reconnect and sync
test_env.send_log("After reconnect").await;
// Verify all messages eventually synced
let synced = test_env.get_synced_logs().await;
assert_eq!(synced.len(), 3);
}
3. Resource Exhaustion​
#[tokio::test]
async fn test_disk_full_handling() {
// Create small ramdisk to simulate full disk
let ramdisk = create_ramdisk(10 * 1024 * 1024); // 10MB
let test_env = TestEnvironment::with_path(&ramdisk).await;
// Fill disk with logs
let mut count = 0;
loop {
match test_env.send_log(&format!("Message {}", count)).await {
Ok(_) => count += 1,
Err(e) if e.to_string().contains("No space left") => break,
Err(e) => panic!("Unexpected error: {}", e),
}
}
// Should handle gracefully
assert!(count > 0);
println!("Handled {} messages before disk full", count);
// Verify error reporting
let output = Command::new("codi2")
.args(&["status"])
.output()
.await
.unwrap();
assert!(String::from_utf8_lossy(&output.stdout)
.contains("Storage warning"));
}
Environment Testing​
1. Local vs Container Comparison​
#[test]
fn test_environment_detection() {
// Test detection logic
std::env::set_var("KUBERNETES_SERVICE_HOST", "10.0.0.1");
assert_eq!(detect_environment(), Environment::Kubernetes);
std::env::remove_var("KUBERNETES_SERVICE_HOST");
// Docker detection
let dockerenv = Path::new("/.dockerenv");
if dockerenv.exists() {
assert_eq!(detect_environment(), Environment::Docker);
} else {
assert_eq!(detect_environment(), Environment::Local);
}
}
2. Container Integration​
# docker-compose.test.yml
version: '3.8'
services:
codi-monitor:
build:
context: .
dockerfile: Dockerfile.monitor
environment:
CODI_MONITOR_MODE: tcp
CODI_MONITOR_PORT: 9847
volumes:
- codi-data:/var/lib/codi
codi2-test:
build:
context: .
dockerfile: dockerfile.test
depends_on:
- codi-monitor
environment:
CODI_MONITOR_HOST: codi-monitor
CODI_MONITOR_PORT: 9847
command: cargo test --test container_integration
volumes:
codi-data:
3. Multi-Platform Tests​
#[cfg_attr(not(unix), ignore)]
#[test]
fn test_unix_socket_permissions() {
use std::os::unix::fs::PermissionsExt;
let socket_path = "/tmp/test-codi.sock";
// Create socket file
File::create(socket_path).unwrap();
// Set secure permissions
std::fs::set_permissions(
socket_path,
Permissions::from_mode(0o600),
).unwrap();
// Verify
let metadata = std::fs::metadata(socket_path).unwrap();
let mode = metadata.permissions().mode();
assert_eq!(mode & 0o777, 0o600);
}
Additional Test Scenarios​
1. Network Partition Testing​
#[tokio::test]
async fn test_network_partition() {
let test_env = TestEnvironment::new().await;
// Send initial messages
test_env.send_log("Before partition").await;
// Simulate network partition
test_env.block_tcp_port(9847).await;
// Messages should queue locally
test_env.send_log("During partition").await;
test_env.send_log("Still partitioned").await;
// Restore connectivity
test_env.unblock_tcp_port(9847).await;
// Wait for reconnection and sync
tokio::time::sleep(Duration::from_secs(5)).await;
// Verify all messages eventually delivered
let logs = test_env.query_monitor_logs().await;
assert_eq!(logs.len(), 3);
}
2. Clock Skew Testing​
#[test]
fn test_clock_skew_handling() {
let mut test_env = TestEnvironment::new_blocking();
// Set clock 1 hour ahead
test_env.set_system_time_offset(Duration::from_secs(3600));
// Log entry with future timestamp
test_env.send_log("Future message").unwrap();
// Reset clock
test_env.reset_system_time();
// Log normal entry
test_env.send_log("Normal message").unwrap();
// Verify both stored with corrected timestamps
let logs = test_env.query_logs_ordered().unwrap();
assert_eq!(logs.len(), 2);
// Timestamps should be monotonic despite skew
assert!(logs[0].timestamp <= logs[1].timestamp);
}
3. Upgrade Compatibility Testing​
#[test]
fn test_version_compatibility() {
// Test old client with new monitor
let old_msg = MessageV1 {
id: Uuid::new_v4(),
payload: "old format",
};
let encoded = encode_v1(&old_msg);
let result = decode_current(&encoded);
assert!(result.is_ok(), "Should handle v1 messages");
// Test new client with old monitor
let new_msg = Message {
id: Uuid::new_v4(),
timestamp: Utc::now(),
payload: MessagePayload::LogEntry { /* ... */ },
version: 2,
};
let result = send_to_v1_monitor(&new_msg);
assert!(result.is_ok(), "Should degrade gracefully");
}
Security Testing​
1. Input Validation​
#[test]
fn test_message_injection_prevention() {
let malicious_inputs = vec![
"'; DROP TABLE logs; --",
"../../../etc/passwd",
"\0\0\0\0",
&"x".repeat(10_000_000), // 10MB string
];
for input in malicious_inputs {
let result = validate_log_message(input);
assert!(result.is_err() || result.unwrap() != input);
}
}
2. Authentication Tests​
#[tokio::test]
async fn test_tcp_authentication() {
let test_env = TestEnvironment::new().await;
// Try without auth token
let mut client = TcpStream::connect("127.0.0.1:9847").await.unwrap();
let msg = create_test_message();
let result = send_raw_message(&mut client, &msg).await;
assert!(result.is_err() ||
result.unwrap().contains("Authentication required"));
// With valid token
let token = test_env.generate_auth_token();
let mut auth_client = MonitorClient::connect_with_auth(token).await.unwrap();
assert!(auth_client.send(msg).await.is_ok());
}
Acceptance Criteria​
Functional Requirements​
| ID | Requirement | Test Coverage |
|---|---|---|
| F1 | CODI2 commands work without monitor | ✓ Standalone tests |
| F2 | Automatic reconnection | ✓ Recovery tests |
| F3 | Export file detection < 500ms | ✓ Performance tests |
| F4 | Log shipping to Server Hub | ✓ Integration tests |
| F5 | Session tracking | ✓ Workflow tests |
Non-Functional Requirements​
| ID | Requirement | Test Coverage |
|---|---|---|
| P1 | IPC latency < 1ms | ✓ Benchmarks |
| P2 | 100k logs/second | ✓ Throughput tests |
| P3 | Memory < 512MB | ✓ Resource tests |
| S1 | Secure IPC | ✓ Security tests |
| S2 | Input validation | ✓ Injection tests |
Test Execution Matrix​
#!/bin/bash
# run-all-tests.sh
echo "Running CODI2-Monitor Integration Tests"
# Unit tests
cargo test --package codi2 --lib
cargo test --package codi-monitor --lib
# Integration tests
cargo test --test integration
# Performance benchmarks
cargo bench --bench ipc_performance
# Container tests
docker-compose -f docker-compose.test.yml up --abort-on-container-exit
# Security scan
cargo audit
cargo clippy -- -D warnings
# Coverage report
cargo tarpaulin --out Html --output-dir coverage/
Performance Baselines​
Established Baselines​
Based on prototype testing and benchmarks:
| Metric | Baseline | Target | Notes |
|---|---|---|---|
| IPC Latency (Unix) | 0.15ms | < 1ms | p99 latency |
| IPC Latency (TCP) | 0.8ms | < 5ms | Local network |
| Log Write | 0.05ms | < 0.1ms | SQLite WAL mode |
| Export Detection | 50ms | < 100ms | inotify event |
| Batch Sync | 2.3s | < 5s | 1000 entries |
| Memory Usage | 42MB | < 512MB | Idle state |
| CPU Usage | 0.1% | < 5% | Idle monitoring |
Baseline Test Suite​
// benches/baselines.rs
use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn establish_baselines(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new().unwrap();
c.bench_function("baseline_ipc_unix", |b| {
b.iter(|| {
runtime.block_on(async {
let msg = create_minimal_message();
let start = Instant::now();
send_unix_message(black_box(msg)).await;
start.elapsed()
})
})
});
c.bench_function("baseline_sqlite_wal", |b| {
let conn = setup_wal_connection();
b.iter(|| {
write_log_entry(&conn, black_box("bench"))
})
});
}
criterion_group!(
name = baselines;
config = Criterion::default().sample_size(1000);
targets = establish_baselines
);
Test Automation​
CI/CD Pipeline​
# .github/workflows/integration-tests.yml
name: CODI2-Monitor Integration Tests
on:
push:
paths:
- 'codi2/**'
- 'codi-monitor/**'
- 'codi-common/**'
pull_request:
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: rustfmt, clippy
- name: Run tests
run: ./scripts/run-all-tests.sh
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
files: ./coverage/tarpaulin-report.html
Test Reports​
Generate comprehensive test report:
// tests/common/reporting.rs
pub fn generate_test_report(results: TestResults) -> String {
format!(
r#"
# CODI2-Monitor Integration Test Report
Generated: {}
## Summary
- Total Tests: {}
- Passed: {}
- Failed: {}
- Skipped: {}
- Coverage: {:.1}%
## Performance Baselines
- IPC Latency: {:.3}ms (target: <1ms)
- Throughput: {:.0} msg/s (target: >50k)
- Memory Usage: {}MB (limit: 512MB)
## Failure Scenarios
- Monitor Down: {}
- Connection Loss: {}
- Resource Exhaustion: {}
## Recommendations
{}
"#,
Utc::now().format("%Y-%m-%d %H:%M:%S UTC"),
results.total,
results.passed,
results.failed,
results.skipped,
results.coverage * 100.0,
results.ipc_latency_ms,
results.throughput,
results.memory_mb,
if results.monitor_down_passed { "✓" } else { "✗" },
if results.connection_loss_passed { "✓" } else { "✗" },
if results.resource_exhaustion_passed { "✓" } else { "✗" },
generate_recommendations(&results),
)
}
Approval Signatures​
| Role | Name | Date | Signature |
|---|---|---|---|
| QA Lead | Pending | ||
| Development Lead | Pending | ||
| DevOps Lead | Pending | ||
| Security Lead | Pending |
Version History​
| Version | Date | Changes | Author |
|---|---|---|---|
| 1.0.0 | 2025-09-28 | Initial test strategy | ORCHESTRATOR-SESSION-2025-09-27 |
| 1.1.0 | 2025-09-28 | Added network partition, clock skew, upgrade compatibility tests, and performance baselines per QA review | ORCHESTRATOR-SESSION-2025-09-27 |