类org.springframework.util.concurrent.ListenableFutureCallback源码实例Demo

下面列出了 org.springframework.util.concurrent.ListenableFutureCallback 的 API 用法示例代码及写法,也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: cubeai   文件: KafkaProducer.java
/**
 * Sends {@code message} to {@code topic} via {@code kafkaTemplate} and logs the outcome
 * from a callback registered on the returned future. This method does not wait for the
 * result itself.
 *
 * @param topic   topic handed to {@code kafkaTemplate.send}
 * @param message payload to send; also echoed in the success and failure log lines
 */
public void send(String topic, String message) {
    // Callback that reports the send result once the future completes.
    ListenableFutureCallback<SendResult<String, String>> resultLogger =
        new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onFailure(Throwable ex) {
                log.error("Kafka unable to send message='{}'", message, ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Kafka sent message='{}' with offset={}", message,
                    result.getRecordMetadata().offset());
            }
        };

    // To block the sending thread instead, call get() on the returned future.
    kafkaTemplate.send(topic, message).addCallback(resultLogger);
}
 
/**
 * Starts a handshake with {@code getUri()} and registers a callback on the returned
 * future: on success the session is stored in {@code webSocketSession}, on failure the
 * error is logged.
 */
@Override
protected void openConnection() {
	if (logger.isInfoEnabled()) {
		logger.info("Connecting to WebSocket at " + getUri());
	}

	ListenableFutureCallback<WebSocketSession> handshakeCallback =
			new ListenableFutureCallback<WebSocketSession>() {
				@Override
				public void onFailure(Throwable ex) {
					logger.error("Failed to connect", ex);
				}
				@Override
				public void onSuccess(@Nullable WebSocketSession result) {
					// Keep the session on the enclosing instance.
					webSocketSession = result;
					logger.info("Successfully connected");
				}
			};

	this.client.doHandshake(this.webSocketHandler, this.headers, getUri())
			.addCallback(handshakeCallback);
}
 
源代码3 项目: spring-analysis-note   文件: WebSocketTransport.java
/**
 * Starts a WebSocket handshake against the request's transport URL and returns the future
 * that was handed to the newly created {@code WebSocketClientSockJsSession}.
 * <p>This method only completes the future exceptionally, when the handshake fails;
 * nothing is set on it here when the handshake succeeds.
 *
 * @param request supplies the transport URL, the handshake headers and the timeout-task registry
 * @param handler handler passed to the new {@code WebSocketClientSockJsSession}
 * @return the future passed to the session
 */
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
	final SettableListenableFuture<WebSocketSession> connectFuture = new SettableListenableFuture<>();
	WebSocketClientSockJsSession sockJsSession =
			new WebSocketClientSockJsSession(request, handler, connectFuture);
	// The handshake is performed with a wrapper around the SockJS session.
	WebSocketHandler sockJsHandler = new ClientSockJsWebSocketHandler(sockJsSession);
	request.addTimeoutTask(sockJsSession.getTimeoutTask());

	URI transportUrl = request.getTransportUrl();
	WebSocketHttpHeaders handshakeHeaders = new WebSocketHttpHeaders(request.getHandshakeHeaders());
	if (logger.isDebugEnabled()) {
		logger.debug("Starting WebSocket session on " + transportUrl);
	}

	ListenableFutureCallback<WebSocketSession> handshakeCallback =
			new ListenableFutureCallback<WebSocketSession>() {
				@Override
				public void onFailure(Throwable ex) {
					connectFuture.setException(ex);
				}
				@Override
				public void onSuccess(@Nullable WebSocketSession webSocketSession) {
					// WebSocket session ready, SockJS Session not yet
				}
			};
	this.webSocketClient.doHandshake(sockJsHandler, handshakeHeaders, transportUrl)
			.addCallback(handshakeCallback);
	return connectFuture;
}
 
/**
 * Registers a 500 entry for "/info" in the test filter's {@code sendErrorMap}, starts a
 * handshake against "/echo", and expects the failure callback to run within 5 seconds.
 */
@Test
public void infoRequestFailure() throws Exception {
	TestClientHandler clientHandler = new TestClientHandler();
	this.testFilter.sendErrorMap.put("/info", 500);
	CountDownLatch failureSignal = new CountDownLatch(1);
	initSockJsClient(createWebSocketTransport());

	ListenableFutureCallback<WebSocketSession> callback =
			new ListenableFutureCallback<WebSocketSession>() {
				@Override
				public void onFailure(Throwable ex) {
					failureSignal.countDown();
				}

				@Override
				public void onSuccess(WebSocketSession result) {
				}
			};
	this.sockJsClient.doHandshake(clientHandler, this.baseUrl + "/echo").addCallback(callback);

	boolean failureSeen = failureSignal.await(5000, TimeUnit.MILLISECONDS);
	assertTrue(failureSeen);
}
 
/**
 * Stubs {@code RestOperations.execute} to throw an {@code HttpServerErrorException} and
 * checks that this exact exception reaches the connect future's failure callback, and
 * that the {@code webSocketHandler} mock sees no further interactions.
 */
@Test
public void connectFailure() throws Exception {
	final HttpServerErrorException expected = new HttpServerErrorException(HttpStatus.INTERNAL_SERVER_ERROR);
	RestOperations restTemplate = mock(RestOperations.class);
	given(restTemplate.execute((URI) any(), eq(HttpMethod.POST), any(), any())).willThrow(expected);

	final CountDownLatch latch = new CountDownLatch(1);
	connect(restTemplate).addCallback(
			new ListenableFutureCallback<WebSocketSession>() {
				@Override
				public void onSuccess(WebSocketSession result) {
				}
				@Override
				public void onFailure(Throwable ex) {
					// Only the stubbed exception instance counts as the expected failure.
					if (ex == expected) {
						latch.countDown();
					}
				}
			}
	);
	// Await the latch: without this, the test passes even when the failure callback
	// never fires or receives a different exception.
	if (!latch.await(5000, java.util.concurrent.TimeUnit.MILLISECONDS)) {
		throw new AssertionError("Failure callback was not invoked with the expected exception");
	}
	verifyNoMoreInteractions(this.webSocketHandler);
}
 
/**
 * Issues {@code getForEntity} on {@code baseUrl + "/{method}"} with "get" as the URI
 * variable and checks body, headers, content type and status in the success callback.
 */
@Test
public void getEntityCallback() throws Exception {
	ListenableFuture<ResponseEntity<String>> pendingEntity =
			template.getForEntity(baseUrl + "/{method}", String.class, "get");

	ListenableFutureCallback<ResponseEntity<String>> checks =
			new ListenableFutureCallback<ResponseEntity<String>>() {
				@Override
				public void onFailure(Throwable ex) {
					fail(ex.getMessage());
				}
				@Override
				public void onSuccess(ResponseEntity<String> entity) {
					assertEquals("Invalid content", helloWorld, entity.getBody());
					assertFalse("No headers", entity.getHeaders().isEmpty());
					assertEquals("Invalid content-type", textContentType, entity.getHeaders().getContentType());
					assertEquals("Invalid status code", HttpStatus.OK, entity.getStatusCode());
				}
			};
	pendingEntity.addCallback(checks);

	waitTillDone(pendingEntity);
}
 
/**
 * Posts {@code helloWorld} as text/plain (ISO-8859-1) via {@code postForLocation} and
 * checks in the success callback that the location equals {@code baseUrl + "/post/1"}.
 */
@Test
public void postForLocationCallback() throws Exception  {
	HttpHeaders headers = new HttpHeaders();
	headers.setContentType(new MediaType("text", "plain", StandardCharsets.ISO_8859_1));
	HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld, headers);
	final URI expectedLocation = new URI(baseUrl + "/post/1");

	ListenableFuture<URI> pendingLocation =
			template.postForLocation(baseUrl + "/{method}", requestEntity, "post");
	ListenableFutureCallback<URI> checks = new ListenableFutureCallback<URI>() {
		@Override
		public void onFailure(Throwable ex) {
			fail(ex.getMessage());
		}
		@Override
		public void onSuccess(URI result) {
			assertEquals("Invalid location", expectedLocation, result);
		}
	};
	pendingLocation.addCallback(checks);

	waitTillDone(pendingLocation);
}
 
/**
 * Sends {@code helloWorld} via {@code template.put} and checks in the success callback
 * that the result is {@code null}; any failure fails the test.
 */
@Test
public void putCallback() throws Exception  {
	HttpEntity<String> body = new HttpEntity<>(helloWorld);
	ListenableFuture<?> pendingPut = template.put(baseUrl + "/{method}", body, "put");

	ListenableFutureCallback<Object> checks = new ListenableFutureCallback<Object>() {
		@Override
		public void onFailure(Throwable ex) {
			fail(ex.getMessage());
		}
		@Override
		public void onSuccess(Object result) {
			assertNull(result);
		}
	};
	pendingPut.addCallback(checks);

	waitTillDone(pendingPut);
}
 
/**
 * Executes a GET on {@code baseUrl + "/status/notfound"} and checks in the failure
 * callback that an {@code HttpClientErrorException} with status NOT_FOUND is delivered.
 */
@Test
public void notFoundCallback() throws Exception {
	ListenableFuture<?> pending =
			template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null);

	ListenableFutureCallback<Object> checks = new ListenableFutureCallback<Object>() {
		@Override
		public void onFailure(Throwable t) {
			assertTrue(t instanceof HttpClientErrorException);
			HttpClientErrorException clientError = (HttpClientErrorException) t;
			assertEquals(HttpStatus.NOT_FOUND, clientError.getStatusCode());
			assertNotNull(clientError.getStatusText());
			assertNotNull(clientError.getResponseBodyAsString());
		}
		@Override
		public void onSuccess(Object result) {
			fail("onSuccess not expected");
		}
	};
	pending.addCallback(checks);

	waitTillDone(pending);
}
 
/**
 * Puts a 500 entry for "/info" into the test filter's {@code sendErrorMap}, performs a
 * handshake against "/echo", and asserts that the failure callback fires within 5000 ms.
 */
@Test
public void infoRequestFailure() throws Exception {
	TestClientHandler testHandler = new TestClientHandler();
	this.testFilter.sendErrorMap.put("/info", 500);
	CountDownLatch failed = new CountDownLatch(1);
	initSockJsClient(createWebSocketTransport());

	ListenableFutureCallback<WebSocketSession> onHandshake =
			new ListenableFutureCallback<WebSocketSession>() {
				@Override
				public void onFailure(Throwable ex) {
					failed.countDown();
				}

				@Override
				public void onSuccess(WebSocketSession result) {
				}
			};
	String echoUrl = this.baseUrl + "/echo";
	this.sockJsClient.doHandshake(testHandler, echoUrl).addCallback(onHandshake);

	assertTrue(failed.await(5000, TimeUnit.MILLISECONDS));
}
 
/**
 * Calls {@code optionsForAllow} on {@code baseUrl + "/get"} and checks in the success
 * callback that the allowed methods are exactly GET, OPTIONS, HEAD and TRACE.
 */
@Test
public void optionsForAllowCallback() throws Exception {
	URI target = new URI(baseUrl + "/get");
	ListenableFuture<Set<HttpMethod>> pendingAllow = template.optionsForAllow(target);

	ListenableFutureCallback<Set<HttpMethod>> checks =
			new ListenableFutureCallback<Set<HttpMethod>>() {
				@Override
				public void onFailure(Throwable ex) {
					fail(ex.getMessage());
				}
				@Override
				public void onSuccess(Set<HttpMethod> result) {
					Set<HttpMethod> expectedMethods = EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS,
							HttpMethod.HEAD, HttpMethod.TRACE);
					assertEquals("Invalid response", expectedMethods, result);
				}
			};
	pendingAllow.addCallback(checks);

	waitTillDone(pendingAllow);
}
 
/**
 * Rebuilds the per-test fixtures: fresh success/failure inputs, empty capture lists, and
 * a {@code ListenableFutureCallback} mock. The mock's {@code onSuccess(successInObj)} and
 * {@code onFailure(failureInObj)} each record the tracer's current span stack copy and
 * the MDC context map copy, and throw when {@code throwExceptionDuringCall} is set.
 * Finishes by calling {@code resetTracing()}.
 */
@Before
public void beforeMethod() {
    // Inputs and capture lists first; the stubbing below matches on these exact instances.
    successInObj = new Object();
    failureInObj = new Exception("kaboom");
    throwExceptionDuringCall = false;
    currentSpanStackWhenListenableFutureCallbackWasCalled = new ArrayList<>();
    currentMdcInfoWhenListenableFutureCallbackWasCalled = new ArrayList<>();

    listenableFutureCallbackMock = mock(ListenableFutureCallback.class);

    // Success path: capture tracing state, then optionally blow up.
    doAnswer(invocation -> {
        currentSpanStackWhenListenableFutureCallbackWasCalled.add(Tracer.getInstance().getCurrentSpanStackCopy());
        currentMdcInfoWhenListenableFutureCallbackWasCalled.add(MDC.getCopyOfContextMap());
        if (throwExceptionDuringCall) {
            throw new RuntimeException("kaboom");
        }
        return null;
    }).when(listenableFutureCallbackMock).onSuccess(successInObj);

    // Failure path: same capture-then-maybe-throw behavior.
    doAnswer(invocation -> {
        currentSpanStackWhenListenableFutureCallbackWasCalled.add(Tracer.getInstance().getCurrentSpanStackCopy());
        currentMdcInfoWhenListenableFutureCallbackWasCalled.add(MDC.getCopyOfContextMap());
        if (throwExceptionDuringCall) {
            throw new RuntimeException("kaboom");
        }
        return null;
    }).when(listenableFutureCallbackMock).onFailure(failureInObj);

    resetTracing();
}
 
源代码13 项目: enode   文件: SendKafkaMessageService.java
/**
 * Converts {@code queueMessage} to a producer record, sends it through {@code producer},
 * and bridges the send result onto a {@link CompletableFuture}.
 *
 * @param queueMessage message converted via {@code KafkaTool.covertToProducerRecord}
 * @return a future completed with {@code null} on success, or completed exceptionally
 *         with an {@code IORuntimeException} wrapping the send failure
 */
@Override
public CompletableFuture<Void> sendMessageAsync(QueueMessage queueMessage) {
    CompletableFuture<Void> completion = new CompletableFuture<>();
    ProducerRecord<String, String> record = KafkaTool.covertToProducerRecord(queueMessage);

    ListenableFutureCallback<SendResult<String, String>> bridge =
        new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Enode message async send success, sendResult: {}, message: {}", result, record);
                }
                completion.complete(null);
            }

            @Override
            public void onFailure(Throwable throwable) {
                logger.error("Enode message async send has exception, message: {}", record, throwable);
                completion.completeExceptionally(new IORuntimeException(throwable));
            }
        };
    producer.send(record).addCallback(bridge);

    return completion;
}
 
/**
 * Handles a {@code ListenableFuture} return value by starting deferred-result processing
 * and forwarding the future's outcome to the {@code DeferredResult}.
 * <p>A {@code null} return value marks the request as handled and returns immediately.
 *
 * @param returnValue  the handler's return value; cast to {@code ListenableFuture} when non-null
 * @param returnType   not used by this method
 * @param mavContainer container flagged as handled for {@code null}, otherwise passed to async processing
 * @param webRequest   request used to look up the async manager
 */
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
		ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

	if (returnValue == null) {
		mavContainer.setRequestHandled(true);
		return;
	}

	final DeferredResult<Object> deferred = new DeferredResult<Object>();
	// Relays the future's outcome to the deferred result.
	ListenableFutureCallback<Object> relay = new ListenableFutureCallback<Object>() {
		@Override
		public void onFailure(Throwable ex) {
			deferred.setErrorResult(ex);
		}
		@Override
		public void onSuccess(Object result) {
			deferred.setResult(result);
		}
	};

	WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferred, mavContainer);

	ListenableFuture<?> pending = (ListenableFuture<?>) returnValue;
	pending.addCallback(relay);
}
 
源代码15 项目: stateful-functions   文件: KafkaDriverPublisher.java
/**
 * Sends the driver message to {@code topic}, keyed by the UTF-8 bytes of the driver id,
 * and logs a warning if the send fails. Success is ignored.
 *
 * @param driver message whose id is the record key and whose byte array is the value
 */
@Override
public void accept(InboundDriverMessage driver) {
  byte[] key = driver.getDriverId().getBytes(StandardCharsets.UTF_8);

  ListenableFutureCallback<SendResult<Object, Object>> warnOnFailure =
      new ListenableFutureCallback<SendResult<Object, Object>>() {
        @Override
        public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {}

        @Override
        public void onFailure(Throwable throwable) {
          log.warn("Failed sending an event to kafka", throwable);
        }
      };

  kafkaTemplate.send(topic, key, driver.toByteArray()).addCallback(warnOnFailure);
}
 
/**
 * Sends the passenger message to {@code topic}, keyed by the UTF-8 bytes of the passenger
 * id, and logs the outcome of the send from a callback.
 *
 * @param passenger message whose id is the record key and whose byte array is the value
 */
@Override
public void accept(InboundPassengerMessage passenger) {
  byte[] key = passenger.getPassengerId().getBytes(StandardCharsets.UTF_8);

  ListenableFutureCallback<SendResult<Object, Object>> outcomeLogger =
      new ListenableFutureCallback<SendResult<Object, Object>>() {
        @Override
        public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
          log.info("Sent passenger data");
        }

        @Override
        public void onFailure(@NonNull Throwable throwable) {
          log.warn("couldn't send passenger data.", throwable);
        }
      };

  kafkaTemplate.send(topic, key, passenger.toByteArray()).addCallback(outcomeLogger);
}
 
/**
 * Executes a GET on {@code baseUrl + "/status/notfound"} and checks in the failure
 * callback that an {@code HttpClientErrorException} with status NOT_FOUND is delivered.
 */
@Test
public void notFoundCallback() throws Exception {
	ListenableFuture<?> future = template.execute(baseUrl + "/status/notfound", HttpMethod.GET, null, null);
	future.addCallback(new ListenableFutureCallback<Object>() {
		@Override
		public void onSuccess(Object result) {
			fail("onSuccess not expected");
		}
		@Override
		public void onFailure(Throwable t) {
			assertTrue(t instanceof HttpClientErrorException);
			HttpClientErrorException ex = (HttpClientErrorException) t;
			assertEquals(HttpStatus.NOT_FOUND, ex.getStatusCode());
			assertNotNull(ex.getStatusText());
			assertNotNull(ex.getResponseBodyAsString());
		}
	});
	// Poll with a short sleep rather than an empty spin loop, so the waiting thread
	// does not burn a CPU core while the request is in flight.
	while (!future.isDone()) {
		Thread.sleep(5);
	}
}
 
/**
 * Posts {@code helloWorld} via {@code postForEntity} and checks in the success callback
 * that the response body equals {@code helloWorld}; any failure fails the test.
 */
@Test
public void postForEntityCallback() throws Exception  {
	HttpEntity<String> requestEntity = new HttpEntity<>(helloWorld);
	ListenableFuture<ResponseEntity<String>> responseEntityFuture =
			template.postForEntity(baseUrl + "/{method}", requestEntity, String.class, "post");
	responseEntityFuture.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
		@Override
		public void onSuccess(ResponseEntity<String> result) {
			assertEquals("Invalid content", helloWorld, result.getBody());
		}
		@Override
		public void onFailure(Throwable ex) {
			fail(ex.getMessage());
		}
	});
	// Poll with a short sleep rather than an empty spin loop, so the waiting thread
	// does not burn a CPU core while the request is in flight.
	while (!responseEntityFuture.isDone()) {
		Thread.sleep(5);
	}
}
 
/**
 * Logs the target URI (at info level), starts the handshake through {@code client}, and
 * registers a callback that stores the session in {@code webSocketSession} on success or
 * logs the error on failure.
 */
@Override
protected void openConnection() {
	if (logger.isInfoEnabled()) {
		logger.info("Connecting to WebSocket at " + getUri());
	}

	ListenableFutureCallback<WebSocketSession> onHandshake =
			new ListenableFutureCallback<WebSocketSession>() {
				@Override
				public void onFailure(Throwable ex) {
					logger.error("Failed to connect", ex);
				}
				@Override
				public void onSuccess(@Nullable WebSocketSession result) {
					// Store the session on the enclosing instance.
					webSocketSession = result;
					logger.info("Successfully connected");
				}
			};

	ListenableFuture<WebSocketSession> handshake =
			this.client.doHandshake(this.webSocketHandler, this.headers, getUri());
	handshake.addCallback(onHandshake);
}
 
源代码20 项目: java-technology-stack   文件: WebSocketTransport.java
/**
 * Creates a {@code WebSocketClientSockJsSession} for the request, starts the WebSocket
 * handshake against the request's transport URL, and returns the future given to that session.
 * <p>Here the future is only completed exceptionally, on handshake failure; the handshake
 * success callback is intentionally empty.
 *
 * @param request supplies the transport URL, the handshake headers and the timeout-task registry
 * @param handler handler passed to the new {@code WebSocketClientSockJsSession}
 * @return the future passed to the session
 */
@Override
public ListenableFuture<WebSocketSession> connect(TransportRequest request, WebSocketHandler handler) {
	final SettableListenableFuture<WebSocketSession> sessionFuture = new SettableListenableFuture<>();
	WebSocketClientSockJsSession clientSession =
			new WebSocketClientSockJsSession(request, handler, sessionFuture);
	// The handshake uses a handler wrapping the SockJS session, not the caller's handler.
	WebSocketHandler wrappingHandler = new ClientSockJsWebSocketHandler(clientSession);
	request.addTimeoutTask(clientSession.getTimeoutTask());

	URI target = request.getTransportUrl();
	WebSocketHttpHeaders httpHeaders = new WebSocketHttpHeaders(request.getHandshakeHeaders());
	if (logger.isDebugEnabled()) {
		logger.debug("Starting WebSocket session on " + target);
	}

	ListenableFuture<WebSocketSession> handshake =
			this.webSocketClient.doHandshake(wrappingHandler, httpHeaders, target);
	handshake.addCallback(
			new ListenableFutureCallback<WebSocketSession>() {
				@Override
				public void onFailure(Throwable ex) {
					sessionFuture.setException(ex);
				}
				@Override
				public void onSuccess(@Nullable WebSocketSession webSocketSession) {
					// WebSocket session ready, SockJS Session not yet
				}
			});
	return sessionFuture;
}
 
/**
 * Stubs {@code RestOperations.execute} to throw an {@code HttpServerErrorException} and
 * checks that this exact exception reaches the connect future's failure callback, and
 * that the {@code webSocketHandler} mock sees no further interactions.
 */
@Test
public void connectFailure() throws Exception {
	final HttpServerErrorException expected = new HttpServerErrorException(HttpStatus.INTERNAL_SERVER_ERROR);
	RestOperations restTemplate = mock(RestOperations.class);
	given(restTemplate.execute((URI) any(), eq(HttpMethod.POST), any(), any())).willThrow(expected);

	final CountDownLatch latch = new CountDownLatch(1);
	connect(restTemplate).addCallback(
			new ListenableFutureCallback<WebSocketSession>() {
				@Override
				public void onSuccess(WebSocketSession result) {
				}
				@Override
				public void onFailure(Throwable ex) {
					// Only the stubbed exception instance counts as the expected failure.
					if (ex == expected) {
						latch.countDown();
					}
				}
			}
	);
	// Await the latch: without this, the test passes even when the failure callback
	// never fires or receives a different exception.
	if (!latch.await(5000, java.util.concurrent.TimeUnit.MILLISECONDS)) {
		throw new AssertionError("Failure callback was not invoked with the expected exception");
	}
	verifyNoMoreInteractions(this.webSocketHandler);
}
 
/**
 * Performs a GET {@code exchange} with a custom "MyHeader" request header and checks in
 * the success callback that the response body equals {@code helloWorld}.
 */
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void exchangeGetCallback() throws Exception {
	HttpHeaders requestHeaders = new HttpHeaders();
	requestHeaders.set("MyHeader", "MyValue");
	HttpEntity<?> requestEntity = new HttpEntity(requestHeaders);
	ListenableFuture<ResponseEntity<String>> responseFuture =
			template.exchange(baseUrl + "/{method}", HttpMethod.GET, requestEntity, String.class, "get");
	responseFuture.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
		@Override
		public void onSuccess(ResponseEntity<String> result) {
			assertEquals("Invalid content", helloWorld, result.getBody());
		}
		@Override
		public void onFailure(Throwable ex) {
			fail(ex.getMessage());
		}
	});
	// Poll with a short sleep rather than an empty spin loop, so the waiting thread
	// does not burn a CPU core while the request is in flight.
	while (!responseFuture.isDone()) {
		Thread.sleep(5);
	}
}
 
源代码23 项目: dhis2-core   文件: MessageSendingCallback.java
/**
 * Builds a callback for batch message sending that logs the failure on error, or on
 * success logs the channel name together with the sent and failed counts.
 *
 * @return a new callback instance on every call
 */
public ListenableFutureCallback<OutboundMessageResponseSummary> getBatchCallBack()
{
    ListenableFutureCallback<OutboundMessageResponseSummary> batchCallback =
        new ListenableFutureCallback<OutboundMessageResponseSummary>()
        {
            @Override
            public void onSuccess( OutboundMessageResponseSummary result )
            {
                int sentCount = result.getSent();
                int failedCount = result.getFailed();

                String status = String.format( "%s Message sending status: Successful: %d Failed: %d",
                    result.getChannel().name(), sentCount, failedCount );
                log.info( status );
            }

            @Override
            public void onFailure( Throwable ex )
            {
                log.error( "Message sending failed", ex );
            }
        };

    return batchCallback;
}
 
/**
 * Posts {@code helloWorld} with a text/plain ISO-8859-1 content type through
 * {@code postForLocation} and asserts in the success callback that the returned location
 * is {@code baseUrl + "/post/1"}; a failure fails the test.
 */
@Test
public void postForLocationCallback() throws Exception  {
	HttpHeaders contentHeaders = new HttpHeaders();
	MediaType plainLatin1 = new MediaType("text", "plain", StandardCharsets.ISO_8859_1);
	contentHeaders.setContentType(plainLatin1);
	HttpEntity<String> payload = new HttpEntity<>(helloWorld, contentHeaders);
	final URI expected = new URI(baseUrl + "/post/1");

	ListenableFuture<URI> location = template.postForLocation(baseUrl + "/{method}", payload, "post");
	ListenableFutureCallback<URI> verifier = new ListenableFutureCallback<URI>() {
		@Override
		public void onFailure(Throwable ex) {
			fail(ex.getMessage());
		}
		@Override
		public void onSuccess(URI result) {
			assertEquals("Invalid location", expected, result);
		}
	};
	location.addCallback(verifier);

	waitTillDone(location);
}
 
/**
 * Executes a GET on {@code baseUrl + "/status/server"} and checks in the failure callback
 * that an {@code HttpServerErrorException} with status INTERNAL_SERVER_ERROR is delivered.
 */
@Test
public void serverErrorCallback() throws Exception {
	ListenableFuture<Void> future = template.execute(baseUrl + "/status/server", HttpMethod.GET, null, null);
	future.addCallback(new ListenableFutureCallback<Void>() {
		@Override
		public void onSuccess(Void result) {
			fail("onSuccess not expected");
		}
		@Override
		public void onFailure(Throwable ex) {
			assertTrue(ex instanceof HttpServerErrorException);
			HttpServerErrorException hsex = (HttpServerErrorException) ex;
			assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, hsex.getStatusCode());
			assertNotNull(hsex.getStatusText());
			assertNotNull(hsex.getResponseBodyAsString());
		}
	});
	// Poll with a short sleep rather than an empty spin loop, so the waiting thread
	// does not burn a CPU core while the request is in flight.
	while (!future.isDone()) {
		Thread.sleep(5);
	}
}
 
/**
 * Posts {@code helloWorld} as text/plain (ISO-8859-15) via {@code postForLocation} and
 * checks in the success callback that the location equals {@code baseUrl + "/post/1"}.
 */
@Test
public void postForLocationCallback() throws Exception  {
	HttpHeaders entityHeaders = new HttpHeaders();
	entityHeaders.setContentType(new MediaType("text", "plain", Charset.forName("ISO-8859-15")));
	HttpEntity<String> entity = new HttpEntity<String>(helloWorld, entityHeaders);
	final URI expected = new URI(baseUrl + "/post/1");
	ListenableFuture<URI> locationFuture = template.postForLocation(baseUrl + "/{method}", entity, "post");
	locationFuture.addCallback(new ListenableFutureCallback<URI>() {
		@Override
		public void onSuccess(URI result) {
			assertEquals("Invalid location", expected, result);
		}
		@Override
		public void onFailure(Throwable ex) {
			fail(ex.getMessage());
		}
	});
	// Poll with a short sleep rather than an empty spin loop, so the waiting thread
	// does not burn a CPU core while the request is in flight.
	while (!locationFuture.isDone()) {
		Thread.sleep(5);
	}
}
 
/**
 * Runs a GET against {@code baseUrl + "/status/notfound"}; success is treated as a test
 * failure, and the failure callback asserts an {@code HttpClientErrorException} carrying
 * NOT_FOUND plus non-null status text and response body.
 */
@Test
public void notFoundCallback() throws Exception {
	String notFoundUrl = baseUrl + "/status/notfound";
	ListenableFuture<?> response = template.execute(notFoundUrl, HttpMethod.GET, null, null);

	ListenableFutureCallback<Object> verifier = new ListenableFutureCallback<Object>() {
		@Override
		public void onFailure(Throwable t) {
			assertTrue(t instanceof HttpClientErrorException);
			HttpClientErrorException notFound = (HttpClientErrorException) t;
			assertEquals(HttpStatus.NOT_FOUND, notFound.getStatusCode());
			assertNotNull(notFound.getStatusText());
			assertNotNull(notFound.getResponseBodyAsString());
		}
		@Override
		public void onSuccess(Object result) {
			fail("onSuccess not expected");
		}
	};
	response.addCallback(verifier);

	waitTillDone(response);
}
 
/**
 * Runs a GET against {@code baseUrl + "/status/server"}; success is treated as a test
 * failure, and the failure callback asserts an {@code HttpServerErrorException} carrying
 * INTERNAL_SERVER_ERROR plus non-null status text and response body.
 */
@Test
public void serverErrorCallback() throws Exception {
	String serverErrorUrl = baseUrl + "/status/server";
	ListenableFuture<Void> response = template.execute(serverErrorUrl, HttpMethod.GET, null, null);

	ListenableFutureCallback<Void> verifier = new ListenableFutureCallback<Void>() {
		@Override
		public void onFailure(Throwable ex) {
			assertTrue(ex instanceof HttpServerErrorException);
			HttpServerErrorException serverError = (HttpServerErrorException) ex;
			assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, serverError.getStatusCode());
			assertNotNull(serverError.getStatusText());
			assertNotNull(serverError.getResponseBodyAsString());
		}
		@Override
		public void onSuccess(Void result) {
			fail("onSuccess not expected");
		}
	};
	response.addCallback(verifier);

	waitTillDone(response);
}
 
/**
 * Requests the allowed methods for {@code baseUrl + "/get"} and asserts in the success
 * callback that they are GET, OPTIONS, HEAD and TRACE; a failure fails the test.
 */
@Test
public void optionsForAllowCallback() throws Exception {
	ListenableFuture<Set<HttpMethod>> allowed = template.optionsForAllow(new URI(baseUrl + "/get"));

	ListenableFutureCallback<Set<HttpMethod>> verifier =
			new ListenableFutureCallback<Set<HttpMethod>>() {
				@Override
				public void onFailure(Throwable ex) {
					fail(ex.getMessage());
				}
				@Override
				public void onSuccess(Set<HttpMethod> result) {
					Set<HttpMethod> expected = EnumSet.of(HttpMethod.GET, HttpMethod.OPTIONS,
							HttpMethod.HEAD, HttpMethod.TRACE);
					assertEquals("Invalid response", expected, result);
				}
			};
	allowed.addCallback(verifier);

	waitTillDone(allowed);
}
 
/**
 * Performs a GET {@code exchange} carrying a "MyHeader" request header and asserts in the
 * success callback that the response body equals {@code helloWorld}.
 */
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void exchangeGetCallback() throws Exception {
	HttpHeaders customHeaders = new HttpHeaders();
	customHeaders.set("MyHeader", "MyValue");
	HttpEntity<?> headersOnlyEntity = new HttpEntity(customHeaders);

	ListenableFuture<ResponseEntity<String>> response =
			template.exchange(baseUrl + "/{method}", HttpMethod.GET, headersOnlyEntity, String.class, "get");
	ListenableFutureCallback<ResponseEntity<String>> verifier =
			new ListenableFutureCallback<ResponseEntity<String>>() {
				@Override
				public void onFailure(Throwable ex) {
					fail(ex.getMessage());
				}
				@Override
				public void onSuccess(ResponseEntity<String> result) {
					assertEquals("Invalid content", helloWorld, result.getBody());
				}
			};
	response.addCallback(verifier);

	waitTillDone(response);
}
 
 类所在包
 类方法
 同包方法