#!/usr/bin/env python3 """ Install CODITECT Context Watcher as a system service.
Cross-platform: macOS (launchd) and Linux (systemd).
Usage: python3 install-context-watcher.py # Install and start python3 install-context-watcher.py --status # Check status python3 install-context-watcher.py --uninstall # Remove service python3 install-context-watcher.py --download # Force download from GitHub python3 install-context-watcher.py --api # Force use API (skip if fails) """
import argparse import hashlib import json import os import platform import shutil import subprocess import sys import urllib.request import urllib.error from pathlib import Path
API configuration (primary source)
API_BASE_URL = os.environ.get("CODITECT_API_URL", "https://api.coditect.ai") API_TIMEOUT = 10 # seconds
GitHub release configuration (fallback)
GITHUB_REPO = "coditect-ai/coditect-core" RELEASE_VERSION = "2.0.0" RELEASE_TAG = f"codi-watcher-v{RELEASE_VERSION}"
License acknowledgment tracking
LICENSE_ACCEPTED_FILE = ".coditect-license-accepted"
LICENSE_NOTICE = """
CODITECT SOFTWARE LICENSE AGREEMENT
================================================================================
CODITECT Framework - Copyright (c) 2025 AZ1.AI Inc. All Rights Reserved.
By proceeding with this installation, you acknowledge and agree that:
-
PROPRIETARY SOFTWARE: CODITECT is proprietary software owned by AZ1.AI Inc. All intellectual property rights are reserved.
-
NO WARRANTIES: The software is provided "AS IS" without any warranties. AZ1.AI Inc. disclaims all warranties including merchantability, fitness for a particular purpose, and non-infringement.
-
LIMITATION OF LIABILITY: AZ1.AI Inc. shall not be liable for any damages arising from the use or inability to use the software, including but not limited to direct, indirect, incidental, or consequential damages.
-
ASSUMPTION OF RISK: You assume full responsibility for the selection, installation, and use of the software. This includes all AI-generated content, security implications, and compliance requirements.
-
INDEMNIFICATION: You agree to indemnify and hold harmless AZ1.AI Inc. from any claims arising from your use of the software.
Full license terms: https://coditect.ai/license
"""
def get_home() -> Path: """Get home directory.""" return Path.home()
def check_license_accepted() -> bool: """Check if user has previously accepted the license.""" coditect = get_coditect_dir() license_file = coditect / LICENSE_ACCEPTED_FILE return license_file.exists()
def record_license_acceptance(): """Record that user has accepted the license.""" from datetime import datetime coditect = get_coditect_dir() license_file = coditect / LICENSE_ACCEPTED_FILE
# Create directory if needed
coditect.mkdir(parents=True, exist_ok=True)
# Record acceptance with timestamp and machine info
acceptance_data = {
"accepted_at": datetime.now().isoformat(),
"version": RELEASE_VERSION,
"hostname": platform.node(),
"platform": f"{platform.system()}-{platform.machine()}"
}
license_file.write_text(json.dumps(acceptance_data, indent=2))
def prompt_license_acceptance(non_interactive: bool = False) -> bool: """ Display license notice and prompt for acceptance.
Args:
non_interactive: If True, assume acceptance (for CI/automation)
Returns:
True if license accepted, False otherwise
"""
# Check if already accepted
if check_license_accepted():
return True
# Display license notice
print(LICENSE_NOTICE)
# In non-interactive mode, require explicit environment variable
if non_interactive:
if os.environ.get("CODITECT_ACCEPT_LICENSE", "").lower() == "yes":
print("License accepted via CODITECT_ACCEPT_LICENSE=yes")
record_license_acceptance()
return True
else:
print("Error: Non-interactive mode requires CODITECT_ACCEPT_LICENSE=yes")
print("Set environment variable to accept: export CODITECT_ACCEPT_LICENSE=yes")
return False
# Interactive prompt
print("Do you accept these terms? (yes/no)")
try:
response = input("> ").strip().lower()
except (EOFError, KeyboardInterrupt):
print("\nInstallation cancelled.")
return False
if response in ("yes", "y"):
record_license_acceptance()
print("License accepted. Proceeding with installation...\n")
return True
else:
print("\nLicense not accepted. Installation cancelled.")
print("You must accept the license terms to use CODITECT.")
return False
def get_coditect_dir() -> Path: """Get CODITECT installation directory.""" home = get_home() coditect = home / ".coditect" if coditect.exists(): return coditect.resolve()
# Check for symlinked .coditect
projects_coditect = home / "PROJECTS" / ".coditect"
if projects_coditect.exists():
return projects_coditect.resolve()
return coditect
def ensure_directories(): """Create required directories.""" coditect = get_coditect_dir() (coditect / "logs").mkdir(parents=True, exist_ok=True) (coditect / "bin").mkdir(parents=True, exist_ok=True)
# ADR-114 & ADR-118: User data goes to .coditect-data
user_data = get_home() / "PROJECTS" / ".coditect-data" / "context-storage"
user_data.mkdir(parents=True, exist_ok=True)
# Legacy fallback (for backward compatibility)
legacy_storage = coditect / "context-storage"
if not legacy_storage.exists() and not legacy_storage.is_symlink():
legacy_storage.mkdir(parents=True, exist_ok=True)
def get_platform_identifier() -> str: """Get the platform identifier for API requests.""" system = platform.system().lower() machine = platform.machine().lower()
if system == "darwin":
if machine in ("arm64", "aarch64"):
return "darwin-arm64"
else:
return "darwin-x86_64"
elif system == "linux":
if machine in ("arm64", "aarch64"):
return "linux-arm64"
else:
return "linux-x86_64"
else:
raise ValueError(f"Unsupported platform: {system}-{machine}")
def get_platform_artifact() -> str: """Get the artifact name for the current platform.""" platform_id = get_platform_identifier() return f"codi-watcher-{platform_id}"
def get_machine_id() -> str: """Get CODITECT machine ID if available.""" coditect = get_coditect_dir() machine_id_file = coditect / "machine-id.json" if machine_id_file.exists(): try: data = json.loads(machine_id_file.read_text()) return data.get("machine_uuid", "") except Exception: pass return ""
def download_from_api(dest_path: Path, version: str = None) -> dict: """Download binary using the CODITECT API.
The API provides:
- Signed GCS URLs (5-minute TTL) for secure downloads
- Checksum verification
- Download tracking for analytics
Args:
dest_path: Where to save the binary
version: Version to download (default: latest)
Returns:
dict with keys: success, source, sha256, file_size, version
On failure, success=False with error message
"""
try:
platform_id = get_platform_identifier()
machine_id = get_machine_id()
# Build request payload
payload = {
"platform": platform_id,
"binary_name": "codi-watcher",
"machine_id": machine_id
}
if version:
payload["version"] = version
# Make API request
api_url = f"{API_BASE_URL}/api/v1/releases/download"
req_data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(
api_url,
data=req_data,
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
'User-Agent': f'coditect-install-script/{RELEASE_VERSION}'
},
method='POST'
)
print(f"Requesting download URL from API...")
with urllib.request.urlopen(req, timeout=API_TIMEOUT) as response:
result = json.loads(response.read().decode('utf-8'))
download_url = result.get('download_url')
sha256 = result.get('sha256', '')
file_size = result.get('file_size', 0)
source = result.get('source', 'unknown')
api_version = result.get('version', version or RELEASE_VERSION)
if not download_url:
return {"success": False, "error": "API returned no download URL"}
print(f" ✓ Got download URL (source: {source})")
print(f" Downloading {get_platform_artifact()}...")
# Download the binary
urllib.request.urlretrieve(download_url, dest_path)
dest_path.chmod(0o755)
# Verify checksum if provided
if sha256:
sha256_calc = hashlib.sha256()
with open(dest_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256_calc.update(chunk)
actual_checksum = sha256_calc.hexdigest()
if actual_checksum == sha256:
print(f" ✓ Checksum verified")
else:
print(f" ⚠ Checksum mismatch!")
print(f" Expected: {sha256[:16]}...")
print(f" Got: {actual_checksum[:16]}...")
dest_path.unlink()
return {"success": False, "error": "Checksum verification failed"}
else:
print(f" ℹ No checksum provided (skipping verification)")
return {
"success": True,
"source": source,
"sha256": sha256,
"file_size": file_size,
"version": api_version
}
except urllib.error.HTTPError as e:
error_body = ""
try:
error_body = e.read().decode('utf-8')
except Exception:
pass
return {"success": False, "error": f"API error {e.code}: {error_body}"}
except urllib.error.URLError as e:
return {"success": False, "error": f"Network error: {e.reason}"}
except json.JSONDecodeError as e:
return {"success": False, "error": f"Invalid API response: {e}"}
except Exception as e:
return {"success": False, "error": str(e)}
def download_from_github(dest_path: Path, version: str = RELEASE_VERSION) -> bool: """Download pre-built binary from GitHub releases (fallback method).
Args:
dest_path: Where to save the binary
version: Version to download (default: RELEASE_VERSION)
Returns:
True if download successful, False otherwise
"""
try:
artifact = get_platform_artifact()
tag = f"codi-watcher-v{version}"
print(f"Downloading {artifact} v{version} from GitHub (fallback)...")
# Try using gh CLI first (handles GitHub auth and redirects properly)
gh_result = subprocess.run(
["gh", "release", "download", tag,
"--repo", GITHUB_REPO,
"--pattern", artifact,
"--dir", str(dest_path.parent),
"--clobber"],
capture_output=True,
text=True
)
if gh_result.returncode == 0:
# gh downloads to artifact name, rename to dest_path if different
downloaded = dest_path.parent / artifact
if downloaded != dest_path and downloaded.exists():
downloaded.rename(dest_path)
print(f" ✓ Downloaded via gh CLI")
else:
# Fall back to curl (handles redirects better than urllib)
print(f" gh CLI failed, trying curl...")
url = f"https://github.com/{GITHUB_REPO}/releases/download/{tag}/{artifact}"
curl_result = subprocess.run(
["curl", "-L", "-o", str(dest_path), url],
capture_output=True,
text=True
)
if curl_result.returncode != 0:
print(f" Release not found: {tag}")
print(f" Run GitHub Actions workflow to create release first")
return False
print(f" ✓ Downloaded via curl")
# Make executable
dest_path.chmod(0o755)
# Try to verify checksum (optional - don't fail if checksum missing)
checksum_path = dest_path.parent / f"{artifact}.sha256"
try:
# Download checksum file
gh_checksum = subprocess.run(
["gh", "release", "download", tag,
"--repo", GITHUB_REPO,
"--pattern", f"{artifact}.sha256",
"--dir", str(dest_path.parent),
"--clobber"],
capture_output=True,
text=True
)
if gh_checksum.returncode == 0 and checksum_path.exists():
expected_checksum = checksum_path.read_text().strip().split()[0]
# Calculate actual checksum
sha256 = hashlib.sha256()
with open(dest_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
actual_checksum = sha256.hexdigest()
if actual_checksum == expected_checksum:
print(f" ✓ Checksum verified")
else:
print(f" ⚠ Checksum mismatch (expected: {expected_checksum[:16]}..., got: {actual_checksum[:16]}...)")
dest_path.unlink()
checksum_path.unlink(missing_ok=True)
return False
# Clean up checksum file
checksum_path.unlink(missing_ok=True)
else:
print(f" ℹ Checksum file not available (skipping verification)")
except Exception as e:
print(f" ℹ Checksum verification skipped: {e}")
# Note: macOS re-signing is handled by ensure_binary() after all download methods
return True
except Exception as e:
print(f"Download failed: {e}")
return False
def ensure_binary(force_download: bool = False, use_api: bool = True) -> bool: """Ensure the codi-watcher binary exists and is properly named.
IMPORTANT: Binary must be named 'codi-watcher' (not 'coditect-context-watch')
because Claude Code shell integration backgrounds commands containing
'context-watch' in the name, causing the service to hang.
See ADR-066 for details.
Download priority:
1. CODITECT API (signed GCS URLs) - secure, tracked
2. GitHub releases (fallback) - public, untracked
3. Build from source (last resort) - requires cargo
Args:
force_download: If True, always download even if binary exists
use_api: If True, try API first before GitHub fallback
"""
coditect = get_coditect_dir()
bin_dir = coditect / "bin"
binary_name = "codi-watcher"
binary_path = bin_dir / binary_name
# Check if binary exists (unless force download requested)
if binary_path.exists() and not force_download:
print(f"✓ Binary exists: {binary_path}")
return True
downloaded = False
# Method 1: Try CODITECT API first (signed URLs, tracking)
if use_api:
print(f"Attempting to download codi-watcher v{RELEASE_VERSION} via API...")
api_result = download_from_api(binary_path, RELEASE_VERSION)
if api_result.get("success"):
downloaded = True
print(f"✓ Downloaded v{api_result.get('version')} from {api_result.get('source')}")
else:
print(f" API unavailable: {api_result.get('error', 'unknown error')}")
print(f" Falling back to GitHub releases...")
# Method 2: Try GitHub releases (fallback)
if not downloaded:
print(f"Attempting to download codi-watcher v{RELEASE_VERSION} from GitHub...")
if download_from_github(binary_path):
downloaded = True
# Apply macOS code signing if download succeeded
if downloaded and binary_path.exists():
if platform.system() == "Darwin":
sign_result = subprocess.run(
["codesign", "--force", "--sign", "-", str(binary_path)],
capture_output=True,
text=True
)
if sign_result.returncode == 0:
print(f"✓ Re-signed binary (macOS requirement)")
else:
print(f"⚠ Could not re-sign: {sign_result.stderr}")
print(f"✓ Downloaded to {binary_path}")
return True
# Method 3: Build from source (last resort)
print("Download failed, trying to build from source...")
# Fallback: Check for source and try to build
source_dir = coditect / "tools" / "context-watcher"
if not source_dir.exists():
# Try codanna (library contains watcher)
codanna_dir = coditect.parent / "codanna" if coditect.is_symlink() else None
if codanna_dir and (codanna_dir / "Cargo.toml").exists():
source_dir = codanna_dir
if source_dir and (source_dir / "Cargo.toml").exists():
print(f"Building binary from {source_dir}...")
result = subprocess.run(
["cargo", "build", "--release", "--bin", "codi-watcher"],
cwd=source_dir,
capture_output=True,
text=True
)
if result.returncode == 0:
# Copy with correct name
built_binary = source_dir / "target" / "release" / "codi-watcher"
if built_binary.exists():
shutil.copy(built_binary, binary_path)
binary_path.chmod(0o755)
# Re-sign binary on macOS (required after copy)
if platform.system() == "Darwin":
sign_result = subprocess.run(
["codesign", "--force", "--sign", "-", str(binary_path)],
capture_output=True,
text=True
)
if sign_result.returncode == 0:
print(f"✓ Re-signed binary (macOS requirement)")
else:
print(f"Warning: Could not re-sign binary: {sign_result.stderr}")
print(f"✓ Installed binary to {binary_path}")
return True
else:
print(f"Error: Built binary not found at {built_binary}")
else:
print(f"Error building binary: {result.stderr}")
print(f"Error: Binary not found at {binary_path}")
print("Options:")
print(f" 1. Wait for GitHub release codi-watcher-v{RELEASE_VERSION}")
print(f" 2. Build manually:")
print(f" cd ~/.coditect/tools/context-watcher && cargo build --release")
print(f" cp target/release/codi-watcher ~/.coditect/bin/codi-watcher")
print(f" codesign --force --sign - ~/.coditect/bin/codi-watcher # macOS only")
return False
def install_macos(force_download: bool = False, use_api: bool = True): """Install launchd service on macOS.""" home = get_home() coditect = get_coditect_dir()
# Ensure binary exists first
ensure_directories()
if not ensure_binary(force_download=force_download, use_api=use_api):
return False
# Source plist template
source_plist = coditect / "scripts" / "services" / "ai.coditect.context-watcher.plist"
if not source_plist.exists():
print(f"Error: Plist template not found at {source_plist}")
return False
# Destination
launch_agents = home / "Library" / "LaunchAgents"
launch_agents.mkdir(parents=True, exist_ok=True)
dest_plist = launch_agents / "ai.coditect.context-watcher.plist"
# Read and substitute $HOME
plist_content = source_plist.read_text()
plist_content = plist_content.replace("$HOME", str(home))
# Write plist
dest_plist.write_text(plist_content)
print(f"✓ Installed plist to {dest_plist}")
# Unload if already loaded
subprocess.run(
["launchctl", "unload", str(dest_plist)],
capture_output=True
)
# Load service
result = subprocess.run(
["launchctl", "load", str(dest_plist)],
capture_output=True,
text=True
)
if result.returncode == 0:
print("✓ Service loaded")
print()
print("Context watcher is now running!")
print("Check status: ~/.coditect/bin/codi-watcher --status")
print("View logs: tail -f ~/.coditect/logs/context-watcher.log")
return True
else:
print(f"Error loading service: {result.stderr}")
return False
def install_linux(force_download: bool = False, use_api: bool = True): """Install systemd user service on Linux.""" home = get_home() coditect = get_coditect_dir()
# Ensure binary exists first
ensure_directories()
if not ensure_binary(force_download=force_download, use_api=use_api):
return False
# Source service file
source_service = coditect / "scripts" / "services" / "coditect-context-watcher.service"
if not source_service.exists():
print(f"Error: Service template not found at {source_service}")
return False
# Destination
systemd_user = home / ".config" / "systemd" / "user"
systemd_user.mkdir(parents=True, exist_ok=True)
dest_service = systemd_user / "coditect-context-watcher.service"
# Copy service file
shutil.copy(source_service, dest_service)
print(f"✓ Installed service to {dest_service}")
# Reload systemd
subprocess.run(["systemctl", "--user", "daemon-reload"], capture_output=True)
# Enable service
result = subprocess.run(
["systemctl", "--user", "enable", "coditect-context-watcher"],
capture_output=True,
text=True
)
if result.returncode == 0:
print("✓ Service enabled")
else:
print(f"Warning: Could not enable service: {result.stderr}")
# Start service
result = subprocess.run(
["systemctl", "--user", "start", "coditect-context-watcher"],
capture_output=True,
text=True
)
if result.returncode == 0:
print("✓ Service started")
print()
print("Context watcher is now running!")
print("Check status: ~/.coditect/bin/codi-watcher --status")
print("Service status: systemctl --user status coditect-context-watcher")
return True
else:
print(f"Error starting service: {result.stderr}")
return False
def uninstall_macos(): """Uninstall launchd service on macOS.""" home = get_home() plist = home / "Library" / "LaunchAgents" / "ai.coditect.context-watcher.plist"
if plist.exists():
# Unload
subprocess.run(["launchctl", "unload", str(plist)], capture_output=True)
# Remove
plist.unlink()
print("✓ Service uninstalled")
else:
print("Service not installed")
def uninstall_linux(): """Uninstall systemd user service on Linux.""" home = get_home() service = home / ".config" / "systemd" / "user" / "coditect-context-watcher.service"
if service.exists():
# Stop
subprocess.run(
["systemctl", "--user", "stop", "coditect-context-watcher"],
capture_output=True
)
# Disable
subprocess.run(
["systemctl", "--user", "disable", "coditect-context-watcher"],
capture_output=True
)
# Remove
service.unlink()
# Reload
subprocess.run(["systemctl", "--user", "daemon-reload"], capture_output=True)
print("✓ Service uninstalled")
else:
print("Service not installed")
def status_macos(): """Check launchd service status on macOS.""" result = subprocess.run( ["launchctl", "list", "ai.coditect.context-watcher"], capture_output=True, text=True ) if result.returncode == 0: print("Service Status: RUNNING") print(result.stdout) else: print("Service Status: NOT RUNNING")
def status_linux(): """Check systemd service status on Linux.""" subprocess.run( ["systemctl", "--user", "status", "coditect-context-watcher"], capture_output=False )
def main(): parser = argparse.ArgumentParser( description="Install CODITECT Context Watcher as a system service", epilog=""" Examples: python3 install-context-watcher.py # Install (requires license acceptance) python3 install-context-watcher.py --status # Check service status python3 install-context-watcher.py --uninstall # Remove service python3 install-context-watcher.py --download # Force re-download python3 install-context-watcher.py --no-api # Skip API, use GitHub directly python3 install-context-watcher.py --accept-license # Non-interactive acceptance
Download Priority (default):
- CODITECT API (signed GCS URLs, tracked)
- GitHub releases (fallback)
- Build from source (if cargo available)
License: CODITECT is proprietary software. Copyright (c) 2025 AZ1.AI Inc. All Rights Reserved. See LICENSE file for full terms. """ ) parser.add_argument( "--status", action="store_true", help="Check service status" ) parser.add_argument( "--uninstall", action="store_true", help="Uninstall service" ) parser.add_argument( "--download", action="store_true", help="Force re-download binary (even if exists)" ) parser.add_argument( "--no-api", action="store_true", help="Skip API, download directly from GitHub releases" ) parser.add_argument( "--accept-license", action="store_true", help="Accept license non-interactively (or set CODITECT_ACCEPT_LICENSE=yes)" )
args = parser.parse_args()
system = platform.system()
use_api = not args.no_api
print(f"Platform: {system}")
if not use_api:
print(f"API: disabled (--no-api)")
print()
# Status and uninstall don't require license acceptance
if args.status:
if system == "Darwin":
status_macos()
elif system == "Linux":
status_linux()
return
if args.uninstall:
if system == "Darwin":
uninstall_macos()
elif system == "Linux":
uninstall_linux()
return
# Installation requires license acceptance
non_interactive = args.accept_license or os.environ.get("CODITECT_ACCEPT_LICENSE", "").lower() == "yes"
if not prompt_license_acceptance(non_interactive=non_interactive):
sys.exit(1)
# Proceed with installation
if system == "Darwin":
install_macos(force_download=args.download, use_api=use_api)
elif system == "Linux":
install_linux(force_download=args.download, use_api=use_api)
else:
print(f"Platform {system} not supported yet")
print("Please run the daemon manually:")
print(" python3 ~/.coditect/scripts/context-watcher-daemon.py")
if name == "main": main()