#!/usr/bin/env python3
"""Backup Status Script

Provides detailed backup status information:

  - Lists recent backups with sizes and dates
  - Shows storage usage statistics
  - Displays backup history from local records
  - Checks backup health and alerts

Usage: python3 scripts/backup-status.py [options]

Options:
  --list      List recent backups (default)
  --history   Show local backup history
  --usage     Show storage usage statistics
  --health    Check backup health
  --json      Output in JSON format
  --limit N   Limit results to N entries (default: 10)
"""

import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path

# Configuration
CODITECT_DIR = Path.home() / ".coditect"
SCRIPT_DIR = Path(__file__).parent  # fixed: was Path(file), a NameError

# ADR-114 & ADR-118: Use centralized path discovery
sys.path.insert(0, str(SCRIPT_DIR / "core"))
try:
    from paths import get_context_storage_dir, CONTEXT_STORAGE
    BACKUP_HISTORY_FILE = CONTEXT_STORAGE / "backup-history.json"
except ImportError:
    # Fallback for backward compatibility with the pre-ADR-114 layout
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        BACKUP_HISTORY_FILE = _user_data / "backup-history.json"
    else:
        BACKUP_HISTORY_FILE = CODITECT_DIR / "context-storage" / "backup-history.json"

def get_gcp_project() -> "str | None":
    """Return the active gcloud project ID, or None if it cannot be read.

    Runs ``gcloud config get-value project`` and returns its stripped
    stdout (possibly an empty string if no project is configured). Any
    failure — missing gcloud binary, timeout, etc. — yields None rather
    than raising, so callers can treat "no project" uniformly.
    """
    try:
        result = subprocess.run(
            ["gcloud", "config", "get-value", "project"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        return result.stdout.strip()
    except Exception:
        # gcloud unavailable or unresponsive — report "no project"
        return None

def get_bucket_name() -> "str | None":
    """Return the GCS backup bucket URI, or None if no project is set.

    The bucket follows the ``gs://<project>-context-backups`` convention.
    """
    project = get_gcp_project()
    if project:
        return f"gs://{project}-context-backups"
    return None

def list_gcs_backups(limit: int = 10) -> list:
    """List recent backups stored in GCS, newest first.

    Walks ``gs://<bucket>/coditect-core/<YYYY-MM-DD>/<HH-MM-SS>/``
    directories via ``gsutil ls``. Returns up to *limit* dicts with keys
    ``path``, ``date``, ``time`` and ``full_timestamp``; returns an empty
    list on any failure (no bucket, gsutil error, timeout).
    """
    bucket = get_bucket_name()
    if not bucket:
        return []

    try:
        # First, list date directories
        result = subprocess.run(
            ["gsutil", "ls", f"{bucket}/coditect-core/"],
            capture_output=True,
            text=True,
            timeout=30,
        )

        if result.returncode != 0:
            return []

        backups = []
        date_dirs = []

        # Collect date directories
        for line in result.stdout.strip().split("\n"):
            line = line.strip()
            if line and line.endswith("/"):
                # Extract date from path like gs://bucket/coditect-core/2026-01-18/
                date_part = line.rstrip("/").split("/")[-1]
                # Validate it looks like a date (YYYY-MM-DD)
                if len(date_part) == 10 and date_part[4] == "-" and date_part[7] == "-":
                    date_dirs.append((date_part, line))

        # Sort dates descending and limit to recent ones
        date_dirs.sort(key=lambda x: x[0], reverse=True)
        date_dirs = date_dirs[:20]  # Check last 20 dates max

        # For each date, list time directories
        for date, date_path in date_dirs:
            try:
                time_result = subprocess.run(
                    ["gsutil", "ls", date_path],
                    capture_output=True,
                    text=True,
                    timeout=15,
                )
                if time_result.returncode == 0:
                    for time_line in time_result.stdout.strip().split("\n"):
                        time_line = time_line.strip()
                        if time_line and time_line.endswith("/"):
                            # Extract time from path
                            time_part = time_line.rstrip("/").split("/")[-1]
                            # Validate it looks like a time (HH-MM-SS)
                            if len(time_part) == 8 and time_part[2] == "-" and time_part[5] == "-":
                                backups.append({
                                    "path": time_line,
                                    "date": date,
                                    "time": time_part,
                                    "full_timestamp": f"{date}/{time_part}",
                                })
            except subprocess.TimeoutExpired:
                # Skip this date directory; also skips the early-exit check below
                continue

            # Early exit if we have enough backups
            if len(backups) >= limit:
                break

        # Sort by date/time descending
        backups.sort(key=lambda x: x["full_timestamp"], reverse=True)
        return backups[:limit]

    except Exception:
        # Best-effort listing: any unexpected failure means "no backups visible"
        return []

def get_backup_details(backup_path: str) -> dict:
    """Get size details for a specific backup via ``gsutil du -s``.

    Returns a dict with ``path``, ``size_bytes``, ``size_mb`` and
    ``size_gb`` on success; on any failure falls back to
    ``{"path": backup_path, "size_bytes": 0}``.
    """
    try:
        result = subprocess.run(
            ["gsutil", "du", "-s", backup_path],
            capture_output=True,
            text=True,
            timeout=30,
        )

        if result.returncode == 0:
            # `gsutil du -s` output starts with the byte count
            parts = result.stdout.strip().split()
            if parts:
                size_bytes = int(parts[0])
                return {
                    "path": backup_path,
                    "size_bytes": size_bytes,
                    "size_mb": round(size_bytes / (1024 ** 2), 1),
                    "size_gb": round(size_bytes / (1024 ** 3), 2),
                }
    except Exception:
        # Best-effort: fall through to the zero-size default
        pass

    return {"path": backup_path, "size_bytes": 0}

def get_storage_usage() -> dict:
    """Get total storage usage statistics for the backup bucket.

    Returns a dict with bucket name, total size (bytes/MB/GB), backup
    count, and oldest/newest backup timestamps. On failure returns a
    dict with a single ``error`` key.
    """
    bucket = get_bucket_name()
    if not bucket:
        return {"error": "Could not determine bucket name"}

    try:
        # Total bucket size
        result = subprocess.run(
            ["gsutil", "du", "-s", f"{bucket}/"],
            capture_output=True,
            text=True,
            timeout=60,
        )

        total_bytes = 0
        if result.returncode == 0:
            parts = result.stdout.strip().split()
            if parts:
                total_bytes = int(parts[0])

        # Count backups (large limit so the count is effectively complete)
        backups = list_gcs_backups(limit=1000)

        return {
            "bucket": bucket,
            "total_bytes": total_bytes,
            "total_mb": round(total_bytes / (1024 ** 2), 1),
            "total_gb": round(total_bytes / (1024 ** 3), 2),
            "backup_count": len(backups),
            # Backups are sorted newest-first, so first = newest, last = oldest
            "oldest_backup": backups[-1]["full_timestamp"] if backups else None,
            "newest_backup": backups[0]["full_timestamp"] if backups else None,
        }

    except Exception as e:
        return {"error": str(e)}

def load_local_history() -> list:
    """Load the local backup history JSON file.

    Returns the parsed list of history entries, or an empty list if the
    file is missing, unreadable, or not valid JSON.
    """
    if BACKUP_HISTORY_FILE.exists():
        try:
            with open(BACKUP_HISTORY_FILE) as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            # Corrupt or unreadable history is treated as "no history"
            return []
    return []

def check_backup_health() -> dict:
    """Check backup health and generate alerts.

    Runs three checks — recent backup exists, GCS auth works, storage
    usage is readable — and returns a dict with overall ``status``
    (healthy/warning/critical), a list of ``alerts``, and per-check
    ``checks`` records.
    """
    health = {
        "status": "healthy",
        "alerts": [],
        "checks": [],
    }

    # Check 1: Recent backup exists
    backups = list_gcs_backups(limit=1)
    if backups:
        latest = backups[0]
        try:
            latest_date = datetime.strptime(latest["date"], "%Y-%m-%d")
            days_old = (datetime.now() - latest_date).days

            # BUGFIX: check the stricter threshold first — the original
            # tested `> 7` before `elif > 30`, so the critical branch
            # was unreachable.
            if days_old > 30:
                health["alerts"].append(f"Backup is {days_old} days old - critical")
                health["status"] = "critical"
            elif days_old > 7:
                health["alerts"].append(f"No backup in {days_old} days")
                health["status"] = "warning"

            health["checks"].append({
                "check": "recent_backup",
                "passed": days_old <= 7,
                "days_old": days_old,
                "latest": latest["full_timestamp"],
            })
        except ValueError:
            health["checks"].append({
                "check": "recent_backup",
                "passed": False,
                "error": "Could not parse backup date",
            })
    else:
        health["alerts"].append("No backups found")
        health["status"] = "critical"
        health["checks"].append({
            "check": "recent_backup",
            "passed": False,
            "error": "No backups found",
        })

    # Check 2: GCS authentication
    project = get_gcp_project()
    health["checks"].append({
        "check": "gcs_auth",
        "passed": project is not None,
        "project": project,
    })
    if not project:
        health["alerts"].append("GCS authentication issue")
        # Do not downgrade an existing critical status to warning
        if health["status"] == "healthy":
            health["status"] = "warning"

    # Check 3: Storage usage
    usage = get_storage_usage()
    if "error" not in usage:
        health["checks"].append({
            "check": "storage_usage",
            "passed": True,
            "total_gb": usage.get("total_gb", 0),
        })
    else:
        health["checks"].append({
            "check": "storage_usage",
            "passed": False,
            "error": usage.get("error"),
        })

    return health

def format_size(size_bytes: int) -> str:
    """Format a byte count in human-readable form (KB/MB/GB).

    Sizes below 1 MB are shown in KB, below 1 GB in MB, otherwise in GB.
    """
    if size_bytes >= 1024 ** 3:
        return f"{size_bytes / (1024 ** 3):.2f}GB"
    elif size_bytes >= 1024 ** 2:
        return f"{size_bytes / (1024 ** 2):.1f}MB"
    else:
        return f"{size_bytes / 1024:.1f}KB"

def print_backup_list(backups: list, json_output: bool = False):
    """Print the backup list, as JSON or a formatted table.

    In JSON mode, size details are fetched for only the first 5 entries
    to avoid long gsutil round-trips.
    """
    if json_output:
        # Get sizes for each backup
        detailed = []
        for b in backups[:5]:  # Only get details for first 5 to avoid timeout
            details = get_backup_details(b["path"])
            detailed.append({**b, **details})
        print(json.dumps(detailed, indent=2))
    else:
        print("=" * 70)
        print("Recent Backups")
        print("=" * 70)
        print(f"{'Date':<12} {'Time':<10} {'Path'}")
        print("-" * 70)
        for b in backups:
            print(f"{b['date']:<12} {b['time']:<10} {b['path']}")
        print("=" * 70)
        print(f"Total: {len(backups)} backups shown")

def print_storage_usage(usage: dict, json_output: bool = False):
    """Print storage usage statistics, as JSON or formatted text."""
    if json_output:
        print(json.dumps(usage, indent=2))
    else:
        print("=" * 50)
        print("Storage Usage")
        print("=" * 50)
        if "error" in usage:
            print(f"Error: {usage['error']}")
        else:
            print(f"Bucket: {usage['bucket']}")
            print(f"Total Size: {format_size(usage['total_bytes'])}")
            print(f"Backup Count: {usage['backup_count']}")
            print(f"Oldest: {usage['oldest_backup']}")
            print(f"Newest: {usage['newest_backup']}")
        print("=" * 50)

def print_health_check(health: dict, json_output: bool = False):
    """Print health check results, as JSON or a formatted report."""
    if json_output:
        print(json.dumps(health, indent=2))
    else:
        status_icons = {"healthy": "[OK]", "warning": "[WARN]", "critical": "[CRIT]"}
        print("=" * 50)
        print(f"Backup Health: {status_icons.get(health['status'], '?')} {health['status'].upper()}")
        print("=" * 50)

        for check in health["checks"]:
            status = "[PASS]" if check["passed"] else "[FAIL]"
            # Everything except the check name and pass flag is detail
            details = ", ".join(f"{k}={v}" for k, v in check.items() if k not in ["check", "passed"])
            print(f"{status} {check['check']}: {details}")

        if health["alerts"]:
            print("-" * 50)
            print("Alerts:")
            for alert in health["alerts"]:
                print(f" - {alert}")

        print("=" * 50)

def print_local_history(history: list, limit: int, json_output: bool = False):
    """Print local backup history, most recent first, capped at *limit*."""
    history = history[-limit:][::-1]  # Most recent first

    if json_output:
        print(json.dumps(history, indent=2))
    else:
        print("=" * 70)
        print("Local Backup History")
        print("=" * 70)
        print(f"{'Timestamp':<22} {'Size':<10} {'Status'}")
        print("-" * 70)
        for entry in history:
            # Truncate ISO timestamps to seconds precision for display
            ts = entry.get("timestamp", "")[:19]
            size = format_size(entry.get("size_bytes", 0))
            status = entry.get("status", "unknown")
            print(f"{ts:<22} {size:<10} {status}")
        print("=" * 70)
        print(f"Total: {len(history)} entries")

def main():
    """Parse CLI arguments and dispatch to the requested report."""
    parser = argparse.ArgumentParser(description="Backup status tool")
    parser.add_argument("--list", action="store_true", help="List recent backups")
    parser.add_argument("--history", action="store_true", help="Show local history")
    parser.add_argument("--usage", action="store_true", help="Show storage usage")
    parser.add_argument("--health", action="store_true", help="Check backup health")
    parser.add_argument("--json", action="store_true", help="Output JSON")
    parser.add_argument("--limit", type=int, default=10, help="Limit results")

    args = parser.parse_args()

    # Default to --list if no action specified
    if not any([args.list, args.history, args.usage, args.health]):
        args.list = True

    if args.health:
        health = check_backup_health()
        print_health_check(health, args.json)
    elif args.usage:
        usage = get_storage_usage()
        print_storage_usage(usage, args.json)
    elif args.history:
        history = load_local_history()
        print_local_history(history, args.limit, args.json)
    elif args.list:
        backups = list_gcs_backups(args.limit)
        print_backup_list(backups, args.json)


# BUGFIX: was `if name == "main":` — a NameError that never ran main()
if __name__ == "__main__":
    main()