Agent Skills Framework Extension
Orchestrator Code Review Patterns Skill
When to Use This Skill
Use this skill when implementing orchestrator code review patterns patterns in your codebase.
How to Use This Skill
- Review the patterns and examples below
- Apply the relevant patterns to your implementation
- Follow the best practices outlined in this skill
ADR compliance validation, multi-agent review coordination, and cross-cutting concerns analysis for architectural consistency.
Core Capabilities
- ADR Compliance - Validate against architectural decision records
- Multi-Agent Coordination - Orchestrate specialized review agents
- Cross-Cutting Concerns - Security, logging, monitoring, error handling
- Architectural Boundaries - Layer separation, dependency direction
- Technology Stack Validation - Approved libraries, versions, patterns
ADR Compliance Validator
# scripts/adr_validator.py
from typing import List, Dict, Optional
from dataclasses import dataclass
from pathlib import Path
import re
import yaml
@dataclass
class ADRViolation:
"""ADR compliance violation."""
adr_id: str
adr_title: str
file: str
line: int
violation_type: str
message: str
severity: str # 'blocker', 'critical', 'major', 'minor'
@dataclass
class ADR:
"""Architecture Decision Record."""
id: str
title: str
status: str # 'accepted', 'proposed', 'deprecated', 'superseded'
decision: str
consequences: List[str]
constraints: List[Dict[str, str]]
validation_rules: List[Dict[str, str]]
class ADRComplianceValidator:
"""Validate code changes against ADRs."""
def __init__(self, adr_path: str):
self.adr_path = Path(adr_path)
self.adrs = self._load_adrs()
def validate(self, changed_files: List[str]) -> List[ADRViolation]:
"""Validate changed files against all ADRs."""
violations = []
for file_path in changed_files:
for adr in self.adrs:
if adr.status != 'accepted':
continue
file_violations = self._validate_against_adr(file_path, adr)
violations.extend(file_violations)
return violations
def _load_adrs(self) -> List[ADR]:
"""Load all ADR files."""
adrs = []
for adr_file in self.adr_path.glob('ADR-*.md'):
adr = self._parse_adr(adr_file)
if adr:
adrs.append(adr)
return adrs
def _parse_adr(self, adr_file: Path) -> Optional[ADR]:
"""Parse ADR markdown file."""
with open(adr_file, 'r') as f:
content = f.read()
# Extract frontmatter
if content.startswith('---'):
parts = content.split('---', 2)
if len(parts) >= 3:
frontmatter = yaml.safe_load(parts[1])
body = parts[2]
return ADR(
id=frontmatter.get('id', ''),
title=frontmatter.get('title', ''),
status=frontmatter.get('status', 'proposed'),
decision=frontmatter.get('decision', ''),
consequences=frontmatter.get('consequences', []),
constraints=frontmatter.get('constraints', []),
validation_rules=frontmatter.get('validation_rules', [])
)
return None
def _validate_against_adr(self, file_path: str, adr: ADR) -> List[ADRViolation]:
"""Validate file against specific ADR."""
violations = []
with open(file_path, 'r') as f:
content = f.read()
lines = content.splitlines()
for rule in adr.validation_rules:
rule_type = rule.get('type')
if rule_type == 'forbidden_import':
violations.extend(
self._check_forbidden_import(file_path, content, lines, adr, rule)
)
elif rule_type == 'required_pattern':
violations.extend(
self._check_required_pattern(file_path, content, lines, adr, rule)
)
elif rule_type == 'technology_constraint':
violations.extend(
self._check_technology_constraint(file_path, content, lines, adr, rule)
)
elif rule_type == 'layer_boundary':
violations.extend(
self._check_layer_boundary(file_path, content, lines, adr, rule)
)
return violations
def _check_forbidden_import(
self,
file_path: str,
content: str,
lines: List[str],
adr: ADR,
rule: Dict
) -> List[ADRViolation]:
"""Check for forbidden imports."""
violations = []
forbidden_pattern = rule.get('pattern', '')
for i, line in enumerate(lines, 1):
if re.search(forbidden_pattern, line):
violations.append(ADRViolation(
adr_id=adr.id,
adr_title=adr.title,
file=file_path,
line=i,
violation_type='forbidden_import',
message=rule.get('message', f'Forbidden import: {forbidden_pattern}'),
severity=rule.get('severity', 'major')
))
return violations
def _check_required_pattern(
self,
file_path: str,
content: str,
lines: List[str],
adr: ADR,
rule: Dict
) -> List[ADRViolation]:
"""Check for required patterns."""
violations = []
required_pattern = rule.get('pattern', '')
context = rule.get('context', '') # Where pattern should appear
# Check if required pattern is present in appropriate context
if context and context in file_path:
if not re.search(required_pattern, content):
violations.append(ADRViolation(
adr_id=adr.id,
adr_title=adr.title,
file=file_path,
line=1,
violation_type='missing_required_pattern',
message=rule.get('message', f'Missing required pattern: {required_pattern}'),
severity=rule.get('severity', 'major')
))
return violations
def _check_layer_boundary(
self,
file_path: str,
content: str,
lines: List[str],
adr: ADR,
rule: Dict
) -> List[ADRViolation]:
"""Check for layer boundary violations."""
violations = []
# Example: Infrastructure layer should not import from Presentation layer
source_layer = rule.get('source_layer', '')
forbidden_layer = rule.get('forbidden_layer', '')
if source_layer in file_path:
for i, line in enumerate(lines, 1):
if f'from {forbidden_layer}' in line or f'import {forbidden_layer}' in line:
violations.append(ADRViolation(
adr_id=adr.id,
adr_title=adr.title,
file=file_path,
line=i,
violation_type='layer_boundary_violation',
message=f'{source_layer} cannot import from {forbidden_layer}',
severity='blocker'
))
return violations
# Example ADR with validation rules
"""
---
id: ADR-002
title: Database Access Layer Pattern
status: accepted
decision: All database access must go through Repository pattern
consequences:
- Improved testability via repository mocking
- Clear separation between business logic and data access
constraints:
- No direct ORM usage in services
- All queries in repository classes
validation_rules:
- type: forbidden_import
pattern: 'from sqlalchemy import'
context: 'services/'
message: 'Services must not directly import SQLAlchemy'
severity: blocker
- type: required_pattern
pattern: 'class \w+Repository'
context: 'repositories/'
message: 'Repository files must define Repository class'
severity: major
- type: layer_boundary
source_layer: 'services'
forbidden_layer: 'models'
message: 'Services cannot directly access ORM models'
severity: blocker
---
"""
# Usage
validator = ADRComplianceValidator('docs/adrs/')
violations = validator.validate(['src/services/user_service.py'])
if violations:
print(f"Found {len(violations)} ADR violations:\n")
for v in violations:
print(f"[{v.severity}] {v.adr_id}: {v.adr_title}")
print(f" {v.file}:{v.line}")
print(f" {v.message}\n")
Multi-Agent Review Orchestration
# scripts/multi_agent_review.py
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
class ReviewAgent(Enum):
"""Specialized review agents."""
SECURITY = "security-specialist"
PERFORMANCE = "performance-engineer"
ARCHITECTURE = "software-architect"
TESTING = "qa-specialist"
DOCUMENTATION = "technical-writer"
ACCESSIBILITY = "accessibility-expert"
@dataclass
class AgentReview:
"""Review from a specialized agent."""
agent: ReviewAgent
score: float # 0-100
comments: List[Dict]
blocking_issues: List[Dict]
recommendations: List[str]
class MultiAgentReviewOrchestrator:
"""Coordinate multiple specialized review agents."""
def __init__(self):
self.agent_assignments = self._configure_agents()
def _configure_agents(self) -> Dict[str, List[ReviewAgent]]:
"""Configure which agents review which file types."""
return {
'**/*.py': [
ReviewAgent.SECURITY,
ReviewAgent.PERFORMANCE,
ReviewAgent.TESTING
],
'**/api/**': [
ReviewAgent.SECURITY,
ReviewAgent.ARCHITECTURE,
ReviewAgent.DOCUMENTATION
],
'**/models/**': [
ReviewAgent.ARCHITECTURE,
ReviewAgent.PERFORMANCE
],
'**/tests/**': [
ReviewAgent.TESTING
],
'**/*.md': [
ReviewAgent.DOCUMENTATION,
ReviewAgent.ACCESSIBILITY
]
}
async def orchestrate_review(
self,
changed_files: List[str],
pr_metadata: Dict
) -> Dict[str, List[AgentReview]]:
"""Orchestrate multi-agent review."""
reviews = {}
for file_path in changed_files:
# Determine which agents should review this file
assigned_agents = self._assign_agents(file_path)
file_reviews = []
for agent in assigned_agents:
review = await self._invoke_agent(agent, file_path, pr_metadata)
file_reviews.append(review)
reviews[file_path] = file_reviews
return reviews
def _assign_agents(self, file_path: str) -> List[ReviewAgent]:
"""Assign agents based on file path patterns."""
import fnmatch
agents = set()
for pattern, pattern_agents in self.agent_assignments.items():
if fnmatch.fnmatch(file_path, pattern):
agents.update(pattern_agents)
return list(agents)
async def _invoke_agent(
self,
agent: ReviewAgent,
file_path: str,
metadata: Dict
) -> AgentReview:
"""Invoke specialized agent for review."""
# This would call the actual agent via API or message queue
# For demonstration, returning mock review
if agent == ReviewAgent.SECURITY:
return await self._security_review(file_path, metadata)
elif agent == ReviewAgent.PERFORMANCE:
return await self._performance_review(file_path, metadata)
elif agent == ReviewAgent.ARCHITECTURE:
return await self._architecture_review(file_path, metadata)
# ... other agents
async def _security_review(self, file_path: str, metadata: Dict) -> AgentReview:
"""Security-focused review."""
# Run SAST tools, check for vulnerabilities
comments = []
blocking = []
# Check for hardcoded secrets
# Check for SQL injection
# Check for XSS vulnerabilities
# etc.
return AgentReview(
agent=ReviewAgent.SECURITY,
score=85.0,
comments=comments,
blocking_issues=blocking,
recommendations=[
'Add input validation for user-provided data',
'Use parameterized queries for database access'
]
)
def aggregate_reviews(
self,
reviews: Dict[str, List[AgentReview]]
) -> Dict:
"""Aggregate multi-agent reviews into final decision."""
total_score = 0
total_count = 0
all_blocking = []
all_recommendations = []
for file_reviews in reviews.values():
for review in file_reviews:
total_score += review.score
total_count += 1
all_blocking.extend(review.blocking_issues)
all_recommendations.extend(review.recommendations)
overall_score = total_score / total_count if total_count > 0 else 0
# Determine approval status
if all_blocking:
status = 'CHANGES_REQUIRED'
elif overall_score >= 80:
status = 'APPROVED'
elif overall_score >= 60:
status = 'APPROVED_WITH_COMMENTS'
else:
status = 'CHANGES_REQUESTED'
return {
'overall_score': overall_score,
'status': status,
'blocking_issues': all_blocking,
'recommendations': list(set(all_recommendations)),
'agent_reviews': reviews
}
# Usage
orchestrator = MultiAgentReviewOrchestrator()
reviews = await orchestrator.orchestrate_review(
changed_files=['src/api/auth.py', 'src/models/user.py'],
pr_metadata={'author': 'dev', 'branch': 'feature/auth'}
)
result = orchestrator.aggregate_reviews(reviews)
print(f"Overall Score: {result['overall_score']:.1f}/100")
print(f"Status: {result['status']}")
Cross-Cutting Concerns Validator
// tools/cross-cutting-validator.ts
interface CrossCuttingConcern {
name: string;
required: boolean;
validator: (file: string, content: string) => Promise<ValidationResult[]>;
}
interface ValidationResult {
concern: string;
passed: boolean;
file: string;
line?: number;
message: string;
}
const CROSS_CUTTING_CONCERNS: CrossCuttingConcern[] = [
{
name: 'Error Handling',
required: true,
validator: async (file, content) => {
// Check for try-catch blocks, error handling
const hasTryCatch = /try\s*{[\s\S]*?}\s*catch/.test(content);
const hasErrorCheck = /if\s*\(\s*error/.test(content);
if (!hasTryCatch && !hasErrorCheck) {
return [{
concern: 'Error Handling',
passed: false,
file,
message: 'No error handling found in async operations'
}];
}
return [{ concern: 'Error Handling', passed: true, file, message: 'OK' }];
}
},
{
name: 'Logging',
required: true,
validator: async (file, content) => {
const hasLogging = /logger\.(debug|info|warn|error)/.test(content);
if (!hasLogging && file.includes('services/')) {
return [{
concern: 'Logging',
passed: false,
file,
message: 'Service files should include logging statements'
}];
}
return [{ concern: 'Logging', passed: true, file, message: 'OK' }];
}
},
{
name: 'Authentication',
required: false,
validator: async (file, content) => {
if (!file.includes('api/')) {
return [{ concern: 'Authentication', passed: true, file, message: 'N/A' }];
}
const hasAuth = /@requires_auth|@login_required|authenticate/.test(content);
if (!hasAuth) {
return [{
concern: 'Authentication',
passed: false,
file,
message: 'API endpoint missing authentication decorator'
}];
}
return [{ concern: 'Authentication', passed: true, file, message: 'OK' }];
}
},
{
name: 'Input Validation',
required: true,
validator: async (file, content) => {
if (!file.includes('api/') && !file.includes('services/')) {
return [{ concern: 'Input Validation', passed: true, file, message: 'N/A' }];
}
const hasValidation = /validate|schema\.load|pydantic/.test(content);
if (!hasValidation) {
return [{
concern: 'Input Validation',
passed: false,
file,
message: 'Missing input validation for user data'
}];
}
return [{ concern: 'Input Validation', passed: true, file, message: 'OK' }];
}
}
];
class CrossCuttingValidator {
async validate(files: string[]): Promise<Map<string, ValidationResult[]>> {
const results = new Map<string, ValidationResult[]>();
for (const file of files) {
const content = await this.readFile(file);
const fileResults: ValidationResult[] = [];
for (const concern of CROSS_CUTTING_CONCERNS) {
const concernResults = await concern.validator(file, content);
fileResults.push(...concernResults);
}
results.set(file, fileResults);
}
return results;
}
generateReport(results: Map<string, ValidationResult[]>): string {
let report = '# Cross-Cutting Concerns Validation\n\n';
let passCount = 0;
let failCount = 0;
for (const [file, fileResults] of results) {
const failed = fileResults.filter(r => !r.passed);
if (failed.length > 0) {
report += `## ${file}\n\n`;
for (const result of failed) {
report += `- ❌ **${result.concern}**: ${result.message}\n`;
failCount++;
}
report += '\n';
} else {
passCount++;
}
}
report += `\n## Summary\n\n`;
report += `- ✅ Files passing all checks: ${passCount}\n`;
report += `- ❌ Files with issues: ${failCount}\n`;
return report;
}
}
Usage Examples
Validate ADR Compliance
Apply orchestrator-code-review-patterns skill to validate PR changes against all accepted ADRs
Orchestrate Multi-Agent Review
Apply orchestrator-code-review-patterns skill to coordinate security, performance, and architecture reviews
Check Cross-Cutting Concerns
Apply orchestrator-code-review-patterns skill to validate error handling, logging, and auth in all API files
Integration Points
- code-review-patterns - Base review methodology
- comprehensive-review-patterns - Multi-dimensional analysis
- software-design-patterns - Pattern compliance validation
Success Output
When successful, this skill MUST output:
✅ SKILL COMPLETE: orchestrator-code-review-patterns
Completed:
- [x] ADR compliance validated against all accepted ADRs
- [x] Multi-agent reviews coordinated (security, performance, architecture)
- [x] Cross-cutting concerns verified (error handling, logging, auth)
- [x] Layer boundaries validated for architectural integrity
Outputs:
- ADR compliance report with violations classified by severity
- Aggregated multi-agent review with overall score and status
- Cross-cutting concerns validation report with pass/fail results
- Consolidated recommendations prioritized by business impact
Completion Checklist
Before marking this skill as complete, verify:
- All accepted ADRs loaded and parsed successfully
- Changed files validated against all relevant ADR rules
- Multi-agent reviews completed (security, performance, architecture, testing)
- Cross-cutting concerns checked (error handling, logging, authentication, input validation)
- Layer boundary violations detected (if applicable)
- Overall review score calculated and approval status determined
- Blocking issues flagged and prioritized
- Review report generated with actionable recommendations
Failure Indicators
This skill has FAILED if:
- ❌ ADR files not found or unable to parse frontmatter
- ❌ Validation rules in ADRs not executed
- ❌ Multi-agent orchestration failed (agents not invoked)
- ❌ Cross-cutting concerns validation incomplete
- ❌ No violations reported despite clear ADR non-compliance
- ❌ Review report missing severity classifications or recommendations
- ❌ Overall score calculation incorrect or approval status missing
When NOT to Use
Do NOT use this skill when:
- No ADRs exist in the project (establish architecture decisions first)
- Reviewing non-code changes (documentation, config only)
- Single-file review with no architectural implications (use code-review-patterns instead)
- ADRs are all in "proposed" status (not yet accepted)
- No multi-agent coordination needed (use specialized review skills directly)
- Cross-cutting concerns not applicable (pure data scripts, config files)
- Team hasn't agreed on layer boundaries or architecture patterns
- Review scope is exploratory (spike/POC without architectural constraints)
Anti-Patterns (Avoid)
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Skipping ADR validation | Architectural drift over time | Always validate against accepted ADRs |
| Single-agent review only | Misses multi-dimensional issues | Coordinate security, performance, architecture agents |
| Ignoring cross-cutting concerns | Silent failures in production | Mandate error handling, logging, auth checks |
| Generic violation messages | Not actionable | Provide file:line references and fix suggestions |
| No severity classification | All issues treated equally | Prioritize by blocker/critical/major/minor |
| Manual agent coordination | Inconsistent coverage | Automate agent assignment by file patterns |
| Missing aggregation | Conflicting recommendations | Consolidate and prioritize across agents |
| One-time review | Drift continues | Integrate into CI/CD pipeline |
Principles
This skill embodies:
- #1 First Principles - Validate WHY architecture decisions exist before enforcing
- #3 Separation of Concerns - Orchestrate specialized agents, don't duplicate logic
- #5 Eliminate Ambiguity - Clear ADR compliance with specific violations
- #6 Clear, Understandable, Explainable - Multi-agent reviews aggregated into actionable report
- #8 No Assumptions - Verify actual code against ADR rules, not documentation claims
- #9 Keep It Simple - Coordinate agents efficiently without over-engineering
Full Standard: CODITECT-STANDARD-AUTOMATION.md