Skip to main content

#!/usr/bin/env python3
"""
H.1.6: Capability-Based Agent Discovery

Provides capability-based agent/component discovery by integrating:

  1. Component indexer SQLite database (FTS5 search)
  2. H.1.5 DiscoveryService (Redis/local backend)
  3. Frontmatter capability extraction

Usage:
    python3 scripts/core/capability_discovery.py "task description"
    python3 scripts/core/capability_discovery.py --capability "security"
    python3 scripts/core/capability_discovery.py --domain "backend"
    python3 scripts/core/capability_discovery.py --sync  # Sync to discovery service
"""

import argparse
import asyncio
import json
import re
import sqlite3
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# Add parent to path for imports: three levels up from this file is the
# directory that contains the ``scripts`` package imported below.
import sys

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

# The discovery service is optional. Record whether it could be imported so
# the sync entry point can report a clean error instead of failing at import.
try:
    from scripts.core.discovery_service import (
        DiscoveryService,
        Component,
        Capability,
        ComponentStatus,
    )
    HAS_DISCOVERY_SERVICE = True
except ImportError:
    HAS_DISCOVERY_SERVICE = False

# =============================================================================
# Configuration
# =============================================================================

# Directory holding this script, and the directory two levels above it.
SCRIPT_DIR = Path(__file__).parent
ROOT_DIR = SCRIPT_DIR.parent.parent

# ADR-114 & ADR-118: Use centralized path discovery
try:
    from paths import get_platform_db_path, PLATFORM_DB as _PLATFORM_DB
    PLATFORM_DB = _PLATFORM_DB  # Component data goes to platform.db (Tier 1)
except ImportError:
    # Fallback for backward compatibility: prefer the PROJECTS data directory
    # when it exists, otherwise use the per-user ~/.coditect location.
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        PLATFORM_DB = _user_data / "platform.db"
    else:
        PLATFORM_DB = Path.home() / ".coditect" / "context-storage" / "platform.db"

DB_PATH = PLATFORM_DB  # Backward compatibility alias

# Capability classification keywords
#
# Maps each action capability name to the query keywords that indicate it.
# A keyword may appear under several actions (e.g. "verify", "check").
CAPABILITY_KEYWORDS = {
    # Actions
    "review": ["review", "audit", "check", "validate", "verify", "assess", "inspect"],
    "create": ["create", "generate", "build", "write", "produce", "make", "compose"],
    "analyze": ["analyze", "examine", "investigate", "research", "study", "diagnose"],
    "deploy": ["deploy", "release", "ship", "publish", "launch", "rollout"],
    "test": ["test", "verify", "validate", "check", "assert", "qa", "quality"],
    "document": ["document", "describe", "explain", "annotate", "readme", "guide"],
    "optimize": ["optimize", "improve", "enhance", "tune", "speed", "performance"],
    "secure": ["secure", "protect", "harden", "encrypt", "authenticate", "security"],
    "orchestrate": ["orchestrate", "coordinate", "manage", "schedule", "workflow"],
    "transform": ["transform", "convert", "migrate", "translate", "refactor"],
    "debug": ["debug", "troubleshoot", "fix", "diagnose", "resolve", "error"],
    "monitor": ["monitor", "observe", "trace", "log", "metrics", "alert"],
    "integrate": ["integrate", "connect", "api", "webhook", "sync", "bridge"],
}

# Maps each domain name to the query keywords that indicate it.
DOMAIN_KEYWORDS = {
    "security": ["security", "auth", "encryption", "vulnerability", "owasp", "penetration"],
    "compliance": ["compliance", "hipaa", "gdpr", "soc2", "fda", "audit", "regulatory"],
    "performance": ["performance", "speed", "latency", "throughput", "memory", "benchmark"],
    "testing": ["test", "coverage", "unit", "integration", "e2e", "qa", "mock"],
    "documentation": ["document", "readme", "guide", "reference", "api-doc", "markdown"],
    "devops": ["deploy", "ci", "cd", "docker", "kubernetes", "infrastructure", "helm"],
    "database": ["database", "sql", "query", "schema", "migration", "postgres", "redis"],
    "frontend": ["frontend", "react", "vue", "css", "ui", "component", "typescript"],
    "backend": ["backend", "api", "server", "endpoint", "handler", "rest", "graphql"],
    "ai": ["ai", "llm", "model", "prompt", "agent", "ml", "embedding", "rag"],
    "architecture": ["architecture", "design", "pattern", "system", "diagram", "c4"],
    "code-quality": ["lint", "format", "style", "convention", "clean", "refactor"],
}

# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class CapabilityMatch:
    """One capability detected for a query or component, with its score.

    Attributes:
        capability: Name of the matched capability.
        capability_type: Kind of capability: action, domain, primary or tag.
        confidence: Score assigned to the match.
        source: Where the match came from (frontmatter, content, keyword).
    """

    capability: str
    capability_type: str
    confidence: float
    source: str

@dataclass
class AgentRecommendation:
    """A recommended component together with the evidence for recommending it.

    Attributes:
        component_id: Identifier of the component.
        name: Component name.
        component_type: Component type string (e.g. "agent").
        description: Component description text.
        confidence: Overall recommendation score.
        matched_capabilities: Capability matches that support the recommendation.
        path: Component path.
        llm_model: Model name recorded for the component, "" when unset.
        invocation_method: Invocation method recorded for the component,
            "" when unset.
    """

    component_id: str
    name: str
    component_type: str
    description: str
    confidence: float
    matched_capabilities: List[CapabilityMatch]
    path: str
    llm_model: str = ""
    invocation_method: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable dict view of this recommendation.

        Each matched capability is flattened to a plain dict whose ``type``
        key carries the match's ``capability_type``.
        """
        capability_dicts = []
        for match in self.matched_capabilities:
            capability_dicts.append({
                "capability": match.capability,
                "type": match.capability_type,
                "confidence": match.confidence,
                "source": match.source,
            })

        payload: Dict[str, Any] = {}
        payload["component_id"] = self.component_id
        payload["name"] = self.name
        payload["component_type"] = self.component_type
        payload["description"] = self.description
        payload["confidence"] = self.confidence
        payload["matched_capabilities"] = capability_dicts
        payload["path"] = self.path
        payload["llm_model"] = self.llm_model
        payload["invocation_method"] = self.invocation_method
        return payload

# =============================================================================
# Capability Extraction
# =============================================================================

def _keyword_matches(keyword: str, text: str) -> bool:
    """Return True when *keyword* occurs in *text* starting at a word boundary.

    Keywords of three characters or fewer ("ai", "ci", "ui", "log", ...) must
    match a whole word; as bare substrings they fire inside unrelated words
    such as "build", "specific" or "login". Longer keywords only have to
    begin a word, so inflected forms like "deploying" or "tests" still match.
    """
    pattern = r"\b" + re.escape(keyword)
    if len(keyword) <= 3:
        pattern += r"\b"
    return re.search(pattern, text) is not None


def _match_keyword_table(
    text: str,
    table: Dict[str, List[str]],
    capability_type: str,
) -> "List[CapabilityMatch]":
    """Return one CapabilityMatch per *table* entry that has a keyword in *text*."""
    found = []
    for name, keywords in table.items():
        for keyword in keywords:
            if _keyword_matches(keyword, text):
                found.append(CapabilityMatch(
                    capability=name,
                    capability_type=capability_type,
                    # The capability's own name is a stronger signal than a synonym.
                    confidence=0.9 if keyword == name else 0.7,
                    source="keyword",
                ))
                break  # one match per capability is enough
    return found


def extract_capabilities_from_query(query: str) -> List[CapabilityMatch]:
    """Extract capability indicators from a natural language query.

    The lower-cased query is checked against CAPABILITY_KEYWORDS (actions)
    and DOMAIN_KEYWORDS (domains). Keywords are matched at word boundaries
    rather than as raw substrings, so short keywords do not trigger on
    unrelated words.

    Args:
        query: Free-text task description.

    Returns:
        Action matches followed by domain matches, at most one per
        capability. Confidence is 0.9 when the matching keyword equals the
        capability name, otherwise 0.7.
    """
    query_lower = query.lower()
    return (
        _match_keyword_table(query_lower, CAPABILITY_KEYWORDS, "action")
        + _match_keyword_table(query_lower, DOMAIN_KEYWORDS, "domain")
    )

def extract_keywords_from_query(query: str) -> List[str]:
    """Return the distinct search terms of *query* in first-seen order.

    Terms are runs of three or more ASCII letters taken from the lower-cased
    query; common filler words are discarded.
    """
    noise_words = {
        "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
        "of", "with", "by", "from", "is", "are", "was", "were", "be", "been",
        "being", "have", "has", "had", "do", "does", "did", "will", "would",
        "could", "should", "may", "might", "must", "shall", "can", "this",
        "that", "these", "those", "it", "its", "you", "your", "we", "our",
        "me", "my", "i", "need", "want", "help", "please", "how"
    }

    ordered_terms = []
    already_seen = set()
    for token in re.findall(r'\b[a-z]{3,}\b', query.lower()):
        if token in noise_words or token in already_seen:
            continue
        already_seen.add(token)
        ordered_terms.append(token)

    return ordered_terms

# =============================================================================
# Database Queries
# =============================================================================

def get_db_connection() -> sqlite3.Connection:
    """Open the component database at DB_PATH.

    Returns:
        A connection whose rows are ``sqlite3.Row`` objects, so columns can
        be read by name.

    Raises:
        FileNotFoundError: If no file exists at DB_PATH.
    """
    if DB_PATH.exists():
        connection = sqlite3.connect(DB_PATH)
        connection.row_factory = sqlite3.Row
        return connection

    raise FileNotFoundError(f"Database not found at {DB_PATH}. Run component-indexer.py first.")

def search_components_fts(query: str, component_type: Optional[str] = None, limit: int = 10) -> List[Dict]:
    """Search components using FTS5 full-text search.

    The query is split on whitespace and the terms are OR-ed together. Each
    term is passed as a quoted FTS5 string, so punctuation (hyphens, colons,
    quotes) and words such as AND/OR/NOT are treated as text to search for
    instead of being parsed as query syntax.

    Args:
        query: Whitespace-separated search terms.
        component_type: When given, only components of this type are returned.
        limit: Maximum number of rows to return.

    Returns:
        Row dicts with id, name, type, description, path, llm_model,
        invocation_method and rank, ordered by rank. An empty or
        whitespace-only query yields an empty list.

    Raises:
        FileNotFoundError: If the component database is missing (only
            raised when there is at least one search term).
    """
    # Inside an FTS5 string an embedded double quote is written as "".
    terms = ['"' + term.replace('"', '""') + '"' for term in query.split()]
    if not terms:
        # Nothing to search for, and an empty MATCH expression is not a
        # valid FTS5 query, so skip the database round-trip.
        return []

    sql = """
        SELECT
            c.id, c.name, c.type, c.description, c.path,
            c.llm_model, c.invocation_method,
            cs.rank
        FROM component_search cs
        JOIN components c ON c.id = cs.id
        WHERE component_search MATCH ?
    """
    params = [" OR ".join(terms)]

    if component_type:
        sql += " AND c.type = ?"
        params.append(component_type)

    sql += " ORDER BY cs.rank LIMIT ?"
    params.append(limit)

    conn = get_db_connection()
    try:
        return [dict(row) for row in conn.execute(sql, params).fetchall()]
    finally:
        conn.close()

def search_by_capability(capability: str, capability_type: Optional[str] = None, limit: int = 10) -> List[Dict]:
    """Search components by specific capability.

    A component matches when one of its capability names contains
    *capability* as a substring. LIKE wildcards in the input (``%`` and
    ``_``) are escaped so they are matched literally.

    Args:
        capability: Text to look for inside capability names.
        capability_type: When given, only capabilities of this type count.
        limit: Maximum number of rows to return.

    Returns:
        Distinct row dicts with id, name, type, description, path,
        llm_model, invocation_method, capability and capability_type. A
        component appears once per matching capability.

    Raises:
        FileNotFoundError: If the component database is missing.
    """
    # Escape the escape character first, then the two LIKE wildcards.
    literal = (
        capability
        .replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )

    sql = """
        SELECT DISTINCT
            c.id, c.name, c.type, c.description, c.path,
            c.llm_model, c.invocation_method,
            cap.capability, cap.capability_type
        FROM components c
        JOIN capabilities cap ON cap.component_id = c.id
        WHERE cap.capability LIKE ? ESCAPE '\\'
    """
    params = [f"%{literal}%"]

    if capability_type:
        sql += " AND cap.capability_type = ?"
        params.append(capability_type)

    sql += " LIMIT ?"
    params.append(limit)

    conn = get_db_connection()
    try:
        return [dict(row) for row in conn.execute(sql, params).fetchall()]
    finally:
        conn.close()

def get_component_capabilities(component_id: str) -> List[Dict]:
    """Return every capability row recorded for *component_id*.

    Each row is a dict with ``capability`` and ``capability_type`` keys.
    """
    capability_sql = """
        SELECT capability, capability_type
        FROM capabilities
        WHERE component_id = ?
    """
    connection = get_db_connection()
    try:
        rows = connection.execute(capability_sql, [component_id]).fetchall()
        return list(map(dict, rows))
    finally:
        connection.close()

def get_all_agents(limit: int = 200) -> List[Dict]:
    """Return up to *limit* components whose type is 'agent'.

    Each row is a dict with id, name, type, description, path, llm_model
    and invocation_method.
    """
    agents_sql = """
        SELECT
            c.id, c.name, c.type, c.description, c.path,
            c.llm_model, c.invocation_method
        FROM components c
        WHERE c.type = 'agent'
        LIMIT ?
    """
    connection = get_db_connection()
    try:
        rows = connection.execute(agents_sql, [limit]).fetchall()
        return list(map(dict, rows))
    finally:
        connection.close()

def get_components_by_domain(domain: str, limit: int = 20) -> List[Dict]:
    """Return components that have a "domain" capability matching *domain*."""
    domain_rows = search_by_capability(domain, limit=limit, capability_type="domain")
    return domain_rows

def get_components_by_action(action: str, limit: int = 20) -> List[Dict]:
    """Return components that have an "action" capability matching *action*."""
    action_rows = search_by_capability(action, limit=limit, capability_type="action")
    return action_rows

# =============================================================================
# Recommendation Engine
# =============================================================================

def _recommendation_from_row(result, confidence, matched_capabilities) -> "AgentRecommendation":
    """Build an AgentRecommendation from a component row dict.

    SQL NULLs arrive as None, and ``dict.get(key, "")`` only falls back when
    the key is absent. Optional text columns are therefore normalised with
    ``or ""`` so the recommendation's ``str`` fields never hold None.
    """
    return AgentRecommendation(
        component_id=result["id"],
        name=result["name"],
        component_type=result["type"],
        description=result.get("description") or "",
        confidence=confidence,
        matched_capabilities=matched_capabilities,
        path=result.get("path") or "",
        llm_model=result.get("llm_model") or "",
        invocation_method=result.get("invocation_method") or "",
    )


def find_agents_for_task(task_description: str, limit: int = 5) -> List[AgentRecommendation]:
    """
    Find the best agents for a given task description.

    Uses multi-signal matching:
    1. FTS5 full-text search
    2. Capability keyword matching
    3. Domain matching

    Args:
        task_description: Natural language description of the task.
        limit: Maximum number of recommendations to return.

    Returns:
        Up to *limit* recommendations sorted by descending confidence. An
        agent found by full-text search is not added again by the capability
        search.

    Raises:
        FileNotFoundError: If the component database is missing.
    """
    recommendations = []
    seen_ids = set()

    # Extract capabilities and searchable keywords from the query
    capability_matches = extract_capabilities_from_query(task_description)
    search_query = " ".join(extract_keywords_from_query(task_description))

    # Signal 1: FTS search (over-fetch so ranking has candidates to choose from)
    if search_query:
        fts_results = search_components_fts(search_query, component_type="agent", limit=limit * 2)
        for result in fts_results:
            if result["id"] in seen_ids:
                continue
            seen_ids.add(result["id"])

            # Lower-cased capability names of this component (NULL-safe)
            component_caps = [
                (cap["capability"] or "").lower()
                for cap in get_component_capabilities(result["id"])
            ]

            # Query capabilities that appear inside any component capability
            matched_caps = [
                cap_match
                for cap_match in capability_matches
                if any(cap_match.capability in cap_name for cap_name in component_caps)
            ]

            # Confidence from FTS rank plus a bonus per matched capability.
            # Fall back to a neutral rank when the column is missing or NULL.
            rank = result.get("rank")
            if rank is None:
                rank = -5
            base_confidence = min(1.0, -rank / 10 + 0.5)
            confidence = min(1.0, base_confidence + len(matched_caps) * 0.1)

            recommendations.append(
                _recommendation_from_row(result, confidence, matched_caps)
            )

    # Signal 2: Search by extracted capabilities
    for cap_match in capability_matches:
        for result in search_by_capability(cap_match.capability, limit=limit):
            if result["id"] in seen_ids or result["type"] != "agent":
                continue
            seen_ids.add(result["id"])

            recommendations.append(_recommendation_from_row(
                result,
                cap_match.confidence * 0.8,  # Slightly lower than FTS
                [cap_match],
            ))

    # Sort by confidence and limit
    recommendations.sort(key=lambda r: r.confidence, reverse=True)
    return recommendations[:limit]

# =============================================================================
# Discovery Service Integration
# =============================================================================

async def sync_to_discovery_service(
    redis_url: Optional[str] = None,
    force_local: bool = False
) -> Dict[str, Any]:
    """
    Register every indexed agent with the discovery service.

    This bridges H.1.5 (DiscoveryService) with the component indexer database.

    Args:
        redis_url: Passed through to DiscoveryService.
        force_local: Passed through to DiscoveryService.

    Returns:
        ``{"success": False, "error": ...}`` when the discovery service
        module could not be imported. Otherwise a dict with ``success``,
        ``registered`` (count), ``errors`` (first ten messages),
        ``total_errors`` and ``discovery_stats``.

    Raises:
        FileNotFoundError: If the component database is missing.
    """
    if not HAS_DISCOVERY_SERVICE:
        return {
            "success": False,
            "error": "discovery_service module not found"
        }

    service = DiscoveryService(redis_url=redis_url, force_local=force_local)

    conn = get_db_connection()
    registered_count = 0
    failures = []

    agents_sql = """
        SELECT
            c.id, c.name, c.type, c.description, c.path,
            c.llm_model, c.status, c.category
        FROM components c
        WHERE c.type = 'agent'
    """
    capabilities_sql = """
        SELECT capability, capability_type
        FROM capabilities
        WHERE component_id = ?
    """

    try:
        # Walk all agents in the indexer database
        for agent in conn.execute(agents_sql):
            # A failure on one agent is recorded and the loop moves on.
            try:
                capabilities = [
                    Capability(
                        name=cap["capability"],
                        description=f"{cap['capability_type']} capability",
                        tags=[cap["capability_type"]],
                    )
                    for cap in conn.execute(capabilities_sql, [agent["id"]])
                ]

                component = Component(
                    id=agent["id"],
                    name=agent["name"],
                    component_type=agent["type"],
                    capabilities=capabilities,
                    status=ComponentStatus.AVAILABLE,
                    metadata={
                        "path": agent["path"],
                        "llm_model": agent["llm_model"] or "",
                        "category": agent["category"] or "",
                    }
                )

                if await service.register(component):
                    registered_count += 1
                else:
                    failures.append(f"Failed to register {agent['id']}")

            except Exception as exc:
                failures.append(f"Error processing {agent['id']}: {str(exc)}")

        stats = await service.get_stats()

        return {
            "success": True,
            "registered": registered_count,
            "errors": failures[:10],  # Limit error output
            "total_errors": len(failures),
            "discovery_stats": stats,
        }

    finally:
        conn.close()

# =============================================================================
# CLI Interface
# =============================================================================

def print_recommendations(recommendations: List[AgentRecommendation], json_output: bool = False):
    """Write recommendations to stdout as JSON or as a text report.

    Args:
        recommendations: Recommendations to display, in display order.
        json_output: When true, print a JSON array of ``to_dict()`` results
            and nothing else.
    """
    if json_output:
        payload = [rec.to_dict() for rec in recommendations]
        print(json.dumps(payload, indent=2))
        return

    if not recommendations:
        print("\nNo matching agents found.")
        return

    rule = "=" * 70
    print()
    print(rule)
    print("AGENT RECOMMENDATIONS")
    print(rule)
    print()

    for position, rec in enumerate(recommendations, start=1):
        # 20-character bar: one "=" per 5% of confidence
        bar = "=" * int(rec.confidence * 20)
        percent = f"{rec.confidence * 100:.0f}%"

        print(f"{position}. {rec.name}")
        print(f" ID: {rec.component_id}")
        print(f" Confidence: [{bar:<20}] {percent}")

        if rec.description:
            # Long descriptions are cut at 100 characters
            if len(rec.description) > 100:
                summary = rec.description[:100] + "..."
            else:
                summary = rec.description
            print(f" Description: {summary}")

        if rec.matched_capabilities:
            # Only the first three matches are shown
            shown = rec.matched_capabilities[:3]
            caps = ", ".join(f"{c.capability}({c.capability_type})" for c in shown)
            print(f" Matched: {caps}")

        if rec.llm_model:
            print(f" Model: {rec.llm_model}")

        # Show invocation
        print(f" Invoke: /agent {rec.name} \"task\"")
        print()

def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the discovery CLI."""
    parser = argparse.ArgumentParser(
        description="H.1.6: Capability-Based Agent Discovery",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "query",
        nargs="?",
        help="Natural language task description to find agents for"
    )
    parser.add_argument(
        "--capability", "-c",
        help="Search by specific capability"
    )
    parser.add_argument(
        "--domain", "-d",
        help="Search by domain (security, backend, frontend, etc.)"
    )
    parser.add_argument(
        "--action", "-a",
        help="Search by action (review, create, deploy, etc.)"
    )
    parser.add_argument(
        "--limit", "-l",
        type=int,
        default=5,
        help="Maximum number of recommendations (default: 5)"
    )
    parser.add_argument(
        "--json", "-j",
        action="store_true",
        help="Output as JSON"
    )
    parser.add_argument(
        "--sync",
        action="store_true",
        help="Sync indexed components to discovery service"
    )
    parser.add_argument(
        "--redis",
        help="Redis URL for discovery service (optional)"
    )
    parser.add_argument(
        "--local",
        action="store_true",
        help="Force local discovery backend"
    )
    parser.add_argument(
        "--stats",
        action="store_true",
        help="Show database statistics"
    )
    return parser


def _run_sync(args: argparse.Namespace) -> None:
    """Sync components to the discovery service and report the outcome."""
    # In JSON mode stdout must hold only the JSON document, so the progress
    # banner is limited to the human-readable mode.
    if not args.json:
        print("Syncing to discovery service...")

    result = asyncio.run(sync_to_discovery_service(
        redis_url=args.redis,
        force_local=args.local
    ))

    if args.json:
        print(json.dumps(result, indent=2))
        return

    if result["success"]:
        print(f" Registered: {result['registered']} agents")
        if result["total_errors"] > 0:
            print(f" Errors: {result['total_errors']}")
        print(f" Backend: {result['discovery_stats'].get('backend', 'unknown')}")
    else:
        print(f" Error: {result.get('error', 'Unknown error')}")


def _print_stats(args: argparse.Namespace) -> None:
    """Print component and capability counts from the database."""
    conn = get_db_connection()
    try:
        stats = {}

        # Component counts by type
        cursor = conn.execute("""
            SELECT type, COUNT(*) as count
            FROM components
            GROUP BY type
            ORDER BY count DESC
        """)
        stats["components_by_type"] = {row["type"]: row["count"] for row in cursor}

        # Capability counts
        cursor = conn.execute("""
            SELECT capability_type, COUNT(*) as count
            FROM capabilities
            GROUP BY capability_type
        """)
        stats["capabilities_by_type"] = {row["capability_type"]: row["count"] for row in cursor}

        # Totals
        cursor = conn.execute("SELECT COUNT(*) FROM components")
        stats["total_components"] = cursor.fetchone()[0]

        cursor = conn.execute("SELECT COUNT(*) FROM capabilities")
        stats["total_capabilities"] = cursor.fetchone()[0]
    finally:
        conn.close()

    if args.json:
        print(json.dumps(stats, indent=2))
        return

    print()
    print("=" * 50)
    print("CAPABILITY DISCOVERY STATS")
    print("=" * 50)
    print()
    print("Components by Type:")
    for comp_type, count in stats["components_by_type"].items():
        print(f" {comp_type}: {count}")
    print()
    print("Capabilities by Type:")
    for cap_type, count in stats["capabilities_by_type"].items():
        print(f" {cap_type}: {count}")
    print()
    print(f"Total Components: {stats['total_components']}")
    print(f"Total Capabilities: {stats['total_capabilities']}")


def _agents_from_rows(rows, capability, capability_type, source) -> "List[AgentRecommendation]":
    """Turn capability-search rows into recommendations for agents only.

    Args:
        rows: Row dicts returned by a capability search.
        capability: Capability text the user filtered on.
        capability_type: Type to record on the match, or None to use each
            row's own ``capability_type`` ("unknown" when it has none).
        source: Label recorded as the match source.
    """
    recommendations = []
    for row in rows:
        if row["type"] != "agent":
            continue
        match_type = capability_type
        if match_type is None:
            match_type = row.get("capability_type") or "unknown"
        recommendations.append(AgentRecommendation(
            component_id=row["id"],
            name=row["name"],
            component_type=row["type"],
            # SQL NULLs arrive as None; keep the str fields as strings.
            description=row.get("description") or "",
            confidence=0.8,
            matched_capabilities=[CapabilityMatch(
                capability=capability,
                capability_type=match_type,
                confidence=0.8,
                source=source
            )],
            path=row.get("path") or "",
            llm_model=row.get("llm_model") or "",
            invocation_method=row.get("invocation_method") or "",
        ))
    return recommendations


def main():
    """Command-line entry point.

    Modes, in priority order: ``--sync``, ``--stats``, ``--domain``,
    ``--action``, ``--capability``, then a free-text query. With none of
    these the help text is printed. A missing component database ends the
    program with the error message on stderr and exit status 1.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    try:
        # Handle sync operation
        if args.sync:
            _run_sync(args)
            return

        # Handle stats
        if args.stats:
            _print_stats(args)
            return

        # Handle searches
        if args.domain:
            recommendations = _agents_from_rows(
                get_components_by_domain(args.domain, limit=args.limit),
                args.domain, "domain", "domain_filter",
            )
        elif args.action:
            recommendations = _agents_from_rows(
                get_components_by_action(args.action, limit=args.limit),
                args.action, "action", "action_filter",
            )
        elif args.capability:
            recommendations = _agents_from_rows(
                search_by_capability(args.capability, limit=args.limit),
                args.capability, None, "capability_filter",
            )
        elif args.query:
            recommendations = find_agents_for_task(args.query, limit=args.limit)
        else:
            parser.print_help()
            return

        print_recommendations(recommendations, json_output=args.json)

    except FileNotFoundError as exc:
        # The database helper's message already tells the user what to run.
        sys.exit(f"Error: {exc}")

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()