#!/usr/bin/env python3
"""AM.2.3: Execute lowercase migration on a single submodule.

Performs the complete migration workflow for ONE submodule:
- Create inventory of files to rename
- Execute git mv operations (two-step for macOS)
- Update internal references within the submodule
- Commit changes
- Generate report

LARGE REPO SUPPORT (AM.5.1): For repos with 500+ files, use batched mode
to avoid index.lock race conditions:
    --batch-size 50    # Process 50 files at a time
    --batch-delay 0.5  # Wait 0.5s between batches
    --batch-commit     # Commit after each batch (safest)

Usage:
    python3 scripts/lowercase-migration/execute-submodule-migration.py submodules/core/coditect-core
    python3 scripts/lowercase-migration/execute-submodule-migration.py submodules/docs/coditect-docs-training --dry-run
    python3 scripts/lowercase-migration/execute-submodule-migration.py submodules/gtm/coditect-web-search-tools --yes

    # Large repo (1000+ files):
    python3 scripts/lowercase-migration/execute-submodule-migration.py submodules/gtm/coditect-gtm-crm --yes --batch-size 50 --batch-delay 0.5
"""
import json
import os
import re
import shutil
import subprocess
import sys
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
# Configuration

# Directories that are never scanned or renamed.
# FIX: 'pycache' was missing its dunder underscores (markdown mangling);
# Python's bytecode cache directory is '__pycache__'.
SKIP_DIRS = {
    '.git', 'node_modules', 'dist', 'build', '.venv', 'venv',
    '__pycache__', '.pytest_cache', '.mypy_cache', 'target',
    '.next', '.nuxt', 'coverage', '.tox', 'eggs',
    '*.egg-info'  # NOTE(review): set membership is exact-match; this glob
                  # pattern will never match a real dir name — confirm intent
}

# Compiled/binary artifacts whose names are left untouched
SKIP_EXTENSIONS = {'.pyc', '.pyo', '.so', '.dll', '.exe', '.bin'}

# Files that should NOT be renamed (conventional uppercase)
PRESERVE_UPPERCASE = {
    'README.md', 'LICENSE', 'LICENSE.md', 'CHANGELOG.md', 'CONTRIBUTING.md',
    'CODE_OF_CONDUCT.md', 'SECURITY.md', 'AUTHORS', 'NOTICE', 'PATENTS',
    'Makefile', 'Dockerfile', 'Jenkinsfile', 'Vagrantfile', 'Procfile',
    'CLAUDE.md', 'SKILL.md'  # CODITECT conventions
}

MAX_RETRIES = 3    # git mv attempts before giving up
RETRY_DELAY = 0.2  # seconds between attempts

# Batch processing defaults (for large repos)
DEFAULT_BATCH_SIZE = 0       # 0 = no batching (all at once)
DEFAULT_BATCH_DELAY = 0.5    # seconds between batches
LARGE_REPO_THRESHOLD = 500   # Auto-suggest batching above this
STALE_LOCK_SECONDS = 300     # 5 minutes - consider lock stale after this
def has_uppercase(name: str) -> bool:
    """Return True if the filename's stem contains any uppercase letter.

    The extension is deliberately excluded — only the stem is inspected,
    so 'file.MD' is NOT flagged while 'File.md' is.
    """
    return any(ch.isupper() for ch in Path(name).stem)
def to_lowercase_kebab(name: str) -> str:
    """Convert a filename to lowercase-kebab-case; the extension is lowercased."""
    parts = Path(name)
    stem, suffix = parts.stem, parts.suffix

    # Insert hyphens at camelCase / PascalCase / ACRONYMWord boundaries
    kebab = re.sub(r'([a-z])([A-Z])', r'\1-\2', stem)
    kebab = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1-\2', kebab)

    # Lowercase, map underscores/whitespace to hyphens, collapse runs,
    # and trim any hyphens left at the edges
    kebab = re.sub(r'[_\s]+', '-', kebab.lower())
    kebab = re.sub(r'-+', '-', kebab).strip('-')

    return kebab + suffix.lower()
def get_git_dir(repo_path: Path) -> Path:
    """Resolve the real .git directory for a repo.

    In a submodule checkout, '.git' is a plain file containing a
    'gitdir: <relative-path>' pointer; follow it and return the resolved
    target. Otherwise return the '.git' path itself.
    """
    dot_git = repo_path / '.git'
    if not dot_git.is_file():
        return dot_git
    pointer = dot_git.read_text().strip()
    if not pointer.startswith('gitdir:'):
        return dot_git
    target = pointer.split(':', 1)[1].strip()
    return (repo_path / target).resolve()
def remove_git_lock(repo_path: Path, force_stale: bool = False):
    """Remove the repository's index.lock if present.

    Args:
        repo_path: Path to the git repository.
        force_stale: If True, remove the lock only when it is older than
            STALE_LOCK_SECONDS. If False, remove any lock unconditionally.

    Returns:
        True if a lock was removed, False otherwise (no lock, still fresh,
        or removal failed).
    """
    lock_file = get_git_dir(repo_path) / 'index.lock'
    if not lock_file.exists():
        return False
    try:
        if not force_stale:
            lock_file.unlink()
            return True
        # Stale-only mode: leave young locks alone
        lock_age = time.time() - lock_file.stat().st_mtime
        if lock_age <= STALE_LOCK_SECONDS:
            return False
        print(f" ⚠ Removing stale index.lock (age: {lock_age:.0f}s)")
        lock_file.unlink()
        return True
    except OSError as e:
        print(f" ⚠ Could not remove lock: {e}")
        return False
def check_and_clear_stale_lock(repo_path: Path) -> bool:
    """Check for a stale index.lock and remove it if found.

    Returns True when it is safe to proceed (no lock, or a stale lock was
    removed); False when an active lock exists or removal failed.
    """
    lock = get_git_dir(repo_path) / 'index.lock'
    if not lock.exists():
        return True

    age = time.time() - lock.stat().st_mtime
    if age <= STALE_LOCK_SECONDS:
        # Fresh lock: assume another git process owns it
        print(f"\n⚠ Active index.lock detected (age: {age:.0f}s)")
        print(" Another git process may be running. Wait or use --force-unlock")
        return False

    print(f"\n⚠ Stale index.lock detected (age: {age:.0f}s > {STALE_LOCK_SECONDS}s)")
    print(f" Removing: {lock}")
    try:
        lock.unlink()
    except OSError as e:
        print(f" ✗ Failed to remove: {e}")
        return False
    return True
def git_mv_with_retry(repo_path: Path, old_path: str, new_path: str, dry_run: bool = False) -> dict:
    """Execute git mv with retry logic and two-step rename for macOS.

    Case-only renames go through a temporary name because macOS's default
    case-insensitive filesystem rejects a direct `git mv A a`.

    Returns a result dict: old_path, new_path, success (bool),
    method ('dry_run' | 'two_step' | 'direct' | None), error (str | None).
    """
    result = {
        'old_path': old_path,
        'new_path': new_path,
        'success': False,
        'method': None,
        'error': None
    }
    if dry_run:
        # Report success without touching the repo
        result['success'] = True
        result['method'] = 'dry_run'
        return result
    old_full = repo_path / old_path  # NOTE(review): unused — kept as-is
    new_full = repo_path / new_path
    # Ensure parent directory exists
    new_full.parent.mkdir(parents=True, exist_ok=True)
    for attempt in range(MAX_RETRIES):
        # Clear any leftover index.lock, then give git a moment
        remove_git_lock(repo_path)
        time.sleep(RETRY_DELAY)
        # Check if only case is changing (macOS issue)
        if old_path.lower() == new_path.lower():
            # Two-step rename via temp file
            temp_name = f"_temp_{Path(old_path).name}_{int(time.time())}"
            temp_path = str(Path(old_path).parent / temp_name)
            try:
                # Step 1: old → temp
                subprocess.run(
                    ['git', 'mv', old_path, temp_path],
                    cwd=repo_path,
                    capture_output=True,
                    text=True,
                    check=True
                )
                remove_git_lock(repo_path)
                time.sleep(RETRY_DELAY)
                # Step 2: temp → new
                subprocess.run(
                    ['git', 'mv', temp_path, new_path],
                    cwd=repo_path,
                    capture_output=True,
                    text=True,
                    check=True
                )
                result['success'] = True
                result['method'] = 'two_step'
                return result
            except subprocess.CalledProcessError as e:
                result['error'] = f"Two-step failed: {e.stderr}"
                # Clean up temp file if it exists (roll back step 1)
                temp_full = repo_path / temp_path
                if temp_full.exists():
                    try:
                        subprocess.run(['git', 'mv', temp_path, old_path],
                                       cwd=repo_path, capture_output=True)
                    except subprocess.CalledProcessError:
                        # NOTE(review): dead handler — run() without check=True
                        # never raises CalledProcessError; rollback failures
                        # are silently ignored either way
                        pass
        else:
            # Direct rename
            try:
                subprocess.run(
                    ['git', 'mv', old_path, new_path],
                    cwd=repo_path,
                    capture_output=True,
                    text=True,
                    check=True
                )
                result['success'] = True
                result['method'] = 'direct'
                return result
            except subprocess.CalledProcessError as e:
                result['error'] = f"Direct mv failed: {e.stderr}"
    # All retries exhausted; result carries the last error
    return result
def scan_submodule(submod_path: Path) -> dict:
    """Scan a submodule tree for files and directories needing rename.

    Returns {'files': [...], 'directories': [...]} where each entry has
    'path' (relative), 'name', 'new_name', and (for directories) 'depth'.
    Directories are sorted deepest-first so later renames don't invalidate
    child paths.
    """
    file_entries = []
    dir_entries = []

    for root, subdirs, filenames in os.walk(submod_path):
        # Prune skip-listed and hidden directories in place
        subdirs[:] = [d for d in subdirs if d not in SKIP_DIRS and not d.startswith('.')]
        rel_root = Path(root).relative_to(submod_path)
        at_top = str(rel_root) == '.'

        for d in subdirs:
            if not has_uppercase(d) or d in PRESERVE_UPPERCASE:
                continue
            rel_path = d if at_top else str(rel_root / d)
            dir_entries.append({
                'path': rel_path,
                'name': d,
                'new_name': to_lowercase_kebab(d),
                'depth': len(Path(rel_path).parts),
            })

        for fname in filenames:
            if Path(fname).suffix.lower() in SKIP_EXTENSIONS:
                continue
            if fname in PRESERVE_UPPERCASE or not has_uppercase(fname):
                continue
            file_entries.append({
                'path': fname if at_top else str(rel_root / fname),
                'name': fname,
                'new_name': to_lowercase_kebab(fname),
            })

    # Deepest directories first
    dir_entries.sort(key=lambda e: e['depth'], reverse=True)
    return {'files': file_entries, 'directories': dir_entries}
def update_references(submod_path: Path, renames: list, dry_run: bool = False) -> dict:
    """Update internal textual references after renames.

    Args:
        submod_path: Root of the submodule.
        renames: Result dicts from git_mv_with_retry (only successful
            entries are applied).
        dry_run: If True, report what would change but write nothing.

    Returns:
        {relative_file: ["old → new", ...]} for every file touched.
    """
    updates = defaultdict(list)

    # Build old→new mapping from successful renames only
    rename_map = {r['old_path']: r['new_path'] for r in renames if r['success']}
    if not rename_map:
        return dict(updates)

    # Text file types that might contain references
    text_extensions = {'.md', '.txt', '.json', '.yaml', '.yml', '.py', '.js', '.ts', '.html', '.css'}

    for root, dirs, files in os.walk(submod_path):
        dirs[:] = [d for d in dirs if d not in SKIP_DIRS and not d.startswith('.')]
        for f in files:
            if Path(f).suffix.lower() not in text_extensions:
                continue
            file_path = Path(root) / f
            try:
                content = file_path.read_text(encoding='utf-8')
                new_content = content
                for old_path, new_path in rename_map.items():
                    old_name = Path(old_path).name
                    new_name = Path(new_path).name
                    # FIX: replace full-path references BEFORE bare filenames.
                    # The original replaced the filename first, which destroyed
                    # the path match (old_path contains old_name), so the path
                    # substitution could never fire and was never recorded.
                    if old_path != old_name and old_path in new_content:
                        new_content = new_content.replace(old_path, new_path)
                        updates[str(file_path.relative_to(submod_path))].append(
                            f"{old_path} → {new_path}"
                        )
                    # Bare filename references
                    if old_name in new_content:
                        new_content = new_content.replace(old_name, new_name)
                        updates[str(file_path.relative_to(submod_path))].append(
                            f"{old_name} → {new_name}"
                        )
                if new_content != content and not dry_run:
                    file_path.write_text(new_content, encoding='utf-8')
            except (UnicodeDecodeError, IOError):
                # Binary or unreadable file: skip silently (best-effort pass)
                continue
    return dict(updates)
def commit_changes(submod_path: Path, stats: dict, dry_run: bool = False) -> bool:
    """Stage everything and commit the migration in the submodule.

    Args:
        submod_path: Repository to commit in.
        stats: Must contain 'files_renamed' and 'dirs_renamed' (used in the
            commit message).
        dry_run: If True, do nothing and report success.

    Returns:
        True on success (or dry run), False if git add/commit failed.
    """
    if dry_run:
        return True
    try:
        # Stage all changes
        subprocess.run(
            ['git', 'add', '-A'],
            cwd=submod_path,
            capture_output=True,
            text=True,  # FIX: without text=True, e.stderr is bytes and prints as b'...'
            check=True
        )
        # Create commit message
        message = f"""chore: Lowercase naming migration

Renamed {stats['files_renamed']} files and {stats['dirs_renamed']} directories
to lowercase-kebab-case per CODITECT naming standard.

Migration executed by: execute-submodule-migration.py
Track: AM.2 (Lowercase Migration)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
"""
        subprocess.run(
            ['git', 'commit', '-m', message],
            cwd=submod_path,
            capture_output=True,
            text=True,  # same fix: decode stderr for the error message below
            check=True
        )
        return True
    except subprocess.CalledProcessError as e:
        # NOTE: 'git commit' also fails when there is nothing to commit
        print(f"Commit failed: {e.stderr}")
        return False
def main():
    """CLI entry point: scan one submodule, rename uppercase files/dirs,
    update internal references, commit, and write a JSON report.

    Returns the report dict, or None when there was nothing to do or the
    user aborted.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description='Execute lowercase migration on a single submodule',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Large Repo Examples:
  # Auto-batching for repos with 500+ files
  %(prog)s submodules/gtm/coditect-gtm-crm --yes --batch-size 50

  # Extra safe: commit after each batch
  %(prog)s submodules/gtm/coditect-gtm-crm --yes --batch-size 50 --batch-commit

  # Force remove stale index.lock before starting
  %(prog)s submodules/cloud/coditect-cloud-ide --yes --force-unlock
"""
    )
    parser.add_argument('submodule_path', help='Path to the submodule')
    parser.add_argument('--dry-run', '-n', action='store_true', help='Preview without making changes')
    parser.add_argument('--yes', '-y', action='store_true', help='Skip confirmation prompts')
    parser.add_argument('--no-commit', action='store_true', help='Skip git commit')
    parser.add_argument('--output', '-o', help='Output report path')
    # Batch processing options (AM.5.1)
    parser.add_argument('--batch-size', '-b', type=int, default=DEFAULT_BATCH_SIZE,
                        help=f'Process files in batches of N (default: {DEFAULT_BATCH_SIZE}, 0=no batching)')
    parser.add_argument('--batch-delay', type=float, default=DEFAULT_BATCH_DELAY,
                        help=f'Delay in seconds between batches (default: {DEFAULT_BATCH_DELAY})')
    parser.add_argument('--batch-commit', action='store_true',
                        help='Commit after each batch (safest for large repos)')
    parser.add_argument('--force-unlock', action='store_true',
                        help='Force remove stale index.lock before starting')
    args = parser.parse_args()

    submod_path = Path(args.submodule_path).resolve()
    if not submod_path.exists():
        print(f"Error: Submodule path does not exist: {submod_path}")
        sys.exit(1)
    if not (submod_path / '.git').exists():
        print(f"Error: Not a git repository: {submod_path}")
        sys.exit(1)

    print("=" * 70)
    print("LOWERCASE MIGRATION - SINGLE SUBMODULE")
    print("=" * 70)
    print(f"Submodule: {submod_path}")
    print(f"Dry run: {args.dry_run}")
    if args.batch_size > 0:
        print(f"Batch mode: {args.batch_size} files/batch, {args.batch_delay}s delay")
    if args.batch_commit:
        print("Batch commit: enabled (commit after each batch)")
    print()

    # Check for stale lock before starting
    if args.force_unlock:
        print("Checking for stale index.lock...")
        remove_git_lock(submod_path, force_stale=False)  # Force remove any lock
    else:
        if not check_and_clear_stale_lock(submod_path):
            print("\n✗ Cannot proceed with active git lock. Use --force-unlock to override.")
            sys.exit(1)

    # Scan for files to rename
    print("Scanning for uppercase files...")
    inventory = scan_submodule(submod_path)
    file_count = len(inventory['files'])
    dir_count = len(inventory['directories'])
    print(f"Found: {file_count} files, {dir_count} directories to rename")
    print()

    if file_count == 0 and dir_count == 0:
        print("No files or directories need renaming. Done!")
        return

    # Auto-enable batching for large repos
    total_items = file_count + dir_count
    batch_size = args.batch_size
    if batch_size == 0 and total_items >= LARGE_REPO_THRESHOLD:
        # Larger repos get smaller batches
        if total_items >= 2000:
            suggested_batch = 100
        elif total_items >= 1000:
            suggested_batch = 150
        else:
            suggested_batch = 200
        print(f"\n⚠ LARGE REPO DETECTED: {total_items} items")
        print(f" Recommended: Use --batch-size {suggested_batch} to avoid index.lock issues")
        if not args.yes:
            response = input(f" Enable batching with size {suggested_batch}? [Y/n]: ")
            if response.lower() != 'n':
                batch_size = suggested_batch
                print(f" → Batching enabled: {batch_size} items per batch")
        else:
            # Auto-enable for --yes mode with large repos
            batch_size = suggested_batch
            print(f" → Auto-enabling batching: {batch_size} items per batch")

    # Show preview
    if not args.yes:
        print("Files to rename (first 20):")
        for item in inventory['files'][:20]:
            print(f" {item['name']} → {item['new_name']}")
        if file_count > 20:
            print(f" ... and {file_count - 20} more")
        print()
        if not args.dry_run:
            response = input("Proceed with migration? [y/N]: ")
            if response.lower() != 'y':
                print("Aborted.")
                return

    # Execute renames (with optional batching)
    print("\nExecuting renames...")
    results = []
    success_count = 0
    dir_success = 0
    fail_count = 0
    batch_num = 0

    def _plan(entry_type: str, item: dict) -> dict:
        """Build a rename work entry with old and new relative paths."""
        old_path = item['path']
        parent = str(Path(old_path).parent)
        new_path = str(Path(parent) / item['new_name']) if parent != '.' else item['new_name']
        return {'type': entry_type, 'item': item, 'old_path': old_path, 'new_path': new_path}

    # Files first (their paths are computed against still-unrenamed dirs),
    # then directories (scan_submodule sorted those deepest-first)
    all_items = [_plan('file', it) for it in inventory['files']]
    all_items += [_plan('dir', it) for it in inventory['directories']]
    total_items = len(all_items)

    # Process in batches if batch_size is set
    if batch_size > 0:
        num_batches = (total_items + batch_size - 1) // batch_size
        print(f" Processing {total_items} items in {num_batches} batches of {batch_size}")

    for i, entry in enumerate(all_items, 1):
        item = entry['item']
        result = git_mv_with_retry(submod_path, entry['old_path'], entry['new_path'], args.dry_run)
        # FIX: tag each result with its type. The original batch-commit stats
        # read r.get('type') but results never carried that key, so per-batch
        # dir counts were always 0 and file counts included directories.
        result['type'] = entry['type']
        results.append(result)
        type_label = 'dir' if entry['type'] == 'dir' else ''
        if result['success']:
            success_count += 1
            if entry['type'] == 'dir':
                dir_success += 1
            print(f" [{i}/{total_items}]{type_label} ✓ {item['name']} → {item['new_name']}")
        else:
            fail_count += 1
            print(f" [{i}/{total_items}]{type_label} ✗ {item['name']} - {result['error']}")

        # Batch processing: delay and optional commit after each batch
        if batch_size > 0 and i % batch_size == 0 and i < total_items:
            batch_num += 1
            print(f"\n --- Batch {batch_num}/{num_batches} complete ({i}/{total_items} items) ---")
            # Clear any lock before continuing
            remove_git_lock(submod_path)
            # Optional: commit after each batch for safety
            if args.batch_commit and not args.dry_run:
                batch_stats = {
                    'files_renamed': success_count - dir_success,
                    'dirs_renamed': dir_success,
                    'failed': fail_count,
                    'references_updated': 0
                }
                print(f" Committing batch {batch_num}...")
                if commit_changes(submod_path, batch_stats, args.dry_run):
                    print(f" ✓ Batch {batch_num} committed")
                else:
                    print(f" ⚠ Batch {batch_num} commit failed, continuing...")
            # Delay between batches
            print(f" Waiting {args.batch_delay}s before next batch...")
            time.sleep(args.batch_delay)
            # Clear lock again after delay
            remove_git_lock(submod_path)
            print()

    # Update internal references
    print("\nUpdating internal references...")
    ref_updates = update_references(submod_path, results, args.dry_run)
    ref_count = sum(len(v) for v in ref_updates.values())
    print(f"Updated {ref_count} references in {len(ref_updates)} files")

    # Final stats; the file/dir split comes from the per-result 'type' tag
    stats = {
        'files_renamed': success_count - dir_success,
        'dirs_renamed': dir_success,
        'failed': fail_count,
        'references_updated': ref_count,
        'batch_size': batch_size if batch_size > 0 else None,
        'batches_processed': batch_num + 1 if batch_size > 0 else 1
    }

    if not args.no_commit and not args.dry_run:
        print("\nCommitting changes...")
        if commit_changes(submod_path, stats, args.dry_run):
            print("✓ Changes committed")
        else:
            print("✗ Commit failed")

    # Generate report
    # FIX: datetime.utcnow() is deprecated; use an aware UTC timestamp and
    # normalize the offset suffix to 'Z' (the old code appended 'Z' itself)
    report = {
        'submodule': str(submod_path),
        'timestamp': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        'dry_run': args.dry_run,
        'stats': stats,
        'results': results,
        'reference_updates': ref_updates
    }
    output_path = args.output or submod_path / 'migration-report.json'
    with open(output_path, 'w') as f:
        json.dump(report, f, indent=2)

    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"Files renamed: {stats['files_renamed']}")
    print(f"Directories: {stats['dirs_renamed']}")
    print(f"Failed: {stats['failed']}")
    print(f"References updated:{stats['references_updated']}")
    if stats.get('batch_size'):
        print(f"Batch size: {stats['batch_size']}")
        print(f"Batches processed: {stats['batches_processed']}")
    print(f"Report saved to: {output_path}")
    return report
# FIX: the guard's dunders were stripped by formatting mangling
# ("if name == 'main'"), which would raise NameError at import time.
if __name__ == '__main__':
    main()