#!/usr/bin/env python3
"""Cloud Project Client - J.15.6.3

Handles bidirectional project synchronization between local coditect-core
and the cloud multi-tenant platform.

ADR: ADR-158 Cloud Project Registration API
Track: J.15.6 Cloud Integration

Features:
- Offline queue for failed registrations
- Retry with exponential backoff
- Local UUID → Cloud UUID resolution cache
- Batch sync for efficiency
- Tenant-aware authentication
"""
import json
import logging
import sqlite3
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID

# Optional: use requests if available, otherwise httpx
try:
    import requests
    HTTP_CLIENT = 'requests'
except ImportError:
    try:
        import httpx
        HTTP_CLIENT = 'httpx'
    except ImportError:
        HTTP_CLIENT = None

from .paths import (
    get_sessions_db_path,
    get_machine_id,
    get_coditect_data_dir,
)
# Module-level logger; was `getLogger(name)`, which raises NameError at import.
logger = logging.getLogger(__name__)
@dataclass
class ProjectRegistration:
    """Local project data for cloud registration."""

    local_project_uuid: str
    project_name: str
    root_path: str
    primary_language: str = ""
    framework: str = ""
    content_hash: str = ""
    project_type: str = "standalone"  # standalone, submodule, monorepo
    parent_project_uuid: Optional[str] = None  # set for submodule/monorepo children
@dataclass
class CloudProject:
    """Cloud project response."""

    cloud_uuid: str
    local_project_uuid: str
    registration_status: str
    registered_at: str
    tenant_id: str = ""
@dataclass
class SyncResult:
    """Result of a sync operation."""

    success: bool
    cloud_uuid: Optional[str] = None
    status: str = ""
    error: Optional[str] = None
    queued: bool = False  # True when the operation was deferred to the offline queue
class OfflineQueue:
    """SQLite-backed queue for offline operations.

    Each item stores a JSON payload plus a retry budget; failed items are
    rescheduled with exponential backoff (1, 2, 4, 8, 16 minutes) until
    ``max_retries`` is exhausted.
    """

    def __init__(self, db_path: Optional[Path] = None):
        """Open (creating if needed) the queue database.

        Args:
            db_path: Queue database file. Defaults to
                ``<coditect data dir>/context-storage/sync_queue.db``.
        """
        if db_path is None:
            data_dir = get_coditect_data_dir()
            db_path = data_dir / "context-storage" / "sync_queue.db"
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        """Initialize queue database (table + retry-schedule index)."""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS project_sync_queue (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    operation TEXT NOT NULL,
                    payload TEXT NOT NULL,
                    retry_count INTEGER DEFAULT 0,
                    max_retries INTEGER DEFAULT 5,
                    next_retry_at TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    last_error TEXT
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_next_retry
                ON project_sync_queue(next_retry_at)
            """)

    def enqueue(
        self,
        operation: str,
        payload: Dict[str, Any],
        max_retries: int = 5,
    ) -> int:
        """Add operation to queue, eligible for processing immediately.

        Args:
            operation: Operation type (e.g. "register").
            payload: JSON-serializable operation data.
            max_retries: Attempts before the item is abandoned.

        Returns:
            The queue row id of the new item.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                """
                INSERT INTO project_sync_queue
                (operation, payload, max_retries, next_retry_at)
                VALUES (?, ?, ?, ?)
                """,
                (
                    operation,
                    json.dumps(payload),
                    max_retries,
                    # Eligible right away: first retry timestamp is "now".
                    datetime.now(timezone.utc).isoformat(),
                ),
            )
            return cursor.lastrowid

    def get_pending(self, limit: int = 10) -> List[Tuple[int, str, Dict]]:
        """Get pending operations ready for retry.

        An item is pending when its next_retry_at has passed and it still
        has retries left. ISO-8601 UTC strings compare correctly as text.

        Returns:
            List of ``(queue_id, operation, payload)`` tuples, oldest first.
        """
        now = datetime.now(timezone.utc).isoformat()
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                """
                SELECT id, operation, payload
                FROM project_sync_queue
                WHERE next_retry_at <= ?
                  AND retry_count < max_retries
                ORDER BY created_at
                LIMIT ?
                """,
                (now, limit),
            ).fetchall()
            return [
                (row["id"], row["operation"], json.loads(row["payload"]))
                for row in rows
            ]

    def mark_success(self, queue_id: int) -> None:
        """Remove successfully processed item."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "DELETE FROM project_sync_queue WHERE id = ?",
                (queue_id,),
            )

    def mark_failed(self, queue_id: int, error: str) -> None:
        """Record a failure: bump retry count and schedule next attempt.

        Exponential backoff: 1min, 2min, 4min, 8min, 16min. (Previously the
        exponent used the already-incremented count, so the first delay was
        2min and the schedule never matched the documented one.)
        """
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                "SELECT retry_count FROM project_sync_queue WHERE id = ?",
                (queue_id,),
            ).fetchone()
            if row:
                retry_count = row[0] + 1
                delay = timedelta(seconds=60 * (2 ** (retry_count - 1)))
                next_retry = (datetime.now(timezone.utc) + delay).isoformat()
                conn.execute(
                    """
                    UPDATE project_sync_queue
                    SET retry_count = ?, next_retry_at = ?, last_error = ?
                    WHERE id = ?
                    """,
                    (retry_count, next_retry, error, queue_id),
                )

    def queue_size(self) -> int:
        """Get total items in queue (including exhausted ones)."""
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                "SELECT COUNT(*) FROM project_sync_queue"
            ).fetchone()
            return row[0] if row else 0
class UUIDCache:
    """Cache for local UUID → cloud UUID mappings.

    Backed by a table in the sessions database; entries optionally expire
    after a TTL so stale cloud UUIDs get re-resolved.
    """

    def __init__(self, db_path: Optional[Path] = None):
        """Open the cache, defaulting to the shared sessions database."""
        if db_path is None:
            db_path = get_sessions_db_path()
        self.db_path = db_path
        self._init_table()

    def _init_table(self) -> None:
        """Ensure cache table exists."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cloud_project_cache (
                    local_uuid TEXT PRIMARY KEY,
                    cloud_uuid TEXT NOT NULL,
                    tenant_id TEXT,
                    cached_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    expires_at TEXT
                )
            """)

    def get(self, local_uuid: str) -> Optional[str]:
        """Get cloud UUID for local UUID, or None if missing/expired."""
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                """
                SELECT cloud_uuid FROM cloud_project_cache
                WHERE local_uuid = ?
                  AND (expires_at IS NULL OR expires_at > datetime('now'))
                """,
                (local_uuid,),
            ).fetchone()
            return row[0] if row else None

    def set(
        self,
        local_uuid: str,
        cloud_uuid: str,
        tenant_id: str = "",
        ttl_hours: int = 24,
    ) -> None:
        """Cache UUID mapping.

        Args:
            local_uuid: Local project UUID (primary key; upserts).
            cloud_uuid: Resolved cloud UUID.
            tenant_id: Owning tenant, if known.
            ttl_hours: Entry lifetime; <= 0 means never expires.
        """
        expires = None
        if ttl_hours > 0:
            # BUGFIX: store expires_at in SQLite's datetime('now') format
            # ("YYYY-MM-DD HH:MM:SS", UTC). The previous ISO-8601 value used
            # a "T" separator, and since 'T' > ' ' lexicographically, any
            # same-day expired entry compared as still valid in get().
            expires_dt = datetime.now(timezone.utc) + timedelta(hours=ttl_hours)
            expires = expires_dt.strftime("%Y-%m-%d %H:%M:%S")

        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO cloud_project_cache
                (local_uuid, cloud_uuid, tenant_id, cached_at, expires_at)
                VALUES (?, ?, ?, datetime('now'), ?)
                """,
                (local_uuid, cloud_uuid, tenant_id, expires),
            )

    def invalidate(self, local_uuid: str) -> None:
        """Remove cached mapping."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "DELETE FROM cloud_project_cache WHERE local_uuid = ?",
                (local_uuid,),
            )
class CloudProjectClient: """ Client for cloud project registration and sync.
Handles:
- Project registration with cloud platform
- Offline queue for failed operations
- UUID resolution caching
- Batch synchronization
"""
def __init__(
self,
api_url: Optional[str] = None,
auth_token: Optional[str] = None,
config_path: Optional[Path] = None,
):
"""
Initialize client.
Args:
api_url: Cloud API base URL (default: from config)
auth_token: JWT token (default: from config)
config_path: Path to config.json
"""
self.api_url = api_url
self.auth_token = auth_token
self.machine_uuid = get_machine_id()
self.queue = OfflineQueue()
self.cache = UUIDCache()
# Load config if not provided
if not self.api_url or not self.auth_token:
self._load_config(config_path)
# HTTP client setup
if HTTP_CLIENT is None:
logger.warning("No HTTP client available (install requests or httpx)")
def _load_config(self, config_path: Optional[Path] = None) -> None:
"""Load configuration from file."""
if config_path is None:
config_path = Path.home() / ".coditect" / "config" / "config.json"
if config_path.exists():
with open(config_path) as f:
config = json.load(f)
cloud_config = config.get("cloud_sync", {})
if not self.api_url:
self.api_url = cloud_config.get("api_url", "https://api.coditect.ai")
if not self.auth_token:
self.auth_token = cloud_config.get("auth_token", "")
else:
if not self.api_url:
self.api_url = "https://api.coditect.ai"
def _request(
self,
method: str,
endpoint: str,
data: Optional[Dict] = None,
params: Optional[Dict] = None,
) -> Tuple[int, Optional[Dict]]:
"""Make HTTP request to cloud API."""
if HTTP_CLIENT is None:
return 503, {"error": "No HTTP client available"}
url = f"{self.api_url.rstrip('/')}{endpoint}"
headers = {
"Content-Type": "application/json",
"X-Machine-UUID": self.machine_uuid,
}
if self.auth_token:
headers["Authorization"] = f"Bearer {self.auth_token}"
try:
if HTTP_CLIENT == 'requests':
response = requests.request(
method,
url,
json=data,
params=params,
headers=headers,
timeout=30,
)
return response.status_code, response.json() if response.content else None
else: # httpx
with httpx.Client(timeout=30) as client:
response = client.request(
method,
url,
json=data,
params=params,
headers=headers,
)
return response.status_code, response.json() if response.content else None
except Exception as e:
logger.error(f"Request failed: {e}")
return 0, {"error": str(e)}
def register_project(
self,
project: ProjectRegistration,
force: bool = False,
) -> SyncResult:
"""
Register local project with cloud.
Args:
project: Project registration data
force: Skip cache and re-register
Returns:
SyncResult with cloud UUID or queue status
"""
# Check cache first
if not force:
cached = self.cache.get(project.local_project_uuid)
if cached:
logger.debug(f"Using cached cloud UUID: {cached}")
return SyncResult(
success=True,
cloud_uuid=cached,
status="cached",
)
# Prepare payload
payload = {
"local_project_uuid": project.local_project_uuid,
"project_name": project.project_name,
"machine_uuid": self.machine_uuid,
"root_path": project.root_path,
"primary_language": project.primary_language,
"framework": project.framework,
"content_hash": project.content_hash,
"project_type": project.project_type,
}
if project.parent_project_uuid:
payload["parent_project_uuid"] = project.parent_project_uuid
# Make request
status, response = self._request("POST", "/api/v1/projects/register/", payload)
if status in (200, 201) and response:
cloud_uuid = response.get("cloud_uuid")
if cloud_uuid:
self.cache.set(
project.local_project_uuid,
cloud_uuid,
response.get("tenant_id", ""),
)
return SyncResult(
success=True,
cloud_uuid=cloud_uuid,
status=response.get("registration_status", "registered"),
)
# Offline or failed - queue for retry
if status == 0 or status >= 500:
queue_id = self.queue.enqueue("register", payload)
logger.info(f"Queued project registration: {queue_id}")
return SyncResult(
success=False,
status="queued",
queued=True,
error=response.get("error") if response else "Network error",
)
# Client error
return SyncResult(
success=False,
status="error",
error=response.get("error") if response else f"HTTP {status}",
)
def resolve_uuid(
self,
local_uuid: str,
use_cache: bool = True,
) -> Optional[str]:
"""
Resolve local project UUID to cloud UUID.
Args:
local_uuid: Local project UUID
use_cache: Check cache first
Returns:
Cloud UUID or None if not registered
"""
if use_cache:
cached = self.cache.get(local_uuid)
if cached:
return cached
status, response = self._request(
"GET",
f"/api/v1/projects/by-local-uuid/{local_uuid}/",
params={"machine_uuid": self.machine_uuid},
)
if status == 200 and response:
cloud_uuid = response.get("cloud_uuid")
if cloud_uuid:
self.cache.set(local_uuid, cloud_uuid)
return cloud_uuid
return None
def get_registration_status(self, cloud_uuid: str) -> Optional[Dict]:
"""Get registration status for cloud project."""
status, response = self._request(
"GET",
f"/api/v1/projects/{cloud_uuid}/registration-status/",
)
return response if status == 200 else None
def sync_projects(
self,
projects: List[ProjectRegistration],
) -> Dict[str, SyncResult]:
"""
Batch sync multiple projects.
Args:
projects: List of projects to sync
Returns:
Dict mapping local_uuid to SyncResult
"""
results = {}
# Prepare batch payload
payload = {
"projects": [
{
"local_project_uuid": p.local_project_uuid,
"project_name": p.project_name,
"machine_uuid": self.machine_uuid,
"root_path": p.root_path,
"primary_language": p.primary_language,
"framework": p.framework,
"content_hash": p.content_hash,
"project_type": p.project_type,
}
for p in projects
]
}
status, response = self._request("POST", "/api/v1/projects/sync/", payload)
if status == 200 and response:
for item in response.get("results", []):
local_uuid = item.get("local_project_uuid")
cloud_uuid = item.get("cloud_uuid")
if local_uuid and cloud_uuid:
self.cache.set(local_uuid, cloud_uuid)
results[local_uuid] = SyncResult(
success=True,
cloud_uuid=cloud_uuid,
status=item.get("status", "synced"),
)
else:
# Queue all for retry
for p in projects:
self.queue.enqueue("register", {
"local_project_uuid": p.local_project_uuid,
"project_name": p.project_name,
"machine_uuid": self.machine_uuid,
"root_path": p.root_path,
})
results[p.local_project_uuid] = SyncResult(
success=False,
status="queued",
queued=True,
)
return results
def process_queue(self, batch_size: int = 10) -> int:
"""
Process pending items in offline queue.
Args:
batch_size: Number of items to process
Returns:
Number of successfully processed items
"""
pending = self.queue.get_pending(batch_size)
success_count = 0
for queue_id, operation, payload in pending:
if operation == "register":
project = ProjectRegistration(
local_project_uuid=payload["local_project_uuid"],
project_name=payload.get("project_name", ""),
root_path=payload.get("root_path", ""),
primary_language=payload.get("primary_language", ""),
framework=payload.get("framework", ""),
content_hash=payload.get("content_hash", ""),
)
result = self.register_project(project, force=True)
if result.success:
self.queue.mark_success(queue_id)
success_count += 1
elif not result.queued:
self.queue.mark_failed(queue_id, result.error or "Unknown error")
return success_count
def get_queue_status(self) -> Dict[str, Any]:
"""Get offline queue statistics."""
return {
"queue_size": self.queue.queue_size(),
"pending": len(self.queue.get_pending(100)),
}
def main():
    """CLI interface for cloud project client.

    Supports one action per invocation: --register, --resolve,
    --process-queue, or --queue-status.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Cloud Project Client")
    parser.add_argument("--register", help="Register project by local UUID")
    parser.add_argument("--resolve", help="Resolve local UUID to cloud UUID")
    parser.add_argument("--process-queue", action="store_true", help="Process offline queue")
    parser.add_argument("--queue-status", action="store_true", help="Show queue status")
    parser.add_argument("--api-url", help="Cloud API URL")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    client = CloudProjectClient(api_url=args.api_url)

    if args.register:
        # For CLI, we need minimal project data; the current directory
        # stands in for the project root.
        project = ProjectRegistration(
            local_project_uuid=args.register,
            project_name="CLI Registration",
            root_path=str(Path.cwd()),
        )
        result = client.register_project(project)
        print(f"Result: {result}")
    elif args.resolve:
        cloud_uuid = client.resolve_uuid(args.resolve)
        if cloud_uuid:
            print(f"Cloud UUID: {cloud_uuid}")
        else:
            print("Not registered")
    elif args.process_queue:
        count = client.process_queue()
        print(f"Processed {count} items from queue")
    elif args.queue_status:
        status = client.get_queue_status()
        print(f"Queue status: {json.dumps(status, indent=2)}")
# Script entry point; was `if name == "main":`, which raises NameError.
if __name__ == "__main__":
    main()