#!/usr/bin/env python3
"""CODITECT Zero Trust File Integrity Registry (ADR-182).

Content-addressable file integrity verification for Zero Trust architecture.
SHA-256 hashes all framework files, stores in org.db with immutable audit trail.

Usage:
    python3 scripts/file_integrity.py --scan      # Hash all files, update registry
    python3 scripts/file_integrity.py --verify    # Check files against registry
    python3 scripts/file_integrity.py --diff      # Show changes since last scan
    python3 scripts/file_integrity.py --export    # Export manifest as JSON
    python3 scripts/file_integrity.py --stats     # Show registry statistics
    python3 scripts/file_integrity.py --baseline  # First-time full scan

Version: 1.0.0
ADR: ADR-182 (Zero Trust File Integrity Registry)
Track: D/M (Security)
Created: 2026-02-12
"""

import argparse
import hashlib
import json
import os
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Tool version, reported by the --version CLI flag.
version = "1.0.0"
# ============================================================================
# File Type Classification
# ============================================================================

# Path prefix + suffix pairs used to classify framework files by type.
FILE_TYPE_PATTERNS = {
    "agent": ("agents/", ".md"),
    "command": ("commands/", ".md"),
    "hook": ("hooks/", ".py"),
    "config": ("config/", ".json"),
    # ADR-213: ADRs, tracks, and standards migrated to coditect-documentation
    "adr": ("adrs/", ".md"),
    "track": ("project/plans/tracks/TRACK-", ".md"),
    "standard": ("standards/", ".md"),
}

SKILL_PATTERN = "SKILL.md"
CLAUDE_MD_PATTERN = "CLAUDE.md"

# Directories to scan (relative to coditect-core root)
# ADR-213: documentary content (ADRs, tracks, standards) moved to coditect-documentation
SCAN_DIRS = [
    "agents",
    "commands",
    "skills",
    "scripts",
    "hooks",
    "config",
    "docs",
]

# File extensions to include
SCAN_EXTENSIONS = {".md", ".py", ".json", ".yaml", ".yml", ".sh", ".toml", ".cfg"}

# Directories to exclude.
# "__pycache__" restored: the exclusion check matches whole path components
# exactly, so the mangled "pycache" could never match a real CPython cache dir.
# NOTE(review): ".egg-info" also matches exactly — real dirs are named like
# "pkg.egg-info", so this entry likely never fires; confirm intended pattern.
EXCLUDE_DIRS = {
    "__pycache__",
    ".git",
    ".venv",
    "venv",
    "node_modules",
    ".mypy_cache",
    ".pytest_cache",
    "dist",
    "build",
    ".egg-info",
    "cusf-archive",
    "exports-archive",
}

# Files to exclude (matched by file name).
EXCLUDE_FILES = {
    ".DS_Store",
    "Thumbs.db",
    ".gitkeep",
}


def classify_file_type(relative_path: str) -> str:
    """Classify a file into a type based on its path.

    Precedence: CLAUDE.md and SKILL.md names win over directory-prefix
    patterns; generic scripts/docs are checked last; anything else is "other".
    """
    if relative_path.endswith(CLAUDE_MD_PATTERN):
        return "claude-md"
    if relative_path.endswith(SKILL_PATTERN):
        return "skill"
    for file_type, (prefix, suffix) in FILE_TYPE_PATTERNS.items():
        if relative_path.startswith(prefix) and relative_path.endswith(suffix):
            return file_type
    if relative_path.startswith("scripts/") and relative_path.endswith(".py"):
        return "script"
    if relative_path.startswith("docs/") and relative_path.endswith(".md"):
        return "documentation"
    return "other"
# ============================================================================
# Hashing (reuses J.15.3 pattern)
# ============================================================================

def compute_file_hash(file_path: Path) -> str:
    """Compute SHA-256 hash of file content. Chunked for memory efficiency.

    Returns the hex digest, or "" when the file cannot be read (a warning
    is printed to stderr so callers can count it as an error).
    """
    sha256 = hashlib.sha256()
    try:
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                sha256.update(chunk)
        return sha256.hexdigest()
    except OSError as e:  # IOError is an alias of OSError since Python 3.3
        print(f" Warning: Could not hash {file_path}: {e}", file=sys.stderr)
        return ""
# ============================================================================
# File Discovery
# ============================================================================

def discover_files(root: Path) -> List[Tuple[Path, str, str]]:
    """
    Discover all framework files to track.

    Walks SCAN_DIRS for files with SCAN_EXTENSIONS, skipping EXCLUDE_DIRS
    components and EXCLUDE_FILES names, then sweeps the whole tree for
    CLAUDE.md files at any level.

    Returns:
        [(absolute_path, relative_path, file_type), ...] sorted by relative path.
    """
    files: List[Tuple[Path, str, str]] = []
    seen: set = set()  # relative paths already collected (O(1) dedup)
    for scan_dir in SCAN_DIRS:
        dir_path = root / scan_dir
        if not dir_path.is_dir():
            continue
        for file_path in dir_path.rglob("*"):
            if not file_path.is_file():
                continue
            if file_path.suffix not in SCAN_EXTENSIONS:
                continue
            if file_path.name in EXCLUDE_FILES:
                continue
            if any(excl in file_path.parts for excl in EXCLUDE_DIRS):
                continue
            relative = str(file_path.relative_to(root))
            file_type = classify_file_type(relative)
            files.append((file_path, relative, file_type))
            seen.add(relative)

    # Also find CLAUDE.md files at any level (including outside SCAN_DIRS)
    for claude_md in root.rglob("CLAUDE.md"):
        if not claude_md.is_file():
            continue
        if any(excl in claude_md.parts for excl in EXCLUDE_DIRS):
            continue
        relative = str(claude_md.relative_to(root))
        if relative not in seen:  # set lookup instead of O(n) list scan
            files.append((claude_md, relative, "claude-md"))
            seen.add(relative)
    return sorted(files, key=lambda x: x[1])
# ============================================================================
# Database Operations (org.db — Tier 1 Irreplaceable)
# ============================================================================

def get_org_db_path() -> str:
    """Get path to org.db (default location, ~ expanded)."""
    return os.path.expanduser(
        "~/PROJECTS/.coditect-data/context-storage/org.db"
    )
def ensure_tables(db_path: str) -> None:
    """Create integrity registry tables if they don't exist.

    Idempotent (CREATE ... IF NOT EXISTS throughout). Enables WAL journal
    mode so readers don't block the writer. The connection is closed even
    if table creation fails.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS file_integrity_registry (
                file_path TEXT PRIMARY KEY,
                file_name TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                file_size INTEGER NOT NULL,
                file_mtime REAL NOT NULL,
                file_type TEXT NOT NULL,
                first_seen_at TEXT NOT NULL,
                last_verified_at TEXT NOT NULL,
                last_changed_at TEXT NOT NULL,
                verified_by TEXT NOT NULL DEFAULT 'system'
            );

            CREATE INDEX IF NOT EXISTS idx_fir_type
                ON file_integrity_registry(file_type);
            CREATE INDEX IF NOT EXISTS idx_fir_hash
                ON file_integrity_registry(content_hash);
            CREATE INDEX IF NOT EXISTS idx_fir_changed
                ON file_integrity_registry(last_changed_at);

            CREATE TABLE IF NOT EXISTS file_integrity_audit (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_path TEXT NOT NULL,
                action TEXT NOT NULL,
                old_hash TEXT,
                new_hash TEXT,
                file_size INTEGER,
                recorded_at TEXT NOT NULL,
                recorded_by TEXT NOT NULL,
                details TEXT
            );

            CREATE INDEX IF NOT EXISTS idx_fia_path
                ON file_integrity_audit(file_path);
            CREATE INDEX IF NOT EXISTS idx_fia_action
                ON file_integrity_audit(action);
            CREATE INDEX IF NOT EXISTS idx_fia_recorded
                ON file_integrity_audit(recorded_at);
        """)
        conn.commit()
    finally:
        conn.close()
def load_registry(db_path: str) -> Dict[str, dict]:
    """Load current registry state into memory.

    Returns a mapping of file_path -> full registry row (as a plain dict).
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute(
            "SELECT * FROM file_integrity_registry"
        ).fetchall()
    finally:
        conn.close()
    return {row["file_path"]: dict(row) for row in rows}
def audit_log(
    conn: sqlite3.Connection,
    file_path: str,
    action: str,
    old_hash: Optional[str],
    new_hash: Optional[str],
    file_size: Optional[int],
    recorded_by: str,
    details: Optional[dict] = None,
) -> None:
    """Append an immutable entry to the audit log.

    The caller owns the connection and the transaction — this function only
    executes the INSERT; it does not commit. The timestamp is taken per call
    (UTC, ISO 8601). ``details`` is JSON-serialized when provided.
    """
    now = datetime.now(timezone.utc).isoformat()
    conn.execute(
        """INSERT INTO file_integrity_audit
           (file_path, action, old_hash, new_hash, file_size,
            recorded_at, recorded_by, details)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            file_path,
            action,
            old_hash,
            new_hash,
            file_size,
            now,
            recorded_by,
            json.dumps(details) if details else None,
        ),
    )
# ============================================================================
# Core Operations
# ============================================================================

def scan(root: Path, db_path: str, recorded_by: str = "system") -> dict:
    """
    Scan all framework files, update registry, log changes.

    Args:
        root: coditect-core root directory to scan.
        db_path: path to org.db.
        recorded_by: attribution string recorded in registry and audit rows.

    Returns summary statistics:
        {"scanned", "created", "modified", "unchanged", "deleted", "errors"}.
    """
    ensure_tables(db_path)
    registry = load_registry(db_path)
    files = discover_files(root)
    # One timestamp for the whole scan so all registry rows from this run agree.
    now = datetime.now(timezone.utc).isoformat()

    stats = {
        "scanned": 0,
        "created": 0,
        "modified": 0,
        "unchanged": 0,
        "deleted": 0,
        "errors": 0,
    }

    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        seen_paths = set()
        for abs_path, rel_path, file_type in files:
            stats["scanned"] += 1
            seen_paths.add(rel_path)
            content_hash = compute_file_hash(abs_path)
            if not content_hash:
                # Unreadable file: counted as error, registry row left untouched.
                stats["errors"] += 1
                continue
            try:
                st = abs_path.stat()
            except OSError:
                stats["errors"] += 1
                continue
            file_size = st.st_size
            file_mtime = st.st_mtime
            if rel_path in registry:
                existing = registry[rel_path]
                if existing["content_hash"] == content_hash:
                    # Unchanged — update last_verified_at
                    stats["unchanged"] += 1
                    conn.execute(
                        """UPDATE file_integrity_registry
                           SET last_verified_at = ?, verified_by = ?,
                               file_mtime = ?
                           WHERE file_path = ?""",
                        (now, recorded_by, file_mtime, rel_path),
                    )
                    audit_log(conn, rel_path, "verified", content_hash, content_hash,
                              file_size, recorded_by)
                else:
                    # Modified — update hash, log change
                    stats["modified"] += 1
                    old_hash = existing["content_hash"]
                    conn.execute(
                        """UPDATE file_integrity_registry
                           SET content_hash = ?, file_size = ?, file_mtime = ?,
                               file_name = ?, file_type = ?,
                               last_verified_at = ?, last_changed_at = ?,
                               verified_by = ?
                           WHERE file_path = ?""",
                        (content_hash, file_size, file_mtime,
                         abs_path.name, file_type,
                         now, now, recorded_by, rel_path),
                    )
                    audit_log(conn, rel_path, "modified", old_hash, content_hash,
                              file_size, recorded_by,
                              {"old_size": existing["file_size"], "new_size": file_size})
                    print(f" MODIFIED: {rel_path}")
            else:
                # New file — insert
                stats["created"] += 1
                conn.execute(
                    """INSERT INTO file_integrity_registry
                       (file_path, file_name, content_hash, file_size, file_mtime,
                        file_type, first_seen_at, last_verified_at, last_changed_at,
                        verified_by)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                    (rel_path, abs_path.name, content_hash, file_size, file_mtime,
                     file_type, now, now, now, recorded_by),
                )
                audit_log(conn, rel_path, "created", None, content_hash,
                          file_size, recorded_by)

        # Detect deleted files: registry rows not seen on disk this scan.
        for reg_path in registry:
            if reg_path not in seen_paths:
                stats["deleted"] += 1
                old = registry[reg_path]
                conn.execute(
                    "DELETE FROM file_integrity_registry WHERE file_path = ?",
                    (reg_path,),
                )
                audit_log(conn, reg_path, "deleted", old["content_hash"], None,
                          None, recorded_by)
                print(f" DELETED: {reg_path}")
        conn.commit()
    finally:
        # Close even on error; uncommitted work is rolled back by SQLite.
        conn.close()
    return stats
def verify(root: Path, db_path: str) -> dict:
    """
    Verify current files against registry. Reports drift without updating.

    Read-only: neither the registry nor the audit log is touched.

    Returns a verification report:
        {"verified": int, "modified": [...], "missing": [...],
         "new": [...], "total_scanned": int}
    """
    ensure_tables(db_path)
    registry = load_registry(db_path)
    files = discover_files(root)

    report = {
        "verified": 0,
        "modified": [],
        "missing": [],
        "new": [],
        "total_scanned": 0,
    }
    seen_paths = set()
    for abs_path, rel_path, file_type in files:
        report["total_scanned"] += 1
        seen_paths.add(rel_path)
        content_hash = compute_file_hash(abs_path)
        if not content_hash:
            # Unreadable file: skipped (compute_file_hash already warned).
            continue
        if rel_path in registry:
            if registry[rel_path]["content_hash"] == content_hash:
                report["verified"] += 1
            else:
                report["modified"].append({
                    "path": rel_path,
                    "expected_hash": registry[rel_path]["content_hash"][:16] + "...",
                    "actual_hash": content_hash[:16] + "...",
                    "last_verified": registry[rel_path]["last_verified_at"],
                })
        else:
            report["new"].append({
                "path": rel_path,
                "type": file_type,
                "hash": content_hash[:16] + "...",
            })
    # Registry rows with no matching file on disk are reported as missing.
    for reg_path in registry:
        if reg_path not in seen_paths:
            report["missing"].append({
                "path": reg_path,
                "last_hash": registry[reg_path]["content_hash"][:16] + "...",
                "last_seen": registry[reg_path]["last_verified_at"],
            })
    return report
def diff(db_path: str, since: Optional[str] = None) -> List[dict]:
    """Show changes since last scan or since a given timestamp.

    Args:
        db_path: path to org.db.
        since: optional ISO timestamp; only audit entries recorded strictly
            after it are returned. Without it, the most recent 100 change
            entries are returned.

    'verified' entries are always excluded — only real changes are reported.
    Entries come back newest-first as plain dicts.
    """
    ensure_tables(db_path)
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        if since:
            rows = conn.execute(
                """SELECT * FROM file_integrity_audit
                   WHERE recorded_at > ? AND action != 'verified'
                   ORDER BY recorded_at DESC""",
                (since,),
            ).fetchall()
        else:
            # Early exit when there are no change entries at all.
            latest = conn.execute(
                "SELECT MAX(recorded_at) FROM file_integrity_audit WHERE action != 'verified'"
            ).fetchone()[0]
            if not latest:
                return []
            # NOTE(review): audit timestamps are per-entry, so there is no
            # exact "scan batch" marker; the newest 100 change entries
            # approximate "changes from the most recent scan".
            rows = conn.execute(
                """SELECT * FROM file_integrity_audit
                   WHERE action != 'verified'
                   ORDER BY recorded_at DESC
                   LIMIT 100"""
            ).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]
def export_manifest(db_path: str) -> dict:
    """Export full registry as JSON manifest.

    Returns a dict with manifest metadata plus one entry per tracked file,
    ordered by file path.
    """
    ensure_tables(db_path)
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute(
            "SELECT * FROM file_integrity_registry ORDER BY file_path"
        ).fetchall()
        audit_count = conn.execute(
            "SELECT COUNT(*) FROM file_integrity_audit"
        ).fetchone()[0]
    finally:
        conn.close()

    now = datetime.now(timezone.utc).isoformat()
    files = []
    for row in rows:
        files.append({
            "name": row["file_name"],
            "path": row["file_path"],
            "hash": row["content_hash"],
            "size": row["file_size"],
            "type": row["file_type"],
            "first_seen": row["first_seen_at"],
            "last_verified": row["last_verified_at"],
            "last_changed": row["last_changed_at"],
        })
    return {
        "manifest_version": "1.0.0",
        "generated_at": now,
        "file_count": len(files),
        "audit_entries": audit_count,
        "files": files,
    }
def get_stats(db_path: str) -> dict:
    """Get registry statistics.

    Returns totals, per-type and per-action breakdowns, the top duplicated
    content hashes (truncated for display), and the latest verification time.
    """
    ensure_tables(db_path)
    conn = sqlite3.connect(db_path)
    try:
        total = conn.execute(
            "SELECT COUNT(*) FROM file_integrity_registry"
        ).fetchone()[0]
        by_type = conn.execute(
            """SELECT file_type, COUNT(*) as cnt
               FROM file_integrity_registry
               GROUP BY file_type
               ORDER BY cnt DESC"""
        ).fetchall()
        audit_total = conn.execute(
            "SELECT COUNT(*) FROM file_integrity_audit"
        ).fetchone()[0]
        audit_by_action = conn.execute(
            """SELECT action, COUNT(*) as cnt
               FROM file_integrity_audit
               GROUP BY action
               ORDER BY cnt DESC"""
        ).fetchall()
        # Dedup stats: files with same hash
        dup_hashes = conn.execute(
            """SELECT content_hash, COUNT(*) as cnt
               FROM file_integrity_registry
               GROUP BY content_hash
               HAVING cnt > 1
               ORDER BY cnt DESC
               LIMIT 10"""
        ).fetchall()
        last_scan = conn.execute(
            "SELECT MAX(last_verified_at) FROM file_integrity_registry"
        ).fetchone()[0]
    finally:
        conn.close()
    return {
        "total_files": total,
        "by_type": [(t, c) for t, c in by_type],
        "audit_entries": audit_total,
        "audit_by_action": [(a, c) for a, c in audit_by_action],
        "duplicate_hashes": [(h[:16] + "...", c) for h, c in dup_hashes],
        "last_scan": last_scan,
    }
# ============================================================================
# CLI
# ============================================================================

def find_coditect_core_root() -> Optional[Path]:
    """Find the coditect-core root directory.

    Resolution order: current directory itself, ./.coditect symlink/dir,
    the git toplevel (its .coditect or the toplevel itself), then a known
    fallback location. Returns None when nothing matches.
    """
    # Try from current directory
    cwd = Path.cwd()

    # Check if we're in coditect-core
    if (cwd / "agents").is_dir() and (cwd / "commands").is_dir():
        return cwd

    # Check .coditect symlink
    coditect_link = cwd / ".coditect"
    if coditect_link.is_symlink() or coditect_link.is_dir():
        target = coditect_link.resolve()
        if (target / "agents").is_dir():
            return target

    # Check git root
    try:
        import subprocess
        result = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            capture_output=True, text=True, cwd=str(cwd),
            timeout=5,  # don't hang the CLI if git stalls; caught below
        )
        if result.returncode == 0:
            git_root = Path(result.stdout.strip())
            coditect = git_root / ".coditect"
            if coditect.is_symlink() or coditect.is_dir():
                target = coditect.resolve()
                if (target / "agents").is_dir():
                    return target
            # Direct coditect-core checkout
            if (git_root / "agents").is_dir():
                return git_root
    except Exception:
        # Best-effort: git missing, timeout, or odd environment — fall through.
        pass

    # Fallback: known location
    fallback = Path.home() / "PROJECTS" / "coditect-rollout-master" / "submodules" / "core" / "coditect-core"
    if fallback.is_dir() and (fallback / "agents").is_dir():
        return fallback
    return None
def main():
    """CLI entry point: parse arguments and dispatch to the requested operation."""
    parser = argparse.ArgumentParser(
        description="CODITECT Zero Trust File Integrity Registry (ADR-182)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--scan", action="store_true",
                        help="Hash all files, update registry, log changes")
    parser.add_argument("--verify", action="store_true",
                        help="Check files against registry, report drift (read-only)")
    parser.add_argument("--diff", action="store_true",
                        help="Show changes since last scan")
    parser.add_argument("--since", type=str, default=None,
                        help="Show changes since ISO timestamp (with --diff)")
    parser.add_argument("--export", action="store_true",
                        help="Export full manifest as JSON")
    parser.add_argument("--export-file", type=str, default=None,
                        help="Export manifest to file (default: stdout)")
    parser.add_argument("--stats", action="store_true",
                        help="Show registry statistics")
    parser.add_argument("--baseline", action="store_true",
                        help="Run initial baseline scan (same as --scan, first time)")
    parser.add_argument("--root", type=str, default=None,
                        help="Override coditect-core root path")
    parser.add_argument("--db", type=str, default=None,
                        help="Override org.db path")
    parser.add_argument("--recorded-by", type=str, default="system",
                        help="Session ID for audit trail attribution")
    parser.add_argument("--json", action="store_true",
                        help="Output in JSON format")
    parser.add_argument("--version", action="version",
                        version=f"file-integrity {version}")

    args = parser.parse_args()

    # Resolve paths
    db_path = args.db or get_org_db_path()
    root = Path(args.root) if args.root else find_coditect_core_root()
    if not root and (args.scan or args.verify or args.baseline):
        print("Error: Could not find coditect-core root. Use --root to specify.", file=sys.stderr)
        sys.exit(1)
    # Only scan/baseline may create the database; everything else needs it present.
    if not os.path.exists(db_path) and not (args.scan or args.baseline):
        print(f"Error: org.db not found at {db_path}. Run --scan or --baseline first.", file=sys.stderr)
        sys.exit(1)

    # Execute requested operation
    if args.scan or args.baseline:
        print(f"{'Baseline' if args.baseline else 'Scanning'}: {root}")
        print(f"Database: {db_path}")
        print()
        stats = scan(root, db_path, args.recorded_by)
        if args.json:
            print(json.dumps(stats, indent=2))
        else:
            print(f"\nFile Integrity {'Baseline' if args.baseline else 'Scan'} Complete")
            print("=" * 45)
            print(f" Scanned: {stats['scanned']}")
            print(f" Created: {stats['created']}")
            print(f" Modified: {stats['modified']}")
            print(f" Unchanged: {stats['unchanged']}")
            print(f" Deleted: {stats['deleted']}")
            print(f" Errors: {stats['errors']}")
    elif args.verify:
        print(f"Verifying: {root}")
        print(f"Database: {db_path}")
        print()
        report = verify(root, db_path)
        if args.json:
            print(json.dumps(report, indent=2))
        else:
            print("File Integrity Verification Report")
            print("=" * 45)
            print(f" Scanned: {report['total_scanned']}")
            print(f" Verified: {report['verified']} (unchanged)")
            print(f" Modified: {len(report['modified'])}")
            print(f" New: {len(report['new'])}")
            print(f" Missing: {len(report['missing'])}")
            if report["modified"]:
                print("\nModified files (drift detected):")
                for m in report["modified"]:
                    print(f" {m['path']}")
                    print(f" Expected: {m['expected_hash']} Actual: {m['actual_hash']}")
            if report["missing"]:
                print("\nMissing files:")
                for m in report["missing"]:
                    print(f" {m['path']} (last seen: {m['last_seen']})")
            if report["new"]:
                print(f"\nNew files (not in registry): {len(report['new'])}")
                for n in report["new"][:10]:
                    print(f" {n['path']} [{n['type']}]")
                if len(report["new"]) > 10:
                    print(f" ... and {len(report['new']) - 10} more")
            tampered = len(report["modified"]) + len(report["missing"])
            if tampered == 0:
                print("\nIntegrity: PASS — all files match registry")
            else:
                print(f"\nIntegrity: DRIFT DETECTED — {tampered} file(s) differ from registry")
    elif args.diff:
        changes = diff(db_path, args.since)
        if args.json:
            print(json.dumps(changes, indent=2))
        else:
            if not changes:
                print("No changes found.")
            else:
                print(f"Changes ({len(changes)} entries):")
                print("-" * 70)
                for c in changes:
                    action = c["action"].upper()
                    path = c["file_path"]
                    ts = c["recorded_at"]
                    if c["old_hash"] and c["new_hash"]:
                        print(f" [{action}] {path}")
                        print(f" {c['old_hash'][:16]}... -> {c['new_hash'][:16]}...")
                        print(f" at {ts}")
                    elif c["new_hash"]:
                        print(f" [{action}] {path}")
                        print(f" hash: {c['new_hash'][:16]}...")
                        print(f" at {ts}")
                    else:
                        print(f" [{action}] {path}")
                        print(f" at {ts}")
    elif args.export:
        manifest = export_manifest(db_path)
        if args.export_file:
            with open(args.export_file, "w") as f:
                json.dump(manifest, f, indent=2)
            print(f"Manifest exported to {args.export_file}")
            print(f" {manifest['file_count']} files, {manifest['audit_entries']} audit entries")
        else:
            print(json.dumps(manifest, indent=2))
    elif args.stats:
        stats = get_stats(db_path)
        if args.json:
            print(json.dumps(stats, indent=2, default=str))
        else:
            print("File Integrity Registry Statistics")
            print("=" * 45)
            print(f" Total files tracked: {stats['total_files']}")
            print(f" Last scan: {stats['last_scan'] or 'Never'}")
            print(f" Audit log entries: {stats['audit_entries']}")
            if stats["by_type"]:
                print("\n Files by type:")
                for t, c in stats["by_type"]:
                    print(f" {t:20s} {c:>5}")
            if stats["audit_by_action"]:
                print("\n Audit by action:")
                for a, c in stats["audit_by_action"]:
                    print(f" {a:20s} {c:>5}")
            if stats["duplicate_hashes"]:
                print(f"\n Duplicate content ({len(stats['duplicate_hashes'])} hashes with >1 file):")
                for h, c in stats["duplicate_hashes"]:
                    print(f" {h} -> {c} files")
    else:
        parser.print_help()
# Mangled "__name__"/"__main__" restored: as written ("name" == "main") this
# raised NameError at import time and never ran main().
if __name__ == "__main__":
    main()