# sync-project-plan.py

#!/usr/bin/env python3
"""
---
title: "V2 Project Plan Bidirectional Sync Script"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "V2 Project Plan Bidirectional Sync Script"
keywords: ['api', 'database', 'plan', 'project', 'review']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "sync-project-plan.py"
language: python
executable: true
usage: "python3 scripts/sync-project-plan.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

V2 Project Plan Bidirectional Sync Script

Synchronizes CODITECT V2 project plan between markdown tasklist and SQLite database.

Usage:
    ./sync-project-plan.py --plan-to-db            # Sync markdown → database
    ./sync-project-plan.py --db-to-plan            # Sync database → markdown
    ./sync-project-plan.py --status                # Show sync statistics
    ./sync-project-plan.py --checkpoint            # Create backup before sync
    ./sync-project-plan.py --watch                 # Continuous sync mode
    ./sync-project-plan.py --init                  # Initialize database schema
    ./sync-project-plan.py --dry-run --plan-to-db  # Preview changes

Author: AZ1.AI INC
Version: 1.0.0
"""

import argparse
import hashlib
import json
import os
import re
import shutil
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# ANSI color codes for output

# Shared Colors module (consolidates 36 duplicate definitions)

from colors import Colors

# NOTE: was Path(file) — `file` is undefined; the special variable is __file__.
SCRIPT_DIR = Path(__file__).parent.absolute()
# Up to rollout-master (scripts -> coditect-core -> core -> submodules -> rollout-master)
REPO_ROOT = SCRIPT_DIR.parent.parent.parent.parent

# ADR-114 & ADR-118: Use centralized path discovery
sys.path.insert(0, str(SCRIPT_DIR / "core"))
try:
    from paths import get_sessions_db_path, SESSIONS_DB
    DB_PATH = SESSIONS_DB  # Task sync data goes to sessions.db (Tier 3)
except ImportError:
    # Fallback for backward compatibility when the paths module is unavailable
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        DB_PATH = _user_data / "sessions.db"
    else:
        DB_PATH = SCRIPT_DIR.parent / "context-storage" / "sessions.db"

# Plan files and backup location, all relative to the repository root
MARKDOWN_PATH = REPO_ROOT / "docs/project-management/V2-TASKLIST-WITH-CHECKBOXES.md"
JSON_PATH = REPO_ROOT / "docs/project-management/v2-work-items.json"
BACKUP_DIR = REPO_ROOT / "docs/project-management/backups"

# Database schema
# NOTE: the schema must keep each `--` comment on its own line; in the collapsed
# form the SQL line comments swallowed the statements that followed them.
DB_SCHEMA = """
-- V2 Epics
CREATE TABLE IF NOT EXISTS v2_epics (
    epic_id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    description TEXT,
    status TEXT DEFAULT 'planned',  -- planned, active, completed, blocked
    target_start TEXT,
    target_end TEXT,
    task_count INTEGER DEFAULT 0,
    completed_count INTEGER DEFAULT 0,
    timeline_weeks INTEGER,
    alignment TEXT,
    created_at TEXT DEFAULT (datetime('now', 'utc')),
    updated_at TEXT DEFAULT (datetime('now', 'utc'))
);

-- V2 Features
CREATE TABLE IF NOT EXISTS v2_features (
    feature_id TEXT PRIMARY KEY,
    epic_id TEXT NOT NULL,
    name TEXT NOT NULL,
    description TEXT,
    status TEXT DEFAULT 'planned',  -- planned, active, completed, blocked
    created_at TEXT DEFAULT (datetime('now', 'utc')),
    updated_at TEXT DEFAULT (datetime('now', 'utc')),
    FOREIGN KEY (epic_id) REFERENCES v2_epics(epic_id)
);

-- V2 Tasks
CREATE TABLE IF NOT EXISTS v2_tasks (
    task_id TEXT PRIMARY KEY,
    feature_id TEXT NOT NULL,
    description TEXT NOT NULL,
    priority TEXT DEFAULT 'P1',     -- P0, P1, P2, P3
    status TEXT DEFAULT 'pending',  -- pending, in_progress, completed, blocked
    estimated_hours INTEGER,
    target_date TEXT,
    completed_at TEXT,
    checksum TEXT,
    created_at TEXT DEFAULT (datetime('now', 'utc')),
    updated_at TEXT DEFAULT (datetime('now', 'utc')),
    FOREIGN KEY (feature_id) REFERENCES v2_features(feature_id)
);

-- V2 Plan Sync History
CREATE TABLE IF NOT EXISTS v2_plan_sync (
    sync_id INTEGER PRIMARY KEY AUTOINCREMENT,
    sync_type TEXT NOT NULL,  -- plan-to-db, db-to-plan, checkpoint
    synced_at TEXT DEFAULT (datetime('now', 'utc')),
    items_synced INTEGER DEFAULT 0,
    checksum TEXT,
    notes TEXT,
    success INTEGER DEFAULT 1
);

-- Indexes
CREATE INDEX IF NOT EXISTS idx_tasks_feature ON v2_tasks(feature_id);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON v2_tasks(status);
CREATE INDEX IF NOT EXISTS idx_tasks_priority ON v2_tasks(priority);
CREATE INDEX IF NOT EXISTS idx_features_epic ON v2_features(epic_id);
CREATE INDEX IF NOT EXISTS idx_sync_type ON v2_plan_sync(sync_type, synced_at);
"""

class ProjectPlanSync:
    """Bidirectional sync manager for V2 project plan.

    Wraps a SQLite connection plus the markdown tasklist and JSON work-items
    files, and provides plan-to-db / db-to-plan sync, status reporting,
    checkpoint backups, and a polling watch mode.
    """

    def __init__(self, db_path: Path, markdown_path: Path, json_path: Path,
                 dry_run: bool = False):
        """Store paths and options; call connect() before using sync methods.

        Args:
            db_path: SQLite database file.
            markdown_path: V2 tasklist markdown file (checkbox source of truth).
            json_path: JSON work-items file (epic/feature structure).
            dry_run: When True, preview actions without writing anything.
        """
        self.db_path = db_path
        self.markdown_path = markdown_path
        self.json_path = json_path
        self.dry_run = dry_run
        self.conn = None  # sqlite3.Connection, set by connect()

    def connect(self):
        """Connect to database and return the connection."""
        self.conn = sqlite3.connect(str(self.db_path))
        self.conn.row_factory = sqlite3.Row  # enable column access by name
        return self.conn

    def close(self):
        """Close database connection (no-op if never connected)."""
        if self.conn:
            self.conn.close()

    def init_schema(self):
        """Initialize database schema (idempotent: CREATE IF NOT EXISTS)."""
        print(f"{Colors.CYAN}Initializing database schema...{Colors.RESET}")

        if self.dry_run:
            print(f"{Colors.YELLOW}[DRY RUN] Would create tables{Colors.RESET}")
            return

        cursor = self.conn.cursor()
        cursor.executescript(DB_SCHEMA)
        self.conn.commit()

        print(f"{Colors.GREEN}✓ Schema initialized{Colors.RESET}")

    def verify_schema(self) -> bool:
        """Return True when all required v2_* tables exist."""
        cursor = self.conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'v2_%'")
        tables = {row[0] for row in cursor.fetchall()}

        required = {'v2_epics', 'v2_features', 'v2_tasks', 'v2_plan_sync'}
        missing = required - tables

        if missing:
            print(f"{Colors.YELLOW}Missing tables: {', '.join(missing)}{Colors.RESET}")
            return False
        return True

    def calculate_checksum(self, content: str) -> str:
        """Return the hex MD5 of content (change detection only, not security)."""
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def parse_markdown_tasks(self) -> Dict[str, Dict]:
        """Parse the markdown tasklist and return {task_id: task dict}.

        Each task dict has task_id, description, status ('completed' when the
        checkbox is [x], else 'pending'), priority (from the color icon),
        estimated_hours, and a checksum over id:description:status.
        """
        print(f"{Colors.CYAN}Parsing markdown file...{Colors.RESET}")

        if not self.markdown_path.exists():
            print(f"{Colors.RED}Error: Markdown file not found: {self.markdown_path}{Colors.RESET}")
            return {}

        content = self.markdown_path.read_text(encoding='utf-8')

        tasks = {}
        current_epic = None
        current_feature = None

        # Regex patterns
        epic_pattern = re.compile(r'^### (E\d+):\s+(.+?)(?:\s+[⚡📅✅🔴🟡🟢].*)?$', re.MULTILINE)
        feature_pattern = re.compile(r'^#### (F\d+\.\d+):\s+(.+)$', re.MULTILINE)
        task_pattern = re.compile(r'^-\s+\[([ x])\]\s+([🔴🟡🟢])?\s*\*\*([T\d.]+):\*\*\s+(.+?)\s+-\s+(\d+)h(?:\s+[⏰✅].*)?$', re.MULTILINE)

        # Parse epics
        # NOTE(review): epic/feature matches are scanned but never linked to the
        # tasks below (current_epic/current_feature are not read) — confirm intent.
        for match in epic_pattern.finditer(content):
            current_epic = match.group(1)
            epic_name = match.group(2).strip()

        # Parse features
        for match in feature_pattern.finditer(content):
            current_feature = match.group(1)
            feature_name = match.group(2).strip()

        # Parse tasks
        for match in task_pattern.finditer(content):
            checkbox = match.group(1)
            priority_icon = match.group(2)
            task_id = match.group(3)
            description = match.group(4).strip()
            hours = int(match.group(5))

            # Map checkbox to status
            status = 'completed' if checkbox == 'x' else 'pending'

            # Map priority icon to priority level (default P1 when no icon)
            priority_map = {'🔴': 'P0', '🟡': 'P1', '🟢': 'P2'}
            priority = priority_map.get(priority_icon, 'P1')

            tasks[task_id] = {
                'task_id': task_id,
                'description': description,
                'status': status,
                'priority': priority,
                'estimated_hours': hours,
                'checksum': self.calculate_checksum(f"{task_id}:{description}:{status}")
            }

        print(f"{Colors.GREEN}✓ Parsed {len(tasks)} tasks from markdown{Colors.RESET}")
        return tasks

    def sync_plan_to_db(self):
        """Sync markdown → database.

        Structure (epics/features) comes from the JSON file; task status comes
        from the markdown checkboxes. In dry-run mode nothing is written.
        """
        print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*60}{Colors.RESET}")
        print(f"{Colors.BOLD}SYNC: Markdown → Database{Colors.RESET}")
        print(f"{Colors.BOLD}{Colors.BLUE}{'='*60}{Colors.RESET}\n")

        if not self.verify_schema():
            print(f"{Colors.YELLOW}Initializing schema first...{Colors.RESET}")
            self.init_schema()

        # Parse JSON for structure (epics/features)
        if not self.json_path.exists():
            print(f"{Colors.RED}Error: JSON file not found: {self.json_path}{Colors.RESET}")
            return

        with open(self.json_path, 'r') as f:
            data = json.load(f)

        # Parse markdown for task statuses
        md_tasks = self.parse_markdown_tasks()

        cursor = self.conn.cursor()
        stats = {
            'epics_synced': 0,
            'features_synced': 0,
            'tasks_updated': 0,
            'tasks_inserted': 0,
            'no_change': 0
        }

        # Sync epics (upsert; epic status itself is not overwritten on conflict)
        for epic in data.get('epics', []):
            epic_id = epic['epic_id']

            if self.dry_run:
                # Dry run skips the whole epic subtree (features/tasks too)
                print(f"{Colors.DIM}[DRY RUN] Would sync epic {epic_id}{Colors.RESET}")
                stats['epics_synced'] += 1
                continue

            cursor.execute("""
                INSERT INTO v2_epics (epic_id, name, description, status, target_start, target_end,
                                      task_count, timeline_weeks, alignment)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(epic_id) DO UPDATE SET
                    name=excluded.name,
                    description=excluded.description,
                    target_start=excluded.target_start,
                    target_end=excluded.target_end,
                    task_count=excluded.task_count,
                    timeline_weeks=excluded.timeline_weeks,
                    alignment=excluded.alignment,
                    updated_at=datetime('now', 'utc')
            """, (
                epic_id,
                epic['name'],
                epic.get('description'),
                epic.get('status', 'planned'),
                epic.get('target_start'),
                epic.get('target_end'),
                epic.get('task_count', 0),
                epic.get('timeline_weeks'),
                epic.get('alignment')
            ))
            stats['epics_synced'] += 1

            # Sync features
            for feature in epic.get('features', []):
                feature_id = feature['feature_id']

                cursor.execute("""
                    INSERT INTO v2_features (feature_id, epic_id, name, description, status)
                    VALUES (?, ?, ?, ?, ?)
                    ON CONFLICT(feature_id) DO UPDATE SET
                        name=excluded.name,
                        description=excluded.description,
                        updated_at=datetime('now', 'utc')
                """, (
                    feature_id,
                    epic_id,
                    feature['name'],
                    feature.get('description'),
                    feature.get('status', 'planned')
                ))
                stats['features_synced'] += 1

                # Sync tasks
                for task in feature.get('tasks', []):
                    task_id = task['task_id']

                    # Markdown checkbox wins over JSON status when present
                    md_task = md_tasks.get(task_id)
                    status = md_task['status'] if md_task else task.get('status', 'pending')
                    # NOTE(review): completed_at is refreshed to "now" every time a
                    # completed task changes, so the first completion time is lost.
                    completed_at = datetime.now(timezone.utc).isoformat() if status == 'completed' else None

                    # Check if task exists
                    cursor.execute("SELECT status, checksum FROM v2_tasks WHERE task_id = ?", (task_id,))
                    existing = cursor.fetchone()

                    checksum = self.calculate_checksum(f"{task_id}:{task['description']}:{status}")

                    if existing:
                        # Update only when the content checksum or status changed
                        if existing['checksum'] != checksum or existing['status'] != status:
                            cursor.execute("""
                                UPDATE v2_tasks
                                SET status=?, completed_at=?, checksum=?, updated_at=datetime('now', 'utc')
                                WHERE task_id=?
                            """, (status, completed_at, checksum, task_id))
                            stats['tasks_updated'] += 1
                            print(f"{Colors.GREEN}↑ {task_id}: {existing['status']} → {status}{Colors.RESET}")
                        else:
                            stats['no_change'] += 1
                    else:
                        # Insert new task
                        cursor.execute("""
                            INSERT INTO v2_tasks
                            (task_id, feature_id, description, priority, status, estimated_hours,
                             target_date, completed_at, checksum)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """, (
                            task_id,
                            feature_id,
                            task['description'],
                            task.get('priority', 'P1'),
                            status,
                            task.get('estimated_hours'),
                            task.get('target_date'),
                            completed_at,
                            checksum
                        ))
                        stats['tasks_inserted'] += 1
                        print(f"{Colors.CYAN}+ {task_id}: {status}{Colors.RESET}")

        # Roll up epic completion counts and record the sync (skipped in dry-run)
        if not self.dry_run:
            cursor.execute("""
                UPDATE v2_epics SET completed_count = (
                    SELECT COUNT(*) FROM v2_tasks t
                    JOIN v2_features f ON t.feature_id = f.feature_id
                    WHERE f.epic_id = v2_epics.epic_id AND t.status = 'completed'
                )
            """)

            # Record sync
            cursor.execute("""
                INSERT INTO v2_plan_sync (sync_type, items_synced, checksum, notes)
                VALUES (?, ?, ?, ?)
            """, (
                'plan-to-db',
                stats['tasks_updated'] + stats['tasks_inserted'],
                self.calculate_checksum(self.markdown_path.read_text()),
                json.dumps(stats)
            ))

            self.conn.commit()

        # Print summary
        print(f"\n{Colors.BOLD}Sync Summary:{Colors.RESET}")
        print(f"  Epics synced: {stats['epics_synced']}")
        print(f"  Features synced: {stats['features_synced']}")
        print(f"  Tasks inserted: {Colors.GREEN}{stats['tasks_inserted']}{Colors.RESET}")
        print(f"  Tasks updated: {Colors.YELLOW}{stats['tasks_updated']}{Colors.RESET}")
        print(f"  No change: {Colors.DIM}{stats['no_change']}{Colors.RESET}")

        if self.dry_run:
            print(f"\n{Colors.YELLOW}[DRY RUN] No changes committed{Colors.RESET}")

    def sync_db_to_plan(self):
        """Sync database → markdown.

        Rewrites checkbox states and the progress counters in the markdown file
        to match the task statuses currently stored in the database.
        """
        print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*60}{Colors.RESET}")
        print(f"{Colors.BOLD}SYNC: Database → Markdown{Colors.RESET}")
        print(f"{Colors.BOLD}{Colors.BLUE}{'='*60}{Colors.RESET}\n")

        if not self.verify_schema():
            print(f"{Colors.RED}Error: Database schema not initialized. Run --init first.{Colors.RESET}")
            return

        # Read current markdown
        if not self.markdown_path.exists():
            print(f"{Colors.RED}Error: Markdown file not found: {self.markdown_path}{Colors.RESET}")
            return

        content = self.markdown_path.read_text(encoding='utf-8')
        original_content = content

        # Get all tasks from database
        cursor = self.conn.cursor()
        cursor.execute("SELECT task_id, status FROM v2_tasks")
        db_tasks = {row['task_id']: row['status'] for row in cursor.fetchall()}

        print(f"{Colors.CYAN}Found {len(db_tasks)} tasks in database{Colors.RESET}")

        # Update checkbox states
        stats = {'updated': 0, 'no_change': 0}

        def replace_checkbox(match):
            # Regex groups: 1 = "- [", 2 = checkbox char, 3 = "] **", 4 = task id, 5 = rest
            task_id = match.group(4)
            checkbox_state = match.group(2)

            if task_id in db_tasks:
                db_status = db_tasks[task_id]
                new_state = 'x' if db_status == 'completed' else ' '

                if checkbox_state != new_state:
                    stats['updated'] += 1
                    print(f"{Colors.GREEN}↑ {task_id}: [{'x' if checkbox_state == 'x' else ' '}] → [{new_state}]{Colors.RESET}")
                    return match.group(0).replace(f'[{checkbox_state}]', f'[{new_state}]', 1)
                else:
                    stats['no_change'] += 1

            return match.group(0)

        # Replace all task checkboxes
        task_pattern = re.compile(r'^(-\s+\[)([ x])(\]\s+[🔴🟡🟢]?\s*\*\*)([T\d.]+)(:\*\*.+)$', re.MULTILINE)
        updated_content = task_pattern.sub(replace_checkbox, content)

        # Update progress counts in header
        cursor.execute("SELECT COUNT(*) as total FROM v2_tasks")
        total_tasks = cursor.fetchone()['total']

        cursor.execute("SELECT COUNT(*) as completed FROM v2_tasks WHERE status = 'completed'")
        completed_tasks = cursor.fetchone()['completed']

        progress_pct = int((completed_tasks / total_tasks * 100)) if total_tasks > 0 else 0

        # Update YAML frontmatter
        updated_content = re.sub(
            r'total_tasks: \d+',
            f'total_tasks: {total_tasks}',
            updated_content
        )
        updated_content = re.sub(
            r'completed: \d+',
            f'completed: {completed_tasks}',
            updated_content
        )
        updated_content = re.sub(
            r'progress: "\d+%"',
            f'progress: "{progress_pct}%"',
            updated_content
        )

        # Update progress line
        updated_content = re.sub(
            r'\*\*Progress:\*\* \d+/\d+ tasks \(\d+%\)',
            f'**Progress:** {completed_tasks}/{total_tasks} tasks ({progress_pct}%)',
            updated_content
        )

        # Write updated content
        if updated_content != original_content:
            if self.dry_run:
                print(f"\n{Colors.YELLOW}[DRY RUN] Would update {stats['updated']} tasks{Colors.RESET}")
            else:
                self.markdown_path.write_text(updated_content, encoding='utf-8')

                # Record sync
                cursor.execute("""
                    INSERT INTO v2_plan_sync (sync_type, items_synced, checksum, notes)
                    VALUES (?, ?, ?, ?)
                """, (
                    'db-to-plan',
                    stats['updated'],
                    self.calculate_checksum(updated_content),
                    json.dumps(stats)
                ))
                self.conn.commit()

                print(f"\n{Colors.GREEN}✓ Markdown file updated{Colors.RESET}")
        else:
            print(f"\n{Colors.DIM}No changes needed{Colors.RESET}")

        # Print summary
        print(f"\n{Colors.BOLD}Sync Summary:{Colors.RESET}")
        print(f"  Tasks updated: {Colors.GREEN}{stats['updated']}{Colors.RESET}")
        print(f"  No change: {Colors.DIM}{stats['no_change']}{Colors.RESET}")
        print(f"  Total tasks: {total_tasks}")
        print(f"  Completed: {completed_tasks} ({progress_pct}%)")

    def show_status(self):
        """Show sync statistics and progress (overall, by epic, by priority)."""
        print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*60}{Colors.RESET}")
        print(f"{Colors.BOLD}V2 Project Plan Status{Colors.RESET}")
        print(f"{Colors.BOLD}{Colors.BLUE}{'='*60}{Colors.RESET}\n")

        if not self.verify_schema():
            print(f"{Colors.RED}Database not initialized. Run --init first.{Colors.RESET}")
            return

        cursor = self.conn.cursor()

        # Overall progress
        cursor.execute("""
            SELECT
                COUNT(*) as total,
                SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
                SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) as in_progress,
                SUM(CASE WHEN status = 'blocked' THEN 1 ELSE 0 END) as blocked,
                SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending
            FROM v2_tasks
        """)
        stats = cursor.fetchone()

        total = stats['total']
        # SUM over an empty table yields NULL; coalesce to 0
        completed = stats['completed'] or 0
        in_progress = stats['in_progress'] or 0
        blocked = stats['blocked'] or 0
        pending = stats['pending'] or 0
        progress = int((completed / total * 100)) if total > 0 else 0

        print(f"{Colors.BOLD}Overall Progress:{Colors.RESET}")
        print(f"  Total Tasks: {total}")
        print(f"  {Colors.GREEN}Completed: {completed} ({int(completed/total*100) if total > 0 else 0}%){Colors.RESET}")
        print(f"  {Colors.YELLOW}In Progress: {in_progress}{Colors.RESET}")
        print(f"  {Colors.RED}Blocked: {blocked}{Colors.RESET}")
        print(f"  {Colors.DIM}Pending: {pending}{Colors.RESET}")

        # Progress bar
        bar_width = 40
        filled = int(bar_width * completed / total) if total > 0 else 0
        bar = '█' * filled + '░' * (bar_width - filled)
        print(f"\n  [{bar}] {progress}%\n")

        # Progress by epic
        print(f"{Colors.BOLD}Progress by Epic:{Colors.RESET}\n")
        cursor.execute("""
            SELECT
                e.epic_id,
                e.name,
                e.status,
                COUNT(t.task_id) as total_tasks,
                SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) as completed_tasks
            FROM v2_epics e
            LEFT JOIN v2_features f ON e.epic_id = f.epic_id
            LEFT JOIN v2_tasks t ON f.feature_id = t.feature_id
            GROUP BY e.epic_id
            ORDER BY e.epic_id
        """)

        status_icons = {
            'planned': '📅',
            'active': '⚡',
            'completed': '✅',
            'blocked': '🔴'
        }

        for row in cursor.fetchall():
            epic_id = row['epic_id']
            name = row['name']
            status = row['status']
            total_tasks = row['total_tasks'] or 0
            completed_tasks = row['completed_tasks'] or 0
            epic_progress = int((completed_tasks / total_tasks * 100)) if total_tasks > 0 else 0

            icon = status_icons.get(status, '📋')
            print(f"  {icon} {epic_id}: {name}")
            print(f"      {completed_tasks}/{total_tasks} tasks ({epic_progress}%)")

        # Progress by priority
        print(f"\n{Colors.BOLD}Progress by Priority:{Colors.RESET}\n")
        cursor.execute("""
            SELECT
                priority,
                COUNT(*) as total,
                SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed
            FROM v2_tasks
            GROUP BY priority
            ORDER BY priority
        """)

        priority_icons = {'P0': '🔴', 'P1': '🟡', 'P2': '🟢', 'P3': '⚪'}

        for row in cursor.fetchall():
            priority = row['priority']
            total = row['total']
            completed = row['completed'] or 0
            pct = int((completed / total * 100)) if total > 0 else 0
            icon = priority_icons.get(priority, '⚪')

            print(f"  {icon} {priority}: {completed}/{total} ({pct}%)")

        # Last sync info
        print(f"\n{Colors.BOLD}Last Sync:{Colors.RESET}")
        cursor.execute("""
            SELECT sync_type, synced_at, items_synced, notes
            FROM v2_plan_sync
            ORDER BY synced_at DESC
            LIMIT 5
        """)

        syncs = cursor.fetchall()
        if syncs:
            for row in syncs:
                sync_type = row['sync_type']
                synced_at = row['synced_at']
                items = row['items_synced']
                print(f"  {sync_type}: {synced_at} ({items} items)")
        else:
            print(f"  {Colors.DIM}No sync history{Colors.RESET}")

    def create_checkpoint(self):
        """Create timestamped backups of the markdown, JSON, and database files."""
        print(f"\n{Colors.BOLD}{Colors.CYAN}Creating checkpoint...{Colors.RESET}\n")

        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')
        backup_dir = BACKUP_DIR / timestamp

        if self.dry_run:
            print(f"{Colors.YELLOW}[DRY RUN] Would create backup at {backup_dir}{Colors.RESET}")
            return

        # FIX: create the directory only after the dry-run check, so that
        # --dry-run has no filesystem side effects.
        backup_dir.mkdir(parents=True, exist_ok=True)

        # Backup markdown
        if self.markdown_path.exists():
            shutil.copy2(self.markdown_path, backup_dir / self.markdown_path.name)
            print(f"{Colors.GREEN}✓ Backed up markdown{Colors.RESET}")

        # Backup JSON
        if self.json_path.exists():
            shutil.copy2(self.json_path, backup_dir / self.json_path.name)
            print(f"{Colors.GREEN}✓ Backed up JSON{Colors.RESET}")

        # Backup database
        if self.db_path.exists():
            backup_db = backup_dir / self.db_path.name

            # Use SQLite backup API for safe backup (consistent even mid-write)
            source = sqlite3.connect(str(self.db_path))
            dest = sqlite3.connect(str(backup_db))
            source.backup(dest)
            source.close()
            dest.close()

            print(f"{Colors.GREEN}✓ Backed up database{Colors.RESET}")

        # Record checkpoint
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT INTO v2_plan_sync (sync_type, items_synced, checksum, notes)
            VALUES (?, ?, ?, ?)
        """, (
            'checkpoint',
            0,
            '',
            f"Backup created at {backup_dir}"
        ))
        self.conn.commit()

        print(f"\n{Colors.GREEN}✓ Checkpoint created: {backup_dir}{Colors.RESET}")

    def watch_mode(self, interval: int = 60):
        """Continuous sync mode: poll both sides every `interval` seconds.

        Markdown changes are detected via content checksum; database changes
        via MAX(updated_at) on v2_tasks. The first poll only records baselines.
        """
        print(f"\n{Colors.BOLD}{Colors.MAGENTA}{'='*60}{Colors.RESET}")
        print(f"{Colors.BOLD}Watch Mode (sync every {interval}s){Colors.RESET}")
        print(f"{Colors.BOLD}{Colors.MAGENTA}{'='*60}{Colors.RESET}\n")
        print(f"{Colors.DIM}Press Ctrl+C to stop{Colors.RESET}\n")

        last_md_checksum = None
        last_db_checksum = None

        try:
            while True:
                # Check markdown for changes
                if self.markdown_path.exists():
                    current_md_checksum = self.calculate_checksum(self.markdown_path.read_text())

                    if last_md_checksum and current_md_checksum != last_md_checksum:
                        print(f"{Colors.CYAN}Markdown changed, syncing to DB...{Colors.RESET}")
                        self.sync_plan_to_db()

                    last_md_checksum = current_md_checksum

                # Check database for changes
                cursor = self.conn.cursor()
                cursor.execute("SELECT MAX(updated_at) FROM v2_tasks")
                current_db_checksum = cursor.fetchone()[0] or ''

                if last_db_checksum and current_db_checksum != last_db_checksum:
                    print(f"{Colors.CYAN}Database changed, syncing to markdown...{Colors.RESET}")
                    self.sync_db_to_plan()

                last_db_checksum = current_db_checksum

                time.sleep(interval)

        except KeyboardInterrupt:
            print(f"\n{Colors.YELLOW}Watch mode stopped{Colors.RESET}")

def main():
    """CLI entry point: parse arguments and dispatch to the sync manager."""
    parser = argparse.ArgumentParser(
        description='V2 Project Plan Bidirectional Sync',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --init                  Initialize database schema
  %(prog)s --plan-to-db            Sync markdown → database
  %(prog)s --db-to-plan            Sync database → markdown
  %(prog)s --status                Show sync statistics
  %(prog)s --checkpoint            Create backup
  %(prog)s --watch --interval 30   Watch mode (sync every 30s)
  %(prog)s --dry-run --plan-to-db  Preview changes
"""
    )

    parser.add_argument('--init', action='store_true',
                        help='Initialize database schema')
    parser.add_argument('--plan-to-db', action='store_true',
                        help='Sync markdown → database')
    parser.add_argument('--db-to-plan', action='store_true',
                        help='Sync database → markdown')
    parser.add_argument('--status', action='store_true',
                        help='Show sync statistics')
    parser.add_argument('--checkpoint', action='store_true',
                        help='Create backup before sync')
    parser.add_argument('--watch', action='store_true',
                        help='Continuous sync mode')
    parser.add_argument('--interval', type=int, default=60,
                        help='Watch interval in seconds (default: 60)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Preview changes without committing')

    args = parser.parse_args()

    # Validate paths. FIX: previously this exited whenever the DB file was
    # missing, which made --init impossible to run ("Run with --init" could
    # never succeed). Let --init through; sqlite3.connect creates the file.
    if not DB_PATH.exists() and not args.init:
        print(f"{Colors.RED}Error: Database not found: {DB_PATH}{Colors.RESET}")
        print(f"{Colors.YELLOW}Run with --init to create it{Colors.RESET}")
        sys.exit(1)

    # Create sync manager
    sync = ProjectPlanSync(DB_PATH, MARKDOWN_PATH, JSON_PATH, dry_run=args.dry_run)
    sync.connect()

    try:
        # Execute exactly one command per invocation
        if args.init:
            sync.init_schema()

        elif args.checkpoint:
            sync.create_checkpoint()

        elif args.plan_to_db:
            sync.sync_plan_to_db()

        elif args.db_to_plan:
            sync.sync_db_to_plan()

        elif args.status:
            sync.show_status()

        elif args.watch:
            sync.watch_mode(interval=args.interval)

        else:
            parser.print_help()

    finally:
        sync.close()

# FIX: was `if name == 'main'` — the dunder names are required for the
# standard script entry-point guard.
if __name__ == '__main__':
    main()