#!/usr/bin/env python3
# scripts-cloud-sync-client
"""CODITECT Cloud Sync Client.

Syncs local context data to the CODITECT cloud backend. Supports a
multi-tenant, multi-user, multi-session architecture.

Authentication methods:
    1. Email/password (default) - uses /api/v1/auth/login/
    2. API token - pre-generated access token
    3. License key (future) - auto-generates JWT from a license

Configuration (~/.coditect/config.json or $PROJECT/.coditect/config.json):
    {
      "cloud": {
        "api_url": "https://api.coditect.ai",
        "email": "user@example.com",
        "password": "...",        // or use CODITECT_PASSWORD env var
        "access_token": "...",    // alternative: pre-generated token
        "tenant_id": "...",
        "offline_mode": false
      }
    }

Environment variables:
    CODITECT_API_URL      - Override API URL
    CODITECT_EMAIL        - User email for authentication
    CODITECT_PASSWORD     - User password (required with email)
    CODITECT_ACCESS_TOKEN - Pre-generated access token (alternative to email/password)
    CODITECT_TENANT_ID    - Tenant ID
    CODITECT_OFFLINE      - Set to "true" for offline-only mode
"""
import json
import os
import sqlite3
import sys
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
@dataclass
class CloudConfig:
    """Cloud connection configuration.

    Populated from config.json and/or environment variables; see the
    module docstring for precedence rules.
    """

    # Base URL of the CODITECT cloud API.
    api_url: str = "https://api.coditect.ai"
    # Authentication options (in order of preference).
    email: Optional[str] = None
    password: Optional[str] = None
    access_token: Optional[str] = None  # Pre-generated token
    refresh_token: Optional[str] = None
    # Tenant/project context.
    tenant_id: Optional[str] = None
    team_id: Optional[str] = None
    project_id: Optional[str] = None
    # Behavior flags.
    offline_mode: bool = False
@dataclass
class TaskEvent:
    """Task tracking event to sync to cloud."""

    task_id: str
    description: str
    status: str  # pending, in_progress, completed
    project_id: str
    session_id: Optional[str] = None
    user_id: Optional[str] = None
    outcome: Optional[str] = None
    outcome_score: Optional[float] = None
    tool_success_count: int = 0
    tool_error_count: int = 0
    timestamp: Optional[str] = None

    def __post_init__(self):
        # Default to the current UTC time so every event is timestamped
        # even when the caller does not supply one.
        if not self.timestamp:
            self.timestamp = datetime.now(timezone.utc).isoformat()
class CloudSyncClient:
    """
    Client for syncing context data to CODITECT cloud.

    Handles:
    - Authentication via email/password or a pre-generated API token
      (license-key auth is planned but not implemented here)
    - Multi-tenant data isolation
    - Offline fallback with sync queue
    - Retry logic for failed syncs
    """
    def __init__(self, config: Optional[CloudConfig] = None, data_root: Optional[Path] = None):
        """Initialize the sync client.

        Args:
            config: Cloud connection settings; when omitted, loaded from the
                config file and environment via _load_config().
            data_root: CODITECT data root directory; when omitted, discovered
                via _find_data_root() (may resolve to None).
        """
        self.config = config or self._load_config()
        self.data_root = data_root or self._find_data_root()
        # ADR-118: Cloud sync syncs org.db (TIER 2 - irreplaceable data)
        try:
            # Preferred path: project-local `paths` module supplies canonical
            # storage locations.
            # NOTE(review): get_org_db_path is imported but never used here —
            # confirm before removing.
            from paths import (
                get_context_storage_dir,
                get_org_db_path,
                ORG_DB,
            )
            context_storage = get_context_storage_dir()
            self.local_db = ORG_DB  # Sync irreplaceable org data (ADR-118 Tier 2)
            self.sync_queue_db = context_storage / "sync_queue.db"
            # NOTE: context.db is DEPRECATED - NO FALLBACK per ADR-118
        except ImportError:
            # Fallback for backward compatibility with path resolution
            if self.data_root:
                # Prefer the shared per-user data dir when it exists.
                _user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
                if _user_data.exists():
                    context_storage = _user_data
                else:
                    context_storage = self.data_root / "context-storage"
                self.local_db = context_storage / "org.db"  # ADR-118 Tier 2
                self.sync_queue_db = context_storage / "sync_queue.db"
                # NOTE: context.db is DEPRECATED - NO FALLBACK per ADR-118
            else:
                # No data root found: local sync and offline queueing are
                # disabled (methods treat None paths as "no local storage").
                self.local_db = None
                self.sync_queue_db = None
        # JWT tokens cached in-memory after the first successful auth.
        self._access_token: Optional[str] = None
        self._refresh_token: Optional[str] = None
def _find_data_root(self) -> Optional[Path]:
"""Find CODITECT data root from environment or common locations."""
# Environment variable takes precedence
if os.environ.get("CODITECT_DATA_ROOT"):
return Path(os.environ["CODITECT_DATA_ROOT"])
# Check current project
project_dir = os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd())
project_coditect = Path(project_dir) / ".coditect"
if project_coditect.exists():
return project_coditect
# Global fallback
global_coditect = Path.home() / ".coditect"
if global_coditect.exists():
return global_coditect
return None
def _load_config(self) -> CloudConfig:
"""Load configuration from file and environment."""
config = CloudConfig()
# Try to load from config file
config_paths = [
Path(os.environ.get("CLAUDE_PROJECT_DIR", ".")) / ".coditect" / "config.json",
Path.home() / ".coditect" / "config.json",
]
for config_path in config_paths:
if config_path.exists():
try:
with open(config_path) as f:
data = json.load(f)
cloud_config = data.get("cloud", {})
config.api_url = cloud_config.get("api_url", config.api_url)
# Auth credentials
config.email = cloud_config.get("email")
config.password = cloud_config.get("password")
config.access_token = cloud_config.get("access_token")
config.refresh_token = cloud_config.get("refresh_token")
# Context
config.tenant_id = cloud_config.get("tenant_id")
config.team_id = cloud_config.get("team_id")
config.project_id = cloud_config.get("project_id")
config.offline_mode = cloud_config.get("offline_mode", False)
break
except (json.JSONDecodeError, IOError):
continue
# Environment variables override config file
config.api_url = os.environ.get("CODITECT_API_URL", config.api_url)
config.email = os.environ.get("CODITECT_EMAIL", config.email)
config.password = os.environ.get("CODITECT_PASSWORD", config.password)
config.access_token = os.environ.get("CODITECT_ACCESS_TOKEN", config.access_token)
config.tenant_id = os.environ.get("CODITECT_TENANT_ID", config.tenant_id)
config.offline_mode = os.environ.get("CODITECT_OFFLINE", "").lower() == "true" or config.offline_mode
return config
def _get_access_token(self) -> Optional[str]:
"""
Get access token for API authentication.
Authentication priority:
1. Cached access token (from previous call)
2. Pre-configured access token (from config/env)
3. Email/password login (via /api/v1/auth/login/)
Returns:
Access token string or None if authentication fails.
"""
# Return cached token if available
if self._access_token:
return self._access_token
# Use pre-configured access token
if self.config.access_token:
self._access_token = self.config.access_token
return self._access_token
# Authenticate with email/password
if self.config.email and self.config.password:
return self._login_with_credentials()
print("No authentication configured. Set email/password or access_token.", file=sys.stderr)
return None
def _login_with_credentials(self) -> Optional[str]:
"""
Login with email/password to get JWT tokens.
Calls POST /api/v1/auth/login/ with credentials.
Stores both access and refresh tokens.
Returns:
Access token string or None if login fails.
"""
try:
login_url = f"{self.config.api_url}/api/v1/auth/login/"
request = Request(
login_url,
data=json.dumps({
"email": self.config.email,
"password": self.config.password
}).encode(),
headers={"Content-Type": "application/json"},
method="POST"
)
with urlopen(request, timeout=15) as response:
data = json.loads(response.read().decode())
self._access_token = data.get("access")
self._refresh_token = data.get("refresh")
# Store tenant_id from response if not configured
if not self.config.tenant_id and data.get("tenant_id"):
self.config.tenant_id = data.get("tenant_id")
return self._access_token
except HTTPError as e:
error_body = e.read().decode() if hasattr(e, 'read') else str(e)
print(f"Login failed ({e.code}): {error_body}", file=sys.stderr)
return None
except URLError as e:
print(f"Connection failed: {e}", file=sys.stderr)
return None
def _refresh_access_token(self) -> Optional[str]:
"""
Refresh the access token using the refresh token.
Calls POST /api/v1/auth/token/refresh/ with refresh token.
Returns:
New access token or None if refresh fails.
"""
refresh_token = getattr(self, '_refresh_token', None) or self.config.refresh_token
if not refresh_token:
return None
try:
refresh_url = f"{self.config.api_url}/api/v1/auth/token/refresh/"
request = Request(
refresh_url,
data=json.dumps({"refresh": refresh_token}).encode(),
headers={"Content-Type": "application/json"},
method="POST"
)
with urlopen(request, timeout=10) as response:
data = json.loads(response.read().decode())
self._access_token = data.get("access")
return self._access_token
except (URLError, HTTPError) as e:
print(f"Token refresh failed: {e}", file=sys.stderr)
# Clear tokens on refresh failure
self._access_token = None
self._refresh_token = None
return None
def _api_request(
self,
endpoint: str,
data: Dict[str, Any],
method: str = "POST",
retry_on_auth_failure: bool = True
) -> Optional[Dict]:
"""
Make authenticated API request.
Args:
endpoint: API endpoint path (e.g., "/api/v1/context/tasks/")
data: Request payload
method: HTTP method
retry_on_auth_failure: If True, refresh token and retry on 401
Returns:
Response data dict or None on failure.
"""
if self.config.offline_mode:
return None
token = self._get_access_token()
if not token:
return None
try:
url = f"{self.config.api_url}{endpoint}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
}
if self.config.tenant_id:
headers["X-Tenant-ID"] = self.config.tenant_id
request = Request(
url,
data=json.dumps(data).encode(),
headers=headers,
method=method
)
with urlopen(request, timeout=30) as response:
return json.loads(response.read().decode())
except HTTPError as e:
if e.code == 401 and retry_on_auth_failure:
# Token expired - try to refresh
self._access_token = None
new_token = self._refresh_access_token()
if new_token:
return self._api_request(endpoint, data, method, retry_on_auth_failure=False)
# Refresh failed - try re-login
new_token = self._login_with_credentials() if self.config.email else None
if new_token:
return self._api_request(endpoint, data, method, retry_on_auth_failure=False)
error_body = e.read().decode() if hasattr(e, 'read') else str(e)
print(f"API request failed ({e.code}): {error_body}", file=sys.stderr)
return None
except URLError as e:
print(f"Connection failed: {e}", file=sys.stderr)
return None
def _queue_for_sync(self, event_type: str, data: Dict[str, Any]):
"""Queue event for later sync when offline."""
if not self.sync_queue_db:
return
self.sync_queue_db.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(self.sync_queue_db)
conn.execute("""
CREATE TABLE IF NOT EXISTS sync_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_type TEXT NOT NULL,
data TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
retry_count INTEGER DEFAULT 0,
last_error TEXT
)
""")
conn.execute(
"INSERT INTO sync_queue (event_type, data) VALUES (?, ?)",
(event_type, json.dumps(data))
)
conn.commit()
conn.close()
def sync_task(self, task: TaskEvent) -> bool:
"""
Sync a task event to the cloud.
Returns True if synced successfully, False if queued for later.
"""
data = {
**asdict(task),
"tenant_id": self.config.tenant_id,
"team_id": self.config.team_id,
"project_id": self.config.project_id or task.project_id,
}
result = self._api_request("/api/v1/context/tasks/", data)
if result:
return True
else:
# Queue for later sync
self._queue_for_sync("task", data)
return False
def sync_tasks_batch(self, tasks: List[TaskEvent]) -> Dict[str, int]:
"""
Sync multiple tasks in a batch.
Returns counts of synced vs queued tasks.
"""
if not tasks:
return {"synced": 0, "queued": 0}
data = {
"tasks": [asdict(t) for t in tasks],
"tenant_id": self.config.tenant_id,
"team_id": self.config.team_id,
"project_id": self.config.project_id,
}
result = self._api_request("/api/v1/context/tasks/batch/", data)
if result:
return {"synced": len(tasks), "queued": 0}
else:
self._queue_for_sync("tasks_batch", data)
return {"synced": 0, "queued": len(tasks)}
def process_sync_queue(self) -> Dict[str, int]:
"""
Process queued events that failed to sync previously.
Returns counts of processed, succeeded, and failed events.
"""
if not self.sync_queue_db or not self.sync_queue_db.exists():
return {"processed": 0, "succeeded": 0, "failed": 0}
conn = sqlite3.connect(self.sync_queue_db)
conn.row_factory = sqlite3.Row
cursor = conn.execute(
"SELECT * FROM sync_queue WHERE retry_count < 5 ORDER BY created_at LIMIT 100"
)
results = {"processed": 0, "succeeded": 0, "failed": 0}
for row in cursor.fetchall():
results["processed"] += 1
try:
data = json.loads(row["data"])
endpoint = "/api/v1/context/tasks/"
if row["event_type"] == "tasks_batch":
endpoint = "/api/v1/context/tasks/batch/"
result = self._api_request(endpoint, data)
if result:
conn.execute("DELETE FROM sync_queue WHERE id = ?", (row["id"],))
results["succeeded"] += 1
else:
conn.execute(
"UPDATE sync_queue SET retry_count = retry_count + 1 WHERE id = ?",
(row["id"],)
)
results["failed"] += 1
except Exception as e:
conn.execute(
"UPDATE sync_queue SET retry_count = retry_count + 1, last_error = ? WHERE id = ?",
(str(e), row["id"])
)
results["failed"] += 1
conn.commit()
conn.close()
return results
def get_sync_status(self) -> Dict[str, Any]:
"""Get current sync status including queue depth and auth info."""
# Determine auth method
if self.config.access_token or self._access_token:
auth_method = "token"
elif self.config.email:
auth_method = "email"
else:
auth_method = "none"
status = {
"configured": auth_method != "none",
"auth_method": auth_method,
"authenticated": self._access_token is not None,
"offline_mode": self.config.offline_mode,
"api_url": self.config.api_url,
"tenant_id": self.config.tenant_id,
"queue_depth": 0,
}
if self.sync_queue_db and self.sync_queue_db.exists():
try:
conn = sqlite3.connect(self.sync_queue_db)
cursor = conn.execute("SELECT COUNT(*) FROM sync_queue WHERE retry_count < 5")
status["queue_depth"] = cursor.fetchone()[0]
conn.close()
except sqlite3.Error:
status["queue_depth"] = -1 # Indicate error
return status
def test_connection(self) -> Dict[str, Any]:
"""
Test connection and authentication to the cloud API.
Returns:
Dict with connection test results.
"""
result = {
"success": False,
"auth_method": "none",
"user": None,
"tenant": None,
"error": None,
}
# Determine auth method
if self.config.access_token:
result["auth_method"] = "token"
elif self.config.email:
result["auth_method"] = "email"
else:
result["error"] = "No authentication configured"
return result
# Try to authenticate
token = self._get_access_token()
if not token:
result["error"] = "Authentication failed"
return result
# Test with /api/v1/auth/me/ endpoint
try:
url = f"{self.config.api_url}/api/v1/auth/me/"
headers = {
"Authorization": f"Bearer {token}",
}
request = Request(url, headers=headers, method="GET")
with urlopen(request, timeout=10) as response:
data = json.loads(response.read().decode())
result["success"] = True
result["user"] = data.get("email") or data.get("id")
result["tenant"] = data.get("tenant_id") or self.config.tenant_id
except HTTPError as e:
error_body = e.read().decode() if hasattr(e, 'read') else str(e)
result["error"] = f"API error ({e.code}): {error_body}"
except URLError as e:
result["error"] = f"Connection error: {e}"
return result
def main():
    """CLI for cloud sync operations."""
    import argparse

    parser = argparse.ArgumentParser(description="CODITECT Cloud Sync Client")
    parser.add_argument("--status", action="store_true", help="Show sync status")
    parser.add_argument("--test", action="store_true", help="Test connection and authentication")
    parser.add_argument("--process-queue", action="store_true", help="Process sync queue")
    parser.add_argument("--sync-local", action="store_true", help="Sync local database to cloud")
    args = parser.parse_args()

    client = CloudSyncClient()

    if args.status:
        print(json.dumps(client.get_sync_status(), indent=2))
    elif args.test:
        print("Testing connection to CODITECT cloud...")
        result = client.test_connection()
        print(json.dumps(result, indent=2))
        if result["success"]:
            print(f"\n✓ Successfully authenticated as {result['user']}")
        else:
            print(f"\n✗ Connection failed: {result['error']}")
    elif args.process_queue:
        print(json.dumps(client.process_sync_queue(), indent=2))
    elif args.sync_local:
        # Sync unsynced tasks from the local database to the cloud.
        if not client.local_db or not client.local_db.exists():
            print("No local database found")
            return
        conn = sqlite3.connect(client.local_db)
        conn.row_factory = sqlite3.Row
        cursor = conn.execute("""
            SELECT * FROM task_tracking
            WHERE synced_to_cloud IS NULL OR synced_to_cloud = 0
            LIMIT 100
        """)
        tasks = []
        for row in cursor.fetchall():
            # BUG FIX: sqlite3.Row has no .get() method - the original
            # row.get(...) calls raised AttributeError on the first row.
            # Convert to a plain dict so optional columns read with defaults;
            # `or` also maps NULL column values to the default.
            record = dict(row)
            tasks.append(TaskEvent(
                task_id=record["task_id"],
                description=record.get("task_description") or "",
                status=record["status"],
                project_id=client.config.project_id or "unknown",
                outcome=record.get("outcome"),
                outcome_score=record.get("outcome_score"),
                tool_success_count=record.get("tool_success_count") or 0,
                tool_error_count=record.get("tool_error_count") or 0,
            ))
        conn.close()
        if tasks:
            print(json.dumps(client.sync_tasks_batch(tasks), indent=2))
        else:
            print("No tasks to sync")
# BUG FIX: the guard read `if name == "main"`, which raises NameError on
# import and never runs main(); the dunder names are required.
if __name__ == "__main__":
    main()