Skip to main content

#!/usr/bin/env python3 """ Core Sync - Fully Automated Framework Sync

One-command, zero-interaction sync of coditect-core:

  1. Auto-detects changed files and generates commit message
  2. Commits, pushes coditect-core
  3. Updates parent repo submodule reference
  4. Pushes parent repo
  5. Syncs protected installation

Usage: python3 core-sync.py # Full auto sync python3 core-sync.py --dry-run # Preview what would happen python3 core-sync.py --message "..." # Override auto-generated message

For /sync command - the fastest way to sync your framework changes.

Created: 2026-01-29 Track: H.9 (Framework Automation) """

import argparse import json import os import re import subprocess import sys from collections import Counter from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple

Paths

HOME = Path.home() PARENT_REPO = HOME / "PROJECTS" / "coditect-rollout-master" DEV_COPY = PARENT_REPO / "submodules" / "core" / "coditect-core"

Platform-specific protected location

if sys.platform == "darwin": PROTECTED_LOC = HOME / "Library" / "Application Support" / "CODITECT" / "core" elif sys.platform == "win32": PROTECTED_LOC = Path(os.environ.get("LOCALAPPDATA", HOME / "AppData" / "Local")) / "CODITECT" / "core" else: PROTECTED_LOC = Path(os.environ.get("XDG_DATA_HOME", HOME / ".local" / "share")) / "coditect" / "core"

Colors

class C: RED = '\033[91m' GREEN = '\033[92m' YELLOW = '\033[93m' BLUE = '\033[94m' CYAN = '\033[96m' BOLD = '\033[1m' END = '\033[0m'

def run(cmd: List[str], cwd: Optional[Path] = None, capture: bool = True) -> Tuple[int, str, str]: """Run command, return (code, stdout, stderr).""" try: r = subprocess.run(cmd, cwd=cwd, capture_output=capture, text=True) return r.returncode, r.stdout or "", r.stderr or "" except Exception as e: return 1, "", str(e)

def remove_lock_files(): """Remove stale git lock files that block operations.""" lock_files = [ PARENT_REPO / ".git" / "index.lock", DEV_COPY.parent.parent.parent / ".git" / "modules" / "submodules" / "core" / "coditect-core" / "index.lock", ] for lock in lock_files: if lock.exists(): lock.unlink() print(f" Removed stale lock: {lock.name}")

def get_changed_files() -> List[str]: """Get list of changed files in coditect-core.""" os.chdir(DEV_COPY)

# Staged + unstaged + untracked
_, stdout, _ = run(["git", "status", "--porcelain"], DEV_COPY)
files = []
for line in stdout.strip().split('\n'):
if line.strip():
# Format: XY filename or XY -> renamed
parts = line[3:].split(' -> ')
files.append(parts[-1]) # Use final name if renamed

return files

def get_unpushed_commits() -> List[str]: """Get unpushed commit messages.""" _, stdout, _ = run(["git", "log", "origin/main..HEAD", "--format=%s"], DEV_COPY) return [s.strip() for s in stdout.strip().split('\n') if s.strip()]

def classify_changes(files: List[str]) -> Dict[str, List[str]]: """Classify files by component type.""" categories = { 'agents': [], 'commands': [], 'skills': [], 'scripts': [], 'hooks': [], 'config': [], 'docs': [], 'other': [] }

for f in files:
if f.startswith('agents/'):
categories['agents'].append(f)
elif f.startswith('commands/'):
categories['commands'].append(f)
elif f.startswith('skills/'):
categories['skills'].append(f)
elif f.startswith('scripts/'):
categories['scripts'].append(f)
elif f.startswith('hooks/'):
categories['hooks'].append(f)
elif f.startswith('config/'):
categories['config'].append(f)
elif f.startswith('docs/') or f.startswith('internal/'):
categories['docs'].append(f)
else:
categories['other'].append(f)

return {k: v for k, v in categories.items() if v}

def generate_commit_message(files: List[str], unpushed: List[str]) -> str: """Auto-generate a conventional commit message based on changes.""" if not files and not unpushed: return "chore: sync framework"

# If there are unpushed commits, summarize them
if unpushed and not files:
if len(unpushed) == 1:
return unpushed[0]
return f"chore: batch sync ({len(unpushed)} commits)"

categories = classify_changes(files)

# Determine commit type based on what changed
if len(categories) == 1:
cat = list(categories.keys())[0]
cat_files = categories[cat]

# Single file change
if len(cat_files) == 1:
filename = Path(cat_files[0]).stem
# Detect if new file or modification
_, status, _ = run(["git", "status", "--porcelain", cat_files[0]], DEV_COPY)
is_new = status.startswith('A') or status.startswith('??')

if cat == 'agents':
action = "Add" if is_new else "Update"
return f"feat(agents): {action} {filename} agent"
elif cat == 'commands':
action = "Add" if is_new else "Update"
return f"feat(commands): {action} /{filename} command"
elif cat == 'skills':
action = "Add" if is_new else "Update"
skill_name = cat_files[0].split('/')[1] if '/' in cat_files[0] else filename
return f"feat(skills): {action} {skill_name} skill"
elif cat == 'scripts':
action = "Add" if is_new else "Update"
return f"feat(scripts): {action} {filename}"
elif cat == 'hooks':
action = "Add" if is_new else "Update"
return f"feat(hooks): {action} {filename} hook"
elif cat == 'config':
return f"chore(config): Update {filename}"
elif cat == 'docs':
return f"docs: Update {filename}"

# Multiple files in same category
return f"feat({cat}): Update {len(cat_files)} {cat}"

# Multiple categories
summary_parts = []
for cat, cat_files in categories.items():
summary_parts.append(f"{len(cat_files)} {cat}")

# Determine primary type
if 'agents' in categories or 'commands' in categories or 'skills' in categories:
commit_type = "feat"
elif 'scripts' in categories or 'hooks' in categories:
commit_type = "feat"
elif 'docs' in categories:
commit_type = "docs"
else:
commit_type = "chore"

return f"{commit_type}: Update {', '.join(summary_parts)}"

def sync_core(message: Optional[str], dry_run: bool) -> bool: """Sync coditect-core: stage, commit, push.""" print(f"\n{C.BOLD}=== Syncing coditect-core ==={C.END}") os.chdir(DEV_COPY)

# Check for changes
files = get_changed_files()
unpushed = get_unpushed_commits()

if not files and not unpushed:
print(f" {C.GREEN}No changes to sync{C.END}")
return True

# Show what will be synced
if files:
print(f"\n Changed files ({len(files)}):")
for f in files[:10]:
print(f" {f}")
if len(files) > 10:
print(f" ... and {len(files) - 10} more")

if unpushed:
print(f"\n Unpushed commits ({len(unpushed)}):")
for c in unpushed[:5]:
print(f" {c}")

# Generate commit message if not provided
if not message:
message = generate_commit_message(files, unpushed)

print(f"\n Commit message: {C.CYAN}{message}{C.END}")

if dry_run:
print(f"\n {C.YELLOW}[DRY RUN] Would commit and push{C.END}")
return True

# Stage all changes
if files:
rc, _, err = run(["git", "add", "-A"], DEV_COPY)
if rc != 0:
print(f" {C.RED}Failed to stage: {err}{C.END}")
return False

# Commit if there are staged changes
if files:
full_msg = f"{message}\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>"
rc, _, err = run(["git", "commit", "-m", full_msg], DEV_COPY)
if rc != 0 and "nothing to commit" not in err:
print(f" {C.RED}Commit failed: {err}{C.END}")
return False
print(f" {C.GREEN}Committed{C.END}")

# Push
rc, out, err = run(["git", "push", "origin", "main"], DEV_COPY, capture=False)
if rc != 0:
print(f" {C.RED}Push failed{C.END}")
return False

print(f" {C.GREEN}Pushed to origin/main{C.END}")
return True

def sync_parent(dry_run: bool) -> bool: """Update parent repo with new submodule commit.""" print(f"\n{C.BOLD}=== Syncing coditect-rollout-master ==={C.END}") os.chdir(PARENT_REPO)

# Get core commit
_, stdout, _ = run(["git", "rev-parse", "--short", "HEAD"], DEV_COPY)
core_commit = stdout.strip()

# Check if submodule changed
_, status, _ = run(["git", "status", "--porcelain", "submodules/core/coditect-core"], PARENT_REPO)
if not status.strip():
print(f" {C.GREEN}Submodule already up to date{C.END}")
return True

print(f" Submodule updated to: {core_commit}")

if dry_run:
print(f" {C.YELLOW}[DRY RUN] Would commit and push parent{C.END}")
return True

# Stage submodule
rc, _, err = run(["git", "add", "submodules/core/coditect-core"], PARENT_REPO)
if rc != 0:
print(f" {C.RED}Failed to stage submodule: {err}{C.END}")
return False

# Commit
msg = f"chore(submodules): Update coditect-core to {core_commit}\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>"
rc, _, err = run(["git", "commit", "-m", msg], PARENT_REPO)
if rc != 0:
print(f" {C.RED}Commit failed: {err}{C.END}")
return False

print(f" {C.GREEN}Committed submodule update{C.END}")

# Push
rc, _, _ = run(["git", "push", "origin", "main"], PARENT_REPO, capture=False)
if rc != 0:
print(f" {C.RED}Push failed{C.END}")
return False

print(f" {C.GREEN}Pushed to origin/main{C.END}")
return True

def sync_protected(dry_run: bool) -> bool: """Sync protected installation via git pull.""" print(f"\n{C.BOLD}=== Syncing protected installation ==={C.END}")

if not PROTECTED_LOC.exists():
print(f" {C.YELLOW}Protected location not found: {PROTECTED_LOC}{C.END}")
print(f" Run CODITECT-CORE-INITIAL-SETUP.py to create it")
return True # Not a failure, just skip

if dry_run:
print(f" {C.YELLOW}[DRY RUN] Would pull latest to protected{C.END}")
return True

os.chdir(PROTECTED_LOC)

# Check if it's a git repo
if not (PROTECTED_LOC / ".git").exists():
print(f" {C.YELLOW}Protected location is not a git repo (read-only installation){C.END}")
print(f" Run: /framework-sync --update-only for full re-clone")
return True

# Pull latest
rc, out, err = run(["git", "pull", "--rebase"], PROTECTED_LOC)
if rc != 0:
# Try without rebase
rc, out, err = run(["git", "pull"], PROTECTED_LOC)
if rc != 0:
print(f" {C.YELLOW}Pull had conflicts, reset to origin{C.END}")
run(["git", "fetch", "origin"], PROTECTED_LOC)
run(["git", "reset", "--hard", "origin/main"], PROTECTED_LOC)

# Get current commit
_, stdout, _ = run(["git", "rev-parse", "--short", "HEAD"], PROTECTED_LOC)
print(f" {C.GREEN}Protected now at: {stdout.strip()}{C.END}")

return True

def sync_app_bundle(dry_run: bool) -> bool: """Re-copy and re-sign codi-watcher into .app bundle (macOS only, ADR-190).

After syncing the protected installation, the standalone binary at
PROTECTED/bin/codi-watcher may have been updated. The LaunchAgent
references the binary inside CoDiWatcher.app, so we must keep them
in sync and re-sign to preserve the Full Disk Access grant.
"""
if sys.platform != "darwin":
return True

app_dir = PROTECTED_LOC / "bin" / "CoDiWatcher.app"
binary_src = PROTECTED_LOC / "bin" / "codi-watcher"
binary_dest = app_dir / "Contents" / "MacOS" / "codi-watcher"

if not app_dir.exists():
# No .app bundle yet — skip silently
return True

if not binary_src.exists():
# No standalone binary — nothing to sync
return True

print(f"\n{C.BOLD}=== Syncing CoDiWatcher.app bundle (ADR-190) ==={C.END}")

# Check if source binary is newer than .app binary
src_mtime = binary_src.stat().st_mtime
if binary_dest.exists():
dest_mtime = binary_dest.stat().st_mtime
if src_mtime <= dest_mtime:
print(f" {C.GREEN}.app bundle binary is up to date{C.END}")
return True

if dry_run:
print(f" {C.YELLOW}[DRY RUN] Would re-copy and re-sign .app bundle{C.END}")
return True

# Copy binary into .app
import shutil
shutil.copy2(str(binary_src), str(binary_dest))
print(f" Copied binary into .app bundle")

# Find signing identity
rc, stdout, _ = run(["security", "find-identity", "-v", "-p", "codesigning"])
if "CODITECT" in stdout:
# Extract identity name
identity = "CODITECT Local Signing"
for line in stdout.split('\n'):
if "CODITECT" in line:
# Parse: 1) HASH "Name"
match = re.search(r'"([^"]*CODITECT[^"]*)"', line)
if match:
identity = match.group(1)
break

rc, _, err = run([
"codesign", "-fs", identity,
"--identifier", "ai.coditect.codi-watcher",
str(app_dir)
])
if rc == 0:
print(f" {C.GREEN}Re-signed .app with \"{identity}\"{C.END}")
else:
print(f" {C.YELLOW}Re-sign failed: {err} (FDA may still work){C.END}")
else:
# No signing identity — ad-hoc sign
rc, _, err = run([
"codesign", "-fs", "-",
"--identifier", "ai.coditect.codi-watcher",
str(app_dir)
])
if rc == 0:
print(f" {C.YELLOW}Ad-hoc signed .app (no CODITECT cert found){C.END}")
else:
print(f" {C.YELLOW}Ad-hoc sign failed: {err}{C.END}")

return True

def main(): parser = argparse.ArgumentParser( description="Fully automated coditect-core sync", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python3 core-sync.py # Auto sync everything python3 core-sync.py --dry-run # Preview changes python3 core-sync.py -m "fix: bug fix" # Custom message """ ) parser.add_argument("--dry-run", "-n", action="store_true", help="Preview what would happen") parser.add_argument("--message", "-m", type=str, help="Custom commit message (auto-generated if not provided)") parser.add_argument("--skip-parent", action="store_true", help="Skip updating coditect-rollout-master") parser.add_argument("--skip-protected", action="store_true", help="Skip syncing protected installation")

args = parser.parse_args()

print(f"\n{C.BOLD}{C.CYAN}CODITECT Core Sync{C.END}")
print("=" * 40)

if args.dry_run:
print(f"{C.YELLOW}[DRY RUN MODE]{C.END}\n")

# Remove any stale lock files
remove_lock_files()

# Step 1: Sync coditect-core
if not sync_core(args.message, args.dry_run):
print(f"\n{C.RED}Core sync failed{C.END}")
sys.exit(1)

# Step 2: Sync parent repo
if not args.skip_parent:
if not sync_parent(args.dry_run):
print(f"\n{C.YELLOW}Parent sync failed (core sync succeeded){C.END}")

# Step 3: Sync protected installation
if not args.skip_protected:
if not sync_protected(args.dry_run):
print(f"\n{C.YELLOW}Protected sync failed{C.END}")

# Step 4: Sync .app bundle if binary changed (macOS, ADR-190)
if not args.skip_protected:
sync_app_bundle(args.dry_run)

print(f"\n{C.GREEN}{C.BOLD}Sync complete!{C.END}")

if name == "main": main()