ADR-013-v4: Queue Management - Part 2 (Technical)

Document Specification Block​

Document: ADR-013-v4-queue-management-part2-technical
Version: 1.0.1
Purpose: Constrain AI implementation with exact technical specifications for queue management
Audience: AI agents, developers implementing the system
Date Created: 2025-08-31
Date Modified: 2025-09-01
QA Review Date: 2025-08-31
Status: DRAFT

Table of Contents​

  1. Constraints
  2. Dependencies
  3. Component Architecture
  4. Data Models
  5. Implementation Patterns
  6. API Specifications
  7. Testing Requirements
  8. Performance Benchmarks
  9. Security Controls
  10. Logging and Error Handling
  11. References
  12. Approval Signatures

1. Constraints​

CONSTRAINT: Multi-Tenant Queue Isolation​

Each tenant's tasks MUST be completely isolated. No cross-tenant task visibility or processing allowed.

CONSTRAINT: At-Least-Once Delivery​

Tasks MUST NOT be lost. Use FoundationDB transactions to ensure atomic state transitions.

CONSTRAINT: Priority Ordering​

Within each priority level, tasks MUST be processed in FIFO order to ensure fairness.
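This constraint can be met with a single composite ordering key. The sketch below (illustrative only, not from the implementation) shows how a `(inverted_priority, sequence_number)` key gives highest-priority-first ordering with FIFO as the tiebreaker within a priority level; `ordering_key` and the `BTreeMap` stand-in are assumptions for demonstration.

```rust
// Sketch: priority-then-FIFO ordering via a composite key.
// BTreeMap iterates keys in ascending order, so (inverted_priority, seq)
// yields highest priority first and FIFO within each priority level.
use std::collections::BTreeMap;

fn ordering_key(priority: i32, seq: u64) -> (i32, u64) {
    (i32::MAX - priority, seq) // invert so higher priority sorts first
}

fn main() {
    let mut queue: BTreeMap<(i32, u64), &str> = BTreeMap::new();
    queue.insert(ordering_key(50, 1), "medium-first");
    queue.insert(ordering_key(90, 2), "high");
    queue.insert(ordering_key(50, 3), "medium-second");

    let order: Vec<&str> = queue.values().copied().collect();
    assert_eq!(order, vec!["high", "medium-first", "medium-second"]);
}
```

The same idea appears later in the repository's priority index, where the inverted priority is zero-padded so byte-ordered key scans produce the same ordering.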

CONSTRAINT: Worker Heterogeneity​

System MUST support different worker types (AI agents, humans, automated systems) with capability matching.

CONSTRAINT: Performance Requirements​

Task assignment latency MUST be <100ms. System MUST handle 10,000 tasks/second per region.

↑ Back to Top

2. Dependencies​

cargo.toml Dependencies​

[dependencies]
# Core async runtime
tokio = { version = "1.35", features = ["full"] }

# Database
foundationdb = { version = "0.8", features = ["embedded-fdb-include"] }

# Web framework
actix-web = "4.4"
actix-rt = "2.9"

# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

# Error handling
anyhow = "1.0"
thiserror = "1.0"

# Utilities
uuid = { version = "1.6", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
tracing = "0.1"

# Queue-specific
priority-queue = "1.3"
dashmap = "5.5"

# Metrics and randomized backoff (used by the monitor and retry services)
metrics = "0.21"
rand = "0.8"

[dev-dependencies]
tokio-test = "0.4"
futures = "0.3"

↑ Back to Top

3. Component Architecture​

// File: src/services/queue_service.rs
use std::sync::Arc;

use anyhow::Result;
use foundationdb::Database;
use uuid::Uuid;

pub struct QueueService {
    db: Arc<Database>,
    task_queue: Arc<TaskQueue>,
    worker_registry: Arc<WorkerRegistry>,
    scheduler: Arc<TaskScheduler>,
    monitor: Arc<QueueMonitor>,
}

impl QueueService {
    pub async fn submit_task(&self, task: QueuedTask) -> Result<Uuid> {
        // 1. Validate task
        task.validate()?;

        // 2. Calculate priority
        let priority = self.calculate_priority(&task)?;

        // 3. Store in FoundationDB
        let repo = TaskQueueRepository::new(self.db.clone(), task.tenant_id);
        repo.enqueue_task(&task, priority).await?;

        // 4. Notify scheduler
        self.scheduler.notify_new_task(task.tenant_id).await?;

        Ok(task.task_id)
    }
}

Worker Registry Implementation

// File: src/services/worker_registry.rs
use std::collections::HashSet;

use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use uuid::Uuid;

pub struct WorkerRegistry {
    workers: DashMap<String, WorkerInfo>,
    capabilities: DashMap<TaskType, HashSet<String>>,
}

#[derive(Clone, Debug)]
pub struct WorkerInfo {
    pub worker_id: String,
    pub worker_type: WorkerType,
    pub capabilities: HashSet<TaskType>,
    pub capacity: u32,
    pub current_load: u32,
    pub health_status: HealthStatus,
    pub last_heartbeat: DateTime<Utc>,
}

#[derive(Clone, Debug, PartialEq)]
pub enum WorkerType {
    AIAgent { model: String },
    Human { user_id: Uuid },
    Automated { system: String },
}

#[derive(Clone, Debug, PartialEq)]
pub enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}

impl WorkerRegistry {
    pub async fn register_worker(&self, info: WorkerInfo) -> Result<()> {
        // Update worker info
        self.workers.insert(info.worker_id.clone(), info.clone());

        // Update capability index
        for capability in &info.capabilities {
            self.capabilities
                .entry(*capability)
                .or_insert_with(HashSet::new)
                .insert(info.worker_id.clone());
        }

        Ok(())
    }

    pub async fn find_capable_worker(&self, task_type: &TaskType) -> Option<String> {
        if let Some(worker_ids) = self.capabilities.get(task_type) {
            // Find the healthy worker with the lowest load ratio
            let mut best_worker = None;
            let mut lowest_load_ratio = f32::MAX;

            for worker_id in worker_ids.iter() {
                if let Some(info) = self.workers.get(worker_id) {
                    if info.health_status == HealthStatus::Healthy {
                        let load_ratio = info.current_load as f32 / info.capacity as f32;
                        if load_ratio < lowest_load_ratio {
                            lowest_load_ratio = load_ratio;
                            best_worker = Some(worker_id.clone());
                        }
                    }
                }
            }
            best_worker
        } else {
            None
        }
    }

    pub async fn update_heartbeat(&self, worker_id: &str) -> Result<()> {
        if let Some(mut info) = self.workers.get_mut(worker_id) {
            info.last_heartbeat = Utc::now();
            Ok(())
        } else {
            Err(anyhow!("Worker not found: {worker_id}"))
        }
    }
}
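The selection rule in find_capable_worker can be exercised in isolation. The sketch below uses a simplified, hypothetical `Candidate` type in place of WorkerInfo; it applies the same rule (healthy workers only, lowest `current_load / capacity` wins) and additionally skips zero-capacity workers to avoid a division by zero.

```rust
// Illustrative stand-in for find_capable_worker's selection rule, using a
// simplified Candidate type (hypothetical, for demonstration only).
#[derive(Clone)]
struct Candidate {
    id: &'static str,
    healthy: bool,
    current_load: u32,
    capacity: u32,
}

fn pick_least_loaded(candidates: &[Candidate]) -> Option<&'static str> {
    candidates
        .iter()
        // Only healthy workers with nonzero capacity are eligible
        .filter(|c| c.healthy && c.capacity > 0)
        // Lowest load ratio wins
        .min_by(|a, b| {
            let ra = a.current_load as f32 / a.capacity as f32;
            let rb = b.current_load as f32 / b.capacity as f32;
            ra.partial_cmp(&rb).unwrap()
        })
        .map(|c| c.id)
}

fn main() {
    let workers = vec![
        Candidate { id: "ai-001", healthy: true, current_load: 3, capacity: 10 },
        Candidate { id: "ai-002", healthy: true, current_load: 1, capacity: 10 },
        Candidate { id: "ai-003", healthy: false, current_load: 0, capacity: 10 },
    ];
    // ai-002 has the lowest load ratio among healthy workers
    assert_eq!(pick_least_loaded(&workers), Some("ai-002"));
}
```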

Task Scheduler Implementation​

// File: src/services/task_scheduler.rs
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use anyhow::Result;
use chrono::{Duration, Utc};
use foundationdb::Database;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;

pub struct TaskScheduler {
    notification_tx: mpsc::Sender<SchedulerEvent>,
    dependency_graph: Arc<RwLock<HashMap<Uuid, Vec<Uuid>>>>,
    ready_queue: Arc<RwLock<VecDeque<Uuid>>>,
}

#[derive(Debug)]
pub enum SchedulerEvent {
    NewTask { tenant_id: Uuid },
    TaskCompleted { task_id: Uuid },
    WorkerAvailable { worker_id: String },
}

impl TaskScheduler {
    pub async fn notify_new_task(&self, tenant_id: Uuid) -> Result<()> {
        self.notification_tx
            .send(SchedulerEvent::NewTask { tenant_id })
            .await?;
        Ok(())
    }

    /// Returns true when the task is ready to run; otherwise records it as a
    /// waiter on the first incomplete dependency and returns false.
    pub async fn process_dependencies(&self, task: &QueuedTask) -> Result<bool> {
        if task.dependencies.is_empty() {
            return Ok(true);
        }

        let mut graph = self.dependency_graph.write().await;

        for dep_id in &task.dependencies {
            if graph.contains_key(dep_id) {
                // Dependency not yet completed: park this task as a waiter
                graph.entry(*dep_id)
                    .or_insert_with(Vec::new)
                    .push(task.task_id);
                return Ok(false);
            }
        }

        // All dependencies satisfied
        Ok(true)
    }

    pub async fn handle_task_completion(&self, task_id: Uuid) -> Result<()> {
        let mut graph = self.dependency_graph.write().await;

        // Release any tasks waiting on this completion
        if let Some(waiting_tasks) = graph.remove(&task_id) {
            let mut ready_queue = self.ready_queue.write().await;
            for waiting_task_id in waiting_tasks {
                ready_queue.push_back(waiting_task_id);
            }
        }

        Ok(())
    }

    pub async fn apply_priority_aging(&self, db: Arc<Database>, tenant_id: Uuid) -> Result<()> {
        // Age tasks that have been pending too long
        let threshold = Utc::now() - Duration::hours(1);
        let repo = TaskQueueRepository::new(db, tenant_id);

        let aged_tasks = repo.find_tasks_older_than(threshold).await?;
        for mut task in aged_tasks {
            task.priority = (task.priority + 1).min(100);
            repo.update_priority(&task).await?;
        }

        Ok(())
    }
}
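The completion path of the dependency graph can be sketched without the async machinery. This minimal version uses plain `u64` task ids as stand-ins for `Uuid`: completing a task removes its entry from the graph and moves every waiter onto the ready queue, exactly the bookkeeping handle_task_completion performs.

```rust
// Sketch of the scheduler's dependency bookkeeping, with u64 ids standing in
// for Uuid. Completing a task releases its waiters to the ready queue.
use std::collections::{HashMap, VecDeque};

fn handle_completion(
    graph: &mut HashMap<u64, Vec<u64>>,
    ready: &mut VecDeque<u64>,
    completed: u64,
) {
    // Remove the completed task's entry and enqueue everything that waited on it
    if let Some(waiters) = graph.remove(&completed) {
        ready.extend(waiters);
    }
}

fn main() {
    let mut graph = HashMap::new();
    graph.insert(1, vec![10, 11]); // tasks 10 and 11 wait on task 1
    let mut ready = VecDeque::new();

    handle_completion(&mut graph, &mut ready, 1);
    assert_eq!(ready, VecDeque::from(vec![10, 11]));
    assert!(graph.is_empty());
}
```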

Queue Monitor Implementation​

// File: src/services/queue_monitor.rs
use std::collections::HashMap;
use std::time::Duration;

use chrono::{DateTime, Utc};
use metrics::{counter, gauge, histogram};
use tokio::sync::mpsc;
use uuid::Uuid;

pub struct QueueMonitor {
    alert_thresholds: AlertThresholds,
    metrics_tx: mpsc::Sender<QueueMetrics>,
}

#[derive(Clone)]
pub struct AlertThresholds {
    pub max_queue_depth: usize,
    pub max_wait_time_ms: u64,
    pub min_success_rate: f32,
    pub max_dead_letter_rate: f32,
}

#[derive(Debug)]
pub struct QueueMetrics {
    pub timestamp: DateTime<Utc>,
    pub queue_depth: HashMap<TaskType, usize>,
    pub average_wait_time_ms: u64,
    pub tasks_processed: u64,
    pub tasks_failed: u64,
    pub worker_utilization: HashMap<String, f32>,
}

impl QueueMonitor {
    pub async fn record_task_submitted(&self, task: &QueuedTask) {
        counter!("queue.tasks.submitted", 1,
            "tenant_id" => task.tenant_id.to_string(),
            "task_type" => format!("{:?}", task.task_type)
        );
    }

    pub async fn record_task_assigned(&self, task: &QueuedTask, wait_time: Duration) {
        histogram!("queue.wait_time_ms", wait_time.as_millis() as f64,
            "task_type" => format!("{:?}", task.task_type)
        );

        counter!("queue.tasks.assigned", 1,
            "task_type" => format!("{:?}", task.task_type)
        );
    }

    pub async fn check_alert_conditions(&self, metrics: &QueueMetrics) -> Vec<Alert> {
        let mut alerts = Vec::new();

        // Check queue depth
        for (task_type, depth) in &metrics.queue_depth {
            if *depth > self.alert_thresholds.max_queue_depth {
                alerts.push(Alert::QueueDepthHigh {
                    task_type: *task_type,
                    depth: *depth,
                });
            }
        }

        // Check wait time
        if metrics.average_wait_time_ms > self.alert_thresholds.max_wait_time_ms {
            alerts.push(Alert::WaitTimeHigh {
                average_ms: metrics.average_wait_time_ms,
            });
        }

        // Check success rate (guard against a zero-task window)
        let total = metrics.tasks_processed + metrics.tasks_failed;
        if total > 0 {
            let success_rate = metrics.tasks_processed as f32 / total as f32;
            if success_rate < self.alert_thresholds.min_success_rate {
                alerts.push(Alert::SuccessRateLow { rate: success_rate });
            }
        }

        alerts
    }

    pub async fn update_queue_depth(&self, tenant_id: Uuid, depths: HashMap<TaskType, usize>) {
        for (task_type, depth) in depths {
            gauge!("queue.depth", depth as f64,
                "tenant_id" => tenant_id.to_string(),
                "task_type" => format!("{:?}", task_type)
            );
        }
    }
}

#[derive(Debug)]
pub enum Alert {
    QueueDepthHigh { task_type: TaskType, depth: usize },
    WaitTimeHigh { average_ms: u64 },
    SuccessRateLow { rate: f32 },
    DeadLetterRateHigh { rate: f32 },
}
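The success-rate alert rule reduces to a small pure function, shown here as a stand-alone sketch (function names are illustrative, not part of the monitor's API). The guard treats an empty measurement window as healthy rather than dividing by zero.

```rust
// Stand-alone sketch of the success-rate alert rule, with a guard for the
// zero-task window.
fn success_rate(processed: u64, failed: u64) -> f32 {
    let total = processed + failed;
    if total == 0 { 1.0 } else { processed as f32 / total as f32 }
}

fn should_alert(processed: u64, failed: u64, min_rate: f32) -> bool {
    success_rate(processed, failed) < min_rate
}

fn main() {
    assert!(should_alert(90, 20, 0.95));  // 90/110 ≈ 0.818, below threshold
    assert!(!should_alert(99, 1, 0.95));  // 0.99, above threshold
    assert!(!should_alert(0, 0, 0.95));   // empty window never alerts
}
```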

↑ Back to Top

4. Data Models

Queued Task Structure

// File: src/models/queue.rs
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuedTask {
    pub task_id: Uuid,
    pub tenant_id: Uuid,
    pub task_type: TaskType,
    pub payload: serde_json::Value,
    pub priority: i32,
    pub status: QueueStatus,
    pub retry_count: u32,
    pub max_retries: u32,
    pub created_at: DateTime<Utc>,
    pub assigned_at: Option<DateTime<Utc>>,
    pub worker_id: Option<String>,
    pub deadline: Option<DateTime<Utc>>,
    pub dependencies: Vec<Uuid>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum QueueStatus {
    Pending,
    Assigned,
    Processing,
    Completed,
    Failed,
    Retrying,
    DeadLetter,
}

// Copy + Eq + Hash so TaskType can serve as a capability-index and metrics key
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TaskType {
    CodeGeneration,
    CodeReview,
    Testing,
    Deployment,
    Documentation,
    Analysis,
}

↑ Back to Top

5. Implementation Patterns​

Repository Pattern for Queue Storage​

// File: src/db/repositories/task_queue_repository.rs
use anyhow::{anyhow, Result};
use chrono::Utc;
use foundationdb::Database;

impl TaskQueueRepository {
    pub async fn enqueue_task(&self, task: &QueuedTask, priority: i32) -> Result<()> {
        let tx = self.db.create_transaction()?;

        // Primary storage
        let task_key = format!("{}/queue/tasks/{}", self.tenant_id, task.task_id);
        tx.set(task_key.as_bytes(), &serde_json::to_vec(task)?);

        // Priority index (zero-padded, inverted so higher priority sorts first)
        let priority_key = format!("{}/queue/by_priority/{:010}/{}",
            self.tenant_id,
            i32::MAX - priority,
            task.task_id
        );
        tx.set(priority_key.as_bytes(), b"");

        // Status index
        let status_key = format!("{}/queue/by_status/{:?}/{}",
            self.tenant_id,
            task.status,
            task.task_id
        );
        tx.set(status_key.as_bytes(), b"");

        tx.commit().await?;
        Ok(())
    }

    pub async fn dequeue_task(&self, worker_type: WorkerType) -> Result<Option<QueuedTask>> {
        let tx = self.db.create_transaction()?;

        // Find the highest-priority pending task. The range scan is shown in
        // simplified form; the real foundationdb API takes a RangeOption.
        let prefix = format!("{}/queue/by_priority/", self.tenant_id);
        let range = tx.get_range(&prefix, 1).await?;

        if let Some((key, _)) = range.first() {
            // Extract task_id from the index key
            let task_id = extract_task_id(key)?;

            // Load task
            let task_key = format!("{}/queue/tasks/{}", self.tenant_id, task_id);
            let task_data = tx
                .get(task_key.as_bytes(), false)
                .await?
                .ok_or_else(|| anyhow!("Task not found: {task_id}"))?;
            let mut task: QueuedTask = serde_json::from_slice(&task_data)?;

            // Update status
            task.status = QueueStatus::Assigned;
            task.assigned_at = Some(Utc::now());

            // Persist the new state and drop the priority-index entry so the
            // task cannot be handed to a second worker
            tx.set(task_key.as_bytes(), &serde_json::to_vec(&task)?);
            tx.clear(key);

            tx.commit().await?;
            Ok(Some(task))
        } else {
            Ok(None)
        }
    }
}
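The priority index relies on a key-encoding trick: because the inverted priority is zero-padded to a fixed width, lexicographic (byte-wise) key order matches numeric order, so a plain ascending scan over the index returns the highest-priority task first. A minimal sketch (`priority_segment` is an illustrative helper, not part of the repository API):

```rust
// Sketch of the priority-index key segment: zero-padded inverted priorities
// sort lexicographically in the same order as numerically.
fn priority_segment(priority: i32) -> String {
    format!("{:010}", i32::MAX - priority)
}

fn main() {
    let high = priority_segment(90);
    let low = priority_segment(10);
    // Byte-wise (string) order puts the high-priority key first
    assert!(high < low);
    // Fixed width: every segment is exactly 10 digits
    assert_eq!(priority_segment(0).len(), 10);
}
```

Without the zero-padding, "100" would sort before "99" in byte order and the scan would return tasks in the wrong sequence.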

Retry Logic Implementation​

// File: src/services/retry_service.rs
use std::time::Duration;

use anyhow::{Error, Result};

impl RetryService {
    pub async fn should_retry(&self, task: &QueuedTask, error: &Error) -> Result<bool> {
        // Exhausted the retry budget?
        if task.retry_count >= task.max_retries {
            return Ok(false);
        }

        // Retry only error classes that can plausibly succeed on a later attempt
        match classify_error(error) {
            ErrorType::Transient => Ok(true),
            ErrorType::RateLimit => Ok(true),
            ErrorType::Permanent => Ok(false),
        }
    }

    pub fn calculate_backoff(&self, retry_count: u32) -> Duration {
        let base_delay = Duration::from_secs(1);
        // Saturate the exponent so large retry counts cannot overflow
        let exponential = base_delay * 2u32.saturating_pow(retry_count.min(16));
        let jitter = Duration::from_millis(rand::random::<u64>() % 1000);
        exponential + jitter
    }
}
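The deterministic core of the backoff schedule can be checked directly once the random jitter is set aside. This sketch (with an illustrative `base_backoff` helper) shows the doubling schedule and the capped exponent that prevents overflow for large retry counts; the real service adds up to one second of jitter on top of these values.

```rust
// Deterministic core of the backoff schedule (jitter omitted so the values
// are checkable): the delay doubles per retry, with a capped exponent.
use std::time::Duration;

fn base_backoff(retry_count: u32) -> Duration {
    // Cap the exponent at 16 (about 18 hours) to avoid u32 overflow
    let exp = 2u32.saturating_pow(retry_count.min(16));
    Duration::from_secs(1) * exp
}

fn main() {
    assert_eq!(base_backoff(0), Duration::from_secs(1));
    assert_eq!(base_backoff(1), Duration::from_secs(2));
    assert_eq!(base_backoff(3), Duration::from_secs(8));
    // Large retry counts saturate at the cap instead of overflowing
    assert_eq!(base_backoff(40), Duration::from_secs(65536));
}
```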

Dead Letter Queue Handler​

// File: src/services/dead_letter_queue.rs
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::{Error, Result};
use chrono::{DateTime, Utc};
use foundationdb::Database;
use serde::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;

pub struct DeadLetterQueueHandler {
    db: Arc<Database>,
    classifier: Arc<FailureClassifier>,
    notifier: Arc<AlertNotifier>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct DeadLetterTask {
    pub task: QueuedTask,
    pub failure_reason: String,
    pub failure_type: FailureType,
    pub error_details: serde_json::Value,
    pub attempts: Vec<AttemptRecord>,
    pub moved_to_dlq_at: DateTime<Utc>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct AttemptRecord {
    pub attempt_number: u32,
    pub timestamp: DateTime<Utc>,
    pub worker_id: Option<String>,
    pub error_message: String,
    pub duration_ms: u64,
}

// Eq + Hash so FailureType can key the analysis maps below
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum FailureType {
    InvalidInput,
    AuthenticationFailure,
    RateLimitExceeded,
    Timeout,
    InternalError,
    DependencyFailure,
    Unknown,
}

impl DeadLetterQueueHandler {
    pub async fn move_to_dead_letter(
        &self,
        task: QueuedTask,
        error: &Error,
        attempts: Vec<AttemptRecord>,
    ) -> Result<()> {
        let tx = self.db.create_transaction()?;

        // Classify failure
        let failure_type = self.classifier.classify_error(error)?;

        // Build the DLQ key before failure_type is moved into the entry
        let dlq_key = format!("{}/dlq/{}/{}",
            task.tenant_id,
            format!("{:?}", failure_type).to_lowercase(),
            task.task_id
        );

        // Create dead letter entry
        let dlq_task = DeadLetterTask {
            task: task.clone(),
            failure_reason: error.to_string(),
            failure_type: failure_type.clone(),
            error_details: json!({
                "error_chain": error.chain().map(|e| e.to_string()).collect::<Vec<_>>(),
                "backtrace": format!("{:?}", error.backtrace()),
            }),
            attempts,
            moved_to_dlq_at: Utc::now(),
        };

        // Store in dead letter queue
        tx.set(dlq_key.as_bytes(), &serde_json::to_vec(&dlq_task)?);

        // Remove from active queue
        let task_key = format!("{}/queue/tasks/{}", task.tenant_id, task.task_id);
        tx.clear(task_key.as_bytes());

        // Update metrics
        self.update_dlq_metrics(&failure_type).await?;

        // Send alerts if needed
        self.check_alert_conditions(&failure_type).await?;

        tx.commit().await?;
        Ok(())
    }

    pub async fn bulk_retry(&self, filter: DlqFilter) -> Result<BulkRetryResult> {
        let mut result = BulkRetryResult::default();

        // Find matching tasks
        let tasks = self.find_tasks_by_filter(filter).await?;
        result.total_found = tasks.len();

        for dlq_task in tasks {
            // Reset retry state before re-enqueueing
            let mut task = dlq_task.task.clone();
            task.retry_count = 0;
            task.status = QueueStatus::Pending;

            match self.requeue_task(task).await {
                Ok(_) => result.successfully_retried += 1,
                Err(e) => {
                    result.failed_to_retry += 1;
                    result.errors.push(format!("Task {}: {}", dlq_task.task.task_id, e));
                }
            }
        }

        Ok(result)
    }

    pub async fn analyze_failures(&self, time_range: TimeRange) -> Result<FailureAnalysis> {
        let mut analysis = FailureAnalysis::default();

        // Group by failure type and by task type
        let failures = self.get_failures_in_range(time_range).await?;

        for failure in &failures {
            *analysis.by_type.entry(failure.failure_type.clone()).or_insert(0) += 1;
            *analysis.by_task_type.entry(failure.task.task_type).or_insert(0) += 1;
        }

        // Identify recurring patterns across the failures
        analysis.patterns = self.identify_failure_patterns(&failures)?;

        Ok(analysis)
    }
}

#[derive(Default)]
pub struct BulkRetryResult {
    pub total_found: usize,
    pub successfully_retried: usize,
    pub failed_to_retry: usize,
    pub errors: Vec<String>,
}

#[derive(Default)]
pub struct FailureAnalysis {
    pub by_type: HashMap<FailureType, usize>,
    pub by_task_type: HashMap<TaskType, usize>,
    pub patterns: Vec<FailurePattern>,
}

#[derive(Debug)]
pub struct FailurePattern {
    pub pattern_type: String,
    pub description: String,
    pub affected_count: usize,
    pub recommendation: String,
}

↑ Back to Top

6. API Specifications​

Queue Management Endpoints​

// File: src/api/handlers/queue_routes.rs
use std::sync::Arc;

use actix_web::{get, post, web, HttpResponse, Result};
use chrono::Utc;
use serde_json::json;
use uuid::Uuid;

#[post("/api/v1/queue/submit")]
pub async fn submit_task(
    request: web::Json<SubmitTaskRequest>,
    queue_service: web::Data<Arc<QueueService>>,
    claims: Claims, // custom extractor that validates the JWT and exposes tenant_id
) -> Result<HttpResponse> {
    let request = request.into_inner();
    let task = QueuedTask {
        task_id: Uuid::new_v4(),
        tenant_id: claims.tenant_id,
        task_type: request.task_type,
        payload: request.payload,
        priority: request.priority.unwrap_or(50),
        status: QueueStatus::Pending,
        retry_count: 0,
        max_retries: 3,
        created_at: Utc::now(),
        assigned_at: None,
        worker_id: None,
        deadline: request.deadline,
        dependencies: request.dependencies.unwrap_or_default(),
    };

    let task_id = queue_service.submit_task(task).await?;

    Ok(HttpResponse::Created().json(json!({
        "task_id": task_id,
        "status": "queued"
    })))
}

#[get("/api/v1/queue/status/{task_id}")]
pub async fn get_task_status(
    task_id: web::Path<Uuid>,
    queue_service: web::Data<Arc<QueueService>>,
    claims: Claims,
) -> Result<HttpResponse> {
    let status = queue_service
        .get_task_status(claims.tenant_id, task_id.into_inner())
        .await?;

    Ok(HttpResponse::Ok().json(status))
}

↑ Back to Top

7. Testing Requirements​

Test Coverage Requirements​

  • Unit Test Coverage: ≥90% of queue logic
  • Integration Test Coverage: ≥80% of database operations
  • Load Test Coverage: Handle 10x normal load
  • Chaos Test Coverage: Network failures, worker crashes

Unit Tests​

// File: src/services/queue_service_tests.rs
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_priority_ordering() {
        let service = setup_test_service().await;

        // Submit tasks with different priorities
        let low = create_task(10);
        let high = create_task(90);
        let medium = create_task(50);

        service.submit_task(low.clone()).await.unwrap();
        service.submit_task(high.clone()).await.unwrap();
        service.submit_task(medium.clone()).await.unwrap();

        // Dequeue should return the high-priority task first
        let first = service.dequeue_task().await.unwrap().unwrap();
        assert_eq!(first.priority, 90);
    }

    #[tokio::test]
    async fn test_retry_logic() {
        let service = setup_test_service().await;
        let task = create_task(50);

        // Simulate a transient failure
        let error = anyhow!("Network timeout");

        // Should retry transient errors
        assert!(service.should_retry(&task, &error).await.unwrap());

        // Backoff after 2 retries: 4s base plus up to 1s of jitter
        let backoff = service.calculate_backoff(2);
        assert!(backoff >= Duration::from_secs(4));
        assert!(backoff < Duration::from_secs(5));
    }

    #[tokio::test]
    async fn test_worker_capability_matching() {
        let registry = WorkerRegistry::new();

        // Register a worker that can generate and review code
        let ai_worker = WorkerInfo {
            worker_id: "ai-001".to_string(),
            worker_type: WorkerType::AIAgent { model: "gpt-4".to_string() },
            capabilities: vec![TaskType::CodeGeneration, TaskType::CodeReview]
                .into_iter()
                .collect(),
            capacity: 10,
            current_load: 3,
            health_status: HealthStatus::Healthy,
            last_heartbeat: Utc::now(),
        };

        registry.register_worker(ai_worker).await.unwrap();

        // Find capable worker
        let worker = registry.find_capable_worker(&TaskType::CodeGeneration).await;
        assert_eq!(worker, Some("ai-001".to_string()));

        // No worker registered for deployment
        let worker = registry.find_capable_worker(&TaskType::Deployment).await;
        assert!(worker.is_none());
    }

    #[tokio::test]
    async fn test_dependency_resolution() {
        let scheduler = TaskScheduler::new();

        // Task with no dependencies is immediately ready
        let independent = QueuedTask {
            task_id: Uuid::new_v4(),
            dependencies: vec![],
            ..Default::default()
        };
        assert!(scheduler.process_dependencies(&independent).await.unwrap());

        // Task with an unsatisfied dependency must wait
        let dependent = QueuedTask {
            task_id: Uuid::new_v4(),
            dependencies: vec![Uuid::new_v4()],
            ..Default::default()
        };
        assert!(!scheduler.process_dependencies(&dependent).await.unwrap());
    }
}

Integration Tests​

// File: tests/queue_integration_tests.rs
#[tokio::test]
async fn test_end_to_end_task_lifecycle() {
    let (service, worker) = setup_integration_test().await;

    // Submit task
    let task = QueuedTask {
        task_id: Uuid::new_v4(),
        tenant_id: test_tenant_id(),
        task_type: TaskType::Testing,
        payload: json!({"test": "data"}),
        priority: 50,
        status: QueueStatus::Pending,
        retry_count: 0,
        max_retries: 3,
        created_at: Utc::now(),
        ..Default::default()
    };

    let task_id = service.submit_task(task).await.unwrap();

    // Worker dequeues the task
    let dequeued = worker.get_next_task().await.unwrap().unwrap();
    assert_eq!(dequeued.task_id, task_id);
    assert_eq!(dequeued.status, QueueStatus::Assigned);

    // Worker completes the task
    worker.complete_task(task_id, json!({"result": "success"})).await.unwrap();

    // Verify completion
    let final_status = service.get_task_status(test_tenant_id(), task_id).await.unwrap();
    assert_eq!(final_status.status, QueueStatus::Completed);
}

#[tokio::test]
async fn test_multi_tenant_isolation() {
    let service = setup_test_service().await;
    let tenant1 = Uuid::new_v4();
    let tenant2 = Uuid::new_v4();

    // Submit tasks for different tenants
    let task1 = create_task_for_tenant(tenant1, 50);
    let task2 = create_task_for_tenant(tenant2, 90);

    service.submit_task(task1.clone()).await.unwrap();
    service.submit_task(task2.clone()).await.unwrap();

    // Tenant 1 should only see their own task
    let tenant1_tasks = service.list_tasks(tenant1).await.unwrap();
    assert_eq!(tenant1_tasks.len(), 1);
    assert_eq!(tenant1_tasks[0].tenant_id, tenant1);

    // Tenant 2 should only see their own task
    let tenant2_tasks = service.list_tasks(tenant2).await.unwrap();
    assert_eq!(tenant2_tasks.len(), 1);
    assert_eq!(tenant2_tasks[0].tenant_id, tenant2);
}

#[tokio::test]
async fn test_dead_letter_queue_flow() {
    let service = setup_test_service().await;
    let dlq_handler = service.dead_letter_handler();

    // Create a task that will exhaust its two retries
    let task = QueuedTask {
        task_id: Uuid::new_v4(),
        max_retries: 2,
        ..create_task(50)
    };

    service.submit_task(task.clone()).await.unwrap();

    // Simulate repeated failures
    for _ in 0..3 {
        let error = anyhow!("Processing failed");
        service.handle_task_failure(task.task_id, error).await.unwrap();
    }

    // Task should now be in the DLQ
    let dlq_tasks = dlq_handler.list_tasks(DlqFilter::All).await.unwrap();
    assert_eq!(dlq_tasks.len(), 1);
    assert_eq!(dlq_tasks[0].task.task_id, task.task_id);

    // Bulk retry re-enqueues it
    let result = dlq_handler.bulk_retry(DlqFilter::All).await.unwrap();
    assert_eq!(result.successfully_retried, 1);
}

Load Tests​

// File: tests/queue_load_tests.rs
#[tokio::test]
async fn test_high_throughput() {
    let service = setup_test_service().await;
    let start = Instant::now();

    // Submit 10,000 tasks concurrently
    let mut handles = vec![];
    for i in 0..10_000 {
        let service_clone = service.clone();
        let handle = tokio::spawn(async move {
            let task = create_task(i % 100);
            service_clone.submit_task(task).await
        });
        handles.push(handle);
    }

    // Wait for all submissions
    for handle in handles {
        handle.await.unwrap().unwrap();
    }

    let elapsed = start.elapsed();
    let throughput = 10_000.0 / elapsed.as_secs_f64();

    // Should handle at least 10,000 tasks/second
    assert!(throughput >= 10_000.0, "Throughput {} < 10,000", throughput);
}

#[tokio::test]
async fn test_concurrent_workers() {
    let service = setup_test_service().await;

    // Submit 1000 tasks
    for _ in 0..1000 {
        service.submit_task(create_task(50)).await.unwrap();
    }

    // Spawn 10 workers that drain the queue
    let mut worker_handles = vec![];
    for _worker_id in 0..10 {
        let service_clone = service.clone();
        let handle = tokio::spawn(async move {
            let mut processed = 0;
            while let Some(task) = service_clone.dequeue_task().await.unwrap() {
                // Simulate processing
                tokio::time::sleep(Duration::from_millis(10)).await;
                service_clone.complete_task(task.task_id).await.unwrap();
                processed += 1;
            }
            processed
        });
        worker_handles.push(handle);
    }

    // Wait for all workers and sum their counts
    let total_processed: usize = futures::future::join_all(worker_handles)
        .await
        .into_iter()
        .map(|r| r.unwrap())
        .sum();

    assert_eq!(total_processed, 1000);
}

↑ Back to Top

8. Performance Benchmarks​

Required Performance Metrics​

// File: benches/queue_benchmarks.rs
// NOTE: #[bench] and Bencher require the nightly `test` feature.
#![feature(test)]
extern crate test;
use test::Bencher;

const BENCHMARK_TASKS: usize = 100_000;
const TARGET_THROUGHPUT: usize = 10_000; // tasks/second
const MAX_ASSIGNMENT_LATENCY_MS: u64 = 100;

#[bench]
fn bench_task_submission(b: &mut Bencher) {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let service = runtime.block_on(setup_service());

    b.iter(|| {
        runtime.block_on(async {
            for _ in 0..1000 {
                let task = generate_random_task();
                service.submit_task(task).await.unwrap();
            }
        })
    });
}

#[bench]
fn bench_task_dequeue(b: &mut Bencher) {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let service = runtime.block_on(setup_service());

    // Pre-populate queue
    runtime.block_on(async {
        for _ in 0..10_000 {
            service.submit_task(generate_random_task()).await.unwrap();
        }
    });

    b.iter(|| {
        runtime.block_on(async {
            service.dequeue_task(WorkerType::AIAgent).await
        })
    });
}

#[bench]
fn bench_concurrent_operations(b: &mut Bencher) {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let service = runtime.block_on(setup_service());

    b.iter(|| {
        runtime.block_on(async {
            let mut handles = vec![];

            // Mix of submit, dequeue, and status-check operations
            for i in 0..100 {
                let service_clone = service.clone();
                let handle = tokio::spawn(async move {
                    if i % 3 == 0 {
                        // Submit
                        service_clone.submit_task(generate_random_task()).await.ok();
                    } else if i % 3 == 1 {
                        // Dequeue
                        service_clone.dequeue_task(WorkerType::AIAgent).await.ok();
                    } else {
                        // Status check
                        service_clone
                            .get_task_status(test_tenant_id(), Uuid::new_v4())
                            .await
                            .ok();
                    }
                });
                handles.push(handle);
            }

            futures::future::join_all(handles).await;
        })
    });
}

impl PerformanceValidator {
    pub async fn validate_requirements(&self) -> Result<ValidationReport> {
        let mut report = ValidationReport::default();

        // Test submission latency
        let start = Instant::now();
        self.service.submit_task(generate_random_task()).await?;
        report.submission_latency_ms = start.elapsed().as_millis() as u64;

        // Test assignment latency
        let start = Instant::now();
        self.service.dequeue_task(WorkerType::AIAgent).await?;
        report.assignment_latency_ms = start.elapsed().as_millis() as u64;

        // Test throughput over a 10-second window
        let throughput = self.measure_throughput(Duration::from_secs(10)).await?;
        report.throughput_per_second = throughput;

        // Validate against requirements
        report.meets_requirements =
            report.assignment_latency_ms <= MAX_ASSIGNMENT_LATENCY_MS &&
            report.throughput_per_second >= TARGET_THROUGHPUT;

        Ok(report)
    }
}

↑ Back to Top

9. Security Controls​

Access Control​

impl QueueAccessControl {
    pub fn can_submit_task(&self, claims: &Claims, tenant_id: Uuid, task_type: &TaskType) -> bool {
        // Submissions are only valid against the caller's own tenant
        claims.tenant_id == tenant_id
            && match task_type {
                TaskType::Deployment => claims.roles.contains("deploy"),
                _ => claims.roles.contains("developer"),
            }
    }

    pub fn can_view_queue(&self, claims: &Claims, tenant_id: Uuid) -> bool {
        claims.tenant_id == tenant_id && claims.roles.contains("developer")
    }
}
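The role rule reduces to a small pure function. This sketch uses a bare `HashSet` of role strings in place of the system's Claims type (which is defined elsewhere), and the `may_submit` helper is illustrative, not part of the access-control API: deployment tasks require the "deploy" role, everything else requires "developer".

```rust
// Minimal sketch of the role rule in can_submit_task, with a HashSet of role
// strings standing in for the real Claims type.
use std::collections::HashSet;

fn may_submit(roles: &HashSet<&str>, is_deployment: bool) -> bool {
    if is_deployment {
        roles.contains("deploy")
    } else {
        roles.contains("developer")
    }
}

fn main() {
    let dev: HashSet<&str> = ["developer"].into_iter().collect();
    assert!(may_submit(&dev, false));
    // A developer without the "deploy" role cannot submit deployment tasks
    assert!(!may_submit(&dev, true));
}
```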

↑ Back to Top

10. Logging and Error Handling​

Logging Pattern​

use anyhow::Error;
use tracing::{error, info, warn};
use uuid::Uuid;

pub fn log_queue_event(action: &str, task_id: Uuid, details: serde_json::Value) {
    info!(
        action = action,
        task_id = %task_id,
        details = ?details,
        "Queue event"
    );
}

pub fn log_queue_error(action: &str, task_id: Uuid, error: &Error) {
    error!(
        action = action,
        task_id = %task_id,
        error = %error,
        "Queue operation failed"
    );
}

Error Handling​

#[derive(thiserror::Error, Debug)]
pub enum QueueError {
    #[error("Queue is full. Please try again later.")]
    QueueFull,

    #[error("Task not found: {0}")]
    TaskNotFound(Uuid),

    #[error("No workers available for task type: {0:?}")]
    NoWorkersAvailable(TaskType),
}

↑ Back to Top

11. References​

↑ Back to Top

12. Approval Signatures​

Technical Sign-off​

| Component | Owner | Approved | Date |
| --- | --- | --- | --- |
| Architecture | Session 6 | ✓ | 2025-08-31 |
| Implementation | Pending | - | - |
| Security Review | Pending | - | - |
| Performance Test | Pending | - | - |

Implementation Checklist​

  • Queue models implemented
  • Repository pattern with FDB
  • Priority queue logic
  • Worker Registry with capability matching
  • Task Scheduler with dependency resolution
  • Queue Monitor with metrics collection
  • Retry mechanism with exponential backoff
  • Dead Letter Queue handler with bulk retry
  • API endpoints for submit/status
  • Unit tests with worker and dependency tests
  • Integration tests covering multi-tenancy
  • Load tests demonstrating 10k tasks/second
  • Performance benchmarks with validation
  • Security controls validated

↑ Back to Top