#!/usr/bin/env python3
"""CODITECT Ralph Wiggum Cloud Sync Bridge (H.8.5.4).

Bridges the in-memory/JSON Ralph Wiggum modules to the SQLite persistence
layer in sessions.db, enabling cloud sync via RalphWiggumSyncClient.

The Ralph Wiggum modules (checkpoint_protocol, health_monitoring,
token_economics, loop_orchestrator) store state in JSON files and in-memory
dicts. This bridge writes structured records into SQLite tables that the
sync client reads.

Integration surfaces:

  1. persist_checkpoint()   -> checkpoints table
  2. persist_health_event() -> health_events table
  3. persist_token_record() -> token_records table
  4. persist_budget()       -> budgets table
  5. persist_session()      -> agent_sessions table
  6. harvest_json_files()   -> bulk import from JSON storage

Author: CODITECT Framework
Task: H.8.5.4
Version: 1.0.0
Created: February 2026
"""

import json
import logging
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(name)

Default sessions.db location

_DEFAULT_DB = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "sessions.db"

class CloudSyncBridge:
    """Bridge between Ralph Wiggum modules and SQLite persistence for cloud sync.

    Usage:
        bridge = CloudSyncBridge()
        bridge.persist_checkpoint(checkpoint_data)
        bridge.persist_health_event(agent_id, "healthy", "degraded",
                                    state_reason="high context")
    """

    def __init__(self, db_path: Optional[Path] = None):
        """Initialize with optional database path.

        Args:
            db_path: Path to sessions.db. If None, uses default location
                or falls back to paths module discovery.
        """
        self._db_path = db_path or self._discover_db()
        self._ensure_tables()

    def _discover_db(self) -> Path:
        """Discover sessions.db location using paths module or defaults."""
        try:
            from scripts.core.paths import get_sessions_db_path
            return get_sessions_db_path()
        except ImportError:
            pass

        # NOTE(review): the original had two fall-through branches that both
        # returned _DEFAULT_DB; collapsed to one return (behavior unchanged).
        return _DEFAULT_DB

    def _ensure_tables(self) -> None:
        """Ensure Ralph Wiggum tables exist (idempotent)."""
        if not self._db_path.parent.exists():
            self._db_path.parent.mkdir(parents=True, exist_ok=True)

        # Apply the schema migration script if it is present relative to this
        # file; "table already exists" errors are intentionally swallowed so
        # repeated initialization stays idempotent.
        migration_sql = Path(__file__).parent.parent.parent / "migrations" / "add-ralph-wiggum-tables.sql"
        if migration_sql.exists():
            conn = sqlite3.connect(self._db_path)
            try:
                conn.executescript(migration_sql.read_text())
                conn.commit()
            except sqlite3.OperationalError:
                pass  # Tables already exist
            finally:
                conn.close()

    def _conn(self) -> sqlite3.Connection:
        """Get a database connection with Row access by column name."""
        conn = sqlite3.connect(self._db_path)
        conn.row_factory = sqlite3.Row
        return conn

    # =========================================================================
    # PERSIST METHODS
    # =========================================================================

    def persist_session(
        self,
        session_id: str,
        task_id: Optional[str] = None,
        agent_type: str = "implementation",
        loop_type: str = "ralph",
        status: str = "running",
        max_iterations: int = 100,
        config: Optional[Dict] = None,
    ) -> str:
        """Persist an agent session to SQLite.

        Returns the session ID.
        """
        conn = self._conn()
        try:
            # INSERT OR REPLACE makes re-persisting the same session id an
            # upsert; `synced = 0` marks the row dirty for the sync client.
            conn.execute(
                """INSERT OR REPLACE INTO agent_sessions
                   (id, task_id, agent_type, loop_type, started_at, status,
                    max_iterations, config, synced)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)""",
                (
                    session_id,
                    task_id,
                    agent_type,
                    loop_type,
                    datetime.now(timezone.utc).isoformat(),
                    status,
                    max_iterations,
                    json.dumps(config) if config else None,
                ),
            )
            conn.commit()
            return session_id
        finally:
            conn.close()

    def update_session(
        self,
        session_id: str,
        status: Optional[str] = None,
        total_iterations: Optional[int] = None,
        final_outcome: Optional[str] = None,
    ) -> bool:
        """Update an existing agent session.

        Returns True if a row was updated.
        """
        updates = []
        params = []

        if status is not None:
            updates.append("status = ?")
            params.append(status)
            # Terminal states also stamp completed_at.
            if status in ("completed", "failed", "terminated"):
                updates.append("completed_at = ?")
                params.append(datetime.now(timezone.utc).isoformat())

        if total_iterations is not None:
            updates.append("total_iterations = ?")
            params.append(total_iterations)

        if final_outcome is not None:
            updates.append("final_outcome = ?")
            params.append(final_outcome)

        if not updates:
            return False

        # Any update re-dirties the row for sync. `synced = 0` is a literal,
        # so only session_id is appended to params here.
        updates.append("synced = 0")
        params.append(session_id)

        conn = self._conn()
        try:
            cursor = conn.execute(
                f"UPDATE agent_sessions SET {', '.join(updates)} WHERE id = ?",
                params,
            )
            conn.commit()
            return cursor.rowcount > 0
        finally:
            conn.close()

    def persist_checkpoint(self, checkpoint_data: Dict[str, Any]) -> str:
        """Persist a checkpoint record to SQLite.

        Args:
            checkpoint_data: Dict with keys matching the checkpoints table columns.
                Required: task_id, agent_id, phase, hash
                Optional: id, session_id, iteration, completed_items, pending_items,
                    blocked_items, current_focus, context_summary, tokens_consumed,
                    tools_invoked, files_modified, tests_passed, tests_failed,
                    tests_skipped, coverage_percent, last_successful_state,
                    rollback_instructions, continuation_prompt, signature

        Returns the checkpoint ID.
        """
        # Generate an id when the caller did not supply one.
        cp_id = checkpoint_data.get("id") or str(uuid.uuid4())

        conn = self._conn()
        try:
            conn.execute(
                """INSERT OR REPLACE INTO checkpoints
                   (id, session_id, task_id, agent_id, agent_type, iteration,
                    phase, completed_items, pending_items, blocked_items,
                    current_focus, context_summary, tokens_consumed, tools_invoked,
                    files_modified, tests_passed, tests_failed, tests_skipped,
                    coverage_percent, last_successful_state, rollback_instructions,
                    continuation_prompt, hash, signature, created_at, synced)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,0)""",
                (
                    cp_id,
                    checkpoint_data.get("session_id"),
                    checkpoint_data.get("task_id", ""),
                    checkpoint_data.get("agent_id", ""),
                    checkpoint_data.get("agent_type", "implementation"),
                    checkpoint_data.get("iteration", 1),
                    checkpoint_data.get("phase", "planning"),
                    # List/dict-valued fields are stored as JSON text.
                    _json_or_none(checkpoint_data.get("completed_items")),
                    _json_or_none(checkpoint_data.get("pending_items")),
                    _json_or_none(checkpoint_data.get("blocked_items")),
                    checkpoint_data.get("current_focus", ""),
                    _json_or_none(checkpoint_data.get("context_summary")),
                    checkpoint_data.get("tokens_consumed", 0),
                    checkpoint_data.get("tools_invoked", 0),
                    _json_or_none(checkpoint_data.get("files_modified")),
                    checkpoint_data.get("tests_passed", 0),
                    checkpoint_data.get("tests_failed", 0),
                    checkpoint_data.get("tests_skipped", 0),
                    checkpoint_data.get("coverage_percent"),
                    checkpoint_data.get("last_successful_state"),
                    checkpoint_data.get("rollback_instructions"),
                    checkpoint_data.get("continuation_prompt"),
                    checkpoint_data.get("hash", ""),
                    checkpoint_data.get("signature"),
                    checkpoint_data.get("created_at", datetime.now(timezone.utc).isoformat()),
                ),
            )
            conn.commit()
            return cp_id
        finally:
            conn.close()

    def persist_health_event(
        self,
        agent_id: str,
        new_state: str,
        previous_state: Optional[str] = None,
        session_id: Optional[str] = None,
        task_id: Optional[str] = None,
        state_reason: Optional[str] = None,
        heartbeat_data: Optional[Dict] = None,
        intervention_level: Optional[str] = None,
        intervention_action: Optional[str] = None,
        intervention_result: Optional[str] = None,
        circuit_breaker_target: Optional[str] = None,
        circuit_breaker_state: Optional[str] = None,
        circuit_breaker_failures: Optional[int] = None,
    ) -> int:
        """Persist a health event to SQLite.

        Returns the auto-increment event ID.
        """
        conn = self._conn()
        try:
            cursor = conn.execute(
                """INSERT INTO health_events
                   (agent_id, session_id, task_id, timestamp,
                    previous_state, new_state, state_reason,
                    heartbeat_data, intervention_level, intervention_action,
                    intervention_result, circuit_breaker_target,
                    circuit_breaker_state, circuit_breaker_failures, synced)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,0)""",
                (
                    agent_id,
                    session_id,
                    task_id,
                    datetime.now(timezone.utc).isoformat(),
                    previous_state,
                    new_state,
                    state_reason,
                    json.dumps(heartbeat_data) if heartbeat_data else None,
                    intervention_level,
                    intervention_action,
                    intervention_result,
                    circuit_breaker_target,
                    circuit_breaker_state,
                    circuit_breaker_failures,
                ),
            )
            conn.commit()
            return cursor.lastrowid
        finally:
            conn.close()

    def persist_token_record(
        self,
        model: str,
        input_tokens: int = 0,
        output_tokens: int = 0,
        record_id: Optional[str] = None,
        session_id: Optional[str] = None,
        task_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        iteration: Optional[int] = None,
        checkpoint_id: Optional[str] = None,
        cache_read_tokens: int = 0,
        cache_write_tokens: int = 0,
        input_cost_usd: Optional[float] = None,
        output_cost_usd: Optional[float] = None,
        cache_cost_usd: Optional[float] = None,
        total_cost_usd: Optional[float] = None,
        tool_calls: Optional[List[str]] = None,
        latency_ms: Optional[int] = None,
        success: bool = True,
    ) -> str:
        """Persist a token consumption record to SQLite.

        Returns the record ID.
        """
        rec_id = record_id or str(uuid.uuid4())

        conn = self._conn()
        try:
            conn.execute(
                """INSERT OR REPLACE INTO token_records
                   (id, timestamp, session_id, task_id, agent_id,
                    iteration, checkpoint_id, model,
                    input_tokens, output_tokens,
                    cache_read_tokens, cache_write_tokens,
                    input_cost_usd, output_cost_usd,
                    cache_cost_usd, total_cost_usd,
                    tool_calls, latency_ms, success, synced)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,0)""",
                (
                    rec_id,
                    datetime.now(timezone.utc).isoformat(),
                    session_id,
                    task_id,
                    agent_id,
                    iteration,
                    checkpoint_id,
                    model,
                    input_tokens,
                    output_tokens,
                    cache_read_tokens,
                    cache_write_tokens,
                    input_cost_usd,
                    output_cost_usd,
                    cache_cost_usd,
                    total_cost_usd,
                    json.dumps(tool_calls) if tool_calls else None,
                    latency_ms,
                    # Stored as INTEGER 0/1 for SQLite portability.
                    1 if success else 0,
                ),
            )
            conn.commit()
            return rec_id
        finally:
            conn.close()

    def persist_budget(
        self,
        budget_id: str,
        scope: str,
        scope_id: str,
        limit_type: str,
        limit_value: float,
        current_value: float = 0.0,
        parent_budget_id: Optional[str] = None,
        period: Optional[str] = None,
        period_start: Optional[str] = None,
        action: str = "alert_only",
        alert_threshold_percent: int = 80,
        status: str = "active",
    ) -> str:
        """Persist a budget record to SQLite.

        Returns the budget ID.
        """
        conn = self._conn()
        try:
            conn.execute(
                """INSERT OR REPLACE INTO budgets
                   (id, scope, scope_id, parent_budget_id,
                    limit_type, limit_value, current_value,
                    period, period_start, action,
                    alert_threshold_percent, status, synced)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,0)""",
                (
                    budget_id,
                    scope,
                    scope_id,
                    parent_budget_id,
                    limit_type,
                    limit_value,
                    current_value,
                    period,
                    period_start,
                    action,
                    alert_threshold_percent,
                    status,
                ),
            )
            conn.commit()
            return budget_id
        finally:
            conn.close()

    # =========================================================================
    # HEALTH MONITORING CALLBACK
    # =========================================================================

    def health_state_change_callback(
        self,
        agent_id: str,
        old_state: str,
        new_state: str,
        health: Any,
    ) -> None:
        """Callback compatible with HealthMonitoringService.on_state_change().

        Register with:
            bridge = CloudSyncBridge()
            health_service.on_state_change(bridge.health_state_change_callback)
        """
        try:
            # `health` is an opaque object from the monitoring service;
            # task_id is read defensively since its shape is not guaranteed.
            task_id = getattr(health, "task_id", None)
            self.persist_health_event(
                agent_id=agent_id,
                new_state=new_state,
                previous_state=old_state,
                task_id=task_id,
                state_reason=f"State transition: {old_state} -> {new_state}",
            )
        except Exception as e:
            # Best-effort persistence: a callback must never crash the monitor.
            logger.warning(f"Failed to persist health event: {e}")

    # =========================================================================
    # JSON HARVESTER
    # =========================================================================

    def harvest_checkpoints(self, checkpoints_dir: Optional[Path] = None) -> int:
        """Harvest checkpoint JSON files into SQLite.

        Args:
            checkpoints_dir: Directory containing checkpoint JSON files.
                Defaults to ~/PROJECTS/.coditect-data/checkpoints/

        Returns count of records imported.
        """
        if checkpoints_dir is None:
            checkpoints_dir = Path.home() / "PROJECTS" / ".coditect-data" / "checkpoints"

        if not checkpoints_dir.exists():
            return 0

        count = 0
        conn = self._conn()

        try:
            # Layout: one subdirectory per task, containing *.json checkpoints.
            for task_dir in checkpoints_dir.iterdir():
                if not task_dir.is_dir():
                    continue
                for cp_file in task_dir.glob("*.json"):
                    try:
                        data = json.loads(cp_file.read_text())
                        # Skip files whose id was already imported.
                        cp_id = data.get("id") or data.get("checkpoint_id") or cp_file.stem
                        existing = conn.execute(
                            "SELECT id FROM checkpoints WHERE id = ?", (cp_id,)
                        ).fetchone()
                        if existing:
                            continue

                        # Backfill required columns from file layout / legacy keys.
                        data["id"] = cp_id
                        if "task_id" not in data:
                            data["task_id"] = task_dir.name
                        if "hash" not in data:
                            data["hash"] = ""
                        if "agent_id" not in data:
                            data["agent_id"] = data.get("agent", "unknown")
                        if "phase" not in data:
                            data["phase"] = data.get("execution_phase", "unknown")

                        self.persist_checkpoint(data)
                        count += 1
                    except (json.JSONDecodeError, OSError) as e:
                        logger.warning(f"Skipping {cp_file}: {e}")
        finally:
            conn.close()

        logger.info(f"Harvested {count} checkpoints from JSON files")
        return count

    def harvest_token_records(self, token_dir: Optional[Path] = None) -> int:
        """Harvest token record JSONL files into SQLite.

        Args:
            token_dir: Directory containing token record JSONL files.
                Defaults to ~/PROJECTS/.coditect-data/token-economics/records/

        Returns count of records imported.
        """
        if token_dir is None:
            token_dir = Path.home() / "PROJECTS" / ".coditect-data" / "token-economics" / "records"

        if not token_dir.exists():
            return 0

        count = 0
        conn = self._conn()

        try:
            for jsonl_file in token_dir.glob("*.jsonl"):
                for line in jsonl_file.read_text().splitlines():
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                        rec_id = data.get("id") or str(uuid.uuid4())

                        # Skip records whose id was already imported.
                        existing = conn.execute(
                            "SELECT id FROM token_records WHERE id = ?", (rec_id,)
                        ).fetchone()
                        if existing:
                            continue

                        self.persist_token_record(
                            record_id=rec_id,
                            model=data.get("model", "unknown"),
                            input_tokens=data.get("input_tokens", 0),
                            output_tokens=data.get("output_tokens", 0),
                            cache_read_tokens=data.get("cache_read_tokens", 0),
                            cache_write_tokens=data.get("cache_write_tokens", 0),
                            input_cost_usd=data.get("input_cost_usd"),
                            output_cost_usd=data.get("output_cost_usd"),
                            total_cost_usd=data.get("total_cost_usd"),
                            session_id=data.get("session_id"),
                            task_id=data.get("task_id"),
                            agent_id=data.get("agent_id"),
                            iteration=data.get("iteration"),
                            latency_ms=data.get("latency_ms"),
                            success=data.get("success", True),
                        )
                        count += 1
                    except (json.JSONDecodeError, KeyError) as e:
                        logger.warning(f"Skipping line in {jsonl_file}: {e}")
        finally:
            conn.close()

        logger.info(f"Harvested {count} token records from JSONL files")
        return count

    def harvest_all(self) -> Dict[str, int]:
        """Harvest all JSON-stored data into SQLite for sync.

        Returns dict of table: count imported.
        """
        results = {
            "checkpoints": self.harvest_checkpoints(),
            "token_records": self.harvest_token_records(),
        }
        total = sum(results.values())
        logger.info(f"Harvest complete: {total} total records imported")
        return results

    # =========================================================================
    # STATUS / QUERY
    # =========================================================================

    def get_unsynced_counts(self) -> Dict[str, int]:
        """Return count of unsynced records per table."""
        tables = ["agent_sessions", "checkpoints", "health_events", "token_records", "budgets"]
        counts = {}

        conn = self._conn()
        try:
            for table in tables:
                try:
                    cursor = conn.execute(
                        f"SELECT COUNT(*) FROM {table} WHERE synced = 0 OR synced IS NULL"
                    )
                    counts[table] = cursor.fetchone()[0]
                except sqlite3.OperationalError:
                    # Table may not exist yet (migration not applied).
                    counts[table] = 0
        finally:
            conn.close()

        return counts

    def get_status(self) -> Dict[str, Any]:
        """Return bridge status summary."""
        unsynced = self.get_unsynced_counts()
        return {
            "db_path": str(self._db_path),
            "db_exists": self._db_path.exists(),
            "unsynced": unsynced,
            "total_unsynced": sum(unsynced.values()),
        }

def _json_or_none(value: Any) -> Optional[str]: """Convert a value to JSON string, or None if already None/str.""" if value is None: return None if isinstance(value, str): return value return json.dumps(value)

# Public API of this module. The original spelled this `all = [...]`, which
# shadows the builtin `all()` and has no meaning to star-imports; the Python
# convention is the dunder `__all__`.
__all__ = [
    "CloudSyncBridge",
]