#!/usr/bin/env python3
"""
CODITECT Semantic Search CLI - Search session messages by meaning.

Queries the sessions.db embeddings table using sentence-transformers to find
semantically similar messages. Supports semantic, hybrid (semantic + FTS5),
and keyword-only search modes.

Usage:
    python3 scripts/semantic-search.py "your search query"
    python3 scripts/semantic-search.py "query" --limit 20
    python3 scripts/semantic-search.py "query" --hybrid
    python3 scripts/semantic-search.py "query" --keyword
    python3 scripts/semantic-search.py "query" --role assistant
    python3 scripts/semantic-search.py "query" --llm claude
    python3 scripts/semantic-search.py "query" --project PILOT
    python3 scripts/semantic-search.py --stats

Task: J.4.1 - Implement semantic search over messages
ADR: ADR-118 (Database Architecture), ADR-122 (Multi-LLM)
"""

import argparse
import json
import os
import sqlite3
import struct
import sys
import textwrap
import time
from pathlib import Path

def _inject_venv(): """Inject venv site-packages into sys.path for numpy/sentence-transformers.""" venv_paths = [ os.path.expanduser("~/.coditect/.venv"), os.path.join(os.path.dirname(os.path.abspath(file)), "..", ".venv"), ] for venv in venv_paths: site_pkg = os.path.join(venv, "lib") if os.path.isdir(site_pkg): for d in sorted(os.listdir(site_pkg), reverse=True): if d.startswith("python3"): sp = os.path.join(site_pkg, d, "site-packages") if os.path.isdir(sp) and sp not in sys.path: sys.path.insert(0, sp) return True return False

_inject_venv()

# Fail fast with an actionable install hint if numpy is unavailable.
try:
    import numpy as np
except ImportError:
    print("ERROR: numpy not installed.", file=sys.stderr)
    print("Install: source ~/.coditect/.venv/bin/activate && pip install numpy",
          file=sys.stderr)
    sys.exit(1)

def get_sessions_db_path():
    """Resolve sessions.db path using CODITECT conventions.

    Tries the coditect-core ``scripts.core.paths`` module first, then
    falls back to standard file locations.

    Returns:
        A filesystem path string; the first fallback candidate is
        returned even when no database file exists yet.
    """
    # Try coditect-core paths module
    try:
        core_paths = [
            os.path.expanduser("~/.coditect"),
            # BUG FIX: original used bare `file`; `__file__` is this module's path.
            os.path.join(os.path.dirname(__file__), ".."),
        ]
        for base in core_paths:
            core_init = os.path.join(base, "scripts", "core", "paths.py")
            if os.path.exists(core_init):
                sys.path.insert(0, base)
                from scripts.core.paths import get_sessions_db_path as _get
                return _get()
    except Exception:
        # Best-effort: any import/path failure falls through to the defaults.
        pass

    # Fallback: standard location
    candidates = [
        os.path.expanduser("~/PROJECTS/.coditect-data/context-storage/sessions.db"),
        os.path.expanduser("~/.coditect-data/context-storage/sessions.db"),
    ]
    for path in candidates:
        if os.path.exists(path):
            return path

    return candidates[0]  # Return default even if missing

def load_model():
    """Load sentence-transformers model (venv already injected at module level).

    Returns:
        A SentenceTransformer instance for "all-MiniLM-L6-v2".
        Exits the process with an install hint if the package is missing.
    """
    try:
        from sentence_transformers import SentenceTransformer
        model = SentenceTransformer("all-MiniLM-L6-v2")
        return model
    except ImportError:
        print("ERROR: sentence-transformers not installed.", file=sys.stderr)
        print("Install: source ~/.coditect/.venv/bin/activate && pip install sentence-transformers",
              file=sys.stderr)
        sys.exit(1)

def encode_query(model, query):
    """Encode a query string to a normalized float32 embedding vector.

    Args:
        model: A sentence-transformers model exposing ``encode``.
        query: The search string to embed.

    Returns:
        1-D numpy float32 array (unit-normalized by the encoder).
    """
    embedding = model.encode([query], normalize_embeddings=True)[0]
    return embedding.astype(np.float32)

def blob_to_vector(blob):
    """Convert a SQLite BLOB of packed float32 values to a numpy array.

    Args:
        blob: bytes object containing native-order 4-byte floats.

    Returns:
        1-D numpy float32 array with one element per packed float.
    """
    n = len(blob) // 4  # 4 bytes per float32
    return np.array(struct.unpack(f'{n}f', blob), dtype=np.float32)

def cosine_similarity(a, b):
    """Compute cosine similarity between two vectors.

    Returns 0.0 when either vector has zero norm (avoids division by zero).
    """
    dot = np.dot(a, b)
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(dot / (norm_a * norm_b))

def semantic_search(db_path, query_vec, limit=10, threshold=0.3,
                    role_filter=None, llm_filter=None, project_filter=None):
    """Pure semantic search using cosine similarity over embeddings.

    Args:
        db_path: Path to sessions.db.
        query_vec: Normalized float32 query embedding.
        limit: Maximum number of results to return.
        threshold: Minimum cosine similarity for inclusion.
        role_filter: Optional equality filter on message role.
        llm_filter: Optional equality filter on LLM source.
        project_filter: Optional equality filter on project ID.

    Returns:
        List of result dicts sorted by similarity, highest first.
    """
    db = sqlite3.connect(db_path)
    db.row_factory = sqlite3.Row

    # Build filter conditions
    conditions = []
    params = []
    if role_filter:
        conditions.append("m.role = ?")
        params.append(role_filter)
    if llm_filter:
        conditions.append("m.llm_source = ?")
        params.append(llm_filter)
    if project_filter:
        conditions.append("m.project_id = ?")
        params.append(project_filter)

    where_clause = ""
    if conditions:
        where_clause = "WHERE " + " AND ".join(conditions)

    sql = f"""
        SELECT e.message_id, e.embedding, m.content, m.role, m.timestamp,
               m.llm_source, m.llm_model, m.source_file, m.session_id,
               m.project_id, m.content_length
        FROM embeddings e
        JOIN messages m ON e.message_id = m.id
        {where_clause}
    """

    results = []
    cursor = db.execute(sql, params)

    # Process in batches to avoid loading all rows into memory at once
    batch_size = 10000
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        for row in rows:
            vec = blob_to_vector(row["embedding"])
            sim = cosine_similarity(query_vec, vec)
            if sim >= threshold:
                results.append({
                    "message_id": row["message_id"],
                    "similarity": sim,
                    "content": row["content"],
                    "role": row["role"],
                    "timestamp": row["timestamp"],
                    "llm_source": row["llm_source"],
                    "llm_model": row["llm_model"],
                    "source_file": row["source_file"],
                    "session_id": row["session_id"],
                    "project_id": row["project_id"],
                    "content_length": row["content_length"],
                })

    db.close()

    # Sort by similarity descending, then truncate to the requested limit
    results.sort(key=lambda x: x["similarity"], reverse=True)
    return results[:limit]

def keyword_search(db_path, query, limit=10,
                   role_filter=None, llm_filter=None, project_filter=None):
    """FTS5 keyword search over messages, ranked by BM25.

    Args:
        db_path: Path to sessions.db.
        query: FTS5 MATCH expression.
        limit: Maximum number of results to return.
        role_filter: Optional equality filter on message role.
        llm_filter: Optional equality filter on LLM source.
        project_filter: Optional equality filter on project ID.

    Returns:
        List of result dicts; empty list (with a message on stderr)
        when the FTS5 index table does not exist.
    """
    db = sqlite3.connect(db_path)
    db.row_factory = sqlite3.Row

    conditions = []
    params = []

    if role_filter:
        conditions.append("m.role = ?")
        params.append(role_filter)
    if llm_filter:
        conditions.append("m.llm_source = ?")
        params.append(llm_filter)
    if project_filter:
        conditions.append("m.project_id = ?")
        params.append(project_filter)

    where_extra = ""
    if conditions:
        where_extra = "AND " + " AND ".join(conditions)

    # FTS5 search with BM25 ranking (smaller rank = better match)
    sql = f"""
        SELECT m.id as message_id, m.content, m.role, m.timestamp,
               m.llm_source, m.llm_model, m.source_file, m.session_id,
               m.project_id, m.content_length,
               rank as bm25_score
        FROM messages_fts
        JOIN messages m ON messages_fts.rowid = m.id
        WHERE messages_fts MATCH ?
        {where_extra}
        ORDER BY rank
        LIMIT ?
    """
    # Placeholder order must match the SQL: MATCH term, filters, LIMIT.
    params = [query] + params + [limit]

    try:
        rows = db.execute(sql, params).fetchall()
    except sqlite3.OperationalError as e:
        if "no such table" in str(e):
            print("ERROR: FTS5 index not found. Run /cx to rebuild.", file=sys.stderr)
            db.close()
            return []
        db.close()  # FIX: don't leak the connection when re-raising
        raise

    results = []
    for row in rows:
        results.append({
            "message_id": row["message_id"],
            "similarity": None,  # not computed in keyword mode
            "bm25_score": row["bm25_score"],
            "content": row["content"],
            "role": row["role"],
            "timestamp": row["timestamp"],
            "llm_source": row["llm_source"],
            "llm_model": row["llm_model"],
            "source_file": row["source_file"],
            "session_id": row["session_id"],
            "project_id": row["project_id"],
            "content_length": row["content_length"],
        })

    db.close()
    return results

def hybrid_search(db_path, query, query_vec, limit=10, threshold=0.3,
                  semantic_weight=0.6, fts_weight=0.4,
                  role_filter=None, llm_filter=None, project_filter=None):
    """Hybrid search combining semantic similarity and FTS5 BM25 via RRF.

    Runs both search modes over-fetched (3x limit), then fuses the two
    rankings with weighted Reciprocal Rank Fusion.

    Args:
        db_path: Path to sessions.db.
        query: Raw query string (for FTS5).
        query_vec: Encoded query embedding (for semantic search).
        limit: Maximum fused results to return.
        threshold: Minimum cosine similarity for semantic candidates.
        semantic_weight: RRF weight for the semantic ranking.
        fts_weight: RRF weight for the keyword ranking.
        role_filter/llm_filter/project_filter: Optional equality filters.

    Returns:
        List of result dicts with an added "rrf_score", best first.
    """
    # Get semantic results (more than limit for fusion)
    sem_results = semantic_search(
        db_path, query_vec, limit=limit * 3, threshold=threshold,
        role_filter=role_filter, llm_filter=llm_filter,
        project_filter=project_filter
    )

    # Get keyword results
    kw_results = keyword_search(
        db_path, query, limit=limit * 3,
        role_filter=role_filter, llm_filter=llm_filter,
        project_filter=project_filter
    )

    # Reciprocal Rank Fusion (RRF)
    k = 60  # RRF constant (dampens the influence of top ranks)
    scores = {}
    details = {}

    for rank, r in enumerate(sem_results):
        mid = r["message_id"]
        scores[mid] = scores.get(mid, 0) + semantic_weight / (k + rank + 1)
        details[mid] = r

    for rank, r in enumerate(kw_results):
        mid = r["message_id"]
        scores[mid] = scores.get(mid, 0) + fts_weight / (k + rank + 1)
        # Semantic entries take precedence (they carry a similarity score)
        if mid not in details:
            details[mid] = r

    # Sort by fused score
    ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]

    results = []
    for mid, score in ranked:
        entry = details[mid].copy()
        entry["rrf_score"] = score
        # Preserve original similarity if available
        results.append(entry)

    return results

def get_stats(db_path):
    """Get database statistics.

    Args:
        db_path: Path to sessions.db.

    Returns:
        Dict with message/embedding counts, coverage percentage, model
        name, embedding dimensions, and role/LLM/project distributions.
    """
    db = sqlite3.connect(db_path)
    stats = {}

    stats["messages"] = db.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
    stats["embeddings"] = db.execute("SELECT COUNT(*) FROM embeddings").fetchone()[0]
    stats["coverage"] = (
        f"{stats['embeddings'] / stats['messages'] * 100:.1f}%"
        if stats["messages"] > 0 else "0%"
    )

    # Model info
    row = db.execute("SELECT DISTINCT model FROM embeddings").fetchone()
    stats["model"] = row[0] if row else "none"

    # Embedding dimensions (BLOB stores 4-byte floats)
    row = db.execute("SELECT length(embedding) FROM embeddings LIMIT 1").fetchone()
    stats["dimensions"] = row[0] // 4 if row else 0

    # Role distribution
    roles = db.execute(
        "SELECT role, COUNT(*) FROM messages GROUP BY role ORDER BY COUNT(*) DESC"
    ).fetchall()
    stats["roles"] = {r[0]: r[1] for r in roles}

    # LLM source distribution
    llms = db.execute(
        "SELECT COALESCE(llm_source, 'unknown'), COUNT(*) FROM messages GROUP BY llm_source ORDER BY COUNT(*) DESC"
    ).fetchall()
    stats["llm_sources"] = {r[0]: r[1] for r in llms}

    # Project distribution (top 10)
    projects = db.execute(
        "SELECT COALESCE(project_id, 'unscoped'), COUNT(*) FROM messages GROUP BY project_id ORDER BY COUNT(*) DESC LIMIT 10"
    ).fetchall()
    stats["projects"] = {r[0]: r[1] for r in projects}

    db.close()
    return stats

def format_result(r, index, mode="semantic", snippet_len=200):
    """Format a single search result for display.

    Args:
        r: Result dict from one of the search functions.
        index: 1-based display index.
        mode: "semantic", "hybrid", or "keyword" (selects score fields).
        snippet_len: Max content characters before truncation.

    Returns:
        Two-line string: header (id, role, filters, score) and snippet.
    """
    content = r["content"] or ""
    # Truncate content for display
    if len(content) > snippet_len:
        content = content[:snippet_len] + "..."

    # Clean up content for display: single line, trimmed
    content = content.replace("\n", " ").strip()

    sim_str = ""
    if mode == "semantic" and r.get("similarity") is not None:
        sim_str = f" | sim: {r['similarity']:.4f}"
    elif mode == "hybrid" and r.get("rrf_score") is not None:
        sim_str = f" | rrf: {r['rrf_score']:.6f}"
        if r.get("similarity") is not None:
            sim_str += f" | sim: {r['similarity']:.4f}"
    elif mode == "keyword" and r.get("bm25_score") is not None:
        sim_str = f" | bm25: {r['bm25_score']:.2f}"

    role = r.get("role", "?")
    llm = r.get("llm_source", "?")
    ts = r.get("timestamp", "")[:19] if r.get("timestamp") else ""
    project = r.get("project_id", "")

    header_parts = [f"[{index}]", f"id:{r['message_id']}", f"role:{role}"]
    if llm and llm != "?":
        header_parts.append(f"llm:{llm}")
    if project:
        header_parts.append(f"project:{project}")
    if ts:
        header_parts.append(ts)
    # BUG FIX: only append the score fragment when one exists; the
    # original appended "" which left a trailing " | " in the header.
    score_fragment = sim_str.strip(" |")
    if score_fragment:
        header_parts.append(score_fragment)

    header = " | ".join(header_parts)

    return f" {header}\n {content}\n"

def format_json_results(results, mode, query, elapsed):
    """Format search results as a pretty-printed JSON string.

    Args:
        results: List of result dicts from a search function.
        mode: Search mode name ("semantic", "hybrid", "keyword").
        query: The original query string.
        elapsed: Wall-clock seconds the search took.

    Returns:
        JSON string with query metadata and per-result entries
        (content truncated to 500 characters; scores included only
        when present).
    """
    output = {
        "query": query,
        "mode": mode,
        "elapsed_seconds": round(elapsed, 3),
        "count": len(results),
        "results": [],
    }
    for r in results:
        entry = {
            "message_id": r["message_id"],
            "role": r.get("role"),
            "content": r.get("content", "")[:500],
            "timestamp": r.get("timestamp"),
            "llm_source": r.get("llm_source"),
            "project_id": r.get("project_id"),
        }
        if r.get("similarity") is not None:
            entry["similarity"] = round(r["similarity"], 4)
        if r.get("rrf_score") is not None:
            entry["rrf_score"] = round(r["rrf_score"], 6)
        if r.get("bm25_score") is not None:
            entry["bm25_score"] = round(r["bm25_score"], 4)
        output["results"].append(entry)
    return json.dumps(output, indent=2)

def main():
    """CLI entry point: parse arguments, dispatch the selected search mode.

    Handles --stats reporting, semantic/hybrid/keyword search execution,
    and text or JSON output. Exits non-zero when the database is missing
    or no query was supplied.
    """
    parser = argparse.ArgumentParser(
        description="CODITECT Semantic Search - Search messages by meaning",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # NOTE(review): epilog layout reconstructed from a collapsed
        # source line; content preserved, whitespace is best-effort.
        epilog=textwrap.dedent("""
            Examples:
              %(prog)s "authentication patterns"
              %(prog)s "database migration" --hybrid --limit 20
              %(prog)s "error handling" --keyword
              %(prog)s "API design" --role assistant --llm claude
              %(prog)s "deployment" --project PILOT --limit 5
              %(prog)s --stats
        """),
    )

    parser.add_argument("query", nargs="?", help="Search query")
    parser.add_argument("--limit", "-n", type=int, default=10,
                        help="Maximum results (default: 10)")
    parser.add_argument("--threshold", "-t", type=float, default=0.3,
                        help="Minimum similarity threshold 0-1 (default: 0.3)")

    # Search modes (mutually exclusive)
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument("--semantic", action="store_true", default=True,
                            help="Semantic similarity search (default)")
    mode_group.add_argument("--hybrid", action="store_true",
                            help="Hybrid: semantic + FTS5 with RRF fusion")
    mode_group.add_argument("--keyword", action="store_true",
                            help="FTS5 keyword search only")

    # Filters
    parser.add_argument("--role", choices=["user", "assistant", "system"],
                        help="Filter by message role")
    parser.add_argument("--llm", help="Filter by LLM source (claude, codex, gemini)")
    parser.add_argument("--project", help="Filter by project ID")

    # Output
    parser.add_argument("--json", action="store_true",
                        help="Output results as JSON")
    parser.add_argument("--snippet", type=int, default=200,
                        help="Content snippet length (default: 200)")
    parser.add_argument("--full", action="store_true",
                        help="Show full message content (no truncation)")

    # Hybrid tuning
    parser.add_argument("--semantic-weight", type=float, default=0.6,
                        help="Semantic weight for hybrid mode (default: 0.6)")
    parser.add_argument("--fts-weight", type=float, default=0.4,
                        help="FTS weight for hybrid mode (default: 0.4)")

    # Database
    parser.add_argument("--db", help="Path to sessions.db")

    # Stats
    parser.add_argument("--stats", action="store_true",
                        help="Show database statistics")

    args = parser.parse_args()

    # Resolve database path
    db_path = args.db or get_sessions_db_path()
    if not os.path.exists(db_path):
        print(f"ERROR: Database not found: {db_path}", file=sys.stderr)
        print("Run /cx to create and populate the database.", file=sys.stderr)
        sys.exit(1)

    # Stats mode: report and exit early
    if args.stats:
        stats = get_stats(db_path)
        print("=" * 60)
        print("CODITECT Semantic Search - Database Statistics")
        print("=" * 60)
        print(f" Database: {db_path}")
        print(f" Messages: {stats['messages']:,}")
        print(f" Embeddings: {stats['embeddings']:,}")
        print(f" Coverage: {stats['coverage']}")
        print(f" Model: {stats['model']}")
        print(f" Dimensions: {stats['dimensions']}")
        print()
        print(" Roles:")
        for role, count in stats["roles"].items():
            print(f" {role:12s} {count:>10,}")
        print()
        print(" LLM Sources:")
        for llm, count in stats["llm_sources"].items():
            print(f" {llm:12s} {count:>10,}")
        print()
        print(" Projects (top 10):")
        for proj, count in stats["projects"].items():
            print(f" {proj:30s} {count:>10,}")
        print("=" * 60)
        return

    # Query required for search
    if not args.query:
        parser.print_help()
        sys.exit(1)

    # Determine mode (flags take precedence over the --semantic default)
    if args.keyword:
        mode = "keyword"
    elif args.hybrid:
        mode = "hybrid"
    else:
        mode = "semantic"

    snippet_len = 999999 if args.full else args.snippet

    # Execute search
    t0 = time.time()

    if mode == "keyword":
        results = keyword_search(
            db_path, args.query, limit=args.limit,
            role_filter=args.role, llm_filter=args.llm,
            project_filter=args.project
        )
    else:
        # Load model for semantic/hybrid (slow; report progress on stderr)
        print("Loading model...", end="", flush=True, file=sys.stderr)
        model = load_model()
        query_vec = encode_query(model, args.query)
        print(f" done ({time.time() - t0:.1f}s)", file=sys.stderr)

        if mode == "hybrid":
            results = hybrid_search(
                db_path, args.query, query_vec, limit=args.limit,
                threshold=args.threshold,
                semantic_weight=args.semantic_weight,
                fts_weight=args.fts_weight,
                role_filter=args.role, llm_filter=args.llm,
                project_filter=args.project
            )
        else:
            results = semantic_search(
                db_path, query_vec, limit=args.limit,
                threshold=args.threshold,
                role_filter=args.role, llm_filter=args.llm,
                project_filter=args.project
            )

    elapsed = time.time() - t0

    # JSON output short-circuits the text formatting below
    if args.json:
        print(format_json_results(results, mode, args.query, elapsed))
        return

    # Header
    print(f"\n{'=' * 60}")
    print(f"CODITECT Semantic Search ({mode} mode)")
    print(f"{'=' * 60}")
    print(f" Query: \"{args.query}\"")
    filters = []
    if args.role:
        filters.append(f"role={args.role}")
    if args.llm:
        filters.append(f"llm={args.llm}")
    if args.project:
        filters.append(f"project={args.project}")
    if filters:
        print(f" Filters: {', '.join(filters)}")
    print(f" Results: {len(results)} (limit: {args.limit}, threshold: {args.threshold})")
    print(f" Time: {elapsed:.2f}s")
    print(f"{'-' * 60}")

    if not results:
        print(" No results found.")
    else:
        for i, r in enumerate(results, 1):
            print(format_result(r, i, mode=mode, snippet_len=snippet_len))

    print(f"{'=' * 60}")


# BUG FIX: original read `if name == "main"`; the dunder guard is required.
if __name__ == "__main__":
    main()