#!/usr/bin/env python3
"""
---
title: Component Database CLI
component_type: script
version: 1.0.0
created: '2026-01-22'
updated: '2026-01-22'
status: active
tags:
  - database
  - component
  - cli
  - search
  - index
summary: Unified CLI for component database operations including search, index, validate, and export
language: python
executable: true
usage: python3 scripts/component-db-cli.py <command> [options]
python_version: '3.10+'
dependencies: []
---

Component Database CLI

Unified command-line interface for CODITECT component database operations.

Usage:
    python3 scripts/component-db-cli.py search <query>
"""
import argparse
import hashlib
import json
import sqlite3
import sys
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# =============================================================================
# Configuration (ADR-114 Path Discovery)
# =============================================================================

# Must be Path(__file__): bare `file` is not defined and raises NameError.
SCRIPT_DIR = Path(__file__).parent
ROOT_DIR = SCRIPT_DIR.parent

# Add the repo root to sys.path so `scripts.core.*` imports resolve.
if str(ROOT_DIR) not in sys.path:
    sys.path.insert(0, str(ROOT_DIR))

# ADR-114: Use centralized path discovery; fall back to well-known locations
# when scripts.core.paths is unavailable (e.g. running outside the repo).
try:
    from scripts.core.paths import get_context_storage_dir, get_platform_db_path, PLATFORM_DB
    CONTEXT_STORAGE = get_context_storage_dir()
    DB_PATH = PLATFORM_DB  # ADR-118: Component data goes to platform.db (Tier 1)
except ImportError:
    _new_location = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    CONTEXT_STORAGE = (
        _new_location
        if _new_location.exists()
        else Path.home() / ".coditect" / "context-storage"
    )
    DB_PATH = CONTEXT_STORAGE / "platform.db"  # ADR-118: Tier 1
# =============================================================================
# Database Connection
# =============================================================================

@contextmanager
def get_db_connection(readonly: bool = False):
    """Yield a sqlite3 connection to the component database.

    Args:
        readonly: Open the database read-only (URI ``mode=ro``).

    Exits the process with status 1 when the database file is missing.
    The connection is always closed when the context exits.
    """
    if not DB_PATH.exists():
        print(f"Error: Database not found at {DB_PATH}")
        print("Run 'python3 scripts/component-indexer.py' to create it.")
        sys.exit(1)

    uri = f"file:{DB_PATH}{'?mode=ro' if readonly else ''}"
    conn = sqlite3.connect(uri, uri=True, timeout=30)
    conn.row_factory = sqlite3.Row
    if not readonly:
        # Switching journal mode is a write operation; issuing it on a
        # read-only connection raises OperationalError unless the database
        # is already in WAL mode, so only set it when writable.
        conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    try:
        yield conn
    finally:
        conn.close()
# =============================================================================
# Search Command
# =============================================================================

def cmd_search(args):
    """Search components via the FTS5 index, with optional filters."""
    query = args.query

    with get_db_connection(readonly=True) as conn:
        if args.semantic:
            # No embedding backend yet; FTS is the fallback.
            print("Note: Semantic search requires embeddings. Using FTS.")

        # Double up quotes so the user query is a valid FTS phrase literal.
        phrase = query.replace('"', '""')

        sql = """
            SELECT c.*, bm25(component_search) as score
            FROM component_search cs
            JOIN components c ON c.rowid = cs.rowid
            WHERE component_search MATCH ?
            """
        params = [f'"{phrase}"']

        # Optional equality filters, appended only when the flag was given.
        for clause, value in (
            ("AND c.type = ?", args.type),
            ("AND c.category = ?", args.category),
            ("AND c.llm_model = ?", args.model),
        ):
            if value:
                sql += "\n" + clause
                params.append(value)

        # bm25 scores are negative; ascending order means best match first.
        sql += "\nORDER BY score LIMIT ?"
        params.append(args.limit)

        try:
            results = [dict(row) for row in conn.execute(sql, params).fetchall()]
        except sqlite3.OperationalError as e:
            if "no such table" in str(e):
                print("Error: FTS index not found. Run 'python3 scripts/component-indexer.py'")
                sys.exit(1)
            raise

        # Emit results in the requested format.
        if args.format == "json":
            print(json.dumps(results, indent=2, default=str))
        elif args.format == "brief":
            for hit in results:
                print(hit['id'])
        else:  # table format
            if not results:
                print("No results found.")
                return
            print(f"\nFound {len(results)} results for '{query}':\n")
            print(f"{'ID':<50} {'Type':<10} {'Score':<8} Description")
            print("-" * 100)
            for hit in results:
                desc = (hit['description'] or '')[:40]
                print(f"{hit['id']:<50} {hit['type']:<10} {hit['score']:.4f} {desc}")
def cmd_search_related(args):
    """Show incoming and outgoing relationships for a component ID."""
    component_id = args.related

    with get_db_connection(readonly=True) as conn:
        # Relationships where this component is the source (it uses others).
        outgoing = [
            dict(row)
            for row in conn.execute(
                """
                SELECT target_id, relationship_type, notes
                FROM component_relationships
                WHERE source_id = ?
                """,
                (component_id,),
            )
        ]
        # Relationships where this component is the target (others use it).
        incoming = [
            dict(row)
            for row in conn.execute(
                """
                SELECT source_id, relationship_type, notes
                FROM component_relationships
                WHERE target_id = ?
                """,
                (component_id,),
            )
        ]

        print(f"\nRelationships for: {component_id}\n")
        if outgoing:
            print("Outgoing (this component uses):")
            for rel in outgoing:
                print(f" → {rel['target_id']} ({rel['relationship_type']})")
        if incoming:
            print("\nIncoming (uses this component):")
            for rel in incoming:
                print(f" ← {rel['source_id']} ({rel['relationship_type']})")
        if not outgoing and not incoming:
            print("No relationships found.")
# =============================================================================
# Index Command
# =============================================================================

def cmd_index(args):
    """Manage the component index; heavy work delegates to component-indexer.py."""
    if args.rebuild_fts:
        rebuild_fts_index()
        return
    if args.verify:
        verify_index_integrity()
        return

    # Full or incremental index - defer to component-indexer.py
    import subprocess

    indexer_cmd = ["python3", str(SCRIPT_DIR / "component-indexer.py")]
    # A full rebuild is the indexer's default; only incremental needs a flag.
    if not args.full:
        indexer_cmd.append("--incremental")
    if args.type:
        # component-indexer has no per-type filter yet; index everything.
        print(f"Note: Type filter not supported in indexer. Indexing all.")
    if args.verbose:
        indexer_cmd.append("--verbose")

    result = subprocess.run(indexer_cmd)
    sys.exit(result.returncode)
def rebuild_fts_index():
    """Issue the FTS5 'rebuild' special command to regenerate the index."""
    print("Rebuilding FTS index...")
    rebuild_sql = "INSERT INTO component_search(component_search) VALUES('rebuild')"
    with get_db_connection() as conn:
        try:
            conn.execute(rebuild_sql)
            conn.commit()
        except sqlite3.OperationalError as e:
            print(f"Error rebuilding FTS index: {e}")
            sys.exit(1)
        print("FTS index rebuilt successfully.")
def verify_index_integrity():
    """Run integrity checks on the database, FTS index, and indexed files."""
    print("Verifying index integrity...")

    with get_db_connection(readonly=True) as conn:
        problems = []

        # 1. SQLite's own structural integrity check.
        status = conn.execute("PRAGMA integrity_check").fetchone()[0]
        if status == "ok":
            print(" ✓ Database integrity OK")
        else:
            problems.append(f"Database integrity: {status}")

        # 2. FTS5 self-check. This is a write command, so a read-only
        # connection may reject it with OperationalError.
        try:
            conn.execute(
                "INSERT INTO component_search(component_search) VALUES('integrity-check')"
            )
            print(" ✓ FTS index integrity OK")
        except sqlite3.IntegrityError as e:
            problems.append(f"FTS integrity: {e}")
        except sqlite3.OperationalError:
            print(" ? FTS integrity check skipped (read-only)")

        # 3. Capability rows pointing at components that no longer exist.
        orphaned = conn.execute(
            """
            SELECT COUNT(*) FROM capabilities
            WHERE component_id NOT IN (SELECT id FROM components)
            """
        ).fetchone()[0]
        if orphaned:
            problems.append(f"Orphaned capability records: {orphaned}")
        else:
            print(" ✓ No orphaned capability records")

        # 4. Spot-check that indexed paths still exist on disk (first 100).
        sample = conn.execute("SELECT id, path FROM components LIMIT 100")
        missing_files = sum(1 for row in sample if not Path(row['path']).exists())
        if missing_files:
            problems.append(f"Missing files (sampled): {missing_files}")
        else:
            print(" ✓ All sampled files exist")

        # Summary: any problem is fatal.
        if problems:
            print("\nErrors found:")
            for problem in problems:
                print(f" ✗ {problem}")
            sys.exit(1)
        print("\nAll integrity checks passed.")
# =============================================================================
# Validate Command
# =============================================================================

def cmd_validate(args):
    """Validate component markdown files.

    Scans args.path (or the repository root) for ``*.md`` files, checks
    YAML frontmatter and filename conventions, and reports issues.
    Exits 1 when errors were found, 2 on warnings in --strict mode.
    """
    import yaml

    path = Path(args.path) if args.path else ROOT_DIR
    errors = []    # (file, line, rule, message) — hard failures
    warnings = []  # should be fixed, but non-fatal
    info = []      # informational only

    # Find component files
    if path.is_file():
        files = [path]
    else:
        files = list(path.rglob("*.md"))
        if args.type:
            # Components live in a directory named after their type.
            type_dirs = {
                "agent": "agents",
                "command": "commands",
                "skill": "skills",
                "script": "scripts",
                "hook": "hooks",
            }
            if args.type in type_dirs:
                files = [f for f in files if type_dirs[args.type] in f.parts]

    for file_path in files:
        # Skip vendored / generated trees.
        if any(skip in str(file_path) for skip in [
            'node_modules', '.git', 'venv', '__pycache__'
        ]):
            continue

        try:
            # Pin the encoding: component files are UTF-8 regardless of locale.
            content = file_path.read_text(encoding="utf-8")
        except Exception as e:
            errors.append((file_path, 1, "read_error", str(e)))
            continue

        # Files without frontmatter are not component files; skip silently.
        if not content.startswith("---"):
            continue

        try:
            parts = content.split("---", 2)
            frontmatter = yaml.safe_load(parts[1]) if len(parts) >= 3 else {}
        except yaml.YAMLError as e:
            errors.append((file_path, 1, "yaml_error", str(e)))
            continue

        # Bug fix: safe_load returns None for empty frontmatter and may
        # return a non-dict (e.g. a bare string); normalize so .get() below
        # cannot raise AttributeError.
        if not isinstance(frontmatter, dict):
            frontmatter = {}

        # Validate frontmatter
        if not frontmatter.get("title"):
            warnings.append((file_path, 1, "title_missing", "Missing title field"))
        if not frontmatter.get("component_type"):
            info.append((file_path, 1, "component_type_missing", "Missing component_type"))

        # Filename conventions
        if " " in file_path.name:
            errors.append((file_path, 1, "filename_space", "Filename contains spaces"))
        if file_path.name != file_path.name.lower():
            warnings.append((file_path, 1, "filename_case", "Filename should be lowercase"))

    # Output results
    total = len(files)
    if args.format == "json":
        result = {
            "total": total,
            "errors": len(errors),
            "warnings": len(warnings),
            "info": len(info),
            "issues": [
                {"file": str(f), "line": l, "rule": r, "message": m, "severity": "error"}
                for f, l, r, m in errors
            ] + [
                {"file": str(f), "line": l, "rule": r, "message": m, "severity": "warning"}
                for f, l, r, m in warnings
            ]
        }
        print(json.dumps(result, indent=2))
    else:
        print(f"\nComponent Validation Report")
        print("=" * 50)
        print(f"Scanned: {total} files\n")
        if errors:
            print(f"ERRORS ({len(errors)}):")
            for f, l, r, m in errors[:20]:
                print(f" {f}:{l}")
                print(f" ✗ {r}: {m}")
        if warnings and args.verbose:
            print(f"\nWARNINGS ({len(warnings)}):")
            for f, l, r, m in warnings[:20]:
                print(f" {f}:{l}")
                print(f" ⚠ {r}: {m}")
        print(f"\nSummary:")
        print(f" Errors: {len(errors)}")
        print(f" Warnings: {len(warnings)}")
        print(f" Info: {len(info)}")
        if errors:
            print(f"\nStatus: FAILED")
            sys.exit(1)
        elif warnings and args.strict:
            print(f"\nStatus: FAILED (strict mode)")
            sys.exit(2)
        else:
            print(f"\nStatus: PASSED")
# =============================================================================
# Export Command
# =============================================================================

def cmd_export(args):
    """Export component data as JSON, JSONL, or CSV, to stdout or a file."""
    with get_db_connection(readonly=True) as conn:
        sql = "SELECT * FROM components"
        params = []
        if args.type:
            sql += " WHERE type = ?"
            params.append(args.type)
        sql += " ORDER BY type, name"

        components = [dict(row) for row in conn.execute(sql, params).fetchall()]

        # Optionally attach per-component usage stats.
        if args.full or args.include_stats:
            for comp in components:
                stats_row = conn.execute(
                    """
                    SELECT * FROM component_usage_stats WHERE component_id = ?
                    """,
                    (comp['id'],),
                ).fetchone()
                comp['usage_stats'] = dict(stats_row) if stats_row else None

        # Optionally attach outgoing relationships.
        if args.full or args.include_relationships:
            for comp in components:
                rel_cursor = conn.execute(
                    """
                    SELECT target_id, relationship_type
                    FROM component_relationships WHERE source_id = ?
                    """,
                    (comp['id'],),
                )
                comp['relationships'] = [dict(r) for r in rel_cursor.fetchall()]

        # Envelope with export metadata.
        envelope = {
            "version": "1.0.0",
            "exported_at": datetime.now(timezone.utc).isoformat(),
            "total_count": len(components),
            "components": components
        }

        if args.format == "jsonl":
            result = "\n".join(json.dumps(c, default=str) for c in components)
        elif args.format == "csv":
            import csv
            import io
            buf = io.StringIO()
            if components:
                writer = csv.DictWriter(buf, fieldnames=components[0].keys())
                writer.writeheader()
                writer.writerows(components)
            result = buf.getvalue()
        else:
            # "json", "sql" (unimplemented), and anything else fall back to JSON.
            result = json.dumps(envelope, indent=2, default=str)

        # Write to file (optionally gzipped) or stdout.
        if args.output:
            output_path = Path(args.output)
            if args.compress:
                import gzip
                with gzip.open(output_path, 'wt') as f:
                    f.write(result)
            else:
                output_path.write_text(result)
            print(f"Exported {len(components)} components to {output_path}")
        else:
            print(result)
# =============================================================================
# Stats Command
# =============================================================================

def cmd_stats(args):
    """Show database statistics as text or JSON."""
    with get_db_connection(readonly=True) as conn:
        stats = {}

        # Component counts grouped by type.
        type_rows = conn.execute(
            "SELECT type, COUNT(*) as count FROM components GROUP BY type ORDER BY count DESC"
        )
        stats['by_type'] = {row['type']: row['count'] for row in type_rows}
        stats['total'] = sum(stats['by_type'].values())

        # Components that declare an LLM binding.
        stats['with_llm_binding'] = conn.execute(
            "SELECT COUNT(*) FROM components WHERE llm_model IS NOT NULL AND llm_model != ''"
        ).fetchone()[0]

        # Binding counts grouped by model.
        model_rows = conn.execute("""
            SELECT llm_model, COUNT(*) as count
            FROM components
            WHERE llm_model IS NOT NULL AND llm_model != ''
            GROUP BY llm_model
            ORDER BY count DESC
        """)
        stats['by_model'] = {row['llm_model']: row['count'] for row in model_rows}

        # On-disk size = page_count * page_size.
        stats['db_size_bytes'] = conn.execute(
            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()"
        ).fetchone()[0]

        # Rows currently in the FTS index.
        stats['fts_records'] = conn.execute(
            "SELECT COUNT(*) FROM component_search"
        ).fetchone()[0]

    if args.format == "json":
        print(json.dumps(stats, indent=2))
    else:
        print("\nComponent Database Statistics")
        print("=" * 50)
        print(f"\nTotal Components: {stats['total']}")
        print(f"\nBy Type:")
        for type_name, count in stats['by_type'].items():
            print(f" {type_name:<15} {count:>6}")
        print(f"\nLLM Bindings: {stats['with_llm_binding']}")
        if stats['by_model']:
            print(f"\nBy Model:")
            for model, count in stats['by_model'].items():
                print(f" {model:<15} {count:>6}")
        print(f"\nDatabase Size: {stats['db_size_bytes'] / 1024 / 1024:.2f} MB")
        print(f"FTS Records: {stats['fts_records']}")
# =============================================================================
# Maintenance Command
# =============================================================================

def cmd_maintenance(args):
    """Database maintenance: --vacuum, --analyze, --backup PATH.

    The flags are independent; any combination may run in one invocation.
    """
    if args.vacuum:
        print("Vacuuming database...")
        with get_db_connection() as conn:
            conn.execute("VACUUM")
        print("Vacuum complete.")

    if args.analyze:
        print("Analyzing database...")
        with get_db_connection() as conn:
            conn.execute("ANALYZE")
        print("Analysis complete.")

    if args.backup:
        backup_path = Path(args.backup)
        print(f"Backing up to {backup_path}...")
        with get_db_connection(readonly=True) as conn:
            # Bug fix: sqlite3's connection context manager only commits or
            # rolls back — it does NOT close the connection, so the backup
            # target handle leaked. Close it explicitly.
            backup_conn = sqlite3.connect(backup_path)
            try:
                conn.backup(backup_conn)
            finally:
                backup_conn.close()
        print("Backup complete.")
# =============================================================================
# Main Entry Point
# =============================================================================

def main():
    """Parse CLI arguments and dispatch to the chosen subcommand."""
    parser = argparse.ArgumentParser(
        description="CODITECT Component Database CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # search
    search_parser = subparsers.add_parser("search", help="Search components")
    search_parser.add_argument("query", nargs="?", help="Search query")
    search_parser.add_argument("--type", "-t", help="Filter by type")
    search_parser.add_argument("--category", "-c", help="Filter by category")
    search_parser.add_argument("--model", "-m", help="Filter by LLM model")
    search_parser.add_argument("--capability", help="Filter by capability")
    search_parser.add_argument("--limit", "-l", type=int, default=20, help="Limit results")
    search_parser.add_argument("--format", "-f", choices=["table", "json", "brief"], default="table")
    search_parser.add_argument("--semantic", action="store_true", help="Use semantic search")
    search_parser.add_argument("--related", help="Find related components")
    search_parser.add_argument("--orchestrators", action="store_true", help="List orchestrators")

    # index
    index_parser = subparsers.add_parser("index", help="Manage index")
    index_parser.add_argument("--full", action="store_true", help="Full rebuild")
    index_parser.add_argument("--incremental", "-i", action="store_true", help="Incremental update")
    index_parser.add_argument("--rebuild-fts", action="store_true", help="Rebuild FTS index")
    index_parser.add_argument("--verify", action="store_true", help="Verify integrity")
    index_parser.add_argument("--type", "-t", help="Index only specific type")
    index_parser.add_argument("--verbose", "-v", action="store_true")

    # validate
    validate_parser = subparsers.add_parser("validate", help="Validate components")
    validate_parser.add_argument("path", nargs="?", help="Path to validate")
    validate_parser.add_argument("--type", "-t", help="Validate only specific type")
    validate_parser.add_argument("--strict", action="store_true", help="Fail on warnings")
    validate_parser.add_argument("--fix", action="store_true", help="Auto-fix issues")
    validate_parser.add_argument("--format", "-f", choices=["text", "json"], default="text")
    validate_parser.add_argument("--verbose", "-v", action="store_true")

    # export
    export_parser = subparsers.add_parser("export", help="Export data")
    export_parser.add_argument("--format", "-f", choices=["json", "jsonl", "csv", "sql"], default="json")
    export_parser.add_argument("--type", "-t", help="Export only specific type")
    export_parser.add_argument("--output", "-o", help="Output file")
    export_parser.add_argument("--full", action="store_true", help="Include all data")
    export_parser.add_argument("--include-stats", action="store_true")
    export_parser.add_argument("--include-relationships", action="store_true")
    export_parser.add_argument("--compress", action="store_true", help="Gzip output")

    # stats
    stats_parser = subparsers.add_parser("stats", help="Show statistics")
    stats_parser.add_argument("--format", "-f", choices=["text", "json"], default="text")

    # maintenance
    maint_parser = subparsers.add_parser("maintenance", help="Maintenance tasks")
    maint_parser.add_argument("--vacuum", action="store_true", help="Vacuum database")
    maint_parser.add_argument("--analyze", action="store_true", help="Update statistics")
    maint_parser.add_argument("--backup", help="Backup to file")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    # "search" needs extra routing between related-lookup and query search.
    if args.command == "search":
        if args.related:
            cmd_search_related(args)
        elif args.query:
            cmd_search(args)
        else:
            print("Error: Query required for search")
            sys.exit(1)
        return

    # All other commands dispatch straight to their handler.
    handlers = {
        "index": cmd_index,
        "validate": cmd_validate,
        "export": cmd_export,
        "stats": cmd_stats,
        "maintenance": cmd_maintenance,
    }
    handler = handlers.get(args.command)
    if handler:
        handler(args)
# Bug fix: the guard must compare the __name__ dunder against "__main__";
# bare `name`/"main" is a NameError and the script never runs.
if __name__ == "__main__":
    main()