scripts-init-work-item-db
#!/usr/bin/env python3 """
title: "Resolve paths" component_type: script version: "1.0.0" audience: contributor status: stable summary: "Initialize Work Item Hierarchy Database Schema (ADR-006)" keywords: ['database', 'init', 'item', 'optimization', 'review'] tokens: ~500 created: 2025-12-22 updated: 2025-12-22 script_name: "init-work-item-db.py" language: python executable: true usage: "python3 scripts/init-work-item-db.py [options]" python_version: "3.10+" dependencies: [] modifies_files: false network_access: false requires_auth: false
Initialize Work Item Hierarchy Database Schema (ADR-006)
This script creates the complete database schema for hierarchical work item tracking:
- Tables: projects, sub_projects, sprints, work_items
- Views: work_item_children_stats, work_item_progress, epic_progress, sprint_burndown, project_progress, sub_project_progress
- Indexes: All foreign key and query optimization indexes
Usage: python3 scripts/init-work-item-db.py
Database: sessions.db (ADR-118 Tier 3 - session-scoped data) """
import sqlite3 import sys from pathlib import Path from datetime import datetime
Resolve paths - ADR-114 & ADR-118: Use centralized path discovery
SCRIPT_DIR = Path(file).parent REPO_ROOT = SCRIPT_DIR.parent sys.path.insert(0, str(SCRIPT_DIR / "core"))
try: from paths import get_sessions_db_path, SESSIONS_DB as _SESSIONS_DB SESSIONS_DB = _SESSIONS_DB except ImportError: # Fallback for backward compatibility _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" if _user_data.exists(): SESSIONS_DB = _user_data / "sessions.db" else: SESSIONS_DB = Path.home() / ".coditect" / "context-storage" / "sessions.db"
DB_PATH = SESSIONS_DB # Backward compatibility alias
def connect_db(): """Connect to sessions.db (ADR-118).""" if not DB_PATH.exists(): print(f"ERROR: Database not found at {DB_PATH}") print("Please ensure sessions.db exists (run /cx first).") sys.exit(1)
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def create_tables(conn): """Create all tables for Work Item Hierarchy.""" cursor = conn.cursor()
print("Creating tables...")
# ============================================================
# PROJECTS - Top-level container
# ============================================================
cursor.execute("""
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
status TEXT DEFAULT 'active',
owner TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""")
print(" ✓ projects table")
# ============================================================
# SUB-PROJECTS - Maps to submodules in CODITECT
# ============================================================
cursor.execute("""
CREATE TABLE IF NOT EXISTS sub_projects (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
name TEXT NOT NULL,
path TEXT,
description TEXT,
status TEXT DEFAULT 'active',
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE
)
""")
print(" ✓ sub_projects table")
# ============================================================
# SPRINTS - Time-boxed iterations
# ============================================================
cursor.execute("""
CREATE TABLE IF NOT EXISTS sprints (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
name TEXT NOT NULL,
goal TEXT,
start_date TEXT NOT NULL,
end_date TEXT NOT NULL,
status TEXT DEFAULT 'planned',
velocity_planned INTEGER,
velocity_actual INTEGER,
retrospective TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE
)
""")
print(" ✓ sprints table")
# ============================================================
# WORK_ITEMS - Unified table for Epic, Feature, Task, Subtask
# ============================================================
cursor.execute("""
CREATE TABLE IF NOT EXISTS work_items (
id TEXT PRIMARY KEY,
type TEXT NOT NULL CHECK (type IN ('epic', 'feature', 'task', 'subtask')),
title TEXT NOT NULL,
description TEXT,
acceptance_criteria TEXT,
-- Hierarchy relationships
project_id TEXT,
sub_project_id TEXT,
parent_id TEXT,
-- Sprint assignment (for tasks/subtasks)
sprint_id TEXT,
-- Status tracking
status TEXT DEFAULT 'backlog' CHECK (status IN (
'backlog',
'planned',
'in_progress',
'blocked',
'review',
'completed',
'cancelled'
)),
-- Priority and ordering
priority INTEGER DEFAULT 50,
sort_order INTEGER DEFAULT 0,
-- Estimation
estimate_points INTEGER,
estimate_hours REAL,
actual_hours REAL,
-- Assignment and categorization
assignee TEXT,
labels TEXT,
-- Dependencies (JSON array of work_item IDs)
depends_on TEXT,
blocks TEXT,
-- Timestamps
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
started_at TEXT,
completed_at TEXT,
due_date TEXT,
-- Source tracking (for imports)
source_file TEXT,
source_line INTEGER,
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY (sub_project_id) REFERENCES sub_projects(id) ON DELETE SET NULL,
FOREIGN KEY (parent_id) REFERENCES work_items(id) ON DELETE CASCADE,
FOREIGN KEY (sprint_id) REFERENCES sprints(id) ON DELETE SET NULL
)
""")
print(" ✓ work_items table")
conn.commit()
print("Tables created successfully.\n")
def create_indexes(conn): """Create all indexes for performance optimization.""" cursor = conn.cursor()
print("Creating indexes...")
# Sub-projects indexes
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sub_projects_project ON sub_projects(project_id)")
print(" ✓ idx_sub_projects_project")
# Sprints indexes
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sprints_project ON sprints(project_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sprints_status ON sprints(status)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sprints_dates ON sprints(start_date, end_date)")
print(" ✓ idx_sprints_project, idx_sprints_status, idx_sprints_dates")
# Work items indexes
cursor.execute("CREATE INDEX IF NOT EXISTS idx_work_items_type ON work_items(type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_work_items_parent ON work_items(parent_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_work_items_project ON work_items(project_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_work_items_sub_project ON work_items(sub_project_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_work_items_sprint ON work_items(sprint_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_work_items_status ON work_items(status)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_work_items_assignee ON work_items(assignee)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_work_items_due_date ON work_items(due_date)")
print(" ✓ 8 work_items indexes")
conn.commit()
print("Indexes created successfully.\n")
def create_views(conn): """Create all views for aggregations and reporting.""" cursor = conn.cursor()
print("Creating views...")
# ============================================================
# work_item_children_stats - Direct children count and completion
# ============================================================
cursor.execute("""
CREATE VIEW IF NOT EXISTS work_item_children_stats AS
SELECT
parent_id,
COUNT(*) as child_count,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed_count,
SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_count,
SUM(CASE WHEN status = 'blocked' THEN 1 ELSE 0 END) as blocked_count,
SUM(COALESCE(estimate_points, 0)) as total_points,
SUM(CASE WHEN status = 'completed' THEN COALESCE(estimate_points, 0) ELSE 0 END) as completed_points,
SUM(COALESCE(estimate_hours, 0)) as total_hours,
SUM(COALESCE(actual_hours, 0)) as actual_hours
FROM work_items
WHERE parent_id IS NOT NULL
GROUP BY parent_id
""")
print(" ✓ work_item_children_stats")
# ============================================================
# work_item_progress - Work item with completion percentage
# ============================================================
cursor.execute("""
CREATE VIEW IF NOT EXISTS work_item_progress AS
SELECT
wi.id,
wi.type,
wi.title,
wi.status,
wi.parent_id,
wi.project_id,
wi.sub_project_id,
wi.sprint_id,
wi.estimate_points,
wi.estimate_hours,
wi.assignee,
wi.due_date,
COALESCE(cs.child_count, 0) as child_count,
COALESCE(cs.completed_count, 0) as completed_children,
COALESCE(cs.in_progress_count, 0) as in_progress_children,
COALESCE(cs.blocked_count, 0) as blocked_children,
COALESCE(cs.total_points, 0) as total_child_points,
COALESCE(cs.completed_points, 0) as completed_child_points,
CASE
WHEN cs.child_count IS NULL OR cs.child_count = 0 THEN
CASE WHEN wi.status = 'completed' THEN 100.0 ELSE 0.0 END
ELSE
ROUND(100.0 * cs.completed_count / cs.child_count, 1)
END as percent_complete,
CASE
WHEN cs.total_points IS NULL OR cs.total_points = 0 THEN
CASE WHEN wi.status = 'completed' THEN 100.0 ELSE 0.0 END
ELSE
ROUND(100.0 * cs.completed_points / cs.total_points, 1)
END as percent_complete_weighted
FROM work_items wi
LEFT JOIN work_item_children_stats cs ON wi.id = cs.parent_id
""")
print(" ✓ work_item_progress")
# ============================================================
# epic_progress - Epic progress summary
# ============================================================
cursor.execute("""
CREATE VIEW IF NOT EXISTS epic_progress AS
SELECT
e.id as epic_id,
e.title as epic_title,
e.project_id,
e.sub_project_id,
e.status as epic_status,
COUNT(DISTINCT f.id) as feature_count,
COUNT(t.id) as task_count,
SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) as completed_tasks,
SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_tasks,
SUM(CASE WHEN t.status = 'blocked' THEN 1 ELSE 0 END) as blocked_tasks,
SUM(COALESCE(t.estimate_points, 0)) as total_points,
SUM(CASE WHEN t.status = 'completed' THEN COALESCE(t.estimate_points, 0) ELSE 0 END) as completed_points,
CASE
WHEN COUNT(t.id) = 0 THEN 0.0
ELSE ROUND(100.0 * SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) / COUNT(t.id), 1)
END as percent_complete
FROM work_items e
LEFT JOIN work_items f ON f.parent_id = e.id AND f.type = 'feature'
LEFT JOIN work_items t ON (t.parent_id = f.id OR t.parent_id = e.id) AND t.type IN ('task', 'subtask')
WHERE e.type = 'epic'
GROUP BY e.id
""")
print(" ✓ epic_progress")
# ============================================================
# sprint_burndown - Sprint burndown data
# ============================================================
cursor.execute("""
CREATE VIEW IF NOT EXISTS sprint_burndown AS
SELECT
s.id as sprint_id,
s.name as sprint_name,
s.start_date,
s.end_date,
s.status as sprint_status,
s.goal,
COUNT(wi.id) as total_items,
SUM(CASE WHEN wi.status = 'completed' THEN 1 ELSE 0 END) as completed_items,
SUM(CASE WHEN wi.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress_items,
SUM(CASE WHEN wi.status = 'blocked' THEN 1 ELSE 0 END) as blocked_items,
SUM(COALESCE(wi.estimate_points, 0)) as total_points,
SUM(CASE WHEN wi.status = 'completed' THEN COALESCE(wi.estimate_points, 0) ELSE 0 END) as completed_points,
SUM(COALESCE(wi.estimate_hours, 0)) as total_hours,
SUM(CASE WHEN wi.status = 'completed' THEN COALESCE(wi.estimate_hours, 0) ELSE 0 END) as completed_hours,
CASE
WHEN COUNT(wi.id) = 0 THEN 0.0
ELSE ROUND(100.0 * SUM(CASE WHEN wi.status = 'completed' THEN 1 ELSE 0 END) / COUNT(wi.id), 1)
END as percent_complete
FROM sprints s
LEFT JOIN work_items wi ON wi.sprint_id = s.id
GROUP BY s.id
""")
print(" ✓ sprint_burndown")
# ============================================================
# project_progress - Project overview
# ============================================================
cursor.execute("""
CREATE VIEW IF NOT EXISTS project_progress AS
SELECT
p.id as project_id,
p.name as project_name,
p.status as project_status,
COUNT(DISTINCT sp.id) as sub_project_count,
COUNT(DISTINCT e.id) as epic_count,
COUNT(DISTINCT f.id) as feature_count,
COUNT(DISTINCT t.id) as task_count,
SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) as completed_tasks,
SUM(COALESCE(t.estimate_points, 0)) as total_points,
SUM(CASE WHEN t.status = 'completed' THEN COALESCE(t.estimate_points, 0) ELSE 0 END) as completed_points,
CASE
WHEN COUNT(DISTINCT t.id) = 0 THEN 0.0
ELSE ROUND(100.0 * SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) / COUNT(DISTINCT t.id), 1)
END as percent_complete
FROM projects p
LEFT JOIN sub_projects sp ON sp.project_id = p.id
LEFT JOIN work_items e ON e.project_id = p.id AND e.type = 'epic'
LEFT JOIN work_items f ON f.parent_id = e.id AND f.type = 'feature'
LEFT JOIN work_items t ON (t.parent_id = f.id OR t.parent_id = e.id) AND t.type IN ('task', 'subtask')
GROUP BY p.id
""")
print(" ✓ project_progress")
# ============================================================
# sub_project_progress - Sub-project progress
# ============================================================
cursor.execute("""
CREATE VIEW IF NOT EXISTS sub_project_progress AS
SELECT
sp.id as sub_project_id,
sp.name as sub_project_name,
sp.path,
sp.project_id,
COUNT(DISTINCT e.id) as epic_count,
COUNT(DISTINCT f.id) as feature_count,
COUNT(t.id) as task_count,
SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) as completed_tasks,
CASE
WHEN COUNT(t.id) = 0 THEN 0.0
ELSE ROUND(100.0 * SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) / COUNT(t.id), 1)
END as percent_complete
FROM sub_projects sp
LEFT JOIN work_items e ON e.sub_project_id = sp.id AND e.type = 'epic'
LEFT JOIN work_items f ON f.parent_id = e.id AND f.type = 'feature'
LEFT JOIN work_items t ON (t.parent_id = f.id OR t.parent_id = e.id) AND t.type IN ('task', 'subtask')
GROUP BY sp.id
""")
print(" ✓ sub_project_progress")
conn.commit()
print("Views created successfully.\n")
def initialize_data(conn): """Initialize database with sample data.""" cursor = conn.cursor()
print("Initializing sample data...")
# Insert Project P001
cursor.execute("""
INSERT OR IGNORE INTO projects (id, name, description, status, owner, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (
'P001',
'CODITECT Platform',
'Complete CODITECT platform development and rollout',
'active',
'Hal Casteel',
datetime.now().isoformat()
))
print(" ✓ Project P001: CODITECT Platform")
# Insert Sub-Project SP001
cursor.execute("""
INSERT OR IGNORE INTO sub_projects (id, project_id, name, path, description, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
'SP001',
'P001',
'coditect-core',
'.',
'Core framework for CODITECT',
'active',
datetime.now().isoformat()
))
print(" ✓ Sub-Project SP001: coditect-core")
# Insert Sprint-25
cursor.execute("""
INSERT OR IGNORE INTO sprints (id, project_id, name, goal, start_date, end_date, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
'Sprint-25',
'P001',
'Sprint 25',
'V2 Project Structure',
'2025-12-16',
'2025-12-27',
'planned',
datetime.now().isoformat()
))
print(" ✓ Sprint-25: V2 Project Structure (2025-12-16 to 2025-12-27)")
conn.commit()
print("Sample data initialized successfully.\n")
def verify_schema(conn): """Verify schema creation by querying table metadata.""" cursor = conn.cursor()
print("Verifying schema...")
# Count tables
cursor.execute("""
SELECT COUNT(*) FROM sqlite_master
WHERE type='table' AND name IN ('projects', 'sub_projects', 'sprints', 'work_items')
""")
table_count = cursor.fetchone()[0]
print(f" ✓ {table_count}/4 tables created")
# Count indexes
cursor.execute("""
SELECT COUNT(*) FROM sqlite_master
WHERE type='index' AND name LIKE 'idx_%'
""")
index_count = cursor.fetchone()[0]
print(f" ✓ {index_count} indexes created")
# Count views
cursor.execute("""
SELECT COUNT(*) FROM sqlite_master
WHERE type='view' AND name IN (
'work_item_children_stats', 'work_item_progress', 'epic_progress',
'sprint_burndown', 'project_progress', 'sub_project_progress'
)
""")
view_count = cursor.fetchone()[0]
print(f" ✓ {view_count}/6 views created")
# Verify data initialization
cursor.execute("SELECT COUNT(*) FROM projects")
project_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM sub_projects")
subproject_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM sprints")
sprint_count = cursor.fetchone()[0]
print(f" ✓ {project_count} project(s) initialized")
print(f" ✓ {subproject_count} sub-project(s) initialized")
print(f" ✓ {sprint_count} sprint(s) initialized")
print("\nSchema verification complete.\n")
def print_summary(): """Print final success summary.""" print("=" * 70) print("Work Item Hierarchy Database Schema Initialized Successfully") print("=" * 70) print() print("Database: context-storage/sessions.db") print() print("Tables Created:") print(" - projects (top-level container)") print(" - sub_projects (maps to submodules)") print(" - sprints (time-boxed iterations)") print(" - work_items (epic, feature, task, subtask)") print() print("Views Created:") print(" - work_item_children_stats (completion rollup)") print(" - work_item_progress (progress with percentages)") print(" - epic_progress (epic-level aggregation)") print(" - sprint_burndown (sprint metrics)") print(" - project_progress (project-level overview)") print(" - sub_project_progress (sub-project metrics)") print() print("Indexes: All foreign keys and query optimization indexes created") print() print("Sample Data Initialized:") print(" - Project P001: CODITECT Platform") print(" - Sub-Project SP001: coditect-core (path: .)") print(" - Sprint-25: V2 Project Structure (2025-12-16 to 2025-12-27)") print() print("Next Steps:") print(" 1. Create work items using /epic, /feature, /task commands") print(" 2. Query progress using /cxq --epic-progress, --sprint-status") print(" 3. Import existing tasks from PROJECT-PLAN.md") print() print("ADR Reference: docs/03-architecture/adrs/ADR-006-WORK-ITEM-HIERARCHY.md") print("=" * 70)
def main(): """Main execution function.""" print() print("=" * 70) print("ADR-006: Work Item Hierarchy Database Initialization") print("=" * 70) print() print(f"Database Path: {DB_PATH}") print()
try:
# Connect to database
conn = connect_db()
# Create schema
create_tables(conn)
create_indexes(conn)
create_views(conn)
# Initialize data
initialize_data(conn)
# Verify
verify_schema(conn)
# Close connection
conn.close()
# Print summary
print_summary()
return 0
except Exception as e:
print(f"\nERROR: {e}")
import traceback
traceback.print_exc()
return 1
if name == "main": sys.exit(main())