# Agentic Orchestrator Skill

## How to Use This Skill
- Review the patterns and examples below
- Apply the relevant patterns to your implementation
- Follow the best practices outlined in this skill
Expert skill for coordinating multiple specialized agents to complete complex workflows. Provides patterns for task decomposition, parallel execution, state management, and result synthesis.
## When to Use
Use this skill when:
- Task requires multiple specialized agents
- Work can be parallelized across agents
- Need checkpoint/resume capabilities
- Quality gates between phases are required
- Results from multiple agents need synthesis
- Deliverables such as strategy briefs, research reports, or complex implementations are required
Don't use this skill when:
- Single agent can complete the task
- Simple sequential workflow suffices
- No inter-agent dependencies exist
- Task is research-only without synthesis
## Core Patterns
### 1. Wave Execution Architecture
Execute agents in waves - parallel within waves, sequential between waves.
```python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Callable, Awaitable
from enum import Enum
import asyncio
import json
import os
from datetime import datetime, timezone


class ExecutionStatus(Enum):
    """Agent execution status"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class AgentTask:
    """Task for a single agent"""
    agent_name: str
    prompt: str
    depends_on: List[str] = field(default_factory=list)
    max_tokens: int = 8000
    priority: int = 0
    timeout: float = 300.0


@dataclass
class AgentResult:
    """Result from agent execution"""
    agent_name: str
    status: ExecutionStatus
    output: Optional[str] = None
    error: Optional[str] = None
    tokens_used: int = 0
    duration_seconds: float = 0.0


class WaveOrchestrator:
    """
    Execute agents in waves with dependency resolution.

    Wave 1: Independent research agents (parallel)
    Wave 2: Framework/analysis agents (depend on Wave 1)
    Wave 3: Synthesis agent (depends on all)
    """

    def __init__(self, max_parallel: int = 5):
        self.max_parallel = max_parallel
        self.results: Dict[str, AgentResult] = {}
        self.state_file = ".coditect/checkpoints/orchestration-state.json"

    def plan_waves(self, tasks: List[AgentTask]) -> List[List[AgentTask]]:
        """
        Organize tasks into waves based on dependencies.

        Returns a list of waves; each wave contains tasks that can run in parallel.
        """
        waves = []
        completed = set()
        remaining = list(tasks)

        while remaining:
            # Find tasks with all dependencies satisfied
            ready = [
                t for t in remaining
                if all(d in completed for d in t.depends_on)
            ]
            if not ready:
                # Circular dependency or missing task
                raise ValueError(
                    f"Cannot schedule remaining tasks: {[t.agent_name for t in remaining]}"
                )

            # Sort by priority within wave
            ready.sort(key=lambda t: t.priority, reverse=True)
            waves.append(ready)

            # Update tracking
            for task in ready:
                completed.add(task.agent_name)
                remaining.remove(task)

        return waves

    async def execute_wave(
        self,
        wave: List[AgentTask],
        executor: Callable[[AgentTask], Awaitable[AgentResult]]
    ) -> Dict[str, AgentResult]:
        """Execute all tasks in a wave in parallel."""
        semaphore = asyncio.Semaphore(self.max_parallel)

        async def bounded_execute(task: AgentTask) -> AgentResult:
            async with semaphore:
                return await executor(task)

        # Execute all tasks in parallel
        coros = [bounded_execute(task) for task in wave]
        results = await asyncio.gather(*coros, return_exceptions=True)

        # Process results
        wave_results = {}
        for task, result in zip(wave, results):
            if isinstance(result, Exception):
                wave_results[task.agent_name] = AgentResult(
                    agent_name=task.agent_name,
                    status=ExecutionStatus.FAILED,
                    error=str(result)
                )
            else:
                wave_results[task.agent_name] = result

        return wave_results

    async def execute_all(
        self,
        tasks: List[AgentTask],
        executor: Callable[[AgentTask], Awaitable[AgentResult]],
        checkpoint_after_wave: bool = True
    ) -> Dict[str, AgentResult]:
        """
        Execute all tasks in waves.

        Args:
            tasks: List of agent tasks to execute
            executor: Async function to execute a single task
            checkpoint_after_wave: Save state after each wave

        Returns:
            Dictionary of agent name -> result
        """
        waves = self.plan_waves(tasks)
        all_results = {}

        for wave_num, wave in enumerate(waves, 1):
            print(f"Executing Wave {wave_num}/{len(waves)}: "
                  f"{[t.agent_name for t in wave]}")

            # Execute wave
            wave_results = await self.execute_wave(wave, executor)
            all_results.update(wave_results)
            self.results.update(wave_results)

            # Check for failures
            failures = [
                name for name, result in wave_results.items()
                if result.status == ExecutionStatus.FAILED
            ]
            if failures:
                print(f"Wave {wave_num} had failures: {failures}")
                # Continue with successful results for now

            # Checkpoint
            if checkpoint_after_wave:
                self.save_checkpoint(wave_num)

        return all_results

    def save_checkpoint(self, wave_num: int):
        """Save current state for resume capability."""
        checkpoint = {
            "wave_completed": wave_num,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "results": {
                name: {
                    "status": result.status.value,
                    "output_length": len(result.output or ""),
                    "tokens_used": result.tokens_used,
                    "error": result.error
                }
                for name, result in self.results.items()
            }
        }
        # Ensure the checkpoint directory exists before writing
        os.makedirs(os.path.dirname(self.state_file), exist_ok=True)
        with open(self.state_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self) -> Optional[Dict]:
        """Load previous checkpoint for resume."""
        try:
            with open(self.state_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return None
```
### 2. Task Decomposition
Break complex tasks into agent-sized subtasks.
```python
from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum


class TaskCategory(Enum):
    """Categories for task routing"""
    RESEARCH = "research"
    ANALYSIS = "analysis"
    IMPLEMENTATION = "implementation"
    SYNTHESIS = "synthesis"
    VALIDATION = "validation"


@dataclass
class DecomposedTask:
    """A decomposed subtask"""
    id: str
    category: TaskCategory
    description: str
    agent_type: str
    prompt: str
    depends_on: List[str]
    priority: int
    estimated_tokens: int


class TaskDecomposer:
    """
    Decompose complex tasks into orchestratable subtasks.

    Uses capability matching to route to appropriate agents.
    """

    # Agent capabilities mapping
    AGENT_CAPABILITIES = {
        "market-researcher": {
            "categories": [TaskCategory.RESEARCH],
            "keywords": ["market", "size", "growth", "segment", "demand"],
            "max_tokens": 8000
        },
        "competitive-analyst": {
            "categories": [TaskCategory.RESEARCH, TaskCategory.ANALYSIS],
            "keywords": ["competitor", "competitive", "moat", "positioning"],
            "max_tokens": 10000
        },
        "trend-analyst": {
            "categories": [TaskCategory.RESEARCH, TaskCategory.ANALYSIS],
            "keywords": ["trend", "emerging", "disruption", "technology"],
            "max_tokens": 8000
        },
        "framework-specialist": {
            "categories": [TaskCategory.ANALYSIS],
            "keywords": ["framework", "porter", "swot", "strategic"],
            "max_tokens": 6000
        },
        "synthesis-writer": {
            "categories": [TaskCategory.SYNTHESIS],
            "keywords": ["synthesis", "brief", "report", "recommendation"],
            "max_tokens": 12000
        }
    }

    def decompose(
        self,
        task_description: str,
        context: Dict[str, Any]
    ) -> List[DecomposedTask]:
        """
        Decompose a task into subtasks for different agents.

        Args:
            task_description: High-level task description
            context: Context like industry, geography, etc.

        Returns:
            List of decomposed subtasks
        """
        subtasks = []
        task_lower = task_description.lower()

        # Determine required capabilities
        required_agents = []
        for agent, caps in self.AGENT_CAPABILITIES.items():
            if any(kw in task_lower for kw in caps["keywords"]):
                required_agents.append(agent)

        # Default to full research workflow if strategy brief
        if "strategy" in task_lower or "brief" in task_lower:
            required_agents = list(self.AGENT_CAPABILITIES.keys())

        # Create subtasks
        priority = len(required_agents)
        for agent in required_agents:
            caps = self.AGENT_CAPABILITIES[agent]

            # Determine dependencies
            if agent == "synthesis-writer":
                deps = [a for a in required_agents if a != "synthesis-writer"]
            elif agent == "framework-specialist":
                deps = ["market-researcher", "competitive-analyst"]
                deps = [d for d in deps if d in required_agents]
            else:
                deps = []

            subtask = DecomposedTask(
                id=f"task_{agent}",
                category=caps["categories"][0],
                description=f"{agent} analysis for {context.get('industry', 'target')}",
                agent_type=agent,
                prompt=self._generate_prompt(agent, context),
                depends_on=[f"task_{d}" for d in deps],
                priority=priority,
                estimated_tokens=caps["max_tokens"]
            )
            subtasks.append(subtask)
            priority -= 1

        return subtasks

    def _generate_prompt(
        self,
        agent_type: str,
        context: Dict[str, Any]
    ) -> str:
        """Generate agent-specific prompt."""
        industry = context.get("industry", "the target industry")
        geography = context.get("geography", "Global")

        prompts = {
            "market-researcher": f"""
Research the {industry} market in {geography}:
- Market size and growth rate (with sources)
- Key segments and characteristics
- Demand drivers and trends
- Value chain structure
Return findings in structured JSON format with sources and confidence scores.
""",
            "competitive-analyst": f"""
Analyze top competitors in {industry}:
- Market leaders with revenue/share
- Business models and positioning
- Strengths, weaknesses, moats
- Recent strategic moves
Return competitor profiles in structured JSON format.
""",
            "trend-analyst": f"""
Identify key trends affecting {industry}:
- Technology trends and disruption
- Regulatory changes
- Consumer behavior shifts
- Emerging opportunities and threats
Return trend analysis with timeline and impact assessment.
""",
            "framework-specialist": f"""
Apply strategic frameworks to {industry}:
- Porter's Five Forces analysis
- SWOT for a new entrant
- Value chain analysis
- Strategic group mapping
Use insights from research and competitive analysis.
""",
            "synthesis-writer": f"""
Synthesize all research into executive strategy brief for {industry}:
- Executive summary (300 words max)
- Key findings and insights
- Strategic recommendations (prioritized)
- Risk assessment
- Implementation roadmap
Combine all agent outputs into cohesive narrative.
"""
        }

        return prompts.get(agent_type, f"Analyze {industry}")
```
### 3. Result Synthesis
Combine outputs from multiple agents into unified result.
```python
from dataclasses import dataclass
from typing import List, Dict, Any


@dataclass
class SynthesisInput:
    """Input for synthesis from an agent"""
    agent_name: str
    category: str
    content: str
    confidence: float
    sources: List[str]


@dataclass
class SynthesizedResult:
    """Final synthesized output"""
    executive_summary: str
    detailed_findings: Dict[str, Any]
    recommendations: List[Dict[str, Any]]
    sources: List[str]
    confidence_score: float
    quality_score: float


class ResultSynthesizer:
    """
    Synthesize results from multiple agents into cohesive output.

    Handles conflicting information, source aggregation, and
    quality scoring.
    """

    def synthesize(
        self,
        inputs: List[SynthesisInput],
        context: Dict[str, Any]
    ) -> SynthesizedResult:
        """
        Synthesize multiple agent outputs.

        Args:
            inputs: Results from individual agents
            context: Original task context

        Returns:
            Synthesized result with quality metrics
        """
        # Aggregate by category
        by_category = {}
        all_sources = []
        confidences = []

        for inp in inputs:
            if inp.category not in by_category:
                by_category[inp.category] = []
            by_category[inp.category].append(inp)
            all_sources.extend(inp.sources)
            confidences.append(inp.confidence)

        # Deduplicate sources
        unique_sources = list(set(all_sources))

        # Calculate overall confidence
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0.5

        # Build findings structure
        findings = {}
        for category, category_inputs in by_category.items():
            findings[category] = {
                "content": [i.content for i in category_inputs],
                "agent_count": len(category_inputs),
                "avg_confidence": sum(i.confidence for i in category_inputs) / len(category_inputs)
            }

        # Generate executive summary
        exec_summary = self._generate_executive_summary(findings, context)

        # Extract recommendations
        recommendations = self._extract_recommendations(inputs)

        # Calculate quality score
        quality = self._calculate_quality(
            inputs=inputs,
            unique_sources=unique_sources,
            recommendations=recommendations
        )

        return SynthesizedResult(
            executive_summary=exec_summary,
            detailed_findings=findings,
            recommendations=recommendations,
            sources=unique_sources,
            confidence_score=avg_confidence,
            quality_score=quality
        )

    def _generate_executive_summary(
        self,
        findings: Dict[str, Any],
        context: Dict[str, Any]
    ) -> str:
        """Generate executive summary from findings."""
        industry = context.get("industry", "the market")

        summary_parts = [
            f"## Executive Summary: {industry}\n"
        ]

        if "research" in findings:
            summary_parts.append(
                f"**Market Overview:** Based on {findings['research']['agent_count']} "
                f"research agents with {findings['research']['avg_confidence']:.0%} confidence.\n"
            )

        if "analysis" in findings:
            summary_parts.append(
                f"**Analysis:** Strategic frameworks applied by "
                f"{findings['analysis']['agent_count']} analysts.\n"
            )

        return "\n".join(summary_parts)

    def _extract_recommendations(
        self,
        inputs: List[SynthesisInput]
    ) -> List[Dict[str, Any]]:
        """Extract and prioritize recommendations."""
        recommendations = []
        priority = 1

        for inp in inputs:
            # Look for recommendation patterns in content
            if "recommend" in inp.content.lower():
                recommendations.append({
                    "priority": priority,
                    "source": inp.agent_name,
                    "confidence": inp.confidence,
                    "summary": f"Recommendation from {inp.agent_name}"
                })
                priority += 1

        return recommendations

    def _calculate_quality(
        self,
        inputs: List[SynthesisInput],
        unique_sources: List[str],
        recommendations: List[Dict[str, Any]]
    ) -> float:
        """Calculate quality score (0-1)."""
        score = 0.0
        total_checks = 5

        # Check 1: Multiple agents contributed
        if len(inputs) >= 3:
            score += 1
        elif len(inputs) >= 2:
            score += 0.5

        # Check 2: Sources provided
        if len(unique_sources) >= 10:
            score += 1
        elif len(unique_sources) >= 5:
            score += 0.5

        # Check 3: Recommendations generated
        if len(recommendations) >= 3:
            score += 1
        elif len(recommendations) >= 1:
            score += 0.5

        # Check 4: High confidence
        avg_conf = sum(i.confidence for i in inputs) / len(inputs) if inputs else 0
        if avg_conf >= 0.8:
            score += 1
        elif avg_conf >= 0.6:
            score += 0.5

        # Check 5: Multiple categories covered
        categories = set(i.category for i in inputs)
        if len(categories) >= 3:
            score += 1
        elif len(categories) >= 2:
            score += 0.5

        return score / total_checks
```
### 4. Quality Gates
Enforce quality standards between orchestration phases.
```python
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Callable
from enum import Enum


class GateDecision(Enum):
    """Quality gate decision"""
    PASS = "pass"
    FAIL = "fail"
    CONDITIONAL = "conditional"


@dataclass
class GateResult:
    """Result of quality gate check"""
    decision: GateDecision
    score: float
    threshold: float
    checks_passed: List[str]
    checks_failed: List[str]
    recommendations: List[str]


class QualityGate:
    """
    Quality gates between orchestration phases.

    Ensures outputs meet standards before proceeding.
    """

    def __init__(
        self,
        name: str,
        threshold: float = 0.75,
        required_checks: Optional[List[str]] = None
    ):
        self.name = name
        self.threshold = threshold
        self.required_checks = required_checks or []
        self.checks: Dict[str, Callable] = {}

    def add_check(
        self,
        name: str,
        check_fn: Callable[[Any], bool],
        required: bool = False
    ):
        """Add a quality check."""
        self.checks[name] = check_fn
        if required and name not in self.required_checks:
            self.required_checks.append(name)

    def evaluate(self, data: Any) -> GateResult:
        """
        Evaluate data against quality gate.

        Args:
            data: Data to evaluate

        Returns:
            GateResult with pass/fail decision
        """
        passed = []
        failed = []
        recommendations = []

        for check_name, check_fn in self.checks.items():
            try:
                if check_fn(data):
                    passed.append(check_name)
                else:
                    failed.append(check_name)
                    if check_name in self.required_checks:
                        recommendations.append(
                            f"Required check '{check_name}' failed - must be fixed"
                        )
            except Exception as e:
                failed.append(check_name)
                recommendations.append(f"Check '{check_name}' error: {str(e)}")

        # Calculate score
        total = len(passed) + len(failed)
        score = len(passed) / total if total > 0 else 0

        # Determine decision
        required_failed = [c for c in failed if c in self.required_checks]
        if required_failed:
            decision = GateDecision.FAIL
            recommendations.append(
                f"Failed required checks: {required_failed}"
            )
        elif score >= self.threshold:
            decision = GateDecision.PASS
        else:
            decision = GateDecision.CONDITIONAL
            recommendations.append(
                f"Score {score:.1%} below threshold {self.threshold:.1%}"
            )

        return GateResult(
            decision=decision,
            score=score,
            threshold=self.threshold,
            checks_passed=passed,
            checks_failed=failed,
            recommendations=recommendations
        )


# Pre-built gates for strategy brief workflow
def create_research_gate() -> QualityGate:
    """Create quality gate for research phase."""
    gate = QualityGate("research", threshold=0.75)
    gate.add_check(
        "has_market_size",
        lambda d: "market" in str(d).lower() and any(
            c in str(d) for c in ["$", "billion", "million", "%"]
        ),
        required=True
    )
    gate.add_check(
        "has_sources",
        lambda d: "source" in str(d).lower() or "http" in str(d).lower()
    )
    gate.add_check(
        "sufficient_length",
        lambda d: len(str(d)) >= 2000
    )
    return gate


def create_synthesis_gate() -> QualityGate:
    """Create quality gate for synthesis phase."""
    gate = QualityGate("synthesis", threshold=0.85)
    gate.add_check(
        "has_executive_summary",
        lambda d: "executive summary" in str(d).lower(),
        required=True
    )
    gate.add_check(
        "has_recommendations",
        lambda d: "recommend" in str(d).lower(),
        required=True
    )
    gate.add_check(
        "has_structure",
        lambda d: str(d).count("##") >= 3
    )
    gate.add_check(
        "appropriate_length",
        lambda d: 5000 <= len(str(d)) <= 50000
    )
    return gate
```
## Usage Examples

### Strategy Brief Orchestration
```python
# Create orchestrator
orchestrator = WaveOrchestrator(max_parallel=3)

# Define tasks
tasks = [
    AgentTask(
        agent_name="market-researcher",
        prompt="Research AI development tools market...",
        priority=3
    ),
    AgentTask(
        agent_name="competitive-analyst",
        prompt="Analyze top competitors...",
        priority=3
    ),
    AgentTask(
        agent_name="trend-analyst",
        prompt="Identify key trends...",
        priority=3
    ),
    AgentTask(
        agent_name="framework-specialist",
        prompt="Apply strategic frameworks...",
        depends_on=["market-researcher", "competitive-analyst"],
        priority=2
    ),
    AgentTask(
        agent_name="synthesis-writer",
        prompt="Create executive strategy brief...",
        depends_on=["market-researcher", "competitive-analyst",
                    "trend-analyst", "framework-specialist"],
        priority=1
    ),
]

# Execute with checkpointing (run inside an async function)
results = await orchestrator.execute_all(
    tasks=tasks,
    executor=execute_agent,  # Your agent execution function
    checkpoint_after_wave=True
)

# Synthesize results
synthesizer = ResultSynthesizer()
final = synthesizer.synthesize(
    inputs=[...],  # Convert results to SynthesisInput
    context={"industry": "AI Development Tools"}
)

print(f"Quality Score: {final.quality_score:.1%}")
```
### With Quality Gates
```python
# Create gates
research_gate = create_research_gate()
synthesis_gate = create_synthesis_gate()

# After Wave 1 (research)
research_results = await orchestrator.execute_wave(wave1, executor)
gate_result = research_gate.evaluate(research_results)

if gate_result.decision == GateDecision.FAIL:
    print(f"Research failed quality gate: {gate_result.recommendations}")
    # Handle failure - retry or abort

# After synthesis
synthesis_result = await orchestrator.execute_wave(synthesis_wave, executor)
final_gate = synthesis_gate.evaluate(synthesis_result)

if final_gate.decision == GateDecision.PASS:
    print(f"Strategy brief passed all quality checks: {final_gate.score:.1%}")
```
## Best Practices

### DO
- Plan waves carefully - Minimize wave count, maximize parallelism
- Use checkpoints - Enable resume after failures
- Set appropriate timeouts - Prevent hung agents
- Validate between phases - Catch errors early
- Track metrics - Monitor token usage and success rates
- Handle partial failures - Continue with successful results when possible
- Deduplicate sources - Clean synthesis inputs
### DON'T
- Don't create circular dependencies - Will deadlock
- Don't skip quality gates - Garbage in, garbage out
- Don't ignore failures - Log and handle appropriately
- Don't over-parallelize - Stay within API limits
- Don't hardcode prompts - Use templates with context
- Don't forget synthesis - Raw outputs need integration
## Configuration Reference

| Parameter | Default | Description |
|---|---|---|
| `max_parallel` | 5 | Maximum concurrent agents per wave |
| `checkpoint_dir` | `.coditect/checkpoints/` | Checkpoint storage |
| `research_threshold` | 0.75 | Research quality gate threshold |
| `synthesis_threshold` | 0.85 | Synthesis quality gate threshold |
| `default_timeout` | 300s | Agent execution timeout |
| `max_retries` | 2 | Retries per agent on failure |
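Note that `max_retries` is not enforced by `WaveOrchestrator` itself; one way to honor it is a thin async wrapper around the executor. A minimal sketch, assuming a linear-backoff policy (the helper name and signature are illustrative, not part of the orchestrator API):

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")

async def with_retries(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 2,
    backoff_seconds: float = 1.0,
) -> T:
    """Retry an async callable up to max_retries extra attempts with linear backoff."""
    last_error: Optional[Exception] = None
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except Exception as e:  # in practice, narrow this to transient errors
            last_error = e
            if attempt < max_retries:
                await asyncio.sleep(backoff_seconds * (attempt + 1))
    raise last_error  # all attempts exhausted
```

Wrapping the per-task call (`with_retries(lambda: executor(task))`) keeps retry policy out of the wave-scheduling logic.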
## Integration with CODITECT
Recommended integration points:
| Component | Usage | Notes |
|---|---|---|
| strategy-brief-generator | Primary orchestrator | Full workflow |
| orchestrator agent | General coordination | Task tool routing |
| workflow-orchestrator | Complex pipelines | State management |
| use-case-analyzer | Intent routing | Capability matching |
## Success Metrics
| Metric | Target | Measurement |
|---|---|---|
| Wave efficiency | >80% parallel | Tasks running simultaneously |
| Quality gate pass rate | >90% | First-attempt passes |
| Checkpoint recovery | 100% | Resume from any wave |
| Token efficiency | <50K avg | Per strategy brief |
| End-to-end time | <10 min | Full orchestration |
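The wave-efficiency metric can be computed directly from the planned waves. A minimal sketch, assuming "parallel" means a task shares its wave with at least one other task (this helper is illustrative and not part of the orchestrator):

```python
def wave_efficiency(waves):
    """Fraction of scheduled tasks that ran in a wave with at least one sibling."""
    total = sum(len(wave) for wave in waves)
    parallel = sum(len(wave) for wave in waves if len(wave) > 1)
    return parallel / total if total else 0.0
```

For example, a plan of `[[a, b, c], [d], [e]]` scores 3/5 = 60%, below the 80% target.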
## Multi-Context Window Support

### State Tracking (JSON)
```json
{
  "orchestration_id": "orch_20251221_strategy",
  "status": "wave_2_complete",
  "waves_completed": 2,
  "waves_total": 3,
  "results": {
    "market-researcher": {"status": "completed", "tokens": 4521},
    "competitive-analyst": {"status": "completed", "tokens": 6234},
    "trend-analyst": {"status": "completed", "tokens": 3892},
    "framework-specialist": {"status": "completed", "tokens": 4156}
  },
  "quality_scores": {
    "research_gate": 0.87,
    "analysis_gate": 0.82
  },
  "next_action": "Execute synthesis wave"
}
```
### Progress Notes (Markdown)

```markdown
# Orchestration Progress - Strategy Brief

## Status: Wave 2 Complete (67%)

### Completed
- [x] Wave 1: Research (3 agents, 14,647 tokens)
- [x] Wave 2: Analysis (1 agent, 4,156 tokens)

### Pending
- [ ] Wave 3: Synthesis (1 agent)

### Quality Gates
- Research: 87% ✓
- Analysis: 82% ✓

### Next Steps
1. Execute synthesis-writer with all inputs
2. Validate final output (85% threshold)
3. Generate strategy brief document
```
### Session Recovery
When starting a fresh context window:
- Load checkpoint: Read `.coditect/checkpoints/orchestration-state.json`
- Resume from wave: Skip completed waves
- Inject prior results: Pass to dependent agents
- Continue execution: Complete remaining waves
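The recovery steps above can be sketched as a filter over the original task list. This is a minimal illustration that assumes the checkpoint shape written by `save_checkpoint` (a `results` map keyed by agent name, each entry carrying a `status`); the helper itself is hypothetical:

```python
def filter_resumable_tasks(tasks, checkpoint):
    """Drop tasks the checkpoint already records as completed."""
    if not checkpoint:
        return list(tasks)
    done = {
        name for name, result in checkpoint.get("results", {}).items()
        if result.get("status") == "completed"
    }
    return [task for task in tasks if task.agent_name not in done]
```

Feeding the filtered list back into `execute_all` re-plans waves over only the remaining work; prior outputs still need to be injected into dependent agents' prompts.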
## Success Output

When successful, this skill MUST output:

```text
✅ SKILL COMPLETE: agentic-orchestrator

Completed:
- [x] All waves executed (3/3 waves)
- [x] All agents completed successfully
- [x] Quality gates passed (research: 87%, synthesis: 91%)
- [x] Results synthesized into final output

Outputs:
- Strategy brief: docs/strategy-brief.md
- Orchestration log: .coditect/checkpoints/orchestration-state.json
- Quality score: 91% (above 85% threshold)
- Total tokens: 48,234 (within 50K budget)
- Execution time: 8m 24s (within 10min target)
```
## Completion Checklist
Before marking this skill as complete, verify:
- All waves planned with correct dependencies
- All agents executed without failures
- Quality gates evaluated and passed (or conditionally passed)
- Results synthesized into cohesive output
- Checkpoint files saved for resume capability
- Token usage tracked and within budget
- Final quality score meets threshold
## Failure Indicators
This skill has FAILED if:
- ❌ Circular dependencies detected during wave planning
- ❌ Critical agents failed without successful retry
- ❌ Quality gate failure with required checks not met
- ❌ Synthesis produced incomplete or malformed output
- ❌ Token budget exceeded by >20%
- ❌ Checkpoint corruption preventing resume
## When NOT to Use
Do NOT use this skill when:
- Single agent can complete the task independently
- Task is purely research-only without synthesis requirement
- No inter-agent dependencies exist (simple parallel execution suffices)
- Real-time response required (<30s) - orchestration adds overhead
- Task requires human decision-making between steps
Use alternatives:
- Direct agent invocation for single-agent tasks
- `Task(subagent_type=...)` for simple Claude Code built-in agent use
- Manual coordination for tasks requiring human judgment
- Simple parallel execution without wave orchestration for independent tasks
## Anti-Patterns (Avoid)
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Creating circular dependencies | Deadlocks wave execution | Validate dependency graph before execution |
| Skipping quality gates | Garbage in, garbage out | Always evaluate between phases |
| Over-parallelization | API rate limit errors | Respect max_parallel limit (default: 5) |
| Ignoring partial failures | Cascade failures | Handle failures gracefully, continue with successful results |
| Hardcoding prompts | Context-specific failures | Use template functions with context injection |
| No checkpointing | Can't resume after failures | Save state after each wave |
| Missing synthesis | Raw unintegrated outputs | Always include synthesis agent for final integration |
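The first anti-pattern can be caught before any agent runs with a Kahn-style check over the dependency graph. A minimal sketch (the helper is illustrative; it assumes task objects shaped like `AgentTask`, with `agent_name` and `depends_on`):

```python
def validate_dependencies(tasks):
    """Raise ValueError on missing dependencies or cycles in the task graph."""
    names = {t.agent_name for t in tasks}
    deps = {t.agent_name: set(t.depends_on) for t in tasks}

    # Every dependency must name a scheduled task
    for name, required in deps.items():
        missing = required - names
        if missing:
            raise ValueError(f"{name} depends on unknown tasks: {sorted(missing)}")

    # Peel off tasks whose dependencies are resolved; leftovers imply a cycle
    resolved = set()
    pending = dict(deps)
    while pending:
        ready = [name for name, required in pending.items() if required <= resolved]
        if not ready:
            raise ValueError(f"Circular dependency among: {sorted(pending)}")
        for name in ready:
            resolved.add(name)
            del pending[name]
```

Running this before `plan_waves` turns a mid-run scheduling failure into an upfront, actionable error.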
## Principles
This skill embodies:
- #1 Recycle → Extend → Re-Use → Create - Reuses wave patterns, checkpoint state, and quality gates
- #2 First Principles - Understands task decomposition fundamentals before orchestrating
- #3 Keep It Simple - Minimal waves, maximum parallelism within each wave
- #4 Separation of Concerns - Each agent handles one bounded context
- #5 Eliminate Ambiguity - Clear dependency graph, explicit quality criteria
- #6 Clear, Understandable, Explainable - Transparent orchestration state and progress tracking
- #7 Validate Continuously - Quality gates between waves ensure early error detection