下面列出了怎么用org.springframework.util.concurrent.SuccessCallback的API类实例代码及写法,或者点击链接到github查看源代码。
@Before
public void beforeMethod() {
successCallbackMock = mock(SuccessCallback.class);
inObj = new Object();
throwExceptionDuringCall = false;
currentSpanStackWhenSuccessCallbackWasCalled = new ArrayList<>();
currentMdcInfoWhenSuccessCallbackWasCalled = new ArrayList<>();
doAnswer(invocation -> {
currentSpanStackWhenSuccessCallbackWasCalled.add(Tracer.getInstance().getCurrentSpanStackCopy());
currentMdcInfoWhenSuccessCallbackWasCalled.add(MDC.getCopyOfContextMap());
if (throwExceptionDuringCall)
throw new RuntimeException("kaboom");
return null;
}).when(successCallbackMock).onSuccess(inObj);
resetTracing();
}
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
try {
if (this.executionException != null) {
failureCallback.onFailure(exposedException(this.executionException));
}
else {
successCallback.onSuccess(this.value);
}
}
catch (Throwable ex) {
// Ignore
}
}
@Override
public void addCallback(SuccessCallback<? super ClientHttpResponse> successCallback,
FailureCallback failureCallback) {
this.callback.addSuccessCallback(successCallback);
this.callback.addFailureCallback(failureCallback);
}
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
try {
if (this.executionException != null) {
failureCallback.onFailure(exposedException(this.executionException));
}
else {
successCallback.onSuccess(this.value);
}
}
catch (Throwable ex) {
// Ignore
}
}
@Override
public void addCallback(SuccessCallback<? super ClientHttpResponse> successCallback,
FailureCallback failureCallback) {
this.callback.addSuccessCallback(successCallback);
this.callback.addFailureCallback(failureCallback);
}
public void send(String topic, String key, String payload){
LOGGER.info("Sending payload='{}' to topic='{}' with key='{}'", payload, topic, key);
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key ,payload);
SuccessCallback<SendResult<String,String>> successCallback = sendResult -> {
LOGGER.info("Sent payload='{}' with key='{}' to [email protected]='{}'", payload, key, sendResult.getRecordMetadata().toString());
};
FailureCallback failureCallback = throwable -> {
LOGGER.info("Sending payload='{}' to topic='{}' with key='{}' failed!!!", payload, topic, key);
};
future.addCallback(successCallback, failureCallback);
}
@GetMapping
public Future<?> process() {
AsyncRestTemplate template = new AsyncRestTemplate();
SuccessCallback onSuccess = r -> System.out.println("Success");
FailureCallback onFailure = e -> System.out.println("Failure");
ListenableFuture<?> response = template.getForEntity(
"http://localhost:" + PORT + "/api/v2/resource/b",
ExamplesCollection.class
);
response.addCallback(onSuccess, onFailure);
return response;
}
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
try {
if (this.executionException != null) {
Throwable cause = this.executionException.getCause();
failureCallback.onFailure(cause != null ? cause : this.executionException);
}
else {
successCallback.onSuccess(this.value);
}
}
catch (Throwable ex) {
// Ignore
}
}
/**
* 事件注册
*
* @param eventArg 事件参数
*/
@Override
public void regist(IEventArg eventArg) {
ListenableFuture<SendResult<String, IEventArg>> listenableFuture = kafkaTemplate.send(getTopic(eventArg), eventArg);
EventRegisterResult registerResult = new EventRegisterResult(eventArg);
//发送成功回调
SuccessCallback<SendResult<String, IEventArg>> successCallback = result -> {
if (registerStore == null) return;
registerResult.setSuccess(true);
Map<String, Object> map = new HashMap<>(2);
if (result != null) {
map.put("producerRecord", result.getProducerRecord());
map.put("metadata", result.getRecordMetadata());
}
registerResult.setExtJson(JSON.toJSONString(map));
registerStore.register(registerResult);
};
//发送失败回调
FailureCallback failureCallback = ex -> {
if (registerStore == null) return;
registerResult.setSuccess(false);
registerResult.setMessage(ex.getMessage());
registerStore.register(registerResult);
log.error(ex.getMessage());
};
listenableFuture.addCallback(successCallback, failureCallback);
}
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
try {
successCallback.onSuccess(this.value);
}
catch (Throwable ex) {
failureCallback.onFailure(ex);
}
}
/**
* Constructor that uses the given trace and MDC information, which will be associated with the thread when the
* given operation is executed.
*
* <p>The operation you pass in cannot be null (an {@link IllegalArgumentException} will be thrown if you pass in
* null for the operation).
*
* <p>The trace and/or MDC info can be null and no error will be thrown, however any trace or MDC info that is null
* means the corresponding info will not be available to the thread when the operation is executed.
*/
public SuccessCallbackWithTracing(SuccessCallback<T> origSuccessCallback,
Deque<Span> spanStackForExecution,
Map<String, String> mdcContextMapForExecution) {
if (origSuccessCallback == null)
throw new IllegalArgumentException("origSuccessCallback cannot be null");
this.origSuccessCallback = origSuccessCallback;
this.spanStackForExecution = spanStackForExecution;
this.mdcContextMapForExecution = mdcContextMapForExecution;
}
/**
* @return A {@link SuccessCallback} that wraps the given original so that the given distributed tracing and MDC
* information is registered with the thread and therefore available during execution and unregistered after
* execution. You can pass in a {@link TracingState} for clearer less verbose code since it extends
* {@code Pair<Deque<Span>, Map<String, String>>}.
*/
public static <T> SuccessCallback<T> successCallbackWithTracing(
SuccessCallback<T> successCallback,
Pair<Deque<Span>, Map<String, String>> threadInfoToLink
) {
return new SuccessCallbackWithTracing<>(successCallback, threadInfoToLink);
}
/**
* @return A {@link SuccessCallback} that wraps the given original so that the given distributed tracing and MDC
* information is registered with the thread and therefore available during execution and unregistered after
* execution.
*/
public static <T> SuccessCallback<T> successCallbackWithTracing(
SuccessCallback<T> successCallback,
Deque<Span> spanStackToLink,
Map<String, String> mdcContextMapToLink
) {
return new SuccessCallbackWithTracing<>(successCallback, spanStackToLink, mdcContextMapToLink);
}
@Before
public void beforeMethod() {
resetTracing();
httpMessageMock = mock(HttpMessage.class);
headersMock = mock(HttpHeaders.class);
doReturn(headersMock).when(httpMessageMock).getHeaders();
successCallbackMock = mock(SuccessCallback.class);
failureCallbackMock = mock(FailureCallback.class);
listenableFutureCallbackMock = mock(ListenableFutureCallback.class);
tagStrategyMock = mock(HttpTagAndSpanNamingStrategy.class);
tagAdapterMock = mock(HttpTagAndSpanNamingAdapter.class);
}
private void verifySuccessCallbackWithTracing(SuccessCallback result,
SuccessCallback expectedCoreInstance,
Deque<Span> expectedSpanStack,
Map<String, String> expectedMdcInfo) {
assertThat(result).isInstanceOf(SuccessCallbackWithTracing.class);
assertThat(Whitebox.getInternalState(result, "origSuccessCallback")).isSameAs(expectedCoreInstance);
assertThat(Whitebox.getInternalState(result, "spanStackForExecution")).isEqualTo(expectedSpanStack);
assertThat(Whitebox.getInternalState(result, "mdcContextMapForExecution")).isEqualTo(expectedMdcInfo);
}
@Test
public void successCallbackWithTracing_using_current_thread_info_works_as_expected() {
// given
Pair<Deque<Span>, Map<String, String>> setupInfo = setupCurrentThreadWithTracingInfo();
// when
SuccessCallback result = successCallbackWithTracing(successCallbackMock);
// then
verifySuccessCallbackWithTracing(result, successCallbackMock, setupInfo.getLeft(), setupInfo.getRight());
}
@Test
public void successCallbackWithTracing_pair_works_as_expected() {
// given
Pair<Deque<Span>, Map<String, String>> setupInfo = generateTracingInfo();
// when
SuccessCallback result = successCallbackWithTracing(successCallbackMock, setupInfo);
// then
verifySuccessCallbackWithTracing(result, successCallbackMock, setupInfo.getLeft(), setupInfo.getRight());
}
@Test
public void successCallbackWithTracing_separate_args_works_as_expected() {
// given
Pair<Deque<Span>, Map<String, String>> setupInfo = generateTracingInfo();
// when
SuccessCallback result = successCallbackWithTracing(
successCallbackMock, setupInfo.getLeft(), setupInfo.getRight()
);
// then
verifySuccessCallbackWithTracing(result, successCallbackMock, setupInfo.getLeft(), setupInfo.getRight());
}
@Test
public void testUnsubscribeOnEventTypeRemoval() {
// Mock the task scheduler and capture the runnable
ArgumentCaptor<Runnable> runnableArg = ArgumentCaptor.forClass(Runnable.class);
when(taskScheduler.scheduleWithFixedDelay(runnableArg.capture(), anyLong())).thenReturn(Mockito.mock(ScheduledFuture.class));
// Mock the response to the subsribeContext
ArgumentCaptor<SuccessCallback> successArg = ArgumentCaptor.forClass(SuccessCallback.class);
ListenableFuture<SubscribeContextResponse> responseFuture = Mockito.mock(ListenableFuture.class);
doNothing().when(responseFuture).addCallback(successArg.capture(), any());
// Return the mocked future on subscription
when(ngsiClient.subscribeContext(any(), any(), any())).thenReturn(responseFuture);
Configuration configuration = getBasicConf();
subscriptionManager.setConfiguration(configuration);
// Execute scheduled runnable
runnableArg.getValue().run();
// Return the SubscribeContextResponse
callSuccessCallback(successArg);
// Mock future for unsubscribeContext
ListenableFuture<UnsubscribeContextResponse> responseFuture2 = Mockito.mock(ListenableFuture.class);
doNothing().when(responseFuture2).addCallback(successArg.capture(), any());
when(ngsiClient.unsubscribeContext(eq("http://iotAgent"), eq(null), eq("12345678"))).thenReturn(responseFuture2);
// Set a configuration without the eventType
Configuration emptyConfiguration = new Configuration();
emptyConfiguration.setEventTypeIns(Collections.emptyList());
subscriptionManager.setConfiguration(emptyConfiguration);
// Check that unsubsribe is called when a later configuration removed the event type
Assert.notNull(successArg.getValue());
}
@Test
public void testUnsubscribeOnProviderRemoval() {
// Mock the task scheduler and capture the runnable
ArgumentCaptor<Runnable> runnableArg = ArgumentCaptor.forClass(Runnable.class);
when(taskScheduler.scheduleWithFixedDelay(runnableArg.capture(), anyLong())).thenReturn(Mockito.mock(ScheduledFuture.class));
// Mock the response to the subsribeContext
ArgumentCaptor<SuccessCallback> successArg = ArgumentCaptor.forClass(SuccessCallback.class);
ListenableFuture<SubscribeContextResponse> responseFuture = Mockito.mock(ListenableFuture.class);
doNothing().when(responseFuture).addCallback(successArg.capture(), any());
// Return the mocked future on subscription
when(ngsiClient.subscribeContext(any(),any(), any())).thenReturn(responseFuture);
Configuration configuration = getBasicConf();
subscriptionManager.setConfiguration(configuration);
// Execute scheduled runnable
runnableArg.getValue().run();
// Return the SubscribeContextResponse
callSuccessCallback(successArg);
// Mock future for unsubscribeContext
ListenableFuture<UnsubscribeContextResponse> responseFuture2 = Mockito.mock(ListenableFuture.class);
doNothing().when(responseFuture2).addCallback(successArg.capture(), any());
when(ngsiClient.unsubscribeContext(eq("http://iotAgent"), eq(null), eq("12345678"))).thenReturn(responseFuture2);
// Reset conf should trigger unsubsribeContext
Configuration emptyConfiguration = getBasicConf();
emptyConfiguration.getEventTypeIns().get(0).setProviders(Collections.emptySet());
subscriptionManager.setConfiguration(emptyConfiguration);
// Check that unsubsribe is called
Assert.notNull(successArg.getValue());
}
@Test
public void testValidSubscription() {
// add configuration
// Mock the task scheduler and capture the runnable
ArgumentCaptor<Runnable> runnableArg = ArgumentCaptor.forClass(Runnable.class);
when(taskScheduler.scheduleWithFixedDelay(runnableArg.capture(), anyLong())).thenReturn(Mockito.mock(ScheduledFuture.class));
// Mock the response to the subsribeContext
ArgumentCaptor<SuccessCallback> successArg = ArgumentCaptor.forClass(SuccessCallback.class);
ListenableFuture<SubscribeContextResponse> responseFuture = Mockito.mock(ListenableFuture.class);
doNothing().when(responseFuture).addCallback(successArg.capture(), any());
Configuration configuration = getBasicConf();
subscriptionManager.setConfiguration(configuration);
// Capture the arg of subscription and return the mocked future
ArgumentCaptor<String> urlProviderArg = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<SubscribeContext> subscribeContextArg = ArgumentCaptor.forClass(SubscribeContext.class);
when(ngsiClient.subscribeContext(urlProviderArg.capture(), eq(null), subscribeContextArg.capture())).thenReturn(responseFuture);
// Execute scheduled runnable
runnableArg.getValue().run();
// Return the SubscribeContextResponse
callSuccessCallback(successArg);
// check ngsiClient.unsubscribe() is never called
verify(ngsiClient, never()).unsubscribeContext(any(), any(), any());
subscriptionManager.validateSubscriptionId("12345678", "http://iotAgent");
}
private void callSuccessCallback (ArgumentCaptor<SuccessCallback> successArg) {
SubscribeContextResponse response = new SubscribeContextResponse();
SubscribeResponse subscribeResponse = new SubscribeResponse();
subscribeResponse.setSubscriptionId("12345678");
subscribeResponse.setDuration("PT1H");
response.setSubscribeResponse(subscribeResponse);
successArg.getValue().onSuccess(response);
}
public void addCallback(SuccessCallback<? super T> successCallback,
FailureCallback failureCallback) {
delegate.addCallback(
successCallback != null
? new TraceContextSuccessCallback<>(successCallback, this)
: null,
failureCallback != null
? new TraceContextFailureCallback(failureCallback, this)
: null
);
}
public void addSuccessCallback(SuccessCallback<? super ClientHttpResponse> callback) {
this.callbacks.addSuccessCallback(callback);
}
public void addSuccessCallback(SuccessCallback<? super ClientHttpResponse> callback) {
this.callbacks.addSuccessCallback(callback);
}
public void addSuccessCallback(SuccessCallback<? super ClientHttpResponse> callback) {
this.callbacks.addSuccessCallback(callback);
}
@Override
public void addCallback(SuccessCallback<? super ClientHttpResponse> successCallback, FailureCallback failureCallback) {
this.callback.addSuccessCallback(successCallback);
this.callback.addFailureCallback(failureCallback);
}
@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.registry.addSuccessCallback(successCallback);
this.registry.addFailureCallback(failureCallback);
}
public void addSuccessCallback(SuccessCallback<? super ClientHttpResponse> callback) {
this.callbacks.addSuccessCallback(callback);
}
@Override
public void addCallback(SuccessCallback<? super ClientHttpResponse> successCallback, FailureCallback failureCallback) {
this.callback.addSuccessCallback(successCallback);
this.callback.addFailureCallback(failureCallback);
}