Agent Skills Framework Extension
Orchestration Patterns Skill
When to Use This Skill
Use this skill when implementing orchestration patterns in your codebase.
How to Use This Skill
- Review the patterns and examples below
- Apply the relevant patterns to your implementation
- Follow the best practices outlined in this skill
Multi-agent coordination, task decomposition, workflow planning, and distributed execution management.
Core Capabilities
- Task Decomposition - Break complex tasks into executable units
- Dependency Management - Track and resolve task dependencies
- Parallel Execution - Coordinate concurrent task execution
- Error Handling - Retry logic and graceful degradation
- State Management - Persistent workflow state tracking
Multi-Agent Orchestration Framework
#!/usr/bin/env python3
"""
Multi-agent orchestration framework for coordinating specialized agents.
Implements task decomposition, dependency resolution, and parallel execution.
"""
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
import asyncio
import json
from datetime import datetime
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
BLOCKED = "blocked"
class TaskPriority(Enum):
P0 = 0 # Critical
P1 = 1 # High
P2 = 2 # Medium
P3 = 3 # Low
@dataclass
class Task:
id: str
name: str
description: str
agent: str
priority: TaskPriority
status: TaskStatus = TaskStatus.PENDING
dependencies: List[str] = field(default_factory=list)
subtasks: List['Task'] = field(default_factory=list)
result: Optional[Any] = None
error: Optional[str] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
retries: int = 0
max_retries: int = 3
def is_blocked(self, completed_tasks: set) -> bool:
"""Check if task is blocked by incomplete dependencies."""
return any(dep not in completed_tasks for dep in self.dependencies)
def can_execute(self, completed_tasks: set) -> bool:
"""Check if task can be executed."""
return (
self.status == TaskStatus.PENDING and
not self.is_blocked(completed_tasks)
)
@dataclass
class WorkflowPlan:
name: str
tasks: List[Task]
created_at: datetime = field(default_factory=datetime.utcnow)
completed_at: Optional[datetime] = None
def get_ready_tasks(self, completed_tasks: set) -> List[Task]:
"""Get tasks ready for execution."""
return [
task for task in self.tasks
if task.can_execute(completed_tasks)
]
def get_task_by_id(self, task_id: str) -> Optional[Task]:
"""Find task by ID."""
for task in self.tasks:
if task.id == task_id:
return task
return None
def to_dict(self) -> Dict:
"""Serialize workflow plan to dictionary."""
return {
'name': self.name,
'created_at': self.created_at.isoformat(),
'completed_at': self.completed_at.isoformat() if self.completed_at else None,
'tasks': [
{
'id': t.id,
'name': t.name,
'description': t.description,
'agent': t.agent,
'priority': t.priority.name,
'status': t.status.value,
'dependencies': t.dependencies,
                    'result': str(t.result) if t.result is not None else None,
'error': t.error
}
for t in self.tasks
]
}
class Orchestrator:
def __init__(self):
self.agents: Dict[str, Callable] = {}
self.workflows: List[WorkflowPlan] = []
def register_agent(self, name: str, handler: Callable):
"""Register an agent handler function."""
self.agents[name] = handler
def decompose_task(self, goal: str) -> WorkflowPlan:
"""
Decompose high-level goal into executable task graph.
This is a simplified example - production would use LLM-based planning.
"""
tasks = []
# Example: Deploy application workflow
if "deploy application" in goal.lower():
tasks = [
Task(
id="task-1",
name="Run Tests",
description="Execute test suite to ensure code quality",
agent="qa-specialist",
priority=TaskPriority.P0,
dependencies=[]
),
Task(
id="task-2",
name="Build Container",
description="Build Docker container image",
agent="devops-engineer",
priority=TaskPriority.P0,
dependencies=["task-1"]
),
Task(
id="task-3",
name="Security Scan",
description="Scan container for vulnerabilities",
agent="security-specialist",
priority=TaskPriority.P0,
dependencies=["task-2"]
),
Task(
id="task-4",
name="Deploy to Staging",
description="Deploy to staging environment",
agent="devops-engineer",
priority=TaskPriority.P1,
dependencies=["task-3"]
),
Task(
id="task-5",
name="Integration Tests",
description="Run integration tests in staging",
agent="qa-specialist",
priority=TaskPriority.P1,
dependencies=["task-4"]
),
Task(
id="task-6",
name="Deploy to Production",
description="Deploy to production environment",
agent="devops-engineer",
priority=TaskPriority.P0,
dependencies=["task-5"]
)
]
workflow = WorkflowPlan(name=goal, tasks=tasks)
self.workflows.append(workflow)
return workflow
async def execute_task(self, task: Task) -> bool:
"""Execute a single task using assigned agent."""
if task.agent not in self.agents:
task.status = TaskStatus.FAILED
task.error = f"Agent '{task.agent}' not registered"
return False
task.status = TaskStatus.RUNNING
task.started_at = datetime.utcnow()
try:
agent_handler = self.agents[task.agent]
result = await agent_handler(task)
task.result = result
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.utcnow()
return True
except Exception as e:
task.error = str(e)
task.retries += 1
if task.retries < task.max_retries:
task.status = TaskStatus.PENDING
print(f"Task {task.id} failed, retrying ({task.retries}/{task.max_retries})")
return False
else:
task.status = TaskStatus.FAILED
print(f"Task {task.id} failed permanently after {task.retries} retries")
return False
async def execute_workflow(self, workflow: WorkflowPlan) -> Dict:
"""
Execute workflow with parallel task execution.
Returns execution report.
"""
completed_tasks = set()
failed_tasks = set()
print(f"\n🚀 Starting workflow: {workflow.name}")
print(f"📋 Total tasks: {len(workflow.tasks)}\n")
while len(completed_tasks) + len(failed_tasks) < len(workflow.tasks):
# Get tasks ready for execution
ready_tasks = workflow.get_ready_tasks(completed_tasks)
if not ready_tasks:
# Check if all remaining tasks are blocked by failures
remaining = [t for t in workflow.tasks if t.id not in completed_tasks and t.id not in failed_tasks]
if all(t.is_blocked(completed_tasks) for t in remaining):
print("\n⚠️ Workflow blocked - dependencies failed")
break
# No tasks ready but workflow not blocked - wait
await asyncio.sleep(0.1)
continue
# Sort by priority
ready_tasks.sort(key=lambda t: t.priority.value)
# Execute tasks in parallel
execution_results = await asyncio.gather(
*[self.execute_task(task) for task in ready_tasks],
return_exceptions=True
)
            # Update completed/failed sets. gather(return_exceptions=True) can
            # yield exception objects, which are truthy, so check for an
            # explicit True rather than relying on truthiness.
            for task, success in zip(ready_tasks, execution_results):
                if success is True:
                    completed_tasks.add(task.id)
                    print(f"✅ Completed: {task.name}")
                elif task.status == TaskStatus.FAILED:
                    failed_tasks.add(task.id)
                    print(f"❌ Failed: {task.name} - {task.error}")
workflow.completed_at = datetime.utcnow()
# Generate report
report = {
'workflow': workflow.name,
'total_tasks': len(workflow.tasks),
'completed': len(completed_tasks),
'failed': len(failed_tasks),
'success_rate': (len(completed_tasks) / len(workflow.tasks)) * 100,
'duration': (workflow.completed_at - workflow.created_at).total_seconds(),
'tasks': workflow.to_dict()['tasks']
}
print(f"\n📊 Workflow Summary:")
print(f" Completed: {report['completed']}/{report['total_tasks']}")
print(f" Failed: {report['failed']}")
print(f" Success Rate: {report['success_rate']:.1f}%")
print(f" Duration: {report['duration']:.1f}s")
return report
# Example agent handlers
async def qa_specialist_handler(task: Task) -> str:
"""Example QA specialist agent."""
await asyncio.sleep(1) # Simulate work
if "test" in task.name.lower():
return "All tests passed"
return "QA complete"
async def devops_engineer_handler(task: Task) -> str:
"""Example DevOps engineer agent."""
await asyncio.sleep(1) # Simulate work
if "deploy" in task.name.lower():
return "Deployment successful"
elif "build" in task.name.lower():
return "Build complete"
return "DevOps task complete"
async def security_specialist_handler(task: Task) -> str:
"""Example security specialist agent."""
await asyncio.sleep(1) # Simulate work
if "scan" in task.name.lower():
return "No vulnerabilities found"
return "Security check passed"
# Usage example
async def main():
orchestrator = Orchestrator()
# Register agents
orchestrator.register_agent("qa-specialist", qa_specialist_handler)
orchestrator.register_agent("devops-engineer", devops_engineer_handler)
orchestrator.register_agent("security-specialist", security_specialist_handler)
# Decompose and execute workflow
workflow = orchestrator.decompose_task("Deploy application to production")
report = await orchestrator.execute_workflow(workflow)
# Save report
with open('workflow-report.json', 'w') as f:
json.dump(report, f, indent=2)
if __name__ == '__main__':
asyncio.run(main())
Temporal Workflow Pattern
#!/usr/bin/env python3
"""
Temporal workflow for durable orchestration with retries and compensation.
Requires: pip install temporalio
"""
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.exceptions import ApplicationError
from temporalio.worker import Worker
from datetime import timedelta
from dataclasses import dataclass
@dataclass
class DeploymentConfig:
app_name: str
version: str
environment: str
rollback_on_failure: bool = True
@activity.defn
async def run_tests() -> str:
"""Run automated tests."""
# Actual test execution
return "Tests passed"
@activity.defn
async def build_container(config: DeploymentConfig) -> str:
"""Build Docker container."""
# Actual build logic
return f"Built {config.app_name}:{config.version}"
@activity.defn
async def deploy_to_environment(config: DeploymentConfig) -> str:
"""Deploy to specified environment."""
# Actual deployment logic
return f"Deployed to {config.environment}"
@activity.defn
async def rollback_deployment(config: DeploymentConfig) -> str:
"""Rollback failed deployment."""
# Actual rollback logic
return f"Rolled back {config.app_name} in {config.environment}"
@workflow.defn
class DeploymentWorkflow:
@workflow.run
async def run(self, config: DeploymentConfig) -> str:
"""
Deployment workflow with automatic rollback on failure.
Temporal handles retries, state persistence, and compensation.
"""
# Step 1: Run tests
test_result = await workflow.execute_activity(
run_tests,
start_to_close_timeout=timedelta(minutes=10),
            retry_policy=RetryPolicy(maximum_attempts=3)
)
# Step 2: Build container
build_result = await workflow.execute_activity(
build_container,
config,
start_to_close_timeout=timedelta(minutes=15),
            retry_policy=RetryPolicy(maximum_attempts=2)
)
# Step 3: Deploy with compensation on failure
try:
deploy_result = await workflow.execute_activity(
deploy_to_environment,
config,
start_to_close_timeout=timedelta(minutes=30),
                retry_policy=RetryPolicy(maximum_attempts=3)
)
return f"Deployment successful: {deploy_result}"
except Exception as e:
# Compensation: rollback on failure
if config.rollback_on_failure:
await workflow.execute_activity(
rollback_deployment,
config,
start_to_close_timeout=timedelta(minutes=10)
)
            raise ApplicationError(
                f"Deployment failed and rolled back: {e}"
            )
# Usage (a Worker polling "deployment-queue" must be running for the workflow
# to execute; workflows sit in the task queue until a worker picks them up)
async def run_deployment():
client = await Client.connect("localhost:7233")
config = DeploymentConfig(
app_name="my-app",
version="1.2.3",
environment="production"
)
result = await client.execute_workflow(
DeploymentWorkflow.run,
config,
id=f"deployment-{config.app_name}-{config.version}",
task_queue="deployment-queue"
)
print(result)
DAG-Based Workflow
#!/usr/bin/env python3
"""
DAG-based workflow for Airflow-style task orchestration.
Supports complex dependencies and conditional execution.
"""
from typing import List, Callable, Dict, Any
from dataclasses import dataclass, field
import networkx as nx
@dataclass
class DAGTask:
    id: str
    func: Callable
    upstream_tasks: List[str] = field(default_factory=list)
class DAG:
def __init__(self, dag_id: str):
self.dag_id = dag_id
self.tasks: Dict[str, DAGTask] = {}
self.graph = nx.DiGraph()
def add_task(self, task: DAGTask):
"""Add task to DAG."""
self.tasks[task.id] = task
self.graph.add_node(task.id)
for upstream in task.upstream_tasks:
self.graph.add_edge(upstream, task.id)
def validate(self) -> bool:
"""Validate DAG has no cycles."""
try:
nx.find_cycle(self.graph)
raise ValueError("DAG contains cycles")
except nx.NetworkXNoCycle:
return True
def get_execution_order(self) -> List[List[str]]:
"""Get tasks grouped by execution level (for parallelization)."""
return list(nx.topological_generations(self.graph))
    def execute(self, context: Dict[str, Any] = None) -> Dict:
        """Execute DAG level by level. Tasks within a level are independent
        and could run concurrently; this sketch runs them in order."""
if not self.validate():
raise ValueError("Invalid DAG")
context = context or {}
results = {}
for level in self.get_execution_order():
print(f"Executing level: {level}")
for task_id in level:
task = self.tasks[task_id]
# Prepare task context
task_context = {
**context,
'task_id': task_id,
'upstream_results': {
upstream: results.get(upstream)
for upstream in task.upstream_tasks
}
}
# Execute task
result = task.func(**task_context)
results[task_id] = result
return results
# Example DAG
def create_deployment_dag() -> DAG:
dag = DAG("deployment_dag")
def run_tests(**context):
print("Running tests...")
return {"status": "passed"}
def build_app(**context):
print("Building application...")
return {"artifact": "app:v1.0"}
def deploy_staging(**context):
artifact = context['upstream_results']['build']['artifact']
print(f"Deploying {artifact} to staging...")
return {"url": "https://staging.example.com"}
def deploy_prod(**context):
artifact = context['upstream_results']['build']['artifact']
print(f"Deploying {artifact} to production...")
return {"url": "https://example.com"}
# Define tasks
dag.add_task(DAGTask("test", run_tests))
dag.add_task(DAGTask("build", build_app, ["test"]))
dag.add_task(DAGTask("staging", deploy_staging, ["build"]))
dag.add_task(DAGTask("prod", deploy_prod, ["staging"]))
return dag
# Usage
if __name__ == '__main__':
dag = create_deployment_dag()
results = dag.execute()
print(f"\nResults: {results}")
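The `execute` method above runs tasks within a level one at a time. Because tasks in the same topological level are independent by construction, each level can be dispatched concurrently. A minimal sketch using a thread pool, with precomputed levels and plain callables standing in for `DAGTask` (the task functions and graph here are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

def execute_levels_parallel(levels, funcs):
    """Run each level's tasks concurrently; levels still run in order."""
    results = {}
    with ThreadPoolExecutor() as pool:
        for level in levels:
            # Submit every task in this level, then wait for all of them.
            # Each task receives a snapshot of upstream results.
            futures = {task_id: pool.submit(funcs[task_id], results.copy())
                       for task_id in level}
            for task_id, future in futures.items():
                results[task_id] = future.result()  # re-raises task errors
    return results

# Hypothetical two-level graph: "a" and "b" are independent, "c" needs both
funcs = {
    "a": lambda upstream: 1,
    "b": lambda upstream: 2,
    "c": lambda upstream: upstream["a"] + upstream["b"],
}
levels = [["a", "b"], ["c"]]
print(execute_levels_parallel(levels, funcs))  # {'a': 1, 'b': 2, 'c': 3}
```

The levels list is exactly what `get_execution_order()` returns, so this slots in as a drop-in replacement for the sequential inner loop.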
Success Output
When successful, this skill MUST output:
✅ SKILL COMPLETE: orchestration-patterns
Completed:
- [x] Task graph created with dependencies resolved
- [x] Agent handlers registered
- [x] Workflow executed with parallel processing
- [x] Execution report generated
Outputs:
- WorkflowPlan with all tasks defined
- Execution report (completed/failed/success_rate)
- Task execution logs with timestamps
- Dependency graph visualization
- Performance metrics (duration, throughput)
Completion Checklist
Before marking this skill as complete, verify:
- All tasks have clear dependencies listed
- No circular dependencies in task graph
- Agent handlers registered for all task types
- Retry logic configured with max attempts
- Error handling strategies defined (continue/abort/escalate)
- State persistence implemented (file system or database)
- Parallel execution working for independent tasks
- Execution report includes success/failure metrics
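For the state-persistence item above, writing the serialized plan to disk after every status change is often enough, provided the write is atomic so a crash never leaves a half-written file. A minimal sketch, assuming a dict shaped like the output of `WorkflowPlan.to_dict()` (file names are illustrative):

```python
import json
import os
import tempfile

def save_state(state: dict, path: str) -> None:
    """Write state atomically: write to a temp file, then rename over the target."""
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, path)  # atomic rename on POSIX and Windows
    except BaseException:
        os.unlink(tmp_path)
        raise

def load_state(path: str) -> dict:
    with open(path) as f:
        return json.load(f)

state = {"name": "deploy", "tasks": [{"id": "task-1", "status": "completed"}]}
save_state(state, "workflow-state.json")
assert load_state(state_path := "workflow-state.json") == state
```

On restart, loading the last saved state and skipping tasks already marked `completed` gives crash recovery without any external infrastructure.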
Failure Indicators
This skill has FAILED if:
- ❌ Circular dependencies detected in task graph
- ❌ Task execution blocked indefinitely
- ❌ Agent handler missing for task type
- ❌ No retry logic for transient failures
- ❌ Workflow state lost on interruption
- ❌ Parallel tasks causing race conditions
- ❌ No error recovery mechanism
- ❌ Execution report incomplete or missing
When NOT to Use
Do NOT use this skill when:
- Single task with no dependencies (use direct execution instead)
- All tasks are sequential (simple script better)
- No inter-task communication needed (run independently)
- Task count < 3 (overhead not justified)
- Real-time requirements (orchestration adds latency)
- No failure recovery needed (simple batch processing)
Use alternative approaches when:
- Single task → Direct function call
- Sequential only → Bash script with &&
- Independent tasks → Parallel batch script
- Simple workflow → Task queue (Celery/RQ)
- Event-driven → Pub/sub system
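To make the "sequential only" alternative concrete: no framework is needed, just commands run in order that stop at the first failure. In Python, the `&&`-style short-circuit is `subprocess.run(..., check=True)` per step (the commands below are illustrative stand-ins for real build/test/deploy steps):

```python
import subprocess
import sys

def run_sequential(commands):
    """Run commands in order; check=True raises CalledProcessError on the
    first non-zero exit, the same short-circuit behavior as && in a shell."""
    for cmd in commands:
        subprocess.run(cmd, check=True)

# Illustrative pipeline: each step is a tiny Python one-liner standing in
# for a real command like pytest or docker build.
run_sequential([
    [sys.executable, "-c", "print('tests passed')"],
    [sys.executable, "-c", "print('build complete')"],
])
```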
Anti-Patterns (Avoid)
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Monolithic task definitions | Hard to debug, no reusability | Break into discrete, idempotent tasks |
| Missing dependency declarations | Execution order undefined | Explicitly list all dependencies |
| No retry logic | Transient failures = permanent | Add retry with exponential backoff |
| In-memory state only | Lost on crash | Persist workflow state to disk/DB |
| Synchronous execution | Slow, no parallelization | Use async/await for concurrent execution |
| Ignoring task failures | Silent errors propagate | Fail fast or escalate based on criticality |
| Hardcoded agent selection | Inflexible routing | Use agent discovery/registry pattern |
| No timeout enforcement | Tasks hang indefinitely | Set timeouts per task priority |
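Two of the fixes above, retry with exponential backoff and per-task timeout enforcement, compose naturally in asyncio: wrap each attempt in `asyncio.wait_for` and sleep a doubling delay between attempts. A minimal sketch (delay and timeout values are illustrative, and the exceptions caught should match your transient-failure types):

```python
import asyncio

async def run_with_retry(coro_factory, max_attempts=3, base_delay=0.01, timeout=5.0):
    """Retry a coroutine with exponential backoff and a per-attempt timeout."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await asyncio.wait_for(coro_factory(), timeout=timeout)
        except (asyncio.TimeoutError, RuntimeError):
            if attempt == max_attempts:
                raise
            # Back off: base_delay, 2x, 4x, ... before the next attempt
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))

# Demo: a task that fails twice, then succeeds on the third attempt.
attempts = {"n": 0}

async def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(asyncio.run(run_with_retry(flaky)))  # ok
```

This helper could wrap the `agent_handler(task)` call in `Orchestrator.execute_task` above, replacing the manual retry counter with backoff and a hard timeout per attempt.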
Principles
This skill embodies these CODITECT principles:
#3 Separation of Concerns
- Each task = one responsibility
- Agent handlers isolated by domain
- Orchestrator separate from execution
#4 Modularity and Reusability
- Task definitions reusable across workflows
- Agent handlers composable
- Workflow patterns (Temporal, DAG) pluggable
#7 Incremental Improvement
- Start with simple orchestrator
- Add Temporal for durability
- Add DAG for complex dependencies
Automation (CODITECT-STANDARD-AUTOMATION.md)
- Automatic dependency resolution
- Self-healing with retry logic
- State persistence for recovery
Usage Examples
Multi-Agent Workflow Orchestration
Apply orchestration-patterns skill to coordinate multi-agent workflow for complex task execution with dependency management
Temporal Durable Workflow
Apply orchestration-patterns skill to implement Temporal workflow with automatic retries and rollback compensation
DAG-Based Task Execution
Apply orchestration-patterns skill to create Airflow-style DAG with parallel task execution and dependency resolution
Integration Points
- git-workflow-patterns - Coordinate git operations across agents
- project-organization-patterns - Structure orchestration artifacts
- cicd-pipeline-design - Integrate with CI/CD workflows