Skip to main content

#!/usr/bin/env python3
"""
CODITECT Backup Encryption — AES-256-GCM at Rest

Provides encryption/decryption for backup files using AES-256-GCM with PBKDF2-HMAC-SHA256 key derivation.

File format (.enc):
    Magic:   b"CODI" (4 bytes)
    Version: 1 (1 byte)
    Salt:    32 bytes (PBKDF2 salt; all zero bytes when a raw key is used)
    Nonce:   12 bytes (GCM nonce)
    Body:    ciphertext (variable length)
    Tag:     16 bytes (GCM auth tag, appended directly after the ciphertext)

Key sources (in priority order):
    1. CODITECT_BACKUP_KEY env var (hex-encoded 32-byte key)
    2. Key file path (--key-file)
    3. Passphrase derivation (explicitly supplied or interactive passphrase
       first, then env CODITECT_BACKUP_PASSPHRASE)

Author: AZ1.AI CODITECT Team
Task: J.20.1.6
Date: 2026-02-10
"""

import io
import logging
import os
import struct
import tarfile
from pathlib import Path
from typing import Optional, Tuple

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

# Module-level logger named after this module; handlers and levels are left
# to the application.
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

MAGIC = b"CODI"  # Signature written at the start of every encrypted file
FORMAT_VERSION = 1  # Stored as a single byte right after MAGIC
SALT_SIZE = 32  # PBKDF2 salt length in bytes
NONCE_SIZE = 12  # GCM nonce length in bytes
TAG_SIZE = 16  # GCM authentication tag length in bytes
KEY_SIZE = 32  # 256 bits
PBKDF2_ITERATIONS = 600_000
HEADER_SIZE = len(MAGIC) + 1 + SALT_SIZE + NONCE_SIZE  # 49 bytes
# 64 MiB chunk size. NOTE(review): this was previously annotated as "GCM max
# plaintext per operation", but AES-GCM's per-invocation limit is roughly
# 64 GiB (2**39 - 256 bits), so this is a chunking choice, not an algorithm
# limit.
CHUNK_SIZE = 64 * 1024 * 1024

class BackupCryptoError(Exception):
    """Root of the exception hierarchy for backup encryption failures."""

class KeyNotFoundError(BackupCryptoError):
    """Raised when no encryption key source is available."""

class DecryptionError(BackupCryptoError):
    """Raised when decryption fails: wrong key, corrupted data, or tampering."""

def derive_key(passphrase: str, salt: bytes) -> bytes:
    """
    Stretch a passphrase into a 256-bit key with PBKDF2-HMAC-SHA256.

    Args:
        passphrase: User passphrase; it is UTF-8 encoded before derivation.
        salt: PBKDF2 salt (this module generates SALT_SIZE-byte salts).

    Returns:
        Derived key of KEY_SIZE (32) bytes.
    """
    pbkdf2 = PBKDF2HMAC(
        salt=salt,
        length=KEY_SIZE,
        iterations=PBKDF2_ITERATIONS,
        algorithm=hashes.SHA256(),
    )
    secret = passphrase.encode("utf-8")
    return pbkdf2.derive(secret)

def _parse_hex_key(hex_text: str) -> bytes:
    """Decode a hex string into a key; raise ValueError unless it is KEY_SIZE bytes."""
    key = bytes.fromhex(hex_text)
    if len(key) != KEY_SIZE:
        raise ValueError(f"Key must be {KEY_SIZE} bytes, got {len(key)}")
    return key


def get_encryption_key(
    key_file: Optional[Path] = None,
    passphrase: Optional[str] = None,
    salt: Optional[bytes] = None,
) -> Tuple[bytes, bytes]:
    """
    Resolve the encryption key from the available sources.

    Priority:
        1. CODITECT_BACKUP_KEY env var (raw hex key, no derivation needed)
        2. key_file parameter (skipped if the path does not exist)
        3. passphrase parameter
        4. CODITECT_BACKUP_PASSPHRASE env var

    Args:
        key_file: Path to a file containing a hex-encoded key.
        passphrase: Passphrase for key derivation.
        salt: Salt for PBKDF2. A random SALT_SIZE-byte salt is generated when
            this is None and a passphrase source is used.

    Returns:
        Tuple of (key, salt). Salt is empty bytes when a raw key (env var or
        key file) was used, otherwise the salt used for derivation.

    Raises:
        BackupCryptoError: If the env key or key file content is not valid
            hex or does not decode to KEY_SIZE bytes.
        KeyNotFoundError: If no key source is available.
    """
    # 1. Environment variable (raw key)
    env_key = os.environ.get("CODITECT_BACKUP_KEY")
    if env_key:
        try:
            key = _parse_hex_key(env_key)
        except ValueError as e:
            raise BackupCryptoError(f"Invalid CODITECT_BACKUP_KEY: {e}") from e
        logger.debug("Using encryption key from CODITECT_BACKUP_KEY env var")
        return key, b""  # No salt needed for raw key

    # 2. Key file
    if key_file and key_file.exists():
        try:
            # The read stays inside the try: a UnicodeDecodeError is a
            # ValueError and is reported as an invalid key file.
            key = _parse_hex_key(key_file.read_text().strip())
        except ValueError as e:
            raise BackupCryptoError(f"Invalid key file {key_file}: {e}") from e
        logger.debug("Using encryption key from %s", key_file)
        return key, b""

    # 3. Passphrase parameter, then 4. environment passphrase
    if passphrase:
        logger.debug("Deriving key from provided passphrase")
    else:
        passphrase = os.environ.get("CODITECT_BACKUP_PASSPHRASE")
        if not passphrase:
            raise KeyNotFoundError(
                "No encryption key available. Set CODITECT_BACKUP_KEY (hex), "
                "CODITECT_BACKUP_PASSPHRASE, use --key-file, or provide --passphrase."
            )
        logger.debug("Deriving key from CODITECT_BACKUP_PASSPHRASE env var")

    # Generate the salt only when a key is actually derived; the raw-key
    # paths above never use it.
    if salt is None:
        salt = os.urandom(SALT_SIZE)
    return derive_key(passphrase, salt), salt

def encrypt_bytes(data: bytes, key: bytes) -> Tuple[bytes, bytes]:
    """
    Seal a byte string with AES-256-GCM under a freshly generated nonce.

    Args:
        data: Plaintext to encrypt.
        key: 32-byte encryption key.

    Returns:
        Tuple of (nonce, ciphertext_with_tag); AESGCM appends the
        authentication tag to the ciphertext.
    """
    fresh_nonce = os.urandom(NONCE_SIZE)
    cipher = AESGCM(key)
    # Third argument is the associated data: none is authenticated here.
    sealed = cipher.encrypt(fresh_nonce, data, None)
    return fresh_nonce, sealed

def decrypt_bytes(nonce: bytes, ciphertext: bytes, key: bytes) -> bytes:
    """
    Decrypt data using AES-256-GCM.

    Args:
        nonce: 12-byte nonce used during encryption.
        ciphertext: Ciphertext with the GCM tag appended.
        key: 32-byte encryption key.

    Returns:
        Decrypted plaintext.

    Raises:
        DecryptionError: If decryption fails for any reason; the original
            exception is attached as ``__cause__``.
    """
    aesgcm = AESGCM(key)
    try:
        return aesgcm.decrypt(nonce, ciphertext, None)
    except Exception as e:
        # Deliberately broad: callers rely on every decrypt failure surfacing
        # as DecryptionError. Some exceptions stringify to "" (presumably the
        # library's invalid-tag error), so fall back to the class name rather
        # than emitting a message that ends in a dangling ": ".
        detail = str(e) or type(e).__name__
        raise DecryptionError(
            f"Decryption failed (wrong key or corrupted data): {detail}"
        ) from e

def encrypt_file(
    input_path: Path,
    output_path: Path,
    key: bytes,
    salt: bytes = b"",
) -> dict:
    """
    Encrypt a file using AES-256-GCM with the CODITECT header format.

    The whole input is read into memory, encrypted in a single operation and
    written as MAGIC + VERSION + SALT + NONCE + CIPHERTEXT (tag appended).

    Args:
        input_path: Path to the plaintext file.
        output_path: Path to write the encrypted file (.enc).
        key: 32-byte encryption key.
        salt: Salt used for key derivation; empty when a raw key is used.

    Returns:
        Dict with encryption metadata (sizes, algorithm, kdf, iterations).

    Raises:
        BackupCryptoError: If salt is non-empty but not SALT_SIZE bytes.
    """
    # An all-zero salt field marks a raw-key file. Previously a non-empty salt
    # of the wrong length was silently replaced by that marker, producing a
    # file that could not be decrypted from its passphrase. Reject it instead.
    if salt and len(salt) != SALT_SIZE:
        raise BackupCryptoError(
            f"Salt must be empty (raw key) or {SALT_SIZE} bytes, got {len(salt)}"
        )
    is_derived = bool(salt)

    plaintext = input_path.read_bytes()
    original_size = len(plaintext)

    nonce, ciphertext = encrypt_bytes(plaintext, key)

    # Zero-fill the salt field in the raw-key case so the header keeps its size
    stored_salt = salt if is_derived else (b"\x00" * SALT_SIZE)

    # Write: MAGIC + VERSION + SALT + NONCE + CIPHERTEXT (includes tag)
    with open(output_path, "wb") as f:
        f.write(MAGIC)
        f.write(struct.pack("B", FORMAT_VERSION))
        f.write(stored_salt)
        f.write(nonce)
        f.write(ciphertext)

    encrypted_size = output_path.stat().st_size

    logger.info(
        f"Encrypted {input_path.name}: {original_size:,} → {encrypted_size:,} bytes"
    )

    # Metadata is derived from the same flag that decided the header contents
    return {
        "original_size": original_size,
        "encrypted_size": encrypted_size,
        "algorithm": "AES-256-GCM",
        "kdf": "PBKDF2-HMAC-SHA256" if is_derived else "raw",
        "iterations": PBKDF2_ITERATIONS if is_derived else 0,
    }

def decrypt_file(
    input_path: Path,
    output_path: Path,
    key: Optional[bytes] = None,
    key_file: Optional[Path] = None,
    passphrase: Optional[str] = None,
) -> dict:
    """
    Decrypt a CODITECT encrypted file.

    Args:
        input_path: Path to the encrypted file (.enc).
        output_path: Path to write the decrypted file.
        key: 32-byte key (if already resolved).
        key_file: Path to a key file (alternative).
        passphrase: Passphrase (alternative).

    Returns:
        Dict with decryption metadata (encrypted and decrypted sizes).

    Raises:
        DecryptionError: If the file is not a valid CODITECT file, the only
            available key source cannot match the file, or decryption fails.
    """
    data = input_path.read_bytes()

    # A valid file holds the header plus at least the GCM tag
    if len(data) < HEADER_SIZE + TAG_SIZE:
        raise DecryptionError(f"File too small to be encrypted: {len(data)} bytes")

    # Header layout: MAGIC | version (1 byte) | salt | nonce
    magic_end = len(MAGIC)
    salt_start = magic_end + 1
    nonce_start = salt_start + SALT_SIZE

    magic = data[:magic_end]
    if magic != MAGIC:
        raise DecryptionError(f"Not a CODITECT encrypted file (magic: {magic!r})")

    version = data[magic_end]  # indexing bytes yields the unsigned byte value
    if version != FORMAT_VERSION:
        raise DecryptionError(f"Unsupported format version: {version}")

    salt = data[salt_start:nonce_start]
    nonce = data[nonce_start:HEADER_SIZE]
    ciphertext = data[HEADER_SIZE:]

    # Resolve key
    if key is None:
        # An all-zero salt field marks a file encrypted with a raw key
        is_derived = salt != (b"\x00" * SALT_SIZE)
        key, used_salt = get_encryption_key(
            key_file=key_file,
            passphrase=passphrase,
            salt=salt if is_derived else None,
        )
        # get_encryption_key returns a non-empty salt only when it derived the
        # key from a passphrase. For a raw-key file that key comes from a
        # random salt and can never match, so fail with an accurate message
        # instead of a generic "wrong key" error.
        if not is_derived and used_salt:
            raise DecryptionError(
                "File was encrypted with a raw key; a passphrase cannot decrypt "
                "it. Set CODITECT_BACKUP_KEY or provide a key file."
            )

    plaintext = decrypt_bytes(nonce, ciphertext, key)
    output_path.write_bytes(plaintext)

    logger.info(
        f"Decrypted {input_path.name}: {len(ciphertext):,} → {len(plaintext):,} bytes"
    )

    return {
        "encrypted_size": len(data),
        "decrypted_size": len(plaintext),
    }

def encrypt_directory(
    input_dir: Path,
    output_path: Path,
    key: bytes,
    salt: bytes = b"",
) -> dict:
    """
    Tar and gzip a directory in memory, then encrypt the tarball.

    The output uses the same layout as single-file encryption:
    MAGIC + VERSION + SALT + NONCE + CIPHERTEXT (tag appended).

    Args:
        input_dir: Directory to encrypt; archived under its own name.
        output_path: Path to write the encrypted archive (.tar.enc).
        key: 32-byte encryption key.
        salt: Salt used for key derivation; empty when a raw key is used.

    Returns:
        Dict with encryption metadata (tar size, encrypted size, algorithm,
        compression).

    Raises:
        BackupCryptoError: If salt is non-empty but not SALT_SIZE bytes.
    """
    # An all-zero salt field marks a raw-key archive. A non-empty salt of the
    # wrong length used to be silently replaced by that marker, leaving an
    # archive that could not be decrypted from its passphrase. Reject it.
    if salt and len(salt) != SALT_SIZE:
        raise BackupCryptoError(
            f"Salt must be empty (raw key) or {SALT_SIZE} bytes, got {len(salt)}"
        )

    # Create tar in memory
    tar_buffer = io.BytesIO()
    with tarfile.open(fileobj=tar_buffer, mode="w:gz") as tar:
        tar.add(str(input_dir), arcname=input_dir.name)
    tar_data = tar_buffer.getvalue()

    nonce, ciphertext = encrypt_bytes(tar_data, key)

    # Zero-fill the salt field in the raw-key case so the header keeps its size
    stored_salt = salt if salt else (b"\x00" * SALT_SIZE)

    with open(output_path, "wb") as f:
        f.write(MAGIC)
        f.write(struct.pack("B", FORMAT_VERSION))
        f.write(stored_salt)
        f.write(nonce)
        f.write(ciphertext)

    encrypted_size = output_path.stat().st_size

    logger.info(
        f"Encrypted directory {input_dir.name}: "
        f"{len(tar_data):,} (tar.gz) → {encrypted_size:,} bytes"
    )

    return {
        "original_tar_size": len(tar_data),
        "encrypted_size": encrypted_size,
        "algorithm": "AES-256-GCM",
        "compression": "gzip",
    }

def _is_unsafe_tar_path(path: str) -> bool:
    """Return True if *path* is absolute or has a '..' path component."""
    # Compare whole components so names like "notes..txt" are not rejected.
    # Backslashes are treated as separators to stay conservative.
    components = path.replace("\\", "/").split("/")
    return path.startswith(("/", "\\")) or ".." in components


def decrypt_directory(
    input_path: Path,
    output_dir: Path,
    key: Optional[bytes] = None,
    key_file: Optional[Path] = None,
    passphrase: Optional[str] = None,
) -> dict:
    """
    Decrypt an encrypted tar archive and extract it.

    Args:
        input_path: Path to the encrypted archive (.tar.enc).
        output_dir: Directory to extract into (created if missing).
        key: 32-byte key (if already resolved).
        key_file: Path to a key file.
        passphrase: Passphrase.

    Returns:
        Dict with decryption metadata (encrypted size, tar size, target dir).

    Raises:
        DecryptionError: If the file is not a valid CODITECT file, the only
            available key source cannot match the file, decryption fails, or
            the stdlib extraction filter rejects an archive member.
    """
    data = input_path.read_bytes()

    # A valid file holds the header plus at least the GCM tag
    if len(data) < HEADER_SIZE + TAG_SIZE:
        raise DecryptionError(f"File too small: {len(data)} bytes")

    # Header layout: MAGIC | version (1 byte) | salt | nonce
    magic_end = len(MAGIC)
    salt_start = magic_end + 1
    nonce_start = salt_start + SALT_SIZE

    magic = data[:magic_end]
    if magic != MAGIC:
        raise DecryptionError(f"Not a CODITECT encrypted file (magic: {magic!r})")

    version = data[magic_end]  # indexing bytes yields the unsigned byte value
    if version != FORMAT_VERSION:
        raise DecryptionError(f"Unsupported format version: {version}")

    salt = data[salt_start:nonce_start]
    nonce = data[nonce_start:HEADER_SIZE]
    ciphertext = data[HEADER_SIZE:]

    if key is None:
        # An all-zero salt field marks an archive encrypted with a raw key
        is_derived = salt != (b"\x00" * SALT_SIZE)
        key, used_salt = get_encryption_key(
            key_file=key_file,
            passphrase=passphrase,
            salt=salt if is_derived else None,
        )
        # A non-empty returned salt means the key was derived from a
        # passphrase, which can never match a raw-key archive.
        if not is_derived and used_salt:
            raise DecryptionError(
                "File was encrypted with a raw key; a passphrase cannot decrypt "
                "it. Set CODITECT_BACKUP_KEY or provide a key file."
            )

    tar_data = decrypt_bytes(nonce, ciphertext, key)

    output_dir.mkdir(parents=True, exist_ok=True)
    # Extraction filters exist on Python 3.12+ and patched older releases.
    use_data_filter = hasattr(tarfile, "data_filter")
    with tarfile.open(fileobj=io.BytesIO(tar_data), mode="r:gz") as tar:
        # Security: skip absolute paths and parent traversals
        safe_members = []
        for member in tar.getmembers():
            unsafe = _is_unsafe_tar_path(member.name)
            if not unsafe and not use_data_filter:
                # Fallback when no extraction filter is available: a link may
                # not point at an absolute or parent path, and only regular
                # files and directories are extracted otherwise.
                if member.issym() or member.islnk():
                    unsafe = _is_unsafe_tar_path(member.linkname)
                else:
                    unsafe = not (member.isfile() or member.isdir())
            if unsafe:
                logger.warning(f"Skipping unsafe tar member: {member.name}")
                continue
            safe_members.append(member)

        if use_data_filter:
            # The "data" filter additionally rejects links that resolve
            # outside output_dir and special files such as devices.
            try:
                tar.extractall(
                    path=str(output_dir), members=safe_members, filter="data"
                )
            except tarfile.FilterError as e:
                raise DecryptionError(
                    f"Refusing to extract unsafe archive member: {e}"
                ) from e
        else:
            tar.extractall(path=str(output_dir), members=safe_members)

    logger.info(
        f"Decrypted archive {input_path.name}: "
        f"{len(ciphertext):,} → {len(tar_data):,} bytes (tar.gz)"
    )

    return {
        "encrypted_size": len(data),
        "tar_size": len(tar_data),
        "extracted_to": str(output_dir),
    }

def is_encrypted(path: Path) -> bool:
    """Report whether a file starts with the CODITECT magic bytes.

    Returns False when the file cannot be opened or read.
    """
    try:
        with open(path, "rb") as handle:
            leading = handle.read(4)
    except OSError:  # IOError is an alias of OSError in Python 3
        return False
    return leading == MAGIC