下面列出了 org.springframework.util.concurrent.ListenableFutureCallback 类的 API 用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Publishes {@code message} to {@code topic} through the Kafka template without
 * blocking, and logs the outcome once the send completes.
 *
 * @param topic   the topic to publish to
 * @param message the payload to publish
 */
public void send(String topic, String message) {
    // The template's send methods are asynchronous and hand back a future for the result.
    ListenableFuture<SendResult<String, String>> pendingSend = kafkaTemplate.send(topic, message);

    // Listener that receives the result of the send asynchronously.
    ListenableFutureCallback<SendResult<String, String>> outcomeLogger =
            new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> sendResult) {
                    log.info("Kafka sent message='{}' with offset={}", message,
                            sendResult.getRecordMetadata().offset());
                }

                @Override
                public void onFailure(Throwable cause) {
                    log.error("Kafka unable to send message='{}'", message, cause);
                }
            };
    pendingSend.addCallback(outcomeLogger);
    // Alternatively, calling get() on the future would block the sending thread until the result arrives.
}
/**
 * Starts a WebSocket handshake against {@code getUri()} and registers a callback that
 * stores the resulting session on success and logs the error on failure.
 */
@Override
protected void openConnection() {
    if (logger.isInfoEnabled()) {
        logger.info("Connecting to WebSocket at " + getUri());
    }

    ListenableFuture<WebSocketSession> handshake =
            this.client.doHandshake(this.webSocketHandler, this.headers, getUri());

    ListenableFutureCallback<WebSocketSession> connectionListener =
            new ListenableFutureCallback<WebSocketSession>() {
                @Override
                public void onSuccess(@Nullable WebSocketSession session) {
                    // Keep the session (possibly null) on the enclosing instance.
                    webSocketSession = session;
                    logger.info("Successfully connected");
                }

                @Override
                public void onFailure(Throwable cause) {
                    logger.error("Failed to connect", cause);
                }
            };
    handshake.addCallback(connectionListener);
}
/**
 * Begins a WebSocket handshake for the given transport request and returns a future for
 * the session.
 *
 * <p>The returned future is handed to a new {@code WebSocketClientSockJsSession}; this
 * method itself only completes it (exceptionally) when the handshake fails.
 *
 * @param request the transport request supplying the URL, handshake headers and timeout hook
 * @param handler the handler passed to the SockJS session
 * @return the future given to the SockJS session and failed here on handshake error
 */
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
    final SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
    WebSocketClientSockJsSession sockJsSession =
            new WebSocketClientSockJsSession(request, handler, connectFuture);
    // From here on the handshake uses a handler wrapping the SockJS session.
    handler = new ClientSockJsWebSocketHandler(sockJsSession);
    request.addTimeoutTask(sockJsSession.getTimeoutTask());

    URI transportUrl = request.getTransportUrl();
    WebSocketHttpHeaders handshakeHeaders = new WebSocketHttpHeaders(request.getHandshakeHeaders());
    if (logger.isDebugEnabled()) {
        logger.debug("Starting WebSocket session on " + transportUrl);
    }

    ListenableFuture<WebSocketSession> handshake =
            this.webSocketClient.doHandshake(handler, handshakeHeaders, transportUrl);
    handshake.addCallback(new ListenableFutureCallback<WebSocketSession>() {
        @Override
        public void onSuccess(@Nullable WebSocketSession webSocketSession) {
            // WebSocket session ready, SockJS Session not yet
        }

        @Override
        public void onFailure(Throwable cause) {
            connectFuture.setException(cause);
        }
    });
    return connectFuture;
}
/**
 * Registers a 500 for "/info" in the test filter's error map, performs a SockJS
 * handshake, and expects the failure callback to fire within five seconds.
 */
@Test
public void infoRequestFailure() throws Exception {
    TestClientHandler clientHandler = new TestClientHandler();
    this.testFilter.sendErrorMap.put("/info", 500);
    CountDownLatch failureSignal = new CountDownLatch(1);
    initSockJsClient(createWebSocketTransport());

    ListenableFutureCallback<WebSocketSession> failureListener =
            new ListenableFutureCallback<WebSocketSession>() {
                @Override
                public void onSuccess(WebSocketSession session) {
                    // Nothing to do: only the failure path releases the latch.
                }

                @Override
                public void onFailure(Throwable cause) {
                    failureSignal.countDown();
                }
            };
    this.sockJsClient.doHandshake(clientHandler, this.baseUrl + "/echo").addCallback(failureListener);

    assertTrue(failureSignal.await(5000, TimeUnit.MILLISECONDS));
}
/**
 * Stubs the mocked {@code RestOperations} to throw a 500 error on POST, connects, and
 * verifies that no further interactions happen on the WebSocket handler.
 */
@Test
public void connectFailure() throws Exception {
    final HttpServerErrorException expected = new HttpServerErrorException(HttpStatus.INTERNAL_SERVER_ERROR);
    RestOperations restOperations = mock(RestOperations.class);
    given(restOperations.execute((URI) any(), eq(HttpMethod.POST), any(), any())).willThrow(expected);

    final CountDownLatch expectedFailureSignal = new CountDownLatch(1);
    ListenableFutureCallback<WebSocketSession> failureListener =
            new ListenableFutureCallback<WebSocketSession>() {
                @Override
                public void onSuccess(WebSocketSession session) {
                    // Not relevant for this scenario.
                }

                @Override
                public void onFailure(Throwable cause) {
                    // Only the exact stubbed exception instance releases the latch.
                    if (cause == expected) {
                        expectedFailureSignal.countDown();
                    }
                }
            };
    connect(restOperations).addCallback(failureListener);

    // NOTE(review): the latch is counted down above but never awaited in this method,
    // so the failure callback is not actually asserted here - confirm whether intended.
    verifyNoMoreInteractions(this.webSocketHandler);
}
/**
 * Issues an asynchronous {@code getForEntity} call and checks body, headers,
 * content type and status inside the success callback.
 */
@Test
public void getEntityCallback() throws Exception {
    ListenableFuture<ResponseEntity<String>> pendingResponse =
            template.getForEntity(baseUrl + "/{method}", String.class, "get");

    ListenableFutureCallback<ResponseEntity<String>> verifier =
            new ListenableFutureCallback<ResponseEntity<String>>() {
                @Override
                public void onSuccess(ResponseEntity<String> response) {
                    assertEquals("Invalid content", helloWorld, response.getBody());
                    assertFalse("No headers", response.getHeaders().isEmpty());
                    assertEquals("Invalid content-type", textContentType, response.getHeaders().getContentType());
                    assertEquals("Invalid status code", HttpStatus.OK, response.getStatusCode());
                }

                @Override
                public void onFailure(Throwable cause) {
                    fail(cause.getMessage());
                }
            };
    pendingResponse.addCallback(verifier);

    waitTillDone(pendingResponse);
}
/**
 * Posts a text/plain (ISO-8859-1) entity asynchronously and checks in the success
 * callback that the returned location is {@code baseUrl + "/post/1"}.
 */
@Test
public void postForLocationCallback() throws Exception {
    HttpHeaders requestHeaders = new HttpHeaders();
    requestHeaders.setContentType(new MediaType("text", "plain", StandardCharsets.ISO_8859_1));
    HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld, requestHeaders);
    final URI expected = new URI(baseUrl + "/post/1");

    ListenableFuture<URI> pendingLocation =
            template.postForLocation(baseUrl + "/{method}", requestEntity, "post");
    ListenableFutureCallback<URI> verifier = new ListenableFutureCallback<URI>() {
        @Override
        public void onSuccess(URI location) {
            assertEquals("Invalid location", expected, location);
        }

        @Override
        public void onFailure(Throwable cause) {
            fail(cause.getMessage());
        }
    };
    pendingLocation.addCallback(verifier);

    waitTillDone(pendingLocation);
}
/**
 * Performs an asynchronous PUT and checks in the success callback that the
 * result is {@code null}.
 */
@Test
public void putCallback() throws Exception {
    HttpEntity<String> body = new HttpEntity<>(helloWorld);
    ListenableFuture<?> pendingPut = template.put(baseUrl + "/{method}", body, "put");

    ListenableFutureCallback<Object> verifier = new ListenableFutureCallback<Object>() {
        @Override
        public void onSuccess(Object outcome) {
            assertNull(outcome);
        }

        @Override
        public void onFailure(Throwable cause) {
            fail(cause.getMessage());
        }
    };
    pendingPut.addCallback(verifier);

    waitTillDone(pendingPut);
}
/**
 * Requests {@code /status/notfound} asynchronously and checks in the failure callback
 * that an {@code HttpClientErrorException} with status NOT_FOUND is reported.
 */
@Test
public void notFoundCallback() throws Exception {
    ListenableFuture<?> pending =
            template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null);

    ListenableFutureCallback<Object> verifier = new ListenableFutureCallback<Object>() {
        @Override
        public void onSuccess(Object outcome) {
            fail("onSuccess not expected");
        }

        @Override
        public void onFailure(Throwable cause) {
            assertTrue(cause instanceof HttpClientErrorException);
            HttpClientErrorException clientError = (HttpClientErrorException) cause;
            assertEquals(HttpStatus.NOT_FOUND, clientError.getStatusCode());
            assertNotNull(clientError.getStatusText());
            assertNotNull(clientError.getResponseBodyAsString());
        }
    };
    pending.addCallback(verifier);

    waitTillDone(pending);
}
/**
 * Registers a 500 for "/info" in the test filter's error map, performs a SockJS
 * handshake, and expects the failure callback to fire within five seconds.
 */
@Test
public void infoRequestFailure() throws Exception {
    TestClientHandler clientHandler = new TestClientHandler();
    this.testFilter.sendErrorMap.put("/info", 500);
    CountDownLatch failureSignal = new CountDownLatch(1);
    initSockJsClient(createWebSocketTransport());

    ListenableFutureCallback<WebSocketSession> failureListener =
            new ListenableFutureCallback<WebSocketSession>() {
                @Override
                public void onSuccess(WebSocketSession session) {
                    // Nothing to do: only the failure path releases the latch.
                }

                @Override
                public void onFailure(Throwable cause) {
                    failureSignal.countDown();
                }
            };
    this.sockJsClient.doHandshake(clientHandler, this.baseUrl + "/echo").addCallback(failureListener);

    assertTrue(failureSignal.await(5000, TimeUnit.MILLISECONDS));
}
/**
 * Issues an asynchronous OPTIONS request for {@code /get} and checks in the success
 * callback that the allowed methods are GET, OPTIONS, HEAD and TRACE.
 */
@Test
public void optionsForAllowCallback() throws Exception {
    ListenableFuture<Set<HttpMethod>> pendingAllowed =
            template.optionsForAllow(new URI(baseUrl + "/get"));

    ListenableFutureCallback<Set<HttpMethod>> verifier =
            new ListenableFutureCallback<Set<HttpMethod>>() {
                @Override
                public void onSuccess(Set<HttpMethod> allowed) {
                    assertEquals("Invalid response",
                            EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.HEAD, HttpMethod.TRACE),
                            allowed);
                }

                @Override
                public void onFailure(Throwable cause) {
                    fail(cause.getMessage());
                }
            };
    pendingAllowed.addCallback(verifier);

    waitTillDone(pendingAllowed);
}
/**
 * Resets the per-test fixtures: a fresh callback mock, fresh success/failure arguments,
 * empty capture lists, and stubs that record the current span stack and MDC map
 * whenever the mock's {@code onSuccess}/{@code onFailure} is invoked with those arguments.
 */
@Before
public void beforeMethod() {
    // Fresh capture state first; the stubs below append to these lists.
    currentSpanStackWhenListenableFutureCallbackWasCalled = new ArrayList<>();
    currentMdcInfoWhenListenableFutureCallbackWasCalled = new ArrayList<>();
    throwExceptionDuringCall = false;

    successInObj = new Object();
    failureInObj = new Exception("kaboom");
    listenableFutureCallbackMock = mock(ListenableFutureCallback.class);

    // Success path: record tracing state, then optionally blow up.
    doAnswer(invocation -> {
        currentSpanStackWhenListenableFutureCallbackWasCalled.add(Tracer.getInstance().getCurrentSpanStackCopy());
        currentMdcInfoWhenListenableFutureCallbackWasCalled.add(MDC.getCopyOfContextMap());
        if (throwExceptionDuringCall) {
            throw new RuntimeException("kaboom");
        }
        return null;
    }).when(listenableFutureCallbackMock).onSuccess(successInObj);

    // Failure path: same recording and optional exception.
    doAnswer(invocation -> {
        currentSpanStackWhenListenableFutureCallbackWasCalled.add(Tracer.getInstance().getCurrentSpanStackCopy());
        currentMdcInfoWhenListenableFutureCallbackWasCalled.add(MDC.getCopyOfContextMap());
        if (throwExceptionDuringCall) {
            throw new RuntimeException("kaboom");
        }
        return null;
    }).when(listenableFutureCallbackMock).onFailure(failureInObj);

    resetTracing();
}
/**
 * Converts the queue message to a producer record, sends it asynchronously, and
 * returns a future that completes with {@code null} on success or exceptionally
 * with an {@code IORuntimeException} wrapping the send failure.
 *
 * @param queueMessage the message to publish
 * @return a future reflecting the outcome of the send
 */
@Override
public CompletableFuture<Void> sendMessageAsync(QueueMessage queueMessage) {
    CompletableFuture<Void> completion = new CompletableFuture<>();
    ProducerRecord<String, String> producerRecord = KafkaTool.covertToProducerRecord(queueMessage);

    ListenableFutureCallback<SendResult<String, String>> sendListener =
            new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> sendResult) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Enode message async send success, sendResult: {}, message: {}",
                                sendResult, producerRecord);
                    }
                    completion.complete(null);
                }

                @Override
                public void onFailure(Throwable cause) {
                    logger.error("Enode message async send has exception, message: {}", producerRecord, cause);
                    completion.completeExceptionally(new IORuntimeException(cause));
                }
            };
    producer.send(producerRecord).addCallback(sendListener);
    return completion;
}
/**
 * Adapts a {@code ListenableFuture} return value to deferred-result processing: a
 * {@code null} value marks the request as handled, otherwise the future's outcome is
 * forwarded to a {@code DeferredResult} as its result or error result.
 *
 * @param returnValue  the handler's return value, cast to {@code ListenableFuture} when non-null
 * @param returnType   the declared return type (not used here)
 * @param mavContainer the container for the current request
 * @param webRequest   the current request
 * @throws Exception if starting deferred-result processing fails
 */
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
        ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
    if (returnValue == null) {
        mavContainer.setRequestHandled(true);
        return;
    }

    final DeferredResult<Object> deferred = new DeferredResult<Object>();
    WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferred, mavContainer);

    ListenableFuture<?> listenable = (ListenableFuture<?>) returnValue;
    ListenableFutureCallback<Object> bridge = new ListenableFutureCallback<Object>() {
        @Override
        public void onSuccess(Object value) {
            deferred.setResult(value);
        }

        @Override
        public void onFailure(Throwable cause) {
            deferred.setErrorResult(cause);
        }
    };
    listenable.addCallback(bridge);
}
/**
 * Sends the driver message to Kafka, keyed by the UTF-8 bytes of its driver id, and
 * logs a warning if the send fails.
 *
 * @param driver the inbound driver message to forward
 */
@Override
public void accept(InboundDriverMessage driver) {
    byte[] key = driver.getDriverId().getBytes(StandardCharsets.UTF_8);
    ListenableFuture<SendResult<Object, Object>> pendingSend =
            kafkaTemplate.send(topic, key, driver.toByteArray());

    ListenableFutureCallback<SendResult<Object, Object>> failureLogger =
            new ListenableFutureCallback<SendResult<Object, Object>>() {
                @Override
                public void onSuccess(SendResult<Object, Object> sendResult) {}

                @Override
                public void onFailure(Throwable cause) {
                    log.warn("Failed sending an event to kafka", cause);
                }
            };
    pendingSend.addCallback(failureLogger);
}
/**
 * Sends the passenger message to Kafka, keyed by the UTF-8 bytes of its passenger id,
 * and logs the outcome of the send.
 *
 * @param passenger the inbound passenger message to forward
 */
@Override
public void accept(InboundPassengerMessage passenger) {
    ListenableFutureCallback<SendResult<Object, Object>> outcomeLogger =
            new ListenableFutureCallback<SendResult<Object, Object>>() {
                @Override
                public void onSuccess(SendResult<Object, Object> sendResult) {
                    log.info("Sent passenger data");
                }

                @Override
                public void onFailure(@NonNull Throwable cause) {
                    log.warn("couldn't send passenger data.", cause);
                }
            };

    byte[] key = passenger.getPassengerId().getBytes(StandardCharsets.UTF_8);
    kafkaTemplate.send(topic, key, passenger.toByteArray()).addCallback(outcomeLogger);
}
/**
 * Requests {@code /status/notfound} asynchronously and checks in the failure callback
 * that an {@code HttpClientErrorException} with status NOT_FOUND is reported.
 *
 * @throws Exception if the request cannot be issued or the wait is interrupted
 */
@Test
public void notFoundCallback() throws Exception {
    ListenableFuture<?> future = template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null);
    future.addCallback(new ListenableFutureCallback<Object>() {
        @Override
        public void onSuccess(Object result) {
            fail("onSuccess not expected");
        }

        @Override
        public void onFailure(Throwable t) {
            assertTrue(t instanceof HttpClientErrorException);
            HttpClientErrorException ex = (HttpClientErrorException) t;
            assertEquals(HttpStatus.NOT_FOUND, ex.getStatusCode());
            assertNotNull(ex.getStatusText());
            assertNotNull(ex.getResponseBodyAsString());
        }
    });
    while (!future.isDone()) {
        // Sleep between polls instead of spinning on an empty loop body,
        // which would otherwise burn a full CPU core while waiting.
        Thread.sleep(5);
    }
}
/**
 * Posts an entity asynchronously via {@code postForEntity} and checks in the success
 * callback that the response body equals {@code helloWorld}.
 *
 * @throws Exception if the request cannot be issued or the wait is interrupted
 */
@Test
public void postForEntityCallback() throws Exception {
    HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld);
    ListenableFuture<ResponseEntity<String>> responseEntityFuture =
            template.postForEntity(baseUrl + "/{method}", requestEntity, String.class, "post");
    responseEntityFuture.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
        @Override
        public void onSuccess(ResponseEntity<String> result) {
            assertEquals("Invalid content", helloWorld, result.getBody());
        }

        @Override
        public void onFailure(Throwable ex) {
            fail(ex.getMessage());
        }
    });
    while (!responseEntityFuture.isDone()) {
        // Sleep between polls instead of spinning on an empty loop body,
        // which would otherwise burn a full CPU core while waiting.
        Thread.sleep(5);
    }
}
/**
 * Starts a WebSocket handshake against {@code getUri()} and registers a callback that
 * stores the resulting session on success and logs the error on failure.
 */
@Override
protected void openConnection() {
    if (logger.isInfoEnabled()) {
        logger.info("Connecting to WebSocket at " + getUri());
    }

    ListenableFuture<WebSocketSession> handshake =
            this.client.doHandshake(this.webSocketHandler, this.headers, getUri());

    ListenableFutureCallback<WebSocketSession> connectionListener =
            new ListenableFutureCallback<WebSocketSession>() {
                @Override
                public void onSuccess(@Nullable WebSocketSession session) {
                    // Keep the session (possibly null) on the enclosing instance.
                    webSocketSession = session;
                    logger.info("Successfully connected");
                }

                @Override
                public void onFailure(Throwable cause) {
                    logger.error("Failed to connect", cause);
                }
            };
    handshake.addCallback(connectionListener);
}
/**
 * Begins a WebSocket handshake for the given transport request and returns a future for
 * the session.
 *
 * <p>The returned future is handed to a new {@code WebSocketClientSockJsSession}; this
 * method itself only completes it (exceptionally) when the handshake fails.
 *
 * @param request the transport request supplying the URL, handshake headers and timeout hook
 * @param handler the handler passed to the SockJS session
 * @return the future given to the SockJS session and failed here on handshake error
 */
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
    final SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
    WebSocketClientSockJsSession sockJsSession =
            new WebSocketClientSockJsSession(request, handler, connectFuture);
    // From here on the handshake uses a handler wrapping the SockJS session.
    handler = new ClientSockJsWebSocketHandler(sockJsSession);
    request.addTimeoutTask(sockJsSession.getTimeoutTask());

    URI transportUrl = request.getTransportUrl();
    WebSocketHttpHeaders handshakeHeaders = new WebSocketHttpHeaders(request.getHandshakeHeaders());
    if (logger.isDebugEnabled()) {
        logger.debug("Starting WebSocket session on " + transportUrl);
    }

    ListenableFuture<WebSocketSession> handshake =
            this.webSocketClient.doHandshake(handler, handshakeHeaders, transportUrl);
    handshake.addCallback(new ListenableFutureCallback<WebSocketSession>() {
        @Override
        public void onSuccess(@Nullable WebSocketSession webSocketSession) {
            // WebSocket session ready, SockJS Session not yet
        }

        @Override
        public void onFailure(Throwable cause) {
            connectFuture.setException(cause);
        }
    });
    return connectFuture;
}
/**
 * Stubs the mocked {@code RestOperations} to throw a 500 error on POST, connects, and
 * verifies that no further interactions happen on the WebSocket handler.
 */
@Test
public void connectFailure() throws Exception {
    final HttpServerErrorException expected = new HttpServerErrorException(HttpStatus.INTERNAL_SERVER_ERROR);
    RestOperations restOperations = mock(RestOperations.class);
    given(restOperations.execute((URI) any(), eq(HttpMethod.POST), any(), any())).willThrow(expected);

    final CountDownLatch expectedFailureSignal = new CountDownLatch(1);
    ListenableFutureCallback<WebSocketSession> failureListener =
            new ListenableFutureCallback<WebSocketSession>() {
                @Override
                public void onSuccess(WebSocketSession session) {
                    // Not relevant for this scenario.
                }

                @Override
                public void onFailure(Throwable cause) {
                    // Only the exact stubbed exception instance releases the latch.
                    if (cause == expected) {
                        expectedFailureSignal.countDown();
                    }
                }
            };
    connect(restOperations).addCallback(failureListener);

    // NOTE(review): the latch is counted down above but never awaited in this method,
    // so the failure callback is not actually asserted here - confirm whether intended.
    verifyNoMoreInteractions(this.webSocketHandler);
}
/**
 * Performs an asynchronous GET via {@code exchange} with a custom request header and
 * checks in the success callback that the response body equals {@code helloWorld}.
 *
 * @throws Exception if the request cannot be issued or the wait is interrupted
 */
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void exchangeGetCallback() throws Exception {
    HttpHeaders requestHeaders = new HttpHeaders();
    requestHeaders.set("MyHeader", "MyValue");
    HttpEntity<?> requestEntity = new HttpEntity(requestHeaders);
    ListenableFuture<ResponseEntity<String>> responseFuture =
            template.exchange(baseUrl + "/{method}", HttpMethod.GET, requestEntity, String.class, "get");
    responseFuture.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
        @Override
        public void onSuccess(ResponseEntity<String> result) {
            assertEquals("Invalid content", helloWorld, result.getBody());
        }

        @Override
        public void onFailure(Throwable ex) {
            fail(ex.getMessage());
        }
    });
    while (!responseFuture.isDone()) {
        // Sleep between polls instead of spinning on an empty loop body,
        // which would otherwise burn a full CPU core while waiting.
        Thread.sleep(5);
    }
}
/**
 * Creates a callback for batch message sending that logs an error on failure and a
 * per-channel sent/failed summary on success.
 *
 * @return a new logging callback
 */
public ListenableFutureCallback<OutboundMessageResponseSummary> getBatchCallBack()
{
    return new ListenableFutureCallback<OutboundMessageResponseSummary>()
    {
        @Override
        public void onSuccess( OutboundMessageResponseSummary summary )
        {
            int sentCount = summary.getSent();
            int failedCount = summary.getFailed();
            String statusLine = String.format( "%s Message sending status: Successful: %d Failed: %d",
                summary.getChannel().name(), sentCount, failedCount );

            log.info( statusLine );
        }

        @Override
        public void onFailure( Throwable cause )
        {
            log.error( "Message sending failed", cause );
        }
    };
}
/**
 * Posts a text/plain (ISO-8859-1) entity asynchronously and checks in the success
 * callback that the returned location is {@code baseUrl + "/post/1"}.
 */
@Test
public void postForLocationCallback() throws Exception {
    HttpHeaders requestHeaders = new HttpHeaders();
    requestHeaders.setContentType(new MediaType("text", "plain", StandardCharsets.ISO_8859_1));
    HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld, requestHeaders);
    final URI expected = new URI(baseUrl + "/post/1");

    ListenableFuture<URI> pendingLocation =
            template.postForLocation(baseUrl + "/{method}", requestEntity, "post");
    ListenableFutureCallback<URI> verifier = new ListenableFutureCallback<URI>() {
        @Override
        public void onSuccess(URI location) {
            assertEquals("Invalid location", expected, location);
        }

        @Override
        public void onFailure(Throwable cause) {
            fail(cause.getMessage());
        }
    };
    pendingLocation.addCallback(verifier);

    waitTillDone(pendingLocation);
}
/**
 * Requests {@code /status/server} asynchronously and checks in the failure callback
 * that an {@code HttpServerErrorException} with status INTERNAL_SERVER_ERROR is reported.
 *
 * @throws Exception if the request cannot be issued or the wait is interrupted
 */
@Test
public void serverErrorCallback() throws Exception {
    ListenableFuture<Void> future = template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null);
    future.addCallback(new ListenableFutureCallback<Void>() {
        @Override
        public void onSuccess(Void result) {
            fail("onSuccess not expected");
        }

        @Override
        public void onFailure(Throwable ex) {
            assertTrue(ex instanceof HttpServerErrorException);
            HttpServerErrorException hsex = (HttpServerErrorException) ex;
            assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, hsex.getStatusCode());
            assertNotNull(hsex.getStatusText());
            assertNotNull(hsex.getResponseBodyAsString());
        }
    });
    while (!future.isDone()) {
        // Sleep between polls instead of spinning on an empty loop body,
        // which would otherwise burn a full CPU core while waiting.
        Thread.sleep(5);
    }
}
/**
 * Posts a text/plain (ISO-8859-15) entity asynchronously and checks in the success
 * callback that the returned location is {@code baseUrl + "/post/1"}.
 *
 * @throws Exception if the request cannot be issued or the wait is interrupted
 */
@Test
public void postForLocationCallback() throws Exception {
    HttpHeaders entityHeaders = new HttpHeaders();
    entityHeaders.setContentType(new MediaType("text", "plain", Charset.forName("ISO-8859-15")));
    HttpEntity<String> entity = new HttpEntity<String>(helloWorld, entityHeaders);
    final URI expected = new URI(baseUrl + "/post/1");
    ListenableFuture<URI> locationFuture = template.postForLocation(baseUrl + "/{method}", entity, "post");
    locationFuture.addCallback(new ListenableFutureCallback<URI>() {
        @Override
        public void onSuccess(URI result) {
            assertEquals("Invalid location", expected, result);
        }

        @Override
        public void onFailure(Throwable ex) {
            fail(ex.getMessage());
        }
    });
    while (!locationFuture.isDone()) {
        // Sleep between polls instead of spinning on an empty loop body,
        // which would otherwise burn a full CPU core while waiting.
        Thread.sleep(5);
    }
}
/**
 * Requests {@code /status/notfound} asynchronously and checks in the failure callback
 * that an {@code HttpClientErrorException} with status NOT_FOUND is reported.
 */
@Test
public void notFoundCallback() throws Exception {
    ListenableFuture<?> pending =
            template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null);

    ListenableFutureCallback<Object> verifier = new ListenableFutureCallback<Object>() {
        @Override
        public void onSuccess(Object outcome) {
            fail("onSuccess not expected");
        }

        @Override
        public void onFailure(Throwable cause) {
            assertTrue(cause instanceof HttpClientErrorException);
            HttpClientErrorException clientError = (HttpClientErrorException) cause;
            assertEquals(HttpStatus.NOT_FOUND, clientError.getStatusCode());
            assertNotNull(clientError.getStatusText());
            assertNotNull(clientError.getResponseBodyAsString());
        }
    };
    pending.addCallback(verifier);

    waitTillDone(pending);
}
/**
 * Requests {@code /status/server} asynchronously and checks in the failure callback
 * that an {@code HttpServerErrorException} with status INTERNAL_SERVER_ERROR is reported.
 */
@Test
public void serverErrorCallback() throws Exception {
    ListenableFuture<Void> pending =
            template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null);

    ListenableFutureCallback<Void> verifier = new ListenableFutureCallback<Void>() {
        @Override
        public void onSuccess(Void outcome) {
            fail("onSuccess not expected");
        }

        @Override
        public void onFailure(Throwable cause) {
            assertTrue(cause instanceof HttpServerErrorException);
            HttpServerErrorException serverError = (HttpServerErrorException) cause;
            assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, serverError.getStatusCode());
            assertNotNull(serverError.getStatusText());
            assertNotNull(serverError.getResponseBodyAsString());
        }
    };
    pending.addCallback(verifier);

    waitTillDone(pending);
}
/**
 * Issues an asynchronous OPTIONS request for {@code /get} and checks in the success
 * callback that the allowed methods are GET, OPTIONS, HEAD and TRACE.
 */
@Test
public void optionsForAllowCallback() throws Exception {
    ListenableFuture<Set<HttpMethod>> pendingAllowed =
            template.optionsForAllow(new URI(baseUrl + "/get"));

    ListenableFutureCallback<Set<HttpMethod>> verifier =
            new ListenableFutureCallback<Set<HttpMethod>>() {
                @Override
                public void onSuccess(Set<HttpMethod> allowed) {
                    assertEquals("Invalid response",
                            EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS, HttpMethod.HEAD, HttpMethod.TRACE),
                            allowed);
                }

                @Override
                public void onFailure(Throwable cause) {
                    fail(cause.getMessage());
                }
            };
    pendingAllowed.addCallback(verifier);

    waitTillDone(pendingAllowed);
}
/**
 * Performs an asynchronous GET via {@code exchange} with a custom request header and
 * checks in the success callback that the response body equals {@code helloWorld}.
 */
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void exchangeGetCallback() throws Exception {
    HttpHeaders customHeaders = new HttpHeaders();
    customHeaders.set("MyHeader", "MyValue");
    HttpEntity<?> request = new HttpEntity(customHeaders);

    ListenableFuture<ResponseEntity<String>> pendingResponse =
            template.exchange(baseUrl + "/{method}", HttpMethod.GET, request, String.class, "get");
    ListenableFutureCallback<ResponseEntity<String>> verifier =
            new ListenableFutureCallback<ResponseEntity<String>>() {
                @Override
                public void onSuccess(ResponseEntity<String> response) {
                    assertEquals("Invalid content", helloWorld, response.getBody());
                }

                @Override
                public void onFailure(Throwable cause) {
                    fail(cause.getMessage());
                }
            };
    pendingResponse.addCallback(verifier);

    waitTillDone(pendingResponse);
}