#!/usr/bin/env python3
"""Project Embedder (J.15.4)

Generates semantic embeddings for project content with content-type-specific
chunking strategies. Stores embeddings in projects.db for semantic search.

Tasks:
- J.15.4.1: Chunking strategies per content type
- J.15.4.2: Generate embeddings for project code/docs
- J.15.4.3: Store embeddings in project_embeddings table
- J.15.4.4: Implement hash-based invalidation

Usage:
    python3 scripts/project_embedder.py --project my-project
    python3 scripts/project_embedder.py --project-path /path/to/project
    python3 scripts/project_embedder.py --project my-project --dry-run
    python3 scripts/project_embedder.py --project my-project --reembed-all

Created: 2026-02-05
Author: Claude (Opus 4.5)
"""

import hashlib
import json
import os
import re
import sqlite3
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Generator, List, Optional, Set, Tuple

# Add parent to path for imports
# Fix: was Path(file) — NameError; __file__ is the module's own path.
sys.path.insert(0, str(Path(__file__).parent.parent))

try:
    from scripts.core.paths import get_projects_db_path
except ImportError:
    def get_projects_db_path() -> Path:
        """Fallback locator for projects.db when scripts.core.paths is absent.

        Returns the first existing candidate path, or the first candidate
        (which may not exist) so callers can report a clear error.
        """
        home = Path.home()
        candidates = [
            home / "PROJECTS" / ".coditect-data" / "context-storage" / "projects.db",
            home / ".coditect-data" / "context-storage" / "projects.db",
        ]
        for c in candidates:
            if c.exists():
                return c
        return candidates[0]
# =============================================================================
# Configuration
# =============================================================================

# Default embedding model
DEFAULT_MODEL = "all-MiniLM-L6-v2"
DEFAULT_EMBEDDING_DIM = 384

# Chunk sizes by content type (in characters, roughly tokens * 4)
CHUNK_SIZES = {
    "code": 2000,      # ~500 tokens - function/class sized
    "document": 1500,  # ~375 tokens - paragraph sized
    "config": 1000,    # ~250 tokens - config blocks
    "data": 500,       # ~125 tokens - data samples
    "default": 1500,
}

# Overlap percentages (for sliding window)
CHUNK_OVERLAPS = {
    "code": 0.1,       # 10% overlap for code context
    "document": 0.15,  # 15% overlap for prose continuity
    "config": 0.05,    # 5% overlap for config
    "data": 0.0,       # No overlap for data
    "default": 0.1,
}

# File extensions to content types (subset of important files)
EMBEDDABLE_EXTENSIONS = {
    # Code
    ".py": "code", ".js": "code", ".ts": "code", ".tsx": "code",
    ".jsx": "code", ".go": "code", ".rs": "code", ".java": "code",
    ".rb": "code", ".php": "code", ".c": "code", ".cpp": "code",
    ".h": "code", ".hpp": "code", ".cs": "code", ".swift": "code",
    ".kt": "code", ".scala": "code", ".sh": "code", ".bash": "code",
    ".zsh": "code",
    # Documents
    ".md": "document", ".rst": "document", ".txt": "document",
    ".adoc": "document",
    # Config
    ".json": "config", ".yaml": "config", ".yml": "config",
    ".toml": "config", ".ini": "config", ".cfg": "config",
    ".env": "config",
}

# Language detection from extensions
LANGUAGE_MAP = {
    ".py": "python", ".js": "javascript", ".ts": "typescript",
    ".tsx": "typescript-react", ".jsx": "javascript-react",
    ".go": "go", ".rs": "rust", ".java": "java", ".rb": "ruby",
    ".php": "php", ".c": "c", ".cpp": "cpp", ".h": "c-header",
    ".hpp": "cpp-header", ".cs": "csharp", ".swift": "swift",
    ".kt": "kotlin", ".scala": "scala", ".sh": "bash",
    ".bash": "bash", ".zsh": "zsh", ".md": "markdown",
    ".json": "json", ".yaml": "yaml", ".yml": "yaml",
}
# =============================================================================
# Data Classes
# =============================================================================


@dataclass
class Chunk:
    """Represents a text chunk for embedding."""
    text: str            # Raw chunk content
    chunk_index: int     # 0-based index within the source file
    start_line: int      # 1-indexed first line of the chunk
    end_line: int        # 1-indexed last line of the chunk
    content_type: str    # "code" / "document" / "config" / "data"
    language: Optional[str] = None  # e.g. "python"; None when unknown
    chunk_hash: str = ""  # SHA-256 of text; auto-filled in __post_init__

    def __post_init__(self):
        # Derive a stable content hash when not supplied; used for
        # hash-based deduplication/invalidation (J.15.4.4).
        if not self.chunk_hash:
            self.chunk_hash = hashlib.sha256(self.text.encode()).hexdigest()
@dataclass
class EmbeddingResult:
    """Result of embedding a file."""
    file_path: str
    content_hash_id: int
    chunks_generated: int
    chunks_skipped: int  # Already exist (hash match)
    chunks_embedded: int
    error: Optional[str] = None  # Error message, or None on success
# =============================================================================
# Chunking Strategies (J.15.4.1)
# =============================================================================


class ChunkingStrategy:
    """Base class for chunking strategies.

    Subclasses implement chunk() for a specific content type; chunk size
    and overlap defaults come from the module-level configuration tables.
    """

    def __init__(self, content_type: str):
        self.content_type = content_type
        self.chunk_size = CHUNK_SIZES.get(content_type, CHUNK_SIZES["default"])
        self.overlap = CHUNK_OVERLAPS.get(content_type, CHUNK_OVERLAPS["default"])

    def chunk(self, content: str, language: Optional[str] = None) -> Generator[Chunk, None, None]:
        """Generate chunks from content. Subclasses must override."""
        raise NotImplementedError
class CodeChunker(ChunkingStrategy):
    """Chunks code files using semantic boundaries.

    Strategy:
    1. Try to split on function/class definitions
    2. Fall back to logical blocks (blank lines)
    3. Final fallback to character-based sliding window
    """

    # Regex patterns for code boundaries
    FUNCTION_PATTERNS = {
        "python": r'^(def |class |async def )',
        "javascript": r'^(function |const |let |var |class |export )',
        "typescript": r'^(function |const |let |var |class |export |interface |type )',
        "go": r'^(func |type )',
        "rust": r'^(fn |struct |enum |impl |trait |pub fn |pub struct )',
        "java": r'^(public |private |protected |class |interface )',
        "default": r'^(def |func |function |class |struct |impl )',
    }

    def __init__(self):
        super().__init__("code")

    def chunk(self, content: str, language: Optional[str] = None) -> Generator[Chunk, None, None]:
        """Yield code chunks split on function/class boundaries.

        Falls back to blank-line splitting when no definitions are found.
        """
        lines = content.split('\n')
        if not lines:
            return

        # Get function pattern for language
        pattern_key = language if language in self.FUNCTION_PATTERNS else "default"
        func_pattern = re.compile(self.FUNCTION_PATTERNS[pattern_key], re.MULTILINE)

        # Find function/class boundaries
        boundaries = []
        for i, line in enumerate(lines):
            if func_pattern.match(line.lstrip()):
                boundaries.append(i)

        # Fix: include any preamble (imports, module docstring) before the
        # first definition; previously those lines were silently dropped
        # from the generated chunks.
        if boundaries and boundaries[0] != 0:
            boundaries.insert(0, 0)

        # Add end boundary
        boundaries.append(len(lines))

        # If no boundaries found, use blank line splitting
        if len(boundaries) <= 1:
            yield from self._chunk_by_blank_lines(lines, language)
            return

        # Generate chunks from boundaries, packing consecutive definition
        # blocks together until chunk_size would be exceeded.
        chunk_idx = 0
        current_text = []
        current_start = 0
        current_chars = 0

        for i, line_num in enumerate(boundaries[:-1]):
            next_boundary = boundaries[i + 1]
            block_lines = lines[line_num:next_boundary]
            block_text = '\n'.join(block_lines)
            block_chars = len(block_text)

            # If adding this block exceeds chunk size, emit current chunk
            if current_chars + block_chars > self.chunk_size and current_text:
                chunk_text = '\n'.join(current_text)
                yield Chunk(
                    text=chunk_text,
                    chunk_index=chunk_idx,
                    start_line=current_start + 1,  # 1-indexed
                    end_line=line_num,
                    content_type="code",
                    language=language
                )
                chunk_idx += 1
                current_text = []
                current_start = line_num
                current_chars = 0

            current_text.extend(block_lines)
            current_chars += block_chars

        # Emit remaining content
        if current_text:
            chunk_text = '\n'.join(current_text)
            yield Chunk(
                text=chunk_text,
                chunk_index=chunk_idx,
                start_line=current_start + 1,
                end_line=len(lines),
                content_type="code",
                language=language
            )

    def _chunk_by_blank_lines(self, lines: List[str], language: Optional[str]) -> Generator[Chunk, None, None]:
        """Fallback chunking by blank lines."""
        chunk_idx = 0
        current_text = []
        current_start = 0
        current_chars = 0

        for i, line in enumerate(lines):
            line_chars = len(line) + 1  # +1 for newline

            # If blank line and chunk is big enough, emit
            if not line.strip() and current_chars > self.chunk_size * 0.3:
                if current_text:
                    chunk_text = '\n'.join(current_text)
                    yield Chunk(
                        text=chunk_text,
                        chunk_index=chunk_idx,
                        start_line=current_start + 1,
                        end_line=i,
                        content_type="code",
                        language=language
                    )
                    chunk_idx += 1
                    current_text = []
                    current_start = i + 1
                    current_chars = 0
                continue

            # If exceeding max size, force emit
            if current_chars + line_chars > self.chunk_size * 1.5 and current_text:
                chunk_text = '\n'.join(current_text)
                yield Chunk(
                    text=chunk_text,
                    chunk_index=chunk_idx,
                    start_line=current_start + 1,
                    end_line=i,
                    content_type="code",
                    language=language
                )
                chunk_idx += 1
                current_text = []
                current_start = i
                current_chars = 0

            current_text.append(line)
            current_chars += line_chars

        # Emit remaining
        if current_text:
            chunk_text = '\n'.join(current_text)
            yield Chunk(
                text=chunk_text,
                chunk_index=chunk_idx,
                start_line=current_start + 1,
                end_line=len(lines),
                content_type="code",
                language=language
            )
class DocumentChunker(ChunkingStrategy):
    """Chunks document files using paragraph/section boundaries.

    Strategy:
    1. Split on headers (# ## ###)
    2. Split on paragraph breaks (double newline)
    3. Sliding window for very long sections
    """

    def __init__(self):
        super().__init__("document")

    def chunk(self, content: str, language: Optional[str] = None) -> Generator[Chunk, None, None]:
        """Yield document chunks split on markdown headers, then paragraphs."""
        lines = content.split('\n')
        if not lines:
            return

        # Find header boundaries for markdown
        header_pattern = re.compile(r'^#{1,6}\s')
        boundaries = [0]
        for i, line in enumerate(lines):
            if header_pattern.match(line):
                if i > 0:
                    boundaries.append(i)
        boundaries.append(len(lines))

        chunk_idx = 0
        for i in range(len(boundaries) - 1):
            start = boundaries[i]
            end = boundaries[i + 1]
            section_lines = lines[start:end]
            section_text = '\n'.join(section_lines)

            # If section fits in one chunk
            if len(section_text) <= self.chunk_size:
                yield Chunk(
                    text=section_text,
                    chunk_index=chunk_idx,
                    start_line=start + 1,
                    end_line=end,
                    content_type="document",
                    language=language or "markdown"
                )
                chunk_idx += 1
            else:
                # Split section by paragraphs
                paragraphs = re.split(r'\n\n+', section_text)
                current_text = []
                current_chars = 0
                para_start = start
                # Fix: track a character cursor so a paragraph whose text
                # repeats earlier in the section maps to the correct
                # occurrence (plain str.index found the first one, which
                # corrupted line numbers for duplicate paragraphs).
                cursor = 0
                for para in paragraphs:
                    para_chars = len(para)
                    pos = section_text.index(para, cursor)
                    cursor = pos + para_chars
                    para_line = start + section_text[:pos].count('\n')
                    if current_chars + para_chars > self.chunk_size and current_text:
                        yield Chunk(
                            text='\n\n'.join(current_text),
                            chunk_index=chunk_idx,
                            start_line=para_start + 1,
                            # Approximate: newlines inside buffered paragraphs
                            end_line=para_start + sum(t.count('\n') for t in current_text),
                            content_type="document",
                            language=language or "markdown"
                        )
                        chunk_idx += 1
                        current_text = []
                        current_chars = 0
                        para_start = para_line

                    current_text.append(para)
                    current_chars += para_chars

                if current_text:
                    yield Chunk(
                        text='\n\n'.join(current_text),
                        chunk_index=chunk_idx,
                        start_line=para_start + 1,
                        end_line=end,
                        content_type="document",
                        language=language or "markdown"
                    )
                    chunk_idx += 1
class ConfigChunker(ChunkingStrategy):
    """Chunks config files using top-level keys/sections."""

    def __init__(self):
        super().__init__("config")

    def chunk(self, content: str, language: Optional[str] = None) -> Generator[Chunk, None, None]:
        """Yield config chunks by accumulating lines up to chunk_size."""
        # For JSON/YAML, try to keep top-level objects together
        lines = content.split('\n')
        if not lines:
            return

        chunk_idx = 0
        current_text = []
        current_start = 0
        current_chars = 0

        for i, line in enumerate(lines):
            line_chars = len(line) + 1  # +1 for newline

            # Emit if exceeding chunk size
            if current_chars + line_chars > self.chunk_size and current_text:
                chunk_text = '\n'.join(current_text)
                yield Chunk(
                    text=chunk_text,
                    chunk_index=chunk_idx,
                    start_line=current_start + 1,
                    end_line=i,
                    content_type="config",
                    language=language
                )
                chunk_idx += 1
                current_text = []
                current_start = i
                current_chars = 0

            current_text.append(line)
            current_chars += line_chars

        if current_text:
            chunk_text = '\n'.join(current_text)
            yield Chunk(
                text=chunk_text,
                chunk_index=chunk_idx,
                start_line=current_start + 1,
                end_line=len(lines),
                content_type="config",
                language=language
            )
def get_chunker(content_type: str) -> ChunkingStrategy:
    """Factory function to get appropriate chunker.

    Unknown content types fall back to ConfigChunker (line-accumulating).
    """
    chunkers = {
        "code": CodeChunker,
        "document": DocumentChunker,
        "config": ConfigChunker,
    }
    chunker_class = chunkers.get(content_type, ConfigChunker)
    return chunker_class()
# =============================================================================
# Embedding Generation (J.15.4.2)
# =============================================================================


class EmbeddingGenerator:
    """Generates embeddings using SentenceTransformers.

    The model is lazy-loaded on first embed() call so that import of this
    module stays cheap and works without sentence-transformers installed.
    """

    def __init__(self, model_name: str = DEFAULT_MODEL):
        self.model_name = model_name
        self.model = None  # Loaded lazily by _load_model()
        self.embedding_dim = DEFAULT_EMBEDDING_DIM
        self._available = None  # Cached availability probe result

    @property
    def available(self) -> bool:
        """Check if sentence-transformers is available."""
        if self._available is None:
            try:
                from sentence_transformers import SentenceTransformer
                self._available = True
            except ImportError:
                self._available = False
        return self._available

    def _load_model(self):
        """Lazy-load the embedding model."""
        if self.model is None and self.available:
            from sentence_transformers import SentenceTransformer
            self.model = SentenceTransformer(self.model_name)
            # Update embedding dim from model
            self.embedding_dim = self.model.get_sentence_embedding_dimension()

    def embed(self, texts: List[str]) -> List[bytes]:
        """
        Generate embeddings for a list of texts.

        Returns embeddings as bytes (numpy float32 array serialized).

        Raises:
            RuntimeError: if sentence-transformers is not installed.
        """
        if not self.available:
            raise RuntimeError("sentence-transformers not installed. Run: pip install sentence-transformers")

        self._load_model()

        import numpy as np

        # Generate embeddings
        embeddings = self.model.encode(texts, convert_to_numpy=True)

        # Convert to bytes for storage
        return [emb.astype(np.float32).tobytes() for emb in embeddings]

    def embed_single(self, text: str) -> bytes:
        """Generate embedding for a single text."""
        return self.embed([text])[0]
# =============================================================================
# Storage (J.15.4.3)
# =============================================================================


class EmbeddingStorage:
    """Stores embeddings in projects.db."""

    def __init__(self, db_path: Optional[Path] = None):
        self.db_path = db_path or get_projects_db_path()

    def get_existing_chunk_hashes(self, content_hash_id: int) -> Set[str]:
        """Get existing chunk hashes for a file."""
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT chunk_hash FROM project_embeddings
                WHERE content_hash_id = ?
            """, (content_hash_id,))
            return {row[0] for row in cursor.fetchall()}
        finally:
            conn.close()

    def delete_file_embeddings(self, content_hash_id: int):
        """Delete all embeddings for a file."""
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        try:
            cursor.execute("""
                DELETE FROM project_embeddings
                WHERE content_hash_id = ?
            """, (content_hash_id,))
            conn.commit()
        finally:
            conn.close()

    def store_embeddings(
        self,
        project_id: int,
        content_hash_id: int,
        chunks: List[Chunk],
        embeddings: List[bytes],
        model_name: str,
        embedding_dim: int
    ) -> int:
        """
        Store embeddings in the database.

        Returns number of embeddings stored.

        Raises:
            ValueError: if chunks and embeddings counts differ.
        """
        if len(chunks) != len(embeddings):
            raise ValueError(f"Chunks ({len(chunks)}) and embeddings ({len(embeddings)}) count mismatch")

        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.cursor()
        stored = 0
        try:
            for chunk, embedding in zip(chunks, embeddings):
                try:
                    cursor.execute("""
                        INSERT INTO project_embeddings (
                            project_id, content_hash_id, chunk_index, chunk_text,
                            chunk_hash, embedding, embedding_model, embedding_dim,
                            start_line, end_line, content_type, language
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        project_id, content_hash_id, chunk.chunk_index, chunk.text,
                        chunk.chunk_hash, embedding, model_name, embedding_dim,
                        chunk.start_line, chunk.end_line, chunk.content_type, chunk.language
                    ))
                    stored += 1
                except sqlite3.IntegrityError:
                    # Chunk already exists (duplicate hash) — intentional
                    # dedup, not an error.
                    pass
            conn.commit()
        finally:
            conn.close()
        return stored
# =============================================================================
# Main Embedder
# =============================================================================


def embed_project(
    project_path: str,
    incremental: bool = True,
    dry_run: bool = False,
    verbose: bool = False,
    reembed_all: bool = False,
    model_name: str = DEFAULT_MODEL
) -> Dict:
    """
    Generate embeddings for a project's source files.

    J.15.4.4: Hash-based invalidation - only re-embed files whose content
    has changed.

    Args:
        project_path: Path to project or project name
        incremental: Only embed files that have changed (default: True)
        dry_run: Preview without making changes
        verbose: Print detailed progress
        reembed_all: Force re-embed all files (ignore existing)
        model_name: Embedding model to use

    Returns:
        Dict with embedding statistics, or {"error": ...} on failure.
    """
    db_path = get_projects_db_path()
    if not db_path.exists():
        return {"error": "projects.db not found. Run /cx --register-project first."}

    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()

    try:
        # Find project by path, name, or UUID
        path = Path(project_path).expanduser().resolve()
        cursor.execute("""
            SELECT id, project_uuid, name, path FROM projects
            WHERE path = ? OR name = ? OR project_uuid = ?
        """, (str(path), project_path, project_path))
        project = cursor.fetchone()

        if not project:
            return {"error": f"Project not found: {project_path}"}

        project_id = project['id']
        project_name = project['name']
        project_dir = Path(project['path'])

        print("=" * 60)
        print("PROJECT EMBEDDING (J.15.4)")
        print("=" * 60)
        print(f"\n Project: {project_name}")
        print(f" Path: {project_dir}")
        print(f" Model: {model_name}")
        print(f" Mode: {'Incremental' if incremental else 'Full'}")
        if dry_run:
            print(" *** DRY RUN - No changes will be made ***")

        # Get indexed files from content_hashes
        cursor.execute("""
            SELECT id, relative_path, content_hash, content_type, file_size
            FROM content_hashes
            WHERE project_id = ?
        """, (project_id,))
        indexed_files = cursor.fetchall()

        if not indexed_files:
            print(f"\n⚠️  No indexed files found. Run: /cx --index-project {project_name}")
            return {"error": "No indexed files. Run --index-project first."}

        print(f"\n Indexed files: {len(indexed_files):,}")

        # Filter to embeddable files (known code/doc/config extensions)
        embeddable = []
        for f in indexed_files:
            ext = Path(f['relative_path']).suffix.lower()
            if ext in EMBEDDABLE_EXTENSIONS:
                embeddable.append(f)

        print(f" Embeddable: {len(embeddable):,}")

        # Initialize components
        generator = EmbeddingGenerator(model_name)
        storage = EmbeddingStorage(db_path)

        if not generator.available:
            print("\n⚠️  sentence-transformers not installed")
            print(" Run: pip install sentence-transformers")
            return {"error": "sentence-transformers not installed"}

        # Process files
        stats = {
            "files_processed": 0,
            "files_skipped": 0,
            "chunks_total": 0,
            "chunks_skipped": 0,
            "chunks_embedded": 0,
            "errors": []
        }

        print("\nProcessing files...")

        for i, file_record in enumerate(embeddable):
            file_path = project_dir / file_record['relative_path']
            content_hash_id = file_record['id']
            content_type = file_record['content_type']

            if verbose:
                print(f" [{i+1}/{len(embeddable)}] {file_record['relative_path']}")

            # Incremental mode: skip files that already have embeddings
            # (hash-based invalidation is implicit — a changed file gets a
            # new content_hash_id row, which has no embeddings yet).
            if not reembed_all and incremental:
                existing_hashes = storage.get_existing_chunk_hashes(content_hash_id)
                if existing_hashes:
                    stats["files_skipped"] += 1
                    if verbose:
                        print("   → Skipped (existing embeddings)")
                    continue

            # Read file content
            try:
                if not file_path.exists():
                    stats["errors"].append(f"File not found: {file_record['relative_path']}")
                    continue

                content = file_path.read_text(encoding='utf-8', errors='ignore')
                if not content.strip():
                    stats["files_skipped"] += 1
                    continue
            except Exception as e:
                stats["errors"].append(f"{file_record['relative_path']}: {e}")
                continue

            # Determine language from extension
            ext = file_path.suffix.lower()
            language = LANGUAGE_MAP.get(ext)

            # Get appropriate chunker and generate chunks
            chunker = get_chunker(content_type)
            chunks = list(chunker.chunk(content, language))
            stats["chunks_total"] += len(chunks)

            if dry_run:
                stats["files_processed"] += 1
                stats["chunks_embedded"] += len(chunks)
                if verbose:
                    print(f"   → Would generate {len(chunks)} chunks")
                continue

            # Delete existing embeddings if re-embedding
            if reembed_all:
                storage.delete_file_embeddings(content_hash_id)

            # Generate and store embeddings
            try:
                chunk_texts = [c.text for c in chunks]
                embeddings = generator.embed(chunk_texts)

                stored = storage.store_embeddings(
                    project_id=project_id,
                    content_hash_id=content_hash_id,
                    chunks=chunks,
                    embeddings=embeddings,
                    model_name=model_name,
                    embedding_dim=generator.embedding_dim
                )

                stats["files_processed"] += 1
                stats["chunks_embedded"] += stored
                stats["chunks_skipped"] += len(chunks) - stored

                if verbose:
                    print(f"   → {stored} chunks embedded")
            except Exception as e:
                stats["errors"].append(f"{file_record['relative_path']}: {e}")

        # Summary
        print("\n" + "=" * 60)
        print("EMBEDDING COMPLETE")
        print("=" * 60)
        print(f"\n Files processed: {stats['files_processed']:,}")
        print(f" Files skipped: {stats['files_skipped']:,}")
        print(f" Chunks generated: {stats['chunks_total']:,}")
        print(f" Chunks embedded: {stats['chunks_embedded']:,}")
        print(f" Chunks skipped: {stats['chunks_skipped']:,} (duplicates)")

        if stats["errors"]:
            print(f"\n Errors: {len(stats['errors'])}")
            for err in stats["errors"][:5]:
                print(f"   - {err}")
            if len(stats["errors"]) > 5:
                print(f"   ... and {len(stats['errors']) - 5} more")

        return stats
    finally:
        conn.close()
# =============================================================================
# CLI
# =============================================================================


def main():
    """CLI entry point: parse arguments and run the embedder."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Generate semantic embeddings for project content (J.15.4)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 project_embedder.py --project my-project
  python3 project_embedder.py --project-path /path/to/project
  python3 project_embedder.py --project my-project --dry-run
  python3 project_embedder.py --project my-project --reembed-all
"""
    )
    parser.add_argument('--project', '-p', metavar='NAME',
                        help='Project name or UUID')
    parser.add_argument('--project-path', metavar='PATH',
                        help='Project path')
    parser.add_argument('--dry-run', '-n', action='store_true',
                        help='Preview without making changes')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Verbose output')
    parser.add_argument('--reembed-all', action='store_true',
                        help='Force re-embed all files')
    parser.add_argument('--model', default=DEFAULT_MODEL,
                        help=f'Embedding model (default: {DEFAULT_MODEL})')
    parser.add_argument('--no-incremental', action='store_true',
                        help='Process all files (ignore existing embeddings)')

    args = parser.parse_args()

    if not args.project and not args.project_path:
        parser.error("Either --project or --project-path required")

    project_ref = args.project or args.project_path

    result = embed_project(
        project_path=project_ref,
        incremental=not args.no_incremental,
        dry_run=args.dry_run,
        verbose=args.verbose,
        reembed_all=args.reembed_all,
        model_name=args.model
    )

    if "error" in result:
        print(f"\n❌ Error: {result['error']}")
        sys.exit(1)
    sys.exit(0)


# Fix: was `if name == "main"`, which never fires; the standard entry
# guard compares __name__ against "__main__".
if __name__ == "__main__":
    main()