Technical Design Document: CODITECT Task Orchestrator

1. Overview

1.1 Purpose

This document provides the technical specifications for the CODITECT Task Orchestrator, a system that decomposes large project plans into atomic, database-stored tasks with intelligent agent assignment, dependency resolution, and execution tracking.

1.2 Scope

| In Scope                 | Out of Scope                             |
|--------------------------|------------------------------------------|
| Task decomposition       | Agent implementation                     |
| Database schema          | Frontend UI                              |
| API design               | User authentication (uses CODITECT auth) |
| Agent matching algorithm | Agent execution runtime                  |
| Dependency resolution    | External integrations (GitHub, Jira)     |
| Execution tracking       |                                          |

1.3 References

Local ADRs:

External ADRs (coditect-core/internal/architecture/adrs/):

  • ADR-068: Large Project Plan Token Economics (motivation)
  • ADR-052: Intent-Aware Context Management
  • ADR-053: Cloud Context Sync Architecture
  • ADR-054: Track Nomenclature Extensibility

2. Data Models

2.1 Entity Relationship Diagram

DATA MODEL

┌─────────────────┐           ┌───────────────────┐
│ atomic_tasks    │ 1 ───── * │ task_dependencies │
│─────────────────│           │───────────────────│
│ id (PK)         │◀──────────│ source_task_id    │
│ task_id (UK)    │◀──────────│ target_task_id    │
│ parent_task_id  │           │ dependency_type   │
│ track           │           └───────────────────┘
│ section         │
│ task_number     │           ┌───────────────────┐
│ subtask_number  │ 1 ───── * │ task_executions   │
│ title           │           │───────────────────│
│ description     │◀──────────│ task_id (FK)      │
│ status          │           │ agent_name        │
│ priority        │           │ started_at        │
│ assigned_agent  │           │ completed_at      │
│ title_embedding │           │ outcome           │
└─────────────────┘           │ tokens_input      │
         │                    │ tokens_output     │
         │                    └───────────────────┘
         │
         │   * ───── 1   ┌─────────────────────┐
         └──────────────▶│ agent_task_affinity │
                         │─────────────────────│
                         │ agent_name          │
                         │ task_type           │
                         │ domain              │
                         │ success_rate        │
                         │ affinity_score      │
                         └─────────────────────┘

2.2 Core Entities

2.2.1 AtomicTask

| Field | Type | Constraints | Description |
|-------|------|-------------|-------------|
| id | UUID | PK, NOT NULL | Internal identifier |
| task_id | VARCHAR(20) | UNIQUE, NOT NULL | Hierarchical ID (e.g., "A.9.1.1") |
| parent_task_id | VARCHAR(20) | FK → task_id | Parent task reference |
| track | CHAR(1) | NOT NULL, IN (A-G) | Project track |
| section | INTEGER | NOT NULL | Section number |
| task_number | INTEGER | NOT NULL | Task number within section |
| subtask_number | INTEGER | NULLABLE | Subtask number (atomic level) |
| title | VARCHAR(255) | NOT NULL | Task title |
| description | TEXT | NULLABLE | Detailed description |
| acceptance_criteria | TEXT[] | NULLABLE | Array of criteria |
| task_type | VARCHAR(50) | NOT NULL | Classification type |
| domain | VARCHAR(50) | NULLABLE | Domain area |
| complexity | VARCHAR(10) | IN (trivial, simple, moderate, complex) | Complexity rating |
| estimated_tokens | INTEGER | NULLABLE | Estimated token usage |
| status | VARCHAR(20) | DEFAULT 'pending' | Current status |
| priority | INTEGER | 1-100, DEFAULT 50 | Priority score |
| recommended_agent | VARCHAR(100) | NULLABLE | Suggested agent |
| assigned_agent | VARCHAR(100) | NULLABLE | Currently assigned agent |
| assigned_at | TIMESTAMP | NULLABLE | Assignment timestamp |
| started_at | TIMESTAMP | NULLABLE | Execution start time |
| completed_at | TIMESTAMP | NULLABLE | Execution end time |
| execution_duration_ms | INTEGER | NULLABLE | Duration in milliseconds |
| outcome | VARCHAR(20) | IN (success, partial, failure, timeout) | Execution outcome |
| output_artifacts | TEXT[] | NULLABLE | Files created/modified |
| retry_count | INTEGER | DEFAULT 0 | Number of retries |
| token_usage | INTEGER | NULLABLE | Actual tokens used |
| source_file | VARCHAR(255) | NULLABLE | Origin markdown file |
| source_line_start | INTEGER | NULLABLE | Start line in source |
| source_line_end | INTEGER | NULLABLE | End line in source |
| title_embedding | VECTOR(1536) | NULLABLE | Semantic embedding |
| created_at | TIMESTAMP | DEFAULT NOW() | Creation timestamp |
| updated_at | TIMESTAMP | DEFAULT NOW() | Last update timestamp |

2.2.2 TaskDependency

| Field | Type | Constraints | Description |
|-------|------|-------------|-------------|
| id | UUID | PK, NOT NULL | Internal identifier |
| source_task_id | VARCHAR(20) | FK → atomic_tasks, NOT NULL | Dependency source |
| target_task_id | VARCHAR(20) | FK → atomic_tasks, NOT NULL | Dependency target |
| dependency_type | VARCHAR(20) | NOT NULL, IN (blocks, informs, relates) | Relationship type |
| created_at | TIMESTAMP | DEFAULT NOW() | Creation timestamp |

Dependency Types:

  • blocks: Source must complete before target can start (hard dependency)
  • informs: Source provides context for target (soft dependency)
  • relates: Tasks are related but independent (informational)

2.2.3 AgentTaskAffinity

| Field | Type | Constraints | Description |
|-------|------|-------------|-------------|
| id | UUID | PK, NOT NULL | Internal identifier |
| agent_name | VARCHAR(100) | NOT NULL | Agent identifier |
| task_type | VARCHAR(50) | NOT NULL | Task classification |
| domain | VARCHAR(50) | NULLABLE | Domain area |
| total_executions | INTEGER | DEFAULT 0 | Total task executions |
| successful_executions | INTEGER | DEFAULT 0 | Successful completions |
| success_rate | DECIMAL(5,4) | GENERATED | Calculated success rate |
| avg_duration_ms | INTEGER | NULLABLE | Average execution time |
| avg_token_usage | INTEGER | NULLABLE | Average tokens used |
| affinity_score | DECIMAL(5,4) | DEFAULT 0.5 | Calculated affinity |
| last_updated | TIMESTAMP | DEFAULT NOW() | Last update timestamp |

2.2.4 TaskExecution

| Field | Type | Constraints | Description |
|-------|------|-------------|-------------|
| id | UUID | PK, NOT NULL | Internal identifier |
| task_id | VARCHAR(20) | FK → atomic_tasks, NOT NULL | Task reference |
| agent_name | VARCHAR(100) | NOT NULL | Executing agent |
| started_at | TIMESTAMP | NOT NULL | Execution start |
| completed_at | TIMESTAMP | NULLABLE | Execution end |
| duration_ms | INTEGER | NULLABLE | Duration in milliseconds |
| outcome | VARCHAR(20) | IN (success, partial, failure, timeout, aborted) | Result |
| session_id | VARCHAR(100) | NULLABLE | Claude Code session |
| machine_id | VARCHAR(100) | NULLABLE | Machine identifier |
| model_used | VARCHAR(50) | NULLABLE | LLM model used |
| tokens_input | INTEGER | NULLABLE | Input tokens |
| tokens_output | INTEGER | NULLABLE | Output tokens |
| retry_number | INTEGER | DEFAULT 0 | Retry iteration |
| artifacts_created | TEXT[] | NULLABLE | Files created |
| error_message | TEXT | NULLABLE | Error details |
| execution_log | JSONB | NULLABLE | Detailed log |
| created_at | TIMESTAMP | DEFAULT NOW() | Record creation |

3. API Design

3.1 Base URL

Production: https://api.coditect.ai/orchestrator/v1
Development: http://localhost:8080/api/v1

3.2 Authentication

All endpoints require CODITECT authentication via JWT token:

Authorization: Bearer <jwt_token>
X-Tenant-ID: <tenant_uuid>

3.3 Task Endpoints

GET /tasks/ready

Get tasks ready for execution (dependencies resolved, not assigned).

Request:

GET /tasks/ready?limit=10&track=A&domain=backend

Parameters:

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| limit | integer | No | Max results (default: 10) |
| track | string | No | Filter by track (A-G) |
| domain | string | No | Filter by domain |
| task_type | string | No | Filter by task type |

Response:

{
  "tasks": [
    {
      "id": "550e8400-e29b-41d4-a716-446655440000",
      "task_id": "A.10.1.2",
      "title": "Implement container session API endpoint",
      "track": "A",
      "status": "ready",
      "priority": 85,
      "recommended_agent": "senior-architect",
      "estimated_tokens": 5000,
      "blocking_count": 0,
      "dependent_count": 3
    }
  ],
  "total": 42,
  "page": 1
}

POST /tasks/{task_id}/start

Mark a task as started and assign an agent.

Request:

POST /tasks/A.10.1.2/start
Content-Type: application/json

{
  "agent_name": "senior-architect",
  "session_id": "abc123",
  "machine_id": "machine-uuid"
}

Response:

{
  "task_id": "A.10.1.2",
  "status": "in_progress",
  "assigned_agent": "senior-architect",
  "started_at": "2026-01-13T10:30:00Z",
  "execution_id": "exec-uuid"
}

POST /tasks/{task_id}/complete

Mark a task as completed.

Request:

POST /tasks/A.10.1.2/complete
Content-Type: application/json

{
  "execution_id": "exec-uuid",
  "outcome": "success",
  "artifacts": ["src/api/container_sessions.py", "tests/test_container_sessions.py"],
  "tokens_used": 4200
}

Response:

{
  "task_id": "A.10.1.2",
  "status": "completed",
  "outcome": "success",
  "duration_ms": 180000,
  "unblocked_tasks": ["A.10.1.3", "A.10.2.1"]
}

GET /tasks/search

Semantic search across tasks.

Request:

GET /tasks/search?q=authentication&limit=5

Response:

{
  "tasks": [
    {
      "task_id": "D.3.2.1",
      "title": "Implement JWT authentication middleware",
      "similarity_score": 0.92,
      "status": "completed"
    }
  ]
}

3.4 Agent Endpoints

GET /agents/recommend/{task_id}

Get agent recommendations for a task.

Request:

GET /agents/recommend/A.10.1.2?top=3

Response:

{
  "task_id": "A.10.1.2",
  "recommendations": [
    {
      "agent_name": "senior-architect",
      "affinity_score": 0.87,
      "breakdown": {
        "semantic_similarity": 0.92,
        "historical_success": 0.85,
        "domain_expertise": 0.90,
        "availability": 0.70
      }
    },
    {
      "agent_name": "backend-specialist",
      "affinity_score": 0.82
    }
  ]
}

GET /agents/{agent_name}/affinity

Get agent performance metrics.

Request:

GET /agents/senior-architect/affinity

Response:

{
  "agent_name": "senior-architect",
  "affinities": [
    {
      "task_type": "implementation",
      "domain": "backend",
      "success_rate": 0.92,
      "total_executions": 47,
      "avg_duration_ms": 145000
    },
    {
      "task_type": "architecture",
      "domain": "system-design",
      "success_rate": 0.88,
      "total_executions": 23
    }
  ]
}

3.5 Dependency Endpoints

GET /dependencies/graph/{task_id}

Get dependency graph for a task.

Request:

GET /dependencies/graph/A.10.1.2?depth=2

Response:

{
  "task_id": "A.10.1.2",
  "upstream": [
    {
      "task_id": "A.10.1.1",
      "dependency_type": "blocks",
      "status": "completed"
    }
  ],
  "downstream": [
    {
      "task_id": "A.10.1.3",
      "dependency_type": "blocks",
      "status": "pending"
    },
    {
      "task_id": "B.7.1.5",
      "dependency_type": "informs",
      "status": "pending"
    }
  ]
}

4. Agent Matching Algorithm

4.1 Affinity Score Calculation

from typing import Dict, Optional

def calculate_affinity_score(
    agent: str,
    task: AtomicTask,
    weights: Optional[Dict[str, float]] = None
) -> float:
    """
    Calculate agent-task affinity score.

    Default weights:
    - semantic_similarity: 0.4
    - historical_success: 0.3
    - domain_expertise: 0.2
    - availability: 0.1
    """
    if weights is None:
        weights = {
            "semantic": 0.4,
            "historical": 0.3,
            "domain": 0.2,
            "availability": 0.1
        }

    # 1. Semantic similarity (cosine similarity of capability/title embeddings)
    agent_embedding = get_agent_capability_embedding(agent)
    task_embedding = task.title_embedding
    semantic_score = cosine_similarity(agent_embedding, task_embedding)

    # 2. Historical success rate
    affinity = get_agent_affinity(agent, task.task_type, task.domain)
    if affinity and affinity.total_executions >= 5:
        historical_score = affinity.success_rate
    else:
        historical_score = 0.5  # Default for insufficient data

    # 3. Domain expertise
    domain_score = calculate_domain_expertise(agent, task.domain)

    # 4. Availability (inverse of active task count)
    active_tasks = get_active_task_count(agent)
    availability_score = 1.0 / (1 + active_tasks * 0.2)

    # Weighted combination
    return (
        semantic_score * weights["semantic"] +
        historical_score * weights["historical"] +
        domain_score * weights["domain"] +
        availability_score * weights["availability"]
    )
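A worked example of the weighted combination, using the default weights and the illustrative component scores from the /agents/recommend response breakdown above:

```python
# Worked example of the weighted affinity combination. Weights are the
# documented defaults; component scores are illustrative.
weights = {"semantic": 0.4, "historical": 0.3, "domain": 0.2, "availability": 0.1}
scores = {"semantic": 0.92, "historical": 0.85, "domain": 0.90, "availability": 0.70}

affinity = sum(scores[k] * weights[k] for k in weights)
# 0.92*0.4 + 0.85*0.3 + 0.90*0.2 + 0.70*0.1 = 0.368 + 0.255 + 0.18 + 0.07
assert abs(affinity - 0.873) < 1e-9
```

Note how the low availability score (0.70) drags an otherwise strong semantic match down to 0.873, which rounds to the 0.87 shown in the API example.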

4.2 Domain Expertise Calculation

from typing import Optional

DOMAIN_AGENT_MAPPING = {
    "backend": ["senior-architect", "backend-specialist", "database-architect"],
    "frontend": ["frontend-react-typescript-expert", "ui-specialist"],
    "devops": ["devops-engineer", "cloud-architect"],
    "security": ["security-specialist"],
    "testing": ["testing-specialist", "qa-engineer"],
    "documentation": ["codi-documentation-writer"],
}

def calculate_domain_expertise(agent: str, domain: Optional[str]) -> float:
    """Calculate domain expertise score."""
    if not domain:
        return 0.5

    domain_agents = DOMAIN_AGENT_MAPPING.get(domain, [])

    if agent in domain_agents:
        # Position in list indicates expertise level
        position = domain_agents.index(agent)
        return 1.0 - (position * 0.15)  # First agent = 1.0, second = 0.85, etc.

    return 0.3  # Default for non-domain agents

4.3 Learning Feedback Loop

def update_affinity_after_execution(execution: TaskExecution) -> None:
    """
    Update agent-task affinity based on execution results.
    Uses exponential moving average for smooth learning.
    """
    task = get_task(execution.task_id)
    affinity = get_or_create_affinity(
        execution.agent_name,
        task.task_type,
        task.domain
    )

    # Update execution counts
    affinity.total_executions += 1
    if execution.outcome == "success":
        affinity.successful_executions += 1

    # Update average duration (EMA with alpha=0.3)
    if execution.duration_ms:
        if affinity.avg_duration_ms:
            affinity.avg_duration_ms = int(
                0.3 * execution.duration_ms +
                0.7 * affinity.avg_duration_ms
            )
        else:
            affinity.avg_duration_ms = execution.duration_ms

    # Update average token usage (same EMA)
    total_tokens = (execution.tokens_input or 0) + (execution.tokens_output or 0)
    if total_tokens:
        if affinity.avg_token_usage:
            affinity.avg_token_usage = int(
                0.3 * total_tokens +
                0.7 * affinity.avg_token_usage
            )
        else:
            affinity.avg_token_usage = total_tokens

    # Recalculate affinity score
    affinity.affinity_score = calculate_base_affinity_score(affinity)

    save_affinity(affinity)

5. Task Decomposition

5.1 Markdown Parser

import re
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class ParsedTask:
    task_id: str
    track: str
    section: int
    task_number: int
    subtask_number: Optional[int]
    title: str
    completed: bool
    line_number: int

def parse_pilot_plan(content: str) -> List[ParsedTask]:
    """
    Parse PILOT plan markdown into structured tasks.

    Supported formats:
    - [x] A.9.1.1: Task title
    - [ ] A.9.1: Task title
    - [x] **A.9.1.1:** Task title
    """
    tasks = []

    # Pattern components. The colon may sit inside or after the closing bold
    # markers, so it is allowed on either side of them.
    checkbox = r'- \[([ xX])\]'
    task_id = r'\*{0,2}([A-G])\.(\d+)\.(\d+)(?:\.(\d+))?:?\*{0,2}:?\s*'
    title = r'(.+)'

    pattern = f'{checkbox}\\s+{task_id}{title}'

    for line_num, line in enumerate(content.split('\n'), start=1):
        match = re.match(pattern, line.strip())
        if match:
            completed = match.group(1).lower() == 'x'
            track = match.group(2)
            section = int(match.group(3))
            task_num = int(match.group(4))
            subtask_num = int(match.group(5)) if match.group(5) else None
            task_title = match.group(6).strip()

            task_id_str = f"{track}.{section}.{task_num}"
            if subtask_num is not None:
                task_id_str += f".{subtask_num}"

            tasks.append(ParsedTask(
                task_id=task_id_str,
                track=track,
                section=section,
                task_number=task_num,
                subtask_number=subtask_num,
                title=task_title,
                completed=completed,
                line_number=line_num
            ))

    return tasks
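A standalone smoke test of the plain (non-bold) checkbox format, with a simplified version of the pattern inlined so it runs on its own:

```python
# Standalone check of the plain checkbox format. The pattern below is a
# simplified variant of the parser's (bold-marker handling omitted).
import re

PATTERN = r'- \[([ xX])\]\s+([A-G])\.(\d+)\.(\d+)(?:\.(\d+))?:?\s*(.+)'

m = re.match(PATTERN, "- [x] A.10.1.2: Implement container session API endpoint")
assert m is not None
assert m.group(1) == "x"                          # completed
assert m.group(2, 3, 4, 5) == ("A", "10", "1", "2")  # track.section.task.subtask
assert m.group(6) == "Implement container session API endpoint"
```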

5.2 Task Classification

import re

TASK_TYPE_PATTERNS = {
    "implementation": [
        r"implement", r"create", r"build", r"develop", r"add",
        r"setup", r"configure", r"enable"
    ],
    "documentation": [
        r"document", r"write.*doc", r"update.*readme",
        r"create.*guide", r"add.*comment"
    ],
    "testing": [
        r"test", r"verify", r"validate", r"check", r"assert",
        r"qa", r"quality"
    ],
    "refactoring": [
        r"refactor", r"clean", r"optimize", r"improve",
        r"reorganize", r"restructure"
    ],
    "bugfix": [
        r"fix", r"resolve", r"repair", r"correct", r"patch"
    ],
    "review": [
        r"review", r"audit", r"analyze", r"assess", r"evaluate"
    ],
    "deployment": [
        r"deploy", r"release", r"publish", r"launch", r"rollout"
    ],
    "research": [
        r"research", r"investigate", r"explore", r"study",
        r"analyze.*option"
    ]
}

def classify_task_type(title: str) -> str:
    """Classify task type based on title keywords; first match wins."""
    title_lower = title.lower()

    for task_type, patterns in TASK_TYPE_PATTERNS.items():
        for pattern in patterns:
            if re.search(pattern, title_lower):
                return task_type

    return "general"

def infer_domain(track: str, title: str) -> str:
    """Infer domain from track (title is reserved for finer-grained inference)."""
    track_domains = {
        'A': 'backend',
        'B': 'frontend',
        'C': 'devops',
        'D': 'security',
        'E': 'testing',
        'F': 'documentation',
        'G': 'dms'
    }

    return track_domains.get(track, 'general')
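A standalone check of the first-match-wins classification, with two of the pattern groups reproduced. Note that dictionary order matters: a title like "Fix build script" classifies as implementation because "build" is tried before "fix":

```python
# Standalone check of keyword classification; two pattern groups reproduced
# from the table above. First matching group wins.
import re

PATTERNS = {
    "implementation": [r"implement", r"create", r"build"],
    "bugfix": [r"fix", r"resolve", r"patch"],
}

def classify(title: str) -> str:
    title_lower = title.lower()
    for task_type, patterns in PATTERNS.items():
        if any(re.search(p, title_lower) for p in patterns):
            return task_type
    return "general"

assert classify("Implement container session API") == "implementation"
assert classify("Resolve login bug") == "bugfix"
assert classify("Weekly sync notes") == "general"
assert classify("Fix build script") == "implementation"  # order-sensitive
```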

6. Dependency Resolution

6.1 Ready Task Query

-- Get tasks ready for execution
WITH blocking_incomplete AS (
    SELECT DISTINCT d.target_task_id
    FROM task_dependencies d
    JOIN atomic_tasks t ON d.source_task_id = t.task_id
    WHERE d.dependency_type = 'blocks'
      AND t.status != 'completed'
)
SELECT t.*
FROM atomic_tasks t
WHERE t.status IN ('pending', 'ready')
  AND t.assigned_agent IS NULL
  AND t.task_id NOT IN (SELECT target_task_id FROM blocking_incomplete)
ORDER BY t.priority DESC, t.created_at ASC
LIMIT $1;
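The same readiness rule can be mirrored over in-memory rows; a sketch assuming the semantics above (pending/ready, unassigned, and no incomplete `blocks` dependency targeting the task):

```python
# In-memory mirror of the ready-task query: a task is ready when it is
# pending/ready, unassigned, and no incomplete 'blocks' dep targets it.
tasks = {
    "A.1": {"status": "completed", "assigned_agent": None, "priority": 50},
    "A.2": {"status": "pending",   "assigned_agent": None, "priority": 80},
    "A.3": {"status": "pending",   "assigned_agent": None, "priority": 90},
}
deps = [  # (source, target, type)
    ("A.1", "A.2", "blocks"),  # satisfied: A.1 is completed
    ("A.2", "A.3", "blocks"),  # unsatisfied: A.2 is not completed
]

blocked = {
    target for source, target, dtype in deps
    if dtype == "blocks" and tasks[source]["status"] != "completed"
}
ready = sorted(
    (tid for tid, t in tasks.items()
     if t["status"] in ("pending", "ready")
     and t["assigned_agent"] is None
     and tid not in blocked),
    key=lambda tid: -tasks[tid]["priority"],
)
assert ready == ["A.2"]  # A.3 is blocked despite its higher priority
```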

6.2 Dependency Graph Traversal

from typing import Dict, List

def get_dependency_graph(
    task_id: str,
    depth: int = 2,
    direction: str = "both"
) -> Dict:
    """
    Get dependency graph for a task.

    Args:
        task_id: Starting task
        depth: Max traversal depth
        direction: "upstream", "downstream", or "both"
    """
    graph = {
        "task_id": task_id,
        "upstream": [],
        "downstream": []
    }

    if direction in ("both", "upstream"):
        graph["upstream"] = traverse_dependencies(task_id, depth, "upstream")

    if direction in ("both", "downstream"):
        graph["downstream"] = traverse_dependencies(task_id, depth, "downstream")

    return graph

def traverse_dependencies(
    task_id: str,
    depth: int,
    direction: str
) -> List[Dict]:
    """Recursive dependency traversal."""
    if depth <= 0:
        return []

    if direction == "upstream":
        deps = db.execute("""
            SELECT d.*, t.status, t.title
            FROM task_dependencies d
            JOIN atomic_tasks t ON d.source_task_id = t.task_id
            WHERE d.target_task_id = :task_id
        """, task_id=task_id)
    else:
        deps = db.execute("""
            SELECT d.*, t.status, t.title
            FROM task_dependencies d
            JOIN atomic_tasks t ON d.target_task_id = t.task_id
            WHERE d.source_task_id = :task_id
        """, task_id=task_id)

    result = []
    for dep in deps:
        node = {
            "task_id": dep.source_task_id if direction == "upstream" else dep.target_task_id,
            "dependency_type": dep.dependency_type,
            "status": dep.status,
            "title": dep.title
        }

        # Recurse one level deeper
        if depth > 1:
            node["children"] = traverse_dependencies(
                node["task_id"], depth - 1, direction
            )

        result.append(node)

    return result

7. Circuit Breaker Pattern

7.1 Implementation

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, Optional

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

@dataclass
class CircuitBreaker:
    name: str
    failure_threshold: int = 5
    recovery_timeout: timedelta = timedelta(minutes=5)
    half_open_max_calls: int = 3

    # State
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: Optional[datetime] = None
    half_open_calls: int = 0

    def can_execute(self) -> bool:
        """Check if execution is allowed."""
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has passed
            if datetime.now() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                return True
            return False

        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.half_open_max_calls

        return False

    def record_success(self) -> None:
        """Record successful execution."""
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.failure_count = 0

    def record_failure(self) -> None:
        """Record failed execution."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Per-agent circuit breakers
agent_circuits: Dict[str, CircuitBreaker] = {}

def get_circuit(agent_name: str) -> CircuitBreaker:
    if agent_name not in agent_circuits:
        agent_circuits[agent_name] = CircuitBreaker(name=agent_name)
    return agent_circuits[agent_name]
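A self-contained walkthrough of the state machine above, condensed to its state logic; the recovery wait is simulated by back-dating last_failure_time rather than sleeping:

```python
# Condensed CircuitBreaker state machine (mirrors the spec above) plus a
# walkthrough: CLOSED -> OPEN -> HALF_OPEN -> CLOSED.
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

@dataclass
class CircuitBreaker:
    name: str
    failure_threshold: int = 5
    recovery_timeout: timedelta = timedelta(minutes=5)
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: Optional[datetime] = None

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if datetime.now() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN  # allow a probe call
                return True
            return False
        return True  # HALF_OPEN (probe budget elided in this sketch)

    def record_success(self) -> None:
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
            self.failure_count = 0

    def record_failure(self) -> None:
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

cb = CircuitBreaker(name="demo", failure_threshold=2)
cb.record_failure()
cb.record_failure()                       # threshold reached -> OPEN
assert cb.state == CircuitState.OPEN and not cb.can_execute()

cb.last_failure_time = datetime.now() - timedelta(minutes=6)  # simulate wait
assert cb.can_execute()                   # OPEN -> HALF_OPEN probe allowed
cb.record_success()                       # probe succeeds -> CLOSED
assert cb.state == CircuitState.CLOSED
```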

7.2 Usage in Dispatcher

async def dispatch_task(task: AtomicTask, agent: str) -> bool:
    """
    Dispatch task to agent with circuit breaker protection.
    """
    circuit = get_circuit(agent)

    if not circuit.can_execute():
        logger.warning(f"Circuit open for {agent}, skipping task {task.task_id}")
        return False

    try:
        result = await execute_task_with_agent(task, agent)

        if result.success:
            circuit.record_success()
        else:
            circuit.record_failure()

        return result.success

    except Exception as e:
        circuit.record_failure()
        logger.error(f"Task {task.task_id} failed with {agent}: {e}")
        return False

8. Integration Specifications

8.1 /orient Command Integration

# In /orient command implementation
def orient_with_orchestrator():
    """
    Enhanced /orient that uses task orchestrator.
    """
    # 1. Get ready tasks from orchestrator
    ready_tasks = orchestrator_client.get_ready_tasks(
        limit=5,
        track=current_track  # From session context
    )

    # 2. Get recommendations for each task
    recommendations = []
    for task in ready_tasks:
        agents = orchestrator_client.get_agent_recommendations(
            task.task_id, top=2
        )
        recommendations.append({
            "task": task,
            "recommended_agents": agents
        })

    # 3. Generate task list for session
    return format_task_list(recommendations)

8.2 Cloud Sync Integration

# In task-plan-sync.py hook
def sync_task_completion(task_id: str, outcome: str) -> None:
    """
    Sync task completion to orchestrator and cloud.
    """
    # 1. Update orchestrator
    orchestrator_client.complete_task(task_id, outcome)

    # 2. Update PILOT plan markdown
    update_pilot_plan_checkbox(task_id, outcome == "success")

    # 3. Sync to cloud backend
    cloud_client.sync_task(task_id, {
        "status": "completed" if outcome == "success" else "failed",
        "outcome": outcome,
        "completed_at": datetime.now().isoformat()
    })

9. Deployment

9.1 Infrastructure Requirements

| Component | Specification |
|-----------|---------------|
| Database | PostgreSQL 15+ with pgvector extension |
| API Server | Python 3.11+ with FastAPI |
| Cache | Redis (optional, for rate limiting) |
| Search | pgvector for semantic search |

9.2 Docker Compose

version: '3.8'
services:
  orchestrator-api:
    build: .
    ports:
      - "8080:8080"
    environment:
      - DATABASE_URL=postgresql://postgres:pass@db:5432/orchestrator
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

  db:
    image: pgvector/pgvector:pg15
    environment:
      POSTGRES_DB: orchestrator
      POSTGRES_PASSWORD: pass
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

volumes:
  pgdata:

10. Appendix

10.1 Task Type Definitions

| Type | Description | Typical Agents |
|------|-------------|----------------|
| implementation | Code creation | senior-architect, backend-specialist |
| documentation | Writing docs | codi-documentation-writer |
| testing | Test creation/execution | testing-specialist |
| refactoring | Code improvement | senior-architect |
| bugfix | Bug resolution | depends on domain |
| review | Code/design review | senior-architect, security-specialist |
| deployment | Release tasks | devops-engineer |
| research | Investigation | depends on domain |

10.2 Status Transitions

| From | To | Trigger |
|------|----|---------|
| pending | ready | All blocking deps completed |
| ready | in_progress | Agent assigned, execution started |
| in_progress | completed | Execution succeeded |
| in_progress | failed | Execution failed |
| failed | ready | Retry requested |
| blocked | pending | Blocker resolved |
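These transitions can be encoded as an allowed-set lookup; a sketch (`can_transition` is an illustrative helper, not a documented API):

```python
# Allowed status transitions from the table above, encoded as a lookup set.
ALLOWED_TRANSITIONS = {
    ("pending", "ready"),
    ("ready", "in_progress"),
    ("in_progress", "completed"),
    ("in_progress", "failed"),
    ("failed", "ready"),
    ("blocked", "pending"),
}

def can_transition(old: str, new: str) -> bool:
    """Return True only for transitions listed in the table."""
    return (old, new) in ALLOWED_TRANSITIONS

assert can_transition("pending", "ready")
assert can_transition("failed", "ready")          # retry path
assert not can_transition("completed", "pending")  # terminal state
```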

Document Version: 1.0.0
Last Updated: January 13, 2026
Author: Claude Opus 4.5
Review Status: Draft