# Skip to main content
#
# scripts-component-indexer

#!/usr/bin/env python3
"""
---
title: "============================================================================="
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Component Indexer for CODITECT Self-Awareness"
keywords: ['api', 'backend', 'component', 'database', 'docker']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "component-indexer.py"
language: python
executable: true
usage: "python3 scripts/component-indexer.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

Component Indexer for CODITECT Self-Awareness

Extracts capabilities from all component files and populates a searchable
SQLite database with FTS5 full-text search. Supports A2A Protocol metadata
enrichment from JSON files.

Usage:
    python3 scripts/component-indexer.py                    # Full index rebuild
    python3 scripts/component-indexer.py --incremental      # Only new/changed
    python3 scripts/component-indexer.py --stats            # Show statistics
    python3 scripts/component-indexer.py --search "query"   # Test search

A2A Protocol Search:
    python3 scripts/component-indexer.py --model sonnet             # Filter by LLM model
    python3 scripts/component-indexer.py --tools "Read,Write"       # Filter by tools
    python3 scripts/component-indexer.py --type agent               # Filter by type
    python3 scripts/component-indexer.py --orchestrators            # List orchestrators
    python3 scripts/component-indexer.py --capability "security"    # By capability
    python3 scripts/component-indexer.py -q "review" --model sonnet # Combined

A2A JSON Sources:
    config/agents/*.json       - Internal CODITECT format (llm_binding, tools)
    config/agent-cards/*.json  - A2A Protocol format ($schema, composability)
    config/commands/*.json     - Command metadata
    config/skills/*.json       - Skill metadata

Output: context-storage/platform.db (ADR-118 TIER 1 - regenerable component index)
"""

# Standard library
import argparse
import hashlib
import json
import re
import sqlite3
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

# Third-party (PyYAML) — required for frontmatter parsing
import yaml

# =============================================================================
# Configuration
# =============================================================================

SCRIPT_DIR = Path(file).parent ROOT_DIR = SCRIPT_DIR.parent

# ADR-118: Component index goes to platform.db (TIER 1 - regenerable)

sys.path.insert(0, str(SCRIPT_DIR / "core")) try: from paths import get_context_storage_dir, CONTEXT_STORAGE DB_PATH = CONTEXT_STORAGE / "platform.db" except ImportError: # Fallback for ADR-114 user data location _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" if _user_data.exists(): DB_PATH = _user_data / "platform.db" else: DB_PATH = ROOT_DIR / "context-storage" / "platform.db"

COMPONENT_PATHS = { "agent": ROOT_DIR / "agents", "command": ROOT_DIR / "commands", "skill": ROOT_DIR / "skills", "script": ROOT_DIR / "scripts", "hook": ROOT_DIR / "hooks", # Workflows can be in multiple locations - docs/workflows AND workflows/ "workflow": [ROOT_DIR / "docs" / "workflows", ROOT_DIR / "workflows"], # Documents: all markdown files (guides, ADRs, standards, references, analysis, etc.) # Excludes: submodules/, external/, .venv/, node_modules/ (handled by EXCLUDE_DIRS) "document": [ ROOT_DIR / "docs", ROOT_DIR / "docs-contributor", ROOT_DIR / "docs-customer", ROOT_DIR / "docs-generated", ROOT_DIR / "internal", ROOT_DIR / "coditect-core-standards", ROOT_DIR / "prompts", ROOT_DIR / "templates", # Additional directories with documentation ROOT_DIR / "analyze-new-artifacts", ROOT_DIR / "tools", ROOT_DIR / "config", ROOT_DIR / "lib", ROOT_DIR / "containers", ROOT_DIR / "docker", ROOT_DIR / "kubernetes", ROOT_DIR / "distribution", ROOT_DIR / "memory-context", ROOT_DIR / "reports", ROOT_DIR / "licensing", ROOT_DIR / "data", ROOT_DIR / "codanna", ], }

# Additional workflow JSON paths (n8n workflows)

WORKFLOW_JSON_PATHS = [ ROOT_DIR / "workflows", ]

# Document subcategory patterns (for smart categorization)

DOCUMENT_SUBCATEGORY_PATTERNS = { "adr": ["/adrs/", "/architecture/adrs/", "ADR-"], "standard": ["/coditect-core-standards/", "/standards/", "STANDARD"], "guide": ["/guides/", "/guide/", "GUIDE", "-GUIDE"], "reference": ["/reference/", "REFERENCE", "-REF"], "plan": ["/plans/", "/project/plans/", "PLAN", "-PLAN"], "research": ["/research/", "RESEARCH"], "template": ["/templates/", "TEMPLATE"], "prompt": ["/prompts/", "PROMPT"], "tutorial": ["/tutorials/", "TUTORIAL"], "api": ["/api/", "API-DOC", "-API"], "architecture": ["/architecture/", "ARCHITECTURE"], "security": ["/security/", "SECURITY"], "workflow-doc": ["/workflows/", "WORKFLOW"], "session-log": ["/session-logs/", "SESSION-LOG"], "changelog": ["CHANGELOG", "HISTORY"], "readme": ["README"], }

# A2A JSON metadata paths (enriches .md components with JSON metadata)

A2A_JSON_PATHS = { "agent": ROOT_DIR / "config" / "agents", "command": ROOT_DIR / "config" / "commands", "skill": ROOT_DIR / "config" / "skills", "script": ROOT_DIR / "config" / "scripts", "agent-card": ROOT_DIR / "config" / "agent-cards", }

# Keywords that indicate capabilities

ACTION_KEYWORDS = { "review": ["review", "audit", "check", "validate", "verify", "assess"], "create": ["create", "generate", "build", "write", "produce", "make"], "analyze": ["analyze", "examine", "investigate", "research", "study"], "deploy": ["deploy", "release", "ship", "publish", "launch"], "test": ["test", "verify", "validate", "check", "assert"], "document": ["document", "describe", "explain", "annotate"], "optimize": ["optimize", "improve", "enhance", "tune", "speed"], "secure": ["secure", "protect", "harden", "encrypt", "authenticate"], "orchestrate": ["orchestrate", "coordinate", "manage", "schedule"], "transform": ["transform", "convert", "migrate", "translate"], }

DOMAIN_KEYWORDS = { "security": ["security", "auth", "encryption", "vulnerability", "owasp"], "compliance": ["compliance", "hipaa", "gdpr", "soc2", "fda", "audit"], "performance": ["performance", "speed", "latency", "throughput", "memory"], "testing": ["test", "coverage", "unit", "integration", "e2e"], "documentation": ["document", "readme", "guide", "reference", "api-doc"], "devops": ["deploy", "ci", "cd", "docker", "kubernetes", "infrastructure"], "database": ["database", "sql", "query", "schema", "migration"], "frontend": ["frontend", "react", "vue", "css", "ui", "component"], "backend": ["backend", "api", "server", "endpoint", "handler"], "ai": ["ai", "llm", "model", "prompt", "agent", "ml"], }

# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class ComponentRecord:
    """Represents a parsed component with extracted metadata.

    One record per indexed file (agent, command, skill, script, hook,
    workflow, or document). Required identity fields come first; all
    extracted/enriched fields carry safe defaults so a record can be built
    from partial metadata.
    """

    # Identity (required)
    id: str              # unique id, e.g. "agent/orchestrator"
    type: str            # component type key from COMPONENT_PATHS
    name: str            # short display name (no type prefix)
    version: str
    status: str
    path: str            # source file path
    category: str
    subcategory: str
    description: str

    # Extracted capabilities
    capabilities_primary: List[str] = field(default_factory=list)
    capabilities_tags: List[str] = field(default_factory=list)
    capabilities_domains: List[str] = field(default_factory=list)
    capabilities_actions: List[str] = field(default_factory=list)

    # Triggers
    triggers_use_when: List[str] = field(default_factory=list)
    triggers_avoid_when: List[str] = field(default_factory=list)
    triggers_keywords: List[str] = field(default_factory=list)
    complexity: str = "medium"

    # Relationships
    invokes: List[str] = field(default_factory=list)
    invoked_by: List[str] = field(default_factory=list)
    alternatives: List[str] = field(default_factory=list)
    complements: List[str] = field(default_factory=list)

    # Quality
    maturity: str = "production"
    confidence: float = 0.5
    documentation_quality: str = "partial"

    # Metadata
    content_hash: str = ""
    indexed_at: str = ""

    # A2A Protocol Fields
    llm_provider: str = ""  # anthropic-claude, openai, etc.
    llm_model: str = ""  # sonnet, opus, gpt-4, etc.
    llm_temperature: float = 0.7
    llm_max_tokens: int = 4096
    tools_list: List[str] = field(default_factory=list)  # Available tools
    a2a_schema: str = ""  # A2A schema version if applicable
    token_budget_recommended: int = 0
    token_budget_maximum: int = 0
    invocation_method: str = ""  # Task, direct, etc.
    can_orchestrate: List[str] = field(default_factory=list)
    can_be_orchestrated_by: List[str] = field(default_factory=list)
    parallel_safe: bool = True
    vendor_name: str = ""
    vendor_url: str = ""

    # Component Origin (ADR-180)
    component_origin: str = "system"  # system, product, user

    # Version Observability (H.24)
    content_updated: str = ""  # Date from frontmatter 'updated:' field
    last_reviewed: str = ""  # Date from frontmatter 'last_reviewed:' field

    # Document Taxonomy (J.20)
    raw_metadata: Dict = field(default_factory=dict)  # Full frontmatter for document_frontmatter table
    project_id: str = ""  # Project scope (e.g., "PILOT", "BIO-QMS")

    def to_search_text(self) -> str:
        """Generate concatenated text for FTS indexing."""
        parts = [
            self.id,
            self.name,
            self.type,
            self.description,
            " ".join(self.capabilities_primary),
            " ".join(self.capabilities_tags),
            " ".join(self.capabilities_domains),
            " ".join(self.capabilities_actions),
            " ".join(self.triggers_use_when),
            " ".join(self.triggers_keywords),
        ]
        return " ".join(parts)

# =============================================================================
# Database Schema
# =============================================================================

# Schema split into tables and indexes for safe migration

# Table DDL only (no indexes) so a migration step can add columns to older
# databases before SCHEMA_INDEXES_SQL references them.
SCHEMA_TABLES_SQL = """
-- Core component table (with A2A Protocol support)
CREATE TABLE IF NOT EXISTS components (
    id TEXT PRIMARY KEY,
    type TEXT NOT NULL,
    name TEXT NOT NULL,
    version TEXT,
    status TEXT DEFAULT 'operational',
    path TEXT NOT NULL,
    category TEXT,
    subcategory TEXT,
    description TEXT,
    complexity TEXT DEFAULT 'medium',
    maturity TEXT DEFAULT 'production',
    confidence REAL DEFAULT 0.5,
    documentation_quality TEXT DEFAULT 'partial',
    content_hash TEXT,
    indexed_at TEXT,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
    -- A2A Protocol Fields
    llm_provider TEXT,                 -- anthropic-claude, openai, etc.
    llm_model TEXT,                    -- sonnet, opus, gpt-4, etc.
    llm_temperature REAL DEFAULT 0.7,
    llm_max_tokens INTEGER DEFAULT 4096,
    tools_list TEXT,                   -- JSON array of tools
    a2a_schema TEXT,                   -- A2A schema version
    token_budget_recommended INTEGER DEFAULT 0,
    token_budget_maximum INTEGER DEFAULT 0,
    invocation_method TEXT,            -- Task, direct, etc.
    parallel_safe INTEGER DEFAULT 1,   -- Boolean: 1=true, 0=false
    vendor_name TEXT,
    vendor_url TEXT,
    -- Component Origin (ADR-180 Modular Product Installation)
    component_origin TEXT DEFAULT 'system',  -- system, product, user
    -- Version Observability (H.24)
    content_updated TEXT,              -- Date from frontmatter 'updated:' field
    last_reviewed TEXT                 -- Date from frontmatter 'last_reviewed:' field
);

-- Capabilities (many-to-many)
CREATE TABLE IF NOT EXISTS capabilities (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    component_id TEXT REFERENCES components(id) ON DELETE CASCADE,
    capability TEXT NOT NULL,
    capability_type TEXT NOT NULL  -- primary, tag, domain, action
);

-- Triggers (when to use)
CREATE TABLE IF NOT EXISTS triggers (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    component_id TEXT REFERENCES components(id) ON DELETE CASCADE,
    trigger_type TEXT NOT NULL,  -- use_when, avoid_when, keyword
    description TEXT NOT NULL
);

-- Relationships
CREATE TABLE IF NOT EXISTS component_relationships (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source_id TEXT REFERENCES components(id) ON DELETE CASCADE,
    target_id TEXT,
    relationship_type TEXT NOT NULL,  -- invokes, invoked_by, alternative, complement
    notes TEXT
);

-- Usage statistics (populated from session history)
CREATE TABLE IF NOT EXISTS component_usage_stats (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    component_id TEXT REFERENCES components(id) ON DELETE CASCADE,
    invocation_count INTEGER DEFAULT 0,
    success_count INTEGER DEFAULT 0,
    failure_count INTEGER DEFAULT 0,
    avg_duration_ms INTEGER,
    last_used TEXT,
    updated_at TEXT DEFAULT CURRENT_TIMESTAMP
);

-- Component composability (A2A Protocol - who can orchestrate whom)
CREATE TABLE IF NOT EXISTS component_composability (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    component_id TEXT REFERENCES components(id) ON DELETE CASCADE,
    target_id TEXT,              -- The component this relates to
    relationship TEXT NOT NULL,  -- can_orchestrate, can_be_orchestrated_by
    UNIQUE(component_id, target_id, relationship)
);

-- Full-text search index
CREATE VIRTUAL TABLE IF NOT EXISTS component_search USING fts5(
    id, name, type, description, capabilities, triggers,
    content='components', content_rowid='rowid'
);

-- Document frontmatter storage: all key-value pairs per document (J.20)
CREATE TABLE IF NOT EXISTS document_frontmatter (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    component_id TEXT NOT NULL REFERENCES components(id) ON DELETE CASCADE,
    key TEXT NOT NULL,
    value TEXT,
    UNIQUE(component_id, key)
);

-- Document taxonomy configuration per project (J.20)
CREATE TABLE IF NOT EXISTS document_taxonomy (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    project_id TEXT NOT NULL DEFAULT 'default',
    category_slug TEXT NOT NULL,
    display_name TEXT NOT NULL,
    description TEXT,
    icon TEXT,
    sort_order INTEGER DEFAULT 100,
    visible INTEGER DEFAULT 1,
    parent_slug TEXT,
    UNIQUE(project_id, category_slug)
);
"""

# Indexes separated so they run AFTER migration adds columns

SCHEMA_INDEXES_SQL = """ -- Indexes for fast lookups (run after migration ensures columns exist) CREATE INDEX IF NOT EXISTS idx_components_type ON components(type); CREATE INDEX IF NOT EXISTS idx_components_category ON components(category); CREATE INDEX IF NOT EXISTS idx_components_status ON components(status); CREATE INDEX IF NOT EXISTS idx_components_llm_model ON components(llm_model); CREATE INDEX IF NOT EXISTS idx_components_llm_provider ON components(llm_provider); CREATE INDEX IF NOT EXISTS idx_capabilities_component ON capabilities(component_id); CREATE INDEX IF NOT EXISTS idx_capabilities_type ON capabilities(capability_type); CREATE INDEX IF NOT EXISTS idx_triggers_component ON triggers(component_id); CREATE INDEX IF NOT EXISTS idx_comp_rel_source ON component_relationships(source_id); CREATE INDEX IF NOT EXISTS idx_comp_rel_target ON component_relationships(target_id); CREATE INDEX IF NOT EXISTS idx_composability_component ON component_composability(component_id); CREATE INDEX IF NOT EXISTS idx_composability_target ON component_composability(target_id); CREATE INDEX IF NOT EXISTS idx_doc_frontmatter_component ON document_frontmatter(component_id); CREATE INDEX IF NOT EXISTS idx_doc_frontmatter_key ON document_frontmatter(key); CREATE INDEX IF NOT EXISTS idx_doc_taxonomy_project ON document_taxonomy(project_id); CREATE INDEX IF NOT EXISTS idx_components_project ON components(project_id); """

# =============================================================================
# Document Taxonomy (J.20)
# =============================================================================

# Module-level taxonomy cache

_taxonomy_config: Optional[Dict] = None

def load_taxonomy_config(config_path: Optional[Path] = None) -> Dict:
    """Load document taxonomy configuration from JSON.

    Args:
        config_path: Explicit path to the taxonomy JSON; defaults to
            config/document-taxonomy.json under the repo root.

    Returns dict with keys: categories, category_resolution_map, type_to_category_map.
    Falls back to empty config if file not found.

    Result is cached in the module-level `_taxonomy_config`, so the file is
    read at most once per process (an explicit config_path is ignored on
    subsequent calls).
    """
    global _taxonomy_config
    if _taxonomy_config is not None:
        return _taxonomy_config

    if config_path is None:
        config_path = ROOT_DIR / "config" / "document-taxonomy.json"

    if config_path.exists():
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                _taxonomy_config = json.load(f)
            return _taxonomy_config
        except (json.JSONDecodeError, IOError) as e:
            # Best-effort: fall through to the empty taxonomy below.
            print(f" Warning: Failed to load taxonomy config: {e}")

    # Fallback: empty taxonomy
    _taxonomy_config = {
        "categories": [],
        "category_resolution_map": {},
        "type_to_category_map": {},
    }
    return _taxonomy_config

def resolve_document_category(
    metadata: Dict,
    file_path: Path,
    root_dir: Path,
) -> Tuple[str, str]:
    """Resolve document category using the taxonomy priority chain.

    Priority:
    1. frontmatter.category (explicit)
    2. category_resolution_map[directory_name] (directory-based)
    3. type_to_category_map[frontmatter.type or frontmatter.component_type]
    4. "reference" (fallback)

    Returns: (category_slug, subcategory)
    """
    taxonomy = load_taxonomy_config()
    dir_map = taxonomy.get("category_resolution_map", {})
    type_map = taxonomy.get("type_to_category_map", {})

    # Strip _comment keys from maps (the JSON config uses them as inline docs)
    dir_map = {k: v for k, v in dir_map.items() if k != "_comment"}
    type_map = {k: v for k, v in type_map.items() if k != "_comment"}

    # Priority 1: Explicit frontmatter category (only honored if it names a
    # known category slug)
    explicit_cat = metadata.get("category", "")
    if explicit_cat and explicit_cat in {c["slug"] for c in taxonomy.get("categories", [])}:
        subcategory = metadata.get("subcategory", "")
        return explicit_cat, subcategory

    # Priority 2: Directory-based resolution
    try:
        rel_path = file_path.relative_to(root_dir)
    except ValueError:
        # file_path is outside root_dir; match against its absolute parts
        rel_path = file_path

    # Walk path parts from most specific (deepest dir) to least specific
    path_parts = list(rel_path.parts)
    for part in reversed(path_parts[:-1]):  # Exclude filename
        part_lower = part.lower()
        if part_lower in dir_map:
            category = dir_map[part_lower]
            # Infer subcategory from the directory name if different from category
            subcategory = part_lower if part_lower != category else ""
            return category, subcategory

    # Priority 3: Type-based resolution
    doc_type = str(metadata.get("type", "")).lower()
    component_type = str(metadata.get("component_type", "")).lower()

    for type_val in [doc_type, component_type]:
        if type_val and type_val in type_map:
            return type_map[type_val], ""

    # Priority 4: Fallback
    return "reference", ""

def populate_document_taxonomy(conn: sqlite3.Connection, project_id: str = "default"):
    """Load taxonomy categories into the document_taxonomy table for a project.

    Replaces any existing rows for `project_id`, inserts one row per category
    (parent_slug NULL) and one per subcategory (parent_slug = owning
    category's slug), then commits.
    """
    taxonomy = load_taxonomy_config()
    cursor = conn.cursor()

    # Clear existing taxonomy for this project
    cursor.execute("DELETE FROM document_taxonomy WHERE project_id = ?", (project_id,))

    for cat in taxonomy.get("categories", []):
        cursor.execute("""
            INSERT OR REPLACE INTO document_taxonomy
            (project_id, category_slug, display_name, description, icon, sort_order, visible, parent_slug)
            VALUES (?, ?, ?, ?, ?, ?, ?, NULL)
        """, (
            project_id,
            cat["slug"],
            cat["name"],
            cat.get("description", ""),
            cat.get("icon", ""),
            cat.get("sort_order", 100),
            1 if cat.get("visible", True) else 0,
        ))

        # Insert subcategories
        for subcat in cat.get("subcategories", []):
            cursor.execute("""
                INSERT OR REPLACE INTO document_taxonomy
                (project_id, category_slug, display_name, description, icon, sort_order, visible, parent_slug)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                project_id,
                subcat["slug"],
                subcat["name"],
                "",
                "",
                subcat.get("sort_order", 100),
                1,
                cat["slug"],
            ))

    conn.commit()

def insert_document_frontmatter(conn: sqlite3.Connection, component_id: str, metadata: Dict):
    """Store all frontmatter key-value pairs in the document_frontmatter table.

    Existing rows for `component_id` are replaced. dict/list values are
    JSON-serialized; None becomes "". Keys starting with "$" (JSON schema
    references) are skipped. Caller is responsible for committing.
    """
    cursor = conn.cursor()

    # Clear existing frontmatter for this component
    cursor.execute("DELETE FROM document_frontmatter WHERE component_id = ?", (component_id,))

    for key, value in metadata.items():
        if key.startswith("$"):  # Skip JSON schema reference
            continue

        # Serialize complex values to JSON string
        if isinstance(value, (dict, list)):
            value_str = json.dumps(value, ensure_ascii=False)
        elif value is None:
            value_str = ""
        else:
            value_str = str(value)

        cursor.execute(
            "INSERT OR REPLACE INTO document_frontmatter (component_id, key, value) VALUES (?, ?, ?)",
            (component_id, str(key), value_str)
        )

# =============================================================================
# Parsing Functions
# =============================================================================

def parse_yaml_frontmatter(content: str) -> Tuple[Dict, str]:
    """Extract YAML frontmatter from markdown, bash, or Python content.

    Supports:
    - Markdown: starts with '---', ends with '---'
    - Bash/Shell: starts with '# ---', ends with '# ---' (after shebang)
    - Python: docstring containing '---' delimited YAML

    Returns (metadata, body); on any parse failure falls through to the next
    strategy and ultimately returns ({}, content) unchanged.
    """
    # Strategy 1: Standard markdown frontmatter (starts with ---)
    if content.startswith("---"):
        end_match = re.search(r"\n---\n", content[3:])
        if end_match:
            frontmatter_end = end_match.start() + 3
            yaml_content = content[3:frontmatter_end]
            body = content[frontmatter_end + 4:]
            try:
                metadata = yaml.safe_load(yaml_content)
                return metadata or {}, body
            except yaml.YAMLError:
                pass

    # Strategy 2: Bash/Shell script with comment-style frontmatter
    # Pattern: #!/bin/bash (or similar), then # --- ... # ---
    bash_match = re.search(
        r'^#![^\n]*\n(?:#[^\n]*\n)*?# ---\n((?:# [^\n]*\n)+?)# ---',
        content,
        re.MULTILINE
    )
    if bash_match:
        # Extract YAML lines and strip the '# ' prefix
        yaml_lines = bash_match.group(1)
        yaml_content = '\n'.join(
            line[2:] if line.startswith('# ') else line[1:] if line.startswith('#') else line
            for line in yaml_lines.split('\n')
        )
        body = content[bash_match.end():]
        try:
            metadata = yaml.safe_load(yaml_content)
            return metadata or {}, body
        except yaml.YAMLError:
            pass

    # Strategy 3: Python docstring with embedded YAML frontmatter
    # Pattern: """...\n---\nyaml content\n---\n..."""
    python_match = re.search(
        r'^(?:#!/[^\n]*\n)?["\'][\'"]{2}\s*\n---\n(.*?)\n---',
        content,
        re.MULTILINE | re.DOTALL
    )
    if python_match:
        yaml_content = python_match.group(1)
        body = content[python_match.end():]
        try:
            metadata = yaml.safe_load(yaml_content)
            return metadata or {}, body
        except yaml.YAMLError:
            pass

    return {}, content

def extract_actions_from_text(text: str) -> Set[str]:
    """Extract action verbs from text.

    Case-insensitive substring match against ACTION_KEYWORDS; each action
    category is added at most once (break on first matching keyword).
    """
    text_lower = text.lower()
    found_actions = set()

    for action, keywords in ACTION_KEYWORDS.items():
        for keyword in keywords:
            if keyword in text_lower:
                found_actions.add(action)
                break

    return found_actions

def extract_domains_from_text(text: str) -> Set[str]:
    """Extract domain categories from text.

    Case-insensitive substring match against DOMAIN_KEYWORDS; each domain is
    added at most once (break on first matching keyword).
    """
    text_lower = text.lower()
    found_domains = set()

    for domain, keywords in DOMAIN_KEYWORDS.items():
        for keyword in keywords:
            if keyword in text_lower:
                found_domains.add(domain)
                break

    return found_domains

def extract_keywords_from_description(description: str) -> List[str]:
    """Extract meaningful keywords from description.

    Lowercase words of 3+ letters, minus stop words, deduplicated in order of
    first appearance, capped at 20.
    """
    # Remove common words
    stop_words = {
        "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
        "of", "with", "by", "from", "is", "are", "was", "were", "be", "been",
        "being", "have", "has", "had", "do", "does", "did", "will", "would",
        "could", "should", "may", "might", "must", "shall", "can", "this",
        "that", "these", "those", "it", "its", "you", "your", "we", "our",
        "use", "using", "used", "agent", "command", "skill", "tool", "tools"
    }

    # Extract words
    words = re.findall(r'\b[a-z]{3,}\b', description.lower())
    keywords = [w for w in words if w not in stop_words]

    # Deduplicate while preserving order
    seen = set()
    unique_keywords = []
    for kw in keywords:
        if kw not in seen:
            seen.add(kw)
            unique_keywords.append(kw)

    return unique_keywords[:20]  # Limit to top 20

def extract_relationships(content: str, metadata: Dict) -> Tuple[List[str], List[str], List[str], List[str]]:
    """Extract relationship information from content and metadata.

    Returns (invokes, invoked_by, alternatives, complements), each
    deduplicated. Note: invoked_by and complements have no extraction source
    in the visible heuristics and are returned empty; kept for interface
    symmetry with the schema's relationship types.
    """
    invokes = []
    invoked_by = []
    alternatives = []
    complements = []

    # From metadata: declared tools become "invokes" targets
    if "tools" in metadata:
        tools = metadata["tools"]
        if isinstance(tools, list):
            invokes.extend([t.lower().replace(" ", "-") for t in tools if isinstance(t, str)])

    # From auto_trigger_integration
    if "auto_trigger_integration" in metadata:
        ati = metadata["auto_trigger_integration"]
        if isinstance(ati, dict):
            if "capability_routing" in ati and isinstance(ati["capability_routing"], dict):
                mappings = ati["capability_routing"].get("mappings", {})
                for agents in mappings.values():
                    if isinstance(agents, list):
                        invokes.extend(agents)

    # From content - look for Task(...) patterns
    task_matches = re.findall(r'Task\s*\([^)]*subagent_type\s*=\s*["\']([^"\']+)["\']', content)
    invokes.extend(task_matches)

    # From content - look for "invokes", "calls", "uses" patterns
    invoke_patterns = re.findall(r'(?:invokes?|calls?|uses?)\s*[:\-]?\s*`?([a-z\-]+)`?', content.lower())
    invokes.extend(invoke_patterns)

    # From content - look for alternatives
    alt_patterns = re.findall(r'(?:alternative|instead of|rather than)\s*[:\-]?\s*`?([a-z\-]+)`?', content.lower())
    alternatives.extend(alt_patterns)

    # Deduplicate (set() loses the original order; callers only need membership)
    invokes = list(set(invokes))
    invoked_by = list(set(invoked_by))
    alternatives = list(set(alternatives))
    complements = list(set(complements))

    return invokes, invoked_by, alternatives, complements

def load_a2a_json(component_id: str, component_type: str) -> Optional[Dict]:
    """Load A2A JSON metadata for a component if it exists.

    Checks the type-specific config directory first, then the agent-cards
    directory; returns the first JSON file that parses, else None. Unreadable
    or invalid files are skipped silently (best-effort enrichment).
    """
    # Extract base name from component_id (e.g., "agent/orchestrator" -> "orchestrator")
    base_name = component_id.split("/")[-1] if "/" in component_id else component_id

    # Check both internal format and agent-card format
    paths_to_check = []

    # Internal format: config/{type}s/{base_name}.json
    type_path = A2A_JSON_PATHS.get(component_type)
    if type_path and type_path.exists():
        paths_to_check.append(type_path / f"{base_name}.json")

    # Agent-card format: config/agent-cards/{base_name}.json
    agent_card_path = A2A_JSON_PATHS.get("agent-card")
    if agent_card_path and agent_card_path.exists():
        paths_to_check.append(agent_card_path / f"{base_name}.json")

    for json_path in paths_to_check:
        if json_path.exists():
            try:
                with open(json_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except (json.JSONDecodeError, IOError):
                continue

    return None

def enrich_with_a2a(record: ComponentRecord, a2a_data: Dict) -> ComponentRecord:
    """Enrich a ComponentRecord with A2A JSON metadata.

    Dispatches on format: payloads carrying a "$schema" key are treated as
    A2A Protocol agent-cards, everything else as the internal CODITECT
    format. Returns the record unchanged if a2a_data is empty/None.
    """
    if not a2a_data:
        return record

    # Check if it's A2A Protocol format (has $schema)
    if "$schema" in a2a_data:
        return enrich_with_a2a_protocol(record, a2a_data)
    else:
        return enrich_with_internal_format(record, a2a_data)

def enrich_with_a2a_protocol(record: ComponentRecord, data: Dict) -> ComponentRecord:
    """Enrich with A2A Protocol format (agent-cards).

    Mutates `record` in place from the agent-card sections (agent, capabilities,
    interaction, constraints, composability) and returns it. Existing
    description is preserved; version is overwritten when present.
    """
    record.a2a_schema = data.get("$schema", "")

    # Agent info
    agent = data.get("agent", {})
    if agent:
        if agent.get("version"):
            record.version = agent["version"]
        if agent.get("description") and not record.description:
            record.description = agent["description"]

        vendor = agent.get("vendor", {})
        record.vendor_name = vendor.get("name", "")
        record.vendor_url = vendor.get("url", "")

    # Capabilities
    caps = data.get("capabilities", {})
    if caps:
        tools = caps.get("tools", [])
        if tools:
            record.tools_list = tools
        skills = caps.get("skills", [])
        if skills:
            record.capabilities_tags.extend(skills)
        domains = caps.get("domains", [])
        if domains:
            record.capabilities_domains.extend(domains)

    # Interaction
    interaction = data.get("interaction", {})
    if interaction:
        invocation = interaction.get("invocation", {})
        record.invocation_method = invocation.get("method", "")

    # Constraints
    constraints = data.get("constraints", {})
    if constraints:
        record.llm_model = constraints.get("model_preference", "")
        token_budget = constraints.get("token_budget", {})
        record.token_budget_recommended = token_budget.get("recommended", 0)
        record.token_budget_maximum = token_budget.get("maximum", 0)

    # Composability
    composability = data.get("composability", {})
    if composability:
        record.can_orchestrate = composability.get("can_orchestrate", [])
        record.can_be_orchestrated_by = composability.get("can_be_orchestrated_by", [])
        record.parallel_safe = composability.get("parallel_safe", True)

    return record

def enrich_with_internal_format(record: ComponentRecord, data: Dict) -> ComponentRecord:
    """Enrich with internal CODITECT JSON format.

    Mutates `record` in place and returns it. Scalars already set on the
    record (description, category) are preserved; list fields are extended.
    """
    # LLM binding
    llm_binding = data.get("llm_binding", {})
    if llm_binding:
        record.llm_provider = llm_binding.get("provider", "")
        record.llm_model = llm_binding.get("model", "")
        record.llm_temperature = llm_binding.get("temperature", 0.7)
        record.llm_max_tokens = llm_binding.get("max_tokens", 4096)

    # Tools (either a comma-separated string or a list)
    tools = data.get("tools", "")
    if tools:
        if isinstance(tools, str):
            record.tools_list = [t.strip() for t in tools.split(",")]
        elif isinstance(tools, list):
            record.tools_list = tools

    # Description (if not already set)
    if data.get("description") and not record.description:
        record.description = data["description"]

    # Category
    if data.get("category") and not record.category:
        record.category = data["category"]

    # Tags
    if data.get("tags"):
        tags = data["tags"]
        if isinstance(tags, list):
            record.capabilities_tags.extend(tags)

    # Capabilities
    if data.get("capabilities"):
        caps = data["capabilities"]
        if isinstance(caps, list):
            record.capabilities_primary.extend(caps)

    # Use cases
    if data.get("use_cases"):
        use_cases = data["use_cases"]
        if isinstance(use_cases, list):
            record.triggers_use_when.extend(use_cases)

    # Typical invocation
    if data.get("typical_invocation"):
        record.invocation_method = data["typical_invocation"]

    return record

def infer_complexity(content: str, metadata: Dict) -> str:
    """Infer complexity level from content.

    Explicit metadata wins; otherwise keyword heuristics classify the content
    as "high", "low", or the default "medium".
    """
    content_lower = content.lower()

    # Check for explicit complexity in metadata
    if "complexity" in metadata:
        return str(metadata["complexity"])

    # Infer from content ("orchestrat" matches orchestrate/orchestration/orchestrator)
    if any(kw in content_lower for kw in ["enterprise", "multi-agent", "orchestrat", "pipeline"]):
        return "high"
    elif any(kw in content_lower for kw in ["simple", "basic", "quick", "single"]):
        return "low"
    else:
        return "medium"

def infer_maturity(path: str, metadata: Dict) -> str:
    """Infer maturity level.

    Priority: non-production status values in metadata, explicit
    metadata['maturity'], path hints ("experimental"/"beta"), then the
    "production" default.
    """
    if "status" in metadata:
        status = str(metadata["status"]).lower()
        if status in ["experimental", "beta", "deprecated"]:
            return status

    if "maturity" in metadata:
        return str(metadata["maturity"])

    # Default based on path patterns
    if "experimental" in path.lower() or "beta" in path.lower():
        return "beta"

    return "production"

def calculate_documentation_quality(content: str, metadata: Dict) -> str:
    """Assess documentation quality.

    Scores 0-5 on simple signals (frontmatter present, description, code
    examples, usage section, length) and maps the score to
    stub/partial/complete/comprehensive.
    """
    score = 0

    # Has frontmatter
    if metadata:
        score += 1

    # Has description (or enough body text to stand in for one)
    if metadata.get("description") or len(content) > 500:
        score += 1

    # Has examples
    if "```" in content or "example" in content.lower():
        score += 1

    # Has usage section
    if "## usage" in content.lower() or "### usage" in content.lower():
        score += 1

    # Length check
    if len(content) > 2000:
        score += 1

    if score >= 4:
        return "comprehensive"
    elif score >= 3:
        return "complete"
    elif score >= 2:
        return "partial"
    else:
        return "stub"

def parse_component_file(file_path: Path, component_type: str) -> Optional[ComponentRecord]:
    """Parse a single component file and extract metadata.

    Reads the file, derives a type-prefixed component ID, pulls fields from
    YAML frontmatter (with layered fallback strategies for the description),
    infers category/complexity/maturity/documentation quality, and returns a
    populated ComponentRecord, optionally enriched from an A2A JSON sidecar.
    Returns None if the file cannot be read.
    """
    try:
        content = file_path.read_text(encoding="utf-8")
    except Exception as e:
        print(f" Error reading {file_path}: {e}")
        return None

    # Calculate content hash (first 16 hex chars of SHA-256; used by the
    # incremental indexer to detect unchanged files).
    content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]

    # Parse frontmatter
    metadata, body = parse_yaml_frontmatter(content)

    # Determine component ID (includes type prefix for uniqueness)
    if component_type == "skill":
        # Skills are in directories: skills/name/SKILL.md
        base_name = file_path.parent.name
        component_id = f"skill/{base_name}"
    elif component_type == "document":
        # Documents: use relative path for uniqueness (many files have same name)
        # NOTE(review): raises ValueError if file_path is outside ROOT_DIR —
        # presumably documents are always under ROOT_DIR; confirm against callers.
        rel_path = file_path.relative_to(ROOT_DIR)
        # Create ID from path: docs/guides/USER-GUIDE.md -> document/docs/guides/USER-GUIDE
        path_parts = list(rel_path.parts[:-1])  # Directory parts
        base_name = file_path.stem
        if path_parts:
            component_id = f"document/{'/'.join(path_parts)}/{base_name}"
        else:
            component_id = f"document/{base_name}"
    else:
        # Use type/name format to avoid collisions (e.g., agent/orchestrator vs script/orchestrator)
        base_name = file_path.stem
        component_id = f"{component_type}/{base_name}"

    # Extract basic info (name should be short name, not full ID)
    name = metadata.get("name", base_name)
    version = metadata.get("version", "1.0.0")
    status = metadata.get("status", "operational")
    description = metadata.get("description", "") or metadata.get("summary", "")
    if isinstance(description, list):
        description = " ".join(str(d) for d in description)

    # Version Observability (H.24) — extract update and review dates from frontmatter
    content_updated = str(metadata.get("updated", "")) if metadata.get("updated") else ""
    last_reviewed = str(metadata.get("last_reviewed", "")) if metadata.get("last_reviewed") else ""

    # If no description in frontmatter, try multiple extraction strategies
    if not description and body:
        # Strategy 1: First paragraph after any headers (most common in markdown)
        after_header = re.search(r'(?:^#[^\n]+\n+)+([^#\n][^\n]+)', body.strip(), re.MULTILINE)
        if after_header:
            description = after_header.group(1).strip()[:500]

        # Strategy 2: First non-header, non-empty line
        if not description:
            first_content = re.search(r'^(?!#|\s*$)(.+)$', body.strip(), re.MULTILINE)
            if first_content:
                description = first_content.group(1).strip()[:500]

        # Strategy 3: Extract from title if present (e.g., "# Title - Description")
        if not description:
            title_desc = re.search(r'^#\s+[^-\n]+\s*-\s*(.+)$', body.strip(), re.MULTILINE)
            if title_desc:
                description = title_desc.group(1).strip()[:500]

    # For scripts, try to extract from docstrings or comments
    if not description and component_type == "script":
        # Python docstring
        docstring = re.search(r'"""(.+?)"""', content, re.DOTALL)
        if docstring:
            description = docstring.group(1).strip().split('\n')[0][:500]
        else:
            # Shell script comment header (the (?!!) skips the shebang line)
            shell_comment = re.search(r'^#\s*(?!!)(.+)$', content, re.MULTILINE)
            if shell_comment:
                description = shell_comment.group(1).strip()[:500]

    # Extract LLM model/provider from frontmatter (common in CODITECT agents)
    llm_model = metadata.get("model", "") or metadata.get("llm_model", "")
    llm_provider = metadata.get("provider", "") or metadata.get("llm_provider", "")

    # Normalize model names: infer the provider from well-known model names
    # when only the model was declared.
    if llm_model and not llm_provider:
        if llm_model.lower() in ("sonnet", "opus", "haiku", "claude"):
            llm_provider = "anthropic-claude"
        elif llm_model.lower() in ("gpt-4", "gpt-4o", "gpt-3.5"):
            llm_provider = "openai"

    # Determine category — use taxonomy-based resolution for documents (J.20)
    if component_type == "document":
        # Determine the root directory for path resolution
        # For files outside ROOT_DIR (submodule docs), use file_path's root
        try:
            _doc_root = ROOT_DIR if file_path.is_relative_to(ROOT_DIR) else file_path.parent
        except AttributeError:
            # Python < 3.9 fallback (Path.is_relative_to added in 3.9)
            try:
                file_path.relative_to(ROOT_DIR)
                _doc_root = ROOT_DIR
            except ValueError:
                _doc_root = file_path.parent

        category, subcategory = resolve_document_category(metadata, file_path, _doc_root)

        # Additional subcategory refinement from DOCUMENT_SUBCATEGORY_PATTERNS
        if not subcategory:
            try:
                rel_path = str(file_path.relative_to(_doc_root))
            except ValueError:
                rel_path = str(file_path)
            file_name = file_path.name.upper()
            path_upper = rel_path.upper()

            # First pattern hit wins (both loops break out).
            for subcat, patterns in DOCUMENT_SUBCATEGORY_PATTERNS.items():
                for pattern in patterns:
                    if pattern in path_upper or pattern in file_name:
                        subcategory = subcat
                        break
                if subcategory:
                    break

        # Default subcategory based on parent directory
        if not subcategory:
            parent_dir = file_path.parent.name.lower()
            if parent_dir in ("docs", "docs-contributor", "docs-customer"):
                subcategory = "general"
            elif parent_dir == "internal":
                subcategory = "internal"
            else:
                subcategory = parent_dir
    else:
        # Non-document components: existing logic (frontmatter category, then
        # keyword heuristics on path/description, then the component type).
        category = metadata.get("category", "")
        if not category:
            rel_path = str(file_path.relative_to(ROOT_DIR))
            if "orchestrat" in rel_path.lower() or "orchestrat" in description.lower():
                category = "orchestration"
            elif "security" in rel_path.lower() or "security" in description.lower():
                category = "security"
            elif "test" in rel_path.lower() or "test" in description.lower():
                category = "testing"
            else:
                category = component_type
        subcategory = metadata.get("subcategory", "")

    # Extract capabilities
    full_text = f"{name} {description} {body}"
    actions = list(extract_actions_from_text(full_text))
    domains = list(extract_domains_from_text(full_text))

    # Primary capabilities from description
    primary_caps = []
    if description:
        # Extract verb phrases ("for reviewing code", "to manage builds", ...)
        verb_patterns = re.findall(
            r'(?:for|to|that|which)\s+([a-z]+(?:\s+[a-z]+){0,3})',
            description.lower()
        )
        primary_caps = verb_patterns[:5]

    # Tags from metadata or inferred
    tags = metadata.get("tags", [])
    if not tags:
        tags = list(domains) + actions[:3]

    # Extract keywords
    keywords = extract_keywords_from_description(description)

    # Extract relationships
    invokes, invoked_by, alternatives, complements = extract_relationships(content, metadata)

    # Infer complexity and maturity
    complexity = infer_complexity(content, metadata)
    maturity = infer_maturity(str(file_path), metadata)
    doc_quality = calculate_documentation_quality(content, metadata)

    # Calculate confidence based on documentation quality
    confidence_map = {"stub": 0.3, "partial": 0.5, "complete": 0.7, "comprehensive": 0.9}
    confidence = confidence_map.get(doc_quality, 0.5)

    record = ComponentRecord(
        id=component_id,
        type=component_type,
        name=name,
        version=version,
        status=status,
        path=str(file_path.relative_to(ROOT_DIR)),
        category=category,
        subcategory=subcategory,
        description=description,
        capabilities_primary=primary_caps,
        capabilities_tags=tags if isinstance(tags, list) else [tags],
        capabilities_domains=domains,
        capabilities_actions=actions,
        triggers_use_when=[],  # Would need more sophisticated extraction
        triggers_avoid_when=[],
        triggers_keywords=keywords,
        complexity=complexity,
        invokes=invokes,
        invoked_by=invoked_by,
        alternatives=alternatives,
        complements=complements,
        maturity=maturity,
        confidence=confidence,
        documentation_quality=doc_quality,
        content_hash=content_hash,
        indexed_at=datetime.now(timezone.utc).isoformat(),
        # LLM bindings from frontmatter
        llm_model=llm_model,
        llm_provider=llm_provider,
        # Version Observability (H.24)
        content_updated=content_updated,
        last_reviewed=last_reviewed,
        # Document Taxonomy (J.20)
        raw_metadata=metadata if component_type == "document" else {},
    )

    # Enrich with A2A JSON metadata if available (can override frontmatter)
    a2a_data = load_a2a_json(component_id, component_type)
    if a2a_data:
        record = enrich_with_a2a(record, a2a_data)

    return record

# =============================================================================
#
# Database Functions
#
# =============================================================================

def init_database(db_path: Path) -> sqlite3.Connection:
    """Open (creating if needed) the component database and apply migrations.

    Creates the base schema, adds any A2A/observability/taxonomy columns
    missing from older ``components`` tables, then (re)creates indexes once
    every indexed column is guaranteed to exist.
    """
    db_path.parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(str(db_path))
    cursor = conn.cursor()

    # Step 1: base tables (CREATE TABLE IF NOT EXISTS is a no-op on existing DBs).
    conn.executescript(SCHEMA_TABLES_SQL)
    conn.commit()

    # Step 2: discover which columns the existing table already has.
    cursor.execute("PRAGMA table_info(components)")
    present = {row[1] for row in cursor.fetchall()}

    a2a_columns = [
        ("llm_provider", "TEXT"),
        ("llm_model", "TEXT"),
        ("llm_temperature", "REAL DEFAULT 0.7"),
        ("llm_max_tokens", "INTEGER DEFAULT 4096"),
        ("tools_list", "TEXT"),
        ("a2a_schema", "TEXT"),
        ("token_budget_recommended", "INTEGER DEFAULT 0"),
        ("token_budget_maximum", "INTEGER DEFAULT 0"),
        ("invocation_method", "TEXT"),
        ("parallel_safe", "INTEGER DEFAULT 1"),
        ("vendor_name", "TEXT"),
        ("vendor_url", "TEXT"),
        # ADR-180: Component Origin
        ("component_origin", "TEXT DEFAULT 'system'"),
        # H.24: Version Observability
        ("content_updated", "TEXT"),
        ("last_reviewed", "TEXT"),
        # J.20: Document Taxonomy
        ("project_id", "TEXT DEFAULT ''"),
    ]

    added_any = False
    for name, decl in a2a_columns:
        if name in present:
            continue
        try:
            cursor.execute(f"ALTER TABLE components ADD COLUMN {name} {decl}")
            print(f" Migrated: added column {name}")
            added_any = True
        except sqlite3.OperationalError:
            pass  # Column already exists

    if added_any:
        conn.commit()

    # Step 3: indexes last, so the columns they reference are guaranteed to exist.
    conn.executescript(SCHEMA_INDEXES_SQL)
    conn.commit()

    return conn

def insert_component(conn: sqlite3.Connection, record: ComponentRecord):
    """Insert or update a component record.

    Upserts the main row (INSERT OR REPLACE keyed on id), deletes and
    re-inserts the component's child rows (capabilities, triggers,
    relationships, composability), and commits once at the end.
    """
    cursor = conn.cursor()

    # Serialize tools_list to JSON (NULL when empty, so the a2a_with_tools
    # statistic can distinguish "no tools" from "empty list")
    tools_json = json.dumps(record.tools_list) if record.tools_list else None

    # Insert/replace main record with A2A fields + project_id (J.20).
    # The placeholder tuple below must stay in exact column order.
    cursor.execute("""
        INSERT OR REPLACE INTO components (
            id, type, name, version, status, path, category, subcategory,
            description, complexity, maturity, confidence, documentation_quality,
            content_hash, indexed_at, updated_at,
            llm_provider, llm_model, llm_temperature, llm_max_tokens,
            tools_list, a2a_schema, token_budget_recommended, token_budget_maximum,
            invocation_method, parallel_safe, vendor_name, vendor_url,
            component_origin, content_updated, last_reviewed, project_id
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP,
        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        record.id, record.type, record.name, record.version, record.status,
        record.path, record.category, record.subcategory, record.description,
        record.complexity, record.maturity, record.confidence,
        record.documentation_quality, record.content_hash, record.indexed_at,
        record.llm_provider, record.llm_model, record.llm_temperature, record.llm_max_tokens,
        tools_json, record.a2a_schema, record.token_budget_recommended, record.token_budget_maximum,
        record.invocation_method, 1 if record.parallel_safe else 0, record.vendor_name, record.vendor_url,
        record.component_origin, record.content_updated or None, record.last_reviewed or None,
        record.project_id or "",
    ))

    # Clear existing capabilities/triggers/relationships/composability for this component
    cursor.execute("DELETE FROM capabilities WHERE component_id = ?", (record.id,))
    cursor.execute("DELETE FROM triggers WHERE component_id = ?", (record.id,))
    cursor.execute("DELETE FROM component_relationships WHERE source_id = ?", (record.id,))
    cursor.execute("DELETE FROM component_composability WHERE component_id = ?", (record.id,))

    # Store document frontmatter key-value pairs (J.20)
    if record.raw_metadata and record.type == "document":
        insert_document_frontmatter(conn, record.id, record.raw_metadata)

    # Helper to normalize capability values to strings: YAML/JSON sources can
    # yield strings, dicts, or nested lists; the capabilities table wants text.
    def normalize_cap(cap) -> Optional[str]:
        if cap is None:
            return None
        if isinstance(cap, str):
            return cap.strip() if cap.strip() else None
        if isinstance(cap, dict):
            # Extract 'name' or first string value from dict
            if 'name' in cap:
                return str(cap['name'])
            for v in cap.values():
                if isinstance(v, str):
                    return v
            return json.dumps(cap)
        if isinstance(cap, (list, tuple)):
            # Join list elements
            return ", ".join(str(x) for x in cap if x)
        return str(cap)

    # Insert capabilities (one row per value, typed primary/tag/domain/action)
    for cap in record.capabilities_primary:
        cap_str = normalize_cap(cap)
        if cap_str:
            cursor.execute(
                "INSERT INTO capabilities (component_id, capability, capability_type) VALUES (?, ?, ?)",
                (record.id, cap_str, "primary")
            )
    for cap in record.capabilities_tags:
        cap_str = normalize_cap(cap)
        if cap_str:
            cursor.execute(
                "INSERT INTO capabilities (component_id, capability, capability_type) VALUES (?, ?, ?)",
                (record.id, cap_str, "tag")
            )
    for cap in record.capabilities_domains:
        cap_str = normalize_cap(cap)
        if cap_str:
            cursor.execute(
                "INSERT INTO capabilities (component_id, capability, capability_type) VALUES (?, ?, ?)",
                (record.id, cap_str, "domain")
            )
    for cap in record.capabilities_actions:
        cap_str = normalize_cap(cap)
        if cap_str:
            cursor.execute(
                "INSERT INTO capabilities (component_id, capability, capability_type) VALUES (?, ?, ?)",
                (record.id, cap_str, "action")
            )

    # Insert triggers
    for kw in record.triggers_keywords:
        kw_str = normalize_cap(kw)
        if kw_str:
            cursor.execute(
                "INSERT INTO triggers (component_id, trigger_type, description) VALUES (?, ?, ?)",
                (record.id, "keyword", kw_str)
            )

    # Insert relationships
    # NOTE(review): invoked_by is cleared above but never re-inserted here —
    # presumably derived from the inverse 'invokes' rows; confirm intentional.
    for target in record.invokes:
        target_str = normalize_cap(target)
        if target_str:
            cursor.execute(
                "INSERT INTO component_relationships (source_id, target_id, relationship_type) VALUES (?, ?, ?)",
                (record.id, target_str, "invokes")
            )
    for target in record.alternatives:
        target_str = normalize_cap(target)
        if target_str:
            cursor.execute(
                "INSERT INTO component_relationships (source_id, target_id, relationship_type) VALUES (?, ?, ?)",
                (record.id, target_str, "alternative")
            )
    for target in record.complements:
        target_str = normalize_cap(target)
        if target_str:
            cursor.execute(
                "INSERT INTO component_relationships (source_id, target_id, relationship_type) VALUES (?, ?, ?)",
                (record.id, target_str, "complement")
            )

    # Insert composability (A2A Protocol)
    for target in record.can_orchestrate:
        target_str = normalize_cap(target)
        if target_str and target_str != "*":  # Skip wildcard
            cursor.execute(
                "INSERT OR IGNORE INTO component_composability (component_id, target_id, relationship) VALUES (?, ?, ?)",
                (record.id, target_str, "can_orchestrate")
            )
    for target in record.can_be_orchestrated_by:
        target_str = normalize_cap(target)
        if target_str and target_str != "*":  # Skip wildcard
            cursor.execute(
                "INSERT OR IGNORE INTO component_composability (component_id, target_id, relationship) VALUES (?, ?, ?)",
                (record.id, target_str, "can_be_orchestrated_by")
            )

    # Insert tools as capabilities (from A2A)
    for tool in record.tools_list:
        tool_str = normalize_cap(tool)
        if tool_str:
            cursor.execute(
                "INSERT INTO capabilities (component_id, capability, capability_type) VALUES (?, ?, ?)",
                (record.id, tool_str, "tool")
            )

    conn.commit()

def update_fts_index(conn: sqlite3.Connection):
    """Rebuild FTS index with capability, trigger, and frontmatter data (J.20).

    Drops and recreates the FTS5 virtual table, then inserts one row per
    component with its searchable text aggregated via correlated subqueries.
    """
    cur = conn.cursor()

    # Rebuilding from scratch avoids stale rows and FTS5 delete bookkeeping.
    cur.execute("DROP TABLE IF EXISTS component_search")
    cur.execute("""
        CREATE VIRTUAL TABLE component_search USING fts5(
            id,
            name,
            type,
            description,
            capabilities,
            triggers,
            frontmatter
        )
    """)

    # One row per component; NULL aggregates are normalized to "" on insert.
    cur.execute("""
        SELECT
            c.id,
            c.name,
            c.type,
            c.description,
            (SELECT GROUP_CONCAT(DISTINCT capability) FROM capabilities WHERE component_id = c.id) as capabilities,
            (SELECT GROUP_CONCAT(DISTINCT description) FROM triggers WHERE component_id = c.id) as triggers,
            (SELECT GROUP_CONCAT(key || ':' || value, ' ') FROM document_frontmatter WHERE component_id = c.id) as frontmatter
        FROM components c
    """)

    for comp_id, comp_name, comp_type, desc, caps, trigs, fm in cur.fetchall():
        cur.execute("""
            INSERT INTO component_search (id, name, type, description, capabilities, triggers, frontmatter)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (comp_id, comp_name, comp_type, desc or "", caps or "", trigs or "", fm or ""))

    conn.commit()

def search_components(
    conn: sqlite3.Connection,
    query: str = "",
    limit: int = 20,
    model: str = "",
    tools: str = "",
    can_orchestrate: bool = False,
    component_type: str = ""
) -> List[Dict]:
    """
    Search components using FTS with optional A2A filters.

    Args:
        query: FTS search query (optional if using filters)
        limit: Maximum results
        model: Filter by LLM model (sonnet, opus, haiku)
        tools: Filter by tool availability (comma-separated)
        can_orchestrate: Filter to components that can orchestrate others
        component_type: Filter by component type (agent, command, skill, etc.)

    Returns:
        List of dicts with id, name, type, truncated description, confidence,
        maturity, llm_model, tools, invocation_method, and a relevance score
        (bm25 for FTS queries, confidence for filter-only queries).
    """
    cursor = conn.cursor()

    # Build WHERE clauses for filters; params accumulates placeholder values
    # in the same order the clauses appear in the final SQL.
    where_clauses = []
    params = []

    if model:
        where_clauses.append("c.llm_model = ?")
        params.append(model)

    if tools:
        # Check if any of the specified tools are in the tools_list JSON array
        # (LIKE on the serialized JSON, matching the quoted tool name).
        tool_list = [t.strip() for t in tools.split(",")]
        tool_conditions = []
        for tool in tool_list:
            tool_conditions.append("c.tools_list LIKE ?")
            params.append(f'%"{tool}"%')
        where_clauses.append(f"({' OR '.join(tool_conditions)})")

    if can_orchestrate:
        where_clauses.append("""
            EXISTS (SELECT 1 FROM component_composability cc
                    WHERE cc.component_id = c.id AND cc.relationship = 'can_orchestrate')
        """)

    if component_type:
        where_clauses.append("c.type = ?")
        params.append(component_type)

    # Build the query
    if query:
        # Use FTS search with optional filters
        # Quote the query to handle hyphens (FTS5 interprets - as NOT operator)
        # Use double quotes for phrase matching with special characters
        fts_query = f'"{query}"' if '-' in query or ' ' in query else query

        base_sql = """
            SELECT
                c.id, c.name, c.type, c.description, c.confidence, c.maturity,
                c.llm_model, c.tools_list, c.invocation_method,
                bm25(component_search) as score
            FROM component_search
            JOIN components c ON component_search.id = c.id
            WHERE component_search MATCH ?
        """
        # The MATCH placeholder comes first in the SQL, so its value must be
        # prepended ahead of any filter params already collected.
        params.insert(0, fts_query)

        if where_clauses:
            base_sql += " AND " + " AND ".join(where_clauses)

        # bm25 scores are ascending (lower = better match).
        base_sql += " ORDER BY score LIMIT ?"
        params.append(limit)
    else:
        # Filter-only search (no FTS query); reuse confidence as the score.
        base_sql = """
            SELECT
                c.id, c.name, c.type, c.description, c.confidence, c.maturity,
                c.llm_model, c.tools_list, c.invocation_method,
                c.confidence as score
            FROM components c
        """

        if where_clauses:
            base_sql += " WHERE " + " AND ".join(where_clauses)

        base_sql += " ORDER BY c.confidence DESC, c.name LIMIT ?"
        params.append(limit)

    cursor.execute(base_sql, params)

    results = []
    for row in cursor.fetchall():
        # Parse tools_list JSON; malformed/NULL values degrade to [].
        tools_list = []
        if row[7]:
            try:
                tools_list = json.loads(row[7])
            except (json.JSONDecodeError, TypeError):
                pass

        results.append({
            "id": row[0],
            "name": row[1],
            "type": row[2],
            "description": row[3][:200] + "..." if row[3] and len(row[3]) > 200 else row[3],
            "confidence": row[4],
            "maturity": row[5],
            "llm_model": row[6] or "",
            "tools": tools_list,
            "invocation_method": row[8] or "",
            "score": row[9]
        })

    return results

def search_by_capability(conn: sqlite3.Connection, capability: str, limit: int = 20) -> List[Dict]:
    """Find components whose capability entries contain *capability*.

    Substring match (SQL LIKE) against the capabilities table, ordered by
    descending confidence.
    """
    cur = conn.cursor()
    cur.execute("""
        SELECT DISTINCT
            c.id, c.name, c.type, c.description, c.confidence, c.maturity,
            c.llm_model, c.tools_list
        FROM components c
        JOIN capabilities cap ON c.id = cap.component_id
        WHERE cap.capability LIKE ?
        ORDER BY c.confidence DESC
        LIMIT ?
    """, (f"%{capability}%", limit))

    matches = []
    for comp_id, comp_name, comp_type, desc, confidence, maturity, model, tools_json in cur.fetchall():
        # tools_list is a serialized JSON array; degrade to [] on bad data.
        parsed_tools = []
        if tools_json:
            try:
                parsed_tools = json.loads(tools_json)
            except (json.JSONDecodeError, TypeError):
                pass

        truncate = desc and len(desc) > 200
        matches.append({
            "id": comp_id,
            "name": comp_name,
            "type": comp_type,
            "description": desc[:200] + "..." if truncate else desc,
            "confidence": confidence,
            "maturity": maturity,
            "llm_model": model or "",
            "tools": parsed_tools
        })

    return matches

def get_orchestrators(conn: sqlite3.Connection) -> List[Dict]:
    """Return every component declaring a 'can_orchestrate' relationship.

    Each result aggregates the orchestratable target IDs into one
    comma-separated string ('*' when the aggregate is empty/NULL).
    """
    cur = conn.cursor()
    cur.execute("""
        SELECT DISTINCT
            c.id, c.name, c.type, c.description, c.llm_model,
            GROUP_CONCAT(cc.target_id, ', ') as can_orchestrate
        FROM components c
        JOIN component_composability cc ON c.id = cc.component_id
        WHERE cc.relationship = 'can_orchestrate'
        GROUP BY c.id
        ORDER BY c.name
    """)

    def clip(text):
        # Truncate long descriptions for compact display.
        if text and len(text) > 150:
            return text[:150] + "..."
        return text

    return [
        {
            "id": comp_id,
            "name": comp_name,
            "type": comp_type,
            "description": clip(desc),
            "llm_model": model or "",
            "can_orchestrate": targets or "*"
        }
        for comp_id, comp_name, comp_type, desc, model, targets in cur.fetchall()
    ]

def get_statistics(conn: sqlite3.Connection) -> Dict:
    """Collect summary statistics about the component index.

    Returns counts by type/maturity/documentation quality, child-table
    totals, average confidence, and A2A Protocol coverage metrics.
    """
    cur = conn.cursor()

    def scalar(sql: str):
        # Run a single-value aggregate query.
        cur.execute(sql)
        return cur.fetchone()[0]

    def grouped(sql: str) -> Dict:
        # Run a two-column GROUP BY query and return it as a dict.
        cur.execute(sql)
        return dict(cur.fetchall())

    stats = {
        "by_type": grouped("SELECT type, COUNT(*) FROM components GROUP BY type ORDER BY COUNT(*) DESC"),
        "total_components": scalar("SELECT COUNT(*) FROM components"),
        "total_capabilities": scalar("SELECT COUNT(*) FROM capabilities"),
        "total_relationships": scalar("SELECT COUNT(*) FROM component_relationships"),
        "total_composability": scalar("SELECT COUNT(*) FROM component_composability"),
        "by_maturity": grouped("SELECT maturity, COUNT(*) FROM components GROUP BY maturity"),
        "by_doc_quality": grouped("SELECT documentation_quality, COUNT(*) FROM components GROUP BY documentation_quality"),
        # AVG is NULL on an empty table; coerce to 0 before rounding.
        "avg_confidence": round(scalar("SELECT AVG(confidence) FROM components") or 0, 2),
        # A2A Protocol coverage
        "a2a_with_llm_binding": scalar("SELECT COUNT(*) FROM components WHERE llm_model IS NOT NULL AND llm_model != ''"),
        "a2a_with_tools": scalar("SELECT COUNT(*) FROM components WHERE tools_list IS NOT NULL AND tools_list != '[]'"),
        "a2a_protocol_compliant": scalar("SELECT COUNT(*) FROM components WHERE a2a_schema IS NOT NULL AND a2a_schema != ''"),
    }

    # By LLM model
    cur.execute("""
        SELECT llm_model, COUNT(*) FROM components
        WHERE llm_model IS NOT NULL AND llm_model != ''
        GROUP BY llm_model ORDER BY COUNT(*) DESC
    """)
    stats["by_llm_model"] = dict(cur.fetchall())

    return stats

# =============================================================================
#
# Main Indexing Functions
#
# =============================================================================

def parse_workflow_json(file_path: Path) -> Optional[ComponentRecord]:
    """Parse an n8n workflow JSON file.

    Extracts the workflow name, a best-effort description from node notes,
    capabilities from 'Agent:'/'Purpose:' annotations, and inferred
    actions/domains. Returns None if the file cannot be read or parsed.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (json.JSONDecodeError, IOError) as e:
        print(f" Error reading {file_path}: {e}")
        return None

    # Calculate content hash
    # NOTE(review): the file is read a second time here after json.load above;
    # a read error at this point would raise rather than return None — confirm
    # acceptable.
    content = file_path.read_text(encoding='utf-8')
    content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]

    # Extract workflow name and ID (includes type prefix for uniqueness)
    base_name = file_path.stem.replace(".workflow", "")
    name = data.get("name", base_name)
    component_id = f"workflow-json/{base_name}"

    # Extract description from nodes or name
    description = ""
    nodes = data.get("nodes", [])
    if nodes and len(nodes) > 0:
        # Get trigger node notes or first node notes
        for node in nodes:
            if "trigger" in node.get("name", "").lower() or node.get("type", "").endswith(".webhook"):
                description = node.get("notes", "")
                break
        if not description and nodes[0].get("notes"):
            description = nodes[0].get("notes", "")

    if not description:
        description = f"n8n workflow: {name}"

    # Extract capabilities from nodes
    capabilities = []
    actions = set()
    domains = set()

    for node in nodes:
        node_name = node.get("name", "")
        node_notes = node.get("notes", "")
        _node_type = node.get("type", "")  # Reserved for future use

        # Extract agent references (convention: "Agent: <name>" in node notes)
        if "Agent:" in node_notes:
            agent_match = re.search(r'Agent:\s*(\w+)', node_notes)
            if agent_match:
                capabilities.append(agent_match.group(1))

        # Extract purpose (convention: "Purpose: <text>" in node notes)
        if "Purpose:" in node_notes:
            purpose_match = re.search(r'Purpose:\s*(.+?)(?:\n|$)', node_notes)
            if purpose_match:
                capabilities.append(purpose_match.group(1).strip())

        # Infer actions
        actions.update(extract_actions_from_text(node_name + " " + node_notes))
        domains.update(extract_domains_from_text(node_name + " " + node_notes))

    # Determine category from path
    # NOTE(review): raises ValueError if file_path is outside ROOT_DIR —
    # presumably workflow JSON paths are always under ROOT_DIR; confirm.
    rel_path = file_path.relative_to(ROOT_DIR)
    path_parts = list(rel_path.parts)
    category = path_parts[1] if len(path_parts) > 1 else "workflow"
    subcategory = path_parts[2] if len(path_parts) > 2 else ""

    # Extract keywords
    keywords = extract_keywords_from_description(name + " " + description)

    record = ComponentRecord(
        id=component_id,
        type="workflow-json",
        name=name,
        version="1.0.0",
        status="operational",
        path=str(rel_path),
        category=category,
        subcategory=subcategory,
        description=description[:500] if description else "",
        capabilities_primary=capabilities[:10],
        capabilities_tags=list(domains)[:10],
        capabilities_domains=list(domains),
        capabilities_actions=list(actions),
        triggers_use_when=[],
        triggers_avoid_when=[],
        triggers_keywords=keywords,
        complexity="medium",
        invokes=[],
        invoked_by=[],
        alternatives=[],
        complements=[],
        maturity="production",
        confidence=0.7,
        documentation_quality="complete" if description else "partial",
        content_hash=content_hash,
        indexed_at=datetime.now(timezone.utc).isoformat(),
    )

    return record

def discover_component_files() -> Dict[str, List[Path]]:
    """Discover all component files including nested directories.

    Walks the directories declared in COMPONENT_PATHS, applying
    per-component-type globbing rules, then scans WORKFLOW_JSON_PATHS for
    n8n workflow JSON files. Returns a mapping of component type ->
    list of candidate file paths.
    """
    files = {}

    # Exclusions for all types
    # Note: README.md, CLAUDE.md, INDEX.md, GUIDE.md are now indexed as documents
    EXCLUDE_FILES = {"__init__.py"}
    # Note: submodules/ and external/ are excluded by not being in COMPONENT_PATHS
    EXCLUDE_DIRS = {"__pycache__", ".git", "node_modules", "venv", ".venv"}

    for comp_type, path_config in COMPONENT_PATHS.items():
        # Normalize to list (supports both single path and list of paths)
        base_paths = path_config if isinstance(path_config, list) else [path_config]

        found = []

        for base_path in base_paths:
            if not base_path.exists():
                continue

            if comp_type == "skill":
                # Skills are in subdirectories: skills/*/SKILL.md
                found.extend(base_path.glob("*/SKILL.md"))

            elif comp_type == "workflow":
                # Workflows: recursive markdown files
                found.extend([f for f in base_path.rglob("*.md")
                              if not any(p in EXCLUDE_DIRS for p in f.parts)])

            elif comp_type == "script":
                # Scripts: recursive .py and .sh files
                for pattern in ["**/*.py", "**/*.sh"]:
                    for f in base_path.glob(pattern):
                        # Skip excluded directories
                        if any(p in EXCLUDE_DIRS for p in f.parts):
                            continue
                        # Skip test files (they're not components)
                        # NOTE(review): "/tests/" won't match on Windows paths
                        # (backslash separators) — confirm POSIX-only usage.
                        if "/tests/" in str(f) or f.name.startswith("test_"):
                            continue
                        found.append(f)

            elif comp_type == "hook":
                # Hooks: .md, .sh, and .py files (all are valid hook formats)
                for pattern in ["*.md", "*.sh", "*.py"]:
                    found.extend(base_path.glob(pattern))

            elif comp_type == "document":
                # Documents: recursive markdown files, excluding workflows (indexed separately)
                for f in base_path.rglob("*.md"):
                    # Skip excluded directories
                    if any(p in EXCLUDE_DIRS for p in f.parts):
                        continue
                    # Skip workflow directories (already indexed as type: workflow)
                    if "/workflows/" in str(f):
                        continue
                    found.append(f)

            else:
                # Agents, commands: markdown files at top level
                found.extend(base_path.glob("*.md"))

        # Filter out excluded files
        found = [f for f in found if f.name not in EXCLUDE_FILES]

        files[comp_type] = found

    # Discover workflow JSON files (n8n workflows)
    workflow_jsons = []
    for wf_path in WORKFLOW_JSON_PATHS:
        if wf_path.exists():
            # Recursively find all .workflow.json and .json files
            workflow_jsons.extend(wf_path.rglob("*.workflow.json"))
            # Also get regular JSON files in workflow directories (but not index files)
            for json_file in wf_path.rglob("*.json"):
                if json_file.name not in ["workflow-index.json", "package.json"] and ".workflow.json" not in json_file.name:
                    # Check if it looks like a workflow (has "nodes" key)
                    try:
                        with open(json_file, 'r') as f:
                            data = json.load(f)
                        if "nodes" in data:
                            workflow_jsons.append(json_file)
                    except (json.JSONDecodeError, IOError, OSError):
                        pass

    files["workflow-json"] = workflow_jsons

    return files

def index_all_components(conn: sqlite3.Connection, incremental: bool = False, quiet: bool = False) -> Dict:
    """Index all components.

    Discovers and parses every component file, upserts records into the
    database (skipping unchanged files in incremental mode), then indexes
    user components (ADR-180) and product components, populates the
    document taxonomy (J.20), and rebuilds the FTS index.

    Returns a stats dict with indexed/skipped/error counts, per-type counts,
    and frontmatter totals.
    """
    stats = {"indexed": 0, "skipped": 0, "errors": 0, "by_type": {}}

    def log(msg):
        # Honor --quiet by suppressing all progress output.
        if not quiet:
            print(msg)

    # Load taxonomy configuration (J.20)
    load_taxonomy_config()
    log("Loaded document taxonomy configuration")

    files = discover_component_files()

    # Get existing hashes for incremental mode
    existing_hashes = {}
    if incremental:
        cursor = conn.cursor()
        cursor.execute("SELECT id, content_hash FROM components")
        existing_hashes = dict(cursor.fetchall())

    for comp_type, file_list in files.items():
        type_count = 0
        log(f"\nIndexing {comp_type}s ({len(file_list)} files)...")

        for file_path in file_list:
            # Use appropriate parser based on type
            if comp_type == "workflow-json":
                record = parse_workflow_json(file_path)
            else:
                record = parse_component_file(file_path, comp_type)

            if record is None:
                stats["errors"] += 1
                continue

            # Check if unchanged in incremental mode (same content hash)
            if incremental and record.id in existing_hashes:
                if existing_hashes[record.id] == record.content_hash:
                    stats["skipped"] += 1
                    continue

            insert_component(conn, record)
            stats["indexed"] += 1
            type_count += 1

            # Progress indicator
            if stats["indexed"] % 50 == 0:
                log(f" Indexed {stats['indexed']} components...")

        stats["by_type"][comp_type] = type_count

    # Index user components (ADR-180: component_origin = 'user')
    user_components_dir = _resolve_user_components_dir()
    if user_components_dir and user_components_dir.exists():
        user_type_map = {
            'agents': 'agent',
            'commands': 'command',
            'skills': 'skill',
            'hooks': 'hook',
        }
        user_count = 0
        for dir_name, comp_type in user_type_map.items():
            comp_dir = user_components_dir / dir_name
            if not comp_dir.exists():
                continue

            # NOTE(review): hooks here glob only *.md/*.py, while
            # discover_component_files also accepts *.sh hooks — confirm
            # this difference is intentional for user components.
            if comp_type == 'skill':
                user_files = list(comp_dir.glob("*/SKILL.md"))
            elif comp_type == 'hook':
                user_files = list(comp_dir.glob("*.md")) + list(comp_dir.glob("*.py"))
            else:
                user_files = list(comp_dir.glob("*.md"))

            for file_path in user_files:
                record = parse_component_file(file_path, comp_type)
                if record:
                    record.component_origin = 'user'
                    insert_component(conn, record)
                    user_count += 1

        if user_count > 0:
            log(f"\nIndexed {user_count} user component(s)")
            stats["by_type"]["user"] = user_count
            stats["indexed"] += user_count

    # Index product components (ADR-180: component_origin = 'product')
    products_dir = _resolve_products_dir()
    if products_dir and products_dir.exists():
        product_count = 0
        for product_path in products_dir.iterdir():
            if not product_path.is_dir():
                continue
            for dir_name, comp_type in [('agents', 'agent'), ('commands', 'command'),
                                        ('skills', 'skill'), ('hooks', 'hook')]:
                comp_dir = product_path / dir_name
                if not comp_dir.exists():
                    continue

                if comp_type == 'skill':
                    prod_files = list(comp_dir.glob("*/SKILL.md"))
                elif comp_type == 'hook':
                    prod_files = list(comp_dir.glob("*.md")) + list(comp_dir.glob("*.py"))
                else:
                    prod_files = list(comp_dir.glob("*.md"))

                for file_path in prod_files:
                    record = parse_component_file(file_path, comp_type)
                    if record:
                        record.component_origin = 'product'
                        insert_component(conn, record)
                        product_count += 1

        if product_count > 0:
            log(f"\nIndexed {product_count} product component(s)")
            stats["by_type"]["product"] = product_count
            stats["indexed"] += product_count

    # Populate document taxonomy table (J.20)
    log("\nPopulating document taxonomy...")
    populate_document_taxonomy(conn, project_id="default")

    # Update FTS index (now includes frontmatter data)
    log("\nUpdating full-text search index...")
    update_fts_index(conn)

    # Count frontmatter entries
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM document_frontmatter")
    fm_count = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(DISTINCT component_id) FROM document_frontmatter")
    fm_docs = cursor.fetchone()[0]
    stats["frontmatter_entries"] = fm_count
    stats["documents_with_frontmatter"] = fm_docs

    return stats

def _resolve_user_components_dir() -> Optional[Path]: """Resolve user-components directory (ADR-180).

Priority:
1. user-components-config.json (set during install, may point to custom path)
2. paths.py data dir / user-components
3. Common locations fallback
"""
import json as _json

# Priority 1: Config file from install script (ADR-180 D4/D8)
home = Path.home()
for projects_dir in [home / "PROJECTS", home / "projects"]:
config_path = projects_dir / ".coditect-data" / "user-components-config.json"
if config_path.exists():
try:
with open(config_path) as f:
config = _json.load(f)
configured_path = Path(config.get('path', '')).expanduser().resolve()
if configured_path.exists():
return configured_path
except (ValueError, KeyError, OSError):
pass

# Priority 2: Try paths.py
try:
from core.paths import get_coditect_data_dir
data_dir = get_coditect_data_dir()
if data_dir:
return Path(data_dir) / "user-components"
except ImportError:
pass

# Priority 3: Fallback to common locations
for projects_dir in [home / "PROJECTS", home / "projects"]:
uc_dir = projects_dir / ".coditect-data" / "user-components"
if uc_dir.exists():
return uc_dir

return None

def _resolve_products_dir() -> Optional[Path]: """Resolve installed products directory (ADR-180).""" import platform as plat home = Path.home()

if plat.system() == "Darwin":
products = home / "Library" / "Application Support" / "CODITECT" / "products"
elif plat.system() == "Windows":
localappdata = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
products = localappdata / "CODITECT" / "products"
else:
xdg = Path(os.environ.get("XDG_DATA_HOME", home / ".local" / "share"))
products = xdg / "coditect" / "products"

return products if products.exists() else None

# =============================================================================
#
# CLI
#
# =============================================================================

def main():
    """CLI entry point.

    Parses command-line arguments and dispatches to exactly one mode:
    frontmatter stats, index stats, orchestrator listing, capability search,
    version queries (H.24), staleness reports, filtered search, or (default)
    a full/incremental index rebuild.
    """
    parser = argparse.ArgumentParser(
        description="Index CODITECT components for self-awareness search"
    )
    parser.add_argument(
        "--incremental", "-i",
        action="store_true",
        help="Only index new or changed components"
    )
    parser.add_argument(
        "--stats", "-s",
        action="store_true",
        help="Show index statistics"
    )
    parser.add_argument(
        "--search", "-q",
        type=str,
        help="Search query to test"
    )
    parser.add_argument(
        "--db",
        type=str,
        default=str(DB_PATH),
        help=f"Database path (default: {DB_PATH})"
    )
    parser.add_argument(
        "--quiet", "-Q",
        action="store_true",
        help="Suppress output (for use in hooks)"
    )

    # A2A-specific search filters
    parser.add_argument(
        "--model", "-m",
        type=str,
        choices=["sonnet", "opus", "haiku"],
        help="Filter by LLM model preference"
    )
    parser.add_argument(
        "--tools", "-t",
        type=str,
        help="Filter by tools (comma-separated, e.g., 'Read,Write,Bash')"
    )
    parser.add_argument(
        "--type",
        type=str,
        choices=["agent", "command", "skill", "script", "hook", "workflow", "document"],
        help="Filter by component type"
    )
    parser.add_argument(
        "--orchestrators",
        action="store_true",
        help="List all orchestrator agents (can_orchestrate others)"
    )
    parser.add_argument(
        "--capability",
        type=str,
        help="Search by specific capability"
    )

    # ADR-159: Project scoping
    parser.add_argument(
        "--project",
        type=str,
        default=None,
        help="Scope indexing/search to project components (ADR-159)"
    )

    # J.20: Document Taxonomy
    parser.add_argument(
        "--submodule-path",
        type=str,
        default=None,
        help="Path to a submodule directory to index documents from (J.20)"
    )
    parser.add_argument(
        "--taxonomy",
        type=str,
        default=None,
        help="Path to document-taxonomy.json config (default: config/document-taxonomy.json)"
    )
    parser.add_argument(
        "--frontmatter-stats",
        action="store_true",
        help="Show document frontmatter indexing statistics (J.20)"
    )

    # H.24: Version Observability
    parser.add_argument(
        # nargs="?" + const=90 means: flag absent -> None, bare "--stale"
        # -> 90 days, "--stale N" -> N days.
        "--stale",
        type=int,
        nargs="?",
        const=90,
        metavar="DAYS",
        help="List components not reviewed within N days (default: 90)"
    )
    parser.add_argument(
        "--outdated",
        action="store_true",
        help="List components where updated > last_reviewed (changed but not verified)"
    )
    parser.add_argument(
        "--version-query",
        type=str,
        metavar="COMPONENT",
        help="Query version, updated, and last_reviewed for a specific component"
    )

    args = parser.parse_args()
    db_path = Path(args.db)
    quiet = args.quiet

    def log(msg):
        """Print unless quiet mode is enabled"""
        if not quiet:
            print(msg)

    log("=" * 60)
    log("CODITECT Component Indexer")
    log("=" * 60)
    log(f"Database: {db_path}")

    # Load taxonomy config if specified (J.20)
    if args.taxonomy:
        load_taxonomy_config(Path(args.taxonomy))

    # Initialize database (creates schema if missing; see init_database)
    conn = init_database(db_path)

    if args.frontmatter_stats:
        # J.20: Show frontmatter indexing statistics.
        # NOTE: report modes below use print() directly (not log()) so the
        # requested output is shown even when --quiet is set.
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM document_frontmatter")
        total_entries = cursor.fetchone()[0]
        cursor.execute("SELECT COUNT(DISTINCT component_id) FROM document_frontmatter")
        total_docs = cursor.fetchone()[0]
        cursor.execute("SELECT key, COUNT(*) as cnt FROM document_frontmatter GROUP BY key ORDER BY cnt DESC LIMIT 25")
        key_counts = cursor.fetchall()
        cursor.execute("SELECT COUNT(*) FROM document_taxonomy")
        taxonomy_count = cursor.fetchone()[0]

        print("\nDocument Frontmatter Statistics (J.20):")
        print("-" * 50)
        print(f"Total frontmatter entries: {total_entries}")
        print(f"Documents with frontmatter: {total_docs}")
        print(f"Taxonomy categories loaded: {taxonomy_count}")
        print("\nTop frontmatter keys:")
        for key, cnt in key_counts:
            print(f"  {key}: {cnt}")

        # Category distribution for documents
        cursor.execute("""
            SELECT category, COUNT(*) as cnt
            FROM components WHERE type = 'document'
            GROUP BY category ORDER BY cnt DESC
        """)
        cat_counts = cursor.fetchall()
        print("\nDocument category distribution:")
        for cat, cnt in cat_counts:
            print(f"  {cat}: {cnt}")

        # Early return: this mode only reports, never indexes.
        conn.close()
        return

    elif args.stats:
        # Summarize the existing index without touching it.
        print("\nIndex Statistics:")
        print("-" * 40)
        stats = get_statistics(conn)
        print(f"Total components: {stats['total_components']}")
        print(f"Total capabilities: {stats['total_capabilities']}")
        print(f"Total relationships: {stats['total_relationships']}")
        print(f"Average confidence: {stats['avg_confidence']}")
        print("\nBy type:")
        for t, c in stats["by_type"].items():
            print(f"  {t}: {c}")
        print("\nBy maturity:")
        for m, c in stats["by_maturity"].items():
            print(f"  {m}: {c}")
        print("\nBy documentation quality:")
        for q, c in stats["by_doc_quality"].items():
            print(f"  {q}: {c}")
        print("\nA2A Protocol Statistics:")
        print(f"  With LLM binding: {stats.get('a2a_with_llm_binding', 0)}")
        print(f"  With tools defined: {stats.get('a2a_with_tools', 0)}")
        print(f"  A2A protocol compliant: {stats.get('a2a_protocol_compliant', 0)}")
        print(f"  Composability relations: {stats.get('total_composability', 0)}")
        if stats.get('by_llm_model'):
            print("\nBy LLM model:")
            for model, count in stats["by_llm_model"].items():
                print(f"  {model}: {count}")

    elif args.orchestrators:
        # List all orchestrator components
        print("\nOrchestrator Components:")
        print("-" * 40)
        results = get_orchestrators(conn)
        if results:
            for r in results:
                print(f"\n[{r['type']}] {r['name']} ({r['id']})")
                if r['llm_model']:
                    print(f"  Model: {r['llm_model']}")
                print(f"  Can orchestrate: {r['can_orchestrate']}")
                if r['description']:
                    print(f"  {r['description']}")
            print(f"\nTotal: {len(results)} orchestrators")
        else:
            print("No orchestrators found.")

    elif args.capability:
        # Search by capability
        print(f"\nSearching by capability: {args.capability}")
        print("-" * 40)
        results = search_by_capability(conn, args.capability)
        if results:
            for r in results:
                print(f"\n[{r['type']}] {r['name']} ({r['id']})")
                print(f"  Confidence: {r['confidence']:.2f} | Maturity: {r['maturity']}")
                if r['llm_model']:
                    print(f"  Model: {r['llm_model']}")
                if r['tools']:
                    # Truncate to the first 5 tools for display
                    print(f"  Tools: {', '.join(r['tools'][:5])}")
                if r['description']:
                    print(f"  {r['description']}")
            print(f"\nTotal: {len(results)} results")
        else:
            print("No results found.")

    elif args.version_query:
        # H.24: Query version info for a specific component.
        # Substring match on both name and id via LIKE.
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, type, name, version, content_updated, last_reviewed, status, path
            FROM components
            WHERE name LIKE ? OR id LIKE ?
            ORDER BY type, name
        """, (f"%{args.version_query}%", f"%{args.version_query}%"))
        rows = cursor.fetchall()
        if rows:
            print(f"\nVersion info for '{args.version_query}':")
            print("-" * 70)
            for row in rows:
                cid, ctype, cname, ver, updated, reviewed, status, path = row
                print(f"\n  [{ctype}] {cname} ({cid})")
                print(f"    Version:       {ver or 'unknown'}")
                print(f"    Updated:       {updated or 'not tracked'}")
                print(f"    Last Reviewed: {reviewed or 'not tracked'}")
                print(f"    Status:        {status}")
                print(f"    Path:          {path}")
            print(f"\nTotal: {len(rows)} components")
        else:
            print(f"No components matching '{args.version_query}'")

    elif args.stale is not None:
        # H.24: List components not reviewed within N days.
        # String comparison works because dates are stored as YYYY-MM-DD.
        from datetime import timedelta
        cutoff = (datetime.now(timezone.utc) - timedelta(days=args.stale)).strftime("%Y-%m-%d")
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, type, name, version, content_updated, last_reviewed, path
            FROM components
            WHERE last_reviewed IS NULL OR last_reviewed < ? OR last_reviewed = ''
            ORDER BY last_reviewed ASC, type, name
        """, (cutoff,))
        rows = cursor.fetchall()
        print(f"\nStale components (not reviewed in {args.stale} days, cutoff: {cutoff}):")
        print("-" * 70)
        if rows:
            # Split totals into never-reviewed vs. reviewed-too-long-ago
            no_review = sum(1 for r in rows if not r[5])
            old_review = len(rows) - no_review
            for row in rows:
                cid, ctype, cname, ver, updated, reviewed, path = row
                review_status = reviewed or "NEVER"
                print(f"  [{ctype:8s}] {cname:40s} v{ver or '?':8s} reviewed: {review_status}")
            print(f"\nTotal: {len(rows)} stale ({no_review} never reviewed, {old_review} reviewed before {cutoff})")
        else:
            print("All components are up-to-date!")

    elif args.outdated:
        # H.24: List components where updated > last_reviewed
        # (content changed after its last verification).
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, type, name, version, content_updated, last_reviewed, path
            FROM components
            WHERE content_updated IS NOT NULL AND content_updated != ''
              AND last_reviewed IS NOT NULL AND last_reviewed != ''
              AND content_updated > last_reviewed
            ORDER BY content_updated DESC, type, name
        """)
        rows = cursor.fetchall()
        print(f"\nOutdated components (updated after last review):")
        print("-" * 70)
        if rows:
            for row in rows:
                cid, ctype, cname, ver, updated, reviewed, path = row
                print(f"  [{ctype:8s}] {cname:40s} updated: {updated}  reviewed: {reviewed}")
            print(f"\nTotal: {len(rows)} components need review")
        else:
            print("No components are outdated — all reviewed after last update!")

    elif args.search or args.model or args.tools or args.type:
        # A2A-aware search with filters (any combination of query/model/
        # tools/type triggers this mode).
        filters = []
        if args.search:
            filters.append(f"query='{args.search}'")
        if args.model:
            filters.append(f"model={args.model}")
        if args.tools:
            filters.append(f"tools={args.tools}")
        if args.type:
            filters.append(f"type={args.type}")

        print(f"\nSearch: {' | '.join(filters)}")
        print("-" * 40)

        results = search_components(
            conn,
            query=args.search or "",
            model=args.model or "",
            tools=args.tools or "",
            component_type=args.type or ""
        )

        if results:
            for r in results:
                print(f"\n[{r['type']}] {r['name']} ({r['id']})")
                meta_parts = [f"Confidence: {r['confidence']:.2f}"]
                if r['llm_model']:
                    meta_parts.append(f"Model: {r['llm_model']}")
                if r['invocation_method']:
                    meta_parts.append(f"Invoke: {r['invocation_method']}")
                print(f"  {' | '.join(meta_parts)}")
                if r['tools']:
                    # Show at most 5 tools, with a "+N more" suffix
                    tools_display = r['tools'][:5]
                    more = f" +{len(r['tools']) - 5} more" if len(r['tools']) > 5 else ""
                    print(f"  Tools: {', '.join(tools_display)}{more}")
                if r['description']:
                    print(f"  {r['description']}")
            print(f"\nTotal: {len(results)} results")
        else:
            print("No results found.")

    else:
        # Default mode: (re)build the component index.
        log(f"\nMode: {'Incremental' if args.incremental else 'Full rebuild'}")
        log("-" * 40)

        stats = index_all_components(conn, incremental=args.incremental, quiet=quiet)

        # Index submodule documents if --submodule-path specified (J.20)
        if args.submodule_path:
            submodule_dir = Path(args.submodule_path).resolve()
            if submodule_dir.exists():
                # Project scoping defaults to the submodule directory name
                project_id = args.project or submodule_dir.name
                log(f"\nIndexing submodule documents: {submodule_dir} (project={project_id})")
                sub_count = 0
                sub_errors = 0

                # Walk submodule for markdown files in standard doc directories
                doc_dirs = ["docs", "internal", "research", "config", "prompts"]
                for doc_dir_name in doc_dirs:
                    doc_dir = submodule_dir / doc_dir_name
                    if not doc_dir.exists():
                        continue
                    for md_file in doc_dir.rglob("*.md"):
                        # Skip symlinks to avoid duplicate/cyclic indexing
                        if md_file.is_symlink():
                            continue
                        record = parse_component_file(md_file, "document")
                        if record:
                            record.project_id = project_id
                            # Use submodule-relative path for the ID
                            try:
                                rel = md_file.relative_to(submodule_dir)
                            except ValueError:
                                rel = md_file
                            record.id = f"document/{project_id}/{rel}"
                            record.path = str(rel)
                            insert_component(conn, record)
                            sub_count += 1
                        else:
                            sub_errors += 1

                log(f"  Indexed {sub_count} submodule documents ({sub_errors} errors)")
                stats["indexed"] += sub_count
                stats["errors"] += sub_errors
                stats["by_type"][f"document:{project_id}"] = sub_count

                # Populate taxonomy for this project
                populate_document_taxonomy(conn, project_id=project_id)

                # Rebuild FTS to include new documents
                log("  Rebuilding FTS index...")
                update_fts_index(conn)
            else:
                log(f"  WARNING: Submodule path not found: {submodule_dir}")

        log("\n" + "=" * 60)
        log("Indexing Complete")
        log("=" * 60)
        log(f"Indexed: {stats['indexed']}")
        log(f"Skipped (unchanged): {stats['skipped']}")
        log(f"Errors: {stats['errors']}")
        log("\nBy type:")
        for t, c in stats["by_type"].items():
            log(f"  {t}: {c}")

        # Show final stats
        log("\nFinal index statistics:")
        final_stats = get_statistics(conn)
        log(f"  Total components: {final_stats['total_components']}")
        log(f"  Total capabilities: {final_stats['total_capabilities']}")
        log(f"  Total relationships: {final_stats['total_relationships']}")
        log(f"  A2A with LLM binding: {final_stats.get('a2a_with_llm_binding', 0)}")
        log(f"  A2A protocol compliant: {final_stats.get('a2a_protocol_compliant', 0)}")

        # J.20: Document frontmatter stats
        log(f"\nDocument taxonomy (J.20):")
        log(f"  Frontmatter entries: {stats.get('frontmatter_entries', 0)}")
        log(f"  Documents with frontmatter: {stats.get('documents_with_frontmatter', 0)}")

    conn.close()
    log(f"\nDatabase saved to: {db_path}")

# Script entry guard. Restored the dunder form: the garbled
# `if name == "main"` raised NameError ('name' is undefined) and
# never invoked main().
if __name__ == "__main__":
    main()