scripts-analyze-uncompleted-work
#!/usr/bin/env python3
"""
title: "Analyze Uncompleted Work"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Analyze Uncompleted Work from Context Database"
keywords: ['analysis', 'analyze', 'database', 'uncompleted', 'work']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "analyze-uncompleted-work.py"
language: python
executable: true
usage: "python3 scripts/analyze-uncompleted-work.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Analyze Uncompleted Work from Context Database

Queries the context database in overlapping date chunks, going backwards, to find
all mentions of uncompleted work, pending tasks, blockers, and TODOs.

Output: a JSON report cross-referencing what's mentioned vs. what exists in tasklists.
"""
import subprocess
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict
# Configuration
CODITECT_CORE = Path(__file__).parent.parent

# ADR-118: use context-query.py instead of the deprecated context-db.py
CONTEXT_QUERY_SCRIPT = CODITECT_CORE / "scripts" / "context-query.py"
OUTPUT_DIR = CODITECT_CORE / "reports"
OUTPUT_DIR.mkdir(exist_ok=True)
# Date range configuration
END_DATE = datetime.now()
START_DATE = datetime(2025, 9, 1)  # Go back to September 2025
CHUNK_DAYS = 14    # 2-week chunks
OVERLAP_DAYS = 3   # 3-day overlap between chunks
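# With CHUNK_DAYS=14 and OVERLAP_DAYS=3, adjacent windows share 3 days, so each
# iteration steps the scan back 11 days.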
# Search patterns for uncompleted work
UNCOMPLETED_PATTERNS = [
    # Direct task markers
    r'\[ \]',
    r'TODO:?',
    r'FIXME:?',
    r'HACK:?',
    r'XXX:?',
    # Status indicators
    r'pending',
    r'in progress',
    r'not yet',
    r'not implemented',
    r'not complete',
    r'incomplete',
    r'unfinished',
    r'blocked',
    r'blocker',
    # Planning language
    r'need to',
    r'needs to',
    r'should be',
    r'must be',
    r'will need',
    r'planned',
    r'upcoming',
    r'future',
    # Phase/milestone indicators
    r'phase \d.*pending',
    r'phase \d.*planned',
    r'milestone.*pending',
    r'sprint.*pending',
]
# Patterns to identify work item types
WORK_ITEM_PATTERNS = {
    'agents': r'agent[s]?[\s:]+([a-z-]+)',
    'commands': r'command[s]?[\s:/]+([a-z-]+)',
    'skills': r'skill[s]?[\s:]+([a-z-]+)',
    'workflows': r'workflow[s]?[\s:]+([a-z-]+)',
    'features': r'feature[s]?[\s:]+(.+?)(?:\.|,|$)',
    'phases': r'phase\s+(\d+)',
    'adrs': r'ADR[- ]?(\d+)',
}
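# Illustrative matches (on lines that already hit an uncompleted pattern,
# with the case-insensitive search below; names are hypothetical):
#   "agents: context-sync" -> ('agents', 'context-sync')
#   "ADR-118"              -> ('adrs', '118')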
def run_context_query(since_date: str, until_date: str, search_term: str | None = None, limit: int = 500) -> str:
    """Run a context-query.py query and return its output (ADR-118 compliant)."""
    cmd = ["python3", str(CONTEXT_QUERY_SCRIPT)]
if search_term:
cmd.extend(["--search", search_term])
else:
cmd.append("--recent")
cmd.append(str(limit))
cmd.extend(["--since", since_date])
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60,
cwd=str(CODITECT_CORE)
)
return result.stdout + result.stderr
except subprocess.TimeoutExpired:
return f"TIMEOUT querying {since_date} to {until_date}"
except Exception as e:
return f"ERROR: {e}"
def generate_date_chunks():
    """Generate overlapping date chunks going backwards."""
    chunks = []
    current_end = END_DATE
while current_end > START_DATE:
chunk_start = current_end - timedelta(days=CHUNK_DAYS)
if chunk_start < START_DATE:
chunk_start = START_DATE
chunks.append({
'start': chunk_start.strftime('%Y-%m-%d'),
'end': current_end.strftime('%Y-%m-%d'),
})
# Move back with overlap
current_end = chunk_start + timedelta(days=OVERLAP_DAYS)
return chunks
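# Illustrative: if END_DATE were 2025-12-22, the first chunks would be
# 2025-12-08..2025-12-22, 2025-11-27..2025-12-11, 2025-11-16..2025-11-30, ...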
def extract_uncompleted_items(text: str) -> dict:
    """Extract mentions of uncompleted work from text."""
    findings = defaultdict(list)
lines = text.split('\n')
for line in lines:
line_lower = line.lower()
# Check each uncompleted pattern
for pattern in UNCOMPLETED_PATTERNS:
if re.search(pattern, line_lower, re.IGNORECASE):
# Extract work item types mentioned
for item_type, item_pattern in WORK_ITEM_PATTERNS.items():
                    matches = re.findall(item_pattern, line_lower, re.IGNORECASE)  # IGNORECASE so e.g. 'ADR' matches the lowercased text
for match in matches:
clean_match = match.strip()
if clean_match and len(clean_match) > 2:
findings[item_type].append({
'name': clean_match,
'context': line[:200].strip(),
'pattern_matched': pattern
})
# Also capture general uncompleted mentions
if '[ ]' in line or 'TODO' in line.upper():
findings['tasks'].append({
'context': line[:200].strip(),
'pattern_matched': pattern
})
break
return dict(findings)
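# Illustrative return value (hypothetical input lines):
#   {'phases': [{'name': '3', 'context': 'Phase 3 pending review',
#                'pattern_matched': 'pending'}],
#    'tasks':  [{'context': '- [ ] wire up exporter', 'pattern_matched': r'\[ \]'}]}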
def scan_tasklist_files() -> dict:
    """Scan all tasklist files for pending items."""
    tasklist_summary = {}
# Find all tasklist files
for md_file in CODITECT_CORE.rglob("*.md"):
if 'node_modules' in str(md_file):
continue
try:
content = md_file.read_text(encoding='utf-8', errors='ignore')
pending = len(re.findall(r'\[ \]', content))
completed = len(re.findall(r'\[x\]', content, re.IGNORECASE))
if pending > 0:
rel_path = str(md_file.relative_to(CODITECT_CORE))
tasklist_summary[rel_path] = {
'pending': pending,
'completed': completed,
'total': pending + completed,
'completion_rate': round(completed / (pending + completed) * 100, 1) if (pending + completed) > 0 else 0
}
        except Exception:
            continue
return tasklist_summary
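# Illustrative return value (hypothetical path):
#   {'docs/roadmap.md': {'pending': 12, 'completed': 30, 'total': 42,
#                        'completion_rate': 71.4}}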
def analyze_context_by_chunks():
    """Main analysis function: query the context DB in date chunks."""
    print("=" * 60)
    print("UNCOMPLETED WORK ANALYSIS")
    print(f"Analyzing from {START_DATE.strftime('%Y-%m-%d')} to {END_DATE.strftime('%Y-%m-%d')}")
    print("=" * 60)
all_findings = defaultdict(list)
chunk_summaries = []
# Generate date chunks
chunks = generate_date_chunks()
print(f"\nGenerated {len(chunks)} date chunks with {OVERLAP_DAYS}-day overlap")
# Search terms focused on uncompleted work
search_terms = [
"pending",
"TODO",
"blocked",
"not implemented",
"in progress",
"planned",
"phase",
"milestone",
]
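    # Each term is queried once per chunk; extract_uncompleted_items() then
    # classifies the raw output against UNCOMPLETED_PATTERNS.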
for i, chunk in enumerate(chunks):
print(f"\n[{i+1}/{len(chunks)}] Scanning {chunk['start']} to {chunk['end']}...")
chunk_findings = defaultdict(list)
for term in search_terms:
output = run_context_query(chunk['start'], chunk['end'], term, limit=200)
findings = extract_uncompleted_items(output)
for category, items in findings.items():
chunk_findings[category].extend(items)
all_findings[category].extend(items)
# Summarize chunk
chunk_summary = {
'date_range': f"{chunk['start']} to {chunk['end']}",
'findings_count': sum(len(v) for v in chunk_findings.values()),
'categories': {k: len(v) for k, v in chunk_findings.items()}
}
chunk_summaries.append(chunk_summary)
if chunk_summary['findings_count'] > 0:
print(f" Found {chunk_summary['findings_count']} mentions: {chunk_summary['categories']}")
return all_findings, chunk_summaries
def deduplicate_findings(findings: dict) -> dict:
    """Remove duplicate findings based on context."""
    deduped = {}
for category, items in findings.items():
seen_contexts = set()
unique_items = []
for item in items:
# Create a normalized key for deduplication
context = item.get('context', item.get('name', ''))
context_key = re.sub(r'\s+', ' ', context.lower()[:100])
if context_key not in seen_contexts:
seen_contexts.add(context_key)
unique_items.append(item)
deduped[category] = unique_items
return deduped
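# NOTE: adjacent date chunks overlap by OVERLAP_DAYS and each search term is
# queried separately, so the same mention can be collected several times;
# deduplicate_findings() is what collapses those repeats.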
def generate_report(findings: dict, chunk_summaries: list, tasklist_summary: dict) -> dict:
    """Generate the comprehensive report."""
# Deduplicate findings
findings = deduplicate_findings(findings)
report = {
'generated_at': datetime.now().isoformat(),
'date_range': {
'start': START_DATE.strftime('%Y-%m-%d'),
'end': END_DATE.strftime('%Y-%m-%d'),
},
'summary': {
'total_context_mentions': sum(len(v) for v in findings.values()),
'total_tasklist_pending': sum(t['pending'] for t in tasklist_summary.values()),
'total_tasklist_completed': sum(t['completed'] for t in tasklist_summary.values()),
'files_with_pending_tasks': len(tasklist_summary),
},
'context_findings_by_category': {
category: {
'count': len(items),
'unique_items': list(set(
item.get('name', item.get('context', '')[:50])
for item in items
                ))[:20]  # up to 20 unique examples (set order is arbitrary)
}
for category, items in findings.items()
},
'top_pending_tasklists': sorted(
[
{'file': k, **v}
for k, v in tasklist_summary.items()
],
key=lambda x: x['pending'],
reverse=True
)[:25],
'chunk_analysis': chunk_summaries,
'cross_reference': {
'mentioned_but_may_be_incomplete': [],
'high_pending_areas': [],
}
}
# Identify high-pending areas
for file_info in report['top_pending_tasklists'][:10]:
if file_info['pending'] > 100:
report['cross_reference']['high_pending_areas'].append({
'file': file_info['file'],
'pending': file_info['pending'],
'completion_rate': file_info['completion_rate']
})
# Cross-reference context mentions with tasklist files
for category, data in report['context_findings_by_category'].items():
for item in data['unique_items'][:5]:
report['cross_reference']['mentioned_but_may_be_incomplete'].append({
'category': category,
'item': item,
})
return report
def main():
    """Main execution."""
    print("\n" + "=" * 60)
    print("CODITECT UNCOMPLETED WORK ANALYZER")
    print("=" * 60)
# Step 1: Scan tasklist files
print("\n[1/3] Scanning tasklist files...")
tasklist_summary = scan_tasklist_files()
print(f" Found {len(tasklist_summary)} files with pending tasks")
total_pending = sum(t['pending'] for t in tasklist_summary.values())
total_completed = sum(t['completed'] for t in tasklist_summary.values())
print(f" Total: {total_pending} pending, {total_completed} completed")
# Step 2: Analyze context database
print("\n[2/3] Analyzing context database by date chunks...")
findings, chunk_summaries = analyze_context_by_chunks()
# Step 3: Generate report
print("\n[3/3] Generating report...")
report = generate_report(findings, chunk_summaries, tasklist_summary)
# Save report
timestamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
report_file = OUTPUT_DIR / f"uncompleted-work-analysis-{timestamp}.json"
with open(report_file, 'w') as f:
json.dump(report, f, indent=2, default=str)
print(f"\nā
Report saved to: {report_file}")
# Print summary
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
print(f"\nTotal pending tasks in tasklists: {report['summary']['total_tasklist_pending']}")
print(f"Total completed tasks: {report['summary']['total_tasklist_completed']}")
print(f"Files with pending work: {report['summary']['files_with_pending_tasks']}")
print(f"Context mentions of uncompleted work: {report['summary']['total_context_mentions']}")
print("\nš TOP 15 FILES BY PENDING TASKS:")
print("-" * 60)
for i, item in enumerate(report['top_pending_tasklists'][:15], 1):
print(f"{i:2}. {item['pending']:4} pending | {item['completion_rate']:5.1f}% done | {item['file'][:60]}")
print("\nš CONTEXT FINDINGS BY CATEGORY:")
print("-" * 60)
for category, data in report['context_findings_by_category'].items():
print(f" {category}: {data['count']} mentions")
print("\nā ļø HIGH PENDING AREAS (>100 tasks):")
print("-" * 60)
for area in report['cross_reference']['high_pending_areas']:
print(f" ⢠{area['file']}: {area['pending']} pending ({area['completion_rate']}% done)")
return report
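# Typical invocation (from the repo root):
#   python3 scripts/analyze-uncompleted-work.py
# The report is written to reports/uncompleted-work-analysis-<timestamp>.json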
if name == "main": main()