Skip to main content

scripts-task-extraction-pipeline

#!/usr/bin/env python3 """

title: "============================================================================" component_type: script version: "1.0.0" audience: contributor status: stable summary: "Task Extraction Pipeline" keywords: ['analysis', 'api', 'ci/cd', 'database', 'deployment'] tokens: ~500 created: 2025-12-22 updated: 2025-12-22 script_name: "task-extraction-pipeline.py" language: python executable: true usage: "python3 scripts/task-extraction-pipeline.py [options]" python_version: "3.10+" dependencies: [] modifies_files: false network_access: false requires_auth: false

Task Extraction Pipeline

Multi-stage pipeline to extract, analyze, and reconcile uncompleted work:

Stage 1: Extract unique messages by date chunks → chunk files Stage 2: LLM analyze each chunk for tasks → task extractions Stage 3: Categorize and deduplicate → categorized tasks Stage 4: Cross-check against tasklist files → gap analysis Stage 5: Update master outstanding tasks list Stage 6: Generate executive summary with recommendations

Usage: python3 scripts/task-extraction-pipeline.py --stage 1 # Run stage 1 only python3 scripts/task-extraction-pipeline.py --all # Run all stages python3 scripts/task-extraction-pipeline.py --resume # Resume from last stage """

import argparse import json import os import re import sqlite3 import subprocess import sys from collections import defaultdict from datetime import datetime, timedelta from pathlib import Path from typing import Optional

============================================================================

CONFIGURATION

============================================================================

CODITECT_CORE = Path(file).parent.parent PIPELINE_DIR = CODITECT_CORE / "reports" / "task-extraction-pipeline" PIPELINE_DIR.mkdir(parents=True, exist_ok=True)

Stage output directories

STAGE_DIRS = { 1: PIPELINE_DIR / "stage1-date-chunks", 2: PIPELINE_DIR / "stage2-llm-extractions", 3: PIPELINE_DIR / "stage3-categorized", 4: PIPELINE_DIR / "stage4-gap-analysis", 5: PIPELINE_DIR / "stage5-master-list", 6: PIPELINE_DIR / "stage6-summary", }

for d in STAGE_DIRS.values(): d.mkdir(exist_ok=True)

Context database - ADR-114 & ADR-118: Use centralized path discovery

SCRIPT_DIR = Path(file).parent sys.path.insert(0, str(SCRIPT_DIR / "core")) try: from paths import get_sessions_db_path, SESSIONS_DB CONTEXT_DB = SESSIONS_DB # Task extraction reads from sessions.db (Tier 3) except ImportError: # Fallback for backward compatibility _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" if _user_data.exists(): CONTEXT_DB = _user_data / "sessions.db" else: CONTEXT_DB = CODITECT_CORE / "context-storage" / "sessions.db"

Date configuration

END_DATE = datetime.now() START_DATE = datetime(2025, 10, 1) # Last ~2.5 months CHUNK_DAYS = 7 # 1-week chunks OVERLAP_DAYS = 2 # 2-day overlap

Task categories

TASK_CATEGORIES = [ "agents", "commands", "skills", "workflows", "scripts", "hooks", "documentation", "infrastructure", "testing", "deployment", "security", "performance", "ui/ux", "database", "api", "integration", "refactoring", "bugs", "features", "other" ]

Status values

TASK_STATUSES = [ "not_started", "in_progress", "blocked", "pending_review", "completed", "unknown" ]

============================================================================

STAGE 1: Extract messages by date chunks

============================================================================

def stage1_extract_by_date(): """Extract unique messages from context DB in date chunks.""" print("\n" + "=" * 70) print("STAGE 1: Extracting messages by date chunks") print("=" * 70)

if not CONTEXT_DB.exists():
print(f"ERROR: Context database not found at {CONTEXT_DB}")
print("Run /cx first to populate the database")
return False

chunks = generate_date_chunks()
print(f"Generated {len(chunks)} date chunks ({CHUNK_DAYS}-day chunks, {OVERLAP_DAYS}-day overlap)")

conn = sqlite3.connect(CONTEXT_DB)
cursor = conn.cursor()

# Check table structure
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [t[0] for t in cursor.fetchall()]
print(f"Available tables: {tables}")

# Determine the messages table
if 'unified_messages' in tables:
msg_table = 'unified_messages'
elif 'messages' in tables:
msg_table = 'messages'
else:
print("ERROR: No messages table found")
conn.close()
return False

# Get column info
cursor.execute(f"PRAGMA table_info({msg_table})")
columns = [col[1] for col in cursor.fetchall()]
print(f"Columns in {msg_table}: {columns}")

# Determine date column
date_col = None
for col in ['timestamp', 'created_at', 'date', 'extracted_at']:
if col in columns:
date_col = col
break

if not date_col:
print("WARNING: No date column found, will extract all messages")
date_col = None

# Content column
content_col = 'content' if 'content' in columns else 'message' if 'message' in columns else columns[1]

total_messages = 0

for i, chunk in enumerate(chunks):
chunk_file = STAGE_DIRS[1] / f"chunk_{i:03d}_{chunk['start']}_{chunk['end']}.json"

if date_col:
query = f"""
SELECT DISTINCT {content_col}, {date_col}
FROM {msg_table}
WHERE {date_col} >= ? AND {date_col} <= ?
ORDER BY {date_col} DESC
"""
cursor.execute(query, (chunk['start'], chunk['end'] + ' 23:59:59'))
else:
query = f"SELECT DISTINCT {content_col} FROM {msg_table} LIMIT 10000"
cursor.execute(query)

rows = cursor.fetchall()

messages = []
for row in rows:
content = row[0] if row[0] else ""
# Filter for task-related content
if any(kw in content.lower() for kw in [
'todo', 'task', 'pending', 'implement', 'create', 'add', 'fix',
'need', 'should', 'must', 'will', 'plan', 'feature', 'bug',
'phase', 'milestone', 'sprint', 'blocked', 'progress'
]):
messages.append({
'content': content[:2000], # Truncate long messages
'date': row[1] if len(row) > 1 else chunk['start']
})

chunk_data = {
'chunk_index': i,
'date_range': chunk,
'message_count': len(messages),
'messages': messages
}

with open(chunk_file, 'w') as f:
json.dump(chunk_data, f, indent=2)

total_messages += len(messages)
print(f" [{i+1}/{len(chunks)}] {chunk['start']} to {chunk['end']}: {len(messages)} task-related messages")

conn.close()

# Write stage 1 summary
summary = {
'stage': 1,
'completed_at': datetime.now().isoformat(),
'total_chunks': len(chunks),
'total_messages': total_messages,
'date_range': {
'start': START_DATE.strftime('%Y-%m-%d'),
'end': END_DATE.strftime('%Y-%m-%d')
}
}

with open(STAGE_DIRS[1] / "_stage1_summary.json", 'w') as f:
json.dump(summary, f, indent=2)

print(f"\n✅ Stage 1 complete: {total_messages} messages in {len(chunks)} chunks")
return True

def generate_date_chunks(): """Generate overlapping date chunks.""" chunks = [] current_end = END_DATE

while current_end > START_DATE:
chunk_start = current_end - timedelta(days=CHUNK_DAYS)
if chunk_start < START_DATE:
chunk_start = START_DATE

chunks.append({
'start': chunk_start.strftime('%Y-%m-%d'),
'end': current_end.strftime('%Y-%m-%d'),
})

current_end = chunk_start + timedelta(days=OVERLAP_DAYS)

return list(reversed(chunks)) # Oldest first

============================================================================

STAGE 2: LLM analyze each chunk for tasks

============================================================================

def stage2_llm_analyze(): """Analyze each chunk with pattern matching to extract tasks.""" print("\n" + "=" * 70) print("STAGE 2: Analyzing chunks for task extraction") print("=" * 70)

chunk_files = sorted(STAGE_DIRS[1].glob("chunk_*.json"))

if not chunk_files:
print("ERROR: No chunk files found. Run stage 1 first.")
return False

print(f"Found {len(chunk_files)} chunk files to analyze")

all_tasks = []

# Task extraction patterns
task_patterns = [
# Direct task markers
(r'\[ \]\s*(.+?)(?:\n|$)', 'checkbox'),
(r'TODO:?\s*(.+?)(?:\n|$)', 'todo'),
(r'FIXME:?\s*(.+?)(?:\n|$)', 'fixme'),
# Action items
(r'(?:need to|needs to|should|must|will)\s+(\w+.+?)(?:\.|,|\n|$)', 'action'),
# Feature/implementation mentions
(r'implement\s+(.+?)(?:\.|,|\n|$)', 'implement'),
(r'create\s+(.+?)(?:\.|,|\n|$)', 'create'),
(r'add\s+(.+?)(?:\.|,|\n|$)', 'add'),
(r'fix\s+(.+?)(?:\.|,|\n|$)', 'fix'),
# Status mentions
(r'(?:pending|blocked|in progress)[:\s]+(.+?)(?:\.|,|\n|$)', 'status'),
# Phase/milestone
(r'phase\s+(\d+.+?)(?:\.|,|\n|$)', 'phase'),
]

for chunk_file in chunk_files:
with open(chunk_file) as f:
chunk_data = json.load(f)

chunk_tasks = []

for msg in chunk_data.get('messages', []):
content = msg.get('content', '')
msg_date = msg.get('date', chunk_data['date_range']['start'])

for pattern, task_type in task_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
for match in matches:
task_text = match.strip() if isinstance(match, str) else ' '.join(match).strip()
if len(task_text) > 10 and len(task_text) < 500: # Reasonable length
# Categorize the task
category = categorize_task(task_text)
status = infer_status(content, task_text)

chunk_tasks.append({
'text': task_text,
'type': task_type,
'category': category,
'status': status,
'source_date': msg_date,
'context': content[:300]
})

# Save chunk analysis
analysis_file = STAGE_DIRS[2] / f"analysis_{chunk_file.stem}.json"
analysis = {
'source_chunk': chunk_file.name,
'date_range': chunk_data['date_range'],
'tasks_found': len(chunk_tasks),
'tasks': chunk_tasks
}

with open(analysis_file, 'w') as f:
json.dump(analysis, f, indent=2)

all_tasks.extend(chunk_tasks)
print(f" {chunk_file.stem}: {len(chunk_tasks)} tasks extracted")

# Write stage 2 summary
summary = {
'stage': 2,
'completed_at': datetime.now().isoformat(),
'chunks_analyzed': len(chunk_files),
'total_tasks_extracted': len(all_tasks),
'by_type': defaultdict(int),
'by_category': defaultdict(int),
'by_status': defaultdict(int)
}

for task in all_tasks:
summary['by_type'][task['type']] += 1
summary['by_category'][task['category']] += 1
summary['by_status'][task['status']] += 1

summary['by_type'] = dict(summary['by_type'])
summary['by_category'] = dict(summary['by_category'])
summary['by_status'] = dict(summary['by_status'])

with open(STAGE_DIRS[2] / "_stage2_summary.json", 'w') as f:
json.dump(summary, f, indent=2)

print(f"\n✅ Stage 2 complete: {len(all_tasks)} tasks extracted")
return True

def categorize_task(task_text: str) -> str: """Categorize a task based on keywords.""" text_lower = task_text.lower()

category_keywords = {
'agents': ['agent', 'subagent'],
'commands': ['command', 'slash', '/'],
'skills': ['skill'],
'workflows': ['workflow', 'pipeline'],
'scripts': ['script', 'python', 'bash', 'shell'],
'hooks': ['hook', 'pre-', 'post-'],
'documentation': ['doc', 'readme', 'guide', 'manual', 'changelog'],
'infrastructure': ['infra', 'server', 'cloud', 'gcp', 'aws', 'kubernetes', 'k8s', 'docker'],
'testing': ['test', 'spec', 'coverage', 'unit', 'e2e', 'integration'],
'deployment': ['deploy', 'release', 'ci/cd', 'build', 'publish'],
'security': ['security', 'auth', 'permission', 'credential', 'secret'],
'performance': ['performance', 'optimize', 'speed', 'cache', 'memory'],
'ui/ux': ['ui', 'ux', 'frontend', 'component', 'react', 'vue', 'css'],
'database': ['database', 'db', 'sql', 'schema', 'migration', 'query'],
'api': ['api', 'endpoint', 'rest', 'graphql', 'route'],
'integration': ['integration', 'connect', 'sync', 'import', 'export'],
'refactoring': ['refactor', 'cleanup', 'reorganize', 'restructure'],
'bugs': ['bug', 'fix', 'error', 'issue', 'broken'],
'features': ['feature', 'implement', 'add', 'create', 'new'],
}

for category, keywords in category_keywords.items():
if any(kw in text_lower for kw in keywords):
return category

return 'other'

def infer_status(context: str, task_text: str) -> str: """Infer task status from context.""" combined = (context + ' ' + task_text).lower()

if any(kw in combined for kw in ['completed', 'done', 'finished', '✅', '[x]']):
return 'completed'
elif any(kw in combined for kw in ['blocked', 'waiting', 'depends on', 'blocker']):
return 'blocked'
elif any(kw in combined for kw in ['in progress', 'working on', 'started', 'wip']):
return 'in_progress'
elif any(kw in combined for kw in ['review', 'pr', 'pull request']):
return 'pending_review'
elif any(kw in combined for kw in ['todo', 'need to', 'should', 'must', 'will', '[ ]', 'pending', 'planned']):
return 'not_started'

return 'unknown'

============================================================================

STAGE 3: Categorize and deduplicate

============================================================================

def stage3_categorize(): """Consolidate and deduplicate tasks by category.""" print("\n" + "=" * 70) print("STAGE 3: Categorizing and deduplicating tasks") print("=" * 70)

analysis_files = sorted(STAGE_DIRS[2].glob("analysis_*.json"))

if not analysis_files:
print("ERROR: No analysis files found. Run stage 2 first.")
return False

# Collect all tasks
all_tasks = []
for f in analysis_files:
with open(f) as fp:
data = json.load(fp)
all_tasks.extend(data.get('tasks', []))

print(f"Total tasks before deduplication: {len(all_tasks)}")

# Deduplicate by normalized text
seen = set()
unique_tasks = []

for task in all_tasks:
# Normalize for comparison
normalized = re.sub(r'\s+', ' ', task['text'].lower().strip())[:100]

if normalized not in seen and len(normalized) > 10:
seen.add(normalized)
unique_tasks.append(task)

print(f"Unique tasks after deduplication: {len(unique_tasks)}")

# Organize by category
by_category = defaultdict(list)
for task in unique_tasks:
by_category[task['category']].append(task)

# Organize by status
by_status = defaultdict(list)
for task in unique_tasks:
by_status[task['status']].append(task)

# Save categorized output
categorized = {
'stage': 3,
'completed_at': datetime.now().isoformat(),
'total_unique_tasks': len(unique_tasks),
'by_category': {k: {'count': len(v), 'tasks': v} for k, v in sorted(by_category.items())},
'by_status': {k: {'count': len(v), 'tasks': v} for k, v in sorted(by_status.items())},
}

with open(STAGE_DIRS[3] / "categorized_tasks.json", 'w') as f:
json.dump(categorized, f, indent=2)

# Also save as readable markdown
md_content = generate_categorized_markdown(categorized)
with open(STAGE_DIRS[3] / "categorized_tasks.md", 'w') as f:
f.write(md_content)

print(f"\n✅ Stage 3 complete: {len(unique_tasks)} unique tasks categorized")

# Print summary
print("\n📊 Tasks by Category:")
for cat, data in sorted(by_category.items(), key=lambda x: -len(x[1])):
print(f" {cat}: {len(data)}")

print("\n📊 Tasks by Status:")
for status, data in sorted(by_status.items(), key=lambda x: -len(x[1])):
print(f" {status}: {len(data)}")

return True

def generate_categorized_markdown(data: dict) -> str: """Generate markdown report of categorized tasks.""" lines = [ "# Categorized Tasks Report", f"\nGenerated: {data['completed_at']}", f"\nTotal Unique Tasks: {data['total_unique_tasks']}", "\n---\n", "## Tasks by Category\n" ]

for category, cat_data in sorted(data['by_category'].items(), key=lambda x: -x[1]['count']):
lines.append(f"\n### {category.title()} ({cat_data['count']} tasks)\n")
for task in cat_data['tasks'][:20]: # Show top 20 per category
status_emoji = {
'not_started': '⬜',
'in_progress': '🔄',
'blocked': '🚫',
'pending_review': '👀',
'completed': '✅',
'unknown': '❓'
}.get(task['status'], '❓')
lines.append(f"- {status_emoji} {task['text'][:100]}")
if cat_data['count'] > 20:
lines.append(f"- ... and {cat_data['count'] - 20} more")

lines.append("\n---\n")
lines.append("## Tasks by Status\n")

for status, status_data in sorted(data['by_status'].items(), key=lambda x: -x[1]['count']):
lines.append(f"\n### {status.replace('_', ' ').title()} ({status_data['count']} tasks)\n")

return '\n'.join(lines)

============================================================================

STAGE 4: Cross-check against tasklist files (gap analysis)

============================================================================

def stage4_gap_analysis(): """Cross-check extracted tasks against existing tasklist files.""" print("\n" + "=" * 70) print("STAGE 4: Gap analysis - cross-checking against tasklists") print("=" * 70)

# Load categorized tasks from stage 3
cat_file = STAGE_DIRS[3] / "categorized_tasks.json"
if not cat_file.exists():
print("ERROR: Categorized tasks not found. Run stage 3 first.")
return False

with open(cat_file) as f:
categorized = json.load(f)

# Scan all tasklist files
print("Scanning tasklist files...")
tasklist_data = scan_all_tasklists()

print(f"Found {len(tasklist_data)} files with pending tasks")
total_file_pending = sum(t['pending'] for t in tasklist_data.values())
total_file_completed = sum(t['completed'] for t in tasklist_data.values())
print(f"Total from files: {total_file_pending} pending, {total_file_completed} completed")

# Compare context-extracted vs file-based
context_tasks = categorized['total_unique_tasks']

gap_analysis = {
'stage': 4,
'completed_at': datetime.now().isoformat(),
'context_extracted_tasks': context_tasks,
'file_based_tasks': {
'total_pending': total_file_pending,
'total_completed': total_file_completed,
'files_count': len(tasklist_data)
},
'top_files_by_pending': sorted(
[{'file': k, **v} for k, v in tasklist_data.items()],
key=lambda x: -x['pending']
)[:30],
'category_coverage': {},
'potential_gaps': [],
'recommendations': []
}

# Analyze category coverage
for category, cat_data in categorized['by_category'].items():
# Check if there are tasklist files covering this category
relevant_files = [
f for f in tasklist_data.keys()
if category.lower() in f.lower() or
any(kw in f.lower() for kw in category.split('/'))
]

gap_analysis['category_coverage'][category] = {
'context_mentions': cat_data['count'],
'relevant_files': len(relevant_files),
'coverage': 'good' if relevant_files else 'potential_gap'
}

if not relevant_files and cat_data['count'] > 5:
gap_analysis['potential_gaps'].append({
'category': category,
'mentions': cat_data['count'],
'issue': f"No dedicated tasklist file found for {category}"
})

# Generate recommendations
if gap_analysis['potential_gaps']:
gap_analysis['recommendations'].append(
"Consider creating dedicated tasklist files for categories with gaps"
)

if context_tasks < total_file_pending * 0.1:
gap_analysis['recommendations'].append(
"Context mentions are much lower than file tasks - consider running /cx to capture more context"
)

# Save gap analysis
with open(STAGE_DIRS[4] / "gap_analysis.json", 'w') as f:
json.dump(gap_analysis, f, indent=2)

# Generate markdown report
md_content = generate_gap_analysis_markdown(gap_analysis)
with open(STAGE_DIRS[4] / "gap_analysis.md", 'w') as f:
f.write(md_content)

print(f"\n✅ Stage 4 complete: Gap analysis generated")
print(f" Context-extracted tasks: {context_tasks}")
print(f" File-based pending tasks: {total_file_pending}")
print(f" Potential gaps identified: {len(gap_analysis['potential_gaps'])}")

return True

def scan_all_tasklists() -> dict: """Scan all markdown files for pending tasks.""" tasklist_data = {}

for md_file in CODITECT_CORE.rglob("*.md"):
if 'node_modules' in str(md_file) or '.git' in str(md_file):
continue

try:
content = md_file.read_text(encoding='utf-8', errors='ignore')
pending = len(re.findall(r'\[ \]', content))
completed = len(re.findall(r'\[x\]', content, re.IGNORECASE))

if pending > 0:
rel_path = str(md_file.relative_to(CODITECT_CORE))
tasklist_data[rel_path] = {
'pending': pending,
'completed': completed,
'total': pending + completed,
'completion_rate': round(completed / (pending + completed) * 100, 1) if (pending + completed) > 0 else 0
}
except Exception:
continue

return tasklist_data

def generate_gap_analysis_markdown(data: dict) -> str: """Generate markdown gap analysis report.""" lines = [ "# Gap Analysis Report", f"\nGenerated: {data['completed_at']}", "\n---\n", "## Summary\n", f"- Context-extracted tasks: {data['context_extracted_tasks']}", f"- File-based pending tasks: {data['file_based_tasks']['total_pending']}", f"- File-based completed tasks: {data['file_based_tasks']['total_completed']}", f"- Files with pending tasks: {data['file_based_tasks']['files_count']}", "\n---\n", "## Top 20 Files by Pending Tasks\n", ]

for i, f in enumerate(data['top_files_by_pending'][:20], 1):
lines.append(f"{i}. **{f['pending']}** pending ({f['completion_rate']}% done) - `{f['file']}`")

lines.append("\n---\n")
lines.append("## Category Coverage\n")

for cat, coverage in sorted(data['category_coverage'].items()):
emoji = '✅' if coverage['coverage'] == 'good' else '⚠️'
lines.append(f"- {emoji} **{cat}**: {coverage['context_mentions']} mentions, {coverage['relevant_files']} relevant files")

if data['potential_gaps']:
lines.append("\n---\n")
lines.append("## Potential Gaps\n")
for gap in data['potential_gaps']:
lines.append(f"- ⚠️ **{gap['category']}**: {gap['mentions']} mentions - {gap['issue']}")

if data['recommendations']:
lines.append("\n---\n")
lines.append("## Recommendations\n")
for rec in data['recommendations']:
lines.append(f"- 💡 {rec}")

return '\n'.join(lines)

============================================================================

STAGE 5: Update master outstanding tasks list

============================================================================

def stage5_master_list(): """Generate master list of all outstanding tasks.""" print("\n" + "=" * 70) print("STAGE 5: Generating master outstanding tasks list") print("=" * 70)

# Load data from previous stages
cat_file = STAGE_DIRS[3] / "categorized_tasks.json"
gap_file = STAGE_DIRS[4] / "gap_analysis.json"

if not cat_file.exists() or not gap_file.exists():
print("ERROR: Previous stage data not found. Run stages 3 and 4 first.")
return False

with open(cat_file) as f:
categorized = json.load(f)

with open(gap_file) as f:
gap_analysis = json.load(f)

# Build master list
master_list = {
'stage': 5,
'generated_at': datetime.now().isoformat(),
'totals': {
'context_extracted': categorized['total_unique_tasks'],
'file_pending': gap_analysis['file_based_tasks']['total_pending'],
'file_completed': gap_analysis['file_based_tasks']['total_completed'],
},
'by_priority': {
'p0_critical': [],
'p1_high': [],
'p2_medium': [],
'p3_low': [],
},
'by_category': {},
'top_files': gap_analysis['top_files_by_pending'][:15],
}

# Prioritize tasks
for category, cat_data in categorized['by_category'].items():
master_list['by_category'][category] = {
'total': cat_data['count'],
'not_started': 0,
'in_progress': 0,
'blocked': 0,
'tasks': []
}

for task in cat_data['tasks']:
# Assign priority based on category and keywords
priority = assign_priority(task, category)
master_list['by_priority'][priority].append({
'category': category,
'text': task['text'][:150],
'status': task['status']
})

master_list['by_category'][category][task['status']] = \
master_list['by_category'][category].get(task['status'], 0) + 1

# Save master list
with open(STAGE_DIRS[5] / "master_outstanding_tasks.json", 'w') as f:
json.dump(master_list, f, indent=2)

# Generate markdown
md_content = generate_master_list_markdown(master_list)
with open(STAGE_DIRS[5] / "MASTER-OUTSTANDING-TASKS.md", 'w') as f:
f.write(md_content)

print(f"\n✅ Stage 5 complete: Master list generated")
print(f" P0 Critical: {len(master_list['by_priority']['p0_critical'])}")
print(f" P1 High: {len(master_list['by_priority']['p1_high'])}")
print(f" P2 Medium: {len(master_list['by_priority']['p2_medium'])}")
print(f" P3 Low: {len(master_list['by_priority']['p3_low'])}")

return True

def assign_priority(task: dict, category: str) -> str: """Assign priority to a task.""" text_lower = task['text'].lower()

# P0: Critical blockers
if any(kw in text_lower for kw in ['blocker', 'critical', 'urgent', 'security', 'production']):
return 'p0_critical'
if category in ['security', 'deployment'] and task['status'] == 'blocked':
return 'p0_critical'

# P1: High priority
if any(kw in text_lower for kw in ['autonomy', 'infrastructure', 'core', 'foundation']):
return 'p1_high'
if category in ['infrastructure', 'api', 'database']:
return 'p1_high'

# P2: Medium priority
if category in ['features', 'documentation', 'testing']:
return 'p2_medium'

# P3: Low priority
return 'p3_low'

def generate_master_list_markdown(data: dict) -> str: """Generate master list markdown.""" lines = [ "# Master Outstanding Tasks List", f"\nGenerated: {data['generated_at']}", "\n---\n", "## Executive Summary\n", f"| Metric | Count |", f"|--------|------:|", f"| Context-extracted tasks | {data['totals']['context_extracted']} |", f"| File-based pending | {data['totals']['file_pending']} |", f"| File-based completed | {data['totals']['file_completed']} |", f"| P0 Critical | {len(data['by_priority']['p0_critical'])} |", f"| P1 High | {len(data['by_priority']['p1_high'])} |", f"| P2 Medium | {len(data['by_priority']['p2_medium'])} |", f"| P3 Low | {len(data['by_priority']['p3_low'])} |", "\n---\n", ]

# Priority sections
for priority, label in [
('p0_critical', '🔴 P0 Critical'),
('p1_high', '🟠 P1 High'),
('p2_medium', '🟡 P2 Medium'),
]:
tasks = data['by_priority'][priority]
if tasks:
lines.append(f"\n## {label} ({len(tasks)} tasks)\n")
for task in tasks[:30]:
lines.append(f"- [{task['category']}] {task['text']}")
if len(tasks) > 30:
lines.append(f"- ... and {len(tasks) - 30} more")

lines.append("\n---\n")
lines.append("## Top Files by Pending Tasks\n")

for i, f in enumerate(data['top_files'][:15], 1):
lines.append(f"{i}. **{f['pending']}** pending - `{f['file']}`")

lines.append("\n---\n")
lines.append("## Tasks by Category\n")

for cat, cat_data in sorted(data['by_category'].items(), key=lambda x: -x[1]['total']):
lines.append(f"- **{cat}**: {cat_data['total']} total")

return '\n'.join(lines)

============================================================================

STAGE 6: Generate executive summary

============================================================================

def stage6_summary(): """Generate executive summary with recommendations.""" print("\n" + "=" * 70) print("STAGE 6: Generating executive summary") print("=" * 70)

# Load all previous stage data
master_file = STAGE_DIRS[5] / "master_outstanding_tasks.json"
gap_file = STAGE_DIRS[4] / "gap_analysis.json"

if not master_file.exists() or not gap_file.exists():
print("ERROR: Previous stage data not found.")
return False

with open(master_file) as f:
master = json.load(f)

with open(gap_file) as f:
gap = json.load(f)

# Generate summary
summary = {
'stage': 6,
'generated_at': datetime.now().isoformat(),
'overview': {
'total_outstanding': master['totals']['file_pending'],
'completion_rate': round(
master['totals']['file_completed'] /
(master['totals']['file_pending'] + master['totals']['file_completed']) * 100, 1
) if (master['totals']['file_pending'] + master['totals']['file_completed']) > 0 else 0,
'critical_items': len(master['by_priority']['p0_critical']),
'high_priority_items': len(master['by_priority']['p1_high']),
},
'key_findings': [],
'recommendations': [],
'next_actions': [],
}

# Key findings
if summary['overview']['completion_rate'] < 25:
summary['key_findings'].append(
f"Low completion rate ({summary['overview']['completion_rate']}%) indicates significant backlog"
)

if summary['overview']['critical_items'] > 0:
summary['key_findings'].append(
f"{summary['overview']['critical_items']} critical (P0) items require immediate attention"
)

if gap['potential_gaps']:
summary['key_findings'].append(
f"{len(gap['potential_gaps'])} category gaps identified in task tracking"
)

# Recommendations
summary['recommendations'] = [
"Focus on P0 critical items first - these are blockers",
"Review and update stale tasklist files",
"Consider consolidating duplicate tasklist files",
"Run /cx regularly to capture context for future analysis",
"Create dedicated tracking for identified gap categories",
]

# Next actions
summary['next_actions'] = [
"1. Address P0 critical items (immediate)",
"2. Triage P1 high priority items (this week)",
"3. Update completion status in tasklist files",
"4. Archive or consolidate old planning documents",
"5. Schedule regular task review sessions",
]

# Save summary
with open(STAGE_DIRS[6] / "executive_summary.json", 'w') as f:
json.dump(summary, f, indent=2)

# Generate markdown
md_content = generate_executive_summary_markdown(summary, master, gap)
with open(STAGE_DIRS[6] / "EXECUTIVE-SUMMARY.md", 'w') as f:
f.write(md_content)

print(f"\n✅ Stage 6 complete: Executive summary generated")
print(f"\n📁 All reports saved to: {PIPELINE_DIR}")

return True

def generate_executive_summary_markdown(summary: dict, master: dict, gap: dict) -> str: """Generate executive summary markdown.""" lines = [ "# Executive Summary: Outstanding Tasks Analysis", f"\nGenerated: {summary['generated_at']}", "\n---\n", "## Overview\n", f"| Metric | Value |", f"|--------|------:|", f"| Total Outstanding Tasks | {summary['overview']['total_outstanding']:,} |", f"| Overall Completion Rate | {summary['overview']['completion_rate']}% |", f"| P0 Critical Items | {summary['overview']['critical_items']} |", f"| P1 High Priority Items | {summary['overview']['high_priority_items']} |", "\n---\n", "## Key Findings\n", ]

for finding in summary['key_findings']:
lines.append(f"- ⚠️ {finding}")

lines.append("\n---\n")
lines.append("## Priority Breakdown\n")
lines.append("```")
lines.append(f"P0 Critical: {'█' * min(len(master['by_priority']['p0_critical']), 50)} ({len(master['by_priority']['p0_critical'])})")
lines.append(f"P1 High: {'█' * min(len(master['by_priority']['p1_high']) // 5, 50)} ({len(master['by_priority']['p1_high'])})")
lines.append(f"P2 Medium: {'█' * min(len(master['by_priority']['p2_medium']) // 10, 50)} ({len(master['by_priority']['p2_medium'])})")
lines.append(f"P3 Low: {'█' * min(len(master['by_priority']['p3_low']) // 10, 50)} ({len(master['by_priority']['p3_low'])})")
lines.append("```")

lines.append("\n---\n")
lines.append("## Recommendations\n")

for rec in summary['recommendations']:
lines.append(f"- 💡 {rec}")

lines.append("\n---\n")
lines.append("## Next Actions\n")

for action in summary['next_actions']:
lines.append(f"- {action}")

lines.append("\n---\n")
lines.append("## Report Files\n")
lines.append("- `stage1-date-chunks/` - Raw message extractions by date")
lines.append("- `stage2-llm-extractions/` - Task extractions from messages")
lines.append("- `stage3-categorized/` - Deduplicated, categorized tasks")
lines.append("- `stage4-gap-analysis/` - Cross-reference with tasklist files")
lines.append("- `stage5-master-list/` - Master outstanding tasks list")
lines.append("- `stage6-summary/` - This executive summary")

return '\n'.join(lines)

============================================================================

MAIN

============================================================================

def main(): parser = argparse.ArgumentParser(description="Task Extraction Pipeline") parser.add_argument('--stage', type=int, choices=[1, 2, 3, 4, 5, 6], help="Run specific stage") parser.add_argument('--all', action='store_true', help="Run all stages") parser.add_argument('--resume', action='store_true', help="Resume from last completed stage")

args = parser.parse_args()

print("\n" + "=" * 70)
print("TASK EXTRACTION PIPELINE")
print("=" * 70)
print(f"Output directory: {PIPELINE_DIR}")

stages = {
1: ("Extract messages by date", stage1_extract_by_date),
2: ("Analyze for tasks", stage2_llm_analyze),
3: ("Categorize and deduplicate", stage3_categorize),
4: ("Gap analysis", stage4_gap_analysis),
5: ("Generate master list", stage5_master_list),
6: ("Executive summary", stage6_summary),
}

if args.stage:
# Run single stage
name, func = stages[args.stage]
print(f"\nRunning Stage {args.stage}: {name}")
success = func()
sys.exit(0 if success else 1)

elif args.all or args.resume:
# Run all stages (or resume)
start_stage = 1

if args.resume:
# Find last completed stage
for i in range(6, 0, -1):
summary_file = STAGE_DIRS[i] / f"_stage{i}_summary.json"
if summary_file.exists() or (STAGE_DIRS[i] / "executive_summary.json").exists():
start_stage = i + 1
break
print(f"\nResuming from Stage {start_stage}")

for stage_num in range(start_stage, 7):
name, func = stages[stage_num]
print(f"\n{'=' * 70}")
print(f"STAGE {stage_num}: {name}")
print('=' * 70)

success = func()
if not success:
print(f"\n❌ Stage {stage_num} failed. Stopping pipeline.")
sys.exit(1)

print("\n" + "=" * 70)
print("✅ PIPELINE COMPLETE")
print("=" * 70)
print(f"\nAll reports saved to: {PIPELINE_DIR}")
print("\nKey outputs:")
print(f" - {STAGE_DIRS[5] / 'MASTER-OUTSTANDING-TASKS.md'}")
print(f" - {STAGE_DIRS[6] / 'EXECUTIVE-SUMMARY.md'}")

else:
parser.print_help()
print("\nExamples:")
print(" python3 scripts/task-extraction-pipeline.py --all")
print(" python3 scripts/task-extraction-pipeline.py --stage 1")
print(" python3 scripts/task-extraction-pipeline.py --resume")

if name == "main": main()