下面列出了org.springframework.util.concurrent.ListenableFuture#addCallback ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void optionsForAllowCallback() throws Exception {
ListenableFuture<Set<HttpMethod>> allowedFuture = template.optionsForAllow(new URI(baseUrl + "/get"));
allowedFuture.addCallback(new ListenableFutureCallback<Set<HttpMethod>>() {
@Override
public void onSuccess(Set<HttpMethod> result) {
assertEquals("Invalid response", EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS,
HttpMethod.HEAD, HttpMethod.TRACE), result);
}
@Override
public void onFailure(Throwable ex) {
fail(ex.getMessage());
}
});
waitTillDone(allowedFuture);
}
@Test
public void postForEntityCallback() throws Exception {
HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld);
ListenableFuture<ResponseEntity<String>> responseEntityFuture =
template.postForEntity(baseUrl + "/{method}", requestEntity, String.class, "post");
responseEntityFuture.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
@Override
public void onSuccess(ResponseEntity<String> result) {
assertEquals("Invalid content", helloWorld, result.getBody());
}
@Override
public void onFailure(Throwable ex) {
fail(ex.getMessage());
}
});
waitTillDone(responseEntityFuture);
}
@Test
public void notFoundCallback() throws Exception {
ListenableFuture<?> future = template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null);
future.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
fail("onSuccess not expected");
}
@Override
public void onFailure(Throwable t) {
assertTrue(t instanceof HttpClientErrorException);
HttpClientErrorException ex = (HttpClientErrorException) t;
assertEquals(HttpStatus.NOT_FOUND, ex.getStatusCode());
assertNotNull(ex.getStatusText());
assertNotNull(ex.getResponseBodyAsString());
}
});
while (!future.isDone()) {
}
}
@Test
public void postForLocationCallback() throws Exception {
HttpHeaders entityHeaders = new HttpHeaders();
entityHeaders.setContentType(new MediaType("text", "plain", StandardCharsets.ISO_8859_1));
HttpEntity<String> entity = new HttpEntity<>(helloWorld, entityHeaders);
final URI expected = new URI(baseUrl + "/post/1");
ListenableFuture<URI> locationFuture = template.postForLocation(baseUrl + "/{method}", entity, "post");
locationFuture.addCallback(new ListenableFutureCallback<URI>() {
@Override
public void onSuccess(URI result) {
assertEquals("Invalid location", expected, result);
}
@Override
public void onFailure(Throwable ex) {
fail(ex.getMessage());
}
});
waitTillDone(locationFuture);
}
@GetMapping(value="/webSyncDept/{id}.json", produces ="application/json", headers = {"Accept=text/xml, application/json"})
public WebAsyncTask<Department> websyncDeptList(@PathVariable("id") Integer id){
Callable<Department> callable = new Callable<Department>() {
public Department call() throws Exception {
ListenableFuture<Department> listenFuture = departmentServiceImpl.findAllFirstById(id);
listenFuture.addCallback(new ListenableFutureCallback<Department>(){
@Override
public void onSuccess(Department dept) {
result = dept;
}
@Override
public void onFailure(Throwable arg0) {
result = new Department();
}
});
return result;
}
};
return new WebAsyncTask<Department>(500, callable);
}
@Test
public void asyncResultWithCallbackAndValue() throws Exception {
String value = "val";
final Set<String> values = new HashSet<>(1);
ListenableFuture<String> future = AsyncResult.forValue(value);
future.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
values.add(result);
}
@Override
public void onFailure(Throwable ex) {
fail("Failure callback not expected: " + ex);
}
});
assertSame(value, values.iterator().next());
assertSame(value, future.get());
assertSame(value, future.completable().get());
future.completable().thenAccept(v -> assertSame(value, v));
}
@Test
public void postForLocationCallbackWithLambdas() throws Exception {
HttpHeaders entityHeaders = new HttpHeaders();
entityHeaders.setContentType(new MediaType("text", "plain", Charset.forName("ISO-8859-15")));
HttpEntity<String> entity = new HttpEntity<String>(helloWorld, entityHeaders);
final URI expected = new URI(baseUrl + "/post/1");
ListenableFuture<URI> locationFuture = template.postForLocation(baseUrl + "/{method}", entity, "post");
locationFuture.addCallback(result -> assertEquals("Invalid location", expected, result),
ex -> fail(ex.getMessage()));
while (!locationFuture.isDone()) {
}
}
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
AsyncClientHttpRequestExecution execution) throws IOException {
final String urlTemplate = urlTemplateHolder.get();
urlTemplateHolder.remove();
final Clock clock = meterRegistry.config().clock();
final long startTime = clock.monotonicTime();
ListenableFuture<ClientHttpResponse> future;
try {
future = execution.executeAsync(request, body);
} catch (IOException e) {
getTimeBuilder(urlTemplate, request, null, e).register(meterRegistry)
.record(clock.monotonicTime() - startTime, TimeUnit.NANOSECONDS);
throw e;
}
future.addCallback(new ListenableFutureCallback<ClientHttpResponse>() {
@Override
public void onFailure(final Throwable ex) {
getTimeBuilder(urlTemplate, request, null, ex).register(meterRegistry)
.record(clock.monotonicTime() - startTime, TimeUnit.NANOSECONDS);
}
@Override
public void onSuccess(final ClientHttpResponse response) {
getTimeBuilder(urlTemplate, request, response, null).register(meterRegistry)
.record(clock.monotonicTime() - startTime, TimeUnit.NANOSECONDS);
}
});
return future;
}
@Test
public void serverErrorCallbackWithLambdas() throws Exception {
ListenableFuture<Void> future = template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null);
future.addCallback(result -> fail("onSuccess not expected"), ex -> {
assertTrue(ex instanceof HttpServerErrorException);
HttpServerErrorException hsex = (HttpServerErrorException) ex;
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, hsex.getStatusCode());
assertNotNull(hsex.getStatusText());
assertNotNull(hsex.getResponseBodyAsString());
});
while (!future.isDone()) {
}
}
public DeferredResult<LoginDetails> send(String content) {
System.out.println("send request");
final DeferredResult<LoginDetails> response = new DeferredResult<>();
ListenableFuture<LoginDetails> future = asyncRabbitTemplate.convertSendAndReceive(requestQueue.getName(),
content);
future.addCallback(new LoginHandlerResponse(response));
System.out.println(asyncRabbitTemplate.isAutoStartup());
System.out.println(asyncRabbitTemplate.isRunning());
return response;
}
@Test
public void submitListenableRunnable() throws Exception {
TestTask task = new TestTask(1);
// Act
ListenableFuture<?> future = executor.submitListenable(task);
future.addCallback(result -> outcome = result, ex -> outcome = ex);
// Assert
Awaitility.await()
.atMost(1, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.until(future::isDone);
assertNull(outcome);
assertThreadNamePrefix(task);
}
private void doSend(String topic, SyncWrapper<String> wrapper) {
final SyncData event = wrapper.getEvent();
// require that messages with the same key (for instance, a unique id) are always seen in the correct order,
// attaching a key to messages will ensure messages with the same key always go to the same partition in a topic
ListenableFuture<SendResult<String, Object>> future;
Long partitionId = event.getPartitionId();
if (partitionId != null) {
future = kafkaTemplate.send(topic, partitionId.toString(), event.getResult());
} else {
logger.warn("Send {} to {} without key", event, topic);
future = kafkaTemplate.send(topic, event.getResult());
}
ListenableFutureCallback<SendResult<String, Object>> callback = new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(final SendResult<String, Object> message) {
ackSuccess(Lists.newArrayList(wrapper));
logger.info("sent {} with offset {} ", event, message.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable throwable) {
SyncerHealth.consumer(consumerId, identifier, Health.red(throwable.getMessage()));
retryFailed(Lists.newArrayList(wrapper), throwable);
logger.error("unable to send {} ", event, throwable);
}
};
future.addCallback(callback);
// no need to wait future, the order between batch is ensured by kafka client
}
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void exchangeGetCallbackWithLambdas() throws Exception {
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.set("MyHeader", "MyValue");
HttpEntity<?> requestEntity = new HttpEntity(requestHeaders);
ListenableFuture<ResponseEntity<String>> responseFuture =
template.exchange(baseUrl + "/{method}", HttpMethod.GET, requestEntity, String.class, "get");
responseFuture.addCallback(result -> assertEquals("Invalid content", helloWorld,
result.getBody()), ex -> fail(ex.getMessage()));
waitTillDone(responseFuture);
}
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
AsyncClientHttpRequestExecution execution)
throws IOException {
SofaTracerSpan sofaTracerSpan = restTemplateTracer.clientSend(request.getMethod().name());
appendRestTemplateRequestSpanTags(request, sofaTracerSpan);
Exception exception = null;
try {
ListenableFuture<ClientHttpResponse> result = execution.executeAsync(request, body);
result.addCallback(new SofaTraceListenableFutureCallback(restTemplateTracer,
sofaTracerSpan));
return result;
} catch (IOException e) {
exception = e;
throw e;
} finally {
// when error , clear tl soon
if (exception != null) {
SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext()
.getCurrentSpan();
currentSpan.setTag(Tags.ERROR.getKey(), exception.getMessage());
restTemplateTracer.clientReceive(String.valueOf(500));
} else {
// clear current
SofaTraceContextHolder.getSofaTraceContext().pop();
if (sofaTracerSpan != null && sofaTracerSpan.getParentSofaTracerSpan() != null) {
// reset parent
SofaTraceContextHolder.getSofaTraceContext().push(
sofaTracerSpan.getParentSofaTracerSpan());
}
}
}
}
@Override
public void initiateRes(IMHeader header, MessageLite body, ChannelHandlerContext ctx) {
IMAVCallInitiateRes msg = (IMAVCallInitiateRes)body;
long fromId = msg.getFromId();
long toId = msg.getToId();
long callId = msg.getCallId();
ListenableFuture<?> future =messageServerCluster.webrtcInitateCallRes(fromId, toId, super.getHandleId(ctx));
future.addCallback((result) -> {
// TODO 通知其他端取消
// messageServerCluster.sendToUser(toId, header, body);
}, (throwable) -> {
// TODO 接受失败
// 所有端取消
});
// IMBaseDefine.ClientType nType = msg.getCalledClientType();
// logger.debug("webrtc initiate resposne {} {} {} {}", fromId, toId, callId, nType);
// ClientUser toClientUser = ClientUserManager.getUserById(toId);
// if (toClientUser != null ){
//
// // 呼叫接起时,取消其他端的呼叫提醒
// IMHeader hdCancel = new IMHeader();
// hdCancel.setServiceId(ServiceID.SID_AVCALL_VALUE);
// hdCancel.setCommandId(AVCallCmdId.CID_AVCALL_CANCEL_REQ_VALUE);
//
// IMAVCallCancelReq msgCancelReq = IMAVCallCancelReq.newBuilder().setFromId(fromId)
// .setToId(toId).setCallId(callId).build();
//
// IMProtoMessage<MessageLite> msgCancel = new IMProtoMessage<MessageLite>(hdCancel, msgCancelReq);
// toClientUser.broadcast(msgCancel, ctx);
// }
//
// messageServerCluster.send(header, body);
}
@Test
public void testPull_AndManualMultiSubscriptionAck()
throws InterruptedException, ExecutionException, TimeoutException {
ExecutorService mockExecutor = Mockito.mock(ExecutorService.class);
this.pubSubSubscriberTemplate.setAckExecutor(mockExecutor);
List<AcknowledgeablePubsubMessage> result1 = this.pubSubSubscriberTemplate.pull(
"sub1", 1, true);
List<AcknowledgeablePubsubMessage> result2 = this.pubSubSubscriberTemplate.pull(
"sub2", 1, true);
Set<AcknowledgeablePubsubMessage> combinedMessages = new HashSet<>(result1);
combinedMessages.addAll(result2);
assertThat(combinedMessages.size()).isEqualTo(2);
TestListenableFutureCallback testListenableFutureCallback = new TestListenableFutureCallback();
ListenableFuture<Void> listenableFuture = this.pubSubSubscriberTemplate.ack(combinedMessages);
assertThat(listenableFuture).isNotNull();
listenableFuture.addCallback(testListenableFutureCallback);
listenableFuture.get(10L, TimeUnit.SECONDS);
assertThat(listenableFuture.isDone()).isTrue();
assertThat(testListenableFutureCallback.getThrowable()).isNull();
verify(this.ackCallable, times(2)).futureCall(any(AcknowledgeRequest.class));
verify(this.ackApiFuture, times(2)).addListener(any(), same(mockExecutor));
}
@Test
public void postForEntityCallbackWithLambdas() throws Exception {
HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld);
ListenableFuture<ResponseEntity<String>> responseEntityFuture =
template.postForEntity(baseUrl + "/{method}", requestEntity, String.class, "post");
responseEntityFuture.addCallback(result -> assertEquals("Invalid content", helloWorld, result.getBody()),
ex -> fail(ex.getMessage()));
while (!responseEntityFuture.isDone()) {
}
}
@Test
public void serverErrorCallbackWithLambdas() throws Exception {
ListenableFuture<Void> future = template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null);
future.addCallback(result -> fail("onSuccess not expected"), ex -> {
assertTrue(ex instanceof HttpServerErrorException);
HttpServerErrorException hsex = (HttpServerErrorException) ex;
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, hsex.getStatusCode());
assertNotNull(hsex.getStatusText());
assertNotNull(hsex.getResponseBodyAsString());
});
waitTillDone(future);
}
/**
* Forward the given message to the STOMP broker.
* <p>The method checks whether we have an active TCP connection and have
* received the STOMP CONNECTED frame. For client messages this should be
* false only if we lose the TCP connection around the same time when a
* client message is being forwarded, so we simply log the ignored message
* at debug level. For messages from within the application being sent on
* the "system" connection an exception is raised so that components sending
* the message have a chance to handle it -- by default the broker message
* channel is synchronous.
* <p>Note that if messages arrive concurrently around the same time a TCP
* connection is lost, there is a brief period of time before the connection
* is reset when one or more messages may sneak through and an attempt made
* to forward them. Rather than synchronizing to guard against that, this
* method simply lets them try and fail. For client sessions that may
* result in an additional STOMP ERROR frame(s) being sent downstream but
* code handling that downstream should be idempotent in such cases.
* @param message the message to send (never {@code null})
* @return a future to wait for the result
*/
@SuppressWarnings("unchecked")
public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
TcpConnection<byte[]> conn = this.tcpConnection;
if (!this.isStompConnected || conn == null) {
if (this.isRemoteClientSession) {
if (logger.isDebugEnabled()) {
logger.debug("TCP connection closed already, ignoring " +
accessor.getShortLogMessage(message.getPayload()));
}
return EMPTY_TASK;
}
else {
throw new IllegalStateException("Cannot forward messages " +
(conn != null ? "before STOMP CONNECTED. " : "while inactive. ") +
"Consider subscribing to receive BrokerAvailabilityEvent's from " +
"an ApplicationListener Spring bean. Dropped " +
accessor.getShortLogMessage(message.getPayload()));
}
}
final Message<?> messageToSend = (accessor.isMutable() && accessor.isModified()) ?
MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;
StompCommand command = accessor.getCommand();
if (logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) ||
StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) {
logger.debug("Forwarding " + accessor.getShortLogMessage(message.getPayload()));
}
else if (logger.isTraceEnabled()) {
logger.trace("Forwarding " + accessor.getDetailedLogMessage(message.getPayload()));
}
ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend);
future.addCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
if (accessor.getCommand() == StompCommand.DISCONNECT) {
afterDisconnectSent(accessor);
}
}
@Override
public void onFailure(Throwable ex) {
if (tcpConnection != null) {
handleTcpConnectionFailure("failed to forward " +
accessor.getShortLogMessage(message.getPayload()), ex);
}
else if (logger.isErrorEnabled()) {
logger.error("Failed to forward " + accessor.getShortLogMessage(message.getPayload()));
}
}
});
return future;
}
@Override
public void doPublish(String topic, Object data) {
ListenableFuture<SendResult<String, Object>> reuslt = kafkaTemplate.send(topic, data);
reuslt.addCallback(success -> log.info("推送消息到Kafka:{}", data)
, failure -> log.error("推送消息到Kafka失败:" + data.toString(), failure.getCause()));
}