#!/usr/bin/env python3
"""
Unit tests for CODITECT Circuit Breaker

Tests cover:
- CircuitBreaker state machine (CLOSED → OPEN → HALF_OPEN → CLOSED)
- Failure counting and threshold transitions
- Recovery timeout behavior
- CircuitBreakerRegistry multi-breaker management
- AgentCircuitBreaker with fallback routing
- Decorator functionality
- Metrics collection
- Health check integration
- Edge cases and error handling

Part of Track H.2.4: Circuit Breaker Pattern Implementation
Author: CODITECT Framework
Created: January 8, 2026
"""
import asyncio
import pytest
import time
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch

# Import circuit breaker components
import sys
import os

# Put the grandparent of this file's directory on sys.path *before* the
# project import below, so that the `scripts` package can be resolved.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

from scripts.core.circuit_breaker import (
    CircuitBreaker,
    CircuitBreakerConfig,
    CircuitBreakerRegistry,
    AgentCircuitBreaker,
    CircuitBreakerHealthCheck,
    CircuitState,
    CircuitOpenError,
    CircuitBreakerConfigError,
    CircuitMetrics,
    FailureType,
    FailureRecord,
    circuit_breaker,
    create_circuit_breaker,
    get_global_registry,
    with_circuit_breaker,
)
# =============================================================================
# Fixtures
# =============================================================================
@pytest.fixture
def config():
    """Provide a CircuitBreakerConfig with low thresholds and short timeouts."""
    settings = {
        "fail_max": 3,
        "fail_window_seconds": 10.0,
        "recovery_timeout": 1.0,  # Fast for testing
        "half_open_max_calls": 2,
        "success_threshold": 2,
        "call_timeout": 5.0,
    }
    return CircuitBreakerConfig(**settings)
@pytest.fixture
def breaker(config):
    """Provide a CircuitBreaker named "test-breaker" built from the ``config`` fixture."""
    test_breaker = CircuitBreaker("test-breaker", config)
    return test_breaker
@pytest.fixture
def registry(config):
    """Provide a CircuitBreakerRegistry constructed with the ``config`` fixture."""
    test_registry = CircuitBreakerRegistry(config)
    return test_registry
# =============================================================================
# Test CircuitBreakerConfig
# =============================================================================
class TestCircuitBreakerConfig:
    """Checks on CircuitBreakerConfig: defaults, overrides, validation, env loading."""

    def test_default_config(self):
        """A config created without arguments carries the expected defaults."""
        cfg = CircuitBreakerConfig()

        expected_defaults = {
            "fail_max": 5,
            "recovery_timeout": 60.0,
            "half_open_max_calls": 3,
            "success_threshold": 2,
            "call_timeout": 30.0,
        }
        for field_name, expected in expected_defaults.items():
            assert getattr(cfg, field_name) == expected

    def test_custom_config(self):
        """Values passed to the constructor are stored unchanged."""
        overrides = {
            "fail_max": 10,
            "recovery_timeout": 120.0,
            "half_open_max_calls": 5,
            "success_threshold": 3,
        }
        cfg = CircuitBreakerConfig(**overrides)

        for field_name, expected in overrides.items():
            assert getattr(cfg, field_name) == expected

    def test_config_validation_fail_max(self):
        """fail_max=0 is rejected with CircuitBreakerConfigError."""
        with pytest.raises(CircuitBreakerConfigError):
            CircuitBreakerConfig(fail_max=0)

    def test_config_validation_recovery_timeout(self):
        """A negative recovery_timeout is rejected with CircuitBreakerConfigError."""
        with pytest.raises(CircuitBreakerConfigError):
            CircuitBreakerConfig(recovery_timeout=-1)

    def test_config_validation_half_open_max_calls(self):
        """half_open_max_calls=0 is rejected with CircuitBreakerConfigError."""
        with pytest.raises(CircuitBreakerConfigError):
            CircuitBreakerConfig(half_open_max_calls=0)

    def test_config_from_env(self):
        """from_env() picks up values from CIRCUIT_BREAKER_* environment variables."""
        env_overrides = {
            "CIRCUIT_BREAKER_FAIL_MAX": "10",
            "CIRCUIT_BREAKER_RECOVERY_TIMEOUT": "120",
        }
        # patch.dict restores os.environ when the block exits.
        with patch.dict(os.environ, env_overrides):
            cfg = CircuitBreakerConfig.from_env()
            assert cfg.fail_max == 10
            assert cfg.recovery_timeout == 120.0
# =============================================================================
# Test CircuitBreaker State Machine
# =============================================================================
class TestCircuitBreakerStateMachine:
    """Walks a breaker through each CLOSED / OPEN / HALF_OPEN transition."""

    @staticmethod
    async def _always_fail():
        """Coroutine that unconditionally raises; used to record failures."""
        raise Exception("Test failure")

    async def _trip(self, breaker, attempts):
        """Send ``attempts`` failing calls through ``breaker``, asserting each one raises."""
        for _ in range(attempts):
            with pytest.raises(Exception):
                await breaker.call(self._always_fail)

    def test_initial_state_is_closed(self, breaker):
        """A freshly built breaker reports CLOSED and nothing else."""
        assert breaker.state == CircuitState.CLOSED
        assert breaker.is_closed
        assert not breaker.is_open
        assert not breaker.is_half_open

    @pytest.mark.asyncio
    async def test_transition_closed_to_open(self, breaker, config):
        """fail_max consecutive failures move the breaker from CLOSED to OPEN."""
        # Fail enough times to open the circuit
        await self._trip(breaker, config.fail_max)

        assert breaker.state == CircuitState.OPEN
        assert breaker.is_open

    @pytest.mark.asyncio
    async def test_transition_open_to_half_open(self, breaker, config):
        """Once recovery_timeout has elapsed, an OPEN breaker reports HALF_OPEN."""
        # Open the circuit
        await self._trip(breaker, config.fail_max)
        assert breaker.is_open

        # Wait for recovery timeout
        await asyncio.sleep(config.recovery_timeout + 0.1)

        # Check state - accessing state triggers transition
        assert breaker.state == CircuitState.HALF_OPEN
        assert breaker.is_half_open

    @pytest.mark.asyncio
    async def test_transition_half_open_to_closed(self, breaker, config):
        """success_threshold successes in HALF_OPEN close the breaker again."""

        async def succeed():
            return "success"

        # Open the circuit
        await self._trip(breaker, config.fail_max)

        # Wait for half-open
        await asyncio.sleep(config.recovery_timeout + 0.1)
        assert breaker.is_half_open

        # Succeed enough times to close
        for _ in range(config.success_threshold):
            outcome = await breaker.call(succeed)
            assert outcome == "success"

        assert breaker.state == CircuitState.CLOSED
        assert breaker.is_closed

    @pytest.mark.asyncio
    async def test_transition_half_open_to_open(self, breaker, config):
        """A single failure while HALF_OPEN sends the breaker back to OPEN."""
        # Open the circuit
        await self._trip(breaker, config.fail_max)

        # Wait for half-open
        await asyncio.sleep(config.recovery_timeout + 0.1)
        assert breaker.is_half_open

        # Fail again - should go back to OPEN
        await self._trip(breaker, 1)

        assert breaker.state == CircuitState.OPEN
        assert breaker.is_open
# =============================================================================
# Test CircuitBreaker Call Handling
# =============================================================================
class TestCircuitBreakerCalls:
    """Covers how the breaker forwards, rejects and times out wrapped calls."""

    @pytest.mark.asyncio
    async def test_successful_call(self, breaker):
        """The wrapped coroutine's return value comes back from call()."""

        async def ok():
            return "result"

        assert await breaker.call(ok) == "result"

    @pytest.mark.asyncio
    async def test_successful_call_with_args(self, breaker):
        """Positional and keyword arguments reach the wrapped coroutine."""

        async def join_values(a, b, c=None):
            return f"{a}-{b}-{c}"

        joined = await breaker.call(join_values, 1, 2, c=3)
        assert joined == "1-2-3"

    @pytest.mark.asyncio
    async def test_exception_propagation(self, breaker):
        """The original exception type surfaces to the caller."""

        class CustomError(Exception):
            pass

        async def boom():
            raise CustomError("custom error")

        with pytest.raises(CustomError):
            await breaker.call(boom)

    @pytest.mark.asyncio
    async def test_rejection_when_open(self, breaker, config):
        """An OPEN breaker raises CircuitOpenError instead of running the call."""

        async def boom():
            raise Exception("failure")

        async def never_runs():
            return "should not reach"

        # Open the circuit
        for _ in range(config.fail_max):
            with pytest.raises(Exception):
                await breaker.call(boom)

        # Should be rejected
        with pytest.raises(CircuitOpenError) as rejected:
            await breaker.call(never_runs)

        rejection = rejected.value
        assert rejection.breaker_name == "test-breaker"
        assert rejection.time_remaining > 0

    @pytest.mark.asyncio
    async def test_limited_calls_in_half_open(self, breaker, config):
        """Up to half_open_max_calls calls succeed once the breaker is HALF_OPEN."""

        async def boom():
            raise Exception("failure")

        async def ok():
            return "success"

        # Open the circuit
        for _ in range(config.fail_max):
            with pytest.raises(Exception):
                await breaker.call(boom)

        # Wait for half-open
        await asyncio.sleep(config.recovery_timeout + 0.1)
        assert breaker.is_half_open

        # First calls should be allowed (up to half_open_max_calls)
        for _ in range(config.half_open_max_calls):
            outcome = await breaker.call(ok)
            assert outcome == "success"

        # But if we're still half-open (not enough successes), further calls may be blocked
        # Note: In the implementation, each success might close the circuit if threshold met

    @pytest.mark.asyncio
    async def test_timeout_handling(self, config):
        """A call slower than call_timeout raises asyncio.TimeoutError."""
        config.call_timeout = 0.1  # Very short timeout
        slow_breaker = CircuitBreaker("timeout-test", config)

        async def too_slow():
            await asyncio.sleep(1.0)  # Slower than timeout
            return "never reached"

        with pytest.raises(asyncio.TimeoutError):
            await slow_breaker.call(too_slow)

    def test_sync_call(self, breaker):
        """call_sync() returns the plain function's result."""

        def plain():
            return "sync result"

        assert breaker.call_sync(plain) == "sync result"

    def test_sync_call_exception(self, breaker, config):
        """fail_max failures through call_sync() open the breaker."""

        def plain_boom():
            raise Exception("sync failure")

        for _ in range(config.fail_max):
            with pytest.raises(Exception):
                breaker.call_sync(plain_boom)

        assert breaker.is_open
# =============================================================================
# Test Exception Filtering
# =============================================================================
class TestExceptionFiltering:
    """Covers the exclude_exceptions / include_exceptions config options."""

    @pytest.mark.asyncio
    async def test_excluded_exception_not_counted(self):
        """Exceptions listed in exclude_exceptions never open the breaker."""

        class ExcludedError(Exception):
            pass

        cfg = CircuitBreakerConfig(fail_max=2, exclude_exceptions=[ExcludedError])
        filtered_breaker = CircuitBreaker("exclude-test", cfg)

        async def raise_excluded():
            raise ExcludedError("excluded")

        # These should NOT count as failures
        for _ in range(5):
            with pytest.raises(ExcludedError):
                await filtered_breaker.call(raise_excluded)

        # Circuit should still be closed
        assert filtered_breaker.is_closed

    @pytest.mark.asyncio
    async def test_included_exceptions_only(self):
        """Exceptions absent from include_exceptions never open the breaker."""

        class IncludedError(Exception):
            pass

        class NotIncludedError(Exception):
            pass

        cfg = CircuitBreakerConfig(fail_max=2, include_exceptions=[IncludedError])
        filtered_breaker = CircuitBreaker("include-test", cfg)

        async def raise_not_included():
            raise NotIncludedError("not included")

        # These should NOT count as failures
        for _ in range(5):
            with pytest.raises(NotIncludedError):
                await filtered_breaker.call(raise_not_included)

        assert filtered_breaker.is_closed
# =============================================================================
# Test CircuitBreaker Control Methods
# =============================================================================
class TestCircuitBreakerControl:
    """Tests for manual circuit control (force_open, force_close, reset)."""

    def test_force_open(self, breaker):
        """force_open() moves a closed breaker to the open state."""
        assert breaker.is_closed

        breaker.force_open()

        assert breaker.is_open

    def test_force_close(self, breaker, config):
        """force_close() closes a breaker that was opened by real failures."""
        # Open normally
        asyncio.run(self._open_circuit(breaker, config))
        assert breaker.is_open

        # Force close
        breaker.force_close()
        assert breaker.is_closed

    async def _open_circuit(self, breaker, config):
        """Send ``config.fail_max`` failing calls through ``breaker``.

        Errors raised by the calls are deliberately ignored; callers assert
        on the resulting breaker state instead.
        """

        async def failing_func():
            raise Exception("failure")

        for _ in range(config.fail_max):
            try:
                await breaker.call(failing_func)
            except Exception:
                # Only ordinary errors are swallowed; KeyboardInterrupt,
                # SystemExit and asyncio.CancelledError still propagate.
                pass

    def test_reset(self, breaker):
        """reset() closes the breaker and zeroes its call/failure counters."""
        breaker.force_open()

        breaker.reset()

        assert breaker.is_closed
        metrics = breaker.get_metrics()
        assert metrics.total_calls == 0
        assert metrics.failure_count == 0
# =============================================================================
# Test Metrics
# =============================================================================
class TestCircuitBreakerMetrics:
    """Tests for metrics collection."""

    @pytest.mark.asyncio
    async def test_metrics_collection(self, breaker):
        """Successes and failures are both reflected in get_metrics()."""

        async def success_func():
            return "ok"

        async def fail_func():
            raise Exception("fail")

        # Some successes
        for _ in range(3):
            await breaker.call(success_func)

        # Some failures; each call is expected to raise, so assert that
        # explicitly instead of silently swallowing whatever happens.
        for _ in range(2):
            with pytest.raises(Exception):
                await breaker.call(fail_func)

        metrics = breaker.get_metrics()
        assert metrics.total_calls == 5
        assert metrics.success_count == 3
        assert metrics.failure_count == 2
        assert metrics.last_success_time is not None
        assert metrics.last_failure_time is not None
        assert 0.0 <= metrics.failure_rate <= 1.0

    def test_metrics_to_dict(self, breaker):
        """to_dict() exposes the core metric keys."""
        serialized = breaker.get_metrics().to_dict()

        for key in ("name", "state", "failure_count", "success_count", "failure_rate"):
            assert key in serialized
# =============================================================================
# Test CircuitBreakerRegistry
# =============================================================================
class TestCircuitBreakerRegistry:
    """Covers creation, lookup, removal and reporting through the registry."""

    def test_get_creates_new(self, registry):
        """get() with an unseen name yields a breaker carrying that name."""
        first = registry.get("service-1")
        second = registry.get("service-2")

        assert first is not None
        assert second is not None
        assert first.name == "service-1"
        assert second.name == "service-2"

    def test_get_returns_same(self, registry):
        """Two get() calls with one name yield the identical object."""
        first = registry.get("service-1")
        again = registry.get("service-1")

        assert first is again

    def test_get_all(self, registry):
        """get_all() contains every breaker that was requested."""
        names = ("a", "b", "c")
        for name in names:
            registry.get(name)

        everything = registry.get_all()

        assert len(everything) == 3
        for name in names:
            assert name in everything

    def test_remove(self, registry):
        """remove() returns True and drops an existing breaker."""
        registry.get("to-remove")
        assert "to-remove" in registry.get_all()

        removed = registry.remove("to-remove")

        assert removed is True
        assert "to-remove" not in registry.get_all()

    def test_remove_nonexistent(self, registry):
        """remove() returns False for a name that was never registered."""
        removed = registry.remove("does-not-exist")
        assert removed is False

    def test_reset_all(self, registry):
        """reset_all() closes every breaker that had been forced open."""
        opened = [registry.get("b1"), registry.get("b2")]
        for entry in opened:
            entry.force_open()

        registry.reset_all()

        for entry in opened:
            assert entry.is_closed

    def test_get_open_circuits(self, registry):
        """get_open_circuits() lists the open breakers and omits the closed one."""
        registry.get("healthy")
        for name in ("unhealthy-1", "unhealthy-2"):
            registry.get(name).force_open()

        open_names = registry.get_open_circuits()

        assert "healthy" not in open_names
        assert "unhealthy-1" in open_names
        assert "unhealthy-2" in open_names

    def test_get_summary(self, registry):
        """get_summary() reports totals per state plus the open breaker names."""
        registry.get("closed-1")
        registry.get("closed-2")
        registry.get("open-1").force_open()

        report = registry.get_summary()

        assert report["total_breakers"] == 3
        assert report["closed"] == 2
        assert report["open"] == 1
        assert "open-1" in report["open_circuits"]

    def test_get_all_metrics(self, registry):
        """get_all_metrics() yields one CircuitMetrics per registered breaker."""
        registry.get("m1")
        registry.get("m2")

        collected = registry.get_all_metrics()

        assert len(collected) == 2
        assert all(isinstance(item, CircuitMetrics) for item in collected)
# =============================================================================
# Test AgentCircuitBreaker
# =============================================================================
class TestAgentCircuitBreaker:
    """Covers the per-agent wrapper around circuit breakers."""

    def test_get_breaker_for_agent(self, config):
        """get_breaker() yields a breaker whose name embeds "agent:<id>"."""
        agents = AgentCircuitBreaker(config=config)

        agent_circuit = agents.get_breaker("agent-1")

        assert agent_circuit is not None
        assert "agent:agent-1" in agent_circuit.name

    @pytest.mark.asyncio
    async def test_call_agent_success(self, config):
        """call_agent() hands back the task's return value."""
        agents = AgentCircuitBreaker(config=config)

        async def agent_task():
            return "agent result"

        outcome = await agents.call_agent("test-agent", agent_task)
        assert outcome == "agent result"

    @pytest.mark.asyncio
    async def test_call_agent_failure(self, config):
        """After fail_max failures the agent's next call is rejected."""
        agents = AgentCircuitBreaker(config=config)

        async def failing_task():
            raise Exception("agent failure")

        # Multiple failures should open circuit
        for _ in range(config.fail_max):
            with pytest.raises(Exception):
                await agents.call_agent("failing-agent", failing_task, fallback=False)

        # Next call should be rejected
        with pytest.raises(CircuitOpenError):
            await agents.call_agent("failing-agent", failing_task, fallback=False)

    def test_get_agent_status(self, config):
        """A newly created agent breaker reports a "closed" circuit_state."""
        agents = AgentCircuitBreaker(config=config)
        agents.get_breaker("status-agent")  # Create breaker

        report = agents.get_agent_status("status-agent")

        assert report["agent_id"] == "status-agent"
        assert report["circuit_state"] == "closed"

    def test_force_open_agent(self, config):
        """force_open_agent() makes the status report "open"."""
        agents = AgentCircuitBreaker(config=config)

        agents.force_open_agent("force-open-agent")

        report = agents.get_agent_status("force-open-agent")
        assert report["circuit_state"] == "open"

    def test_reset_agent(self, config):
        """reset_agent() brings a forced-open agent back to "closed"."""
        agents = AgentCircuitBreaker(config=config)
        agents.force_open_agent("reset-agent")

        agents.reset_agent("reset-agent")

        report = agents.get_agent_status("reset-agent")
        assert report["circuit_state"] == "closed"
# =============================================================================
# Test Decorator
# =============================================================================
class TestDecorator:
    """Covers the ``circuit_breaker`` decorator on async and sync callables."""

    @pytest.mark.asyncio
    async def test_async_decorator(self):
        """A decorated coroutine still returns its own result."""

        @circuit_breaker("async-decorated")
        async def decorated_func():
            return "decorated result"

        assert await decorated_func() == "decorated result"

    def test_sync_decorator(self):
        """A decorated plain function still returns its own result."""

        @circuit_breaker("sync-decorated")
        def decorated_sync():
            return "sync decorated"

        assert decorated_sync() == "sync decorated"

    @pytest.mark.asyncio
    async def test_decorator_with_config(self):
        """A config passed to the decorator controls when the breaker opens."""
        cfg = CircuitBreakerConfig(fail_max=2, recovery_timeout=1.0)

        @circuit_breaker("custom-config", config=cfg)
        async def custom_func():
            raise Exception("always fail")

        # Should open after 2 failures
        for _ in range(2):
            with pytest.raises(Exception):
                await custom_func()

        # Circuit should be open; the test looks the breaker up by name in
        # the global registry.
        registered = get_global_registry().get("custom-config")
        assert registered.is_open
# =============================================================================
# Test Health Check
# =============================================================================
class TestHealthCheck:
    """Covers the status reported by CircuitBreakerHealthCheck.check()."""

    @staticmethod
    def _run_check(registry):
        """Build a health check over ``registry`` and return its check() result."""
        return CircuitBreakerHealthCheck(registry).check()

    def test_healthy_when_all_closed(self, registry):
        """All breakers closed -> "healthy", and the message mentions "closed"."""
        for name in ("healthy-1", "healthy-2"):
            registry.get(name)

        report = self._run_check(registry)

        assert report["status"] == "healthy"
        assert "closed" in report["message"].lower()

    def test_degraded_when_some_open(self, registry):
        """One of three breakers open -> "degraded"."""
        registry.get("healthy-1")
        registry.get("healthy-2")
        registry.get("unhealthy-1").force_open()

        report = self._run_check(registry)

        # 1/3 open = 33%, below 50% threshold
        assert report["status"] == "degraded"

    def test_unhealthy_when_many_open(self, registry):
        """Two of three breakers open -> "unhealthy"."""
        registry.get("healthy-1")
        registry.get("unhealthy-1").force_open()
        registry.get("unhealthy-2").force_open()

        report = self._run_check(registry)

        # 2/3 open = 67%, above 50% threshold
        assert report["status"] == "unhealthy"

    def test_healthy_with_no_breakers(self, registry):
        """An empty registry reports "healthy"."""
        report = self._run_check(registry)

        assert report["status"] == "healthy"
# =============================================================================
# Test Utility Functions
# =============================================================================
class TestUtilityFunctions:
    """Covers the module-level helper functions."""

    def test_create_circuit_breaker(self):
        """create_circuit_breaker() applies the name and config overrides given."""
        created = create_circuit_breaker(
            "helper-created",
            fail_max=10,
            recovery_timeout=120.0,
        )

        assert created.name == "helper-created"
        created_cfg = created.config
        assert created_cfg.fail_max == 10
        assert created_cfg.recovery_timeout == 120.0

    @pytest.mark.asyncio
    async def test_with_circuit_breaker(self):
        """with_circuit_breaker() runs the coroutine and returns its result."""
        helper_breaker = create_circuit_breaker("with-helper")

        async def func():
            return "helper result"

        assert await with_circuit_breaker(helper_breaker, func) == "helper result"

    def test_get_global_registry(self):
        """get_global_registry() hands back the same object on every call."""
        first = get_global_registry()
        second = get_global_registry()

        assert first is second
# =============================================================================
# Test State Change Callbacks
# =============================================================================
class TestStateChangeCallbacks:
    """Tests for state change callbacks (on_open / on_close)."""

    @pytest.mark.asyncio
    async def test_on_open_callback(self):
        """on_open is invoked with the breaker once it opens."""
        callback_called = {"called": False, "breaker": None}

        def on_open(breaker):
            callback_called["called"] = True
            callback_called["breaker"] = breaker

        config = CircuitBreakerConfig(
            fail_max=2,
            on_open=on_open,
        )
        breaker = CircuitBreaker("callback-test", config)

        async def failing():
            raise Exception("fail")

        # Each call is expected to raise; assert it rather than swallowing
        # every possible error (including cancellation) with a bare except.
        for _ in range(2):
            with pytest.raises(Exception):
                await breaker.call(failing)

        assert callback_called["called"] is True
        assert callback_called["breaker"] is breaker

    @pytest.mark.asyncio
    async def test_on_close_callback(self):
        """on_close is invoked once the breaker closes again."""
        callback_called = {"called": False}

        def on_close(breaker):
            callback_called["called"] = True

        config = CircuitBreakerConfig(
            fail_max=2,
            recovery_timeout=0.1,
            success_threshold=1,
            on_close=on_close,
        )
        breaker = CircuitBreaker("close-callback-test", config)

        async def failing():
            raise Exception("fail")

        async def success():
            return "ok"

        # Open circuit
        for _ in range(2):
            with pytest.raises(Exception):
                await breaker.call(failing)

        # Wait for half-open (sleep is longer than recovery_timeout=0.1)
        await asyncio.sleep(0.2)

        # Succeed to close
        await breaker.call(success)

        assert callback_called["called"] is True
# =============================================================================
# Test Edge Cases
# =============================================================================
class TestEdgeCases:
    """Tests for edge cases."""

    @pytest.mark.asyncio
    async def test_rapid_failures(self, config):
        """Many concurrently scheduled failing calls leave the breaker open."""
        breaker = CircuitBreaker("rapid-test", config)

        async def failing():
            raise Exception("rapid fail")

        # Rapid fire failures: schedule more calls than fail_max at once.
        tasks = [
            asyncio.create_task(self._safe_call(breaker, failing))
            for _ in range(config.fail_max + 5)
        ]

        await asyncio.gather(*tasks)

        # Circuit should be open
        assert breaker.is_open

    async def _safe_call(self, breaker, func):
        """Run ``func`` through ``breaker`` and ignore any ordinary error it raises."""
        try:
            await breaker.call(func)
        except Exception:
            # Best-effort by design, but narrowed from a bare ``except`` so
            # that task cancellation and interpreter-exit signals propagate.
            pass

    def test_thread_safety(self, breaker):
        """Concurrent metric reads and forced state flips raise no exceptions."""
        import threading

        errors = []

        def thread_func():
            try:
                for i in range(100):
                    breaker.get_metrics()
                    if i % 10 == 0:
                        breaker.force_open()
                    if i % 10 == 5:
                        breaker.force_close()
            except Exception as e:
                # Collected here because an exception inside a worker thread
                # would not otherwise fail the test.
                errors.append(e)

        threads = [threading.Thread(target=thread_func) for _ in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert len(errors) == 0

    @pytest.mark.asyncio
    async def test_zero_timeout(self):
        """With call_timeout=None a short-sleeping call completes normally."""
        config = CircuitBreakerConfig(call_timeout=None)
        breaker = CircuitBreaker("no-timeout", config)

        async def slow_func():
            await asyncio.sleep(0.1)
            return "completed"

        result = await breaker.call(slow_func)
        assert result == "completed"
# =============================================================================
# Test FailureRecord
# =============================================================================
class TestFailureRecord:
    """Covers FailureRecord construction and the FailureType values."""

    def test_failure_record_creation(self):
        """Fields passed to FailureRecord are readable afterwards."""
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it is
        # kept so the record still receives a naive timestamp.
        stamp = datetime.utcnow()
        entry = FailureRecord(
            timestamp=stamp,
            failure_type=FailureType.EXCEPTION,
            exception_type="ValueError",
            message="test error",
        )

        assert entry.failure_type == FailureType.EXCEPTION
        assert entry.exception_type == "ValueError"
        assert entry.message == "test error"

    def test_failure_types(self):
        """Each FailureType member maps to its lowercase string value."""
        expected_values = (
            (FailureType.EXCEPTION, "exception"),
            (FailureType.TIMEOUT, "timeout"),
            (FailureType.REJECTION, "rejection"),
            (FailureType.CUSTOM, "custom"),
        )
        for member, text in expected_values:
            assert member.value == text
# =============================================================================
# Integration Tests
# =============================================================================
class TestIntegration:
    """Integration tests combining multiple components."""

    @pytest.mark.asyncio
    async def test_full_lifecycle(self):
        """Drive one breaker through CLOSED -> OPEN -> HALF_OPEN -> OPEN -> HALF_OPEN -> CLOSED."""
        config = CircuitBreakerConfig(
            fail_max=2,
            recovery_timeout=0.5,
            success_threshold=1,
        )
        breaker = CircuitBreaker("lifecycle-test", config)

        call_count = 0

        async def flaky_service():
            # Fails on its first three invocations, succeeds from the fourth.
            nonlocal call_count
            call_count += 1
            if call_count <= 3:
                raise Exception(f"Failure #{call_count}")
            return f"Success #{call_count}"

        # Phase 1: CLOSED -> OPEN (failures)
        assert breaker.is_closed
        for _ in range(2):
            with pytest.raises(Exception):
                await breaker.call(flaky_service)
        assert breaker.is_open

        # Phase 2: OPEN - rejection
        with pytest.raises(CircuitOpenError):
            await breaker.call(flaky_service)

        # Phase 3: OPEN -> HALF_OPEN (timeout)
        await asyncio.sleep(0.6)
        assert breaker.is_half_open

        # Phase 4: HALF_OPEN -> OPEN (failure in half-open)
        with pytest.raises(Exception):
            await breaker.call(flaky_service)
        assert breaker.is_open

        # Phase 5: Wait again, then succeed
        await asyncio.sleep(0.6)
        assert breaker.is_half_open

        result = await breaker.call(flaky_service)
        assert result == "Success #4"
        assert breaker.is_closed

    @pytest.mark.asyncio
    async def test_registry_with_multiple_services(self):
        """A failing service opens only its own breaker within a shared registry."""
        config = CircuitBreakerConfig(fail_max=2, recovery_timeout=0.5)
        registry = CircuitBreakerRegistry(config)

        # Different services with different health
        healthy_breaker = registry.get("healthy-service")
        failing_breaker = registry.get("failing-service")

        async def success():
            return "ok"

        async def fail():
            raise Exception("fail")

        # Use healthy service normally
        for _ in range(3):
            result = await healthy_breaker.call(success)
            assert result == "ok"

        # Fail the failing service; each call is expected to raise, so assert
        # that instead of hiding every error behind a bare except.
        for _ in range(2):
            with pytest.raises(Exception):
                await failing_breaker.call(fail)

        # Verify states
        assert healthy_breaker.is_closed
        assert failing_breaker.is_open

        # Summary should reflect this
        summary = registry.get_summary()
        assert summary["closed"] == 1
        assert summary["open"] == 1
        assert "failing-service" in summary["open_circuits"]
# =============================================================================
# Run tests
# =============================================================================
# Allow running this file directly: hand this module's own path to pytest
# with verbose output and short tracebacks.
if __name__ == "__main__":
    pytest.main([__file__, "-v", "--tb=short"])