#!/usr/bin/env python3 """ AM.2.4: Orchestrate full lowercase migration across all submodules.
Master orchestration script for staged rollout (Option A). Migrates one submodule at a time with validation between each.
Usage: python3 scripts/lowercase-migration/orchestrate-full-migration.py python3 scripts/lowercase-migration/orchestrate-full-migration.py --start-from coditect-docs-training python3 scripts/lowercase-migration/orchestrate-full-migration.py --category cloud --dry-run python3 scripts/lowercase-migration/orchestrate-full-migration.py --yes --parallel 3 python3 scripts/lowercase-migration/orchestrate-full-migration.py --next python3 scripts/lowercase-migration/orchestrate-full-migration.py --next --yes
Queue Document Integration: The script reads from and updates SUBMODULE-MIGRATION-QUEUE.md which contains a prioritized list of all submodules with their migration status.
Queue file: context-storage/lowercase-migration/SUBMODULE-MIGRATION-QUEUE.md
Status symbols:
- ā³ Pending - Not yet migrated
- š In Progress - Migration running
- ā
Completed - Migration successful
- ā Failed - Migration failed (check report)
"""
import os import sys import json import re import subprocess import time from pathlib import Path from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed
class QueueDocument: """Parser and updater for SUBMODULE-MIGRATION-QUEUE.md."""
def __init__(self, queue_path: Path):
self.queue_path = queue_path
self.content = ''
self.submodules = []
def load(self) -> bool:
"""Load the queue document."""
if not self.queue_path.exists():
return False
with open(self.queue_path) as f:
self.content = f.read()
self._parse()
return True
def _parse(self):
"""Parse submodules from queue document."""
self.submodules = []
# Pattern matches table rows like:
# | ā³ | coditect-docs-training | `submodules/docs/coditect-docs-training` | 43 | 0 | `python3 scripts/...` |
pattern = r'\|\s*([ā³šā
ā])\s*\|\s*([^|]+?)\s*\|\s*`([^`]+)`\s*\|\s*(\d+)\s*\|\s*(\d+)\s*\|\s*`([^`]+)`\s*\|'
priority = 0
for match in re.finditer(pattern, self.content):
priority += 1
name = match.group(2).strip()
path = match.group(3).strip()
# Extract category from path (e.g., submodules/docs/... -> docs)
path_parts = path.split('/')
category = path_parts[1] if len(path_parts) > 1 else 'unknown'
self.submodules.append({
'status': match.group(1),
'priority': priority,
'name': name,
'path': path,
'category': category,
'files': int(match.group(4)),
'dirs': int(match.group(5)),
'command': match.group(6),
'match_text': match.group(0),
'start': match.start(),
'end': match.end()
})
def get_pending(self) -> list:
"""Get all pending (ā³) submodules in priority order."""
return sorted(
[s for s in self.submodules if s['status'] == 'ā³'],
key=lambda x: x['priority']
)
def get_next(self) -> dict:
"""Get the next pending submodule."""
pending = self.get_pending()
return pending[0] if pending else None
def update_status(self, name: str, new_status: str) -> bool:
"""Update the status of a submodule in the queue document."""
if new_status not in ['ā³', 'š', 'ā
', 'ā']:
return False
# Find the submodule
for submod in self.submodules:
if submod['name'] == name:
old_status = submod['status']
# Replace in content - be careful with exact string matching
old_row = submod['match_text']
new_row = old_row.replace(f'| {old_status} |', f'| {new_status} |', 1)
self.content = self.content.replace(old_row, new_row)
# Save the file
with open(self.queue_path, 'w') as f:
f.write(self.content)
# Update in memory
submod['status'] = new_status
return True
return False
def get_stats(self) -> dict:
"""Get queue statistics."""
return {
'total': len(self.submodules),
'pending': len([s for s in self.submodules if s['status'] == 'ā³']),
'in_progress': len([s for s in self.submodules if s['status'] == 'š']),
'completed': len([s for s in self.submodules if s['status'] == 'ā
']),
'failed': len([s for s in self.submodules if s['status'] == 'ā']),
}
class MigrationOrchestrator: """Orchestrates staged lowercase migration across all submodules."""
def __init__(self, root_path: Path, config: dict = None):
self.root_path = root_path
self.config = config or {}
self.results = {
'started_at': datetime.utcnow().isoformat() + 'Z',
'completed': [],
'failed': [],
'skipped': [],
'pending': []
}
self.state_file = root_path / 'context-storage' / 'lowercase-migration' / 'orchestration-state.json'
self.queue_path = root_path / 'context-storage' / 'lowercase-migration' / 'SUBMODULE-MIGRATION-QUEUE.md'
self.queue = None
def load_queue(self) -> bool:
"""Load the queue document."""
self.queue = QueueDocument(self.queue_path)
return self.queue.load()
def show_queue_status(self):
"""Display queue status summary."""
if not self.queue:
if not self.load_queue():
print(f"Error: Queue document not found at {self.queue_path}")
print("Run inventory-all-submodules.py first to generate the queue.")
return False
stats = self.queue.get_stats()
print("\n" + "=" * 70)
print("SUBMODULE MIGRATION QUEUE STATUS")
print("=" * 70)
print(f"Queue file: {self.queue_path}")
print()
print(f" ā³ Pending: {stats['pending']}")
print(f" š In Progress: {stats['in_progress']}")
print(f" ā
Completed: {stats['completed']}")
print(f" ā Failed: {stats['failed']}")
print(f" āāāāāāāāāāāāāāāāā")
print(f" Total: {stats['total']}")
print()
return True
def show_next(self, auto_yes: bool = False) -> dict:
"""Show and optionally execute the next pending submodule."""
if not self.queue:
if not self.load_queue():
print(f"Error: Queue document not found at {self.queue_path}")
return None
self.show_queue_status()
next_submod = self.queue.get_next()
if not next_submod:
print("ā All submodules have been migrated!")
return None
print("-" * 70)
print("NEXT PENDING SUBMODULE")
print("-" * 70)
print(f" Priority: #{next_submod['priority']}")
print(f" Name: {next_submod['name']}")
print(f" Path: {next_submod['path']}")
print(f" Category: {next_submod['category']}")
print(f" Files: {next_submod['files']}")
print(f" Dirs: {next_submod['dirs']}")
print()
print(f" Command:")
print(f" {next_submod['command']}")
print()
# Show next 5 pending after this one
pending = self.queue.get_pending()
if len(pending) > 1:
print("-" * 70)
print("UPCOMING (next 5):")
for submod in pending[1:6]:
print(f" #{submod['priority']} {submod['name']} ({submod['category']}) - {submod['files']} files")
if len(pending) > 6:
print(f" ... and {len(pending) - 6} more")
print()
return next_submod
def run_next(self, dry_run: bool = False, auto_yes: bool = False) -> dict:
"""Run migration on the next pending submodule from queue."""
next_submod = self.show_next(auto_yes)
if not next_submod:
return None
if not auto_yes:
response = input("Run migration on this submodule? [y/N]: ")
if response.lower() != 'y':
print("Aborted.")
return None
# Use path directly from parsed queue document
submod_path = next_submod.get('path', '')
if not submod_path:
print(f"Error: No path found for submodule: {next_submod['name']}")
return None
# Create submodule dict for run_migration
submod = {
'name': next_submod['name'],
'path': submod_path,
'full_path': str(self.root_path / submod_path),
'category': next_submod['category']
}
# Update queue to "in progress"
self.queue.update_status(next_submod['name'], 'š')
print("\n" + "=" * 70)
print(f"MIGRATING: {next_submod['name']}")
print("=" * 70)
# Run the migration
result = self.run_migration(submod, dry_run, auto_yes=True)
if result['success']:
# Validate
validation = self.validate_submodule(submod)
if validation['validated']:
print(f"\nā Migration successful, validated")
if not dry_run:
self.queue.update_status(next_submod['name'], 'ā
')
else:
print(f"\nā Migration complete but validation warnings:")
print(f" - Uncommitted: {validation['uncommitted_changes']}")
print(f" - Remaining uppercase: {validation['remaining_uppercase']}")
if not dry_run:
self.queue.update_status(next_submod['name'], 'ā
')
else:
print(f"\nā Migration failed: {result.get('error', result.get('stderr', 'Unknown'))[:200]}")
if not dry_run:
self.queue.update_status(next_submod['name'], 'ā')
# Show updated status
print()
self.show_queue_status()
return result
def find_submodules(self) -> list:
"""Find all git submodules in the repository."""
submodules = []
gitmodules_path = self.root_path / '.gitmodules'
if gitmodules_path.exists():
with open(gitmodules_path) as f:
content = f.read()
for match in re.finditer(r'path\s*=\s*(.+)', content):
path = match.group(1).strip()
full_path = self.root_path / path
if full_path.exists():
# Determine category from path
parts = Path(path).parts
category = parts[1] if len(parts) > 1 else 'root'
submodules.append({
'path': path,
'full_path': str(full_path),
'name': Path(path).name,
'category': category
})
return submodules
def load_inventory(self) -> dict:
"""Load the pre-generated inventory."""
inventory_path = self.root_path / 'context-storage' / 'lowercase-migration' / 'inventory-all-submodules.json'
if not inventory_path.exists():
print(f"Error: Inventory not found at {inventory_path}")
print("Run: python3 scripts/lowercase-migration/inventory-all-submodules.py first")
sys.exit(1)
with open(inventory_path) as f:
return json.load(f)
def load_state(self) -> dict:
"""Load previous orchestration state for resume capability."""
if self.state_file.exists():
with open(self.state_file) as f:
return json.load(f)
return {'completed': [], 'failed': []}
def save_state(self):
"""Save current orchestration state."""
self.state_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.state_file, 'w') as f:
json.dump(self.results, f, indent=2)
def get_priority_order(self, submodules: list, inventory: dict) -> list:
"""Order submodules by migration priority."""
# Priority rules:
# 1. Smallest repos first (safer to test)
# 2. docs category before code
# 3. labs/archive last
def priority_key(submod):
name = submod['name']
category = submod['category']
# Get file count from inventory
inv_data = inventory.get('submodules', {}).get(name, {})
file_count = inv_data.get('file_count', 0)
# Category weights
category_weights = {
'docs': 1,
'tools': 2,
'gtm': 3,
'ops': 4,
'cloud': 5,
'core': 6,
'products': 7,
'labs': 8
}
cat_weight = category_weights.get(category, 5)
# Archive penalty
if 'archive' in name.lower():
cat_weight += 10
return (cat_weight, file_count, name)
return sorted(submodules, key=priority_key)
def run_migration(self, submod: dict, dry_run: bool = False, auto_yes: bool = False) -> dict:
"""Run migration on a single submodule."""
script_path = self.root_path / 'submodules' / 'core' / 'coditect-core' / 'scripts' / 'lowercase-migration' / 'execute-submodule-migration.py'
cmd = ['python3', str(script_path), submod['full_path']]
if dry_run:
cmd.append('--dry-run')
if auto_yes:
cmd.append('--yes')
# Output report to migration directory
report_dir = self.root_path / 'context-storage' / 'lowercase-migration' / 'reports'
report_dir.mkdir(parents=True, exist_ok=True)
report_path = report_dir / f"{submod['name']}-migration-report.json"
cmd.extend(['--output', str(report_path)])
try:
result = subprocess.run(
cmd,
cwd=self.root_path,
capture_output=True,
text=True,
timeout=600 # 10 minute timeout per submodule
)
success = result.returncode == 0
return {
'name': submod['name'],
'path': submod['path'],
'success': success,
'report_path': str(report_path) if report_path.exists() else None,
'stdout': result.stdout[-2000:] if result.stdout else '',
'stderr': result.stderr[-1000:] if result.stderr else '',
'timestamp': datetime.utcnow().isoformat() + 'Z'
}
except subprocess.TimeoutExpired:
return {
'name': submod['name'],
'path': submod['path'],
'success': False,
'error': 'Timeout after 10 minutes',
'timestamp': datetime.utcnow().isoformat() + 'Z'
}
except Exception as e:
return {
'name': submod['name'],
'path': submod['path'],
'success': False,
'error': str(e),
'timestamp': datetime.utcnow().isoformat() + 'Z'
}
def validate_submodule(self, submod: dict) -> dict:
"""Validate a submodule after migration."""
full_path = Path(submod['full_path'])
# Check for git status
try:
result = subprocess.run(
['git', 'status', '--porcelain'],
cwd=full_path,
capture_output=True,
text=True
)
uncommitted = len(result.stdout.strip().split('\n')) if result.stdout.strip() else 0
except subprocess.CalledProcessError:
uncommitted = -1
# Check for remaining uppercase files
remaining_uppercase = 0
for root, dirs, files in os.walk(full_path):
dirs[:] = [d for d in dirs if d not in {'.git', 'node_modules', '__pycache__'}]
for f in files:
if any(c.isupper() for c in Path(f).stem):
# Exclude preserved uppercase files
if f not in {'README.md', 'LICENSE', 'CLAUDE.md', 'SKILL.md', 'Makefile', 'Dockerfile'}:
remaining_uppercase += 1
return {
'name': submod['name'],
'uncommitted_changes': uncommitted,
'remaining_uppercase': remaining_uppercase,
'validated': uncommitted == 0 and remaining_uppercase < 10 # Allow some exceptions
}
def commit_parent_repo(self, message: str, dry_run: bool = False) -> bool:
"""Commit submodule pointer updates in parent repo."""
if dry_run:
return True
try:
# Stage submodule changes
subprocess.run(
['git', 'add', '-A'],
cwd=self.root_path,
capture_output=True,
check=True
)
# Commit
subprocess.run(
['git', 'commit', '-m', message],
cwd=self.root_path,
capture_output=True,
check=True
)
return True
except subprocess.CalledProcessError:
return False
def run_staged_migration(self, dry_run: bool = False, auto_yes: bool = False,
start_from: str = None, category: str = None,
parallel: int = 1) -> dict:
"""Execute staged migration across all submodules."""
print("=" * 70)
print("LOWERCASE MIGRATION - STAGED ROLLOUT")
print("=" * 70)
print(f"Root path: {self.root_path}")
print(f"Dry run: {dry_run}")
print(f"Parallel: {parallel}")
print()
# Load inventory and submodules
inventory = self.load_inventory()
submodules = self.find_submodules()
previous_state = self.load_state()
print(f"Found {len(submodules)} submodules")
# Filter by category if specified
if category:
submodules = [s for s in submodules if s['category'] == category]
print(f"Filtered to {len(submodules)} submodules in category: {category}")
# Get priority order
ordered_submodules = self.get_priority_order(submodules, inventory)
# Skip already completed
completed_names = set(previous_state.get('completed', []))
if start_from:
# Find start position
start_idx = None
for i, s in enumerate(ordered_submodules):
if s['name'] == start_from:
start_idx = i
break
if start_idx is not None:
ordered_submodules = ordered_submodules[start_idx:]
print(f"Starting from: {start_from}")
else:
# Skip already completed
ordered_submodules = [s for s in ordered_submodules if s['name'] not in completed_names]
print(f"\nSubmodules to process: {len(ordered_submodules)}")
if not ordered_submodules:
print("No submodules to process. Migration complete!")
return self.results
# Show preview
print("\nMigration order (first 20):")
for i, submod in enumerate(ordered_submodules[:20], 1):
inv_data = inventory.get('submodules', {}).get(submod['name'], {})
file_count = inv_data.get('file_count', 0)
print(f" {i}. {submod['name']} ({submod['category']}) - {file_count} files")
if len(ordered_submodules) > 20:
print(f" ... and {len(ordered_submodules) - 20} more")
if not auto_yes and not dry_run:
response = input("\nProceed with migration? [y/N]: ")
if response.lower() != 'y':
print("Aborted.")
return self.results
# Execute migrations
print("\n" + "=" * 70)
print("EXECUTING MIGRATIONS")
print("=" * 70)
if parallel > 1:
# Parallel execution
with ThreadPoolExecutor(max_workers=parallel) as executor:
futures = {
executor.submit(self.run_migration, submod, dry_run, auto_yes): submod
for submod in ordered_submodules
}
for future in as_completed(futures):
submod = futures[future]
result = future.result()
if result['success']:
self.results['completed'].append(result)
print(f"ā {submod['name']}")
else:
self.results['failed'].append(result)
print(f"ā {submod['name']} - {result.get('error', 'Unknown error')}")
self.save_state()
else:
# Sequential execution with validation
for i, submod in enumerate(ordered_submodules, 1):
print(f"\n[{i}/{len(ordered_submodules)}] {submod['name']}")
print("-" * 40)
# Run migration
result = self.run_migration(submod, dry_run, auto_yes)
if result['success']:
# Validate
validation = self.validate_submodule(submod)
if validation['validated']:
self.results['completed'].append(result)
print(f" ā Migration successful, validated")
else:
self.results['completed'].append(result)
print(f" ā Migration complete but validation warnings:")
print(f" - Uncommitted: {validation['uncommitted_changes']}")
print(f" - Remaining uppercase: {validation['remaining_uppercase']}")
else:
self.results['failed'].append(result)
print(f" ā Migration failed: {result.get('error', result.get('stderr', 'Unknown'))[:100]}")
if not auto_yes and not dry_run:
response = input(" Continue with next submodule? [Y/n]: ")
if response.lower() == 'n':
print("Stopping migration.")
break
self.save_state()
# Commit parent repo updates
if not dry_run and self.results['completed']:
print("\nCommitting submodule pointer updates in parent repo...")
commit_msg = f"""chore: Update submodule pointers after lowercase migration
Migrated {len(self.results['completed'])} submodules to lowercase naming. Failed: {len(self.results['failed'])}
Track: AM.2 (Lowercase Migration)
Co-Authored-By: Claude Opus 4.5 noreply@anthropic.com """ if self.commit_parent_repo(commit_msg, dry_run): print("ā Parent repo committed") else: print("ā Parent repo commit failed (may need manual commit)")
# Final summary
self.results['finished_at'] = datetime.utcnow().isoformat() + 'Z'
print("\n" + "=" * 70)
print("MIGRATION SUMMARY")
print("=" * 70)
print(f"Completed: {len(self.results['completed'])}")
print(f"Failed: {len(self.results['failed'])}")
print(f"Skipped: {len(self.results['skipped'])}")
if self.results['failed']:
print("\nFailed submodules:")
for f in self.results['failed']:
print(f" - {f['name']}: {f.get('error', 'Unknown')[:60]}")
# Save final state
self.save_state()
print(f"\nState saved to: {self.state_file}")
return self.results
def main(): import argparse parser = argparse.ArgumentParser( description='Orchestrate full lowercase migration', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Queue-Based Workflow: --next Show and optionally run the next pending submodule --queue-status Show queue statistics only --queue-mark NAME Mark a submodule with a status (use with --status) --status STATUS Status to set (pending, in_progress, completed, failed)
Examples:
Show next pending submodule and prompt to run
%(prog)s --next
Run next pending submodule without prompts
%(prog)s --next --yes
Just show queue status
%(prog)s --queue-status
Mark a submodule as completed (manual override)
%(prog)s --queue-mark coditect-docs-training --status completed """ ) parser.add_argument('--root', '-r', default=None, help='Root repository path') parser.add_argument('--dry-run', '-n', action='store_true', help='Preview without changes') parser.add_argument('--yes', '-y', action='store_true', help='Skip confirmation prompts') parser.add_argument('--start-from', help='Start from specific submodule name') parser.add_argument('--category', '-c', help='Only process submodules in category') parser.add_argument('--parallel', '-p', type=int, default=1, help='Number of parallel migrations') parser.add_argument('--reset-state', action='store_true', help='Reset orchestration state')
# Queue-based arguments
parser.add_argument('--next', action='store_true',
help='Show and optionally run the next pending submodule from queue')
parser.add_argument('--queue-status', action='store_true',
help='Show queue statistics only')
parser.add_argument('--queue-mark', metavar='NAME',
help='Mark a submodule with a status (use with --status)')
parser.add_argument('--status', choices=['pending', 'in_progress', 'completed', 'failed'],
help='Status to set when using --queue-mark')
args = parser.parse_args()
# Find root path
if args.root:
root_path = Path(args.root).resolve()
else:
# Auto-detect from script location
script_path = Path(__file__).resolve()
# Go up from scripts/lowercase-migration/ to coditect-core, then to rollout-master
root_path = script_path.parent.parent.parent.parent.parent
if not (root_path / '.gitmodules').exists():
root_path = root_path.parent
if not (root_path / '.gitmodules').exists():
print(f"Error: Cannot find repository root with .gitmodules")
print(f"Tried: {root_path}")
sys.exit(1)
orchestrator = MigrationOrchestrator(root_path)
# Handle queue-based commands
if args.queue_status:
orchestrator.show_queue_status()
sys.exit(0)
if args.queue_mark:
if not args.status:
print("Error: --queue-mark requires --status")
sys.exit(1)
status_map = {
'pending': 'ā³',
'in_progress': 'š',
'completed': 'ā
',
'failed': 'ā'
}
if orchestrator.load_queue():
if orchestrator.queue.update_status(args.queue_mark, status_map[args.status]):
print(f"ā Marked {args.queue_mark} as {args.status}")
else:
print(f"Error: Submodule '{args.queue_mark}' not found in queue")
sys.exit(1)
else:
print(f"Error: Queue document not found")
sys.exit(1)
sys.exit(0)
if args.next:
result = orchestrator.run_next(dry_run=args.dry_run, auto_yes=args.yes)
if result and not result.get('success'):
sys.exit(1)
sys.exit(0)
# Standard orchestration
if args.reset_state:
if orchestrator.state_file.exists():
orchestrator.state_file.unlink()
print("State reset.")
results = orchestrator.run_staged_migration(
dry_run=args.dry_run,
auto_yes=args.yes,
start_from=args.start_from,
category=args.category,
parallel=args.parallel
)
# Exit with error if any failed
if results['failed']:
sys.exit(1)
if name == 'main': main()