#!/usr/bin/env python3
"""
CODITECT Ralph Loop Orchestrator (H.8.6.1)

Autonomous agent loop orchestrator that combines checkpoint, health monitoring,
and token economics services for self-sustaining development iterations.

Key Insight from Ralph Wiggum: "Single-context loops degrade; fresh-context
iterations maintain quality."

The loop orchestrator coordinates:
- Task planning and decomposition
- Agent spawning with fresh context per iteration
- Health monitoring with graduated intervention
- Checkpoint creation at phase boundaries
- Token budget enforcement across iterations
- Handoff protocol between iterations
- Termination criteria evaluation

Usage:
    from scripts.core.ralph_wiggum.loop_orchestrator import (
        LoopConfig,
        LoopOrchestrator,
    )

    orchestrator = LoopOrchestrator(task_id="H.8.6.1", project_id="PILOT")
    status = orchestrator.initialize(
        goal="Implement feature X with tests",
        config=LoopConfig(agent_type="senior-architect", max_iterations=10),
    )
    # The driver then calls plan_iteration(), record_iteration() and
    # evaluate_termination() for each iteration.

Author: CODITECT Framework
Version: 1.0.0
Created: 2026-02-16
Task Reference: H.8.6.1
ADR References: ADR-108, ADR-110, ADR-111, ADR-112
"""
import json import logging import os import time import uuid from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from enum import Enum from pathlib import Path from typing import Any, Callable, Dict, List, Optional
from .checkpoint_protocol import ( Checkpoint, CheckpointMetadata, CheckpointService, ContextSummary, ExecutionPhase, ExecutionState, HandoffProtocol, HandoffTrigger, ) from .health_monitoring import ( AgentHealth, CircuitBreaker, CircuitBreakerState, HealthMonitoringService, HealthState, HeartbeatPayload, InterventionLevel, ) from .token_economics import ( BudgetAction, BudgetContext, TokenEconomicsService, TokenRecord, ) from .termination_criteria import ( TerminationCriteria, TerminationReason, TerminationResult, )
# Configure logging
# NOTE: this runs at import time: the log directory is created and the root
# logger is pointed at the orchestrator log file.
LOG_DIR = Path.home().joinpath("PROJECTS", ".coditect-data", "logs")
LOG_DIR.mkdir(parents=True, exist_ok=True)
LOG_FILE = LOG_DIR.joinpath("ralph-loop-orchestrator.log")
logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("ralph-loop-orchestrator")

# State storage
# Each loop keeps its files in a sub-directory named after its loop ID.
STATE_DIR = Path.home().joinpath("PROJECTS", ".coditect-data", "ralph-loops")
# =============================================================================
# DATA CLASSES
# =============================================================================
class LoopState(Enum):
    """Lifecycle states a loop can be in; each value is the persisted string."""

    # Set by a fresh initialization.
    INITIALIZING = "initializing"
    # Set when a loop is resumed from a checkpoint.
    RUNNING = "running"
    PAUSED = "paused"
    HANDOFF = "handoff"
    COMPLETING = "completing"
    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
    TERMINATED = "terminated"
@dataclass
class IterationResult:
    """Outcome record for one pass through the loop.

    Only ``iteration`` and ``agent_id`` are required; every other field has a
    default so a partial result can still be recorded.
    """

    # Identity of the iteration and the agent that ran it.
    iteration: int
    agent_id: str

    # Timestamps as strings (empty when not supplied).
    started_at: str = ""
    completed_at: str = ""

    # Work tracking.
    phase: str = ExecutionPhase.IMPLEMENTING.value
    completed_items: List[str] = field(default_factory=list)
    pending_items: List[str] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)
    checkpoint_id: str = ""

    # Token accounting for this iteration.
    input_tokens: int = 0
    output_tokens: int = 0
    cost: float = 0.0

    # Handoff / health outcome.
    handoff_trigger: str = ""
    health_state: str = HealthState.HEALTHY.value

    def to_dict(self) -> Dict[str, Any]:
        """Return this result as a plain dictionary (``dataclasses.asdict``)."""
        as_mapping = asdict(self)
        return as_mapping
@dataclass
class LoopConfig:
    """Tunable limits and defaults for one autonomous loop."""

    # Hard limits on how long the loop may run.
    max_iterations: int = 10
    max_cost: float = 50.0
    max_duration_minutes: int = 120

    # Monitoring / checkpoint cadence.
    context_threshold: float = 0.70
    health_check_interval: int = 60
    checkpoint_on_phase_change: bool = True
    checkpoint_every_n_iterations: int = 1
    auto_handoff: bool = True

    # Which model and agent each iteration uses.
    model: str = "claude-opus-4-6"
    agent_type: str = "senior-architect"

    # Failure handling.
    retry_on_failure: bool = True
    max_consecutive_errors: int = 3

    def to_dict(self) -> Dict[str, Any]:
        """Return the configuration as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "LoopConfig":
        """Build a config from ``data``, ignoring keys that are not fields.

        Fields absent from ``data`` keep their declared defaults.
        """
        recognised = cls.__dataclass_fields__
        kwargs: Dict[str, Any] = {}
        for key, value in data.items():
            if key in recognised:
                kwargs[key] = value
        return cls(**kwargs)
@dataclass
class LoopStatus:
    """Persisted snapshot of a loop: identity, progress, totals and outcome."""

    # Identity (required).
    loop_id: str
    task_id: str
    project_id: str

    # Lifecycle and goal.
    state: str = LoopState.INITIALIZING.value
    goal: str = ""
    # Stored as a plain dict (LoopConfig.to_dict()) so the status is JSON-friendly.
    config: Dict[str, Any] = field(default_factory=dict)

    # Progress; each entry of ``iterations`` is an IterationResult.to_dict().
    current_iteration: int = 0
    iterations: List[Dict[str, Any]] = field(default_factory=list)

    # Running totals.
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cost: float = 0.0

    # Timing and outcome.
    started_at: str = ""
    completed_at: str = ""
    termination_reason: str = ""
    last_checkpoint_id: str = ""
    errors: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the status as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "LoopStatus":
        """Build a status from ``data``, ignoring keys that are not fields.

        ``loop_id``, ``task_id`` and ``project_id`` have no defaults, so a
        ``TypeError`` is raised if any of them is missing from ``data``.
        """
        recognised = cls.__dataclass_fields__
        kwargs: Dict[str, Any] = {}
        for key, value in data.items():
            if key in recognised:
                kwargs[key] = value
        return cls(**kwargs)
# =============================================================================
# LOOP ORCHESTRATOR
# =============================================================================
class LoopOrchestrator:
    """
    Orchestrates autonomous agent execution loops.

    Combines CheckpointService (ADR-108), HealthMonitoringService (ADR-110),
    and TokenEconomicsService (ADR-111) into a unified loop controller.

    Loop Lifecycle:
        1. Initialize → Load goal, config, previous checkpoints
        2. Plan → Decompose goal into iteration tasks
        3. Execute → Run iterations with fresh context
        4. Monitor → Health checks, budget enforcement
        5. Checkpoint → Save state at boundaries
        6. Handoff → Generate continuation prompt for next iteration
        7. Evaluate → Check termination criteria
        8. Complete → Save final checkpoint, generate report

    Persistence:
        Loop status is stored as ``status.json`` (and the final report as
        ``report.json``) under ``STATE_DIR / <loop_id>``. The ``status``
        property lazily loads that file the first time it is read.
    """

    def __init__(
        self,
        task_id: str,
        project_id: str = "",
        loop_id: str = "",
    ):
        """
        Create an orchestrator bound to one task and one loop.

        Args:
            task_id: Task identifier used for checkpoints and token records.
            project_id: Project identifier; when empty, falls back to the
                ``CODITECT_PROJECT`` environment variable (or "").
            loop_id: Existing loop ID to attach to; a new UUID4 string is
                generated when empty.
        """
        self.task_id = task_id
        self.project_id = project_id or os.environ.get("CODITECT_PROJECT", "")
        self.loop_id = loop_id or str(uuid.uuid4())

        # Initialize services
        self.checkpoint_service = CheckpointService()
        self.token_service = TokenEconomicsService()
        self.termination = TerminationCriteria()

        # Circuit breaker for the loop itself
        self.circuit_breaker = CircuitBreaker(
            name=f"loop-{self.loop_id}",
            failure_threshold=3,
            recovery_timeout_seconds=60.0,
        )

        # State
        self._status: Optional[LoopStatus] = None
        self._iterations: List[IterationResult] = []

        # Storage
        self._state_dir = STATE_DIR / self.loop_id
        self._state_dir.mkdir(parents=True, exist_ok=True)

        # Log the resolved project (which may come from the environment),
        # not the raw argument.
        logger.info(
            f"LoopOrchestrator initialized: loop={self.loop_id}, "
            f"task={task_id}, project={self.project_id}"
        )

    @property
    def status(self) -> LoopStatus:
        """Get current loop status, loading it from disk on first access."""
        if self._status is None:
            self._status = self._load_status()
        return self._status

    def initialize(
        self,
        goal: str,
        config: Optional[LoopConfig] = None,
        resume_from_checkpoint: str = "",
    ) -> LoopStatus:
        """
        Initialize or resume an autonomous loop.

        When ``resume_from_checkpoint`` names a checkpoint that the checkpoint
        service can find, the persisted status is reloaded and marked RUNNING.
        Persisted ``goal``/``config``/``started_at`` win over the arguments;
        the arguments only fill values that are empty in the persisted status.
        If the checkpoint cannot be found, a warning is logged and a fresh loop
        is started (replacing any persisted status for this loop ID).

        Args:
            goal: High-level goal for the loop
            config: Loop configuration (defaults to ``LoopConfig()``)
            resume_from_checkpoint: Checkpoint ID to resume from

        Returns:
            Initial LoopStatus
        """
        config = config or LoopConfig()
        now = datetime.now(timezone.utc).isoformat()

        if resume_from_checkpoint:
            # Resume from existing checkpoint
            checkpoint = self.checkpoint_service.get_checkpoint(
                self.task_id, resume_from_checkpoint
            )
            if checkpoint:
                logger.info(f"Resuming from checkpoint {resume_from_checkpoint}")
                resumed = self._load_status()
                # If no usable status was persisted, the loaded status is
                # blank; backfill it so planning and termination checks have
                # a goal, a config and a start time to work with.
                if not resumed.goal:
                    resumed.goal = goal
                if not resumed.config:
                    resumed.config = config.to_dict()
                if not resumed.started_at:
                    resumed.started_at = now
                resumed.state = LoopState.RUNNING.value
                resumed.last_checkpoint_id = resume_from_checkpoint
                self._status = resumed
                self._save_status()
                return self._status
            logger.warning(
                f"Checkpoint {resume_from_checkpoint} not found for task "
                f"{self.task_id}; starting a fresh loop"
            )

        # Fresh initialization
        self._status = LoopStatus(
            loop_id=self.loop_id,
            task_id=self.task_id,
            project_id=self.project_id,
            state=LoopState.INITIALIZING.value,
            goal=goal,
            config=config.to_dict(),
            started_at=now,
        )
        self._save_status()

        logger.info(f"Loop initialized: goal='{goal[:100]}', config={config.to_dict()}")
        return self._status

    def plan_iteration(self, iteration: int) -> Dict[str, Any]:
        """
        Plan the next iteration based on current state.

        Args:
            iteration: 1-based number of the iteration being planned.

        Returns a plan dict with:
            - iteration: The iteration number
            - agent_type: Which agent to spawn
            - model: Which model to use
            - prompt: The continuation prompt
            - phase: Expected execution phase
            - checkpoint_id: Latest checkpoint ID for the task ("" if none)
            - budget_remaining: Unspent cost budget
        """
        status = self.status
        config = LoopConfig.from_dict(status.config)

        # Get latest checkpoint for context
        latest_checkpoint = self.checkpoint_service.get_latest_checkpoint(self.task_id)

        if latest_checkpoint and iteration > 1:
            # Generate continuation prompt from checkpoint
            continuation = HandoffProtocol.generate_continuation_prompt(latest_checkpoint)
            phase = latest_checkpoint.execution_state.phase
        else:
            # First iteration — use goal directly
            continuation = (
                f"## Task Goal\n\n{status.goal}\n\n## Instructions\n\n"
                f"This is iteration {iteration}. Execute the goal above."
            )
            phase = ExecutionPhase.PLANNING.value

        plan = {
            "iteration": iteration,
            "agent_type": config.agent_type,
            "model": config.model,
            "prompt": continuation,
            "phase": phase,
            "checkpoint_id": latest_checkpoint.metadata.checkpoint_id if latest_checkpoint else "",
            "budget_remaining": self._get_budget_remaining(config),
        }

        logger.info(f"Iteration {iteration} planned: phase={phase}, agent={config.agent_type}")
        return plan

    def record_iteration(self, result: IterationResult) -> None:
        """
        Record the result of a completed iteration.

        Updates the running totals, appends the iteration to the status,
        persists the status, then reports the token usage to the token
        service.

        Args:
            result: Outcome of the iteration that just finished.
        """
        status = self.status
        status.current_iteration = result.iteration
        status.total_input_tokens += result.input_tokens
        status.total_output_tokens += result.output_tokens
        status.total_cost += result.cost
        status.iterations.append(result.to_dict())
        # An iteration without a checkpoint must not erase the pointer to the
        # last checkpoint that does exist.
        if result.checkpoint_id:
            status.last_checkpoint_id = result.checkpoint_id

        if result.errors:
            status.errors.extend(result.errors)

        self._iterations.append(result)
        self._save_status()

        # Record token consumption
        self.token_service.record_consumption(
            task_id=self.task_id,
            model=LoopConfig.from_dict(status.config).model,
            input_tokens=result.input_tokens,
            output_tokens=result.output_tokens,
        )

        logger.info(
            f"Iteration {result.iteration} recorded: "
            f"items={len(result.completed_items)}, errors={len(result.errors)}, "
            f"cost=${result.cost:.2f}, health={result.health_state}"
        )

    def create_iteration_checkpoint(
        self,
        iteration: int,
        execution_state: ExecutionState,
        context_summary: Optional[ContextSummary] = None,
    ) -> str:
        """
        Create a checkpoint at the end of an iteration.

        Args:
            iteration: Iteration number, embedded in the checkpoint's agent ID.
            execution_state: Execution state to store in the checkpoint.
            context_summary: Optional summary; an empty ``ContextSummary()``
                is used when omitted.

        Returns the checkpoint ID.
        """
        agent_id = f"ralph-loop-{self.loop_id}-iter-{iteration}"

        checkpoint = self.checkpoint_service.create_checkpoint(
            task_id=self.task_id,
            agent_id=agent_id,
            execution_state=execution_state,
            context_summary=context_summary or ContextSummary(),
        )

        checkpoint_id = checkpoint.metadata.checkpoint_id
        self.status.last_checkpoint_id = checkpoint_id
        self._save_status()

        logger.info(f"Checkpoint created: {checkpoint_id[:8]} for iteration {iteration}")
        return checkpoint_id

    def evaluate_termination(self) -> TerminationResult:
        """
        Evaluate whether the loop should terminate.

        Passes the current counters and limits to the termination criteria,
        which covers:
            - Goal achieved
            - Budget exhausted
            - Max iterations reached
            - Max duration exceeded
            - Health failure
            - Consecutive errors
        """
        status = self.status
        config = LoopConfig.from_dict(status.config)

        return self.termination.evaluate(
            current_iteration=status.current_iteration,
            max_iterations=config.max_iterations,
            total_cost=status.total_cost,
            max_cost=config.max_cost,
            started_at=status.started_at,
            max_duration_minutes=config.max_duration_minutes,
            consecutive_errors=self._count_consecutive_errors(),
            max_consecutive_errors=config.max_consecutive_errors,
            last_health_state=self._get_last_health_state(),
            completed_items=self._get_all_completed_items(),
            goal=status.goal,
        )

    def check_handoff(
        self,
        context_utilization: float = 0.0,
        consecutive_errors: int = 0,
        phase_complete: bool = False,
    ) -> Optional[HandoffTrigger]:
        """
        Check if a handoff should be triggered.

        Always returns None when ``auto_handoff`` is disabled in the config.

        Returns the trigger type if handoff needed, None otherwise.
        """
        status = self.status
        config = LoopConfig.from_dict(status.config)

        if not config.auto_handoff:
            return None

        # Calculate token budget utilization; a non-positive budget counts as
        # "nothing used" so we never divide by zero.
        budget_used = status.total_cost / config.max_cost if config.max_cost > 0 else 0.0

        return HandoffProtocol.should_handoff(
            context_utilization=context_utilization,
            consecutive_errors=consecutive_errors,
            token_budget_used=budget_used,
            phase_complete=phase_complete,
        )

    def complete(self, reason: str = "goal_achieved") -> LoopStatus:
        """
        Mark the loop as completed.

        Writes a final checkpoint, then persists the status and the report.

        Args:
            reason: Stored as the status's ``termination_reason``.
        """
        status = self.status
        status.state = LoopState.COMPLETED.value
        status.completed_at = datetime.now(timezone.utc).isoformat()
        status.termination_reason = reason

        # Create final checkpoint
        execution_state = ExecutionState(
            phase=ExecutionPhase.COMPLETE.value,
            completed_items=self._get_all_completed_items(),
            pending_items=[],
            blocked_items=[],
            current_focus="Loop completed",
        )
        self.create_iteration_checkpoint(
            iteration=status.current_iteration,
            execution_state=execution_state,
            context_summary=ContextSummary(
                key_decisions=[f"Loop terminated: {reason}"],
            ),
        )

        self._save_status()
        self._save_report()

        logger.info(
            f"Loop completed: reason={reason}, iterations={status.current_iteration}, "
            f"cost=${status.total_cost:.2f}"
        )
        return status

    def fail(self, error: str) -> LoopStatus:
        """
        Mark the loop as failed.

        Args:
            error: Failure description; appended to the status errors and
                recorded in ``termination_reason``.
        """
        status = self.status
        status.state = LoopState.FAILED.value
        status.completed_at = datetime.now(timezone.utc).isoformat()
        status.termination_reason = f"failure: {error}"
        status.errors.append(error)
        self._save_status()

        logger.error(f"Loop failed: {error}")
        return status

    def get_report(self) -> Dict[str, Any]:
        """Generate a summary report of the loop execution."""
        status = self.status
        config = LoopConfig.from_dict(status.config)

        return {
            "loop_id": self.loop_id,
            "task_id": self.task_id,
            "project_id": self.project_id,
            "goal": status.goal,
            "state": status.state,
            "iterations": status.current_iteration,
            "max_iterations": config.max_iterations,
            "total_tokens": {
                "input": status.total_input_tokens,
                "output": status.total_output_tokens,
                "total": status.total_input_tokens + status.total_output_tokens,
            },
            "total_cost": round(status.total_cost, 2),
            "budget": round(config.max_cost, 2),
            "budget_utilization": round(
                status.total_cost / config.max_cost if config.max_cost > 0 else 0, 4
            ),
            "started_at": status.started_at,
            "completed_at": status.completed_at,
            "termination_reason": status.termination_reason,
            "completed_items": self._get_all_completed_items(),
            "error_count": len(status.errors),
        }

    # =========================================================================
    # PRIVATE METHODS
    # =========================================================================

    def _get_budget_remaining(self, config: LoopConfig) -> float:
        """Get remaining budget for the loop (never negative)."""
        return max(0.0, config.max_cost - self.status.total_cost)

    def _count_consecutive_errors(self) -> int:
        """Count consecutive error iterations from the end."""
        count = 0
        for iteration in reversed(self.status.iterations):
            if not iteration.get("errors"):
                break
            count += 1
        return count

    def _get_last_health_state(self) -> str:
        """Get the health state from the last iteration (HEALTHY if none)."""
        if self.status.iterations:
            return self.status.iterations[-1].get("health_state", HealthState.HEALTHY.value)
        return HealthState.HEALTHY.value

    def _get_all_completed_items(self) -> List[str]:
        """Collect all completed items across iterations, in order."""
        items: List[str] = []
        for iteration in self.status.iterations:
            items.extend(iteration.get("completed_items", []))
        return items

    def _load_status(self) -> LoopStatus:
        """
        Load loop status from disk.

        Returns a blank status for this loop when ``status.json`` does not
        exist. An unreadable, malformed or structurally invalid file is
        logged as a warning and likewise yields a blank status, so a bad
        file never crashes the ``status`` property.
        """
        status_file = self._state_dir / "status.json"
        try:
            data = json.loads(status_file.read_text(encoding="utf-8"))
            return LoopStatus.from_dict(data)
        except FileNotFoundError:
            # No status persisted yet; not an error.
            pass
        except (OSError, ValueError, TypeError, AttributeError, KeyError) as e:
            # ValueError covers JSONDecodeError and bad encodings; TypeError
            # and AttributeError cover JSON that is valid but is not a mapping
            # with the required identity fields.
            logger.warning(f"Failed to load status: {e}")

        return LoopStatus(
            loop_id=self.loop_id,
            task_id=self.task_id,
            project_id=self.project_id,
        )

    def _write_json_atomic(self, target: Path, payload: Dict[str, Any]) -> None:
        """Write ``payload`` as JSON to ``target`` via a temp file and rename.

        The rename means a crash mid-write cannot leave ``target`` truncated.
        """
        scratch = target.with_name(target.name + ".tmp")
        scratch.write_text(json.dumps(payload, indent=2), encoding="utf-8")
        os.replace(scratch, target)

    def _save_status(self) -> None:
        """Persist loop status to disk."""
        self._write_json_atomic(self._state_dir / "status.json", self.status.to_dict())

    def _save_report(self) -> None:
        """Save the loop execution report."""
        self._write_json_atomic(self._state_dir / "report.json", self.get_report())
# =============================================================================
# CONVENIENCE: Run a loop from CLI
# =============================================================================
def run_loop(
    task_id: str,
    goal: str,
    project_id: str = "",
    max_iterations: int = 10,
    max_cost: float = 50.0,
    agent_type: str = "senior-architect",
    model: str = "claude-opus-4-6",
) -> LoopStatus:
    """
    Convenience function to create a loop orchestrator and initialize a loop.

    The actual iteration execution happens in the /ralph-loop command or
    ralph-loop-monitor agent, which call plan_iteration(), record_iteration(),
    and evaluate_termination() in a loop.

    Args:
        task_id: Task the loop belongs to.
        goal: High-level goal for the loop.
        project_id: Optional project identifier.
        max_iterations: Iteration cap stored in the loop config.
        max_cost: Cost budget stored in the loop config.
        agent_type: Agent type stored in the loop config.
        model: Model name stored in the loop config.

    Returns the initialized LoopStatus (ready for first iteration). The
    orchestrator itself is not returned; the status's ``loop_id`` is the ID
    the orchestrator generated for this loop.
    """
    loop_settings = LoopConfig(
        max_iterations=max_iterations,
        max_cost=max_cost,
        agent_type=agent_type,
        model=model,
    )
    initial_status = LoopOrchestrator(
        task_id=task_id,
        project_id=project_id,
    ).initialize(goal=goal, config=loop_settings)
    return initial_status