scripts-orchestrate-ecosystem
#!/usr/bin/env python3
"""
title: "Import submodule orchestrator"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "CODITECT Ecosystem Orchestrator - Multi-Submodule Automation"
keywords: ['analysis', 'automation', 'ecosystem', 'generation', 'git']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "orchestrate-ecosystem.py"
language: python
executable: true
usage: "python3 scripts/orchestrate-ecosystem.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
CODITECT Ecosystem Orchestrator - Multi-Submodule Automation
Orchestrates compliance standardization across an entire CODITECT ecosystem with zero human-in-the-loop operation. Coordinates multiple agents working in parallel on different submodules.
Usage:
    # Analyze entire ecosystem
    python3 orchestrate-ecosystem.py --root /path/to/project

    # Execute specific phase
    python3 orchestrate-ecosystem.py --root /path/to/project --phase 0.8

    # Full automation with parallel execution
    python3 orchestrate-ecosystem.py --root /path/to/project --execute --parallel 4

    # Generate master coordination report
    python3 orchestrate-ecosystem.py --root /path/to/project --report
Features:
    - Multi-submodule parallel processing
    - Phase-based execution (0.8, 0.9, 1.0, etc.)
    - Automatic agent task generation
    - Progress tracking and reporting
    - Git sync coordination
Author: CODITECT Team
License: MIT
"""
import argparse
import json
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Import submodule orchestrator
try:
    from orchestrate_submodule import analyze_submodule, CODITECT_STANDARDS
except ImportError:
    # The sibling module is optional: when it cannot be imported, fall back to
    # the simplified standards and analysis defined inline below.
    CODITECT_STANDARDS = {
        "min_compliance_score": 90,
        "max_claude_md_lines": 150,
    }

    def analyze_submodule(path: Path) -> Dict:
        """Score one submodule directory against a simplified checklist.

        Points awarded (maximum 90):
            * ``CLAUDE.md`` is a file: 25 if it has at most
              ``CODITECT_STANDARDS["max_claude_md_lines"]`` lines, else 15
            * ``.coditect`` is a symlink: 10
            * ``.claude`` is a symlink: 10
            * ``docs`` is a directory: 15
            * ``README.md`` exists: 10
            * ``.gitignore`` exists: 10
            * ``docs/project-management/PROJECT-PLAN.md`` exists: 10

        Args:
            path: Directory of the submodule to inspect.

        Returns:
            A dict with keys ``path`` (str), ``name``, ``score``, ``compliant``
            (score reached ``CODITECT_STANDARDS["min_compliance_score"]``) and
            ``agent_tasks`` (empty when compliant, otherwise a single
            ``project-organizer`` task).
        """
        score = 0

        claude_md = path / "CLAUDE.md"
        # is_file() rather than exists(): a directory named CLAUDE.md cannot
        # be read and must simply earn no points.
        if claude_md.is_file():
            # Only the line count matters, so decode with an explicit encoding
            # and replace undecodable bytes instead of raising.
            text = claude_md.read_text(encoding="utf-8", errors="replace")
            line_limit = CODITECT_STANDARDS["max_claude_md_lines"]
            score += 25 if len(text.splitlines()) <= line_limit else 15

        # (check passed, points awarded) for the remaining structural checks.
        checks = (
            ((path / ".coditect").is_symlink(), 10),
            ((path / ".claude").is_symlink(), 10),
            ((path / "docs").is_dir(), 15),
            ((path / "README.md").exists(), 10),
            ((path / ".gitignore").exists(), 10),
            ((path / "docs" / "project-management" / "PROJECT-PLAN.md").exists(), 10),
        )
        score += sum(points for passed, points in checks if passed)

        compliant = score >= CODITECT_STANDARDS["min_compliance_score"]
        if compliant:
            agent_tasks = []
        else:
            agent_tasks = [
                {"agent": "project-organizer", "task": f"Standardize {path.name}"}
            ]

        return {
            "path": str(path),
            "name": path.name,
            "score": score,
            "compliant": compliant,
            "agent_tasks": agent_tasks,
        }
# Phase definitions
# Rollout phases keyed by phase id. Each entry names the submodule category
# the phase covers, its priority label, and a status marker; phases whose
# status is "✅" are treated as already complete when planning.
PHASES = {
    "0.7": dict(category="dev", priority="COMPLETE", status="✅"),
    "0.8": dict(category="core", priority="P0", status="📋"),
    "0.9": dict(category="cloud", priority="P0", status="📋"),
    "1.0": dict(category="docs", priority="P1", status="📋"),
    "1.1": dict(category="ops", priority="P2", status="📋"),
    "1.2": dict(category="gtm", priority="P1", status="📋"),
    "1.3": dict(category="labs", priority="P2", status="📋"),
    "1.4": dict(category="mixed", priority="P2", status="📋"),
}
def discover_submodules(root: Path) -> Dict[str, List[Path]]:
    """Discover all submodules organized by category.

    The expected layout is ``<root>/submodules/<category>/<submodule>/``.
    Entries that are not directories, or whose name starts with ``.``, are
    ignored at both levels.

    Args:
        root: Root directory of the project.

    Returns:
        A mapping of category name to the submodule directories inside it,
        with categories and submodules in sorted order so repeated runs see
        the same sequence. When ``<root>/submodules`` is not a directory the
        root itself is treated as one project: ``{"single": [root]}``.
    """
    submodules_dir = root / "submodules"

    # is_dir() rather than exists(): a regular file named "submodules" cannot
    # be iterated, so it is handled like a missing directory.
    if not submodules_dir.is_dir():
        # Not a rollout-master structure, treat root as single project
        return {"single": [root]}

    def visible_dirs(parent: Path) -> List[Path]:
        # iterdir() yields entries in arbitrary order; sort for determinism.
        return sorted(
            entry
            for entry in parent.iterdir()
            if entry.is_dir() and not entry.name.startswith(".")
        )

    return {
        category_dir.name: visible_dirs(category_dir)
        for category_dir in visible_dirs(submodules_dir)
    }
def analyze_ecosystem(root: Path, parallel: int = 4) -> Dict:
    """Analyze compliance of every submodule under ``root``.

    Submodules are found with ``discover_submodules`` and each one is scored
    with ``analyze_submodule`` on a thread pool. A submodule whose analysis
    raises, or returns a dict without ``compliant``/``score``, is reported on
    stderr and left out of the results; the remaining submodules are still
    processed.

    Args:
        root: Root directory of the project.
        parallel: Number of worker threads. Values below 1 are treated as 1.

    Returns:
        A dict with keys ``root``, ``timestamp``, ``categories`` (per-category
        submodule analyses, counts and average score), ``summary`` (overall
        counts, average score and agent-task total) and ``by_phase`` (one
        entry per ``PHASES`` item whose category has analyzed submodules).
    """
    categories = discover_submodules(root)
    results = {
        "root": str(root),
        "timestamp": datetime.now().isoformat(),
        "categories": {},
        "summary": {
            "total_submodules": 0,
            "compliant": 0,
            "non_compliant": 0,
            "average_score": 0,
            "total_agent_tasks": 0,
        },
        "by_phase": {},
    }
    summary = results["summary"]
    all_scores = []

    # ThreadPoolExecutor raises ValueError for max_workers <= 0.
    with ThreadPoolExecutor(max_workers=max(1, parallel)) as executor:
        # Submit everything up front so the analyses run in parallel.
        pending = [
            (category, submodule, executor.submit(analyze_submodule, submodule))
            for category, submodules in categories.items()
            for submodule in submodules
        ]

        # Collect in submission order (not completion order) so the layout of
        # the results follows discovery order rather than thread timing.
        for category, submodule, future in pending:
            try:
                analysis = future.result()
                # Read every required field before touching any counter so a
                # malformed analysis cannot leave the totals half-updated.
                compliant = analysis["compliant"]
                score = analysis["score"]
                task_count = len(analysis.get("agent_tasks", []))
            except Exception as exc:
                # Best-effort: one failing submodule must not abort the run.
                print(f"Error analyzing {submodule}: {exc}", file=sys.stderr)
                continue

            bucket = results["categories"].setdefault(
                category,
                {
                    "submodules": {},
                    "compliant_count": 0,
                    "total_count": 0,
                    "average_score": 0,
                },
            )
            bucket["submodules"][submodule.name] = analysis
            bucket["total_count"] += 1

            if compliant:
                bucket["compliant_count"] += 1
                summary["compliant"] += 1
            else:
                summary["non_compliant"] += 1

            summary["total_submodules"] += 1
            summary["total_agent_tasks"] += task_count
            all_scores.append(score)

    # Calculate averages
    if all_scores:
        summary["average_score"] = round(sum(all_scores) / len(all_scores), 1)

    for bucket in results["categories"].values():
        cat_scores = [sm["score"] for sm in bucket["submodules"].values()]
        if cat_scores:
            bucket["average_score"] = round(sum(cat_scores) / len(cat_scores), 1)

    # Map to phases
    for phase, info in PHASES.items():
        cat = info["category"]
        if cat in results["categories"]:
            bucket = results["categories"][cat]
            results["by_phase"][phase] = {
                "category": cat,
                "priority": info["priority"],
                "status": info["status"],
                "submodule_count": bucket["total_count"],
                "compliant_count": bucket["compliant_count"],
                "average_score": bucket["average_score"],
            }

    return results
def generate_execution_plan(analysis: Dict, phase: Optional[str] = None) -> Dict:
    """Generate agent execution plan for ecosystem or specific phase.

    Args:
        analysis: Ecosystem analysis with a ``categories`` mapping whose
            entries hold per-submodule analyses under ``submodules``.
        phase: A key of ``PHASES`` to plan only that phase's category. When
            omitted, or not a known phase, every phase whose status is not
            "✅" is planned.

    Returns:
        A dict with ``generated`` (ISO timestamp), ``phases`` (one entry per
        targeted category present in the analysis, listing its non-compliant
        submodules with their tasks and ready-to-paste ``Task(...)``
        invocations), ``total_tasks`` and ``estimated_duration``.
    """

    def quoted(value) -> str:
        # Emit a double-quoted literal with quotes, backslashes and newlines
        # escaped, so the generated Task(...) line stays well-formed whatever
        # the text contains. Plain text renders exactly as "text".
        return json.dumps(str(value), ensure_ascii=False)

    plan = {
        "generated": datetime.now().isoformat(),
        "phases": [],
        "total_tasks": 0,
        "estimated_duration": "0 hours",
    }

    if phase and phase in PHASES:
        target_categories = [PHASES[phase]["category"]]
    else:
        # All non-complete phases
        target_categories = [
            info["category"] for info in PHASES.values() if info["status"] != "✅"
        ]

    for category in target_categories:
        if category not in analysis["categories"]:
            continue

        cat_data = analysis["categories"][category]
        phase_plan = {
            "category": category,
            "submodules": [],
            "task_count": 0,
        }

        for name, sm_analysis in cat_data["submodules"].items():
            if sm_analysis["compliant"]:
                continue
            tasks = sm_analysis.get("agent_tasks", [])
            phase_plan["submodules"].append({
                "name": name,
                "path": sm_analysis["path"],
                "current_score": sm_analysis["score"],
                "tasks": tasks,
                "agent_invocations": [
                    f"Task(subagent_type={quoted(t['agent'])}, prompt={quoted(t['task'])})"
                    for t in tasks
                ],
            })
            phase_plan["task_count"] += len(tasks)

        plan["phases"].append(phase_plan)
        plan["total_tasks"] += phase_plan["task_count"]

    # Estimate duration (roughly 5 minutes per task)
    hours = round(plan["total_tasks"] * 5 / 60, 1)
    plan["estimated_duration"] = f"{hours} hours"

    return plan
def generate_master_coordination_script(plan: Dict) -> str:
    """Render an execution plan as the text of a master coordination script.

    Args:
        plan: Execution plan providing ``generated``, ``total_tasks``,
            ``estimated_duration`` and ``phases``; each phase provides
            ``category``, ``task_count`` and ``submodules``, and each
            submodule provides ``name``, ``current_score`` and
            ``agent_invocations``.

    Returns:
        The script text: a header, one numbered section per phase (banner,
        each submodule's invocations, then a git-sync reminder) and a closing
        verification step. Lines are joined with ``\\n`` and there is no
        trailing newline.
    """
    banner_rule = "# " + "=" * 60

    script = [
        "#!/usr/bin/env python3",
        '"""',
        "CODITECT Ecosystem Master Coordination Script",
        f"Generated: {plan['generated']}",
        f"Total Tasks: {plan['total_tasks']}",
        f"Estimated Duration: {plan['estimated_duration']}",
        '"""',
        "",
        "# Execute these agent invocations in Claude Code",
        "# Each phase should be completed before moving to the next",
        "",
    ]

    for number, entry in enumerate(plan["phases"], start=1):
        category = entry["category"]

        script += [
            banner_rule,
            f"# PHASE {number}: {category.upper()} ({entry['task_count']} tasks)",
            banner_rule,
            "",
        ]

        for submodule in entry["submodules"]:
            script.append(
                f"# --- {submodule['name']} (Score: {submodule['current_score']}/100) ---"
            )
            script.extend(submodule["agent_invocations"])
            # Blank line separating one submodule's invocations from the next.
            script.append("")

        script += [
            f"# Phase {number} completion: Sync all submodules",
            f"# /git-sync --target submodules/{category}/ --mode full",
            "",
        ]

    script += [
        "# FINAL: Verify ecosystem compliance",
        'Task(subagent_type="compliance-checker-agent", prompt="Verify all submodules are at 90+ compliance")',
        "",
        "# Generate final report",
        "# python3 orchestrate-ecosystem.py --root . --report",
    ]

    return "\n".join(script)
def generate_report(analysis: Dict, output_format: str = "text") -> str:
    """Generate compliance report.

    Args:
        analysis: Ecosystem analysis with ``timestamp``, ``root``, ``summary``
            and ``categories``; ``by_phase`` is optional.
        output_format: ``"json"`` returns the analysis serialized with
            ``json.dumps(indent=2)``; any other value yields the text report.

    Returns:
        The report as a single string. The text report has a summary, a
        per-phase section and a per-category section in which submodules are
        listed by ascending score, with ties broken by name so the output does
        not depend on the insertion order of the analysis dict.
    """
    if output_format == "json":
        return json.dumps(analysis, indent=2)

    summary = analysis["summary"]
    # max(1, ...) guards the division when no submodules were analyzed.
    compliant_pct = round(
        summary["compliant"] / max(1, summary["total_submodules"]) * 100
    )
    heavy_rule = "=" * 70
    light_rule = "-" * 70

    lines = [
        heavy_rule,
        "CODITECT ECOSYSTEM COMPLIANCE REPORT",
        heavy_rule,
        f"Generated: {analysis['timestamp']}",
        f"Root: {analysis['root']}",
        "",
        light_rule,
        "SUMMARY",
        light_rule,
        f"Total Submodules: {summary['total_submodules']}",
        f"Compliant (90+): {summary['compliant']} ({compliant_pct}%)",
        f"Non-Compliant: {summary['non_compliant']}",
        f"Average Score: {summary['average_score']}/100",
        f"Total Agent Tasks Needed: {summary['total_agent_tasks']}",
        "",
    ]

    # Phase breakdown
    lines.extend([light_rule, "BY PHASE", light_rule])
    for phase, data in sorted(analysis.get("by_phase", {}).items()):
        status = data.get("status", "📋")
        lines.append(
            f"Phase {phase} ({data['category']}): {status} "
            f"[{data['compliant_count']}/{data['submodule_count']} compliant, "
            f"avg: {data['average_score']}/100]"
        )
    lines.append("")

    # Category details
    lines.extend([light_rule, "BY CATEGORY", light_rule])
    for category, data in sorted(analysis["categories"].items()):
        lines.append(
            f"\n{category.upper()}/ ({data['total_count']} submodules, avg: {data['average_score']}/100)"
        )
        # Lowest scores first; the name tie-breaker keeps equal scores in a
        # stable, reproducible order.
        ranked = sorted(
            data["submodules"].items(),
            key=lambda item: (item[1]["score"], item[0]),
        )
        for name, sm in ranked:
            status = "✅" if sm["compliant"] else "❌"
            tasks = len(sm.get("agent_tasks", []))
            task_str = f" [{tasks} tasks]" if tasks > 0 else ""
            lines.append(f" {status} {name}: {sm['score']}/100{task_str}")

    return "\n".join(lines)
def main():
    """Command-line entry point: analyze the ecosystem and emit the outputs.

    Always runs the ecosystem analysis and prints a short summary. Depending
    on the flags it then prints or saves a report, generates an execution
    plan and coordination script, and/or dumps the analysis as JSON. Files
    are written as UTF-8 because the report contains emoji status markers
    that many platform default encodings cannot represent.

    Exits with status 0 when at least 90% of the analyzed submodules are
    compliant, otherwise 1.
    """
    parser = argparse.ArgumentParser(
        description="CODITECT Ecosystem Orchestrator - Multi-Submodule Automation"
    )
    parser.add_argument(
        "--root", "-r",
        type=Path,
        default=Path.cwd(),
        help="Root directory of CODITECT project (default: current directory)"
    )
    parser.add_argument(
        "--phase",
        type=str,
        choices=list(PHASES.keys()),
        help="Execute specific phase only"
    )
    parser.add_argument(
        "--execute",
        action="store_true",
        help="Generate and execute agent tasks"
    )
    parser.add_argument(
        "--parallel", "-j",
        type=int,
        default=4,
        help="Number of parallel analysis workers (default: 4)"
    )
    parser.add_argument(
        "--report",
        action="store_true",
        help="Generate comprehensive report"
    )
    parser.add_argument(
        "--output", "-o",
        type=Path,
        help="Output file for report or execution plan"
    )
    parser.add_argument(
        "--format", "-f",
        choices=["text", "json"],
        default="text",
        help="Output format (default: text)"
    )
    parser.add_argument(
        "--generate-script",
        action="store_true",
        help="Generate master coordination script"
    )

    args = parser.parse_args()

    print("=" * 60)
    print("CODITECT ECOSYSTEM ORCHESTRATOR")
    print("=" * 60)
    print(f"Root: {args.root}")
    print(f"Mode: {'Report' if args.report else 'Analysis'}")
    if args.phase:
        print(f"Phase: {args.phase}")

    # Analyze ecosystem
    print("\n[Analyzing ecosystem...]")
    analysis = analyze_ecosystem(args.root, parallel=args.parallel)
    summary = analysis["summary"]

    print(f"\nDiscovered: {summary['total_submodules']} submodules")
    print(f"Compliant: {summary['compliant']}/{summary['total_submodules']}")
    print(f"Average Score: {summary['average_score']}/100")

    # NOTE(review): --output is shared by the report, the coordination script
    # and the JSON dump below; combining those modes makes a later write
    # overwrite an earlier one. Confirm whether that is intended.

    # Generate report
    if args.report:
        report = generate_report(analysis, output_format=args.format)
        if args.output:
            args.output.write_text(report, encoding="utf-8")
            print(f"\nReport saved to: {args.output}")
        else:
            print("\n" + report)

    # Generate execution plan
    if args.execute or args.generate_script:
        print("\n[Generating execution plan...]")
        plan = generate_execution_plan(analysis, phase=args.phase)

        print(f"Tasks to execute: {plan['total_tasks']}")
        print(f"Estimated duration: {plan['estimated_duration']}")

        if args.generate_script:
            script = generate_master_coordination_script(plan)
            script_path = args.output or args.root / "ecosystem_orchestration.py"
            Path(script_path).write_text(script, encoding="utf-8")
            print(f"\nMaster coordination script saved to: {script_path}")

        if args.execute:
            print("\n[Execution mode]")
            print("Execute the generated script in Claude Code to run all agent tasks.")
            print("Each task will invoke the appropriate specialized agent.")

    # Save analysis JSON
    if args.format == "json" and not args.report:
        output = json.dumps(analysis, indent=2)
        if args.output:
            args.output.write_text(output, encoding="utf-8")
            print(f"\nAnalysis saved to: {args.output}")
        else:
            print(output)

    # Exit code based on compliance
    compliant_ratio = summary["compliant"] / max(1, summary["total_submodules"])
    sys.exit(0 if compliant_ratio >= 0.9 else 1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()