Skip to main content

scripts-component-discover

#!/usr/bin/env python3 """​

title: "Database path" component_type: script version: "1.0.0" audience: contributor status: stable summary: "CODITECT Component Discovery - Self-Awareness Query Interface" keywords: ['component', 'database', 'discover', 'review', 'security'] tokens: ~500 created: 2025-12-22 updated: 2025-12-22 script_name: "component-discover.py" language: python executable: true usage: "python3 scripts/component-discover.py [options]" python_version: "3.10+" dependencies: [] modifies_files: false network_access: false requires_auth: false​

CODITECT Component Discovery - Self-Awareness Query Interface

Enables natural language discovery of CODITECT components using the capability index database (component-index.db).

Usage:
    python3 scripts/component-discover.py "security code review"
    python3 scripts/component-discover.py --type agent "orchestration"
    python3 scripts/component-discover.py --compare council-orchestrator code-reviewer
    python3 scripts/component-discover.py --relationships council-orchestrator
    python3 scripts/component-discover.py --stats

Author: CODITECT Core Team
Version: 1.0.0
"""

import argparse import json import os import sqlite3 import sys from dataclasses import dataclass from pathlib import Path from typing import Dict, List, Optional, Tuple

# Database path
# SCRIPT_DIR is the directory holding this script; PROJECT_ROOT is its parent.
SCRIPT_DIR = Path(__file__).parent
PROJECT_ROOT = SCRIPT_DIR.parent

# ADR-114 & ADR-118: Use centralized path discovery
# Make the sibling "core" directory importable so `paths` can be resolved.
sys.path.insert(0, str(SCRIPT_DIR / "core"))
try:
    from paths import get_platform_db_path, PLATFORM_DB
    DB_PATH = PLATFORM_DB  # Component data goes to platform.db (Tier 1)
except ImportError:
    # Fallback for backward compatibility: prefer the per-user data directory
    # when it exists, otherwise use the in-repo context-storage directory.
    _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _user_data.exists():
        DB_PATH = _user_data / "platform.db"
    else:
        DB_PATH = PROJECT_ROOT / "context-storage" / "platform.db"

@dataclass
class Component:
    """One component record as produced by the search and lookup helpers.

    Field order is part of the interface: instances may be built positionally.
    """

    # Values taken from the same-named columns of the `components` table.
    id: str
    name: str
    type: str
    description: str
    # The lookup helpers substitute defaults for the fields below when the
    # stored value is empty (0.5, "production", "", "medium" respectively).
    confidence: float
    maturity: str
    category: str
    complexity: str

@dataclass
class Relationship:
    """One edge between the queried component and another component.

    Field order is part of the interface: instances may be built positionally.
    """

    # The component at the other end of the edge. For "invoked_by" entries the
    # lookup helper stores the *source* component in these target_* fields.
    target_id: str
    target_name: str
    target_type: str
    # Relationship label, e.g. "invokes", "alternative", "complement", "invoked_by".
    relationship_type: str
    # Free-text notes; the lookup helper stores "" when none are recorded.
    notes: str

def get_db_connection() -> sqlite3.Connection:
    """Open a connection to the component index database.

    Returns:
        A connection to ``DB_PATH`` whose rows are ``sqlite3.Row`` objects,
        so columns can be read by name.

    Raises:
        SystemExit: With status 1 when the database file does not exist.
    """
    if not DB_PATH.exists():
        # Diagnostics go to stderr so stdout stays clean for --json consumers.
        print(f"Error: Database not found at {DB_PATH}", file=sys.stderr)
        print("Run: python3 scripts/component-indexer.py", file=sys.stderr)
        sys.exit(1)

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn

def search_components(query: str, component_type: Optional[str] = None, limit: int = 10) -> List[Component]:
    """Search components using FTS5, falling back to a LIKE scan.

    Args:
        query: Free-text search string.
        component_type: When given, only components of this type are returned.
        limit: Maximum number of rows to return.

    Returns:
        Matching components. FTS results are ordered by ascending ``bm25``
        score (FTS5 reports better matches as smaller values); fallback
        results are ordered by descending confidence.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        # Build FTS query
        fts_query = query.replace('"', '""')

        sql = """
            SELECT
                c.id, c.name, c.type, c.description,
                c.confidence, c.maturity, c.category, c.complexity,
                bm25(component_search) as score
            FROM component_search
            JOIN components c ON component_search.id = c.id
            WHERE component_search MATCH ?
        """

        params = [fts_query]

        if component_type:
            sql += " AND c.type = ?"
            params.append(component_type)

        sql += " ORDER BY score LIMIT ?"
        params.append(limit)

        try:
            cursor.execute(sql, params)
            rows = cursor.fetchall()
        except sqlite3.OperationalError:
            # Fallback to LIKE search if FTS fails (e.g. the query is not
            # valid FTS5 syntax or the FTS table is unavailable).
            sql = """
                SELECT
                    c.id, c.name, c.type, c.description,
                    c.confidence, c.maturity, c.category, c.complexity,
                    0 as score
                FROM components c
                WHERE (c.name LIKE ? OR c.description LIKE ? OR c.id LIKE ?)
            """
            like_query = f"%{query}%"
            params = [like_query, like_query, like_query]

            if component_type:
                sql += " AND c.type = ?"
                params.append(component_type)

            sql += " ORDER BY c.confidence DESC LIMIT ?"
            params.append(limit)

            cursor.execute(sql, params)
            rows = cursor.fetchall()
    finally:
        # Always release the connection, including when the fallback fails.
        conn.close()

    # NOTE(review): `or` replaces any falsy stored value, so a stored
    # confidence of 0.0 is reported as 0.5 — confirm that is intended.
    return [Component(
        id=row['id'],
        name=row['name'],
        type=row['type'],
        description=row['description'] or "",
        confidence=row['confidence'] or 0.5,
        maturity=row['maturity'] or "production",
        category=row['category'] or "",
        complexity=row['complexity'] or "medium"
    ) for row in rows]

def get_component(component_id: str) -> Optional[Component]:
    """Get a single component by ID.

    Args:
        component_id: Value to match against ``components.id``.

    Returns:
        The matching component, or ``None`` when no row has that id.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, name, type, description, confidence, maturity, category, complexity
            FROM components WHERE id = ?
        """, (component_id,))
        row = cursor.fetchone()
    finally:
        # Close the connection even if the query raises.
        conn.close()

    if row:
        return Component(
            id=row['id'],
            name=row['name'],
            type=row['type'],
            description=row['description'] or "",
            confidence=row['confidence'] or 0.5,
            maturity=row['maturity'] or "production",
            category=row['category'] or "",
            complexity=row['complexity'] or "medium"
        )
    return None

def get_triggers(component_id: str) -> Dict[str, List[str]]:
    """Get triggers for a component.

    Args:
        component_id: Value to match against ``triggers.component_id``.

    Returns:
        A dict with the keys ``"use_when"``, ``"avoid_when"`` and
        ``"keyword"``, each mapping to the trigger descriptions of that type.
        Rows with any other trigger type are ignored.
    """
    triggers = {"use_when": [], "avoid_when": [], "keyword": []}

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT trigger_type, description
            FROM triggers WHERE component_id = ?
        """, (component_id,))

        for row in cursor.fetchall():
            trigger_type = row['trigger_type']
            if trigger_type in triggers:
                triggers[trigger_type].append(row['description'])
    finally:
        # Close the connection even if the query raises.
        conn.close()

    return triggers

def get_capabilities(component_id: str) -> Dict[str, List[str]]:
    """Get capabilities for a component.

    Args:
        component_id: Value to match against ``capabilities.component_id``.

    Returns:
        A dict with the keys ``"primary"``, ``"tag"``, ``"domain"`` and
        ``"action"``, each mapping to the capabilities of that type. Rows
        with any other capability type are ignored.
    """
    caps = {"primary": [], "tag": [], "domain": [], "action": []}

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT capability_type, capability
            FROM capabilities WHERE component_id = ?
        """, (component_id,))

        for row in cursor.fetchall():
            cap_type = row['capability_type']
            if cap_type in caps:
                caps[cap_type].append(row['capability'])
    finally:
        # Close the connection even if the query raises.
        conn.close()

    return caps

def get_relationships(component_id: str) -> Dict[str, List[Relationship]]:
    """Get relationships for a component.

    Args:
        component_id: ID of the component whose edges are wanted.

    Returns:
        A dict keyed by relationship type. ``"invokes"``, ``"alternative"``,
        ``"complement"`` and ``"invoked_by"`` are always present; any other
        type found on an outgoing edge gets its own key. ``"invoked_by"``
        holds the components that have an ``invokes`` edge pointing here.
    """
    rels = {"invokes": [], "alternative": [], "complement": []}

    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        # Outgoing relationships. LEFT JOIN keeps edges whose target is not
        # in the components table; those fall back to the raw id / "unknown".
        cursor.execute("""
            SELECT r.target_id, c.name, c.type, r.relationship_type, r.notes
            FROM relationships r
            LEFT JOIN components c ON r.target_id = c.id
            WHERE r.source_id = ?
        """, (component_id,))

        for row in cursor.fetchall():
            rel_type = row['relationship_type']
            rels.setdefault(rel_type, []).append(Relationship(
                target_id=row['target_id'],
                target_name=row['name'] or row['target_id'],
                target_type=row['type'] or "unknown",
                relationship_type=rel_type,
                notes=row['notes'] or ""
            ))

        # Incoming relationships (invoked_by)
        cursor.execute("""
            SELECT r.source_id, c.name, c.type, r.relationship_type, r.notes
            FROM relationships r
            LEFT JOIN components c ON r.source_id = c.id
            WHERE r.target_id = ? AND r.relationship_type = 'invokes'
        """, (component_id,))

        rels["invoked_by"] = []
        for row in cursor.fetchall():
            rels["invoked_by"].append(Relationship(
                target_id=row['source_id'],
                target_name=row['name'] or row['source_id'],
                target_type=row['type'] or "unknown",
                relationship_type="invoked_by",
                notes=row['notes'] or ""
            ))
    finally:
        # Close the connection even if one of the queries raises.
        conn.close()

    return rels

def get_stats() -> Dict:
    """Get database statistics.

    Returns:
        A dict with the keys ``by_type`` (component count per type),
        ``total_components``, ``total_capabilities``, ``total_triggers``,
        ``total_relationships``, ``top_domains`` (up to five
        ``(capability, count)`` pairs) and ``db_size_mb`` (file size of
        ``DB_PATH`` in MiB, rounded to two decimals).
    """
    stats = {}

    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        # Component counts by type
        cursor.execute("""
            SELECT type, COUNT(*) as count FROM components GROUP BY type ORDER BY count DESC
        """)
        stats["by_type"] = {row['type']: row['count'] for row in cursor.fetchall()}
        stats["total_components"] = sum(stats["by_type"].values())

        # Capability count
        cursor.execute("SELECT COUNT(*) as count FROM capabilities")
        stats["total_capabilities"] = cursor.fetchone()['count']

        # Trigger count
        cursor.execute("SELECT COUNT(*) as count FROM triggers")
        stats["total_triggers"] = cursor.fetchone()['count']

        # Relationship count
        cursor.execute("SELECT COUNT(*) as count FROM relationships")
        stats["total_relationships"] = cursor.fetchone()['count']

        # Top domains
        cursor.execute("""
            SELECT capability, COUNT(*) as count
            FROM capabilities
            WHERE capability_type = 'domain'
            GROUP BY capability
            ORDER BY count DESC
            LIMIT 5
        """)
        stats["top_domains"] = [(row['capability'], row['count']) for row in cursor.fetchall()]

        # Database size
        stats["db_size_mb"] = round(os.path.getsize(DB_PATH) / (1024 * 1024), 2)
    finally:
        # Close the connection even if a query or the size lookup raises.
        conn.close()

    return stats

def print_search_results(components: List[Component], query: str):
    """Print search results.

    Args:
        components: Search hits to display, in display order.
        query: The search string, echoed in the heading.
    """
    if not components:
        # Single-quoted f-string so the literal double quotes around the
        # query need no escaping.
        print(f'\nNo components found for "{query}"')
        return

    print(f"\nFound {len(components)} components for \"{query}\":\n")

    for i, comp in enumerate(components, 1):
        print(f"{i}. {comp.id} ({comp.type}) - confidence: {comp.confidence:.2f}")
        if comp.description:
            # Keep each hit to one line: cut long descriptions at 80 chars.
            desc = comp.description[:80] + "..." if len(comp.description) > 80 else comp.description
            print(f" {desc}")

        # Show at most two triggers of each kind per hit.
        triggers = get_triggers(comp.id)
        if triggers["use_when"]:
            print(f" Use when: {', '.join(triggers['use_when'][:2])}")
        if triggers["avoid_when"]:
            print(f" Avoid when: {', '.join(triggers['avoid_when'][:2])}")
        print()

def print_component_details(component_id: str, verbose: bool = False):
    """Print a detail sheet for one component.

    Args:
        component_id: ID of the component to display.
        verbose: When true, also print capabilities and relationships.
    """
    comp = get_component(component_id)
    if not comp:
        print(f"\nComponent not found: {component_id}")
        return

    def bullet_section(heading, entries):
        # Print a titled bullet list; print nothing for an empty list.
        if not entries:
            return
        print(f"\n{heading}:")
        for entry in entries:
            print(f" - {entry}")

    rule = "=" * 60
    print(f"\n{rule}")
    print(f"{comp.name} ({comp.type})")
    print(rule)
    for label, value in (
        ("ID", comp.id),
        ("Type", comp.type),
        ("Category", comp.category),
        ("Complexity", comp.complexity),
        ("Maturity", comp.maturity),
    ):
        print(f"{label}: {value}")
    print(f"Confidence: {comp.confidence:.2f}")

    if comp.description:
        print(f"\nDescription:\n {comp.description}")

    # Triggers
    triggers = get_triggers(comp.id)
    bullet_section("Use When", triggers["use_when"])
    bullet_section("Avoid When", triggers["avoid_when"])

    if verbose:
        # Capabilities
        caps = get_capabilities(comp.id)
        bullet_section("Primary Capabilities", caps["primary"])
        if caps["tag"]:
            print("\nTags: " + ", ".join(caps["tag"]))
        if caps["domain"]:
            print("Domains: " + ", ".join(caps["domain"]))
        if caps["action"]:
            print("Actions: " + ", ".join(caps["action"]))

        # Relationships
        rels = get_relationships(comp.id)

        outgoing = rels["invokes"]
        if outgoing:
            print(f"\nInvokes ({len(outgoing)}):")
            # Only the first five are listed; the rest are summarised.
            for rel in outgoing[:5]:
                note = " - " + rel.notes if rel.notes else ""
                print(f" - {rel.target_id} ({rel.target_type}){note}")
            hidden = len(outgoing) - 5
            if hidden > 0:
                print(f" ... and {hidden} more")

        incoming = rels["invoked_by"]
        if incoming:
            print(f"\nInvoked By ({len(incoming)}):")
            for rel in incoming[:5]:
                print(f" - {rel.target_id} ({rel.target_type})")

        alternatives = rels["alternative"]
        if alternatives:
            print("\nAlternatives:")
            for rel in alternatives:
                note = " - " + rel.notes if rel.notes else ""
                print(f" - {rel.target_id}{note}")

    print()

def print_comparison(id_a: str, id_b: str):
    """Print a two-column comparison table and a short recommendation.

    Args:
        id_a: ID of the component shown in the left column.
        id_b: ID of the component shown in the right column.
    """
    comp_a = get_component(id_a)
    comp_b = get_component(id_b)

    # Report the first missing component (left one first) and stop.
    for wanted, found in ((id_a, comp_a), (id_b, comp_b)):
        if not found:
            print(f"\nComponent not found: {wanted}")
            return

    triggers_a = get_triggers(id_a)
    triggers_b = get_triggers(id_b)
    rels_a = get_relationships(id_a)
    rels_b = get_relationships(id_b)

    def table_row(label, left, right):
        # One table line with fixed-width, left-aligned cells.
        print(f"| {label:<15} | {left:<20} | {right:<20} |")

    def joined(entries, count):
        # First `count` entries as a comma list, or "-" when there are none.
        return ", ".join(entries[:count]) if entries else "-"

    rule = "=" * 60
    print(f"\n{rule}")
    print(f"Comparison: {id_a} vs {id_b}")
    print(rule)
    print()
    table_row("Aspect", id_a, id_b)
    print("|" + "-" * 17 + "|" + "-" * 22 + "|" + "-" * 22 + "|")
    table_row("Type", comp_a.type, comp_b.type)
    table_row("Complexity", comp_a.complexity, comp_b.complexity)
    print(f"| {'Confidence':<15} | {comp_a.confidence:<20.2f} | {comp_b.confidence:<20.2f} |")
    table_row("Maturity", comp_a.maturity, comp_b.maturity)

    # Trigger cells are clipped to the 20-character column width.
    table_row(
        "Use When",
        joined(triggers_a["use_when"], 2)[:20],
        joined(triggers_b["use_when"], 2)[:20],
    )
    table_row(
        "Avoid When",
        joined(triggers_a["avoid_when"], 1)[:20],
        joined(triggers_b["avoid_when"], 1)[:20],
    )

    count_a = str(len(rels_a.get("invokes", [])))
    count_b = str(len(rels_b.get("invokes", [])))
    table_row("Invokes", count_a + " components", count_b + " components")

    print()
    print("Recommendation:")
    # The strictly more confident component is recommended first; on a tie
    # the right-hand component takes that slot.
    if comp_a.confidence > comp_b.confidence:
        preferred, other, other_uses = id_a, id_b, triggers_b["use_when"]
    else:
        preferred, other, other_uses = id_b, id_a, triggers_a["use_when"]
    other_reason = ", ".join(other_uses[:2]) if other_uses else "simpler cases"
    print(f" - Use {preferred} for: higher confidence scenarios")
    print(f" - Use {other} for: {other_reason}")
    print()

def print_relationships(component_id: str):
    """Print every relationship group recorded for a component.

    Args:
        component_id: ID of the component whose relationships are listed.
    """
    if get_component(component_id) is None:
        print(f"\nComponent not found: {component_id}")
        return

    rels = get_relationships(component_id)

    rule = "=" * 60
    print(f"\n{rule}")
    print(f"Relationships for {component_id}")
    print(rule)

    # (dict key, heading, show target type?, show notes?) in display order.
    layout = (
        ("invokes", "Invokes", True, True),
        ("invoked_by", "Invoked By", True, False),
        ("alternative", "Alternatives", False, True),
        ("complement", "Complements", False, True),
    )
    for key, heading, show_type, show_notes in layout:
        group = rels[key]
        if not group:
            continue
        print(f"\n{heading} ({len(group)}):")
        for rel in group:
            line = f" - {rel.target_id}"
            if show_type:
                line += f" ({rel.target_type})"
            if show_notes and rel.notes:
                line += " - " + rel.notes
            print(line)

    # Counts every group in the dict, not only the four printed above.
    if sum(len(group) for group in rels.values()) == 0:
        print("\nNo relationships found.")
    print()

def print_stats():
    """Print a human-readable summary of the component index."""
    stats = get_stats()
    total = stats['total_components']

    def per_component(count):
        # Average per component; 0 when the index holds no components.
        return count / total if total else 0

    banner = "=" * 50
    print("\n" + banner)
    print("CODITECT Component Index Statistics")
    print(banner)

    print("\nComponents by Type:")
    for comp_type, count in stats["by_type"].items():
        label = comp_type + ':'
        print(f" {label:<12} {count:>4}")
    print(" " + "-" * 16)
    print(f" {'Total:':<12} {total:>4}")

    capabilities = stats['total_capabilities']
    triggers = stats['total_triggers']
    relationships = stats['total_relationships']

    print(f"\nCapabilities: {capabilities:,} (avg {per_component(capabilities):.0f} per component)")
    print(f"Triggers: {triggers:,} (avg {per_component(triggers):.0f} per component)")
    print(f"Relationships:{relationships:,} (avg {per_component(relationships):.0f} per component)")

    top_domains = stats["top_domains"]
    if top_domains:
        print("\nTop Domains:")
        for rank, (domain, count) in enumerate(top_domains, 1):
            print(f" {rank}. {domain} ({count})")

    print(f"\nDatabase: {DB_PATH}")
    print(f"Size: {stats['db_size_mb']} MB")
    print()

def list_components(component_type: str, limit: int = 20):
    """List components of a specific type.

    Prints one line per component (id, confidence, shortened description),
    ordered by descending confidence and then by name.

    Args:
        component_type: Value to match against ``components.type``.
        limit: Maximum number of components to list.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, name, description, confidence
            FROM components
            WHERE type = ?
            ORDER BY confidence DESC, name
            LIMIT ?
        """, (component_type, limit))
        rows = cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()

    if not rows:
        print(f"\nNo components found of type: {component_type}")
        return

    print(f"\n{component_type.title()}s ({len(rows)}):\n")
    for row in rows:
        description = row['description'] or ""
        desc = description[:50] + "..." if len(description) > 50 else description
        # A NULL confidence cannot be formatted with ":.2f"; use the same 0.5
        # default the other lookups in this file apply to missing confidence.
        confidence = 0.5 if row['confidence'] is None else row['confidence']
        print(f" {row['id']:<35} {confidence:.2f} {desc}")
    print()

def main():
    """Parse the command line and run the first matching mode.

    Modes are checked in this order: --stats, --compare, --relationships,
    --show, --list, then a free-text query. With none of them, the help
    text is printed.
    """
    parser = argparse.ArgumentParser(
        description="CODITECT Component Discovery - Self-Awareness Query Interface"
    )

    parser.add_argument("query", nargs="*", help="Natural language search query")
    parser.add_argument("--type", "-t", help="Filter by component type")
    parser.add_argument("--compare", nargs=2, metavar=("A", "B"), help="Compare two components")
    parser.add_argument("--relationships", "-r", metavar="ID", help="Show relationships for component")
    parser.add_argument("--show", "-s", metavar="ID", help="Show component details")
    parser.add_argument("--list", "-l", metavar="TYPE", help="List components of type")
    parser.add_argument("--stats", action="store_true", help="Show database statistics")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--limit", type=int, default=10, help="Limit results")
    parser.add_argument("--json", action="store_true", help="Output as JSON")

    args = parser.parse_args()

    # Handle different modes; each one returns as soon as it has run.
    if args.stats:
        if args.json:
            print(json.dumps(get_stats(), indent=2))
        else:
            print_stats()
        return

    if args.compare:
        first, second = args.compare[0], args.compare[1]
        print_comparison(first, second)
        return

    if args.relationships:
        print_relationships(args.relationships)
        return

    if args.show:
        print_component_details(args.show, args.verbose)
        return

    if args.list:
        list_components(args.list, args.limit)
        return

    if not args.query:
        parser.print_help()
        return

    # Positional words are joined back into a single search string.
    query = " ".join(args.query)
    components = search_components(query, args.type, args.limit)
    if not args.json:
        print_search_results(components, query)
        return

    payload = []
    for comp in components:
        payload.append({
            "id": comp.id,
            "name": comp.name,
            "type": comp.type,
            "description": comp.description,
            "confidence": comp.confidence
        })
    print(json.dumps(payload, indent=2))

# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()