#!/usr/bin/env python3
"""CODITECT Inter-Session Message Bus (ADR-160)

Lightweight pub/sub coordination for concurrent LLM sessions
(Claude, Codex, Gemini, Kimi) on a single developer machine.

Architecture:
- Dedicated SQLite database (messaging.db, <1 MB) in WAL mode
- SessionMessageBus ABC enables future transport replacement (Cloud tier)
- SQLITE_BUSY retry with exponential backoff
- TTL-based message cleanup on each write

This module is SEPARATE from message_bus.py (RabbitMQ inter-agent, H.2).
This handles inter-SESSION coordination (H.13) per ADR-160.

Usage:
    from scripts.core.session_message_bus import get_session_message_bus

    bus = get_session_message_bus()

    # Register this session (auto-detects machine_uuid, hostname, cwd, pid)
    bus.register_session(
        session_id="sess-abc123",
        llm_vendor="claude",
        llm_model="opus-4.6",
        project_id="PILOT",
        task_id="H.13.8",
    )

    # Update session context mid-session
    bus.update_session_context(task_id="H.13.9", active_files=["paths.py"])

    # Publish a message
    bus.publish("state", {"task_id": "H.8.1", "status": "working"})

    # Poll for messages
    messages = bus.poll("task_broadcast", since_id=0)

    # Advisory file lock
    bus.lock_file("scripts/core/paths.py")
    bus.unlock_file("scripts/core/paths.py")

    # Heartbeat (call every 15s)
    bus.heartbeat()

Created: 2026-02-06
Updated: 2026-02-06
ADR: ADR-160 (Inter-Session Messaging Architecture)
Track: H.13
"""
import json
import logging
import os
import random
import socket
import sqlite3
import sys
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
# Module-level logger per stdlib convention; getLogger(name) was a NameError —
# the intended argument is the module's __name__.
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Data Classes
# ---------------------------------------------------------------------------
@dataclass
class SessionInfo:
    """Registered LLM session.

    All timestamp fields are ISO-8601 strings as stored in SQLite;
    ``active_files`` is the decoded JSON list from the registry row.
    """

    session_id: str
    llm_vendor: str
    llm_model: Optional[str] = None
    tty: Optional[str] = None
    pid: Optional[int] = None
    project_id: Optional[str] = None
    task_id: Optional[str] = None
    active_files: Optional[List[str]] = None
    cwd: Optional[str] = None
    machine_uuid: Optional[str] = None
    hostname: Optional[str] = None
    heartbeat_at: Optional[str] = None
    last_active_at: Optional[str] = None
    registered_at: Optional[str] = None
    status: str = "active"
@dataclass
class SessionMessage:
    """A message on an inter-session channel (ADR-173 v2)."""

    id: int
    sender_id: str
    channel: str
    payload: Dict[str, Any]
    created_at: str
    ttl_seconds: int = 300
    # ADR-173 v2 fields
    sender_session_uuid: Optional[str] = None
    recipient_id: Optional[str] = None
    message_type: str = "info"
    priority: int = 0
    status: str = "pending"
    project_id: Optional[str] = None
    task_id: Optional[str] = None
    activity: Optional[str] = None
    reply_to: Optional[int] = None
    expires_at: Optional[str] = None
    delivered_at: Optional[str] = None
    read_at: Optional[str] = None
@dataclass
class FileLock:
    """Advisory file lock."""

    file_path: str
    session_id: str
    lock_type: str = "advisory"
    locked_at: Optional[str] = None
@dataclass
class TaskClaim:
    """A claimed task with exclusive ownership."""

    task_id: str
    session_id: str
    claimed_at: Optional[str] = None
    status: str = "claimed"
# ---------------------------------------------------------------------------
# Channel TTL defaults (seconds)
# ---------------------------------------------------------------------------

CHANNEL_TTL: Dict[str, int] = {
    "state": 60,
    "file_conflict": 300,
    "task_broadcast": 600,
    "heartbeat": 30,
    "direct": 600,
    "session_lifecycle": 1800,
    "operator_alert": 3600,
}

# Fallback TTL for channels not listed above.
DEFAULT_TTL = 300
# Dedup coalesce windows per channel/message_type (seconds). 0 = no dedup.
CHANNEL_DEDUP: Dict[str, Dict[str, int]] = {
    "task_broadcast": {"started": 30, "completed": 30, "blocked": 30, "released": 30},
    "heartbeat": {"ping": 10},
    "state": {"info": 15},
    "session_lifecycle": {"started": 60, "ended": 60, "resumed": 60},
    "operator_alert": {
        "cwd_overlap": 300,
        "project_conflict": 300,
        "task_conflict": 300,
        "file_conflict": 300,
    },
}
# Default priority per channel/message_type. 0=routine, 1=normal, 2=high, 3=critical
CHANNEL_PRIORITY: Dict[str, Dict[str, int]] = {
    "task_broadcast": {"started": 0, "completed": 1, "error": 2, "handoff": 1},
    "file_conflict": {"lock_request": 2, "lock_granted": 1, "conflict_warning": 3},
    "state": {"info": 0},
    "heartbeat": {"ping": 0},
    "direct": {"request": 1, "response": 1},
    "session_lifecycle": {"started": 1, "ended": 1, "resumed": 1},
    "operator_alert": {
        "project_conflict": 3,
        "task_conflict": 3,
        "cwd_overlap": 2,
        "file_conflict": 3,
    },
}
# ---------------------------------------------------------------------------
# ABC: SessionMessageBus
# ---------------------------------------------------------------------------


class SessionMessageBus(ABC):
    """
    Abstract base class for inter-session messaging (ADR-160).

    Implementations:
    - SQLiteSessionMessageBus (local, launch tier)
    - Future: CloudSessionMessageBus (PostgreSQL LISTEN/NOTIFY)
    """

    @abstractmethod
    def register_session(
        self,
        session_id: str,
        llm_vendor: str,
        llm_model: Optional[str] = None,
        session_uuid: Optional[str] = None,
        user_id: Optional[str] = None,
        project_id: Optional[str] = None,
        tty: Optional[str] = None,
        pid: Optional[int] = None,
        task_id: Optional[str] = None,
        cwd: Optional[str] = None,
        machine_uuid: Optional[str] = None,
        hostname: Optional[str] = None,
    ) -> None:
        """Register an LLM session for coordination."""

    @abstractmethod
    def unregister_session(self, session_id: str) -> None:
        """Remove a session from the registry."""

    @abstractmethod
    def heartbeat(self, session_id: Optional[str] = None) -> None:
        """Update session heartbeat timestamp."""

    @abstractmethod
    def update_session_context(
        self,
        session_id: Optional[str] = None,
        task_id: Optional[str] = None,
        active_files: Optional[List[str]] = None,
        cwd: Optional[str] = None,
        project_id: Optional[str] = None,
    ) -> None:
        """Update mutable session context mid-session (task, files, cwd, project)."""

    @abstractmethod
    def list_sessions(self, active_only: bool = True) -> List[SessionInfo]:
        """List registered sessions."""

    @abstractmethod
    def publish(
        self,
        channel: str,
        payload: Dict[str, Any],
        sender_id: Optional[str] = None,
        ttl_seconds: Optional[int] = None,
        *,
        message_type: str = "info",
        recipient_id: Optional[str] = None,
        priority: Optional[int] = None,
        project_id: Optional[str] = None,
        task_id: Optional[str] = None,
        activity: Optional[str] = None,
        reply_to: Optional[int] = None,
    ) -> int:
        """
        Publish a message to a channel (ADR-173 v2).

        Keyword-only args are v2 extensions for structured messaging,
        dedup, directed delivery, and conflict detection.

        Returns:
            Message ID.
        """

    @abstractmethod
    def poll(
        self,
        channel: str,
        since_id: int = 0,
        limit: int = 100,
        *,
        status_filter: Optional[str] = None,
        recipient_id: Optional[str] = None,
    ) -> List[SessionMessage]:
        """Poll for messages on a channel since a given message ID (ADR-173 v2)."""

    @abstractmethod
    def send(
        self,
        recipient_id: str,
        channel: str,
        payload: Dict[str, Any],
        *,
        message_type: str = "request",
        priority: Optional[int] = None,
        task_id: Optional[str] = None,
        reply_to: Optional[int] = None,
    ) -> int:
        """Send a directed message to a specific session (ADR-173 v2)."""

    @abstractmethod
    def mark_delivered(self, message_id: int) -> bool:
        """Mark a message as delivered (ADR-173 v2)."""

    @abstractmethod
    def mark_read(self, message_id: int) -> bool:
        """Mark a message as read (ADR-173 v2)."""

    @abstractmethod
    def get_unread(
        self,
        recipient_id: Optional[str] = None,
        channel: Optional[str] = None,
        limit: int = 50,
        *,
        auto_mark_delivered: bool = False,
    ) -> List[SessionMessage]:
        """Get unread messages for a recipient (ADR-173 v2)."""

    @abstractmethod
    def get_operator_alerts(
        self,
        unread_only: bool = True,
        limit: int = 20,
        *,
        auto_mark_delivered: bool = False,
    ) -> List[SessionMessage]:
        """Get operator alerts — conflict detection results (ADR-173 v2)."""

    def subscribe(
        self,
        channel: str,
        callback: Callable[[str, list], None],
        since_id: int = 0,
    ):
        """
        Subscribe to messages on a channel with a callback.

        Requires a MessageWatcher to be running. This is a convenience
        method that delegates to the watcher.

        Args:
            channel: Channel to subscribe to
            callback: Function called with (channel, List[SessionMessage])
            since_id: Start from this message ID

        Returns:
            A handle that can be used to unsubscribe
        """
        try:
            from scripts.core.session_message_watcher import MessageWatcher
        except ImportError:
            # Handle direct execution from scripts/core/ by adding repo root.
            script_dir = Path(__file__).parent
            sys.path.insert(0, str(script_dir.parent.parent))
            from scripts.core.session_message_watcher import MessageWatcher
        # Lazily create a single shared watcher per bus instance.
        if not hasattr(self, '_watcher') or self._watcher is None:
            self._watcher = MessageWatcher(bus=self)
            self._watcher.start()
        self._watcher.subscribe(channel, callback, since_id=since_id)
        return self._watcher

    @abstractmethod
    def lock_file(self, file_path: str, session_id: Optional[str] = None) -> bool:
        """
        Acquire an advisory file lock.

        Returns:
            True if lock acquired, False if already locked by another session.
        """

    @abstractmethod
    def unlock_file(self, file_path: str, session_id: Optional[str] = None) -> bool:
        """
        Release an advisory file lock.

        Returns:
            True if lock released, False if not held by this session.
        """

    @abstractmethod
    def get_file_locks(self) -> List[FileLock]:
        """List all active file locks."""

    # -- Task Broadcasting & Routing (H.13.6) --

    @abstractmethod
    def broadcast_task(
        self,
        task_id: str,
        action: str,
        details: Optional[Dict[str, Any]] = None,
    ) -> int:
        """
        Broadcast a task state change to all sessions (H.13.6.1).

        Args:
            task_id: Track task ID (e.g., "H.13.6.1")
            action: One of: started, completed, blocked, released
            details: Optional extra context

        Returns:
            Message ID
        """

    @abstractmethod
    def claim_task(self, task_id: str, session_id: Optional[str] = None) -> bool:
        """
        Claim exclusive work on a task (H.13.6.2).

        Returns:
            True if claimed, False if already claimed by another active session.
        """

    @abstractmethod
    def release_task(self, task_id: str, session_id: Optional[str] = None) -> bool:
        """
        Release a claimed task (H.13.6.2).

        Returns:
            True if released, False if not held by this session.
        """

    @abstractmethod
    def get_task_claims(self) -> List[TaskClaim]:
        """List all active task claims."""

    @abstractmethod
    def update_session_task(self, task_id: str, session_id: Optional[str] = None) -> None:
        """Update the current task_id for a session in the registry (H.13.6.4)."""

    @abstractmethod
    def get_cross_llm_status(self) -> List[Dict[str, Any]]:
        """
        Get a summary of all active sessions and their current work (H.13.6.4).

        Returns:
            List of dicts with session_id, llm_vendor, task_id, project_id, etc.
        """

    @abstractmethod
    def close(self) -> None:
        """Close the message bus and release resources."""
---------------------------------------------------------------------------
SQLite Retry Logic (H.13.2.5)
---------------------------------------------------------------------------
def _sqlite_retry( func: Callable, max_retries: int = 5, base_delay: float = 0.01, max_delay: float = 1.0, ): """ Execute a SQLite operation with exponential backoff on SQLITE_BUSY.
Args:
func: Callable that performs the SQLite operation
max_retries: Maximum number of retry attempts
base_delay: Initial delay in seconds (10ms)
max_delay: Maximum delay cap in seconds
Returns:
Result of func()
Raises:
sqlite3.OperationalError: If all retries exhausted
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return func()
except sqlite3.OperationalError as e:
if "database is locked" not in str(e) and "SQLITE_BUSY" not in str(e):
raise
last_error = e
if attempt < max_retries:
delay = min(base_delay * (2 ** attempt) + random.uniform(0, 0.01), max_delay)
logger.debug(
f"SQLITE_BUSY retry {attempt + 1}/{max_retries}, "
f"waiting {delay:.3f}s"
)
time.sleep(delay)
raise last_error # type: ignore[misc]
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS session_registry (
    session_id TEXT PRIMARY KEY,
    llm_vendor TEXT NOT NULL,
    llm_model TEXT,
    session_uuid TEXT,
    user_id TEXT,
    tty TEXT,
    pid INTEGER,
    project_id TEXT,
    task_id TEXT,
    active_files TEXT,
    cwd TEXT,
    machine_uuid TEXT,
    hostname TEXT,
    heartbeat_at TEXT NOT NULL,
    last_active_at TEXT,
    registered_at TEXT NOT NULL DEFAULT (datetime('now')),
    status TEXT DEFAULT 'active'
);

CREATE TABLE IF NOT EXISTS inter_session_messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    sender_id TEXT NOT NULL,
    channel TEXT NOT NULL,
    payload TEXT NOT NULL,
    created_at TEXT NOT NULL DEFAULT (datetime('now')),
    ttl_seconds INTEGER DEFAULT 300
);

CREATE TABLE IF NOT EXISTS file_locks (
    file_path TEXT PRIMARY KEY,
    session_id TEXT NOT NULL,
    lock_type TEXT DEFAULT 'advisory',
    locked_at TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE INDEX IF NOT EXISTS idx_messages_channel_id ON inter_session_messages(channel, id);
CREATE INDEX IF NOT EXISTS idx_messages_created ON inter_session_messages(created_at);
CREATE INDEX IF NOT EXISTS idx_session_status ON session_registry(status);

CREATE TABLE IF NOT EXISTS task_claims (
    task_id TEXT PRIMARY KEY,
    session_id TEXT NOT NULL,
    claimed_at TEXT NOT NULL DEFAULT (datetime('now')),
    status TEXT DEFAULT 'claimed'
);

CREATE INDEX IF NOT EXISTS idx_task_claims_session ON task_claims(session_id);
"""

# ADR-173 v2: Structured message schema with dedup, lifecycle, conflict detection
_SCHEMA_V2_SQL = """
CREATE TABLE IF NOT EXISTS session_messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    sender_id TEXT NOT NULL,
    sender_session_uuid TEXT,
    recipient_id TEXT,
    channel TEXT NOT NULL,
    message_type TEXT NOT NULL,
    priority INTEGER NOT NULL DEFAULT 0,
    status TEXT NOT NULL DEFAULT 'pending',
    project_id TEXT,
    task_id TEXT,
    activity TEXT,
    payload TEXT,
    reply_to INTEGER,
    created_at TEXT NOT NULL DEFAULT (datetime('now')),
    delivered_at TEXT,
    read_at TEXT,
    expires_at TEXT NOT NULL,
    FOREIGN KEY (reply_to) REFERENCES session_messages(id)
);

CREATE INDEX IF NOT EXISTS idx_sm_recipient_status
    ON session_messages(recipient_id, status)
    WHERE status IN ('pending', 'delivered');
CREATE INDEX IF NOT EXISTS idx_sm_channel_type ON session_messages(channel, message_type);
CREATE INDEX IF NOT EXISTS idx_sm_task_id ON session_messages(task_id) WHERE task_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_sm_expires_at ON session_messages(expires_at);
CREATE INDEX IF NOT EXISTS idx_sm_sender_created ON session_messages(sender_id, created_at);
CREATE INDEX IF NOT EXISTS idx_sm_project_id ON session_messages(project_id) WHERE project_id IS NOT NULL;

CREATE TABLE IF NOT EXISTS message_dedup (
    sender_id TEXT NOT NULL,
    channel TEXT NOT NULL,
    message_type TEXT NOT NULL,
    task_id TEXT NOT NULL DEFAULT '',
    last_message_id INTEGER NOT NULL,
    last_sent_at TEXT NOT NULL,
    coalesce_window_seconds INTEGER NOT NULL DEFAULT 30,
    PRIMARY KEY (sender_id, channel, message_type, task_id)
);
"""
# ---------------------------------------------------------------------------
# SQLite Implementation (H.13.2.2)
# ---------------------------------------------------------------------------


class SQLiteSessionMessageBus(SessionMessageBus):
    """
    SQLite-backed inter-session message bus (ADR-160).

    Uses a dedicated messaging.db in WAL mode, separate from sessions.db
    to avoid SQLITE_BUSY contention with the 18+ GB session database.
    """

    def __init__(
        self,
        db_path: Optional[Path] = None,
        session_id: Optional[str] = None,
        stale_timeout_seconds: int = 600,
    ):
        """
        Initialize SQLite message bus.

        Args:
            db_path: Path to messaging.db (auto-detected if None)
            session_id: Default session ID for this bus instance
            stale_timeout_seconds: Seconds before a session is considered stale (default 10 min)
        """
        self._conn: Optional[sqlite3.Connection] = None
        if db_path is None:
            try:
                from scripts.core.paths import get_messaging_db_path
            except ImportError:
                # Handle direct execution from scripts/core/
                script_dir = Path(__file__).parent
                sys.path.insert(0, str(script_dir.parent.parent))
                from scripts.core.paths import get_messaging_db_path
            db_path = get_messaging_db_path()
        self._db_path = db_path
        self._session_id = session_id
        self._stale_timeout = stale_timeout_seconds
        self._ensure_db()
def _ensure_db(self) -> None:
"""Create database and schema if needed, with column migration + ADR-173 v2."""
self._db_path.parent.mkdir(parents=True, exist_ok=True)
def _init():
conn = sqlite3.connect(str(self._db_path), timeout=5)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=3000")
conn.execute("PRAGMA synchronous=NORMAL")
conn.executescript(_SCHEMA_SQL)
conn.commit()
# Migrate existing databases: add columns that may be missing (H.13.8)
_MIGRATIONS = [
("cwd", "ALTER TABLE session_registry ADD COLUMN cwd TEXT"),
("machine_uuid", "ALTER TABLE session_registry ADD COLUMN machine_uuid TEXT"),
("hostname", "ALTER TABLE session_registry ADD COLUMN hostname TEXT"),
("last_active_at", "ALTER TABLE session_registry ADD COLUMN last_active_at TEXT"),
("session_uuid", "ALTER TABLE session_registry ADD COLUMN session_uuid TEXT"),
("user_id", "ALTER TABLE session_registry ADD COLUMN user_id TEXT"),
]
existing_cols = {
row[1] for row in conn.execute("PRAGMA table_info(session_registry)").fetchall()
}
for col_name, ddl in _MIGRATIONS:
if col_name not in existing_cols:
try:
conn.execute(ddl)
conn.commit()
logger.debug(f"Migrated session_registry: added column '{col_name}'")
except sqlite3.OperationalError:
pass # Column already exists (race condition)
# ADR-173 v2: Create session_messages + message_dedup tables
conn.executescript(_SCHEMA_V2_SQL)
conn.commit()
# ADR-173 migration: backup old table if session_messages is empty
# and inter_session_messages has data
tables = {
row[0] for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
}
if "inter_session_messages" in tables and "session_messages" in tables:
sm_count = conn.execute("SELECT COUNT(*) FROM session_messages").fetchone()[0]
old_count = conn.execute("SELECT COUNT(*) FROM inter_session_messages").fetchone()[0]
if sm_count == 0 and old_count > 0:
# Rename old table to backup
if "inter_session_messages_v1" not in tables:
conn.execute(
"ALTER TABLE inter_session_messages "
"RENAME TO inter_session_messages_v1"
)
# Recreate inter_session_messages as empty (for any
# old-code sessions still writing to it)
conn.execute(
"CREATE TABLE IF NOT EXISTS inter_session_messages ("
" id INTEGER PRIMARY KEY AUTOINCREMENT,"
" sender_id TEXT NOT NULL,"
" channel TEXT NOT NULL,"
" payload TEXT NOT NULL,"
" created_at TEXT NOT NULL DEFAULT (datetime('now')),"
" ttl_seconds INTEGER DEFAULT 300"
")"
)
conn.commit()
logger.info(
f"ADR-173 migration: backed up {old_count} messages "
"to inter_session_messages_v1"
)
conn.close()
_sqlite_retry(_init)
def _get_conn(self) -> sqlite3.Connection:
"""Get or create a database connection."""
if self._conn is None:
self._conn = sqlite3.connect(
str(self._db_path), timeout=5, check_same_thread=False
)
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA busy_timeout=3000")
self._conn.execute("PRAGMA synchronous=NORMAL")
self._conn.row_factory = sqlite3.Row
return self._conn
def _now_utc(self) -> str:
"""Current UTC timestamp as ISO string."""
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _cleanup_expired(self) -> None:
"""Remove expired messages (TTL-based). Called on each write."""
try:
conn = self._get_conn()
# Legacy table
conn.execute(
"DELETE FROM inter_session_messages "
"WHERE datetime(created_at, '+' || ttl_seconds || ' seconds') < datetime('now')"
)
# ADR-173 v2 table (uses indexed expires_at column)
conn.execute(
"DELETE FROM session_messages WHERE expires_at < datetime('now')"
)
# Clean stale dedup entries (older than their coalesce window)
conn.execute(
"DELETE FROM message_dedup "
"WHERE datetime(last_sent_at, '+' || coalesce_window_seconds || ' seconds') "
"< datetime('now')"
)
conn.commit()
except sqlite3.Error as e:
logger.debug(f"TTL cleanup error (non-fatal): {e}")
def _cleanup_stale_sessions(self) -> None:
"""Mark sessions with expired heartbeats as stale and broadcast ended events."""
try:
conn = self._get_conn()
# ADR-173 v2: Find stale sessions BEFORE marking, so we can broadcast ended
stale_rows = conn.execute(
"SELECT session_id, session_uuid, project_id, llm_vendor, llm_model "
"FROM session_registry "
"WHERE status = 'active' "
"AND datetime(heartbeat_at, '+' || ? || ' seconds') < datetime('now')",
(self._stale_timeout,)
).fetchall()
if stale_rows:
conn.execute(
"UPDATE session_registry SET status = 'stale' "
"WHERE status = 'active' "
"AND datetime(heartbeat_at, '+' || ? || ' seconds') < datetime('now')",
(self._stale_timeout,)
)
# Clean file locks and task claims held by stale sessions
conn.execute(
"DELETE FROM file_locks WHERE session_id IN "
"(SELECT session_id FROM session_registry WHERE status = 'stale')"
)
conn.execute(
"DELETE FROM task_claims WHERE session_id IN "
"(SELECT session_id FROM session_registry WHERE status = 'stale')"
)
conn.commit()
# ADR-173 v2: Broadcast session_lifecycle/ended for each stale session
for row in stale_rows:
try:
self.publish(
"session_lifecycle",
{
"session_id": row["session_id"],
"session_uuid": row["session_uuid"],
"llm_vendor": row["llm_vendor"],
"llm_model": row["llm_model"],
"reason": "stale_timeout",
},
sender_id=row["session_id"],
message_type="ended",
project_id=row["project_id"],
)
except Exception:
pass # Best-effort lifecycle broadcast
except sqlite3.Error as e:
logger.debug(f"Stale session cleanup error (non-fatal): {e}")
# -- Session Management --
def register_session(
self,
session_id: str,
llm_vendor: str,
llm_model: Optional[str] = None,
session_uuid: Optional[str] = None,
user_id: Optional[str] = None,
project_id: Optional[str] = None,
tty: Optional[str] = None,
pid: Optional[int] = None,
task_id: Optional[str] = None,
cwd: Optional[str] = None,
machine_uuid: Optional[str] = None,
hostname: Optional[str] = None,
) -> None:
now = self._now_utc()
# Set default session_id only on first registration
if self._session_id is None:
self._session_id = session_id
# Auto-detect machine identity if not provided
if machine_uuid is None:
machine_uuid = self._get_machine_uuid()
if hostname is None:
try:
hostname = socket.gethostname()
except Exception:
hostname = None
# Auto-detect CWD if not provided
if cwd is None:
cwd = os.getcwd()
# Auto-detect PID if not provided
if pid is None:
pid = os.getpid()
# Auto-detect user_id if not provided
if user_id is None:
try:
import getpass
user_id = getpass.getuser()
except Exception:
user_id = None
def _register():
conn = self._get_conn()
conn.execute(
"INSERT OR REPLACE INTO session_registry "
"(session_id, llm_vendor, llm_model, session_uuid, user_id, "
" tty, pid, project_id, "
" task_id, cwd, machine_uuid, hostname, "
" heartbeat_at, last_active_at, registered_at, status) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'active')",
(session_id, llm_vendor, llm_model, session_uuid, user_id,
tty, pid, project_id,
task_id, cwd, machine_uuid, hostname, now, now, now)
)
conn.commit()
_sqlite_retry(_register)
self._cleanup_stale_sessions()
logger.info(f"Registered session {session_id} ({llm_vendor}/{llm_model})")
# ADR-173 v2: Broadcast session lifecycle event
self.publish(
"session_lifecycle",
{
"session_id": session_id,
"session_uuid": session_uuid,
"llm_vendor": llm_vendor,
"llm_model": llm_model,
"project_id": project_id,
"cwd": cwd,
},
sender_id=session_id,
message_type="started",
project_id=project_id,
)
# ADR-173 v2: Detect conflicts with other active sessions
self._detect_conflicts(
session_id=session_id,
project_id=project_id,
task_id=task_id,
cwd=cwd,
)
@staticmethod
def _get_machine_uuid() -> Optional[str]:
"""Read machine_uuid from machine-id.json (best-effort)."""
try:
from scripts.core.paths import get_machine_uuid
return get_machine_uuid()
except Exception:
# Fallback: try known paths directly
for candidate in [
Path.home() / "PROJECTS" / ".coditect-data" / "machine-id.json",
Path.home() / ".coditect-data" / "machine-id.json",
Path.home() / ".coditect" / "machine-id.json",
]:
if candidate.exists():
try:
with open(candidate) as f:
return json.load(f).get("machine_uuid")
except Exception:
continue
return None
def update_session_context(
self,
session_id: Optional[str] = None,
task_id: Optional[str] = None,
active_files: Optional[List[str]] = None,
cwd: Optional[str] = None,
project_id: Optional[str] = None,
) -> None:
sid = session_id or self._session_id
if not sid:
raise ValueError("No session_id provided and no default set")
now = self._now_utc()
# Build dynamic SET clause for only the provided fields
updates: List[str] = ["last_active_at = ?"]
params: list = [now]
if task_id is not None:
updates.append("task_id = ?")
params.append(task_id)
if active_files is not None:
updates.append("active_files = ?")
params.append(json.dumps(active_files))
if cwd is not None:
updates.append("cwd = ?")
params.append(cwd)
if project_id is not None:
updates.append("project_id = ?")
params.append(project_id)
params.append(sid)
def _update():
conn = self._get_conn()
conn.execute(
f"UPDATE session_registry SET {', '.join(updates)} "
"WHERE session_id = ?",
params
)
conn.commit()
_sqlite_retry(_update)
logger.debug(f"Updated session context for {sid}: {updates}")
def unregister_session(self, session_id: str) -> None:
# ADR-173 v2: Broadcast session lifecycle ended event before cleanup
try:
conn = self._get_conn()
reg = conn.execute(
"SELECT session_uuid, project_id, llm_vendor, llm_model "
"FROM session_registry WHERE session_id = ?",
(session_id,)
).fetchone()
if reg:
self.publish(
"session_lifecycle",
{
"session_id": session_id,
"session_uuid": reg["session_uuid"],
"llm_vendor": reg["llm_vendor"],
"llm_model": reg["llm_model"],
},
sender_id=session_id,
message_type="ended",
project_id=reg["project_id"],
)
except sqlite3.Error:
pass # Best-effort lifecycle broadcast
def _unreg():
conn = self._get_conn()
conn.execute(
"DELETE FROM session_registry WHERE session_id = ?",
(session_id,)
)
conn.execute(
"DELETE FROM file_locks WHERE session_id = ?",
(session_id,)
)
conn.execute(
"DELETE FROM task_claims WHERE session_id = ?",
(session_id,)
)
conn.commit()
_sqlite_retry(_unreg)
logger.info(f"Unregistered session {session_id}")
def heartbeat(self, session_id: Optional[str] = None) -> None:
sid = session_id or self._session_id
if not sid:
raise ValueError("No session_id provided and no default set")
now = self._now_utc()
def _hb():
conn = self._get_conn()
conn.execute(
"UPDATE session_registry SET heartbeat_at = ?, status = 'active' "
"WHERE session_id = ?",
(now, sid)
)
conn.commit()
_sqlite_retry(_hb)
def list_sessions(self, active_only: bool = True) -> List[SessionInfo]:
conn = self._get_conn()
if active_only:
self._cleanup_stale_sessions()
rows = conn.execute(
"SELECT * FROM session_registry WHERE status = 'active'"
).fetchall()
else:
rows = conn.execute("SELECT * FROM session_registry").fetchall()
sessions = []
for row in rows:
active_files = None
if row["active_files"]:
try:
active_files = json.loads(row["active_files"])
except (json.JSONDecodeError, TypeError):
active_files = None
sessions.append(SessionInfo(
session_id=row["session_id"],
llm_vendor=row["llm_vendor"],
llm_model=row["llm_model"],
tty=row["tty"],
pid=row["pid"],
project_id=row["project_id"],
task_id=row["task_id"],
active_files=active_files,
cwd=row["cwd"],
machine_uuid=row["machine_uuid"],
hostname=row["hostname"],
heartbeat_at=row["heartbeat_at"],
last_active_at=row["last_active_at"],
registered_at=row["registered_at"],
status=row["status"],
))
return sessions
# -- Messaging --
def publish(
self,
channel: str,
payload: Dict[str, Any],
sender_id: Optional[str] = None,
ttl_seconds: Optional[int] = None,
*,
message_type: str = "info",
recipient_id: Optional[str] = None,
priority: Optional[int] = None,
project_id: Optional[str] = None,
task_id: Optional[str] = None,
activity: Optional[str] = None,
reply_to: Optional[int] = None,
) -> int:
sid = sender_id or self._session_id or "unknown"
ttl = ttl_seconds if ttl_seconds is not None else CHANNEL_TTL.get(channel, DEFAULT_TTL)
payload_json = json.dumps(payload, default=str)
now = self._now_utc()
# ADR-173 v2: Resolve priority from config if not explicit
if priority is None:
priority = CHANNEL_PRIORITY.get(channel, {}).get(message_type, 0)
# ADR-173 v2: Calculate expires_at
expires_at = (
datetime.now(timezone.utc) + timedelta(seconds=ttl)
).strftime("%Y-%m-%dT%H:%M:%SZ")
# ADR-173 v2: Enrich with session identity from registry
sender_session_uuid = None
try:
conn = self._get_conn()
reg = conn.execute(
"SELECT session_uuid, project_id FROM session_registry "
"WHERE session_id = ?",
(sid,)
).fetchone()
if reg:
sender_session_uuid = reg["session_uuid"]
if project_id is None:
project_id = reg["project_id"]
except sqlite3.Error:
pass
# ADR-173 v2: Dedup check
dedup_window = CHANNEL_DEDUP.get(channel, {}).get(message_type, 0)
if dedup_window > 0:
try:
conn = self._get_conn()
existing = conn.execute(
"SELECT last_message_id, last_sent_at FROM message_dedup "
"WHERE sender_id = ? AND channel = ? AND message_type = ? "
"AND task_id = ?",
(sid, channel, message_type, task_id or "")
).fetchone()
if existing and existing["last_sent_at"]:
last_dt = datetime.strptime(
existing["last_sent_at"], "%Y-%m-%dT%H:%M:%SZ"
).replace(tzinfo=timezone.utc)
if (datetime.now(timezone.utc) - last_dt).total_seconds() < dedup_window:
logger.debug(
f"Dedup suppressed {channel}/{message_type} from {sid} "
f"(within {dedup_window}s window)"
)
return existing["last_message_id"]
except sqlite3.Error:
pass # Dedup failure is non-fatal
msg_id = 0
def _pub():
nonlocal msg_id
conn = self._get_conn()
# ADR-173 v2: Insert into session_messages
cursor = conn.execute(
"INSERT INTO session_messages "
"(sender_id, sender_session_uuid, recipient_id, channel, "
" message_type, priority, status, project_id, task_id, "
" activity, payload, reply_to, created_at, expires_at) "
"VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?, ?, ?)",
(sid, sender_session_uuid, recipient_id, channel,
message_type, priority, project_id, task_id,
activity, payload_json, reply_to, now, expires_at)
)
msg_id = cursor.lastrowid or 0
# Backward compat: also insert into legacy table
conn.execute(
"INSERT INTO inter_session_messages "
"(sender_id, channel, payload, created_at, ttl_seconds) "
"VALUES (?, ?, ?, ?, ?)",
(sid, channel, payload_json, now, ttl)
)
# Update dedup table
if dedup_window > 0:
conn.execute(
"INSERT OR REPLACE INTO message_dedup "
"(sender_id, channel, message_type, task_id, "
" last_message_id, last_sent_at, coalesce_window_seconds) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(sid, channel, message_type, task_id or "", msg_id, now, dedup_window)
)
conn.commit()
_sqlite_retry(_pub)
self._cleanup_expired()
logger.debug(f"Published message {msg_id} to channel '{channel}/{message_type}'")
return msg_id
def poll(
self,
channel: str,
since_id: int = 0,
limit: int = 100,
*,
status_filter: Optional[str] = None,
recipient_id: Optional[str] = None,
) -> List[SessionMessage]:
conn = self._get_conn()
# ADR-173 v2: Query session_messages with optional filters
sql = (
"SELECT id, sender_id, sender_session_uuid, recipient_id, "
" channel, message_type, priority, status, project_id, "
" task_id, activity, payload, reply_to, created_at, "
" delivered_at, read_at, expires_at "
"FROM session_messages "
"WHERE channel = ? AND id > ? AND expires_at >= datetime('now')"
)
params: list = [channel, since_id]
if status_filter:
sql += " AND status = ?"
params.append(status_filter)
if recipient_id:
sql += " AND (recipient_id IS NULL OR recipient_id = ?)"
params.append(recipient_id)
sql += " ORDER BY id ASC LIMIT ?"
params.append(limit)
rows = conn.execute(sql, params).fetchall()
messages = []
for row in rows:
try:
payload = json.loads(row["payload"]) if row["payload"] else {}
except (json.JSONDecodeError, TypeError):
payload = {"raw": row["payload"]}
messages.append(SessionMessage(
id=row["id"],
sender_id=row["sender_id"],
channel=row["channel"],
payload=payload,
created_at=row["created_at"],
sender_session_uuid=row["sender_session_uuid"],
recipient_id=row["recipient_id"],
message_type=row["message_type"],
priority=row["priority"],
status=row["status"],
project_id=row["project_id"],
task_id=row["task_id"],
activity=row["activity"],
reply_to=row["reply_to"],
expires_at=row["expires_at"],
delivered_at=row["delivered_at"],
read_at=row["read_at"],
))
return messages
# -- File Locks --
def lock_file(self, file_path: str, session_id: Optional[str] = None) -> bool:
    """Acquire an advisory lock on *file_path* for a session.

    Semantics:
      * Re-acquiring a lock we already hold refreshes its timestamp.
      * A lock held by a session that is no longer 'active' is stale
        and is taken over.
      * A lock held by another active session fails (returns False).

    Args:
        file_path: Path to lock (advisory only; nothing enforces it).
        session_id: Locking session (defaults to the bus default).

    Returns:
        True if the lock is held by this session on return.

    Raises:
        ValueError: If no session id is available.
    """
    sid = session_id or self._session_id
    if not sid:
        raise ValueError("No session_id provided and no default set")
    now = self._now_utc()
    acquired = False
    def _lock():
        nonlocal acquired
        conn = self._get_conn()
        # Check existing lock
        existing = conn.execute(
            "SELECT session_id FROM file_locks WHERE file_path = ?",
            (file_path,)
        ).fetchone()
        if existing:
            if existing["session_id"] == sid:
                # Already held by us - update timestamp
                conn.execute(
                    "UPDATE file_locks SET locked_at = ? WHERE file_path = ?",
                    (now, file_path)
                )
                conn.commit()
                acquired = True
            else:
                # Held by another session - check if that session is still active
                session = conn.execute(
                    "SELECT status FROM session_registry WHERE session_id = ?",
                    (existing["session_id"],)
                ).fetchone()
                if session and session["status"] == "active":
                    acquired = False
                else:
                    # Stale lock - take it over
                    conn.execute(
                        "UPDATE file_locks SET session_id = ?, locked_at = ? "
                        "WHERE file_path = ?",
                        (sid, now, file_path)
                    )
                    conn.commit()
                    acquired = True
        else:
            try:
                conn.execute(
                    "INSERT INTO file_locks (file_path, session_id, locked_at) "
                    "VALUES (?, ?, ?)",
                    (file_path, sid, now)
                )
                conn.commit()
                acquired = True
            except sqlite3.IntegrityError:
                # Race: another session inserted the lock between our
                # existence check and the INSERT (same handling as
                # claim_task).
                acquired = False
    _sqlite_retry(_lock)
    if acquired:
        logger.debug(f"Locked file '{file_path}' for session {sid}")
    else:
        logger.debug(f"Cannot lock file '{file_path}' - held by another active session")
    return acquired
def unlock_file(self, file_path: str, session_id: Optional[str] = None) -> bool:
    """Release an advisory file lock held by a session.

    Only a lock owned by *session_id* (or the bus default) is removed.

    Returns:
        True if a lock row was actually deleted.

    Raises:
        ValueError: If no session id is available.
    """
    sid = session_id or self._session_id
    if not sid:
        raise ValueError("No session_id provided and no default set")
    released = False
    def _unlock():
        nonlocal released
        conn = self._get_conn()
        deleted = conn.execute(
            "DELETE FROM file_locks WHERE file_path = ? AND session_id = ?",
            (file_path, sid)
        ).rowcount
        released = deleted > 0
        conn.commit()
    _sqlite_retry(_unlock)
    if released:
        logger.debug(f"Unlocked file '{file_path}' for session {sid}")
    return released
def get_file_locks(self) -> List[FileLock]:
    """Return every advisory file lock currently recorded."""
    rows = self._get_conn().execute("SELECT * FROM file_locks").fetchall()
    locks: List[FileLock] = []
    for row in rows:
        locks.append(FileLock(
            file_path=row["file_path"],
            session_id=row["session_id"],
            lock_type=row["lock_type"],
            locked_at=row["locked_at"],
        ))
    return locks
# -- ADR-173 v2: Directed Messaging, Delivery Tracking, Conflict Detection --
def send(
    self,
    recipient_id: str,
    channel: str,
    payload: Dict[str, Any],
    *,
    message_type: str = "request",
    priority: Optional[int] = None,
    task_id: Optional[str] = None,
    reply_to: Optional[int] = None,
) -> int:
    """Send a directed message to a specific session (ADR-173 v2).

    Thin wrapper over publish() that fixes recipient_id, making the
    message addressed rather than broadcast. Returns the message id.
    """
    return self.publish(
        channel,
        payload,
        recipient_id=recipient_id,
        message_type=message_type,
        priority=priority,
        task_id=task_id,
        reply_to=reply_to,
    )
def mark_delivered(self, message_id: int) -> bool:
    """Transition a message from 'pending' to 'delivered' (ADR-173 v2).

    Returns:
        True if the row was updated (it existed and was still pending).
    """
    stamp = self._now_utc()
    changed = False
    def _apply():
        nonlocal changed
        conn = self._get_conn()
        cur = conn.execute(
            "UPDATE session_messages SET status = 'delivered', delivered_at = ? "
            "WHERE id = ? AND status = 'pending'",
            (stamp, message_id)
        )
        changed = cur.rowcount > 0
        conn.commit()
    _sqlite_retry(_apply)
    return changed
def mark_read(self, message_id: int) -> bool:
    """Transition a message to 'read' (ADR-173 v2).

    Applies to messages currently 'pending' or 'delivered'.

    Returns:
        True if the row was updated.
    """
    stamp = self._now_utc()
    changed = False
    def _apply():
        nonlocal changed
        conn = self._get_conn()
        cur = conn.execute(
            "UPDATE session_messages SET status = 'read', read_at = ? "
            "WHERE id = ? AND status IN ('pending', 'delivered')",
            (stamp, message_id)
        )
        changed = cur.rowcount > 0
        conn.commit()
    _sqlite_retry(_apply)
    return changed
def get_unread(
    self,
    recipient_id: Optional[str] = None,
    channel: Optional[str] = None,
    limit: int = 50,
    *,
    auto_mark_delivered: bool = False,
) -> List[SessionMessage]:
    """Get unread messages for a recipient (ADR-173 v2).

    Selects non-expired messages still in 'pending' or 'delivered'
    status, highest priority first, then oldest first. Messages with a
    NULL recipient_id are broadcasts and are included for any recipient.

    Args:
        recipient_id: Session to fetch for (defaults to the bus default
            session; if neither is set, no recipient filter is applied).
        channel: Optional channel filter.
        limit: Maximum number of messages returned.
        auto_mark_delivered: If True, mark returned pending messages as
            delivered (for use by commands that display messages to user).
    """
    rid = recipient_id or self._session_id
    conn = self._get_conn()
    sql = (
        "SELECT id, sender_id, sender_session_uuid, recipient_id, "
        "       channel, message_type, priority, status, project_id, "
        "       task_id, activity, payload, reply_to, created_at, "
        "       delivered_at, read_at, expires_at "
        "FROM session_messages "
        "WHERE status IN ('pending', 'delivered') "
        "AND expires_at >= datetime('now')"
    )
    params: list = []
    if rid:
        # Directed-to-us OR broadcast (NULL recipient).
        sql += " AND (recipient_id = ? OR recipient_id IS NULL)"
        params.append(rid)
    if channel:
        sql += " AND channel = ?"
        params.append(channel)
    sql += " ORDER BY priority DESC, id ASC LIMIT ?"
    params.append(limit)
    rows = conn.execute(sql, params).fetchall()
    messages = []
    pending_ids: List[int] = []
    for row in rows:
        try:
            payload = json.loads(row["payload"]) if row["payload"] else {}
        except (json.JSONDecodeError, TypeError):
            # Malformed stored JSON is surfaced raw rather than dropped.
            payload = {"raw": row["payload"]}
        if auto_mark_delivered and row["status"] == "pending":
            # Collect ids now; the status flip happens in one batch below.
            pending_ids.append(row["id"])
        messages.append(SessionMessage(
            id=row["id"],
            sender_id=row["sender_id"],
            channel=row["channel"],
            payload=payload,
            created_at=row["created_at"],
            sender_session_uuid=row["sender_session_uuid"],
            recipient_id=row["recipient_id"],
            message_type=row["message_type"],
            priority=row["priority"],
            status=row["status"],
            project_id=row["project_id"],
            task_id=row["task_id"],
            activity=row["activity"],
            reply_to=row["reply_to"],
            expires_at=row["expires_at"],
            delivered_at=row["delivered_at"],
            read_at=row["read_at"],
        ))
    # ADR-173 v2: Auto-mark pending messages as delivered when displayed
    if pending_ids:
        now = self._now_utc()
        try:
            # Only ? placeholders are generated into the SQL; ids are
            # still bound as parameters.
            placeholders = ",".join("?" for _ in pending_ids)
            conn.execute(
                f"UPDATE session_messages SET status = 'delivered', "
                f"delivered_at = ? WHERE id IN ({placeholders}) "
                f"AND status = 'pending'",
                [now] + pending_ids
            )
            conn.commit()
        except sqlite3.Error:
            pass  # Non-fatal — messages still returned
    return messages
def get_operator_alerts(
    self,
    unread_only: bool = True,
    limit: int = 20,
    *,
    auto_mark_delivered: bool = False,
) -> List[SessionMessage]:
    """Fetch conflict-detection alerts from the operator_alert channel.

    Args:
        unread_only: Restrict to alerts still in 'pending' status.
        limit: Maximum number of alerts returned.
        auto_mark_delivered: If True, mark returned pending alerts as
            delivered (for use by /orient and /session-status).
    """
    wanted_status = "pending" if unread_only else None
    alerts = self.poll(
        "operator_alert",
        status_filter=wanted_status,
        limit=limit,
    )
    if not (auto_mark_delivered and alerts):
        return alerts
    # ADR-173 v2: flip displayed pending alerts to 'delivered'.
    pending = [alert.id for alert in alerts if alert.status == "pending"]
    if pending:
        stamp = self._now_utc()
        try:
            conn = self._get_conn()
            marks = ",".join("?" for _ in pending)
            conn.execute(
                f"UPDATE session_messages SET status = 'delivered', "
                f"delivered_at = ? WHERE id IN ({marks}) "
                f"AND status = 'pending'",
                [stamp] + pending
            )
            conn.commit()
        except sqlite3.Error:
            pass  # Non-fatal
    return alerts
def _detect_conflicts(
    self,
    session_id: str,
    project_id: Optional[str] = None,
    task_id: Optional[str] = None,
    cwd: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """
    Detect conflicts with other active sessions (ADR-173 v2).

    Checks for: same project, same task, same CWD.
    Auto-publishes operator_alert messages for conflicts found.
    Returns list of conflict dicts for logging.

    NOTE(review): each matching other-session publishes its own alert,
    so N overlapping sessions produce N alerts per check — presumably
    the coalesce window in publish() dampens repeats; verify.
    """
    conflicts: List[Dict[str, Any]] = []
    conn = self._get_conn()
    # Get other active sessions
    others = conn.execute(
        "SELECT session_id, project_id, task_id, cwd, llm_vendor, llm_model "
        "FROM session_registry "
        "WHERE session_id != ? AND status = 'active'",
        (session_id,)
    ).fetchall()
    for other in others:
        # Same project conflict (only when both sides have a project set)
        if project_id and other["project_id"] and project_id == other["project_id"]:
            conflict = {
                "type": "project_conflict",
                "other_session": other["session_id"],
                "project_id": project_id,
                "other_llm": f"{other['llm_vendor']}/{other['llm_model']}",
            }
            conflicts.append(conflict)
            self.publish(
                "operator_alert", conflict,
                message_type="project_conflict",
                project_id=project_id,
            )
        # Same task conflict
        if task_id and other["task_id"] and task_id == other["task_id"]:
            conflict = {
                "type": "task_conflict",
                "other_session": other["session_id"],
                "task_id": task_id,
                "other_llm": f"{other['llm_vendor']}/{other['llm_model']}",
            }
            conflicts.append(conflict)
            self.publish(
                "operator_alert", conflict,
                message_type="task_conflict",
                task_id=task_id,
            )
        # Same CWD overlap (exact string match only — no subpath check)
        if cwd and other["cwd"] and cwd == other["cwd"]:
            conflict = {
                "type": "cwd_overlap",
                "other_session": other["session_id"],
                "cwd": cwd,
                "other_llm": f"{other['llm_vendor']}/{other['llm_model']}",
            }
            conflicts.append(conflict)
            self.publish(
                "operator_alert", conflict,
                message_type="cwd_overlap",
            )
    if conflicts:
        logger.warning(
            f"Session {session_id}: {len(conflicts)} conflict(s) detected "
            f"with other active sessions"
        )
    return conflicts
# -- Task Broadcasting & Routing (H.13.6) --
def broadcast_task(
    self,
    task_id: str,
    action: str,
    details: Optional[Dict[str, Any]] = None,
) -> int:
    """Publish a task lifecycle event on the task_broadcast channel.

    *action* (e.g. 'started', 'released') doubles as the message type.
    Returns the new message id.
    """
    body: Dict[str, Any] = {"task_id": task_id, "action": action}
    if details:
        body["details"] = details
    return self.publish(
        "task_broadcast", body,
        message_type=action, task_id=task_id,
    )
def claim_task(self, task_id: str, session_id: Optional[str] = None) -> bool:
    """Claim exclusive ownership of *task_id* for a session.

    Semantics mirror lock_file: re-claiming refreshes the timestamp,
    a claim held by a non-active session is stale and taken over, and
    a claim held by another active session fails. On success a
    'started' event is broadcast on task_broadcast.

    Returns:
        True if the claim is held by this session on return.

    Raises:
        ValueError: If no session id is available.
    """
    sid = session_id or self._session_id
    if not sid:
        raise ValueError("No session_id provided and no default set")
    now = self._now_utc()
    acquired = False
    def _claim():
        nonlocal acquired
        conn = self._get_conn()
        existing = conn.execute(
            "SELECT session_id FROM task_claims WHERE task_id = ?",
            (task_id,)
        ).fetchone()
        if existing:
            if existing["session_id"] == sid:
                # Already claimed by us - refresh
                conn.execute(
                    "UPDATE task_claims SET claimed_at = ? WHERE task_id = ?",
                    (now, task_id)
                )
                conn.commit()
                acquired = True
            else:
                # Claimed by another session - check if still active
                session = conn.execute(
                    "SELECT status FROM session_registry WHERE session_id = ?",
                    (existing["session_id"],)
                ).fetchone()
                if session and session["status"] == "active":
                    acquired = False
                else:
                    # Stale claim - take over
                    conn.execute(
                        "UPDATE task_claims SET session_id = ?, claimed_at = ? "
                        "WHERE task_id = ?",
                        (sid, now, task_id)
                    )
                    conn.commit()
                    acquired = True
        else:
            try:
                conn.execute(
                    "INSERT INTO task_claims (task_id, session_id, claimed_at) "
                    "VALUES (?, ?, ?)",
                    (task_id, sid, now)
                )
                conn.commit()
                acquired = True
            except sqlite3.IntegrityError:
                # Race: another thread claimed between our check and insert
                acquired = False
    _sqlite_retry(_claim)
    if acquired:
        # Broadcast outside the retry closure so it runs exactly once.
        self.broadcast_task(task_id, "started")
        logger.debug(f"Claimed task '{task_id}' for session {sid}")
    return acquired
def release_task(self, task_id: str, session_id: Optional[str] = None) -> bool:
    """Drop this session's claim on *task_id*.

    Broadcasts a 'released' event only when a claim row owned by the
    session was actually deleted.

    Returns:
        True if the claim was released.

    Raises:
        ValueError: If no session id is available.
    """
    sid = session_id or self._session_id
    if not sid:
        raise ValueError("No session_id provided and no default set")
    dropped = False
    def _drop():
        nonlocal dropped
        conn = self._get_conn()
        cur = conn.execute(
            "DELETE FROM task_claims WHERE task_id = ? AND session_id = ?",
            (task_id, sid)
        )
        dropped = cur.rowcount > 0
        conn.commit()
    _sqlite_retry(_drop)
    if dropped:
        self.broadcast_task(task_id, "released")
        logger.debug(f"Released task '{task_id}' for session {sid}")
    return dropped
def get_task_claims(self) -> List[TaskClaim]:
    """Return all task claims after purging claims of stale sessions.

    The purge is a write, so it goes through _sqlite_retry like every
    other write on this bus (SQLITE_BUSY exponential backoff) instead
    of raising on a busy database.
    """
    def _purge():
        conn = self._get_conn()
        # Clean stale claims first
        conn.execute(
            "DELETE FROM task_claims WHERE session_id IN "
            "(SELECT session_id FROM session_registry WHERE status = 'stale')"
        )
        conn.commit()
    _sqlite_retry(_purge)
    conn = self._get_conn()
    rows = conn.execute("SELECT * FROM task_claims").fetchall()
    return [
        TaskClaim(
            task_id=row["task_id"],
            session_id=row["session_id"],
            claimed_at=row["claimed_at"],
            status=row["status"],
        )
        for row in rows
    ]
def update_session_task(self, task_id: str, session_id: Optional[str] = None) -> None:
    """Record the task a session is currently working on.

    Also bumps last_active_at so the session sorts as recently active.

    Raises:
        ValueError: If no session id is available.
    """
    sid = session_id or self._session_id
    if not sid:
        raise ValueError("No session_id provided and no default set")
    stamp = self._now_utc()
    def _apply():
        conn = self._get_conn()
        conn.execute(
            "UPDATE session_registry SET task_id = ?, last_active_at = ? "
            "WHERE session_id = ?",
            (task_id, stamp, sid)
        )
        conn.commit()
    _sqlite_retry(_apply)
def get_cross_llm_status(self) -> List[Dict[str, Any]]:
    """Snapshot all active sessions (with any claimed task) across LLMs.

    Marks stale sessions first, then joins session_registry with
    task_claims. Column presence is probed via PRAGMA so the query
    also works against pre-migration databases that lack the
    session_uuid / user_id columns (those fields come back as None).
    """
    self._cleanup_stale_sessions()
    conn = self._get_conn()
    # Detect available columns (handles pre-migration databases)
    col_info = conn.execute("PRAGMA table_info(session_registry)").fetchall()
    col_names = {row[1] for row in col_info}  # PRAGMA table_info: row[1] is the column name
    has_session_uuid = "session_uuid" in col_names
    has_user_id = "user_id" in col_names
    # Select the real column when present, NULL aliases otherwise, so
    # the result rows always expose both keys.
    uuid_col = ", s.session_uuid" if has_session_uuid else ", NULL as session_uuid"
    user_col = ", s.user_id" if has_user_id else ", NULL as user_id"
    rows = conn.execute(
        "SELECT s.session_id, s.llm_vendor, s.llm_model, s.project_id, "
        "s.task_id, s.pid, s.tty, s.cwd, s.machine_uuid, s.hostname, "
        "s.heartbeat_at, s.last_active_at, s.active_files, "
        f"t.task_id as claimed_task{uuid_col}{user_col} "
        "FROM session_registry s "
        "LEFT JOIN task_claims t ON s.session_id = t.session_id "
        "WHERE s.status = 'active' "
        "ORDER BY s.last_active_at DESC, s.heartbeat_at DESC"
    ).fetchall()
    result = []
    for row in rows:
        # active_files is stored as JSON text; unparseable values
        # degrade to None rather than failing the whole status call.
        active_files = None
        if row["active_files"]:
            try:
                active_files = json.loads(row["active_files"])
            except (json.JSONDecodeError, TypeError):
                active_files = None
        result.append({
            "session_id": row["session_id"],
            "llm_vendor": row["llm_vendor"],
            "llm_model": row["llm_model"],
            "session_uuid": row["session_uuid"],
            "user_id": row["user_id"],
            "project_id": row["project_id"],
            "task_id": row["task_id"],
            "claimed_task": row["claimed_task"],
            "pid": row["pid"],
            "tty": row["tty"],
            "cwd": row["cwd"],
            "machine_uuid": row["machine_uuid"],
            "hostname": row["hostname"],
            "active_files": active_files,
            "heartbeat_at": row["heartbeat_at"],
            "last_active_at": row["last_active_at"],
        })
    return result
# -- Heartbeat Thread (H.13.4.2) --
def start_heartbeat_thread(
    self,
    interval: float = 15.0,
    session_id: Optional[str] = None,
) -> threading.Thread:
    """Launch a daemon thread that heartbeats every *interval* seconds.

    The thread dies with the main thread (daemon=True) or when
    stop_heartbeat_thread() is called.

    Args:
        interval: Seconds between heartbeats (default 15s per ADR-160)
        session_id: Session to heartbeat (defaults to bus default)

    Returns:
        The heartbeat Thread object

    Raises:
        ValueError: If no session id is available.
    """
    sid = session_id or self._session_id
    if not sid:
        raise ValueError("No session_id provided and no default set")
    self._heartbeat_stop = threading.Event()

    def _loop():
        # Runs until the stop event on the bus is set; individual
        # heartbeat failures are logged and swallowed.
        while not self._heartbeat_stop.is_set():
            try:
                self.heartbeat(session_id=sid)
            except Exception as e:
                logger.debug(f"Heartbeat error (non-fatal): {e}")
            self._heartbeat_stop.wait(interval)

    worker = threading.Thread(
        target=_loop,
        name=f"coditect-heartbeat-{sid}",
        daemon=True,
    )
    self._heartbeat_thread = worker
    worker.start()
    logger.info(f"Started heartbeat thread for {sid} (interval={interval}s)")
    return worker
def stop_heartbeat_thread(self) -> None:
    """Stop the background heartbeat thread if running."""
    # getattr guards cover buses that never started a heartbeat.
    stop_event = getattr(self, '_heartbeat_stop', None)
    worker = getattr(self, '_heartbeat_thread', None)
    if stop_event:
        stop_event.set()
    if worker and worker.is_alive():
        # Bounded join: a wedged heartbeat must not hang shutdown.
        worker.join(timeout=2.0)
        logger.info("Heartbeat thread stopped")
    self._heartbeat_stop = None
    self._heartbeat_thread = None
# -- Lifecycle --
def close(self) -> None:
    """Shut down the heartbeat thread, watcher, and SQLite connection."""
    self.stop_heartbeat_thread()
    watcher = getattr(self, '_watcher', None)
    if watcher:
        watcher.stop()
        self._watcher = None
    if self._conn:
        # Close errors are ignored - we are tearing down regardless.
        try:
            self._conn.close()
        except sqlite3.Error:
            pass
        self._conn = None
def __enter__(self):
    """Context-manager entry: the bus itself is the managed resource."""
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Context-manager exit: always close; never suppress exceptions."""
    self.close()
    return False
def __del__(self):
    """Best-effort cleanup on garbage collection.

    During interpreter shutdown, module globals used by close()
    (logging, sqlite3) may already be torn down, so any failure here
    is swallowed — exceptions raised from __del__ are otherwise
    printed to stderr and can mask the real shutdown path.
    """
    try:
        # hasattr guard: __init__ may have failed before _conn was set.
        if hasattr(self, '_conn'):
            self.close()
    except Exception:
        pass
# -- Status / Debug --
def stats(self) -> Dict[str, Any]:
    """Get messaging system statistics (ADR-173 v2).

    Returns counts for sessions, legacy/v2 messages, locks and task
    claims, plus the database path and on-disk size in bytes.
    """
    conn = self._get_conn()

    def _count(query: str) -> int:
        # Every statistic is a single-value COUNT(*) query.
        return conn.execute(query).fetchone()[0]

    size_bytes = self._db_path.stat().st_size if self._db_path.exists() else 0
    return {
        "active_sessions": _count(
            "SELECT COUNT(*) FROM session_registry WHERE status = 'active'"
        ),
        "total_sessions": _count("SELECT COUNT(*) FROM session_registry"),
        "legacy_messages": _count("SELECT COUNT(*) FROM inter_session_messages"),
        "v2_messages": _count("SELECT COUNT(*) FROM session_messages"),
        "v2_pending": _count(
            "SELECT COUNT(*) FROM session_messages WHERE status = 'pending'"
        ),
        "v2_unread_alerts": _count(
            "SELECT COUNT(*) FROM session_messages "
            "WHERE channel = 'operator_alert' AND status IN ('pending', 'delivered')"
        ),
        "file_locks": _count("SELECT COUNT(*) FROM file_locks"),
        "task_claims": _count("SELECT COUNT(*) FROM task_claims"),
        "db_path": str(self._db_path),
        "db_size_bytes": size_bytes,
    }
# ---------------------------------------------------------------------------
# Factory
# ---------------------------------------------------------------------------
_default_bus: Optional[SQLiteSessionMessageBus] = None
def get_session_message_bus(
    db_path: Optional[Path] = None,
    session_id: Optional[str] = None,
) -> SQLiteSessionMessageBus:
    """Get or create the default session message bus instance.

    This is the primary entry point for inter-session coordination.

    Args:
        db_path: Optional custom database path (for testing)
        session_id: Optional default session ID

    Returns:
        SQLiteSessionMessageBus instance
    """
    global _default_bus
    use_cached = _default_bus is not None and db_path is None
    if use_cached:
        # Adopt a default session id if the cached bus has none yet.
        if session_id and _default_bus._session_id is None:
            _default_bus._session_id = session_id
        return _default_bus
    bus = SQLiteSessionMessageBus(db_path=db_path, session_id=session_id)
    if db_path is None:
        # Only a default-path bus becomes the process-wide singleton;
        # custom-path buses (tests) are never cached.
        _default_bus = bus
    return bus
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Minimal debugging/ops CLI for the session message bus.
    # (Fix: the guard must be the dunder __name__ == "__main__".)
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        description="CODITECT Inter-Session Message Bus (ADR-160 + ADR-173 v2)"
    )
    parser.add_argument(
        "command",
        choices=[
            "stats", "sessions", "messages", "locks", "publish", "poll",
            "unread", "alerts", "send", "mark-read",
        ],
        help="Command to run"
    )
    parser.add_argument("--channel", default="state", help="Message channel")
    parser.add_argument("--message", default="{}", help="JSON payload for publish")
    parser.add_argument("--message-type", default="info", help="Message type (ADR-173)")
    parser.add_argument("--recipient", default=None, help="Recipient session ID (ADR-173)")
    parser.add_argument("--since-id", type=int, default=0, help="Poll since message ID")
    parser.add_argument("--message-id", type=int, default=0, help="Message ID (for mark-read)")
    parser.add_argument("--session-id", default="cli-debug", help="Session ID")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    bus = get_session_message_bus(session_id=args.session_id)

    if args.command == "stats":
        result = bus.stats()
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print("Inter-Session Message Bus Statistics")
            print("=" * 45)
            for k, v in result.items():
                print(f"  {k}: {v}")
    elif args.command == "sessions":
        sessions = bus.list_sessions(active_only=False)
        if args.json:
            print(json.dumps([
                {
                    "session_id": s.session_id,
                    "llm_vendor": s.llm_vendor,
                    "llm_model": s.llm_model,
                    "project_id": s.project_id,
                    "status": s.status,
                    "heartbeat_at": s.heartbeat_at,
                }
                for s in sessions
            ], indent=2))
        else:
            if not sessions:
                print("No registered sessions.")
            else:
                for s in sessions:
                    print(
                        f"  [{s.status}] {s.session_id} "
                        f"({s.llm_vendor}/{s.llm_model}) "
                        f"project={s.project_id} "
                        f"heartbeat={s.heartbeat_at}"
                    )
    elif args.command == "messages":
        messages = bus.poll(args.channel, since_id=args.since_id)
        if args.json:
            print(json.dumps([
                {
                    "id": m.id,
                    "sender_id": m.sender_id,
                    "channel": m.channel,
                    "payload": m.payload,
                    "created_at": m.created_at,
                }
                for m in messages
            ], indent=2))
        else:
            if not messages:
                print(f"No messages on channel '{args.channel}' since ID {args.since_id}.")
            else:
                for m in messages:
                    print(f"  [{m.id}] {m.sender_id} -> {m.channel}: {m.payload}")
    elif args.command == "locks":
        locks = bus.get_file_locks()
        if args.json:
            print(json.dumps([
                {"file_path": lk.file_path, "session_id": lk.session_id, "locked_at": lk.locked_at}
                for lk in locks
            ], indent=2))
        else:
            if not locks:
                print("No active file locks.")
            else:
                for lk in locks:
                    print(f"  {lk.file_path} -> {lk.session_id} (since {lk.locked_at})")
    elif args.command == "publish":
        # Non-JSON --message values are wrapped instead of rejected.
        try:
            payload = json.loads(args.message)
        except json.JSONDecodeError:
            payload = {"raw": args.message}
        msg_id = bus.publish(args.channel, payload)
        print(f"Published message {msg_id} to channel '{args.channel}'")
    elif args.command == "poll":
        messages = bus.poll(args.channel, since_id=args.since_id)
        if args.json:
            print(json.dumps([
                {
                    "id": m.id, "sender_id": m.sender_id,
                    "channel": m.channel, "message_type": m.message_type,
                    "priority": m.priority, "status": m.status,
                    "payload": m.payload, "created_at": m.created_at,
                }
                for m in messages
            ], indent=2))
        else:
            for m in messages:
                print(f"  [{m.id}] {m.sender_id} -> {m.channel}/{m.message_type} "
                      f"[P{m.priority}|{m.status}]: {json.dumps(m.payload)}")
            if not messages:
                print(f"No new messages on '{args.channel}' since ID {args.since_id}")
    elif args.command == "unread":
        # "state" is the --channel default; treat it as "no filter" here.
        messages = bus.get_unread(
            recipient_id=args.session_id,
            channel=args.channel if args.channel != "state" else None,
        )
        if args.json:
            print(json.dumps([
                {
                    "id": m.id, "sender_id": m.sender_id,
                    "channel": m.channel, "message_type": m.message_type,
                    "priority": m.priority, "payload": m.payload,
                    "created_at": m.created_at,
                }
                for m in messages
            ], indent=2))
        else:
            if not messages:
                print("No unread messages.")
            else:
                print(f"Unread messages ({len(messages)}):")
                for m in messages:
                    print(f"  [{m.id}] P{m.priority} {m.sender_id} -> "
                          f"{m.channel}/{m.message_type}: {json.dumps(m.payload)}")
    elif args.command == "alerts":
        alerts = bus.get_operator_alerts()
        if args.json:
            print(json.dumps([
                {
                    "id": m.id, "sender_id": m.sender_id,
                    "message_type": m.message_type, "priority": m.priority,
                    "payload": m.payload, "created_at": m.created_at,
                }
                for m in alerts
            ], indent=2))
        else:
            if not alerts:
                print("No operator alerts.")
            else:
                print(f"Operator Alerts ({len(alerts)}):")
                for m in alerts:
                    print(f"  [{m.id}] P{m.priority} {m.message_type}: "
                          f"{json.dumps(m.payload)}")
    elif args.command == "send":
        if not args.recipient:
            print("Error: --recipient required for send command", file=sys.stderr)
            sys.exit(1)
        try:
            payload = json.loads(args.message)
        except json.JSONDecodeError:
            payload = {"raw": args.message}
        msg_id = bus.send(
            args.recipient, args.channel, payload,
            message_type=args.message_type,
        )
        print(f"Sent message {msg_id} to '{args.recipient}' on '{args.channel}'")
    elif args.command == "mark-read":
        if args.message_id <= 0:
            print("Error: --message-id required for mark-read command", file=sys.stderr)
            sys.exit(1)
        ok = bus.mark_read(args.message_id)
        print(f"Message {args.message_id}: {'marked read' if ok else 'not found or already read'}")

    bus.close()