Skip to main content

scripts-component-db-cli

#!/usr/bin/env python3
"""
---
title: Component Database CLI
component_type: script
version: 1.0.0
created: '2026-01-22'
updated: '2026-01-22'
status: active
tags:
  - database
  - component
  - cli
  - search
  - index
summary: Unified CLI for component database operations including search, index, validate, and export
language: python
executable: true
usage: python3 scripts/component-db-cli.py [options]
python_version: '3.10+'
dependencies: []
---

Component Database CLI

Unified command-line interface for CODITECT component database operations.

Usage:
    python3 scripts/component-db-cli.py search [options]
    python3 scripts/component-db-cli.py index [options]
    python3 scripts/component-db-cli.py validate [path] [options]
    python3 scripts/component-db-cli.py export [options]
    python3 scripts/component-db-cli.py stats [options]
    python3 scripts/component-db-cli.py maintenance [options]
"""

import argparse
import hashlib
import json
import sqlite3
import sys
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

# =============================================================================
# Configuration (ADR-114 Path Discovery)
# =============================================================================

SCRIPT_DIR = Path(__file__).parent
ROOT_DIR = SCRIPT_DIR.parent

# Add repo root to sys.path so `scripts.core.*` imports resolve when this
# file is run directly as a script.
if str(ROOT_DIR) not in sys.path:
    sys.path.insert(0, str(ROOT_DIR))

# ADR-114: Use centralized path discovery; fall back to well-known locations
# when the helper module is unavailable (e.g. standalone execution).
try:
    from scripts.core.paths import get_context_storage_dir, get_platform_db_path, PLATFORM_DB

    CONTEXT_STORAGE = get_context_storage_dir()
    DB_PATH = PLATFORM_DB  # ADR-118: Component data goes to platform.db (Tier 1)
except ImportError:
    _new_location = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    CONTEXT_STORAGE = (
        _new_location
        if _new_location.exists()
        else Path.home() / ".coditect" / "context-storage"
    )
    DB_PATH = CONTEXT_STORAGE / "platform.db"  # ADR-118: Tier 1

# =============================================================================
# Database Connection
# =============================================================================

@contextmanager
def get_db_connection(readonly: bool = False):
    """Yield a sqlite3 connection to the component database.

    Args:
        readonly: Open the database in read-only mode (URI ``?mode=ro``).

    Exits the process with status 1 if the database file does not exist.
    The connection uses ``sqlite3.Row`` so rows support name-based access,
    and is always closed on exit from the ``with`` block.
    """
    if not DB_PATH.exists():
        print(f"Error: Database not found at {DB_PATH}")
        print("Run 'python3 scripts/component-indexer.py' to create it.")
        sys.exit(1)

    uri = f"file:{DB_PATH}{'?mode=ro' if readonly else ''}"
    conn = sqlite3.connect(uri, uri=True, timeout=30)
    conn.row_factory = sqlite3.Row
    if not readonly:
        # Switching journal mode writes to the database file; attempting it
        # on a mode=ro connection raises OperationalError, so skip it there.
        conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    try:
        yield conn
    finally:
        conn.close()

# =============================================================================
# Search Command
# =============================================================================

def cmd_search(args):
    """Search components via the FTS5 index and print results.

    Supports ``--type``/``--category``/``--model`` filters and three output
    formats: table (default), json, and brief (IDs only).
    """
    query = args.query

    with get_db_connection(readonly=True) as conn:
        if args.semantic:
            # Semantic search would use embeddings - for now, fallback to FTS
            print("Note: Semantic search requires embeddings. Using FTS.")

        # Escape special FTS characters: inside a quoted FTS phrase a
        # double-quote must be doubled.
        escaped_query = query.replace('"', '""')

        # Base FTS query; bm25() returns a relevance score (lower = better).
        sql_parts = ["""
            SELECT c.*, bm25(component_search) as score
            FROM component_search cs
            JOIN components c ON c.rowid = cs.rowid
            WHERE component_search MATCH ?
        """]
        params = [f'"{escaped_query}"']

        # Optional filters narrow the FTS hits.
        if args.type:
            sql_parts.append("AND c.type = ?")
            params.append(args.type)

        if args.category:
            sql_parts.append("AND c.category = ?")
            params.append(args.category)

        if args.model:
            sql_parts.append("AND c.llm_model = ?")
            params.append(args.model)

        # Ascending bm25 score puts the best matches first.
        sql_parts.append("ORDER BY score LIMIT ?")
        params.append(args.limit)

        sql = "\n".join(sql_parts)

        try:
            cursor = conn.execute(sql, params)
            results = [dict(row) for row in cursor.fetchall()]
        except sqlite3.OperationalError as e:
            if "no such table" in str(e):
                print("Error: FTS index not found. Run 'python3 scripts/component-indexer.py'")
                sys.exit(1)
            raise

    # Output results
    if args.format == "json":
        print(json.dumps(results, indent=2, default=str))
    elif args.format == "brief":
        for r in results:
            print(r['id'])
    else:  # table format
        if not results:
            print("No results found.")
            return

        print(f"\nFound {len(results)} results for '{query}':\n")
        print(f"{'ID':<50} {'Type':<10} {'Score':<8} Description")
        print("-" * 100)
        for r in results:
            desc = (r['description'] or '')[:40]
            print(f"{r['id']:<50} {r['type']:<10} {r['score']:.4f} {desc}")

def cmd_search_related(args):
    """Print the relationships (incoming and outgoing) of a component ID."""
    component_id = args.related

    with get_db_connection(readonly=True) as conn:
        # Outgoing: components this one uses.
        cursor = conn.execute("""
            SELECT target_id, relationship_type, notes
            FROM component_relationships
            WHERE source_id = ?
        """, (component_id,))
        outgoing = [dict(row) for row in cursor.fetchall()]

        # Incoming: components that use this one.
        cursor = conn.execute("""
            SELECT source_id, relationship_type, notes
            FROM component_relationships
            WHERE target_id = ?
        """, (component_id,))
        incoming = [dict(row) for row in cursor.fetchall()]

    print(f"\nRelationships for: {component_id}\n")

    if outgoing:
        print("Outgoing (this component uses):")
        for r in outgoing:
            print(f" → {r['target_id']} ({r['relationship_type']})")

    if incoming:
        print("\nIncoming (uses this component):")
        for r in incoming:
            print(f" ← {r['source_id']} ({r['relationship_type']})")

    if not outgoing and not incoming:
        print("No relationships found.")

# =============================================================================
# Index Command
# =============================================================================

def cmd_index(args):
    """Manage the component index.

    ``--rebuild-fts`` and ``--verify`` are handled in-process; full or
    incremental indexing is delegated to ``component-indexer.py`` via
    subprocess, and this process exits with the indexer's return code.
    """
    if args.rebuild_fts:
        rebuild_fts_index()
        return

    if args.verify:
        verify_index_integrity()
        return

    # Full or incremental index - defer to component-indexer.py
    import subprocess

    cmd = ["python3", str(SCRIPT_DIR / "component-indexer.py")]

    if args.full:
        # Full rebuild is the indexer's default; no extra flag needed.
        pass
    else:
        cmd.append("--incremental")

    if args.type:
        # Filter by type - component-indexer doesn't support this yet
        print("Note: Type filter not supported in indexer. Indexing all.")

    if args.verbose:
        cmd.append("--verbose")

    result = subprocess.run(cmd)
    sys.exit(result.returncode)

def rebuild_fts_index():
    """Rebuild the FTS5 index via the special 'rebuild' command row.

    Exits with status 1 if the FTS table is missing or the rebuild fails.
    """
    print("Rebuilding FTS index...")
    with get_db_connection() as conn:
        try:
            # FTS5 convention: inserting the value 'rebuild' into the table's
            # own-name column re-derives the index from the content table.
            conn.execute(
                "INSERT INTO component_search(component_search) VALUES('rebuild')"
            )
            conn.commit()
            print("FTS index rebuilt successfully.")
        except sqlite3.OperationalError as e:
            print(f"Error rebuilding FTS index: {e}")
            sys.exit(1)

def verify_index_integrity():
    """Run integrity checks on the database and FTS index.

    Checks: PRAGMA integrity_check, FTS5 'integrity-check' command (skipped on
    a read-only connection, where the command's write attempt fails), orphaned
    capability rows, and existence of up to 100 sampled component files.
    Exits with status 1 if any check fails.
    """
    print("Verifying index integrity...")

    with get_db_connection(readonly=True) as conn:
        errors = []

        # Check database integrity
        cursor = conn.execute("PRAGMA integrity_check")
        result = cursor.fetchone()[0]
        if result != "ok":
            errors.append(f"Database integrity: {result}")
        else:
            print(" ✓ Database integrity OK")

        # Check FTS integrity (FTS5 special command is an INSERT, so it may
        # be refused on a read-only connection).
        try:
            conn.execute(
                "INSERT INTO component_search(component_search) VALUES('integrity-check')"
            )
            print(" ✓ FTS index integrity OK")
        except sqlite3.IntegrityError as e:
            errors.append(f"FTS integrity: {e}")
        except sqlite3.OperationalError:
            # Read-only mode, can't run integrity check
            print(" ? FTS integrity check skipped (read-only)")

        # Check for orphaned records
        cursor = conn.execute("""
            SELECT COUNT(*) FROM capabilities
            WHERE component_id NOT IN (SELECT id FROM components)
        """)
        orphaned = cursor.fetchone()[0]
        if orphaned > 0:
            errors.append(f"Orphaned capability records: {orphaned}")
        else:
            print(" ✓ No orphaned capability records")

        # Check for missing files (sample only, to keep this fast)
        cursor = conn.execute("SELECT id, path FROM components LIMIT 100")
        missing_files = 0
        for row in cursor:
            path = Path(row['path'])
            if not path.exists():
                missing_files += 1

        if missing_files > 0:
            errors.append(f"Missing files (sampled): {missing_files}")
        else:
            print(" ✓ All sampled files exist")

    # Summary
    if errors:
        print("\nErrors found:")
        for e in errors:
            print(f" ✗ {e}")
        sys.exit(1)
    else:
        print("\nAll integrity checks passed.")

# =============================================================================
# Validate Command
# =============================================================================

def cmd_validate(args):
    """Validate component markdown files (frontmatter + filename rules).

    Scans ``args.path`` (or the repo root) for ``*.md`` files with YAML
    frontmatter, collecting (file, line, rule, message) tuples into error /
    warning / info buckets. Exit codes: 1 on errors, 2 on warnings in
    ``--strict`` mode, 0 otherwise.
    """
    import yaml  # local import: PyYAML only needed for this subcommand

    path = Path(args.path) if args.path else ROOT_DIR

    errors = []
    warnings = []
    info = []

    # Find component files
    if path.is_file():
        files = [path]
    else:
        files = list(path.rglob("*.md"))
        if args.type:
            # Filter by the directory each component type lives in.
            type_dirs = {
                "agent": "agents",
                "command": "commands",
                "skill": "skills",
                "script": "scripts",
                "hook": "hooks",
            }
            if args.type in type_dirs:
                files = [f for f in files if type_dirs[args.type] in f.parts]

    for file_path in files:
        # Skip vendored / generated trees
        if any(skip in str(file_path) for skip in [
            'node_modules', '.git', 'venv', '__pycache__'
        ]):
            continue

        try:
            content = file_path.read_text()
        except Exception as e:
            errors.append((file_path, 1, "read_error", str(e)))
            continue

        # Files without a frontmatter fence are not components; skip silently.
        if not content.startswith("---"):
            continue

        try:
            parts = content.split("---", 2)
            if len(parts) >= 3:
                # safe_load returns None for an empty document, and may return
                # a scalar/list for degenerate frontmatter — coerce to a dict
                # so the .get() calls below cannot raise AttributeError.
                frontmatter = yaml.safe_load(parts[1])
                if not isinstance(frontmatter, dict):
                    frontmatter = {}
            else:
                frontmatter = {}
        except yaml.YAMLError as e:
            errors.append((file_path, 1, "yaml_error", str(e)))
            continue

        # Validate frontmatter
        if not frontmatter.get("title"):
            warnings.append((file_path, 1, "title_missing", "Missing title field"))

        if not frontmatter.get("component_type"):
            info.append((file_path, 1, "component_type_missing", "Missing component_type"))

        # Check filename conventions
        if " " in file_path.name:
            errors.append((file_path, 1, "filename_space", "Filename contains spaces"))

        if file_path.name != file_path.name.lower():
            warnings.append((file_path, 1, "filename_case", "Filename should be lowercase"))

    # Output results
    total = len(files)

    if args.format == "json":
        result = {
            "total": total,
            "errors": len(errors),
            "warnings": len(warnings),
            "info": len(info),
            "issues": [
                {"file": str(f), "line": l, "rule": r, "message": m, "severity": "error"}
                for f, l, r, m in errors
            ] + [
                {"file": str(f), "line": l, "rule": r, "message": m, "severity": "warning"}
                for f, l, r, m in warnings
            ]
        }
        print(json.dumps(result, indent=2))
    else:
        print("\nComponent Validation Report")
        print("=" * 50)
        print(f"Scanned: {total} files\n")

        if errors:
            print(f"ERRORS ({len(errors)}):")
            for f, l, r, m in errors[:20]:
                print(f" {f}:{l}")
                print(f" ✗ {r}: {m}")

        if warnings and args.verbose:
            print(f"\nWARNINGS ({len(warnings)}):")
            for f, l, r, m in warnings[:20]:
                print(f" {f}:{l}")
                print(f" ⚠ {r}: {m}")

        print("\nSummary:")
        print(f" Errors: {len(errors)}")
        print(f" Warnings: {len(warnings)}")
        print(f" Info: {len(info)}")

    if errors:
        print("\nStatus: FAILED")
        sys.exit(1)
    elif warnings and args.strict:
        print("\nStatus: FAILED (strict mode)")
        sys.exit(2)
    else:
        print("\nStatus: PASSED")

# =============================================================================
# Export Command
# =============================================================================

def cmd_export(args):
    """Export component data as json, jsonl, or csv.

    ``--full`` (or the individual ``--include-*`` flags) attaches usage stats
    and outgoing relationships to each component. Output goes to ``--output``
    (optionally gzip-compressed with ``--compress``) or stdout.
    NOTE: the ``sql`` format choice is accepted by the parser but not yet
    implemented; it falls through to the json branch below.
    """
    with get_db_connection(readonly=True) as conn:
        # Build query
        sql = "SELECT * FROM components"
        params = []

        if args.type:
            sql += " WHERE type = ?"
            params.append(args.type)

        sql += " ORDER BY type, name"

        cursor = conn.execute(sql, params)
        components = [dict(row) for row in cursor.fetchall()]

        # Add extended data if requested
        if args.full or args.include_stats:
            for comp in components:
                cursor = conn.execute("""
                    SELECT * FROM component_usage_stats WHERE component_id = ?
                """, (comp['id'],))
                stats = cursor.fetchone()
                comp['usage_stats'] = dict(stats) if stats else None

        if args.full or args.include_relationships:
            for comp in components:
                cursor = conn.execute("""
                    SELECT target_id, relationship_type
                    FROM component_relationships WHERE source_id = ?
                """, (comp['id'],))
                comp['relationships'] = [dict(r) for r in cursor.fetchall()]

    # Format output
    output = {
        "version": "1.0.0",
        "exported_at": datetime.now(timezone.utc).isoformat(),
        "total_count": len(components),
        "components": components
    }

    if args.format == "json":
        result = json.dumps(output, indent=2, default=str)
    elif args.format == "jsonl":
        result = "\n".join(json.dumps(c, default=str) for c in components)
    elif args.format == "csv":
        import csv
        import io

        buf = io.StringIO()
        if components:
            writer = csv.DictWriter(buf, fieldnames=components[0].keys())
            writer.writeheader()
            writer.writerows(components)
        result = buf.getvalue()
    else:
        # Unhandled formats (e.g. "sql") fall back to json.
        result = json.dumps(output, indent=2, default=str)

    # Output
    if args.output:
        output_path = Path(args.output)

        if args.compress:
            import gzip
            with gzip.open(output_path, 'wt') as f:
                f.write(result)
        else:
            output_path.write_text(result)

        print(f"Exported {len(components)} components to {output_path}")
    else:
        print(result)

# =============================================================================
# Stats Command
# =============================================================================

def cmd_stats(args):
    """Show database statistics (counts by type/model, size, FTS records)."""
    with get_db_connection(readonly=True) as conn:
        stats = {}

        # Count by type
        cursor = conn.execute(
            "SELECT type, COUNT(*) as count FROM components GROUP BY type ORDER BY count DESC"
        )
        stats['by_type'] = {row['type']: row['count'] for row in cursor}
        stats['total'] = sum(stats['by_type'].values())

        # Components with an LLM binding
        cursor = conn.execute(
            "SELECT COUNT(*) FROM components WHERE llm_model IS NOT NULL AND llm_model != ''"
        )
        stats['with_llm_binding'] = cursor.fetchone()[0]

        # Breakdown by model
        cursor = conn.execute("""
            SELECT llm_model, COUNT(*) as count
            FROM components
            WHERE llm_model IS NOT NULL AND llm_model != ''
            GROUP BY llm_model
            ORDER BY count DESC
        """)
        stats['by_model'] = {row['llm_model']: row['count'] for row in cursor}

        # Database size = page_count * page_size (pragma table-valued functions)
        cursor = conn.execute(
            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()"
        )
        stats['db_size_bytes'] = cursor.fetchone()[0]

        # FTS records
        cursor = conn.execute("SELECT COUNT(*) FROM component_search")
        stats['fts_records'] = cursor.fetchone()[0]

    # Output
    if args.format == "json":
        print(json.dumps(stats, indent=2))
    else:
        print("\nComponent Database Statistics")
        print("=" * 50)
        print(f"\nTotal Components: {stats['total']}")
        print("\nBy Type:")
        for t, c in stats['by_type'].items():
            print(f" {t:<15} {c:>6}")

        print(f"\nLLM Bindings: {stats['with_llm_binding']}")
        if stats['by_model']:
            print("\nBy Model:")
            for m, c in stats['by_model'].items():
                print(f" {m:<15} {c:>6}")

        print(f"\nDatabase Size: {stats['db_size_bytes'] / 1024 / 1024:.2f} MB")
        print(f"FTS Records: {stats['fts_records']}")

# =============================================================================
# Maintenance Command
# =============================================================================

def cmd_maintenance(args):
    """Database maintenance operations: --vacuum, --analyze, --backup FILE.

    Flags are independent and may be combined; each runs in its own
    connection. Reports when no operation was requested.
    """
    ran_any = False

    if args.vacuum:
        ran_any = True
        print("Vacuuming database...")
        with get_db_connection() as conn:
            conn.execute("VACUUM")
        print("Vacuum complete.")

    if args.analyze:
        ran_any = True
        print("Analyzing database...")
        with get_db_connection() as conn:
            conn.execute("ANALYZE")
        print("Analysis complete.")

    if args.backup:
        ran_any = True
        backup_path = Path(args.backup)
        print(f"Backing up to {backup_path}...")
        with get_db_connection(readonly=True) as conn:
            backup_conn = sqlite3.connect(backup_path)
            try:
                conn.backup(backup_conn)
            finally:
                # sqlite3's connection context manager commits but does NOT
                # close, so close the destination explicitly.
                backup_conn.close()
        print("Backup complete.")

    if not ran_any:
        print("No maintenance operation specified. Use --vacuum, --analyze, or --backup.")

# =============================================================================
# Main Entry Point
# =============================================================================

def main():
    """Parse CLI arguments and dispatch to the matching cmd_* handler."""
    parser = argparse.ArgumentParser(
        description="CODITECT Component Database CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # Search command
    search_parser = subparsers.add_parser("search", help="Search components")
    search_parser.add_argument("query", nargs="?", help="Search query")
    search_parser.add_argument("--type", "-t", help="Filter by type")
    search_parser.add_argument("--category", "-c", help="Filter by category")
    search_parser.add_argument("--model", "-m", help="Filter by LLM model")
    search_parser.add_argument("--capability", help="Filter by capability")
    search_parser.add_argument("--limit", "-l", type=int, default=20, help="Limit results")
    search_parser.add_argument("--format", "-f", choices=["table", "json", "brief"], default="table")
    search_parser.add_argument("--semantic", action="store_true", help="Use semantic search")
    search_parser.add_argument("--related", help="Find related components")
    search_parser.add_argument("--orchestrators", action="store_true", help="List orchestrators")

    # Index command
    index_parser = subparsers.add_parser("index", help="Manage index")
    index_parser.add_argument("--full", action="store_true", help="Full rebuild")
    index_parser.add_argument("--incremental", "-i", action="store_true", help="Incremental update")
    index_parser.add_argument("--rebuild-fts", action="store_true", help="Rebuild FTS index")
    index_parser.add_argument("--verify", action="store_true", help="Verify integrity")
    index_parser.add_argument("--type", "-t", help="Index only specific type")
    index_parser.add_argument("--verbose", "-v", action="store_true")

    # Validate command
    validate_parser = subparsers.add_parser("validate", help="Validate components")
    validate_parser.add_argument("path", nargs="?", help="Path to validate")
    validate_parser.add_argument("--type", "-t", help="Validate only specific type")
    validate_parser.add_argument("--strict", action="store_true", help="Fail on warnings")
    validate_parser.add_argument("--fix", action="store_true", help="Auto-fix issues")
    validate_parser.add_argument("--format", "-f", choices=["text", "json"], default="text")
    validate_parser.add_argument("--verbose", "-v", action="store_true")

    # Export command
    export_parser = subparsers.add_parser("export", help="Export data")
    export_parser.add_argument("--format", "-f", choices=["json", "jsonl", "csv", "sql"], default="json")
    export_parser.add_argument("--type", "-t", help="Export only specific type")
    export_parser.add_argument("--output", "-o", help="Output file")
    export_parser.add_argument("--full", action="store_true", help="Include all data")
    export_parser.add_argument("--include-stats", action="store_true")
    export_parser.add_argument("--include-relationships", action="store_true")
    export_parser.add_argument("--compress", action="store_true", help="Gzip output")

    # Stats command
    stats_parser = subparsers.add_parser("stats", help="Show statistics")
    stats_parser.add_argument("--format", "-f", choices=["text", "json"], default="text")

    # Maintenance command
    maint_parser = subparsers.add_parser("maintenance", help="Maintenance tasks")
    maint_parser.add_argument("--vacuum", action="store_true", help="Vacuum database")
    maint_parser.add_argument("--analyze", action="store_true", help="Update statistics")
    maint_parser.add_argument("--backup", help="Backup to file")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    # Dispatch
    if args.command == "search":
        # --related takes precedence over a positional query.
        if args.related:
            cmd_search_related(args)
        elif args.query:
            cmd_search(args)
        else:
            print("Error: Query required for search")
            sys.exit(1)
    elif args.command == "index":
        cmd_index(args)
    elif args.command == "validate":
        cmd_validate(args)
    elif args.command == "export":
        cmd_export(args)
    elif args.command == "stats":
        cmd_stats(args)
    elif args.command == "maintenance":
        cmd_maintenance(args)

if __name__ == "__main__":
    main()