scripts-task-dispatcher
#!/usr/bin/env python3
"""
title: "ANSI colors"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Task Dispatcher - Intelligent task assignment to appropriate agents."
keywords: ['api', 'backend', 'database', 'dispatcher', 'docker']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "task-dispatcher.py"
language: python
executable: true
usage: "python3 scripts/task-dispatcher.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Task Dispatcher - Intelligent task assignment to appropriate agents.
Features:
- Queries database for pending tasks by priority
- Matches tasks to agent types via keyword patterns
- Manages task queue with dependency checking
- Prevents duplicate assignment
- Provides queue status and statistics

Usage:
    python3 task-dispatcher.py --next              # Get next available task
    python3 task-dispatcher.py --next 5            # Get next 5 tasks
    python3 task-dispatcher.py --queue             # Show full queue
    python3 task-dispatcher.py --assign T001.001   # Assign specific task
    python3 task-dispatcher.py --stats             # Show dispatch statistics
"""
import argparse
import json
import re
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Configuration
# Directory layout is derived from this script's own location.
SCRIPT_DIR = Path(__file__).parent
CORE_DIR = SCRIPT_DIR.parent
CONFIG_DIR = CORE_DIR / "config"

# ADR-114 & ADR-118: Use centralized path discovery
# The helper modules imported below are looked up in <script dir>/core first.
sys.path.insert(0, str(SCRIPT_DIR / "core"))

# Shared Colors module (consolidates 36 duplicate definitions)
from colors import Colors

try:
    from paths import get_sessions_db_path, SESSIONS_DB
    DB_PATH = SESSIONS_DB  # Tasks go to sessions.db (Tier 3)
except ImportError:
    # Fallback for backward compatibility: prefer the per-user data directory
    # when it exists, otherwise use the copy next to the core directory.
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        DB_PATH = _user_data / "sessions.db"
    else:
        DB_PATH = CORE_DIR / "context-storage" / "sessions.db"
# Agent type mappings - keywords to agent types
#
# Each value is a list of regular-expression fragments that are searched
# (case-insensitively) in the task text; the agent type with the most matching
# patterns wins. Multi-word phrases use ``\s*`` so that "unit test",
# "unit  test" and "unittest" are all recognised.
AGENT_MAPPINGS = {
    "devops-engineer": [
        r"infrastructure", r"deploy", r"docker", r"kubernetes", r"k8s",
        r"ci/?cd", r"pipeline", r"container", r"helm", r"terraform",
        r"cloud\s*build", r"gke", r"gcp", r"aws", r"azure",
    ],
    "security-specialist": [
        r"security", r"auth", r"authentication", r"authorization",
        r"compliance", r"audit", r"vulnerability", r"penetration",
        r"encryption", r"ssl", r"tls", r"certificate", r"oauth", r"jwt",
    ],
    "testing-specialist": [
        r"test", r"testing", r"validation", r"coverage", r"qa", r"quality",
        r"assert", r"mock", r"fixture", r"pytest", r"jest",
        r"unit\s*test", r"integration\s*test", r"e2e",
    ],
    "codi-documentation-writer": [
        r"document", r"documentation", r"guide", r"readme", r"manual",
        r"tutorial", r"reference", r"api\s*doc", r"changelog", r"adr",
    ],
    "backend-development": [
        r"api", r"endpoint", r"backend", r"server", r"rest", r"graphql",
        r"handler", r"controller", r"service\s*layer", r"middleware",
        r"route", r"actix", r"rust\s*backend",
    ],
    "frontend-development-agent": [
        r"ui", r"frontend", r"component", r"react", r"vue", r"svelte",
        r"css", r"style", r"layout", r"responsive", r"accessibility",
        r"a11y", r"wcag", r"user\s*interface",
    ],
    "database-architect": [
        r"database", r"schema", r"migration", r"sql", r"postgres", r"mysql",
        r"sqlite", r"mongodb", r"redis", r"index", r"query\s*optimization",
        r"table", r"foundationdb",
    ],
    "application-performance": [
        r"performance", r"optimization", r"profil", r"benchmark", r"latency",
        r"throughput", r"cache", r"memory", r"cpu",
    ],
    "monitoring-specialist": [
        r"monitor", r"observability", r"metric", r"logging", r"trace",
        r"prometheus", r"grafana", r"alert", r"dashboard", r"slo", r"sli",
    ],
    "ai-specialist": [
        r"ai\b", r"ml\b", r"machine\s*learning", r"model", r"llm", r"prompt",
        r"agent", r"neural", r"training", r"inference",
    ],
    "cloud-architect": [
        r"architect", r"design", r"pattern", r"scalab", r"distributed",
        r"microservice", r"system\s*design", r"high\s*availability",
    ],
    "general-purpose": [],  # Default fallback
}
class TaskDispatcher:
    """Match pending ``v2_tasks`` rows to agent types and record assignments.

    All state lives in the SQLite database at ``self.db_path``. Every method
    opens its own connection and closes it again, even when a query raises.
    An assignment counts as *active* while its status is ``'assigned'`` or
    ``'in_progress'``.
    """

    def __init__(self, db_path=None):
        """Create the dispatcher and make sure its bookkeeping tables exist.

        Args:
            db_path: Database file to use. Defaults to the module-level
                ``DB_PATH`` when omitted.
        """
        self.db_path = DB_PATH if db_path is None else db_path
        self._ensure_tables()

    def _ensure_tables(self):
        """Ensure required tables exist."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            # Task assignment tracking
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS task_assignments (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT NOT NULL,
                    agent_type TEXT NOT NULL,
                    assigned_at TEXT NOT NULL,
                    started_at TEXT,
                    completed_at TEXT,
                    status TEXT DEFAULT 'assigned' CHECK(status IN ('assigned', 'in_progress', 'completed', 'failed', 'cancelled')),
                    result TEXT,
                    FOREIGN KEY (task_id) REFERENCES v2_tasks(task_id)
                )
            """)

            # Task dependencies
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS task_dependencies (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT NOT NULL,
                    depends_on TEXT NOT NULL,
                    FOREIGN KEY (task_id) REFERENCES v2_tasks(task_id),
                    FOREIGN KEY (depends_on) REFERENCES v2_tasks(task_id),
                    UNIQUE(task_id, depends_on)
                )
            """)

            cursor.execute("CREATE INDEX IF NOT EXISTS idx_task_assignments_task ON task_assignments(task_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_task_assignments_status ON task_assignments(status)")
            conn.commit()

    def match_agent_type(self, description: str, epic_name: str = "", feature_name: str = "") -> str:
        """Match task to appropriate agent type based on keywords.

        Counts, for every agent type in ``AGENT_MAPPINGS``, how many of its
        patterns occur in the combined description/epic/feature text.

        Returns:
            The agent type with the highest count (the first one in mapping
            order on a tie), or ``"general-purpose"`` when nothing matches.
        """
        # Database columns may be NULL; treat missing text as empty.
        combined_text = f"{description or ''} {epic_name or ''} {feature_name or ''}".lower()

        scores = {}
        for agent_type, patterns in AGENT_MAPPINGS.items():
            if not patterns:
                continue
            score = sum(1 for p in patterns if re.search(p, combined_text, re.IGNORECASE))
            if score > 0:
                scores[agent_type] = score

        if scores:
            return max(scores, key=scores.get)
        return "general-purpose"

    def get_pending_tasks(self, limit: int = 10, priority: Optional[str] = None) -> List[Dict]:
        """Get pending tasks ordered by priority.

        Only tasks whose status is ``'pending'`` and that have no active
        assignment are returned.

        Args:
            limit: Maximum number of rows to return.
            priority: When given, only tasks with exactly this priority.

        Returns:
            One dict per task (task, feature and epic columns) with an added
            ``agent_type`` key holding the suggested agent.
        """
        query = """
            SELECT
                t.task_id,
                t.description,
                t.priority,
                t.status,
                t.estimated_hours,
                f.feature_id,
                f.name as feature_name,
                e.epic_id,
                e.name as epic_name
            FROM v2_tasks t
            JOIN v2_features f ON t.feature_id = f.feature_id
            JOIN v2_epics e ON f.epic_id = e.epic_id
            WHERE t.status = 'pending'
            AND t.task_id NOT IN (
                SELECT task_id FROM task_assignments
                WHERE status IN ('assigned', 'in_progress')
            )
        """
        params: List = []

        if priority:
            # Bound parameter: the value must never be spliced into the SQL.
            query += " AND t.priority = ?"
            params.append(priority)

        # ELSE keeps unknown/NULL priorities behind P0-P2; without it the CASE
        # yields NULL, which SQLite sorts first in ascending order.
        query += """
            ORDER BY
                CASE t.priority
                    WHEN 'P0' THEN 1
                    WHEN 'P1' THEN 2
                    WHEN 'P2' THEN 3
                    ELSE 4
                END,
                t.task_id
            LIMIT ?
        """
        params.append(limit)

        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query, params).fetchall()

        tasks = []
        for row in rows:
            task = dict(row)
            task['agent_type'] = self.match_agent_type(
                task['description'],
                task.get('epic_name', ''),
                task.get('feature_name', '')
            )
            tasks.append(task)
        return tasks

    def check_dependencies(self, task_id: str) -> Tuple[bool, List[str]]:
        """Check if task dependencies are satisfied.

        Returns:
            ``(ok, blocking)`` where ``blocking`` lists the IDs of the
            dependencies whose task status is not ``'completed'`` and ``ok``
            is True when that list is empty. Dependencies that reference a
            task missing from ``v2_tasks`` are dropped by the join and
            therefore never block.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            rows = conn.execute("""
                SELECT d.depends_on, t.status
                FROM task_dependencies d
                JOIN v2_tasks t ON d.depends_on = t.task_id
                WHERE d.task_id = ?
            """, (task_id,)).fetchall()

        blocking = [dep_id for dep_id, status in rows if status != 'completed']
        return len(blocking) == 0, blocking

    def assign_task(self, task_id: str, agent_type: Optional[str] = None) -> Dict:
        """Assign a task to an agent.

        Args:
            task_id: ID of the task in ``v2_tasks``.
            agent_type: Agent to assign to; chosen via ``match_agent_type``
                when omitted.

        Returns:
            A summary dict of the new assignment, or ``{"error": ...}`` when
            the task does not exist, already has an active assignment, or is
            blocked by unfinished dependencies.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # Get task details
            cursor.execute("""
                SELECT t.*, f.name as feature_name, e.name as epic_name
                FROM v2_tasks t
                JOIN v2_features f ON t.feature_id = f.feature_id
                JOIN v2_epics e ON f.epic_id = e.epic_id
                WHERE t.task_id = ?
            """, (task_id,))
            row = cursor.fetchone()
            if not row:
                return {"error": f"Task {task_id} not found"}
            task = dict(row)

            # Check if already assigned
            cursor.execute("""
                SELECT * FROM task_assignments
                WHERE task_id = ? AND status IN ('assigned', 'in_progress')
            """, (task_id,))
            if cursor.fetchone():
                return {"error": f"Task {task_id} is already assigned"}

            # Check dependencies
            deps_ok, blocking = self.check_dependencies(task_id)
            if not deps_ok:
                return {"error": f"Task {task_id} blocked by: {', '.join(blocking)}"}

            # Determine agent type
            if not agent_type:
                agent_type = self.match_agent_type(
                    task['description'],
                    task.get('epic_name', ''),
                    task.get('feature_name', '')
                )

            # Create assignment and flip the task status in one transaction;
            # if either statement raises, nothing is committed.
            cursor.execute("""
                INSERT INTO task_assignments (task_id, agent_type, assigned_at, status)
                VALUES (?, ?, ?, 'assigned')
            """, (task_id, agent_type, datetime.now(timezone.utc).isoformat()))
            cursor.execute("""
                UPDATE v2_tasks SET status = 'in_progress' WHERE task_id = ?
            """, (task_id,))
            conn.commit()

        return {
            "task_id": task_id,
            "agent_type": agent_type,
            "description": task['description'],
            "priority": task['priority'],
            "feature": task.get('feature_name'),
            "epic": task.get('epic_name'),
            "status": "assigned"
        }

    def complete_task(self, task_id: str, result: str = "success") -> Dict:
        """Mark a task as completed.

        Closes the task's active assignment (if any) and sets the task row to
        ``'completed'``.

        Returns:
            ``{"task_id": ..., "completed": bool}`` where ``completed`` is
            True when a ``v2_tasks`` row was updated.
        """
        now = datetime.now(timezone.utc).isoformat()
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            # Assignments are created as 'assigned', so both active states
            # must be closed here, not only 'in_progress'.
            cursor.execute("""
                UPDATE task_assignments
                SET status = 'completed', completed_at = ?, result = ?
                WHERE task_id = ? AND status IN ('assigned', 'in_progress')
            """, (now, result, task_id))

            # Update task
            cursor.execute("""
                UPDATE v2_tasks
                SET status = 'completed', completed_at = ?
                WHERE task_id = ?
            """, (now, task_id))
            affected = cursor.rowcount  # rows hit by the v2_tasks update
            conn.commit()

        return {"task_id": task_id, "completed": affected > 0}

    def fail_task(self, task_id: str, reason: str) -> Dict:
        """Mark a task as failed.

        The active assignment is closed as ``'failed'`` with ``reason`` stored
        as its result, and the task goes back to ``'pending'`` for retry.

        Returns:
            ``{"task_id": ..., "failed": True, "reason": ...}``.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            # Close the assignment in either active state; leaving an
            # 'assigned' row behind would keep the task marked as already
            # assigned and make the retry impossible.
            cursor.execute("""
                UPDATE task_assignments
                SET status = 'failed', completed_at = ?, result = ?
                WHERE task_id = ? AND status IN ('assigned', 'in_progress')
            """, (datetime.now(timezone.utc).isoformat(), reason, task_id))

            # Update task back to pending for retry
            cursor.execute("""
                UPDATE v2_tasks SET status = 'pending' WHERE task_id = ?
            """, (task_id,))
            conn.commit()

        return {"task_id": task_id, "failed": True, "reason": reason}

    def get_queue_status(self) -> Dict:
        """Get current queue status.

        Returns:
            A dict with task counts by status, pending counts by priority,
            active assignment counts by agent type, the five most recent
            completions as ``(task_id, agent_type, completed_at)`` rows, and
            the pending/active totals.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            # Task counts by status
            cursor.execute("""
                SELECT status, COUNT(*) FROM v2_tasks GROUP BY status
            """)
            task_status = dict(cursor.fetchall())

            # Task counts by priority
            cursor.execute("""
                SELECT priority, COUNT(*) FROM v2_tasks WHERE status = 'pending' GROUP BY priority
            """)
            pending_by_priority = dict(cursor.fetchall())

            # Active assignments
            cursor.execute("""
                SELECT agent_type, COUNT(*) FROM task_assignments
                WHERE status IN ('assigned', 'in_progress')
                GROUP BY agent_type
            """)
            active_by_agent = dict(cursor.fetchall())

            # Recent completions
            cursor.execute("""
                SELECT task_id, agent_type, completed_at FROM task_assignments
                WHERE status = 'completed'
                ORDER BY completed_at DESC
                LIMIT 5
            """)
            recent = cursor.fetchall()

        return {
            "task_status": task_status,
            "pending_by_priority": pending_by_priority,
            "active_by_agent": active_by_agent,
            "recent_completions": recent,
            "total_pending": sum(pending_by_priority.values()),
            "total_active": sum(active_by_agent.values())
        }

    def get_stats(self) -> Dict:
        """Get dispatch statistics.

        Returns:
            Total, completed and failed assignment counts, the success rate
            (completed / total as a percentage rounded to one decimal, or 0
            when there are no assignments) and a per-agent breakdown.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            # Overall stats
            cursor.execute("SELECT COUNT(*) FROM task_assignments")
            total_assignments = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(*) FROM task_assignments WHERE status = 'completed'")
            completed = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(*) FROM task_assignments WHERE status = 'failed'")
            failed = cursor.fetchone()[0]

            # By agent type
            cursor.execute("""
                SELECT agent_type,
                       COUNT(*) as total,
                       SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
                       SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
                FROM task_assignments
                GROUP BY agent_type
            """)
            by_agent = cursor.fetchall()

        return {
            "total_assignments": total_assignments,
            "completed": completed,
            "failed": failed,
            "success_rate": round(completed / total_assignments * 100, 1) if total_assignments > 0 else 0,
            "by_agent": [
                {"agent": a, "total": t, "completed": c, "failed": f}
                for a, t, c, f in by_agent
            ]
        }
def print_task(task: Dict, index: Optional[int] = None):
    """Pretty print a task.

    Args:
        task: Task dict as produced by ``TaskDispatcher.get_pending_tasks``;
            ``task_id`` and ``description`` are required, the remaining keys
            fall back to placeholders.
        index: Optional list position printed as an ``"N. "`` prefix.
    """
    priority_colors = {"P0": Colors.RED, "P1": Colors.YELLOW, "P2": Colors.GREEN}
    priority = task.get('priority', 'P2')
    color = priority_colors.get(priority, Colors.RESET)

    prefix = f"{index}. " if index is not None else ""

    # Only mark the description as truncated when it actually was; a missing
    # (NULL) description is shown as empty instead of raising.
    description = task['description'] or ""
    if len(description) > 80:
        description = f"{description[:80]}..."

    print(f"\n{prefix}{Colors.BOLD}{task['task_id']}{Colors.RESET} {color}[{priority}]{Colors.RESET}")
    print(f" {description}")
    print(f" {Colors.CYAN}Agent: {task.get('agent_type', 'unassigned')}{Colors.RESET}")
    print(f" {Colors.DIM}Epic: {task.get('epic_name', 'N/A')} > {task.get('feature_name', 'N/A')}{Colors.RESET}")
def main():
    """Parse the command line and run the requested dispatcher action.

    Exactly one action is executed, chosen in this order: ``--next``,
    ``--queue``, ``--assign``, ``--complete``, ``--fail``, ``--stats``. With
    no action flag a short queue summary plus the next five tasks is printed.
    ``--json`` switches the output of any explicit action to JSON.
    """
    parser = argparse.ArgumentParser(description="V2 Task Dispatcher")
    parser.add_argument("--next", "-n", type=int, nargs="?", const=1, help="Get next N available tasks")
    parser.add_argument("--priority", "-p", choices=["P0", "P1", "P2"], help="Filter by priority")
    parser.add_argument("--queue", "-q", action="store_true", help="Show full queue status")
    parser.add_argument("--assign", "-a", type=str, help="Assign specific task ID")
    parser.add_argument("--agent", type=str, help="Specify agent type for assignment")
    parser.add_argument("--complete", "-c", type=str, help="Mark task as completed")
    parser.add_argument("--fail", "-f", type=str, help="Mark task as failed")
    parser.add_argument("--reason", type=str, default="", help="Failure reason")
    parser.add_argument("--stats", "-s", action="store_true", help="Show dispatch statistics")
    parser.add_argument("--json", action="store_true", help="Output as JSON")

    args = parser.parse_args()
    dispatcher = TaskDispatcher()

    if args.next:
        tasks = dispatcher.get_pending_tasks(limit=args.next, priority=args.priority)
        if args.json:
            print(json.dumps(tasks, indent=2))
        else:
            print(f"\n{Colors.BOLD}Next {len(tasks)} Available Tasks:{Colors.RESET}")
            for i, task in enumerate(tasks, 1):
                print_task(task, i)

    elif args.queue:
        status = dispatcher.get_queue_status()
        if args.json:
            print(json.dumps(status, indent=2))
        else:
            priority_colors = {"P0": Colors.RED, "P1": Colors.YELLOW, "P2": Colors.GREEN}
            print(f"\n{Colors.BOLD}Queue Status{Colors.RESET}")
            print("=" * 40)
            print(f"Total Pending: {status['total_pending']}")
            print(f"Active: {status['total_active']}")
            print(f"\n{Colors.BOLD}Pending by Priority:{Colors.RESET}")
            for p, count in sorted(status['pending_by_priority'].items()):
                color = priority_colors.get(p, Colors.RESET)
                print(f" {color}{p}: {count}{Colors.RESET}")
            print(f"\n{Colors.BOLD}Active by Agent:{Colors.RESET}")
            for agent, count in status['active_by_agent'].items():
                print(f" {Colors.CYAN}{agent}: {count}{Colors.RESET}")

    elif args.assign:
        result = dispatcher.assign_task(args.assign, args.agent)
        if args.json:
            print(json.dumps(result, indent=2))
        elif "error" in result:
            print(f"{Colors.RED}Error: {result['error']}{Colors.RESET}")
        else:
            print(f"{Colors.GREEN}Assigned {result['task_id']} to {result['agent_type']}{Colors.RESET}")

    elif args.complete:
        result = dispatcher.complete_task(args.complete)
        if args.json:
            print(json.dumps(result, indent=2))
        elif result["completed"]:
            print(f"{Colors.GREEN}Completed: {result['task_id']}{Colors.RESET}")
        else:
            # No task row was updated, so do not report success.
            print(f"{Colors.RED}Error: Task {result['task_id']} not found{Colors.RESET}")

    elif args.fail:
        result = dispatcher.fail_task(args.fail, args.reason)
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"{Colors.RED}Failed: {result['task_id']} - {result['reason']}{Colors.RESET}")

    elif args.stats:
        stats = dispatcher.get_stats()
        if args.json:
            print(json.dumps(stats, indent=2))
        else:
            print(f"\n{Colors.BOLD}Dispatch Statistics{Colors.RESET}")
            print("=" * 40)
            print(f"Total Assignments: {stats['total_assignments']}")
            print(f"Completed: {Colors.GREEN}{stats['completed']}{Colors.RESET}")
            print(f"Failed: {Colors.RED}{stats['failed']}{Colors.RESET}")
            print(f"Success Rate: {stats['success_rate']}%")
            print(f"\n{Colors.BOLD}By Agent:{Colors.RESET}")
            for agent_stat in stats['by_agent']:
                print(f" {Colors.CYAN}{agent_stat['agent']}{Colors.RESET}: {agent_stat['completed']}/{agent_stat['total']}")

    else:
        # Default: show queue summary
        tasks = dispatcher.get_pending_tasks(limit=5)
        status = dispatcher.get_queue_status()
        print(f"\n{Colors.BOLD}V2 Task Dispatcher{Colors.RESET}")
        print("=" * 40)
        print(f"Pending: {status['total_pending']} | Active: {status['total_active']}")
        print(f"\n{Colors.BOLD}Next 5 Tasks:{Colors.RESET}")
        for i, task in enumerate(tasks, 1):
            print_task(task, i)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()