#!/usr/bin/env python3 """ CODITECT Task Queue Manager - Redis-based Persistent Task Queue
Part of Track H.2: Inter-Agent Communication Infrastructure Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications
This module provides:
- Task: Data model for tasks with status, priority, dependencies
- TaskQueueManager: Redis-backed task queue with priority and dependency resolution
- TaskQueueConfig: Configuration management
- Local fallback for development without Redis
Features:
- Priority-based task scheduling (1-10 scale)
- Task dependencies with automatic unblocking
- Deadlock detection for circular dependencies
- Retry with exponential backoff
- Task expiration and cleanup
- Statistics and monitoring
Usage: from scripts.core.task_queue_manager import TaskQueueManager, Task, TaskPriority
manager = TaskQueueManager()
await manager.connect()
# Enqueue a task
task = Task(
description="Process data",
agent="data-processor",
inputs={"data": "..."}
)
task_id = await manager.enqueue(task, priority=TaskPriority.HIGH)
# Enqueue with dependencies
task2_id = await manager.enqueue(
task2,
depends_on=[task_id]
)
# Dequeue next task
next_task = await manager.dequeue()
# Complete task (unblocks dependents)
await manager.complete(next_task.id, result={"status": "done"})
Author: CODITECT Framework Created: January 8, 2026 Version: 1.0.0 """
import asyncio
import json
import logging
import os
import time
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import heapq
import threading

# Configure logging.
# Fixed: was `logging.getLogger(name)`, which raises NameError at import time;
# the module-level logger must key on the dunder module name.
logger = logging.getLogger(__name__)

# Try to import Redis, fall back to local mode if not available.
# Prefer the modern `redis.asyncio` API (redis-py >= 4.2); fall back to the
# legacy standalone `aioredis` package, and finally to a pure-local queue.
try:
    import redis.asyncio as aioredis
    from redis.asyncio import Redis as AsyncRedis

    REDIS_AVAILABLE = True
except ImportError:
    try:
        import aioredis
        from aioredis import Redis as AsyncRedis

        REDIS_AVAILABLE = True
    except ImportError:
        REDIS_AVAILABLE = False
        logger.warning("redis package not installed. Using local task queue fallback.")
class TaskStatus(Enum):
    """Lifecycle states a task moves through while in the queue."""

    PENDING = "pending"
    BLOCKED = "blocked"          # Waiting for dependencies
    READY = "ready"              # Dependencies satisfied, in ready queue
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    EXPIRED = "expired"
class TaskPriority(Enum):
    """Priority levels for tasks on a 1-10 scale (higher = more urgent)."""

    BACKGROUND = 1
    LOW = 3
    NORMAL = 5
    HIGH = 7
    URGENT = 9
    CRITICAL = 10
@dataclass
class Task:
    """
    Data model for a task in the queue.

    Attributes:
        id: Unique task identifier
        description: Human-readable task description
        agent: Target agent ID to execute the task
        inputs: Input data for the task
        status: Current task status
        priority: Task priority (1-10)
        dependencies: List of task IDs this task depends on
        created_at: Task creation timestamp
        started_at: Task execution start timestamp
        completed_at: Task completion timestamp
        result: Task execution result
        error: Error message if failed
        retry_count: Number of retry attempts
        max_retries: Maximum retry attempts allowed
        ttl_seconds: Time-to-live for the task
        metadata: Additional metadata
    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    description: str = ""
    agent: str = ""
    inputs: Dict[str, Any] = field(default_factory=dict)
    status: str = TaskStatus.PENDING.value
    priority: int = TaskPriority.NORMAL.value
    dependencies: List[str] = field(default_factory=list)
    created_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    ttl_seconds: int = 3600  # 1 hour default
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this task to a plain dictionary."""
        return asdict(self)

    def to_json(self) -> str:
        """Serialize this task to a JSON string (non-JSON values stringified)."""
        return json.dumps(self.to_dict(), default=str)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Task":
        """Build a Task from a dictionary, silently ignoring unknown keys."""
        known_fields = cls.__dataclass_fields__
        filtered = {key: value for key, value in data.items() if key in known_fields}
        return cls(**filtered)

    @classmethod
    def from_json(cls, json_str: str) -> "Task":
        """Build a Task from its JSON string representation."""
        return cls.from_dict(json.loads(json_str))

    @property
    def is_expired(self) -> bool:
        """True once ttl_seconds have elapsed since created_at."""
        deadline = datetime.fromisoformat(self.created_at) + timedelta(
            seconds=self.ttl_seconds
        )
        return datetime.utcnow() > deadline

    @property
    def can_retry(self) -> bool:
        """True while retry_count has not reached max_retries."""
        return self.retry_count < self.max_retries
@dataclass
class TaskQueueConfig:
    """Configuration for TaskQueueManager."""

    # Connection settings
    host: str = "localhost"
    port: int = 6379
    password: Optional[str] = None
    db: int = 0
    decode_responses: bool = True

    # Queue settings
    ready_queue_key: str = "coditect:task_queue:ready"
    blocked_queue_key: str = "coditect:task_queue:blocked"
    in_progress_key: str = "coditect:task_queue:in_progress"
    task_prefix: str = "coditect:task"

    # Cleanup settings
    completed_ttl_seconds: int = 86400  # 24 hours
    failed_ttl_seconds: int = 604800  # 7 days

    @property
    def url(self) -> str:
        """Redis connection URL assembled from this config."""
        credentials = f":{self.password}@" if self.password else ""
        return f"redis://{credentials}{self.host}:{self.port}/{self.db}"

    @classmethod
    def from_env(cls) -> "TaskQueueConfig":
        """Build a config from REDIS_HOST/PORT/PASSWORD/DB environment variables."""
        return cls(
            host=os.getenv("REDIS_HOST", "localhost"),
            port=int(os.getenv("REDIS_PORT", "6379")),
            password=os.getenv("REDIS_PASSWORD"),
            db=int(os.getenv("REDIS_DB", "0")),
        )
class LocalTaskQueue:
    """
    Local in-memory task queue for development without Redis.

    Provides the same interface as Redis-backed TaskQueueManager
    but uses Python data structures for local testing.

    All mutating operations take an internal threading.Lock, so a single
    instance may be shared across threads.
    """

    def __init__(self):
        # task_id -> Task for every task ever enqueued (until cleanup)
        self._tasks: Dict[str, Task] = {}
        # Min-heap ordered by (neg_priority, timestamp, task_id); negating the
        # priority makes heapq pop the highest priority first.
        self._ready_queue: List[Tuple[int, float, str]] = []  # (neg_priority, timestamp, task_id)
        self._blocked_tasks: Set[str] = set()
        self._in_progress: Set[str] = set()
        self._dependencies: Dict[str, Set[str]] = {}  # task_id -> depends_on
        self._dependents: Dict[str, Set[str]] = {}  # task_id -> tasks depending on this
        self._lock = threading.Lock()
        logger.info("Using local in-memory task queue (no Redis)")

    def enqueue(
        self,
        task: Task,
        priority: int = TaskPriority.NORMAL.value,
        depends_on: Optional[List[str]] = None,
    ) -> str:
        """Add task to queue.

        If any dependency is not yet COMPLETED the task is parked in the
        blocked set; otherwise it goes straight onto the ready heap.
        Returns the task's id.
        """
        with self._lock:
            task.priority = priority
            task.dependencies = depends_on or []
            self._tasks[task.id] = task
            if depends_on:
                # Track dependencies in both directions so complete() can
                # unblock dependents without scanning every task.
                self._dependencies[task.id] = set(depends_on)
                for dep_id in depends_on:
                    if dep_id not in self._dependents:
                        self._dependents[dep_id] = set()
                    self._dependents[dep_id].add(task.id)
                # Check if already satisfied.
                # NOTE(review): dependency ids unknown to this queue are
                # treated as satisfied (the `d in self._tasks` guard) —
                # confirm that is the intended contract.
                unsatisfied = [
                    d for d in depends_on
                    if d in self._tasks and self._tasks[d].status != TaskStatus.COMPLETED.value
                ]
                if unsatisfied:
                    task.status = TaskStatus.BLOCKED.value
                    self._blocked_tasks.add(task.id)
                else:
                    # All dependencies completed
                    task.status = TaskStatus.READY.value
                    heapq.heappush(
                        self._ready_queue,
                        (-priority, time.time(), task.id)
                    )
            else:
                # No dependencies: immediately ready.
                task.status = TaskStatus.READY.value
                heapq.heappush(
                    self._ready_queue,
                    (-priority, time.time(), task.id)
                )
            return task.id

    def dequeue(self) -> Optional[Task]:
        """Get next highest priority task, or None if nothing is ready.

        Stale heap entries (deleted tasks, or tasks whose status changed,
        e.g. cancelled ones) are popped and skipped rather than returned.
        """
        with self._lock:
            while self._ready_queue:
                neg_priority, timestamp, task_id = heapq.heappop(self._ready_queue)
                if task_id not in self._tasks:
                    continue
                task = self._tasks[task_id]
                if task.status != TaskStatus.READY.value:
                    continue
                task.status = TaskStatus.IN_PROGRESS.value
                task.started_at = datetime.utcnow().isoformat()
                self._in_progress.add(task_id)
                return task
            return None

    def complete(self, task_id: str, result: Optional[Dict[str, Any]] = None):
        """Mark task as complete and unblock dependents.

        Any dependent whose last outstanding dependency this was is moved
        from the blocked set onto the ready heap.
        """
        with self._lock:
            if task_id not in self._tasks:
                return
            task = self._tasks[task_id]
            task.status = TaskStatus.COMPLETED.value
            task.completed_at = datetime.utcnow().isoformat()
            task.result = result
            self._in_progress.discard(task_id)
            # Unblock dependents
            for dependent_id in self._dependents.get(task_id, set()):
                if dependent_id not in self._dependencies:
                    continue
                self._dependencies[dependent_id].discard(task_id)
                if not self._dependencies[dependent_id]:
                    # All dependencies satisfied
                    self._blocked_tasks.discard(dependent_id)
                    if dependent_id in self._tasks:
                        dependent = self._tasks[dependent_id]
                        dependent.status = TaskStatus.READY.value
                        heapq.heappush(
                            self._ready_queue,
                            (-dependent.priority, time.time(), dependent_id)
                        )

    def fail(self, task_id: str, error: str, retry: bool = True):
        """Mark task as failed; optionally re-queue it for retry.

        On retry the heap key's priority component is lowered by
        retry_count and the timestamp component is pushed into the future
        (2**retry_count seconds). NOTE(review): dequeue() does not wait for
        that timestamp — it only influences heap ordering, so this is not a
        true delayed retry.
        """
        with self._lock:
            if task_id not in self._tasks:
                return
            task = self._tasks[task_id]
            self._in_progress.discard(task_id)
            if retry and task.can_retry:
                task.retry_count += 1
                task.error = error
                task.status = TaskStatus.READY.value
                # Re-add with lower effective priority after delay
                heapq.heappush(
                    self._ready_queue,
                    (-task.priority + task.retry_count, time.time() + 2 ** task.retry_count, task_id)
                )
            else:
                # Out of retries (or retry disabled): permanent failure.
                task.status = TaskStatus.FAILED.value
                task.error = error
                task.completed_at = datetime.utcnow().isoformat()

    def get_task(self, task_id: str) -> Optional[Task]:
        """Get task by ID (None if unknown).

        NOTE(review): this read is intentionally lock-free; it returns the
        live Task object, which other threads may still be mutating.
        """
        return self._tasks.get(task_id)

    def get_stats(self) -> Dict[str, int]:
        """Get queue statistics: totals, queue sizes, and per-status counts."""
        with self._lock:
            status_counts = {}
            for task in self._tasks.values():
                status_counts[task.status] = status_counts.get(task.status, 0) + 1
            # "ready" counts heap entries, which can include stale items that
            # dequeue() will later skip.
            return {
                "total_tasks": len(self._tasks),
                "ready": len(self._ready_queue),
                "blocked": len(self._blocked_tasks),
                "in_progress": len(self._in_progress),
                **status_counts,
            }

    def detect_deadlocks(self) -> List[List[str]]:
        """Detect circular dependencies among blocked tasks.

        Returns a list of cycles; each cycle is a list of task IDs with the
        starting node repeated at the end. Only blocked tasks are inspected,
        since only they can participate in a dependency deadlock.
        """
        with self._lock:
            # Build adjacency list
            graph: Dict[str, List[str]] = {}
            for task_id in self._blocked_tasks:
                deps = self._dependencies.get(task_id, set())
                graph[task_id] = list(deps)
            # Find cycles using DFS with a recursion stack: a back-edge to a
            # node on the current path means a cycle.
            cycles = []
            visited = set()
            rec_stack = set()

            def dfs(node: str, path: List[str]) -> bool:
                visited.add(node)
                rec_stack.add(node)
                path.append(node)
                for neighbor in graph.get(node, []):
                    if neighbor not in visited:
                        if dfs(neighbor, path):
                            return True
                    elif neighbor in rec_stack:
                        # Found cycle
                        cycle_start = path.index(neighbor)
                        cycles.append(path[cycle_start:] + [neighbor])
                        return True
                path.pop()
                rec_stack.remove(node)
                return False

            for node in graph:
                if node not in visited:
                    dfs(node, [])
            return cycles
class TaskQueueManager:
    """
    Redis-backed task queue manager with priority and dependency resolution.

    Features:
    - Priority-based scheduling (higher priority dequeued first)
    - Task dependencies with automatic unblocking
    - Deadlock detection for circular dependencies
    - Retry with exponential backoff
    - Task expiration and cleanup
    - Statistics and monitoring

    Falls back to local in-memory queue if Redis is unavailable.

    Every public method first checks for the local fallback and delegates
    to LocalTaskQueue when it is active; otherwise it speaks to Redis
    directly. NOTE(review): the Redis paths issue several commands per
    operation without a transaction/pipeline, so a crash mid-operation can
    leave partial state (e.g. a task hash without its queue entry).
    """

    def __init__(self, config: Optional[TaskQueueConfig] = None):
        """
        Initialize TaskQueueManager.

        Args:
            config: TaskQueueConfig instance, or None to use env vars
        """
        self.config = config or TaskQueueConfig.from_env()
        self._redis: Optional[AsyncRedis] = None
        self._local_queue: Optional[LocalTaskQueue] = None
        self._connected = False

    @property
    def is_connected(self) -> bool:
        """Check if connected to queue backend."""
        return self._connected

    @property
    def is_local_mode(self) -> bool:
        """Check if using local fallback mode."""
        return self._local_queue is not None

    async def connect(self, use_local_fallback: bool = True) -> bool:
        """
        Connect to Redis.

        Args:
            use_local_fallback: If True, use local queue when Redis unavailable

        Returns:
            True if connected successfully

        Raises:
            ImportError: redis package missing and fallback disabled.
            Exception: Redis connection failed and fallback disabled
                (the original connection error is re-raised).
        """
        if not REDIS_AVAILABLE:
            if use_local_fallback:
                self._local_queue = LocalTaskQueue()
                self._connected = True
                logger.info("Connected to local task queue (redis package not installed)")
                return True
            else:
                raise ImportError("redis package is required. Install with: pip install redis")
        try:
            logger.info(f"Connecting to Redis at {self.config.host}:{self.config.port}")
            self._redis = aioredis.from_url(
                self.config.url,
                decode_responses=self.config.decode_responses,
            )
            # Test connection — from_url alone does not touch the network.
            await self._redis.ping()
            self._connected = True
            logger.info("Successfully connected to Redis")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to Redis: {e}")
            if use_local_fallback:
                logger.info("Falling back to local task queue")
                self._local_queue = LocalTaskQueue()
                self._connected = True
                return True
            raise

    async def disconnect(self):
        """Close connection to queue backend and reset connection state."""
        if self._redis:
            await self._redis.close()
            self._redis = None
        self._local_queue = None
        self._connected = False
        logger.info("Disconnected from task queue")

    async def enqueue(
        self,
        task: Task,
        priority: int = TaskPriority.NORMAL.value,
        depends_on: Optional[List[str]] = None,
    ) -> str:
        """
        Add task to the queue.

        The task's fields are stored in a Redis hash; ready tasks are added
        to a sorted set scored by priority, blocked tasks to a plain set.

        Args:
            task: Task to enqueue
            priority: Task priority (1-10, higher = more urgent)
            depends_on: List of task IDs this task depends on

        Returns:
            Task ID

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if not self._connected:
            raise RuntimeError("Not connected to task queue. Call connect() first.")
        if self._local_queue:
            return self._local_queue.enqueue(task, priority, depends_on)
        task.priority = priority
        task.dependencies = depends_on or []
        task_key = f"{self.config.task_prefix}:{task.id}"
        # Store task data (complex values are JSON-encoded into hash fields)
        await self._redis.hset(task_key, mapping={
            "id": task.id,
            "description": task.description,
            "agent": task.agent,
            "inputs": json.dumps(task.inputs),
            "status": TaskStatus.PENDING.value,
            "priority": priority,
            "dependencies": json.dumps(depends_on or []),
            "created_at": task.created_at,
            "retry_count": task.retry_count,
            "max_retries": task.max_retries,
            "ttl_seconds": task.ttl_seconds,
            "metadata": json.dumps(task.metadata),
        })
        # Set expiration on task (2x TTL leaves slack for in-flight work)
        await self._redis.expire(task_key, task.ttl_seconds * 2)
        if depends_on:
            # Store dependencies
            deps_key = f"{task_key}:dependencies"
            await self._redis.sadd(deps_key, *depends_on)
            # Register as dependent (reverse edge, used by complete())
            for dep_id in depends_on:
                dependents_key = f"{self.config.task_prefix}:{dep_id}:dependents"
                await self._redis.sadd(dependents_key, task.id)
            # Check if dependencies are satisfied.
            # NOTE(review): a dependency id with no task hash yields
            # status None != COMPLETED, so unknown deps block the task —
            # the opposite of the local queue's behavior; confirm intent.
            unsatisfied = []
            for dep_id in depends_on:
                dep_key = f"{self.config.task_prefix}:{dep_id}"
                status = await self._redis.hget(dep_key, "status")
                if status != TaskStatus.COMPLETED.value:
                    unsatisfied.append(dep_id)
            if unsatisfied:
                # Block task
                await self._redis.hset(task_key, "status", TaskStatus.BLOCKED.value)
                await self._redis.sadd(self.config.blocked_queue_key, task.id)
            else:
                # All dependencies satisfied
                await self._redis.hset(task_key, "status", TaskStatus.READY.value)
                await self._redis.zadd(
                    self.config.ready_queue_key,
                    {task.id: priority},
                )
        else:
            # No dependencies - add directly to ready queue
            await self._redis.hset(task_key, "status", TaskStatus.READY.value)
            await self._redis.zadd(
                self.config.ready_queue_key,
                {task.id: priority},
            )
        logger.debug(f"Enqueued task {task.id} with priority {priority}")
        return task.id

    async def dequeue(self) -> Optional[Task]:
        """
        Get the next highest priority task that is ready.

        Returns:
            Next task to execute, or None if queue is empty

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if not self._connected:
            raise RuntimeError("Not connected to task queue. Call connect() first.")
        if self._local_queue:
            return self._local_queue.dequeue()
        # Get highest priority task (ZPOPMAX returns highest score)
        result = await self._redis.zpopmax(self.config.ready_queue_key, count=1)
        if not result:
            return None
        # `priority` here is the sorted-set score; the authoritative value is
        # re-read from the task hash below.
        task_id, priority = result[0]
        task_key = f"{self.config.task_prefix}:{task_id}"
        task_data = await self._redis.hgetall(task_key)
        if not task_data:
            # Hash expired/deleted after queue entry was added; drop it.
            return None
        task = Task(
            id=task_data["id"],
            description=task_data.get("description", ""),
            agent=task_data.get("agent", ""),
            inputs=json.loads(task_data.get("inputs", "{}")),
            status=TaskStatus.IN_PROGRESS.value,
            priority=int(task_data.get("priority", 5)),
            dependencies=json.loads(task_data.get("dependencies", "[]")),
            created_at=task_data.get("created_at", datetime.utcnow().isoformat()),
            retry_count=int(task_data.get("retry_count", 0)),
            max_retries=int(task_data.get("max_retries", 3)),
            ttl_seconds=int(task_data.get("ttl_seconds", 3600)),
            metadata=json.loads(task_data.get("metadata", "{}")),
        )
        # Update status to IN_PROGRESS
        await self._redis.hset(task_key, mapping={
            "status": TaskStatus.IN_PROGRESS.value,
            "started_at": datetime.utcnow().isoformat(),
        })
        await self._redis.sadd(self.config.in_progress_key, task.id)
        logger.debug(f"Dequeued task {task.id}")
        return task

    async def complete(
        self,
        task_id: str,
        result: Optional[Dict[str, Any]] = None,
    ):
        """
        Mark task as complete and unblock dependent tasks.

        Each dependent's dependency set loses this task; dependents whose
        set becomes empty are moved from the blocked set to the ready queue.

        Args:
            task_id: ID of completed task
            result: Task execution result

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if not self._connected:
            raise RuntimeError("Not connected to task queue. Call connect() first.")
        if self._local_queue:
            self._local_queue.complete(task_id, result)
            return
        task_key = f"{self.config.task_prefix}:{task_id}"
        # Update task status
        await self._redis.hset(task_key, mapping={
            "status": TaskStatus.COMPLETED.value,
            "completed_at": datetime.utcnow().isoformat(),
            "result": json.dumps(result or {}),
        })
        # Remove from in_progress
        await self._redis.srem(self.config.in_progress_key, task_id)
        # Set TTL for cleanup
        await self._redis.expire(task_key, self.config.completed_ttl_seconds)
        # Unblock dependent tasks
        dependents_key = f"{task_key}:dependents"
        dependents = await self._redis.smembers(dependents_key)
        for dependent_id in dependents:
            dependent_deps_key = f"{self.config.task_prefix}:{dependent_id}:dependencies"
            # Remove this task from dependent's dependencies
            await self._redis.srem(dependent_deps_key, task_id)
            # Check if all dependencies satisfied
            remaining = await self._redis.scard(dependent_deps_key)
            if remaining == 0:
                # Unblock the dependent task
                await self._redis.srem(self.config.blocked_queue_key, dependent_id)
                dependent_key = f"{self.config.task_prefix}:{dependent_id}"
                priority = await self._redis.hget(dependent_key, "priority")
                # Add to ready queue (default priority 5 if field missing)
                await self._redis.hset(dependent_key, "status", TaskStatus.READY.value)
                await self._redis.zadd(
                    self.config.ready_queue_key,
                    {dependent_id: int(priority or 5)},
                )
                logger.debug(f"Unblocked dependent task {dependent_id}")
        logger.debug(f"Completed task {task_id}")

    async def fail(
        self,
        task_id: str,
        error: str,
        retry: bool = True,
    ):
        """
        Mark task as failed, optionally retry.

        Retries use exponential backoff (2**retry_count seconds).
        NOTE(review): the backoff is implemented with `await asyncio.sleep`,
        which keeps this coroutine (and its caller) suspended for the full
        delay before re-queueing — not a background delayed re-queue.

        Args:
            task_id: ID of failed task
            error: Error message
            retry: Whether to retry the task

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if not self._connected:
            raise RuntimeError("Not connected to task queue. Call connect() first.")
        if self._local_queue:
            self._local_queue.fail(task_id, error, retry)
            return
        task_key = f"{self.config.task_prefix}:{task_id}"
        task_data = await self._redis.hgetall(task_key)
        if not task_data:
            return
        retry_count = int(task_data.get("retry_count", 0))
        max_retries = int(task_data.get("max_retries", 3))
        priority = int(task_data.get("priority", 5))
        # Remove from in_progress
        await self._redis.srem(self.config.in_progress_key, task_id)
        if retry and retry_count < max_retries:
            # Retry with exponential backoff
            retry_count += 1
            delay = 2 ** retry_count
            await self._redis.hset(task_key, mapping={
                "status": TaskStatus.PENDING.value,
                "retry_count": retry_count,
                "error": error,
            })
            # Re-queue after delay (using ZADD with future timestamp)
            # Note: In production, use a proper delayed queue
            await asyncio.sleep(delay)
            await self._redis.hset(task_key, "status", TaskStatus.READY.value)
            await self._redis.zadd(
                self.config.ready_queue_key,
                {task_id: priority},
            )
            logger.debug(f"Retrying task {task_id} (attempt {retry_count})")
        else:
            # Permanent failure
            await self._redis.hset(task_key, mapping={
                "status": TaskStatus.FAILED.value,
                "error": error,
                "completed_at": datetime.utcnow().isoformat(),
            })
            # Set TTL for cleanup
            await self._redis.expire(task_key, self.config.failed_ttl_seconds)
            logger.warning(f"Task {task_id} failed permanently: {error}")

    async def cancel(self, task_id: str):
        """
        Cancel a pending or blocked task.

        In local mode this only flips the task's status; the stale heap
        entry is skipped by LocalTaskQueue.dequeue(). In Redis mode the
        task is removed from all three queues and marked CANCELLED.

        Args:
            task_id: ID of task to cancel

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if not self._connected:
            raise RuntimeError("Not connected to task queue. Call connect() first.")
        if self._local_queue:
            task = self._local_queue.get_task(task_id)
            if task:
                task.status = TaskStatus.CANCELLED.value
            return
        task_key = f"{self.config.task_prefix}:{task_id}"
        # Remove from queues
        await self._redis.zrem(self.config.ready_queue_key, task_id)
        await self._redis.srem(self.config.blocked_queue_key, task_id)
        await self._redis.srem(self.config.in_progress_key, task_id)
        # Update status
        await self._redis.hset(task_key, mapping={
            "status": TaskStatus.CANCELLED.value,
            "completed_at": datetime.utcnow().isoformat(),
        })
        logger.debug(f"Cancelled task {task_id}")

    async def get_task(self, task_id: str) -> Optional[Task]:
        """
        Get task by ID.

        Args:
            task_id: Task ID

        Returns:
            Task if found, None otherwise

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if not self._connected:
            raise RuntimeError("Not connected to task queue. Call connect() first.")
        if self._local_queue:
            return self._local_queue.get_task(task_id)
        task_key = f"{self.config.task_prefix}:{task_id}"
        task_data = await self._redis.hgetall(task_key)
        if not task_data:
            return None
        # Rehydrate the Task from hash fields; JSON-encoded fields are
        # decoded, missing fields fall back to dataclass-equivalent defaults
        # ("null" makes result decode to None when absent).
        return Task(
            id=task_data.get("id", task_id),
            description=task_data.get("description", ""),
            agent=task_data.get("agent", ""),
            inputs=json.loads(task_data.get("inputs", "{}")),
            status=task_data.get("status", TaskStatus.PENDING.value),
            priority=int(task_data.get("priority", 5)),
            dependencies=json.loads(task_data.get("dependencies", "[]")),
            created_at=task_data.get("created_at", ""),
            started_at=task_data.get("started_at"),
            completed_at=task_data.get("completed_at"),
            result=json.loads(task_data.get("result", "null")),
            error=task_data.get("error"),
            retry_count=int(task_data.get("retry_count", 0)),
            max_retries=int(task_data.get("max_retries", 3)),
            ttl_seconds=int(task_data.get("ttl_seconds", 3600)),
            metadata=json.loads(task_data.get("metadata", "{}")),
        )

    async def get_stats(self) -> Dict[str, Any]:
        """
        Get queue statistics.

        Returns:
            Dictionary with queue metrics

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if not self._connected:
            raise RuntimeError("Not connected to task queue. Call connect() first.")
        if self._local_queue:
            return self._local_queue.get_stats()
        ready_count = await self._redis.zcard(self.config.ready_queue_key)
        blocked_count = await self._redis.scard(self.config.blocked_queue_key)
        in_progress_count = await self._redis.scard(self.config.in_progress_key)
        return {
            "ready": ready_count,
            "blocked": blocked_count,
            "in_progress": in_progress_count,
            "total_queued": ready_count + blocked_count + in_progress_count,
        }

    async def detect_deadlocks(self) -> List[List[str]]:
        """
        Detect circular dependencies (deadlocks).

        Only blocked tasks are examined, since only they can participate in
        a dependency cycle.

        Returns:
            List of cycles, where each cycle is a list of task IDs
            (starting node repeated at the end)

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if not self._connected:
            raise RuntimeError("Not connected to task queue. Call connect() first.")
        if self._local_queue:
            return self._local_queue.detect_deadlocks()
        # Get all blocked tasks
        blocked_tasks = await self._redis.smembers(self.config.blocked_queue_key)
        # Build dependency graph
        graph: Dict[str, List[str]] = {}
        for task_id in blocked_tasks:
            deps_key = f"{self.config.task_prefix}:{task_id}:dependencies"
            deps = await self._redis.smembers(deps_key)
            graph[task_id] = list(deps)
        # Find cycles using DFS with a recursion stack: a back-edge to a
        # node on the current path means a cycle.
        cycles = []
        visited: Set[str] = set()
        rec_stack: Set[str] = set()

        def dfs(node: str, path: List[str]) -> bool:
            visited.add(node)
            rec_stack.add(node)
            path.append(node)
            for neighbor in graph.get(node, []):
                if neighbor not in visited:
                    if dfs(neighbor, path):
                        return True
                elif neighbor in rec_stack:
                    # Found cycle
                    cycle_start = path.index(neighbor)
                    cycles.append(path[cycle_start:] + [neighbor])
                    return True
            path.pop()
            rec_stack.remove(node)
            return False

        for node in graph:
            if node not in visited:
                dfs(node, [])
        return cycles

    async def cleanup_expired(self) -> int:
        """
        Remove expired tasks from the queue.

        In Redis mode this is a no-op because per-key TTLs (set in
        enqueue/complete/fail) handle expiry. In local mode, expired tasks
        are deleted from the task map.
        NOTE(review): the local path reaches into LocalTaskQueue's private
        `_lock`/`_tasks`, and deleted ids may linger in its heap/sets until
        dequeue() skips them.

        Returns:
            Number of tasks cleaned up

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if not self._connected:
            raise RuntimeError("Not connected to task queue. Call connect() first.")
        if self._local_queue:
            # Local cleanup
            count = 0
            with self._local_queue._lock:
                expired = [
                    tid for tid, task in self._local_queue._tasks.items()
                    if task.is_expired
                ]
                for tid in expired:
                    del self._local_queue._tasks[tid]
                    count += 1
            return count
        # Redis handles expiration via TTL
        # This method is for manual cleanup if needed
        return 0
# Convenience functions

async def create_task_queue(config: Optional[TaskQueueConfig] = None) -> TaskQueueManager:
    """Build a TaskQueueManager, connect it to its backend, and return it."""
    queue_manager = TaskQueueManager(config)
    await queue_manager.connect()
    return queue_manager
def create_task(
    description: str,
    agent: str,
    inputs: Optional[Dict[str, Any]] = None,
    priority: int = TaskPriority.NORMAL.value,
    depends_on: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Task:
    """Construct a new Task, substituting empty containers for omitted args."""
    task = Task(
        description=description,
        agent=agent,
        inputs=inputs or {},
        priority=priority,
        dependencies=depends_on or [],
        metadata=metadata or {},
    )
    return task
# CLI for testing
# Fixed: the entry guard was `if name == "main":`, which raises NameError;
# it must compare the dunder module name against the dunder main string.
if __name__ == "__main__":
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="CODITECT Task Queue Manager CLI")
    parser.add_argument(
        "command",
        choices=["enqueue", "dequeue", "status", "stats", "deadlocks"],
        help="Command to run"
    )
    parser.add_argument("--task-id", help="Task ID for status command")
    parser.add_argument("--description", default="Test task", help="Task description")
    parser.add_argument("--agent", default="test-agent", help="Target agent")
    parser.add_argument("--priority", type=int, default=5, help="Task priority (1-10)")
    parser.add_argument("--depends-on", help="Comma-separated list of task IDs")
    parser.add_argument("--local", action="store_true", help="Use local queue")
    parser.add_argument("--host", default="localhost", help="Redis host")
    parser.add_argument("--port", type=int, default=6379, help="Redis port")
    args = parser.parse_args()

    async def main():
        """Run the selected CLI command against a (possibly local) queue."""
        config = TaskQueueConfig(host=args.host, port=args.port)
        manager = TaskQueueManager(config)
        try:
            # NOTE: --local only *allows* the local fallback; if Redis is
            # reachable it is still preferred (see connect()).
            await manager.connect(use_local_fallback=args.local)
            print(f"Connected to task queue (local mode: {manager.is_local_mode})")
            if args.command == "enqueue":
                depends_on = args.depends_on.split(",") if args.depends_on else None
                task = create_task(
                    description=args.description,
                    agent=args.agent,
                    priority=args.priority,
                    depends_on=depends_on,
                )
                task_id = await manager.enqueue(task, args.priority, depends_on)
                print(f"Enqueued task: {task_id}")
            elif args.command == "dequeue":
                task = await manager.dequeue()
                if task:
                    print(f"Dequeued task: {task.id}")
                    print(f"  Description: {task.description}")
                    print(f"  Agent: {task.agent}")
                    print(f"  Priority: {task.priority}")
                else:
                    print("No tasks in ready queue")
            elif args.command == "status":
                if not args.task_id:
                    print("Error: --task-id required for status command")
                    sys.exit(1)
                task = await manager.get_task(args.task_id)
                if task:
                    print(f"Task {task.id}:")
                    print(f"  Status: {task.status}")
                    print(f"  Description: {task.description}")
                    print(f"  Agent: {task.agent}")
                    print(f"  Priority: {task.priority}")
                    print(f"  Dependencies: {task.dependencies}")
                    print(f"  Created: {task.created_at}")
                    if task.error:
                        print(f"  Error: {task.error}")
                else:
                    print(f"Task {args.task_id} not found")
            elif args.command == "stats":
                stats = await manager.get_stats()
                print("Queue Statistics:")
                for key, value in stats.items():
                    print(f"  {key}: {value}")
            elif args.command == "deadlocks":
                cycles = await manager.detect_deadlocks()
                if cycles:
                    print(f"Found {len(cycles)} deadlock(s):")
                    for i, cycle in enumerate(cycles, 1):
                        print(f"  Cycle {i}: {' -> '.join(cycle)}")
                else:
                    print("No deadlocks detected")
        finally:
            # Always release the connection, even if a command raised.
            await manager.disconnect()

    asyncio.run(main())