Skip to main content

scripts-mcp-health-check

#!/usr/bin/env python3 """

title: MCP Health Check Script component_type: script version: 1.0.0 audience: contributor status: active summary: Health check for all 9 CODITECT MCP tools — verifies server availability, database connectivity, tool counts, and dependencies keywords:

  • mcp
  • health-check
  • diagnostics
  • tools tokens: ~2000 created: '2026-02-07' updated: '2026-02-07' track: H task_id: H.16.3.2

MCP Health Check Script (H.16.3.2)

Verifies health of all 8 CODITECT MCP servers:

  • Server availability (can each server be imported and initialized)
  • Database connectivity (org.db, sessions.db, call_graph.db)
  • Tool count verification (expected vs actual)
  • Dependency availability (mcp, sentence-transformers, tree-sitter, gcloud)

Usage: python3 scripts/mcp-health-check.py # Full check python3 scripts/mcp-health-check.py --quick # Server availability only python3 scripts/mcp-health-check.py --smoke # Full check + functional smoke tests python3 scripts/mcp-health-check.py --json # JSON output python3 scripts/mcp-health-check.py --verbose # Detailed with tool listing python3 scripts/mcp-health-check.py --fix # Attempt fixes python3 scripts/mcp-health-check.py --risk # Risk assessment shortcut """

import argparse import importlib.util import json import os import shutil import sqlite3 import subprocess import sys from dataclasses import dataclass, field from pathlib import Path

ADR-114: Use centralized path discovery

_script_dir = Path(file).resolve().parent _coditect_root = _script_dir.parent if str(_coditect_root) not in sys.path: sys.path.insert(0, str(_coditect_root))

try: from scripts.core.paths import get_context_storage_dir, get_framework_dir CONTEXT_STORAGE = get_context_storage_dir() FRAMEWORK_DIR = get_framework_dir() except ImportError: _new_loc = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" CONTEXT_STORAGE = _new_loc if _new_loc.exists() else Path.home() / ".coditect" / "context-storage" FRAMEWORK_DIR = _coditect_root

TOOLS_DIR = _coditect_root / "tools"

def _find_venv_python() -> Path | None: """Find the coditect venv Python binary.

Checks (in order):
1. Protected installation venv (~/.coditect/.venv/)
2. Submodule venv (coditect-core/.venv/)
Returns None if no venv found.
"""
candidates = [
Path.home() / ".coditect" / ".venv" / "bin" / "python3",
_coditect_root / ".venv" / "bin" / "python3",
]
for p in candidates:
if p.exists():
return p
return None

VENV_PYTHON = _find_venv_python()

@dataclass class CheckResult: """Result of a single health check.""" name: str status: str # "ok", "warn", "fail" message: str details: dict = field(default_factory=dict)

@dataclass class HealthReport: """Aggregated health report.""" servers: list[CheckResult] = field(default_factory=list) databases: list[CheckResult] = field(default_factory=list) dependencies: list[CheckResult] = field(default_factory=list) smoke_tests: list[CheckResult] = field(default_factory=list) fixes: list[str] = field(default_factory=list)

--- Server Health Checks ---

MCP_SERVERS = [ { "name": "mcp-semantic-search", "dir": "mcp-semantic-search", "expected_tools": 8, "stats_func": "get_stats", }, { "name": "mcp-call-graph", "dir": "mcp-call-graph", "expected_tools": 7, "stats_func": "call_graph_stats", }, { "name": "mcp-impact-analysis", "dir": "mcp-impact-analysis", "expected_tools": 5, "stats_func": "impact_stats", }, { "name": "mcp-context-graph", "dir": "mcp-context-graph", "expected_tools": 17, "stats_func": "get_graph_stats", }, { "name": "mcp-cross-llm-bridge", "dir": "mcp-cross-llm-bridge", "expected_tools": 15, "stats_func": None, }, { "name": "mcp-skill-server", "dir": "mcp-skill-server", "expected_tools": 18, "stats_func": None, }, { "name": "mcp-unified-gateway", "dir": "mcp-unified-gateway", "expected_tools": 74, "stats_func": None, }, { "name": "mcp-backup", "dir": "mcp-backup", "expected_tools": 5, "stats_func": None, }, # NOTE: transcript-normalization is a CLI tool (scripts/transcript-normalize.py), # NOT an MCP server. Removed from this list per H.11.4.1. ]

def check_server(server_info: dict) -> CheckResult: """Check if an MCP server can be imported and initialized.""" name = server_info["name"] server_dir = TOOLS_DIR / server_info["dir"] server_py = server_dir / "server.py"

if not server_dir.exists():
return CheckResult(name, "fail", f"Directory not found: {server_dir}")

if not server_py.exists():
# Check for alternative entry points
alt_files = list(server_dir.glob("*.py"))
if not alt_files:
return CheckResult(name, "fail", f"No Python files found in {server_dir}")
return CheckResult(name, "ok", f"CLI tool ({len(alt_files)} scripts)", {"scripts": len(alt_files)})

try:
spec = importlib.util.spec_from_file_location(f"{name}_server", server_py)
if spec and spec.loader:
return CheckResult(
name, "ok",
f"{server_info['expected_tools']} tools",
{"expected_tools": server_info["expected_tools"], "path": str(server_py)}
)
except Exception as e:
return CheckResult(name, "fail", f"Import error: {str(e)[:100]}")

return CheckResult(name, "warn", "Could not verify")

def smoke_test_server(server_info: dict) -> CheckResult: """Functional smoke test: compile + verify CLI entry point works.

Uses the venv Python to:
1. Compile-check the server file (py_compile)
2. Run the server's CLI help or stats command if available
"""
name = server_info["name"]
server_dir = TOOLS_DIR / server_info["dir"]
server_py = server_dir / "server.py"

if not server_py.exists():
return CheckResult(name, "warn", "No server.py — skipped smoke test")

python = str(VENV_PYTHON) if VENV_PYTHON else sys.executable

# Step 1: Compile check
try:
result = subprocess.run(
[python, "-m", "py_compile", str(server_py)],
capture_output=True, text=True, timeout=15
)
if result.returncode != 0:
error_msg = (result.stderr or result.stdout).strip()
return CheckResult(name, "fail", f"SyntaxError: {error_msg[:120]}")
except subprocess.TimeoutExpired:
return CheckResult(name, "warn", "Compile check timed out")

# Step 2: CLI smoke test — try the server's CLI interface if it has one
stats_func = server_info.get("stats_func")
if stats_func:
try:
result = subprocess.run(
[python, str(server_py), "stats"],
capture_output=True, text=True, timeout=15,
env={**os.environ, "PYTHONDONTWRITEBYTECODE": "1"}
)
if result.returncode == 0:
return CheckResult(
name, "ok",
f"Compile OK, CLI stats OK",
{"compile": "ok", "cli": "ok"}
)
else:
# CLI failed but compile passed — partial success
stderr_short = (result.stderr or "").strip()[:80]
return CheckResult(
name, "warn",
f"Compile OK, CLI stats failed: {stderr_short}",
{"compile": "ok", "cli": "fail"}
)
except subprocess.TimeoutExpired:
return CheckResult(name, "warn", "Compile OK, CLI stats timed out",
{"compile": "ok", "cli": "timeout"})
else:
return CheckResult(name, "ok", "Compile OK (no CLI stats)",
{"compile": "ok", "cli": "n/a"})

--- Database Health Checks ---

DB_CHECKS = [ { "name": "org.db", "queries": [ ("kg_nodes", "SELECT COUNT() FROM kg_nodes"), ("kg_edges", "SELECT COUNT() FROM kg_edges"), ("decisions", "SELECT COUNT() FROM decisions"), ("error_solutions", "SELECT COUNT() FROM error_solutions"), ], }, { "name": "sessions.db", "queries": [ ("messages", "SELECT COUNT() FROM messages"), ("embeddings", "SELECT COUNT() FROM embeddings"), ], }, ]

def check_database(db_info: dict) -> CheckResult: """Check database health and record counts.""" db_name = db_info["name"] db_path = CONTEXT_STORAGE / db_name

if not db_path.exists():
return CheckResult(db_name, "fail", f"Database not found: {db_path}")

try:
conn = sqlite3.connect(str(db_path), timeout=5)
conn.execute("PRAGMA journal_mode") # Quick connectivity check

counts = {}
for table_name, query in db_info["queries"]:
try:
cursor = conn.execute(query)
count = cursor.fetchone()[0]
counts[table_name] = count
except sqlite3.OperationalError:
counts[table_name] = -1 # Table doesn't exist

conn.close()

# Build summary
parts = []
for table, count in counts.items():
if count == -1:
parts.append(f"{table}: missing")
elif count == 0:
parts.append(f"{table}: empty")
else:
parts.append(f"{count:,} {table}")

has_missing = any(c == -1 for c in counts.values())
has_empty = any(c == 0 for c in counts.values())

if has_missing:
return CheckResult(db_name, "warn", ", ".join(parts), counts)
elif has_empty:
return CheckResult(db_name, "warn", ", ".join(parts), counts)
else:
return CheckResult(db_name, "ok", ", ".join(parts), counts)

except sqlite3.Error as e:
return CheckResult(db_name, "fail", f"SQLite error: {str(e)[:100]}")

--- Dependency Health Checks ---

DEPENDENCY_CHECKS = [ {"name": "mcp", "import": "mcp", "required_by": "All MCP servers"}, {"name": "sentence-transformers", "import": "sentence_transformers", "required_by": "semantic-search"}, {"name": "tree-sitter", "import": "tree_sitter", "required_by": "call-graph"}, ]

def check_dependency(dep_info: dict) -> CheckResult: """Check if a Python dependency is available.

First tries the current Python environment. If the import fails and a
coditect venv is available, retries using the venv Python via subprocess.
"""
name = dep_info["name"]
import_name = dep_info["import"]

# Try current environment first
try:
module = __import__(import_name)
version = getattr(module, "__version__", "installed")
return CheckResult(name, "ok", f"v{version}")
except ImportError:
pass

# Fall back to venv Python
if VENV_PYTHON:
try:
result = subprocess.run(
[str(VENV_PYTHON), "-c",
f"import {import_name}; print(getattr({import_name}, '__version__', 'installed'))"],
capture_output=True, text=True, timeout=15
)
if result.returncode == 0:
version = result.stdout.strip() or "installed"
return CheckResult(name, "ok", f"v{version} (venv)")
except (subprocess.TimeoutExpired, FileNotFoundError):
pass

return CheckResult(
name, "fail",
f"Not installed (required by {dep_info['required_by']})",
{"fix": f"pip install {name}"}
)

def check_gcloud() -> CheckResult: """Check if gcloud CLI is available.""" gcloud_path = shutil.which("gcloud") if gcloud_path: try: result = subprocess.run( ["gcloud", "version", "--format=value(version)"], capture_output=True, text=True, timeout=10 ) version = result.stdout.strip().split("\n")[0] if result.returncode == 0 else "unknown" return CheckResult("gcloud", "ok", f"v{version}") except (subprocess.TimeoutExpired, FileNotFoundError): return CheckResult("gcloud", "warn", "Found but version check failed") return CheckResult("gcloud", "warn", "Not found (optional, needed for backup)")

--- Main Health Check ---

def run_health_check(quick: bool = False, verbose: bool = False, smoke: bool = False) -> HealthReport: """Run the full MCP health check.""" report = HealthReport()

# Server checks
for server in MCP_SERVERS:
result = check_server(server)
report.servers.append(result)

if quick:
return report

# Database checks
for db in DB_CHECKS:
result = check_database(db)
report.databases.append(result)

# Dependency checks
for dep in DEPENDENCY_CHECKS:
result = check_dependency(dep)
report.dependencies.append(result)
if result.status == "fail" and "fix" in result.details:
report.fixes.append(result.details["fix"])

gcloud_result = check_gcloud()
report.dependencies.append(gcloud_result)

# Smoke tests (compile + CLI verification)
if smoke:
for server in MCP_SERVERS:
result = smoke_test_server(server)
report.smoke_tests.append(result)

return report

def format_text(report: HealthReport, verbose: bool = False) -> str: """Format report as human-readable text.""" lines = [ "MCP Tool Health Check", "=" * 50, ]

# Venv info
if VENV_PYTHON:
lines.append(f" Venv: {VENV_PYTHON}")
else:
lines.append(" Venv: not found (dependency checks may be inaccurate)")
lines.append("")

# Servers
lines.append("Server Availability:")
for r in report.servers:
icon = {"ok": "[OK]", "warn": "[!!]", "fail": "[XX]"}[r.status]
lines.append(f" {icon} {r.name:<28s} {r.message}")
lines.append("")

# Databases
if report.databases:
lines.append("Database Health:")
for r in report.databases:
icon = {"ok": "[OK]", "warn": "[!!]", "fail": "[XX]"}[r.status]
lines.append(f" {icon} {r.name:<28s} {r.message}")
lines.append("")

# Dependencies
if report.dependencies:
lines.append("Dependencies:")
for r in report.dependencies:
icon = {"ok": "[OK]", "warn": "[!!]", "fail": "[XX]"}[r.status]
lines.append(f" {icon} {r.name:<28s} {r.message}")
lines.append("")

# Smoke tests
if report.smoke_tests:
lines.append("Smoke Tests:")
for r in report.smoke_tests:
icon = {"ok": "[OK]", "warn": "[!!]", "fail": "[XX]"}[r.status]
lines.append(f" {icon} {r.name:<28s} {r.message}")
lines.append("")

# Summary
server_ok = sum(1 for r in report.servers if r.status == "ok")
db_ok = sum(1 for r in report.databases if r.status == "ok")
dep_ok = sum(1 for r in report.dependencies if r.status == "ok")
smoke_ok = sum(1 for r in report.smoke_tests if r.status == "ok")
total_servers = len(report.servers)
total_dbs = len(report.databases)
total_deps = len(report.dependencies)
total_smoke = len(report.smoke_tests)

lines.append(
f"Summary: {server_ok}/{total_servers} servers healthy"
+ (f", {db_ok}/{total_dbs} databases healthy" if total_dbs else "")
+ (f", {dep_ok}/{total_deps} dependencies OK" if total_deps else "")
+ (f", {smoke_ok}/{total_smoke} smoke tests passed" if total_smoke else "")
)

# Fixes
if report.fixes:
lines.append("")
lines.append("Fix suggestions:")
for i, fix in enumerate(report.fixes, 1):
lines.append(f" {i}. {fix}")

return "\n".join(lines)

def format_json(report: HealthReport) -> str: """Format report as JSON.""" data = { "venv": str(VENV_PYTHON) if VENV_PYTHON else None, "servers": [ {"name": r.name, "status": r.status, "message": r.message, "details": r.details} for r in report.servers ], "databases": [ {"name": r.name, "status": r.status, "message": r.message, "details": r.details} for r in report.databases ], "dependencies": [ {"name": r.name, "status": r.status, "message": r.message} for r in report.dependencies ], "smoke_tests": [ {"name": r.name, "status": r.status, "message": r.message, "details": r.details} for r in report.smoke_tests ], "summary": { "servers_healthy": sum(1 for r in report.servers if r.status == "ok"), "servers_total": len(report.servers), "databases_healthy": sum(1 for r in report.databases if r.status == "ok"), "databases_total": len(report.databases), "dependencies_ok": sum(1 for r in report.dependencies if r.status == "ok"), "dependencies_total": len(report.dependencies), "smoke_tests_ok": sum(1 for r in report.smoke_tests if r.status == "ok"), "smoke_tests_total": len(report.smoke_tests), }, "fixes": report.fixes, } return json.dumps(data, indent=2)

def run_risk_assessment(function_name: str): """Shortcut to run impact analysis risk assessment for a function.""" server_py = TOOLS_DIR / "mcp-impact-analysis" / "server.py" if not server_py.exists(): print(f"[XX] mcp-impact-analysis server not found at {server_py}", file=sys.stderr) sys.exit(1)

try:
result = subprocess.run(
[sys.executable, str(server_py), "risk", function_name],
capture_output=True, text=True, timeout=30
)
print(result.stdout)
if result.stderr:
print(result.stderr, file=sys.stderr)
sys.exit(result.returncode)
except subprocess.TimeoutExpired:
print(f"[XX] Risk assessment timed out for {function_name}", file=sys.stderr)
sys.exit(1)

def main(): parser = argparse.ArgumentParser(description="MCP Tool Health Check") parser.add_argument("--quick", action="store_true", help="Quick check (server availability only)") parser.add_argument("--smoke", action="store_true", help="Run functional smoke tests (compile + CLI)") parser.add_argument("--json", action="store_true", help="JSON output") parser.add_argument("--verbose", action="store_true", help="Detailed output with tool listings") parser.add_argument("--fix", action="store_true", help="Attempt to fix common issues") parser.add_argument("--risk", metavar="FUNCTION", help="Run risk assessment for a function") args = parser.parse_args()

if args.risk:
run_risk_assessment(args.risk)
return

report = run_health_check(quick=args.quick, verbose=args.verbose,
smoke=args.smoke)

if args.json:
print(format_json(report))
else:
print(format_text(report, verbose=args.verbose))

# Exit code: 0 if all healthy, 1 if any failures
has_failures = any(
r.status == "fail"
for r in report.servers + report.databases + report.dependencies + report.smoke_tests
)
sys.exit(1 if has_failures else 0)

if name == "main": main()