CODITECT Task Queue Manager - Redis-based Persistent Task Queue
Part of Track H.2: Inter-Agent Communication Infrastructure
Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications
This module provides:
- Task: Data model for tasks with status, priority, dependencies
- TaskQueueManager: Redis-backed task queue with priority and dependency resolution
- TaskQueueConfig: Configuration management
- Local fallback for development without Redis
Features:
- Priority-based task scheduling (1-10 scale)
- Task dependencies with automatic unblocking
- Deadlock detection for circular dependencies
- Retry with exponential backoff
- Task expiration and cleanup
- Statistics and monitoring
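The deadlock detection listed above can be illustrated with a depth-first search over the dependency graph. This is a minimal sketch, not the module's actual implementation: the function name find_cycle and the plain dict mapping each task ID to the IDs it depends on are assumptions made for illustration.

```python
from typing import Dict, List, Optional


def find_cycle(depends_on: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one dependency cycle as a list of task IDs, or None if acyclic.

    Hypothetical helper: `depends_on` maps a task ID to the IDs it waits on.
    """
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on current path, fully explored
    color: Dict[str, int] = {}
    path: List[str] = []

    def visit(task_id: str) -> Optional[List[str]]:
        color[task_id] = GRAY
        path.append(task_id)
        for dep in depends_on.get(task_id, []):
            state = color.get(dep, WHITE)
            if state == GRAY:
                # dep is already on the current path, so the path loops back
                return path[path.index(dep):] + [dep]
            if state == WHITE:
                cycle = visit(dep)
                if cycle:
                    return cycle
        path.pop()
        color[task_id] = BLACK
        return None

    for task_id in list(depends_on):
        if color.get(task_id, WHITE) == WHITE:
            cycle = visit(task_id)
            if cycle:
                return cycle
    return None
```

A queue could run a check like this before accepting a task with dependencies and reject the enqueue when a cycle is returned, since none of the tasks on the cycle could ever become ready.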
Usage:
    from scripts.core.task_queue_manager import TaskQueueManager, Task, TaskPriority

    # The calls below use await, so run them inside an async function.
    manager = TaskQueueManager()
    await manager.connect()

    # Enqueue a task
    task = Task(
        description="Process data",
        agent="data-processor",
        inputs={"data": "..."}
    )
    task_id = await manager.enqueue(task, priority=TaskPriority.HIGH)

    # Enqueue with dependencies (task2 is a second Task, built like task above)
    task2_id = await manager.enqueue(
        task2,
        depends_on=[task_id]
    )

    # Dequeue next task
    next_task = await manager.dequeue()

    # Complete task (unblocks dependents)
    await manager.complete(next_task.id, result={"status": "done"})
Author: CODITECT Framework
Created: January 8, 2026
Version: 1.0.0
File: task_queue_manager.py
Classes
TaskStatus
Status of a task in the queue.
TaskPriority
Priority levels for tasks (higher = more urgent).
Task
Data model for a task in the queue.
TaskQueueConfig
Configuration for TaskQueueManager.
LocalTaskQueue
Local in-memory task queue for development without Redis.
TaskQueueManager
Redis-backed task queue manager with priority and dependency resolution.
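To make the combination of priority ordering and dependency resolution concrete, here is a rough in-memory sketch in the spirit of LocalTaskQueue. It is not the module's code: the class name SketchQueue, its synchronous methods, and the use of bare task IDs instead of Task objects are all assumptions chosen to keep the example short.

```python
import heapq
import itertools
from typing import Dict, Iterable, List, Optional, Set, Tuple


class SketchQueue:
    """Hypothetical in-memory queue: higher priority first, FIFO within a priority."""

    def __init__(self) -> None:
        self._ready: List[Tuple[int, int, str]] = []  # (-priority, seq, task_id)
        self._waiting: Dict[str, Set[str]] = {}       # task_id -> unmet dependencies
        self._priority: Dict[str, int] = {}
        self._done: Set[str] = set()
        self._seq = itertools.count()                 # tie-breaker for equal priorities

    def _push(self, task_id: str) -> None:
        # heapq is a min-heap, so the priority is negated to pop the highest first.
        entry = (-self._priority[task_id], next(self._seq), task_id)
        heapq.heappush(self._ready, entry)

    def enqueue(self, task_id: str, priority: int = 5,
                depends_on: Iterable[str] = ()) -> None:
        self._priority[task_id] = priority
        unmet = {dep for dep in depends_on if dep not in self._done}
        if unmet:
            self._waiting[task_id] = unmet  # blocked until dependencies complete
        else:
            self._push(task_id)

    def dequeue(self) -> Optional[str]:
        if not self._ready:
            return None
        return heapq.heappop(self._ready)[2]

    def complete(self, task_id: str) -> None:
        self._done.add(task_id)
        # Unblock every waiting task whose last unmet dependency just finished.
        for waiting_id in list(self._waiting):
            unmet = self._waiting[waiting_id]
            unmet.discard(task_id)
            if not unmet:
                del self._waiting[waiting_id]
                self._push(waiting_id)
```

In this sketch a blocked task never enters the ready heap, so dequeue() cannot return it early even when its priority is the highest in the queue.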
Functions
create_task(description, agent, inputs, priority, depends_on, metadata)
Create a new Task instance.
to_dict()
Convert to dictionary for serialization.
to_json()
Convert to JSON string.
from_dict(cls, data)
Create Task from dictionary.
from_json(cls, json_str)
Create Task from JSON string.
is_expired()
Check if task has expired.
can_retry()
Check if task can be retried.
url()
Generate Redis URL from config.
from_env(cls)
Create config from environment variables.
enqueue(task, priority, depends_on)
Add task to queue.
dequeue()
Get the next highest-priority task.
complete(task_id, result)
Mark task as complete and unblock dependents.
fail(task_id, error, retry)
Mark task as failed.
get_task(task_id)
Get task by ID.
get_stats()
Get queue statistics.
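The retry behaviour behind fail() and can_retry() is described only as "retry with exponential backoff". One common way to compute such delays is sketched below. The function names, the base delay of 1 second, the 60-second cap, and the default of 3 retries are assumptions for illustration and may not match the module's actual values.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  jitter: float = 0.0) -> float:
    """Seconds to wait before retry number `attempt` (1-based).

    Hypothetical helper: the delay doubles on each attempt and is capped at `cap`.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    delay = min(cap, base * 2 ** (attempt - 1))
    # Optional jitter adds up to `jitter * delay` so failed tasks do not
    # all retry at the same instant.
    return delay + random.uniform(0, jitter * delay)


def retry_allowed(retry_count: int, max_retries: int = 3) -> bool:
    """Hypothetical check: True while the task has retries left."""
    return retry_count < max_retries
```

With the defaults above, successive retries wait 1, 2, 4, 8 seconds and so on, until the cap is reached.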
Usage
python task_queue_manager.py