#!/usr/bin/env python3 """ CODITECT Task Specification Validator
Validates tasks in PILOT plan against CODITECT-STD-003 (ADR-115).
Usage: python3 scripts/validate-pilot-tasks.py [FILE] python3 scripts/validate-pilot-tasks.py --all python3 scripts/validate-pilot-tasks.py --stats
Examples: # Validate specific file python3 scripts/validate-pilot-tasks.py internal/project/plans/PILOT-PARALLEL-EXECUTION-PLAN.md
# Validate all plan files
python3 scripts/validate-pilot-tasks.py --all
# Show task statistics
python3 scripts/validate-pilot-tasks.py --stats
"""
import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
# PyYAML is optional: full frontmatter parsing needs it, but the script
# should still start (and warn) when it is absent.
try:
    import yaml
except ImportError:
    YAML_AVAILABLE = False
    print("Warning: PyYAML not installed. Using basic validation only.", file=sys.stderr)
else:
    YAML_AVAILABLE = True
@dataclass
class ValidationError:
    """Represents a validation error.

    Rendered by the CLI as ``[task_id] field: message`` with an icon
    chosen from ``severity``.
    """
    task_id: str  # ID of the offending task, or "FILE"/"UNKNOWN" sentinels
    field: str  # dotted path of the invalid field, e.g. "agent.name"
    message: str  # human-readable description of the problem
    severity: str = "error"  # error, warning, info
@dataclass
class TaskSpec:
    """Parsed task specification.

    Mirrors the YAML frontmatter of one task block. Optional fields get
    permissive defaults so partially specified tasks can still be
    collected for statistics reporting.
    """
    task_id: str  # e.g. "A.1.2" — Track.Section.Task[.Subtask]
    title: str
    status: str  # one of VALID_STATUSES
    track: str  # single letter, one of VALID_TRACKS
    agent_name: str  # from agent.name in the frontmatter
    agent_model: Optional[str] = None  # from agent.model, if given
    priority: str = "P2"  # one of VALID_PRIORITIES
    domain: Optional[str] = None  # one of VALID_DOMAINS, if given
    references: list = field(default_factory=list)
    dependencies: dict = field(default_factory=dict)  # keys: blocked_by, blocks
    constraints: list = field(default_factory=list)
    acceptance_criteria: list = field(default_factory=list)
    verification: Optional[dict] = None
    estimated_hours: Optional[float] = None
    tags: list = field(default_factory=list)
    raw_yaml: dict = field(default_factory=dict)  # the original parsed frontmatter dict
    line_number: int = 0  # position of the frontmatter block (as reported by parse_yaml_frontmatter)
# Validation constants (CODITECT-STD-003 controlled vocabularies)
VALID_STATUSES = {"pending", "in_progress", "complete", "blocked", "cancelled"}
VALID_TRACKS = set("ABCDEFGHIJKLMN")
VALID_PRIORITIES = {"P0", "P1", "P2", "P3"}
VALID_DOMAINS = {"backend", "frontend", "devops", "security", "testing", "docs", "research"}
VALID_REF_TYPES = {"adr", "spec", "doc", "code", "url"}

# Task IDs look like "A.1.2" or "A.1.2.3" (Track.Section.Task[.Subtask]).
# The separators must be escaped: the previous pattern used bare `.`,
# which matches ANY character, so malformed IDs like "AX1Y2" validated.
TASK_ID_PATTERN = re.compile(r'^[A-N]\.\d+\.\d+(\.\d+)?$')
# A YAML frontmatter block delimited by `---` lines.
FRONTMATTER_PATTERN = re.compile(r'^---\n(.*?)\n---', re.DOTALL | re.MULTILINE)
def parse_yaml_frontmatter(content: str) -> list[tuple[dict, int]]:
    """Extract all YAML frontmatter blocks from content.

    Scans line by line, toggling collection state on each ``---``
    delimiter. Only dict payloads containing a ``task_id`` key are kept.
    Returns (data, start_position) pairs; an unterminated trailing block
    is silently dropped.
    """
    found: list[tuple[dict, int]] = []
    if not YAML_AVAILABLE:
        return found

    collecting = False
    block_start = 0
    buffer: list[str] = []

    for idx, raw_line in enumerate(content.split('\n')):
        if raw_line.strip() != '---':
            if collecting:
                buffer.append(raw_line)
            continue
        if not collecting:
            # Opening delimiter: start a fresh buffer.
            collecting = True
            block_start = idx + 1
            buffer = []
        else:
            # Closing delimiter: parse what we collected.
            collecting = False
            try:
                parsed = yaml.safe_load('\n'.join(buffer))
            except yaml.YAMLError as exc:
                print(f"Warning: YAML parse error at line {block_start}: {exc}", file=sys.stderr)
            else:
                if parsed and isinstance(parsed, dict) and 'task_id' in parsed:
                    found.append((parsed, block_start))
    return found
def validate_task(task_data: dict, line_number: int) -> list[ValidationError]:
    """Validate a single task specification against CODITECT-STD-003.

    A validator must report problems rather than crash on them, so every
    check guards against malformed YAML values (previously an empty or
    non-string task_id raised IndexError/TypeError at
    ``task_data['task_id'][0]``, a non-dict ``dependencies`` raised
    AttributeError, and ``blocked_by: null`` raised TypeError).

    Args:
        task_data: Parsed YAML frontmatter for one task.
        line_number: Position of the frontmatter block (currently unused;
            kept for interface stability).

    Returns:
        A list of ValidationError records; empty when the task is valid.
    """
    errors: list[ValidationError] = []
    task_id = task_data.get('task_id', 'UNKNOWN')

    # Helper order preserves the original error-reporting order.
    _check_required(task_data, task_id, errors)
    _check_format_fields(task_data, task_id, errors)
    _check_agent(task_data, task_id, errors)
    _check_criteria(task_data, task_id, errors)
    _check_enums(task_data, task_id, errors)
    _check_references(task_data, task_id, errors)
    _check_dependencies(task_data, task_id, errors)
    return errors


def _check_required(task_data: dict, task_id: str, errors: list) -> None:
    """Report each of the five mandatory fields that is absent."""
    for field_name in ('task_id', 'title', 'status', 'track', 'agent'):
        if field_name not in task_data:
            errors.append(ValidationError(
                task_id=task_id,
                field=field_name,
                message=f"Missing required field: {field_name}"
            ))


def _check_format_fields(task_data: dict, task_id: str, errors: list) -> None:
    """Validate task_id format, title length, status value, and track letter."""
    tid = task_data.get('task_id')
    if 'task_id' in task_data:
        # isinstance guard: a non-string task_id previously crashed re.match.
        if not isinstance(tid, str) or not TASK_ID_PATTERN.match(tid):
            errors.append(ValidationError(
                task_id=task_id,
                field='task_id',
                message=f"Invalid task_id format: {tid}. Expected: Track.Section.Task[.Subtask]"
            ))

    title = task_data.get('title')
    if isinstance(title, str) and len(title) > 100:
        errors.append(ValidationError(
            task_id=task_id,
            field='title',
            message=f"Title too long ({len(title)} chars). Maximum: 100"
        ))

    if 'status' in task_data and task_data['status'] not in VALID_STATUSES:
        errors.append(ValidationError(
            task_id=task_id,
            field='status',
            message=f"Invalid status: {task_data['status']}. Valid: {VALID_STATUSES}"
        ))

    if 'track' in task_data:
        track = task_data['track']
        if track not in VALID_TRACKS:
            errors.append(ValidationError(
                task_id=task_id,
                field='track',
                message=f"Invalid track: {track}. Valid: {VALID_TRACKS}"
            ))
        # Cross-check track letter against the task_id prefix; only
        # meaningful (and safe) when task_id is a non-empty string.
        if isinstance(tid, str) and tid and tid[0] != track:
            errors.append(ValidationError(
                task_id=task_id,
                field='track',
                message=f"Track '{track}' doesn't match task_id prefix '{tid[0]}'"
            ))


def _check_agent(task_data: dict, task_id: str, errors: list) -> None:
    """Agent must be a mapping containing at least a 'name' key."""
    if 'agent' not in task_data:
        return
    agent = task_data['agent']
    if isinstance(agent, dict):
        if 'name' not in agent:
            errors.append(ValidationError(
                task_id=task_id,
                field='agent.name',
                message="Missing required field: agent.name"
            ))
    else:
        errors.append(ValidationError(
            task_id=task_id,
            field='agent',
            message="Agent must be an object with 'name' field"
        ))


def _check_criteria(task_data: dict, task_id: str, errors: list) -> None:
    """At least one acceptance criterion; each must be a testable object."""
    criteria = task_data.get('acceptance_criteria')
    if not criteria:
        errors.append(ValidationError(
            task_id=task_id,
            field='acceptance_criteria',
            message="At least one acceptance criterion required"
        ))
        return
    # Tolerate a scalar value (e.g. a bare string): report it as one bad
    # item instead of iterating its characters.
    items = criteria if isinstance(criteria, list) else [criteria]
    for i, criterion in enumerate(items):
        if not isinstance(criterion, dict):
            errors.append(ValidationError(
                task_id=task_id,
                field=f'acceptance_criteria[{i}]',
                message="Each criterion must be an object with 'criterion' field"
            ))
        elif 'criterion' not in criterion:
            errors.append(ValidationError(
                task_id=task_id,
                field=f'acceptance_criteria[{i}].criterion',
                message="Missing 'criterion' field"
            ))
        elif not criterion.get('testable', True):
            errors.append(ValidationError(
                task_id=task_id,
                field=f'acceptance_criteria[{i}].testable',
                message="Acceptance criterion must be testable",
                severity="warning"
            ))


def _check_enums(task_data: dict, task_id: str, errors: list) -> None:
    """Optional priority/domain fields must use the controlled vocabulary."""
    if 'priority' in task_data and task_data['priority'] not in VALID_PRIORITIES:
        errors.append(ValidationError(
            task_id=task_id,
            field='priority',
            message=f"Invalid priority: {task_data['priority']}. Valid: {VALID_PRIORITIES}"
        ))
    if 'domain' in task_data and task_data['domain'] not in VALID_DOMAINS:
        errors.append(ValidationError(
            task_id=task_id,
            field='domain',
            message=f"Invalid domain: {task_data['domain']}. Valid: {VALID_DOMAINS}"
        ))


def _check_references(task_data: dict, task_id: str, errors: list) -> None:
    """References, when present, must be a list of typed objects."""
    refs = task_data.get('references')
    if refs is None:
        return
    if not isinstance(refs, list):
        # Previously a non-list (e.g. a string) was iterated element-wise,
        # producing one confusing error per character.
        errors.append(ValidationError(
            task_id=task_id,
            field='references',
            message="References must be a list of objects"
        ))
        return
    for i, ref in enumerate(refs):
        if not isinstance(ref, dict):
            errors.append(ValidationError(
                task_id=task_id,
                field=f'references[{i}]',
                message="Each reference must be an object"
            ))
        elif 'type' in ref and ref['type'] not in VALID_REF_TYPES:
            errors.append(ValidationError(
                task_id=task_id,
                field=f'references[{i}].type',
                message=f"Invalid reference type: {ref['type']}. Valid: {VALID_REF_TYPES}"
            ))


def _check_dependencies(task_data: dict, task_id: str, errors: list) -> None:
    """A task must not depend on or block itself."""
    deps = task_data.get('dependencies')
    if not isinstance(deps, dict):
        return
    # `or []` also covers explicit nulls (`blocked_by: null` parses to None).
    blocked_by = deps.get('blocked_by') or []
    blocks = deps.get('blocks') or []
    if task_id in blocked_by:
        errors.append(ValidationError(
            task_id=task_id,
            field='dependencies.blocked_by',
            message="Task cannot be blocked by itself"
        ))
    if task_id in blocks:
        errors.append(ValidationError(
            task_id=task_id,
            field='dependencies.blocks',
            message="Task cannot block itself"
        ))
def validate_file(file_path: Path) -> tuple[list[ValidationError], list[TaskSpec]]:
    """Validate every task found in one plan file.

    Args:
        file_path: Markdown plan file containing YAML frontmatter blocks.

    Returns:
        (errors, tasks) — a missing file yields a single FILE-level error
        and no tasks.
    """
    if not file_path.exists():
        return [ValidationError(
            task_id="FILE",
            field="path",
            message=f"File not found: {file_path}"
        )], []

    # Explicit UTF-8: plan files can contain non-ASCII text, and the
    # platform default encoding (e.g. cp1252 on Windows) is not reliable.
    content = file_path.read_text(encoding='utf-8')
    tasks_data = parse_yaml_frontmatter(content)

    all_errors: list[ValidationError] = []
    all_tasks: list[TaskSpec] = []
    for task_data, line_number in tasks_data:
        all_errors.extend(validate_task(task_data, line_number))

        # Build the TaskSpec even when validation failed, so statistics
        # still cover partially specified tasks.
        agent = task_data.get('agent', {})
        agent_is_dict = isinstance(agent, dict)
        all_tasks.append(TaskSpec(
            task_id=task_data.get('task_id', 'UNKNOWN'),
            title=task_data.get('title', ''),
            status=task_data.get('status', 'pending'),
            track=task_data.get('track', ''),
            agent_name=agent.get('name', '') if agent_is_dict else '',
            agent_model=agent.get('model') if agent_is_dict else None,
            priority=task_data.get('priority', 'P2'),
            domain=task_data.get('domain'),
            references=task_data.get('references', []),
            dependencies=task_data.get('dependencies', {}),
            constraints=task_data.get('constraints', []),
            acceptance_criteria=task_data.get('acceptance_criteria', []),
            verification=task_data.get('verification'),
            estimated_hours=task_data.get('estimated_hours'),
            tags=task_data.get('tags', []),
            raw_yaml=task_data,
            line_number=line_number,
        ))
    return all_errors, all_tasks
def check_circular_dependencies(tasks: list[TaskSpec]) -> list[ValidationError]:
    """Check for circular dependencies across all tasks.

    Builds a ``blocked_by`` adjacency graph and runs a DFS with a
    recursion stack from every unvisited node. One error is emitted per
    DFS entry point that can reach a cycle (note: that entry task may
    merely depend on the cycle rather than be a member of it).

    Fixes: removed an unused ``task_ids`` local; tolerates a non-dict
    ``dependencies`` value (consistent with validate_task's guards).
    """
    errors: list[ValidationError] = []

    # Adjacency: task_id -> set of task_ids it is blocked by.
    graph: dict = {}
    for task in tasks:
        deps = task.dependencies if isinstance(task.dependencies, dict) else {}
        blocked_by = deps.get('blocked_by', [])
        graph[task.task_id] = set(blocked_by) if blocked_by else set()

    def has_cycle(node: str, visited: set, rec_stack: set) -> bool:
        """Return True if a cycle is reachable from node."""
        visited.add(node)
        rec_stack.add(node)
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                if has_cycle(neighbor, visited, rec_stack):
                    return True
            elif neighbor in rec_stack:
                return True
        rec_stack.remove(node)
        return False

    visited: set = set()
    for task_id in graph:
        if task_id not in visited and has_cycle(task_id, visited, set()):
            errors.append(ValidationError(
                task_id=task_id,
                field='dependencies',
                message="Circular dependency detected"
            ))
    return errors
def print_stats(tasks: list[TaskSpec]) -> None:
    """Print summary statistics for the given tasks.

    Shows totals broken down by status, track, and priority, then lists
    tasks missing acceptance criteria or verification.
    """
    if not tasks:
        print("No tasks found with YAML frontmatter.")
        return

    total = len(tasks)

    # Tally status/track/priority in a single pass.
    status_counts: dict = {}
    track_counts: dict = {}
    priority_counts: dict = {}
    for t in tasks:
        status_counts[t.status] = status_counts.get(t.status, 0) + 1
        track_counts[t.track] = track_counts.get(t.track, 0) + 1
        priority_counts[t.priority] = priority_counts.get(t.priority, 0) + 1

    print("\n" + "=" * 60)
    print("CODITECT Task Specification Statistics")
    print("=" * 60)
    print(f"\nTotal Tasks: {total}")

    print("\nBy Status:")
    for key in sorted(status_counts):
        count = status_counts[key]
        pct = (count / total) * 100
        print(f" {key:15} {count:4} ({pct:.1f}%)")

    print("\nBy Track:")
    for key in sorted(track_counts):
        count = track_counts[key]
        pct = (count / total) * 100
        print(f" Track {key:10} {count:4} ({pct:.1f}%)")

    print("\nBy Priority:")
    for key in sorted(priority_counts):
        count = priority_counts[key]
        pct = (count / total) * 100
        print(f" {key:15} {count:4} ({pct:.1f}%)")

    # Highlight tasks missing acceptance criteria (first five shown).
    missing_criteria = [t for t in tasks if not t.acceptance_criteria]
    if missing_criteria:
        print(f"\nTasks without acceptance criteria: {len(missing_criteria)}")
        for t in missing_criteria[:5]:
            print(f" - {t.task_id}: {t.title[:50]}")
        if len(missing_criteria) > 5:
            print(f" ... and {len(missing_criteria) - 5} more")

    # Highlight tasks missing a verification section.
    missing_verification = [t for t in tasks if not t.verification]
    if missing_verification:
        print(f"\nTasks without verification: {len(missing_verification)}")
def main() -> None:
    """CLI entry point: parse arguments, validate files, report results.

    Exit codes: 0 when validation passes, 1 when errors are found or
    PyYAML is unavailable.
    """
    parser = argparse.ArgumentParser(
        description="Validate CODITECT task specifications (CODITECT-STD-003)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__  # fixed: bare `doc` raised NameError at startup
    )
    parser.add_argument('file', nargs='?', help='File to validate')
    parser.add_argument('--all', action='store_true', help='Validate all plan files')
    parser.add_argument('--stats', action='store_true', help='Show task statistics')
    parser.add_argument('--json', action='store_true', help='Output as JSON')
    parser.add_argument('--warnings', action='store_true', help='Include warnings')
    args = parser.parse_args()

    if not YAML_AVAILABLE:
        print("Error: PyYAML required. Install with: pip install pyyaml", file=sys.stderr)
        sys.exit(1)

    # Determine files to validate (repo root hoisted: used by two branches).
    repo_root = Path(__file__).parent.parent
    if args.all:
        files = list(repo_root.glob('**/plans/*.md'))
        files += list(repo_root.glob('**/project/*.md'))
    elif args.file:
        files = [Path(args.file)]
    else:
        # Default to the PILOT plan.
        files = [repo_root / 'internal' / 'project' / 'plans' / 'PILOT-PARALLEL-EXECUTION-PLAN.md']

    all_errors: list = []
    all_tasks: list = []
    for file_path in files:
        errors, tasks = validate_file(file_path)
        all_errors.extend(errors)
        all_tasks.extend(tasks)

    # Cross-file check: cycles can span multiple plan files.
    all_errors.extend(check_circular_dependencies(all_tasks))

    # Warnings are hidden unless explicitly requested.
    if not args.warnings:
        all_errors = [e for e in all_errors if e.severity == 'error']

    if args.stats:
        print_stats(all_tasks)

    if args.json:
        output = {
            'errors': [
                {'task_id': e.task_id, 'field': e.field,
                 'message': e.message, 'severity': e.severity}
                for e in all_errors
            ],
            'task_count': len(all_tasks),
            'error_count': len(all_errors),
        }
        print(json.dumps(output, indent=2))
        # Fixed: JSON mode previously always exited 0, so CI consumers
        # could not detect failures from the exit status.
        sys.exit(1 if all_errors else 0)
    elif all_errors:
        print("\n" + "=" * 60)
        print("VALIDATION ERRORS")
        print("=" * 60)
        for error in all_errors:
            icon = "❌" if error.severity == "error" else "⚠️"
            print(f"{icon} [{error.task_id}] {error.field}: {error.message}")
        print(f"\nTotal: {len(all_errors)} error(s)")
        sys.exit(1)
    else:
        print(f"✅ Validation passed. {len(all_tasks)} task(s) validated.")
        sys.exit(0)
if __name__ == '__main__':  # fixed: dunders were mangled to bare `name`/`'main'`
    main()