# scripts/populate-capability-registry.py
#!/usr/bin/env python3
"""Capability Registry Populator.

Analyzes agents, skills, and commands to extract and populate capabilities.

This script:
    1. Scans all agent, skill, and command files
    2. Extracts capabilities from descriptions, tools, and content
    3. Updates framework-registry.json with populated capabilities
    4. Creates capability-registry.json for the dynamic-capability-router

Usage:
    python scripts/populate-capability-registry.py [--dry-run] [--verbose]
"""

import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Set, Tuple

import yaml

# Capability Taxonomy - 12 domain categories with subcapabilities.
# Each domain maps trigger keywords (for fuzzy text matching) to the
# finer-grained subcapability names that may also be detected verbatim.
CAPABILITY_TAXONOMY = {
    "code_analysis": {
        "keywords": ["analyze", "analysis", "review", "inspect", "examine",
                     "understand", "explore", "codebase", "pattern", "structure"],
        "subcapabilities": ["static_analysis", "pattern_detection", "architecture_analysis",
                            "dependency_analysis", "code_navigation", "structure_mapping"],
    },
    "code_generation": {
        "keywords": ["generate", "create", "implement", "build", "scaffold",
                     "write", "develop", "code"],
        "subcapabilities": ["component_generation", "boilerplate_creation", "refactoring",
                            "code_completion", "template_generation"],
    },
    "testing": {
        "keywords": ["test", "testing", "tdd", "unit", "integration", "e2e",
                     "coverage", "qa", "quality"],
        "subcapabilities": ["unit_testing", "integration_testing", "e2e_testing",
                            "test_generation", "coverage_analysis", "test_automation"],
    },
    "security": {
        "keywords": ["security", "vulnerability", "audit", "penetration", "hardening",
                     "compliance", "sast", "secrets", "authentication"],
        "subcapabilities": ["vulnerability_scanning", "security_audit", "penetration_testing",
                            "compliance_checking", "secrets_detection", "hardening"],
    },
    "devops": {
        "keywords": ["deploy", "deployment", "ci", "cd", "pipeline", "container",
                     "docker", "kubernetes", "infrastructure", "cloud"],
        "subcapabilities": ["ci_cd_pipelines", "container_orchestration", "infrastructure_as_code",
                            "deployment_automation", "monitoring_setup"],
    },
    "documentation": {
        "keywords": ["document", "documentation", "docs", "readme", "api",
                     "guide", "reference", "explain"],
        "subcapabilities": ["api_documentation", "code_documentation", "user_guides",
                            "architecture_docs", "readme_generation"],
    },
    "research": {
        "keywords": ["research", "search", "find", "locate", "discover",
                     "investigate", "explore", "web", "fetch"],
        "subcapabilities": ["web_research", "codebase_research", "competitive_analysis",
                            "market_research", "technical_research"],
    },
    "database": {
        "keywords": ["database", "db", "sql", "query", "schema", "migration",
                     "foundationdb", "postgres", "redis"],
        "subcapabilities": ["schema_design", "query_optimization", "migration_management",
                            "database_administration", "data_modeling"],
    },
    "architecture": {
        "keywords": ["architect", "architecture", "design", "system", "c4",
                     "adr", "pattern", "structure"],
        "subcapabilities": ["system_design", "api_design", "component_architecture",
                            "pattern_application", "adr_management"],
    },
    "optimization": {
        "keywords": ["optimize", "performance", "profiling", "memory", "token",
                     "efficiency", "speed"],
        "subcapabilities": ["performance_optimization", "memory_optimization", "token_optimization",
                            "query_optimization", "load_testing"],
    },
    "project_management": {
        "keywords": ["project", "plan", "organize", "task", "workflow",
                     "orchestrate", "coordinate", "manage"],
        "subcapabilities": ["task_management", "workflow_orchestration", "project_planning",
                            "progress_tracking", "resource_allocation"],
    },
    "specialized": {
        "keywords": ["rust", "react", "vue", "svelte", "flutter", "mobile",
                     "blockchain", "ml", "ai", "websocket", "graphql"],
        "subcapabilities": ["rust_development", "frontend_frameworks", "mobile_development",
                            "blockchain_development", "ml_ops", "realtime_systems"],
    },
}

# Tool to capability mapping: possessing a tool implies these capabilities.
TOOL_CAPABILITIES = {
    "Read": ["code_analysis", "code_navigation"],
    "Write": ["code_generation", "documentation"],
    "Edit": ["code_generation", "refactoring"],
    "Grep": ["code_analysis", "pattern_detection", "codebase_research"],
    "Glob": ["code_navigation", "file_discovery"],
    "Bash": ["devops", "automation", "deployment_automation"],
    "WebSearch": ["web_research", "market_research"],
    "WebFetch": ["web_research", "technical_research"],
    "TodoWrite": ["task_management", "progress_tracking"],
    "Task": ["workflow_orchestration", "multi_agent_coordination"],
    "LS": ["code_navigation", "structure_mapping"],
}

def extract_yaml_frontmatter(content: str) -> Dict[str, Any]:
    """Parse the YAML frontmatter of a markdown document.

    Frontmatter is the text between a leading '---' at the very start of
    the content and the next '---'. Returns {} when frontmatter is absent,
    unterminated, empty, or not valid YAML.
    """
    if not content.startswith('---'):
        return {}

    closing = content.find('---', 3)
    if closing == -1:
        # Opening marker with no closing marker: treat as no frontmatter.
        return {}

    raw = content[3:closing].strip()
    try:
        parsed = yaml.safe_load(raw)
    except yaml.YAMLError:
        return {}
    # safe_load returns None for empty input; normalize to a dict.
    return parsed or {}

def extract_capabilities_from_text(text: str) -> Set[str]:
    """Infer capability names by keyword-matching *text* against the taxonomy.

    A domain is added when at least two of its keywords occur in the text
    (single hits are treated as noise); a subcapability is added whenever
    its name appears verbatim, with or without underscores.
    """
    found: Set[str] = set()
    lowered = text.lower()

    for domain, config in CAPABILITY_TAXONOMY.items():
        hits = [kw for kw in config["keywords"] if kw in lowered]
        if len(hits) >= 2:  # require at least 2 keyword matches
            found.add(domain)

        for subcap in config["subcapabilities"]:
            # Match either the raw identifier or its human-readable form.
            if subcap in lowered or subcap.replace("_", " ") in lowered:
                found.add(subcap)

    return found

def extract_capabilities_from_tools(tools: Any) -> Set[str]:
    """Map a component's tool list to the capabilities those tools imply.

    Args:
        tools: Either a comma/space separated string (e.g. "Read, Grep") or
            a list of tool names, as found in YAML frontmatter. Falsy values
            yield an empty set. (Annotation widened from ``str``: the body
            has always accepted lists too.)

    Returns:
        Union of the TOOL_CAPABILITIES entries for every recognized tool;
        unknown tool names are silently ignored.
    """
    capabilities: Set[str] = set()
    if not tools:
        return capabilities

    # Normalize to a list of names - frontmatter stores tools either as a
    # single delimited string or as a YAML list.
    tool_list: List[str] = []
    if isinstance(tools, str):
        tool_list = [t.strip() for t in tools.replace(",", " ").split()]
    elif isinstance(tools, list):
        tool_list = tools

    for tool in tool_list:
        tool_name = tool.strip()
        if tool_name in TOOL_CAPABILITIES:
            capabilities.update(TOOL_CAPABILITIES[tool_name])

    return capabilities

def analyze_agent_file(file_path: Path) -> Dict[str, Any]:
    """Read one agent markdown file and derive its capability profile.

    Returns {} when the file cannot be read; otherwise a dict with the
    agent's name, description, sorted capabilities, extracted use cases,
    and a normalized tool string.
    """
    try:
        content = file_path.read_text(encoding='utf-8')
    except Exception as e:
        print(f" Warning: Could not read {file_path}: {e}")
        return {}

    frontmatter = extract_yaml_frontmatter(content)

    # Basic identity fields; the filename stands in for a missing name.
    name = frontmatter.get('name', file_path.stem)
    description = frontmatter.get('description', '')
    tools = frontmatter.get('tools', '')

    capabilities: Set[str] = set()
    capabilities |= extract_capabilities_from_text(description)
    capabilities |= extract_capabilities_from_tools(tools)

    # The full body is a noisier signal than the description/tools, so it
    # only contributes when the above yielded too few capabilities.
    if len(capabilities) < 3:
        capabilities |= extract_capabilities_from_text(content)

    # Pull up to five unique use-case phrases out of the prose.
    use_case_pattern = r'(?:use case|example|usage)[:\s]*["\']?([^"\'\n]+)["\']?'
    use_cases = list(set(re.findall(use_case_pattern, content.lower())))[:5]

    return {
        "name": name,
        "description": description,
        "capabilities": sorted(capabilities),
        "use_cases": use_cases,
        "tools": tools if isinstance(tools, str) else ", ".join(tools) if tools else ""
    }

def analyze_skill_file(file_path: Path) -> Dict[str, Any]:
    """Read one SKILL.md file and derive the skill's capability profile.

    Returns {} when the file cannot be read; otherwise a dict with the
    skill's name, description, sorted capabilities, and auto_triggers.
    """
    try:
        content = file_path.read_text(encoding='utf-8')
    except Exception as e:
        print(f" Warning: Could not read {file_path}: {e}")
        return {}

    frontmatter = extract_yaml_frontmatter(content)

    # Skills live one per directory, so the directory name is the fallback.
    name = frontmatter.get('name', file_path.parent.name)
    description = frontmatter.get('description', '')

    # Frontmatter may declare capabilities explicitly; start from those.
    capabilities = set(frontmatter.get('capabilities', []) or [])
    capabilities.update(extract_capabilities_from_text(description))

    # Fall back to the noisier full-content scan only when needed.
    if len(capabilities) < 3:
        capabilities.update(extract_capabilities_from_text(content))

    auto_triggers = frontmatter.get('auto_triggers', [])
    if auto_triggers:
        # Mark skills that the router is allowed to fire automatically.
        capabilities.add('auto_triggerable')

    return {
        "name": name,
        "description": description,
        "capabilities": sorted(capabilities),
        "auto_triggers": auto_triggers
    }

def scan_agents(agents_dir: Path, verbose: bool = False) -> Dict[str, Dict]:
    """Analyze every *.md agent file under agents_dir; return id -> profile."""
    agents: Dict[str, Dict] = {}

    if not agents_dir.exists():
        print(f"Warning: Agents directory not found: {agents_dir}")
        return agents

    agent_files = list(agents_dir.glob("*.md"))
    print(f"Found {len(agent_files)} agent files")

    for agent_file in agent_files:
        if verbose:
            print(f" Analyzing: {agent_file.name}")

        data = analyze_agent_file(agent_file)
        if data:
            # The filename (minus extension) is the agent's identifier.
            agents[agent_file.stem] = data

            if verbose:
                caps = data.get('capabilities', [])
                print(f" Capabilities: {', '.join(caps[:5])}{'...' if len(caps) > 5 else ''}")

    return agents

def scan_skills(skills_dir: Path, verbose: bool = False) -> Dict[str, Dict]:
    """Analyze every */SKILL.md under skills_dir; return skill id -> profile."""
    skills: Dict[str, Dict] = {}

    if not skills_dir.exists():
        print(f"Warning: Skills directory not found: {skills_dir}")
        return skills

    skill_files = list(skills_dir.glob("*/SKILL.md"))
    print(f"Found {len(skill_files)} skill files")

    for skill_file in skill_files:
        if verbose:
            print(f" Analyzing: {skill_file.parent.name}")

        data = analyze_skill_file(skill_file)
        if data:
            # The containing directory name is the skill's identifier.
            skills[skill_file.parent.name] = data

            if verbose:
                caps = data.get('capabilities', [])
                print(f" Capabilities: {', '.join(caps[:5])}{'...' if len(caps) > 5 else ''}")

    return skills

def update_framework_registry(registry_path: Path, agents: Dict, skills: Dict, dry_run: bool = False):
    """Write extracted capabilities back into framework-registry.json.

    Matches registry entries to analyzed components by id, copies in their
    capabilities (plus use_cases/auto_triggers when present), refreshes
    last_updated, and rewrites the file unless dry_run is set. Missing or
    unreadable registry files are reported and skipped, never raised.
    """
    if not registry_path.exists():
        print(f"Warning: Registry file not found: {registry_path}")
        return

    try:
        with open(registry_path, 'r') as f:
            registry = json.load(f)
    except Exception as e:
        print(f"Error reading registry: {e}")
        return

    updated_count = 0
    components = registry.get('components', {})

    # Fold agent capabilities into every category the agent appears under.
    for agent_list in components.get('agents', {}).get('categories', {}).values():
        for agent in agent_list:
            agent_id = agent.get('id', '')
            if agent_id not in agents:
                continue
            agent_data = agents[agent_id]
            agent['capabilities'] = agent_data.get('capabilities', [])
            if agent_data.get('use_cases'):
                agent['use_cases'] = agent_data['use_cases']
            updated_count += 1

    # Same treatment for skills, carrying auto_triggers instead.
    for skill_list in components.get('skills', {}).get('categories', {}).values():
        for skill in skill_list:
            skill_id = skill.get('id', '')
            if skill_id not in skills:
                continue
            skill_data = skills[skill_id]
            skill['capabilities'] = skill_data.get('capabilities', [])
            if skill_data.get('auto_triggers'):
                skill['auto_triggers'] = skill_data['auto_triggers']
            updated_count += 1

    registry['last_updated'] = datetime.now(timezone.utc).isoformat()

    if dry_run:
        print(f"\n[DRY RUN] Would update {updated_count} components in framework-registry.json")
        return

    try:
        with open(registry_path, 'w') as f:
            json.dump(registry, f, indent=2)
        print(f"\nUpdated {updated_count} components in framework-registry.json")
    except Exception as e:
        print(f"Error writing registry: {e}")

def create_capability_registry(output_path: Path, agents: Dict, skills: Dict, dry_run: bool = False):
    """Build the capability registry consumed by the dynamic-capability-router.

    Produces an inverted capability index (capability -> providing agents
    and skills), per-component summaries, the full taxonomy, and aggregate
    statistics; writes it to output_path unless dry_run. Returns the
    registry dict in both modes.
    """
    # Inverted index: capability name -> the components that provide it.
    capability_index: Dict[str, Dict[str, List]] = {}

    def _index(component_id: str, data: Dict, bucket: str) -> None:
        # Register one entry per capability; truncate descriptions to keep
        # the index compact.
        for cap in data.get('capabilities', []):
            slot = capability_index.setdefault(cap, {"agents": [], "skills": []})
            slot[bucket].append({
                "id": component_id,
                "name": data.get('name', component_id),
                "description": data.get('description', '')[:200]
            })

    for agent_id, agent_data in agents.items():
        _index(agent_id, agent_data, "agents")
    for skill_id, skill_data in skills.items():
        _index(skill_id, skill_data, "skills")

    registry = {
        "version": "1.0.0",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "taxonomy": CAPABILITY_TAXONOMY,
        "capability_index": capability_index,
        "agents": {
            agent_id: {
                "capabilities": data.get('capabilities', []),
                "tools": data.get('tools', ''),
                "description": data.get('description', '')[:300]
            }
            for agent_id, data in agents.items()
        },
        "skills": {
            skill_id: {
                "capabilities": data.get('capabilities', []),
                "auto_triggers": data.get('auto_triggers', []),
                "description": data.get('description', '')[:300]
            }
            for skill_id, data in skills.items()
        },
        "statistics": {
            "total_agents": len(agents),
            "total_skills": len(skills),
            "total_capabilities": len(capability_index),
            "agents_with_capabilities": sum(1 for a in agents.values() if a.get('capabilities')),
            "skills_with_capabilities": sum(1 for s in skills.values() if s.get('capabilities'))
        }
    }

    if dry_run:
        print(f"\n[DRY RUN] Would create capability-registry.json with {len(capability_index)} capabilities")
        print(f" - {len(agents)} agents analyzed")
        print(f" - {len(skills)} skills analyzed")
    else:
        try:
            with open(output_path, 'w') as f:
                json.dump(registry, f, indent=2)
            print(f"\nCreated capability-registry.json with {len(capability_index)} capabilities")
        except Exception as e:
            print(f"Error writing capability registry: {e}")

    return registry

def main():
    """CLI entry point: scan components, report, and update both registries."""
    import argparse

    parser = argparse.ArgumentParser(description="Populate capability registry for dynamic routing")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be done without making changes")
    parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed output")
    args = parser.parse_args()

    # The script lives in <root>/scripts/, so the project root is one level up.
    project_root = Path(__file__).parent.parent

    print("=" * 60)
    print("CODITECT Capability Registry Populator")
    print("=" * 60)
    print(f"Project root: {project_root}")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print()

    # Scan components.
    print("Scanning agents...")
    agents = scan_agents(project_root / "agents", args.verbose)

    print("\nScanning skills...")
    skills = scan_skills(project_root / "skills", args.verbose)

    print("\n" + "=" * 60)
    print("ANALYSIS SUMMARY")
    print("=" * 60)

    agents_with_caps = sum(1 for a in agents.values() if a.get('capabilities'))
    skills_with_caps = sum(1 for s in skills.values() if s.get('capabilities'))
    print(f"Agents analyzed: {len(agents)} ({agents_with_caps} with capabilities)")
    print(f"Skills analyzed: {len(skills)} ({skills_with_caps} with capabilities)")

    # Tally how often each capability occurs across all components; the
    # key set doubles as the unique-capability count.
    cap_counts: Dict[str, int] = {}
    for data in list(agents.values()) + list(skills.values()):
        for cap in data.get('capabilities', []):
            cap_counts[cap] = cap_counts.get(cap, 0) + 1

    print(f"Unique capabilities found: {len(cap_counts)}")

    print("\nTop 10 capabilities:")
    for cap, count in sorted(cap_counts.items(), key=lambda kv: -kv[1])[:10]:
        print(f" {cap}: {count} components")

    print("\n" + "=" * 60)
    print("UPDATING REGISTRIES")
    print("=" * 60)

    # Update framework-registry.json in place.
    update_framework_registry(
        project_root / "config" / "framework-registry.json",
        agents, skills, args.dry_run
    )

    # Create the dedicated capability-registry.json for the router.
    create_capability_registry(
        project_root / "config" / "capability-registry.json",
        agents, skills, args.dry_run
    )

    print("\n" + "=" * 60)
    print("COMPLETE")
    print("=" * 60)

    if args.dry_run:
        print("\nRun without --dry-run to apply changes.")
    else:
        print("\nCapability registry populated successfully!")
        print("The dynamic-capability-router can now use capability-registry.json for routing.")

# Fix: the guard must compare the __name__ dunder against "__main__";
# the bare `if name == "main":` form raises NameError and never runs main().
if __name__ == "__main__":
    main()