Agentic Orchestrator Skill

How to Use This Skill

  1. Review the patterns and examples below
  2. Apply the relevant patterns to your implementation
  3. Follow the best practices outlined in this skill

Expert skill for coordinating multiple specialized agents to complete complex workflows. Provides patterns for task decomposition, parallel execution, state management, and result synthesis.

When to Use

Use this skill when:

  • Task requires multiple specialized agents
  • Work can be parallelized across agents
  • Need checkpoint/resume capabilities
  • Quality gates between phases are required
  • Results from multiple agents need synthesis
  • Deliverable is a strategy brief, research report, or complex implementation

Don't use this skill when:

  • Single agent can complete the task
  • Simple sequential workflow suffices
  • No inter-agent dependencies exist
  • Task is research-only without synthesis

Core Patterns

1. Wave Execution Architecture

Execute agents in waves - parallel within waves, sequential between waves.

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable, Awaitable
from enum import Enum
import asyncio
import json
import os
from datetime import datetime, timezone


class ExecutionStatus(Enum):
    """Agent execution status"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class AgentTask:
    """Task for a single agent"""
    agent_name: str
    prompt: str
    depends_on: List[str] = field(default_factory=list)
    max_tokens: int = 8000
    priority: int = 0
    timeout: float = 300.0


@dataclass
class AgentResult:
    """Result from agent execution"""
    agent_name: str
    status: ExecutionStatus
    output: Optional[str] = None
    error: Optional[str] = None
    tokens_used: int = 0
    duration_seconds: float = 0.0


class WaveOrchestrator:
    """
    Execute agents in waves with dependency resolution.

    Wave 1: Independent research agents (parallel)
    Wave 2: Framework/analysis agents (depend on Wave 1)
    Wave 3: Synthesis agent (depends on all)
    """

    def __init__(self, max_parallel: int = 5):
        self.max_parallel = max_parallel
        self.results: Dict[str, AgentResult] = {}
        self.state_file = ".coditect/checkpoints/orchestration-state.json"

    def plan_waves(self, tasks: List[AgentTask]) -> List[List[AgentTask]]:
        """
        Organize tasks into waves based on dependencies.

        Returns list of waves, each wave contains tasks that can run in parallel.
        """
        waves = []
        completed = set()
        remaining = list(tasks)

        while remaining:
            # Find tasks with all dependencies satisfied
            ready = [
                t for t in remaining
                if all(d in completed for d in t.depends_on)
            ]

            if not ready:
                # Circular dependency or missing task
                raise ValueError(
                    f"Cannot schedule remaining tasks: {[t.agent_name for t in remaining]}"
                )

            # Sort by priority within wave
            ready.sort(key=lambda t: t.priority, reverse=True)
            waves.append(ready)

            # Update tracking
            for task in ready:
                completed.add(task.agent_name)
                remaining.remove(task)

        return waves

    async def execute_wave(
        self,
        wave: List[AgentTask],
        executor: Callable[[AgentTask], Awaitable[AgentResult]]
    ) -> Dict[str, AgentResult]:
        """Execute all tasks in a wave in parallel."""
        semaphore = asyncio.Semaphore(self.max_parallel)

        async def bounded_execute(task: AgentTask) -> AgentResult:
            async with semaphore:
                # Enforce the per-task timeout so a hung agent cannot stall the wave
                return await asyncio.wait_for(executor(task), timeout=task.timeout)

        # Execute all tasks in parallel
        coros = [bounded_execute(task) for task in wave]
        results = await asyncio.gather(*coros, return_exceptions=True)

        # Process results
        wave_results = {}
        for task, result in zip(wave, results):
            if isinstance(result, Exception):
                wave_results[task.agent_name] = AgentResult(
                    agent_name=task.agent_name,
                    status=ExecutionStatus.FAILED,
                    # Timeouts have an empty message, so fall back to the class name
                    error=str(result) or type(result).__name__
                )
            else:
                wave_results[task.agent_name] = result

        return wave_results

    async def execute_all(
        self,
        tasks: List[AgentTask],
        executor: Callable[[AgentTask], Awaitable[AgentResult]],
        checkpoint_after_wave: bool = True
    ) -> Dict[str, AgentResult]:
        """
        Execute all tasks in waves.

        Args:
            tasks: List of agent tasks to execute
            executor: Async function that executes a single task
            checkpoint_after_wave: Save state after each wave

        Returns:
            Dictionary of agent name -> result
        """
        waves = self.plan_waves(tasks)
        all_results = {}

        for wave_num, wave in enumerate(waves, 1):
            print(f"Executing Wave {wave_num}/{len(waves)}: "
                  f"{[t.agent_name for t in wave]}")

            # Execute wave
            wave_results = await self.execute_wave(wave, executor)
            all_results.update(wave_results)
            self.results.update(wave_results)

            # Check for failures
            failures = [
                name for name, result in wave_results.items()
                if result.status == ExecutionStatus.FAILED
            ]

            if failures:
                print(f"Wave {wave_num} had failures: {failures}")
                # Continue with successful results for now

            # Checkpoint
            if checkpoint_after_wave:
                self.save_checkpoint(wave_num)

        return all_results

    def save_checkpoint(self, wave_num: int):
        """Save current state for resume capability."""
        checkpoint = {
            "wave_completed": wave_num,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "results": {
                name: {
                    "status": result.status.value,
                    # Kept so a resumed run can pass prior results to dependent agents
                    "output": result.output,
                    "output_length": len(result.output or ""),
                    "tokens_used": result.tokens_used,
                    "error": result.error
                }
                for name, result in self.results.items()
            }
        }

        # Create the checkpoint directory on first use
        os.makedirs(os.path.dirname(self.state_file), exist_ok=True)
        with open(self.state_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self) -> Optional[Dict]:
        """Load previous checkpoint for resume."""
        try:
            with open(self.state_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return None
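
To make the wave plan concrete, the sketch below reproduces the layering logic of plan_waves on a plain dictionary of dependencies, without priorities or agent execution. The helper name layer_into_waves and the strategy_brief dictionary are illustrative assumptions, not part of the skill.

```python
from typing import Dict, List


def layer_into_waves(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group task names into waves; a task joins the first wave in which
    all of its dependencies have already completed."""
    waves: List[List[str]] = []
    completed: set = set()
    remaining = dict(deps)

    while remaining:
        ready = [name for name, needs in remaining.items()
                 if all(d in completed for d in needs)]
        if not ready:
            # Same failure mode as plan_waves: a cycle or an unknown dependency
            raise ValueError(f"Cannot schedule remaining tasks: {sorted(remaining)}")
        waves.append(ready)
        completed.update(ready)
        for name in ready:
            del remaining[name]
    return waves


# Hypothetical dependency map mirroring the strategy brief workflow
strategy_brief = {
    "market-researcher": [],
    "competitive-analyst": [],
    "trend-analyst": [],
    "framework-specialist": ["market-researcher", "competitive-analyst"],
    "synthesis-writer": ["market-researcher", "competitive-analyst",
                         "trend-analyst", "framework-specialist"],
}

for number, wave in enumerate(layer_into_waves(strategy_brief), 1):
    print(f"Wave {number}: {wave}")
```

With this input the three research agents land in wave 1, framework-specialist in wave 2, and synthesis-writer in wave 3.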

2. Task Decomposition

Break complex tasks into agent-sized subtasks.

from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum


class TaskCategory(Enum):
    """Categories for task routing"""
    RESEARCH = "research"
    ANALYSIS = "analysis"
    IMPLEMENTATION = "implementation"
    SYNTHESIS = "synthesis"
    VALIDATION = "validation"


@dataclass
class DecomposedTask:
    """A decomposed subtask"""
    id: str
    category: TaskCategory
    description: str
    agent_type: str
    prompt: str
    depends_on: List[str]
    priority: int
    estimated_tokens: int


class TaskDecomposer:
    """
    Decompose complex tasks into orchestratable subtasks.

    Uses capability matching to route to appropriate agents.
    """

    # Agent capabilities mapping
    AGENT_CAPABILITIES = {
        "market-researcher": {
            "categories": [TaskCategory.RESEARCH],
            "keywords": ["market", "size", "growth", "segment", "demand"],
            "max_tokens": 8000
        },
        "competitive-analyst": {
            "categories": [TaskCategory.RESEARCH, TaskCategory.ANALYSIS],
            "keywords": ["competitor", "competitive", "moat", "positioning"],
            "max_tokens": 10000
        },
        "trend-analyst": {
            "categories": [TaskCategory.RESEARCH, TaskCategory.ANALYSIS],
            "keywords": ["trend", "emerging", "disruption", "technology"],
            "max_tokens": 8000
        },
        "framework-specialist": {
            "categories": [TaskCategory.ANALYSIS],
            "keywords": ["framework", "porter", "swot", "strategic"],
            "max_tokens": 6000
        },
        "synthesis-writer": {
            "categories": [TaskCategory.SYNTHESIS],
            "keywords": ["synthesis", "brief", "report", "recommendation"],
            "max_tokens": 12000
        }
    }

    def decompose(
        self,
        task_description: str,
        context: Dict[str, Any]
    ) -> List[DecomposedTask]:
        """
        Decompose a task into subtasks for different agents.

        Args:
            task_description: High-level task description
            context: Context like industry, geography, etc.

        Returns:
            List of decomposed subtasks
        """
        subtasks = []
        task_lower = task_description.lower()

        # Determine required capabilities
        required_agents = []
        for agent, caps in self.AGENT_CAPABILITIES.items():
            if any(kw in task_lower for kw in caps["keywords"]):
                required_agents.append(agent)

        # Default to full research workflow if strategy brief
        if "strategy" in task_lower or "brief" in task_lower:
            required_agents = list(self.AGENT_CAPABILITIES.keys())

        # Create subtasks
        priority = len(required_agents)
        for agent in required_agents:
            caps = self.AGENT_CAPABILITIES[agent]

            # Determine dependencies
            if agent == "synthesis-writer":
                deps = [a for a in required_agents if a != "synthesis-writer"]
            elif agent == "framework-specialist":
                deps = ["market-researcher", "competitive-analyst"]
                deps = [d for d in deps if d in required_agents]
            else:
                deps = []

            subtask = DecomposedTask(
                id=f"task_{agent}",
                category=caps["categories"][0],
                description=f"{agent} analysis for {context.get('industry', 'target')}",
                agent_type=agent,
                prompt=self._generate_prompt(agent, context),
                depends_on=[f"task_{d}" for d in deps],
                priority=priority,
                estimated_tokens=caps["max_tokens"]
            )
            subtasks.append(subtask)
            priority -= 1

        return subtasks

    def _generate_prompt(
        self,
        agent_type: str,
        context: Dict[str, Any]
    ) -> str:
        """Generate agent-specific prompt."""
        industry = context.get("industry", "the target industry")
        geography = context.get("geography", "Global")

        prompts = {
            "market-researcher": f"""
Research the {industry} market in {geography}:
- Market size and growth rate (with sources)
- Key segments and characteristics
- Demand drivers and trends
- Value chain structure

Return findings in structured JSON format with sources and confidence scores.
""",
            "competitive-analyst": f"""
Analyze top competitors in {industry}:
- Market leaders with revenue/share
- Business models and positioning
- Strengths, weaknesses, moats
- Recent strategic moves

Return competitor profiles in structured JSON format.
""",
            "trend-analyst": f"""
Identify key trends affecting {industry}:
- Technology trends and disruption
- Regulatory changes
- Consumer behavior shifts
- Emerging opportunities and threats

Return trend analysis with timeline and impact assessment.
""",
            "framework-specialist": f"""
Apply strategic frameworks to {industry}:
- Porter's Five Forces analysis
- SWOT for a new entrant
- Value chain analysis
- Strategic group mapping

Use insights from research and competitive analysis.
""",
            "synthesis-writer": f"""
Synthesize all research into executive strategy brief for {industry}:
- Executive summary (300 words max)
- Key findings and insights
- Strategic recommendations (prioritized)
- Risk assessment
- Implementation roadmap

Combine all agent outputs into cohesive narrative.
"""
        }

        return prompts.get(agent_type, f"Analyze {industry}")
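
The decomposer identifies subtasks by id ("task_<agent>"), while the wave orchestrator resolves dependencies by agent name, so the two need a small bridge. The sketch below shows one way to do that; to_agent_tasks is a hypothetical helper, and both dataclasses are trimmed mirrors of the ones defined in this skill so that the block runs on its own.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class DecomposedTask:  # trimmed mirror of the Task Decomposition dataclass
    id: str
    agent_type: str
    prompt: str
    depends_on: List[str]
    priority: int
    estimated_tokens: int


@dataclass
class AgentTask:  # trimmed mirror of the Wave Execution dataclass
    agent_name: str
    prompt: str
    depends_on: List[str] = field(default_factory=list)
    max_tokens: int = 8000
    priority: int = 0


def to_agent_tasks(subtasks: List[DecomposedTask]) -> List[AgentTask]:
    """Translate subtask ids back to agent names for the orchestrator."""
    id_to_agent = {s.id: s.agent_type for s in subtasks}
    return [
        AgentTask(
            agent_name=s.agent_type,
            prompt=s.prompt,
            # A KeyError here means a subtask depends on an id that was never created
            depends_on=[id_to_agent[d] for d in s.depends_on],
            max_tokens=s.estimated_tokens,
            priority=s.priority,
        )
        for s in subtasks
    ]
```

The resulting list can be passed to plan_waves unchanged, because every depends_on entry now matches an agent_name.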

3. Result Synthesis

Combine outputs from multiple agents into a unified result.

from dataclasses import dataclass
from typing import List, Dict, Any


@dataclass
class SynthesisInput:
    """Input for synthesis from an agent"""
    agent_name: str
    category: str
    content: str
    confidence: float
    sources: List[str]


@dataclass
class SynthesizedResult:
    """Final synthesized output"""
    executive_summary: str
    detailed_findings: Dict[str, Any]
    recommendations: List[Dict[str, Any]]
    sources: List[str]
    confidence_score: float
    quality_score: float


class ResultSynthesizer:
    """
    Synthesize results from multiple agents into cohesive output.

    Groups findings by category, aggregates and deduplicates sources,
    and computes confidence and quality scores.
    """

    def synthesize(
        self,
        inputs: List[SynthesisInput],
        context: Dict[str, Any]
    ) -> SynthesizedResult:
        """
        Synthesize multiple agent outputs.

        Args:
            inputs: Results from individual agents
            context: Original task context

        Returns:
            Synthesized result with quality metrics
        """
        # Aggregate by category
        by_category: Dict[str, List[SynthesisInput]] = {}
        all_sources = []
        confidences = []

        for inp in inputs:
            by_category.setdefault(inp.category, []).append(inp)
            all_sources.extend(inp.sources)
            confidences.append(inp.confidence)

        # Deduplicate sources, preserving first-seen order
        unique_sources = list(dict.fromkeys(all_sources))

        # Calculate overall confidence
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0.5

        # Build findings structure
        findings = {}
        for category, category_inputs in by_category.items():
            findings[category] = {
                "content": [i.content for i in category_inputs],
                "agent_count": len(category_inputs),
                "avg_confidence": sum(i.confidence for i in category_inputs) / len(category_inputs)
            }

        # Generate executive summary
        exec_summary = self._generate_executive_summary(findings, context)

        # Extract recommendations
        recommendations = self._extract_recommendations(inputs)

        # Calculate quality score
        quality = self._calculate_quality(
            inputs=inputs,
            unique_sources=unique_sources,
            recommendations=recommendations
        )

        return SynthesizedResult(
            executive_summary=exec_summary,
            detailed_findings=findings,
            recommendations=recommendations,
            sources=unique_sources,
            confidence_score=avg_confidence,
            quality_score=quality
        )

    def _generate_executive_summary(
        self,
        findings: Dict[str, Any],
        context: Dict[str, Any]
    ) -> str:
        """Generate executive summary from findings."""
        industry = context.get("industry", "the market")

        summary_parts = [
            f"## Executive Summary: {industry}\n"
        ]

        if "research" in findings:
            summary_parts.append(
                f"**Market Overview:** Based on {findings['research']['agent_count']} "
                f"research agents with {findings['research']['avg_confidence']:.0%} confidence.\n"
            )

        if "analysis" in findings:
            summary_parts.append(
                f"**Analysis:** Strategic frameworks applied by "
                f"{findings['analysis']['agent_count']} analysts.\n"
            )

        return "\n".join(summary_parts)

    def _extract_recommendations(
        self,
        inputs: List[SynthesisInput]
    ) -> List[Dict[str, Any]]:
        """Extract and prioritize recommendations."""
        recommendations = []
        priority = 1

        for inp in inputs:
            # Look for recommendation patterns in content
            if "recommend" in inp.content.lower():
                recommendations.append({
                    "priority": priority,
                    "source": inp.agent_name,
                    "confidence": inp.confidence,
                    "summary": f"Recommendation from {inp.agent_name}"
                })
                priority += 1

        return recommendations

    def _calculate_quality(
        self,
        inputs: List[SynthesisInput],
        unique_sources: List[str],
        recommendations: List[Dict[str, Any]]
    ) -> float:
        """Calculate quality score (0-1)."""
        score = 0.0
        total_checks = 5

        # Check 1: Multiple agents contributed
        if len(inputs) >= 3:
            score += 1
        elif len(inputs) >= 2:
            score += 0.5

        # Check 2: Sources provided
        if len(unique_sources) >= 10:
            score += 1
        elif len(unique_sources) >= 5:
            score += 0.5

        # Check 3: Recommendations generated
        if len(recommendations) >= 3:
            score += 1
        elif len(recommendations) >= 1:
            score += 0.5

        # Check 4: High confidence
        avg_conf = sum(i.confidence for i in inputs) / len(inputs) if inputs else 0
        if avg_conf >= 0.8:
            score += 1
        elif avg_conf >= 0.6:
            score += 0.5

        # Check 5: Multiple categories covered
        categories = set(i.category for i in inputs)
        if len(categories) >= 3:
            score += 1
        elif len(categories) >= 2:
            score += 0.5

        return score / total_checks
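
The synthesizer expects SynthesisInput objects, but agents return raw text. The sketch below shows one possible conversion, assuming an agent that follows the "structured JSON" instruction returns a top-level object with "confidence" and "sources" keys. Those key names and the helper to_synthesis_input are assumptions; SynthesisInput is a mirror of the dataclass above, repeated so the block runs on its own.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class SynthesisInput:  # mirror of the Result Synthesis dataclass
    agent_name: str
    category: str
    content: str
    confidence: float
    sources: List[str]


def to_synthesis_input(agent_name: str, category: str, output: str) -> SynthesisInput:
    """Build a SynthesisInput from raw agent output, tolerating non-JSON text."""
    confidence, sources = 0.5, []  # neutral defaults when nothing can be parsed
    try:
        payload = json.loads(output)
    except json.JSONDecodeError:
        payload = None

    if isinstance(payload, dict):
        raw_conf = payload.get("confidence", 0.5)
        if isinstance(raw_conf, (int, float)) and not isinstance(raw_conf, bool):
            # Clamp to the 0-1 range the quality score expects
            confidence = min(1.0, max(0.0, float(raw_conf)))
        raw_sources = payload.get("sources", [])
        if isinstance(raw_sources, list):
            sources = [str(s) for s in raw_sources]

    return SynthesisInput(agent_name, category, output, confidence, sources)
```

Falling back to a confidence of 0.5 matches the default that synthesize uses when no confidences are available.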

4. Quality Gates

Enforce quality standards between orchestration phases.

from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Callable
from enum import Enum


class GateDecision(Enum):
    """Quality gate decision"""
    PASS = "pass"
    FAIL = "fail"
    CONDITIONAL = "conditional"


@dataclass
class GateResult:
    """Result of quality gate check"""
    decision: GateDecision
    score: float
    threshold: float
    checks_passed: List[str]
    checks_failed: List[str]
    recommendations: List[str]


class QualityGate:
    """
    Quality gates between orchestration phases.

    Ensures outputs meet standards before proceeding.
    """

    def __init__(
        self,
        name: str,
        threshold: float = 0.75,
        required_checks: Optional[List[str]] = None
    ):
        self.name = name
        self.threshold = threshold
        # Copy so add_check never mutates the caller's list
        self.required_checks = list(required_checks or [])
        self.checks: Dict[str, Callable[[Any], bool]] = {}

    def add_check(
        self,
        name: str,
        check_fn: Callable[[Any], bool],
        required: bool = False
    ):
        """Add a quality check."""
        self.checks[name] = check_fn
        if required and name not in self.required_checks:
            self.required_checks.append(name)

    def evaluate(self, data: Any) -> GateResult:
        """
        Evaluate data against quality gate.

        Args:
            data: Data to evaluate

        Returns:
            GateResult with pass/fail decision
        """
        passed = []
        failed = []
        recommendations = []

        for check_name, check_fn in self.checks.items():
            try:
                if check_fn(data):
                    passed.append(check_name)
                else:
                    failed.append(check_name)
                    if check_name in self.required_checks:
                        recommendations.append(
                            f"Required check '{check_name}' failed - must be fixed"
                        )
            except Exception as e:
                # A check that raises counts as failed
                failed.append(check_name)
                recommendations.append(f"Check '{check_name}' error: {str(e)}")

        # Calculate score
        total = len(passed) + len(failed)
        score = len(passed) / total if total > 0 else 0.0

        # Determine decision
        required_failed = [c for c in failed if c in self.required_checks]

        if required_failed:
            decision = GateDecision.FAIL
            recommendations.append(
                f"Failed required checks: {required_failed}"
            )
        elif score >= self.threshold:
            decision = GateDecision.PASS
        else:
            decision = GateDecision.CONDITIONAL
            recommendations.append(
                f"Score {score:.1%} below threshold {self.threshold:.1%}"
            )

        return GateResult(
            decision=decision,
            score=score,
            threshold=self.threshold,
            checks_passed=passed,
            checks_failed=failed,
            recommendations=recommendations
        )


# Pre-built gates for strategy brief workflow
def create_research_gate() -> QualityGate:
    """Create quality gate for research phase."""
    gate = QualityGate("research", threshold=0.75)

    gate.add_check(
        "has_market_size",
        lambda d: "market" in str(d).lower() and any(
            c in str(d) for c in ["$", "billion", "million", "%"]
        ),
        required=True
    )

    gate.add_check(
        "has_sources",
        lambda d: "source" in str(d).lower() or "http" in str(d).lower()
    )

    gate.add_check(
        "sufficient_length",
        lambda d: len(str(d)) >= 2000
    )

    return gate


def create_synthesis_gate() -> QualityGate:
    """Create quality gate for synthesis phase."""
    gate = QualityGate("synthesis", threshold=0.85)

    gate.add_check(
        "has_executive_summary",
        lambda d: "executive summary" in str(d).lower(),
        required=True
    )

    gate.add_check(
        "has_recommendations",
        lambda d: "recommend" in str(d).lower(),
        required=True
    )

    gate.add_check(
        "has_structure",
        lambda d: str(d).count("##") >= 3
    )

    gate.add_check(
        "appropriate_length",
        lambda d: 5000 <= len(str(d)) <= 50000
    )

    return gate
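
The decision rule in evaluate is easier to reason about in isolation. The sketch below restates it as a pure function (decide is a hypothetical name) and works through the pre-built gates. With three checks and a 0.75 threshold, or four checks and a 0.85 threshold, a single failed optional check already drops the score below the threshold, so these gates return PASS only when every check passes.

```python
from typing import Iterable, Set, Tuple


def decide(passed: Iterable[str], failed: Iterable[str],
           required: Set[str], threshold: float) -> Tuple[str, float]:
    """Return (decision, score) using the same rule as QualityGate.evaluate."""
    passed, failed = list(passed), list(failed)
    total = len(passed) + len(failed)
    score = len(passed) / total if total else 0.0

    if any(name in required for name in failed):
        return "fail", score  # a failed required check always fails the gate
    if score >= threshold:
        return "pass", score
    return "conditional", score  # only optional checks failed, but too many


# Research gate: one optional check fails -> 2/3 is below 0.75
print(decide(["has_market_size", "sufficient_length"], ["has_sources"],
             {"has_market_size"}, 0.75))

# Synthesis gate: one optional check fails -> 3/4 is below 0.85
print(decide(["has_executive_summary", "has_recommendations", "has_structure"],
             ["appropriate_length"],
             {"has_executive_summary", "has_recommendations"}, 0.85))
```

Both calls return a "conditional" decision, which the caller has to handle explicitly because it is neither a pass nor a failure.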

Usage Examples

Strategy Brief Orchestration

import asyncio


async def main():
    # Create orchestrator
    orchestrator = WaveOrchestrator(max_parallel=3)

    # Define tasks
    tasks = [
        AgentTask(
            agent_name="market-researcher",
            prompt="Research AI development tools market...",
            priority=3
        ),
        AgentTask(
            agent_name="competitive-analyst",
            prompt="Analyze top competitors...",
            priority=3
        ),
        AgentTask(
            agent_name="trend-analyst",
            prompt="Identify key trends...",
            priority=3
        ),
        AgentTask(
            agent_name="framework-specialist",
            prompt="Apply strategic frameworks...",
            depends_on=["market-researcher", "competitive-analyst"],
            priority=2
        ),
        AgentTask(
            agent_name="synthesis-writer",
            prompt="Create executive strategy brief...",
            depends_on=["market-researcher", "competitive-analyst",
                        "trend-analyst", "framework-specialist"],
            priority=1
        ),
    ]

    # Execute with checkpointing
    results = await orchestrator.execute_all(
        tasks=tasks,
        executor=execute_agent,  # Your async agent execution function
        checkpoint_after_wave=True
    )

    # Synthesize results
    synthesizer = ResultSynthesizer()
    final = synthesizer.synthesize(
        inputs=[...],  # Convert results to SynthesisInput
        context={"industry": "AI Development Tools"}
    )

    print(f"Quality Score: {final.quality_score:.1%}")


# await is only valid inside a coroutine, so run the workflow through asyncio
asyncio.run(main())

With Quality Gates

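# Create gates
research_gate = create_research_gate()
synthesis_gate = create_synthesis_gate()

# The awaits below must run inside an async function.
# wave1 and synthesis_wave are entries of orchestrator.plan_waves(tasks);
# executor is your async agent execution function.

# After Wave 1 (research)
research_results = await orchestrator.execute_wave(wave1, executor)
# Gate on the agents' output text; str() of the result dict would also
# match agent names such as "market-researcher"
research_text = "\n\n".join(r.output or "" for r in research_results.values())
gate_result = research_gate.evaluate(research_text)

if gate_result.decision == GateDecision.FAIL:
    print(f"Research failed quality gate: {gate_result.recommendations}")
    # Handle failure - retry or abort

# After synthesis
synthesis_result = await orchestrator.execute_wave(synthesis_wave, executor)
synthesis_text = "\n\n".join(r.output or "" for r in synthesis_result.values())
final_gate = synthesis_gate.evaluate(synthesis_text)

if final_gate.decision == GateDecision.PASS:
    print(f"Strategy brief passed all quality checks: {final_gate.score:.1%}")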

Best Practices

DO

  • Plan waves carefully - Minimize wave count, maximize parallelism
  • Use checkpoints - Enable resume after failures
  • Set appropriate timeouts - Prevent hung agents
  • Validate between phases - Catch errors early
  • Track metrics - Monitor token usage and success rates
  • Handle partial failures - Continue with successful results when possible
  • Deduplicate sources - Clean synthesis inputs

DON'T

  • Don't create circular dependencies - plan_waves cannot schedule them and raises ValueError
  • Don't skip quality gates - Garbage in, garbage out
  • Don't ignore failures - Log and handle appropriately
  • Don't over-parallelize - Stay within API limits
  • Don't hardcode prompts - Use templates with context
  • Don't forget synthesis - Raw outputs need integration
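
One possible policy for "handle partial failures" is to skip every task that depends, directly or transitively, on a failed agent and let the rest proceed. The sketch below computes that set from a dependency map. find_skipped is a hypothetical helper, and whether a dependent should be skipped or run on partial inputs is a design choice this skill leaves open.

```python
from typing import Dict, List, Set


def find_skipped(deps: Dict[str, List[str]], failed: Set[str]) -> Set[str]:
    """Return every task that transitively depends on a failed task."""
    skipped: Set[str] = set()
    changed = True
    while changed:  # repeat until no new task gets marked
        changed = False
        for name, needs in deps.items():
            if name in failed or name in skipped:
                continue
            if any(d in failed or d in skipped for d in needs):
                skipped.add(name)
                changed = True
    return skipped


# Hypothetical dependency map mirroring the strategy brief workflow
deps = {
    "market-researcher": [],
    "competitive-analyst": [],
    "trend-analyst": [],
    "framework-specialist": ["market-researcher", "competitive-analyst"],
    "synthesis-writer": ["market-researcher", "competitive-analyst",
                         "trend-analyst", "framework-specialist"],
}

print(sorted(find_skipped(deps, {"market-researcher"})))
```

Tasks returned by find_skipped could be recorded with ExecutionStatus.SKIPPED instead of being executed.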

Configuration Reference

| Parameter | Default | Description |
|---|---|---|
| max_parallel | 5 | Maximum concurrent agents per wave |
| checkpoint_dir | .coditect/checkpoints/ | Checkpoint storage |
| research_threshold | 0.75 | Research quality gate threshold |
| synthesis_threshold | 0.85 | Synthesis quality gate threshold |
| default_timeout | 300s | Agent execution timeout |
| max_retries | 2 | Retries per agent on failure |
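
The code in this skill does not show how max_retries and default_timeout are applied. One possible reading is a wrapper around each agent call, sketched below. run_with_retries, its make_call argument, and the exponential backoff_seconds delay are assumptions for illustration.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_retries(
    make_call: Callable[[], Awaitable[T]],
    max_retries: int = 2,
    timeout: float = 300.0,
    backoff_seconds: float = 1.0,
) -> T:
    """Await make_call(), retrying up to max_retries times after an error or timeout."""
    last_error: Exception = RuntimeError("no attempt made")
    for attempt in range(max_retries + 1):  # first try plus max_retries retries
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except Exception as exc:  # includes asyncio.TimeoutError
            last_error = exc
            if attempt < max_retries:
                # Wait 1x, 2x, 4x ... backoff_seconds before the next attempt
                await asyncio.sleep(backoff_seconds * (2 ** attempt))
    raise last_error
```

A caller would pass a zero-argument function that creates a fresh coroutine on each attempt, for example lambda: execute_agent(task), because a coroutine object cannot be awaited twice.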

Integration with CODITECT

Recommended integration points:

| Component | Usage | Notes |
|---|---|---|
| strategy-brief-generator | Primary orchestrator | Full workflow |
| orchestrator agent | General coordination | Task tool routing |
| workflow-orchestrator | Complex pipelines | State management |
| use-case-analyzer | Intent routing | Capability matching |

Success Metrics

| Metric | Target | Measurement |
|---|---|---|
| Wave efficiency | >80% parallel | Tasks running simultaneously |
| Quality gate pass rate | >90% | First-attempt passes |
| Checkpoint recovery | 100% | Resume from any wave |
| Token efficiency | <50K avg | Per strategy brief |
| End-to-end time | <10 min | Full orchestration |

Multi-Context Window Support

State Tracking (JSON)

{
  "orchestration_id": "orch_20251221_strategy",
  "status": "wave_2_complete",
  "waves_completed": 2,
  "waves_total": 3,
  "results": {
    "market-researcher": {"status": "completed", "tokens": 4521},
    "competitive-analyst": {"status": "completed", "tokens": 6234},
    "trend-analyst": {"status": "completed", "tokens": 3892},
    "framework-specialist": {"status": "completed", "tokens": 4156}
  },
  "quality_scores": {
    "research_gate": 0.87,
    "analysis_gate": 0.82
  },
  "next_action": "Execute synthesis wave"
}
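
As a small illustration, the progress percentage and token totals used in progress notes can be derived from this state document. The sketch below embeds a trimmed copy of the JSON; summarize is a hypothetical helper name.

```python
import json

STATE_JSON = """
{
  "waves_completed": 2,
  "waves_total": 3,
  "results": {
    "market-researcher": {"status": "completed", "tokens": 4521},
    "competitive-analyst": {"status": "completed", "tokens": 6234},
    "trend-analyst": {"status": "completed", "tokens": 3892},
    "framework-specialist": {"status": "completed", "tokens": 4156}
  }
}
"""


def summarize(state: dict) -> dict:
    """Derive progress and token totals from an orchestration state document."""
    percent = round(100 * state["waves_completed"] / state["waves_total"])
    tokens = sum(r["tokens"] for r in state["results"].values())
    return {"percent_complete": percent, "total_tokens": tokens}


summary = summarize(json.loads(STATE_JSON))
print(summary)  # {'percent_complete': 67, 'total_tokens': 18803}
```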

Progress Notes (Markdown)

# Orchestration Progress - Strategy Brief

## Status: Wave 2 Complete (67%)

### Completed
- [x] Wave 1: Research (3 agents, 14,647 tokens)
- [x] Wave 2: Analysis (1 agent, 4,156 tokens)

### Pending
- [ ] Wave 3: Synthesis (1 agent)

### Quality Gates
- Research: 87% ✓
- Analysis: 82% ✓

### Next Steps
1. Execute synthesis-writer with all inputs
2. Validate final output (85% threshold)
3. Generate strategy brief document

Session Recovery

When starting a fresh context window:

  1. Load checkpoint: Read .coditect/checkpoints/orchestration-state.json
  2. Resume from wave: Skip completed waves
  3. Inject prior results: Pass to dependent agents
  4. Continue execution: Complete remaining waves
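
The recovery steps above can be sketched as follows, assuming the checkpoint layout written by save_checkpoint (a "wave_completed" counter and a "results" map with a "status" per agent). remaining_waves and agents_to_rerun are hypothetical helpers, and treating a corrupt checkpoint as "no checkpoint" is one possible choice rather than the skill's stated behavior.

```python
import json
from typing import Dict, List, Optional


def load_checkpoint(path: str) -> Optional[Dict]:
    """Return the checkpoint, or None if it is missing or unreadable."""
    try:
        with open(path, "r") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return None


def remaining_waves(waves: List[List[str]], checkpoint: Optional[Dict]) -> List[List[str]]:
    """Skip the waves that the checkpoint records as finished."""
    if not checkpoint:
        return waves  # nothing to resume from, run everything
    return waves[checkpoint.get("wave_completed", 0):]


def agents_to_rerun(checkpoint: Dict) -> List[str]:
    """Agents in finished waves whose recorded status is not 'completed'."""
    return [name for name, result in checkpoint.get("results", {}).items()
            if result.get("status") != "completed"]


waves = [["market-researcher", "competitive-analyst", "trend-analyst"],
         ["framework-specialist"],
         ["synthesis-writer"]]
checkpoint = {"wave_completed": 2,
              "results": {"market-researcher": {"status": "completed"},
                          "trend-analyst": {"status": "failed"}}}

print(remaining_waves(waves, checkpoint))  # [['synthesis-writer']]
print(agents_to_rerun(checkpoint))         # ['trend-analyst']
```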

Success Output

When successful, this skill MUST output:

✅ SKILL COMPLETE: agentic-orchestrator

Completed:
- [x] All waves executed (3/3 waves)
- [x] All agents completed successfully
- [x] Quality gates passed (research: 87%, synthesis: 91%)
- [x] Results synthesized into final output

Outputs:
- Strategy brief: docs/strategy-brief.md
- Orchestration log: .coditect/checkpoints/orchestration-state.json
- Quality score: 91% (above 85% threshold)
- Total tokens: 48,234 (within 50K budget)
- Execution time: 8m 24s (within 10min target)

Completion Checklist

Before marking this skill as complete, verify:

  • All waves planned with correct dependencies
  • All agents executed without failures
  • Quality gates evaluated and passed (or conditionally passed)
  • Results synthesized into cohesive output
  • Checkpoint files saved for resume capability
  • Token usage tracked and within budget
  • Final quality score meets threshold

Failure Indicators

This skill has FAILED if:

  • ❌ Circular dependencies detected during wave planning
  • ❌ Critical agents failed without successful retry
  • ❌ Quality gate failure with required checks not met
  • ❌ Synthesis produced incomplete or malformed output
  • ❌ Token budget exceeded by >20%
  • ❌ Checkpoint corruption preventing resume

When NOT to Use

Do NOT use this skill when:

  • Single agent can complete the task independently
  • Task is purely research-only without synthesis requirement
  • No inter-agent dependencies exist (simple parallel execution suffices)
  • Real-time response required (<30s) - orchestration adds overhead
  • Task requires human decision-making between steps

Use alternatives:

  • Direct agent invocation for single-agent tasks
  • Task(subagent_type=...) for simple Claude Code built-in agent use
  • Manual coordination for tasks requiring human judgment
  • Simple parallel execution without wave orchestration for independent tasks

Anti-Patterns (Avoid)

| Anti-Pattern | Problem | Solution |
|---|---|---|
| Creating circular dependencies | Wave planning fails (plan_waves raises ValueError) | Validate dependency graph before execution |
| Skipping quality gates | Garbage in, garbage out | Always evaluate between phases |
| Over-parallelization | API rate limit errors | Respect max_parallel limit (default: 5) |
| Ignoring partial failures | Cascade failures | Handle failures gracefully, continue with successful results |
| Hardcoding prompts | Context-specific failures | Use template functions with context injection |
| No checkpointing | Can't resume after failures | Save state after each wave |
| Missing synthesis | Raw unintegrated outputs | Always include synthesis agent for final integration |
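
For "validate dependency graph before execution", a minimal sketch using the standard library's graphlib module (Python 3.9+) is shown below. validate_dependencies is a hypothetical helper. It reports unknown dependencies and cycles up front instead of waiting for wave planning to fail.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def validate_dependencies(deps: Dict[str, List[str]]) -> List[str]:
    """Return a list of problems; an empty list means the graph can be scheduled."""
    problems = []

    # Every dependency must refer to a task that actually exists
    for name, needs in deps.items():
        for dep in needs:
            if dep not in deps:
                problems.append(f"{name} depends on unknown task {dep}")

    # prepare() raises CycleError when the graph contains a cycle
    try:
        TopologicalSorter(deps).prepare()
    except CycleError as exc:
        cycle = " -> ".join(exc.args[1])  # args[1] holds the nodes on the cycle
        problems.append(f"circular dependency: {cycle}")

    return problems


print(validate_dependencies({"a": [], "b": ["a"]}))     # []
print(validate_dependencies({"a": ["b"], "b": ["a"]}))  # one circular dependency
print(validate_dependencies({"a": ["missing"]}))        # one unknown task
```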

Principles

This skill embodies:

  • #1 Recycle → Extend → Re-Use → Create - Reuses wave patterns, checkpoint state, and quality gates
  • #2 First Principles - Understands task decomposition fundamentals before orchestrating
  • #3 Keep It Simple - Minimal waves, maximum parallelism within each wave
  • #4 Separation of Concerns - Each agent handles one bounded context
  • #5 Eliminate Ambiguity - Clear dependency graph, explicit quality criteria
  • #6 Clear, Understandable, Explainable - Transparent orchestration state and progress tracking
  • #7 Validate Continuously - Quality gates between waves ensure early error detection