#!/usr/bin/env python3
"""Autonomous Orchestrator - Main control loop for multi-agent task execution.

---
title: "Autonomous Orchestrator"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Autonomous Orchestrator - Main control loop for multi-agent task execution."
keywords: ['api', 'autonomous', 'backend', 'database', 'docker']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "autonomous-orchestrator.py"
language: python
executable: true
usage: "python3 scripts/autonomous-orchestrator.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

Features:
- Coordinates sync daemon, task dispatcher, and agent executor
- Manages concurrent agent execution
- Handles task dependencies
- Creates checkpoints at milestones
- Provides real-time progress dashboard
- Handles failures and retries

Usage:
    python3 autonomous-orchestrator.py                  # Start orchestrator
    python3 autonomous-orchestrator.py --max-agents 3   # Limit concurrent agents
    python3 autonomous-orchestrator.py --priority P0    # Only P0 tasks
    python3 autonomous-orchestrator.py --dry-run        # Preview mode
    python3 autonomous-orchestrator.py --status         # Show current status
    python3 autonomous-orchestrator.py --dashboard      # Live dashboard
"""
import argparse
import json
import os
import signal
import sqlite3
import subprocess
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set
ANSI colors and cursor control
Shared Colors module (consolidates 36 duplicate definitions)
from colors import Colors
SCRIPT_DIR = Path(file).parent CORE_DIR = SCRIPT_DIR.parent PROJECT_ROOT = CORE_DIR.parent.parent.parent CONFIG_PATH = CORE_DIR / "config" / "orchestrator-config.json" STATE_FILE = CORE_DIR / "logs" / "orchestrator-state.json" CHECKPOINT_DIR = CORE_DIR / "checkpoints"
ADR-114 & ADR-118: Use centralized path discovery for database
sys.path.insert(0, str(SCRIPT_DIR / "core")) try: from paths import get_sessions_db_path, SESSIONS_DB DB_PATH = SESSIONS_DB # Tier 3: Orchestrator state is regenerable session data except ImportError: _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" if _user_data.exists(): DB_PATH = _user_data / "sessions.db" else: DB_PATH = CORE_DIR / "context-storage" / "sessions.db"
Ensure directories exist
STATE_FILE.parent.mkdir(parents=True, exist_ok=True) CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
Default configuration
DEFAULT_CONFIG = { "max_concurrent_agents": 5, "poll_interval": 10, # seconds "checkpoint_interval": 10, # tasks "retry_limit": 3, "timeout_per_task": 7200, # 2 hours "sync_interval": 30, "priority_filter": None, # None means all priorities "auto_assign": True, "pause_on_failure_count": 5 }
class AutonomousOrchestrator:
    """Main control loop coordinating task assignment and agent execution."""

    def __init__(self, config: Optional[Dict] = None):
        """Initialize orchestrator state and install signal handlers.

        Args:
            config: Optional overrides merged on top of DEFAULT_CONFIG.
        """
        # Fixed: the constructor was named `init`, which Python never calls
        # automatically — it must be the dunder `__init__`.
        self.config = {**DEFAULT_CONFIG, **(config or {})}
        self.running = False
        self.paused = False
        self.active_tasks: Dict[str, Dict] = {}
        self.completed_count = 0
        self.failed_count = 0
        self.start_time: Optional[datetime] = None
        self.lock = threading.Lock()

        # Setup signal handlers
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

        # Ensure database tables exist
        self._ensure_tables()
def _signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully."""
self.log("Received shutdown signal, stopping orchestrator...")
self.running = False
def _ensure_tables(self):
"""Ensure required orchestrator tables exist."""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Orchestrator state tracking
cursor.execute("""
CREATE TABLE IF NOT EXISTS orchestrator_state (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at TEXT NOT NULL,
stopped_at TEXT,
tasks_completed INTEGER DEFAULT 0,
tasks_failed INTEGER DEFAULT 0,
status TEXT DEFAULT 'running' CHECK(status IN ('running', 'stopped', 'paused', 'error'))
)
""")
# Checkpoint tracking
cursor.execute("""
CREATE TABLE IF NOT EXISTS orchestrator_checkpoints (
id INTEGER PRIMARY KEY AUTOINCREMENT,
checkpoint_name TEXT NOT NULL,
created_at TEXT NOT NULL,
tasks_completed INTEGER,
tasks_pending INTEGER,
notes TEXT
)
""")
conn.commit()
conn.close()
def log(self, message: str, level: str = "INFO"):
"""Log message with timestamp."""
timestamp = datetime.now().strftime("%H:%M:%S")
color = {
"INFO": Colors.BLUE,
"SUCCESS": Colors.GREEN,
"WARN": Colors.YELLOW,
"ERROR": Colors.RED,
"TASK": Colors.CYAN,
"CHECKPOINT": Colors.MAGENTA
}.get(level, Colors.RESET)
print(f"{Colors.DIM}{timestamp}{Colors.RESET} {color}[{level:^10}]{Colors.RESET} {message}")
def get_pending_tasks(self, limit: int = 10) -> List[Dict]:
"""Get pending tasks ordered by priority."""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = """
SELECT
t.task_id,
t.description,
t.priority,
t.status,
t.estimated_hours,
f.feature_id,
f.name as feature_name,
e.epic_id,
e.name as epic_name
FROM v2_tasks t
JOIN v2_features f ON t.feature_id = f.feature_id
JOIN v2_epics e ON f.epic_id = e.epic_id
WHERE t.status = 'pending'
AND t.task_id NOT IN (
SELECT task_id FROM task_assignments
WHERE status IN ('assigned', 'in_progress')
)
"""
if self.config.get("priority_filter"):
query += f" AND t.priority = '{self.config['priority_filter']}'"
query += """
ORDER BY
CASE t.priority
WHEN 'P0' THEN 1
WHEN 'P1' THEN 2
WHEN 'P2' THEN 3
END,
t.task_id
LIMIT ?
"""
cursor.execute(query, (limit,))
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def assign_task(self, task: Dict) -> bool:
"""Assign a task to an agent."""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Match agent type based on task description
agent_type = self._match_agent_type(task['description'], task.get('epic_name', ''))
# Create assignment
cursor.execute("""
INSERT INTO task_assignments (task_id, agent_type, assigned_at, status)
VALUES (?, ?, ?, 'assigned')
""", (task['task_id'], agent_type, datetime.now(timezone.utc).isoformat()))
# Update task status
cursor.execute("""
UPDATE v2_tasks SET status = 'in_progress' WHERE task_id = ?
""", (task['task_id'],))
conn.commit()
conn.close()
return True
def _match_agent_type(self, description: str, epic_name: str) -> str:
"""Match task to agent type based on keywords."""
import re
combined = f"{description} {epic_name}".lower()
mappings = {
"devops-engineer": [r"deploy", r"docker", r"kubernetes", r"ci/?cd", r"pipeline", r"infrastructure"],
"security-specialist": [r"security", r"auth", r"compliance", r"vulnerability"],
"testing-specialist": [r"test", r"validation", r"coverage", r"qa"],
"codi-documentation-writer": [r"document", r"guide", r"readme", r"manual"],
"backend-development": [r"api", r"endpoint", r"backend", r"server"],
"frontend-development-agent": [r"ui", r"frontend", r"component", r"react"],
"database-architect": [r"database", r"schema", r"migration", r"sql"],
}
for agent, patterns in mappings.items():
if any(re.search(p, combined) for p in patterns):
return agent
return "general-purpose"
def execute_task(self, task_id: str) -> Dict:
"""Execute a single task."""
# Use agent-executor subprocess
cmd = [
sys.executable,
str(SCRIPT_DIR / "agent-executor.py"),
"--task", task_id,
"--json"
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=self.config["timeout_per_task"]
)
if result.returncode == 0:
return json.loads(result.stdout)
else:
return {"success": False, "error": result.stderr, "task_id": task_id}
except subprocess.TimeoutExpired:
return {"success": False, "error": "Timeout", "task_id": task_id}
except Exception as e:
return {"success": False, "error": str(e), "task_id": task_id}
def create_checkpoint(self, name: str = None):
"""Create a checkpoint of current state."""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Get current stats
cursor.execute("SELECT COUNT(*) FROM v2_tasks WHERE status = 'completed'")
completed = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM v2_tasks WHERE status = 'pending'")
pending = cursor.fetchone()[0]
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
checkpoint_name = name or f"checkpoint-{timestamp}"
# Record checkpoint
cursor.execute("""
INSERT INTO orchestrator_checkpoints (checkpoint_name, created_at, tasks_completed, tasks_pending)
VALUES (?, ?, ?, ?)
""", (checkpoint_name, datetime.now(timezone.utc).isoformat(), completed, pending))
conn.commit()
conn.close()
# Trigger sync
self._sync()
self.log(f"Checkpoint created: {checkpoint_name} ({completed} completed, {pending} pending)", "CHECKPOINT")
def _sync(self):
"""Trigger database-to-markdown sync."""
try:
subprocess.run(
[sys.executable, str(SCRIPT_DIR / "sync-project-plan.py"), "--db-to-plan"],
capture_output=True,
timeout=60
)
except Exception as e:
self.log(f"Sync error: {e}", "WARN")
def get_status(self) -> Dict:
"""Get current orchestrator status."""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Task counts
cursor.execute("""
SELECT status, COUNT(*) FROM v2_tasks GROUP BY status
""")
task_counts = dict(cursor.fetchall())
# Active assignments
cursor.execute("""
SELECT agent_type, COUNT(*) FROM task_assignments
WHERE status IN ('assigned', 'in_progress')
GROUP BY agent_type
""")
active = dict(cursor.fetchall())
# Recent activity
cursor.execute("""
SELECT task_id, agent_type, status, completed_at
FROM task_assignments
WHERE completed_at IS NOT NULL
ORDER BY completed_at DESC
LIMIT 10
""")
recent = cursor.fetchall()
conn.close()
total = sum(task_counts.values())
completed = task_counts.get("completed", 0)
return {
"running": self.running,
"paused": self.paused,
"task_counts": task_counts,
"total_tasks": total,
"completed": completed,
"progress_pct": round(completed / total * 100, 1) if total > 0 else 0,
"active_agents": active,
"active_count": sum(active.values()),
"session_completed": self.completed_count,
"session_failed": self.failed_count,
"recent_activity": [
{"task": r[0], "agent": r[1], "status": r[2], "time": r[3]}
for r in recent
],
"uptime": str(datetime.now() - self.start_time) if self.start_time else "N/A"
}
def print_dashboard(self, clear: bool = False):
"""Print status dashboard."""
status = self.get_status()
if clear:
print("\033[2J\033[H", end="") # Clear screen
print(f"\n{Colors.BOLD}{'═' * 60}{Colors.RESET}")
print(f"{Colors.BOLD} AUTONOMOUS ORCHESTRATOR DASHBOARD{Colors.RESET}")
print(f"{Colors.BOLD}{'═' * 60}{Colors.RESET}")
# Status indicator
if status['running']:
state = f"{Colors.GREEN}● RUNNING{Colors.RESET}"
elif status['paused']:
state = f"{Colors.YELLOW}● PAUSED{Colors.RESET}"
else:
state = f"{Colors.RED}● STOPPED{Colors.RESET}"
print(f"\n Status: {state} Uptime: {status['uptime']}")
# Progress bar
pct = status['progress_pct']
bar_width = 40
filled = int(bar_width * pct / 100)
bar = "█" * filled + "░" * (bar_width - filled)
print(f"\n Progress: [{bar}] {pct}%")
print(f" Total: {status['total_tasks']} | Completed: {Colors.GREEN}{status['completed']}{Colors.RESET} | Pending: {status['task_counts'].get('pending', 0)}")
# Active agents
print(f"\n {Colors.BOLD}Active Agents ({status['active_count']}):{Colors.RESET}")
for agent, count in status['active_agents'].items():
print(f" {Colors.CYAN}{agent}{Colors.RESET}: {count}")
# Session stats
print(f"\n {Colors.BOLD}This Session:{Colors.RESET}")
print(f" Completed: {Colors.GREEN}{self.completed_count}{Colors.RESET}")
print(f" Failed: {Colors.RED}{self.failed_count}{Colors.RESET}")
# Recent activity
print(f"\n {Colors.BOLD}Recent Activity:{Colors.RESET}")
for item in status['recent_activity'][:5]:
color = Colors.GREEN if item['status'] == 'completed' else Colors.RED
print(f" {color}●{Colors.RESET} {item['task']} ({item['agent']}) - {item['status']}")
print(f"\n{Colors.BOLD}{'═' * 60}{Colors.RESET}")
print(f" Press Ctrl+C to stop")
print(f"{Colors.BOLD}{'═' * 60}{Colors.RESET}\n")
def run(self, dry_run: bool = False):
"""Main orchestration loop."""
self.running = True
self.start_time = datetime.now()
self.log(f"Starting autonomous orchestrator (max agents: {self.config['max_concurrent_agents']})")
if self.config.get('priority_filter'):
self.log(f"Priority filter: {self.config['priority_filter']}")
# Record start in database
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO orchestrator_state (started_at, status)
VALUES (?, 'running')
""", (datetime.now(timezone.utc).isoformat(),))
conn.commit()
conn.close()
checkpoint_counter = 0
with ThreadPoolExecutor(max_workers=self.config['max_concurrent_agents']) as executor:
futures = {}
while self.running:
try:
if self.paused:
time.sleep(5)
continue
# Check for completed futures
done_futures = [f for f in futures if f.done()]
for future in done_futures:
task_id = futures.pop(future)
try:
result = future.result()
if result.get("success"):
self.completed_count += 1
self.log(f"Task {task_id} completed", "SUCCESS")
else:
self.failed_count += 1
self.log(f"Task {task_id} failed: {result.get('error', 'unknown')}", "ERROR")
except Exception as e:
self.failed_count += 1
self.log(f"Task {task_id} exception: {e}", "ERROR")
# Check if we should checkpoint
checkpoint_counter += 1
if checkpoint_counter >= self.config['checkpoint_interval']:
self.create_checkpoint()
checkpoint_counter = 0
# Check failure threshold
if self.failed_count >= self.config['pause_on_failure_count']:
self.log(f"Pausing due to {self.failed_count} failures", "WARN")
self.paused = True
continue
# Fill available slots with new tasks
available_slots = self.config['max_concurrent_agents'] - len(futures)
if available_slots > 0:
pending_tasks = self.get_pending_tasks(limit=available_slots)
for task in pending_tasks:
if dry_run:
self.log(f"[DRY RUN] Would execute: {task['task_id']}", "TASK")
else:
self.assign_task(task)
future = executor.submit(self.execute_task, task['task_id'])
futures[future] = task['task_id']
self.log(f"Started: {task['task_id']} ({task['priority']})", "TASK")
# No more pending tasks and no active tasks - we're done
if len(futures) == 0 and len(pending_tasks) == 0:
self.log("All tasks completed!", "SUCCESS")
break
time.sleep(self.config['poll_interval'])
except Exception as e:
self.log(f"Orchestrator error: {e}", "ERROR")
time.sleep(self.config['poll_interval'])
# Final checkpoint and cleanup
self.create_checkpoint("final")
# Record stop in database
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
UPDATE orchestrator_state
SET stopped_at = ?, tasks_completed = ?, tasks_failed = ?, status = 'stopped'
WHERE id = (SELECT MAX(id) FROM orchestrator_state)
""", (datetime.now(timezone.utc).isoformat(), self.completed_count, self.failed_count))
conn.commit()
conn.close()
self.log(f"Orchestrator stopped. Completed: {self.completed_count}, Failed: {self.failed_count}")
def main():
    """CLI entry point: parse arguments and dispatch to the requested mode."""
    parser = argparse.ArgumentParser(description="V2 Autonomous Orchestrator")
    parser.add_argument("--max-agents", "-m", type=int, default=5, help="Maximum concurrent agents")
    parser.add_argument("--priority", "-p", choices=["P0", "P1", "P2"], help="Filter by priority")
    parser.add_argument("--dry-run", "-d", action="store_true", help="Preview mode without execution")
    parser.add_argument("--status", "-s", action="store_true", help="Show current status")
    parser.add_argument("--dashboard", action="store_true", help="Show live dashboard")
    parser.add_argument("--checkpoint", type=str, help="Create named checkpoint")
    args = parser.parse_args()

    config = {
        "max_concurrent_agents": args.max_agents,
        "priority_filter": args.priority,
    }
    orchestrator = AutonomousOrchestrator(config)

    # One-shot modes exit without starting the main loop.
    if args.status:
        status = orchestrator.get_status()
        print(json.dumps(status, indent=2, default=str))
        return
    if args.dashboard:
        orchestrator.print_dashboard()
        return
    if args.checkpoint:
        orchestrator.create_checkpoint(args.checkpoint)
        return

    # Run the orchestrator
    print(f"\n{Colors.BOLD}Starting Autonomous Orchestrator{Colors.RESET}")
    print(f" Max concurrent agents: {args.max_agents}")
    print(f" Priority filter: {args.priority or 'All'}")
    print(f" Mode: {'Dry Run' if args.dry_run else 'Live'}")
    print()
    orchestrator.run(dry_run=args.dry_run)
# Fixed: the guard compared the undefined `name` against "main"; the
# standard entry-point guard uses the dunder `__name__` / "__main__".
if __name__ == "__main__":
    main()