Skip to main content

ADR-031: CODI2 and Monitor Service Integration - Part 3 (Testing)

Document Specification Block​

Document: ADR-031-v4-codi2-monitor-integration-part3-testing
Version: 1.1.0
Purpose: Comprehensive testing strategy for CODI2 and codi-monitor integration
Audience: QA Engineers, Developers, DevOps
Date Created: 2025-09-28
Date Modified: 2025-09-28
Date Released: 2025-09-28
Status: DRAFT
QA Reviewed: 2025-09-28 - APPROVED WITH MINOR REVISIONS

Table of Contents​

↑ Back to Top


Testing Philosophy​

Core Principles​

  1. Integration First: Focus on the interaction between CODI2 and monitor
  2. Failure Resilience: Test degraded states extensively
  3. Environment Parity: Same tests must pass locally and in containers
  4. Performance Baseline: Establish and maintain performance benchmarks
  5. Automation Required: All tests must be automatable

Coverage Requirements​

  • Unit Tests: 90% minimum
  • Integration Tests: 85% minimum
  • Critical Paths: 100% coverage
  • Failure Scenarios: 95% coverage

↑ Back to Top


Test Categories​

Test Matrix Overview​

CategoryFocusCoverage TargetAutomation
UnitIndividual components90%CI/CD
IntegrationIPC communication85%CI/CD
PerformanceLatency & throughputBaselinesNightly
FailureDegraded operation95%CI/CD
EnvironmentLocal vs container100%CI/CD
SecurityAuth & validation100%Weekly
E2EFull workflowCritical pathsRelease

↑ Back to Top


Unit Testing Strategy​

1. Protocol Tests​

#[cfg(test)]
mod protocol_tests {
use super::*;
use uuid::Uuid;
use chrono::Utc;

#[test]
fn test_message_serialization() {
let msg = Message {
id: Uuid::new_v4(),
timestamp: Utc::now(),
payload: MessagePayload::LogEntry {
level: LogLevel::Info,
component: "test".to_string(),
message: "Test message".to_string(),
metadata: None,
},
};

// Test MessagePack serialization
let encoded = rmp_serde::to_vec(&msg).unwrap();
let decoded: Message = rmp_serde::from_slice(&encoded).unwrap();

assert_eq!(msg.id, decoded.id);
match decoded.payload {
MessagePayload::LogEntry { message, .. } => {
assert_eq!(message, "Test message");
}
_ => panic!("Wrong payload type"),
}
}

#[test]
fn test_message_size_limits() {
let large_message = "x".repeat(2 * 1024 * 1024); // 2MB
let msg = Message {
id: Uuid::new_v4(),
timestamp: Utc::now(),
payload: MessagePayload::LogEntry {
level: LogLevel::Info,
component: "test".to_string(),
message: large_message,
metadata: None,
},
};

let encoded = rmp_serde::to_vec(&msg).unwrap();
assert!(encoded.len() > 1024 * 1024); // Should exceed 1MB limit
}
}

2. Client Tests​

#[cfg(test)]
mod client_tests {
use super::*;
use tokio::test;

#[test]
async fn test_standalone_mode() {
// No monitor running
let client = MonitorClient::connect().await.unwrap();
assert!(!client.is_connected());

// Should not error when sending
let msg = create_test_message();
assert!(client.send(msg).await.is_ok());
}

#[test]
async fn test_reconnection() {
let mut client = MonitorClient::connect().await.unwrap();

// Simulate disconnection
client.connection = None;

// Should handle gracefully
let msg = create_test_message();
assert!(client.send(msg).await.is_ok());
}
}

3. Storage Tests​

#[cfg(test)]
mod storage_tests {
use super::*;
use rusqlite::Connection;
use tempfile::tempdir;

#[test]
fn test_concurrent_access() {
let dir = tempdir().unwrap();
let db_path = dir.path().join("test.db");

// Multiple connections with WAL mode
let conn1 = Connection::open(&db_path).unwrap();
let conn2 = Connection::open(&db_path).unwrap();

conn1.execute("PRAGMA journal_mode = WAL", []).unwrap();

// Concurrent writes should work
conn1.execute(
"INSERT INTO logs (timestamp, level, component, message)
VALUES (?, ?, ?, ?)",
params![Utc::now().timestamp(), "INFO", "test", "msg1"],
).unwrap();

conn2.execute(
"INSERT INTO logs (timestamp, level, component, message)
VALUES (?, ?, ?, ?)",
params![Utc::now().timestamp(), "INFO", "test", "msg2"],
).unwrap();

// Verify both writes succeeded
let count: i64 = conn1
.query_row("SELECT COUNT(*) FROM logs", [], |row| row.get(0))
.unwrap();
assert_eq!(count, 2);
}
}

↑ Back to Top


Integration Testing​

1. IPC Communication Tests​

#[tokio::test]
async fn test_unix_socket_communication() {
let dir = tempdir().unwrap();
let socket_path = dir.path().join("test.sock");

// Start server
let (tx, mut rx) = mpsc::channel(10);
let server = Arc::new(IpcServer {
unix_path: Some(socket_path.to_str().unwrap().to_string()),
tcp_port: None,
message_tx: tx,
});

let server_handle = tokio::spawn(async move {
server.start().await
});

// Wait for server to start
tokio::time::sleep(Duration::from_millis(100)).await;

// Connect client
let mut client = MonitorClient::connect_to_unix(&socket_path).await.unwrap();

// Send message
let msg = create_test_log_message();
client.send(msg.clone()).await.unwrap();

// Verify receipt
let received = rx.recv().await.unwrap();
assert_eq!(received.id, msg.id);
}

#[tokio::test]
async fn test_tcp_socket_communication() {
let port = 19847; // Test port

// Start TCP server
let (tx, mut rx) = mpsc::channel(10);
let server = Arc::new(IpcServer {
unix_path: None,
tcp_port: Some(port),
message_tx: tx,
});

let server_handle = tokio::spawn(async move {
server.start().await
});

tokio::time::sleep(Duration::from_millis(100)).await;

// Connect TCP client
let mut client = MonitorClient::connect_to_tcp("127.0.0.1", port).await.unwrap();

// Send and verify
let msg = create_test_log_message();
client.send(msg.clone()).await.unwrap();

let received = rx.recv().await.unwrap();
assert_eq!(received.id, msg.id);
}

2. Full Workflow Tests​

#[tokio::test]
async fn test_log_command_workflow() {
// Setup
let test_env = TestEnvironment::new().await;

// Execute CODI2 log command
let output = Command::new("codi2")
.args(&["log", "Test message", "--action", "TEST"])
.output()
.await
.unwrap();

assert!(output.status.success());

// Verify local storage
let logs = test_env.query_logs("message = 'Test message'").await;
assert_eq!(logs.len(), 1);
assert_eq!(logs[0].action, "TEST");

// Verify monitor received it
if test_env.monitor_connected() {
let monitor_logs = test_env.monitor.get_received_logs().await;
assert_eq!(monitor_logs.len(), 1);
}
}

#[tokio::test]
async fn test_export_workflow() {
let test_env = TestEnvironment::new().await;

// Create export file
let export_path = test_env.workspace.join("2025-09-28-EXPORT-test.txt");
fs::write(&export_path, "Test export content").await.unwrap();

// Wait for processing
tokio::time::sleep(Duration::from_secs(2)).await;

// Verify file was moved
assert!(!export_path.exists());

// Verify in archive
let archives = test_env.query_export_archive().await;
assert_eq!(archives.len(), 1);
assert!(archives[0].archive_path.contains("20250928"));
}

↑ Back to Top


Performance Testing​

1. Latency Benchmarks​

#[bench]
fn bench_ipc_roundtrip(b: &mut Bencher) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let test_env = runtime.block_on(TestEnvironment::new());

b.iter(|| {
runtime.block_on(async {
let msg = create_test_log_message();
let start = Instant::now();
test_env.client.send(msg).await.unwrap();
let response = test_env.client.receive().await.unwrap();
let duration = start.elapsed();

assert!(duration < Duration::from_millis(1));
duration
})
});
}

#[bench]
fn bench_sqlite_write(b: &mut Bencher) {
let conn = Connection::open_in_memory().unwrap();
setup_schema(&conn);

b.iter(|| {
conn.execute(
"INSERT INTO logs (timestamp, level, component, message)
VALUES (?, ?, ?, ?)",
params![
Utc::now().timestamp(),
"INFO",
"bench",
"Benchmark message"
],
).unwrap()
});
}

2. Throughput Tests​

#[tokio::test]
async fn test_high_volume_logging() {
let test_env = TestEnvironment::new().await;
let message_count = 100_000;

let start = Instant::now();

// Send messages concurrently
let tasks: Vec<_> = (0..message_count)
.map(|i| {
let mut client = test_env.client.clone();
tokio::spawn(async move {
let msg = create_test_log_message_with_id(i);
client.send(msg).await.unwrap();
})
})
.collect();

futures::future::join_all(tasks).await;

let duration = start.elapsed();
let rate = message_count as f64 / duration.as_secs_f64();

println!("Throughput: {:.0} messages/second", rate);
assert!(rate > 50_000.0); // Minimum 50k msgs/sec
}

3. Resource Usage Tests​

#[test]
fn test_memory_usage() {
let test_env = TestEnvironment::new_blocking();

// Baseline memory
let baseline = get_process_memory();

// Generate load
for i in 0..1_000_000 {
test_env.log_sync(&format!("Message {}", i));
}

// Check memory growth
let final_memory = get_process_memory();
let growth_mb = (final_memory - baseline) / 1024 / 1024;

println!("Memory growth: {} MB", growth_mb);
assert!(growth_mb < 100); // Less than 100MB growth
}

↑ Back to Top


Failure Scenario Testing​

1. Monitor Unavailability​

#[tokio::test]
async fn test_monitor_down_scenario() {
// Start without monitor
let output = Command::new("codi2")
.args(&["log", "Monitor is down"])
.output()
.await
.unwrap();

assert!(output.status.success());
assert!(String::from_utf8_lossy(&output.stdout)
.contains("Log saved locally"));

// Verify local storage
let conn = Connection::open(".codi/db/codi.db").unwrap();
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM logs WHERE message = 'Monitor is down'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(count, 1);
}

2. Connection Loss​

#[tokio::test]
async fn test_connection_loss_recovery() {
let mut test_env = TestEnvironment::new().await;

// Send initial message
test_env.send_log("Before disconnect").await;

// Kill monitor
test_env.stop_monitor().await;

// Should continue working
test_env.send_log("During disconnect").await;

// Restart monitor
test_env.start_monitor().await;
tokio::time::sleep(Duration::from_secs(1)).await;

// Should reconnect and sync
test_env.send_log("After reconnect").await;

// Verify all messages eventually synced
let synced = test_env.get_synced_logs().await;
assert_eq!(synced.len(), 3);
}

3. Resource Exhaustion​

#[tokio::test]
async fn test_disk_full_handling() {
// Create small ramdisk to simulate full disk
let ramdisk = create_ramdisk(10 * 1024 * 1024); // 10MB
let test_env = TestEnvironment::with_path(&ramdisk).await;

// Fill disk with logs
let mut count = 0;
loop {
match test_env.send_log(&format!("Message {}", count)).await {
Ok(_) => count += 1,
Err(e) if e.to_string().contains("No space left") => break,
Err(e) => panic!("Unexpected error: {}", e),
}
}

// Should handle gracefully
assert!(count > 0);
println!("Handled {} messages before disk full", count);

// Verify error reporting
let output = Command::new("codi2")
.args(&["status"])
.output()
.await
.unwrap();

assert!(String::from_utf8_lossy(&output.stdout)
.contains("Storage warning"));
}

↑ Back to Top


Environment Testing​

1. Local vs Container Comparison​

#[test]
fn test_environment_detection() {
// Test detection logic
std::env::set_var("KUBERNETES_SERVICE_HOST", "10.0.0.1");
assert_eq!(detect_environment(), Environment::Kubernetes);
std::env::remove_var("KUBERNETES_SERVICE_HOST");

// Docker detection
let dockerenv = Path::new("/.dockerenv");
if dockerenv.exists() {
assert_eq!(detect_environment(), Environment::Docker);
} else {
assert_eq!(detect_environment(), Environment::Local);
}
}

2. Container Integration​

# docker-compose.test.yml
version: '3.8'
services:
codi-monitor:
build:
context: .
dockerfile: Dockerfile.monitor
environment:
CODI_MONITOR_MODE: tcp
CODI_MONITOR_PORT: 9847
volumes:
- codi-data:/var/lib/codi

codi2-test:
build:
context: .
dockerfile: dockerfile.test
depends_on:
- codi-monitor
environment:
CODI_MONITOR_HOST: codi-monitor
CODI_MONITOR_PORT: 9847
command: cargo test --test container_integration

volumes:
codi-data:

3. Multi-Platform Tests​

#[cfg_attr(not(unix), ignore)]
#[test]
fn test_unix_socket_permissions() {
use std::os::unix::fs::PermissionsExt;

let socket_path = "/tmp/test-codi.sock";
// Create socket file
File::create(socket_path).unwrap();

// Set secure permissions
std::fs::set_permissions(
socket_path,
Permissions::from_mode(0o600),
).unwrap();

// Verify
let metadata = std::fs::metadata(socket_path).unwrap();
let mode = metadata.permissions().mode();
assert_eq!(mode & 0o777, 0o600);
}

↑ Back to Top


Additional Test Scenarios​

1. Network Partition Testing​

#[tokio::test]
async fn test_network_partition() {
let test_env = TestEnvironment::new().await;

// Send initial messages
test_env.send_log("Before partition").await;

// Simulate network partition
test_env.block_tcp_port(9847).await;

// Messages should queue locally
test_env.send_log("During partition").await;
test_env.send_log("Still partitioned").await;

// Restore connectivity
test_env.unblock_tcp_port(9847).await;

// Wait for reconnection and sync
tokio::time::sleep(Duration::from_secs(5)).await;

// Verify all messages eventually delivered
let logs = test_env.query_monitor_logs().await;
assert_eq!(logs.len(), 3);
}

2. Clock Skew Testing​

#[test]
fn test_clock_skew_handling() {
let mut test_env = TestEnvironment::new_blocking();

// Set clock 1 hour ahead
test_env.set_system_time_offset(Duration::from_secs(3600));

// Log entry with future timestamp
test_env.send_log("Future message").unwrap();

// Reset clock
test_env.reset_system_time();

// Log normal entry
test_env.send_log("Normal message").unwrap();

// Verify both stored with corrected timestamps
let logs = test_env.query_logs_ordered().unwrap();
assert_eq!(logs.len(), 2);

// Timestamps should be monotonic despite skew
assert!(logs[0].timestamp <= logs[1].timestamp);
}

3. Upgrade Compatibility Testing​

#[test]
fn test_version_compatibility() {
// Test old client with new monitor
let old_msg = MessageV1 {
id: Uuid::new_v4(),
payload: "old format",
};

let encoded = encode_v1(&old_msg);
let result = decode_current(&encoded);
assert!(result.is_ok(), "Should handle v1 messages");

// Test new client with old monitor
let new_msg = Message {
id: Uuid::new_v4(),
timestamp: Utc::now(),
payload: MessagePayload::LogEntry { /* ... */ },
version: 2,
};

let result = send_to_v1_monitor(&new_msg);
assert!(result.is_ok(), "Should degrade gracefully");
}

↑ Back to Top


Security Testing​

1. Input Validation​

#[test]
fn test_message_injection_prevention() {
let malicious_inputs = vec![
"'; DROP TABLE logs; --",
"../../../etc/passwd",
"\0\0\0\0",
&"x".repeat(10_000_000), // 10MB string
];

for input in malicious_inputs {
let result = validate_log_message(input);
assert!(result.is_err() || result.unwrap() != input);
}
}

2. Authentication Tests​

#[tokio::test]
async fn test_tcp_authentication() {
let test_env = TestEnvironment::new().await;

// Try without auth token
let mut client = TcpStream::connect("127.0.0.1:9847").await.unwrap();
let msg = create_test_message();
let result = send_raw_message(&mut client, &msg).await;

assert!(result.is_err() ||
result.unwrap().contains("Authentication required"));

// With valid token
let token = test_env.generate_auth_token();
let mut auth_client = MonitorClient::connect_with_auth(token).await.unwrap();
assert!(auth_client.send(msg).await.is_ok());
}

↑ Back to Top


Acceptance Criteria​

Functional Requirements​

IDRequirementTest Coverage
F1CODI2 commands work without monitor✓ Standalone tests
F2Automatic reconnection✓ Recovery tests
F3Export file detection < 500ms✓ Performance tests
F4Log shipping to Server Hub✓ Integration tests
F5Session tracking✓ Workflow tests

Non-Functional Requirements​

IDRequirementTest Coverage
P1IPC latency < 1ms✓ Benchmarks
P2100k logs/second✓ Throughput tests
P3Memory < 512MB✓ Resource tests
S1Secure IPC✓ Security tests
S2Input validation✓ Injection tests

Test Execution Matrix​

#!/bin/bash
# run-all-tests.sh

echo "Running CODI2-Monitor Integration Tests"

# Unit tests
cargo test --package codi2 --lib
cargo test --package codi-monitor --lib

# Integration tests
cargo test --test integration

# Performance benchmarks
cargo bench --bench ipc_performance

# Container tests
docker-compose -f docker-compose.test.yml up --abort-on-container-exit

# Security scan
cargo audit
cargo clippy -- -D warnings

# Coverage report
cargo tarpaulin --out Html --output-dir coverage/

↑ Back to Top


Performance Baselines​

Established Baselines​

Based on prototype testing and benchmarks:

MetricBaselineTargetNotes
IPC Latency (Unix)0.15ms< 1msp99 latency
IPC Latency (TCP)0.8ms< 5msLocal network
Log Write0.05ms< 0.1msSQLite WAL mode
Export Detection50ms< 100msinotify event
Batch Sync2.3s< 5s1000 entries
Memory Usage42MB< 512MBIdle state
CPU Usage0.1%< 5%Idle monitoring

Baseline Test Suite​

// benches/baselines.rs
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn establish_baselines(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new().unwrap();

c.bench_function("baseline_ipc_unix", |b| {
b.iter(|| {
runtime.block_on(async {
let msg = create_minimal_message();
let start = Instant::now();
send_unix_message(black_box(msg)).await;
start.elapsed()
})
})
});

c.bench_function("baseline_sqlite_wal", |b| {
let conn = setup_wal_connection();
b.iter(|| {
write_log_entry(&conn, black_box("bench"))
})
});
}

criterion_group!(
name = baselines;
config = Criterion::default().sample_size(1000);
targets = establish_baselines
);

Test Automation​

CI/CD Pipeline​

# .github/workflows/integration-tests.yml
name: CODI2-Monitor Integration Tests

on:
push:
paths:
- 'codi2/**'
- 'codi-monitor/**'
- 'codi-common/**'
pull_request:

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Setup Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: rustfmt, clippy

- name: Run tests
run: ./scripts/run-all-tests.sh

- name: Upload coverage
uses: codecov/codecov-action@v3
with:
files: ./coverage/tarpaulin-report.html

Test Reports​

Generate comprehensive test report:

// tests/common/reporting.rs
pub fn generate_test_report(results: TestResults) -> String {
format!(
r#"
# CODI2-Monitor Integration Test Report

Generated: {}

## Summary
- Total Tests: {}
- Passed: {}
- Failed: {}
- Skipped: {}
- Coverage: {:.1}%

## Performance Baselines
- IPC Latency: {:.3}ms (target: <1ms)
- Throughput: {:.0} msg/s (target: >50k)
- Memory Usage: {}MB (limit: 512MB)

## Failure Scenarios
- Monitor Down: {}
- Connection Loss: {}
- Resource Exhaustion: {}

## Recommendations
{}
"#,
Utc::now().format("%Y-%m-%d %H:%M:%S UTC"),
results.total,
results.passed,
results.failed,
results.skipped,
results.coverage * 100.0,
results.ipc_latency_ms,
results.throughput,
results.memory_mb,
if results.monitor_down_passed { "✓" } else { "✗" },
if results.connection_loss_passed { "✓" } else { "✗" },
if results.resource_exhaustion_passed { "✓" } else { "✗" },
generate_recommendations(&results),
)
}

↑ Back to Top


Approval Signatures​

RoleNameDateSignature
QA LeadPending
Development LeadPending
DevOps LeadPending
Security LeadPending

Version History​

VersionDateChangesAuthor
1.0.02025-09-28Initial test strategyORCHESTRATOR-SESSION-2025-09-27
1.1.02025-09-28Added network partition, clock skew, upgrade compatibility tests, and performance baselines per QA reviewORCHESTRATOR-SESSION-2025-09-27

↑ Back to Top