#!/usr/bin/env python3
"""
CODITECT Backup Compression — gzip and zstd

Provides compression/decompression for backup files using gzip (stdlib)
or zstd (optional, via python-zstandard).

Supported algorithms:
    - gzip: Standard library, level 1-9 (default 6), good compatibility
    - zstd: Via python-zstandard, level 1-22 (default 3), ~2-5x faster, better ratio

File naming:
    - .gz for gzip-compressed files
    - .zst for zstd-compressed files

Compose with encryption (J.20.1.6):
    Pipeline: plaintext → compress → encrypt
    Result:   file.db → file.db.gz → file.db.gz.enc
    Restore:  decrypt → decompress → plaintext

Author: AZ1.AI CODITECT Team
Task: J.20.1.7
Date: 2026-02-10
"""

import gzip
import io
import logging
import os
import shutil
import tarfile
from pathlib import Path
from typing import Optional, Tuple

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Chunk size for streaming compression
CHUNK_SIZE = 1024 * 1024  # 1MB

# Default compression levels
GZIP_DEFAULT_LEVEL = 6  # gzip range: 1-9
ZSTD_DEFAULT_LEVEL = 3  # zstd range: 1-22

class CompressionError(Exception):
    """Base exception for compression errors."""

class DecompressionError(CompressionError):
    """Decompression failed."""

class UnsupportedAlgorithmError(CompressionError):
    """Requested compression algorithm not available."""

def _get_zstd():
    """
    Import and return the ``zstandard`` module.

    The import happens inside the function, so the module is only required
    when a zstd code path is actually taken.

    Returns:
        The imported ``zstandard`` module.

    Raises:
        UnsupportedAlgorithmError: If ``zstandard`` cannot be imported.
    """
    try:
        import zstandard
    except ImportError as exc:
        # Chain the ImportError so the underlying reason stays in the traceback.
        raise UnsupportedAlgorithmError(
            "zstd compression requires python-zstandard. "
            "Install with: pip install zstandard"
        ) from exc
    return zstandard

def detect_algorithm(path: Path) -> Optional[str]:
    """
    Detect compression algorithm from file extension.

    Only the final component of ``path`` is inspected, and the match is
    case-sensitive.

    Args:
        path: Path whose file name is checked for a ``.gz`` or ``.zst`` ending.

    Returns:
        "gzip", "zstd", or None if not compressed
    """
    name = path.name
    if name.endswith(".gz"):
        return "gzip"
    if name.endswith(".zst"):
        return "zstd"
    return None

def compressed_suffix(algorithm: str) -> str:
    """
    Return file suffix for the given algorithm.

    Args:
        algorithm: "gzip" or "zstd"

    Returns:
        ".gz" for gzip, ".zst" for zstd.

    Raises:
        UnsupportedAlgorithmError: If ``algorithm`` is neither "gzip" nor "zstd".
    """
    if algorithm == "gzip":
        return ".gz"
    if algorithm == "zstd":
        return ".zst"
    raise UnsupportedAlgorithmError(f"Unknown algorithm: {algorithm}")

def compress_file(
    input_path: Path,
    output_path: Optional[Path] = None,
    algorithm: str = "gzip",
    level: Optional[int] = None,
) -> dict:
    """
    Compress a file.

    The whole input file is read into memory and compressed in a single
    call; the result is then written to ``output_path``.

    Args:
        input_path: Path to uncompressed file
        output_path: Path to write compressed file (auto-generated if None:
            the input name plus ".gz" or ".zst", in the same directory)
        algorithm: "gzip" or "zstd"
        level: Compression level (algorithm-specific); the module default
            for the algorithm is used when None

    Returns:
        Dict with compression metadata: "original_size", "compressed_size",
        "ratio" (original / compressed, rounded to 2 places; 0 if the
        compressed size is 0), "algorithm" and the effective "level".

    Raises:
        UnsupportedAlgorithmError: If ``algorithm`` is unknown, or it is
            "zstd" and python-zstandard is not installed.
    """
    # Resolve the level and validate the algorithm up front so that an
    # unknown algorithm or a missing zstandard install fails before any
    # file I/O happens.
    if algorithm == "gzip":
        comp_level = level if level is not None else GZIP_DEFAULT_LEVEL
    elif algorithm == "zstd":
        comp_level = level if level is not None else ZSTD_DEFAULT_LEVEL
        zstd = _get_zstd()
    else:
        raise UnsupportedAlgorithmError(f"Unknown algorithm: {algorithm}")

    if output_path is None:
        output_path = input_path.parent / (input_path.name + compressed_suffix(algorithm))

    data = input_path.read_bytes()
    # Measure the bytes that were actually compressed.
    original_size = len(data)

    if algorithm == "gzip":
        compressed = gzip.compress(data, compresslevel=comp_level)
    else:
        compressed = zstd.ZstdCompressor(level=comp_level).compress(data)

    output_path.write_bytes(compressed)
    compressed_size = len(compressed)

    ratio = original_size / compressed_size if compressed_size > 0 else 0
    # `is not None` rather than truthiness: an explicit level of 0 must not
    # be reported as "default".
    level_label = level if level is not None else 'default'
    logger.info(
        f"Compressed {input_path.name}: {original_size:,} → {compressed_size:,} bytes "
        f"({ratio:.1f}x, {algorithm} level {level_label})"
    )

    return {
        "original_size": original_size,
        "compressed_size": compressed_size,
        "ratio": round(ratio, 2),
        "algorithm": algorithm,
        "level": comp_level,
    }

def decompress_file(
    input_path: Path,
    output_path: Optional[Path] = None,
    algorithm: Optional[str] = None,
) -> dict:
    """
    Decompress a file.

    The whole compressed file is read into memory and decompressed in a
    single call; the result is then written to ``output_path``.

    Args:
        input_path: Path to compressed file
        output_path: Path to write decompressed file (auto-strips suffix if
            None; if the name does not end with the algorithm's suffix,
            ".decompressed" is appended instead)
        algorithm: "gzip" or "zstd" (auto-detected from extension if None)

    Returns:
        Dict with decompression metadata: "compressed_size" (size of the
        input file on disk), "decompressed_size" and "algorithm".

    Raises:
        DecompressionError: If the algorithm cannot be detected from the
            extension, or the data cannot be decompressed.
        UnsupportedAlgorithmError: If ``algorithm`` is unknown, or it is
            "zstd" and python-zstandard is not installed.
    """
    if algorithm is None:
        algorithm = detect_algorithm(input_path)
        if algorithm is None:
            raise DecompressionError(
                f"Cannot detect compression from extension: {input_path.name}. "
                "Specify algorithm explicitly."
            )

    # Pick the decoder before touching the file so configuration errors
    # (unknown algorithm, zstandard not installed) surface directly and are
    # never mistaken for corrupt data.
    if algorithm == "gzip":
        decode = gzip.decompress
    elif algorithm == "zstd":
        decode = _get_zstd().ZstdDecompressor().decompress
    else:
        raise UnsupportedAlgorithmError(f"Unknown algorithm: {algorithm}")

    if output_path is None:
        suffix = compressed_suffix(algorithm)
        if input_path.name.endswith(suffix):
            output_path = input_path.parent / input_path.name[:-len(suffix)]
        else:
            output_path = input_path.parent / (input_path.name + ".decompressed")

    compressed_size = input_path.stat().st_size
    data = input_path.read_bytes()

    try:
        decompressed = decode(data)
    except Exception as e:
        # Any decoder failure is reported uniformly; the cause is chained so
        # the original traceback is preserved.
        raise DecompressionError(f"Decompression failed ({algorithm}): {e}") from e

    output_path.write_bytes(decompressed)
    original_size = len(decompressed)

    logger.info(
        f"Decompressed {input_path.name}: {compressed_size:,} → {original_size:,} bytes"
    )

    return {
        "compressed_size": compressed_size,
        "decompressed_size": original_size,
        "algorithm": algorithm,
    }

def compress_directory(
    input_dir: Path,
    output_path: Optional[Path] = None,
    algorithm: str = "gzip",
    level: Optional[int] = None,
) -> dict:
    """
    Tar and compress a directory.

    An uncompressed tar of ``input_dir`` is built in memory (archive entries
    are rooted at the directory's own name), compressed in a single call, and
    written to ``output_path``.

    Args:
        input_dir: Directory to compress
        output_path: Output path (default: input_dir.tar.gz or .tar.zst,
            placed next to ``input_dir``)
        algorithm: "gzip" or "zstd"
        level: Compression level; the module default for the algorithm is
            used when None

    Returns:
        Dict with compression metadata: "tar_size" (uncompressed tar bytes),
        "compressed_size", "ratio" (tar / compressed, rounded to 2 places;
        0 if the compressed size is 0), "algorithm" and the effective "level".

    Raises:
        UnsupportedAlgorithmError: If ``algorithm`` is unknown, or it is
            "zstd" and python-zstandard is not installed.
    """
    # Validate the algorithm and resolve the level before building the tar,
    # so a bad argument fails immediately instead of after archiving the
    # whole directory in memory.
    if algorithm == "gzip":
        comp_level = level if level is not None else GZIP_DEFAULT_LEVEL
        ext = ".tar.gz"
    elif algorithm == "zstd":
        comp_level = level if level is not None else ZSTD_DEFAULT_LEVEL
        ext = ".tar.zst"
        zstd = _get_zstd()
    else:
        raise UnsupportedAlgorithmError(f"Unknown algorithm: {algorithm}")

    if output_path is None:
        output_path = input_dir.parent / (input_dir.name + ext)

    # Create tar in memory (uncompressed)
    tar_buffer = io.BytesIO()
    with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
        tar.add(str(input_dir), arcname=input_dir.name)
    tar_data = tar_buffer.getvalue()
    tar_size = len(tar_data)

    # Compress the tarball
    if algorithm == "gzip":
        compressed = gzip.compress(tar_data, compresslevel=comp_level)
    else:
        compressed = zstd.ZstdCompressor(level=comp_level).compress(tar_data)

    output_path.write_bytes(compressed)
    compressed_size = len(compressed)

    ratio = tar_size / compressed_size if compressed_size > 0 else 0
    logger.info(
        f"Compressed directory {input_dir.name}: "
        f"{tar_size:,} (tar) → {compressed_size:,} bytes ({ratio:.1f}x)"
    )

    return {
        "tar_size": tar_size,
        "compressed_size": compressed_size,
        "ratio": round(ratio, 2),
        "algorithm": algorithm,
        "level": comp_level,
    }

def decompress_directory(
    input_path: Path,
    output_dir: Path,
    algorithm: Optional[str] = None,
) -> dict:
    """
    Decompress and extract a tar archive.

    The compressed archive is read and decompressed fully in memory, then
    extracted into ``output_dir`` (created if missing). Members with an
    absolute name or a ".." path component are skipped with a warning.

    Args:
        input_path: Compressed tar archive (.tar.gz or .tar.zst)
        output_dir: Directory to extract into
        algorithm: "gzip" or "zstd" (auto-detected if None)

    Returns:
        Dict with decompression metadata: "compressed_size" (size of the
        input file on disk), "tar_size" (decompressed tar bytes),
        "extracted_to" and "algorithm".

    Raises:
        DecompressionError: If the algorithm cannot be detected from the
            file name, or the data cannot be decompressed.
        UnsupportedAlgorithmError: If ``algorithm`` is unknown, or it is
            "zstd" and python-zstandard is not installed.
        tarfile.TarError: If the decompressed data is not a readable tar
            archive, or (where extraction filters are available) a member
            is rejected by the "data" filter.
    """
    if algorithm is None:
        name = input_path.name
        if name.endswith(".tar.gz"):
            algorithm = "gzip"
        elif name.endswith(".tar.zst"):
            algorithm = "zstd"
        else:
            raise DecompressionError(
                f"Cannot detect compression from: {name}. "
                "Expected .tar.gz or .tar.zst"
            )

    # Pick the decoder before touching the file so configuration errors
    # surface directly and are never mistaken for corrupt data.
    if algorithm == "gzip":
        decode = gzip.decompress
    elif algorithm == "zstd":
        decode = _get_zstd().ZstdDecompressor().decompress
    else:
        raise UnsupportedAlgorithmError(f"Unknown algorithm: {algorithm}")

    compressed_size = input_path.stat().st_size
    data = input_path.read_bytes()

    # Decompress
    try:
        tar_data = decode(data)
    except Exception as e:
        raise DecompressionError(f"Decompression failed ({algorithm}): {e}") from e

    # Extract tar
    output_dir.mkdir(parents=True, exist_ok=True)
    tar_buffer = io.BytesIO(tar_data)
    with tarfile.open(fileobj=tar_buffer, mode="r") as tar:
        safe_members = []
        for member in tar.getmembers():
            # Check ".." as a whole path component, not as a substring:
            # a substring test would silently drop legitimate files such
            # as "dump..bak" from the restore.
            parts = member.name.replace("\\", "/").split("/")
            is_absolute = member.name.startswith("/") or os.path.isabs(member.name)
            if is_absolute or ".." in parts:
                logger.warning(f"Skipping unsafe tar member: {member.name}")
                continue
            safe_members.append(member)

        # NOTE(review): the name check above does not cover symlink/hardlink
        # targets. Where the running Python provides extraction filters,
        # the "data" filter rejects links that escape the destination; it
        # also normalises some permission bits, so confirm that is
        # acceptable for restored backups.
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=str(output_dir), members=safe_members, filter="data")
        else:
            tar.extractall(path=str(output_dir), members=safe_members)

    logger.info(
        f"Decompressed archive {input_path.name}: "
        f"{compressed_size:,} → {len(tar_data):,} bytes (tar)"
    )

    return {
        "compressed_size": compressed_size,
        "tar_size": len(tar_data),
        "extracted_to": str(output_dir),
        "algorithm": algorithm,
    }