#!/usr/bin/env python3
"""
CODITECT MEMORY-CONTEXT Database Backup & Restore

Backs up SQLite database and ChromaDB vector storage. Supports automated
daily backups and restore functionality.

Usage:
    python3 scripts/core/db_backup.py backup              # Create backup
    python3 scripts/core/db_backup.py restore BACKUP      # Restore from backup
    python3 scripts/core/db_backup.py list                # List backups
    python3 scripts/core/db_backup.py cleanup --days 30   # Delete old backups

Author: AZ1.AI CODITECT Team
Sprint: Sprint +1 - MEMORY-CONTEXT Implementation Day 3
Date: 2025-11-16
"""
import argparse
import json
import logging
import os
import shutil
import sqlite3
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List, Optional
# Add project root to sys.path so project-local modules (paths, backup_*) resolve
# whether this file is run as a script or imported as a package module.
PROJECT_ROOT = Path(__file__).parent.parent.parent  # was Path(file): NameError
sys.path.insert(0, str(PROJECT_ROOT))
# Setup logging: mirror messages to stdout and to a local log file so CLI runs
# leave an audit trail (db_backup.log is referenced in the error output of main()).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('db_backup.log')
    ]
)
logger = logging.getLogger(__name__)  # was getLogger(name): NameError
# Custom exceptions
class DatabaseBackupError(Exception):
    """Base class for every backup/restore failure raised by this module."""
class BackupNotFoundError(DatabaseBackupError):
    """Raised when the requested backup file or directory cannot be located."""
class RestoreError(DatabaseBackupError):
    """Raised when a restore operation fails partway through."""
class DatabaseBackup:
    """Backup and restore CODITECT MEMORY-CONTEXT database.

    Covers the SQLite database (via sqlite3's online backup API) and the
    ChromaDB directory (via directory copy), with optional compression,
    AES-256-GCM encryption at rest, and upload to Google Cloud Storage.
    """
def __init__(
self,
db_path: Path,
chroma_dir: Path,
backup_dir: Path,
encrypt: bool = False,
key_file: Optional[Path] = None,
passphrase: Optional[str] = None,
compress: bool = False,
compress_algo: str = "gzip",
compress_level: Optional[int] = None,
gcs_upload: bool = False,
gcs_bucket: Optional[str] = None,
gcs_project_scope: Optional[str] = None,
gcs_tenant_scope: Optional[str] = None,
):
"""
Initialize database backup.
Args:
db_path: Path to SQLite database
chroma_dir: Path to ChromaDB directory
backup_dir: Path to backup storage directory
encrypt: Enable AES-256-GCM encryption at rest
key_file: Path to hex-encoded key file
passphrase: Passphrase for key derivation
compress: Enable compression before upload
compress_algo: Compression algorithm ("gzip" or "zstd")
compress_level: Compression level (algorithm-specific)
gcs_upload: Upload backup to Google Cloud Storage after creation
gcs_bucket: Override GCS bucket name
gcs_project_scope: CODITECT project scope for GCS paths (ADR-159)
gcs_tenant_scope: CODITECT tenant scope for GCS paths (ADR-159)
"""
self.db_path = Path(db_path)
self.chroma_dir = Path(chroma_dir)
self.backup_dir = Path(backup_dir)
self.encrypt = encrypt
self.key_file = Path(key_file) if key_file else None
self.passphrase = passphrase
self.compress = compress
self.compress_algo = compress_algo
self.compress_level = compress_level
self.gcs_upload = gcs_upload
self.gcs_bucket = gcs_bucket
self.gcs_project_scope = gcs_project_scope
self.gcs_tenant_scope = gcs_tenant_scope
# Ensure backup directory exists
self.backup_dir.mkdir(parents=True, exist_ok=True)
def get_backup_name(self, timestamp: Optional[datetime] = None) -> str:
"""
Generate backup name with timestamp.
Args:
timestamp: Optional timestamp (default: now)
Returns:
Backup name (e.g., "backup_2025-11-16T12-30-45Z")
"""
if timestamp is None:
timestamp = datetime.now(timezone.utc)
return f"backup_{timestamp.strftime('%Y-%m-%dT%H-%M-%SZ')}"
def backup_sqlite(self, backup_path: Path) -> None:
"""
Backup SQLite database using online backup API.
Args:
backup_path: Path to backup database file
"""
if not self.db_path.exists():
raise FileNotFoundError(f"Database not found: {self.db_path}")
# Use SQLite online backup API for consistent backup
source_conn = sqlite3.connect(str(self.db_path))
backup_conn = sqlite3.connect(str(backup_path))
with backup_conn:
source_conn.backup(backup_conn)
source_conn.close()
backup_conn.close()
logger.info(f"SQLite backup complete: {backup_path.name}")
# Get backup size
backup_size_kb = backup_path.stat().st_size / 1024
logger.info(f" Backup size: {backup_size_kb:.2f} KB")
def backup_chromadb(self, backup_path: Path) -> None:
"""
Backup ChromaDB directory.
Args:
backup_path: Path to backup directory
"""
if not self.chroma_dir.exists():
logger.warning(f"ChromaDB directory not found: {self.chroma_dir}")
logger.warning("Skipping ChromaDB backup")
return
# Copy entire ChromaDB directory
shutil.copytree(self.chroma_dir, backup_path, dirs_exist_ok=True)
logger.info(f"ChromaDB backup complete: {backup_path.name}")
# Get backup size
total_size = sum(
f.stat().st_size
for f in backup_path.rglob('*')
if f.is_file()
)
backup_size_mb = total_size / (1024 * 1024)
logger.info(f" Backup size: {backup_size_mb:.2f} MB")
    def create_backup(self) -> Path:
        """
        Create full backup (SQLite + ChromaDB).

        Pipeline order matters: copy SQLite and ChromaDB, then optionally
        compress, then optionally encrypt (compression must precede
        encryption — encrypted bytes do not compress), write metadata,
        and finally (optionally) upload to GCS.

        Returns:
            Path to backup directory

        Raises:
            DatabaseBackupError: If backup creation fails
        """
        backup_path = None
        try:
            logger.info("Starting backup creation...")
            # Generate backup name
            backup_name = self.get_backup_name()
            backup_path = self.backup_dir / backup_name
            # Create backup directory
            logger.debug(f"Creating backup directory: {backup_path}")
            backup_path.mkdir(parents=True, exist_ok=True)
            logger.info(f"Creating backup: {backup_name}")
            # Backup SQLite
            sqlite_backup = backup_path / "memory-context.db"
            self.backup_sqlite(sqlite_backup)
            # Backup ChromaDB
            chromadb_backup = backup_path / "chromadb"
            self.backup_chromadb(chromadb_backup)
            # Compress if requested (J.20.1.7) — before encryption
            compression_meta = {}
            if self.compress:
                # Dual import works both as a flat script and as a package module.
                try:
                    from backup_compress import compress_file, compress_directory
                except ImportError:
                    from scripts.core.backup_compress import compress_file, compress_directory
                logger.info(f"Compressing backup files ({self.compress_algo})...")
                # Compress SQLite backup
                if sqlite_backup.exists():
                    comp_meta = compress_file(
                        sqlite_backup,
                        algorithm=self.compress_algo,
                        level=self.compress_level,
                    )
                    sqlite_backup.unlink()  # Remove uncompressed
                    suffix = ".gz" if self.compress_algo == "gzip" else ".zst"
                    # Re-point at the compressed artifact for the encryption phase.
                    sqlite_backup = backup_path / f"memory-context.db{suffix}"
                    compression_meta["sqlite"] = comp_meta
                # Compress ChromaDB directory
                if chromadb_backup.exists():
                    dir_meta = compress_directory(
                        chromadb_backup,
                        algorithm=self.compress_algo,
                        level=self.compress_level,
                    )
                    shutil.rmtree(chromadb_backup)  # Remove uncompressed
                    ext = ".tar.gz" if self.compress_algo == "gzip" else ".tar.zst"
                    chromadb_backup = backup_path / f"chromadb{ext}"
                    compression_meta["chromadb"] = dir_meta
                compression_meta["algorithm"] = self.compress_algo
                logger.info("Compression complete.")
            # Encrypt if requested (J.20.1.6) — after compression
            encryption_meta = {}
            if self.encrypt:
                try:
                    from backup_crypto import (
                        get_encryption_key, encrypt_file, encrypt_directory, is_encrypted
                    )
                except ImportError:
                    from scripts.core.backup_crypto import (
                        get_encryption_key, encrypt_file, encrypt_directory, is_encrypted
                    )
                enc_key, salt = get_encryption_key(
                    key_file=self.key_file,
                    passphrase=self.passphrase,
                )
                logger.info("Encrypting backup files (AES-256-GCM)...")
                # Encrypt SQLite backup (may be .db, .db.gz, or .db.zst)
                enc_output = backup_path / (sqlite_backup.name + ".enc")
                enc_meta = encrypt_file(
                    sqlite_backup,
                    enc_output,
                    enc_key,
                    salt,
                )
                sqlite_backup.unlink()  # Remove plaintext/compressed
                encryption_meta["sqlite"] = enc_meta
                # Encrypt ChromaDB backup (may be dir, .tar.gz, or .tar.zst)
                if chromadb_backup.exists():
                    if chromadb_backup.is_file():
                        # Already compressed to a file — encrypt as file
                        chroma_enc_out = backup_path / (chromadb_backup.name + ".enc")
                        dir_meta = encrypt_file(
                            chromadb_backup, chroma_enc_out, enc_key, salt,
                        )
                        chromadb_backup.unlink()
                    else:
                        # Raw directory — encrypt as tar archive
                        chroma_enc_out = backup_path / "chromadb.tar.enc"
                        dir_meta = encrypt_directory(
                            chromadb_backup, chroma_enc_out, enc_key, salt,
                        )
                        shutil.rmtree(chromadb_backup)
                    encryption_meta["chromadb"] = dir_meta
                encryption_meta["encrypted"] = True
                encryption_meta["algorithm"] = "AES-256-GCM"
                # salt present => key was passphrase-derived; absent => raw key file
                encryption_meta["kdf"] = "PBKDF2-HMAC-SHA256" if salt else "raw"
                logger.info("Encryption complete.")
            # Create backup metadata
            metadata = {
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'db_path': str(self.db_path),
                'chroma_dir': str(self.chroma_dir),
                'backup_name': backup_name,
                'compressed': self.compress,
                'encrypted': self.encrypt,
            }
            if compression_meta:
                metadata['compression'] = compression_meta
            if encryption_meta:
                metadata['encryption'] = encryption_meta
            metadata_file = backup_path / "backup_metadata.json"
            with open(metadata_file, 'w') as f:
                json.dump(metadata, f, indent=2)
            # Also write legacy .txt format for backward compatibility
            metadata_txt = backup_path / "backup_metadata.txt"
            with open(metadata_txt, 'w') as f:
                f.write(f"timestamp: {metadata['timestamp']}\n")
                f.write(f"db_path: {metadata['db_path']}\n")
                f.write(f"chroma_dir: {metadata['chroma_dir']}\n")
                f.write(f"backup_name: {metadata['backup_name']}\n")
                f.write(f"encrypted: {metadata['encrypted']}\n")
            logger.info(f"✅ Backup created successfully: {backup_path}")
            # Upload to GCS if requested (J.20.2)
            if self.gcs_upload:
                try:
                    from backup_gcs import GCSBackupClient
                except ImportError:
                    from scripts.core.backup_gcs import GCSBackupClient
                gcs = GCSBackupClient(
                    bucket_name=self.gcs_bucket,
                    project_scope=self.gcs_project_scope,
                    tenant_scope=self.gcs_tenant_scope,
                )
                tier_info = gcs.compute_gfs_tier()
                gcs_result = gcs.upload_backup(
                    backup_path,
                    tier="daily",
                    promote_weekly=tier_info["promote_weekly"],
                    promote_monthly=tier_info["promote_monthly"],
                )
                metadata['gcs'] = gcs_result
                # Update metadata file with GCS info
                with open(metadata_file, 'w') as f:
                    json.dump(metadata, f, indent=2)
                logger.info(
                    f"✅ Uploaded to GCS: gs://{gcs_result['bucket']}/{gcs_result['prefix']} "
                    f"({gcs_result['total_mb']} MB)"
                )
            return backup_path
        except FileNotFoundError as e:
            logger.error(f"File not found during backup: {e}")
            # Clean up partial backup
            if backup_path and backup_path.exists():
                shutil.rmtree(backup_path)
                logger.debug(f"Cleaned up partial backup: {backup_path}")
            raise DatabaseBackupError(f"Backup failed - file not found: {e}")
        except PermissionError as e:
            logger.error(f"Permission denied during backup: {e}")
            raise DatabaseBackupError(f"Backup failed - permission denied: {e}")
        except Exception as e:
            logger.error(f"Unexpected error during backup: {e}", exc_info=True)
            # Clean up partial backup; cleanup failure must not mask the original error
            if backup_path and backup_path.exists():
                try:
                    shutil.rmtree(backup_path)
                    logger.debug(f"Cleaned up partial backup: {backup_path}")
                except Exception as cleanup_error:
                    logger.warning(f"Could not clean up partial backup: {cleanup_error}")
            raise DatabaseBackupError(f"Backup failed: {e}")
def restore_sqlite(self, backup_path: Path) -> None:
"""
Restore SQLite database from backup.
Args:
backup_path: Path to backup database file
"""
if not backup_path.exists():
raise FileNotFoundError(f"Backup not found: {backup_path}")
# Create backup of current database before restoring
if self.db_path.exists():
current_backup = self.db_path.with_suffix('.db.before_restore')
shutil.copy2(self.db_path, current_backup)
logger.info(f"Created safety backup: {current_backup.name}")
# Restore from backup
shutil.copy2(backup_path, self.db_path)
logger.info(f"SQLite restore complete: {self.db_path.name}")
def restore_chromadb(self, backup_path: Path) -> None:
"""
Restore ChromaDB from backup.
Args:
backup_path: Path to backup ChromaDB directory
"""
if not backup_path.exists():
logger.warning(f"ChromaDB backup not found: {backup_path}")
logger.warning("Skipping ChromaDB restore")
return
# Create backup of current ChromaDB before restoring
if self.chroma_dir.exists():
current_backup = self.chroma_dir.parent / f"{self.chroma_dir.name}.before_restore"
if current_backup.exists():
shutil.rmtree(current_backup)
shutil.copytree(self.chroma_dir, current_backup)
logger.info(f"Created safety backup: {current_backup.name}")
# Remove current ChromaDB
shutil.rmtree(self.chroma_dir)
# Restore from backup
shutil.copytree(backup_path, self.chroma_dir)
logger.info(f"ChromaDB restore complete: {self.chroma_dir.name}")
    def restore_backup(self, backup_name: str) -> None:
        """
        Restore from backup.

        Reverses the backup pipeline in phases: decrypt ``.enc`` files first,
        then decompress ``.gz``/``.zst`` files, then copy the plain files back
        into place, and finally remove the intermediate artifacts so the
        backup directory keeps only its original contents.

        Args:
            backup_name: Name of backup to restore

        Raises:
            BackupNotFoundError: If backup not found
            RestoreError: If restore operation fails
        """
        try:
            logger.info(f"Starting restore from backup: {backup_name}")
            backup_path = self.backup_dir / backup_name
            if not backup_path.exists():
                logger.error(f"Backup not found: {backup_name}")
                raise BackupNotFoundError(f"Backup not found: {backup_name}")
            if not backup_path.is_dir():
                logger.error(f"Backup path is not a directory: {backup_path}")
                raise BackupNotFoundError(f"Invalid backup path: {backup_name}")
            logger.info(f"Restoring from backup: {backup_name}")
            # Phase 1: Detect and decrypt encrypted files (J.20.1.6)
            temp_files = []  # Track temporary files for cleanup
            enc_files = [f for f in backup_path.iterdir() if f.name.endswith(".enc")]
            if enc_files:
                try:
                    from backup_crypto import decrypt_file
                except ImportError:
                    from scripts.core.backup_crypto import decrypt_file
                logger.info("Detected encrypted backup, decrypting...")
                for enc_file in enc_files:
                    # Strip .enc to get output name
                    dec_name = enc_file.name[:-4]  # remove ".enc"
                    dec_path = backup_path / dec_name
                    decrypt_file(
                        enc_file, dec_path,
                        key_file=self.key_file, passphrase=self.passphrase,
                    )
                    temp_files.append(dec_path)
                logger.info("Decryption complete.")
            # Phase 2: Detect and decompress compressed files (J.20.1.7)
            compressed_files = [
                f for f in backup_path.iterdir()
                if f.is_file() and (
                    f.name.endswith(".gz") or f.name.endswith(".zst")
                ) and not f.name.endswith(".enc")
            ]
            if compressed_files:
                try:
                    from backup_compress import decompress_file, decompress_directory
                except ImportError:
                    from scripts.core.backup_compress import decompress_file, decompress_directory
                logger.info("Detected compressed backup, decompressing...")
                # NOTE(review): for a compressed-but-unencrypted backup, the
                # original archive itself is appended to temp_files and deleted
                # in Phase 4 — confirm this is intended (it removes the only
                # stored artifact from the backup directory).
                for comp_file in compressed_files:
                    if ".tar." in comp_file.name:
                        # Compressed directory archive
                        decompress_directory(comp_file, backup_path)
                        temp_files.append(comp_file)
                    else:
                        # Compressed single file
                        decompress_file(comp_file)
                        temp_files.append(comp_file)
                logger.info("Decompression complete.")
            # Phase 3: Restore plain files
            sqlite_backup = backup_path / "memory-context.db"
            self.restore_sqlite(sqlite_backup)
            chromadb_backup = backup_path / "chromadb"
            self.restore_chromadb(chromadb_backup)
            # Phase 4: Clean up temporary decrypted/decompressed files
            if temp_files:
                for tmp in temp_files:
                    # is_file/is_dir guards also make a double-listed entry harmless.
                    if tmp.is_file():
                        tmp.unlink()
                    elif tmp.is_dir():
                        shutil.rmtree(tmp)
                # Also clean up restored plain files that came from decrypt/decompress
                if sqlite_backup.exists() and sqlite_backup not in temp_files:
                    for enc in enc_files + compressed_files:
                        if "memory-context" in enc.name:
                            sqlite_backup.unlink()
                            break
                chroma_plain = backup_path / "chromadb"
                if chroma_plain.exists() and chroma_plain.is_dir():
                    for enc in enc_files + compressed_files:
                        if "chromadb" in enc.name:
                            shutil.rmtree(chroma_plain)
                            break
                logger.info("Cleaned up temporary files.")
            logger.info(f"✅ Restore completed successfully")
        except BackupNotFoundError:
            # Already a domain error with a clear message — propagate unchanged.
            raise
        except FileNotFoundError as e:
            logger.error(f"File not found during restore: {e}")
            raise RestoreError(f"Restore failed - file not found: {e}")
        except PermissionError as e:
            logger.error(f"Permission denied during restore: {e}")
            raise RestoreError(f"Restore failed - permission denied: {e}")
        except Exception as e:
            logger.error(f"Unexpected error during restore: {e}", exc_info=True)
            raise RestoreError(f"Restore failed: {e}")
def list_backups(self) -> List[dict]:
"""
List all available backups.
Returns:
List of backup metadata dictionaries
"""
backups = []
for backup_path in sorted(self.backup_dir.glob("backup_*")):
if backup_path.is_dir():
metadata = {'name': backup_path.name, 'path': str(backup_path)}
# Try JSON metadata first (v2), fall back to txt (v1)
json_meta = backup_path / "backup_metadata.json"
txt_meta = backup_path / "backup_metadata.txt"
if json_meta.exists():
with open(json_meta, 'r') as f:
metadata.update(json.load(f))
elif txt_meta.exists():
with open(txt_meta, 'r') as f:
for line in f:
if ':' in line:
key, value = line.strip().split(':', 1)
metadata[key.strip()] = value.strip()
# Get size
total_size = sum(
f.stat().st_size
for f in backup_path.rglob('*')
if f.is_file()
)
metadata['size_mb'] = total_size / (1024 * 1024)
backups.append(metadata)
return backups
def cleanup_old_backups(self, days: int = 30) -> int:
"""
Delete backups older than specified days.
Args:
days: Delete backups older than this many days
Returns:
Number of backups deleted
"""
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
deleted_count = 0
for backup in self.list_backups():
backup_timestamp = datetime.fromisoformat(backup.get('timestamp', ''))
if backup_timestamp < cutoff_date:
backup_path = Path(backup['path'])
shutil.rmtree(backup_path)
logger.info(f"Deleted old backup: {backup['name']}")
deleted_count += 1
if deleted_count > 0:
logger.info(f"✅ Cleaned up {deleted_count} old backups")
else:
logger.info("No old backups to clean up")
return deleted_count
def main():
    """Main entry point.

    Parses the CLI, resolves database/backup paths (ADR-114/ADR-118), and
    dispatches to the backup / restore / list / cleanup commands.

    Returns:
        Process exit code: 0 on success, 1 on any failure.
    """
    try:
        parser = argparse.ArgumentParser(
            description='Backup and restore CODITECT MEMORY-CONTEXT database'
        )
        parser.add_argument(
            'command',
            choices=['backup', 'restore', 'list', 'cleanup'],
            help='Command to execute'
        )
        parser.add_argument(
            'backup_name',
            nargs='?',
            help='Backup name (required for restore)'
        )
        parser.add_argument(
            '--days',
            type=int,
            default=30,
            help='Delete backups older than N days (cleanup command)'
        )
        parser.add_argument(
            '--db-path',
            type=str,
            default=None,
            help='Custom database path'
        )
        parser.add_argument(
            '--chroma-dir',
            type=str,
            default=None,
            help='Custom ChromaDB directory'
        )
        parser.add_argument(
            '--backup-dir',
            type=str,
            default=None,
            help='Custom backup directory'
        )
        parser.add_argument(
            '--encrypt',
            action='store_true',
            default=False,
            help='Encrypt backup at rest (AES-256-GCM). Requires key via env, --key-file, or --passphrase.'
        )
        parser.add_argument(
            '--key-file',
            type=str,
            default=None,
            help='Path to hex-encoded 32-byte encryption key file'
        )
        parser.add_argument(
            '--passphrase',
            type=str,
            default=None,
            help='Passphrase for encryption key derivation (PBKDF2-HMAC-SHA256)'
        )
        parser.add_argument(
            '--compress',
            action='store_true',
            default=False,
            help='Compress backup files before upload (gzip by default, or zstd if specified).'
        )
        parser.add_argument(
            '--compress-algo',
            type=str,
            choices=['gzip', 'zstd'],
            default='gzip',
            help='Compression algorithm: gzip (default, stdlib) or zstd (requires python-zstandard).'
        )
        parser.add_argument(
            '--compress-level',
            type=int,
            default=None,
            help='Compression level (gzip: 1-9, default 6; zstd: 1-22, default 3).'
        )
        parser.add_argument(
            '--gcs-upload',
            action='store_true',
            default=False,
            help='Upload backup to Google Cloud Storage after creation.'
        )
        parser.add_argument(
            '--gcs-bucket',
            type=str,
            default=None,
            help='Override GCS bucket name (default: {project-id}-context-backups).'
        )
        parser.add_argument(
            '--gcs-project-scope',
            type=str,
            default=None,
            help='CODITECT project scope for GCS paths (ADR-159).'
        )
        parser.add_argument(
            '--gcs-tenant-scope',
            type=str,
            default=None,
            help='CODITECT tenant scope for GCS paths (ADR-159).'
        )
        parser.add_argument(
            '--multi-db',
            action='store_true',
            default=False,
            help='Multi-database backup mode (J.20.3). Backs up all databases per ADR-118 tiers.'
        )
        parser.add_argument(
            '--databases',
            nargs='+',
            choices=['org', 'sessions', 'projects', 'platform', 'messaging', 'call_graph'],
            default=None,
            help='Specific databases to backup/restore (requires --multi-db).'
        )
        parser.add_argument(
            '--incremental',
            action='store_true',
            default=False,
            help='Incremental backup: only backup databases that changed (requires --multi-db).'
        )
        parser.add_argument(
            '--verify',
            action='store_true',
            default=False,
            help='Verify backup checksums after creation.'
        )
        args = parser.parse_args()
        # Determine paths - ADR-114 & ADR-118: Use centralized path discovery
        # ADR-118: Backup prioritizes org.db (TIER 2 CRITICAL)
        try:
            from paths import get_org_db_path, ORG_DB, get_context_storage_dir
            context_storage = get_context_storage_dir()
            default_db_path = ORG_DB  # CRITICAL - prioritize org.db for backup
        except ImportError:
            # Fallback for backward compatibility
            _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
            if _user_data.exists():
                context_storage = _user_data
            else:
                context_storage = PROJECT_ROOT / "context-storage"
            default_db_path = context_storage / "org.db"
        # CLI overrides win over discovered defaults.
        db_path = Path(args.db_path) if args.db_path else default_db_path
        chroma_dir = Path(args.chroma_dir) if args.chroma_dir else context_storage / "chromadb"
        backup_dir = Path(args.backup_dir) if args.backup_dir else context_storage / "backups"
        logger.info(f"Starting {args.command} operation")
        # Create backup handler
        backup = DatabaseBackup(
            db_path=db_path,
            chroma_dir=chroma_dir,
            backup_dir=backup_dir,
            encrypt=args.encrypt,
            key_file=Path(args.key_file) if args.key_file else None,
            passphrase=args.passphrase,
            compress=args.compress,
            compress_algo=args.compress_algo,
            compress_level=args.compress_level,
            gcs_upload=args.gcs_upload,
            gcs_bucket=args.gcs_bucket,
            gcs_project_scope=args.gcs_project_scope,
            gcs_tenant_scope=args.gcs_tenant_scope,
        )
        # Execute command
        if args.command == 'backup':
            # Multi-database mode (J.20.3)
            if args.multi_db:
                try:
                    from backup_databases import MultiDatabaseBackup
                except ImportError:
                    from scripts.core.backup_databases import MultiDatabaseBackup
                mdb = MultiDatabaseBackup(
                    backup_dir=backup_dir,
                    encrypt=args.encrypt,
                    key_file=Path(args.key_file) if args.key_file else None,
                    passphrase=args.passphrase,
                    compress=args.compress,
                    compress_algo=args.compress_algo,
                    compress_level=args.compress_level,
                )
                result = mdb.backup_all(
                    db_names=args.databases,
                    incremental=args.incremental,
                )
                if args.verify and result.get("path"):
                    verification = mdb.verify_backup(Path(result["path"]))
                    result["verification"] = verification
                print()
                print("=" * 70)
                print("MULTI-DATABASE BACKUP")
                print("=" * 70)
                print()
                import json as _json
                print(_json.dumps(result, indent=2, default=str))
                print()
                return 0
            # Single-database mode (legacy)
            backup_path = backup.create_backup()
            if args.verify:
                try:
                    from backup_databases import verify_checksums, write_checksums
                except ImportError:
                    from scripts.core.backup_databases import verify_checksums, write_checksums
                write_checksums(backup_path)
                logger.info("Checksums written and verified.")
            print()
            print("=" * 70)
            print("BACKUP CREATED")
            print("=" * 70)
            print()
            print(f"Backup location: {backup_path}")
            print()
            return 0
        elif args.command == 'restore':
            if not args.backup_name:
                logger.error("Backup name required for restore")
                print("❌ Error: backup_name required for restore", file=sys.stderr)
                print("Usage: db_backup.py restore <backup_name>", file=sys.stderr)
                print("Run 'db_backup.py list' to see available backups", file=sys.stderr)
                return 1
            backup.restore_backup(args.backup_name)
            print()
            print("=" * 70)
            print("RESTORE COMPLETE")
            print("=" * 70)
            print()
            return 0
        elif args.command == 'list':
            backups = backup.list_backups()
            print()
            print("=" * 70)
            print("AVAILABLE BACKUPS")
            print("=" * 70)
            print()
            if not backups:
                print("No backups found.")
            else:
                for b in backups:
                    print(f"Backup: {b['name']}")
                    print(f" Created: {b.get('timestamp', 'Unknown')}")
                    print(f" Size: {b.get('size_mb', 0):.2f} MB")
                    print()
            return 0
        elif args.command == 'cleanup':
            deleted = backup.cleanup_old_backups(days=args.days)
            print()
            print("=" * 70)
            print("CLEANUP COMPLETE")
            print("=" * 70)
            print()
            print(f"Deleted {deleted} backups older than {args.days} days")
            print()
            return 0
    except BackupNotFoundError as e:
        logger.error(f"Backup not found: {e}")
        print()
        print("=" * 70)
        print("BACKUP NOT FOUND")
        print("=" * 70)
        print()
        print(f"❌ Error: {e}")
        print(f"See db_backup.log for details")
        print()
        return 1
    except RestoreError as e:
        logger.error(f"Restore error: {e}")
        print()
        print("=" * 70)
        print("RESTORE FAILED")
        print("=" * 70)
        print()
        print(f"❌ Error: {e}")
        print(f"See db_backup.log for details")
        print()
        return 1
    except DatabaseBackupError as e:
        logger.error(f"Backup/restore error: {e}")
        print()
        print("=" * 70)
        print("OPERATION FAILED")
        print("=" * 70)
        print()
        print(f"❌ Error: {e}")
        print(f"See db_backup.log for details")
        print()
        return 1
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        print()
        print("=" * 70)
        print("COMMAND FAILED - UNEXPECTED ERROR")
        print("=" * 70)
        print()
        print(f"❌ Unexpected error: {e}")
        print(f"See db_backup.log for details")
        print()
        return 1
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":  # was: if name == "main" — never triggered
    sys.exit(main())