Skip to main content

scripts-git-workflow

#!/usr/bin/env python3
"""

title: "Color codes for terminal output"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Git Workflow Orchestrator - CLI Interface Bottom-up git synchronization from submodules to ma..."
keywords: ['analysis', 'backend', 'git', 'review', 'workflow']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "git-workflow.py"
language: python
executable: true
usage: "python3 scripts/git-workflow.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Git Workflow Orchestrator - CLI Interface

Bottom-up git synchronization from submodules to master repository.

Usage:
    python3 git-workflow.py --target all --mode full
    python3 git-workflow.py --target submodules/core/coditect-core --dry-run
    python3 git-workflow.py --mode analyze

Author: CODITECT Production Operations Team
Version: 1.0.0
Date: 2025-11-28
"""

import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Color codes for terminal output
# Shared Colors module (consolidates 36 duplicate definitions)

# Make the sibling "core" directory importable so that ``colors`` resolves
# regardless of the directory the script is launched from.
_script_dir = Path(__file__).parent
sys.path.insert(0, str(_script_dir / "core"))
from colors import Colors  # noqa: E402  (must follow the sys.path tweak above)

def print_colored(message: str, color: str) -> None:
    """Print ``message`` to stdout, prefixed by ``color`` and followed by ``Colors.NC``.

    Args:
        message: Text to display.
        color: Prefix emitted before the text (callers pass ``Colors.*``
            attributes; presumably ANSI escape sequences).
    """
    # Colors.NC is presumably the "no color" reset sequence -- confirm in core/colors.
    reset = Colors.NC
    print(f"{color}{message}{reset}")

def run_command(cmd: List[str], cwd: Optional[str] = None, capture_output: bool = True) -> Tuple[int, str, str]:
    """Run a command and return ``(exit_code, stdout, stderr)``.

    The command is executed without a shell, in text mode, with a 60 second
    timeout. This function never raises: any failure to launch or complete
    the command is reported as exit code ``1`` with the reason in ``stderr``.

    Args:
        cmd: Program and arguments, as a list.
        cwd: Working directory for the child process, or ``None`` to inherit.
        capture_output: When false, the child's output streams are inherited
            and the returned ``stdout``/``stderr`` are empty strings.

    Returns:
        A ``(exit_code, stdout, stderr)`` tuple; both text fields are always
        ``str`` (never ``None``).
    """
    try:
        result = subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=capture_output,
            text=True,
            timeout=60,
        )
    except subprocess.TimeoutExpired:
        return 1, "", "Command timed out"
    except Exception as e:
        # Deliberately broad: callers rely on a tuple coming back even when
        # the executable is missing or the arguments are invalid.
        return 1, "", str(e)

    # subprocess leaves stdout/stderr as None when output is not captured;
    # normalise so callers can always call str methods on the result.
    return result.returncode, result.stdout or "", result.stderr or ""

def get_git_root() -> Optional[Path]:
    """Return the top-level directory of the current git repository.

    Asks ``git rev-parse --show-toplevel`` (run in the current working
    directory) for the repository root.

    Returns:
        The root as a ``Path``, or ``None`` when the git command exits
        with a non-zero status.
    """
    status, toplevel, _stderr = run_command(['git', 'rev-parse', '--show-toplevel'])
    if status != 0:
        return None
    return Path(toplevel.strip())

def enumerate_submodules(repo_root: Path) -> List[Dict[str, str]]:
    """Enumerate all git submodules registered in the repository.

    Parses the output of ``git submodule status`` run in ``repo_root``.

    Args:
        repo_root: Root directory of the superproject.

    Returns:
        One dict per submodule with keys ``path``, ``hash`` (the commit id
        with any status prefix removed) and ``status``. The list is empty
        when there is no ``.gitmodules`` file or the git command fails.
    """
    submodules: List[Dict[str, str]] = []

    if not (repo_root / '.gitmodules').exists():
        return submodules

    exit_code, stdout, _ = run_command(['git', 'submodule', 'status'], cwd=str(repo_root))
    if exit_code != 0:
        return submodules

    for raw_line in stdout.splitlines():
        line = raw_line.strip()
        if not line:
            continue

        parts = line.split()
        if len(parts) < 2:
            continue

        submodules.append({
            'path': parts[1],
            # git prefixes the hash with '-', '+' or 'U'; none of these can
            # occur in a (lowercase hex) commit id, so stripping is safe.
            'hash': parts[0].lstrip('-+U'),
            # NOTE(review): git documents the '-' prefix as "not initialized";
            # the 'detached' label is kept for backward compatibility.
            'status': 'detached' if line.startswith('-') else 'clean',
        })

    return submodules

def check_submodule_status(repo_root: Path, submodule_path: str) -> Dict[str, Any]:
    """Collect the git state of a single submodule working tree.

    Args:
        repo_root: Root directory of the superproject.
        submodule_path: Submodule path relative to ``repo_root``.

    Returns:
        On failure, ``{'error': <text>, 'requires_sync': False}``.
        Otherwise a dict with keys ``branch``, ``status`` (``'modified'`` or
        ``'clean'``), ``changes`` (list of ``{'status', 'file'}`` dicts taken
        from ``git status --porcelain``), ``conflicts``, ``commits_ahead``,
        ``commits_behind`` and ``requires_sync`` (true when there are working
        tree changes or commits ahead of ``origin/<branch>``).
    """
    full_path = repo_root / submodule_path

    # Check if directory exists
    if not full_path.exists():
        return {'error': 'Directory not found', 'requires_sync': False}

    cwd = str(full_path)

    # Get current branch (empty string when HEAD is detached)
    exit_code, stdout, _ = run_command(['git', 'branch', '--show-current'], cwd=cwd)
    branch = stdout.strip() if exit_code == 0 else 'unknown'

    # Get git status
    exit_code, stdout, _ = run_command(['git', 'status', '--porcelain'], cwd=cwd)
    if exit_code != 0:
        return {'error': 'Git status failed', 'requires_sync': False}

    changes = []
    has_conflicts = False
    # Do not strip the whole output: porcelain lines are "XY <path>" and X is
    # a space for unstaged changes, so stripping would shift the first line
    # and cut the first character off its file name.
    for line in stdout.splitlines():
        if not line.strip():
            continue
        xy = line[:2]
        changes.append({'status': xy.strip(), 'file': line[3:]})
        # Unmerged entries per the porcelain format: either side is 'U', or
        # both sides added/deleted. Locale-independent, unlike matching the
        # human-readable "both modified" text.
        if 'U' in xy or xy in ('AA', 'DD'):
            has_conflicts = True

    # Check commits ahead/behind; left count = only on origin (behind),
    # right count = only on HEAD (ahead).
    commits_behind = 0
    commits_ahead = 0
    if branch and branch != 'unknown':
        exit_code, stdout, _ = run_command(
            ['git', 'rev-list', '--left-right', '--count', f'origin/{branch}...HEAD'],
            cwd=cwd
        )
        if exit_code == 0:
            counts = stdout.split()
            if len(counts) == 2 and all(c.isdigit() for c in counts):
                commits_behind, commits_ahead = int(counts[0]), int(counts[1])

    return {
        'branch': branch,
        'status': 'modified' if changes else 'clean',
        'changes': changes,
        'conflicts': has_conflicts,
        'commits_ahead': commits_ahead,
        'commits_behind': commits_behind,
        'requires_sync': bool(changes) or commits_ahead > 0
    }

def analyze_repository(repo_root: Path) -> Dict[str, Any]:
    """Analyze entire repository and identify sync requirements.

    Enumerates the submodules of ``repo_root``, checks each one, and prints
    progress to stdout.

    Args:
        repo_root: Root directory of the superproject.

    Returns:
        A JSON-serialisable dict with keys ``timestamp`` (UTC, ISO 8601 with
        a ``Z`` suffix), ``total_submodules``, ``submodules_with_changes``,
        ``submodules`` (per-submodule status dicts, each with ``path``) and
        ``priority_groups`` (paths needing sync, bucketed into
        ``P0_critical``, ``P1_features`` and ``P2_documentation``).
    """
    print_colored("🔍 Analyzing repository...", Colors.BLUE)

    submodules = enumerate_submodules(repo_root)
    print_colored(f"Found {len(submodules)} submodules", Colors.BLUE)

    analysis: Dict[str, Any] = {
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the rendered format is unchanged.
        'timestamp': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        'total_submodules': len(submodules),
        'submodules_with_changes': 0,
        'submodules': [],
        'priority_groups': {
            'P0_critical': [],
            'P1_features': [],
            'P2_documentation': []
        }
    }
    groups = analysis['priority_groups']

    for submodule in submodules:
        path = submodule['path']
        print(f" Checking {path}...")

        status = check_submodule_status(repo_root, path)
        analysis['submodules'].append({'path': path, **status})

        if not status.get('requires_sync'):
            continue

        analysis['submodules_with_changes'] += 1

        # Categorize by priority using substring matches on the path.
        if 'core' in path or 'framework' in path:
            groups['P0_critical'].append(path)
        elif 'cloud' in path or 'backend' in path:
            groups['P1_features'].append(path)
        else:
            groups['P2_documentation'].append(path)

    return analysis

def generate_conventional_commit_message(submodule_path: str, changes: List[Dict]) -> str:
    """Generate a conventional commit message describing ``changes``.

    Args:
        submodule_path: Path of the submodule; its last component, with any
            ``coditect-`` text removed, becomes the commit scope.
        changes: Dicts with a ``status`` key (porcelain status code with
            whitespace stripped, e.g. ``'M'``, ``'A'``, ``'??'``) and a
            ``file`` key (path).

    Returns:
        ``<type>(<scope>): <subject>``, an optional bullet-list body, and the
        generator / co-author trailers, separated by single blank lines.
        The type is ``docs`` if any Markdown file changed, else ``feat`` or
        ``refactor`` if any ``.rs``/``.py``/``.ts`` file changed (``feat``
        when something was added), else ``chore``.
    """
    doc_suffixes = ('.md',)
    code_suffixes = ('.rs', '.py', '.ts')

    def is_added(status: str) -> bool:
        # Untracked files ('??') become additions once staged with `git add .`.
        return 'A' in status or status == '??'

    # Porcelain output quotes unusual paths; drop the quotes before matching.
    files = [c['file'].strip('"') for c in changes]

    # Count changes
    added = sum(1 for c in changes if is_added(c['status']))
    modified = sum(1 for c in changes if 'M' in c['status'])
    deleted = sum(1 for c in changes if 'D' in c['status'])

    # Determine commit type. Match on the file suffix, not a substring, so
    # that e.g. "notes.mdx" or "mod.pyc" are not misclassified.
    if any(f.endswith(doc_suffixes) for f in files):
        commit_type = 'docs'
    elif any(f.endswith(code_suffixes) for f in files):
        commit_type = 'feat' if added > 0 else 'refactor'
    else:
        commit_type = 'chore'

    # Extract scope from path (tolerating a trailing slash)
    scope = submodule_path.rstrip('/').split('/')[-1].replace('coditect-', '')

    # Generate subject
    if added > 0:
        subject = f"add {added} files to {scope}"
    elif deleted > 0:
        subject = f"remove {deleted} files from {scope}"
    else:
        subject = f"update {scope}"

    # Generate body
    body_parts = []
    if added > 0:
        body_parts.append(f"- Added {added} new files")
    if modified > 0:
        body_parts.append(f"- Modified {modified} existing files")
    if deleted > 0:
        body_parts.append(f"- Deleted {deleted} files")

    # Full message: omit the body section entirely when it is empty so the
    # message never contains a run of blank lines.
    sections = [f"{commit_type}({scope}): {subject}"]
    if body_parts:
        sections.append('\n'.join(body_parts))
    sections.append("🤖 Generated with Claude Code")
    # Git trailers need the "Name <email>" form to be recognised.
    sections.append("Co-Authored-By: Claude <noreply@anthropic.com>")

    return '\n\n'.join(sections)

def sync_submodule(repo_root: Path, submodule_path: str, dry_run: bool = False) -> Dict[str, Any]:
    """Synchronize a single submodule: stage, commit and push its changes.

    Args:
        repo_root: Root directory of the superproject.
        submodule_path: Submodule path relative to ``repo_root``.
        dry_run: When true, only report what would be done.

    Returns:
        A dict whose ``status`` is one of ``'skipped'``, ``'dry_run'``,
        ``'failed'`` (with a ``reason``) or ``'success'`` (with
        ``commit_hash``, ``commit_message_first_line`` and
        ``files_changed``).
    """
    cwd = str(repo_root / submodule_path)

    print_colored(f"\n📦 Processing {submodule_path}...", Colors.BLUE)

    # Check status
    status = check_submodule_status(repo_root, submodule_path)
    if not status.get('requires_sync'):
        print_colored(" No changes, skipping", Colors.YELLOW)
        return {'status': 'skipped', 'reason': 'no_changes'}

    # Check for conflicts
    if status.get('conflicts'):
        print_colored(" ❌ Merge conflicts detected! Manual resolution required.", Colors.RED)
        return {'status': 'failed', 'reason': 'conflicts'}

    # A submodule can require sync purely because it has unpushed commits;
    # in that case there is nothing to stage or commit, only to push.
    changes = status.get('changes', [])
    commit_message = generate_conventional_commit_message(submodule_path, changes) if changes else ''

    if dry_run:
        if changes:
            print_colored(" [DRY RUN] Would commit with message:", Colors.YELLOW)
            print(f" {commit_message.split(chr(10))[0]}")
        else:
            print_colored(" [DRY RUN] Would push existing unpushed commits", Colors.YELLOW)
        return {'status': 'dry_run', 'message': commit_message}

    if changes:
        # Add changes
        exit_code, _, stderr = run_command(['git', 'add', '.'], cwd=cwd)
        if exit_code != 0:
            print_colored(f" ❌ Failed to stage changes: {stderr}", Colors.RED)
            return {'status': 'failed', 'reason': 'staging_failed'}

        # Commit
        exit_code, _, stderr = run_command(['git', 'commit', '-m', commit_message], cwd=cwd)
        if exit_code != 0:
            print_colored(f" ❌ Failed to commit: {stderr}", Colors.RED)
            return {'status': 'failed', 'reason': 'commit_failed'}

    # Push
    exit_code, _, stderr = run_command(['git', 'push'], cwd=cwd)
    if exit_code != 0:
        print_colored(f" ⚠️ Push failed, trying rebase: {stderr}", Colors.YELLOW)
        # Try pull rebase and push again
        exit_code, _, stderr = run_command(['git', 'pull', '--rebase'], cwd=cwd)
        if exit_code != 0:
            # Best effort: do not leave the working tree stuck mid-rebase.
            # The abort itself fails harmlessly if no rebase was started.
            run_command(['git', 'rebase', '--abort'], cwd=cwd)
            print_colored(f" ❌ Rebase failed: {stderr}", Colors.RED)
            return {'status': 'failed', 'reason': 'push_failed'}
        exit_code, _, stderr = run_command(['git', 'push'], cwd=cwd)
        if exit_code != 0:
            print_colored(f" ❌ Push failed after rebase: {stderr}", Colors.RED)
            return {'status': 'failed', 'reason': 'push_failed'}

    # Read the hash only after pushing: a rebase rewrites the commit, so a
    # hash captured earlier could refer to a commit that was never pushed.
    _, commit_hash, _ = run_command(['git', 'rev-parse', 'HEAD'], cwd=cwd)
    commit_hash = commit_hash.strip()[:7]

    print_colored(f" ✅ Complete (commit: {commit_hash})", Colors.GREEN)

    return {
        'status': 'success',
        'commit_hash': commit_hash,
        'commit_message_first_line': commit_message.split('\n')[0],
        'files_changed': len(changes)
    }

def update_master_repository(repo_root: Path, synchronized_submodules: List[str], dry_run: bool = False) -> Dict[str, Any]:
    """Update master repository with new submodule pointers.

    Stages each given submodule path in ``repo_root``, commits and pushes.

    Args:
        repo_root: Root directory of the superproject.
        synchronized_submodules: Submodule paths (relative to ``repo_root``)
            whose pointers should be committed.
        dry_run: When true, only report what would be done.

    Returns:
        ``{'status': 'dry_run'}``, ``{'status': 'failed', 'reason': ...}``
        (``staging_failed``, ``commit_failed`` or ``push_failed``), or
        ``{'status': 'success', 'submodules_updated': <count>}``.
    """
    print_colored("\n🏠 Updating master repository...", Colors.BLUE)

    if dry_run:
        print_colored(" [DRY RUN] Would update submodule pointers and push", Colors.YELLOW)
        return {'status': 'dry_run'}

    cwd = str(repo_root)

    # Stage submodule updates; stop on the first failure rather than
    # committing a partial set of pointers.
    for submodule_path in synchronized_submodules:
        # '--' keeps a path that starts with '-' from being read as an option.
        exit_code, _, stderr = run_command(['git', 'add', '--', submodule_path], cwd=cwd)
        if exit_code != 0:
            print_colored(f" ❌ Failed to stage {submodule_path}: {stderr}", Colors.RED)
            return {'status': 'failed', 'reason': 'staging_failed'}

    # Generate master commit message
    submodule_list = '\n'.join(f'- {s}' for s in synchronized_submodules)
    message = (
        f"chore: Update {len(synchronized_submodules)} submodules to latest\n\n"
        f"Synchronized submodules:\n{submodule_list}\n\n"
        "🤖 Generated with Claude Code\n\n"
        # Git trailers need the "Name <email>" form to be recognised.
        "Co-Authored-By: Claude <noreply@anthropic.com>"
    )

    # Commit
    exit_code, _, stderr = run_command(['git', 'commit', '-m', message], cwd=cwd)
    if exit_code != 0:
        print_colored(f" ❌ Failed to commit master: {stderr}", Colors.RED)
        return {'status': 'failed', 'reason': 'commit_failed'}

    # Push
    exit_code, _, stderr = run_command(['git', 'push'], cwd=cwd)
    if exit_code != 0:
        print_colored(f" ❌ Failed to push master: {stderr}", Colors.RED)
        return {'status': 'failed', 'reason': 'push_failed'}

    print_colored(" ✅ Master repository updated", Colors.GREEN)

    return {'status': 'success', 'submodules_updated': len(synchronized_submodules)}

def main():
    """Command-line entry point for the git workflow orchestrator.

    Parses arguments, analyzes the repository containing the current
    directory, and then either writes an analysis report (``--mode analyze``)
    or synchronizes the selected submodules and updates the master
    repository (``--mode full``).

    Exits with status 1 when not inside a git repository, when ``--target``
    names an unknown submodule, or when any submodule or the master update
    fails; analyze mode exits with status 0.
    """
    parser = argparse.ArgumentParser(
        description='Git Workflow Orchestrator - Bottom-up git synchronization'
    )
    parser.add_argument('--target', default='all',
                        help='Target scope: all, specific submodule path, or submodules')
    parser.add_argument('--mode', default='full', choices=['analyze', 'full'],
                        help='Operation mode: analyze (check only) or full (execute sync)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Preview changes without executing')
    parser.add_argument('--priority', default='all', choices=['P0', 'P1', 'P2', 'all'],
                        help='Priority filter: P0, P1, P2, or all')
    parser.add_argument('--version', action='version', version='%(prog)s 1.0.0')

    args = parser.parse_args()

    # Get repository root
    repo_root = get_git_root()
    if not repo_root:
        print_colored("❌ Not in a git repository", Colors.RED)
        sys.exit(1)

    print_colored("Git Workflow Orchestrator v1.0.0", Colors.BLUE)
    print_colored(f"Repository: {repo_root}", Colors.BLUE)
    print()

    # Phase 1: Analysis
    analysis = analyze_repository(repo_root)
    groups = analysis['priority_groups']

    if args.mode == 'analyze':
        # Just print analysis and exit
        print_colored("\n📊 Analysis Results:", Colors.BLUE)
        print(f" Total submodules: {analysis['total_submodules']}")
        print(f" Submodules with changes: {analysis['submodules_with_changes']}")
        print(f" P0 (Critical): {len(groups['P0_critical'])}")
        print(f" P1 (Features): {len(groups['P1_features'])}")
        print(f" P2 (Documentation): {len(groups['P2_documentation'])}")

        # Save analysis
        report_path = repo_root / 'reports' / f"git-analysis-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"
        report_path.parent.mkdir(parents=True, exist_ok=True)
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(analysis, f, indent=2)
        print_colored(f"\n💾 Analysis saved to: {report_path}", Colors.GREEN)

        sys.exit(0)

    # Select the submodules to process.
    candidates = [s for s in analysis['submodules'] if s.get('requires_sync')]

    # Honour --target: anything other than the two scope keywords is treated
    # as one specific submodule path.
    # NOTE(review): 'all' and 'submodules' are handled identically here;
    # confirm whether 'submodules' was meant to skip the master update.
    target = args.target.rstrip('/')
    if target not in ('all', 'submodules'):
        known_paths = {s['path'] for s in analysis['submodules']}
        if target not in known_paths:
            print_colored(f"❌ Unknown submodule target: {args.target}", Colors.RED)
            sys.exit(1)
        candidates = [s for s in candidates if s['path'] == target]

    # Filter by priority if specified
    if args.priority != 'all':
        group_of = {}
        for label, key in (('P0', 'P0_critical'), ('P1', 'P1_features'), ('P2', 'P2_documentation')):
            for path in groups[key]:
                group_of.setdefault(path, label)
        candidates = [s for s in candidates if group_of.get(s['path']) == args.priority]

    # Phase 2-4: Execute synchronization
    synchronized = []
    failed = []

    for submodule in candidates:
        result = sync_submodule(repo_root, submodule['path'], dry_run=args.dry_run)
        if result['status'] == 'success':
            synchronized.append(submodule['path'])
        elif result['status'] == 'failed':
            failed.append(submodule['path'])

    # Phase 5: Update master
    master_failed = False
    if synchronized and not args.dry_run:
        master_result = update_master_repository(repo_root, synchronized, dry_run=args.dry_run)
        master_failed = master_result.get('status') == 'failed'

    # Summary
    print_colored("\n" + "="*60, Colors.BLUE)
    print_colored("Summary:", Colors.BLUE)
    print(f" Synchronized: {len(synchronized)} submodules")
    if failed:
        print(f" Failed: {len(failed)} submodules")
    print(f" Mode: {'DRY RUN' if args.dry_run else 'FULL EXECUTION'}")
    print_colored("="*60, Colors.BLUE)

    # Surface failures to the calling shell / CI instead of always exiting 0.
    if failed or master_failed:
        sys.exit(1)

# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()