Skip to main content

#!/usr/bin/env python3 """ CODITECT Ralph Wiggum Cloud Sync Client (H.8.5.4)

Syncs local Ralph Wiggum autonomous agent tables to CODITECT cloud backend. Extends the base CloudSyncClient with support for:

  • Agent Sessions
  • Checkpoints (ADR-108)
  • Health Events (ADR-110)
  • Token Records (ADR-111)
  • Budgets (ADR-111)

Architecture (ADR-118 Four-Tier): Local SQLite (sessions.db - Tier 3) Cloud PostgreSQL ┌─────────────────────────┐ ┌─────────────────────────┐ │ agent_sessions │ sync │ AgentSession │ │ checkpoints │ ────────────► │ Checkpoint │ │ health_events │ │ HealthEvent │ │ token_records │ │ TokenRecord │ │ budgets │ │ Budget │ └─────────────────────────┘ └─────────────────────────┘

Usage: from scripts.core.ralph_wiggum_sync_client import RalphWiggumSyncClient

client = RalphWiggumSyncClient()
result = client.sync_all()
print(f"Synced {result['total_synced']} records")

# Or sync specific tables
client.sync_sessions()
client.sync_checkpoints()
client.sync_health_events()
client.sync_token_records()

CLI: python3 scripts/core/ralph_wiggum_sync_client.py --status python3 scripts/core/ralph_wiggum_sync_client.py --sync python3 scripts/core/ralph_wiggum_sync_client.py --sync-checkpoints python3 scripts/core/ralph_wiggum_sync_client.py --sync-health

Author: CODITECT Framework Version: 1.0.0 Created: January 27, 2026 Task Reference: H.8.5.4 ADR References: ADR-108, ADR-110, ADR-111, ADR-112 """

import argparse import json import os import sqlite3 import sys from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional

Import base cloud sync client

sys.path.insert(0, str(Path(file).parent)) try: from cloud_sync_client import CloudSyncClient, CloudConfig except ImportError: # Fallback if run directly pass

=============================================================================

DATA MODELS

=============================================================================

@dataclass class AgentSessionEvent: """Agent session data for cloud sync.""" id: str task_id: Optional[str] = None agent_type: str = "implementation" loop_type: str = "ralph" started_at: str = "" completed_at: Optional[str] = None status: str = "running" total_iterations: int = 0 max_iterations: int = 100 config: Optional[str] = None # JSON string final_outcome: Optional[str] = None

def to_dict(self) -> Dict[str, Any]:
return asdict(self)

@dataclass class CheckpointEvent: """Checkpoint data for cloud sync.""" id: str session_id: Optional[str] = None task_id: str = "" agent_id: str = "" agent_type: str = "implementation" iteration: int = 1 phase: str = "planning" completed_items: Optional[str] = None # JSON array pending_items: Optional[str] = None # JSON array blocked_items: Optional[str] = None # JSON array current_focus: str = "" context_summary: Optional[str] = None # JSON object tokens_consumed: int = 0 tools_invoked: int = 0 files_modified: Optional[str] = None # JSON array tests_passed: int = 0 tests_failed: int = 0 tests_skipped: int = 0 coverage_percent: Optional[float] = None last_successful_state: Optional[str] = None rollback_instructions: Optional[str] = None continuation_prompt: Optional[str] = None hash: str = "" signature: Optional[str] = None created_at: str = ""

def to_dict(self) -> Dict[str, Any]:
return asdict(self)

@dataclass class HealthEventData: """Health event data for cloud sync.""" id: int # Auto-increment ID from SQLite agent_id: str session_id: Optional[str] = None task_id: Optional[str] = None timestamp: str = "" previous_state: Optional[str] = None new_state: str = "healthy" state_reason: Optional[str] = None heartbeat_data: Optional[str] = None # JSON object intervention_level: Optional[str] = None intervention_action: Optional[str] = None intervention_result: Optional[str] = None circuit_breaker_target: Optional[str] = None circuit_breaker_state: Optional[str] = None circuit_breaker_failures: Optional[int] = None

def to_dict(self) -> Dict[str, Any]:
return asdict(self)

@dataclass class TokenRecordEvent: """Token record data for cloud sync.""" id: str timestamp: str = "" session_id: Optional[str] = None task_id: Optional[str] = None agent_id: Optional[str] = None iteration: Optional[int] = None checkpoint_id: Optional[str] = None model: str = "" input_tokens: int = 0 output_tokens: int = 0 cache_read_tokens: int = 0 cache_write_tokens: int = 0 input_cost_usd: Optional[float] = None output_cost_usd: Optional[float] = None cache_cost_usd: Optional[float] = None total_cost_usd: Optional[float] = None tool_calls: Optional[str] = None # JSON array latency_ms: Optional[int] = None success: int = 1

def to_dict(self) -> Dict[str, Any]:
return asdict(self)

@dataclass class BudgetEvent: """Budget data for cloud sync.""" id: str scope: str scope_id: str parent_budget_id: Optional[str] = None limit_type: str = "tokens" limit_value: float = 0.0 current_value: float = 0.0 period: Optional[str] = None period_start: Optional[str] = None action: str = "alert_only" alert_threshold_percent: int = 80 status: str = "active" created_at: str = "" updated_at: str = ""

def to_dict(self) -> Dict[str, Any]:
return asdict(self)

=============================================================================

RALPH WIGGUM SYNC CLIENT

=============================================================================

class RalphWiggumSyncClient: """ Client for syncing Ralph Wiggum autonomous agent data to cloud.

Handles:
- Agent session lifecycle sync
- Checkpoint persistence for handoffs
- Health event tracking
- Token consumption records
- Budget enforcement data
"""

# API endpoints for Ralph Wiggum tables
ENDPOINTS = {
"sessions": "/api/v1/context/ralph/sessions/",
"sessions_batch": "/api/v1/context/ralph/sessions/batch/",
"checkpoints": "/api/v1/context/ralph/checkpoints/",
"checkpoints_batch": "/api/v1/context/ralph/checkpoints/batch/",
"health_events": "/api/v1/context/ralph/health-events/",
"health_events_batch": "/api/v1/context/ralph/health-events/batch/",
"token_records": "/api/v1/context/ralph/token-records/",
"token_records_batch": "/api/v1/context/ralph/token-records/batch/",
"budgets": "/api/v1/context/ralph/budgets/",
"budgets_batch": "/api/v1/context/ralph/budgets/batch/",
}

# Batch size for syncing
BATCH_SIZE = 100

def __init__(self, config: Optional[CloudConfig] = None):
"""Initialize with optional cloud config."""
# Import CloudSyncClient at runtime to avoid circular imports
try:
from cloud_sync_client import CloudSyncClient, CloudConfig as CC
self._base_client = CloudSyncClient(config)
except ImportError:
self._base_client = None
print("Warning: CloudSyncClient not available", file=sys.stderr)

self._find_local_db()

def _find_local_db(self) -> None:
"""Find local sessions.db with Ralph Wiggum tables (ADR-118 Tier 3)."""
# Try paths module first (preferred)
try:
from scripts.core.paths import get_sessions_db_path
self.local_db = get_sessions_db_path()
if self.local_db.parent.exists():
return
except ImportError:
pass

# Check user data location (ADR-114)
user_data = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
if user_data.exists():
self.local_db = user_data / "sessions.db"
return

# Fallback to legacy location
legacy = Path.home() / ".coditect" / "context-storage"
if legacy.exists():
self.local_db = legacy / "sessions.db"
return

# Check environment variable
if os.environ.get("CODITECT_DATA_ROOT"):
self.local_db = Path(os.environ["CODITECT_DATA_ROOT"]) / "context-storage" / "sessions.db"
return

self.local_db = None

def _get_unsynced_records(
self,
table: str,
limit: int = 100,
) -> List[Dict[str, Any]]:
"""Get unsynced records from a table."""
if not self.local_db or not self.local_db.exists():
return []

conn = sqlite3.connect(self.local_db)
conn.row_factory = sqlite3.Row

try:
cursor = conn.execute(f"""
SELECT * FROM {table}
WHERE synced = 0 OR synced IS NULL
ORDER BY rowid
LIMIT ?
""", (limit,))
return [dict(row) for row in cursor.fetchall()]
except sqlite3.OperationalError as e:
print(f"Table {table} not found or error: {e}", file=sys.stderr)
return []
finally:
conn.close()

def _mark_synced(
self,
table: str,
record_ids: List[str],
id_column: str = "id",
) -> int:
"""Mark records as synced in local database."""
if not self.local_db or not self.local_db.exists() or not record_ids:
return 0

conn = sqlite3.connect(self.local_db)
try:
placeholders = ",".join("?" * len(record_ids))
cursor = conn.execute(f"""
UPDATE {table}
SET synced = 1
WHERE {id_column} IN ({placeholders})
""", record_ids)
conn.commit()
return cursor.rowcount
finally:
conn.close()

def _queue_for_sync(self, event_type: str, data: Dict[str, Any]) -> None:
"""Queue failed sync for retry."""
if self._base_client:
self._base_client._queue_for_sync(event_type, data)

# =========================================================================
# SYNC METHODS
# =========================================================================

def sync_sessions(self) -> Dict[str, int]:
"""Sync agent sessions to cloud."""
records = self._get_unsynced_records("agent_sessions", self.BATCH_SIZE)

if not records:
return {"synced": 0, "queued": 0}

sessions = [AgentSessionEvent(**{k: v for k, v in r.items() if k != "synced"})
for r in records]

data = {
"sessions": [s.to_dict() for s in sessions],
}

if self._base_client:
result = self._base_client._api_request(
self.ENDPOINTS["sessions_batch"],
data
)

if result:
ids = [s.id for s in sessions]
self._mark_synced("agent_sessions", ids)
return {"synced": len(sessions), "queued": 0}

# Queue for later
self._queue_for_sync("ralph_sessions", data)
return {"synced": 0, "queued": len(sessions)}

def sync_checkpoints(self) -> Dict[str, int]:
"""Sync checkpoints to cloud."""
records = self._get_unsynced_records("checkpoints", self.BATCH_SIZE)

if not records:
return {"synced": 0, "queued": 0}

checkpoints = [CheckpointEvent(**{k: v for k, v in r.items() if k != "synced"})
for r in records]

data = {
"checkpoints": [c.to_dict() for c in checkpoints],
}

if self._base_client:
result = self._base_client._api_request(
self.ENDPOINTS["checkpoints_batch"],
data
)

if result:
ids = [c.id for c in checkpoints]
self._mark_synced("checkpoints", ids)
return {"synced": len(checkpoints), "queued": 0}

self._queue_for_sync("ralph_checkpoints", data)
return {"synced": 0, "queued": len(checkpoints)}

def sync_health_events(self) -> Dict[str, int]:
"""Sync health events to cloud."""
records = self._get_unsynced_records("health_events", self.BATCH_SIZE)

if not records:
return {"synced": 0, "queued": 0}

events = [HealthEventData(**{k: v for k, v in r.items() if k != "synced"})
for r in records]

data = {
"health_events": [e.to_dict() for e in events],
}

if self._base_client:
result = self._base_client._api_request(
self.ENDPOINTS["health_events_batch"],
data
)

if result:
ids = [str(e.id) for e in events]
self._mark_synced("health_events", ids)
return {"synced": len(events), "queued": 0}

self._queue_for_sync("ralph_health_events", data)
return {"synced": 0, "queued": len(events)}

def sync_token_records(self) -> Dict[str, int]:
"""Sync token records to cloud."""
records = self._get_unsynced_records("token_records", self.BATCH_SIZE)

if not records:
return {"synced": 0, "queued": 0}

token_records = [TokenRecordEvent(**{k: v for k, v in r.items()
if k not in ("synced", "total_tokens")})
for r in records]

data = {
"token_records": [t.to_dict() for t in token_records],
}

if self._base_client:
result = self._base_client._api_request(
self.ENDPOINTS["token_records_batch"],
data
)

if result:
ids = [t.id for t in token_records]
self._mark_synced("token_records", ids)
return {"synced": len(token_records), "queued": 0}

self._queue_for_sync("ralph_token_records", data)
return {"synced": 0, "queued": len(token_records)}

def sync_budgets(self) -> Dict[str, int]:
"""Sync budgets to cloud."""
records = self._get_unsynced_records("budgets", self.BATCH_SIZE)

if not records:
return {"synced": 0, "queued": 0}

budgets = [BudgetEvent(**{k: v for k, v in r.items() if k != "synced"})
for r in records]

data = {
"budgets": [b.to_dict() for b in budgets],
}

if self._base_client:
result = self._base_client._api_request(
self.ENDPOINTS["budgets_batch"],
data
)

if result:
ids = [b.id for b in budgets]
self._mark_synced("budgets", ids)
return {"synced": len(budgets), "queued": 0}

self._queue_for_sync("ralph_budgets", data)
return {"synced": 0, "queued": len(budgets)}

def sync_all(self) -> Dict[str, Any]:
"""
Sync all Ralph Wiggum tables to cloud.

Returns summary of sync results.
"""
results = {
"sessions": self.sync_sessions(),
"checkpoints": self.sync_checkpoints(),
"health_events": self.sync_health_events(),
"token_records": self.sync_token_records(),
"budgets": self.sync_budgets(),
}

# Calculate totals
total_synced = sum(r["synced"] for r in results.values())
total_queued = sum(r["queued"] for r in results.values())

results["total_synced"] = total_synced
results["total_queued"] = total_queued
results["timestamp"] = datetime.now(timezone.utc).isoformat()

return results

def get_sync_status(self) -> Dict[str, Any]:
"""Get current sync status for all Ralph Wiggum tables."""
status = {
"database_found": self.local_db is not None and self.local_db.exists(),
"database_path": str(self.local_db) if self.local_db else None,
"tables": {},
}

if not self.local_db or not self.local_db.exists():
return status

conn = sqlite3.connect(self.local_db)

tables = ["agent_sessions", "checkpoints", "health_events", "token_records", "budgets"]

for table in tables:
try:
# Count total records
cursor = conn.execute(f"SELECT COUNT(*) FROM {table}")
total = cursor.fetchone()[0]

# Count unsynced records
cursor = conn.execute(f"""
SELECT COUNT(*) FROM {table}
WHERE synced = 0 OR synced IS NULL
""")
unsynced = cursor.fetchone()[0]

status["tables"][table] = {
"total": total,
"unsynced": unsynced,
"synced": total - unsynced,
}
except sqlite3.OperationalError:
status["tables"][table] = {"error": "table not found"}

conn.close()

# Add cloud sync status from base client
if self._base_client:
cloud_status = self._base_client.get_sync_status()
status["cloud"] = cloud_status

return status

=============================================================================

CLI

=============================================================================

def main(): """CLI entry point for Ralph Wiggum sync operations.""" parser = argparse.ArgumentParser( description="CODITECT Ralph Wiggum Cloud Sync Client (H.8.5.4)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples:

Show sync status

%(prog)s --status

Sync all Ralph Wiggum tables

%(prog)s --sync

Sync specific tables

%(prog)s --sync-sessions %(prog)s --sync-checkpoints %(prog)s --sync-health %(prog)s --sync-tokens %(prog)s --sync-budgets """ )

parser.add_argument("--status", action="store_true",
help="Show sync status for all tables")
parser.add_argument("--sync", action="store_true",
help="Sync all Ralph Wiggum tables")
parser.add_argument("--sync-sessions", action="store_true",
help="Sync agent sessions only")
parser.add_argument("--sync-checkpoints", action="store_true",
help="Sync checkpoints only")
parser.add_argument("--sync-health", action="store_true",
help="Sync health events only")
parser.add_argument("--sync-tokens", action="store_true",
help="Sync token records only")
parser.add_argument("--sync-budgets", action="store_true",
help="Sync budgets only")
parser.add_argument("--json", action="store_true",
help="Output in JSON format")

args = parser.parse_args()

client = RalphWiggumSyncClient()

if args.status:
status = client.get_sync_status()
if args.json:
print(json.dumps(status, indent=2))
else:
print("Ralph Wiggum Sync Status")
print("=" * 50)
print(f"Database: {status.get('database_path', 'Not found')}")
print(f"Found: {status.get('database_found', False)}")
print()
print("Tables:")
for table, info in status.get("tables", {}).items():
if "error" in info:
print(f" {table}: {info['error']}")
else:
print(f" {table}:")
print(f" Total: {info['total']}")
print(f" Synced: {info['synced']}")
print(f" Unsynced: {info['unsynced']}")
return

if args.sync:
result = client.sync_all()
if args.json:
print(json.dumps(result, indent=2))
else:
print("Ralph Wiggum Sync Results")
print("=" * 50)
for table in ["sessions", "checkpoints", "health_events", "token_records", "budgets"]:
r = result.get(table, {})
print(f"{table}: {r.get('synced', 0)} synced, {r.get('queued', 0)} queued")
print("-" * 50)
print(f"Total: {result['total_synced']} synced, {result['total_queued']} queued")
return

if args.sync_sessions:
result = client.sync_sessions()
print(json.dumps(result, indent=2) if args.json else
f"Sessions: {result['synced']} synced, {result['queued']} queued")
return

if args.sync_checkpoints:
result = client.sync_checkpoints()
print(json.dumps(result, indent=2) if args.json else
f"Checkpoints: {result['synced']} synced, {result['queued']} queued")
return

if args.sync_health:
result = client.sync_health_events()
print(json.dumps(result, indent=2) if args.json else
f"Health Events: {result['synced']} synced, {result['queued']} queued")
return

if args.sync_tokens:
result = client.sync_token_records()
print(json.dumps(result, indent=2) if args.json else
f"Token Records: {result['synced']} synced, {result['queued']} queued")
return

if args.sync_budgets:
result = client.sync_budgets()
print(json.dumps(result, indent=2) if args.json else
f"Budgets: {result['synced']} synced, {result['queued']} queued")
return

# No action specified - show help
parser.print_help()

if name == "main": main()