ADR-031: CODI2 and Monitor Service Integration - Part 2 (Technical)

Document Specification Block

Document: ADR-031-v4-codi2-monitor-integration-part2-technical
Version: 1.1.0
Purpose: Technical implementation blueprint for CODI2 and codi-monitor integration
Audience: Software Engineers, DevOps, System Architects
Date Created: 2025-09-28
Date Modified: 2025-09-28
Date Released: 2025-09-28
Status: DRAFT
QA Reviewed: 2025-09-28 - APPROVED WITH MINOR REVISIONS

Technical Overview

The CODI2-Monitor integration creates a client-server architecture where CODI2 acts as the CLI client and codi-monitor provides background monitoring services. Communication occurs via Unix domain sockets (local) or TCP (containers), with shared SQLite storage for resilience.

Key Technologies

  • Language: Rust 1.73+
  • IPC: Unix domain sockets / TCP
  • Storage: SQLite 3.40+ with WAL mode
  • Serialization: MessagePack for efficiency
  • Framework: Tokio for async operations
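On the wire, every message is a 4-byte little-endian length prefix followed by a MessagePack body, as implemented in Core Components below. The framing itself can be sketched with only the standard library; the helper names are illustrative and synchronous I/O stands in for the Tokio streams used in practice:

```rust
// Sketch of the length-prefixed framing used on the IPC channel: a
// little-endian u32 byte count, then the MessagePack payload.
// `write_frame`/`read_frame` are illustrative names, not part of the codebase.
use std::io::{self, Read, Write};

const MAX_FRAME: usize = 1024 * 1024; // mirror the server's 1 MB cap

fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    let len = payload.len() as u32;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(payload)
}

fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_le_bytes(len_buf) as usize;
    if len > MAX_FRAME {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() -> io::Result<()> {
    let mut buf = Vec::new();
    write_frame(&mut buf, b"hello")?; // 4-byte prefix + 5 payload bytes
    assert_eq!(buf.len(), 9);
    let round_trip = read_frame(&mut buf.as_slice())?;
    assert_eq!(round_trip, b"hello");
    Ok(())
}
```

Because the prefix is fixed-width, a reader never has to scan for delimiters, which keeps the hot read path to two `read_exact` calls per message.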

System Architecture

Component Architecture

(Component diagram not captured in this extract.)

Sequence Diagram: Log Command Flow

(Sequence diagram not captured in this extract.)


Core Components

1. IPC Protocol Definition

// codi-common/src/protocol.rs
use serde::{Serialize, Deserialize};
use uuid::Uuid;
use std::path::PathBuf;
use chrono::{DateTime, Utc};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Message {
    pub id: Uuid,
    pub timestamp: DateTime<Utc>,
    pub payload: MessagePayload,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum MessagePayload {
    // Commands from CODI2
    LogEntry {
        level: LogLevel,
        component: String,
        message: String,
        metadata: Option<serde_json::Value>,
    },
    StartMonitoring {
        path: PathBuf,
        recursive: bool,
        patterns: Vec<String>,
    },
    StopMonitoring {
        path: PathBuf,
    },
    SessionStart {
        session_id: String,
        identity: String,
        agent_type: Option<String>,
    },
    SessionEnd {
        session_id: String,
    },
    QueryLogs {
        filter: LogFilter,
        limit: usize,
    },

    // Responses from Monitor
    Acknowledgment {
        request_id: Uuid,
        status: AckStatus,
    },
    LogResults {
        entries: Vec<LogEntry>,
        total_count: usize,
    },
    Error {
        code: ErrorCode,
        message: String,
        details: Option<serde_json::Value>,
    },
    Status {
        components: Vec<ComponentStatus>,
        uptime: std::time::Duration,
    },
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum AckStatus {
    Accepted,
    Queued,
    Rejected(String),
}

2. CODI2 Monitor Client

// codi2/src/monitor_client.rs
use tokio::net::{UnixStream, TcpStream};
use tokio::io::AsyncWriteExt;
use anyhow::{Result, Context};
use codi_common::protocol::Message; // crate name inferred from codi-common/

pub struct MonitorClient {
    connection: Option<Connection>,
}

enum Connection {
    Unix(UnixStream),
    Tcp(TcpStream),
}

impl MonitorClient {
    pub async fn connect() -> Result<Self> {
        // Try Unix socket first (fastest for local)
        if let Ok(conn) = Self::try_unix_socket().await {
            return Ok(Self { connection: Some(conn) });
        }

        // Try TCP (containers)
        if let Ok(conn) = Self::try_tcp_socket().await {
            return Ok(Self { connection: Some(conn) });
        }

        // Fall back to standalone mode
        Ok(Self { connection: None })
    }

    async fn try_unix_socket() -> Result<Connection> {
        let socket_path = "/tmp/codi-monitor.sock";
        let stream = UnixStream::connect(socket_path)
            .await
            .context("Failed to connect to Unix socket")?;
        Ok(Connection::Unix(stream))
    }

    async fn try_tcp_socket() -> Result<Connection> {
        let addr = "127.0.0.1:9847";
        let stream = TcpStream::connect(addr)
            .await
            .context("Failed to connect to TCP socket")?;
        Ok(Connection::Tcp(stream))
    }

    pub async fn send(&mut self, message: Message) -> Result<()> {
        let Some(ref mut conn) = self.connection else {
            // Standalone mode - no error
            return Ok(());
        };

        // Serialize with MessagePack
        let data = rmp_serde::to_vec(&message)?;
        let len = data.len() as u32;

        // Write length-prefixed message
        match conn {
            Connection::Unix(stream) => {
                stream.write_all(&len.to_le_bytes()).await?;
                stream.write_all(&data).await?;
            }
            Connection::Tcp(stream) => {
                stream.write_all(&len.to_le_bytes()).await?;
                stream.write_all(&data).await?;
            }
        }

        Ok(())
    }

    pub fn is_connected(&self) -> bool {
        self.connection.is_some()
    }
}

3. Monitor IPC Server

// codi-monitor/src/ipc_server.rs
use tokio::net::{UnixListener, TcpListener};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::mpsc;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use tracing::{info, error};
use codi_common::protocol::Message; // crate name inferred from codi-common/

pub struct IpcServer {
    unix_path: Option<String>,
    tcp_port: Option<u16>,
    message_tx: mpsc::Sender<Message>,
}

impl IpcServer {
    pub async fn start(self: Arc<Self>) -> Result<()> {
        let mut tasks = vec![];

        // Start Unix socket listener
        if let Some(path) = &self.unix_path {
            let server = self.clone();
            let path = path.clone();
            tasks.push(tokio::spawn(async move {
                server.listen_unix(&path).await
            }));
        }

        // Start TCP listener
        if let Some(port) = self.tcp_port {
            let server = self.clone();
            tasks.push(tokio::spawn(async move {
                server.listen_tcp(port).await
            }));
        }

        // Wait for any listener to fail
        futures::future::try_join_all(tasks).await?;
        Ok(())
    }

    async fn listen_unix(&self, path: &str) -> Result<()> {
        // Remove any stale socket file from a previous run
        let _ = std::fs::remove_file(path);

        let listener = UnixListener::bind(path)?;
        info!("IPC server listening on Unix socket: {}", path);

        loop {
            let (stream, _) = listener.accept().await?;
            let tx = self.message_tx.clone();

            tokio::spawn(async move {
                if let Err(e) = handle_connection(stream, tx).await {
                    error!("Connection error: {}", e);
                }
            });
        }
    }

    async fn listen_tcp(&self, port: u16) -> Result<()> {
        let addr = format!("0.0.0.0:{}", port);
        let listener = TcpListener::bind(&addr).await?;
        info!("IPC server listening on TCP: {}", addr);

        loop {
            let (stream, addr) = listener.accept().await?;
            let tx = self.message_tx.clone();

            tokio::spawn(async move {
                info!("New TCP connection from: {}", addr);
                if let Err(e) = handle_connection(stream, tx).await {
                    error!("Connection error from {}: {}", addr, e);
                }
            });
        }
    }
}

async fn handle_connection<S>(mut stream: S, tx: mpsc::Sender<Message>) -> Result<()>
where
    S: AsyncReadExt + AsyncWriteExt + Unpin,
{
    let mut len_buf = [0u8; 4];

    loop {
        // Read the little-endian length prefix
        stream.read_exact(&mut len_buf).await?;
        let len = u32::from_le_bytes(len_buf) as usize;

        // Limit message size to prevent memory exhaustion
        if len > 1024 * 1024 { // 1MB max
            return Err(anyhow!("Message too large: {}", len));
        }

        // Read message data
        let mut data = vec![0u8; len];
        stream.read_exact(&mut data).await?;

        // Deserialize message
        let message: Message = rmp_serde::from_slice(&data)?;

        // Send to processing queue
        tx.send(message).await?;
    }
}

4. Shared Storage Schema

-- SQLite schema for shared storage
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;

CREATE TABLE IF NOT EXISTS logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp INTEGER NOT NULL,
    level TEXT NOT NULL,
    component TEXT NOT NULL,
    session_id TEXT,
    message TEXT NOT NULL,
    metadata TEXT, -- JSON
    synced BOOLEAN DEFAULT FALSE,
    sync_timestamp INTEGER,
    created_at INTEGER DEFAULT (strftime('%s', 'now'))
);

CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_logs_session ON logs(session_id);
CREATE INDEX IF NOT EXISTS idx_logs_synced ON logs(synced);

CREATE TABLE IF NOT EXISTS sessions (
    id TEXT PRIMARY KEY,
    identity TEXT NOT NULL,
    agent_type TEXT,
    started_at INTEGER NOT NULL,
    ended_at INTEGER,
    metadata TEXT -- JSON
);

CREATE TABLE IF NOT EXISTS monitored_paths (
    path TEXT PRIMARY KEY,
    recursive BOOLEAN DEFAULT TRUE,
    patterns TEXT, -- JSON array
    active BOOLEAN DEFAULT TRUE,
    created_at INTEGER DEFAULT (strftime('%s', 'now'))
);

CREATE TABLE IF NOT EXISTS export_archive (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    original_path TEXT NOT NULL,
    archive_path TEXT NOT NULL,
    session_id TEXT,
    file_size INTEGER,
    processed_at INTEGER DEFAULT (strftime('%s', 'now'))
);

CREATE TABLE IF NOT EXISTS sync_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    batch_id TEXT NOT NULL,
    payload TEXT NOT NULL, -- MessagePack blob
    retry_count INTEGER DEFAULT 0,
    next_retry INTEGER,
    created_at INTEGER DEFAULT (strftime('%s', 'now'))
);

Data Models

Configuration Model

// Shared configuration structure
use std::path::PathBuf;
use std::time::Duration;
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CodiConfig {
    pub general: GeneralConfig,
    pub storage: StorageConfig,
    pub monitor: MonitorConfig,
    pub server_hub: ServerHubConfig,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GeneralConfig {
    pub workspace: PathBuf,
    pub log_level: String,
    pub session_timeout: Duration,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StorageConfig {
    pub sqlite_path: PathBuf,
    pub wal_mode: bool,
    pub cache_size_mb: usize,
    pub retention_days: u32,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MonitorConfig {
    pub unix_socket_path: Option<String>,
    pub tcp_port: Option<u16>,
    pub file_batch_size: usize,
    pub export_patterns: Vec<String>,
    pub scan_interval: Duration,
}
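A configuration file matching these structures might look as follows. All paths and values are illustrative, the `[server_hub]` section is omitted because `ServerHubConfig` is defined elsewhere, and the human-readable durations assume a humantime-style serde adapter:

```toml
[general]
workspace = "/var/lib/codi"
log_level = "info"
session_timeout = "30m"

[storage]
sqlite_path = "/var/lib/codi/codi.db"
wal_mode = true
cache_size_mb = 128
retention_days = 30

[monitor]
unix_socket_path = "/tmp/codi-monitor.sock"
tcp_port = 9847
file_batch_size = 100
export_patterns = ["*.json", "*.md"]
scan_interval = "5s"
```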

Security Considerations

1. IPC Security

// Unix socket permissions: restrict to owner read/write
use std::os::unix::fs::PermissionsExt;

std::fs::set_permissions(
    socket_path,
    std::fs::Permissions::from_mode(0o600),
)?;

// TCP authentication token
pub struct AuthToken {
    token: String,
    expires_at: DateTime<Utc>,
}

2. Message Validation

  • Size limits to prevent DoS
  • Schema validation for all messages
  • Rate limiting per connection
  • Input sanitization
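Of the validation layers above, per-connection rate limiting is the easiest to get subtly wrong. A minimal sketch of a token bucket, using only the standard library; the names and limits are illustrative, not the shipped implementation:

```rust
// Token-bucket rate limiter: a bucket holds up to `capacity` tokens and
// refills continuously at `refill_per_sec`; each message costs one token.
use std::time::Instant;

struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(capacity: f64, refill_per_sec: f64) -> Self {
        Self { capacity, tokens: capacity, refill_per_sec, last_refill: Instant::now() }
    }

    /// Returns true if the message may be processed, false if rate-limited.
    fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.last_refill = now;
        // Refill proportionally to elapsed time, capped at capacity.
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

fn main() {
    // Allow bursts of 5 messages, refilling at 1 msg/s.
    let mut bucket = TokenBucket::new(5.0, 1.0);
    let allowed = (0..6).filter(|_| bucket.try_acquire()).count();
    assert_eq!(allowed, 5); // the sixth back-to-back message is rejected
}
```

A bucket per accepted connection (owned by the `handle_connection` task) would avoid any shared-state locking on the hot path.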

3. Storage Security

  • SQLite encryption at rest (SQLCipher)
  • Secure file permissions
  • Audit trail for all operations

Error Recovery Patterns

1. Connection Recovery

use std::time::Duration;
use tracing::{warn, error};

impl MonitorClient {
    async fn send_with_retry(&mut self, message: Message) -> Result<()> {
        let mut retries = 0;
        const MAX_RETRIES: u32 = 3;
        const BACKOFF_BASE: u64 = 100; // milliseconds

        loop {
            match self.send_internal(&message).await {
                Ok(()) => return Ok(()),
                Err(e) if retries < MAX_RETRIES => {
                    warn!("Send failed (attempt {}): {}", retries + 1, e);

                    // Exponential backoff
                    let delay = BACKOFF_BASE * 2u64.pow(retries);
                    tokio::time::sleep(Duration::from_millis(delay)).await;

                    // Try to reconnect
                    if let Err(reconnect_err) = self.reconnect().await {
                        error!("Reconnection failed: {}", reconnect_err);
                    }

                    retries += 1;
                }
                Err(e) => {
                    error!("Send failed after {} retries: {}", MAX_RETRIES, e);
                    // Fall back to standalone mode
                    self.connection = None;
                    return Ok(()); // Don't fail, just disconnect
                }
            }
        }
    }
}
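The delay schedule in the loop above (100 ms, 200 ms, 400 ms) is plain doubling. If the retry budget ever grows, the delay should be capped so it cannot overflow or grow unbounded; a small helper, with an assumed 5-second cap:

```rust
// Capped exponential backoff matching the retry loop's doubling schedule.
// The cap value is an assumption for illustration, not a shipped constant.
fn backoff_ms(attempt: u32, base_ms: u64, cap_ms: u64) -> u64 {
    // `min(63)` keeps the shift in range; saturating_mul avoids overflow.
    base_ms.saturating_mul(1u64 << attempt.min(63)).min(cap_ms)
}

fn main() {
    assert_eq!(backoff_ms(0, 100, 5_000), 100);
    assert_eq!(backoff_ms(1, 100, 5_000), 200);
    assert_eq!(backoff_ms(2, 100, 5_000), 400);
    assert_eq!(backoff_ms(10, 100, 5_000), 5_000); // capped
}
```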

2. Storage Recovery

use tokio::fs;
use chrono::Utc;
use tracing::{error, info};

impl StorageManager {
    async fn write_with_recovery(&self, entry: &LogEntry) -> Result<()> {
        // Try primary write
        match self.write_to_sqlite(entry).await {
            Ok(()) => Ok(()),
            Err(e) if self.is_corruption_error(&e) => {
                error!("Database corruption detected: {}", e);
                self.recover_database().await?;
                self.write_to_sqlite(entry).await
            }
            Err(e) if self.is_disk_full(&e) => {
                error!("Disk full: {}", e);
                self.rotate_old_logs().await?;
                self.write_to_sqlite(entry).await
            }
            Err(e) => Err(e),
        }
    }

    async fn recover_database(&self) -> Result<()> {
        // Move the corrupted database aside
        let backup_path = format!(
            "{}.corrupt.{}",
            self.db_path,
            Utc::now().timestamp()
        );
        fs::rename(&self.db_path, &backup_path).await?;

        // Create new database
        self.initialize_schema().await?;

        // Attempt to recover data from the corrupted copy
        if let Ok(recovered) = self.recover_from_backup(&backup_path).await {
            info!("Recovered {} entries from corrupted database", recovered);
        }

        Ok(())
    }
}

3. Monitor Process Recovery

use std::time::Duration;
use anyhow::{Context, Result};
use tokio::process::Command;
use tracing::{warn, error};

// Supervisor process that monitors the monitor
async fn monitor_supervisor() -> Result<()> {
    loop {
        // Check if monitor is running
        match check_monitor_health().await {
            Ok(HealthStatus::Healthy) => {
                // All good, check again later
                tokio::time::sleep(Duration::from_secs(30)).await;
            }
            Ok(HealthStatus::Degraded(reason)) => {
                warn!("Monitor degraded: {}", reason);
                // Attempt repair
                repair_monitor(&reason).await?;
            }
            Err(e) => {
                error!("Monitor health check failed: {}", e);
                // Restart monitor
                restart_monitor().await?;
            }
        }
    }
}

async fn restart_monitor() -> Result<()> {
    // Terminate the existing process gracefully
    if let Ok(pid) = read_pid_file().await {
        let _ = nix::sys::signal::kill(
            nix::unistd::Pid::from_raw(pid),
            nix::sys::signal::SIGTERM,
        );
        tokio::time::sleep(Duration::from_secs(5)).await;
    }

    // Start new instance
    Command::new("codi-monitor")
        .spawn()
        .context("Failed to restart monitor")?;

    Ok(())
}

Performance Requirements

Latency Targets

| Operation | Target | Maximum |
| --- | --- | --- |
| IPC Round-trip | < 1ms | 10ms |
| Log Write | < 0.1ms | 1ms |
| Export Detection | < 100ms | 500ms |
| Batch Sync | < 5s | 30s |

Throughput Requirements

  • 100,000 log entries/second
  • 10,000 file events/second
  • 1,000 concurrent CODI2 clients
  • 100MB/s sustained write rate

Resource Constraints

[resource_limits]
memory_mb = 512
cpu_cores = 2
sqlite_cache_mb = 128
connection_pool_size = 100

Deployment Configuration

Docker Configuration

# Dockerfile for codi-monitor
FROM rust:1.73-slim as builder

WORKDIR /build
COPY Cargo.toml Cargo.lock ./
COPY src ./src

RUN cargo build --release

FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /build/target/release/codi-monitor /usr/local/bin/

# Create necessary directories
RUN mkdir -p /var/lib/codi /var/run/codi

# TCP mode for containers
ENV CODI_MONITOR_MODE=tcp
ENV CODI_MONITOR_PORT=9847

EXPOSE 9847

ENTRYPOINT ["codi-monitor"]

Systemd Service

[Unit]
Description=CODI Monitor Service
After=network.target

[Service]
Type=simple
User=coditect
Group=coditect
ExecStart=/usr/local/bin/codi-monitor
Restart=always
RestartSec=10

# Security
NoNewPrivileges=true
PrivateTmp=true
ProtectHome=true
ReadWritePaths=/var/lib/codi

# Resource limits
MemoryMax=512M
CPUQuota=200%

[Install]
WantedBy=multi-user.target

Monitoring & Observability

Metrics Exposed

// Prometheus metrics (Counter::new etc. are fallible, so wrap in lazily-initialized statics)
use once_cell::sync::Lazy;
use prometheus::{Counter, Gauge, Histogram, HistogramOpts};

static LOG_COUNTER: Lazy<Counter> = Lazy::new(|| Counter::new("codi_logs_total", "Total log entries").unwrap());
static IPC_DURATION: Lazy<Histogram> = Lazy::new(|| Histogram::with_opts(HistogramOpts::new("codi_ipc_duration_seconds", "IPC round-trip latency")).unwrap());
static EXPORT_COUNTER: Lazy<Counter> = Lazy::new(|| Counter::new("codi_exports_processed_total", "Exports processed").unwrap());
static SYNC_LAG: Lazy<Gauge> = Lazy::new(|| Gauge::new("codi_sync_lag_seconds", "Server-hub sync lag").unwrap());

Health Check Endpoint

// Axum-style handler; UPTIME and collect_component_health are defined
// elsewhere in the service.
use std::collections::HashMap;
use axum::Json;
use serde::Serialize;

#[derive(Serialize)]
struct HealthStatus {
    status: &'static str,
    version: &'static str,
    uptime_seconds: u64,
    components: HashMap<String, ComponentHealth>,
}

pub async fn health_check() -> Json<HealthStatus> {
    Json(HealthStatus {
        status: "healthy",
        version: env!("CARGO_PKG_VERSION"),
        uptime_seconds: UPTIME.elapsed().as_secs(),
        components: collect_component_health().await,
    })
}

Approval Signatures

| Role | Name | Date | Signature |
| --- | --- | --- | --- |
| Technical Lead | Pending | | |
| Security Architect | Pending | | |
| DevOps Lead | Pending | | |
| QA Lead | Pending | | |

Version History​

VersionDateChangesAuthor
1.0.02025-09-28Initial technical specificationORCHESTRATOR-SESSION-2025-09-27
1.1.02025-09-28Added error recovery patterns section per QA reviewORCHESTRATOR-SESSION-2025-09-27
