#!/usr/bin/env python3
"""
---
title: "Paths"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Import Tasks from TASKLIST.md Files into SQLite Database"
keywords: ['database', 'from', 'import', 'markdown', 'review']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "import-tasks-from-markdown.py"
language: python
executable: true
usage: "python3 scripts/import-tasks-from-markdown.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

Import Tasks from TASKLIST.md Files into SQLite Database

Parses V2 TASKLIST.md files and imports tasks into the work_items table,
enabling progress tracking and completion rollup queries.

Usage:
    python3 scripts/import-tasks-from-markdown.py           # Import all epics
    python3 scripts/import-tasks-from-markdown.py E001      # Import specific epic
    python3 scripts/import-tasks-from-markdown.py --dry-run # Preview without importing
    python3 scripts/import-tasks-from-markdown.py --clear   # Clear existing tasks first

Version: 1.0.0
"""
import argparse
import re
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Paths - ADR-114 & ADR-118: Use centralized path discovery
SCRIPT_DIR = Path(__file__).parent  # was Path(file) — mangled dunder
REPO_ROOT = SCRIPT_DIR.parent
sys.path.insert(0, str(SCRIPT_DIR / "core"))

try:
    from paths import get_sessions_db_path, SESSIONS_DB as _SESSIONS_DB
    SESSIONS_DB = _SESSIONS_DB
except ImportError:
    # Fallback for backward compatibility when scripts/core/paths.py is absent.
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        SESSIONS_DB = _user_data / "sessions.db"
    else:
        SESSIONS_DB = Path.home() / ".coditect" / "context-storage" / "sessions.db"

DB_PATH = SESSIONS_DB  # Backward compatibility alias
EPICS_PATH = REPO_ROOT / "docs" / "05-project-planning" / "v2" / "epics"

# Task ID pattern: E001-T001, E001-T001.1 (for subtasks)
# NOTE(review): TASK_COUNTER is never read in this file — possibly vestigial.
TASK_COUNTER = {}
def get_db() -> sqlite3.Connection:
    """Open a connection to the work-item database.

    Exits the process with status 1 when the database file does not
    exist yet; the init script must be run first in that case.

    Returns:
        sqlite3.Connection with ``row_factory`` set to ``sqlite3.Row``
        so callers can access columns by name.
    """
    if not DB_PATH.exists():
        print(f"ERROR: Database not found at {DB_PATH}")
        print("Run: python3 scripts/init-work-item-db.py")
        sys.exit(1)

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn
def parse_time_estimate(text: str) -> Optional[float]:
    """Extract a time estimate in hours from text like '(4 hours)' or '(2 days)'.

    Recognizes parenthesized estimates such as ``(4 hours)``, ``(2h)``,
    ``(~1.5 hours)``, ``(1 day)``, ``(2 days)``.  Day values are
    converted assuming an 8-hour workday.

    Returns:
        Estimate in hours, or None when no estimate is present.
    """
    # NOTE: original patterns were corrupted by markdown extraction
    # (escapes and '*' quantifiers stripped); reconstructed from the
    # documented examples above.
    hour_match = re.search(r'\(~?(\d+(?:\.\d+)?)\s*h(?:ours?)?\)', text, re.IGNORECASE)
    if hour_match:
        return float(hour_match.group(1))

    day_match = re.search(r'\(~?(\d+(?:\.\d+)?)\s*days?\)', text, re.IGNORECASE)
    if day_match:
        return float(day_match.group(1)) * 8  # 8 hours per day

    return None
def parse_tasklist(epic_id: str, tasklist_path: Path) -> List[Dict]:
    """Parse a TASKLIST.md file and extract all tasks.

    Args:
        epic_id: Epic identifier, e.g. ``E001``.
        tasklist_path: Path to the epic's TASKLIST.md.

    Returns:
        List of task/subtask dicts with keys: id, type, title, parent_id,
        epic_id, section, status, estimate_hours, line_number.  Returns an
        empty list (with a warning) when the file is missing.
    """
    if not tasklist_path.exists():
        print(f" Warning: {tasklist_path} not found")
        return []

    with open(tasklist_path, 'r') as f:
        content = f.read()

    tasks = []
    current_section = None
    current_feature = None
    task_num = 0
    subtask_num = 0

    lines = content.split('\n')
    for i, line in enumerate(lines):
        stripped = line.strip()

        # Section header: ## 1.1 Infrastructure Setup (2 days, 16 hours)
        section_match = re.match(r'^##\s+(\d+\.\d+)\s+(.+?)(?:\s*\(.*\))?\s*$', stripped)
        if section_match:
            current_section = section_match.group(2).strip()
            # Feature id derives from epic number + section minor, e.g. F001.1
            current_feature = f"F{epic_id[1:]}.{section_match.group(1).split('.')[1]}"
            continue

        # Main task: - [ ] **Task 1.1.1:** Description (4 hours)
        task_match = re.match(r'^-\s*\[([ xX])\]\s*\*\*Task\s+[\d.]+:\*\*\s*(.+)$', stripped)
        if task_match:
            task_num += 1
            subtask_num = 0
            completed = task_match.group(1).lower() == 'x'
            title = task_match.group(2).strip()
            estimate = parse_time_estimate(title)
            # Remove time estimate from title
            title = re.sub(r'\s*\(~?\d+(?:\.\d+)?\s*(?:hours?|days?|h)\)\s*$', '', title, flags=re.IGNORECASE)

            task_id = f"{epic_id}-T{task_num:03d}"
            tasks.append({
                'id': task_id,
                'type': 'task',
                'title': title,
                'parent_id': current_feature or epic_id,
                'epic_id': epic_id,
                'section': current_section,
                'status': 'completed' if completed else 'planned',
                'estimate_hours': estimate,
                'line_number': i + 1,
            })
            continue

        # Subtask: - [ ] Sub-task: Description
        subtask_match = re.match(r'^-\s*\[([ xX])\]\s*(?:Sub-task:?\s*)?(.+)$', stripped)
        if subtask_match and task_num > 0:
            # Check if this looks like a subtask (indented or starts with Sub-task)
            if line.startswith(' ') or 'sub-task' in stripped.lower():
                subtask_num += 1
                completed = subtask_match.group(1).lower() == 'x'
                title = subtask_match.group(2).strip()

                # Skip success criteria lines (they start with certain patterns).
                # NOTE: the skipped line still consumes a subtask number.
                if title.startswith(('All ', 'Unit test', 'Integration test', 'Can ', 'Failover')):
                    continue

                parent_task_id = f"{epic_id}-T{task_num:03d}"
                subtask_id = f"{parent_task_id}.{subtask_num}"
                tasks.append({
                    'id': subtask_id,
                    'type': 'subtask',
                    'title': title,
                    'parent_id': parent_task_id,
                    'epic_id': epic_id,
                    'section': current_section,
                    'status': 'completed' if completed else 'planned',
                    'estimate_hours': 1,  # Default 1 hour for subtasks
                    'line_number': i + 1,
                })

    return tasks
def import_tasks(conn: sqlite3.Connection, tasks: List[Dict], epic_id: str, dry_run: bool = False) -> Tuple[int, int]:
    """Import tasks into the database.

    Inserts new work_items rows and updates the status of existing rows
    when the markdown checkbox state changed.  With ``dry_run=True`` no
    database writes occur; each would-be insert is printed instead.

    Returns:
        (inserted, updated) row counts.
    """
    cursor = conn.cursor()
    inserted = 0
    updated = 0

    for task in tasks:
        if dry_run:
            print(f" Would import: {task['id']} - {task['title'][:50]}...")
            inserted += 1
            continue

        # Check if task exists
        cursor.execute("SELECT id, status FROM work_items WHERE id = ?", (task['id'],))
        existing = cursor.fetchone()

        if existing:
            # Update only if the completion status changed
            if existing['status'] != task['status']:
                cursor.execute("""
                    UPDATE work_items
                    SET status = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (task['status'], task['id']))
                updated += 1
        else:
            # Insert new task (project/sub-project are fixed: P001/SP001)
            cursor.execute("""
                INSERT INTO work_items (
                    id, type, title, parent_id, project_id, sub_project_id,
                    status, estimate_hours, source_file, source_line
                ) VALUES (?, ?, ?, ?, 'P001', 'SP001', ?, ?, ?, ?)
            """, (
                task['id'],
                task['type'],
                task['title'],
                task['parent_id'],
                task['status'],
                task['estimate_hours'],
                f"epics/{epic_id}-*/TASKLIST.md",
                task['line_number']
            ))
            inserted += 1

    if not dry_run:
        conn.commit()

    return inserted, updated
def import_features(conn: sqlite3.Connection, epic_id: str, tasks: List[Dict], dry_run: bool = False) -> int:
    """Create feature records based on sections found in tasks.

    Derives the set of feature ids (parent_id values starting with 'F')
    from the parsed tasks and inserts one 'feature' row per section,
    skipping ids that already exist (INSERT OR IGNORE).

    Returns:
        Number of feature rows actually created.
    """
    cursor = conn.cursor()

    # Get unique sections/features
    features = {}
    for task in tasks:
        if task.get('section') and task['type'] == 'task':
            parent = task.get('parent_id', '')
            if parent.startswith('F'):
                features[parent] = task['section']

    inserted = 0
    for feature_id, title in features.items():
        if dry_run:
            print(f" Would create feature: {feature_id} - {title}")
            inserted += 1
            continue

        cursor.execute("""
            INSERT OR IGNORE INTO work_items (
                id, type, title, parent_id, project_id, sub_project_id, status
            ) VALUES (?, 'feature', ?, ?, 'P001', 'SP001', 'planned')
        """, (feature_id, title, epic_id))
        # rowcount is 1 on insert, 0 when the IGNORE clause fired
        if cursor.rowcount > 0:
            inserted += 1

    if not dry_run:
        conn.commit()

    return inserted
def clear_tasks(conn: sqlite3.Connection, epic_id: Optional[str] = None) -> int:
    """Clear existing tasks (and subtasks) from database.

    When *epic_id* is given, removes that epic's tasks/subtasks plus its
    feature rows; otherwise removes all task, subtask, and feature rows.

    Returns:
        Total number of deleted rows across all DELETE statements.
        (Bug fix: previously only the LAST statement's rowcount was
        returned, under-reporting the "Cleared N records" message.)
    """
    cursor = conn.cursor()
    deleted = 0

    if epic_id:
        # Clear tasks for specific epic
        cursor.execute("""
            DELETE FROM work_items
            WHERE type IN ('task', 'subtask') AND id LIKE ?
        """, (f"{epic_id}-%",))
        deleted += cursor.rowcount
        cursor.execute("""
            DELETE FROM work_items
            WHERE type = 'feature' AND parent_id = ?
        """, (epic_id,))
        deleted += cursor.rowcount
    else:
        # Clear all tasks and subtasks
        cursor.execute("DELETE FROM work_items WHERE type IN ('task', 'subtask')")
        deleted += cursor.rowcount
        cursor.execute("DELETE FROM work_items WHERE type = 'feature'")
        deleted += cursor.rowcount

    conn.commit()
    return deleted
def get_epic_dirs() -> List[Tuple[str, Path]]:
    """Get all epic directories under EPICS_PATH as (epic_id, path) pairs.

    Epic directories are named like ``E001-some-epic``; the id is the
    part before the first hyphen.  Returns an empty list when the epics
    directory does not exist.
    """
    if not EPICS_PATH.exists():
        return []

    epics = []
    for epic_dir in sorted(EPICS_PATH.iterdir()):
        if epic_dir.is_dir() and epic_dir.name.startswith('E'):
            epic_id = epic_dir.name.split('-')[0]
            epics.append((epic_id, epic_dir))
    return epics
def print_summary(conn: sqlite3.Connection) -> None:
    """Print import summary with per-type counts and per-epic progress.

    Requires ``conn.row_factory`` to be ``sqlite3.Row`` (columns are
    accessed by name).  Read-only: issues SELECTs only.
    """
    cursor = conn.cursor()

    print("\n" + "=" * 70)
    print("Import Summary - Work Item Progress")
    print("=" * 70)

    # Overall counts per work-item type, ordered epic > feature > task > subtask
    cursor.execute("""
        SELECT type, COUNT(*) as count,
               SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed
        FROM work_items
        GROUP BY type
        ORDER BY
            CASE type
                WHEN 'epic' THEN 1
                WHEN 'feature' THEN 2
                WHEN 'task' THEN 3
                WHEN 'subtask' THEN 4
            END
    """)

    print("\nOverall Counts:")
    print("-" * 40)
    for row in cursor.fetchall():
        pct = (row['completed'] / row['count'] * 100) if row['count'] > 0 else 0
        print(f" {row['type']:12} {row['count']:6} ({row['completed']} completed, {pct:.1f}%)")

    # Epic progress: epic -> features -> tasks/subtasks rollup
    cursor.execute("""
        SELECT
            e.id as epic_id,
            e.title as epic_title,
            COUNT(t.id) as task_count,
            SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) as completed
        FROM work_items e
        LEFT JOIN work_items f ON f.parent_id = e.id AND f.type = 'feature'
        LEFT JOIN work_items t ON t.parent_id = f.id AND t.type IN ('task', 'subtask')
        WHERE e.type = 'epic'
        GROUP BY e.id
        ORDER BY e.id
    """)

    print("\nEpic Progress:")
    print("-" * 70)
    print(f"{'Epic':<8} {'Title':<35} {'Tasks':>8} {'Done':>8} {'%':>8}")
    print("-" * 70)
    for row in cursor.fetchall():
        pct = (row['completed'] / row['task_count'] * 100) if row['task_count'] > 0 else 0
        title = row['epic_title'][:33] + '..' if len(row['epic_title']) > 35 else row['epic_title']
        print(f"{row['epic_id']:<8} {title:<35} {row['task_count']:>8} {row['completed']:>8} {pct:>7.1f}%")

    print("=" * 70)
def main() -> int:
    """CLI entry point: parse args, optionally clear, import, and summarize.

    Returns:
        0 on success; exits with status 1 on fatal errors (missing DB or
        unknown epic id).
    """
    parser = argparse.ArgumentParser(
        description="Import tasks from TASKLIST.md files into SQLite database",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 scripts/import-tasks-from-markdown.py           # Import all epics
  python3 scripts/import-tasks-from-markdown.py E001      # Import specific epic
  python3 scripts/import-tasks-from-markdown.py --dry-run # Preview without importing
  python3 scripts/import-tasks-from-markdown.py --clear   # Clear existing tasks first
"""
    )
    parser.add_argument('epic', nargs='?', help='Specific epic to import (e.g., E001)')
    parser.add_argument('--dry-run', action='store_true', help='Preview without importing')
    parser.add_argument('--clear', action='store_true', help='Clear existing tasks before import')
    # NOTE(review): --verbose is accepted but currently unused below.
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')

    args = parser.parse_args()

    print("=" * 70)
    print("Import Tasks from TASKLIST.md Files")
    print("=" * 70)
    print(f"Database: {DB_PATH}")
    print(f"Epics Path: {EPICS_PATH}")
    if args.dry_run:
        print("Mode: DRY RUN (no changes will be made)")
    print()

    conn = get_db()

    # Clear if requested (skipped under --dry-run)
    if args.clear and not args.dry_run:
        print("Clearing existing tasks...")
        cleared = clear_tasks(conn, args.epic)
        print(f" Cleared {cleared} records")
        print()

    # Get epics to process
    epic_dirs = get_epic_dirs()
    if args.epic:
        epic_dirs = [(eid, epath) for eid, epath in epic_dirs if eid == args.epic]
        if not epic_dirs:
            print(f"ERROR: Epic {args.epic} not found")
            sys.exit(1)

    total_inserted = 0
    total_updated = 0
    total_features = 0

    for epic_id, epic_dir in epic_dirs:
        tasklist_path = epic_dir / "TASKLIST.md"
        print(f"Processing {epic_id}: {epic_dir.name}")

        # Parse tasks
        tasks = parse_tasklist(epic_id, tasklist_path)
        print(f" Found {len(tasks)} tasks/subtasks")

        if not tasks:
            continue

        # Import features first (tasks reference them via parent_id)
        features_created = import_features(conn, epic_id, tasks, args.dry_run)
        total_features += features_created
        if features_created:
            print(f" Created {features_created} features")

        # Import tasks
        inserted, updated = import_tasks(conn, tasks, epic_id, args.dry_run)
        total_inserted += inserted
        total_updated += updated
        print(f" Imported: {inserted} new, {updated} updated")
        print()

    print(f"Total: {total_inserted} inserted, {total_updated} updated, {total_features} features")

    # Print summary
    if not args.dry_run:
        print_summary(conn)

    conn.close()
    return 0
if __name__ == "__main__":  # was `if name == "main"` — mangled dunders
    sys.exit(main())