#!/usr/bin/env python3
"""
CODITECT Context Watcher Status and Health Check

Verifies codi-watcher health and displays time-filtered export activity reports.

Usage:
    python3 watcher_status.py              # Full status
    python3 watcher_status.py --hours 12   # Last 12 hours
    python3 watcher_status.py --days 7     # Last 7 days
    python3 watcher_status.py --json       # JSON output
    python3 watcher_status.py --verbose    # Include cooldowns
    python3 watcher_status.py --llm claude # Filter by LLM

Track: J (Memory Intelligence)
Task: J.19.9 (Watcher Status Command)
Created: 2026-02-03
Version: 1.0.0
"""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# ADR-114: User data location
CODITECT_DATA = Path.home() / "PROJECTS/.coditect-data"
CONTEXT_STORAGE = CODITECT_DATA / "context-storage"

# Watcher state file (v2 format)
STATE_FILE = CONTEXT_STORAGE / "watcher-state.json"

# Export directories: archived exports plus one pending queue per LLM.
EXPORT_DIRS = {
    "archive": CONTEXT_STORAGE / "exports-archive",
    "claude": CODITECT_DATA / "sessions-export-pending-anthropic",
    "codex": CODITECT_DATA / "sessions-export-pending-codex",
    "gemini": CODITECT_DATA / "sessions-export-pending-gemini",
    "kimi": CODITECT_DATA / "sessions-export-pending-kimi",
}
def check_daemon_status() -> Tuple[bool, Optional[int], str]:
    """Determine whether the context watcher daemon is alive.

    Probes launchctl on macOS first, then systemd, and finally falls
    back to a pgrep scan when neither service manager is available.

    Returns:
        Tuple of (is_running, pid, status_message)
    """
    # macOS: ask launchctl about the service label.
    if sys.platform == "darwin":
        try:
            launch = subprocess.run(
                ["launchctl", "list", "ai.coditect.context-watcher"],
                capture_output=True,
                text=True,
            )
            if launch.returncode != 0:
                return False, None, "Not loaded in launchctl"
            # First whitespace-separated numeric field is the PID.
            for row in launch.stdout.strip().split("\n"):
                fields = row.split()
                if fields and fields[0].isdigit():
                    return True, int(fields[0]), "Running via launchctl"
            return True, None, "Running via launchctl (PID unknown)"
        except FileNotFoundError:
            # launchctl binary missing -- fall through to other checks.
            pass

    # Linux: ask systemd.
    try:
        active = subprocess.run(
            ["systemctl", "is-active", "coditect-context-watcher"],
            capture_output=True,
            text=True,
        )
        if active.stdout.strip() == "active":
            show = subprocess.run(
                ["systemctl", "show", "-p", "MainPID", "coditect-context-watcher"],
                capture_output=True,
                text=True,
            )
            pid: Optional[int] = None
            if "MainPID=" in show.stdout:
                try:
                    pid = int(show.stdout.strip().split("=")[1])
                except (ValueError, IndexError):
                    pass
            return True, pid, "Running via systemd"
        return False, None, "Not running (systemd)"
    except FileNotFoundError:
        # systemctl binary missing -- fall through to pgrep.
        pass

    # Last resort: look for a matching process name.
    try:
        probe = subprocess.run(
            ["pgrep", "-f", "coditect-context-watch"],
            capture_output=True,
            text=True,
        )
        if probe.returncode == 0 and probe.stdout.strip():
            first_pid = int(probe.stdout.strip().split()[0])
            return True, first_pid, "Running (process found)"
    except (FileNotFoundError, ValueError):
        pass

    return False, None, "Not running"
def read_state() -> Optional[Dict]:
    """Load and parse the watcher's v2 state file.

    Returns the parsed JSON dict, or None when the file is absent,
    unreadable, or not valid JSON (a warning goes to stderr).
    """
    if not STATE_FILE.exists():
        return None
    try:
        return json.loads(STATE_FILE.read_text())
    except (json.JSONDecodeError, IOError) as exc:
        print(f"Warning: Failed to parse state file: {exc}", file=sys.stderr)
        return None
def count_exports(
    hours: Optional[int] = None,
    days: Optional[int] = None,
    llm_filter: Optional[str] = None,
) -> Dict[str, int]:
    """
    Count exports from all directories, optionally filtered by time and LLM.

    Args:
        hours: Filter to exports within last N hours
        days: Filter to exports within last N days (ignored when hours given)
        llm_filter: Filter to specific LLM (claude, codex, gemini, kimi)

    Returns:
        Dict mapping source name to count
    """
    # Calculate cutoff time. Truthiness deliberately treats 0 as "no
    # filter", matching an omitted CLI flag.
    cutoff = None
    if hours:
        cutoff = datetime.now() - timedelta(hours=hours)
    elif days:
        cutoff = datetime.now() - timedelta(days=days)

    # Narrow to a single directory when an LLM filter is given. Every LLM
    # name (including "claude") is a key of EXPORT_DIRS, so one generic
    # lookup replaces the previous redundant special-cased "claude" branch.
    dirs_to_check = EXPORT_DIRS
    if llm_filter:
        key = llm_filter.lower()
        if key in EXPORT_DIRS:
            dirs_to_check = {key: EXPORT_DIRS[key]}

    counts: Dict[str, int] = {}
    for name, path in dirs_to_check.items():
        if not path.exists():
            counts[name] = 0
            continue
        files = [f for f in path.iterdir() if f.is_file()]
        # Apply time filter on modification time.
        if cutoff:
            filtered = []
            for f in files:
                try:
                    # Files can disappear between iterdir() and stat().
                    mtime = datetime.fromtimestamp(f.stat().st_mtime)
                    if mtime > cutoff:
                        filtered.append(f)
                except OSError:
                    continue
            files = filtered
        counts[name] = len(files)
    return counts
def get_export_timestamps(
    hours: Optional[int] = None, days: Optional[int] = None
) -> List[datetime]:
    """Collect sorted export-file modification times for frequency analysis."""
    cutoff: Optional[datetime] = None
    if hours:
        cutoff = datetime.now() - timedelta(hours=hours)
    elif days:
        cutoff = datetime.now() - timedelta(days=days)

    stamps: List[datetime] = []
    for directory in EXPORT_DIRS.values():
        if not directory.exists():
            continue
        for entry in directory.iterdir():
            if not entry.is_file():
                continue
            try:
                when = datetime.fromtimestamp(entry.stat().st_mtime)
            except OSError:
                # Entry vanished between iterdir() and stat(); skip it.
                continue
            if cutoff is None or when > cutoff:
                stamps.append(when)
    stamps.sort()
    return stamps
def detect_gaps(days: int = 7) -> List[str]:
    """
    Find days with no exports in the given period.

    Args:
        days: Number of days to check

    Returns:
        List of date strings (YYYY-MM-DD) with no exports
    """
    cutoff = datetime.now() - timedelta(days=days)

    # Gather every calendar date on which at least one export file landed.
    seen_dates = set()
    for directory in EXPORT_DIRS.values():
        if not directory.exists():
            continue
        for entry in directory.iterdir():
            if not entry.is_file():
                continue
            try:
                when = datetime.fromtimestamp(entry.stat().st_mtime)
            except OSError:
                continue
            if when > cutoff:
                seen_dates.add(when.date())

    # Walk backwards from today; any date with no activity is a gap.
    today = datetime.now().date()
    return [
        (today - timedelta(days=offset)).isoformat()
        for offset in range(days)
        if (today - timedelta(days=offset)) not in seen_dates
    ]
def assess_health(
    daemon_running: bool,
    state: Optional[Dict],
    export_counts: Dict[str, int],
    gaps: List[str],
) -> Tuple[str, List[str]]:
    """
    Assess overall watcher health from previously gathered facts.

    Returns:
        Tuple of (status, issues_list) where status is HEALTHY (no
        issues), DEGRADED (one issue), or CRITICAL (two or more).
    """
    issues: List[str] = []

    if not daemon_running:
        issues.append("Daemon not running")

    if state is None:
        issues.append("State file missing or invalid")
    else:
        # A state file untouched for over six hours suggests a stalled watcher.
        last_saved = state.get("last_saved")
        if last_saved:
            try:
                saved_at = datetime.fromisoformat(last_saved.replace("Z", "+00:00"))
                age = datetime.now(timezone.utc) - saved_at
                if age > timedelta(hours=6):
                    issues.append(f"State file stale ({age.total_seconds() / 3600:.1f}h old)")
            except (ValueError, TypeError):
                # Unparseable or naive timestamp -- skip the staleness check.
                pass

    if sum(export_counts.values()) == 0:
        issues.append("No exports found")

    # Only flag gaps if more than 1 day missing (allow for weekends, etc.)
    if len(gaps) > 1:
        issues.append(f"Export gaps: {len(gaps)} days in last 7")

    if not issues:
        return "HEALTHY", []
    if len(issues) == 1:
        return "DEGRADED", issues
    return "CRITICAL", issues
def format_duration(seconds: float) -> str:
    """Format seconds into a human-readable duration (s, m, h, or d)."""
    if seconds < 60:
        return f"{int(seconds)}s"
    if seconds < 3600:
        return f"{int(seconds / 60)}m"
    if seconds < 86400:
        return f"{seconds / 3600:.1f}h"
    return f"{seconds / 86400:.1f}d"
def format_ago(iso_timestamp: str) -> str:
    """Render an ISO-8601 timestamp as a relative 'X ago' string."""
    try:
        then = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
        elapsed = datetime.now(timezone.utc) - then
        return f"{format_duration(elapsed.total_seconds())} ago"
    except (ValueError, TypeError):
        # Covers malformed timestamps and naive/aware subtraction errors.
        return "unknown"
def generate_report(
    hours: Optional[int] = None,
    days: Optional[int] = None,
    llm_filter: Optional[str] = None,
    json_output: bool = False,
    verbose: bool = False,
) -> str:
    """Generate complete watcher status report.

    Args:
        hours: Restrict export counts to the last N hours.
        days: Restrict export counts to the last N days.
        llm_filter: Limit counts to one LLM source.
        json_output: Emit machine-readable JSON instead of the text report.
        verbose: Include per-LLM session cooldown details.

    Returns:
        The fully rendered report as a single string.
    """
    # Gather data
    daemon_running, pid, daemon_msg = check_daemon_status()
    state = read_state()
    counts = count_exports(hours=hours, days=days, llm_filter=llm_filter)
    gaps = detect_gaps(days=7)
    health, issues = assess_health(daemon_running, state, counts, gaps)

    # Calculate frequency
    # NOTE: frequency always uses a window (default 24h) even when the
    # count section is unfiltered ("all time").
    timestamps = get_export_timestamps(hours=hours or 24, days=days)
    avg_frequency = None
    if len(timestamps) >= 2:
        time_span = (timestamps[-1] - timestamps[0]).total_seconds()
        if time_span > 0:
            avg_frequency = len(timestamps) / (time_span / 3600)  # per hour

    if json_output:
        result = {
            "daemon": {
                "running": daemon_running,
                "pid": pid,
                "message": daemon_msg,
            },
            "version": state.get("version") if state else None,
            "started": state.get("global", {}).get("watcher_started") if state else None,
            "last_saved": state.get("last_saved") if state else None,
            "total_exports": sum(counts.values()),
            "exports_by_source": counts,
            "time_filter": {
                "hours": hours,
                "days": days,
                "llm": llm_filter,
            },
            "frequency_per_hour": round(avg_frequency, 2) if avg_frequency else None,
            "gaps": gaps,
            "health": health,
            "issues": issues,
        }
        if verbose and state:
            result["llms"] = state.get("llms", {})
        return json.dumps(result, indent=2)

    # Build text report
    lines = []
    lines.append("=" * 56)
    lines.append(" CODITECT Context Watcher Status")
    lines.append("=" * 56)
    lines.append("")

    # Daemon status
    # NOTE(review): both branches are empty strings -- status glyphs appear
    # to have been lost (likely stripped emoji); confirm intended markers.
    status_icon = "" if daemon_running else ""
    pid_str = f", PID: {pid}" if pid else ""
    lines.append(f"Daemon Status: {status_icon} {daemon_msg}{pid_str}")
    if state:
        lines.append(f"Version: {state.get('version', 'Unknown')}")
        started = state.get("global", {}).get("watcher_started")
        if started:
            lines.append(f"Started: {started} ({format_ago(started)})")
        last_saved = state.get("last_saved")
        if last_saved:
            lines.append(f"State Updated: {last_saved} ({format_ago(last_saved)})")

    # Time filter info
    lines.append("")
    if hours:
        lines.append(f"--- Exports (last {hours} hours) ---")
    elif days:
        lines.append(f"--- Exports (last {days} days) ---")
    else:
        lines.append("--- Export Summary (all time) ---")
    total = sum(counts.values())
    lines.append(f"Total: {total}")
    # Zero-count sources are shown unless an LLM filter narrowed the scan.
    for source, count in sorted(counts.items()):
        if count > 0 or not llm_filter:
            lines.append(f"  - {source}: {count}")

    # Frequency
    if avg_frequency:
        lines.append("")
        lines.append(f"Export Frequency: {avg_frequency:.2f}/hour")

    # Last export: latest "last_export" across all LLM entries.
    # NOTE(review): ISO timestamps are compared as strings here; this is
    # only correct if all entries share the same format/timezone -- confirm.
    if state:
        llms_data = state.get("llms", {})
        latest_export = None
        for llm_name, llm_data in llms_data.items():
            last_exp = llm_data.get("last_export")
            if last_exp:
                if latest_export is None or last_exp > latest_export:
                    latest_export = last_exp
        if latest_export:
            lines.append(f"Last Export: {latest_export} ({format_ago(latest_export)})")

    # Session cooldowns (verbose); capped at five per LLM.
    if verbose and state:
        llms_data = state.get("llms", {})
        lines.append("")
        lines.append("--- Session Cooldowns ---")
        for llm_name, llm_data in llms_data.items():
            cooldowns = llm_data.get("session_cooldowns", {})
            if cooldowns:
                lines.append(f"{llm_name.upper()}: {len(cooldowns)} sessions")
                for session_id, cooldown_until in list(cooldowns.items())[:5]:
                    short_id = session_id[:8]
                    lines.append(f"  - {short_id}: until {cooldown_until}")
                if len(cooldowns) > 5:
                    lines.append(f"  ... and {len(cooldowns) - 5} more")

    # Gaps (capped at five days shown)
    if gaps:
        lines.append("")
        lines.append(f"--- Export Gaps (last 7 days) ---")
        for gap_date in gaps[:5]:
            lines.append(f"  - {gap_date}: No exports")
        if len(gaps) > 5:
            lines.append(f"  ... and {len(gaps) - 5} more days")

    # Health assessment
    lines.append("")
    lines.append("--- Health Assessment ---")
    # NOTE(review): all three icon values are empty strings -- likely
    # stripped emoji; confirm intended glyphs per status.
    health_icon = {"HEALTHY": "", "DEGRADED": "", "CRITICAL": ""}.get(health, "")
    lines.append(f"Status: {health_icon} {health}")
    if issues:
        for issue in issues:
            lines.append(f"  {issue}")
    else:
        lines.append("  [x] Daemon running")
        lines.append("  [x] Exports triggering")
        lines.append("  [x] State file updating")
        if not gaps:
            lines.append("  [x] No gaps detected")

    lines.append("")
    lines.append("=" * 56)
    return "\n".join(lines)
def main():
    """Parse CLI arguments, render the status report, and print it."""
    parser = argparse.ArgumentParser(
        description="CODITECT Context Watcher Status and Health Check",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s              # Full status report
  %(prog)s --hours 12   # Exports in last 12 hours
  %(prog)s --days 7     # Exports in last 7 days
  %(prog)s --json       # JSON output for scripting
  %(prog)s --verbose    # Include session cooldowns
  %(prog)s --llm claude # Filter to Claude exports
""",
    )
    parser.add_argument("--hours", type=int, metavar="N", help="Filter exports to last N hours")
    parser.add_argument("--days", type=int, metavar="N", help="Filter exports to last N days")
    parser.add_argument(
        "--llm",
        type=str,
        choices=["claude", "codex", "gemini", "kimi"],
        help="Filter to specific LLM",
    )
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Include detailed session cooldowns"
    )

    args = parser.parse_args()

    # --hours and --days are mutually exclusive time windows.
    if args.hours and args.days:
        parser.error("Cannot specify both --hours and --days")

    print(
        generate_report(
            hours=args.hours,
            days=args.days,
            llm_filter=args.llm,
            json_output=args.json,
            verbose=args.verbose,
        )
    )
# Script entry guard. The original read `if name == "main"`, which raises
# NameError at runtime: the dunder underscores around __name__/__main__
# were lost (likely markdown-italics mangling).
if __name__ == "__main__":
    main()