Skip to main content

ADR-015-v4: Team Collaboration - Part 2 (Technical)

Document Specification Block​

Document: ADR-015-v4-team-collaboration-part2-technical
Version: 1.0.0
Purpose: Constrain AI implementation with exact technical specifications for team collaboration
Audience: AI agents, developers implementing the system
Date Created: 2025-09-01
Date Modified: 2025-09-01
QA Review Date: Pending
Status: DRAFT

Table of Contents​

  1. Constraints
  2. Dependencies
  3. Component Architecture
  4. Data Models
  5. Implementation Patterns
  6. API Specifications
  7. Testing Requirements
  8. Performance Benchmarks
  9. Security Controls
  10. Logging and Error Handling
  11. References
  12. Approval Signatures

1. Constraints​

CONSTRAINT: Sub-100ms Synchronization​

All collaborative edits MUST be synchronized to all active participants within 100ms.

CONSTRAINT: Conflict-Free Resolution​

Simultaneous edits MUST be resolved automatically without data loss using CRDTs.

CONSTRAINT: Tenant Isolation​

Collaboration sessions MUST be completely isolated between tenants. No cross-tenant visibility.

CONSTRAINT: Scale Requirements​

System MUST support 100+ concurrent collaborators per workspace without degradation.

CONSTRAINT: Audit Compliance​

Every collaborative action MUST be logged with full attribution for compliance.

↑ Back to Top

2. Dependencies​

cargo.toml Dependencies​

[dependencies]
# Core async and web
tokio = { version = "1.35", features = ["full"] }
actix-web = "4.4"
actix-web-actors = "4.2"

# WebSocket and real-time
tokio-tungstenite = "0.20"
futures-util = "0.3"

# CRDT implementation
crdts = "7.3"
operational-transform = "0.6"

# Database
foundationdb = { version = "0.8", features = ["embedded-fdb-include"] }
redis = { version = "0.23", features = ["tokio-comp", "streams"] }

# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bincode = "1.3"

# Utilities
uuid = { version = "1.6", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
dashmap = "5.5"
tracing = "0.1"

↑ Back to Top

3. Component Architecture​

// File: src/collaboration/service.rs
use std::sync::Arc;
use anyhow::Result;

pub struct CollaborationService {
websocket_server: Arc<WebSocketServer>,
sync_engine: Arc<SyncEngine>,
presence_tracker: Arc<PresenceTracker>,
cursor_broadcaster: Arc<CursorBroadcaster>,
persistence_layer: Arc<PersistenceLayer>,
audit_logger: Arc<AuditLogger>,
}

// File: src/collaboration/service.rs
impl CollaborationService {
pub async fn join_workspace(
&self,
workspace_id: Uuid,
user: AuthenticatedUser,
connection: WebSocketConnection,
) -> Result<CollaborationSession> {
// 1. Validate access
self.validate_workspace_access(&workspace_id, &user).await?;

// 2. Create session
let session = CollaborationSession {
id: Uuid::new_v4(),
workspace_id,
user_id: user.id,
connection_id: connection.id(),
joined_at: Utc::now(),
};

// 3. Initialize CRDT state
let document_state = self.sync_engine
.get_or_create_document(&workspace_id)
.await?;

// 4. Register presence
self.presence_tracker
.register_user(&workspace_id, &user, &session)
.await?;

// 5. Setup WebSocket handlers
self.websocket_server
.register_connection(connection, session.id)
.await?;

// 6. Broadcast join event
self.broadcast_presence_update(&workspace_id).await?;

// 7. Audit log
self.audit_logger.log_collaboration_event(
"workspace_join",
&workspace_id,
&user.id,
json!({ "session_id": session.id })
).await?;

Ok(session)
}
}

↑ Back to Top

4. Data Models​

Collaboration Models​

// File: src/models/collaboration.rs
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollaborationSession {
pub id: Uuid,
pub workspace_id: Uuid,
pub user_id: Uuid,
pub connection_id: String,
pub joined_at: DateTime<Utc>,
pub last_activity: DateTime<Utc>,
pub cursor_position: Option<CursorPosition>,
pub selection: Option<TextSelection>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CursorPosition {
pub file_path: String,
pub line: usize,
pub column: usize,
pub viewport: ViewportInfo,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollaborativeEdit {
pub id: Uuid,
pub session_id: Uuid,
pub timestamp: DateTime<Utc>,
pub operation: CRDTOperation,
pub file_path: String,
pub vector_clock: VectorClock,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CRDTOperation {
Insert { position: usize, content: String, id: OperationId },
Delete { position: usize, length: usize, id: OperationId },
Format { start: usize, end: usize, attributes: HashMap<String, Value> },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PresenceInfo {
pub user_id: Uuid,
pub display_name: String,
pub avatar_url: Option<String>,
pub color: String,
pub status: PresenceStatus,
pub cursor: Option<CursorPosition>,
pub selection: Option<TextSelection>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PresenceStatus {
Active,
Idle,
Typing,
Selecting,
Debugging,
}

↑ Back to Top

5. Implementation Patterns​

CRDT Synchronization​

// File: src/collaboration/sync_engine.rs
use crdts::{CmRDT, CvRDT, Dot, VClock};

pub struct SyncEngine {
documents: DashMap<Uuid, DocumentCRDT>,
db: Arc<Database>,
}

impl SyncEngine {
pub async fn apply_operation(
&self,
workspace_id: &Uuid,
operation: CollaborativeEdit,
) -> Result<Vec<CollaborativeEdit>> {
let mut document = self.documents
.get_mut(workspace_id)
.ok_or(SyncError::DocumentNotFound)?;

// Apply CRDT operation
let (merged_ops, conflicts) = document.merge_operation(operation)?;

// Persist to FoundationDB
self.persist_operations(workspace_id, &merged_ops).await?;

// Return operations to broadcast
Ok(merged_ops)
}

async fn persist_operations(
&self,
workspace_id: &Uuid,
operations: &[CollaborativeEdit],
) -> Result<()> {
let tx = self.db.create_trx()?;

for op in operations {
let key = format!(
"{}/collab/ops/{}",
workspace_id,
op.timestamp.timestamp_nanos()
);
tx.set(&key, &bincode::serialize(op)?);
}

tx.commit().await?;
Ok(())
}
}

WebSocket Message Handler​

// File: src/collaboration/websocket.rs
use actix::{Actor, StreamHandler};
use actix_web_actors::ws;

pub struct CollaborationWebSocket {
session_id: Uuid,
workspace_id: Uuid,
sync_engine: Arc<SyncEngine>,
broadcast_tx: broadcast::Sender<BroadcastMessage>,
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for CollaborationWebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Text(text)) => {
if let Ok(message) = serde_json::from_str::<ClientMessage>(&text) {
self.handle_client_message(message, ctx);
}
}
Ok(ws::Message::Binary(bin)) => {
if let Ok(message) = bincode::deserialize::<ClientMessage>(&bin) {
self.handle_client_message(message, ctx);
}
}
_ => {}
}
}
}

impl CollaborationWebSocket {
fn handle_client_message(&mut self, msg: ClientMessage, ctx: &mut Context<Self>) {
match msg {
ClientMessage::Edit(edit) => {
let sync_engine = self.sync_engine.clone();
let workspace_id = self.workspace_id;
let broadcast_tx = self.broadcast_tx.clone();

let fut = async move {
match sync_engine.apply_operation(&workspace_id, edit).await {
Ok(ops) => {
// Broadcast to all participants
let _ = broadcast_tx.send(BroadcastMessage::Operations(ops));
}
Err(e) => {
tracing::error!("Sync error: {}", e);
}
}
};

ctx.spawn(fut.into_actor(self));
}
ClientMessage::Cursor(position) => {
let _ = self.broadcast_tx.send(
BroadcastMessage::CursorUpdate {
session_id: self.session_id,
position,
}
);
}
ClientMessage::Presence(status) => {
let _ = self.broadcast_tx.send(
BroadcastMessage::PresenceUpdate {
session_id: self.session_id,
status,
}
);
}
}
}
}

↑ Back to Top

6. API Specifications​

Collaboration Endpoints​

// File: src/api/handlers/collaboration.rs
use actix_web::{web, HttpRequest, HttpResponse, Result};
use actix_web_actors::ws;

#[get("/api/v1/workspaces/{workspace_id}/collaborate")]
pub async fn websocket_collaborate(
req: HttpRequest,
path: web::Path<Uuid>,
stream: web::Payload,
collab_service: web::Data<Arc<CollaborationService>>,
claims: Claims,
) -> Result<HttpResponse> {
let workspace_id = path.into_inner();

// Validate access
if !collab_service.can_access_workspace(&workspace_id, &claims).await? {
return Ok(HttpResponse::Forbidden().json(json!({
"error": "workspace_access_denied"
})));
}

// Create WebSocket actor
let websocket = CollaborationWebSocket::new(
workspace_id,
claims.user_id,
collab_service.into_inner(),
);

// Start WebSocket connection
ws::start(websocket, &req, stream)
}

#[get("/api/v1/workspaces/{workspace_id}/presence")]
pub async fn get_active_collaborators(
path: web::Path<Uuid>,
presence_tracker: web::Data<Arc<PresenceTracker>>,
claims: Claims,
) -> Result<HttpResponse> {
let workspace_id = path.into_inner();

let collaborators = presence_tracker
.get_active_users(&workspace_id)
.await?;

Ok(HttpResponse::Ok().json(collaborators))
}

↑ Back to Top

7. Testing Requirements​

Test Coverage Requirements​

  • Unit Test Coverage: ≥95% of CRDT algorithms
  • Integration Test Coverage: ≥90% of sync protocols
  • E2E Test Coverage: Multi-user collaboration scenarios
  • Performance Test Coverage: 100+ concurrent users
  • Chaos Test Coverage: Network partitions, message reordering

Integration Tests​

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_concurrent_editing() {
let service = setup_test_service().await;

// Create two sessions
let session1 = service.join_workspace(workspace_id, user1).await.unwrap();
let session2 = service.join_workspace(workspace_id, user2).await.unwrap();

// Simultaneous edits at same position
let edit1 = CollaborativeEdit {
operation: CRDTOperation::Insert {
position: 10,
content: "Hello".to_string(),
id: OperationId::new(),
},
..Default::default()
};

let edit2 = CollaborativeEdit {
operation: CRDTOperation::Insert {
position: 10,
content: "World".to_string(),
id: OperationId::new(),
},
..Default::default()
};

// Apply both edits
service.apply_edit(&session1, edit1).await.unwrap();
service.apply_edit(&session2, edit2).await.unwrap();

// Verify both texts are present (order determined by CRDT)
let document = service.get_document(&workspace_id).await.unwrap();
let text = document.to_string();
assert!(text.contains("Hello"));
assert!(text.contains("World"));
}

#[tokio::test]
async fn test_presence_tracking() {
let service = setup_test_service().await;

// User joins
let session = service.join_workspace(workspace_id, user).await.unwrap();

// Check presence
let presence = service.get_presence(&workspace_id).await.unwrap();
assert_eq!(presence.len(), 1);
assert_eq!(presence[0].user_id, user.id);

// User leaves
service.leave_workspace(&session).await.unwrap();

// Check presence updated
let presence = service.get_presence(&workspace_id).await.unwrap();
assert_eq!(presence.len(), 0);
}
}

↑ Back to Top

8. Performance Benchmarks​

Required Metrics​

const SYNC_LATENCY_TARGET_MS: u64 = 100;
const CURSOR_UPDATE_TARGET_MS: u64 = 50;
const MAX_CONCURRENT_USERS: usize = 100;

pub struct PerformanceValidator {
pub async fn validate_sync_performance(&self) -> Result<ValidationReport> {
let mut report = ValidationReport::default();

// Test sync latency
let start = Instant::now();
self.apply_edit(test_edit).await?;
let latency = start.elapsed().as_millis() as u64;

report.sync_latency_ok = latency <= SYNC_LATENCY_TARGET_MS;
report.sync_latency_ms = latency;

// Test concurrent users
let handles: Vec<_> = (0..MAX_CONCURRENT_USERS)
.map(|i| {
let service = self.service.clone();
tokio::spawn(async move {
service.join_workspace(workspace_id, user).await
})
})
.collect();

let results = futures::future::join_all(handles).await;
report.concurrent_users_ok = results.iter().all(|r| r.is_ok());

Ok(report)
}
}

↑ Back to Top

9. Security Controls​

Collaboration Security​

// File: src/collaboration/security.rs
pub struct CollaborationSecurity {
pub async fn validate_workspace_access(
&self,
workspace_id: &Uuid,
user: &AuthenticatedUser,
) -> Result<()> {
// Check tenant isolation
let workspace = self.get_workspace(workspace_id).await?;
if workspace.tenant_id != user.tenant_id {
return Err(SecurityError::TenantViolation);
}

// Check workspace permissions
if !self.has_permission(user, workspace_id, "workspace:collaborate").await? {
return Err(SecurityError::InsufficientPermissions);
}

// Validate session limits
let active_sessions = self.count_user_sessions(user.id).await?;
if active_sessions >= MAX_SESSIONS_PER_USER {
return Err(SecurityError::SessionLimitExceeded);
}

Ok(())
}
}

↑ Back to Top

10. Logging and Error Handling​

Collaboration Event Logging​

// File: src/collaboration/logging.rs
use tracing::{info, warn, error, instrument};

#[instrument(skip(self, edit))]
pub async fn log_collaborative_edit(&self, edit: &CollaborativeEdit) {
info!(
workspace_id = %edit.workspace_id,
session_id = %edit.session_id,
operation_type = ?edit.operation.op_type(),
vector_clock = ?edit.vector_clock,
"Collaborative edit applied"
);
}

pub async fn log_sync_metrics(&self, metrics: &SyncMetrics) {
info!(
workspace_id = %metrics.workspace_id,
sync_latency_ms = metrics.sync_latency_ms,
operations_synced = metrics.operations_synced,
conflicts_resolved = metrics.conflicts_resolved,
active_users = metrics.active_users,
"Collaboration sync metrics"
);
}

Error Handling​

#[derive(thiserror::Error, Debug)]
pub enum CollaborationError {
#[error("workspace not found: {0}")]
workspaceNotFound(Uuid),

#[error("Sync conflict could not be resolved automatically")]
UnresolvableConflict,

#[error("Connection lost to {0} collaborators")]
ConnectionLost(usize),

#[error("Collaboration session limit exceeded")]
SessionLimitExceeded,

#[error("Invalid CRDT operation: {0}")]
InvalidOperation(String),
}

↑ Back to Top

11. References​

Version Compatibility​

  • CRDT Library: 7.3+ for proven algorithms
  • WebSocket: tokio-tungstenite 0.20+ for performance
  • FoundationDB: 7.1+ for high-frequency writes

↑ Back to Top

12. Approval Signatures​

Technical Sign-off​

ComponentOwnerApprovedDate
ArchitectureSession5✓2025-09-01
ImplementationPending--
Security ReviewPending--
Performance TestPending--

Implementation Checklist​

  • WebSocket server implemented
  • CRDT sync engine functional
  • Presence tracking system
  • Cursor broadcasting
  • Conflict resolution tested
  • Multi-tenant isolation verified
  • API endpoints complete
  • Performance benchmarks met
  • Security controls validated

↑ Back to Top