#!/usr/bin/env python3
"""Context Watcher Daemon - Background process to auto-export sessions.

Watches Claude Code session files and triggers an export when context
reaches a threshold. Runs independently of Claude Code hooks (which are
unreliable).

Cross-platform: Works on macOS and Linux. Windows support planned.

Usage:
    # Start daemon with defaults
    python3 ~/.coditect/scripts/context-watcher-daemon.py

    # With custom threshold
    python3 ~/.coditect/scripts/context-watcher-daemon.py --threshold 80

    # Check status
    python3 ~/.coditect/scripts/context-watcher-daemon.py --status

    # Stop daemon
    python3 ~/.coditect/scripts/context-watcher-daemon.py --stop

    # Check if running
    pgrep -f context-watcher-daemon
"""
import argparse
import json
import os
import platform
import signal
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
ADR-114: Use centralized path discovery
_script_dir = Path(file).resolve().parent _coditect_root = _script_dir.parent if str(_coditect_root) not in sys.path: sys.path.insert(0, str(_coditect_root))
try: from scripts.core.paths import get_context_storage_dir, get_user_data_dir CONTEXT_STORAGE = get_context_storage_dir() except ImportError: get_user_data_dir = None _new_location = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" CONTEXT_STORAGE = _new_location if _new_location.exists() else Path.home() / ".coditect" / "context-storage"
Default configuration
DEFAULT_MIN_CONTEXT_PERCENT = 75 # Trigger export at 75% DEFAULT_MAX_CONTEXT_PERCENT = 95 # Don't trigger above 95% (too late) CONTEXT_LIMIT_TOKENS = 200000 DEFAULT_COOLDOWN_MINUTES = 10 # Cooldown between exports DEFAULT_POLL_INTERVAL_SECONDS = 10 # Check every 10 seconds
File paths (ADR-114 compliant)
STATE_FILE = CONTEXT_STORAGE / "auto-export-state.json" PID_FILE = CONTEXT_STORAGE / "context-watcher.pid"
ADR-114: Runtime logs go to user data dir
LOG_FILE = (get_user_data_dir() / "logs" / "context-watcher.log") if get_user_data_dir else (Path.home() / ".coditect" / "logs" / "context-watcher.log")
Runtime configuration (set by CLI args)
MIN_CONTEXT_PERCENT = DEFAULT_MIN_CONTEXT_PERCENT MAX_CONTEXT_PERCENT = DEFAULT_MAX_CONTEXT_PERCENT COOLDOWN_MINUTES = DEFAULT_COOLDOWN_MINUTES POLL_INTERVAL_SECONDS = DEFAULT_POLL_INTERVAL_SECONDS
def log(message: str) -> None:
    """Write a timestamped message to stdout and append it to LOG_FILE.

    The file write is best-effort: filesystem errors (missing directory,
    permissions) are swallowed so logging can never crash the daemon.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{timestamp}] {message}"
    print(line, flush=True)
    try:
        with open(LOG_FILE, "a") as f:
            f.write(line + "\n")
    except OSError:
        # Narrowed from a bare `except:` - only I/O failures are expected here.
        pass
def notify(title: str, message: str, sound: bool = True):
    """Show a desktop notification on macOS (osascript) or Linux (notify-send).

    Notifications are purely advisory: every failure is logged, never raised.
    Windows support can be added later with win10toast or plyer.
    """
    os_name = platform.system()
    try:
        if os_name == "Darwin":  # macOS
            sound_clause = 'sound name "Glass"' if sound else ""
            script = f'''
display notification "{message}" with title "{title}" {sound_clause}
'''
            subprocess.run(
                ["osascript", "-e", script],
                capture_output=True,
                timeout=5,
            )
        elif os_name == "Linux":
            # Uses notify-send (requires libnotify-bin on Ubuntu/Debian)
            args = ["notify-send", title, message]
            if sound:
                args.append("--urgency=normal")
            subprocess.run(args, capture_output=True, timeout=5)
    except FileNotFoundError:
        log(f"Notification command not found on {os_name}")
    except Exception as e:
        log(f"Notification error: {e}")
def load_state() -> dict:
    """Load persistent daemon state from STATE_FILE.

    Returns an empty dict when the file is missing, unreadable, or
    contains invalid JSON - corrupt state must never stop the daemon.
    """
    if STATE_FILE.exists():
        try:
            return json.loads(STATE_FILE.read_text())
        except (OSError, json.JSONDecodeError):
            # Narrowed from a bare `except:` - treat corrupt state as empty.
            pass
    return {}
def save_state(state: dict):
    """Persist daemon state to STATE_FILE as indented JSON."""
    payload = json.dumps(state, indent=2)
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(payload)
def find_active_session() -> Optional[Path]:
    """Return the most recently modified session JSONL under ~/.claude/projects.

    Returns None when the projects directory or any JSONL files are absent.
    """
    projects_dir = Path.home() / ".claude" / "projects"
    if not projects_dir.exists():
        return None

    # Track the newest file while scanning instead of materializing a list.
    newest: Optional[Path] = None
    newest_mtime = float("-inf")
    for candidate in projects_dir.rglob("*.jsonl"):
        mtime = candidate.stat().st_mtime
        if mtime > newest_mtime:
            newest, newest_mtime = candidate, mtime
    return newest
def get_context_percent(session_file: Path) -> tuple[float, dict]:
    """Estimate context usage from the newest usage entry in a session JSONL.

    Reads at most the last 100 KB of the file and walks its lines
    newest-first looking for a message with token usage.

    Returns:
        (percentage, details) on success, where details holds
        total_tokens and session_file name;
        (-1, {}) when no usage entry is found;
        (-1, {"error": ...}) on unexpected failures.
    """
    tail_bytes = 100000
    try:
        with open(session_file, 'rb') as fh:
            fh.seek(0, 2)  # seek to end to learn the file size
            size = fh.tell()
            fh.seek(max(0, size - min(tail_bytes, size)))
            tail = fh.read().decode('utf-8', errors='ignore')

        # Newest entries are at the end, so scan in reverse.
        for raw in reversed(tail.strip().split('\n')):
            if not raw.strip() or not raw.startswith('{'):
                continue
            try:
                entry = json.loads(raw)
            except json.JSONDecodeError:
                continue
            usage = entry.get("message", {}).get("usage", {})
            if not usage:
                continue
            total = (
                usage.get("cache_read_input_tokens", 0)
                + usage.get("cache_creation_input_tokens", 0)
                + usage.get("input_tokens", 0)
                + usage.get("output_tokens", 0)
            )
            percentage = (total / CONTEXT_LIMIT_TOKENS) * 100
            return percentage, {
                "total_tokens": total,
                "session_file": session_file.name,
            }
        return -1, {}
    except Exception as e:
        return -1, {"error": str(e)}
def in_cooldown(state: dict) -> bool:
    """Return True if the last export happened less than COOLDOWN_MINUTES ago.

    A missing or malformed `last_export` timestamp counts as "not in
    cooldown" so a corrupt state file can never block exports forever.
    """
    last_export = state.get("last_export")
    if not last_export:
        return False
    try:
        # Accept both 'Z' and '+00:00' UTC suffixes.
        last_time = datetime.fromisoformat(last_export.replace('Z', '+00:00'))
        elapsed_minutes = (datetime.now(timezone.utc) - last_time).total_seconds() / 60
        return elapsed_minutes < COOLDOWN_MINUTES
    except (ValueError, TypeError, AttributeError):
        # Narrowed from a bare `except:`; covers bad ISO strings, non-string
        # values, and naive-vs-aware datetime subtraction errors.
        return False
def run_export() -> tuple[bool, str]:
    """Invoke session-exporter.py and report the resulting export file.

    Returns (True, filename) when the exporter succeeds and a new file
    appears in exports-pending/, otherwise (False, "").
    """
    try:
        exporter = Path.home() / ".coditect" / "scripts" / "session-exporter.py"
        exports_pending = CONTEXT_STORAGE / "exports-pending"
        exports_pending.mkdir(parents=True, exist_ok=True)

        if exporter.exists():
            result = subprocess.run(
                ["python3", str(exporter), "--current"],
                capture_output=True,
                text=True,
                timeout=60,
            )
            if result.returncode == 0:
                # Most recent export, accepting .jsonl or legacy .txt.
                candidates = [
                    *exports_pending.glob("*.jsonl"),
                    *exports_pending.glob("*.txt"),
                ]
                if candidates:
                    newest = max(candidates, key=lambda f: f.stat().st_mtime)
                    return True, newest.name
        return False, ""
    except Exception as e:
        log(f"Export error: {e}")
        return False, ""
def run_cx_processing(export_file: Path) -> tuple[bool, dict]:
    """Run /cx processing on an export file.

    Invokes unified-message-extractor.py on the export, parses the first
    JSON line of its stdout for stats, archives the export (adding a
    time suffix on name collision), and writes a processing report.

    Returns (success, stats) where stats contains messages_new and
    messages_duplicate.

    Added in J.10 (2026-01-11) for automatic /cx processing.
    """
    try:
        extractor = Path.home() / ".coditect" / "scripts" / "unified-message-extractor.py"
        exports_archive = CONTEXT_STORAGE / "exports-archive"
        cx_reports = CONTEXT_STORAGE / "cx-processing-reports"
        exports_archive.mkdir(parents=True, exist_ok=True)
        cx_reports.mkdir(parents=True, exist_ok=True)

        if not extractor.exists():
            log(f"Extractor not found: {extractor}")
            return False, {}

        # Run unified-message-extractor
        # Note: exports are session JSONL copies, not /export TXT format
        result = subprocess.run(
            ["python3", str(extractor), "--jsonl", str(export_file)],
            capture_output=True,
            text=True,
            timeout=120
        )

        stats = {"messages_new": 0, "messages_duplicate": 0}
        if result.returncode != 0:
            log(f"Extractor failed: {result.stderr[:200]}")
            return False, stats

        # Parse the first '{'-prefixed stdout line for stats; malformed
        # output is tolerated and leaves the zeroed defaults in place.
        try:
            for line in result.stdout.strip().split('\n'):
                if line.startswith('{'):
                    data = json.loads(line)
                    stats["messages_new"] = data.get("new_messages", 0)
                    stats["messages_duplicate"] = data.get("duplicates", 0)
                    break
        except (json.JSONDecodeError, ValueError):
            # Narrowed from a bare `except:` - only parse errors are tolerated.
            pass

        # Move to archive; add a time suffix when the name already exists.
        archive_path = exports_archive / export_file.name
        if archive_path.exists():
            timestamp = datetime.now().strftime("%H%M%S")
            archive_path = exports_archive / f"{export_file.stem}-{timestamp}{export_file.suffix}"
        export_file.rename(archive_path)
        log(f"Processed and archived: {archive_path.name} ({stats['messages_new']} new messages)")

        # Write processing report. Path objects are immutable, so
        # export_file.name still refers to the original file name here.
        report_file = cx_reports / f"cx-{datetime.now().strftime('%Y%m%d-%H%M%S')}.jsonl"
        report = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "file": export_file.name,
            "messages_new": stats["messages_new"],
            "messages_duplicate": stats["messages_duplicate"],
            "archived_to": str(archive_path)
        }
        with open(report_file, 'w') as f:
            f.write(json.dumps(report) + "\n")

        return True, stats
    except Exception as e:
        log(f"CX processing error: {e}")
        return False, {}
def process_all_pending_exports() -> tuple[int, int]:
    """Run /cx processing over every file in exports-pending/.

    Returns (files_processed, total_messages_new).

    Added in J.10 (2026-01-11) for processing legacy and manual exports.
    """
    pending_dir = CONTEXT_STORAGE / "exports-pending"
    if not pending_dir.exists():
        return 0, 0

    # Both .jsonl and legacy .txt exports are eligible.
    candidates = [*pending_dir.glob("*.jsonl"), *pending_dir.glob("*.txt")]
    if not candidates:
        return 0, 0

    processed = 0
    new_messages = 0
    log(f"Processing {len(candidates)} pending export(s)...")

    # Oldest first, so archives keep chronological order.
    candidates.sort(key=lambda f: f.stat().st_mtime)
    for export_file in candidates:
        ok, stats = run_cx_processing(export_file)
        if ok:
            processed += 1
            new_messages += stats.get("messages_new", 0)

    if processed > 0:
        log(f"Processed {processed} file(s), {new_messages} new messages")
        notify(
            "CODITECT Batch Processing Complete",
            f"Processed {processed} export(s)\n{new_messages} new messages extracted"
        )
    return processed, new_messages
def check_and_export():
    """One poll cycle: inspect the active session, export and /cx-process
    it when the context percentage falls inside the configured window.
    """
    state = load_state()

    session = find_active_session()
    if session is None:
        return

    # Only act on sessions modified within the last 5 minutes.
    if time.time() - session.stat().st_mtime > 300:
        return

    context_pct, details = get_context_percent(session)
    if context_pct < 0:
        return  # could not determine usage

    tokens = details.get("total_tokens", 0)
    log(f"Poll: {context_pct:.1f}% ({tokens:,} tokens) - {session.name}")

    # Record what this poll observed.
    state.update({
        "last_check": datetime.now(timezone.utc).isoformat(),
        "last_context_percent": round(context_pct, 1),
        "last_tokens": tokens,
        "session_file": details.get("session_file", ""),
        "watcher_active": True,
    })

    # Only export inside [MIN, MAX] and outside the cooldown window.
    outside_window = context_pct < MIN_CONTEXT_PERCENT or context_pct > MAX_CONTEXT_PERCENT
    if outside_window or in_cooldown(state):
        save_state(state)
        return

    log(f"Context at {context_pct:.1f}% - triggering export...")
    success, filename = run_export()

    if not success:
        log("Export failed")
        notify("CODITECT Export Failed", "Auto-export failed. Check logs.", sound=True)
        save_state(state)
        return

    now = datetime.now(timezone.utc).isoformat()

    # Track this export as pending until /cx processing succeeds.
    pending = state.get("pending_exports", [])
    pending.append({
        "filename": filename,
        "exported_at": now,
        "context_percent": round(context_pct, 1),
        "tokens": details.get("total_tokens", 0),
        "session_file": details.get("session_file", ""),
    })
    state["pending_exports"] = pending
    state["last_export"] = now
    state["last_export_file"] = filename
    state["export_context_percent"] = round(context_pct, 1)
    state["trigger"] = "watcher"

    export_path = CONTEXT_STORAGE / "exports-pending" / filename
    log(f"Exported: {export_path} at {context_pct:.1f}%")

    # Auto /cx processing (J.10)
    cx_success, cx_stats = run_cx_processing(export_path)
    if cx_success:
        messages_new = cx_stats.get("messages_new", 0)
        # Processed successfully - drop it from the pending list.
        state["pending_exports"] = [p for p in pending if p.get("filename") != filename]
        notify(
            "CODITECT Auto-Export Complete",
            f"Context at {context_pct:.1f}%\nProcessed: {messages_new} new messages\nAuto-archived"
        )
        log(f"Auto /cx complete: {messages_new} new messages extracted")
    else:
        # Fall back to asking the user to run /cx manually.
        notify(
            "CODITECT Session Export",
            f"Context at {context_pct:.1f}%\n{export_path}\nAuto-processing failed - run /cx manually"
        )
        # Open in VS Code for manual review (best-effort).
        try:
            subprocess.run(["code", str(export_path)], capture_output=True, timeout=5)
        except Exception as e:
            log(f"VS Code open error: {e}")

    save_state(state)
def write_pid():
    """Record this process's PID so --status/--stop can find the daemon."""
    pid = os.getpid()
    PID_FILE.parent.mkdir(parents=True, exist_ok=True)
    PID_FILE.write_text(str(pid))
def cleanup_pid():
    """Remove the PID file; best-effort, so missing files and other
    filesystem errors are ignored."""
    try:
        PID_FILE.unlink()
    except OSError:
        # Narrowed from a bare `except:` - only I/O failures are expected.
        pass
def check_status():
    """Print daemon liveness (from the PID file) and last recorded activity."""
    print("CODITECT Context Watcher Status")
    print("=" * 40)

    # Liveness: a PID file plus a signal-0 probe of that PID.
    if PID_FILE.exists():
        try:
            pid = int(PID_FILE.read_text().strip())
            try:
                os.kill(pid, 0)  # Signal 0 = check if process exists
                print(f"Status: RUNNING (PID: {pid})")
            except OSError:
                print(f"Status: NOT RUNNING (stale PID file: {pid})")
        except ValueError:
            print("Status: UNKNOWN (invalid PID file)")
    else:
        print("Status: NOT RUNNING")

    # Last activity, read from the persisted state file.
    if STATE_FILE.exists():
        try:
            state = json.loads(STATE_FILE.read_text())
            print()
            print("Last Activity:")
            if state.get("last_check"):
                print(f" Last check: {state['last_check']}")
            # Explicit None check so a legitimate 0.0% still prints
            # (the old truthiness test hid zero values).
            if state.get("last_context_percent") is not None:
                print(f" Context: {state['last_context_percent']}%")
            if state.get("last_export"):
                print(f" Last export: {state['last_export']}")
            pending = state.get("pending_exports", [])
            print(f" Pending exports: {len(pending)}")
        except Exception:
            # Best-effort display: a corrupt state file must not crash --status.
            pass
    print()
def stop_daemon():
    """Terminate a running daemon: SIGTERM first, SIGKILL after ~5 seconds.

    Returns True when the daemon is confirmed stopped (or the PID file was
    stale), False when there was nothing to stop or an error occurred.
    """
    if not PID_FILE.exists():
        print("No PID file found - daemon may not be running")
        return False
    try:
        pid = int(PID_FILE.read_text().strip())
        os.kill(pid, signal.SIGTERM)
        print(f"Sent SIGTERM to PID {pid}")

        # Poll with signal 0 for up to ~5 seconds (10 x 0.5s).
        for _ in range(10):
            try:
                os.kill(pid, 0)
            except OSError:
                print("Daemon stopped")
                cleanup_pid()
                return True
            time.sleep(0.5)

        # Still alive - escalate.
        print("Daemon did not stop, sending SIGKILL...")
        os.kill(pid, signal.SIGKILL)
        cleanup_pid()
        return True
    except ProcessLookupError:
        print("Process not found - cleaning up stale PID file")
        cleanup_pid()
        return True
    except Exception as e:
        print(f"Error stopping daemon: {e}")
        return False
def run_daemon():
    """Main daemon loop: poll the active session every POLL_INTERVAL_SECONDS
    and sweep exports-pending/ every 60 seconds until signalled to stop.
    """
    # Create the log directory BEFORE the first log() call. Previously this
    # mkdir ran after the startup log lines, whose file writes then failed
    # silently (log() swallows OSError), losing the startup messages.
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)

    log("Context Watcher Daemon starting...")
    log(f" Platform: {platform.system()}")
    log(f" Threshold: {MIN_CONTEXT_PERCENT}%")
    log(f" Max threshold: {MAX_CONTEXT_PERCENT}%")
    log(f" Poll interval: {POLL_INTERVAL_SECONDS}s")
    log(f" Cooldown: {COOLDOWN_MINUTES} minutes")

    write_pid()

    # Graceful shutdown on SIGTERM/SIGINT: remove the PID file and exit.
    def handle_signal(signum, frame):
        log(f"Received signal {signum}, shutting down...")
        cleanup_pid()
        sys.exit(0)

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    # Process any pending exports on startup (J.10)
    log("Checking for pending exports on startup...")
    try:
        files, messages = process_all_pending_exports()
        if files > 0:
            log(f"Startup: processed {files} pending export(s)")
    except Exception as e:
        log(f"Startup pending processing error: {e}")

    # Periodic sweep of pending exports (every 60 seconds)
    pending_check_interval = 60  # seconds
    last_pending_check = time.time()

    try:
        while True:
            try:
                check_and_export()
            except Exception as e:
                log(f"Error in check loop: {e}")

            # Periodic processing of pending exports (J.10)
            if time.time() - last_pending_check >= pending_check_interval:
                last_pending_check = time.time()
                try:
                    process_all_pending_exports()
                except Exception as e:
                    log(f"Pending processing error: {e}")

            time.sleep(POLL_INTERVAL_SECONDS)
    except KeyboardInterrupt:
        log("Shutting down...")
    finally:
        cleanup_pid()
def main():
    """CLI entry point: parse arguments, then dispatch to status/stop/daemon."""
    global MIN_CONTEXT_PERCENT, MAX_CONTEXT_PERCENT, COOLDOWN_MINUTES, POLL_INTERVAL_SECONDS

    parser = argparse.ArgumentParser(
        description="CODITECT Context Watcher Daemon",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    python3 context-watcher-daemon.py                   # Start with defaults
    python3 context-watcher-daemon.py --threshold 80    # Export at 80%
    python3 context-watcher-daemon.py --status          # Check status
    python3 context-watcher-daemon.py --stop            # Stop daemon
""",
    )
    parser.add_argument(
        "--threshold", type=int, default=DEFAULT_MIN_CONTEXT_PERCENT,
        help=f"Context percentage to trigger export (default: {DEFAULT_MIN_CONTEXT_PERCENT})",
    )
    parser.add_argument(
        "--max-threshold", type=int, default=DEFAULT_MAX_CONTEXT_PERCENT,
        help=f"Maximum threshold to trigger export (default: {DEFAULT_MAX_CONTEXT_PERCENT})",
    )
    parser.add_argument(
        "--cooldown", type=int, default=DEFAULT_COOLDOWN_MINUTES,
        help=f"Minutes between exports (default: {DEFAULT_COOLDOWN_MINUTES})",
    )
    parser.add_argument(
        "--poll-interval", type=int, default=DEFAULT_POLL_INTERVAL_SECONDS,
        help=f"Seconds between checks (default: {DEFAULT_POLL_INTERVAL_SECONDS})",
    )
    parser.add_argument("--status", action="store_true", help="Check daemon status and exit")
    parser.add_argument("--stop", action="store_true", help="Stop running daemon")

    args = parser.parse_args()

    if args.status:
        check_status()
        return
    if args.stop:
        stop_daemon()
        return

    # Publish CLI overrides into the module-level runtime configuration.
    MIN_CONTEXT_PERCENT = args.threshold
    MAX_CONTEXT_PERCENT = args.max_threshold
    COOLDOWN_MINUTES = args.cooldown
    POLL_INTERVAL_SECONDS = args.poll_interval

    run_daemon()
# Standard script entry guard. The source had `if name == "main"` (a NameError
# at runtime); the correct dunder comparison is below.
if __name__ == "__main__":
    main()