#!/usr/bin/env python3
"""
CODITECT License Validator Module

Validates CODITECT licenses with:

- Ed25519 signature verification
- Machine binding validation
- Expiration and grace period handling
- Offline tolerance support
- Server validation with caching

Usage:
    from scripts.core.license_validator import LicenseValidator

    validator = LicenseValidator()
    result = validator.validate()

    if result.is_valid:
        print(f"License valid: {result.tier}")
    else:
        print(f"License invalid: {result.error}")

ADR: ADR-067 Time-Controlled Licensing System
"""

import hashlib
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

# ============================================================================
# Constants
# ============================================================================

# ADR-114: Framework install vs user data.
# Licensing config is framework data, machine-id is user data.
CODITECT_DIR = Path.home() / ".coditect"
USER_DATA_DIR = Path.home() / "PROJECTS" / ".coditect-data"
LICENSING_DIR = CODITECT_DIR / "licensing"
CONFIG_DIR = CODITECT_DIR / "config"

# Files kept under the licensing directory.
LICENSE_FILE = LICENSING_DIR / "license.json"
VALIDATION_CACHE_FILE = LICENSING_DIR / "validation-cache.json"
OFFLINE_LOG_FILE = LICENSING_DIR / "offline-log.json"

# ADR-114: machine-id.json is in user data (portable identity).
# The user-data copy wins when it exists at import time; otherwise the
# copy under the framework directory is used.
if (USER_DATA_DIR / "machine-id.json").exists():
    MACHINE_ID_FILE = USER_DATA_DIR / "machine-id.json"
else:
    MACHINE_ID_FILE = CODITECT_DIR / "machine-id.json"
LICENSING_CONFIG_FILE = CONFIG_DIR / "licensing.json"

# ============================================================================
# Enums and Data Classes
# ============================================================================

class LicenseState(Enum):
    """Every state a license validation can end in."""

    # License is valid and active
    ACTIVE = "active"
    # License expiring soon (7 days)
    WARNING = "warning"
    # License expired, in grace period
    GRACE = "grace"
    # License expired, grace exhausted
    EXPIRED = "expired"
    # License signature invalid
    INVALID = "invalid"
    # License not bound to this machine
    UNBOUND = "unbound"
    # License has been revoked
    REVOKED = "revoked"
    # No license file found
    MISSING = "missing"
    # Offline tolerance exceeded
    OFFLINE_EXCEEDED = "offline_exceeded"

class LicenseTier(Enum):
    """Commercial tiers a license can be issued for."""

    PILOT = "pilot"
    PRO = "pro"
    TEAM = "team"
    ENTERPRISE = "enterprise"
    FREE = "free"

@dataclass
class LicenseValidationResult:
    """Outcome of one license validation run."""

    # Overall verdict and the state that produced it.
    is_valid: bool
    state: LicenseState

    # Identity of the license and its holder.
    tier: Optional[LicenseTier] = None
    license_id: Optional[str] = None
    licensee_email: Optional[str] = None
    organization: Optional[str] = None

    # Time-related details; each stays None when it does not apply.
    expires_at: Optional[datetime] = None
    days_remaining: Optional[int] = None
    grace_days_remaining: Optional[int] = None
    offline_days_remaining: Optional[int] = None

    # Feature map copied from the license data.
    features: Dict[str, Any] = field(default_factory=dict)

    # Human-readable diagnostics.
    error: Optional[str] = None
    warning: Optional[str] = None

    # Restrictions that apply when the validator reports degraded mode.
    degraded_mode: bool = False
    allowed_agents: List[str] = field(default_factory=list)
    allowed_commands: List[str] = field(default_factory=list)

# ============================================================================
# License Validator Class
# ============================================================================

class LicenseValidator:
    """
    CODITECT License Validator.

    Validates licenses with signature verification, machine binding,
    expiration handling, and offline support.

    State is read from the module-level paths (``LICENSE_FILE``,
    ``VALIDATION_CACHE_FILE``, ``OFFLINE_LOG_FILE``, ``MACHINE_ID_FILE``)
    and from the JSON configuration found at ``config_path``.
    """

    # Days before expiry at which an ACTIVE license starts reporting WARNING.
    _WARNING_WINDOW_DAYS = 7
    # Age after which the cached revocation list is refreshed from the server.
    _CRL_MAX_AGE = timedelta(hours=24)

    def __init__(self, config_path: Optional[Path] = None):
        """
        Initialize the license validator.

        Args:
            config_path: Licensing configuration file; defaults to
                ``LICENSING_CONFIG_FILE``.
        """
        self.config_path = config_path or LICENSING_CONFIG_FILE
        self.config = self._load_config()
        self.machine_id = self._load_machine_id()
        # Offline allowance computed by the most recent validate() call.
        self._offline_days_remaining: Optional[int] = None

    # ------------------------------------------------------------------
    # Loading helpers
    # ------------------------------------------------------------------

    def _load_config(self) -> Dict[str, Any]:
        """Load licensing configuration, falling back to the defaults.

        A missing, unreadable, malformed or non-object config file yields
        ``_default_config()``.
        """
        try:
            loaded = json.loads(self.config_path.read_text())
        except (OSError, ValueError):
            return self._default_config()
        # Every consumer calls .get() on the config, so only a JSON object
        # is usable.
        return loaded if isinstance(loaded, dict) else self._default_config()

    def _default_config(self) -> Dict[str, Any]:
        """Return default configuration."""
        return {
            "validation": {
                "check_interval_hours": 24,
                "cache_ttl_hours": 24
            },
            "offline": {
                "enabled": True,
                "default_max_days": 7
            },
            "degraded_mode": {
                "allowed_agents": ["general-purpose"],
                "allowed_commands": ["/help", "/orient", "/license-status", "/license-activate"]
            }
        }

    def _load_machine_id(self) -> Optional[Dict[str, Any]]:
        """Load machine identification from ``MACHINE_ID_FILE``.

        Returns:
            The parsed JSON object, or None when the file is missing,
            unreadable, malformed or not a JSON object.
        """
        try:
            loaded = json.loads(MACHINE_ID_FILE.read_text())
        except (OSError, ValueError):
            return None
        return loaded if isinstance(loaded, dict) else None

    @staticmethod
    def _read_validation_cache() -> Dict[str, Any]:
        """Return the validation cache as a dict, or ``{}`` when unusable."""
        try:
            cache = json.loads(VALIDATION_CACHE_FILE.read_text())
        except (OSError, ValueError):
            return {}
        return cache if isinstance(cache, dict) else {}

    @staticmethod
    def _refresh_offline_section(license_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``license_data`` with the on-disk ``offline`` section.

        ``_update_last_server_check`` records a successful server contact in
        the license file, so the in-memory copy has to be refreshed before
        offline tolerance is evaluated again. The input is returned
        unchanged when the file cannot be read.
        """
        try:
            on_disk = json.loads(LICENSE_FILE.read_text())
        except (OSError, ValueError):
            return license_data
        if isinstance(on_disk, dict) and isinstance(on_disk.get("offline"), dict):
            return {**license_data, "offline": on_disk["offline"]}
        return license_data

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def validate(self, force_server_check: bool = False) -> LicenseValidationResult:
        """
        Validate the current license.

        Checks run in this order: file presence, JSON parsing, signature,
        machine binding, cached revocation list, expiration, server
        validation (when forced or the cache is stale) and finally offline
        tolerance.

        Args:
            force_server_check: Bypass cache and check server

        Returns:
            LicenseValidationResult with validation details
        """
        # Do not let the allowance of a previous call leak into this one.
        self._offline_days_remaining = None

        if not LICENSE_FILE.exists():
            return self._no_license_result()

        try:
            license_data = json.loads(LICENSE_FILE.read_text())
        except (OSError, ValueError) as e:
            return LicenseValidationResult(
                is_valid=False,
                state=LicenseState.INVALID,
                error=f"Failed to parse license file: {e}"
            )
        if not isinstance(license_data, dict):
            # Valid JSON that is not an object would crash every .get() below.
            return LicenseValidationResult(
                is_valid=False,
                state=LicenseState.INVALID,
                error="Failed to parse license file: expected a JSON object"
            )

        if not self._verify_signature(license_data):
            return LicenseValidationResult(
                is_valid=False,
                state=LicenseState.INVALID,
                error="License signature verification failed"
            )

        if not self._check_machine_binding(license_data):
            return LicenseValidationResult(
                is_valid=False,
                state=LicenseState.UNBOUND,
                error="License not bound to this machine"
            )

        if self._is_revoked(license_data.get("license_id")):
            return LicenseValidationResult(
                is_valid=False,
                state=LicenseState.REVOKED,
                error="License has been revoked"
            )

        result = self._check_expiration(license_data)

        # Contact the server BEFORE judging offline tolerance: a successful
        # contact resets the offline clock. Judging first would keep a
        # license locked in OFFLINE_EXCEEDED even after reconnecting.
        server_attempted = False
        if force_server_check or self._should_check_server():
            server_attempted = True
            server_result = self._server_validate(license_data)
            if server_result is not None:
                return server_result
            license_data = self._refresh_offline_section(license_data)

        if result.is_valid or result.state == LicenseState.GRACE:
            offline_result = self._check_offline_tolerance(license_data)
            if offline_result is not None and not server_attempted:
                # The cache looked fresh but the offline budget is spent:
                # give the server one chance to reset it.
                server_result = self._server_validate(license_data)
                if server_result is not None:
                    return server_result
                license_data = self._refresh_offline_section(license_data)
                offline_result = self._check_offline_tolerance(license_data)
            if offline_result is not None:
                return offline_result
            result.offline_days_remaining = self._get_offline_days_remaining()

        return result

    def get_status(self) -> Dict[str, Any]:
        """Run ``validate()`` and return the result as a JSON-friendly dict.

        Optional keys (``expires_at``, ``days_remaining``,
        ``grace_days_remaining``, ``offline_days_remaining``, ``error``,
        ``warning``, ``features``) are present only when they carry a value.
        """
        result = self.validate()

        status = {
            "valid": result.is_valid,
            "state": result.state.value,
            "tier": result.tier.value if result.tier else None,
            "license_id": result.license_id,
            "email": result.licensee_email,
            "organization": result.organization,
            "degraded_mode": result.degraded_mode
        }

        if result.expires_at:
            status["expires_at"] = result.expires_at.isoformat()
        if result.days_remaining is not None:
            status["days_remaining"] = result.days_remaining
        if result.grace_days_remaining is not None:
            status["grace_days_remaining"] = result.grace_days_remaining
        if result.offline_days_remaining is not None:
            status["offline_days_remaining"] = result.offline_days_remaining
        if result.error:
            status["error"] = result.error
        if result.warning:
            status["warning"] = result.warning
        if result.features:
            status["features"] = result.features

        return status

    # ------------------------------------------------------------------
    # Individual checks
    # ------------------------------------------------------------------

    def _no_license_result(self) -> LicenseValidationResult:
        """Return result for missing license."""
        degraded_config = self.config.get("degraded_mode", {})
        return LicenseValidationResult(
            is_valid=False,
            state=LicenseState.MISSING,
            error="No license file found. Run /license-activate to activate.",
            degraded_mode=True,
            allowed_agents=degraded_config.get("allowed_agents", ["general-purpose"]),
            allowed_commands=degraded_config.get("allowed_commands", ["/help", "/license-activate"])
        )

    def _verify_signature(self, license_data: Dict[str, Any]) -> bool:
        """
        Verify Ed25519 signature of license data.

        Note: Requires PyNaCl library. Falls back to accepting unverified
        licenses in development mode (no public key configured).

        Returns:
            True when the signature verifies, or when no public key is
            configured (development mode); False otherwise.
        """
        # NOTE(security): with no public key in the config, every license
        # is accepted. Confirm production installs always ship a key.
        signature_data = license_data.get("signature", {})
        if not signature_data.get("value"):
            # No signature present - only acceptable in development mode
            return self._is_development_mode()

        public_key = self.config.get("public_key", {}).get("value")
        if not public_key:
            # No public key configured - development mode
            return self._is_development_mode()

        try:
            # Import nacl only when needed
            from nacl.signing import VerifyKey
            from nacl.encoding import Base64Encoder

            # Reconstruct the signed payload (everything except signature).
            # NOTE(review): _update_last_server_check rewrites
            # offline.last_server_check inside the license file, which is
            # part of this payload. Confirm the issuer's signature does not
            # cover that field, otherwise verification will fail after the
            # first successful server check.
            payload = {k: v for k, v in license_data.items() if k != "signature"}
            payload_bytes = json.dumps(payload, sort_keys=True).encode()

            verify_key = VerifyKey(public_key.encode(), encoder=Base64Encoder)
            signature = signature_data["value"]
            verify_key.verify(payload_bytes, Base64Encoder.decode(signature))
            return True
        except ImportError:
            # PyNaCl not installed - accept in development mode only
            return self._is_development_mode()
        except Exception:
            # Fail closed: any verification or decoding problem is a reject.
            return False

    def _is_development_mode(self) -> bool:
        """Check if running in development mode (no public key configured)."""
        return not self.config.get("public_key", {}).get("value")

    def _check_machine_binding(self, license_data: Dict[str, Any]) -> bool:
        """Check if license is bound to this machine.

        A license without ``binding.machine_uuid`` is unbound and always
        passes. Otherwise the machine UUID must match, with an equal,
        non-empty ``hardware_hash`` accepted as a fallback.
        """
        binding = license_data.get("binding", {})
        license_uuid = binding.get("machine_uuid")
        if not license_uuid:
            # No binding required
            return True

        if not self.machine_id:
            # Can't verify without machine ID
            return False

        if license_uuid == self.machine_id.get("machine_uuid"):
            return True

        # Hardware hash as fallback; both sides must be present and equal.
        license_hash = binding.get("hardware_hash")
        local_hash = self.machine_id.get("hardware_hash")
        return bool(license_hash and local_hash and license_hash == local_hash)

    @staticmethod
    def _merge_revocations(existing: Any, incoming: Any) -> List[str]:
        """Union two revocation collections into a list of string entries.

        Entries may be plain strings or dicts carrying ``license_key_hash``.
        Empty, missing or non-string values are skipped. Order is not
        significant.
        """
        merged = set()
        for revocation in [*(existing or []), *(incoming or [])]:
            if isinstance(revocation, dict):
                entry = revocation.get("license_key_hash")
            else:
                entry = revocation
            if isinstance(entry, str) and entry:
                merged.add(entry)
        return list(merged)

    @classmethod
    def _crl_is_stale(cls, cache: Dict[str, Any]) -> bool:
        """Return True when ``crl_last_updated`` is older than 24 hours.

        A missing or unparseable timestamp is treated as not stale, so no
        fetch is triggered for it.
        """
        stamp = cache.get("crl_last_updated")
        if not stamp:
            return False
        try:
            last_update = datetime.fromisoformat(stamp)
        except (TypeError, ValueError):
            return False
        # Compare in the stamp's own timezone (naive stamps use local time).
        return datetime.now(last_update.tzinfo) - last_update > cls._CRL_MAX_AGE

    def _is_revoked(self, license_id: Optional[str]) -> bool:
        """
        Check if license is in revocation list (CRL).

        Per ADR-067, the CRL uses SHA256-hashed license keys for privacy.
        Checks local cache first, then fetches from server if stale and
        re-checks against the refreshed list.

        Returns:
            True when the raw ID (legacy) or its SHA-256 hex digest is
            listed; False otherwise, including on any error (fail-open).
        """
        if not license_id:
            return False

        try:
            license_hash = hashlib.sha256(license_id.encode()).hexdigest()
            cache = self._read_validation_cache()

            def listed() -> bool:
                entries = self._merge_revocations(cache.get("revocation_list"), [])
                # Raw match kept for legacy lists; hashed match is standard.
                return license_id in entries or license_hash in entries

            if listed():
                return True

            if self._crl_is_stale(cache):
                # The update mutates ``cache`` in place; look again so a
                # freshly published revocation takes effect immediately.
                self._update_crl_from_server(cache)
                return listed()

            return False
        except Exception:
            # Best effort: an unusable cache must not block validation.
            return False

    def _update_crl_from_server(self, cache: Dict[str, Any]) -> None:
        """
        Fetch incremental CRL updates from server.

        Uses the /api/v1/licenses/revocations/ endpoint with ?since= parameter
        for incremental updates. On success ``cache`` is updated in place
        and written to ``VALIDATION_CACHE_FILE``; on any failure it is left
        untouched.
        """
        try:
            server_url = self.config.get("server", {}).get("url", "https://api.coditect.ai")
            crl_endpoint = f"{server_url}/api/v1/licenses/revocations/"

            last_update = cache.get("crl_last_updated")
            if last_update:
                # ISO timestamps contain ':' and possibly '+', which must be
                # percent-encoded in a query string.
                crl_endpoint += "?" + urllib.parse.urlencode({"since": last_update})

            request = urllib.request.Request(crl_endpoint, method="GET")
            request.add_header("Content-Type", "application/json")

            with urllib.request.urlopen(request, timeout=5) as response:
                result = json.loads(response.read())

            if not isinstance(result, dict):
                return

            cache["revocation_list"] = self._merge_revocations(
                cache.get("revocation_list"), result.get("revocations")
            )
            # Record every successful fetch, even an empty one, so a stale
            # CRL is not refetched on every validation.
            cache["crl_last_updated"] = datetime.now().isoformat()
            VALIDATION_CACHE_FILE.write_text(json.dumps(cache, indent=2))
        except Exception:
            # Silently fail - network may be unavailable
            pass

    @staticmethod
    def _grace_days_left(expires_at: datetime, now: datetime, grace_days: int) -> int:
        """Return grace days left, counting only whole days since expiry.

        A license that expired less than a day ago still has the full
        ``grace_days``; the result reaches 0 once ``grace_days`` whole days
        have elapsed.
        """
        return grace_days - (now - expires_at).days

    def _check_expiration(self, license_data: Dict[str, Any]) -> LicenseValidationResult:
        """Check license expiration and determine state.

        Returns ACTIVE, WARNING (7 days or fewer left), GRACE (expired but
        inside ``validity.grace_period_days``, reported valid in degraded
        mode), EXPIRED, or INVALID when ``validity.expires_at`` cannot be
        parsed. A license without ``expires_at`` is perpetual.
        """
        validity = license_data.get("validity", {})
        expires_str = validity.get("expires_at")
        try:
            grace_days = int(validity.get("grace_period_days", 7))
        except (TypeError, ValueError):
            grace_days = 7

        if not expires_str:
            # No expiration - perpetual license
            return self._build_valid_result(license_data, LicenseState.ACTIVE)

        try:
            expires_at = datetime.fromisoformat(expires_str.replace("Z", "+00:00"))
            # Aware timestamps compare in their own zone, naive in local time.
            now = datetime.now(expires_at.tzinfo)
        except Exception:
            return LicenseValidationResult(
                is_valid=False,
                state=LicenseState.INVALID,
                error="Invalid expiration date format"
            )

        days_remaining = (expires_at - now).days

        if days_remaining > self._WARNING_WINDOW_DAYS:
            return self._build_valid_result(
                license_data,
                LicenseState.ACTIVE,
                days_remaining=days_remaining,
                expires_at=expires_at
            )

        if days_remaining >= 0:
            result = self._build_valid_result(
                license_data,
                LicenseState.WARNING,
                days_remaining=days_remaining,
                expires_at=expires_at
            )
            result.warning = f"License expires in {days_remaining} day{'s' if days_remaining != 1 else ''}"
            return result

        # Past expiration. ``days_remaining`` is a floored negative number,
        # so abs() of it would round the elapsed time UP and cut the grace
        # period one day short; count whole elapsed days instead.
        grace_remaining = self._grace_days_left(expires_at, now, grace_days)
        degraded_config = self.config.get("degraded_mode", {})

        if grace_remaining > 0:
            result = self._build_valid_result(
                license_data,
                LicenseState.GRACE,
                days_remaining=0,
                grace_days_remaining=grace_remaining,
                expires_at=expires_at
            )
            result.warning = f"License expired. Grace period ends in {grace_remaining} day{'s' if grace_remaining != 1 else ''}"
            result.degraded_mode = True
            result.allowed_agents = degraded_config.get("allowed_agents", ["general-purpose"])
            result.allowed_commands = degraded_config.get("allowed_commands", [])
            return result

        # EXPIRED: Grace period exhausted
        return LicenseValidationResult(
            is_valid=False,
            state=LicenseState.EXPIRED,
            tier=self._get_tier(license_data),
            license_id=license_data.get("license_id"),
            licensee_email=license_data.get("licensee", {}).get("email"),
            organization=license_data.get("licensee", {}).get("organization"),
            expires_at=expires_at,
            days_remaining=days_remaining,
            error="License expired. Please renew via /license-renew",
            degraded_mode=True,
            allowed_agents=degraded_config.get("allowed_agents", ["general-purpose"]),
            allowed_commands=degraded_config.get("allowed_commands", [])
        )

    def _build_valid_result(
        self,
        license_data: Dict[str, Any],
        state: LicenseState,
        days_remaining: Optional[int] = None,
        grace_days_remaining: Optional[int] = None,
        expires_at: Optional[datetime] = None
    ) -> LicenseValidationResult:
        """Build an ``is_valid=True`` result populated from the license data."""
        licensee = license_data.get("licensee", {})
        return LicenseValidationResult(
            is_valid=True,
            state=state,
            tier=self._get_tier(license_data),
            license_id=license_data.get("license_id"),
            licensee_email=licensee.get("email"),
            organization=licensee.get("organization"),
            expires_at=expires_at,
            days_remaining=days_remaining,
            grace_days_remaining=grace_days_remaining,
            features=license_data.get("features", {})
        )

    def _get_tier(self, license_data: Dict[str, Any]) -> LicenseTier:
        """Get license tier from data, defaulting to FREE.

        Unknown tier names and non-string values (for example JSON null)
        both map to ``LicenseTier.FREE``.
        """
        tier_str = license_data.get("tier", "free")
        try:
            return LicenseTier(tier_str.lower())
        except (AttributeError, ValueError):
            return LicenseTier.FREE

    def _check_offline_tolerance(self, license_data: Dict[str, Any]) -> Optional[LicenseValidationResult]:
        """Check if offline tolerance is exceeded.

        The limit is ``offline.max_offline_days`` from the license, falling
        back to ``offline.default_max_days`` from the config, then 7.

        Returns:
            An OFFLINE_EXCEEDED result when more than the allowed number of
            whole days has passed since ``offline.last_server_check``;
            otherwise None. When within tolerance, the remaining allowance
            is stored for ``_get_offline_days_remaining``. A missing or
            unparseable timestamp returns None and stores nothing.
        """
        self._offline_days_remaining = None

        offline_config = license_data.get("offline") or {}
        default_max_days = self.config.get("offline", {}).get("default_max_days", 7)
        max_offline_days = offline_config.get("max_offline_days", default_max_days)
        last_server_check = offline_config.get("last_server_check")

        if not last_server_check:
            return None

        try:
            last_check = datetime.fromisoformat(last_server_check.replace("Z", "+00:00"))
            days_offline = (datetime.now(last_check.tzinfo) - last_check).days
            days_remaining = max(0, max_offline_days - days_offline)
            exceeded = days_offline > max_offline_days
        except (AttributeError, TypeError, ValueError):
            return None

        if exceeded:
            degraded_config = self.config.get("degraded_mode", {})
            return LicenseValidationResult(
                is_valid=False,
                state=LicenseState.OFFLINE_EXCEEDED,
                tier=self._get_tier(license_data),
                license_id=license_data.get("license_id"),
                offline_days_remaining=0,
                error=f"Offline tolerance exceeded ({days_offline} days). Connect to validate.",
                degraded_mode=True,
                allowed_agents=degraded_config.get("allowed_agents", ["general-purpose"]),
                allowed_commands=degraded_config.get("allowed_commands", [])
            )

        # Store days remaining for caller to access
        self._offline_days_remaining = days_remaining
        return None

    def _get_offline_days_remaining(self) -> Optional[int]:
        """Get offline days remaining from last check."""
        return getattr(self, '_offline_days_remaining', None)

    # ------------------------------------------------------------------
    # Server interaction and persistence
    # ------------------------------------------------------------------

    def _should_check_server(self) -> bool:
        """Determine if server check is needed.

        True when the cache is missing or unusable, has no
        ``last_validation``, or that timestamp is older than
        ``validation.cache_ttl_hours`` (default 24).
        """
        last_validation = self._read_validation_cache().get("last_validation")
        if not last_validation:
            return True

        try:
            last_check = datetime.fromisoformat(last_validation)
            cache_ttl = self.config.get("validation", {}).get("cache_ttl_hours", 24)
            return (datetime.now() - last_check).total_seconds() > (cache_ttl * 3600)
        except (AttributeError, TypeError, ValueError):
            # Unparseable or timezone-aware stamp: re-check to be safe.
            return True

    def _server_validate(self, license_data: Dict[str, Any]) -> Optional[LicenseValidationResult]:
        """Validate license with server.

        Returns:
            None when no server is configured, the server is unreachable,
            or the server confirms the license (the local result then
            stands); an INVALID result when the server rejects it.
        """
        server_config = self.config.get("license_server", {})
        api_url = server_config.get("api_url")
        if not api_url:
            return None

        validate_endpoint = server_config.get("validate_endpoint", "/validate")
        timeout = server_config.get("timeout_seconds", 10)

        try:
            url = f"{api_url}{validate_endpoint}"
            payload = {
                "license_id": license_data.get("license_id"),
                "machine_uuid": self.machine_id.get("machine_uuid") if self.machine_id else None
            }

            req = urllib.request.Request(
                url,
                data=json.dumps(payload).encode(),
                headers={"Content-Type": "application/json"},
                method="POST"
            )

            with urllib.request.urlopen(req, timeout=timeout) as response:
                result = json.loads(response.read().decode())

            if not isinstance(result, dict):
                return None

            self._update_validation_cache(result)
            # Record the contact in the license file; this resets the
            # offline clock.
            self._update_last_server_check()

            if result.get("valid"):
                return None  # Use local validation

            return LicenseValidationResult(
                is_valid=False,
                state=LicenseState.INVALID,
                error=result.get("error", "Server validation failed")
            )
        except OSError:
            # URLError, timeouts and connection resets are all OSError:
            # treat them as being offline and keep the cached validation.
            self._log_offline_period()
            return None
        except Exception:
            # Malformed response etc. - deliberately best effort.
            return None

    def _update_validation_cache(self, server_response: Dict[str, Any]) -> None:
        """Update validation cache with server response.

        Previously cached revocations and ``crl_last_updated`` are kept so
        a response without a ``revocations`` field cannot silently
        un-revoke a license or discard the CRL bookkeeping.
        """
        try:
            LICENSING_DIR.mkdir(parents=True, exist_ok=True)
            previous = self._read_validation_cache()
            cache = {
                "last_validation": datetime.now().isoformat(),
                "validation_result": server_response.get("valid"),
                "server_response": server_response,
                "revocation_list": self._merge_revocations(
                    previous.get("revocation_list"),
                    server_response.get("revocations")
                )
            }
            if previous.get("crl_last_updated"):
                cache["crl_last_updated"] = previous["crl_last_updated"]
            VALIDATION_CACHE_FILE.write_text(json.dumps(cache, indent=2))
        except Exception:
            # Caching is an optimisation; never fail validation over it.
            pass

    def _update_last_server_check(self) -> None:
        """Update ``offline.last_server_check`` in the license file.

        Best effort: does nothing when the file is missing, unreadable or
        not shaped as expected.
        """
        try:
            license_data = json.loads(LICENSE_FILE.read_text())
            if not isinstance(license_data, dict):
                return
            offline = license_data.setdefault("offline", {})
            if not isinstance(offline, dict):
                return
            offline["last_server_check"] = datetime.now().isoformat()
            LICENSE_FILE.write_text(json.dumps(license_data, indent=2))
        except (OSError, ValueError):
            pass

    def _log_offline_period(self) -> None:
        """Log offline period for tracking.

        Marks the log as offline, stamping ``current_period_start`` on the
        first failure of a period, and rewrites ``OFFLINE_LOG_FILE``.
        """
        try:
            log = None
            if OFFLINE_LOG_FILE.exists():
                log = json.loads(OFFLINE_LOG_FILE.read_text())
            if not isinstance(log, dict):
                log = {
                    "is_offline": False,
                    "offline_periods": [],
                    "total_offline_days": 0,
                    "current_period_start": None
                }

            if not log.get("is_offline"):
                log["is_offline"] = True
                log["current_period_start"] = datetime.now().isoformat()

            # NOTE(review): this key is written at the moment a server call
            # FAILED, so it holds the latest failed attempt rather than the
            # last time the machine was online - confirm intended meaning.
            log["last_online_timestamp"] = datetime.now().isoformat()
            OFFLINE_LOG_FILE.write_text(json.dumps(log, indent=2))
        except (OSError, ValueError):
            pass

# ============================================================================
# CLI Interface
# ============================================================================

def _result_to_status(result: LicenseValidationResult) -> Dict[str, Any]:
    """Convert a validation result into the dict layout used for JSON output.

    Optional keys are included only when they carry a value.
    """
    status: Dict[str, Any] = {
        "valid": result.is_valid,
        "state": result.state.value,
        "tier": result.tier.value if result.tier else None,
        "license_id": result.license_id,
        "email": result.licensee_email,
        "organization": result.organization,
        "degraded_mode": result.degraded_mode,
    }

    if result.expires_at:
        status["expires_at"] = result.expires_at.isoformat()
    for key in ("days_remaining", "grace_days_remaining", "offline_days_remaining"):
        value = getattr(result, key)
        if value is not None:
            status[key] = value
    for key in ("error", "warning", "features"):
        value = getattr(result, key)
        if value:
            status[key] = value

    return status


def main():
    """Command-line interface for license validation.

    With ``--status`` the status is printed and the function returns
    normally. Otherwise the license is validated once (honouring
    ``--force``), the result is printed as text or JSON, and the process
    exits with 0 for a usable license and 1 otherwise.
    """
    import argparse

    parser = argparse.ArgumentParser(description="CODITECT License Validator")
    parser.add_argument("--json", "-j", action="store_true", help="Output JSON format")
    parser.add_argument("--force", "-f", action="store_true", help="Force server check")
    parser.add_argument("--status", "-s", action="store_true", help="Show license status")
    parser.add_argument("--quiet", "-q", action="store_true", help="Only output errors")

    args = parser.parse_args()

    validator = LicenseValidator()

    if args.status:
        status = validator.get_status()
        if args.json:
            print(json.dumps(status, indent=2))
        else:
            print_status(status, quiet=args.quiet)
        return

    result = validator.validate(force_server_check=args.force)

    if args.json:
        # Serialise the result just computed. Calling get_status() here
        # would validate a second time without --force, so the printed
        # JSON could disagree with the exit code below.
        print(json.dumps(_result_to_status(result), indent=2))
    else:
        print_result(result, quiet=args.quiet)

    # Exit codes: WARNING and GRACE still count as usable.
    usable = result.is_valid or result.state in (LicenseState.WARNING, LicenseState.GRACE)
    sys.exit(0 if usable else 1)

def print_status(status: Dict[str, Any], quiet: bool = False) -> None:
    """Print human-readable status.

    Args:
        status: Status dict using the keys ``valid``, ``state``, ``tier``,
            ``organization``, ``email``, ``days_remaining``,
            ``grace_days_remaining``, ``warning``, ``error`` and
            ``degraded_mode``; absent keys are simply not printed.
        quiet: When True, print nothing for a valid license.
    """
    if quiet and status.get("valid"):
        return

    # A state explicitly set to None must not crash .upper() below.
    state = status.get("state") or "unknown"
    tier = status.get("tier", "unknown")

    # State indicators (ANSI-coloured)
    state_icons = {
        "active": "\033[92m✓\033[0m",  # Green check
        "warning": "\033[93m⚠\033[0m",  # Yellow warning
        "grace": "\033[93m⏳\033[0m",  # Yellow hourglass
        "expired": "\033[91m✗\033[0m",  # Red X
        "invalid": "\033[91m✗\033[0m",
        "unbound": "\033[91m✗\033[0m",  # was missing: printed "?"
        "missing": "\033[91m✗\033[0m",
        "revoked": "\033[91m✗\033[0m",
        "offline_exceeded": "\033[93m📵\033[0m"
    }

    icon = state_icons.get(state, "?")
    print(f"\nCODITECT License Status: {icon} {state.upper()}")

    if tier and tier != "unknown":
        print(f" Tier: {tier.title()}")

    if status.get("organization"):
        print(f" Organization: {status['organization']}")

    if status.get("email"):
        print(f" Licensee: {status['email']}")

    days = status.get("days_remaining")
    if days is not None:
        if days > 0:
            print(f" Expires in: {days} day{'s' if days != 1 else ''}")
        elif days == 0:
            print(" Expires: Today")

    grace = status.get("grace_days_remaining")
    if grace is not None:
        # Pluralise like the expiry line instead of printing "1 days".
        print(f" Grace period remaining: {grace} day{'s' if grace != 1 else ''}")

    if status.get("warning"):
        print(f"\n \033[93mWarning:\033[0m {status['warning']}")

    if status.get("error"):
        print(f"\n \033[91mError:\033[0m {status['error']}")

    if status.get("degraded_mode"):
        print("\n \033[93mDegraded Mode:\033[0m Some features restricted")

    print()

def print_result(result: LicenseValidationResult, quiet: bool = False) -> None:
    """Print a validation result through ``print_status``.

    Only the validity flag, state, tier, error and warning are forwarded.
    """
    tier = result.tier
    summary = dict(
        valid=result.is_valid,
        state=result.state.value,
        tier=tier.value if tier else None,
        error=result.error,
        warning=result.warning,
    )
    print_status(summary, quiet)

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()