# scripts/skill-learning-extractor
#!/usr/bin/env python3
"""CP-15: SkillLearning Node Extractor (ADR-151).

Migrates skill_learning entities from org.db skill_learnings table:
- node_type: 'skill_learning'
- Properties: skill_name, learning, anti_pattern, improvement

Note: skill_learnings table has 758K+ rows. This extractor:
- Aggregates by skill_name to reduce node count
- Samples top learnings per skill
- Computes effectiveness metrics

Source: org.db skill_learnings table
Target: org.db kg_nodes table

Created: 2026-02-03
Track: J (Memory Intelligence)
Task: J.3.4.5
"""
import hashlib
import json
import logging
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, Generator, Optional, Tuple
from .base_extractor import SQLiteSourceExtractor
# Module-level logger. The original used the undefined bare name `name`
# (NameError on import); the conventional per-module logger key is __name__.
logger = logging.getLogger(__name__)
class SkillLearningExtractor(SQLiteSourceExtractor):
    """Migrate skill_learnings from org.db into kg_nodes.

    Strategy: Aggregate learnings by skill_name to create one node per skill
    with aggregated learning data. This reduces 758K rows to ~500 skill nodes.
    """

    @property
    def node_type(self) -> str:
        """Node type written into kg_nodes for every extracted entity."""
        return "skill_learning"

    def extract_entities(
        self,
    ) -> Generator[
        Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]],
        None,
        None,
    ]:
        """Extract skill_learnings aggregated by skill_name.

        Yields:
            Tuple of (node_id, name, subtype, properties, source_table, source_id)
        """
        conn = self.connect_source()
        # One aggregate row per non-empty skill_name. NOTE(review): row access
        # by column name assumes the connection uses sqlite3.Row (presumably
        # set by SQLiteSourceExtractor.connect_source) — confirm in base class.
        cursor = conn.execute("""
            SELECT
                skill_name,
                COUNT(*) as learning_count,
                AVG(CASE WHEN effectiveness_score IS NOT NULL THEN effectiveness_score ELSE NULL END) as avg_effectiveness,
                MAX(analyzed_at) as last_analyzed,
                MIN(analyzed_at) as first_analyzed,
                COUNT(DISTINCT session_id) as session_count
            FROM skill_learnings
            WHERE skill_name IS NOT NULL AND skill_name != ''
            GROUP BY skill_name
            ORDER BY learning_count DESC
        """)
        for row in cursor:
            skill_name = row['skill_name']
            learning_count = row['learning_count']

            # Stable node id: hash the skill name so repeated runs produce the
            # same id for the same skill (md5 used for fingerprinting only).
            skill_hash = hashlib.md5(skill_name.encode()).hexdigest()[:12]
            node_id = self.generate_node_id(f"{skill_name}:{skill_hash}")

            # Human-readable display name.
            name = f"Skill: {skill_name} ({learning_count} learnings)"

            # Subtype is derived from keyword patterns in the skill name.
            subtype = self._categorize_skill(skill_name)

            properties = {
                "skill_name": skill_name,
                "learning_count": learning_count,
                "session_count": row['session_count'],
                "first_analyzed": row['first_analyzed'],
                "last_analyzed": row['last_analyzed'],
            }

            # Only record effectiveness when at least one row had a score.
            if row['avg_effectiveness'] is not None:
                properties['avg_effectiveness'] = round(row['avg_effectiveness'], 2)

            # Top learnings by effectiveness (capped at 5).
            sample_learnings = self._get_sample_learnings(conn, skill_name, limit=5)
            if sample_learnings:
                properties['sample_learnings'] = sample_learnings

            # Most frequent error payloads (capped at 3).
            error_patterns = self._get_error_patterns(conn, skill_name, limit=3)
            if error_patterns:
                properties['common_errors'] = error_patterns

            # Drop keys whose value is None (e.g. missing timestamps).
            properties = {k: v for k, v in properties.items() if v is not None}

            yield (
                node_id,
                name,
                subtype,
                properties,
                "skill_learnings",
                skill_name,  # skill_name doubles as source_id for the aggregate node
            )

    def _categorize_skill(self, skill_name: str) -> str:
        """Categorize a skill into a subtype based on name patterns.

        Matching is case-insensitive substring search; the first matching
        category (in declaration order) wins, so e.g. "code review" lands in
        'development' before 'analysis'.

        Returns:
            Category such as 'development', 'testing', 'documentation', or
            'general' when no keyword matches.
        """
        name_lower = skill_name.lower()
        categories = {
            "development": ["code", "develop", "implement", "build", "create", "refactor"],
            "testing": ["test", "qa", "quality", "validation", "verify"],
            "documentation": ["doc", "document", "readme", "guide", "reference"],
            "deployment": ["deploy", "release", "ci", "cd", "docker", "kubernetes"],
            "database": ["database", "sql", "migration", "schema", "model"],
            "security": ["security", "auth", "permission", "encrypt"],
            "api": ["api", "rest", "graphql", "endpoint"],
            "frontend": ["frontend", "ui", "react", "component", "style"],
            "backend": ["backend", "server", "service"],
            "analysis": ["analyze", "review", "audit", "assess"],
            "optimization": ["optimize", "performance", "cache", "improve"],
            "configuration": ["config", "setting", "environment", "setup"],
        }
        for category, keywords in categories.items():
            if any(keyword in name_lower for keyword in keywords):
                return category
        return "general"

    def _get_sample_learnings(self, conn, skill_name: str, limit: int = 5) -> list:
        """Get sample learnings for a skill, best-scored first.

        Args:
            conn: Open SQLite connection to the source database.
            skill_name: Exact skill_name to filter on.
            limit: Maximum number of learnings to return.

        Returns:
            List of dicts with 'outcome' (truncated to 200 chars) and,
            when present, 'score'.
        """
        # NULLS LAST requires SQLite >= 3.30; DESC ordering already sorts
        # NULLs last in SQLite, so this is belt-and-braces.
        cursor = conn.execute("""
            SELECT outcome, effectiveness_score
            FROM skill_learnings
            WHERE skill_name = ? AND outcome IS NOT NULL AND outcome != ''
            ORDER BY effectiveness_score DESC NULLS LAST, analyzed_at DESC
            LIMIT ?
        """, (skill_name, limit))
        learnings = []
        for row in cursor:
            if row['outcome']:
                learning = {"outcome": row['outcome'][:200]}  # Truncate long text
                if row['effectiveness_score'] is not None:
                    learning['score'] = row['effectiveness_score']
                learnings.append(learning)
        return learnings

    def _get_error_patterns(self, conn, skill_name: str, limit: int = 3) -> list:
        """Get the most common error patterns for a skill.

        Groups identical `errors` payloads and takes the most frequent ones.
        Payloads are expected to be JSON lists; non-JSON payloads are kept
        as truncated raw strings.

        Args:
            conn: Open SQLite connection to the source database.
            skill_name: Exact skill_name to filter on.
            limit: Maximum number of error entries to return.

        Returns:
            List of error entries (at most `limit`), possibly empty.
        """
        cursor = conn.execute("""
            SELECT errors, COUNT(*) as count
            FROM skill_learnings
            WHERE skill_name = ? AND errors IS NOT NULL AND errors != '' AND errors != '[]'
            GROUP BY errors
            ORDER BY count DESC
            LIMIT ?
        """, (skill_name, limit))
        errors = []
        for row in cursor:
            try:
                error_data = json.loads(row['errors'])
                if isinstance(error_data, list) and error_data:
                    errors.extend(error_data[:2])  # Take first 2 from each group
            except json.JSONDecodeError:
                # Not JSON: keep the raw payload, truncated.
                if row['errors']:
                    errors.append(row['errors'][:100])
        return errors[:limit]