#!/usr/bin/env python3
"""
Git Push Sync - Push and Sync Protected Installation

FOR CODITECT CONTRIBUTORS ONLY. Customers do not update coditect-core directly.

Wrapper for git push that automatically syncs the protected installation
after a successful push to GitHub using git pull (non-destructive).

Usage:
    python3 scripts/git-push-sync.py                  # Push and sync
    python3 scripts/git-push-sync.py -m "feat: ..."   # With commit message (if uncommitted)
    python3 scripts/git-push-sync.py --dry-run        # Preview only
    python3 scripts/git-push-sync.py --no-sync        # Push only, skip protected sync
    python3 scripts/git-push-sync.py --no-parent      # Skip parent repo update
    python3 scripts/git-push-sync.py --pull-only      # Only pull to protected (no push)

Part of ADR-113: Post-Push Protected Installation Sync
Updated: 2026-01-25 - Changed from clone+swap to git pull approach
Created: 2026-01-25
"""
import argparse import json import os import subprocess import sys from datetime import datetime from pathlib import Path from typing import Optional, Tuple
# ANSI colors
# Shared Colors module (consolidates 36 duplicate definitions)
# The module is imported from the "core" directory that sits next to this
# script, so that directory is placed at the front of sys.path first.
_script_dir = Path(__file__).parent
sys.path.insert(0, str(_script_dir / "core"))
from colors import Colors
def colorize(text: str, color: str) -> str:
    """Return ``text`` prefixed with ``color`` and followed by ``Colors.END``."""
    reset = Colors.END
    return "{}{}{}".format(color, text, reset)
# Paths
HOME = Path.home()
PARENT_REPO = HOME / "PROJECTS" / "coditect-rollout-master"
SUBMODULE = PARENT_REPO / "submodules" / "core" / "coditect-core"

# Platform-specific paths (ADR-114: User Data Separation)
# An environment variable that is set but empty is treated like an unset one
# (the `or` fallback); otherwise Path("") would make CODITECT_BASE relative
# to whatever directory the script happens to be started from.
if sys.platform == "darwin":
    CODITECT_BASE = HOME / "Library" / "Application Support" / "CODITECT"
elif sys.platform == "win32":
    CODITECT_BASE = Path(os.environ.get("LOCALAPPDATA") or HOME / "AppData" / "Local") / "CODITECT"
else:
    CODITECT_BASE = Path(os.environ.get("XDG_DATA_HOME") or HOME / ".local" / "share") / "coditect"

# Framework location (synced from GitHub)
FRAMEWORK_LOC = CODITECT_BASE / "core"

# User data location (NOT synced - separate per ADR-114)
USER_DATA_LOC = CODITECT_BASE / "data"

# Legacy path for backward compatibility
PROTECTED_LOC = FRAMEWORK_LOC

REPO_URL = "https://github.com/coditect-ai/coditect-core.git"

# User data paths (in data/ directory per ADR-114)
MACHINE_ID_FILE = USER_DATA_LOC / "machine-id.json"
SESSION_LOGS_DIR = USER_DATA_LOC / "session-logs"

# Fallback to old location if not migrated yet
if not MACHINE_ID_FILE.exists() and (HOME / ".coditect" / "machine-id.json").exists():
    MACHINE_ID_FILE = HOME / ".coditect" / "machine-id.json"
if not SESSION_LOGS_DIR.exists() and (HOME / ".coditect" / "session-logs").exists():
    SESSION_LOGS_DIR = HOME / ".coditect" / "session-logs"
def run_cmd(cmd: list, cwd: Optional[Path] = None, capture: bool = True) -> Tuple[int, str, str]:
    """Run a command and return ``(returncode, stdout, stderr)``.

    Args:
        cmd: Program and arguments, handed to ``subprocess.run`` as a list
            (no shell is involved).
        cwd: Directory to run the command in; ``None`` keeps the current one.
        capture: When true, stdout and stderr are captured and returned as
            text. When false, the child writes directly to this process's
            streams and both returned strings are empty.

    Returns:
        The exit code and the captured output. If running the command raises
        (for example because the program does not exist), the result is
        ``(1, "", <error message>)`` instead of an exception.
    """
    try:
        result = subprocess.run(cmd, cwd=cwd, capture_output=capture, text=True)
    except Exception as exc:  # deliberate catch-all: callers only inspect the tuple
        return 1, "", str(exc)
    # Without capture, subprocess leaves stdout/stderr as None; normalise to
    # "" so the declared return type holds and callers can use str methods.
    return result.returncode, result.stdout or "", result.stderr or ""
def get_machine_id() -> Optional[str]:
    """Get the machine hostname for session logging.

    Reads the ``hostname`` key from the JSON document at ``MACHINE_ID_FILE``.

    Returns:
        The stored ``hostname`` value, or ``"unknown"`` when the file is
        missing or unreadable, is not valid JSON, is not a JSON object, or
        has no ``hostname`` key.
    """
    try:
        with open(MACHINE_ID_FILE, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return "unknown"
    if not isinstance(data, dict):
        # Valid JSON that is not an object (e.g. a list) has no .get().
        return "unknown"
    return data.get("hostname", "unknown")
def detect_working_directory() -> Path:
    """Locate the coditect-core checkout to operate on.

    Candidates are tried in order: the current directory (when it contains a
    ``.git`` entry and its path mentions "coditect-core"), the submodule path
    below the current directory, and finally the ``SUBMODULE`` constant.
    If none of them applies, an error is printed and the process exits with
    status 1.
    """
    here = Path.cwd()

    # Already inside the submodule checkout
    inside_checkout = (here / ".git").exists() and "coditect-core" in str(here)
    if inside_checkout:
        return here

    # Started from the parent repo root
    nested = here / "submodules" / "core" / "coditect-core"
    if nested.exists():
        return nested

    # Last resort: the well-known SUBMODULE location
    if not SUBMODULE.exists():
        print(colorize("⚠ Cannot detect coditect-core directory", Colors.RED))
        sys.exit(1)
    return SUBMODULE
def check_for_uncommitted_changes(repo: Path) -> Tuple[bool, str]:
    """Check if there are uncommitted changes (tracked files only, not untracked).

    Args:
        repo: Working tree to inspect.

    Returns:
        ``(True, listing)`` where ``listing`` holds the newline-joined
        ``git status --porcelain`` lines that do not start with ``??``, or
        ``(False, "")`` when there are none. The exit status of git is not
        inspected, so empty output is always reported as "no changes".
    """
    _, stdout, _ = run_cmd(["git", "status", "--porcelain"], repo)

    # Split per line instead of strip()-ing the whole output first: the
    # two-character status field may begin with a space (" M path"), and
    # stripping would turn the first line into "M path", which reads as a
    # staged change. Untracked entries ("??") are filtered out.
    tracked_changes = [
        line
        for line in stdout.splitlines()
        if line.strip() and not line.startswith("??")
    ]
    if tracked_changes:
        return True, "\n".join(tracked_changes)
    return False, ""
def commit_changes(repo: Path, message: str) -> bool:
    """Stage everything in ``repo`` and commit it with ``message``.

    A ``Co-Authored-By`` trailer is appended to the commit message.

    Returns:
        ``True`` when both ``git add -A`` and ``git commit`` succeed;
        otherwise the git error text is printed and ``False`` is returned.
    """
    # Stage all changes
    add_rc, _, add_err = run_cmd(["git", "add", "-A"], repo)
    if add_rc != 0:
        print(colorize(f"⚠ Failed to stage: {add_err}", Colors.RED))
        return False

    # Commit with the co-author trailer
    trailer = "Co-Authored-By: Claude <noreply@anthropic.com>"
    commit_rc, _, commit_err = run_cmd(
        ["git", "commit", "-m", f"{message}\n\n{trailer}"],
        repo,
    )
    if commit_rc != 0:
        print(colorize(f"⚠ Failed to commit: {commit_err}", Colors.RED))
        return False

    print(colorize(f"✓ Committed: {message}", Colors.GREEN))
    return True
def git_push(repo: Path, dry_run: bool = False) -> bool:
    """Push ``main`` of ``repo`` to ``origin``.

    Args:
        repo: Repository to push from.
        dry_run: When true, list the commits that would be pushed but do not
            run ``git push``.

    Returns:
        ``False`` when the current branch is not ``main`` or the push fails;
        ``True`` when there is nothing to push, in dry-run mode, or after a
        successful push.
    """
    print(colorize("\n=== Step 1: Git Push ===", Colors.BOLD))

    # Only the main branch is pushed
    _, branch_out, _ = run_cmd(["git", "branch", "--show-current"], repo)
    branch = branch_out.strip()
    if branch != "main":
        print(colorize(f"⚠ Not on main branch. Current: {branch}", Colors.YELLOW))
        return False

    # Commits that are ahead of origin/main
    _, log_out, _ = run_cmd(["git", "log", "origin/main..HEAD", "--oneline"], repo)
    pending = log_out.strip()
    if not pending:
        print(" No commits to push.")
        return True

    print(" Commits to push:")
    for entry in pending.split("\n"):
        print(f" {entry}")

    if dry_run:
        print(colorize(" [DRY RUN] Would push to origin/main", Colors.YELLOW))
        return True

    # Output is not captured so git's own progress is shown to the user
    print(" Pushing to origin/main...")
    push_rc, _, _ = run_cmd(["git", "push", "origin", "main"], repo, capture=False)
    if push_rc == 0:
        print(colorize("✓ Pushed to origin/main", Colors.GREEN))
        return True

    print(colorize("⚠ Push failed", Colors.RED))
    return False
def update_session_log(action: str, dry_run: bool = False) -> bool:
    """Append a "Git Push Sync" entry to today's session log.

    The entry is written to ``SESSION_LOGS_DIR/SESSION-LOG-<YYYY-MM-DD>.md``
    and records the action text and the hostname from ``get_machine_id()``.

    Args:
        action: Description stored in the entry.
        dry_run: When true, only report which file would be appended to.

    Returns:
        Always ``True``: a missing log directory or a write error is
        reported as a warning and treated as non-fatal.
    """
    print(colorize("\n=== Step 2: Update Session Log ===", Colors.BOLD))

    hostname = get_machine_id()
    day = datetime.now().strftime("%Y-%m-%d")
    log_file = SESSION_LOGS_DIR / f"SESSION-LOG-{day}.md"

    if not SESSION_LOGS_DIR.exists():
        print(colorize(f"⚠ Session logs directory not found: {SESSION_LOGS_DIR}", Colors.YELLOW))
        return True  # Non-fatal

    timestamp = datetime.now().strftime("%H:%M:%S")
    entry = "\n".join(
        [
            "",
            f"## {timestamp} - Git Push Sync",
            "",
            f"- **Action:** {action}",
            f"- **Host:** {hostname}",
            "- **Protected Updated:** Yes",
            "",
            "",
        ]
    )

    if dry_run:
        print(colorize(f" [DRY RUN] Would append to: {log_file}", Colors.YELLOW))
        print(f" Entry: {action}")
        return True

    # Append to the session log; failures only produce a warning
    try:
        with log_file.open("a") as handle:
            handle.write(entry)
        print(colorize(f"✓ Session log updated: {log_file.name}", Colors.GREEN))
    except IOError as exc:
        print(colorize(f"⚠ Failed to update session log: {exc}", Colors.YELLOW))
    return True
def ensure_protected_is_git_repo() -> bool:
    """Make sure ``PROTECTED_LOC`` is a git repository tracking ``REPO_URL``.

    Three situations are handled:

    * ``.git`` already exists: nothing to do.
    * The directory exists without ``.git``: ``git init``, add ``origin``,
      fetch, and hard-reset to ``origin/main``.
    * The directory does not exist: shallow-clone ``REPO_URL`` into it.

    After a fresh initialisation the commit-blocking hook is installed.

    Returns:
        ``True`` when the repository is ready, ``False`` when any git step
        failed (the error is printed).
    """
    if (PROTECTED_LOC / ".git").exists():
        return True

    print(colorize(" Protected installation is not a git repo. Initializing...", Colors.YELLOW))

    if not PROTECTED_LOC.exists():
        # No directory - clone fresh (shallow for space efficiency)
        print(" → Cloning fresh (shallow)...")
        PROTECTED_LOC.parent.mkdir(parents=True, exist_ok=True)
        clone_rc, _, clone_err = run_cmd(
            ["git", "clone", "--depth", "1", REPO_URL, str(PROTECTED_LOC)]
        )
        if clone_rc != 0:
            print(colorize(f"⚠ Clone failed: {clone_err}", Colors.RED))
            return False
    else:
        # Has files but no .git - init and point at the remote
        print(" → Initializing git in existing directory...")
        init_rc, _, init_err = run_cmd(["git", "init"], PROTECTED_LOC)
        if init_rc != 0:
            print(colorize(f"⚠ Git init failed: {init_err}", Colors.RED))
            return False

        # An already-configured origin is tolerated
        remote_rc, _, remote_err = run_cmd(
            ["git", "remote", "add", "origin", REPO_URL], PROTECTED_LOC
        )
        if remote_rc != 0 and "already exists" not in remote_err:
            print(colorize(f"⚠ Failed to add remote: {remote_err}", Colors.RED))
            return False

        # Fetch, then reset the working tree to match the remote
        follow_up = (
            (" → Fetching from origin...", ["git", "fetch", "origin"], "Fetch"),
            (" → Resetting to origin/main...", ["git", "reset", "--hard", "origin/main"], "Reset"),
        )
        for announcement, command, label in follow_up:
            print(announcement)
            step_rc, _, step_err = run_cmd(command, PROTECTED_LOC)
            if step_rc != 0:
                print(colorize(f"⚠ {label} failed: {step_err}", Colors.RED))
                return False

    # Install pre-commit hook to prevent accidental commits
    install_protected_hooks()

    print(colorize("✓ Protected installation initialized as git repo", Colors.GREEN))
    return True
def install_protected_hooks():
    """Install git hooks in protected to prevent accidental commits.

    Writes an executable ``pre-commit`` script into
    ``PROTECTED_LOC/.git/hooks`` (creating the directory if needed). The
    script prints an error message and exits with status 1. An existing
    ``pre-commit`` file at that path is overwritten.
    """
    hooks_dir = PROTECTED_LOC / ".git" / "hooks"
    hooks_dir.mkdir(parents=True, exist_ok=True)

    pre_commit = hooks_dir / "pre-commit"
    # One shell command per line, ending in `exit 1`: the non-zero exit
    # status is what makes the hook reject the commit.
    pre_commit_content = '''#!/bin/bash
# Prevent commits in protected installation
echo "ERROR: This is the protected CODITECT installation."
echo "Commits are not allowed here. Make changes in:"
echo " ~/PROJECTS/coditect-rollout-master/submodules/core/coditect-core"
exit 1
'''

    # newline="\n" keeps LF line endings on every platform; a CRLF after the
    # shebang would make the interpreter path unusable.
    with open(pre_commit, 'w', newline="\n") as f:
        f.write(pre_commit_content)
    os.chmod(pre_commit, 0o755)
def pull_sync_protected(dry_run: bool = False) -> bool:
    """Bring the protected installation up to date with ``origin/main``.

    Steps: verify ``CODITECT_BASE`` exists, make sure ``PROTECTED_LOC`` is a
    git repository, fetch, list up to five incoming commits, then
    ``git pull --ff-only``. If the fast-forward pull fails, the checkout is
    hard-reset to ``origin/main`` instead.

    Args:
        dry_run: When true, only report the target directory and return.

    Returns:
        ``True`` when the installation is up to date afterwards (or in
        dry-run mode); ``False`` when the base directory is missing or a git
        step fails.
    """
    print(colorize("\n=== Step 3: Pull Sync Protected ===", Colors.BOLD))

    if not CODITECT_BASE.exists():
        print(colorize(f"⚠ CODITECT base not found: {CODITECT_BASE}", Colors.RED))
        print(" Run: python3 scripts/CODITECT-CORE-INITIAL-SETUP.py")
        return False

    if dry_run:
        print(colorize(f" [DRY RUN] Would pull to: {PROTECTED_LOC}", Colors.YELLOW))
        return True

    print(f" Protected: {PROTECTED_LOC}")

    # The target must be a git repo before anything can be pulled
    if not ensure_protected_is_git_repo():
        return False

    # Remember where we started for the final summary line
    _, head_out, _ = run_cmd(["git", "rev-parse", "--short", "HEAD"], PROTECTED_LOC)
    before_commit = head_out.strip()
    print(f" Current commit: {before_commit}")

    # Fetch latest
    print(" → Fetching from origin...")
    fetch_rc, _, fetch_err = run_cmd(["git", "fetch", "origin"], PROTECTED_LOC)
    if fetch_rc != 0:
        print(colorize(f"⚠ Fetch failed: {fetch_err}", Colors.RED))
        return False

    # Anything on origin/main that HEAD does not have yet?
    _, behind_out, _ = run_cmd(["git", "log", "HEAD..origin/main", "--oneline"], PROTECTED_LOC)
    if not behind_out.strip():
        print(" Already up to date.")
        return True

    incoming = behind_out.strip().split("\n")
    preview_limit = 5
    print(" Commits to pull:")
    for entry in incoming[:preview_limit]:
        print(f" {entry}")
    if len(incoming) > preview_limit:
        print(f" ... and {len(incoming) - preview_limit} more")

    # Pull (fast-forward only to be safe)
    print(" → Pulling changes...")
    pull_rc, _, _ = run_cmd(["git", "pull", "--ff-only", "origin", "main"], PROTECTED_LOC)
    if pull_rc != 0:
        # Fall back to a hard reset when a fast-forward is not possible
        print(colorize(" Fast-forward failed, resetting to origin/main...", Colors.YELLOW))
        reset_rc, _, reset_err = run_cmd(["git", "reset", "--hard", "origin/main"], PROTECTED_LOC)
        if reset_rc != 0:
            print(colorize(f"⚠ Reset failed: {reset_err}", Colors.RED))
            return False

    # Report the resulting commit
    _, new_head_out, _ = run_cmd(["git", "rev-parse", "--short", "HEAD"], PROTECTED_LOC)
    after_commit = new_head_out.strip()
    print(colorize(f"✓ Updated: {before_commit} → {after_commit}", Colors.GREEN))
    return True
def verify_sync(repo: Path) -> bool:
    """Verify sync between development and protected.

    Prints whether ``HEAD`` of ``repo`` and of ``PROTECTED_LOC`` are the same
    commit, followed by a comparison of component counts (``*.md`` files
    directly under ``agents`` and ``commands``, and ``SKILL.md`` files
    anywhere under ``skills``).

    Args:
        repo: Development checkout to compare against the protected one.

    Returns:
        ``False`` when ``PROTECTED_LOC`` does not exist, ``True`` otherwise.
        Mismatches are reported but do not change the return value.
    """
    print(colorize("\n=== Step 4: Verify Sync ===", Colors.BOLD))

    if not PROTECTED_LOC.exists():
        print(colorize("⚠ Protected installation not found", Colors.YELLOW))
        return False

    # Compare FULL hashes. `--short` output length is chosen per repository,
    # so the same commit may be abbreviated differently in the two checkouts
    # and would then be reported as a mismatch.
    _, dev_out, _ = run_cmd(["git", "rev-parse", "HEAD"], repo)
    _, prot_out, _ = run_cmd(["git", "rev-parse", "HEAD"], PROTECTED_LOC)
    dev_commit = dev_out.strip()
    prot_commit = prot_out.strip()

    # Fixed-width abbreviations for display only
    dev_short = dev_commit[:7] or "unknown"
    prot_short = prot_commit[:7] or "unknown"

    # An empty hash means rev-parse produced nothing; never call that a match.
    if dev_commit and dev_commit == prot_commit:
        print(colorize(f" ✓ Commits match: {dev_short}", Colors.GREEN))
    else:
        print(colorize(f" ! Commits differ: Dev={dev_short}, Protected={prot_short}", Colors.YELLOW))

    # Compare component counts
    def count_files(path: Path, subdir: str, pattern: str = "*.md") -> int:
        """Count files matching ``pattern`` directly inside ``path/subdir``."""
        d = path / subdir
        if d.exists():
            return len(list(d.glob(pattern)))
        return 0

    def count_skills(path: Path) -> int:
        """Count ``SKILL.md`` files at any depth below ``path/skills``."""
        skills_dir = path / "skills"
        if skills_dir.exists():
            return len(list(skills_dir.rglob("SKILL.md")))
        return 0

    def marker(left: int, right: int) -> str:
        """Return a green check for equal counts, a yellow "!" otherwise."""
        if left == right:
            return colorize("✓", Colors.GREEN)
        return colorize("!", Colors.YELLOW)

    print(" Component counts:")
    for component_type in ["agents", "commands"]:
        dev_count = count_files(repo, component_type)
        prot_count = count_files(PROTECTED_LOC, component_type)
        status = marker(dev_count, prot_count)
        print(f" {status} {component_type}: Dev={dev_count}, Protected={prot_count}")

    # Skills are nested
    dev_skills = count_skills(repo)
    prot_skills = count_skills(PROTECTED_LOC)
    status = marker(dev_skills, prot_skills)
    print(f" {status} skills: Dev={dev_skills}, Protected={prot_skills}")

    print(colorize("\n✓ Sync verification complete", Colors.GREEN))
    return True
def update_parent_submodule(repo: Path, dry_run: bool = False) -> bool:
    """Record the new coditect-core commit in the parent repository.

    When ``git status`` in ``PARENT_REPO`` shows a change for the submodule
    path, the path is staged, committed with a message naming the short
    commit hash of ``repo``, and pushed to ``origin main``.

    Args:
        repo: The coditect-core checkout whose ``HEAD`` is referenced in the
            commit message.
        dry_run: When true, only report what would be committed.

    Returns:
        ``True`` when the parent repo is missing (non-fatal), nothing needs
        updating, in dry-run mode, or after a successful push; ``False`` when
        staging, committing or pushing fails.
    """
    print(colorize("\n=== Step 5: Update Parent Repository ===", Colors.BOLD))

    if not PARENT_REPO.exists():
        print(colorize(f"⚠ Parent repo not found: {PARENT_REPO}", Colors.YELLOW))
        return True  # Non-fatal

    submodule_path = "submodules/core/coditect-core"

    # Short hash of the coditect-core commit being referenced
    _, head_out, _ = run_cmd(["git", "rev-parse", "--short", "HEAD"], repo)
    core_commit = head_out.strip()

    # Does the parent see a changed submodule pointer?
    _, status_out, _ = run_cmd(["git", "status", "--porcelain", submodule_path], PARENT_REPO)
    if not status_out.strip():
        print(" Submodule already up to date in parent.")
        return True

    if dry_run:
        print(colorize(f" [DRY RUN] Would commit submodule update to {core_commit}", Colors.YELLOW))
        return True

    # Note: this changes the working directory of the whole process.
    os.chdir(PARENT_REPO)

    # Remove any stale index.lock
    # NOTE(review): assumes no other git process is using the parent repo at
    # this moment - confirm that unconditional removal is intended.
    stale_lock = PARENT_REPO / ".git" / "index.lock"
    if stale_lock.exists():
        stale_lock.unlink()

    # Stage submodule
    add_rc, _, add_err = run_cmd(["git", "add", submodule_path], PARENT_REPO)
    if add_rc != 0:
        print(colorize(f"⚠ Failed to stage submodule: {add_err}", Colors.RED))
        return False

    # Commit
    subject = f"chore(submodules): Update coditect-core to {core_commit}"
    trailer = "Co-Authored-By: Claude <noreply@anthropic.com>"
    commit_rc, _, commit_err = run_cmd(
        ["git", "commit", "-m", f"{subject}\n\n{trailer}"], PARENT_REPO
    )
    if commit_rc != 0:
        print(colorize(f"⚠ Commit failed: {commit_err}", Colors.RED))
        return False

    print(colorize(f"✓ Committed submodule update ({core_commit})", Colors.GREEN))

    # Push (output not captured so git's progress is visible)
    print(" Pushing to origin/main...")
    push_rc, _, _ = run_cmd(["git", "push", "origin", "main"], PARENT_REPO, capture=False)
    if push_rc != 0:
        print(colorize("⚠ Push failed", Colors.RED))
        return False

    print(colorize("✓ Parent repo updated and pushed", Colors.GREEN))
    return True
def main():
    """Command-line entry point: push coditect-core and sync the protected copy.

    Flow:
        1. Parse arguments and detect the repository to work in.
        2. ``--pull-only``: sync and verify the protected installation, exit.
        3. Commit tracked changes when ``-m`` is given; otherwise refuse to
           continue while tracked changes are uncommitted.
        4. Ask for confirmation (skipped with ``--yes`` or ``--dry-run``).
        5. Push, then (unless ``--no-sync``) update the session log, pull
           into the protected installation, verify, and (unless
           ``--no-parent``) update the parent repository.

    Exits with status 1 when committing, pushing or syncing fails, and with
    status 0 after ``--pull-only``, ``--no-sync`` or a declined confirmation.
    """
    parser = argparse.ArgumentParser(
        description="Push to GitHub and sync protected installation via git pull",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 scripts/git-push-sync.py                    # Push and sync
  python3 scripts/git-push-sync.py -m "feat: ..."     # With commit message
  python3 scripts/git-push-sync.py --dry-run          # Preview only
  python3 scripts/git-push-sync.py --no-sync          # Push only
  python3 scripts/git-push-sync.py --pull-only        # Only pull to protected
        """
    )
    parser.add_argument("-m", "--message", type=str, help="Commit message (if uncommitted changes)")
    parser.add_argument("--dry-run", action="store_true", help="Preview without making changes")
    parser.add_argument("--no-sync", action="store_true", help="Push only, skip protected sync")
    parser.add_argument("--no-parent", action="store_true", help="Skip parent repo update")
    parser.add_argument("--pull-only", action="store_true", help="Only pull to protected (skip push)")
    parser.add_argument("--yes", "-y", action="store_true", help="Skip confirmations")

    args = parser.parse_args()

    print(colorize("\n╔══════════════════════════════════════════════════════════════╗", Colors.CYAN))
    print(colorize("║ CODITECT Git Push Sync (ADR-113) - Pull Mode ║", Colors.CYAN))
    print(colorize("╚══════════════════════════════════════════════════════════════╝", Colors.CYAN))

    # Detect working directory
    repo = detect_working_directory()
    print(f"\nRepository: {repo}")

    if args.dry_run:
        print(colorize("[DRY RUN MODE - No changes will be made]\n", Colors.YELLOW))

    # Pull-only mode
    if args.pull_only:
        if not pull_sync_protected(args.dry_run):
            sys.exit(1)
        verify_sync(repo)
        print(colorize("\n✓ Pull sync complete!", Colors.GREEN))
        sys.exit(0)

    # Check for uncommitted changes
    has_changes, changes = check_for_uncommitted_changes(repo)
    if has_changes:
        change_lines = changes.split('\n')
        print("\nUncommitted changes detected:")
        for line in change_lines[:10]:
            print(f" {line}")
        if len(change_lines) > 10:
            print(f" ... and {len(change_lines) - 10} more")

        if args.message:
            if args.dry_run:
                print(colorize(f" [DRY RUN] Would commit with message: {args.message}", Colors.YELLOW))
            else:
                if not commit_changes(repo, args.message):
                    sys.exit(1)
        else:
            print(colorize("\n⚠ Uncommitted changes. Use -m 'message' to commit or commit manually first.", Colors.YELLOW))
            sys.exit(1)

    # Confirmation
    if not args.dry_run and not args.yes:
        # List only the steps that will actually run. With --no-sync the
        # script exits right after the push, so neither the sync steps nor
        # the parent update are announced in that case.
        steps = ["Push to origin/main"]
        if not args.no_sync:
            steps.append("Update session log")
            steps.append("Pull from GitHub to protected installation")
            if not args.no_parent:
                steps.append("Update parent repo submodule reference")

        print(colorize("\n⚠ This will:", Colors.YELLOW))
        for number, step in enumerate(steps, start=1):
            print(f" {number}. {step}")

        response = input("\nProceed? [y/N] ").strip().lower()
        if response != 'y':
            print(" Cancelled.")
            sys.exit(0)

    # Step 1: Git push
    if not git_push(repo, args.dry_run):
        sys.exit(1)

    if args.no_sync:
        print(colorize("\n✓ Push complete (sync skipped)", Colors.GREEN))
        sys.exit(0)

    # Step 2: Update session log
    # NOTE(review): the log entry is written before the pull in step 3 runs,
    # so it is recorded even if the sync later fails - confirm this is intended.
    _, stdout, _ = run_cmd(["git", "rev-parse", "--short", "HEAD"], repo)
    commit_hash = stdout.strip()
    update_session_log(f"Pushed {commit_hash} and synced protected installation", args.dry_run)

    # Step 3: Pull sync
    if not pull_sync_protected(args.dry_run):
        print(colorize("\n⚠ Sync failed but push succeeded", Colors.YELLOW))
        print(" Run manually: python3 scripts/git-push-sync.py --pull-only")
        sys.exit(1)

    # Step 4: Verify
    verify_sync(repo)

    # Step 5: Update parent (a failure here is reported but not fatal)
    if not args.no_parent:
        if not update_parent_submodule(repo, args.dry_run):
            print(colorize("⚠ Parent update failed, but coditect-core sync succeeded", Colors.YELLOW))

    print(colorize("\n╔══════════════════════════════════════════════════════════════╗", Colors.GREEN))
    print(colorize("║ ✓ Sync Complete! ║", Colors.GREEN))
    print(colorize("╚══════════════════════════════════════════════════════════════╝", Colors.GREEN))
    print(f"\n Development: {repo}")
    print(f" Protected: {PROTECTED_LOC}")
    if not args.no_parent:
        print(f" Parent: {PARENT_REPO}")
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()