#!/usr/bin/env python3
"""Update Project Plan Script - v2.0 with Auto-Discovery

Comprehensive script for updating PILOT project plans with intelligent
auto-discovery of session work from multiple data sources:

- task_tracking table - Direct task IDs with status
- task_messages table - Task ↔ message correlation
- TodoWrite state - Current in-memory todos
- Git status/diff - Files changed in session
- Context messages - Task ID pattern matching
- Session logs - Recent activity

Usage:
    # Auto-discover and update (RECOMMENDED)
    python3 update-project-plan.py --auto-discover --commit --session-log

    # Manual task specification
    python3 update-project-plan.py --tasks F.2.1 F.2.2 --commit

Features:
- Intelligent work discovery from context database
- Git change correlation with tasks
- Automatic track percentage calculation
- Session log integration
- Document classification
- Git commit workflow
"""
import argparse
import glob
import json
import os
import re
import sqlite3
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
# Task ID patterns used to extract IDs such as "A.1.1" from free text.
# Escapes reconstructed to match the intact patterns used elsewhere in
# this file (e.g. the cxq completion patterns).
TASK_ID_PATTERNS = [
    re.compile(r'\[([A-N]\.\d+(?:\.\d+)*)\]'),                  # [A.1.1]
    re.compile(r'([A-N]\.\d+(?:\.\d+)*):'),                     # A.1.1:
    re.compile(r'([A-N]\.\d+(?:\.\d+)*)\s'),                    # A.1.1 (space after)
    re.compile(r'^\s*-\s*\[[x ]\]\s*([A-N]\.\d+(?:\.\d+)*)'),   # - [x] A.1.1
]

# ADR-114 & ADR-118: Context database paths.
# task_tracking and task_messages tables are in sessions.db (Tier 3).
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR / "core"))
try:
    from paths import SESSIONS_DB, USER_DATA_DIR
    CONTEXT_DB_PATH = str(SESSIONS_DB)
    UNIFIED_MESSAGES_PATH = str(
        USER_DATA_DIR / "context-storage" / "unified_messages.jsonl"
    )
except ImportError:
    # Fallback to the conventional on-disk layout when core/paths.py is
    # not importable (e.g. running outside the installed tree).
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    CONTEXT_DB_PATH = str(_user_data / "sessions.db")
    UNIFIED_MESSAGES_PATH = str(_user_data / "unified_messages.jsonl")
class SessionWorkDiscovery:
    """Discovers session work from multiple data sources."""

    def __init__(self, verbose: bool = False):
        """Initialize an empty discovery state.

        Args:
            verbose: When True, log() prints diagnostic messages.
        """
        self.verbose = verbose
        # task_id -> merged metadata accumulated from all discovery sources.
        self.discovered_tasks: Dict[str, Dict[str, Any]] = {}
        # Files reported modified by `git status` for this session.
        self.modified_files: List[str] = []
        # Today's commits discovered via `git log`.
        self.git_commits: List[Dict[str, str]] = []
def log(self, message: str):
"""Log verbose messages."""
if self.verbose:
print(f" [discovery] {message}")
def discover_from_task_tracking(self) -> List[Dict[str, Any]]:
"""Query task_tracking table for today's tasks."""
if not os.path.exists(CONTEXT_DB_PATH):
self.log("Context database not found")
return []
try:
conn = sqlite3.connect(CONTEXT_DB_PATH)
conn.row_factory = sqlite3.Row
# Query tasks from today
query = """
SELECT
task_id,
task_description,
status,
started_at,
completed_at,
tool_success_count,
tool_error_count,
outcome
FROM task_tracking
WHERE date(started_at) >= date('now', '-1 day', 'localtime')
OR date(completed_at) >= date('now', '-1 day', 'localtime')
OR date(updated_at) >= date('now', '-1 day', 'localtime')
ORDER BY
CASE status
WHEN 'completed' THEN 1
WHEN 'in_progress' THEN 2
ELSE 3
END,
started_at DESC
"""
results = conn.execute(query).fetchall()
conn.close()
tasks = []
for row in results:
task = dict(row)
task['source'] = 'task_tracking'
tasks.append(task)
self.log(f"Found task: {task['task_id']} ({task['status']})")
return tasks
except Exception as e:
self.log(f"Error querying task_tracking: {e}")
return []
def discover_from_messages(self, hours_back: int = 24) -> List[str]:
"""Find task IDs mentioned in recent messages."""
if not os.path.exists(CONTEXT_DB_PATH):
return []
try:
conn = sqlite3.connect(CONTEXT_DB_PATH)
conn.row_factory = sqlite3.Row
# Query recent messages
cutoff = (datetime.now() - timedelta(hours=hours_back)).isoformat()
query = """
SELECT content, tool_use, tool_result
FROM messages
WHERE timestamp >= ?
ORDER BY timestamp DESC
LIMIT 1000
"""
results = conn.execute(query, (cutoff,)).fetchall()
conn.close()
task_ids: Set[str] = set()
for row in results:
content = row['content'] or ''
tool_use = row['tool_use'] or ''
tool_result = row['tool_result'] or ''
# Search all text fields
for text in [content, str(tool_use), str(tool_result)]:
for pattern in TASK_ID_PATTERNS:
matches = pattern.findall(text)
for match in matches:
if self._is_valid_task_id(match):
task_ids.add(match)
self.log(f"Found task mention: {match}")
return list(task_ids)
except Exception as e:
self.log(f"Error querying messages: {e}")
return []
def discover_from_cxq(
self, limit: int = 500, run_cx_first: bool = True
) -> List[Dict[str, Any]]:
"""Query recent context using /cxq to verify work accomplished."""
tasks: List[Dict[str, Any]] = []
cx_script = os.path.expanduser("~/.coditect/scripts/cx.py")
# context-db.py is the actual /cxq implementation
cxq_script = os.path.expanduser("~/.coditect/scripts/context-db.py")
# Run /cx first to process any exports
if run_cx_first and os.path.exists(cx_script):
self.log("Running /cx to process exports first...")
try:
cx_result = subprocess.run(
["python3", cx_script],
capture_output=True,
text=True,
timeout=60
)
if cx_result.returncode == 0:
# Extract new message count if available
if "New messages:" in cx_result.stdout:
self.log("Context extraction complete")
else:
self.log("Context extraction complete (no new)")
else:
self.log(f"cx warning: {cx_result.stderr[:50]}")
except subprocess.TimeoutExpired:
self.log("cx timed out, continuing with cxq")
except Exception as e:
self.log(f"cx error: {e}, continuing with cxq")
if not os.path.exists(cxq_script):
self.log("context-db.py not found, skipping context query")
return tasks
try:
# Run context-db.py --recent N to get recent messages
self.log(f"Querying context: context-db.py --recent {limit}")
result = subprocess.run(
["python3", cxq_script, "--recent", str(limit), "--full"],
capture_output=True,
text=True,
timeout=60
)
if result.returncode != 0:
self.log(f"cxq failed: {result.stderr[:100]}")
return tasks
content = result.stdout
# Parse for task completions
# Look for patterns like:
# "completed F.2.1", "F.2.1 complete", "✅ F.2.1",
# "marked F.2.1 complete", "finished F.2.1"
completion_patterns = [
# "completed F.2.1" or "F.2.1 completed"
re.compile(
r'(?:completed|finished|done)\s+([A-N]\.\d+(?:\.\d+)*)',
re.IGNORECASE
),
re.compile(
r'([A-N]\.\d+(?:\.\d+)*)\s+(?:completed?|finished|done)',
re.IGNORECASE
),
# "✅ F.2.1" or "F.2.1 ✅"
re.compile(r'✅\s*([A-N]\.\d+(?:\.\d+)*)'),
re.compile(r'([A-N]\.\d+(?:\.\d+)*)\s*✅'),
# "marked F.2.1 complete"
re.compile(
r'marked?\s+([A-N]\.\d+(?:\.\d+)*)\s+complete',
re.IGNORECASE
),
# "[x] F.2.1"
re.compile(r'\[x\]\s*([A-N]\.\d+(?:\.\d+)*)'),
# "F.2.1 COMPLETE" or "F.2.1: COMPLETE"
re.compile(
r'([A-N]\.\d+(?:\.\d+)*)[:\s]+COMPLETE',
re.IGNORECASE
),
]
found_completed: Set[str] = set()
for pattern in completion_patterns:
matches = pattern.findall(content)
for match in matches:
if self._is_valid_task_id(match):
found_completed.add(match)
for task_id in found_completed:
tasks.append({
'task_id': task_id,
'status': 'completed',
'source': 'cxq_context',
'verified': True
})
self.log(f"cxq verified completed: {task_id}")
# Also look for WIP mentions
wip_patterns = [
re.compile(
r'working on\s+([A-N]\.\d+(?:\.\d+)*)',
re.IGNORECASE
),
re.compile(
r'([A-N]\.\d+(?:\.\d+)*)\s+in progress',
re.IGNORECASE
),
re.compile(
r'started\s+([A-N]\.\d+(?:\.\d+)*)',
re.IGNORECASE
),
]
found_wip: Set[str] = set()
for pattern in wip_patterns:
matches = pattern.findall(content)
for match in matches:
if (self._is_valid_task_id(match) and
match not in found_completed):
found_wip.add(match)
for task_id in found_wip:
tasks.append({
'task_id': task_id,
'status': 'in_progress',
'source': 'cxq_context'
})
self.log(f"cxq found WIP: {task_id}")
except subprocess.TimeoutExpired:
self.log("cxq timed out")
except Exception as e:
self.log(f"Error running cxq: {e}")
return tasks
def discover_from_git(self, repo_path: str = ".") -> Tuple[List[str], List[Dict[str, str]]]:
"""Get git status and recent commits."""
modified_files = []
commits = []
try:
# Get modified files
result = subprocess.run(
["git", "status", "--porcelain"],
capture_output=True,
text=True,
cwd=repo_path
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if line.strip():
# Parse git status line: "M path/to/file"
status = line[:2].strip()
filepath = line[3:].strip()
if status in ('M', 'A', 'MM', 'AM', '??'):
modified_files.append(filepath)
self.log(f"Modified file: {filepath}")
# Get today's commits
result = subprocess.run(
["git", "log", "--since=midnight", "--format=%H|%s|%an", "--all"],
capture_output=True,
text=True,
cwd=repo_path
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if line.strip() and '|' in line:
parts = line.split('|', 2)
if len(parts) >= 2:
commits.append({
'sha': parts[0][:8],
'message': parts[1],
'author': parts[2] if len(parts) > 2 else ''
})
self.log(f"Commit: {parts[0][:8]} - {parts[1][:50]}")
except Exception as e:
self.log(f"Error querying git: {e}")
self.modified_files = modified_files
self.git_commits = commits
return modified_files, commits
def discover_from_todowrite(self, todos_json: str = None) -> List[Dict[str, Any]]:
"""Parse current TodoWrite state."""
tasks = []
# Try to read from environment or passed JSON
todos_data = todos_json or os.environ.get('CLAUDE_TODOS', '')
if not todos_data:
# Try to find recent todo file
todo_patterns = [
os.path.expanduser("~/.claude/todos/*.json"),
".claude/todos/*.json"
]
for pattern in todo_patterns:
matches = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
if matches:
try:
with open(matches[0]) as f:
todos_data = f.read()
self.log(f"Found todos file: {matches[0]}")
break
except:
pass
if todos_data:
try:
todos = json.loads(todos_data)
for todo in todos:
content = todo.get('content', '')
status = todo.get('status', 'unknown')
# Extract task ID
for pattern in TASK_ID_PATTERNS:
match = pattern.search(content)
if match:
task_id = match.group(1)
tasks.append({
'task_id': task_id,
'task_description': content,
'status': status,
'source': 'todowrite'
})
self.log(f"TodoWrite task: {task_id} ({status})")
break
except json.JSONDecodeError:
self.log("Failed to parse todos JSON")
return tasks
def discover_from_session_log(self, session_log_dir: str = "docs/session-logs") -> List[Dict[str, Any]]:
"""Find task IDs mentioned in today's session log with completion status."""
tasks: List[Dict[str, Any]] = []
today = datetime.now().strftime("%Y-%m-%d")
log_path = Path(session_log_dir) / f"SESSION-LOG-{today}.md"
if log_path.exists():
try:
content = log_path.read_text()
lines = content.split('\n')
# Pattern for completed task in table: | F.2.1 | description | ✅ |
table_completed_pattern = re.compile(r'\|\s*([A-N]\.\d+(?:\.\d+)*)\s*\|[^|]*\|\s*✅\s*\|')
# Pattern for completed task in list: - [x] F.2.1 or F.2.1: ... ✅
completed_patterns = [
re.compile(r'^\s*-\s*\[x\]\s*([A-N]\.\d+(?:\.\d+)*)'), # - [x] F.2.1
re.compile(r'([A-N]\.\d+(?:\.\d+)*)[^|✅]*✅'), # F.2.1 ... ✅
re.compile(r'\|\s*([A-N]\.\d+(?:\.\d+)*)\s*\|.*\|\s*100%\s*✅?\s*\|'), # | F.2.1 | ... | 100% ✅ |
]
# Pattern for "Tasks Completed:" section
in_completed_section = False
found_completed: Set[str] = set()
for i, line in enumerate(lines):
# Check for completed section header
if re.search(r'\*\*Tasks Completed\*\*|\*\*Completed\*\*|Tasks Completed:', line, re.IGNORECASE):
in_completed_section = True
continue
# Exit completed section on next header
if in_completed_section and line.startswith('##'):
in_completed_section = False
# Check table pattern
match = table_completed_pattern.search(line)
if match:
task_id = match.group(1)
if self._is_valid_task_id(task_id) and task_id not in found_completed:
found_completed.add(task_id)
self.log(f"Session log completed task (table): {task_id}")
continue
# Check completed patterns
for pattern in completed_patterns:
match = pattern.search(line)
if match:
task_id = match.group(1)
if self._is_valid_task_id(task_id) and task_id not in found_completed:
found_completed.add(task_id)
self.log(f"Session log completed task: {task_id}")
break
# In completed section, any task ID is completed
if in_completed_section:
for pattern in TASK_ID_PATTERNS:
match = pattern.search(line)
if match:
task_id = match.group(1)
if self._is_valid_task_id(task_id) and task_id not in found_completed:
found_completed.add(task_id)
self.log(f"Session log completed task (section): {task_id}")
break
# Create task entries for all found completed tasks
for task_id in found_completed:
tasks.append({
'task_id': task_id,
'status': 'completed',
'source': 'session_log',
'completion_date': today
})
except Exception as e:
self.log(f"Error reading session log: {e}")
return tasks
def correlate_files_with_tasks(self, pilot_plan_path: Path) -> Dict[str, List[str]]:
"""Correlate modified files with tasks in PILOT plan."""
file_to_tasks: Dict[str, List[str]] = {}
if not pilot_plan_path.exists():
return file_to_tasks
try:
content = pilot_plan_path.read_text()
for filepath in self.modified_files:
# Find tasks that mention this file
# Pattern: `path/to/file` or path/to/file in task description
escaped_path = re.escape(filepath)
pattern = rf"- \[[x ]\] ([A-N]\.\d+(?:\.\d+)*)[^\n]*{escaped_path}"
matches = re.findall(pattern, content)
if matches:
file_to_tasks[filepath] = matches
self.log(f"File {filepath} relates to tasks: {matches}")
except Exception as e:
self.log(f"Error correlating files: {e}")
return file_to_tasks
def discover_all(
self,
repo_path: str = ".",
session_log_dir: str = "docs/session-logs",
cxq_limit: int = 500
) -> Dict[str, Dict[str, Any]]:
"""Run all discovery methods and merge results."""
self.log("Starting auto-discovery...")
# 1. Task tracking table (highest priority - database truth)
for task in self.discover_from_task_tracking():
task_id = task['task_id']
if task_id not in self.discovered_tasks:
self.discovered_tasks[task_id] = task
else:
self.discovered_tasks[task_id].update(task)
# 2. CXQ context query (verified from recent session context)
self.log(f"Querying cxq --recent {cxq_limit} for verification...")
for task in self.discover_from_cxq(limit=cxq_limit):
task_id = task['task_id']
if task_id not in self.discovered_tasks:
self.discovered_tasks[task_id] = task
elif task.get('verified') and task.get('status') == 'completed':
# cxq verified completion is authoritative
self.discovered_tasks[task_id]['status'] = 'completed'
self.discovered_tasks[task_id]['verified'] = True
self.discovered_tasks[task_id]['source'] = 'cxq_context'
# 3. TodoWrite state
for task in self.discover_from_todowrite():
task_id = task['task_id']
if task_id not in self.discovered_tasks:
self.discovered_tasks[task_id] = task
elif task['status'] == 'completed':
# TodoWrite completion is authoritative
self.discovered_tasks[task_id]['status'] = 'completed'
# 4. Recent messages (direct DB query)
for task_id in self.discover_from_messages():
if task_id not in self.discovered_tasks:
self.discovered_tasks[task_id] = {
'task_id': task_id,
'status': 'mentioned',
'source': 'messages'
}
# 5. Session log (parsed from today's log file)
for task in self.discover_from_session_log(session_log_dir):
task_id = task['task_id']
if task_id not in self.discovered_tasks:
self.discovered_tasks[task_id] = task
elif task.get('status') == 'completed':
# Session log completion is authoritative
self.discovered_tasks[task_id]['status'] = 'completed'
self.discovered_tasks[task_id]['source'] = 'session_log'
# 6. Git changes (modified files and commits)
self.discover_from_git(repo_path)
self.log(
f"Discovered {len(self.discovered_tasks)} tasks, "
f"{len(self.modified_files)} modified files"
)
return self.discovered_tasks
def _is_valid_task_id(self, task_id: str) -> bool:
"""Validate task ID format."""
# Must match pattern: A.1 or A.1.1 or A.1.1.1
pattern = re.compile(r'^[A-N]\.\d+(?:\.\d+)*$')
return bool(pattern.match(task_id))
def get_completed_tasks(self) -> List[str]:
"""Get list of completed task IDs."""
return [
task_id for task_id, data in self.discovered_tasks.items()
if data.get('status') == 'completed'
]
def get_in_progress_tasks(self) -> List[str]:
"""Get list of in-progress task IDs."""
return [
task_id for task_id, data in self.discovered_tasks.items()
if data.get('status') == 'in_progress'
]
def get_summary(self) -> Dict[str, Any]:
"""Get discovery summary."""
return {
'total_tasks': len(self.discovered_tasks),
'completed': len(self.get_completed_tasks()),
'in_progress': len(self.get_in_progress_tasks()),
'modified_files': len(self.modified_files),
'git_commits': len(self.git_commits),
'tasks': self.discovered_tasks,
'files': self.modified_files,
'commits': self.git_commits
}
def verify_against_plan(self, plan_path: Path) -> Dict[str, Dict[str, Any]]:
"""Verify discovered tasks against actual PILOT plan status."""
if not plan_path.exists():
return self.discovered_tasks
try:
content = plan_path.read_text()
for task_id in self.discovered_tasks:
task_data = self.discovered_tasks[task_id]
# Check if task exists and its status in the plan
# Pattern for completed: - [x] F.2.1...
completed_pattern = rf'- \[x\] {re.escape(task_id)}[:\s]'
# Pattern for pending: - [ ] F.2.1...
pending_pattern = rf'- \[ \] {re.escape(task_id)}[:\s]'
if re.search(completed_pattern, content):
task_data['plan_status'] = 'completed'
task_data['in_plan'] = True
self.log(f"Plan verified: {task_id} is COMPLETE")
elif re.search(pending_pattern, content):
task_data['plan_status'] = 'pending'
task_data['in_plan'] = True
self.log(f"Plan verified: {task_id} is PENDING")
else:
# Try without colon/space requirement
if re.search(rf'- \[x\] {re.escape(task_id)}', content):
task_data['plan_status'] = 'completed'
task_data['in_plan'] = True
elif re.search(rf'- \[ \] {re.escape(task_id)}', content):
task_data['plan_status'] = 'pending'
task_data['in_plan'] = True
else:
task_data['plan_status'] = 'not_found'
task_data['in_plan'] = False
self.log(f"Plan verified: {task_id} NOT IN PLAN")
# Determine action needed
if task_data.get('in_plan'):
if task_data['plan_status'] == 'completed':
task_data['action'] = 'none' # Already done
elif task_data.get('status') == 'completed':
task_data['action'] = 'mark_complete' # Needs marking
else:
task_data['action'] = 'wip' # Work in progress
else:
task_data['action'] = 'not_in_plan'
except Exception as e:
self.log(f"Error verifying against plan: {e}")
return self.discovered_tasks
def get_tasks_needing_update(self) -> List[str]:
"""Get tasks that need to be marked complete in plan."""
return [
task_id for task_id, data in self.discovered_tasks.items()
if data.get('action') == 'mark_complete'
]
def get_wip_tasks(self) -> List[str]:
"""Get tasks that are work-in-progress."""
return [
task_id for task_id, data in self.discovered_tasks.items()
if data.get('action') == 'wip' or
(data.get('in_plan') and data.get('plan_status') == 'pending')
]
def get_already_complete_tasks(self) -> List[str]:
"""Get tasks already marked complete in plan."""
return [
task_id for task_id, data in self.discovered_tasks.items()
if data.get('plan_status') == 'completed'
]
class ProjectPlanUpdater:
    """Updates PILOT project plans with task completion and progress tracking."""

    def __init__(self, dry_run: bool = False, verbose: bool = False):
        """Initialize the updater.

        Args:
            dry_run: When True, no files are written or committed.
            verbose: Passed through to the discovery engine.
        """
        self.dry_run = dry_run
        self.verbose = verbose
        # Resolved PILOT plan path (set by find_pilot_plan()).
        self.plan_path: Optional[Path] = None
        # Human-readable change log rendered by print_summary().
        self.changes_made: List[str] = []
        # Files queued for staging in git_commit().
        self.files_modified: List[Path] = []
        # Auto-discovery engine used by auto_discover_work().
        self.discovery = SessionWorkDiscovery(verbose=verbose)
def log(self, message: str, level: str = "info"):
"""Log a message."""
prefix = {"info": "ℹ", "success": "✓", "error": "✗", "warn": "⚠", "discovery": "🔍"}
print(f" {prefix.get(level, '•')} {message}")
def find_pilot_plan(self, start_dir: str = ".") -> Optional[Path]:
"""Find the PILOT plan file."""
patterns = [
# ADR-213: coditect-documentation is the primary location
"../docs/coditect-documentation/coditect-core/project/plans/PILOT-PARALLEL-EXECUTION-PLAN.md",
# Legacy locations (fallback)
"internal/project/plans/PILOT-PARALLEL-EXECUTION-PLAN.md",
"internal/project/plans/PILOT*.md",
"**/PILOT-PARALLEL-EXECUTION-PLAN.md",
"**/PILOT*.md"
]
for pattern in patterns:
matches = glob.glob(os.path.join(start_dir, pattern), recursive=True)
if matches:
self.plan_path = Path(matches[0])
return self.plan_path
return None
def auto_discover_work(
self,
repo_path: str = ".",
session_log_dir: str = "docs/session-logs",
cxq_limit: int = 500
) -> Dict[str, Any]:
"""Auto-discover session work and verify against PILOT plan."""
self.log("Auto-discovering session work...", "discovery")
# Run discovery with cxq limit
self.discovery.discover_all(
repo_path, session_log_dir, cxq_limit=cxq_limit
)
# Correlate with PILOT plan
if self.plan_path:
self.discovery.correlate_files_with_tasks(self.plan_path)
# Verify status against actual plan
self.log("Verifying against PILOT plan...", "discovery")
self.discovery.verify_against_plan(self.plan_path)
summary = self.discovery.get_summary()
# Calculate verification stats
needs_update = self.discovery.get_tasks_needing_update()
already_done = self.discovery.get_already_complete_tasks()
wip_tasks = self.discovery.get_wip_tasks()
# Print discovery results
print("\n" + "=" * 50)
print(" DISCOVERY & VERIFICATION RESULTS")
print("=" * 50)
print(f"\n Tasks Found: {summary['total_tasks']}")
print(f" Modified Files: {summary['modified_files']}")
print(f" Git Commits: {summary['git_commits']}")
print("\n Verification Status:")
print(f" ✅ Already complete in plan: {len(already_done)}")
print(f" 🔄 Need to mark complete: {len(needs_update)}")
print(f" 📝 Work in progress (WIP): {len(wip_tasks)}")
if already_done:
print("\n Already Complete (no action needed):")
for task_id in sorted(already_done):
print(f" ✅ {task_id}")
if needs_update:
print("\n Need to Mark Complete:")
for task_id in sorted(needs_update):
data = self.discovery.discovered_tasks.get(task_id, {})
src = data.get('source', 'unknown')
print(f" 🔄 {task_id} (source: {src})")
if wip_tasks:
print("\n Work in Progress:")
for task_id in sorted(wip_tasks):
print(f" 📝 {task_id}")
# Show tasks not in plan
not_in_plan = [
tid for tid, d in self.discovery.discovered_tasks.items()
if d.get('action') == 'not_in_plan'
]
if not_in_plan:
print("\n Not Found in Plan (may be section headers):")
for task_id in sorted(not_in_plan):
print(f" ⚠ {task_id}")
print("=" * 50 + "\n")
# Add needs_update to summary for use by caller
summary['needs_update'] = needs_update
summary['already_done'] = already_done
summary['wip_tasks'] = wip_tasks
return summary
def mark_tasks_complete(self, tasks: List[str], details: Dict[str, str] = None) -> bool:
"""Mark tasks as complete in the PILOT plan."""
if not self.plan_path or not self.plan_path.exists():
self.log("PILOT plan not found", "error")
return False
content = self.plan_path.read_text()
original_content = content
date_str = datetime.now().strftime("%b %d, %Y")
details = details or {}
for task_id in tasks:
# Pattern: - [ ] F.2.1: Description or - [ ] F.2.1 Description
pattern = rf"(- \[ \] {re.escape(task_id)}[:\s]+)([^\n]*)"
match = re.search(pattern, content)
if match:
description = match.group(2).strip()
detail_text = details.get(task_id, "")
# Build replacement
replacement = f"- [x] {task_id}: {description} ✅ ({date_str})"
if detail_text:
replacement += f"\n - **Details:** {detail_text}"
content = re.sub(pattern, replacement, content, count=1)
self.changes_made.append(f"Marked {task_id} complete")
self.log(f"Marked {task_id} complete", "success")
else:
# Try simpler patterns
patterns_to_try = [
rf"(- \[ \] {re.escape(task_id)})",
rf"(- \[ \] \[?{re.escape(task_id)}\]?)",
]
found = False
for pat in patterns_to_try:
if re.search(pat, content):
content = re.sub(
pat,
f"- [x] {task_id} ✅ ({date_str})",
content,
count=1
)
self.changes_made.append(f"Marked {task_id} complete")
self.log(f"Marked {task_id} complete", "success")
found = True
break
if not found:
self.log(f"Task {task_id} not found in plan", "warn")
if content != original_content:
if not self.dry_run:
self.plan_path.write_text(content)
self.files_modified.append(self.plan_path)
return True
return False
def calculate_track_percentage(self, track: str) -> Tuple[int, int, int]:
"""Calculate track completion percentage from checkbox counts."""
if not self.plan_path or not self.plan_path.exists():
return (0, 0, 0)
content = self.plan_path.read_text()
# Find track section
track_headers = {
"A": r"## Track A:",
"B": r"## Track B:",
"C": r"## Track C:",
"D": r"## Track D:",
"E": r"## Track E:",
"F": r"## Track F:",
"G": r"## Track G:",
}
header_pattern = track_headers.get(track.upper())
if not header_pattern:
return (0, 0, 0)
# Find section start
match = re.search(header_pattern, content)
if not match:
return (0, 0, 0)
section_start = match.start()
# Find next track section
next_track_pattern = r"## Track [A-N]:"
next_match = re.search(next_track_pattern, content[section_start + 10:])
section_end = section_start + 10 + next_match.start() if next_match else len(content)
section = content[section_start:section_end]
# Count checkboxes
completed = len(re.findall(r'- \[x\]', section))
incomplete = len(re.findall(r'- \[ \]', section))
total = completed + incomplete
if total == 0:
return (0, 0, 0)
percentage = int((completed / total) * 100)
return (percentage, completed, total)
def update_track_percentage(
self,
track: str,
new_percent: int = None,
allow_decrease: bool = False
) -> bool:
"""Update track percentage in status table.
Args:
track: Track letter (A-N)
new_percent: New percentage (auto-calculated if None)
allow_decrease: If False, won't decrease percentage (prevents regression)
"""
if not self.plan_path or not self.plan_path.exists():
return False
content = self.plan_path.read_text()
# Calculate if not provided
if new_percent is None:
new_percent, completed, total = self.calculate_track_percentage(track)
if total == 0:
self.log(f"Could not calculate percentage for track {track}", "warn")
return False
# Track names in status table
track_names = {
"A": "Backend|License Server",
"B": "Frontend",
"C": "DevOps|Production Deployment",
"D": "Security",
"E": "Testing|Integration Testing",
"F": "Documentation",
"G": "DMS"
}
track_pattern = track_names.get(track.upper(), track)
pattern = rf"\| ({track_pattern}[^|]*)\| (\d+)% \| (\d+)% \|"
match = re.search(pattern, content)
if match:
old_percent = int(match.group(2))
# Don't decrease percentage unless explicitly allowed
# This prevents auto-calculation from overriding manually-set progress
if new_percent < old_percent and not allow_decrease:
self.log(
f"Track {track}: Keeping {old_percent}% "
f"(calculated {new_percent}% but not decreasing)",
"info"
)
return False
# Don't update if same
if new_percent == old_percent:
self.log(f"Track {track}: Already at {old_percent}%", "info")
return False
remaining = 100 - new_percent
replacement = f"| {match.group(1)}| {new_percent}% | {remaining}% |"
content = re.sub(pattern, replacement, content, count=1)
if not self.dry_run:
self.plan_path.write_text(content)
self.changes_made.append(f"Track {track}: {old_percent}% → {new_percent}%")
self.log(f"Track {track}: {old_percent}% → {new_percent}%", "success")
return True
else:
self.log(f"Track {track} not found in status table", "warn")
return False
def add_progress_update(self, update_text: str) -> bool:
"""Add a progress update entry to the plan."""
if not self.plan_path or not self.plan_path.exists():
return False
content = self.plan_path.read_text()
date_str = datetime.now().strftime("%b %d")
# Find existing progress updates section
pattern = r"(\*\*Progress Update \([^)]+\):\*\*)"
match = re.search(pattern, content)
if match:
# Insert new update before the first existing one
new_update = f"**Progress Update ({date_str}):** {update_text}\n\n"
content = content[:match.start()] + new_update + content[match.start():]
if not self.dry_run:
self.plan_path.write_text(content)
self.changes_made.append("Added progress update entry")
self.log("Added progress update entry", "success")
return True
return False
def increment_version(self) -> Tuple[str, str]:
"""Increment document version number."""
if not self.plan_path or not self.plan_path.exists():
return ("", "")
content = self.plan_path.read_text()
# Pattern: **Document Version:** 1.9.3
pattern = r"\*\*Document Version:\*\* (\d+)\.(\d+)\.(\d+)"
match = re.search(pattern, content)
if match:
major, minor, patch = int(match.group(1)), int(match.group(2)), int(match.group(3))
old_version = f"{major}.{minor}.{patch}"
new_version = f"{major}.{minor}.{patch + 1}"
content = re.sub(pattern, f"**Document Version:** {new_version}", content)
# Also update the Updated date
today = datetime.now().strftime("%B %d, %Y")
content = re.sub(
r"\*\*Updated:\*\* [^\n]+",
f"**Updated:** {today}",
content
)
# Update status header date
today_short = datetime.now().strftime("%b %d, %Y")
content = re.sub(
r"### Current Status \([^)]+\)",
f"### Current Status ({today_short})",
content
)
if not self.dry_run:
self.plan_path.write_text(content)
self.changes_made.append(f"Version: {old_version} → {new_version}")
self.log(f"Version: {old_version} → {new_version}", "success")
return (old_version, new_version)
return ("", "")
def update_session_log(self, tasks: List[str], summary: str = "", session_log_dir: str = "docs/session-logs") -> bool:
"""Update today's session log."""
today = datetime.now().strftime("%Y-%m-%d")
session_log_patterns = [
f"{session_log_dir}/SESSION-LOG-{today}.md",
f"**/SESSION-LOG-{today}.md"
]
session_log_path = None
for pattern in session_log_patterns:
matches = glob.glob(pattern, recursive=True)
if matches:
session_log_path = Path(matches[0])
break
if not session_log_path or not session_log_path.exists():
self.log(f"Session log not found for {today}", "warn")
return False
content = session_log_path.read_text()
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
# Build entry
entry = f"\n### {timestamp} - Auto-Update via project-plan-updater\n\n"
entry += "**Tasks Updated:**\n\n"
entry += "| Task ID | Status |\n"
entry += "|---------|--------|\n"
for task in tasks:
entry += f"| {task} | ✅ |\n"
if summary:
entry += f"\n**Summary:** {summary}\n"
if self.discovery.modified_files:
entry += "\n**Files Modified:**\n"
for f in self.discovery.modified_files[:10]: # Limit to 10
entry += f"- `{f}`\n"
# Append to content
if "---\n" in content and content.rstrip().endswith("---"):
content = content.rstrip()[:-3] + entry + "\n---\n"
else:
content += entry
if not self.dry_run:
session_log_path.write_text(content)
self.files_modified.append(session_log_path)
self.log(f"Updated session log: {session_log_path.name}", "success")
return True
def classify_files(self, files: List[str]) -> bool:
"""Classify files using MoE classifier."""
classifier_script = os.path.expanduser("~/.coditect/scripts/moe_classifier/classify.py")
if not os.path.exists(classifier_script):
self.log("MoE classifier not found", "warn")
return False
for file_path in files:
if not os.path.exists(file_path):
continue
if not self.dry_run:
try:
result = subprocess.run(
["python3", classifier_script, file_path, "--update-frontmatter"],
capture_output=True,
text=True
)
if result.returncode == 0:
self.log(f"Classified: {file_path}", "success")
else:
self.log(f"Classification failed: {file_path}", "warn")
except Exception as e:
self.log(f"Classification error: {e}", "error")
else:
self.log(f"Would classify: {file_path}", "info")
return True
def git_commit(self, message: str, push: bool = False) -> bool:
    """Stage the session's modified files and create a git commit.

    Args:
        message: Commit message; a Co-Authored-By trailer is appended.
        push: When True, also push the new commit to the remote.

    Returns:
        True when a commit was created (or would be, in dry-run mode);
        False when there was nothing to commit or git reported an error.
    """
    # Guard: nothing staged means nothing to do.
    if not self.files_modified:
        self.log("No files to commit", "info")
        return False
    if self.dry_run:
        self.log(f"Would commit: {message}", "info")
        return True
    try:
        # Stage every file touched during this run.
        for path in self.files_modified:
            subprocess.run(["git", "add", str(path)], check=True, capture_output=True)
        commit_msg = f"{message}\n\nCo-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>"
        commit = subprocess.run(
            ["git", "commit", "-m", commit_msg],
            capture_output=True,
            text=True,
        )
        if commit.returncode != 0:
            # Distinguish "clean tree" from a genuine failure.
            if "nothing to commit" in commit.stdout:
                self.log("No changes to commit", "info")
            else:
                self.log(f"Commit failed: {commit.stderr}", "error")
            return False
        # Report the short hash of the commit we just made.
        rev = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True,
            text=True,
        )
        self.log(f"Created commit: {rev.stdout.strip()}", "success")
        if push:
            pushed = subprocess.run(["git", "push"], capture_output=True, text=True)
            if pushed.returncode == 0:
                self.log("Pushed to remote", "success")
            else:
                # Push failure is logged but the commit itself succeeded.
                self.log(f"Push failed: {pushed.stderr}", "error")
        return True
    except Exception as e:
        self.log(f"Git error: {e}", "error")
        return False
def print_summary(self):
    """Print a formatted summary of the plan path, changes, and files touched."""
    rule = "=" * 50
    print("\n" + rule)
    print(" PROJECT PLAN UPDATE SUMMARY")
    print(rule)
    if self.plan_path:
        print(f"\n Plan: {self.plan_path}")
    if not self.changes_made:
        print("\n No changes made")
    else:
        print("\n Changes:")
        for item in self.changes_made:
            print(f" • {item}")
    if self.files_modified:
        print("\n Files Modified:")
        for path in self.files_modified:
            print(f" • {path}")
    print("\n" + rule)
def main() -> int:
    """CLI entry point: parse arguments and run the plan-update workflow.

    Returns:
        Process exit code: 0 on success, 1 on fatal errors (plan not
        found, invalid ``--details`` JSON).
    """
    parser = argparse.ArgumentParser(
        description="Update PILOT project plan with intelligent auto-discovery"
    )
    # Discovery options
    parser.add_argument("--auto-discover", "-a", action="store_true",
                        help="Auto-discover session work from all sources")
    parser.add_argument("--discover-only", action="store_true",
                        help="Only run discovery, don't update plan")
    parser.add_argument("--cxq-limit", type=int, default=1000,
                        help="Number of recent messages to query (default: 1000)")
    parser.add_argument("--skip-cx", action="store_true",
                        help="Skip running /cx before /cxq")
    # Manual options
    parser.add_argument("--tasks", "-t", nargs="+", help="Task IDs to mark complete")
    parser.add_argument("--details", "-d", help="Completion details as JSON")
    parser.add_argument("--track", help="Track letter for percentage update")
    parser.add_argument("--percent", "-p", type=int, help="New track percentage (auto-calculated if omitted)")
    parser.add_argument("--progress", help="Progress update text")
    parser.add_argument("--allow-decrease", action="store_true",
                        help="Allow track percentage to decrease (default: only increase)")
    parser.add_argument("--skip-percent-update", action="store_true",
                        help="Skip automatic track percentage updates")
    # Workflow options
    parser.add_argument("--commit", "-c", action="store_true", help="Create git commit")
    parser.add_argument("--push", action="store_true", help="Push to remote")
    parser.add_argument("--session-log", "-s", action="store_true", help="Update session log")
    parser.add_argument("--classify", nargs="+", help="Files to classify")
    parser.add_argument("--message", "-m", help="Commit message")
    # Output options
    parser.add_argument("--dry-run", "-n", action="store_true", help="Preview changes")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--json", "-j", action="store_true", help="Output discovery as JSON")
    args = parser.parse_args()

    print("🔄 Project Plan Updater v2.0")
    updater = ProjectPlanUpdater(dry_run=args.dry_run, verbose=args.verbose)

    # Find PILOT plan — everything else depends on it.
    if not updater.find_pilot_plan():
        print(" ✗ PILOT plan not found")
        return 1
    print(f" ℹ Found: {updater.plan_path}")

    # Auto-discovery mode: correlate session work from all data sources.
    if args.auto_discover or args.discover_only:
        # Configure discovery options
        updater.discovery.cxq_limit = args.cxq_limit
        updater.discovery.run_cx_first = not args.skip_cx
        summary = updater.auto_discover_work(
            cxq_limit=args.cxq_limit
        )
        if args.json:
            print(json.dumps(summary, indent=2, default=str))
            return 0
        if args.discover_only:
            return 0
        # Use only tasks that NEED to be marked complete
        # (verified completed but not yet marked in plan).
        needs_update = summary.get('needs_update', [])
        if needs_update:
            args.tasks = needs_update
            print(f"\n ✓ Found {len(needs_update)} tasks to mark complete")
        else:
            # Check if there are already-complete tasks
            already_done = summary.get('already_done', [])
            if already_done:
                print(f"\n ✓ {len(already_done)} tasks already complete in plan")
            else:
                print(" ℹ No tasks need updating")

    # Auto-calculate track percentages for affected tracks (unless skipped).
    if not args.skip_percent_update:
        tracks_with_tasks = set()
        for task_id in (args.tasks or []):
            if '.' in task_id:
                # Track letter is the leading segment, e.g. "F" in "F.2.1".
                track = task_id.split('.')[0]
                tracks_with_tasks.add(track)
        for track in tracks_with_tasks:
            pct, completed, total = updater.calculate_track_percentage(track)
            if total > 0:
                updater.update_track_percentage(
                    track, pct, allow_decrease=args.allow_decrease
                )
    else:
        print(" ℹ Skipping track percentage updates (--skip-percent-update)")

    # Parse details JSON
    details = {}
    if args.details:
        try:
            details = json.loads(args.details)
        except json.JSONDecodeError:
            print(" ✗ Invalid JSON in --details")
            return 1

    # Mark tasks complete
    if args.tasks:
        print("\n📋 Marking tasks complete")
        updater.mark_tasks_complete(args.tasks, details)

    # Update track percentage (manual override).
    # BUG FIX: compare against None so "--percent 0" is honored; the old
    # truthiness test (`args.track and args.percent`) silently skipped 0.
    if args.track and args.percent is not None:
        print(f"\n📊 Updating track {args.track} percentage")
        updater.update_track_percentage(
            args.track, args.percent, allow_decrease=args.allow_decrease
        )

    # Add progress update (explicit text wins over the auto-generated one).
    if args.progress:
        print("\n📝 Adding progress update")
        updater.add_progress_update(args.progress)
    elif args.tasks and updater.changes_made:
        # Auto-generate progress update
        task_list = ", ".join(args.tasks[:5])
        if len(args.tasks) > 5:
            task_list += f" (+{len(args.tasks) - 5} more)"
        progress_text = f"Tasks completed: {task_list}. Updated via auto-discovery."
        updater.add_progress_update(progress_text)

    # Increment version only when something actually changed.
    if updater.changes_made:
        print("\n🔢 Incrementing version")
        updater.increment_version()

    # Update session log
    if args.session_log and args.tasks:
        print("\n📅 Updating session log")
        updater.update_session_log(args.tasks)

    # Classify files
    if args.classify:
        print("\n🏷️ Classifying documents")
        updater.classify_files(args.classify)

    # Git commit — only when there are modified files to stage.
    if args.commit and updater.files_modified:
        print("\n🔀 Git workflow")
        task_ids = ", ".join(args.tasks[:3]) if args.tasks else "updates"
        if args.tasks and len(args.tasks) > 3:
            task_ids += f" +{len(args.tasks) - 3}"
        message = args.message or f"chore: Update project plan ({task_ids})"
        updater.git_commit(message, push=args.push)

    # Print summary
    updater.print_summary()
    return 0
# Standard script entry guard: run main() and propagate its exit code.
# (Restores the dunder underscores that were lost in transit; `name`/"main"
# as written would raise NameError at import time.)
if __name__ == "__main__":
    sys.exit(main())