#!/usr/bin/env python3 """ Unit Tests for CODITECT Task Queue Manager
Part of Track H.2: Inter-Agent Communication Infrastructure
Tests cover:
- Task data model serialization/deserialization
- TaskQueueConfig from various sources
- LocalTaskQueue functionality
- TaskQueueManager operations
- Priority queue ordering
- Dependency resolution and unblocking
- Deadlock detection
- Retry logic
Run tests: cd scripts/core python -m pytest test_task_queue_manager.py -v
# Or directly
python test_task_queue_manager.py
Author: CODITECT Framework Created: January 8, 2026 Version: 1.0.0 """
import asyncio import json import os import sys import tempfile import time import unittest from datetime import datetime, timedelta from pathlib import Path from typing import List from unittest.mock import patch
Add script directory to path
script_dir = Path(file).parent sys.path.insert(0, str(script_dir))
from task_queue_manager import ( Task, TaskStatus, TaskPriority, TaskQueueConfig, TaskQueueManager, LocalTaskQueue, create_task, )
class TestTask(unittest.TestCase): """Test Task dataclass."""
def test_create_default_task(self):
"""Test creating a task with default values."""
task = Task()
self.assertIsNotNone(task.id)
self.assertEqual(len(task.id), 36) # UUID format
self.assertEqual(task.description, "")
self.assertEqual(task.agent, "")
self.assertEqual(task.status, TaskStatus.PENDING.value)
self.assertEqual(task.priority, TaskPriority.NORMAL.value)
self.assertEqual(task.retry_count, 0)
self.assertEqual(task.max_retries, 3)
def test_create_custom_task(self):
"""Test creating a task with custom values."""
task = Task(
description="Process data",
agent="data-processor",
inputs={"data": [1, 2, 3]},
priority=TaskPriority.HIGH.value,
max_retries=5,
)
self.assertEqual(task.description, "Process data")
self.assertEqual(task.agent, "data-processor")
self.assertEqual(task.inputs, {"data": [1, 2, 3]})
self.assertEqual(task.priority, 7)
self.assertEqual(task.max_retries, 5)
def test_to_dict(self):
"""Test converting task to dictionary."""
task = Task(
description="Test task",
agent="test-agent",
)
data = task.to_dict()
self.assertIsInstance(data, dict)
self.assertEqual(data["description"], "Test task")
self.assertEqual(data["agent"], "test-agent")
self.assertIn("id", data)
self.assertIn("created_at", data)
def test_to_json(self):
"""Test converting task to JSON string."""
task = Task(
description="Test task",
inputs={"key": "value"},
)
json_str = task.to_json()
self.assertIsInstance(json_str, str)
parsed = json.loads(json_str)
self.assertEqual(parsed["description"], "Test task")
self.assertEqual(parsed["inputs"], {"key": "value"})
def test_from_dict(self):
"""Test creating task from dictionary."""
data = {
"id": "task-123",
"description": "From dict",
"agent": "test-agent",
"status": "in_progress",
"priority": 9,
}
task = Task.from_dict(data)
self.assertEqual(task.id, "task-123")
self.assertEqual(task.description, "From dict")
self.assertEqual(task.agent, "test-agent")
self.assertEqual(task.status, "in_progress")
self.assertEqual(task.priority, 9)
def test_from_json(self):
"""Test creating task from JSON string."""
json_str = json.dumps({
"description": "From JSON",
"agent": "json-agent",
"inputs": {"data": 123},
})
task = Task.from_json(json_str)
self.assertEqual(task.description, "From JSON")
self.assertEqual(task.agent, "json-agent")
self.assertEqual(task.inputs, {"data": 123})
def test_round_trip_serialization(self):
"""Test that task survives JSON round-trip."""
original = Task(
description="Round trip test",
agent="test-agent",
inputs={"nested": {"data": [1, 2, 3]}},
metadata={"key": "value"},
)
json_str = original.to_json()
restored = Task.from_json(json_str)
self.assertEqual(original.description, restored.description)
self.assertEqual(original.agent, restored.agent)
self.assertEqual(original.inputs, restored.inputs)
self.assertEqual(original.metadata, restored.metadata)
def test_is_expired_false(self):
"""Test that new task is not expired."""
task = Task(ttl_seconds=3600) # 1 hour
self.assertFalse(task.is_expired)
def test_can_retry_true(self):
"""Test that task with retries remaining can retry."""
task = Task(retry_count=0, max_retries=3)
self.assertTrue(task.can_retry)
def test_can_retry_false(self):
"""Test that task with no retries remaining cannot retry."""
task = Task(retry_count=3, max_retries=3)
self.assertFalse(task.can_retry)
class TestTaskStatus(unittest.TestCase): """Test TaskStatus enum."""
def test_status_values(self):
"""Test all status values."""
self.assertEqual(TaskStatus.PENDING.value, "pending")
self.assertEqual(TaskStatus.BLOCKED.value, "blocked")
self.assertEqual(TaskStatus.READY.value, "ready")
self.assertEqual(TaskStatus.IN_PROGRESS.value, "in_progress")
self.assertEqual(TaskStatus.COMPLETED.value, "completed")
self.assertEqual(TaskStatus.FAILED.value, "failed")
self.assertEqual(TaskStatus.CANCELLED.value, "cancelled")
class TestTaskPriority(unittest.TestCase): """Test TaskPriority enum."""
def test_priority_order(self):
"""Test priority values are ordered correctly."""
self.assertLess(TaskPriority.BACKGROUND.value, TaskPriority.LOW.value)
self.assertLess(TaskPriority.LOW.value, TaskPriority.NORMAL.value)
self.assertLess(TaskPriority.NORMAL.value, TaskPriority.HIGH.value)
self.assertLess(TaskPriority.HIGH.value, TaskPriority.URGENT.value)
self.assertLess(TaskPriority.URGENT.value, TaskPriority.CRITICAL.value)
def test_priority_values(self):
"""Test specific priority values."""
self.assertEqual(TaskPriority.BACKGROUND.value, 1)
self.assertEqual(TaskPriority.NORMAL.value, 5)
self.assertEqual(TaskPriority.CRITICAL.value, 10)
class TestTaskQueueConfig(unittest.TestCase): """Test TaskQueueConfig dataclass."""
def test_default_config(self):
"""Test default configuration values."""
config = TaskQueueConfig()
self.assertEqual(config.host, "localhost")
self.assertEqual(config.port, 6379)
self.assertIsNone(config.password)
self.assertEqual(config.db, 0)
def test_custom_config(self):
"""Test custom configuration values."""
config = TaskQueueConfig(
host="redis.example.com",
port=6380,
password="secret",
db=5,
)
self.assertEqual(config.host, "redis.example.com")
self.assertEqual(config.port, 6380)
self.assertEqual(config.password, "secret")
self.assertEqual(config.db, 5)
def test_url_property(self):
"""Test URL generation."""
config = TaskQueueConfig(
host="localhost",
port=6379,
db=1,
)
self.assertEqual(config.url, "redis://localhost:6379/1")
def test_url_with_password(self):
"""Test URL generation with password."""
config = TaskQueueConfig(
host="localhost",
port=6379,
password="secret",
db=0,
)
self.assertEqual(config.url, "redis://:secret@localhost:6379/0")
def test_from_env(self):
"""Test creating config from environment variables."""
env_vars = {
"REDIS_HOST": "env-host",
"REDIS_PORT": "6380",
"REDIS_PASSWORD": "env-pass",
"REDIS_DB": "2",
}
with patch.dict(os.environ, env_vars, clear=False):
config = TaskQueueConfig.from_env()
self.assertEqual(config.host, "env-host")
self.assertEqual(config.port, 6380)
self.assertEqual(config.password, "env-pass")
self.assertEqual(config.db, 2)
class TestLocalTaskQueue(unittest.TestCase): """Test LocalTaskQueue for development mode."""
def setUp(self):
"""Set up test fixtures."""
self.queue = LocalTaskQueue()
def test_enqueue_simple_task(self):
"""Test enqueuing a simple task."""
task = Task(description="Test task", agent="test-agent")
task_id = self.queue.enqueue(task)
self.assertEqual(task_id, task.id)
self.assertEqual(task.status, TaskStatus.READY.value)
def test_enqueue_with_priority(self):
"""Test enqueuing tasks with different priorities."""
low_task = Task(description="Low priority")
high_task = Task(description="High priority")
self.queue.enqueue(low_task, priority=TaskPriority.LOW.value)
self.queue.enqueue(high_task, priority=TaskPriority.HIGH.value)
# High priority should be dequeued first
next_task = self.queue.dequeue()
self.assertEqual(next_task.description, "High priority")
def test_dequeue_empty_queue(self):
"""Test dequeuing from empty queue."""
task = self.queue.dequeue()
self.assertIsNone(task)
def test_dequeue_updates_status(self):
"""Test that dequeue updates task status."""
task = Task(description="Test task")
self.queue.enqueue(task)
dequeued = self.queue.dequeue()
self.assertEqual(dequeued.status, TaskStatus.IN_PROGRESS.value)
self.assertIsNotNone(dequeued.started_at)
def test_complete_task(self):
"""Test completing a task."""
task = Task(description="Test task")
self.queue.enqueue(task)
self.queue.dequeue()
self.queue.complete(task.id, result={"status": "done"})
completed = self.queue.get_task(task.id)
self.assertEqual(completed.status, TaskStatus.COMPLETED.value)
self.assertEqual(completed.result, {"status": "done"})
def test_fail_task_with_retry(self):
"""Test failing a task with retry."""
task = Task(description="Test task", max_retries=3)
self.queue.enqueue(task)
self.queue.dequeue()
self.queue.fail(task.id, "Test error", retry=True)
failed = self.queue.get_task(task.id)
self.assertEqual(failed.retry_count, 1)
self.assertEqual(failed.error, "Test error")
def test_fail_task_permanently(self):
"""Test failing a task permanently."""
task = Task(description="Test task", max_retries=0)
self.queue.enqueue(task)
self.queue.dequeue()
self.queue.fail(task.id, "Permanent error", retry=True)
failed = self.queue.get_task(task.id)
self.assertEqual(failed.status, TaskStatus.FAILED.value)
def test_dependency_blocking(self):
"""Test that tasks with dependencies are blocked."""
parent = Task(description="Parent task")
child = Task(description="Child task")
parent_id = self.queue.enqueue(parent)
child_id = self.queue.enqueue(child, depends_on=[parent_id])
child_task = self.queue.get_task(child_id)
self.assertEqual(child_task.status, TaskStatus.BLOCKED.value)
def test_dependency_unblocking(self):
"""Test that completing parent unblocks child."""
parent = Task(description="Parent task")
child = Task(description="Child task")
parent_id = self.queue.enqueue(parent)
child_id = self.queue.enqueue(child, depends_on=[parent_id])
# Complete parent
self.queue.dequeue() # Gets parent
self.queue.complete(parent_id)
# Child should now be ready
child_task = self.queue.get_task(child_id)
self.assertEqual(child_task.status, TaskStatus.READY.value)
def test_multiple_dependencies(self):
"""Test task with multiple dependencies."""
dep1 = Task(description="Dependency 1")
dep2 = Task(description="Dependency 2")
child = Task(description="Child task")
dep1_id = self.queue.enqueue(dep1)
dep2_id = self.queue.enqueue(dep2)
child_id = self.queue.enqueue(child, depends_on=[dep1_id, dep2_id])
# Child should be blocked
child_task = self.queue.get_task(child_id)
self.assertEqual(child_task.status, TaskStatus.BLOCKED.value)
# Complete first dependency
self.queue.dequeue()
self.queue.complete(dep1_id)
# Child should still be blocked
child_task = self.queue.get_task(child_id)
self.assertEqual(child_task.status, TaskStatus.BLOCKED.value)
# Complete second dependency
self.queue.dequeue()
self.queue.complete(dep2_id)
# Child should now be ready
child_task = self.queue.get_task(child_id)
self.assertEqual(child_task.status, TaskStatus.READY.value)
def test_get_stats(self):
"""Test getting queue statistics."""
task1 = Task(description="Task 1")
task2 = Task(description="Task 2")
self.queue.enqueue(task1)
self.queue.enqueue(task2, depends_on=[task1.id])
stats = self.queue.get_stats()
self.assertEqual(stats["ready"], 1)
self.assertEqual(stats["blocked"], 1)
self.assertIn("total_tasks", stats)
def test_deadlock_detection_no_deadlock(self):
"""Test deadlock detection with no deadlock."""
task1 = Task(description="Task 1")
task2 = Task(description="Task 2")
self.queue.enqueue(task1)
self.queue.enqueue(task2, depends_on=[task1.id])
cycles = self.queue.detect_deadlocks()
self.assertEqual(len(cycles), 0)
def test_priority_ordering(self):
"""Test that tasks are dequeued in priority order."""
tasks = [
(Task(description="Background"), TaskPriority.BACKGROUND.value),
(Task(description="Normal"), TaskPriority.NORMAL.value),
(Task(description="Critical"), TaskPriority.CRITICAL.value),
(Task(description="Low"), TaskPriority.LOW.value),
(Task(description="High"), TaskPriority.HIGH.value),
]
for task, priority in tasks:
self.queue.enqueue(task, priority=priority)
# Dequeue should return in priority order (highest first)
order = []
while True:
task = self.queue.dequeue()
if task is None:
break
order.append(task.description)
self.assertEqual(order[0], "Critical")
self.assertEqual(order[-1], "Background")
class TestTaskQueueManagerLocalMode(unittest.TestCase): """Test TaskQueueManager in local fallback mode (no Redis)."""
def test_connect_local_mode(self):
"""Test connecting in local mode."""
async def run_test():
manager = TaskQueueManager()
connected = await manager.connect(use_local_fallback=True)
self.assertTrue(connected)
self.assertTrue(manager.is_connected)
self.assertTrue(manager.is_local_mode)
await manager.disconnect()
asyncio.run(run_test())
def test_disconnect(self):
"""Test disconnecting from manager."""
async def run_test():
manager = TaskQueueManager()
await manager.connect(use_local_fallback=True)
self.assertTrue(manager.is_connected)
await manager.disconnect()
self.assertFalse(manager.is_connected)
asyncio.run(run_test())
def test_enqueue_requires_connection(self):
"""Test that enqueue requires connection."""
async def run_test():
manager = TaskQueueManager()
task = Task(description="Test")
with self.assertRaises(RuntimeError) as ctx:
await manager.enqueue(task)
self.assertIn("Not connected", str(ctx.exception))
asyncio.run(run_test())
def test_enqueue_and_dequeue(self):
"""Test basic enqueue and dequeue."""
async def run_test():
manager = TaskQueueManager()
await manager.connect(use_local_fallback=True)
task = Task(description="Test task", agent="test-agent")
task_id = await manager.enqueue(task)
self.assertIsNotNone(task_id)
dequeued = await manager.dequeue()
self.assertEqual(dequeued.id, task_id)
self.assertEqual(dequeued.description, "Test task")
await manager.disconnect()
asyncio.run(run_test())
def test_complete_task(self):
"""Test completing a task."""
async def run_test():
manager = TaskQueueManager()
await manager.connect(use_local_fallback=True)
task = Task(description="Test task")
task_id = await manager.enqueue(task)
await manager.dequeue()
await manager.complete(task_id, result={"status": "done"})
completed = await manager.get_task(task_id)
self.assertEqual(completed.status, TaskStatus.COMPLETED.value)
await manager.disconnect()
asyncio.run(run_test())
def test_fail_task(self):
"""Test failing a task."""
async def run_test():
manager = TaskQueueManager()
await manager.connect(use_local_fallback=True)
task = Task(description="Test task", max_retries=0)
task_id = await manager.enqueue(task)
await manager.dequeue()
await manager.fail(task_id, "Test error", retry=False)
failed = await manager.get_task(task_id)
self.assertEqual(failed.status, TaskStatus.FAILED.value)
self.assertEqual(failed.error, "Test error")
await manager.disconnect()
asyncio.run(run_test())
def test_cancel_task(self):
"""Test cancelling a task."""
async def run_test():
manager = TaskQueueManager()
await manager.connect(use_local_fallback=True)
task = Task(description="Test task")
task_id = await manager.enqueue(task)
await manager.cancel(task_id)
cancelled = await manager.get_task(task_id)
self.assertEqual(cancelled.status, TaskStatus.CANCELLED.value)
await manager.disconnect()
asyncio.run(run_test())
def test_get_stats(self):
"""Test getting queue statistics."""
async def run_test():
manager = TaskQueueManager()
await manager.connect(use_local_fallback=True)
task1 = Task(description="Task 1")
task2 = Task(description="Task 2")
await manager.enqueue(task1)
await manager.enqueue(task2, depends_on=[task1.id])
stats = await manager.get_stats()
self.assertIn("ready", stats)
self.assertIn("blocked", stats)
await manager.disconnect()
asyncio.run(run_test())
def test_detect_deadlocks(self):
"""Test deadlock detection."""
async def run_test():
manager = TaskQueueManager()
await manager.connect(use_local_fallback=True)
task = Task(description="Test")
await manager.enqueue(task)
cycles = await manager.detect_deadlocks()
self.assertEqual(len(cycles), 0)
await manager.disconnect()
asyncio.run(run_test())
def test_dependency_chain(self):
"""Test a chain of dependencies."""
async def run_test():
manager = TaskQueueManager()
await manager.connect(use_local_fallback=True)
# Create chain: task1 -> task2 -> task3
task1 = Task(description="Task 1")
task2 = Task(description="Task 2")
task3 = Task(description="Task 3")
task1_id = await manager.enqueue(task1)
task2_id = await manager.enqueue(task2, depends_on=[task1_id])
task3_id = await manager.enqueue(task3, depends_on=[task2_id])
# Only task1 should be ready
stats = await manager.get_stats()
self.assertEqual(stats["ready"], 1)
self.assertEqual(stats["blocked"], 2)
# Complete task1
await manager.dequeue()
await manager.complete(task1_id)
# Now task2 should be ready, task3 still blocked
stats = await manager.get_stats()
self.assertEqual(stats["ready"], 1)
self.assertEqual(stats["blocked"], 1)
# Complete task2
await manager.dequeue()
await manager.complete(task2_id)
# Now task3 should be ready
stats = await manager.get_stats()
self.assertEqual(stats["ready"], 1)
self.assertEqual(stats["blocked"], 0)
await manager.disconnect()
asyncio.run(run_test())
class TestCreateTaskHelper(unittest.TestCase): """Test create_task helper function."""
def test_create_task_simple(self):
"""Test creating a simple task."""
task = create_task(
description="Test task",
agent="test-agent",
)
self.assertEqual(task.description, "Test task")
self.assertEqual(task.agent, "test-agent")
self.assertEqual(task.priority, TaskPriority.NORMAL.value)
def test_create_task_with_all_options(self):
"""Test creating a task with all options."""
task = create_task(
description="Full task",
agent="complex-agent",
inputs={"data": [1, 2, 3]},
priority=TaskPriority.HIGH.value,
depends_on=["task-1", "task-2"],
metadata={"source": "test"},
)
self.assertEqual(task.description, "Full task")
self.assertEqual(task.agent, "complex-agent")
self.assertEqual(task.inputs, {"data": [1, 2, 3]})
self.assertEqual(task.priority, TaskPriority.HIGH.value)
self.assertEqual(task.dependencies, ["task-1", "task-2"])
self.assertEqual(task.metadata, {"source": "test"})
def run_tests(): """Run all tests and return results.""" loader = unittest.TestLoader() suite = unittest.TestSuite()
# Add all test classes
test_classes = [
TestTask,
TestTaskStatus,
TestTaskPriority,
TestTaskQueueConfig,
TestLocalTaskQueue,
TestTaskQueueManagerLocalMode,
TestCreateTaskHelper,
]
for test_class in test_classes:
suite.addTests(loader.loadTestsFromTestCase(test_class))
# Run tests
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(suite)
# Print summary
print("\n" + "="*70)
print("TEST SUMMARY")
print("="*70)
print(f"Tests Run: {result.testsRun}")
print(f"Failures: {len(result.failures)}")
print(f"Errors: {len(result.errors)}")
print(f"Skipped: {len(result.skipped)}")
success = result.wasSuccessful()
print(f"\nOverall: {'PASSED' if success else 'FAILED'}")
print("="*70)
return result
if name == "main": result = run_tests() sys.exit(0 if result.wasSuccessful() else 1)