
CODITECT Parallel Task Execution Enhancement Plan


Purpose: Native CODITECT enhancements for production-grade parallel agent execution with multi-LLM support.

Date: December 18, 2025
Status: Implementation Ready
Builds On: Existing CODITECT skills and patterns


Executive Summary

This document outlines enhancements to existing CODITECT skills to enable:

  1. Complete multi-LLM provider support (extending multi-provider-llm-fallback)
  2. Task isolation via git worktrees (extending git-workflow-automation)
  3. Container-per-task execution (extending production-patterns)
  4. Dependency conflict resolution (new dependency-resolution skill)

All implementations follow CODITECT coding standards and extend existing production-ready infrastructure.


Current State Assessment

Existing Skill Capabilities

| Skill | Current State | Gap |
| --- | --- | --- |
| multi-provider-llm-fallback | Comprehensive routing, failover, health monitoring | Provider implementations stub-only |
| git-workflow-automation | Branch management, conventional commits | No worktree isolation for parallel tasks |
| production-patterns | Circuit breaker, retry, observability | No container-per-task pattern |
| multi-agent-workflow | FSM orchestration, token budget | No task isolation mechanism |

Enhancement Strategy

Principle: Extend existing skills rather than create parallel implementations.


Enhancement 1: Complete Multi-LLM Provider Support

Skill: multi-provider-llm-fallback
Status: Extend existing implementation (lines 469-483 need completion)

Current Implementation (Verified Working)

# Already exists in SKILL.md:
- LLMProvider abstract base class
- MultiProviderRouter with health monitoring
- ProviderConfig dataclass
- Failover logic with rate limit awareness
- Cost optimization routing
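The cost-optimization routing listed above can be illustrated in miniature. This is a sketch only: the `cheapest_healthy` helper and the per-million-token cost figures are illustrative stand-ins, not the skill's actual API or real pricing.

```python
def cheapest_healthy(providers: list[dict]) -> str:
    """Pick the lowest-cost provider among those passing health checks.

    Sketch of cost-optimized routing: filter by health, then minimize cost.
    """
    healthy = [p for p in providers if p["healthy"]]
    if not healthy:
        raise RuntimeError("no healthy providers")
    return min(healthy, key=lambda p: p["cost_per_mtok"])["name"]

# Illustrative registry; costs are made-up per-million-token prices.
providers = [
    {"name": "anthropic", "healthy": True, "cost_per_mtok": 3.0},
    {"name": "openai", "healthy": True, "cost_per_mtok": 2.5},
    {"name": "local", "healthy": False, "cost_per_mtok": 0.0},
]
```

The real router combines this with failover and rate-limit awareness; the point here is only the filter-then-minimize shape of the decision.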

Enhancement: Complete Provider Implementations

File: skills/multi-provider-llm-fallback/providers.py

"""
CODITECT Multi-Provider LLM Implementations

Extends the multi-provider-llm-fallback skill with complete
provider implementations for Anthropic, OpenAI, Google, and local models.

CODITECT Standard: Production-patterns compliant with circuit breaker integration.
"""

import os
import asyncio
from typing import Dict, Any, Optional, AsyncGenerator
from abc import ABC, abstractmethod
from dataclasses import dataclass
import httpx
from datetime import datetime

# Import CODITECT production patterns
from skills.production_patterns.core.circuit_breaker import CircuitBreaker, CircuitBreakerConfig
from skills.adaptive_retry.core.retry import retry_with_backoff


@dataclass
class ProviderResponse:
"""Standardized response across all providers."""
content: str
model: str
provider: str
usage: Dict[str, int]
latency_ms: float
cached: bool = False


class BaseLLMProvider(ABC):
"""Abstract base for CODITECT LLM providers."""

def __init__(self, circuit_breaker: Optional[CircuitBreaker] = None):
self.circuit_breaker = circuit_breaker or CircuitBreaker(
CircuitBreakerConfig(failure_threshold=3, timeout_seconds=60)
)
self._client: Optional[httpx.AsyncClient] = None

@property
@abstractmethod
def name(self) -> str:
"""Provider name for logging and routing."""
pass

@abstractmethod
async def complete(self, prompt: str, **kwargs) -> ProviderResponse:
"""Generate completion with provider."""
pass

@abstractmethod
async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
"""Stream completion tokens."""
pass

async def health_check(self) -> bool:
"""Verify provider is reachable."""
pass

async def __aenter__(self):
self._client = httpx.AsyncClient(timeout=120.0)
return self

async def __aexit__(self, *args):
if self._client:
await self._client.aclose()


class AnthropicProvider(BaseLLMProvider):
"""Anthropic Claude provider - CODITECT primary."""

def __init__(
self,
api_key: Optional[str] = None,
model: str = "claude-sonnet-4-20250514",
**kwargs
):
super().__init__(**kwargs)
self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY")
self.model = model
self.base_url = "https://api.anthropic.com/v1"

@property
def name(self) -> str:
return "anthropic"

async def complete(self, prompt: str, **kwargs) -> ProviderResponse:
"""Anthropic Messages API completion."""
start_time = datetime.now()

async def _call():
response = await self._client.post(
f"{self.base_url}/messages",
headers={
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json"
},
json={
"model": self.model,
"max_tokens": kwargs.get("max_tokens", 4096),
"messages": [{"role": "user", "content": prompt}]
}
)
response.raise_for_status()
return response.json()

result = await self.circuit_breaker.call(_call)
latency = (datetime.now() - start_time).total_seconds() * 1000

return ProviderResponse(
content=result["content"][0]["text"],
model=self.model,
provider=self.name,
usage={
"input_tokens": result["usage"]["input_tokens"],
"output_tokens": result["usage"]["output_tokens"]
},
latency_ms=latency
)

async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
"""Stream completion tokens."""
async with self._client.stream(
"POST",
f"{self.base_url}/messages",
headers={
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json"
},
json={
"model": self.model,
"max_tokens": kwargs.get("max_tokens", 4096),
"stream": True,
"messages": [{"role": "user", "content": prompt}]
}
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = json.loads(line[6:])
if data["type"] == "content_block_delta":
yield data["delta"]["text"]


class OpenAIProvider(BaseLLMProvider):
"""OpenAI GPT/Codex provider."""

def __init__(
self,
api_key: Optional[str] = None,
model: str = "gpt-4-turbo",
**kwargs
):
super().__init__(**kwargs)
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
self.model = model
self.base_url = "https://api.openai.com/v1"

@property
def name(self) -> str:
return "openai"

async def complete(self, prompt: str, **kwargs) -> ProviderResponse:
"""OpenAI Chat Completions API."""
start_time = datetime.now()

async def _call():
response = await self._client.post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": self.model,
"max_tokens": kwargs.get("max_tokens", 4096),
"messages": [{"role": "user", "content": prompt}]
}
)
response.raise_for_status()
return response.json()

result = await self.circuit_breaker.call(_call)
latency = (datetime.now() - start_time).total_seconds() * 1000

return ProviderResponse(
content=result["choices"][0]["message"]["content"],
model=self.model,
provider=self.name,
usage={
"input_tokens": result["usage"]["prompt_tokens"],
"output_tokens": result["usage"]["completion_tokens"]
},
latency_ms=latency
)

async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
"""Stream completion tokens."""
async with self._client.stream(
"POST",
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": self.model,
"max_tokens": kwargs.get("max_tokens", 4096),
"stream": True,
"messages": [{"role": "user", "content": prompt}]
}
) as response:
async for line in response.aiter_lines():
if line.startswith("data: ") and line != "data: [DONE]":
data = json.loads(line[6:])
if data["choices"][0]["delta"].get("content"):
yield data["choices"][0]["delta"]["content"]


class GoogleProvider(BaseLLMProvider):
"""Google Gemini provider."""

def __init__(
self,
api_key: Optional[str] = None,
model: str = "gemini-pro",
**kwargs
):
super().__init__(**kwargs)
self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
self.model = model
self.base_url = "https://generativelanguage.googleapis.com/v1beta"

@property
def name(self) -> str:
return "google"

async def complete(self, prompt: str, **kwargs) -> ProviderResponse:
"""Google Gemini API completion."""
start_time = datetime.now()

async def _call():
response = await self._client.post(
f"{self.base_url}/models/{self.model}:generateContent",
params={"key": self.api_key},
json={
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {
"maxOutputTokens": kwargs.get("max_tokens", 4096)
}
}
)
response.raise_for_status()
return response.json()

result = await self.circuit_breaker.call(_call)
latency = (datetime.now() - start_time).total_seconds() * 1000

return ProviderResponse(
content=result["candidates"][0]["content"]["parts"][0]["text"],
model=self.model,
provider=self.name,
usage={
"input_tokens": result.get("usageMetadata", {}).get("promptTokenCount", 0),
"output_tokens": result.get("usageMetadata", {}).get("candidatesTokenCount", 0)
},
latency_ms=latency
)

async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
"""Stream completion - Gemini uses SSE."""
async with self._client.stream(
"POST",
f"{self.base_url}/models/{self.model}:streamGenerateContent",
params={"key": self.api_key, "alt": "sse"},
json={"contents": [{"parts": [{"text": prompt}]}]}
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = json.loads(line[6:])
if data.get("candidates"):
text = data["candidates"][0]["content"]["parts"][0].get("text", "")
if text:
yield text


class LocalProvider(BaseLLMProvider):
"""Local LLM provider (Ollama, LM Studio)."""

def __init__(
self,
base_url: str = "http://localhost:11434",
model: str = "codellama",
**kwargs
):
super().__init__(**kwargs)
self.base_url = base_url
self.model = model

@property
def name(self) -> str:
return "local"

async def complete(self, prompt: str, **kwargs) -> ProviderResponse:
"""Ollama-compatible API completion."""
start_time = datetime.now()

async def _call():
response = await self._client.post(
f"{self.base_url}/api/generate",
json={
"model": self.model,
"prompt": prompt,
"stream": False
}
)
response.raise_for_status()
return response.json()

result = await self.circuit_breaker.call(_call)
latency = (datetime.now() - start_time).total_seconds() * 1000

return ProviderResponse(
content=result["response"],
model=self.model,
provider=self.name,
usage={
"input_tokens": result.get("prompt_eval_count", 0),
"output_tokens": result.get("eval_count", 0)
},
latency_ms=latency
)

async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
"""Stream completion tokens."""
async with self._client.stream(
"POST",
f"{self.base_url}/api/generate",
json={"model": self.model, "prompt": prompt, "stream": True}
) as response:
async for line in response.aiter_lines():
if line:
data = json.loads(line)
if data.get("response"):
yield data["response"]
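Before the full router is wired up, the fall-through behavior can be sketched with a plain priority loop. The `_StubProvider` stand-ins below are illustrative, avoiding real API calls; the actual MultiProviderRouter in SKILL.md adds health monitoring and rate-limit awareness on top of this shape.

```python
import asyncio

class _StubProvider:
    """Stand-in for a BaseLLMProvider subclass, used only to show failover order."""
    def __init__(self, name: str, fail: bool = False):
        self.name, self.fail = name, fail
    async def __aenter__(self):
        return self
    async def __aexit__(self, *args):
        pass
    async def complete(self, prompt: str, **kwargs) -> str:
        if self.fail:
            raise RuntimeError(f"{self.name} unavailable")
        return f"{self.name}: ok"

async def complete_with_failover(providers, prompt: str):
    """Try each provider in priority order, falling through on any error."""
    last_error = None
    for provider in providers:
        try:
            async with provider:
                return await provider.complete(prompt)
        except Exception as exc:
            last_error = exc  # remember the failure, try the next provider
    raise RuntimeError("all providers failed") from last_error

# Anthropic down -> the loop falls through to OpenAI.
result = asyncio.run(complete_with_failover(
    [_StubProvider("anthropic", fail=True), _StubProvider("openai")], "hi"))
```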

Enhancement 2: Task Isolation via Git Worktrees

Skill: git-workflow-automation
Enhancement: Add core/task_isolation.py for parallel agent execution

Design

Extends git-workflow-automation to support:

  • Isolated worktree per agent task
  • Branch management for parallel work
  • Safe merge/discard operations
  • Integration with multi-agent-workflow FSM
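Under the hood, the design above amounts to one `git worktree add` per task. The naming plan can be sketched as follows; the `coditect/` branch prefix and `.coditect/worktrees` layout follow this document, while `plan_isolation` itself is an illustrative helper, not part of the skill.

```python
import uuid

def plan_isolation(task_description: str, base_branch: str = "main") -> dict:
    """Derive the branch name, worktree path, and git command for one isolated task."""
    task_id = f"task-{uuid.uuid4().hex[:8]}"
    # Sanitize the description so it is safe inside a branch name
    safe = "".join(
        c if c.isalnum() or c == "-" else "-" for c in task_description.lower()
    )[:30]
    branch = f"coditect/{task_id}/{safe}"
    worktree = f".coditect/worktrees/{task_id}"
    return {
        "task_id": task_id,
        "branch": branch,
        "worktree": worktree,
        # The command the manager would run from the repo root:
        "command": ["git", "worktree", "add", "-b", branch, worktree, base_branch],
    }
```

Because each task gets its own branch and working directory, parallel agents never contend for the index or checked-out files of the main repository.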

File: skills/git-workflow-automation/core/task_isolation.py

"""
CODITECT Task Isolation via Git Worktrees

Extends git-workflow-automation with parallel task execution support.
Each agent task gets an isolated worktree for conflict-free development.

CODITECT Standard: Integrates with multi-agent-workflow FSM.
"""

import subprocess
import shutil
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, Dict, List
from datetime import datetime
from enum import Enum
import uuid
import json


class TaskStatus(Enum):
"""Task lifecycle states aligned with multi-agent-workflow FSM."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
MERGED = "merged"
DISCARDED = "discarded"


@dataclass
class IsolatedTask:
"""Represents an isolated task execution environment."""
task_id: str
branch_name: str
worktree_path: Path
base_branch: str
status: TaskStatus
created_at: datetime
metadata: Dict = field(default_factory=dict)

def to_dict(self) -> Dict:
"""Serialize for checkpoint persistence."""
return {
"task_id": self.task_id,
"branch_name": self.branch_name,
"worktree_path": str(self.worktree_path),
"base_branch": self.base_branch,
"status": self.status.value,
"created_at": self.created_at.isoformat(),
"metadata": self.metadata
}

@classmethod
def from_dict(cls, data: Dict) -> "IsolatedTask":
"""Deserialize from checkpoint."""
return cls(
task_id=data["task_id"],
branch_name=data["branch_name"],
worktree_path=Path(data["worktree_path"]),
base_branch=data["base_branch"],
status=TaskStatus(data["status"]),
created_at=datetime.fromisoformat(data["created_at"]),
metadata=data.get("metadata", {})
)


class TaskIsolationManager:
"""
Manages isolated git worktrees for parallel agent task execution.

Integrates with:
- git-workflow-automation: Uses conventional commits
- multi-agent-workflow: Reports task status to FSM
- production-patterns: Includes error recovery

CODITECT Standard: Follows checkpoint/resume pattern.
"""

def __init__(self, repo_root: Path):
self.repo_root = repo_root
self.worktrees_dir = repo_root / ".coditect" / "worktrees"
self.worktrees_dir.mkdir(parents=True, exist_ok=True)
self.tasks: Dict[str, IsolatedTask] = {}
self._load_state()

def _load_state(self):
"""Load persisted task state from checkpoint."""
state_file = self.worktrees_dir / "task_state.json"
if state_file.exists():
data = json.loads(state_file.read_text())
for task_data in data.get("tasks", []):
task = IsolatedTask.from_dict(task_data)
if task.worktree_path.exists():
self.tasks[task.task_id] = task

def _save_state(self):
"""Persist task state for checkpoint/resume."""
state_file = self.worktrees_dir / "task_state.json"
data = {
"tasks": [t.to_dict() for t in self.tasks.values()],
"updated_at": datetime.now().isoformat()
}
state_file.write_text(json.dumps(data, indent=2))

def create_task_environment(
self,
task_description: str,
base_branch: str = "main",
metadata: Optional[Dict] = None
) -> IsolatedTask:
"""
Create isolated worktree for a task.

Args:
task_description: Human-readable task name
base_branch: Branch to fork from
metadata: Additional task context for FSM

Returns:
IsolatedTask with worktree ready for use
"""
task_id = f"task-{uuid.uuid4().hex[:8]}"
# Sanitize description for branch name
safe_name = "".join(c if c.isalnum() or c == "-" else "-" for c in task_description.lower())[:30]
branch_name = f"coditect/{task_id}/{safe_name}"
worktree_path = self.worktrees_dir / task_id

# Create worktree with new branch
subprocess.run([
"git", "worktree", "add",
"-b", branch_name,
str(worktree_path),
base_branch
], cwd=self.repo_root, check=True, capture_output=True)

task = IsolatedTask(
task_id=task_id,
branch_name=branch_name,
worktree_path=worktree_path,
base_branch=base_branch,
status=TaskStatus.PENDING,
created_at=datetime.now(),
metadata=metadata or {"description": task_description}
)

self.tasks[task_id] = task
self._save_state()
return task

def get_task(self, task_id: str) -> Optional[IsolatedTask]:
"""Get task by ID."""
return self.tasks.get(task_id)

def list_active_tasks(self) -> List[IsolatedTask]:
"""List all active (non-terminal) tasks."""
return [
t for t in self.tasks.values()
if t.status in (TaskStatus.PENDING, TaskStatus.RUNNING)
]

def update_task_status(self, task_id: str, status: TaskStatus):
"""Update task status and persist."""
if task_id in self.tasks:
self.tasks[task_id].status = status
self._save_state()

def commit_task_changes(
self,
task_id: str,
message: str,
commit_type: str = "feat"
) -> str:
"""
Commit changes in task worktree using conventional commits.

Follows git-workflow-automation conventions.
"""
task = self.tasks[task_id]

# Stage all changes
subprocess.run(
["git", "add", "-A"],
cwd=task.worktree_path,
check=True
)

# Check if there are changes to commit
result = subprocess.run(
["git", "status", "--porcelain"],
cwd=task.worktree_path,
capture_output=True,
text=True
)

if not result.stdout.strip():
return "" # No changes

# Conventional commit format from git-workflow-automation
full_message = f"""{commit_type}: {message}

Task: {task_id}
Description: {task.metadata.get('description', 'N/A')}

Co-Authored-By: CODITECT <noreply@coditect.ai>
"""

subprocess.run(
["git", "commit", "-m", full_message],
cwd=task.worktree_path,
check=True
)

# Get commit hash
result = subprocess.run(
["git", "rev-parse", "HEAD"],
cwd=task.worktree_path,
capture_output=True,
text=True
)
return result.stdout.strip()

def merge_task(
self,
task_id: str,
target_branch: str = "main",
squash: bool = False
) -> bool:
"""
Merge task worktree changes back to target branch.

Args:
task_id: Task to merge
target_branch: Branch to merge into
squash: Whether to squash commits

Returns:
True if merge successful
"""
task = self.tasks[task_id]

# Checkout target in main repo
subprocess.run(
["git", "checkout", target_branch],
cwd=self.repo_root,
check=True
)

# Merge task branch
merge_args = ["git", "merge"]
if squash:
merge_args.append("--squash")
else:
merge_args.append("--no-ff")
merge_args.extend([task.branch_name, "-m", f"Merge {task_id}: {task.metadata.get('description', '')}"])

result = subprocess.run(
merge_args,
cwd=self.repo_root,
capture_output=True
)

if result.returncode == 0:
task.status = TaskStatus.MERGED
self._save_state()
return True
return False

def discard_task(self, task_id: str, delete_branch: bool = True):
"""
Discard task worktree and optionally delete branch.

Use when task failed or was cancelled.
"""
task = self.tasks[task_id]

# Remove worktree
subprocess.run(
["git", "worktree", "remove", str(task.worktree_path), "--force"],
cwd=self.repo_root,
capture_output=True
)

# Delete branch if requested
if delete_branch:
subprocess.run(
["git", "branch", "-D", task.branch_name],
cwd=self.repo_root,
capture_output=True
)

task.status = TaskStatus.DISCARDED
self._save_state()

def cleanup_completed(self):
"""Clean up all merged/discarded task worktrees."""
for task_id, task in list(self.tasks.items()):
if task.status in (TaskStatus.MERGED, TaskStatus.DISCARDED):
if task.worktree_path.exists():
self.discard_task(task_id, delete_branch=False)
del self.tasks[task_id]
self._save_state()

Enhancement 3: Container-per-Task Execution

Skill: production-patterns
Enhancement: Add core/task_container.py for isolated container execution

Design

Extends production-patterns with:

  • Docker container per task
  • Resource limits and security
  • Integration with task isolation worktrees
  • Lifecycle management
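Docker expresses CPU caps as a quota against a scheduling period, so the limits above translate into SDK keyword arguments roughly as follows. This is a sketch; `resource_kwargs` is an illustrative helper, not part of the skill.

```python
def resource_kwargs(memory_limit: str = "4g", cpu_limit: float = 2.0) -> dict:
    """Translate the plan's resource limits into Docker SDK keyword arguments.

    A limit of 2.0 CPUs becomes cpu_quota = 2.0 * cpu_period, i.e. the
    container may use 200ms of CPU time per 100ms scheduling period.
    """
    cpu_period = 100_000  # microseconds; Docker's default CFS period
    return {
        "mem_limit": memory_limit,
        "cpu_period": cpu_period,
        "cpu_quota": int(cpu_limit * cpu_period),
        "user": "1000:1000",         # non-root for security
        "network_mode": "bridge",
    }
```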

File: skills/production-patterns/core/task_container.py

"""
CODITECT Container-per-Task Execution

Extends production-patterns with isolated Docker containers for agent tasks.
Each container gets its own environment preventing state leakage.

CODITECT Standard: Integrates with circuit breaker and observability.
"""

import docker
from docker.errors import DockerException
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional, List, Tuple
from datetime import datetime
import logging
import json

from skills.production_patterns.core.circuit_breaker import CircuitBreaker, CircuitBreakerConfig
from skills.production_patterns.core.observability import MetricsCollector, StructuredLogger


logger = StructuredLogger("task-container")
metrics = MetricsCollector()


@dataclass
class ContainerConfig:
"""Configuration for task containers."""
base_image: str = "coditect/agent-runtime:latest"
memory_limit: str = "4g"
cpu_limit: float = 2.0
working_dir: str = "/workspace"
user: str = "1000:1000" # Non-root for security
network_mode: str = "bridge"
environment: Dict[str, str] = None

def __post_init__(self):
if self.environment is None:
self.environment = {}


@dataclass
class TaskContainer:
"""Represents a running task container."""
task_id: str
container_id: str
status: str
worktree_path: Path
created_at: datetime
config: ContainerConfig

def to_dict(self) -> Dict:
return {
"task_id": self.task_id,
"container_id": self.container_id,
"status": self.status,
"worktree_path": str(self.worktree_path),
"created_at": self.created_at.isoformat()
}


class TaskContainerManager:
"""
Manages Docker containers for isolated task execution.

Integrates with:
- TaskIsolationManager: Mounts worktree into container
- production-patterns: Circuit breaker for Docker API
- Observability: Metrics and structured logging

CODITECT Standard: Production-ready with health monitoring.
"""

def __init__(self, default_config: Optional[ContainerConfig] = None):
self.client = docker.from_env()
self.default_config = default_config or ContainerConfig()
self.containers: Dict[str, TaskContainer] = {}
self.circuit_breaker = CircuitBreaker(
CircuitBreakerConfig(failure_threshold=3, timeout_seconds=30)
)

def create_container(
self,
task_id: str,
worktree_path: Path,
config: Optional[ContainerConfig] = None,
additional_mounts: Optional[Dict[str, str]] = None
) -> TaskContainer:
"""
Create isolated container for a task.

Args:
task_id: Unique task identifier
worktree_path: Git worktree to mount as workspace
config: Container configuration
additional_mounts: Extra volume mounts {host_path: container_path}

Returns:
TaskContainer with running container
"""
cfg = config or self.default_config

volumes = {
str(worktree_path.absolute()): {
"bind": cfg.working_dir,
"mode": "rw"
}
}

# Add any additional mounts
if additional_mounts:
for host_path, container_path in additional_mounts.items():
volumes[host_path] = {"bind": container_path, "mode": "rw"}

# Merge environment
environment = {
"CODITECT_TASK_ID": task_id,
"CODITECT_WORKSPACE": cfg.working_dir,
**cfg.environment
}

def _create():
container = self.client.containers.run(
cfg.base_image,
detach=True,
name=f"coditect-task-{task_id}",
volumes=volumes,
environment=environment,
working_dir=cfg.working_dir,
user=cfg.user,
mem_limit=cfg.memory_limit,
cpu_period=100000,
cpu_quota=int(cfg.cpu_limit * 100000),
network_mode=cfg.network_mode,
# Keep container running
command="tail -f /dev/null",
# Labels for management
labels={
"coditect.task_id": task_id,
"coditect.component": "task-container"
}
)
return container

try:
container = self.circuit_breaker.call(_create)
except Exception as e:
logger.error(f"Failed to create container for {task_id}", error=e)
metrics.increment("container_creation_failed", tags={"task_id": task_id})
raise

task_container = TaskContainer(
task_id=task_id,
container_id=container.id,
status="running",
worktree_path=worktree_path,
created_at=datetime.now(),
config=cfg
)

self.containers[task_id] = task_container
metrics.increment("containers_created", tags={"task_id": task_id})
logger.info(f"Created container for task {task_id}", container_id=container.id[:12])

return task_container

def execute_command(
self,
task_id: str,
command: str,
timeout: int = 300
) -> Tuple[int, str, str]:
"""
Execute command in task container.

Args:
task_id: Task container to execute in
command: Shell command to run
timeout: Execution timeout in seconds

Returns:
Tuple of (exit_code, stdout, stderr)
"""
task_container = self.containers.get(task_id)
if not task_container:
raise ValueError(f"No container for task {task_id}")

container = self.client.containers.get(task_container.container_id)

logger.info(f"Executing command in {task_id}", command=command[:100])

exec_result = container.exec_run(
cmd=["sh", "-c", command],
workdir=task_container.config.working_dir,
demux=True # Separate stdout/stderr
)

exit_code = exec_result.exit_code
stdout = exec_result.output[0].decode() if exec_result.output[0] else ""
stderr = exec_result.output[1].decode() if exec_result.output[1] else ""

metrics.histogram(
"container_command_duration",
len(stdout) + len(stderr), # Proxy for work done
tags={"task_id": task_id, "success": str(exit_code == 0)}
)

return exit_code, stdout, stderr

def get_container_status(self, task_id: str) -> Dict:
"""Get container status and resource usage."""
task_container = self.containers.get(task_id)
if not task_container:
return {"status": "not_found"}

try:
container = self.client.containers.get(task_container.container_id)
stats = container.stats(stream=False)

return {
"status": container.status,
"memory_usage": stats["memory_stats"].get("usage", 0),
"memory_limit": stats["memory_stats"].get("limit", 0),
"cpu_percent": self._calculate_cpu_percent(stats),
"created_at": task_container.created_at.isoformat()
}
except DockerException:
return {"status": "error"}

def _calculate_cpu_percent(self, stats: Dict) -> float:
"""Calculate CPU usage percentage from stats."""
cpu_delta = (
stats["cpu_stats"]["cpu_usage"]["total_usage"] -
stats["precpu_stats"]["cpu_usage"]["total_usage"]
)
system_delta = (
stats["cpu_stats"]["system_cpu_usage"] -
stats["precpu_stats"]["system_cpu_usage"]
)
if system_delta > 0:
return (cpu_delta / system_delta) * 100.0
return 0.0

def stop_container(self, task_id: str, timeout: int = 10):
"""Stop task container gracefully."""
task_container = self.containers.get(task_id)
if not task_container:
return

try:
container = self.client.containers.get(task_container.container_id)
container.stop(timeout=timeout)
task_container.status = "stopped"
logger.info(f"Stopped container for task {task_id}")
except DockerException as e:
logger.warning(f"Error stopping container {task_id}", error=e)

def remove_container(self, task_id: str, force: bool = False):
"""Remove task container."""
task_container = self.containers.get(task_id)
if not task_container:
return

try:
container = self.client.containers.get(task_container.container_id)
container.remove(force=force)
del self.containers[task_id]
metrics.increment("containers_removed", tags={"task_id": task_id})
logger.info(f"Removed container for task {task_id}")
except DockerException as e:
logger.warning(f"Error removing container {task_id}", error=e)

def cleanup_all(self):
"""Stop and remove all task containers."""
for task_id in list(self.containers.keys()):
self.stop_container(task_id)
self.remove_container(task_id, force=True)

def list_containers(self) -> List[TaskContainer]:
"""List all managed containers."""
return list(self.containers.values())

Enhancement 4: Dependency Resolution Skill (New)

Skill: dependency-resolution (new)
Purpose: Automated dependency conflict detection and resolution

Design

New skill providing:

  • Queue-based dependency management
  • Conflict detection across package managers
  • Resolution strategies
  • Integration with /new-project command
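As a taste of what the skill covers, the queue-level conflict check can be sketched in miniature. This is illustrative only; the real specification lives in the separate implementation document, and `find_version_conflicts` is a hypothetical helper.

```python
def find_version_conflicts(
    requirement_sets: dict[str, dict[str, str]]
) -> dict[str, dict[str, str]]:
    """Report packages pinned to different versions by different tasks.

    `requirement_sets` maps task name -> {package: pinned version}.
    Returns only the packages where tasks disagree on the version.
    """
    seen: dict[str, dict[str, str]] = {}
    for task, pins in requirement_sets.items():
        for pkg, version in pins.items():
            seen.setdefault(pkg, {})[task] = version
    return {
        pkg: by_task
        for pkg, by_task in seen.items()
        if len(set(by_task.values())) > 1  # more than one distinct pin = conflict
    }
```

A resolution strategy would then pick a winning pin per conflicting package (e.g. highest compatible version) before tasks are queued.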

File: skills/dependency-resolution/SKILL.md

See separate implementation document for complete skill specification following CODITECT SKILL.md template.


Integration: Parallel Agent Orchestrator

Combines all enhancements into a unified orchestration layer.

File: skills/multi-agent-workflow/core/parallel_orchestrator.py

"""
CODITECT Parallel Agent Orchestrator

Unifies task isolation, container execution, and multi-LLM providers
for production-grade parallel agent execution.

Integrates:
- TaskIsolationManager (git worktrees)
- TaskContainerManager (Docker containers)
- MultiProviderRouter (LLM fallback)
- FSM workflow from multi-agent-workflow

CODITECT Standard: Production-ready with checkpoint/resume.
"""

import asyncio
from pathlib import Path
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import json

from skills.git_workflow_automation.core.task_isolation import (
TaskIsolationManager, TaskStatus
)
from skills.production_patterns.core.task_container import (
TaskContainerManager, ContainerConfig
)
from skills.multi_provider_llm_fallback.providers import (
AnthropicProvider, OpenAIProvider, GoogleProvider
)


@dataclass
class AgentTask:
"""Definition of an agent task for parallel execution."""
description: str
prompt: str
dependencies: List[str] = None
preferred_provider: str = None
timeout: int = 300


@dataclass
class TaskResult:
"""Result of an agent task execution."""
task_id: str
description: str
status: str # success, failed, timeout
output: str
provider_used: str
execution_time: float
commit_hash: Optional[str] = None


class ParallelAgentOrchestrator:
"""
Orchestrates parallel agent task execution with full isolation.

Workflow:
1. Create isolated worktree for each task
2. Spawn container in worktree
3. Execute agent with LLM provider (with fallback)
4. Commit changes in worktree
5. Merge successful tasks
6. Clean up

CODITECT Standard: Checkpoint/resume via FSM state.
"""

def __init__(self, repo_root: Path):
self.repo_root = repo_root
self.isolation_manager = TaskIsolationManager(repo_root)
self.container_manager = TaskContainerManager()

# Initialize providers with failover
self.providers = [
AnthropicProvider(),
OpenAIProvider(),
GoogleProvider()
]

async def execute_parallel_tasks(
self,
tasks: List[AgentTask],
base_branch: str = "main",
auto_merge: bool = False
) -> List[TaskResult]:
"""
Execute multiple agent tasks in parallel with full isolation.

Args:
tasks: List of tasks to execute
base_branch: Git branch to fork from
auto_merge: Whether to automatically merge successful tasks

Returns:
List of TaskResult for each task
"""
# Phase 1: Create isolated environments
task_contexts = []
for task in tasks:
isolated_task = self.isolation_manager.create_task_environment(
task_description=task.description,
base_branch=base_branch,
metadata={"prompt": task.prompt}
)

container = self.container_manager.create_container(
task_id=isolated_task.task_id,
worktree_path=isolated_task.worktree_path
)

task_contexts.append({
"task": task,
"isolated_task": isolated_task,
"container": container
})

# Phase 2: Execute tasks in parallel
async def execute_single_task(ctx: Dict) -> TaskResult:
task = ctx["task"]
isolated_task = ctx["isolated_task"]
start_time = datetime.now()

self.isolation_manager.update_task_status(
isolated_task.task_id, TaskStatus.RUNNING
)

try:
# Select provider (preferred or first available)
provider = self._select_provider(task.preferred_provider)

async with provider:
response = await provider.complete(task.prompt)

# Apply changes in container
exit_code, stdout, stderr = self.container_manager.execute_command(
isolated_task.task_id,
f"echo '{response.content}' > agent_output.txt"
)

# Commit changes
commit_hash = self.isolation_manager.commit_task_changes(
isolated_task.task_id,
message=task.description,
commit_type="feat"
)

self.isolation_manager.update_task_status(
isolated_task.task_id, TaskStatus.COMPLETED
)

return TaskResult(
task_id=isolated_task.task_id,
description=task.description,
status="success",
output=response.content,
provider_used=provider.name,
execution_time=(datetime.now() - start_time).total_seconds(),
commit_hash=commit_hash
)

except Exception as e:
self.isolation_manager.update_task_status(
isolated_task.task_id, TaskStatus.FAILED
)
return TaskResult(
task_id=isolated_task.task_id,
description=task.description,
status="failed",
output=str(e),
provider_used="none",
execution_time=(datetime.now() - start_time).total_seconds()
)

results = await asyncio.gather(
*[execute_single_task(ctx) for ctx in task_contexts]
)

# Phase 3: Merge and cleanup
for ctx, result in zip(task_contexts, results):
if auto_merge and result.status == "success":
self.isolation_manager.merge_task(
ctx["isolated_task"].task_id,
target_branch=base_branch
)

# Clean up container
self.container_manager.remove_container(
ctx["isolated_task"].task_id, force=True
)

return list(results)

def _select_provider(self, preferred: Optional[str] = None):
"""Select provider by name or return first available."""
if preferred:
for provider in self.providers:
if provider.name == preferred:
return provider
return self.providers[0]

def get_task_status(self, task_id: str) -> Dict:
"""Get comprehensive status of a task."""
isolated_task = self.isolation_manager.get_task(task_id)
container_status = self.container_manager.get_container_status(task_id)

return {
"task_id": task_id,
"git_status": isolated_task.status.value if isolated_task else "not_found",
"container": container_status,
"worktree": str(isolated_task.worktree_path) if isolated_task else None
}

def cleanup_all(self):
"""Clean up all tasks and containers."""
self.container_manager.cleanup_all()
self.isolation_manager.cleanup_completed()
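Note that `AgentTask` declares a `dependencies` field that the orchestration flow does not yet consume. One way to honor it is wave-based scheduling: group tasks so each wave only depends on earlier waves, then gather one wave at a time. `schedule_waves` below is an illustrative helper, not part of the skill.

```python
def schedule_waves(tasks: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into execution waves respecting dependencies.

    `tasks` maps a task name to the names it depends on. Each returned
    wave contains tasks whose dependencies all lie in earlier waves, so a
    wave could be handed to asyncio.gather. A cycle raises rather than
    deadlocking the queue.
    """
    done: set[str] = set()
    waves: list[list[str]] = []
    remaining = dict(tasks)
    while remaining:
        ready = sorted(t for t, deps in remaining.items() if set(deps) <= done)
        if not ready:
            raise ValueError(f"dependency cycle among: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for t in ready:
            del remaining[t]
    return waves
```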

Implementation Roadmap

Phase 1: Provider Implementations (Day 1)

  • Create skills/multi-provider-llm-fallback/providers.py
  • Implement AnthropicProvider with streaming
  • Implement OpenAIProvider with streaming
  • Implement GoogleProvider with streaming
  • Unit tests for each provider

Phase 2: Task Isolation (Day 1-2)

  • Create skills/git-workflow-automation/core/task_isolation.py
  • Implement TaskIsolationManager
  • Integration with git-workflow-automation
  • Checkpoint/resume testing

Phase 3: Container Execution (Day 2)

  • Create skills/production-patterns/core/task_container.py
  • Implement TaskContainerManager
  • Integration with observability
  • Resource limit testing

Phase 4: Integration (Day 3)

  • Create skills/multi-agent-workflow/core/parallel_orchestrator.py
  • Integration tests across all components
  • Documentation updates
  • Update SKILL.md files

Testing Requirements

All implementations must pass:

  1. Unit tests (pytest)
  2. Integration tests with Docker
  3. Multi-provider failover tests
  4. Checkpoint/resume tests
  5. Concurrent execution tests

Document Control

| Version | Date | Author | Changes |
| --- | --- | --- | --- |
| 1.0.0 | 2025-12-18 | CODITECT Engineering | Initial enhancement plan |

CODITECT Standard Compliance:

  • Follows SKILL.md template format
  • Integrates with existing production-patterns
  • Uses conventional commits via git-workflow-automation
  • Implements checkpoint/resume per multi-agent-workflow
  • Includes structured logging and metrics via observability hooks