scripts-sync-daemon
#!/usr/bin/env python3 """
title: "ANSI colors" component_type: script version: "1.0.0" audience: contributor status: stable summary: "Sync Daemon - Automated bidirectional sync between markdown tasklist and database." keywords: ['daemon', 'database', 'sync'] tokens: ~500 created: 2025-12-22 updated: 2025-12-22 script_name: "sync-daemon.py" language: python executable: true usage: "python3 scripts/sync-daemon.py [options]" python_version: "3.10+" dependencies: [] modifies_files: false network_access: false requires_auth: false
Sync Daemon - Automated bidirectional sync between markdown tasklist and database.
Features:
- Watches markdown file for changes
- Polls database for status updates
- Auto-syncs on change detection with debouncing
- Logs all operations to v2_plan_sync table
- Can run as background process or via cron
Usage: python3 sync-daemon.py # Run in foreground python3 sync-daemon.py --daemon # Run as background daemon python3 sync-daemon.py --interval 30 # Custom poll interval (seconds) python3 sync-daemon.py --once # Single sync and exit """
import argparse import hashlib import json import os import re import signal import sqlite3 import sys import time from datetime import datetime, timezone from pathlib import Path from typing import Optional
ANSI colors
Shared Colors module (consolidates 36 duplicate definitions)
from colors import Colors
SCRIPT_DIR = Path(file).parent CORE_DIR = SCRIPT_DIR.parent PROJECT_ROOT = CORE_DIR.parent.parent.parent
ADR-114 & ADR-118: Use centralized path discovery
sys.path.insert(0, str(SCRIPT_DIR / "core")) try: from paths import get_sessions_db_path, SESSIONS_DB DB_PATH = SESSIONS_DB # Task sync data goes to sessions.db (Tier 3) except ImportError: # Fallback for backward compatibility _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" if _user_data.exists(): DB_PATH = _user_data / "sessions.db" else: DB_PATH = CORE_DIR / "context-storage" / "sessions.db"
TASKLIST_PATH = PROJECT_ROOT / "docs" / "project-management" / "V2-TASKLIST-WITH-CHECKBOXES.md" LOG_FILE = CORE_DIR / "logs" / "sync-daemon.log"
Ensure log directory exists
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
class SyncDaemon: def init(self, interval: int = 30, debounce: float = 2.0): self.interval = interval self.debounce = debounce self.running = True self.last_md_hash: Optional[str] = None self.last_db_hash: Optional[str] = None self.last_sync_time: Optional[datetime] = None self.pending_sync: Optional[str] = None self.pending_sync_time: Optional[float] = None
# Setup signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully."""
self.log("Received shutdown signal, stopping daemon...")
self.running = False
def log(self, message: str, level: str = "INFO"):
"""Log message to console and file."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
color = {
"INFO": Colors.BLUE,
"SYNC": Colors.GREEN,
"WARN": Colors.YELLOW,
"ERROR": Colors.RED,
}.get(level, Colors.RESET)
console_msg = f"{Colors.DIM}{timestamp}{Colors.RESET} {color}[{level}]{Colors.RESET} {message}"
file_msg = f"{timestamp} [{level}] {message}"
print(console_msg)
try:
with open(LOG_FILE, "a") as f:
f.write(file_msg + "\n")
except Exception:
pass
def get_file_hash(self, filepath: Path) -> Optional[str]:
"""Get MD5 hash of file contents."""
try:
with open(filepath, "rb") as f:
return hashlib.md5(f.read()).hexdigest()
except FileNotFoundError:
return None
def get_db_state_hash(self) -> Optional[str]:
"""Get hash of current database task states."""
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
SELECT task_id, status, completed_at
FROM v2_tasks
ORDER BY task_id
""")
rows = cursor.fetchall()
conn.close()
state_str = json.dumps(rows, sort_keys=True)
return hashlib.md5(state_str.encode()).hexdigest()
except Exception as e:
self.log(f"Error getting DB state: {e}", "ERROR")
return None
def sync_plan_to_db(self) -> int:
"""Sync markdown checkbox states to database."""
try:
with open(TASKLIST_PATH, "r") as f:
content = f.read()
# Pattern: - [x] or - [ ] followed by task ID
pattern = r'^\- \[([ x])\] .*?\*\*(T[\d\.]+):'
matches = re.findall(pattern, content, re.MULTILINE)
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
updated = 0
for checked, task_id in matches:
new_status = "completed" if checked == "x" else "pending"
completed_at = datetime.now(timezone.utc).isoformat() if checked == "x" else None
cursor.execute("""
UPDATE v2_tasks
SET status = ?, completed_at = ?
WHERE task_id = ? AND status != ?
""", (new_status, completed_at, task_id, new_status))
if cursor.rowcount > 0:
updated += 1
# Update epic completed counts
cursor.execute("""
UPDATE v2_epics SET completed_count = (
SELECT COUNT(*) FROM v2_tasks t
JOIN v2_features f ON t.feature_id = f.feature_id
WHERE f.epic_id = v2_epics.epic_id AND t.status = 'completed'
)
""")
# Log sync
cursor.execute("""
INSERT INTO v2_plan_sync (sync_type, synced_at, items_synced, notes)
VALUES ('plan_to_db', ?, ?, 'Automated sync from daemon')
""", (datetime.now(timezone.utc).isoformat(), updated))
conn.commit()
conn.close()
return updated
except Exception as e:
self.log(f"Error syncing plan to DB: {e}", "ERROR")
return 0
def sync_db_to_plan(self) -> int:
"""Sync database task states to markdown checkboxes."""
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT task_id, status FROM v2_tasks")
task_states = {row[0]: row[1] for row in cursor.fetchall()}
conn.close()
with open(TASKLIST_PATH, "r") as f:
content = f.read()
updated = 0
lines = content.split("\n")
new_lines = []
for line in lines:
match = re.match(r'^(\- \[)([ x])(\] .*?\*\*)(T[\d\.]+)(:.*)$', line)
if match:
prefix, current_check, middle, task_id, suffix = match.groups()
db_status = task_states.get(task_id)
if db_status:
new_check = "x" if db_status == "completed" else " "
if new_check != current_check:
line = f"{prefix}{new_check}{middle}{task_id}{suffix}"
updated += 1
new_lines.append(line)
if updated > 0:
with open(TASKLIST_PATH, "w") as f:
f.write("\n".join(new_lines))
# Log sync
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO v2_plan_sync (sync_type, synced_at, items_synced, notes)
VALUES ('db_to_plan', ?, ?, 'Automated sync from daemon')
""", (datetime.now(timezone.utc).isoformat(), updated))
conn.commit()
conn.close()
return updated
except Exception as e:
self.log(f"Error syncing DB to plan: {e}", "ERROR")
return 0
def check_and_sync(self) -> bool:
"""Check for changes and sync if needed."""
current_md_hash = self.get_file_hash(TASKLIST_PATH)
current_db_hash = self.get_db_state_hash()
synced = False
# Check markdown changes
if current_md_hash and current_md_hash != self.last_md_hash:
if self.last_md_hash is not None:
self.log("Markdown file changed, syncing to database...", "SYNC")
updated = self.sync_plan_to_db()
self.log(f"Updated {updated} tasks in database", "SYNC")
synced = True
self.last_md_hash = current_md_hash
# Check database changes
if current_db_hash and current_db_hash != self.last_db_hash:
if self.last_db_hash is not None and not synced:
self.log("Database changed, syncing to markdown...", "SYNC")
updated = self.sync_db_to_plan()
self.log(f"Updated {updated} checkboxes in markdown", "SYNC")
synced = True
self.last_db_hash = current_db_hash
if synced:
self.last_sync_time = datetime.now()
# Refresh hashes after sync
self.last_md_hash = self.get_file_hash(TASKLIST_PATH)
self.last_db_hash = self.get_db_state_hash()
return synced
def get_status(self) -> dict:
"""Get current sync status."""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Task counts
cursor.execute("""
SELECT status, COUNT(*) FROM v2_tasks GROUP BY status
""")
status_counts = dict(cursor.fetchall())
# Last syncs
cursor.execute("""
SELECT sync_type, synced_at, items_synced
FROM v2_plan_sync
ORDER BY synced_at DESC
LIMIT 5
""")
recent_syncs = cursor.fetchall()
conn.close()
return {
"tasks": status_counts,
"total": sum(status_counts.values()),
"completed": status_counts.get("completed", 0),
"pending": status_counts.get("pending", 0),
"recent_syncs": recent_syncs,
"daemon_running": self.running,
"last_sync": self.last_sync_time.isoformat() if self.last_sync_time else None
}
def run_once(self):
"""Run a single sync cycle and exit."""
self.log("Running single sync cycle...")
# Initialize hashes
self.last_md_hash = self.get_file_hash(TASKLIST_PATH)
self.last_db_hash = self.get_db_state_hash()
# Force sync both directions
md_updated = self.sync_plan_to_db()
db_updated = self.sync_db_to_plan()
self.log(f"Sync complete: {md_updated} DB updates, {db_updated} markdown updates", "SYNC")
status = self.get_status()
print(f"\n{Colors.BOLD}Current Status:{Colors.RESET}")
print(f" Total tasks: {status['total']}")
print(f" Completed: {Colors.GREEN}{status['completed']}{Colors.RESET}")
print(f" Pending: {Colors.YELLOW}{status['pending']}{Colors.RESET}")
def run(self):
"""Run the sync daemon loop."""
self.log(f"Starting sync daemon (interval: {self.interval}s)")
self.log(f"Watching: {TASKLIST_PATH}")
self.log(f"Database: {DB_PATH}")
# Initialize hashes
self.last_md_hash = self.get_file_hash(TASKLIST_PATH)
self.last_db_hash = self.get_db_state_hash()
self.log("Initial state captured, watching for changes...")
while self.running:
try:
self.check_and_sync()
time.sleep(self.interval)
except Exception as e:
self.log(f"Error in sync loop: {e}", "ERROR")
time.sleep(self.interval)
self.log("Sync daemon stopped")
def main(): parser = argparse.ArgumentParser(description="V2 Project Plan Sync Daemon") parser.add_argument("--daemon", "-d", action="store_true", help="Run as background daemon") parser.add_argument("--interval", "-i", type=int, default=30, help="Poll interval in seconds") parser.add_argument("--once", action="store_true", help="Run single sync and exit") parser.add_argument("--status", action="store_true", help="Show current sync status")
args = parser.parse_args()
daemon = SyncDaemon(interval=args.interval)
if args.status:
status = daemon.get_status()
print(f"\n{Colors.BOLD}V2 Project Plan Sync Status{Colors.RESET}")
print("=" * 40)
print(f"Total tasks: {status['total']}")
print(f"Completed: {Colors.GREEN}{status['completed']}{Colors.RESET}")
print(f"Pending: {Colors.YELLOW}{status['pending']}{Colors.RESET}")
print(f"\n{Colors.BOLD}Recent Syncs:{Colors.RESET}")
for sync_type, synced_at, items in status['recent_syncs']:
print(f" {synced_at[:19]} | {sync_type:12} | {items} items")
return
if args.once:
daemon.run_once()
return
if args.daemon:
# Fork to background
pid = os.fork()
if pid > 0:
print(f"Sync daemon started with PID {pid}")
print(f"Logs: {LOG_FILE}")
sys.exit(0)
daemon.run()
if name == "main": main()