scripts-skill-health-tracker
#!/usr/bin/env python3 """ā
title: Skill Health Tracker component_type: script version: 1.0.0 audience: contributor status: active summary: Track skill health scores over time with rolling averages and trend analysis keywords:
- health
- tracking
- skills
- trends
- monitoring tokens: ~2500 created: 2026-01-11 updated: 2026-01-11 script_name: skill-health-tracker.py language: python executable: true usage: python3 scripts/skill-health-tracker.py [options] python_version: 3.10+ dependencies: [] modifies_files: true network_access: false requires_auth: false
Skill Health Tracker for CODITECT-core (F.5.6.1)
Tracks historical skill health scores with:
- Per-skill score history across sessions
- Rolling averages (7-day, 30-day)
- Trend detection (improving/stable/degrading)
- Baseline comparison for regression detection
- Health status dashboard
Usage:
python3 scripts/skill-health-tracker.py --dashboard
python3 scripts/skill-health-tracker.py --skill
import argparse import json import sys from collections import defaultdict from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Any, Dict, List, Optional, Tuple
class SkillHealthTracker: """Track and analyze skill health over time."""
def __init__(self, coditect_root: Path):
self.root = coditect_root
# ADR-114 & ADR-118: Use centralized path discovery
try:
sys.path.insert(0, str(self.root / "scripts" / "core"))
from paths import get_context_storage_dir, CONTEXT_STORAGE
self.context_dir = get_context_storage_dir()
except ImportError:
# Fallback for backward compatibility
_user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
if _user_data.exists():
self.context_dir = _user_data
else:
self.context_dir = self.root / "context-storage"
# Skill health data goes to org.db location (Tier 2)
self.health_file = self.context_dir / "skill-health.json"
self.learnings_file = self.context_dir / "skill-learnings.json"
self.baseline_dir = self.context_dir / "baselines"
# Ensure directories exist
self.context_dir.mkdir(exist_ok=True, parents=True)
self.baseline_dir.mkdir(exist_ok=True)
def load_health_data(self) -> Dict[str, Any]:
"""Load skill health tracking data."""
if self.health_file.exists():
with open(self.health_file, 'r') as f:
return json.load(f)
return {
"version": "1.0.0",
"last_updated": None,
"skills": {}
}
def save_health_data(self, data: Dict[str, Any]) -> None:
"""Save skill health tracking data."""
data["last_updated"] = datetime.now(timezone.utc).isoformat()
with open(self.health_file, 'w') as f:
json.dump(data, f, indent=2)
def load_learnings(self) -> Dict[str, Any]:
"""Load skill learnings database."""
if self.learnings_file.exists():
with open(self.learnings_file, 'r') as f:
return json.load(f)
return {"sessions": [], "skill_history": {}, "anti_pattern_trends": {}}
def calculate_health_score(self, history: Dict[str, Any]) -> int:
"""Calculate health score for a skill based on its history."""
total = history.get("total_invocations", 0)
if total == 0:
return 50 # Default for untested skills
# Components of health score
success_count = history.get("success_count", 0)
failed_count = history.get("failed_count", 0)
success_rate = success_count / total if total > 0 else 0.5
error_penalty = min(len(history.get("common_errors", [])) * 0.05, 0.25)
# Recent score trend
scores = history.get("score_history", [])
if scores:
recent_scores = [s["score"] for s in scores[-5:]]
trend_score = sum(recent_scores) / len(recent_scores) / 100
else:
trend_score = 0.5
# Weighted calculation
health_score = (
success_rate * 0.5 +
trend_score * 0.3 +
(1 - error_penalty) * 0.2
)
return min(100, max(0, int(health_score * 100)))
def calculate_trend(self, scores: List[Dict]) -> str:
"""Determine trend direction from score history."""
if len(scores) < 3:
return "insufficient_data"
recent = [s["score"] for s in scores[-3:]]
earlier = [s["score"] for s in scores[-6:-3]] if len(scores) >= 6 else [s["score"] for s in scores[:3]]
recent_avg = sum(recent) / len(recent)
earlier_avg = sum(earlier) / len(earlier)
delta = recent_avg - earlier_avg
if delta > 5:
return "improving"
elif delta < -5:
return "degrading"
else:
return "stable"
def calculate_rolling_average(self, scores: List[Dict], days: int) -> Optional[float]:
"""Calculate rolling average for specified number of days."""
if not scores:
return None
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
recent_scores = []
for s in scores:
try:
score_date = datetime.fromisoformat(s["date"].replace("Z", "+00:00"))
if score_date >= cutoff:
recent_scores.append(s["score"])
except (KeyError, ValueError):
continue
if not recent_scores:
# Use most recent scores if no dates in range
recent_scores = [s["score"] for s in scores[-min(len(scores), days):]]
return round(sum(recent_scores) / len(recent_scores), 1) if recent_scores else None
def record_current_scores(self) -> Dict[str, Any]:
"""Record current health scores from skill learnings."""
learnings = self.load_learnings()
health_data = self.load_health_data()
now = datetime.now(timezone.utc).isoformat()
recorded = 0
for skill_name, history in learnings.get("skill_history", {}).items():
score = self.calculate_health_score(history)
if skill_name not in health_data["skills"]:
health_data["skills"][skill_name] = {
"scores": [],
"rolling_7d": None,
"rolling_30d": None,
"trend": "stable",
"first_tracked": now,
"total_invocations": 0
}
skill_data = health_data["skills"][skill_name]
# Add new score
skill_data["scores"].append({
"date": now,
"score": score,
"invocations": history.get("total_invocations", 0)
})
# Keep last 90 days of scores
skill_data["scores"] = skill_data["scores"][-90:]
# Update rolling averages
skill_data["rolling_7d"] = self.calculate_rolling_average(skill_data["scores"], 7)
skill_data["rolling_30d"] = self.calculate_rolling_average(skill_data["scores"], 30)
# Update trend
skill_data["trend"] = self.calculate_trend(skill_data["scores"])
skill_data["total_invocations"] = history.get("total_invocations", 0)
skill_data["last_updated"] = now
recorded += 1
self.save_health_data(health_data)
return {
"recorded": recorded,
"timestamp": now,
"file": str(self.health_file)
}
def create_baseline(self, name: Optional[str] = None) -> Path:
"""Create baseline snapshot of current health scores."""
if name is None:
name = datetime.now().strftime("%Y-%m-%d")
baseline_file = self.baseline_dir / f"skill-baseline-{name}.json"
health_data = self.load_health_data()
baseline = {
"version": "1.0.0",
"created": datetime.now(timezone.utc).isoformat(),
"name": name,
"skills": {}
}
for skill_name, data in health_data.get("skills", {}).items():
if data["scores"]:
latest = data["scores"][-1]
baseline["skills"][skill_name] = {
"score": latest["score"],
"invocations": latest.get("invocations", 0),
"date": latest["date"],
"trend": data.get("trend", "stable")
}
with open(baseline_file, 'w') as f:
json.dump(baseline, f, indent=2)
return baseline_file
def get_latest_baseline(self) -> Optional[Dict[str, Any]]:
"""Get most recent baseline snapshot."""
baselines = sorted(self.baseline_dir.glob("skill-baseline-*.json"), reverse=True)
if baselines:
with open(baselines[0], 'r') as f:
return json.load(f)
return None
def compare_to_baseline(self, baseline: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Compare current health to baseline."""
if baseline is None:
baseline = self.get_latest_baseline()
if baseline is None:
return {"error": "No baseline found", "regressions": [], "improvements": []}
health_data = self.load_health_data()
regressions = []
improvements = []
stable = []
for skill_name, baseline_data in baseline.get("skills", {}).items():
if skill_name not in health_data.get("skills", {}):
continue
current = health_data["skills"][skill_name]
if not current["scores"]:
continue
current_score = current["scores"][-1]["score"]
baseline_score = baseline_data["score"]
delta = current_score - baseline_score
record = {
"skill": skill_name,
"baseline_score": baseline_score,
"current_score": current_score,
"delta": delta,
"delta_percent": round(delta / baseline_score * 100, 1) if baseline_score > 0 else 0
}
if delta < -10: # >10% regression
regressions.append(record)
elif delta > 10: # >10% improvement
improvements.append(record)
else:
stable.append(record)
return {
"baseline_name": baseline.get("name", "unknown"),
"baseline_date": baseline.get("created"),
"regressions": sorted(regressions, key=lambda x: x["delta"]),
"improvements": sorted(improvements, key=lambda x: x["delta"], reverse=True),
"stable_count": len(stable),
"total_compared": len(regressions) + len(improvements) + len(stable)
}
def get_skill_health(self, skill_name: str) -> Optional[Dict[str, Any]]:
"""Get detailed health info for a specific skill."""
health_data = self.load_health_data()
if skill_name not in health_data.get("skills", {}):
return None
skill = health_data["skills"][skill_name]
# Get status label
if skill["scores"]:
current = skill["scores"][-1]["score"]
if current >= 90:
status = "excellent"
elif current >= 70:
status = "good"
elif current >= 50:
status = "needs_work"
else:
status = "critical"
else:
current = None
status = "unknown"
return {
"skill": skill_name,
"current_score": current,
"status": status,
"rolling_7d": skill.get("rolling_7d"),
"rolling_30d": skill.get("rolling_30d"),
"trend": skill.get("trend", "unknown"),
"total_invocations": skill.get("total_invocations", 0),
"score_history": skill.get("scores", [])[-10:], # Last 10 entries
"first_tracked": skill.get("first_tracked"),
"last_updated": skill.get("last_updated")
}
def get_dashboard_data(self) -> Dict[str, Any]:
"""Get dashboard summary data."""
health_data = self.load_health_data()
summary = {
"total_skills": 0,
"excellent": 0,
"good": 0,
"needs_work": 0,
"critical": 0,
"improving": 0,
"stable": 0,
"degrading": 0,
"avg_health": 0,
"top_performers": [],
"needs_attention": [],
"recently_improved": [],
"recently_degraded": []
}
all_scores = []
skill_list = []
for skill_name, data in health_data.get("skills", {}).items():
if not data.get("scores"):
continue
current = data["scores"][-1]["score"]
all_scores.append(current)
skill_info = {
"skill": skill_name,
"score": current,
"trend": data.get("trend", "stable"),
"rolling_7d": data.get("rolling_7d"),
"invocations": data.get("total_invocations", 0)
}
skill_list.append(skill_info)
# Count by status
if current >= 90:
summary["excellent"] += 1
elif current >= 70:
summary["good"] += 1
elif current >= 50:
summary["needs_work"] += 1
else:
summary["critical"] += 1
# Count by trend
trend = data.get("trend", "stable")
if trend == "improving":
summary["improving"] += 1
elif trend == "degrading":
summary["degrading"] += 1
else:
summary["stable"] += 1
summary["total_skills"] = len(skill_list)
summary["avg_health"] = round(sum(all_scores) / len(all_scores), 1) if all_scores else 0
# Sort for top/bottom lists
sorted_by_score = sorted(skill_list, key=lambda x: x["score"], reverse=True)
summary["top_performers"] = sorted_by_score[:5]
summary["needs_attention"] = [s for s in sorted_by_score if s["score"] < 50][:5]
summary["recently_improved"] = [s for s in skill_list if s["trend"] == "improving"][:5]
summary["recently_degraded"] = [s for s in skill_list if s["trend"] == "degrading"][:5]
return summary
def print_dashboard(self):
"""Print skill health dashboard to console."""
dashboard = self.get_dashboard_data()
print("\n" + "="*70)
print(" SKILL HEALTH TRACKER")
print("="*70)
# Summary stats
print(f"\nš Overview: {dashboard['total_skills']} skills tracked")
print(f" Average Health: {dashboard['avg_health']}%")
print()
print(f" š¢ Excellent (90%+): {dashboard['excellent']}")
print(f" š” Good (70-89%): {dashboard['good']}")
print(f" š Needs Work (50-69%): {dashboard['needs_work']}")
print(f" š“ Critical (<50%): {dashboard['critical']}")
# Trends
print(f"\nš Trends:")
print(f" ā Improving: {dashboard['improving']}")
print(f" ā Stable: {dashboard['stable']}")
print(f" ā Degrading: {dashboard['degrading']}")
# Top performers
if dashboard["top_performers"]:
print("\nš Top Performers:")
for s in dashboard["top_performers"]:
trend_icon = {"improving": "ā", "stable": "ā", "degrading": "ā"}.get(s["trend"], "?")
print(f" {s['skill']}: {s['score']}% {trend_icon}")
# Needs attention
if dashboard["needs_attention"]:
print("\nā ļø Needs Attention:")
for s in dashboard["needs_attention"]:
trend_icon = {"improving": "ā", "stable": "ā", "degrading": "ā"}.get(s["trend"], "?")
print(f" {s['skill']}: {s['score']}% {trend_icon}")
# Recently degraded
if dashboard["recently_degraded"]:
print("\nš Recently Degraded:")
for s in dashboard["recently_degraded"]:
print(f" {s['skill']}: {s['score']}% ā")
print("\n" + "="*70)
print("Run with --skill <name> for details | --record to update | --baseline to snapshot")
print("="*70 + "\n")
def print_skill_detail(self, skill_name: str):
"""Print detailed info for a specific skill."""
info = self.get_skill_health(skill_name)
if not info:
print(f"\nā Skill '{skill_name}' not found in health tracking")
return
status_icons = {
"excellent": "š¢",
"good": "š”",
"needs_work": "š ",
"critical": "š“",
"unknown": "āŖ"
}
trend_icons = {
"improving": "ā",
"stable": "ā",
"degrading": "ā"
}
print(f"\n{'='*60}")
print(f" Skill: {info['skill']}")
print(f"{'='*60}")
print(f"\n Current Score: {info['current_score']}% {status_icons.get(info['status'], '')}")
print(f" Status: {info['status'].upper()}")
print(f" Trend: {info['trend']} {trend_icons.get(info['trend'], '')}")
print(f"\n Rolling Averages:")
print(f" 7-day: {info['rolling_7d']}%" if info['rolling_7d'] else " 7-day: N/A")
print(f" 30-day: {info['rolling_30d']}%" if info['rolling_30d'] else " 30-day: N/A")
print(f"\n Invocations: {info['total_invocations']}")
print(f" First Tracked: {info['first_tracked'][:10] if info['first_tracked'] else 'N/A'}")
print(f" Last Updated: {info['last_updated'][:10] if info['last_updated'] else 'N/A'}")
if info['score_history']:
print(f"\n Recent Scores:")
for entry in info['score_history'][-5:]:
date = entry['date'][:10] if 'date' in entry else 'N/A'
print(f" {date}: {entry['score']}%")
print(f"\n{'='*60}\n")
def export_data(self, output_path: Optional[Path] = None) -> Path:
"""Export health data with analysis."""
if output_path is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = self.context_dir / "reports" / f"skill-health-export-{timestamp}.json"
output_path.parent.mkdir(parents=True, exist_ok=True)
health_data = self.load_health_data()
dashboard = self.get_dashboard_data()
comparison = self.compare_to_baseline()
export = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"summary": dashboard,
"baseline_comparison": comparison,
"skills": health_data.get("skills", {})
}
with open(output_path, 'w') as f:
json.dump(export, f, indent=2)
return output_path
def main(): """Main entry point.""" parser = argparse.ArgumentParser( description="Skill Health Tracker for CODITECT-core", formatter_class=argparse.RawDescriptionHelpFormatter )
parser.add_argument(
"--dashboard",
action="store_true",
help="Show health dashboard (default)"
)
parser.add_argument(
"--skill",
metavar="NAME",
help="Show details for specific skill"
)
parser.add_argument(
"--record",
action="store_true",
help="Record current scores from learnings"
)
parser.add_argument(
"--baseline",
action="store_true",
help="Create baseline snapshot"
)
parser.add_argument(
"--baseline-name",
metavar="NAME",
help="Name for baseline snapshot"
)
parser.add_argument(
"--compare",
action="store_true",
help="Compare to latest baseline"
)
parser.add_argument(
"--export",
action="store_true",
help="Export full health data"
)
parser.add_argument(
"--output",
metavar="PATH",
help="Output path for export"
)
parser.add_argument(
"--root",
default=".",
help="CODITECT root directory"
)
parser.add_argument(
"--json",
action="store_true",
help="Output as JSON"
)
# ADR-159: Project scoping
parser.add_argument(
"--project",
default=None,
help="Project scope for skill health analysis (ADR-159). Auto-detected from $CODITECT_PROJECT."
)
args = parser.parse_args()
# ADR-159: Resolve project scope
import os
project_id = args.project or os.environ.get('CODITECT_PROJECT')
if not project_id:
try:
sys.path.insert(0, str(Path(__file__).parent / "core"))
from scope import resolve_scope
scope = resolve_scope()
project_id = scope.project
except ImportError:
pass
if project_id:
os.environ['CODITECT_PROJECT'] = project_id
root = Path(args.root).resolve()
tracker = SkillHealthTracker(root)
try:
if args.record:
result = tracker.record_current_scores()
if args.json:
print(json.dumps(result, indent=2))
else:
print(f"\nā
Recorded health scores for {result['recorded']} skills")
print(f" File: {result['file']}")
elif args.baseline:
baseline_file = tracker.create_baseline(args.baseline_name)
print(f"\nā
Baseline created: {baseline_file}")
elif args.compare:
comparison = tracker.compare_to_baseline()
if args.json:
print(json.dumps(comparison, indent=2))
else:
print(f"\nš Comparison to baseline: {comparison.get('baseline_name', 'N/A')}")
print(f" Regressions: {len(comparison['regressions'])}")
print(f" Improvements: {len(comparison['improvements'])}")
print(f" Stable: {comparison['stable_count']}")
if comparison['regressions']:
print("\nā ļø Regressions (>10% drop):")
for r in comparison['regressions'][:5]:
print(f" {r['skill']}: {r['baseline_score']}% ā {r['current_score']}% ({r['delta']:+d})")
if comparison['improvements']:
print("\n⨠Improvements (>10% gain):")
for i in comparison['improvements'][:5]:
print(f" {i['skill']}: {i['baseline_score']}% ā {i['current_score']}% ({i['delta']:+d})")
elif args.skill:
if args.json:
info = tracker.get_skill_health(args.skill)
print(json.dumps(info, indent=2))
else:
tracker.print_skill_detail(args.skill)
elif args.export:
output = Path(args.output) if args.output else None
export_path = tracker.export_data(output)
print(f"\nā
Exported to: {export_path}")
else:
# Default: show dashboard
if args.json:
print(json.dumps(tracker.get_dashboard_data(), indent=2))
else:
tracker.print_dashboard()
except Exception as e:
print(f"\nā Error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if name == "main": main()