#!/usr/bin/env python3
"""
CODITECT Inter-Session Message Watcher (H.13.3, ADR-160)

OS-level notification layer for messaging.db changes using:
- macOS: kqueue on WAL file
- Linux: inotify on WAL file
- Fallback: 250ms polling

Provides subscribe() callback dispatch for the SessionMessageBus, enabling
near-instant push notifications without polling overhead on kqueue/inotify
platforms (latency is then dominated by the default 50ms debounce window;
the polling fallback adds up to 250ms).

Usage:
    from scripts.core.session_message_watcher import MessageWatcher

    watcher = MessageWatcher()
    watcher.subscribe("state", my_callback)
    watcher.start()
    # ... callbacks fire when new messages arrive ...
    watcher.stop()

Or with the bus:
    from scripts.core.session_message_bus import get_session_message_bus
    from scripts.core.session_message_watcher import MessageWatcher

    bus = get_session_message_bus(session_id="my-sess")
    watcher = MessageWatcher(bus=bus)
    watcher.subscribe("file_conflict", handle_conflict)
    watcher.start()

Created: 2026-02-06
Updated: 2026-02-06
ADR: ADR-160 (Inter-Session Messaging Architecture)
Track: H.13.3
"""
import logging
import os
import platform
import select
import struct
import sys
import threading
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)

# Type alias for subscription callbacks.
# Callback receives: (channel, List[SessionMessage])
SubscriptionCallback = Callable[[str, list], None]
# ---------------------------------------------------------------------------
# Backend Detection
# ---------------------------------------------------------------------------
def _detect_backend() -> str:
    """
    Detect the best available filesystem notification backend.

    Each candidate is probed by actually creating (and immediately closing)
    the kernel object, so a platform that nominally supports it but fails
    at runtime falls through to the next option.

    Returns:
        "kqueue" (macOS), "inotify" (Linux), or "polling" (fallback)
    """
    system = platform.system()

    if system == "Darwin":
        # macOS: confirm the interpreter exposes kqueue and the kernel lets
        # us create one. No extra descriptor is opened, so nothing can leak
        # if kqueue() itself fails (e.g. EMFILE).
        try:
            kq = select.kqueue()
        except (AttributeError, OSError):
            pass
        else:
            kq.close()
            return "kqueue"

    if system == "Linux":
        # Linux: probe inotify through glibc. Systems without "libc.so.6"
        # (e.g. musl-based distributions) raise OSError and fall back.
        try:
            import ctypes

            libc = ctypes.CDLL("libc.so.6", use_errno=True)
            fd = libc.inotify_init()
        except (OSError, AttributeError):
            pass
        else:
            if fd >= 0:
                os.close(fd)
                return "inotify"

    return "polling"
# ---------------------------------------------------------------------------
# Watcher Backends
# ---------------------------------------------------------------------------
class _WatcherBackend:
    """
    Interface shared by all filesystem watcher backends.

    Concrete backends override both methods; calling them on this base
    class raises NotImplementedError.
    """

    def start(self, wal_path: Path, on_change: Callable[[], None]) -> None:
        """Begin watching *wal_path*, invoking *on_change* on each change."""
        raise NotImplementedError()

    def stop(self) -> None:
        """Stop watching and release any backend resources."""
        raise NotImplementedError()
class _KqueueBackend(_WatcherBackend):
    """
    macOS kqueue watcher on messaging.db-wal file (H.13.3.1).

    Uses kqueue to watch for NOTE_WRITE / NOTE_EXTEND events on the WAL
    file, providing near-instant notification when any session writes to
    messaging.db.

    A kqueue registration follows the inode, not the path. SQLite can delete
    the WAL file (e.g. when the last connection closes), so on NOTE_DELETE /
    NOTE_RENAME the watcher recreates the file and registers on the new
    inode. It then fires on_change once, to cover any write made while it
    was re-arming.
    """

    def __init__(self):
        self._kq = None  # kqueue object for the current registration, if any
        self._fd = -1  # read-only descriptor of the currently watched inode
        self._stop_event = threading.Event()
        self._thread = None

    def start(self, wal_path: Path, on_change: Callable[[], None]) -> None:
        """Start a daemon thread that calls *on_change* when *wal_path* changes."""
        self._stop_event.clear()

        def _watch():
            rearming = False
            try:
                # _watch_inode() returns True when the WAL file went away and
                # must be watched again, False once stop() was requested.
                while self._watch_inode(wal_path, on_change, rearming):
                    rearming = True
            except Exception as e:
                logger.error(f"kqueue watcher error: {e}")
            finally:
                self._close_handles()

        self._thread = threading.Thread(
            target=_watch,
            name="coditect-kqueue-watcher",
            daemon=True,
        )
        self._thread.start()

    def _watch_inode(
        self,
        wal_path: Path,
        on_change: Callable[[], None],
        notify_on_arm: bool,
    ) -> bool:
        """Watch the WAL file's current inode until stopped or replaced.

        Returns True if the file was deleted/renamed (caller should re-arm),
        False if stop() was requested. Descriptors are always released.
        """
        write_flags = select.KQ_NOTE_WRITE | select.KQ_NOTE_EXTEND
        gone_flags = select.KQ_NOTE_DELETE | select.KQ_NOTE_RENAME
        try:
            # Ensure WAL file exists (created on first WAL-mode write)
            if not wal_path.exists():
                wal_path.touch()
            self._fd = os.open(str(wal_path), os.O_RDONLY)
            self._kq = select.kqueue()
            # Register once; the loop below only collects pending events.
            self._kq.control(
                [
                    select.kevent(
                        self._fd,
                        filter=select.KQ_FILTER_VNODE,
                        flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE | select.KQ_EV_CLEAR,
                        fflags=write_flags | gone_flags,
                    )
                ],
                0,
            )
            logger.info(f"kqueue watching: {wal_path}")
            if notify_on_arm:
                # Writes between the old inode's removal and this registration
                # would otherwise go unnoticed.
                on_change()

            while not self._stop_event.is_set():
                try:
                    events = self._kq.control(None, 1, 0.5)  # 500ms timeout
                except (OSError, InterruptedError):
                    if self._stop_event.is_set():
                        break
                    time.sleep(0.1)
                    continue
                for ev in events:
                    if ev.fflags & write_flags:
                        on_change()
                    if ev.fflags & gone_flags:
                        return not self._stop_event.is_set()
            return False
        finally:
            self._close_handles()

    def _close_handles(self) -> None:
        """Close the kqueue and WAL descriptor if open, and forget them.

        Resetting the attributes prevents a later run from closing a stale
        descriptor number that the process may have reused elsewhere.
        """
        if self._kq is not None:
            try:
                self._kq.close()
            except OSError:
                pass
            self._kq = None
        if self._fd >= 0:
            try:
                os.close(self._fd)
            except OSError:
                pass
            self._fd = -1

    def stop(self) -> None:
        """Signal the watcher thread to exit and wait up to 2s for it."""
        self._stop_event.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=2.0)
class _InotifyBackend(_WatcherBackend):
    """
    Linux inotify watcher on messaging.db-wal file (H.13.3.2).

    Uses inotify to watch for IN_MODIFY / IN_CLOSE_WRITE events on the WAL
    file. When the kernel drops the watch (IN_IGNORED, e.g. because SQLite
    deleted the WAL after its last connection closed), the file is recreated
    and the watch re-added so notifications keep flowing.
    """

    # Event masks from <sys/inotify.h>.
    _IN_MODIFY = 0x00000002
    _IN_CLOSE_WRITE = 0x00000008
    _IN_Q_OVERFLOW = 0x00004000
    _IN_IGNORED = 0x00008000
    # Fixed header of struct inotify_event: int wd; uint32 mask, cookie, len.
    _EVENT_HEADER = struct.Struct("iIII")

    def __init__(self):
        self._inotify_fd = -1
        self._stop_event = threading.Event()
        self._thread = None

    def start(self, wal_path: Path, on_change: Callable[[], None]) -> None:
        """Start a daemon thread that calls *on_change* when *wal_path* changes."""
        self._stop_event.clear()

        def _watch():
            try:
                import ctypes

                libc = ctypes.CDLL("libc.so.6", use_errno=True)
                self._inotify_fd = libc.inotify_init()
                if self._inotify_fd < 0:
                    logger.error("inotify_init failed")
                    return

                wd = self._add_watch(libc, wal_path)
                if wd < 0:
                    return
                logger.info(f"inotify watching: {wal_path}")

                while not self._stop_event.is_set():
                    # select() with a timeout so stop() is noticed promptly
                    rlist, _, _ = select.select([self._inotify_fd], [], [], 0.5)
                    if not rlist:
                        continue
                    try:
                        data = os.read(self._inotify_fd, 4096)
                    except OSError:
                        if self._stop_event.is_set():
                            break
                        continue

                    changed, dropped = self._scan_events(data, wd)
                    if dropped:
                        # The watched inode is gone; watch the (re)created file.
                        wd = self._add_watch(libc, wal_path)
                        if wd < 0:
                            return
                        # Cover writes made before the new watch existed.
                        changed = True
                    if changed:
                        on_change()
            except Exception as e:
                logger.error(f"inotify watcher error: {e}")
            finally:
                if self._inotify_fd >= 0:
                    try:
                        os.close(self._inotify_fd)
                    except OSError:
                        pass
                    # Forget the number so a later run never closes a reused fd.
                    self._inotify_fd = -1

        self._thread = threading.Thread(
            target=_watch,
            name="coditect-inotify-watcher",
            daemon=True,
        )
        self._thread.start()

    def _add_watch(self, libc, wal_path: Path) -> int:
        """Ensure the WAL file exists and add an inotify watch on it.

        Returns the watch descriptor, or a negative value on failure
        (the failure is logged here).
        """
        # Ensure WAL file exists (created on first WAL-mode write)
        if not wal_path.exists():
            wal_path.touch()
        wd = libc.inotify_add_watch(
            self._inotify_fd,
            os.fsencode(str(wal_path)),
            self._IN_MODIFY | self._IN_CLOSE_WRITE,
        )
        if wd < 0:
            logger.error(f"inotify_add_watch failed for {wal_path}")
        return wd

    @classmethod
    def _scan_events(cls, data: bytes, wd: int) -> Tuple[bool, bool]:
        """Parse a buffer of raw inotify events.

        Returns (changed, dropped): *changed* is True if a write-type event
        or a queue overflow was seen; *dropped* is True if the kernel removed
        watch *wd* (IN_IGNORED).
        """
        header = cls._EVENT_HEADER
        write_mask = cls._IN_MODIFY | cls._IN_CLOSE_WRITE | cls._IN_Q_OVERFLOW
        changed = dropped = False
        offset = 0
        while offset + header.size <= len(data):
            ev_wd, mask, _cookie, name_len = header.unpack_from(data, offset)
            offset += header.size + name_len
            if mask & write_mask:
                changed = True
            if (mask & cls._IN_IGNORED) and ev_wd == wd:
                dropped = True
        return changed, dropped

    def stop(self) -> None:
        """Signal the watcher thread to exit and wait up to 2s for it."""
        self._stop_event.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=2.0)
class _PollingBackend(_WatcherBackend):
    """
    Fallback 250ms polling watcher (H.13.3.3).

    Used on platforms without kqueue or inotify support. Every *interval*
    seconds it compares the WAL file's (mtime_ns, size) signature with the
    previous one and calls on_change when the file exists and differs.
    """

    def __init__(self, interval: float = 0.25):
        self._interval = interval
        self._stop_event = threading.Event()
        self._thread = None

    @staticmethod
    def _signature(path: Path) -> Optional[Tuple[int, int]]:
        """Return (mtime_ns, size) of *path*, or None if it cannot be stat'ed.

        Nanosecond mtime plus size catches writes that coarse float mtimes
        (or filesystems with 1s resolution) would otherwise hide.
        """
        try:
            st = path.stat()
        except OSError:
            return None
        return (st.st_mtime_ns, st.st_size)

    def start(self, wal_path: Path, on_change: Callable[[], None]) -> None:
        """Start a daemon thread that polls *wal_path* for changes."""
        self._stop_event.clear()
        # Baseline is taken synchronously. Pre-existing content does not fire,
        # but writes right after start() are still reported, and so is a WAL
        # file that only appears later.
        baseline = self._signature(wal_path)

        def _poll():
            last = baseline
            logger.info(f"Polling watcher (interval={self._interval}s): {wal_path}")
            while not self._stop_event.is_set():
                current = self._signature(wal_path)
                if current is not None and current != last:
                    on_change()
                last = current
                self._stop_event.wait(self._interval)

        self._thread = threading.Thread(
            target=_poll,
            name="coditect-polling-watcher",
            daemon=True,
        )
        self._thread.start()

    def stop(self) -> None:
        """Signal the polling thread to exit and wait up to 2s for it."""
        self._stop_event.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=2.0)
# ---------------------------------------------------------------------------
# MessageWatcher (H.13.3.4: Notification-to-Callback Dispatch)
# ---------------------------------------------------------------------------
class MessageWatcher:
    """
    Watches messaging.db for changes and dispatches subscription callbacks.

    Combines OS-level file watching (kqueue/inotify/polling) with
    channel-based subscription dispatch. When the WAL file changes,
    the watcher polls for new messages on subscribed channels and
    invokes the registered callbacks.

    Thread-safe: subscriptions can be added or removed from any thread,
    including from inside a callback. Dispatches are serialized, so
    overlapping debounce timers cannot deliver a message twice to the
    same subscription.
    """

    class _Subscription:
        """A registered callback plus the newest message ID it has consumed."""

        __slots__ = ("callback", "since_id", "active")

        def __init__(self, callback: "SubscriptionCallback", since_id: int):
            self.callback = callback
            self.since_id = since_id
            # Cleared by unsubscribe() so an in-flight dispatch skips it.
            self.active = True

    def __init__(
        self,
        bus=None,
        db_path: Optional[Path] = None,
        backend: Optional[str] = None,
        debounce_ms: int = 50,
    ):
        """
        Initialize the message watcher.

        Args:
            bus: SessionMessageBus instance (auto-created if None)
            db_path: Path to messaging.db (auto-detected if None)
            backend: Force a specific backend ("kqueue", "inotify", "polling")
            debounce_ms: Debounce interval to batch rapid changes (default 50ms)
        """
        self._bus = bus
        self._db_path = db_path
        self._backend_name = backend or _detect_backend()
        self._debounce = debounce_ms / 1000.0

        # Subscriptions: channel -> [_Subscription]
        self._subscriptions: Dict[str, List["MessageWatcher._Subscription"]] = {}
        self._lock = threading.Lock()
        # Serializes _dispatch(). Reentrant so a callback that triggers a
        # dispatch on the same thread cannot deadlock.
        self._dispatch_lock = threading.RLock()

        # Watcher state
        self._backend: Optional[_WatcherBackend] = None
        self._running = False
        self._last_change_time = 0.0
        self._debounce_timer: Optional[threading.Timer] = None

    def _get_bus(self):
        """Lazy-initialize the message bus."""
        if self._bus is None:
            try:
                from scripts.core.session_message_bus import get_session_message_bus
            except ImportError:
                script_dir = Path(__file__).parent
                sys.path.insert(0, str(script_dir.parent.parent))
                from scripts.core.session_message_bus import get_session_message_bus
            self._bus = get_session_message_bus(db_path=self._db_path)
        return self._bus

    def _get_wal_path(self) -> Path:
        """Get the WAL file path for messaging.db."""
        bus = self._get_bus()
        return Path(str(bus._db_path) + "-wal")

    def subscribe(
        self,
        channel: str,
        callback: "SubscriptionCallback",
        since_id: int = 0,
    ) -> None:
        """
        Subscribe to messages on a channel.

        The callback will be invoked with (channel, messages) whenever
        new messages arrive on the channel. The since_id tracks the
        last message ID seen, so only new messages are delivered.

        Args:
            channel: Channel to subscribe to (e.g., "state", "file_conflict")
            callback: Function called with (channel, List[SessionMessage])
            since_id: Start from this message ID (0 = all current messages)
        """
        sub = self._Subscription(callback, since_id)
        with self._lock:
            self._subscriptions.setdefault(channel, []).append(sub)
        logger.info(f"Subscribed to channel '{channel}' (since_id={since_id})")

    def unsubscribe(self, channel: str, callback: Optional["SubscriptionCallback"] = None) -> None:
        """
        Remove a subscription.

        Args:
            channel: Channel to unsubscribe from
            callback: Specific callback to remove (None = remove all for channel).
                Matched with ==, so passing ``obj.method`` again removes a
                subscription made with ``obj.method`` (a fresh bound-method
                object each time, so an identity check would never match).
        """
        with self._lock:
            subs = self._subscriptions.get(channel)
            if not subs:
                return
            if callback is None:
                removed, kept = subs, []
            else:
                removed = [s for s in subs if s.callback == callback]
                kept = [s for s in subs if s.callback != callback]
            for sub in removed:
                sub.active = False
            if kept:
                self._subscriptions[channel] = kept
            else:
                del self._subscriptions[channel]

    def _on_change(self) -> None:
        """
        Called when the WAL file changes.

        Uses debouncing to batch rapid changes. Each call restarts the timer,
        so a burst of writes within the debounce window triggers a single
        dispatch.
        """
        with self._lock:
            self._last_change_time = time.monotonic()
            # Cancel any pending debounce timer
            if self._debounce_timer is not None:
                self._debounce_timer.cancel()
            # Set a new debounce timer
            timer = threading.Timer(self._debounce, self._dispatch)
            timer.daemon = True
            self._debounce_timer = timer
            timer.start()

    def _dispatch(self) -> None:
        """
        Dispatch new messages to subscribed callbacks.

        For each subscription, polls its channel for messages newer than
        the subscription's since_id and invokes its callback. since_id only
        advances after the callback returns without raising, so a failing
        subscriber receives the same batch on the next dispatch. Callbacks
        run outside the subscription lock, so they may subscribe or
        unsubscribe freely.
        """
        with self._dispatch_lock:
            with self._lock:
                pending = [
                    (channel, sub)
                    for channel, subs in self._subscriptions.items()
                    for sub in subs
                ]
            if not pending:
                return

            bus = self._get_bus()
            for channel, sub in pending:
                if not sub.active:
                    # Unsubscribed by an earlier callback in this dispatch.
                    continue
                try:
                    messages = bus.poll(channel, since_id=sub.since_id)
                    if messages:
                        # max() rather than messages[-1]: does not rely on
                        # poll() returning messages in ascending ID order.
                        newest_id = max(msg.id for msg in messages)
                        sub.callback(channel, messages)
                        sub.since_id = newest_id
                except Exception as e:
                    logger.error(
                        f"Subscription callback error on '{channel}': {e}",
                        exc_info=True,
                    )

    def start(self) -> None:
        """Start watching for messaging.db changes."""
        if self._running:
            return

        wal_path = self._get_wal_path()

        if self._backend_name == "kqueue":
            self._backend = _KqueueBackend()
        elif self._backend_name == "inotify":
            self._backend = _InotifyBackend()
        else:
            self._backend = _PollingBackend()

        self._backend.start(wal_path, self._on_change)
        self._running = True
        logger.info(f"MessageWatcher started (backend={self._backend_name})")

    def stop(self) -> None:
        """Stop watching and cancel any pending debounced dispatch."""
        if not self._running:
            return

        # Stop the backend first so no new change notifications can arm a
        # fresh debounce timer after we cancel the pending one.
        if self._backend:
            self._backend.stop()
            self._backend = None

        with self._lock:
            if self._debounce_timer is not None:
                self._debounce_timer.cancel()
                self._debounce_timer = None

        self._running = False
        logger.info("MessageWatcher stopped")

    @property
    def backend_name(self) -> str:
        """The active notification backend name."""
        return self._backend_name

    @property
    def is_running(self) -> bool:
        """Whether the watcher is active."""
        return self._running

    @property
    def subscription_count(self) -> int:
        """Total number of active subscriptions."""
        with self._lock:
            return sum(len(subs) for subs in self._subscriptions.values())

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
        return False
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import argparse
    import tempfile

    parser = argparse.ArgumentParser(
        description="CODITECT Inter-Session Message Watcher (H.13.3)"
    )
    parser.add_argument(
        "command",
        choices=["detect", "watch", "self-test"],
        help="Command to run",
    )
    parser.add_argument("--channel", default="state", help="Channel to watch")
    parser.add_argument(
        "--backend",
        choices=["kqueue", "inotify", "polling"],
        help="Force notification backend",
    )
    parser.add_argument(
        "--timeout", type=int, default=30, help="Watch timeout (seconds)"
    )
    args = parser.parse_args()

    if args.command == "detect":
        backend = _detect_backend()
        system = platform.system()
        print(f"Platform: {system}")
        print(f"Selected backend: {backend}")
        # Report what the probe found rather than guessing from the OS name
        # (e.g. non-glibc Linux falls back to polling).
        print(f" kqueue available: {backend == 'kqueue'}")
        print(f" inotify available: {backend == 'inotify'}")

    elif args.command == "watch":
        logging.basicConfig(level=logging.INFO)
        received = []

        def _print_messages(channel, messages):
            for msg in messages:
                print(f"[{channel}] #{msg.id} from {msg.sender_id}: {msg.payload}")
                received.append(msg)

        watcher = MessageWatcher(backend=args.backend)
        watcher.subscribe(args.channel, _print_messages)
        print(f"Watching channel '{args.channel}' (backend={watcher.backend_name})...")
        print(f"Press Ctrl+C to stop (timeout={args.timeout}s)")
        watcher.start()
        try:
            time.sleep(args.timeout)
        except KeyboardInterrupt:
            pass
        finally:
            # Always release the watcher thread and debounce timer.
            watcher.stop()
        print(f"\nReceived {len(received)} messages")

    elif args.command == "self-test":

        def _wait_for(predicate, timeout):
            """Poll *predicate* until true or *timeout* seconds elapse."""
            deadline = time.monotonic() + timeout
            while time.monotonic() < deadline:
                if predicate():
                    return True
                time.sleep(0.05)
            return predicate()

        print("Running MessageWatcher self-tests...")

        # Test 1: Backend detection
        backend = _detect_backend()
        print(f"Detected backend: {backend}")
        assert backend in ("kqueue", "inotify", "polling"), f"Invalid backend: {backend}"
        print(f" [PASS] Backend detection: {backend}")

        # Test 2: Subscription management
        watcher = MessageWatcher(backend="polling")
        received_msgs = []

        def _test_cb(channel, messages):
            received_msgs.extend(messages)

        watcher.subscribe("test-channel", _test_cb)
        assert watcher.subscription_count == 1
        watcher.subscribe("test-channel", _test_cb)
        assert watcher.subscription_count == 2
        watcher.unsubscribe("test-channel", _test_cb)
        assert watcher.subscription_count == 0
        print(" [PASS] Subscription management")

        try:
            from scripts.core.session_message_bus import SQLiteSessionMessageBus
        except ImportError:
            script_dir = Path(__file__).parent
            sys.path.insert(0, str(script_dir.parent.parent))
            from scripts.core.session_message_bus import SQLiteSessionMessageBus

        # Tests 3-5 share a throwaway database so the user's real
        # messaging.db is never opened by the self-test.
        with tempfile.TemporaryDirectory() as tmp:
            db_path = Path(tmp) / "test-messaging.db"
            bus = SQLiteSessionMessageBus(db_path=db_path, session_id="test-1")
            bus.register_session("test-1", "claude", "opus-4.6")

            # Test 3: Watcher lifecycle and end-to-end dispatch
            watcher = MessageWatcher(bus=bus, backend="polling")
            assert not watcher.is_running

            received_test = []

            def _collect(ch, msgs):
                received_test.extend(msgs)

            watcher.subscribe("state", _collect)
            watcher.start()
            assert watcher.is_running

            # Publish a message and wait for dispatch:
            # polling interval (250ms) + debounce (50ms) + margin
            bus.publish("state", {"test": True})
            _wait_for(lambda: bool(received_test), 1.5)

            watcher.stop()
            assert not watcher.is_running

            if received_test:
                print(f" [PASS] End-to-end dispatch ({len(received_test)} messages)")
            else:
                print(" [WARN] No messages received (may need longer wait)")

            # Test 4: Context manager
            watcher = MessageWatcher(bus=bus, backend="polling")
            with watcher:
                assert watcher.is_running
            assert not watcher.is_running
            print(" [PASS] Context manager lifecycle")

            # Test 5: Debounce - rapid changes must collapse into one dispatch
            watcher = MessageWatcher(bus=bus, backend="polling", debounce_ms=100)
            call_count = [0]
            dispatch_count = [0]

            def _count_cb(ch, msgs):
                call_count[0] += 1

            watcher.subscribe("test", _count_cb)

            # Count real dispatches; _on_change() schedules self._dispatch.
            real_dispatch = watcher._dispatch

            def _counting_dispatch():
                dispatch_count[0] += 1
                real_dispatch()

            watcher._dispatch = _counting_dispatch

            watcher._on_change()
            watcher._on_change()
            watcher._on_change()
            time.sleep(0.3)

            assert dispatch_count[0] == 1, (
                f"Debounce failed: expected 1 dispatch, got {dispatch_count[0]}"
            )
            print(f" [PASS] Debounce (dispatch count: {dispatch_count[0]})")

        print("\nAll self-tests passed!")