#!/usr/bin/env python3
"""CODITECT GCS Backup Client — Google Cloud Storage Integration.

Provides a Python SDK-based GCS client for backup upload, download,
lifecycle management, cross-region replication, and access logging.
Replaces/complements the shell-based gsutil approach in
backup-context-db.sh with a proper google-cloud-storage SDK client.

J.20.2 Subtasks:
    J.20.2.1: GCS client configuration (auth, project, bucket)
    J.20.2.2: Multipart/resumable upload for large databases
    J.20.2.3: Bucket lifecycle policies (GFS retention)
    J.20.2.4: Cross-region replication support
    J.20.2.5: Access logging integration

Usage:
    from backup_gcs import GCSBackupClient

    client = GCSBackupClient()
    client.upload_backup(local_dir, tier="daily")
    client.download_backup("latest", dest_dir)
    client.apply_lifecycle_policies()

Author: AZ1.AI CODITECT Team
Date: 2026-02-10
"""
import json
import logging
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Module-level logger. Fix: the original passed the undefined name `name`
# (NameError at import time); the conventional key is __name__.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------


class GCSError(Exception):
    """Base exception for GCS operations."""


class GCSAuthError(GCSError):
    """Authentication or credential error."""


class GCSBucketError(GCSError):
    """Bucket creation/access error."""


class GCSUploadError(GCSError):
    """Upload failure."""


class GCSDownloadError(GCSError):
    """Download failure."""
# ---------------------------------------------------------------------------
# J.20.2.1 — GCS Client Configuration
# ---------------------------------------------------------------------------

# Default GFS lifecycle retention (days)
GFS_DAILY_RETENTION = 7
GFS_WEEKLY_RETENTION = 28
GFS_MONTHLY_RETENTION = 365

# Default bucket location
DEFAULT_LOCATION = "us-central1"

# Resumable upload threshold (bytes) — SDK default is 8MB
RESUMABLE_THRESHOLD = 8 * 1024 * 1024  # 8 MB

# Multipart chunk size for large uploads
UPLOAD_CHUNK_SIZE = 32 * 1024 * 1024  # 32 MB chunks
def _detect_gcp_project() -> Optional[str]: """Detect GCP project ID from environment or gcloud CLI.""" # 1. Explicit env var project = os.environ.get("CODITECT_GCP_PROJECT") or os.environ.get("GOOGLE_CLOUD_PROJECT") if project: return project
# 2. gcloud CLI
try:
result = subprocess.run(
["gcloud", "config", "get-value", "project"],
capture_output=True, text=True, timeout=10,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
return None
def _get_storage_client():
    """Create an authenticated ``google.cloud.storage.Client``.

    Credential resolution order:
        1. GOOGLE_APPLICATION_CREDENTIALS env var (service account JSON)
        2. Application Default Credentials (gcloud auth application-default login)
        3. Compute Engine / GKE metadata server

    Returns:
        A ``google.cloud.storage.Client`` instance.

    Raises:
        GCSError: If google-cloud-storage is not installed.
        GCSAuthError: If client construction/authentication fails.
    """
    try:
        from google.cloud import storage
    except ImportError as e:
        raise GCSError(
            "google-cloud-storage not installed. "
            "Run: pip install google-cloud-storage>=2.0.0"
        ) from e
    try:
        project = _detect_gcp_project()
        return storage.Client(project=project)
    except Exception as e:
        # Chain the cause so the underlying auth error stays visible.
        raise GCSAuthError(
            f"Failed to create GCS client: {e}\n"
            "Ensure you are authenticated: gcloud auth application-default login"
        ) from e
class GCSBackupClient:
    """Google Cloud Storage backup client for CODITECT context databases.

    Supports:
        - Resumable uploads for large files (automatic >8MB)
        - GFS-tiered backup organization (daily/weekly/monthly)
        - Bucket lifecycle policies
        - Cross-region replication
        - Access logging
        - Tenant/project-scoped backup paths
    """

    def __init__(
        self,
        bucket_name: Optional[str] = None,
        prefix: str = "coditect-core",
        location: str = DEFAULT_LOCATION,
        project_scope: Optional[str] = None,
        tenant_scope: Optional[str] = None,
    ):
        """Initialize GCS backup client.

        Args:
            bucket_name: GCS bucket name. If None, auto-generates from GCP project ID.
            prefix: Object prefix within bucket (default: "coditect-core").
            location: Bucket location for creation (default: us-central1).
            project_scope: CODITECT project scope for backup paths (ADR-159).
            tenant_scope: CODITECT tenant scope for backup paths (ADR-159).
        """
        self._client = None  # Lazy initialization — no network at construction time
        self._bucket = None
        self.prefix = prefix
        self.location = location
        self.project_scope = project_scope or os.environ.get("CODITECT_PROJECT")
        self.tenant_scope = tenant_scope or os.environ.get("CODITECT_TENANT")
        # Resolve bucket name: explicit arg > env var > derived from project ID.
        if bucket_name:
            self.bucket_name = bucket_name
        else:
            env_bucket = os.environ.get("BACKUP_BUCKET") or os.environ.get("CODITECT_BACKUP_BUCKET")
            if env_bucket:
                # Strip gs:// prefix if present
                self.bucket_name = env_bucket.replace("gs://", "")
            else:
                project_id = _detect_gcp_project() or "coditect"
                self.bucket_name = f"{project_id}-context-backups"

    @property
    def client(self):
        """Lazy-initialized GCS client."""
        if self._client is None:
            self._client = _get_storage_client()
        return self._client

    @property
    def bucket(self):
        """Get the backup bucket handle, verifying it exists.

        Raises:
            GCSBucketError: If the bucket does not exist.
        """
        if self._bucket is None:
            self._bucket = self.client.bucket(self.bucket_name)
            if not self._bucket.exists():
                raise GCSBucketError(
                    f"Bucket does not exist: gs://{self.bucket_name}\n"
                    f"Run setup_bucket() first, or create manually:\n"
                    f"  gsutil mb -l {self.location} -c STANDARD gs://{self.bucket_name}"
                )
        return self._bucket

    def _get_gfs_prefix(self, tier: str, timestamp: Optional[datetime] = None) -> str:
        """Build GFS-tiered object prefix.

        Args:
            tier: GFS tier ("daily", "weekly", "monthly").
            timestamp: Backup timestamp (default: now UTC).

        Returns:
            Full GCS prefix path (e.g., "coditect-core/daily/2026-02-10/14-30-00").

        Raises:
            ValueError: If tier is not a known GFS tier.
        """
        if timestamp is None:
            timestamp = datetime.now(timezone.utc)
        base = self.prefix
        # Tenant/project scoping (ADR-159): tenant scope takes precedence.
        if self.tenant_scope:
            base = f"{base}/tenants/{self.tenant_scope}"
        elif self.project_scope:
            base = f"{base}/projects/{self.project_scope}"
        if tier == "daily":
            date_str = timestamp.strftime("%Y-%m-%d")
            time_str = timestamp.strftime("%H-%M-%S")
            return f"{base}/daily/{date_str}/{time_str}"
        elif tier == "weekly":
            # %V = ISO week number, matching isoweekday-based promotion.
            week_str = timestamp.strftime("%Y-W%V")
            return f"{base}/weekly/{week_str}"
        elif tier == "monthly":
            month_str = timestamp.strftime("%Y-%m")
            return f"{base}/monthly/{month_str}"
        else:
            raise ValueError(f"Unknown GFS tier: {tier}. Use daily, weekly, or monthly.")

    # -------------------------------------------------------------------
    # J.20.2.2 — Upload / Download
    # -------------------------------------------------------------------
    def upload_backup(
        self,
        local_dir: Path,
        tier: str = "daily",
        timestamp: Optional[datetime] = None,
        promote_weekly: bool = False,
        promote_monthly: bool = False,
        chunk_size: int = UPLOAD_CHUNK_SIZE,
    ) -> Dict[str, Any]:
        """Upload a local backup directory to GCS with GFS tiering.

        Uses resumable uploads for files >8MB (SDK default).
        Custom chunk_size controls memory usage for very large files.

        Args:
            local_dir: Local backup directory to upload.
            tier: GFS tier ("daily", "weekly", "monthly").
            timestamp: Backup timestamp (default: now UTC).
            promote_weekly: Also copy to weekly tier.
            promote_monthly: Also copy to monthly tier.
            chunk_size: Upload chunk size in bytes (default: 32MB).

        Returns:
            Upload result metadata dict.

        Raises:
            GCSUploadError: If upload fails.
        """
        local_dir = Path(local_dir)
        if not local_dir.is_dir():
            raise GCSUploadError(f"Not a directory: {local_dir}")
        if timestamp is None:
            timestamp = datetime.now(timezone.utc)
        gcs_prefix = self._get_gfs_prefix(tier, timestamp)
        uploaded_files = []
        total_bytes = 0
        try:
            # Upload all files in the backup directory (recursively)
            for local_file in sorted(local_dir.rglob("*")):
                if not local_file.is_file():
                    continue
                # Compute relative path for GCS object name
                rel_path = local_file.relative_to(local_dir)
                blob_name = f"{gcs_prefix}/{rel_path}"
                blob = self.bucket.blob(blob_name, chunk_size=chunk_size)
                file_size = local_file.stat().st_size
                logger.info(f"Uploading: {rel_path} ({file_size / 1024:.1f} KB)")
                blob.upload_from_filename(str(local_file))
                uploaded_files.append({
                    "name": str(rel_path),
                    "blob": blob_name,
                    "size": file_size,
                })
                total_bytes += file_size
            # Update LATEST pointer. Fix: store the actual upload prefix
            # relative to self.prefix so download_backup("latest") resolves
            # correctly for weekly/monthly tiers and tenant/project-scoped
            # paths (the original always wrote an unscoped daily-style path).
            latest_blob = self.bucket.blob(f"{self.prefix}/LATEST")
            latest_content = gcs_prefix[len(self.prefix) + 1:]
            latest_blob.upload_from_string(latest_content)
            logger.info(f"Updated LATEST pointer: {latest_content}")
            # GFS promotion: copy daily -> weekly and/or monthly
            promotion_results = []
            if promote_weekly:
                weekly_prefix = self._get_gfs_prefix("weekly", timestamp)
                self._copy_blobs(gcs_prefix, weekly_prefix)
                promotion_results.append("weekly")
                logger.info(f"Promoted to weekly tier: {weekly_prefix}")
            if promote_monthly:
                monthly_prefix = self._get_gfs_prefix("monthly", timestamp)
                self._copy_blobs(gcs_prefix, monthly_prefix)
                promotion_results.append("monthly")
                logger.info(f"Promoted to monthly tier: {monthly_prefix}")
            result = {
                "success": True,
                "bucket": self.bucket_name,
                "prefix": gcs_prefix,
                "tier": tier,
                "timestamp": timestamp.isoformat(),
                "files": len(uploaded_files),
                "total_bytes": total_bytes,
                "total_mb": round(total_bytes / (1024 * 1024), 2),
                "promoted_to": promotion_results,
                "uploaded_files": uploaded_files,
            }
            logger.info(
                f"Upload complete: {len(uploaded_files)} files, "
                f"{total_bytes / (1024 * 1024):.2f} MB to gs://{self.bucket_name}/{gcs_prefix}"
            )
            return result
        except Exception as e:
            raise GCSUploadError(f"Upload failed: {e}") from e

    def _copy_blobs(self, source_prefix: str, dest_prefix: str) -> None:
        """Copy all blobs from one prefix to another (GFS promotion)."""
        for blob in self.bucket.list_blobs(prefix=source_prefix):
            new_name = blob.name.replace(source_prefix, dest_prefix, 1)
            self.bucket.copy_blob(blob, self.bucket, new_name)

    def download_backup(
        self,
        target: str,
        dest_dir: Path,
    ) -> Dict[str, Any]:
        """Download a backup from GCS.

        Args:
            target: "latest" or a GFS path (e.g., "daily/2026-02-10").
            dest_dir: Local directory to download into.

        Returns:
            Download result metadata dict.

        Raises:
            GCSDownloadError: If download fails.
        """
        dest_dir = Path(dest_dir)
        dest_dir.mkdir(parents=True, exist_ok=True)
        try:
            # Resolve "latest" via the LATEST pointer object
            if target == "latest":
                latest_blob = self.bucket.blob(f"{self.prefix}/LATEST")
                if not latest_blob.exists():
                    raise GCSDownloadError("No LATEST pointer found in bucket")
                latest_content = latest_blob.download_as_text().strip()
                gcs_prefix = f"{self.prefix}/{latest_content}"
                logger.info(f"Resolved 'latest' to: {gcs_prefix}")
            else:
                gcs_prefix = f"{self.prefix}/{target}"
            # List and download all blobs under prefix
            blobs = list(self.bucket.list_blobs(prefix=gcs_prefix))
            if not blobs:
                raise GCSDownloadError(f"No objects found at: gs://{self.bucket_name}/{gcs_prefix}")
            downloaded_files = []
            total_bytes = 0
            for blob in blobs:
                # Strip prefix to get relative path
                rel_path = blob.name[len(gcs_prefix):].lstrip("/")
                if not rel_path:
                    continue
                local_path = dest_dir / rel_path
                local_path.parent.mkdir(parents=True, exist_ok=True)
                logger.info(f"Downloading: {rel_path}")
                blob.download_to_filename(str(local_path))
                file_size = local_path.stat().st_size
                downloaded_files.append({
                    "name": rel_path,
                    "size": file_size,
                })
                total_bytes += file_size
            result = {
                "success": True,
                "bucket": self.bucket_name,
                "prefix": gcs_prefix,
                "dest_dir": str(dest_dir),
                "files": len(downloaded_files),
                "total_bytes": total_bytes,
                "total_mb": round(total_bytes / (1024 * 1024), 2),
                "downloaded_files": downloaded_files,
            }
            logger.info(
                f"Download complete: {len(downloaded_files)} files, "
                f"{total_bytes / (1024 * 1024):.2f} MB"
            )
            return result
        except GCSDownloadError:
            raise
        except Exception as e:
            raise GCSDownloadError(f"Download failed: {e}") from e

    def list_backups(self, tier: Optional[str] = None, limit: int = 20) -> List[Dict[str, Any]]:
        """List available backups in the bucket.

        Args:
            tier: Filter by GFS tier ("daily", "weekly", "monthly"). None = all.
            limit: Maximum number of results.

        Returns:
            List of backup info dicts, newest first.
        """
        results = []
        base = self.prefix
        if self.tenant_scope:
            base = f"{base}/tenants/{self.tenant_scope}"
        elif self.project_scope:
            base = f"{base}/projects/{self.project_scope}"
        tiers = [tier] if tier else ["daily", "weekly", "monthly"]
        for t in tiers:
            prefix = f"{base}/{t}/"
            # List top-level "directories" under tier
            seen_dates = set()
            for blob in self.bucket.list_blobs(prefix=prefix):
                # Extract date portion from path
                parts = blob.name[len(prefix):].split("/")
                if parts:
                    date_key = parts[0]
                    if date_key not in seen_dates:
                        seen_dates.add(date_key)
                        results.append({
                            "tier": t,
                            "date": date_key,
                            "prefix": f"{prefix}{date_key}",
                        })
                        if len(results) >= limit:
                            break
        # Sort by date descending, limit
        results.sort(key=lambda x: x["date"], reverse=True)
        return results[:limit]

    def get_latest_info(self) -> Optional[Dict[str, str]]:
        """Get the LATEST pointer info, or None if no pointer exists."""
        latest_blob = self.bucket.blob(f"{self.prefix}/LATEST")
        if latest_blob.exists():
            content = latest_blob.download_as_text().strip()
            return {"pointer": content, "bucket": self.bucket_name}
        return None

    # -------------------------------------------------------------------
    # J.20.2.3 — Bucket Lifecycle Policies (GFS Retention)
    # -------------------------------------------------------------------
    def apply_lifecycle_policies(
        self,
        daily_days: int = GFS_DAILY_RETENTION,
        weekly_days: int = GFS_WEEKLY_RETENTION,
        monthly_days: int = GFS_MONTHLY_RETENTION,
    ) -> Dict[str, Any]:
        """Apply GFS lifecycle retention policies to the bucket.

        Grandfather-Father-Son (GFS) retention:
            - Daily (sons): deleted after daily_days (default: 7)
            - Weekly (fathers): deleted after weekly_days (default: 28)
            - Monthly (grandfathers): deleted after monthly_days (default: 365)

        Args:
            daily_days: Daily backup retention in days.
            weekly_days: Weekly backup retention in days.
            monthly_days: Monthly backup retention in days.

        Returns:
            Applied policy metadata.
        """
        bucket = self.bucket
        bucket.lifecycle_rules = []  # Clear existing rules
        # One delete rule per GFS tier, scoped by object prefix.
        bucket.add_lifecycle_delete_rule(
            age=daily_days,
            matches_prefix=[f"{self.prefix}/daily/"],
        )
        bucket.add_lifecycle_delete_rule(
            age=weekly_days,
            matches_prefix=[f"{self.prefix}/weekly/"],
        )
        bucket.add_lifecycle_delete_rule(
            age=monthly_days,
            matches_prefix=[f"{self.prefix}/monthly/"],
        )
        bucket.patch()
        result = {
            "bucket": self.bucket_name,
            "policies": {
                "daily": {"retention_days": daily_days, "prefix": f"{self.prefix}/daily/"},
                "weekly": {"retention_days": weekly_days, "prefix": f"{self.prefix}/weekly/"},
                "monthly": {"retention_days": monthly_days, "prefix": f"{self.prefix}/monthly/"},
            },
            "total_rules": len(list(bucket.lifecycle_rules)),
        }
        logger.info(
            f"Applied GFS lifecycle: daily={daily_days}d, "
            f"weekly={weekly_days}d, monthly={monthly_days}d"
        )
        return result

    def get_lifecycle_policies(self) -> List[Dict[str, Any]]:
        """Get current bucket lifecycle policies as plain dicts."""
        bucket = self.bucket
        bucket.reload()
        return [dict(rule) for rule in bucket.lifecycle_rules]

    # -------------------------------------------------------------------
    # J.20.2.4 — Cross-Region Replication Support
    # -------------------------------------------------------------------
    def setup_bucket(
        self,
        location: Optional[str] = None,
        storage_class: str = "STANDARD",
        dual_region: Optional[Tuple[str, str]] = None,
        versioning: bool = False,
    ) -> Dict[str, Any]:
        """Create and configure the backup bucket.

        Args:
            location: Bucket location (default: us-central1).
                For cross-region, use multi-region like "US" or "EU".
            storage_class: Storage class (STANDARD, NEARLINE, COLDLINE, ARCHIVE).
            dual_region: Tuple of two regions for dual-region bucket
                (e.g., ("us-central1", "us-east1")). Requires location="US".
            versioning: Enable object versioning for extra protection.

        Returns:
            Bucket configuration metadata.
        """
        loc = location or self.location
        bucket_obj = self.client.bucket(self.bucket_name)
        already_existed = bucket_obj.exists()
        if already_existed:
            logger.info(f"Bucket already exists: gs://{self.bucket_name}")
            bucket_obj.reload()
        else:
            logger.info(f"Creating bucket: gs://{self.bucket_name}")
            bucket_obj.storage_class = storage_class
            if dual_region:
                # Dual-region requires a multi-region parent (US, EU, ASIA).
                # NOTE(review): dict shape for custom_placement_config assumed
                # to match the SDK setter — verify against the installed
                # google-cloud-storage version.
                bucket_obj.custom_placement_config = {
                    "data_locations": list(dual_region),
                }
                # Dual-region buckets must use a multi-region location
                if loc not in ("US", "EU", "ASIA"):
                    loc = "US"
                logger.info(f"Dual-region: {dual_region[0]} + {dual_region[1]}")
            bucket_obj = self.client.create_bucket(
                bucket_obj,
                location=loc,
            )
            logger.info(f"Bucket created: gs://{self.bucket_name} in {loc}")
        # Enable versioning if requested
        if versioning and not bucket_obj.versioning_enabled:
            bucket_obj.versioning_enabled = True
            bucket_obj.patch()
            logger.info("Object versioning enabled")
        # Cache the bucket
        self._bucket = bucket_obj
        result = {
            "bucket": self.bucket_name,
            "location": bucket_obj.location,
            "location_type": bucket_obj.location_type,
            "storage_class": bucket_obj.storage_class,
            "versioning": bucket_obj.versioning_enabled,
            # Fix: original always reported True, even for pre-existing buckets.
            "created": not already_existed,
        }
        if dual_region:
            result["dual_region"] = list(dual_region)
        return result

    # -------------------------------------------------------------------
    # J.20.2.5 — Access Logging Integration
    # -------------------------------------------------------------------
    def enable_access_logging(
        self,
        log_bucket: Optional[str] = None,
        log_prefix: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Enable GCS access logging for the backup bucket.

        Access logs record all read/write operations on the bucket,
        useful for audit trails and compliance.

        Args:
            log_bucket: Bucket to store access logs. If None, uses the same bucket.
            log_prefix: Prefix for log objects (default: "access-logs/").

        Returns:
            Logging configuration metadata.
        """
        bucket = self.bucket
        target_bucket = log_bucket or self.bucket_name
        target_prefix = log_prefix or "access-logs/"
        # Fix: the SDK keyword is `object_prefix`; the original passed
        # `object_name_prefix`, which raises TypeError at runtime.
        bucket.enable_logging(target_bucket, object_prefix=target_prefix)
        bucket.patch()
        result = {
            "bucket": self.bucket_name,
            "logging_enabled": True,
            "log_bucket": target_bucket,
            "log_prefix": target_prefix,
        }
        logger.info(
            f"Access logging enabled: gs://{target_bucket}/{target_prefix}"
        )
        return result

    def disable_access_logging(self) -> Dict[str, Any]:
        """Disable GCS access logging."""
        bucket = self.bucket
        bucket.disable_logging()
        bucket.patch()
        logger.info("Access logging disabled")
        return {"bucket": self.bucket_name, "logging_enabled": False}

    def get_logging_config(self) -> Optional[Dict[str, str]]:
        """Get current access logging configuration, or None if disabled."""
        bucket = self.bucket
        bucket.reload()
        # Single call instead of the original's double lookup.
        return bucket.get_logging() or None

    # -------------------------------------------------------------------
    # Utility methods
    # -------------------------------------------------------------------
    def get_bucket_info(self) -> Dict[str, Any]:
        """Get comprehensive bucket information."""
        bucket = self.bucket
        bucket.reload()
        info = {
            "name": bucket.name,
            "location": bucket.location,
            "location_type": bucket.location_type,
            "storage_class": bucket.storage_class,
            "versioning": bucket.versioning_enabled,
            "lifecycle_rules": len(list(bucket.lifecycle_rules)),
            "logging": bucket.get_logging(),
            "created": bucket.time_created.isoformat() if bucket.time_created else None,
        }
        return info

    def compute_gfs_tier(self, timestamp: Optional[datetime] = None) -> Dict[str, Any]:
        """Determine which GFS tiers a backup should be promoted to.

        Rules:
            - Every backup goes to daily tier
            - Sunday (day_of_week=7) also promotes to weekly
            - Last day of month also promotes to monthly

        Args:
            timestamp: The backup timestamp (default: now UTC).

        Returns:
            Dict with tier info and promotion flags.
        """
        if timestamp is None:
            timestamp = datetime.now(timezone.utc)
        day_of_week = timestamp.isoweekday()  # 1=Mon, 7=Sun
        # Check if last day of month
        import calendar
        _, last_day = calendar.monthrange(timestamp.year, timestamp.month)
        is_last_day = timestamp.day == last_day
        return {
            "timestamp": timestamp.isoformat(),
            "tier": "daily",
            "promote_weekly": day_of_week == 7,
            "promote_monthly": is_last_day,
            "day_of_week": day_of_week,
            "day_of_month": timestamp.day,
            "last_day_of_month": last_day,
        }
# ---------------------------------------------------------------------------
# CLI interface
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI interface for GCS backup operations."""
    import argparse

    parser = argparse.ArgumentParser(
        description="CODITECT GCS Backup Client"
    )
    parser.add_argument(
        "command",
        choices=["upload", "download", "list", "setup", "lifecycle", "info", "logging"],
        help="Command to execute",
    )
    parser.add_argument(
        "--local-dir",
        type=str,
        help="Local directory for upload/download",
    )
    parser.add_argument(
        "--target",
        type=str,
        default="latest",
        help="Backup target for download (default: latest)",
    )
    parser.add_argument(
        "--tier",
        type=str,
        choices=["daily", "weekly", "monthly"],
        default=None,
        help="GFS tier filter",
    )
    parser.add_argument(
        "--bucket",
        type=str,
        default=None,
        help="Override bucket name",
    )
    parser.add_argument(
        "--location",
        type=str,
        default=DEFAULT_LOCATION,
        help=f"Bucket location (default: {DEFAULT_LOCATION})",
    )
    parser.add_argument(
        "--dual-region",
        type=str,
        nargs=2,
        default=None,
        metavar=("REGION1", "REGION2"),
        help="Dual-region bucket (e.g., us-central1 us-east1)",
    )
    parser.add_argument(
        "--versioning",
        action="store_true",
        help="Enable object versioning on bucket",
    )
    parser.add_argument(
        "--project-scope",
        type=str,
        default=None,
        help="CODITECT project scope (ADR-159)",
    )
    parser.add_argument(
        "--tenant-scope",
        type=str,
        default=None,
        help="CODITECT tenant scope (ADR-159)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=20,
        help="Limit list results (default: 20)",
    )
    parser.add_argument(
        "--log-bucket",
        type=str,
        default=None,
        help="Bucket for access logs (default: same bucket)",
    )
    parser.add_argument(
        "--enable",
        action="store_true",
        help="Enable logging (with logging command)",
    )
    parser.add_argument(
        "--disable",
        action="store_true",
        help="Disable logging (with logging command)",
    )
    # Kept for CLI compatibility; output is always JSON (the original's
    # --json and default branches printed identical output).
    parser.add_argument(
        "--json",
        action="store_true",
        dest="output_json",
        help="Output as JSON",
    )
    args = parser.parse_args()

    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    client = GCSBackupClient(
        bucket_name=args.bucket,
        project_scope=args.project_scope,
        tenant_scope=args.tenant_scope,
    )
    try:
        if args.command == "upload":
            if not args.local_dir:
                parser.error("--local-dir required for upload")
            # Auto-compute GFS promotion flags from today's date.
            tier_info = client.compute_gfs_tier()
            result = client.upload_backup(
                Path(args.local_dir),
                tier="daily",
                promote_weekly=tier_info["promote_weekly"],
                promote_monthly=tier_info["promote_monthly"],
            )
        elif args.command == "download":
            if not args.local_dir:
                parser.error("--local-dir required for download")
            result = client.download_backup(args.target, Path(args.local_dir))
        elif args.command == "list":
            result = client.list_backups(tier=args.tier, limit=args.limit)
        elif args.command == "setup":
            dual_region = tuple(args.dual_region) if args.dual_region else None
            result = client.setup_bucket(
                location=args.location,
                dual_region=dual_region,
                versioning=args.versioning,
            )
        elif args.command == "lifecycle":
            result = client.apply_lifecycle_policies()
        elif args.command == "info":
            result = client.get_bucket_info()
        elif args.command == "logging":
            if args.disable:
                result = client.disable_access_logging()
            elif args.enable:
                result = client.enable_access_logging(log_bucket=args.log_bucket)
            else:
                result = client.get_logging_config()
                if result is None:
                    result = {"logging_enabled": False}
        print(json.dumps(result, indent=2, default=str))
    except GCSError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


# Fix: original guard `if name == "main"` referenced undefined names and
# never fired; the standard entry-point idiom is required here.
if __name__ == "__main__":
    main()