#!/usr/bin/env python3
"""
Framework Bridge - Automated Sync Between Files and Registries

Maintains consistency between component files (.md, .py, .sh) and JSON
registries. Detects drift, auto-updates registries, and validates
cross-references.

Part of Component Activation Infrastructure (Phase 4)
Created: 2025-11-29
"""

import hashlib
import json
import logging
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

# Import related modules; fall back to adding this script's own directory to
# sys.path when running outside the package context.
try:
    from registry_loader import RegistryLoader
    from component_activator import ComponentActivator
except ImportError:
    # BUG FIX: original used Path(file); the dunder __file__ is required.
    sys.path.insert(0, str(Path(__file__).parent))
    from registry_loader import RegistryLoader
    from component_activator import ComponentActivator

# BUG FIX: original used logging.getLogger(name); __name__ is the module name.
logger = logging.getLogger(__name__)
@dataclass
class SyncResult:
    """Result of a sync operation between component files and registries.

    Populated by FrameworkBridge.sync_registries_from_files().
    """
    files_scanned: int        # number of component files discovered on disk
    registries_checked: int   # number of registries inspected
    components_added: int     # entries on disk but missing from the registry
    components_updated: int   # entries whose registry record was refreshed
    components_removed: int   # orphaned registry entries (file not found)
    errors: List[str]         # fatal problems encountered during sync
    warnings: List[str]       # non-fatal issues (e.g. orphaned entries)
    timestamp: str            # ISO-8601 time the sync ran
class FrameworkBridge:
    """
    Framework Bridge - File to Registry Synchronization

    Maintains consistency between component files and JSON registries.
    Detects changes, updates registries automatically, and validates
    references.
    """

    def __init__(self, framework_root: Optional[Path] = None):
        """
        Initialize FrameworkBridge.

        Args:
            framework_root: Path to framework root (auto-detected if None)
        """
        if framework_root is None:
            # Auto-detect (2 levels up from scripts/core/)
            framework_root = Path(__file__).parent.parent.parent
        self.framework_root = Path(framework_root)
        self.registry_loader = RegistryLoader(self.framework_root)
        self.activator = ComponentActivator(self.framework_root)
        # Cache for file hashes (path -> hex digest)
        self.file_hashes: Dict[str, str] = {}
        logger.info(f"FrameworkBridge initialized for: {self.framework_root}")
def compute_file_hash(self, file_path: Path) -> str:
"""Compute MD5 hash of file contents"""
try:
content = file_path.read_bytes()
return hashlib.md5(content).hexdigest()
except Exception as e:
logger.error(f"Error computing hash for {file_path}: {e}")
return ""
def scan_component_files(self) -> Dict[str, List[Path]]:
"""
Scan filesystem for all component files
Returns:
Dictionary mapping component type to list of file paths
"""
components = {
'agent': [],
'skill': [],
'command': [],
'script': [],
'hook': [],
'prompt': []
}
# Scan agents
agents_dir = self.framework_root / "agents"
if agents_dir.exists():
components['agent'] = [
f for f in agents_dir.glob("*.md")
if f.name != "README.md"
]
# Scan skills
skills_dir = self.framework_root / "skills"
if skills_dir.exists():
for skill_dir in skills_dir.iterdir():
if skill_dir.is_dir():
skill_file = skill_dir / "SKILL.md"
if skill_file.exists():
components['skill'].append(skill_file)
# Scan commands
commands_dir = self.framework_root / "commands"
if commands_dir.exists():
components['command'] = [
f for f in commands_dir.glob("*.md")
if f.name != "README.md"
]
# Scan scripts
scripts_dir = self.framework_root / "scripts"
if scripts_dir.exists():
components['script'] = list(scripts_dir.rglob("*.py")) + list(scripts_dir.rglob("*.sh"))
# Scan hooks
hooks_dir = self.framework_root / "hooks"
if hooks_dir.exists():
components['hook'] = list(hooks_dir.glob("*.sh")) + list(hooks_dir.glob("*.py"))
# Scan prompts
prompts_dir = self.framework_root / "prompts"
if prompts_dir.exists():
components['prompt'] = list(prompts_dir.glob("*.md"))
logger.info(f"Scanned files: {sum(len(v) for v in components.values())} total")
return components
def load_registries(self) -> Dict[str, Set[str]]:
"""
Load all registries and extract component names
Returns:
Dictionary mapping component type to set of registered names
"""
# Load main registry
self.activator.load_all_registries()
registries = {
'agent': set(self.activator.agents.keys()),
'skill': set(self.activator.skills.keys()),
'command': set(self.activator.commands.keys()),
'script': set(self.activator.scripts.keys()),
'hook': set(self.activator.hooks.keys()),
'prompt': set(self.activator.prompts.keys())
}
logger.info(f"Loaded registries: {sum(len(v) for v in registries.values())} components")
return registries
def compare_files_vs_registries(self) -> Tuple[Dict, Dict, Dict]:
"""
Compare filesystem files against registries
Returns:
Tuple of (missing_in_registry, missing_in_files, mismatches)
"""
file_components = self.scan_component_files()
registry_components = self.load_registries()
missing_in_registry = {}
missing_in_files = {}
mismatches = {}
for comp_type in file_components.keys():
# Get file-based names
file_names = set()
for file_path in file_components[comp_type]:
if comp_type == 'skill':
# For skills, name is directory name
file_names.add(file_path.parent.name)
elif comp_type == 'script' or comp_type == 'hook':
# For scripts/hooks, name is filename
file_names.add(file_path.name)
else:
# For others, name is filename without extension
file_names.add(file_path.stem)
registry_names = registry_components.get(comp_type, set())
# Find missing in registry
missing = file_names - registry_names
if missing:
missing_in_registry[comp_type] = missing
# Find missing in files (orphaned registry entries)
orphaned = registry_names - file_names
if orphaned:
missing_in_files[comp_type] = orphaned
return missing_in_registry, missing_in_files, mismatches
def sync_registries_from_files(self) -> SyncResult:
"""
Synchronize registries from filesystem
Scans files, updates registries to match, and reports changes.
Returns:
SyncResult with sync statistics
"""
logger.info("Starting registry sync from files...")
result = SyncResult(
files_scanned=0,
registries_checked=0,
components_added=0,
components_updated=0,
components_removed=0,
errors=[],
warnings=[],
timestamp=datetime.now().isoformat()
)
try:
# Scan all components
components = self.registry_loader.load_all_components()
result.files_scanned = len(components)
# Export to registry
registry_path = self.framework_root / "config" / "framework-registry.json"
self.registry_loader.export_to_json(registry_path)
# Compare before/after
missing_in_reg, missing_in_files, _ = self.compare_files_vs_registries()
# Count changes
result.components_added = sum(len(v) for v in missing_in_reg.values())
result.components_removed = sum(len(v) for v in missing_in_files.values())
# Add warnings for orphaned entries
for comp_type, orphaned in missing_in_files.items():
for name in orphaned:
result.warnings.append(f"Orphaned {comp_type}: {name} (in registry but file not found)")
logger.info(f"Sync complete: {result.files_scanned} files scanned, "
f"{result.components_added} added, {result.components_removed} orphaned")
except Exception as e:
logger.error(f"Error during sync: {e}")
result.errors.append(str(e))
return result
def validate_consistency(self) -> Dict[str, any]:
"""
Validate consistency between files and registries
Returns:
Validation report dictionary
"""
logger.info("Validating framework consistency...")
report = {
"timestamp": datetime.now().isoformat(),
"status": "PASS",
"checks": {},
"errors": [],
"warnings": []
}
try:
# Check 1: Files vs Registries
missing_in_reg, missing_in_files, mismatches = self.compare_files_vs_registries()
report["checks"]["files_vs_registries"] = {
"status": "PASS" if not missing_in_reg and not missing_in_files else "WARN",
"missing_in_registry": {k: list(v) for k, v in missing_in_reg.items()},
"orphaned_in_registry": {k: list(v) for k, v in missing_in_files.items()}
}
# Check 2: Registry file exists
registry_path = self.framework_root / "config" / "framework-registry.json"
report["checks"]["registry_exists"] = {
"status": "PASS" if registry_path.exists() else "FAIL",
"path": str(registry_path),
"exists": registry_path.exists()
}
# Check 3: Load validation
try:
self.activator.load_all_registries()
stats = self.activator.get_statistics()
report["checks"]["registry_loadable"] = {
"status": "PASS",
"components_loaded": stats['total_loaded'],
"failed_loads": stats['failed_loads']
}
if stats['failed_loads'] > 0:
report["warnings"].append(f"{stats['failed_loads']} components failed to load")
except Exception as e:
report["checks"]["registry_loadable"] = {
"status": "FAIL",
"error": str(e)
}
report["errors"].append(f"Failed to load registries: {e}")
# Overall status
check_statuses = [check.get("status", "UNKNOWN") for check in report["checks"].values()]
if "FAIL" in check_statuses:
report["status"] = "FAIL"
elif "WARN" in check_statuses:
report["status"] = "WARN"
logger.info(f"Validation complete: {report['status']}")
except Exception as e:
logger.error(f"Validation error: {e}")
report["status"] = "ERROR"
report["errors"].append(str(e))
return report
def generate_sync_report(self, result: SyncResult, output_path: Path) -> None:
"""Generate sync report in Markdown format"""
lines = [
"# Framework Sync Report",
"",
f"**Generated:** {result.timestamp}",
"",
"## Summary",
"",
f"- **Files Scanned:** {result.files_scanned}",
f"- **Registries Checked:** {result.registries_checked}",
f"- **Components Added:** {result.components_added}",
f"- **Components Updated:** {result.components_updated}",
f"- **Components Removed:** {result.components_removed}",
"",
]
if result.errors:
lines.extend([
"## Errors",
"",
])
for error in result.errors:
lines.append(f"- ❌ {error}")
lines.append("")
if result.warnings:
lines.extend([
"## Warnings",
"",
])
for warning in result.warnings:
lines.append(f"- ⚠️ {warning}")
lines.append("")
lines.extend([
"## Status",
"",
f"{'✅ SYNC COMPLETE' if not result.errors else '❌ SYNC FAILED'}",
""
])
output_path.write_text('\n'.join(lines))
logger.info(f"Sync report saved to: {output_path}")
def main():
    """Main entry point for the framework bridge CLI.

    Returns:
        Process exit code: 0 on success, 1 when no action flag was given.
    """
    import argparse

    parser = argparse.ArgumentParser(description="CODITECT Framework Bridge")
    parser.add_argument('--sync', action='store_true', help='Sync registries from files')
    parser.add_argument('--validate', action='store_true', help='Validate consistency')
    parser.add_argument('--report', type=Path, help='Generate sync report')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    # Initialize bridge (framework root auto-detected)
    bridge = FrameworkBridge()

    if args.sync:
        print("=" * 60)
        print("FRAMEWORK SYNC")
        print("=" * 60)
        result = bridge.sync_registries_from_files()
        print("\n✅ Sync Complete:")
        print(f" Files Scanned: {result.files_scanned}")
        print(f" Components Added: {result.components_added}")
        print(f" Orphaned Entries: {result.components_removed}")
        if result.warnings:
            print(f"\n⚠️ Warnings: {len(result.warnings)}")
            # Show at most five warnings on the console; the full list
            # goes into the Markdown report.
            for warning in result.warnings[:5]:
                print(f" - {warning}")
        if args.report:
            bridge.generate_sync_report(result, args.report)

    if args.validate:
        print("\n" + "=" * 60)
        print("CONSISTENCY VALIDATION")
        print("=" * 60)
        report = bridge.validate_consistency()
        print(f"\n{'✅' if report['status'] == 'PASS' else '❌'} Status: {report['status']}")
        for check_name, check_data in report['checks'].items():
            status_icon = {"PASS": "✅", "WARN": "⚠️", "FAIL": "❌"}.get(check_data['status'], "❓")
            print(f" {status_icon} {check_name}: {check_data['status']}")
        if report['errors']:
            print(f"\n❌ Errors: {len(report['errors'])}")
            for error in report['errors']:
                print(f" - {error}")
        if report['warnings']:
            print(f"\n⚠️ Warnings: {len(report['warnings'])}")
            for warning in report['warnings']:
                print(f" - {warning}")
        # Save the machine-readable validation report next to the sync report.
        if args.report:
            report_path = args.report.with_suffix('.validation.json')
            with open(report_path, 'w') as f:
                json.dump(report, f, indent=2)
            print(f"\n📄 Validation report saved: {report_path}")

    if not args.sync and not args.validate:
        print("FrameworkBridge - Use --sync or --validate")
        return 1
    return 0
# BUG FIX: original used `if name == "main"`; the standard dunder guard is
# required so the CLI runs only when executed directly, not on import.
if __name__ == "__main__":
    sys.exit(main())