Skip to main content

scripts-work-items

#!/usr/bin/env python3 """​

title: "Paths" component_type: script version: "1.0.0" audience: contributor status: stable summary: "Work Items Management - Pop-the-Top Task Automation for CODITECT" keywords: ['automation', 'database', 'items', 'work'] tokens: ~500 created: 2025-12-22 updated: 2025-12-22 script_name: "work_items.py" language: python executable: true usage: "python3 scripts/work_items.py [options]" python_version: "3.10+" dependencies: [] modifies_files: false network_access: false requires_auth: false​

Work Items Management - Pop-the-Top Task Automation for CODITECT

Provides task queue management for the V2 project plan, enabling automatic selection and management of the next priority task.

Usage: python3 scripts/work_items.py next # Get next priority task python3 scripts/work_items.py next E001 # Next task from specific epic python3 scripts/work_items.py next --sprint Sprint-26 # Next task from sprint python3 scripts/work_items.py start E001-T001 # Mark task as in_progress python3 scripts/work_items.py complete E001-T001 # Mark task as completed python3 scripts/work_items.py show E001-T001 # Show task details python3 scripts/work_items.py dashboard # Show work overview python3 scripts/work_items.py search "query" # Search tasks

Version: 1.0.0 """

import sqlite3 import sys import json import argparse from pathlib import Path from datetime import datetime from typing import Optional, List, Dict, Any

Paths

SCRIPT_DIR = Path(file).parent REPO_ROOT = SCRIPT_DIR.parent

ADR-114 & ADR-118: Use centralized path discovery for user data

sys.path.insert(0, str(SCRIPT_DIR / "core")) try: from paths import ( get_context_storage_dir, get_sessions_db_path, SESSIONS_DB, ) DB_PATH = SESSIONS_DB # Work items are TIER 3 (session/task data) except ImportError: # Fallback for backward compatibility _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" if _user_data.exists(): DB_PATH = _user_data / "sessions.db" else: DB_PATH = REPO_ROOT / "context-storage" / "sessions.db"

NOTE: context.db is DEPRECATED - NO FALLBACK per ADR-118

def get_db() -> sqlite3.Connection: """Get database connection (ADR-118 four-tier architecture) - NO FALLBACK to context.db.""" if not DB_PATH.exists(): print(f"ERROR: sessions.db not found at {DB_PATH}") print("Run: python3 scripts/init-work-item-db.py or /cx to create it") sys.exit(1)

conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn

def get_next_task(epic_id: Optional[str] = None, sprint_id: Optional[str] = None, task_type: str = 'task') -> Optional[Dict[str, Any]]: """Get the next priority task that's planned (not started).""" conn = get_db()

# Build query based on filters
sql = """
SELECT
w.id,
w.type,
w.title,
w.description,
w.parent_id,
w.status,
w.priority,
w.estimate_hours,
w.sprint_id,
p.title as parent_title,
e.id as epic_id,
e.title as epic_title
FROM work_items w
LEFT JOIN work_items p ON p.id = w.parent_id
LEFT JOIN work_items f ON f.id = w.parent_id AND f.type = 'feature'
LEFT JOIN work_items e ON e.id = f.parent_id AND e.type = 'epic'
WHERE w.type = ?
AND w.status = 'planned'
"""
params = [task_type]

if epic_id:
sql += " AND (e.id = ? OR w.id LIKE ? || '-%')"
params.extend([epic_id, epic_id])

if sprint_id:
sql += " AND w.sprint_id = ?"
params.append(sprint_id)

# Order by priority (higher first), then by ID for determinism
sql += " ORDER BY COALESCE(w.priority, 50) DESC, w.id LIMIT 1"

row = conn.execute(sql, params).fetchone()
conn.close()

if not row:
return None

return dict(row)

def get_task_context(task_id: str) -> Dict[str, Any]: """Get full context for a task including hierarchy and related items.""" conn = get_db()

# Get the task
task_sql = """
SELECT w.*, p.title as parent_title
FROM work_items w
LEFT JOIN work_items p ON p.id = w.parent_id
WHERE w.id = ?
"""
task = conn.execute(task_sql, (task_id,)).fetchone()

if not task:
conn.close()
return {'error': f'Task {task_id} not found'}

result = dict(task)

# Get subtasks if any
subtasks_sql = """
SELECT id, title, status, estimate_hours
FROM work_items
WHERE parent_id = ?
ORDER BY id
"""
subtasks = [dict(row) for row in conn.execute(subtasks_sql, (task_id,)).fetchall()]
result['subtasks'] = subtasks

# Get sibling tasks (same parent)
siblings_sql = """
SELECT id, title, status
FROM work_items
WHERE parent_id = ? AND id != ?
ORDER BY id
LIMIT 10
"""
siblings = [dict(row) for row in conn.execute(siblings_sql, (task['parent_id'], task_id)).fetchall()]
result['siblings'] = siblings

# Build hierarchy path
path = [task_id]
current_parent = task['parent_id']
while current_parent:
path.insert(0, current_parent)
parent_row = conn.execute("SELECT parent_id FROM work_items WHERE id = ?", (current_parent,)).fetchone()
current_parent = parent_row['parent_id'] if parent_row else None

result['hierarchy_path'] = ' -> '.join(path)

conn.close()
return result

def update_task_status(task_id: str, new_status: str) -> Dict[str, Any]: """Update task status and return result.""" valid_statuses = ['planned', 'in_progress', 'completed', 'blocked', 'cancelled'] if new_status not in valid_statuses: return {'error': f'Invalid status. Must be one of: {", ".join(valid_statuses)}'}

conn = get_db()

# Check task exists
task = conn.execute("SELECT id, title, status FROM work_items WHERE id = ?", (task_id,)).fetchone()
if not task:
conn.close()
return {'error': f'Task {task_id} not found'}

old_status = task['status']

# Update
conn.execute("""
UPDATE work_items
SET status = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (new_status, task_id))

conn.commit()
conn.close()

return {
'success': True,
'task_id': task_id,
'title': task['title'],
'old_status': old_status,
'new_status': new_status
}

def start_task(task_id: str) -> Dict[str, Any]: """Mark a task as in_progress.""" return update_task_status(task_id, 'in_progress')

def complete_task(task_id: str) -> Dict[str, Any]: """Mark a task as completed.""" result = update_task_status(task_id, 'completed')

if result.get('success'):
# Check if all siblings are complete to potentially update parent
conn = get_db()
task = conn.execute("SELECT parent_id FROM work_items WHERE id = ?", (task_id,)).fetchone()

if task and task['parent_id']:
# Check sibling completion
siblings = conn.execute("""
SELECT COUNT(*) as total,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed
FROM work_items
WHERE parent_id = ?
""", (task['parent_id'],)).fetchone()

result['parent_progress'] = {
'parent_id': task['parent_id'],
'completed': siblings['completed'],
'total': siblings['total'],
'all_complete': siblings['completed'] == siblings['total']
}

conn.close()

return result

def search_tasks(query: str, status: Optional[str] = None, task_type: Optional[str] = None, limit: int = 20) -> List[Dict[str, Any]]: """Search tasks by title/description.""" conn = get_db()

sql = """
SELECT
w.id, w.type, w.title, w.status, w.priority, w.estimate_hours,
w.parent_id, p.title as parent_title
FROM work_items w
LEFT JOIN work_items p ON p.id = w.parent_id
WHERE (w.title LIKE ? OR w.description LIKE ?)
"""
params = [f'%{query}%', f'%{query}%']

if status:
sql += " AND w.status = ?"
params.append(status)

if task_type:
sql += " AND w.type = ?"
params.append(task_type)

sql += " ORDER BY w.priority DESC, w.id LIMIT ?"
params.append(limit)

results = [dict(row) for row in conn.execute(sql, params).fetchall()]
conn.close()

return results

def get_dashboard() -> Dict[str, Any]: """Get work item dashboard overview.""" conn = get_db()

dashboard = {}

# Overall stats by type and status
stats_sql = """
SELECT
type,
status,
COUNT(*) as count
FROM work_items
GROUP BY type, status
ORDER BY type, status
"""
stats = {}
for row in conn.execute(stats_sql).fetchall():
if row['type'] not in stats:
stats[row['type']] = {}
stats[row['type']][row['status']] = row['count']
dashboard['by_type_status'] = stats

# In progress items
in_progress_sql = """
SELECT w.id, w.type, w.title, w.parent_id, p.title as parent_title
FROM work_items w
LEFT JOIN work_items p ON p.id = w.parent_id
WHERE w.status = 'in_progress'
ORDER BY w.priority DESC, w.id
LIMIT 10
"""
dashboard['in_progress'] = [dict(row) for row in conn.execute(in_progress_sql).fetchall()]

# Next up (planned, ready to start)
next_up_sql = """
SELECT w.id, w.type, w.title, w.priority, w.estimate_hours
FROM work_items w
WHERE w.type = 'task' AND w.status = 'planned'
ORDER BY w.priority DESC, w.id
LIMIT 5
"""
dashboard['next_up'] = [dict(row) for row in conn.execute(next_up_sql).fetchall()]

# Epic progress summary
epic_sql = """
SELECT
e.id, e.title,
COUNT(CASE WHEN w.type IN ('task', 'subtask') THEN 1 END) as total,
COUNT(CASE WHEN w.type IN ('task', 'subtask') AND w.status = 'completed' THEN 1 END) as completed
FROM work_items e
LEFT JOIN work_items f ON f.parent_id = e.id
LEFT JOIN work_items w ON w.parent_id = f.id OR w.parent_id LIKE e.id || '-%'
WHERE e.type = 'epic'
GROUP BY e.id
ORDER BY e.id
"""
epics = []
for row in conn.execute(epic_sql).fetchall():
epic = dict(row)
epic['percent'] = round(epic['completed'] / epic['total'] * 100, 1) if epic['total'] > 0 else 0
epics.append(epic)
dashboard['epics'] = epics

# Current sprint
sprint_sql = """
SELECT s.id, s.name, s.goal, s.start_date, s.end_date,
COUNT(w.id) as tasks,
SUM(CASE WHEN w.status = 'completed' THEN 1 ELSE 0 END) as completed
FROM sprints s
LEFT JOIN work_items w ON w.sprint_id = s.id AND w.type = 'task'
WHERE s.status = 'active'
GROUP BY s.id
"""
sprint = conn.execute(sprint_sql).fetchone()
dashboard['active_sprint'] = dict(sprint) if sprint else None

conn.close()
return dashboard

def print_task_details(task: Dict[str, Any], full: bool = False): """Pretty print task details.""" print("=" * 70) print(f"Task: {task['id']} - {task['title']}") print("=" * 70) print(f" Type: {task['type']}") print(f" Status: {task['status']}") print(f" Priority: {task.get('priority', 'N/A')}")

if task.get('parent_title'):
print(f" Parent: {task['parent_id']} ({task['parent_title']})")

if task.get('epic_title'):
print(f" Epic: {task.get('epic_id')} ({task['epic_title']})")

if task.get('sprint_id'):
print(f" Sprint: {task['sprint_id']}")

if task.get('estimate_hours'):
print(f" Estimate: {task['estimate_hours']}h")

if task.get('hierarchy_path'):
print(f" Path: {task['hierarchy_path']}")

if full and task.get('description'):
print(f"\nDescription:\n {task['description']}")

if task.get('subtasks'):
print(f"\nSubtasks ({len(task['subtasks'])}):")
for st in task['subtasks'][:10]:
status_icon = "[x]" if st['status'] == 'completed' else "[ ]"
print(f" {status_icon} {st['id']}: {st['title']}")

if task.get('siblings'):
print(f"\nSibling Tasks:")
for sib in task['siblings'][:5]:
status_icon = "[x]" if sib['status'] == 'completed' else "[ ]"
print(f" {status_icon} {sib['id']}: {sib['title'][:50]}")

def print_dashboard(dashboard: Dict[str, Any]): """Pretty print dashboard.""" print("=" * 70) print("CODITECT Work Item Dashboard") print("=" * 70)

# Active Sprint
if dashboard.get('active_sprint'):
s = dashboard['active_sprint']
completed = s['completed'] or 0
total = s['tasks'] or 0
pct = round(completed / total * 100, 1) if total > 0 else 0
print(f"\nActive Sprint: {s['name']}")
print(f" Goal: {s['goal']}")
print(f" Progress: {completed}/{total} tasks ({pct}%)")
print(f" Dates: {s['start_date']} to {s['end_date']}")

# In Progress
if dashboard.get('in_progress'):
print(f"\nIn Progress ({len(dashboard['in_progress'])}):")
for item in dashboard['in_progress']:
print(f" [{item['type']}] {item['id']}: {item['title'][:45]}...")
else:
print("\nIn Progress: None")

# Next Up
if dashboard.get('next_up'):
print(f"\nNext Up (Ready to Start):")
for item in dashboard['next_up']:
hrs = f"{item['estimate_hours']}h" if item.get('estimate_hours') else "?"
print(f" {item['id']}: {item['title'][:45]}... ({hrs})")

# Epic Progress
if dashboard.get('epics'):
print(f"\nEpic Progress:")
for epic in dashboard['epics']:
bar_len = 20
filled = int(bar_len * epic['percent'] / 100)
bar = "=" * filled + "-" * (bar_len - filled)
print(f" {epic['id']}: [{bar}] {epic['percent']:5.1f}% {epic['title'][:30]}")

# Summary counts
if dashboard.get('by_type_status'):
print(f"\nSummary:")
for item_type, statuses in dashboard['by_type_status'].items():
total = sum(statuses.values())
completed = statuses.get('completed', 0)
print(f" {item_type:10} {completed:>4}/{total:<4} completed")

def main(): parser = argparse.ArgumentParser( description="Work Items Management - Pop-the-Top Task Automation", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Commands: next Get next priority task (auto-select) start ID Mark task as in_progress complete ID Mark task as completed show ID Show task details search QUERY Search tasks dashboard Show work overview update ID STATUS Update task status

Examples: python3 scripts/work_items.py next # Pop next task python3 scripts/work_items.py next E001 # Next from epic E001 python3 scripts/work_items.py next --sprint Sprint-26 python3 scripts/work_items.py start E001-T001 # Start working python3 scripts/work_items.py complete E001-T001 # Mark done python3 scripts/work_items.py dashboard # Overview """ )

parser.add_argument('command', choices=['next', 'start', 'complete', 'show', 'search', 'dashboard', 'update'],
help='Command to execute')
parser.add_argument('arg', nargs='?', help='Task ID or search query')
parser.add_argument('--epic', '-e', help='Filter by epic ID')
parser.add_argument('--sprint', '-s', help='Filter by sprint ID')
parser.add_argument('--status', help='Filter by status')
parser.add_argument('--type', dest='task_type', default='task', help='Item type (default: task)')
parser.add_argument('--json', '-j', action='store_true', help='Output as JSON')
parser.add_argument('--full', '-f', action='store_true', help='Show full details')
parser.add_argument('--limit', '-l', type=int, default=20, help='Result limit')

args = parser.parse_args()

# Handle commands
if args.command == 'next':
epic = args.arg or args.epic
task = get_next_task(epic_id=epic, sprint_id=args.sprint, task_type=args.task_type)

if not task:
print("No tasks available matching criteria.")
return 1

if args.json:
print(json.dumps(task, indent=2))
else:
# Get full context
context = get_task_context(task['id'])
print_task_details(context, full=args.full)
print("\nTo start this task:")
print(f" python3 scripts/work_items.py start {task['id']}")

elif args.command == 'start':
if not args.arg:
print("Error: Task ID required")
return 1

result = start_task(args.arg)
if args.json:
print(json.dumps(result, indent=2))
elif result.get('error'):
print(f"Error: {result['error']}")
return 1
else:
print(f"Started: {result['task_id']} - {result['title']}")
print(f" Status: {result['old_status']} -> {result['new_status']}")

elif args.command == 'complete':
if not args.arg:
print("Error: Task ID required")
return 1

result = complete_task(args.arg)
if args.json:
print(json.dumps(result, indent=2))
elif result.get('error'):
print(f"Error: {result['error']}")
return 1
else:
print(f"Completed: {result['task_id']} - {result['title']}")
if result.get('parent_progress'):
pp = result['parent_progress']
print(f" Parent {pp['parent_id']}: {pp['completed']}/{pp['total']} complete")
if pp['all_complete']:
print(f" All tasks in {pp['parent_id']} are complete!")

elif args.command == 'show':
if not args.arg:
print("Error: Task ID required")
return 1

context = get_task_context(args.arg)
if context.get('error'):
print(f"Error: {context['error']}")
return 1

if args.json:
print(json.dumps(context, indent=2))
else:
print_task_details(context, full=True)

elif args.command == 'search':
if not args.arg:
print("Error: Search query required")
return 1

results = search_tasks(args.arg, status=args.status,
task_type=args.task_type, limit=args.limit)

if args.json:
print(json.dumps(results, indent=2))
else:
print(f"Found {len(results)} results for '{args.arg}':\n")
for r in results:
status_icon = "[x]" if r['status'] == 'completed' else "[ ]"
print(f" {status_icon} {r['id']}: {r['title'][:50]}")
if r.get('parent_title'):
print(f" Parent: {r['parent_id']} ({r['parent_title'][:30]})")

elif args.command == 'dashboard':
dashboard = get_dashboard()

if args.json:
print(json.dumps(dashboard, indent=2))
else:
print_dashboard(dashboard)

elif args.command == 'update':
if not args.arg or not args.status:
print("Error: Task ID and --status required")
print("Usage: work_items.py update TASK_ID --status STATUS")
return 1

result = update_task_status(args.arg, args.status)
if args.json:
print(json.dumps(result, indent=2))
elif result.get('error'):
print(f"Error: {result['error']}")
return 1
else:
print(f"Updated: {result['task_id']}")
print(f" Status: {result['old_status']} -> {result['new_status']}")

return 0

if name == "main": sys.exit(main())