#!/usr/bin/env python3
"""
title: Project Indexer - Index project files for semantic search
type: script
component_type: script
version: 1.0.0
status: active
summary: "Index project source files into projects.db for content hashing and embedding preparation (J.15.3)"
keywords: [project, indexing, files, hashing, content-type, incremental]
created: '2026-02-05'
updated: '2026-02-05'
track: J
task_ids: [J.15.3.1, J.15.3.2, J.15.3.3, J.15.3.4, J.15.3.5]

Project Indexer - J.15.3 Implementation

Indexes project source files for semantic search:
- J.15.3.1: File discovery with exclude patterns
- J.15.3.2: Content type detection (code, document, config, test)
- J.15.3.3: Content hashing for change detection
- J.15.3.4: /cx --index-project command support
- J.15.3.5: Incremental indexing (only changed files)

Usage:
    python3 scripts/project_indexer.py <project_path>
    python3 scripts/project_indexer.py <project_path> --incremental
    python3 scripts/project_indexer.py <project_path> --dry-run
    python3 scripts/project_indexer.py --help
"""
import argparse
import fnmatch
import hashlib
import os
import sqlite3
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
# Make sibling packages (e.g. ``core``) importable when run as a script.
# NOTE: was ``Path(file)`` (NameError) — must be the module's ``__file__``.
sys.path.insert(0, str(Path(__file__).parent))
# Prefer the project's canonical DB-path resolver; fall back to the default
# location when ``core.paths`` is not importable (e.g. standalone execution).
try:
    from core.paths import get_projects_db_path
    PATHS_AVAILABLE = True
except ImportError:
    PATHS_AVAILABLE = False

    def get_projects_db_path():
        """Fallback: default projects.db location under ~/PROJECTS."""
        return Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "projects.db"
# ============================================================================
# J.15.3.1: Exclude Patterns
# ============================================================================

# Directory names and filename globs skipped during file discovery.
# Entries without a "*" are matched as exact names (file, directory, or any
# ancestor directory); entries containing "*" are matched as globs.
DEFAULT_EXCLUDE_PATTERNS: Set[str] = {
    # Version control
    ".git",
    ".svn",
    ".hg",
    ".bzr",
    # Dependencies
    "node_modules",
    "vendor",
    "venv",
    ".venv",
    "env",
    ".env",
    "__pycache__",
    ".pytest_cache",
    ".mypy_cache",
    ".tox",
    ".nox",
    "eggs",
    "*.egg-info",
    ".eggs",
    # Build outputs
    "build",
    "dist",
    "target",
    "out",
    "bin",
    "obj",
    ".next",
    ".nuxt",
    ".output",
    ".vercel",
    ".netlify",
    # IDE/Editor
    ".idea",
    ".vscode",
    "*.swp",
    "*.swo",
    "*~",
    ".DS_Store",
    "Thumbs.db",
    # Logs and temp
    "logs",
    "*.log",
    "tmp",
    "temp",
    ".tmp",
    ".temp",
    # Coverage and test artifacts
    "coverage",
    ".coverage",
    "htmlcov",
    ".nyc_output",
    # Lock files (index but don't embed)
    # "package-lock.json",
    # "yarn.lock",
    # "poetry.lock",
    # "Cargo.lock",
    # Large binary files
    "*.pdf",
    "*.zip",
    "*.tar",
    "*.gz",
    "*.rar",
    "*.7z",
    "*.exe",
    "*.dll",
    "*.so",
    "*.dylib",
    "*.whl",
    # Images (index metadata only)
    "*.png",
    "*.jpg",
    "*.jpeg",
    "*.gif",
    "*.ico",
    "*.svg",
    "*.webp",
    # Database files
    "*.db",
    "*.sqlite",
    "*.sqlite3",
}
# File extensions to always skip (compiled artifacts, media, archives,
# fonts, databases, lock files) — compared against Path.suffix.lower().
SKIP_EXTENSIONS: Set[str] = {
    ".pyc", ".pyo", ".class", ".o", ".obj",
    ".exe", ".dll", ".so", ".dylib",
    ".pdf", ".doc", ".docx", ".xls", ".xlsx",
    ".zip", ".tar", ".gz", ".rar", ".7z",
    ".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp",
    ".mp3", ".mp4", ".wav", ".avi", ".mov",
    ".woff", ".woff2", ".ttf", ".eot",
    ".db", ".sqlite", ".sqlite3",
    ".lock",  # Lock files are often huge
}
# ============================================================================
# J.15.3.2: Content Type Detection
# ============================================================================

# Extension -> content type. NOTE: compound keys such as ".test.js" can never
# be returned by a single Path.suffix lookup (suffix only sees the last dot);
# detect_content_type() matches those filename patterns separately before
# consulting this map. They are kept here for documentation value.
CONTENT_TYPE_MAP: Dict[str, str] = {
    # Code files
    ".py": "code",
    ".js": "code",
    ".ts": "code",
    ".jsx": "code",
    ".tsx": "code",
    ".java": "code",
    ".kt": "code",
    ".go": "code",
    ".rs": "code",
    ".c": "code",
    ".cpp": "code",
    ".cc": "code",
    ".h": "code",
    ".hpp": "code",
    ".cs": "code",
    ".rb": "code",
    ".php": "code",
    ".swift": "code",
    ".scala": "code",
    ".r": "code",
    ".R": "code",  # redundant with ".r" since lookups lowercase the suffix
    ".lua": "code",
    ".pl": "code",
    ".pm": "code",
    ".sh": "code",
    ".bash": "code",
    ".zsh": "code",
    ".fish": "code",
    ".ps1": "code",
    ".sql": "code",
    ".vue": "code",
    ".svelte": "code",
    # Document files
    ".md": "document",
    ".markdown": "document",
    ".rst": "document",
    ".txt": "document",
    ".adoc": "document",
    ".asciidoc": "document",
    ".org": "document",
    ".tex": "document",
    # Config files
    ".json": "config",
    ".yaml": "config",
    ".yml": "config",
    ".toml": "config",
    ".ini": "config",
    ".cfg": "config",
    ".conf": "config",
    ".config": "config",
    ".env": "config",
    ".env.example": "config",
    ".env.local": "config",
    ".properties": "config",
    ".xml": "config",
    # Test files (detected by path pattern, but extension helps)
    ".test.js": "test",
    ".test.ts": "test",
    ".spec.js": "test",
    ".spec.ts": "test",
    "_test.py": "test",
    "_test.go": "test",
    # Web files
    ".html": "document",
    ".htm": "document",
    ".css": "style",
    ".scss": "style",
    ".sass": "style",
    ".less": "style",
}
# Extension -> language identifier, used for per-language statistics.
LANGUAGE_MAP: Dict[str, str] = {
    # Scripting / application languages
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".jsx": "javascript",
    ".tsx": "typescript",
    ".java": "java",
    ".kt": "kotlin",
    ".go": "go",
    ".rs": "rust",
    ".c": "c",
    ".cpp": "cpp",
    ".cc": "cpp",
    ".h": "c",
    ".hpp": "cpp",
    ".cs": "csharp",
    ".rb": "ruby",
    ".php": "php",
    ".swift": "swift",
    ".scala": "scala",
    ".r": "r",
    ".R": "r",
    ".lua": "lua",
    ".pl": "perl",
    ".pm": "perl",
    # Shells
    ".sh": "bash",
    ".bash": "bash",
    ".zsh": "zsh",
    ".fish": "fish",
    ".ps1": "powershell",
    # Query / component formats
    ".sql": "sql",
    ".vue": "vue",
    ".svelte": "svelte",
    # Markup / data / styles
    ".md": "markdown",
    ".yaml": "yaml",
    ".yml": "yaml",
    ".json": "json",
    ".toml": "toml",
    ".xml": "xml",
    ".html": "html",
    ".css": "css",
    ".scss": "scss",
}
def detect_content_type(file_path: Path, relative_path: str) -> str:
    """
    J.15.3.2: Detect content type from file path.

    Test files are recognised first (by directory or filename pattern),
    then the extension is looked up in CONTENT_TYPE_MAP.

    Args:
        file_path: Path object for the file (used for name/suffix checks).
        relative_path: Path relative to the project root (used for
            directory-pattern checks such as "tests/").

    Returns: code, document, config, test, style, or unknown
    """
    # Check for test files by path pattern
    rel_lower = relative_path.lower()
    if any(pattern in rel_lower for pattern in [
        "/test/", "/tests/", "/__tests__/",
        "/spec/", "/specs/",
        "_test.", ".test.", ".spec.",
        "/test_", "/spec_",
    ]):
        return "test"

    # Handle compound extensions: Path.suffix only sees the last dot, so
    # names like "a.test.js" must be matched against the full filename.
    name_lower = file_path.name.lower()
    for compound_ext in (".test.js", ".test.ts", ".spec.js", ".spec.ts", "_test.py", "_test.go"):
        if name_lower.endswith(compound_ext):
            return "test"

    return CONTENT_TYPE_MAP.get(file_path.suffix.lower(), "unknown")
def detect_language(file_path: Path) -> Optional[str]:
    """Detect programming language from the file extension (None if unknown)."""
    return LANGUAGE_MAP.get(file_path.suffix.lower())
# ============================================================================
# J.15.3.3: Content Hashing
# ============================================================================

def compute_file_hash(file_path: Path) -> str:
    """
    J.15.3.3: Compute SHA-256 hash of file content.

    Uses chunked reading for memory efficiency with large files.

    Returns:
        Hex digest of the file content, or "" when the file cannot be read
        (a warning is printed to stderr in that case).
    """
    sha256 = hashlib.sha256()
    try:
        with open(file_path, "rb") as f:
            # Read in 8 KiB chunks so arbitrarily large files never load
            # fully into memory.
            for chunk in iter(lambda: f.read(8192), b""):
                sha256.update(chunk)
        return sha256.hexdigest()
    except (IOError, OSError) as e:
        print(f"Warning: Could not hash {file_path}: {e}", file=sys.stderr)
        return ""
# ============================================================================
# J.15.3.1 & J.15.3.5: File Discovery
# ============================================================================

def should_exclude(path: Path, exclude_patterns: Set[str]) -> bool:
    """Check whether *path* should be excluded based on the patterns.

    A path is excluded when its final name matches a pattern exactly,
    matches a glob pattern (e.g. "*.log", "*~"), or when any component of
    the path is listed verbatim in the patterns (excluded ancestor dir).
    """
    name = path.name

    # Check exact name match
    if name in exclude_patterns:
        return True

    # Check glob patterns. fnmatchcase handles "*.log"-style suffixes as
    # well as patterns like "*~", which the previous "*.`-prefix-only"
    # endswith() check silently never matched. Case-sensitive to preserve
    # the original matching semantics.
    for pattern in exclude_patterns:
        if "*" in pattern and fnmatch.fnmatchcase(name, pattern):
            return True

    # Check if any parent directory component is excluded
    return any(part in exclude_patterns for part in path.parts)
def discover_files(
    project_path: Path,
    exclude_patterns: Optional[Set[str]] = None,
    max_file_size_mb: float = 10.0,
) -> List[Tuple[Path, str]]:
    """
    J.15.3.1: Discover indexable files in project.

    Args:
        project_path: Project root directory to walk.
        exclude_patterns: Patterns to skip (DEFAULT_EXCLUDE_PATTERNS if None).
        max_file_size_mb: Files larger than this are skipped.

    Returns: List of (absolute_path, relative_path) tuples
    """
    if exclude_patterns is None:
        exclude_patterns = DEFAULT_EXCLUDE_PATTERNS

    max_file_size = int(max_file_size_mb * 1024 * 1024)
    files: List[Tuple[Path, str]] = []

    for root, dirs, filenames in os.walk(project_path):
        root_path = Path(root)

        # Filter directories in-place to prevent os.walk from descending
        # into excluded dirs at all.
        dirs[:] = [
            d for d in dirs
            if not should_exclude(root_path / d, exclude_patterns)
        ]

        for filename in filenames:
            file_path = root_path / filename

            # Skip excluded files
            if should_exclude(file_path, exclude_patterns):
                continue

            # Skip by extension
            if file_path.suffix.lower() in SKIP_EXTENSIONS:
                continue

            # Skip files that are too large (or that vanish mid-walk)
            try:
                if file_path.stat().st_size > max_file_size:
                    continue
            except (OSError, IOError):
                continue

            # Record path relative to the project root; ValueError means the
            # file resolved outside the root (e.g. odd symlink) — skip it.
            try:
                relative_path = str(file_path.relative_to(project_path))
            except ValueError:
                continue
            files.append((file_path, relative_path))

    return files
# ============================================================================
# J.15.3.4 & J.15.3.5: Database Operations
# ============================================================================

def get_project_by_path(conn: sqlite3.Connection, project_path: str) -> Optional[Dict[str, Any]]:
    """Get the registered project record for *project_path*, or None."""
    cursor = conn.execute(
        "SELECT id, project_uuid, name, path FROM projects WHERE path = ?",
        (project_path,)
    )
    row = cursor.fetchone()
    if row:
        return {
            "id": row[0],
            "project_uuid": row[1],
            "name": row[2],
            "path": row[3],
        }
    return None
def get_existing_hashes(conn: sqlite3.Connection, project_id: int) -> Dict[str, Tuple[int, str]]:
    """
    J.15.3.5: Get existing file hashes for incremental indexing.

    Returns: Dict[relative_path] -> (content_hash_id, file_hash)
    """
    cursor = conn.execute(
        """
        SELECT id, relative_path, file_hash
        FROM content_hashes
        WHERE project_id = ?
        """,
        (project_id,)
    )
    return {row[1]: (row[0], row[2]) for row in cursor.fetchall()}
def upsert_content_hash(
    conn: sqlite3.Connection,
    project_id: int,
    relative_path: str,
    file_hash: str,
    file_size: int,
    content_type: str,
    language: Optional[str],
    needs_reindex: bool = True,
) -> int:
    """Insert or update a content hash record.

    UPSERT keyed on the (project_id, relative_path) unique constraint.
    NOTE: uses RETURNING, which requires SQLite >= 3.35.

    Returns: the row id of the inserted/updated record, or -1 if no row
    was returned.
    """
    now = datetime.now(timezone.utc).isoformat()
    cursor = conn.execute(
        """
        INSERT INTO content_hashes (
            project_id, relative_path, file_hash, file_size,
            content_type, language, last_hashed_at, needs_reindex
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(project_id, relative_path) DO UPDATE SET
            file_hash = excluded.file_hash,
            file_size = excluded.file_size,
            content_type = excluded.content_type,
            language = excluded.language,
            last_hashed_at = excluded.last_hashed_at,
            needs_reindex = excluded.needs_reindex
        RETURNING id
        """,
        (project_id, relative_path, file_hash, file_size,
         content_type, language, now, 1 if needs_reindex else 0)
    )
    result = cursor.fetchone()
    return result[0] if result else -1
def delete_removed_files(
    conn: sqlite3.Connection,
    project_id: int,
    current_paths: Set[str],
) -> int:
    """Remove content_hashes rows for files that no longer exist on disk.

    Args:
        current_paths: Relative paths discovered in the current scan; any
            recorded path not in this set is deleted.

    Returns: number of rows deleted.
    """
    cursor = conn.execute(
        "SELECT id, relative_path FROM content_hashes WHERE project_id = ?",
        (project_id,)
    )
    to_delete = [row[0] for row in cursor.fetchall() if row[1] not in current_paths]

    # Delete in chunks to stay under SQLite's host-parameter limit
    # (999 in older builds).
    for start in range(0, len(to_delete), 500):
        chunk = to_delete[start:start + 500]
        placeholders = ",".join("?" * len(chunk))
        conn.execute(
            f"DELETE FROM content_hashes WHERE id IN ({placeholders})",
            chunk
        )
    return len(to_delete)
def update_project_indexed_at(conn: sqlite3.Connection, project_id: int):
    """Stamp the project's last_indexed_at / updated_at with the current UTC time."""
    now = datetime.now(timezone.utc).isoformat()
    conn.execute(
        "UPDATE projects SET last_indexed_at = ?, updated_at = ? WHERE id = ?",
        (now, now, project_id)
    )
# ============================================================================
# J.15.3.4: Main Index Function
# ============================================================================

def index_project(
    project_path: str,
    incremental: bool = True,
    dry_run: bool = False,
    verbose: bool = False,
    exclude_patterns: Optional[Set[str]] = None,
) -> Dict[str, Any]:
    """
    J.15.3.4: Index project files into projects.db.

    Args:
        project_path: Absolute path to project root
        incremental: Only index changed files (J.15.3.5)
        dry_run: Preview without making changes
        verbose: Print detailed progress
        exclude_patterns: Custom exclude patterns (uses defaults if None)

    Returns:
        Statistics dict with counts, or {"error": ...} on failure.
    """
    project_path = os.path.abspath(project_path)
    project_path_obj = Path(project_path)

    if not project_path_obj.exists():
        return {"error": f"Project path does not exist: {project_path}"}
    if not project_path_obj.is_dir():
        return {"error": f"Project path is not a directory: {project_path}"}

    # Connect to projects.db
    db_path = get_projects_db_path()
    if not db_path.exists():
        return {"error": f"projects.db not found at {db_path}. Run /cx --register-project first."}

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        # Get project record; the project must already be registered
        project = get_project_by_path(conn, project_path)
        if not project:
            return {
                "error": f"Project not registered: {project_path}. Run /cx --register-project first."
            }
        project_id = project["id"]
        project_name = project["name"]

        if verbose:
            print(f"Indexing project: {project_name} ({project_path})")

        # Get existing hashes for incremental mode
        existing_hashes: Dict[str, Tuple[int, str]] = {}
        if incremental:
            existing_hashes = get_existing_hashes(conn, project_id)
            if verbose:
                print(f"Found {len(existing_hashes)} existing file records")

        # Discover files
        if exclude_patterns is None:
            exclude_patterns = DEFAULT_EXCLUDE_PATTERNS
        files = discover_files(project_path_obj, exclude_patterns)
        current_paths = {rel_path for _, rel_path in files}

        if verbose:
            print(f"Discovered {len(files)} files to index")

        # Statistics accumulated while processing
        stats = {
            "project_name": project_name,
            "project_path": project_path,
            "files_discovered": len(files),
            "files_new": 0,
            "files_changed": 0,
            "files_unchanged": 0,
            "files_removed": 0,
            "dry_run": dry_run,
            "incremental": incremental,
            "content_types": {},
            "languages": {},
        }

        # Process files
        for file_path, relative_path in files:
            try:
                file_size = file_path.stat().st_size
                content_type = detect_content_type(file_path, relative_path)
                language = detect_language(file_path)

                # Track content types and languages
                stats["content_types"][content_type] = stats["content_types"].get(content_type, 0) + 1
                if language:
                    stats["languages"][language] = stats["languages"].get(language, 0) + 1

                # Compute hash; "" means the file could not be read
                file_hash = compute_file_hash(file_path)
                if not file_hash:
                    continue

                # Check if changed (incremental mode)
                if incremental and relative_path in existing_hashes:
                    _, old_hash = existing_hashes[relative_path]
                    if old_hash == file_hash:
                        stats["files_unchanged"] += 1
                        # Cap unchanged-file logging to avoid flooding output
                        if verbose and stats["files_unchanged"] <= 5:
                            print(f" Unchanged: {relative_path}")
                        continue
                    else:
                        stats["files_changed"] += 1
                        if verbose:
                            print(f" Changed: {relative_path}")
                else:
                    stats["files_new"] += 1
                    if verbose:
                        print(f" New: {relative_path}")

                # Insert/update record
                if not dry_run:
                    upsert_content_hash(
                        conn, project_id, relative_path, file_hash,
                        file_size, content_type, language, needs_reindex=True
                    )
            except Exception as e:
                # Best-effort per file: a single unreadable file must not
                # abort the whole indexing run.
                if verbose:
                    print(f" Error processing {relative_path}: {e}", file=sys.stderr)

        # Remove deleted files and persist changes
        if not dry_run:
            stats["files_removed"] = delete_removed_files(conn, project_id, current_paths)
            update_project_indexed_at(conn, project_id)
            conn.commit()
        else:
            # Count what would be removed in dry-run
            if existing_hashes:
                removed_paths = set(existing_hashes.keys()) - current_paths
                stats["files_removed"] = len(removed_paths)

        if verbose:
            print(f"\nIndexing complete:")
            print(f" New files: {stats['files_new']}")
            print(f" Changed files: {stats['files_changed']}")
            print(f" Unchanged files: {stats['files_unchanged']}")
            print(f" Removed files: {stats['files_removed']}")

        return stats
    finally:
        conn.close()
# ============================================================================
# CLI Interface
# ============================================================================

def main():
    """CLI entry point: parse arguments, run the indexer, print results."""
    parser = argparse.ArgumentParser(
        description="Index project files for semantic search (J.15.3)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Index a project (incremental by default)
  python3 scripts/project_indexer.py /path/to/project

  # Full re-index (ignore existing hashes)
  python3 scripts/project_indexer.py /path/to/project --full

  # Preview what would be indexed
  python3 scripts/project_indexer.py /path/to/project --dry-run

  # Verbose output
  python3 scripts/project_indexer.py /path/to/project -v

  # Show statistics only
  python3 scripts/project_indexer.py /path/to/project --stats
"""
    )
    parser.add_argument(
        "project_path",
        help="Path to project directory to index"
    )
    parser.add_argument(
        "--full", action="store_true",
        help="Full re-index (ignore existing hashes)"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Preview without making changes"
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true",
        help="Verbose output"
    )
    parser.add_argument(
        "--stats", action="store_true",
        help="Show content type and language statistics"
    )
    parser.add_argument(
        "--json", action="store_true",
        help="Output results as JSON"
    )
    args = parser.parse_args()

    # Run indexer (incremental unless --full was given)
    result = index_project(
        project_path=args.project_path,
        incremental=not args.full,
        dry_run=args.dry_run,
        verbose=args.verbose,
    )

    if "error" in result:
        print(f"Error: {result['error']}", file=sys.stderr)
        sys.exit(1)

    if args.json:
        import json
        print(json.dumps(result, indent=2))
    elif args.stats:
        print(f"\n📊 Content Type Distribution:")
        for ct, count in sorted(result["content_types"].items(), key=lambda x: -x[1]):
            print(f" {ct}: {count}")
        print(f"\n🗣️ Language Distribution:")
        for lang, count in sorted(result["languages"].items(), key=lambda x: -x[1]):
            print(f" {lang}: {count}")
    else:
        # Summary output
        mode = "dry-run" if result["dry_run"] else ("incremental" if result["incremental"] else "full")
        print(f"\n✅ Project indexed: {result['project_name']} ({mode})")
        print(f" Files discovered: {result['files_discovered']}")
        print(f" New: {result['files_new']}, Changed: {result['files_changed']}, "
              f"Unchanged: {result['files_unchanged']}, Removed: {result['files_removed']}")
# Standard script entry guard; was ``if name == "main"`` (NameError and
# wrong comparison) — must use the ``__name__`` / ``__main__`` dunders.
if __name__ == "__main__":
    main()