Skip to main content

CODITECT Task Queue Manager - Redis-based Persistent Task Queue

Part of Track H.2: Inter-Agent Communication Infrastructure Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications

This module provides:

  • Task: Data model for tasks with status, priority, dependencies
  • TaskQueueManager: Redis-backed task queue with priority and dependency resolution
  • TaskQueueConfig: Configuration management
  • Local fallback for development without Redis

Features:

  • Priority-based task scheduling (1-10 scale)
  • Task dependencies with automatic unblocking
  • Deadlock detection for circular dependencies
  • Retry with exponential backoff
  • Task expiration and cleanup
  • Statistics and monitoring

Usage: from scripts.core.task_queue_manager import TaskQueueManager, Task, TaskPriority

manager = TaskQueueManager()
await manager.connect()

# Enqueue a task
task = Task(
description="Process data",
agent="data-processor",
inputs={"data": "..."}
)
task_id = await manager.enqueue(task, priority=TaskPriority.HIGH)

# Enqueue with dependencies
task2_id = await manager.enqueue(
task2,
depends_on=[task_id]
)

# Dequeue next task
next_task = await manager.dequeue()

# Complete task (unblocks dependents)
await manager.complete(next_task.id, result={"status": "done"})

Author: CODITECT Framework Created: January 8, 2026 Version: 1.0.0

File: task_queue_manager.py

Classes

TaskStatus

Status of a task in the queue.

TaskPriority

Priority levels for tasks (higher = more urgent).

Task

Data model for a task in the queue.

TaskQueueConfig

Configuration for TaskQueueManager.

LocalTaskQueue

Local in-memory task queue for development without Redis.

TaskQueueManager

Redis-backed task queue manager with priority and dependency resolution.

Functions

create_task(description, agent, inputs, priority, depends_on, metadata)

Create a new Task instance.

to_dict()

Convert to dictionary for serialization.

to_json()

Convert to JSON string.

from_dict(cls, data)

Create Task from dictionary.

from_json(cls, json_str)

Create Task from JSON string.

is_expired()

Check if task has expired.

can_retry()

Check if task can be retried.

url()

Generate Redis URL from config.

from_env(cls)

Create config from environment variables.

enqueue(task, priority, depends_on)

Add task to queue.

dequeue()

Get next highest priority task.

complete(task_id, result)

Mark task as complete and unblock dependents.

fail(task_id, error, retry)

Mark task as failed.

get_task(task_id)

Get task by ID.

get_stats()

Get queue statistics.

Usage

python task_queue_manager.py