#!/usr/bin/env python3
"""
CODITECT Retry Engine - Resilient Operation Execution with Backoff

Part of Track H.2.5: Inter-Agent Communication Infrastructure
Based on AUTONOMOUS-AGENT-SYSTEM-DESIGN.md specifications

This module provides:

  • RetryEngine: Configurable retry execution with multiple backoff strategies
  • RetryPolicy: Delay calculation with jitter support
  • @retry decorator: Easy function wrapping
  • RetryWithCircuitBreaker: Coordinated fault tolerance
  • Comprehensive metrics and monitoring

Backoff Strategies:
    ┌─────────────────────────────────────────────────────────────────┐
    │ EXPONENTIAL:  delay = base * (multiplier ^ attempt)             │
    │ LINEAR:       delay = base + (increment * attempt)              │
    │ FIXED:        delay = base (constant)                           │
    │ FIBONACCI:    delay = base * fib(attempt)                       │
    │ DECORRELATED: delay = random(base, previous_delay * 3)          │
    └─────────────────────────────────────────────────────────────────┘

Jitter Types (prevent thundering herd):
    ┌─────────────────────────────────────────────────────────────────┐
    │ NONE:         No jitter (exact delay)                           │
    │ FULL:         random(0, delay)                                  │
    │ EQUAL:        delay/2 + random(0, delay/2)                      │
    │ DECORRELATED: random(base, delay * 3) - AWS recommended         │
    └─────────────────────────────────────────────────────────────────┘

Usage:
    from scripts.core.retry_engine import (
        BackoffStrategy,
        JitterType,
        RetryEngine,
        RetryConfig,
        retry,
        RetryWithCircuitBreaker,
    )

    # Using decorator
    @retry(max_attempts=5, backoff="exponential")
    async def call_api(endpoint: str):
        return await http_client.get(endpoint)

    # Using engine directly
    engine = RetryEngine(RetryConfig(
        max_attempts=5,
        base_delay=1.0,
        max_delay=60.0,
        backoff_strategy=BackoffStrategy.EXPONENTIAL,
        jitter=JitterType.FULL,
    ))

    result = await engine.execute(risky_function, arg1, arg2)

    # With circuit breaker integration
    resilient = RetryWithCircuitBreaker(
        retry_config=RetryConfig(max_attempts=3),
        circuit_breaker=breaker,
    )
    result = await resilient.execute(api_call)

Author: CODITECT Framework
Created: January 8, 2026
Version: 1.0.0
"""

import asyncio
import functools
import inspect
import logging
import random
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
)

# Configure logging
# Module-level logger named after this module so applications can tune
# retry-engine verbosity independently of the rest of the program.
logger = logging.getLogger(__name__)

# Type variables
# T: return type of the callable being retried.
T = TypeVar("T")
# F: the decorated callable itself, so decorators can declare they return
# the same callable type they were given.
F = TypeVar("F", bound=Callable[..., Any])

# =============================================================================
# Enums
# =============================================================================

class BackoffStrategy(Enum):
    """Backoff strategy for retry delays.

    The member value is the lowercase name, so a strategy can be built from
    a plain string, e.g. ``BackoffStrategy("linear")``.
    """

    EXPONENTIAL = "exponential"    # delay = base * (multiplier ^ attempt)
    LINEAR = "linear"              # delay = base + (increment * attempt)
    FIXED = "fixed"                # delay = base (constant)
    FIBONACCI = "fibonacci"        # delay = base * fib(attempt)
    DECORRELATED = "decorrelated"  # AWS-style decorrelated jitter

class JitterType(Enum):
    """Jitter type to add randomness to delays.

    The member value is the lowercase name, so a jitter type can be built
    from a plain string, e.g. ``JitterType("equal")``.
    """

    NONE = "none"                  # No jitter
    FULL = "full"                  # random(0, delay)
    EQUAL = "equal"                # delay/2 + random(0, delay/2)
    DECORRELATED = "decorrelated"  # random(base, delay * 3)

class RetryDecision(Enum):
    """Decision for whether to retry.

    RETRY: wait for the computed delay and try again.
    STOP:  give up because an attempt or time budget is exhausted.
    RAISE: re-raise the original exception because it is not retryable.
    """

    RETRY = "retry"
    STOP = "stop"
    RAISE = "raise"

# =============================================================================
# Exceptions
# =============================================================================

class RetryError(Exception):
    """Base exception for retry errors."""


class MaxRetriesExceeded(RetryError):
    """Raised when max retries exceeded.

    Attributes:
        attempts: Number of attempts reported by the raiser.
        last_exception: The exception from the final attempt, if any.
    """

    def __init__(
        self,
        message: str,
        attempts: int,
        last_exception: Optional[Exception] = None,
    ):
        self.attempts = attempts
        self.last_exception = last_exception
        # Only the message goes to Exception so str(err) stays readable.
        super().__init__(message)


class RetryTimeout(RetryError):
    """Raised when total retry time exceeded.

    Attributes:
        elapsed: Seconds spent when the timeout was detected.
        timeout: The configured time budget in seconds.
    """

    def __init__(self, message: str, elapsed: float, timeout: float):
        self.elapsed = elapsed
        self.timeout = timeout
        super().__init__(message)

# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class RetryConfig:
    """
    Configuration for retry behavior.

    Attributes:
        max_attempts: Maximum number of attempts (including first try)
        base_delay: Base delay in seconds
        max_delay: Maximum delay cap in seconds
        multiplier: Multiplier for exponential backoff
        increment: Increment for linear backoff
        backoff_strategy: Strategy for calculating delays
        jitter: Jitter type to add randomness
        jitter_range: Range for jitter (0.0 to 1.0); not validated here
        total_timeout: Total timeout for all attempts (None = no limit)
        retry_exceptions: Exception types to retry (None = all)
        ignore_exceptions: Exception types to not retry
        on_retry: Callback(attempt, delay, exception) before retry
        on_success: Callback(attempt, result) on success
        on_failure: Callback(attempt, exception) on final failure

    Raises:
        ValueError: From ``__post_init__`` when ``max_attempts < 1``,
            ``base_delay < 0`` or ``max_delay < base_delay``.
    """

    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    multiplier: float = 2.0
    increment: float = 1.0
    backoff_strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL
    jitter: JitterType = JitterType.FULL
    jitter_range: float = 1.0
    total_timeout: Optional[float] = None
    retry_exceptions: Optional[Set[Type[Exception]]] = None
    # default_factory gives every config its own set instead of a shared one.
    ignore_exceptions: Set[Type[Exception]] = field(default_factory=set)
    on_retry: Optional[Callable[[int, float, Exception], None]] = None
    on_success: Optional[Callable[[int, Any], None]] = None
    on_failure: Optional[Callable[[int, Exception], None]] = None

    def __post_init__(self):
        """Validate the numeric bounds that the delay calculation relies on."""
        if self.max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        if self.base_delay < 0:
            raise ValueError("base_delay must be non-negative")
        if self.max_delay < self.base_delay:
            raise ValueError("max_delay must be >= base_delay")

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary (without callables).

        Enum fields are emitted as their string values.  The exception
        filters and the callbacks are omitted.
        """
        return {
            "max_attempts": self.max_attempts,
            "base_delay": self.base_delay,
            "max_delay": self.max_delay,
            "multiplier": self.multiplier,
            "increment": self.increment,
            "backoff_strategy": self.backoff_strategy.value,
            "jitter": self.jitter.value,
            "jitter_range": self.jitter_range,
            "total_timeout": self.total_timeout,
        }

@dataclass
class RetryAttempt:
    """Record of a single retry attempt."""

    attempt_number: int                      # 1-based position in the run
    started_at: datetime                     # when the attempt began
    ended_at: Optional[datetime] = None      # None until the attempt finishes
    duration_ms: float = 0.0                 # attempt duration, milliseconds
    success: bool = False
    exception_type: Optional[str] = None     # class name of the failure
    exception_message: Optional[str] = None  # str() of the failure
    delay_before: float = 0.0                # seconds waited before it
    delay_after: float = 0.0                 # seconds scheduled after it

@dataclass
class RetryMetrics:
    """Metrics for retry operations."""

    total_attempts: int = 0
    successful_attempts: int = 0
    failed_attempts: int = 0
    total_retries: int = 0
    total_delay_seconds: float = 0.0
    avg_attempts_per_success: float = 0.0
    last_attempt_time: Optional[datetime] = None
    # Failure count keyed by exception class name; default_factory keeps
    # each instance's dict independent.
    attempts_by_exception: Dict[str, int] = field(default_factory=dict)
    success_rate: float = 0.0

# =============================================================================
# Retry Policy
# =============================================================================

class RetryPolicy:
    """
    Calculates retry delays based on configuration.

    Supports multiple backoff strategies and jitter types.  The only state
    carried between calls is the previous delay used by
    ``BackoffStrategy.DECORRELATED`` (cleared by :meth:`reset`) and a
    Fibonacci memo table.
    """

    def __init__(self, config: RetryConfig):
        """
        Initialize the policy.

        Args:
            config: Retry configuration the delays are derived from
        """
        self.config = config
        # Contiguous memo table, seeded with fib(0) and fib(1).
        self._fibonacci_cache: Dict[int, int] = {0: 0, 1: 1}
        self._last_delay: float = config.base_delay

    def calculate_delay(self, attempt: int) -> float:
        """
        Calculate delay for a given attempt number.

        The strategy delay is capped at ``max_delay``, jitter is applied,
        and the result is clamped to ``[0, max_delay]`` again, because
        decorrelated jitter can push the value above the cap.

        Args:
            attempt: Attempt number (0-indexed)

        Returns:
            Delay in seconds, always within ``[0, max_delay]``
        """
        cfg = self.config
        strategy = cfg.backoff_strategy

        try:
            if strategy == BackoffStrategy.EXPONENTIAL:
                delay = cfg.base_delay * (cfg.multiplier ** attempt)
            elif strategy == BackoffStrategy.LINEAR:
                delay = cfg.base_delay + (cfg.increment * attempt)
            elif strategy == BackoffStrategy.FIBONACCI:
                delay = cfg.base_delay * self._fibonacci(attempt + 1)
            elif strategy == BackoffStrategy.DECORRELATED:
                # AWS-style decorrelated jitter
                delay = random.uniform(cfg.base_delay, self._last_delay * 3)
            else:
                # FIXED, and any unrecognised strategy, use the base delay.
                delay = cfg.base_delay
        except OverflowError:
            # Very large attempt numbers overflow float arithmetic; the
            # delay would be capped anyway, so saturate instead of crashing.
            delay = cfg.max_delay

        # Apply max delay cap
        delay = min(delay, cfg.max_delay)

        if strategy == BackoffStrategy.DECORRELATED:
            # Remember the capped value so the next sampling window cannot
            # keep growing past max_delay.
            self._last_delay = delay

        # Apply jitter, then re-clamp.  The lower bound protects
        # time.sleep(), which rejects negative values.
        jittered = self._apply_jitter(delay)
        return max(0.0, min(jittered, cfg.max_delay))

    def _fibonacci(self, n: int) -> int:
        """
        Calculate fibonacci number with memoization.

        Iterative, so large ``n`` cannot hit the recursion limit.

        Args:
            n: Index into the sequence (fib(0) = 0, fib(1) = 1)

        Returns:
            The n-th Fibonacci number

        Raises:
            ValueError: If ``n`` is negative
        """
        if n < 0:
            raise ValueError("n must be non-negative")

        cache = self._fibonacci_cache
        # Keys are contiguous from 0, so len(cache) is the first missing index.
        for index in range(len(cache), n + 1):
            cache[index] = cache[index - 1] + cache[index - 2]
        return cache[n]

    def _apply_jitter(self, delay: float) -> float:
        """
        Apply jitter to delay.

        Args:
            delay: Already-capped strategy delay in seconds

        Returns:
            Jittered delay; may exceed ``delay`` for decorrelated jitter
        """
        jitter = self.config.jitter
        scale = self.config.jitter_range

        if jitter == JitterType.FULL:
            # Full jitter: random(0, delay)
            return random.uniform(0, delay * scale)

        if jitter == JitterType.EQUAL:
            # Equal jitter: delay/2 + random(0, delay/2)
            half = delay / 2
            return half + random.uniform(0, half * scale)

        if jitter == JitterType.DECORRELATED:
            # Decorrelated jitter: random(base, delay * 3)
            return random.uniform(self.config.base_delay, delay * 3 * scale)

        # JitterType.NONE, or an unrecognised jitter type
        return delay

    def should_retry(
        self,
        attempt: int,
        exception: Exception,
        elapsed: float,
    ) -> RetryDecision:
        """
        Decide whether to retry.

        Budget checks run first: once the attempt or time budget is used
        up the answer is STOP, even for an exception that would otherwise
        be classified as non-retryable.

        Args:
            attempt: Current attempt number (0-indexed)
            exception: Exception that was raised
            elapsed: Total elapsed time in seconds

        Returns:
            RetryDecision
        """
        cfg = self.config

        # Check max attempts
        if attempt >= cfg.max_attempts - 1:
            return RetryDecision.STOP

        # Check total timeout
        if cfg.total_timeout is not None and elapsed >= cfg.total_timeout:
            return RetryDecision.STOP

        # Check ignore exceptions
        if any(isinstance(exception, exc_type) for exc_type in cfg.ignore_exceptions):
            return RetryDecision.RAISE

        # Check retry exceptions (if specified): anything not listed is
        # treated as non-retryable.
        if cfg.retry_exceptions is not None:
            if any(
                isinstance(exception, exc_type)
                for exc_type in cfg.retry_exceptions
            ):
                return RetryDecision.RETRY
            return RetryDecision.RAISE

        return RetryDecision.RETRY

    def reset(self):
        """Reset policy state (for decorrelated backoff)."""
        self._last_delay = self.config.base_delay

# =============================================================================
# Retry Engine
# =============================================================================

class RetryEngine:
    """
    Executes functions with retry logic and backoff.

    Supports both sync and async functions with configurable
    retry policies, backoff strategies, and jitter.

    Only the most recent ``MAX_HISTORY`` attempt records are retained, so a
    long-lived engine (for example one behind the ``@retry`` decorator)
    does not grow without bound.

    Example:
        engine = RetryEngine(RetryConfig(
            max_attempts=5,
            backoff_strategy=BackoffStrategy.EXPONENTIAL,
        ))

        result = await engine.execute(api_call, endpoint="/users")
    """

    # Upper bound on retained RetryAttempt records.
    MAX_HISTORY = 1000

    def __init__(self, config: Optional[RetryConfig] = None):
        """
        Initialize retry engine.

        Args:
            config: Retry configuration (uses defaults if None)
        """
        self.config = config or RetryConfig()
        self.policy = RetryPolicy(self.config)

        # Metrics
        self._total_executions = 0
        self._successful_executions = 0
        self._failed_executions = 0
        self._total_retries = 0
        self._total_delay = 0.0
        self._attempts_by_exception: Dict[str, int] = {}
        self._attempt_history: List[RetryAttempt] = []

    # ------------------------------------------------------------------
    # Private helpers shared by execute() and execute_sync()
    # ------------------------------------------------------------------

    def _safe_callback(
        self,
        label: str,
        callback: Optional[Callable[..., Any]],
        *cb_args: Any,
    ) -> None:
        """Invoke an optional user callback, logging instead of raising."""
        if not callback:
            return
        try:
            callback(*cb_args)
        except Exception as cb_err:
            # Deliberate best effort: a faulty callback must not change
            # the outcome of the retried operation.
            logger.warning(f"{label} callback error: {cb_err}")

    def _new_record(
        self,
        attempt: int,
        attempts: List[RetryAttempt],
    ) -> RetryAttempt:
        """Create the record for the 0-indexed ``attempt`` about to start."""
        return RetryAttempt(
            attempt_number=attempt + 1,
            started_at=datetime.utcnow(),
            delay_before=attempts[-1].delay_after if attempts else 0.0,
        )

    @staticmethod
    def _close_record(
        record: RetryAttempt,
        clock_start: float,
        exc: Optional[Exception] = None,
    ) -> None:
        """Stamp end time, duration and outcome onto ``record``."""
        record.ended_at = datetime.utcnow()
        # Measured on the monotonic clock.  Converting a naive UTC datetime
        # with .timestamp() would treat it as local time and skew the
        # duration by the UTC offset.
        record.duration_ms = (time.monotonic() - clock_start) * 1000
        if exc is None:
            record.success = True
        else:
            record.exception_type = type(exc).__name__
            record.exception_message = str(exc)

    def _remember(self, attempts: List[RetryAttempt]) -> None:
        """Append ``attempts`` to the history, dropping the oldest overflow."""
        self._attempt_history.extend(attempts)
        overflow = len(self._attempt_history) - self.MAX_HISTORY
        if overflow > 0:
            del self._attempt_history[:overflow]

    def _record_success(
        self,
        attempt: int,
        record: RetryAttempt,
        clock_start: float,
        attempts: List[RetryAttempt],
        result: Any,
    ) -> None:
        """Update records and metrics and notify ``on_success``."""
        self._close_record(record, clock_start)
        attempts.append(record)

        self._successful_executions += 1
        self._remember(attempts)

        self._safe_callback(
            "on_success", self.config.on_success, attempt + 1, result
        )
        logger.debug(f"Retry engine success on attempt {attempt + 1}")

    def _record_failure(
        self,
        attempt: int,
        exc: Exception,
        record: RetryAttempt,
        clock_start: float,
        attempts: List[RetryAttempt],
        start_time: float,
    ) -> Tuple[RetryDecision, float]:
        """
        Record a failed attempt and decide what happens next.

        Returns:
            ``(decision, delay)``.  ``delay`` is only meaningful for
            ``RetryDecision.RETRY``.
        """
        self._close_record(record, clock_start, exc)

        # Track by exception type
        exc_name = type(exc).__name__
        self._attempts_by_exception[exc_name] = (
            self._attempts_by_exception.get(exc_name, 0) + 1
        )

        elapsed = time.monotonic() - start_time
        decision = self.policy.should_retry(attempt, exc, elapsed)

        delay = 0.0
        if decision == RetryDecision.RETRY:
            delay = self.policy.calculate_delay(attempt)
            record.delay_after = delay
            self._total_retries += 1
            self._total_delay += delay

        attempts.append(record)

        if decision == RetryDecision.RAISE:
            # Non-retryable: the caller re-raises the original exception.
            self._failed_executions += 1
            self._remember(attempts)
        elif decision == RetryDecision.RETRY:
            self._safe_callback(
                "on_retry", self.config.on_retry, attempt + 1, delay, exc
            )
            logger.debug(
                f"Retry engine attempt {attempt + 1} failed: {exc}, "
                f"retrying in {delay:.2f}s"
            )

        return decision, delay

    def _exhausted_error(
        self,
        start_time: float,
        attempts: List[RetryAttempt],
        last_exception: Optional[Exception],
    ) -> RetryError:
        """
        Finalize a failed execution and build the exception to raise.

        Returns:
            ``RetryTimeout`` when the time budget is spent, otherwise
            ``MaxRetriesExceeded``.
        """
        self._failed_executions += 1
        self._remember(attempts)

        if last_exception is not None:
            self._safe_callback(
                "on_failure",
                self.config.on_failure,
                len(attempts),
                last_exception,
            )

        # Check if timeout was the reason
        elapsed = time.monotonic() - start_time
        timeout = self.config.total_timeout
        if timeout is not None and elapsed >= timeout:
            return RetryTimeout(
                f"Retry timeout after {elapsed:.2f}s",
                elapsed,
                timeout,
            )

        return MaxRetriesExceeded(
            f"Max retries ({self.config.max_attempts}) exceeded",
            len(attempts),
            last_exception,
        )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def execute(
        self,
        func: Callable[..., T],
        *args: Any,
        **kwargs: Any,
    ) -> T:
        """
        Execute a function with retry logic.

        ``func`` is called directly and its result is awaited when it is
        awaitable.  This covers coroutine functions and also callables that
        merely return a coroutine, such as objects with an async
        ``__call__``.

        Args:
            func: Function to execute (sync or async)
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Result of the function

        Raises:
            MaxRetriesExceeded: If all retries exhausted
            RetryTimeout: If total timeout exceeded
            Exception: If non-retryable exception raised
        """
        self._total_executions += 1
        self.policy.reset()

        # The monotonic clock is immune to wall-clock adjustments.
        start_time = time.monotonic()
        last_exception: Optional[Exception] = None
        attempts: List[RetryAttempt] = []

        for attempt in range(self.config.max_attempts):
            clock_start = time.monotonic()
            record = self._new_record(attempt, attempts)

            try:
                result = func(*args, **kwargs)
                if inspect.isawaitable(result):
                    result = await result
            except Exception as e:
                last_exception = e
                decision, delay = self._record_failure(
                    attempt, e, record, clock_start, attempts, start_time
                )

                if decision == RetryDecision.RAISE:
                    raise
                if decision == RetryDecision.STOP:
                    break

                # Wait before retry
                await asyncio.sleep(delay)
            else:
                self._record_success(
                    attempt, record, clock_start, attempts, result
                )
                return result

        # All retries exhausted; chain the last failure as the cause.
        raise self._exhausted_error(
            start_time, attempts, last_exception
        ) from last_exception

    def execute_sync(
        self,
        func: Callable[..., T],
        *args: Any,
        **kwargs: Any,
    ) -> T:
        """
        Execute a synchronous function with retry logic.

        Blocks the calling thread with ``time.sleep`` between attempts.
        Attempts are recorded in the history in the same way as
        :meth:`execute`.

        Args:
            func: Synchronous function to execute
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Result of the function

        Raises:
            MaxRetriesExceeded: If all retries exhausted
            RetryTimeout: If total timeout exceeded
            Exception: If non-retryable exception raised
        """
        self._total_executions += 1
        self.policy.reset()

        start_time = time.monotonic()
        last_exception: Optional[Exception] = None
        attempts: List[RetryAttempt] = []

        for attempt in range(self.config.max_attempts):
            clock_start = time.monotonic()
            record = self._new_record(attempt, attempts)

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                decision, delay = self._record_failure(
                    attempt, e, record, clock_start, attempts, start_time
                )

                if decision == RetryDecision.RAISE:
                    raise
                if decision == RetryDecision.STOP:
                    break

                time.sleep(delay)
            else:
                self._record_success(
                    attempt, record, clock_start, attempts, result
                )
                return result

        raise self._exhausted_error(
            start_time, attempts, last_exception
        ) from last_exception

    def get_metrics(self) -> RetryMetrics:
        """
        Get retry metrics.

        ``total_attempts`` reports the number of ``execute`` /
        ``execute_sync`` calls.  ``avg_attempts_per_success`` is
        ``(retries + successes) / successes``, so it also counts retries
        spent on executions that ended in failure.
        """
        total = self._successful_executions + self._failed_executions
        success_rate = (
            self._successful_executions / total if total > 0 else 0.0
        )
        avg_attempts = (
            (self._total_retries + self._successful_executions)
            / self._successful_executions
            if self._successful_executions > 0
            else 0.0
        )

        return RetryMetrics(
            total_attempts=self._total_executions,
            successful_attempts=self._successful_executions,
            failed_attempts=self._failed_executions,
            total_retries=self._total_retries,
            total_delay_seconds=self._total_delay,
            avg_attempts_per_success=avg_attempts,
            last_attempt_time=(
                self._attempt_history[-1].started_at
                if self._attempt_history
                else None
            ),
            # Copy so callers cannot mutate the engine's counters.
            attempts_by_exception=self._attempts_by_exception.copy(),
            success_rate=success_rate,
        )

    def reset_metrics(self):
        """Reset all metrics."""
        self._total_executions = 0
        self._successful_executions = 0
        self._failed_executions = 0
        self._total_retries = 0
        self._total_delay = 0.0
        self._attempts_by_exception.clear()
        self._attempt_history.clear()

# =============================================================================
# Retry with Circuit Breaker
# =============================================================================

class RetryWithCircuitBreaker:
    """
    Combines retry logic with circuit breaker for coordinated fault tolerance.

    The circuit breaker wraps the retry logic:
    - If circuit is OPEN: Fail fast without retrying
    - If circuit is CLOSED/HALF_OPEN: Execute with retries
    - Retry failures count toward circuit breaker threshold

    NOTE(review): the behaviour above depends on the breaker's own
    ``call`` / ``call_sync`` implementation, which is not defined in this
    module.  Confirm it against the CircuitBreaker class.

    Example:
        from scripts.core.circuit_breaker import CircuitBreaker

        resilient = RetryWithCircuitBreaker(
            retry_config=RetryConfig(max_attempts=3),
            circuit_breaker=CircuitBreaker("api-service"),
        )

        result = await resilient.execute(api_call, endpoint="/users")
    """

    def __init__(
        self,
        retry_config: Optional[RetryConfig] = None,
        circuit_breaker: Optional[Any] = None,  # CircuitBreaker type
    ):
        """
        Initialize retry with circuit breaker.

        Args:
            retry_config: Retry configuration
            circuit_breaker: CircuitBreaker instance (optional)
        """
        self.retry_engine = RetryEngine(retry_config or RetryConfig())
        self.circuit_breaker = circuit_breaker

    async def execute(
        self,
        func: Callable[..., T],
        *args: Any,
        **kwargs: Any,
    ) -> T:
        """
        Execute with retry and circuit breaker protection.

        The whole retry sequence is handed to the breaker as one call.

        Args:
            func: Function to execute
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Result of the function

        Raises:
            CircuitBreakerOpen: If circuit is open
            MaxRetriesExceeded: If all retries exhausted
        """
        if self.circuit_breaker is None:
            # No circuit breaker - just retry
            return await self.retry_engine.execute(func, *args, **kwargs)

        # Wrap retry execution in circuit breaker
        async def retry_wrapper():
            return await self.retry_engine.execute(func, *args, **kwargs)

        # Use circuit breaker's call method
        if hasattr(self.circuit_breaker, "call"):
            return await self.circuit_breaker.call(retry_wrapper)

        # Fallback if circuit breaker has different interface: the breaker
        # is bypassed and only the retry logic runs.
        return await retry_wrapper()

    def execute_sync(
        self,
        func: Callable[..., T],
        *args: Any,
        **kwargs: Any,
    ) -> T:
        """
        Execute synchronously with retry and circuit breaker.

        Uses the breaker's ``call_sync`` method when it has one; otherwise
        the breaker is bypassed and only the retry logic runs.

        Args:
            func: Synchronous function to execute
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Result of the function
        """
        if self.circuit_breaker is None:
            return self.retry_engine.execute_sync(func, *args, **kwargs)

        def retry_wrapper():
            return self.retry_engine.execute_sync(func, *args, **kwargs)

        if hasattr(self.circuit_breaker, "call_sync"):
            return self.circuit_breaker.call_sync(retry_wrapper)

        return retry_wrapper()

# =============================================================================
# Decorator
# =============================================================================

def retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    multiplier: float = 2.0,
    backoff: Union[str, BackoffStrategy] = BackoffStrategy.EXPONENTIAL,
    jitter: Union[str, JitterType] = JitterType.FULL,
    retry_exceptions: Optional[Set[Type[Exception]]] = None,
    ignore_exceptions: Optional[Set[Type[Exception]]] = None,
    total_timeout: Optional[float] = None,
    on_retry: Optional[Callable[[int, float, Exception], None]] = None,
    increment: float = 1.0,
) -> Callable[[F], F]:
    """
    Decorator to add retry logic to a function.

    One ``RetryEngine`` is created per decorated function and shared by
    all of its calls.  Coroutine functions get an async wrapper; everything
    else gets a blocking wrapper.

    Example:
        @retry(max_attempts=5, backoff="exponential")
        async def call_api(url: str):
            return await http_client.get(url)

        @retry(
            max_attempts=3,
            retry_exceptions={ConnectionError, TimeoutError},
        )
        def connect_database():
            return db.connect()

    Args:
        max_attempts: Maximum attempts (including first try)
        base_delay: Base delay in seconds
        max_delay: Maximum delay cap
        multiplier: Multiplier for exponential backoff
        backoff: Backoff strategy name or enum
        jitter: Jitter type name or enum
        retry_exceptions: Only retry these exception types
        ignore_exceptions: Never retry these exception types
        total_timeout: Total timeout for all attempts
        on_retry: Callback before each retry
        increment: Increment for linear backoff

    Returns:
        Decorated function

    Raises:
        ValueError: If ``backoff`` or ``jitter`` is an unknown name, or
            the resulting configuration is invalid
    """
    # Convert string to enum if needed
    if isinstance(backoff, str):
        backoff = BackoffStrategy(backoff)
    if isinstance(jitter, str):
        jitter = JitterType(jitter)

    config = RetryConfig(
        max_attempts=max_attempts,
        base_delay=base_delay,
        max_delay=max_delay,
        multiplier=multiplier,
        increment=increment,
        backoff_strategy=backoff,
        jitter=jitter,
        retry_exceptions=retry_exceptions,
        ignore_exceptions=ignore_exceptions or set(),
        total_timeout=total_timeout,
        on_retry=on_retry,
    )

    def decorator(func: F) -> F:
        engine = RetryEngine(config)

        if asyncio.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                return await engine.execute(func, *args, **kwargs)

            return async_wrapper  # type: ignore

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            return engine.execute_sync(func, *args, **kwargs)

        return sync_wrapper  # type: ignore

    return decorator

# =============================================================================
# Convenience Functions
# =============================================================================

def create_retry_engine(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    backoff: BackoffStrategy = BackoffStrategy.EXPONENTIAL,
    jitter: JitterType = JitterType.FULL,
    **kwargs,
) -> RetryEngine:
    """Create a retry engine with common settings.

    Args:
        max_attempts: Maximum attempts (including first try)
        base_delay: Base delay in seconds
        backoff: Backoff strategy
        jitter: Jitter type
        **kwargs: Any further ``RetryConfig`` fields (e.g. ``max_delay``)

    Returns:
        A new ``RetryEngine`` built from the resulting ``RetryConfig``
    """
    config = RetryConfig(
        max_attempts=max_attempts,
        base_delay=base_delay,
        backoff_strategy=backoff,
        jitter=jitter,
        **kwargs,
    )
    return RetryEngine(config)

def exponential_backoff(
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    multiplier: float = 2.0,
) -> RetryEngine:
    """Create retry engine with exponential backoff.

    Args:
        max_attempts: Maximum attempts (including first try)
        base_delay: Base delay in seconds
        max_delay: Maximum delay cap in seconds
        multiplier: Growth factor applied per attempt

    Returns:
        A new ``RetryEngine`` using ``BackoffStrategy.EXPONENTIAL``
    """
    return create_retry_engine(
        max_attempts=max_attempts,
        base_delay=base_delay,
        max_delay=max_delay,
        backoff=BackoffStrategy.EXPONENTIAL,
        multiplier=multiplier,
    )

def linear_backoff(
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    increment: float = 2.0,
) -> RetryEngine:
    """Create retry engine with linear backoff.

    Args:
        max_attempts: Maximum attempts (including first try)
        base_delay: Base delay in seconds
        max_delay: Maximum delay cap in seconds
        increment: Seconds added to the delay per attempt

    Returns:
        A new ``RetryEngine`` using ``BackoffStrategy.LINEAR``
    """
    config = RetryConfig(
        max_attempts=max_attempts,
        base_delay=base_delay,
        max_delay=max_delay,
        increment=increment,
        backoff_strategy=BackoffStrategy.LINEAR,
    )
    return RetryEngine(config)

def constant_backoff(
    max_attempts: int = 3,
    delay: float = 1.0,
) -> RetryEngine:
    """Create retry engine with constant delay.

    Jitter is disabled and ``max_delay`` equals ``delay``, so every wait
    is exactly ``delay`` seconds.

    Args:
        max_attempts: Maximum attempts (including first try)
        delay: Seconds to wait between attempts

    Returns:
        A new ``RetryEngine`` using ``BackoffStrategy.FIXED``
    """
    config = RetryConfig(
        max_attempts=max_attempts,
        base_delay=delay,
        max_delay=delay,
        backoff_strategy=BackoffStrategy.FIXED,
        jitter=JitterType.NONE,
    )
    return RetryEngine(config)

async def retry_async(
    func: Callable[..., T],
    *args: Any,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    **kwargs: Any,
) -> T:
    """
    Convenience function to retry an async function.

    ``max_attempts`` and ``base_delay`` configure the retry engine and are
    not passed to ``func``.  All other arguments are forwarded to it.

    Example:
        result = await retry_async(api_call, url, max_attempts=5)

    Args:
        func: Function to execute
        *args: Positional arguments for ``func``
        max_attempts: Maximum attempts (including first try)
        base_delay: Base delay in seconds
        **kwargs: Keyword arguments for ``func``

    Returns:
        Result of the function
    """
    engine = create_retry_engine(max_attempts=max_attempts, base_delay=base_delay)
    return await engine.execute(func, *args, **kwargs)

def retry_sync(
    func: Callable[..., T],
    *args: Any,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    **kwargs: Any,
) -> T:
    """
    Convenience function to retry a sync function.

    ``max_attempts`` and ``base_delay`` configure the retry engine and are
    not passed to ``func``.  All other arguments are forwarded to it.

    Example:
        result = retry_sync(db_connect, host, max_attempts=3)

    Args:
        func: Synchronous function to execute
        *args: Positional arguments for ``func``
        max_attempts: Maximum attempts (including first try)
        base_delay: Base delay in seconds
        **kwargs: Keyword arguments for ``func``

    Returns:
        Result of the function
    """
    engine = create_retry_engine(max_attempts=max_attempts, base_delay=base_delay)
    return engine.execute_sync(func, *args, **kwargs)

# =============================================================================
# Predefined Retry Configurations
# =============================================================================

# Standard configurations for common use cases.
# These instances are shared module-level objects: mutating one changes it
# for every caller that looks it up afterwards.
RETRY_CONFIGS = {
    "aggressive": RetryConfig(
        max_attempts=10,
        base_delay=0.1,
        max_delay=10.0,
        multiplier=1.5,
        jitter=JitterType.FULL,
    ),
    "standard": RetryConfig(
        max_attempts=5,
        base_delay=1.0,
        max_delay=30.0,
        multiplier=2.0,
        jitter=JitterType.EQUAL,
    ),
    "conservative": RetryConfig(
        max_attempts=3,
        base_delay=2.0,
        max_delay=60.0,
        multiplier=3.0,
        jitter=JitterType.FULL,
    ),
    "quick": RetryConfig(
        max_attempts=3,
        base_delay=0.1,
        max_delay=1.0,
        multiplier=2.0,
        jitter=JitterType.FULL,
    ),
    "database": RetryConfig(
        max_attempts=5,
        base_delay=0.5,
        max_delay=30.0,
        multiplier=2.0,
        jitter=JitterType.EQUAL,
        # Only connection-level failures are retried for this profile.
        retry_exceptions={ConnectionError, TimeoutError},
    ),
    "api": RetryConfig(
        max_attempts=3,
        base_delay=1.0,
        max_delay=10.0,
        multiplier=2.0,
        jitter=JitterType.FULL,
        total_timeout=30.0,
    ),
}

def get_retry_config(name: str) -> RetryConfig:
    """Get a predefined retry configuration.

    Args:
        name: Key in ``RETRY_CONFIGS``

    Returns:
        The shared ``RetryConfig`` instance stored under ``name`` (not a copy)

    Raises:
        ValueError: If ``name`` is not a known configuration
    """
    if name not in RETRY_CONFIGS:
        raise ValueError(
            f"Unknown config '{name}'. "
            f"Available: {list(RETRY_CONFIGS.keys())}"
        )
    return RETRY_CONFIGS[name]

# =============================================================================
# CLI for testing
# =============================================================================

if __name__ == "__main__":
    import argparse

    # Small manual test harness: `demo` retries a simulated flaky service,
    # `backoff-chart` prints delays per strategy, `configs` lists presets.
    parser = argparse.ArgumentParser(description="CODITECT Retry Engine CLI")
    parser.add_argument(
        "command",
        choices=["demo", "backoff-chart", "configs"],
        help="Command to run",
    )
    parser.add_argument("--attempts", type=int, default=5, help="Max attempts")
    parser.add_argument("--delay", type=float, default=1.0, help="Base delay")
    parser.add_argument(
        "--strategy",
        choices=["exponential", "linear", "fixed", "fibonacci", "decorrelated"],
        default="exponential",
        help="Backoff strategy",
    )
    parser.add_argument(
        "--jitter",
        choices=["none", "full", "equal", "decorrelated"],
        default="full",
        help="Jitter type",
    )
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose")

    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    async def main():
        """Run the sub-command selected on the command line."""
        if args.command == "demo":
            print("=" * 60)
            print("CODITECT Retry Engine Demo")
            print("=" * 60)

            config = RetryConfig(
                max_attempts=args.attempts,
                base_delay=args.delay,
                backoff_strategy=BackoffStrategy(args.strategy),
                jitter=JitterType(args.jitter),
            )
            engine = RetryEngine(config)

            # Simulate flaky service
            call_count = 0

            async def flaky_service():
                nonlocal call_count
                call_count += 1
                # Fails on every call before attempt number (attempts - 1).
                if call_count < args.attempts - 1:
                    raise Exception(f"Simulated failure #{call_count}")
                return f"Success on attempt {call_count}"

            print(f"\nConfig: {config.to_dict()}")
            print(f"\nExecuting flaky service (fails until attempt {args.attempts - 1})...")
            print("-" * 60)

            try:
                result = await engine.execute(flaky_service)
                print(f"\nResult: {result}")
            except RetryError as e:
                # Covers MaxRetriesExceeded and RetryTimeout.
                print(f"\nFailed: {e}")

            print("\nMetrics:")
            metrics = engine.get_metrics()
            print(f" Total attempts: {metrics.total_attempts}")
            print(f" Successful: {metrics.successful_attempts}")
            print(f" Failed: {metrics.failed_attempts}")
            print(f" Total retries: {metrics.total_retries}")
            print(f" Total delay: {metrics.total_delay_seconds:.2f}s")
            print(f" Success rate: {metrics.success_rate:.1%}")

        elif args.command == "backoff-chart":
            print("=" * 60)
            print("Backoff Delay Chart")
            print("=" * 60)

            strategies = [
                BackoffStrategy.EXPONENTIAL,
                BackoffStrategy.LINEAR,
                BackoffStrategy.FIBONACCI,
            ]

            for strategy in strategies:
                # Jitter is disabled so the chart shows the raw curve.
                config = RetryConfig(
                    max_attempts=10,
                    base_delay=args.delay,
                    max_delay=120.0,
                    backoff_strategy=strategy,
                    jitter=JitterType.NONE,
                )
                policy = RetryPolicy(config)

                print(f"\n{strategy.value.upper()}:")
                for i in range(args.attempts):
                    delay = policy.calculate_delay(i)
                    # One block character per whole second of delay.
                    bar = "█" * int(delay)
                    print(f" Attempt {i+1}: {delay:6.2f}s {bar}")

        elif args.command == "configs":
            print("=" * 60)
            print("Predefined Retry Configurations")
            print("=" * 60)

            for name, config in RETRY_CONFIGS.items():
                print(f"\n{name}:")
                for key, value in config.to_dict().items():
                    print(f" {key}: {value}")

    asyncio.run(main())