#!/usr/bin/env python3
"""
title: "Capability Taxonomy - 12 domain categories with subcapabilities"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Capability Registry Populator Analyzes agents, skills, and commands to extract and populate c..."
keywords: ['analysis', 'api', 'automation', 'capability', 'database']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "populate-capability-registry.py"
language: python
executable: true
usage: "python3 scripts/populate-capability-registry.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Capability Registry Populator

Analyzes agents, skills, and commands to extract and populate capabilities.

This script:
- Scans all agent, skill, and command files
- Extracts capabilities from descriptions, tools, and content
- Updates framework-registry.json with populated capabilities
- Creates capability-registry.json for the dynamic-capability-router

Usage: python scripts/populate-capability-registry.py [--dry-run] [--verbose]
"""

import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Set, Tuple, Any

import yaml
# Capability Taxonomy - 12 domain categories with subcapabilities.
# Each domain maps trigger keywords (matched against lowercased component
# text) to a list of finer-grained subcapability identifiers.
CAPABILITY_TAXONOMY = {
    "code_analysis": {
        "keywords": ["analyze", "analysis", "review", "inspect", "examine",
                     "understand", "explore", "codebase", "pattern", "structure"],
        "subcapabilities": ["static_analysis", "pattern_detection", "architecture_analysis",
                            "dependency_analysis", "code_navigation", "structure_mapping"]
    },
    "code_generation": {
        "keywords": ["generate", "create", "implement", "build", "scaffold",
                     "write", "develop", "code"],
        "subcapabilities": ["component_generation", "boilerplate_creation", "refactoring",
                            "code_completion", "template_generation"]
    },
    "testing": {
        "keywords": ["test", "testing", "tdd", "unit", "integration", "e2e",
                     "coverage", "qa", "quality"],
        "subcapabilities": ["unit_testing", "integration_testing", "e2e_testing",
                            "test_generation", "coverage_analysis", "test_automation"]
    },
    "security": {
        "keywords": ["security", "vulnerability", "audit", "penetration", "hardening",
                     "compliance", "sast", "secrets", "authentication"],
        "subcapabilities": ["vulnerability_scanning", "security_audit", "penetration_testing",
                            "compliance_checking", "secrets_detection", "hardening"]
    },
    "devops": {
        "keywords": ["deploy", "deployment", "ci", "cd", "pipeline", "container",
                     "docker", "kubernetes", "infrastructure", "cloud"],
        "subcapabilities": ["ci_cd_pipelines", "container_orchestration", "infrastructure_as_code",
                            "deployment_automation", "monitoring_setup"]
    },
    "documentation": {
        "keywords": ["document", "documentation", "docs", "readme", "api",
                     "guide", "reference", "explain"],
        "subcapabilities": ["api_documentation", "code_documentation", "user_guides",
                            "architecture_docs", "readme_generation"]
    },
    "research": {
        "keywords": ["research", "search", "find", "locate", "discover",
                     "investigate", "explore", "web", "fetch"],
        "subcapabilities": ["web_research", "codebase_research", "competitive_analysis",
                            "market_research", "technical_research"]
    },
    "database": {
        "keywords": ["database", "db", "sql", "query", "schema", "migration",
                     "foundationdb", "postgres", "redis"],
        "subcapabilities": ["schema_design", "query_optimization", "migration_management",
                            "database_administration", "data_modeling"]
    },
    "architecture": {
        "keywords": ["architect", "architecture", "design", "system", "c4",
                     "adr", "pattern", "structure"],
        "subcapabilities": ["system_design", "api_design", "component_architecture",
                            "pattern_application", "adr_management"]
    },
    "optimization": {
        "keywords": ["optimize", "performance", "profiling", "memory", "token",
                     "efficiency", "speed"],
        "subcapabilities": ["performance_optimization", "memory_optimization", "token_optimization",
                            "query_optimization", "load_testing"]
    },
    "project_management": {
        "keywords": ["project", "plan", "organize", "task", "workflow",
                     "orchestrate", "coordinate", "manage"],
        "subcapabilities": ["task_management", "workflow_orchestration", "project_planning",
                            "progress_tracking", "resource_allocation"]
    },
    "specialized": {
        "keywords": ["rust", "react", "vue", "svelte", "flutter", "mobile",
                     "blockchain", "ml", "ai", "websocket", "graphql"],
        "subcapabilities": ["rust_development", "frontend_frameworks", "mobile_development",
                            "blockchain_development", "ml_ops", "realtime_systems"]
    }
}
# Tool to capability mapping: each agent tool implies one or more capability
# (or subcapability) identifiers, independent of keyword matching.
TOOL_CAPABILITIES = {
    "Read": ["code_analysis", "code_navigation"],
    "Write": ["code_generation", "documentation"],
    "Edit": ["code_generation", "refactoring"],
    "Grep": ["code_analysis", "pattern_detection", "codebase_research"],
    "Glob": ["code_navigation", "file_discovery"],
    "Bash": ["devops", "automation", "deployment_automation"],
    "WebSearch": ["web_research", "market_research"],
    "WebFetch": ["web_research", "technical_research"],
    "TodoWrite": ["task_management", "progress_tracking"],
    "Task": ["workflow_orchestration", "multi_agent_coordination"],
    "LS": ["code_navigation", "structure_mapping"],
}
def extract_yaml_frontmatter(content: str) -> Dict[str, Any]:
    """Extract YAML frontmatter from a markdown file.

    Expects the file to open with a ``---`` delimiter; everything up to the
    next ``---`` is parsed as YAML. Returns an empty dict when there is no
    frontmatter, the closing delimiter is missing, or the YAML is invalid.
    """
    if not content.startswith('---'):
        return {}
    try:
        # Find the closing delimiter, searching past the opening '---'.
        end_idx = content.find('---', 3)
        if end_idx == -1:
            return {}
        yaml_content = content[3:end_idx].strip()
        # safe_load returns None for empty input; normalize to a dict.
        return yaml.safe_load(yaml_content) or {}
    except yaml.YAMLError:
        # Malformed frontmatter is treated the same as no frontmatter.
        return {}
def extract_capabilities_from_text(text: str) -> Set[str]:
    """Derive capability names by scanning *text* against the taxonomy.

    A domain is included once at least two of its keywords occur in the
    lowercased text; its subcapabilities are added when they are mentioned
    explicitly (either snake_case or space-separated).
    """
    found: Set[str] = set()
    lowered = text.lower()

    for domain, spec in CAPABILITY_TAXONOMY.items():
        # Require at least 2 keyword hits before counting the domain.
        hits = sum(kw in lowered for kw in spec["keywords"])
        if hits < 2:
            continue
        found.add(domain)
        # Pull in subcapabilities the text names directly.
        for sub in spec["subcapabilities"]:
            if sub in lowered or sub.replace("_", " ") in lowered:
                found.add(sub)

    return found
def extract_capabilities_from_tools(tools: str) -> Set[str]:
    """Map a tool declaration (comma/space string or list) to capabilities."""
    caps: Set[str] = set()
    if not tools:
        return caps

    # Normalize: a string may be comma- and/or whitespace-separated.
    if isinstance(tools, str):
        names = tools.replace(",", " ").split()
    elif isinstance(tools, list):
        names = tools
    else:
        names = []

    for raw in names:
        mapped = TOOL_CAPABILITIES.get(raw.strip())
        if mapped:
            caps.update(mapped)

    return caps
def analyze_agent_file(file_path: Path) -> Dict[str, Any]:
    """Analyze one agent markdown file and return its capability profile.

    Returns an empty dict when the file cannot be read.
    """
    try:
        text = file_path.read_text(encoding='utf-8')
    except Exception as exc:
        print(f" Warning: Could not read {file_path}: {exc}")
        return {}

    meta = extract_yaml_frontmatter(text)
    agent_name = meta.get('name', file_path.stem)
    summary = meta.get('description', '')
    tools = meta.get('tools', '')

    # Description and declared tools are the primary capability signals.
    caps: Set[str] = set()
    caps |= extract_capabilities_from_text(summary)
    caps |= extract_capabilities_from_tools(tools)

    # Full-content analysis is a weaker signal; only use it when the
    # frontmatter-derived set is sparse.
    if len(caps) < 3:
        caps |= extract_capabilities_from_text(text)

    # Collect up to five unique use-case phrases mentioned in the body.
    pattern = r'(?:use case|example|usage)[:\s]*["\']?([^"\'\n]+)["\']?'
    found_cases = list(set(re.findall(pattern, text.lower())))[:5]

    return {
        "name": agent_name,
        "description": summary,
        "capabilities": sorted(caps),
        "use_cases": found_cases,
        "tools": tools if isinstance(tools, str) else ", ".join(tools) if tools else ""
    }
def analyze_skill_file(file_path: Path) -> Dict[str, Any]:
    """Analyze one SKILL.md file and return its capability profile.

    Returns an empty dict when the file cannot be read.
    """
    try:
        text = file_path.read_text(encoding='utf-8')
    except Exception as exc:
        print(f" Warning: Could not read {file_path}: {exc}")
        return {}

    meta = extract_yaml_frontmatter(text)
    skill_name = meta.get('name', file_path.parent.name)
    summary = meta.get('description', '')

    # Honor capabilities declared explicitly in the frontmatter first.
    declared = meta.get('capabilities', [])
    caps: Set[str] = set(declared) if declared else set()

    caps |= extract_capabilities_from_text(summary)
    # Fall back to full-content analysis only when the set is sparse.
    if len(caps) < 3:
        caps |= extract_capabilities_from_text(text)

    # Skills with auto_triggers get a marker capability for the router.
    triggers = meta.get('auto_triggers', [])
    if triggers:
        caps.add('auto_triggerable')

    return {
        "name": skill_name,
        "description": summary,
        "capabilities": sorted(caps),
        "auto_triggers": triggers
    }
def scan_agents(agents_dir: Path, verbose: bool = False) -> Dict[str, Dict]:
    """Scan every ``*.md`` agent file under *agents_dir*.

    Returns a mapping of agent id (file stem) to its analysis result; empty
    when the directory does not exist.
    """
    results: Dict[str, Dict] = {}
    if not agents_dir.exists():
        print(f"Warning: Agents directory not found: {agents_dir}")
        return results

    files = list(agents_dir.glob("*.md"))
    print(f"Found {len(files)} agent files")

    for path in files:
        if verbose:
            print(f" Analyzing: {path.name}")
        data = analyze_agent_file(path)
        if not data:
            continue
        results[path.stem] = data
        if verbose:
            caps = data.get('capabilities', [])
            print(f" Capabilities: {', '.join(caps[:5])}{'...' if len(caps) > 5 else ''}")

    return results
def scan_skills(skills_dir: Path, verbose: bool = False) -> Dict[str, Dict]:
    """Scan every ``*/SKILL.md`` file under *skills_dir*.

    Returns a mapping of skill id (parent directory name) to its analysis
    result; empty when the directory does not exist.
    """
    results: Dict[str, Dict] = {}
    if not skills_dir.exists():
        print(f"Warning: Skills directory not found: {skills_dir}")
        return results

    files = list(skills_dir.glob("*/SKILL.md"))
    print(f"Found {len(files)} skill files")

    for path in files:
        if verbose:
            print(f" Analyzing: {path.parent.name}")
        data = analyze_skill_file(path)
        if not data:
            continue
        results[path.parent.name] = data
        if verbose:
            caps = data.get('capabilities', [])
            print(f" Capabilities: {', '.join(caps[:5])}{'...' if len(caps) > 5 else ''}")

    return results
def update_framework_registry(registry_path: Path, agents: Dict, skills: Dict, dry_run: bool = False):
    """Merge extracted capabilities back into framework-registry.json.

    Walks the registry's agent and skill categories and patches every entry
    whose id was analyzed. With *dry_run* set, nothing is written.
    """
    if not registry_path.exists():
        print(f"Warning: Registry file not found: {registry_path}")
        return

    try:
        with open(registry_path, 'r') as fh:
            registry = json.load(fh)
    except Exception as exc:
        print(f"Error reading registry: {exc}")
        return

    touched = 0
    components = registry.get('components', {})

    # Patch agent entries across all categories.
    if 'agents' in components:
        for entries in components['agents'].get('categories', {}).values():
            for entry in entries:
                data = agents.get(entry.get('id', ''))
                if data is None:
                    continue
                entry['capabilities'] = data.get('capabilities', [])
                if data.get('use_cases'):
                    entry['use_cases'] = data['use_cases']
                touched += 1

    # Patch skill entries across all categories.
    if 'skills' in components:
        for entries in components['skills'].get('categories', {}).values():
            for entry in entries:
                data = skills.get(entry.get('id', ''))
                if data is None:
                    continue
                entry['capabilities'] = data.get('capabilities', [])
                if data.get('auto_triggers'):
                    entry['auto_triggers'] = data['auto_triggers']
                touched += 1

    registry['last_updated'] = datetime.now(timezone.utc).isoformat()

    if dry_run:
        print(f"\n[DRY RUN] Would update {touched} components in framework-registry.json")
        return

    try:
        with open(registry_path, 'w') as fh:
            json.dump(registry, fh, indent=2)
        print(f"\nUpdated {touched} components in framework-registry.json")
    except Exception as exc:
        print(f"Error writing registry: {exc}")
def create_capability_registry(output_path: Path, agents: Dict, skills: Dict, dry_run: bool = False):
    """Create a dedicated capability registry for the dynamic router.

    Builds an inverted index (capability -> providing components) plus
    per-component summaries and statistics; writes it to *output_path*
    unless *dry_run* is set. Returns the registry dict either way.
    """
    # Invert the mapping: each capability lists the agents/skills that have it.
    index: Dict[str, Dict] = {}

    for agent_id, info in agents.items():
        for cap in info.get('capabilities', []):
            bucket = index.setdefault(cap, {"agents": [], "skills": []})
            bucket["agents"].append({
                "id": agent_id,
                "name": info.get('name', agent_id),
                "description": info.get('description', '')[:200]
            })

    for skill_id, info in skills.items():
        for cap in info.get('capabilities', []):
            bucket = index.setdefault(cap, {"agents": [], "skills": []})
            bucket["skills"].append({
                "id": skill_id,
                "name": info.get('name', skill_id),
                "description": info.get('description', '')[:200]
            })

    registry = {
        "version": "1.0.0",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "taxonomy": CAPABILITY_TAXONOMY,
        "capability_index": index,
        "agents": {
            agent_id: {
                "capabilities": info.get('capabilities', []),
                "tools": info.get('tools', ''),
                "description": info.get('description', '')[:300]
            }
            for agent_id, info in agents.items()
        },
        "skills": {
            skill_id: {
                "capabilities": info.get('capabilities', []),
                "auto_triggers": info.get('auto_triggers', []),
                "description": info.get('description', '')[:300]
            }
            for skill_id, info in skills.items()
        },
        "statistics": {
            "total_agents": len(agents),
            "total_skills": len(skills),
            "total_capabilities": len(index),
            "agents_with_capabilities": sum(1 for a in agents.values() if a.get('capabilities')),
            "skills_with_capabilities": sum(1 for s in skills.values() if s.get('capabilities'))
        }
    }

    if dry_run:
        print(f"\n[DRY RUN] Would create capability-registry.json with {len(index)} capabilities")
        print(f" - {len(agents)} agents analyzed")
        print(f" - {len(skills)} skills analyzed")
    else:
        try:
            with open(output_path, 'w') as fh:
                json.dump(registry, fh, indent=2)
            print(f"\nCreated capability-registry.json with {len(index)} capabilities")
        except Exception as exc:
            print(f"Error writing capability registry: {exc}")

    return registry
def main():
    """CLI entry point: scan components, report, and update both registries."""
    import argparse

    parser = argparse.ArgumentParser(description="Populate capability registry for dynamic routing")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done without making changes")
    parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed output")
    args = parser.parse_args()

    # The script lives in <root>/scripts/, so the project root is one level up.
    project_root = Path(__file__).parent.parent

    banner = "=" * 60
    print(banner)
    print("CODITECT Capability Registry Populator")
    print(banner)
    print(f"Project root: {project_root}")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print()

    # Scan both component kinds.
    print("Scanning agents...")
    agents = scan_agents(project_root / "agents", args.verbose)
    print("\nScanning skills...")
    skills = scan_skills(project_root / "skills", args.verbose)

    print("\n" + banner)
    print("ANALYSIS SUMMARY")
    print(banner)

    agents_with = sum(1 for info in agents.values() if info.get('capabilities'))
    skills_with = sum(1 for info in skills.values() if info.get('capabilities'))
    print(f"Agents analyzed: {len(agents)} ({agents_with} with capabilities)")
    print(f"Skills analyzed: {len(skills)} ({skills_with} with capabilities)")

    everything = list(agents.values()) + list(skills.values())

    # Union of every capability seen across both component kinds.
    unique_caps = set()
    for info in everything:
        unique_caps.update(info.get('capabilities', []))
    print(f"Unique capabilities found: {len(unique_caps)}")

    # Frequency of each capability across all components.
    tally = {}
    for info in everything:
        for cap in info.get('capabilities', []):
            tally[cap] = tally.get(cap, 0) + 1

    print("\nTop 10 capabilities:")
    for cap, count in sorted(tally.items(), key=lambda item: -item[1])[:10]:
        print(f" {cap}: {count} components")

    print("\n" + banner)
    print("UPDATING REGISTRIES")
    print(banner)

    # Patch the existing framework registry in place.
    update_framework_registry(
        project_root / "config" / "framework-registry.json",
        agents, skills, args.dry_run
    )
    # Emit the dedicated registry consumed by the dynamic router.
    create_capability_registry(
        project_root / "config" / "capability-registry.json",
        agents, skills, args.dry_run
    )

    print("\n" + banner)
    print("COMPLETE")
    print(banner)
    if args.dry_run:
        print("\nRun without --dry-run to apply changes.")
    else:
        print("\nCapability registry populated successfully!")
        print("The dynamic-capability-router can now use capability-registry.json for routing.")
if name == "main": main()