ADR-001-v4: Container Execution Architecture - Part 2: Technical
Document Specification Block
Document: ADR-001-v4-container-execution-architecture-part2-technical
Version: 3.0.0
Purpose: Technical implementation details for CODITECT's container execution architecture
Audience: Developers, AI agents, technical implementers
Date Created: 2025-08-27
Date Modified: 2025-09-03
Status: UPDATED_FOR_STATEFULSETS
Changes: Replaced ephemeral containers with GKE StatefulSets
Table of Contents
- Implementation Overview
- Architecture Components
- WebSocket Gateway Implementation
- Terminal Bridge with PTY Fix
- FoundationDB Schema
- Testing Requirements
- Deployment Configuration
- Migration Steps
- Previous Part
Implementation Overview
System Architecture
Component Dependencies
[dependencies]
# Existing dependencies from Cargo.toml
actix-web = "4.4"
tokio = { version = "1.36", features = ["full"] }
foundationdb = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.7", features = ["v4", "serde"] }
jsonwebtoken = "9.2"
dashmap = "5.5"
anyhow = "1.0"

# Terminal handling
nix = { version = "0.27", features = ["term", "ioctl"] }
portable-pty = "0.8"
bytes = "1.5"

# WebSocket support
actix-ws = "0.2"
actix-web-actors = "4.3"
futures = "0.3"

# Kubernetes client for StatefulSet management
k8s-openapi = { version = "0.20", features = ["v1_28"] }
kube = { version = "0.87", features = ["client", "runtime", "ws"] }
Architecture Components
Workspace Manager for StatefulSets
// src/workspace/statefulset_manager.rs
use std::collections::BTreeMap;

use anyhow::Result;
use k8s_openapi::api::apps::v1::StatefulSet;
use kube::{Api, Client};
use serde_json::json;

pub struct WorkspaceInfo {
    pub workspace_id: String,
    pub pod_name: String,
    pub status: String,
}

pub struct WorkspaceManager {
    k8s_client: Client,
    namespace: String,
}

impl WorkspaceManager {
    pub async fn new() -> Result<Self> {
        // Connect using kubeconfig or in-cluster configuration.
        let k8s_client = Client::try_default().await?;
        Ok(Self {
            k8s_client,
            // Namespace taken from the deployment configuration below.
            namespace: "coditect".to_string(),
        })
    }
pub async fn create_workspace(
&self,
workspace_id: &str,
user_id: &str,
) -> Result<WorkspaceInfo> {
let statefulset_api: Api<StatefulSet> = Api::namespaced(
self.k8s_client.clone(),
&self.namespace,
);
let statefulset = self.build_statefulset(workspace_id, user_id);
statefulset_api.create(&Default::default(), &statefulset).await?;
Ok(WorkspaceInfo {
workspace_id: workspace_id.to_string(),
pod_name: format!("{}-0", workspace_id),
status: "Creating".to_string(),
})
}
fn build_statefulset(&self, workspace_id: &str, user_id: &str) -> StatefulSet {
let mut labels = BTreeMap::new();
labels.insert("app".to_string(), "workspace".to_string());
labels.insert("workspace-id".to_string(), workspace_id.to_string());
labels.insert("user-id".to_string(), user_id.to_string());
serde_json::from_value(json!({
"apiVersion": "apps/v1",
"kind": "StatefulSet",
"metadata": {
"name": workspace_id,
"namespace": self.namespace,
"labels": labels,
},
"spec": {
"serviceName": "workspace-service",
"replicas": 1,
"selector": {
"matchLabels": {
"workspace-id": workspace_id,
}
},
"template": {
"metadata": {
"labels": labels,
},
"spec": {
"nodeSelector": {
"cloud.google.com/gke-spot": "true",
},
"containers": [{
"name": "workspace",
"image": "gcr.io/PROJECT_ID/workspace:latest",
"ports": [{
"containerPort": 8080,
"name": "websocket",
}],
"resources": {
"requests": {
"cpu": "1",
"memory": "2Gi",
},
"limits": {
"cpu": "2",
"memory": "4Gi",
},
},
"volumeMounts": [{
"name": "workspace-storage",
"mountPath": "/workspace",
}],
"env": [
{
"name": "WORKSPACE_ID",
"value": workspace_id,
},
{
"name": "USER_ID",
"value": user_id,
},
],
}],
},
},
"volumeClaimTemplates": [{
"metadata": {
"name": "workspace-storage",
},
"spec": {
"accessModes": ["ReadWriteOnce"],
"storageClassName": "standard-rwo",
"resources": {
"requests": {
"storage": "10Gi",
},
},
},
}],
},
})).expect("StatefulSet manifest is valid")
}
}
WebSocket Gateway (Existing)
// src/gateway/websocket_server.rs
use std::sync::Arc;

use actix_web::{web, HttpRequest, HttpResponse};
use actix_ws::Message;
use dashmap::DashMap;
use futures::{Stream, StreamExt};
use uuid::Uuid;

#[derive(Clone)]
pub struct WebSocketGateway {
    connections: Arc<DashMap<String, Connection>>,
    terminal_bridge: Arc<TerminalBridge>,
    file_handler: Arc<FileOperationHandler>,
    session_manager: Arc<SessionManager>,
    jwt_secret: String,
}

impl WebSocketGateway {
    pub fn new(jwt_secret: String, db: Database) -> Self {
        Self {
            connections: Arc::new(DashMap::new()),
            terminal_bridge: Arc::new(TerminalBridge::new()),
            file_handler: Arc::new(FileOperationHandler::new()),
            session_manager: Arc::new(SessionManager::new(db)),
            jwt_secret,
        }
    }
pub async fn websocket_handler(
&self,
req: HttpRequest,
stream: web::Payload,
) -> Result<HttpResponse, actix_web::Error> {
let (response, session, msg_stream) = actix_ws::handle(&req, stream)?;
// Extract JWT from request
let token = extract_jwt_from_request(&req)?;
let claims = validate_jwt(&token, &self.jwt_secret)?;
// Create connection
let connection = Connection {
id: Uuid::new_v4().to_string(),
workspace_id: claims.workspace_id,
user_id: claims.user_id,
session,
};
let conn_id = connection.id.clone();
self.connections.insert(conn_id.clone(), connection);
// Spawn message handler
let gateway = self.clone();
actix_web::rt::spawn(async move {
gateway.handle_connection(conn_id, msg_stream).await;
});
Ok(response)
}
async fn handle_connection(
&self,
conn_id: String,
mut msg_stream: impl Stream<Item = Result<Message, actix_ws::ProtocolError>> + Unpin,
) {
while let Some(Ok(msg)) = msg_stream.next().await {
match msg {
Message::Text(text) => {
if let Ok(message) = serde_json::from_str::<GatewayMessage>(&text) {
self.route_message(conn_id.clone(), message).await;
}
}
Message::Close(_) => {
self.connections.remove(&conn_id);
break;
}
_ => {}
}
}
}
}
Terminal Bridge with PTY Fix
Fixed PTY Implementation
// src/gateway/terminal_bridge/pty.rs
use std::io::{Read, Write};
use std::sync::Arc;

use anyhow::Result;
use portable_pty::{native_pty_system, CommandBuilder, PtySize};
use tokio::sync::Mutex;

pub struct PtyProcess {
    master: Box<dyn portable_pty::MasterPty + Send>,
    // Taken from the master exactly once; repeated take_writer() calls fail.
    writer: Arc<std::sync::Mutex<Box<dyn Write + Send>>>,
    child: Arc<Mutex<Box<dyn portable_pty::Child + Send + Sync>>>,
}

impl PtyProcess {
    pub fn new(cols: u16, rows: u16) -> Result<Self> {
        let pty_system = native_pty_system();
        let pty_size = PtySize {
            rows,
            cols,
            pixel_width: 0,
            pixel_height: 0,
        };
        let pair = pty_system.openpty(pty_size)?;
        let mut cmd = CommandBuilder::new("bash");
        cmd.env("TERM", "xterm-256color");
        let child = pair.slave.spawn_command(cmd)?;
        // Take the writer up front so every write reuses the same handle.
        let writer = Arc::new(std::sync::Mutex::new(pair.master.take_writer()?));
        Ok(Self {
            master: pair.master,
            writer,
            child: Arc::new(Mutex::new(child)),
        })
    }

    // FIXED: reuse the single writer taken in new() and move an owned copy of
    // the bytes into the blocking task, instead of re-taking the writer on
    // every call (which fails after the first write).
    pub async fn write_input(&mut self, data: &[u8]) -> Result<()> {
        let writer = Arc::clone(&self.writer);
        let data = data.to_vec();
        tokio::task::spawn_blocking(move || {
            let mut writer = writer.lock().unwrap();
            writer.write_all(&data)?;
            writer.flush()?;
            Ok::<_, anyhow::Error>(())
        })
        .await??;
        Ok(())
    }
    pub async fn read_output(&mut self) -> Result<Vec<u8>> {
        let mut reader = self.master.try_clone_reader()?;
        // Read on the blocking pool and return the owned buffer from the
        // task, so it is not used after being moved into the closure.
        let buffer = tokio::task::spawn_blocking(move || {
            let mut buffer = vec![0u8; 4096];
            let n = reader.read(&mut buffer)?;
            buffer.truncate(n);
            Ok::<_, anyhow::Error>(buffer)
        })
        .await??;
        Ok(buffer)
    }
}
// src/gateway/terminal_bridge/bridge.rs
use std::sync::Arc;

use anyhow::{anyhow, Result};
use chrono::Utc;
use dashmap::DashMap;
use tokio::sync::Mutex;
use uuid::Uuid;

#[derive(Clone)]
pub struct TerminalBridge {
    sessions: Arc<DashMap<String, TerminalSession>>,
}

impl TerminalBridge {
    pub async fn create_session(
        &self,
        workspace_id: &str,
        cols: u16,
        rows: u16,
    ) -> Result<String> {
        let session_id = Uuid::new_v4().to_string();
        let pty = PtyProcess::new(cols, rows)?;
        let session = TerminalSession {
id: session_id.clone(),
workspace_id: workspace_id.to_string(),
pty: Arc::new(Mutex::new(pty)),
created_at: Utc::now(),
last_activity: Utc::now(),
};
self.sessions.insert(session_id.clone(), session);
Ok(session_id)
}
pub async fn send_input(
&self,
session_id: &str,
data: &[u8],
) -> Result<()> {
let session = self.sessions.get(session_id)
.ok_or_else(|| anyhow!("Session not found"))?;
let mut pty = session.pty.lock().await;
pty.write_input(data).await?;
// Update last activity
drop(pty);
self.update_activity(session_id);
Ok(())
}
}
FoundationDB Schema
// src/models/terminal.rs
use std::collections::{HashMap, VecDeque};

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TerminalSessionState {
    pub id: String,
    pub workspace_id: String,
    pub started_at: DateTime<Utc>,
    pub last_active: DateTime<Utc>,
    pub shell_type: String,
    pub working_directory: String,
    pub environment_vars: HashMap<String, String>,
    pub command_history: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TerminalOutput {
    pub session_id: String,
    pub lines: VecDeque<String>,
}
// src/db/repositories/terminal_repository.rs
use anyhow::Result;
use foundationdb::Database;

pub struct TerminalRepository {
    db: Database,
}

impl TerminalRepository {
    // Key patterns for multi-tenant isolation
    fn session_key(workspace_id: &str, session_id: &str) -> String {
        format!("{}/terminal_sessions/{}", workspace_id, session_id)
    }
fn output_key(workspace_id: &str, session_id: &str) -> String {
format!("{}/terminal_output/{}", workspace_id, session_id)
}
pub async fn save_session_state(
&self,
state: &TerminalSessionState,
) -> Result<()> {
let tr = self.db.create_trx()?;
let key = Self::session_key(&state.workspace_id, &state.id);
let value = serde_json::to_vec(state)?;
tr.set(key.as_bytes(), &value);
tr.commit().await?;
Ok(())
}
pub async fn get_session_state(
&self,
workspace_id: &str,
session_id: &str,
) -> Result<Option<TerminalSessionState>> {
let tr = self.db.create_trx()?;
let key = Self::session_key(workspace_id, session_id);
match tr.get(key.as_bytes(), false).await? {
Some(value) => {
let state = serde_json::from_slice(&value)?;
Ok(Some(state))
}
None => Ok(None),
}
}
}
Testing Requirements
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
#[tokio::test]
async fn test_pty_write_without_error() {
let mut pty = PtyProcess::new(80, 24).unwrap();
// Test multiple writes
for i in 0..10 {
let input = format!("echo test_{}\n", i);
let result = pty.write_input(input.as_bytes()).await;
assert!(result.is_ok(), "Write {} should succeed", i);
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
#[tokio::test]
async fn test_terminal_session_persistence() {
let db = setup_test_db().await;
let repo = TerminalRepository::new(db);
let bridge = TerminalBridge::new();
let workspace_id = "test-workspace";
let session_id = bridge.create_session(workspace_id, 80, 24).await.unwrap();
// Execute commands
bridge.send_input(&session_id, b"export FOO=bar\n").await.unwrap();
bridge.send_input(&session_id, b"cd /tmp\n").await.unwrap();
// Save state
let state = TerminalSessionState {
id: session_id.clone(),
workspace_id: workspace_id.to_string(),
started_at: Utc::now(),
last_active: Utc::now(),
shell_type: "bash".to_string(),
working_directory: "/tmp".to_string(),
environment_vars: [("FOO".to_string(), "bar".to_string())].into(),
command_history: vec!["export FOO=bar".to_string(), "cd /tmp".to_string()],
};
repo.save_session_state(&state).await.unwrap();
// Retrieve and verify
let loaded = repo.get_session_state(workspace_id, &session_id).await.unwrap();
assert!(loaded.is_some());
assert_eq!(loaded.unwrap().environment_vars.get("FOO"), Some(&"bar".to_string()));
}
#[tokio::test]
async fn test_concurrent_terminal_sessions() {
let bridge = TerminalBridge::new();
let mut handles = vec![];
// Create 10 concurrent sessions
for i in 0..10 {
let bridge_clone = bridge.clone();
let handle = tokio::spawn(async move {
let workspace_id = format!("workspace-{}", i);
let session_id = bridge_clone.create_session(&workspace_id, 80, 24).await.unwrap();
// Send unique command
let cmd = format!("echo session_{}\n", i);
bridge_clone.send_input(&session_id, cmd.as_bytes()).await.unwrap();
(workspace_id, session_id)
});
handles.push(handle);
}
// Verify all sessions created
let results: Vec<_> = futures::future::join_all(handles).await;
assert_eq!(results.len(), 10);
for (i, result) in results.iter().enumerate() {
let (workspace_id, session_id) = result.as_ref().unwrap();
assert_eq!(workspace_id, &format!("workspace-{}", i));
assert!(bridge.sessions.contains_key(session_id));
}
}
}
Deployment Configuration
GKE StatefulSet Configuration
# statefulset-workspace.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: workspace-statefulset
  namespace: coditect
spec:
  serviceName: workspace-service
  replicas: 1
  selector:
    matchLabels:
      app: workspace
  template:
    metadata:
      labels:
        app: workspace
    spec:
      nodeSelector:
        cloud.google.com/gke-spot: "true"  # Use Spot instances for cost savings
      containers:
        - name: workspace
          image: gcr.io/PROJECT_ID/workspace:latest
          ports:
            - containerPort: 8080
              name: websocket
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          volumeMounts:
            - name: workspace-storage
              mountPath: /workspace
          env:
            - name: FDB_CLUSTER_FILE
              value: /secrets/fdb.cluster
            - name: WORKSPACE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
  volumeClaimTemplates:
    - metadata:
        name: workspace-storage
      spec:
        accessModes: ["ReadWriteOnce"]
        storageClassName: "standard-rwo"
        resources:
          requests:
            storage: 10Gi
# Service for workspace pods
apiVersion: v1
kind: Service
metadata:
  name: workspace-service
  namespace: coditect
spec:
  clusterIP: None  # Headless service for StatefulSet
  selector:
    app: workspace
  ports:
    - port: 8080
      targetPort: 8080
      name: websocket
Migration Steps
Phase 1: PTY Fix Deployment
#!/bin/bash
# deploy-pty-fix.sh

# 1. Run tests locally
cargo test terminal_bridge

# 2. Build and push image
docker build -t gcr.io/$PROJECT_ID/websocket-gateway:pty-fix .
docker push gcr.io/$PROJECT_ID/websocket-gateway:pty-fix

# 3. Deploy to staging
gcloud run deploy websocket-gateway-staging \
  --image=gcr.io/$PROJECT_ID/websocket-gateway:pty-fix \
  --region=us-central1 \
  --tag=pty-fix

# 4. Test terminal input
./test-terminal-input.sh staging

# 5. Deploy to production with traffic split
gcloud run deploy websocket-gateway \
  --image=gcr.io/$PROJECT_ID/websocket-gateway:pty-fix \
  --region=us-central1 \
  --tag=pty-fix \
  --traffic=pty-fix=10,LATEST=90

# 6. Monitor for errors
gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=websocket-gateway AND severity>=ERROR" --limit=50

# 7. Full rollout if stable
gcloud run services update-traffic websocket-gateway \
  --to-latest \
  --region=us-central1
Phase 2: Deploy StatefulSet Workspaces
#!/bin/bash
# deploy-statefulset-workspaces.sh

# 1. Create GKE Autopilot cluster
gcloud container clusters create-auto coditect-cluster \
  --region=us-central1 \
  --project=$PROJECT_ID

# 2. Create namespace
kubectl create namespace coditect

# 3. Create FoundationDB secret
kubectl create secret generic fdb-cluster \
  --from-file=fdb.cluster \
  --namespace=coditect

# 4. Deploy StatefulSet
kubectl apply -f statefulset-workspace.yaml

# 5. Verify deployment
kubectl get statefulsets -n coditect
kubectl get pvc -n coditect
Phase 3: Gateway Traffic Migration
# 3. Update DNS to split traffic
# Point 10% of traffic at the Cloud Run endpoint for testing

# 4. Monitor metrics
gcloud monitoring dashboards create \
  --config-from-file=cloud-run-dashboard.yaml

# 5. Gradually increase traffic
for pct in 25 50 75 100; do
  echo "Shifting $pct% traffic to Cloud Run"
  # Update load balancer weights
  sleep 3600  # Monitor for 1 hour
done

# 6. Shut down the old GKE gateway cluster
gcloud container clusters delete websocket-gateway-cluster \
  --region=us-central1
Previous Part
Previous: See Part 1: Human Narrative for business context and value proposition.