#!/usr/bin/env python3
"""
title: Skill Pattern Analyzer
component_type: script
component_id: scripts-skill-pattern-analyzer
version: 1.0.0
audience: contributor
status: active
summary: Analyze skill invocation patterns to identify success/failure trends and optimization opportunities
keywords:
  - patterns
  - analysis
  - skills
  - optimization
  - learning
tokens: ~2000
created: 2025-01-01
updated: 2025-01-01
script_name: skill-pattern-analyzer.py
language: python
executable: true
usage: python3 scripts/skill-pattern-analyzer.py [options]
python_version: 3.10+
dependencies: []
modifies_files: true
network_access: false
requires_auth: false

Skill Pattern Analyzer for CODITECT-core

Analyzes skill invocation patterns across sessions to identify:
- Success patterns - what contexts lead to successful outcomes
- Failure patterns - what contexts lead to failures
- Anti-patterns - recurring problematic behaviors
- Optimization opportunities - skills that need improvement
- Trend analysis - performance changes over time

Usage:
    python3 scripts/skill-pattern-analyzer.py --analyze
    python3 scripts/skill-pattern-analyzer.py --trends
    python3 scripts/skill-pattern-analyzer.py --recommendations
    python3 scripts/skill-pattern-analyzer.py --export-report
"""
import argparse
import json
import sqlite3
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
class SkillPatternAnalyzer:
    """Analyze skill patterns for optimization."""

    def __init__(self, coditect_root: Path):
        """Initialize the analyzer rooted at *coditect_root*.

        Resolves user-data locations via the centralized ``paths`` module
        when it is importable (ADR-114 & ADR-118) and falls back to legacy
        locations otherwise. Ensures the reports directory exists.
        """
        self.root = coditect_root
        self.skills_dir = self.root / "skills"
        # ADR-114 & ADR-118: Use centralized path discovery for user data
        try:
            # `sys` is already imported at module level; the previous local
            # `import sys` here was redundant and has been removed.
            sys.path.insert(0, str(self.root / "scripts" / "core"))
            from paths import (
                get_context_storage_dir,
                get_org_db_path,
                ORG_DB,
            )
            self.context_dir = get_context_storage_dir()
            self.db_path = ORG_DB  # skill_learnings is TIER 2
        except ImportError:
            # Fallback for backward compatibility
            _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
            if _user_data.exists():
                self.context_dir = _user_data
            else:
                self.context_dir = self.root / "context-storage"
            self.db_path = self.context_dir / "org.db"
        self.learnings_file = self.context_dir / "skill-learnings.json"
        self.patterns_file = self.context_dir / "skill-patterns.json"
        self.report_dir = self.context_dir / "reports"
        # parents=True: the fallback context-storage directory may not exist
        # yet; mkdir(exist_ok=True) alone would raise FileNotFoundError.
        self.report_dir.mkdir(parents=True, exist_ok=True)
def load_learnings(self) -> Dict[str, Any]:
"""Load skill learnings database."""
if self.learnings_file.exists():
with open(self.learnings_file, 'r') as f:
return json.load(f)
return {"sessions": [], "skill_history": {}, "anti_pattern_trends": {}}
def analyze_success_patterns(self) -> Dict[str, List[Dict]]:
"""Identify patterns that lead to successful outcomes."""
learnings = self.load_learnings()
success_patterns = defaultdict(list)
for skill_name, history in learnings.get("skill_history", {}).items():
if history["success_count"] > 0:
success_rate = history["success_count"] / history["total_invocations"]
if success_rate >= 0.8:
success_patterns[skill_name].append({
"pattern": "high_success_rate",
"rate": round(success_rate, 2),
"invocations": history["total_invocations"],
"insight": f"Skill is reliable with {success_rate*100:.0f}% success rate"
})
# Analyze score trends
scores = history.get("score_history", [])
if len(scores) >= 3:
recent_scores = [s["score"] for s in scores[-3:]]
if all(s >= 80 for s in recent_scores):
success_patterns[skill_name].append({
"pattern": "consistent_performance",
"recent_scores": recent_scores,
"insight": "Skill maintains consistent high performance"
})
return dict(success_patterns)
def analyze_failure_patterns(self) -> Dict[str, List[Dict]]:
"""Identify patterns that lead to failures."""
learnings = self.load_learnings()
failure_patterns = defaultdict(list)
for skill_name, history in learnings.get("skill_history", {}).items():
total = history["total_invocations"]
if total == 0:
continue
failure_rate = history["failed_count"] / total
# High failure rate
if failure_rate >= 0.3:
failure_patterns[skill_name].append({
"pattern": "high_failure_rate",
"rate": round(failure_rate, 2),
"severity": "critical" if failure_rate >= 0.5 else "warning",
"insight": f"Skill fails {failure_rate*100:.0f}% of the time"
})
# Recurring errors
if history.get("common_errors"):
for error in history["common_errors"][:3]:
failure_patterns[skill_name].append({
"pattern": "recurring_error",
"error": error,
"severity": "warning",
"insight": f"Error recurs: {error[:100]}"
})
# Declining performance
scores = history.get("score_history", [])
if len(scores) >= 5:
early_avg = sum(s["score"] for s in scores[:3]) / 3
recent_avg = sum(s["score"] for s in scores[-3:]) / 3
if recent_avg < early_avg * 0.8:
failure_patterns[skill_name].append({
"pattern": "declining_performance",
"early_avg": round(early_avg, 1),
"recent_avg": round(recent_avg, 1),
"severity": "warning",
"insight": f"Performance dropped from {early_avg:.0f}% to {recent_avg:.0f}%"
})
return dict(failure_patterns)
def analyze_anti_patterns(self) -> Dict[str, Dict]:
"""Analyze anti-pattern trends."""
learnings = self.load_learnings()
anti_patterns = learnings.get("anti_pattern_trends", {})
analysis = {}
for pattern_name, data in anti_patterns.items():
history = data.get("history", [])
total = data.get("total_occurrences", 0)
if total == 0:
continue
# Calculate trend
if len(history) >= 3:
early = sum(h["occurrences"] for h in history[:len(history)//2])
recent = sum(h["occurrences"] for h in history[len(history)//2:])
if recent > early:
trend = "increasing"
elif recent < early * 0.5:
trend = "decreasing"
else:
trend = "stable"
else:
trend = "insufficient_data"
analysis[pattern_name] = {
"total_occurrences": total,
"sessions_detected": data.get("sessions_detected", 0),
"trend": trend,
"severity": "critical" if total > 10 else "warning" if total > 5 else "info",
"recommendation": self._get_anti_pattern_recommendation(pattern_name)
}
return analysis
def _get_anti_pattern_recommendation(self, pattern_name: str) -> str:
"""Get recommendation for anti-pattern."""
recommendations = {
"excessive_retries": "Add clearer examples and pre-conditions to skills",
"context_confusion": "Narrow skill scope and add explicit boundaries",
"tool_misuse": "Improve 'When to Use' triggers in skills",
"incomplete_output": "Add completion checklists to skills",
"hallucination_risk": "Add verification steps and source requirements"
}
return recommendations.get(pattern_name, "Review and update affected skills")
def calculate_skill_health(self) -> Dict[str, Dict]:
"""Calculate overall health score for each skill."""
learnings = self.load_learnings()
health_scores = {}
for skill_name, history in learnings.get("skill_history", {}).items():
total = history["total_invocations"]
if total == 0:
continue
# Calculate component scores
success_rate = history["success_count"] / total
error_penalty = min(len(history.get("common_errors", [])) * 0.05, 0.25)
# Score trend
scores = history.get("score_history", [])
if scores:
trend_score = scores[-1]["score"] / 100
else:
trend_score = 0.5
# Calculate weighted health score
health_score = (
success_rate * 0.5 +
trend_score * 0.3 +
(1 - error_penalty) * 0.2
)
health_scores[skill_name] = {
"health_score": round(health_score * 100),
"success_rate": round(success_rate * 100),
"trend_score": round(trend_score * 100),
"error_penalty": round(error_penalty * 100),
"total_invocations": total,
"status": self._get_health_status(health_score)
}
return health_scores
def _get_health_status(self, score: float) -> str:
"""Get health status label."""
if score >= 0.9:
return "excellent"
elif score >= 0.7:
return "good"
elif score >= 0.5:
return "needs_work"
else:
return "critical"
def generate_recommendations(self) -> List[Dict[str, Any]]:
"""Generate prioritized improvement recommendations."""
recommendations = []
# Analyze patterns
failure_patterns = self.analyze_failure_patterns()
anti_patterns = self.analyze_anti_patterns()
health_scores = self.calculate_skill_health()
# 1. Critical skills (health < 50%)
for skill_name, health in health_scores.items():
if health["status"] == "critical":
recommendations.append({
"priority": "P0",
"type": "critical_skill",
"skill": skill_name,
"health_score": health["health_score"],
"action": "Major revision required",
"details": [
"Review all recent failures",
"Narrow scope significantly",
"Add comprehensive examples",
"Consider splitting into smaller skills"
]
})
# 2. Skills needing work (health 50-70%)
for skill_name, health in health_scores.items():
if health["status"] == "needs_work":
failures = failure_patterns.get(skill_name, [])
recommendations.append({
"priority": "P1",
"type": "skill_improvement",
"skill": skill_name,
"health_score": health["health_score"],
"action": "Targeted improvements needed",
"details": [
f"Address failure patterns: {[f['pattern'] for f in failures]}" if failures else "General optimization needed",
"Add more specific triggers",
"Include error handling examples"
]
})
# 3. Anti-pattern fixes
for pattern_name, data in anti_patterns.items():
if data["severity"] in ["critical", "warning"]:
recommendations.append({
"priority": "P1" if data["severity"] == "critical" else "P2",
"type": "anti_pattern",
"pattern": pattern_name,
"occurrences": data["total_occurrences"],
"trend": data["trend"],
"action": data["recommendation"]
})
# 4. Declining performance alerts
for skill_name, patterns in failure_patterns.items():
for pattern in patterns:
if pattern["pattern"] == "declining_performance":
recommendations.append({
"priority": "P2",
"type": "performance_decline",
"skill": skill_name,
"action": "Investigate recent changes",
"details": [
f"Score dropped: {pattern['early_avg']:.0f}% -> {pattern['recent_avg']:.0f}%",
"Check for context changes",
"Review recent skill modifications"
]
})
# Sort by priority
priority_order = {"P0": 0, "P1": 1, "P2": 2, "P3": 3}
recommendations.sort(key=lambda x: priority_order.get(x["priority"], 99))
return recommendations
def analyze_trends(self, days: int = 30) -> Dict[str, Any]:
"""Analyze trends over time period."""
learnings = self.load_learnings()
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
trends = {
"period_days": days,
"session_count": 0,
"skill_usage": defaultdict(int),
"success_trend": [],
"anti_pattern_trend": [],
"top_improved": [],
"top_declined": []
}
# Count sessions in period
for session in learnings.get("sessions", []):
session_date = datetime.fromisoformat(
session["analyzed_at"].replace("Z", "+00:00")
)
if session_date >= cutoff:
trends["session_count"] += 1
# Analyze skill trends
skill_trends = []
for skill_name, history in learnings.get("skill_history", {}).items():
trends["skill_usage"][skill_name] = history["total_invocations"]
scores = history.get("score_history", [])
if len(scores) >= 2:
first = scores[0]["score"]
last = scores[-1]["score"]
change = last - first
skill_trends.append({
"skill": skill_name,
"first_score": first,
"last_score": last,
"change": change,
"direction": "improved" if change > 5 else "declined" if change < -5 else "stable"
})
# Top improved and declined
skill_trends.sort(key=lambda x: x["change"], reverse=True)
trends["top_improved"] = [s for s in skill_trends[:5] if s["change"] > 0]
trends["top_declined"] = [s for s in skill_trends[-5:] if s["change"] < 0][::-1]
# Anti-pattern trend
for pattern_name, data in learnings.get("anti_pattern_trends", {}).items():
history = data.get("history", [])
if history:
trends["anti_pattern_trend"].append({
"pattern": pattern_name,
"recent_count": sum(h["occurrences"] for h in history[-5:]),
"total": data["total_occurrences"]
})
return dict(trends)
def export_report(self, output_path: Optional[Path] = None) -> Path:
"""Export comprehensive analysis report."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if output_path is None:
output_path = self.report_dir / f"skill-analysis-{timestamp}.json"
report = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"summary": {},
"success_patterns": self.analyze_success_patterns(),
"failure_patterns": self.analyze_failure_patterns(),
"anti_patterns": self.analyze_anti_patterns(),
"health_scores": self.calculate_skill_health(),
"recommendations": self.generate_recommendations(),
"trends": self.analyze_trends()
}
# Calculate summary
health_scores = report["health_scores"]
report["summary"] = {
"total_skills_tracked": len(health_scores),
"excellent_skills": sum(1 for h in health_scores.values() if h["status"] == "excellent"),
"good_skills": sum(1 for h in health_scores.values() if h["status"] == "good"),
"needs_work_skills": sum(1 for h in health_scores.values() if h["status"] == "needs_work"),
"critical_skills": sum(1 for h in health_scores.values() if h["status"] == "critical"),
"total_recommendations": len(report["recommendations"]),
"p0_recommendations": sum(1 for r in report["recommendations"] if r["priority"] == "P0"),
"average_health_score": round(
sum(h["health_score"] for h in health_scores.values()) / len(health_scores)
if health_scores else 0
)
}
with open(output_path, 'w') as f:
json.dump(report, f, indent=2)
return output_path
def print_dashboard(self):
"""Print skill health dashboard to console."""
health_scores = self.calculate_skill_health()
recommendations = self.generate_recommendations()
anti_patterns = self.analyze_anti_patterns()
print("\n" + "="*70)
print(" SKILL HEALTH DASHBOARD")
print("="*70)
# Summary
excellent = sum(1 for h in health_scores.values() if h["status"] == "excellent")
good = sum(1 for h in health_scores.values() if h["status"] == "good")
needs_work = sum(1 for h in health_scores.values() if h["status"] == "needs_work")
critical = sum(1 for h in health_scores.values() if h["status"] == "critical")
print(f"\nš Overview: {len(health_scores)} skills tracked")
print(f" š¢ Excellent: {excellent}")
print(f" š” Good: {good}")
print(f" š Needs Work: {needs_work}")
print(f" š“ Critical: {critical}")
# Top performers
sorted_health = sorted(
health_scores.items(),
key=lambda x: x[1]["health_score"],
reverse=True
)
print("\nš Top Performers:")
for skill, health in sorted_health[:5]:
print(f" {skill}: {health['health_score']}% ({health['status']})")
# Needs attention
print("\nā ļø Needs Attention:")
for skill, health in sorted_health[-5:]:
if health["status"] in ["needs_work", "critical"]:
print(f" {skill}: {health['health_score']}% ({health['status']})")
# Anti-patterns
if anti_patterns:
print("\nšØ Anti-Pattern Alerts:")
for pattern, data in anti_patterns.items():
if data["severity"] in ["critical", "warning"]:
icon = "š“" if data["severity"] == "critical" else "š "
print(f" {icon} {pattern}: {data['total_occurrences']} occurrences ({data['trend']})")
# Priority recommendations
p0_recs = [r for r in recommendations if r["priority"] == "P0"]
if p0_recs:
print("\nš„ P0 - Immediate Action Required:")
for rec in p0_recs[:3]:
print(f" [{rec['type']}] {rec.get('skill', rec.get('pattern', 'general'))}")
print(f" Action: {rec['action']}")
print("\n" + "="*70)
print(f"Run with --export-report for full JSON analysis")
print("="*70 + "\n")
def main():
    """Main entry point.

    Parses CLI flags and dispatches to the requested analysis. With no
    flags (or with --dashboard) it prints the health dashboard. Exits
    with status 1 and a traceback on any analysis failure.

    Fixes: the original source had the `def` line fused with its first
    statements, a print string literal broken across two physical lines
    (a syntax error), and mojibake-corrupted emoji in output strings.
    """
    parser = argparse.ArgumentParser(
        description="Skill Pattern Analyzer for CODITECT-core",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--analyze",
        action="store_true",
        help="Run full pattern analysis"
    )
    parser.add_argument(
        "--dashboard",
        action="store_true",
        help="Show skill health dashboard"
    )
    parser.add_argument(
        "--trends",
        action="store_true",
        help="Show trend analysis"
    )
    parser.add_argument(
        "--recommendations",
        action="store_true",
        help="Generate recommendations"
    )
    parser.add_argument(
        "--export-report",
        action="store_true",
        help="Export full analysis report"
    )
    parser.add_argument(
        "--output",
        help="Output path for report"
    )
    parser.add_argument(
        "--root",
        default=".",
        help="CODITECT root directory"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output as JSON"
    )
    args = parser.parse_args()
    root = Path(args.root).resolve()
    analyzer = SkillPatternAnalyzer(root)
    try:
        # Dashboard is the default when no action flag is given.
        if args.dashboard or not any([args.analyze, args.trends, args.recommendations, args.export_report]):
            analyzer.print_dashboard()
        elif args.analyze:
            results = {
                "success_patterns": analyzer.analyze_success_patterns(),
                "failure_patterns": analyzer.analyze_failure_patterns(),
                "anti_patterns": analyzer.analyze_anti_patterns(),
                "health_scores": analyzer.calculate_skill_health()
            }
            if args.json:
                print(json.dumps(results, indent=2))
            else:
                print("\n📊 Pattern Analysis Complete")
                print(f"   Success patterns: {len(results['success_patterns'])} skills")
                print(f"   Failure patterns: {len(results['failure_patterns'])} skills")
                print(f"   Anti-patterns: {len(results['anti_patterns'])} types")
                print("\nRun with --json for full details")
        elif args.trends:
            trends = analyzer.analyze_trends()
            if args.json:
                print(json.dumps(trends, indent=2))
            else:
                print("\n📈 Trend Analysis (Last 30 Days)")
                print(f"   Sessions analyzed: {trends['session_count']}")
                print("\n   Top Improved Skills:")
                for s in trends["top_improved"][:3]:
                    print(f"      {s['skill']}: +{s['change']:.0f}%")
                print("\n   Declining Skills:")
                for s in trends["top_declined"][:3]:
                    print(f"      {s['skill']}: {s['change']:.0f}%")
        elif args.recommendations:
            recs = analyzer.generate_recommendations()
            if args.json:
                print(json.dumps(recs, indent=2))
            else:
                print("\n💡 Recommendations")
                for rec in recs[:10]:
                    print(f"\n  [{rec['priority']}] {rec['type']}")
                    if 'skill' in rec:
                        print(f"    Skill: {rec['skill']}")
                    print(f"    Action: {rec['action']}")
        elif args.export_report:
            output = Path(args.output) if args.output else None
            report_path = analyzer.export_report(output)
            print(f"\n✅ Report exported to: {report_path}")
    except Exception as e:
        # Show the error and full traceback, then exit non-zero for CI.
        print(f"\n❌ Analysis failed: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
# Fix: the dunder underscores were stripped by an earlier text round-trip
# ("if name == \"main\""), which would raise NameError at import time.
if __name__ == "__main__":
    main()