下面列出了 org.springframework.boot.actuate.endpoint.annotation.ReadOperation#net.jodah.failsafe.CircuitBreaker 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Runs {@code callable} under the circuit breaker registered for {@code operationId}.
 *
 * <p>If the breaker is not open, the call is executed and any failure is recorded on the
 * holder via {@code setThrowable}. If the breaker is open, a fallback is attached that tells
 * Zuul not to forward the request and writes a 503 response (body, header and content type)
 * into the current {@link RequestContext}.
 *
 * @param callable      the work to execute
 * @param operationId   key used to look up (or create) the circuit breaker holder
 * @param operationPath operation path, used only in the generated message
 * @param <T>           result type of {@code callable}
 * @return whatever the Failsafe execution yields
 */
public <T> T failsafe(Callable<T> callable, Long operationId, String operationPath) {
    CircuitBreakerHolder holder = getCircuitHolder(operationId, circuits);
    CircuitBreaker breaker = holder.getCircuitBreaker();

    // Normal path: execute and remember the last failure for later reporting.
    if (!breaker.isOpen()) {
        return Failsafe.with(breaker)
                .onFailure((result, failure) -> holder.setThrowable(failure))
                .get(callable);
    }

    // Open breaker: answer the request directly instead of routing it.
    return Failsafe.with(breaker)
            .withFallback(() -> {
                String responseBody = logAndCreateBody(
                        "CircuitBreaker ENABLED | Operation: {0}, Exception: {1}",
                        operationPath,
                        holder.getMessage());
                RequestContext zuulContext = RequestContext.getCurrentContext();
                zuulContext.setSendZuulResponse(false);
                zuulContext.setResponseStatusCode(HttpStatus.SERVICE_UNAVAILABLE.value());
                zuulContext.setResponseBody(responseBody);
                zuulContext.addZuulResponseHeader(ConstantsContext.CIRCUIT_BREAKER_ENABLED, "enabled");
                zuulContext.getResponse().setContentType(MediaType.APPLICATION_JSON_VALUE);
            })
            .get(callable);
}
/**
 * Runs {@code callable} under the circuit breaker registered for {@code url}.
 *
 * <p>If the breaker is not open, the call is executed and any failure is recorded on the
 * holder via {@code setThrowable}. If the breaker is open, a fallback is attached that
 * produces a 503 {@code ResponseEntity} carrying the circuit-breaker header and a generated
 * message body.
 *
 * @param callable the work to execute
 * @param url      key used to look up (or create) the circuit breaker holder; also part of
 *                 the generated message
 * @param <T>      result type of {@code callable}
 * @return whatever the Failsafe execution yields
 */
public <T> T failsafe(Callable<T> callable, String url) {
    CircuitBreakerHolder holder = getCircuitHolder(url, middlewareCircuits);
    CircuitBreaker breaker = holder.getCircuitBreaker();

    // Normal path: execute and remember the last failure for later reporting.
    if (!breaker.isOpen()) {
        return Failsafe.with(breaker)
                .onFailure((result, failure) -> holder.setThrowable(failure))
                .get(callable);
    }

    // Open breaker: substitute a 503 response for the real call.
    return Failsafe.with(breaker)
            .withFallback(() -> {
                String responseBody = logAndCreateBody(
                        "CircuitBreaker ENABLED | URL: {0}, Exception: {1}",
                        url,
                        holder.getMessage());
                return ResponseEntity
                        .status(HttpStatus.SERVICE_UNAVAILABLE.value())
                        .header(ConstantsContext.CIRCUIT_BREAKER_ENABLED, "enabled")
                        .body(responseBody);
            })
            .get(callable);
}
/**
 * Returns the holder registered under {@code key}, creating and registering a new one on
 * first use.
 *
 * <p>A new holder gets a circuit breaker configured from {@code property.getFailsafe()}:
 * failure threshold, success threshold and delay (in seconds).
 *
 * @param key               map key identifying the circuit
 * @param concurrentHashMap registry of holders to look up in and add to
 * @param <T>               key type
 * @return the existing or newly created holder for {@code key}
 */
private <T> CircuitBreakerHolder getCircuitHolder(T key, ConcurrentHashMap<T, CircuitBreakerHolder> concurrentHashMap) {
    // computeIfAbsent performs lookup-and-create atomically. A separate get() followed by
    // put() lets two concurrent callers each create a holder for the same key, so one
    // caller keeps using a breaker that is no longer in the map and its state is lost.
    return concurrentHashMap.computeIfAbsent(key, ignored -> {
        CircuitBreakerHolder breakerHolder = new CircuitBreakerHolder();
        breakerHolder.setCircuitBreaker(new CircuitBreaker()
                .withFailureThreshold(property.getFailsafe().getFailureNumber())
                .withSuccessThreshold(property.getFailsafe().getSuccessNumber())
                .withDelay(property.getFailsafe().getDelayTimeSeconds(), TimeUnit.SECONDS));
        return breakerHolder;
    });
}
/**
 * Builds a circuit breaker for {@code client}.
 *
 * <p>Failure threshold, delay and success threshold are applied only when the client's
 * circuit-breaker settings provide a non-null value for them. The delay function from
 * {@code delayFunction()} is always installed, and the listener is wired to the open,
 * half-open and close transitions.
 *
 * @param client   client whose circuit-breaker settings are read
 * @param listener receiver of state-transition callbacks
 * @return the configured circuit breaker
 */
public static CircuitBreaker<ClientHttpResponse> createCircuitBreaker(
        final Client client,
        final CircuitBreakerListener listener) {

    final CircuitBreaker<ClientHttpResponse> circuit = new CircuitBreaker<>();

    // Optional settings: each one is skipped when not configured.
    Optional.ofNullable(client.getCircuitBreaker().getFailureThreshold())
            .ifPresent(failures -> failures.applyTo(circuit::withFailureThreshold));
    Optional.ofNullable(client.getCircuitBreaker().getDelay())
            .ifPresent(configuredDelay -> configuredDelay.applyTo(circuit::withDelay));
    Optional.ofNullable(client.getCircuitBreaker().getSuccessThreshold())
            .ifPresent(successes -> successes.applyTo(circuit::withSuccessThreshold));

    circuit.withDelay(delayFunction());

    // Transition callbacks, registered in the same order as before.
    circuit.onOpen(listener::onOpen)
            .onHalfOpen(listener::onHalfOpen)
            .onClose(listener::onClose);

    return circuit;
}
/**
 * More alarming async case: the returned future is expected to complete, but (per the
 * original report) it never does because Failsafe does not recover from the
 * {@link NullPointerException} thrown by the result predicate.
 *
 * <p>The scheduler created for the execution is always shut down, so the test does not
 * leave a live executor thread behind whether it passes or times out.
 *
 * @throws Throwable if the waiter is not resumed within the timeout
 */
public void asyncShouldCompleteTheFuture() throws Throwable {
    CircuitBreaker<String> circuitBreaker = new CircuitBreaker<String>().handleResultIf(handleIfEqualsIgnoreCaseFoo);
    // Keep a handle on the scheduler so it can be stopped in the finally block.
    java.util.concurrent.ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    try {
        FailsafeExecutor<String> failsafe = Failsafe.with(circuitBreaker).with(scheduler);
        Waiter waiter = new Waiter();
        failsafe
            .getStageAsync(() -> {
                CompletableFuture<String> future = new CompletableFuture<>();
                future.completeExceptionally(new IOException("let's blame it on network error"));
                return future;
            })
            .whenComplete((s, t) -> waiter.resume()); // Never invoked!
        waiter.await(1000);
    } finally {
        scheduler.shutdownNow();
    }
}
/**
 * Asserts that the circuit is opened after the failure ratio is met.
 */
public void testFailureWithFailureRatio() {
    // Given: a closed circuit with a failure threshold of 2 out of 3
    CircuitBreaker circuit = new CircuitBreaker();
    circuit.withFailureThreshold(2, 3);
    circuit.close();
    ClosedState closed = new ClosedState(circuit, getInternals(circuit));

    // When: one failure and one success leave the circuit closed
    closed.recordFailure(null);
    closed.recordSuccess();
    assertTrue(circuit.isClosed());
    closed.recordFailure(null);

    // Then: the second failure opens it
    assertTrue(circuit.isOpen());
}
/**
 * Asserts that the circuit is opened after the failure threshold is met.
 */
public void testFailureWithFailureThreshold() {
    // Given: a closed circuit with a failure threshold of 3
    CircuitBreaker circuit = new CircuitBreaker();
    circuit.withFailureThreshold(3);
    circuit.close();
    ClosedState closed = new ClosedState(circuit, getInternals(circuit));

    // When: a success in between keeps the circuit closed after two more failures
    closed.recordFailure(null);
    closed.recordSuccess();
    closed.recordFailure(null);
    closed.recordFailure(null);
    assertTrue(circuit.isClosed());
    closed.recordFailure(null);

    // Then: the next failure opens it
    assertTrue(circuit.isOpen());
}
/**
 * Read operation: looks up a single circuit breaker by name.
 *
 * @param name selector identifying the breaker
 * @return a view of the breaker, or {@code null} when no breaker is registered under
 *         {@code name}
 */
@ReadOperation
@Nullable
public CircuitBreakerView circuitBreaker(@Selector final String name) {
    final CircuitBreaker found = breakers.get(name);
    return found == null ? null : toView(found);
}
/**
 * Write operation: moves the named circuit breaker into the requested state.
 *
 * @param name  selector identifying the breaker
 * @param state target state
 * @return a view of the breaker after the transition, or {@code null} when no breaker is
 *         registered under {@code name}
 */
@WriteOperation
@Nullable
public CircuitBreakerView transitionTo(@Selector final String name, final CircuitBreaker.State state) {
    final CircuitBreaker target = breakers.get(name);
    if (target != null) {
        transitioner(state).accept(target);
        return toView(target);
    }
    return null;
}
/**
 * Maps a target state to the action that puts a circuit breaker into that state.
 *
 * @param state the requested state
 * @return a consumer that closes, opens or half-opens the breaker it is given
 * @throws UnsupportedOperationException for a state other than CLOSED, OPEN or HALF_OPEN
 */
private Consumer<CircuitBreaker> transitioner(final CircuitBreaker.State state) {
    final Consumer<CircuitBreaker> action;
    switch (state) {
        case CLOSED:
            action = breaker -> breaker.close();
            break;
        case OPEN:
            action = breaker -> breaker.open();
            break;
        case HALF_OPEN:
            action = breaker -> breaker.halfOpen();
            break;
        default:
            throw new UnsupportedOperationException("Unknown state: " + state);
    }
    return action;
}
/**
 * Declares the circuit-breakers endpoint bean.
 *
 * @param breakers the injected map of circuit breakers; may be {@code null} since the
 *                 injection is optional, in which case an empty map is used
 * @return the endpoint backed by the given (or an empty) map
 */
@Bean
@ConditionalOnMissingBean
@ConditionalOnEnabledEndpoint
public CircuitBreakersEndpoint circuitBreakersEndpoint(
        @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
        @Autowired(required = false) @Nullable final Map<String, CircuitBreaker> breakers) {
    if (breakers == null) {
        return new CircuitBreakersEndpoint(Collections.emptyMap());
    }
    return new CircuitBreakersEndpoint(breakers);
}
/**
 * Writes {@code state} to the breaker named "test", reads it back, and checks that both
 * views agree and report the requested state.
 *
 * @param state the state to transition to and verify
 */
private void verifyTransitionTo(final CircuitBreaker.State state) {
    final CircuitBreakerView afterWrite = write("test", state);
    final CircuitBreakerView afterRead = readOne("test");

    // The read view must equal what the write returned...
    assertEquals(afterRead, afterWrite);
    // ...and must report the state that was requested.
    assertEquals(afterRead.getState(), state);
}
/**
 * Creates a {@code FailsafePlugin} that uses the given circuit breaker as its policy and
 * the given task decorator.
 *
 * @param breaker   circuit breaker to install as the policy
 * @param decorator decorator to attach to the plugin
 * @return the configured plugin
 */
public static Plugin createCircuitBreakerPlugin(
        final CircuitBreaker<ClientHttpResponse> breaker,
        final TaskDecorator decorator) {

    final FailsafePlugin base = new FailsafePlugin();
    return base.withPolicy(breaker).withDecorator(decorator);
}
/**
 * Registers the circuit-breaker Failsafe plugin for a client, if its circuit breaker is
 * enabled.
 *
 * @param id     client id, used for bean naming and logging
 * @param client client configuration
 * @return the id returned by the registry, or an empty optional when the client's circuit
 *         breaker is not enabled
 */
private Optional<String> registerCircuitBreakerFailsafePlugin(final String id, final Client client) {
    if (!client.getCircuitBreaker().getEnabled()) {
        return Optional.empty();
    }

    final String beanName = registry.registerIfAbsent(
            name(id, CircuitBreaker.class, FailsafePlugin.class),
            () -> {
                log.debug("Client [{}]: Registering [CircuitBreakerFailsafePlugin]", id);
                // The factory method receives the breaker and the task decorator, in that order.
                return genericBeanDefinition(FailsafePluginFactory.class)
                        .setFactoryMethod("createCircuitBreakerPlugin")
                        .addConstructorArgValue(registerCircuitBreaker(id, client))
                        .addConstructorArgValue(createTaskDecorator(id, client));
            });
    return Optional.of(beanName);
}
/**
 * Registers (if absent) the client's {@code CircuitBreaker} bean definition and returns a
 * reference to it. The definition uses {@code FailsafePluginFactory}'s
 * {@code createCircuitBreaker} factory method with the client and a reference to its
 * circuit-breaker listener as arguments.
 *
 * @param id     client id
 * @param client client configuration passed to the factory method
 * @return a bean reference to the registered circuit breaker
 */
private BeanMetadataElement registerCircuitBreaker(final String id, final Client client) {
    return ref(registry.registerIfAbsent(id, CircuitBreaker.class, () -> {
        // Built lazily: the listener is only registered when this definition is needed.
        return genericBeanDefinition(FailsafePluginFactory.class)
                .setFactoryMethod("createCircuitBreaker")
                .addConstructorArgValue(client)
                .addConstructorArgReference(registerCircuitBreakerListener(id, client));
    }));
}
/**
 * Chooses the {@code CircuitStats} implementation for a breaker.
 *
 * <ul>
 *   <li>time-based stats when {@code supportsTimeBased} is set and the breaker has a
 *       failure thresholding period;</li>
 *   <li>counting stats when {@code capacity} is greater than 1;</li>
 *   <li>default stats otherwise.</li>
 * </ul>
 *
 * @param breaker           breaker whose thresholding period is consulted
 * @param capacity          capacity for counting stats
 * @param supportsTimeBased whether time-based stats may be used
 * @param oldStats          previous stats handed to the timed and counting implementations
 * @return the selected stats instance
 */
static CircuitStats create(CircuitBreaker breaker, int capacity, boolean supportsTimeBased, CircuitStats oldStats) {
    if (supportsTimeBased && breaker.getFailureThresholdingPeriod() != null) {
        return new TimedCircuitStats(
            TimedCircuitStats.DEFAULT_BUCKET_COUNT,
            breaker.getFailureThresholdingPeriod(),
            new Clock(),
            oldStats);
    }
    if (capacity > 1) {
        return new CountingCircuitStats(capacity, oldStats);
    }
    return new DefaultCircuitStats();
}
/**
 * Returns the capacity of the breaker in the closed state: the failure execution threshold
 * when it is non-zero, otherwise the failure thresholding capacity.
 */
private static int capacityFor(CircuitBreaker<?> breaker) {
    return breaker.getFailureExecutionThreshold() != 0
        ? breaker.getFailureExecutionThreshold()
        : breaker.getFailureThresholdingCapacity();
}
/**
 * Returns the capacity of the breaker in the half-open state: the first non-zero value of
 * the success thresholding capacity and the failure execution threshold, falling back to
 * the failure thresholding capacity.
 */
private static int capacityFor(CircuitBreaker<?> breaker) {
    int successCapacity = breaker.getSuccessThresholdingCapacity();
    if (successCapacity != 0) {
        return successCapacity;
    }
    int executionThreshold = breaker.getFailureExecutionThreshold();
    if (executionThreshold != 0) {
        return executionThreshold;
    }
    return breaker.getFailureThresholdingCapacity();
}
/**
 * With a retry policy (2 retries) composed before a circuit breaker (failure threshold 5),
 * asserts that the execution is incomplete after two failures, complete after the third,
 * and that the breaker is still closed.
 */
public void testRetryPolicyAndCircuitBreaker() {
    RetryPolicy<Object> retries = new RetryPolicy<>().withMaxRetries(2);
    CircuitBreaker<Object> circuit = new CircuitBreaker<>().withFailureThreshold(5);
    Execution exec = new Execution(retries, circuit);

    // Two failures: retries are not yet exhausted.
    for (int attempt = 0; attempt < 2; attempt++) {
        exec.recordFailure(new Exception());
    }
    assertFalse(exec.isComplete());

    // Third failure completes the execution without opening the breaker.
    exec.recordFailure(new Exception());
    assertTrue(exec.isComplete());
    assertTrue(circuit.isClosed());
}
/**
 * With a circuit breaker (failure threshold 5) composed before a retry policy (1 retry),
 * asserts that the execution is incomplete after one failure, complete after the second,
 * and that the breaker is still closed.
 */
public void testCircuitBreakerAndRetryPolicy() {
    RetryPolicy retries = new RetryPolicy().withMaxRetries(1);
    CircuitBreaker circuit = new CircuitBreaker().withFailureThreshold(5);
    Execution exec = new Execution(circuit, retries);

    // First failure: one retry is still available.
    exec.recordFailure(new Exception());
    assertFalse(exec.isComplete());

    // Second failure completes the execution without opening the breaker.
    exec.recordFailure(new Exception());
    assertTrue(exec.isComplete());
    assertTrue(circuit.isClosed());
}
/**
 * Simple synchronous case that (per the original report) throws a
 * {@link NullPointerException} instead of the expected {@link FailsafeException}.
 */
@Test(expectedExceptions = FailsafeException.class)
public void syncShouldThrowTheUnderlyingIOException() {
    CircuitBreaker<String> guarded = new CircuitBreaker<String>().handleResultIf(handleIfEqualsIgnoreCaseFoo);
    FailsafeExecutor<String> executor = Failsafe.with(guarded);

    // Expectation: this get() surfaces the IOException (wrapped), not an NPE.
    executor.get(() -> {
        throw new IOException("let's blame it on network error");
    });
}
/**
 * Runs an already-completed stage through a fallback plus circuit breaker on a dedicated
 * scheduler and asserts that the stage's own value (223), not the fallback value (999), is
 * returned.
 *
 * <p>The scheduler is always shut down so the test does not leave a live executor thread
 * behind, whether the assertion passes or fails.
 *
 * @throws Exception if waiting for the asynchronous result fails
 */
@Test
public void testThatFailSafeIsBrokenWithFallback() throws Exception {
    CircuitBreaker<Integer> breaker = new CircuitBreaker<Integer>().withFailureThreshold(10, 100).withSuccessThreshold(2).withDelay(
        Duration.ofMillis(100));
    ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
    try {
        int result = Failsafe.with(Fallback.of(e -> 999), breaker)
            .with(service)
            .getStageAsync(() -> CompletableFuture.completedFuture(223))
            .get();
        Assert.assertEquals(result, 223);
    } finally {
        // Release the executor thread created for this test.
        service.shutdownNow();
    }
}
/**
 * Asserts that an open state refuses execution before the 100 ms delay has passed, allows
 * it afterwards, and that the breaker is then half-open.
 */
public void testAllowsExecution() throws Throwable {
    // Given: an opened breaker with a 100 ms delay
    CircuitBreaker circuit = new CircuitBreaker().withDelay(Duration.ofMillis(100));
    circuit.open();
    ClosedState previous = new ClosedState(circuit, Testing.getInternals(circuit));
    OpenState open = new OpenState(circuit, previous, circuit.getDelay());
    assertTrue(circuit.isOpen());
    assertFalse(open.allowsExecution());

    // When: waiting slightly longer than the delay
    Thread.sleep(110);

    // Then
    assertTrue(open.allowsExecution());
    assertEquals(circuit.getState(), State.HALF_OPEN);
}
/**
 * Asserts that the remaining delay of an open state with a 1 s delay is positive and below
 * 1000 ms right away, and positive and below 900 ms after sleeping 110 ms.
 */
public void testRemainingDelay() throws Throwable {
    // Given: an open state with a one-second delay
    CircuitBreaker circuit = new CircuitBreaker().withDelay(Duration.ofSeconds(1));
    ClosedState previous = new ClosedState(circuit, Testing.getInternals(circuit));
    OpenState open = new OpenState(circuit, previous, circuit.getDelay());

    // When / Then: immediately after creation
    long initialMillis = open.getRemainingDelay().toMillis();
    assertTrue(initialMillis < 1000);
    assertTrue(initialMillis > 0);

    // When / Then: after a short wait
    Thread.sleep(110);
    long laterMillis = open.getRemainingDelay().toMillis();
    assertTrue(laterMillis < 900);
    assertTrue(laterMillis > 0);
}
/**
 * Asserts that the breaker reports a zero remaining delay before an open state is created,
 * and that an open state with a 10 ms delay reports 0 ms remaining after sleeping 50 ms.
 */
public void testNoRemainingDelay() throws Throwable {
    // Given: a breaker with a 10 ms delay
    CircuitBreaker circuit = new CircuitBreaker().withDelay(Duration.ofMillis(10));
    assertEquals(circuit.getRemainingDelay(), Duration.ZERO);

    // When: an open state is created and the delay is allowed to elapse
    ClosedState previous = new ClosedState(circuit, Testing.getInternals(circuit));
    OpenState open = new OpenState(circuit, previous, circuit.getDelay());
    Thread.sleep(50);

    // Then
    assertEquals(open.getRemainingDelay().toMillis(), 0);
}
/**
 * Resets the shared {@code breaker} before each test method to a fresh circuit breaker that
 * prints "Opening" when it opens.
 */
@BeforeMethod
protected void beforeMethod() {
    breaker = new CircuitBreaker<>().onOpen(() -> {
        System.out.println("Opening");
    });
    // Further listeners are intentionally left disabled:
    // .onHalfOpen(() -> System.out.println("Half-opening"))
    // .onClose(() -> System.out.println("Closing"))
    // .onSuccess(e -> System.out.println("Success"))
    // .onFailure(e -> System.out.println("Failure"));
}
/**
 * Asserts that the late configuration of a failure ratio is handled by resetting the state's
 * internal tracking. Also asserts that executions from prior configurations are carried over
 * to a new configuration.
 */
public void shouldHandleLateSetFailureRatio() {
    // Given: the shared breaker is half-open
    breaker.halfOpen();
    HalfOpenState halfOpen = Testing.stateFor(breaker);

    // When: a failure threshold of 2 is set after the state was entered
    breaker.withFailureThreshold(2);
    halfOpen.recordFailure(null);
    assertTrue(breaker.isHalfOpen());
    halfOpen.recordFailure(null);

    // Then
    assertTrue(breaker.isOpen());

    // Given: a fresh half-open breaker with a failure threshold of 3
    breaker = new CircuitBreaker();
    breaker.withFailureThreshold(3);
    breaker.halfOpen();
    halfOpen = Testing.stateFor(breaker);

    // When: two failures are recorded, then the threshold is changed to 3 out of 5
    halfOpen.recordFailure(null);
    halfOpen.recordFailure(null);
    breaker.withFailureThreshold(3, 5);
    halfOpen.recordSuccess();
    halfOpen.recordSuccess();
    assertTrue(breaker.isHalfOpen());
    halfOpen.recordFailure(null);

    // Then
    assertTrue(breaker.isOpen());
}
/**
 * Asserts that the late configuration of a success ratio is handled by resetting the state's
 * internal tracking. Also asserts that executions from prior configurations are carried over
 * to a new configuration.
 */
public void shouldHandleLateSetSucessRatio() {
    // Given: the shared breaker is half-open
    breaker.halfOpen();
    HalfOpenState halfOpen = Testing.stateFor(breaker);

    // When: a success threshold of 2 is set after the state was entered
    breaker.withSuccessThreshold(2);
    halfOpen.recordSuccess();
    assertTrue(breaker.isHalfOpen());
    halfOpen.recordSuccess();

    // Then
    assertTrue(breaker.isClosed());

    // Given: a fresh half-open breaker with a failure threshold of 3
    breaker = new CircuitBreaker();
    breaker.withFailureThreshold(3);
    breaker.halfOpen();
    halfOpen = Testing.stateFor(breaker);

    // When: two failures are recorded, then a success threshold of 2 out of 4 is set
    halfOpen.recordFailure(null);
    halfOpen.recordFailure(null);
    breaker.withSuccessThreshold(2, 4);
    halfOpen.recordSuccess();
    assertTrue(breaker.isHalfOpen());
    halfOpen.recordSuccess();

    // Then
    assertTrue(breaker.isClosed());
}
/**
 * Asserts that the circuit is opened after a single failure.
 */
public void testFailureWithDefaultConfig() {
    // Given: a closed breaker with default settings
    CircuitBreaker circuit = new CircuitBreaker();
    circuit.close();
    ClosedState closed = new ClosedState(circuit, getInternals(circuit));
    assertFalse(circuit.isOpen());

    // When: one failure is recorded
    closed.recordFailure(null);

    // Then
    assertTrue(circuit.isOpen());
}
/**
 * Asserts that the circuit is still closed after a single success.
 */
public void testSuccessWithDefaultConfig() {
    // Given: a closed breaker with default settings
    CircuitBreaker circuit = new CircuitBreaker();
    circuit.close();
    ClosedState closed = new ClosedState(circuit, getInternals(circuit));
    assertTrue(circuit.isClosed());

    // When: one success is recorded
    closed.recordSuccess();

    // Then
    assertTrue(circuit.isClosed());
}