#!/usr/bin/env python3
"""
CODITECT Backup Encryption — AES-256-GCM at Rest

Provides encryption/decryption for backup files using AES-256-GCM with
PBKDF2-HMAC-SHA256 key derivation.

File format (.enc):
    Magic:   b"CODI" (4 bytes)
    Version: 1 (1 byte)
    Salt:    32 bytes (PBKDF2 salt; all zero bytes when a raw key is used)
    Nonce:   12 bytes (GCM nonce)
    Body:    ciphertext (variable length)
    Tag:     16 bytes (GCM auth tag, appended to the ciphertext)

Key sources (in priority order):
    1. CODITECT_BACKUP_KEY env var (hex-encoded 32-byte key)
    2. Key file path (--key-file)
    3. Passphrase derivation (interactive or env CODITECT_BACKUP_PASSPHRASE)

Author: AZ1.AI CODITECT Team
Task: J.20.1.6
Date: 2026-02-10
"""
import io import os import struct import tarfile import logging from pathlib import Path from typing import Optional, Tuple
from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives import hashes
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Constants
MAGIC = b"CODI"  # File signature written at the start of every encrypted file
FORMAT_VERSION = 1  # Stored as a single unsigned byte after MAGIC
SALT_SIZE = 32  # PBKDF2 salt length in bytes
NONCE_SIZE = 12  # GCM nonce length in bytes
TAG_SIZE = 16  # GCM authentication tag length in bytes
KEY_SIZE = 32  # 256 bits
PBKDF2_ITERATIONS = 600_000
HEADER_SIZE = len(MAGIC) + 1 + SALT_SIZE + NONCE_SIZE  # 49 bytes
# 64 MiB. NOTE(review): the original comment called this the "GCM max
# plaintext per operation", but AES-GCM's per-message limit is far larger
# (roughly 64 GiB per NIST SP 800-38D). Presumably an intended chunk size;
# nothing in the code visible here reads it — confirm before relying on it.
CHUNK_SIZE = 64 * 1024 * 1024
class BackupCryptoError(Exception):
    """Root of the backup-encryption exception hierarchy."""


class KeyNotFoundError(BackupCryptoError):
    """Raised when no encryption key source is available."""


class DecryptionError(BackupCryptoError):
    """Raised when decryption fails (wrong key, corrupted or tampered data)."""
def derive_key(passphrase: str, salt: bytes) -> bytes:
    """
    Stretch a passphrase into a 256-bit key with PBKDF2-HMAC-SHA256.

    Args:
        passphrase: User passphrase (encoded as UTF-8 before derivation)
        salt: PBKDF2 salt (32 random bytes when generated by this module)

    Returns:
        32-byte derived key
    """
    stretcher = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=KEY_SIZE,
        salt=salt,
        iterations=PBKDF2_ITERATIONS,
    )
    passphrase_bytes = passphrase.encode("utf-8")
    return stretcher.derive(passphrase_bytes)
def _parse_hex_key(hex_text: str, source: str) -> bytes:
    """
    Decode a hex-encoded key and verify it is KEY_SIZE bytes long.

    Args:
        hex_text: Hex string to decode
        source: Label for the key source, used in the error message

    Returns:
        The decoded key bytes

    Raises:
        BackupCryptoError: If the text is not valid hex or has the wrong length
    """
    try:
        key = bytes.fromhex(hex_text)
        if len(key) != KEY_SIZE:
            raise ValueError(f"Key must be {KEY_SIZE} bytes, got {len(key)}")
    except ValueError as e:
        # Chain the cause so the original parsing error stays in the traceback.
        raise BackupCryptoError(f"Invalid {source}: {e}") from e
    return key


def get_encryption_key(
    key_file: Optional[Path] = None,
    passphrase: Optional[str] = None,
    salt: Optional[bytes] = None,
) -> Tuple[bytes, bytes]:
    """
    Resolve encryption key from available sources.

    Priority:
        1. CODITECT_BACKUP_KEY env var (raw hex key, no derivation needed)
        2. key_file parameter (skipped when the path does not exist)
        3. passphrase parameter
        4. CODITECT_BACKUP_PASSPHRASE env var

    Args:
        key_file: Path to file containing hex-encoded key
        passphrase: Passphrase for key derivation
        salt: Salt for PBKDF2 (generated if None and a passphrase is used)

    Returns:
        Tuple of (key, salt). Salt is empty bytes if using raw key.

    Raises:
        BackupCryptoError: If a raw key source holds an invalid key
        KeyNotFoundError: If no key source is available
    """
    # 1. Environment variable (raw key)
    env_key = os.environ.get("CODITECT_BACKUP_KEY")
    if env_key:
        key = _parse_hex_key(env_key, "CODITECT_BACKUP_KEY")
        logger.debug("Using encryption key from CODITECT_BACKUP_KEY env var")
        return key, b""  # No salt needed for raw key

    # 2. Key file
    if key_file and key_file.exists():
        try:
            raw = key_file.read_text().strip()
        except ValueError as e:
            # Undecodable file contents (UnicodeDecodeError is a ValueError).
            raise BackupCryptoError(f"Invalid key file {key_file}: {e}") from e
        key = _parse_hex_key(raw, f"key file {key_file}")
        logger.debug("Using encryption key from %s", key_file)
        return key, b""

    # 3. Passphrase parameter, then 4. environment passphrase
    secret = passphrase
    if secret:
        logger.debug("Deriving key from provided passphrase")
    else:
        secret = os.environ.get("CODITECT_BACKUP_PASSPHRASE")
        if secret:
            logger.debug("Deriving key from CODITECT_BACKUP_PASSPHRASE env var")

    if not secret:
        raise KeyNotFoundError(
            "No encryption key available. Set CODITECT_BACKUP_KEY (hex), "
            "CODITECT_BACKUP_PASSPHRASE, use --key-file, or provide --passphrase."
        )

    # Only draw random bytes when a key is actually being derived.
    if salt is None:
        salt = os.urandom(SALT_SIZE)
    return derive_key(secret, salt), salt
def encrypt_bytes(data: bytes, key: bytes) -> Tuple[bytes, bytes]:
    """
    Seal a byte string with AES-256-GCM under a freshly generated nonce.

    Args:
        data: Plaintext data
        key: 32-byte encryption key

    Returns:
        Tuple of (nonce, ciphertext_with_tag); AESGCM appends the tag
        to the ciphertext.
    """
    fresh_nonce = os.urandom(NONCE_SIZE)
    cipher = AESGCM(key)
    # Third argument is the additional authenticated data; none is used.
    sealed = cipher.encrypt(fresh_nonce, data, None)
    return fresh_nonce, sealed
def decrypt_bytes(nonce: bytes, ciphertext: bytes, key: bytes) -> bytes:
    """
    Decrypt data using AES-256-GCM.

    Args:
        nonce: 12-byte nonce used during encryption
        ciphertext: Ciphertext with appended GCM tag
        key: 32-byte encryption key

    Returns:
        Decrypted plaintext

    Raises:
        DecryptionError: If decryption fails; the underlying exception is
            attached as ``__cause__``
    """
    aesgcm = AESGCM(key)
    try:
        return aesgcm.decrypt(nonce, ciphertext, None)
    except Exception as e:
        # Deliberately broad: every failure of the decrypt call is reported
        # as DecryptionError. Chaining keeps the real cause in the traceback.
        raise DecryptionError(
            f"Decryption failed (wrong key or corrupted data): {e}"
        ) from e
def encrypt_file(
    input_path: Path,
    output_path: Path,
    key: bytes,
    salt: bytes = b"",
) -> dict:
    """
    Encrypt a file using AES-256-GCM with CODITECT header format.

    The whole input file is read into memory, encrypted in one operation,
    and written as MAGIC + VERSION + SALT + NONCE + CIPHERTEXT (tag included).

    Args:
        input_path: Path to plaintext file
        output_path: Path to write encrypted file (.enc)
        key: 32-byte encryption key
        salt: Salt used for key derivation (empty if raw key)

    Returns:
        Dict with encryption metadata

    Raises:
        BackupCryptoError: If salt is non-empty but not SALT_SIZE bytes long
    """
    # A wrong-length salt used to be silently replaced by the all-zero
    # "raw key" marker, producing a file whose key could no longer be
    # re-derived from the passphrase. Refuse it before touching any file.
    if salt and len(salt) != SALT_SIZE:
        raise BackupCryptoError(
            f"Salt must be {SALT_SIZE} bytes or empty, got {len(salt)}"
        )

    plaintext = input_path.read_bytes()
    original_size = len(plaintext)
    nonce, ciphertext = encrypt_bytes(plaintext, key)

    # An all-zero salt field marks the raw-key case (no derivation).
    stored_salt = salt if salt else (b"\x00" * SALT_SIZE)

    # Write: MAGIC + VERSION + SALT + NONCE + CIPHERTEXT (includes tag)
    header = MAGIC + struct.pack("B", FORMAT_VERSION) + stored_salt + nonce
    with open(output_path, "wb") as f:
        f.write(header)
        f.write(ciphertext)

    encrypted_size = output_path.stat().st_size
    logger.info(
        f"Encrypted {input_path.name}: {original_size:,} → {encrypted_size:,} bytes"
    )
    return {
        "original_size": original_size,
        "encrypted_size": encrypted_size,
        "algorithm": "AES-256-GCM",
        "kdf": "PBKDF2-HMAC-SHA256" if salt else "raw",
        "iterations": PBKDF2_ITERATIONS if salt else 0,
    }
def decrypt_file(
    input_path: Path,
    output_path: Path,
    key: Optional[bytes] = None,
    key_file: Optional[Path] = None,
    passphrase: Optional[str] = None,
) -> dict:
    """
    Read a CODITECT .enc file, validate its header, and write the plaintext.

    Args:
        input_path: Path to encrypted file (.enc)
        output_path: Path to write decrypted file
        key: 32-byte key (if already resolved)
        key_file: Path to key file (alternative)
        passphrase: Passphrase (alternative)

    Returns:
        Dict with decryption metadata

    Raises:
        DecryptionError: If the header is invalid or decryption fails
    """
    blob = input_path.read_bytes()
    blob_size = len(blob)
    if blob_size < HEADER_SIZE:
        raise DecryptionError(f"File too small to be encrypted: {blob_size} bytes")

    # Header layout: magic [0:4], version [4], salt [5:37], nonce [37:49].
    signature = blob[:4]
    if signature != MAGIC:
        raise DecryptionError(f"Not a CODITECT encrypted file (magic: {signature!r})")

    format_version = blob[4]
    if format_version != FORMAT_VERSION:
        raise DecryptionError(f"Unsupported format version: {format_version}")

    stored_salt, nonce, sealed = blob[5:37], blob[37:49], blob[49:]

    if key is None:
        lookup = {"key_file": key_file, "passphrase": passphrase}
        # An all-zero salt field means a raw key was used, so no salt is
        # forwarded to the key resolver in that case.
        if stored_salt != (b"\x00" * SALT_SIZE):
            lookup["salt"] = stored_salt
        key, _ = get_encryption_key(**lookup)

    recovered = decrypt_bytes(nonce, sealed, key)
    output_path.write_bytes(recovered)

    logger.info(
        f"Decrypted {input_path.name}: {len(sealed):,} → {len(recovered):,} bytes"
    )
    return {
        "encrypted_size": blob_size,
        "decrypted_size": len(recovered),
    }
def encrypt_directory(
    input_dir: Path,
    output_path: Path,
    key: bytes,
    salt: bytes = b"",
) -> dict:
    """
    Tar a directory, then encrypt the tarball.

    The gzip-compressed tar archive is built entirely in memory before
    being encrypted in one operation.

    Args:
        input_dir: Directory to encrypt
        output_path: Path to write encrypted archive (.tar.enc)
        key: 32-byte encryption key
        salt: Salt for header (empty if raw key)

    Returns:
        Dict with encryption metadata

    Raises:
        BackupCryptoError: If salt is non-empty but not SALT_SIZE bytes long
    """
    # A wrong-length salt used to be silently replaced by the all-zero
    # "raw key" marker, producing an archive whose key could no longer be
    # re-derived from the passphrase. Refuse it before doing any work.
    if salt and len(salt) != SALT_SIZE:
        raise BackupCryptoError(
            f"Salt must be {SALT_SIZE} bytes or empty, got {len(salt)}"
        )

    # Create tar in memory
    tar_buffer = io.BytesIO()
    with tarfile.open(fileobj=tar_buffer, mode="w:gz") as tar:
        tar.add(str(input_dir), arcname=input_dir.name)
    tar_data = tar_buffer.getvalue()

    nonce, ciphertext = encrypt_bytes(tar_data, key)

    # An all-zero salt field marks the raw-key case (no derivation).
    stored_salt = salt if salt else (b"\x00" * SALT_SIZE)

    # Write: MAGIC + VERSION + SALT + NONCE + CIPHERTEXT (includes tag)
    header = MAGIC + struct.pack("B", FORMAT_VERSION) + stored_salt + nonce
    with open(output_path, "wb") as f:
        f.write(header)
        f.write(ciphertext)

    encrypted_size = output_path.stat().st_size
    logger.info(
        f"Encrypted directory {input_dir.name}: "
        f"{len(tar_data):,} (tar.gz) → {encrypted_size:,} bytes"
    )
    return {
        "original_tar_size": len(tar_data),
        "encrypted_size": encrypted_size,
        "algorithm": "AES-256-GCM",
        "compression": "gzip",
    }
def _has_unsafe_tar_name(name: str) -> bool:
    """Return True if a tar member name is absolute or has a ".." path component."""
    if name.startswith("/") or os.path.isabs(name):
        return True
    # Compare whole path components: a bare substring test would also drop
    # legitimate names such as "notes..txt". Backslashes are treated as
    # separators too, so "..\\x" is still rejected.
    return ".." in name.replace("\\", "/").split("/")


def decrypt_directory(
    input_path: Path,
    output_dir: Path,
    key: Optional[bytes] = None,
    key_file: Optional[Path] = None,
    passphrase: Optional[str] = None,
) -> dict:
    """
    Decrypt an encrypted tar archive and extract.

    Members whose names are absolute or contain a ".." path component are
    skipped with a warning rather than extracted.

    Args:
        input_path: Path to encrypted archive (.tar.enc)
        output_dir: Directory to extract into (created if missing)
        key: 32-byte key (if already resolved)
        key_file: Path to key file
        passphrase: Passphrase

    Returns:
        Dict with decryption metadata

    Raises:
        DecryptionError: If the header is invalid or decryption fails
    """
    data = input_path.read_bytes()
    if len(data) < HEADER_SIZE:
        raise DecryptionError(f"File too small: {len(data)} bytes")

    # Parse header: magic [0:4], version [4:5], salt [5:37], nonce [37:49]
    magic = data[:4]
    if magic != MAGIC:
        raise DecryptionError(f"Not a CODITECT encrypted file (magic: {magic!r})")

    version = struct.unpack("B", data[4:5])[0]
    if version != FORMAT_VERSION:
        raise DecryptionError(f"Unsupported format version: {version}")

    salt = data[5:37]
    nonce = data[37:49]
    ciphertext = data[49:]

    # Resolve key; an all-zero salt field means a raw key was used.
    if key is None:
        is_derived = salt != (b"\x00" * SALT_SIZE)
        if is_derived:
            key, _ = get_encryption_key(
                key_file=key_file, passphrase=passphrase, salt=salt
            )
        else:
            key, _ = get_encryption_key(key_file=key_file, passphrase=passphrase)

    tar_data = decrypt_bytes(nonce, ciphertext, key)

    output_dir.mkdir(parents=True, exist_ok=True)
    tar_buffer = io.BytesIO(tar_data)
    with tarfile.open(fileobj=tar_buffer, mode="r:gz") as tar:
        # Security: filter out absolute paths and parent traversals
        safe_members = []
        for member in tar.getmembers():
            if _has_unsafe_tar_name(member.name):
                logger.warning(f"Skipping unsafe tar member: {member.name}")
                continue
            safe_members.append(member)
        # NOTE(review): only member names are checked; symlink/hardlink
        # targets are not validated, and no explicit extraction `filter` is
        # passed (newer Python versions warn about or change that default).
        # Confirm the intended policy before restoring untrusted archives.
        tar.extractall(path=str(output_dir), members=safe_members)

    logger.info(
        f"Decrypted archive {input_path.name}: "
        f"{len(ciphertext):,} → {len(tar_data):,} bytes (tar.gz)"
    )
    return {
        "encrypted_size": len(data),
        "tar_size": len(tar_data),
        "extracted_to": str(output_dir),
    }
def is_encrypted(path: Path) -> bool:
    """Report whether a file starts with the CODITECT magic bytes.

    Returns False when the file cannot be opened or read.
    """
    try:
        with open(path, "rb") as handle:
            leading_bytes = handle.read(4)
    except OSError:
        return False
    return leading_bytes == MAGIC