#!/usr/bin/env python3
"""
Registry Loader - Component Discovery and Registration System

Scans the CODITECT framework for all components (agents, skills, commands,
scripts, hooks, prompts) and populates JSON registries for runtime activation.

Part of Component Activation Infrastructure (Phase 2)
Created: 2025-11-29
"""

import argparse
import json
import logging
import os
import re
import sys
from collections import Counter
from dataclasses import dataclass, asdict
from datetime import date
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional

import yaml

# Get script directory for path resolution (works from any cwd)
SCRIPT_DIR = Path(__file__).resolve().parent
CORE_ROOT = SCRIPT_DIR.parent.parent  # Go up from scripts/core/ to coditect-core/

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

@dataclass
class ComponentMetadata:
    """Metadata for a CODITECT component."""

    name: str
    type: str  # agent, skill, command, script, hook, prompt
    path: str  # stored relative to the framework root by RegistryLoader
    description: str
    tags: List[str]
    version: str = "1.0.0"
    status: str = "operational"
    category: Optional[str] = None
    tools: Optional[List[str]] = None
    use_cases: Optional[List[str]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary for JSON serialization.

        Returns:
            The dataclass fields as a dict, with every field whose value is
            ``None`` left out.
        """
        return {
            key: value
            for key, value in asdict(self).items()
            if value is not None
        }

class RegistryLoader:
    """Load and validate CODITECT components.

    Scans the ``agents/``, ``skills/``, ``commands/``, ``scripts/``,
    ``hooks/`` and ``prompts/`` sub-directories of ``framework_root``, builds
    one ``ComponentMetadata`` record per file found, and can export the
    collected records to a JSON registry file.
    """

    # scripts/<dir>/ names that are used verbatim as the script category.
    _SCRIPT_CATEGORIES = frozenset({"core", "generated", "llm_execution", "workflows"})

    # Ordered (keywords, category) rules; the first rule with a keyword that
    # occurs anywhere in the lower-cased agent name wins.
    # NOTE: this is plain substring matching, so short keywords such as
    # "ui", "qa" or "git" also match inside longer words.
    _AGENT_CATEGORY_RULES = (
        (("research", "analyst"), "research"),
        (("orchestrat",), "orchestration"),
        (("qa", "test", "quality"), "quality"),
        (("devops", "cloud", "infrastructure"), "infrastructure"),
        (("developer", "architect"), "development"),
        (("ui", "frontend"), "ui"),
        (("git",), "git"),
    )

    # Ordered (filename fragments, phase) rules for git hooks.
    _HOOK_PHASE_RULES = (
        (("pre-commit", "pre_commit"), "pre-commit"),
        (("pre-push", "pre_push"), "pre-push"),
        (("post-commit", "post_commit"), "post-commit"),
    )

    def __init__(self, framework_root: Path):
        """Create a loader rooted at ``framework_root`` with no components loaded."""
        self.framework_root = Path(framework_root)
        self.components: List[ComponentMetadata] = []

    # Scanning ---------------------------------------------------------

    def scan_agents(self) -> List[ComponentMetadata]:
        """Scan agents/ directory for agent definitions (``*.md``, README.md excluded)."""
        agents_dir = self.framework_root / "agents"
        if not agents_dir.exists():
            logger.warning(f"Agents directory not found: {agents_dir}")
            return []

        agents = self._load_each(
            self._markdown_files(agents_dir),
            lambda f: self._extract_agent_metadata(f.name, self._read(f)),
            "agent",
            "agent",
        )
        logger.info(f"Loaded {len(agents)} agents")
        return agents

    def scan_skills(self) -> List[ComponentMetadata]:
        """Scan skills/ directory: every sub-directory holding a SKILL.md is one skill."""
        skills_dir = self.framework_root / "skills"
        if not skills_dir.exists():
            logger.warning(f"Skills directory not found: {skills_dir}")
            return []

        skill_files = [
            entry / "SKILL.md"
            for entry in sorted(skills_dir.iterdir())
            if entry.is_dir() and (entry / "SKILL.md").exists()
        ]
        skills = self._load_each(
            skill_files,
            lambda f: self._extract_skill_metadata(f.parent.name, self._read(f), f),
            "skill",
            "skill",
        )
        logger.info(f"Loaded {len(skills)} skills")
        return skills

    def scan_commands(self) -> List[ComponentMetadata]:
        """Scan commands/ directory for command definitions (``*.md``, README.md excluded)."""
        commands_dir = self.framework_root / "commands"
        if not commands_dir.exists():
            logger.warning(f"Commands directory not found: {commands_dir}")
            return []

        commands = self._load_each(
            self._markdown_files(commands_dir),
            lambda f: self._extract_command_metadata(f.name, self._read(f)),
            "command",
            "command",
        )
        logger.info(f"Loaded {len(commands)} commands")
        return commands

    def scan_scripts(self) -> List[ComponentMetadata]:
        """Scan scripts/ directory recursively for Python (``*.py``) and shell (``*.sh``) scripts."""
        scripts_dir = self.framework_root / "scripts"
        if not scripts_dir.exists():
            logger.warning(f"Scripts directory not found: {scripts_dir}")
            return []

        scripts = self._load_each(
            sorted(scripts_dir.rglob("*.py")),
            lambda f: self._extract_script_metadata(f, "python"),
            "Python script",
            "script",
        )
        scripts += self._load_each(
            sorted(scripts_dir.rglob("*.sh")),
            lambda f: self._extract_script_metadata(f, "shell"),
            "shell script",
            "script",
        )
        logger.info(f"Loaded {len(scripts)} scripts")
        return scripts

    def scan_hooks(self) -> List[ComponentMetadata]:
        """Scan hooks/ directory (non-recursive) for shell and Python git hooks."""
        hooks_dir = self.framework_root / "hooks"
        if not hooks_dir.exists():
            logger.warning(f"Hooks directory not found: {hooks_dir}")
            return []

        hooks = self._load_each(
            sorted(hooks_dir.glob("*.sh")),
            lambda f: self._extract_hook_metadata(f, "shell"),
            "shell hook",
            "hook",
        )
        hooks += self._load_each(
            sorted(hooks_dir.glob("*.py")),
            lambda f: self._extract_hook_metadata(f, "python"),
            "Python hook",
            "hook",
        )
        logger.info(f"Loaded {len(hooks)} hooks")
        return hooks

    def scan_prompts(self) -> List[ComponentMetadata]:
        """Scan prompts/ directory for prompt templates (``*.md``, README.md excluded)."""
        prompts_dir = self.framework_root / "prompts"
        if not prompts_dir.exists():
            logger.warning(f"Prompts directory not found: {prompts_dir}")
            return []

        prompts = self._load_each(
            self._markdown_files(prompts_dir),
            lambda f: self._extract_prompt_metadata(f.name, self._read(f)),
            "prompt",
            "prompt",
        )
        logger.info(f"Loaded {len(prompts)} prompts")
        return prompts

    def load_all_components(self) -> List[ComponentMetadata]:
        """Scan all component types and store the result in ``self.components``.

        Returns:
            The combined list, in the order agents, skills, commands,
            scripts, hooks, prompts.
        """
        logger.info("Starting component scan...")

        all_components: List[ComponentMetadata] = []
        for scan in (
            self.scan_agents,
            self.scan_skills,
            self.scan_commands,
            self.scan_scripts,
            self.scan_hooks,
            self.scan_prompts,
        ):
            all_components.extend(scan())

        self.components = all_components
        logger.info(f"Total components loaded: {len(all_components)}")
        return all_components

    # Validation and export ----------------------------------------------

    def validate_component(self, component: ComponentMetadata) -> bool:
        """Validate component has required fields and valid path.

        Returns:
            ``True`` when ``name`` and ``type`` are non-empty and
            ``framework_root / component.path`` exists; ``False`` otherwise
            (the reason is logged).
        """
        if not component.name:
            logger.error(f"Component missing name: {component}")
            return False

        if not component.type:
            logger.error(f"Component {component.name} missing type")
            return False

        full_path = self.framework_root / component.path
        if not full_path.exists():
            logger.warning(f"Component path does not exist: {full_path}")
            return False

        return True

    def export_to_json(self, output_path: Path, component_type: Optional[str] = None) -> None:
        """Export components to a JSON registry file.

        Args:
            output_path: Destination file; parent directories are created.
            component_type: When given, only components whose ``type`` equals
                this value (singular form, e.g. ``"agent"``) are exported.

        Components that fail ``validate_component`` are skipped, and
        ``total_components`` reports the number actually written.
        """
        if component_type:
            candidates = [c for c in self.components if c.type == component_type]
        else:
            candidates = self.components

        # Group the valid components by type
        by_type: Dict[str, List[Dict]] = {}
        exported = 0
        for component in candidates:
            if not self.validate_component(component):
                logger.warning(f"Skipping invalid component: {component.name}")
                continue
            by_type.setdefault(component.type, []).append(component.to_dict())
            exported += 1

        registry = {
            "framework_version": "1.0.0",
            "last_updated": date.today().isoformat(),
            "total_components": exported,
            "components": by_type,
        }

        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(registry, f, indent=2)

        logger.info(f"Exported {exported} components to {output_path}")

    # Scan helpers -------------------------------------------------------

    @staticmethod
    def _read(path: Path) -> str:
        """Read a component file as UTF-8 text, independent of the platform locale."""
        return path.read_text(encoding="utf-8")

    @staticmethod
    def _markdown_files(directory: Path) -> List[Path]:
        """Return the ``*.md`` files directly inside ``directory``, sorted, without README.md."""
        return sorted(f for f in directory.glob("*.md") if f.name != "README.md")

    def _load_each(
        self,
        files: Iterable[Path],
        extract: Callable[[Path], Optional[ComponentMetadata]],
        loaded_label: str,
        error_label: str,
    ) -> List[ComponentMetadata]:
        """Run ``extract`` on every file, collecting the metadata it returns.

        A file whose extraction raises is logged and skipped so that one bad
        file does not abort the whole scan.
        """
        loaded: List[ComponentMetadata] = []
        for file_path in files:
            try:
                metadata = extract(file_path)
            except Exception as e:  # deliberate best-effort boundary
                logger.error(f"Error loading {error_label} {file_path}: {e}")
                continue
            if metadata:
                loaded.append(metadata)
                logger.debug(f"Loaded {loaded_label}: {metadata.name}")
        return loaded

    # Helper methods for metadata extraction -----------------------------

    def _extract_agent_metadata(self, filename: str, content: str) -> Optional[ComponentMetadata]:
        """Extract metadata from agent markdown file.

        Uses the YAML frontmatter when present; otherwise falls back to the
        first paragraph as description and a category inferred from the name.
        """
        name = Path(filename).stem
        path = f"agents/{filename}"
        frontmatter = self._extract_yaml_frontmatter(content)

        if frontmatter:
            return ComponentMetadata(
                name=name,
                type="agent",
                path=path,
                description=frontmatter.get('description', 'No description available'),
                tags=frontmatter.get('tags') or [],
                version=frontmatter.get('version', '1.0.0'),
                status=frontmatter.get('status', 'operational'),
                category=frontmatter.get('category'),
                tools=frontmatter.get('tools'),
                use_cases=frontmatter.get('use_cases'),
            )

        return ComponentMetadata(
            name=name,
            type="agent",
            path=path,
            description=self._extract_first_paragraph(content),
            tags=[],
            category=self._infer_agent_category(name, content),
        )

    def _extract_skill_metadata(self, skill_name: str, content: str, skill_file: Path) -> Optional[ComponentMetadata]:
        """Extract metadata from skill SKILL.md file.

        ``skill_file`` is accepted for interface compatibility; the registry
        path is derived from ``skill_name``.
        """
        path = f"skills/{skill_name}/SKILL.md"
        frontmatter = self._extract_yaml_frontmatter(content)

        if frontmatter:
            return ComponentMetadata(
                name=skill_name,
                type="skill",
                path=path,
                description=frontmatter.get('description', 'No description available'),
                tags=frontmatter.get('tags') or [],
                version=frontmatter.get('version', '1.0.0'),
                status=frontmatter.get('status', 'operational'),
            )

        return ComponentMetadata(
            name=skill_name,
            type="skill",
            path=path,
            description=self._extract_first_paragraph(content),
            tags=[],
        )

    def _extract_command_metadata(self, filename: str, content: str) -> Optional[ComponentMetadata]:
        """Extract metadata from command markdown file (frontmatter, else first paragraph)."""
        name = Path(filename).stem
        path = f"commands/{filename}"
        frontmatter = self._extract_yaml_frontmatter(content)

        if frontmatter:
            return ComponentMetadata(
                name=name,
                type="command",
                path=path,
                description=frontmatter.get('description', 'No description available'),
                tags=frontmatter.get('tags') or [],
                version=frontmatter.get('version', '1.0.0'),
                status=frontmatter.get('status', 'operational'),
            )

        return ComponentMetadata(
            name=name,
            type="command",
            path=path,
            description=self._extract_first_paragraph(content),
            tags=[],
        )

    def _extract_script_metadata(self, script_file: Path, script_type: str) -> Optional[ComponentMetadata]:
        """Extract metadata from Python or shell script.

        The description comes from the docstring (Python) or leading comment
        (shell); the category from the script's directory under scripts/.
        """
        relative_path = script_file.relative_to(self.framework_root)
        description = self._extract_script_description(self._read(script_file), script_type)
        category = self._infer_script_category(relative_path)

        return ComponentMetadata(
            name=script_file.name,
            type="script",
            path=str(relative_path),
            description=description,
            tags=[script_type, category] if category else [script_type],
            category=category,
        )

    def _extract_hook_metadata(self, hook_file: Path, hook_type: str) -> Optional[ComponentMetadata]:
        """Extract metadata from git hook; the hook phase doubles as its category."""
        name = hook_file.name
        description = self._extract_script_description(self._read(hook_file), hook_type)

        # Infer hook phase (pre-commit, pre-push, etc.)
        hook_phase = self._infer_hook_phase(name)

        return ComponentMetadata(
            name=name,
            type="hook",
            path=f"hooks/{name}",
            description=description,
            tags=[hook_type, hook_phase] if hook_phase else [hook_type],
            category=hook_phase,
        )

    def _extract_prompt_metadata(self, filename: str, content: str) -> Optional[ComponentMetadata]:
        """Extract metadata from prompt template (frontmatter, else first paragraph)."""
        name = Path(filename).stem
        path = f"prompts/{filename}"
        frontmatter = self._extract_yaml_frontmatter(content)

        if frontmatter:
            return ComponentMetadata(
                name=name,
                type="prompt",
                path=path,
                description=frontmatter.get('description', 'No description available'),
                tags=frontmatter.get('tags') or [],
                version=frontmatter.get('version', '1.0.0'),
            )

        return ComponentMetadata(
            name=name,
            type="prompt",
            path=path,
            description=self._extract_first_paragraph(content),
            tags=[],
        )

    # Utility methods ------------------------------------------------------

    def _extract_yaml_frontmatter(self, content: str) -> Optional[Dict]:
        """Extract YAML frontmatter from markdown file.

        Returns:
            The parsed mapping, or ``None`` when there is no leading
            ``---`` block, the YAML is invalid, or it does not parse to a
            mapping (callers rely on ``.get``).
        """
        match = re.match(r'^---\r?\n(.*?)\r?\n---', content, re.DOTALL)
        if not match:
            return None

        try:
            parsed = yaml.safe_load(match.group(1))
        except yaml.YAMLError as e:
            logger.error(f"Error parsing YAML frontmatter: {e}")
            return None

        return parsed if isinstance(parsed, dict) else None

    def _extract_first_paragraph(self, content: str) -> str:
        """Extract first non-heading line as description (at most 200 characters)."""
        # Remove frontmatter if present
        body = re.sub(r'^---\r?\n.*?\r?\n---\r?\n', '', content, flags=re.DOTALL)

        for raw_line in body.strip().splitlines():
            line = raw_line.strip()
            if line and not line.startswith('#'):
                return line[:200]  # Limit to 200 chars

        return "No description available"

    def _extract_script_description(self, content: str, script_type: str) -> str:
        """Extract description from script docstring or comments.

        Python: first line of the first ``\"\"\"`` string in the file.
        Shell: first non-empty ``#`` comment (shebang excluded) within the
        first five lines. Falls back to "No description available".
        """
        if script_type == "python":
            match = re.search(r'"""(.*?)"""', content, re.DOTALL)
            if match:
                docstring = match.group(1).strip()
                if docstring:
                    return docstring.splitlines()[0].strip()[:200]
        elif script_type == "shell":
            for line in content.split('\n')[:5]:
                stripped = line.strip()
                if not stripped.startswith('#') or stripped.startswith('#!'):
                    continue
                text = stripped.strip('#').strip()
                if text:  # skip bare "#" separator / banner lines
                    return text[:200]

        return "No description available"

    def _infer_agent_category(self, name: str, content: str) -> Optional[str]:
        """Infer agent category from name (``content`` is currently not consulted).

        Returns the category of the first matching rule, or "general".
        """
        lowered = name.lower()
        for keywords, category in self._AGENT_CATEGORY_RULES:
            if any(keyword in lowered for keyword in keywords):
                return category
        return "general"

    def _infer_script_category(self, path: Path) -> Optional[str]:
        """Infer script category from the second component of a framework-relative path.

        Returns that directory name when it is a known category, else "root".
        """
        parts = path.parts
        if len(parts) >= 2 and parts[1] in self._SCRIPT_CATEGORIES:
            return parts[1]
        return "root"

    def _infer_hook_phase(self, filename: str) -> Optional[str]:
        """Infer git hook phase from filename; ``None`` when no known phase is named."""
        for fragments, phase in self._HOOK_PHASE_RULES:
            if any(fragment in filename for fragment in fragments):
                return phase
        return None

def main():
    """Main entry point.

    Parses the command line, scans the requested component type(s),
    optionally validates and/or exports them, and prints a per-type summary.

    Returns:
        Process exit code (always 0).
    """
    parser = argparse.ArgumentParser(description="CODITECT Registry Loader")
    parser.add_argument('--root', type=Path, default=CORE_ROOT,
                        help='Framework root directory (default: coditect-core root)')
    parser.add_argument('--scan',
                        choices=['agents', 'skills', 'commands', 'scripts', 'hooks', 'prompts', 'all'],
                        default='all', help='Component types to scan')
    parser.add_argument('--export', type=Path, help='Export to JSON file')
    parser.add_argument('--validate', action='store_true', help='Validate components only')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')

    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)

    # Initialize loader
    loader = RegistryLoader(args.root)

    # Scan components
    if args.scan == 'all':
        components = loader.load_all_components()
    else:
        # argparse `choices` guarantees a matching scan_<type> method exists.
        components = getattr(loader, f"scan_{args.scan}")()
        # The scan_* methods only return their results, while export_to_json
        # reads loader.components, so store them for a later --export.
        loader.components = components

    # Validate
    if args.validate:
        valid_count = sum(1 for c in components if loader.validate_component(c))
        logger.info(f"Validation: {valid_count}/{len(components)} components valid")

    # Export
    if args.export:
        # loader.components already holds exactly what was scanned. No type
        # filter is passed: args.scan is plural ("agents") whereas
        # component.type is singular ("agent"), so it would match nothing.
        loader.export_to_json(args.export)

    # Print summary
    print(f"\n=== Component Scan Summary ===")
    print(f"Total components: {len(components)}")

    by_type = Counter(comp.type for comp in components)
    for comp_type, count in sorted(by_type.items()):
        print(f" {comp_type}: {count}")

    return 0

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    sys.exit(main())