#!/usr/bin/env python3
"""
Unified Search Coordinator (H.5.7.4)

Implements multi-database search with scope flags, parallel execution, and
Reciprocal Rank Fusion (RRF) result merging.

Part of ADR-103: Four-Database Separation Architecture

Databases (ADR-118 Four-Tier Architecture):
- platform.db  - TIER 1: Component metadata (agents, skills, commands)
- org.db       - TIER 2: IRREPLACEABLE: decisions, skill_learnings, error_solutions
- sessions.db  - TIER 3: REGENERABLE: messages, tool_analytics, token_economics
- projects.db  - TIER 4: Project code/docs embeddings

Scopes:
- --framework → platform.db
- --project   → projects.db only
- --context   → org.db + sessions.db
- --all       → All databases
- (default)   → org.db + sessions.db + current project

Usage:
    python3 unified-search.py "query"                 # Default scope
    python3 unified-search.py "query" --framework     # Framework only
    python3 unified-search.py "query" --project UUID  # Specific project
    python3 unified-search.py "query" --context       # Context only
    python3 unified-search.py "query" --all           # All databases
    python3 unified-search.py "query" --semantic      # Semantic search
    python3 unified-search.py --stats                 # Database stats
"""
import argparse
import json
import os
import sqlite3
import sys
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
# =============================================================================
# Configuration (ADR-114 Path Discovery)
# =============================================================================

# Add parent to path for imports
# Fix: Path(__file__) — the mangled source read Path(file), which is a NameError.
_script_dir = Path(__file__).resolve().parent
_coditect_root = _script_dir.parent
if str(_coditect_root) not in sys.path:
    sys.path.insert(0, str(_coditect_root))

# ADR-114 & ADR-118: Use centralized path discovery
try:
    from scripts.core.paths import (
        get_context_storage_dir,
        ORG_DB,
        SESSIONS_DB,
        CONTEXT_STORAGE as _CONTEXT_STORAGE,
    )
    CONTEXT_STORAGE = get_context_storage_dir()
except ImportError:
    # Legacy fallback: prefer the new shared data dir, else the per-user dir.
    _new_location = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    if _new_location.exists():
        CONTEXT_STORAGE = _new_location
    else:
        CONTEXT_STORAGE = Path.home() / ".coditect" / "context-storage"
    ORG_DB = CONTEXT_STORAGE / "org.db"
    SESSIONS_DB = CONTEXT_STORAGE / "sessions.db"

CODITECT_HOME = CONTEXT_STORAGE.parent  # For backward compatibility

# Database paths (ADR-118 Four-Tier Database Architecture)
DATABASES = {
    'platform': CONTEXT_STORAGE / "platform.db",        # TIER 1: Framework index
    'org': ORG_DB,                                      # TIER 2: Decisions, errors, learnings
    'sessions': SESSIONS_DB,                            # TIER 3: Messages, tool_analytics
    # Fix: 'projects' was missing, yet detect_current_project(),
    # search_projects_db(), and SEARCH_FUNCTIONS all reference it.
    'projects': CONTEXT_STORAGE / "projects.db",        # TIER 4: Project embeddings
    # Legacy aliases (during migration)
    'context': SESSIONS_DB,                             # Alias: sessions.db
    'platform_index': CONTEXT_STORAGE / "platform.db",  # Alias: platform.db
}

# Default timeout per database (seconds)
DEFAULT_TIMEOUT = 10.0

# RRF constant (standard value)
RRF_K = 60
# =============================================================================
# H.5.7.4.1: Search Scope Determination
# =============================================================================
@dataclass
class SearchScope:
    """Selects which databases a search touches, plus per-search tuning."""

    # Database keys (see DATABASES / SEARCH_FUNCTIONS) to query.
    databases: List[str] = field(default_factory=list)
    # Restrict project searches to this project, when known.
    project_uuid: Optional[str] = None
    # Use embedding-based (semantic) search instead of keyword/FTS.
    semantic: bool = False
    # Maximum results per database query.
    limit: int = 20
    # Minimum cosine similarity for semantic matches.
    threshold: float = 0.3
    # Per-database timeout in seconds.
    timeout: float = DEFAULT_TIMEOUT

    @classmethod
    def framework(cls, **kwargs) -> 'SearchScope':
        """Scope covering framework component databases only."""
        return cls(databases=['platform', 'platform_index'], **kwargs)

    @classmethod
    def project(cls, project_uuid: str, **kwargs) -> 'SearchScope':
        """Scope covering a single project's embeddings only."""
        return cls(databases=['projects'], project_uuid=project_uuid, **kwargs)

    @classmethod
    def context(cls, **kwargs) -> 'SearchScope':
        """Scope covering conversational context (sessions, messages) only."""
        return cls(databases=['context'], **kwargs)

    @classmethod
    def all_databases(cls, project_uuid: Optional[str] = None, **kwargs) -> 'SearchScope':
        """Scope covering every searchable database."""
        everything = ['platform', 'platform_index', 'context', 'projects']
        return cls(databases=everything, project_uuid=project_uuid, **kwargs)

    @classmethod
    def default(cls, project_uuid: Optional[str] = None, **kwargs) -> 'SearchScope':
        """Default scope: context, plus the current project when one is known."""
        selected = ['context', 'projects'] if project_uuid else ['context']
        return cls(databases=selected, project_uuid=project_uuid, **kwargs)
def detect_current_project() -> Optional[str]:
    """
    Detect the current project UUID.

    Resolution order:
      1. CODITECT_PROJECT_UUID environment variable
      2. .coditect/project.json in the current working directory
      3. projects.db row whose project_path matches the cwd

    Returns:
        The project UUID string, or None if no source yields one.
    """
    # 1. Environment variable wins.
    project_uuid = os.environ.get('CODITECT_PROJECT_UUID')
    if project_uuid:
        return project_uuid

    # 2. Per-project config file in the working directory.
    project_config = Path.cwd() / ".coditect" / "project.json"
    if project_config.exists():
        try:
            with open(project_config) as f:
                config = json.load(f)
            return config.get('project_uuid')
        except Exception:
            pass  # Unreadable/invalid JSON: fall through to the database.

    # 3. projects.db lookup keyed by the current path.
    # Fix: use .get() — indexing DATABASES['projects'] raised an uncaught
    # KeyError when the key was absent from the mapping.
    projects_db = DATABASES.get('projects')
    if projects_db is not None and projects_db.exists():
        try:
            conn = sqlite3.connect(str(projects_db))
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                "SELECT project_uuid FROM projects WHERE project_path = ?",
                (str(Path.cwd()),)
            )
            row = cursor.fetchone()
            conn.close()
            if row:
                return row['project_uuid']
        except Exception:
            pass  # Best-effort: missing table/schema means no detection.
    return None
# =============================================================================
# H.5.7.4.2: Parallel Search Execution
# =============================================================================
@dataclass
class SearchResult:
    """One search hit from any backend database."""

    # Source category: 'context', 'project', or 'framework'.
    source: str
    # Key of the database that produced the hit.
    database: str
    # Matched text (message body, decision text, preview, ...).
    content: str
    # Backend-specific details (ids, file paths, timestamps, ...).
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Relevance score; replaced by the fused RRF score after merging.
    score: float = 0.0
    # 1-based rank within its originating database's result list.
    rank: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for JSON output, capping content at 500 characters."""
        return {
            'source': self.source,
            'database': self.database,
            'content': self.content if len(self.content) <= 500 else self.content[:500],
            'metadata': self.metadata,
            'score': self.score,
            'rank': self.rank,
        }
def get_connection(db_name: str) -> Optional[sqlite3.Connection]:
    """Open the named database, or return None if it is absent or unreachable."""
    db_path = DATABASES.get(db_name)
    if not db_path or not db_path.exists():
        return None
    try:
        connection = sqlite3.connect(str(db_path), timeout=5.0)
        connection.row_factory = sqlite3.Row  # name-based column access
    except Exception as e:
        print(f"Warning: Could not connect to {db_name}: {e}", file=sys.stderr)
        return None
    return connection
def search_context_db(query: str, scope: SearchScope) -> List[SearchResult]:
    """
    Search sessions.db/org.db for messages, decisions, patterns.

    ADR-118: 'context' key aliases to sessions.db (Tier 3) for backward compat.

    Tries an FTS5 full-text query on messages first; if the FTS table is
    missing, falls back to a LIKE scan. Decisions are always searched with
    LIKE, capped at half the scope limit.

    Returns results with source='context'.
    """
    results = []
    conn = get_connection('context')
    if not conn:
        # Database missing or unreachable: degrade to no results.
        return results
    try:
        # FTS5 search on messages
        cursor = conn.execute("""
            SELECT m.id, m.content, m.role, m.session_id, m.timestamp,
                   rank as score
            FROM messages_fts fts
            JOIN messages m ON fts.rowid = m.id
            WHERE messages_fts MATCH ?
            ORDER BY rank
            LIMIT ?
        """, (query, scope.limit))
        for row in cursor.fetchall():
            results.append(SearchResult(
                source='context',
                database='context',
                content=row['content'] or '',
                metadata={
                    'id': row['id'],
                    'role': row['role'],
                    'session_id': row['session_id'],
                    'timestamp': row['timestamp'],
                    'type': 'message',
                },
                # FTS5 rank is negative (better match = more negative),
                # so abs() converts it into a positive relevance score.
                score=abs(row['score']) if row['score'] else 0,
            ))
        # Also search decisions (substring match on decision/rationale text)
        cursor = conn.execute("""
            SELECT id, decision, rationale, decision_type, confidence
            FROM decisions
            WHERE decision LIKE ? OR rationale LIKE ?
            ORDER BY confidence DESC
            LIMIT ?
        """, (f'%{query}%', f'%{query}%', scope.limit // 2))
        for row in cursor.fetchall():
            results.append(SearchResult(
                source='context',
                database='context',
                content=f"{row['decision']}\n\nRationale: {row['rationale'] or 'N/A'}",
                metadata={
                    'id': row['id'],
                    'type': 'decision',
                    'decision_type': row['decision_type'],
                    'confidence': row['confidence'],
                },
                # Stored confidence doubles as the score; 0.5 when unset.
                score=row['confidence'] if row['confidence'] else 0.5,
            ))
    except sqlite3.OperationalError as e:
        # FTS table might not exist
        if 'no such table' in str(e):
            # Fall back to LIKE search (flat 0.5 score, newest first)
            cursor = conn.execute("""
                SELECT id, content, role, session_id, timestamp
                FROM messages
                WHERE content LIKE ?
                ORDER BY timestamp DESC
                LIMIT ?
            """, (f'%{query}%', scope.limit))
            for row in cursor.fetchall():
                results.append(SearchResult(
                    source='context',
                    database='context',
                    content=row['content'] or '',
                    metadata={
                        'id': row['id'],
                        'role': row['role'],
                        'session_id': row['session_id'],
                        'timestamp': row['timestamp'],
                        'type': 'message',
                    },
                    score=0.5,
                ))
        # NOTE(review): other OperationalErrors are silently swallowed here —
        # confirm this best-effort behavior is intentional.
    except Exception as e:
        print(f"Warning: Context search error: {e}", file=sys.stderr)
    finally:
        conn.close()
    return results
def search_platform_db(query: str, scope: SearchScope) -> List[SearchResult]:
    """
    Search platform.db for components (agents, skills, commands).

    Uses the components_fts FTS5 table when present; otherwise falls back to
    a LIKE scan over name/description.

    Returns results with source='framework'.
    """
    results = []
    conn = get_connection('platform')
    if not conn:
        return results
    try:
        # Check if FTS table exists
        cursor = conn.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='components_fts'
        """)
        has_fts = cursor.fetchone() is not None
        if has_fts:
            cursor = conn.execute("""
                SELECT c.id, c.name, c.type, c.description, c.path,
                       c.status, rank as score
                FROM components_fts fts
                JOIN components c ON fts.rowid = c.id
                WHERE components_fts MATCH ?
                ORDER BY rank
                LIMIT ?
            """, (query, scope.limit))
        else:
            cursor = conn.execute("""
                SELECT id, name, type, description, path, status
                FROM components
                WHERE name LIKE ? OR description LIKE ?
                ORDER BY name
                LIMIT ?
            """, (f'%{query}%', f'%{query}%', scope.limit))
        for row in cursor.fetchall():
            results.append(SearchResult(
                source='framework',
                database='platform',
                content=f"{row['name']}: {row['description'] or 'No description'}",
                metadata={
                    'id': row['id'],
                    'name': row['name'],
                    'type': row['type'],
                    'path': row['path'],
                    'status': row['status'],
                    'component_type': row['type'],
                },
                # The LIKE branch has no 'score' column, hence the key check;
                # FTS5 rank is negative, hence abs(). Default is a flat 0.5.
                score=abs(row['score']) if 'score' in row.keys() and row['score'] else 0.5,
            ))
    except Exception as e:
        print(f"Warning: Platform search error: {e}", file=sys.stderr)
    finally:
        conn.close()
    return results
def search_platform_index_db(query: str, scope: "SearchScope") -> "List[SearchResult]":
    """
    Semantic search on platform-index.db using embeddings.

    Only active when scope.semantic is set; keyword searches are handled by
    search_platform_db(). Scores every stored embedding by cosine similarity,
    keeps those >= scope.threshold, and returns the top scope.limit hits.

    Returns results with source='framework'.
    """
    results = []
    if not scope.semantic:
        # This backend only serves semantic queries.
        return results
    conn = get_connection('platform_index')
    if not conn:
        return results
    try:
        # Embed the query; bail out quietly if embeddings are unavailable.
        query_embedding = generate_query_embedding(query)
        if query_embedding is None:
            # Fix: let the finally block close the connection — the original
            # called conn.close() here and then again in finally (double close).
            return results
        # Brute-force scan: load every stored embedding and score it.
        cursor = conn.execute("""
            SELECT component_id, embedding, content_preview
            FROM component_embeddings
            WHERE embedding IS NOT NULL
        """)
        import numpy as np
        query_vec = np.frombuffer(query_embedding, dtype=np.float32)
        similarities = []
        for row in cursor.fetchall():
            emb = np.frombuffer(row['embedding'], dtype=np.float32)
            # Cosine similarity; epsilon guards against zero-norm vectors.
            similarity = np.dot(query_vec, emb) / (np.linalg.norm(query_vec) * np.linalg.norm(emb) + 1e-8)
            if similarity >= scope.threshold:
                similarities.append((row['component_id'], row['content_preview'], float(similarity)))
        # Best matches first, capped at scope.limit.
        similarities.sort(key=lambda x: x[2], reverse=True)
        for comp_id, preview, sim in similarities[:scope.limit]:
            results.append(SearchResult(
                source='framework',
                database='platform_index',
                content=preview or '',
                metadata={
                    'component_id': comp_id,
                    'similarity': sim,
                    'type': 'semantic',
                },
                score=sim,
            ))
    except Exception as e:
        print(f"Warning: Platform index search error: {e}", file=sys.stderr)
    finally:
        conn.close()
    return results
def search_projects_db(query: str, scope: SearchScope) -> List[SearchResult]:
    """
    Search projects.db for project code and documents.

    With scope.project_uuid set, restricts to that project (delegating to
    search_projects_semantic() when scope.semantic is on); otherwise scans
    embeddings across all projects with a LIKE match.

    Returns results with source='project'.
    """
    results = []
    conn = get_connection('projects')
    if not conn:
        return results
    try:
        # Build query based on scope
        if scope.project_uuid:
            # Search specific project
            if scope.semantic:
                results.extend(search_projects_semantic(conn, query, scope))
            else:
                cursor = conn.execute("""
                    SELECT pe.file_path, pe.content_preview, pe.content_type,
                           pe.chunk_index, pe.metadata, p.project_name
                    FROM project_embeddings pe
                    JOIN projects p ON pe.project_uuid = p.project_uuid
                    WHERE pe.project_uuid = ? AND pe.content_preview LIKE ?
                    ORDER BY pe.file_path
                    LIMIT ?
                """, (scope.project_uuid, f'%{query}%', scope.limit))
                for row in cursor.fetchall():
                    results.append(SearchResult(
                        source='project',
                        database='projects',
                        content=row['content_preview'] or '',
                        metadata={
                            'file_path': row['file_path'],
                            'content_type': row['content_type'],
                            'chunk_index': row['chunk_index'],
                            'project_name': row['project_name'],
                            # Stored chunk metadata is a JSON string; decode it.
                            'chunk_metadata': json.loads(row['metadata']) if row['metadata'] else {},
                        },
                        score=0.5,  # LIKE matches get a flat neutral score
                    ))
        else:
            # Search all projects
            cursor = conn.execute("""
                SELECT pe.file_path, pe.content_preview, pe.content_type,
                       pe.project_uuid, pe.chunk_index, pe.metadata, p.project_name
                FROM project_embeddings pe
                JOIN projects p ON pe.project_uuid = p.project_uuid
                WHERE pe.content_preview LIKE ?
                ORDER BY pe.file_path
                LIMIT ?
            """, (f'%{query}%', scope.limit))
            for row in cursor.fetchall():
                results.append(SearchResult(
                    source='project',
                    database='projects',
                    content=row['content_preview'] or '',
                    metadata={
                        'file_path': row['file_path'],
                        'content_type': row['content_type'],
                        'project_uuid': row['project_uuid'],
                        'project_name': row['project_name'],
                        'chunk_index': row['chunk_index'],
                    },
                    score=0.5,
                ))
    except Exception as e:
        print(f"Warning: Projects search error: {e}", file=sys.stderr)
    finally:
        conn.close()
    return results
def search_projects_semantic(conn: sqlite3.Connection, query: str, scope: SearchScope) -> List[SearchResult]:
    """
    Semantic search on projects.db embeddings.

    Operates on an already-open connection (the caller owns and closes it).
    Scores every stored embedding by cosine similarity against the query
    embedding, keeps those >= scope.threshold, and returns the top
    scope.limit results with source='project'.
    """
    results = []
    try:
        query_embedding = generate_query_embedding(query)
        if query_embedding is None:
            # Embedding backend unavailable: no semantic results.
            return results
        # Build query based on project scope
        if scope.project_uuid:
            cursor = conn.execute("""
                SELECT pe.file_path, pe.content_preview, pe.content_type,
                       pe.embedding, pe.metadata, p.project_name
                FROM project_embeddings pe
                JOIN projects p ON pe.project_uuid = p.project_uuid
                WHERE pe.project_uuid = ? AND pe.embedding IS NOT NULL
            """, (scope.project_uuid,))
        else:
            cursor = conn.execute("""
                SELECT pe.file_path, pe.content_preview, pe.content_type,
                       pe.embedding, pe.metadata, pe.project_uuid, p.project_name
                FROM project_embeddings pe
                JOIN projects p ON pe.project_uuid = p.project_uuid
                WHERE pe.embedding IS NOT NULL
            """)
        import numpy as np
        query_vec = np.frombuffer(query_embedding, dtype=np.float32)
        similarities = []
        for row in cursor.fetchall():
            emb = np.frombuffer(row['embedding'], dtype=np.float32)
            # Cosine similarity; epsilon guards against zero-norm vectors.
            similarity = np.dot(query_vec, emb) / (np.linalg.norm(query_vec) * np.linalg.norm(emb) + 1e-8)
            if similarity >= scope.threshold:
                similarities.append((row, float(similarity)))
        # Best matches first, capped at scope.limit.
        similarities.sort(key=lambda x: x[1], reverse=True)
        for row, sim in similarities[:scope.limit]:
            results.append(SearchResult(
                source='project',
                database='projects',
                content=row['content_preview'] or '',
                metadata={
                    'file_path': row['file_path'],
                    'content_type': row['content_type'],
                    'project_name': row['project_name'],
                    'similarity': sim,
                },
                score=sim,
            ))
    except Exception as e:
        print(f"Warning: Projects semantic search error: {e}", file=sys.stderr)
    return results
def generate_query_embedding(query: str) -> Optional[bytes]:
    """Encode the query into a float32 embedding blob, or None on any failure."""
    try:
        from sentence_transformers import SentenceTransformer
        import numpy as np

        encoder = SentenceTransformer('all-MiniLM-L6-v2')
        vector = encoder.encode(query, convert_to_numpy=True)
        return vector.astype(np.float32).tobytes()
    except ImportError:
        # Optional dependency: semantic search degrades to keyword search.
        print("Warning: sentence-transformers not installed for semantic search", file=sys.stderr)
        return None
    except Exception as e:
        print(f"Warning: Embedding generation error: {e}", file=sys.stderr)
        return None
# Database search function mapping
# Maps a SearchScope database key to the backend that searches it.
SEARCH_FUNCTIONS = {
    'context': search_context_db,
    'platform': search_platform_db,
    'platform_index': search_platform_index_db,  # semantic-only backend
    'projects': search_projects_db,
}
def execute_parallel_search(query: str, scope: SearchScope) -> Dict[str, List[SearchResult]]:
    """
    Run the scoped database searches concurrently.

    One worker thread per database; each future is given scope.timeout
    seconds. Timeouts and backend failures degrade to an empty list for
    that database rather than failing the whole search.

    Returns a dict mapping database name to its (possibly empty) result list.
    """
    tasks = [(name, SEARCH_FUNCTIONS[name])
             for name in scope.databases if name in SEARCH_FUNCTIONS]
    if not tasks:
        return {}

    gathered: Dict[str, List[SearchResult]] = {}
    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        pending = {pool.submit(fn, query, scope): name for name, fn in tasks}
        for future, db_name in pending.items():
            try:
                gathered[db_name] = future.result(timeout=scope.timeout)
            except FuturesTimeoutError:
                print(f"Warning: {db_name} search timed out after {scope.timeout}s", file=sys.stderr)
                gathered[db_name] = []
            except Exception as e:
                print(f"Warning: {db_name} search failed: {e}", file=sys.stderr)
                gathered[db_name] = []
    return gathered
# =============================================================================
# H.5.7.4.3: RRF Result Merging
# =============================================================================
def compute_rrf_score(rank: int, k: int = RRF_K) -> float:
    """
    Reciprocal Rank Fusion contribution of one ranked list.

    RRF(d) = sum over lists of 1 / (k + r(d)), where r(d) is the document's
    1-based rank in that list; this helper returns a single list's term.
    """
    denominator = k + rank
    return 1.0 / denominator
def merge_results_rrf(
    results_by_db: Dict[str, List[SearchResult]],
    limit: int = 20
) -> List[SearchResult]:
    """
    Merge results from multiple databases using Reciprocal Rank Fusion.

    RRF is a simple but effective fusion method that:
    1. Ranks results within each database by score
    2. Computes RRF score based on rank
    3. Sums RRF scores for duplicate results across databases
    4. Returns final ranked list

    Note: mutates the input SearchResult objects in place — each gets a
    .rank assigned, the per-database lists are re-sorted, and returned
    items have .score overwritten with the fused RRF score.

    Args:
        results_by_db: Dict mapping database name to list of results
        limit: Maximum results to return

    Returns:
        Merged and ranked list of SearchResult objects
    """
    # Assign ranks within each database
    for db_name, db_results in results_by_db.items():
        # Sort by score descending (in place — reorders the caller's lists)
        db_results.sort(key=lambda r: r.score, reverse=True)
        # Assign ranks (1-indexed)
        for i, result in enumerate(db_results):
            result.rank = i + 1
    # Compute RRF scores
    # Key: (source, content_hash) to identify "same" results
    rrf_scores: Dict[str, Tuple[SearchResult, float]] = {}
    for db_name, db_results in results_by_db.items():
        for result in db_results:
            # Create a key for deduplication
            # Use first 200 chars of content as a simple hash
            content_key = result.content[:200] if result.content else ''
            key = f"{result.source}:{hash(content_key)}"
            rrf_score = compute_rrf_score(result.rank)
            if key in rrf_scores:
                # Add to existing RRF score (document appears in multiple lists);
                # the first-seen SearchResult object is the one kept.
                existing_result, existing_score = rrf_scores[key]
                rrf_scores[key] = (existing_result, existing_score + rrf_score)
            else:
                rrf_scores[key] = (result, rrf_score)
    # Sort by RRF score and return top results
    merged = sorted(rrf_scores.values(), key=lambda x: x[1], reverse=True)
    final_results = []
    for result, rrf_score in merged[:limit]:
        result.score = rrf_score  # Replace original score with RRF score
        final_results.append(result)
    return final_results
def group_results_by_source(results: List[SearchResult]) -> Dict[str, List[SearchResult]]:
    """
    Bucket results into the three source categories.

    Results whose source is not one of the known categories are dropped.

    Returns:
        Dict with keys: 'context', 'project', 'framework'
    """
    buckets: Dict[str, List[SearchResult]] = {
        'context': [],
        'project': [],
        'framework': [],
    }
    for item in results:
        bucket = buckets.get(item.source)
        if bucket is not None:
            bucket.append(item)
    return buckets
# =============================================================================
# Main Search Function
# =============================================================================
def unified_search(
    query: str,
    scope: SearchScope,
    group_by_source: bool = True
) -> Dict[str, Any]:
    """
    Main unified search function.

    Runs the scoped searches in parallel, fuses them with RRF, and attaches
    timing/count statistics.

    Args:
        query: Search query
        scope: SearchScope defining which databases to search
        group_by_source: If True, group results by source category

    Returns:
        Dict with 'results' (list, or dict grouped by source) and 'stats'.
    """
    started = time.time()

    per_db = execute_parallel_search(query, scope)
    fused = merge_results_rrf(per_db, scope.limit)

    stats = {
        'query': query,
        'scope': {
            'databases': scope.databases,
            'project_uuid': scope.project_uuid,
            'semantic': scope.semantic,
        },
        'databases_searched': list(per_db.keys()),
        'results_per_database': {name: len(items) for name, items in per_db.items()},
        'total_results': len(fused),
        'elapsed_time': time.time() - started,
    }

    payload = group_results_by_source(fused) if group_by_source else fused
    return {
        'results': payload,
        'stats': stats,
    }
# =============================================================================
# Statistics
# =============================================================================
def get_database_stats() -> Dict[str, Any]:
    """
    Collect existence, size, table list, and row counts for every database.

    Returns a dict keyed by DATABASES name. Per-database failures are
    recorded under an 'error' key rather than propagating.
    """
    stats = {}
    for db_name, db_path in DATABASES.items():
        db_stats = {
            'path': str(db_path),
            'exists': db_path.exists(),
            'size_bytes': 0,
            'tables': [],
            'row_counts': {},
        }
        if db_path.exists():
            db_stats['size_bytes'] = db_path.stat().st_size
            conn = None
            try:
                conn = sqlite3.connect(str(db_path))
                conn.row_factory = sqlite3.Row
                # Get tables
                cursor = conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
                )
                tables = [row['name'] for row in cursor.fetchall()]
                db_stats['tables'] = tables
                # Row counts for the first 10 tables only (keeps --stats fast)
                for table in tables[:10]:
                    try:
                        # Quote the identifier: names come from sqlite_master and
                        # may contain characters that break bare interpolation.
                        cursor = conn.execute(f'SELECT COUNT(*) as count FROM "{table}"')
                        db_stats['row_counts'][table] = cursor.fetchone()['count']
                    except Exception:
                        pass  # Skip tables that cannot be counted
            except Exception as e:
                db_stats['error'] = str(e)
            finally:
                # Fix: close even when an error occurs mid-query — the original
                # closed inside the try block and leaked the connection on error.
                if conn is not None:
                    conn.close()
        stats[db_name] = db_stats
    return stats
# =============================================================================
# CLI
# =============================================================================
def format_results(search_results: Dict[str, Any], output_format: str = 'text') -> str:
    """
    Render unified-search output for display.

    Args:
        search_results: dict with 'stats' and 'results' (grouped dict or flat list)
        output_format: 'json' for machine-readable output; anything else is text

    Returns:
        The formatted string (not printed).
    """
    if output_format == 'json':
        # Convert SearchResult objects into plain dicts for serialization.
        payload = {
            'stats': search_results['stats'],
            'results': {},
        }
        body = search_results['results']
        if isinstance(body, dict):
            for source, items in body.items():
                payload['results'][source] = [item.to_dict() for item in items]
        else:
            payload['results'] = [item.to_dict() for item in body]
        return json.dumps(payload, indent=2)

    # Text format: header first, then the results.
    stats = search_results['stats']
    divider = "=" * 60
    lines = [
        divider,
        "Unified Search Results",
        divider,
        f"Query: {stats['query']}",
        f"Databases: {', '.join(stats['databases_searched'])}",
        f"Total Results: {stats['total_results']}",
        f"Time: {stats['elapsed_time']:.2f}s",
        "",
    ]

    def _preview(text: str) -> str:
        # One-line, 200-char snippet with a trailing ellipsis when truncated.
        snippet = text[:200].replace('\n', ' ')
        return snippet + '...' if len(text) > 200 else snippet

    body = search_results['results']
    if isinstance(body, dict):
        # Grouped by source
        for source, items in body.items():
            if not items:
                continue
            lines.append(f"[{source.upper()}] ({len(items)} results)")
            lines.append("-" * 40)
            for i, item in enumerate(items[:10]):  # Limit display
                lines.append(f" {i+1}. (score: {item.score:.3f})")
                meta = item.metadata
                # Show the most informative metadata field for the hit type.
                if meta.get('type') == 'message':
                    lines.append(f" Role: {meta.get('role', 'unknown')}")
                elif meta.get('type') == 'decision':
                    lines.append(f" Type: {meta.get('decision_type', 'unknown')}")
                elif meta.get('file_path'):
                    lines.append(f" File: {meta.get('file_path')}")
                elif meta.get('path'):
                    lines.append(f" Path: {meta.get('path')}")
                elif meta.get('name'):
                    lines.append(f" Component: {meta.get('name')} ({meta.get('component_type', 'unknown')})")
                lines.append(f" {_preview(item.content)}")
            lines.append("")
    else:
        # Flat list
        for i, item in enumerate(body[:20]):
            lines.append(f"{i+1}. [{item.source.upper()}] (score: {item.score:.3f})")
            lines.append(f" {_preview(item.content)}")
            lines.append("")
    return '\n'.join(lines)
def main():
    """CLI entry point: parse arguments, run a search or show stats, print output."""
    parser = argparse.ArgumentParser(
        description="Unified Search Coordinator (H.5.7.4)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Default search (context + current project)
  python3 unified-search.py "authentication"

  # Framework search (components only)
  python3 unified-search.py "database" --framework

  # Project search
  python3 unified-search.py "api" --project UUID

  # Context search (sessions/messages)
  python3 unified-search.py "error" --context

  # All databases
  python3 unified-search.py "function" --all

  # Semantic search
  python3 unified-search.py "how to implement auth" --semantic

  # Database statistics
  python3 unified-search.py --stats
"""
    )
    parser.add_argument('query', nargs='?', help='Search query')

    # Scope flags (mutually exclusive)
    scope_group = parser.add_mutually_exclusive_group()
    scope_group.add_argument('--framework', action='store_true',
                             help='Search framework components (platform.db + platform-index.db)')
    scope_group.add_argument('--project', metavar='UUID',
                             help='Search specific project (projects.db)')
    scope_group.add_argument('--context', action='store_true',
                             help='Search context only (org.db + sessions.db)')
    scope_group.add_argument('--all', action='store_true',
                             help='Search all databases')

    # Search options
    parser.add_argument('--semantic', action='store_true',
                        help='Use semantic (embedding-based) search')
    parser.add_argument('--limit', type=int, default=20,
                        help='Maximum results (default: 20)')
    parser.add_argument('--threshold', type=float, default=0.3,
                        help='Similarity threshold for semantic search (default: 0.3)')
    parser.add_argument('--timeout', type=float, default=DEFAULT_TIMEOUT,
                        help=f'Timeout per database in seconds (default: {DEFAULT_TIMEOUT})')

    # Output options
    parser.add_argument('--json', action='store_true',
                        help='Output as JSON')
    parser.add_argument('--flat', action='store_true',
                        help='Flat result list (no grouping by source)')

    # Info
    parser.add_argument('--stats', action='store_true',
                        help='Show database statistics')

    args = parser.parse_args()

    # Handle --stats
    if args.stats:
        stats = get_database_stats()
        if args.json:
            print(json.dumps(stats, indent=2))
        else:
            print("\n" + "=" * 60)
            print("Four-Database Architecture Statistics (ADR-103)")
            print("=" * 60)
            for db_name, db_stats in stats.items():
                status = "✓" if db_stats['exists'] else "✗"
                size_kb = db_stats['size_bytes'] / 1024
                print(f"\n{status} {db_name}")
                print(f" Path: {db_stats['path']}")
                if db_stats['exists']:
                    print(f" Size: {size_kb:.1f} KB")
                    print(f" Tables: {len(db_stats['tables'])}")
                    if db_stats['row_counts']:
                        print(f" Key tables:")
                        for table, count in list(db_stats['row_counts'].items())[:5]:
                            print(f" - {table}: {count} rows")
                else:
                    print(" Status: Not initialized")
        return 0

    # Require query for search
    if not args.query:
        parser.error("Query required (or use --stats)")

    # Determine scope: options shared by every SearchScope constructor.
    common_opts = {
        'semantic': args.semantic,
        'limit': args.limit,
        'threshold': args.threshold,
        'timeout': args.timeout,
    }
    if args.framework:
        scope = SearchScope.framework(**common_opts)
    elif args.project:
        scope = SearchScope.project(args.project, **common_opts)
    elif args.context:
        scope = SearchScope.context(**common_opts)
    elif args.all:
        project_uuid = detect_current_project()
        scope = SearchScope.all_databases(project_uuid, **common_opts)
    else:
        # Default: context + current project
        project_uuid = detect_current_project()
        scope = SearchScope.default(project_uuid, **common_opts)

    # Execute search
    results = unified_search(args.query, scope, group_by_source=not args.flat)

    # Format and print results
    output_format = 'json' if args.json else 'text'
    print(format_results(results, output_format))
    return 0
# Fix: restore the dunder names — the mangled source read `if name == 'main'`,
# which raises NameError and never runs the CLI.
if __name__ == '__main__':
    sys.exit(main())