Skip to main content

#!/usr/bin/env python3
"""
CODITECT Session Activity Manifest Manager (H.13)

Thin facade over SQLiteSessionMessageBus (ADR-160) for multi-session coordination. Provides a simple API for session registration, file tracking, conflict detection, and session discovery.

Backend: messaging.db via SQLiteSessionMessageBus (replaced active-sessions.json)

Usage:
# Self-test mode
python3 scripts/core/manifest_manager.py --self-test

# Register current session
python3 scripts/core/manifest_manager.py --register --llm claude --task-id H.13.1

# Check for conflicts
python3 scripts/core/manifest_manager.py --check /path/to/file.py

# Show active sessions
python3 scripts/core/manifest_manager.py --list

# Cleanup stale sessions
python3 scripts/core/manifest_manager.py --cleanup

Author: CODITECT Framework
Version: 2.0.0
Created: February 6, 2026
Updated: February 7, 2026
Task Reference: H.13
Backend: SQLiteSessionMessageBus (ADR-160) - replaces active-sessions.json
"""

import argparse
import json
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

# Add parent directories to path for imports

SCRIPT_DIR = Path(__file__).resolve().parent  # directory containing this script
CORE_DIR = SCRIPT_DIR.parent.parent  # two levels up from the script directory
sys.path.insert(0, str(CORE_DIR))  # lets `scripts.core.*` style imports resolve
sys.path.insert(0, str(SCRIPT_DIR))  # lets sibling modules import by bare name

def _get_tty() -> str:
    """Return the TTY device path attached to stdin, or ``"unknown"``.

    Returns:
        The path reported by ``os.ttyname`` for stdin's file descriptor, or
        the literal string ``"unknown"`` when it cannot be determined.
    """
    try:
        return os.ttyname(sys.stdin.fileno())
    except (OSError, AttributeError, ValueError):
        # OSError: stdin is not a terminal, or has no usable descriptor.
        # AttributeError: sys.stdin is None or os.ttyname is unavailable.
        # ValueError: stdin has been closed ("I/O operation on closed file").
        return "unknown"

def _pid_alive(pid: int) -> bool:
    """Check if a process is still running.

    Args:
        pid: Process ID to probe.

    Returns:
        True if a process with this PID exists (even one we are not allowed
        to signal), False otherwise.
    """
    # Zero and negative values address process groups in os.kill(), not a
    # single process, so they can never identify a session's process.
    if pid <= 0:
        return False
    try:
        # Signal 0 performs the existence/permission check without delivering
        # a signal.
        # NOTE(review): on Windows os.kill() with a signal other than
        # CTRL_C_EVENT/CTRL_BREAK_EVENT terminates the target - confirm this
        # helper is only used on POSIX.
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # Process exists but we can't signal it
    except OverflowError:
        return False  # Value does not fit in a pid_t, so no such process
    return True

class ManifestManager:
    """
    Facade over SQLiteSessionMessageBus for multi-session coordination.

    Provides a simple API compatible with the original file-based manifest,
    backed by messaging.db (ADR-160) for ACID safety and real-time
    notifications.

    Apart from ``register_session``, every method is best-effort: backend
    errors are logged and swallowed so that a coordination failure never
    crashes the calling session.
    """

    # Shared logger; failures that used to be dropped silently are reported here.
    _log = logging.getLogger(__name__)

    def __init__(self, db_path: Optional[Path] = None):
        """
        Create an unregistered manager.

        Args:
            db_path: Database path handed to the message bus factory; None
                leaves the choice to the factory.
        """
        self._bus = None  # created lazily by _get_bus()
        self._db_path = db_path
        self._session_id: Optional[str] = None  # set by register_session()

    def _get_bus(self):
        """Lazy-init the message bus singleton."""
        if self._bus is None:
            # Imported here so the module can be loaded without the bus.
            from scripts.core.session_message_bus import get_session_message_bus

            # NOTE(review): the bus is created with whatever session id is
            # known at first use; if that happens before register_session()
            # it is built with session_id=None - confirm the bus tolerates it.
            self._bus = get_session_message_bus(
                db_path=self._db_path,
                session_id=self._session_id,
            )
        return self._bus

    @property
    def session_id(self) -> Optional[str]:
        """Session ID of the current registration, or None if unregistered."""
        return self._session_id

    def register_session(self, llm: str = "claude", task_id: str = "") -> str:
        """
        Register current session.

        The session ID is ``"<llm>-<pid>"`` for the current process.

        Args:
            llm: LLM identifier (claude, codex, gemini, kimi)
            task_id: Current task ID (e.g., H.13.1)

        Returns:
            Session ID string

        Raises:
            Exception: Whatever the bus raises while being created or while
                registering; only the optional task_id update is best-effort.
        """
        pid = os.getpid()
        self._session_id = f"{llm}-{pid}"
        bus = self._get_bus()
        bus.register_session(
            session_id=self._session_id,
            llm_vendor=llm,
            tty=_get_tty(),
            pid=pid,
        )
        # Update task_id if provided (register_session doesn't take task_id
        # directly). This reaches into the bus's private connection.
        if task_id:
            try:
                bus._conn.execute(
                    "UPDATE session_registry SET task_id = ? WHERE session_id = ?",
                    (task_id, self._session_id),
                )
                bus._conn.commit()
            except Exception as exc:
                self._log.warning(
                    "Could not record task_id %r for session %s: %s",
                    task_id, self._session_id, exc,
                )
        return self._session_id

    def heartbeat(self) -> None:
        """Update heartbeat timestamp for current session (no-op if unregistered)."""
        if not self._session_id:
            return
        try:
            self._get_bus().heartbeat(self._session_id)
        except Exception as exc:
            self._log.debug("Heartbeat failed for %s: %s", self._session_id, exc)

    def track_file(self, file_path: str) -> None:
        """
        Acquire an advisory lock on a file for current session.

        Args:
            file_path: File to lock; resolved to an absolute path first.
        """
        if not self._session_id:
            return
        resolved = str(Path(file_path).resolve())
        try:
            self._get_bus().lock_file(resolved, self._session_id)
        except Exception as exc:
            self._log.warning("Could not lock %s: %s", resolved, exc)

    def untrack_file(self, file_path: str) -> None:
        """
        Release an advisory lock on a file.

        Args:
            file_path: File to unlock; resolved to an absolute path first.
        """
        if not self._session_id:
            return
        resolved = str(Path(file_path).resolve())
        try:
            self._get_bus().unlock_file(resolved, self._session_id)
        except Exception as exc:
            self._log.warning("Could not unlock %s: %s", resolved, exc)

    def check_conflict(self, file_path: str) -> Optional[Dict[str, Any]]:
        """
        Check if another session holds a lock on the given file.

        Args:
            file_path: Path to check for conflicts

        Returns:
            Dict with conflicting session info (keys ``session_id``, ``llm``,
            ``task_id``, ``pid``, ``tty``), or None if no conflict. None is
            also returned when the backend fails; that case is logged as a
            warning because it cannot be told apart from "no conflict".
        """
        resolved = str(Path(file_path).resolve())
        try:
            bus = self._get_bus()
            for lock in bus.get_file_locks():
                # Skip our own locks and locks on other files.
                if lock.session_id == self._session_id or lock.file_path != resolved:
                    continue
                # Look up the owning session among the active ones.
                owner = next(
                    (
                        s
                        for s in bus.list_sessions(active_only=True)
                        if s.session_id == lock.session_id
                    ),
                    None,
                )
                if owner is not None:
                    return {
                        "session_id": owner.session_id,
                        "llm": owner.llm_vendor,
                        "task_id": owner.task_id or "",
                        "pid": owner.pid,
                        "tty": owner.tty or "",
                    }
                # Lock exists but session not found - return basic info,
                # recovering the vendor from the "<llm>-<pid>" id when possible.
                vendor, dash, _ = lock.session_id.partition("-")
                return {
                    "session_id": lock.session_id,
                    "llm": vendor if dash else "unknown",
                    "task_id": "",
                    "pid": None,
                    "tty": "",
                }
        except Exception as exc:
            self._log.warning("Conflict check failed for %s: %s", resolved, exc)
        return None

    def deregister_session(self) -> None:
        """Remove current session from registry and forget its ID."""
        if not self._session_id:
            return
        try:
            self._get_bus().unregister_session(self._session_id)
        except Exception as exc:
            self._log.warning(
                "Could not deregister session %s: %s", self._session_id, exc
            )
        # Forget the id even if the backend call failed.
        self._session_id = None

    def cleanup_stale(self) -> List[str]:
        """
        Remove stale sessions (dead PIDs or expired heartbeats).

        The SQLite bus handles heartbeat expiry internally (5min timeout).
        This method additionally checks PID liveness.

        A failure while handling one session is logged and does not stop
        the sweep of the remaining sessions.

        Returns:
            List of removed session IDs
        """
        removed: List[str] = []
        try:
            bus = self._get_bus()
            # list_sessions(active_only=False) to get all including stale
            sessions = bus.list_sessions(active_only=False)
        except Exception as exc:
            self._log.warning("Could not list sessions for cleanup: %s", exc)
            return removed

        for s in sessions:
            try:
                # A session without a recorded PID is never judged by liveness.
                dead_pid = bool(s.pid) and not _pid_alive(s.pid)
                # Anything the bus no longer marks "active" is stale as well.
                if not dead_pid and s.status == "active":
                    continue
                bus.unregister_session(s.session_id)
            except Exception as exc:
                self._log.warning(
                    "Could not clean up session %s: %s", s.session_id, exc
                )
                continue
            removed.append(s.session_id)
        return removed

    def list_sessions(self) -> Dict[str, Any]:
        """
        Return all sessions in a dict format compatible with the original manifest.

        Returns:
            Dict with 'version' and 'sessions' keys. 'sessions' maps each
            active session ID to its details; it is empty when the backend
            cannot be queried (the failure is logged).
        """
        result: Dict[str, Any] = {"version": "2.0.0", "sessions": {}}
        try:
            bus = self._get_bus()
            sessions = bus.list_sessions(active_only=True)
            locks = bus.get_file_locks()

            # Build file map: session_id -> [file_paths]
            file_map: Dict[str, List[str]] = {}
            for lock in locks:
                file_map.setdefault(lock.session_id, []).append(lock.file_path)

            for s in sessions:
                result["sessions"][s.session_id] = {
                    "pid": s.pid,
                    "tty": s.tty or "unknown",
                    "llm": s.llm_vendor,
                    "task_id": s.task_id or "",
                    "start_time": s.registered_at or "",
                    "last_active": s.heartbeat_at or "",
                    "active_files": file_map.get(s.session_id, []),
                    "cwd": "",  # kept only for manifest-format compatibility
                    "status": s.status,
                }
        except Exception as exc:
            self._log.warning("Could not list sessions: %s", exc)
        return result

def _run_self_test() -> bool:
    """Run self-test to verify manifest operations via SQLite backend.

    Exercises register, heartbeat, file tracking, conflict detection, stale
    cleanup, untracking and deregistration against a temporary database.

    Returns:
        True when all checks pass, False when an assertion fails (the
        failure message is printed). Other exceptions propagate.
    """
    import tempfile as _tempfile

    print("Session Activity Manifest - Self Test (SQLite Backend)")
    print("=" * 55)

    # Use temp db for testing; only the path is needed, so the handle is
    # closed straight away and the file is deleted in the finally block.
    with _tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
        test_db = Path(f.name)

    mgr = None
    try:
        mgr = ManifestManager(db_path=test_db)

        # Test 1: Register
        print("1. Register session...", end=" ")
        sid = mgr.register_session(llm="claude", task_id="H.13.TEST")
        assert sid.startswith("claude-"), f"Unexpected session ID: {sid}"
        print(f"OK ({sid})")

        # Test 2: Heartbeat
        print("2. Heartbeat...", end=" ")
        mgr.heartbeat()
        data = mgr.list_sessions()
        assert sid in data["sessions"], f"Session {sid} not in {list(data['sessions'].keys())}"
        print("OK")

        # Test 3: Track file (lock)
        print("3. Track file (lock)...", end=" ")
        mgr.track_file("/tmp/test-file.py")
        data = mgr.list_sessions()
        files = data["sessions"][sid].get("active_files", [])
        # Substring match: the stored path is resolved and may carry a prefix.
        assert any("/tmp/test-file.py" in f for f in files), f"File not tracked: {files}"
        print("OK")

        # Test 4: Check conflict (no conflict with self)
        print("4. Self-conflict check...", end=" ")
        conflict = mgr.check_conflict("/tmp/test-file.py")
        assert conflict is None, "Should not conflict with self"
        print("OK (no self-conflict)")

        # Test 5: Simulate another session with a lock on a different file
        print("5. Simulate conflict...", end=" ")
        bus = mgr._get_bus()
        bus.register_session(
            session_id="codex-99999",
            llm_vendor="codex",
            pid=99999,
        )
        # Use a separate file for codex (bus enforces exclusive locks for active sessions)
        codex_file = str(Path("/tmp/codex-conflict-test.py").resolve())
        bus.lock_file(codex_file, "codex-99999")
        conflict = mgr.check_conflict("/tmp/codex-conflict-test.py")
        assert conflict is not None, "Should detect conflict with other session"
        assert conflict["session_id"] == "codex-99999"
        assert conflict["llm"] == "codex"
        print(f"OK (detected: {conflict['session_id']})")

        # Test 6: Cleanup stale (PID 99999 is dead)
        # NOTE(review): assumes no live process currently has PID 99999.
        print("6. Cleanup stale...", end=" ")
        removed = mgr.cleanup_stale()
        assert "codex-99999" in removed, f"Expected codex-99999 in removed, got {removed}"
        print(f"OK (removed {len(removed)} stale)")

        # Test 7: Untrack file (unlock)
        print("7. Untrack file (unlock)...", end=" ")
        mgr.untrack_file("/tmp/test-file.py")
        data = mgr.list_sessions()
        files = data["sessions"][sid].get("active_files", [])
        assert not any("/tmp/test-file.py" in f for f in files), f"File still tracked: {files}"
        print("OK")

        # Test 8: Deregister
        print("8. Deregister...", end=" ")
        mgr.deregister_session()
        data = mgr.list_sessions()
        assert sid not in data["sessions"], f"Session {sid} still present"
        print("OK")

        print("=" * 55)
        print("All 8 tests passed (SQLite backend)")
        return True

    except AssertionError as exc:
        # Report the failing check and signal failure through the return
        # value, which the CLI turns into a non-zero exit status.
        print(f"FAILED: {exc}")
        return False

    finally:
        # Cleanup bus on success and failure alike, before deleting its file.
        opened_bus = mgr._bus if mgr is not None else None
        if opened_bus is not None:
            try:
                opened_bus.close()
            except Exception as exc:
                print(f"Warning: failed to close message bus: {exc}", file=sys.stderr)
        # Delete the database and its SQLite side files; each is attempted
        # independently so one failure does not leave the others behind.
        leftovers = (
            test_db,
            test_db.with_suffix('.db-wal'),
            test_db.with_suffix('.db-shm'),
        )
        for leftover in leftovers:
            try:
                leftover.unlink(missing_ok=True)
            except OSError:
                pass

def main():
    """CLI entry point.

    Parses the command line and performs the first requested action, checked
    in this order: --self-test, --register, --check, --cleanup, --list.
    Prints the help text when no action is given. ``--json`` switches every
    action's output to JSON.
    """
    parser = argparse.ArgumentParser(
        description="CODITECT Session Activity Manifest Manager (H.13) - SQLite Backend"
    )
    option = parser.add_argument
    option("--self-test", action="store_true", help="Run self-test")
    option("--register", action="store_true", help="Register current session")
    option("--llm", default="claude", help="LLM identifier")
    option("--task-id", default="", help="Current task ID")
    option("--check", metavar="FILE", help="Check file for conflicts")
    option("--list", action="store_true", help="List active sessions")
    option("--cleanup", action="store_true", help="Remove stale sessions")
    option("--json", action="store_true", help="JSON output")

    args = parser.parse_args()
    as_json = args.json

    # The self-test exits the process with its own status code.
    if args.self_test:
        sys.exit(0 if _run_self_test() else 1)

    mgr = ManifestManager()

    if args.register:
        sid = mgr.register_session(llm=args.llm, task_id=args.task_id)
        print(json.dumps({"session_id": sid}) if as_json else f"Registered: {sid}")
        return

    if args.check:
        conflict = mgr.check_conflict(args.check)
        if as_json:
            print(json.dumps(conflict if conflict else {"conflict": None}))
        elif conflict:
            print(f"CONFLICT: {args.check}")
            print(f" Session: {conflict['session_id']}")
            print(f" LLM: {conflict['llm']}")
            print(f" Task: {conflict['task_id']}")
            print(f" PID: {conflict['pid']}")
        else:
            print(f"No conflict: {args.check}")
        return

    if args.cleanup:
        removed = mgr.cleanup_stale()
        if as_json:
            print(json.dumps({"removed": removed}))
        elif removed:
            print(f"Removed {len(removed)} stale sessions: {', '.join(removed)}")
        else:
            print("No stale sessions found")
        return

    if args.list:
        data = mgr.list_sessions()
        if as_json:
            print(json.dumps(data, indent=2, default=str))
            return
        sessions = data.get("sessions", {})
        if not sessions:
            print("No active sessions")
            return
        print(f"Active Sessions ({len(sessions)}):")
        print("-" * 60)
        for sid, info in sessions.items():
            files = info.get("active_files", [])
            print(f" {sid}")
            print(f" LLM: {info.get('llm')} | Task: {info.get('task_id', 'N/A')}")
            print(f" PID: {info.get('pid')} | TTY: {info.get('tty')}")
            print(f" Files: {len(files)}")
            # Show at most five files, then summarise the remainder.
            for fp in files[:5]:
                print(f" - {fp}")
            if len(files) > 5:
                print(f" ... and {len(files) - 5} more")
        return

    parser.print_help()

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()