#!/usr/bin/env python3
"""
CODITECT Multi-Database Backup & Restore (J.20.3)

Extends the single-database backup system to support all CODITECT databases
per ADR-118 tier architecture:

    Tier 1: platform.db   — Component metadata (regenerable)
    Tier 2: org.db        — Decisions, learnings, error_solutions (CRITICAL)
    Tier 3: sessions.db   — Messages, analytics (regenerable)
    Tier 4: projects.db   — Project registry and configuration
    Extra:  messaging.db  — Inter-agent messaging
    Extra:  call_graph.db — Code call graph index (regenerable)

Also provides:
- Incremental (delta) backup via SQLite page-level change tracking
- Backup verification with SHA-256 checksums

Usage:
    from backup_databases import MultiDatabaseBackup, DatabaseTier

    mdb = MultiDatabaseBackup()
    result = mdb.backup_all()                        # Backup all databases
    result = mdb.backup_tier(DatabaseTier.CRITICAL)  # Backup Tier 2 only
    mdb.verify_backup(backup_path)                   # Verify checksums
    mdb.restore_all(backup_path)                     # Restore all

CLI:
    python3 backup_databases.py backup --all
    python3 backup_databases.py backup --tier critical
    python3 backup_databases.py backup --databases org sessions
    python3 backup_databases.py backup --incremental
    python3 backup_databases.py verify <backup_path>
    python3 backup_databases.py restore <backup_path> --databases org sessions
    python3 backup_databases.py list
    python3 backup_databases.py info <backup_path>

Author: AZ1.AI CODITECT Team
Task: J.20.3 (Database Support)
Date: 2026-02-10
"""
import argparse
import hashlib
import json
import logging
import os
import shutil
import sqlite3
import sys
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Set

# Add project root to path so the `paths` module and siblings are importable.
# BUG FIX: mangled source had Path(file); the correct name is __file__.
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

# Module-level logger.
# BUG FIX: mangled source had getLogger(name); the correct name is __name__.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Database tier definitions (ADR-118)
# ---------------------------------------------------------------------------
class DatabaseTier(Enum):
    """ADR-118 database tier classification."""

    REGENERABLE_INDEX = 1    # platform.db, call_graph.db — can be rebuilt
    CRITICAL = 2             # org.db — irreplaceable decisions/learnings
    REGENERABLE_SESSION = 3  # sessions.db — messages, analytics
    CONFIGURATION = 4        # projects.db — project registry
    MESSAGING = 5            # messaging.db — inter-agent comms
# Canonical database registry: maps a short database name to its filename,
# ADR-118 tier, backup priority (lower = backed up first), and the optional
# `paths` module accessor used to resolve its location.
DATABASE_REGISTRY: Dict[str, dict] = {
    "org": {
        "filename": "org.db",
        "tier": DatabaseTier.CRITICAL,
        "priority": 1,  # Backup first
        "description": "Decisions, learnings, error_solutions (IRREPLACEABLE)",
        "path_func": "get_org_db_path",
        "constant": "ORG_DB",
    },
    "sessions": {
        "filename": "sessions.db",
        "tier": DatabaseTier.REGENERABLE_SESSION,
        "priority": 2,
        "description": "Messages, analytics (regenerable)",
        "path_func": "get_sessions_db_path",
        "constant": "SESSIONS_DB",
    },
    "projects": {
        "filename": "projects.db",
        "tier": DatabaseTier.CONFIGURATION,
        "priority": 3,
        "description": "Project registry and configuration",
        "path_func": "get_projects_db_path",
        "constant": "PROJECTS_DB",
    },
    "platform": {
        "filename": "platform.db",
        "tier": DatabaseTier.REGENERABLE_INDEX,
        "priority": 4,
        "description": "Component metadata index (regenerable)",
        "path_func": None,  # No dedicated function yet
        "constant": None,
    },
    "messaging": {
        "filename": "messaging.db",
        "tier": DatabaseTier.MESSAGING,
        "priority": 5,
        "description": "Inter-agent messaging",
        "path_func": "get_messaging_db_path",
        "constant": "MESSAGING_DB",
    },
    "call_graph": {
        "filename": "call_graph.db",
        "tier": DatabaseTier.REGENERABLE_INDEX,
        "priority": 6,
        "description": "Code call graph index (regenerable)",
        "path_func": None,
        "constant": None,
    },
}
# ---------------------------------------------------------------------------
# Path resolution
# ---------------------------------------------------------------------------
def _get_context_storage_dir() -> Path:
    """Resolve context storage directory using paths module or fallback.

    Prefers the project's `paths` module; if unavailable, uses the
    user-level data directory, then the project-local directory.
    """
    try:
        from paths import get_context_storage_dir
        return get_context_storage_dir()
    except ImportError:
        pass

    # Fallback: user-level data dir if present, else project-local.
    user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if user_data.exists():
        return user_data
    return PROJECT_ROOT / "context-storage"
def _resolve_db_path(db_name: str) -> Path:
    """Resolve the filesystem path for a named database.

    Args:
        db_name: Key into DATABASE_REGISTRY.

    Returns:
        Path to the database file (may not exist yet).

    Raises:
        ValueError: If db_name is not a registered database.
    """
    info = DATABASE_REGISTRY.get(db_name)
    if not info:
        raise ValueError(f"Unknown database: {db_name}")

    # Try paths module first
    if info["path_func"]:
        try:
            from paths import (
                get_org_db_path, get_sessions_db_path,
                get_projects_db_path, get_messaging_db_path,
            )
            func_map = {
                "get_org_db_path": get_org_db_path,
                "get_sessions_db_path": get_sessions_db_path,
                "get_projects_db_path": get_projects_db_path,
                "get_messaging_db_path": get_messaging_db_path,
            }
            func = func_map.get(info["path_func"])
            if func:
                return func()
        except ImportError:
            pass

    # Fallback: filename in context storage dir
    return _get_context_storage_dir() / info["filename"]
# ---------------------------------------------------------------------------
# Checksum utilities (J.20.3.6)
# ---------------------------------------------------------------------------
def compute_sha256(filepath: Path, chunk_size: int = 8192) -> str:
    """Compute SHA-256 hex digest of a file.

    Args:
        filepath: File to hash.
        chunk_size: Read size per iteration (keeps memory bounded).

    Returns:
        Lowercase hex digest string.
    """
    h = hashlib.sha256()
    with open(filepath, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            h.update(chunk)
    return h.hexdigest()
def write_checksums(backup_dir: Path) -> Dict[str, str]:
    """Write SHA-256 checksums for all files in a backup directory.

    Hashes every file under backup_dir (recursively) except the
    checksums.json file itself, and writes the result to checksums.json.

    Returns:
        Dict mapping relative filename to hex digest.
    """
    checksums: Dict[str, str] = {}
    for fpath in sorted(backup_dir.rglob("*")):
        if fpath.is_file() and fpath.name != "checksums.json":
            rel = str(fpath.relative_to(backup_dir))
            checksums[rel] = compute_sha256(fpath)
    checksum_file = backup_dir / "checksums.json"
    with open(checksum_file, "w") as f:
        json.dump(checksums, f, indent=2, sort_keys=True)
    return checksums
def verify_checksums(backup_dir: Path) -> Dict[str, str]:
    """Verify SHA-256 checksums for a backup.

    Returns:
        Dict mapping filename to status: "ok", "mismatch", "missing", "extra".

    Raises:
        FileNotFoundError: If checksums.json not found.
    """
    checksum_file = backup_dir / "checksums.json"
    if not checksum_file.exists():
        raise FileNotFoundError(f"No checksums.json in {backup_dir}")
    with open(checksum_file) as f:
        expected = json.load(f)

    results: Dict[str, str] = {}
    # Check expected files
    for rel, expected_hash in expected.items():
        fpath = backup_dir / rel
        if not fpath.exists():
            results[rel] = "missing"
        else:
            actual = compute_sha256(fpath)
            results[rel] = "ok" if actual == expected_hash else "mismatch"
    # Check for extra files (present on disk but not in the manifest)
    for fpath in backup_dir.rglob("*"):
        if fpath.is_file() and fpath.name != "checksums.json":
            rel = str(fpath.relative_to(backup_dir))
            if rel not in expected:
                results[rel] = "extra"
    return results
# ---------------------------------------------------------------------------
# Incremental backup (J.20.3.5)
# ---------------------------------------------------------------------------
def compute_page_hash(db_path: Path) -> str:
    """Compute a hash over SQLite database content for change detection.

    Uses the SQLite data_version pragma + file size + mtime as a fast
    fingerprint. For true page-level deltas, we'd need WAL inspection,
    but this gives us reliable change detection.

    Returns:
        16-char hex fingerprint, or the literal string "missing" if the
        database file does not exist.
    """
    if not db_path.exists():
        return "missing"
    stat = db_path.stat()
    try:
        conn = sqlite3.connect(str(db_path))
        cursor = conn.execute("PRAGMA data_version")
        data_version = cursor.fetchone()[0]
        conn.close()
    except Exception:
        # Unreadable/corrupt DB: fall back to file metadata only.
        data_version = 0
    fingerprint = f"{db_path.name}:{stat.st_size}:{stat.st_mtime_ns}:{data_version}"
    return hashlib.sha256(fingerprint.encode()).hexdigest()[:16]
def detect_changed_databases(
    snapshot_file: Path,
    db_names: Optional[List[str]] = None,
) -> Dict[str, bool]:
    """Compare current database fingerprints against last snapshot.

    Args:
        snapshot_file: Path to JSON file with previous fingerprints.
        db_names: Database names to check (default: all).

    Returns:
        Dict mapping db_name to True if changed since last snapshot.

    Side effect: overwrites snapshot_file with the current fingerprints
    of the checked databases (only those in db_names).
    """
    if db_names is None:
        db_names = list(DATABASE_REGISTRY.keys())

    # Load previous snapshot
    previous: Dict[str, str] = {}
    if snapshot_file.exists():
        with open(snapshot_file) as f:
            previous = json.load(f)

    # Compute current fingerprints
    current: Dict[str, str] = {}
    changed: Dict[str, bool] = {}
    for name in db_names:
        db_path = _resolve_db_path(name)
        fp = compute_page_hash(db_path)
        current[name] = fp
        # A db with no previous fingerprint counts as changed.
        changed[name] = (fp != previous.get(name, ""))

    # Save current snapshot
    snapshot_file.parent.mkdir(parents=True, exist_ok=True)
    with open(snapshot_file, "w") as f:
        json.dump(current, f, indent=2)
    return changed
# ---------------------------------------------------------------------------
# Multi-Database Backup class
# ---------------------------------------------------------------------------
class MultiDatabaseBackup: """Backup and restore multiple CODITECT databases."""
def __init__(
self,
backup_dir: Optional[Path] = None,
encrypt: bool = False,
key_file: Optional[Path] = None,
passphrase: Optional[str] = None,
compress: bool = False,
compress_algo: str = "gzip",
compress_level: Optional[int] = None,
):
self.backup_dir = backup_dir or (_get_context_storage_dir() / "backups")
self.backup_dir.mkdir(parents=True, exist_ok=True)
self.encrypt = encrypt
self.key_file = key_file
self.passphrase = passphrase
self.compress = compress
self.compress_algo = compress_algo
self.compress_level = compress_level
self._snapshot_file = self.backup_dir / ".incremental_snapshot.json"
# ------------------------------------------------------------------
# Core backup (J.20.3.1 – J.20.3.4)
# ------------------------------------------------------------------
def _backup_single_db(self, db_name: str, dest_dir: Path) -> Dict:
"""Backup a single database using SQLite online backup API.
Returns metadata dict for the backed-up database.
"""
info = DATABASE_REGISTRY[db_name]
db_path = _resolve_db_path(db_name)
result = {
"name": db_name,
"filename": info["filename"],
"tier": info["tier"].name,
"priority": info["priority"],
"source": str(db_path),
"exists": db_path.exists(),
}
if not db_path.exists():
logger.warning(f"Database not found, skipping: {db_name} ({db_path})")
result["status"] = "skipped"
result["reason"] = "not_found"
return result
# Validate it's a valid SQLite database
try:
conn = sqlite3.connect(str(db_path))
conn.execute("SELECT 1")
table_count = conn.execute(
"SELECT count(*) FROM sqlite_master WHERE type='table'"
).fetchone()[0]
conn.close()
result["table_count"] = table_count
except sqlite3.DatabaseError as e:
logger.warning(f"Invalid SQLite database, skipping: {db_name} — {e}")
result["status"] = "skipped"
result["reason"] = f"invalid_sqlite: {e}"
return result
# Perform online backup
backup_file = dest_dir / info["filename"]
source_conn = sqlite3.connect(str(db_path))
backup_conn = sqlite3.connect(str(backup_file))
with backup_conn:
source_conn.backup(backup_conn)
source_conn.close()
backup_conn.close()
result["size_bytes"] = backup_file.stat().st_size
result["size_kb"] = round(result["size_bytes"] / 1024, 2)
result["status"] = "ok"
logger.info(
f" {db_name}.db: {result['size_kb']} KB "
f"({table_count} tables, tier {info['tier'].name})"
)
return result
def backup_all(
self,
db_names: Optional[List[str]] = None,
incremental: bool = False,
) -> Dict:
"""Backup multiple databases into a single backup directory.
Args:
db_names: List of database names to back up (default: all).
incremental: Only backup databases that changed since last backup.
Returns:
Dict with backup metadata including per-database results.
"""
if db_names is None:
db_names = list(DATABASE_REGISTRY.keys())
# Sort by priority (critical first)
db_names = sorted(
db_names,
key=lambda n: DATABASE_REGISTRY.get(n, {}).get("priority", 99),
)
# Incremental: filter to changed databases only
if incremental:
changed = detect_changed_databases(self._snapshot_file, db_names)
original_count = len(db_names)
db_names = [n for n in db_names if changed.get(n, True)]
logger.info(
f"Incremental: {len(db_names)}/{original_count} databases changed"
)
if not db_names:
logger.info("No databases changed — nothing to backup.")
return {
"status": "no_changes",
"timestamp": datetime.now(timezone.utc).isoformat(),
"databases": [],
"incremental": True,
}
# Create backup directory
timestamp = datetime.now(timezone.utc)
backup_name = f"multidb_{timestamp.strftime('%Y-%m-%dT%H-%M-%SZ')}"
backup_path = self.backup_dir / backup_name
backup_path.mkdir(parents=True, exist_ok=True)
logger.info(f"Starting multi-database backup: {backup_name}")
logger.info(f" Databases: {', '.join(db_names)}")
# Backup each database
db_results = []
total_size = 0
for db_name in db_names:
result = self._backup_single_db(db_name, backup_path)
db_results.append(result)
if result["status"] == "ok":
total_size += result.get("size_bytes", 0)
# Apply compression if requested
compression_meta = {}
if self.compress:
compression_meta = self._compress_backup(backup_path)
# Apply encryption if requested
encryption_meta = {}
if self.encrypt:
encryption_meta = self._encrypt_backup(backup_path)
# Generate checksums (J.20.3.6)
checksums = write_checksums(backup_path)
# Write backup manifest
manifest = {
"version": "2.0.0",
"type": "multi_database",
"timestamp": timestamp.isoformat(),
"backup_name": backup_name,
"incremental": incremental,
"databases": db_results,
"total_size_bytes": total_size,
"total_size_mb": round(total_size / (1024 * 1024), 3),
"backed_up": [r["name"] for r in db_results if r["status"] == "ok"],
"skipped": [r["name"] for r in db_results if r["status"] == "skipped"],
"compressed": self.compress,
"encrypted": self.encrypt,
"checksum_count": len(checksums),
}
if compression_meta:
manifest["compression"] = compression_meta
if encryption_meta:
manifest["encryption"] = encryption_meta
manifest_file = backup_path / "backup_manifest.json"
with open(manifest_file, "w") as f:
json.dump(manifest, f, indent=2)
ok_count = len(manifest["backed_up"])
skip_count = len(manifest["skipped"])
logger.info(
f"Backup complete: {ok_count} databases backed up"
f"{f', {skip_count} skipped' if skip_count else ''}"
f" ({manifest['total_size_mb']} MB) → {backup_path}"
)
# Update incremental snapshot
detect_changed_databases(self._snapshot_file, manifest["backed_up"])
manifest["path"] = str(backup_path)
return manifest
def backup_tier(self, tier: DatabaseTier) -> Dict:
"""Backup all databases in a specific tier."""
db_names = [
name for name, info in DATABASE_REGISTRY.items()
if info["tier"] == tier
]
if not db_names:
raise ValueError(f"No databases in tier {tier.name}")
logger.info(f"Backing up tier {tier.name}: {', '.join(db_names)}")
return self.backup_all(db_names)
# ------------------------------------------------------------------
# Compression / Encryption helpers
# ------------------------------------------------------------------
def _compress_backup(self, backup_dir: Path) -> Dict:
"""Compress all .db files in backup directory."""
try:
from backup_compress import compress_file
except ImportError:
from scripts.core.backup_compress import compress_file
meta = {"algorithm": self.compress_algo, "files": []}
for db_file in sorted(backup_dir.glob("*.db")):
cm = compress_file(db_file, algorithm=self.compress_algo, level=self.compress_level)
db_file.unlink()
meta["files"].append(cm)
logger.info(f"Compressed {len(meta['files'])} database files ({self.compress_algo})")
return meta
def _encrypt_backup(self, backup_dir: Path) -> Dict:
"""Encrypt all data files in backup directory."""
try:
from backup_crypto import get_encryption_key, encrypt_file
except ImportError:
from scripts.core.backup_crypto import get_encryption_key, encrypt_file
enc_key, salt = get_encryption_key(key_file=self.key_file, passphrase=self.passphrase)
meta = {
"algorithm": "AES-256-GCM",
"kdf": "PBKDF2-HMAC-SHA256" if salt else "raw",
"files": [],
}
# Encrypt .db, .gz, .zst files (not manifests/checksums)
skip_names = {"backup_manifest.json", "checksums.json"}
for fpath in sorted(backup_dir.iterdir()):
if fpath.is_file() and fpath.name not in skip_names:
enc_out = backup_dir / (fpath.name + ".enc")
em = encrypt_file(fpath, enc_out, enc_key, salt)
fpath.unlink()
meta["files"].append(em)
logger.info(f"Encrypted {len(meta['files'])} files (AES-256-GCM)")
return meta
# ------------------------------------------------------------------
# Verification (J.20.3.6)
# ------------------------------------------------------------------
def verify_backup(self, backup_path: Path) -> Dict:
"""Verify backup integrity via checksums.
Returns:
Dict with verification results and overall status.
"""
backup_path = Path(backup_path)
if not backup_path.exists():
raise FileNotFoundError(f"Backup not found: {backup_path}")
logger.info(f"Verifying backup: {backup_path.name}")
results = verify_checksums(backup_path)
ok_count = sum(1 for v in results.values() if v == "ok")
total = len(results)
mismatches = [k for k, v in results.items() if v == "mismatch"]
missing = [k for k, v in results.items() if v == "missing"]
extra = [k for k, v in results.items() if v == "extra"]
passed = len(mismatches) == 0 and len(missing) == 0
status = "passed" if passed else "failed"
if passed:
logger.info(f"Verification PASSED: {ok_count}/{total} files OK")
else:
logger.error(
f"Verification FAILED: {len(mismatches)} mismatches, "
f"{len(missing)} missing"
)
return {
"status": status,
"passed": passed,
"total_files": total,
"ok": ok_count,
"mismatches": mismatches,
"missing": missing,
"extra": extra,
"details": results,
}
# ------------------------------------------------------------------
# Restore
# ------------------------------------------------------------------
def restore_all(
self,
backup_path: Path,
db_names: Optional[List[str]] = None,
verify_first: bool = True,
) -> Dict:
"""Restore databases from a multi-database backup.
Args:
backup_path: Path to backup directory.
db_names: Specific databases to restore (default: all in backup).
verify_first: Verify checksums before restoring.
Returns:
Dict with restore results.
"""
backup_path = Path(backup_path)
if not backup_path.exists():
raise FileNotFoundError(f"Backup not found: {backup_path}")
# Read manifest
manifest_file = backup_path / "backup_manifest.json"
if not manifest_file.exists():
raise FileNotFoundError(f"No backup_manifest.json in {backup_path}")
with open(manifest_file) as f:
manifest = json.load(f)
logger.info(f"Restoring from: {backup_path.name}")
logger.info(f" Backup timestamp: {manifest.get('timestamp', 'unknown')}")
# Verify first
if verify_first:
try:
verification = self.verify_backup(backup_path)
if not verification["passed"]:
logger.error("Backup verification failed — aborting restore")
return {
"status": "aborted",
"reason": "verification_failed",
"verification": verification,
}
except FileNotFoundError:
logger.warning("No checksums.json — skipping verification")
# Determine which databases to restore
backed_up = set(manifest.get("backed_up", []))
if db_names is None:
db_names = list(backed_up)
else:
# Validate requested databases exist in backup
missing = set(db_names) - backed_up
if missing:
raise ValueError(
f"Databases not in backup: {', '.join(missing)}. "
f"Available: {', '.join(backed_up)}"
)
# Decrypt if encrypted
temp_files: List[Path] = []
if manifest.get("encrypted"):
try:
from backup_crypto import decrypt_file
except ImportError:
from scripts.core.backup_crypto import decrypt_file
logger.info("Decrypting backup files...")
for enc_file in sorted(backup_path.glob("*.enc")):
dec_name = enc_file.name[:-4]
dec_path = backup_path / dec_name
decrypt_file(
enc_file, dec_path,
key_file=self.key_file, passphrase=self.passphrase,
)
temp_files.append(dec_path)
logger.info("Decryption complete.")
# Decompress if compressed
if manifest.get("compressed"):
try:
from backup_compress import decompress_file
except ImportError:
from scripts.core.backup_compress import decompress_file
logger.info("Decompressing backup files...")
for comp_file in sorted(backup_path.iterdir()):
if comp_file.is_file() and (
comp_file.name.endswith(".gz") or comp_file.name.endswith(".zst")
) and not comp_file.name.endswith(".enc"):
decompress_file(comp_file)
temp_files.append(comp_file)
logger.info("Decompression complete.")
# Restore each database
db_results = []
for db_name in sorted(
db_names,
key=lambda n: DATABASE_REGISTRY.get(n, {}).get("priority", 99),
):
info = DATABASE_REGISTRY.get(db_name)
if not info:
db_results.append({"name": db_name, "status": "unknown_database"})
continue
backup_file = backup_path / info["filename"]
if not backup_file.exists():
db_results.append({"name": db_name, "status": "file_not_found"})
logger.warning(f" {db_name}: backup file not found, skipping")
continue
target_path = _resolve_db_path(db_name)
# Create safety backup of current database
if target_path.exists():
safety = target_path.with_suffix(".db.before_restore")
shutil.copy2(target_path, safety)
logger.info(f" {db_name}: safety backup → {safety.name}")
# Restore
shutil.copy2(backup_file, target_path)
# Validate restored database
try:
conn = sqlite3.connect(str(target_path))
conn.execute("SELECT 1")
tables = conn.execute(
"SELECT count(*) FROM sqlite_master WHERE type='table'"
).fetchone()[0]
conn.close()
db_results.append({
"name": db_name,
"status": "ok",
"tables": tables,
"target": str(target_path),
})
logger.info(f" {db_name}: restored ({tables} tables)")
except Exception as e:
db_results.append({
"name": db_name,
"status": "restored_but_invalid",
"error": str(e),
})
logger.error(f" {db_name}: restored but validation failed: {e}")
# Cleanup temp files
for tmp in temp_files:
if tmp.exists():
if tmp.is_file():
tmp.unlink()
elif tmp.is_dir():
shutil.rmtree(tmp)
ok_count = sum(1 for r in db_results if r["status"] == "ok")
logger.info(f"Restore complete: {ok_count}/{len(db_names)} databases restored")
return {
"status": "ok" if ok_count == len(db_names) else "partial",
"timestamp": datetime.now(timezone.utc).isoformat(),
"source": str(backup_path),
"databases": db_results,
"restored_count": ok_count,
"total_requested": len(db_names),
}
# ------------------------------------------------------------------
# Listing / info
# ------------------------------------------------------------------
def list_backups(self, limit: int = 20) -> List[Dict]:
"""List multi-database backups, newest first."""
backups = []
for bdir in sorted(self.backup_dir.glob("multidb_*"), reverse=True):
if not bdir.is_dir():
continue
manifest_file = bdir / "backup_manifest.json"
if manifest_file.exists():
with open(manifest_file) as f:
manifest = json.load(f)
manifest["path"] = str(bdir)
# Compute actual disk size
total = sum(
fp.stat().st_size for fp in bdir.rglob("*") if fp.is_file()
)
manifest["disk_size_mb"] = round(total / (1024 * 1024), 3)
backups.append(manifest)
if len(backups) >= limit:
break
return backups
def get_backup_info(self, backup_path: Path) -> Dict:
"""Get detailed info about a specific backup."""
backup_path = Path(backup_path)
manifest_file = backup_path / "backup_manifest.json"
if not manifest_file.exists():
raise FileNotFoundError(f"No manifest in {backup_path}")
with open(manifest_file) as f:
info = json.load(f)
# Add file listing
files = []
for fp in sorted(backup_path.iterdir()):
if fp.is_file():
files.append({
"name": fp.name,
"size_bytes": fp.stat().st_size,
"size_kb": round(fp.stat().st_size / 1024, 2),
})
info["files"] = files
info["path"] = str(backup_path)
return info
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """CLI entry point. Returns a process exit code (0 on success)."""
    parser = argparse.ArgumentParser(
        description="CODITECT Multi-Database Backup (J.20.3)"
    )
    sub = parser.add_subparsers(dest="command", help="Command to execute")

    # backup
    bp = sub.add_parser("backup", help="Backup databases")
    bp.add_argument("--all", action="store_true", help="Backup all databases")
    bp.add_argument(
        "--databases", nargs="+",
        choices=list(DATABASE_REGISTRY.keys()),
        help="Specific databases to backup",
    )
    bp.add_argument(
        "--tier",
        choices=["critical", "regenerable_index", "regenerable_session", "configuration", "messaging"],
        help="Backup all databases in a tier",
    )
    bp.add_argument("--incremental", action="store_true", help="Only backup changed databases")
    bp.add_argument("--compress", action="store_true", help="Compress backup files")
    bp.add_argument("--compress-algo", choices=["gzip", "zstd"], default="gzip")
    bp.add_argument("--encrypt", action="store_true", help="Encrypt backup files")
    bp.add_argument("--key-file", type=str, help="Encryption key file")
    bp.add_argument("--passphrase", type=str, help="Encryption passphrase")
    bp.add_argument("--backup-dir", type=str, help="Custom backup directory")

    # verify
    vp = sub.add_parser("verify", help="Verify backup integrity")
    vp.add_argument("backup_path", type=str, help="Path to backup directory")

    # restore
    rp = sub.add_parser("restore", help="Restore from backup")
    rp.add_argument("backup_path", type=str, help="Path to backup directory")
    rp.add_argument(
        "--databases", nargs="+",
        choices=list(DATABASE_REGISTRY.keys()),
        help="Specific databases to restore",
    )
    rp.add_argument("--no-verify", action="store_true", help="Skip checksum verification")

    # list
    sub.add_parser("list", help="List available backups")

    # info
    ip = sub.add_parser("info", help="Get backup details")
    ip.add_argument("backup_path", type=str, help="Path to backup directory")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return 1

    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )

    # getattr/hasattr guards: not every subparser defines these options.
    mdb = MultiDatabaseBackup(
        backup_dir=Path(args.backup_dir) if hasattr(args, "backup_dir") and args.backup_dir else None,
        encrypt=getattr(args, "encrypt", False),
        key_file=Path(args.key_file) if hasattr(args, "key_file") and getattr(args, "key_file") else None,
        passphrase=getattr(args, "passphrase", None),
        compress=getattr(args, "compress", False),
        compress_algo=getattr(args, "compress_algo", "gzip"),
    )

    if args.command == "backup":
        if args.tier:
            tier_map = {
                "critical": DatabaseTier.CRITICAL,
                "regenerable_index": DatabaseTier.REGENERABLE_INDEX,
                "regenerable_session": DatabaseTier.REGENERABLE_SESSION,
                "configuration": DatabaseTier.CONFIGURATION,
                "messaging": DatabaseTier.MESSAGING,
            }
            result = mdb.backup_tier(tier_map[args.tier])
        elif args.databases:
            result = mdb.backup_all(args.databases, incremental=args.incremental)
        else:
            result = mdb.backup_all(incremental=args.incremental)
        print(json.dumps(result, indent=2, default=str))

    elif args.command == "verify":
        result = mdb.verify_backup(Path(args.backup_path))
        print(json.dumps(result, indent=2))
        return 0 if result["passed"] else 1

    elif args.command == "restore":
        result = mdb.restore_all(
            Path(args.backup_path),
            db_names=args.databases,
            verify_first=not args.no_verify,
        )
        print(json.dumps(result, indent=2, default=str))
        return 0 if result["status"] == "ok" else 1

    elif args.command == "list":
        backups = mdb.list_backups()
        if not backups:
            print("No multi-database backups found.")
            return 0
        for b in backups:
            dbs = ", ".join(b.get("backed_up", []))
            print(
                f" {b.get('backup_name', 'unknown'):40s} "
                f"{b.get('total_size_mb', 0):8.3f} MB "
                f"[{dbs}]"
            )

    elif args.command == "info":
        info = mdb.get_backup_info(Path(args.backup_path))
        print(json.dumps(info, indent=2, default=str))

    return 0
# BUG FIX: mangled source had `if name == "main"`; the correct idiom uses
# the module dunder. `main() or 0` maps a None return to exit code 0.
if __name__ == "__main__":
    sys.exit(main() or 0)