CODITECT Parallel Task Execution Enhancement Plan
Purpose: Native CODITECT enhancements for production-grade parallel agent execution with multi-LLM support.
Date: December 18, 2025
Status: Implementation Ready
Builds On: Existing CODITECT skills and patterns
Executive Summary
This document outlines enhancements to existing CODITECT skills to enable:
- Complete multi-LLM provider support (extending `multi-provider-llm-fallback`)
- Task isolation via git worktrees (extending `git-workflow-automation`)
- Container-per-task execution (extending `production-patterns`)
- Dependency conflict resolution (new `dependency-resolution` skill)
All implementations follow CODITECT coding standards and extend existing production-ready infrastructure.
Current State Assessment
Existing Skill Capabilities
| Skill | Current State | Gap |
|---|---|---|
| `multi-provider-llm-fallback` | Comprehensive routing, failover, health monitoring | Provider implementations stub-only |
| `git-workflow-automation` | Branch management, conventional commits | No worktree isolation for parallel tasks |
| `production-patterns` | Circuit breaker, retry, observability | No container-per-task pattern |
| `multi-agent-workflow` | FSM orchestration, token budget | No task isolation mechanism |
Enhancement Strategy
Principle: Extend existing skills rather than create parallel implementations.
Enhancement 1: Complete Multi-LLM Provider Support
Skill: multi-provider-llm-fallback
Status: Extend existing implementation (lines 469-483 need completion)
Current Implementation (Verified Working)
# Already exists in SKILL.md:
- LLMProvider abstract base class
- MultiProviderRouter with health monitoring
- ProviderConfig dataclass
- Failover logic with rate limit awareness
- Cost optimization routing
Enhancement: Complete Provider Implementations
File: skills/multi-provider-llm-fallback/providers.py
"""
CODITECT Multi-Provider LLM Implementations
Extends the multi-provider-llm-fallback skill with complete
provider implementations for Anthropic, OpenAI, Google, and local models.
CODITECT Standard: Production-patterns compliant with circuit breaker integration.
"""
import os
import json  # needed for SSE stream parsing in the stream() methods below
import asyncio
from typing import Dict, Any, Optional, AsyncGenerator
from abc import ABC, abstractmethod
from dataclasses import dataclass
import httpx
from datetime import datetime
# Import CODITECT production patterns
from skills.production_patterns.core.circuit_breaker import CircuitBreaker, CircuitBreakerConfig
from skills.adaptive_retry.core.retry import retry_with_backoff
@dataclass
class ProviderResponse:
"""Standardized response across all providers."""
content: str
model: str
provider: str
usage: Dict[str, int]
latency_ms: float
cached: bool = False
class BaseLLMProvider(ABC):
"""Abstract base for CODITECT LLM providers."""
def __init__(self, circuit_breaker: Optional[CircuitBreaker] = None):
self.circuit_breaker = circuit_breaker or CircuitBreaker(
CircuitBreakerConfig(failure_threshold=3, timeout_seconds=60)
)
self._client: Optional[httpx.AsyncClient] = None
@property
@abstractmethod
def name(self) -> str:
"""Provider name for logging and routing."""
pass
@abstractmethod
async def complete(self, prompt: str, **kwargs) -> ProviderResponse:
"""Generate completion with provider."""
pass
@abstractmethod
async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
"""Stream completion tokens."""
pass
async def health_check(self) -> bool:
    """Verify provider is reachable. Optimistic default; subclasses should override with a real probe."""
    return True
async def __aenter__(self):
self._client = httpx.AsyncClient(timeout=120.0)
return self
async def __aexit__(self, *args):
if self._client:
await self._client.aclose()
class AnthropicProvider(BaseLLMProvider):
"""Anthropic Claude provider - CODITECT primary."""
def __init__(
self,
api_key: Optional[str] = None,
model: str = "claude-sonnet-4-20250514",
**kwargs
):
super().__init__(**kwargs)
self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY")
self.model = model
self.base_url = "https://api.anthropic.com/v1"
@property
def name(self) -> str:
return "anthropic"
async def complete(self, prompt: str, **kwargs) -> ProviderResponse:
"""Anthropic Messages API completion."""
start_time = datetime.now()
async def _call():
response = await self._client.post(
f"{self.base_url}/messages",
headers={
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json"
},
json={
"model": self.model,
"max_tokens": kwargs.get("max_tokens", 4096),
"messages": [{"role": "user", "content": prompt}]
}
)
response.raise_for_status()
return response.json()
result = await self.circuit_breaker.call(_call)
latency = (datetime.now() - start_time).total_seconds() * 1000
return ProviderResponse(
content=result["content"][0]["text"],
model=self.model,
provider=self.name,
usage={
"input_tokens": result["usage"]["input_tokens"],
"output_tokens": result["usage"]["output_tokens"]
},
latency_ms=latency
)
async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
"""Stream completion tokens."""
async with self._client.stream(
"POST",
f"{self.base_url}/messages",
headers={
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json"
},
json={
"model": self.model,
"max_tokens": kwargs.get("max_tokens", 4096),
"stream": True,
"messages": [{"role": "user", "content": prompt}]
}
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = json.loads(line[6:])
if data["type"] == "content_block_delta":
yield data["delta"]["text"]
class OpenAIProvider(BaseLLMProvider):
"""OpenAI GPT/Codex provider."""
def __init__(
self,
api_key: Optional[str] = None,
model: str = "gpt-4-turbo",
**kwargs
):
super().__init__(**kwargs)
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
self.model = model
self.base_url = "https://api.openai.com/v1"
@property
def name(self) -> str:
return "openai"
async def complete(self, prompt: str, **kwargs) -> ProviderResponse:
"""OpenAI Chat Completions API."""
start_time = datetime.now()
async def _call():
response = await self._client.post(
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": self.model,
"max_tokens": kwargs.get("max_tokens", 4096),
"messages": [{"role": "user", "content": prompt}]
}
)
response.raise_for_status()
return response.json()
result = await self.circuit_breaker.call(_call)
latency = (datetime.now() - start_time).total_seconds() * 1000
return ProviderResponse(
content=result["choices"][0]["message"]["content"],
model=self.model,
provider=self.name,
usage={
"input_tokens": result["usage"]["prompt_tokens"],
"output_tokens": result["usage"]["completion_tokens"]
},
latency_ms=latency
)
async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
"""Stream completion tokens."""
async with self._client.stream(
"POST",
f"{self.base_url}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": self.model,
"max_tokens": kwargs.get("max_tokens", 4096),
"stream": True,
"messages": [{"role": "user", "content": prompt}]
}
) as response:
async for line in response.aiter_lines():
if line.startswith("data: ") and line != "data: [DONE]":
data = json.loads(line[6:])
if data["choices"][0]["delta"].get("content"):
yield data["choices"][0]["delta"]["content"]
class GoogleProvider(BaseLLMProvider):
"""Google Gemini provider."""
def __init__(
self,
api_key: Optional[str] = None,
model: str = "gemini-pro",
**kwargs
):
super().__init__(**kwargs)
self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
self.model = model
self.base_url = "https://generativelanguage.googleapis.com/v1beta"
@property
def name(self) -> str:
return "google"
async def complete(self, prompt: str, **kwargs) -> ProviderResponse:
"""Google Gemini API completion."""
start_time = datetime.now()
async def _call():
response = await self._client.post(
f"{self.base_url}/models/{self.model}:generateContent",
params={"key": self.api_key},
json={
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {
"maxOutputTokens": kwargs.get("max_tokens", 4096)
}
}
)
response.raise_for_status()
return response.json()
result = await self.circuit_breaker.call(_call)
latency = (datetime.now() - start_time).total_seconds() * 1000
return ProviderResponse(
content=result["candidates"][0]["content"]["parts"][0]["text"],
model=self.model,
provider=self.name,
usage={
"input_tokens": result.get("usageMetadata", {}).get("promptTokenCount", 0),
"output_tokens": result.get("usageMetadata", {}).get("candidatesTokenCount", 0)
},
latency_ms=latency
)
async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
"""Stream completion - Gemini uses SSE."""
async with self._client.stream(
"POST",
f"{self.base_url}/models/{self.model}:streamGenerateContent",
params={"key": self.api_key, "alt": "sse"},
json={"contents": [{"parts": [{"text": prompt}]}]}
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = json.loads(line[6:])
if data.get("candidates"):
text = data["candidates"][0]["content"]["parts"][0].get("text", "")
if text:
yield text
class LocalProvider(BaseLLMProvider):
"""Local LLM provider (Ollama, LM Studio)."""
def __init__(
self,
base_url: str = "http://localhost:11434",
model: str = "codellama",
**kwargs
):
super().__init__(**kwargs)
self.base_url = base_url
self.model = model
@property
def name(self) -> str:
return "local"
async def complete(self, prompt: str, **kwargs) -> ProviderResponse:
"""Ollama-compatible API completion."""
start_time = datetime.now()
async def _call():
response = await self._client.post(
f"{self.base_url}/api/generate",
json={
"model": self.model,
"prompt": prompt,
"stream": False
}
)
response.raise_for_status()
return response.json()
result = await self.circuit_breaker.call(_call)
latency = (datetime.now() - start_time).total_seconds() * 1000
return ProviderResponse(
content=result["response"],
model=self.model,
provider=self.name,
usage={
"input_tokens": result.get("prompt_eval_count", 0),
"output_tokens": result.get("eval_count", 0)
},
latency_ms=latency
)
async def stream(self, prompt: str, **kwargs) -> AsyncGenerator[str, None]:
"""Stream completion tokens."""
async with self._client.stream(
"POST",
f"{self.base_url}/api/generate",
json={"model": self.model, "prompt": prompt, "stream": True}
) as response:
async for line in response.aiter_lines():
if line:
data = json.loads(line)
if data.get("response"):
yield data["response"]
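All three hosted providers parse server-sent events the same way. The `data: `-prefix handling used by the stream methods above can be exercised in isolation; this sketch replays canned Anthropic-style event lines (the sample payloads are illustrative, not captured API output):

```python
import json
from typing import Iterable, Iterator

def iter_sse_text_deltas(lines: Iterable[str]) -> Iterator[str]:
    """Yield text deltas from Anthropic-style SSE lines.

    Mirrors the parsing in AnthropicProvider.stream: only
    content_block_delta events carry user-visible text.
    """
    for line in lines:
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":  # OpenAI-style stream terminator
            continue
        event = json.loads(payload)
        if event.get("type") == "content_block_delta":
            yield event["delta"]["text"]

sample = [
    'data: {"type": "message_start"}',
    'data: {"type": "content_block_delta", "delta": {"text": "Hello"}}',
    'data: {"type": "content_block_delta", "delta": {"text": " world"}}',
    "data: [DONE]",
]
chunks = list(iter_sse_text_deltas(sample))  # ["Hello", " world"]
```

Isolating the parser this way also makes it trivial to unit-test the streaming path without network access.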
Enhancement 2: Task Isolation via Git Worktrees
Skill: git-workflow-automation
Enhancement: Add core/task_isolation.py for parallel agent execution
Design
Extends git-workflow-automation to support:
- Isolated worktree per agent task
- Branch management for parallel work
- Safe merge/discard operations
- Integration with multi-agent-workflow FSM
File: skills/git-workflow-automation/core/task_isolation.py
"""
CODITECT Task Isolation via Git Worktrees
Extends git-workflow-automation with parallel task execution support.
Each agent task gets an isolated worktree for conflict-free development.
CODITECT Standard: Integrates with multi-agent-workflow FSM.
"""
import subprocess
import shutil
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, Dict, List
from datetime import datetime
from enum import Enum
import uuid
import json
class TaskStatus(Enum):
"""Task lifecycle states aligned with multi-agent-workflow FSM."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
MERGED = "merged"
DISCARDED = "discarded"
@dataclass
class IsolatedTask:
"""Represents an isolated task execution environment."""
task_id: str
branch_name: str
worktree_path: Path
base_branch: str
status: TaskStatus
created_at: datetime
metadata: Dict = field(default_factory=dict)
def to_dict(self) -> Dict:
"""Serialize for checkpoint persistence."""
return {
"task_id": self.task_id,
"branch_name": self.branch_name,
"worktree_path": str(self.worktree_path),
"base_branch": self.base_branch,
"status": self.status.value,
"created_at": self.created_at.isoformat(),
"metadata": self.metadata
}
@classmethod
def from_dict(cls, data: Dict) -> "IsolatedTask":
"""Deserialize from checkpoint."""
return cls(
task_id=data["task_id"],
branch_name=data["branch_name"],
worktree_path=Path(data["worktree_path"]),
base_branch=data["base_branch"],
status=TaskStatus(data["status"]),
created_at=datetime.fromisoformat(data["created_at"]),
metadata=data.get("metadata", {})
)
class TaskIsolationManager:
"""
Manages isolated git worktrees for parallel agent task execution.
Integrates with:
- git-workflow-automation: Uses conventional commits
- multi-agent-workflow: Reports task status to FSM
- production-patterns: Includes error recovery
CODITECT Standard: Follows checkpoint/resume pattern.
"""
def __init__(self, repo_root: Path):
self.repo_root = repo_root
self.worktrees_dir = repo_root / ".coditect" / "worktrees"
self.worktrees_dir.mkdir(parents=True, exist_ok=True)
self.tasks: Dict[str, IsolatedTask] = {}
self._load_state()
def _load_state(self):
"""Load persisted task state from checkpoint."""
state_file = self.worktrees_dir / "task_state.json"
if state_file.exists():
data = json.loads(state_file.read_text())
for task_data in data.get("tasks", []):
task = IsolatedTask.from_dict(task_data)
if task.worktree_path.exists():
self.tasks[task.task_id] = task
def _save_state(self):
"""Persist task state for checkpoint/resume."""
state_file = self.worktrees_dir / "task_state.json"
data = {
"tasks": [t.to_dict() for t in self.tasks.values()],
"updated_at": datetime.now().isoformat()
}
state_file.write_text(json.dumps(data, indent=2))
def create_task_environment(
self,
task_description: str,
base_branch: str = "main",
metadata: Optional[Dict] = None
) -> IsolatedTask:
"""
Create isolated worktree for a task.
Args:
task_description: Human-readable task name
base_branch: Branch to fork from
metadata: Additional task context for FSM
Returns:
IsolatedTask with worktree ready for use
"""
task_id = f"task-{uuid.uuid4().hex[:8]}"
# Sanitize description for branch name
safe_name = "".join(c if c.isalnum() or c == "-" else "-" for c in task_description.lower())[:30]
branch_name = f"coditect/{task_id}/{safe_name}"
worktree_path = self.worktrees_dir / task_id
# Create worktree with new branch
subprocess.run([
"git", "worktree", "add",
"-b", branch_name,
str(worktree_path),
base_branch
], cwd=self.repo_root, check=True, capture_output=True)
task = IsolatedTask(
task_id=task_id,
branch_name=branch_name,
worktree_path=worktree_path,
base_branch=base_branch,
status=TaskStatus.PENDING,
created_at=datetime.now(),
metadata=metadata or {"description": task_description}
)
self.tasks[task_id] = task
self._save_state()
return task
def get_task(self, task_id: str) -> Optional[IsolatedTask]:
"""Get task by ID."""
return self.tasks.get(task_id)
def list_active_tasks(self) -> List[IsolatedTask]:
"""List all active (non-terminal) tasks."""
return [
t for t in self.tasks.values()
if t.status in (TaskStatus.PENDING, TaskStatus.RUNNING)
]
def update_task_status(self, task_id: str, status: TaskStatus):
"""Update task status and persist."""
if task_id in self.tasks:
self.tasks[task_id].status = status
self._save_state()
def commit_task_changes(
self,
task_id: str,
message: str,
commit_type: str = "feat"
) -> str:
"""
Commit changes in task worktree using conventional commits.
Follows git-workflow-automation conventions.
"""
task = self.tasks[task_id]
# Stage all changes
subprocess.run(
["git", "add", "-A"],
cwd=task.worktree_path,
check=True
)
# Check if there are changes to commit
result = subprocess.run(
["git", "status", "--porcelain"],
cwd=task.worktree_path,
capture_output=True,
text=True
)
if not result.stdout.strip():
return "" # No changes
# Conventional commit format from git-workflow-automation
full_message = f"""{commit_type}: {message}
Task: {task_id}
Description: {task.metadata.get('description', 'N/A')}
Co-Authored-By: CODITECT <noreply@coditect.ai>
"""
subprocess.run(
["git", "commit", "-m", full_message],
cwd=task.worktree_path,
check=True
)
# Get commit hash
result = subprocess.run(
["git", "rev-parse", "HEAD"],
cwd=task.worktree_path,
capture_output=True,
text=True
)
return result.stdout.strip()
def merge_task(
self,
task_id: str,
target_branch: str = "main",
squash: bool = False
) -> bool:
"""
Merge task worktree changes back to target branch.
Args:
task_id: Task to merge
target_branch: Branch to merge into
squash: Whether to squash commits
Returns:
True if merge successful
"""
task = self.tasks[task_id]
# Checkout target in main repo
subprocess.run(
["git", "checkout", target_branch],
cwd=self.repo_root,
check=True
)
# Merge task branch
merge_args = ["git", "merge"]
if squash:
merge_args.append("--squash")
else:
merge_args.append("--no-ff")
merge_args.extend([task.branch_name, "-m", f"Merge {task_id}: {task.metadata.get('description', '')}"])
result = subprocess.run(
merge_args,
cwd=self.repo_root,
capture_output=True
)
if result.returncode == 0:
task.status = TaskStatus.MERGED
self._save_state()
return True
return False
def discard_task(self, task_id: str, delete_branch: bool = True):
"""
Discard task worktree and optionally delete branch.
Use when task failed or was cancelled.
"""
task = self.tasks[task_id]
# Remove worktree
subprocess.run(
["git", "worktree", "remove", str(task.worktree_path), "--force"],
cwd=self.repo_root,
capture_output=True
)
# Delete branch if requested
if delete_branch:
subprocess.run(
["git", "branch", "-D", task.branch_name],
cwd=self.repo_root,
capture_output=True
)
task.status = TaskStatus.DISCARDED
self._save_state()
def cleanup_completed(self):
"""Clean up all merged/discarded task worktrees."""
for task_id, task in list(self.tasks.items()):
if task.status in (TaskStatus.MERGED, TaskStatus.DISCARDED):
if task.worktree_path.exists():
self.discard_task(task_id, delete_branch=False)
del self.tasks[task_id]
self._save_state()
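The checkpoint file written by `_save_state` is plain JSON, so resume depends only on the record shape produced by `IsolatedTask.to_dict`. A minimal roundtrip sketch (the task values here are hypothetical, chosen to match the field names above):

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path

# Hypothetical record matching IsolatedTask.to_dict() above.
task_record = {
    "task_id": "task-ab12cd34",
    "branch_name": "coditect/task-ab12cd34/add-login-form",
    "worktree_path": "/repo/.coditect/worktrees/task-ab12cd34",
    "base_branch": "main",
    "status": "pending",
    "created_at": datetime(2025, 12, 18, 9, 0).isoformat(),
    "metadata": {"description": "add login form"},
}

with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "task_state.json"
    # Same structure _save_state writes: a "tasks" list plus timestamp.
    state_file.write_text(json.dumps({"tasks": [task_record]}, indent=2))
    restored = json.loads(state_file.read_text())["tasks"][0]
```

Because the format is self-describing JSON, a crashed orchestrator can rebuild its task table from this file alone, provided the worktree paths still exist.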
Enhancement 3: Container-per-Task Execution
Skill: production-patterns
Enhancement: Add core/task_container.py for isolated container execution
Design
Extends production-patterns with:
- Docker container per task
- Resource limits and security
- Integration with task isolation worktrees
- Lifecycle management
File: skills/production-patterns/core/task_container.py
"""
CODITECT Container-per-Task Execution
Extends production-patterns with isolated Docker containers for agent tasks.
Each container gets its own environment preventing state leakage.
CODITECT Standard: Integrates with circuit breaker and observability.
"""
import docker
from docker.errors import DockerException
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional, List, Tuple
from datetime import datetime
import logging
import json
from skills.production_patterns.core.circuit_breaker import CircuitBreaker, CircuitBreakerConfig
from skills.production_patterns.core.observability import MetricsCollector, StructuredLogger
logger = StructuredLogger("task-container")
metrics = MetricsCollector()
@dataclass
class ContainerConfig:
"""Configuration for task containers."""
base_image: str = "coditect/agent-runtime:latest"
memory_limit: str = "4g"
cpu_limit: float = 2.0
working_dir: str = "/workspace"
user: str = "1000:1000" # Non-root for security
network_mode: str = "bridge"
environment: Optional[Dict[str, str]] = None
def __post_init__(self):
if self.environment is None:
self.environment = {}
@dataclass
class TaskContainer:
"""Represents a running task container."""
task_id: str
container_id: str
status: str
worktree_path: Path
created_at: datetime
config: ContainerConfig
def to_dict(self) -> Dict:
return {
"task_id": self.task_id,
"container_id": self.container_id,
"status": self.status,
"worktree_path": str(self.worktree_path),
"created_at": self.created_at.isoformat()
}
class TaskContainerManager:
"""
Manages Docker containers for isolated task execution.
Integrates with:
- TaskIsolationManager: Mounts worktree into container
- production-patterns: Circuit breaker for Docker API
- Observability: Metrics and structured logging
CODITECT Standard: Production-ready with health monitoring.
"""
def __init__(self, default_config: Optional[ContainerConfig] = None):
self.client = docker.from_env()
self.default_config = default_config or ContainerConfig()
self.containers: Dict[str, TaskContainer] = {}
self.circuit_breaker = CircuitBreaker(
CircuitBreakerConfig(failure_threshold=3, timeout_seconds=30)
)
def create_container(
self,
task_id: str,
worktree_path: Path,
config: Optional[ContainerConfig] = None,
additional_mounts: Optional[Dict[str, str]] = None
) -> TaskContainer:
"""
Create isolated container for a task.
Args:
task_id: Unique task identifier
worktree_path: Git worktree to mount as workspace
config: Container configuration
additional_mounts: Extra volume mounts {host_path: container_path}
Returns:
TaskContainer with running container
"""
cfg = config or self.default_config
volumes = {
str(worktree_path.absolute()): {
"bind": cfg.working_dir,
"mode": "rw"
}
}
# Add any additional mounts
if additional_mounts:
for host_path, container_path in additional_mounts.items():
volumes[host_path] = {"bind": container_path, "mode": "rw"}
# Merge environment
environment = {
"CODITECT_TASK_ID": task_id,
"CODITECT_WORKSPACE": cfg.working_dir,
**cfg.environment
}
def _create():
container = self.client.containers.run(
cfg.base_image,
detach=True,
name=f"coditect-task-{task_id}",
volumes=volumes,
environment=environment,
working_dir=cfg.working_dir,
user=cfg.user,
mem_limit=cfg.memory_limit,
cpu_period=100000,
cpu_quota=int(cfg.cpu_limit * 100000),
network_mode=cfg.network_mode,
# Keep container running
command="tail -f /dev/null",
# Labels for management
labels={
"coditect.task_id": task_id,
"coditect.component": "task-container"
}
)
return container
try:
container = self.circuit_breaker.call(_create)
except Exception as e:
logger.error(f"Failed to create container for {task_id}", error=e)
metrics.increment("container_creation_failed", tags={"task_id": task_id})
raise
task_container = TaskContainer(
task_id=task_id,
container_id=container.id,
status="running",
worktree_path=worktree_path,
created_at=datetime.now(),
config=cfg
)
self.containers[task_id] = task_container
metrics.increment("containers_created", tags={"task_id": task_id})
logger.info(f"Created container for task {task_id}", container_id=container.id[:12])
return task_container
def execute_command(
self,
task_id: str,
command: str,
timeout: int = 300
) -> Tuple[int, str, str]:
"""
Execute command in task container.
Args:
task_id: Task container to execute in
command: Shell command to run
timeout: Execution timeout in seconds
Returns:
Tuple of (exit_code, stdout, stderr)
"""
task_container = self.containers.get(task_id)
if not task_container:
raise ValueError(f"No container for task {task_id}")
container = self.client.containers.get(task_container.container_id)
logger.info(f"Executing command in {task_id}", command=command[:100])
exec_result = container.exec_run(
cmd=["sh", "-c", command],
workdir=task_container.config.working_dir,
demux=True # Separate stdout/stderr
)
exit_code = exec_result.exit_code
stdout = exec_result.output[0].decode() if exec_result.output[0] else ""
stderr = exec_result.output[1].decode() if exec_result.output[1] else ""
metrics.histogram(
    "container_command_output_bytes",
    len(stdout) + len(stderr),  # Proxy for work done until wall-clock timing is wired in
    tags={"task_id": task_id, "success": str(exit_code == 0)}
)
return exit_code, stdout, stderr
def get_container_status(self, task_id: str) -> Dict:
"""Get container status and resource usage."""
task_container = self.containers.get(task_id)
if not task_container:
return {"status": "not_found"}
try:
container = self.client.containers.get(task_container.container_id)
stats = container.stats(stream=False)
return {
"status": container.status,
"memory_usage": stats["memory_stats"].get("usage", 0),
"memory_limit": stats["memory_stats"].get("limit", 0),
"cpu_percent": self._calculate_cpu_percent(stats),
"created_at": task_container.created_at.isoformat()
}
except DockerException:
return {"status": "error"}
def _calculate_cpu_percent(self, stats: Dict) -> float:
"""Calculate CPU usage percentage from stats."""
cpu_delta = (
stats["cpu_stats"]["cpu_usage"]["total_usage"] -
stats["precpu_stats"]["cpu_usage"]["total_usage"]
)
system_delta = (
stats["cpu_stats"]["system_cpu_usage"] -
stats["precpu_stats"]["system_cpu_usage"]
)
if system_delta > 0:
return (cpu_delta / system_delta) * 100.0
return 0.0
def stop_container(self, task_id: str, timeout: int = 10):
"""Stop task container gracefully."""
task_container = self.containers.get(task_id)
if not task_container:
return
try:
container = self.client.containers.get(task_container.container_id)
container.stop(timeout=timeout)
task_container.status = "stopped"
logger.info(f"Stopped container for task {task_id}")
except DockerException as e:
logger.warning(f"Error stopping container {task_id}", error=e)
def remove_container(self, task_id: str, force: bool = False):
"""Remove task container."""
task_container = self.containers.get(task_id)
if not task_container:
return
try:
container = self.client.containers.get(task_container.container_id)
container.remove(force=force)
del self.containers[task_id]
metrics.increment("containers_removed", tags={"task_id": task_id})
logger.info(f"Removed container for task {task_id}")
except DockerException as e:
logger.warning(f"Error removing container {task_id}", error=e)
def cleanup_all(self):
"""Stop and remove all task containers."""
for task_id in list(self.containers.keys()):
self.stop_container(task_id)
self.remove_container(task_id, force=True)
def list_containers(self) -> List[TaskContainer]:
"""List all managed containers."""
return list(self.containers.values())
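The CPU calculation in `_calculate_cpu_percent` follows the standard Docker stats delta formula: the change in container CPU time divided by the change in total system CPU time between two samples. A standalone sketch with synthetic stats (the counter values are made up for illustration):

```python
def cpu_percent(stats: dict) -> float:
    """Delta-based CPU formula, same shape as _calculate_cpu_percent above."""
    cpu_delta = (
        stats["cpu_stats"]["cpu_usage"]["total_usage"]
        - stats["precpu_stats"]["cpu_usage"]["total_usage"]
    )
    system_delta = (
        stats["cpu_stats"]["system_cpu_usage"]
        - stats["precpu_stats"]["system_cpu_usage"]
    )
    if system_delta > 0:
        return (cpu_delta / system_delta) * 100.0
    return 0.0

# Container consumed 200ms of CPU while the system advanced 1s: 20%.
sample_stats = {
    "cpu_stats": {
        "cpu_usage": {"total_usage": 400_000_000},
        "system_cpu_usage": 2_000_000_000,
    },
    "precpu_stats": {
        "cpu_usage": {"total_usage": 200_000_000},
        "system_cpu_usage": 1_000_000_000,
    },
}
usage = cpu_percent(sample_stats)  # 20.0
```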
Enhancement 4: Dependency Resolution Skill (New)
Skill: dependency-resolution (new)
Purpose: Automated dependency conflict detection and resolution
Design
New skill providing:
- Queue-based dependency management
- Conflict detection across package managers
- Resolution strategies
- Integration with the `/new-project` command
File: skills/dependency-resolution/SKILL.md
See separate implementation document for complete skill specification following CODITECT SKILL.md template.
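As a taste of the planned queue-based conflict detection, the sketch below flags packages pinned to different versions by different tasks. This is a hypothetical illustration only; the authoritative design lives in the separate skill specification:

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def detect_conflicts(
    task_requirements: Dict[str, Dict[str, str]]
) -> List[Tuple[str, Dict[str, str]]]:
    """Return (package, {task_id: version}) for every package that
    different tasks pin to different versions.

    task_requirements maps task_id -> {package: pinned_version}.
    """
    pins: Dict[str, Dict[str, str]] = defaultdict(dict)
    for task_id, reqs in task_requirements.items():
        for pkg, version in reqs.items():
            pins[pkg][task_id] = version
    return [
        (pkg, by_task)
        for pkg, by_task in pins.items()
        if len(set(by_task.values())) > 1
    ]

conflicts = detect_conflicts({
    "task-a": {"httpx": "0.27.0", "pydantic": "2.7"},
    "task-b": {"httpx": "0.25.2"},
})
# conflicts -> [("httpx", {"task-a": "0.27.0", "task-b": "0.25.2"})]
```

Resolution strategies (pick-highest, pin-to-lockfile, escalate-to-human) would then consume this conflict list before parallel tasks are merged.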
Integration: Parallel Agent Orchestrator
Combines all enhancements into a unified orchestration layer.
File: skills/multi-agent-workflow/core/parallel_orchestrator.py
"""
CODITECT Parallel Agent Orchestrator
Unifies task isolation, container execution, and multi-LLM providers
for production-grade parallel agent execution.
Integrates:
- TaskIsolationManager (git worktrees)
- TaskContainerManager (Docker containers)
- MultiProviderRouter (LLM fallback)
- FSM workflow from multi-agent-workflow
CODITECT Standard: Production-ready with checkpoint/resume.
"""
import asyncio
from pathlib import Path
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import json
from skills.git_workflow_automation.core.task_isolation import (
TaskIsolationManager, TaskStatus
)
from skills.production_patterns.core.task_container import (
TaskContainerManager, ContainerConfig
)
from skills.multi_provider_llm_fallback.providers import (
AnthropicProvider, OpenAIProvider, GoogleProvider
)
@dataclass
class AgentTask:
"""Definition of an agent task for parallel execution."""
description: str
prompt: str
dependencies: Optional[List[str]] = None
preferred_provider: Optional[str] = None
timeout: int = 300
@dataclass
class TaskResult:
"""Result of an agent task execution."""
task_id: str
description: str
status: str # success, failed, timeout
output: str
provider_used: str
execution_time: float
commit_hash: Optional[str] = None
class ParallelAgentOrchestrator:
"""
Orchestrates parallel agent task execution with full isolation.
Workflow:
1. Create isolated worktree for each task
2. Spawn container in worktree
3. Execute agent with LLM provider (with fallback)
4. Commit changes in worktree
5. Merge successful tasks
6. Clean up
CODITECT Standard: Checkpoint/resume via FSM state.
"""
def __init__(self, repo_root: Path):
self.repo_root = repo_root
self.isolation_manager = TaskIsolationManager(repo_root)
self.container_manager = TaskContainerManager()
# Initialize providers with failover
self.providers = [
AnthropicProvider(),
OpenAIProvider(),
GoogleProvider()
]
async def execute_parallel_tasks(
self,
tasks: List[AgentTask],
base_branch: str = "main",
auto_merge: bool = False
) -> List[TaskResult]:
"""
Execute multiple agent tasks in parallel with full isolation.
Args:
tasks: List of tasks to execute
base_branch: Git branch to fork from
auto_merge: Whether to automatically merge successful tasks
Returns:
List of TaskResult for each task
"""
# Phase 1: Create isolated environments
task_contexts = []
for task in tasks:
isolated_task = self.isolation_manager.create_task_environment(
task_description=task.description,
base_branch=base_branch,
metadata={"prompt": task.prompt}
)
container = self.container_manager.create_container(
task_id=isolated_task.task_id,
worktree_path=isolated_task.worktree_path
)
task_contexts.append({
"task": task,
"isolated_task": isolated_task,
"container": container
})
# Phase 2: Execute tasks in parallel
async def execute_single_task(ctx: Dict) -> TaskResult:
task = ctx["task"]
isolated_task = ctx["isolated_task"]
start_time = datetime.now()
self.isolation_manager.update_task_status(
isolated_task.task_id, TaskStatus.RUNNING
)
try:
# Select provider (preferred or first available)
provider = self._select_provider(task.preferred_provider)
async with provider:
response = await provider.complete(task.prompt)
# Apply changes in container (note: quoting via echo is fragile if the
# response contains single quotes; shown here for illustration only)
exit_code, stdout, stderr = self.container_manager.execute_command(
    isolated_task.task_id,
    f"echo '{response.content}' > agent_output.txt"
)
# Commit changes
commit_hash = self.isolation_manager.commit_task_changes(
isolated_task.task_id,
message=task.description,
commit_type="feat"
)
self.isolation_manager.update_task_status(
isolated_task.task_id, TaskStatus.COMPLETED
)
return TaskResult(
task_id=isolated_task.task_id,
description=task.description,
status="success",
output=response.content,
provider_used=provider.name,
execution_time=(datetime.now() - start_time).total_seconds(),
commit_hash=commit_hash
)
except Exception as e:
self.isolation_manager.update_task_status(
isolated_task.task_id, TaskStatus.FAILED
)
return TaskResult(
task_id=isolated_task.task_id,
description=task.description,
status="failed",
output=str(e),
provider_used="none",
execution_time=(datetime.now() - start_time).total_seconds()
)
results = await asyncio.gather(
*[execute_single_task(ctx) for ctx in task_contexts]
)
# Phase 3: Merge and cleanup
for ctx, result in zip(task_contexts, results):
if auto_merge and result.status == "success":
self.isolation_manager.merge_task(
ctx["isolated_task"].task_id,
target_branch=base_branch
)
# Clean up container
self.container_manager.remove_container(
ctx["isolated_task"].task_id, force=True
)
return list(results)
def _select_provider(self, preferred: Optional[str] = None):
"""Select provider by name or return first available."""
if preferred:
for provider in self.providers:
if provider.name == preferred:
return provider
return self.providers[0]
def get_task_status(self, task_id: str) -> Dict:
"""Get comprehensive status of a task."""
isolated_task = self.isolation_manager.get_task(task_id)
container_status = self.container_manager.get_container_status(task_id)
return {
"task_id": task_id,
"git_status": isolated_task.status.value if isolated_task else "not_found",
"container": container_status,
"worktree": str(isolated_task.worktree_path) if isolated_task else None
}
def cleanup_all(self):
"""Clean up all tasks and containers."""
self.container_manager.cleanup_all()
self.isolation_manager.cleanup_completed()
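Provider selection in the orchestrator silently falls back to the first configured provider when the preferred name is unknown. A standalone sketch of that rule using stub providers (the stub class and names are illustrative, not part of the skill):

```python
from typing import List, Optional

class _StubProvider:
    """Minimal stand-in for the provider classes above (illustration only)."""
    def __init__(self, name: str):
        self.name = name

def select_provider(
    providers: List[_StubProvider], preferred: Optional[str] = None
) -> _StubProvider:
    """Same fallback rule as ParallelAgentOrchestrator._select_provider:
    match by name if possible, otherwise return the first provider."""
    if preferred:
        for provider in providers:
            if provider.name == preferred:
                return provider
    return providers[0]

providers = [_StubProvider("anthropic"), _StubProvider("openai"), _StubProvider("google")]
chosen = select_provider(providers, "google")        # matched by name
fallback = select_provider(providers, "mistral")     # unknown -> first provider
```

Worth noting as a design choice: an unknown `preferred_provider` degrades silently rather than raising, so typos in task definitions route to the primary provider instead of failing the task.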
Implementation Roadmap
Phase 1: Provider Implementations (Day 1)
- Create `skills/multi-provider-llm-fallback/providers.py`
- Implement AnthropicProvider with streaming
- Implement OpenAIProvider with streaming
- Implement GoogleProvider with streaming
- Unit tests for each provider
Phase 2: Task Isolation (Day 1-2)
- Create `skills/git-workflow-automation/core/task_isolation.py`
- Implement TaskIsolationManager
- Integration with git-workflow-automation
- Checkpoint/resume testing
Phase 3: Container Execution (Day 2)
- Create `skills/production-patterns/core/task_container.py`
- Implement TaskContainerManager
- Integration with observability
- Resource limit testing
Phase 4: Integration (Day 3)
- Create `skills/multi-agent-workflow/core/parallel_orchestrator.py`
- Integration tests across all components
- Documentation updates
- Update SKILL.md files
Testing Requirements
All implementations must pass:
- Unit tests (pytest)
- Integration tests with Docker
- Multi-provider failover tests
- Checkpoint/resume tests
- Concurrent execution tests
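Multi-provider failover tests can run without real API keys by substituting stub providers. The sketch below shows the shape of such a test; the failover helper here is a simplified stand-in for illustration, not the actual router from multi-provider-llm-fallback:

```python
import asyncio
from typing import List, Tuple

class _FailingProvider:
    """Stub that simulates a rate-limited primary provider."""
    name = "primary"
    async def complete(self, prompt: str) -> str:
        raise RuntimeError("rate limited")

class _HealthyProvider:
    """Stub that simulates a working fallback provider."""
    name = "fallback"
    async def complete(self, prompt: str) -> str:
        return f"ok:{prompt}"

async def complete_with_failover(providers: List, prompt: str) -> Tuple[str, str]:
    """Try each provider in order; return (provider_name, result) on first success."""
    last_error: Exception = RuntimeError("no providers configured")
    for provider in providers:
        try:
            return provider.name, await provider.complete(prompt)
        except Exception as exc:
            last_error = exc
    raise last_error

name, result = asyncio.run(
    complete_with_failover([_FailingProvider(), _HealthyProvider()], "ping")
)
```

The concurrent-execution tests would follow the same pattern, gathering several `complete_with_failover` calls with `asyncio.gather` and asserting that a single failing provider does not fail the batch.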
Document Control
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0.0 | 2025-12-18 | CODITECT Engineering | Initial enhancement plan |
CODITECT Standard Compliance:
- Follows SKILL.md template format
- Integrates with existing production-patterns
- Uses conventional commits via git-workflow-automation
- Implements checkpoint/resume per multi-agent-workflow
- Includes structured logging and metrics via observability hooks