org.springframework.util.concurrent.ListenableFuture#addCallback ( )源码实例Demo

下面列出了org.springframework.util.concurrent.ListenableFuture#addCallback ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test
public void optionsForAllowCallback() throws Exception {
	ListenableFuture<Set<HttpMethod>> allowedFuture = template.optionsForAllow(new URI(baseUrl + "/get"));
	allowedFuture.addCallback(new ListenableFutureCallback<Set<HttpMethod>>() {
		@Override
		public void onSuccess(Set<HttpMethod> result) {
			assertEquals("Invalid response", EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS,
					HttpMethod.HEAD, HttpMethod.TRACE), result);
		}
		@Override
		public void onFailure(Throwable ex) {
			fail(ex.getMessage());
		}
	});
	waitTillDone(allowedFuture);
}
 
@Test
public void postForEntityCallback() throws Exception  {
	HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld);
	ListenableFuture<ResponseEntity<String>> responseEntityFuture =
			template.postForEntity(baseUrl + "/{method}", requestEntity, String.class, "post");
	responseEntityFuture.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
		@Override
		public void onSuccess(ResponseEntity<String> result) {
			assertEquals("Invalid content", helloWorld, result.getBody());
		}
		@Override
		public void onFailure(Throwable ex) {
			fail(ex.getMessage());
		}
	});
	waitTillDone(responseEntityFuture);
}
 
@Test
public void notFoundCallback() throws Exception {
	ListenableFuture<?> future = template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null);
	future.addCallback(new ListenableFutureCallback<Object>() {
		@Override
		public void onSuccess(Object result) {
			fail("onSuccess not expected");
		}
		@Override
		public void onFailure(Throwable t) {
			assertTrue(t instanceof HttpClientErrorException);
			HttpClientErrorException ex = (HttpClientErrorException) t;
			assertEquals(HttpStatus.NOT_FOUND, ex.getStatusCode());
			assertNotNull(ex.getStatusText());
			assertNotNull(ex.getResponseBodyAsString());
		}
	});
	while (!future.isDone()) {
	}
}
 
@Test
public void postForLocationCallback() throws Exception  {
	HttpHeaders entityHeaders = new HttpHeaders();
	entityHeaders.setContentType(new MediaType("text", "plain", StandardCharsets.ISO_8859_1));
	HttpEntity<String> entity = new HttpEntity<>(helloWorld, entityHeaders);
	final URI expected = new URI(baseUrl + "/post/1");
	ListenableFuture<URI> locationFuture = template.postForLocation(baseUrl + "/{method}", entity, "post");
	locationFuture.addCallback(new ListenableFutureCallback<URI>() {
		@Override
		public void onSuccess(URI result) {
			assertEquals("Invalid location", expected, result);
		}
		@Override
		public void onFailure(Throwable ex) {
			fail(ex.getMessage());
		}
	});
	waitTillDone(locationFuture);
}
 
源代码5 项目: Spring-5.0-Cookbook   文件: DeptAsyncController.java
@GetMapping(value="/webSyncDept/{id}.json", produces ="application/json", headers = {"Accept=text/xml, application/json"})
public WebAsyncTask<Department> websyncDeptList(@PathVariable("id") Integer id){
   
    Callable<Department> callable = new Callable<Department>() {
    	public Department call() throws Exception {
    		
    		 ListenableFuture<Department> listenFuture = departmentServiceImpl.findAllFirstById(id);
    		 listenFuture.addCallback(new ListenableFutureCallback<Department>(){

				@Override
				public void onSuccess(Department dept) {
					result = dept;
				}

				@Override
				public void onFailure(Throwable arg0) {
					result = new Department();
				}
    			 
    		 });
    		 return result;
          }
    };
    return new WebAsyncTask<Department>(500, callable);
}
 
源代码6 项目: java-technology-stack   文件: AsyncResultTests.java
@Test
public void asyncResultWithCallbackAndValue() throws Exception {
	String value = "val";
	final Set<String> values = new HashSet<>(1);
	ListenableFuture<String> future = AsyncResult.forValue(value);
	future.addCallback(new ListenableFutureCallback<String>() {
		@Override
		public void onSuccess(String result) {
			values.add(result);
		}
		@Override
		public void onFailure(Throwable ex) {
			fail("Failure callback not expected: " + ex);
		}
	});
	assertSame(value, values.iterator().next());
	assertSame(value, future.get());
	assertSame(value, future.completable().get());
	future.completable().thenAccept(v -> assertSame(value, v));
}
 
@Test
public void postForLocationCallbackWithLambdas() throws Exception  {
	HttpHeaders entityHeaders = new HttpHeaders();
	entityHeaders.setContentType(new MediaType("text", "plain", Charset.forName("ISO-8859-15")));
	HttpEntity<String> entity = new HttpEntity<String>(helloWorld, entityHeaders);
	final URI expected = new URI(baseUrl + "/post/1");
	ListenableFuture<URI> locationFuture = template.postForLocation(baseUrl + "/{method}", entity, "post");
	locationFuture.addCallback(result -> assertEquals("Invalid location", expected, result),
			ex -> fail(ex.getMessage()));
	while (!locationFuture.isDone()) {
	}
}
 
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
    AsyncClientHttpRequestExecution execution) throws IOException {
    final String urlTemplate = urlTemplateHolder.get();
    urlTemplateHolder.remove();
    final Clock clock = meterRegistry.config().clock();
    final long startTime = clock.monotonicTime();
    ListenableFuture<ClientHttpResponse> future;
    try {
        future = execution.executeAsync(request, body);
    } catch (IOException e) {
        getTimeBuilder(urlTemplate, request, null, e).register(meterRegistry)
            .record(clock.monotonicTime() - startTime, TimeUnit.NANOSECONDS);
        throw e;
    }
    future.addCallback(new ListenableFutureCallback<ClientHttpResponse>() {
        @Override
        public void onFailure(final Throwable ex) {
            getTimeBuilder(urlTemplate, request, null, ex).register(meterRegistry)
                .record(clock.monotonicTime() - startTime, TimeUnit.NANOSECONDS);
        }

        @Override
        public void onSuccess(final ClientHttpResponse response) {
            getTimeBuilder(urlTemplate, request, response, null).register(meterRegistry)
                .record(clock.monotonicTime() - startTime, TimeUnit.NANOSECONDS);
        }
    });
    return future;
}
 
@Test
public void serverErrorCallbackWithLambdas() throws Exception {
	ListenableFuture<Void> future = template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null);
	future.addCallback(result -> fail("onSuccess not expected"), ex -> {
			assertTrue(ex instanceof HttpServerErrorException);
			HttpServerErrorException hsex = (HttpServerErrorException) ex;
			assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, hsex.getStatusCode());
			assertNotNull(hsex.getStatusText());
			assertNotNull(hsex.getResponseBodyAsString());
	});
	while (!future.isDone()) {
	}
}
 
源代码10 项目: Spring-5.0-Cookbook   文件: SendAsyncEventLogin.java
public DeferredResult<LoginDetails> send(String content) {
	System.out.println("send request");
	final DeferredResult<LoginDetails> response = new DeferredResult<>();
	ListenableFuture<LoginDetails> future = asyncRabbitTemplate.convertSendAndReceive(requestQueue.getName(),
			content);
	future.addCallback(new LoginHandlerResponse(response));

	System.out.println(asyncRabbitTemplate.isAutoStartup());
	System.out.println(asyncRabbitTemplate.isRunning());

	return response;
}
 
@Test
public void submitListenableRunnable() throws Exception {
	TestTask task = new TestTask(1);
	// Act
	ListenableFuture<?> future = executor.submitListenable(task);
	future.addCallback(result -> outcome = result, ex -> outcome = ex);
	// Assert
	Awaitility.await()
				.atMost(1, TimeUnit.SECONDS)
				.pollInterval(10, TimeUnit.MILLISECONDS)
				.until(future::isDone);
	assertNull(outcome);
	assertThreadNamePrefix(task);
}
 
源代码12 项目: syncer   文件: KafkaChannel.java
private void doSend(String topic, SyncWrapper<String> wrapper) {
  final SyncData event = wrapper.getEvent();
  // require that messages with the same key (for instance, a unique id) are always seen in the correct order,
  // attaching a key to messages will ensure messages with the same key always go to the same partition in a topic
  ListenableFuture<SendResult<String, Object>> future;
  Long partitionId = event.getPartitionId();
  if (partitionId != null) {
    future = kafkaTemplate.send(topic, partitionId.toString(), event.getResult());
  } else {
    logger.warn("Send {} to {} without key", event, topic);
    future = kafkaTemplate.send(topic, event.getResult());
  }
  ListenableFutureCallback<SendResult<String, Object>> callback = new ListenableFutureCallback<SendResult<String, Object>>() {

    @Override
    public void onSuccess(final SendResult<String, Object> message) {
      ackSuccess(Lists.newArrayList(wrapper));
      logger.info("sent {} with offset {} ", event, message.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(final Throwable throwable) {
      SyncerHealth.consumer(consumerId, identifier, Health.red(throwable.getMessage()));
      retryFailed(Lists.newArrayList(wrapper), throwable);
      logger.error("unable to send {} ", event, throwable);
    }
  };
  future.addCallback(callback);
  // no need to wait future, the order between batch is ensured by kafka client
}
 
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void exchangeGetCallbackWithLambdas() throws Exception {
	HttpHeaders requestHeaders = new HttpHeaders();
	requestHeaders.set("MyHeader", "MyValue");
	HttpEntity<?> requestEntity = new HttpEntity(requestHeaders);
	ListenableFuture<ResponseEntity<String>> responseFuture =
			template.exchange(baseUrl + "/{method}", HttpMethod.GET, requestEntity, String.class, "get");
	responseFuture.addCallback(result -> assertEquals("Invalid content", helloWorld,
			result.getBody()), ex -> fail(ex.getMessage()));
	waitTillDone(responseFuture);
}
 
@Override
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
                                                      AsyncClientHttpRequestExecution execution)
                                                                                                throws IOException {
    SofaTracerSpan sofaTracerSpan = restTemplateTracer.clientSend(request.getMethod().name());
    appendRestTemplateRequestSpanTags(request, sofaTracerSpan);
    Exception exception = null;
    try {
        ListenableFuture<ClientHttpResponse> result = execution.executeAsync(request, body);
        result.addCallback(new SofaTraceListenableFutureCallback(restTemplateTracer,
            sofaTracerSpan));
        return result;
    } catch (IOException e) {
        exception = e;
        throw e;
    } finally {
        // when error , clear tl soon
        if (exception != null) {
            SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext()
                .getCurrentSpan();
            currentSpan.setTag(Tags.ERROR.getKey(), exception.getMessage());
            restTemplateTracer.clientReceive(String.valueOf(500));
        } else {
            // clear current
            SofaTraceContextHolder.getSofaTraceContext().pop();
            if (sofaTracerSpan != null && sofaTracerSpan.getParentSofaTracerSpan() != null) {
                // reset parent
                SofaTraceContextHolder.getSofaTraceContext().push(
                    sofaTracerSpan.getParentSofaTracerSpan());
            }
        }
    }
}
 
源代码15 项目: sctalk   文件: IMWebrtcHandlerImpl.java
@Override
    public void initiateRes(IMHeader header, MessageLite body, ChannelHandlerContext ctx) {
        IMAVCallInitiateRes msg = (IMAVCallInitiateRes)body;
        long fromId = msg.getFromId();
        long toId = msg.getToId();
        long callId = msg.getCallId();

        ListenableFuture<?> future =messageServerCluster.webrtcInitateCallRes(fromId, toId, super.getHandleId(ctx));
        
        future.addCallback((result) -> {
            // TODO 通知其他端取消
            // messageServerCluster.sendToUser(toId, header, body);
        }, (throwable) -> {
            // TODO 接受失败
            // 所有端取消
        });
        // IMBaseDefine.ClientType nType = msg.getCalledClientType();
        // logger.debug("webrtc initiate resposne {} {} {} {}", fromId, toId, callId, nType);
        
//        ClientUser toClientUser = ClientUserManager.getUserById(toId);
//        if (toClientUser != null ){
//            
//            // 呼叫接起时,取消其他端的呼叫提醒
//            IMHeader hdCancel = new IMHeader();
//            hdCancel.setServiceId(ServiceID.SID_AVCALL_VALUE);
//            hdCancel.setCommandId(AVCallCmdId.CID_AVCALL_CANCEL_REQ_VALUE);
//            
//            IMAVCallCancelReq msgCancelReq = IMAVCallCancelReq.newBuilder().setFromId(fromId)
//                    .setToId(toId).setCallId(callId).build();
//            
//            IMProtoMessage<MessageLite>  msgCancel = new IMProtoMessage<MessageLite>(hdCancel, msgCancelReq);
//            toClientUser.broadcast(msgCancel, ctx);
//        }
//        
//        messageServerCluster.send(header, body);
    }
 
@Test
public void testPull_AndManualMultiSubscriptionAck()
		throws InterruptedException, ExecutionException, TimeoutException {
	ExecutorService mockExecutor = Mockito.mock(ExecutorService.class);
	this.pubSubSubscriberTemplate.setAckExecutor(mockExecutor);

	List<AcknowledgeablePubsubMessage> result1 = this.pubSubSubscriberTemplate.pull(
			"sub1", 1, true);
	List<AcknowledgeablePubsubMessage> result2 = this.pubSubSubscriberTemplate.pull(
			"sub2", 1, true);
	Set<AcknowledgeablePubsubMessage> combinedMessages = new HashSet<>(result1);
	combinedMessages.addAll(result2);

	assertThat(combinedMessages.size()).isEqualTo(2);

	TestListenableFutureCallback testListenableFutureCallback = new TestListenableFutureCallback();

	ListenableFuture<Void> listenableFuture = this.pubSubSubscriberTemplate.ack(combinedMessages);
	assertThat(listenableFuture).isNotNull();

	listenableFuture.addCallback(testListenableFutureCallback);
	listenableFuture.get(10L, TimeUnit.SECONDS);

	assertThat(listenableFuture.isDone()).isTrue();
	assertThat(testListenableFutureCallback.getThrowable()).isNull();
	verify(this.ackCallable, times(2)).futureCall(any(AcknowledgeRequest.class));
	verify(this.ackApiFuture, times(2)).addListener(any(), same(mockExecutor));
}
 
@Test
public void postForEntityCallbackWithLambdas() throws Exception  {
	HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld);
	ListenableFuture<ResponseEntity<String>> responseEntityFuture =
			template.postForEntity(baseUrl + "/{method}", requestEntity, String.class, "post");
	responseEntityFuture.addCallback(result -> assertEquals("Invalid content", helloWorld, result.getBody()),
			ex -> fail(ex.getMessage()));
	while (!responseEntityFuture.isDone()) {
	}
}
 
@Test
public void serverErrorCallbackWithLambdas() throws Exception {
	ListenableFuture<Void> future = template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null);
	future.addCallback(result -> fail("onSuccess not expected"), ex -> {
			assertTrue(ex instanceof HttpServerErrorException);
			HttpServerErrorException hsex = (HttpServerErrorException) ex;
			assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, hsex.getStatusCode());
			assertNotNull(hsex.getStatusText());
			assertNotNull(hsex.getResponseBodyAsString());
	});
	waitTillDone(future);
}
 
/**
 * Forward the given message to the STOMP broker.
 * <p>The method checks whether we have an active TCP connection and have
 * received the STOMP CONNECTED frame. For client messages this should be
 * false only if we lose the TCP connection around the same time when a
 * client message is being forwarded, so we simply log the ignored message
 * at debug level. For messages from within the application being sent on
 * the "system" connection an exception is raised so that components sending
 * the message have a chance to handle it -- by default the broker message
 * channel is synchronous.
 * <p>Note that if messages arrive concurrently around the same time a TCP
 * connection is lost, there is a brief period of time before the connection
 * is reset when one or more messages may sneak through and an attempt made
 * to forward them. Rather than synchronizing to guard against that, this
 * method simply lets them try and fail. For client sessions that may
 * result in an additional STOMP ERROR frame(s) being sent downstream but
 * code handling that downstream should be idempotent in such cases.
 * @param message the message to send (never {@code null})
 * @return a future to wait for the result
 */
@SuppressWarnings("unchecked")
public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
	TcpConnection<byte[]> conn = this.tcpConnection;

	if (!this.isStompConnected || conn == null) {
		if (this.isRemoteClientSession) {
			if (logger.isDebugEnabled()) {
				logger.debug("TCP connection closed already, ignoring " +
						accessor.getShortLogMessage(message.getPayload()));
			}
			return EMPTY_TASK;
		}
		else {
			throw new IllegalStateException("Cannot forward messages " +
					(conn != null ? "before STOMP CONNECTED. " : "while inactive. ") +
					"Consider subscribing to receive BrokerAvailabilityEvent's from " +
					"an ApplicationListener Spring bean. Dropped " +
					accessor.getShortLogMessage(message.getPayload()));
		}
	}

	final Message<?> messageToSend = (accessor.isMutable() && accessor.isModified()) ?
			MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;

	StompCommand command = accessor.getCommand();
	if (logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) ||
			StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) {
		logger.debug("Forwarding " + accessor.getShortLogMessage(message.getPayload()));
	}
	else if (logger.isTraceEnabled()) {
		logger.trace("Forwarding " + accessor.getDetailedLogMessage(message.getPayload()));
	}

	ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend);
	future.addCallback(new ListenableFutureCallback<Void>() {
		@Override
		public void onSuccess(@Nullable Void result) {
			if (accessor.getCommand() == StompCommand.DISCONNECT) {
				afterDisconnectSent(accessor);
			}
		}
		@Override
		public void onFailure(Throwable ex) {
			if (tcpConnection != null) {
				handleTcpConnectionFailure("failed to forward " +
						accessor.getShortLogMessage(message.getPayload()), ex);
			}
			else if (logger.isErrorEnabled()) {
				logger.error("Failed to forward " + accessor.getShortLogMessage(message.getPayload()));
			}
		}
	});
	return future;
}
 
源代码20 项目: kkbinlog   文件: DataPublisherKafkaImpl.java
@Override
public void doPublish(String topic, Object data) {
    ListenableFuture<SendResult<String, Object>> reuslt = kafkaTemplate.send(topic, data);
    reuslt.addCallback(success -> log.info("推送消息到Kafka:{}", data)
    , failure -> log.error("推送消息到Kafka失败:" + data.toString(), failure.getCause()));
}