ADR-031: CODI2 and Monitor Service Integration - Part 2 (Technical)
Document Specification Block
Document: ADR-031-v4-codi2-monitor-integration-part2-technical
Version: 1.1.0
Purpose: Technical implementation blueprint for CODI2 and codi-monitor integration
Audience: Software Engineers, DevOps, System Architects
Date Created: 2025-09-28
Date Modified: 2025-09-28
Date Released: 2025-09-28
Status: DRAFT
QA Reviewed: 2025-09-28 - APPROVED WITH MINOR REVISIONS
Table of Contents
- Technical Overview
- System Architecture
- Core Components
- Integration Protocol
- Implementation Details
- Data Models
- Security Considerations
- Error Recovery Patterns
- Performance Requirements
- Deployment Configuration
- Monitoring & Observability
- Approval Signatures
- Version History
Technical Overview
The CODI2-Monitor integration creates a client-server architecture where CODI2 acts as the CLI client and codi-monitor provides background monitoring services. Communication occurs via Unix domain sockets (local) or TCP (containers), with shared SQLite storage for resilience.
Key Technologies
- Language: Rust 1.73+
- IPC: Unix domain sockets / TCP
- Storage: SQLite 3.40+ with WAL mode
- Serialization: MessagePack for efficiency
- Framework: Tokio for async operations
System Architecture
Component Architecture
Sequence Diagram: Log Command Flow
Core Components
1. IPC Protocol Definition
// codi-common/src/protocol.rs
use serde::{Serialize, Deserialize};
use uuid::Uuid;
use std::path::PathBuf;
use chrono::{DateTime, Utc};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Message {
pub id: Uuid,
pub timestamp: DateTime<Utc>,
pub payload: MessagePayload,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum MessagePayload {
// Commands from CODI2
LogEntry {
level: LogLevel,
component: String,
message: String,
metadata: Option<serde_json::Value>,
},
StartMonitoring {
path: PathBuf,
recursive: bool,
patterns: Vec<String>,
},
StopMonitoring {
path: PathBuf,
},
SessionStart {
session_id: String,
identity: String,
agent_type: Option<String>,
},
SessionEnd {
session_id: String,
},
QueryLogs {
filter: LogFilter,
limit: usize,
},
// Responses from Monitor
Acknowledgment {
request_id: Uuid,
status: AckStatus,
},
LogResults {
entries: Vec<LogEntry>,
total_count: usize,
},
Error {
code: ErrorCode,
message: String,
details: Option<serde_json::Value>,
},
Status {
components: Vec<ComponentStatus>,
uptime: std::time::Duration,
},
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum LogLevel {
Trace,
Debug,
Info,
Warn,
Error,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum AckStatus {
Accepted,
Queued,
Rejected(String),
}
2. CODI2 Monitor Client
// codi2/src/monitor_client.rs
use tokio::net::{UnixStream, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use anyhow::{Result, Context};
use codi_common::protocol::Message;
pub struct MonitorClient {
connection: Option<Connection>,
}
enum Connection {
Unix(UnixStream),
Tcp(TcpStream),
}
impl MonitorClient {
pub async fn connect() -> Result<Self> {
// Try Unix socket first (fastest for local)
if let Ok(conn) = Self::try_unix_socket().await {
return Ok(Self { connection: Some(conn) });
}
// Try TCP (containers)
if let Ok(conn) = Self::try_tcp_socket().await {
return Ok(Self { connection: Some(conn) });
}
// Fall back to standalone mode
Ok(Self { connection: None })
}
async fn try_unix_socket() -> Result<Connection> {
let socket_path = "/tmp/codi-monitor.sock";
let stream = UnixStream::connect(socket_path)
.await
.context("Failed to connect to Unix socket")?;
Ok(Connection::Unix(stream))
}
async fn try_tcp_socket() -> Result<Connection> {
let addr = "127.0.0.1:9847";
let stream = TcpStream::connect(addr)
.await
.context("Failed to connect to TCP socket")?;
Ok(Connection::Tcp(stream))
}
pub async fn send(&mut self, message: Message) -> Result<()> {
let Some(ref mut conn) = self.connection else {
// Standalone mode - no error
return Ok(());
};
// Serialize with MessagePack
let data = rmp_serde::to_vec(&message)?;
let len = data.len() as u32;
// Write length-prefixed message
match conn {
Connection::Unix(stream) => {
stream.write_all(&len.to_le_bytes()).await?;
stream.write_all(&data).await?;
}
Connection::Tcp(stream) => {
stream.write_all(&len.to_le_bytes()).await?;
stream.write_all(&data).await?;
}
}
Ok(())
}
pub fn is_connected(&self) -> bool {
self.connection.is_some()
}
}
3. Monitor IPC Server
// codi-monitor/src/ipc_server.rs
use tokio::net::{UnixListener, TcpListener};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use codi_common::protocol::Message;
pub struct IpcServer {
unix_path: Option<String>,
tcp_port: Option<u16>,
message_tx: mpsc::Sender<Message>,
}
impl IpcServer {
pub async fn start(self: Arc<Self>) -> Result<()> {
let mut tasks = vec![];
// Start Unix socket listener
if let Some(path) = &self.unix_path {
let server = self.clone();
let path = path.clone();
tasks.push(tokio::spawn(async move {
server.listen_unix(&path).await
}));
}
// Start TCP listener
if let Some(port) = self.tcp_port {
let server = self.clone();
tasks.push(tokio::spawn(async move {
server.listen_tcp(port).await
}));
}
// Wait for any listener to fail
futures::future::try_join_all(tasks).await?;
Ok(())
}
async fn listen_unix(&self, path: &str) -> Result<()> {
// Remove existing socket file
let _ = std::fs::remove_file(path);
let listener = UnixListener::bind(path)?;
info!("IPC server listening on Unix socket: {}", path);
loop {
let (stream, _) = listener.accept().await?;
let tx = self.message_tx.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, tx).await {
error!("Connection error: {}", e);
}
});
}
}
async fn listen_tcp(&self, port: u16) -> Result<()> {
let addr = format!("0.0.0.0:{}", port);
let listener = TcpListener::bind(&addr).await?;
info!("IPC server listening on TCP: {}", addr);
loop {
let (stream, addr) = listener.accept().await?;
let tx = self.message_tx.clone();
tokio::spawn(async move {
info!("New TCP connection from: {}", addr);
if let Err(e) = handle_connection(stream, tx).await {
error!("Connection error from {}: {}", addr, e);
}
});
}
}
}
async fn handle_connection<S>(mut stream: S, tx: mpsc::Sender<Message>) -> Result<()>
where
S: AsyncReadExt + AsyncWriteExt + Unpin,
{
let mut len_buf = [0u8; 4];
loop {
// Read message length
stream.read_exact(&mut len_buf).await?;
let len = u32::from_le_bytes(len_buf) as usize;
// Limit message size
if len > 1024 * 1024 { // 1MB max
return Err(anyhow!("Message too large: {}", len));
}
// Read message data
let mut data = vec![0u8; len];
stream.read_exact(&mut data).await?;
// Deserialize message
let message: Message = rmp_serde::from_slice(&data)?;
// Send to processing queue
tx.send(message).await?;
}
}
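The wire format shared by `handle_connection` above and `MonitorClient::send` is a 4-byte little-endian length prefix followed by the MessagePack body. A std-only sketch of that framing logic, treating the payload as opaque bytes (in the real code `rmp_serde` produces them):

```rust
const MAX_FRAME: usize = 1024 * 1024; // 1 MB, matching the server-side limit

/// Prefix a payload with its length as a little-endian u32.
pub fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(4 + payload.len());
    frame.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    frame.extend_from_slice(payload);
    frame
}

/// Split one frame off the front of a buffer, enforcing the size limit.
/// Returns the payload and the number of bytes consumed.
pub fn decode_frame(buf: &[u8]) -> Result<(Vec<u8>, usize), String> {
    if buf.len() < 4 {
        return Err("incomplete length prefix".to_string());
    }
    let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if len > MAX_FRAME {
        return Err(format!("message too large: {}", len));
    }
    if buf.len() < 4 + len {
        return Err("incomplete frame".to_string());
    }
    Ok((buf[4..4 + len].to_vec(), 4 + len))
}
```

Factoring the framing out of the socket code like this makes the size-limit and short-read paths unit-testable without opening a connection.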
4. Shared Storage Schema
-- SQLite schema for shared storage
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
level TEXT NOT NULL,
component TEXT NOT NULL,
session_id TEXT,
message TEXT NOT NULL,
metadata TEXT, -- JSON
synced BOOLEAN DEFAULT FALSE,
sync_timestamp INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_logs_session ON logs(session_id);
CREATE INDEX IF NOT EXISTS idx_logs_synced ON logs(synced);
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
identity TEXT NOT NULL,
agent_type TEXT,
started_at INTEGER NOT NULL,
ended_at INTEGER,
metadata TEXT -- JSON
);
CREATE TABLE IF NOT EXISTS monitored_paths (
path TEXT PRIMARY KEY,
recursive BOOLEAN DEFAULT TRUE,
patterns TEXT, -- JSON array
active BOOLEAN DEFAULT TRUE,
created_at INTEGER DEFAULT (strftime('%s', 'now'))
);
CREATE TABLE IF NOT EXISTS export_archive (
id INTEGER PRIMARY KEY AUTOINCREMENT,
original_path TEXT NOT NULL,
archive_path TEXT NOT NULL,
session_id TEXT,
file_size INTEGER,
processed_at INTEGER DEFAULT (strftime('%s', 'now'))
);
CREATE TABLE IF NOT EXISTS sync_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
batch_id TEXT NOT NULL,
payload TEXT NOT NULL, -- MessagePack blob
retry_count INTEGER DEFAULT 0,
next_retry INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now'))
);
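As an illustration of how the `synced` flag is expected to be consumed by the sync loop, one possible batch cycle is sketched below. The batch size and exact queries are implementation choices, not part of the schema contract:

```sql
-- Pick up to 500 unsynced entries, oldest first (served by idx_logs_synced).
SELECT id, timestamp, level, component, message
FROM logs
WHERE synced = FALSE
ORDER BY timestamp
LIMIT 500;

-- After the batch is acknowledged upstream, mark those rows synced.
UPDATE logs
SET synced = TRUE,
    sync_timestamp = strftime('%s', 'now')
WHERE id IN (/* ids returned by the SELECT above */);
```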
Data Models
Configuration Model
// Shared configuration structure
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CodiConfig {
pub general: GeneralConfig,
pub storage: StorageConfig,
pub monitor: MonitorConfig,
pub server_hub: ServerHubConfig,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GeneralConfig {
pub workspace: PathBuf,
pub log_level: String,
pub session_timeout: Duration,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StorageConfig {
pub sqlite_path: PathBuf,
pub wal_mode: bool,
pub cache_size_mb: usize,
pub retention_days: u32,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MonitorConfig {
pub unix_socket_path: Option<String>,
pub tcp_port: Option<u16>,
pub file_batch_size: usize,
pub export_patterns: Vec<String>,
pub scan_interval: Duration,
}
Security Considerations
1. IPC Security
// Unix socket permissions: restrict access to the owning user
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(
    socket_path,
    std::fs::Permissions::from_mode(0o600),
)?;
// TCP authentication token
pub struct AuthToken {
token: String,
expires_at: DateTime<Utc>,
}
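For the TCP path, token comparison should not short-circuit on the first differing byte, or an attacker can recover the token through timing. A std-only sketch of a constant-time equality check (production code would more likely reach for a vetted crate such as `subtle`; this is an illustrative assumption, not the prescribed implementation):

```rust
/// Compare two byte strings in time independent of where they differ.
/// The length check still leaks length, so tokens should be fixed-length.
pub fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of every byte pair; zero only if all pairs match.
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}
```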
2. Message Validation
- Size limits to prevent DoS
- Schema validation for all messages
- Rate limiting per connection
- Input sanitization
3. Storage Security
- SQLite encryption at rest (SQLCipher)
- Secure file permissions
- Audit trail for all operations
Error Recovery Patterns
1. Connection Recovery
impl MonitorClient {
async fn send_with_retry(&mut self, message: Message) -> Result<()> {
let mut retries = 0;
const MAX_RETRIES: u32 = 3;
const BACKOFF_BASE: u64 = 100; // milliseconds
loop {
match self.send_internal(&message).await {
Ok(()) => return Ok(()),
Err(e) if retries < MAX_RETRIES => {
warn!("Send failed (attempt {}): {}", retries + 1, e);
// Exponential backoff
let delay = BACKOFF_BASE * 2u64.pow(retries);
tokio::time::sleep(Duration::from_millis(delay)).await;
// Try to reconnect
if let Err(reconnect_err) = self.reconnect().await {
error!("Reconnection failed: {}", reconnect_err);
}
retries += 1;
}
Err(e) => {
error!("Send failed after {} retries: {}", MAX_RETRIES, e);
// Fall back to standalone mode
self.connection = None;
return Ok(()); // Don't fail, just disconnect
}
}
}
}
}
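The backoff schedule inside `send_with_retry` can be factored into a pure function so it is testable in isolation. The sketch below keeps the same constants; the upper cap is an added assumption (not in the original) to keep worst-case delays bounded if the retry limit is ever raised:

```rust
const BACKOFF_BASE_MS: u64 = 100;
const BACKOFF_CAP_MS: u64 = 5_000; // cap is an assumption, not in the original

/// Delay before the given retry attempt: 100ms, 200ms, 400ms, ..., capped.
pub fn backoff_delay_ms(retry: u32) -> u64 {
    BACKOFF_BASE_MS
        .saturating_mul(2u64.saturating_pow(retry))
        .min(BACKOFF_CAP_MS)
}
```

Using `saturating_mul`/`saturating_pow` also removes the overflow panic that `BACKOFF_BASE * 2u64.pow(retries)` would hit for very large retry counts.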
2. Storage Recovery
impl StorageManager {
async fn write_with_recovery(&self, entry: &LogEntry) -> Result<()> {
// Try primary write
match self.write_to_sqlite(entry).await {
Ok(()) => Ok(()),
Err(e) if self.is_corruption_error(&e) => {
error!("Database corruption detected: {}", e);
self.recover_database().await?;
self.write_to_sqlite(entry).await
}
Err(e) if self.is_disk_full(&e) => {
error!("Disk full: {}", e);
self.rotate_old_logs().await?;
self.write_to_sqlite(entry).await
}
Err(e) => Err(e),
}
}
async fn recover_database(&self) -> Result<()> {
// Move corrupted database
let backup_path = format!("{}.corrupt.{}",
self.db_path,
Utc::now().timestamp()
);
fs::rename(&self.db_path, &backup_path).await?;
// Create new database
self.initialize_schema().await?;
// Attempt to recover data from backup
if let Ok(recovered) = self.recover_from_backup(&backup_path).await {
info!("Recovered {} entries from corrupted database", recovered);
}
Ok(())
}
}
3. Monitor Process Recovery
// Supervisor process that monitors the monitor
async fn monitor_supervisor() -> Result<()> {
loop {
// Check if monitor is running
match check_monitor_health().await {
Ok(HealthStatus::Healthy) => {
// All good, check again later
tokio::time::sleep(Duration::from_secs(30)).await;
}
Ok(HealthStatus::Degraded(reason)) => {
warn!("Monitor degraded: {}", reason);
// Attempt repair
repair_monitor(&reason).await?;
}
Err(e) => {
error!("Monitor health check failed: {}", e);
// Restart monitor
restart_monitor().await?;
}
}
}
}
async fn restart_monitor() -> Result<()> {
// Kill existing process
if let Ok(pid) = read_pid_file().await {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid),
nix::sys::signal::SIGTERM
);
tokio::time::sleep(Duration::from_secs(5)).await;
}
// Start new instance
Command::new("codi-monitor")
.spawn()
.context("Failed to restart monitor")?;
Ok(())
}
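`restart_monitor` relies on a `read_pid_file` helper that is not shown above. A minimal std-only sketch of one plausible shape (the pidfile path and single-line format are assumptions for illustration):

```rust
use std::fs;
use std::path::Path;

/// Parse pidfile contents: a single line containing the process id.
pub fn parse_pid(contents: &str) -> Result<i32, String> {
    contents
        .trim()
        .parse::<i32>()
        .map_err(|e| format!("invalid pidfile contents: {}", e))
}

/// Read and parse the pidfile at the given path (e.g. /var/run/codi/monitor.pid).
pub fn read_pid_file(path: &Path) -> Result<i32, String> {
    let contents = fs::read_to_string(path).map_err(|e| e.to_string())?;
    parse_pid(&contents)
}
```

Keeping the string parsing separate from the filesystem read lets the error paths be tested without touching disk.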
Performance Requirements
Latency Targets
| Operation | Target | Maximum |
|---|---|---|
| IPC Round-trip | < 1ms | 10ms |
| Log Write | < 0.1ms | 1ms |
| Export Detection | < 100ms | 500ms |
| Batch Sync | < 5s | 30s |
Throughput Requirements
- 100,000 log entries/second
- 10,000 file events/second
- 1,000 concurrent CODI2 clients
- 100MB/s sustained write rate
Resource Constraints
[resource_limits]
memory_mb = 512
cpu_cores = 2
sqlite_cache_mb = 128
connection_pool_size = 100
Deployment Configuration
Docker Configuration
# Dockerfile for codi-monitor
FROM rust:1.73-slim as builder
WORKDIR /build
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y ca-certificates
COPY --from=builder /build/target/release/codi-monitor /usr/local/bin/
# Create necessary directories
RUN mkdir -p /var/lib/codi /var/run/codi
# TCP mode for containers
ENV CODI_MONITOR_MODE=tcp
ENV CODI_MONITOR_PORT=9847
EXPOSE 9847
ENTRYPOINT ["codi-monitor"]
Systemd Service
[Unit]
Description=CODI Monitor Service
After=network.target
[Service]
Type=simple
User=coditect
Group=coditect
ExecStart=/usr/local/bin/codi-monitor
Restart=always
RestartSec=10
# Security
NoNewPrivileges=true
PrivateTmp=true
ProtectHome=true
ReadWritePaths=/var/lib/codi
# Resource limits
MemoryMax=512M
CPUQuota=200%
[Install]
WantedBy=multi-user.target
Monitoring & Observability
Metrics Exposed
// Prometheus metrics (prometheus crate), lazily initialized and registered at startup
use once_cell::sync::Lazy;
use prometheus::{Counter, Gauge, Histogram, HistogramOpts};
static LOG_COUNTER: Lazy<Counter> = Lazy::new(|| Counter::new("codi_logs_total", "Log entries received").unwrap());
static IPC_DURATION: Lazy<Histogram> = Lazy::new(|| Histogram::with_opts(HistogramOpts::new("codi_ipc_duration_seconds", "IPC round-trip latency")).unwrap());
static EXPORT_COUNTER: Lazy<Counter> = Lazy::new(|| Counter::new("codi_exports_processed_total", "Export files processed").unwrap());
static SYNC_LAG: Lazy<Gauge> = Lazy::new(|| Gauge::new("codi_sync_lag_seconds", "Seconds behind last successful sync").unwrap());
Health Check Endpoint
#[derive(Serialize)]
struct HealthStatus {
status: &'static str,
version: &'static str,
uptime_seconds: u64,
components: HashMap<String, ComponentHealth>,
}
pub async fn health_check() -> Json<HealthStatus> {
Json(HealthStatus {
status: "healthy",
version: env!("CARGO_PKG_VERSION"),
uptime_seconds: UPTIME.elapsed().as_secs(),
components: collect_component_health().await,
})
}
Approval Signatures
| Role | Name | Date | Signature |
|---|---|---|---|
| Technical Lead | Pending | | |
| Security Architect | Pending | | |
| DevOps Lead | Pending | | |
| QA Lead | Pending | | |
Version History
| Version | Date | Changes | Author |
|---|---|---|---|
| 1.0.0 | 2025-09-28 | Initial technical specification | ORCHESTRATOR-SESSION-2025-09-27 |
| 1.1.0 | 2025-09-28 | Added error recovery patterns section per QA review | ORCHESTRATOR-SESSION-2025-09-27 |