ADR-015-v4: Team Collaboration - Part 2 (Technical)
Document Specification Block
Document: ADR-015-v4-team-collaboration-part2-technical
Version: 1.0.0
Purpose: Constrain AI implementation with exact technical specifications for team collaboration
Audience: AI agents, developers implementing the system
Date Created: 2025-09-01
Date Modified: 2025-09-01
QA Review Date: Pending
Status: DRAFT
Table of Contents
- Constraints
- Dependencies
- Component Architecture
- Data Models
- Implementation Patterns
- API Specifications
- Testing Requirements
- Performance Benchmarks
- Security Controls
- Logging and Error Handling
- References
- Approval Signatures
1. Constraints
CONSTRAINT: Sub-100ms Synchronization
All collaborative edits MUST be synchronized to all active participants within 100ms.
CONSTRAINT: Conflict-Free Resolution
Simultaneous edits MUST be resolved automatically without data loss using CRDTs.
CONSTRAINT: Tenant Isolation
Collaboration sessions MUST be completely isolated between tenants. No cross-tenant visibility.
CONSTRAINT: Scale Requirements
System MUST support 100+ concurrent collaborators per workspace without degradation.
CONSTRAINT: Audit Compliance
Every collaborative action MUST be logged with full attribution for compliance.
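The Conflict-Free Resolution constraint can be illustrated with a toy convergence sketch. All names here are hypothetical (the production engine uses the `crdts` crate): concurrent inserts carry a unique operation id, ties at the same position are broken by id, so every replica converges regardless of delivery order.

```rust
// Toy illustration of conflict-free convergence: a deterministic total
// order (position, then unique id) makes apply order irrelevant.
// Hypothetical types; not the production CRDT.
#[derive(Clone)]
struct Insert {
    position: usize,
    content: String,
    id: u64, // globally unique operation id, used as tie-breaker
}

fn converge(mut ops: Vec<Insert>) -> String {
    // Sort into the canonical order before replaying.
    ops.sort_by_key(|op| (op.position, op.id));
    let mut text = String::new();
    for op in ops {
        let at = op.position.min(text.len());
        text.insert_str(at, &op.content);
    }
    text
}

fn main() {
    let a = Insert { position: 0, content: "Hello".into(), id: 1 };
    let b = Insert { position: 0, content: "World".into(), id: 2 };
    // Either delivery order yields the same document.
    assert_eq!(
        converge(vec![a.clone(), b.clone()]),
        converge(vec![b, a])
    );
}
```

Real sequence CRDTs merge incrementally rather than replaying a sorted log, but the invariant being tested is the same: identical operation sets produce identical documents.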
2. Dependencies
Cargo.toml Dependencies
[dependencies]
# Core async and web
tokio = { version = "1.35", features = ["full"] }
actix-web = "4.4"
actix-web-actors = "4.2"
# WebSocket and real-time
tokio-tungstenite = "0.20"
futures-util = "0.3"
# CRDT implementation
crdts = "7.3"
operational-transform = "0.6"
# Database
foundationdb = { version = "0.8", features = ["embedded-fdb-include"] }
redis = { version = "0.23", features = ["tokio-comp", "streams"] }
# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bincode = "1.3"
# Utilities
uuid = { version = "1.6", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
dashmap = "5.5"
tracing = "0.1"
# Error handling and actor runtime (used by modules in this document)
anyhow = "1.0"
thiserror = "1.0"
actix = "0.13"
futures = "0.3"
3. Component Architecture
// File: src/collaboration/service.rs
use std::sync::Arc;

use anyhow::Result;
use chrono::Utc;
use serde_json::json;
use uuid::Uuid;

pub struct CollaborationService {
    websocket_server: Arc<WebSocketServer>,
    sync_engine: Arc<SyncEngine>,
    presence_tracker: Arc<PresenceTracker>,
    cursor_broadcaster: Arc<CursorBroadcaster>,
    persistence_layer: Arc<PersistenceLayer>,
    audit_logger: Arc<AuditLogger>,
}

// File: src/collaboration/service.rs
impl CollaborationService {
    pub async fn join_workspace(
        &self,
        workspace_id: Uuid,
        user: AuthenticatedUser,
        connection: WebSocketConnection,
    ) -> Result<CollaborationSession> {
        // 1. Validate access
        self.validate_workspace_access(&workspace_id, &user).await?;

        // 2. Create session (all fields of the CollaborationSession model)
        let now = Utc::now();
        let session = CollaborationSession {
            id: Uuid::new_v4(),
            workspace_id,
            user_id: user.id,
            connection_id: connection.id(),
            joined_at: now,
            last_activity: now,
            cursor_position: None,
            selection: None,
        };

        // 3. Initialize CRDT state
        let _document_state = self.sync_engine
            .get_or_create_document(&workspace_id)
            .await?;

        // 4. Register presence
        self.presence_tracker
            .register_user(&workspace_id, &user, &session)
            .await?;

        // 5. Set up WebSocket handlers
        self.websocket_server
            .register_connection(connection, session.id)
            .await?;

        // 6. Broadcast join event
        self.broadcast_presence_update(&workspace_id).await?;

        // 7. Audit log
        self.audit_logger.log_collaboration_event(
            "workspace_join",
            &workspace_id,
            &user.id,
            json!({ "session_id": session.id }),
        ).await?;

        Ok(session)
    }
}
4. Data Models
Collaboration Models
// File: src/models/collaboration.rs
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollaborationSession {
    pub id: Uuid,
    pub workspace_id: Uuid,
    pub user_id: Uuid,
    pub connection_id: String,
    pub joined_at: DateTime<Utc>,
    pub last_activity: DateTime<Utc>,
    pub cursor_position: Option<CursorPosition>,
    pub selection: Option<TextSelection>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CursorPosition {
    pub file_path: String,
    pub line: usize,
    pub column: usize,
    pub viewport: ViewportInfo,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollaborativeEdit {
    pub id: Uuid,
    pub session_id: Uuid,
    pub timestamp: DateTime<Utc>,
    pub operation: CRDTOperation,
    pub file_path: String,
    pub vector_clock: VectorClock,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CRDTOperation {
    Insert { position: usize, content: String, id: OperationId },
    Delete { position: usize, length: usize, id: OperationId },
    Format { start: usize, end: usize, attributes: HashMap<String, Value> },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PresenceInfo {
    pub user_id: Uuid,
    pub display_name: String,
    pub avatar_url: Option<String>,
    pub color: String,
    pub status: PresenceStatus,
    pub cursor: Option<CursorPosition>,
    pub selection: Option<TextSelection>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PresenceStatus {
    Active,
    Idle,
    Typing,
    Selecting,
    Debugging,
}
5. Implementation Patterns
CRDT Synchronization
// File: src/collaboration/sync_engine.rs
use std::sync::Arc;

use anyhow::Result;
use crdts::{CmRDT, CvRDT, Dot, VClock};
use dashmap::DashMap;
use uuid::Uuid;

pub struct SyncEngine {
    documents: DashMap<Uuid, DocumentCRDT>,
    db: Arc<Database>,
}

impl SyncEngine {
    pub async fn apply_operation(
        &self,
        workspace_id: &Uuid,
        operation: CollaborativeEdit,
    ) -> Result<Vec<CollaborativeEdit>> {
        let mut document = self.documents
            .get_mut(workspace_id)
            .ok_or(SyncError::DocumentNotFound)?;

        // Apply the CRDT operation; conflicts are resolved internally
        let (merged_ops, _conflicts) = document.merge_operation(operation)?;

        // Persist to FoundationDB
        self.persist_operations(workspace_id, &merged_ops).await?;

        // Return operations to broadcast
        Ok(merged_ops)
    }

    async fn persist_operations(
        &self,
        workspace_id: &Uuid,
        operations: &[CollaborativeEdit],
    ) -> Result<()> {
        let tx = self.db.create_trx()?;
        for op in operations {
            let key = format!(
                "{}/collab/ops/{}",
                workspace_id,
                op.timestamp.timestamp_nanos_opt().unwrap_or_default()
            );
            // FoundationDB keys and values are byte slices
            tx.set(key.as_bytes(), &bincode::serialize(op)?);
        }
        tx.commit().await?;
        Ok(())
    }
}
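The vector_clock field on CollaborativeEdit is what lets the sync engine distinguish causally ordered edits from truly concurrent ones. A minimal sketch, assuming the clock is a per-replica counter map (the real VectorClock type may differ):

```rust
use std::collections::HashMap;

// Edit A causally precedes edit B when every counter in A is <= the
// matching counter in B, and B is strictly ahead somewhere.
type VectorClock = HashMap<String, u64>;

fn happens_before(a: &VectorClock, b: &VectorClock) -> bool {
    let mut strictly_less = false;
    for (replica, &count_a) in a {
        let count_b = *b.get(replica).unwrap_or(&0);
        if count_a > count_b {
            return false; // a is ahead somewhere: not before b
        }
        if count_a < count_b {
            strictly_less = true;
        }
    }
    // Replicas present only in b also make b strictly later.
    strictly_less || b.iter().any(|(r, &c)| c > 0 && !a.contains_key(r))
}

fn main() {
    let a = VectorClock::from([("r1".to_string(), 1)]);
    let b = VectorClock::from([("r1".to_string(), 2)]);
    assert!(happens_before(&a, &b));
    assert!(!happens_before(&b, &a));

    // Neither precedes the other: the edits are concurrent,
    // and the CRDT merge (not causality) decides the outcome.
    let c = VectorClock::from([("r2".to_string(), 1)]);
    assert!(!happens_before(&a, &c));
    assert!(!happens_before(&c, &a));
}
```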
WebSocket Message Handler
// File: src/collaboration/websocket.rs
use std::sync::Arc;

use actix::{Actor, ActorContext, AsyncContext, StreamHandler, WrapFuture};
use actix_web_actors::ws;
use tokio::sync::broadcast;
use uuid::Uuid;

pub struct CollaborationWebSocket {
    session_id: Uuid,
    workspace_id: Uuid,
    sync_engine: Arc<SyncEngine>,
    broadcast_tx: broadcast::Sender<BroadcastMessage>,
}

impl Actor for CollaborationWebSocket {
    type Context = ws::WebsocketContext<Self>;
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for CollaborationWebSocket {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Text(text)) => {
                if let Ok(message) = serde_json::from_str::<ClientMessage>(&text) {
                    self.handle_client_message(message, ctx);
                }
            }
            Ok(ws::Message::Binary(bin)) => {
                if let Ok(message) = bincode::deserialize::<ClientMessage>(&bin) {
                    self.handle_client_message(message, ctx);
                }
            }
            Ok(ws::Message::Ping(payload)) => ctx.pong(&payload),
            Ok(ws::Message::Close(reason)) => {
                ctx.close(reason);
                ctx.stop();
            }
            _ => {}
        }
    }
}

impl CollaborationWebSocket {
    fn handle_client_message(&mut self, msg: ClientMessage, ctx: &mut ws::WebsocketContext<Self>) {
        match msg {
            ClientMessage::Edit(edit) => {
                let sync_engine = self.sync_engine.clone();
                let workspace_id = self.workspace_id;
                let broadcast_tx = self.broadcast_tx.clone();
                let fut = async move {
                    match sync_engine.apply_operation(&workspace_id, edit).await {
                        Ok(ops) => {
                            // Broadcast merged operations to all participants
                            let _ = broadcast_tx.send(BroadcastMessage::Operations(ops));
                        }
                        Err(e) => {
                            tracing::error!("Sync error: {}", e);
                        }
                    }
                };
                ctx.spawn(fut.into_actor(self));
            }
            ClientMessage::Cursor(position) => {
                let _ = self.broadcast_tx.send(BroadcastMessage::CursorUpdate {
                    session_id: self.session_id,
                    position,
                });
            }
            ClientMessage::Presence(status) => {
                let _ = self.broadcast_tx.send(BroadcastMessage::PresenceUpdate {
                    session_id: self.session_id,
                    status,
                });
            }
        }
    }
}
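The handler above only publishes to broadcast_tx; each session also needs a subscriber side that forwards BroadcastMessages back out over its socket. The fan-out idea, sketched with std::sync::mpsc standing in for the tokio::sync::broadcast channel used in the actor:

```rust
use std::sync::mpsc;

// Each connected session holds a Sender clone; publishing walks the
// list and drops subscribers whose receiving side has hung up.
struct FanOut<T: Clone> {
    subscribers: Vec<mpsc::Sender<T>>,
}

impl<T: Clone> FanOut<T> {
    fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    // A new session subscribes and gets its own receiver.
    fn subscribe(&mut self) -> mpsc::Receiver<T> {
        let (tx, rx) = mpsc::channel();
        self.subscribers.push(tx);
        rx
    }

    // Deliver one message to every live subscriber.
    fn publish(&mut self, msg: T) {
        self.subscribers.retain(|tx| tx.send(msg.clone()).is_ok());
    }
}

fn main() {
    let mut bus = FanOut::new();
    let rx1 = bus.subscribe();
    let rx2 = bus.subscribe();
    bus.publish("cursor_update".to_string());
    assert_eq!(rx1.recv().unwrap(), "cursor_update");
    assert_eq!(rx2.recv().unwrap(), "cursor_update");
}
```

tokio's broadcast channel additionally bounds the buffer and reports lag to slow receivers, which matters under the 100ms sync budget; this sketch only shows the delivery shape.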
6. API Specifications
Collaboration Endpoints
// File: src/api/handlers/collaboration.rs
use std::sync::Arc;

use actix_web::{get, web, HttpRequest, HttpResponse, Result};
use actix_web_actors::ws;
use serde_json::json;
use uuid::Uuid;

#[get("/api/v1/workspaces/{workspace_id}/collaborate")]
pub async fn websocket_collaborate(
    req: HttpRequest,
    path: web::Path<Uuid>,
    stream: web::Payload,
    collab_service: web::Data<Arc<CollaborationService>>,
    claims: Claims,
) -> Result<HttpResponse> {
    let workspace_id = path.into_inner();

    // Validate access before upgrading the connection
    if !collab_service.can_access_workspace(&workspace_id, &claims).await? {
        return Ok(HttpResponse::Forbidden().json(json!({
            "error": "workspace_access_denied"
        })));
    }

    // Create WebSocket actor (clone the inner Arc rather than
    // web::Data::into_inner, which would yield Arc<Arc<_>>)
    let websocket = CollaborationWebSocket::new(
        workspace_id,
        claims.user_id,
        collab_service.get_ref().clone(),
    );

    // Complete the handshake and start the actor
    ws::start(websocket, &req, stream)
}

#[get("/api/v1/workspaces/{workspace_id}/presence")]
pub async fn get_active_collaborators(
    path: web::Path<Uuid>,
    presence_tracker: web::Data<Arc<PresenceTracker>>,
    claims: Claims, // extractor enforces authentication
) -> Result<HttpResponse> {
    let workspace_id = path.into_inner();
    let collaborators = presence_tracker
        .get_active_users(&workspace_id)
        .await?;
    Ok(HttpResponse::Ok().json(collaborators))
}
7. Testing Requirements
Test Coverage Requirements
- Unit Test Coverage: ≥95% of CRDT algorithms
- Integration Test Coverage: ≥90% of sync protocols
- E2E Test Coverage: Multi-user collaboration scenarios
- Performance Test Coverage: 100+ concurrent users
- Chaos Test Coverage: Network partitions, message reordering
Integration Tests
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_concurrent_editing() {
        let service = setup_test_service().await;
        // Fixtures assumed from the test harness
        let workspace_id = Uuid::new_v4();
        let (user1, user2) = (test_user("alice"), test_user("bob"));

        // Create two sessions (join_workspace also takes a connection)
        let session1 = service
            .join_workspace(workspace_id, user1, test_connection())
            .await
            .unwrap();
        let session2 = service
            .join_workspace(workspace_id, user2, test_connection())
            .await
            .unwrap();

        // Simultaneous edits at the same position
        let edit1 = CollaborativeEdit {
            operation: CRDTOperation::Insert {
                position: 10,
                content: "Hello".to_string(),
                id: OperationId::new(),
            },
            ..Default::default()
        };
        let edit2 = CollaborativeEdit {
            operation: CRDTOperation::Insert {
                position: 10,
                content: "World".to_string(),
                id: OperationId::new(),
            },
            ..Default::default()
        };

        // Apply both edits
        service.apply_edit(&session1, edit1).await.unwrap();
        service.apply_edit(&session2, edit2).await.unwrap();

        // Verify both texts are present (order determined by the CRDT)
        let document = service.get_document(&workspace_id).await.unwrap();
        let text = document.to_string();
        assert!(text.contains("Hello"));
        assert!(text.contains("World"));
    }

    #[tokio::test]
    async fn test_presence_tracking() {
        let service = setup_test_service().await;
        let workspace_id = Uuid::new_v4();
        let user = test_user("carol");

        // User joins
        let session = service
            .join_workspace(workspace_id, user.clone(), test_connection())
            .await
            .unwrap();

        // Check presence
        let presence = service.get_presence(&workspace_id).await.unwrap();
        assert_eq!(presence.len(), 1);
        assert_eq!(presence[0].user_id, user.id);

        // User leaves
        service.leave_workspace(&session).await.unwrap();

        // Check presence updated
        let presence = service.get_presence(&workspace_id).await.unwrap();
        assert_eq!(presence.len(), 0);
    }
}
8. Performance Benchmarks
Required Metrics
use std::sync::Arc;
use std::time::Instant;

use anyhow::Result;
use uuid::Uuid;

const SYNC_LATENCY_TARGET_MS: u64 = 100;
const CURSOR_UPDATE_TARGET_MS: u64 = 50;
const MAX_CONCURRENT_USERS: usize = 100;

pub struct PerformanceValidator {
    service: Arc<CollaborationService>,
}

impl PerformanceValidator {
    pub async fn validate_sync_performance(&self) -> Result<ValidationReport> {
        let mut report = ValidationReport::default();

        // Test sync latency (fixtures assumed from the test harness)
        let start = Instant::now();
        self.service.apply_edit(&test_session(), test_edit()).await?;
        let latency = start.elapsed().as_millis() as u64;
        report.sync_latency_ok = latency <= SYNC_LATENCY_TARGET_MS;
        report.sync_latency_ms = latency;

        // Test concurrent users, one distinct user per spawned task
        let workspace_id = Uuid::new_v4();
        let handles: Vec<_> = (0..MAX_CONCURRENT_USERS)
            .map(|i| {
                let service = self.service.clone();
                tokio::spawn(async move {
                    service
                        .join_workspace(workspace_id, test_user(&format!("user-{i}")), test_connection())
                        .await
                })
            })
            .collect();
        let results = futures::future::join_all(handles).await;
        // Outer Result is the JoinHandle, inner Result is the join itself
        report.concurrent_users_ok = results.iter().all(|r| matches!(r, Ok(Ok(_))));

        Ok(report)
    }
}
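A single timed edit is a noisy check against the 100ms budget; validating a percentile over many collected samples is more robust. A minimal aggregation sketch (hypothetical helper, not part of the validator above):

```rust
// Nearest-rank percentile over collected latency samples (milliseconds).
fn percentile_ms(samples: &mut Vec<u64>, pct: f64) -> u64 {
    assert!(!samples.is_empty(), "need at least one sample");
    samples.sort_unstable();
    let rank = ((pct / 100.0) * (samples.len() - 1) as f64).round() as usize;
    samples[rank]
}

fn main() {
    // Simulated sync latencies from a benchmark run.
    let mut latencies = vec![12, 18, 25, 31, 44, 52, 61, 73, 88, 140];
    let p90 = percentile_ms(&mut latencies, 90.0);
    assert!(p90 <= 100, "p90 sync latency {} ms exceeds budget", p90);
}
```

The one outlier (140 ms) fails a max-latency check but passes at p90, which is why the target percentile should be stated explicitly in the benchmark spec.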
9. Security Controls
Collaboration Security
// File: src/collaboration/security.rs
use uuid::Uuid;

// Example limit; tune per deployment
const MAX_SESSIONS_PER_USER: usize = 5;

pub struct CollaborationSecurity {
    // workspace store, permission service, session registry, ...
}

impl CollaborationSecurity {
    pub async fn validate_workspace_access(
        &self,
        workspace_id: &Uuid,
        user: &AuthenticatedUser,
    ) -> Result<(), SecurityError> {
        // Check tenant isolation
        let workspace = self.get_workspace(workspace_id).await?;
        if workspace.tenant_id != user.tenant_id {
            return Err(SecurityError::TenantViolation);
        }

        // Check workspace permissions
        if !self.has_permission(user, workspace_id, "workspace:collaborate").await? {
            return Err(SecurityError::InsufficientPermissions);
        }

        // Validate session limits
        let active_sessions = self.count_user_sessions(user.id).await?;
        if active_sessions >= MAX_SESSIONS_PER_USER {
            return Err(SecurityError::SessionLimitExceeded);
        }

        Ok(())
    }
}
10. Logging and Error Handling
Collaboration Event Logging
// File: src/collaboration/logging.rs
use tracing::{info, instrument};

impl AuditLogger {
    #[instrument(skip(self, edit))]
    pub async fn log_collaborative_edit(&self, edit: &CollaborativeEdit) {
        // CollaborativeEdit carries session_id and file_path, not workspace_id
        info!(
            session_id = %edit.session_id,
            file_path = %edit.file_path,
            operation = ?edit.operation,
            vector_clock = ?edit.vector_clock,
            "Collaborative edit applied"
        );
    }

    pub async fn log_sync_metrics(&self, metrics: &SyncMetrics) {
        info!(
            workspace_id = %metrics.workspace_id,
            sync_latency_ms = metrics.sync_latency_ms,
            operations_synced = metrics.operations_synced,
            conflicts_resolved = metrics.conflicts_resolved,
            active_users = metrics.active_users,
            "Collaboration sync metrics"
        );
    }
}
Error Handling
#[derive(thiserror::Error, Debug)]
pub enum CollaborationError {
    #[error("Workspace not found: {0}")]
    WorkspaceNotFound(Uuid),
    #[error("Sync conflict could not be resolved automatically")]
    UnresolvableConflict,
    #[error("Connection lost to {0} collaborators")]
    ConnectionLost(usize),
    #[error("Collaboration session limit exceeded")]
    SessionLimitExceeded,
    #[error("Invalid CRDT operation: {0}")]
    InvalidOperation(String),
}
11. References
- ADR-001-v4: Container Execution
- ADR-003-v4: Multi-Tenant Architecture
- ADR-008-v4: Monitoring & Observability
- ADR-011-v4: Audit & Compliance
- LOGGING-STANDARD-v4
Version Compatibility
- CRDT Library: 7.3+ for proven algorithms
- WebSocket: tokio-tungstenite 0.20+ for performance
- FoundationDB: 7.1+ for high-frequency writes
12. Approval Signatures
Technical Sign-off
| Component | Owner | Approved | Date |
|---|---|---|---|
| Architecture | Session5 | ✓ | 2025-09-01 |
| Implementation | Pending | - | - |
| Security Review | Pending | - | - |
| Performance Test | Pending | - | - |
Implementation Checklist
- WebSocket server implemented
- CRDT sync engine functional
- Presence tracking system
- Cursor broadcasting
- Conflict resolution tested
- Multi-tenant isolation verified
- API endpoints complete
- Performance benchmarks met
- Security controls validated