下面列出了 org.springframework.util.concurrent.FailureCallback 的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Per-test setup: creates a fresh {@link FailureCallback} mock, resets the recording lists and the
 * "throw during call" flag, and stubs {@code onFailure(inObj)} so every invocation records the span stack
 * and MDC map visible at that moment (and throws when the flag is set). Tracing is reset last.
 */
@Before
public void beforeMethod() {
    failureCallbackMock = mock(FailureCallback.class);
    inObj = new Exception("kaboom");
    throwExceptionDuringCall = false;

    currentSpanStackWhenFailureCallbackWasCalled = new ArrayList<>();
    currentMdcInfoWhenFailureCallbackWasCalled = new ArrayList<>();

    doAnswer(invocationOnMock -> {
        // Capture what the callback can see at the time it is invoked.
        currentSpanStackWhenFailureCallbackWasCalled.add(Tracer.getInstance().getCurrentSpanStackCopy());
        currentMdcInfoWhenFailureCallbackWasCalled.add(MDC.getCopyOfContextMap());

        if (throwExceptionDuringCall) {
            throw new RuntimeException("kaboom");
        }
        return null;
    }).when(failureCallbackMock).onFailure(inObj);

    resetTracing();
}
/**
 * Invokes the matching callback immediately on the calling thread: the success callback with the stored
 * value when no execution exception is recorded, otherwise the failure callback with
 * {@code exposedException(this.executionException)}. Anything thrown while doing so is discarded.
 */
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
    try {
        if (this.executionException == null) {
            successCallback.onSuccess(this.value);
        }
        else {
            failureCallback.onFailure(exposedException(this.executionException));
        }
    }
    catch (Throwable ignored) {
        // Ignore
    }
}
/**
 * Registers both callbacks with this object's {@code callback} holder: the success callback first,
 * then the failure callback.
 */
@Override
public void addCallback(
        SuccessCallback<? super ClientHttpResponse> successCallback,
        FailureCallback failureCallback) {

    callback.addSuccessCallback(successCallback);
    callback.addFailureCallback(failureCallback);
}
/**
 * Dispatches to one of the two callbacks right away. With a recorded execution exception the failure
 * callback receives {@code exposedException(this.executionException)}; otherwise the success callback
 * receives the stored value. Any throwable raised in the process is swallowed.
 */
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
    try {
        if (this.executionException != null) {
            failureCallback.onFailure(exposedException(this.executionException));
            return;
        }
        successCallback.onSuccess(this.value);
    }
    catch (Throwable ignored) {
        // Ignore
    }
}
/**
 * Hands the success callback and then the failure callback to the {@code callback} holder.
 */
@Override
public void addCallback(
        SuccessCallback<? super ClientHttpResponse> successCallback,
        FailureCallback failureCallback) {

    callback.addSuccessCallback(successCallback);
    callback.addFailureCallback(failureCallback);
}
/**
 * Sends {@code payload} to {@code topic} under {@code key} via {@code kafkaTemplate} and logs the outcome
 * from callbacks registered on the returned future.
 *
 * <p>A failed send is logged at ERROR level together with the causing throwable so the stack trace is
 * not lost.
 *
 * @param topic   destination topic
 * @param key     record key
 * @param payload record value
 */
public void send(String topic, String key, String payload) {
    LOGGER.info("Sending payload='{}' to topic='{}' with key='{}'", payload, topic, key);
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, payload);

    // The metadata object is passed as-is; the logger renders it via toString() only when the level is enabled.
    SuccessCallback<SendResult<String, String>> successCallback = sendResult ->
            LOGGER.info("Sent payload='{}' with key='{}' to topic-partition@offset='{}'",
                    payload, key, sendResult.getRecordMetadata());

    // Trailing throwable argument is logged as the exception (with stack trace), not as a placeholder value.
    FailureCallback failureCallback = throwable ->
            LOGGER.error("Sending payload='{}' to topic='{}' with key='{}' failed!!!",
                    payload, topic, key, throwable);

    future.addCallback(successCallback, failureCallback);
}
/**
 * Issues an asynchronous GET for {@code /api/v2/resource/b} on localhost at {@code PORT}, registers
 * callbacks that print "Success" or "Failure", and returns the pending future.
 *
 * @return the future produced by the asynchronous request
 */
@GetMapping
public Future<?> process() {
    AsyncRestTemplate template = new AsyncRestTemplate();

    // Parameterized with Object (instead of a raw type) so it is accepted for any result type
    // without an unchecked call.
    SuccessCallback<Object> onSuccess = r -> System.out.println("Success");
    FailureCallback onFailure = e -> System.out.println("Failure");

    ListenableFuture<?> response = template.getForEntity(
            "http://localhost:" + PORT + "/api/v2/resource/b",
            ExamplesCollection.class
    );
    response.addCallback(onSuccess, onFailure);
    return response;
}
/**
 * Invokes one of the callbacks immediately. When an execution exception is recorded, the failure callback
 * receives its cause, or the execution exception itself when it has no cause; otherwise the success
 * callback receives the stored value. Any throwable raised while doing so is swallowed.
 */
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
    try {
        if (this.executionException == null) {
            successCallback.onSuccess(this.value);
            return;
        }
        Throwable cause = this.executionException.getCause();
        Throwable reported = (cause == null) ? this.executionException : cause;
        failureCallback.onFailure(reported);
    }
    catch (Throwable ignored) {
        // Ignore
    }
}
/**
 * Registers an event: sends it through {@code kafkaTemplate} to the topic returned by
 * {@code getTopic(eventArg)} and, when a {@code registerStore} is configured, records the outcome of
 * the send in it.
 *
 * <p>A send failure is always logged with its stack trace, whether or not a store is configured.
 *
 * @param eventArg the event argument to send
 */
@Override
public void regist(IEventArg eventArg) {
    ListenableFuture<SendResult<String, IEventArg>> listenableFuture = kafkaTemplate.send(getTopic(eventArg), eventArg);
    EventRegisterResult registerResult = new EventRegisterResult(eventArg);

    // Callback for a successful send
    SuccessCallback<SendResult<String, IEventArg>> successCallback = result -> {
        if (registerStore == null) {
            return;
        }
        registerResult.setSuccess(true);
        Map<String, Object> map = new HashMap<>(2);
        if (result != null) {
            map.put("producerRecord", result.getProducerRecord());
            map.put("metadata", result.getRecordMetadata());
        }
        registerResult.setExtJson(JSON.toJSONString(map));
        registerStore.register(registerResult);
    };

    // Callback for a failed send
    FailureCallback failureCallback = ex -> {
        // Log before anything else so the failure is visible even without a store (or if the store throws),
        // and pass the throwable itself so the stack trace is kept.
        log.error(ex.getMessage(), ex);
        if (registerStore == null) {
            return;
        }
        registerResult.setSuccess(false);
        registerResult.setMessage(ex.getMessage());
        registerStore.register(registerResult);
    };

    listenableFuture.addCallback(successCallback, failureCallback);
}
/**
 * Calls the success callback with the stored value straight away; if that call throws, the thrown
 * object is passed to the failure callback.
 */
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
    try {
        successCallback.onSuccess(this.value);
    }
    catch (Throwable thrownBySuccessCallback) {
        failureCallback.onFailure(thrownBySuccessCallback);
    }
}
/**
 * Creates a wrapper around the given callback and retains the supplied span stack and MDC context map
 * for use when the callback is executed (how they are applied is implemented elsewhere in this class).
 *
 * <p>The callback must not be null; an {@link IllegalArgumentException} is thrown otherwise.
 *
 * <p>The span stack and the MDC map are stored as given without any null check, so either may be null.
 */
public FailureCallbackWithTracing(FailureCallback origFailureCallback,
                                  Deque<Span> spanStackForExecution,
                                  Map<String, String> mdcContextMapForExecution) {
    if (origFailureCallback == null) {
        throw new IllegalArgumentException("origFailureCallback cannot be null");
    }

    this.origFailureCallback = origFailureCallback;
    this.spanStackForExecution = spanStackForExecution;
    this.mdcContextMapForExecution = mdcContextMapForExecution;
}
/**
 * Per-test setup: resets tracing, then creates fresh mocks for the callbacks, the tag strategy/adapter,
 * and an {@code HttpMessage} whose {@code getHeaders()} returns the headers mock.
 */
@Before
public void beforeMethod() {
    resetTracing();

    // Callback mocks
    successCallbackMock = mock(SuccessCallback.class);
    failureCallbackMock = mock(FailureCallback.class);
    listenableFutureCallbackMock = mock(ListenableFutureCallback.class);

    // Tagging mocks
    tagStrategyMock = mock(HttpTagAndSpanNamingStrategy.class);
    tagAdapterMock = mock(HttpTagAndSpanNamingAdapter.class);

    // HTTP message whose headers are the headers mock
    headersMock = mock(HttpHeaders.class);
    httpMessageMock = mock(HttpMessage.class);
    doReturn(headersMock).when(httpMessageMock).getHeaders();
}
/**
 * Asserts that {@code result} is a {@code FailureCallbackWithTracing} whose internal fields (read
 * reflectively via {@code Whitebox}) match the expectations: {@code origFailureCallback} must be the same
 * instance as {@code expectedCoreInstance}, while {@code spanStackForExecution} and
 * {@code mdcContextMapForExecution} must be equal to the expected span stack and MDC info.
 */
private void verifyFailureCallbackWithTracing(FailureCallback result,
FailureCallback expectedCoreInstance,
Deque<Span> expectedSpanStack,
Map<String, String> expectedMdcInfo) {
assertThat(result).isInstanceOf(FailureCallbackWithTracing.class);
assertThat(Whitebox.getInternalState(result, "origFailureCallback")).isSameAs(expectedCoreInstance);
assertThat(Whitebox.getInternalState(result, "spanStackForExecution")).isEqualTo(expectedSpanStack);
assertThat(Whitebox.getInternalState(result, "mdcContextMapForExecution")).isEqualTo(expectedMdcInfo);
}
/**
 * The single-argument factory should produce a wrapper carrying the tracing info returned by
 * {@code setupCurrentThreadWithTracingInfo()}.
 */
@Test
public void failureCallbackWithTracing_using_current_thread_info_works_as_expected() {
    // given
    Pair<Deque<Span>, Map<String, String>> threadTracingInfo = setupCurrentThreadWithTracingInfo();

    // when
    FailureCallback wrapped = failureCallbackWithTracing(failureCallbackMock);

    // then
    Deque<Span> expectedSpanStack = threadTracingInfo.getLeft();
    Map<String, String> expectedMdcInfo = threadTracingInfo.getRight();
    verifyFailureCallbackWithTracing(wrapped, failureCallbackMock, expectedSpanStack, expectedMdcInfo);
}
/**
 * The factory overload taking a {@code Pair} should produce a wrapper carrying that pair's span stack
 * and MDC info.
 */
@Test
public void failureCallbackWithTracing_pair_works_as_expected() {
    // given
    Pair<Deque<Span>, Map<String, String>> tracingInfo = generateTracingInfo();

    // when
    FailureCallback wrapped = failureCallbackWithTracing(failureCallbackMock, tracingInfo);

    // then
    Deque<Span> expectedSpanStack = tracingInfo.getLeft();
    Map<String, String> expectedMdcInfo = tracingInfo.getRight();
    verifyFailureCallbackWithTracing(wrapped, failureCallbackMock, expectedSpanStack, expectedMdcInfo);
}
/**
 * The factory overload taking the span stack and MDC map as separate arguments should produce a wrapper
 * carrying exactly those values.
 */
@Test
public void failureCallbackWithTracing_separate_args_works_as_expected() {
    // given
    Pair<Deque<Span>, Map<String, String>> tracingInfo = generateTracingInfo();
    Deque<Span> spanStack = tracingInfo.getLeft();
    Map<String, String> mdcInfo = tracingInfo.getRight();

    // when
    FailureCallback wrapped = failureCallbackWithTracing(failureCallbackMock, spanStack, mdcInfo);

    // then
    verifyFailureCallbackWithTracing(wrapped, failureCallbackMock, spanStack, mdcInfo);
}
/**
 * Forwards the callbacks to {@code delegate}, wrapping each non-null callback in its
 * {@code TraceContext*Callback} counterpart bound to this future; a null callback is forwarded as null.
 */
public void addCallback(SuccessCallback<? super T> successCallback,
        FailureCallback failureCallback) {
    delegate.addCallback(
        successCallback == null
            ? null
            : new TraceContextSuccessCallback<>(successCallback, this),
        failureCallback == null
            ? null
            : new TraceContextFailureCallback(failureCallback, this)
    );
}
/**
 * Adds the given failure callback to the {@code callbacks} registry.
 */
public void addFailureCallback(FailureCallback callback) {
    callbacks.addFailureCallback(callback);
}
/**
 * Delegates registration of the failure callback to {@code callbacks}.
 */
public void addFailureCallback(FailureCallback callback) {
    callbacks.addFailureCallback(callback);
}
/**
 * Registers {@code callback} as a failure callback on the {@code callbacks} registry.
 */
public void addFailureCallback(FailureCallback callback) {
    callbacks.addFailureCallback(callback);
}
/**
 * Registers the success callback and then the failure callback with the {@code callback} holder.
 */
@Override
public void addCallback(
        SuccessCallback<? super ClientHttpResponse> successCallback,
        FailureCallback failureCallback) {

    callback.addSuccessCallback(successCallback);
    callback.addFailureCallback(failureCallback);
}
/**
 * Adds the success callback and then the failure callback to {@code registry}.
 */
@Override
public void addCallback(
        SuccessCallback<? super T> successCallback,
        FailureCallback failureCallback) {

    registry.addSuccessCallback(successCallback);
    registry.addFailureCallback(failureCallback);
}
/**
 * Passes the failure callback on to the {@code callbacks} registry.
 */
public void addFailureCallback(FailureCallback callback) {
    callbacks.addFailureCallback(callback);
}
/**
 * Forwards both callbacks to the {@code callback} holder, success callback first.
 */
@Override
public void addCallback(
        SuccessCallback<? super ClientHttpResponse> successCallback,
        FailureCallback failureCallback) {

    callback.addSuccessCallback(successCallback);
    callback.addFailureCallback(failureCallback);
}
/**
 * Static factory for {@link FailureCallbackWithTracing}.
 *
 * @return a new {@link FailureCallbackWithTracing} constructed from the given callback, span stack, and
 * MDC context map
 */
public static FailureCallback failureCallbackWithTracing(FailureCallback failureCallback,
                                                         Deque<Span> spanStackToLink,
                                                         Map<String, String> mdcContextMapToLink) {
    FailureCallback wrapped =
        new FailureCallbackWithTracing(failureCallback, spanStackToLink, mdcContextMapToLink);
    return wrapped;
}
/**
 * Adds the success callback and then the failure callback to {@code callbacks}.
 */
@Override
public void addCallback(
        SuccessCallback<? super T> successCallback,
        FailureCallback failureCallback) {

    callbacks.addSuccessCallback(successCallback);
    callbacks.addFailureCallback(failureCallback);
}
/**
 * Adapts the two callbacks to the wrapped future's {@code addCallbacks} API by passing method references
 * to {@code onSuccess} and {@code onFailure}.
 */
@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
// Method references bind their receiver immediately, so a null callback fails here rather than when invoked.
getWrappedFuture().addCallbacks(successCallback::onSuccess, failureCallback::onFailure);
}
/**
 * Delegates to the wrapped future, passing both callbacks through unchanged.
 */
@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
getWrappedFuture().addCallback(successCallback, failureCallback);
}
/**
 * Creates a failure callback wrapper that keeps the given delegate and copies the
 * {@code currentTraceContext} and {@code invocationContext} from the supplied future.
 */
TraceContextFailureCallback(FailureCallback delegate,
        TraceContextListenableFuture<?> future) {
    this.currentTraceContext = future.currentTraceContext;
    this.invocationContext = future.invocationContext;
    this.delegate = delegate;
}
/**
 * Convenience constructor that takes the span stack and MDC context map bundled in a {@link Pair} and
 * delegates to the three-argument constructor.
 *
 * <p>A null pair is accepted: both the span stack and the MDC map are then passed on as null. Otherwise
 * the pair's left value is passed as the span stack and its right value as the MDC map, whatever they are
 * (including null).
 *
 * <p>You can pass in a {@link TracingState} for clearer less verbose code since it extends
 * {@code Pair<Deque<Span>, Map<String, String>>}.
 */
public FailureCallbackWithTracing(FailureCallback origFailureCallback,
                                  Pair<Deque<Span>, Map<String, String>> originalThreadInfo) {
    this(
        origFailureCallback,
        (originalThreadInfo != null) ? originalThreadInfo.getLeft() : null,
        (originalThreadInfo != null) ? originalThreadInfo.getRight() : null
    );
}