#!/usr/bin/env python3
"""
CODITECT Retry Engine Unit Tests

Part of Track H.2.5: Inter-Agent Communication Infrastructure
Tests for retry_engine.py - comprehensive coverage of all components

Test Categories:
1. Enum Tests (BackoffStrategy, JitterType, RetryDecision)
2. Exception Tests (RetryError, MaxRetriesExceeded, RetryTimeout)
3. RetryConfig Tests (validation, defaults, serialization)
4. RetryPolicy Tests (delay calculation, jitter, should_retry)
5. RetryEngine Tests (sync, async, metrics)
6. Decorator Tests (@retry)
7. Integration Tests (RetryWithCircuitBreaker)
8. Convenience Function Tests
9. Predefined Config Tests
10. Edge Case Tests

Run:
    pytest scripts/core/test_retry_engine.py -v
"""
import asyncio import pytest import time from datetime import datetime from typing import List from unittest.mock import Mock, patch, AsyncMock
# Import retry engine components
import sys
import os

# Put the directory three levels above this file at the front of sys.path so
# the absolute ``scripts.core.retry_engine`` import resolves regardless of the
# working directory pytest is started from. ``__file__`` is made absolute
# first so the computed directory does not depend on how the module was
# loaded.
sys.path.insert(
    0,
    os.path.dirname(
        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    ),
)
from scripts.core.retry_engine import ( # Enums BackoffStrategy, JitterType, RetryDecision, # Exceptions RetryError, MaxRetriesExceeded, RetryTimeout, # Data classes RetryConfig, RetryAttempt, RetryMetrics, # Core classes RetryPolicy, RetryEngine, RetryWithCircuitBreaker, # Decorator retry, # Convenience functions create_retry_engine, exponential_backoff, linear_backoff, constant_backoff, retry_async, retry_sync, # Predefined configs RETRY_CONFIGS, get_retry_config, )
# =============================================================================
# Test Fixtures
# =============================================================================
@pytest.fixture
def default_config():
    """Provide a RetryConfig constructed with no arguments (library defaults)."""
    config = RetryConfig()
    return config
@pytest.fixture
def exponential_config():
    """Provide an exponential-backoff RetryConfig with jitter disabled.

    Five attempts, 1.0 base delay, 60.0 delay cap and a 2.0 multiplier, so
    delay expectations in the tests are exact.
    """
    settings = dict(
        max_attempts=5,
        base_delay=1.0,
        max_delay=60.0,
        multiplier=2.0,
        backoff_strategy=BackoffStrategy.EXPONENTIAL,
        jitter=JitterType.NONE,
    )
    return RetryConfig(**settings)
@pytest.fixture
def linear_config():
    """Provide a linear-backoff RetryConfig with jitter disabled.

    Five attempts, 1.0 base delay, 30.0 delay cap and a 2.0 increment, so
    delay expectations in the tests are exact.
    """
    settings = dict(
        max_attempts=5,
        base_delay=1.0,
        max_delay=30.0,
        increment=2.0,
        backoff_strategy=BackoffStrategy.LINEAR,
        jitter=JitterType.NONE,
    )
    return RetryConfig(**settings)
@pytest.fixture
def retry_engine(default_config):
    """Provide a RetryEngine built from the ``default_config`` fixture."""
    engine = RetryEngine(default_config)
    return engine
# =============================================================================
# 1. Enum Tests
# =============================================================================
class TestBackoffStrategy:
    """Checks on the BackoffStrategy enum: member values and lookup by value."""

    def test_exponential_value(self):
        """EXPONENTIAL carries the value 'exponential'."""
        member = BackoffStrategy.EXPONENTIAL
        assert member.value == "exponential"

    def test_linear_value(self):
        """LINEAR carries the value 'linear'."""
        member = BackoffStrategy.LINEAR
        assert member.value == "linear"

    def test_fixed_value(self):
        """FIXED carries the value 'fixed'."""
        member = BackoffStrategy.FIXED
        assert member.value == "fixed"

    def test_fibonacci_value(self):
        """FIBONACCI carries the value 'fibonacci'."""
        member = BackoffStrategy.FIBONACCI
        assert member.value == "fibonacci"

    def test_decorrelated_value(self):
        """DECORRELATED carries the value 'decorrelated'."""
        member = BackoffStrategy.DECORRELATED
        assert member.value == "decorrelated"

    def test_from_string(self):
        """Constructing the enum from a value string yields the matching member."""
        pairs = (
            ("exponential", BackoffStrategy.EXPONENTIAL),
            ("linear", BackoffStrategy.LINEAR),
        )
        for text, member in pairs:
            assert BackoffStrategy(text) == member

    def test_all_strategies_exist(self):
        """Every expected strategy value is present among the enum members."""
        available = [member.value for member in BackoffStrategy]
        wanted = ("exponential", "linear", "fixed", "fibonacci", "decorrelated")
        for value in wanted:
            assert value in available
class TestJitterType:
    """Checks on the JitterType enum: member values and lookup by value."""

    def test_none_value(self):
        """NONE carries the value 'none'."""
        member = JitterType.NONE
        assert member.value == "none"

    def test_full_value(self):
        """FULL carries the value 'full'."""
        member = JitterType.FULL
        assert member.value == "full"

    def test_equal_value(self):
        """EQUAL carries the value 'equal'."""
        member = JitterType.EQUAL
        assert member.value == "equal"

    def test_decorrelated_value(self):
        """DECORRELATED carries the value 'decorrelated'."""
        member = JitterType.DECORRELATED
        assert member.value == "decorrelated"

    def test_from_string(self):
        """Constructing the enum from a value string yields the matching member."""
        pairs = (
            ("full", JitterType.FULL),
            ("none", JitterType.NONE),
        )
        for text, member in pairs:
            assert JitterType(text) == member
class TestRetryDecision:
    """Checks on the string values of the RetryDecision enum members."""

    def test_retry_value(self):
        """RETRY carries the value 'retry'."""
        member = RetryDecision.RETRY
        assert member.value == "retry"

    def test_stop_value(self):
        """STOP carries the value 'stop'."""
        member = RetryDecision.STOP
        assert member.value == "stop"

    def test_raise_value(self):
        """RAISE carries the value 'raise'."""
        member = RetryDecision.RAISE
        assert member.value == "raise"
# =============================================================================
# 2. Exception Tests
# =============================================================================
class TestRetryExceptions:
    """Checks on the retry exception types and the attributes they expose."""

    def test_retry_error_base(self):
        """RetryError is an Exception whose str() is the message passed in."""
        err = RetryError("test error")

        assert isinstance(err, Exception)
        assert str(err) == "test error"

    def test_max_retries_exceeded(self):
        """MaxRetriesExceeded keeps the attempt count and the last exception."""
        cause = ValueError("original error")
        err = MaxRetriesExceeded(
            "Max retries exceeded", attempts=5, last_exception=cause
        )

        assert isinstance(err, RetryError)
        assert "Max retries exceeded" in str(err)
        assert err.attempts == 5
        assert err.last_exception is cause

    def test_max_retries_exceeded_without_last_exception(self):
        """last_exception is None when it is not supplied."""
        err = MaxRetriesExceeded("Failed", attempts=3)

        assert err.last_exception is None
        assert err.attempts == 3

    def test_retry_timeout(self):
        """RetryTimeout keeps the elapsed time and the configured timeout."""
        err = RetryTimeout("Timeout exceeded", elapsed=45.5, timeout=30.0)

        assert isinstance(err, RetryError)
        assert "Timeout exceeded" in str(err)
        assert err.elapsed == 45.5
        assert err.timeout == 30.0
# =============================================================================
# 3. RetryConfig Tests
# =============================================================================
class TestRetryConfig:
    """Checks on RetryConfig: defaults, overrides, validation and to_dict()."""

    def test_default_values(self):
        """A RetryConfig built with no arguments exposes the expected defaults."""
        cfg = RetryConfig()
        expected = {
            "max_attempts": 3,
            "base_delay": 1.0,
            "max_delay": 60.0,
            "multiplier": 2.0,
            "increment": 1.0,
            "backoff_strategy": BackoffStrategy.EXPONENTIAL,
            "jitter": JitterType.FULL,
            "jitter_range": 1.0,
            "ignore_exceptions": set(),
        }
        for name, value in expected.items():
            assert getattr(cfg, name) == value
        # These two defaults are checked by identity, not equality.
        assert cfg.total_timeout is None
        assert cfg.retry_exceptions is None

    def test_custom_values(self):
        """Values passed to the constructor are stored unchanged."""
        overrides = dict(
            max_attempts=10,
            base_delay=0.5,
            max_delay=30.0,
            multiplier=1.5,
            backoff_strategy=BackoffStrategy.LINEAR,
            jitter=JitterType.EQUAL,
        )
        cfg = RetryConfig(**overrides)
        for name, value in overrides.items():
            assert getattr(cfg, name) == value

    def test_validation_max_attempts_zero(self):
        """max_attempts=0 is rejected with a ValueError."""
        expected_message = "max_attempts must be at least 1"
        with pytest.raises(ValueError, match=expected_message):
            RetryConfig(max_attempts=0)

    def test_validation_max_attempts_negative(self):
        """A negative max_attempts is rejected with a ValueError."""
        expected_message = "max_attempts must be at least 1"
        with pytest.raises(ValueError, match=expected_message):
            RetryConfig(max_attempts=-1)

    def test_validation_negative_base_delay(self):
        """A negative base_delay is rejected with a ValueError."""
        expected_message = "base_delay must be non-negative"
        with pytest.raises(ValueError, match=expected_message):
            RetryConfig(base_delay=-1.0)

    def test_validation_max_delay_less_than_base(self):
        """max_delay below base_delay is rejected with a ValueError."""
        expected_message = "max_delay must be >= base_delay"
        with pytest.raises(ValueError, match=expected_message):
            RetryConfig(base_delay=10.0, max_delay=5.0)

    def test_to_dict(self):
        """to_dict() reports plain values and leaves the on_retry callback out."""
        cfg = RetryConfig(
            max_attempts=5,
            base_delay=2.0,
            backoff_strategy=BackoffStrategy.LINEAR,
        )
        dumped = cfg.to_dict()

        assert dumped["max_attempts"] == 5
        assert dumped["base_delay"] == 2.0
        assert dumped["backoff_strategy"] == "linear"
        assert "on_retry" not in dumped  # Callbacks excluded

    def test_retry_exceptions_set(self):
        """Exception types given as retry_exceptions are retained."""
        cfg = RetryConfig(retry_exceptions={ConnectionError, TimeoutError})
        for exc_type in (ConnectionError, TimeoutError):
            assert exc_type in cfg.retry_exceptions

    def test_ignore_exceptions_set(self):
        """Exception types given as ignore_exceptions are retained."""
        cfg = RetryConfig(ignore_exceptions={ValueError, KeyError})
        for exc_type in (ValueError, KeyError):
            assert exc_type in cfg.ignore_exceptions

    def test_total_timeout(self):
        """total_timeout is stored as given."""
        cfg = RetryConfig(total_timeout=30.0)
        assert cfg.total_timeout == 30.0

    def test_callbacks(self):
        """The three callback hooks are stored by identity."""
        hooks = {
            "on_retry": Mock(),
            "on_success": Mock(),
            "on_failure": Mock(),
        }
        cfg = RetryConfig(**hooks)
        for name, hook in hooks.items():
            assert getattr(cfg, name) is hook
# =============================================================================
# 4. RetryPolicy Tests
# =============================================================================
class TestRetryPolicy:
    """Checks on RetryPolicy: delay calculation, jitter bounds and should_retry."""

    def test_exponential_delay_calculation(self, exponential_config):
        """Exponential backoff: expected delay is base * multiplier ** attempt."""
        policy = RetryPolicy(exponential_config)
        # base=1, multiplier=2 -> 1, 2, 4, 8 for attempts 0..3
        for attempt, expected in enumerate((1.0, 2.0, 4.0, 8.0)):
            assert policy.calculate_delay(attempt) == expected

    def test_linear_delay_calculation(self, linear_config):
        """Linear backoff: expected delay is base + increment * attempt."""
        policy = RetryPolicy(linear_config)
        # base=1, increment=2 -> 1, 3, 5, 7 for attempts 0..3
        for attempt, expected in enumerate((1.0, 3.0, 5.0, 7.0)):
            assert policy.calculate_delay(attempt) == expected

    def test_fixed_delay_calculation(self):
        """Fixed backoff: the delay equals base_delay whatever the attempt."""
        cfg = RetryConfig(
            base_delay=5.0,
            backoff_strategy=BackoffStrategy.FIXED,
            jitter=JitterType.NONE,
        )
        policy = RetryPolicy(cfg)
        for attempt in (0, 1, 5):
            assert policy.calculate_delay(attempt) == 5.0

    def test_fibonacci_delay_calculation(self):
        """Fibonacci backoff: expected delay is base * fib(attempt + 1)."""
        cfg = RetryConfig(
            base_delay=1.0,
            max_delay=100.0,
            backoff_strategy=BackoffStrategy.FIBONACCI,
            jitter=JitterType.NONE,
        )
        policy = RetryPolicy(cfg)
        # fib sequence starting at fib(1): 1, 1, 2, 3, 5, 8, ...
        fib_prefix = (1.0, 1.0, 2.0, 3.0, 5.0, 8.0)
        for attempt, expected in enumerate(fib_prefix):
            assert policy.calculate_delay(attempt) == expected

    def test_decorrelated_delay_calculation(self):
        """Decorrelated backoff: every delay lies within [base_delay, max_delay]."""
        cfg = RetryConfig(
            base_delay=1.0,
            max_delay=30.0,
            backoff_strategy=BackoffStrategy.DECORRELATED,
            jitter=JitterType.NONE,
        )
        policy = RetryPolicy(cfg)

        # All five delays are computed up front, in attempt order.
        samples = [policy.calculate_delay(attempt) for attempt in range(5)]

        for sample in samples:
            assert cfg.base_delay <= sample <= cfg.max_delay

    def test_max_delay_cap(self):
        """A delay that would exceed max_delay is clamped to max_delay."""
        cfg = RetryConfig(
            base_delay=1.0,
            max_delay=10.0,
            multiplier=10.0,  # Aggressive multiplier
            backoff_strategy=BackoffStrategy.EXPONENTIAL,
            jitter=JitterType.NONE,
        )
        policy = RetryPolicy(cfg)

        capped = policy.calculate_delay(5)  # Would be 100000 uncapped
        assert capped == 10.0

    def test_full_jitter(self):
        """Full jitter: samples stay within [0, delay] and are not all equal."""
        cfg = RetryConfig(
            base_delay=10.0,
            backoff_strategy=BackoffStrategy.FIXED,
            jitter=JitterType.FULL,
        )
        policy = RetryPolicy(cfg)

        samples = [policy.calculate_delay(0) for _ in range(100)]
        lowest, highest = min(samples), max(samples)

        assert lowest >= 0
        assert highest <= 10.0
        # Should have variation
        assert lowest < highest

    def test_equal_jitter(self):
        """Equal jitter: samples stay within the upper half, [delay/2, delay]."""
        cfg = RetryConfig(
            base_delay=10.0,
            backoff_strategy=BackoffStrategy.FIXED,
            jitter=JitterType.EQUAL,
        )
        policy = RetryPolicy(cfg)

        samples = [policy.calculate_delay(0) for _ in range(100)]

        # delay=10 -> expected range is [5, 10]
        assert min(samples) >= 5.0
        assert max(samples) <= 10.0

    def test_decorrelated_jitter(self):
        """Decorrelated jitter: samples never drop below base_delay and vary."""
        cfg = RetryConfig(
            base_delay=1.0,
            max_delay=30.0,
            backoff_strategy=BackoffStrategy.FIXED,
            jitter=JitterType.DECORRELATED,
        )
        policy = RetryPolicy(cfg)

        samples = [policy.calculate_delay(0) for _ in range(100)]
        lowest, highest = min(samples), max(samples)

        assert lowest >= 1.0
        # Should have wide variation
        assert highest > lowest

    def test_no_jitter(self):
        """With jitter disabled, repeated calls return exactly base_delay."""
        cfg = RetryConfig(
            base_delay=5.0,
            backoff_strategy=BackoffStrategy.FIXED,
            jitter=JitterType.NONE,
        )
        policy = RetryPolicy(cfg)

        samples = [policy.calculate_delay(0) for _ in range(10)]

        for sample in samples:
            assert sample == 5.0

    def test_should_retry_within_attempts(self):
        """should_retry answers RETRY while attempts remain."""
        policy = RetryPolicy(RetryConfig(max_attempts=5))
        for attempt in (0, 3):
            verdict = policy.should_retry(attempt, Exception("test"), 0.0)
            assert verdict == RetryDecision.RETRY

    def test_should_retry_at_max_attempts(self):
        """should_retry answers STOP on the final (0-indexed) attempt."""
        policy = RetryPolicy(RetryConfig(max_attempts=3))
        # Attempt 2 is the last (0-indexed)
        verdict = policy.should_retry(2, Exception("test"), 0.0)
        assert verdict == RetryDecision.STOP

    def test_should_retry_timeout_exceeded(self):
        """should_retry answers STOP once elapsed time exceeds total_timeout."""
        policy = RetryPolicy(RetryConfig(max_attempts=10, total_timeout=30.0))
        verdict = policy.should_retry(0, Exception("test"), 35.0)
        assert verdict == RetryDecision.STOP

    def test_should_retry_ignore_exception(self):
        """should_retry answers RAISE for an exception type in ignore_exceptions."""
        cfg = RetryConfig(max_attempts=5, ignore_exceptions={ValueError})
        policy = RetryPolicy(cfg)
        verdict = policy.should_retry(0, ValueError("bad value"), 0.0)
        assert verdict == RetryDecision.RAISE

    def test_should_retry_only_specified_exceptions(self):
        """With retry_exceptions set, only the listed types are retried."""
        cfg = RetryConfig(max_attempts=5, retry_exceptions={ConnectionError})
        policy = RetryPolicy(cfg)

        # Listed type -> RETRY
        listed = policy.should_retry(0, ConnectionError(), 0.0)
        assert listed == RetryDecision.RETRY

        # Unlisted type -> RAISE
        unlisted = policy.should_retry(0, ValueError(), 0.0)
        assert unlisted == RetryDecision.RAISE

    def test_reset_decorrelated(self):
        """reset() puts the policy's _last_delay back to base_delay."""
        cfg = RetryConfig(
            base_delay=1.0,
            backoff_strategy=BackoffStrategy.DECORRELATED,
        )
        policy = RetryPolicy(cfg)

        # Advance the policy through two delay calculations first.
        for attempt in (0, 1):
            policy.calculate_delay(attempt)

        policy.reset()

        assert policy._last_delay == cfg.base_delay
# =============================================================================
# 5. RetryEngine Tests
# =============================================================================
class TestRetryEngineSync:
    """Checks on RetryEngine.execute_sync: results, retries and callbacks."""

    def test_execute_sync_success_first_try(self):
        """A call that succeeds immediately counts one success and no retries."""
        engine = RetryEngine(
            RetryConfig(max_attempts=3, base_delay=0.01, jitter=JitterType.NONE)
        )

        outcome = engine.execute_sync(lambda: "success")
        snapshot = engine.get_metrics()

        assert outcome == "success"
        assert snapshot.successful_attempts == 1
        assert snapshot.total_retries == 0

    def test_execute_sync_success_after_retries(self):
        """Two failures followed by a success yield the result and two retries."""
        engine = RetryEngine(
            RetryConfig(max_attempts=5, base_delay=0.01, jitter=JitterType.NONE)
        )
        invocations = []

        def flaky_func():
            invocations.append(None)
            if len(invocations) < 3:
                raise ConnectionError("failed")
            return "success"

        outcome = engine.execute_sync(flaky_func)

        assert outcome == "success"
        assert len(invocations) == 3

        snapshot = engine.get_metrics()
        assert snapshot.successful_attempts == 1
        assert snapshot.total_retries == 2

    def test_execute_sync_max_retries_exceeded(self):
        """A call that always fails ends in MaxRetriesExceeded with the cause."""
        engine = RetryEngine(
            RetryConfig(max_attempts=3, base_delay=0.01, jitter=JitterType.NONE)
        )

        def always_fails():
            raise ConnectionError("always fails")

        with pytest.raises(MaxRetriesExceeded) as caught:
            engine.execute_sync(always_fails)

        raised = caught.value
        assert raised.attempts == 3
        assert isinstance(raised.last_exception, ConnectionError)

    def test_execute_sync_with_args(self):
        """Positional arguments are forwarded to the wrapped function."""
        engine = RetryEngine(RetryConfig(max_attempts=3))

        def add(a, b):
            return a + b

        assert engine.execute_sync(add, 2, 3) == 5

    def test_execute_sync_with_kwargs(self):
        """Keyword arguments are forwarded to the wrapped function."""
        engine = RetryEngine(RetryConfig(max_attempts=3))

        def greet(name, greeting="Hello"):
            return f"{greeting}, {name}!"

        assert engine.execute_sync(greet, "World", greeting="Hi") == "Hi, World!"

    def test_execute_sync_immediate_raise(self):
        """An exception type in ignore_exceptions propagates without a retry."""
        engine = RetryEngine(
            RetryConfig(max_attempts=5, ignore_exceptions={ValueError})
        )
        invocations = []

        def raises_value_error():
            invocations.append(None)
            raise ValueError("bad value")

        with pytest.raises(ValueError):
            engine.execute_sync(raises_value_error)

        # Exactly one call: the error was raised straight away.
        assert len(invocations) == 1

    def test_execute_sync_on_retry_callback(self):
        """on_retry fires once per retry, with attempt numbers 1 then 2."""
        seen = []

        def on_retry(attempt, delay, exception):
            seen.append((attempt, delay, type(exception).__name__))

        engine = RetryEngine(
            RetryConfig(
                max_attempts=3,
                base_delay=0.01,
                jitter=JitterType.NONE,
                on_retry=on_retry,
            )
        )
        invocations = []

        def flaky():
            invocations.append(None)
            if len(invocations) < 3:
                raise Exception("fail")
            return "ok"

        engine.execute_sync(flaky)

        # Two callbacks, carrying the first and second retry attempt numbers.
        assert [record[0] for record in seen] == [1, 2]

    def test_execute_sync_on_success_callback(self):
        """on_success fires once with the attempt number and the result."""
        seen = []

        def on_success(attempt, result):
            seen.append((attempt, result))

        engine = RetryEngine(RetryConfig(max_attempts=3, on_success=on_success))

        engine.execute_sync(lambda: "result")

        assert seen == [(1, "result")]

    def test_execute_sync_on_failure_callback(self):
        """on_failure fires once with the total attempts and the last error."""
        seen = []

        def on_failure(attempt, exception):
            seen.append((attempt, type(exception).__name__))

        engine = RetryEngine(
            RetryConfig(
                max_attempts=3,
                base_delay=0.01,
                jitter=JitterType.NONE,
                on_failure=on_failure,
            )
        )

        with pytest.raises(MaxRetriesExceeded):
            engine.execute_sync(lambda: 1/0)

        # One callback: (total attempts, exception type name).
        assert seen == [(3, "ZeroDivisionError")]
class TestRetryEngineAsync:
    """Tests for RetryEngine asynchronous execution via ``engine.execute``."""

    @pytest.mark.asyncio
    async def test_execute_async_success_first_try(self):
        """Test async execution succeeds on first try."""
        config = RetryConfig(max_attempts=3, base_delay=0.01, jitter=JitterType.NONE)
        engine = RetryEngine(config)

        async def async_func():
            return "async success"

        result = await engine.execute(async_func)

        assert result == "async success"
        metrics = engine.get_metrics()
        assert metrics.successful_attempts == 1

    @pytest.mark.asyncio
    async def test_execute_async_success_after_retries(self):
        """Test async execution succeeds after retries."""
        config = RetryConfig(max_attempts=5, base_delay=0.01, jitter=JitterType.NONE)
        engine = RetryEngine(config)
        call_count = 0

        async def flaky_async():
            # Fails on the first two calls, succeeds on the third.
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                raise ConnectionError("failed")
            return "success"

        result = await engine.execute(flaky_async)

        assert result == "success"
        assert call_count == 3

    @pytest.mark.asyncio
    async def test_execute_async_max_retries_exceeded(self):
        """Test async execution raises MaxRetriesExceeded."""
        config = RetryConfig(max_attempts=3, base_delay=0.01, jitter=JitterType.NONE)
        engine = RetryEngine(config)

        async def always_fails():
            raise TimeoutError("timeout")

        with pytest.raises(MaxRetriesExceeded) as exc_info:
            await engine.execute(always_fails)

        assert exc_info.value.attempts == 3

    @pytest.mark.asyncio
    async def test_execute_sync_function_in_async_engine(self):
        """Test sync function can be executed in async engine."""
        config = RetryConfig(max_attempts=3)
        engine = RetryEngine(config)

        def sync_func():
            return "sync in async"

        result = await engine.execute(sync_func)

        assert result == "sync in async"

    @pytest.mark.asyncio
    async def test_execute_async_with_args(self):
        """Test async execution with arguments."""
        config = RetryConfig(max_attempts=3)
        engine = RetryEngine(config)

        async def async_add(a, b):
            await asyncio.sleep(0.001)
            return a + b

        result = await engine.execute(async_add, 5, 7)

        assert result == 12

    @pytest.mark.asyncio
    async def test_execute_async_retry_delay(self):
        """Test async execution actually delays between retries.

        The interval is measured with ``time.monotonic()`` rather than
        ``time.time()``: the wall clock can be adjusted while the test runs,
        which would distort the measured interval and make the lower-bound
        assertion flaky.
        """
        config = RetryConfig(
            max_attempts=3,
            base_delay=0.1,
            backoff_strategy=BackoffStrategy.FIXED,
            jitter=JitterType.NONE,
        )
        engine = RetryEngine(config)
        call_count = 0
        call_times = []

        async def timed_func():
            # Records when each call happens; fails on the first two calls.
            nonlocal call_count
            call_count += 1
            call_times.append(time.monotonic())
            if call_count < 3:
                raise Exception("fail")
            return "ok"

        start = time.monotonic()
        await engine.execute(timed_func)
        elapsed = time.monotonic() - start

        # Should have waited ~0.2s (2 retries * 0.1s); 0.15 leaves slack for
        # timer granularity.
        assert elapsed >= 0.15
        assert len(call_times) == 3
class TestRetryEngineMetrics:
    """Checks on the metrics RetryEngine exposes through get_metrics()."""

    def test_metrics_initial_state(self):
        """A fresh engine reports zero for every counter and rate."""
        engine = RetryEngine()
        snapshot = engine.get_metrics()

        for counter in (
            "total_attempts",
            "successful_attempts",
            "failed_attempts",
            "total_retries",
        ):
            assert getattr(snapshot, counter) == 0
        assert snapshot.total_delay_seconds == 0.0
        assert snapshot.success_rate == 0.0

    def test_metrics_after_success(self):
        """Two successful executions give two attempts and a 1.0 success rate."""
        engine = RetryEngine(RetryConfig(max_attempts=3, base_delay=0.01))

        for _ in range(2):
            engine.execute_sync(lambda: "ok")

        snapshot = engine.get_metrics()
        assert snapshot.total_attempts == 2
        assert snapshot.successful_attempts == 2
        assert snapshot.failed_attempts == 0
        assert snapshot.success_rate == 1.0

    def test_metrics_after_failure(self):
        """One exhausted execution gives one failed attempt and a 0.0 rate."""
        engine = RetryEngine(
            RetryConfig(max_attempts=2, base_delay=0.01, jitter=JitterType.NONE)
        )

        with pytest.raises(MaxRetriesExceeded):
            engine.execute_sync(lambda: 1/0)

        snapshot = engine.get_metrics()
        assert snapshot.total_attempts == 1
        assert snapshot.successful_attempts == 0
        assert snapshot.failed_attempts == 1
        assert snapshot.success_rate == 0.0

    def test_metrics_exception_tracking(self):
        """Raised exceptions are counted per exception type name."""
        engine = RetryEngine(
            RetryConfig(max_attempts=2, base_delay=0.01, jitter=JitterType.NONE)
        )

        with pytest.raises(MaxRetriesExceeded):
            engine.execute_sync(lambda: 1/0)

        by_exception = engine.get_metrics().attempts_by_exception
        assert "ZeroDivisionError" in by_exception
        assert by_exception["ZeroDivisionError"] == 2

    def test_metrics_reset(self):
        """reset_metrics() returns the attempt counters to zero."""
        engine = RetryEngine(RetryConfig(max_attempts=3, base_delay=0.01))
        engine.execute_sync(lambda: "ok")

        engine.reset_metrics()

        snapshot = engine.get_metrics()
        assert snapshot.total_attempts == 0
        assert snapshot.successful_attempts == 0

    def test_metrics_avg_attempts(self):
        """One failure then one success gives avg_attempts_per_success of 2.0."""
        engine = RetryEngine(
            RetryConfig(max_attempts=5, base_delay=0.01, jitter=JitterType.NONE)
        )
        invocations = []

        def flaky():
            # Odd-numbered calls fail, even-numbered calls succeed.
            invocations.append(None)
            if len(invocations) % 2 == 0:
                return "ok"
            raise Exception("fail")

        # Fails once, then succeeds on the 2nd attempt.
        engine.execute_sync(flaky)

        snapshot = engine.get_metrics()
        # 1 retry + 1 success = 2 attempts for 1 success = avg 2.0
        assert snapshot.avg_attempts_per_success == 2.0
# =============================================================================
# 6. Decorator Tests
# =============================================================================
class TestRetryDecorator:
    """Tests for the @retry decorator on sync and async functions."""

    def test_decorator_sync_success(self):
        """Test decorator on sync function success."""
        @retry(max_attempts=3)
        def sync_func():
            return "decorated"

        result = sync_func()

        assert result == "decorated"

    def test_decorator_sync_with_retries(self):
        """Test decorator on sync function with retries."""
        call_count = 0

        @retry(max_attempts=5, base_delay=0.01, jitter="none")
        def flaky_func():
            # Fails on the first two calls, succeeds on the third.
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                raise Exception("fail")
            return "success"

        result = flaky_func()

        assert result == "success"
        assert call_count == 3

    @pytest.mark.asyncio
    async def test_decorator_async_success(self):
        """Test decorator on async function success."""
        @retry(max_attempts=3)
        async def async_func():
            return "async decorated"

        result = await async_func()

        assert result == "async decorated"

    @pytest.mark.asyncio
    async def test_decorator_async_with_retries(self):
        """Test decorator on async function with retries."""
        call_count = 0

        @retry(max_attempts=5, base_delay=0.01, jitter="none")
        async def flaky_async():
            # Fails on the first two calls, succeeds on the third.
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                raise Exception("fail")
            return "success"

        result = await flaky_async()

        assert result == "success"
        assert call_count == 3

    def test_decorator_backoff_string(self):
        """Test decorator accepts backoff as string."""
        @retry(max_attempts=3, backoff="linear")
        def func():
            return "ok"

        result = func()

        assert result == "ok"

    def test_decorator_jitter_string(self):
        """Test decorator accepts jitter as string."""
        @retry(max_attempts=3, jitter="full")
        def func():
            return "ok"

        result = func()

        assert result == "ok"

    def test_decorator_retry_exceptions(self):
        """Test decorator with retry_exceptions.

        The first call raises ConnectionError (listed, so it is retried) and
        the second raises ValueError (unlisted, so it must propagate). A tiny
        jitter-free delay is passed, as in the other retrying tests of this
        class, so the single retry does not sleep for the decorator's default
        delay (presumably the 1.0s RetryConfig default - confirm).
        """
        call_count = 0

        @retry(
            max_attempts=3,
            base_delay=0.01,
            jitter="none",
            retry_exceptions={ConnectionError},
        )
        def func():
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise ConnectionError()
            if call_count == 2:
                raise ValueError()  # Should not retry this
            return "ok"

        # Should raise ValueError immediately (not in retry_exceptions)
        with pytest.raises(ValueError):
            func()

        assert call_count == 2

    def test_decorator_ignore_exceptions(self):
        """Test decorator with ignore_exceptions."""
        call_count = 0

        @retry(max_attempts=5, ignore_exceptions={ValueError})
        def func():
            nonlocal call_count
            call_count += 1
            raise ValueError("ignore me")

        with pytest.raises(ValueError):
            func()

        assert call_count == 1  # No retries

    def test_decorator_preserves_function_name(self):
        """Test decorator preserves function metadata."""
        @retry(max_attempts=3)
        def my_function():
            """My docstring."""
            return "ok"

        assert my_function.__name__ == "my_function"
        assert "My docstring" in my_function.__doc__

    def test_decorator_with_on_retry(self):
        """Test decorator with on_retry callback."""
        retry_calls = []

        @retry(
            max_attempts=3,
            base_delay=0.01,
            jitter="none",
            on_retry=lambda a, d, e: retry_calls.append(a),
        )
        def flaky():
            # Keeps failing until the callback has been recorded twice.
            if len(retry_calls) < 2:
                raise Exception("fail")
            return "ok"

        flaky()

        assert len(retry_calls) == 2

    def test_decorator_max_retries_exceeded(self):
        """Test decorator raises MaxRetriesExceeded."""
        @retry(max_attempts=2, base_delay=0.01, jitter="none")
        def always_fails():
            raise Exception("always fails")

        with pytest.raises(MaxRetriesExceeded):
            always_fails()
# =============================================================================
# 7. Integration Tests (RetryWithCircuitBreaker)
# =============================================================================
class TestRetryWithCircuitBreaker:
    """Checks on RetryWithCircuitBreaker with and without a breaker attached."""

    def test_init_without_circuit_breaker(self):
        """circuit_breaker is None when none is supplied."""
        wrapper = RetryWithCircuitBreaker(retry_config=RetryConfig(max_attempts=3))
        assert wrapper.circuit_breaker is None

    def test_execute_sync_without_circuit_breaker(self):
        """execute_sync returns the function's result with no breaker."""
        wrapper = RetryWithCircuitBreaker(
            retry_config=RetryConfig(max_attempts=3, base_delay=0.01)
        )
        assert wrapper.execute_sync(lambda: "success") == "success"

    @pytest.mark.asyncio
    async def test_execute_async_without_circuit_breaker(self):
        """execute returns the coroutine's result with no breaker."""
        wrapper = RetryWithCircuitBreaker(
            retry_config=RetryConfig(max_attempts=3, base_delay=0.01)
        )

        async def async_func():
            return "async success"

        assert await wrapper.execute(async_func) == "async success"

    def test_execute_sync_with_mock_circuit_breaker(self):
        """execute_sync goes through the breaker's call_sync exactly once."""
        # call_sync simply invokes whatever callable it is handed.
        passthrough = Mock(side_effect=lambda f: f())
        mock_breaker = Mock()
        mock_breaker.call_sync = passthrough

        wrapper = RetryWithCircuitBreaker(
            retry_config=RetryConfig(max_attempts=3, base_delay=0.01),
            circuit_breaker=mock_breaker,
        )

        outcome = wrapper.execute_sync(lambda: "protected")

        assert outcome == "protected"
        passthrough.assert_called_once()

    @pytest.mark.asyncio
    async def test_execute_async_with_mock_circuit_breaker(self):
        """execute returns the result when the breaker's call awaits the function."""
        async def mock_call(f):
            return await f()

        mock_breaker = Mock()
        mock_breaker.call = mock_call

        wrapper = RetryWithCircuitBreaker(
            retry_config=RetryConfig(max_attempts=3, base_delay=0.01),
            circuit_breaker=mock_breaker,
        )

        async def async_func():
            return "async protected"

        assert await wrapper.execute(async_func) == "async protected"

    def test_execute_sync_fallback_without_call_sync(self):
        """execute_sync still returns the result if the breaker has no call_sync."""
        mock_breaker = Mock(spec=[])  # No methods

        wrapper = RetryWithCircuitBreaker(
            retry_config=RetryConfig(max_attempts=3, base_delay=0.01),
            circuit_breaker=mock_breaker,
        )

        assert wrapper.execute_sync(lambda: "fallback") == "fallback"
# =============================================================================
# 8. Convenience Function Tests
# =============================================================================
class TestConvenienceFunctions:
    """Checks on the engine factory helpers and the one-shot retry helpers."""

    def test_create_retry_engine(self):
        """create_retry_engine stores the supplied settings on engine.config."""
        cfg = create_retry_engine(
            max_attempts=5,
            base_delay=0.5,
            backoff=BackoffStrategy.LINEAR,
        ).config

        assert cfg.max_attempts == 5
        assert cfg.base_delay == 0.5
        assert cfg.backoff_strategy == BackoffStrategy.LINEAR

    def test_exponential_backoff_factory(self):
        """exponential_backoff builds an EXPONENTIAL engine with the given values."""
        cfg = exponential_backoff(
            max_attempts=5,
            base_delay=0.5,
            max_delay=30.0,
            multiplier=3.0,
        ).config

        assert cfg.max_attempts == 5
        assert cfg.base_delay == 0.5
        assert cfg.max_delay == 30.0
        assert cfg.multiplier == 3.0
        assert cfg.backoff_strategy == BackoffStrategy.EXPONENTIAL

    def test_linear_backoff_factory(self):
        """linear_backoff builds a LINEAR engine with the given values."""
        cfg = linear_backoff(
            max_attempts=3,
            base_delay=1.0,
            max_delay=10.0,
            increment=3.0,
        ).config

        assert cfg.max_attempts == 3
        assert cfg.base_delay == 1.0
        assert cfg.increment == 3.0
        assert cfg.backoff_strategy == BackoffStrategy.LINEAR

    def test_constant_backoff_factory(self):
        """constant_backoff builds a FIXED, jitter-free engine."""
        cfg = constant_backoff(max_attempts=5, delay=2.0).config

        assert cfg.max_attempts == 5
        assert cfg.base_delay == 2.0
        assert cfg.backoff_strategy == BackoffStrategy.FIXED
        assert cfg.jitter == JitterType.NONE

    @pytest.mark.asyncio
    async def test_retry_async_convenience(self):
        """retry_async returns the awaited function's result."""
        async def async_func():
            return "async result"

        outcome = await retry_async(async_func, max_attempts=3, base_delay=0.01)

        assert outcome == "async result"

    def test_retry_sync_convenience(self):
        """retry_sync returns the function's result."""
        outcome = retry_sync(lambda: "sync result", max_attempts=3, base_delay=0.01)

        assert outcome == "sync result"
# =============================================================================
# 9. Predefined Config Tests
# =============================================================================
class TestPredefinedConfigs:
    """Tests for predefined retry configurations.

    Pins selected fields of the named entries in ``RETRY_CONFIGS`` and checks
    the ``get_retry_config`` lookup helper.
    """

    def test_aggressive_config(self):
        """Test aggressive configuration."""
        config = RETRY_CONFIGS["aggressive"]

        assert config.max_attempts == 10
        assert config.base_delay == 0.1
        assert config.multiplier == 1.5
        assert config.jitter == JitterType.FULL

    def test_standard_config(self):
        """Test standard configuration."""
        config = RETRY_CONFIGS["standard"]

        assert config.max_attempts == 5
        assert config.base_delay == 1.0
        assert config.multiplier == 2.0
        assert config.jitter == JitterType.EQUAL

    def test_conservative_config(self):
        """Test conservative configuration."""
        config = RETRY_CONFIGS["conservative"]

        assert config.max_attempts == 3
        assert config.base_delay == 2.0
        assert config.multiplier == 3.0

    def test_quick_config(self):
        """Test quick configuration."""
        config = RETRY_CONFIGS["quick"]

        assert config.max_attempts == 3
        assert config.base_delay == 0.1
        assert config.max_delay == 1.0

    def test_database_config(self):
        """Test database configuration."""
        config = RETRY_CONFIGS["database"]

        assert config.max_attempts == 5
        assert config.retry_exceptions == {ConnectionError, TimeoutError}

    def test_api_config(self):
        """Test API configuration."""
        config = RETRY_CONFIGS["api"]

        assert config.max_attempts == 3
        assert config.total_timeout == 30.0

    def test_get_retry_config(self):
        """Test get_retry_config function."""
        config = get_retry_config("standard")
        assert config == RETRY_CONFIGS["standard"]

    def test_get_retry_config_unknown(self):
        """Test get_retry_config raises for unknown config."""
        with pytest.raises(ValueError, match="Unknown config"):
            get_retry_config("nonexistent")

    def test_all_predefined_configs_valid(self):
        """Test all predefined configs are valid."""
        for name, config in RETRY_CONFIGS.items():
            # The config name goes into each message so a failure identifies
            # which predefined entry violated the invariant.
            assert config.max_attempts >= 1, f"{name}: max_attempts must be >= 1"
            assert config.base_delay >= 0, f"{name}: base_delay must be >= 0"
            assert config.max_delay >= config.base_delay, (
                f"{name}: max_delay must be >= base_delay"
            )
# =============================================================================
# 10. Edge Case Tests
# =============================================================================
class TestEdgeCases:
    """Tests for edge cases and boundary conditions."""

    def test_single_attempt(self):
        """Test with max_attempts=1."""
        config = RetryConfig(max_attempts=1)
        engine = RetryEngine(config)

        with pytest.raises(MaxRetriesExceeded):
            engine.execute_sync(lambda: 1 / 0)

    def test_zero_base_delay(self):
        """Test with zero base delay."""
        config = RetryConfig(base_delay=0.0, max_delay=0.0, jitter=JitterType.NONE)
        engine = RetryEngine(config)

        call_count = 0

        def flaky():
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                raise Exception("fail")
            return "ok"

        result = engine.execute_sync(flaky)
        assert result == "ok"
        # flaky() only returns on its third invocation, so a successful result
        # must coincide with exactly two failed attempts before it.
        assert call_count == 3

    def test_very_large_delay_capped(self):
        """Test very large delays are capped at max_delay."""
        config = RetryConfig(
            max_attempts=10,
            base_delay=1.0,
            max_delay=5.0,
            multiplier=100.0,  # Aggressive multiplier will exceed max
            backoff_strategy=BackoffStrategy.EXPONENTIAL,
            jitter=JitterType.NONE,
        )
        policy = RetryPolicy(config)

        # After a few attempts, delay should be capped at max_delay
        # Attempt 2: 1 * 100^2 = 10000, but capped at 5.0
        delay = policy.calculate_delay(2)
        assert delay == 5.0  # Capped at max_delay

    def test_callback_exception_ignored(self):
        """Test callback exceptions don't break execution."""

        def bad_callback(attempt, result):
            raise Exception("callback error")

        config = RetryConfig(on_success=bad_callback)
        engine = RetryEngine(config)

        # Should still succeed despite callback error
        result = engine.execute_sync(lambda: "ok")
        assert result == "ok"

    def test_exception_message_preserved(self):
        """Test a failing single-attempt call surfaces as MaxRetriesExceeded.

        TODO(review): the test name promises a check that "original message"
        survives, but how MaxRetriesExceeded exposes the underlying error is
        not visible from this module. Add that assertion once the contract is
        confirmed.
        """
        config = RetryConfig(max_attempts=1)
        engine = RetryEngine(config)

        def fail_with_message():
            raise ValueError("original message")

        with pytest.raises(MaxRetriesExceeded):
            engine.execute_sync(fail_with_message)

    def test_none_config_uses_defaults(self):
        """Test None config uses defaults."""
        engine = RetryEngine(None)

        assert engine.config.max_attempts == 3
        assert engine.config.base_delay == 1.0

    def test_fibonacci_large_numbers(self):
        """Test fibonacci doesn't overflow for large attempts."""
        config = RetryConfig(
            max_attempts=20,
            base_delay=0.001,
            max_delay=1000.0,
            backoff_strategy=BackoffStrategy.FIBONACCI,
            jitter=JitterType.NONE,
        )
        policy = RetryPolicy(config)

        # Should not raise overflow error
        delay = policy.calculate_delay(15)
        assert delay > 0

    def test_decorrelated_randomness(self):
        """Test decorrelated backoff produces varied results."""
        config = RetryConfig(
            base_delay=1.0,
            max_delay=100.0,
            backoff_strategy=BackoffStrategy.DECORRELATED,
        )

        # Run multiple times and check for variation
        all_delays = []
        for _ in range(10):
            policy = RetryPolicy(config)
            all_delays.extend(policy.calculate_delay(i) for i in range(5))

        # Should have significant variation
        assert max(all_delays) > min(all_delays) * 1.5

    @pytest.mark.asyncio
    async def test_concurrent_executions(self):
        """Test multiple concurrent async executions."""
        config = RetryConfig(max_attempts=3, base_delay=0.01, jitter=JitterType.NONE)
        engine = RetryEngine(config)

        async def delayed_success(value):
            await asyncio.sleep(0.01)
            return value

        # Run multiple concurrent executions
        results = await asyncio.gather(
            engine.execute(delayed_success, 1),
            engine.execute(delayed_success, 2),
            engine.execute(delayed_success, 3),
        )

        assert results == [1, 2, 3]

    def test_retry_timeout_with_fast_failures(self):
        """Test total timeout triggers even with fast failures."""
        config = RetryConfig(
            max_attempts=100,
            base_delay=0.01,
            total_timeout=0.1,  # Very short timeout
            jitter=JitterType.NONE,
        )
        engine = RetryEngine(config)

        # Either terminal error is accepted by this test.
        with pytest.raises((RetryTimeout, MaxRetriesExceeded)):
            engine.execute_sync(lambda: 1 / 0)

    def test_empty_retry_exceptions_set(self):
        """Test empty retry_exceptions retries nothing."""
        config = RetryConfig(
            max_attempts=5,
            retry_exceptions=set(),  # Empty set
        )
        policy = RetryPolicy(config)

        # With empty retry_exceptions, nothing should be retried
        decision = policy.should_retry(0, Exception("test"), 0.0)
        assert decision == RetryDecision.RAISE
# =============================================================================
# Run tests
# =============================================================================
# Allow running this test module directly (``python test_retry_engine.py``).
if __name__ == "__main__":
    # pytest.main returns the session's exit code; raise it so the process
    # exit status reflects test failures instead of always being 0.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))