#!/usr/bin/env python3
"""CODITECT Context Vacuum — Orphan Task Detection (ADR-178).

Detects orphaned tasks — work that was started, claimed, or partially
completed but abandoned when sessions died, contexts compacted, or projects
were parked — with automated resurfacing through context vacuum sweeps.

Three sweep modes:
- Quick (<2s) — dead PIDs, stale claims, unresolved alerts
- Standard (10-30s) — cross-references TRACK files + messaging.db + session logs
- Deep (2-5 min) — full history, git commits, task lifecycle timeline

Usage:
    from scripts.core.context_vacuum import ContextVacuum

    vacuum = ContextVacuum()

    # Quick sweep (for /orient)
    report = vacuum.quick_sweep()

    # Standard sweep
    report = vacuum.standard_sweep()

    # Deep sweep
    report = vacuum.deep_sweep()

    # Actions
    vacuum.adopt_task("H.13.9.7")
    vacuum.defer_task("K.3.1")
    vacuum.cancel_task("K.3.1")

CLI:
    python3 scripts/core/context_vacuum.py                   # Standard sweep
    python3 scripts/core/context_vacuum.py --quick           # Quick sweep
    python3 scripts/core/context_vacuum.py --deep            # Deep sweep
    python3 scripts/core/context_vacuum.py --adopt H.13.9.7  # Adopt orphan
    python3 scripts/core/context_vacuum.py --defer K.3.1     # Defer orphan
    python3 scripts/core/context_vacuum.py --cancel K.3.1    # Cancel orphan
    python3 scripts/core/context_vacuum.py --json            # JSON output

Created: 2026-02-12
ADR: ADR-178 (Orphan Task Detection and Context Vacuum)
Track: H.13.10
"""

import argparse
import glob
import json
import logging
import os
import re
import sqlite3
import subprocess
import sys
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# BUG FIX: was logging.getLogger(name), which raises NameError at import time.
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Data Classes
# ---------------------------------------------------------------------------
@dataclass
class OrphanTask:
    """An orphaned task detected by the vacuum.

    Only task_id, orphan_type, and track are required; the detecting sweep
    fills in whatever evidence it has.
    """

    task_id: str
    # One of: dead_claim, stale_in_progress, conflict_abandoned,
    # never_started, parked_project
    orphan_type: str
    track: str
    description: str = ""
    last_activity: Optional[str] = None  # ISO 8601
    last_session_id: Optional[str] = None
    evidence: Dict[str, Any] = field(default_factory=dict)
    resolution: Optional[str] = None  # None, adopted, completed, deferred, cancelled


@dataclass
class VacuumReport:
    """Result of a vacuum sweep."""

    sweep_type: str  # quick, standard, deep
    created_at: str
    orphans: List[OrphanTask] = field(default_factory=list)
    parked_tracks: List[Dict[str, Any]] = field(default_factory=list)
    active_tasks: List[Dict[str, Any]] = field(default_factory=list)
    summary: Dict[str, Any] = field(default_factory=dict)
    duration_ms: int = 0
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

# DDL for the vacuum_reports table (lives inside messaging.db).  Idempotent:
# run with executescript() on every ContextVacuum construction.
_VACUUM_REPORTS_SQL = """
CREATE TABLE IF NOT EXISTS vacuum_reports (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    sweep_type TEXT NOT NULL,
    created_at TEXT NOT NULL,
    task_id TEXT NOT NULL,
    orphan_type TEXT NOT NULL,
    last_activity TEXT,
    last_session_id TEXT,
    track TEXT,
    description TEXT,
    evidence TEXT,
    resolution TEXT,
    resolved_at TEXT,
    resolved_by TEXT
);
CREATE INDEX IF NOT EXISTS idx_vacuum_task ON vacuum_reports(task_id);
CREATE INDEX IF NOT EXISTS idx_vacuum_orphan ON vacuum_reports(orphan_type);
CREATE INDEX IF NOT EXISTS idx_vacuum_unresolved ON vacuum_reports(resolution)
    WHERE resolution IS NULL;
"""
# ---------------------------------------------------------------------------
# Context Vacuum
# ---------------------------------------------------------------------------


class ContextVacuum:
    """Orphan task detection and context vacuum sweeps (ADR-178)."""
    def __init__(
        self,
        coditect_core_dir: Optional[Path] = None,
        messaging_db_path: Optional[Path] = None,
        session_logs_dir: Optional[Path] = None,
    ):
        """Resolve core/database/log locations and ensure the schema exists.

        Args:
            coditect_core_dir: Root of the coditect-core checkout.  When
                omitted, a list of conventional locations is probed.
            messaging_db_path: Path to messaging.db.  When omitted, asks
                scripts.core.paths (if importable) and otherwise falls back
                to the default ~/PROJECTS/.coditect-data location.
            session_logs_dir: Directory holding SESSION-LOG-*.md files.
        """
        # Resolve coditect-core directory
        if coditect_core_dir:
            self._core_dir = Path(coditect_core_dir)
        else:
            # Try common locations; the first candidate containing an
            # "internal" subdirectory wins, else fall back to the first.
            candidates = [
                Path(__file__).parent.parent.parent,  # scripts/core/ -> coditect-core
                Path.home() / "PROJECTS" / "coditect-rollout-master" / "submodules" / "core" / "coditect-core",
                Path.home() / ".coditect",
            ]
            self._core_dir = next((c for c in candidates if (c / "internal").exists()), candidates[0])
        # Resolve messaging.db path
        if messaging_db_path:
            self._db_path = Path(messaging_db_path)
        else:
            try:
                # NOTE(review): mutates sys.path for the whole process and
                # assumes the core dir provides scripts/core/paths.py — confirm.
                sys.path.insert(0, str(self._core_dir))
                from scripts.core.paths import get_messaging_db_path
                self._db_path = get_messaging_db_path()
            except ImportError:
                self._db_path = (
                    Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "messaging.db"
                )
        # Resolve session logs directory
        if session_logs_dir:
            self._logs_dir = Path(session_logs_dir)
        else:
            self._logs_dir = Path.home() / "PROJECTS" / ".coditect-data" / "session-logs"
        # TRACK files directory (markdown task lists parsed by the sweeps)
        self._tracks_dir = self._core_dir / "internal" / "project" / "plans" / "pilot-tracks"
        # Ensure vacuum_reports table exists (best-effort, no-op without DB)
        self._ensure_schema()
# -- Schema ---------------------------------------------------------------
def _ensure_schema(self) -> None:
"""Create vacuum_reports table if needed."""
if not self._db_path.exists():
return
try:
conn = sqlite3.connect(str(self._db_path), timeout=5.0)
conn.executescript(_VACUUM_REPORTS_SQL)
conn.close()
except sqlite3.Error as e:
logger.warning(f"Could not create vacuum_reports table: {e}")
def _get_conn(self) -> sqlite3.Connection:
"""Get a SQLite connection to messaging.db."""
conn = sqlite3.connect(str(self._db_path), timeout=5.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
return conn
# -- Quick Sweep (Phase 1) -----------------------------------------------
    def quick_sweep(self) -> VacuumReport:
        """
        Quick sweep — <2s, queries only messaging.db.

        Checks:
        1. Dead PIDs in task_claims
        2. Stale claims (>24h)
        3. Unresolved conflict alerts

        Returns:
            VacuumReport with sweep_type "quick".  When messaging.db is
            missing, the report carries summary["error"] and no orphans.

        NOTE(review): a claim that is both >24h old and owned by a dead PID
        appears in both the dead_claim and stale_in_progress lists — confirm
        the double count is intended.
        """
        start = time.monotonic()
        now_utc = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        orphans: List[OrphanTask] = []
        if not self._db_path.exists():
            return VacuumReport(
                sweep_type="quick",
                created_at=now_utc,
                summary={"error": "messaging.db not found"},
            )
        try:
            conn = self._get_conn()
            # 1. Dead PID claims
            dead_pid_orphans = self._check_dead_pid_claims(conn)
            orphans.extend(dead_pid_orphans)
            # 2. Stale claims (>24h)
            stale_orphans = self._check_stale_claims(conn, hours=24)
            orphans.extend(stale_orphans)
            # 3. Unresolved conflict alerts
            conflict_orphans = self._check_unresolved_conflicts(conn)
            orphans.extend(conflict_orphans)
            conn.close()
        except sqlite3.Error as e:
            # Best-effort: a DB error still yields a report with whatever
            # orphans were collected before the failure.
            logger.error(f"Quick sweep database error: {e}")
        elapsed = int((time.monotonic() - start) * 1000)
        return VacuumReport(
            sweep_type="quick",
            created_at=now_utc,
            orphans=orphans,
            summary={
                "total_orphans": len(orphans),
                "dead_claims": sum(1 for o in orphans if o.orphan_type == "dead_claim"),
                "stale_claims": sum(1 for o in orphans if o.orphan_type == "stale_in_progress"),
                "conflicts": sum(1 for o in orphans if o.orphan_type == "conflict_abandoned"),
            },
            duration_ms=elapsed,
        )
    def _check_dead_pid_claims(self, conn: sqlite3.Connection) -> List[OrphanTask]:
        """Find task claims where the owning session's PID is dead.

        Joins task_claims to session_registry; a claim whose registered PID
        no longer maps to a live process becomes a "dead_claim" orphan.
        Rows with a NULL/0 pid are skipped (liveness unknown).
        """
        orphans = []
        try:
            rows = conn.execute(
                "SELECT tc.task_id, tc.session_id, tc.claimed_at, "
                "sr.pid, sr.last_active_at, sr.status "
                "FROM task_claims tc "
                "JOIN session_registry sr ON tc.session_id = sr.session_id"
            ).fetchall()
            for row in rows:
                pid = row["pid"]
                if pid and not self._is_pid_alive(pid):
                    orphans.append(OrphanTask(
                        task_id=row["task_id"],
                        orphan_type="dead_claim",
                        track=self._extract_track(row["task_id"]),
                        # Prefer the session's heartbeat, fall back to claim time
                        last_activity=row["last_active_at"] or row["claimed_at"],
                        last_session_id=row["session_id"],
                        evidence={
                            "claimed_at": row["claimed_at"],
                            "dead_pid": pid,
                            "session_status": row["status"],
                        },
                    ))
        except sqlite3.Error as e:
            # Missing tables are expected on fresh installs — debug only.
            logger.debug(f"Dead PID check error: {e}")
        return orphans
    def _check_stale_claims(self, conn: sqlite3.Connection, hours: int = 24) -> List[OrphanTask]:
        """Find task claims older than N hours whose process is not alive.

        Args:
            conn: Open messaging.db connection.
            hours: Staleness cutoff; claims made before now-hours qualify.
        """
        orphans = []
        try:
            cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            )
            rows = conn.execute(
                "SELECT tc.task_id, tc.session_id, tc.claimed_at, "
                "sr.pid, sr.last_active_at "
                "FROM task_claims tc "
                "JOIN session_registry sr ON tc.session_id = sr.session_id "
                "WHERE tc.claimed_at < ?",
                (cutoff,),
            ).fetchall()
            for row in rows:
                # Skip claims whose process is still running — those are
                # long-running but live, not orphaned.
                pid = row["pid"]
                if pid and self._is_pid_alive(pid):
                    continue
                orphans.append(OrphanTask(
                    task_id=row["task_id"],
                    orphan_type="stale_in_progress",
                    track=self._extract_track(row["task_id"]),
                    last_activity=row["last_active_at"] or row["claimed_at"],
                    last_session_id=row["session_id"],
                    evidence={
                        "claimed_at": row["claimed_at"],
                        "stale_hours": hours,
                    },
                ))
        except sqlite3.Error as e:
            logger.debug(f"Stale claims check error: {e}")
        return orphans
    def _check_unresolved_conflicts(self, conn: sqlite3.Connection) -> List[OrphanTask]:
        """Find unresolved task_conflict alerts in session_messages.

        Only the newest alert per task becomes a "conflict_abandoned" orphan
        (results are ordered DESC and deduplicated on task_id).
        """
        orphans = []
        try:
            # session_messages may not exist on older schemas — probe first
            tables = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='session_messages'"
            ).fetchone()
            if not tables:
                return orphans
            rows = conn.execute(
                "SELECT id, task_id, sender_id, created_at, payload "
                "FROM session_messages "
                "WHERE channel = 'operator_alert' "
                "AND message_type = 'task_conflict' "
                "AND status IN ('pending', 'delivered') "
                "ORDER BY created_at DESC"
            ).fetchall()
            seen_tasks = set()
            for row in rows:
                task_id = row["task_id"]
                if not task_id or task_id in seen_tasks:
                    continue
                seen_tasks.add(task_id)
                # Payload is JSON when present; malformed payloads degrade to {}
                payload = {}
                try:
                    payload = json.loads(row["payload"]) if row["payload"] else {}
                except (json.JSONDecodeError, TypeError):
                    pass
                orphans.append(OrphanTask(
                    task_id=task_id,
                    orphan_type="conflict_abandoned",
                    track=self._extract_track(task_id),
                    last_activity=row["created_at"],
                    last_session_id=row["sender_id"],
                    evidence={
                        "alert_id": row["id"],
                        "conflict_details": payload,
                    },
                ))
        except sqlite3.Error as e:
            logger.debug(f"Conflict check error: {e}")
        return orphans
# -- Standard Sweep (Phase 2) --------------------------------------------
    def standard_sweep(self) -> VacuumReport:
        """
        Standard sweep — 10-30s, cross-references TRACK files + messaging.db + session logs.

        1. Parse all TRACK files for unchecked [ ] tasks
        2. Cross-reference with messaging.db (claims, broadcasts)
        3. Grep session logs (last 30 days) for task ID mentions
        4. Classify orphans
        5. Detect parked tracks

        Returns:
            VacuumReport with sweep_type "standard"; orphans are also
            persisted to the vacuum_reports table before returning.
        """
        start = time.monotonic()
        now_utc = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        orphans: List[OrphanTask] = []
        parked_tracks: List[Dict[str, Any]] = []
        active_tasks: List[Dict[str, Any]] = []
        # Step 1: Parse TRACK files
        track_tasks = self._parse_track_files()
        # Step 2: Get messaging.db activity
        db_activity = self._get_db_activity()
        # Step 3: Get session log mentions (last 30 days)
        log_mentions = self._get_log_mentions(days=30)
        # Step 4: Classify each unchecked task
        cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
        for task in track_tasks:
            task_id = task["task_id"]
            track = task["track"]
            desc = task["description"]
            # Find last activity across all sources.  Timestamps are ISO-8601
            # UTC strings, so lexicographic max() is chronological max().
            claim_info = db_activity.get("claims", {}).get(task_id)
            broadcast_time = db_activity.get("broadcasts", {}).get(task_id)
            log_time = log_mentions.get(task_id)
            activities = []
            if claim_info:
                activities.append(("claim", claim_info.get("claimed_at", "")))
            if broadcast_time:
                activities.append(("broadcast", broadcast_time))
            if log_time:
                activities.append(("log_mention", log_time))
            if not activities:
                # Never started — not an orphan, just planned
                continue
            last_type, last_time = max(activities, key=lambda x: x[1])
            evidence = {}
            if claim_info:
                evidence["claim"] = claim_info
            if broadcast_time:
                evidence["last_broadcast"] = broadcast_time
            if log_time:
                evidence["last_log_mention"] = log_time
            # Check if dead claim (claimed by a session whose PID is gone)
            if claim_info and claim_info.get("pid"):
                if not self._is_pid_alive(claim_info["pid"]):
                    orphans.append(OrphanTask(
                        task_id=task_id,
                        orphan_type="dead_claim",
                        track=track,
                        description=desc,
                        last_activity=last_time,
                        last_session_id=claim_info.get("session_id"),
                        evidence=evidence,
                    ))
                    continue
            # Check if conflict-abandoned (unresolved operator alert)
            conflict = db_activity.get("conflicts", {}).get(task_id)
            if conflict:
                orphans.append(OrphanTask(
                    task_id=task_id,
                    orphan_type="conflict_abandoned",
                    track=track,
                    description=desc,
                    last_activity=last_time,
                    last_session_id=claim_info.get("session_id") if claim_info else None,
                    evidence={**evidence, "conflict": conflict},
                ))
                continue
            # Check if stale (>7 days since last activity)
            if last_time < cutoff_7d:
                orphans.append(OrphanTask(
                    task_id=task_id,
                    orphan_type="stale_in_progress",
                    track=track,
                    description=desc,
                    last_activity=last_time,
                    evidence=evidence,
                ))
            else:
                # Active — not orphaned
                active_tasks.append({
                    "task_id": task_id,
                    "track": track,
                    "description": desc,
                    "last_activity": last_time,
                    "activity_type": last_type,
                })
        # Step 5: Detect parked tracks
        parked_tracks = self._detect_parked_tracks(track_tasks, db_activity, log_mentions)
        # Also include quick sweep orphans (dead/stale/conflict from DB only)
        quick = self.quick_sweep()
        # Merge quick sweep orphans not already found
        existing_ids = {o.task_id for o in orphans}
        for o in quick.orphans:
            if o.task_id not in existing_ids:
                orphans.append(o)
        elapsed = int((time.monotonic() - start) * 1000)
        report = VacuumReport(
            sweep_type="standard",
            created_at=now_utc,
            orphans=orphans,
            parked_tracks=parked_tracks,
            active_tasks=active_tasks,
            summary={
                "total_orphans": len(orphans),
                "dead_claims": sum(1 for o in orphans if o.orphan_type == "dead_claim"),
                "stale_in_progress": sum(1 for o in orphans if o.orphan_type == "stale_in_progress"),
                "conflict_abandoned": sum(1 for o in orphans if o.orphan_type == "conflict_abandoned"),
                "parked_tracks": len(parked_tracks),
                "active_tasks": len(active_tasks),
                "total_unchecked": len(track_tasks),
            },
            duration_ms=elapsed,
        )
        # Store results (best-effort persistence into messaging.db)
        self._store_report(report)
        return report
# -- Deep Sweep (Phase 4) ------------------------------------------------
    def deep_sweep(self) -> VacuumReport:
        """
        Deep sweep — 2-5 min, full historical analysis.

        Adds to standard sweep:
        - All session logs (not just 30 days)
        - Git commit correlation (feat(X.n): pattern)
        - Task lifecycle timeline per ID

        Returns:
            VacuumReport with sweep_type "deep".

        NOTE(review): the embedded standard_sweep() already persists its
        orphans, and this method persists the merged list again — confirm
        the duplicate vacuum_reports rows are intended.
        """
        start = time.monotonic()
        now_utc = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        # Run standard sweep as baseline
        std_report = self.standard_sweep()
        orphans = list(std_report.orphans)
        existing_ids = {o.task_id for o in orphans}
        # Extended: Full session log scan (up to a year back)
        all_log_mentions = self._get_log_mentions(days=365)
        # Extended: Git commit correlation
        git_mentions = self._get_git_mentions()
        # Re-evaluate tasks with extended data
        track_tasks = self._parse_track_files()
        cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
        for task in track_tasks:
            task_id = task["task_id"]
            if task_id in existing_ids:
                continue  # Already classified
            # Check extended sources
            log_time = all_log_mentions.get(task_id)
            git_time = git_mentions.get(task_id)
            activities = []
            if log_time:
                activities.append(("log_mention", log_time))
            if git_time:
                activities.append(("git_commit", git_time))
            if not activities:
                continue  # Still never started
            last_type, last_time = max(activities, key=lambda x: x[1])
            if last_time < cutoff_7d:
                evidence = {}
                if log_time:
                    evidence["last_log_mention"] = log_time
                if git_time:
                    evidence["last_git_commit"] = git_time
                orphans.append(OrphanTask(
                    task_id=task_id,
                    orphan_type="stale_in_progress",
                    track=task["track"],
                    description=task["description"],
                    last_activity=last_time,
                    evidence=evidence,
                ))
        # Build task lifecycle timelines for orphans
        for orphan in orphans:
            timeline = self._build_lifecycle_timeline(
                orphan.task_id, all_log_mentions, git_mentions
            )
            if timeline:
                orphan.evidence["lifecycle_timeline"] = timeline
        elapsed = int((time.monotonic() - start) * 1000)
        report = VacuumReport(
            sweep_type="deep",
            created_at=now_utc,
            orphans=orphans,
            parked_tracks=std_report.parked_tracks,
            active_tasks=std_report.active_tasks,
            summary={
                "total_orphans": len(orphans),
                "dead_claims": sum(1 for o in orphans if o.orphan_type == "dead_claim"),
                "stale_in_progress": sum(1 for o in orphans if o.orphan_type == "stale_in_progress"),
                "conflict_abandoned": sum(1 for o in orphans if o.orphan_type == "conflict_abandoned"),
                "parked_tracks": len(std_report.parked_tracks),
                "active_tasks": len(std_report.active_tasks),
                "total_unchecked": std_report.summary.get("total_unchecked", 0),
                "git_correlated": sum(
                    1 for o in orphans if o.evidence.get("last_git_commit")
                ),
            },
            duration_ms=elapsed,
        )
        self._store_report(report)
        return report
# -- Actions (Phase 3) ---------------------------------------------------
    def adopt_task(self, task_id: str, session_id: Optional[str] = None) -> bool:
        """Claim an orphaned task for the current session.

        Marks the orphan resolved as "adopted" in vacuum_reports, then
        claims it via the session message bus and broadcasts the adoption.

        Returns:
            True when the bus claim succeeded; False when no session ID is
            available or the bus import/claim failed.
        """
        sid = session_id or self._detect_session_id()
        if not sid:
            logger.error("No session_id — cannot adopt task")
            return False
        # Mark resolved in vacuum_reports (best-effort, before the claim)
        self._resolve_orphan(task_id, "adopted", sid)
        # Claim via message bus
        try:
            sys.path.insert(0, str(self._core_dir))
            from scripts.core.session_message_bus import get_session_message_bus
            bus = get_session_message_bus(session_id=sid)
            result = bus.claim_task(task_id, session_id=sid)
            if result:
                # Tell other sessions the orphan now has an owner
                bus.publish(
                    "task_broadcast",
                    {"action": "orphan_adopted", "task_id": task_id},
                    message_type="adopted",
                    task_id=task_id,
                )
            return result
        except Exception as e:
            logger.error(f"Failed to adopt task via bus: {e}")
            return False
def defer_task(self, task_id: str, session_id: Optional[str] = None) -> bool:
"""Mark an orphaned task as intentionally deferred."""
sid = session_id or self._detect_session_id()
self._resolve_orphan(task_id, "deferred", sid or "manual")
return True
def cancel_task(self, task_id: str, session_id: Optional[str] = None) -> bool:
"""Mark an orphaned task as no longer needed."""
sid = session_id or self._detect_session_id()
self._resolve_orphan(task_id, "cancelled", sid or "manual")
return True
def _resolve_orphan(self, task_id: str, resolution: str, resolved_by: str) -> None:
"""Update vacuum_reports with resolution."""
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
try:
conn = self._get_conn()
conn.execute(
"UPDATE vacuum_reports SET resolution = ?, resolved_at = ?, resolved_by = ? "
"WHERE task_id = ? AND resolution IS NULL",
(resolution, now, resolved_by, task_id),
)
conn.commit()
conn.close()
except sqlite3.Error as e:
logger.error(f"Failed to resolve orphan {task_id}: {e}")
# -- TRACK File Parsing --------------------------------------------------
def _parse_track_files(self) -> List[Dict[str, Any]]:
"""Parse all TRACK files for unchecked [ ] tasks with IDs."""
tasks = []
if not self._tracks_dir.exists():
logger.warning(f"TRACK directory not found: {self._tracks_dir}")
return tasks
task_pattern = re.compile(
r"^-\s+\[\s\]\s+([A-Z]\.\d+(?:\.\d+)*(?:\.\d+)*):\s*(.*)",
re.MULTILINE,
)
for track_file in sorted(self._tracks_dir.glob("TRACK-*.md")):
try:
content = track_file.read_text(encoding="utf-8")
track_letter = self._extract_track_from_filename(track_file.name)
for match in task_pattern.finditer(content):
task_id = match.group(1)
desc = match.group(2).strip()
tasks.append({
"task_id": task_id,
"track": track_letter,
"description": desc,
"file": track_file.name,
})
except Exception as e:
logger.debug(f"Error parsing {track_file.name}: {e}")
return tasks
# -- Database Activity ---------------------------------------------------
    def _get_db_activity(self) -> Dict[str, Dict]:
        """Get task-related activity from messaging.db.

        Returns:
            {"claims":     {task_id: {session_id, claimed_at, pid}},
             "broadcasts": {task_id: latest session_messages timestamp},
             "conflicts":  {task_id: {alert_at, sender_id}}}
            Each section is filled best-effort; missing tables leave it empty.
        """
        result: Dict[str, Dict] = {
            "claims": {},
            "broadcasts": {},
            "conflicts": {},
        }
        if not self._db_path.exists():
            return result
        try:
            conn = self._get_conn()
            # Task claims (LEFT JOIN: a claim may have no registry row)
            try:
                for row in conn.execute(
                    "SELECT tc.task_id, tc.session_id, tc.claimed_at, sr.pid "
                    "FROM task_claims tc "
                    "LEFT JOIN session_registry sr ON tc.session_id = sr.session_id"
                ).fetchall():
                    result["claims"][row["task_id"]] = {
                        "session_id": row["session_id"],
                        "claimed_at": row["claimed_at"],
                        "pid": row["pid"],
                    }
            except sqlite3.Error:
                # Table may not exist yet — leave "claims" empty
                pass
            # Task broadcasts (last activity per task from session_messages)
            try:
                tables = conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' AND name='session_messages'"
                ).fetchone()
                if tables:
                    for row in conn.execute(
                        "SELECT task_id, MAX(created_at) as last_at "
                        "FROM session_messages "
                        "WHERE task_id IS NOT NULL AND task_id != '' "
                        "GROUP BY task_id"
                    ).fetchall():
                        result["broadcasts"][row["task_id"]] = row["last_at"]
                    # Unresolved task conflicts (pending/delivered operator alerts)
                    for row in conn.execute(
                        "SELECT task_id, created_at, sender_id "
                        "FROM session_messages "
                        "WHERE channel = 'operator_alert' "
                        "AND message_type = 'task_conflict' "
                        "AND status IN ('pending', 'delivered') "
                        "AND task_id IS NOT NULL"
                    ).fetchall():
                        result["conflicts"][row["task_id"]] = {
                            "alert_at": row["created_at"],
                            "sender_id": row["sender_id"],
                        }
            except sqlite3.Error:
                pass
            conn.close()
        except sqlite3.Error as e:
            logger.debug(f"DB activity query error: {e}")
        return result
# -- Session Log Scanning ------------------------------------------------
    def _get_log_mentions(self, days: int = 30) -> Dict[str, str]:
        """Grep session logs for task ID mentions.

        Scans SESSION-LOG-YYYY-MM-DD.md files no older than ``days``.  Each
        mention is stamped with the nearest preceding "### <ISO timestamp>"
        entry header, defaulting to midnight of the file's date.

        Returns:
            {task_id: latest ISO-8601 timestamp seen}
        """
        mentions: Dict[str, str] = {}
        if not self._logs_dir.exists():
            return mentions
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        # Task IDs: a letter plus 2-4 dot-separated numeric segments
        task_pattern = re.compile(r"\b([A-Z]\.\d+(?:\.\d+){1,3})\b")
        # Scan project-scoped logs
        for log_file in self._logs_dir.rglob("SESSION-LOG-*.md"):
            # Filter by date from filename
            date_match = re.search(r"SESSION-LOG-(\d{4}-\d{2}-\d{2})\.md$", log_file.name)
            if not date_match:
                continue
            try:
                file_date = datetime.strptime(date_match.group(1), "%Y-%m-%d").replace(
                    tzinfo=timezone.utc
                )
                if file_date < cutoff:
                    continue
            except ValueError:
                continue
            try:
                content = log_file.read_text(encoding="utf-8")
                # Extract timestamps and task IDs from entries
                current_timestamp = date_match.group(1) + "T00:00:00Z"
                for line in content.split("\n"):
                    # Update timestamp from entry headers
                    ts_match = re.match(
                        r"###\s+(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)", line
                    )
                    if ts_match:
                        current_timestamp = ts_match.group(1)
                    # Find task IDs; keep the lexicographically-latest (==
                    # chronologically latest for ISO UTC) timestamp per ID
                    for task_match in task_pattern.finditer(line):
                        tid = task_match.group(1)
                        if current_timestamp > mentions.get(tid, ""):
                            mentions[tid] = current_timestamp
            except Exception:
                # Best-effort: unreadable log files are skipped silently
                continue
        return mentions
# -- Git Commit Correlation (Deep Sweep) ---------------------------------
def _get_git_mentions(self) -> Dict[str, str]:
"""Correlate task IDs with git commit messages. Returns {task_id: last_commit_date}."""
mentions: Dict[str, str] = {}
task_pattern = re.compile(r"\b([A-Z]\.\d+(?:\.\d+){1,3})\b")
try:
result = subprocess.run(
["git", "log", "--oneline", "--since=365 days ago", "--format=%aI %s"],
capture_output=True,
text=True,
cwd=str(self._core_dir),
timeout=30,
)
if result.returncode == 0:
for line in result.stdout.strip().split("\n"):
if not line:
continue
parts = line.split(" ", 1)
if len(parts) < 2:
continue
commit_date = parts[0][:19] + "Z" # Truncate to second
msg = parts[1]
for match in task_pattern.finditer(msg):
tid = match.group(1)
if commit_date > mentions.get(tid, ""):
mentions[tid] = commit_date
except (subprocess.TimeoutExpired, FileNotFoundError, Exception) as e:
logger.debug(f"Git log scan error: {e}")
return mentions
# -- Parked Track Detection ----------------------------------------------
    def _detect_parked_tracks(
        self,
        track_tasks: List[Dict[str, Any]],
        db_activity: Dict[str, Dict],
        log_mentions: Dict[str, str],
    ) -> List[Dict[str, Any]]:
        """Detect parked tracks (0% complete or >30 days inactive).

        Args:
            track_tasks: Unchecked tasks from _parse_track_files().
            db_activity: Output of _get_db_activity().
            log_mentions: Output of _get_log_mentions().

        Returns:
            One summary dict per parked track: track letter, task counts,
            completion %, last activity, and whether any claims exist.
        """
        parked = []
        cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
        # Group tasks by track
        tracks: Dict[str, List[Dict]] = {}
        for task in track_tasks:
            tracks.setdefault(task["track"], []).append(task)
        # Count checked tasks per track file ("- [x]" lines)
        track_checked: Dict[str, int] = {}
        if self._tracks_dir.exists():
            checked_pattern = re.compile(r"^-\s+\[x\]", re.MULTILINE)
            for track_file in self._tracks_dir.glob("TRACK-*.md"):
                letter = self._extract_track_from_filename(track_file.name)
                content = track_file.read_text(encoding="utf-8")
                track_checked[letter] = len(checked_pattern.findall(content))
        for track_letter, tasks in tracks.items():
            total_unchecked = len(tasks)
            total_checked = track_checked.get(track_letter, 0)
            total = total_unchecked + total_checked
            if total == 0:
                continue
            completion_pct = round(total_checked / total * 100) if total else 0
            # Find last activity across all tasks in this track (ISO strings
            # compare lexicographically == chronologically)
            last_activity = None
            for task in tasks:
                tid = task["task_id"]
                for source in [
                    db_activity.get("claims", {}).get(tid, {}).get("claimed_at"),
                    db_activity.get("broadcasts", {}).get(tid),
                    log_mentions.get(tid),
                ]:
                    if source and (not last_activity or source > last_activity):
                        last_activity = source
            # Check for active claims
            # NOTE(review): membership in "claims" includes claims by dead
            # sessions — confirm "active" is the intended meaning here.
            has_active_claims = any(
                task["task_id"] in db_activity.get("claims", {})
                for task in tasks
            )
            # Parked if: 0% complete OR (>30 days inactive AND no active claims)
            if completion_pct == 0 or (
                last_activity
                and last_activity < cutoff_30d
                and not has_active_claims
            ):
                parked.append({
                    "track": track_letter,
                    "total_tasks": total,
                    "unchecked": total_unchecked,
                    "checked": total_checked,
                    "completion_pct": completion_pct,
                    "last_activity": last_activity or "never",
                    "has_active_claims": has_active_claims,
                })
        return parked
# -- Lifecycle Timeline (Deep Sweep) -------------------------------------
    def _build_lifecycle_timeline(
        self,
        task_id: str,
        log_mentions: Dict[str, str],
        git_mentions: Dict[str, str],
    ) -> List[Dict[str, str]]:
        """Build a chronological timeline of events for a task.

        Combines the latest log mention, latest git commit, the claim time
        from task_claims, and every session_messages row for the task, then
        sorts by timestamp.

        Returns:
            List of {"type": ..., "timestamp": ...} dicts, oldest first.
        """
        events = []
        if task_id in log_mentions:
            events.append({
                "type": "log_mention",
                "timestamp": log_mentions[task_id],
            })
        if task_id in git_mentions:
            events.append({
                "type": "git_commit",
                "timestamp": git_mentions[task_id],
            })
        # Check DB for claim/broadcast activity (best-effort)
        if self._db_path.exists():
            try:
                conn = self._get_conn()
                claim = conn.execute(
                    "SELECT claimed_at FROM task_claims WHERE task_id = ?",
                    (task_id,),
                ).fetchone()
                if claim:
                    events.append({
                        "type": "task_claimed",
                        "timestamp": claim["claimed_at"],
                    })
                tables = conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' AND name='session_messages'"
                ).fetchone()
                if tables:
                    msgs = conn.execute(
                        "SELECT created_at, message_type, channel "
                        "FROM session_messages WHERE task_id = ? "
                        "ORDER BY created_at",
                        (task_id,),
                    ).fetchall()
                    for msg in msgs:
                        events.append({
                            "type": f"{msg['channel']}/{msg['message_type']}",
                            "timestamp": msg["created_at"],
                        })
                conn.close()
            except sqlite3.Error:
                # Missing tables are fine — timeline just has fewer events
                pass
        events.sort(key=lambda e: e["timestamp"])
        return events
# -- Storage (Phase 3) ---------------------------------------------------
    def _store_report(self, report: VacuumReport) -> None:
        """Persist each orphan in the report as a row in vacuum_reports.

        Best-effort: skipped when messaging.db is missing; SQLite errors are
        logged, not raised.  NOTE(review): rows are appended on every sweep
        with no dedup against earlier sweeps — confirm intended.
        """
        if not self._db_path.exists():
            return
        try:
            conn = self._get_conn()
            for orphan in report.orphans:
                conn.execute(
                    "INSERT INTO vacuum_reports "
                    "(sweep_type, created_at, task_id, orphan_type, last_activity, "
                    "last_session_id, track, description, evidence, resolution) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                    (
                        report.sweep_type,
                        report.created_at,
                        orphan.task_id,
                        orphan.orphan_type,
                        orphan.last_activity,
                        orphan.last_session_id,
                        orphan.track,
                        orphan.description,
                        json.dumps(orphan.evidence),  # evidence stored as JSON text
                        orphan.resolution,
                    ),
                )
            conn.commit()
            conn.close()
        except sqlite3.Error as e:
            logger.warning(f"Failed to store vacuum report: {e}")
# -- Helpers -------------------------------------------------------------
@staticmethod
def _is_pid_alive(pid: int) -> bool:
"""Check if a process ID is still running."""
try:
os.kill(pid, 0)
return True
except (OSError, ProcessLookupError):
return False
@staticmethod
def _extract_track(task_id: str) -> str:
"""Extract track letter from task ID (e.g., 'H' from 'H.13.10')."""
if task_id and task_id[0].isalpha():
return task_id[0]
return "?"
@staticmethod
def _extract_track_from_filename(filename: str) -> str:
"""Extract track letter from TRACK filename."""
match = re.match(r"TRACK-([A-Z])-", filename)
return match.group(1) if match else "?"
@staticmethod
def _detect_session_id() -> Optional[str]:
"""Try to detect current session ID."""
# Check environment
sid = os.environ.get("CODITECT_SESSION_ID")
if sid:
return sid
# Fallback: construct from PID
return f"claude-{os.getpid()}"
# -- Formatting ----------------------------------------------------------
    def format_report(self, report: VacuumReport, compact: bool = False) -> str:
        """Format a vacuum report for terminal display.

        Args:
            report: The sweep result to render.
            compact: When True, return a single summary line (for /orient);
                otherwise a full multi-section report.
        """
        lines = []
        if compact:
            # One-liner for /orient
            n = report.summary.get("total_orphans", 0)
            if n == 0:
                return "Orphaned Tasks: 0"
            details = []
            # Summary keys differ by sweep type: quick sweeps use
            # dead_claims/stale_claims/conflicts, standard/deep use
            # dead_claims/stale_in_progress/conflict_abandoned.
            dc = report.summary.get("dead_claims", 0)
            sc = report.summary.get("stale_claims", report.summary.get("stale_in_progress", 0))
            cc = report.summary.get("conflicts", report.summary.get("conflict_abandoned", 0))
            if dc:
                details.append(f"{dc} dead claim{'s' if dc != 1 else ''}")
            if sc:
                details.append(f"{sc} stale")
            if cc:
                details.append(f"{cc} conflict{'s' if cc != 1 else ''}")
            return f"Orphaned Tasks: {n} ({', '.join(details)})"
        # Full report
        lines.append(f"Context Vacuum Report — {report.created_at[:10]}")
        lines.append("=" * 55)
        lines.append(f"Sweep: {report.sweep_type} | Duration: {report.duration_ms}ms")
        lines.append("")
        if report.orphans:
            lines.append("ORPHANED TASKS (action required):")
            for o in report.orphans:
                age = self._format_age(o.last_activity)
                type_tag = {
                    "dead_claim": "DEAD_CLAIM",
                    "stale_in_progress": f"STALE {age}",
                    "conflict_abandoned": "CONFLICT",
                }.get(o.orphan_type, o.orphan_type.upper())
                desc = o.description[:40] if o.description else ""
                session = f" ({o.last_session_id})" if o.last_session_id else ""
                lines.append(f" [{type_tag:18s}] {o.task_id:14s} {desc:40s} Last: {o.last_activity or 'unknown'}{session}")
        else:
            lines.append("ORPHANED TASKS: None found")
        if report.parked_tracks:
            lines.append("")
            lines.append("PARKED PROJECTS (no activity >30 days):")
            for p in report.parked_tracks:
                lines.append(
                    f" Track {p['track']:2s} {p['unchecked']:4d} tasks, "
                    f"{p['completion_pct']:3d}% done, last activity: {p['last_activity']}"
                )
        if report.active_tasks:
            lines.append("")
            lines.append(f"RECENTLY ACTIVE: {len(report.active_tasks)} tasks (no action needed)")
        lines.append("")
        total_unc = report.summary.get("total_unchecked", "?")
        lines.append(
            f"Summary: {len(report.orphans)} orphan{'s' if len(report.orphans) != 1 else ''}, "
            f"{len(report.parked_tracks)} parked track{'s' if len(report.parked_tracks) != 1 else ''}, "
            f"{total_unc} total unchecked"
        )
        lines.append(f"Run /vacuum --adopt <task_id> to claim an orphan")
        return "\n".join(lines)
@staticmethod
def _format_age(timestamp: Optional[str]) -> str:
"""Format a timestamp as relative age (e.g., '14d')."""
if not timestamp:
return "?"
try:
dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
delta = datetime.now(timezone.utc) - dt
if delta.days > 0:
return f"{delta.days}d"
hours = delta.seconds // 3600
if hours > 0:
return f"{hours}h"
return f"{delta.seconds // 60}m"
except (ValueError, TypeError):
return "?"
# -- Next-Task Integration (Phase 4) -------------------------------------
def get_orphan_priority_list(self) -> List[Dict[str, Any]]:
"""
Get orphaned tasks prioritized for /next-task.
Priority: dead_claim > conflict_abandoned > stale_in_progress
"""
report = self.quick_sweep()
if not report.orphans:
return []
priority_order = {
"dead_claim": 0,
"conflict_abandoned": 1,
"stale_in_progress": 2,
}
sorted_orphans = sorted(
report.orphans,
key=lambda o: priority_order.get(o.orphan_type, 99),
)
return [
{
"task_id": o.task_id,
"orphan_type": o.orphan_type,
"track": o.track,
"last_activity": o.last_activity,
"priority_reason": f"Orphan: {o.orphan_type}",
}
for o in sorted_orphans
]
# -- Session End Warning (Phase 5) ---------------------------------------
def check_uncompleted_tasks(self, session_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""
Check if the current session has uncompleted claimed tasks.
For /session-end integration — warns before ending session.
"""
sid = session_id or self._detect_session_id()
uncompleted = []
if not self._db_path.exists():
return uncompleted
try:
conn = self._get_conn()
rows = conn.execute(
"SELECT task_id, claimed_at FROM task_claims WHERE session_id = ?",
(sid,),
).fetchall()
conn.close()
for row in rows:
uncompleted.append({
"task_id": row["task_id"],
"claimed_at": row["claimed_at"],
"session_id": sid,
})
except sqlite3.Error:
pass
return uncompleted
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main():
    """CLI entry point.  Returns a process exit code (0 success, 1 failure)."""
    parser = argparse.ArgumentParser(
        description="CODITECT Context Vacuum — Orphan Task Detection (ADR-178)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--quick", action="store_true", help="Quick sweep (<2s, messaging.db only)")
    parser.add_argument("--deep", action="store_true", help="Deep sweep (full history + git)")
    parser.add_argument("--adopt", metavar="TASK_ID", help="Adopt an orphaned task")
    parser.add_argument("--defer", metavar="TASK_ID", help="Defer an orphaned task")
    parser.add_argument("--cancel", metavar="TASK_ID", help="Cancel an orphaned task")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--compact", action="store_true", help="Compact one-line output")
    parser.add_argument(
        "--core-dir", metavar="PATH", help="Path to coditect-core directory"
    )
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.WARNING,
        format="%(levelname)s: %(message)s",
    )
    vacuum = ContextVacuum(
        coditect_core_dir=Path(args.core_dir) if args.core_dir else None,
    )
    # Actions (first matching action wins; sweeps run only with no action)
    if args.adopt:
        ok = vacuum.adopt_task(args.adopt)
        print(f"{'Adopted' if ok else 'Failed to adopt'}: {args.adopt}")
        return 0 if ok else 1
    if args.defer:
        vacuum.defer_task(args.defer)
        print(f"Deferred: {args.defer}")
        return 0
    if args.cancel:
        # Destructive-ish: ask for confirmation before marking cancelled
        confirm = input(f"Cancel task {args.cancel}? This marks it as no longer needed. [y/N] ")
        if confirm.lower() != "y":
            print("Cancelled.")
            return 0
        vacuum.cancel_task(args.cancel)
        print(f"Cancelled: {args.cancel}")
        return 0
    # Sweeps
    if args.quick:
        report = vacuum.quick_sweep()
    elif args.deep:
        report = vacuum.deep_sweep()
    else:
        report = vacuum.standard_sweep()
    if args.json:
        output = {
            "sweep_type": report.sweep_type,
            "created_at": report.created_at,
            "duration_ms": report.duration_ms,
            "orphans": [asdict(o) for o in report.orphans],
            "parked_tracks": report.parked_tracks,
            "summary": report.summary,
        }
        print(json.dumps(output, indent=2))
    else:
        print(vacuum.format_report(report, compact=args.compact))
    return 0
# BUG FIX: was `if name == "main"`, which raised NameError and never matched.
if __name__ == "__main__":
    sys.exit(main())