Skip to main content

ADR-001-v4: Container Execution Architecture - Part 2: Technical

Document Specification Block​

Document: ADR-001-v4-container-execution-architecture-part2-technical
Version: 3.0.0
Purpose: Technical implementation details for CODITECT's container execution architecture
Audience: Developers, AI agents, technical implementers
Date Created: 2025-08-27
Date Modified: 2025-09-03
Status: UPDATED_FOR_STATEFULSETS
Changes: Replaced ephemeral containers with GKE StatefulSets

Table of Contents​

↑ Back to Top


Implementation Overview​

System Architecture​

Component Dependencies​

[dependencies]
# Existing dependencies from Cargo.toml
actix-web = "4.4"
tokio = { version = "1.36", features = ["full"] }
foundationdb = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.7", features = ["v4", "serde"] }
jsonwebtoken = "9.2"
dashmap = "5.5"
anyhow = "1.0"

# Terminal handling
nix = { version = "0.27", features = ["term", "ioctl"] }
portable-pty = "0.8"
bytes = "1.5"

# WebSocket support
actix-ws = "0.2"
actix-web-actors = "4.3"

# Kubernetes client for StatefulSet management
k8s-openapi = { version = "0.20", features = ["v1_28"] }
kube = { version = "0.87", features = ["client", "runtime", "ws"] }

↑ Back to Top


Architecture Components​

Workspace Manager for StatefulSets​

// src/workspace/statefulset_manager.rs use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec}; use k8s_openapi::api::core::v1::{PersistentVolumeClaim, PersistentVolumeClaimSpec, ResourceRequirements}; use k8s_openapi::apimachinery::pkg::api::resource::Quantity; use kube::{Api, Client, ResourceExt}; use std::collections::BTreeMap;

pub struct workspaceManager { k8s_client: Client, namespace: String, }

impl workspaceManager { pub async fn new() -> Result { let k8s_client = Client::try_default().await?; Ok(Self { k8s_client, namespace: "coditect".to_string(), }) }

pub async fn create_workspace(
&self,
workspace_id: &str,
user_id: &str,
) -> Result<workspaceInfo> {
let statefulset_api: Api<StatefulSet> = Api::namespaced(
self.k8s_client.clone(),
&self.namespace,
);

let statefulset = self.build_statefulset(workspace_id, user_id);
let created = statefulset_api.create(&Default::default(), &statefulset).await?;

Ok(workspaceInfo {
workspace_id: workspace_id.to_string(),
pod_name: format!("{}-0", workspace_id),
status: "Creating".to_string(),
})
}

fn build_statefulset(&self, workspace_id: &str, user_id: &str) -> StatefulSet {
let mut labels = BTreeMap::new();
labels.insert("app".to_string(), "workspace".to_string());
labels.insert("workspace-id".to_string(), workspace_id.to_string());
labels.insert("user-id".to_string(), user_id.to_string());

serde_json::from_value(json!({
"apiVersion": "apps/v1",
"kind": "StatefulSet",
"metadata": {
"name": workspace_id,
"namespace": self.namespace,
"labels": labels,
},
"spec": {
"serviceName": "workspace-service",
"replicas": 1,
"selector": {
"matchLabels": {
"workspace-id": workspace_id,
}
},
"template": {
"metadata": {
"labels": labels,
},
"spec": {
"nodeSelector": {
"cloud.google.com/gke-spot": "true",
},
"containers": [{
"name": "workspace",
"image": "gcr.io/PROJECT_ID/workspace:latest",
"ports": [{
"containerPort": 8080,
"name": "websocket",
}],
"resources": {
"requests": {
"cpu": "1",
"memory": "2Gi",
},
"limits": {
"cpu": "2",
"memory": "4Gi",
},
},
"volumeMounts": [{
"name": "workspace-storage",
"mountPath": "/workspace",
}],
"env": [
{
"name": "WORKSPACE_ID",
"value": workspace_id,
},
{
"name": "USER_ID",
"value": user_id,
},
],
}],
},
},
"volumeClaimTemplates": [{
"metadata": {
"name": "workspace-storage",
},
"spec": {
"accessModes": ["ReadWriteOnce"],
"storageClassName": "standard-rwo",
"resources": {
"requests": {
"storage": "10Gi",
},
},
},
}],
},
})).unwrap()
}

}

WebSocket Gateway (Existing)​

// src/gateway/websocket_server.rs use actix_web::{web, HttpRequest, HttpResponse}; use actix_ws::Message; use dashmap::DashMap; use std::sync::Arc;

pub struct WebSocketGateway { connections: Arc<DashMap<String, Connection>>, terminal_bridge: Arc, file_handler: Arc, session_manager: Arc, jwt_secret: String, }

impl WebSocketGateway { pub fn new(jwt_secret: String, db: Database) -> Self { Self { connections: Arc::new(DashMap::new()), terminal_bridge: Arc::new(terminalBridge::new()), file_handler: Arc::new(FileOperationHandler::new()), session_manager: Arc::new(SessionManager::new(db)), jwt_secret, } }

pub async fn websocket_handler(
&self,
req: HttpRequest,
stream: web::Payload,
) -> Result<HttpResponse, actix_web::Error> {
let (response, session, msg_stream) = actix_ws::handle(&req, stream)?;

// Extract JWT from request
let token = extract_jwt_from_request(&req)?;
let claims = validate_jwt(&token, &self.jwt_secret)?;

// Create connection
let connection = Connection {
id: Uuid::new_v4().to_string(),
workspace_id: claims.workspace_id,
user_id: claims.user_id,
session,
};

let conn_id = connection.id.clone();
self.connections.insert(conn_id.clone(), connection);

// Spawn message handler
let gateway = self.clone();
actix_web::rt::spawn(async move {
gateway.handle_connection(conn_id, msg_stream).await;
});

Ok(response)
}

async fn handle_connection(
&self,
conn_id: String,
mut msg_stream: impl Stream<Item = Result<Message, actix_ws::ProtocolError>>,
) {
while let Some(Ok(msg)) = msg_stream.next().await {
match msg {
Message::Text(text) => {
if let Ok(message) = serde_json::from_str::<GatewayMessage>(&text) {
self.route_message(conn_id.clone(), message).await;
}
}
Message::Close(_) => {
self.connections.remove(&conn_id);
break;
}
_ => {}
}
}
}

}

↑ Back to Top


Terminal Bridge with PTY Fix​

Fixed PTY Implementation​

// src/gateway/terminal_bridge/pty.rs use portable_pty::{native_pty_system, CommandBuilder, PtySize}; use std::sync::Arc; use tokio::sync::Mutex;

pub struct PtyProcess { master: Box<dyn portable_pty::MasterPty + Send>, child: Arc<Mutex<Box<dyn portable_pty::Child + Send + Sync>>>, }

impl PtyProcess { pub fn new(cols: u16, rows: u16) -> Result { let pty_system = native_pty_system(); let pty_size = PtySize { rows, cols, pixel_width: 0, pixel_height: 0, };

    let pair = pty_system.openpty(pty_size)?;
let mut cmd = CommandBuilder::new("bash");
cmd.env("TERM", "xterm-256color");

let child = pair.slave.spawn_command(cmd)?;

Ok(Self {
master: pair.master,
child: Arc::new(Mutex::new(child)),
})
}

// FIXED: Proper write handling without "File exists" error
pub async fn write_input(&mut self, data: &[u8]) -> Result<()> {
let mut writer = self.master.take_writer()?;

// Use non-blocking write with proper error handling
tokio::task::spawn_blocking(move || {
writer.write_all(data)?;
writer.flush()?;
Ok::<_, anyhow::Error>(())
})
.await??;

Ok(())
}

pub async fn read_output(&mut self) -> Result<Vec<u8>> {
let mut reader = self.master.try_clone_reader()?;
let mut buffer = vec![0u8; 4096];

let n = tokio::task::spawn_blocking(move || {
reader.read(&mut buffer)
})
.await??;

buffer.truncate(n);
Ok(buffer)
}

}

// src/gateway/terminal_bridge/bridge.rs pub struct terminalBridge { sessions: Arc<DashMap<String, terminalSession>>, }

impl terminalBridge { pub async fn create_session( &self, workspace_id: &str, cols: u16, rows: u16, ) -> Result { let session_id = Uuid::new_v4().to_string(); let pty = PtyProcess::new(cols, rows)?;

    let session = terminalSession {
id: session_id.clone(),
workspace_id: workspace_id.to_string(),
pty: Arc::new(Mutex::new(pty)),
created_at: Utc::now(),
last_activity: Utc::now(),
};

self.sessions.insert(session_id.clone(), session);
Ok(session_id)
}

pub async fn send_input(
&self,
session_id: &str,
data: &[u8],
) -> Result<()> {
let session = self.sessions.get(session_id)
.ok_or_else(|| anyhow!("Session not found"))?;

let mut pty = session.pty.lock().await;
pty.write_input(data).await?;

// Update last activity
drop(pty);
self.update_activity(session_id);

Ok(())
}

}

↑ Back to Top


FoundationDB Schema​

// src/models/terminal.rs use serde::{Deserialize, Serialize}; use chrono::{DateTime, Utc};

#[derive(Debug, Clone, Serialize, Deserialize)] pub struct terminalSessionState { pub id: String, pub workspace_id: String, pub started_at: DateTime, pub last_active: DateTime, pub shell_type: String, pub working_directory: String, pub environment_vars: HashMap<String, String>, pub command_history: Vec, }

#[derive(Debug, Clone, Serialize, Deserialize)] pub struct terminalOutput { pub session_id: String, pub lines: VecDeque, pub cursor_position: (u16, u16), pub screen_size: (u16, u16), }

// src/db/repositories/terminal_repository.rs use foundationdb::{Database, Transaction};

pub struct terminalRepository { db: Database, }

impl terminalRepository { // Key patterns for multi-tenant isolation fn session_key(workspace_id: &str, session_id: &str) -> String { format!("{}/terminal_sessions/{}", workspace_id, session_id) }

fn output_key(workspace_id: &str, session_id: &str) -> String {
format!("{}/terminal_output/{}", workspace_id, session_id)
}

pub async fn save_session_state(
&self,
state: &terminalSessionState,
) -> Result<()> {
let tr = self.db.create_trx()?;
let key = Self::session_key(&state.workspace_id, &state.id);
let value = serde_json::to_vec(state)?;

tr.set(key.as_bytes(), &value);
tr.commit().await?;

Ok(())
}

pub async fn get_session_state(
&self,
workspace_id: &str,
session_id: &str,
) -> Result<Option<terminalSessionState>> {
let tr = self.db.create_trx()?;
let key = Self::session_key(workspace_id, session_id);

match tr.get(key.as_bytes(), false).await? {
Some(value) => {
let state = serde_json::from_slice(&value)?;
Ok(Some(state))
}
None => Ok(None),
}
}

}

↑ Back to Top


Testing Requirements​

#[cfg(test)]
mod tests {
    use super::*;
    // Required by the sleep below; missing from the original listing.
    use std::time::Duration;

    /// Regression test for the PTY "File exists" write error: repeated
    /// writes to the same PTY must all succeed.
    #[tokio::test]
    async fn test_pty_write_without_error() {
        let mut pty = PtyProcess::new(80, 24).unwrap();

        for i in 0..10 {
            let input = format!("echo test_{}\n", i);
            let result = pty.write_input(input.as_bytes()).await;
            assert!(result.is_ok(), "Write {} should succeed", i);

            // Small pause so the shell can drain between writes.
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    /// Round-trips a session's state through the repository and verifies
    /// the environment survives serialization.
    #[tokio::test]
    async fn test_terminal_session_persistence() {
        let db = setup_test_db().await;
        let repo = terminalRepository::new(db);
        let bridge = terminalBridge::new();

        let workspace_id = "test-workspace";
        let session_id = bridge.create_session(workspace_id, 80, 24).await.unwrap();

        // Execute commands that mutate shell state.
        bridge.send_input(&session_id, b"export FOO=bar\n").await.unwrap();
        bridge.send_input(&session_id, b"cd /tmp\n").await.unwrap();

        // Save a snapshot of the state those commands should have produced.
        let state = terminalSessionState {
            id: session_id.clone(),
            workspace_id: workspace_id.to_string(),
            started_at: Utc::now(),
            last_active: Utc::now(),
            shell_type: "bash".to_string(),
            working_directory: "/tmp".to_string(),
            environment_vars: [("FOO".to_string(), "bar".to_string())].into(),
            command_history: vec!["export FOO=bar".to_string(), "cd /tmp".to_string()],
        };

        repo.save_session_state(&state).await.unwrap();

        // Retrieve and verify the round trip.
        let loaded = repo.get_session_state(workspace_id, &session_id).await.unwrap();
        assert!(loaded.is_some());
        assert_eq!(
            loaded.unwrap().environment_vars.get("FOO"),
            Some(&"bar".to_string())
        );
    }

    /// Ten sessions created and written concurrently must all register
    /// under their own workspace without interfering with each other.
    #[tokio::test]
    async fn test_concurrent_terminal_sessions() {
        let bridge = terminalBridge::new();
        let mut handles = vec![];

        for i in 0..10 {
            let bridge_clone = bridge.clone();
            let handle = tokio::spawn(async move {
                let workspace_id = format!("workspace-{}", i);
                let session_id = bridge_clone
                    .create_session(&workspace_id, 80, 24)
                    .await
                    .unwrap();

                // Send a command unique to this session.
                let cmd = format!("echo session_{}\n", i);
                bridge_clone.send_input(&session_id, cmd.as_bytes()).await.unwrap();

                (workspace_id, session_id)
            });
            handles.push(handle);
        }

        // join_all preserves spawn order, so index i maps to workspace-i.
        let results: Vec<_> = futures::future::join_all(handles).await;
        assert_eq!(results.len(), 10);

        for (i, result) in results.iter().enumerate() {
            let (workspace_id, session_id) = result.as_ref().unwrap();
            assert_eq!(workspace_id, &format!("workspace-{}", i));
            assert!(bridge.sessions.contains_key(session_id));
        }
    }
}

↑ Back to Top


Deployment Configuration​

GKE StatefulSet Configuration​

statefulset-workspace.yaml

# statefulset-workspace.yaml
# Reconstructed from the collapsed listing; structure matches the json!
# template in src/workspace/statefulset_manager.rs.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: workspace-statefulset
  namespace: coditect
spec:
  serviceName: workspace-service
  replicas: 1
  selector:
    matchLabels:
      app: workspace
  template:
    metadata:
      labels:
        app: workspace
    spec:
      nodeSelector:
        cloud.google.com/gke-spot: "true"  # Use Spot instances for cost savings
      containers:
        - name: workspace
          image: gcr.io/PROJECT_ID/workspace:latest
          ports:
            - containerPort: 8080
              name: websocket
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          volumeMounts:
            - name: workspace-storage
              mountPath: /workspace
          env:
            - name: FDB_CLUSTER_FILE
              value: /secrets/fdb.cluster
            - name: WORKSPACE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
  volumeClaimTemplates:
    - metadata:
        name: workspace-storage
      spec:
        accessModes: ["ReadWriteOnce"]
        storageClassName: "standard-rwo"
        resources:
          requests:
            storage: 10Gi
---
# Headless Service for workspace pods (required by the StatefulSet's
# serviceName for stable per-pod DNS).
apiVersion: v1
kind: Service
metadata:
  name: workspace-service
  namespace: coditect
spec:
  clusterIP: None  # Headless service for StatefulSet
  selector:
    app: workspace
  ports:
    - port: 8080
      targetPort: 8080
      name: websocket

↑ Back to Top


Migration Steps​

Phase 1: PTY Fix Deployment​

#!/bin/bash
# deploy-pty-fix.sh — staged rollout of the PTY write fix to Cloud Run.
# Requires: PROJECT_ID in the environment; gcloud and docker authenticated.
set -euo pipefail

# 1. Run tests locally
cargo test terminal_bridge

# 2. Build and push image
docker build -t gcr.io/$PROJECT_ID/websocket-gateway:pty-fix .
docker push gcr.io/$PROJECT_ID/websocket-gateway:pty-fix

# 3. Deploy to staging
gcloud run deploy websocket-gateway-staging \
  --image=gcr.io/$PROJECT_ID/websocket-gateway:pty-fix \
  --region=us-central1 \
  --tag=pty-fix

# 4. Test terminal input
./test-terminal-input.sh staging

# 5. Deploy to production with a 10% canary traffic split
gcloud run deploy websocket-gateway \
  --image=gcr.io/$PROJECT_ID/websocket-gateway:pty-fix \
  --region=us-central1 \
  --tag=pty-fix \
  --traffic=pty-fix=10,LATEST=90

# 6. Monitor for errors
gcloud logging read \
  "resource.type=cloud_run_revision AND resource.labels.service_name=websocket-gateway AND severity>=ERROR" \
  --limit=50

# 7. Full rollout if stable
gcloud run services update-traffic websocket-gateway \
  --to-latest \
  --region=us-central1

Phase 2: Deploy StatefulSet workspaces​

#!/bin/bash
# deploy-statefulset-workspaces.sh — provision a GKE Autopilot cluster and
# deploy the workspace StatefulSet.
# Requires: PROJECT_ID in the environment; gcloud and kubectl authenticated.
set -euo pipefail

# 1. Create GKE Autopilot cluster
gcloud container clusters create-auto coditect-cluster \
  --region=us-central1 \
  --project=$PROJECT_ID

# 2. Create namespace
kubectl create namespace coditect

# 3. Create FoundationDB secret
kubectl create secret generic fdb-cluster \
  --from-file=fdb.cluster \
  --namespace=coditect

# 4. Deploy StatefulSet
kubectl apply -f statefulset-workspace.yaml

# 5. Verify deployment
kubectl get statefulsets -n coditect
kubectl get pvc -n coditect

# ---------------------------------------------------------------------------
# NOTE(review): everything below appears to be a fragment of a SEPARATE
# Cloud Run traffic-migration runbook that was fused into this script during
# document conversion — the original line even had orphaned
# `--max-instances=100 --session-affinity` flags from a truncated
# `gcloud run deploy` command. Reconstructed as-is for reference; confirm
# against the original runbook before executing.
# ---------------------------------------------------------------------------

# 3. Update DNS to split traffic
#    Point 10% of traffic to the Cloud Run endpoint for testing.

# 4. Monitor metrics
gcloud monitoring dashboards create \
  --config-from-file=cloud-run-dashboard.yaml

# 5. Gradually increase traffic
for pct in 25 50 75 100; do
  echo "Shifting $pct% traffic to Cloud Run"
  # Update load balancer weights here.
  sleep 3600  # Monitor for 1 hour between increments
done

# 6. Shut down the old GKE cluster
gcloud container clusters delete websocket-gateway-cluster \
  --region=us-central1

↑ Back to Top


Previous Part​

Previous: See Part 1: Human Narrative for business context and value proposition.

↑ Back to Top