#!/usr/bin/env python3
"""
Project Registration Module (ADR-118 TIER 4)

Provides project registration, indexing, and querying for /cx command integration.
Now with cloud sync support via CloudProjectClient (ADR-158).

Task ID: J.15.2.1, J.15.6.3.4
Created: 2026-01-28
Updated: 2026-02-05 (J.15.6.3.4 - Cloud integration)

Usage via /cx:
    /cx --register-project ~/my-project
    /cx --register-project ~/my-project --cloud  # Also register with cloud
    /cx --index-project my-project
    /cx --list-projects
    /cx --project-stats
    /cx --sync-projects  # Sync pending cloud registrations
"""
import json import os import re import sqlite3 import subprocess import sys import uuid from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional, Tuple
# Add the grandparent directory of this file to sys.path so that the
# ``scripts.*`` imports below can be resolved.
sys.path.insert(0, str(Path(__file__).parent.parent))

try:
    from scripts.core.paths import get_projects_db_path, PROJECTS_DB
except ImportError:
    # Fallback used when scripts.core.paths cannot be imported.
    def get_projects_db_path() -> Path:
        """Return the path of projects.db.

        The candidate locations under the user's home directory are checked
        in order and the first one that exists is returned. When none exists
        yet, the first candidate is returned.
        """
        home = Path.home()
        candidates = [
            home / "PROJECTS" / ".coditect-data" / "context-storage" / "projects.db",
            home / ".coditect-data" / "context-storage" / "projects.db",
        ]
        for c in candidates:
            if c.exists():
                return c
        return candidates[0]

    PROJECTS_DB = get_projects_db_path()
# Cloud client (J.15.6.3.4 - ADR-158)
# The import is optional: when it fails, the three names are bound to None
# and CLOUD_CLIENT_AVAILABLE records that the client cannot be used.
try:
    from scripts.core.cloud_project_client import (
        CloudProjectClient,
        ProjectRegistration,
        SyncResult,
    )
except ImportError:
    CLOUD_CLIENT_AVAILABLE = False
    CloudProjectClient = None
    ProjectRegistration = None
    SyncResult = None
else:
    CLOUD_CLIENT_AVAILABLE = True
# =============================================================================
# Utility Functions
# =============================================================================
def get_git_info(project_path: Path) -> Dict[str, Optional[str]]:
    """Extract git information from a project directory.

    Args:
        project_path: Directory to inspect. Git is only queried when it
            contains a ``.git`` entry.

    Returns:
        Dict with the keys ``github_owner``, ``github_repo``, ``github_url``
        and ``default_branch``. ``github_url`` holds the ``origin`` remote
        URL (whatever host it points at); owner and repo are filled in only
        when that URL is a github.com SSH or HTTPS URL. ``default_branch``
        is the currently checked-out branch, or ``'main'`` when it cannot be
        determined.
    """
    git_info: Dict[str, Optional[str]] = {
        'github_owner': None,
        'github_repo': None,
        'github_url': None,
        'default_branch': 'main',
    }

    if not (project_path / '.git').exists():
        return git_info

    try:
        # Get remote URL
        result = subprocess.run(
            ['git', '-C', str(project_path), 'remote', 'get-url', 'origin'],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if result.returncode == 0:
            remote_url = result.stdout.strip()
            git_info['github_url'] = remote_url

            # Parse GitHub URL
            # Formats: git@github.com:owner/repo.git, https://github.com/owner/repo.git
            github_match = re.match(
                r'(?:git@github\.com:|https://github\.com/)([^/]+)/([^/]+?)(?:\.git)?$',
                remote_url,
            )
            if github_match:
                git_info['github_owner'] = github_match.group(1)
                git_info['github_repo'] = github_match.group(2)

        # Get the checked-out branch; symbolic-ref fails on a detached HEAD,
        # in which case the 'main' default is kept.
        result = subprocess.run(
            ['git', '-C', str(project_path), 'symbolic-ref', '--short', 'HEAD'],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if result.returncode == 0:
            branch = result.stdout.strip()
            if branch:
                git_info['default_branch'] = branch
    except (subprocess.TimeoutExpired, OSError):
        # Best effort: a missing/unrunnable git binary or a hung command
        # leaves whatever was collected so far.
        pass

    return git_info
def detect_project_info(project_path: Path) -> Dict[str, Optional[str]]:
    """Detect project metadata from various sources.

    Looks at ``package.json``, ``pyproject.toml``, ``Cargo.toml`` and
    ``go.mod`` directly inside ``project_path``, in that order; a later
    manifest overrides the ``primary_language`` set by an earlier one.

    Args:
        project_path: Project root directory.

    Returns:
        Dict with ``name`` (defaults to the directory name), ``description``,
        ``primary_language`` and ``framework``; undetected values are None.
    """
    info: Dict[str, Optional[str]] = {
        'name': project_path.name,
        'description': None,
        'primary_language': None,
        'framework': None,
    }

    # Check package.json
    package_json = project_path / 'package.json'
    if package_json.exists():
        try:
            with open(package_json, encoding='utf-8') as f:
                pkg = json.load(f)
        except (ValueError, OSError):
            # ValueError covers both malformed JSON and undecodable bytes.
            pkg = None

        # Only a JSON object is a usable manifest.
        if isinstance(pkg, dict):
            # Fall back to the directory name when "name" is absent or empty.
            info['name'] = pkg.get('name') or info['name']
            info['description'] = pkg.get('description')

            # Merge dependency names, tolerating null or malformed sections.
            deps: Dict[str, object] = {}
            for section in ('dependencies', 'devDependencies'):
                entries = pkg.get(section)
                if isinstance(entries, dict):
                    deps.update(entries)

            # 'next' is tested first: Next.js projects also depend on react,
            # so testing react first would never report nextjs for them.
            for dep_name, framework in (
                ('next', 'nextjs'),
                ('react', 'react'),
                ('vue', 'vue'),
            ):
                if dep_name in deps:
                    info['framework'] = framework
                    break

            info['primary_language'] = (
                'typescript' if 'typescript' in deps else 'javascript'
            )

    # Check pyproject.toml
    pyproject = project_path / 'pyproject.toml'
    if pyproject.exists():
        info['primary_language'] = 'python'
        try:
            content = pyproject.read_text(encoding='utf-8').lower()
        except (OSError, UnicodeDecodeError):
            content = ''
        # Plain substring search over the whole file, first match wins.
        for candidate in ('django', 'fastapi', 'flask'):
            if candidate in content:
                info['framework'] = candidate
                break

    # Check Cargo.toml
    if (project_path / 'Cargo.toml').exists():
        info['primary_language'] = 'rust'

    # Check go.mod
    if (project_path / 'go.mod').exists():
        info['primary_language'] = 'go'

    return info
def ensure_db_exists() -> bool:
    """Make sure projects.db is present, running the init script if it is not.

    Returns:
        True when the database file already exists, or when
        ``init_projects_db.py`` (located next to this file) was run and
        exited with status 0. False when the script is missing or failed.
    """
    if get_projects_db_path().exists():
        return True

    # Run init script
    bootstrap = Path(__file__).parent / 'init_projects_db.py'
    if not bootstrap.exists():
        return False

    completed = subprocess.run(
        [sys.executable, str(bootstrap)],
        capture_output=True,
        text=True
    )
    return completed.returncode == 0
# =============================================================================
# Cloud Registration (J.15.6.3.4 - ADR-158)
# =============================================================================
def _register_with_cloud(
    project_uuid: str,
    project_name: str,
    root_path: str,
    primary_language: str = "",
    framework: str = "",
    project_type: str = "standalone",
    parent_uuid: Optional[str] = None,
) -> Optional[str]:
    """
    Send a project registration to the cloud API and report the outcome.

    Prints a status message for each outcome. Any exception raised while
    talking to the client is printed and turned into a None result.

    Returns:
        The cloud UUID when the registration succeeded; None when the cloud
        client is unavailable, the request was queued, or it failed.
    """
    if not CLOUD_CLIENT_AVAILABLE:
        print("\n⚠️ Cloud client not available - install requests or httpx")
        return None

    try:
        client = CloudProjectClient()

        payload = ProjectRegistration(
            local_project_uuid=project_uuid,
            project_name=project_name,
            root_path=root_path,
            primary_language=primary_language,
            framework=framework,
            # Content hash lets the receiver detect project changes
            content_hash=_compute_project_hash(root_path),
            project_type=project_type,
            parent_project_uuid=parent_uuid,
        )

        outcome = client.register_project(payload)

        if outcome.success:
            print("\n☁️ Cloud Registration: SUCCESS")
            print(f" Cloud UUID: {outcome.cloud_uuid}")
            print(f" Status: {outcome.status}")
            return outcome.cloud_uuid

        if outcome.queued:
            print("\n⏳ Cloud Registration: QUEUED (offline)")
            print(" Sync later: /cx --sync-projects")
            return None

        print("\n❌ Cloud Registration: FAILED")
        print(f" Error: {outcome.error}")
        return None
    except Exception as e:
        print(f"\n❌ Cloud Registration Error: {e}")
        return None
def _compute_project_hash(root_path: str) -> str: """Compute a hash of project structure for change detection.""" import hashlib
path = Path(root_path)
hasher = hashlib.sha256()
# Hash key files that indicate project identity
key_files = [
'package.json', 'pyproject.toml', 'Cargo.toml', 'go.mod',
'CLAUDE.md', 'README.md', '.gitignore',
]
for filename in sorted(key_files):
filepath = path / filename
if filepath.exists():
try:
content = filepath.read_bytes()
hasher.update(f"{filename}:{len(content)}:".encode())
hasher.update(content[:4096]) # First 4KB
except IOError:
pass
return hasher.hexdigest()[:16]
def sync_pending_projects(dry_run: bool = False) -> Tuple[int, int]:
    """
    Push queued (offline) cloud registrations to the cloud.

    J.15.6.3.5: Implements /cx --sync-projects

    Args:
        dry_run: Only report how many registrations are pending.

    Returns:
        Tuple of (success_count, remaining_count). (0, 0) is also returned
        when the cloud client is unavailable or an exception occurs.
    """
    if not CLOUD_CLIENT_AVAILABLE:
        print("Error: Cloud client not available - install requests or httpx")
        return (0, 0)

    rule = "=" * 60
    print(rule)
    print("SYNCING PENDING CLOUD REGISTRATIONS")
    print(rule)

    if dry_run:
        print("*** DRY RUN - No changes will be made ***\n")

    try:
        client = CloudProjectClient()

        # Queue size before processing
        pending = client.get_queue_status().get('queue_size', 0)
        print(f"Pending registrations: {pending}")

        if pending == 0:
            print("\n✅ No pending registrations to sync.")
            print(rule)
            return (0, 0)

        if dry_run:
            print(f"\nWould process {pending} pending registration(s).")
            print(rule)
            return (0, pending)

        synced = client.process_queue(batch_size=50)

        # Queue size after processing
        remaining = client.get_queue_status().get('queue_size', 0)

        print("\n✅ Sync complete!")
        print(f" Successfully synced: {synced}")
        print(f" Remaining in queue: {remaining}")

        # Write resolved cloud UUIDs back into the local database
        _update_local_cloud_uuids(client)

        print(rule)
        return (synced, remaining)
    except Exception as e:
        print(f"\n❌ Sync Error: {e}")
        return (0, 0)
def _update_local_cloud_uuids(client: 'CloudProjectClient') -> int:
    """Store cloud UUIDs in projects.db for rows that lack one.

    Every project whose metadata has no ``cloud_uuid`` is looked up through
    ``client.resolve_uuid(..., use_cache=True)``; resolved values are written
    into the row's JSON metadata.

    Returns:
        Number of rows updated; 0 when the database is unavailable or a
        database error occurs.
    """
    if not ensure_db_exists():
        return 0

    conn = sqlite3.connect(str(get_projects_db_path()))
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()

    try:
        # Projects without cloud_uuid
        cur.execute("""
            SELECT project_uuid, name FROM projects
            WHERE metadata IS NULL
            OR json_extract(metadata, '$.cloud_uuid') IS NULL
        """)

        patched = 0
        for record in cur.fetchall():
            local_uuid = record['project_uuid']
            cloud_uuid = client.resolve_uuid(local_uuid, use_cache=True)
            if not cloud_uuid:
                continue

            cur.execute("""
                UPDATE projects
                SET metadata = json_set(
                    COALESCE(metadata, '{}'),
                    '$.cloud_uuid', ?
                )
                WHERE project_uuid = ?
            """, (cloud_uuid, local_uuid))
            patched += 1
            print(f" Updated {record['name']}: {cloud_uuid[:8]}...")

        conn.commit()
        return patched
    except sqlite3.Error as e:
        print(f"Database error updating cloud UUIDs: {e}")
        return 0
    finally:
        conn.close()
# =============================================================================
# Registration Functions
# =============================================================================
def register_project(
    path_str: str,
    parent_uuid: Optional[str] = None,
    cloud_register: bool = False,
) -> bool:
    """
    Add a project directory to projects.db so it can be indexed and searched.

    A path that is already registered is reported and treated as success
    without inserting anything.

    Args:
        path_str: Path to the project directory
        parent_uuid: Optional parent project UUID for submodule/monorepo hierarchy
        cloud_register: Also register with cloud API (ADR-158)

    Returns:
        True if the project is registered (newly or already), False on an
        invalid path, an unavailable database, or a database error
    """
    project_path = Path(path_str).expanduser().resolve()

    # Validate path
    if not project_path.exists():
        print(f"Error: Path does not exist: {project_path}")
        return False
    if not project_path.is_dir():
        print(f"Error: Path is not a directory: {project_path}")
        return False

    # Ensure database exists
    if not ensure_db_exists():
        print("Error: Could not initialize projects.db")
        return False

    conn = sqlite3.connect(str(get_projects_db_path()))
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    rule = "=" * 60
    path_text = str(project_path)

    try:
        # Already registered?
        cur.execute("SELECT project_uuid, name FROM projects WHERE path = ?", (path_text,))
        known = cur.fetchone()
        if known:
            print("Project already registered:")
            print(f" Name: {known['name']}")
            print(f" UUID: {known['project_uuid']}")
            print(f" Path: {project_path}")
            return True

        new_uuid = str(uuid.uuid4())
        git = get_git_info(project_path)
        meta = detect_project_info(project_path)

        # A parent makes it a submodule; otherwise .git/modules marks a monorepo
        if parent_uuid:
            kind = 'submodule'
        else:
            has_modules = (project_path / '.git' / 'modules').exists()
            kind = 'monorepo' if has_modules else 'standalone'

        # Insert project; the parent's row id is resolved from its UUID
        row_values = (
            new_uuid,
            meta['name'],
            path_text,
            git['github_owner'],
            git['github_repo'],
            git['github_url'],
            git['default_branch'],
            parent_uuid,
            kind,
            meta['description'],
            meta['primary_language'],
            meta['framework'],
        )
        cur.execute("""
            INSERT INTO projects (
                project_uuid, name, path,
                github_owner, github_repo, github_url, default_branch,
                parent_project_id, project_type,
                description, primary_language, framework,
                status, created_at, updated_at
            ) VALUES (
                ?, ?, ?,
                ?, ?, ?, ?,
                (SELECT id FROM projects WHERE project_uuid = ?), ?,
                ?, ?, ?,
                'active', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
            )
        """, row_values)
        conn.commit()

        print(rule)
        print("PROJECT REGISTERED (ADR-118 TIER 4)")
        print(rule)
        print(f"\n UUID: {new_uuid}")
        print(f" Name: {meta['name']}")
        print(f" Path: {project_path}")
        print(f" Type: {kind}")
        if git['github_url']:
            print(f" GitHub: {git['github_url']}")
        if meta['primary_language']:
            print(f" Language: {meta['primary_language']}")
        if meta['framework']:
            print(f" Framework: {meta['framework']}")

        print("\nNext steps:")
        print(f" Index project: /cx --index-project {meta['name']}")
        print(" List projects: /cx --list-projects")

        # Cloud registration (J.15.6.3.4 - ADR-158)
        if cloud_register:
            cloud_uuid = _register_with_cloud(
                project_uuid=new_uuid,
                project_name=meta['name'],
                root_path=path_text,
                primary_language=meta['primary_language'] or "",
                framework=meta['framework'] or "",
                project_type=kind,
                parent_uuid=parent_uuid,
            )
            if cloud_uuid:
                # Record the cloud UUID on the local row
                cur.execute("""
                    UPDATE projects
                    SET metadata = json_set(
                        COALESCE(metadata, '{}'),
                        '$.cloud_uuid', ?
                    )
                    WHERE project_uuid = ?
                """, (cloud_uuid, new_uuid))
                conn.commit()

        print(rule)
        return True
    except sqlite3.Error as e:
        print(f"Database error: {e}")
        return False
    finally:
        conn.close()
def index_project(path_or_name: str, incremental: bool = True, dry_run: bool = False) -> bool:
    """
    Index a registered project for semantic search.

    J.15.3.4: Integrates with project_indexer.py for full file indexing.

    The project is looked up by resolved path, by name, or by UUID. When
    ``scripts.project_indexer`` cannot be imported, only the project's
    timestamps are updated, and with ``dry_run`` nothing is written.

    Args:
        path_or_name: Project path or registered name
        incremental: Only index changed files (default: True)
        dry_run: Preview without making changes

    Returns:
        True if indexing successful, False otherwise
    """
    if not ensure_db_exists():
        print("Error: Could not initialize projects.db")
        return False

    db_path = get_projects_db_path()
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()

    try:
        # Find project by path or name
        path = Path(path_or_name).expanduser().resolve()
        cursor.execute("""
            SELECT id, project_uuid, name, path FROM projects
            WHERE path = ? OR name = ? OR project_uuid = ?
        """, (str(path), path_or_name, path_or_name))
        project = cursor.fetchone()

        if not project:
            print(f"Error: Project not found: {path_or_name}")
            print(" Register with: /cx --register-project <path>")
            return False

        print("=" * 60)
        print("PROJECT INDEXING (ADR-118 TIER 4)")
        print("=" * 60)
        print(f"\n Project: {project['name']}")
        print(f" Path: {project['path']}")
        print(f" UUID: {project['project_uuid']}")
        print(f" Mode: {'Incremental' if incremental else 'Full'}")
        if dry_run:
            print(" *** DRY RUN - No changes will be made ***")

        # J.15.3.4: Call project_indexer for actual file indexing
        try:
            from scripts.project_indexer import index_project as index_project_files

            result = index_project_files(
                project_path=project['path'],
                incremental=incremental,
                dry_run=dry_run,
                verbose=True
            )

            # Update project metadata with indexing results
            if not dry_run:
                cursor.execute("""
                    UPDATE projects
                    SET last_indexed_at = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP,
                        metadata = json_set(
                            COALESCE(metadata, '{}'),
                            '$.last_index_stats.files_indexed', ?,
                            '$.last_index_stats.files_skipped', ?,
                            '$.last_index_stats.total_size_bytes', ?
                        )
                    WHERE id = ?
                """, (
                    result.get('indexed', 0),
                    result.get('skipped', 0),
                    result.get('total_size_bytes', 0),
                    project['id']
                ))
                conn.commit()

            print("\n✅ Project indexing complete!")
            print(f" Files indexed: {result.get('indexed', 0):,}")
            print(f" Files skipped: {result.get('skipped', 0):,}")
            print(f" Total size: {result.get('total_size_bytes', 0) / (1024*1024):.1f} MB")
            print(f" Timestamp: {datetime.now(timezone.utc).isoformat()}")
            print("=" * 60)
            return True

        except ImportError as e:
            print(f"\n⚠️ project_indexer.py not available: {e}")
            if dry_run:
                # A dry run must not touch the database, so the timestamp
                # fallback is skipped as well.
                print(" Dry run: skipping timestamp update.")
                return True
            print(" Falling back to timestamp update only...")
            cursor.execute("""
                UPDATE projects
                SET last_indexed_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (project['id'],))
            conn.commit()
            return True

    except sqlite3.Error as e:
        print(f"Database error: {e}")
        return False
    finally:
        conn.close()
def embed_project(path_or_name: str, reembed_all: bool = False, dry_run: bool = False) -> bool:
    """
    Produce semantic embeddings for the source files of a project.

    J.15.4: Integrates with project_embedder.py for embedding generation.

    Args:
        path_or_name: Project path or registered name
        reembed_all: Force re-embed all files (ignore existing)
        dry_run: Preview without making changes

    Returns:
        True if embedding successful; False when the embedder reports an
        "error" entry or cannot be imported
    """
    try:
        from scripts.project_embedder import embed_project as run_embedder

        outcome = run_embedder(
            project_path=path_or_name,
            incremental=not reembed_all,
            dry_run=dry_run,
            verbose=True,
            reembed_all=reembed_all
        )

        failed = "error" in outcome
        if failed:
            print(f"\n❌ Error: {outcome['error']}")
        return not failed
    except ImportError as e:
        print(f"\n⚠️ project_embedder.py not available: {e}")
        print(" Install: pip install sentence-transformers")
        return False
def list_projects() -> None:
    """Print every registered project, ordered by name.

    For each project the status icon, shortened UUID, path, type, language
    (with framework, when set) and whether it has been indexed are shown.
    Database errors are printed rather than raised.
    """
    if not ensure_db_exists():
        print("Error: Could not initialize projects.db")
        return

    conn = sqlite3.connect(str(get_projects_db_path()))
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    rule = "=" * 60

    try:
        cur.execute("""
            SELECT
                project_uuid, name, path, project_type, status,
                primary_language, framework, last_indexed_at, created_at
            FROM projects
            ORDER BY name
        """)
        rows = cur.fetchall()

        print(rule)
        print("REGISTERED PROJECTS (ADR-118 TIER 4)")
        print(rule)

        if not rows:
            print("\nNo projects registered.")
            print("\nRegister a project:")
            print(" /cx --register-project <path>")
            print(rule)
            return

        print(f"\n{len(rows)} project(s) registered:\n")

        for rec in rows:
            marker = "⏸️" if rec['status'] != 'active' else "✅"
            index_mark = "✗" if not rec['last_indexed_at'] else "✓"

            print(f" {marker} {rec['name']}")
            print(f" UUID: {rec['project_uuid'][:8]}...")
            print(f" Path: {rec['path']}")
            print(f" Type: {rec['project_type']}")

            language = rec['primary_language']
            if language:
                # Append the framework in parentheses when one is recorded
                label = f"{language} ({rec['framework']})" if rec['framework'] else language
                print(f" Language: {label}")

            print(f" Indexed: {index_mark}")
            print()

        print(rule)
    except sqlite3.Error as e:
        print(f"Database error: {e}")
    finally:
        conn.close()
def show_project_stats() -> None:
    """Print size, per-table row counts and summaries for projects.db.

    Unlike the other commands this does not create the database: when the
    file is missing, a hint is printed and nothing else happens. Tables that
    cannot be queried are listed as "(missing)".
    """
    db_path = get_projects_db_path()
    if not db_path.exists():
        print("Error: projects.db does not exist")
        print(" Initialize with: python3 scripts/init_projects_db.py")
        return

    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()

    def scalar(sql: str) -> int:
        # Run a single-value query and return that value
        cur.execute(sql)
        return cur.fetchone()[0]

    rule = "=" * 60
    thin_rule = "-" * 40

    try:
        print(rule)
        print("PROJECTS DATABASE STATISTICS (ADR-118 TIER 4)")
        print(rule)

        size_bytes = db_path.stat().st_size
        print(f"\nDatabase: {db_path}")
        print(f"Size: {size_bytes:,} bytes ({size_bytes / 1024:.1f} KB)")

        # Table counts
        print("\nTable Row Counts:")
        print(thin_rule)
        for table in (
            'projects', 'content_hashes', 'project_embeddings',
            'exclude_patterns', 'project_tags', 'project_activity',
        ):
            try:
                shown = scalar(f"SELECT COUNT(*) FROM {table}")
            except sqlite3.OperationalError:
                shown = '(missing)'
            print(f" {table:25} {shown:>10}")

        # Project summary
        active = scalar("SELECT COUNT(*) FROM projects WHERE status = 'active'")
        indexed = scalar("SELECT COUNT(*) FROM projects WHERE last_indexed_at IS NOT NULL")
        total = scalar("SELECT COUNT(*) FROM projects")

        print("\nSummary:")
        print(thin_rule)
        print(f" Total projects: {total:>10}")
        print(f" Active projects: {active:>10}")
        print(f" Indexed projects: {indexed:>10}")

        # Language breakdown (top five)
        cur.execute("""
            SELECT primary_language, COUNT(*) as cnt
            FROM projects
            WHERE primary_language IS NOT NULL
            GROUP BY primary_language
            ORDER BY cnt DESC
            LIMIT 5
        """)
        top_languages = cur.fetchall()
        if top_languages:
            print("\nLanguage Breakdown:")
            print(thin_rule)
            for entry in top_languages:
                print(f" {entry['primary_language']:25} {entry['cnt']:>10}")

        print("\n" + rule)
    except sqlite3.Error as e:
        print(f"Database error: {e}")
    finally:
        conn.close()
# =============================================================================
# CLI
# =============================================================================
def main(argv: Optional[List[str]] = None) -> int:
    """Run the command-line interface and return the process exit code.

    Args:
        argv: Argument list to parse; None makes argparse read sys.argv.

    Returns:
        0 on success (including --list, --stats and the help fallback),
        1 when registration or indexing fails or when registrations remain
        queued after --sync-projects.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Project Registration (ADR-118 TIER 4, ADR-158 Cloud)"
    )
    parser.add_argument('--register', '-r', metavar='PATH',
                        help='Register a project')
    parser.add_argument('--index', '-i', metavar='PATH_OR_NAME',
                        help='Index a registered project')
    parser.add_argument('--list', '-l', action='store_true',
                        help='List all registered projects')
    parser.add_argument('--stats', '-s', action='store_true',
                        help='Show database statistics')
    parser.add_argument('--parent', metavar='UUID',
                        help='Parent project UUID (for submodules)')
    # Cloud options (J.15.6.3.4 - ADR-158)
    parser.add_argument('--cloud', '-c', action='store_true',
                        help='Also register with cloud API')
    parser.add_argument('--sync-projects', action='store_true',
                        help='Sync pending cloud registrations')
    parser.add_argument('--dry-run', action='store_true',
                        help='Preview without making changes')

    args = parser.parse_args(argv)

    if args.register:
        registered = register_project(
            args.register,
            args.parent,
            cloud_register=args.cloud
        )
        return 0 if registered else 1

    if args.index:
        # Forward --dry-run so that a preview run does not index for real.
        return 0 if index_project(args.index, dry_run=args.dry_run) else 1

    if args.sync_projects:
        _, remaining = sync_pending_projects(dry_run=args.dry_run)
        return 0 if remaining == 0 else 1

    if args.list:
        list_projects()
    elif args.stats:
        show_project_stats()
    else:
        parser.print_help()
    return 0


if __name__ == "__main__":
    sys.exit(main())