ADR-013-v4: Queue Management - Part 2 (Technical)
Document Specification Block
Document: ADR-013-v4-queue-management-part2-technical
Version: 1.0.1
Purpose: Constrain AI implementation with exact technical specifications for queue management
Audience: AI agents, developers implementing the system
Date Created: 2025-08-31
Date Modified: 2025-09-01
QA Review Date: 2025-08-31
Status: DRAFT
Table of Contents
- Constraints
- Dependencies
- Component Architecture
- Data Models
- Implementation Patterns
- API Specifications
- Testing Requirements
- Performance Benchmarks
- Security Controls
- Logging and Error Handling
- References
- Approval Signatures
1. Constraints
CONSTRAINT: Multi-Tenant Queue Isolation
Each tenant's tasks MUST be completely isolated. No cross-tenant task visibility or processing is allowed.
CONSTRAINT: At-Least-Once Delivery
Tasks MUST NOT be lost. Use FoundationDB transactions to ensure atomic state transitions.
CONSTRAINT: Priority Ordering
Within each priority level, tasks MUST be processed in FIFO order to ensure fairness.
CONSTRAINT: Worker Heterogeneity
The system MUST support different worker types (AI agents, humans, automated systems) with capability matching.
CONSTRAINT: Performance Requirements
Task assignment latency MUST be under 100 ms. The system MUST handle 10,000 tasks/second per region.
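The isolation and ordering constraints translate directly into the FoundationDB key layout used throughout this document. A minimal sketch (the layout matches the repository in Section 5; the helper name is illustrative only):
// Sketch only: tenant-scoped key layout. Every key begins with the
// tenant_id, so range scans are structurally incapable of crossing tenant
// boundaries. Priority is inverted so an ascending scan yields the highest
// priority first; the enqueue timestamp preserves FIFO order within a
// priority level.
use uuid::Uuid;

fn priority_index_key(tenant_id: Uuid, priority: i32, enqueued_ms: i64, task_id: Uuid) -> String {
    format!(
        "{}/queue/by_priority/{:010}/{:013}/{}",
        tenant_id,
        i32::MAX - priority,
        enqueued_ms,
        task_id
    )
}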
2. Dependencies
cargo.toml Dependencies
[dependencies]
# Core async runtime
tokio = { version = "1.35", features = ["full"] }
# Database
foundationdb = { version = "0.8", features = ["embedded-fdb-include"] }
# Web framework
actix-web = "4.4"
actix-rt = "2.9"
# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Error handling
anyhow = "1.0"
thiserror = "1.0"
# Utilities
uuid = { version = "1.6", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
tracing = "0.1"
rand = "0.8"       # jitter for retry backoff (Section 5)
metrics = "0.21"   # queue monitor instrumentation (Section 3)
# Queue-specific
priority-queue = "1.3"
dashmap = "5.5"
[dev-dependencies]
tokio-test = "0.4"
futures = "0.3"    # join_all in load tests (Section 7)
3. Component Architecture
Queue Service Implementation
// File: src/services/queue_service.rs
use std::sync::Arc;
use anyhow::Result;
use foundationdb::Database;
use uuid::Uuid;
pub struct QueueService {
db: Arc<Database>,
task_queue: Arc<TaskQueue>,
worker_registry: Arc<WorkerRegistry>,
scheduler: Arc<TaskScheduler>,
monitor: Arc<QueueMonitor>,
}
// File: src/services/queue_service.rs
impl QueueService {
pub async fn submit_task(&self, task: QueuedTask) -> Result<Uuid> {
// 1. Validate task
task.validate()?;
// 2. Calculate priority
let priority = self.calculate_priority(&task)?;
// 3. Store in FoundationDB
let repo = TaskQueueRepository::new(self.db.clone(), task.tenant_id);
repo.enqueue_task(&task, priority).await?;
// 4. Notify scheduler
self.scheduler.notify_new_task(task.tenant_id).await?;
Ok(task.task_id)
}
}
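submit_task calls calculate_priority, which is not specified in this document. A minimal sketch, assuming priorities live in 0..=100 and tasks nearing their deadline receive a boost (both assumptions, not requirements of this ADR):
// File: src/services/queue_service.rs
// Sketch only: one plausible policy for calculate_priority. The 0..=100
// range and the one-hour deadline boost are assumptions.
impl QueueService {
    fn calculate_priority(&self, task: &QueuedTask) -> Result<i32> {
        let mut priority = task.priority.clamp(0, 100);
        if let Some(deadline) = task.deadline {
            // Boost tasks whose deadline is less than an hour away
            if deadline - Utc::now() < chrono::Duration::hours(1) {
                priority = (priority + 20).min(100);
            }
        }
        Ok(priority)
    }
}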
Worker Registry Implementation
// File: src/services/worker_registry.rs
use dashmap::DashMap;
use std::collections::HashSet;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use uuid::Uuid;
pub struct WorkerRegistry {
workers: DashMap<String, WorkerInfo>,
capabilities: DashMap<TaskType, HashSet<String>>,
}
#[derive(Clone, Debug)]
pub struct WorkerInfo {
pub worker_id: String,
pub worker_type: WorkerType,
pub capabilities: HashSet<TaskType>,
pub capacity: u32,
pub current_load: u32,
pub health_status: HealthStatus,
pub last_heartbeat: DateTime<Utc>,
}
#[derive(Clone, Debug, PartialEq)]
pub enum WorkerType {
AIAgent { model: String },
Human { user_id: Uuid },
Automated { system: String },
}
#[derive(Clone, Debug, PartialEq)]
pub enum HealthStatus {
Healthy,
Degraded,
Unhealthy,
}
impl WorkerRegistry {
pub async fn register_worker(&self, info: WorkerInfo) -> Result<()> {
// Update worker info
self.workers.insert(info.worker_id.clone(), info.clone());
// Update capability index
for capability in &info.capabilities {
self.capabilities
.entry(*capability)
.or_insert_with(HashSet::new)
.insert(info.worker_id.clone());
}
Ok(())
}
pub async fn find_capable_worker(&self, task_type: &TaskType) -> Option<String> {
if let Some(worker_ids) = self.capabilities.get(task_type) {
// Find worker with lowest load
let mut best_worker = None;
let mut lowest_load_ratio = f32::MAX;
for worker_id in worker_ids.iter() {
if let Some(info) = self.workers.get(worker_id) {
if info.health_status == HealthStatus::Healthy {
let load_ratio = info.current_load as f32 / info.capacity as f32;
if load_ratio < lowest_load_ratio {
lowest_load_ratio = load_ratio;
best_worker = Some(worker_id.clone());
}
}
}
}
best_worker
} else {
None
}
}
pub async fn update_heartbeat(&self, worker_id: &str) -> Result<()> {
if let Some(mut info) = self.workers.get_mut(worker_id) {
info.last_heartbeat = Utc::now();
Ok(())
} else {
Err(anyhow!("Worker not found"))
}
}
}
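The registry records last_heartbeat, but this document does not specify an eviction policy. A sketch of a periodic sweep that demotes silent workers; the 30-second staleness threshold is an assumption:
// File: src/services/worker_registry.rs
// Sketch only: periodic health sweep. The 30s threshold is an assumption,
// not part of this ADR.
impl WorkerRegistry {
    pub fn mark_stale_workers(&self) {
        let cutoff = Utc::now() - chrono::Duration::seconds(30);
        for mut entry in self.workers.iter_mut() {
            if entry.last_heartbeat < cutoff {
                entry.health_status = HealthStatus::Unhealthy;
            }
        }
    }
}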
Task Scheduler Implementation
// File: src/services/task_scheduler.rs
use std::sync::Arc;
use std::collections::{HashMap, HashSet, VecDeque};
use anyhow::Result;
use chrono::{Duration, Utc};
use foundationdb::Database;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
pub struct TaskScheduler {
    notification_tx: mpsc::Sender<SchedulerEvent>,
    // Maps a pending dependency to the tasks waiting on its completion
    dependency_graph: Arc<RwLock<HashMap<Uuid, Vec<Uuid>>>>,
    // Completions the scheduler has observed; consulted by process_dependencies
    completed_tasks: Arc<RwLock<HashSet<Uuid>>>,
    ready_queue: Arc<RwLock<VecDeque<Uuid>>>,
}
#[derive(Debug)]
pub enum SchedulerEvent {
NewTask { tenant_id: Uuid },
TaskCompleted { task_id: Uuid },
WorkerAvailable { worker_id: String },
}
impl TaskScheduler {
pub async fn notify_new_task(&self, tenant_id: Uuid) -> Result<()> {
self.notification_tx
.send(SchedulerEvent::NewTask { tenant_id })
.await?;
Ok(())
}
pub async fn process_dependencies(&self, task: &QueuedTask) -> Result<bool> {
    if task.dependencies.is_empty() {
        return Ok(true);
    }
    let completed = self.completed_tasks.read().await;
    let mut graph = self.dependency_graph.write().await;
    // Register this task as a waiter on every dependency that has not yet
    // completed; the task becomes ready only once all of them have
    let mut all_satisfied = true;
    for dep_id in &task.dependencies {
        if !completed.contains(dep_id) {
            graph.entry(*dep_id)
                .or_insert_with(Vec::new)
                .push(task.task_id);
            all_satisfied = false;
        }
    }
    Ok(all_satisfied)
}
pub async fn handle_task_completion(&self, task_id: Uuid) -> Result<()> {
    self.completed_tasks.write().await.insert(task_id);
    let mut graph = self.dependency_graph.write().await;
    // Move tasks waiting on this completion to the ready queue. Tasks with
    // additional unfinished dependencies are re-checked before dispatch.
    if let Some(waiting_tasks) = graph.remove(&task_id) {
        let mut ready_queue = self.ready_queue.write().await;
        for waiting_task_id in waiting_tasks {
            ready_queue.push_back(waiting_task_id);
        }
    }
    Ok(())
}
pub async fn apply_priority_aging(&self, db: Arc<Database>, tenant_id: Uuid) -> Result<()> {
    // Age tasks that have been pending too long so they cannot starve
    let threshold = Utc::now() - Duration::hours(1);
    let repo = TaskQueueRepository::new(db, tenant_id);
    let aged_tasks = repo.find_tasks_older_than(threshold).await?;
    for mut task in aged_tasks {
        task.priority = (task.priority + 1).min(100);
        repo.update_priority(&task).await?;
    }
    Ok(())
}
}
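The scheduler holds only the sending half of the notification channel; the consuming loop is not shown above. A sketch of the receiving side, with the actual dispatch reduced to logging placeholders:
// File: src/services/task_scheduler.rs
// Sketch only: the receiving side of the notification channel. The NewTask
// and WorkerAvailable arms are placeholders for the assignment logic.
pub async fn run_scheduler_loop(mut rx: mpsc::Receiver<SchedulerEvent>, scheduler: Arc<TaskScheduler>) {
    while let Some(event) = rx.recv().await {
        match event {
            SchedulerEvent::NewTask { tenant_id } => {
                // Attempt to match pending tasks for this tenant to workers
                tracing::debug!(%tenant_id, "new task notification");
            }
            SchedulerEvent::TaskCompleted { task_id } => {
                if let Err(e) = scheduler.handle_task_completion(task_id).await {
                    tracing::error!(%task_id, error = %e, "completion handling failed");
                }
            }
            SchedulerEvent::WorkerAvailable { worker_id } => {
                tracing::debug!(%worker_id, "worker available");
            }
        }
    }
}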
Queue Monitor Implementation
// File: src/services/queue_monitor.rs
use metrics::{counter, gauge, histogram};
use std::collections::HashMap;
use std::time::Duration;
use chrono::{DateTime, Utc};
use tokio::sync::mpsc;
use uuid::Uuid;
pub struct QueueMonitor {
alert_thresholds: AlertThresholds,
metrics_tx: mpsc::Sender<QueueMetrics>,
}
#[derive(Clone)]
pub struct AlertThresholds {
pub max_queue_depth: usize,
pub max_wait_time_ms: u64,
pub min_success_rate: f32,
pub max_dead_letter_rate: f32,
}
#[derive(Debug)]
pub struct QueueMetrics {
pub timestamp: DateTime<Utc>,
pub queue_depth: HashMap<TaskType, usize>,
pub average_wait_time_ms: u64,
pub tasks_processed: u64,
pub tasks_failed: u64,
pub worker_utilization: HashMap<String, f32>,
}
impl QueueMonitor {
pub async fn record_task_submitted(&self, task: &QueuedTask) {
counter!("queue.tasks.submitted", 1,
"tenant_id" => task.tenant_id.to_string(),
"task_type" => format!("{:?}", task.task_type)
);
}
pub async fn record_task_assigned(&self, task: &QueuedTask, wait_time: Duration) {
histogram!("queue.wait_time_ms", wait_time.as_millis() as f64,
"task_type" => format!("{:?}", task.task_type)
);
counter!("queue.tasks.assigned", 1,
"task_type" => format!("{:?}", task.task_type)
);
}
pub async fn check_alert_conditions(&self, metrics: &QueueMetrics) -> Vec<Alert> {
let mut alerts = Vec::new();
// Check queue depth
for (task_type, depth) in &metrics.queue_depth {
if *depth > self.alert_thresholds.max_queue_depth {
alerts.push(Alert::QueueDepthHigh {
task_type: *task_type,
depth: *depth,
});
}
}
// Check wait time
if metrics.average_wait_time_ms > self.alert_thresholds.max_wait_time_ms {
alerts.push(Alert::WaitTimeHigh {
average_ms: metrics.average_wait_time_ms,
});
}
// Check success rate, guarding against division by zero when idle
let total = metrics.tasks_processed + metrics.tasks_failed;
if total > 0 {
    let success_rate = metrics.tasks_processed as f32 / total as f32;
    if success_rate < self.alert_thresholds.min_success_rate {
        alerts.push(Alert::SuccessRateLow {
            rate: success_rate,
        });
    }
}
alerts
}
pub async fn update_queue_depth(&self, tenant_id: Uuid, depths: HashMap<TaskType, usize>) {
for (task_type, depth) in depths {
gauge!("queue.depth", depth as f64,
"tenant_id" => tenant_id.to_string(),
"task_type" => format!("{:?}", task_type)
);
}
}
}
#[derive(Debug)]
pub enum Alert {
QueueDepthHigh { task_type: TaskType, depth: usize },
WaitTimeHigh { average_ms: u64 },
SuccessRateLow { rate: f32 },
DeadLetterRateHigh { rate: f32 },
}
4. Data Models
Queued Task Structure
// File: src/models/queue.rs
use chrono::{DateTime, Utc};
use serde::{Serialize, Deserialize};
use uuid::Uuid;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueuedTask {
pub task_id: Uuid,
pub tenant_id: Uuid,
pub task_type: TaskType,
pub payload: serde_json::Value,
pub priority: i32,
pub status: QueueStatus,
pub retry_count: u32,
pub max_retries: u32,
pub created_at: DateTime<Utc>,
pub assigned_at: Option<DateTime<Utc>>,
pub worker_id: Option<String>,
pub deadline: Option<DateTime<Utc>>,
pub dependencies: Vec<Uuid>,
}
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub enum QueueStatus {
    #[default]
    Pending,
Assigned,
Processing,
Completed,
Failed,
Retrying,
DeadLetter,
}
// Eq + Hash because TaskType is a map key in the registry and monitor
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TaskType {
    CodeGeneration,
    CodeReview,
    Testing,
    Deployment,
    Documentation,
    // Default exists only so tests can construct tasks via ..Default::default()
    #[default]
    Analysis,
}
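QueueStatus implies a task lifecycle that Sections 3 and 5 rely on: Pending → Assigned → Processing → Completed or Failed, with failures retried until max_retries and then dead-lettered. A sketch of a transition guard encoding that lifecycle:
// File: src/models/queue.rs
// Sketch only: the status transitions implied by Sections 3 and 5.
impl QueueStatus {
    pub fn can_transition_to(&self, next: &QueueStatus) -> bool {
        use QueueStatus::*;
        matches!(
            (self, next),
            (Pending, Assigned)
                | (Assigned, Processing)
                | (Processing, Completed)
                | (Processing, Failed)
                | (Failed, Retrying)
                | (Retrying, Pending)
                | (Failed, DeadLetter)
                | (Retrying, DeadLetter)
        )
    }
}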
5. Implementation Patterns
Repository Pattern for Queue Storage
// File: src/db/repositories/task_queue_repository.rs
use foundationdb::Database;
use anyhow::{anyhow, Result};
use chrono::Utc;
use std::sync::Arc;
use uuid::Uuid;
pub struct TaskQueueRepository {
    db: Arc<Database>,
    tenant_id: Uuid,
}
impl TaskQueueRepository {
    pub fn new(db: Arc<Database>, tenant_id: Uuid) -> Self {
        Self { db, tenant_id }
    }
pub async fn enqueue_task(&self, task: &QueuedTask, priority: i32) -> Result<()> {
    let tx = self.db.create_transaction()?;
    // Primary storage
    let task_key = format!("{}/queue/tasks/{}", self.tenant_id, task.task_id);
    tx.set(task_key.as_bytes(), &serde_json::to_vec(task)?);
    // Priority index. The priority is inverted so an ascending scan sees the
    // highest priority first; the enqueue timestamp preserves FIFO order
    // within a priority level (CONSTRAINT: Priority Ordering).
    let priority_key = format!("{}/queue/by_priority/{:010}/{:013}/{}",
        self.tenant_id,
        i32::MAX - priority,
        task.created_at.timestamp_millis(),
        task.task_id
    );
    tx.set(priority_key.as_bytes(), b"");
    // Status index
    let status_key = format!("{}/queue/by_status/{:?}/{}",
        self.tenant_id,
        task.status,
        task.task_id
    );
    tx.set(status_key.as_bytes(), b"");
    tx.commit().await?;
    Ok(())
}
pub async fn dequeue_task(&self, _worker_type: WorkerType) -> Result<Option<QueuedTask>> {
    let tx = self.db.create_transaction()?;
    // Find the highest-priority pending task (range option details elided)
    let prefix = format!("{}/queue/by_priority/", self.tenant_id);
    let range = tx.get_range(&prefix, 1).await?;
    if let Some((key, _)) = range.first() {
        // Extract task_id from the index key
        let task_id = extract_task_id(key)?;
        // Load task
        let task_key = format!("{}/queue/tasks/{}", self.tenant_id, task_id);
        let task_data = tx
            .get(task_key.as_bytes())
            .await?
            .ok_or_else(|| anyhow!("Task not found"))?;
        let mut task: QueuedTask = serde_json::from_slice(&task_data)?;
        // Update status
        task.status = QueueStatus::Assigned;
        task.assigned_at = Some(Utc::now());
        // Rewrite the task and clear the index entry atomically so the same
        // task cannot be dequeued twice
        tx.set(task_key.as_bytes(), &serde_json::to_vec(&task)?);
        tx.clear(key);
        tx.commit().await?;
        Ok(Some(task))
    } else {
        Ok(None)
    }
}
}
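dequeue_task calls extract_task_id, which is not defined in this document. A minimal sketch, assuming the key layout produced by enqueue_task (the task_id is the final path segment):
// File: src/db/repositories/task_queue_repository.rs
// Sketch only: parses the task_id from the final segment of an index key,
// matching the key layout used in enqueue_task.
fn extract_task_id(key: &[u8]) -> Result<Uuid> {
    let key = std::str::from_utf8(key)?;
    let last = key
        .rsplit('/')
        .next()
        .ok_or_else(|| anyhow!("malformed index key"))?;
    Ok(Uuid::parse_str(last)?)
}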
Retry Logic Implementation
// File: src/services/retry_service.rs
use std::time::Duration;
use anyhow::{Error, Result};
pub struct RetryService;
// Error taxonomy produced by classify_error (the classifier itself is
// defined elsewhere)
pub enum ErrorType {
    Transient,
    RateLimit,
    Permanent,
}
impl RetryService {
pub async fn should_retry(&self, task: &QueuedTask, error: &Error) -> Result<bool> {
// Check retry count
if task.retry_count >= task.max_retries {
return Ok(false);
}
// Check error type
match classify_error(error) {
ErrorType::Transient => Ok(true),
ErrorType::RateLimit => Ok(true),
ErrorType::Permanent => Ok(false),
}
}
pub fn calculate_backoff(&self, retry_count: u32) -> Duration {
    // Exponential backoff with up to 1s of random jitter
    let base_delay = Duration::from_secs(1);
    let exponential = base_delay * 2u32.saturating_pow(retry_count);
    let jitter = Duration::from_millis(rand::random::<u64>() % 1000);
    exponential + jitter
}
}
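For clarity, a sketch of how the two methods compose in a worker's failure path; requeue_after is a hypothetical repository helper, not part of Section 5:
// File: src/services/retry_service.rs
// Sketch only: composing should_retry and calculate_backoff.
// requeue_after is a hypothetical helper, not defined in this ADR.
impl RetryService {
    pub async fn handle_failure(
        &self,
        mut task: QueuedTask,
        error: &Error,
        repo: &TaskQueueRepository,
    ) -> Result<()> {
        if self.should_retry(&task, error).await? {
            task.retry_count += 1;
            task.status = QueueStatus::Retrying;
            let delay = self.calculate_backoff(task.retry_count);
            repo.requeue_after(&task, delay).await?; // hypothetical helper
        } else {
            // Exhausted or permanent: hand off to the DLQ handler below
            task.status = QueueStatus::DeadLetter;
        }
        Ok(())
    }
}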
Dead Letter Queue Handler
// File: src/services/dead_letter_queue.rs
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{Error, Result};
use chrono::{DateTime, Utc};
use foundationdb::Database;
use serde::{Deserialize, Serialize};
use serde_json::json;
pub struct DeadLetterQueueHandler {
db: Arc<Database>,
classifier: Arc<FailureClassifier>,
notifier: Arc<AlertNotifier>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DeadLetterTask {
pub task: QueuedTask,
pub failure_reason: String,
pub failure_type: FailureType,
pub error_details: serde_json::Value,
pub attempts: Vec<AttemptRecord>,
pub moved_to_dlq_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AttemptRecord {
pub attempt_number: u32,
pub timestamp: DateTime<Utc>,
pub worker_id: Option<String>,
pub error_message: String,
pub duration_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum FailureType {
InvalidInput,
AuthenticationFailure,
RateLimitExceeded,
Timeout,
InternalError,
DependencyFailure,
Unknown,
}
impl DeadLetterQueueHandler {
pub async fn move_to_dead_letter(&self, task: QueuedTask, error: &Error, attempts: Vec<AttemptRecord>) -> Result<()> {
    let tx = self.db.create_transaction()?;
    // Classify failure
    let failure_type = self.classifier.classify_error(error)?;
    // Create dead letter entry
    let dlq_task = DeadLetterTask {
        task: task.clone(),
        failure_reason: error.to_string(),
        failure_type: failure_type.clone(),
        error_details: json!({
            "error_chain": error.chain().map(|e| e.to_string()).collect::<Vec<_>>(),
            "backtrace": format!("{:?}", error.backtrace()),
        }),
        attempts,
        moved_to_dlq_at: Utc::now(),
    };
    // Store in dead letter queue, bucketed by failure type
    let dlq_key = format!("{}/dlq/{}/{}",
        task.tenant_id,
        format!("{:?}", failure_type).to_lowercase(),
        task.task_id
    );
    tx.set(dlq_key.as_bytes(), &serde_json::to_vec(&dlq_task)?);
    // Remove from active queue
    let task_key = format!("{}/queue/tasks/{}", task.tenant_id, task.task_id);
    tx.clear(task_key.as_bytes());
    tx.commit().await?;
    // Update metrics and alert only after the move has committed
    self.update_dlq_metrics(&failure_type).await?;
    self.check_alert_conditions(&failure_type).await?;
    Ok(())
}
pub async fn bulk_retry(&self, filter: DlqFilter) -> Result<BulkRetryResult> {
let mut result = BulkRetryResult::default();
// Find matching tasks
let tasks = self.find_tasks_by_filter(filter).await?;
result.total_found = tasks.len();
for dlq_task in tasks {
// Reset retry count
let mut task = dlq_task.task.clone();
task.retry_count = 0;
task.status = QueueStatus::Pending;
// Re-enqueue
match self.requeue_task(task).await {
Ok(_) => result.successfully_retried += 1,
Err(e) => {
result.failed_to_retry += 1;
result.errors.push(format!("Task {}: {}", dlq_task.task.task_id, e));
}
}
}
Ok(result)
}
pub async fn analyze_failures(&self, time_range: TimeRange) -> Result<FailureAnalysis> {
let mut analysis = FailureAnalysis::default();
// Group by failure type
let failures = self.get_failures_in_range(time_range).await?;
for failure in &failures {
analysis.by_type
.entry(failure.failure_type.clone())
.and_modify(|c| *c += 1)
.or_insert(1);
analysis.by_task_type
.entry(failure.task.task_type.clone())
.and_modify(|c| *c += 1)
.or_insert(1);
}
// Identify patterns
analysis.patterns = self.identify_failure_patterns(&failures)?;
Ok(analysis)
}
}
#[derive(Default)]
pub struct BulkRetryResult {
pub total_found: usize,
pub successfully_retried: usize,
pub failed_to_retry: usize,
pub errors: Vec<String>,
}
#[derive(Default)]
pub struct FailureAnalysis {
pub by_type: HashMap<FailureType, usize>,
pub by_task_type: HashMap<TaskType, usize>,
pub patterns: Vec<FailurePattern>,
}
#[derive(Debug)]
pub struct FailurePattern {
pub pattern_type: String,
pub description: String,
pub affected_count: usize,
pub recommendation: String,
}
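FailureClassifier is referenced by the handler but not specified here. A naive message-matching sketch; a production implementation would classify on typed error variants rather than strings:
// File: src/services/failure_classifier.rs
// Sketch only: message-based classification. Matching on typed error
// variants would be more robust than string inspection.
use anyhow::{Error, Result};

pub struct FailureClassifier;

impl FailureClassifier {
    pub fn classify_error(&self, error: &Error) -> Result<FailureType> {
        let msg = error.to_string().to_lowercase();
        Ok(if msg.contains("timeout") {
            FailureType::Timeout
        } else if msg.contains("rate limit") {
            FailureType::RateLimitExceeded
        } else if msg.contains("unauthorized") || msg.contains("forbidden") {
            FailureType::AuthenticationFailure
        } else if msg.contains("invalid") {
            FailureType::InvalidInput
        } else {
            FailureType::Unknown
        })
    }
}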
6. API Specifications
Queue Management Endpoints
// File: src/api/handlers/queue_routes.rs
use actix_web::{get, post, web, HttpResponse, Result};
use chrono::Utc;
use serde_json::json;
use uuid::Uuid;
#[post("/api/v1/queue/submit")]
pub async fn submit_task(
    request: web::Json<SubmitTaskRequest>,
    queue_service: web::Data<QueueService>,
    claims: Claims,
) -> Result<HttpResponse> {
    let request = request.into_inner();
let task = QueuedTask {
task_id: Uuid::new_v4(),
tenant_id: claims.tenant_id,
task_type: request.task_type,
payload: request.payload,
priority: request.priority.unwrap_or(50),
status: QueueStatus::Pending,
retry_count: 0,
max_retries: 3,
created_at: Utc::now(),
assigned_at: None,
worker_id: None,
deadline: request.deadline,
dependencies: request.dependencies.unwrap_or_default(),
};
let task_id = queue_service
    .submit_task(task)
    .await
    .map_err(actix_web::error::ErrorInternalServerError)?;
Ok(HttpResponse::Created().json(json!({
"task_id": task_id,
"status": "queued"
})))
}
#[get("/api/v1/queue/status/{task_id}")]
pub async fn get_task_status(
    task_id: web::Path<Uuid>,
    queue_service: web::Data<QueueService>,
    claims: Claims,
) -> Result<HttpResponse> {
    let status = queue_service
        .get_task_status(claims.tenant_id, task_id.into_inner())
        .await
        .map_err(actix_web::error::ErrorInternalServerError)?;
    Ok(HttpResponse::Ok().json(status))
}
7. Testing Requirements
Test Coverage Requirements
- Unit Test Coverage: ≥90% of queue logic
- Integration Test Coverage: ≥80% of database operations
- Load Test Coverage: Handle 10x normal load
- Chaos Test Coverage: Network failures, worker crashes
Unit Tests
// File: src/services/queue_service_tests.rs
#[cfg(test)]
mod tests {
use super::*;
    use anyhow::anyhow;
#[tokio::test]
async fn test_priority_ordering() {
let service = setup_test_service().await;
// Submit tasks with different priorities
let low = create_task(10);
let high = create_task(90);
let medium = create_task(50);
service.submit_task(low.clone()).await.unwrap();
service.submit_task(high.clone()).await.unwrap();
service.submit_task(medium.clone()).await.unwrap();
// Dequeue should return high priority first
let first = service.dequeue_task().await.unwrap().unwrap();
assert_eq!(first.priority, 90);
}
#[tokio::test]
async fn test_retry_logic() {
let service = setup_test_service().await;
let task = create_task(50);
// Simulate failure
let error = anyhow!("Network timeout");
// Should retry transient errors
assert!(service.should_retry(&task, &error).await.unwrap());
// Calculate backoff
let backoff = service.calculate_backoff(2);
assert!(backoff >= Duration::from_secs(4));
assert!(backoff < Duration::from_secs(5));
}
#[tokio::test]
async fn test_worker_capability_matching() {
let registry = WorkerRegistry::new();
// Register workers with different capabilities
let ai_worker = WorkerInfo {
worker_id: "ai-001".to_string(),
worker_type: WorkerType::AIAgent { model: "gpt-4".to_string() },
capabilities: vec![TaskType::CodeGeneration, TaskType::CodeReview].into_iter().collect(),
capacity: 10,
current_load: 3,
health_status: HealthStatus::Healthy,
last_heartbeat: Utc::now(),
};
registry.register_worker(ai_worker).await.unwrap();
// Find capable worker
let worker = registry.find_capable_worker(&TaskType::CodeGeneration).await;
assert_eq!(worker, Some("ai-001".to_string()));
// No worker for deployment
let worker = registry.find_capable_worker(&TaskType::Deployment).await;
assert!(worker.is_none());
}
#[tokio::test]
async fn test_dependency_resolution() {
let scheduler = TaskScheduler::new();
// Task with no dependencies
let independent = QueuedTask {
task_id: Uuid::new_v4(),
dependencies: vec![],
..Default::default()
};
assert!(scheduler.process_dependencies(&independent).await.unwrap());
// Task with unsatisfied dependencies
let dependent = QueuedTask {
task_id: Uuid::new_v4(),
dependencies: vec![Uuid::new_v4()],
..Default::default()
};
assert!(!scheduler.process_dependencies(&dependent).await.unwrap());
}
}
Integration Tests
// File: tests/queue_integration_tests.rs
#[tokio::test]
async fn test_end_to_end_task_lifecycle() {
let (service, worker) = setup_integration_test().await;
// Submit task
let task = QueuedTask {
task_id: Uuid::new_v4(),
tenant_id: test_tenant_id(),
task_type: TaskType::Testing,
payload: json!({"test": "data"}),
priority: 50,
status: QueueStatus::Pending,
retry_count: 0,
max_retries: 3,
created_at: Utc::now(),
..Default::default()
};
let task_id = service.submit_task(task).await.unwrap();
// Worker dequeues task
let dequeued = worker.get_next_task().await.unwrap().unwrap();
assert_eq!(dequeued.task_id, task_id);
assert_eq!(dequeued.status, QueueStatus::Assigned);
// Worker completes task
worker.complete_task(task_id, json!({"result": "success"})).await.unwrap();
// Verify completion
let final_status = service.get_task_status(test_tenant_id(), task_id).await.unwrap();
assert_eq!(final_status.status, QueueStatus::Completed);
}
#[tokio::test]
async fn test_multi_tenant_isolation() {
let service = setup_test_service().await;
let tenant1 = Uuid::new_v4();
let tenant2 = Uuid::new_v4();
// Submit tasks for different tenants
let task1 = create_task_for_tenant(tenant1, 50);
let task2 = create_task_for_tenant(tenant2, 90);
service.submit_task(task1.clone()).await.unwrap();
service.submit_task(task2.clone()).await.unwrap();
// Tenant 1 should only see their task
let tenant1_tasks = service.list_tasks(tenant1).await.unwrap();
assert_eq!(tenant1_tasks.len(), 1);
assert_eq!(tenant1_tasks[0].tenant_id, tenant1);
// Tenant 2 should only see their task
let tenant2_tasks = service.list_tasks(tenant2).await.unwrap();
assert_eq!(tenant2_tasks.len(), 1);
assert_eq!(tenant2_tasks[0].tenant_id, tenant2);
}
#[tokio::test]
async fn test_dead_letter_queue_flow() {
let service = setup_test_service().await;
let dlq_handler = service.dead_letter_handler();
// Create task that will fail
let task = QueuedTask {
task_id: Uuid::new_v4(),
max_retries: 2,
..create_task(50)
};
service.submit_task(task.clone()).await.unwrap();
// Simulate failures
for _ in 0..3 {
let error = anyhow!("Processing failed");
service.handle_task_failure(task.task_id, error).await.unwrap();
}
// Task should be in DLQ
let dlq_tasks = dlq_handler.list_tasks(DlqFilter::All).await.unwrap();
assert_eq!(dlq_tasks.len(), 1);
assert_eq!(dlq_tasks[0].task.task_id, task.task_id);
// Bulk retry
let result = dlq_handler.bulk_retry(DlqFilter::All).await.unwrap();
assert_eq!(result.successfully_retried, 1);
}
Load Tests
// File: tests/queue_load_tests.rs
#[tokio::test]
async fn test_high_throughput() {
let service = setup_test_service().await;
let start = Instant::now();
// Submit 10,000 tasks concurrently
let mut handles = vec![];
for i in 0..10_000 {
let service_clone = service.clone();
let handle = tokio::spawn(async move {
let task = create_task(i % 100);
service_clone.submit_task(task).await
});
handles.push(handle);
}
// Wait for all submissions
for handle in handles {
handle.await.unwrap().unwrap();
}
let elapsed = start.elapsed();
let throughput = 10_000.0 / elapsed.as_secs_f64();
// Should handle at least 10,000 tasks/second
assert!(throughput >= 10_000.0, "Throughput {} < 10,000", throughput);
}
#[tokio::test]
async fn test_concurrent_workers() {
let service = setup_test_service().await;
// Submit 1000 tasks
for _ in 0..1000 {
service.submit_task(create_task(50)).await.unwrap();
}
// Spawn 10 workers
let mut worker_handles = vec![];
for worker_id in 0..10 {
let service_clone = service.clone();
let handle = tokio::spawn(async move {
let mut processed = 0;
while let Some(task) = service_clone.dequeue_task().await.unwrap() {
// Simulate processing
tokio::time::sleep(Duration::from_millis(10)).await;
service_clone.complete_task(task.task_id).await.unwrap();
processed += 1;
}
processed
});
worker_handles.push(handle);
}
// Wait for all workers
let total_processed: usize = futures::future::join_all(worker_handles)
.await
.into_iter()
.map(|r| r.unwrap())
.sum();
assert_eq!(total_processed, 1000);
}
8. Performance Benchmarks
Required Performance Metrics
// File: benches/queue_benchmarks.rs
const BENCHMARK_TASKS: usize = 100_000;
const TARGET_THROUGHPUT: usize = 10_000; // tasks/second
const MAX_ASSIGNMENT_LATENCY_MS: u64 = 100;
// NOTE: #[bench] and test::Bencher require the nightly test harness; port to
// criterion if benchmarks must run on stable Rust.
#[bench]
fn bench_task_submission(b: &mut Bencher) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let service = runtime.block_on(setup_service());
b.iter(|| {
runtime.block_on(async {
for _ in 0..1000 {
let task = generate_random_task();
service.submit_task(task).await.unwrap();
}
})
});
}
#[bench]
fn bench_task_dequeue(b: &mut Bencher) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let service = runtime.block_on(setup_service());
// Pre-populate queue
runtime.block_on(async {
for _ in 0..10_000 {
service.submit_task(generate_random_task()).await.unwrap();
}
});
b.iter(|| {
runtime.block_on(async {
service.dequeue_task(WorkerType::AIAgent { model: "gpt-4".to_string() }).await
})
});
}
#[bench]
fn bench_concurrent_operations(b: &mut Bencher) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let service = runtime.block_on(setup_service());
b.iter(|| {
runtime.block_on(async {
let mut handles = vec![];
// Mix of operations
for i in 0..100 {
let service_clone = service.clone();
let handle = tokio::spawn(async move {
if i % 3 == 0 {
// Submit
service_clone.submit_task(generate_random_task()).await.ok();
} else if i % 3 == 1 {
// Dequeue
service_clone.dequeue_task(WorkerType::AIAgent { model: "gpt-4".to_string() }).await.ok();
} else {
// Status check
service_clone.get_task_status(test_tenant_id(), Uuid::new_v4()).await.ok();
}
});
handles.push(handle);
}
futures::future::join_all(handles).await;
})
});
}
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::Result;
pub struct PerformanceValidator {
    service: Arc<QueueService>,
}
#[derive(Default)]
pub struct ValidationReport {
    pub submission_latency_ms: u64,
    pub assignment_latency_ms: u64,
    pub throughput_per_second: usize,
    pub meets_requirements: bool,
}
impl PerformanceValidator {
pub async fn validate_requirements(&self) -> Result<ValidationReport> {
let mut report = ValidationReport::default();
// Test submission latency
let start = Instant::now();
self.service.submit_task(generate_random_task()).await?;
let submission_latency = start.elapsed();
report.submission_latency_ms = submission_latency.as_millis() as u64;
// Test assignment latency
let start = Instant::now();
self.service.dequeue_task(WorkerType::AIAgent { model: "gpt-4".to_string() }).await?;
let assignment_latency = start.elapsed();
report.assignment_latency_ms = assignment_latency.as_millis() as u64;
// Test throughput
let throughput = self.measure_throughput(Duration::from_secs(10)).await?;
report.throughput_per_second = throughput;
// Validate against requirements
report.meets_requirements =
report.assignment_latency_ms <= MAX_ASSIGNMENT_LATENCY_MS &&
report.throughput_per_second >= TARGET_THROUGHPUT;
Ok(report)
}
}
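measure_throughput is called above but not shown. A sketch that submits as fast as possible for the window and normalizes to tasks/second (a single-client measurement; a real harness would submit from many concurrent clients):
// File: benches/queue_benchmarks.rs
// Sketch only: single-client throughput measurement over a fixed window.
impl PerformanceValidator {
    async fn measure_throughput(&self, window: Duration) -> Result<usize> {
        let start = Instant::now();
        let mut submitted = 0usize;
        // Submit as fast as possible until the window elapses
        while start.elapsed() < window {
            self.service.submit_task(generate_random_task()).await?;
            submitted += 1;
        }
        Ok((submitted as f64 / window.as_secs_f64()) as usize)
    }
}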
9. Security Controls
Access Control
impl QueueAccessControl {
    pub fn can_submit_task(&self, claims: &Claims, task: &QueuedTask) -> bool {
        // Submissions are only allowed into the caller's own tenant queue
        claims.tenant_id == task.tenant_id &&
        match task.task_type {
            TaskType::Deployment => claims.roles.contains("deploy"),
            _ => claims.roles.contains("developer"),
        }
    }
pub fn can_view_queue(&self, claims: &Claims, tenant_id: Uuid) -> bool {
claims.tenant_id == tenant_id &&
claims.roles.contains("developer")
}
}
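To make the role semantics concrete, a test-style sketch; claims_with_roles and deployment_task_for are hypothetical fixtures, and a Default impl for QueueAccessControl is assumed:
// File: src/services/access_control_tests.rs
// Sketch only: the role semantics above, written as a test.
// claims_with_roles and deployment_task_for are hypothetical fixtures.
#[test]
fn deployment_requires_deploy_role() {
    let control = QueueAccessControl::default();
    let tenant_id = Uuid::new_v4();
    let developer = claims_with_roles(tenant_id, &["developer"]);
    let deployer = claims_with_roles(tenant_id, &["developer", "deploy"]);
    let task = deployment_task_for(tenant_id); // TaskType::Deployment in the caller's tenant
    assert!(!control.can_submit_task(&developer, &task));
    assert!(control.can_submit_task(&deployer, &task));
}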
10. Logging and Error Handling
Logging Pattern
use tracing::{info, warn, error};
use anyhow::Error;
use uuid::Uuid;
pub fn log_queue_event(action: &str, task_id: Uuid, details: serde_json::Value) {
info!(
action = action,
task_id = %task_id,
details = ?details,
"Queue event"
);
}
pub fn log_queue_error(action: &str, task_id: Uuid, error: &Error) {
error!(
action = action,
task_id = %task_id,
error = %error,
"Queue operation failed"
);
}
Error Handling
#[derive(thiserror::Error, Debug)]
pub enum QueueError {
#[error("Queue is full. Please try again later.")]
QueueFull,
#[error("Task not found: {0}")]
TaskNotFound(Uuid),
#[error("No workers available for task type: {0:?}")]
NoWorkersAvailable(TaskType),
}
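These variants can surface at the API layer via actix-web's ResponseError. A sketch; the status-code choices are assumptions, not specified by this ADR:
// File: src/api/errors.rs
// Sketch only: HTTP mapping for QueueError. Status code choices are
// assumptions.
use actix_web::{http::StatusCode, HttpResponse, ResponseError};

impl ResponseError for QueueError {
    fn status_code(&self) -> StatusCode {
        match self {
            QueueError::QueueFull => StatusCode::SERVICE_UNAVAILABLE,
            QueueError::TaskNotFound(_) => StatusCode::NOT_FOUND,
            QueueError::NoWorkersAvailable(_) => StatusCode::SERVICE_UNAVAILABLE,
        }
    }

    fn error_response(&self) -> HttpResponse {
        HttpResponse::build(self.status_code())
            .json(serde_json::json!({ "error": self.to_string() }))
    }
}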
11. References
- ADR-001-v4: Container Execution
- ADR-003-v4: Multi-Tenant Architecture
- LOGGING-STANDARD-v4
- ERROR-HANDLING-STANDARD-v4
12. Approval Signatures
Technical Sign-off
| Component | Owner | Approved | Date |
|---|---|---|---|
| Architecture | Session6 | ✓ | 2025-08-31 |
| Implementation | Pending | - | - |
| Security Review | Pending | - | - |
| Performance Test | Pending | - | - |
Implementation Checklist
- Queue models implemented
- Repository pattern with FDB
- Priority queue logic
- Worker Registry with capability matching
- Task Scheduler with dependency resolution
- Queue Monitor with metrics collection
- Retry mechanism with exponential backoff
- Dead Letter Queue handler with bulk retry
- API endpoints for submit/status
- Unit tests with worker and dependency tests
- Integration tests covering multi-tenancy
- Load tests demonstrating 10k tasks/second
- Performance benchmarks with validation
- Security controls validated