#!/usr/bin/env python3
"""
---
title: "Production readiness thresholds"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Production Cleanup Orchestration Script"
keywords: ['analysis', 'cleanup', 'git', 'production', 'review']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "production-cleanup.py"
language: python
executable: true
usage: "python3 scripts/production-cleanup.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

Production Cleanup Orchestration Script

Coordinates multi-agent cleanup operations to achieve 95/100 production
readiness across repository directories and submodules.

Usage:
    python3 production-cleanup.py --target all --mode full
    python3 production-cleanup.py --target docs/ --mode assess --dry-run
    python3 production-cleanup.py --target submodules --priority P0,P1
"""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Production readiness thresholds
# Score cutoffs (0-100) and root-file-count limits that drive prioritization.
THRESHOLDS = {
    "max_root_files": 15,
    "target_root_files": 10,
    "min_production_score": 80,
    "critical_score": 40,
    "high_priority_score": 60,
    "medium_priority_score": 80,
}
# File classification patterns
# Filename / extension patterns used to classify files during cleanup.
FILE_PATTERNS = {
    "essential": [
        ".gitignore",
        ".gitmodules",
        "README.md",
        "CLAUDE.md",
        "LICENSE",
        "CONTRIBUTING.md",
    ],
    "docs": [".md", ".txt", ".pdf"],
    "code": [".py", ".sh", ".js", ".ts", ".rs", ".go"],
    "data": [".json", ".yml", ".yaml", ".toml", ".xml"],
    "logs": [".log"],
    "temp": [".tmp", ".temp", "~", ".bak", ".swp"],
}
class ProductionCleanupOrchestrator:
    """Orchestrates multi-agent production cleanup operations.

    Runs up to six sequential phases (assessment, planning, execution,
    verification, submodule cleanup, final reporting) selected by ``mode``
    and ``target``, writing JSON/Markdown reports under ``<repo_root>/reports``.

    NOTE(review): the agent invocations are placeholders (see TODOs); the
    class currently performs only direct filesystem analysis and report
    writing.
    """

    def __init__(self, target: str, mode: str, priority: str, dry_run: bool):
        """
        Args:
            target: "all", "submodules", "root", or a directory path.
            mode: One of "assess", "plan", "execute", "verify", "full".
            priority: Comma-separated filter such as "P0,P1", or "all".
            dry_run: When True, the execution phase (Phase 3) is skipped.

        Raises:
            RuntimeError: If the current directory is not inside a git repo.
        """
        self.target = target
        self.mode = mode
        self.priority = priority
        self.dry_run = dry_run
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # produces the same "YYYY-MM-DDTHH-MM-SSZ" wall-clock string.
        self.timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%SZ")
        self.repo_root = self._find_repo_root()
        self.reports_dir = self.repo_root / "reports"
        self.reports_dir.mkdir(exist_ok=True)

    def _find_repo_root(self) -> Path:
        """Walk upward from CWD to the first directory containing ``.git``.

        Raises:
            RuntimeError: If no ancestor directory contains ``.git``.
        """
        current = Path.cwd()
        while current != current.parent:
            if (current / ".git").exists():
                return current
            current = current.parent
        raise RuntimeError("Not in a git repository")

    def execute(self) -> Dict:
        """Execute production cleanup orchestration.

        Returns:
            Dict with execution results and generated report paths.
        """
        banner = "=" * 70
        print(f"\n{banner}")
        print("Production Cleanup Orchestrator")
        print(banner)
        print(f"Target: {self.target}")
        print(f"Mode: {self.mode}")
        print(f"Priority: {self.priority}")
        print(f"Dry Run: {self.dry_run}")
        print(f"Timestamp: {self.timestamp}")
        print(f"{banner}\n")

        results: Dict = {
            "target": self.target,
            "mode": self.mode,
            "priority": self.priority,
            "dry_run": self.dry_run,
            "timestamp": self.timestamp,
            "phases_executed": [],
            "reports_generated": [],
            "errors": [],
        }

        # Phase 1: Assessment -- runs in every mode.
        if self.mode in ("assess", "plan", "execute", "verify", "full"):
            print("\nPhase 1: Assessment")
            print("-" * 70)
            assessment = self._phase_1_assessment()
            results["phases_executed"].append("assessment")
            results["assessment"] = assessment
            results["reports_generated"].append(assessment["report_path"])

        # Phase 2: Planning.
        if self.mode in ("plan", "execute", "verify", "full"):
            print("\nPhase 2: Planning")
            print("-" * 70)
            plan = self._phase_2_planning(results.get("assessment"))
            results["phases_executed"].append("planning")
            results["plan"] = plan
            results["reports_generated"].append(plan["report_path"])

        # Phase 3: Execution -- skipped entirely on --dry-run.
        if self.mode in ("execute", "verify", "full") and not self.dry_run:
            print("\nPhase 3: Execution")
            print("-" * 70)
            execution = self._phase_3_execution(results.get("plan"))
            results["phases_executed"].append("execution")
            results["execution"] = execution
            results["reports_generated"].extend(execution["reports"])

        # Phase 4: Verification.
        if self.mode in ("verify", "full"):
            print("\nPhase 4: Verification")
            print("-" * 70)
            verification = self._phase_4_verification()
            results["phases_executed"].append("verification")
            results["verification"] = verification
            results["reports_generated"].extend(verification["reports"])

        # Phase 5: Submodules -- full mode only, and only for matching targets.
        if self.mode == "full" and self.target in ("all", "submodules"):
            print("\nPhase 5: Submodules")
            print("-" * 70)
            submodules = self._phase_5_submodules()
            results["phases_executed"].append("submodules")
            results["submodules"] = submodules
            results["reports_generated"].extend(submodules["reports"])

        # Phase 6: Final Reporting -- full mode only.
        if self.mode == "full":
            print("\nPhase 6: Final Reporting")
            print("-" * 70)
            final_report = self._phase_6_reporting(results)
            results["phases_executed"].append("reporting")
            results["final_report"] = final_report
            results["reports_generated"].append(final_report["report_path"])

        self._print_summary(results)
        return results

    def _phase_1_assessment(self) -> Dict:
        """Phase 1: Assess production readiness of the target directories.

        Returns:
            Assessment dict with per-directory scores, an overall score, and
            the path of the JSON report written under the reports directory.
        """
        print("Invoking codebase-analyzer agent...")
        # TODO: Invoke actual agent via Task tool.
        # For now, generate assessment via direct analysis.
        assessment: Dict = {
            "timestamp": self.timestamp,
            "target": self.target,
            "directories_analyzed": [],
            "overall_score": 0,
            "issues": [],
            "recommendations": [],
        }

        # Resolve the set of directories implied by --target.
        if self.target == "all":
            directories = self._get_all_directories()
        elif self.target == "submodules":
            directories = self._get_submodule_directories()
        else:
            directories = [Path(self.target)]

        for directory in directories:
            if not directory.exists():
                continue
            assessment["directories_analyzed"].append(self._assess_directory(directory))

        # Overall score is the unweighted mean of per-directory scores.
        if assessment["directories_analyzed"]:
            assessment["overall_score"] = sum(
                d["production_score"] for d in assessment["directories_analyzed"]
            ) / len(assessment["directories_analyzed"])

        report_path = self.reports_dir / f"production-readiness-assessment-{self.timestamp}.json"
        with open(report_path, "w") as f:
            json.dump(assessment, f, indent=2)
        assessment["report_path"] = str(report_path)

        print(f"Assessment complete. Overall score: {assessment['overall_score']:.1f}/100")
        print(f"Report: {report_path}")
        return assessment

    def _assess_directory(self, directory: Path) -> Dict:
        """Assess a single directory for production readiness.

        The production score is the mean of three 0-100 sub-scores:
        root-file count, organization (share of files in subdirectories),
        and documentation (README presence plus size).
        """
        files = [f for f in directory.glob("*") if f.is_file() and not f.name.startswith(".")]
        file_count = len(files)
        # 100 at <= 10 root files, minus 5 per extra file, clamped to
        # [0, 100] -- the unclamped formula could exceed 100 for sparse dirs.
        file_count_score = max(0, min(100, 100 - (file_count - 10) * 5))

        # Organization score: fraction of files living in subdirectories.
        subdirs = [d for d in directory.glob("*") if d.is_dir() and not d.name.startswith(".")]
        # Count only files: rglob("*") also yields directories, which would
        # inflate the organization score.
        files_in_subdirs = sum(
            1 for d in subdirs for entry in d.rglob("*") if entry.is_file()
        )
        total_files = file_count + files_in_subdirs
        organization_score = (files_in_subdirs / total_files * 100) if total_files > 0 else 0

        # Documentation score: 50 for README presence, up to 50 more based on
        # size (1 point per 100 bytes, a rough quality proxy).
        readme = directory / "README.md"
        readme_exists = readme.exists()
        docs_score = 50 if readme_exists else 0
        if readme_exists:
            docs_score += min(50, readme.stat().st_size / 100)

        production_score = (file_count_score + organization_score + docs_score) / 3
        return {
            "directory": str(directory.relative_to(self.repo_root)),
            "current_files": file_count,
            "target_files": THRESHOLDS["target_root_files"],
            "production_score": round(production_score, 1),
            "file_count_score": round(file_count_score, 1),
            "organization_score": round(organization_score, 1),
            "documentation_score": round(docs_score, 1),
            "issues": self._identify_issues(file_count, readme_exists, production_score),
            "recommendations": self._generate_recommendations(file_count, readme_exists),
        }

    def _identify_issues(self, file_count: int, readme_exists: bool, score: float) -> List[str]:
        """Return human-readable issue strings for a directory assessment."""
        issues: List[str] = []
        if file_count > THRESHOLDS["max_root_files"]:
            issues.append(f"Too many root files ({file_count} > {THRESHOLDS['max_root_files']})")
        if not readme_exists:
            issues.append("No README.md found")
        if score < THRESHOLDS["critical_score"]:
            issues.append(f"Critical production score ({score:.1f} < {THRESHOLDS['critical_score']})")
        return issues

    def _generate_recommendations(self, file_count: int, readme_exists: bool) -> List[str]:
        """Return improvement recommendations for a directory assessment."""
        recommendations: List[str] = []
        if file_count > THRESHOLDS["target_root_files"]:
            recommendations.append("Create subdirectories to organize files by category")
        if not readme_exists:
            recommendations.append("Add README.md with directory purpose and structure")
        if file_count > 20:
            recommendations.append("Archive old/historical files to archive/ subdirectory")
        return recommendations

    def _phase_2_planning(self, assessment: Optional[Dict]) -> Dict:
        """Phase 2: Build a prioritized cleanup plan from the assessment.

        Directories are bucketed P0 (critical) through P3 by score, then
        filtered by the --priority option into ``directories_to_clean``.
        """
        print("Invoking project-organizer agent...")
        plan: Dict = {
            "timestamp": self.timestamp,
            "directories_to_clean": [],
            "priority_breakdown": {"P0": [], "P1": [], "P2": [], "P3": []},
        }
        if not assessment:
            # NOTE(review): this early return omits "report_path"; callers in
            # execute() always pass an assessment, so it is unreachable there.
            print("WARNING: No assessment provided, skipping planning")
            return plan

        for dir_assessment in assessment["directories_analyzed"]:
            score = dir_assessment["production_score"]
            directory = dir_assessment["directory"]
            # Lower score => higher cleanup priority.
            if score < THRESHOLDS["critical_score"]:
                priority = "P0"
            elif score < THRESHOLDS["high_priority_score"]:
                priority = "P1"
            elif score < THRESHOLDS["medium_priority_score"]:
                priority = "P2"
            else:
                priority = "P3"
            plan["priority_breakdown"][priority].append(directory)

            # Schedule only directories matching the requested priority filter.
            if self.priority == "all" or priority in self.priority.split(","):
                plan["directories_to_clean"].append({
                    "directory": directory,
                    "priority": priority,
                    "score": score,
                    "actions": self._plan_directory_cleanup(directory),
                })

        report_path = self.reports_dir / f"cleanup-plan-{self.timestamp}.json"
        with open(report_path, "w") as f:
            json.dump(plan, f, indent=2)
        plan["report_path"] = str(report_path)

        print(f"Planning complete. {len(plan['directories_to_clean'])} directories to clean")
        print(f"Report: {report_path}")
        return plan

    def _plan_directory_cleanup(self, directory: str) -> List[Dict]:
        """Plan cleanup actions for a directory.

        TODO: Implement intelligent file categorization and mapping; this is
        a placeholder that would be enhanced with agent logic.
        """
        return [
            {
                "action": "create_subdirectories",
                "subdirectories": ["docs/", "reports/", "archive/", "configs/"],
            },
            {
                "action": "categorize_files",
                "categories": ["documentation", "reports", "configurations", "historical"],
            },
        ]

    def _phase_3_execution(self, plan: Optional[Dict]) -> Dict:
        """Phase 3: Execute the cleanup plan (file moves are placeholders)."""
        print("Invoking project-organizer, code-locator, and codi-documentation-writer agents...")
        execution: Dict = {
            "timestamp": self.timestamp,
            "directories_cleaned": [],
            "reports": [],
        }
        if not plan:
            print("WARNING: No plan provided, skipping execution")
            return execution

        for dir_plan in plan.get("directories_to_clean", []):
            directory = dir_plan["directory"]
            print(f"\nCleaning {directory}...")
            # TODO: Invoke agents to execute actual file moves.
            result = {
                "directory": directory,
                "before_files": 0,  # Would be actual count
                "after_files": 0,  # Would be actual count
                "files_moved": 0,
                "subdirectories_created": 0,
                "documentation_updated": False,
            }
            execution["directories_cleaned"].append(result)

            # One JSON report per cleaned directory ('/' flattened to '-'
            # so the path is a single filename).
            report_path = self.reports_dir / f"cleanup-execution-{directory.replace('/', '-')}-{self.timestamp}.json"
            with open(report_path, "w") as f:
                json.dump(result, f, indent=2)
            execution["reports"].append(str(report_path))

        print(f"\nExecution complete. {len(execution['directories_cleaned'])} directories cleaned")
        return execution

    def _phase_4_verification(self) -> Dict:
        """Phase 4: Verify cleanup results (agent invocation pending)."""
        print("Invoking codebase-analyst agent...")
        verification: Dict = {
            "timestamp": self.timestamp,
            "verified_directories": [],
            "reports": [],
        }
        # TODO: Invoke agent to verify cleanup results.
        print("Verification complete")
        return verification

    def _phase_5_submodules(self) -> Dict:
        """Phase 5: Clean submodule roots (agent invocation pending)."""
        print("Invoking submodule-orchestrator agent...")
        submodules: Dict = {
            "timestamp": self.timestamp,
            "submodules_cleaned": [],
            "reports": [],
        }
        # TODO: Invoke agent to clean submodule roots.
        print("Submodule cleanup complete")
        return submodules

    def _phase_6_reporting(self, results: Dict) -> Dict:
        """Phase 6: Write the final Markdown report to docs/project-management.

        Returns:
            Dict with the written report's path under key "report_path".
        """
        print("Invoking codi-documentation-writer agent...")
        report_content = self._generate_markdown_report(results)
        report_path = (
            self.repo_root / "docs" / "project-management"
            / f"PRODUCTION-READINESS-REPORT-{self.timestamp}.md"
        )
        report_path.parent.mkdir(parents=True, exist_ok=True)
        with open(report_path, "w") as f:
            f.write(report_content)
        print("Final report generated")
        print(f"Report: {report_path}")
        return {"report_path": str(report_path)}

    def _generate_markdown_report(self, results: Dict) -> str:
        """Render the final ``results`` dict as a Markdown report."""
        reports_list = "\n".join(f"- {report}" for report in results["reports_generated"])
        return f"""# Production Readiness Report

Generated: {self.timestamp}
Target: {self.target}
Mode: {self.mode}
Priority: {self.priority}

## Executive Summary

Phases executed: {', '.join(results['phases_executed'])}

## Assessment Results

{json.dumps(results.get('assessment', {}), indent=2)}

## Cleanup Plan

{json.dumps(results.get('plan', {}), indent=2)}

## Generated Reports

{reports_list}

---
Generated by CODITECT Production Cleanup Orchestrator
"""

    def _get_all_directories(self) -> List[Path]:
        """Return all non-hidden top-level directories in the repository."""
        return [d for d in self.repo_root.iterdir() if d.is_dir() and not d.name.startswith(".")]

    def _get_submodule_directories(self) -> List[Path]:
        """Return submodule dirs laid out as submodules/<category>/<name>."""
        submodules_dir = self.repo_root / "submodules"
        if not submodules_dir.exists():
            return []
        submodules: List[Path] = []
        for category in submodules_dir.iterdir():
            if category.is_dir():
                submodules.extend(s for s in category.iterdir() if s.is_dir())
        return submodules

    def _print_summary(self, results: Dict):
        """Print a final execution summary to stdout."""
        banner = "=" * 70
        print(f"\n{banner}")
        print("Production Cleanup Complete")
        print(banner)
        print(f"Phases Executed: {', '.join(results['phases_executed'])}")
        print(f"Reports Generated: {len(results['reports_generated'])}")
        print("\nReports:")
        for report in results["reports_generated"]:
            print(f"  {report}")
        print(f"{banner}\n")
def main() -> None:
    """CLI entry point: parse arguments, run the orchestrator, set exit code.

    Exits 0 on success, 1 when the run recorded errors or raised.
    """
    parser = argparse.ArgumentParser(
        description="Production Cleanup Orchestration",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--target",
        required=True,
        help="Target to clean: all | <directory> | submodules | root",
    )
    parser.add_argument(
        "--mode",
        default="full",
        choices=["assess", "plan", "execute", "verify", "full"],
        help="Cleanup mode (default: full)",
    )
    parser.add_argument(
        "--priority",
        default="all",
        help="Priority filter: P0 | P0,P1 | all (default: all)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview changes without executing",
    )
    args = parser.parse_args()

    try:
        orchestrator = ProductionCleanupOrchestrator(
            target=args.target,
            mode=args.mode,
            priority=args.priority,
            dry_run=args.dry_run,
        )
        results = orchestrator.execute()
        # Non-zero exit when any phase recorded errors.
        sys.exit(1 if results.get("errors") else 0)
    except Exception as e:
        print(f"\nError: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)


# BUG FIX: the guard was `if name == "main"`, which raises NameError at
# import time and never runs main(); the correct dunder guard is below.
if __name__ == "__main__":
    main()