#!/usr/bin/env python3
"""
Projects Database Management (H.5.7.2, H.5.7.3)

Creates and manages the unified projects.db for all project code/doc
embeddings with hash-based change tracking for incremental updates.

Key Features:
- CODITECT Cloud Integration: Projects are registered with cloud API to get
  globally unique UUIDs, tenant/team/user assignments from auth.coditect.ai
- GitHub Integration: Auto-detects GitHub repo from git remote origin
- Offline Support: Can register locally (pending sync) and sync later
- Multi-tenant: Supports tenant_id, team_id, owner_user_id from cloud
- Smart Chunking: Content-type-aware chunking (code by functions, docs by sections)
- Embedding Generation: Uses sentence-transformers for semantic search

Part of ADR-103: Four-Database Separation Architecture

Usage:
    python3 projects-db.py --init                          # Initialize database
    python3 projects-db.py --register ~/my-project         # Register + sync to cloud
    python3 projects-db.py --register ~/pkg --offline      # Register locally only
    python3 projects-db.py --register ./pkg --parent UUID  # Register subproject
    python3 projects-db.py --index <PROJECT_UUID>          # Index project files
    python3 projects-db.py --index-changed <PROJECT_UUID>  # Index only changed
    python3 projects-db.py --embed <PROJECT_UUID>          # Generate embeddings
    python3 projects-db.py --embed-all                     # Generate embeddings for all projects
    python3 projects-db.py --list                          # List all projects
    python3 projects-db.py --tree                          # Show project hierarchy
    python3 projects-db.py --stats                         # Show statistics
    python3 projects-db.py --unregister <PROJECT_UUID>     # Remove a project
    python3 projects-db.py --sync-pending                  # Sync pending to cloud

Cloud API Endpoints:
    POST /api/v1/projects/register          - Register new project, get UUID
    POST /api/v1/projects/{uuid}/deregister - Notify project removal
"""
import argparse
import hashlib
import json
import os
import re
import sqlite3
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ADR-114 & ADR-118: Use centralized path discovery.
# projects.db is Tier 1 data (regenerable project embeddings).
CODITECT_HOME = Path.home() / ".coditect"  # Framework install
USER_DATA_DIR = Path.home() / "PROJECTS" / ".coditect-data"
# Prefer the user-data location when it exists, else fall back to the install dir.
CONTEXT_STORAGE = USER_DATA_DIR / "context-storage" if USER_DATA_DIR.exists() else CODITECT_HOME / "context-storage"
PROJECTS_DB = CONTEXT_STORAGE / "projects.db"
# File patterns to index, keyed by content type.
# "extensions" are matched against the file suffix; "patterns" (test type)
# are fnmatch-style globs used with Path.match(); "exclude_patterns" list
# generated/minified artifacts that should never be indexed as that type.
# FIX: restored the '*' glob markers that had been stripped — bare strings
# like ".spec.ts" or "test.py" only match files literally named that.
FILE_PATTERNS = {
    "code": {
        "extensions": [".py", ".js", ".ts", ".tsx", ".jsx", ".go", ".rs",
                       ".java", ".rb", ".php", ".c", ".cpp", ".h", ".hpp",
                       ".cs", ".swift", ".kt"],
        "exclude_patterns": ["*.min.js", "*.min.css", "*_test.py", "*.spec.ts", "*.test.js"],
    },
    "document": {
        "extensions": [".md", ".mdx", ".txt", ".rst", ".adoc"],
        "exclude_patterns": [],
    },
    "config": {
        "extensions": [".json", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf"],
        "exclude_patterns": ["package-lock.json", "yarn.lock", "pnpm-lock.yaml", "Cargo.lock"],
    },
    "test": {
        "patterns": ["*_test.py", "*.spec.ts", "*.spec.js", "*.test.js", "*.test.ts", "test*.py"],
        "extensions": [],
    },
}
# Global exclude patterns (applied to all projects).
# Trailing '/' marks a directory exclude; patterns containing '*' are
# fnmatch globs (see should_exclude); everything else is an exact match.
# FIX: restored glob markers lost to formatting — '__pycache__' had lost
# its underscores and '*.pyc'-style patterns their leading '*', making
# them dead entries that could never match a real path.
GLOBAL_EXCLUDES = [
    "node_modules/", "__pycache__/", ".git/", ".svn/", ".hg/",
    "*.pyc", "*.pyo", ".env", ".env.*", "*.log",
    "dist/", "build/", ".venv/", "venv/",
    "*.min.js", "*.min.css",
    "package-lock.json", "yarn.lock", "pnpm-lock.yaml", "Cargo.lock",
    ".DS_Store", "Thumbs.db",
    "*.egg-info/", ".tox/", ".pytest_cache/", ".mypy_cache/",
    "coverage/", ".coverage", "htmlcov/**",
]
# SQL schema for projects.db (all DDL is idempotent: IF NOT EXISTS).
SCHEMA = """
-- Registered projects (CODITECT Cloud-aligned schema)
-- Mirrors cloud model: tenants.Project with local extensions
CREATE TABLE IF NOT EXISTS projects (
    id INTEGER PRIMARY KEY AUTOINCREMENT,

    -- CODITECT Universal Identifiers
    project_uuid TEXT UNIQUE NOT NULL,        -- CODITECT cloud UUID (globally unique)
    project_slug TEXT NOT NULL,               -- Human-readable slug (e.g., 'my-webapp')

    -- GitHub Integration
    github_repo_url TEXT,                     -- Full GitHub URL (https://github.com/org/repo)
    github_repo_id TEXT,                      -- GitHub repository ID (numeric)
    github_org TEXT,                          -- GitHub organization/owner
    github_repo_name TEXT,                    -- Repository name

    -- Multi-tenant Context (from CODITECT cloud)
    tenant_id TEXT,                           -- Cloud tenant UUID (customer organization)
    team_id TEXT,                             -- Cloud team UUID
    owner_user_id TEXT,                       -- Cloud user UUID who registered project

    -- Local Context
    project_name TEXT NOT NULL,               -- Display name
    project_path TEXT UNIQUE NOT NULL,        -- Local absolute path
    project_type TEXT DEFAULT 'customer',     -- 'internal' | 'customer' | 'submodule'
    parent_project_uuid TEXT,                 -- For submodules/monorepo children

    -- Indexing Configuration
    embedding_model TEXT DEFAULT 'all-MiniLM-L6-v2',
    chunk_size INTEGER DEFAULT 1000,
    chunk_overlap INTEGER DEFAULT 200,

    -- Statistics
    file_count INTEGER DEFAULT 0,
    embedding_count INTEGER DEFAULT 0,
    last_indexed TEXT,

    -- Cloud Sync
    cloud_synced_at TEXT,                     -- Last sync to CODITECT cloud
    cloud_sync_status TEXT DEFAULT 'pending', -- 'synced' | 'pending' | 'error'

    -- Timestamps
    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (parent_project_uuid) REFERENCES projects(project_uuid)
);

-- Content hash tracking (references project by UUID)
CREATE TABLE IF NOT EXISTS content_hashes (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    project_uuid TEXT NOT NULL,
    file_path TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    file_size INTEGER,
    mtime REAL,
    content_type TEXT NOT NULL,
    language TEXT,
    chunk_count INTEGER DEFAULT 1,
    indexed_at TEXT DEFAULT CURRENT_TIMESTAMP,
    last_checked TEXT DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(project_uuid, file_path),
    FOREIGN KEY (project_uuid) REFERENCES projects(project_uuid) ON DELETE CASCADE
);

-- Project content embeddings (references project by UUID)
CREATE TABLE IF NOT EXISTS project_embeddings (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    project_uuid TEXT NOT NULL,
    file_path TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    content_type TEXT NOT NULL,
    chunk_index INTEGER DEFAULT 0,
    chunk_total INTEGER DEFAULT 1,
    content_preview TEXT,
    embedding BLOB,
    model TEXT,
    metadata TEXT,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(project_uuid, content_hash, chunk_index),
    FOREIGN KEY (project_uuid) REFERENCES projects(project_uuid) ON DELETE CASCADE
);

-- Global exclude patterns
CREATE TABLE IF NOT EXISTS global_exclude_patterns (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pattern TEXT UNIQUE NOT NULL,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP
);

-- Per-project exclude patterns
CREATE TABLE IF NOT EXISTS project_exclude_patterns (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    project_uuid TEXT NOT NULL,
    pattern TEXT NOT NULL,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(project_uuid, pattern),
    FOREIGN KEY (project_uuid) REFERENCES projects(project_uuid) ON DELETE CASCADE
);

-- Indexes for efficient queries
CREATE INDEX IF NOT EXISTS idx_projects_uuid ON projects(project_uuid);
CREATE INDEX IF NOT EXISTS idx_projects_github ON projects(github_repo_url);
CREATE INDEX IF NOT EXISTS idx_projects_tenant ON projects(tenant_id);
CREATE INDEX IF NOT EXISTS idx_projects_type ON projects(project_type);
CREATE INDEX IF NOT EXISTS idx_projects_parent ON projects(parent_project_uuid);
CREATE INDEX IF NOT EXISTS idx_content_hashes_project ON content_hashes(project_uuid);
CREATE INDEX IF NOT EXISTS idx_content_hashes_type ON content_hashes(project_uuid, content_type);
CREATE INDEX IF NOT EXISTS idx_project_embeddings_project ON project_embeddings(project_uuid);
CREATE INDEX IF NOT EXISTS idx_project_embeddings_type ON project_embeddings(project_uuid, content_type);
"""
def compute_content_hash(file_path: Path) -> str:
    """Return the SHA256 hex digest of the file's raw bytes."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as fh:
        digest.update(fh.read())
    return digest.hexdigest()
def generate_project_uuid() -> str:
    """Return a fresh random UUID4 string for a locally registered project."""
    import uuid

    return str(uuid.uuid4())
def generate_project_slug(project_path: Path) -> str:
    """Derive a lowercase, hyphen-separated slug from the directory name."""
    slug = re.sub(r'[^a-z0-9]+', '-', project_path.name.lower())
    return slug.strip('-')
def detect_github_info(project_path: Path) -> Dict[str, Optional[str]]:
    """
    Detect GitHub repository information from the project's git remote.

    Runs `git remote get-url origin` in project_path and, when the remote
    points at github.com, parses org and repo name from either the SSH
    (git@github.com:org/repo.git) or HTTPS (https://github.com/org/repo.git)
    URL form.

    Returns:
        Dict with keys 'github_repo_url', 'github_org', 'github_repo_name';
        all values are None when detection fails (no git, not a repo,
        non-GitHub remote, timeout).
    """
    info: Dict[str, Optional[str]] = {
        'github_repo_url': None,
        'github_org': None,
        'github_repo_name': None,
    }
    try:
        result = subprocess.run(
            ['git', 'remote', 'get-url', 'origin'],
            cwd=str(project_path),
            capture_output=True,
            text=True,
            timeout=5
        )
        if result.returncode == 0:
            remote_url = result.stdout.strip()
            if 'github.com' in remote_url:
                info['github_repo_url'] = remote_url
                if remote_url.startswith('git@'):
                    # git@github.com:org/repo.git
                    match = re.match(r'git@github\.com:([^/]+)/(.+?)(?:\.git)?$', remote_url)
                else:
                    # https://github.com/org/repo.git
                    # BUG FIX: original pattern used '(+)' which is an invalid
                    # regex (re.error swallowed by the except below, so HTTPS
                    # remotes never yielded org/repo). '([^/]+)' captures the
                    # org segment like the SSH branch does.
                    match = re.match(r'https://github\.com/([^/]+)/(.+?)(?:\.git)?$', remote_url)
                if match:
                    info['github_org'] = match.group(1)
                    info['github_repo_name'] = match.group(2)
    except Exception:
        # Best-effort detection: any failure falls through to all-None.
        pass
    return info
def detect_content_type(file_path: Path) -> Optional[str]:
    """
    Classify a file as 'test', 'code', 'document', or 'config'.

    Test glob patterns are checked first so that e.g. app.test.js is
    classified as 'test' rather than 'code'; otherwise the lowercased file
    extension decides. Returns None for files that should not be indexed.

    (FIX: removed the unused `name` local from the original.)
    """
    ext = file_path.suffix.lower()

    # Test patterns win over plain extension matches.
    for pattern in FILE_PATTERNS["test"].get("patterns", []):
        if file_path.match(pattern):
            return "test"

    # Fall back to extension-based detection.
    for content_type, config in FILE_PATTERNS.items():
        if ext in config.get("extensions", []):
            return content_type

    return None
def detect_language(file_path: Path) -> Optional[str]:
    """Map a file extension to its programming-language name (None if unknown)."""
    extension_languages = {
        ".py": "python",
        ".js": "javascript",
        ".ts": "typescript",
        ".tsx": "typescript",
        ".jsx": "javascript",
        ".go": "go",
        ".rs": "rust",
        ".java": "java",
        ".rb": "ruby",
        ".php": "php",
        ".c": "c",
        ".cpp": "cpp",
        ".h": "c",
        ".hpp": "cpp",
        ".cs": "csharp",
        ".swift": "swift",
        ".kt": "kotlin",
        ".md": "markdown",
        ".json": "json",
        ".yaml": "yaml",
        ".yml": "yaml",
        ".toml": "toml",
    }
    return extension_languages.get(file_path.suffix.lower())
def should_exclude(file_path: Path, project_root: Path, exclude_patterns: List[str]) -> bool:
    """
    Return True if file_path should be skipped during indexing.

    Supported pattern styles:
    - 'dir/' or 'dir/**': exclude everything inside that directory, at any
      depth in the tree (FIX: the original only handled '/**' and silently
      ignored the trailing-'/' form used throughout GLOBAL_EXCLUDES).
    - patterns containing '*': fnmatch glob, tested against both the
      project-relative path and the bare file name.
    - anything else: exact match against relative path or file name.

    Files outside project_root are always excluded.
    """
    from fnmatch import fnmatch  # hoisted out of the loop (was imported per match)

    try:
        rel_path = file_path.relative_to(project_root)
    except ValueError:
        # Not under the project root at all.
        return True

    rel_str = str(rel_path)
    for pattern in exclude_patterns:
        if pattern.endswith('/**') or pattern.endswith('/'):
            # Directory pattern: exclude anything under the directory.
            dir_pattern = pattern[:-3] if pattern.endswith('/**') else pattern[:-1]
            # FIX: require the path separator so 'htmlcov/' does not match
            # 'htmlcov-old/...'; also match the directory at any depth
            # (e.g. 'pkg/node_modules/x.js' for 'node_modules/').
            if rel_str.startswith(dir_pattern + '/') or ('/' + dir_pattern + '/') in ('/' + rel_str):
                return True
        elif '*' in pattern:
            # Glob pattern.
            if fnmatch(rel_str, pattern) or fnmatch(file_path.name, pattern):
                return True
        else:
            # Exact match.
            if rel_str == pattern or file_path.name == pattern:
                return True
    return False
# =============================================================================
# H.5.7.3.3: Chunking Strategies (ADR-103)
# =============================================================================
def chunk_text_simple(content: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """
    Simple text chunking with overlap.

    Splits content into chunks of approximately chunk_size characters,
    with chunk_overlap characters of overlap between adjacent chunks.
    Tries to split on sentence boundaries when possible.

    Args:
        content: Text to split.
        chunk_size: Target chunk length in characters.
        chunk_overlap: Characters of overlap between adjacent chunks.

    Returns:
        List of non-empty, stripped chunk strings (whole content as a
        single chunk when it already fits).
    """
    if len(content) <= chunk_size:
        return [content]

    chunks = []
    start = 0
    while start < len(content):
        end = start + chunk_size
        if end >= len(content):
            # Last chunk: take everything that remains.
            chunks.append(content[start:].strip())
            break

        # Prefer a natural boundary: scan the last 20% of the window for
        # the strongest separator available (paragraph > line > sentence...).
        split_point = end
        for sep in ['\n\n', '\n', '. ', '! ', '? ', '; ', ', ', ' ']:
            search_start = end - int(chunk_size * 0.2)
            search_region = content[search_start:end]
            last_sep = search_region.rfind(sep)
            if last_sep != -1:
                split_point = search_start + last_sep + len(sep)
                break

        chunk = content[start:split_point].strip()
        if chunk:
            chunks.append(chunk)

        # Step forward, keeping chunk_overlap characters of context.
        next_start = split_point - chunk_overlap
        if next_start <= start:
            # BUG FIX: guarantee forward progress. With chunk_overlap >=
            # the advance per iteration (e.g. overlap >= chunk_size) the
            # original looped forever; drop the overlap for this step.
            next_start = split_point
        start = max(next_start, 0)
    return chunks
def chunk_code_by_functions(content: str, language: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Dict[str, Any]]:
    """
    Chunk code by function/class boundaries.

    For languages with clear function definitions (Python, JS/TS, Go, etc.),
    tries to keep functions intact. Falls back to simple chunking for
    functions larger than chunk_size.

    Only column-0 definitions start a new block (patterns are '^'-anchored
    and applied per line), so class methods stay inside their class's chunk.

    Args:
        content: Source code text.
        language: Language key ('python', 'javascript', ...); unknown keys
            fall back to chunk_text_simple.
        chunk_size: Target chunk size in characters.
        chunk_overlap: Overlap used when an oversized block is sub-split.

    Returns:
        List of dicts with 'content' and optional 'metadata' (block_type,
        name, and 'part' numbering when a large block was sub-split).
    """
    chunks = []

    # Language-specific patterns for function/class detection.
    patterns = {
        'python': [
            (r'^(class\s+\w+.*?:)', 'class'),
            (r'^(def\s+\w+.*?:)', 'function'),
            (r'^(async\s+def\s+\w+.*?:)', 'async_function'),
        ],
        'javascript': [
            (r'^(class\s+\w+)', 'class'),
            (r'^(function\s+\w+)', 'function'),
            (r'^(const\s+\w+\s*=\s*(?:async\s+)?\()', 'arrow_function'),
            (r'^(export\s+(?:default\s+)?(?:async\s+)?function\s+\w+)', 'export_function'),
        ],
        'typescript': [
            (r'^(class\s+\w+)', 'class'),
            (r'^(function\s+\w+)', 'function'),
            (r'^(const\s+\w+\s*=\s*(?:async\s+)?\()', 'arrow_function'),
            (r'^(export\s+(?:default\s+)?(?:async\s+)?function\s+\w+)', 'export_function'),
            (r'^(interface\s+\w+)', 'interface'),
            (r'^(type\s+\w+)', 'type'),
        ],
        'go': [
            (r'^(func\s+(?:\(\w+\s+\*?\w+\)\s+)?\w+)', 'function'),
            (r'^(type\s+\w+\s+struct)', 'struct'),
            (r'^(type\s+\w+\s+interface)', 'interface'),
        ],
        'rust': [
            (r'^(fn\s+\w+)', 'function'),
            (r'^(pub\s+fn\s+\w+)', 'pub_function'),
            (r'^(impl\s+)', 'impl'),
            (r'^(struct\s+\w+)', 'struct'),
            (r'^(enum\s+\w+)', 'enum'),
        ],
    }

    lang_patterns = patterns.get(language, [])
    if not lang_patterns:
        # No patterns for this language, use simple chunking.
        simple_chunks = chunk_text_simple(content, chunk_size, chunk_overlap)
        return [{'content': c, 'metadata': {}} for c in simple_chunks]

    # Split by lines and try to identify function boundaries.
    lines = content.split('\n')
    current_block = []         # lines accumulated for the block in progress
    current_block_type = None  # block_type label for that block
    current_block_name = None  # identifier pulled from the definition line

    for line in lines:
        # Check if this line starts a new block.
        new_block_started = False
        for pattern, block_type in lang_patterns:
            # NOTE: re.MULTILINE is redundant here since matching is per line.
            match = re.match(pattern, line, re.MULTILINE)
            if match:
                # Save the current block (everything up to this definition).
                if current_block:
                    block_content = '\n'.join(current_block)
                    if len(block_content) > chunk_size:
                        # Block too large, split it; 'part' numbers the pieces.
                        sub_chunks = chunk_text_simple(block_content, chunk_size, chunk_overlap)
                        for i, sub in enumerate(sub_chunks):
                            chunks.append({
                                'content': sub,
                                'metadata': {
                                    'block_type': current_block_type,
                                    'name': current_block_name,
                                    'part': i + 1 if len(sub_chunks) > 1 else None,
                                }
                            })
                    else:
                        chunks.append({
                            'content': block_content,
                            'metadata': {
                                'block_type': current_block_type,
                                'name': current_block_name,
                            }
                        })
                # Start new block with this definition line.
                current_block = [line]
                current_block_type = block_type
                # Extract the identifier following the definition keyword.
                name_match = re.search(r'(?:class|def|func|fn|function|const|type|struct|enum|interface|impl)\s+(\w+)', line)
                current_block_name = name_match.group(1) if name_match else None
                new_block_started = True
                break
        if not new_block_started:
            current_block.append(line)

    # Don't forget the last block.
    if current_block:
        block_content = '\n'.join(current_block)
        if len(block_content) > chunk_size:
            sub_chunks = chunk_text_simple(block_content, chunk_size, chunk_overlap)
            for i, sub in enumerate(sub_chunks):
                chunks.append({
                    'content': sub,
                    'metadata': {
                        'block_type': current_block_type,
                        'name': current_block_name,
                        'part': i + 1 if len(sub_chunks) > 1 else None,
                    }
                })
        else:
            chunks.append({
                'content': block_content,
                'metadata': {
                    'block_type': current_block_type,
                    'name': current_block_name,
                }
            })

    # If no chunks were created (no functions found), use simple chunking.
    if not chunks:
        simple_chunks = chunk_text_simple(content, chunk_size, chunk_overlap)
        return [{'content': c, 'metadata': {}} for c in simple_chunks]

    return chunks
def chunk_document_by_sections(content: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Dict[str, Any]]:
    """
    Chunk markdown/document content by sections (headers).

    Splits on '#'..'###' headers; a section keeps its header line. Sections
    larger than chunk_size are sub-split with chunk_text_simple, with 'part'
    numbering in the metadata. Text before the first header becomes a
    section with title None.

    Returns:
        List of dicts with 'content' and 'metadata' (section title, level,
        optional part number).
    """
    chunks = []

    # Header detection: 1-3 '#' characters followed by the title text.
    section_pattern = r'^(#{1,3})\s+(.+?)$'
    lines = content.split('\n')
    current_section = []   # lines accumulated for the section in progress
    current_title = None   # title of that section (None before first header)
    current_level = 0      # header level (number of '#'; 0 before first header)

    for line in lines:
        match = re.match(section_pattern, line)
        if match:
            # Save current section if it exists.
            if current_section:
                section_content = '\n'.join(current_section).strip()
                if section_content:
                    if len(section_content) > chunk_size:
                        # Section too large, split it; 'part' numbers the pieces.
                        sub_chunks = chunk_text_simple(section_content, chunk_size, chunk_overlap)
                        for i, sub in enumerate(sub_chunks):
                            chunks.append({
                                'content': sub,
                                'metadata': {
                                    'section': current_title,
                                    'level': current_level,
                                    'part': i + 1 if len(sub_chunks) > 1 else None,
                                }
                            })
                    else:
                        chunks.append({
                            'content': section_content,
                            'metadata': {
                                'section': current_title,
                                'level': current_level,
                            }
                        })
            # Start new section (the header line stays in its section body).
            current_level = len(match.group(1))
            current_title = match.group(2).strip()
            current_section = [line]
        else:
            current_section.append(line)

    # Don't forget the last section.
    if current_section:
        section_content = '\n'.join(current_section).strip()
        if section_content:
            if len(section_content) > chunk_size:
                sub_chunks = chunk_text_simple(section_content, chunk_size, chunk_overlap)
                for i, sub in enumerate(sub_chunks):
                    chunks.append({
                        'content': sub,
                        'metadata': {
                            'section': current_title,
                            'level': current_level,
                            'part': i + 1 if len(sub_chunks) > 1 else None,
                        }
                    })
            else:
                chunks.append({
                    'content': section_content,
                    'metadata': {
                        'section': current_title,
                        'level': current_level,
                    }
                })

    # If no chunks were created (no headers found), use simple chunking.
    if not chunks:
        simple_chunks = chunk_text_simple(content, chunk_size, chunk_overlap)
        return [{'content': c, 'metadata': {}} for c in simple_chunks]

    return chunks
def chunk_config(content: str, chunk_size: int = 1000) -> List[Dict[str, Any]]:
    """
    Chunk config files (JSON, YAML, TOML).

    Config files are usually small: content at or under chunk_size is kept
    whole and tagged {'whole_file': True}. Larger configs fall back to
    simple overlap chunking (overlap fixed at 100 characters).
    """
    if len(content) <= chunk_size:
        return [{'content': content, 'metadata': {'whole_file': True}}]

    # Large config: no structural key-based splitting yet, reuse the plain
    # text chunker with a modest overlap.
    return [{'content': piece, 'metadata': {}}
            for piece in chunk_text_simple(content, chunk_size, chunk_overlap=100)]
def get_chunks(content: str, content_type: str, language: Optional[str], chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Dict[str, Any]]:
    """
    Main chunking dispatcher.

    Routes to the strategy matching the content type: code/test use
    function-aware chunking, documents use section-aware chunking, configs
    are kept whole when small, and anything else gets plain overlap
    chunking.

    Args:
        content: File content to chunk
        content_type: 'code', 'document', 'config', 'test'
        language: Programming language (for code files)
        chunk_size: Target chunk size in characters
        chunk_overlap: Overlap between chunks in characters

    Returns:
        List of dicts with 'content' and 'metadata' keys (empty list for
        blank content).
    """
    if not content or not content.strip():
        return []

    if content_type in ('code', 'test'):
        return chunk_code_by_functions(content, language or '', chunk_size, chunk_overlap)
    if content_type == 'document':
        return chunk_document_by_sections(content, chunk_size, chunk_overlap)
    if content_type == 'config':
        return chunk_config(content, chunk_size)

    # Unknown content type: plain overlap chunking with empty metadata.
    return [{'content': piece, 'metadata': {}}
            for piece in chunk_text_simple(content, chunk_size, chunk_overlap)]
# =============================================================================
# H.5.7.3.4: Embedding Generation (ADR-103)
# =============================================================================
# Lazy-loaded embedding model (initialized on first use, see get_embedding_model).
_embedding_model = None
_embedding_model_name = 'all-MiniLM-L6-v2'
EMBEDDING_DIM = 384  # all-MiniLM-L6-v2 produces 384-dimensional vectors
def get_embedding_model():
    """
    Lazy-load the sentence-transformers embedding model.

    Uses all-MiniLM-L6-v2 (384-dimensional vectors). The loaded model is
    cached in the module-level _embedding_model so subsequent calls are
    free. Returns None (with an install hint on stderr) when
    sentence-transformers is not importable.
    """
    global _embedding_model
    if _embedding_model is None:
        try:
            from sentence_transformers import SentenceTransformer
        except ImportError:
            print("Error: sentence-transformers not installed.", file=sys.stderr)
            print("Run: source ~/.coditect/.venv/bin/activate && pip install sentence-transformers", file=sys.stderr)
            return None
        _embedding_model = SentenceTransformer(_embedding_model_name)
    return _embedding_model
def generate_embedding(text: str) -> Optional[bytes]:
    """
    Generate an embedding for a single text chunk.

    Returns:
        float32 vector as bytes (BLOB for SQLite), or None when the model
        is unavailable, the text is blank, or encoding fails.
    """
    model = get_embedding_model()
    if model is None:
        return None
    if not text or not text.strip():
        return None

    try:
        import numpy as np

        # encode() returns a numpy array; store it as packed float32 bytes.
        vector = model.encode(text, convert_to_numpy=True)
        return vector.astype(np.float32).tobytes()
    except Exception as e:
        print(f"Error generating embedding: {e}", file=sys.stderr)
        return None
def generate_embeddings_batch(texts: List[str], batch_size: int = 32) -> List[Optional[bytes]]:
    """
    Generate embeddings for multiple text chunks in batches.

    More efficient than calling generate_embedding() per text. Blank texts
    and failed batches yield None in the corresponding output slots, so the
    result always has exactly len(texts) entries, position-aligned with the
    input.

    Args:
        texts: List of text chunks
        batch_size: Number of texts to process at once

    Returns:
        List of embeddings as bytes, or None for empty/failed texts
    """
    model = get_embedding_model()
    if model is None:
        return [None] * len(texts)

    import numpy as np

    results: List[Optional[bytes]] = []
    for offset in range(0, len(texts), batch_size):
        batch = texts[offset:offset + batch_size]

        # Separate the non-blank texts, remembering their batch positions.
        positions = [j for j, text in enumerate(batch) if text and text.strip()]
        payload = [batch[j] for j in positions]

        if not payload:
            results.extend([None] * len(batch))
            continue

        try:
            vectors = model.encode(payload, convert_to_numpy=True, show_progress_bar=False)
            # Scatter embeddings back to their original slots.
            slots: List[Optional[bytes]] = [None] * len(batch)
            for vector, j in zip(vectors, positions):
                slots[j] = vector.astype(np.float32).tobytes()
            results.extend(slots)
        except Exception as e:
            print(f"Error in batch embedding: {e}", file=sys.stderr)
            results.extend([None] * len(batch))
    return results
def generate_project_embeddings(project_uuid: str, force: bool = False) -> Dict[str, Any]:
    """
    Generate embeddings for all chunks in a project (H.5.7.3.4).

    Reads content_preview text from project_embeddings rows (all rows when
    force=True, otherwise only rows whose embedding is NULL), embeds them
    in batches, writes the vectors back, and refreshes the project's cached
    embedding_count.

    Args:
        project_uuid: Project to generate embeddings for
        force: If True, regenerate even if embeddings exist

    Returns:
        Statistics dict with counts and elapsed_time

    Raises:
        ValueError: if project_uuid is not registered locally.
    """
    conn = get_connection()

    # Get project info (also validates that the UUID exists locally).
    cursor = conn.execute(
        "SELECT project_name FROM projects WHERE project_uuid = ?",
        (project_uuid,)
    )
    project = cursor.fetchone()
    if not project:
        conn.close()
        raise ValueError(f"Project not found: {project_uuid}")

    stats = {
        'project_uuid': project_uuid,
        'project_name': project['project_name'],
        'total_chunks': 0,
        'generated': 0,
        'skipped': 0,
        'errors': 0,
        'start_time': time.time(),
    }

    # Select chunks needing embeddings: everything when force=True,
    # otherwise only rows never embedded (embedding IS NULL).
    if force:
        cursor = conn.execute("""
            SELECT id, content_preview FROM project_embeddings
            WHERE project_uuid = ?
            ORDER BY id
        """, (project_uuid,))
    else:
        cursor = conn.execute("""
            SELECT id, content_preview FROM project_embeddings
            WHERE project_uuid = ? AND embedding IS NULL
            ORDER BY id
        """, (project_uuid,))

    rows = cursor.fetchall()
    stats['total_chunks'] = len(rows)
    if not rows:
        # Nothing to do; return zeroed stats.
        conn.close()
        stats['elapsed_time'] = time.time() - stats['start_time']
        return stats

    # Prepare texts for batch processing (position-aligned with ids).
    ids = [row['id'] for row in rows]
    texts = [row['content_preview'] or '' for row in rows]

    # Generate embeddings in batches.
    print(f"Generating embeddings for {len(texts)} chunks...")
    embeddings = generate_embeddings_batch(texts, batch_size=32)

    # Write each successful vector back; None entries (blank text or model
    # failure) are counted as skipped.
    for chunk_id, embedding in zip(ids, embeddings):
        if embedding is not None:
            try:
                conn.execute("""
                    UPDATE project_embeddings
                    SET embedding = ?, model = ?, updated_at = ?
                    WHERE id = ?
                """, (embedding, _embedding_model_name, datetime.utcnow().isoformat(), chunk_id))
                stats['generated'] += 1
            except Exception as e:
                stats['errors'] += 1
                print(f"Error updating chunk {chunk_id}: {e}", file=sys.stderr)
        else:
            stats['skipped'] += 1
    conn.commit()

    # Refresh the project's cached embedding count from the actual rows.
    cursor = conn.execute("""
        SELECT COUNT(*) as count FROM project_embeddings
        WHERE project_uuid = ? AND embedding IS NOT NULL
    """, (project_uuid,))
    embedding_count = cursor.fetchone()['count']
    conn.execute("""
        UPDATE projects SET embedding_count = ?, updated_at = ?
        WHERE project_uuid = ?
    """, (embedding_count, datetime.utcnow().isoformat(), project_uuid))
    conn.commit()
    conn.close()

    stats['elapsed_time'] = time.time() - stats['start_time']
    return stats
def generate_all_embeddings(force: bool = False) -> Dict[str, Any]:
    """
    Generate embeddings for all projects that have pending chunks.

    Iterates every registered project and delegates to
    generate_project_embeddings(). Projects whose run reported zero chunks
    are not counted in projects_processed. A failure in one project is
    logged and counted but does not stop the others.

    Returns:
        Aggregated statistics (per-project stats under 'by_project').
    """
    conn = get_connection()
    cursor = conn.execute("SELECT project_uuid, project_name FROM projects ORDER BY project_name")
    projects = cursor.fetchall()
    conn.close()

    total_stats = {
        'projects_processed': 0,
        'total_chunks': 0,
        'total_generated': 0,
        'total_skipped': 0,
        'total_errors': 0,
        'start_time': time.time(),
        'by_project': {},
    }

    for project in projects:
        project_uuid = project['project_uuid']
        project_name = project['project_name']
        try:
            stats = generate_project_embeddings(project_uuid, force=force)
            if stats['total_chunks'] > 0:
                total_stats['projects_processed'] += 1
                total_stats['total_chunks'] += stats['total_chunks']
                total_stats['total_generated'] += stats['generated']
                total_stats['total_skipped'] += stats['skipped']
                total_stats['total_errors'] += stats['errors']
                total_stats['by_project'][project_name] = stats
        except Exception as e:
            # Keep going: one broken project must not block the rest.
            print(f"Error processing {project_name}: {e}", file=sys.stderr)
            total_stats['total_errors'] += 1

    total_stats['elapsed_time'] = time.time() - total_stats['start_time']
    return total_stats
def init_database() -> sqlite3.Connection:
    """
    Initialize the projects.db database.

    Creates the storage directory and schema if needed, enables WAL mode
    and foreign keys, and seeds the global exclude patterns. Safe to call
    repeatedly (all DDL is IF NOT EXISTS / INSERT OR IGNORE).

    Returns:
        An open connection with sqlite3.Row row factory.
    """
    CONTEXT_STORAGE.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(str(PROJECTS_DB))
    conn.row_factory = sqlite3.Row

    # Enable WAL mode and foreign keys. NORMAL sync and a negative
    # cache_size (= KiB of page cache, here ~64MB) favor speed for this
    # regenerable local cache database.
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.execute("PRAGMA cache_size=-64000")

    # Create schema
    conn.executescript(SCHEMA)

    # Insert global excludes (idempotent via INSERT OR IGNORE).
    for pattern in GLOBAL_EXCLUDES:
        conn.execute(
            "INSERT OR IGNORE INTO global_exclude_patterns (pattern) VALUES (?)",
            (pattern,)
        )
    conn.commit()
    return conn
def get_connection() -> sqlite3.Connection:
    """
    Get a database connection.

    Bootstraps the database via init_database() on first use; otherwise
    opens the existing file with Row factory and foreign keys enabled.
    """
    if not PROJECTS_DB.exists():
        return init_database()

    conn = sqlite3.connect(str(PROJECTS_DB))
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys=ON")
    return conn
def get_exclude_patterns(conn: sqlite3.Connection, project_uuid: str) -> List[str]:
    """
    Collect all exclude patterns for a project.

    Returns the global patterns first, followed by the project-specific
    ones. The connection must use sqlite3.Row (as get_connection provides).
    """
    rows = conn.execute("SELECT pattern FROM global_exclude_patterns").fetchall()
    patterns = [row['pattern'] for row in rows]

    rows = conn.execute(
        "SELECT pattern FROM project_exclude_patterns WHERE project_uuid = ?",
        (project_uuid,)
    ).fetchall()
    patterns += [row['pattern'] for row in rows]

    return patterns
def register_with_cloud(
    project_name: str,
    github_repo_url: Optional[str],
    github_org: Optional[str],
    github_repo_name: Optional[str],
    project_type: str,
    parent_uuid: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """
    Register project with CODITECT cloud and get assigned UUID.

    Cloud API: POST {api_url}/api/v1/projects/register

    Requires cloud_sync.enabled in ~/.coditect/config/config.json and a
    bearer token from that config or the CODITECT_AUTH_TOKEN environment
    variable. Any failure (missing config, sync disabled, no token,
    network/HTTP error) yields None so callers can fall back to offline
    registration.

    Returns:
        Parsed JSON response ({ project_uuid, tenant_id, team_id,
        owner_user_id, ... }) or None.
    """
    config_path = CODITECT_HOME / "config" / "config.json"
    if not config_path.exists():
        return None

    try:
        with open(config_path) as f:
            config = json.load(f)

        cloud_config = config.get('cloud_sync', {})
        if not cloud_config.get('enabled', False):
            # Cloud sync disabled: caller will register offline.
            return None

        api_url = cloud_config.get('api_url', 'https://api.coditect.ai')
        auth_token = cloud_config.get('auth_token')
        if not auth_token:
            # Try to get from environment or auth file
            auth_token = os.environ.get('CODITECT_AUTH_TOKEN')
        if not auth_token:
            print("Warning: No auth token configured for cloud sync", file=sys.stderr)
            return None

        import urllib.request
        import urllib.error

        payload = json.dumps({
            'project_name': project_name,
            'github_repo_url': github_repo_url,
            'github_org': github_org,
            'github_repo_name': github_repo_name,
            'project_type': project_type,
            'parent_project_uuid': parent_uuid,
        }).encode('utf-8')

        req = urllib.request.Request(
            f"{api_url}/api/v1/projects/register",
            data=payload,
            headers={
                'Authorization': f'Bearer {auth_token}',
                'Content-Type': 'application/json',
            },
            method='POST'
        )
        with urllib.request.urlopen(req, timeout=10) as response:
            return json.loads(response.read().decode('utf-8'))
    except Exception as e:
        # Best-effort: callers treat None as "register offline, sync later".
        print(f"Cloud registration failed: {e}", file=sys.stderr)
        return None
def register_project(
    project_path: str,
    parent_uuid: Optional[str] = None,
    project_type: str = "customer",
    offline: bool = False,
) -> Dict[str, Any]:
    """
    Register a new project with CODITECT.

    1. Detects GitHub information from git remote
    2. Registers with CODITECT cloud (if online) to get UUID
    3. Stores locally in projects.db

    Cloud registration assigns:
    - project_uuid (globally unique, from cloud)
    - tenant_id (customer organization)
    - team_id (team within tenant)
    - owner_user_id (registering user)

    When the cloud is unreachable or offline=True, a local UUID is
    generated and the row is stored with cloud_sync_status='pending'.
    Re-registering an already-registered path returns the existing row.

    Raises:
        ValueError: path missing, not a directory, or unknown parent_uuid.
    """
    path = Path(project_path).resolve()
    if not path.exists():
        raise ValueError(f"Project path does not exist: {path}")
    if not path.is_dir():
        raise ValueError(f"Project path is not a directory: {path}")

    conn = get_connection()

    # Idempotent: re-registering the same path returns the existing record.
    cursor = conn.execute(
        "SELECT * FROM projects WHERE project_path = ?",
        (str(path),)
    )
    existing = cursor.fetchone()
    if existing:
        conn.close()
        return dict(existing)

    # Generate local slug and detect GitHub
    project_slug = generate_project_slug(path)
    project_name = path.name
    github_info = detect_github_info(path)

    # Check if parent exists locally
    if parent_uuid:
        cursor = conn.execute(
            "SELECT project_uuid FROM projects WHERE project_uuid = ?",
            (parent_uuid,)
        )
        if not cursor.fetchone():
            # NOTE(review): conn is not closed before this raise — minor
            # connection leak on the error path; confirm and fix separately.
            raise ValueError(f"Parent project not found: {parent_uuid}")

    # Register with CODITECT cloud (assigns UUID, tenant, team, user)
    cloud_response = None
    project_uuid = None
    tenant_id = None
    team_id = None
    user_id = None

    if not offline:
        cloud_response = register_with_cloud(
            project_name=project_name,
            github_repo_url=github_info['github_repo_url'],
            github_org=github_info['github_org'],
            github_repo_name=github_info['github_repo_name'],
            project_type=project_type,
            parent_uuid=parent_uuid,
        )

    if cloud_response:
        # Use cloud-assigned values
        project_uuid = cloud_response.get('project_uuid')
        tenant_id = cloud_response.get('tenant_id')
        team_id = cloud_response.get('team_id')
        user_id = cloud_response.get('owner_user_id')
        cloud_sync_status = 'synced'
    else:
        # Offline mode: generate local UUID (will sync later)
        project_uuid = generate_project_uuid()
        cloud_sync_status = 'pending'
        print(f"Note: Registered offline. Run '/cx --sync-projects' to sync with cloud.", file=sys.stderr)

    # Insert project locally
    conn.execute("""
        INSERT INTO projects (
            project_uuid, project_slug, project_name, project_path, project_type,
            parent_project_uuid, github_repo_url, github_org, github_repo_name,
            tenant_id, team_id, owner_user_id, cloud_sync_status, cloud_synced_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        project_uuid, project_slug, project_name, str(path), project_type,
        parent_uuid, github_info['github_repo_url'], github_info['github_org'],
        github_info['github_repo_name'], tenant_id, team_id, user_id,
        cloud_sync_status,
        datetime.utcnow().isoformat() if cloud_response else None
    ))
    conn.commit()
    conn.close()

    return {
        'project_uuid': project_uuid,
        'project_slug': project_slug,
        'project_name': project_name,
        'project_path': str(path),
        'project_type': project_type,
        'parent_project_uuid': parent_uuid,
        'github_repo_url': github_info['github_repo_url'],
        'github_org': github_info['github_org'],
        'github_repo_name': github_info['github_repo_name'],
        'tenant_id': tenant_id,
        'team_id': team_id,
        'owner_user_id': user_id,
        'cloud_sync_status': cloud_sync_status,
        'status': 'registered'
    }
def discover_files(project_path: Path, exclude_patterns: List[str]) -> List[Tuple[Path, str, str]]:
    """Walk a project tree and collect every indexable file.

    Returns a list of (path, content_type, language) triples, one per
    regular file that is not excluded and has a recognized content type.
    """
    discovered: List[Tuple[Path, str, str]] = []
    for candidate in project_path.rglob('*'):
        # Skip directories, excluded paths, and unrecognized content types.
        if not candidate.is_file():
            continue
        if should_exclude(candidate, project_path, exclude_patterns):
            continue
        ctype = detect_content_type(candidate)
        if ctype:
            discovered.append((candidate, ctype, detect_language(candidate)))
    return discovered
def check_file_changed(conn: sqlite3.Connection, project_uuid: str, file_path: Path, project_root: Path) -> Tuple[bool, Optional[str]]:
    """Report whether *file_path* needs re-indexing.

    Returns (changed, content_hash). The hash is None when the file was
    never indexed. A matching mtime is trusted as "unchanged" without
    rehashing; when only the mtime moved, the stored mtime is refreshed
    so the fast path works on the next run.
    """
    relative = str(file_path.relative_to(project_root))
    previous = conn.execute(
        "SELECT content_hash, mtime FROM content_hashes WHERE project_uuid = ? AND file_path = ?",
        (project_uuid, relative)
    ).fetchone()
    # Never indexed before: definitely changed, no known hash.
    if not previous:
        return True, None
    mtime_now = file_path.stat().st_mtime
    # Fast path: identical mtime means we trust the stored hash.
    if mtime_now == previous['mtime']:
        return False, previous['content_hash']
    # mtime moved; only a content hash can tell whether bytes changed.
    hash_now = compute_content_hash(file_path)
    if hash_now != previous['content_hash']:
        return True, hash_now
    # Touched but content-identical: refresh bookkeeping only.
    conn.execute(
        "UPDATE content_hashes SET mtime = ?, last_checked = ? WHERE project_uuid = ? AND file_path = ?",
        (mtime_now, datetime.utcnow().isoformat(), project_uuid, relative)
    )
    return False, hash_now
# =============================================================================
# H.5.7.3.5: Hash-Based Incremental Updates (ADR-103)
# =============================================================================
def cleanup_orphaned_records(conn: sqlite3.Connection, project_uuid: str, current_files: set) -> Dict[str, int]:
    """Drop hash and embedding rows for files that vanished from disk.

    Keeps the database consistent after files are deleted, renamed, or
    moved out of the project tree.

    Args:
        conn: Open database connection.
        project_uuid: Project whose records are pruned.
        current_files: Relative paths of files that still exist.

    Returns:
        Dict with counts of removed hash rows and embedding rows.
    """
    removed_hashes = 0
    removed_embeddings = 0
    # Everything the database currently believes is indexed for this project.
    known = {
        row['file_path']
        for row in conn.execute(
            "SELECT file_path FROM content_hashes WHERE project_uuid = ?",
            (project_uuid,)
        ).fetchall()
    }
    # Anything indexed but absent from the current scan is an orphan.
    for stale_path in known - current_files:
        conn.execute(
            "DELETE FROM content_hashes WHERE project_uuid = ? AND file_path = ?",
            (project_uuid, stale_path)
        )
        removed_hashes += 1
        deleted = conn.execute(
            "DELETE FROM project_embeddings WHERE project_uuid = ? AND file_path = ?",
            (project_uuid, stale_path)
        )
        removed_embeddings += deleted.rowcount
    return {'removed_hashes': removed_hashes, 'removed_embeddings': removed_embeddings}
def cleanup_stale_chunks(conn: sqlite3.Connection, project_uuid: str, file_path: str, expected_chunks: int) -> int:
    """Delete embedding rows whose chunk_index is past the new chunk count.

    When a file shrinks to fewer chunks on re-index, the trailing chunk
    records would otherwise linger as orphans.

    Args:
        conn: Open database connection.
        project_uuid: Project UUID.
        file_path: Relative file path.
        expected_chunks: Chunk count after re-indexing.

    Returns:
        Number of rows removed.
    """
    result = conn.execute(
        """
        DELETE FROM project_embeddings
        WHERE project_uuid = ? AND file_path = ? AND chunk_index >= ?
        """,
        (project_uuid, file_path, expected_chunks)
    )
    return result.rowcount
def get_incremental_stats(project_uuid: str) -> Dict[str, Any]:
    """Summarize how much of a project a re-index would touch.

    Compares the files currently on disk with the content_hashes table to
    count new, changed, unchanged, and removed files — useful for deciding
    between a full and an incremental reindex.

    Args:
        project_uuid: Project to inspect.

    Returns:
        Dict with per-status file counts plus current/indexed totals.

    Raises:
        ValueError: If the project UUID is not registered.
    """
    conn = get_connection()
    # Get project info
    cursor = conn.execute(
        "SELECT project_path FROM projects WHERE project_uuid = ?",
        (project_uuid,)
    )
    project = cursor.fetchone()
    if not project:
        conn.close()
        raise ValueError(f"Project not found: {project_uuid}")
    project_root = Path(project['project_path'])
    exclude_patterns = get_exclude_patterns(conn, project_uuid)
    # Discover current files
    current_files = discover_files(project_root, exclude_patterns)
    # BUG FIX: the previous version subscripted the Path object (`f[0]`) inside
    # a pass-through comprehension, raising TypeError for any non-empty project.
    # Each element is already a (path, content_type, language) tuple.
    current_paths = {str(f.relative_to(project_root)) for f, _, _ in current_files}
    # Load the previously indexed state keyed by relative path
    cursor = conn.execute(
        "SELECT file_path, content_hash, mtime FROM content_hashes WHERE project_uuid = ?",
        (project_uuid,)
    )
    indexed = {row['file_path']: (row['content_hash'], row['mtime']) for row in cursor.fetchall()}
    stats = {
        'new_files': 0,
        'unchanged_files': 0,
        'changed_files': 0,
        'removed_files': 0,
        'total_current': len(current_paths),
        'total_indexed': len(indexed),
    }
    # Classify each file currently on disk
    for file_path, content_type, language in current_files:
        rel_path = str(file_path.relative_to(project_root))
        if rel_path not in indexed:
            stats['new_files'] += 1
        else:
            old_hash, old_mtime = indexed[rel_path]
            try:
                current_mtime = file_path.stat().st_mtime
                if current_mtime != old_mtime:
                    # mtime moved; only the content hash can confirm a real change
                    current_hash = compute_content_hash(file_path)
                    if current_hash != old_hash:
                        stats['changed_files'] += 1
                    else:
                        stats['unchanged_files'] += 1
                else:
                    stats['unchanged_files'] += 1
            except Exception:
                # Unreadable/stat-failed files count as changed so the next
                # index pass retries them
                stats['changed_files'] += 1
    # Indexed entries with no surviving on-disk counterpart were removed
    stats['removed_files'] = len(indexed) - (stats['unchanged_files'] + stats['changed_files'])
    conn.close()
    return stats
def index_file(conn: sqlite3.Connection, project_uuid: str, file_path: Path, project_root: Path, content_type: str, language: str, content_hash: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> Dict[str, Any]:
    """Index one file: record its hash and (re)write its chunk rows (H.5.7.3.3).

    Chunking is content-type aware via get_chunks() (code/test by
    function/class boundaries, documents by markdown sections, config kept
    whole or split by keys). One project_embeddings row is written per
    chunk; the embedding column stays NULL until the embedding pass
    (H.5.7.3.4) fills it in.
    """
    rel_path = str(file_path.relative_to(project_root))
    content = file_path.read_text(encoding='utf-8', errors='replace')
    file_stat = file_path.stat()
    # Content-type-aware chunking
    chunks = get_chunks(content, content_type, language, chunk_size, chunk_overlap)
    # An empty/unchunkable file still gets one placeholder row so the file
    # stays visible; either way chunk_count is at least 1.
    records = chunks if chunks else [{'content': content, 'metadata': {}}]
    chunk_count = len(records)
    now = datetime.utcnow().isoformat()
    # Upsert the hash-tracking row, including the chunk count
    conn.execute("""
        INSERT OR REPLACE INTO content_hashes
        (project_uuid, file_path, content_hash, file_size, mtime, content_type, language, chunk_count, indexed_at, last_checked)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        project_uuid, rel_path, content_hash, file_stat.st_size, file_stat.st_mtime,
        content_type, language, chunk_count, now, now
    ))
    # Wipe any previous chunk rows for this file; the chunk count may have changed
    conn.execute("""
        DELETE FROM project_embeddings
        WHERE project_uuid = ? AND file_path = ?
    """, (project_uuid, rel_path))
    # One row per chunk (or the single placeholder)
    for idx, chunk_data in enumerate(records):
        chunk_content = chunk_data.get('content', '')
        chunk_metadata = chunk_data.get('metadata', {})
        chunk_metadata['language'] = language
        conn.execute("""
            INSERT INTO project_embeddings
            (project_uuid, file_path, content_hash, content_type, chunk_index, chunk_total,
             content_preview, embedding, model, metadata, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            project_uuid,
            rel_path,
            content_hash,
            content_type,
            idx,
            chunk_count,
            chunk_content[:500].strip(),  # preview: first 500 chars of the chunk
            None,  # embedding generated later (H.5.7.3.4)
            'all-MiniLM-L6-v2',
            json.dumps(chunk_metadata),
            datetime.utcnow().isoformat()
        ))
    return {
        'file_path': rel_path,
        'content_type': content_type,
        'language': language,
        'size': file_stat.st_size,
        'chunk_count': chunk_count,
    }
def index_project(project_uuid: str, only_changed: bool = False) -> Dict[str, Any]:
    """
    Index all files in a project with smart chunking (H.5.7.3.3)
    and hash-based incremental updates (H.5.7.3.5).

    Uses content-type-aware chunking strategies:
    - code/test: By function/class boundaries
    - document: By markdown sections
    - config: Keep whole or split by keys

    Incremental features:
    - Skips unchanged files (same mtime and content hash) when only_changed=True
    - Removes orphaned records for deleted files
    - Updates only changed chunks

    Args:
        project_uuid: UUID of a registered project.
        only_changed: When True, files reported unchanged by
            check_file_changed() are skipped entirely.

    Returns:
        Stats dict: counts, per-content-type breakdown, and timing.

    Raises:
        ValueError: If the project UUID is not registered.
    """
    conn = get_connection()
    # Get project info including chunk settings
    cursor = conn.execute(
        "SELECT * FROM projects WHERE project_uuid = ?",
        (project_uuid,)
    )
    project = cursor.fetchone()
    if not project:
        conn.close()
        raise ValueError(f"Project not found: {project_uuid}")
    project_root = Path(project['project_path'])
    exclude_patterns = get_exclude_patterns(conn, project_uuid)
    # Get chunking settings from project (with defaults)
    chunk_size = project['chunk_size'] or 1000
    chunk_overlap = project['chunk_overlap'] or 200
    stats = {
        'project_uuid': project_uuid,
        'project_name': project['project_name'],
        'chunk_size': chunk_size,
        'chunk_overlap': chunk_overlap,
        'total_files': 0,
        'indexed': 0,
        'skipped': 0,
        'errors': 0,
        'total_chunks': 0,
        'removed_files': 0,
        'removed_embeddings': 0,
        'by_type': {},
        'start_time': time.time(),
    }
    # Discover files currently on disk (respecting exclude patterns)
    files = discover_files(project_root, exclude_patterns)
    stats['total_files'] = len(files)
    # Build set of current file paths for orphan detection (H.5.7.3.5)
    current_file_paths = {str(f.relative_to(project_root)) for f, _, _ in files}
    # Clean up orphaned records (files that no longer exist)
    cleanup_stats = cleanup_orphaned_records(conn, project_uuid, current_file_paths)
    stats['removed_files'] = cleanup_stats['removed_hashes']
    stats['removed_embeddings'] = cleanup_stats['removed_embeddings']
    for file_path, content_type, language in files:
        # Per-content-type counters, created lazily on first file of each type
        type_stats = stats['by_type'].setdefault(content_type, {'indexed': 0, 'skipped': 0, 'chunks': 0})
        try:
            changed, content_hash = check_file_changed(conn, project_uuid, file_path, project_root)
            if only_changed and not changed:
                stats['skipped'] += 1
                type_stats['skipped'] += 1
                continue
            # check_file_changed() returns a None hash for never-indexed files
            if content_hash is None:
                content_hash = compute_content_hash(file_path)
            result = index_file(
                conn, project_uuid, file_path, project_root,
                content_type, language, content_hash,
                chunk_size=chunk_size, chunk_overlap=chunk_overlap
            )
            stats['indexed'] += 1
            type_stats['indexed'] += 1
            # Track chunk counts
            chunk_count = result.get('chunk_count', 1)
            stats['total_chunks'] += chunk_count
            type_stats['chunks'] += chunk_count
        except Exception as e:
            # One bad file should not abort the whole index run
            stats['errors'] += 1
            print(f"Error indexing {file_path}: {e}", file=sys.stderr)
    # Update project stats (file_count and embedding_count)
    conn.execute("""
        UPDATE projects
        SET file_count = ?, embedding_count = ?, last_indexed = ?, updated_at = ?
        WHERE project_uuid = ?
    """, (
        stats['indexed'],
        stats['total_chunks'],
        datetime.utcnow().isoformat(),
        datetime.utcnow().isoformat(),
        project_uuid
    ))
    conn.commit()
    conn.close()
    stats['elapsed_time'] = time.time() - stats['start_time']
    stats['avg_chunks_per_file'] = stats['total_chunks'] / stats['indexed'] if stats['indexed'] > 0 else 0
    return stats
def list_projects(show_tree: bool = False) -> List[Dict[str, Any]]:
    """Return all registered projects, optionally annotated with children.

    When show_tree is True, each root project (one without a parent) gains
    a 'children' list containing its direct subprojects.
    """
    conn = get_connection()
    rows = conn.execute("""
        SELECT project_uuid, project_slug, project_name, project_path, project_type,
               parent_project_uuid, github_repo_url, github_org, github_repo_name,
               tenant_id, team_id, owner_user_id, cloud_sync_status,
               file_count, embedding_count, last_indexed
        FROM projects
        ORDER BY parent_project_uuid NULLS FIRST, project_name
    """).fetchall()
    conn.close()
    projects = [dict(row) for row in rows]
    if show_tree:
        # Attach each root's direct children in place
        for root in (p for p in projects if not p['parent_project_uuid']):
            root['children'] = [
                p for p in projects
                if p['parent_project_uuid'] == root['project_uuid']
            ]
    return projects
def get_statistics() -> Dict[str, Any]:
    """Collect summary counts for the projects database."""
    conn = get_connection()
    stats: Dict[str, Any] = {
        'database_path': str(PROJECTS_DB),
        'database_size': PROJECTS_DB.stat().st_size if PROJECTS_DB.exists() else 0,
    }

    def _count(sql: str) -> int:
        # Every counting query aliases its result as 'count'
        return conn.execute(sql).fetchone()['count']

    stats['total_projects'] = _count("SELECT COUNT(*) as count FROM projects")
    stats['by_type'] = {
        row['project_type']: row['count']
        for row in conn.execute(
            "SELECT project_type, COUNT(*) as count FROM projects GROUP BY project_type"
        ).fetchall()
    }
    stats['total_files'] = _count("SELECT COUNT(*) as count FROM content_hashes")
    stats['embeddings_generated'] = _count(
        "SELECT COUNT(*) as count FROM project_embeddings WHERE embedding IS NOT NULL")
    stats['embeddings_pending'] = _count(
        "SELECT COUNT(*) as count FROM project_embeddings WHERE embedding IS NULL")
    stats['files_by_type'] = {
        row['content_type']: row['count']
        for row in conn.execute(
            "SELECT content_type, COUNT(*) as count FROM content_hashes GROUP BY content_type"
        ).fetchall()
    }
    conn.close()
    return stats
def unregister_project(project_uuid: str, sync_to_cloud: bool = True) -> bool:
    """Delete a project and all of its derived data.

    Refuses to remove a project that still has child projects. When
    sync_to_cloud is True and the project has a tenant assigned, the
    CODITECT cloud is notified first (a failure there is only a warning).

    Returns True if the project existed and was removed, False otherwise.

    Raises:
        ValueError: If the project still has child projects.
    """
    conn = get_connection()
    row = conn.execute(
        "SELECT project_uuid, project_name, tenant_id FROM projects WHERE project_uuid = ?",
        (project_uuid,)
    ).fetchone()
    if row is None:
        conn.close()
        return False
    # Refuse to orphan subprojects
    child_count = conn.execute(
        "SELECT COUNT(*) as count FROM projects WHERE parent_project_uuid = ?",
        (project_uuid,)
    ).fetchone()['count']
    if child_count > 0:
        raise ValueError(f"Cannot unregister project with children. Remove children first.")
    # Best-effort cloud deregistration; local removal proceeds regardless
    if sync_to_cloud and row['tenant_id']:
        try:
            _unregister_from_cloud(project_uuid)
        except Exception as e:
            print(f"Warning: Cloud deregistration failed: {e}", file=sys.stderr)
    # Delete (cascades to content_hashes, project_embeddings, project_exclude_patterns)
    conn.execute("DELETE FROM projects WHERE project_uuid = ?", (project_uuid,))
    conn.commit()
    conn.close()
    return True
def _unregister_from_cloud(project_uuid: str) -> bool:
    """Tell the CODITECT cloud API that this project is being removed.

    Best-effort: returns False (never raises from the network path) when
    the config file is missing, cloud sync is disabled, no auth token is
    available, or the HTTP call fails for any reason.
    """
    cfg_file = CODITECT_HOME / "config" / "config.json"
    if not cfg_file.exists():
        return False
    try:
        with open(cfg_file) as fh:
            settings = json.load(fh)
        sync_cfg = settings.get('cloud_sync', {})
        if not sync_cfg.get('enabled', False):
            return False
        base_url = sync_cfg.get('api_url', 'https://api.coditect.ai')
        token = sync_cfg.get('auth_token') or os.environ.get('CODITECT_AUTH_TOKEN')
        if not token:
            return False
        import urllib.request
        request = urllib.request.Request(
            f"{base_url}/api/v1/projects/{project_uuid}/deregister",
            headers={
                'Authorization': f'Bearer {token}',
                'Content-Type': 'application/json',
            },
            method='POST'
        )
        with urllib.request.urlopen(request, timeout=10) as resp:
            return resp.status == 200
    except Exception:
        # Deliberate broad catch: deregistration is best-effort
        return False
def print_tree(projects: List[Dict[str, Any]]):
    """Render registered projects as a two-level tree on stdout.

    Roots (projects without a parent) are printed with their details;
    each root is followed by its direct child projects.
    """
    roots = [entry for entry in projects if not entry['parent_project_uuid']]
    for root in roots:
        status = root.get('cloud_sync_status')
        if status == 'synced':
            sync_icon = "☁️ "
        elif status == 'pending':
            sync_icon = "⏳"
        else:
            sync_icon = ""
        if root.get('github_org'):
            github_info = f" [{root['github_org']}/{root['github_repo_name']}]"
        else:
            github_info = ""
        print(f"\n{sync_icon}{root['project_name']} ({root['project_type']}){github_info}")
        print(f" UUID: {root['project_uuid']}")
        print(f" Path: {root['project_path']}")
        print(f" Files: {root['file_count']} | Embeddings: {root.get('embedding_count', 0)}")
        if root.get('tenant_id'):
            print(f" Tenant: {root['tenant_id'][:8]}...")
        children = [entry for entry in projects if entry['parent_project_uuid'] == root['project_uuid']]
        last = len(children) - 1
        for pos, child in enumerate(children):
            branch = "└──" if pos == last else "├──"
            if child.get('github_repo_name'):
                child_github = f" [{child['github_repo_name']}]"
            else:
                child_github = ""
            print(f" {branch} {child['project_name']}{child_github} ({child['file_count']} files)")
def sync_pending_projects() -> Dict[str, Any]:
    """Sync all projects registered offline (cloud_sync_status='pending') to the cloud.

    For each pending project, calls register_with_cloud() and, on success,
    overwrites the local row with the cloud-assigned UUID/tenant/team/owner
    values and marks it synced. Per-project failures are collected, not raised.

    Returns:
        Dict: {'synced': int, 'failed': int, 'errors': [str, ...]}
    """
    conn = get_connection()
    # Select projects registered offline and never acknowledged by the cloud
    cursor = conn.execute("""
        SELECT project_uuid, project_name, project_path, project_type,
               parent_project_uuid, github_repo_url, github_org, github_repo_name
        FROM projects
        WHERE cloud_sync_status = 'pending'
    """)
    pending = cursor.fetchall()
    results = {'synced': 0, 'failed': 0, 'errors': []}
    for project in pending:
        try:
            cloud_response = register_with_cloud(
                project_name=project['project_name'],
                github_repo_url=project['github_repo_url'],
                github_org=project['github_org'],
                github_repo_name=project['github_repo_name'],
                project_type=project['project_type'],
                parent_uuid=project['parent_project_uuid'],
            )
            if cloud_response:
                # Update local with cloud-assigned values.
                # NOTE(review): rewriting project_uuid here leaves any child rows'
                # parent_project_uuid pointing at the old local UUID unless the
                # schema declares ON UPDATE CASCADE — verify against the schema
                # created by --init.
                conn.execute("""
                    UPDATE projects SET
                        project_uuid = ?,
                        tenant_id = ?,
                        team_id = ?,
                        owner_user_id = ?,
                        cloud_sync_status = 'synced',
                        cloud_synced_at = ?
                    WHERE project_uuid = ?
                """, (
                    cloud_response.get('project_uuid', project['project_uuid']),
                    cloud_response.get('tenant_id'),
                    cloud_response.get('team_id'),
                    cloud_response.get('owner_user_id'),
                    datetime.utcnow().isoformat(),
                    project['project_uuid']
                ))
                results['synced'] += 1
            else:
                results['failed'] += 1
                results['errors'].append(f"{project['project_name']}: No cloud response")
        except Exception as e:
            # Keep syncing the remaining projects; record the failure
            results['failed'] += 1
            results['errors'].append(f"{project['project_name']}: {str(e)}")
    conn.commit()
    conn.close()
    return results
def main():
    """CLI entry point: parse flags and dispatch to the matching action.

    Flags are checked in a fixed priority order; the first match runs and
    returns a process exit code (0 on success, 1 on failure). With no
    flags, a one-line stats summary is printed.
    """
    parser = argparse.ArgumentParser(description="Projects Database Management")
    parser.add_argument('--init', action='store_true', help='Initialize database')
    parser.add_argument('--register', metavar='PATH', help='Register a project')
    parser.add_argument('--parent', metavar='UUID', help='Parent project UUID (for subprojects)')
    parser.add_argument('--type', default='customer', choices=['internal', 'customer', 'submodule'], help='Project type')
    parser.add_argument('--offline', action='store_true', help='Register without cloud sync')
    parser.add_argument('--index', metavar='PROJECT_UUID', help='Index a project')
    parser.add_argument('--index-changed', metavar='PROJECT_UUID', help='Index only changed files')
    parser.add_argument('--index-status', metavar='PROJECT_UUID', help='Show incremental update status')
    parser.add_argument('--embed', metavar='PROJECT_UUID', help='Generate embeddings for a project')
    parser.add_argument('--embed-all', action='store_true', help='Generate embeddings for all projects')
    parser.add_argument('--force-embed', action='store_true', help='Regenerate even if embeddings exist')
    parser.add_argument('--list', action='store_true', help='List all projects')
    parser.add_argument('--tree', action='store_true', help='Show project hierarchy')
    parser.add_argument('--stats', action='store_true', help='Show statistics')
    parser.add_argument('--unregister', metavar='PROJECT_UUID', help='Remove a project')
    parser.add_argument('--sync-pending', action='store_true', help='Sync pending projects to cloud')
    args = parser.parse_args()
    # --init: create the database schema
    if args.init:
        print("Initializing projects.db...")
        conn = init_database()
        conn.close()
        print(f"Created: {PROJECTS_DB}")
        return 0
    # --register: register a project (optionally offline / as a subproject)
    if args.register:
        try:
            result = register_project(args.register, args.parent, args.type, offline=args.offline)
            sync_status = "☁️ synced" if result['cloud_sync_status'] == 'synced' else "⏳ pending"
            print(f"\nRegistered project: {result['project_name']}")
            print(f" UUID: {result['project_uuid']}")
            print(f" Path: {result['project_path']}")
            print(f" Type: {result['project_type']}")
            print(f" Cloud: {sync_status}")
            if result.get('github_repo_url'):
                print(f" GitHub: {result['github_org']}/{result['github_repo_name']}")
            if result.get('tenant_id'):
                print(f" Tenant: {result['tenant_id']}")
            if result.get('parent_project_uuid'):
                print(f" Parent: {result['parent_project_uuid']}")
            return 0
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            return 1
    # --index-status: report how much an incremental reindex would touch
    if args.index_status:
        try:
            stats = get_incremental_stats(args.index_status)
            print(f"\nIncremental Update Status:")
            print(f" Current files: {stats['total_current']}")
            print(f" Previously indexed: {stats['total_indexed']}")
            print(f"\n Status breakdown:")
            print(f" New files: {stats['new_files']}")
            print(f" Changed files: {stats['changed_files']}")
            print(f" Unchanged files: {stats['unchanged_files']}")
            print(f" Removed files: {stats['removed_files']}")
            needs_update = stats['new_files'] + stats['changed_files'] + stats['removed_files']
            if needs_update == 0:
                print(f"\n ✓ Project is up-to-date. No indexing needed.")
            else:
                print(f"\n → {needs_update} files need updating. Use --index-changed for incremental update.")
            return 0
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            return 1
    # --index / --index-changed: full or incremental index run
    if args.index or args.index_changed:
        project_uuid = args.index or args.index_changed
        only_changed = bool(args.index_changed)
        try:
            stats = index_project(project_uuid, only_changed=only_changed)
            print(f"\nIndexed project: {stats['project_name']}")
            print(f" UUID: {stats['project_uuid']}")
            print(f" Total files: {stats['total_files']}")
            print(f" Indexed: {stats['indexed']}")
            print(f" Skipped: {stats['skipped']}")
            print(f" Errors: {stats['errors']}")
            print(f" Total chunks: {stats['total_chunks']} (avg {stats['avg_chunks_per_file']:.1f}/file)")
            print(f" Chunk settings: size={stats['chunk_size']}, overlap={stats['chunk_overlap']}")
            if stats.get('removed_files', 0) > 0 or stats.get('removed_embeddings', 0) > 0:
                print(f" Cleanup: {stats.get('removed_files', 0)} orphaned files, {stats.get('removed_embeddings', 0)} embeddings removed")
            print(f" Time: {stats['elapsed_time']:.2f}s")
            print(f"\n By type:")
            for content_type, type_stats in sorted(stats['by_type'].items()):
                chunks_info = f", {type_stats.get('chunks', 0)} chunks" if type_stats.get('chunks', 0) > 0 else ""
                print(f" {content_type}: {type_stats['indexed']} indexed, {type_stats['skipped']} skipped{chunks_info}")
            # Non-zero exit when any file failed to index
            return 0 if stats['errors'] == 0 else 1
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            return 1
    # --list / --tree: show registered projects flat or hierarchically
    if args.list or args.tree:
        projects = list_projects(show_tree=args.tree)
        if args.tree:
            print_tree(projects)
        else:
            for p in projects:
                sync_icon = "☁️ " if p.get('cloud_sync_status') == 'synced' else "⏳" if p.get('cloud_sync_status') == 'pending' else ""
                github = f" [{p['github_org']}/{p['github_repo_name']}]" if p.get('github_org') else ""
                parent = f" (parent: {p['parent_project_uuid'][:8]}...)" if p.get('parent_project_uuid') else ""
                print(f"{sync_icon}{p['project_uuid'][:8]}...: {p['project_name']}{github} [{p['project_type']}]{parent}")
        return 0
    # --stats: database-wide summary
    if args.stats:
        stats = get_statistics()
        print("\n" + "=" * 60)
        print("Projects Database Statistics")
        print("=" * 60)
        print(f"\nDatabase: {stats['database_path']}")
        print(f"Size: {stats['database_size'] / 1024:.1f} KB")
        print(f"\nTotal Projects: {stats['total_projects']}")
        print(f" By type: {stats.get('by_type', {})}")
        print(f"\nTotal Files Indexed: {stats['total_files']}")
        print(f" By content type: {stats.get('files_by_type', {})}")
        print(f"\nEmbeddings Generated: {stats['embeddings_generated']}")
        print(f"Embeddings Pending: {stats['embeddings_pending']}")
        return 0
    # --unregister: delete a project (fails if it still has children)
    if args.unregister:
        try:
            if unregister_project(args.unregister):
                print(f"Unregistered project: {args.unregister}")
            else:
                print(f"Project not found: {args.unregister}")
                return 1
            return 0
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            return 1
    # --embed: generate embeddings for one project's pending chunks
    if args.embed:
        try:
            stats = generate_project_embeddings(args.embed, force=args.force_embed)
            print(f"\nEmbeddings generated for: {stats['project_name']}")
            print(f" UUID: {stats['project_uuid']}")
            print(f" Total chunks: {stats['total_chunks']}")
            print(f" Generated: {stats['generated']}")
            print(f" Skipped: {stats['skipped']}")
            print(f" Errors: {stats['errors']}")
            print(f" Time: {stats['elapsed_time']:.2f}s")
            return 0 if stats['errors'] == 0 else 1
        except Exception as e:
            print(f"Error: {e}", file=sys.stderr)
            return 1
    # --embed-all: generate embeddings across every project
    if args.embed_all:
        print("Generating embeddings for all projects...")
        stats = generate_all_embeddings(force=args.force_embed)
        print(f"\nEmbedding generation complete:")
        print(f" Projects processed: {stats['projects_processed']}")
        print(f" Total chunks: {stats['total_chunks']}")
        print(f" Generated: {stats['total_generated']}")
        print(f" Skipped: {stats['total_skipped']}")
        print(f" Errors: {stats['total_errors']}")
        print(f" Time: {stats['elapsed_time']:.2f}s")
        if stats['by_project']:
            print(f"\n By project:")
            for name, pstats in sorted(stats['by_project'].items()):
                print(f" {name}: {pstats['generated']}/{pstats['total_chunks']} chunks ({pstats['elapsed_time']:.1f}s)")
        return 0 if stats['total_errors'] == 0 else 1
    # --sync-pending: push offline registrations to the cloud
    if args.sync_pending:
        print("Syncing pending projects to CODITECT cloud...")
        results = sync_pending_projects()
        print(f"\nSynced: {results['synced']}")
        print(f"Failed: {results['failed']}")
        if results['errors']:
            print(f"\nErrors:")
            for err in results['errors']:
                print(f" - {err}")
        return 0 if results['failed'] == 0 else 1
    # Default: show stats
    stats = get_statistics()
    print(f"Projects: {stats['total_projects']} | Files: {stats['total_files']} | Embeddings: {stats['embeddings_generated']}")
    print("Use --help for options")
    return 0
# BUG FIX: the guard read `if name == 'main':`, which raises NameError at
# import time — the dunder underscores were evidently stripped (likely by
# markdown rendering). Restored the standard entry-point guard.
if __name__ == '__main__':
    sys.exit(main())