Skip to main content

scripts-agent-executor

#!/usr/bin/env python3 """

title: "ANSI colors" component_type: script version: "1.0.0" audience: contributor status: stable summary: "Agent Executor - Executes tasks via Claude Code agents." keywords: ['agent', 'database', 'executor', 'review'] tokens: ~500 created: 2025-12-22 updated: 2025-12-22 script_name: "agent-executor.py" language: python executable: true usage: "python3 scripts/agent-executor.py [options]" python_version: "3.10+" dependencies: [] modifies_files: false network_access: false requires_auth: false

Agent Executor - Executes tasks via Claude Code agents.

Features:

  • Executes tasks assigned by the dispatcher
  • Updates task status in database
  • Handles timeouts and retries
  • Logs execution results
  • Supports dry-run mode

Usage: python3 agent-executor.py --task T001.001 # Execute specific task python3 agent-executor.py --next # Execute next available task python3 agent-executor.py --batch 5 # Execute batch of 5 tasks python3 agent-executor.py --dry-run --task T001.001 # Preview without executing """

import argparse import json import os import re import shutil import sqlite3 import subprocess import sys import time from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional

ANSI colors

Shared Colors module (consolidates 36 duplicate definitions)

from colors import Colors

SCRIPT_DIR = Path(file).parent CORE_DIR = SCRIPT_DIR.parent PROJECT_ROOT = CORE_DIR.parent.parent.parent LOG_DIR = CORE_DIR / "logs" / "agent-executions" CONFIG_PATH = CORE_DIR / "config" / "orchestrator-config.json"

ADR-114 & ADR-118: Use centralized path discovery for database

sys.path.insert(0, str(SCRIPT_DIR / "core")) try: from paths import get_sessions_db_path, SESSIONS_DB DB_PATH = SESSIONS_DB # Tier 3: Agent execution logs are regenerable session data except ImportError: # Fallback for environments without paths.py _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" if _user_data.exists(): DB_PATH = _user_data / "sessions.db" else: DB_PATH = CORE_DIR / "context-storage" / "sessions.db"

Ensure directories exist

LOG_DIR.mkdir(parents=True, exist_ok=True)

Default config

DEFAULT_CONFIG = { "timeout": 7200, # 2 hours in seconds "max_retries": 3, "retry_delay": 30, # seconds "max_concurrent": 5, "claude_binary": "claude", "default_model": "sonnet" }

class AgentExecutor: def init(self, config_path: Optional[Path] = None): self.config = self._load_config(config_path) self.db_path = DB_PATH

def _load_config(self, config_path: Optional[Path]) -> Dict:
"""Load configuration from file or use defaults."""
config = DEFAULT_CONFIG.copy()
path = config_path or CONFIG_PATH

if path.exists():
try:
with open(path) as f:
loaded = json.load(f)
config.update(loaded.get("executor", {}))
except Exception as e:
print(f"{Colors.YELLOW}Warning: Could not load config: {e}{Colors.RESET}")

return config

def _find_claude_binary(self) -> Optional[str]:
"""Find the Claude Code binary."""
# Check config first
if self.config.get("claude_binary"):
binary = self.config["claude_binary"]
if shutil.which(binary):
return binary

# Common locations
locations = [
"claude",
"/usr/local/bin/claude",
"/opt/homebrew/bin/claude",
os.path.expanduser("~/.claude/local/claude"),
]

for loc in locations:
if shutil.which(loc):
return loc

return None

def get_task(self, task_id: str) -> Optional[Dict]:
"""Get task details from database."""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()

cursor.execute("""
SELECT
t.task_id,
t.description,
t.priority,
t.status,
t.estimated_hours,
f.feature_id,
f.name as feature_name,
e.epic_id,
e.name as epic_name
FROM v2_tasks t
JOIN v2_features f ON t.feature_id = f.feature_id
JOIN v2_epics e ON f.epic_id = e.epic_id
WHERE t.task_id = ?
""", (task_id,))

row = cursor.fetchone()
conn.close()

return dict(row) if row else None

def get_assignment(self, task_id: str) -> Optional[Dict]:
"""Get current assignment for a task."""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()

cursor.execute("""
SELECT * FROM task_assignments
WHERE task_id = ?
ORDER BY assigned_at DESC
LIMIT 1
""", (task_id,))

row = cursor.fetchone()
conn.close()

return dict(row) if row else None

def update_status(self, task_id: str, status: str, result: Optional[str] = None):
"""Update task and assignment status."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()

now = datetime.now(timezone.utc).isoformat()

# Update task
if status == "completed":
cursor.execute("""
UPDATE v2_tasks SET status = 'completed', completed_at = ?
WHERE task_id = ?
""", (now, task_id))
elif status == "in_progress":
cursor.execute("""
UPDATE v2_tasks SET status = 'in_progress' WHERE task_id = ?
""", (task_id,))
elif status in ("failed", "blocked"):
cursor.execute("""
UPDATE v2_tasks SET status = 'pending' WHERE task_id = ?
""", (task_id,))

# Update assignment
if status == "in_progress":
cursor.execute("""
UPDATE task_assignments SET status = 'in_progress', started_at = ?
WHERE task_id = ? AND status = 'assigned'
""", (now, task_id))
elif status == "completed":
cursor.execute("""
UPDATE task_assignments SET status = 'completed', completed_at = ?, result = ?
WHERE task_id = ? AND status = 'in_progress'
""", (now, result or "success", task_id))
elif status == "failed":
cursor.execute("""
UPDATE task_assignments SET status = 'failed', completed_at = ?, result = ?
WHERE task_id = ? AND status = 'in_progress'
""", (now, result or "unknown error", task_id))

conn.commit()
conn.close()

def log_execution(self, task_id: str, agent_type: str, output: str, success: bool, duration: float):
"""Log execution details to file."""
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
log_file = LOG_DIR / f"{timestamp}-{task_id.replace('.', '-')}.log"

log_content = f"""

Task Execution Log

Task ID: {task_id} Agent Type: {agent_type} Timestamp: {timestamp} Duration: {duration:.1f}s Success: {success}

Output:

{output} """

    with open(log_file, "w") as f:
f.write(log_content)

return log_file

def build_prompt(self, task: Dict, agent_type: str) -> str:
"""Build the prompt for Claude Code execution."""
prompt = f"""Execute the following task from the V2 project plan:

Task ID: {task['task_id']} Priority: {task['priority']} Epic: {task['epic_name']} Feature: {task['feature_name']}

Description: {task['description']}

Instructions:

  1. Analyze what needs to be done
  2. Implement the required changes
  3. Verify your implementation works
  4. Report completion status

Working Directory: {PROJECT_ROOT} Database: {DB_PATH}

Please proceed with implementation. When complete, summarize what was done."""

    return prompt

def execute_task(self, task_id: str, dry_run: bool = False) -> Dict:
"""Execute a task using Claude Code."""
# Get task details
task = self.get_task(task_id)
if not task:
return {"success": False, "error": f"Task {task_id} not found"}

# Get assignment
assignment = self.get_assignment(task_id)
if not assignment:
return {"success": False, "error": f"Task {task_id} not assigned"}

agent_type = assignment.get("agent_type", "general-purpose")

print(f"\n{Colors.BOLD}Executing Task: {task_id}{Colors.RESET}")
print(f" Agent: {Colors.CYAN}{agent_type}{Colors.RESET}")
print(f" Priority: {task['priority']}")
print(f" Description: {task['description'][:60]}...")

if dry_run:
print(f"\n{Colors.YELLOW}[DRY RUN] Would execute with prompt:{Colors.RESET}")
prompt = self.build_prompt(task, agent_type)
print(f"{Colors.DIM}{prompt[:500]}...{Colors.RESET}")
return {"success": True, "dry_run": True, "task_id": task_id}

# Find Claude binary
claude_bin = self._find_claude_binary()
if not claude_bin:
return {"success": False, "error": "Claude Code binary not found"}

# Update status to in_progress
self.update_status(task_id, "in_progress")

# Build command
prompt = self.build_prompt(task, agent_type)
cmd = [
claude_bin,
"--print",
"-p", prompt
]

# Execute
start_time = time.time()
try:
result = subprocess.run(
cmd,
cwd=str(PROJECT_ROOT),
capture_output=True,
text=True,
timeout=self.config["timeout"]
)

duration = time.time() - start_time
output = result.stdout + result.stderr
success = result.returncode == 0

# Log execution
log_file = self.log_execution(task_id, agent_type, output, success, duration)

# Update status
if success:
self.update_status(task_id, "completed", "Execution successful")
print(f"{Colors.GREEN}✓ Task completed in {duration:.1f}s{Colors.RESET}")
else:
self.update_status(task_id, "failed", f"Exit code: {result.returncode}")
print(f"{Colors.RED}✗ Task failed (exit code: {result.returncode}){Colors.RESET}")

return {
"success": success,
"task_id": task_id,
"duration": duration,
"log_file": str(log_file),
"output_preview": output[:500] if output else ""
}

except subprocess.TimeoutExpired:
duration = time.time() - start_time
self.update_status(task_id, "failed", f"Timeout after {duration:.1f}s")
print(f"{Colors.RED}✗ Task timed out after {duration:.1f}s{Colors.RESET}")
return {"success": False, "error": "Timeout", "task_id": task_id}

except Exception as e:
self.update_status(task_id, "failed", str(e))
print(f"{Colors.RED}✗ Task failed: {e}{Colors.RESET}")
return {"success": False, "error": str(e), "task_id": task_id}

def execute_next(self, dry_run: bool = False) -> Dict:
"""Get and execute the next available task."""
# Import dispatcher to get next task
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()

# Find next assigned task
cursor.execute("""
SELECT task_id FROM task_assignments
WHERE status = 'assigned'
ORDER BY assigned_at
LIMIT 1
""")

row = cursor.fetchone()
conn.close()

if not row:
print(f"{Colors.YELLOW}No assigned tasks waiting for execution{Colors.RESET}")
return {"success": False, "error": "No tasks available"}

return self.execute_task(row["task_id"], dry_run)

def execute_batch(self, count: int, dry_run: bool = False) -> List[Dict]:
"""Execute a batch of tasks."""
results = []

for i in range(count):
print(f"\n{Colors.BOLD}[{i+1}/{count}] Processing...{Colors.RESET}")
result = self.execute_next(dry_run)
results.append(result)

if not result.get("success") and result.get("error") == "No tasks available":
print(f"{Colors.YELLOW}No more tasks available{Colors.RESET}")
break

# Small delay between tasks
if not dry_run and i < count - 1:
time.sleep(2)

return results

def main(): parser = argparse.ArgumentParser(description="V2 Agent Executor") parser.add_argument("--task", "-t", type=str, help="Execute specific task ID") parser.add_argument("--next", "-n", action="store_true", help="Execute next available task") parser.add_argument("--batch", "-b", type=int, help="Execute batch of N tasks") parser.add_argument("--dry-run", "-d", action="store_true", help="Preview without executing") parser.add_argument("--config", "-c", type=str, help="Path to config file") parser.add_argument("--json", action="store_true", help="Output as JSON")

args = parser.parse_args()

config_path = Path(args.config) if args.config else None
executor = AgentExecutor(config_path)

if args.task:
result = executor.execute_task(args.task, args.dry_run)
if args.json:
print(json.dumps(result, indent=2))
elif not args.dry_run:
if result.get("success"):
print(f"\n{Colors.GREEN}Task {args.task} completed successfully{Colors.RESET}")
else:
print(f"\n{Colors.RED}Task {args.task} failed: {result.get('error')}{Colors.RESET}")

elif args.next:
result = executor.execute_next(args.dry_run)
if args.json:
print(json.dumps(result, indent=2))

elif args.batch:
results = executor.execute_batch(args.batch, args.dry_run)
if args.json:
print(json.dumps(results, indent=2))
else:
success = sum(1 for r in results if r.get("success"))
print(f"\n{Colors.BOLD}Batch Complete:{Colors.RESET}")
print(f" Successful: {Colors.GREEN}{success}{Colors.RESET}")
print(f" Failed: {Colors.RED}{len(results) - success}{Colors.RESET}")

else:
# Show help
parser.print_help()
print(f"\n{Colors.BOLD}Example usage:{Colors.RESET}")
print(" python3 agent-executor.py --next # Execute next task")
print(" python3 agent-executor.py --task T001.001 # Execute specific task")
print(" python3 agent-executor.py --batch 5 # Execute 5 tasks")

if name == "main": main()